archivist_lib/logs/
serial.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::logs::repository::LogsRepository;
6use diagnostics_data::{Data, Logs};
7use fidl_fuchsia_diagnostics::{Selector, StreamMode};
8use fuchsia_async::OnSignals;
9use fuchsia_trace as ftrace;
10use futures::channel::mpsc::UnboundedReceiver;
11use futures::channel::{mpsc, oneshot};
12use futures::executor::block_on;
13use futures::{FutureExt, StreamExt, select};
14use log::warn;
15use selectors::FastError;
16use std::collections::HashSet;
17use std::io::{self, Write};
18use std::sync::Arc;
19use std::{mem, thread};
20use zx::Signals;
21
/// Maximum number of bytes handed to the serial sink in a single write. This includes the
/// trailing newline that is appended when a buffer is flushed.
pub const MAX_SERIAL_WRITE_SIZE: usize = 256;
23
24/// Function that forwards logs from Archivist to the serial port. Logs will be filtered by
25/// `allow_serial_log_tags` to include logs in the serial output, and `deny_serial_log_tags` to
26/// exclude specific tags.
27pub async fn launch_serial(
28    allow_serial_log_tags: impl IntoIterator<Item = impl AsRef<str>>,
29    deny_serial_log_tags: impl IntoIterator<Item = impl ToString>,
30    logs_repo: Arc<LogsRepository>,
31    sink: impl Write + Send + 'static,
32    mut freeze_receiver: mpsc::UnboundedReceiver<oneshot::Sender<zx::EventPair>>,
33    mut flush_receiver: UnboundedReceiver<oneshot::Sender<()>>,
34) {
35    let mut writer =
36        SerialWriter::new(sink, deny_serial_log_tags.into_iter().map(|s| s.to_string()).collect());
37
38    let mut barrier = writer.get_barrier();
39    let mut log_stream = logs_repo
40        .logs_cursor(
41            StreamMode::SnapshotThenSubscribe,
42            Some(selectors_from_tags(allow_serial_log_tags)),
43            ftrace::Id::random(),
44        )
45        .fuse();
46    loop {
47        select! {
48            log = log_stream.next() => {
49                if let Some(log) = log {
50                    // We've received a log
51                    writer.log(&log).await;
52                } else {
53                    // We've hit the end of the log stream and Archivist is shutting down.
54                    break;
55                }
56            }
57            freeze_request = freeze_receiver.next() => {
58                if let Some(request) = freeze_request {
59                    // We must use the barrier before we send back the event.
60                    barrier.wait().await;
61                    let (client, server) = zx::EventPair::create();
62                    let _ = request.send(client);
63                    let _ = OnSignals::new(&server, Signals::EVENTPAIR_PEER_CLOSED).await;
64                }
65            }
66            flush_request = flush_receiver.next() => {
67                if let Some(flush_request) = flush_request {
68                    // We have a background thread that polls sockets.  Ensure the background thread
69                    // is polled first before we write the remaining logs to serial to ensure we
70                    // capture all logs.
71                    logs_repo.flush().await;
72
73                    // Write all pending logs to serial
74                    while let Some(Some(log)) = log_stream.next().now_or_never() {
75                        writer.log(&log).await;
76                    }
77
78                    // Flush the serial thread (ensuring everything goes to serial)
79                    writer.get_barrier().wait().await;
80
81                    // Reply to the flush request and continue normal logging operations.  Ignore
82                    // the error, because the channel may have been closed which is OK.
83                    let _ = flush_request.send(());
84                }
85            }
86        }
87    }
88    // Ensure logs are flushed before we finish.
89    writer.get_barrier().wait().await;
90}
91
92fn selectors_from_tags(tags: impl IntoIterator<Item = impl AsRef<str>>) -> Vec<Selector> {
93    tags.into_iter()
94        .filter_map(|selector| {
95            let selector = selector.as_ref();
96            match selectors::parse_component_selector::<FastError>(selector) {
97                Ok(s) => Some(Selector {
98                    component_selector: Some(s),
99                    tree_selector: None,
100                    ..Selector::default()
101                }),
102                Err(err) => {
103                    warn!(selector:%, err:?; "Failed to parse component selector");
104                    None
105                }
106            }
107        })
108        .collect()
109}
110
111/// A sink to write to serial. This Write implementation must be used together with SerialWriter.
112#[derive(Default)]
113pub struct SerialSink;
114
115impl Write for SerialSink {
116    fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
117        if cfg!(debug_assertions) {
118            debug_assert!(buffer.len() <= MAX_SERIAL_WRITE_SIZE);
119        } else {
120            use std::sync::atomic::{AtomicBool, Ordering};
121            static ALREADY_LOGGED: AtomicBool = AtomicBool::new(false);
122            if buffer.len() > MAX_SERIAL_WRITE_SIZE && !ALREADY_LOGGED.swap(true, Ordering::Relaxed)
123            {
124                let size = buffer.len();
125                log::error!(
126                    size;
127                    "Skipping write to serial due to internal error. Exceeded max buffer size."
128                );
129                return Ok(buffer.len());
130            }
131        }
132        // SAFETY: calling a syscall. We pass a pointer to the buffer and its exact size.
133        unsafe {
134            zx::sys::zx_debug_write(buffer.as_ptr(), buffer.len());
135        }
136        Ok(buffer.len())
137    }
138
139    fn flush(&mut self) -> io::Result<()> {
140        Ok(())
141    }
142}
143
/// An enum to represent commands sent to the worker thread.
///
/// The worker thread handles commands one at a time, in the order they were sent.
enum WorkerCommand {
    /// Write the given bytes. The worker hands the buffer back for reuse once it has been
    /// written.
    Write(Vec<u8>),
    /// Flush all pending writes and notify when done. Because commands are handled in order,
    /// every `Write` sent before this command has been processed when the notification fires.
    Flush(oneshot::Sender<()>),
}
151
/// A writer that provides an async interface to a synchronous, blocking write operation.
///
/// It spawns a dedicated thread to handle the synchronous writes, allowing the async
/// context to remain non-blocking.
struct SerialWriter {
    /// Logs whose tags or component name match an entry of this set are not written.
    denied_tags: HashSet<String>,
    /// Channel used to send commands to the worker thread.
    sender: mpsc::UnboundedSender<WorkerCommand>,
    /// Buffers that are free for reuse; the worker thread returns each buffer here after
    /// writing it.
    buffers: mpsc::UnboundedReceiver<Vec<u8>>,
    /// The buffer currently being filled. After a flush this is left with zero capacity until a
    /// free buffer is taken from `buffers`.
    current_buffer: Vec<u8>,
}
162
163impl SerialWriter {
164    /// Creates a new Writer and spawns a worker thread.
165    fn new<S: Write + Send + 'static>(mut sink: S, denied_tags: HashSet<String>) -> Self {
166        let (sender, mut receiver) = mpsc::unbounded();
167        let (buffer_sender, buffers) = mpsc::unbounded();
168
169        // Limit to 32 buffers (one comes from `current_buffer`).
170        for _ in 0..31 {
171            buffer_sender.unbounded_send(Vec::with_capacity(MAX_SERIAL_WRITE_SIZE)).unwrap();
172        }
173
174        thread::spawn(move || {
175            block_on(async {
176                while let Some(command) = receiver.next().await {
177                    match command {
178                        WorkerCommand::Write(bytes) => {
179                            // Ignore errors.
180                            let _ = sink.write(&bytes);
181                            // Send the buffer back.
182                            let _ = buffer_sender.unbounded_send(bytes);
183                        }
184                        WorkerCommand::Flush(notifier) => {
185                            let _ = notifier.send(());
186                        }
187                    }
188                }
189            });
190        });
191
192        Self {
193            denied_tags,
194            sender,
195            buffers,
196            current_buffer: Vec::with_capacity(MAX_SERIAL_WRITE_SIZE),
197        }
198    }
199
200    /// Asynchronously writes bytes.
201    async fn write(&mut self, mut bytes: &[u8]) {
202        while !bytes.is_empty() {
203            if self.current_buffer.capacity() == 0 {
204                self.current_buffer = self.buffers.next().await.unwrap();
205                self.current_buffer.clear();
206            }
207            let (part, rem) = bytes.split_at(std::cmp::min(self.space(), bytes.len()));
208            self.current_buffer.extend(part);
209            if !rem.is_empty() {
210                self.flush();
211            }
212            bytes = rem;
213        }
214    }
215
216    /// Return a synchronous writer with the required capacity.
217    ///
218    /// NOTE: Using `io_writer` will mean that line breaks don't occur in the middle: they will
219    /// always be at the beginning of whatever is being output, which is different from what happens
220    /// with `write`.
221    async fn io_writer(&mut self, required: usize) -> IoWriter<'_> {
222        assert!(required < MAX_SERIAL_WRITE_SIZE);
223        if self.current_buffer.capacity() == 0 || self.space() < required {
224            self.flush();
225            self.current_buffer = self.buffers.next().await.unwrap();
226            self.current_buffer.clear();
227        }
228        IoWriter(self)
229    }
230
231    /// Flush the buffer.
232    fn flush(&mut self) {
233        if !self.current_buffer.is_empty() {
234            self.current_buffer.push(b'\n');
235            self.sender
236                .unbounded_send(WorkerCommand::Write(mem::take(&mut self.current_buffer)))
237                .unwrap();
238        }
239    }
240
241    /// Returns a barrier.
242    fn get_barrier(&self) -> Barrier {
243        Barrier(self.sender.clone())
244    }
245
246    /// Returns the amount of space in the current buffer.
247    fn space(&self) -> usize {
248        // Always leave room for the \n.
249        MAX_SERIAL_WRITE_SIZE - 1 - self.current_buffer.len()
250    }
251
252    /// Writes a log record.
253    async fn log(&mut self, log: &Data<Logs>) {
254        // Most of the time, this is empty.
255        if !self.denied_tags.is_empty() {
256            if let Some(tags) = log.tags() {
257                if tags.iter().any(|tag| self.denied_tags.contains(tag)) {
258                    return;
259                }
260            }
261
262            // Consider `component_name` as a tag. A log viewer will be presented
263            // the component name just as any tag.
264            let component_name = log.component_name();
265
266            if self.denied_tags.contains(component_name.as_ref()) {
267                return;
268            }
269
270            // This may be a dynamic collection, split into parts, and try and match each part
271            // to the tags.
272            for name in component_name.split(":") {
273                if self.denied_tags.contains(name) {
274                    return;
275                }
276            }
277        }
278
279        write!(
280            self.io_writer(64).await, // 64 is ample
281            "[{:05}.{:03}] {:05}:{:05}> [",
282            zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_seconds(),
283            zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_millis()
284                % 1000,
285            log.pid().unwrap_or(0),
286            log.tid().unwrap_or(0)
287        )
288        .unwrap();
289
290        if let Some(tags) = log.tags().filter(|tags| !tags.is_empty()) {
291            for (i, tag) in tags.iter().enumerate() {
292                self.write(tag.as_bytes()).await;
293                if i < tags.len() - 1 {
294                    self.write(b", ").await;
295                }
296            }
297        } else {
298            self.write(log.component_name().as_bytes()).await;
299        }
300
301        // Write this separately from the next so that the line-break, if necessary (unlikely
302        // because the tags shouldn't take up much space), comes after this, but before the
303        // severity.
304        self.write(b"]").await;
305
306        write!(self.io_writer(16).await, " {}: ", log.severity()).unwrap();
307        if let Some(m) = log.msg() {
308            self.write(m.as_bytes()).await;
309        }
310
311        for key_str in log.payload_keys_strings() {
312            self.write(b" ").await;
313            self.write(key_str.as_bytes()).await;
314        }
315
316        // NOTE: Whilst it might be tempting (for performance reasons) to try and buffer up more
317        // messages before flushing, there are downstream consumers (in tests) that get confused if
318        // part lines are written to the serial log, so we must make sure we write whole lines, and
319        // it's easiest if we just send one line at a time.
320        self.flush();
321    }
322}
323
324struct IoWriter<'a>(&'a mut SerialWriter);
325
326impl Write for IoWriter<'_> {
327    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
328        self.0.current_buffer.extend(buf);
329        Ok(buf.len())
330    }
331
332    fn flush(&mut self) -> io::Result<()> {
333        Ok(())
334    }
335}
336
337struct Barrier(mpsc::UnboundedSender<WorkerCommand>);
338
339impl Barrier {
340    /// Asynchronously waits for all pending writes to complete.
341    async fn wait(&mut self) {
342        let (tx, rx) = oneshot::channel();
343
344        self.0.unbounded_send(WorkerCommand::Flush(tx)).unwrap();
345
346        // Wait for the worker thread to signal that the flush is complete.
347        let _ = rx.await;
348    }
349}
350
351#[cfg(test)]
352mod tests {
353    use fuchsia_async::{self as fasync};
354    use futures::SinkExt;
355    use futures::channel::mpsc::{self, unbounded};
356
357    use super::*;
358    use crate::identity::ComponentIdentity;
359    use crate::logs::testing::make_message;
360    use diagnostics_data::{BuilderArgs, LogsDataBuilder, LogsField, LogsProperty, Severity};
361    use fidl::endpoints::create_proxy;
362    use fidl_fuchsia_logger::LogSinkMarker;
363    use fuchsia_async::TimeoutExt;
364    use futures::FutureExt;
365    use moniker::ExtendedMoniker;
366    use std::sync::Mutex;
367    use std::time::Duration;
368    use zx::BootInstant;
369
370    /// TestSink will send log lines received (delimited by \n) over a channel.
371    struct TestSink {
372        buffer: Vec<u8>,
373        snd: mpsc::UnboundedSender<String>,
374    }
375
376    impl TestSink {
377        fn new() -> (Self, mpsc::UnboundedReceiver<String>) {
378            let (snd, rcv) = mpsc::unbounded();
379            (Self { buffer: Vec::new(), snd }, rcv)
380        }
381    }
382
383    impl Write for TestSink {
384        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
385            for chunk in buf.split_inclusive(|c| *c == b'\n') {
386                if !self.buffer.is_empty() {
387                    self.buffer.extend(chunk);
388                    if *self.buffer.last().unwrap() == b'\n' {
389                        self.snd
390                            .unbounded_send(
391                                String::from_utf8(std::mem::take(&mut self.buffer))
392                                    .expect("wrote valid utf8"),
393                            )
394                            .expect("sent item");
395                    }
396                } else if *chunk.last().unwrap() == b'\n' {
397                    self.snd
398                        .unbounded_send(str::from_utf8(chunk).expect("wrote valid utf8").into())
399                        .unwrap();
400                } else {
401                    self.buffer.extend(chunk);
402                }
403            }
404            Ok(buf.len())
405        }
406
407        fn flush(&mut self) -> io::Result<()> {
408            Ok(())
409        }
410    }
411
412    /// FakeSink collects logs into a buffer.
413    #[derive(Clone, Default)]
414    struct FakeSink(Arc<Mutex<Vec<u8>>>);
415
416    impl FakeSink {
417        fn with_buffer<R>(&self, f: impl FnOnce(&Vec<u8>) -> R) -> R {
418            f(&self.0.lock().unwrap())
419        }
420    }
421
422    impl Write for FakeSink {
423        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
424            self.0.lock().unwrap().write(buf)
425        }
426
427        fn flush(&mut self) -> io::Result<()> {
428            unreachable!();
429        }
430    }
431
432    #[fuchsia::test]
433    async fn write_to_serial_handles_denied_tags() {
434        let log = LogsDataBuilder::new(BuilderArgs {
435            timestamp: BootInstant::from_nanos(1),
436            component_url: Some("url".into()),
437            moniker: "core/foo".try_into().unwrap(),
438            severity: Severity::Info,
439        })
440        .add_tag("denied-tag")
441        .build();
442        let sink = FakeSink::default();
443        let mut writer =
444            SerialWriter::new(sink.clone(), HashSet::from_iter(["denied-tag".to_string()]));
445        writer.log(&log).await;
446        writer.get_barrier().wait().await;
447        assert!(sink.with_buffer(|b| b.is_empty()));
448    }
449
450    #[fuchsia::test]
451    async fn write_to_serial_splits_lines() {
452        let message = concat!(
453            "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam accumsan eu neque ",
454            "quis molestie. Nam rhoncus sapien non eleifend tristique. Duis quis turpis volutpat ",
455            "neque bibendum molestie. Etiam ac sapien justo. Nullam aliquet ipsum nec tincidunt."
456        );
457        let log = LogsDataBuilder::new(BuilderArgs {
458            timestamp: BootInstant::from_nanos(123456789),
459            component_url: Some("url".into()),
460            moniker: "core/foo".try_into().unwrap(),
461            severity: Severity::Info,
462        })
463        .add_tag("bar")
464        .set_message(message)
465        .add_key(LogsProperty::String(LogsField::Other("key".to_string()), "value".to_string()))
466        .add_key(LogsProperty::Int(LogsField::Other("other_key".to_string()), 3))
467        .set_pid(1234)
468        .set_tid(5678)
469        .build();
470        let sink = FakeSink::default();
471        let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
472        writer.log(&log).await;
473        writer.get_barrier().wait().await;
474        sink.with_buffer(|b| {
475            assert_eq!(
476                str::from_utf8(b).unwrap(),
477                format!(
478                    "[00000.123] 01234:05678> [bar] INFO: {}\n{} key=value other_key=3\n",
479                    &message[..218],
480                    &message[218..]
481                )
482            )
483        });
484    }
485
486    #[fuchsia::test]
487    async fn when_no_tags_are_present_the_component_name_is_used() {
488        let log = LogsDataBuilder::new(BuilderArgs {
489            timestamp: BootInstant::from_nanos(123456789),
490            component_url: Some("url".into()),
491            moniker: "core/foo".try_into().unwrap(),
492            severity: Severity::Info,
493        })
494        .set_message("my msg")
495        .set_pid(1234)
496        .set_tid(5678)
497        .build();
498        let sink = FakeSink::default();
499        let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
500        writer.log(&log).await;
501        writer.flush();
502        writer.get_barrier().wait().await;
503        sink.with_buffer(|b| {
504            assert_eq!(str::from_utf8(b).unwrap(), "[00000.123] 01234:05678> [foo] INFO: my msg\n");
505        });
506    }
507
    /// While a freeze token is held no logs reach serial; output resumes once it is dropped.
    #[fuchsia::test]
    async fn pauses_logs_correctly() {
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));
        let bootstrap_denied_component_container =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/denied_component").unwrap(),
                "fuchsia-pkg://bootstrap-denied_component",
            )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        // Ingested before the serial task is launched.
        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));

        // The flush sender is kept alive but never used in this test.
        let (_flush_sender, flush_receiver) = unbounded();

        // core/baz is not covered by the allowed selectors passed to `launch_serial` below.
        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        let (sink, mut rcv) = TestSink::new();
        let cloned_repo = Arc::clone(&repo);
        // `sender` carries the freeze requests.
        let (mut sender, receiver) = unbounded();
        let _serial_task = fasync::Task::spawn(async move {
            let allowed = &["bootstrap/**", "/core/foo"];
            let denied = &["denied_tag", "denied_component"];
            launch_serial(allowed, denied, cloned_repo, sink, receiver, flush_receiver).await;
        });

        // Both of these match an entry of the deny list.
        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("denied_tag"),
            zx::BootInstant::from_nanos(3),
        ));
        bootstrap_denied_component_container.ingest_message(make_message(
            "d",
            None,
            zx::BootInstant::from_nanos(3),
        ));

        let received = rcv.next().await.unwrap();
        assert_eq!(received, "[00000.000] 00001:00002> [foo] DEBUG: a\n");

        // Request a freeze and wait for the token that keeps serial output paused.
        let (tx, rx) = oneshot::channel();
        sender.send(tx).await.unwrap();

        let freeze_token = rx.await.unwrap();

        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));

        // The pipeline is asynchronous, so all we can do is assert that no message is sent with a
        // timeout.
        assert!(rcv.next().map(|_| false).on_timeout(Duration::from_millis(500), || true).await);

        // Dropping the token ends the freeze; the log ingested meanwhile must now appear.
        drop(freeze_token);

        assert_eq!(rcv.next().await.unwrap(), "[00000.000] 00001:00002> [foo] DEBUG: c\n");
    }
