1use crate::PublishOptions;
5use diagnostics_log_types::Severity;
6use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy};
7use fuchsia_async as fasync;
8use fuchsia_component::client::connect_to_protocol;
9use std::collections::HashSet;
10use std::fmt::Debug;
11use std::sync::{Arc, Mutex};
12use thiserror::Error;
13
14#[cfg(fuchsia_api_level_less_than = "NEXT")]
15use fidl_fuchsia_diagnostics::Interest;
16#[cfg(fuchsia_api_level_at_least = "NEXT")]
17use fidl_fuchsia_diagnostics_types::Interest;
18
19mod filter;
20mod sink;
21
22use filter::InterestFilter;
23use sink::{Sink, SinkConfig};
24
25pub use diagnostics_log_encoding::encode::TestRecord;
26pub use diagnostics_log_encoding::Metatag;
27pub use paste::paste;
28pub use sink::LogEvent;
29
30#[cfg(test)]
31use std::{
32 sync::atomic::{AtomicI64, Ordering},
33 time::Duration,
34};
35
/// Callback trait for observers that want to be notified when the logging
/// interest (minimum severity) delivered by the `LogSink` changes.
pub trait OnInterestChanged {
    /// Invoked with the new minimum severity whenever an interest update arrives.
    fn on_changed(&self, severity: Severity);
}
41
/// Options configuring a [`Publisher`].
pub struct PublisherOptions<'t> {
    /// When true, the sink retries on a full socket buffer instead of dropping.
    blocking: bool,
    /// Initial interest (severity) used before/without updates from the LogSink.
    pub(crate) interest: Interest,
    /// Whether to spawn a task listening for interest updates from the LogSink.
    listen_for_interest_updates: bool,
    /// Pre-connected LogSink proxy; when `None`, one is connected on demand.
    log_sink_proxy: Option<LogSinkProxy>,
    /// Metatags to attach to every emitted record.
    pub(crate) metatags: HashSet<Metatag>,
    /// Tags to attach to every emitted record.
    pub(crate) tags: &'t [&'t str],
    /// Whether initialization blocks until the initial interest is received.
    wait_for_initial_interest: bool,
    /// When true, file/line info is recorded for every log, not just errors.
    /// NOTE(review): exact severity-based default behavior lives in `sink` —
    /// confirm there.
    pub(crate) always_log_file_line: bool,
}
53
54impl Default for PublisherOptions<'_> {
55 fn default() -> Self {
56 Self {
57 blocking: false,
58 interest: Interest::default(),
59 listen_for_interest_updates: true,
60 log_sink_proxy: None,
61 metatags: HashSet::new(),
62 tags: &[],
63 wait_for_initial_interest: true,
64 always_log_file_line: false,
65 }
66 }
67}
68
impl PublisherOptions<'_> {
    /// Creates a `PublisherOptions` with everything disabled: non-blocking
    /// sink, default interest, no interest-update listening, no pre-connected
    /// proxy, no tags or metatags, no initial-interest handshake, and no
    /// unconditional file/line recording.
    pub fn empty() -> Self {
        Self {
            blocking: false,
            interest: Interest::default(),
            listen_for_interest_updates: false,
            log_sink_proxy: None,
            metatags: HashSet::new(),
            tags: &[],
            wait_for_initial_interest: false,
            always_log_file_line: false,
        }
    }
}
// Generates identical builder-style setters on multiple option types.
// Each tuple is (TypeName, self, path.to.inner.PublisherOptions): the
// `$($self_arg)*` field path lets `PublishOptions` forward to its nested
// `publisher: PublisherOptions` while `PublisherOptions` targets itself.
macro_rules! publisher_options {
    ($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
        $(
            impl<'t> $name<'t> {
                /// When true, initialization blocks until the initial interest
                /// is received from the LogSink.
                pub fn wait_for_initial_interest(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.wait_for_initial_interest = enable;
                    $self
                }

                /// When true, file/line info is attached to every log record.
                pub fn log_file_line_info(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.always_log_file_line = enable;
                    $self
                }

                /// When true, a background task listens for interest updates.
                pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.listen_for_interest_updates = enable;
                    $self
                }

                /// Uses the given LogSink proxy instead of connecting to one.
                pub fn use_log_sink(mut $self, proxy: LogSinkProxy) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.log_sink_proxy = Some(proxy);
                    $self
                }

                /// When true, the sink retries when the socket buffer is full.
                pub fn blocking(mut $self, is_blocking: bool) -> Self {
                    let this = &mut $self$(.$self_arg)*;
                    this.blocking = is_blocking;
                    $self
                }
            }
        )*
    };
}

