1#![deny(missing_docs)]
6
7use async_stream::stream;
11use diagnostics_data::{DiagnosticsData, LogsData};
12#[cfg(fuchsia_api_level_less_than = "HEAD")]
13use diagnostics_message as _;
14use fidl_fuchsia_diagnostics::{
15 ArchiveAccessorMarker, ArchiveAccessorProxy, BatchIteratorMarker, BatchIteratorProxy,
16 ClientSelectorConfiguration, Format, FormattedContent, PerformanceConfiguration, ReaderError,
17 Selector, SelectorArgument, StreamMode, StreamParameters,
18};
19use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
20use fuchsia_component::client;
21use futures::channel::mpsc;
22use futures::prelude::*;
23use futures::sink::SinkExt;
24use futures::stream::FusedStream;
25use pin_project::pin_project;
26use serde::Deserialize;
27use std::future::ready;
28use std::marker::PhantomData;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32use thiserror::Error;
33use zx::{self as zx, MonotonicDuration};
34
/// An [`ArchiveReader`] configured to read `Logs` data.
pub type LogsArchiveReader = ArchiveReader<Logs>;

/// An [`ArchiveReader`] configured to read `Inspect` data.
pub type InspectArchiveReader = ArchiveReader<Inspect>;
40
41pub use diagnostics_data::{Data, Inspect, Logs, Severity};
42pub use diagnostics_hierarchy::{DiagnosticsHierarchy, Property, hierarchy};
43
// Delay between snapshot attempts while waiting for the configured minimum
// number of results.
const RETRY_DELAY_MS: i64 = 300;

