diagnostics_log/fuchsia/
sink.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use crate::{PublishError, SeverityExt};
5use diagnostics_log_encoding::encode::{
6    Encoder, EncoderOpts, EncodingError, MutableBuffer, RecordEvent, TestRecord, WriteEventParams,
7};
8use diagnostics_log_encoding::{Argument, Metatag, RawSeverity};
9use fidl::endpoints::ClientEnd;
10use fidl_fuchsia_logger::{LogSinkMarker, LogSinkSynchronousProxy, MAX_DATAGRAM_LEN_BYTES};
11use fuchsia_runtime as rt;
12use std::borrow::Cow;
13use std::collections::HashSet;
14use std::io::Cursor;
15use std::sync::atomic::{AtomicU32, Ordering};
16use zx::{self as zx, AsHandleRef};
17
18#[derive(Default)]
19pub(crate) struct SinkConfig {
20    pub(crate) metatags: HashSet<Metatag>,
21    pub(crate) retry_on_buffer_full: bool,
22    pub(crate) tags: Vec<String>,
23    pub(crate) always_log_file_line: bool,
24}
25
26thread_local! {
27    static PROCESS_ID: zx::Koid =
28        rt::process_self().get_koid().unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX));
29    static THREAD_ID: zx::Koid = rt::with_thread_self(|thread| {
30        thread.get_koid().unwrap_or_else(|_| zx::Koid::from_raw(zx::sys::zx_koid_t::MAX))
31    });
32}
33
34pub(crate) struct Sink {
35    socket: zx::Socket,
36    num_events_dropped: AtomicU32,
37    config: SinkConfig,
38}
39
40impl Sink {
41    pub fn new(
42        log_sink: &ClientEnd<LogSinkMarker>,
43        config: SinkConfig,
44    ) -> Result<Self, PublishError> {
45        let (socket, remote_socket) = zx::Socket::create_datagram();
46        let log_sink = zx::Unowned::<LogSinkSynchronousProxy>::new(log_sink.channel());
47        log_sink.connect_structured(remote_socket).map_err(PublishError::SendSocket)?;
48        Ok(Self { socket, config, num_events_dropped: AtomicU32::new(0) })
49    }
50}
51
52impl Sink {
53    #[inline]
54    fn encode_and_send(
55        &self,
56        encode: impl FnOnce(&mut Encoder<Cursor<&mut [u8]>>, u32) -> Result<(), EncodingError>,
57    ) {
58        let ordering = Ordering::Relaxed;
59        let previously_dropped = self.num_events_dropped.swap(0, ordering);
60        let restore_and_increment_dropped_count = || {
61            self.num_events_dropped.fetch_add(previously_dropped + 1, ordering);
62        };
63
64        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
65        let mut encoder = Encoder::new(
66            Cursor::new(&mut buf[..]),
67            EncoderOpts { always_log_file_line: self.config.always_log_file_line },
68        );
69        if encode(&mut encoder, previously_dropped).is_err() {
70            restore_and_increment_dropped_count();
71            return;
72        }
73
74        let end = encoder.inner().cursor();
75        let packet = &encoder.inner().get_ref()[..end];
76        self.send(packet, restore_and_increment_dropped_count);
77    }
78
79    fn send(&self, packet: &[u8], on_error: impl Fn()) {
80        while let Err(status) = self.socket.write(packet) {
81            if status != zx::Status::SHOULD_WAIT || !self.config.retry_on_buffer_full {
82                on_error();
83                break;
84            }
85            let Ok(signals) = self
86                .socket
87                .wait_handle(
88                    zx::Signals::SOCKET_PEER_CLOSED | zx::Signals::SOCKET_WRITABLE,
89                    zx::MonotonicInstant::INFINITE,
90                )
91                .to_result()
92            else {
93                on_error();
94                break;
95            };
96            if signals.contains(zx::Signals::SOCKET_PEER_CLOSED) {
97                on_error();
98                break;
99            }
100        }
101    }
102
103    pub(crate) fn record_log(&self, record: &log::Record<'_>) {
104        self.encode_and_send(|encoder, previously_dropped| {
105            encoder.write_event(WriteEventParams {
106                event: LogEvent::new(record),
107                tags: &self.config.tags,
108                metatags: self.config.metatags.iter(),
109                pid: PROCESS_ID.with(|p| *p),
110                tid: THREAD_ID.with(|t| *t),
111                dropped: previously_dropped.into(),
112            })
113        });
114    }
115
116    pub fn event_for_testing(&self, record: TestRecord<'_>) {
117        self.encode_and_send(move |encoder, previously_dropped| {
118            encoder.write_event(WriteEventParams {
119                event: record,
120                tags: &self.config.tags,
121                metatags: std::iter::empty(),
122                pid: PROCESS_ID.with(|p| *p),
123                tid: THREAD_ID.with(|t| *t),
124                dropped: previously_dropped.into(),
125            })
126        });
127    }
128}
129
130#[doc(hidden)]
131pub struct LogEvent<'a> {
132    record: &'a log::Record<'a>,
133    timestamp: zx::BootInstant,
134}
135
136impl<'a> LogEvent<'a> {
137    pub fn new(record: &'a log::Record<'a>) -> Self {
138        Self { record, timestamp: zx::BootInstant::get() }
139    }
140}
141
142impl RecordEvent for LogEvent<'_> {
143    fn raw_severity(&self) -> RawSeverity {
144        self.record.metadata().raw_severity()
145    }
146
147    fn file(&self) -> Option<&str> {
148        self.record.file()
149    }
150
151    fn line(&self) -> Option<u32> {
152        self.record.line()
153    }
154
155    fn target(&self) -> &str {
156        self.record.target()
157    }
158
159    fn timestamp(&self) -> zx::BootInstant {
160        self.timestamp
161    }
162
163    fn write_arguments<B: MutableBuffer>(
164        self,
165        writer: &mut Encoder<B>,
166    ) -> Result<(), EncodingError> {
167        let args = self.record.args();
168        let message =
169            args.as_str().map(Cow::Borrowed).unwrap_or_else(|| Cow::Owned(args.to_string()));
170        writer.write_argument(Argument::message(message))?;
171        self.record
172            .key_values()
173            .visit(&mut KeyValuesVisitor(writer))
174            .map_err(EncodingError::other)?;
175        Ok(())
176    }
177}
178
179struct KeyValuesVisitor<'a, B>(&'a mut Encoder<B>);
180
181impl<B: MutableBuffer> log::kv::VisitSource<'_> for KeyValuesVisitor<'_, B> {
182    fn visit_pair(
183        &mut self,
184        key: log::kv::Key<'_>,
185        value: log::kv::Value<'_>,
186    ) -> Result<(), log::kv::Error> {
187        value.visit(ValueVisitor { encoder: self.0, key: key.as_str() })
188    }
189}
190
191struct ValueVisitor<'a, B> {
192    encoder: &'a mut Encoder<B>,
193    key: &'a str,
194}
195
196impl<B: MutableBuffer> log::kv::VisitValue<'_> for ValueVisitor<'_, B> {
197    fn visit_any(&mut self, value: log::kv::Value<'_>) -> Result<(), log::kv::Error> {
198        self.encoder
199            .write_raw_argument(self.key, format!("{value}"))
200            .map_err(log::kv::Error::boxed)?;
201        Ok(())
202    }
203
204    fn visit_null(&mut self) -> Result<(), log::kv::Error> {
205        self.encoder.write_raw_argument(self.key, "null").map_err(log::kv::Error::boxed)?;
206        Ok(())
207    }
208
209    fn visit_u64(&mut self, value: u64) -> Result<(), log::kv::Error> {
210        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
211        Ok(())
212    }
213
214    fn visit_i64(&mut self, value: i64) -> Result<(), log::kv::Error> {
215        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
216        Ok(())
217    }
218
219    fn visit_f64(&mut self, value: f64) -> Result<(), log::kv::Error> {
220        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
221        Ok(())
222    }
223
224    fn visit_bool(&mut self, value: bool) -> Result<(), log::kv::Error> {
225        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
226        Ok(())
227    }
228
229    fn visit_str(&mut self, value: &str) -> Result<(), log::kv::Error> {
230        self.encoder.write_raw_argument(self.key, value).map_err(log::kv::Error::boxed)?;
231        Ok(())
232    }
233
234    // TODO(https://fxbug.dev/360919323): when we enable kv_std we must support visit_error and
235    // visit_borrowed_error.
236}
237
238#[cfg(test)]
239mod tests {
240    use super::*;
241    use crate::{increment_clock, log_every_n_seconds};
242    use diagnostics_log_encoding::parse::parse_record;
243    use diagnostics_log_encoding::{Argument, Record};
244    use diagnostics_log_types::Severity;
245    use fidl::endpoints::create_request_stream;
246    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequest};
247    use futures::stream::StreamExt;
248    use futures::AsyncReadExt;
249    use log::{debug, error, info, trace, warn};
250    use std::sync::{Arc, Mutex};
251    use std::time::Duration;
252    use test_util::assert_gt;
253    use zx::Status;
254
255    const TARGET: &str = "diagnostics_log_lib_test::fuchsia::sink::tests";
256
257    struct TestLogger {
258        sink: Sink,
259    }
260
261    impl TestLogger {
262        fn new(sink: Sink) -> Self {
263            Self { sink }
264        }
265    }
266
267    impl log::Log for TestLogger {
268        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
269            true
270        }
271
272        fn log(&self, record: &log::Record<'_>) {
273            if self.enabled(record.metadata()) {
274                self.sink.record_log(record);
275            }
276        }
277
278        fn flush(&self) {}
279    }
280
281    async fn init_sink(config: SinkConfig) -> fidl::Socket {
282        let (client, mut requests) = create_request_stream::<LogSinkMarker>();
283        let sink = Sink::new(&client, config).unwrap();
284        log::set_boxed_logger(Box::new(TestLogger::new(sink))).expect("set logger");
285        log::set_max_level(log::LevelFilter::Info);
286
287        match requests.next().await.unwrap().unwrap() {
288            LogSinkRequest::ConnectStructured { socket, .. } => socket,
289            _ => panic!("sink ctor sent the wrong message"),
290        }
291    }
292
293    fn arg_prefix() -> Vec<Argument<'static>> {
294        vec![Argument::pid(PROCESS_ID.with(|p| *p)), Argument::tid(THREAD_ID.with(|t| *t))]
295    }
296
297    #[fuchsia::test(logging = false)]
298    async fn wait_and_retry_is_possible() {
299        // 160 writes so we write 5 MB given that we write 32K each write. Without enabling
300        // retrying, this would lead to dropped logs.
301        const TOTAL_WRITES: usize = 32 * 5;
302        let (client, mut requests) = create_request_stream::<LogSinkMarker>();
303        // Writes a megabyte of data to the Sink.
304        std::thread::spawn(move || {
305            let sink = Sink::new(
306                &client,
307                SinkConfig { retry_on_buffer_full: true, ..SinkConfig::default() },
308            )
309            .unwrap();
310            for i in 0..TOTAL_WRITES {
311                let buf = [i as u8; MAX_DATAGRAM_LEN_BYTES as _];
312                sink.send(&buf, || unreachable!("We should never drop a log in this test"));
313            }
314        });
315        let socket = match requests.next().await.unwrap().unwrap() {
316            LogSinkRequest::ConnectStructured { socket, .. } => socket,
317            _ => panic!("sink ctor sent the wrong message"),
318        };
319        let mut socket = fuchsia_async::Socket::from_socket(socket);
320        // Ensure we are able to read all of the data written to the socket and we didn't drop
321        // anything.
322        for i in 0..TOTAL_WRITES {
323            let mut buf = vec![0u8; MAX_DATAGRAM_LEN_BYTES as _];
324            let len = socket.read(&mut buf).await.unwrap();
325            assert_eq!(len, MAX_DATAGRAM_LEN_BYTES as usize);
326            assert_eq!(buf, vec![i as u8; MAX_DATAGRAM_LEN_BYTES as _]);
327            std::thread::sleep(std::time::Duration::from_millis(50));
328        }
329    }
330
331    #[fuchsia::test(logging = false)]
332    async fn packets_are_sent() {
333        let socket = init_sink(SinkConfig {
334            metatags: HashSet::from([Metatag::Target]),
335            ..SinkConfig::default()
336        })
337        .await;
338        log::set_max_level(log::LevelFilter::Trace);
339        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
340        let mut next_message = || {
341            let len = socket.read(&mut buf).unwrap();
342            let (record, _) = parse_record(&buf[..len]).unwrap();
343            assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
344            record.into_owned()
345        };
346
347        // emit some expected messages and then we'll retrieve them for parsing
348        trace!(count = 123; "whoa this is noisy");
349        let observed_trace = next_message();
350        debug!(maybe = true; "don't try this at home");
351        let observed_debug = next_message();
352        info!("this is a message");
353        let observed_info = next_message();
354        warn!(reason = "just cuz"; "this is a warning");
355        let observed_warn = next_message();
356        error!(e = "something went pretty wrong"; "this is an error");
357        let error_line = line!() - 1;
358        let metatag = Argument::tag(TARGET);
359        let observed_error = next_message();
360
361        // TRACE
362        {
363            let mut expected_trace = Record {
364                timestamp: observed_trace.timestamp,
365                severity: Severity::Trace as u8,
366                arguments: arg_prefix(),
367            };
368            expected_trace.arguments.push(metatag.clone());
369            expected_trace.arguments.push(Argument::message("whoa this is noisy"));
370            expected_trace.arguments.push(Argument::new("count", 123));
371            assert_eq!(observed_trace, expected_trace);
372        }
373
374        // DEBUG
375        {
376            let mut expected_debug = Record {
377                timestamp: observed_debug.timestamp,
378                severity: Severity::Debug as u8,
379                arguments: arg_prefix(),
380            };
381            expected_debug.arguments.push(metatag.clone());
382            expected_debug.arguments.push(Argument::message("don't try this at home"));
383            expected_debug.arguments.push(Argument::new("maybe", true));
384            assert_eq!(observed_debug, expected_debug);
385        }
386
387        // INFO
388        {
389            let mut expected_info = Record {
390                timestamp: observed_info.timestamp,
391                severity: Severity::Info as u8,
392                arguments: arg_prefix(),
393            };
394            expected_info.arguments.push(metatag.clone());
395            expected_info.arguments.push(Argument::message("this is a message"));
396            assert_eq!(observed_info, expected_info);
397        }
398
399        // WARN
400        {
401            let mut expected_warn = Record {
402                timestamp: observed_warn.timestamp,
403                severity: Severity::Warn as u8,
404                arguments: arg_prefix(),
405            };
406            expected_warn.arguments.push(metatag.clone());
407            expected_warn.arguments.push(Argument::message("this is a warning"));
408            expected_warn.arguments.push(Argument::new("reason", "just cuz"));
409            assert_eq!(observed_warn, expected_warn);
410        }
411
412        // ERROR
413        {
414            let mut expected_error = Record {
415                timestamp: observed_error.timestamp,
416                severity: Severity::Error as u8,
417                arguments: arg_prefix(),
418            };
419            expected_error
420                .arguments
421                .push(Argument::file("src/lib/diagnostics/log/rust/src/fuchsia/sink.rs"));
422            expected_error.arguments.push(Argument::line(error_line as u64));
423            expected_error.arguments.push(metatag);
424            expected_error.arguments.push(Argument::message("this is an error"));
425            expected_error.arguments.push(Argument::new("e", "something went pretty wrong"));
426            assert_eq!(observed_error, expected_error);
427        }
428    }
429
430    #[fuchsia::test(logging = false)]
431    async fn tags_are_sent() {
432        let socket = init_sink(SinkConfig {
433            tags: vec!["tags_are_sent".to_string()],
434            ..SinkConfig::default()
435        })
436        .await;
437        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
438        let mut next_message = || {
439            let len = socket.read(&mut buf).unwrap();
440            let (record, _) = parse_record(&buf[..len]).unwrap();
441            assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
442            record.into_owned()
443        };
444
445        info!("this should have a tag");
446        let observed = next_message();
447
448        let mut expected = Record {
449            timestamp: observed.timestamp,
450            severity: Severity::Info as u8,
451            arguments: arg_prefix(),
452        };
453        expected.arguments.push(Argument::message("this should have a tag"));
454        expected.arguments.push(Argument::tag("tags_are_sent"));
455        assert_eq!(observed, expected);
456    }
457
458    #[fuchsia::test(logging = false)]
459    async fn log_every_n_seconds_test() {
460        let socket = init_sink(SinkConfig { ..SinkConfig::default() }).await;
461        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
462        let next_message = |buf: &mut [u8]| {
463            let len = socket.read(buf).unwrap();
464            let (record, _) = parse_record(&buf[..len]).unwrap();
465            assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "socket must be empty");
466            record.into_owned()
467        };
468
469        let log_fn = || {
470            log_every_n_seconds!(5, INFO, "test message");
471        };
472
473        let expect_message = |buf: &mut [u8]| {
474            let observed = next_message(buf);
475
476            let mut expected = Record {
477                timestamp: observed.timestamp,
478                severity: Severity::Info as u8,
479                arguments: arg_prefix(),
480            };
481            expected.arguments.push(Argument::message("test message"));
482            assert_eq!(observed, expected);
483        };
484
485        log_fn();
486        // First log call should result in a message.
487        expect_message(&mut buf);
488        log_fn();
489        // Subsequent log call in less than 5 seconds should NOT
490        // result in a message.
491        assert_eq!(socket.read(&mut buf), Err(Status::SHOULD_WAIT));
492        increment_clock(Duration::from_secs(5));
493
494        // Calling log_fn after 5 seconds should result in a message.
495        log_fn();
496        expect_message(&mut buf);
497    }
498
499    #[fuchsia::test(logging = false)]
500    async fn drop_count_is_tracked() {
501        let socket = init_sink(SinkConfig::default()).await;
502        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
503        const MESSAGE_SIZE: usize = 104;
504        const MESSAGE_SIZE_WITH_DROPS: usize = 136;
505        const NUM_DROPPED: usize = 100;
506
507        let socket_capacity = || {
508            let info = socket.info().unwrap();
509            info.rx_buf_max - info.rx_buf_size
510        };
511        let emit_message = || info!("it's-a-me, a message-o");
512        let mut drain_message = |with_drops| {
513            let len = socket.read(&mut buf).unwrap();
514
515            let expected_len = if with_drops { MESSAGE_SIZE_WITH_DROPS } else { MESSAGE_SIZE };
516            assert_eq!(len, expected_len, "constant message size is used to calculate thresholds");
517
518            let (record, _) = parse_record(&buf[..len]).unwrap();
519            let mut expected_args = arg_prefix();
520
521            if with_drops {
522                expected_args.push(Argument::dropped(NUM_DROPPED as u64));
523            }
524
525            expected_args.push(Argument::message("it's-a-me, a message-o"));
526
527            assert_eq!(
528                record,
529                Record {
530                    timestamp: record.timestamp,
531                    severity: Severity::Info as u8,
532                    arguments: expected_args
533                }
534            );
535        };
536
537        // fill up the socket
538        let mut num_emitted = 0;
539        while socket_capacity() > MESSAGE_SIZE {
540            emit_message();
541            num_emitted += 1;
542            assert_eq!(
543                socket.info().unwrap().rx_buf_size,
544                num_emitted * MESSAGE_SIZE,
545                "incorrect bytes stored after {} messages sent",
546                num_emitted
547            );
548        }
549
550        // drop messages
551        for _ in 0..NUM_DROPPED {
552            emit_message();
553        }
554
555        // make space for a message to convey the drop count
556        // we drain two messages here because emitting the drop count adds to the size of the packet
557        // if we only drain one message then we're relying on the kernel's buffer size to satisfy
558        //   (rx_buf_max_size % MESSAGE_SIZE) > (MESSAGE_SIZE_WITH_DROPS - MESSAGE_SIZE)
559        // this is true at the time of writing of this test but we don't know whether that's a
560        // guarantee.
561        drain_message(false);
562        drain_message(false);
563        // we use this count below to drain the rest of the messages
564        num_emitted -= 2;
565        // convey the drop count, it's now at the tail of the socket
566        emit_message();
567        // drain remaining "normal" messages ahead of the drop count
568        for _ in 0..num_emitted {
569            drain_message(false);
570        }
571        // verify that messages were dropped
572        drain_message(true);
573
574        // check that we return to normal after reporting the drops
575        emit_message();
576        drain_message(false);
577        assert_eq!(socket.outstanding_read_bytes().unwrap(), 0, "must drain all messages");
578    }
579
580    #[fuchsia::test(logging = false)]
581    async fn build_record_from_log_event() {
582        let before_timestamp = zx::BootInstant::get();
583        let last_record = Arc::new(Mutex::new(None));
584        let logger = TrackerLogger::new(last_record.clone());
585        log::set_boxed_logger(Box::new(logger)).expect("set logger");
586        log::set_max_level(log::LevelFilter::Info);
587        log::info!(
588            is_a_str = "hahaha",
589            is_debug:? = PrintMe(5),
590            is_signed = -500,
591            is_unsigned = 1000u64,
592            is_bool = false;
593            "blarg this is a message"
594        );
595
596        let guard = last_record.lock().unwrap();
597        let encoder = guard.as_ref().unwrap();
598        let (record, _) = parse_record(encoder.inner().get_ref()).expect("wrote valid record");
599        assert_gt!(record.timestamp, before_timestamp);
600        assert_eq!(
601            record,
602            Record {
603                timestamp: record.timestamp,
604                severity: Severity::Info as u8,
605                arguments: vec![
606                    Argument::pid(PROCESS_ID.with(|p| *p)),
607                    Argument::tid(THREAD_ID.with(|p| *p)),
608                    Argument::tag("diagnostics_log_lib_test::fuchsia::sink::tests"),
609                    Argument::message("blarg this is a message"),
610                    Argument::other("is_a_str", "hahaha"),
611                    Argument::other("is_debug", "PrintMe(5)"),
612                    Argument::other("is_signed", -500),
613                    Argument::other("is_unsigned", 1000u64),
614                    Argument::other("is_bool", false),
615                    Argument::tag("a-tag"),
616                ]
617            }
618        );
619    }
620
621    // Note the inner u32 is used in the debug implementation.
622    #[derive(Debug)]
623    struct PrintMe(#[allow(unused)] u32);
624
625    type ByteEncoder = Encoder<Cursor<[u8; 1024]>>;
626
627    struct TrackerLogger {
628        last_record: Arc<Mutex<Option<ByteEncoder>>>,
629    }
630
631    impl TrackerLogger {
632        fn new(last_record: Arc<Mutex<Option<ByteEncoder>>>) -> Self {
633            Self { last_record }
634        }
635    }
636
637    impl log::Log for TrackerLogger {
638        fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
639            true
640        }
641
642        fn log(&self, record: &log::Record<'_>) {
643            let mut encoder = Encoder::new(Cursor::new([0u8; 1024]), EncoderOpts::default());
644            encoder
645                .write_event(WriteEventParams {
646                    event: LogEvent::new(record),
647                    tags: &["a-tag"],
648                    metatags: [Metatag::Target].iter(),
649                    pid: PROCESS_ID.with(|p| *p),
650                    tid: THREAD_ID.with(|t| *t),
651                    dropped: 0,
652                })
653                .expect("wrote event");
654            let mut last_record = self.last_record.lock().unwrap();
655            last_record.replace(encoder);
656        }
657
658        fn flush(&self) {}
659    }
660}