archivist_lib/inspect/mod.rs

// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::accessor::PerformanceConfig;
use crate::diagnostics::{BatchIteratorConnectionStats, TRACE_CATEGORY};
use crate::inspect::container::{ReadSnapshot, SnapshotData, UnpopulatedInspectDataContainer};
use crate::pipeline::{ComponentAllowlist, PrivacyExplicitOption};
use diagnostics_data::{self as schema, Data, Inspect, InspectDataBuilder, InspectHandleName};
use diagnostics_hierarchy::{DiagnosticsHierarchy, HierarchyMatcher};
use fidl_fuchsia_diagnostics::Selector;
use fidl_fuchsia_inspect::DEFAULT_TREE_NAME;
use fuchsia_inspect::reader::PartialNodeHierarchy;
use fuchsia_trace as ftrace;
use futures::prelude::*;
use log::error;
use selectors::SelectorExt;
use std::sync::Arc;

pub mod collector;
pub mod container;
pub mod repository;
pub mod servers;

use container::PopulatedInspectDataContainer;

/// Packet containing a node hierarchy and all the metadata needed to
/// populate a diagnostics schema for that node hierarchy.
pub struct NodeHierarchyData {
    // Name of the inspect handle (file or tree name) that produced this snapshot.
    name: Option<InspectHandleName>,
    // Timestamp at which this snapshot resolved or failed.
    timestamp: zx::BootInstant,
    // Errors encountered when processing this snapshot.
    errors: Vec<schema::InspectError>,
    // The parsed inspect hierarchy; `None` when reading failed, in which case `errors`
    // describes what went wrong.
    hierarchy: Option<DiagnosticsHierarchy>,
    // Whether or not this data comes from an escrowed VMO.
    escrowed: bool,
}

impl From<SnapshotData> for NodeHierarchyData {
    fn from(data: SnapshotData) -> NodeHierarchyData {
        match data.snapshot {
            Some(snapshot) => match convert_snapshot_to_node_hierarchy(snapshot) {
                Ok(node_hierarchy) => NodeHierarchyData {
                    name: data.name,
                    timestamp: data.timestamp,
                    errors: data.errors,
                    hierarchy: Some(node_hierarchy),
                    escrowed: data.escrowed,
                },
                Err(e) => NodeHierarchyData {
                    name: data.name,
                    timestamp: data.timestamp,
                    errors: vec![schema::InspectError { message: format!("{e:?}") }],
                    hierarchy: None,
                    escrowed: data.escrowed,
                },
            },
            None => NodeHierarchyData {
                name: data.name,
                timestamp: data.timestamp,
                errors: data.errors,
                hierarchy: None,
                escrowed: data.escrowed,
            },
        }
    }
}

/// ReaderServer holds the state and data needed to serve Inspect data
/// reading requests for a single client.
pub struct ReaderServer {
    /// Selectors provided by the client which define what inspect data is returned by read
    /// requests. `None` implies that all available data should be returned.
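    ///
    /// The tests below parse example selector strings such as
    /// `*:root/child_1/*:some-int`.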
    selectors: Option<Vec<Selector>>,
}

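/// Converts a `ReadSnapshot` into a `DiagnosticsHierarchy`: the single-VMO and Tree
/// variants are parsed from their raw snapshots, while an already-finished hierarchy is
/// passed through unchanged.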
fn convert_snapshot_to_node_hierarchy(
    snapshot: ReadSnapshot,
) -> Result<DiagnosticsHierarchy, fuchsia_inspect::reader::ReaderError> {
    match snapshot {
        ReadSnapshot::Single(snapshot) => Ok(PartialNodeHierarchy::try_from(snapshot)?.into()),
        ReadSnapshot::Tree(snapshot_tree) => snapshot_tree.try_into(),
        ReadSnapshot::Finished(hierarchy) => Ok(hierarchy),
    }
}

impl ReaderServer {
    /// Create a stream of filtered inspect data, ready to serve.
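    ///
    /// A minimal usage sketch, mirroring the `start_snapshot` test helper below
    /// (`repo`, `pipeline`, `config`, and `stats` are placeholders assumed to exist in
    /// the caller):
    ///
    /// ```ignore
    /// let data_stream = ReaderServer::stream(
    ///     repo.fetch_inspect_data(&None, pipeline.static_hierarchy_allowlist()),
    ///     config,
    ///     None, // no client selectors: serve everything the static allowlist permits
    ///     Arc::clone(&stats),
    ///     ftrace::Id::random(),
    /// );
    /// ```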
    pub fn stream(
        unpopulated_diagnostics_sources: Vec<UnpopulatedInspectDataContainer>,
        performance_configuration: PerformanceConfig,
        selectors: Option<Vec<Selector>>,
        stats: Arc<BatchIteratorConnectionStats>,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = Data<Inspect>> + Send + 'static {
        let server = Arc::new(Self { selectors });

        let batch_timeout = performance_configuration.batch_timeout_sec;
        let maximum_concurrent_snapshots_per_reader =
            performance_configuration.maximum_concurrent_snapshots_per_reader;

        futures::stream::iter(unpopulated_diagnostics_sources)
            .map(move |unpopulated| {
                let global_stats = Arc::clone(stats.global_stats());
                unpopulated.populate(batch_timeout, global_stats, parent_trace_id)
            })
            .flatten()
            .map(future::ready)
            // buffer a small number in memory in case later components time out
            .buffer_unordered(maximum_concurrent_snapshots_per_reader as usize)
            // filter each component's inspect
            .filter_map(move |populated| {
                let server_clone = Arc::clone(&server);
                async move { server_clone.filter_snapshot(populated, parent_trace_id) }
            })
    }

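    /// Filters a single component's snapshot: first against the statically configured
    /// `ComponentAllowlist` for the matching tree name, then (if one was provided)
    /// against the client's `HierarchyMatcher`. Returns `None` when the snapshot is
    /// filtered out entirely, e.g. when the tree name is not present in the static
    /// allowlist or no property survives filtering.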
    fn filter_single_components_snapshot(
        snapshot_data: SnapshotData,
        static_allowlist: ComponentAllowlist,
        client_matcher: Option<HierarchyMatcher>,
        moniker: &str,
        parent_trace_id: ftrace::Id,
    ) -> Option<NodeHierarchyData> {
        let filename = snapshot_data.name.clone();
        let node_hierarchy_data = {
            let unfiltered_node_hierarchy_data: NodeHierarchyData = {
                let trace_id = ftrace::Id::random();
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"SnapshotData -> NodeHierarchyData",
                    // An async duration cannot have multiple concurrent child async durations
                    // so we include the nonce as metadata to manually determine the relationship.
                    "parent_trace_id" => u64::from(parent_trace_id),
                    "trace_id" => u64::from(trace_id),
                    "moniker" => moniker,
                    "filename" => filename
                            .as_ref()
                            .and_then(InspectHandleName::as_filename)
                            .unwrap_or(""),
                    "name" => filename
                            .as_ref()
                            .and_then(InspectHandleName::as_name)
                            .unwrap_or("")
                );
                snapshot_data.into()
            };

            let handle_name = unfiltered_node_hierarchy_data
                .name
                .as_ref()
                .map(|name| name.as_ref())
                .unwrap_or(DEFAULT_TREE_NAME);
            match static_allowlist.matcher(handle_name) {
                PrivacyExplicitOption::Found(matcher) => {
                    let Some(node_hierarchy) = unfiltered_node_hierarchy_data.hierarchy else {
                        return Some(unfiltered_node_hierarchy_data);
                    };
                    let trace_id = ftrace::Id::random();
                    let _trace_guard = ftrace::async_enter!(
                        trace_id,
                        TRACE_CATEGORY,
                        c"ReaderServer::filter_single_components_snapshot.filter_hierarchy",
                        // An async duration cannot have multiple concurrent child async durations
                        // so we include the nonce as metadata to manually determine the relationship.
                        "parent_trace_id" => u64::from(parent_trace_id),
                        "trace_id" => u64::from(trace_id),
                        "moniker" => moniker,
                        "filename" => unfiltered_node_hierarchy_data
                                .name
                                .as_ref()
                                .and_then(InspectHandleName::as_filename)
                                .unwrap_or(""),
                        "name" => unfiltered_node_hierarchy_data
                                .name
                                .as_ref()
                                .and_then(InspectHandleName::as_name)
                                .unwrap_or(""),
                        "selector_type" => "static"
                    );
                    let filtered_hierarchy =
                        diagnostics_hierarchy::filter_hierarchy(node_hierarchy, matcher)?;
                    NodeHierarchyData {
                        name: unfiltered_node_hierarchy_data.name,
                        timestamp: unfiltered_node_hierarchy_data.timestamp,
                        errors: unfiltered_node_hierarchy_data.errors,
                        hierarchy: Some(filtered_hierarchy),
                        escrowed: unfiltered_node_hierarchy_data.escrowed,
                    }
                }
                PrivacyExplicitOption::NotFound => return None,
                PrivacyExplicitOption::FilteringDisabled => unfiltered_node_hierarchy_data,
            }
        };

        let Some(dynamic_matcher) = client_matcher else {
            // No client matcher was provided, so the statically filtered hierarchy is
            // returned as-is. When a matcher is present, the client's selectors matched
            // this component, and each snapshot from this component must also be filtered
            // with them (below).
            return Some(node_hierarchy_data);
        };
        let Some(node_hierarchy) = node_hierarchy_data.hierarchy else {
            return Some(node_hierarchy_data);
        };

        let trace_id = ftrace::Id::random();
        let _trace_guard = ftrace::async_enter!(
            trace_id,
            TRACE_CATEGORY,
            c"ReaderServer::filter_single_components_snapshot.filter_hierarchy",
            // An async duration cannot have multiple concurrent child async durations
            // so we include the nonce as metadata to manually determine the relationship.
            "parent_trace_id" => u64::from(parent_trace_id),
            "trace_id" => u64::from(trace_id),
            "moniker" => moniker,
            "filename" => {
                node_hierarchy_data
                    .name
                    .as_ref()
                    .and_then(InspectHandleName::as_filename)
                    .unwrap_or("")
            },
            "name" => {
                node_hierarchy_data
                    .name
                    .as_ref()
                    .and_then(InspectHandleName::as_name)
                    .unwrap_or("")
            },
            "selector_type" => "client"
        );
        diagnostics_hierarchy::filter_hierarchy(node_hierarchy, &dynamic_matcher).map(
            |filtered_hierarchy| NodeHierarchyData {
                name: node_hierarchy_data.name,
                timestamp: node_hierarchy_data.timestamp,
                errors: node_hierarchy_data.errors,
                hierarchy: Some(filtered_hierarchy),
                escrowed: node_hierarchy_data.escrowed,
            },
        )
    }

    /// Takes a PopulatedInspectDataContainer and converts all non-error
    /// results into in-memory node hierarchies. The hierarchies are filtered
    /// such that the only diagnostics properties they contain are those
    /// configured by the static and client-provided selectors.
    ///
    // TODO(https://fxbug.dev/42122598): Error entries should still be included, but with a custom hierarchy
    //             that makes it clear to clients that snapshotting failed.
    fn filter_snapshot(
        &self,
        pumped_inspect_data: PopulatedInspectDataContainer,
        parent_trace_id: ftrace::Id,
    ) -> Option<Data<Inspect>> {
        // Since a single PopulatedInspectDataContainer shares a moniker for all pieces of data it
        // contains, we can store the result of component selector filtering to avoid reapplying
        // the selectors.
        let mut client_selectors: Option<HierarchyMatcher> = None;

        // Evaluate the client-provided selectors against this component's moniker. If any
        // selector matches, build a HierarchyMatcher from the matching subset; it is used
        // below to filter every snapshot taken from this component.
        if let Some(configured_selectors) = &self.selectors {
            client_selectors = {
                let matching_selectors = pumped_inspect_data
                    .identity
                    .moniker
                    .match_against_selectors(configured_selectors.as_slice())
                    .collect::<Result<Vec<_>, _>>()
                    .unwrap_or_else(|err| {
                        error!(
                            moniker:? = pumped_inspect_data.identity.moniker, err:?;
                            "Failed to evaluate client selectors",
                        );
                        Vec::new()
                    });

                if matching_selectors.is_empty() {
                    None
                } else {
                    match matching_selectors.try_into() {
                        Ok(hierarchy_matcher) => Some(hierarchy_matcher),
                        Err(e) => {
                            error!(e:?; "Failed to create hierarchy matcher");
                            None
                        }
                    }
                }
            };

            // If there were configured matchers and none of them matched
            // this component, then we should return early since there is no data to
            // extract.
            client_selectors.as_ref()?;
        }

        let identity = Arc::clone(&pumped_inspect_data.identity);

        let hierarchy_data = ReaderServer::filter_single_components_snapshot(
            pumped_inspect_data.snapshot,
            pumped_inspect_data.component_allowlist,
            client_selectors,
            identity.to_string().as_str(),
            parent_trace_id,
        )?;
        let mut builder = InspectDataBuilder::new(
            pumped_inspect_data.identity.moniker.clone(),
            identity.url.clone(),
            hierarchy_data.timestamp,
        );
        if let Some(hierarchy) = hierarchy_data.hierarchy {
            builder = builder.with_hierarchy(hierarchy);
        }
        if let Some(name) = hierarchy_data.name {
            builder = builder.with_name(name);
        }
        if !hierarchy_data.errors.is_empty() {
            builder = builder.with_errors(hierarchy_data.errors);
        }
        Some(builder.escrowed(hierarchy_data.escrowed).build())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::accessor::BatchIterator;
    use crate::diagnostics::AccessorStats;
    use crate::events::router::EventConsumer;
    use crate::events::types::{Event, EventPayload, InspectSinkRequestedPayload};
    use crate::identity::ComponentIdentity;
    use crate::inspect::container::InspectHandle;
    use crate::inspect::repository::InspectRepository;
    use crate::inspect::servers::InspectSinkServer;
    use crate::pipeline::Pipeline;
    use diagnostics_assertions::{assert_data_tree, AnyProperty};
    use fidl::endpoints::{create_proxy_and_stream, ClientEnd};
    use fidl_fuchsia_diagnostics::{BatchIteratorMarker, BatchIteratorProxy, Format, StreamMode};
    use fidl_fuchsia_inspect::{
        InspectSinkMarker, InspectSinkPublishRequest, TreeMarker, TreeProxy,
    };
    use fuchsia_async::{self as fasync, Task};
    use fuchsia_inspect::{Inspector, InspectorConfig};

    use futures::StreamExt;
    use inspect_runtime::{service, TreeServerSendPreference};
    use moniker::ExtendedMoniker;
    use selectors::VerboseError;
    use serde_json::json;
    use std::collections::HashMap;
    use test_case::test_case;
    use zx::Peered;

    const TEST_URL: &str = "fuchsia-pkg://test";
    const BATCH_RETRIEVAL_TIMEOUT_SECONDS: i64 = 300;

    #[fuchsia::test]
    async fn read_server_formatting_tree_inspect_sink() {
        let inspector = inspector_for_reader_test();
        let scope = fasync::Scope::new();
        let tree_client =
            service::spawn_tree_server(inspector, TreeServerSendPreference::default(), &scope);
        verify_reader(tree_client).await;
    }

    #[fuchsia::test]
    async fn reader_server_reports_errors() {
        // This inspector doesn't contain valid inspect data.
        let vmo = zx::Vmo::create(4096).unwrap();
        let inspector = Inspector::new(InspectorConfig::default().vmo(vmo));
        let scope = fasync::Scope::new();
        let tree_client =
            service::spawn_tree_server(inspector, TreeServerSendPreference::default(), &scope);
        let (done0, done1) = zx::Channel::create();
        // Run the actual test in a separate thread so that it does not block on FS operations.
        // Use signalling on a zx::Channel to indicate that the test is done.
        std::thread::spawn(move || {
            let done = done1;
            let mut executor = fasync::LocalExecutor::new();
            executor.run_singlethreaded(async {
                verify_reader_with_mode(tree_client, VerifyMode::ExpectComponentFailure).await;
                done.signal_peer(zx::Signals::NONE, zx::Signals::USER_0).expect("signalling peer");
            });
        });

        fasync::OnSignals::new(&done0, zx::Signals::USER_0).await.unwrap();
    }

    #[test_case(vec![63, 65], vec![64, 64] ; "merge_errorful_component_into_next_batch")]
    #[test_case(vec![64, 65, 64, 64], vec![64, 64, 64, 64, 1] ; "errorful_component_doesnt_halt_iteration")]
    #[test_case(vec![65], vec![64, 1] ; "component_with_more_than_max_batch_size_is_split_in_two")]
    #[test_case(vec![1usize; 64], vec![64] ; "sixty_four_vmos_packed_into_one_batch")]
    #[test_case(vec![64, 63, 1], vec![64, 64] ; "max_batch_intact_two_batches_merged")]
    #[test_case(vec![33, 33, 33], vec![64, 35] ; "three_directories_two_batches")]
    #[fuchsia::test]
    async fn stress_test_diagnostics_repository(
        component_handle_counts: Vec<usize>,
        expected_batch_results: Vec<usize>,
    ) {
        let component_name_handle_counts: Vec<(String, usize)> = component_handle_counts
            .into_iter()
            .enumerate()
            .map(|(index, handle_count)| (format!("diagnostics_{index}"), handle_count))
            .collect();

        let inspector = inspector_for_reader_test();

        let mut clients = HashMap::<String, Vec<TreeProxy>>::new();
        let scope = fasync::Scope::new();
        for (component_name, handle_count) in component_name_handle_counts.clone() {
            for _ in 0..handle_count {
                let inspector_dup = Inspector::new(
                    InspectorConfig::default()
                        .vmo(inspector.duplicate_vmo().expect("failed to duplicate vmo")),
                );
                let client = service::spawn_tree_server(
                    inspector_dup,
                    TreeServerSendPreference::default(),
                    &scope,
                );
                clients.entry(component_name.clone()).or_default().push(client.into_proxy());
            }
        }

        let pipeline = Arc::new(Pipeline::for_test(None));
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], fasync::Scope::new()));

        for (component, handles) in clients {
            let moniker = ExtendedMoniker::parse_str(&component).unwrap();
            let identity = Arc::new(ComponentIdentity::new(moniker, TEST_URL));
            for (i, handle) in handles.into_iter().enumerate() {
                inspect_repo.add_inspect_handle(
                    Arc::clone(&identity),
                    InspectHandle::tree(handle, Some(format!("tree_{i}"))),
                );
            }
        }

        let inspector = Inspector::default();
        let root = inspector.root();
        let test_archive_accessor_node = root.create_child("test_archive_accessor_node");

        let test_accessor_stats = Arc::new(AccessorStats::new(test_archive_accessor_node));
        let test_batch_iterator_stats1 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());

        let _result_json = read_snapshot_verify_batch_count_and_batch_size(
            Arc::clone(&inspect_repo),
            Arc::clone(&pipeline),
            expected_batch_results,
            test_batch_iterator_stats1,
        )
        .await;
    }

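    /// Builds an Inspector whose hierarchy exercises both the wildcard
    /// (`*:root/child_1/*:some-int`) and exact-match (`test_component:root/child_2:*`)
    /// static selectors used by `verify_reader_with_mode`.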
    fn inspector_for_reader_test() -> Inspector {
        let inspector = Inspector::default();
        let root = inspector.root();
        let child_1 = root.create_child("child_1");
        child_1.record_int("some-int", 2);
        let child_1_1 = child_1.create_child("child_1_1");
        child_1_1.record_int("some-int", 3);
        child_1_1.record_int("not-wanted-int", 4);
        root.record(child_1_1);
        root.record(child_1);
        let child_2 = root.create_child("child_2");
        child_2.record_int("some-int", 2);
        root.record(child_2);
        inspector
    }

    enum VerifyMode {
        ExpectSuccess,
        ExpectComponentFailure,
    }

    /// Verify that data can be read via InspectRepository, and that `AccessorStats` are updated
    /// accordingly.
    async fn verify_reader(tree_client: ClientEnd<TreeMarker>) {
        verify_reader_with_mode(tree_client, VerifyMode::ExpectSuccess).await;
    }

    async fn verify_reader_with_mode(tree_client: ClientEnd<TreeMarker>, mode: VerifyMode) {
        let child_1_1_selector =
            selectors::parse_selector::<VerboseError>(r#"*:root/child_1/*:some-int"#).unwrap();
        let child_2_selector =
            selectors::parse_selector::<VerboseError>(r#"test_component:root/child_2:*"#).unwrap();

        let static_selectors_opt = Some(vec![child_1_1_selector, child_2_selector]);

        let pipeline = Arc::new(Pipeline::for_test(static_selectors_opt));
        let inspect_repo =
            Arc::new(InspectRepository::new(vec![Arc::downgrade(&pipeline)], fasync::Scope::new()));

        // The moniker here is made up since the first selector is a glob
        // selector, so any moniker would match it.
        let component_id = ExtendedMoniker::parse_str("./test_component").unwrap();
        let inspector = Inspector::default();
        let root = inspector.root();
        let test_archive_accessor_node = root.create_child("test_archive_accessor_node");

        assert_data_tree!(inspector, root: {test_archive_accessor_node: {}});

        let test_accessor_stats = Arc::new(AccessorStats::new(test_archive_accessor_node));

        let test_batch_iterator_stats1 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());

        assert_data_tree!(inspector, root: {
            test_archive_accessor_node: {
                connections_closed: 0u64,
                connections_opened: 0u64,
                inspect: {
                    batch_iterator_connections: {
                        "0": {
                            get_next: {
                                terminal_responses: 0u64,
                                responses: 0u64,
                                requests: 0u64,
                            }
                        }
                    },
                    batch_iterator: {
                        connections_closed: 0u64,
                        connections_opened: 0u64,
                        get_next: {
                            time_usec: AnyProperty,
                            requests: 0u64,
                            responses: 0u64,
                            result_count: 0u64,
                            result_errors: 0u64,
                        }
                    },
                    component_timeouts_count: 0u64,
                    reader_servers_constructed: 1u64,
                    reader_servers_destroyed: 0u64,
                    schema_truncation_count: 0u64,
                    max_snapshot_sizes_bytes: AnyProperty,
                    snapshot_schema_truncation_percentage: AnyProperty,
                },
                logs: {
                    batch_iterator_connections: {},
                    batch_iterator: {
                        connections_closed: 0u64,
                        connections_opened: 0u64,
                        get_next: {
                            requests: 0u64,
                            responses: 0u64,
                            result_count: 0u64,
                            result_errors: 0u64,
                            time_usec: AnyProperty,
                        }
                    },
                    component_timeouts_count: 0u64,
                    reader_servers_constructed: 0u64,
                    reader_servers_destroyed: 0u64,
                    max_snapshot_sizes_bytes: AnyProperty,
                    snapshot_schema_truncation_percentage: AnyProperty,
                    schema_truncation_count: 0u64,
                },
                stream_diagnostics_requests: 0u64,
            },
        });

        let inspector_arc = Arc::new(inspector);

        let identity = Arc::new(ComponentIdentity::new(component_id, TEST_URL));

        let (proxy, request_stream) = create_proxy_and_stream::<InspectSinkMarker>();
        proxy
            .publish(InspectSinkPublishRequest { tree: Some(tree_client), ..Default::default() })
            .unwrap();

        let scope = fasync::Scope::new();
        let inspect_sink_server =
            Arc::new(InspectSinkServer::new(Arc::clone(&inspect_repo), scope.new_child()));
        Arc::clone(&inspect_sink_server).handle(Event {
            timestamp: zx::BootInstant::get(),
            payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
                component: Arc::clone(&identity),
                request_stream,
            }),
        });

        drop(proxy);

        scope.close().await;

        let expected_get_next_result_errors = match mode {
            VerifyMode::ExpectComponentFailure => 1u64,
            _ => 0u64,
        };

        {
            let result_json = read_snapshot(
                Arc::clone(&inspect_repo),
                Arc::clone(&pipeline),
                Arc::clone(&inspector_arc),
                test_batch_iterator_stats1,
            )
            .await;

            let result_array = result_json.as_array().expect("unit test json should be array.");
            assert_eq!(result_array.len(), 1, "Expect only one schema to be returned.");

            let result_map =
                result_array[0].as_object().expect("entries in the schema array are json objects.");

            let result_payload =
                result_map.get("payload").expect("diagnostics schema requires payload entry.");

            let expected_payload = match mode {
                VerifyMode::ExpectSuccess => json!({
                    "root": {
                        "child_1": {
                            "child_1_1": {
                                "some-int": 3
                            }
                        },
                        "child_2": {
                            "some-int": 2
                        }
                    }
                }),
                VerifyMode::ExpectComponentFailure => json!(null),
            };
            assert_eq!(*result_payload, expected_payload);

            // stream_diagnostics_requests is 0 since it's tracked by the archive_accessor
            // server, which isn't running in this unit test.
            assert_data_tree!(Arc::clone(&inspector_arc), root: {
                test_archive_accessor_node: {
                    connections_closed: 0u64,
                    connections_opened: 0u64,
                    inspect: {
                        batch_iterator_connections: {},
                        batch_iterator: {
                            connections_closed: 1u64,
                            connections_opened: 1u64,
                            get_next: {
                                time_usec: AnyProperty,
                                requests: 2u64,
                                responses: 2u64,
                                result_count: 1u64,
                                result_errors: expected_get_next_result_errors,
                            }
                        },
                        component_timeouts_count: 0u64,
                        component_time_usec: AnyProperty,
                        reader_servers_constructed: 1u64,
                        reader_servers_destroyed: 1u64,
                        schema_truncation_count: 0u64,
                        max_snapshot_sizes_bytes: AnyProperty,
                        snapshot_schema_truncation_percentage: AnyProperty,
                        longest_processing_times: contains {
                            "test_component": contains {
                                "@time": AnyProperty,
                                duration_seconds: AnyProperty,
                            }
                        },
                    },
                    logs: {
                        batch_iterator_connections: {},
                        batch_iterator: {
                            connections_closed: 0u64,
                            connections_opened: 0u64,
                            get_next: {
                                requests: 0u64,
                                responses: 0u64,
                                result_count: 0u64,
                                result_errors: 0u64,
                                time_usec: AnyProperty,
                            }
                        },
                        component_timeouts_count: 0u64,
                        reader_servers_constructed: 0u64,
                        reader_servers_destroyed: 0u64,
                        max_snapshot_sizes_bytes: AnyProperty,
                        snapshot_schema_truncation_percentage: AnyProperty,
                        schema_truncation_count: 0u64,
                    },
                    stream_diagnostics_requests: 0u64,
                },
            });
        }

        let test_batch_iterator_stats2 = Arc::new(test_accessor_stats.new_inspect_batch_iterator());

        inspect_repo.terminate_inspect(identity);
        {
            let result_json = read_snapshot(
                Arc::clone(&inspect_repo),
                Arc::clone(&pipeline),
                Arc::clone(&inspector_arc),
                test_batch_iterator_stats2,
            )
            .await;

            let result_array = result_json.as_array().expect("unit test json should be array.");
            assert_eq!(result_array.len(), 0, "Expect no schemas to be returned.");

            assert_data_tree!(Arc::clone(&inspector_arc), root: {
                test_archive_accessor_node: {
                    connections_closed: 0u64,
                    connections_opened: 0u64,
                    inspect: {
                        batch_iterator_connections: {},
                        batch_iterator: {
                            connections_closed: 2u64,
                            connections_opened: 2u64,
                            get_next: {
                                time_usec: AnyProperty,
                                requests: 3u64,
                                responses: 3u64,
                                result_count: 1u64,
                                result_errors: expected_get_next_result_errors,
                            }
                        },
                        component_timeouts_count: 0u64,
                        component_time_usec: AnyProperty,
                        reader_servers_constructed: 2u64,
                        reader_servers_destroyed: 2u64,
                        schema_truncation_count: 0u64,
                        max_snapshot_sizes_bytes: AnyProperty,
                        snapshot_schema_truncation_percentage: AnyProperty,
                        longest_processing_times: contains {
                            "test_component": contains {
                                "@time": AnyProperty,
                                duration_seconds: AnyProperty,
                            }
                        },
                    },
                    logs: {
                        batch_iterator_connections: {},
                        batch_iterator: {
                            connections_closed: 0u64,
                            connections_opened: 0u64,
                            get_next: {
                                requests: 0u64,
                                responses: 0u64,
                                result_count: 0u64,
                                result_errors: 0u64,
                                time_usec: AnyProperty,
                            }
                        },
                        component_timeouts_count: 0u64,
                        reader_servers_constructed: 0u64,
                        reader_servers_destroyed: 0u64,
                        max_snapshot_sizes_bytes: AnyProperty,
                        snapshot_schema_truncation_percentage: AnyProperty,
                        schema_truncation_count: 0u64,
                    },
                    stream_diagnostics_requests: 0u64,
                },
            });
        }
    }

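    /// Spawns a `BatchIterator` task that serves a single JSON snapshot of everything in
    /// `inspect_repo` (no client selectors), returning the consumer-side proxy together
    /// with the serving task.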
    fn start_snapshot(
        inspect_repo: Arc<InspectRepository>,
        pipeline: Arc<Pipeline>,
        stats: Arc<BatchIteratorConnectionStats>,
    ) -> (BatchIteratorProxy, Task<()>) {
        let test_performance_config = PerformanceConfig {
            batch_timeout_sec: BATCH_RETRIEVAL_TIMEOUT_SECONDS,
            aggregated_content_limit_bytes: None,
            maximum_concurrent_snapshots_per_reader: 4,
        };

        let trace_id = ftrace::Id::random();
        let static_hierarchy_allowlist = pipeline.static_hierarchy_allowlist();
        let reader_server = ReaderServer::stream(
            inspect_repo.fetch_inspect_data(&None, static_hierarchy_allowlist),
            test_performance_config,
            // No selectors
            None,
            Arc::clone(&stats),
            trace_id,
        );
        let (consumer, batch_iterator_requests) = create_proxy_and_stream::<BatchIteratorMarker>();
        (
            consumer,
            Task::spawn(async {
                BatchIterator::new(
                    reader_server,
                    batch_iterator_requests.peekable(),
                    StreamMode::Snapshot,
                    stats,
                    None,
                    ftrace::Id::random(),
                    Format::Json,
                )
                .unwrap()
                .run()
                .await
                .unwrap()
            }),
        )
    }

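    /// Drains the `BatchIterator` until it returns an empty batch, concatenating each
    /// JSON-formatted VMO payload into a single JSON array.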
    async fn read_snapshot(
        inspect_repo: Arc<InspectRepository>,
        pipeline: Arc<Pipeline>,
        _test_inspector: Arc<Inspector>,
        stats: Arc<BatchIteratorConnectionStats>,
    ) -> serde_json::Value {
        let (consumer, server) = start_snapshot(inspect_repo, pipeline, stats);

        let mut result_vec: Vec<String> = Vec::new();
        loop {
            let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> =
                consumer.get_next().await.unwrap().unwrap();

            if next_batch.is_empty() {
                break;
            }
            for formatted_content in next_batch {
                match formatted_content {
                    fidl_fuchsia_diagnostics::FormattedContent::Json(data) => {
                        let mut buf = vec![0; data.size as usize];
                        data.vmo.read(&mut buf, 0).expect("reading vmo");
                        let hierarchy_string = std::str::from_utf8(&buf).unwrap();
                        result_vec.push(hierarchy_string.to_string());
                    }
                    _ => panic!("test only produces json formatted data"),
                }
            }
        }

        // Ensure the connection is marked as closed, then wait for the stream to terminate.
        drop(consumer);
        server.await;

        let result_string = format!("[{}]", result_vec.join(","));
        serde_json::from_str(&result_string).unwrap_or_else(|_| {
            panic!("unit tests shouldn't be creating malformed json: {result_string}")
        })
    }

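    /// Like `read_snapshot`, but also asserts that the sequence of received batch sizes
    /// matches `expected_batch_sizes`.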
    async fn read_snapshot_verify_batch_count_and_batch_size(
        inspect_repo: Arc<InspectRepository>,
        pipeline: Arc<Pipeline>,
        expected_batch_sizes: Vec<usize>,
        stats: Arc<BatchIteratorConnectionStats>,
    ) -> serde_json::Value {
        let (consumer, server) = start_snapshot(inspect_repo, pipeline, stats);

        let mut result_vec: Vec<String> = Vec::new();
        let mut batch_counts = Vec::new();
        loop {
            let next_batch: Vec<fidl_fuchsia_diagnostics::FormattedContent> =
                consumer.get_next().await.unwrap().unwrap();

            if next_batch.is_empty() {
                assert_eq!(expected_batch_sizes, batch_counts);
                break;
            }

            batch_counts.push(next_batch.len());

            for formatted_content in next_batch {
                match formatted_content {
                    fidl_fuchsia_diagnostics::FormattedContent::Json(data) => {
                        let mut buf = vec![0; data.size as usize];
                        data.vmo.read(&mut buf, 0).expect("reading vmo");
                        let hierarchy_string = std::str::from_utf8(&buf).unwrap();
                        result_vec.push(hierarchy_string.to_string());
                    }
                    _ => panic!("test only produces json formatted data"),
                }
            }
        }

        // Ensure the connection is marked as closed, then wait for the stream to terminate.
        drop(consumer);
        server.await;

        let result_string = format!("[{}]", result_vec.join(","));
        serde_json::from_str(&result_string).unwrap_or_else(|_| {
            panic!("unit tests shouldn't be creating malformed json: {result_string}")
        })
    }
}