Skip to main content

diagnostics_reader/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![deny(missing_docs)]
6
7//! A library for reading Inspect and Log data from
8//! the ArchiveAccessor FIDL protocol.
9
10use async_stream::stream;
11use diagnostics_data::{DiagnosticsData, LogsData};
12#[cfg(fuchsia_api_level_less_than = "HEAD")]
13use diagnostics_message as _;
14use fidl_fuchsia_diagnostics::{
15    ArchiveAccessorMarker, ArchiveAccessorProxy, BatchIteratorMarker, BatchIteratorProxy,
16    ClientSelectorConfiguration, Format, FormattedContent, PerformanceConfiguration, ReaderError,
17    Selector, SelectorArgument, StreamMode, StreamParameters,
18};
19use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
20use fuchsia_component::client;
21use futures::channel::mpsc;
22use futures::prelude::*;
23use futures::sink::SinkExt;
24use futures::stream::FusedStream;
25use pin_project::pin_project;
26use serde::Deserialize;
27use std::future::ready;
28use std::marker::PhantomData;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32use thiserror::Error;
33use zx::{self as zx, MonotonicDuration};
34
35/// Alias for ArchiveReader<Logs>. Used for reading logs.
36pub type LogsArchiveReader = ArchiveReader<Logs>;
37
38/// Alias for ArchiveReader<Inspect>. Used for reading inspect.
39pub type InspectArchiveReader = ArchiveReader<Inspect>;
40
41pub use diagnostics_data::{Data, Inspect, Logs, Severity};
42pub use diagnostics_hierarchy::{DiagnosticsHierarchy, Property, hierarchy};
43
44const RETRY_DELAY_MS: i64 = 300;
45
46#[cfg(fuchsia_api_level_at_least = "HEAD")]
47const FORMAT: Format = Format::Cbor;
48#[cfg(fuchsia_api_level_less_than = "HEAD")]
49const FORMAT: Format = Format::Json;
50
51/// Errors that this library can return
52#[derive(Debug, Error)]
53pub enum Error {
54    /// Failed to connect to the archive accessor
55    #[error("Failed to connect to the archive accessor")]
56    ConnectToArchive(#[source] anyhow::Error),
57
58    /// Failed to create the BatchIterator channel ends
59    #[error("Failed to create the BatchIterator channel ends")]
60    CreateIteratorProxy(#[source] fidl::Error),
61
62    /// Failed to stream diagnostics from the accessor
63    #[error("Failed to stream diagnostics from the accessor")]
64    StreamDiagnostics(#[source] fidl::Error),
65
66    /// Failed to call iterator server
67    #[error("Failed to call iterator server")]
68    GetNextCall(#[source] fidl::Error),
69
70    /// Received error from the GetNext response
71    #[error("Received error from the GetNext response: {0:?}")]
72    GetNextReaderError(ReaderError),
73
74    /// Failed to read json received
75    #[error("Failed to read json received")]
76    ReadJson(#[source] serde_json::Error),
77
78    /// Failed to read cbor received
79    #[cfg(fuchsia_api_level_at_least = "HEAD")]
80    #[error("Failed to read cbor received")]
81    ReadCbor(#[source] anyhow::Error),
82
83    /// Failed to parse the diagnostics data from the json received
84    #[error("Failed to parse the diagnostics data from the json received")]
85    ParseDiagnosticsData(#[source] serde_json::Error),
86
87    /// Failed to read vmo from the response
88    #[error("Failed to read vmo from the response")]
89    ReadVmo(#[source] zx::Status),
90
91    /// Parser got stuck or failed to advance
92    #[error("Parser got stuck or failed to advance")]
93    ParserStuck,
94    /// Failed to acquire mutex
95    #[cfg(fuchsia_api_level_at_least = "HEAD")]
96    #[error("Failed to acquire mutex")]
97    MutexError,
98}
99
100/// An inspect tree selector for a component.
101pub struct ComponentSelector {
102    moniker: Vec<String>,
103    tree_selectors: Vec<String>,
104}
105
106impl ComponentSelector {
107    /// Create a new component event selector.
108    /// By default it will select the whole tree unless tree selectors are provided.
109    /// `moniker` is the realm path relative to the realm of the running component plus the
110    /// component name. For example: [a, b, component].
111    pub fn new(moniker: Vec<String>) -> Self {
112        Self { moniker, tree_selectors: Vec::new() }
113    }
114
115    /// Select a section of the inspect tree.
116    pub fn with_tree_selector(mut self, tree_selector: impl Into<String>) -> Self {
117        self.tree_selectors.push(tree_selector.into());
118        self
119    }
120
121    fn moniker_str(&self) -> String {
122        self.moniker.join("/")
123    }
124}
125
126/// Trait used for things that can be converted to selector arguments.
127pub trait ToSelectorArguments {
128    /// Converts this to selector arguments.
129    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>>;
130}
131
132/// Trait used for things that can be converted to component selector arguments.
133pub trait ToComponentSelectorArguments {
134    /// Converts this to selector arguments.
135    fn to_component_selector_arguments(self) -> ComponentSelector;
136}
137
138impl ToComponentSelectorArguments for &str {
139    fn to_component_selector_arguments(self) -> ComponentSelector {
140        if self.contains("\\:") {
141            // String is already escaped, don't escape it.
142            ComponentSelector::new(self.split("/").map(|value| value.to_string()).collect())
143        } else {
144            // String isn't escaped, escape it
145            ComponentSelector::new(
146                selectors::sanitize_moniker_for_selectors(self)
147                    .split("/")
148                    .map(|value| value.to_string())
149                    .collect(),
150            )
151            .with_tree_selector("[...]root")
152        }
153    }
154}
155
156impl ToComponentSelectorArguments for String {
157    fn to_component_selector_arguments(self) -> ComponentSelector {
158        self.as_str().to_component_selector_arguments()
159    }
160}
161
162impl ToComponentSelectorArguments for ComponentSelector {
163    fn to_component_selector_arguments(self) -> ComponentSelector {
164        self
165    }
166}
167
168impl ToSelectorArguments for String {
169    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
170        Box::new([SelectorArgument::RawSelector(self)].into_iter())
171    }
172}
173
174impl ToSelectorArguments for &str {
175    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
176        Box::new([SelectorArgument::RawSelector(self.to_string())].into_iter())
177    }
178}
179
180impl ToSelectorArguments for ComponentSelector {
181    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
182        let moniker = self.moniker_str();
183        // If not tree selectors were provided, select the full tree.
184        if self.tree_selectors.is_empty() {
185            Box::new([SelectorArgument::RawSelector(format!("{moniker}:root"))].into_iter())
186        } else {
187            Box::new(
188                self.tree_selectors
189                    .into_iter()
190                    .map(move |s| SelectorArgument::RawSelector(format!("{moniker}:{s}"))),
191            )
192        }
193    }
194}
195
196impl ToSelectorArguments for Selector {
197    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
198        Box::new([SelectorArgument::StructuredSelector(self)].into_iter())
199    }
200}
201
202/// Before unsealing this, consider whether your code belongs in this file.
203pub trait SerializableValue: private::Sealed {
204    /// The Format of this SerializableValue. Either Logs or Inspect.
205    const FORMAT_OF_VALUE: Format;
206}
207
208/// Trait used to verify that a JSON payload has a valid diagnostics payload.
209pub trait CheckResponse: private::Sealed {
210    /// Returns true if the response has a valid payload.
211    fn has_payload(&self) -> bool;
212}
213
214// The "sealed trait" pattern.
215//
216// https://rust-lang.github.io/api-guidelines/future-proofing.html
217mod private {
218    pub trait Sealed {}
219}
220impl private::Sealed for serde_json::Value {}
221impl private::Sealed for ciborium::Value {}
222impl<D: DiagnosticsData> private::Sealed for Data<D> {}
223
224impl<D: DiagnosticsData> CheckResponse for Data<D> {
225    fn has_payload(&self) -> bool {
226        self.payload.is_some()
227    }
228}
229
230impl SerializableValue for serde_json::Value {
231    const FORMAT_OF_VALUE: Format = Format::Json;
232}
233
234impl CheckResponse for serde_json::Value {
235    fn has_payload(&self) -> bool {
236        match self {
237            serde_json::Value::Object(obj) => {
238                obj.get("payload").map(|p| !matches!(p, serde_json::Value::Null)).is_some()
239            }
240            _ => false,
241        }
242    }
243}
244
245#[cfg(fuchsia_api_level_at_least = "HEAD")]
246impl SerializableValue for ciborium::Value {
247    const FORMAT_OF_VALUE: Format = Format::Cbor;
248}
249
250impl CheckResponse for ciborium::Value {
251    fn has_payload(&self) -> bool {
252        match self {
253            ciborium::Value::Map(m) => {
254                let payload_key = ciborium::Value::Text("payload".into());
255                m.iter().any(|(key, _)| *key == payload_key)
256            }
257            _ => false,
258        }
259    }
260}
261
262/// Retry configuration for ArchiveReader
263#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
264pub enum RetryConfig {
265    /// The minimum schema count required for a successful read.
266    /// This guarantees that a read will contain at least MinSchemaCount
267    /// results.
268    MinSchemaCount(usize),
269}
270
271impl RetryConfig {
272    /// Always retry
273    pub fn always() -> Self {
274        Self::MinSchemaCount(1)
275    }
276
277    /// Never retry
278    pub fn never() -> Self {
279        Self::MinSchemaCount(0)
280    }
281
282    /// Retry result_count times
283    fn should_retry(&self, result_count: usize) -> bool {
284        match self {
285            Self::MinSchemaCount(min) => *min > result_count,
286        }
287    }
288}
289
290/// A trait representing a type of diagnostics data.
291pub trait DiagnosticsDataType: private::Sealed {}
292
293impl private::Sealed for Logs {}
294
295impl private::Sealed for Inspect {}
296
297impl DiagnosticsDataType for Logs {}
298
299impl DiagnosticsDataType for Inspect {}
300
301/// Utility for reading inspect data of a running component using the injected Archive
302/// Reader service.
303pub struct ArchiveReader<T> {
304    archive: Option<ArchiveAccessorProxy>,
305    selectors: Vec<SelectorArgument>,
306    retry_config: RetryConfig,
307    timeout: Option<MonotonicDuration>,
308    batch_retrieval_timeout_seconds: Option<i64>,
309    max_aggregated_content_size_bytes: Option<u64>,
310    format: Option<Format>,
311    _phantom: PhantomData<T>,
312}
313
314impl<T: DiagnosticsDataType> ArchiveReader<T> {
315    /// Initializes the ArchiveReader with a custom connection to an ArchiveAccessor.
316    /// By default, the connection will be initialized by connecting to
317    /// fuchsia.diagnostics.ArchiveAccessor
318    pub fn with_archive(&mut self, archive: ArchiveAccessorProxy) -> &mut Self {
319        self.archive = Some(archive);
320        self
321    }
322
323    /// Sets the minimum number of schemas expected in a result in order for the
324    /// result to be considered a success.
325    pub fn with_minimum_schema_count(&mut self, minimum_schema_count: usize) -> &mut Self {
326        self.retry_config = RetryConfig::MinSchemaCount(minimum_schema_count);
327        self
328    }
329
330    /// Sets a custom retry configuration. By default we always retry.
331    pub fn retry(&mut self, config: RetryConfig) -> &mut Self {
332        self.retry_config = config;
333        self
334    }
335
336    /// Sets the maximum time to wait for a response from the Archive.
337    /// Do not use in tests unless timeout is the expected behavior.
338    pub fn with_timeout(&mut self, duration: MonotonicDuration) -> &mut Self {
339        self.timeout = Some(duration);
340        self
341    }
342
343    /// Filters logs for a specific component or component selector.
344    /// If string input, the string may be either a component selector string
345    /// or a moniker, or a ComponentSelector may be passed directly.
346    pub fn select_all_for_component(
347        &mut self,
348        component: impl ToComponentSelectorArguments,
349    ) -> &mut Self {
350        self.selectors.extend(component.to_component_selector_arguments().to_selector_arguments());
351        self
352    }
353
354    /// Connects to the ArchiveAccessor and returns data matching provided selectors.
355    async fn snapshot_shared<D>(&self) -> Result<Vec<Data<D>>, Error>
356    where
357        D: DiagnosticsData + 'static,
358    {
359        let data_future = self.snapshot_inner::<D, Data<D>>(FORMAT);
360        let data = match self.timeout {
361            Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
362            None => data_future.await?,
363        };
364        Ok(data)
365    }
366
367    async fn snapshot_inner<D, Y>(&self, format: Format) -> Result<Vec<Y>, Error>
368    where
369        D: DiagnosticsData,
370        Y: for<'a> Deserialize<'a> + CheckResponse + Send + 'static,
371    {
372        loop {
373            let iterator = self.batch_iterator::<D>(StreamMode::Snapshot, format)?;
374            let result = drain_batch_iterator::<Y>(Arc::new(iterator))
375                .filter_map(|value| ready(value.ok()))
376                .collect::<Vec<_>>()
377                .await;
378
379            if self.retry_config.should_retry(result.len()) {
380                fasync::Timer::new(fasync::MonotonicInstant::after(
381                    zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
382                ))
383                .await;
384            } else {
385                return Ok(result);
386            }
387        }
388    }
389
390    fn batch_iterator<D>(
391        &self,
392        mode: StreamMode,
393        format: Format,
394    ) -> Result<BatchIteratorProxy, Error>
395    where
396        D: DiagnosticsData,
397    {
398        let archive = match &self.archive {
399            Some(archive) => archive.clone(),
400            None => client::connect_to_protocol::<ArchiveAccessorMarker>()
401                .map_err(Error::ConnectToArchive)?,
402        };
403
404        let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
405
406        let stream_parameters = StreamParameters {
407            stream_mode: Some(mode),
408            data_type: Some(D::DATA_TYPE),
409            format: Some(format),
410            client_selector_configuration: if self.selectors.is_empty() {
411                Some(ClientSelectorConfiguration::SelectAll(true))
412            } else {
413                Some(ClientSelectorConfiguration::Selectors(self.selectors.to_vec()))
414            },
415            performance_configuration: Some(PerformanceConfiguration {
416                max_aggregate_content_size_bytes: self.max_aggregated_content_size_bytes,
417                batch_retrieval_timeout_seconds: self.batch_retrieval_timeout_seconds,
418                ..Default::default()
419            }),
420            ..Default::default()
421        };
422
423        archive
424            .stream_diagnostics(&stream_parameters, server_end)
425            .map_err(Error::StreamDiagnostics)?;
426        Ok(iterator)
427    }
428}
429
430impl ArchiveReader<Logs> {
431    /// Creates an ArchiveReader for reading logs
432    pub fn logs() -> Self {
433        ArchiveReader::<Logs> {
434            timeout: None,
435            format: None,
436            selectors: vec![],
437            retry_config: RetryConfig::always(),
438            archive: None,
439            batch_retrieval_timeout_seconds: None,
440            max_aggregated_content_size_bytes: None,
441            _phantom: PhantomData,
442        }
443    }
444
445    #[doc(hidden)]
446    pub fn with_format(&mut self, format: Format) -> &mut Self {
447        self.format = Some(format);
448        self
449    }
450
451    #[inline]
452    fn format(&self) -> Format {
453        match self.format {
454            Some(f) => f,
455            None => {
456                #[cfg(fuchsia_api_level_at_least = "HEAD")]
457                let ret = Format::Fxt;
458                #[cfg(fuchsia_api_level_less_than = "HEAD")]
459                let ret = Format::Json;
460                ret
461            }
462        }
463    }
464
465    /// Connects to the ArchiveAccessor and returns data matching provided selectors.
466    pub async fn snapshot(&self) -> Result<Vec<Data<Logs>>, Error> {
467        loop {
468            let iterator = self.batch_iterator::<Logs>(StreamMode::Snapshot, self.format())?;
469            let result = drain_batch_iterator_for_logs(Arc::new(iterator))
470                .filter_map(|value| ready(value.ok()))
471                .collect::<Vec<_>>()
472                .await;
473
474            if self.retry_config.should_retry(result.len()) {
475                fasync::Timer::new(fasync::MonotonicInstant::after(
476                    zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
477                ))
478                .await;
479            } else {
480                return Ok(result);
481            }
482        }
483    }
484
485    /// Connects to the ArchiveAccessor and returns a stream of data containing a snapshot of the
486    /// current buffer in the Archivist as well as new data that arrives.
487    pub fn snapshot_then_subscribe(&self) -> Result<Subscription, Error> {
488        let iterator =
489            self.batch_iterator::<Logs>(StreamMode::SnapshotThenSubscribe, self.format())?;
490        Ok(Subscription::new(iterator))
491    }
492}
493
494impl ArchiveReader<Inspect> {
495    /// Creates an ArchiveReader for reading Inspect data.
496    pub fn inspect() -> Self {
497        ArchiveReader::<Inspect> {
498            timeout: None,
499            format: None,
500            selectors: vec![],
501            retry_config: RetryConfig::always(),
502            archive: None,
503            batch_retrieval_timeout_seconds: None,
504            max_aggregated_content_size_bytes: None,
505            _phantom: PhantomData,
506        }
507    }
508
509    /// Set the maximum time to wait for a wait for a single component
510    /// to have its diagnostics data "pumped".
511    pub fn with_batch_retrieval_timeout_seconds(&mut self, timeout: i64) -> &mut Self {
512        self.batch_retrieval_timeout_seconds = Some(timeout);
513        self
514    }
515
516    /// Sets the total number of bytes allowed in a single VMO read.
517    pub fn with_aggregated_result_bytes_limit(&mut self, limit_bytes: u64) -> &mut Self {
518        self.max_aggregated_content_size_bytes = Some(limit_bytes);
519        self
520    }
521
522    /// Connects to the ArchiveAccessor and returns inspect data matching provided selectors.
523    /// Returns the raw json for each hierarchy fetched. This is used for CTF compatibility
524    /// tests (which test various implementation details of the JSON format),
525    /// and use beyond such tests is discouraged.
526    pub async fn snapshot_raw<T>(&self) -> Result<T, Error>
527    where
528        T: for<'a> Deserialize<'a>
529            + SerializableValue
530            + From<Vec<T>>
531            + CheckResponse
532            + 'static
533            + Send,
534    {
535        let data_future = self.snapshot_inner::<Inspect, T>(T::FORMAT_OF_VALUE);
536        let data = match self.timeout {
537            Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
538            None => data_future.await?,
539        };
540        Ok(T::from(data))
541    }
542
543    /// Adds selectors used for performing filtering inspect hierarchies.
544    /// This may be called multiple times to add additional selectors.
545    pub fn add_selectors<T, S>(&mut self, selectors: T) -> &mut Self
546    where
547        T: Iterator<Item = S>,
548        S: ToSelectorArguments,
549    {
550        for selector in selectors {
551            self.add_selector(selector);
552        }
553        self
554    }
555
556    /// Requests a single component tree (or sub-tree).
557    pub fn add_selector(&mut self, selector: impl ToSelectorArguments) -> &mut Self {
558        self.selectors.extend(selector.to_selector_arguments());
559        self
560    }
561
562    /// Connects to the ArchiveAccessor and returns data matching provided selectors.
563    pub async fn snapshot(&self) -> Result<Vec<Data<Inspect>>, Error> {
564        self.snapshot_shared::<Inspect>().await
565    }
566}
567
568#[derive(Debug, Deserialize)]
569#[serde(untagged)]
570enum OneOrMany<T> {
571    Many(Vec<T>),
572    One(T),
573}
574
575fn stream_batch<T>(
576    iterator: Arc<BatchIteratorProxy>,
577    process_content: impl Fn(FormattedContent) -> Result<OneOrMany<T>, Error>,
578) -> impl Stream<Item = Result<T, Error>>
579where
580    T: for<'a> Deserialize<'a> + Send + 'static,
581{
582    stream! {
583        loop {
584            let next_batch = iterator
585                .get_next()
586                .await
587                .map_err(Error::GetNextCall)?
588                .map_err(Error::GetNextReaderError)?;
589            if next_batch.is_empty() {
590                // End of stream
591                return;
592            }
593            for formatted_content in next_batch {
594                let output = process_content(formatted_content)?;
595                match output {
596                    OneOrMany::One(data) => yield Ok(data),
597                    OneOrMany::Many(datas) => {
598                        for data in datas {
599                            yield Ok(data);
600                        }
601                    }
602                }
603            }
604        }
605    }
606}
607
608/// Drain a batch iterator.
609pub fn drain_batch_iterator<T>(
610    iterator: Arc<BatchIteratorProxy>,
611) -> impl Stream<Item = Result<T, Error>>
612where
613    T: for<'a> Deserialize<'a> + Send + 'static,
614{
615    stream_batch(iterator, |formatted_content| match formatted_content {
616        FormattedContent::Json(data) => {
617            let mut buf = vec![0; data.size as usize];
618            data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
619            serde_json::from_slice(&buf).map_err(Error::ReadJson)
620        }
621        #[cfg(fuchsia_api_level_at_least = "HEAD")]
622        FormattedContent::Cbor(vmo) => {
623            let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
624            vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
625            Ok(ciborium::from_reader(buf.as_slice()).map_err(|err| Error::ReadCbor(err.into()))?)
626        }
627        #[cfg(fuchsia_api_level_at_least = "HEAD")]
628        FormattedContent::Fxt(_) => unreachable!("We never expect FXT for Inspect"),
629        FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
630            unreachable!("Received unrecognized FIDL message")
631        }
632    })
633}
634
635fn drain_batch_iterator_for_logs(
636    iterator: Arc<BatchIteratorProxy>,
637) -> impl Stream<Item = Result<LogsData, Error>> {
638    #[cfg(fuchsia_api_level_at_least = "HEAD")]
639    let parser = Arc::new(std::sync::Mutex::new(diagnostics_message::MessageParser::new()));
640    stream_batch::<LogsData>(iterator, move |formatted_content| match formatted_content {
641        FormattedContent::Json(data) => {
642            let mut buf = vec![0; data.size as usize];
643            data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
644            serde_json::from_slice(&buf).map_err(Error::ReadJson)
645        }
646        #[cfg(fuchsia_api_level_at_least = "HEAD")]
647        FormattedContent::Fxt(vmo) => {
648            let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
649            vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
650            let mut current_slice: &[u8] = &buf;
651            let mut items = vec![];
652            let mut parser = parser.lock().map_err(|_| Error::MutexError)?;
653
654            while !current_slice.is_empty() {
655                match parser.parse_next(current_slice) {
656                    Ok((maybe_data, remaining)) => {
657                        if remaining.len() == current_slice.len() {
658                            return Err(Error::ParserStuck);
659                        }
660                        if let Some(data) = maybe_data {
661                            items.push(data);
662                        }
663                        current_slice = remaining;
664                    }
665                    Err(_) => {
666                        // This can happen if we are reading a truncated record.
667                        // Stop parsing this buffer.
668                        break;
669                    }
670                }
671            }
672            Ok(OneOrMany::Many(items))
673        }
674        #[cfg(fuchsia_api_level_at_least = "HEAD")]
675        FormattedContent::Cbor(_) => unreachable!("We never expect CBOR"),
676        FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
677            unreachable!("Received unrecognized FIDL message")
678        }
679    })
680}
681
682/// A subscription used for reading logs.
683#[pin_project]
684pub struct Subscription {
685    #[pin]
686    recv: Pin<Box<dyn FusedStream<Item = Result<LogsData, Error>> + Send>>,
687    iterator: Arc<BatchIteratorProxy>,
688}
689
690const DATA_CHANNEL_SIZE: usize = 32;
691const ERROR_CHANNEL_SIZE: usize = 2;
692
693impl Subscription {
694    /// Creates a new subscription stream to a batch iterator.
695    /// The stream will return diagnostics data structures.
696    pub fn new(iterator: BatchIteratorProxy) -> Self {
697        let iterator = Arc::new(iterator);
698        Subscription {
699            recv: Box::pin(drain_batch_iterator_for_logs(iterator.clone()).fuse()),
700            iterator,
701        }
702    }
703
704    /// Wait for the connection with the server to be established.
705    pub async fn wait_for_ready(&self) {
706        self.iterator.wait_for_ready().await.expect("doesn't disconnect");
707    }
708
709    /// Splits the subscription into two separate streams: results and errors.
710    pub fn split_streams(mut self) -> (SubscriptionResultsStream<LogsData>, mpsc::Receiver<Error>) {
711        let (mut errors_sender, errors) = mpsc::channel(ERROR_CHANNEL_SIZE);
712        let (mut results_sender, recv) = mpsc::channel(DATA_CHANNEL_SIZE);
713        let _drain_task = fasync::Task::spawn(async move {
714            while let Some(result) = self.next().await {
715                match result {
716                    Ok(value) => results_sender.send(value).await.ok(),
717                    Err(e) => errors_sender.send(e).await.ok(),
718                };
719            }
720        });
721        (SubscriptionResultsStream { recv, _drain_task }, errors)
722    }
723}
724
725impl Stream for Subscription {
726    type Item = Result<LogsData, Error>;
727
728    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
729        let this = self.project();
730        this.recv.poll_next(cx)
731    }
732}
733
734impl FusedStream for Subscription {
735    fn is_terminated(&self) -> bool {
736        self.recv.is_terminated()
737    }
738}
739
740/// A stream for reading diagnostics data
741#[pin_project]
742pub struct SubscriptionResultsStream<T> {
743    #[pin]
744    recv: mpsc::Receiver<T>,
745    _drain_task: fasync::Task<()>,
746}
747
748impl<T> Stream for SubscriptionResultsStream<T>
749where
750    T: for<'a> Deserialize<'a>,
751{
752    type Item = T;
753
754    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
755        let this = self.project();
756        this.recv.poll_next(cx)
757    }
758}
759
760impl<T> FusedStream for SubscriptionResultsStream<T>
761where
762    T: for<'a> Deserialize<'a>,
763{
764    fn is_terminated(&self) -> bool {
765        self.recv.is_terminated()
766    }
767}
768
769#[cfg(test)]
770mod tests {
771    use super::*;
772    use assert_matches::assert_matches;
773    use diagnostics_assertions::assert_data_tree;
774    use diagnostics_log::{Publisher, PublisherOptions};
775    use fidl::endpoints::ServerEnd;
776    use fidl_fuchsia_diagnostics as fdiagnostics;
777    use fuchsia_component_test::{
778        Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
779    };
780    use futures::TryStreamExt;
781    use log::{error, info};
782
783    const TEST_COMPONENT_URL: &str = "#meta/inspect_test_component.cm";
784
785    struct ComponentOptions {
786        publish_n_trees: u64,
787    }
788
789    async fn start_component(opts: ComponentOptions) -> Result<RealmInstance, anyhow::Error> {
790        let builder = RealmBuilder::new().await?;
791        let test_component = builder
792            .add_child("test_component", TEST_COMPONENT_URL, ChildOptions::new().eager())
793            .await?;
794        builder
795            .add_route(
796                Route::new()
797                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
798                    .from(Ref::parent())
799                    .to(&test_component),
800            )
801            .await?;
802        builder.init_mutable_config_to_empty(&test_component).await.unwrap();
803        builder
804            .set_config_value(&test_component, "publish_n_trees", opts.publish_n_trees.into())
805            .await
806            .unwrap();
807        let instance = builder.build().await?;
808        Ok(instance)
809    }
810
811    // All selectors in this test select against all tree names, in order to ensure the expected
812    // number of trees are published
813    #[fuchsia::test]
814    async fn inspect_data_for_component() -> Result<(), anyhow::Error> {
815        let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
816        let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
817        let component_selector = selectors::sanitize_moniker_for_selectors(&moniker);
818        let results = ArchiveReader::inspect()
819            .add_selector(format!("{component_selector}:[...]root"))
820            .snapshot()
821            .await?;
822        assert_eq!(results.len(), 1);
823        assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
824            "tree-0": 0u64,
825            int: 3u64,
826            "lazy-node": {
827                a: "test",
828                child: {
829                    double: 3.25,
830                },
831            }
832        });
833        // add_selector can take either a String or a Selector.
834        let lazy_property_selector = Selector {
835            component_selector: Some(fdiagnostics::ComponentSelector {
836                moniker_segments: Some(vec![
837                    fdiagnostics::StringSelector::ExactMatch(format!(
838                        "realm_builder:{}",
839                        instance.root.child_name()
840                    )),
841                    fdiagnostics::StringSelector::ExactMatch("test_component".into()),
842                ]),
843                ..Default::default()
844            }),
845            tree_selector: Some(fdiagnostics::TreeSelector::PropertySelector(
846                fdiagnostics::PropertySelector {
847                    node_path: vec![
848                        fdiagnostics::StringSelector::ExactMatch("root".into()),
849                        fdiagnostics::StringSelector::ExactMatch("lazy-node".into()),
850                    ],
851                    target_properties: fdiagnostics::StringSelector::ExactMatch("a".into()),
852                },
853            )),
854            tree_names: Some(fdiagnostics::TreeNames::All(fdiagnostics::All {})),
855            ..Default::default()
856        };
857        let int_property_selector = format!("{component_selector}:[...]root:int");
858        let mut reader = ArchiveReader::inspect();
859        reader.add_selector(int_property_selector).add_selector(lazy_property_selector);
860        let response = reader.snapshot().await?;
861        assert_eq!(response.len(), 1);
862        assert_eq!(response[0].moniker.to_string(), moniker);
863        assert_data_tree!(response[0].payload.as_ref().unwrap(), root: {
864            int: 3u64,
865            "lazy-node": {
866                a: "test"
867            }
868        });
869        Ok(())
870    }
871
872    #[fuchsia::test]
873    async fn select_all_for_moniker() {
874        let instance = start_component(ComponentOptions { publish_n_trees: 1 })
875            .await
876            .expect("component started");
877        let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
878        let results = ArchiveReader::inspect()
879            .select_all_for_component(moniker)
880            .snapshot()
881            .await
882            .expect("snapshotted");
883        assert_eq!(results.len(), 1);
884        assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
885            "tree-0": 0u64,
886            int: 3u64,
887            "lazy-node": {
888                a: "test",
889                child: {
890                    double: 3.25,
891                },
892            }
893        });
894    }
895
896    #[fuchsia::test]
897    async fn timeout() -> Result<(), anyhow::Error> {
898        let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
899
900        let mut reader = ArchiveReader::inspect();
901        reader
902            .add_selector(format!(
903                "realm_builder\\:{}/test_component:root",
904                instance.root.child_name()
905            ))
906            .with_timeout(zx::MonotonicDuration::from_nanos(0));
907        let result = reader.snapshot().await;
908        assert!(result.unwrap().is_empty());
909        Ok(())
910    }
911
912    #[fuchsia::test]
913    async fn component_selector() {
914        let selector = ComponentSelector::new(vec!["a".to_string()]);
915        assert_eq!(selector.moniker_str(), "a");
916        let arguments: Vec<_> = selector.to_selector_arguments().collect();
917        assert_eq!(arguments, vec![SelectorArgument::RawSelector("a:root".to_string())]);
918
919        let selector =
920            ComponentSelector::new(vec!["b".to_string(), "c".to_string(), "a".to_string()]);
921        assert_eq!(selector.moniker_str(), "b/c/a");
922
923        let selector = selector.with_tree_selector("root/b/c:d").with_tree_selector("root/e:f");
924        let arguments: Vec<_> = selector.to_selector_arguments().collect();
925        assert_eq!(
926            arguments,
927            vec![
928                SelectorArgument::RawSelector("b/c/a:root/b/c:d".into()),
929                SelectorArgument::RawSelector("b/c/a:root/e:f".into()),
930            ]
931        );
932    }
933
934    #[fuchsia::test]
935    async fn custom_archive() {
936        let proxy = spawn_fake_archive(serde_json::json!({
937            "moniker": "moniker",
938            "version": 1,
939            "data_source": "Inspect",
940            "metadata": {
941              "component_url": "component-url",
942              "timestamp": 0,
943              "filename": "filename",
944            },
945            "payload": {
946                "root": {
947                    "x": 1,
948                }
949            }
950        }));
951        let result =
952            ArchiveReader::inspect().with_archive(proxy).snapshot().await.expect("got result");
953        assert_eq!(result.len(), 1);
954        assert_data_tree!(result[0].payload.as_ref().unwrap(), root: { x: 1u64 });
955    }
956
957    #[fuchsia::test]
958    async fn handles_lists_correctly_on_snapshot_raw() {
959        let value = serde_json::json!({
960            "moniker": "moniker",
961            "version": 1,
962            "data_source": "Inspect",
963            "metadata": {
964            "component_url": "component-url",
965            "timestamp": 0,
966            "filename": "filename",
967            },
968            "payload": {
969                "root": {
970                    "x": 1,
971                }
972            }
973        });
974        let proxy = spawn_fake_archive(serde_json::json!([value.clone()]));
975        let mut reader = ArchiveReader::inspect();
976        reader.with_archive(proxy);
977        let json_result = reader.snapshot_raw::<serde_json::Value>().await.expect("got result");
978        match json_result {
979            serde_json::Value::Array(values) => {
980                assert_eq!(values.len(), 1);
981                assert_eq!(values[0], value);
982            }
983            result => panic!("unexpected result: {result:?}"),
984        }
985        let cbor_result = reader.snapshot_raw::<ciborium::Value>().await.expect("got result");
986        match cbor_result {
987            ciborium::Value::Array(values) => {
988                assert_eq!(values.len(), 1);
989                let json_result =
990                    values[0].deserialized::<serde_json::Value>().expect("convert to json");
991                assert_eq!(json_result, value);
992            }
993            result => panic!("unexpected result: {result:?}"),
994        }
995    }
996
997    #[fuchsia::test(logging = false)]
998    async fn snapshot_then_subscribe() {
999        let (_instance, publisher, reader) = init_isolated_logging().await;
1000        let (mut stream, _errors) =
1001            reader.snapshot_then_subscribe().expect("subscribed to logs").split_streams();
1002        publisher.register_logger(None).unwrap();
1003        info!("hello from test");
1004        error!("error from test");
1005        let log = stream.next().await.unwrap();
1006        assert_eq!(log.msg().unwrap(), "hello from test");
1007        let log = stream.next().await.unwrap();
1008        assert_eq!(log.msg().unwrap(), "error from test");
1009    }
1010
1011    #[fuchsia::test]
1012    async fn read_many_trees_with_filtering() {
1013        let instance = start_component(ComponentOptions { publish_n_trees: 2 })
1014            .await
1015            .expect("component started");
1016        let selector = format!(
1017            "realm_builder\\:{}/test_component:[name=tree-0]root",
1018            instance.root.child_name()
1019        );
1020        let results = ArchiveReader::inspect()
1021            .add_selector(selector)
1022            // Only one schema since empty schemas are filtered out
1023            .with_minimum_schema_count(1)
1024            .snapshot()
1025            .await
1026            .expect("snapshotted");
1027        assert_matches!(results.iter().find(|v| v.metadata.name.as_ref() == "tree-1"), None);
1028        let should_have_data =
1029            results.into_iter().find(|v| v.metadata.name.as_ref() == "tree-0").unwrap();
1030        assert_data_tree!(should_have_data.payload.unwrap(), root: contains {
1031            "tree-0": 0u64,
1032        });
1033    }
1034
1035    fn spawn_fake_archive(data_to_send: serde_json::Value) -> fdiagnostics::ArchiveAccessorProxy {
1036        let (proxy, mut stream) =
1037            fidl::endpoints::create_proxy_and_stream::<fdiagnostics::ArchiveAccessorMarker>();
1038        fasync::Task::spawn(async move {
1039            while let Some(request) = stream.try_next().await.expect("stream request") {
1040                match request {
1041                    fdiagnostics::ArchiveAccessorRequest::StreamDiagnostics {
1042                        result_stream,
1043                        ..
1044                    } => {
1045                        let data = data_to_send.clone();
1046                        fasync::Task::spawn(handle_batch_iterator(data, result_stream)).detach();
1047                    }
1048                    fdiagnostics::ArchiveAccessorRequest::WaitForReady { responder, .. } => {
1049                        let _ = responder.send();
1050                    }
1051                    fdiagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
1052                        unreachable!("Unexpected method call");
1053                    }
1054                }
1055            }
1056        })
1057        .detach();
1058        proxy
1059    }
1060
1061    async fn handle_batch_iterator(
1062        data: serde_json::Value,
1063        result_stream: ServerEnd<fdiagnostics::BatchIteratorMarker>,
1064    ) {
1065        let mut called = false;
1066        let mut stream = result_stream.into_stream();
1067        while let Some(req) = stream.try_next().await.expect("stream request") {
1068            match req {
1069                fdiagnostics::BatchIteratorRequest::WaitForReady { responder } => {
1070                    let _ = responder.send();
1071                }
1072                fdiagnostics::BatchIteratorRequest::GetNext { responder } => {
1073                    if called {
1074                        responder.send(Ok(Vec::new())).expect("send response");
1075                        continue;
1076                    }
1077                    called = true;
1078                    let content = serde_json::to_string_pretty(&data).expect("json pretty");
1079                    let vmo_size = content.len() as u64;
1080                    let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
1081                    vmo.write(content.as_bytes(), 0).expect("write vmo");
1082                    let buffer = fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
1083                    responder
1084                        .send(Ok(vec![fdiagnostics::FormattedContent::Json(buffer)]))
1085                        .expect("send response");
1086                }
1087                fdiagnostics::BatchIteratorRequest::_UnknownMethod { .. } => {
1088                    unreachable!("Unexpected method call");
1089                }
1090            }
1091        }
1092    }
1093
1094    async fn create_realm() -> RealmBuilder {
1095        let builder = RealmBuilder::new().await.expect("create realm builder");
1096        let archivist = builder
1097            .add_child("archivist", "#meta/archivist-for-embedding.cm", ChildOptions::new().eager())
1098            .await
1099            .expect("add child archivist");
1100        builder
1101            .add_route(
1102                Route::new()
1103                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1104                    .capability(
1105                        Capability::protocol_by_name("fuchsia.tracing.provider.Registry")
1106                            .optional(),
1107                    )
1108                    .capability(Capability::event_stream("stopped"))
1109                    .capability(Capability::event_stream("capability_requested"))
1110                    .from(Ref::parent())
1111                    .to(&archivist),
1112            )
1113            .await
1114            .expect("added routes from parent to archivist");
1115        builder
1116            .add_route(
1117                Route::new()
1118                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1119                    .from(&archivist)
1120                    .to(Ref::parent()),
1121            )
1122            .await
1123            .expect("routed LogSink from archivist to parent");
1124        builder
1125            .add_route(
1126                Route::new()
1127                    .capability(Capability::protocol_by_name("fuchsia.diagnostics.ArchiveAccessor"))
1128                    .from(Ref::dictionary(&archivist, "diagnostics-accessors"))
1129                    .to(Ref::parent()),
1130            )
1131            .await
1132            .expect("routed ArchiveAccessor from archivist to parent");
1133        builder
1134    }
1135
1136    async fn init_isolated_logging() -> (RealmInstance, Publisher, ArchiveReader<Logs>) {
1137        let instance = create_realm().await.build().await.unwrap();
1138        let log_sink_client = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
1139        let accessor_proxy = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
1140        let mut reader = ArchiveReader::logs();
1141        reader.with_archive(accessor_proxy);
1142        let options = PublisherOptions::default().use_log_sink(log_sink_client);
1143        let publisher = Publisher::new_async(options).await.unwrap();
1144        (instance, publisher, reader)
1145    }
1146
1147    #[fuchsia::test]
1148    fn retry_config_behavior() {
1149        let config = RetryConfig::MinSchemaCount(1);
1150        let got = 0;
1151
1152        assert!(config.should_retry(got));
1153
1154        let config = RetryConfig::MinSchemaCount(1);
1155        let got = 1;
1156
1157        assert!(!config.should_retry(got));
1158
1159        let config = RetryConfig::MinSchemaCount(1);
1160        let got = 2;
1161
1162        assert!(!config.should_retry(got));
1163
1164        let config = RetryConfig::MinSchemaCount(0);
1165        let got = 1;
1166
1167        assert!(!config.should_retry(got));
1168
1169        let config = RetryConfig::always();
1170        let got = 0;
1171
1172        assert!(config.should_retry(got));
1173
1174        let config = RetryConfig::never();
1175        let got = 0;
1176
1177        assert!(!config.should_retry(got));
1178    }
1179}