// Wire format used for Inspect snapshots: CBOR where available, JSON on
// older API levels.
#[cfg(fuchsia_api_level_at_least = "HEAD")]
const FORMAT: Format = Format::Cbor;
#[cfg(fuchsia_api_level_less_than = "HEAD")]
const FORMAT: Format = Format::Json;
50
/// Errors that this library can return.
#[derive(Debug, Error)]
pub enum Error {
    /// Connecting to the `ArchiveAccessor` protocol failed.
    #[error("Failed to connect to the archive accessor")]
    ConnectToArchive(#[source] anyhow::Error),

    /// Creating the proxy/server ends of the `BatchIterator` channel failed.
    #[error("Failed to create the BatchIterator channel ends")]
    CreateIteratorProxy(#[source] fidl::Error),

    /// The `StreamDiagnostics` FIDL call failed at the transport level.
    #[error("Failed to stream diagnostics from the accessor")]
    StreamDiagnostics(#[source] fidl::Error),

    /// The `GetNext` FIDL call on the iterator failed at the transport level.
    #[error("Failed to call iterator server")]
    GetNextCall(#[source] fidl::Error),

    /// The iterator server returned an application-level error from `GetNext`.
    #[error("Received error from the GetNext response: {0:?}")]
    GetNextReaderError(ReaderError),

    /// The received payload could not be deserialized as JSON.
    #[error("Failed to read json received")]
    ReadJson(#[source] serde_json::Error),

    /// The received payload could not be deserialized as CBOR.
    #[cfg(fuchsia_api_level_at_least = "HEAD")]
    #[error("Failed to read cbor received")]
    ReadCbor(#[source] anyhow::Error),

    /// The JSON received did not match the expected diagnostics data schema.
    #[error("Failed to parse the diagnostics data from the json received")]
    ParseDiagnosticsData(#[source] serde_json::Error),

    /// Reading the VMO carrying the formatted content failed.
    #[error("Failed to read vmo from the response")]
    ReadVmo(#[source] zx::Status),

    /// The log parser failed to consume any bytes and cannot make progress.
    #[error("Parser got stuck or failed to advance")]
    ParserStuck,
    /// The internal parser mutex could not be acquired (poisoned).
    #[cfg(fuchsia_api_level_at_least = "HEAD")]
    #[error("Failed to acquire mutex")]
    MutexError,
}
99
/// A selector for diagnostics data published by a single component.
pub struct ComponentSelector {
    // Moniker segments identifying the component.
    moniker: Vec<String>,
    // Tree selector fragments (e.g. `root/node:prop`) to match for it.
    tree_selectors: Vec<String>,
}
105
106impl ComponentSelector {
107 pub fn new(moniker: Vec<String>) -> Self {
112 Self { moniker, tree_selectors: Vec::new() }
113 }
114
115 pub fn with_tree_selector(mut self, tree_selector: impl Into<String>) -> Self {
117 self.tree_selectors.push(tree_selector.into());
118 self
119 }
120
121 fn moniker_str(&self) -> String {
122 self.moniker.join("/")
123 }
124}
125
/// Conversion into the selector arguments sent to the accessor.
pub trait ToSelectorArguments {
    /// Converts this value into a sequence of `SelectorArgument`s.
    fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>>;
}
131
/// Conversion into a [`ComponentSelector`].
pub trait ToComponentSelectorArguments {
    /// Converts this value into a `ComponentSelector`.
    fn to_component_selector_arguments(self) -> ComponentSelector;
}
137
impl ToComponentSelectorArguments for &str {
    fn to_component_selector_arguments(self) -> ComponentSelector {
        // `"\\:"` is the two characters `\:`, i.e. an already-escaped colon,
        // which signals an already-sanitized moniker.
        if self.contains("\\:") {
            // NOTE(review): this branch adds no tree selector (so the default
            // `:root` is appended later), while the branch below selects
            // `[...]root` (all tree names) — confirm the asymmetry is
            // intentional.
            ComponentSelector::new(self.split("/").map(|value| value.to_string()).collect())
        } else {
            ComponentSelector::new(
                selectors::sanitize_moniker_for_selectors(self)
                    .split("/")
                    .map(|value| value.to_string())
                    .collect(),
            )
            // Select the root of every named Inspect tree for this component.
            .with_tree_selector("[...]root")
        }
    }
}
155
156impl ToComponentSelectorArguments for String {
157 fn to_component_selector_arguments(self) -> ComponentSelector {
158 self.as_str().to_component_selector_arguments()
159 }
160}
161
162impl ToComponentSelectorArguments for ComponentSelector {
163 fn to_component_selector_arguments(self) -> ComponentSelector {
164 self
165 }
166}
167
168impl ToSelectorArguments for String {
169 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
170 Box::new([SelectorArgument::RawSelector(self)].into_iter())
171 }
172}
173
174impl ToSelectorArguments for &str {
175 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
176 Box::new([SelectorArgument::RawSelector(self.to_string())].into_iter())
177 }
178}
179
180impl ToSelectorArguments for ComponentSelector {
181 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
182 let moniker = self.moniker_str();
183 if self.tree_selectors.is_empty() {
185 Box::new([SelectorArgument::RawSelector(format!("{moniker}:root"))].into_iter())
186 } else {
187 Box::new(
188 self.tree_selectors
189 .into_iter()
190 .map(move |s| SelectorArgument::RawSelector(format!("{moniker}:{s}"))),
191 )
192 }
193 }
194}
195
196impl ToSelectorArguments for Selector {
197 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
198 Box::new([SelectorArgument::StructuredSelector(self)].into_iter())
199 }
200}
201
/// A value type that can be requested from the accessor in a specific wire
/// format.
pub trait SerializableValue: private::Sealed {
    /// The wire format corresponding to this value type.
    const FORMAT_OF_VALUE: Format;
}

/// A response that can be checked for the presence of a payload.
pub trait CheckResponse: private::Sealed {
    /// Returns whether this response carries a payload.
    fn has_payload(&self) -> bool;
}

// Prevents crates outside this one from implementing the traits above.
mod private {
    pub trait Sealed {}
}
impl private::Sealed for serde_json::Value {}
impl private::Sealed for ciborium::Value {}
impl<D: DiagnosticsData> private::Sealed for Data<D> {}
223
impl<D: DiagnosticsData> CheckResponse for Data<D> {
    fn has_payload(&self) -> bool {
        // Typed data has a payload when the optional hierarchy is populated.
        self.payload.is_some()
    }
}
229
// `serde_json` values travel over the wire as JSON.
impl SerializableValue for serde_json::Value {
    const FORMAT_OF_VALUE: Format = Format::Json;
}
233
234impl CheckResponse for serde_json::Value {
235 fn has_payload(&self) -> bool {
236 match self {
237 serde_json::Value::Object(obj) => {
238 obj.get("payload").map(|p| !matches!(p, serde_json::Value::Null)).is_some()
239 }
240 _ => false,
241 }
242 }
243}
244
// `ciborium` values travel over the wire as CBOR (HEAD-only format).
#[cfg(fuchsia_api_level_at_least = "HEAD")]
impl SerializableValue for ciborium::Value {
    const FORMAT_OF_VALUE: Format = Format::Cbor;
}
249
250impl CheckResponse for ciborium::Value {
251 fn has_payload(&self) -> bool {
252 match self {
253 ciborium::Value::Map(m) => {
254 let payload_key = ciborium::Value::Text("payload".into());
255 m.iter().any(|(key, _)| *key == payload_key)
256 }
257 _ => false,
258 }
259 }
260}
261
/// Retry policy applied when a snapshot returns fewer results than expected.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RetryConfig {
    /// Retry until the snapshot contains at least this many schemas.
    MinSchemaCount(usize),
}
270
271impl RetryConfig {
272 pub fn always() -> Self {
274 Self::MinSchemaCount(1)
275 }
276
277 pub fn never() -> Self {
279 Self::MinSchemaCount(0)
280 }
281
282 fn should_retry(&self, result_count: usize) -> bool {
284 match self {
285 Self::MinSchemaCount(min) => *min > result_count,
286 }
287 }
288}
289
/// Marker trait for the diagnostics data types a reader can be parameterized
/// on (`Logs` or `Inspect`).
pub trait DiagnosticsDataType: private::Sealed {}

impl private::Sealed for Logs {}

impl private::Sealed for Inspect {}

impl DiagnosticsDataType for Logs {}

impl DiagnosticsDataType for Inspect {}
300
/// Reads diagnostics data of type `T` from an `ArchiveAccessor`.
pub struct ArchiveReader<T> {
    // Accessor to use; when `None`, the default protocol is connected lazily.
    archive: Option<ArchiveAccessorProxy>,
    // Selectors restricting what data is returned; empty means "select all".
    selectors: Vec<SelectorArgument>,
    retry_config: RetryConfig,
    // Overall snapshot deadline; used by the shared (Inspect) snapshot path.
    timeout: Option<MonotonicDuration>,
    batch_retrieval_timeout_seconds: Option<i64>,
    max_aggregated_content_size_bytes: Option<u64>,
    // Log wire-format override; see `ArchiveReader::<Logs>::format`.
    format: Option<Format>,
    _phantom: PhantomData<T>,
}
313
impl<T: DiagnosticsDataType> ArchiveReader<T> {
    /// Uses the given `ArchiveAccessor` instead of connecting to the default
    /// one in the component's incoming namespace.
    pub fn with_archive(&mut self, archive: ArchiveAccessorProxy) -> &mut Self {
        self.archive = Some(archive);
        self
    }

    /// Keeps retrying snapshots until at least `minimum_schema_count` results
    /// are returned.
    pub fn with_minimum_schema_count(&mut self, minimum_schema_count: usize) -> &mut Self {
        self.retry_config = RetryConfig::MinSchemaCount(minimum_schema_count);
        self
    }

    /// Sets the retry policy for snapshots.
    pub fn retry(&mut self, config: RetryConfig) -> &mut Self {
        self.retry_config = config;
        self
    }

    /// Bounds the total snapshot time. On expiration the snapshot resolves
    /// with whatever was collected so far (possibly nothing) rather than an
    /// error. See `snapshot_shared`.
    pub fn with_timeout(&mut self, duration: MonotonicDuration) -> &mut Self {
        self.timeout = Some(duration);
        self
    }

    /// Selects all diagnostics data published by the given component.
    pub fn select_all_for_component(
        &mut self,
        component: impl ToComponentSelectorArguments,
    ) -> &mut Self {
        self.selectors.extend(component.to_component_selector_arguments().to_selector_arguments());
        self
    }

    /// Snapshots diagnostics data, honoring the configured `timeout`: when it
    /// fires, an empty result set is returned instead of an error.
    async fn snapshot_shared<D>(&self) -> Result<Vec<Data<D>>, Error>
    where
        D: DiagnosticsData + 'static,
    {
        let data_future = self.snapshot_inner::<D, Data<D>>(FORMAT);
        let data = match self.timeout {
            Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
            None => data_future.await?,
        };
        Ok(data)
    }

    /// Drains a snapshot batch iterator, retrying (after a short delay) until
    /// the retry policy is satisfied. Note that individual deserialization
    /// errors are dropped from the result set by the `filter_map`.
    async fn snapshot_inner<D, Y>(&self, format: Format) -> Result<Vec<Y>, Error>
    where
        D: DiagnosticsData,
        Y: for<'a> Deserialize<'a> + CheckResponse + Send + 'static,
    {
        loop {
            let iterator = self.batch_iterator::<D>(StreamMode::Snapshot, format)?;
            let result = drain_batch_iterator::<Y>(Arc::new(iterator))
                .filter_map(|value| ready(value.ok()))
                .collect::<Vec<_>>()
                .await;

            if self.retry_config.should_retry(result.len()) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
                ))
                .await;
            } else {
                return Ok(result);
            }
        }
    }

