diagnostics_log/fuchsia/
sink.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use crate::{PublishError, SeverityExt};
5use diagnostics_log_encoding::encode::{
6    Encoder, EncoderOpts, EncodingError, MutableBuffer, RecordEvent, TestRecord, WriteEventParams,
7};
8use diagnostics_log_encoding::{Argument, Metatag, RawSeverity};
9use fidl_fuchsia_logger::{LogSinkProxy, MAX_DATAGRAM_LEN_BYTES};
10use fuchsia_runtime as rt;
11use std::borrow::Cow;
12use std::collections::HashSet;
13use std::io::Cursor;
14use std::sync::atomic::{AtomicU32, Ordering};
15use zx::{self as zx, AsHandleRef};
16
17#[derive(Default)]
18pub(crate) struct SinkConfig {
19    pub(crate) metatags: HashSet<Metatag>,
20    pub(crate) retry_on_buffer_full: bool,
21    pub(crate) tags: Vec<String>,
22    pub(crate) always_log_file_line: bool,
23}
24
25thread_local! {
26    static PROCESS_ID: zx::Koid =
27        rt::process_self().get_koid().expect("couldn't read our own process koid");
28    static THREAD_ID: zx::Koid = rt::thread_self()
29        .get_koid()
30        .expect("couldn't read our own thread id");
31}
32
33pub(crate) struct Sink {
34    socket: zx::Socket,
35    num_events_dropped: AtomicU32,
36    config: SinkConfig,
37}
38
39impl Sink {
40    pub fn new(log_sink: &LogSinkProxy, config: SinkConfig) -> Result<Self, PublishError> {
41        let (socket, remote_socket) = zx::Socket::create_datagram();
42        log_sink.connect_structured(remote_socket).map_err(PublishError::SendSocket)?;
43        Ok(Self { socket, config, num_events_dropped: AtomicU32::new(0) })
44    }
45}
46
47impl Sink {
48    #[inline]
49    fn encode_and_send(
50        &self,
51        encode: impl FnOnce(&mut Encoder<Cursor<&mut [u8]>>, u32) -> Result<(), EncodingError>,
52    ) {
53        let ordering = Ordering::Relaxed;
54        let previously_dropped = self.num_events_dropped.swap(0, ordering);
55        let restore_and_increment_dropped_count = || {
56            self.num_events_dropped.fetch_add(previously_dropped + 1, ordering);
57        };
58
59        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
60        let mut encoder = Encoder::new(
61            Cursor::new(&mut buf[..]),
62            EncoderOpts { always_log_file_line: self.config.always_log_file_line },
63        );
64        if encode(&mut encoder, previously_dropped).is_err() {
65            restore_and_increment_dropped_count();
66            return;
67        }
68
69        let end = encoder.inner().cursor();
70        let packet = &encoder.inner().get_ref()[..end];
71        self.send(packet, restore_and_increment_dropped_count);
72    }
73
74    fn send(&self, packet: &[u8], on_error: impl Fn()) {
75        while let Err(status) = self.socket.write(packet) {
76            if status != zx::Status::SHOULD_WAIT || !self.config.retry_on_buffer_full {
77                on_error();
78                break;
79            }
80            let Ok(signals) = self.socket.wait_handle(
81                zx::Signals::SOCKET_PEER_CLOSED | zx::Signals::SOCKET_WRITABLE,
82                zx::MonotonicInstant::INFINITE,
83            ) else {
84                on_error();
85                break;
86            };
87            if signals.contains(zx::Signals::SOCKET_PEER_CLOSED) {
88                on_error();
89                break;
90            }
91        }
92    }
93
94    pub(crate) fn record_log(&self, record: &log::Record<'_>) {
95        self.encode_and_send(|encoder, previously_dropped| {
96            encoder.write_event(WriteEventParams {
97                event: LogEvent::new(record),
98                tags: &self.config.tags,
99                metatags: self.config.metatags.iter(),
100                pid: PROCESS_ID.with(|p| *p),
101                tid: THREAD_ID.with(|t| *t),
102                dropped: previously_dropped.into(),
103            })
104        });
105    }
106
107    pub fn event_for_testing(&self, record: TestRecord<'_>) {
108        self.encode_and_send(move |encoder, previously_dropped| {
109            encoder.write_event(WriteEventParams {
110                event: record,
111                tags: &self.config.tags,
112                metatags: std::iter::empty(),
113                pid: PROCESS_ID.with(|p| *p),
114                tid: THREAD_ID.with(|t| *t),
115                dropped: previously_dropped.into(),
116            })
117        });
118    }
119}
120
121#[doc(hidden)]
122pub struct LogEvent<'a> {
123    record: &'a log::Record<'a>,
124    timestamp: zx::BootInstant,
125}
126
127impl<'a> LogEvent<'a> {
128    pub fn new(record: &'a log::Record<'a>) -> Self {
129        Self { record, timestamp: zx::BootInstant::get() }
130    }
131}
132
133impl RecordEvent for LogEvent<'_> {
134    fn raw_severity(&self) -> RawSeverity {
135        self.record.metadata().raw_severity()
136    }
137
138    fn file(&self) -> Option<&str> {
139        self.record.file()
140    }
141
142    fn line(&self) -> Option<u32> {
143        self.record.line()
144    }
145
146    fn target(&self) -> &str {
147        self.record.target()
148    }
149
150    fn timestamp(&self) -> zx::BootInstant {
151        self.timestamp
152    }
153
154    fn write_arguments<B: MutableBuffer>(
155        self,
156        writer: &mut Encoder<B>,
157    ) -> Result<(), EncodingError> {
158        let args = self.record.args();
159        let message =
160            args.as_str().map(Cow::Borrowed).unwrap_or_else(|| Cow::Owned(args.to_string()));
161        writer.write_argument(Argument::message(message))?;
162        self.record
163            .key_values()
164            .visit(&mut KeyValuesVisitor(writer))
165            .map_err(EncodingError::other)?;
166        Ok(())
167    }
168}
169
170struct KeyValuesVisitor<'a, B>(&'a mut Encoder<B>);
171
172impl<B: MutableBuffer> log::kv::VisitSource<'_> for KeyValuesVisitor<'_, B> {
173    fn visit_pair(
174        &mut self,
175        key: log::kv::Key<'_>,
176        value: log::kv::Value<'_>,
177    ) -> Result<(), log::kv::Error> {
178        value.visit(ValueVisitor { encoder: self.0, key: key.as_str() })
179    }
180}
181
182struct ValueVisitor<'a, B> {
183    encoder: &'a mut Encoder<B>,
184    key: &'a str,
185}
186
187impl<B: MutableBuffer> log::kv::VisitValue<'_> for ValueVisitor<'_, B> {
188    fn visit_any(&mut self, value: log::kv::Value<'_>) -> Result<(), log::kv::Error> {
189        self.encoder
190            .write_raw_argument(self.key, format!("{value}"))
191            .map_err(log::kv::Error::boxed)?;
192        Ok(())
193    }
194
195    fn visit_null(&mut self) -> Result<(), log::kv::Error> {
196        self.encoder.write_raw_argument(self.key, "null").map_err(log::kv::Error::boxed)?;
197        Ok(())
198    }
199
200    fn visit_u64(&mut self, value: u64) -> Result<(), log::kv::Error> {
201        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
202        Ok(())
203    }
204
205    fn visit_i64(&mut self, value: i64) -> Result<(), log::kv::Error> {
206        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
207        Ok(())
208    }
209
210    fn visit_f64(&mut self, value: f64) -> Result<(), log::kv::Error> {
211        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
212        Ok(())
213    }
214
215    fn visit_bool(&mut self, value: bool) -> Result<(), log::kv::Error> {
216        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
217        Ok(())
218    }
219
220    fn visit_str(&mut self, value: &str) -> Result<(), log::kv::Error> {
221        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
222        Ok(())
223    }
224
225    // TODO(https://fxbug.dev/360919323): when we enable kv_std we must support visit_error and
226    // visit_borrowed_error.
227}
228
229#[cfg(test)]
230mod tests {
231    use super::*;
232    use crate::{increment_clock, log_every_n_seconds};
233    use diagnostics_log_encoding::parse::parse_record;
234    use diagnostics_log_encoding::{Argument, Record};
235    use diagnostics_log_types::Severity;
236    use fidl::endpoints::create_proxy_and_stream;
237    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequest};
238    use futures::stream::StreamExt;
239    use futures::AsyncReadExt;
240    use log::{debug, error, info, trace, warn};
241    use std::sync::{Arc, Mutex};
242    use std::time::Duration;
243    use test_util::assert_gt;
244    use zx::Status;
245
246    const TARGET: &str = "diagnostics_log_lib_test::fuchsia::sink::tests";
247
248    struct TestLogger {
249        sink: Sink,
250    }
251
252    impl TestLogger {
253        fn new(sink: Sink) -> Self {
254            Self { sink }
255        }
256    }
257
258    impl log::Log for TestLogger {
259        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
260            true
261        }
262
263        fn log(&self, record: &log::Record<'_>) {
264            if self.enabled(record.metadata()) {
265                self.sink.record_log(record);
266            }
267        }
268
269        fn flush(&self) {}
270    }
271
272    async fn init_sink(config: SinkConfig) -> fidl::Socket {
273        let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
274        let sink = Sink::new(&proxy, config).unwrap();
275        log::set_boxed_logger(Box::new(TestLogger::new(sink))).expect("set logger");
276        log::set_max_level(log::LevelFilter::Info);
277
278        match requests.next().await.unwrap().unwrap() {
279            LogSinkRequest::ConnectStructured { socket, .. } => socket,
280            _ => panic!("sink ctor sent the wrong message"),
281        }
282    }
283
284    fn arg_prefix() -> Vec<Argument<'static>> {
285        vec![Argument::pid(PROCESS_ID.with(|p| *p)), Argument::tid(THREAD_ID.with(|t| *t))]
286    }
287
288    #[fuchsia::test(logging = false)]
289    async fn wait_and_retry_is_possible() {
290        // 160 writes so we write 5 MB given that we write 32K each write. Without enabling
291        // retrying, this would lead to dropped logs.
292        const TOTAL_WRITES: usize = 32 * 5;
293        let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
294        // Writes a megabyte of data to the Sink.
295        std::thread::spawn(move || {
296            let sink = Sink::new(
297                &proxy,
298                SinkConfig { retry_on_buffer_full: true, ..SinkConfig::default() },
299            )
300            .unwrap();
301            for i in 0..TOTAL_WRITES {
302                let buf = [i as u8; MAX_DATAGRAM_LEN_BYTES as _];
303                sink.send(&buf, || unreachable!("We should never drop a log in this test"));
304            }
305        });
306        let socket = match requests.next().await.unwrap().unwrap() {
307            LogSinkRequest::ConnectStructured { socket, .. } => socket,
308            _ => panic!("sink ctor sent the wrong message"),
309        };
310        let mut socket = fuchsia_async::Socket::from_socket(socket);
311        // Ensure we are able to read all of the data written to the socket and we didn't drop
312        // anything.
313        for i in 0..TOTAL_WRITES {
314            let mut buf = vec![0u8; MAX_DATAGRAM_LEN_BYTES as _];
315            let len = socket.read(&mut buf).await.unwrap();
316            assert_eq!(len, MAX_DATAGRAM_LEN_BYTES as usize);
317            assert_eq!(buf, vec![i as u8; MAX_DATAGRAM_LEN_BYTES as _]);
318            std::thread::sleep(std::time::Duration::from_millis(50));
319        }
320    }
321
322    #[fuchsia::test(logging = false)]
323    async fn packets_are_sent() {
324        let socket = init_sink(SinkConfig {
325            metatags: HashSet::from([Metatag::Target]),
326            ..SinkConfig::default()
327        })
328        .await;
329        log::set_max_level(log::LevelFilter::Trace);
330        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
331        let mut next_message = || {
332            let len = socket.read(&mut buf).unwrap();
333            let (record, _) = parse_record(&buf[..len]).unwrap();
334            assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
335            record.into_owned()
336        };
337
338        // emit some expected messages and then we'll retrieve them for parsing
339        trace!(count = 123; "whoa this is noisy");
340        let observed_trace = next_message();
341        debug!(maybe = true; "don't try this at home");
342        let observed_debug = next_message();
343        info!("this is a message");
344        let observed_info = next_message();
345        warn!(reason = "just cuz"; "this is a warning");
346        let observed_warn = next_message();
347        error!(e = "something went pretty wrong"; "this is an error");
348        let error_line = line!() - 1;
349        let metatag = Argument::tag(TARGET);
350        let observed_error = next_message();
351
352        // TRACE
353        {
354            let mut expected_trace = Record {
355                timestamp: observed_trace.timestamp,
356                severity: Severity::Trace as u8,
357                arguments: arg_prefix(),
358            };
359            expected_trace.arguments.push(metatag.clone());
360            expected_trace.arguments.push(Argument::message("whoa this is noisy"));
361            expected_trace.arguments.push(Argument::new("count", 123));
362            assert_eq!(observed_trace, expected_trace);
363        }
364
365        // DEBUG
366        {
367            let mut expected_debug = Record {
368                timestamp: observed_debug.timestamp,
369                severity: Severity::Debug as u8,
370                arguments: arg_prefix(),
371            };
372            expected_debug.arguments.push(metatag.clone());
373            expected_debug.arguments.push(Argument::message("don't try this at home"));
374            expected_debug.arguments.push(Argument::new("maybe", true));
375            assert_eq!(observed_debug, expected_debug);
376        }
377
378        // INFO
379        {
380            let mut expected_info = Record {
381                timestamp: observed_info.timestamp,
382                severity: Severity::Info as u8,
383                arguments: arg_prefix(),
384            };
385            expected_info.arguments.push(metatag.clone());
386            expected_info.arguments.push(Argument::message("this is a message"));
387            assert_eq!(observed_info, expected_info);
388        }
389
390        // WARN
391        {
392            let mut expected_warn = Record {
393                timestamp: observed_warn.timestamp,
394                severity: Severity::Warn as u8,
395                arguments: arg_prefix(),
396            };
397            expected_warn.arguments.push(metatag.clone());
398            expected_warn.arguments.push(Argument::message("this is a warning"));
399            expected_warn.arguments.push(Argument::new("reason", "just cuz"));
400            assert_eq!(observed_warn, expected_warn);
401        }
402
403        // ERROR
404        {
405            let mut expected_error = Record {
406                timestamp: observed_error.timestamp,
407                severity: Severity::Error as u8,
408                arguments: arg_prefix(),
409            };
410            expected_error
411                .arguments
412                .push(Argument::file("src/lib/diagnostics/log/rust/src/fuchsia/sink.rs"));
413            expected_error.arguments.push(Argument::line(error_line as u64));
414            expected_error.arguments.push(metatag);
415            expected_error.arguments.push(Argument::message("this is an error"));
416            expected_error.arguments.push(Argument::new("e", "something went pretty wrong"));
417            assert_eq!(observed_error, expected_error);
418        }
419    }
420
421    #[fuchsia::test(logging = false)]
422    async fn tags_are_sent() {
423        let socket = init_sink(SinkConfig {
424            tags: vec!["tags_are_sent".to_string()],
425            ..SinkConfig::default()
426        })
427        .await;
428        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
429        let mut next_message = || {
430            let len = socket.read(&mut buf).unwrap();
431            let (record, _) = parse_record(&buf[..len]).unwrap();
432            assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
433            record.into_owned()
434        };
435
436        info!("this should have a tag");
437        let observed = next_message();
438
439        let mut expected = Record {
440            timestamp: observed.timestamp,
441            severity: Severity::Info as u8,
442            arguments: arg_prefix(),
443        };
444        expected.arguments.push(Argument::message("this should have a tag"));
445        expected.arguments.push(Argument::tag("tags_are_sent"));
446        assert_eq!(observed, expected);
447    }
448
449    #[fuchsia::test(logging = false)]
450    async fn log_every_n_seconds_test() {
451        let socket = init_sink(SinkConfig { ..SinkConfig::default() }).await;
452        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
453        let next_message = |buf: &mut [u8]| {
454            let len = socket.read(buf).unwrap();
455            let (record, _) = parse_record(&buf[..len]).unwrap();
456            assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
457            record.into_owned()
458        };
459
460        let log_fn = || {
461            log_every_n_seconds!(5, INFO, "test message");
462        };
463
464        let expect_message = |buf: &mut [u8]| {
465            let observed = next_message(buf);
466
467            let mut expected = Record {
468                timestamp: observed.timestamp,
469                severity: Severity::Info as u8,
470                arguments: arg_prefix(),
471            };
472            expected.arguments.push(Argument::message("test message"));
473            assert_eq!(observed, expected);
474        };
475
476        log_fn();
477        // First log call should result in a message.
478        expect_message(&mut buf);
479        log_fn();
480        // Subsequent log call in less than 5 seconds should NOT
481        // result in a message.
482        assert_eq!(socket.read(&mut buf), Err(Status::SHOULD_WAIT));
483        increment_clock(Duration::from_secs(5));
484
485        // Calling log_fn after 5 seconds should result in a message.
486        log_fn();
487        expect_message(&mut buf);
488    }
489
490    #[fuchsia::test(logging = false)]
491    async fn drop_count_is_tracked() {
492        let socket = init_sink(SinkConfig::default()).await;
493        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
494        const MESSAGE_SIZE: usize = 104;
495        const MESSAGE_SIZE_WITH_DROPS: usize = 136;
496        const NUM_DROPPED: usize = 100;
497
498        let socket_capacity = || {
499            let info = socket.info().unwrap();
500            info.rx_buf_max - info.rx_buf_size
501        };
502        let emit_message = || info!("it's-a-me, a message-o");
503        let mut drain_message = |with_drops| {
504            let len = socket.read(&mut buf).unwrap();
505
506            let expected_len = if with_drops { MESSAGE_SIZE_WITH_DROPS } else { MESSAGE_SIZE };
507            assert_eq!(len, expected_len, "constant message size is used to calculate thresholds");
508
509            let (record, _) = parse_record(&buf[..len]).unwrap();
510            let mut expected_args = arg_prefix();
511
512            if with_drops {
513                expected_args.push(Argument::dropped(NUM_DROPPED as u64));
514            }
515
516            expected_args.push(Argument::message("it's-a-me, a message-o"));
517
518            assert_eq!(
519                record,
520                Record {
521                    timestamp: record.timestamp,
522                    severity: Severity::Info as u8,
523                    arguments: expected_args
524                }
525            );
526        };
527
528        // fill up the socket
529        let mut num_emitted = 0;
530        while socket_capacity() > MESSAGE_SIZE {
531            emit_message();
532            num_emitted += 1;
533            assert_eq!(
534                socket.info().unwrap().rx_buf_size,
535                num_emitted * MESSAGE_SIZE,
536                "incorrect bytes stored after {} messages sent",
537                num_emitted
538            );
539        }
540
541        // drop messages
542        for _ in 0..NUM_DROPPED {
543            emit_message();
544        }
545
546        // make space for a message to convey the drop count
547        // we drain two messages here because emitting the drop count adds to the size of the packet
548        // if we only drain one message then we're relying on the kernel's buffer size to satisfy
549        //   (rx_buf_max_size % MESSAGE_SIZE) > (MESSAGE_SIZE_WITH_DROPS - MESSAGE_SIZE)
550        // this is true at the time of writing of this test but we don't know whether that's a
551        // guarantee.
552        drain_message(false);
553        drain_message(false);
554        // we use this count below to drain the rest of the messages
555        num_emitted -= 2;
556        // convey the drop count, it's now at the tail of the socket
557        emit_message();
558        // drain remaining "normal" messages ahead of the drop count
559        for _ in 0..num_emitted {
560            drain_message(false);
561        }
562        // verify that messages were dropped
563        drain_message(true);
564
565        // check that we return to normal after reporting the drops
566        emit_message();
567        drain_message(false);
568        assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "must drain all messages");
569    }
570
571    #[fuchsia::test(logging = false)]
572    async fn build_record_from_log_event() {
573        let before_timestamp = zx::BootInstant::get();
574        let last_record = Arc::new(Mutex::new(None));
575        let logger = TrackerLogger::new(last_record.clone());
576        log::set_boxed_logger(Box::new(logger)).expect("set logger");
577        log::set_max_level(log::LevelFilter::Info);
578        log::info!(
579            is_a_str = "hahaha",
580            is_debug:? = PrintMe(5),
581            is_signed = -500,
582            is_unsigned = 1000u64,
583            is_bool = false;
584            "blarg this is a message"
585        );
586
587        let guard = last_record.lock().unwrap();
588        let encoder = guard.as_ref().unwrap();
589        let (record, _) = parse_record(encoder.inner().get_ref()).expect("wrote valid record");
590        assert_gt!(record.timestamp, before_timestamp);
591        assert_eq!(
592            record,
593            Record {
594                timestamp: record.timestamp,
595                severity: Severity::Info as u8,
596                arguments: vec![
597                    Argument::pid(PROCESS_ID.with(|p| *p)),
598                    Argument::tid(THREAD_ID.with(|p| *p)),
599                    Argument::tag("diagnostics_log_lib_test::fuchsia::sink::tests"),
600                    Argument::message("blarg this is a message"),
601                    Argument::other("is_a_str", "hahaha"),
602                    Argument::other("is_debug", "PrintMe(5)"),
603                    Argument::other("is_signed", -500),
604                    Argument::other("is_unsigned", 1000u64),
605                    Argument::other("is_bool", false),
606                    Argument::tag("a-tag"),
607                ]
608            }
609        );
610    }
611
612    // Note the inner u32 is used in the debug implementation.
613    #[derive(Debug)]
614    struct PrintMe(#[allow(unused)] u32);
615
616    type ByteEncoder = Encoder<Cursor<[u8; 1024]>>;
617
618    struct TrackerLogger {
619        last_record: Arc<Mutex<Option<ByteEncoder>>>,
620    }
621
622    impl TrackerLogger {
623        fn new(last_record: Arc<Mutex<Option<ByteEncoder>>>) -> Self {
624            Self { last_record }
625        }
626    }
627
628    impl log::Log for TrackerLogger {
629        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
630            true
631        }
632
633        fn log(&self, record: &log::Record<'_>) {
634            let mut encoder = Encoder::new(Cursor::new([0u8; 1024]), EncoderOpts::default());
635            encoder
636                .write_event(WriteEventParams {
637                    event: LogEvent::new(record),
638                    tags: &["a-tag"],
639                    metatags: [Metatag::Target].iter(),
640                    pid: PROCESS_ID.with(|p| *p),
641                    tid: THREAD_ID.with(|t| *t),
642                    dropped: 0,
643                })
644                .expect("wrote event");
645            let mut last_record = self.last_record.lock().unwrap();
646            last_record.replace(encoder);
647        }
648
649        fn flush(&self) {}
650    }
651}