archivist_lib/inspect/
repository.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::identity::ComponentIdentity;
6use crate::inspect::container::{
7    InspectArtifactsContainer, InspectHandle, UnpopulatedInspectDataContainer,
8};
9use crate::pipeline::{Pipeline, StaticHierarchyAllowlist};
10use fidl::endpoints::ClientEnd;
11use fidl::{AsHandleRef, HandleBased};
12use fidl_fuchsia_diagnostics::Selector;
13use flyweights::FlyStr;
14use fuchsia_sync::{RwLock, RwLockWriteGuard};
15use log::{debug, warn};
16use std::collections::HashMap;
17use std::future::Future;
18use std::sync::{Arc, Weak};
19use {fidl_fuchsia_inspect as finspect, fuchsia_async as fasync};
20
/// Name that `InspectRepository::escrow_handle` assigns to every VMO escrowed with the
/// repository.
static INSPECT_ESCROW_NAME: zx::Name = zx::Name::new_lossy("InspectEscrowedVmo");
22
/// Tracks the Inspect handles published by components, grouped by component identity.
pub struct InspectRepository {
    /// Per-component containers, guarded by a read/write lock.
    inner: RwLock<InspectRepositoryInner>,
    /// Pipelines that are told when a component is added to or removed from the repository.
    pipelines: Vec<Weak<Pipeline>>,
    /// Scope on which the cleanup task of every tracked handle is spawned.
    scope: fasync::Scope,
}
28
29impl InspectRepository {
30    pub fn new(pipelines: Vec<Weak<Pipeline>>, scope: fasync::Scope) -> InspectRepository {
31        Self {
32            pipelines,
33            scope,
34            inner: RwLock::new(InspectRepositoryInner { diagnostics_containers: HashMap::new() }),
35        }
36    }
37
38    /// Return all the containers that contain Inspect hierarchies which contain data that should
39    /// be selected from.
40    pub fn fetch_inspect_data(
41        &self,
42        component_selectors: &Option<Vec<Selector>>,
43        static_allowlist: StaticHierarchyAllowlist,
44    ) -> Vec<UnpopulatedInspectDataContainer> {
45        self.inner.read().fetch_inspect_data(component_selectors, static_allowlist)
46    }
47
48    fn add_inspect_artifacts(
49        self: &Arc<Self>,
50        mut guard: RwLockWriteGuard<'_, InspectRepositoryInner>,
51        identity: Arc<ComponentIdentity>,
52        proxy_handle: InspectHandle,
53        remove_associated: Option<zx::Koid>,
54    ) {
55        // insert_inspect_artifact_container returns None when we were already tracking the
56        // directory for this component. If that's the case we can return early.
57        let weak_clone = Arc::downgrade(self);
58        let identity_clone = Arc::clone(&identity);
59        let Some(cleanup_task) = guard.insert_inspect_artifact_container(
60            Arc::clone(&identity),
61            proxy_handle,
62            remove_associated,
63            move |koid| {
64                if let Some(this) = weak_clone.upgrade() {
65                    this.on_handle_closed(koid, identity_clone);
66                }
67            },
68        ) else {
69            // If we got None, it means that this was already tracked and there's nothing to do.
70            return;
71        };
72
73        self.scope.spawn(cleanup_task);
74
75        // Let each pipeline know that a new component arrived, and allow the pipeline
76        // to eagerly bucket static selectors based on that component's moniker.
77        for pipeline_weak in self.pipelines.iter() {
78            if let Some(pipeline) = pipeline_weak.upgrade() {
79                pipeline.add_component(&identity.moniker).unwrap_or_else(|err| {
80                    warn!(identity:%, err:?;
81                                "Failed to add inspect artifacts to pipeline wrapper");
82                });
83            }
84        }
85    }
86
87    pub(crate) fn escrow_handle<T: Into<FlyStr>>(
88        self: &Arc<Self>,
89        component: Arc<ComponentIdentity>,
90        vmo: zx::Vmo,
91        token: finspect::EscrowToken,
92        name: Option<T>,
93        tree: Option<zx::Koid>,
94    ) {
95        debug!(identity:% = component; "Escrow inspect handle.");
96        if let Err(err) = vmo.set_name(&INSPECT_ESCROW_NAME) {
97            debug!(err:%; "Failed to set escrow vmo name");
98        }
99        let handle = InspectHandle::escrow(vmo, token, name);
100        let guard = self.inner.write();
101        self.add_inspect_artifacts(guard, Arc::clone(&component), handle, tree);
102    }
103
104    pub(crate) fn fetch_escrow(
105        self: &Arc<Self>,
106        component: Arc<ComponentIdentity>,
107        token: finspect::EscrowToken,
108        tree: Option<ClientEnd<finspect::TreeMarker>>,
109    ) -> Option<zx::Vmo> {
110        debug!(identity:% = component; "Fetch Escrowed inspect handle.");
111        let koid = token.token.as_handle_ref().get_koid().unwrap();
112        let mut guard = self.inner.write();
113        let container = guard.diagnostics_containers.get_mut(&component)?;
114        let (handle, _) = container.remove_handle(koid);
115        let handle = handle?;
116        let InspectHandle::Escrow { vmo, name, .. } = handle.as_ref() else {
117            return None;
118        };
119        if let Some(tree) = tree {
120            self.add_inspect_artifacts(
121                guard,
122                component,
123                InspectHandle::tree(tree.into_proxy(), name.clone()),
124                None,
125            );
126        }
127        Some(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
128    }
129
130    pub(crate) fn add_inspect_handle(
131        self: &Arc<Self>,
132        component: Arc<ComponentIdentity>,
133        handle: InspectHandle,
134    ) {
135        debug!(identity:% = component; "Added inspect handle.");
136        let guard = self.inner.write();
137        self.add_inspect_artifacts(guard, Arc::clone(&component), handle, None);
138    }
139
140    fn on_handle_closed(&self, koid_to_remove: zx::Koid, identity: Arc<ComponentIdentity>) {
141        // Hold the lock while we remove and update pipelines.
142        let mut guard = self.inner.write();
143
144        if let Some(container) = guard.diagnostics_containers.get_mut(&identity) {
145            if container.remove_handle(koid_to_remove).1 != 0 {
146                return;
147            }
148        }
149
150        guard.diagnostics_containers.remove(&identity);
151
152        for pipeline_weak in &self.pipelines {
153            if let Some(pipeline) = pipeline_weak.upgrade() {
154                pipeline.remove_component(&identity.moniker);
155            }
156        }
157    }
158}
159
#[cfg(test)]
impl InspectRepository {
    /// Test helper: stops tracking `identity` by dropping its container, if there is one.
    pub(crate) fn terminate_inspect(&self, identity: Arc<ComponentIdentity>) {
        let mut inner = self.inner.write();
        inner.diagnostics_containers.remove(&identity);
    }

    /// Reports whether a container is currently tracked for `identity`.
    fn has_match(&self, identity: &Arc<ComponentIdentity>) -> bool {
        self.inner.read().get_diagnostics_containers().contains_key(identity)
    }

    /// Wait for data to appear for `identity`. Will run indefinitely if no data shows up.
    ///
    /// Polls the repository, sleeping 100 milliseconds between checks.
    pub(crate) async fn wait_for_artifact(&self, identity: &Arc<ComponentIdentity>) {
        while !self.has_match(identity) {
            let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100));
            fasync::Timer::new(deadline).await;
        }
    }

    /// Wait until nothing is present for `identity`. Will run indefinitely if data persists.
    ///
    /// Polls the repository, sleeping 100 milliseconds between checks.
    pub(crate) async fn wait_until_gone(&self, identity: &Arc<ComponentIdentity>) {
        while self.has_match(identity) {
            let deadline = zx::MonotonicInstant::after(zx::MonotonicDuration::from_millis(100));
            fasync::Timer::new(deadline).await;
        }
    }
}
199
/// State of the repository that `InspectRepository` keeps behind its lock.
struct InspectRepositoryInner {
    /// All the diagnostics directories that we are tracking.
    diagnostics_containers: HashMap<Arc<ComponentIdentity>, InspectArtifactsContainer>,
}
204
205impl InspectRepositoryInner {
206    // Inserts an InspectArtifactsContainer into the data repository.
207    fn insert_inspect_artifact_container(
208        &mut self,
209        identity: Arc<ComponentIdentity>,
210        proxy_handle: InspectHandle,
211        remove_associated: Option<zx::Koid>,
212        on_closed: impl FnOnce(zx::Koid),
213    ) -> Option<impl Future<Output = ()>> {
214        let mut diag_repo_entry_opt = self.diagnostics_containers.get_mut(&identity);
215        match diag_repo_entry_opt {
216            None => {
217                let mut inspect_container = InspectArtifactsContainer::default();
218                let fut = inspect_container.push_handle(proxy_handle, on_closed);
219                self.diagnostics_containers.insert(identity, inspect_container);
220                fut
221            }
222            Some(ref mut artifacts_container) => {
223                // When we escrow a vmo handle and provide an associated tree koid, we want to
224                // ensure we atomically insert and remove. That's why we provide this optional koid
225                // here and remove it under the same lock as the insertion of the escrowed data.
226                if let Some(koid) = remove_associated {
227                    artifacts_container.remove_handle(koid);
228                }
229                artifacts_container.push_handle(proxy_handle, on_closed)
230            }
231        }
232    }
233
234    fn fetch_inspect_data(
235        &self,
236        all_dynamic_selectors: &Option<Vec<Selector>>,
237        static_allowlist: StaticHierarchyAllowlist,
238    ) -> Vec<UnpopulatedInspectDataContainer> {
239        let mut containers = vec![];
240        for (identity, container) in self.diagnostics_containers.iter() {
241            let component_allowlist = static_allowlist.get(&identity.moniker);
242
243            // This artifact contains inspect and matches a passed selector.
244            if let Some(unpopulated) =
245                container.create_unpopulated(identity, component_allowlist, all_dynamic_selectors)
246            {
247                containers.push(unpopulated);
248            }
249        }
250        containers
251    }
252}
253
#[cfg(test)]
impl InspectRepositoryInner {
    /// Looks up the container tracked for `identity`, if any.
    pub(crate) fn get(
        &self,
        identity: &Arc<ComponentIdentity>,
    ) -> Option<&InspectArtifactsContainer> {
        self.get_diagnostics_containers().get(identity)
    }