    /// Opens a `BatchIterator` against the accessor (connecting to the
    /// default one if none was provided), configured with this reader's
    /// selectors and performance settings.
    fn batch_iterator<D>(
        &self,
        mode: StreamMode,
        format: Format,
    ) -> Result<BatchIteratorProxy, Error>
    where
        D: DiagnosticsData,
    {
        let archive = match &self.archive {
            Some(archive) => archive.clone(),
            None => client::connect_to_protocol::<ArchiveAccessorMarker>()
                .map_err(Error::ConnectToArchive)?,
        };

        let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();

        let stream_parameters = StreamParameters {
            stream_mode: Some(mode),
            data_type: Some(D::DATA_TYPE),
            format: Some(format),
            // An empty selector list means "give me everything".
            client_selector_configuration: if self.selectors.is_empty() {
                Some(ClientSelectorConfiguration::SelectAll(true))
            } else {
                Some(ClientSelectorConfiguration::Selectors(self.selectors.to_vec()))
            },
            performance_configuration: Some(PerformanceConfiguration {
                max_aggregate_content_size_bytes: self.max_aggregated_content_size_bytes,
                batch_retrieval_timeout_seconds: self.batch_retrieval_timeout_seconds,
                ..Default::default()
            }),
            ..Default::default()
        };

        archive
            .stream_diagnostics(&stream_parameters, server_end)
            .map_err(Error::StreamDiagnostics)?;
        Ok(iterator)
    }
}
429
impl ArchiveReader<Logs> {
    /// Creates a new reader for `Logs` data with default settings
    /// (retry-until-one-result, no timeout, default accessor).
    pub fn logs() -> Self {
        ArchiveReader::<Logs> {
            timeout: None,
            format: None,
            selectors: vec![],
            retry_config: RetryConfig::always(),
            archive: None,
            batch_retrieval_timeout_seconds: None,
            max_aggregated_content_size_bytes: None,
            _phantom: PhantomData,
        }
    }

    /// Overrides the wire format used when requesting logs.
    #[doc(hidden)]
    pub fn with_format(&mut self, format: Format) -> &mut Self {
        self.format = Some(format);
        self
    }

    /// The log wire format to request: the explicit override if set,
    /// otherwise FXT where available and JSON on older API levels.
    #[inline]
    fn format(&self) -> Format {
        match self.format {
            Some(f) => f,
            None => {
                #[cfg(fuchsia_api_level_at_least = "HEAD")]
                let ret = Format::Fxt;
                #[cfg(fuchsia_api_level_less_than = "HEAD")]
                let ret = Format::Json;
                ret
            }
        }
    }

