test_diagnostics/
lib.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! This crate provides helper functions to collect test diagnostics.
6
7use fuchsia_async as fasync;
8use fuchsia_sync::Mutex;
9use futures::channel::mpsc;
10use futures::prelude::*;
11use futures::ready;
12use futures::task::{Context, Poll};
13use std::cell::RefCell;
14use std::io::Write;
15use std::pin::Pin;
16use std::sync::Arc;
17
18pub use crate::diagnostics::LogStream;
19
20mod diagnostics;
21pub mod zstd_compress;
22
thread_local! {
    /// Per-thread scratch space (2 MiB, zero-initialized) that `SocketReadFut::poll` reads
    /// socket data into before handing the filled prefix to its callback.
    static BUFFER: RefCell<Vec<u8>> = RefCell::new(vec![0u8; 2 * 1024 * 1024]);
}
26
27/// Future that executes a function when bytes are available on a socket.
28pub struct SocketReadFut<'a, T, F>
29where
30    F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
31{
32    socket: &'a mut fidl::AsyncSocket,
33    on_read_fn: F,
34}
35
36impl<'a, T, F> SocketReadFut<'a, T, F>
37where
38    F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
39{
40    pub fn new(socket: &'a mut fidl::AsyncSocket, on_read_fn: F) -> Self {
41        Self { socket, on_read_fn }
42    }
43}
44
45impl<'a, T, F> Future for SocketReadFut<'a, T, F>
46where
47    F: FnMut(Option<&[u8]>) -> Result<T, std::io::Error> + Unpin,
48{
49    type Output = Result<T, std::io::Error>;
50    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51        BUFFER.with(|b| {
52            let mut b = b.borrow_mut();
53            let mut_self = self.get_mut();
54            let len = ready!(Pin::new(&mut mut_self.socket).poll_read(cx, &mut *b)?);
55            match len {
56                0 => Poll::Ready((mut_self.on_read_fn)(None)),
57                l => Poll::Ready((mut_self.on_read_fn)(Some(&b[..l]))),
58            }
59        })
60    }
61}
62
63pub async fn collect_string_from_socket(socket: fidl::Socket) -> Result<String, anyhow::Error> {
64    let (s, mut r) = mpsc::channel(1024);
65    let task = fasync::Task::spawn(collect_and_send_string_output(socket, s));
66    let mut ret = String::new();
67    while let Some(content) = r.next().await {
68        ret.push_str(&content);
69    }
70    task.await?;
71    Ok(ret)
72}
73
74pub async fn collect_and_send_string_output(
75    socket: fidl::Socket,
76    mut sender: mpsc::Sender<String>,
77) -> Result<(), anyhow::Error> {
78    let mut async_socket = fidl::AsyncSocket::from_socket(socket);
79    loop {
80        let maybe_string = SocketReadFut::new(&mut async_socket, |maybe_buf| {
81            Ok(maybe_buf.map(|buf| String::from_utf8_lossy(buf).into()))
82        })
83        .await?;
84        match maybe_string {
85            Some(string) => sender.send(string).await?,
86            None => return Ok(()),
87        }
88    }
89}
90
91/// A writer that buffers content in memory for some duration before flushing the contents to
92/// an inner writer. After the duration elapses, any new bytes are written immediately to the
93/// inner writer. Calling flush() also immediately flushes the contents.
94/// Errors that occur when flushing on timeout are returned at the next write() or flush()
95/// call. Therefore, callers should make sure to call flush before the StdoutBuffer goes out of
96/// scope.
97pub struct StdoutBuffer<W: Write + Send + 'static> {
98    inner: Arc<Mutex<StdoutBufferInner<W>>>,
99    _timer: fuchsia_async::Task<()>,
100}
101
102impl<W: Write + Send + 'static> StdoutBuffer<W> {
103    /// Crates new StdoutBuffer and starts the timer on log buffering.
104    /// `duration`: Buffers log for this duration or till done() is called.
105    /// `sender`: Channel to send logs on.
106    /// `max_capacity`: Flush log if buffer size exceeds this value. This will not cancel the timer
107    /// and all the logs would be flushed once timer expires.
108    pub fn new(duration: std::time::Duration, writer: W, max_capacity: usize) -> Self {
109        let (inner, timer) = StdoutBufferInner::new(duration, writer, max_capacity);
110        Self { inner, _timer: timer }
111    }
112}
113
114impl<W: Write + Send + 'static> Write for StdoutBuffer<W> {
115    fn flush(&mut self) -> Result<(), std::io::Error> {
116        self.inner.lock().flush()
117    }
118
119    fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
120        self.inner.lock().write(bytes)
121    }
122}
123
/// State behind a `StdoutBuffer`: the destination writer plus the in-memory buffer.
struct StdoutBufferInner<W>
where
    W: Write + Send + 'static,
{
    /// Destination for flushed and unbuffered bytes.
    writer: W,
    /// Buffered bytes while buffering is active; `None` once buffering has stopped.
    buffer: Option<Vec<u8>>,
    /// Error from writing out the buffer when buffering stopped, held until the next
    /// write() or flush() call can report it.
    stop_buffer_error: Option<std::io::Error>,
    /// Buffered size beyond which writes go straight to `writer`.
    max_capacity: usize,
}
131
132impl<W: Write + Send + 'static> StdoutBufferInner<W> {
133    fn new(
134        duration: std::time::Duration,
135        writer: W,
136        max_capacity: usize,
137    ) -> (Arc<Mutex<Self>>, fuchsia_async::Task<()>) {
138        let new_self = Arc::new(Mutex::new(StdoutBufferInner {
139            writer,
140            buffer: Some(Vec::with_capacity(max_capacity)),
141            stop_buffer_error: None,
142            max_capacity,
143        }));
144
145        let timer = fuchsia_async::Timer::new(duration);
146        let log_buffer = Arc::downgrade(&new_self);
147        let f = async move {
148            timer.await;
149            if let Some(log_buffer) = log_buffer.upgrade() {
150                log_buffer.lock().stop_buffering();
151            }
152        };
153
154        (new_self, fuchsia_async::Task::spawn(f))
155    }
156
157    fn stop_buffering(&mut self) {
158        if let Some(buf) = self.buffer.take() {
159            if let Err(e) = self.writer.write_all(&buf) {
160                self.stop_buffer_error = Some(e);
161            }
162        }
163    }
164}
165
166impl<W: Write + Send + 'static> Write for StdoutBufferInner<W> {
167    fn flush(&mut self) -> Result<(), std::io::Error> {
168        self.stop_buffering();
169        match self.stop_buffer_error.take() {
170            Some(e) => Err(e),
171            None => self.writer.flush(),
172        }
173    }
174
175    fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
176        if let Some(e) = self.stop_buffer_error.take() {
177            return Err(e);
178        }
179        match self.buffer.as_mut() {
180            None => self.writer.write(bytes),
181            Some(buf) if buf.len() + bytes.len() > self.max_capacity => {
182                self.writer.write_all(&buf)?;
183                buf.truncate(0);
184                self.writer.write(bytes)
185            }
186            Some(buf) => Write::write(buf, bytes),
187        }
188    }
189}
190
191impl<W: Write + Send + 'static> Drop for StdoutBufferInner<W> {
192    fn drop(&mut self) {
193        let _ = self.flush();
194    }
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200    use fidl::HandleBased;
201    use futures::StreamExt;
202    use pretty_assertions::assert_eq;
203
204    async fn collect_until_eq<S: Stream<Item = String> + Unpin>(mut stream: S, target: &str) {
205        let mut accumulator = "".to_string();
206        while accumulator.len() < target.len() {
207            match stream.next().await {
208                Some(string) => accumulator.push_str(&string),
209                None => panic!(
210                    "Expected string '{}' but stream terminated after '{}'",
211                    target, accumulator
212                ),
213            }
214        }
215        assert_eq!(target, accumulator);
216    }
217
218    #[fuchsia_async::run_singlethreaded(test)]
219    async fn collect_test_stdout() {
220        let (sock_server, sock_client) = fidl::Socket::create_stream();
221
222        let (sender, mut recv) = mpsc::channel(1);
223
224        let fut =
225            fuchsia_async::Task::spawn(collect_and_send_string_output(sock_client, sender.into()));
226
227        sock_server.write(b"test message 1").expect("Can't write msg to socket");
228        sock_server.write(b"test message 2").expect("Can't write msg to socket");
229        sock_server.write(b"test message 3").expect("Can't write msg to socket");
230
231        collect_until_eq(&mut recv, "test message 1test message 2test message 3").await;
232
233        // can receive messages multiple times
234        sock_server.write(b"test message 4").expect("Can't write msg to socket");
235        collect_until_eq(&mut recv, "test message 4").await;
236
237        // messages can be read after socket server is closed.
238        sock_server.write(b"test message 5").expect("Can't write msg to socket");
239        sock_server.into_handle(); // this will drop this handle and close it.
240        fut.await.expect("log collection should not fail");
241        collect_until_eq(&mut recv, "test message 5").await;
242
243        // socket was closed, this should return None
244        let msg = recv.next().await;
245        assert_eq!(msg, None);
246    }
247
248    /// Host side executor doesn't have a fake timer, so these tests only run on device for now.
249    #[cfg(target_os = "fuchsia")]
250    mod stdout {
251        use super::*;
252        use fuchsia_async::TestExecutor;
253
254        use pretty_assertions::assert_eq;
255        use std::ops::Add;
256
257        struct MutexBytes(Arc<Mutex<Vec<u8>>>);
258
259        impl Write for MutexBytes {
260            fn flush(&mut self) -> Result<(), std::io::Error> {
261                Write::flush(&mut *self.0.lock())
262            }
263
264            fn write(&mut self, bytes: &[u8]) -> Result<usize, std::io::Error> {
265                Write::write(&mut *self.0.lock(), bytes)
266            }
267        }
268
269        #[test]
270        fn log_buffer_without_timeout() {
271            let mut executor = TestExecutor::new_with_fake_time();
272            let output = Arc::new(Mutex::new(vec![]));
273            let writer = MutexBytes(output.clone());
274            let (log_buffer, mut timeout_task) =
275                StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);
276
277            write!(log_buffer.lock(), "message1").expect("write message");
278            assert_eq!(*output.lock(), b"");
279            write!(log_buffer.lock(), "message2").expect("write message");
280            assert_eq!(*output.lock(), b"");
281
282            assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
283            assert_eq!(*output.lock(), b"");
284
285            log_buffer.lock().flush().expect("flush buffer");
286            assert_eq!(*output.lock(), b"message1message2");
287        }
288
289        #[test]
290        fn log_buffer_flush_on_drop() {
291            let mut executor = TestExecutor::new_with_fake_time();
292            let output = Arc::new(Mutex::new(vec![]));
293            let writer = MutexBytes(output.clone());
294            let (log_buffer, mut timeout_task) =
295                StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);
296
297            write!(log_buffer.lock(), "message1").expect("write message");
298            assert_eq!(*output.lock(), b"");
299            write!(log_buffer.lock(), "message2").expect("write message");
300            assert_eq!(*output.lock(), b"");
301
302            assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
303            assert_eq!(*output.lock(), b"");
304
305            drop(log_buffer);
306            assert_eq!(*output.lock(), b"message1message2");
307        }
308
309        #[test]
310        fn log_buffer_with_timeout() {
311            let mut executor = TestExecutor::new_with_fake_time();
312            let output = Arc::new(Mutex::new(vec![]));
313            let writer = MutexBytes(output.clone());
314            let (log_buffer, mut timeout_task) =
315                StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 100);
316
317            write!(log_buffer.lock(), "message1").expect("write message");
318            assert_eq!(*output.lock(), b"");
319            write!(log_buffer.lock(), "message2").expect("write message");
320            assert_eq!(*output.lock(), b"");
321
322            assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Pending);
323            assert_eq!(*output.lock(), b"");
324
325            executor.set_fake_time(executor.now().add(zx::MonotonicDuration::from_seconds(6)));
326            executor.wake_next_timer();
327            assert_eq!(executor.run_until_stalled(&mut timeout_task), Poll::Ready(()));
328            assert_eq!(*output.lock(), b"message1message2");
329        }
330
331        #[test]
332        fn log_buffer_capacity_reached() {
333            let _executor = TestExecutor::new_with_fake_time();
334            let output = Arc::new(Mutex::new(vec![]));
335            let writer = MutexBytes(output.clone());
336            let (log_buffer, _timeout_task) =
337                StdoutBufferInner::new(std::time::Duration::from_secs(5), writer, 10);
338
339            write!(log_buffer.lock(), "message1").expect("write message");
340            assert_eq!(*output.lock(), b"");
341            write!(log_buffer.lock(), "message2").expect("write message");
342            assert_eq!(*output.lock(), b"message1message2");
343
344            // capacity was reached but buffering is still on, so next msg should buffer
345            write!(log_buffer.lock(), "message1").expect("write message");
346            assert_eq!(*output.lock(), b"message1message2");
347            write!(log_buffer.lock(), "message2").expect("write message");
348            assert_eq!(*output.lock(), b"message1message2message1message2");
349        }
350    }
351}