1use crate::{PublishError, SeverityExt};
5use diagnostics_log_encoding::encode::{
6 Encoder, EncoderOpts, EncodingError, MutableBuffer, RecordEvent, TestRecord, WriteEventParams,
7};
8use diagnostics_log_encoding::{Argument, Metatag, RawSeverity};
9use fidl::endpoints::ClientEnd;
10use fidl_fuchsia_logger::{LogSinkMarker, LogSinkSynchronousProxy, MAX_DATAGRAM_LEN_BYTES};
11use fuchsia_runtime as rt;
12use std::borrow::Cow;
13use std::collections::HashSet;
14use std::io::Cursor;
15use std::sync::atomic::{AtomicU32, Ordering};
16use zx::{self as zx, AsHandleRef};
17
18#[derive(Default)]
19pub(crate) struct SinkConfig {
20 pub(crate) metatags: HashSet<Metatag>,
21 pub(crate) retry_on_buffer_full: bool,
22 pub(crate) tags: Vec<String>,
23 pub(crate) always_log_file_line: bool,
24}
25
26thread_local! {
27 static PROCESS_ID: zx::Koid =
28 rt::process_self().get_koid().unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX));
29 static THREAD_ID: zx::Koid = rt::with_thread_self(|thread| {
30 thread.get_koid().unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX))
31 });
32}
33
34pub(crate) struct Sink {
35 socket: zx::Socket,
36 num_events_dropped: AtomicU32,
37 config: SinkConfig,
38}
39
40impl Sink {
41 pub fn new(
42 log_sink: &ClientEnd<LogSinkMarker>,
43 config: SinkConfig,
44 ) -> Result<Self, PublishError> {
45 let (socket, remote_socket) = zx::Socket::create_datagram();
46 let log_sink = zx::Unowned::<LogSinkSynchronousProxy>::new(log_sink.channel());
47 log_sink.connect_structured(remote_socket).map_err(PublishError::SendSocket)?;
48 Ok(Self { socket, config, num_events_dropped: AtomicU32::new(0) })
49 }
50}
51
52impl Sink {
53 #[inline]
54 fn encode_and_send(
55 &self,
56 encode: impl FnOnce(&mut Encoder<Cursor<&mut [u8]>>, u32) -> Result<(), EncodingError>,
57 ) {
58 let ordering = Ordering::Relaxed;
59 let previously_dropped = self.num_events_dropped.swap(0, ordering);
60 let restore_and_increment_dropped_count = || {
61 self.num_events_dropped.fetch_add(previously_dropped + 1, ordering);
62 };
63
64 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
65 let mut encoder = Encoder::new(
66 Cursor::new(&mut buf[..]),
67 EncoderOpts { always_log_file_line: self.config.always_log_file_line },
68 );
69 if encode(&mut encoder, previously_dropped).is_err() {
70 restore_and_increment_dropped_count();
71 return;
72 }
73
74 let end = encoder.inner().cursor();
75 let packet = &encoder.inner().get_ref()[..end];
76 self.send(packet, restore_and_increment_dropped_count);
77 }
78
79 fn send(&self, packet: &[u8], on_error: impl Fn()) {
80 while let Err(status) = self.socket.write(packet) {
81 if status != zx::Status::SHOULD_WAIT || !self.config.retry_on_buffer_full {
82 on_error();
83 break;
84 }
85 let Ok(signals) = self
86 .socket
87 .wait_handle(
88 zx::Signals::SOCKET_PEER_CLOSED | zx::Signals::SOCKET_WRITABLE,
89 zx::MonotonicInstant::INFINITE,
90 )
91 .to_result()
92 else {
93 on_error();
94 break;
95 };
96 if signals.contains(zx::Signals::SOCKET_PEER_CLOSED) {
97 on_error();
98 break;
99 }
100 }
101 }
102
103 pub(crate) fn record_log(&self, record: &log::Record<'_>) {
104 self.encode_and_send(|encoder, previously_dropped| {
105 encoder.write_event(WriteEventParams {
106 event: LogEvent::new(record),
107 tags: &self.config.tags,
108 metatags: self.config.metatags.iter(),
109 pid: PROCESS_ID.with(|p| *p),
110 tid: THREAD_ID.with(|t| *t),
111 dropped: previously_dropped.into(),
112 })
113 });
114 }
115
116 pub fn event_for_testing(&self, record: TestRecord<'_>) {
117 self.encode_and_send(move |encoder, previously_dropped| {
118 encoder.write_event(WriteEventParams {
119 event: record,
120 tags: &self.config.tags,
121 metatags: std::iter::empty(),
122 pid: PROCESS_ID.with(|p| *p),
123 tid: THREAD_ID.with(|t| *t),
124 dropped: previously_dropped.into(),
125 })
126 });
127 }
128}
129
130#[doc(hidden)]
131pub struct LogEvent<'a> {
132 record: &'a log::Record<'a>,
133 timestamp: zx::BootInstant,
134}
135
136impl<'a> LogEvent<'a> {
137 pub fn new(record: &'a log::Record<'a>) -> Self {
138 Self { record, timestamp: zx::BootInstant::get() }
139 }
140}
141
142impl RecordEvent for LogEvent<'_> {
143 fn raw_severity(&self) -> RawSeverity {
144 self.record.metadata().raw_severity()
145 }
146
147 fn file(&self) -> Option<&str> {
148 self.record.file()
149 }
150
151 fn line(&self) -> Option<u32> {
152 self.record.line()
153 }
154
155 fn target(&self) -> &str {
156 self.record.target()
157 }
158
159 fn timestamp(&self) -> zx::BootInstant {
160 self.timestamp
161 }
162
163 fn write_arguments<B: MutableBuffer>(
164 self,
165 writer: &mut Encoder<B>,
166 ) -> Result<(), EncodingError> {
167 let args = self.record.args();
168 let message =
169 args.as_str().map(Cow::Borrowed).unwrap_or_else(|| Cow::Owned(args.to_string()));
170 writer.write_argument(Argument::message(message))?;
171 self.record
172 .key_values()
173 .visit(&mut KeyValuesVisitor(writer))
174 .map_err(EncodingError::other)?;
175 Ok(())
176 }
177}
178
179struct KeyValuesVisitor<'a, B>(&'a mut Encoder<B>);
180
181impl<B: MutableBuffer> log::kv::VisitSource<'_> for KeyValuesVisitor<'_, B> {
182 fn visit_pair(
183 &mut self,
184 key: log::kv::Key<'_>,
185 value: log::kv::Value<'_>,
186 ) -> Result<(), log::kv::Error> {
187 value.visit(ValueVisitor { encoder: self.0, key: key.as_str() })
188 }
189}
190
191struct ValueVisitor<'a, B> {
192 encoder: &'a mut Encoder<B>,
193 key: &'a str,
194}
195
196impl<B: MutableBuffer> log::kv::VisitValue<'_> for ValueVisitor<'_, B> {
197 fn visit_any(&mut self, value: log::kv::Value<'_>) -> Result<(), log::kv::Error> {
198 self.encoder
199 .write_raw_argument(self.key, format!("{value}"))
200 .map_err(log::kv::Error::boxed)?;
201 Ok(())
202 }
203
204 fn visit_null(&mut self) -> Result<(), log::kv::Error> {
205 self.encoder.write_raw_argument(self.key, "null").map_err(log::kv::Error::boxed)?;
206 Ok(())
207 }
208
209 fn visit_u64(&mut self, value: u64) -> Result<(), log::kv::Error> {
210 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
211 Ok(())
212 }
213
214 fn visit_i64(&mut self, value: i64) -> Result<(), log::kv::Error> {
215 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
216 Ok(())
217 }
218
219 fn visit_f64(&mut self, value: f64) -> Result<(), log::kv::Error> {
220 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
221 Ok(())
222 }
223
224 fn visit_bool(&mut self, value: bool) -> Result<(), log::kv::Error> {
225 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
226 Ok(())
227 }
228
229 fn visit_str(&mut self, value: &str) -> Result<(), log::kv::Error> {
230 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
231 Ok(())
232 }
233
234 }
237
238#[cfg(test)]
239mod tests {
240 use super::*;
241 use crate::{increment_clock, log_every_n_seconds};
242 use diagnostics_log_encoding::parse::parse_record;
243 use diagnostics_log_encoding::{Argument, Record};
244 use diagnostics_log_types::Severity;
245 use fidl::endpoints::create_request_stream;
246 use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequest};
247 use futures::stream::StreamExt;
248 use futures::AsyncReadExt;
249 use log::{debug, error, info, trace, warn};
250 use std::sync::{Arc, Mutex};
251 use std::time::Duration;
252 use test_util::assert_gt;
253 use zx::Status;
254
255 const TARGET: &str = "diagnostics_log_lib_test::fuchsia::sink::tests";
256
257 struct TestLogger {
258 sink: Sink,
259 }
260
261 impl TestLogger {
262 fn new(sink: Sink) -> Self {
263 Self { sink }
264 }
265 }
266
267 impl log::Log for TestLogger {
268 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
269 true
270 }
271
272 fn log(&self, record: &log::Record<'_>) {
273 if self.enabled(record.metadata()) {
274 self.sink.record_log(record);
275 }
276 }
277
278 fn flush(&self) {}
279 }
280
281 async fn init_sink(config: SinkConfig) -> fidl::Socket {
282 let (client, mut requests) = create_request_stream::<LogSinkMarker>();
283 let sink = Sink::new(&client, config).unwrap();
284 log::set_boxed_logger(Box::new(TestLogger::new(sink))).expect("set logger");
285 log::set_max_level(log::LevelFilter::Info);
286
287 match requests.next().await.unwrap().unwrap() {
288 LogSinkRequest::ConnectStructured { socket, .. } => socket,
289 _ => panic!("sink ctor sent the wrong message"),
290 }
291 }
292
293 fn arg_prefix() -> Vec<Argument<'static>> {
294 vec![Argument::pid(PROCESS_ID.with(|p| *p)), Argument::tid(THREAD_ID.with(|t| *t))]
295 }
296
297 #[fuchsia::test(logging = false)]
298 async fn wait_and_retry_is_possible() {
299 const TOTAL_WRITES: usize = 32 * 5;
302 let (client, mut requests) = create_request_stream::<LogSinkMarker>();
303 std::thread::spawn(move || {
305 let sink = Sink::new(
306 &client,
307 SinkConfig { retry_on_buffer_full: true, ..SinkConfig::default() },
308 )
309 .unwrap();
310 for i in 0..TOTAL_WRITES {
311 let buf = [i as u8; MAX_DATAGRAM_LEN_BYTES as _];
312 sink.send(&buf, || unreachable!("We should never drop a log in this test"));
313 }
314 });
315 let socket = match requests.next().await.unwrap().unwrap() {
316 LogSinkRequest::ConnectStructured { socket, .. } => socket,
317 _ => panic!("sink ctor sent the wrong message"),
318 };
319 let mut socket = fuchsia_async::Socket::from_socket(socket);
320 for i in 0..TOTAL_WRITES {
323 let mut buf = vec![0u8; MAX_DATAGRAM_LEN_BYTES as _];
324 let len = socket.read(&mut buf).await.unwrap();
325 assert_eq!(len, MAX_DATAGRAM_LEN_BYTES as usize);
326 assert_eq!(buf, vec![i as u8; MAX_DATAGRAM_LEN_BYTES as _]);
327 std::thread::sleep(std::time::Duration::from_millis(50));
328 }
329 }
330
331 #[fuchsia::test(logging = false)]
332 async fn packets_are_sent() {
333 let socket = init_sink(SinkConfig {
334 metatags: HashSet::from([Metatag::Target]),
335 ..SinkConfig::default()
336 })
337 .await;
338 log::set_max_level(log::LevelFilter::Trace);
339 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
340 let mut next_message = || {
341 let len = socket.read(&mut buf).unwrap();
342 let (record, _) = parse_record(&buf[..len]).unwrap();
343 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
344 record.into_owned()
345 };
346
347 trace!(count = 123; "whoa this is noisy");
349 let observed_trace = next_message();
350 debug!(maybe = true; "don't try this at home");
351 let observed_debug = next_message();
352 info!("this is a message");
353 let observed_info = next_message();
354 warn!(reason = "just cuz"; "this is a warning");
355 let observed_warn = next_message();
356 error!(e = "something went pretty wrong"; "this is an error");
357 let error_line = line!() - 1;
358 let metatag = Argument::tag(TARGET);
359 let observed_error = next_message();
360
361 {
363 let mut expected_trace = Record {
364 timestamp: observed_trace.timestamp,
365 severity: Severity::Trace as u8,
366 arguments: arg_prefix(),
367 };
368 expected_trace.arguments.push(metatag.clone());
369 expected_trace.arguments.push(Argument::message("whoa this is noisy"));
370 expected_trace.arguments.push(Argument::new("count", 123));
371 assert_eq!(observed_trace, expected_trace);
372 }
373
374 {
376 let mut expected_debug = Record {
377 timestamp: observed_debug.timestamp,
378 severity: Severity::Debug as u8,
379 arguments: arg_prefix(),
380 };
381 expected_debug.arguments.push(metatag.clone());
382 expected_debug.arguments.push(Argument::message("don't try this at home"));
383 expected_debug.arguments.push(Argument::new("maybe", true));
384 assert_eq!(observed_debug, expected_debug);
385 }
386
387 {
389 let mut expected_info = Record {
390 timestamp: observed_info.timestamp,
391 severity: Severity::Info as u8,
392 arguments: arg_prefix(),
393 };
394 expected_info.arguments.push(metatag.clone());
395 expected_info.arguments.push(Argument::message("this is a message"));
396 assert_eq!(observed_info, expected_info);
397 }
398
399 {
401 let mut expected_warn = Record {
402 timestamp: observed_warn.timestamp,
403 severity: Severity::Warn as u8,
404 arguments: arg_prefix(),
405 };
406 expected_warn.arguments.push(metatag.clone());
407 expected_warn.arguments.push(Argument::message("this is a warning"));
408 expected_warn.arguments.push(Argument::new("reason", "just cuz"));
409 assert_eq!(observed_warn, expected_warn);
410 }
411
412 {
414 let mut expected_error = Record {
415 timestamp: observed_error.timestamp,
416 severity: Severity::Error as u8,
417 arguments: arg_prefix(),
418 };
419 expected_error
420 .arguments
421 .push(Argument::file("src/lib/diagnostics/log/rust/src/fuchsia/sink.rs"));
422 expected_error.arguments.push(Argument::line(error_line as u64));
423 expected_error.arguments.push(metatag);
424 expected_error.arguments.push(Argument::message("this is an error"));
425 expected_error.arguments.push(Argument::new("e", "something went pretty wrong"));
426 assert_eq!(observed_error, expected_error);
427 }
428 }
429
430 #[fuchsia::test(logging = false)]
431 async fn tags_are_sent() {
432 let socket = init_sink(SinkConfig {
433 tags: vec!["tags_are_sent".to_string()],
434 ..SinkConfig::default()
435 })
436 .await;
437 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
438 let mut next_message = || {
439 let len = socket.read(&mut buf).unwrap();
440 let (record, _) = parse_record(&buf[..len]).unwrap();
441 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
442 record.into_owned()
443 };
444
445 info!("this should have a tag");
446 let observed = next_message();
447
448 let mut expected = Record {
449 timestamp: observed.timestamp,
450 severity: Severity::Info as u8,
451 arguments: arg_prefix(),
452 };
453 expected.arguments.push(Argument::message("this should have a tag"));
454 expected.arguments.push(Argument::tag("tags_are_sent"));
455 assert_eq!(observed, expected);
456 }
457
458 #[fuchsia::test(logging = false)]
459 async fn log_every_n_seconds_test() {
460 let socket = init_sink(SinkConfig { ..SinkConfig::default() }).await;
461 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
462 let next_message = |buf: &mut [u8]| {
463 let len = socket.read(buf).unwrap();
464 let (record, _) = parse_record(&buf[..len]).unwrap();
465 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
466 record.into_owned()
467 };
468
469 let log_fn = || {
470 log_every_n_seconds!(5, INFO, "test message");
471 };
472
473 let expect_message = |buf: &mut [u8]| {
474 let observed = next_message(buf);
475
476 let mut expected = Record {
477 timestamp: observed.timestamp,
478 severity: Severity::Info as u8,
479 arguments: arg_prefix(),
480 };
481 expected.arguments.push(Argument::message("test message"));
482 assert_eq!(observed, expected);
483 };
484
485 log_fn();
486 expect_message(&mut buf);
488 log_fn();
489 assert_eq!(socket.read(&mut buf), Err(Status::SHOULD_WAIT));
492 increment_clock(Duration::from_secs(5));
493
494 log_fn();
496 expect_message(&mut buf);
497 }
498
499 #[fuchsia::test(logging = false)]
500 async fn drop_count_is_tracked() {
501 let socket = init_sink(SinkConfig::default()).await;
502 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
503 const MESSAGE_SIZE: usize = 104;
504 const MESSAGE_SIZE_WITH_DROPS: usize = 136;
505 const NUM_DROPPED: usize = 100;
506
507 let socket_capacity = || {
508 let info = socket.info().unwrap();
509 info.rx_buf_max - info.rx_buf_size
510 };
511 let emit_message = || info!("it's-a-me, a message-o");
512 let mut drain_message = |with_drops| {
513 let len = socket.read(&mut buf).unwrap();
514
515 let expected_len = if with_drops { MESSAGE_SIZE_WITH_DROPS } else { MESSAGE_SIZE };
516 assert_eq!(len, expected_len, "constant message size is used to calculate thresholds");
517
518 let (record, _) = parse_record(&buf[..len]).unwrap();
519 let mut expected_args = arg_prefix();
520
521 if with_drops {
522 expected_args.push(Argument::dropped(NUM_DROPPED as u64));
523 }
524
525 expected_args.push(Argument::message("it's-a-me, a message-o"));
526
527 assert_eq!(
528 record,
529 Record {
530 timestamp: record.timestamp,
531 severity: Severity::Info as u8,
532 arguments: expected_args
533 }
534 );
535 };
536
537 let mut num_emitted = 0;
539 while socket_capacity() > MESSAGE_SIZE {
540 emit_message();
541 num_emitted += 1;
542 assert_eq!(
543 socket.info().unwrap().rx_buf_size,
544 num_emitted * MESSAGE_SIZE,
545 "incorrect bytes stored after {} messages sent",
546 num_emitted
547 );
548 }
549
550 for _ in 0..NUM_DROPPED {
552 emit_message();
553 }
554
555 drain_message(false);
562 drain_message(false);
563 num_emitted -= 2;
565 emit_message();
567 for _ in 0..num_emitted {
569 drain_message(false);
570 }
571 drain_message(true);
573
574 emit_message();
576 drain_message(false);
577 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "must drain all messages");
578 }
579
580 #[fuchsia::test(logging = false)]
581 async fn build_record_from_log_event() {
582 let before_timestamp = zx::BootInstant::get();
583 let last_record = Arc::new(Mutex::new(None));
584 let logger = TrackerLogger::new(last_record.clone());
585 log::set_boxed_logger(Box::new(logger)).expect("set logger");
586 log::set_max_level(log::LevelFilter::Info);
587 log::info!(
588 is_a_str = "hahaha",
589 is_debug:? = PrintMe(5),
590 is_signed = -500,
591 is_unsigned = 1000u64,
592 is_bool = false;
593 "blarg this is a message"
594 );
595
596 let guard = last_record.lock().unwrap();
597 let encoder = guard.as_ref().unwrap();
598 let (record, _) = parse_record(encoder.inner().get_ref()).expect("wrote valid record");
599 assert_gt!(record.timestamp, before_timestamp);
600 assert_eq!(
601 record,
602 Record {
603 timestamp: record.timestamp,
604 severity: Severity::Info as u8,
605 arguments: vec![
606 Argument::pid(PROCESS_ID.with(|p| *p)),
607 Argument::tid(THREAD_ID.with(|p| *p)),
608 Argument::tag("diagnostics_log_lib_test::fuchsia::sink::tests"),
609 Argument::message("blarg this is a message"),
610 Argument::other("is_a_str", "hahaha"),
611 Argument::other("is_debug", "PrintMe(5)"),
612 Argument::other("is_signed", -500),
613 Argument::other("is_unsigned", 1000u64),
614 Argument::other("is_bool", false),
615 Argument::tag("a-tag"),
616 ]
617 }
618 );
619 }
620
621 #[derive(Debug)]
623 struct PrintMe(#[allow(unused)] u32);
624
625 type ByteEncoder = Encoder<Cursor<[u8; 1024]>>;
626
627 struct TrackerLogger {
628 last_record: Arc<Mutex<Option<ByteEncoder>>>,
629 }
630
631 impl TrackerLogger {
632 fn new(last_record: Arc<Mutex<Option<ByteEncoder>>>) -> Self {
633 Self { last_record }
634 }
635 }
636
637 impl log::Log for TrackerLogger {
638 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
639 true
640 }
641
642 fn log(&self, record: &log::Record<'_>) {
643 let mut encoder = Encoder::new(Cursor::new([0u8; 1024]), EncoderOpts::default());
644 encoder
645 .write_event(WriteEventParams {
646 event: LogEvent::new(record),
647 tags: &["a-tag"],
648 metatags: [Metatag::Target].iter(),
649 pid: PROCESS_ID.with(|p| *p),
650 tid: THREAD_ID.with(|t| *t),
651 dropped: 0,
652 })
653 .expect("wrote event");
654 let mut last_record = self.last_record.lock().unwrap();
655 last_record.replace(encoder);
656 }
657
658 fn flush(&self) {}
659 }
660}