    /// Snapshots log data, retrying until the retry policy is satisfied.
    /// NOTE(review): unlike the Inspect snapshot path, this does not honor
    /// `with_timeout` — confirm whether that is intentional.
    pub async fn snapshot(&self) -> Result<Vec<Data<Logs>>, Error> {
        loop {
            let iterator = self.batch_iterator::<Logs>(StreamMode::Snapshot, self.format())?;
            let result = drain_batch_iterator_for_logs(Arc::new(iterator))
                .filter_map(|value| ready(value.ok()))
                .collect::<Vec<_>>()
                .await;

            if self.retry_config.should_retry(result.len()) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
                ))
                .await;
            } else {
                return Ok(result);
            }
        }
    }

    /// Returns a subscription that yields the existing log data followed by
    /// new data as it arrives.
    pub fn snapshot_then_subscribe(&self) -> Result<Subscription, Error> {
        let iterator =
            self.batch_iterator::<Logs>(StreamMode::SnapshotThenSubscribe, self.format())?;
        Ok(Subscription::new(iterator))
    }
}
493
impl ArchiveReader<Inspect> {
    /// Creates a new reader for `Inspect` data with default settings
    /// (retry-until-one-result, no timeout, default accessor).
    pub fn inspect() -> Self {
        ArchiveReader::<Inspect> {
            timeout: None,
            format: None,
            selectors: vec![],
            retry_config: RetryConfig::always(),
            archive: None,
            batch_retrieval_timeout_seconds: None,
            max_aggregated_content_size_bytes: None,
            _phantom: PhantomData,
        }
    }

    /// Sets the server-side per-batch retrieval timeout, in seconds.
    pub fn with_batch_retrieval_timeout_seconds(&mut self, timeout: i64) -> &mut Self {
        self.batch_retrieval_timeout_seconds = Some(timeout);
        self
    }

    /// Limits the aggregated content size (in bytes) the server may return.
    pub fn with_aggregated_result_bytes_limit(&mut self, limit_bytes: u64) -> &mut Self {
        self.max_aggregated_content_size_bytes = Some(limit_bytes);
        self
    }

    /// Snapshots Inspect data as a raw serialized value (JSON or CBOR,
    /// selected by `T::FORMAT_OF_VALUE`), honoring the configured timeout.
    pub async fn snapshot_raw<T>(&self) -> Result<T, Error>
    where
        T: for<'a> Deserialize<'a>
            + SerializableValue
            + From<Vec<T>>
            + CheckResponse
            + 'static
            + Send,
    {
        let data_future = self.snapshot_inner::<Inspect, T>(T::FORMAT_OF_VALUE);
        let data = match self.timeout {
            Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
            None => data_future.await?,
        };
        // Collapse the collected values into a single aggregate value.
        Ok(T::from(data))
    }

    /// Adds every selector produced by the given iterator.
    pub fn add_selectors<T, S>(&mut self, selectors: T) -> &mut Self
    where
        T: Iterator<Item = S>,
        S: ToSelectorArguments,
    {
        for selector in selectors {
            self.add_selector(selector);
        }
        self
    }

    /// Adds a single selector restricting what data is returned.
    pub fn add_selector(&mut self, selector: impl ToSelectorArguments) -> &mut Self {
        self.selectors.extend(selector.to_selector_arguments());
        self
    }

