package_directory/
root_dir_cache.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_inspect as finspect;
6use futures::FutureExt as _;
7use futures::future::BoxFuture;
8use std::collections::HashMap;
9use std::sync::{Arc, Weak};
10
11/// `RootDirCache` is a cache of `Arc<RootDir>`s indexed by their hash.
12///
13/// The cache internally stores `Weak<RootDir>`s and installs a custom `dropper` in its managed
14/// `RootDir`s that removes the corresponding entry when dropped, so it is a cache of
15/// `Arc<RootDir>`s that are actively in use by its clients. This is useful for deduplicating
16/// the `Arc<RootDir>`s used by VFS to serve package directory connections while also keeping
17/// track of which connections are open.
18///
19/// Because of how `RootDir`` is implemented, a package will have alive `Arc`s if there are:
20///   1. fuchsia.io.Directory connections to the package's root directory or any sub directory.
21///   2. fuchsia.io.File connections to the package's files under meta/.
22///   3. fuchsia.io.File connections to the package's content blobs (files not under meta/) *iff*
23///      the `crate::NonMetaStorage` impl serves the connections itself (instead of
24///      forwarding to a remote server).
25///
26/// The `NonMetaStorage` impl we use for Fxblob does serve the File connections itself.
27/// The impl we use for Blobfs does not, but Blobfs will wait to delete blobs that have
28/// open connections until the last connection closes.
29/// Similarly, both Blobfs and Fxblob will wait to delete blobs until the last VMO is closed (VMOs
30/// obtained from fuchsia.io.File.GetBackingMemory will not keep a package alive), so it is safe to
31/// delete packages that RootDirCache says are not open.
32///
33/// Clients close connections to packages by closing their end of the Zircon channel over which the
34/// fuchsia.io.[File|Directory] messages were being sent. Some time after the client end of the
35/// channel is closed, the server (usually in a different process) will be notified by the kernel,
36/// and the task serving the connection will finish, dropping its `Arc<RootDir>`.
37/// When the last `Arc` is dropped, the strong count of the corresponding `std::sync::Weak` in
38/// the `RootDirCache` will decrement to zero. At this point the `RootDirCache`
39/// will no longer report the package as open.
40/// All this is to say that there will be some delay between a package no longer being in use and
41/// clients of `RootDirCache` finding out about that.
42#[derive(Debug, Clone)]
43pub struct RootDirCache<S> {
44    non_meta_storage: S,
45    dirs: Arc<std::sync::Mutex<HashMap<fuchsia_hash::Hash, Weak<crate::RootDir<S>>>>>,
46}
47
48impl<S: crate::NonMetaStorage + Clone> RootDirCache<S> {
49    /// Creates a `RootDirCache` that uses `non_meta_storage` as the backing for the
50    /// internally managed `crate::RootDir`s.
51    pub fn new(non_meta_storage: S) -> Self {
52        let dirs = Arc::new(std::sync::Mutex::new(HashMap::new()));
53        Self { non_meta_storage, dirs }
54    }
55
56    /// Returns an `Arc<RootDir>` corresponding to `hash`.
57    /// If there is not already one in the cache, `root_dir` will be used if provided, otherwise
58    /// a new one will be created using the `non_meta_storage` provided to `Self::new`.
59    ///
60    /// If provided, `root_dir` must be backed by the same `NonMetaStorage` that `Self::new` was
61    /// called with. The provided `root_dir` must not have a dropper set.
62    pub async fn get_or_insert(
63        &self,
64        hash: fuchsia_hash::Hash,
65        root_dir: Option<crate::RootDir<S>>,
66    ) -> Result<Arc<crate::RootDir<S>>, crate::Error> {
67        Ok(if let Some(root_dir) = self.get(&hash) {
68            root_dir
69        } else {
70            // If this is dropped while the dirs lock is held it will deadlock.
71            let dropper = Box::new(Dropper { dirs: Arc::downgrade(&self.dirs), hash });
72            let new_root_dir = match root_dir {
73                Some(mut root_dir) => match root_dir.set_dropper(dropper) {
74                    Ok(()) => Arc::new(root_dir),
75                    // Okay to drop the dropper, the lock is not held and the drop impl handles
76                    // missing entries.
77                    Err(_) => {
78                        return Err(crate::Error::DropperAlreadySet);
79                    }
80                },
81                None => {
82                    // Do this without the lock held because:
83                    // 1. Making a RootDir takes ~100 μs (reading the meta.far)
84                    // 2. If new_with_dropper errors it will drop the dropper, which would deadlock
85                    // 3. If any other async task runs on this thread and drops a RootDir (e.g. b/c
86                    //    a client closed a connection) it will deadlock.
87                    crate::RootDir::new_with_dropper(self.non_meta_storage.clone(), hash, dropper)
88                        .await?
89                }
90            };
91            use std::collections::hash_map::Entry::*;
92            match self.dirs.lock().expect("poisoned mutex").entry(hash) {
93                // Raced with another call to serve.
94                Occupied(mut o) => {
95                    let old_root_dir = o.get_mut();
96                    if let Some(old_root_dir) = old_root_dir.upgrade() {
97                        old_root_dir
98                    } else {
99                        *old_root_dir = Arc::downgrade(&new_root_dir);
100                        new_root_dir
101                    }
102                }
103                Vacant(v) => {
104                    v.insert(Arc::downgrade(&new_root_dir));
105                    new_root_dir
106                }
107            }
108        })
109    }
110
111    /// Returns the `Arc<RootDir>` with the given `hash`, if one exists in the cache.
112    /// Otherwise returns `None`.
113    /// Holding on to the returned `Arc` will keep the package open (as reported by
114    /// `Self::list`).
115    pub fn get(&self, hash: &fuchsia_hash::Hash) -> Option<Arc<crate::RootDir<S>>> {
116        self.dirs.lock().expect("poisoned mutex").get(hash)?.upgrade()
117    }
118
119    /// Packages with live `Arc<RootDir>`s.
120    /// Holding on to the returned `Arc`s will keep the packages open.
121    pub fn list(&self) -> Vec<Arc<crate::RootDir<S>>> {
122        self.dirs.lock().expect("poisoned mutex").iter().filter_map(|(_, v)| v.upgrade()).collect()
123    }
124
125    /// Returns a callback to be given to `fuchsia_inspect::Node::record_lazy_child`.
126    /// Records the package hashes and their corresponding `Arc<RootDir>` strong counts.
127    pub fn record_lazy_inspect(
128        &self,
129    ) -> impl Fn() -> BoxFuture<'static, Result<finspect::Inspector, anyhow::Error>>
130    + Send
131    + Sync
132    + 'static {
133        let dirs = Arc::downgrade(&self.dirs);
134        move || {
135            let dirs = dirs.clone();
136            async move {
137                let inspector = finspect::Inspector::default();
138                if let Some(dirs) = dirs.upgrade() {
139                    let package_counts: HashMap<_, _> = {
140                        let dirs = dirs.lock().expect("poisoned mutex");
141                        dirs.iter().map(|(k, v)| (*k, v.strong_count() as u64)).collect()
142                    };
143                    let root = inspector.root();
144                    let () = package_counts.into_iter().for_each(|(pkg, count)| {
145                        root.record_child(pkg.to_string(), |n| n.record_uint("instances", count))
146                    });
147                }
148                Ok(inspector)
149            }
150            .boxed()
151        }
152    }
153}
154
155/// Removes the corresponding entry from RootDirCache's self.dirs when dropped.
156struct Dropper<S> {
157    dirs: Weak<std::sync::Mutex<HashMap<fuchsia_hash::Hash, Weak<crate::RootDir<S>>>>>,
158    hash: fuchsia_hash::Hash,
159}
160
161impl<S> Drop for Dropper<S> {
162    fn drop(&mut self) {
163        let Some(dirs) = self.dirs.upgrade() else {
164            return;
165        };
166        use std::collections::hash_map::Entry::*;
167        match dirs.lock().expect("poisoned mutex").entry(self.hash) {
168            Occupied(o) => {
169                // In case this raced with a call to serve that added a new one.
170                if o.get().strong_count() == 0 {
171                    o.remove_entry();
172                }
173            }
174            // Never added because creation failed.
175            Vacant(_) => (),
176        };
177    }
178}
179
180impl<S> std::fmt::Debug for Dropper<S> {
181    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182        f.debug_struct("Dropper").field("dirs", &self.dirs).field("hash", &self.hash).finish()
183    }
184}
185
186#[cfg(test)]
187mod tests {
188    use super::*;
189    use assert_matches::assert_matches;
190    use diagnostics_assertions::assert_data_tree;
191    use fidl_fuchsia_io as fio;
192    use fuchsia_pkg_testing::PackageBuilder;
193    use fuchsia_pkg_testing::blobfs::Fake as FakeBlobfs;
194
195    #[fuchsia::test]
196    async fn get_or_insert_new_entry() {
197        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
198        let (metafar_blob, _) = pkg.contents();
199        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
200        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
201        let server = RootDirCache::new(blobfs_client);
202
203        let dir = server.get_or_insert(metafar_blob.merkle, None).await.unwrap();
204
205        assert_eq!(server.list().len(), 1);
206
207        drop(dir);
208        assert_eq!(server.list().len(), 0);
209        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
210    }
211
212    #[fuchsia::test]
213    async fn closing_package_connection_closes_package() {
214        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
215        let (metafar_blob, _) = pkg.contents();
216        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
217        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
218        let server = RootDirCache::new(blobfs_client);
219
220        let dir = server.get_or_insert(metafar_blob.merkle, None).await.unwrap();
221        let (proxy, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
222        let scope = vfs::execution_scope::ExecutionScope::new();
223        vfs::directory::serve_on(dir, fio::PERM_READABLE, scope.clone(), server_end);
224        let _ = proxy
225            .get_attributes(Default::default())
226            .await
227            .expect("directory succesfully handling requests");
228        assert_eq!(server.list().len(), 1);
229
230        drop(proxy);
231        let () = scope.wait().await;
232        assert_eq!(server.list().len(), 0);
233        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
234    }
235
236    #[fuchsia::test]
237    async fn get_or_insert_existing_entry() {
238        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
239        let (metafar_blob, _) = pkg.contents();
240        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
241        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
242        let server = RootDirCache::new(blobfs_client);
243
244        let dir0 = server.get_or_insert(metafar_blob.merkle, None).await.unwrap();
245
246        let dir1 = server.get_or_insert(metafar_blob.merkle, None).await.unwrap();
247        assert_eq!(server.list().len(), 1);
248        assert_eq!(Arc::strong_count(&server.list()[0]), 3);
249
250        drop(dir0);
251        drop(dir1);
252        assert_eq!(server.list().len(), 0);
253        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
254    }
255
256    #[fuchsia::test]
257    async fn get_or_insert_provided_root_dir() {
258        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
259        let (metafar_blob, _) = pkg.contents();
260        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
261        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
262        let root_dir = crate::RootDir::new_raw(blobfs_client.clone(), metafar_blob.merkle, None)
263            .await
264            .unwrap();
265        blobfs_fake.delete_blob(metafar_blob.merkle);
266        let server = RootDirCache::new(blobfs_client);
267
268        let dir = server.get_or_insert(metafar_blob.merkle, Some(root_dir)).await.unwrap();
269        assert_eq!(server.list().len(), 1);
270
271        drop(dir);
272        assert_eq!(server.list().len(), 0);
273        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
274    }
275
276    #[fuchsia::test]
277    async fn get_or_insert_provided_root_dir_error_if_already_has_dropper() {
278        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
279        let (metafar_blob, _) = pkg.contents();
280        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
281        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
282        let root_dir =
283            crate::RootDir::new_raw(blobfs_client.clone(), metafar_blob.merkle, Some(Box::new(())))
284                .await
285                .unwrap();
286        let server = RootDirCache::new(blobfs_client);
287
288        assert_matches!(
289            server.get_or_insert(metafar_blob.merkle, Some(root_dir)).await,
290            Err(crate::Error::DropperAlreadySet)
291        );
292        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
293    }
294
295    #[fuchsia::test]
296    async fn get_or_insert_fails_if_root_dir_creation_fails() {
297        let (_blobfs_fake, blobfs_client) = FakeBlobfs::new();
298        let server = RootDirCache::new(blobfs_client);
299
300        assert_matches!(
301            server.get_or_insert([0; 32].into(), None).await,
302            Err(crate::Error::MissingMetaFar)
303        );
304        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
305    }
306
307    #[fuchsia::test]
308    async fn get_or_insert_concurrent_race_to_insert_new_root_dir() {
309        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
310        let (metafar_blob, _) = pkg.contents();
311        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
312        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
313        let server = RootDirCache::new(blobfs_client);
314
315        let fut0 = server.get_or_insert(metafar_blob.merkle, None);
316
317        let fut1 = server.get_or_insert(metafar_blob.merkle, None);
318
319        let (res0, res1) = futures::future::join(fut0, fut1).await;
320        let (dir0, dir1) = (res0.unwrap(), res1.unwrap());
321
322        assert_eq!(server.list().len(), 1);
323        assert_eq!(Arc::strong_count(&server.list()[0]), 3);
324
325        drop(dir0);
326        drop(dir1);
327        assert_eq!(server.list().len(), 0);
328        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
329    }
330
331    #[fuchsia::test]
332    async fn inspect() {
333        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
334        let (metafar_blob, _) = pkg.contents();
335        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
336        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
337        let server = RootDirCache::new(blobfs_client);
338        let _dir = server.get_or_insert(metafar_blob.merkle, None).await.unwrap();
339
340        let inspector = finspect::Inspector::default();
341        inspector.root().record_lazy_child("open-packages", server.record_lazy_inspect());
342
343        assert_data_tree!(inspector, root: {
344            "open-packages": {
345                pkg.hash().to_string() => {
346                    "instances": 1u64,
347                },
348            }
349        });
350    }
351}