1use crate::logs::repository::LogsRepository;
6use diagnostics_data::{Data, Logs};
7use fidl_fuchsia_diagnostics::{Selector, StreamMode};
8use fuchsia_async::OnSignals;
9use fuchsia_trace as ftrace;
10use futures::channel::mpsc::UnboundedReceiver;
11use futures::channel::{mpsc, oneshot};
12use futures::executor::block_on;
13use futures::{FutureExt, StreamExt, select};
14use log::warn;
15use selectors::FastError;
16use std::collections::HashSet;
17use std::io::{self, Write};
18use std::sync::Arc;
19use std::{mem, thread};
20use zx::Signals;
21
/// Largest buffer, in bytes, that is handed to the serial sink in a single write.
///
/// `SerialWriter` sizes its line buffers to this value and `SerialSink` treats anything larger
/// as an internal error.
pub const MAX_SERIAL_WRITE_SIZE: usize = 256;
23
24pub async fn launch_serial(
28 allow_serial_log_tags: impl IntoIterator<Item = impl AsRef<str>>,
29 deny_serial_log_tags: impl IntoIterator<Item = impl ToString>,
30 logs_repo: Arc<LogsRepository>,
31 sink: impl Write + Send + 'static,
32 mut freeze_receiver: mpsc::UnboundedReceiver<oneshot::Sender<zx::EventPair>>,
33 mut flush_receiver: UnboundedReceiver<oneshot::Sender<()>>,
34) {
35 let mut writer =
36 SerialWriter::new(sink, deny_serial_log_tags.into_iter().map(|s| s.to_string()).collect());
37
38 let mut barrier = writer.get_barrier();
39 let mut log_stream = logs_repo
40 .logs_cursor(
41 StreamMode::SnapshotThenSubscribe,
42 Some(selectors_from_tags(allow_serial_log_tags)),
43 ftrace::Id::random(),
44 )
45 .fuse();
46 loop {
47 select! {
48 log = log_stream.next() => {
49 if let Some(log) = log {
50 writer.log(&log).await;
52 } else {
53 break;
55 }
56 }
57 freeze_request = freeze_receiver.next() => {
58 if let Some(request) = freeze_request {
59 barrier.wait().await;
61 let (client, server) = zx::EventPair::create();
62 let _ = request.send(client);
63 let _ = OnSignals::new(&server, Signals::EVENTPAIR_PEER_CLOSED).await;
64 }
65 }
66 flush_request = flush_receiver.next() => {
67 if let Some(flush_request) = flush_request {
68 logs_repo.flush().await;
72
73 while let Some(Some(log)) = log_stream.next().now_or_never() {
75 writer.log(&log).await;
76 }
77
78 writer.get_barrier().wait().await;
80
81 let _ = flush_request.send(());
84 }
85 }
86 }
87 }
88 writer.get_barrier().wait().await;
90}
91
92fn selectors_from_tags(tags: impl IntoIterator<Item = impl AsRef<str>>) -> Vec<Selector> {
93 tags.into_iter()
94 .filter_map(|selector| {
95 let selector = selector.as_ref();
96 match selectors::parse_component_selector::<FastError>(selector) {
97 Ok(s) => Some(Selector {
98 component_selector: Some(s),
99 tree_selector: None,
100 ..Selector::default()
101 }),
102 Err(err) => {
103 warn!(selector:%, err:?; "Failed to parse component selector");
104 None
105 }
106 }
107 })
108 .collect()
109}
110
111#[derive(Default)]
113pub struct SerialSink;
114
115impl Write for SerialSink {
116 fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
117 if cfg!(debug_assertions) {
118 debug_assert!(buffer.len() <= MAX_SERIAL_WRITE_SIZE);
119 } else {
120 use std::sync::atomic::{AtomicBool, Ordering};
121 static ALREADY_LOGGED: AtomicBool = AtomicBool::new(false);
122 if buffer.len() > MAX_SERIAL_WRITE_SIZE && !ALREADY_LOGGED.swap(true, Ordering::Relaxed)
123 {
124 let size = buffer.len();
125 log::error!(
126 size;
127 "Skipping write to serial due to internal error. Exceeded max buffer size."
128 );
129 return Ok(buffer.len());
130 }
131 }
132 unsafe {
134 zx::sys::zx_debug_write(buffer.as_ptr(), buffer.len());
135 }
136 Ok(buffer.len())
137 }
138
139 fn flush(&mut self) -> io::Result<()> {
140 Ok(())
141 }
142}
143
/// Commands sent from `SerialWriter` to its worker thread, which handles them in order.
enum WorkerCommand {
    /// Write these bytes to the sink; the worker then returns the buffer for reuse.
    Write(Vec<u8>),
    /// Acknowledge on the given channel. Because commands are handled in order, the
    /// acknowledgement means every earlier `Write` has been handed to the sink. The sink's own
    /// `flush` is not called.
    Flush(oneshot::Sender<()>),
}
151
/// Formats log messages into lines of bounded size and queues them for a worker thread that
/// owns the sink.
struct SerialWriter {
    /// Tags and component names whose messages are not written.
    denied_tags: HashSet<String>,
    /// Channel used to send commands to the worker thread.
    sender: mpsc::UnboundedSender<WorkerCommand>,
    /// Buffers available for reuse; the worker returns each buffer here after writing it.
    buffers: mpsc::UnboundedReceiver<Vec<u8>>,
    /// The line currently being assembled. It has zero capacity right after its contents were
    /// sent to the worker, until a replacement buffer is taken from `buffers`.
    current_buffer: Vec<u8>,
}
162
163impl SerialWriter {
164 fn new<S: Write + Send + 'static>(mut sink: S, denied_tags: HashSet<String>) -> Self {
166 let (sender, mut receiver) = mpsc::unbounded();
167 let (buffer_sender, buffers) = mpsc::unbounded();
168
169 for _ in 0..31 {
171 buffer_sender.unbounded_send(Vec::with_capacity(MAX_SERIAL_WRITE_SIZE)).unwrap();
172 }
173
174 thread::spawn(move || {
175 block_on(async {
176 while let Some(command) = receiver.next().await {
177 match command {
178 WorkerCommand::Write(bytes) => {
179 let _ = sink.write(&bytes);
181 let _ = buffer_sender.unbounded_send(bytes);
183 }
184 WorkerCommand::Flush(notifier) => {
185 let _ = notifier.send(());
186 }
187 }
188 }
189 });
190 });
191
192 Self {
193 denied_tags,
194 sender,
195 buffers,
196 current_buffer: Vec::with_capacity(MAX_SERIAL_WRITE_SIZE),
197 }
198 }
199
200 async fn write(&mut self, mut bytes: &[u8]) {
202 while !bytes.is_empty() {
203 if self.current_buffer.capacity() == 0 {
204 self.current_buffer = self.buffers.next().await.unwrap();
205 self.current_buffer.clear();
206 }
207 let (part, rem) = bytes.split_at(std::cmp::min(self.space(), bytes.len()));
208 self.current_buffer.extend(part);
209 if !rem.is_empty() {
210 self.flush();
211 }
212 bytes = rem;
213 }
214 }
215
216 async fn io_writer(&mut self, required: usize) -> IoWriter<'_> {
222 assert!(required < MAX_SERIAL_WRITE_SIZE);
223 if self.current_buffer.capacity() == 0 || self.space() < required {
224 self.flush();
225 self.current_buffer = self.buffers.next().await.unwrap();
226 self.current_buffer.clear();
227 }
228 IoWriter(self)
229 }
230
231 fn flush(&mut self) {
233 if !self.current_buffer.is_empty() {
234 self.current_buffer.push(b'\n');
235 self.sender
236 .unbounded_send(WorkerCommand::Write(mem::take(&mut self.current_buffer)))
237 .unwrap();
238 }
239 }
240
241 fn get_barrier(&self) -> Barrier {
243 Barrier(self.sender.clone())
244 }
245
246 fn space(&self) -> usize {
248 MAX_SERIAL_WRITE_SIZE - 1 - self.current_buffer.len()
250 }
251
252 async fn log(&mut self, log: &Data<Logs>) {
254 if !self.denied_tags.is_empty() {
256 if let Some(tags) = log.tags() {
257 if tags.iter().any(|tag| self.denied_tags.contains(tag)) {
258 return;
259 }
260 }
261
262 let component_name = log.component_name();
265
266 if self.denied_tags.contains(component_name.as_ref()) {
267 return;
268 }
269
270 for name in component_name.split(":") {
273 if self.denied_tags.contains(name) {
274 return;
275 }
276 }
277 }
278
279 write!(
280 self.io_writer(64).await, "[{:05}.{:03}] {:05}:{:05}> [",
282 zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_seconds(),
283 zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_millis()
284 % 1000,
285 log.pid().unwrap_or(0),
286 log.tid().unwrap_or(0)
287 )
288 .unwrap();
289
290 if let Some(tags) = log.tags().filter(|tags| !tags.is_empty()) {
291 for (i, tag) in tags.iter().enumerate() {
292 self.write(tag.as_bytes()).await;
293 if i < tags.len() - 1 {
294 self.write(b", ").await;
295 }
296 }
297 } else {
298 self.write(log.component_name().as_bytes()).await;
299 }
300
301 self.write(b"]").await;
305
306 write!(self.io_writer(16).await, " {}: ", log.severity()).unwrap();
307 if let Some(m) = log.msg() {
308 self.write(m.as_bytes()).await;
309 }
310
311 for key_str in log.payload_keys_strings() {
312 self.write(b" ").await;
313 self.write(key_str.as_bytes()).await;
314 }
315
316 self.flush();
321 }
322}
323
324struct IoWriter<'a>(&'a mut SerialWriter);
325
326impl Write for IoWriter<'_> {
327 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
328 self.0.current_buffer.extend(buf);
329 Ok(buf.len())
330 }
331
332 fn flush(&mut self) -> io::Result<()> {
333 Ok(())
334 }
335}
336
337struct Barrier(mpsc::UnboundedSender<WorkerCommand>);
338
339impl Barrier {
340 async fn wait(&mut self) {
342 let (tx, rx) = oneshot::channel();
343
344 self.0.unbounded_send(WorkerCommand::Flush(tx)).unwrap();
345
346 let _ = rx.await;
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use fuchsia_async::{self as fasync};
354 use futures::SinkExt;
355 use futures::channel::mpsc::{self, unbounded};
356
357 use super::*;
358 use crate::identity::ComponentIdentity;
359 use crate::logs::testing::make_message;
360 use diagnostics_data::{BuilderArgs, LogsDataBuilder, LogsField, LogsProperty, Severity};
361 use fidl::endpoints::create_proxy;
362 use fidl_fuchsia_logger::LogSinkMarker;
363 use fuchsia_async::TimeoutExt;
364 use futures::FutureExt;
365 use moniker::ExtendedMoniker;
366 use std::sync::Mutex;
367 use std::time::Duration;
368 use zx::BootInstant;
369
370 struct TestSink {
372 buffer: Vec<u8>,
373 snd: mpsc::UnboundedSender<String>,
374 }
375
376 impl TestSink {
377 fn new() -> (Self, mpsc::UnboundedReceiver<String>) {
378 let (snd, rcv) = mpsc::unbounded();
379 (Self { buffer: Vec::new(), snd }, rcv)
380 }
381 }
382
383 impl Write for TestSink {
384 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
385 for chunk in buf.split_inclusive(|c| *c == b'\n') {
386 if !self.buffer.is_empty() {
387 self.buffer.extend(chunk);
388 if *self.buffer.last().unwrap() == b'\n' {
389 self.snd
390 .unbounded_send(
391 String::from_utf8(std::mem::take(&mut self.buffer))
392 .expect("wrote valid utf8"),
393 )
394 .expect("sent item");
395 }
396 } else if *chunk.last().unwrap() == b'\n' {
397 self.snd
398 .unbounded_send(str::from_utf8(chunk).expect("wrote valid utf8").into())
399 .unwrap();
400 } else {
401 self.buffer.extend(chunk);
402 }
403 }
404 Ok(buf.len())
405 }
406
407 fn flush(&mut self) -> io::Result<()> {
408 Ok(())
409 }
410 }
411
412 #[derive(Clone, Default)]
414 struct FakeSink(Arc<Mutex<Vec<u8>>>);
415
416 impl FakeSink {
417 fn with_buffer<R>(&self, f: impl FnOnce(&Vec<u8>) -> R) -> R {
418 f(&self.0.lock().unwrap())
419 }
420 }
421
422 impl Write for FakeSink {
423 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
424 self.0.lock().unwrap().write(buf)
425 }
426
427 fn flush(&mut self) -> io::Result<()> {
428 unreachable!();
429 }
430 }
431
432 #[fuchsia::test]
433 async fn write_to_serial_handles_denied_tags() {
434 let log = LogsDataBuilder::new(BuilderArgs {
435 timestamp: BootInstant::from_nanos(1),
436 component_url: Some("url".into()),
437 moniker: "core/foo".try_into().unwrap(),
438 severity: Severity::Info,
439 })
440 .add_tag("denied-tag")
441 .build();
442 let sink = FakeSink::default();
443 let mut writer =
444 SerialWriter::new(sink.clone(), HashSet::from_iter(["denied-tag".to_string()]));
445 writer.log(&log).await;
446 writer.get_barrier().wait().await;
447 assert!(sink.with_buffer(|b| b.is_empty()));
448 }
449
450 #[fuchsia::test]
451 async fn write_to_serial_splits_lines() {
452 let message = concat!(
453 "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam accumsan eu neque ",
454 "quis molestie. Nam rhoncus sapien non eleifend tristique. Duis quis turpis volutpat ",
455 "neque bibendum molestie. Etiam ac sapien justo. Nullam aliquet ipsum nec tincidunt."
456 );
457 let log = LogsDataBuilder::new(BuilderArgs {
458 timestamp: BootInstant::from_nanos(123456789),
459 component_url: Some("url".into()),
460 moniker: "core/foo".try_into().unwrap(),
461 severity: Severity::Info,
462 })
463 .add_tag("bar")
464 .set_message(message)
465 .add_key(LogsProperty::String(LogsField::Other("key".to_string()), "value".to_string()))
466 .add_key(LogsProperty::Int(LogsField::Other("other_key".to_string()), 3))
467 .set_pid(1234)
468 .set_tid(5678)
469 .build();
470 let sink = FakeSink::default();
471 let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
472 writer.log(&log).await;
473 writer.get_barrier().wait().await;
474 sink.with_buffer(|b| {
475 assert_eq!(
476 str::from_utf8(b).unwrap(),
477 format!(
478 "[00000.123] 01234:05678> [bar] INFO: {}\n{} key=value other_key=3\n",
479 &message[..218],
480 &message[218..]
481 )
482 )
483 });
484 }
485
486 #[fuchsia::test]
487 async fn when_no_tags_are_present_the_component_name_is_used() {
488 let log = LogsDataBuilder::new(BuilderArgs {
489 timestamp: BootInstant::from_nanos(123456789),
490 component_url: Some("url".into()),
491 moniker: "core/foo".try_into().unwrap(),
492 severity: Severity::Info,
493 })
494 .set_message("my msg")
495 .set_pid(1234)
496 .set_tid(5678)
497 .build();
498 let sink = FakeSink::default();
499 let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
500 writer.log(&log).await;
501 writer.flush();
502 writer.get_barrier().wait().await;
503 sink.with_buffer(|b| {
504 assert_eq!(str::from_utf8(b).unwrap(), "[00000.123] 01234:05678> [foo] INFO: my msg\n");
505 });
506 }
507
    /// Checks that a freeze request stops serial output until the returned token is dropped.
    #[fuchsia::test]
    async fn pauses_logs_correctly() {
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));
        let bootstrap_denied_component_container =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/denied_component").unwrap(),
                "fuchsia-pkg://bootstrap-denied_component",
            )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        // Ingested before the serial task starts.
        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));

        // The sender is kept alive but never used: this test issues no flush requests.
        let (_flush_sender, flush_receiver) = unbounded();

        // `core/baz` is not covered by the allow list passed to `launch_serial` below.
        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        let (sink, mut rcv) = TestSink::new();
        let cloned_repo = Arc::clone(&repo);
        let (mut sender, receiver) = unbounded();
        let _serial_task = fasync::Task::spawn(async move {
            let allowed = &["bootstrap/**", "/core/foo"];
            let denied = &["denied_tag", "denied_component"];
            launch_serial(allowed, denied, cloned_repo, sink, receiver, flush_receiver).await;
        });

        // Both of these match the deny list: the first presumably through the tag given as the
        // second `make_message` argument (confirm against `make_message`), the second through
        // its component name.
        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("denied_tag"),
            zx::BootInstant::from_nanos(3),
        ));
        bootstrap_denied_component_container.ingest_message(make_message(
            "d",
            None,
            zx::BootInstant::from_nanos(3),
        ));

        let received = rcv.next().await.unwrap();
        assert_eq!(received, "[00000.000] 00001:00002> [foo] DEBUG: a\n");

        // Ask for a freeze and wait for the token that keeps serial output frozen.
        let (tx, rx) = oneshot::channel();
        sender.send(tx).await.unwrap();

        let freeze_token = rx.await.unwrap();

        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));

        // While the token is held, no line may arrive; the timeout firing is the success path.
        assert!(rcv.next().map(|_| false).on_timeout(Duration::from_millis(500), || true).await);

        // Dropping the token closes the event pair, which lifts the freeze.
        drop(freeze_token);

        assert_eq!(rcv.next().await.unwrap(), "[00000.000] 00001:00002> [foo] DEBUG: c\n");
    }