    /// Snapshots Inspect data as typed `Data<Inspect>` schemas.
    pub async fn snapshot(&self) -> Result<Vec<Data<Inspect>>, Error> {
        self.snapshot_shared::<Inspect>().await
    }
}
567
/// A deserialized response payload that is either a single value or a list.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum OneOrMany<T> {
    Many(Vec<T>),
    One(T),
}
574
/// Repeatedly calls `GetNext` on `iterator`, mapping each `FormattedContent`
/// through `process_content` and yielding the resulting item(s) one by one.
/// The stream ends when the server returns an empty batch.
fn stream_batch<T>(
    iterator: Arc<BatchIteratorProxy>,
    process_content: impl Fn(FormattedContent) -> Result<OneOrMany<T>, Error>,
) -> impl Stream<Item = Result<T, Error>>
where
    T: for<'a> Deserialize<'a> + Send + 'static,
{
    stream! {
        loop {
            let next_batch = iterator
                .get_next()
                .await
                .map_err(Error::GetNextCall)?
                .map_err(Error::GetNextReaderError)?;
            // An empty batch is the server's end-of-stream signal.
            if next_batch.is_empty() {
                return;
            }
            for formatted_content in next_batch {
                let output = process_content(formatted_content)?;
                match output {
                    OneOrMany::One(data) => yield Ok(data),
                    OneOrMany::Many(datas) => {
                        for data in datas {
                            yield Ok(data);
                        }
                    }
                }
            }
        }
    }
}
607
/// Drains a `BatchIterator` into a stream of deserialized values, decoding
/// each returned VMO as JSON or (at HEAD) CBOR.
pub fn drain_batch_iterator<T>(
    iterator: Arc<BatchIteratorProxy>,
) -> impl Stream<Item = Result<T, Error>>
where
    T: for<'a> Deserialize<'a> + Send + 'static,
{
    stream_batch(iterator, |formatted_content| match formatted_content {
        FormattedContent::Json(data) => {
            // JSON buffers carry their size explicitly in the FIDL Buffer.
            let mut buf = vec![0; data.size as usize];
            data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
            serde_json::from_slice(&buf).map_err(Error::ReadJson)
        }
        #[cfg(fuchsia_api_level_at_least = "HEAD")]
        FormattedContent::Cbor(vmo) => {
            // CBOR buffers are sized via the VMO's content size.
            let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
            vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
            Ok(ciborium::from_reader(buf.as_slice()).map_err(|err| Error::ReadCbor(err.into()))?)
        }
        #[cfg(fuchsia_api_level_at_least = "HEAD")]
        FormattedContent::Fxt(_) => unreachable!("We never expect FXT for Inspect"),
        FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
            unreachable!("Received unrecognized FIDL message")
        }
    })
}
634
/// Drains a `BatchIterator` into a stream of `LogsData`, decoding JSON or
/// (at HEAD) the binary FXT log format.
fn drain_batch_iterator_for_logs(
    iterator: Arc<BatchIteratorProxy>,
) -> impl Stream<Item = Result<LogsData, Error>> {
    // One parser shared across all batches; the mutex is needed because the
    // closure passed to `stream_batch` is `Fn`, not `FnMut`.
    #[cfg(fuchsia_api_level_at_least = "HEAD")]
    let parser = Arc::new(std::sync::Mutex::new(diagnostics_message::MessageParser::new()));
    stream_batch::<LogsData>(iterator, move |formatted_content| match formatted_content {
        FormattedContent::Json(data) => {
            let mut buf = vec![0; data.size as usize];
            data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
            serde_json::from_slice(&buf).map_err(Error::ReadJson)
        }
        #[cfg(fuchsia_api_level_at_least = "HEAD")]
        FormattedContent::Fxt(vmo) => {
            let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
            vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
            let mut current_slice: &[u8] = &buf;
            let mut items = vec![];
            let mut parser = parser.lock().map_err(|_| Error::MutexError)?;

            // Parse records until the buffer is exhausted; `parse_next`
            // returns an optional record plus the unconsumed remainder.
            while !current_slice.is_empty() {
                match parser.parse_next(current_slice) {
                    Ok((maybe_data, remaining)) => {
                        // No bytes consumed means the parser cannot advance.
                        if remaining.len() == current_slice.len() {
                            return Err(Error::ParserStuck);
                        }
                        if let Some(data) = maybe_data {
                            items.push(data);
                        }
                        current_slice = remaining;
                    }
                    Err(_) => {
                        // NOTE(review): a parse error silently discards the
                        // rest of this buffer — confirm this best-effort
                        // behavior is intended.
                        break;
                    }
                }
            }
            Ok(OneOrMany::Many(items))
        }
        #[cfg(fuchsia_api_level_at_least = "HEAD")]
        FormattedContent::Cbor(_) => unreachable!("We never expect CBOR"),
        FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
            unreachable!("Received unrecognized FIDL message")
        }
    })
}
681
/// A subscription to an ongoing stream of log data.
#[pin_project]
pub struct Subscription {
    // The fused stream of decoded log records.
    #[pin]
    recv: Pin<Box<dyn FusedStream<Item = Result<LogsData, Error>> + Send>>,
    // Kept so `wait_for_ready` can talk to the iterator server.
    iterator: Arc<BatchIteratorProxy>,
}

// Capacities of the channels created by `Subscription::split_streams`.
const DATA_CHANNEL_SIZE: usize = 32;
const ERROR_CHANNEL_SIZE: usize = 2;
692
impl Subscription {
    /// Creates a subscription that drains log data from the given iterator.
    pub fn new(iterator: BatchIteratorProxy) -> Self {
        let iterator = Arc::new(iterator);
        Subscription {
            recv: Box::pin(drain_batch_iterator_for_logs(iterator.clone()).fuse()),
            iterator,
        }
    }

    /// Waits for the iterator server to acknowledge readiness.
    ///
    /// # Panics
    ///
    /// Panics if the iterator channel closes before responding.
    pub async fn wait_for_ready(&self) {
        self.iterator.wait_for_ready().await.expect("doesn't disconnect");
    }