582
    /// Logs ingested before and after the serial task starts are written, subject to the allow
    /// and deny lists.
    #[fuchsia::test]
    async fn writes_ingested_logs() {
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));
        let denied_collection = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/denied-collection:foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-denied-collection:foo-bar",
        )));
        let bootstrap_denied_component_container =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/denied_component").unwrap(),
                "fuchsia-pkg://bootstrap-denied_component",
            )));
        let denied_collection_instance = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/collection:denied-foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-collection:denied-foo-bar",
        )));

        // NOTE(review): this uses the same moniker and URL as `denied_collection_instance`
        // above; confirm whether a distinct identity was intended.
        let denied_collection_and_instance =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/collection:denied-foo-bar").unwrap(),
                "fuchsia-pkg://bootstrap-collection:denied-foo-bar",
            )));

        let collection_and_instance = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/collection:foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-collection:foo-bar",
        )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        // Everything below is ingested before the serial task is launched.
        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));
        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        denied_collection.ingest_message(make_message("d", None, zx::BootInstant::from_nanos(2)));
        denied_collection_instance.ingest_message(make_message(
            "e",
            None,
            zx::BootInstant::from_nanos(2),
        ));
        denied_collection_and_instance.ingest_message(make_message(
            "f",
            None,
            zx::BootInstant::from_nanos(2),
        ));
        collection_and_instance.ingest_message(make_message(
            "g",
            None,
            zx::BootInstant::from_nanos(2),
        ));

        let (sink, rcv) = TestSink::new();

        // The senders are kept alive but never used in this test.
        let (_freeze_sender, freeze_receiver) = unbounded();
        let (_flush_sender, flush_receiver) = unbounded();
        let _serial_task = fasync::Task::spawn(launch_serial(
            &["bootstrap/**", "/core/foo"],
            &[
                "denied_tag",
                "denied_component",
                "denied-collection",
                "denied-foo-bar",
                "collection:denied-foo-bar",
            ],
            Arc::clone(&repo),
            sink,
            freeze_receiver,
            flush_receiver,
        ));

        // These are ingested after the serial task has been launched.
        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("denied_tag"),
            zx::BootInstant::from_nanos(3),
        ));
        bootstrap_denied_component_container.ingest_message(make_message(
            "d",
            None,
            zx::BootInstant::from_nanos(3),
        ));
        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));
        let received: Vec<_> = rcv.take(3).collect().await;

        // We must see the logs emitted before we installed the serial listener and after. We must
        // not see the log from /core/baz (not in the allow list), the log from bootstrap/bar with
        // tag "denied_tag", or the logs from components whose name matches the deny list.
        assert_eq!(
            received,
            vec![
                "[00000.000] 00001:00002> [foo] DEBUG: a\n",
                "[00000.000] 00001:00002> [collection:foo-bar] DEBUG: g\n",
                "[00000.000] 00001:00002> [foo] DEBUG: c\n"
            ]
        );
    }
