1use crate::accessor::PerformanceConfig;
6use crate::diagnostics::{BatchIteratorConnectionStats, TRACE_CATEGORY};
7use crate::inspect::container::{ReadSnapshot, SnapshotData, UnpopulatedInspectDataContainer};
8use crate::pipeline::{ComponentAllowlist, PrivacyExplicitOption};
9use diagnostics_data::{self as schema, Data, Inspect, InspectDataBuilder, InspectHandleName};
10use diagnostics_hierarchy::{DiagnosticsHierarchy, HierarchyMatcher};
11use fidl_fuchsia_diagnostics::Selector;
12use fidl_fuchsia_inspect::DEFAULT_TREE_NAME;
13use fuchsia_inspect::reader::PartialNodeHierarchy;
14use fuchsia_trace as ftrace;
15use futures::prelude::*;
16use log::error;
17use selectors::SelectorExt;
18use std::sync::Arc;
19
20pub mod collector;
21pub mod container;
22pub mod repository;
23pub mod servers;
24
25use container::PopulatedInspectDataContainer;
26
/// Intermediate form of one Inspect snapshot: the parsed hierarchy (when one
/// could be produced) together with the metadata carried over from
/// `SnapshotData`.
pub struct NodeHierarchyData {
    /// Handle name copied from the source `SnapshotData`, if any.
    name: Option<InspectHandleName>,
    /// Timestamp copied from the source `SnapshotData`.
    timestamp: zx::BootInstant,
    /// Errors to report alongside (or instead of) the hierarchy.
    errors: Vec<schema::InspectError>,
    /// Parsed hierarchy; `None` when the source had no snapshot or the
    /// snapshot could not be converted.
    hierarchy: Option<DiagnosticsHierarchy>,
    /// Copied from the source `SnapshotData`.
    // NOTE(review): presumably marks data that was read from an escrowed VMO — confirm.
    escrowed: bool,
}
42
43impl From<SnapshotData> for NodeHierarchyData {
44 fn from(data: SnapshotData) -> NodeHierarchyData {
45 match data.snapshot {
46 Some(snapshot) => match convert_snapshot_to_node_hierarchy(snapshot) {
47 Ok(node_hierarchy) => NodeHierarchyData {
48 name: data.name,
49 timestamp: data.timestamp,
50 errors: data.errors,
51 hierarchy: Some(node_hierarchy),
52 escrowed: data.escrowed,
53 },
54 Err(e) => NodeHierarchyData {
55 name: data.name,
56 timestamp: data.timestamp,
57 errors: vec![schema::InspectError { message: format!("{e:?}") }],
58 hierarchy: None,
59 escrowed: data.escrowed,
60 },
61 },
62 None => NodeHierarchyData {
63 name: data.name,
64 timestamp: data.timestamp,
65 errors: data.errors,
66 hierarchy: None,
67 escrowed: data.escrowed,
68 },
69 }
70 }
71}
72
/// Holds the client-provided selectors used to filter the Inspect data that
/// `ReaderServer::stream` emits.
pub struct ReaderServer {
    /// Client selectors; when `None`, no client-side filtering is applied.
    selectors: Option<Vec<Selector>>,
}
80
81fn convert_snapshot_to_node_hierarchy(
82 snapshot: ReadSnapshot,
83) -> Result<DiagnosticsHierarchy, fuchsia_inspect::reader::ReaderError> {
84 match snapshot {
85 ReadSnapshot::Single(snapshot) => Ok(PartialNodeHierarchy::try_from(snapshot)?.into()),
86 ReadSnapshot::Tree(snapshot_tree) => snapshot_tree.try_into(),
87 ReadSnapshot::Finished(hierarchy) => Ok(hierarchy),
88 }
89}
90
91impl ReaderServer {
92 pub fn stream(
94 unpopulated_diagnostics_sources: Vec<UnpopulatedInspectDataContainer>,
95 performance_configuration: PerformanceConfig,
96 selectors: Option<Vec<Selector>>,
97 stats: Arc<BatchIteratorConnectionStats>,
98 parent_trace_id: ftrace::Id,
99 ) -> impl Stream<Item = Data<Inspect>> + Send + 'static {
100 let server = Arc::new(Self { selectors });
101
102 let batch_timeout = performance_configuration.batch_timeout_sec;
103 let maximum_concurrent_snapshots_per_reader =
104 performance_configuration.maximum_concurrent_snapshots_per_reader;
105
106 futures::stream::iter(unpopulated_diagnostics_sources)
107 .map(move |unpopulated| {
108 let global_stats = Arc::clone(stats.global_stats());
109 unpopulated.populate(batch_timeout, global_stats, parent_trace_id)
110 })
111 .flatten()
112 .map(future::ready)
113 .buffer_unordered(maximum_concurrent_snapshots_per_reader as usize)
115 .filter_map(move |populated| {
117 let server_clone = Arc::clone(&server);
118 async move { server_clone.filter_snapshot(populated, parent_trace_id) }
119 })
120 }
121
    /// Converts one component's snapshot into a `NodeHierarchyData` and applies
    /// the static allowlist followed by the optional client matcher.
    ///
    /// Returns `None` when the static allowlist has no matcher for this handle
    /// name, or when either filtering pass returns `None`. Data that has no
    /// hierarchy is returned as-is, since there is nothing to filter.
    ///
    /// `moniker` and `parent_trace_id` are only used as trace arguments.
    fn filter_single_components_snapshot(
        snapshot_data: SnapshotData,
        static_allowlist: ComponentAllowlist,
        client_matcher: Option<HierarchyMatcher>,
        moniker: &str,
        parent_trace_id: ftrace::Id,
    ) -> Option<NodeHierarchyData> {
        // Cloned up front: `snapshot_data` is consumed by `into()` below while the
        // trace arguments in the same scope still read the name.
        let filename = snapshot_data.name.clone();
        let node_hierarchy_data = {
            // Step 1: parse the raw snapshot, traced as its own async duration.
            let unfiltered_node_hierarchy_data: NodeHierarchyData = {
                let trace_id = ftrace::Id::random();
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"SnapshotData -> NodeHierarchyData",
                    "parent_trace_id" => u64::from(parent_trace_id),
                    "trace_id" => u64::from(trace_id),
                    "moniker" => moniker,
                    "filename" => filename
                        .as_ref()
                        .and_then(InspectHandleName::as_filename)
                        .unwrap_or(""),
                    "name" => filename
                        .as_ref()
                        .and_then(InspectHandleName::as_name)
                        .unwrap_or("")
                );
                snapshot_data.into()
            };

            // Step 2: static (pipeline) filtering, keyed by the handle name; unnamed
            // handles are looked up under `DEFAULT_TREE_NAME`.
            let handle_name = unfiltered_node_hierarchy_data
                .name
                .as_ref()
                .map(|name| name.as_ref())
                .unwrap_or(DEFAULT_TREE_NAME);
            match static_allowlist.matcher(handle_name) {
                PrivacyExplicitOption::Found(matcher) => {
                    // Nothing to filter when parsing produced no hierarchy.
                    let Some(node_hierarchy) = unfiltered_node_hierarchy_data.hierarchy else {
                        return Some(unfiltered_node_hierarchy_data);
                    };
                    let trace_id = ftrace::Id::random();
                    let _trace_guard = ftrace::async_enter!(
                        trace_id,
                        TRACE_CATEGORY,
                        c"ReaderServer::filter_single_components_snapshot.filter_hierarchy",
                        "parent_trace_id" => u64::from(parent_trace_id),
                        "trace_id" => u64::from(trace_id),
                        "moniker" => moniker,
                        "filename" => unfiltered_node_hierarchy_data
                            .name
                            .as_ref()
                            .and_then(InspectHandleName::as_filename)
                            .unwrap_or(""),
                        "name" => unfiltered_node_hierarchy_data
                            .name
                            .as_ref()
                            .and_then(InspectHandleName::as_name)
                            .unwrap_or(""),
                        "selector_type" => "static"
                    );
                    // `?`: if the filter returns `None`, the component is dropped entirely.
                    let filtered_hierarchy =
                        diagnostics_hierarchy::filter_hierarchy(node_hierarchy, matcher)?;
                    NodeHierarchyData {
                        name: unfiltered_node_hierarchy_data.name,
                        timestamp: unfiltered_node_hierarchy_data.timestamp,
                        errors: unfiltered_node_hierarchy_data.errors,
                        hierarchy: Some(filtered_hierarchy),
                        escrowed: unfiltered_node_hierarchy_data.escrowed,
                    }
                }
                PrivacyExplicitOption::NotFound => return None,
                PrivacyExplicitOption::FilteringDisabled => unfiltered_node_hierarchy_data,
            }
        };

        // Step 3: client filtering, skipped when the client supplied no matcher or
        // there is no hierarchy to filter.
        let Some(dynamic_matcher) = client_matcher else {
            return Some(node_hierarchy_data);
        };
        let Some(node_hierarchy) = node_hierarchy_data.hierarchy else {
            return Some(node_hierarchy_data);
        };

        let trace_id = ftrace::Id::random();
        let _trace_guard = ftrace::async_enter!(
            trace_id,
            TRACE_CATEGORY,
            c"ReaderServer::filter_single_components_snapshot.filter_hierarchy",
            "parent_trace_id" => u64::from(parent_trace_id),
            "trace_id" => u64::from(trace_id),
            "moniker" => moniker,
            "filename" => {
                node_hierarchy_data
                    .name
                    .as_ref()
                    .and_then(InspectHandleName::as_filename)
                    .unwrap_or("")
            },
            "name" => {
                node_hierarchy_data
                    .name
                    .as_ref()
                    .and_then(InspectHandleName::as_name)
                    .unwrap_or("")
            },
            "selector_type" => "client"
        );
        diagnostics_hierarchy::filter_hierarchy(node_hierarchy, &dynamic_matcher).map(
            |filtered_hierarchy| NodeHierarchyData {
                name: node_hierarchy_data.name,
                timestamp: node_hierarchy_data.timestamp,
                errors: node_hierarchy_data.errors,
                hierarchy: Some(filtered_hierarchy),
                escrowed: node_hierarchy_data.escrowed,
            },
        )
    }