    /// Exposes the whole identity-to-container map so tests can inspect it.
    pub(crate) fn get_diagnostics_containers(
        &self,
    ) -> &HashMap<Arc<ComponentIdentity>, InspectArtifactsContainer> {
        let Self { diagnostics_containers } = self;
        diagnostics_containers
    }
}
269
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use diagnostics_assertions::assert_data_tree;
    use fidl::endpoints::Proxy;
    use fidl::AsHandleRef;
    use fidl_fuchsia_inspect as finspect;
    use fuchsia_inspect::{Inspector, InspectorConfig};
    use moniker::ExtendedMoniker;
    use selectors::FastError;
    use std::sync::LazyLock;

    const TEST_URL: &str = "fuchsia-pkg://test";
    /// Rights given to the VMOs that the tests escrow.
    static ESCROW_TEST_RIGHTS: LazyLock<zx::Rights> = LazyLock::new(|| {
        zx::Rights::BASIC | zx::Rights::READ | zx::Rights::MAP | zx::Rights::PROPERTY
    });

    /// Adding the same tree proxy twice must leave a single tracked handle.
    #[fuchsia::test]
    fn inspect_repo_disallows_duplicated_handles() {
        let _exec = fuchsia_async::LocalExecutor::new();
        let inspect_repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
        let moniker = ExtendedMoniker::parse_str("./a/b/foo").unwrap();
        let identity = Arc::new(ComponentIdentity::new(moniker, TEST_URL));

        let (proxy, _stream) = fidl::endpoints::create_proxy::<finspect::TreeMarker>();
        let proxy_clone = proxy.clone();
        inspect_repo
            .add_inspect_handle(Arc::clone(&identity), InspectHandle::tree(proxy, Some("test")));

        inspect_repo.add_inspect_handle(
            Arc::clone(&identity),
            InspectHandle::tree(proxy_clone, Some("test")),
        );

        let guard = inspect_repo.inner.read();
        let container = guard.get(&identity).unwrap();
        assert_eq!(container.handles().len(), 1);
    }

    /// Closing the server end of the only tree must eventually remove the component.
    #[fuchsia::test]
    async fn repo_removes_entries_when_inspect_is_disconnected() {
        let data_repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
        let moniker = ExtendedMoniker::parse_str("./a/b/foo").unwrap();
        let identity = Arc::new(ComponentIdentity::new(moniker, TEST_URL));
        let (proxy, server_end) = fidl::endpoints::create_proxy::<finspect::TreeMarker>();
        data_repo
            .add_inspect_handle(Arc::clone(&identity), InspectHandle::tree(proxy, Some("test")));
        assert!(data_repo.inner.read().get(&identity).is_some());
        drop(server_end);
        data_repo.wait_until_gone(&identity).await;
    }

    /// Removing a handle from the repository must close the peer of its channel.
    #[fuchsia::test]
    async fn related_handle_closes_when_repo_handle_is_removed() {
        let repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
        let identity = Arc::new(ComponentIdentity::unknown());
        let (proxy, server_end) = fidl::endpoints::create_proxy::<finspect::TreeMarker>();
        let koid = proxy.as_channel().as_handle_ref().get_koid().unwrap();
        repo.add_inspect_handle(Arc::clone(&identity), InspectHandle::tree(proxy, Some("test")));
        {
            let mut guard = repo.inner.write();
            let container = guard.diagnostics_containers.get_mut(&identity).unwrap();
            container.remove_handle(koid);
        }
        fasync::Channel::from_channel(server_end.into_channel()).on_closed().await.unwrap();
    }

    /// Pipelines must learn about components when they are added and when they go away.
    #[fuchsia::test]
    async fn repo_integrates_with_the_pipeline() {
        let selector = selectors::parse_selector::<FastError>(r#"a/b/foo:root"#).unwrap();
        let static_selectors_opt = Some(vec![selector]);
        let pipeline = Arc::new(Pipeline::for_test(static_selectors_opt));
        let data_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], fasync::Scope::new()));
        let moniker = ExtendedMoniker::parse_str("./a/b/foo").unwrap();
        let identity = Arc::new(ComponentIdentity::new(moniker.clone(), TEST_URL));
        let (proxy, server_end) = fidl::endpoints::create_proxy::<finspect::TreeMarker>();
        data_repo
            .add_inspect_handle(Arc::clone(&identity), InspectHandle::tree(proxy, Some("test")));
        assert!(data_repo.inner.read().get(&identity).is_some());
        assert!(pipeline.static_hierarchy_allowlist().component_was_added(&moniker));

        // When the directory disconnects, both the pipeline matchers and the repo are cleaned
        drop(server_end);
        data_repo.wait_until_gone(&identity).await;

        assert!(!pipeline.static_hierarchy_allowlist().component_was_added(&moniker));
    }

    /// Dynamic selectors must restrict which components are returned.
    #[fuchsia::test]
    fn data_repo_filters_inspect_by_selectors() {
        let _exec = fuchsia_async::LocalExecutor::new();
        let data_repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
        let moniker = ExtendedMoniker::parse_str("./a/b/foo").unwrap();
        let identity = Arc::new(ComponentIdentity::new(moniker, TEST_URL));

        let (proxy, _server_end) = fidl::endpoints::create_proxy::<finspect::TreeMarker>();
        data_repo
            .add_inspect_handle(Arc::clone(&identity), InspectHandle::tree(proxy, Some("test")));

        let moniker2 = ExtendedMoniker::parse_str("./a/b/foo2").unwrap();
        let identity2 = Arc::new(ComponentIdentity::new(moniker2, TEST_URL));
        let (proxy, _server_end) = fidl::endpoints::create_proxy::<finspect::TreeMarker>();
        data_repo
            .add_inspect_handle(Arc::clone(&identity2), InspectHandle::tree(proxy, Some("test")));

        assert_eq!(
            2,
            data_repo
                .inner
                .read()
                .fetch_inspect_data(&None, StaticHierarchyAllowlist::new_disabled())
                .len()
        );

        let selectors = Some(vec![
            selectors::parse_selector::<FastError>("a/b/foo:root").expect("parse selector")
        ]);
        assert_eq!(
            1,
            data_repo
                .inner
                .read()
                .fetch_inspect_data(&selectors, StaticHierarchyAllowlist::new_disabled())
                .len()
        );

        let selectors = Some(vec![
            selectors::parse_selector::<FastError>("a/b/f*:root").expect("parse selector")
        ]);
        assert_eq!(
            2,
            data_repo
                .inner
                .read()
                .fetch_inspect_data(&selectors, StaticHierarchyAllowlist::new_disabled())
                .len()
        );

        let selectors =
            Some(vec![selectors::parse_selector::<FastError>("foo:root").expect("parse selector")]);
        assert_eq!(
            0,
            data_repo
                .inner
                .read()
                .fetch_inspect_data(&selectors, StaticHierarchyAllowlist::new_disabled())
                .len()
        );
    }

    /// Dropping the peer of the escrow token must eventually remove the escrowed data.
    #[fuchsia::test]
    async fn repo_removes_escrowed_data_on_token_peer_closed() {
        let repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
        let moniker = ExtendedMoniker::parse_str("a/b/foo").unwrap();
        let identity = Arc::new(ComponentIdentity::new(moniker, TEST_URL));

        let inspector = Inspector::default();
        let (ep0, ep1) = zx::EventPair::create();
        repo.escrow_handle(
            Arc::clone(&identity),
            inspector.duplicate_vmo_with_rights(*ESCROW_TEST_RIGHTS).unwrap(),
            finspect::EscrowToken { token: ep1 },
            Some("escrow"),
            None,
        );
        {
            let guard = repo.inner.read();
            let container =
                guard.get(&identity).expect("component is tracked after escrowing a handle");
            let mut handles = container.handles();
            assert_eq!(handles.len(), 1);
            assert_matches!(handles.next().unwrap(), InspectHandle::Escrow { .. });
        }
        drop(ep0);
        repo.wait_until_gone(&identity).await;
    }

    /// Escrowing with an associated tree koid must replace the tree handle and close its channel.
    #[fuchsia::test]
    async fn repo_overwrites_tree_on_escrow() {
        let repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
        let moniker = ExtendedMoniker::parse_str("a/b/foo").unwrap();
        let identity = Arc::new(ComponentIdentity::new(moniker, TEST_URL));
        let (proxy, server_end) = fidl::endpoints::create_proxy::<finspect::TreeMarker>();
        let koid = proxy.as_channel().as_handle_ref().get_koid().unwrap();

        repo.add_inspect_handle(Arc::clone(&identity), InspectHandle::tree(proxy, Some("tree")));
        {
            let guard = repo.inner.read();
            let mut handles = guard.get(&identity).unwrap().handles();
            assert_eq!(handles.len(), 1);
            assert_matches!(handles.next().unwrap(), InspectHandle::Tree { .. });
        }

        let inspector = Inspector::default();
        let (_ep0, ep1) = zx::EventPair::create();
        repo.escrow_handle(
            Arc::clone(&identity),
            inspector.duplicate_vmo_with_rights(*ESCROW_TEST_RIGHTS).unwrap(),
            finspect::EscrowToken { token: ep1 },
            Some("escrow"),
            Some(koid),
        );
        {
            let guard = repo.inner.read();
            let mut handles = guard.get(&identity).unwrap().handles();
            assert_eq!(handles.len(), 1);
            assert_matches!(handles.next().unwrap(), InspectHandle::Escrow { .. });
        }
        fasync::Channel::from_channel(server_end.into_channel()).on_closed().await.unwrap();
    }

    /// Fetching escrowed data without a tree returns the VMO and leaves an empty container.
    #[fuchsia::test]
    fn repo_fetch_escrow_removes_data() {
        let _exec = fuchsia_async::LocalExecutor::new();
        let repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
        let moniker = ExtendedMoniker::parse_str("a/b/foo").unwrap();
        let identity = Arc::new(ComponentIdentity::new(moniker, TEST_URL));

        let inspector = Inspector::default();
        inspector.root().record_int("foo", 3);
        let (ep0, ep1) = zx::EventPair::create();
        repo.escrow_handle(
            Arc::clone(&identity),
            inspector.duplicate_vmo_with_rights(*ESCROW_TEST_RIGHTS).unwrap(),
            finspect::EscrowToken { token: ep1 },
            Some("escrow"),
            None,
        );
        {
            let guard = repo.inner.read();
            let container =
                guard.get(&identity).expect("component is tracked after escrowing a handle");
            assert_eq!(container.handles().len(), 1);
        }

        let vmo = repo
            .fetch_escrow(Arc::clone(&identity), finspect::EscrowToken { token: ep0 }, None)
            .expect("escrowed vmo is returned");
        assert_eq!(vmo.get_name().unwrap(), INSPECT_ESCROW_NAME);
        let inspector_loaded = Inspector::new(InspectorConfig::default().vmo(vmo));
        assert_data_tree!(inspector_loaded, root: {
            foo: 3i64,
        });

        let guard = repo.inner.read();
        let container =
            guard.get(&identity).expect("container is kept after fetching the escrowed data");
        assert_eq!(container.handles().len(), 0);
    }

    /// Fetching escrowed data with a tree returns the VMO and tracks the tree in its place.
    #[fuchsia::test]
    fn repo_fetch_escrow_with_tree_returns_data_keeps_tree() {
        let _exec = fuchsia_async::LocalExecutor::new();
        let repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
        let moniker = ExtendedMoniker::parse_str("a/b/foo").unwrap();
        let identity = Arc::new(ComponentIdentity::new(moniker, TEST_URL));

        let inspector = Inspector::default();
        inspector.root().record_int("foo", 3);
        let (ep0, ep1) = zx::EventPair::create();
        repo.escrow_handle(
            Arc::clone(&identity),
            inspector.duplicate_vmo_with_rights(*ESCROW_TEST_RIGHTS).unwrap(),
            finspect::EscrowToken { token: ep1 },
            Some("escrow"),
            None,
        );
        {
            let guard = repo.inner.read();
            let container =
                guard.get(&identity).expect("component is tracked after escrowing a handle");
            let mut handles = container.handles();
            assert_eq!(handles.len(), 1);
            assert_matches!(handles.next().unwrap(), InspectHandle::Escrow { .. });
        }

        let (client_end, server_end) = fidl::endpoints::create_endpoints::<finspect::TreeMarker>();
        let vmo = repo.fetch_escrow(
            Arc::clone(&identity),
            finspect::EscrowToken { token: ep0 },
            Some(client_end),
        );
        assert!(vmo.is_some());
        {
            let guard = repo.inner.read();
            let mut handles = guard.get(&identity).unwrap().handles();
            assert_eq!(handles.len(), 1);
            assert_matches!(handles.next().unwrap(), InspectHandle::Tree { .. });
        }
        assert!(!fasync::Channel::from_channel(server_end.into_channel()).is_closed());
    }
}