582
    /// Checks that only logs from allowed components that do not match the deny list reach the
    /// sink, whether they were ingested before or after the serial task started.
    #[fuchsia::test]
    async fn writes_ingested_logs() {
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));
        let denied_collection = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/denied-collection:foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-denied-collection:foo-bar",
        )));
        let bootstrap_denied_component_container =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/denied_component").unwrap(),
                "fuchsia-pkg://bootstrap-denied_component",
            )));
        let denied_collection_instance = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/collection:denied-foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-collection:denied-foo-bar",
        )));

        // NOTE(review): this uses the same moniker and URL as `denied_collection_instance`
        // above; confirm whether a distinct identity was intended.
        let denied_collection_and_instance =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/collection:denied-foo-bar").unwrap(),
                "fuchsia-pkg://bootstrap-collection:denied-foo-bar",
            )));

        let collection_and_instance = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/collection:foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-collection:foo-bar",
        )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        // These messages are ingested before the serial task starts.
        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));
        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        denied_collection.ingest_message(make_message("d", None, zx::BootInstant::from_nanos(2)));
        denied_collection_instance.ingest_message(make_message(
            "e",
            None,
            zx::BootInstant::from_nanos(2),
        ));
        denied_collection_and_instance.ingest_message(make_message(
            "f",
            None,
            zx::BootInstant::from_nanos(2),
        ));
        collection_and_instance.ingest_message(make_message(
            "g",
            None,
            zx::BootInstant::from_nanos(2),
        ));

        let (sink, rcv) = TestSink::new();

        // The senders are kept alive but never used: no freeze or flush requests are issued.
        let (_freeze_sender, freeze_receiver) = unbounded();
        let (_flush_sender, flush_receiver) = unbounded();
        let _serial_task = fasync::Task::spawn(launch_serial(
            &["bootstrap/**", "/core/foo"],
            &[
                "denied_tag",
                "denied_component",
                "denied-collection",
                "denied-foo-bar",
                "collection:denied-foo-bar",
            ],
            Arc::clone(&repo),
            sink,
            freeze_receiver,
            flush_receiver,
        ));

        // These messages are ingested after the serial task was spawned.
        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("denied_tag"),
            zx::BootInstant::from_nanos(3),
        ));
        bootstrap_denied_component_container.ingest_message(make_message(
            "d",
            None,
            zx::BootInstant::from_nanos(3),
        ));
        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));
        let received: Vec<_> = rcv.take(3).collect().await;

        // Only "a", "g" and the `core/foo` "c" are expected. `core/baz` is outside the allow
        // list and every other message matches an entry of the deny list.
        assert_eq!(
            received,
            vec![
                "[00000.000] 00001:00002> [foo] DEBUG: a\n",
                "[00000.000] 00001:00002> [collection:foo-bar] DEBUG: g\n",
                "[00000.000] 00001:00002> [foo] DEBUG: c\n"
            ]
        );
    }
