archivist_lib/logs/
serial.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::logs::repository::LogsRepository;
6use diagnostics_data::{Data, Logs};
7use fidl_fuchsia_diagnostics::{Selector, StreamMode};
8use fuchsia_async::OnSignals;
9use fuchsia_trace as ftrace;
10use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
11use futures::channel::{mpsc, oneshot};
12use futures::executor::block_on;
13use futures::future::Either;
14use futures::{select, FutureExt, StreamExt};
15use log::warn;
16use selectors::FastError;
17use std::collections::HashSet;
18use std::fmt::Display;
19use std::io::{self, Write};
20use std::pin::pin;
21use std::sync::Arc;
22use std::{mem, thread};
23use zx::Signals;
24
/// Upper bound, in bytes, on a single write handed to the serial sink. Write buffers are allocated
/// with this capacity, and one byte of it is always kept free for the trailing newline.
const MAX_SERIAL_WRITE_SIZE: usize = 256;
26
27/// Function that forwards logs from Archivist to the serial port. Logs will be filtered by
28/// `allow_serial_log_tags` to include logs in the serial output, and `deny_serial_log_tags` to
29/// exclude specific tags.
30pub async fn launch_serial(
31    allow_serial_log_tags: Vec<String>,
32    deny_serial_log_tags: Vec<String>,
33    logs_repo: Arc<LogsRepository>,
34    sink: impl Write + Send + 'static,
35    mut freeze_receiver: mpsc::UnboundedReceiver<oneshot::Sender<zx::EventPair>>,
36    flush_receiver: UnboundedReceiver<UnboundedSender<()>>,
37) {
38    let writer = SerialWriter::new(sink, deny_serial_log_tags.into_iter().collect());
39    let mut barrier = writer.get_barrier();
40    let mut write_logs_to_serial = pin!(SerialConfig::new(allow_serial_log_tags)
41        .write_logs(logs_repo, writer, flush_receiver)
42        .fuse());
43    loop {
44        select! {
45            _ = write_logs_to_serial => break,
46            freeze_request = freeze_receiver.next() => {
47                if let Some(request) = freeze_request {
48                    // We must use the barrier before we send back the event.
49                    barrier.wait().await;
50                    let (client, server) = zx::EventPair::create();
51                    let _ = request.send(client);
52                    let _ = OnSignals::new(&server, Signals::EVENTPAIR_PEER_CLOSED).await;
53                }
54            }
55        }
56    }
57}
58
/// Configuration describing which components have their logs forwarded to serial.
#[derive(Default)]
pub struct SerialConfig {
    /// Selectors handed to the logs cursor. `SerialConfig::new` only fills in the component
    /// selector part of each entry.
    selectors: Vec<Selector>,
}
63
64impl SerialConfig {
65    /// Creates a new serial configuration from the given structured config values.
66    pub fn new<C>(allowed_components: Vec<C>) -> Self
67    where
68        C: AsRef<str> + Display,
69    {
70        let selectors = allowed_components
71            .into_iter()
72            .filter_map(|selector| {
73                match selectors::parse_component_selector::<FastError>(selector.as_ref()) {
74                    Ok(s) => Some(Selector {
75                        component_selector: Some(s),
76                        tree_selector: None,
77                        ..Selector::default()
78                    }),
79                    Err(err) => {
80                        warn!(selector:%, err:?; "Failed to parse component selector");
81                        None
82                    }
83                }
84            })
85            .collect();
86        Self { selectors }
87    }
88
89    /// Returns a future that resolves when there's no more logs to write to serial. This can only
90    /// happen when all log sink connections have been closed for the components that were
91    /// configured to emit logs.
92    async fn write_logs(
93        self,
94        repo: Arc<LogsRepository>,
95        mut writer: SerialWriter,
96        mut flush_receiver: UnboundedReceiver<UnboundedSender<()>>,
97    ) {
98        let mut log_stream = repo
99            .logs_cursor(
100                StreamMode::SnapshotThenSubscribe,
101                Some(self.selectors),
102                ftrace::Id::random(),
103            )
104            .fuse();
105        loop {
106            match select! {
107                log = log_stream.next() => Either::Left(log),
108                flush_request = flush_receiver.next() => Either::Right(flush_request),
109            } {
110                // We've received a log
111                Either::Left(Some(log)) => {
112                    writer.log(&log).await;
113                }
114                // We've hit the end of the log stream and Archivist
115                // is shutting down.
116                Either::Left(None) => {
117                    break;
118                }
119                // We've received a flush request
120                Either::Right(Some(flush_request)) => {
121                    // We have a background thread that polls sockets.
122                    // Ensure the background thread is polled first before
123                    // we write the remaining logs to serial to ensure
124                    // we capture all logs.
125                    repo.flush().await;
126
127                    // Write all pending logs to serial
128                    while let Some(Some(log)) = log_stream.next().now_or_never() {
129                        writer.log(&log).await;
130                    }
131                    // Flush the serial thread (ensuring everything goes to serial)
132                    writer.get_barrier().wait().await;
133
134                    // Reply to the flush request and continue normal logging operations.
135                    // Ignore the error, because the channel may have been closed which is OK.
136                    let _ = flush_request.unbounded_send(());
137                }
138                // The flush server has exited, and Archivist is shutting down.
139                Either::Right(None) => {
140                    break;
141                }
142            }
143        }
144        // Ensure logs are flushed before we finish.
145        writer.get_barrier().wait().await;
146    }
147}
148
149/// A sink to write to serial. This Write implementation must be used together with SerialWriter.
150#[derive(Default)]
151pub struct SerialSink;
152
153impl Write for SerialSink {
154    fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
155        if cfg!(debug_assertions) {
156            debug_assert!(buffer.len() <= MAX_SERIAL_WRITE_SIZE);
157        } else {
158            use std::sync::atomic::{AtomicBool, Ordering};
159            static ALREADY_LOGGED: AtomicBool = AtomicBool::new(false);
160            if buffer.len() > MAX_SERIAL_WRITE_SIZE && !ALREADY_LOGGED.swap(true, Ordering::Relaxed)
161            {
162                let size = buffer.len();
163                log::error!(
164                    size;
165                    "Skipping write to serial due to internal error. Exceeded max buffer size."
166                );
167                return Ok(buffer.len());
168            }
169        }
170        // SAFETY: calling a syscall. We pass a pointer to the buffer and its exact size.
171        unsafe {
172            zx::sys::zx_debug_write(buffer.as_ptr(), buffer.len());
173        }
174        Ok(buffer.len())
175    }
176
177    fn flush(&mut self) -> io::Result<()> {
178        Ok(())
179    }
180}
181
/// An enum to represent commands sent to the worker thread.
enum WorkerCommand {
    /// Write the given bytes to the sink. The worker sends the buffer back afterwards so that it
    /// can be reused.
    Write(Vec<u8>),
    /// Notify the given sender once the worker reaches this command, i.e. after every command
    /// queued before it has been processed.
    Flush(oneshot::Sender<()>),
}
189
/// A writer that provides an async interface to a synchronous, blocking write operation.
///
/// It spawns a dedicated thread to handle the synchronous writes, allowing the async
/// context to remain non-blocking.
struct SerialWriter {
    /// Logs carrying any of these tags are not written.
    denied_tags: HashSet<String>,
    /// Queue of commands for the worker thread.
    sender: mpsc::UnboundedSender<WorkerCommand>,
    /// Buffers handed back by the worker thread after their contents were written.
    buffers: mpsc::UnboundedReceiver<Vec<u8>>,
    /// The line currently being assembled. It has zero capacity right after its previous contents
    /// were handed to the worker thread.
    current_buffer: Vec<u8>,
}
200
201impl SerialWriter {
202    /// Creates a new Writer and spawns a worker thread.
203    fn new<S: Write + Send + 'static>(mut sink: S, denied_tags: HashSet<String>) -> Self {
204        let (sender, mut receiver) = mpsc::unbounded();
205        let (buffer_sender, buffers) = mpsc::unbounded();
206
207        // Limit to 32 buffers (one comes from `current_buffer`).
208        for _ in 0..31 {
209            buffer_sender.unbounded_send(Vec::with_capacity(MAX_SERIAL_WRITE_SIZE)).unwrap();
210        }
211
212        thread::spawn(move || {
213            block_on(async {
214                while let Some(command) = receiver.next().await {
215                    match command {
216                        WorkerCommand::Write(bytes) => {
217                            // Ignore errors.
218                            let _ = sink.write(&bytes);
219                            // Send the buffer back.
220                            let _ = buffer_sender.unbounded_send(bytes);
221                        }
222                        WorkerCommand::Flush(notifier) => {
223                            let _ = notifier.send(());
224                        }
225                    }
226                }
227            });
228        });
229
230        Self {
231            denied_tags,
232            sender,
233            buffers,
234            current_buffer: Vec::with_capacity(MAX_SERIAL_WRITE_SIZE),
235        }
236    }
237
238    /// Asynchronously writes bytes.
239    async fn write(&mut self, mut bytes: &[u8]) {
240        while !bytes.is_empty() {
241            if self.current_buffer.capacity() == 0 {
242                self.current_buffer = self.buffers.next().await.unwrap();
243                self.current_buffer.clear();
244            }
245            let (part, rem) = bytes.split_at(std::cmp::min(self.space(), bytes.len()));
246            self.current_buffer.extend(part);
247            if !rem.is_empty() {
248                self.flush();
249            }
250            bytes = rem;
251        }
252    }
253
254    /// Return a synchronous writer with the required capacity.
255    ///
256    /// NOTE: Using `io_writer` will mean that line breaks don't occur in the middle: they will
257    /// always be at the beginning of whatever is being output, which is different from what happens
258    /// with `write`.
259    async fn io_writer(&mut self, required: usize) -> IoWriter<'_> {
260        assert!(required < MAX_SERIAL_WRITE_SIZE);
261        if self.current_buffer.capacity() == 0 || self.space() < required {
262            self.flush();
263            self.current_buffer = self.buffers.next().await.unwrap();
264            self.current_buffer.clear();
265        }
266        IoWriter(self)
267    }
268
269    /// Flush the buffer.
270    fn flush(&mut self) {
271        if !self.current_buffer.is_empty() {
272            self.current_buffer.push(b'\n');
273            self.sender
274                .unbounded_send(WorkerCommand::Write(mem::take(&mut self.current_buffer)))
275                .unwrap();
276        }
277    }
278
279    /// Returns a barrier.
280    fn get_barrier(&self) -> Barrier {
281        Barrier(self.sender.clone())
282    }
283
284    /// Returns the amount of space in the current buffer.
285    fn space(&self) -> usize {
286        // Always leave room for the \n.
287        MAX_SERIAL_WRITE_SIZE - 1 - self.current_buffer.len()
288    }
289
290    /// Writes a log record.
291    async fn log(&mut self, log: &Data<Logs>) {
292        if let Some(tags) = log.tags() {
293            if tags.iter().any(|tag| self.denied_tags.contains(tag)) {
294                return;
295            }
296        }
297
298        write!(
299            self.io_writer(64).await, // 64 is ample
300            "[{:05}.{:03}] {:05}:{:05}> [",
301            zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_seconds(),
302            zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_millis()
303                % 1000,
304            log.pid().unwrap_or(0),
305            log.tid().unwrap_or(0)
306        )
307        .unwrap();
308
309        if let Some(tags) = log.tags().filter(|tags| !tags.is_empty()) {
310            for (i, tag) in tags.iter().enumerate() {
311                self.write(tag.as_bytes()).await;
312                if i < tags.len() - 1 {
313                    self.write(b", ").await;
314                }
315            }
316        } else {
317            self.write(log.component_name().as_bytes()).await;
318        }
319
320        // Write this separately from the next so that the line-break, if necessary (unlikely
321        // because the tags shouldn't take up much space), comes after this, but before the
322        // severity.
323        self.write(b"]").await;
324
325        write!(self.io_writer(16).await, " {}: ", log.severity()).unwrap();
326        if let Some(m) = log.msg() {
327            self.write(m.as_bytes()).await;
328        }
329
330        for key_str in log.payload_keys_strings() {
331            self.write(b" ").await;
332            self.write(key_str.as_bytes()).await;
333        }
334
335        // NOTE: Whilst it might be tempting (for performance reasons) to try and buffer up more
336        // messages before flushing, there are downstream consumers (in tests) that get confused if
337        // part lines are written to the serial log, so we must make sure we write whole lines, and
338        // it's easiest if we just send one line at a time.
339        self.flush();
340    }
341}
342
343struct IoWriter<'a>(&'a mut SerialWriter);
344
345impl Write for IoWriter<'_> {
346    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
347        self.0.current_buffer.extend(buf);
348        Ok(buf.len())
349    }
350
351    fn flush(&mut self) -> io::Result<()> {
352        Ok(())
353    }
354}
355
356struct Barrier(mpsc::UnboundedSender<WorkerCommand>);
357
358impl Barrier {
359    /// Asynchronously waits for all pending writes to complete.
360    async fn wait(&mut self) {
361        let (tx, rx) = oneshot::channel();
362
363        self.0.unbounded_send(WorkerCommand::Flush(tx)).unwrap();
364
365        // Wait for the worker thread to signal that the flush is complete.
366        let _ = rx.await;
367    }
368}
369
370#[cfg(test)]
371mod tests {
372    use fuchsia_async::{self as fasync};
373    use futures::channel::mpsc::{self, unbounded};
374    use futures::SinkExt;
375
376    use super::*;
377    use crate::identity::ComponentIdentity;
378    use crate::logs::testing::make_message;
379    use diagnostics_data::{BuilderArgs, LogsDataBuilder, LogsField, LogsProperty, Severity};
380    use fidl::endpoints::create_proxy;
381    use fidl_fuchsia_logger::LogSinkMarker;
382    use fuchsia_async::TimeoutExt;
383    use futures::FutureExt;
384    use moniker::ExtendedMoniker;
385    use std::sync::Mutex;
386    use std::time::Duration;
387    use zx::BootInstant;
388
389    /// TestSink will send log lines received (delimited by \n) over a channel.
390    struct TestSink {
391        buffer: Vec<u8>,
392        snd: mpsc::UnboundedSender<String>,
393    }
394
395    impl TestSink {
396        fn new() -> (Self, mpsc::UnboundedReceiver<String>) {
397            let (snd, rcv) = mpsc::unbounded();
398            (Self { buffer: Vec::new(), snd }, rcv)
399        }
400    }
401
402    impl Write for TestSink {
403        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
404            for chunk in buf.split_inclusive(|c| *c == b'\n') {
405                if !self.buffer.is_empty() {
406                    self.buffer.extend(chunk);
407                    if *self.buffer.last().unwrap() == b'\n' {
408                        self.snd
409                            .unbounded_send(
410                                String::from_utf8(std::mem::take(&mut self.buffer))
411                                    .expect("wrote valid utf8"),
412                            )
413                            .expect("sent item");
414                    }
415                } else if *chunk.last().unwrap() == b'\n' {
416                    self.snd
417                        .unbounded_send(str::from_utf8(chunk).expect("wrote valid utf8").into())
418                        .unwrap();
419                } else {
420                    self.buffer.extend(chunk);
421                }
422            }
423            Ok(buf.len())
424        }
425
426        fn flush(&mut self) -> io::Result<()> {
427            Ok(())
428        }
429    }
430
431    /// FakeSink collects logs into a buffer.
432    #[derive(Clone, Default)]
433    struct FakeSink(Arc<Mutex<Vec<u8>>>);
434
435    impl FakeSink {
436        fn with_buffer<R>(&self, f: impl FnOnce(&Vec<u8>) -> R) -> R {
437            f(&self.0.lock().unwrap())
438        }
439    }
440
441    impl Write for FakeSink {
442        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
443            self.0.lock().unwrap().write(buf)
444        }
445
446        fn flush(&mut self) -> io::Result<()> {
447            unreachable!();
448        }
449    }
450
451    #[fuchsia::test]
452    async fn write_to_serial_handles_denied_tags() {
453        let log = LogsDataBuilder::new(BuilderArgs {
454            timestamp: BootInstant::from_nanos(1),
455            component_url: Some("url".into()),
456            moniker: "core/foo".try_into().unwrap(),
457            severity: Severity::Info,
458        })
459        .add_tag("denied-tag")
460        .build();
461        let sink = FakeSink::default();
462        let mut writer =
463            SerialWriter::new(sink.clone(), HashSet::from_iter(["denied-tag".to_string()]));
464        writer.log(&log).await;
465        writer.get_barrier().wait().await;
466        assert!(sink.with_buffer(|b| b.is_empty()));
467    }
468
469    #[fuchsia::test]
470    async fn write_to_serial_splits_lines() {
471        let message = concat!(
472            "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam accumsan eu neque ",
473            "quis molestie. Nam rhoncus sapien non eleifend tristique. Duis quis turpis volutpat ",
474            "neque bibendum molestie. Etiam ac sapien justo. Nullam aliquet ipsum nec tincidunt."
475        );
476        let log = LogsDataBuilder::new(BuilderArgs {
477            timestamp: BootInstant::from_nanos(123456789),
478            component_url: Some("url".into()),
479            moniker: "core/foo".try_into().unwrap(),
480            severity: Severity::Info,
481        })
482        .add_tag("bar")
483        .set_message(message)
484        .add_key(LogsProperty::String(LogsField::Other("key".to_string()), "value".to_string()))
485        .add_key(LogsProperty::Int(LogsField::Other("other_key".to_string()), 3))
486        .set_pid(1234)
487        .set_tid(5678)
488        .build();
489        let sink = FakeSink::default();
490        let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
491        writer.log(&log).await;
492        writer.get_barrier().wait().await;
493        sink.with_buffer(|b| {
494            assert_eq!(
495                str::from_utf8(b).unwrap(),
496                format!(
497                    "[00000.123] 01234:05678> [bar] INFO: {}\n{} key=value other_key=3\n",
498                    &message[..218],
499                    &message[218..]
500                )
501            )
502        });
503    }
504
505    #[fuchsia::test]
506    async fn when_no_tags_are_present_the_component_name_is_used() {
507        let log = LogsDataBuilder::new(BuilderArgs {
508            timestamp: BootInstant::from_nanos(123456789),
509            component_url: Some("url".into()),
510            moniker: "core/foo".try_into().unwrap(),
511            severity: Severity::Info,
512        })
513        .set_message("my msg")
514        .set_pid(1234)
515        .set_tid(5678)
516        .build();
517        let sink = FakeSink::default();
518        let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
519        writer.log(&log).await;
520        writer.flush();
521        writer.get_barrier().wait().await;
522        sink.with_buffer(|b| {
523            assert_eq!(str::from_utf8(b).unwrap(), "[00000.123] 01234:05678> [foo] INFO: my msg\n");
524        });
525    }
526
    /// Checks that a freeze request pauses serial output until the returned token is dropped.
    #[fuchsia::test]
    async fn pauses_logs_correctly() {
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        // Ingested before the serial task exists.
        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));

        // The flush sender is kept alive (but unused) for the duration of the test.
        let (_flush_sender, flush_receiver) = unbounded();

        // core/baz is not covered by the allow list passed to `launch_serial` below.
        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        let (sink, mut rcv) = TestSink::new();
        let cloned_repo = Arc::clone(&repo);
        let (mut sender, receiver) = unbounded();
        let _serial_task = fasync::Task::spawn(async move {
            let allowed = vec!["bootstrap/**".into(), "/core/foo".into()];
            let denied = vec!["foo".into()];
            launch_serial(allowed, denied, cloned_repo, sink, receiver, flush_receiver).await;
        });
        // Tagged "foo", which is on the deny list.
        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("foo"),
            zx::BootInstant::from_nanos(3),
        ));

        let received = rcv.next().await.unwrap();

        assert_eq!(received, "[00000.000] 00001:00002> [foo] DEBUG: a\n");

        // Request a freeze and wait for the token that confirms it.
        let (tx, rx) = oneshot::channel();
        sender.send(tx).await.unwrap();

        let freeze_token = rx.await.unwrap();

        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));

        // The pipeline is asynchronous, so all we can do is assert that no message is sent with a
        // timeout.
        assert!(rcv.next().map(|_| false).on_timeout(Duration::from_millis(500), || true).await);

        // Dropping the token ends the freeze, after which the pending log must be written.
        drop(freeze_token);

        assert_eq!(rcv.next().await.unwrap(), "[00000.000] 00001:00002> [foo] DEBUG: c\n");
    }
