archivist_lib/inspect/mod.rs

// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::accessor::PerformanceConfig;
use crate::diagnostics::{BatchIteratorConnectionStats, TRACE_CATEGORY};
use crate::inspect::container::{ReadSnapshot, SnapshotData, UnpopulatedInspectDataContainer};
use crate::pipeline::{ComponentAllowlist, PrivacyExplicitOption};
use diagnostics_data::{self as schema, Data, Inspect, InspectDataBuilder, InspectHandleName};
use diagnostics_hierarchy::{DiagnosticsHierarchy, HierarchyMatcher};
use fidl_fuchsia_diagnostics::Selector;
use fidl_fuchsia_inspect::DEFAULT_TREE_NAME;
use fuchsia_inspect::reader::PartialNodeHierarchy;
use fuchsia_trace as ftrace;
use futures::prelude::*;
use log::error;
use selectors::SelectorExt;
use std::sync::Arc;

pub mod collector;
pub mod container;
pub mod repository;
pub mod servers;

use container::PopulatedInspectDataContainer;

/// Packet containing a node hierarchy and all the metadata needed to
/// populate a diagnostics schema for that node hierarchy.
#[derive(Debug)]
pub struct NodeHierarchyData {
    // Name of the file or handle that created this snapshot.
    name: Option<InspectHandleName>,
    // Timestamp at which this snapshot resolved or failed.
    timestamp: zx::BootInstant,
    // Errors encountered when processing this snapshot.
    errors: Vec<schema::InspectError>,
    // Optional DiagnosticsHierarchy of the inspect hierarchy, in case reading fails
    // and we have errors to share with the client.
    hierarchy: Option<DiagnosticsHierarchy>,
    // Whether or not this data comes from an escrowed VMO.
    escrowed: bool,
}
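
// For illustration only (hypothetical values): a successful snapshot of a
// handle published under the name "root" would convert into roughly
//
//   NodeHierarchyData {
//       name: Some(InspectHandleName::name("root")),
//       timestamp: /* boot instant at which the snapshot resolved */,
//       errors: vec![],
//       hierarchy: Some(parsed_hierarchy),
//       escrowed: false,
//   }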

impl From<SnapshotData> for NodeHierarchyData {
    fn from(data: SnapshotData) -> NodeHierarchyData {
        match data.snapshot {
            Some(snapshot) => match convert_snapshot_to_node_hierarchy(snapshot) {
                Ok(node_hierarchy) => {
                    let errors = data
                        .errors
                        .into_iter()
                        .chain(node_hierarchy.error_iter().filter_map(|(key, mv)| {
                            mv.map(|mv| schema::InspectError {
                                message: format!(
                                    "Lazy value {} could not be loaded: {:?}",
                                    key.join("/"),
                                    mv.reason
                                ),
                            })
                        }))
                        .collect();
                    NodeHierarchyData {
                        name: data.name,
                        timestamp: data.timestamp,
                        errors,
                        hierarchy: Some(node_hierarchy),
                        escrowed: data.escrowed,
                    }
                }
                Err(e) => NodeHierarchyData {
                    name: data.name,
                    timestamp: data.timestamp,
                    errors: vec![schema::InspectError { message: format!("{e:?}") }],
                    hierarchy: None,
                    escrowed: data.escrowed,
                },
            },
            None => NodeHierarchyData {
                name: data.name,
                timestamp: data.timestamp,
                errors: data.errors,
                hierarchy: None,
                escrowed: data.escrowed,
            },
        }
    }
}
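
// Note that a snapshot that parses successfully can still carry errors: each
// lazy value that failed to load is folded in as an InspectError (for a lazy
// node at `root/lazy`, the message reads "Lazy value root/lazy could not be
// loaded: <reason>") while the rest of the hierarchy is preserved.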

/// ReaderServer holds the state and data needed to serve Inspect data
/// reading requests for a single client.
pub struct ReaderServer {
    /// Selectors provided by the client, which define what inspect data is returned
    /// by read requests. `None` implies that all available data should be returned.
    selectors: Option<Vec<Selector>>,
}
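
// A minimal sketch of how a caller might build client selectors (mirroring the
// tests below); the moniker and node path here are hypothetical:
//
//   let selector = selectors::parse_selector::<selectors::VerboseError>(
//       "core/my_component:root/child_1:some-int",
//   ).unwrap();
//   let client_selectors = Some(vec![selector]);
//
// Passing `None` instead asks for every hierarchy the pipeline allows.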

fn convert_snapshot_to_node_hierarchy(
    snapshot: ReadSnapshot,
) -> Result<DiagnosticsHierarchy, fuchsia_inspect::reader::ReaderError> {
    match snapshot {
        ReadSnapshot::Single(snapshot) => Ok(PartialNodeHierarchy::try_from(snapshot)?.into()),
        ReadSnapshot::Tree(snapshot_tree) => snapshot_tree.try_into(),
        ReadSnapshot::Finished(hierarchy) => Ok(hierarchy),
    }
}
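
// The three ReadSnapshot variants reflect how a hierarchy reaches the reader:
// `Single` wraps a snapshot of one raw inspect VMO, `Tree` wraps a snapshot
// taken over the fuchsia.inspect.Tree protocol (which may include lazy
// children), and `Finished` carries an already-parsed hierarchy.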

impl ReaderServer {
    /// Create a stream of filtered inspect data, ready to serve.
    pub fn stream(
        unpopulated_diagnostics_sources: Vec<UnpopulatedInspectDataContainer>,
        performance_configuration: PerformanceConfig,
        selectors: Option<Vec<Selector>>,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = Data<Inspect>> + Send + 'static {
        let server = Arc::new(Self { selectors });

        let batch_timeout = performance_configuration.batch_timeout_sec;
        let maximum_concurrent_snapshots_per_reader =
            performance_configuration.maximum_concurrent_snapshots_per_reader;

        futures::stream::iter(unpopulated_diagnostics_sources)
            .map(move |unpopulated| {
                let global_stats = Arc::clone(stats.global_stats());
                unpopulated.populate(batch_timeout, global_stats, parent_trace_id)
            })
            .flatten()
            .map(future::ready)
            // buffer a small number in memory in case later components time out
            .buffer_unordered(maximum_concurrent_snapshots_per_reader as usize)
            // filter each component's inspect
            .filter_map(move |populated| {
                let server_clone = Arc::clone(&server);
                async move { server_clone.filter_snapshot(populated, parent_trace_id) }
            })
    }
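
    // A minimal usage sketch, mirroring `start_snapshot` in the tests below
    // (`inspect_repo`, `pipeline`, `config`, `stats`, and `trace_id` are assumed
    // to be in scope):
    //
    //   let data_stream = ReaderServer::stream(
    //       inspect_repo.fetch_inspect_data(&None, pipeline.static_hierarchy_allowlist()),
    //       config,
    //       None, // no client selectors: return everything the pipeline allows
    //       stats,
    //       trace_id,
    //   );
    //
    // The resulting stream is typically handed to a BatchIterator, which batches
    // and formats the data for the client.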

    fn filter_single_components_snapshot(
        snapshot_data: SnapshotData,
        static_allowlist: ComponentAllowlist,
        client_matcher: Option<HierarchyMatcher>,
        moniker: &str,
        parent_trace_id: ftrace::Id,
    ) -> Option<NodeHierarchyData> {
        let filename = snapshot_data.name.clone();
        let node_hierarchy_data = {
            let unfiltered_node_hierarchy_data: NodeHierarchyData = {
                let trace_id = ftrace::Id::random();
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"SnapshotData -> NodeHierarchyData",
                    // An async duration cannot have multiple concurrent child async durations,
                    // so we include the nonce as metadata to determine the relationship manually.
                    "parent_trace_id" => u64::from(parent_trace_id),
                    "trace_id" => u64::from(trace_id),
                    "moniker" => moniker,
                    "filename" => filename
                            .as_ref()
                            .and_then(InspectHandleName::as_filename)
                            .unwrap_or(""),
                    "name" => filename
                            .as_ref()
                            .and_then(InspectHandleName::as_name)
                            .unwrap_or("")
                );
                snapshot_data.into()
            };

            let handle_name = unfiltered_node_hierarchy_data
                .name
                .as_ref()
                .map(|name| name.as_ref())
                .unwrap_or(DEFAULT_TREE_NAME);
            match static_allowlist.matcher(handle_name) {
                PrivacyExplicitOption::Found(matcher) => {
                    let Some(node_hierarchy) = unfiltered_node_hierarchy_data.hierarchy else {
                        return Some(unfiltered_node_hierarchy_data);
                    };
                    let trace_id = ftrace::Id::random();
                    let _trace_guard = ftrace::async_enter!(
                        trace_id,
                        TRACE_CATEGORY,
                        c"ReaderServer::filter_single_components_snapshot.filter_hierarchy",
                        // An async duration cannot have multiple concurrent child async durations,
                        // so we include the nonce as metadata to determine the relationship manually.
                        "parent_trace_id" => u64::from(parent_trace_id),
                        "trace_id" => u64::from(trace_id),
                        "moniker" => moniker,
                        "filename" => unfiltered_node_hierarchy_data
                                .name
                                .as_ref()
                                .and_then(InspectHandleName::as_filename)
                                .unwrap_or(""),
                        "name" => unfiltered_node_hierarchy_data
                                .name
                                .as_ref()
                                .and_then(InspectHandleName::as_name)
                                .unwrap_or(""),
                        "selector_type" => "static"
                    );
                    let filtered_hierarchy =
                        diagnostics_hierarchy::filter_hierarchy(node_hierarchy, matcher)?;
                    NodeHierarchyData {
                        name: unfiltered_node_hierarchy_data.name,
                        timestamp: unfiltered_node_hierarchy_data.timestamp,
                        errors: unfiltered_node_hierarchy_data.errors,
                        hierarchy: Some(filtered_hierarchy),
                        escrowed: unfiltered_node_hierarchy_data.escrowed,
                    }
                }
                PrivacyExplicitOption::NotFound => return None,
                PrivacyExplicitOption::FilteringDisabled => unfiltered_node_hierarchy_data,
            }
        };

        let Some(dynamic_matcher) = client_matcher else {
            // No client matcher means the client either provided no selectors or none
            // of them applied to this component, so the statically filtered hierarchy
            // is returned as-is. When a matcher is present, a subset of the client's
            // selectors matched this component, and each snapshot must additionally be
            // filtered with those dynamically provided selectors.
            return Some(node_hierarchy_data);
        };
        let Some(node_hierarchy) = node_hierarchy_data.hierarchy else {
            return Some(node_hierarchy_data);
        };

        let trace_id = ftrace::Id::random();
        let _trace_guard = ftrace::async_enter!(
            trace_id,
            TRACE_CATEGORY,
            c"ReaderServer::filter_single_components_snapshot.filter_hierarchy",
            // An async duration cannot have multiple concurrent child async durations,
            // so we include the nonce as metadata to determine the relationship manually.
            "parent_trace_id" => u64::from(parent_trace_id),
            "trace_id" => u64::from(trace_id),
            "moniker" => moniker,
            "filename" => {
                node_hierarchy_data
                    .name
                    .as_ref()
                    .and_then(InspectHandleName::as_filename)
                    .unwrap_or("")
            },
            "name" => {
                node_hierarchy_data
                    .name
                    .as_ref()
                    .and_then(InspectHandleName::as_name)
                    .unwrap_or("")
            },
            "selector_type" => "client"
        );
        diagnostics_hierarchy::filter_hierarchy(node_hierarchy, &dynamic_matcher).map(
            |filtered_hierarchy| NodeHierarchyData {
                name: node_hierarchy_data.name,
                timestamp: node_hierarchy_data.timestamp,
                errors: node_hierarchy_data.errors,
                hierarchy: Some(filtered_hierarchy),
                escrowed: node_hierarchy_data.escrowed,
            },
        )
    }
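
    // The static privacy pass above resolves to one of three outcomes per handle
    // name: `Found(matcher)` filters with the static allowlist, `NotFound` drops
    // the snapshot entirely, and `FilteringDisabled` passes the hierarchy through.
    // The optional client matcher only runs afterwards, so client selectors can
    // narrow, but never widen, what the static allowlist permits.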

    /// Takes a PopulatedInspectDataContainer and converts all non-error
    /// results into in-memory node hierarchies. The hierarchies are filtered
    /// such that the only diagnostics properties they contain are those
    /// configured by the static and client-provided selectors.
    ///
    // TODO(https://fxbug.dev/42122598): Error entries should still be included, but with a custom hierarchy
    //             that makes it clear to clients that snapshotting failed.
    fn filter_snapshot(
        &self,
        pumped_inspect_data: PopulatedInspectDataContainer,
        parent_trace_id: ftrace::Id,
    ) -> Option<Data<Inspect>> {
        // Since a single PopulatedInspectDataContainer shares a moniker for all pieces of data it
        // contains, we can store the result of component selector filtering to avoid reapplying
        // the selectors.
        let mut client_selectors: Option<HierarchyMatcher> = None;

        // The snapshot taken from this component's inspect VMO is filtered using the
        // provided selectors, and the resulting hierarchy is packaged into a single
        // Data<Inspect> schema to be formatted and returned to the client.
        if let Some(configured_selectors) = &self.selectors {
            client_selectors = {
                let matching_selectors = pumped_inspect_data
                    .identity
                    .moniker
                    .match_against_selectors(configured_selectors.as_slice())
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap_or_else(|err| {
                        error!(
                            moniker:? = pumped_inspect_data.identity.moniker, err:?;
                            "Failed to evaluate client selectors",
                        );
                        Vec::new()
                    });

                if matching_selectors.is_empty() {
                    None
                } else {
                    match matching_selectors.try_into() {
                        Ok(hierarchy_matcher) => Some(hierarchy_matcher),
                        Err(e) => {
                            error!(e:?; "Failed to create hierarchy matcher");
                            None
                        }
                    }
                }
            };

            // If the client configured selectors and none of them matched this
            // component, return early since there is no data to extract.
            client_selectors.as_ref()?;
        }

        let identity = Arc::clone(&pumped_inspect_data.identity);

        let hierarchy_data = ReaderServer::filter_single_components_snapshot(
            pumped_inspect_data.snapshot,
            pumped_inspect_data.component_allowlist,
            client_selectors,
            identity.to_string().as_str(),
            parent_trace_id,
        )?;
        let mut builder = InspectDataBuilder::new(
            pumped_inspect_data.identity.moniker.clone(),
            identity.url.clone(),
            hierarchy_data.timestamp,
        );
        if let Some(hierarchy) = hierarchy_data.hierarchy {
            builder = builder.with_hierarchy(hierarchy);
        }
        if let Some(name) = hierarchy_data.name {
            builder = builder.with_name(name);
        }
        if !hierarchy_data.errors.is_empty() {
            builder = builder.with_errors(hierarchy_data.errors);
        }
        Some(builder.escrowed(hierarchy_data.escrowed).build())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::accessor::BatchIterator;
    use crate::diagnostics::AccessorStats;
    use crate::events::router::EventConsumer;
    use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
    use crate::identity::ComponentIdentity;
    use crate::inspect::container::InspectHandle;
    use crate::inspect::repository::InspectRepository;
    use crate::inspect::servers::InspectSinkServer;
    use crate::pipeline::Pipeline;
    use diagnostics_assertions::{assert_data_tree, AnyProperty};
    use fidl::endpoints::{create_proxy_and_stream, ClientEnd};
    use fidl_fuchsia_diagnostics::{BatchIteratorMarker, BatchIteratorProxy, Format, StreamMode};
    use fidl_fuchsia_inspect::{
        InspectSinkMarker, InspectSinkPublishRequest, TreeMarker, TreeProxy,
    };
    use fuchsia_async::{self as fasync, Task};
    use fuchsia_inspect::{Inspector, InspectorConfig};

    use futures::StreamExt;
    use inspect_runtime::{service, TreeServerSendPreference};
    use moniker::ExtendedMoniker;
    use selectors::VerboseError;
    use serde_json::json;
    use std::collections::HashMap;
    use test_case::test_case;
    use zx::Peered;

    const TEST_URL: &str = "fuchsia-pkg://test";
    const BATCH_RETRIEVAL_TIMEOUT_SECONDS: i64 = 300;

    #[fuchsia::test]
    async fn read_server_formatting_tree_inspect_sink() {
        let inspector = inspector_for_reader_test();
        let scope = fasync::Scope::new();
        let tree_client =
            service::spawn_tree_server(inspector, TreeServerSendPreference::default(), &scope);
        verify_reader(tree_client).await;
    }

    #[fuchsia::test]
    async fn reader_server_reports_errors() {
        // This inspector doesn't contain valid inspect data.
        let vmo = zx::Vmo::create(4096).unwrap();
        let inspector = Inspector::new(InspectorConfig::default().vmo(vmo));
        let scope = fasync::Scope::new();
        let tree_client =
            service::spawn_tree_server(inspector, TreeServerSendPreference::default(), &scope);
        let (done0, done1) = zx::Channel::create();
        // Run the actual test in a separate thread so that it does not block on FS operations.
        // Use signalling on a zx::Channel to indicate that the test is done.
        std::thread::spawn(move || {
            let done = done1;
            let mut executor = fasync::LocalExecutor::new();
            executor.run_singlethreaded(async {
                verify_reader_with_mode(tree_client, VerifyMode::ExpectComponentFailure).await;
                done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer");
            });
        });

        fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap();
    }

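    // Each test case below maps per-component handle counts to the expected batch
    // sizes: the BatchIterator packs at most 64 results into a single batch, so,
    // for example, a single component with 65 handles is split into batches of
    // 64 and 1.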
    #[test_case(vec![63, 65], vec![64, 64] ; "merge_errorful_component_into_next_batch")]
    #[test_case(vec![64, 65, 64, 64], vec![64, 64, 64, 64, 1] ; "errorful_component_doesnt_halt_iteration")]
    #[test_case(vec![65], vec![64, 1] ; "component_with_more_than_max_batch_size_is_split_in_two")]
    #[test_case(vec![1usize; 64], vec![64] ; "sixty_four_vmos_packed_into_one_batch")]
    #[test_case(vec![64, 63, 1], vec![64, 64] ; "max_batch_intact_two_batches_merged")]
    #[test_case(vec![33, 33, 33], vec![64, 35] ; "three_directories_two_batches")]
    #[fuchsia::test]
    async fn stress_test_diagnostics_repository(
        component_handle_counts: Vec<usize>,
        expected_batch_results: Vec<usize>,
    ) {
        let component_name_handle_counts: Vec<(String, usize)> = component_handle_counts
            .into_iter()
            .enumerate()
            .map(|(index, handle_count)| (format!("diagnostics_{index}"), handle_count))
            .collect();

        let inspector = inspector_for_reader_test();

        let mut clients = HashMap::<String, Vec<TreeProxy>>::new();
        let scope = fasync::Scope::new();
        for (component_name, handle_count) in component_name_handle_counts.clone() {
            for _ in 0..handle_count {
                let inspector_dup = Inspector::new(
                    InspectorConfig::default()
                        .vmo(inspector.duplicate_vmo().expect("failed to duplicate vmo")),
                );
                let client = service::spawn_tree_server(
                    inspector_dup,
                    TreeServerSendPreference::default(),
                    &scope,
                );
                clients.entry(component_name.clone()).or_default().push(client.into_proxy());
            }
        }

        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], fasync::Scope::new()));

        for (component, handles) in clients {
            let moniker = ExtendedMoniker::parse_str(&component).unwrap();
            let identity = Arc::new(ComponentIdentity::new(moniker, TEST_URL));
            for (i, handle) in handles.into_iter().enumerate() {
                inspect_repo.add_inspect_handle(
                    Arc::clone(&identity),
                    InspectHandle::tree(handle, Some(format!("tree_{i}"))),
                );
            }
        }

        let inspector = Inspector::default();
        let root = inspector.root();
        let test_archive_accessor_node = root.create_child("test_archive_accessor_node");

        let test_accessor_stats = Arc::new(AccessorStats::new(test_archive_accessor_node));
        let test_batch_iterator_stats1 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());

        let _result_json = read_snapshot_verify_batch_count_and_batch_size(
            Arc::clone(&inspect_repo),
            Arc::clone(&pipeline),
            expected_batch_results,
            test_batch_iterator_stats1,
        )
        .await;
    }

    fn inspector_for_reader_test() -> Inspector {
        let inspector = Inspector::default();
        let root = inspector.root();
        let child_1 = root.create_child("child_1");
        child_1.record_int("some-int", 2);
        let child_1_1 = child_1.create_child("child_1_1");
        child_1_1.record_int("some-int", 3);
        child_1_1.record_int("not-wanted-int", 4);
        root.record(child_1_1);
        root.record(child_1);
        let child_2 = root.create_child("child_2");
        child_2.record_int("some-int", 2);
        root.record(child_2);
        inspector
    }

    enum VerifyMode {
        ExpectSuccess,
        ExpectComponentFailure,
    }

    /// Verify that data can be read via InspectRepository, and that `AccessorStats` are updated
    /// accordingly.
    async fn verify_reader(tree_client: ClientEnd<TreeMarker>) {
        verify_reader_with_mode(tree_client, VerifyMode::ExpectSuccess).await;
    }

    async fn verify_reader_with_mode(tree_client: ClientEnd<TreeMarker>, mode: VerifyMode) {
        let child_1_1_selector =
            selectors::parse_selector::<VerboseError>(r#"*:root/child_1/*:some-int"#).unwrap();
        let child_2_selector =
            selectors::parse_selector::<VerboseError>(r#"test_component:root/child_2:*"#).unwrap();

        let static_selectors_opt = Some(vec![child_1_1_selector, child_2_selector]);

        let pipeline = Arc::new(Pipeline::for_test(static_selectors_opt));
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], fasync::Scope::new()));

        // The moniker here is made up since the selector is a glob
        // selector, so any path would match.
        let component_id = ExtendedMoniker::parse_str("./test_component").unwrap();
        let inspector = Inspector::default();
        let root = inspector.root();
        let test_archive_accessor_node = root.create_child("test_archive_accessor_node");

        assert_data_tree!(inspector, root: {test_archive_accessor_node: {}});

        let test_accessor_stats = Arc::new(AccessorStats::new(test_archive_accessor_node));

        let test_batch_iterator_stats1 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());

        assert_data_tree!(inspector, root: {
            test_archive_accessor_node: {
                connections_closed: 0u64,
                connections_opened: 0u64,
                inspect: {
                    batch_iterator_connections: {
                        "0": {
                            get_next: {
                                terminal_responses: 0u64,
                                responses: 0u64,
                                requests: 0u64,
                            }
                        }
                    },
                    batch_iterator: {
                        connections_closed: 0u64,
                        connections_opened: 0u64,
                        get_next: {
                            time_usec: AnyProperty,
                            requests: 0u64,
                            responses: 0u64,
                            result_count: 0u64,
                            result_errors: 0u64,
                        }
                    },
                    component_timeouts_count: 0u64,
                    reader_servers_constructed: 1u64,
                    reader_servers_destroyed: 0u64,
                    schema_truncation_count: 0u64,
                    max_snapshot_sizes_bytes: AnyProperty,
                    snapshot_schema_truncation_percentage: AnyProperty,
                },
                logs: {
                    batch_iterator_connections: {},
                    batch_iterator: {
                        connections_closed: 0u64,
                        connections_opened: 0u64,
                        get_next: {
                            requests: 0u64,
                            responses: 0u64,
                            result_count: 0u64,
                            result_errors: 0u64,
                            time_usec: AnyProperty,
                        }
                    },
                    component_timeouts_count: 0u64,
                    reader_servers_constructed: 0u64,
                    reader_servers_destroyed: 0u64,
                    max_snapshot_sizes_bytes: AnyProperty,
                    snapshot_schema_truncation_percentage: AnyProperty,
                    schema_truncation_count: 0u64,
                },
                stream_diagnostics_requests: 0u64,
            },
        });

        let inspector_arc = Arc::new(inspector);

        let identity = Arc::new(ComponentIdentity::new(component_id, TEST_URL));

        let (proxy, request_stream) = create_proxy_and_stream::<InspectSinkMarker>();
        proxy
            .publish(InspectSinkPublishRequest { tree: Some(tree_client), ..Default::default() })
            .unwrap();

        let scope = fasync::Scope::new();
        let inspect_sink_server =
            Arc::new(InspectSinkServer::new(Arc::clone(&inspect_repo), scope.new_child()));
        Arc::clone(&inspect_sink_server).handle(Event {
            timestamp: zx::BootInstant::get(),
            payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
                component: Arc::clone(&identity),
                request_stream,
            }),
        });

        drop(proxy);

        scope.close().await;

        let expected_get_next_result_errors = match mode {
            VerifyMode::ExpectComponentFailure => 1u64,
            _ => 0u64,
        };

        {
            let result_json = read_snapshot(
                Arc::clone(&inspect_repo),
                Arc::clone(&pipeline),
                Arc::clone(&inspector_arc),
                test_batch_iterator_stats1,
            )
            .await;

            let result_array = result_json.as_array().expect("unit test json should be array.");
            assert_eq!(result_array.len(), 1, "Expect only one schema to be returned.");

            let result_map =
                result_array[0].as_object().expect("entries in the schema array are json objects.");

            let result_payload =
                result_map.get("payload").expect("diagnostics schema requires payload entry.");

            let expected_payload = match mode {
                VerifyMode::ExpectSuccess => json!({
                    "root": {
                        "child_1": {
                            "child_1_1": {
                                "some-int": 3
                            }
                        },
                        "child_2": {
                            "some-int": 2
                        }
                    }
                }),
                VerifyMode::ExpectComponentFailure => json!(null),
            };
            assert_eq!(*result_payload, expected_payload);

            // stream_diagnostics_requests is 0 since it's tracked via the archive_accessor
            // server, which isn't running in this unit test.
            assert_data_tree!(Arc::clone(&inspector_arc), root: {
                test_archive_accessor_node: {
                    connections_closed: 0u64,
                    connections_opened: 0u64,
                    inspect: {
                        batch_iterator_connections: {},
                        batch_iterator: {
                            connections_closed: 1u64,
                            connections_opened: 1u64,
                            get_next: {
                                time_usec: AnyProperty,
                                requests: 2u64,
                                responses: 2u64,
                                result_count: 1u64,
                                result_errors: expected_get_next_result_errors,
                            }
                        },
                        component_timeouts_count: 0u64,
                        component_time_usec: AnyProperty,
                        reader_servers_constructed: 1u64,
                        reader_servers_destroyed: 1u64,
                        schema_truncation_count: 0u64,
                        max_snapshot_sizes_bytes: AnyProperty,
                        snapshot_schema_truncation_percentage: AnyProperty,
                        longest_processing_times: contains {
                            "test_component": contains {
                                "@time": AnyProperty,
                                duration_seconds: AnyProperty,
                            }
                        },
                    },
                    logs: {
                        batch_iterator_connections: {},
                        batch_iterator: {
                            connections_closed: 0u64,
                            connections_opened: 0u64,
                            get_next: {
                                requests: 0u64,
                                responses: 0u64,
                                result_count: 0u64,
                                result_errors: 0u64,
                                time_usec: AnyProperty,
                            }
                        },
                        component_timeouts_count: 0u64,
                        reader_servers_constructed: 0u64,
                        reader_servers_destroyed: 0u64,
                        max_snapshot_sizes_bytes: AnyProperty,
                        snapshot_schema_truncation_percentage: AnyProperty,
                        schema_truncation_count: 0u64,
                    },
                    stream_diagnostics_requests: 0u64,
                },
            });
        }

        let test_batch_iterator_stats2 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());

        inspect_repo.terminate_inspect(identity);
        {
            let result_json = read_snapshot(
                Arc::clone(&inspect_repo),
                Arc::clone(&pipeline),
                Arc::clone(&inspector_arc),
                test_batch_iterator_stats2,
            )
            .await;

            let result_array = result_json.as_array().expect("unit test json should be array.");
            assert_eq!(result_array.len(), 0, "Expect no schemas to be returned.");

            assert_data_tree!(Arc::clone(&inspector_arc), root: {
                test_archive_accessor_node: {
                    connections_closed: 0u64,
                    connections_opened: 0u64,
                    inspect: {
                        batch_iterator_connections: {},
                        batch_iterator: {
                            connections_closed: 2u64,
                            connections_opened: 2u64,
                            get_next: {
                                time_usec: AnyProperty,
                                requests: 3u64,
                                responses: 3u64,
                                result_count: 1u64,
                                result_errors: expected_get_next_result_errors,
                            }
                        },
                        component_timeouts_count: 0u64,
                        component_time_usec: AnyProperty,
                        reader_servers_constructed: 2u64,
                        reader_servers_destroyed: 2u64,
                        schema_truncation_count: 0u64,
                        max_snapshot_sizes_bytes: AnyProperty,
                        snapshot_schema_truncation_percentage: AnyProperty,
                        longest_processing_times: contains {
                            "test_component": contains {
                                "@time": AnyProperty,
                                duration_seconds: AnyProperty,
                            }
                        },
                    },
                    logs: {
                        batch_iterator_connections: {},
                        batch_iterator: {
                            connections_closed: 0u64,
                            connections_opened: 0u64,
                            get_next: {
                                requests: 0u64,
                                responses: 0u64,
                                result_count: 0u64,
                                result_errors: 0u64,
                                time_usec: AnyProperty,
                            }
                        },
                        component_timeouts_count: 0u64,
                        reader_servers_constructed: 0u64,
                        reader_servers_destroyed: 0u64,
                        max_snapshot_sizes_bytes: AnyProperty,
                        snapshot_schema_truncation_percentage: AnyProperty,
                        schema_truncation_count: 0u64,
                    },
                    stream_diagnostics_requests: 0u64,
                },
            });
        }
    }

    fn start_snapshot(
        inspect_repo: Arc<InspectRepository>,
        pipeline: Arc<Pipeline>,
        stats: Arc<BatchIteratorConnectionStats>,
    ) -> (BatchIteratorProxy, Task<()>) {
        let test_performance_config = PerformanceConfig {
            batch_timeout_sec: BATCH_RETRIEVAL_TIMEOUT_SECONDS,
            aggregated_content_limit_bytes: None,
            maximum_concurrent_snapshots_per_reader: 4,
        };

        let trace_id = ftrace::Id::random();
        let static_hierarchy_allowlist = pipeline.static_hierarchy_allowlist();
        let reader_server = ReaderServer::stream(
            inspect_repo.fetch_inspect_data(&None, static_hierarchy_allowlist),
            test_performance_config,
            // No selectors
            None,
            Arc::clone(&stats),
            trace_id,
        );
        let (consumer, batch_iterator_requests) = create_proxy_and_stream::<BatchIteratorMarker>();
        (
            consumer,
            Task::spawn(async {
                BatchIterator::new(
                    reader_server,
                    batch_iterator_requests.peekable(),
                    StreamMode::Snapshot,
                    stats,
                    None,
                    ftrace::Id::random(),
                    Format::Json,
                )
                .unwrap()
                .run()
                .await
                .unwrap()
            }),
        )
    }

    async fn read_snapshot(
        inspect_repo: Arc<InspectRepository>,
        pipeline: Arc<Pipeline>,
        _test_inspector: Arc<Inspector>,
        stats: Arc<BatchIteratorConnectionStats>,
    ) -> serde_json::Value {
        let (consumer, server) = start_snapshot(inspect_repo, pipeline, stats);

        let mut result_vec: Vec<String> = Vec::new();
        loop {
            let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> =
                consumer.get_next().await.unwrap().unwrap();

            if next_batch.is_empty() {
                break;
            }
            for formatted_content in next_batch {
                match formatted_content {
                    fidl_fuchsia_diagnostics::FormattedContent::Json(data) => {
                        let mut buf = vec![0; data.size as usize];
                        data.vmo.read(&mut buf, 0).expect("reading vmo");
                        let hierarchy_string = std::str::from_utf8(&buf).unwrap();
                        result_vec.push(hierarchy_string.to_string());
                    }
                    _ => panic!("test only produces json formatted data"),
                }
            }
        }

        // Ensure the connection is marked as closed, then wait for the stream to terminate.
        drop(consumer);
        server.await;

        let result_string = format!("[{}]", result_vec.join(","));
        serde_json::from_str(&result_string).unwrap_or_else(|_| {
            panic!("unit tests shouldn't be creating malformed json: {result_string}")
        })
    }

    async fn read_snapshot_verify_batch_count_and_batch_size(
        inspect_repo: Arc<InspectRepository>,
        pipeline: Arc<Pipeline>,
        expected_batch_sizes: Vec<usize>,
        stats: Arc<BatchIteratorConnectionStats>,
    ) -> serde_json::Value {
        let (consumer, server) = start_snapshot(inspect_repo, pipeline, stats);

        let mut result_vec: Vec<String> = Vec::new();
        let mut batch_counts = Vec::new();
        loop {
            let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> =
                consumer.get_next().await.unwrap().unwrap();

            if next_batch.is_empty() {
                assert_eq!(expected_batch_sizes, batch_counts);
                break;
            }

            batch_counts.push(next_batch.len());

            for formatted_content in next_batch {
                match formatted_content {
                    fidl_fuchsia_diagnostics::FormattedContent::Json(data) => {
                        let mut buf = vec![0; data.size as usize];
                        data.vmo.read(&mut buf, 0).expect("reading vmo");
                        let hierarchy_string = std::str::from_utf8(&buf).unwrap();
                        result_vec.push(hierarchy_string.to_string());
                    }
                    _ => panic!("test only produces json formatted data"),
                }
            }
        }

        // Ensure the connection is marked as closed, then wait for the stream to terminate.
        drop(consumer);
        server.await;

        let result_string = format!("[{}]", result_vec.join(","));
        serde_json::from_str(&result_string).unwrap_or_else(|_| {
            panic!("unit tests shouldn't be creating malformed json: {result_string}")
        })
    }
}