archivist_lib/inspect/servers/
inspect_sink.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::inspect::container::InspectHandle;
9use crate::inspect::repository::InspectRepository;
10use anyhow::Error;
11use fidl::endpoints::{ControlHandle, Responder};
12use futures::StreamExt;
13use log::{debug, warn};
14use std::sync::Arc;
15use {fidl_fuchsia_inspect as finspect, fuchsia_async as fasync};
16
17pub struct InspectSinkServer {
18    /// Shared repository holding the Inspect handles.
19    repo: Arc<InspectRepository>,
20
21    /// Scope holding all tasks associated with this server.
22    scope: fasync::Scope,
23}
24
25impl InspectSinkServer {
26    /// Construct a server.
27    pub fn new(repo: Arc<InspectRepository>, scope: fasync::Scope) -> Self {
28        Self { repo, scope }
29    }
30
31    /// Handle incoming events. Mainly for use in EventConsumer impl.
32    fn spawn(&self, component: Arc<ComponentIdentity>, stream: finspect::InspectSinkRequestStream) {
33        let repo = Arc::clone(&self.repo);
34        self.scope.spawn(async move {
35            if let Err(e) = Self::handle_requests(repo, component, stream).await {
36                warn!("error handling InspectSink requests: {e}");
37            }
38        });
39    }
40
41    async fn handle_requests(
42        repo: Arc<InspectRepository>,
43        component: Arc<ComponentIdentity>,
44        mut stream: finspect::InspectSinkRequestStream,
45    ) -> Result<(), Error> {
46        while let Some(Ok(request)) = stream.next().await {
47            match request {
48                finspect::InspectSinkRequest::Publish {
49                    payload: finspect::InspectSinkPublishRequest { tree: Some(tree), name, .. },
50                    ..
51                } => repo.add_inspect_handle(
52                    Arc::clone(&component),
53                    InspectHandle::tree(tree.into_proxy(), name),
54                ),
55                finspect::InspectSinkRequest::Publish {
56                    payload: finspect::InspectSinkPublishRequest { tree: None, name, .. },
57                    control_handle,
58                } => {
59                    warn!(name:?, component:%; "InspectSink/Publish without a tree");
60                    control_handle.shutdown();
61                }
62                finspect::InspectSinkRequest::Escrow {
63                    payload:
64                        finspect::InspectSinkEscrowRequest {
65                            vmo: Some(vmo),
66                            name,
67                            token: Some(token),
68                            tree,
69                            ..
70                        },
71                    ..
72                } => {
73                    repo.escrow_handle(
74                        Arc::clone(&component),
75                        vmo,
76                        token,
77                        name,
78                        tree.map(zx::Koid::from_raw),
79                    );
80                }
81                finspect::InspectSinkRequest::Escrow {
82                    control_handle,
83                    payload: finspect::InspectSinkEscrowRequest { vmo, token, .. },
84                } => {
85                    warn!(
86                        component:%,
87                        has_vmo = vmo.is_some(),
88                        has_token = token.is_some();
89                        "Attempted to escrow inspect without required data"
90                    );
91                    control_handle.shutdown();
92                }
93                finspect::InspectSinkRequest::FetchEscrow {
94                    responder,
95                    payload:
96                        finspect::InspectSinkFetchEscrowRequest { tree, token: Some(token), .. },
97                } => {
98                    let vmo = repo.fetch_escrow(Arc::clone(&component), token, tree);
99                    if let Err(err) = responder.send(finspect::InspectSinkFetchEscrowResponse {
100                        vmo,
101                        ..Default::default()
102                    }) {
103                        debug!("Failed sending FetchEscrowResponse: {err:?}");
104                    }
105                }
106                finspect::InspectSinkRequest::FetchEscrow {
107                    responder,
108                    payload: finspect::InspectSinkFetchEscrowRequest { token: None, .. },
109                } => {
110                    warn!(component:%; "Attempted to fetch escrowed inspect with invalid data");
111                    responder.control_handle().shutdown();
112                }
113                finspect::InspectSinkRequest::_UnknownMethod {
114                    ordinal,
115                    control_handle,
116                    method_type,
117                    ..
118                } => {
119                    warn!(ordinal, method_type:?; "Received unknown request for InspectSink");
120                    // Close the connection if we receive an unknown interaction.
121                    control_handle.shutdown();
122                }
123            }
124        }
125
126        Ok(())
127    }
128}
129
130impl EventConsumer for InspectSinkServer {
131    fn handle(self: Arc<Self>, event: Event) {
132        match event.payload {
133            EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
134                component,
135                request_stream,
136            }) => {
137                self.spawn(component, request_stream);
138            }
139            _ => unreachable!("InspectSinkServer is only subscribed to InspectSinkRequested"),
140        }
141    }
142}
143
144#[cfg(test)]
145mod tests {
146    use super::*;
147    use crate::events::router::EventConsumer;
148    use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
149    use crate::identity::ComponentIdentity;
150    use crate::inspect::container::InspectHandle;
151    use crate::inspect::repository::InspectRepository;
152    use crate::pipeline::StaticHierarchyAllowlist;
153    use assert_matches::assert_matches;
154    use diagnostics_assertions::assert_json_diff;
155    use fidl::endpoints::{ClientEnd, create_proxy_and_stream};
156    use fidl_fuchsia_diagnostics as fdiagnostics;
157    use fidl_fuchsia_inspect::{
158        InspectSinkMarker, InspectSinkProxy, InspectSinkPublishRequest, TreeMarker,
159    };
160    use fuchsia_inspect::reader::read;
161    use fuchsia_inspect::{Inspector, InspectorConfig};
162    use futures::Future;
163    use inspect_runtime::TreeServerSendPreference;
164    use inspect_runtime::service::spawn_tree_server_with_stream;
165    use selectors::VerboseError;
166    use std::collections::HashMap;
167    use std::sync::Arc;
168    use zx::{self as zx, AsHandleRef};
169
170    struct TestHarness {
171        /// The underlying repository.
172        repo: Arc<InspectRepository>,
173
174        /// Component-specific state.
175        components: HashMap<Arc<ComponentIdentity>, TestComponent>,
176
177        /// The server that would be held by the Archivist.
178        _server: Arc<InspectSinkServer>,
179
180        /// Scope running InspectSinkServer.
181        scope: Option<fasync::Scope>,
182    }
183
184    struct TestComponent {
185        proxy: Option<InspectSinkProxy>,
186        /// Scope running Tree server(s).
187        scope: Option<fasync::Scope>,
188    }
189
190    impl TestHarness {
191        /// Construct an InspectSinkServer with a ComponentIdentity/InspectSinkProxy pair
192        /// for each input ComponentIdentity.
193        fn new(identity: Vec<Arc<ComponentIdentity>>) -> Self {
194            let repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
195            let scope = fasync::Scope::new();
196            let server = Arc::new(InspectSinkServer::new(Arc::clone(&repo), scope.new_child()));
197
198            let components = identity
199                .into_iter()
200                .map(|id| {
201                    let (proxy, request_stream) = create_proxy_and_stream::<InspectSinkMarker>();
202
203                    Arc::clone(&server).handle(Event {
204                        timestamp: zx::BootInstant::get(),
205                        payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
206                            component: Arc::clone(&id),
207                            request_stream,
208                        }),
209                    });
210
211                    (id, TestComponent { proxy: Some(proxy), scope: Some(fasync::Scope::new()) })
212                })
213                .collect();
214
215            Self { repo, _server: server, scope: Some(scope), components }
216        }
217
218        /// Publish `tree` via the proxy associated with `component`.
219        fn publish(
220            &mut self,
221            id: &Arc<ComponentIdentity>,
222            tree: ClientEnd<TreeMarker>,
223        ) -> zx::Koid {
224            let koid = tree.as_handle_ref().get_koid().unwrap();
225            let component = self.components.get(id).expect("unknown component");
226            let proxy = component.proxy.as_ref().expect("InspectSink proxy stopped");
227            proxy
228                .publish(InspectSinkPublishRequest { tree: Some(tree), ..Default::default() })
229                .unwrap();
230            koid
231        }
232
233        /// Start a TreeProxy server and return the proxy.
234        fn serve(
235            &mut self,
236            component: &Arc<ComponentIdentity>,
237            inspector: Inspector,
238            settings: TreeServerSendPreference,
239        ) -> ClientEnd<TreeMarker> {
240            let (tree, stream) = fidl::endpoints::create_request_stream::<TreeMarker>();
241            self.serve_stream(component, inspector, settings, stream);
242            tree
243        }
244
245        /// Start a TreeProxy server over the provided ServerEnd.
246        fn serve_stream(
247            &mut self,
248            component: &Arc<ComponentIdentity>,
249            inspector: Inspector,
250            settings: TreeServerSendPreference,
251            stream: finspect::TreeRequestStream,
252        ) {
253            let component = self.components.get_mut(component).expect("unknown component");
254            let scope = component.scope.as_ref().expect("already dropped tree server");
255            spawn_tree_server_with_stream(inspector, settings, stream, scope);
256        }
257
258        /// Drop the server(s) associated with `component`, as initialized by `serve`.
259        async fn drop_tree_servers(&mut self, component: &Arc<ComponentIdentity>) {
260            let component = self.components.get_mut(component).expect("unknown component");
261            let scope = component.scope.take().expect("tree server(s) already dropped");
262            scope.cancel().await;
263        }
264
265        /// Execute closure `assertions` on the `InspectArtifactsContainer` associated with
266        /// `identity`.
267        ///
268        /// This function will wait for data to be available in `self.repo`, and therefore
269        /// might hang indefinitely if the data never appears. This is not a problem since
270        /// it is a unit test and `fx test` has timeouts available.
271        async fn assert<const N: usize, F, Fut>(
272            &self,
273            identity: &Arc<ComponentIdentity>,
274            koids: [zx::Koid; N],
275            assertions: F,
276        ) where
277            F: FnOnce([Arc<InspectHandle>; N]) -> Fut,
278            Fut: Future<Output = ()>,
279        {
280            self.assert_on_selector(
281                identity,
282                koids,
283                assertions,
284                selectors::parse_selector::<VerboseError>(&format!("{identity}:root"))
285                    .expect("parse selector"),
286            )
287            .await
288        }
289
290        async fn assert_on_selector<const N: usize, F, Fut>(
291            &self,
292            identity: &Arc<ComponentIdentity>,
293            koids: [zx::Koid; N],
294            assertions: F,
295            selector: fdiagnostics::Selector,
296        ) where
297            F: FnOnce([Arc<InspectHandle>; N]) -> Fut,
298            Fut: Future<Output = ()>,
299        {
300            self.repo.wait_for_artifact(identity).await;
301            let containers = self.repo.fetch_inspect_data(
302                &Some(vec![selector]),
303                StaticHierarchyAllowlist::new_disabled(),
304            );
305            assert_eq!(containers.len(), 1);
306            assertions(
307                koids
308                    .iter()
309                    .map(|koid| {
310                        containers[0]
311                            .inspect_handles
312                            .iter()
313                            .filter_map(|h| h.upgrade())
314                            .find(|handle| handle.koid() == *koid)
315                            .unwrap()
316                    })
317                    .collect::<Vec<_>>()
318                    .try_into()
319                    .unwrap(),
320            )
321            .await;
322        }
323
324        /// Drops all published proxies, stops the InspectSink server, and waits for it to complete.
325        async fn stop_all(&mut self) {
326            for (_, component) in self.components.iter_mut() {
327                component.proxy = None;
328            }
329            self.scope.take().unwrap().close().await;
330        }
331    }
332
333    #[fuchsia::test]
334    async fn connect() {
335        let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
336
337        let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
338
339        let insp = Inspector::default();
340        insp.root().record_int("int", 0);
341        let tree = test.serve(&identity, insp, TreeServerSendPreference::default());
342        let koid = test.publish(&identity, tree);
343
344        test.assert(&identity, [koid], |handles| async move {
345            assert_matches!(
346                handles[0].as_ref(),
347                InspectHandle::Tree { proxy: tree, .. } => {
348                   let hierarchy = read(tree).await.unwrap();
349                   assert_json_diff!(hierarchy, root: {
350                       int: 0i64,
351                   });
352            });
353        })
354        .await;
355    }
356
357    #[fuchsia::test]
358    async fn publish_multiple_times_on_the_same_connection() {
359        let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
360
361        let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
362
363        let insp = Inspector::default();
364        insp.root().record_int("int", 0);
365        let tree = test.serve(&identity, insp, TreeServerSendPreference::default());
366
367        let other_insp = Inspector::default();
368        other_insp.root().record_double("double", 1.24);
369        let other_tree = test.serve(&identity, other_insp, TreeServerSendPreference::default());
370
371        let koid0 = test.publish(&identity, tree);
372        let koid1 = test.publish(&identity, other_tree);
373
374        test.assert(&identity, [koid0, koid1], |handles| async move {
375            assert_matches!(
376                handles[0].as_ref(),
377                InspectHandle::Tree { proxy: tree, ..} => {
378                   let hierarchy = read(tree).await.unwrap();
379                   assert_json_diff!(hierarchy, root: {
380                       int: 0i64,
381                   });
382            });
383
384            assert_matches!(
385                handles[1].as_ref(),
386                InspectHandle::Tree { proxy: tree, .. } => {
387                   let hierarchy = read(tree).await.unwrap();
388                   assert_json_diff!(hierarchy, root: {
389                       double: 1.24,
390                   });
391            });
392        })
393        .await;
394    }
395
396    #[fuchsia::test]
397    async fn tree_remains_after_inspect_sink_disconnects() {
398        let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
399
400        let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
401
402        let insp = Inspector::default();
403        insp.root().record_int("int", 0);
404        let tree = test.serve(&identity, insp, TreeServerSendPreference::default());
405        let koid = test.publish(&identity, tree);
406
407        test.assert(&identity, [koid], |handles| async move {
408            assert_matches!(
409                handles[0].as_ref(),
410                InspectHandle::Tree { proxy: tree, .. } => {
411                   let hierarchy = read(tree).await.unwrap();
412                   assert_json_diff!(hierarchy, root: {
413                       int: 0i64,
414                   });
415            });
416        })
417        .await;
418
419        test.stop_all().await;
420
421        // the data must remain present as long as the tree server started above is alive
422        test.assert(&identity, [koid], |handles| async move {
423            assert_matches!(
424                handles[0].as_ref(),
425                InspectHandle::Tree { proxy: tree, ..} => {
426                   let hierarchy = read(tree).await.unwrap();
427                   assert_json_diff!(hierarchy, root: {
428                       int: 0i64,
429                   });
430            });
431        })
432        .await;
433    }
434
435    #[fuchsia::test]
436    async fn connect_with_multiple_proxies() {
437        let identities: Vec<Arc<ComponentIdentity>> = vec![
438            Arc::new(vec!["a", "b", "foo.cm"].into()),
439            Arc::new(vec!["a", "b", "foo2.cm"].into()),
440        ];
441
442        let mut test = TestHarness::new(identities.clone());
443
444        let insp = Inspector::default();
445        insp.root().record_int("int", 0);
446        let tree = test.serve(&identities[0], insp, TreeServerSendPreference::default());
447
448        let insp2 = Inspector::default();
449        insp2.root().record_bool("is_insp2", true);
450        let tree2 = test.serve(&identities[1], insp2, TreeServerSendPreference::default());
451
452        let koid_component_0 = test.publish(&identities[0], tree);
453        let koid_component_1 = test.publish(&identities[1], tree2);
454
455        test.assert(&identities[0], [koid_component_0], |handles| async move {
456            assert_matches!(
457                handles[0].as_ref(),
458                InspectHandle::Tree { proxy: tree, .. } => {
459                   let hierarchy = read(tree).await.unwrap();
460                   assert_json_diff!(hierarchy, root: {
461                       int: 0i64,
462                   });
463            });
464        })
465        .await;
466
467        test.assert(&identities[1], [koid_component_1], |handles| async move {
468            assert_matches!(
469                handles[0].as_ref(),
470                InspectHandle::Tree { proxy: tree, .. } => {
471                   let hierarchy = read(tree).await.unwrap();
472                   assert_json_diff!(hierarchy, root: {
473                       is_insp2: true,
474                   });
475            });
476        })
477        .await;
478    }
479
480    #[fuchsia::test]
481    async fn dropping_tree_removes_component_identity_from_repo() {
482        let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
483
484        let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
485
486        let tree = test.serve(&identity, Inspector::default(), TreeServerSendPreference::default());
487        let koid = test.publish(&identity, tree);
488
489        test.stop_all().await;
490
491        // this executing to completion means the identity was present
492        test.assert(&identity, [koid], |handles: [_; 1]| {
493            assert_eq!(handles.len(), 1);
494            async {}
495        })
496        .await;
497
498        test.drop_tree_servers(&identity).await;
499
500        // this executing to completion means the identity is not there anymore; we know
501        // it previously was present
502        test.repo.wait_until_gone(&identity).await;
503    }
504
505    #[fuchsia::test]
506    async fn fetch_escrow() {
507        let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
508        let test = TestHarness::new(vec![Arc::clone(&identity)]);
509        let proxy = test.components.get(&identity).unwrap().proxy.as_ref().unwrap();
510
511        // Escrow a VMO.
512        let vmo = {
513            let inspector = Inspector::default();
514            inspector.root().record_int("val", 123);
515            inspector.frozen_vmo_copy().unwrap()
516        };
517        let (token0, token1) = zx::EventPair::create();
518
519        proxy
520            .escrow(finspect::InspectSinkEscrowRequest {
521                vmo: Some(vmo),
522                token: Some(finspect::EscrowToken { token: token0 }),
523                ..Default::default()
524            })
525            .unwrap();
526
527        // Fetch the escrowed VMO.
528        let vmo = assert_matches!(
529            proxy
530                .fetch_escrow(finspect::InspectSinkFetchEscrowRequest {
531                    token: Some(finspect::EscrowToken { token: token1 }),
532                    ..Default::default()
533                })
534                .await,
535            Ok(finspect::InspectSinkFetchEscrowResponse { vmo: Some(vmo), .. }) => vmo
536        );
537
538        // Verify the fetched VMO.
539        let inspector = Inspector::new(InspectorConfig::default().vmo(vmo));
540        let hierarchy = read(&inspector).await.unwrap();
541        assert_json_diff!(hierarchy, root: {
542            val: 123i64,
543        });
544    }
545
546    #[fuchsia::test]
547    async fn fetch_escrow_with_tree() {
548        let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
549        let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
550
551        // Escrow a VMO.
552        let vmo = {
553            let inspector = Inspector::default();
554            inspector.root().record_int("val", 123);
555            inspector.frozen_vmo_copy().unwrap()
556        };
557        let (token0, token1) = zx::EventPair::create();
558
559        let proxy = test.components.get(&identity).unwrap().proxy.as_ref().unwrap();
560        proxy
561            .escrow(finspect::InspectSinkEscrowRequest {
562                vmo: Some(vmo),
563                token: Some(finspect::EscrowToken { token: token0 }),
564                ..Default::default()
565            })
566            .unwrap();
567
568        // Fetch the escrowed VMO, provide tree to publish with the same name.
569        let (tree, stream) = fidl::endpoints::create_request_stream::<TreeMarker>();
570        let koid = tree.as_handle_ref().get_koid().unwrap();
571        let resp = proxy
572            .fetch_escrow(finspect::InspectSinkFetchEscrowRequest {
573                token: Some(finspect::EscrowToken { token: token1 }),
574                tree: Some(tree),
575                ..Default::default()
576            })
577            .await;
578
579        let vmo = assert_matches!(
580            resp, Ok(finspect::InspectSinkFetchEscrowResponse { vmo: Some(vmo), .. }) => vmo);
581
582        // Verify the fetched VMO.
583        let inspector = Inspector::new(InspectorConfig::default().vmo(vmo));
584        let hierarchy = read(&inspector).await.unwrap();
585        assert_json_diff!(hierarchy, root: {
586            val: 123i64,
587        });
588
589        // Serve the escrowed VMO.
590        test.serve_stream(&identity, inspector, TreeServerSendPreference::default(), stream);
591
592        // Verify the published tree serves the escrowed content.
593        test.assert(&identity, [koid], |handles| async move {
594            let tree = assert_matches!(
595                handles[0].as_ref(),
596                InspectHandle::Tree { proxy, .. } => proxy
597            );
598            let hierarchy = read(tree).await.unwrap();
599            assert_json_diff!(hierarchy, root: {
600                val: 123i64,
601            });
602        })
603        .await;
604    }
605
606    #[fuchsia::test]
607    async fn fetch_escrow_with_tree_and_name() {
608        const VMO_NAME: &str = "FetchEscrowWithTreeVmo";
609
610        let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
611        let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
612
613        // Escrow a VMO.
614        let vmo = {
615            let inspector = Inspector::default();
616            inspector.root().record_int("val", 123);
617            inspector.frozen_vmo_copy().unwrap()
618        };
619        let (token0, token1) = zx::EventPair::create();
620
621        let proxy = test.components.get(&identity).unwrap().proxy.as_ref().unwrap();
622        proxy
623            .escrow(finspect::InspectSinkEscrowRequest {
624                vmo: Some(vmo),
625                token: Some(finspect::EscrowToken { token: token0 }),
626                name: Some(VMO_NAME.to_string()),
627                ..Default::default()
628            })
629            .unwrap();
630
631        // Fetch the escrowed VMO, provide tree to publish with the same name.
632        let (tree, stream) = fidl::endpoints::create_request_stream::<TreeMarker>();
633        let koid = tree.as_handle_ref().get_koid().unwrap();
634        let resp = proxy
635            .fetch_escrow(finspect::InspectSinkFetchEscrowRequest {
636                token: Some(finspect::EscrowToken { token: token1 }),
637                tree: Some(tree),
638                ..Default::default()
639            })
640            .await;
641
642        let vmo = assert_matches!(
643            resp, Ok(finspect::InspectSinkFetchEscrowResponse { vmo: Some(vmo), .. }) => vmo);
644
645        // Verify the fetched VMO.
646        let inspector = Inspector::new(InspectorConfig::default().vmo(vmo));
647        let hierarchy = read(&inspector).await.unwrap();
648        assert_json_diff!(hierarchy, root: {
649            val: 123i64,
650        });
651
652        // Serve the escrowed VMO.
653        test.serve_stream(&identity, inspector, TreeServerSendPreference::default(), stream);
654
655        // Verify the published tree serves the escrowed content.
656        test.assert_on_selector(
657            &identity,
658            [koid],
659            |handles| async move {
660                let tree = assert_matches!(
661                    handles[0].as_ref(),
662                    InspectHandle::Tree { proxy, .. } => proxy
663                );
664                let hierarchy = read(tree).await.unwrap();
665                assert_json_diff!(hierarchy, root: {
666                    val: 123i64,
667                });
668            },
669            selectors::parse_selector::<VerboseError>(&format!("{identity}:[name={VMO_NAME}]root"))
670                .expect("parse selector"),
671        )
672        .await
673    }
674}