1use crate::{PublishError, SeverityExt};
5use diagnostics_log_encoding::encode::{
6 Encoder, EncoderOpts, EncodingError, MutableBuffer, RecordEvent, TestRecord, WriteEventParams,
7};
8use diagnostics_log_encoding::{Argument, Metatag, RawSeverity};
9use fidl_fuchsia_logger::{LogSinkProxy, MAX_DATAGRAM_LEN_BYTES};
10use fuchsia_runtime as rt;
11use std::borrow::Cow;
12use std::collections::HashSet;
13use std::io::Cursor;
14use std::sync::atomic::{AtomicU32, Ordering};
15use zx::{self as zx, AsHandleRef};
16
17#[derive(Default)]
18pub(crate) struct SinkConfig {
19 pub(crate) metatags: HashSet<Metatag>,
20 pub(crate) retry_on_buffer_full: bool,
21 pub(crate) tags: Vec<String>,
22 pub(crate) always_log_file_line: bool,
23}
24
25thread_local! {
26 static PROCESS_ID: zx::Koid =
27 rt::process_self().get_koid().expect("couldn't read our own process koid");
28 static THREAD_ID: zx::Koid = rt::thread_self()
29 .get_koid()
30 .expect("couldn't read our own thread id");
31}
32
33pub(crate) struct Sink {
34 socket: zx::Socket,
35 num_events_dropped: AtomicU32,
36 config: SinkConfig,
37}
38
39impl Sink {
40 pub fn new(log_sink: &LogSinkProxy, config: SinkConfig) -> Result<Self, PublishError> {
41 let (socket, remote_socket) = zx::Socket::create_datagram();
42 log_sink.connect_structured(remote_socket).map_err(PublishError::SendSocket)?;
43 Ok(Self { socket, config, num_events_dropped: AtomicU32::new(0) })
44 }
45}
46
47impl Sink {
48 #[inline]
49 fn encode_and_send(
50 &self,
51 encode: impl FnOnce(&mut Encoder<Cursor<&mut [u8]>>, u32) -> Result<(), EncodingError>,
52 ) {
53 let ordering = Ordering::Relaxed;
54 let previously_dropped = self.num_events_dropped.swap(0, ordering);
55 let restore_and_increment_dropped_count = || {
56 self.num_events_dropped.fetch_add(previously_dropped + 1, ordering);
57 };
58
59 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
60 let mut encoder = Encoder::new(
61 Cursor::new(&mut buf[..]),
62 EncoderOpts { always_log_file_line: self.config.always_log_file_line },
63 );
64 if encode(&mut encoder, previously_dropped).is_err() {
65 restore_and_increment_dropped_count();
66 return;
67 }
68
69 let end = encoder.inner().cursor();
70 let packet = &encoder.inner().get_ref()[..end];
71 self.send(packet, restore_and_increment_dropped_count);
72 }
73
74 fn send(&self, packet: &[u8], on_error: impl Fn()) {
75 while let Err(status) = self.socket.write(packet) {
76 if status != zx::Status::SHOULD_WAIT || !self.config.retry_on_buffer_full {
77 on_error();
78 break;
79 }
80 let Ok(signals) = self.socket.wait_handle(
81 zx::Signals::SOCKET_PEER_CLOSED | zx::Signals::SOCKET_WRITABLE,
82 zx::MonotonicInstant::INFINITE,
83 ) else {
84 on_error();
85 break;
86 };
87 if signals.contains(zx::Signals::SOCKET_PEER_CLOSED) {
88 on_error();
89 break;
90 }
91 }
92 }
93
94 pub(crate) fn record_log(&self, record: &log::Record<'_>) {
95 self.encode_and_send(|encoder, previously_dropped| {
96 encoder.write_event(WriteEventParams {
97 event: LogEvent::new(record),
98 tags: &self.config.tags,
99 metatags: self.config.metatags.iter(),
100 pid: PROCESS_ID.with(|p| *p),
101 tid: THREAD_ID.with(|t| *t),
102 dropped: previously_dropped.into(),
103 })
104 });
105 }
106
107 pub fn event_for_testing(&self, record: TestRecord<'_>) {
108 self.encode_and_send(move |encoder, previously_dropped| {
109 encoder.write_event(WriteEventParams {
110 event: record,
111 tags: &self.config.tags,
112 metatags: std::iter::empty(),
113 pid: PROCESS_ID.with(|p| *p),
114 tid: THREAD_ID.with(|t| *t),
115 dropped: previously_dropped.into(),
116 })
117 });
118 }
119}
120
121#[doc(hidden)]
122pub struct LogEvent<'a> {
123 record: &'a log::Record<'a>,
124 timestamp: zx::BootInstant,
125}
126
127impl<'a> LogEvent<'a> {
128 pub fn new(record: &'a log::Record<'a>) -> Self {
129 Self { record, timestamp: zx::BootInstant::get() }
130 }
131}
132
133impl RecordEvent for LogEvent<'_> {
134 fn raw_severity(&self) -> RawSeverity {
135 self.record.metadata().raw_severity()
136 }
137
138 fn file(&self) -> Option<&str> {
139 self.record.file()
140 }
141
142 fn line(&self) -> Option<u32> {
143 self.record.line()
144 }
145
146 fn target(&self) -> &str {
147 self.record.target()
148 }
149
150 fn timestamp(&self) -> zx::BootInstant {
151 self.timestamp
152 }
153
154 fn write_arguments<B: MutableBuffer>(
155 self,
156 writer: &mut Encoder<B>,
157 ) -> Result<(), EncodingError> {
158 let args = self.record.args();
159 let message =
160 args.as_str().map(Cow::Borrowed).unwrap_or_else(|| Cow::Owned(args.to_string()));
161 writer.write_argument(Argument::message(message))?;
162 self.record
163 .key_values()
164 .visit(&mut KeyValuesVisitor(writer))
165 .map_err(EncodingError::other)?;
166 Ok(())
167 }
168}
169
170struct KeyValuesVisitor<'a, B>(&'a mut Encoder<B>);
171
172impl<B: MutableBuffer> log::kv::VisitSource<'_> for KeyValuesVisitor<'_, B> {
173 fn visit_pair(
174 &mut self,
175 key: log::kv::Key<'_>,
176 value: log::kv::Value<'_>,
177 ) -> Result<(), log::kv::Error> {
178 value.visit(ValueVisitor { encoder: self.0, key: key.as_str() })
179 }
180}
181
182struct ValueVisitor<'a, B> {
183 encoder: &'a mut Encoder<B>,
184 key: &'a str,
185}
186
187impl<B: MutableBuffer> log::kv::VisitValue<'_> for ValueVisitor<'_, B> {
188 fn visit_any(&mut self, value: log::kv::Value<'_>) -> Result<(), log::kv::Error> {
189 self.encoder
190 .write_raw_argument(self.key, format!("{value}"))
191 .map_err(log::kv::Error::boxed)?;
192 Ok(())
193 }
194
195 fn visit_null(&mut self) -> Result<(), log::kv::Error> {
196 self.encoder.write_raw_argument(self.key, "null").map_err(log::kv::Error::boxed)?;
197 Ok(())
198 }
199
200 fn visit_u64(&mut self, value: u64) -> Result<(), log::kv::Error> {
201 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
202 Ok(())
203 }
204
205 fn visit_i64(&mut self, value: i64) -> Result<(), log::kv::Error> {
206 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
207 Ok(())
208 }
209
210 fn visit_f64(&mut self, value: f64) -> Result<(), log::kv::Error> {
211 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
212 Ok(())
213 }
214
215 fn visit_bool(&mut self, value: bool) -> Result<(), log::kv::Error> {
216 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
217 Ok(())
218 }
219
220 fn visit_str(&mut self, value: &str) -> Result<(), log::kv::Error> {
221 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
222 Ok(())
223 }
224
225 }
228
229#[cfg(test)]
230mod tests {
231 use super::*;
232 use crate::{increment_clock, log_every_n_seconds};
233 use diagnostics_log_encoding::parse::parse_record;
234 use diagnostics_log_encoding::{Argument, Record};
235 use diagnostics_log_types::Severity;
236 use fidl::endpoints::create_proxy_and_stream;
237 use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequest};
238 use futures::stream::StreamExt;
239 use futures::AsyncReadExt;
240 use log::{debug, error, info, trace, warn};
241 use std::sync::{Arc, Mutex};
242 use std::time::Duration;
243 use test_util::assert_gt;
244 use zx::Status;
245
246 const TARGET: &str = "diagnostics_log_lib_test::fuchsia::sink::tests";
247
248 struct TestLogger {
249 sink: Sink,
250 }
251
252 impl TestLogger {
253 fn new(sink: Sink) -> Self {
254 Self { sink }
255 }
256 }
257
258 impl log::Log for TestLogger {
259 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
260 true
261 }
262
263 fn log(&self, record: &log::Record<'_>) {
264 if self.enabled(record.metadata()) {
265 self.sink.record_log(record);
266 }
267 }
268
269 fn flush(&self) {}
270 }
271
272 async fn init_sink(config: SinkConfig) -> fidl::Socket {
273 let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
274 let sink = Sink::new(&proxy, config).unwrap();
275 log::set_boxed_logger(Box::new(TestLogger::new(sink))).expect("set logger");
276 log::set_max_level(log::LevelFilter::Info);
277
278 match requests.next().await.unwrap().unwrap() {
279 LogSinkRequest::ConnectStructured { socket, .. } => socket,
280 _ => panic!("sink ctor sent the wrong message"),
281 }
282 }
283
284 fn arg_prefix() -> Vec<Argument<'static>> {
285 vec![Argument::pid(PROCESS_ID.with(|p| *p)), Argument::tid(THREAD_ID.with(|t| *t))]
286 }
287
288 #[fuchsia::test(logging = false)]
289 async fn wait_and_retry_is_possible() {
290 const TOTAL_WRITES: usize = 32 * 5;
293 let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
294 std::thread::spawn(move || {
296 let sink = Sink::new(
297 &proxy,
298 SinkConfig { retry_on_buffer_full: true, ..SinkConfig::default() },
299 )
300 .unwrap();
301 for i in 0..TOTAL_WRITES {
302 let buf = [i as u8; MAX_DATAGRAM_LEN_BYTES as _];
303 sink.send(&buf, || unreachable!("We should never drop a log in this test"));
304 }
305 });
306 let socket = match requests.next().await.unwrap().unwrap() {
307 LogSinkRequest::ConnectStructured { socket, .. } => socket,
308 _ => panic!("sink ctor sent the wrong message"),
309 };
310 let mut socket = fuchsia_async::Socket::from_socket(socket);
311 for i in 0..TOTAL_WRITES {
314 let mut buf = vec![0u8; MAX_DATAGRAM_LEN_BYTES as _];
315 let len = socket.read(&mut buf).await.unwrap();
316 assert_eq!(len, MAX_DATAGRAM_LEN_BYTES as usize);
317 assert_eq!(buf, vec![i as u8; MAX_DATAGRAM_LEN_BYTES as _]);
318 std::thread::sleep(std::time::Duration::from_millis(50));
319 }
320 }
321
322 #[fuchsia::test(logging = false)]
323 async fn packets_are_sent() {
324 let socket = init_sink(SinkConfig {
325 metatags: HashSet::from([Metatag::Target]),
326 ..SinkConfig::default()
327 })
328 .await;
329 log::set_max_level(log::LevelFilter::Trace);
330 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
331 let mut next_message = || {
332 let len = socket.read(&mut buf).unwrap();
333 let (record, _) = parse_record(&buf[..len]).unwrap();
334 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
335 record.into_owned()
336 };
337
338 trace!(count = 123; "whoa this is noisy");
340 let observed_trace = next_message();
341 debug!(maybe = true; "don't try this at home");
342 let observed_debug = next_message();
343 info!("this is a message");
344 let observed_info = next_message();
345 warn!(reason = "just cuz"; "this is a warning");
346 let observed_warn = next_message();
347 error!(e = "something went pretty wrong"; "this is an error");
348 let error_line = line!() - 1;
349 let metatag = Argument::tag(TARGET);
350 let observed_error = next_message();
351
352 {
354 let mut expected_trace = Record {
355 timestamp: observed_trace.timestamp,
356 severity: Severity::Trace as u8,
357 arguments: arg_prefix(),
358 };
359 expected_trace.arguments.push(metatag.clone());
360 expected_trace.arguments.push(Argument::message("whoa this is noisy"));
361 expected_trace.arguments.push(Argument::new("count", 123));
362 assert_eq!(observed_trace, expected_trace);
363 }
364
365 {
367 let mut expected_debug = Record {
368 timestamp: observed_debug.timestamp,
369 severity: Severity::Debug as u8,
370 arguments: arg_prefix(),
371 };
372 expected_debug.arguments.push(metatag.clone());
373 expected_debug.arguments.push(Argument::message("don't try this at home"));
374 expected_debug.arguments.push(Argument::new("maybe", true));
375 assert_eq!(observed_debug, expected_debug);
376 }
377
378 {
380 let mut expected_info = Record {
381 timestamp: observed_info.timestamp,
382 severity: Severity::Info as u8,
383 arguments: arg_prefix(),
384 };
385 expected_info.arguments.push(metatag.clone());
386 expected_info.arguments.push(Argument::message("this is a message"));
387 assert_eq!(observed_info, expected_info);
388 }
389
390 {
392 let mut expected_warn = Record {
393 timestamp: observed_warn.timestamp,
394 severity: Severity::Warn as u8,
395 arguments: arg_prefix(),
396 };
397 expected_warn.arguments.push(metatag.clone());
398 expected_warn.arguments.push(Argument::message("this is a warning"));
399 expected_warn.arguments.push(Argument::new("reason", "just cuz"));
400 assert_eq!(observed_warn, expected_warn);
401 }
402
403 {
405 let mut expected_error = Record {
406 timestamp: observed_error.timestamp,
407 severity: Severity::Error as u8,
408 arguments: arg_prefix(),
409 };
410 expected_error
411 .arguments
412 .push(Argument::file("src/lib/diagnostics/log/rust/src/fuchsia/sink.rs"));
413 expected_error.arguments.push(Argument::line(error_line as u64));
414 expected_error.arguments.push(metatag);
415 expected_error.arguments.push(Argument::message("this is an error"));
416 expected_error.arguments.push(Argument::new("e", "something went pretty wrong"));
417 assert_eq!(observed_error, expected_error);
418 }
419 }
420
421 #[fuchsia::test(logging = false)]
422 async fn tags_are_sent() {
423 let socket = init_sink(SinkConfig {
424 tags: vec!["tags_are_sent".to_string()],
425 ..SinkConfig::default()
426 })
427 .await;
428 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
429 let mut next_message = || {
430 let len = socket.read(&mut buf).unwrap();
431 let (record, _) = parse_record(&buf[..len]).unwrap();
432 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
433 record.into_owned()
434 };
435
436 info!("this should have a tag");
437 let observed = next_message();
438
439 let mut expected = Record {
440 timestamp: observed.timestamp,
441 severity: Severity::Info as u8,
442 arguments: arg_prefix(),
443 };
444 expected.arguments.push(Argument::message("this should have a tag"));
445 expected.arguments.push(Argument::tag("tags_are_sent"));
446 assert_eq!(observed, expected);
447 }
448
449 #[fuchsia::test(logging = false)]
450 async fn log_every_n_seconds_test() {
451 let socket = init_sink(SinkConfig { ..SinkConfig::default() }).await;
452 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
453 let next_message = |buf: &mut [u8]| {
454 let len = socket.read(buf).unwrap();
455 let (record, _) = parse_record(&buf[..len]).unwrap();
456 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
457 record.into_owned()
458 };
459
460 let log_fn = || {
461 log_every_n_seconds!(5, INFO, "test message");
462 };
463
464 let expect_message = |buf: &mut [u8]| {
465 let observed = next_message(buf);
466
467 let mut expected = Record {
468 timestamp: observed.timestamp,
469 severity: Severity::Info as u8,
470 arguments: arg_prefix(),
471 };
472 expected.arguments.push(Argument::message("test message"));
473 assert_eq!(observed, expected);
474 };
475
476 log_fn();
477 expect_message(&mut buf);
479 log_fn();
480 assert_eq!(socket.read(&mut buf), Err(Status::SHOULD_WAIT));
483 increment_clock(Duration::from_secs(5));
484
485 log_fn();
487 expect_message(&mut buf);
488 }
489
490 #[fuchsia::test(logging = false)]
491 async fn drop_count_is_tracked() {
492 let socket = init_sink(SinkConfig::default()).await;
493 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
494 const MESSAGE_SIZE: usize = 104;
495 const MESSAGE_SIZE_WITH_DROPS: usize = 136;
496 const NUM_DROPPED: usize = 100;
497
498 let socket_capacity = || {
499 let info = socket.info().unwrap();
500 info.rx_buf_max - info.rx_buf_size
501 };
502 let emit_message = || info!("it's-a-me, a message-o");
503 let mut drain_message = |with_drops| {
504 let len = socket.read(&mut buf).unwrap();
505
506 let expected_len = if with_drops { MESSAGE_SIZE_WITH_DROPS } else { MESSAGE_SIZE };
507 assert_eq!(len, expected_len, "constant message size is used to calculate thresholds");
508
509 let (record, _) = parse_record(&buf[..len]).unwrap();
510 let mut expected_args = arg_prefix();
511
512 if with_drops {
513 expected_args.push(Argument::dropped(NUM_DROPPED as u64));
514 }
515
516 expected_args.push(Argument::message("it's-a-me, a message-o"));
517
518 assert_eq!(
519 record,
520 Record {
521 timestamp: record.timestamp,
522 severity: Severity::Info as u8,
523 arguments: expected_args
524 }
525 );
526 };
527
528 let mut num_emitted = 0;
530 while socket_capacity() > MESSAGE_SIZE {
531 emit_message();
532 num_emitted += 1;
533 assert_eq!(
534 socket.info().unwrap().rx_buf_size,
535 num_emitted * MESSAGE_SIZE,
536 "incorrect bytes stored after {} messages sent",
537 num_emitted
538 );
539 }
540
541 for _ in 0..NUM_DROPPED {
543 emit_message();
544 }
545
546 drain_message(false);
553 drain_message(false);
554 num_emitted -= 2;
556 emit_message();
558 for _ in 0..num_emitted {
560 drain_message(false);
561 }
562 drain_message(true);
564
565 emit_message();
567 drain_message(false);
568 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "must drain all messages");
569 }
570
571 #[fuchsia::test(logging = false)]
572 async fn build_record_from_log_event() {
573 let before_timestamp = zx::BootInstant::get();
574 let last_record = Arc::new(Mutex::new(None));
575 let logger = TrackerLogger::new(last_record.clone());
576 log::set_boxed_logger(Box::new(logger)).expect("set logger");
577 log::set_max_level(log::LevelFilter::Info);
578 log::info!(
579 is_a_str = "hahaha",
580 is_debug:? = PrintMe(5),
581 is_signed = -500,
582 is_unsigned = 1000u64,
583 is_bool = false;
584 "blarg this is a message"
585 );
586
587 let guard = last_record.lock().unwrap();
588 let encoder = guard.as_ref().unwrap();
589 let (record, _) = parse_record(encoder.inner().get_ref()).expect("wrote valid record");
590 assert_gt!(record.timestamp, before_timestamp);
591 assert_eq!(
592 record,
593 Record {
594 timestamp: record.timestamp,
595 severity: Severity::Info as u8,
596 arguments: vec![
597 Argument::pid(PROCESS_ID.with(|p| *p)),
598 Argument::tid(THREAD_ID.with(|p| *p)),
599 Argument::tag("diagnostics_log_lib_test::fuchsia::sink::tests"),
600 Argument::message("blarg this is a message"),
601 Argument::other("is_a_str", "hahaha"),
602 Argument::other("is_debug", "PrintMe(5)"),
603 Argument::other("is_signed", -500),
604 Argument::other("is_unsigned", 1000u64),
605 Argument::other("is_bool", false),
606 Argument::tag("a-tag"),
607 ]
608 }
609 );
610 }
611
612 #[derive(Debug)]
614 struct PrintMe(#[allow(unused)] u32);
615
616 type ByteEncoder = Encoder<Cursor<[u8; 1024]>>;
617
618 struct TrackerLogger {
619 last_record: Arc<Mutex<Option<ByteEncoder>>>,
620 }
621
622 impl TrackerLogger {
623 fn new(last_record: Arc<Mutex<Option<ByteEncoder>>>) -> Self {
624 Self { last_record }
625 }
626 }
627
628 impl log::Log for TrackerLogger {
629 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
630 true
631 }
632
633 fn log(&self, record: &log::Record<'_>) {
634 let mut encoder = Encoder::new(Cursor::new([0u8; 1024]), EncoderOpts::default());
635 encoder
636 .write_event(WriteEventParams {
637 event: LogEvent::new(record),
638 tags: &["a-tag"],
639 metatags: [Metatag::Target].iter(),
640 pid: PROCESS_ID.with(|p| *p),
641 tid: THREAD_ID.with(|t| *t),
642 dropped: 0,
643 })
644 .expect("wrote event");
645 let mut last_record = self.last_record.lock().unwrap();
646 last_record.replace(encoder);
647 }
648
649 fn flush(&self) {}
650 }
651}