1use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::{CursorItem, LogsArtifactsContainer};
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::multiplex::{Multiplexer, MultiplexerHandleAction};
11use crate::logs::shared_buffer::SharedBuffer;
12use crate::logs::stats::LogStreamStats;
13use anyhow::format_err;
14use diagnostics_data::{LogsData, Severity};
15use fidl_fuchsia_diagnostics::{LogInterestSelector, Selector, StreamMode};
16use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
17use flyweights::FlyStr;
18use fuchsia_inspect_derive::WithInspect;
19use fuchsia_sync::Mutex;
20use fuchsia_url::boot_url::BootUrl;
21use fuchsia_url::AbsoluteComponentUrl;
22use futures::channel::mpsc;
23use futures::prelude::*;
24use log::{debug, error, LevelFilter};
25use moniker::{ExtendedMoniker, Moniker};
26use selectors::SelectorExt;
27use std::collections::{BTreeMap, HashMap};
28use std::str::FromStr;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::{Arc, LazyLock, Weak};
31use {fuchsia_async as fasync, fuchsia_inspect as inspect, fuchsia_trace as ftrace};
32
/// A statically-configured initial minimum log severity for one component,
/// parsed from a `<url-or-moniker>:<severity>` string (see the `FromStr` impl).
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub struct ComponentInitialInterest {
    // The component the interest applies to, identified by URL or moniker.
    component: UrlOrMoniker,
    // The minimum severity the component should start logging at.
    log_severity: Severity,
}
41impl FromStr for ComponentInitialInterest {
44 type Err = anyhow::Error;
45 fn from_str(s: &str) -> Result<Self, Self::Err> {
46 let mut split = s.rsplitn(2, ":");
47 match (split.next(), split.next()) {
48 (Some(severity), Some(url_or_moniker)) => {
49 let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
50 return Err(format_err!("invalid url or moniker"));
51 };
52 let Ok(severity) = Severity::from_str(severity) else {
53 return Err(format_err!("invalid severity"));
54 };
55 Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
56 }
57 _ => Err(format_err!("invalid interest")),
58 }
59 }
60}
61
/// Identifies a component either by its full URL or by its moniker.
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub enum UrlOrMoniker {
    // A fully-qualified component URL (absolute package URL or boot URL).
    Url(FlyStr),
    // The moniker of a component instance.
    Moniker(ExtendedMoniker),
}
69
70impl FromStr for UrlOrMoniker {
71 type Err = ();
72 fn from_str(s: &str) -> Result<Self, Self::Err> {
73 if AbsoluteComponentUrl::from_str(s).is_ok() || BootUrl::parse(s).is_ok() {
74 Ok(UrlOrMoniker::Url(s.into()))
75 } else if let Ok(moniker) = Moniker::from_str(s) {
76 Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
77 } else {
78 Err(())
79 }
80 }
81}
82
/// Connection id reserved for statically-configured interests; never handed
/// out by `new_interest_connection`.
pub const STATIC_CONNECTION_ID: usize = 0;
/// Monotonic source of unique interest-connection ids, starting just past the
/// reserved static id.
static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);
/// The archivist's own moniker, used to recognize interest selectors that
/// target our own logs (see `maybe_update_own_logs_interest`).
static ARCHIVIST_MONIKER: LazyLock<Moniker> =
    LazyLock::new(|| Moniker::parse_str("bootstrap/archivist").unwrap());
88
/// Repository of all ingested logs: per-component containers plus the shared
/// buffer backing their storage and the scope on which LogSink connections
/// are served.
pub struct LogsRepository {
    // All state that changes after construction, behind a single mutex.
    mutable_state: Mutex<LogsRepositoryState>,
    // Shared storage for log messages, bounded by the configured byte budget.
    shared_buffer: Arc<SharedBuffer>,
    // Handle to the scope serving LogSink connections; closed to stop
    // accepting new ones.
    scope_handle: fasync::ScopeHandle,
}
96
impl LogsRepository {
    /// Creates the repository.
    ///
    /// `logs_max_cached_original_bytes` bounds the shared buffer;
    /// `initial_interests` seeds per-component minimum severities; `parent` is
    /// the inspect node under which log-source stats are published; `scope`
    /// runs log ingestion tasks.
    pub fn new(
        logs_max_cached_original_bytes: u64,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        parent: &fuchsia_inspect::Node,
        scope: fasync::Scope,
    ) -> Arc<Self> {
        let scope_handle = scope.to_handle();
        // Arc::new_cyclic lets the shared buffer's inactivity callback hold a
        // weak reference back to the repository being constructed, avoiding a
        // strong reference cycle.
        Arc::new_cyclic(|me: &Weak<LogsRepository>| {
            let me = Weak::clone(me);
            LogsRepository {
                scope_handle,
                mutable_state: Mutex::new(LogsRepositoryState::new(
                    parent,
                    initial_interests,
                    scope,
                )),
                shared_buffer: SharedBuffer::new(
                    logs_max_cached_original_bytes as usize,
                    Box::new(move |identity| {
                        if let Some(this) = me.upgrade() {
                            this.on_container_inactive(&identity);
                        }
                    }),
                ),
            }
        })
    }

