archivist_lib/logs/
serial.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::logs::repository::LogsRepository;
6use crate::logs::servers::LogFreezeServer;
7use anyhow::Error;
8use diagnostics_data::{Data, Logs};
9use fidl_fuchsia_diagnostics::{Selector, StreamMode};
10use fidl_fuchsia_diagnostics_system::SerialLogControlRequestStream;
11use fuchsia_async::OnSignals;
12use fuchsia_trace as ftrace;
13use futures::channel::mpsc::UnboundedReceiver;
14use futures::future::{select, Either};
15use futures::{FutureExt, StreamExt};
16use log::warn;
17use selectors::FastError;
18use std::borrow::Cow;
19use std::collections::HashSet;
20use std::fmt::Display;
21use std::io::{self, Write};
22use std::pin::pin;
23use std::sync::Arc;
24use zx::Signals;
25
26const MAX_SERIAL_WRITE_SIZE: usize = 256;
27
28/// Function that forwards logs from Archivist
29/// to the serial port. Logs will be filtered by allow_serial_log_tags
30/// to include logs in the serial output, and deny_serial_log_tags to exclude specific tags.
31pub async fn launch_serial(
32    allow_serial_log_tags: Vec<String>,
33    deny_serial_log_tags: Vec<String>,
34    logs_repo: Arc<LogsRepository>,
35    writer: impl Write,
36    mut freeze_receiver: UnboundedReceiver<SerialLogControlRequestStream>,
37) {
38    let mut write_logs_to_serial =
39        pin!(SerialConfig::new(allow_serial_log_tags, deny_serial_log_tags)
40            .write_logs(logs_repo, writer)
41            .fuse());
42    loop {
43        let log_freezer_future = pin!(async {
44            // Wait for FDIO to give us the channel
45            let stream = (freeze_receiver.next().await)?;
46            let (client, server) = zx::EventPair::create();
47            // Acquire the lock, and send the token.
48            LogFreezeServer::new(client).wait_for_client_freeze_request(stream).await;
49            Some(server)
50        }
51        .fuse());
52        let maybe_frozen_token = select(&mut write_logs_to_serial, log_freezer_future).await;
53        if let Either::Right((Some(token), _)) = maybe_frozen_token {
54            // Lock acquired, wait for it to be released before doing anything else.
55            // Ignore any errors, as we may either get PEER_CLOSED as an error or signal.
56            let _ = OnSignals::new(&token, Signals::EVENTPAIR_PEER_CLOSED).await;
57        } else {
58            // Serial writer exited, no work left to do.
59            break;
60        }
61    }
62}
63
64#[derive(Default)]
65pub struct SerialConfig {
66    selectors: Vec<Selector>,
67    denied_tags: HashSet<String>,
68}
69
70impl SerialConfig {
71    /// Creates a new serial configuration from the given structured config values.
72    pub fn new<C, T>(allowed_components: Vec<C>, denied_tags: Vec<T>) -> Self
73    where
74        C: AsRef<str> + Display,
75        T: Into<String>,
76    {
77        let selectors = allowed_components
78            .into_iter()
79            .filter_map(|selector| {
80                match selectors::parse_component_selector::<FastError>(selector.as_ref()) {
81                    Ok(s) => Some(Selector {
82                        component_selector: Some(s),
83                        tree_selector: None,
84                        ..Selector::default()
85                    }),
86                    Err(err) => {
87                        warn!(selector:%, err:?; "Failed to parse component selector");
88                        None
89                    }
90                }
91            })
92            .collect();
93        Self { selectors, denied_tags: HashSet::from_iter(denied_tags.into_iter().map(Into::into)) }
94    }
95
96    /// Returns a future that resolves when there's no more logs to write to serial. This can only
97    /// happen when all log sink connections have been closed for the components that were
98    /// configured to emit logs.
99    pub async fn write_logs<S: Write>(self, repo: Arc<LogsRepository>, mut sink: S) {
100        let Self { denied_tags, selectors } = self;
101        let mut log_stream = repo.logs_cursor(
102            StreamMode::SnapshotThenSubscribe,
103            Some(selectors),
104            ftrace::Id::random(),
105        );
106        while let Some(log) = log_stream.next().await {
107            SerialWriter::log(log.as_ref(), &denied_tags, &mut sink).ok();
108        }
109    }
110}
111
112/// A sink to write to serial. This Write implementation must be used together with SerialWriter.
113#[derive(Default)]
114pub struct SerialSink;
115
116impl Write for SerialSink {
117    fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
118        if cfg!(debug_assertions) {
119            debug_assert!(buffer.len() <= MAX_SERIAL_WRITE_SIZE);
120        } else {
121            use std::sync::atomic::{AtomicBool, Ordering};
122            static ALREADY_LOGGED: AtomicBool = AtomicBool::new(false);
123            if buffer.len() > MAX_SERIAL_WRITE_SIZE && !ALREADY_LOGGED.swap(true, Ordering::Relaxed)
124            {
125                let size = buffer.len();
126                log::error!(
127                    size;
128                    "Skipping write to serial due to internal error. Exceeded max buffer size."
129                );
130                return Ok(buffer.len());
131            }
132        }
133        // SAFETY: calling a syscall. We pass a pointer to the buffer and its exact size.
134        unsafe {
135            zx::sys::zx_debug_write(buffer.as_ptr(), buffer.len());
136        }
137        Ok(buffer.len())
138    }
139
140    fn flush(&mut self) -> io::Result<()> {
141        Ok(())
142    }
143}
144
145struct SerialWriter<'a, S> {
146    buffer: Vec<u8>,
147    denied_tags: &'a HashSet<String>,
148    sink: &'a mut S,
149}
150
151impl<S: Write> Write for SerialWriter<'_, S> {
152    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
153        // -1 since we always write a `\n` when flushing.
154        let count = (self.buffer.capacity() - self.buffer.len() - 1).min(data.len());
155        let actual_count = self.buffer.write(&data[..count])?;
156        debug_assert_eq!(actual_count, count);
157        if self.buffer.len() == self.buffer.capacity() - 1 {
158            self.flush()?;
159        }
160        Ok(actual_count)
161    }
162
163    fn flush(&mut self) -> io::Result<()> {
164        debug_assert!(self.buffer.len() < MAX_SERIAL_WRITE_SIZE);
165        let wrote = self.buffer.write(b"\n")?;
166        debug_assert_eq!(wrote, 1);
167        self.sink.write_all(self.buffer.as_slice())?;
168        self.buffer.clear();
169        Ok(())
170    }
171}
172
173impl<'a, S: Write> SerialWriter<'a, S> {
174    fn log(
175        log: &Data<Logs>,
176        denied_tags: &'a HashSet<String>,
177        sink: &'a mut S,
178    ) -> Result<(), Error> {
179        let mut this =
180            Self { buffer: Vec::with_capacity(MAX_SERIAL_WRITE_SIZE), sink, denied_tags };
181        write!(
182            &mut this,
183            "[{:05}.{:03}] {:05}:{:05}> [",
184            zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_seconds(),
185            zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_millis()
186                % 1000,
187            log.pid().unwrap_or(0),
188            log.tid().unwrap_or(0)
189        )?;
190
191        let empty_tags = log.tags().map(|tags| tags.is_empty()).unwrap_or(true);
192        if empty_tags {
193            write!(&mut this, "{}", log.component_name())?;
194        } else {
195            // Unwrap is safe, if we are here it means that we actually have tags.
196            let tags = log.tags().unwrap();
197            for (i, tag) in tags.iter().enumerate() {
198                if this.denied_tags.contains(tag) {
199                    return Ok(());
200                }
201                write!(&mut this, "{tag}")?;
202                if i < tags.len() - 1 {
203                    write!(&mut this, ", ")?;
204                }
205            }
206        }
207
208        write!(&mut this, "] {}: ", log.severity())?;
209        let mut pending_message_parts = [Cow::Borrowed(log.msg().unwrap_or(""))]
210            .into_iter()
211            .chain(log.payload_keys_strings().map(|s| Cow::Owned(format!(" {s}"))));
212        let mut pending_str = None;
213
214        loop {
215            let (data, offset) = match pending_str.take() {
216                Some((s, offset)) => (s, offset),
217                None => match pending_message_parts.next() {
218                    Some(s) => (s, 0),
219                    None => break,
220                },
221            };
222            let count = this.write(&data.as_bytes()[offset..])?;
223            if offset + count < data.len() {
224                pending_str = Some((data, offset + count));
225            }
226        }
227        if !this.buffer.is_empty() {
228            this.flush()?;
229        }
230        Ok(())
231    }
232}
233
234#[cfg(test)]
235mod tests {
236    use fidl::endpoints::create_proxy_and_stream;
237    use fidl_fuchsia_diagnostics_system::SerialLogControlMarker;
238    use fuchsia_async::{self as fasync};
239    use futures::channel::mpsc::{self, unbounded};
240    use futures::SinkExt;
241    use std::future::{poll_fn, Future};
242    use std::task::Poll;
243
244    use super::*;
245    use crate::identity::ComponentIdentity;
246    use crate::logs::testing::make_message;
247    use diagnostics_data::{BuilderArgs, LogsDataBuilder, LogsField, LogsProperty, Severity};
248    use futures::FutureExt;
249    use moniker::ExtendedMoniker;
250    use std::pin::pin;
251    use zx::BootInstant;
252
253    struct TestSink {
254        snd: mpsc::UnboundedSender<String>,
255    }
256
257    impl TestSink {
258        fn new() -> (Self, mpsc::UnboundedReceiver<String>) {
259            let (snd, rcv) = mpsc::unbounded();
260            (Self { snd }, rcv)
261        }
262    }
263
264    impl Write for TestSink {
265        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
266            let string = String::from_utf8(buf.to_vec()).expect("wrote valid utf8");
267            self.snd.unbounded_send(string).expect("sent item");
268            Ok(buf.len())
269        }
270
271        fn flush(&mut self) -> io::Result<()> {
272            Ok(())
273        }
274    }
275
276    #[fuchsia::test]
277    fn write_to_serial_handles_denied_tags() {
278        let log = LogsDataBuilder::new(BuilderArgs {
279            timestamp: BootInstant::from_nanos(1),
280            component_url: Some("url".into()),
281            moniker: "core/foo".try_into().unwrap(),
282            severity: Severity::Info,
283        })
284        .add_tag("denied-tag")
285        .build();
286        let denied_tags = HashSet::from_iter(["denied-tag".to_string()]);
287        let mut sink = Vec::new();
288        SerialWriter::log(&log, &denied_tags, &mut sink).expect("write succeeded");
289        assert!(sink.is_empty());
290    }
291
292    #[fuchsia::test]
293    fn write_to_serial_splits_lines() {
294        let message = concat!(
295            "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam accumsan eu neque ",
296            "quis molestie. Nam rhoncus sapien non eleifend tristique. Duis quis turpis volutpat ",
297            "neque bibendum molestie. Etiam ac sapien justo. Nullam aliquet ipsum nec tincidunt."
298        );
299        let log = LogsDataBuilder::new(BuilderArgs {
300            timestamp: BootInstant::from_nanos(123456789),
301            component_url: Some("url".into()),
302            moniker: "core/foo".try_into().unwrap(),
303            severity: Severity::Info,
304        })
305        .add_tag("bar")
306        .set_message(message)
307        .add_key(LogsProperty::String(LogsField::Other("key".to_string()), "value".to_string()))
308        .add_key(LogsProperty::Int(LogsField::Other("other_key".to_string()), 3))
309        .set_pid(1234)
310        .set_tid(5678)
311        .build();
312        let mut sink = Vec::new();
313        SerialWriter::log(&log, &HashSet::new(), &mut sink).expect("write succeeded");
314        assert_eq!(
315            String::from_utf8(sink).unwrap(),
316            format!(
317                "[00000.123] 01234:05678> [bar] INFO: {}\n{} key=value other_key=3\n",
318                &message[..218],
319                &message[218..]
320            )
321        );
322    }
323
324    #[fuchsia::test]
325    fn when_no_tags_are_present_the_component_name_is_used() {
326        let log = LogsDataBuilder::new(BuilderArgs {
327            timestamp: BootInstant::from_nanos(123456789),
328            component_url: Some("url".into()),
329            moniker: "core/foo".try_into().unwrap(),
330            severity: Severity::Info,
331        })
332        .set_message("my msg")
333        .set_pid(1234)
334        .set_tid(5678)
335        .build();
336        let mut sink = Vec::new();
337        SerialWriter::log(&log, &HashSet::new(), &mut sink).expect("write succeeded");
338        assert_eq!(
339            String::from_utf8(sink).unwrap(),
340            "[00000.123] 01234:05678> [foo] INFO: my msg\n"
341        );
342    }
343
344    async fn poll_once<F: Future + Unpin>(mut future: F) {
345        poll_fn(|context| {
346            let _ = future.poll_unpin(context);
347            Poll::Ready(())
348        })
349        .await;
350    }
351
352    #[fuchsia::test]
353    async fn pauses_logs_correctly() {
354        let repo = LogsRepository::for_test(fasync::Scope::new());
355
356        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
357            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
358            "fuchsia-pkg://bootstrap-foo",
359        )));
360        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
361            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
362            "fuchsia-pkg://bootstrap-bar",
363        )));
364
365        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
366            ExtendedMoniker::parse_str("./core/foo").unwrap(),
367            "fuchsia-pkg://core-foo",
368        )));
369        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
370            ExtendedMoniker::parse_str("./core/baz").unwrap(),
371            "fuchsia-pkg://core-baz",
372        )));
373
374        bootstrap_foo_container.ingest_message(make_message(
375            "a",
376            None,
377            zx::BootInstant::from_nanos(1),
378        ));
379
380        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
381        let (sink, mut rcv) = TestSink::new();
382        let cloned_repo = Arc::clone(&repo);
383        let (mut sender, receiver) = unbounded();
384        let mut serial_task = pin!(async move {
385            let allowed = vec!["bootstrap/**".into(), "/core/foo".into()];
386            let denied = vec!["foo".into()];
387            launch_serial(allowed, denied, cloned_repo, sink, receiver).await;
388        }
389        .fuse());
390        bootstrap_bar_container.ingest_message(make_message(
391            "b",
392            Some("foo"),
393            zx::BootInstant::from_nanos(3),
394        ));
395
396        poll_once(&mut serial_task).await;
397        let received = rcv.next().now_or_never().unwrap().unwrap();
398
399        assert_eq!(received, "[00000.000] 00001:00002> [foo] DEBUG: a\n");
400
401        let (client, server) = create_proxy_and_stream::<SerialLogControlMarker>();
402        sender.send(server).await.unwrap();
403        let freeze_token = futures::select! {
404            _ = serial_task => None,
405            token = client.freeze_serial_forwarding().fuse() => Some(token),
406        }
407        .unwrap();
408        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));
409        let received_future = rcv.next();
410        poll_once(&mut serial_task).await;
411
412        assert!(received_future.now_or_never().is_none());
413        drop(freeze_token);
414        poll_once(&mut serial_task).await;
415        let received = rcv.next().now_or_never().unwrap().unwrap();
416
417        assert_eq!(received, "[00000.000] 00001:00002> [foo] DEBUG: c\n");
418    }
419
420    #[fuchsia::test]
421    async fn writes_ingested_logs() {
422        let serial_config = SerialConfig::new(vec!["bootstrap/**", "/core/foo"], vec!["foo"]);
423        let repo = LogsRepository::for_test(fasync::Scope::new());
424
425        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
426            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
427            "fuchsia-pkg://bootstrap-foo",
428        )));
429        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
430            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
431            "fuchsia-pkg://bootstrap-bar",
432        )));
433
434        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
435            ExtendedMoniker::parse_str("./core/foo").unwrap(),
436            "fuchsia-pkg://core-foo",
437        )));
438        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
439            ExtendedMoniker::parse_str("./core/baz").unwrap(),
440            "fuchsia-pkg://core-baz",
441        )));
442
443        bootstrap_foo_container.ingest_message(make_message(
444            "a",
445            None,
446            zx::BootInstant::from_nanos(1),
447        ));
448        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
449        let (sink, rcv) = TestSink::new();
450        let mut serial_task = pin!(serial_config.write_logs(Arc::clone(&repo), sink));
451        bootstrap_bar_container.ingest_message(make_message(
452            "b",
453            Some("foo"),
454            zx::BootInstant::from_nanos(3),
455        ));
456        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));
457        poll_fn(|context| loop {
458            if Poll::Pending == serial_task.poll_unpin(context) {
459                return Poll::Ready(());
460            }
461        })
462        .await;
463        let received = rcv.take(2).collect::<Vec<_>>().now_or_never().unwrap();
464
465        // We must see the logs emitted before we installed the serial listener and after. We must
466        // not see the log from /core/baz and we must not see the log from bootstrap/bar with tag
467        // "foo".
468        assert_eq!(
469            received,
470            vec![
471                "[00000.000] 00001:00002> [foo] DEBUG: a\n",
472                "[00000.000] 00001:00002> [foo] DEBUG: c\n"
473            ]
474        );
475    }
476}