1#![deny(missing_docs)]
6
7use async_stream::stream;
11use diagnostics_data::{DiagnosticsData, LogsData};
12#[cfg(fuchsia_api_level_less_than = "HEAD")]
13use diagnostics_message as _;
14#[cfg(fuchsia_api_level_at_least = "HEAD")]
15use diagnostics_message::from_extended_record;
16use fidl_fuchsia_diagnostics::{
17 ArchiveAccessorMarker, ArchiveAccessorProxy, BatchIteratorMarker, BatchIteratorProxy,
18 ClientSelectorConfiguration, Format, FormattedContent, PerformanceConfiguration, ReaderError,
19 Selector, SelectorArgument, StreamMode, StreamParameters,
20};
21use fuchsia_async::{self as fasync, DurationExt, TimeoutExt};
22use fuchsia_component::client;
23use futures::channel::mpsc;
24use futures::prelude::*;
25use futures::sink::SinkExt;
26use futures::stream::FusedStream;
27use pin_project::pin_project;
28use serde::Deserialize;
29use std::future::ready;
30use std::marker::PhantomData;
31use std::pin::Pin;
32use std::sync::Arc;
33use std::task::{Context, Poll};
34use thiserror::Error;
35use zx::{self as zx, MonotonicDuration};
36
37pub type LogsArchiveReader = ArchiveReader<Logs>;
39
40pub type InspectArchiveReader = ArchiveReader<Inspect>;
42
43pub use diagnostics_data::{Data, Inspect, Logs, Severity};
44pub use diagnostics_hierarchy::{hierarchy, DiagnosticsHierarchy, Property};
45
46const RETRY_DELAY_MS: i64 = 300;
47
48#[cfg(fuchsia_api_level_at_least = "HEAD")]
49const FORMAT: Format = Format::Cbor;
50#[cfg(fuchsia_api_level_less_than = "HEAD")]
51const FORMAT: Format = Format::Json;
52
53#[derive(Debug, Error)]
55pub enum Error {
56 #[error("Failed to connect to the archive accessor")]
58 ConnectToArchive(#[source] anyhow::Error),
59
60 #[error("Failed to create the BatchIterator channel ends")]
62 CreateIteratorProxy(#[source] fidl::Error),
63
64 #[error("Failed to stream diagnostics from the accessor")]
66 StreamDiagnostics(#[source] fidl::Error),
67
68 #[error("Failed to call iterator server")]
70 GetNextCall(#[source] fidl::Error),
71
72 #[error("Received error from the GetNext response: {0:?}")]
74 GetNextReaderError(ReaderError),
75
76 #[error("Failed to read json received")]
78 ReadJson(#[source] serde_json::Error),
79
80 #[cfg(fuchsia_api_level_at_least = "HEAD")]
82 #[error("Failed to read cbor received")]
83 ReadCbor(#[source] anyhow::Error),
84
85 #[error("Failed to parse the diagnostics data from the json received")]
87 ParseDiagnosticsData(#[source] serde_json::Error),
88
89 #[error("Failed to read vmo from the response")]
91 ReadVmo(#[source] zx::Status),
92}
93
94pub struct ComponentSelector {
96 moniker: Vec<String>,
97 tree_selectors: Vec<String>,
98}
99
100impl ComponentSelector {
101 pub fn new(moniker: Vec<String>) -> Self {
106 Self { moniker, tree_selectors: Vec::new() }
107 }
108
109 pub fn with_tree_selector(mut self, tree_selector: impl Into<String>) -> Self {
111 self.tree_selectors.push(tree_selector.into());
112 self
113 }
114
115 fn moniker_str(&self) -> String {
116 self.moniker.join("/")
117 }
118}
119
120pub trait ToSelectorArguments {
122 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>>;
124}
125
126pub trait ToComponentSelectorArguments {
128 fn to_component_selector_arguments(self) -> ComponentSelector;
130}
131
132impl ToComponentSelectorArguments for &str {
133 fn to_component_selector_arguments(self) -> ComponentSelector {
134 if self.contains("\\:") {
135 ComponentSelector::new(self.split("/").map(|value| value.to_string()).collect())
137 } else {
138 ComponentSelector::new(
140 selectors::sanitize_moniker_for_selectors(self)
141 .split("/")
142 .map(|value| value.to_string())
143 .collect(),
144 )
145 .with_tree_selector("[...]root")
146 }
147 }
148}
149
150impl ToComponentSelectorArguments for String {
151 fn to_component_selector_arguments(self) -> ComponentSelector {
152 self.as_str().to_component_selector_arguments()
153 }
154}
155
156impl ToComponentSelectorArguments for ComponentSelector {
157 fn to_component_selector_arguments(self) -> ComponentSelector {
158 self
159 }
160}
161
162impl ToSelectorArguments for String {
163 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
164 Box::new([SelectorArgument::RawSelector(self)].into_iter())
165 }
166}
167
168impl ToSelectorArguments for &str {
169 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
170 Box::new([SelectorArgument::RawSelector(self.to_string())].into_iter())
171 }
172}
173
174impl ToSelectorArguments for ComponentSelector {
175 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
176 let moniker = self.moniker_str();
177 if self.tree_selectors.is_empty() {
179 Box::new([SelectorArgument::RawSelector(format!("{}:root", moniker))].into_iter())
180 } else {
181 Box::new(
182 self.tree_selectors
183 .into_iter()
184 .map(move |s| SelectorArgument::RawSelector(format!("{moniker}:{s}"))),
185 )
186 }
187 }
188}
189
190impl ToSelectorArguments for Selector {
191 fn to_selector_arguments(self) -> Box<dyn Iterator<Item = SelectorArgument>> {
192 Box::new([SelectorArgument::StructuredSelector(self)].into_iter())
193 }
194}
195
196pub trait SerializableValue: private::Sealed {
198 const FORMAT_OF_VALUE: Format;
200}
201
202pub trait CheckResponse: private::Sealed {
204 fn has_payload(&self) -> bool;
206}
207
208mod private {
212 pub trait Sealed {}
213}
214impl private::Sealed for serde_json::Value {}
215impl private::Sealed for ciborium::Value {}
216impl<D: DiagnosticsData> private::Sealed for Data<D> {}
217
218impl<D: DiagnosticsData> CheckResponse for Data<D> {
219 fn has_payload(&self) -> bool {
220 self.payload.is_some()
221 }
222}
223
224impl SerializableValue for serde_json::Value {
225 const FORMAT_OF_VALUE: Format = Format::Json;
226}
227
228impl CheckResponse for serde_json::Value {
229 fn has_payload(&self) -> bool {
230 match self {
231 serde_json::Value::Object(obj) => {
232 obj.get("payload").map(|p| !matches!(p, serde_json::Value::Null)).is_some()
233 }
234 _ => false,
235 }
236 }
237}
238
239#[cfg(fuchsia_api_level_at_least = "HEAD")]
240impl SerializableValue for ciborium::Value {
241 const FORMAT_OF_VALUE: Format = Format::Cbor;
242}
243
244impl CheckResponse for ciborium::Value {
245 fn has_payload(&self) -> bool {
246 match self {
247 ciborium::Value::Map(m) => {
248 let payload_key = ciborium::Value::Text("payload".into());
249 m.iter().any(|(key, _)| *key == payload_key)
250 }
251 _ => false,
252 }
253 }
254}
255
256#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
258pub enum RetryConfig {
259 MinSchemaCount(usize),
263}
264
265impl RetryConfig {
266 pub fn always() -> Self {
268 Self::MinSchemaCount(1)
269 }
270
271 pub fn never() -> Self {
273 Self::MinSchemaCount(0)
274 }
275
276 fn should_retry(&self, result_count: usize) -> bool {
278 match self {
279 Self::MinSchemaCount(min) => *min > result_count,
280 }
281 }
282}
283
284pub trait DiagnosticsDataType: private::Sealed {}
286
287impl private::Sealed for Logs {}
288
289impl private::Sealed for Inspect {}
290
291impl DiagnosticsDataType for Logs {}
292
293impl DiagnosticsDataType for Inspect {}
294
295pub struct ArchiveReader<T> {
298 archive: Option<ArchiveAccessorProxy>,
299 selectors: Vec<SelectorArgument>,
300 retry_config: RetryConfig,
301 timeout: Option<MonotonicDuration>,
302 batch_retrieval_timeout_seconds: Option<i64>,
303 max_aggregated_content_size_bytes: Option<u64>,
304 _phantom: PhantomData<T>,
305}
306
307impl<T: DiagnosticsDataType> ArchiveReader<T> {
308 pub fn with_archive(&mut self, archive: ArchiveAccessorProxy) -> &mut Self {
312 self.archive = Some(archive);
313 self
314 }
315
316 pub fn with_minimum_schema_count(&mut self, minimum_schema_count: usize) -> &mut Self {
319 self.retry_config = RetryConfig::MinSchemaCount(minimum_schema_count);
320 self
321 }
322
323 pub fn retry(&mut self, config: RetryConfig) -> &mut Self {
325 self.retry_config = config;
326 self
327 }
328
329 pub fn with_timeout(&mut self, duration: MonotonicDuration) -> &mut Self {
332 self.timeout = Some(duration);
333 self
334 }
335
336 pub fn select_all_for_component(
340 &mut self,
341 component: impl ToComponentSelectorArguments,
342 ) -> &mut Self {
343 self.selectors.extend(component.to_component_selector_arguments().to_selector_arguments());
344 self
345 }
346
347 async fn snapshot_shared<D>(&self) -> Result<Vec<Data<D>>, Error>
349 where
350 D: DiagnosticsData + 'static,
351 {
352 let data_future = self.snapshot_inner::<D, Data<D>>(FORMAT);
353 let data = match self.timeout {
354 Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
355 None => data_future.await?,
356 };
357 Ok(data)
358 }
359
360 async fn snapshot_inner<D, Y>(&self, format: Format) -> Result<Vec<Y>, Error>
361 where
362 D: DiagnosticsData,
363 Y: for<'a> Deserialize<'a> + CheckResponse + Send + 'static,
364 {
365 loop {
366 let iterator = self.batch_iterator::<D>(StreamMode::Snapshot, format)?;
367 let result = drain_batch_iterator::<Y>(Arc::new(iterator))
368 .filter_map(|value| ready(value.ok()))
369 .collect::<Vec<_>>()
370 .await;
371
372 if self.retry_config.should_retry(result.len()) {
373 fasync::Timer::new(fasync::MonotonicInstant::after(
374 zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
375 ))
376 .await;
377 } else {
378 return Ok(result);
379 }
380 }
381 }
382
383 fn batch_iterator<D>(
384 &self,
385 mode: StreamMode,
386 format: Format,
387 ) -> Result<BatchIteratorProxy, Error>
388 where
389 D: DiagnosticsData,
390 {
391 let archive = match &self.archive {
392 Some(archive) => archive.clone(),
393 None => client::connect_to_protocol::<ArchiveAccessorMarker>()
394 .map_err(Error::ConnectToArchive)?,
395 };
396
397 let (iterator, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
398
399 let stream_parameters = StreamParameters {
400 stream_mode: Some(mode),
401 data_type: Some(D::DATA_TYPE),
402 format: Some(format),
403 client_selector_configuration: if self.selectors.is_empty() {
404 Some(ClientSelectorConfiguration::SelectAll(true))
405 } else {
406 Some(ClientSelectorConfiguration::Selectors(self.selectors.to_vec()))
407 },
408 performance_configuration: Some(PerformanceConfiguration {
409 max_aggregate_content_size_bytes: self.max_aggregated_content_size_bytes,
410 batch_retrieval_timeout_seconds: self.batch_retrieval_timeout_seconds,
411 ..Default::default()
412 }),
413 ..Default::default()
414 };
415
416 archive
417 .stream_diagnostics(&stream_parameters, server_end)
418 .map_err(Error::StreamDiagnostics)?;
419 Ok(iterator)
420 }
421}
422
423impl ArchiveReader<Logs> {
424 pub fn logs() -> Self {
426 ArchiveReader::<Logs> {
427 timeout: None,
428 selectors: vec![],
429 retry_config: RetryConfig::always(),
430 archive: None,
431 batch_retrieval_timeout_seconds: None,
432 max_aggregated_content_size_bytes: None,
433 _phantom: PhantomData,
434 }
435 }
436
437 #[inline]
438 fn format(&self) -> Format {
439 #[cfg(fuchsia_api_level_at_least = "HEAD")]
440 let ret = Format::Fxt;
441 #[cfg(fuchsia_api_level_less_than = "HEAD")]
442 let ret = Format::Json;
443 ret
444 }
445
446 pub async fn snapshot(&self) -> Result<Vec<Data<Logs>>, Error> {
448 loop {
449 let iterator = self.batch_iterator::<Logs>(StreamMode::Snapshot, self.format())?;
450 let result = drain_batch_iterator_for_logs(Arc::new(iterator))
451 .filter_map(|value| ready(value.ok()))
452 .collect::<Vec<_>>()
453 .await;
454
455 if self.retry_config.should_retry(result.len()) {
456 fasync::Timer::new(fasync::MonotonicInstant::after(
457 zx::MonotonicDuration::from_millis(RETRY_DELAY_MS),
458 ))
459 .await;
460 } else {
461 return Ok(result);
462 }
463 }
464 }
465
466 pub fn snapshot_then_subscribe(&self) -> Result<Subscription, Error> {
469 let iterator =
470 self.batch_iterator::<Logs>(StreamMode::SnapshotThenSubscribe, self.format())?;
471 Ok(Subscription::new(iterator))
472 }
473}
474
475impl ArchiveReader<Inspect> {
476 pub fn inspect() -> Self {
478 ArchiveReader::<Inspect> {
479 timeout: None,
480 selectors: vec![],
481 retry_config: RetryConfig::always(),
482 archive: None,
483 batch_retrieval_timeout_seconds: None,
484 max_aggregated_content_size_bytes: None,
485 _phantom: PhantomData,
486 }
487 }
488
489 pub fn with_batch_retrieval_timeout_seconds(&mut self, timeout: i64) -> &mut Self {
492 self.batch_retrieval_timeout_seconds = Some(timeout);
493 self
494 }
495
496 pub fn with_aggregated_result_bytes_limit(&mut self, limit_bytes: u64) -> &mut Self {
498 self.max_aggregated_content_size_bytes = Some(limit_bytes);
499 self
500 }
501
502 pub async fn snapshot_raw<T>(&self) -> Result<T, Error>
507 where
508 T: for<'a> Deserialize<'a>
509 + SerializableValue
510 + From<Vec<T>>
511 + CheckResponse
512 + 'static
513 + Send,
514 {
515 let data_future = self.snapshot_inner::<Inspect, T>(T::FORMAT_OF_VALUE);
516 let data = match self.timeout {
517 Some(timeout) => data_future.on_timeout(timeout.after_now(), || Ok(Vec::new())).await?,
518 None => data_future.await?,
519 };
520 Ok(T::from(data))
521 }
522
523 pub fn add_selectors<T, S>(&mut self, selectors: T) -> &mut Self
526 where
527 T: Iterator<Item = S>,
528 S: ToSelectorArguments,
529 {
530 for selector in selectors {
531 self.add_selector(selector);
532 }
533 self
534 }
535
536 pub fn add_selector(&mut self, selector: impl ToSelectorArguments) -> &mut Self {
538 self.selectors.extend(selector.to_selector_arguments());
539 self
540 }
541
542 pub async fn snapshot(&self) -> Result<Vec<Data<Inspect>>, Error> {
544 self.snapshot_shared::<Inspect>().await
545 }
546}
547
548#[derive(Debug, Deserialize)]
549#[serde(untagged)]
550enum OneOrMany<T> {
551 Many(Vec<T>),
552 One(T),
553}
554
555fn stream_batch<T>(
556 iterator: Arc<BatchIteratorProxy>,
557 process_content: impl Fn(FormattedContent) -> Result<OneOrMany<T>, Error>,
558) -> impl Stream<Item = Result<T, Error>>
559where
560 T: for<'a> Deserialize<'a> + Send + 'static,
561{
562 stream! {
563 loop {
564 let next_batch = iterator
565 .get_next()
566 .await
567 .map_err(Error::GetNextCall)?
568 .map_err(Error::GetNextReaderError)?;
569 if next_batch.is_empty() {
570 return;
572 }
573 for formatted_content in next_batch {
574 let output = process_content(formatted_content)?;
575 match output {
576 OneOrMany::One(data) => yield Ok(data),
577 OneOrMany::Many(datas) => {
578 for data in datas {
579 yield Ok(data);
580 }
581 }
582 }
583 }
584 }
585 }
586}
587
588fn drain_batch_iterator<T>(
589 iterator: Arc<BatchIteratorProxy>,
590) -> impl Stream<Item = Result<T, Error>>
591where
592 T: for<'a> Deserialize<'a> + Send + 'static,
593{
594 stream_batch(iterator, |formatted_content| match formatted_content {
595 FormattedContent::Json(data) => {
596 let mut buf = vec![0; data.size as usize];
597 data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
598 serde_json::from_slice(&buf).map_err(Error::ReadJson)
599 }
600 #[cfg(fuchsia_api_level_at_least = "HEAD")]
601 FormattedContent::Cbor(vmo) => {
602 let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
603 vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
604 Ok(ciborium::from_reader(buf.as_slice()).map_err(|err| Error::ReadCbor(err.into()))?)
605 }
606 #[cfg(fuchsia_api_level_at_least = "PLATFORM")]
607 FormattedContent::Text(_) => unreachable!("We never expect Text"),
608 #[cfg(fuchsia_api_level_at_least = "HEAD")]
609 FormattedContent::Fxt(_) => unreachable!("We never expect FXT for Inspect"),
610 FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
611 unreachable!("Received unrecognized FIDL message")
612 }
613 })
614}
615
616fn drain_batch_iterator_for_logs(
617 iterator: Arc<BatchIteratorProxy>,
618) -> impl Stream<Item = Result<LogsData, Error>> {
619 stream_batch::<LogsData>(iterator, |formatted_content| match formatted_content {
620 FormattedContent::Json(data) => {
621 let mut buf = vec![0; data.size as usize];
622 data.vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
623 serde_json::from_slice(&buf).map_err(Error::ReadJson)
624 }
625 #[cfg(fuchsia_api_level_at_least = "HEAD")]
626 FormattedContent::Fxt(vmo) => {
627 let mut buf = vec![0; vmo.get_content_size().expect("Always returns Ok") as usize];
628 vmo.read(&mut buf, 0).map_err(Error::ReadVmo)?;
629 let mut current_slice = buf.as_ref();
630 let mut ret: Option<OneOrMany<LogsData>> = None;
631 loop {
632 let (data, remaining) = from_extended_record(current_slice).unwrap();
633 ret = Some(match ret.take() {
634 Some(OneOrMany::One(one)) => OneOrMany::Many(vec![one, data]),
635 Some(OneOrMany::Many(mut many)) => {
636 many.push(data);
637 OneOrMany::Many(many)
638 }
639 None => OneOrMany::One(data),
640 });
641 if remaining.is_empty() {
642 break;
643 }
644 current_slice = remaining;
645 }
646 Ok(ret.unwrap())
647 }
648 #[cfg(fuchsia_api_level_at_least = "PLATFORM")]
649 FormattedContent::Text(_) => unreachable!("We never expect Text"),
650 #[cfg(fuchsia_api_level_at_least = "HEAD")]
651 FormattedContent::Cbor(_) => unreachable!("We never expect CBOR"),
652 FormattedContent::__SourceBreaking { unknown_ordinal: _ } => {
653 unreachable!("Received unrecognized FIDL message")
654 }
655 })
656}
657
658#[pin_project]
660pub struct Subscription {
661 #[pin]
662 recv: Pin<Box<dyn FusedStream<Item = Result<LogsData, Error>> + Send>>,
663 iterator: Arc<BatchIteratorProxy>,
664}
665
666const DATA_CHANNEL_SIZE: usize = 32;
667const ERROR_CHANNEL_SIZE: usize = 2;
668
669impl Subscription {
670 pub fn new(iterator: BatchIteratorProxy) -> Self {
673 let iterator = Arc::new(iterator);
674 Subscription {
675 recv: Box::pin(drain_batch_iterator_for_logs(iterator.clone()).fuse()),
676 iterator,
677 }
678 }
679
680 pub async fn wait_for_ready(&self) {
682 self.iterator.wait_for_ready().await.expect("doesn't disconnect");
683 }
684
685 pub fn split_streams(mut self) -> (SubscriptionResultsStream<LogsData>, mpsc::Receiver<Error>) {
687 let (mut errors_sender, errors) = mpsc::channel(ERROR_CHANNEL_SIZE);
688 let (mut results_sender, recv) = mpsc::channel(DATA_CHANNEL_SIZE);
689 let _drain_task = fasync::Task::spawn(async move {
690 while let Some(result) = self.next().await {
691 match result {
692 Ok(value) => results_sender.send(value).await.ok(),
693 Err(e) => errors_sender.send(e).await.ok(),
694 };
695 }
696 });
697 (SubscriptionResultsStream { recv, _drain_task }, errors)
698 }
699}
700
701impl Stream for Subscription {
702 type Item = Result<LogsData, Error>;
703
704 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
705 let this = self.project();
706 this.recv.poll_next(cx)
707 }
708}
709
710impl FusedStream for Subscription {
711 fn is_terminated(&self) -> bool {
712 self.recv.is_terminated()
713 }
714}
715
716#[pin_project]
718pub struct SubscriptionResultsStream<T> {
719 #[pin]
720 recv: mpsc::Receiver<T>,
721 _drain_task: fasync::Task<()>,
722}
723
724impl<T> Stream for SubscriptionResultsStream<T>
725where
726 T: for<'a> Deserialize<'a>,
727{
728 type Item = T;
729
730 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
731 let this = self.project();
732 this.recv.poll_next(cx)
733 }
734}
735
736impl<T> FusedStream for SubscriptionResultsStream<T>
737where
738 T: for<'a> Deserialize<'a>,
739{
740 fn is_terminated(&self) -> bool {
741 self.recv.is_terminated()
742 }
743}
744
745#[cfg(test)]
746mod tests {
747 use super::*;
748 use assert_matches::assert_matches;
749 use diagnostics_assertions::assert_data_tree;
750 use diagnostics_log::{Publisher, PublisherOptions};
751 use fidl::endpoints::ServerEnd;
752 use fuchsia_component_test::{
753 Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
754 };
755 use futures::TryStreamExt;
756 use log::{error, info};
757 use {fidl_fuchsia_diagnostics as fdiagnostics, fidl_fuchsia_logger as flogger};
758
759 const TEST_COMPONENT_URL: &str = "#meta/inspect_test_component.cm";
760
761 struct ComponentOptions {
762 publish_n_trees: u64,
763 }
764
765 async fn start_component(opts: ComponentOptions) -> Result<RealmInstance, anyhow::Error> {
766 let builder = RealmBuilder::new().await?;
767 let test_component = builder
768 .add_child("test_component", TEST_COMPONENT_URL, ChildOptions::new().eager())
769 .await?;
770 builder
771 .add_route(
772 Route::new()
773 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
774 .from(Ref::parent())
775 .to(&test_component),
776 )
777 .await?;
778 builder.init_mutable_config_to_empty(&test_component).await.unwrap();
779 builder
780 .set_config_value(&test_component, "publish_n_trees", opts.publish_n_trees.into())
781 .await
782 .unwrap();
783 let instance = builder.build().await?;
784 Ok(instance)
785 }
786
787 #[fuchsia::test]
790 async fn inspect_data_for_component() -> Result<(), anyhow::Error> {
791 let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
792 let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
793 let component_selector = selectors::sanitize_moniker_for_selectors(&moniker);
794 let results = ArchiveReader::inspect()
795 .add_selector(format!("{component_selector}:[...]root"))
796 .snapshot()
797 .await?;
798 assert_eq!(results.len(), 1);
799 assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
800 "tree-0": 0u64,
801 int: 3u64,
802 "lazy-node": {
803 a: "test",
804 child: {
805 double: 3.25,
806 },
807 }
808 });
809 let lazy_property_selector = Selector {
811 component_selector: Some(fdiagnostics::ComponentSelector {
812 moniker_segments: Some(vec![
813 fdiagnostics::StringSelector::ExactMatch(format!(
814 "realm_builder:{}",
815 instance.root.child_name()
816 )),
817 fdiagnostics::StringSelector::ExactMatch("test_component".into()),
818 ]),
819 ..Default::default()
820 }),
821 tree_selector: Some(fdiagnostics::TreeSelector::PropertySelector(
822 fdiagnostics::PropertySelector {
823 node_path: vec![
824 fdiagnostics::StringSelector::ExactMatch("root".into()),
825 fdiagnostics::StringSelector::ExactMatch("lazy-node".into()),
826 ],
827 target_properties: fdiagnostics::StringSelector::ExactMatch("a".into()),
828 },
829 )),
830 tree_names: Some(fdiagnostics::TreeNames::All(fdiagnostics::All {})),
831 ..Default::default()
832 };
833 let int_property_selector = format!("{component_selector}:[...]root:int");
834 let mut reader = ArchiveReader::inspect();
835 reader.add_selector(int_property_selector).add_selector(lazy_property_selector);
836 let response = reader.snapshot().await?;
837 assert_eq!(response.len(), 1);
838 assert_eq!(response[0].moniker.to_string(), moniker);
839 assert_data_tree!(response[0].payload.as_ref().unwrap(), root: {
840 int: 3u64,
841 "lazy-node": {
842 a: "test"
843 }
844 });
845 Ok(())
846 }
847
848 #[fuchsia::test]
849 async fn select_all_for_moniker() {
850 let instance = start_component(ComponentOptions { publish_n_trees: 1 })
851 .await
852 .expect("component started");
853 let moniker = format!("realm_builder:{}/test_component", instance.root.child_name());
854 let results = ArchiveReader::inspect()
855 .select_all_for_component(moniker)
856 .snapshot()
857 .await
858 .expect("snapshotted");
859 assert_eq!(results.len(), 1);
860 assert_data_tree!(results[0].payload.as_ref().unwrap(), root: {
861 "tree-0": 0u64,
862 int: 3u64,
863 "lazy-node": {
864 a: "test",
865 child: {
866 double: 3.25,
867 },
868 }
869 });
870 }
871
872 #[fuchsia::test]
873 async fn timeout() -> Result<(), anyhow::Error> {
874 let instance = start_component(ComponentOptions { publish_n_trees: 1 }).await?;
875
876 let mut reader = ArchiveReader::inspect();
877 reader
878 .add_selector(format!(
879 "realm_builder\\:{}/test_component:root",
880 instance.root.child_name()
881 ))
882 .with_timeout(zx::MonotonicDuration::from_nanos(0));
883 let result = reader.snapshot().await;
884 assert!(result.unwrap().is_empty());
885 Ok(())
886 }
887
888 #[fuchsia::test]
889 async fn component_selector() {
890 let selector = ComponentSelector::new(vec!["a".to_string()]);
891 assert_eq!(selector.moniker_str(), "a");
892 let arguments: Vec<_> = selector.to_selector_arguments().collect();
893 assert_eq!(arguments, vec![SelectorArgument::RawSelector("a:root".to_string())]);
894
895 let selector =
896 ComponentSelector::new(vec!["b".to_string(), "c".to_string(), "a".to_string()]);
897 assert_eq!(selector.moniker_str(), "b/c/a");
898
899 let selector = selector.with_tree_selector("root/b/c:d").with_tree_selector("root/e:f");
900 let arguments: Vec<_> = selector.to_selector_arguments().collect();
901 assert_eq!(
902 arguments,
903 vec![
904 SelectorArgument::RawSelector("b/c/a:root/b/c:d".into()),
905 SelectorArgument::RawSelector("b/c/a:root/e:f".into()),
906 ]
907 );
908 }
909
910 #[fuchsia::test]
911 async fn custom_archive() {
912 let proxy = spawn_fake_archive(serde_json::json!({
913 "moniker": "moniker",
914 "version": 1,
915 "data_source": "Inspect",
916 "metadata": {
917 "component_url": "component-url",
918 "timestamp": 0,
919 "filename": "filename",
920 },
921 "payload": {
922 "root": {
923 "x": 1,
924 }
925 }
926 }));
927 let result =
928 ArchiveReader::inspect().with_archive(proxy).snapshot().await.expect("got result");
929 assert_eq!(result.len(), 1);
930 assert_data_tree!(result[0].payload.as_ref().unwrap(), root: { x: 1u64 });
931 }
932
933 #[fuchsia::test]
934 async fn handles_lists_correctly_on_snapshot_raw() {
935 let value = serde_json::json!({
936 "moniker": "moniker",
937 "version": 1,
938 "data_source": "Inspect",
939 "metadata": {
940 "component_url": "component-url",
941 "timestamp": 0,
942 "filename": "filename",
943 },
944 "payload": {
945 "root": {
946 "x": 1,
947 }
948 }
949 });
950 let proxy = spawn_fake_archive(serde_json::json!([value.clone()]));
951 let mut reader = ArchiveReader::inspect();
952 reader.with_archive(proxy);
953 let json_result = reader.snapshot_raw::<serde_json::Value>().await.expect("got result");
954 match json_result {
955 serde_json::Value::Array(values) => {
956 assert_eq!(values.len(), 1);
957 assert_eq!(values[0], value);
958 }
959 result => panic!("unexpected result: {:?}", result),
960 }
961 let cbor_result = reader.snapshot_raw::<ciborium::Value>().await.expect("got result");
962 match cbor_result {
963 ciborium::Value::Array(values) => {
964 assert_eq!(values.len(), 1);
965 let json_result =
966 values[0].deserialized::<serde_json::Value>().expect("convert to json");
967 assert_eq!(json_result, value);
968 }
969 result => panic!("unexpected result: {:?}", result),
970 }
971 }
972
973 #[fuchsia::test(logging = false)]
974 async fn snapshot_then_subscribe() {
975 let (_instance, publisher, reader) = init_isolated_logging().await;
976 let (mut stream, _errors) =
977 reader.snapshot_then_subscribe().expect("subscribed to logs").split_streams();
978 log::set_boxed_logger(Box::new(publisher)).unwrap();
979 info!("hello from test");
980 error!("error from test");
981 let log = stream.next().await.unwrap();
982 assert_eq!(log.msg().unwrap(), "hello from test");
983 let log = stream.next().await.unwrap();
984 assert_eq!(log.msg().unwrap(), "error from test");
985 }
986
987 #[fuchsia::test]
988 async fn read_many_trees_with_filtering() {
989 let instance = start_component(ComponentOptions { publish_n_trees: 2 })
990 .await
991 .expect("component started");
992 let selector = format!(
993 "realm_builder\\:{}/test_component:[name=tree-0]root",
994 instance.root.child_name()
995 );
996 let results = ArchiveReader::inspect()
997 .add_selector(selector)
998 .with_minimum_schema_count(1)
1000 .snapshot()
1001 .await
1002 .expect("snapshotted");
1003 assert_matches!(results.iter().find(|v| v.metadata.name.as_ref() == "tree-1"), None);
1004 let should_have_data =
1005 results.into_iter().find(|v| v.metadata.name.as_ref() == "tree-0").unwrap();
1006 assert_data_tree!(should_have_data.payload.unwrap(), root: contains {
1007 "tree-0": 0u64,
1008 });
1009 }
1010
1011 fn spawn_fake_archive(data_to_send: serde_json::Value) -> fdiagnostics::ArchiveAccessorProxy {
1012 let (proxy, mut stream) =
1013 fidl::endpoints::create_proxy_and_stream::<fdiagnostics::ArchiveAccessorMarker>();
1014 fasync::Task::spawn(async move {
1015 while let Some(request) = stream.try_next().await.expect("stream request") {
1016 match request {
1017 fdiagnostics::ArchiveAccessorRequest::StreamDiagnostics {
1018 result_stream,
1019 ..
1020 } => {
1021 let data = data_to_send.clone();
1022 fasync::Task::spawn(handle_batch_iterator(data, result_stream)).detach();
1023 }
1024 fdiagnostics::ArchiveAccessorRequest::WaitForReady { responder, .. } => {
1025 let _ = responder.send();
1026 }
1027 fdiagnostics::ArchiveAccessorRequest::_UnknownMethod { .. } => {
1028 unreachable!("Unexpected method call");
1029 }
1030 }
1031 }
1032 })
1033 .detach();
1034 proxy
1035 }
1036
1037 async fn handle_batch_iterator(
1038 data: serde_json::Value,
1039 result_stream: ServerEnd<fdiagnostics::BatchIteratorMarker>,
1040 ) {
1041 let mut called = false;
1042 let mut stream = result_stream.into_stream();
1043 while let Some(req) = stream.try_next().await.expect("stream request") {
1044 match req {
1045 fdiagnostics::BatchIteratorRequest::WaitForReady { responder } => {
1046 let _ = responder.send();
1047 }
1048 fdiagnostics::BatchIteratorRequest::GetNext { responder } => {
1049 if called {
1050 responder.send(Ok(Vec::new())).expect("send response");
1051 continue;
1052 }
1053 called = true;
1054 let content = serde_json::to_string_pretty(&data).expect("json pretty");
1055 let vmo_size = content.len() as u64;
1056 let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
1057 vmo.write(content.as_bytes(), 0).expect("write vmo");
1058 let buffer = fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
1059 responder
1060 .send(Ok(vec![fdiagnostics::FormattedContent::Json(buffer)]))
1061 .expect("send response");
1062 }
1063 fdiagnostics::BatchIteratorRequest::_UnknownMethod { .. } => {
1064 unreachable!("Unexpected method call");
1065 }
1066 }
1067 }
1068 }
1069
1070 async fn create_realm() -> RealmBuilder {
1071 let builder = RealmBuilder::new().await.expect("create realm builder");
1072 let archivist = builder
1073 .add_child("archivist", "#meta/archivist-for-embedding.cm", ChildOptions::new().eager())
1074 .await
1075 .expect("add child archivist");
1076 builder
1077 .add_route(
1078 Route::new()
1079 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1080 .capability(
1081 Capability::protocol_by_name("fuchsia.tracing.provider.Registry")
1082 .optional(),
1083 )
1084 .capability(Capability::event_stream("stopped"))
1085 .capability(Capability::event_stream("capability_requested"))
1086 .from(Ref::parent())
1087 .to(&archivist),
1088 )
1089 .await
1090 .expect("added routes from parent to archivist");
1091 builder
1092 .add_route(
1093 Route::new()
1094 .capability(Capability::protocol_by_name("fuchsia.logger.LogSink"))
1095 .from(&archivist)
1096 .to(Ref::parent()),
1097 )
1098 .await
1099 .expect("routed LogSink from archivist to parent");
1100 builder
1101 .add_route(
1102 Route::new()
1103 .capability(Capability::protocol_by_name("fuchsia.diagnostics.ArchiveAccessor"))
1104 .from_dictionary("diagnostics-accessors")
1105 .from(&archivist)
1106 .to(Ref::parent()),
1107 )
1108 .await
1109 .expect("routed ArchiveAccessor from archivist to parent");
1110 builder
1111 }
1112
1113 async fn init_isolated_logging() -> (RealmInstance, Publisher, ArchiveReader<Logs>) {
1114 let instance = create_realm().await.build().await.unwrap();
1115 let log_sink_proxy =
1116 instance.root.connect_to_protocol_at_exposed_dir::<flogger::LogSinkMarker>().unwrap();
1117 let accessor_proxy = instance
1118 .root
1119 .connect_to_protocol_at_exposed_dir::<fdiagnostics::ArchiveAccessorMarker>()
1120 .unwrap();
1121 let mut reader = ArchiveReader::logs();
1122 reader.with_archive(accessor_proxy);
1123 let options = PublisherOptions::default()
1124 .wait_for_initial_interest(false)
1125 .use_log_sink(log_sink_proxy);
1126 let publisher = Publisher::new(options).unwrap();
1127 (instance, publisher, reader)
1128 }
1129
1130 #[fuchsia::test]
1131 fn retry_config_behavior() {
1132 let config = RetryConfig::MinSchemaCount(1);
1133 let got = 0;
1134
1135 assert!(config.should_retry(got));
1136
1137 let config = RetryConfig::MinSchemaCount(1);
1138 let got = 1;
1139
1140 assert!(!config.should_retry(got));
1141
1142 let config = RetryConfig::MinSchemaCount(1);
1143 let got = 2;
1144
1145 assert!(!config.should_retry(got));
1146
1147 let config = RetryConfig::MinSchemaCount(0);
1148 let got = 1;
1149
1150 assert!(!config.should_retry(got));
1151
1152 let config = RetryConfig::always();
1153 let got = 0;
1154
1155 assert!(config.should_retry(got));
1156
1157 let config = RetryConfig::never();
1158 let got = 0;
1159
1160 assert!(!config.should_retry(got));
1161 }
1162}