archivist_lib/inspect/servers/inspect_sink.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::events::router::EventConsumer;
use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
use crate::identity::ComponentIdentity;
use crate::inspect::container::InspectHandle;
use crate::inspect::repository::InspectRepository;
use anyhow::Error;
use fidl::endpoints::{ControlHandle, Responder};
use futures::StreamExt;
use log::warn;
use std::sync::Arc;
use {fidl_fuchsia_inspect as finspect, fuchsia_async as fasync};

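/// Serves the `fuchsia.inspect.InspectSink` protocol: components publish or
/// escrow their Inspect data through it, and the server records the resulting
/// handles in the shared [`InspectRepository`].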
pub struct InspectSinkServer {
    /// Shared repository holding the Inspect handles.
    repo: Arc<InspectRepository>,

    /// Scope holding all tasks associated with this server.
    scope: fasync::Scope,
}

impl InspectSinkServer {
    /// Construct a server.
    pub fn new(repo: Arc<InspectRepository>, scope: fasync::Scope) -> Self {
        Self { repo, scope }
    }

    /// Spawn a task on this server's scope that serves the given `InspectSink`
    /// request stream on behalf of `component`. Used by the `EventConsumer` impl below.
    fn spawn(&self, component: Arc<ComponentIdentity>, stream: finspect::InspectSinkRequestStream) {
        let repo = Arc::clone(&self.repo);
        self.scope.spawn(async move {
            if let Err(e) = Self::handle_requests(repo, component, stream).await {
                warn!("error handling InspectSink requests: {e}");
            }
        });
    }

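    /// Serve a single `InspectSink` connection: valid `Publish` and `Escrow`
    /// requests hand the component's Inspect data to the repository, while
    /// malformed requests shut down the channel.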
    async fn handle_requests(
        repo: Arc<InspectRepository>,
        component: Arc<ComponentIdentity>,
        mut stream: finspect::InspectSinkRequestStream,
    ) -> Result<(), Error> {
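        // The loop ends when the client closes the channel or the stream yields
        // a FIDL error.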
        while let Some(Ok(request)) = stream.next().await {
            match request {
                finspect::InspectSinkRequest::Publish {
                    payload: finspect::InspectSinkPublishRequest { tree: Some(tree), name, .. },
                    ..
                } => repo.add_inspect_handle(
                    Arc::clone(&component),
                    InspectHandle::tree(tree.into_proxy(), name),
                ),
                finspect::InspectSinkRequest::Publish {
                    payload: finspect::InspectSinkPublishRequest { tree: None, name, .. },
                    control_handle,
                } => {
                    warn!(name:?, component:%; "InspectSink/Publish without a tree");
                    control_handle.shutdown();
                }
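                // A valid Escrow request hands the repository a VMO and a token,
                // plus, optionally, the raw koid of an associated tree.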
                finspect::InspectSinkRequest::Escrow {
                    payload:
                        finspect::InspectSinkEscrowRequest {
                            vmo: Some(vmo),
                            name,
                            token: Some(token),
                            tree,
                            ..
                        },
                    ..
                } => {
                    repo.escrow_handle(
                        Arc::clone(&component),
                        vmo,
                        token,
                        name,
                        tree.map(zx::Koid::from_raw),
                    );
                }
                finspect::InspectSinkRequest::Escrow {
                    control_handle,
                    payload: finspect::InspectSinkEscrowRequest { vmo, token, .. },
                } => {
                    warn!(
                        component:%,
                        has_vmo = vmo.is_some(),
                        has_token = token.is_some();
                        "Attempted to escrow inspect without required data"
                    );
                    control_handle.shutdown();
                }
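                // FetchEscrow returns the escrowed VMO matching `token` to the
                // caller; the optional `tree` handle is forwarded to the repository.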
                finspect::InspectSinkRequest::FetchEscrow {
                    responder,
                    payload:
                        finspect::InspectSinkFetchEscrowRequest { tree, token: Some(token), .. },
                } => {
                    let vmo = repo.fetch_escrow(Arc::clone(&component), token, tree);
                    let _ = responder.send(finspect::InspectSinkFetchEscrowResponse {
                        vmo,
                        ..Default::default()
                    });
                }
                finspect::InspectSinkRequest::FetchEscrow {
                    responder,
                    payload: finspect::InspectSinkFetchEscrowRequest { token: None, .. },
                } => {
                    warn!(component:%; "Attempted to fetch escrowed inspect with invalid data");
                    responder.control_handle().shutdown();
                }
                finspect::InspectSinkRequest::_UnknownMethod {
                    ordinal,
                    control_handle,
                    method_type,
                    ..
                } => {
                    warn!(ordinal, method_type:?; "Received unknown request for InspectSink");
                    // Close the connection if we receive an unknown interaction.
                    control_handle.shutdown();
                }
            }
        }

        Ok(())
    }
}

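/// The event router delivers `InspectSinkRequested` events to this server; each
/// event carries the identity of the connecting component together with its
/// incoming request stream.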
impl EventConsumer for InspectSinkServer {
    fn handle(self: Arc<Self>, event: Event) {
        match event.payload {
            EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
                component,
                request_stream,
            }) => {
                self.spawn(component, request_stream);
            }
            _ => unreachable!("InspectSinkServer is only subscribed to InspectSinkRequested"),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::router::EventConsumer;
    use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
    use crate::identity::ComponentIdentity;
    use crate::inspect::container::InspectHandle;
    use crate::inspect::repository::InspectRepository;
    use crate::pipeline::StaticHierarchyAllowlist;
    use assert_matches::assert_matches;
    use diagnostics_assertions::assert_json_diff;
    use fidl::endpoints::{create_proxy_and_stream, ClientEnd};
    use fidl_fuchsia_inspect::{
        InspectSinkMarker, InspectSinkProxy, InspectSinkPublishRequest, TreeMarker,
    };
    use fuchsia_inspect::reader::read;
    use fuchsia_inspect::Inspector;
    use futures::Future;
    use inspect_runtime::service::spawn_tree_server;
    use inspect_runtime::TreeServerSendPreference;
    use selectors::VerboseError;
    use std::sync::Arc;
    use zx::{self as zx, AsHandleRef};

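    /// Test fixture that wires fake component identities to an `InspectSinkServer`,
    /// publishes `Tree` handles on their behalf, and asserts on the resulting
    /// contents of the `InspectRepository`.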
    struct TestHarness {
        /// Associates a faux component via ComponentIdentity with an InspectSinkProxy
        proxy_pairs: Vec<(Arc<ComponentIdentity>, Option<InspectSinkProxy>)>,

        /// The underlying repository.
        repo: Arc<InspectRepository>,

        /// The server that would be held by the Archivist.
        _server: Arc<InspectSinkServer>,

        /// The koids of the published TreeProxies in the order they were published.
        koids: Vec<zx::Koid>,

        /// The servers for each component's Tree protocol
        tree_pairs: Vec<(Arc<ComponentIdentity>, Option<fasync::Scope>)>,

        scope: Option<fasync::Scope>,
    }

    impl TestHarness {
        /// Construct an InspectSinkServer with a ComponentIdentity/InspectSinkProxy pair
        /// for each input ComponentIdentity.
        fn new(identity: Vec<Arc<ComponentIdentity>>) -> Self {
            let mut proxy_pairs = vec![];
            let repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
            let scope = fasync::Scope::new();
            let server = Arc::new(InspectSinkServer::new(Arc::clone(&repo), scope.new_child()));
            for id in identity.into_iter() {
                let (proxy, request_stream) = create_proxy_and_stream::<InspectSinkMarker>();

                Arc::clone(&server).handle(Event {
                    timestamp: zx::BootInstant::get(),
                    payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
                        component: Arc::clone(&id),
                        request_stream,
                    }),
                });

                proxy_pairs.push((id, Some(proxy)));
            }

            Self {
                proxy_pairs,
                repo,
                _server: server,
                koids: vec![],
                tree_pairs: vec![],
                scope: Some(scope),
            }
        }

        /// Publish `tree` via the proxy associated with `component`.
        fn publish(&mut self, component: &Arc<ComponentIdentity>, tree: ClientEnd<TreeMarker>) {
            for (id, proxy) in &self.proxy_pairs {
                if id != component {
                    continue;
                }

                if let Some(proxy) = &proxy {
                    self.koids.push(tree.as_handle_ref().get_koid().unwrap());
                    proxy
                        .publish(InspectSinkPublishRequest {
                            tree: Some(tree),
                            ..Default::default()
                        })
                        .unwrap();
                    return;
                } else {
                    panic!("cannot publish on stopped server/proxy pair");
                }
            }
        }

        /// Start a Tree server and return its client end.
        fn serve(
            &mut self,
            component: Arc<ComponentIdentity>,
            inspector: Inspector,
            settings: TreeServerSendPreference,
        ) -> ClientEnd<TreeMarker> {
            let child = fasync::Scope::new();
            let tree = spawn_tree_server(inspector, settings, &child);
            self.tree_pairs.push((component, Some(child)));
            tree
        }

        /// Drop the server(s) associated with `component`, as initialized by `serve`.
        async fn drop_tree_servers(&mut self, component: &Arc<ComponentIdentity>) {
            for (id, ref mut scope) in &mut self.tree_pairs {
                if id != component {
                    continue;
                }

                if scope.is_none() {
                    continue;
                }

                scope.take().unwrap().cancel().await;
            }
        }

        /// The published koids, with 0 referring to the first published tree.
        fn published_koids(&self) -> &[zx::Koid] {
            &self.koids
        }

        /// Execute closure `assertions` on the `InspectArtifactsContainer` associated with
        /// `identity`.
        ///
        /// This function will wait for data to be available in `self.repo`, and therefore
        /// might hang indefinitely if the data never appears. This is not a problem since
        /// it is a unit test and `fx test` has timeouts available.
        async fn assert<const N: usize, F, Fut>(
            &self,
            identity: &Arc<ComponentIdentity>,
            koids: [zx::Koid; N],
            assertions: F,
        ) where
            F: FnOnce([Arc<InspectHandle>; N]) -> Fut,
            Fut: Future<Output = ()>,
        {
            self.repo.wait_for_artifact(identity).await;
            let containers = self.repo.fetch_inspect_data(
                &Some(vec![selectors::parse_selector::<VerboseError>(&format!("{identity}:root"))
                    .expect("parse selector")]),
                StaticHierarchyAllowlist::new_disabled(),
            );
            assert_eq!(containers.len(), 1);
            assertions(
                koids
                    .iter()
                    .map(|koid| {
                        containers[0]
                            .inspect_handles
                            .iter()
                            .filter_map(|h| h.upgrade())
                            .find(|handle| handle.koid() == *koid)
                            .unwrap()
                    })
                    .collect::<Vec<_>>()
                    .try_into()
                    .unwrap(),
            )
            .await;
        }

        /// Drops all published proxies, stops the server, and waits for it to complete.
        async fn stop_all(&mut self) {
            for (_, ref mut proxy) in &mut self.proxy_pairs {
                proxy.take();
            }

            self.scope.take().unwrap().close().await;
        }

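        /// Wait until the repository no longer holds any data for `component`.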
        async fn wait_until_gone(&self, component: &Arc<ComponentIdentity>) {
            self.repo.wait_until_gone(component).await;
        }
    }

    #[fuchsia::test]
    async fn connect() {
        let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());

        let mut test = TestHarness::new(vec![Arc::clone(&identity)]);

        let insp = Inspector::default();
        insp.root().record_int("int", 0);
        let tree = test.serve(Arc::clone(&identity), insp, TreeServerSendPreference::default());
        test.publish(&identity, tree);

        let koid = test.published_koids()[0];

        test.assert(&identity, [koid], |handles| async move {
            assert_matches!(
                handles[0].as_ref(),
                InspectHandle::Tree { proxy: tree, .. } => {
                   let hierarchy = read(tree).await.unwrap();
                   assert_json_diff!(hierarchy, root: {
                       int: 0i64,
                   });
            });
        })
        .await;
    }

    #[fuchsia::test]
    async fn publish_multiple_times_on_the_same_connection() {
        let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());

        let mut test = TestHarness::new(vec![Arc::clone(&identity)]);

        let insp = Inspector::default();
        insp.root().record_int("int", 0);
        let tree = test.serve(Arc::clone(&identity), insp, TreeServerSendPreference::default());

        let other_insp = Inspector::default();
        other_insp.root().record_double("double", 1.24);
        let other_tree =
            test.serve(Arc::clone(&identity), other_insp, TreeServerSendPreference::default());

        test.publish(&identity, tree);
        test.publish(&identity, other_tree);

        let koid0 = test.published_koids()[0];
        let koid1 = test.published_koids()[1];

        test.assert(&identity, [koid0, koid1], |handles| async move {
            assert_matches!(
                handles[0].as_ref(),
                InspectHandle::Tree { proxy: tree, ..} => {
                   let hierarchy = read(tree).await.unwrap();
                   assert_json_diff!(hierarchy, root: {
                       int: 0i64,
                   });
            });

            assert_matches!(
                handles[1].as_ref(),
                InspectHandle::Tree { proxy: tree, .. } => {
                   let hierarchy = read(tree).await.unwrap();
                   assert_json_diff!(hierarchy, root: {
                       double: 1.24,
                   });
            });
        })
        .await;
    }

    #[fuchsia::test]
    async fn tree_remains_after_inspect_sink_disconnects() {
        let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());

        let mut test = TestHarness::new(vec![Arc::clone(&identity)]);

        let insp = Inspector::default();
        insp.root().record_int("int", 0);
        let tree = test.serve(Arc::clone(&identity), insp, TreeServerSendPreference::default());
        test.publish(&identity, tree);

        let koid = test.published_koids()[0];

        test.assert(&identity, [koid], |handles| async move {
            assert_matches!(
                handles[0].as_ref(),
                InspectHandle::Tree { proxy: tree, .. } => {
                   let hierarchy = read(tree).await.unwrap();
                   assert_json_diff!(hierarchy, root: {
                       int: 0i64,
                   });
            });
        })
        .await;

        test.stop_all().await;

        // the data must remain present as long as the tree server started above is alive
        test.assert(&identity, [koid], |handles| async move {
            assert_matches!(
                handles[0].as_ref(),
                InspectHandle::Tree { proxy: tree, ..} => {
                   let hierarchy = read(tree).await.unwrap();
                   assert_json_diff!(hierarchy, root: {
                       int: 0i64,
                   });
            });
        })
        .await;
    }

    #[fuchsia::test]
    async fn connect_with_multiple_proxies() {
        let identities: Vec<Arc<ComponentIdentity>> = vec![
            Arc::new(vec!["a", "b", "foo.cm"].into()),
            Arc::new(vec!["a", "b", "foo2.cm"].into()),
        ];

        let mut test = TestHarness::new(identities.clone());

        let insp = Inspector::default();
        insp.root().record_int("int", 0);
        let tree =
            test.serve(Arc::clone(&identities[0]), insp, TreeServerSendPreference::default());

        let insp2 = Inspector::default();
        insp2.root().record_bool("is_insp2", true);
        let tree2 =
            test.serve(Arc::clone(&identities[1]), insp2, TreeServerSendPreference::default());

        test.publish(&identities[0], tree);
        test.publish(&identities[1], tree2);

        let koid_component_0 = test.published_koids()[0];
        let koid_component_1 = test.published_koids()[1];

        test.assert(&identities[0], [koid_component_0], |handles| async move {
            assert_matches!(
                handles[0].as_ref(),
                InspectHandle::Tree { proxy: tree, .. } => {
                   let hierarchy = read(tree).await.unwrap();
                   assert_json_diff!(hierarchy, root: {
                       int: 0i64,
                   });
            });
        })
        .await;

        test.assert(&identities[1], [koid_component_1], |handles| async move {
            assert_matches!(
                handles[0].as_ref(),
                InspectHandle::Tree { proxy: tree, .. } => {
                   let hierarchy = read(tree).await.unwrap();
                   assert_json_diff!(hierarchy, root: {
                       is_insp2: true,
                   });
            });
        })
        .await;
    }

    #[fuchsia::test]
    async fn dropping_tree_removes_component_identity_from_repo() {
        let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());

        let mut test = TestHarness::new(vec![Arc::clone(&identity)]);

        let tree = test.serve(
            Arc::clone(&identity),
            Inspector::default(),
            TreeServerSendPreference::default(),
        );
        test.publish(&identity, tree);

        test.stop_all().await;

        // this executing to completion means the identity was present
        test.assert(&identity, [test.published_koids()[0]], |handles: [_; 1]| {
            assert_eq!(handles.len(), 1);
            async {}
        })
        .await;

        test.drop_tree_servers(&identity).await;

        // this executing to completion means the identity is not there anymore; we know
        // it previously was present
        test.wait_until_gone(&identity).await;
    }
}