696
    /// Once a flush request has been answered, a log written to the socket beforehand must
    /// already have been delivered to the sink.
    #[fuchsia::test]
    async fn flush_drains_all_logs() {
        // Repeated many times; presumably to shake out races between the threads involved.
        for _ in 0..500 {
            let scope = fasync::Scope::new();
            let repo = LogsRepository::for_test(scope.new_child());
            let identity = Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
                "fuchsia-pkg://bootstrap-foo",
            ));

            // Connect a structured log socket for the component.
            let (log_sink, server_end) = create_proxy::<LogSinkMarker>();
            let log_container = repo.get_log_container(identity);
            log_container.handle_log_sink(server_end.into_stream(), scope.as_handle().clone());
            let (client_socket, server_socket) = zx::Socket::create_datagram();
            log_sink.connect_structured(server_socket).unwrap();

            let (sink, mut rcv) = TestSink::new();
            let cloned_repo = Arc::clone(&repo);
            // The freeze sender is kept alive but never used in this test.
            let (_freeze_sender, freeze_receiver) = unbounded();
            let (flush_sender, flush_receiver) = unbounded();
            scope.spawn(async move {
                let allowed = &["bootstrap/**"];
                let denied: &[&str] = &[];
                launch_serial(allowed, denied, cloned_repo, sink, freeze_receiver, flush_receiver)
                    .await;
            });

            let mut buffer = [0u8; 1024];
            let cursor = std::io::Cursor::new(&mut buffer[..]);
            let mut encoder = diagnostics_log_encoding::encode::Encoder::new(
                cursor,
                diagnostics_log_encoding::encode::EncoderOpts::default(),
            );
            // Wait for initial interest to ensure that the socket is registered.
            log_sink.wait_for_interest_change().await.unwrap().unwrap();

            // Encode a single record and write it to the socket.
            encoder
                .write_record(diagnostics_log_encoding::Record {
                    timestamp: zx::BootInstant::from_nanos(1),
                    severity: Severity::Info as u8,
                    arguments: vec![
                        diagnostics_log_encoding::Argument::new("tag", "foo"),
                        diagnostics_log_encoding::Argument::new("message", "a"),
                    ],
                })
                .unwrap();
            client_socket
                .write(&encoder.inner().get_ref()[..encoder.inner().position() as usize])
                .unwrap();
            let (sender, receiver) = oneshot::channel();
            // Send the log flush command
            flush_sender.unbounded_send(sender).unwrap();
            // Wait for flush to complete. Flush involves a background thread,
            // so keep polling until the background thread handles the flush event.
            assert_eq!(receiver.await, Ok(()));
            // The line must be available immediately, without any further waiting.
            let received = rcv.next().now_or_never().unwrap().unwrap();
            assert_eq!(received, "[00000.000] 00000:00000> [foo] INFO: a\n");
        }
    }
756}