diagnostics_log/fuchsia/
sink.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use crate::{PublishError, SeverityExt};
5use diagnostics_log_encoding::encode::{
6    Encoder, EncoderOpts, EncodingError, MutableBuffer, RecordEvent, TestRecord, WriteEventParams,
7};
8use diagnostics_log_encoding::{Argument, Metatag, RawSeverity};
9use fidl_fuchsia_logger::{LogSinkProxy, MAX_DATAGRAM_LEN_BYTES};
10use fuchsia_runtime as rt;
11use std::borrow::Cow;
12use std::collections::HashSet;
13use std::io::Cursor;
14use std::sync::atomic::{AtomicU32, Ordering};
15use zx::{self as zx, AsHandleRef};
16
17#[derive(Default)]
18pub(crate) struct SinkConfig {
19    pub(crate) metatags: HashSet<Metatag>,
20    pub(crate) retry_on_buffer_full: bool,
21    pub(crate) tags: Vec<String>,
22    pub(crate) always_log_file_line: bool,
23}
24
25thread_local! {
26    static PROCESS_ID: zx::Koid =
27        rt::process_self().get_koid().unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX));
28    static THREAD_ID: zx::Koid = rt::thread_self()
29        .get_koid()
30        .unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX));
31}
32
33pub(crate) struct Sink {
34    socket: zx::Socket,
35    num_events_dropped: AtomicU32,
36    config: SinkConfig,
37}
38
39impl Sink {
40    pub fn new(log_sink: &LogSinkProxy, config: SinkConfig) -> Result<Self, PublishError> {
41        let (socket, remote_socket) = zx::Socket::create_datagram();
42        log_sink.connect_structured(remote_socket).map_err(PublishError::SendSocket)?;
43        Ok(Self { socket, config, num_events_dropped: AtomicU32::new(0) })
44    }
45}
46
47impl Sink {
48    #[inline]
49    fn encode_and_send(
50        &self,
51        encode: impl FnOnce(&mut Encoder<Cursor<&mut [u8]>>, u32) -> Result<(), EncodingError>,
52    ) {
53        let ordering = Ordering::Relaxed;
54        let previously_dropped = self.num_events_dropped.swap(0, ordering);
55        let restore_and_increment_dropped_count = || {
56            self.num_events_dropped.fetch_add(previously_dropped + 1, ordering);
57        };
58
59        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
60        let mut encoder = Encoder::new(
61            Cursor::new(&mut buf[..]),
62            EncoderOpts { always_log_file_line: self.config.always_log_file_line },
63        );
64        if encode(&mut encoder, previously_dropped).is_err() {
65            restore_and_increment_dropped_count();
66            return;
67        }
68
69        let end = encoder.inner().cursor();
70        let packet = &encoder.inner().get_ref()[..end];
71        self.send(packet, restore_and_increment_dropped_count);
72    }
73
74    fn send(&self, packet: &[u8], on_error: impl Fn()) {
75        while let Err(status) = self.socket.write(packet) {
76            if status != zx::Status::SHOULD_WAIT || !self.config.retry_on_buffer_full {
77                on_error();
78                break;
79            }
80            let Ok(signals) = self
81                .socket
82                .wait_handle(
83                    zx::Signals::SOCKET_PEER_CLOSED | zx::Signals::SOCKET_WRITABLE,
84                    zx::MonotonicInstant::INFINITE,
85                )
86                .to_result()
87            else {
88                on_error();
89                break;
90            };
91            if signals.contains(zx::Signals::SOCKET_PEER_CLOSED) {
92                on_error();
93                break;
94            }
95        }
96    }
97
98    pub(crate) fn record_log(&self, record: &log::Record<'_>) {
99        self.encode_and_send(|encoder, previously_dropped| {
100            encoder.write_event(WriteEventParams {
101                event: LogEvent::new(record),
102                tags: &self.config.tags,
103                metatags: self.config.metatags.iter(),
104                pid: PROCESS_ID.with(|p| *p),
105                tid: THREAD_ID.with(|t| *t),
106                dropped: previously_dropped.into(),
107            })
108        });
109    }
110
111    pub fn event_for_testing(&self, record: TestRecord<'_>) {
112        self.encode_and_send(move |encoder, previously_dropped| {
113            encoder.write_event(WriteEventParams {
114                event: record,
115                tags: &self.config.tags,
116                metatags: std::iter::empty(),
117                pid: PROCESS_ID.with(|p| *p),
118                tid: THREAD_ID.with(|t| *t),
119                dropped: previously_dropped.into(),
120            })
121        });
122    }
123}
124
125#[doc(hidden)]
126pub struct LogEvent<'a> {
127    record: &'a log::Record<'a>,
128    timestamp: zx::BootInstant,
129}
130
131impl<'a> LogEvent<'a> {
132    pub fn new(record: &'a log::Record<'a>) -> Self {
133        Self { record, timestamp: zx::BootInstant::get() }
134    }
135}
136
137impl RecordEvent for LogEvent<'_> {
138    fn raw_severity(&self) -> RawSeverity {
139        self.record.metadata().raw_severity()
140    }
141
142    fn file(&self) -> Option<&str> {
143        self.record.file()
144    }
145
146    fn line(&self) -> Option<u32> {
147        self.record.line()
148    }
149
150    fn target(&self) -> &str {
151        self.record.target()
152    }
153
154    fn timestamp(&self) -> zx::BootInstant {
155        self.timestamp
156    }
157
158    fn write_arguments<B: MutableBuffer>(
159        self,
160        writer: &mut Encoder<B>,
161    ) -> Result<(), EncodingError> {
162        let args = self.record.args();
163        let message =
164            args.as_str().map(Cow::Borrowed).unwrap_or_else(|| Cow::Owned(args.to_string()));
165        writer.write_argument(Argument::message(message))?;
166        self.record
167            .key_values()
168            .visit(&mut KeyValuesVisitor(writer))
169            .map_err(EncodingError::other)?;
170        Ok(())
171    }
172}
173
174struct KeyValuesVisitor<'a, B>(&'a mut Encoder<B>);
175
176impl<B: MutableBuffer> log::kv::VisitSource<'_> for KeyValuesVisitor<'_, B> {
177    fn visit_pair(
178        &mut self,
179        key: log::kv::Key<'_>,
180        value: log::kv::Value<'_>,
181    ) -> Result<(), log::kv::Error> {
182        value.visit(ValueVisitor { encoder: self.0, key: key.as_str() })
183    }
184}
185
186struct ValueVisitor<'a, B> {
187    encoder: &'a mut Encoder<B>,
188    key: &'a str,
189}
190
191impl<B: MutableBuffer> log::kv::VisitValue<'_> for ValueVisitor<'_, B> {
192    fn visit_any(&mut self, value: log::kv::Value<'_>) -> Result<(), log::kv::Error> {
193        self.encoder
194            .write_raw_argument(self.key, format!("{value}"))
195            .map_err(log::kv::Error::boxed)?;
196        Ok(())
197    }
198
199    fn visit_null(&mut self) -> Result<(), log::kv::Error> {
200        self.encoder.write_raw_argument(self.key, "null").map_err(log::kv::Error::boxed)?;
201        Ok(())
202    }
203
204    fn visit_u64(&mut self, value: u64) -> Result<(), log::kv::Error> {
205        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
206        Ok(())
207    }
208
209    fn visit_i64(&mut self, value: i64) -> Result<(), log::kv::Error> {
210        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
211        Ok(())
212    }
213
214    fn visit_f64(&mut self, value: f64) -> Result<(), log::kv::Error> {
215        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
216        Ok(())
217    }
218
219    fn visit_bool(&mut self, value: bool) -> Result<(), log::kv::Error> {
220        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
221        Ok(())
222    }
223
224    fn visit_str(&mut self, value: &str) -> Result<(), log::kv::Error> {
225        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
226        Ok(())
227    }
228
229    // TODO(https://fxbug.dev/360919323): when we enable kv_std we must support visit_error and
230    // visit_borrowed_error.
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{increment_clock, log_every_n_seconds};
    use diagnostics_log_encoding::parse::parse_record;
    use diagnostics_log_encoding::{Argument, Record};
    use diagnostics_log_types::Severity;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequest};
    use futures::stream::StreamExt;
    use futures::AsyncReadExt;
    use log::{debug, error, info, trace, warn};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use test_util::assert_gt;
    use zx::Status;

    /// Target that the `log` macros report for records emitted from this module.
    const TARGET: &str = "diagnostics_log_lib_test::fuchsia::sink::tests";

    /// `log::Log` implementation that forwards every record to a [`Sink`].
    struct TestLogger {
        sink: Sink,
    }

    impl TestLogger {
        fn new(sink: Sink) -> Self {
            Self { sink }
        }
    }

    impl log::Log for TestLogger {
        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
            true
        }

        fn log(&self, record: &log::Record<'_>) {
            if !self.enabled(record.metadata()) {
                return;
            }
            self.sink.record_log(record);
        }

        fn flush(&self) {}
    }

    /// Installs a [`TestLogger`] backed by a new sink as the global logger and returns the
    /// socket end that the sink sent over the `LogSink` connection.
    async fn init_sink(config: SinkConfig) -> fidl::Socket {
        let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
        let logger = TestLogger::new(Sink::new(&proxy, config).unwrap());
        log::set_boxed_logger(Box::new(logger)).expect("set logger");
        log::set_max_level(log::LevelFilter::Info);

        let request = requests.next().await.unwrap().unwrap();
        let LogSinkRequest::ConnectStructured { socket, .. } = request else {
            panic!("sink ctor sent the wrong message");
        };
        socket
    }

    /// The pid and tid arguments that lead every encoded record.
    fn arg_prefix() -> Vec<Argument<'static>> {
        let pid = PROCESS_ID.with(|koid| *koid);
        let tid = THREAD_ID.with(|koid| *koid);
        vec![Argument::pid(pid), Argument::tid(tid)]
    }

    #[fuchsia::test(logging = false)]
    async fn wait_and_retry_is_possible() {
        // 160 writes of 32K each add up to 5 MB. Without retrying on a full buffer, writing this
        // much would lead to dropped logs.
        const TOTAL_WRITES: usize = 32 * 5;
        let (proxy, mut requests) = create_proxy_and_stream::<LogSinkMarker>();
        // Writer thread: pushes every datagram through a sink that retries when the buffer is
        // full.
        std::thread::spawn(move || {
            let config = SinkConfig { retry_on_buffer_full: true, ..SinkConfig::default() };
            let sink = Sink::new(&proxy, config).unwrap();
            for i in 0..TOTAL_WRITES {
                let payload = [i as u8; MAX_DATAGRAM_LEN_BYTES as _];
                sink.send(&payload, || unreachable!("We should never drop a log in this test"));
            }
        });
        let request = requests.next().await.unwrap().unwrap();
        let LogSinkRequest::ConnectStructured { socket, .. } = request else {
            panic!("sink ctor sent the wrong message");
        };
        let mut socket = fuchsia_async::Socket::from_socket(socket);
        // Every datagram must arrive, in order and intact; nothing may be dropped.
        for i in 0..TOTAL_WRITES {
            let mut received = vec![0u8; MAX_DATAGRAM_LEN_BYTES as _];
            let len = socket.read(&mut received).await.unwrap();
            assert_eq!(len, MAX_DATAGRAM_LEN_BYTES as usize);
            assert_eq!(received, vec![i as u8; MAX_DATAGRAM_LEN_BYTES as _]);
            std::thread::sleep(Duration::from_millis(50));
        }
    }

    #[fuchsia::test(logging = false)]
    async fn packets_are_sent() {
        let config =
            SinkConfig { metatags: HashSet::from([Metatag::Target]), ..SinkConfig::default() };
        let socket = init_sink(config).await;
        log::set_max_level(log::LevelFilter::Trace);
        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
        let mut read_record = || {
            let len = socket.read(&mut buf).unwrap();
            let (record, _) = parse_record(&buf[..len]).unwrap();
            assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
            record.into_owned()
        };
        let metatag = Argument::tag(TARGET);

        // Emit one message per severity, reading each back before emitting the next.
        trace!(count = 123; "whoa this is noisy");
        let observed_trace = read_record();
        debug!(maybe = true; "don't try this at home");
        let observed_debug = read_record();
        info!("this is a message");
        let observed_info = read_record();
        warn!(reason = "just cuz"; "this is a warning");
        let observed_warn = read_record();
        error!(e = "something went pretty wrong"; "this is an error");
        let error_line = line!() - 1;
        let observed_error = read_record();

        // TRACE
        {
            let mut arguments = arg_prefix();
            arguments.extend([
                metatag.clone(),
                Argument::message("whoa this is noisy"),
                Argument::new("count", 123),
            ]);
            let expected_trace = Record {
                timestamp: observed_trace.timestamp,
                severity: Severity::Trace as u8,
                arguments,
            };
            assert_eq!(observed_trace, expected_trace);
        }

        // DEBUG
        {
            let mut arguments = arg_prefix();
            arguments.extend([
                metatag.clone(),
                Argument::message("don't try this at home"),
                Argument::new("maybe", true),
            ]);
            let expected_debug = Record {
                timestamp: observed_debug.timestamp,
                severity: Severity::Debug as u8,
                arguments,
            };
            assert_eq!(observed_debug, expected_debug);
        }

        // INFO
        {
            let mut arguments = arg_prefix();
            arguments.extend([metatag.clone(), Argument::message("this is a message")]);
            let expected_info = Record {
                timestamp: observed_info.timestamp,
                severity: Severity::Info as u8,
                arguments,
            };
            assert_eq!(observed_info, expected_info);
        }

        // WARN
        {
            let mut arguments = arg_prefix();
            arguments.extend([
                metatag.clone(),
                Argument::message("this is a warning"),
                Argument::new("reason", "just cuz"),
            ]);
            let expected_warn = Record {
                timestamp: observed_warn.timestamp,
                severity: Severity::Warn as u8,
                arguments,
            };
            assert_eq!(observed_warn, expected_warn);
        }

        // ERROR: the only record here that is expected to carry file and line arguments.
        {
            let mut arguments = arg_prefix();
            arguments.extend([
                Argument::file("src/lib/diagnostics/log/rust/src/fuchsia/sink.rs"),
                Argument::line(error_line as u64),
                metatag,
                Argument::message("this is an error"),
                Argument::new("e", "something went pretty wrong"),
            ]);
            let expected_error = Record {
                timestamp: observed_error.timestamp,
                severity: Severity::Error as u8,
                arguments,
            };
            assert_eq!(observed_error, expected_error);
        }
    }

    #[fuchsia::test(logging = false)]
    async fn tags_are_sent() {
        let config = SinkConfig { tags: vec!["tags_are_sent".to_string()], ..SinkConfig::default() };
        let socket = init_sink(config).await;
        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
        let mut read_record = || {
            let len = socket.read(&mut buf).unwrap();
            let (record, _) = parse_record(&buf[..len]).unwrap();
            assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
            record.into_owned()
        };

        info!("this should have a tag");
        let observed = read_record();

        let mut arguments = arg_prefix();
        arguments.extend([
            Argument::message("this should have a tag"),
            Argument::tag("tags_are_sent"),
        ]);
        let expected =
            Record { timestamp: observed.timestamp, severity: Severity::Info as u8, arguments };
        assert_eq!(observed, expected);
    }

    #[fuchsia::test(logging = false)]
    async fn log_every_n_seconds_test() {
        let socket = init_sink(SinkConfig::default()).await;
        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];

        let log_fn = || {
            log_every_n_seconds!(5, INFO, "test message");
        };

        let read_record = |buf: &mut [u8]| {
            let len = socket.read(buf).unwrap();
            let (record, _) = parse_record(&buf[..len]).unwrap();
            assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
            record.into_owned()
        };

        let expect_message = |buf: &mut [u8]| {
            let observed = read_record(buf);

            let mut arguments = arg_prefix();
            arguments.push(Argument::message("test message"));
            let expected =
                Record { timestamp: observed.timestamp, severity: Severity::Info as u8, arguments };
            assert_eq!(observed, expected);
        };

        // The first log call should result in a message.
        log_fn();
        expect_message(&mut buf);

        // A second call less than 5 seconds later should NOT result in a message.
        log_fn();
        assert_eq!(socket.read(&mut buf), Err(Status::SHOULD_WAIT));

        // Once 5 seconds have passed, logging should result in a message again.
        increment_clock(Duration::from_secs(5));
        log_fn();
        expect_message(&mut buf);
    }

    #[fuchsia::test(logging = false)]
    async fn drop_count_is_tracked() {
        const MESSAGE_SIZE: usize = 104;
        const MESSAGE_SIZE_WITH_DROPS: usize = 136;
        const NUM_DROPPED: usize = 100;

        let socket = init_sink(SinkConfig::default()).await;
        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];

        let socket_capacity = || {
            let info = socket.info().unwrap();
            info.rx_buf_max - info.rx_buf_size
        };
        let emit_message = || info!("it's-a-me, a message-o");
        let mut drain_message = |with_drops: bool| {
            let len = socket.read(&mut buf).unwrap();

            let expected_len = if with_drops { MESSAGE_SIZE_WITH_DROPS } else { MESSAGE_SIZE };
            assert_eq!(len, expected_len, "constant message size is used to calculate thresholds");

            let (record, _) = parse_record(&buf[..len]).unwrap();

            // The dropped-count argument, when present, sits between the prefix and the message.
            let mut expected_args = arg_prefix();
            expected_args.extend(with_drops.then(|| Argument::dropped(NUM_DROPPED as u64)));
            expected_args.push(Argument::message("it's-a-me, a message-o"));

            assert_eq!(
                record,
                Record {
                    timestamp: record.timestamp,
                    severity: Severity::Info as u8,
                    arguments: expected_args
                }
            );
        };

        // fill up the socket
        let mut num_emitted = 0;
        while socket_capacity() > MESSAGE_SIZE {
            emit_message();
            num_emitted += 1;
            assert_eq!(
                socket.info().unwrap().rx_buf_size,
                num_emitted * MESSAGE_SIZE,
                "incorrect bytes stored after {} messages sent",
                num_emitted
            );
        }

        // drop messages
        (0..NUM_DROPPED).for_each(|_| emit_message());

        // Make space for a message that conveys the drop count. Two messages are drained because
        // carrying the drop count makes the packet larger. Draining only one would rely on the
        // kernel's buffer size satisfying
        //   (rx_buf_max_size % MESSAGE_SIZE) > (MESSAGE_SIZE_WITH_DROPS - MESSAGE_SIZE)
        // which held when this test was written but may not be guaranteed.
        drain_message(false);
        drain_message(false);
        // this count is used below to drain the rest of the messages
        num_emitted -= 2;
        // convey the drop count, it's now at the tail of the socket
        emit_message();
        // drain remaining "normal" messages ahead of the drop count
        for _ in 0..num_emitted {
            drain_message(false);
        }
        // verify that messages were dropped
        drain_message(true);

        // check that we return to normal after reporting the drops
        emit_message();
        drain_message(false);
        assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "must drain all messages");
    }

    #[fuchsia::test(logging = false)]
    async fn build_record_from_log_event() {
        let before_timestamp = zx::BootInstant::get();
        let last_record = Arc::new(Mutex::new(None));
        let logger = TrackerLogger::new(Arc::clone(&last_record));
        log::set_boxed_logger(Box::new(logger)).expect("set logger");
        log::set_max_level(log::LevelFilter::Info);
        log::info!(
            is_a_str = "hahaha",
            is_debug:? = PrintMe(5),
            is_signed = -500,
            is_unsigned = 1000u64,
            is_bool = false;
            "blarg this is a message"
        );

        let guard = last_record.lock().unwrap();
        let encoder = guard.as_ref().unwrap();
        let (record, _) = parse_record(encoder.inner().get_ref()).expect("wrote valid record");
        assert_gt!(record.timestamp, before_timestamp);

        let expected = Record {
            timestamp: record.timestamp,
            severity: Severity::Info as u8,
            arguments: vec![
                Argument::pid(PROCESS_ID.with(|koid| *koid)),
                Argument::tid(THREAD_ID.with(|koid| *koid)),
                Argument::tag(TARGET),
                Argument::message("blarg this is a message"),
                Argument::other("is_a_str", "hahaha"),
                Argument::other("is_debug", "PrintMe(5)"),
                Argument::other("is_signed", -500),
                Argument::other("is_unsigned", 1000u64),
                Argument::other("is_bool", false),
                Argument::tag("a-tag"),
            ],
        };
        assert_eq!(record, expected);
    }

    // Note the inner u32 is used in the debug implementation.
    #[derive(Debug)]
    struct PrintMe(#[allow(unused)] u32);

    type ByteEncoder = Encoder<Cursor<[u8; 1024]>>;

    /// `log::Log` implementation that encodes each record into a fresh fixed-size buffer and
    /// keeps only the most recent encoder.
    struct TrackerLogger {
        last_record: Arc<Mutex<Option<ByteEncoder>>>,
    }

    impl TrackerLogger {
        fn new(last_record: Arc<Mutex<Option<ByteEncoder>>>) -> Self {
            Self { last_record }
        }
    }

    impl log::Log for TrackerLogger {
        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
            true
        }

        fn log(&self, record: &log::Record<'_>) {
            let mut encoder = Encoder::new(Cursor::new([0u8; 1024]), EncoderOpts::default());
            let pid = PROCESS_ID.with(|koid| *koid);
            let tid = THREAD_ID.with(|koid| *koid);
            encoder
                .write_event(WriteEventParams {
                    event: LogEvent::new(record),
                    tags: &["a-tag"],
                    metatags: [Metatag::Target].iter(),
                    pid,
                    tid,
                    dropped: 0,
                })
                .expect("wrote event");
            // Overwrite whatever was stored by a previous call.
            *self.last_record.lock().unwrap() = Some(encoder);
        }

        fn flush(&self) {}
    }
}