// PublisherOptions sets fields on itself; PublishOptions forwards through its
// `publisher` field.
publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));
150
151fn initialize_publishing(opts: PublishOptions<'_>) -> Result<Publisher, PublishError> {
152 let publisher = Publisher::new(opts.publisher)?;
153 log::set_boxed_logger(Box::new(publisher.clone()))?;
154 if opts.install_panic_hook {
155 crate::install_panic_hook(opts.panic_prefix);
156 }
157 Ok(publisher)
158}
159
160pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
167 let _ = initialize_publishing(opts)?;
168 Ok(())
169}
170
171pub fn set_minimum_severity(severity: impl Into<Severity>) {
174 let severity: Severity = severity.into();
175 log::set_max_level(severity.into());
176}
177
/// Guard returned by [`initialize_sync`]: on drop it aborts the
/// interest-listening future (if any) and joins the background logging thread.
struct AbortAndJoinOnDrop(
    // Aborts the interest-listening future running on the background thread.
    Option<futures::future::AbortHandle>,
    // Join handle for the background thread; taken exactly once in `drop`.
    Option<std::thread::JoinHandle<()>>,
);
182impl Drop for AbortAndJoinOnDrop {
183 fn drop(&mut self) {
184 if let Some(handle) = &mut self.0 {
185 handle.abort();
186 }
187 self.1.take().unwrap().join().unwrap();
188 }
189}
190
/// Initializes logging from a synchronous (non-async) context by running the
/// publisher's async machinery on a dedicated background thread.
///
/// Returns a guard that, when dropped, cancels interest listening and joins
/// the background thread. If `wait_for_initial_interest` is set, this function
/// blocks until the background thread has finished initializing.
pub fn initialize_sync(opts: PublishOptions<'_>) -> impl Drop {
    // Channel used by the background thread to hand back the abort handle for
    // the interest-listening future.
    let (send, recv) = std::sync::mpsc::channel();
    // Optional one-shot used to block this caller until logging is initialized.
    let (ready_send, ready_recv) = {
        let (snd, rcv) = std::sync::mpsc::channel();
        if opts.publisher.wait_for_initial_interest {
            (Some(snd), Some(rcv))
        } else {
            (None, None)
        }
    };
    // Destructure everything so owned pieces can be moved into the thread.
    let PublishOptions {
        publisher:
            PublisherOptions {
                blocking,
                interest,
                metatags,
                listen_for_interest_updates,
                log_sink_proxy,
                tags,
                wait_for_initial_interest,
                always_log_file_line,
            },
        install_panic_hook,
        panic_prefix,
    } = opts;
    // Tags are borrowed with lifetime 't; convert to owned strings so they can
    // cross the thread boundary.
    let tags = tags.iter().map(|s| s.to_string()).collect::<Vec<_>>();

    let bg_thread = std::thread::spawn(move || {
        // Rebuild the options inside the thread from the moved parts.
        let options = PublishOptions {
            publisher: PublisherOptions {
                interest,
                metatags,
                tags: &tags.iter().map(String::as_ref).collect::<Vec<_>>(),
                listen_for_interest_updates,
                log_sink_proxy,
                wait_for_initial_interest,
                blocking,
                always_log_file_line,
            },
            install_panic_hook,
            panic_prefix,
        };
        let mut exec = fuchsia_async::LocalExecutor::new();
        let mut publisher = initialize_publishing(options).expect("initialize logging");
        // Unblock the caller: logging is installed (and, if requested, the
        // initial interest has been awaited inside Publisher::new).
        if let Some(ready_send) = ready_send {
            ready_send.send(()).unwrap();
        }

        let interest_listening_task = publisher.take_interest_listening_task();

        // Drive interest updates until the returned guard aborts this future.
        if let Some(on_interest_changes) = interest_listening_task {
            let (on_interest_changes, cancel_interest) =
                futures::future::abortable(on_interest_changes);
            send.send(cancel_interest).unwrap();
            drop(send);
            exec.run_singlethreaded(on_interest_changes).ok();
        }
    });
    if let Some(ready_recv) = ready_recv {
        let _ = ready_recv.recv();
    }

    // `recv()` returns Err if the thread never sent an abort handle (no
    // interest listening); `.ok()` maps that to None.
    AbortAndJoinOnDrop(recv.recv().ok(), Some(bg_thread))
}
264
/// A `log::Log` implementation that writes records to a `fuchsia.logger.LogSink`
/// socket and applies interest-based filtering. Cheap to clone (shared inner).
#[derive(Clone)]
pub struct Publisher {
    inner: Arc<InnerPublisher>,
}
271
/// Shared state behind a [`Publisher`].
struct InnerPublisher {
    // Encodes and writes records to the LogSink socket.
    sink: Sink,
    // Tracks the current interest (minimum severity) and test filtering.
    filter: InterestFilter,
    // Task listening for interest updates; None if listening was disabled or
    // the task has been taken via `take_interest_listening_task`.
    interest_listening_task: Mutex<Option<fasync::Task<()>>>,
}
277
278impl Default for Publisher {
279 fn default() -> Self {
280 Self::new(PublisherOptions::default()).expect("failed to create Publisher")
281 }
282}
283
284impl Publisher {
285 pub fn new(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
289 let proxy = match opts.log_sink_proxy {
290 Some(log_sink) => log_sink,
291 None => connect_to_protocol::<LogSinkMarker>()
292 .map_err(|e| e.to_string())
293 .map_err(PublishError::LogSinkConnect)?,
294 };
295 let sink = Sink::new(
296 &proxy,
297 SinkConfig {
298 tags: opts.tags.iter().map(|s| s.to_string()).collect(),
299 metatags: opts.metatags,
300 retry_on_buffer_full: opts.blocking,
301 always_log_file_line: opts.always_log_file_line,
302 },
303 )?;
304 let (filter, on_change) =
305 InterestFilter::new(proxy, opts.interest, opts.wait_for_initial_interest);
306 let interest_listening_task = if opts.listen_for_interest_updates {
307 Mutex::new(Some(fasync::Task::spawn(on_change)))
308 } else {
309 Mutex::new(None)
310 };
311 Ok(Self { inner: Arc::new(InnerPublisher { sink, filter, interest_listening_task }) })
312 }
313
314 pub fn event_for_testing(&self, record: TestRecord<'_>) {
317 if self.inner.filter.enabled_for_testing(&record) {
318 self.inner.sink.event_for_testing(record);
319 }
320 }
321
322 pub fn set_interest_listener<T>(&self, listener: T)
324 where
325 T: OnInterestChanged + Send + Sync + 'static,
326 {
327 self.inner.filter.set_interest_listener(listener);
328 }
329
330 fn take_interest_listening_task(&mut self) -> Option<fasync::Task<()>> {
332 self.inner.interest_listening_task.lock().unwrap().take()
333 }
334}
335
impl log::Log for Publisher {
    fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
        // Always report enabled: severity gating presumably happens via the
        // global `log::set_max_level` (see `set_minimum_severity`) rather than
        // per-metadata checks here — confirm against `InterestFilter`.
        true
    }

    fn log(&self, record: &log::Record<'_>) {
        // Encode and write the record to the LogSink socket.
        self.inner.sink.record_log(record);
    }

    // Records are written straight to the socket; there is nothing to flush.
    fn flush(&self) {}
}
349
350#[derive(Debug, Error)]
352pub enum PublishError {
353 #[error("failed to connect to fuchsia.logger.LogSink ({0})")]
355 LogSinkConnect(String),
356
357 #[error("failed to create a socket for logging")]
359 MakeSocket(#[source] zx::Status),
360
361 #[error("failed to send a socket to the LogSink")]
363 SendSocket(#[source] fidl::Error),
364
365 #[error("failed to install the loger")]
367 InitLogForward(#[from] log::SetLoggerError),
368}
369
// Fake clock (nanoseconds) used by `get_now` under test. Starts at 10s —
// presumably so the first `log_every_n_seconds!` in a test fires; confirm
// against the macro's zero-initialized timestamp.
#[cfg(test)]
static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);

/// Advances the fake test clock by `duration` (test builds only).
#[cfg(test)]
pub fn increment_clock(duration: Duration) {
    CURRENT_TIME_NANOS.fetch_add(duration.as_nanos() as i64, Ordering::SeqCst);
}
378
/// Returns the current monotonic time in nanoseconds.
///
/// In test builds this reads the fake clock advanced via `increment_clock`
/// instead of the real monotonic clock, making `log_every_n_seconds!`
/// deterministic under test.
#[doc(hidden)]
pub fn get_now() -> i64 {
    #[cfg(not(test))]
    return zx::MonotonicInstant::get().into_nanos();

    #[cfg(test)]
    CURRENT_TIME_NANOS.load(Ordering::Relaxed)
}
388
/// Logs at the given severity at most once every `$seconds` seconds per
/// invocation site; invocations inside the window are silently dropped.
#[macro_export]
macro_rules! log_every_n_seconds {
    // Double braces: the expansion is a single block expression, so the `use`
    // items and the per-site `static` below don't leak into (or collide in)
    // the caller's scope. Previously, two invocations in one scope failed to
    // compile with duplicate-definition errors.
    ($seconds:expr, $severity:expr, $($arg:tt)*) => {{
        use std::{time::Duration, sync::atomic::{Ordering, AtomicI64}};
        use $crate::{paste, fuchsia::get_now};

        let now = get_now();

        // One timestamp per expansion site, shared across calls at that site.
        static LAST_LOG_TIMESTAMP: AtomicI64 = AtomicI64::new(0);
        if now - LAST_LOG_TIMESTAMP.load(Ordering::Acquire) >= Duration::from_secs($seconds).as_nanos() as i64 {
            paste! {
                log::[< $severity:lower >]!($($arg)*);
            }
            LAST_LOG_TIMESTAMP.store(now, Ordering::Release);
        }
    }}
}
410
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_reader::ArchiveReader;
    use fidl_fuchsia_diagnostics_crasher::CrasherMarker;
    use fuchsia_component_test::{Capability, ChildOptions, RealmBuilder, Ref, Route};
    use futures::{future, StreamExt};
    use log::{debug, info};
    use moniker::ExtendedMoniker;

    /// Launches the rust-crasher child component, triggers a panic in it, and
    /// verifies the panic is captured in the log stream with the expected
    /// file/line attribution and message payload.
    #[fuchsia::test]
    async fn panic_integration_test() {
        let builder = RealmBuilder::new().await.unwrap();
        let puppet = builder
            .add_child("rust-crasher", "#meta/crasher.cm", ChildOptions::new())
            .await
            .unwrap();
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol::<CrasherMarker>())
                    .from(&puppet)
                    .to(Ref::parent()),
            )
            .await
            .unwrap();
        let realm = builder.build().await.unwrap();
        let child_name = realm.root.child_name();
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        let proxy = realm.root.connect_to_protocol_at_exposed_dir::<CrasherMarker>().unwrap();
        let target_moniker =
            ExtendedMoniker::parse_str(&format!("realm_builder:{}/rust-crasher", child_name))
                .unwrap();
        proxy.crash("This is a test panic.").await.unwrap();

        // Only look at logs emitted by the crasher component itself.
        let result =
            logs.filter(|data| future::ready(target_moniker == data.moniker)).next().await.unwrap();
        // NOTE: these expectations are pinned to the crasher's source; they
        // must be updated whenever rust-crasher/src/main.rs changes.
        assert_eq!(result.line_number(), Some(29).as_ref());
        assert_eq!(
            result.file_path(),
            Some("src/lib/diagnostics/log/rust/rust-crasher/src/main.rs")
        );
        assert!(result
            .payload_keys()
            .unwrap()
            .get_property("info")
            .unwrap()
            .to_string()
            .contains("This is a test panic."));
    }

    /// Verifies that `set_minimum_severity` dynamically lowers the global
    /// filter: a debug log emitted before the call is dropped, one emitted
    /// after it shows up.
    #[fuchsia::test(logging = false)]
    async fn verify_setting_minimum_log_severity() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
        // Tag the publisher so this test's logs can be filtered out below.
        let publisher = Publisher::new(PublisherOptions {
            tags: &["verify_setting_minimum_log_severity"],
            ..PublisherOptions::empty()
        })
        .expect("initialized log");
        log::set_boxed_logger(Box::new(publisher)).unwrap();

        info!("I'm an info log");
        debug!("I'm a debug log and won't show up");

        set_minimum_severity(Severity::Debug);
        debug!("I'm a debug log and I show up");

        let results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
                )
            })
            .take(2)
            .collect::<Vec<_>>()
            .await;
        // Exactly two logs expected: the suppressed debug never arrives.
        assert_eq!(results[0].msg().unwrap(), "I'm an info log");
        assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
    }

    /// Emits one structured log from each of several threads and verifies all
    /// of them are recorded with their structured `thread` key intact.
    #[fuchsia::test]
    async fn log_macro_logs_are_recorded() {
        let reader = ArchiveReader::logs();
        let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();

        let total_threads = 10;

        for i in 0..total_threads {
            std::thread::spawn(move || {
                log::info!(thread=i; "log from thread {}", i);
            });
        }

        let mut results = logs
            .filter(|data| {
                future::ready(
                    data.tags().unwrap().iter().any(|t| t == "log_macro_logs_are_recorded"),
                )
            })
            .take(total_threads);

        // Threads may log in any order; collect ids and compare after sorting.
        let mut seen = vec![];
        while let Some(log) = results.next().await {
            let hierarchy = log.payload_keys().unwrap();
            assert_eq!(hierarchy.properties.len(), 1);
            assert_eq!(hierarchy.properties[0].name(), "thread");
            let thread_id = hierarchy.properties[0].uint().unwrap();
            seen.push(thread_id as usize);
            assert_eq!(log.msg().unwrap(), format!("log from thread {thread_id}"));
        }
        seen.sort();
        assert_eq!(seen, (0..total_threads).collect::<Vec<_>>());
    }
}