    /// Splits this subscription into a stream of decoded data and a channel
    /// of errors, driven by a background task that drains the subscription.
    /// Send failures (e.g. a dropped receiver) are ignored.
    pub fn split_streams(mut self) -> (SubscriptionResultsStream<LogsData>, mpsc::Receiver<Error>) {
        let (mut errors_sender, errors) = mpsc::channel(ERROR_CHANNEL_SIZE);
        let (mut results_sender, recv) = mpsc::channel(DATA_CHANNEL_SIZE);
        let _drain_task = fasync::Task::spawn(async move {
            while let Some(result) = self.next().await {
                match result {
                    Ok(value) => results_sender.send(value).await.ok(),
                    Err(e) => errors_sender.send(e).await.ok(),
                };
            }
        });
        (SubscriptionResultsStream { recv, _drain_task }, errors)
    }
}
724
725impl Stream for Subscription {
726 type Item = Result<LogsData, Error>;
727
728 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
729 let this = self.project();
730 this.recv.poll_next(cx)
731 }
732}
733
impl FusedStream for Subscription {
    fn is_terminated(&self) -> bool {
        // The subscription is done when the underlying batch stream is done.
        self.recv.is_terminated()
    }
}
739
/// A stream of successfully decoded subscription results of type `T`.
#[pin_project]
pub struct SubscriptionResultsStream<T> {
    #[pin]
    recv: mpsc::Receiver<T>,
    // Keeps the draining task alive for as long as this stream exists.
    _drain_task: fasync::Task<()>,
}
747
748impl<T> Stream for SubscriptionResultsStream<T>
749where
750 T: for<'a> Deserialize<'a>,
751{
752 type Item = T;
753
754 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
755 let this = self.project();
756 this.recv.poll_next(cx)
757 }
758}
759
impl<T> FusedStream for SubscriptionResultsStream<T>
where
    T: for<'a> Deserialize<'a>,
{
    fn is_terminated(&self) -> bool {
        // Terminated when the channel from the drain task is closed and drained.
        self.recv.is_terminated()
    }
}
768
769#[cfg(test)]
770mod tests {
771 use super::*;
772 use assert_matches::assert_matches;
773 use diagnostics_assertions::assert_data_tree;
774 use diagnostics_log::{Publisher, PublisherOptions};
775 use fidl::endpoints::ServerEnd;
776 use fidl_fuchsia_diagnostics as fdiagnostics;
777 use fuchsia_component_test::{
778 Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
779 };
780 use futures::TryStreamExt;
781 use log::{error, info};
782
    // URL of the component that publishes the Inspect data under test.
    const TEST_COMPONENT_URL: &str = "#meta/inspect_test_component.cm";

    // Structured-config options passed to the test component.
    struct ComponentOptions {
        // Number of Inspect trees the component should publish.
        publish_n_trees: u64,
    }
788
    // Launches the Inspect test component in a RealmBuilder realm, configured
    // to publish `publish_n_trees` trees.
    async fn start_component(opts: ComponentOptions) -> Result<RealmInstance, anyhow::Error> {
        let builder = RealmBuilder::new().await?;
        let test_component = builder
            .add_child("test_component", TEST_COMPONENT_URL, ChildOptions::new().eager())
            .await?;
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
                    .from(Ref::parent())
                    .to(&test_component),
            )
            .await?;
        builder.init_mutable_config_to_empty(&test_component).await.unwrap();
        builder
            .set_config_value(&test_component, "publish_n_trees", opts.publish_n_trees.into())
            .await
            .unwrap();
        let instance = builder.build().await?;
        Ok(instance)
    }
810
    // Verifies both raw-string and structured selectors against a component's
    // published Inspect hierarchy.
    #[fuchsia::test]
    async fn inspect_data_for_component() -> Result<(), anyhow::Error> {
        let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
        let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
        let component_selector = selectors::sanitize_moniker_for_selectors(&moniker);
        // Snapshot everything under the component's root.
        let results = ArchiveReader::inspect()
            .add_selector(format!("{component_selector}:[...]root"))
            .snapshot()
            .await?;
        assert_eq!(results.len(), 1);
        assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
            "tree-0": 0u64,
            int: 3u64,
            "lazy-node": {
                a: "test",
                child: {
                    double: 3.25,
                },
            }
        });
        // Narrow to a single property with a structured FIDL selector.
        let lazy_property_selector = Selector {
            component_selector: Some(fdiagnostics::ComponentSelector {
                moniker_segments: Some(vec![
                    fdiagnostics::StringSelector::ExactMatch(format!(
                        "realm_builder:{}",
                        instance.root.child_name()
                    )),
                    fdiagnostics::StringSelector::ExactMatch("test_component".into()),
                ]),
                ..Default::default()
            }),
            tree_selector: Some(fdiagnostics::TreeSelector::PropertySelector(
                fdiagnostics::PropertySelector {
                    node_path: vec![
                        fdiagnostics::StringSelector::ExactMatch("root".into()),
                        fdiagnostics::StringSelector::ExactMatch("lazy-node".into()),
                    ],
                    target_properties: fdiagnostics::StringSelector::ExactMatch("a".into()),
                },
            )),
            tree_names: Some(fdiagnostics::TreeNames::All(fdiagnostics::All {})),
            ..Default::default()
        };
        let int_property_selector = format!("{component_selector}:[...]root:int");
        let mut reader = ArchiveReader::inspect();
        reader.add_selector(int_property_selector).add_selector(lazy_property_selector);
        let response = reader.snapshot().await?;
        assert_eq!(response.len(), 1);
        assert_eq!(response[0].moniker.to_string(), moniker);
        // Only the selected properties survive filtering.
        assert_data_tree!(response[0].payload.as_ref().unwrap(), root: {
            int: 3u64,
            "lazy-node": {
                a: "test"
            }
        });
        Ok(())
    }
871
    // Selecting by plain moniker should return everything the component
    // publishes.
    #[fuchsia::test]
    async fn select_all_for_moniker() {
        let instance = start_component(ComponentOptions { publish_n_trees: 1 })
            .await
            .expect("component started");
        let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
        let results = ArchiveReader::inspect()
            .select_all_for_component(moniker)
            .snapshot()
            .await
            .expect("snapshotted");
        assert_eq!(results.len(), 1);
        assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
            "tree-0": 0u64,
            int: 3u64,
            "lazy-node": {
                a: "test",
                child: {
                    double: 3.25,
                },
            }
        });
    }