696
    /// Checks that once a flush request has been acknowledged, a log written to the socket
    /// beforehand is already available from the sink.
    #[fuchsia::test]
    async fn flush_drains_all_logs() {
        // Repeated many times, presumably to shake out ordering races (TODO confirm intent).
        for _ in 0..500 {
            let scope = fasync::Scope::new();
            let repo = LogsRepository::for_test(scope.new_child());
            let identity = Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
                "fuchsia-pkg://bootstrap-foo",
            ));

            // Connect a LogSink client that delivers structured logs over a datagram socket.
            let (log_sink, server_end) = create_proxy::<LogSinkMarker>();
            let log_container = repo.get_log_container(identity);
            log_container.handle_log_sink(server_end.into_stream(), scope.as_handle().clone());
            let (client_socket, server_socket) = zx::Socket::create_datagram();
            log_sink.connect_structured(server_socket).unwrap();

            let (sink, mut rcv) = TestSink::new();
            let cloned_repo = Arc::clone(&repo);
            let (_freeze_sender, freeze_receiver) = unbounded();
            let (flush_sender, flush_receiver) = unbounded();
            scope.spawn(async move {
                let allowed = &["bootstrap/**"];
                let denied: &[&str] = &[];
                launch_serial(allowed, denied, cloned_repo, sink, freeze_receiver, flush_receiver)
                    .await;
            });

            let mut buffer = [0u8; 1024];
            let cursor = std::io::Cursor::new(&mut buffer[..]);
            let mut encoder = diagnostics_log_encoding::encode::Encoder::new(
                cursor,
                diagnostics_log_encoding::encode::EncoderOpts::default(),
            );
            // Wait for a response from the LogSink server before writing to the socket.
            log_sink.wait_for_interest_change().await.unwrap().unwrap();

            encoder
                .write_record(diagnostics_log_encoding::Record {
                    timestamp: zx::BootInstant::from_nanos(1),
                    severity: Severity::Info as u8,
                    arguments: vec![
                        diagnostics_log_encoding::Argument::new("tag", "foo"),
                        diagnostics_log_encoding::Argument::new("message", "a"),
                    ],
                })
                .unwrap();
            // Send only the bytes the encoder actually produced.
            client_socket
                .write(&encoder.inner().get_ref()[..encoder.inner().position() as usize])
                .unwrap();
            let (sender, receiver) = oneshot::channel();
            flush_sender.unbounded_send(sender).unwrap();
            // The acknowledgement must only arrive after the log has reached the sink.
            assert_eq!(receiver.await, Ok(()));
            // `now_or_never` makes the test fail if the line is not available immediately.
            let received = rcv.next().now_or_never().unwrap().unwrap();
            assert_eq!(received, "[00000.000] 00000:00000> [foo] INFO: a\n");
        }
    }
756}