    /// Starts draining the kernel debuglog into the kernel's log container.
    /// Idempotent: only the first call spawns the drain task.
    pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
    where
        K: DebugLog + Send + Sync + 'static,
    {
        let mut mutable_state = self.mutable_state.lock();

        // Only one klog drain task may ever be started.
        if mutable_state.draining_klog {
            return;
        }
        mutable_state.draining_klog = true;

        let container =
            mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
        // If the scope was already taken (termination started), do not spawn.
        let Some(ref scope) = mutable_state.scope else {
            return;
        };
        scope.spawn(async move {
            debug!("Draining debuglog.");
            let mut kernel_logger = DebugLogBridge::create(klog_reader);
            let mut messages = match kernel_logger.existing_logs() {
                Ok(messages) => messages,
                Err(e) => {
                    error!(e:%; "failed to read from kernel log, important logs may be missing");
                    return;
                }
            };
            // Ingest the existing backlog in timestamp order before tailing
            // new entries.
            messages.sort_by_key(|m| m.timestamp());
            for message in messages {
                container.ingest_message(message);
            }

            let res = kernel_logger
                .listen()
                .try_for_each(|message| async {
                    container.ingest_message(message);
                    Ok(())
                })
                .await;
            if let Err(e) = res {
                error!(e:%; "failed to drain kernel log, important logs may be missing");
            }
        });
    }

    /// Returns a merged stream of raw cursor items over all current log
    /// containers; for subscribe modes, containers created later are fed in
    /// through the multiplexer broker.
    pub fn logs_cursor_raw(
        &self,
        mode: StreamMode,
        selectors: Option<Vec<Selector>>,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = CursorItem> + Send {
        let mut repo = self.mutable_state.lock();
        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
            let cursor = c.cursor_raw(mode);
            (Arc::clone(identity), cursor)
        });
        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
        // When the merged stream is dropped it reports its id so the broker
        // can forget it.
        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
        merged
    }

    /// Like `logs_cursor_raw`, but yields fully-decoded `LogsData`.
    pub fn logs_cursor(
        &self,
        mode: StreamMode,
        selectors: Option<Vec<Selector>>,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = Arc<LogsData>> + Send + 'static {
        let mut repo = self.mutable_state.lock();
        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
            let cursor = c.cursor(mode, parent_trace_id);
            (Arc::clone(identity), cursor)
        });
        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
        merged
    }

    /// Returns the log container for `identity`, creating it if needed.
    pub fn get_log_container(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
    ) -> Arc<LogsArtifactsContainer> {
        self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
    }

    /// Waits for log ingestion to stop, then terminates the shared buffer,
    /// all containers, and all multiplexers. Logs an error and returns early
    /// if termination already started.
    pub async fn wait_for_termination(&self) {
        let Some(scope) = self.mutable_state.lock().scope.take() else {
            error!("Attempted to terminate twice");
            return;
        };
        scope.join().await;
        debug!("Log ingestion stopped.");
        self.shared_buffer.terminate().await;
        let mut repo = self.mutable_state.lock();
        for container in repo.logs_data_store.values() {
            container.terminate();
        }
        repo.logs_multiplexers.terminate();
    }

    /// Closes the scope handle so no new LogSink connections are accepted.
    pub fn stop_accepting_new_log_sinks(&self) {
        self.scope_handle.close();
    }

    /// Allocates a unique id for a new interest connection.
    pub fn new_interest_connection(&self) -> usize {
        INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
    }

    /// Replaces the interest selectors registered for `connection_id` and
    /// propagates the change to all containers.
    pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.mutable_state.lock().update_logs_interest(connection_id, selectors);
    }

    /// Removes the interest registration for `connection_id`.
    pub fn finish_interest_connection(&self, connection_id: usize) {
        self.mutable_state.lock().finish_interest_connection(connection_id);
    }

    /// Invoked when a container's buffer or client reports inactivity; drops
    /// the container unless it is still live.
    fn on_container_inactive(&self, identity: &ComponentIdentity) {
        let mut repo = self.mutable_state.lock();
        if !repo.is_live(identity) {
            repo.remove(identity);
        }
    }
}
264
#[cfg(test)]
impl LogsRepository {
    /// Test constructor: legacy default cache size, no initial interests, and
    /// a detached inspect node.
    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
        LogsRepository::new(
            crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES,
            std::iter::empty(),
            &Default::default(),
            scope,
        )
    }
}
276
impl EventConsumer for LogsRepository {
    /// Handles `LogSinkRequested` events by routing the request stream to the
    /// component's log container. Receiving any other payload is a routing
    /// bug, hence the `unreachable!`.
    fn handle(self: Arc<Self>, event: Event) {
        match event.payload {
            EventPayload::LogSinkRequested(LogSinkRequestedPayload {
                component,
                request_stream,
            }) => {
                debug!(identity:% = component; "LogSink requested.");
                let container = self.get_log_container(component);
                container.handle_log_sink(request_stream, self.scope_handle.clone());
            }
            _ => unreachable!("Archivist state just subscribes to log sink requested"),
        }
    }
}
292
/// The mutable portion of `LogsRepository`, guarded by its mutex.
pub struct LogsRepositoryState {
    // Per-component log containers, keyed by component identity.
    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
    // Inspect node ("log_sources") under which per-component stats live.
    inspect_node: inspect::Node,

