run_test_suite_lib/output/shell/
writer.rs

// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use crate::trace::duration;
6use fuchsia_sync::Mutex;
7use std::io::{Error, Write};
8use std::sync::Arc;
9
10/// A handle around an inner writer. This serves as a "multiplexing" writer that
11/// writes bytes from multiple sources into a single serial destination, typically
12/// to stdout.
13/// Output sent to a handle is buffered until a newline is encountered, then the
14/// buffered output is written to the inner writer.
15/// The handle also supports prepending a prefix to the start of each buffer. This
16/// helps preserve existing behavior where prefixes are added to the start of stdout
17/// and log lines to help a developer understand what produced some output.
18pub(super) struct ShellWriterHandle<W: 'static + Write + Send + Sync> {
19    inner: Arc<Mutex<ShellWriterHandleInner<W>>>,
20    buffer: Vec<u8>,
21    /// Prefix, if any, to prepend to output before writing to the inner writer.
22    prefix: Option<Vec<u8>>,
23    handle_id: u32,
24}
25
26impl<W: 'static + Write + Send + Sync> ShellWriterHandle<W> {
27    const NEWLINE_BYTE: u8 = b'\n';
28    const BUFFER_CAPACITY: usize = 1024;
29
30    /// Create a new handle to a wrapped writer.
31    pub(super) fn new_handle(
32        inner: Arc<Mutex<ShellWriterHandleInner<W>>>,
33        prefix: Option<String>,
34    ) -> Self {
35        let mut lock = inner.lock();
36        let handle_id = lock.num_handles;
37        lock.num_handles += 1;
38        drop(lock);
39        Self {
40            inner,
41            buffer: Vec::with_capacity(Self::BUFFER_CAPACITY),
42            prefix: prefix.map(String::into_bytes),
43            handle_id,
44        }
45    }
46
47    /// Write a full line to the inner writer.
48    fn write_bufs(writer: &mut W, bufs: &[&[u8]]) -> Result<(), Error> {
49        for buf in bufs {
50            writer.write_all(buf)?;
51        }
52        Ok(())
53    }
54}
55
56/// Inner mutable state for |ShellWriterHandle|.
57pub(super) struct ShellWriterHandleInner<W: 'static + Write + Send + Sync> {
58    /// The writer to which all content is passed.
59    writer: W,
60    /// The id of the last handle that wrote to the writer, used to conditionally
61    /// output a prefix only when the handle writing to the output changes.
62    last_writer_id: Option<u32>,
63    /// The number of handles that have been created. Used to assign ids to handles.
64    num_handles: u32,
65}
66
67impl<W: 'static + Write + Send + Sync> ShellWriterHandleInner<W> {
68    pub(super) fn new(writer: W) -> Self {
69        Self { writer, last_writer_id: None, num_handles: 0 }
70    }
71}
72
73/// A handle to a writer contained in a |ShellWriterHandle|. This is exposed for testing
74/// purposes.
75pub struct ShellWriterView<W: 'static + Write + Send + Sync>(Arc<Mutex<ShellWriterHandleInner<W>>>);
76
77impl<W: 'static + Write + Send + Sync> ShellWriterView<W> {
78    pub(super) fn new(inner: Arc<Mutex<ShellWriterHandleInner<W>>>) -> Self {
79        Self(inner)
80    }
81
82    pub fn lock(&self) -> fuchsia_sync::MappedMutexGuard<'_, W> {
83        fuchsia_sync::MutexGuard::map(self.0.lock(), |handle_inner| &mut handle_inner.writer)
84    }
85}
86
87impl<W: 'static + Write + Send + Sync> Write for ShellWriterHandle<W> {
88    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
89        duration!(c"shell_write");
90        // find the last newline in the buffer. In case multiple lines are written as once,
91        // we should write once to the inner writer and add our prefix only once. This helps
92        // avoid spamming the output with prefixes in case many lines are present.
93        let newline_pos = buf
94            .iter()
95            .rev()
96            .position(|byte| *byte == Self::NEWLINE_BYTE)
97            .map(|pos_from_end| buf.len() - pos_from_end - 1);
98        // In case we'd exceed the buffer, just wrte everything, but append a newline to avoid
99        // interspersing.
100        let (final_byte_pos, append_newline) = match newline_pos {
101            // no newline, pushing all to buffer would exceed capacity
102            None if self.buffer.len() + buf.len() > Self::BUFFER_CAPACITY => (buf.len() - 1, true),
103            None => {
104                self.buffer.extend_from_slice(buf);
105                return Ok(buf.len());
106            }
107            // newline exists, but the rest of buf would exceed capacity.
108            Some(pos) if buf.len() - pos > Self::BUFFER_CAPACITY => (buf.len() - 1, true),
109            Some(pos) => (pos, false),
110        };
111
112        let mut inner = self.inner.lock();
113        let last_writer_id = inner.last_writer_id.replace(self.handle_id);
114
115        let mut bufs_to_write = vec![];
116        if let Some(prefix) = self.prefix.as_ref() {
117            if last_writer_id != Some(self.handle_id) {
118                bufs_to_write.push(prefix.as_slice());
119            }
120        }
121        if !self.buffer.is_empty() {
122            bufs_to_write.push(self.buffer.as_slice());
123        }
124        bufs_to_write.push(&buf[..final_byte_pos + 1]);
125        if append_newline {
126            bufs_to_write.push(&[Self::NEWLINE_BYTE]);
127        }
128
129        Self::write_bufs(&mut inner.writer, bufs_to_write.as_slice())?;
130
131        self.buffer.clear();
132        self.buffer.extend_from_slice(&buf[final_byte_pos + 1..]);
133        Ok(buf.len())
134    }
135
136    fn flush(&mut self) -> Result<(), Error> {
137        let mut inner = self.inner.lock();
138        let last_writer_id = inner.last_writer_id.replace(self.handle_id);
139        if !self.buffer.is_empty() {
140            self.buffer.push(Self::NEWLINE_BYTE);
141            let mut bufs_to_write = vec![];
142            if let Some(prefix) = self.prefix.as_ref() {
143                if last_writer_id != Some(self.handle_id) {
144                    bufs_to_write.push(prefix.as_slice());
145                }
146            }
147            bufs_to_write.push(self.buffer.as_slice());
148
149            Self::write_bufs(&mut inner.writer, bufs_to_write.as_slice())?;
150            self.buffer.clear();
151        }
152        inner.writer.flush()
153    }
154}
155
156impl<W: 'static + Write + Send + Sync> std::ops::Drop for ShellWriterHandle<W> {
157    fn drop(&mut self) {
158        let _ = self.flush();
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::ErrorKind;

    /// Shared writer state whose sink is an in-memory byte vector.
    type VecInner = Arc<Mutex<ShellWriterHandleInner<Vec<u8>>>>;

    /// Builds shared state around an empty `Vec<u8>` together with a view onto it.
    fn create_writer_inner_and_view() -> (VecInner, ShellWriterView<Vec<u8>>) {
        let shared = Arc::new(Mutex::new(ShellWriterHandleInner::new(Vec::new())));
        let view = ShellWriterView(Arc::clone(&shared));
        (shared, view)
    }

    /// Performs a single `write` call and asserts that it reports all of `bytes` as
    /// written.
    fn write_fully(handle: &mut impl Write, bytes: &[u8]) {
        assert_eq!(handle.write(bytes).unwrap(), bytes.len());
    }

    #[fuchsia::test]
    fn single_handle() {
        let (shared, view) = create_writer_inner_and_view();
        let mut handle = ShellWriterHandle::new_handle(shared, None);

        // Without a newline nothing may reach the sink yet.
        write_fully(&mut handle, b"hello world");
        assert!(view.lock().is_empty());

        write_fully(&mut handle, b"\n");
        assert_eq!(view.lock().as_slice(), b"hello world\n");

        // A flush pushes out the partial line and terminates it.
        write_fully(&mut handle, b"flushed output");
        handle.flush().unwrap();
        assert_eq!(view.lock().as_slice(), b"hello world\nflushed output\n");
    }

    #[fuchsia::test]
    fn single_handle_with_prefix() {
        let (shared, view) = create_writer_inner_and_view();
        let mut handle =
            ShellWriterHandle::new_handle(shared, Some(String::from("[prefix] ")));

        write_fully(&mut handle, b"hello world");
        assert!(view.lock().is_empty());

        write_fully(&mut handle, b"\n");
        assert_eq!(view.lock().as_slice(), b"[prefix] hello world\n");

        // The same handle wrote last, so the prefix is not repeated.
        write_fully(&mut handle, b"flushed output");
        handle.flush().unwrap();
        assert_eq!(view.lock().as_slice(), b"[prefix] hello world\nflushed output\n");
    }

    #[fuchsia::test]
    fn single_handle_multiple_line() {
        const UNTERMINATED: &[u8] = b"This is a \nmultiline output \nwithout newline termination";
        const TERMINATED: &[u8] = b"This is \nnewline terminated \noutput\n";

        let (shared, view) = create_writer_inner_and_view();
        let mut handle = ShellWriterHandle::new_handle(shared, None);

        // Only the complete lines are forwarded by the write itself.
        write_fully(&mut handle, UNTERMINATED);
        assert_eq!(view.lock().as_slice(), b"This is a \nmultiline output \n");

        handle.flush().unwrap();
        assert_eq!(
            view.lock().as_slice(),
            b"This is a \nmultiline output \nwithout newline termination\n"
        );
        view.lock().clear();

        // Newline-terminated input is forwarded in full.
        write_fully(&mut handle, TERMINATED);
        assert_eq!(view.lock().as_slice(), TERMINATED);
    }

    #[fuchsia::test]
    fn single_handle_exceed_buffer_in_single_write() {
        const CAPACITY: usize = ShellWriterHandle::<Vec<u8>>::BUFFER_CAPACITY;
        let oversized = "a".repeat(CAPACITY + 1);

        // Each case is a name plus a sequence of (input, sink contents expected after
        // writing that input) pairs.
        let cases: Vec<(&str, Vec<(String, String)>)> = vec![
            ("exceed in one write", vec![(oversized.clone(), format!("{}\n", oversized))]),
            (
                "exceed on second write",
                vec![
                    ("a".to_string(), String::new()),
                    (oversized.clone(), format!("{}\n", "a".repeat(CAPACITY + 2))),
                ],
            ),
            (
                "exceed in one write, with newline",
                vec![(format!("\n{}", oversized), format!("\n{}\n", oversized))],
            ),
            (
                "exceed in two writes, with newline",
                vec![
                    ("a".to_string(), String::new()),
                    (format!("\n{}", oversized), format!("a\n{}\n", oversized)),
                ],
            ),
        ];

        for (case_name, writes) in cases {
            let (shared, view) = create_writer_inner_and_view();
            let mut handle = ShellWriterHandle::new_handle(shared, None);
            for (write_no, (input, expected)) in writes.into_iter().enumerate() {
                let accepted = handle.write(input.as_bytes()).unwrap();
                assert_eq!(
                    accepted,
                    input.len(),
                    "Got wrong number of bytes written for write {:?} in case {}",
                    write_no,
                    case_name
                );
                let observed = String::from_utf8(view.lock().clone()).unwrap();
                assert_eq!(
                    observed, expected,
                    "Buffer contains unexpected contents after write {:?} in case {}",
                    write_no, case_name
                );
            }
        }
    }

    #[fuchsia::test]
    fn single_handle_with_prefix_multiple_line() {
        const UNTERMINATED: &[u8] = b"This is a \nmultiline output \nwithout newline termination";
        const TERMINATED: &[u8] = b"This is \nnewline terminated \noutput\n";

        let (shared, view) = create_writer_inner_and_view();
        let mut handle =
            ShellWriterHandle::new_handle(shared, Some(String::from("[prefix] ")));

        // Output is 'chunked' per write to avoid spamming the prefix, so only the first
        // line of the chunk carries it.
        let mut expected = b"[prefix] This is a \nmultiline output \n".to_vec();
        write_fully(&mut handle, UNTERMINATED);
        assert_eq!(view.lock().as_slice(), expected.as_slice());

        expected.extend_from_slice(b"without newline termination\n");
        handle.flush().unwrap();
        assert_eq!(view.lock().as_slice(), expected.as_slice());

        // Still the same writer, so no further prefix is expected.
        expected.extend_from_slice(TERMINATED);
        write_fully(&mut handle, TERMINATED);
        assert_eq!(view.lock().as_slice(), expected.as_slice());
    }

    #[fuchsia::test]
    fn multiple_handles() {
        let (shared, view) = create_writer_inner_and_view();
        let mut first =
            ShellWriterHandle::new_handle(Arc::clone(&shared), Some("[1] ".to_string()));
        let mut second = ShellWriterHandle::new_handle(shared, Some("[2] ".to_string()));

        // Partial lines from both handles stay in their own buffers.
        write!(first, "hi from 1").unwrap();
        write!(second, "hi from 2").unwrap();
        assert!(view.lock().is_empty());

        write!(first, "\n").unwrap();
        assert_eq!(view.lock().as_slice(), b"[1] hi from 1\n");

        write!(second, "\n").unwrap();
        assert_eq!(view.lock().as_slice(), b"[1] hi from 1\n[2] hi from 2\n");
    }

    // The tests below check the shell writer against inner writers that exhibit
    // edge cases permitted by the `Write` contract.

    #[fuchsia::test]
    fn output_written_when_inner_writer_writes_partial_buffer() {
        /// A writer that accepts at most 3 bytes per `write` call.
        struct PartialOutputWriter(Vec<u8>);
        impl Write for PartialOutputWriter {
            fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
                let accepted = buf.len().min(3);
                self.0.write(&buf[..accepted])
            }

            fn flush(&mut self) -> Result<(), Error> {
                self.0.flush()
            }
        }

        let sink = PartialOutputWriter(Vec::new());
        let shared = Arc::new(Mutex::new(ShellWriterHandleInner::new(sink)));
        let view = ShellWriterView(Arc::clone(&shared));
        let mut first =
            ShellWriterHandle::new_handle(Arc::clone(&shared), Some("[prefix] ".to_string()));

        write_fully(&mut first, b"hello");
        assert!(view.lock().0.is_empty());
        write_fully(&mut first, b"\n");
        assert_eq!(view.lock().0.as_slice(), b"[prefix] hello\n");

        let mut second = ShellWriterHandle::new_handle(shared, Some("[prefix2] ".to_string()));

        write_fully(&mut first, b"hello");
        write_fully(&mut second, b"world");
        assert_eq!(view.lock().0.as_slice(), b"[prefix] hello\n");
        write_fully(&mut first, b"\n");
        assert_eq!(view.lock().0.as_slice(), b"[prefix] hello\nhello\n");
        write_fully(&mut second, b"\n");
        assert_eq!(view.lock().0.as_slice(), b"[prefix] hello\nhello\n[prefix2] world\n");
    }

    #[fuchsia::test]
    fn output_written_when_inner_writer_returns_interrupted() {
        /// A writer whose very first `write` call fails with `Interrupted`.
        struct InterruptWriter {
            buf: Vec<u8>,
            returned_interrupt: bool,
        }
        impl Write for InterruptWriter {
            fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
                // Mark the interrupt as delivered and branch on whether it already was.
                let already_interrupted = std::mem::replace(&mut self.returned_interrupt, true);
                if already_interrupted {
                    self.buf.write(buf)
                } else {
                    Err(ErrorKind::Interrupted.into())
                }
            }

            fn flush(&mut self) -> Result<(), Error> {
                self.buf.flush()
            }
        }

        let sink = InterruptWriter { buf: Vec::new(), returned_interrupt: false };
        let shared = Arc::new(Mutex::new(ShellWriterHandleInner::new(sink)));
        let view = ShellWriterView(Arc::clone(&shared));
        let mut first =
            ShellWriterHandle::new_handle(Arc::clone(&shared), Some("[prefix] ".to_string()));

        write_fully(&mut first, b"hello");
        assert!(view.lock().buf.is_empty());
        write_fully(&mut first, b"\n");
        assert_eq!(view.lock().buf.as_slice(), b"[prefix] hello\n");

        let mut second =
            ShellWriterHandle::new_handle(Arc::clone(&shared), Some("[prefix2] ".to_string()));

        write_fully(&mut first, b"hello");
        write_fully(&mut second, b"world");
        assert_eq!(view.lock().buf.as_slice(), b"[prefix] hello\n");
        write_fully(&mut first, b"\n");
        assert_eq!(view.lock().buf.as_slice(), b"[prefix] hello\nhello\n");
        write_fully(&mut second, b"\n");
        assert_eq!(view.lock().buf.as_slice(), b"[prefix] hello\nhello\n[prefix2] world\n");
    }
}
411}