895
    // A zero timeout makes the snapshot resolve immediately with no data
    // instead of erroring out.
    #[fuchsia::test]
    async fn timeout() -> Result<(), anyhow::Error> {
        let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;

        let mut reader = ArchiveReader::inspect();
        reader
            .add_selector(format!(
                "realm_builder\\:{}/test_component:root",
                instance.root.child_name()
            ))
            .with_timeout(zx::MonotonicDuration::from_nanos(0));
        let result = reader.snapshot().await;
        assert!(result.unwrap().is_empty());
        Ok(())
    }
911
    // Exercises `ComponentSelector` rendering with and without tree selectors.
    #[fuchsia::test]
    async fn component_selector() {
        let selector = ComponentSelector::new(vec!["a".to_string()]);
        assert_eq!(selector.moniker_str(), "a");
        // Without tree selectors the default `:root` suffix is used.
        let arguments: Vec<_> = selector.to_selector_arguments().collect();
        assert_eq!(arguments, vec![SelectorArgument::RawSelector("a:root".to_string())]);

        let selector =
            ComponentSelector::new(vec!["b".to_string(), "c".to_string(), "a".to_string()]);
        assert_eq!(selector.moniker_str(), "b/c/a");

        // Each tree selector produces one raw selector argument.
        let selector = selector.with_tree_selector("root/b/c:d").with_tree_selector("root/e:f");
        let arguments: Vec<_> = selector.to_selector_arguments().collect();
        assert_eq!(
            arguments,
            vec![
                SelectorArgument::RawSelector("b/c/a:root/b/c:d".into()),
                SelectorArgument::RawSelector("b/c/a:root/e:f".into()),
            ]
        );
    }
933
    // A reader pointed at a custom (fake) accessor should decode its data.
    #[fuchsia::test]
    async fn custom_archive() {
        let proxy = spawn_fake_archive(serde_json::json!({
            "moniker": "moniker",
            "version": 1,
            "data_source": "Inspect",
            "metadata": {
                "component_url": "component-url",
                "timestamp": 0,
                "filename": "filename",
            },
            "payload": {
                "root": {
                    "x": 1,
                }
            }
        }));
        let result =
            ArchiveReader::inspect().with_archive(proxy).snapshot().await.expect("got result");
        assert_eq!(result.len(), 1);
        assert_data_tree!(result[0].payload.as_ref().unwrap(), root: { x: 1u64 });
    }
956
    // `snapshot_raw` should flatten a server-sent JSON array into individual
    // results, for both the JSON and CBOR value types.
    #[fuchsia::test]
    async fn handles_lists_correctly_on_snapshot_raw() {
        let value = serde_json::json!({
            "moniker": "moniker",
            "version": 1,
            "data_source": "Inspect",
            "metadata": {
                "component_url": "component-url",
                "timestamp": 0,
                "filename": "filename",
            },
            "payload": {
                "root": {
                    "x": 1,
                }
            }
        });
        let proxy = spawn_fake_archive(serde_json::json!([value.clone()]));
        let mut reader = ArchiveReader::inspect();
        reader.with_archive(proxy);
        let json_result = reader.snapshot_raw::<serde_json::Value>().await.expect("got result");
        match json_result {
            serde_json::Value::Array(values) => {
                assert_eq!(values.len(), 1);
                assert_eq!(values[0], value);
            }
            result => panic!("unexpected result: {result:?}"),
        }
        let cbor_result = reader.snapshot_raw::<ciborium::Value>().await.expect("got result");
        match cbor_result {
            ciborium::Value::Array(values) => {
                assert_eq!(values.len(), 1);
                let json_result =
                    values[0].deserialized::<serde_json::Value>().expect("convert to json");
                assert_eq!(json_result, value);
            }
            result => panic!("unexpected result: {result:?}"),
        }
    }
996
    // Logs published after subscribing should arrive, in order, on the data
    // stream returned by `split_streams`.
    #[fuchsia::test(logging = false)]
    async fn snapshot_then_subscribe() {
        let (_instance, publisher, reader) = init_isolated_logging().await;
        let (mut stream, _errors) =
            reader.snapshot_then_subscribe().expect("subscribed to logs").split_streams();
        publisher.register_logger(None).unwrap();
        info!("hello from test");
        error!("error from test");
        let log = stream.next().await.unwrap();
        assert_eq!(log.msg().unwrap(), "hello from test");
        let log = stream.next().await.unwrap();
        assert_eq!(log.msg().unwrap(), "error from test");
    }
1010
    // A tree-name filter should exclude data from trees that do not match.
    #[fuchsia::test]
    async fn read_many_trees_with_filtering() {
        let instance = start_component(ComponentOptions { publish_n_trees: 2 })
            .await
            .expect("component started");
        let selector = format!(
            "realm_builder\\:{}/test_component:[name=tree-0]root",
            instance.root.child_name()
        );
        let results = ArchiveReader::inspect()
            .add_selector(selector)
            .with_minimum_schema_count(1)
            .snapshot()
            .await
            .expect("snapshotted");
        // Only tree-0 was selected; tree-1 must be filtered out.
        assert_matches!(results.iter().find(|v| v.metadata.name.as_ref() == "tree-1"), None);
        let should_have_data =
            results.into_iter().find(|v| v.metadata.name.as_ref() == "tree-0").unwrap();
        assert_data_tree!(should_have_data.payload.unwrap(), root: contains {
            "tree-0": 0u64,
        });
    }
