diagnostics_log/fuchsia/
mod.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
3
4use crate::PublishOptions;
5use diagnostics_log_types::Severity;
6use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy};
7use fuchsia_async as fasync;
8use fuchsia_component::client::connect_to_protocol;
9use std::collections::HashSet;
10use std::fmt::Debug;
11use std::sync::{Arc, Mutex};
12use thiserror::Error;
13
14#[cfg(fuchsia_api_level_less_than = "NEXT")]
15use fidl_fuchsia_diagnostics::Interest;
16#[cfg(fuchsia_api_level_at_least = "NEXT")]
17use fidl_fuchsia_diagnostics_types::Interest;
18
19mod filter;
20mod sink;
21
22use filter::InterestFilter;
23use sink::{Sink, SinkConfig};
24
25pub use diagnostics_log_encoding::encode::TestRecord;
26pub use diagnostics_log_encoding::Metatag;
27pub use paste::paste;
28pub use sink::LogEvent;
29
30#[cfg(test)]
31use std::{
32    sync::atomic::{AtomicI64, Ordering},
33    time::Duration,
34};
35
36/// Callback for interest listeners
37pub trait OnInterestChanged {
38    /// Callback for when the interest changes
39    fn on_changed(&self, severity: Severity);
40}
41
42/// Options to configure a `Publisher`.
43pub struct PublisherOptions<'t> {
44    blocking: bool,
45    pub(crate) interest: Interest,
46    listen_for_interest_updates: bool,
47    log_sink_proxy: Option<LogSinkProxy>,
48    pub(crate) metatags: HashSet<Metatag>,
49    pub(crate) tags: &'t [&'t str],
50    wait_for_initial_interest: bool,
51    pub(crate) always_log_file_line: bool,
52}
53
54impl Default for PublisherOptions<'_> {
55    fn default() -> Self {
56        Self {
57            blocking: false,
58            interest: Interest::default(),
59            listen_for_interest_updates: true,
60            log_sink_proxy: None,
61            metatags: HashSet::new(),
62            tags: &[],
63            wait_for_initial_interest: true,
64            always_log_file_line: false,
65        }
66    }
67}
68
69impl PublisherOptions<'_> {
70    /// Creates a `PublishOptions` with all sets either empty or set to false. This is
71    /// useful when fine grain control of `Publisher` and its behavior is necessary.
72    ///
73    /// However, for the majority of binaries that "just want to log",
74    /// `PublishOptions::default` is preferred as that brings all the default
75    /// configuration that is desired in most scenarios.
76    pub fn empty() -> Self {
77        Self {
78            blocking: false,
79            interest: Interest::default(),
80            listen_for_interest_updates: false,
81            log_sink_proxy: None,
82            metatags: HashSet::new(),
83            tags: &[],
84            wait_for_initial_interest: false,
85            always_log_file_line: false,
86        }
87    }
88}
89macro_rules! publisher_options {
90    ($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
91        $(
92            impl<'t> $name<'t> {
93                /// Whether or not to block on initial runtime interest being received before
94                /// starting to emit log records using the default interest configured.
95                ///
96                /// It's recommended that this is set when
97                /// developing to guarantee that a dynamically configured minimum severity makes it
98                /// to the component before it starts emitting logs.
99                ///
100                /// Default: true.
101                pub fn wait_for_initial_interest(mut $self, enable: bool) -> Self {
102                    let this = &mut $self$(.$self_arg)*;
103                    this.wait_for_initial_interest = enable;
104                    $self
105                }
106
107                /// Whether or not to log file/line information regardless of severity.
108                ///
109                /// Default: false.
110                pub fn log_file_line_info(mut $self, enable: bool) -> Self {
111                    let this = &mut $self$(.$self_arg)*;
112                    this.always_log_file_line = enable;
113                    $self
114                }
115
116                /// When set, a `fuchsia_async::Task` will be spawned and held that will be
117                /// listening for interest changes.
118                ///
119                /// Default: true
120                pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
121                    let this = &mut $self$(.$self_arg)*;
122                    this.listen_for_interest_updates = enable;
123                    $self
124                }
125
126                /// Sets the `LogSink` that will be used.
127                ///
128                /// Default: the `fuchsia.logger.LogSink` available in the incoming namespace.
129                pub fn use_log_sink(mut $self, proxy: LogSinkProxy) -> Self {
130                    let this = &mut $self$(.$self_arg)*;
131                    this.log_sink_proxy = Some(proxy);
132                    $self
133                }
134
135                /// When set to true, writes to the log socket will be blocking. This is, we'll
136                /// retry every time the socket buffer is full until we are able to write the log.
137                ///
138                /// Default: false
139                pub fn blocking(mut $self, is_blocking: bool) -> Self {
140                    let this = &mut $self$(.$self_arg)*;
141                    this.blocking = is_blocking;
142                    $self
143                }
144            }
145        )*
146    };
147}
148
149publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));
150
151fn initialize_publishing(opts: PublishOptions<'_>) -> Result<Publisher, PublishError> {
152    let publisher = Publisher::new(opts.publisher)?;
153    log::set_boxed_logger(Box::new(publisher.clone()))?;
154    if opts.install_panic_hook {
155        crate::install_panic_hook(opts.panic_prefix);
156    }
157    Ok(publisher)
158}
159
160/// Initializes logging with the given options.
161///
162/// IMPORTANT: this should be called at most once in a program, and must be
163/// called only after an async executor has been set for the current thread,
164/// otherwise it'll return errors or panic. Therefore it's recommended to never
165/// call this from libraries and only do it from binaries.
166pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
167    let _ = initialize_publishing(opts)?;
168    Ok(())
169}
170
171/// Sets the global minimum log severity.
172/// IMPORTANT: this function can panic if `initialize` wasn't called before.
173pub fn set_minimum_severity(severity: impl Into<Severity>) {
174    let severity: Severity = severity.into();
175    log::set_max_level(severity.into());
176}
177
178struct AbortAndJoinOnDrop(
179    Option<futures::future::AbortHandle>,
180    Option<std::thread::JoinHandle<()>>,
181);
182impl Drop for AbortAndJoinOnDrop {
183    fn drop(&mut self) {
184        if let Some(handle) = &mut self.0 {
185            handle.abort();
186        }
187        self.1.take().unwrap().join().unwrap();
188    }
189}
190
191/// Initializes logging with the given options.
192///
193/// This must be used when working in an environment where a [`fuchsia_async::Executor`] can't be
194/// used.
195///
196/// IMPORTANT: this should be called at most once in a program, and must be
197/// called only after an async executor has been set for the current thread,
198/// otherwise it'll return errors or panic. Therefore it's recommended to never
199/// call this from libraries and only do it from binaries.
200pub fn initialize_sync(opts: PublishOptions<'_>) -> impl Drop {
201    let (send, recv) = std::sync::mpsc::channel();
202    let (ready_send, ready_recv) = {
203        let (snd, rcv) = std::sync::mpsc::channel();
204        if opts.publisher.wait_for_initial_interest {
205            (Some(snd), Some(rcv))
206        } else {
207            (None, None)
208        }
209    };
210    let PublishOptions {
211        publisher:
212            PublisherOptions {
213                blocking,
214                interest,
215                metatags,
216                listen_for_interest_updates,
217                log_sink_proxy,
218                tags,
219                wait_for_initial_interest,
220                always_log_file_line,
221            },
222        install_panic_hook,
223        panic_prefix,
224    } = opts;
225    let tags = tags.iter().map(|s| s.to_string()).collect::<Vec<_>>();
226
227    let bg_thread = std::thread::spawn(move || {
228        let options = PublishOptions {
229            publisher: PublisherOptions {
230                interest,
231                metatags,
232                tags: &tags.iter().map(String::as_ref).collect::<Vec<_>>(),
233                listen_for_interest_updates,
234                log_sink_proxy,
235                wait_for_initial_interest,
236                blocking,
237                always_log_file_line,
238            },
239            install_panic_hook,
240            panic_prefix,
241        };
242        let mut exec = fuchsia_async::LocalExecutor::new();
243        let mut publisher = initialize_publishing(options).expect("initialize logging");
244        if let Some(ready_send) = ready_send {
245            ready_send.send(()).unwrap();
246        }
247
248        let interest_listening_task = publisher.take_interest_listening_task();
249
250        if let Some(on_interest_changes) = interest_listening_task {
251            let (on_interest_changes, cancel_interest) =
252                futures::future::abortable(on_interest_changes);
253            send.send(cancel_interest).unwrap();
254            drop(send);
255            exec.run_singlethreaded(on_interest_changes).ok();
256        }
257    });
258    if let Some(ready_recv) = ready_recv {
259        let _ = ready_recv.recv();
260    }
261
262    AbortAndJoinOnDrop(recv.recv().ok(), Some(bg_thread))
263}
264
265/// A `Publisher` acts as broker, implementing [`log::Log`] to receive log
266/// events from a component, and then forwarding that data on to a diagnostics service.
267#[derive(Clone)]
268pub struct Publisher {
269    inner: Arc<InnerPublisher>,
270}
271
272struct InnerPublisher {
273    sink: Sink,
274    filter: InterestFilter,
275    interest_listening_task: Mutex<Option<fasync::Task<()>>>,
276}
277
278impl Default for Publisher {
279    fn default() -> Self {
280        Self::new(PublisherOptions::default()).expect("failed to create Publisher")
281    }
282}
283
284impl Publisher {
285    /// Construct a new `Publisher` using the given options.
286    ///
287    /// Should be called only once.
288    pub fn new(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
289        let proxy = match opts.log_sink_proxy {
290            Some(log_sink) => log_sink,
291            None => connect_to_protocol::<LogSinkMarker>()
292                .map_err(|e| e.to_string())
293                .map_err(PublishError::LogSinkConnect)?,
294        };
295        let sink = Sink::new(
296            &proxy,
297            SinkConfig {
298                tags: opts.tags.iter().map(|s| s.to_string()).collect(),
299                metatags: opts.metatags,
300                retry_on_buffer_full: opts.blocking,
301                always_log_file_line: opts.always_log_file_line,
302            },
303        )?;
304        let (filter, on_change) =
305            InterestFilter::new(proxy, opts.interest, opts.wait_for_initial_interest);
306        let interest_listening_task = if opts.listen_for_interest_updates {
307            Mutex::new(Some(fasync::Task::spawn(on_change)))
308        } else {
309            Mutex::new(None)
310        };
311        Ok(Self { inner: Arc::new(InnerPublisher { sink, filter, interest_listening_task }) })
312    }
313
314    // TODO(https://fxbug.dev/42150573) delete this and make Publisher private
315    /// Publish the provided event for testing.
316    pub fn event_for_testing(&self, record: TestRecord<'_>) {
317        if self.inner.filter.enabled_for_testing(&record) {
318            self.inner.sink.event_for_testing(record);
319        }
320    }
321
322    /// Registers an interest listener
323    pub fn set_interest_listener<T>(&self, listener: T)
324    where
325        T: OnInterestChanged + Send + Sync + 'static,
326    {
327        self.inner.filter.set_interest_listener(listener);
328    }
329
330    /// Takes the task listening for interest changes if one exists.
331    fn take_interest_listening_task(&mut self) -> Option<fasync::Task<()>> {
332        self.inner.interest_listening_task.lock().unwrap().take()
333    }
334}
335
336impl log::Log for Publisher {
337    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
338        // NOTE: we handle minimum severity directly through the log max_level. So we call,
339        // log::set_max_level, log::max_level where appropriate.
340        true
341    }
342
343    fn log(&self, record: &log::Record<'_>) {
344        self.inner.sink.record_log(record);
345    }
346
347    fn flush(&self) {}
348}
349
350/// Errors arising while forwarding a diagnostics stream to the environment.
351#[derive(Debug, Error)]
352pub enum PublishError {
353    /// Connection to fuchsia.logger.LogSink failed.
354    #[error("failed to connect to fuchsia.logger.LogSink ({0})")]
355    LogSinkConnect(String),
356
357    /// Couldn't create a new socket.
358    #[error("failed to create a socket for logging")]
359    MakeSocket(#[source] zx::Status),
360
361    /// An issue with the LogSink channel or socket prevented us from sending it to the `LogSink`.
362    #[error("failed to send a socket to the LogSink")]
363    SendSocket(#[source] fidl::Error),
364
365    /// Installing a Logger.
366    #[error("failed to install the loger")]
367    InitLogForward(#[from] log::SetLoggerError),
368}
369
370#[cfg(test)]
371static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);
372
373/// Increments the test clock.
374#[cfg(test)]
375pub fn increment_clock(duration: Duration) {
376    CURRENT_TIME_NANOS.fetch_add(duration.as_nanos() as i64, Ordering::SeqCst);
377}
378
379/// Gets the current monotonic time in nanoseconds.
380#[doc(hidden)]
381pub fn get_now() -> i64 {
382    #[cfg(not(test))]
383    return zx::MonotonicInstant::get().into_nanos();
384
385    #[cfg(test)]
386    CURRENT_TIME_NANOS.load(Ordering::Relaxed)
387}
388
389/// Logs every N seconds using an Atomic variable
390/// to keep track of the time. This will have a higher
391/// performance impact on ARM compared to regular logging due to the use
392/// of an atomic.
393#[macro_export]
394macro_rules! log_every_n_seconds {
395    ($seconds:expr, $severity:expr, $($arg:tt)*) => {
396        use std::{time::Duration, sync::atomic::{Ordering, AtomicI64}};
397        use $crate::{paste, fuchsia::get_now};
398
399        let now = get_now();
400
401        static LAST_LOG_TIMESTAMP: AtomicI64 = AtomicI64::new(0);
402        if now - LAST_LOG_TIMESTAMP.load(Ordering::Acquire) >= Duration::from_secs($seconds).as_nanos() as i64 {
403            paste! {
404                log::[< $severity:lower >]!($($arg)*);
405            }
406            LAST_LOG_TIMESTAMP.store(now, Ordering::Release);
407        }
408    }
409}
410
411#[cfg(test)]
412mod tests {
413    use super::*;
414    use diagnostics_reader::ArchiveReader;
415    use fidl_fuchsia_diagnostics_crasher::CrasherMarker;
416    use fuchsia_component_test::{Capability, ChildOptions, RealmBuilder, Ref, Route};
417    use futures::{future, StreamExt};
418    use log::{debug, info};
419    use moniker::ExtendedMoniker;
420
421    #[fuchsia::test]
422    async fn panic_integration_test() {
423        let builder = RealmBuilder::new().await.unwrap();
424        let puppet = builder
425            .add_child("rust-crasher", "#meta/crasher.cm", ChildOptions::new())
426            .await
427            .unwrap();
428        builder
429            .add_route(
430                Route::new()
431                    .capability(Capability::protocol::<CrasherMarker>())
432                    .from(&puppet)
433                    .to(Ref::parent()),
434            )
435            .await
436            .unwrap();
437        let realm = builder.build().await.unwrap();
438        let child_name = realm.root.child_name();
439        let reader = ArchiveReader::logs();
440        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
441        let proxy = realm.root.connect_to_protocol_at_exposed_dir::<CrasherMarker>().unwrap();
442        let target_moniker =
443            ExtendedMoniker::parse_str(&format!("realm_builder:{}/rust-crasher", child_name))
444                .unwrap();
445        proxy.crash("This is a test panic.").await.unwrap();
446
447        let result =
448            logs.filter(|data| future::ready(target_moniker == data.moniker)).next().await.unwrap();
449        assert_eq!(result.line_number(), Some(29).as_ref());
450        assert_eq!(
451            result.file_path(),
452            Some("src/lib/diagnostics/log/rust/rust-crasher/src/main.rs")
453        );
454        assert!(result
455            .payload_keys()
456            .unwrap()
457            .get_property("info")
458            .unwrap()
459            .to_string()
460            .contains("This is a test panic."));
461    }
462
463    #[fuchsia::test(logging = false)]
464    async fn verify_setting_minimum_log_severity() {
465        let reader = ArchiveReader::logs();
466        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
467        let publisher = Publisher::new(PublisherOptions {
468            tags: &["verify_setting_minimum_log_severity"],
469            ..PublisherOptions::empty()
470        })
471        .expect("initialized log");
472        log::set_boxed_logger(Box::new(publisher)).unwrap();
473
474        info!("I'm an info log");
475        debug!("I'm a debug log and won't show up");
476
477        set_minimum_severity(Severity::Debug);
478        debug!("I'm a debug log and I show up");
479
480        let results = logs
481            .filter(|data| {
482                future::ready(
483                    data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
484                )
485            })
486            .take(2)
487            .collect::<Vec<_>>()
488            .await;
489        assert_eq!(results[0].msg().unwrap(), "I'm an info log");
490        assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
491    }
492
493    #[fuchsia::test]
494    async fn log_macro_logs_are_recorded() {
495        let reader = ArchiveReader::logs();
496        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
497
498        let total_threads = 10;
499
500        for i in 0..total_threads {
501            std::thread::spawn(move || {
502                log::info!(thread=i; "log from thread {}", i);
503            });
504        }
505
506        let mut results = logs
507            .filter(|data| {
508                future::ready(
509                    data.tags().unwrap().iter().any(|t| t == "log_macro_logs_are_recorded"),
510                )
511            })
512            .take(total_threads);
513
514        let mut seen = vec![];
515        while let Some(log) = results.next().await {
516            let hierarchy = log.payload_keys().unwrap();
517            assert_eq!(hierarchy.properties.len(), 1);
518            assert_eq!(hierarchy.properties[0].name(), "thread");
519            let thread_id = hierarchy.properties[0].uint().unwrap();
520            seen.push(thread_id as usize);
521            assert_eq!(log.msg().unwrap(), format!("log from thread {thread_id}"));
522        }
523        seen.sort();
524        assert_eq!(seen, (0..total_threads).collect::<Vec<_>>());
525    }
526}