1use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::inspect::container::InspectHandle;
9use crate::inspect::repository::InspectRepository;
10use anyhow::Error;
11use fidl::endpoints::{ControlHandle, Responder};
12use futures::StreamExt;
13use log::warn;
14use std::sync::Arc;
15use {fidl_fuchsia_inspect as finspect, fuchsia_async as fasync};
16
17pub struct InspectSinkServer {
18 repo: Arc<InspectRepository>,
20
21 scope: fasync::Scope,
23}
24
25impl InspectSinkServer {
26 pub fn new(repo: Arc<InspectRepository>, scope: fasync::Scope) -> Self {
28 Self { repo, scope }
29 }
30
31 fn spawn(&self, component: Arc<ComponentIdentity>, stream: finspect::InspectSinkRequestStream) {
33 let repo = Arc::clone(&self.repo);
34 self.scope.spawn(async move {
35 if let Err(e) = Self::handle_requests(repo, component, stream).await {
36 warn!("error handling InspectSink requests: {e}");
37 }
38 });
39 }
40
41 async fn handle_requests(
42 repo: Arc<InspectRepository>,
43 component: Arc<ComponentIdentity>,
44 mut stream: finspect::InspectSinkRequestStream,
45 ) -> Result<(), Error> {
46 while let Some(Ok(request)) = stream.next().await {
47 match request {
48 finspect::InspectSinkRequest::Publish {
49 payload: finspect::InspectSinkPublishRequest { tree: Some(tree), name, .. },
50 ..
51 } => repo.add_inspect_handle(
52 Arc::clone(&component),
53 InspectHandle::tree(tree.into_proxy(), name),
54 ),
55 finspect::InspectSinkRequest::Publish {
56 payload: finspect::InspectSinkPublishRequest { tree: None, name, .. },
57 control_handle,
58 } => {
59 warn!(name:?, component:%; "InspectSink/Publish without a tree");
60 control_handle.shutdown();
61 }
62 finspect::InspectSinkRequest::Escrow {
63 payload:
64 finspect::InspectSinkEscrowRequest {
65 vmo: Some(vmo),
66 name,
67 token: Some(token),
68 tree,
69 ..
70 },
71 ..
72 } => {
73 repo.escrow_handle(
74 Arc::clone(&component),
75 vmo,
76 token,
77 name,
78 tree.map(zx::Koid::from_raw),
79 );
80 }
81 finspect::InspectSinkRequest::Escrow {
82 control_handle,
83 payload: finspect::InspectSinkEscrowRequest { vmo, token, .. },
84 } => {
85 warn!(
86 component:%,
87 has_vmo = vmo.is_some(),
88 has_token = token.is_some();
89 "Attempted to escrow inspect without required data"
90 );
91 control_handle.shutdown();
92 }
93 finspect::InspectSinkRequest::FetchEscrow {
94 responder,
95 payload:
96 finspect::InspectSinkFetchEscrowRequest { tree, token: Some(token), .. },
97 } => {
98 let vmo = repo.fetch_escrow(Arc::clone(&component), token, tree);
99 let _ = responder.send(finspect::InspectSinkFetchEscrowResponse {
100 vmo,
101 ..Default::default()
102 });
103 }
104 finspect::InspectSinkRequest::FetchEscrow {
105 responder,
106 payload: finspect::InspectSinkFetchEscrowRequest { token: None, .. },
107 } => {
108 warn!(component:%; "Attempted to fetch escrowed inspect with invalid data");
109 responder.control_handle().shutdown();
110 }
111 finspect::InspectSinkRequest::_UnknownMethod {
112 ordinal,
113 control_handle,
114 method_type,
115 ..
116 } => {
117 warn!(ordinal, method_type:?; "Received unknown request for InspectSink");
118 control_handle.shutdown();
120 }
121 }
122 }
123
124 Ok(())
125 }
126}
127
128impl EventConsumer for InspectSinkServer {
129 fn handle(self: Arc<Self>, event: Event) {
130 match event.payload {
131 EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
132 component,
133 request_stream,
134 }) => {
135 self.spawn(component, request_stream);
136 }
137 _ => unreachable!("InspectSinkServer is only subscribed to InspectSinkRequested"),
138 }
139 }
140}
141
142#[cfg(test)]
143mod tests {
144 use super::*;
145 use crate::events::router::EventConsumer;
146 use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
147 use crate::identity::ComponentIdentity;
148 use crate::inspect::container::InspectHandle;
149 use crate::inspect::repository::InspectRepository;
150 use crate::pipeline::StaticHierarchyAllowlist;
151 use assert_matches::assert_matches;
152 use diagnostics_assertions::assert_json_diff;
153 use fidl::endpoints::{create_proxy_and_stream, ClientEnd};
154 use fidl_fuchsia_inspect::{
155 InspectSinkMarker, InspectSinkProxy, InspectSinkPublishRequest, TreeMarker,
156 };
157 use fuchsia_inspect::reader::read;
158 use fuchsia_inspect::Inspector;
159 use futures::Future;
160 use inspect_runtime::service::spawn_tree_server;
161 use inspect_runtime::TreeServerSendPreference;
162 use selectors::VerboseError;
163 use std::sync::Arc;
164 use zx::{self as zx, AsHandleRef};
165
166 struct TestHarness {
167 proxy_pairs: Vec<(Arc<ComponentIdentity>, Option<InspectSinkProxy>)>,
169
170 repo: Arc<InspectRepository>,
172
173 _server: Arc<InspectSinkServer>,
175
176 koids: Vec<zx::Koid>,
178
179 tree_pairs: Vec<(Arc<ComponentIdentity>, Option<fasync::Scope>)>,
181
182 scope: Option<fasync::Scope>,
183 }
184
185 impl TestHarness {
186 fn new(identity: Vec<Arc<ComponentIdentity>>) -> Self {
189 let mut proxy_pairs = vec![];
190 let repo = Arc::new(InspectRepository::new(vec![], fasync::Scope::new()));
191 let scope = fasync::Scope::new();
192 let server = Arc::new(InspectSinkServer::new(Arc::clone(&repo), scope.new_child()));
193 for id in identity.into_iter() {
194 let (proxy, request_stream) = create_proxy_and_stream::<InspectSinkMarker>();
195
196 Arc::clone(&server).handle(Event {
197 timestamp: zx::BootInstant::get(),
198 payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
199 component: Arc::clone(&id),
200 request_stream,
201 }),
202 });
203
204 proxy_pairs.push((id, Some(proxy)));
205 }
206
207 Self {
208 proxy_pairs,
209 repo,
210 _server: server,
211 koids: vec![],
212 tree_pairs: vec![],
213 scope: Some(scope),
214 }
215 }
216
217 fn publish(&mut self, component: &Arc<ComponentIdentity>, tree: ClientEnd<TreeMarker>) {
219 for (id, proxy) in &self.proxy_pairs {
220 if id != component {
221 continue;
222 }
223
224 if let Some(proxy) = &proxy {
225 self.koids.push(tree.as_handle_ref().get_koid().unwrap());
226 proxy
227 .publish(InspectSinkPublishRequest {
228 tree: Some(tree),
229 ..Default::default()
230 })
231 .unwrap();
232 return;
233 } else {
234 panic!("cannot publish on stopped server/proxy pair");
235 }
236 }
237 }
238
239 fn serve(
241 &mut self,
242 component: Arc<ComponentIdentity>,
243 inspector: Inspector,
244 settings: TreeServerSendPreference,
245 ) -> ClientEnd<TreeMarker> {
246 let child = fasync::Scope::new();
247 let tree = spawn_tree_server(inspector, settings, &child);
248 self.tree_pairs.push((component, Some(child)));
249 tree
250 }
251
252 async fn drop_tree_servers(&mut self, component: &Arc<ComponentIdentity>) {
254 for (id, ref mut scope) in &mut self.tree_pairs {
255 if id != component {
256 continue;
257 }
258
259 if scope.is_none() {
260 continue;
261 }
262
263 scope.take().unwrap().cancel().await;
264 }
265 }
266
267 fn published_koids(&self) -> &[zx::Koid] {
269 &self.koids
270 }
271
272 async fn assert<const N: usize, F, Fut>(
279 &self,
280 identity: &Arc<ComponentIdentity>,
281 koids: [zx::Koid; N],
282 assertions: F,
283 ) where
284 F: FnOnce([Arc<InspectHandle>; N]) -> Fut,
285 Fut: Future<Output = ()>,
286 {
287 self.repo.wait_for_artifact(identity).await;
288 let containers = self.repo.fetch_inspect_data(
289 &Some(vec![selectors::parse_selector::<VerboseError>(&format!("{identity}:root"))
290 .expect("parse selector")]),
291 StaticHierarchyAllowlist::new_disabled(),
292 );
293 assert_eq!(containers.len(), 1);
294 assertions(
295 koids
296 .iter()
297 .map(|koid| {
298 containers[0]
299 .inspect_handles
300 .iter()
301 .filter_map(|h| h.upgrade())
302 .find(|handle| handle.koid() == *koid)
303 .unwrap()
304 })
305 .collect::<Vec<_>>()
306 .try_into()
307 .unwrap(),
308 )
309 .await;
310 }
311
312 async fn stop_all(&mut self) {
314 for (_, ref mut proxy) in &mut self.proxy_pairs {
315 proxy.take();
316 }
317
318 self.scope.take().unwrap().close().await;
319 }
320
321 async fn wait_until_gone(&self, component: &Arc<ComponentIdentity>) {
322 self.repo.wait_until_gone(component).await;
323 }
324 }
325
326 #[fuchsia::test]
327 async fn connect() {
328 let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
329
330 let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
331
332 let insp = Inspector::default();
333 insp.root().record_int("int", 0);
334 let tree = test.serve(Arc::clone(&identity), insp, TreeServerSendPreference::default());
335 test.publish(&identity, tree);
336
337 let koid = test.published_koids()[0];
338
339 test.assert(&identity, [koid], |handles| async move {
340 assert_matches!(
341 handles[0].as_ref(),
342 InspectHandle::Tree { proxy: tree, .. } => {
343 let hierarchy = read(tree).await.unwrap();
344 assert_json_diff!(hierarchy, root: {
345 int: 0i64,
346 });
347 });
348 })
349 .await;
350 }
351
352 #[fuchsia::test]
353 async fn publish_multiple_times_on_the_same_connection() {
354 let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
355
356 let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
357
358 let insp = Inspector::default();
359 insp.root().record_int("int", 0);
360 let tree = test.serve(Arc::clone(&identity), insp, TreeServerSendPreference::default());
361
362 let other_insp = Inspector::default();
363 other_insp.root().record_double("double", 1.24);
364 let other_tree =
365 test.serve(Arc::clone(&identity), other_insp, TreeServerSendPreference::default());
366
367 test.publish(&identity, tree);
368 test.publish(&identity, other_tree);
369
370 let koid0 = test.published_koids()[0];
371 let koid1 = test.published_koids()[1];
372
373 test.assert(&identity, [koid0, koid1], |handles| async move {
374 assert_matches!(
375 handles[0].as_ref(),
376 InspectHandle::Tree { proxy: tree, ..} => {
377 let hierarchy = read(tree).await.unwrap();
378 assert_json_diff!(hierarchy, root: {
379 int: 0i64,
380 });
381 });
382
383 assert_matches!(
384 handles[1].as_ref(),
385 InspectHandle::Tree { proxy: tree, .. } => {
386 let hierarchy = read(tree).await.unwrap();
387 assert_json_diff!(hierarchy, root: {
388 double: 1.24,
389 });
390 });
391 })
392 .await;
393 }
394
395 #[fuchsia::test]
396 async fn tree_remains_after_inspect_sink_disconnects() {
397 let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
398
399 let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
400
401 let insp = Inspector::default();
402 insp.root().record_int("int", 0);
403 let tree = test.serve(Arc::clone(&identity), insp, TreeServerSendPreference::default());
404 test.publish(&identity, tree);
405
406 let koid = test.published_koids()[0];
407
408 test.assert(&identity, [koid], |handles| async move {
409 assert_matches!(
410 handles[0].as_ref(),
411 InspectHandle::Tree { proxy: tree, .. } => {
412 let hierarchy = read(tree).await.unwrap();
413 assert_json_diff!(hierarchy, root: {
414 int: 0i64,
415 });
416 });
417 })
418 .await;
419
420 test.stop_all().await;
421
422 test.assert(&identity, [koid], |handles| async move {
424 assert_matches!(
425 handles[0].as_ref(),
426 InspectHandle::Tree { proxy: tree, ..} => {
427 let hierarchy = read(tree).await.unwrap();
428 assert_json_diff!(hierarchy, root: {
429 int: 0i64,
430 });
431 });
432 })
433 .await;
434 }
435
436 #[fuchsia::test]
437 async fn connect_with_multiple_proxies() {
438 let identities: Vec<Arc<ComponentIdentity>> = vec![
439 Arc::new(vec!["a", "b", "foo.cm"].into()),
440 Arc::new(vec!["a", "b", "foo2.cm"].into()),
441 ];
442
443 let mut test = TestHarness::new(identities.clone());
444
445 let insp = Inspector::default();
446 insp.root().record_int("int", 0);
447 let tree =
448 test.serve(Arc::clone(&identities[0]), insp, TreeServerSendPreference::default());
449
450 let insp2 = Inspector::default();
451 insp2.root().record_bool("is_insp2", true);
452 let tree2 =
453 test.serve(Arc::clone(&identities[1]), insp2, TreeServerSendPreference::default());
454
455 test.publish(&identities[0], tree);
456 test.publish(&identities[1], tree2);
457
458 let koid_component_0 = test.published_koids()[0];
459 let koid_component_1 = test.published_koids()[1];
460
461 test.assert(&identities[0], [koid_component_0], |handles| async move {
462 assert_matches!(
463 handles[0].as_ref(),
464 InspectHandle::Tree { proxy: tree, .. } => {
465 let hierarchy = read(tree).await.unwrap();
466 assert_json_diff!(hierarchy, root: {
467 int: 0i64,
468 });
469 });
470 })
471 .await;
472
473 test.assert(&identities[1], [koid_component_1], |handles| async move {
474 assert_matches!(
475 handles[0].as_ref(),
476 InspectHandle::Tree { proxy: tree, .. } => {
477 let hierarchy = read(tree).await.unwrap();
478 assert_json_diff!(hierarchy, root: {
479 is_insp2: true,
480 });
481 });
482 })
483 .await;
484 }
485
486 #[fuchsia::test]
487 async fn dropping_tree_removes_component_identity_from_repo() {
488 let identity: Arc<ComponentIdentity> = Arc::new(vec!["a", "b", "foo.cm"].into());
489
490 let mut test = TestHarness::new(vec![Arc::clone(&identity)]);
491
492 let tree = test.serve(
493 Arc::clone(&identity),
494 Inspector::default(),
495 TreeServerSendPreference::default(),
496 );
497 test.publish(&identity, tree);
498
499 test.stop_all().await;
500
501 test.assert(&identity, [test.published_koids()[0]], |handles: [_; 1]| {
503 assert_eq!(handles.len(), 1);
504 async {}
505 })
506 .await;
507
508 test.drop_tree_servers(&identity).await;
509
510 test.wait_until_gone(&identity).await;
513 }
514}