248
249 fn filter_snapshot(
257 &self,
258 pumped_inspect_data: PopulatedInspectDataContainer,
259 parent_trace_id: ftrace::Id,
260 ) -> Option<Data<Inspect>> {
261 let mut client_selectors: Option<HierarchyMatcher> = None;
265
266 if let Some(configured_selectors) = &self.selectors {
271 client_selectors = {
272 let matching_selectors = pumped_inspect_data
273 .identity
274 .moniker
275 .match_against_selectors(configured_selectors.as_slice())
276 .collect::<Result<Vec<_>, _>>()
277 .unwrap_or_else(|err| {
278 error!(
279 moniker:? = pumped_inspect_data.identity.moniker, err:?;
280 "Failed to evaluate client selectors",
281 );
282 Vec::new()
283 });
284
285 if matching_selectors.is_empty() {
286 None
287 } else {
288 match matching_selectors.try_into() {
289 Ok(hierarchy_matcher) => Some(hierarchy_matcher),
290 Err(e) => {
291 error!(e:?; "Failed to create hierarchy matcher");
292 None
293 }
294 }
295 }
296 };
297
298 client_selectors.as_ref()?;
302 }
303
304 let identity = Arc::clone(&pumped_inspect_data.identity);
305
306 let hierarchy_data = ReaderServer::filter_single_components_snapshot(
307 pumped_inspect_data.snapshot,
308 pumped_inspect_data.component_allowlist,
309 client_selectors,
310 identity.to_string().as_str(),
311 parent_trace_id,
312 )?;
313 let mut builder = InspectDataBuilder::new(
314 pumped_inspect_data.identity.moniker.clone(),
315 identity.url.clone(),
316 hierarchy_data.timestamp,
317 );
318 if let Some(hierarchy) = hierarchy_data.hierarchy {
319 builder = builder.with_hierarchy(hierarchy);
320 }
321 if let Some(name) = hierarchy_data.name {
322 builder = builder.with_name(name);
323 }
324 if !hierarchy_data.errors.is_empty() {
325 builder = builder.with_errors(hierarchy_data.errors);
326 }
327 Some(builder.escrowed(hierarchy_data.escrowed).build())
328 }
329}
330
331#[cfg(test)]
332mod tests {
333 use super::*;
334 use crate::accessor::BatchIterator;
335 use crate::diagnostics::AccessorStats;
336 use crate::events::router::EventConsumer;
337 use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
338 use crate::identity::ComponentIdentity;
339 use crate::inspect::container::InspectHandle;
340 use crate::inspect::repository::InspectRepository;
341 use crate::inspect::servers::InspectSinkServer;
342 use crate::pipeline::Pipeline;
343 use diagnostics_assertions::{assert_data_tree, AnyProperty};
344 use fidl::endpoints::{create_proxy_and_stream, ClientEnd};
345 use fidl_fuchsia_diagnostics::{BatchIteratorMarker, BatchIteratorProxy, Format, StreamMode};
346 use fidl_fuchsia_inspect::{
347 InspectSinkMarker, InspectSinkPublishRequest, TreeMarker, TreeProxy,
348 };
349 use fuchsia_async::{self as fasync, Task};
350 use fuchsia_inspect::{Inspector, InspectorConfig};
351
352 use futures::StreamExt;
353 use inspect_runtime::{service, TreeServerSendPreference};
354 use moniker::ExtendedMoniker;
355 use selectors::VerboseError;
356 use serde_json::json;
357 use std::collections::HashMap;
358 use test_case::test_case;
359 use zx::Peered;
360
    /// Component URL given to every `ComponentIdentity` built by these tests.
    const TEST_URL: &str = "fuchsia-pkg://test";
    /// Value used for `PerformanceConfig::batch_timeout_sec` in `start_snapshot`.
    const BATCH_RETRIEVAL_TIMEOUT_SECONDS: i64 = 300;
363
364 #[fuchsia::test]
365 async fn read_server_formatting_tree_inspect_sink() {
366 let inspector = inspector_for_reader_test();
367 let scope = fasync::Scope::new();
368 let tree_client =
369 service::spawn_tree_server(inspector, TreeServerSendPreference::default(), &scope);
370 verify_reader(tree_client).await;
371 }
372
373 #[fuchsia::test]
374 async fn reader_server_reports_errors() {
375 let vmo = zx::Vmo::create(4096).unwrap();
377 let inspector = Inspector::new(InspectorConfig::default().vmo(vmo));
378 let scope = fasync::Scope::new();
379 let tree_client =
380 service::spawn_tree_server(inspector, TreeServerSendPreference::default(), &scope);
381 let (done0, done1) = zx::Channel::create();
382 std::thread::spawn(move || {
385 let done = done1;
386 let mut executor = fasync::LocalExecutor::new();
387 executor.run_singlethreaded(async {
388 verify_reader_with_mode(tree_client, VerifyMode::ExpectComponentFailure).await;
389 done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer");
390 });
391 });
392
393 fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap();
394 }
395
396 #[test_case(vec![63, 65], vec![64, 64] ; "merge_errorful_component_into_next_batch")]
397 #[test_case(vec![64, 65, 64, 64], vec![64, 64, 64, 64, 1] ; "errorful_component_doesnt_halt_iteration")]
398 #[test_case(vec![65], vec![64, 1] ; "component_with_more_than_max_batch_size_is_split_in_two")]
399 #[test_case(vec![1usize; 64], vec![64] ; "sixty_four_vmos_packed_into_one_batch")]
400 #[test_case(vec![64, 63, 1], vec![64, 64] ; "max_batch_intact_two_batches_merged")]
401 #[test_case(vec![33, 33, 33], vec![64, 35] ; "three_directories_two_batches")]
402 #[fuchsia::test]
403 async fn stress_test_diagnostics_repository(
404 component_handle_counts: Vec<usize>,
405 expected_batch_results: Vec<usize>,
406 ) {
407 let component_name_handle_counts: Vec<(String, usize)> = component_handle_counts
408 .into_iter()
409 .enumerate()
410 .map(|(index, handle_count)| (format!("diagnostics_{index}"), handle_count))
411 .collect();
412
413 let inspector = inspector_for_reader_test();
414
415 let mut clients = HashMap::<String, Vec<TreeProxy>>::new();
416 let scope = fasync::Scope::new();
417 for (component_name, handle_count) in component_name_handle_counts.clone() {
418 for _ in 0..handle_count {
419 let inspector_dup = Inspector::new(
420 InspectorConfig::default()
421 .vmo(inspector.duplicate_vmo().expect("failed to duplicate vmo")),
422 );
423 let client = service::spawn_tree_server(
424 inspector_dup,
425 TreeServerSendPreference::default(),
426 &scope,
427 );
428 clients.entry(component_name.clone()).or_default().push(client.into_proxy());
429 }
430 }
431
432 let pipeline = Arc::new(Pipeline::for_test(None));
433 let inspect_repo =
434 Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], fasync::Scope::new()));
435
436 for (component, handles) in clients {
437 let moniker = ExtendedMoniker::parse_str(&component).unwrap();
438 let identity = Arc::new(ComponentIdentity::new(moniker, TEST_URL));
439 for (i, handle) in handles.into_iter().enumerate() {
440 inspect_repo.add_inspect_handle(
441 Arc::clone(&identity),
442 InspectHandle::tree(handle, Some(format!("tree_{i}"))),
443 );
444 }
445 }
446
447 let inspector = Inspector::default();
448 let root = inspector.root();
449 let test_archive_accessor_node = root.create_child("test_archive_accessor_node");
450
451 let test_accessor_stats = Arc::new(AccessorStats::new(test_archive_accessor_node));
452 let test_batch_iterator_stats1 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());
453
454 let _result_json = read_snapshot_verify_batch_count_and_batch_size(
455 Arc::clone(&inspect_repo),
456 Arc::clone(&pipeline),
457 expected_batch_results,
458 test_batch_iterator_stats1,
459 )
460 .await;
461 }
462
463 fn inspector_for_reader_test() -> Inspector {
464 let inspector = Inspector::default();
465 let root = inspector.root();
466 let child_1 = root.create_child("child_1");
467 child_1.record_int("some-int", 2);
468 let child_1_1 = child_1.create_child("child_1_1");
469 child_1_1.record_int("some-int", 3);
470 child_1_1.record_int("not-wanted-int", 4);
471 root.record(child_1_1);
472 root.record(child_1);
473 let child_2 = root.create_child("child_2");
474 child_2.record_int("some-int", 2);
475 root.record(child_2);
476 inspector
477 }
478
    /// Outcome that `verify_reader_with_mode` should assert on.
    enum VerifyMode {
        /// The component's data is read and filtered successfully.
        ExpectSuccess,
        /// Reading the component's data fails: the payload is null and one
        /// `get_next` result error is counted.
        ExpectComponentFailure,
    }
