test_diagnostics/
diagnostics.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Error;
6use diagnostics_data::LogsData;
7use fidl_fuchsia_test_manager as ftest_manager;
8use futures::stream::BoxStream;
9use futures::{Stream, StreamExt};
10use log_command::LogsDataStream;
11use pin_project::pin_project;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14
15#[cfg(target_os = "fuchsia")]
16use crate::diagnostics::fuchsia::BatchLogStream;
17
18#[pin_project]
19pub struct LogStream {
20    #[pin]
21    stream: BoxStream<'static, Result<LogsData, Error>>,
22}
23
24impl LogStream {
25    fn new<S>(stream: S) -> Self
26    where
27        S: Stream<Item = Result<LogsData, Error>> + Send + 'static,
28    {
29        Self { stream: stream.boxed() }
30    }
31
32    pub fn from_syslog(syslog: ftest_manager::Syslog) -> Result<LogStream, fidl::Error> {
33        get_log_stream(syslog)
34    }
35}
36
37impl Stream for LogStream {
38    type Item = Result<LogsData, Error>;
39    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
40        let this = self.project();
41        this.stream.poll_next(cx)
42    }
43}
44
/// Converts a test-manager `Syslog` into a `LogStream` (Fuchsia target builds).
///
/// * `Syslog::Batch` is consumed through a `BatchLogStream`.
/// * `Syslog::Stream` is read from its socket through a `LogsDataStream`.
///
/// # Errors
///
/// Propagates the `fidl::Error` returned by `BatchLogStream::from_client_end`.
///
/// # Panics
///
/// Panics with "not supported" for any other `Syslog` variant.
#[cfg(target_os = "fuchsia")]
fn get_log_stream(syslog: ftest_manager::Syslog) -> Result<LogStream, fidl::Error> {
    match syslog {
        ftest_manager::Syslog::Batch(client_end) => {
            Ok(LogStream::new(BatchLogStream::from_client_end(client_end)?))
        }
        ftest_manager::Syslog::Stream(client_end) => {
            let socket = fuchsia_async::Socket::from_socket(client_end);
            // Each item is wrapped in `Ok` to match the item type `LogStream::new` requires.
            Ok(LogStream::new(LogsDataStream::new(socket).map(Ok)))
        }
        _ => panic!("not supported"),
    }
}
60
61#[cfg(not(target_os = "fuchsia"))]
62fn get_log_stream(syslog: ftest_manager::Syslog) -> Result<LogStream, fidl::Error> {
63    match syslog {
64        ftest_manager::Syslog::Stream(client_end) => Ok(LogStream::new(
65            LogsDataStream::new(fuchsia_async::Socket::from_socket(client_end))
66                .map(|result| Ok(result)),
67        )),
68        ftest_manager::Syslog::Batch(_) => panic!("batch iterator not supported on host"),
69        _ => {
70            panic!("not supported")
71        }
72    }
73}
74
75#[cfg(target_os = "fuchsia")]
76mod fuchsia {
77    use super::*;
78    use diagnostics_reader::Subscription;
79    use fidl::endpoints::ClientEnd;
80    use fidl_fuchsia_diagnostics::BatchIteratorMarker;
81
82    #[pin_project]
83    pub struct BatchLogStream {
84        #[pin]
85        subscription: Subscription,
86    }
87
88    impl BatchLogStream {
89        #[cfg(test)]
90        pub fn new() -> Result<(Self, ftest_manager::LogsIterator), fidl::Error> {
91            let (proxy, server_end) = fidl::endpoints::create_proxy::<BatchIteratorMarker>();
92            let subscription = Subscription::new(proxy);
93            Ok((Self { subscription }, ftest_manager::LogsIterator::Batch(server_end)))
94        }
95
96        pub fn from_client_end(
97            client_end: ClientEnd<BatchIteratorMarker>,
98        ) -> Result<Self, fidl::Error> {
99            Ok(Self { subscription: Subscription::new(client_end.into_proxy()) })
100        }
101    }
102
103    impl Stream for BatchLogStream {
104        type Item = Result<LogsData, Error>;
105        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106            let this = self.project();
107            match this.subscription.poll_next(cx) {
108                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
109                Poll::Ready(Some(Ok(value))) => Poll::Ready(Some(Ok(value))),
110                Poll::Ready(None) => Poll::Ready(None),
111                Poll::Pending => Poll::Pending,
112            }
113        }
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use super::*;
120    use diagnostics_data::{ExtendedMoniker, Severity, Timestamp};
121    use fuchsia_async as fasync;
122
123    #[cfg(target_os = "fuchsia")]
124    mod fuchsia {
125        use super::*;
126        use assert_matches::assert_matches;
127        use fidl::endpoints::ServerEnd;
128        use fidl_fuchsia_diagnostics::{
129            BatchIteratorMarker, BatchIteratorRequest, FormattedContent, ReaderError,
130        };
131        use fidl_fuchsia_mem as fmem;
132        use futures::TryStreamExt;
133
134        fn create_log_stream() -> Result<(LogStream, ftest_manager::LogsIterator), fidl::Error> {
135            let (stream, iterator) = BatchLogStream::new()?;
136            Ok((LogStream::new(stream), iterator))
137        }
138
139        struct BatchIteratorOpts {
140            with_error: bool,
141        }
142
143        /// Spanws a dummy batch iterator for testing that return 3 logs: "1", "2", "3" all with
144        /// the same severity
145        async fn spawn_batch_iterator_server(
146            server_end: ServerEnd<BatchIteratorMarker>,
147            opts: BatchIteratorOpts,
148        ) {
149            let mut request_stream = server_end.into_stream();
150            let mut values = vec![1i64, 2, 3].into_iter();
151            while let Some(request) = request_stream.try_next().await.expect("get next request") {
152                match request {
153                    BatchIteratorRequest::WaitForReady { responder } => {
154                        responder.send().expect("sent response")
155                    }
156                    BatchIteratorRequest::GetNext { responder } => match values.next() {
157                        None => {
158                            responder.send(Ok(vec![])).expect("send empty response");
159                        }
160                        Some(value) => {
161                            if opts.with_error {
162                                responder.send(Err(ReaderError::Io)).expect("send error");
163                                continue;
164                            }
165                            let content = get_json_data(value);
166                            let size = content.len() as u64;
167                            let vmo = zx::Vmo::create(size).expect("create vmo");
168                            vmo.write(content.as_bytes(), 0).expect("write vmo");
169                            let result = FormattedContent::Json(fmem::Buffer { vmo, size });
170                            responder.send(Ok(vec![result])).expect("send response");
171                        }
172                    },
173                    BatchIteratorRequest::_UnknownMethod { .. } => {
174                        unreachable!("We aren't expecting any other call");
175                    }
176                }
177            }
178        }
179
180        #[fasync::run_singlethreaded(test)]
181        async fn log_stream_returns_logs() {
182            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
183            let server_end = match iterator {
184                ftest_manager::LogsIterator::Batch(server_end) => server_end,
185                _ => panic!("unexpected logs iterator server end"),
186            };
187            fasync::Task::spawn(spawn_batch_iterator_server(
188                server_end,
189                BatchIteratorOpts { with_error: false },
190            ))
191            .detach();
192            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("1"));
193            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("2"));
194            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("3"));
195            assert_matches!(log_stream.next().await, None);
196        }
197
198        #[fasync::run_singlethreaded(test)]
199        async fn log_stream_can_return_errors() {
200            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
201            let server_end = match iterator {
202                ftest_manager::LogsIterator::Batch(server_end) => server_end,
203                _ => panic!("unexpected logs iterator server end"),
204            };
205            fasync::Task::spawn(spawn_batch_iterator_server(
206                server_end,
207                BatchIteratorOpts { with_error: true },
208            ))
209            .detach();
210            assert_matches!(log_stream.next().await, Some(Err(_)));
211        }
212    }
213
214    #[cfg(not(target_os = "fuchsia"))]
215    mod host {
216        use super::*;
217        use assert_matches::assert_matches;
218        use futures::AsyncWriteExt;
219
220        fn create_log_stream() -> Result<(LogStream, ftest_manager::LogsIterator), fidl::Error> {
221            let (client_end, server_end) = fidl::Socket::create_stream();
222            let (stream, iterator) = (
223                LogStream::new(
224                    LogsDataStream::new(fuchsia_async::Socket::from_socket(client_end))
225                        .map(|result| Ok(result)),
226                ),
227                ftest_manager::LogsIterator::Stream(server_end),
228            );
229            Ok((LogStream::new(stream), iterator))
230        }
231
232        async fn spawn_archive_iterator_server(socket: fidl::Socket, with_error: bool) {
233            let mut socket = fuchsia_async::Socket::from_socket(socket);
234            let mut values = vec![1, 2, 3].into_iter();
235            loop {
236                match values.next() {
237                    None => {
238                        // End of stream, close socket.
239                        break;
240                    }
241                    Some(value) => {
242                        if with_error {
243                            // Send invalid JSON to trigger an error other than
244                            // ZX_ERR_PEER_CLOSED
245                            let _ = socket.write_all("5".as_bytes()).await;
246                            continue;
247                        }
248                        socket.write_all(get_json_data(value).as_bytes()).await.unwrap();
249                    }
250                }
251            }
252        }
253
254        async fn archive_stream_returns_logs() {
255            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
256            let server_end = match iterator {
257                ftest_manager::LogsIterator::Stream(server_end) => server_end,
258                _ => panic!("unexpected logs iterator server end"),
259            };
260            fasync::Task::spawn(spawn_archive_iterator_server(server_end, false)).detach();
261            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("1"));
262            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("2"));
263            assert_eq!(log_stream.next().await.unwrap().expect("got ok result").msg(), Some("3"));
264            assert_matches!(log_stream.next().await, None);
265        }
266
267        #[fasync::run_singlethreaded(test)]
268        async fn archive_stream_returns_logs_inline() {
269            archive_stream_returns_logs().await;
270        }
271
272        async fn archive_stream_can_return_errors() {
273            let (mut log_stream, iterator) = create_log_stream().expect("got log stream");
274            let server_end = match iterator {
275                ftest_manager::LogsIterator::Stream(server_end) => server_end,
276                _ => panic!("unexpected logs iterator server end"),
277            };
278            fasync::Task::spawn(spawn_archive_iterator_server(server_end, true)).detach();
279            assert_matches!(log_stream.next().await, None);
280        }
281
282        #[fasync::run_singlethreaded(test)]
283        async fn archive_stream_can_return_errors_inline() {
284            archive_stream_can_return_errors().await;
285        }
286    }
287
288    fn get_json_data(value: i64) -> String {
289        let data = diagnostics_data::LogsDataBuilder::new(diagnostics_data::BuilderArgs {
290            timestamp: Timestamp::from_nanos(0),
291            component_url: Some("fake-url".into()),
292            moniker: ExtendedMoniker::parse_str("test/moniker").unwrap(),
293            severity: Severity::Info,
294        })
295        .set_message(value.to_string())
296        .build();
297
298        serde_json::to_string(&data).expect("serialize to json")
299    }
300}