1use crate::logs::repository::LogsRepository;
6use diagnostics_data::{Data, Logs};
7use fidl_fuchsia_diagnostics::{Selector, StreamMode};
8use fuchsia_async::OnSignals;
9use fuchsia_trace as ftrace;
10use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
11use futures::channel::{mpsc, oneshot};
12use futures::executor::block_on;
13use futures::future::Either;
14use futures::{select, FutureExt, StreamExt};
15use log::warn;
16use selectors::FastError;
17use std::collections::HashSet;
18use std::fmt::Display;
19use std::io::{self, Write};
20use std::pin::pin;
21use std::sync::Arc;
22use std::{mem, thread};
23use zx::Signals;
24
/// Maximum number of bytes handed to the serial sink in a single write.
///
/// `SerialWriter` allocates its line buffers with this capacity and keeps one byte in reserve
/// for the newline appended when a line is flushed. `SerialSink::write` treats any buffer
/// longer than this as an internal error.
const MAX_SERIAL_WRITE_SIZE: usize = 256;
26
27pub async fn launch_serial(
31 allow_serial_log_tags: Vec<String>,
32 deny_serial_log_tags: Vec<String>,
33 logs_repo: Arc<LogsRepository>,
34 sink: impl Write + Send + 'static,
35 mut freeze_receiver: mpsc::UnboundedReceiver<oneshot::Sender<zx::EventPair>>,
36 flush_receiver: UnboundedReceiver<UnboundedSender<()>>,
37) {
38 let writer = SerialWriter::new(sink, deny_serial_log_tags.into_iter().collect());
39 let mut barrier = writer.get_barrier();
40 let mut write_logs_to_serial = pin!(SerialConfig::new(allow_serial_log_tags)
41 .write_logs(logs_repo, writer, flush_receiver)
42 .fuse());
43 loop {
44 select! {
45 _ = write_logs_to_serial => break,
46 freeze_request = freeze_receiver.next() => {
47 if let Some(request) = freeze_request {
48 barrier.wait().await;
50 let (client, server) = zx::EventPair::create();
51 let _ = request.send(client);
52 let _ = OnSignals::new(&server, Signals::EVENTPAIR_PEER_CLOSED).await;
53 }
54 }
55 }
56 }
57}
58
/// Describes which components have their logs forwarded to serial.
#[derive(Default)]
pub struct SerialConfig {
    /// Selectors passed to the log repository cursor. `SerialConfig::new` only fills in the
    /// component selector of each entry.
    selectors: Vec<Selector>,
}
63
64impl SerialConfig {
65 pub fn new<C>(allowed_components: Vec<C>) -> Self
67 where
68 C: AsRef<str> + Display,
69 {
70 let selectors = allowed_components
71 .into_iter()
72 .filter_map(|selector| {
73 match selectors::parse_component_selector::<FastError>(selector.as_ref()) {
74 Ok(s) => Some(Selector {
75 component_selector: Some(s),
76 tree_selector: None,
77 ..Selector::default()
78 }),
79 Err(err) => {
80 warn!(selector:%, err:?; "Failed to parse component selector");
81 None
82 }
83 }
84 })
85 .collect();
86 Self { selectors }
87 }
88
89 async fn write_logs(
93 self,
94 repo: Arc<LogsRepository>,
95 mut writer: SerialWriter,
96 mut flush_receiver: UnboundedReceiver<UnboundedSender<()>>,
97 ) {
98 let mut log_stream = repo
99 .logs_cursor(
100 StreamMode::SnapshotThenSubscribe,
101 Some(self.selectors),
102 ftrace::Id::random(),
103 )
104 .fuse();
105 loop {
106 match select! {
107 log = log_stream.next() => Either::Left(log),
108 flush_request = flush_receiver.next() => Either::Right(flush_request),
109 } {
110 Either::Left(Some(log)) => {
112 writer.log(&log).await;
113 }
114 Either::Left(None) => {
117 break;
118 }
119 Either::Right(Some(flush_request)) => {
121 repo.flush().await;
126
127 while let Some(Some(log)) = log_stream.next().now_or_never() {
129 writer.log(&log).await;
130 }
131 writer.get_barrier().wait().await;
133
134 let _ = flush_request.unbounded_send(());
137 }
138 Either::Right(None) => {
140 break;
141 }
142 }
143 }
144 writer.get_barrier().wait().await;
146 }
147}
148
149#[derive(Default)]
151pub struct SerialSink;
152
153impl Write for SerialSink {
154 fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
155 if cfg!(debug_assertions) {
156 debug_assert!(buffer.len() <= MAX_SERIAL_WRITE_SIZE);
157 } else {
158 use std::sync::atomic::{AtomicBool, Ordering};
159 static ALREADY_LOGGED: AtomicBool = AtomicBool::new(false);
160 if buffer.len() > MAX_SERIAL_WRITE_SIZE && !ALREADY_LOGGED.swap(true, Ordering::Relaxed)
161 {
162 let size = buffer.len();
163 log::error!(
164 size;
165 "Skipping write to serial due to internal error. Exceeded max buffer size."
166 );
167 return Ok(buffer.len());
168 }
169 }
170 unsafe {
172 zx::sys::zx_debug_write(buffer.as_ptr(), buffer.len());
173 }
174 Ok(buffer.len())
175 }
176
177 fn flush(&mut self) -> io::Result<()> {
178 Ok(())
179 }
180}
181
/// Commands handled, in the order they were sent, by the worker thread spawned in
/// `SerialWriter::new`.
enum WorkerCommand {
    /// Write these bytes to the sink, then return the buffer to the writer for reuse.
    Write(Vec<u8>),
    /// Signal the given sender. Commands are handled sequentially, so every command sent
    /// earlier has been processed by the time the signal is sent.
    Flush(oneshot::Sender<()>),
}
189
/// Formats logs into bounded line buffers and ships them to a worker thread, which performs
/// the synchronous writes to the sink.
struct SerialWriter {
    /// Logs carrying any of these tags are not written.
    denied_tags: HashSet<String>,
    /// Channel used to send commands to the worker thread.
    sender: mpsc::UnboundedSender<WorkerCommand>,
    /// Buffers handed back by the worker thread after they were written, ready for reuse.
    buffers: mpsc::UnboundedReceiver<Vec<u8>>,
    /// The line being assembled. It has zero capacity right after it was sent to the worker.
    current_buffer: Vec<u8>,
}
200
201impl SerialWriter {
202 fn new<S: Write + Send + 'static>(mut sink: S, denied_tags: HashSet<String>) -> Self {
204 let (sender, mut receiver) = mpsc::unbounded();
205 let (buffer_sender, buffers) = mpsc::unbounded();
206
207 for _ in 0..31 {
209 buffer_sender.unbounded_send(Vec::with_capacity(MAX_SERIAL_WRITE_SIZE)).unwrap();
210 }
211
212 thread::spawn(move || {
213 block_on(async {
214 while let Some(command) = receiver.next().await {
215 match command {
216 WorkerCommand::Write(bytes) => {
217 let _ = sink.write(&bytes);
219 let _ = buffer_sender.unbounded_send(bytes);
221 }
222 WorkerCommand::Flush(notifier) => {
223 let _ = notifier.send(());
224 }
225 }
226 }
227 });
228 });
229
230 Self {
231 denied_tags,
232 sender,
233 buffers,
234 current_buffer: Vec::with_capacity(MAX_SERIAL_WRITE_SIZE),
235 }
236 }
237
238 async fn write(&mut self, mut bytes: &[u8]) {
240 while !bytes.is_empty() {
241 if self.current_buffer.capacity() == 0 {
242 self.current_buffer = self.buffers.next().await.unwrap();
243 self.current_buffer.clear();
244 }
245 let (part, rem) = bytes.split_at(std::cmp::min(self.space(), bytes.len()));
246 self.current_buffer.extend(part);
247 if !rem.is_empty() {
248 self.flush();
249 }
250 bytes = rem;
251 }
252 }
253
254 async fn io_writer(&mut self, required: usize) -> IoWriter<'_> {
260 assert!(required < MAX_SERIAL_WRITE_SIZE);
261 if self.current_buffer.capacity() == 0 || self.space() < required {
262 self.flush();
263 self.current_buffer = self.buffers.next().await.unwrap();
264 self.current_buffer.clear();
265 }
266 IoWriter(self)
267 }
268
269 fn flush(&mut self) {
271 if !self.current_buffer.is_empty() {
272 self.current_buffer.push(b'\n');
273 self.sender
274 .unbounded_send(WorkerCommand::Write(mem::take(&mut self.current_buffer)))
275 .unwrap();
276 }
277 }
278
279 fn get_barrier(&self) -> Barrier {
281 Barrier(self.sender.clone())
282 }
283
284 fn space(&self) -> usize {
286 MAX_SERIAL_WRITE_SIZE - 1 - self.current_buffer.len()
288 }
289
290 async fn log(&mut self, log: &Data<Logs>) {
292 if let Some(tags) = log.tags() {
293 if tags.iter().any(|tag| self.denied_tags.contains(tag)) {
294 return;
295 }
296 }
297
298 write!(
299 self.io_writer(64).await, "[{:05}.{:03}] {:05}:{:05}> [",
301 zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_seconds(),
302 zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_millis()
303 % 1000,
304 log.pid().unwrap_or(0),
305 log.tid().unwrap_or(0)
306 )
307 .unwrap();
308
309 if let Some(tags) = log.tags().filter(|tags| !tags.is_empty()) {
310 for (i, tag) in tags.iter().enumerate() {
311 self.write(tag.as_bytes()).await;
312 if i < tags.len() - 1 {
313 self.write(b", ").await;
314 }
315 }
316 } else {
317 self.write(log.component_name().as_bytes()).await;
318 }
319
320 self.write(b"]").await;
324
325 write!(self.io_writer(16).await, " {}: ", log.severity()).unwrap();
326 if let Some(m) = log.msg() {
327 self.write(m.as_bytes()).await;
328 }
329
330 for key_str in log.payload_keys_strings() {
331 self.write(b" ").await;
332 self.write(key_str.as_bytes()).await;
333 }
334
335 self.flush();
340 }
341}
342
343struct IoWriter<'a>(&'a mut SerialWriter);
344
345impl Write for IoWriter<'_> {
346 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
347 self.0.current_buffer.extend(buf);
348 Ok(buf.len())
349 }
350
351 fn flush(&mut self) -> io::Result<()> {
352 Ok(())
353 }
354}
355
356struct Barrier(mpsc::UnboundedSender<WorkerCommand>);
357
358impl Barrier {
359 async fn wait(&mut self) {
361 let (tx, rx) = oneshot::channel();
362
363 self.0.unbounded_send(WorkerCommand::Flush(tx)).unwrap();
364
365 let _ = rx.await;
367 }
368}
369
370#[cfg(test)]
371mod tests {
372 use fuchsia_async::{self as fasync};
373 use futures::channel::mpsc::{self, unbounded};
374 use futures::SinkExt;
375
376 use super::*;
377 use crate::identity::ComponentIdentity;
378 use crate::logs::testing::make_message;
379 use diagnostics_data::{BuilderArgs, LogsDataBuilder, LogsField, LogsProperty, Severity};
380 use fidl::endpoints::create_proxy;
381 use fidl_fuchsia_logger::LogSinkMarker;
382 use fuchsia_async::TimeoutExt;
383 use futures::FutureExt;
384 use moniker::ExtendedMoniker;
385 use std::sync::Mutex;
386 use std::time::Duration;
387 use zx::BootInstant;
388
389 struct TestSink {
391 buffer: Vec<u8>,
392 snd: mpsc::UnboundedSender<String>,
393 }
394
395 impl TestSink {
396 fn new() -> (Self, mpsc::UnboundedReceiver<String>) {
397 let (snd, rcv) = mpsc::unbounded();
398 (Self { buffer: Vec::new(), snd }, rcv)
399 }
400 }
401
402 impl Write for TestSink {
403 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
404 for chunk in buf.split_inclusive(|c| *c == b'\n') {
405 if !self.buffer.is_empty() {
406 self.buffer.extend(chunk);
407 if *self.buffer.last().unwrap() == b'\n' {
408 self.snd
409 .unbounded_send(
410 String::from_utf8(std::mem::take(&mut self.buffer))
411 .expect("wrote valid utf8"),
412 )
413 .expect("sent item");
414 }
415 } else if *chunk.last().unwrap() == b'\n' {
416 self.snd
417 .unbounded_send(str::from_utf8(chunk).expect("wrote valid utf8").into())
418 .unwrap();
419 } else {
420 self.buffer.extend(chunk);
421 }
422 }
423 Ok(buf.len())
424 }
425
426 fn flush(&mut self) -> io::Result<()> {
427 Ok(())
428 }
429 }
430
/// Sink that appends everything written to it to a byte buffer shared between its clones.
#[derive(Clone, Default)]
struct FakeSink(Arc<Mutex<Vec<u8>>>);

