1use crate::{PublishError, SeverityExt};
5use diagnostics_log_encoding::encode::{
6 Encoder, EncoderOpts, EncodingError, MutableBuffer, RecordEvent, TestRecord, WriteEventParams,
7};
8use diagnostics_log_encoding::{Argument, Metatag, RawSeverity};
9use fidl_fuchsia_logger::{LogSinkProxy, MAX_DATAGRAM_LEN_BYTES};
10use fuchsia_runtime as rt;
11use std::borrow::Cow;
12use std::collections::HashSet;
13use std::io::Cursor;
14use std::sync::atomic::{AtomicU32, Ordering};
15use zx::{self as zx, AsHandleRef};
16
17#[derive(Default)]
18pub(crate) struct SinkConfig {
19 pub(crate) metatags: HashSet<Metatag>,
20 pub(crate) retry_on_buffer_full: bool,
21 pub(crate) tags: Vec<String>,
22 pub(crate) always_log_file_line: bool,
23}
24
25thread_local! {
26 static PROCESS_ID: zx::Koid =
27 rt::process_self().get_koid().unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX));
28 static THREAD_ID: zx::Koid = rt::thread_self()
29 .get_koid()
30 .unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX));
31}
32
33pub(crate) struct Sink {
34 socket: zx::Socket,
35 num_events_dropped: AtomicU32,
36 config: SinkConfig,
37}
38
39impl Sink {
40 pub fn new(log_sink: &LogSinkProxy, config: SinkConfig) -> Result<Self, PublishError> {
41 let (socket, remote_socket) = zx::Socket::create_datagram();
42 log_sink.connect_structured(remote_socket).map_err(PublishError::SendSocket)?;
43 Ok(Self { socket, config, num_events_dropped: AtomicU32::new(0) })
44 }
45}
46
47impl Sink {
48 #[inline]
49 fn encode_and_send(
50 &self,
51 encode: impl FnOnce(&mut Encoder<Cursor<&mut [u8]>>, u32) -> Result<(), EncodingError>,
52 ) {
53 let ordering = Ordering::Relaxed;
54 let previously_dropped = self.num_events_dropped.swap(0, ordering);
55 let restore_and_increment_dropped_count = || {
56 self.num_events_dropped.fetch_add(previously_dropped + 1, ordering);
57 };
58
59 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
60 let mut encoder = Encoder::new(
61 Cursor::new(&mut buf[..]),
62 EncoderOpts { always_log_file_line: self.config.always_log_file_line },
63 );
64 if encode(&mut encoder, previously_dropped).is_err() {
65 restore_and_increment_dropped_count();
66 return;
67 }
68
69 let end = encoder.inner().cursor();
70 let packet = &encoder.inner().get_ref()[..end];
71 self.send(packet, restore_and_increment_dropped_count);
72 }
73
74 fn send(&self, packet: &[u8], on_error: impl Fn()) {
75 while let Err(status) = self.socket.write(packet) {
76 if status != zx::Status::SHOULD_WAIT || !self.config.retry_on_buffer_full {
77 on_error();
78 break;
79 }
80 let Ok(signals) = self
81 .socket
82 .wait_handle(
83 zx::Signals::SOCKET_PEER_CLOSED | zx::Signals::SOCKET_WRITABLE,
84 zx::MonotonicInstant::INFINITE,
85 )
86 .to_result()
87 else {
88 on_error();
89 break;
90 };
91 if signals.contains(zx::Signals::SOCKET_PEER_CLOSED) {
92 on_error();
93 break;
94 }
95 }
96 }
97
98 pub(crate) fn record_log(&self, record: &log::Record<'_>) {
99 self.encode_and_send(|encoder, previously_dropped| {
100 encoder.write_event(WriteEventParams {
101 event: LogEvent::new(record),
102 tags: &self.config.tags,
103 metatags: self.config.metatags.iter(),
104 pid: PROCESS_ID.with(|p| *p),
105 tid: THREAD_ID.with(|t| *t),
106 dropped: previously_dropped.into(),
107 })
108 });
109 }
110
111 pub fn event_for_testing(&self, record: TestRecord<'_>) {
112 self.encode_and_send(move |encoder, previously_dropped| {
113 encoder.write_event(WriteEventParams {
114 event: record,
115 tags: &self.config.tags,
116 metatags: std::iter::empty(),
117 pid: PROCESS_ID.with(|p| *p),
118 tid: THREAD_ID.with(|t| *t),
119 dropped: previously_dropped.into(),
120 })
121 });
122 }
123}
124
125#[doc(hidden)]
126pub struct LogEvent<'a> {
127 record: &'a log::Record<'a>,
128 timestamp: zx::BootInstant,
129}
130
131impl<'a> LogEvent<'a> {
132 pub fn new(record: &'a log::Record<'a>) -> Self {
133 Self { record, timestamp: zx::BootInstant::get() }
134 }
135}
136
137impl RecordEvent for LogEvent<'_> {
138 fn raw_severity(&self) -> RawSeverity {
139 self.record.metadata().raw_severity()
140 }
141
142 fn file(&self) -> Option<&str> {
143 self.record.file()
144 }
145
146 fn line(&self) -> Option<u32> {
147 self.record.line()
148 }
149
150 fn target(&self) -> &str {
151 self.record.target()
152 }
153
154 fn timestamp(&self) -> zx::BootInstant {
155 self.timestamp
156 }
157
158 fn write_arguments<B: MutableBuffer>(
159 self,
160 writer: &mut Encoder<B>,
161 ) -> Result<(), EncodingError> {
162 let args = self.record.args();
163 let message =
164 args.as_str().map(Cow::Borrowed).unwrap_or_else(|| Cow::Owned(args.to_string()));
165 writer.write_argument(Argument::message(message))?;
166 self.record
167 .key_values()
168 .visit(&mut KeyValuesVisitor(writer))
169 .map_err(EncodingError::other)?;
170 Ok(())
171 }
172}
173
174struct KeyValuesVisitor<'a, B>(&'a mut Encoder<B>);
175
176impl<B: MutableBuffer> log::kv::VisitSource<'_> for KeyValuesVisitor<'_, B> {
177 fn visit_pair(
178 &mut self,
179 key: log::kv::Key<'_>,
180 value: log::kv::Value<'_>,
181 ) -> Result<(), log::kv::Error> {
182 value.visit(ValueVisitor { encoder: self.0, key: key.as_str() })
183 }
184}
185
186struct ValueVisitor<'a, B> {
187 encoder: &'a mut Encoder<B>,
188 key: &'a str,
189}
190
191impl<B: MutableBuffer> log::kv::VisitValue<'_> for ValueVisitor<'_, B> {
192 fn visit_any(&mut self, value: log::kv::Value<'_>) -> Result<(), log::kv::Error> {
193 self.encoder
194 .write_raw_argument(self.key, format!("{value}"))
195 .map_err(log::kv::Error::boxed)?;
196 Ok(())
197 }
198
199 fn visit_null(&mut self) -> Result<(), log::kv::Error> {
200 self.encoder.write_raw_argument(self.key, "null").map_err(log::kv::Error::boxed)?;
201 Ok(())
202 }
203
204 fn visit_u64(&mut self, value: u64) -> Result<(), log::kv::Error> {
205 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
206 Ok(())
207 }
208
209 fn visit_i64(&mut self, value: i64) -> Result<(), log::kv::Error> {
210 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
211 Ok(())
212 }
213
214 fn visit_f64(&mut self, value: f64) -> Result<(), log::kv::Error> {
215 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
216 Ok(())
217 }
218
219 fn visit_bool(&mut self, value: bool) -> Result<(), log::kv::Error> {
220 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
221 Ok(())
222 }
223
224 fn visit_str(&mut self, value: &str) -> Result<(), log::kv::Error> {
225 self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
226 Ok(())
227 }
228
229 }
232
233#[cfg(test)]
234mod tests {
235 use super::*;
236 use crate::{increment_clock, log_every_n_seconds};
237 use diagnostics_log_encoding::parse::parse_record;
238 use diagnostics_log_encoding::{Argument, Record};
239 use diagnostics_log_types::Severity;
240 use fidl::endpoints::create_proxy_and_stream;
241 use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequest};
242 use futures::stream::StreamExt;
243 use futures::AsyncReadExt;
244 use log::{debug, error, info, trace, warn};
245 use std::sync::{Arc, Mutex};
246 use std::time::Duration;
247 use test_util::assert_gt;
248 use zx::Status;
249
250 const TARGET: &str = "diagnostics_log_lib_test::fuchsia::sink::tests";
251
252 struct TestLogger {
253 sink: Sink,
254 }
255
256 impl TestLogger {
257 fn new(sink: Sink) -> Self {
258 Self { sink }
259 }
260 }
261
262 impl log::Log for TestLogger {
263 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
264 true
265 }
266
267 fn log(&self, record: &log::Record<'_>) {
268 if self.enabled(record.metadata()) {
269 self.sink.record_log(record);
270 }
271 }
272
273 fn flush(&self) {}
274 }
275
276 async fn init_sink(config: SinkConfig) -> fidl::Socket {
277 let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
278 let sink = Sink::new(&proxy, config).unwrap();
279 log::set_boxed_logger(Box::new(TestLogger::new(sink))).expect("set logger");
280 log::set_max_level(log::LevelFilter::Info);
281
282 match requests.next().await.unwrap().unwrap() {
283 LogSinkRequest::ConnectStructured { socket, .. } => socket,
284 _ => panic!("sink ctor sent the wrong message"),
285 }
286 }
287
288 fn arg_prefix() -> Vec<Argument<'static>> {
289 vec![Argument::pid(PROCESS_ID.with(|p| *p)), Argument::tid(THREAD_ID.with(|t| *t))]
290 }
291
292 #[fuchsia::test(logging = false)]
293 async fn wait_and_retry_is_possible() {
294 const TOTAL_WRITES: usize = 32 * 5;
297 let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
298 std::thread::spawn(move || {
300 let sink = Sink::new(
301 &proxy,
302 SinkConfig { retry_on_buffer_full: true, ..SinkConfig::default() },
303 )
304 .unwrap();
305 for i in 0..TOTAL_WRITES {
306 let buf = [i as u8; MAX_DATAGRAM_LEN_BYTES as _];
307 sink.send(&buf, || unreachable!("We should never drop a log in this test"));
308 }
309 });
310 let socket = match requests.next().await.unwrap().unwrap() {
311 LogSinkRequest::ConnectStructured { socket, .. } => socket,
312 _ => panic!("sink ctor sent the wrong message"),
313 };
314 let mut socket = fuchsia_async::Socket::from_socket(socket);
315 for i in 0..TOTAL_WRITES {
318 let mut buf = vec![0u8; MAX_DATAGRAM_LEN_BYTES as _];
319 let len = socket.read(&mut buf).await.unwrap();
320 assert_eq!(len, MAX_DATAGRAM_LEN_BYTES as usize);
321 assert_eq!(buf, vec![i as u8; MAX_DATAGRAM_LEN_BYTES as _]);
322 std::thread::sleep(std::time::Duration::from_millis(50));
323 }
324 }
325
326 #[fuchsia::test(logging = false)]
327 async fn packets_are_sent() {
328 let socket = init_sink(SinkConfig {
329 metatags: HashSet::from([Metatag::Target]),
330 ..SinkConfig::default()
331 })
332 .await;
333 log::set_max_level(log::LevelFilter::Trace);
334 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
335 let mut next_message = || {
336 let len = socket.read(&mut buf).unwrap();
337 let (record, _) = parse_record(&buf[..len]).unwrap();
338 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
339 record.into_owned()
340 };
341
342 trace!(count = 123; "whoa this is noisy");
344 let observed_trace = next_message();
345 debug!(maybe = true; "don't try this at home");
346 let observed_debug = next_message();
347 info!("this is a message");
348 let observed_info = next_message();
349 warn!(reason = "just cuz"; "this is a warning");
350 let observed_warn = next_message();
351 error!(e = "something went pretty wrong"; "this is an error");
352 let error_line = line!() - 1;
353 let metatag = Argument::tag(TARGET);
354 let observed_error = next_message();
355
356 {
358 let mut expected_trace = Record {
359 timestamp: observed_trace.timestamp,
360 severity: Severity::Trace as u8,
361 arguments: arg_prefix(),
362 };
363 expected_trace.arguments.push(metatag.clone());
364 expected_trace.arguments.push(Argument::message("whoa this is noisy"));
365 expected_trace.arguments.push(Argument::new("count", 123));
366 assert_eq!(observed_trace, expected_trace);
367 }
368
369 {
371 let mut expected_debug = Record {
372 timestamp: observed_debug.timestamp,
373 severity: Severity::Debug as u8,
374 arguments: arg_prefix(),
375 };
376 expected_debug.arguments.push(metatag.clone());
377 expected_debug.arguments.push(Argument::message("don't try this at home"));
378 expected_debug.arguments.push(Argument::new("maybe", true));
379 assert_eq!(observed_debug, expected_debug);
380 }
381
382 {
384 let mut expected_info = Record {
385 timestamp: observed_info.timestamp,
386 severity: Severity::Info as u8,
387 arguments: arg_prefix(),
388 };
389 expected_info.arguments.push(metatag.clone());
390 expected_info.arguments.push(Argument::message("this is a message"));
391 assert_eq!(observed_info, expected_info);
392 }
393
394 {
396 let mut expected_warn = Record {
397 timestamp: observed_warn.timestamp,
398 severity: Severity::Warn as u8,
399 arguments: arg_prefix(),
400 };
401 expected_warn.arguments.push(metatag.clone());
402 expected_warn.arguments.push(Argument::message("this is a warning"));
403 expected_warn.arguments.push(Argument::new("reason", "just cuz"));
404 assert_eq!(observed_warn, expected_warn);
405 }
406
407 {
409 let mut expected_error = Record {
410 timestamp: observed_error.timestamp,
411 severity: Severity::Error as u8,
412 arguments: arg_prefix(),
413 };
414 expected_error
415 .arguments
416 .push(Argument::file("src/lib/diagnostics/log/rust/src/fuchsia/sink.rs"));
417 expected_error.arguments.push(Argument::line(error_line as u64));
418 expected_error.arguments.push(metatag);
419 expected_error.arguments.push(Argument::message("this is an error"));
420 expected_error.arguments.push(Argument::new("e", "something went pretty wrong"));
421 assert_eq!(observed_error, expected_error);
422 }
423 }
424
425 #[fuchsia::test(logging = false)]
426 async fn tags_are_sent() {
427 let socket = init_sink(SinkConfig {
428 tags: vec!["tags_are_sent".to_string()],
429 ..SinkConfig::default()
430 })
431 .await;
432 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
433 let mut next_message = || {
434 let len = socket.read(&mut buf).unwrap();
435 let (record, _) = parse_record(&buf[..len]).unwrap();
436 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
437 record.into_owned()
438 };
439
440 info!("this should have a tag");
441 let observed = next_message();
442
443 let mut expected = Record {
444 timestamp: observed.timestamp,
445 severity: Severity::Info as u8,
446 arguments: arg_prefix(),
447 };
448 expected.arguments.push(Argument::message("this should have a tag"));
449 expected.arguments.push(Argument::tag("tags_are_sent"));
450 assert_eq!(observed, expected);
451 }
452
453 #[fuchsia::test(logging = false)]
454 async fn log_every_n_seconds_test() {
455 let socket = init_sink(SinkConfig { ..SinkConfig::default() }).await;
456 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
457 let next_message = |buf: &mut [u8]| {
458 let len = socket.read(buf).unwrap();
459 let (record, _) = parse_record(&buf[..len]).unwrap();
460 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
461 record.into_owned()
462 };
463
464 let log_fn = || {
465 log_every_n_seconds!(5, INFO, "test message");
466 };
467
468 let expect_message = |buf: &mut [u8]| {
469 let observed = next_message(buf);
470
471 let mut expected = Record {
472 timestamp: observed.timestamp,
473 severity: Severity::Info as u8,
474 arguments: arg_prefix(),
475 };
476 expected.arguments.push(Argument::message("test message"));
477 assert_eq!(observed, expected);
478 };
479
480 log_fn();
481 expect_message(&mut buf);
483 log_fn();
484 assert_eq!(socket.read(&mut buf), Err(Status::SHOULD_WAIT));
487 increment_clock(Duration::from_secs(5));
488
489 log_fn();
491 expect_message(&mut buf);
492 }
493
494 #[fuchsia::test(logging = false)]
495 async fn drop_count_is_tracked() {
496 let socket = init_sink(SinkConfig::default()).await;
497 let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
498 const MESSAGE_SIZE: usize = 104;
499 const MESSAGE_SIZE_WITH_DROPS: usize = 136;
500 const NUM_DROPPED: usize = 100;
501
502 let socket_capacity = || {
503 let info = socket.info().unwrap();
504 info.rx_buf_max - info.rx_buf_size
505 };
506 let emit_message = || info!("it's-a-me, a message-o");
507 let mut drain_message = |with_drops| {
508 let len = socket.read(&mut buf).unwrap();
509
510 let expected_len = if with_drops { MESSAGE_SIZE_WITH_DROPS } else { MESSAGE_SIZE };
511 assert_eq!(len, expected_len, "constant message size is used to calculate thresholds");
512
513 let (record, _) = parse_record(&buf[..len]).unwrap();
514 let mut expected_args = arg_prefix();
515
516 if with_drops {
517 expected_args.push(Argument::dropped(NUM_DROPPED as u64));
518 }
519
520 expected_args.push(Argument::message("it's-a-me, a message-o"));
521
522 assert_eq!(
523 record,
524 Record {
525 timestamp: record.timestamp,
526 severity: Severity::Info as u8,
527 arguments: expected_args
528 }
529 );
530 };
531
532 let mut num_emitted = 0;
534 while socket_capacity() > MESSAGE_SIZE {
535 emit_message();
536 num_emitted += 1;
537 assert_eq!(
538 socket.info().unwrap().rx_buf_size,
539 num_emitted * MESSAGE_SIZE,
540 "incorrect bytes stored after {} messages sent",
541 num_emitted
542 );
543 }
544
545 for _ in 0..NUM_DROPPED {
547 emit_message();
548 }
549
550 drain_message(false);
557 drain_message(false);
558 num_emitted -= 2;
560 emit_message();
562 for _ in 0..num_emitted {
564 drain_message(false);
565 }
566 drain_message(true);
568
569 emit_message();
571 drain_message(false);
572 assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "must drain all messages");
573 }
574
575 #[fuchsia::test(logging = false)]
576 async fn build_record_from_log_event() {
577 let before_timestamp = zx::BootInstant::get();
578 let last_record = Arc::new(Mutex::new(None));
579 let logger = TrackerLogger::new(last_record.clone());
580 log::set_boxed_logger(Box::new(logger)).expect("set logger");
581 log::set_max_level(log::LevelFilter::Info);
582 log::info!(
583 is_a_str = "hahaha",
584 is_debug:? = PrintMe(5),
585 is_signed = -500,
586 is_unsigned = 1000u64,
587 is_bool = false;
588 "blarg this is a message"
589 );
590
591 let guard = last_record.lock().unwrap();
592 let encoder = guard.as_ref().unwrap();
593 let (record, _) = parse_record(encoder.inner().get_ref()).expect("wrote valid record");
594 assert_gt!(record.timestamp, before_timestamp);
595 assert_eq!(
596 record,
597 Record {
598 timestamp: record.timestamp,
599 severity: Severity::Info as u8,
600 arguments: vec![
601 Argument::pid(PROCESS_ID.with(|p| *p)),
602 Argument::tid(THREAD_ID.with(|p| *p)),
603 Argument::tag("diagnostics_log_lib_test::fuchsia::sink::tests"),
604 Argument::message("blarg this is a message"),
605 Argument::other("is_a_str", "hahaha"),
606 Argument::other("is_debug", "PrintMe(5)"),
607 Argument::other("is_signed", -500),
608 Argument::other("is_unsigned", 1000u64),
609 Argument::other("is_bool", false),
610 Argument::tag("a-tag"),
611 ]
612 }
613 );
614 }
615
616 #[derive(Debug)]
618 struct PrintMe(#[allow(unused)] u32);
619
620 type ByteEncoder = Encoder<Cursor<[u8; 1024]>>;
621
622 struct TrackerLogger {
623 last_record: Arc<Mutex<Option<ByteEncoder>>>,
624 }
625
626 impl TrackerLogger {
627 fn new(last_record: Arc<Mutex<Option<ByteEncoder>>>) -> Self {
628 Self { last_record }
629 }
630 }
631
632 impl log::Log for TrackerLogger {
633 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
634 true
635 }
636
637 fn log(&self, record: &log::Record<'_>) {
638 let mut encoder = Encoder::new(Cursor::new([0u8; 1024]), EncoderOpts::default());
639 encoder
640 .write_event(WriteEventParams {
641 event: LogEvent::new(record),
642 tags: &["a-tag"],
643 metatags: [Metatag::Target].iter(),
644 pid: PROCESS_ID.with(|p| *p),
645 tid: THREAD_ID.with(|t| *t),
646 dropped: 0,
647 })
648 .expect("wrote event");
649 let mut last_record = self.last_record.lock().unwrap();
650 last_record.replace(encoder);
651 }
652
653 fn flush(&self) {}
654 }
655}