1use crate::accessor::PerformanceConfig;
6use crate::diagnostics::{BatchIteratorConnectionStats, TRACE_CATEGORY};
7use crate::inspect::container::{ReadSnapshot, SnapshotData, UnpopulatedInspectDataContainer};
8use crate::pipeline::{ComponentAllowlist, PrivacyExplicitOption};
9use diagnostics_data::{self as schema, Data, Inspect, InspectDataBuilder, InspectHandleName};
10use diagnostics_hierarchy::{DiagnosticsHierarchy, HierarchyMatcher};
11use fidl_fuchsia_diagnostics::Selector;
12use fidl_fuchsia_inspect::DEFAULT_TREE_NAME;
13use fuchsia_inspect::reader::PartialNodeHierarchy;
14use fuchsia_trace as ftrace;
15use futures::prelude::*;
16use log::error;
17use selectors::SelectorExt;
18use std::sync::Arc;
19
20pub mod collector;
21pub mod container;
22pub mod repository;
23pub mod servers;
24
25use container::PopulatedInspectDataContainer;
26
27#[derive(Debug)]
30pub struct NodeHierarchyData {
31 name: Option<InspectHandleName>,
33 timestamp: zx::BootInstant,
35 errors: Vec<schema::InspectError>,
37 hierarchy: Option<DiagnosticsHierarchy>,
40 escrowed: bool,
42}
43
44impl From<SnapshotData> for NodeHierarchyData {
45 fn from(data: SnapshotData) -> NodeHierarchyData {
46 match data.snapshot {
47 Some(snapshot) => match convert_snapshot_to_node_hierarchy(snapshot) {
48 Ok(node_hierarchy) => {
49 let errors = data
50 .errors
51 .into_iter()
52 .chain(node_hierarchy.error_iter().filter_map(|(key, mv)| {
53 mv.map(|mv| schema::InspectError {
54 message: format!(
55 "Lazy value {} could not be loaded: {:?}",
56 key.join("/"),
57 mv.reason
58 ),
59 })
60 }))
61 .collect();
62 NodeHierarchyData {
63 name: data.name,
64 timestamp: data.timestamp,
65 errors,
66 hierarchy: Some(node_hierarchy),
67 escrowed: data.escrowed,
68 }
69 }
70 Err(e) => NodeHierarchyData {
71 name: data.name,
72 timestamp: data.timestamp,
73 errors: vec![schema::InspectError { message: format!("{e:?}") }],
74 hierarchy: None,
75 escrowed: data.escrowed,
76 },
77 },
78 None => NodeHierarchyData {
79 name: data.name,
80 timestamp: data.timestamp,
81 errors: data.errors,
82 hierarchy: None,
83 escrowed: data.escrowed,
84 },
85 }
86 }
87}
88
89pub struct ReaderServer {
92 selectors: Option<Vec<Selector>>,
95}
96
97fn convert_snapshot_to_node_hierarchy(
98 snapshot: ReadSnapshot,
99) -> Result<DiagnosticsHierarchy, fuchsia_inspect::reader::ReaderError> {
100 match snapshot {
101 ReadSnapshot::Single(snapshot) => Ok(PartialNodeHierarchy::try_from(snapshot)?.into()),
102 ReadSnapshot::Tree(snapshot_tree) => snapshot_tree.try_into(),
103 ReadSnapshot::Finished(hierarchy) => Ok(hierarchy),
104 }
105}
106
107impl ReaderServer {
108 pub fn stream(
110 unpopulated_diagnostics_sources: Vec<UnpopulatedInspectDataContainer>,
111 performance_configuration: PerformanceConfig,
112 selectors: Option<Vec<Selector>>,
113 stats: Arc<BatchIteratorConnectionStats>,
114 parent_trace_id: ftrace::Id,
115 ) -> impl Stream<Item = Data<Inspect>> + Send + 'static {
116 let server = Arc::new(Self { selectors });
117
118 let batch_timeout = performance_configuration.batch_timeout_sec;
119 let maximum_concurrent_snapshots_per_reader =
120 performance_configuration.maximum_concurrent_snapshots_per_reader;
121
122 futures::stream::iter(unpopulated_diagnostics_sources)
123 .map(move |unpopulated| {
124 let global_stats = Arc::clone(stats.global_stats());
125 unpopulated.populate(batch_timeout, global_stats, parent_trace_id)
126 })
127 .flatten()
128 .map(future::ready)
129 .buffer_unordered(maximum_concurrent_snapshots_per_reader as usize)
131 .filter_map(move |populated| {
133 let server_clone = Arc::clone(&server);
134 async move { server_clone.filter_snapshot(populated, parent_trace_id) }
135 })
136 }
137
138 fn filter_single_components_snapshot(
139 snapshot_data: SnapshotData,
140 static_allowlist: ComponentAllowlist,
141 client_matcher: Option<HierarchyMatcher>,
142 moniker: &str,
143 parent_trace_id: ftrace::Id,
144 ) -> Option<NodeHierarchyData> {
145 let filename = snapshot_data.name.clone();
146 let node_hierarchy_data = {
147 let unfiltered_node_hierarchy_data: NodeHierarchyData = {
148 let trace_id = ftrace::Id::random();
149 let _trace_guard = ftrace::async_enter!(
150 trace_id,
151 TRACE_CATEGORY,
152 c"SnapshotData -> NodeHierarchyData",
153 "parent_trace_id" => u64::from(parent_trace_id),
156 "trace_id" => u64::from(trace_id),
157 "moniker" => moniker,
158 "filename" => filename
159 .as_ref()
160 .and_then(InspectHandleName::as_filename)
161 .unwrap_or(""),
162 "name" => filename
163 .as_ref()
164 .and_then(InspectHandleName::as_name)
165 .unwrap_or("")
166 );
167 snapshot_data.into()
168 };
169
170 let handle_name = unfiltered_node_hierarchy_data
171 .name
172 .as_ref()
173 .map(|name| name.as_ref())
174 .unwrap_or(DEFAULT_TREE_NAME);
175 match static_allowlist.matcher(handle_name) {
176 PrivacyExplicitOption::Found(matcher) => {
177 let Some(node_hierarchy) = unfiltered_node_hierarchy_data.hierarchy else {
178 return Some(unfiltered_node_hierarchy_data);
179 };
180 let trace_id = ftrace::Id::random();
181 let _trace_guard = ftrace::async_enter!(
182 trace_id,
183 TRACE_CATEGORY,
184 c"ReaderServer::filter_single_components_snapshot.filter_hierarchy",
185 "parent_trace_id" => u64::from(parent_trace_id),
188 "trace_id" => u64::from(trace_id),
189 "moniker" => moniker,
190 "filename" => unfiltered_node_hierarchy_data
191 .name
192 .as_ref()
193 .and_then(InspectHandleName::as_filename)
194 .unwrap_or(""),
195 "name" => unfiltered_node_hierarchy_data
196 .name
197 .as_ref()
198 .and_then(InspectHandleName::as_name)
199 .unwrap_or(""),
200 "selector_type" => "static"
201 );
202 let filtered_hierarchy =
203 diagnostics_hierarchy::filter_hierarchy(node_hierarchy, matcher)?;
204 NodeHierarchyData {
205 name: unfiltered_node_hierarchy_data.name,
206 timestamp: unfiltered_node_hierarchy_data.timestamp,
207 errors: unfiltered_node_hierarchy_data.errors,
208 hierarchy: Some(filtered_hierarchy),
209 escrowed: unfiltered_node_hierarchy_data.escrowed,
210 }
211 }
212 PrivacyExplicitOption::NotFound => return None,
213 PrivacyExplicitOption::FilteringDisabled => unfiltered_node_hierarchy_data,
214 }
215 };
216
217 let Some(dynamic_matcher) = client_matcher else {
218 return Some(node_hierarchy_data);
223 };
224 let Some(node_hierarchy) = node_hierarchy_data.hierarchy else {
225 return Some(node_hierarchy_data);
226 };
227
228 let trace_id = ftrace::Id::random();
229 let _trace_guard = ftrace::async_enter!(
230 trace_id,
231 TRACE_CATEGORY,
232 c"ReaderServer::filter_single_components_snapshot.filter_hierarchy",
233 "parent_trace_id" => u64::from(parent_trace_id),
236 "trace_id" => u64::from(trace_id),
237 "moniker" => moniker,
238 "filename" => {
239 node_hierarchy_data
240 .name
241 .as_ref()
242 .and_then(InspectHandleName::as_filename)
243 .unwrap_or("")
244 },
245 "name" => {
246 node_hierarchy_data
247 .name
248 .as_ref()
249 .and_then(InspectHandleName::as_name)
250 .unwrap_or("")
251 },
252 "selector_type" => "client"
253 );
254 diagnostics_hierarchy::filter_hierarchy(node_hierarchy, &dynamic_matcher).map(
255 |filtered_hierarchy| NodeHierarchyData {
256 name: node_hierarchy_data.name,
257 timestamp: node_hierarchy_data.timestamp,
258 errors: node_hierarchy_data.errors,
259 hierarchy: Some(filtered_hierarchy),
260 escrowed: node_hierarchy_data.escrowed,
261 },
262 )
263 }
264
265 fn filter_snapshot(
273 &self,
274 pumped_inspect_data: PopulatedInspectDataContainer,
275 parent_trace_id: ftrace::Id,
276 ) -> Option<Data<Inspect>> {
277 let mut client_selectors: Option<HierarchyMatcher> = None;
281
282 if let Some(configured_selectors) = &self.selectors {
287 client_selectors = {
288 let matching_selectors = pumped_inspect_data
289 .identity
290 .moniker
291 .match_against_selectors(configured_selectors.as_slice())
292 .collect::<Result<Vec<_>, _>>()
293 .unwrap_or_else(|err| {
294 error!(
295 moniker:? = pumped_inspect_data.identity.moniker, err:?;
296 "Failed to evaluate client selectors",
297 );
298 Vec::new()
299 });
300
301 if matching_selectors.is_empty() {
302 None
303 } else {
304 match matching_selectors.try_into() {
305 Ok(hierarchy_matcher) => Some(hierarchy_matcher),
306 Err(e) => {
307 error!(e:?; "Failed to create hierarchy matcher");
308 None
309 }
310 }
311 }
312 };
313
314 client_selectors.as_ref()?;
318 }
319
320 let identity = Arc::clone(&pumped_inspect_data.identity);
321
322 let hierarchy_data = ReaderServer::filter_single_components_snapshot(
323 pumped_inspect_data.snapshot,
324 pumped_inspect_data.component_allowlist,
325 client_selectors,
326 identity.to_string().as_str(),
327 parent_trace_id,
328 )?;
329 let mut builder = InspectDataBuilder::new(
330 pumped_inspect_data.identity.moniker.clone(),
331 identity.url.clone(),
332 hierarchy_data.timestamp,
333 );
334 if let Some(hierarchy) = hierarchy_data.hierarchy {
335 builder = builder.with_hierarchy(hierarchy);
336 }
337 if let Some(name) = hierarchy_data.name {
338 builder = builder.with_name(name);
339 }
340 if !hierarchy_data.errors.is_empty() {
341 builder = builder.with_errors(hierarchy_data.errors);
342 }
343 Some(builder.escrowed(hierarchy_data.escrowed).build())
344 }
345}
346
347#[cfg(test)]
348mod tests {
349 use super::*;
350 use crate::accessor::BatchIterator;
351 use crate::diagnostics::AccessorStats;
352 use crate::events::router::EventConsumer;
353 use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
354 use crate::identity::ComponentIdentity;
355 use crate::inspect::container::InspectHandle;
356 use crate::inspect::repository::InspectRepository;
357 use crate::inspect::servers::InspectSinkServer;
358 use crate::pipeline::Pipeline;
359 use diagnostics_assertions::{assert_data_tree, AnyProperty};
360 use fidl::endpoints::{create_proxy_and_stream, ClientEnd};
361 use fidl_fuchsia_diagnostics::{BatchIteratorMarker, BatchIteratorProxy, Format, StreamMode};
362 use fidl_fuchsia_inspect::{
363 InspectSinkMarker, InspectSinkPublishRequest, TreeMarker, TreeProxy,
364 };
365 use fuchsia_async::{self as fasync, Task};
366 use fuchsia_inspect::{Inspector, InspectorConfig};
367
368 use futures::StreamExt;
369 use inspect_runtime::{service, TreeServerSendPreference};
370 use moniker::ExtendedMoniker;
371 use selectors::VerboseError;
372 use serde_json::json;
373 use std::collections::HashMap;
374 use test_case::test_case;
375 use zx::Peered;
376
377 const TEST_URL: &str = "fuchsia-pkg://test";
378 const BATCH_RETRIEVAL_TIMEOUT_SECONDS: i64 = 300;
379
380 #[fuchsia::test]
381 async fn read_server_formatting_tree_inspect_sink() {
382 let inspector = inspector_for_reader_test();
383 let scope = fasync::Scope::new();
384 let tree_client =
385 service::spawn_tree_server(inspector, TreeServerSendPreference::default(), &scope);
386 verify_reader(tree_client).await;
387 }
388
389 #[fuchsia::test]
390 async fn reader_server_reports_errors() {
391 let vmo = zx::Vmo::create(4096).unwrap();
393 let inspector = Inspector::new(InspectorConfig::default().vmo(vmo));
394 let scope = fasync::Scope::new();
395 let tree_client =
396 service::spawn_tree_server(inspector, TreeServerSendPreference::default(), &scope);
397 let (done0, done1) = zx::Channel::create();
398 std::thread::spawn(move || {
401 let done = done1;
402 let mut executor = fasync::LocalExecutor::new();
403 executor.run_singlethreaded(async {
404 verify_reader_with_mode(tree_client, VerifyMode::ExpectComponentFailure).await;
405 done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer");
406 });
407 });
408
409 fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap();
410 }
411
412 #[test_case(vec![63, 65], vec![64, 64] ; "merge_errorful_component_into_next_batch")]
413 #[test_case(vec![64, 65, 64, 64], vec![64, 64, 64, 64, 1] ; "errorful_component_doesnt_halt_iteration")]
414 #[test_case(vec![65], vec![64, 1] ; "component_with_more_than_max_batch_size_is_split_in_two")]
415 #[test_case(vec![1usize; 64], vec![64] ; "sixty_four_vmos_packed_into_one_batch")]
416 #[test_case(vec![64, 63, 1], vec![64, 64] ; "max_batch_intact_two_batches_merged")]
417 #[test_case(vec![33, 33, 33], vec![64, 35] ; "three_directories_two_batches")]
418 #[fuchsia::test]
419 async fn stress_test_diagnostics_repository(
420 component_handle_counts: Vec<usize>,
421 expected_batch_results: Vec<usize>,
422 ) {
423 let component_name_handle_counts: Vec<(String, usize)> = component_handle_counts
424 .into_iter()
425 .enumerate()
426 .map(|(index, handle_count)| (format!("diagnostics_{index}"), handle_count))
427 .collect();
428
429 let inspector = inspector_for_reader_test();
430
431 let mut clients = HashMap::<String, Vec<TreeProxy>>::new();
432 let scope = fasync::Scope::new();
433 for (component_name, handle_count) in component_name_handle_counts.clone() {
434 for _ in 0..handle_count {
435 let inspector_dup = Inspector::new(
436 InspectorConfig::default()
437 .vmo(inspector.duplicate_vmo().expect("failed to duplicate vmo")),
438 );
439 let client = service::spawn_tree_server(
440 inspector_dup,
441 TreeServerSendPreference::default(),
442 &scope,
443 );
444 clients.entry(component_name.clone()).or_default().push(client.into_proxy());
445 }
446 }
447
448 let pipeline = Arc::new(Pipeline::for_test(None));
449 let inspect_repo =
450 Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], fasync::Scope::new()));
451
452 for (component, handles) in clients {
453 let moniker = ExtendedMoniker::parse_str(&component).unwrap();
454 let identity = Arc::new(ComponentIdentity::new(moniker, TEST_URL));
455 for (i, handle) in handles.into_iter().enumerate() {
456 inspect_repo.add_inspect_handle(
457 Arc::clone(&identity),
458 InspectHandle::tree(handle, Some(format!("tree_{i}"))),
459 );
460 }
461 }
462
463 let inspector = Inspector::default();
464 let root = inspector.root();
465 let test_archive_accessor_node = root.create_child("test_archive_accessor_node");
466
467 let test_accessor_stats = Arc::new(AccessorStats::new(test_archive_accessor_node));
468 let test_batch_iterator_stats1 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());
469
470 let _result_json = read_snapshot_verify_batch_count_and_batch_size(
471 Arc::clone(&inspect_repo),
472 Arc::clone(&pipeline),
473 expected_batch_results,
474 test_batch_iterator_stats1,
475 )
476 .await;
477 }
478
479 fn inspector_for_reader_test() -> Inspector {
480 let inspector = Inspector::default();
481 let root = inspector.root();
482 let child_1 = root.create_child("child_1");
483 child_1.record_int("some-int", 2);
484 let child_1_1 = child_1.create_child("child_1_1");
485 child_1_1.record_int("some-int", 3);
486 child_1_1.record_int("not-wanted-int", 4);
487 root.record(child_1_1);
488 root.record(child_1);
489 let child_2 = root.create_child("child_2");
490 child_2.record_int("some-int", 2);
491 root.record(child_2);
492 inspector
493 }
494
495 enum VerifyMode {
496 ExpectSuccess,
497 ExpectComponentFailure,
498 }
499
500 async fn verify_reader(tree_client: ClientEnd<TreeMarker>) {
503 verify_reader_with_mode(tree_client, VerifyMode::ExpectSuccess).await;
504 }
505
506 async fn verify_reader_with_mode(tree_client: ClientEnd<TreeMarker>, mode: VerifyMode) {
507 let child_1_1_selector =
508 selectors::parse_selector::<VerboseError>(r#"*:root/child_1/*:some-int"#).unwrap();
509 let child_2_selector =
510 selectors::parse_selector::<VerboseError>(r#"test_component:root/child_2:*"#).unwrap();
511
512 let static_selectors_opt = Some(vec![child_1_1_selector, child_2_selector]);
513
514 let pipeline = Arc::new(Pipeline::for_test(static_selectors_opt));
515 let inspect_repo =
516 Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], fasync::Scope::new()));
517
518 let component_id = ExtendedMoniker::parse_str("./test_component").unwrap();
521 let inspector = Inspector::default();
522 let root = inspector.root();
523 let test_archive_accessor_node = root.create_child("test_archive_accessor_node");
524
525 assert_data_tree!(inspector, root: {test_archive_accessor_node: {}});
526
527 let test_accessor_stats = Arc::new(AccessorStats::new(test_archive_accessor_node));
528
529 let test_batch_iterator_stats1 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());
530
531 assert_data_tree!(inspector, root: {
532 test_archive_accessor_node: {
533 connections_closed: 0u64,
534 connections_opened: 0u64,
535 inspect: {
536 batch_iterator_connections: {
537 "0": {
538 get_next: {
539 terminal_responses: 0u64,
540 responses: 0u64,
541 requests: 0u64,
542 }
543 }
544 },
545 batch_iterator: {
546 connections_closed: 0u64,
547 connections_opened: 0u64,
548 get_next: {
549 time_usec: AnyProperty,
550 requests: 0u64,
551 responses: 0u64,
552 result_count: 0u64,
553 result_errors: 0u64,
554 }
555 },
556 component_timeouts_count: 0u64,
557 reader_servers_constructed: 1u64,
558 reader_servers_destroyed: 0u64,
559 schema_truncation_count: 0u64,
560 max_snapshot_sizes_bytes: AnyProperty,
561 snapshot_schema_truncation_percentage: AnyProperty,
562 },
563 logs: {
564 batch_iterator_connections: {},
565 batch_iterator: {
566 connections_closed: 0u64,
567 connections_opened: 0u64,
568 get_next: {
569 requests: 0u64,
570 responses: 0u64,
571 result_count: 0u64,
572 result_errors: 0u64,
573 time_usec: AnyProperty,
574 }
575 },
576 component_timeouts_count: 0u64,
577 reader_servers_constructed: 0u64,
578 reader_servers_destroyed: 0u64,
579 max_snapshot_sizes_bytes: AnyProperty,
580 snapshot_schema_truncation_percentage: AnyProperty,
581 schema_truncation_count: 0u64,
582 },
583 stream_diagnostics_requests: 0u64,
584 },
585 });
586
587 let inspector_arc = Arc::new(inspector);
588
589 let identity = Arc::new(ComponentIdentity::new(component_id, TEST_URL));
590
591 let (proxy, request_stream) = create_proxy_and_stream::<InspectSinkMarker>();
592 proxy
593 .publish(InspectSinkPublishRequest { tree: Some(tree_client), ..Default::default() })
594 .unwrap();
595
596 let scope = fasync::Scope::new();
597 let inspect_sink_server =
598 Arc::new(InspectSinkServer::new(Arc::clone(&inspect_repo), scope.new_child()));
599 Arc::clone(&inspect_sink_server).handle(Event {
600 timestamp: zx::BootInstant::get(),
601 payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
602 component: Arc::clone(&identity),
603 request_stream,
604 }),
605 });
606
607 drop(proxy);
608
609 scope.close().await;
610
611 let expected_get_next_result_errors = match mode {
612 VerifyMode::ExpectComponentFailure => 1u64,
613 _ => 0u64,
614 };
615
616 {
617 let result_json = read_snapshot(
618 Arc::clone(&inspect_repo),
619 Arc::clone(&pipeline),
620 Arc::clone(&inspector_arc),
621 test_batch_iterator_stats1,
622 )
623 .await;
624
625 let result_array = result_json.as_array().expect("unit test json should be array.");
626 assert_eq!(result_array.len(), 1, "Expect only one schema to be returned.");
627
628 let result_map =
629 result_array[0].as_object().expect("entries in the schema array are json objects.");
630
631 let result_payload =
632 result_map.get("payload").expect("diagnostics schema requires payload entry.");
633
634 let expected_payload = match mode {
635 VerifyMode::ExpectSuccess => json!({
636 "root": {
637 "child_1": {
638 "child_1_1": {
639 "some-int": 3
640 }
641 },
642 "child_2": {
643 "some-int": 2
644 }
645 }
646 }),
647 VerifyMode::ExpectComponentFailure => json!(null),
648 };
649 assert_eq!(*result_payload, expected_payload);
650
651 assert_data_tree!(Arc::clone(&inspector_arc), root: {
654 test_archive_accessor_node: {
655 connections_closed: 0u64,
656 connections_opened: 0u64,
657 inspect: {
658 batch_iterator_connections: {},
659 batch_iterator: {
660 connections_closed: 1u64,
661 connections_opened: 1u64,
662 get_next: {
663 time_usec: AnyProperty,
664 requests: 2u64,
665 responses: 2u64,
666 result_count: 1u64,
667 result_errors: expected_get_next_result_errors,
668 }
669 },
670 component_timeouts_count: 0u64,
671 component_time_usec: AnyProperty,
672 reader_servers_constructed: 1u64,
673 reader_servers_destroyed: 1u64,
674 schema_truncation_count: 0u64,
675 max_snapshot_sizes_bytes: AnyProperty,
676 snapshot_schema_truncation_percentage: AnyProperty,
677 longest_processing_times: contains {
678 "test_component": contains {
679 "@time": AnyProperty,
680 duration_seconds: AnyProperty,
681 }
682 },
683 },
684 logs: {
685 batch_iterator_connections: {},
686 batch_iterator: {
687 connections_closed: 0u64,
688 connections_opened: 0u64,
689 get_next: {
690 requests: 0u64,
691 responses: 0u64,
692 result_count: 0u64,
693 result_errors: 0u64,
694 time_usec: AnyProperty,
695 }
696 },
697 component_timeouts_count: 0u64,
698 reader_servers_constructed: 0u64,
699 reader_servers_destroyed: 0u64,
700 max_snapshot_sizes_bytes: AnyProperty,
701 snapshot_schema_truncation_percentage: AnyProperty,
702 schema_truncation_count: 0u64,
703 },
704 stream_diagnostics_requests: 0u64,
705 },
706 });
707 }
708
709 let test_batch_iterator_stats2 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());
710
711 inspect_repo.terminate_inspect(identity);
712 {
713 let result_json = read_snapshot(
714 Arc::clone(&inspect_repo),
715 Arc::clone(&pipeline),
716 Arc::clone(&inspector_arc),
717 test_batch_iterator_stats2,
718 )
719 .await;
720
721 let result_array = result_json.as_array().expect("unit test json should be array.");
722 assert_eq!(result_array.len(), 0, "Expect no schemas to be returned.");
723
724 assert_data_tree!(Arc::clone(&inspector_arc), root: {
725 test_archive_accessor_node: {
726 connections_closed: 0u64,
727 connections_opened: 0u64,
728 inspect: {
729 batch_iterator_connections: {},
730 batch_iterator: {
731 connections_closed: 2u64,
732 connections_opened: 2u64,
733 get_next: {
734 time_usec: AnyProperty,
735 requests: 3u64,
736 responses: 3u64,
737 result_count: 1u64,
738 result_errors: expected_get_next_result_errors,
739 }
740 },
741 component_timeouts_count: 0u64,
742 component_time_usec: AnyProperty,
743 reader_servers_constructed: 2u64,
744 reader_servers_destroyed: 2u64,
745 schema_truncation_count: 0u64,
746 max_snapshot_sizes_bytes: AnyProperty,
747 snapshot_schema_truncation_percentage: AnyProperty,
748 longest_processing_times: contains {
749 "test_component": contains {
750 "@time": AnyProperty,
751 duration_seconds: AnyProperty,
752 }
753 },
754 },
755 logs: {
756 batch_iterator_connections: {},
757 batch_iterator: {
758 connections_closed: 0u64,
759 connections_opened: 0u64,
760 get_next: {
761 requests: 0u64,
762 responses: 0u64,
763 result_count: 0u64,
764 result_errors: 0u64,
765 time_usec: AnyProperty,
766 }
767 },
768 component_timeouts_count: 0u64,
769 reader_servers_constructed: 0u64,
770 reader_servers_destroyed: 0u64,
771 max_snapshot_sizes_bytes: AnyProperty,
772 snapshot_schema_truncation_percentage: AnyProperty,
773 schema_truncation_count: 0u64,
774 },
775 stream_diagnostics_requests: 0u64,
776 },
777 });
778 }
779 }
780
781 fn start_snapshot(
782 inspect_repo: Arc<InspectRepository>,
783 pipeline: Arc<Pipeline>,
784 stats: Arc<BatchIteratorConnectionStats>,
785 ) -> (BatchIteratorProxy, Task<()>) {
786 let test_performance_config = PerformanceConfig {
787 batch_timeout_sec: BATCH_RETRIEVAL_TIMEOUT_SECONDS,
788 aggregated_content_limit_bytes: None,
789 maximum_concurrent_snapshots_per_reader: 4,
790 };
791
792 let trace_id = ftrace::Id::random();
793 let static_hierarchy_allowlist = pipeline.static_hierarchy_allowlist();
794 let reader_server = ReaderServer::stream(
795 inspect_repo.fetch_inspect_data(&None, static_hierarchy_allowlist),
796 test_performance_config,
797 None,
799 Arc::clone(&stats),
800 trace_id,
801 );
802 let (consumer, batch_iterator_requests) = create_proxy_and_stream::<BatchIteratorMarker>();
803 (
804 consumer,
805 Task::spawn(async {
806 BatchIterator::new(
807 reader_server,
808 batch_iterator_requests.peekable(),
809 StreamMode::Snapshot,
810 stats,
811 None,
812 ftrace::Id::random(),
813 Format::Json,
814 )
815 .unwrap()
816 .run()
817 .await
818 .unwrap()
819 }),
820 )
821 }
822
823 async fn read_snapshot(
824 inspect_repo: Arc<InspectRepository>,
825 pipeline: Arc<Pipeline>,
826 _test_inspector: Arc<Inspector>,
827 stats: Arc<BatchIteratorConnectionStats>,
828 ) -> serde_json::Value {
829 let (consumer, server) = start_snapshot(inspect_repo, pipeline, stats);
830
831 let mut result_vec: Vec<String> = Vec::new();
832 loop {
833 let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> =
834 consumer.get_next().await.unwrap().unwrap();
835
836 if next_batch.is_empty() {
837 break;
838 }
839 for formatted_content in next_batch {
840 match formatted_content {
841 fidl_fuchsia_diagnostics::FormattedContent::Json(data) => {
842 let mut buf = vec![0; data.size as usize];
843 data.vmo.read(&mut buf, 0).expect("reading vmo");
844 let hierarchy_string = std::str::from_utf8(&buf).unwrap();
845 result_vec.push(hierarchy_string.to_string());
846 }
847 _ => panic!("test only produces json formatted data"),
848 }
849 }
850 }
851
852 drop(consumer);
854 server.await;
855
856 let result_string = format!("[{}]", result_vec.join(","));
857 serde_json::from_str(&result_string).unwrap_or_else(|_| {
858 panic!("unit tests shouldn't be creating malformed json: {result_string}")
859 })
860 }
861
862 async fn read_snapshot_verify_batch_count_and_batch_size(
863 inspect_repo: Arc<InspectRepository>,
864 pipeline: Arc<Pipeline>,
865 expected_batch_sizes: Vec<usize>,
866 stats: Arc<BatchIteratorConnectionStats>,
867 ) -> serde_json::Value {
868 let (consumer, server) = start_snapshot(inspect_repo, pipeline, stats);
869
870 let mut result_vec: Vec<String> = Vec::new();
871 let mut batch_counts = Vec::new();
872 loop {
873 let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> =
874 consumer.get_next().await.unwrap().unwrap();
875
876 if next_batch.is_empty() {
877 assert_eq!(expected_batch_sizes, batch_counts);
878 break;
879 }
880
881 batch_counts.push(next_batch.len());
882
883 for formatted_content in next_batch {
884 match formatted_content {
885 fidl_fuchsia_diagnostics::FormattedContent::Json(data) => {
886 let mut buf = vec![0; data.size as usize];
887 data.vmo.read(&mut buf, 0).expect("reading vmo");
888 let hierarchy_string = std::str::from_utf8(&buf).unwrap();
889 result_vec.push(hierarchy_string.to_string());
890 }
891 _ => panic!("test only produces json formatted data"),
892 }
893 }
894 }
895
896 drop(consumer);
898 server.await;
899
900 let result_string = format!("[{}]", result_vec.join(","));
901 serde_json::from_str(&result_string).unwrap_or_else(|_| {
902 panic!("unit tests shouldn't be creating malformed json: {result_string}")
903 })
904 }
905}