1034
    // Spawns a fake ArchiveAccessor that serves `data_to_send` through a
    // batch iterator for every StreamDiagnostics request.
    fn spawn_fake_archive(data_to_send: serde_json::Value) -> fdiagnostics::ArchiveAccessorProxy {
        let (proxy, mut stream) =
            fidl::endpoints::create_proxy_and_stream::<fdiagnostics::ArchiveAccessorMarker>();
        fasync::Task::spawn(async move {
            while let Some(request) = stream.try_next().await.expect("stream request") {
                match request {
                    fdiagnostics::ArchiveAccessorRequest::StreamDiagnostics {
                        result_stream,
                        ..
                    } => {
                        let data = data_to_send.clone();
                        fasync::Task::spawn(handle_batch_iterator(data, result_stream)).detach();
                    }
                    fdiagnostics::ArchiveAccessorRequest::WaitForReady { responder, .. } => {
                        let _ = responder.send();
                    }
                    fdiagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
                        unreachable!("Unexpected method call");
                    }
                }
            }
        })
        .detach();
        proxy
    }
1060
    // Serves a single JSON batch containing `data`, then empty batches
    // (the end-of-stream signal) for all subsequent GetNext calls.
    async fn handle_batch_iterator(
        data: serde_json::Value,
        result_stream: ServerEnd<fdiagnostics::BatchIteratorMarker>,
    ) {
        let mut called = false;
        let mut stream = result_stream.into_stream();
        while let Some(req) = stream.try_next().await.expect("stream request") {
            match req {
                fdiagnostics::BatchIteratorRequest::WaitForReady { responder } => {
                    let _ = responder.send();
                }
                fdiagnostics::BatchIteratorRequest::GetNext { responder } => {
                    if called {
                        responder.send(Ok(Vec::new())).expect("send response");
                        continue;
                    }
                    called = true;
                    // Package the JSON into a VMO-backed FIDL buffer.
                    let content = serde_json::to_string_pretty(&data).expect("json pretty");
                    let vmo_size = content.len() as u64;
                    let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
                    vmo.write(content.as_bytes(), 0).expect("write vmo");
                    let buffer = fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
                    responder
                        .send(Ok(vec![fdiagnostics::FormattedContent::Json(buffer)]))
                        .expect("send response");
                }
                fdiagnostics::BatchIteratorRequest::_UnknownMethod { .. } => {
                    unreachable!("Unexpected method call");
                }
            }
        }
    }
1093
    // Builds a realm with an embedded Archivist, routing LogSink and
    // ArchiveAccessor so the test can publish and read logs in isolation.
    async fn create_realm() -> RealmBuilder {
        let builder = RealmBuilder::new().await.expect("create realm builder");
        let archivist = builder
            .add_child("archivist", "#meta/archivist-for-embedding.cm", ChildOptions::new().eager())
            .await
            .expect("add child archivist");
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
                    .capability(
                        Capability::protocol_by_name("fuchsia.tracing.provider.Registry")
                            .optional(),
                    )
                    .capability(Capability::event_stream("stopped"))
                    .capability(Capability::event_stream("capability_requested"))
                    .from(Ref::parent())
                    .to(&archivist),
            )
            .await
            .expect("added routes from parent to archivist");
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
                    .from(&archivist)
                    .to(Ref::parent()),
            )
            .await
            .expect("routed LogSink from archivist to parent");
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol_by_name("fuchsia.diagnostics.ArchiveAccessor"))
                    .from(Ref::dictionary(&archivist, "diagnostics-accessors"))
                    .to(Ref::parent()),
            )
            .await
            .expect("routed ArchiveAccessor from archivist to parent");
        builder
    }
1135
    // Starts the isolated-Archivist realm and returns the realm, a log
    // publisher wired to it, and a reader wired to its accessor.
    async fn init_isolated_logging() -> (RealmInstance, Publisher, ArchiveReader<Logs>) {
        let instance = create_realm().await.build().await.unwrap();
        let log_sink_client = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
        let accessor_proxy = instance.root.connect_to_protocol_at_exposed_dir().unwrap();
        let mut reader = ArchiveReader::logs();
        reader.with_archive(accessor_proxy);
        let options = PublisherOptions::default().use_log_sink(log_sink_client);
        let publisher = Publisher::new_async(options).await.unwrap();
        (instance, publisher, reader)
    }
1146
1147 #[fuchsia::test]
1148 fn retry_config_behavior() {
1149 let config = RetryConfig::MinSchemaCount(1);
1150 let got = 0;
1151
1152 assert!(config.should_retry(got));
1153
1154 let config = RetryConfig::MinSchemaCount(1);
1155 let got = 1;
1156
1157 assert!(!config.should_retry(got));
1158
1159 let config = RetryConfig::MinSchemaCount(1);
1160 let got = 2;
1161
1162 assert!(!config.should_retry(got));
1163
1164 let config = RetryConfig::MinSchemaCount(0);
1165 let got = 1;
1166
1167 assert!(!config.should_retry(got));
1168
1169 let config = RetryConfig::always();
1170 let got = 0;
1171
1172 assert!(config.should_retry(got));
1173
1174 let config = RetryConfig::never();
1175 let got = 0;
1176
1177 assert!(!config.should_retry(got));
1178 }
1179}