1use crate::PublishOptions;
5use diagnostics_log_types::Severity;
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_logger::LogSinkMarker;
8use fuchsia_async as fasync;
9use fuchsia_component_client::connect::connect_to_protocol;
10use std::collections::HashSet;
11use std::fmt::Debug;
12use std::sync::{Arc, Mutex};
13use thiserror::Error;
14
15#[cfg(fuchsia_api_level_less_than = "27")]
16use fidl_fuchsia_diagnostics::Interest;
17#[cfg(fuchsia_api_level_at_least = "27")]
18use fidl_fuchsia_diagnostics_types::Interest;
19
20mod filter;
21mod sink;
22
23use filter::InterestFilter;
24use sink::{Sink, SinkConfig};
25
26pub use diagnostics_log_encoding::encode::TestRecord;
27pub use diagnostics_log_encoding::Metatag;
28pub use paste::paste;
29pub use sink::LogEvent;
30
31#[cfg(test)]
32use std::{
33 sync::atomic::{AtomicI64, Ordering},
34 time::Duration,
35};
36
37pub trait OnInterestChanged {
39 fn on_changed(&self, severity: Severity);
41}
42
43pub struct PublisherOptions<'t> {
45 blocking: bool,
46 pub(crate) interest: Interest,
47 listen_for_interest_updates: bool,
48 log_sink_client: Option<ClientEnd<LogSinkMarker>>,
49 pub(crate) metatags: HashSet<Metatag>,
50 pub(crate) tags: &'t [&'t str],
51 wait_for_initial_interest: bool,
52 pub(crate) always_log_file_line: bool,
53}
54
55impl Default for PublisherOptions<'_> {
56 fn default() -> Self {
57 Self {
58 blocking: false,
59 interest: Interest::default(),
60 listen_for_interest_updates: true,
61 log_sink_client: None,
62 metatags: HashSet::new(),
63 tags: &[],
64 wait_for_initial_interest: true,
65 always_log_file_line: false,
66 }
67 }
68}
69
70impl PublisherOptions<'_> {
71 pub fn empty() -> Self {
78 Self {
79 blocking: false,
80 interest: Interest::default(),
81 listen_for_interest_updates: false,
82 log_sink_client: None,
83 metatags: HashSet::new(),
84 tags: &[],
85 wait_for_initial_interest: false,
86 always_log_file_line: false,
87 }
88 }
89}
90macro_rules! publisher_options {
91 ($(($name:ident, $self:ident, $($self_arg:ident),*)),*) => {
92 $(
93 impl<'t> $name<'t> {
94 pub fn wait_for_initial_interest(mut $self, enable: bool) -> Self {
103 let this = &mut $self$(.$self_arg)*;
104 this.wait_for_initial_interest = enable;
105 $self
106 }
107
108 pub fn log_file_line_info(mut $self, enable: bool) -> Self {
112 let this = &mut $self$(.$self_arg)*;
113 this.always_log_file_line = enable;
114 $self
115 }
116
117 pub fn listen_for_interest_updates(mut $self, enable: bool) -> Self {
122 let this = &mut $self$(.$self_arg)*;
123 this.listen_for_interest_updates = enable;
124 $self
125 }
126
127 pub fn use_log_sink(mut $self, client: ClientEnd<LogSinkMarker>) -> Self {
131 let this = &mut $self$(.$self_arg)*;
132 this.log_sink_client = Some(client);
133 $self
134 }
135
136 pub fn blocking(mut $self, is_blocking: bool) -> Self {
141 let this = &mut $self$(.$self_arg)*;
142 this.blocking = is_blocking;
143 $self
144 }
145 }
146 )*
147 };
148}
149
150publisher_options!((PublisherOptions, self,), (PublishOptions, self, publisher));
151
152fn initialize_publishing(opts: PublishOptions<'_>) -> Result<Publisher, PublishError> {
153 let publisher = Publisher::new(opts.publisher)?;
154 publisher.register_logger()?;
155 if opts.install_panic_hook {
156 crate::install_panic_hook(opts.panic_prefix);
157 }
158 Ok(publisher)
159}
160
161pub fn initialize(opts: PublishOptions<'_>) -> Result<(), PublishError> {
168 let _ = initialize_publishing(opts)?;
169 Ok(())
170}
171
172pub fn set_minimum_severity(severity: impl Into<Severity>) {
175 let severity: Severity = severity.into();
176 log::set_max_level(severity.into());
177}
178
179struct AbortAndJoinOnDrop(
180 Option<futures::future::AbortHandle>,
181 Option<std::thread::JoinHandle<()>>,
182);
183impl Drop for AbortAndJoinOnDrop {
184 fn drop(&mut self) {
185 if let Some(handle) = &mut self.0 {
186 handle.abort();
187 }
188 self.1.take().unwrap().join().unwrap();
189 }
190}
191
192pub fn initialize_sync(opts: PublishOptions<'_>) -> impl Drop {
202 let (send, recv) = std::sync::mpsc::channel();
203 let (ready_send, ready_recv) = {
204 let (snd, rcv) = std::sync::mpsc::channel();
205 if opts.publisher.wait_for_initial_interest {
206 (Some(snd), Some(rcv))
207 } else {
208 (None, None)
209 }
210 };
211 let PublishOptions {
212 publisher:
213 PublisherOptions {
214 blocking,
215 interest,
216 metatags,
217 listen_for_interest_updates,
218 log_sink_client,
219 tags,
220 wait_for_initial_interest,
221 always_log_file_line,
222 },
223 install_panic_hook,
224 panic_prefix,
225 } = opts;
226 let tags = tags.iter().map(|s| s.to_string()).collect::<Vec<_>>();
227
228 let bg_thread = std::thread::spawn(move || {
229 let options = PublishOptions {
230 publisher: PublisherOptions {
231 interest,
232 metatags,
233 tags: &tags.iter().map(String::as_ref).collect::<Vec<_>>(),
234 listen_for_interest_updates,
235 log_sink_client,
236 wait_for_initial_interest,
237 blocking,
238 always_log_file_line,
239 },
240 install_panic_hook,
241 panic_prefix,
242 };
243 let mut exec = fuchsia_async::LocalExecutor::new();
244 let mut publisher = initialize_publishing(options).expect("initialize logging");
245 if let Some(ready_send) = ready_send {
246 ready_send.send(()).unwrap();
247 }
248
249 let interest_listening_task = publisher.take_interest_listening_task();
250
251 if let Some(on_interest_changes) = interest_listening_task {
252 let (on_interest_changes, cancel_interest) =
253 futures::future::abortable(on_interest_changes);
254 send.send(cancel_interest).unwrap();
255 drop(send);
256 exec.run_singlethreaded(on_interest_changes).ok();
257 }
258 });
259 if let Some(ready_recv) = ready_recv {
260 let _ = ready_recv.recv();
261 }
262
263 AbortAndJoinOnDrop(recv.recv().ok(), Some(bg_thread))
264}
265
266#[derive(Clone)]
269pub struct Publisher {
270 inner: Arc<InnerPublisher>,
271}
272
273struct InnerPublisher {
274 sink: Sink,
275 filter: InterestFilter,
276 interest_listening_task: Mutex<Option<fasync::Task<()>>>,
277}
278
279impl Default for Publisher {
280 fn default() -> Self {
281 Self::new(PublisherOptions::default()).expect("failed to create Publisher")
282 }
283}
284
285impl Publisher {
286 pub fn new(opts: PublisherOptions<'_>) -> Result<Self, PublishError> {
290 let client = match opts.log_sink_client {
291 Some(log_sink) => log_sink,
292 None => connect_to_protocol()
293 .map_err(|e| e.to_string())
294 .map_err(PublishError::LogSinkConnect)?,
295 };
296 let sink = Sink::new(
297 &client,
298 SinkConfig {
299 tags: opts.tags.iter().map(|s| s.to_string()).collect(),
300 metatags: opts.metatags,
301 retry_on_buffer_full: opts.blocking,
302 always_log_file_line: opts.always_log_file_line,
303 },
304 )?;
305 let (filter, on_change) =
306 InterestFilter::new(client, opts.interest, opts.wait_for_initial_interest);
307 let interest_listening_task = if opts.listen_for_interest_updates {
308 Mutex::new(Some(fasync::Task::spawn(on_change)))
309 } else {
310 Mutex::new(None)
311 };
312 Ok(Self { inner: Arc::new(InnerPublisher { sink, filter, interest_listening_task }) })
313 }
314
315 pub fn event_for_testing(&self, record: TestRecord<'_>) {
318 if self.inner.filter.enabled_for_testing(&record) {
319 self.inner.sink.event_for_testing(record);
320 }
321 }
322
323 pub fn set_interest_listener<T>(&self, listener: T)
325 where
326 T: OnInterestChanged + Send + Sync + 'static,
327 {
328 self.inner.filter.set_interest_listener(listener);
329 }
330
331 fn take_interest_listening_task(&mut self) -> Option<fasync::Task<()>> {
333 self.inner.interest_listening_task.lock().unwrap().take()
334 }
335
336 pub fn register_logger(&self) -> Result<(), PublishError> {
339 unsafe {
342 let ptr = Arc::into_raw(self.inner.clone());
343 log::set_logger(&*ptr).inspect_err(|_| {
344 let _ = Arc::from_raw(ptr);
345 })?;
346 }
347 Ok(())
348 }
349}
350
351impl log::Log for InnerPublisher {
352 fn enabled(&self, _metadata: &log::Metadata<'_>) -> bool {
353 true
356 }
357
358 fn log(&self, record: &log::Record<'_>) {
359 self.sink.record_log(record);
360 }
361
362 fn flush(&self) {}
363}
364
365impl log::Log for Publisher {
366 #[inline]
367 fn enabled(&self, metadata: &log::Metadata<'_>) -> bool {
368 self.inner.enabled(metadata)
369 }
370
371 #[inline]
372 fn log(&self, record: &log::Record<'_>) {
373 self.inner.log(record)
374 }
375
376 #[inline]
377 fn flush(&self) {
378 self.inner.flush()
379 }
380}
381
382#[derive(Debug, Error)]
384pub enum PublishError {
385 #[error("failed to connect to fuchsia.logger.LogSink ({0})")]
387 LogSinkConnect(String),
388
389 #[error("failed to create a socket for logging")]
391 MakeSocket(#[source] zx::Status),
392
393 #[error("failed to send a socket to the LogSink")]
395 SendSocket(#[source] fidl::Error),
396
397 #[error("failed to install the loger")]
399 InitLogForward(#[from] log::SetLoggerError),
400}
401
402#[cfg(test)]
403static CURRENT_TIME_NANOS: AtomicI64 = AtomicI64::new(Duration::from_secs(10).as_nanos() as i64);
404
405#[cfg(test)]
407pub fn increment_clock(duration: Duration) {
408 CURRENT_TIME_NANOS.fetch_add(duration.as_nanos() as i64, Ordering::SeqCst);
409}
410
411#[doc(hidden)]
413pub fn get_now() -> i64 {
414 #[cfg(not(test))]
415 return zx::MonotonicInstant::get().into_nanos();
416
417 #[cfg(test)]
418 CURRENT_TIME_NANOS.load(Ordering::Relaxed)
419}
420
421#[macro_export]
426macro_rules! log_every_n_seconds {
427 ($seconds:expr, $severity:expr, $($arg:tt)*) => {
428 use std::{time::Duration, sync::atomic::{Ordering, AtomicI64}};
429 use $crate::{paste, fuchsia::get_now};
430
431 let now = get_now();
432
433 static LAST_LOG_TIMESTAMP: AtomicI64 = AtomicI64::new(0);
434 if now - LAST_LOG_TIMESTAMP.load(Ordering::Acquire) >= Duration::from_secs($seconds).as_nanos() as i64 {
435 paste! {
436 log::[< $severity:lower >]!($($arg)*);
437 }
438 LAST_LOG_TIMESTAMP.store(now, Ordering::Release);
439 }
440 }
441}
442
443#[cfg(test)]
444mod tests {
445 use super::*;
446 use diagnostics_reader::ArchiveReader;
447 use fidl_fuchsia_diagnostics_crasher::{CrasherMarker, CrasherProxy};
448 use fuchsia_component_test::{Capability, ChildOptions, RealmBuilder, Ref, Route};
449 use futures::{future, StreamExt};
450 use log::{debug, info};
451 use moniker::ExtendedMoniker;
452
453 #[fuchsia::test]
454 async fn panic_integration_test() {
455 let builder = RealmBuilder::new().await.unwrap();
456 let puppet = builder
457 .add_child("rust-crasher", "#meta/crasher.cm", ChildOptions::new())
458 .await
459 .unwrap();
460 builder
461 .add_route(
462 Route::new()
463 .capability(Capability::protocol::<CrasherMarker>())
464 .from(&puppet)
465 .to(Ref::parent()),
466 )
467 .await
468 .unwrap();
469 let realm = builder.build().await.unwrap();
470 let child_name = realm.root.child_name();
471 let reader = ArchiveReader::logs();
472 let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
473 let proxy: CrasherProxy = realm.root.connect_to_protocol_at_exposed_dir().unwrap();
474 let target_moniker =
475 ExtendedMoniker::parse_str(&format!("realm_builder:{}/rust-crasher", child_name))
476 .unwrap();
477 proxy.crash("This is a test panic.").await.unwrap();
478
479 let result =
480 logs.filter(|data| future::ready(target_moniker == data.moniker)).next().await.unwrap();
481 assert_eq!(result.line_number(), Some(29).as_ref());
482 assert_eq!(
483 result.file_path(),
484 Some("src/lib/diagnostics/log/rust/rust-crasher/src/main.rs")
485 );
486 assert!(result
487 .payload_keys()
488 .unwrap()
489 .get_property("info")
490 .unwrap()
491 .to_string()
492 .contains("This is a test panic."));
493 }
494
495 #[fuchsia::test(logging = false)]
496 async fn verify_setting_minimum_log_severity() {
497 let reader = ArchiveReader::logs();
498 let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
499 let publisher = Publisher::new(PublisherOptions {
500 tags: &["verify_setting_minimum_log_severity"],
501 ..PublisherOptions::empty()
502 })
503 .expect("initialized log");
504 log::set_boxed_logger(Box::new(publisher)).unwrap();
505
506 info!("I'm an info log");
507 debug!("I'm a debug log and won't show up");
508
509 set_minimum_severity(Severity::Debug);
510 debug!("I'm a debug log and I show up");
511
512 let results = logs
513 .filter(|data| {
514 future::ready(
515 data.tags().unwrap().iter().any(|t| t == "verify_setting_minimum_log_severity"),
516 )
517 })
518 .take(2)
519 .collect::<Vec<_>>()
520 .await;
521 assert_eq!(results[0].msg().unwrap(), "I'm an info log");
522 assert_eq!(results[1].msg().unwrap(), "I'm a debug log and I show up");
523 }
524
525 #[fuchsia::test]
526 async fn log_macro_logs_are_recorded() {
527 let reader = ArchiveReader::logs();
528 let (logs, _) = reader.snapshot_then_subscribe().unwrap().split_streams();
529
530 let total_threads = 10;
531
532 for i in 0..total_threads {
533 std::thread::spawn(move || {
534 log::info!(thread=i; "log from thread {}", i);
535 });
536 }
537
538 let mut results = logs
539 .filter(|data| {
540 future::ready(
541 data.tags().unwrap().iter().any(|t| t == "log_macro_logs_are_recorded"),
542 )
543 })
544 .take(total_threads);
545
546 let mut seen = vec![];
547 while let Some(log) = results.next().await {
548 let hierarchy = log.payload_keys().unwrap();
549 assert_eq!(hierarchy.properties.len(), 1);
550 assert_eq!(hierarchy.properties[0].name(), "thread");
551 let thread_id = hierarchy.properties[0].uint().unwrap();
552 seen.push(thread_id as usize);
553 assert_eq!(log.msg().unwrap(), format!("log from thread {thread_id}"));
554 }
555 seen.sort();
556 assert_eq!(seen, (0..total_threads).collect::<Vec<_>>());
557 }
558}