test_runners_lib/
logs.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Helpers for capturing logs from Fuchsia processes.
6
7use fuchsia_async as fasync;
8use futures::{future, AsyncReadExt as _, AsyncWriteExt as _, FutureExt as _};
9use std::num::NonZeroUsize;
10use thiserror::Error;
11use zx::HandleBased as _;
12
13/// Buffer size for socket read calls to `LoggerStream::buffer_and_drain`.
14const SOCKET_BUFFER_SIZE: usize = 2048;
15
16/// Maximum length we will buffer for a single line. If a line is longer than this
17/// length it will be split up into multiple messages.
18const MAX_LINE_BUFFER_LENGTH: usize = 4096;
19
20/// Error returned by this library.
21#[derive(Debug, PartialEq, Eq, Error, Clone)]
22pub enum LoggerError {
23    #[error("cannot create socket: {:?}", _0)]
24    CreateSocket(zx::Status),
25
26    #[error("cannot duplicate socket: {:?}", _0)]
27    DuplicateSocket(zx::Status),
28
29    #[error("invalid socket: {:?}", _0)]
30    InvalidSocket(zx::Status),
31}
32
33/// Error returned from draining LoggerStream or writing to LogWriter.
34#[derive(Debug, Error)]
35pub enum LogError {
36    /// Error encountered when draining LoggerStream.
37    #[error("can't get logs: {:?}", _0)]
38    Read(std::io::Error),
39
40    /// Error encountered when writing to LogWriter.
41    #[error("can't write logs: {:?}", _0)]
42    Write(std::io::Error),
43}
44
45/// Creates a combined socket handle for stdout and stderr and hooks them to same socket.
46/// It also wraps the socket into stream and returns it back.
47pub fn create_std_combined_log_stream(
48) -> Result<(LoggerStream, zx::Handle, zx::Handle), LoggerError> {
49    let (client, log) = zx::Socket::create_stream();
50
51    let stream = LoggerStream::new(client).map_err(LoggerError::InvalidSocket)?;
52    let clone =
53        log.duplicate_handle(zx::Rights::SAME_RIGHTS).map_err(LoggerError::DuplicateSocket)?;
54
55    Ok((stream, log.into_handle(), clone.into_handle()))
56}
57
58/// Creates a socket handle for stdout/stderr and hooks it to a file handle.
59/// It also wraps the socket into stream and returns it back.
60pub fn create_log_stream() -> Result<(LoggerStream, zx::Handle), LoggerError> {
61    let (client, log) = zx::Socket::create_stream();
62
63    let stream = LoggerStream::new(client).map_err(LoggerError::InvalidSocket)?;
64
65    Ok((stream, log.into_handle()))
66}
67/// Collects logs in background and gives a way to collect those logs.
68pub struct LogStreamReader {
69    fut: future::RemoteHandle<Result<Vec<u8>, LogError>>,
70}
71
72impl LogStreamReader {
73    pub fn new(logger: LoggerStream) -> Self {
74        let (logger_handle, logger_fut) = logger.read_to_end().remote_handle();
75        fasync::Task::spawn(logger_handle).detach();
76        Self { fut: logger_fut }
77    }
78
79    /// Retrieve all logs.
80    pub async fn get_logs(self) -> Result<Vec<u8>, LogError> {
81        self.fut.await
82    }
83}
84
85/// A stream bound to a socket where a source stream is captured.
86/// For example, stdout and stderr streams can be redirected to the contained
87/// socket and captured.
88pub struct LoggerStream {
89    socket: fasync::Socket,
90}
91
92impl Unpin for LoggerStream {}
93
94impl LoggerStream {
95    /// Create a LoggerStream from the provided zx::Socket. The `socket` object
96    /// should be bound to its intended source stream (e.g. "stdout").
97    pub fn new(socket: zx::Socket) -> Result<LoggerStream, zx::Status> {
98        let l = LoggerStream { socket: fasync::Socket::from_socket(socket) };
99        Ok(l)
100    }
101
102    /// Reads all bytes from socket.
103    pub async fn read_to_end(mut self) -> Result<Vec<u8>, LogError> {
104        let mut buffer: Vec<u8> = Vec::new();
105        let _bytes_read = self.socket.read_to_end(&mut buffer).await.map_err(LogError::Read)?;
106        Ok(buffer)
107    }
108
109    /// Drain the `stream` and write all of its contents to `writer`. Bytes are
110    /// delimited by newline and each line will be passed to `writer.write` individually.
111    /// An optional `peek_fn` may be specified which is passed a reference to each line before
112    /// it is written.
113    pub async fn buffer_drain_and_peek(
114        mut self,
115        writer: &mut SocketLogWriter,
116        peek_fn: Option<impl Fn(&[u8])>,
117    ) -> Result<(), LogError> {
118        let mut line_buffer: Vec<u8> = Vec::with_capacity(MAX_LINE_BUFFER_LENGTH);
119        let mut socket_buffer: Vec<u8> = vec![0; SOCKET_BUFFER_SIZE];
120
121        while let Some(bytes_read) = NonZeroUsize::new(
122            self.socket.read(&mut socket_buffer[..]).await.map_err(LogError::Read)?,
123        ) {
124            let bytes_read = bytes_read.get();
125
126            let newline_iter =
127                socket_buffer[..bytes_read].iter().enumerate().filter_map(|(i, &b)| {
128                    if b == b'\n' {
129                        Some(i)
130                    } else {
131                        None
132                    }
133                });
134
135            let mut prev_offset = 0;
136            for idx in newline_iter {
137                let line = &socket_buffer[prev_offset..idx + 1];
138                if !line_buffer.is_empty() {
139                    writer.write(line_buffer.drain(..).as_slice()).await?;
140                }
141                if let Some(ref peek) = &peek_fn {
142                    peek(line);
143                }
144                writer.write(line).await?;
145                prev_offset = idx + 1;
146            }
147            if prev_offset != bytes_read {
148                line_buffer.extend_from_slice(&socket_buffer[prev_offset..bytes_read]);
149            }
150
151            if line_buffer.len() > MAX_LINE_BUFFER_LENGTH {
152                let bytes = &line_buffer[..MAX_LINE_BUFFER_LENGTH];
153                if let Some(ref peek) = &peek_fn {
154                    peek(bytes);
155                }
156                writer.write(bytes).await?;
157                line_buffer.drain(..MAX_LINE_BUFFER_LENGTH);
158            }
159        }
160
161        if !line_buffer.is_empty() {
162            let bytes = &line_buffer[..];
163            if let Some(ref peek) = &peek_fn {
164                peek(bytes);
165            }
166            writer.write(bytes).await?;
167        }
168
169        Ok(())
170    }
171
172    /// Convenience function for buffer_drain_and_peek without a peek function.
173    pub async fn buffer_and_drain(self, writer: &mut SocketLogWriter) -> Result<(), LogError> {
174        self.buffer_drain_and_peek(writer, None::<fn(&[u8])>).await
175    }
176
177    /// Take the underlying socket of this object.
178    pub fn take_socket(self) -> fasync::Socket {
179        self.socket
180    }
181}
182
183/// Utility struct to write to socket asynchrously.
184pub struct SocketLogWriter {
185    logger: fasync::Socket,
186}
187
188impl SocketLogWriter {
189    pub fn new(logger: fasync::Socket) -> Self {
190        Self { logger }
191    }
192
193    pub async fn write_str(&mut self, s: &str) -> Result<(), LogError> {
194        self.write(s.as_bytes()).await
195    }
196
197    pub async fn write(&mut self, bytes: &[u8]) -> Result<(), LogError> {
198        self.logger.write_all(bytes).await.map_err(LogError::Write)
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205    use anyhow::{format_err, Context as _, Error};
206    use assert_matches::assert_matches;
207    use futures::{try_join, TryStreamExt as _};
208    use rand::distributions::{Alphanumeric, DistString as _};
209    use rand::thread_rng;
210    use std::sync::mpsc;
211    use test_case::test_case;
212
213    #[fuchsia_async::run_singlethreaded(test)]
214    async fn log_writer_reader_work() {
215        let (sock1, sock2) = zx::Socket::create_stream();
216        let mut log_writer = SocketLogWriter::new(fasync::Socket::from_socket(sock1));
217
218        let reader = LoggerStream::new(sock2).unwrap();
219        let reader = LogStreamReader::new(reader);
220
221        log_writer.write_str("this is string one.").await.unwrap();
222        log_writer.write_str("this is string two.").await.unwrap();
223        drop(log_writer);
224
225        let actual = reader.get_logs().await.unwrap();
226        let actual = std::str::from_utf8(&actual).unwrap();
227        assert_eq!(actual, "this is string one.this is string two.".to_owned());
228    }
229
230    #[test_case(String::from("Hello World!") ; "consumes_simple_msg")]
231    #[test_case(get_random_string(10000) ; "consumes_large_msg")]
232    #[fasync::run_singlethreaded(test)]
233    async fn logger_stream_read_to_end(msg: String) -> Result<(), Error> {
234        let (stream, tx) = create_logger_stream()?;
235
236        let () = take_and_write_to_socket(tx, &msg)?;
237        let result = stream.read_to_end().await.context("Failed to read from socket")?;
238        let actual = std::str::from_utf8(&result).context("Failed to parse bytes")?.to_owned();
239
240        assert_eq!(actual, msg);
241        Ok(())
242    }
243
244    #[fasync::run_singlethreaded(test)]
245    async fn logger_stream_read_to_end_consumes_concat_msgs() -> Result<(), Error> {
246        let (stream, tx) = create_logger_stream()?;
247        let msgs =
248            vec!["Hello World!".to_owned(), "Hola Mundo!".to_owned(), "你好,世界!".to_owned()];
249
250        for msg in msgs.iter() {
251            let () = write_to_socket(&tx, &msg)?;
252        }
253        std::mem::drop(tx);
254        let result = stream.read_to_end().await.context("Failed to read from socket")?;
255        let actual = std::str::from_utf8(&result).context("Failed to parse bytes")?.to_owned();
256
257        assert_eq!(actual, msgs.join(""));
258        Ok(())
259    }
260
261    #[fasync::run_singlethreaded(test)]
262    async fn buffer_and_drain_reads_each_line_as_a_new_message() -> Result<(), Error> {
263        let (stream, tx) = create_logger_stream()?;
264        let (mut logger, rx) = create_datagram_logger()?;
265        let msg = "Hello World\nHola Mundo!\n你好,世界!";
266
267        let (tx_peeks, rx_peeks) = mpsc::channel();
268
269        let () = take_and_write_to_socket(tx, msg)?;
270        let (actual, ()) = try_join!(read_all_messages(rx), async move {
271            stream
272                .buffer_drain_and_peek(
273                    &mut logger,
274                    Some(move |line: &[u8]| tx_peeks.send(line.len()).unwrap()),
275                )
276                .await
277                .context("Failed to drain stream")
278        },)?;
279
280        let expected = vec![
281            "Hello World\n".to_string(),
282            "Hola Mundo!\n".to_string(),
283            "你好,世界!".to_string(),
284        ];
285        assert_eq!(actual, expected);
286
287        let lengths = rx_peeks.iter().collect::<Vec<_>>();
288
289        assert_eq!(lengths, expected.iter().map(|v| v.len()).collect::<Vec<_>>());
290
291        Ok(())
292    }
293
294    #[fasync::run_singlethreaded(test)]
295    async fn buffer_and_drain_does_not_buffer_past_maximum_size() -> Result<(), Error> {
296        let msg = get_random_string(MAX_LINE_BUFFER_LENGTH + 10);
297        let (stream, tx) = create_logger_stream()?;
298        let (mut logger, rx) = create_datagram_logger()?;
299
300        let (tx_peeks, rx_peeks) = mpsc::channel();
301
302        let () = take_and_write_to_socket(tx, &msg)?;
303        let (actual, ()) = try_join!(read_all_messages(rx), async move {
304            stream
305                .buffer_drain_and_peek(
306                    &mut logger,
307                    Some(move |line: &[u8]| {
308                        tx_peeks.send(line.len()).unwrap();
309                    }),
310                )
311                .await
312                .context("Failed to drain stream")
313        },)?;
314
315        let lengths = rx_peeks.iter().collect::<Vec<_>>();
316
317        assert_eq!(actual.len(), 2);
318        assert_eq!(actual[0], msg[0..MAX_LINE_BUFFER_LENGTH]);
319        assert_eq!(actual[1], msg[MAX_LINE_BUFFER_LENGTH..]);
320
321        assert_eq!(lengths, vec![MAX_LINE_BUFFER_LENGTH, 10]);
322
323        Ok(())
324    }
325
326    #[fasync::run_singlethreaded(test)]
327    async fn buffer_and_drain_dumps_full_buffer_if_no_newline_seen() -> Result<(), Error> {
328        let (stream, tx) = create_logger_stream()?;
329        let (mut logger, rx) = create_datagram_logger()?;
330
331        let ((), ()) = try_join!(
332            async move {
333                let msg = get_random_string(SOCKET_BUFFER_SIZE);
334                // First write up to (SOCKET_BUFFER_SIZE - 1) so that we can
335                // assert that buffer isn't drained prematurely.
336                let () = write_to_socket(&tx, &msg[..SOCKET_BUFFER_SIZE - 1])?;
337
338                // Temporarily convert fasync::Socket back to zx::Socket so that
339                // we can use non-blocking `read` call.
340                let rx = rx.into_zx_socket();
341                let mut buffer = vec![0u8; SOCKET_BUFFER_SIZE];
342                let maybe_bytes_read = rx.read(&mut buffer);
343                assert_eq!(maybe_bytes_read, Err(zx::Status::SHOULD_WAIT));
344
345                // Write last byte
346                let () = write_to_socket(&tx, &msg[SOCKET_BUFFER_SIZE - 1..SOCKET_BUFFER_SIZE])?;
347
348                // Confirm we still didn't write, waiting for newline.
349                let maybe_bytes_read = rx.read(&mut buffer);
350                assert_eq!(maybe_bytes_read, Err(zx::Status::SHOULD_WAIT));
351
352                // Drop socket to unblock the read routine.
353                std::mem::drop(tx);
354
355                // Convert zx::Socket back to fasync::Socket.
356                let mut rx = fasync::Socket::from_socket(rx);
357                let bytes_read =
358                    rx.read(&mut buffer).await.context("Failed to read from socket")?;
359                let msg_written = std::str::from_utf8(&buffer).context("Failed to parse bytes")?;
360
361                assert_eq!(bytes_read, SOCKET_BUFFER_SIZE);
362                assert_eq!(msg_written, msg);
363
364                Ok(())
365            },
366            async move { stream.buffer_and_drain(&mut logger).await.context("Failed to drain stream") },
367        )?;
368
369        Ok(())
370    }
371
372    #[fasync::run_singlethreaded(test)]
373    async fn buffer_and_drain_return_error_if_stream_polls_err() -> Result<(), Error> {
374        let (tx, rx) = zx::Socket::create_stream();
375        // A closed socket should yield an error when stream is polled.
376        let () = rx.half_close()?;
377        let () = tx.half_close()?;
378        let stream = LoggerStream::new(rx).context("Failed to create LoggerStream")?;
379        let (mut logger, _rx) = create_datagram_logger()?;
380
381        let result = stream.buffer_and_drain(&mut logger).await;
382
383        assert_matches!(result, Err(LogError::Read(_)));
384        Ok(())
385    }
386
387    async fn read_all_messages(socket: fasync::Socket) -> Result<Vec<String>, Error> {
388        let mut results = Vec::new();
389        let mut stream = socket.into_datagram_stream();
390        while let Some(bytes) = stream.try_next().await.context("Failed to read socket stream")? {
391            results.push(
392                std::str::from_utf8(&bytes).context("Failed to parse bytes into utf8")?.to_owned(),
393            );
394        }
395
396        Ok(results)
397    }
398
399    fn take_and_write_to_socket(socket: zx::Socket, message: &str) -> Result<(), Error> {
400        write_to_socket(&socket, &message)
401    }
402
403    fn write_to_socket(socket: &zx::Socket, message: &str) -> Result<(), Error> {
404        let bytes_written =
405            socket.write(message.as_bytes()).context("Failed to write to socket")?;
406        match bytes_written == message.len() {
407            true => Ok(()),
408            false => Err(format_err!("Bytes written to socket doesn't match len of message. Message len = {}. Bytes written = {}", message.len(), bytes_written)),
409        }
410    }
411
412    fn create_datagram_logger() -> Result<(SocketLogWriter, fasync::Socket), Error> {
413        let (tx, rx) = zx::Socket::create_datagram();
414        let logger = SocketLogWriter::new(fasync::Socket::from_socket(tx));
415        let rx = fasync::Socket::from_socket(rx);
416        Ok((logger, rx))
417    }
418
419    fn create_logger_stream() -> Result<(LoggerStream, zx::Socket), Error> {
420        let (tx, rx) = zx::Socket::create_stream();
421        let stream = LoggerStream::new(rx).context("Failed to create LoggerStream")?;
422        Ok((stream, tx))
423    }
424
425    fn get_random_string(size: usize) -> String {
426        Alphanumeric.sample_string(&mut thread_rng(), size)
427    }
428}