    // Tracks live multiplexers so newly-created containers can be offered to
    // them.
    logs_multiplexers: MultiplexerBroker,

    // Interest selectors currently registered, keyed by connection id.
    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,

    // Whether the kernel debuglog drain task has been started.
    draining_klog: bool,

    // Scope on which ingestion tasks run; taken (set to None) when
    // termination begins.
    scope: Option<fasync::Scope>,

    // Statically-configured minimum severities, keyed by URL or moniker.
    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
}
313
impl LogsRepositoryState {
    /// Builds the initial state with an empty container map and the given
    /// static interests.
    fn new(
        parent: &fuchsia_inspect::Node,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        scope: fasync::Scope,
    ) -> Self {
        Self {
            inspect_node: parent.create_child("log_sources"),
            logs_data_store: HashMap::new(),
            logs_multiplexers: MultiplexerBroker::new(),
            interest_registrations: BTreeMap::new(),
            draining_klog: false,
            initial_interests: initial_interests
                .map(|ComponentInitialInterest { component, log_severity }| {
                    (component, log_severity)
                })
                .collect(),
            scope: Some(scope),
        }
    }

    /// Returns the existing container for `identity`, or creates one,
    /// registers it, and announces it to all live multiplexers.
    pub fn get_log_container(
        &mut self,
        identity: Arc<ComponentIdentity>,
        shared_buffer: &Arc<SharedBuffer>,
        repo: &Arc<LogsRepository>,
    ) -> Arc<LogsArtifactsContainer> {
        match self.logs_data_store.get(&identity) {
            None => {
                let initial_interest = self.get_initial_interest(identity.as_ref());
                // A weak reference avoids a cycle: repo -> container ->
                // on-inactive callback -> repo.
                let weak_repo = Arc::downgrade(repo);
                let stats = LogStreamStats::default()
                    .with_inspect(&self.inspect_node, identity.moniker.to_string())
                    .expect("failed to attach component log stats");
                stats.set_url(&identity.url);
                let stats = Arc::new(stats);
                let container = Arc::new(LogsArtifactsContainer::new(
                    Arc::clone(&identity),
                    // Seed the container with every currently-registered
                    // interest selector.
                    self.interest_registrations.values().flat_map(|s| s.iter()),
                    initial_interest,
                    Arc::clone(&stats),
                    shared_buffer.new_container_buffer(Arc::clone(&identity), stats),
                    Some(Box::new(move |c| {
                        if let Some(repo) = weak_repo.upgrade() {
                            repo.on_container_inactive(&c.identity)
                        }
                    })),
                ));
                self.logs_data_store.insert(identity, Arc::clone(&container));
                // Offer the new container to all live log multiplexers.
                self.logs_multiplexers.send(&container);
                container
            }
            Some(existing) => Arc::clone(existing),
        }
    }