591
    /// Checks that `write_logs` emits logs from allowed components only, both those ingested
    /// before the task started and those ingested afterwards, and honors the denied tags.
    #[fuchsia::test]
    async fn writes_ingested_logs() {
        let serial_config = SerialConfig::new(vec!["bootstrap/**", "/core/foo"]);
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        // These two messages are ingested before the serial task is spawned.
        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));
        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        let (sink, rcv) = TestSink::new();

        let writer = SerialWriter::new(sink, HashSet::from_iter(["foo".to_string()]));
        // The flush sender is kept alive (but unused) for the duration of the test.
        let (_flush_sender, flush_receiver) = unbounded();
        let _serial_task = fasync::Task::spawn(serial_config.write_logs(
            Arc::clone(&repo),
            writer,
            flush_receiver,
        ));
        // These two messages are ingested after the serial task is spawned.
        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("foo"),
            zx::BootInstant::from_nanos(3),
        ));
        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));
        let received: Vec<_> = rcv.take(2).collect().await;

        // We must see the logs emitted before we installed the serial listener and after. We must
        // not see the log from /core/baz and we must not see the log from bootstrap/bar with tag
        // "foo".
        assert_eq!(
            received,
            vec![
                "[00000.000] 00001:00002> [foo] DEBUG: a\n",
                "[00000.000] 00001:00002> [foo] DEBUG: c\n"
            ]
        );
    }
