diagnostics_log/fuchsia/mod.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.

use crate::PublishOptions;
use diagnostics_log_types::Severity;
use fidl::endpoints::ClientEnd;
use fidl_fuchsia_logger::LogSinkMarker;
use fuchsia_async as fasync;
use fuchsia_component_client::connect::connect_to_protocol;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use thiserror::Error;

#[cfg(fuchsia_api_level_less_than = "27")]
use fidl_fuchsia_diagnostics::Interest;
#[cfg(fuchsia_api_level_at_least = "27")]
use fidl_fuchsia_diagnostics_types::Interest;

mod filter;
mod sink;

use filter::InterestFilter;
use sink::{Sink, SinkConfig};

pub use diagnostics_log_encoding::encode::TestRecord;
pub use diagnostics_log_encoding::Metatag;
pub use paste::paste;
pub use sink::LogEvent;

#[cfg(test)]
use std::{
    sync::atomic::{AtomicI64, Ordering},
    time::Duration,
};

/// Callback for interest listeners
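///
/// # Example
///
/// A minimal sketch (illustrative only): `PrintListener` is a hypothetical type, and an
/// async executor is assumed to be running so the publisher can spawn its
/// interest-listening task.
///
/// ```ignore
/// struct PrintListener;
///
/// impl OnInterestChanged for PrintListener {
///     fn on_changed(&self, severity: Severity) {
///         // Called whenever the runtime minimum severity changes.
///         // Assumes `Severity` implements `Debug`.
///         eprintln!("minimum severity changed to {:?}", severity);
///     }
/// }
///
/// let publisher = Publisher::new(PublisherOptions::default()).expect("create publisher");
/// publisher.set_interest_listener(PrintListener);
/// ```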
pub trait OnInterestChanged {
    /// Callback for when the interest changes
    fn on_changed(&self, severity: Severity);
}

/// Options to configure a `Publisher`.
pub struct PublisherOptions<'t> {
    blocking: bool,
    pub(crate) interest: Interest,
    listen_for_interest_updates: bool,
    log_sink_client: Option<ClientEnd<LogSinkMarker>>,
    pub(crate) metatags: HashSet<Metatag>,
    pub(crate) tags: &'t [&'t str],
    wait_for_initial_interest: bool,
    pub(crate) always_log_file_line: bool,
}

impl Default for PublisherOptions<'_> {
    fn default() -> Self {
        Self {
            blocking: false,
            interest: Interest::default(),
            listen_for_interest_updates: true,
            log_sink_client: None,
            metatags: HashSet::new(),
            tags: &[],
            wait_for_initial_interest: true,
            always_log_file_line: false,
        }
    }
}

impl PublisherOptions<'_> {
    /// Creates a `PublisherOptions` with all collections empty and all flags set to false.
    /// This is useful when fine-grained control of `Publisher` and its behavior is necessary.
    ///
    /// However, for the majority of binaries that "just want to log",
    /// `PublishOptions::default` is preferred as that brings all the default
    /// configuration that is desired in most scenarios.
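    ///
    /// # Example
    ///
    /// A sketch of fine-grained configuration (the builder methods shown are the ones
    /// generated below; the particular combination is illustrative only):
    ///
    /// ```ignore
    /// let opts = PublisherOptions::empty()
    ///     // Opt back in to only the behavior this component needs.
    ///     .listen_for_interest_updates(true)
    ///     .wait_for_initial_interest(false);
    /// let publisher = Publisher::new(opts).expect("create publisher");
    /// ```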
    pub fn empty() -> Self {
        Self {
            blocking: false,
            interest: Interest::default(),
            listen_for_interest_updates: false,
            log_sink_client: None,
            metatags: HashSet::new(),
            tags: &[],
            wait_for_initial_interest: false,
            always_log_file_line: false,
        }
    }
}

macro_rules! publisher_options {
    ($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
        $(
            impl<'t> $name<'t> {
                /// Whether or not to block until the initial runtime interest is received
                /// before starting to emit log records, instead of emitting them right away
                /// with the default configured interest.
                ///
                /// It's recommended that this is set when developing to guarantee that a
                /// dynamically configured minimum severity makes it to the component before
                /// it starts emitting logs.
                ///
                /// Default: true.
                pub fn wait_for_initial_interest(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.wait_for_initial_interest = enable;
                    $self
                }

                /// Whether or not to log file/line information regardless of severity.
                ///
                /// Default: false.
                pub fn log_file_line_info(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.always_log_file_line = enable;
                    $self
                }

                /// When set, a `fuchsia_async::Task` will be spawned and held that will be
                /// listening for interest changes.
                ///
                /// Default: true
                pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.listen_for_interest_updates = enable;
                    $self
                }

                /// Sets the `LogSink` that will be used.
                ///
                /// Default: the `fuchsia.logger.LogSink` available in the incoming namespace.
                pub fn use_log_sink(mut $self, client: ClientEnd<LogSinkMarker>) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.log_sink_client = Some(client);
                    $self
                }

                /// When set to true, writes to the log socket will be blocking. That is, we'll
                /// retry every time the socket buffer is full until we are able to write the log.
                ///
                /// Default: false
                pub fn blocking(mut $self, is_blocking: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.blocking = is_blocking;
                    $self
                }
            }
        )*
    };
}

publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));

fn initialize_publishing(opts: PublishOptions<'_>) -> Result<Publisher, PublishError> {
    let publisher = Publisher::new(opts.publisher)?;
    publisher.register_logger()?;
    if opts.install_panic_hook {
        crate::install_panic_hook(opts.panic_prefix);
    }
    Ok(publisher)
}

/// Initializes logging with the given options.
///
/// IMPORTANT: this should be called at most once in a program, and must be
/// called only after an async executor has been set for the current thread,
/// otherwise it'll return errors or panic. Therefore it's recommended to never
/// call this from libraries and only do it from binaries.
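///
/// # Example
///
/// A minimal sketch (assumes this function and `PublishOptions` are re-exported at the
/// crate root, and that the call is made while an async executor is running):
///
/// ```ignore
/// fn main() {
///     let mut executor = fuchsia_async::LocalExecutor::new();
///     executor.run_singlethreaded(async {
///         // `initialize` spawns the interest-listening task, so an executor must be running.
///         diagnostics_log::initialize(diagnostics_log::PublishOptions::default())
///             .expect("initialize logging");
///         log::info!("hello from this component");
///     });
/// }
/// ```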
pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
    let _ = initialize_publishing(opts)?;
    Ok(())
}

/// Sets the global minimum log severity.
/// IMPORTANT: this function can panic if `initialize` wasn't called before.
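///
/// # Example
///
/// A sketch mirroring the usage in the tests below:
///
/// ```ignore
/// // Allow debug logs to be emitted from now on.
/// set_minimum_severity(Severity::Debug);
/// log::debug!("now visible");
/// ```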
pub fn set_minimum_severity(severity: impl Into<Severity>) {
    let severity: Severity = severity.into();
    log::set_max_level(severity.into());
}

struct AbortAndJoinOnDrop(
    Option<futures::future::AbortHandle>,
    Option<std::thread::JoinHandle<()>>,
);
impl Drop for AbortAndJoinOnDrop {
    fn drop(&mut self) {
        if let Some(handle) = &mut self.0 {
            handle.abort();
        }
        self.1.take().unwrap().join().unwrap();
    }
}

/// Initializes logging with the given options.
///
/// This must be used when working in an environment where a [`fuchsia_async::Executor`] can't be
/// used.
///
/// IMPORTANT: this should be called at most once in a program. It spawns a dedicated
/// background thread running its own executor to listen for interest changes, so it
/// doesn't require an async executor on the calling thread. It's recommended to never
/// call this from libraries and only do it from binaries.
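///
/// # Example
///
/// A sketch for a non-async `main` (assumes this function and `PublishOptions` are
/// re-exported at the crate root):
///
/// ```ignore
/// fn main() {
///     // Keep the returned guard alive; dropping it aborts interest listening and joins
///     // the background logging thread.
///     let _log_guard = diagnostics_log::initialize_sync(diagnostics_log::PublishOptions::default());
///     log::info!("logging without an async executor on this thread");
/// }
/// ```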
pub fn initialize_sync(opts: PublishOptions<'_>) -> impl Drop {
    let (send, recv) = std::sync::mpsc::channel();
    let (ready_send, ready_recv) = {
        let (snd, rcv) = std::sync::mpsc::channel();
        if opts.publisher.wait_for_initial_interest {
            (Some(snd), Some(rcv))
        } else {
            (None, None)
        }
    };
    let PublishOptions {
        publisher:
            PublisherOptions {
                blocking,
                interest,
                metatags,
                listen_for_interest_updates,
                log_sink_client,
                tags,
                wait_for_initial_interest,
                always_log_file_line,
            },
        install_panic_hook,
        panic_prefix,
    } = opts;
    let tags = tags.iter().map(|s| s.to_string()).collect::<Vec<_>>();

    let bg_thread = std::thread::spawn(move || {
        let options = PublishOptions {
            publisher: PublisherOptions {
                interest,
                metatags,
                tags: &tags.iter().map(String::as_ref).collect::<Vec<_>>(),
                listen_for_interest_updates,
                log_sink_client,
                wait_for_initial_interest,
                blocking,
                always_log_file_line,
            },
            install_panic_hook,
            panic_prefix,
        };
        let mut exec = fuchsia_async::LocalExecutor::new();
        let mut publisher = initialize_publishing(options).expect("initialize logging");
        if let Some(ready_send) = ready_send {
            ready_send.send(()).unwrap();
        }

        let interest_listening_task = publisher.take_interest_listening_task();

        if let Some(on_interest_changes) = interest_listening_task {
            let (on_interest_changes, cancel_interest) =
                futures::future::abortable(on_interest_changes);
            send.send(cancel_interest).unwrap();
            drop(send);
            exec.run_singlethreaded(on_interest_changes).ok();
        }
    });
    if let Some(ready_recv) = ready_recv {
        let _ = ready_recv.recv();
    }

    AbortAndJoinOnDrop(recv.recv().ok(), Some(bg_thread))
}

/// A `Publisher` acts as a broker, implementing [`log::Log`] to receive log
/// events from a component, and then forwarding that data on to a diagnostics service.
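///
/// # Example
///
/// A sketch of installing a `Publisher` manually (most binaries should just call
/// [`initialize`]); the tag value is illustrative, and an async executor is assumed to be
/// running so the interest-listening task can be spawned:
///
/// ```ignore
/// let publisher = Publisher::new(PublisherOptions {
///     tags: &["my-component"],
///     ..PublisherOptions::default()
/// })
/// .expect("create publisher");
/// // Install it as the global `log` logger; this may only be done once per program.
/// publisher.register_logger().expect("register logger");
/// log::info!("forwarded to the LogSink by the publisher");
/// ```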
#[derive(Clone)]
pub struct Publisher {
    inner: Arc<InnerPublisher>,
}

struct InnerPublisher {
    sink: Sink,
    filter: InterestFilter,
    interest_listening_task: Mutex<Option<fasync::Task<()>>>,
}

impl Default for Publisher {
    fn default() -> Self {
        Self::new(PublisherOptions::default()).expect("failed to create Publisher")
    }
}

impl Publisher {
    /// Construct a new `Publisher` using the given options.
    ///
    /// Should be called only once.
    pub fn new(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
        let client = match opts.log_sink_client {
            Some(log_sink) => log_sink,
            None => connect_to_protocol()
                .map_err(|e| e.to_string())
                .map_err(PublishError::LogSinkConnect)?,
        };
        let sink = Sink::new(
            &client,
            SinkConfig {
                tags: opts.tags.iter().map(|s| s.to_string()).collect(),
                metatags: opts.metatags,
                retry_on_buffer_full: opts.blocking,
                always_log_file_line: opts.always_log_file_line,
            },
        )?;
        let (filter, on_change) =
            InterestFilter::new(client, opts.interest, opts.wait_for_initial_interest);
        let interest_listening_task = if opts.listen_for_interest_updates {
            Mutex::new(Some(fasync::Task::spawn(on_change)))
        } else {
            Mutex::new(None)
        };
        Ok(Self { inner: Arc::new(InnerPublisher { sink, filter, interest_listening_task }) })
    }

    // TODO(https://fxbug.dev/42150573) delete this and make Publisher private
    /// Publish the provided event for testing.
    pub fn event_for_testing(&self, record: TestRecord<'_>) {
        if self.inner.filter.enabled_for_testing(&record) {
            self.inner.sink.event_for_testing(record);
        }
    }

    /// Registers an interest listener
    pub fn set_interest_listener<T>(&self, listener: T)
    where
        T: OnInterestChanged + Send + Sync + 'static,
    {
        self.inner.filter.set_interest_listener(listener);
    }

    /// Takes the task listening for interest changes if one exists.
    fn take_interest_listening_task(&mut self) -> Option<fasync::Task<()>> {
        self.inner.interest_listening_task.lock().unwrap().take()
    }

    /// Sets the global logger to this publisher. This function may only be called once in the
    /// lifetime of a program.
    pub fn register_logger(&self) -> Result<(), PublishError> {
        // SAFETY: This intentionally leaks the `Arc`, which guarantees the publisher remains
        // alive for the lifetime of the program.
        unsafe {
            let ptr = Arc::into_raw(self.inner.clone());
            log::set_logger(&*ptr).inspect_err(|_| {
                let _ = Arc::from_raw(ptr);
            })?;
        }
        Ok(())
    }
}

impl log::Log for InnerPublisher {
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        // NOTE: we handle minimum severity directly through the log max level, calling
        // `log::set_max_level` and `log::max_level` where appropriate.
        true
    }

    fn log(&self, record: &log::Record<'_>) {
        self.sink.record_log(record);
    }

    fn flush(&self) {}
}

impl log::Log for Publisher {
    #[inline]
    fn enabled(&self, metadata: &log::Metadata<'_>) -> bool {
        self.inner.enabled(metadata)
    }

    #[inline]
    fn log(&self, record: &log::Record<'_>) {
        self.inner.log(record)
    }

    #[inline]
    fn flush(&self) {
        self.inner.flush()
    }
}

/// Errors arising while forwarding a diagnostics stream to the environment.
#[derive(Debug, Error)]
pub enum PublishError {
    /// Connection to fuchsia.logger.LogSink failed.
    #[error("failed to connect to fuchsia.logger.LogSink ({0})")]
    LogSinkConnect(String),

    /// Couldn't create a new socket.
    #[error("failed to create a socket for logging")]
    MakeSocket(#[source] zx::Status),

    /// An issue with the LogSink channel or socket prevented us from sending it to the `LogSink`.
    #[error("failed to send a socket to the LogSink")]
    SendSocket(#[source] fidl::Error),

    /// Failed to install the logger.
    #[error("failed to install the logger")]
    InitLogForward(#[from] log::SetLoggerError),
}

#[cfg(test)]
static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);

/// Increments the test clock.
#[cfg(test)]
pub fn increment_clock(duration: Duration) {
    CURRENT_TIME_NANOS.fetch_add(duration.as_nanos() as i64, Ordering::SeqCst);
}

/// Gets the current monotonic time in nanoseconds.
#[doc(hidden)]
pub fn get_now() -> i64 {
    #[cfg(not(test))]
    return zx::MonotonicInstant::get().into_nanos();

    #[cfg(test)]
    CURRENT_TIME_NANOS.load(Ordering::Relaxed)
}

/// Logs at most once every N seconds, using an atomic variable to keep track of the last
/// time a message was emitted. This has a higher performance impact on ARM than regular
/// logging due to the use of an atomic.
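///
/// # Example
///
/// A sketch of rate-limited logging; the severity token (`INFO` here) is lowercased by the
/// macro to select the corresponding `log` macro:
///
/// ```ignore
/// // Emits this message at most once every 30 seconds, however often this line runs.
/// log_every_n_seconds!(30, INFO, "still waiting for a response");
/// ```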
#[macro_export]
macro_rules! log_every_n_seconds {
    ($seconds:expr, $severity:expr, $($arg:tt)*) => {
        use std::{time::Duration, sync::atomic::{Ordering, AtomicI64}};
        use $crate::{paste, fuchsia::get_now};

        let now = get_now();

        static LAST_LOG_TIMESTAMP: AtomicI64 = AtomicI64::new(0);
        if now - LAST_LOG_TIMESTAMP.load(Ordering::Acquire) >= Duration::from_secs($seconds).as_nanos() as i64 {
            paste! {
                log::[< $severity:lower >]!($($arg)*);
            }
            LAST_LOG_TIMESTAMP.store(now, Ordering::Release);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_reader::ArchiveReader;
    use fidl_fuchsia_diagnostics_crasher::{CrasherMarker, CrasherProxy};
    use fuchsia_component_test::{Capability, ChildOptions, RealmBuilder, Ref, Route};
    use futures::{future, StreamExt};
    use log::{debug, info};
    use moniker::ExtendedMoniker;

    #[fuchsia::test]
    async fn panic_integration_test() {
        let builder = RealmBuilder::new().await.unwrap();
        let puppet = builder
            .add_child("rust-crasher", "#meta/crasher.cm", ChildOptions::new())
            .await
            .unwrap();
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol::<CrasherMarker>())
                    .from(&puppet)
                    .to(Ref::parent()),
            )
            .await
            .unwrap();
        let realm = builder.build().await.unwrap();
        let child_name = realm.root.child_name();
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let proxy: CrasherProxy = realm.root.connect_to_protocol_at_exposed_dir().unwrap();
        let target_moniker =
            ExtendedMoniker::parse_str(&format!("realm_builder:{}/rust-crasher", child_name))
                .unwrap();
        proxy.crash("This is a test panic.").await.unwrap();

        let result =
            logs.filter(|data| future::ready(target_moniker == data.moniker)).next().await.unwrap();
        assert_eq!(result.line_number(), Some(29).as_ref());
        assert_eq!(
            result.file_path(),
            Some("src/lib/diagnostics/log/rust/rust-crasher/src/main.rs")
        );
        assert!(result
            .payload_keys()
            .unwrap()
            .get_property("info")
            .unwrap()
            .to_string()
            .contains("This is a test panic."));
    }

    #[fuchsia::test(logging = false)]
    async fn verify_setting_minimum_log_severity() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let publisher = Publisher::new(PublisherOptions {
            tags: &["verify_setting_minimum_log_severity"],
            ..PublisherOptions::empty()
        })
        .expect("initialized log");
        log::set_boxed_logger(Box::new(publisher)).unwrap();

        info!("I'm an info log");
        debug!("I'm a debug log and won't show up");

        set_minimum_severity(Severity::Debug);
        debug!("I'm a debug log and I show up");

        let results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
                )
            })
            .take(2)
            .collect::<Vec<_>>()
            .await;
        assert_eq!(results[0].msg().unwrap(), "I'm an info log");
        assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
    }

    #[fuchsia::test]
    async fn log_macro_logs_are_recorded() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();

        let total_threads = 10;

        for i in 0..total_threads {
            std::thread::spawn(move || {
                log::info!(thread=i; "log from thread {}", i);
            });
        }

        let mut results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "log_macro_logs_are_recorded"),
                )
            })
            .take(total_threads);

        let mut seen = vec![];
        while let Some(log) = results.next().await {
            let hierarchy = log.payload_keys().unwrap();
            assert_eq!(hierarchy.properties.len(), 1);
            assert_eq!(hierarchy.properties[0].name(), "thread");
            let thread_id = hierarchy.properties[0].uint().unwrap();
            seen.push(thread_id as usize);
            assert_eq!(log.msg().unwrap(), format!("log from thread {thread_id}"));
        }
        seen.sort();
        assert_eq!(seen, (0..total_threads).collect::<Vec<_>>());
    }
}