1use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::inspect::container::InspectHandle;
9use crate::inspect::repository::InspectRepository;
10use anyhow::Error;
11use fidl::endpoints::{ControlHandle, Responder};
12use futures::StreamExt;
13use log::{debug, warn};
14use std::sync::Arc;
15use {fidl_fuchsia_inspect as finspect, fuchsia_async as fasync};
16
17pub struct InspectSinkServer {
18 repo: Arc<InspectRepository>,
20
21 scope: fasync::Scope,
23}
24
25impl InspectSinkServer {
26 pub fn new(repo: Arc<InspectRepository>, scope: fasync::Scope) -> Self {
28 Self { repo, scope }
29 }
30
31 fn spawn(&self, component: Arc<ComponentIdentity>, stream: finspect::InspectSinkRequestStream) {
33 let repo = Arc::clone(&self.repo);
34 self.scope.spawn(async move {
35 if let Err(e) = Self::handle_requests(repo, component, stream).await {
36 warn!("error handling InspectSink requests: {e}");
37 }
38 });
39 }
40
41 async fn handle_requests(
42 repo: Arc<InspectRepository>,
43 component: Arc<ComponentIdentity>,
44 mut stream: finspect::InspectSinkRequestStream,
45 ) -> Result<(), Error> {
46 while let Some(Ok(request)) = stream.next().await {
47 match request {
48 finspect::InspectSinkRequest::Publish {
49 payload: finspect::InspectSinkPublishRequest { tree: Some(tree), name, .. },
50 ..
51 } => repo.add_inspect_handle(
52 Arc::clone(&component),
53 InspectHandle::tree(tree.into_proxy(), name),
54 ),
55 finspect::InspectSinkRequest::Publish {
56 payload: finspect::InspectSinkPublishRequest { tree: None, name, .. },
57 control_handle,
58 } => {
59 warn!(name:?, component:%; "InspectSink/Publish without a tree");
60 control_handle.shutdown();
61 }
62 finspect::InspectSinkRequest::Escrow {
63 payload:
64 finspect::InspectSinkEscrowRequest {
65 vmo: Some(vmo),
66 name,
67 token: Some(token),
68 tree,
69 ..
70 },
71 ..
72 } => {
73 repo.escrow_handle(
74 Arc::clone(&component),
75 vmo,
76 token,
77 name,
78 tree.map(zx::Koid::from_raw),
79 );
80 }
81 finspect::InspectSinkRequest::Escrow {
82 control_handle,
83 payload: finspect::InspectSinkEscrowRequest { vmo, token, .. },
84 } => {
85 warn!(
86 component:%,
87 has_vmo = vmo.is_some(),
88 has_token = token.is_some();
89 "Attempted to escrow inspect without required data"
90 );
91 control_handle.shutdown();
92 }
93 finspect::InspectSinkRequest::FetchEscrow {
94 responder,
95 payload:
96 finspect::InspectSinkFetchEscrowRequest { tree, token: Some(token), .. },
97 } => {
98 let vmo = repo.fetch_escrow(Arc::clone(&component), token, tree);
99 if let Err(err) = responder.send(finspect::InspectSinkFetchEscrowResponse {
100 vmo,
101 ..Default::default()
102 }) {
103 debug!("Failed sending FetchEscrowResponse: {err:?}");
104 }
105 }
106 finspect::InspectSinkRequest::FetchEscrow {
107 responder,
108 payload: finspect::InspectSinkFetchEscrowRequest { token: None, .. },
109 } => {
110 warn!(component:%; "Attempted to fetch escrowed inspect with invalid data");
111 responder.control_handle().shutdown();
112 }
113 finspect::InspectSinkRequest::_UnknownMethod {
114 ordinal,
115 control_handle,
116 method_type,
117 ..
118 } => {
119 warn!(ordinal, method_type:?; "Received unknown request for InspectSink");
120 control_handle.shutdown();
122 }
123 }
124 }
125
126 Ok(())
127 }
128}
129
130impl EventConsumer for InspectSinkServer {
131 fn handle(self: Arc<Self>, event: Event) {
132 match event.payload {
133 EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
134 component,
135 request_stream,
136 }) => {
137 self.spawn(component, request_stream);
138 }
139 _ => unreachable!("InspectSinkServer is only subscribed to InspectSinkRequested"),
140 }
141 }
142}
143
144#[cfg(test)]
145mod tests {
146 use super::*;
147 use crate::events::router::EventConsumer;
148 use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
149 use crate::identity::ComponentIdentity;
150 use crate::inspect::container::InspectHandle;
151 use crate::inspect::repository::InspectRepository;
152 use crate::pipeline::StaticHierarchyAllowlist;
153 use assert_matches::assert_matches;
154 use diagnostics_assertions::assert_json_diff;
155 use fidl::endpoints::{ClientEnd, create_proxy_and_stream};
156 use fidl_fuchsia_diagnostics as fdiagnostics;
157 use fidl_fuchsia_inspect::{
158 InspectSinkMarker, InspectSinkProxy, InspectSinkPublishRequest, TreeMarker,
159 };
160 use fuchsia_inspect::reader::read;
161 use fuchsia_inspect::{Inspector, InspectorConfig};
162 use futures::Future;
163 use inspect_runtime::TreeServerSendPreference;
164 use inspect_runtime::service::spawn_tree_server_with_stream;
165 use selectors::VerboseError;
166 use std::collections::HashMap;
167 use std::sync::Arc;
168 use zx::{self as zx, AsHandleRef};
169
170 struct TestHarness {
171 repo: Arc<InspectRepository>,
173
174 components: HashMap<Arc<ComponentIdentity>, TestComponent>,
176
177 _server: Arc<InspectSinkServer>,
179
180 scope: Option<fasync::Scope>,
182 }
183
184 struct TestComponent {
185 proxy: Option<InspectSinkProxy>,
186 scope: Option<fasync::Scope>,
188 }
189
190 impl TestHarness {
191 fn new(identity: Vec<Arc<ComponentIdentity>>) -> Self {
194 let repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
195 let scope = fasync::Scope::new();
196 let server = Arc::new(InspectSinkServer::new(Arc::clone(&repo), scope.new_child()));
197
198 let components = identity
199 .into_iter()
200 .map(|id| {
201 let (proxy, request_stream) = create_proxy_and_stream::<InspectSinkMarker>();
202
203 Arc::clone(&server).handle(Event {
204 timestamp: zx::BootInstant::get(),
205 payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
206 component: Arc::clone(&id),
207 request_stream,
208 }),
209 });
210
211 (id, TestComponent { proxy: Some(proxy), scope: Some(fasync::Scope::new()) })
212 })
213 .collect();
214
215 Self { repo, _server: server, scope: Some(scope), components }
216 }
217
218 fn publish(
220 &mut self,
221 id: &Arc<ComponentIdentity>,
222 tree: ClientEnd<TreeMarker>,
223 ) -> zx::Koid {
224 let koid = tree.as_handle_ref().get_koid().unwrap();
225 let component = self.components.get(id).expect("unknown component");
226 let proxy = component.proxy.as_ref().expect("InspectSink proxy stopped");
227 proxy
228 .publish(InspectSinkPublishRequest { tree: Some(tree), ..Default::default() })
229 .unwrap();
230 koid
231 }
232
233 fn serve(
235 &mut self,
236 component: &Arc<ComponentIdentity>,
237 inspector: Inspector,
238 settings: TreeServerSendPreference,
239 ) -> ClientEnd<TreeMarker> {
240 let (tree, stream) = fidl::endpoints::create_request_stream::<TreeMarker>();
241 self.serve_stream(component, inspector, settings, stream);
242 tree
243 }
244
245 fn serve_stream(
247 &mut self,
248 component: &Arc<ComponentIdentity>,
249 inspector: Inspector,
250 settings: TreeServerSendPreference,
251 stream: finspect::TreeRequestStream,
252 ) {
253 let component = self.components.get_mut(component).expect("unknown component");
254 let scope = component.scope.as_ref().expect("already dropped tree server");
255 spawn_tree_server_with_stream(inspector, settings, stream, scope);
256 }
257
258 async fn drop_tree_servers(&mut self, component: &Arc<ComponentIdentity>) {
260 let component = self.components.get_mut(component).expect("unknown component");
261 let scope = component.scope.take().expect("tree server(s) already dropped");
262 scope.cancel().await;
263 }
264
265 async fn assert<const N: usize, F, Fut>(
272 &self,
273 identity: &Arc<ComponentIdentity>,
274 koids: [zx::Koid; N],
275 assertions: F,
276 ) where
277 F: FnOnce([Arc<InspectHandle>; N]) -> Fut,
278 Fut: Future<Output = ()>,
279 {
280 self.assert_on_selector(
281 identity,
282 koids,
283 assertions,
284 selectors::parse_selector::<VerboseError>(&format!("{identity}:root"))
285 .expect("parse selector"),
286 )
287 .await
288 }
289
290 async fn assert_on_selector<const N: usize, F, Fut>(
291 &self,
292 identity: &Arc<ComponentIdentity>,
293 koids: [zx::Koid; N],
294 assertions: F,
295 selector: fdiagnostics::Selector,
296 ) where
297 F: FnOnce([Arc<InspectHandle>; N]) -> Fut,
298 Fut: Future<Output = ()>,
299 {
300 self.repo.wait_for_artifact(identity).await;
301 let containers = self.repo.fetch_inspect_data(
302 &Some(vec![selector]),
303 StaticHierarchyAllowlist::new_disabled(),
304 );
305 assert_eq!(containers.len(), 1);
306 assertions(
307 koids
308 .iter()
309 .map(|koid| {
310 containers[0]
311 .inspect_handles
312 .iter()
313 .filter_map(|h| h.upgrade())
314 .find(|handle| handle.koid() == *koid)
315 .unwrap()
316 })
317 .collect::<Vec<_>>()
318 .try_into()
319 .unwrap(),
320 )
321 .await;
322 }
323
324 async fn stop_all(&mut self) {
326 for (_, component) in self.components.iter_mut() {
327 component.proxy = None;
328 }
329 self.scope.take().unwrap().close().await;
330 }
331 }
332
333 #[fuchsia::test]
334 async fn connect() {
335 let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
336
337 let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
338
339 let insp = Inspector::default();
340 insp.root().record_int("int", 0);
341 let tree = test.serve(&identity, insp, TreeServerSendPreference::default());
342 let koid = test.publish(&identity, tree);
343
344 test.assert(&identity, [koid], |handles| async move {
345 assert_matches!(
346 handles[0].as_ref(),
347 InspectHandle::Tree { proxy: tree, .. } => {
348 let hierarchy = read(tree).await.unwrap();
349 assert_json_diff!(hierarchy, root: {
350 int: 0i64,
351 });
352 });
353 })
354 .await;
355 }
356
357 #[fuchsia::test]
358 async fn publish_multiple_times_on_the_same_connection() {
359 let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
360
361 let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
362
363 let insp = Inspector::default();
364 insp.root().record_int("int", 0);
365 let tree = test.serve(&identity, insp, TreeServerSendPreference::default());
366
367 let other_insp = Inspector::default();
368 other_insp.root().record_double("double", 1.24);
369 let other_tree = test.serve(&identity, other_insp, TreeServerSendPreference::default());
370
371 let koid0 = test.publish(&identity, tree);
372 let koid1 = test.publish(&identity, other_tree);
373
374 test.assert(&identity, [koid0, koid1], |handles| async move {
375 assert_matches!(
376 handles[0].as_ref(),
377 InspectHandle::Tree { proxy: tree, ..} => {
378 let hierarchy = read(tree).await.unwrap();
379 assert_json_diff!(hierarchy, root: {
380 int: 0i64,
381 });
382 });
383
384 assert_matches!(
385 handles[1].as_ref(),
386 InspectHandle::Tree { proxy: tree, .. } => {
387 let hierarchy = read(tree).await.unwrap();
388 assert_json_diff!(hierarchy, root: {
389 double: 1.24,
390 });
391 });
392 })
393 .await;
394 }
395
396 #[fuchsia::test]
397 async fn tree_remains_after_inspect_sink_disconnects() {
398 let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
399
400 let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
401
402 let insp = Inspector::default();
403 insp.root().record_int("int", 0);
404 let tree = test.serve(&identity, insp, TreeServerSendPreference::default());
405 let koid = test.publish(&identity, tree);
406
407 test.assert(&identity, [koid], |handles| async move {
408 assert_matches!(
409 handles[0].as_ref(),
410 InspectHandle::Tree { proxy: tree, .. } => {
411 let hierarchy = read(tree).await.unwrap();
412 assert_json_diff!(hierarchy, root: {
413 int: 0i64,
414 });
415 });
416 })
417 .await;
418
419 test.stop_all().await;
420
421 test.assert(&identity, [koid], |handles| async move {
423 assert_matches!(
424 handles[0].as_ref(),
425 InspectHandle::Tree { proxy: tree, ..} => {
426 let hierarchy = read(tree).await.unwrap();
427 assert_json_diff!(hierarchy, root: {
428 int: 0i64,
429 });
430 });
431 })
432 .await;
433 }
434
435 #[fuchsia::test]
436 async fn connect_with_multiple_proxies() {
437 let identities: Vec<Arc<ComponentIdentity>> = vec![
438 Arc::new(vec!["a", "b", "foo.cm"].into()),
439 Arc::new(vec!["a", "b", "foo2.cm"].into()),
440 ];
441
442 let mut test = TestHarness::new(identities.clone());
443
444 let insp = Inspector::default();
445 insp.root().record_int("int", 0);
446 let tree = test.serve(&identities[0], insp, TreeServerSendPreference::default());
447
448 let insp2 = Inspector::default();
449 insp2.root().record_bool("is_insp2", true);
450 let tree2 = test.serve(&identities[1], insp2, TreeServerSendPreference::default());
451
452 let koid_component_0 = test.publish(&identities[0], tree);
453 let koid_component_1 = test.publish(&identities[1], tree2);
454
455 test.assert(&identities[0], [koid_component_0], |handles| async move {
456 assert_matches!(
457 handles[0].as_ref(),
458 InspectHandle::Tree { proxy: tree, .. } => {
459 let hierarchy = read(tree).await.unwrap();
460 assert_json_diff!(hierarchy, root: {
461 int: 0i64,
462 });
463 });
464 })
465 .await;
466
467 test.assert(&identities[1], [koid_component_1], |handles| async move {
468 assert_matches!(
469 handles[0].as_ref(),
470 InspectHandle::Tree { proxy: tree, .. } => {
471 let hierarchy = read(tree).await.unwrap();
472 assert_json_diff!(hierarchy, root: {
473 is_insp2: true,
474 });
475 });
476 })
477 .await;
478 }
479
480 #[fuchsia::test]
481 async fn dropping_tree_removes_component_identity_from_repo() {
482 let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
483
484 let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
485
486 let tree = test.serve(&identity, Inspector::default(), TreeServerSendPreference::default());
487 let koid = test.publish(&identity, tree);
488
489 test.stop_all().await;
490
491 test.assert(&identity, [koid], |handles: [_; 1]| {
493 assert_eq!(handles.len(), 1);
494 async {}
495 })
496 .await;
497
498 test.drop_tree_servers(&identity).await;
499
500 test.repo.wait_until_gone(&identity).await;
503 }
504
505 #[fuchsia::test]
506 async fn fetch_escrow() {
507 let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
508 let test = TestHarness::new(vec![Arc::clone(&identity)]);
509 let proxy = test.components.get(&identity).unwrap().proxy.as_ref().unwrap();
510
511 let vmo = {
513 let inspector = Inspector::default();
514 inspector.root().record_int("val", 123);
515 inspector.frozen_vmo_copy().unwrap()
516 };
517 let (token0, token1) = zx::EventPair::create();
518
519 proxy
520 .escrow(finspect::InspectSinkEscrowRequest {
521 vmo: Some(vmo),
522 token: Some(finspect::EscrowToken { token: token0 }),
523 ..Default::default()
524 })
525 .unwrap();
526
527 let vmo = assert_matches!(
529 proxy
530 .fetch_escrow(finspect::InspectSinkFetchEscrowRequest {
531 token: Some(finspect::EscrowToken { token: token1 }),
532 ..Default::default()
533 })
534 .await,
535 Ok(finspect::InspectSinkFetchEscrowResponse { vmo: Some(vmo), .. }) => vmo
536 );
537
538 let inspector = Inspector::new(InspectorConfig::default().vmo(vmo));
540 let hierarchy = read(&inspector).await.unwrap();
541 assert_json_diff!(hierarchy, root: {
542 val: 123i64,
543 });
544 }
545
546 #[fuchsia::test]
547 async fn fetch_escrow_with_tree() {
548 let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
549 let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
550
551 let vmo = {
553 let inspector = Inspector::default();
554 inspector.root().record_int("val", 123);
555 inspector.frozen_vmo_copy().unwrap()
556 };
557 let (token0, token1) = zx::EventPair::create();
558
559 let proxy = test.components.get(&identity).unwrap().proxy.as_ref().unwrap();
560 proxy
561 .escrow(finspect::InspectSinkEscrowRequest {
562 vmo: Some(vmo),
563 token: Some(finspect::EscrowToken { token: token0 }),
564 ..Default::default()
565 })
566 .unwrap();
567
568 let (tree, stream) = fidl::endpoints::create_request_stream::<TreeMarker>();
570 let koid = tree.as_handle_ref().get_koid().unwrap();
571 let resp = proxy
572 .fetch_escrow(finspect::InspectSinkFetchEscrowRequest {
573 token: Some(finspect::EscrowToken { token: token1 }),
574 tree: Some(tree),
575 ..Default::default()
576 })
577 .await;
578
579 let vmo = assert_matches!(
580 resp, Ok(finspect::InspectSinkFetchEscrowResponse { vmo: Some(vmo), .. }) => vmo);
581
582 let inspector = Inspector::new(InspectorConfig::default().vmo(vmo));
584 let hierarchy = read(&inspector).await.unwrap();
585 assert_json_diff!(hierarchy, root: {
586 val: 123i64,
587 });
588
589 test.serve_stream(&identity, inspector, TreeServerSendPreference::default(), stream);
591
592 test.assert(&identity, [koid], |handles| async move {
594 let tree = assert_matches!(
595 handles[0].as_ref(),
596 InspectHandle::Tree { proxy, .. } => proxy
597 );
598 let hierarchy = read(tree).await.unwrap();
599 assert_json_diff!(hierarchy, root: {
600 val: 123i64,
601 });
602 })
603 .await;
604 }
605
606 #[fuchsia::test]
607 async fn fetch_escrow_with_tree_and_name() {
608 const VMO_NAME: &str = "FetchEscrowWithTreeVmo";
609
610 let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
611 let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
612
613 let vmo = {
615 let inspector = Inspector::default();
616 inspector.root().record_int("val", 123);
617 inspector.frozen_vmo_copy().unwrap()
618 };
619 let (token0, token1) = zx::EventPair::create();
620
621 let proxy = test.components.get(&identity).unwrap().proxy.as_ref().unwrap();
622 proxy
623 .escrow(finspect::InspectSinkEscrowRequest {
624 vmo: Some(vmo),
625 token: Some(finspect::EscrowToken { token: token0 }),
626 name: Some(VMO_NAME.to_string()),
627 ..Default::default()
628 })
629 .unwrap();
630
631 let (tree, stream) = fidl::endpoints::create_request_stream::<TreeMarker>();
633 let koid = tree.as_handle_ref().get_koid().unwrap();
634 let resp = proxy
635 .fetch_escrow(finspect::InspectSinkFetchEscrowRequest {
636 token: Some(finspect::EscrowToken { token: token1 }),
637 tree: Some(tree),
638 ..Default::default()
639 })
640 .await;
641
642 let vmo = assert_matches!(
643 resp, Ok(finspect::InspectSinkFetchEscrowResponse { vmo: Some(vmo), .. }) => vmo);
644
645 let inspector = Inspector::new(InspectorConfig::default().vmo(vmo));
647 let hierarchy = read(&inspector).await.unwrap();
648 assert_json_diff!(hierarchy, root: {
649 val: 123i64,
650 });
651
652 test.serve_stream(&identity, inspector, TreeServerSendPreference::default(), stream);
654
655 test.assert_on_selector(
657 &identity,
658 [koid],
659 |handles| async move {
660 let tree = assert_matches!(
661 handles[0].as_ref(),
662 InspectHandle::Tree { proxy, .. } => proxy
663 );
664 let hierarchy = read(tree).await.unwrap();
665 assert_json_diff!(hierarchy, root: {
666 val: 123i64,
667 });
668 },
669 selectors::parse_selector::<VerboseError>(&format!("{identity}:[name={VMO_NAME}]root"))
670 .expect("parse selector"),
671 )
672 .await
673 }
674}