archivist_lib/logs/
serial.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::logs::repository::LogsRepository;
6use anyhow::Error;
7use diagnostics_data::{Data, Logs};
8use fidl_fuchsia_diagnostics::{Selector, StreamMode};
9use fuchsia_trace as ftrace;
10use futures::StreamExt;
11use log::warn;
12use selectors::FastError;
13use std::borrow::Cow;
14use std::collections::HashSet;
15use std::fmt::Display;
16use std::io::{self, Write};
17use std::sync::Arc;
18
19const MAX_SERIAL_WRITE_SIZE: usize = 256;
20
21#[derive(Default)]
22pub struct SerialConfig {
23    selectors: Vec<Selector>,
24    denied_tags: HashSet<String>,
25}
26
27impl SerialConfig {
28    /// Creates a new serial configuration from the given structured config values.
29    pub fn new<C, T>(allowed_components: Vec<C>, denied_tags: Vec<T>) -> Self
30    where
31        C: AsRef<str> + Display,
32        T: Into<String>,
33    {
34        let selectors = allowed_components
35            .into_iter()
36            .filter_map(|selector| {
37                match selectors::parse_component_selector::<FastError>(selector.as_ref()) {
38                    Ok(s) => Some(Selector {
39                        component_selector: Some(s),
40                        tree_selector: None,
41                        ..Selector::default()
42                    }),
43                    Err(err) => {
44                        warn!(selector:%, err:?; "Failed to parse component selector");
45                        None
46                    }
47                }
48            })
49            .collect();
50        Self { selectors, denied_tags: HashSet::from_iter(denied_tags.into_iter().map(Into::into)) }
51    }
52
53    /// Returns a future that resolves when there's no more logs to write to serial. This can only
54    /// happen when all log sink connections have been closed for the components that were
55    /// configured to emit logs.
56    pub async fn write_logs<S: Write>(self, repo: Arc<LogsRepository>, mut sink: S) {
57        let Self { denied_tags, selectors } = self;
58        let mut log_stream = repo.logs_cursor(
59            StreamMode::SnapshotThenSubscribe,
60            Some(selectors),
61            ftrace::Id::random(),
62        );
63        while let Some(log) = log_stream.next().await {
64            SerialWriter::log(log.as_ref(), &denied_tags, &mut sink).ok();
65        }
66    }
67}
68
69/// A sink to write to serial. This Write implementation must be used together with SerialWriter.
70#[derive(Default)]
71pub struct SerialSink;
72
73impl Write for SerialSink {
74    fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
75        if cfg!(debug_assertions) {
76            debug_assert!(buffer.len() <= MAX_SERIAL_WRITE_SIZE);
77        } else {
78            use std::sync::atomic::{AtomicBool, Ordering};
79            static ALREADY_LOGGED: AtomicBool = AtomicBool::new(false);
80            if buffer.len() > MAX_SERIAL_WRITE_SIZE && !ALREADY_LOGGED.swap(true, Ordering::Relaxed)
81            {
82                let size = buffer.len();
83                log::error!(
84                    size;
85                    "Skipping write to serial due to internal error. Exceeded max buffer size."
86                );
87                return Ok(buffer.len());
88            }
89        }
90        // SAFETY: calling a syscall. We pass a pointer to the buffer and its exact size.
91        unsafe {
92            zx::sys::zx_debug_write(buffer.as_ptr(), buffer.len());
93        }
94        Ok(buffer.len())
95    }
96
97    fn flush(&mut self) -> io::Result<()> {
98        Ok(())
99    }
100}
101
102struct SerialWriter<'a, S> {
103    buffer: Vec<u8>,
104    denied_tags: &'a HashSet<String>,
105    sink: &'a mut S,
106}
107
108impl<S: Write> Write for SerialWriter<'_, S> {
109    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
110        // -1 since we always write a `\n` when flushing.
111        let count = (self.buffer.capacity() - self.buffer.len() - 1).min(data.len());
112        let actual_count = self.buffer.write(&data[..count])?;
113        debug_assert_eq!(actual_count, count);
114        if self.buffer.len() == self.buffer.capacity() - 1 {
115            self.flush()?;
116        }
117        Ok(actual_count)
118    }
119
120    fn flush(&mut self) -> io::Result<()> {
121        debug_assert!(self.buffer.len() < MAX_SERIAL_WRITE_SIZE);
122        let wrote = self.buffer.write(b"\n")?;
123        debug_assert_eq!(wrote, 1);
124        self.sink.write_all(self.buffer.as_slice())?;
125        self.buffer.clear();
126        Ok(())
127    }
128}
129
130impl<'a, S: Write> SerialWriter<'a, S> {
131    fn log(
132        log: &Data<Logs>,
133        denied_tags: &'a HashSet<String>,
134        sink: &'a mut S,
135    ) -> Result<(), Error> {
136        let mut this =
137            Self { buffer: Vec::with_capacity(MAX_SERIAL_WRITE_SIZE), sink, denied_tags };
138        write!(
139            &mut this,
140            "[{:05}.{:03}] {:05}:{:05}> [",
141            zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_seconds(),
142            zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_millis()
143                % 1000,
144            log.pid().unwrap_or(0),
145            log.tid().unwrap_or(0)
146        )?;
147
148        let empty_tags = log.tags().map(|tags| tags.is_empty()).unwrap_or(true);
149        if empty_tags {
150            write!(&mut this, "{}", log.component_name())?;
151        } else {
152            // Unwrap is safe, if we are here it means that we actually have tags.
153            let tags = log.tags().unwrap();
154            for (i, tag) in tags.iter().enumerate() {
155                if this.denied_tags.contains(tag) {
156                    return Ok(());
157                }
158                write!(&mut this, "{}", tag)?;
159                if i < tags.len() - 1 {
160                    write!(&mut this, ", ")?;
161                }
162            }
163        }
164
165        write!(&mut this, "] {}: ", log.severity())?;
166        let mut pending_message_parts = [Cow::Borrowed(log.msg().unwrap_or(""))]
167            .into_iter()
168            .chain(log.payload_keys_strings().map(|s| Cow::Owned(format!(" {}", s))));
169        let mut pending_str = None;
170
171        loop {
172            let (data, offset) = match pending_str.take() {
173                Some((s, offset)) => (s, offset),
174                None => match pending_message_parts.next() {
175                    Some(s) => (s, 0),
176                    None => break,
177                },
178            };
179            let count = this.write(&data.as_bytes()[offset..])?;
180            if offset + count < data.len() {
181                pending_str = Some((data, offset + count));
182            }
183        }
184        if !this.buffer.is_empty() {
185            this.flush()?;
186        }
187        Ok(())
188    }
189}
190
191#[cfg(test)]
192mod tests {
193    use super::*;
194    use crate::identity::ComponentIdentity;
195    use crate::logs::testing::make_message;
196    use diagnostics_data::{BuilderArgs, LogsDataBuilder, LogsField, LogsProperty, Severity};
197    use fuchsia_async as fasync;
198    use futures::channel::mpsc;
199    use moniker::ExtendedMoniker;
200    use zx::BootInstant;
201
202    struct TestSink {
203        snd: mpsc::UnboundedSender<String>,
204    }
205
206    impl TestSink {
207        fn new() -> (Self, mpsc::UnboundedReceiver<String>) {
208            let (snd, rcv) = mpsc::unbounded();
209            (Self { snd }, rcv)
210        }
211    }
212
213    impl Write for TestSink {
214        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
215            let string = String::from_utf8(buf.to_vec()).expect("wrote valid utf8");
216            self.snd.unbounded_send(string).expect("sent item");
217            Ok(buf.len())
218        }
219
220        fn flush(&mut self) -> io::Result<()> {
221            Ok(())
222        }
223    }
224
225    #[fuchsia::test]
226    fn write_to_serial_handles_denied_tags() {
227        let log = LogsDataBuilder::new(BuilderArgs {
228            timestamp: BootInstant::from_nanos(1),
229            component_url: Some("url".into()),
230            moniker: "core/foo".try_into().unwrap(),
231            severity: Severity::Info,
232        })
233        .add_tag("denied-tag")
234        .build();
235        let denied_tags = HashSet::from_iter(["denied-tag".to_string()]);
236        let mut sink = Vec::new();
237        SerialWriter::log(&log, &denied_tags, &mut sink).expect("write succeeded");
238        assert!(sink.is_empty());
239    }
240
241    #[fuchsia::test]
242    fn write_to_serial_splits_lines() {
243        let message = concat!(
244            "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam accumsan eu neque ",
245            "quis molestie. Nam rhoncus sapien non eleifend tristique. Duis quis turpis volutpat ",
246            "neque bibendum molestie. Etiam ac sapien justo. Nullam aliquet ipsum nec tincidunt."
247        );
248        let log = LogsDataBuilder::new(BuilderArgs {
249            timestamp: BootInstant::from_nanos(123456789),
250            component_url: Some("url".into()),
251            moniker: "core/foo".try_into().unwrap(),
252            severity: Severity::Info,
253        })
254        .add_tag("bar")
255        .set_message(message)
256        .add_key(LogsProperty::String(LogsField::Other("key".to_string()), "value".to_string()))
257        .add_key(LogsProperty::Int(LogsField::Other("other_key".to_string()), 3))
258        .set_pid(1234)
259        .set_tid(5678)
260        .build();
261        let mut sink = Vec::new();
262        SerialWriter::log(&log, &HashSet::new(), &mut sink).expect("write succeeded");
263        assert_eq!(
264            String::from_utf8(sink).unwrap(),
265            format!(
266                "[00000.123] 01234:05678> [bar] INFO: {}\n{} key=value other_key=3\n",
267                &message[..218],
268                &message[218..]
269            )
270        );
271    }
272
273    #[fuchsia::test]
274    fn when_no_tags_are_present_the_component_name_is_used() {
275        let log = LogsDataBuilder::new(BuilderArgs {
276            timestamp: BootInstant::from_nanos(123456789),
277            component_url: Some("url".into()),
278            moniker: "core/foo".try_into().unwrap(),
279            severity: Severity::Info,
280        })
281        .set_message("my msg")
282        .set_pid(1234)
283        .set_tid(5678)
284        .build();
285        let mut sink = Vec::new();
286        SerialWriter::log(&log, &HashSet::new(), &mut sink).expect("write succeeded");
287        assert_eq!(
288            String::from_utf8(sink).unwrap(),
289            "[00000.123] 01234:05678> [foo] INFO: my msg\n"
290        );
291    }
292
293    #[fuchsia::test]
294    async fn writes_ingested_logs() {
295        let serial_config = SerialConfig::new(vec!["bootstrap/**", "/core/foo"], vec!["foo"]);
296        let repo = LogsRepository::for_test(fasync::Scope::new());
297
298        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
299            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
300            "fuchsia-pkg://bootstrap-foo",
301        )));
302        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
303            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
304            "fuchsia-pkg://bootstrap-bar",
305        )));
306
307        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
308            ExtendedMoniker::parse_str("./core/foo").unwrap(),
309            "fuchsia-pkg://core-foo",
310        )));
311        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
312            ExtendedMoniker::parse_str("./core/baz").unwrap(),
313            "fuchsia-pkg://core-baz",
314        )));
315
316        bootstrap_foo_container.ingest_message(make_message(
317            "a",
318            None,
319            zx::BootInstant::from_nanos(1),
320        ));
321        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
322        let (sink, rcv) = TestSink::new();
323        let _serial_task = fasync::Task::spawn(serial_config.write_logs(Arc::clone(&repo), sink));
324        bootstrap_bar_container.ingest_message(make_message(
325            "b",
326            Some("foo"),
327            zx::BootInstant::from_nanos(3),
328        ));
329        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));
330
331        let received = rcv.take(2).collect::<Vec<_>>().await;
332
333        // We must see the logs emitted before we installed the serial listener and after. We must
334        // not see the log from /core/baz and we must not see the log from bootstrap/bar with tag
335        // "foo".
336        assert_eq!(
337            received,
338            vec![
339                "[00000.000] 00001:00002> [foo] DEBUG: a\n",
340                "[00000.000] 00001:00002> [foo] DEBUG: c\n"
341            ]
342        );
343    }
344}