impl FakeSink {
    /// Runs `f` on the bytes written so far, holding the lock for the duration of the call.
    fn with_buffer<R>(&self, f: impl FnOnce(&Vec<u8>) -> R) -> R {
        let contents = self.0.lock().unwrap();
        f(&contents)
    }
}

impl Write for FakeSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let mut contents = self.0.lock().unwrap();
        contents.write(buf)
    }

    /// The code under test is not expected to flush the sink.
    fn flush(&mut self) -> io::Result<()> {
        unreachable!();
    }
}
450
451 #[fuchsia::test]
452 async fn write_to_serial_handles_denied_tags() {
453 let log = LogsDataBuilder::new(BuilderArgs {
454 timestamp: BootInstant::from_nanos(1),
455 component_url: Some("url".into()),
456 moniker: "core/foo".try_into().unwrap(),
457 severity: Severity::Info,
458 })
459 .add_tag("denied-tag")
460 .build();
461 let sink = FakeSink::default();
462 let mut writer =
463 SerialWriter::new(sink.clone(), HashSet::from_iter(["denied-tag".to_string()]));
464 writer.log(&log).await;
465 writer.get_barrier().wait().await;
466 assert!(sink.with_buffer(|b| b.is_empty()));
467 }
468
469 #[fuchsia::test]
470 async fn write_to_serial_splits_lines() {
471 let message = concat!(
472 "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam accumsan eu neque ",
473 "quis molestie. Nam rhoncus sapien non eleifend tristique. Duis quis turpis volutpat ",
474 "neque bibendum molestie. Etiam ac sapien justo. Nullam aliquet ipsum nec tincidunt."
475 );
476 let log = LogsDataBuilder::new(BuilderArgs {
477 timestamp: BootInstant::from_nanos(123456789),
478 component_url: Some("url".into()),
479 moniker: "core/foo".try_into().unwrap(),
480 severity: Severity::Info,
481 })
482 .add_tag("bar")
483 .set_message(message)
484 .add_key(LogsProperty::String(LogsField::Other("key".to_string()), "value".to_string()))
485 .add_key(LogsProperty::Int(LogsField::Other("other_key".to_string()), 3))
486 .set_pid(1234)
487 .set_tid(5678)
488 .build();
489 let sink = FakeSink::default();
490 let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
491 writer.log(&log).await;
492 writer.get_barrier().wait().await;
493 sink.with_buffer(|b| {
494 assert_eq!(
495 str::from_utf8(b).unwrap(),
496 format!(
497 "[00000.123] 01234:05678> [bar] INFO: {}\n{} key=value other_key=3\n",
498 &message[..218],
499 &message[218..]
500 )
501 )
502 });
503 }
504
505 #[fuchsia::test]
506 async fn when_no_tags_are_present_the_component_name_is_used() {
507 let log = LogsDataBuilder::new(BuilderArgs {
508 timestamp: BootInstant::from_nanos(123456789),
509 component_url: Some("url".into()),
510 moniker: "core/foo".try_into().unwrap(),
511 severity: Severity::Info,
512 })
513 .set_message("my msg")
514 .set_pid(1234)
515 .set_tid(5678)
516 .build();
517 let sink = FakeSink::default();
518 let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
519 writer.log(&log).await;
520 writer.flush();
521 writer.get_barrier().wait().await;
522 sink.with_buffer(|b| {
523 assert_eq!(str::from_utf8(b).unwrap(), "[00000.123] 01234:05678> [foo] INFO: my msg\n");
524 });
525 }
526
/// Checks that nothing is written to serial while a freeze token is held and that forwarding
/// resumes once the token is dropped.
#[fuchsia::test]
async fn pauses_logs_correctly() {
    let repo = LogsRepository::for_test(fasync::Scope::new());

    // Two components under `bootstrap` and two under `core`. The allow-list used below only
    // covers `bootstrap/**` and `/core/foo`.
    let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
        ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
        "fuchsia-pkg://bootstrap-foo",
    )));
    let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
        ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
        "fuchsia-pkg://bootstrap-bar",
    )));

    let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
        ExtendedMoniker::parse_str("./core/foo").unwrap(),
        "fuchsia-pkg://core-foo",
    )));
    let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
        ExtendedMoniker::parse_str("./core/baz").unwrap(),
        "fuchsia-pkg://core-baz",
    )));

    // Ingested before the serial task exists.
    bootstrap_foo_container.ingest_message(make_message(
        "a",
        None,
        zx::BootInstant::from_nanos(1),
    ));

    // The sender is kept alive: forwarding stops when the flush channel is closed.
    let (_flush_sender, flush_receiver) = unbounded();

    // `core/baz` is not in the allow-list.
    core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
    let (sink, mut rcv) = TestSink::new();
    let cloned_repo = Arc::clone(&repo);
    let (mut sender, receiver) = unbounded();
    let _serial_task = fasync::Task::spawn(async move {
        let allowed = vec!["bootstrap/**".into(), "/core/foo".into()];
        let denied = vec!["foo".into()];
        launch_serial(allowed, denied, cloned_repo, sink, receiver, flush_receiver).await;
    });
    // Presumably the second argument of `make_message` is the tag; "foo" is a denied tag.
    bootstrap_bar_container.ingest_message(make_message(
        "b",
        Some("foo"),
        zx::BootInstant::from_nanos(3),
    ));

    let received = rcv.next().await.unwrap();

    // `[foo]` is the component name here: the message was created without a tag.
    assert_eq!(received, "[00000.000] 00001:00002> [foo] DEBUG: a\n");

    // Request a freeze and wait for the token.
    let (tx, rx) = oneshot::channel();
    sender.send(tx).await.unwrap();

    let freeze_token = rx.await.unwrap();

    core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));

    // While the token is held, no line may show up: the timeout has to fire.
    assert!(rcv.next().map(|_| false).on_timeout(Duration::from_millis(500), || true).await);

    // Dropping the token closes its end of the event pair, which ends the freeze.
    drop(freeze_token);

    assert_eq!(rcv.next().await.unwrap(), "[00000.000] 00001:00002> [foo] DEBUG: c\n");
}
591
/// Checks that `SerialConfig::write_logs` only forwards logs from allowed components and skips
/// logs carrying a denied tag.
#[fuchsia::test]
async fn writes_ingested_logs() {
    let serial_config = SerialConfig::new(vec!["bootstrap/**", "/core/foo"]);
    let repo = LogsRepository::for_test(fasync::Scope::new());

    let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
        ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
        "fuchsia-pkg://bootstrap-foo",
    )));
    let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
        ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
        "fuchsia-pkg://bootstrap-bar",
    )));

    let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
        ExtendedMoniker::parse_str("./core/foo").unwrap(),
        "fuchsia-pkg://core-foo",
    )));
    let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
        ExtendedMoniker::parse_str("./core/baz").unwrap(),
        "fuchsia-pkg://core-baz",
    )));

    // Both of these are ingested before the forwarding task starts. `core/baz` is not
    // covered by the selectors above.
    bootstrap_foo_container.ingest_message(make_message(
        "a",
        None,
        zx::BootInstant::from_nanos(1),
    ));
    core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
    let (sink, rcv) = TestSink::new();

    // "foo" is the denied tag.
    let writer = SerialWriter::new(sink, HashSet::from_iter(["foo".to_string()]));
    // The sender is kept alive: forwarding stops when the flush channel is closed.
    let (_flush_sender, flush_receiver) = unbounded();
    let _serial_task = fasync::Task::spawn(serial_config.write_logs(
        Arc::clone(&repo),
        writer,
        flush_receiver,
    ));
    // Presumably the second argument of `make_message` is the tag, which is denied here.
    bootstrap_bar_container.ingest_message(make_message(
        "b",
        Some("foo"),
        zx::BootInstant::from_nanos(3),
    ));
    core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));
    let received: Vec<_> = rcv.take(2).collect().await;

    // Only "a" (bootstrap/foo) and the second "c" (core/foo) are expected. `[foo]` is the
    // component name, as neither message has a tag.
    assert_eq!(
        received,
        vec![
            "[00000.000] 00001:00002> [foo] DEBUG: a\n",
            "[00000.000] 00001:00002> [foo] DEBUG: c\n"
        ]
    );
}
649
/// Checks that a log written to the socket before a flush request is already on serial when
/// the flush is acknowledged.
#[fuchsia::test]
async fn flush_drains_all_logs() {
    // NOTE(review): the scenario is repeated 500 times, presumably to catch ordering races.
    for _ in 0..500 {
        let scope = fasync::Scope::new();
        let repo = LogsRepository::for_test(scope.new_child());
        let identity = Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        ));

        // Connect a structured log socket to the component's log container.
        let (log_sink, server_end) = create_proxy::<LogSinkMarker>();
        let log_container = repo.get_log_container(identity);
        log_container.handle_log_sink(server_end.into_stream(), scope.as_handle().clone());
        let (client_socket, server_socket) = zx::Socket::create_datagram();
        log_sink.connect_structured(server_socket).unwrap();

        let (sink, mut rcv) = TestSink::new();
        let cloned_repo = Arc::clone(&repo);
        let (_freeze_sender, freeze_receiver) = unbounded();
        let (flush_sender, flush_receiver) = unbounded();
        scope.spawn(async move {
            let allowed = vec!["bootstrap/**".into()];
            let denied = vec![];
            launch_serial(allowed, denied, cloned_repo, sink, freeze_receiver, flush_receiver)
                .await;
        });

        let mut buffer = [0u8; 1024];
        let cursor = std::io::Cursor::new(&mut buffer[..]);
        let mut encoder = diagnostics_log_encoding::encode::Encoder::new(
            cursor,
            diagnostics_log_encoding::encode::EncoderOpts::default(),
        );
        log_sink.wait_for_interest_change().await.unwrap().unwrap();

        // Encode one record and write the encoded bytes to the socket.
        encoder
            .write_record(diagnostics_log_encoding::Record {
                timestamp: zx::BootInstant::from_nanos(1),
                severity: Severity::Info as u8,
                arguments: vec![
                    diagnostics_log_encoding::Argument::new("tag", "foo"),
                    diagnostics_log_encoding::Argument::new("message", "a"),
                ],
            })
            .unwrap();
        client_socket
            .write(&encoder.inner().get_ref()[..encoder.inner().position() as usize])
            .unwrap();
        // Request a flush and wait for the acknowledgement.
        let (sender, mut receiver) = unbounded();
        flush_sender.unbounded_send(sender).unwrap();
        assert_eq!(receiver.next().await, Some(()));
        // `now_or_never`: the line has to be available already, without any further waiting.
        let received = rcv.next().now_or_never().unwrap().unwrap();
        assert_eq!(received, "[00000.000] 00000:00000> [foo] INFO: a\n");
    }
}
709}