test_manager_lib/
diagnostics.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Error;
6use fidl::endpoints::ServerEnd;
7use fidl_fuchsia_diagnostics::{
8    BatchIteratorMarker, ClientSelectorConfiguration, DataType, Format, StreamMode,
9    StreamParameters,
10};
11use log::warn;
12use {
13    fidl_fuchsia_diagnostics as fdiagnostics, fidl_fuchsia_diagnostics_host as fhost,
14    fidl_fuchsia_test_manager as ftest_manager, fuchsia_async as fasync,
15};
16
/// Outcome of [`serve_syslog`].
pub(crate) struct ServeSyslogOutcome {
    /// Task serving any protocols needed to proxy logs. `serve_syslog` populates this only
    /// for `LogsIterator::Stream`, where the task awaits the response to the streaming
    /// request; it is `None` for every other iterator kind.
    // NOTE(review): this comment previously said the task is populated "when logs are
    // served over overnet using DiagnosticsBridge"; nothing in this file refers to
    // DiagnosticsBridge -- confirm whether that description still applies.
    pub logs_iterator_task: Option<fasync::Task<Result<(), Error>>>,
}
22
23/// Connect to archivist and starting serving syslog.
24/// TODO(https://fxbug.dev/42083125): Only take one ArchiveAccessorProxy, not both.
25pub(crate) fn serve_syslog(
26    accessor: fdiagnostics::ArchiveAccessorProxy,
27    host_accessor: fhost::ArchiveAccessorProxy,
28    log_iterator: ftest_manager::LogsIterator,
29) -> Result<ServeSyslogOutcome, anyhow::Error> {
30    let logs_iterator_task = match log_iterator {
31        ftest_manager::LogsIterator::Stream(iterator) => {
32            let iterator_fut = run_iterator_socket(&host_accessor, iterator);
33            Some(fasync::Task::spawn(async move {
34                iterator_fut.await?;
35                Ok(())
36            }))
37        }
38        ftest_manager::LogsIterator::Batch(iterator) => {
39            IsolatedLogsProvider::new(&accessor).start_streaming_logs(iterator)?;
40            None
41        }
42        _ => None,
43    };
44    Ok(ServeSyslogOutcome { logs_iterator_task })
45}
46
47fn run_iterator_socket(
48    host_accessor: &fhost::ArchiveAccessorProxy,
49    socket: zx::Socket,
50) -> fidl::client::QueryResponseFut<()> {
51    host_accessor.stream_diagnostics(
52        &StreamParameters {
53            stream_mode: Some(StreamMode::SnapshotThenSubscribe),
54            data_type: Some(DataType::Logs),
55            format: Some(Format::Json),
56            client_selector_configuration: Some(ClientSelectorConfiguration::SelectAll(true)),
57            batch_retrieval_timeout_seconds: None,
58            ..Default::default()
59        },
60        socket,
61    )
62}
63
64/// Type alias for &'a ArchiveAccessorProxy
65struct IsolatedLogsProvider<'a> {
66    accessor: &'a fdiagnostics::ArchiveAccessorProxy,
67}
68
69impl<'a> IsolatedLogsProvider<'a> {
70    fn new(accessor: &'a fdiagnostics::ArchiveAccessorProxy) -> Self {
71        Self { accessor }
72    }
73
74    fn start_streaming_logs(
75        &self,
76        iterator: ServerEnd<BatchIteratorMarker>,
77    ) -> Result<(), anyhow::Error> {
78        self.start_streaming(iterator, StreamMode::SnapshotThenSubscribe, DataType::Logs, None)
79    }
80
81    fn start_streaming(
82        &self,
83        iterator: ServerEnd<BatchIteratorMarker>,
84        stream_mode: StreamMode,
85        data_type: DataType,
86        batch_timeout: Option<i64>,
87    ) -> Result<(), anyhow::Error> {
88        let stream_parameters = StreamParameters {
89            stream_mode: Some(stream_mode),
90            data_type: Some(data_type),
91            format: Some(Format::Json),
92            client_selector_configuration: Some(ClientSelectorConfiguration::SelectAll(true)),
93            batch_retrieval_timeout_seconds: batch_timeout,
94            ..Default::default()
95        };
96        self.accessor.stream_diagnostics(&stream_parameters, iterator).map_err(|err| {
97            warn!(err:%, data_type:?; "Failed to subscribe to isolated diagnostics data");
98            err
99        })?;
100        Ok(())
101    }
102}