649
    /// Checks that once a flush request has been acknowledged, a log written to the socket before
    /// the request is already available from the sink. The scenario is repeated 500 times.
    #[fuchsia::test]
    async fn flush_drains_all_logs() {
        for _ in 0..500 {
            let scope = fasync::Scope::new();
            let repo = LogsRepository::for_test(scope.new_child());
            let identity = Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
                "fuchsia-pkg://bootstrap-foo",
            ));

            // Connect a structured log socket to the component's log container.
            let (log_sink, server_end) = create_proxy::<LogSinkMarker>();
            let log_container = repo.get_log_container(identity);
            log_container.handle_log_sink(server_end.into_stream(), scope.as_handle().clone());
            let (client_socket, server_socket) = zx::Socket::create_datagram();
            log_sink.connect_structured(server_socket).unwrap();

            let (sink, mut rcv) = TestSink::new();
            let cloned_repo = Arc::clone(&repo);
            // The freeze sender is kept alive (but unused) for the duration of the iteration.
            let (_freeze_sender, freeze_receiver) = unbounded();
            let (flush_sender, flush_receiver) = unbounded();
            scope.spawn(async move {
                let allowed = vec!["bootstrap/**".into()];
                let denied = vec![];
                launch_serial(allowed, denied, cloned_repo, sink, freeze_receiver, flush_receiver)
                    .await;
            });

            // Encode a single record into a stack buffer.
            let mut buffer = [0u8; 1024];
            let cursor = std::io::Cursor::new(&mut buffer[..]);
            let mut encoder = diagnostics_log_encoding::encode::Encoder::new(
                cursor,
                diagnostics_log_encoding::encode::EncoderOpts::default(),
            );
            // Wait for initial interest to ensure that the socket is registered.
            log_sink.wait_for_interest_change().await.unwrap().unwrap();

            encoder
                .write_record(diagnostics_log_encoding::Record {
                    timestamp: zx::BootInstant::from_nanos(1),
                    severity: Severity::Info as u8,
                    arguments: vec![
                        diagnostics_log_encoding::Argument::new("tag", "foo"),
                        diagnostics_log_encoding::Argument::new("message", "a"),
                    ],
                })
                .unwrap();
            // Only the bytes actually produced by the encoder are written to the socket.
            client_socket
                .write(&encoder.inner().get_ref()[..encoder.inner().position() as usize])
                .unwrap();
            let (sender, mut receiver) = unbounded();
            // Send the log flush command
            flush_sender.unbounded_send(sender).unwrap();
            // Wait for flush to complete. Flush involves a background thread,
            // so keep polling until the background thread handles the flush event.
            assert_eq!(receiver.next().await, Some(()));
            // The line must already be available without waiting any further.
            let received = rcv.next().now_or_never().unwrap().unwrap();
            assert_eq!(received, "[00000.000] 00000:00000> [foo] INFO: a\n");
        }
    }
709}