    /// Looks up the static minimum severity for a component, checking both its
    /// URL and its moniker; when both match, the lower (more verbose)
    /// severity wins.
    fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
        match (
            self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())),
            self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())),
        ) {
            (None, None) => None,
            (Some(severity), None) | (None, Some(severity)) => Some(FidlSeverity::from(*severity)),
            (Some(s1), Some(s2)) => Some(FidlSeverity::from(std::cmp::min(*s1, *s2))),
        }
    }

    /// True when a container exists for `identity` and is still active.
    fn is_live(&self, identity: &ComponentIdentity) -> bool {
        match self.logs_data_store.get(identity) {
            Some(container) => container.is_active(),
            None => false,
        }
    }

    /// If any selector targets the archivist's own moniker, adjusts the
    /// global `log` max level: to the lowest matching severity, or back to
    /// Info when `clear_interest` is set.
    fn maybe_update_own_logs_interest(
        &mut self,
        selectors: &[LogInterestSelector],
        clear_interest: bool,
    ) {
        // Among selectors matching our moniker, pick the one with the lowest
        // (most verbose) minimum severity; absent severity counts as Info.
        let lowest_selector = selectors
            .iter()
            .filter(|selector| {
                ARCHIVIST_MONIKER.matches_component_selector(&selector.selector).unwrap_or(false)
            })
            .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
        if let Some(selector) = lowest_selector {
            if clear_interest {
                log::set_max_level(LevelFilter::Info);
            } else {
                log::set_max_level(
                    match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
                        FidlSeverity::Trace => LevelFilter::Trace,
                        FidlSeverity::Debug => LevelFilter::Debug,
                        FidlSeverity::Info => LevelFilter::Info,
                        FidlSeverity::Warn => LevelFilter::Warn,
                        FidlSeverity::Error => LevelFilter::Error,
                        // `log` has no Fatal level; map Fatal to Error.
                        FidlSeverity::Fatal => LevelFilter::Error,
                        FidlSeverity::__SourceBreaking { .. } => return,
                    },
                );
            }
        }
    }

    /// Registers `selectors` for `connection_id` (replacing any previous
    /// registration) and pushes the change to every container.
    fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.maybe_update_own_logs_interest(&selectors, false);
        // `insert` hands back the selectors previously registered for this
        // connection (empty on first registration).
        let previous_selectors =
            self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
        let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
        for logs_data in self.logs_data_store.values() {
            logs_data.update_interest(new_selectors.iter(), &previous_selectors);
        }
    }

    /// Removes `connection_id`'s registration and resets its effect on our
    /// own log level and on every container.
    pub fn finish_interest_connection(&mut self, connection_id: usize) {
        let selectors = self.interest_registrations.remove(&connection_id);
        if let Some(selectors) = selectors {
            self.maybe_update_own_logs_interest(&selectors, true);
            for logs_data in self.logs_data_store.values() {
                logs_data.reset_interest(&selectors);
            }
        }
    }

    /// Drops the container for `identity` from the repository.
    pub fn remove(&mut self, identity: &ComponentIdentity) {
        self.logs_data_store.remove(identity);
    }
}
449
/// Map from multiplexer id to its stream mode and control handle.
type LiveIteratorsMap = HashMap<usize, (StreamMode, Box<dyn MultiplexerHandleAction + Send>)>;

