Skip to main content

test_diagnostics/
diagnostics.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Error, Result};
6use diagnostics_data::LogsData;
7use flex_fuchsia_test_manager as ftest_manager;
8use futures::stream::BoxStream;
9use futures::{Stream, StreamExt};
10#[cfg(not(feature = "fdomain"))]
11use log_command::LogsDataStream;
12#[cfg(feature = "fdomain")]
13use log_command_fdomain::LogsDataStream;
14use pin_project::pin_project;
15use std::pin::Pin;
16use std::task::{Context, Poll};
17
18#[cfg(target_os = "fuchsia")]
19use crate::diagnostics::fuchsia::BatchLogStream;
20
21#[pin_project]
22pub struct LogStream {
23    #[pin]
24    stream: BoxStream<'static, Result<LogsData, Error>>,
25}
26
27impl LogStream {
28    fn new<S>(stream: S) -> Self
29    where
30        S: Stream<Item = Result<LogsData, Error>> + Send + 'static,
31    {
32        Self { stream: stream.boxed() }
33    }
34
35    pub fn from_syslog(syslog: ftest_manager::Syslog) -> Result<LogStream> {
36        get_log_stream(syslog)
37    }
38}
39
40impl Stream for LogStream {
41    type Item = Result<LogsData, Error>;
42    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43        let this = self.project();
44        this.stream.poll_next(cx)
45    }
46}
47
48#[cfg(target_os = "fuchsia")]
49fn get_log_stream(syslog: ftest_manager::Syslog) -> Result<LogStream> {
50    match syslog {
51        ftest_manager::Syslog::Batch(client_end) => {
52            Ok(LogStream::new(BatchLogStream::from_client_end(client_end)?))
53        }
54        ftest_manager::Syslog::Stream(client_end) => Ok(LogStream::new(
55            LogsDataStream::new(flex_client::socket_to_async(client_end))
56                .map(|result| result.map_err(Error::from)),
57        )),
58        _ => {
59            panic!("not supported")
60        }
61    }
62}
63
64#[cfg(not(target_os = "fuchsia"))]
65fn get_log_stream(syslog: ftest_manager::Syslog) -> Result<LogStream> {
66    match syslog {
67        ftest_manager::Syslog::Stream(client_end) => Ok(LogStream::new(
68            LogsDataStream::new(flex_client::socket_to_async(client_end))
69                .map(|result| result.map_err(Error::from)),
70        )),
71        ftest_manager::Syslog::Batch(_) => panic!("batch iterator not supported on host"),
72        _ => {
73            panic!("not supported")
74        }
75    }
76}
77
78#[cfg(target_os = "fuchsia")]
79mod fuchsia {
80    use super::*;
81    use diagnostics_reader::Subscription;
82    use fidl::endpoints::ClientEnd;
83    use fidl_fuchsia_diagnostics::BatchIteratorMarker;
84
85    #[pin_project]
86    pub struct BatchLogStream {
87        #[pin]
88        subscription: Subscription,
89    }
90
91    impl BatchLogStream {
92        #[cfg(test)]
93        pub fn new() -> Result<(Self, ftest_manager::LogsIterator), fidl::Error> {
94            let (proxy, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
95            let subscription = Subscription::new(proxy);
96            Ok((Self { subscription }, ftest_manager::LogsIterator::Batch(server_end)))
97        }
98
99        pub fn from_client_end(
100            client_end: ClientEnd<BatchIteratorMarker>,
101        ) -> Result<Self, fidl::Error> {
102            Ok(Self { subscription: Subscription::new(client_end.into_proxy()) })
103        }
104    }
105
106    impl Stream for BatchLogStream {
107        type Item = Result<LogsData, Error>;
108        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
109            let this = self.project();
110            match this.subscription.poll_next(cx) {
111                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
112                Poll::Ready(Some(Ok(value))) => Poll::Ready(Some(Ok(value))),
113                Poll::Ready(None) => Poll::Ready(None),
114                Poll::Pending => Poll::Pending,
115            }
116        }
117    }
118}
119
120#[cfg(test)]
121mod tests {
122    use super::*;
123    use diagnostics_data::{ExtendedMoniker, Severity, Timestamp};
124    use fuchsia_async as fasync;
125
126    #[cfg(target_os = "fuchsia")]
127    mod fuchsia {
128        use super::*;
129        use assert_matches::assert_matches;
130        use fidl::endpoints::ServerEnd;
131        use fidl_fuchsia_diagnostics::{
132            BatchIteratorMarker, BatchIteratorRequest, FormattedContent, ReaderError,
133        };
134        use fidl_fuchsia_mem as fmem;
135        use futures::TryStreamExt;
136
137        fn create_log_stream() -> Result<(LogStream, ftest_manager::LogsIterator), fidl::Error> {
138            let (stream, iterator) = BatchLogStream::new()?;
139            Ok((LogStream::new(stream), iterator))
140        }
141
142        struct BatchIteratorOpts {
143            with_error: bool,
144        }
145
146        /// Spanws a dummy batch iterator for testing that return 3 logs: "1", "2", "3" all with
147        /// the same severity
148        async fn spawn_batch_iterator_server(
149            server_end: ServerEnd<BatchIteratorMarker>,
150            opts: BatchIteratorOpts,
151        ) {
152            let mut request_stream = server_end.into_stream();
153            let mut values = vec![1i64, 2, 3].into_iter();
154            while let Some(request) = request_stream.try_next().await.expect("get next request") {
155                match request {
156                    BatchIteratorRequest::WaitForReady { responder } => {
157                        responder.send().expect("sent response")
158                    }
159                    BatchIteratorRequest::GetNext { responder } => match values.next() {
160                        None => {
161                            responder.send(Ok(vec![])).expect("send empty response");
162                        }
163                        Some(value) => {
164                            if opts.with_error {
165                                responder.send(Err(ReaderError::Io)).expect("send error");
166                                continue;
167                            }
168                            let content = get_json_data(value);
169                            let size = content.len() as u64;
170                            let vmo = zx::Vmo::create(size).expect("create vmo");
171                            vmo.write(content.as_bytes(), 0).expect("write vmo");
172                            let result = FormattedContent::Json(fmem::Buffer { vmo, size });
173                            responder.send(Ok(vec![result])).expect("send response");
174                        }
175                    },
176                    BatchIteratorRequest::_UnknownMethod { .. } => {
177                        unreachable!("We aren't expecting any other call");
178                    }
179                }
180            }
181        }
182
183        #[fasync::run_singlethreaded(test)]
184        async fn log_stream_returns_logs() {
185            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
186            let server_end = match iterator {
187                ftest_manager::LogsIterator::Batch(server_end) => server_end,
188                _ => panic!("unexpected logs iterator server end"),
189            };
190            fasync::Task::spawn(spawn_batch_iterator_server(
191                server_end,
192                BatchIteratorOpts { with_error: false },
193            ))
194            .detach();
195            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("1"));
196            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("2"));
197            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("3"));
198            assert_matches!(log_stream.next().await, None);
199        }
200
201        #[fasync::run_singlethreaded(test)]
202        async fn log_stream_can_return_errors() {
203            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
204            let server_end = match iterator {
205                ftest_manager::LogsIterator::Batch(server_end) => server_end,
206                _ => panic!("unexpected logs iterator server end"),
207            };
208            fasync::Task::spawn(spawn_batch_iterator_server(
209                server_end,
210                BatchIteratorOpts { with_error: true },
211            ))
212            .detach();
213            assert_matches!(log_stream.next().await, Some(Err(_)));
214        }
215    }
216
217    #[cfg(not(target_os = "fuchsia"))]
218    mod host {
219        use super::*;
220        use assert_matches::assert_matches;
221        use futures::AsyncWriteExt;
222
223        fn create_log_stream(
224            client: flex_client::ClientArg,
225        ) -> Result<(LogStream, ftest_manager::LogsIterator), fidl::Error> {
226            let (client_end, server_end) = client.create_stream_socket();
227            let (stream, iterator) = (
228                LogStream::new(
229                    LogsDataStream::new(flex_client::socket_to_async(client_end))
230                        .map(|result| result.map_err(Error::from)),
231                ),
232                ftest_manager::LogsIterator::Stream(server_end),
233            );
234            Ok((LogStream::new(stream), iterator))
235        }
236
237        async fn spawn_archive_iterator_server(socket: flex_client::Socket, with_error: bool) {
238            let mut socket = flex_client::socket_to_async(socket);
239            let mut values = vec![1, 2, 3].into_iter();
240            loop {
241                match values.next() {
242                    None => {
243                        // End of stream, close socket.
244                        break;
245                    }
246                    Some(value) => {
247                        if with_error {
248                            // Send invalid JSON to trigger an error other than
249                            // ZX_ERR_PEER_CLOSED
250                            let _ = socket.write_all("5".as_bytes()).await;
251                            continue;
252                        }
253                        socket.write_all(get_json_data(value).as_bytes()).await.unwrap();
254                    }
255                }
256            }
257        }
258
259        async fn archive_stream_returns_logs(client: flex_client::ClientArg) {
260            let (mut log_stream, iterator) = create_log_stream(client).expect("got log stream");
261            let server_end = match iterator {
262                ftest_manager::LogsIterator::Stream(server_end) => server_end,
263                _ => panic!("unexpected logs iterator server end"),
264            };
265            fasync::Task::spawn(spawn_archive_iterator_server(server_end, false)).detach();
266            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("1"));
267            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("2"));
268            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("3"));
269            assert_matches!(log_stream.next().await, None);
270        }
271
272        #[fasync::run_singlethreaded(test)]
273        async fn archive_stream_returns_logs_inline() {
274            let client = fdomain_local::local_client_empty();
275            archive_stream_returns_logs(client.clone()).await;
276        }
277
278        async fn archive_stream_can_return_errors(client: flex_client::ClientArg) {
279            let (mut log_stream, iterator) = create_log_stream(client).expect("got log stream");
280            let server_end = match iterator {
281                ftest_manager::LogsIterator::Stream(server_end) => server_end,
282                _ => panic!("unexpected logs iterator server end"),
283            };
284            fasync::Task::spawn(spawn_archive_iterator_server(server_end, true)).detach();
285            assert_matches!(log_stream.next().await, Some(Err(_)));
286        }
287
288        #[fasync::run_singlethreaded(test)]
289        async fn archive_stream_can_return_errors_inline() {
290            let client = fdomain_local::local_client_empty();
291            archive_stream_can_return_errors(client.clone()).await;
292        }
293    }
294
295    fn get_json_data(value: i64) -> String {
296        let data = diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
297            timestamp: Timestamp::from_nanos(0),
298            component_url: Some("fake-url".into()),
299            moniker: ExtendedMoniker::parse_str("test/moniker").unwrap(),
300            severity: Severity::Info,
301        })
302        .set_message(value.to_string())
303        .build();
304
305        serde_json::to_string(&data).expect("serialize to json")
306    }
307}