483
484 async fn verify_reader(tree_client: ClientEnd<TreeMarker>) {
487 verify_reader_with_mode(tree_client, VerifyMode::ExpectSuccess).await;
488 }
489
490 async fn verify_reader_with_mode(tree_client: ClientEnd<TreeMarker>, mode: VerifyMode) {
491 let child_1_1_selector =
492 selectors::parse_selector::<VerboseError>(r#"*:root/child_1/*:some-int"#).unwrap();
493 let child_2_selector =
494 selectors::parse_selector::<VerboseError>(r#"test_component:root/child_2:*"#).unwrap();
495
496 let static_selectors_opt = Some(vec![child_1_1_selector, child_2_selector]);
497
498 let pipeline = Arc::new(Pipeline::for_test(static_selectors_opt));
499 let inspect_repo =
500 Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], fasync::Scope::new()));
501
502 let component_id = ExtendedMoniker::parse_str("./test_component").unwrap();
505 let inspector = Inspector::default();
506 let root = inspector.root();
507 let test_archive_accessor_node = root.create_child("test_archive_accessor_node");
508
509 assert_data_tree!(inspector, root: {test_archive_accessor_node: {}});
510
511 let test_accessor_stats = Arc::new(AccessorStats::new(test_archive_accessor_node));
512
513 let test_batch_iterator_stats1 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());
514
515 assert_data_tree!(inspector, root: {
516 test_archive_accessor_node: {
517 connections_closed: 0u64,
518 connections_opened: 0u64,
519 inspect: {
520 batch_iterator_connections: {
521 "0": {
522 get_next: {
523 terminal_responses: 0u64,
524 responses: 0u64,
525 requests: 0u64,
526 }
527 }
528 },
529 batch_iterator: {
530 connections_closed: 0u64,
531 connections_opened: 0u64,
532 get_next: {
533 time_usec: AnyProperty,
534 requests: 0u64,
535 responses: 0u64,
536 result_count: 0u64,
537 result_errors: 0u64,
538 }
539 },
540 component_timeouts_count: 0u64,
541 reader_servers_constructed: 1u64,
542 reader_servers_destroyed: 0u64,
543 schema_truncation_count: 0u64,
544 max_snapshot_sizes_bytes: AnyProperty,
545 snapshot_schema_truncation_percentage: AnyProperty,
546 },
547 logs: {
548 batch_iterator_connections: {},
549 batch_iterator: {
550 connections_closed: 0u64,
551 connections_opened: 0u64,
552 get_next: {
553 requests: 0u64,
554 responses: 0u64,
555 result_count: 0u64,
556 result_errors: 0u64,
557 time_usec: AnyProperty,
558 }
559 },
560 component_timeouts_count: 0u64,
561 reader_servers_constructed: 0u64,
562 reader_servers_destroyed: 0u64,
563 max_snapshot_sizes_bytes: AnyProperty,
564 snapshot_schema_truncation_percentage: AnyProperty,
565 schema_truncation_count: 0u64,
566 },
567 stream_diagnostics_requests: 0u64,
568 },
569 });
570
571 let inspector_arc = Arc::new(inspector);
572
573 let identity = Arc::new(ComponentIdentity::new(component_id, TEST_URL));
574
575 let (proxy, request_stream) = create_proxy_and_stream::<InspectSinkMarker>();
576 proxy
577 .publish(InspectSinkPublishRequest { tree: Some(tree_client), ..Default::default() })
578 .unwrap();
579
580 let scope = fasync::Scope::new();
581 let inspect_sink_server =
582 Arc::new(InspectSinkServer::new(Arc::clone(&inspect_repo), scope.new_child()));
583 Arc::clone(&inspect_sink_server).handle(Event {
584 timestamp: zx::BootInstant::get(),
585 payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
586 component: Arc::clone(&identity),
587 request_stream,
588 }),
589 });
590
591 drop(proxy);
592
593 scope.close().await;
594
595 let expected_get_next_result_errors = match mode {
596 VerifyMode::ExpectComponentFailure => 1u64,
597 _ => 0u64,
598 };
599
600 {
601 let result_json = read_snapshot(
602 Arc::clone(&inspect_repo),
603 Arc::clone(&pipeline),
604 Arc::clone(&inspector_arc),
605 test_batch_iterator_stats1,
606 )
607 .await;
608
609 let result_array = result_json.as_array().expect("unit test json should be array.");
610 assert_eq!(result_array.len(), 1, "Expect only one schema to be returned.");
611
612 let result_map =
613 result_array[0].as_object().expect("entries in the schema array are json objects.");
614
615 let result_payload =
616 result_map.get("payload").expect("diagnostics schema requires payload entry.");
617
618 let expected_payload = match mode {
619 VerifyMode::ExpectSuccess => json!({
620 "root": {
621 "child_1": {
622 "child_1_1": {
623 "some-int": 3
624 }
625 },
626 "child_2": {
627 "some-int": 2
628 }
629 }
630 }),
631 VerifyMode::ExpectComponentFailure => json!(null),
632 };
633 assert_eq!(*result_payload, expected_payload);
634
635 assert_data_tree!(Arc::clone(&inspector_arc), root: {
638 test_archive_accessor_node: {
639 connections_closed: 0u64,
640 connections_opened: 0u64,
641 inspect: {
642 batch_iterator_connections: {},
643 batch_iterator: {
644 connections_closed: 1u64,
645 connections_opened: 1u64,
646 get_next: {
647 time_usec: AnyProperty,
648 requests: 2u64,
649 responses: 2u64,
650 result_count: 1u64,
651 result_errors: expected_get_next_result_errors,
652 }
653 },
654 component_timeouts_count: 0u64,
655 component_time_usec: AnyProperty,
656 reader_servers_constructed: 1u64,
657 reader_servers_destroyed: 1u64,
658 schema_truncation_count: 0u64,
659 max_snapshot_sizes_bytes: AnyProperty,
660 snapshot_schema_truncation_percentage: AnyProperty,
661 longest_processing_times: contains {
662 "test_component": contains {
663 "@time": AnyProperty,
664 duration_seconds: AnyProperty,
665 }
666 },
667 },
668 logs: {
669 batch_iterator_connections: {},
670 batch_iterator: {
671 connections_closed: 0u64,
672 connections_opened: 0u64,
673 get_next: {
674 requests: 0u64,
675 responses: 0u64,
676 result_count: 0u64,
677 result_errors: 0u64,
678 time_usec: AnyProperty,
679 }
680 },
681 component_timeouts_count: 0u64,
682 reader_servers_constructed: 0u64,
683 reader_servers_destroyed: 0u64,
684 max_snapshot_sizes_bytes: AnyProperty,
685 snapshot_schema_truncation_percentage: AnyProperty,
686 schema_truncation_count: 0u64,
687 },
688 stream_diagnostics_requests: 0u64,
689 },
690 });
691 }
692
693 let test_batch_iterator_stats2 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());
694
695 inspect_repo.terminate_inspect(identity);
696 {
697 let result_json = read_snapshot(
698 Arc::clone(&inspect_repo),
699 Arc::clone(&pipeline),
700 Arc::clone(&inspector_arc),
701 test_batch_iterator_stats2,
702 )
703 .await;
704
705 let result_array = result_json.as_array().expect("unit test json should be array.");
706 assert_eq!(result_array.len(), 0, "Expect no schemas to be returned.");
707
708 assert_data_tree!(Arc::clone(&inspector_arc), root: {
709 test_archive_accessor_node: {
710 connections_closed: 0u64,
711 connections_opened: 0u64,
712 inspect: {
713 batch_iterator_connections: {},
714 batch_iterator: {
715 connections_closed: 2u64,
716 connections_opened: 2u64,
717 get_next: {
718 time_usec: AnyProperty,
719 requests: 3u64,
720 responses: 3u64,
721 result_count: 1u64,
722 result_errors: expected_get_next_result_errors,
723 }
724 },
725 component_timeouts_count: 0u64,
726 component_time_usec: AnyProperty,
727 reader_servers_constructed: 2u64,
728 reader_servers_destroyed: 2u64,
729 schema_truncation_count: 0u64,
730 max_snapshot_sizes_bytes: AnyProperty,
731 snapshot_schema_truncation_percentage: AnyProperty,
732 longest_processing_times: contains {
733 "test_component": contains {
734 "@time": AnyProperty,
735 duration_seconds: AnyProperty,
736 }
737 },
738 },
739 logs: {
740 batch_iterator_connections: {},
741 batch_iterator: {
742 connections_closed: 0u64,
743 connections_opened: 0u64,
744 get_next: {
745 requests: 0u64,
746 responses: 0u64,
747 result_count: 0u64,
748 result_errors: 0u64,
749 time_usec: AnyProperty,
750 }
751 },
752 component_timeouts_count: 0u64,
753 reader_servers_constructed: 0u64,
754 reader_servers_destroyed: 0u64,
755 max_snapshot_sizes_bytes: AnyProperty,
756 snapshot_schema_truncation_percentage: AnyProperty,
757 schema_truncation_count: 0u64,
758 },
759 stream_diagnostics_requests: 0u64,
760 },
761 });
762 }
763 }
764
765 fn start_snapshot(
766 inspect_repo: Arc<InspectRepository>,
767 pipeline: Arc<Pipeline>,
768 stats: Arc<BatchIteratorConnectionStats>,
769 ) -> (BatchIteratorProxy, Task<()>) {
770 let test_performance_config = PerformanceConfig {
771 batch_timeout_sec: BATCH_RETRIEVAL_TIMEOUT_SECONDS,
772 aggregated_content_limit_bytes: None,
773 maximum_concurrent_snapshots_per_reader: 4,
774 };
775
776 let trace_id = ftrace::Id::random();
777 let static_hierarchy_allowlist = pipeline.static_hierarchy_allowlist();
778 let reader_server = ReaderServer::stream(
779 inspect_repo.fetch_inspect_data(&None, static_hierarchy_allowlist),
780 test_performance_config,
781 None,
783 Arc::clone(&stats),
784 trace_id,
785 );
786 let (consumer, batch_iterator_requests) = create_proxy_and_stream::<BatchIteratorMarker>();
787 (
788 consumer,
789 Task::spawn(async {
790 BatchIterator::new(
791 reader_server,
792 batch_iterator_requests.peekable(),
793 StreamMode::Snapshot,
794 stats,
795 None,
796 ftrace::Id::random(),
797 Format::Json,
798 )
799 .unwrap()
800 .run()
801 .await
802 .unwrap()
803 }),
804 )
805 }
806
807 async fn read_snapshot(
808 inspect_repo: Arc<InspectRepository>,
809 pipeline: Arc<Pipeline>,
810 _test_inspector: Arc<Inspector>,
811 stats: Arc<BatchIteratorConnectionStats>,
812 ) -> serde_json::Value {
813 let (consumer, server) = start_snapshot(inspect_repo, pipeline, stats);
814
815 let mut result_vec: Vec<String> = Vec::new();
816 loop {
817 let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> =
818 consumer.get_next().await.unwrap().unwrap();
819
820 if next_batch.is_empty() {
821 break;
822 }
823 for formatted_content in next_batch {
824 match formatted_content {
825 fidl_fuchsia_diagnostics::FormattedContent::Json(data) => {
826 let mut buf = vec![0; data.size as usize];
827 data.vmo.read(&mut buf, 0).expect("reading vmo");
828 let hierarchy_string = std::str::from_utf8(&buf).unwrap();
829 result_vec.push(hierarchy_string.to_string());
830 }
831 _ => panic!("test only produces json formatted data"),
832 }
833 }
834 }
835
836 drop(consumer);
838 server.await;
839
840 let result_string = format!("[{}]", result_vec.join(","));
841 serde_json::from_str(&result_string).unwrap_or_else(|_| {
842 panic!("unit tests shouldn't be creating malformed json: {result_string}")
843 })
844 }
845
846 async fn read_snapshot_verify_batch_count_and_batch_size(
847 inspect_repo: Arc<InspectRepository>,
848 pipeline: Arc<Pipeline>,
849 expected_batch_sizes: Vec<usize>,
850 stats: Arc<BatchIteratorConnectionStats>,
851 ) -> serde_json::Value {
852 let (consumer, server) = start_snapshot(inspect_repo, pipeline, stats);
853
854 let mut result_vec: Vec<String> = Vec::new();
855 let mut batch_counts = Vec::new();
856 loop {
857 let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> =
858 consumer.get_next().await.unwrap().unwrap();
859
860 if next_batch.is_empty() {
861 assert_eq!(expected_batch_sizes, batch_counts);
862 break;
863 }
864
865 batch_counts.push(next_batch.len());
866
867 for formatted_content in next_batch {
868 match formatted_content {
869 fidl_fuchsia_diagnostics::FormattedContent::Json(data) => {
870 let mut buf = vec![0; data.size as usize];
871 data.vmo.read(&mut buf, 0).expect("reading vmo");
872 let hierarchy_string = std::str::from_utf8(&buf).unwrap();
873 result_vec.push(hierarchy_string.to_string());
874 }
875 _ => panic!("test only produces json formatted data"),
876 }
877 }
878 }
879
880 drop(consumer);
882 server.await;
883
884 let result_string = format!("[{}]", result_vec.join(","));
885 serde_json::from_str(&result_string).unwrap_or_else(|_| {
886 panic!("unit tests shouldn't be creating malformed json: {result_string}")
887 })
888 }
889}