/// Tracks live log multiplexers and feeds them cursors for containers created
/// after the multiplexer started.
pub struct MultiplexerBroker {
    // Live multiplexers; shared with the background cleanup task.
    live_iterators: Arc<Mutex<LiveIteratorsMap>>,
    // Sender handed to multiplexers so they can report their id on drop.
    cleanup_sender: mpsc::UnboundedSender<usize>,
    // Background task removing dropped multiplexers from `live_iterators`.
    _live_iterators_cleanup_task: fasync::Task<()>,
}
458
459impl MultiplexerBroker {
460 fn new() -> Self {
461 let (cleanup_sender, mut receiver) = mpsc::unbounded();
462 let live_iterators = Arc::new(Mutex::new(HashMap::new()));
463 let live_iterators_clone = Arc::clone(&live_iterators);
464 Self {
465 live_iterators,
466 cleanup_sender,
467 _live_iterators_cleanup_task: fasync::Task::spawn(async move {
468 while let Some(id) = receiver.next().await {
469 live_iterators_clone.lock().remove(&id);
470 }
471 }),
472 }
473 }
474
475 fn cleanup_sender(&self) -> mpsc::UnboundedSender<usize> {
476 self.cleanup_sender.clone()
477 }
478
479 fn add(&mut self, mode: StreamMode, recipient: Box<dyn MultiplexerHandleAction + Send>) {
482 match mode {
483 StreamMode::Snapshot => recipient.close(),
485 StreamMode::SnapshotThenSubscribe | StreamMode::Subscribe => {
486 self.live_iterators.lock().insert(recipient.multiplexer_id(), (mode, recipient));
487 }
488 }
489 }
490
491 pub fn send(&mut self, container: &Arc<LogsArtifactsContainer>) {
494 self.live_iterators
495 .lock()
496 .retain(|_, (mode, recipient)| recipient.send_cursor_from(*mode, container));
497 }
498
499 fn terminate(&mut self) {
501 for (_, (_, recipient)) in self.live_iterators.lock().drain() {
502 recipient.close();
503 }
504 }
505}
506
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::testing::make_message;
    use fidl_fuchsia_logger::LogSinkMarker;

    use moniker::ExtendedMoniker;
    use selectors::FastError;
    use std::time::Duration;

    /// Messages from multiple components interleave by timestamp in an
    /// unfiltered cursor, and a selector restricts the cursor to the matching
    /// component's messages.
    #[fuchsia::test]
    async fn data_repo_filters_logs_by_selectors() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bar").unwrap(),
            "fuchsia-pkg://bar",
        )));

        foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
        bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
        foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));

        let stream = repo.logs_cursor(StreamMode::Snapshot, None, ftrace::Id::random());

        let results =
            stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);

        let filtered_stream = repo.logs_cursor(
            StreamMode::Snapshot,
            Some(vec![selectors::parse_selector::<FastError>("foo:root").unwrap()]),
            ftrace::Id::random(),
        );

        let results =
            filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
    }

    /// Dropping a subscribing cursor eventually removes its multiplexer from
    /// the broker via the cleanup task.
    #[fuchsia::test]
    async fn multiplexer_broker_cleanup() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let stream =
            repo.logs_cursor(StreamMode::SnapshotThenSubscribe, None, ftrace::Id::random());

        assert_eq!(repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().len(), 1);

        drop(stream);
        // Poll until the background cleanup task removes the dropped
        // multiplexer.
        loop {
            fasync::Timer::new(Duration::from_millis(100)).await;
            if repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().is_empty() {
                break;
            }
        }
    }

    /// Initial interests match by moniker or URL, with the lower severity
    /// winning when both match, and none applying to unknown components.
    #[fuchsia::test]
    async fn data_repo_correctly_sets_initial_interests() {
        let repo = LogsRepository::new(
            100000,
            [
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
                    log_severity: Severity::Info,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
                    log_severity: Severity::Warn,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("core/bar".try_into().unwrap()),
                    log_severity: Severity::Error,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("core/foo".try_into().unwrap()),
                    log_severity: Severity::Debug,
                },
            ]
            .into_iter(),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        // Moniker-only match.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;

        // URL-only match.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/baz").unwrap(),
            "fuchsia-pkg://baz",
        )));
        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
            .await;

        // Both match (URL says Info, moniker says Error): lower severity wins.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bar").unwrap(),
            "fuchsia-pkg://bar",
        )));
        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
            .await;

        // No match: no initial interest.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/quux").unwrap(),
            "fuchsia-pkg://quux",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
    }

    /// Connects a LogSink to `container` and asserts the first interest
    /// change reported matches `expected_severity`.
    async fn expect_initial_interest(
        expected_severity: Option<FidlSeverity>,
        container: Arc<LogsArtifactsContainer>,
        scope: fasync::ScopeHandle,
    ) {
        let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(stream, scope);
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, expected_severity);
    }
}