1use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::{CursorItem, LogsArtifactsContainer};
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::multiplex::{Multiplexer, MultiplexerHandleAction};
11use crate::logs::shared_buffer::SharedBuffer;
12use crate::logs::stats::LogStreamStats;
13use anyhow::format_err;
14use diagnostics_data::{LogsData, Severity};
15use fidl_fuchsia_diagnostics::{LogInterestSelector, Selector, StreamMode, StringSelector};
16use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
17use flyweights::FlyStr;
18use fuchsia_inspect_derive::WithInspect;
19use fuchsia_sync::Mutex;
20use fuchsia_url::AbsoluteComponentUrl;
21use fuchsia_url::boot_url::BootUrl;
22use futures::channel::mpsc;
23use futures::prelude::*;
24use log::{LevelFilter, debug, error};
25use moniker::{ExtendedMoniker, Moniker};
26use selectors::SelectorExt;
27use std::collections::{BTreeMap, HashMap};
28use std::str::FromStr;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::{Arc, OnceLock, Weak};
31use {fuchsia_async as fasync, fuchsia_inspect as inspect, fuchsia_trace as ftrace};
32
33#[derive(Ord, PartialOrd, Eq, PartialEq)]
35pub struct ComponentInitialInterest {
36 component: UrlOrMoniker,
38 log_severity: Severity,
40}
41impl FromStr for ComponentInitialInterest {
44 type Err = anyhow::Error;
45 fn from_str(s: &str) -> Result<Self, Self::Err> {
46 let mut split = s.rsplitn(2, ":");
47 match (split.next(), split.next()) {
48 (Some(severity), Some(url_or_moniker)) => {
49 let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
50 return Err(format_err!("invalid url or moniker"));
51 };
52 let Ok(severity) = Severity::from_str(severity) else {
53 return Err(format_err!("invalid severity"));
54 };
55 Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
56 }
57 _ => Err(format_err!("invalid interest")),
58 }
59 }
60}
61
62#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
63pub enum UrlOrMoniker {
64 Url(FlyStr),
66 Moniker(ExtendedMoniker),
68 Partial(FlyStr),
70}
71
72impl FromStr for UrlOrMoniker {
73 type Err = ();
74 fn from_str(s: &str) -> Result<Self, Self::Err> {
75 if AbsoluteComponentUrl::from_str(s).is_ok() || BootUrl::parse(s).is_ok() {
76 Ok(UrlOrMoniker::Url(s.into()))
77 } else if s.starts_with("/") {
78 if let Ok(moniker) = Moniker::from_str(s) {
79 Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
80 } else {
81 Err(())
82 }
83 } else {
84 Ok(UrlOrMoniker::Partial(s.into()))
85 }
86 }
87}
88
89pub const STATIC_CONNECTION_ID: usize = 0;
91static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);
92
93pub static ARCHIVIST_MONIKER: OnceLock<Moniker> = OnceLock::new();
95
96pub const ARCHIVIST_TAG: u64 = 0;
101
102pub struct LogsRepository {
105 mutable_state: Mutex<LogsRepositoryState>,
106 shared_buffer: Arc<SharedBuffer>,
107 scope_handle: fasync::ScopeHandle,
108}
109
110impl LogsRepository {
111 pub fn new(
112 ring_buffer: ring_buffer::Reader,
113 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
114 parent: &fuchsia_inspect::Node,
115 scope: fasync::Scope,
116 ) -> Arc<Self> {
117 let scope_handle = scope.to_handle();
118 Arc::new_cyclic(|me: &Weak<LogsRepository>| {
119 let mut mutable_state = LogsRepositoryState::new(parent, initial_interests, scope);
120 let me_clone = Weak::clone(me);
121 let shared_buffer = SharedBuffer::new(
122 ring_buffer,
123 Box::new(move |identity| {
124 if let Some(this) = me_clone.upgrade() {
125 this.on_container_inactive(&identity);
126 }
127 }),
128 Default::default(),
129 );
130 if let Some(m) = ARCHIVIST_MONIKER.get() {
131 let archivist_container = mutable_state.create_log_container(
132 Arc::new(ComponentIdentity::new(
133 ExtendedMoniker::ComponentInstance(m.clone()),
134 "fuchsia-pkg://UNKNOWN",
135 )),
136 &shared_buffer,
137 Weak::clone(me),
138 );
139 assert_eq!(archivist_container.buffer().iob_tag(), ARCHIVIST_TAG);
141 }
142 LogsRepository { scope_handle, mutable_state: Mutex::new(mutable_state), shared_buffer }
143 })
144 }
145
146 pub async fn flush(&self) {
147 self.shared_buffer.flush().await;
148 }
149
150 pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
153 where
154 K: DebugLog + Send + Sync + 'static,
155 {
156 let mut mutable_state = self.mutable_state.lock();
157
158 if mutable_state.draining_klog {
161 return;
162 }
163 mutable_state.draining_klog = true;
164
165 let container =
166 mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
167 let Some(ref scope) = mutable_state.scope else {
168 return;
169 };
170 scope.spawn(async move {
171 debug!("Draining debuglog.");
172 let mut kernel_logger = DebugLogBridge::create(klog_reader);
173 let mut messages = match kernel_logger.existing_logs() {
174 Ok(messages) => messages,
175 Err(e) => {
176 error!(e:%; "failed to read from kernel log, important logs may be missing");
177 return;
178 }
179 };
180 messages.sort_by_key(|m| m.timestamp());
181 for message in messages {
182 container.ingest_message(message);
183 }
184
185 let res = kernel_logger
186 .listen()
187 .try_for_each(|message| async {
188 container.ingest_message(message);
189 Ok(())
190 })
191 .await;
192 if let Err(e) = res {
193 error!(e:%; "failed to drain kernel log, important logs may be missing");
194 }
195 });
196 }
197
198 pub fn logs_cursor_raw(
199 &self,
200 mode: StreamMode,
201 selectors: Option<Vec<Selector>>,
202 parent_trace_id: ftrace::Id,
203 ) -> impl Stream<Item = CursorItem> + Send {
204 let mut repo = self.mutable_state.lock();
205 let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
206 let cursor = c.cursor_raw(mode);
207 (Arc::clone(identity), cursor)
208 });
209 let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
210 repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
211 merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
212 merged
213 }
214
215 pub fn logs_cursor(
216 &self,
217 mode: StreamMode,
218 selectors: Option<Vec<Selector>>,
219 parent_trace_id: ftrace::Id,
220 ) -> impl Stream<Item = Arc<LogsData>> + Send + 'static {
221 let mut repo = self.mutable_state.lock();
222 let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
223 let cursor = c.cursor(mode, parent_trace_id);
224 (Arc::clone(identity), cursor)
225 });
226 let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
227 repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
228 merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
229 merged
230 }
231
232 pub fn get_log_container(
233 self: &Arc<Self>,
234 identity: Arc<ComponentIdentity>,
235 ) -> Arc<LogsArtifactsContainer> {
236 self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
237 }
238
239 pub async fn wait_for_termination(&self) {
242 let Some(scope) = self.mutable_state.lock().scope.take() else {
243 error!("Attempted to terminate twice");
244 return;
245 };
246 scope.join().await;
247 debug!("Log ingestion stopped.");
249 self.shared_buffer.terminate().await;
252 let mut repo = self.mutable_state.lock();
253 for container in repo.logs_data_store.values() {
254 container.terminate();
255 }
256 repo.logs_multiplexers.terminate();
257 }
258
259 pub fn stop_accepting_new_log_sinks(&self) {
262 self.scope_handle.close();
263 }
264
265 pub fn new_interest_connection(&self) -> usize {
268 INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
269 }
270
271 pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
273 self.mutable_state.lock().update_logs_interest(connection_id, selectors);
274 }
275
276 pub fn finish_interest_connection(&self, connection_id: usize) {
278 self.mutable_state.lock().finish_interest_connection(connection_id);
279 }
280
281 fn on_container_inactive(&self, identity: &ComponentIdentity) {
282 let mut repo = self.mutable_state.lock();
283 if !repo.is_live(identity) {
284 repo.remove(identity);
285 }
286 }
287}
288
289#[cfg(test)]
290impl LogsRepository {
291 pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
292 use crate::logs::shared_buffer::create_ring_buffer;
293
294 LogsRepository::new(
295 create_ring_buffer(crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as usize),
296 std::iter::empty(),
297 &Default::default(),
298 scope,
299 )
300 }
301}
302
303impl EventConsumer for LogsRepository {
304 fn handle(self: Arc<Self>, event: Event) {
305 match event.payload {
306 EventPayload::LogSinkRequested(LogSinkRequestedPayload {
307 component,
308 request_stream,
309 }) => {
310 debug!(identity:% = component; "LogSink requested.");
311 let container = self.get_log_container(component);
312 container.handle_log_sink(request_stream, self.scope_handle.clone());
313 }
314 _ => unreachable!("Archivist state just subscribes to log sink requested"),
315 }
316 }
317}
318
319pub struct LogsRepositoryState {
320 logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
321 inspect_node: inspect::Node,
322
323 logs_multiplexers: MultiplexerBroker,
325
326 interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,
329
330 draining_klog: bool,
332
333 scope: Option<fasync::Scope>,
335
336 initial_interests: BTreeMap<UrlOrMoniker, Severity>,
338}
339
340impl LogsRepositoryState {
341 fn new(
342 parent: &fuchsia_inspect::Node,
343 initial_interests: impl Iterator<Item = ComponentInitialInterest>,
344 scope: fasync::Scope,
345 ) -> Self {
346 Self {
347 inspect_node: parent.create_child("log_sources"),
348 logs_data_store: HashMap::new(),
349 logs_multiplexers: MultiplexerBroker::new(),
350 interest_registrations: BTreeMap::new(),
351 draining_klog: false,
352 initial_interests: initial_interests
353 .map(|ComponentInitialInterest { component, log_severity }| {
354 (component, log_severity)
355 })
356 .collect(),
357 scope: Some(scope),
358 }
359 }
360
361 pub fn get_log_container(
364 &mut self,
365 identity: Arc<ComponentIdentity>,
366 shared_buffer: &Arc<SharedBuffer>,
367 repo: &Arc<LogsRepository>,
368 ) -> Arc<LogsArtifactsContainer> {
369 match self.logs_data_store.get(&identity) {
370 None => self.create_log_container(identity, shared_buffer, Arc::downgrade(repo)),
371 Some(existing) => Arc::clone(existing),
372 }
373 }
374
375 fn create_log_container(
376 &mut self,
377 identity: Arc<ComponentIdentity>,
378 shared_buffer: &Arc<SharedBuffer>,
379 repo: Weak<LogsRepository>,
380 ) -> Arc<LogsArtifactsContainer> {
381 let initial_interest = self.get_initial_interest(identity.as_ref());
382 let stats = LogStreamStats::default()
383 .with_inspect(&self.inspect_node, identity.moniker.to_string())
384 .expect("failed to attach component log stats");
385 stats.set_url(&identity.url);
386 let stats = Arc::new(stats);
387 let container = Arc::new(LogsArtifactsContainer::new(
388 Arc::clone(&identity),
389 self.interest_registrations.values().flat_map(|s| s.iter()),
390 initial_interest,
391 Arc::clone(&stats),
392 shared_buffer.new_container_buffer(Arc::clone(&identity), stats),
393 Some(Box::new(move |c| {
394 if let Some(repo) = repo.upgrade() {
395 repo.on_container_inactive(&c.identity)
396 }
397 })),
398 ));
399 self.logs_data_store.insert(identity, Arc::clone(&container));
400 self.logs_multiplexers.send(&container);
401 container
402 }
403
404 fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
405 let exact_url_severity =
406 self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
407 let exact_moniker_severity =
408 self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();
409
410 let partial_severity = self
411 .initial_interests
412 .iter()
413 .filter_map(|(uom, severity)| match uom {
414 UrlOrMoniker::Partial(p) => {
415 if identity.url.contains(p.as_str())
416 || identity.moniker.to_string().contains(p.as_str())
417 {
418 Some(*severity)
419 } else {
420 None
421 }
422 }
423 _ => None,
424 })
425 .min();
426
427 [exact_url_severity, exact_moniker_severity, partial_severity]
428 .into_iter()
429 .flatten()
430 .min()
431 .map(FidlSeverity::from)
432 }
433
434 fn is_live(&self, identity: &ComponentIdentity) -> bool {
435 match self.logs_data_store.get(identity) {
436 Some(container) => container.is_active(),
437 None => false,
438 }
439 }
440
441 fn maybe_update_own_logs_interest(
444 &mut self,
445 selectors: &[LogInterestSelector],
446 clear_interest: bool,
447 ) {
448 let Some(moniker) = ARCHIVIST_MONIKER.get() else { return };
449 let lowest_selector = selectors
450 .iter()
451 .filter(|selector| {
452 if selector.selector.moniker_segments.as_ref().is_some_and(|s| {
463 matches!(
464 &s[..],
465 [StringSelector::StringPattern(s)] if s == "**" || s == "*"
466 )
467 }) {
468 return false;
469 }
470
471 moniker.matches_component_selector(&selector.selector).unwrap_or(false)
472 })
473 .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
474 if let Some(selector) = lowest_selector {
475 if clear_interest {
476 log::set_max_level(LevelFilter::Info);
477 } else {
478 log::set_max_level(
479 match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
480 FidlSeverity::Trace => LevelFilter::Trace,
481 FidlSeverity::Debug => LevelFilter::Debug,
482 FidlSeverity::Info => LevelFilter::Info,
483 FidlSeverity::Warn => LevelFilter::Warn,
484 FidlSeverity::Error => LevelFilter::Error,
485 FidlSeverity::Fatal => LevelFilter::Error,
488 FidlSeverity::__SourceBreaking { .. } => return,
489 },
490 );
491 }
492 }
493 }
494
495 fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
496 self.maybe_update_own_logs_interest(&selectors, false);
497 let previous_selectors =
498 self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
499 let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
501 for logs_data in self.logs_data_store.values() {
502 logs_data.update_interest(new_selectors.iter(), &previous_selectors);
503 }
504 }
505
506 pub fn finish_interest_connection(&mut self, connection_id: usize) {
507 let selectors = self.interest_registrations.remove(&connection_id);
508 if let Some(selectors) = selectors {
509 self.maybe_update_own_logs_interest(&selectors, true);
510 for logs_data in self.logs_data_store.values() {
511 logs_data.reset_interest(&selectors);
512 }
513 }
514 }
515
516 pub fn remove(&mut self, identity: &ComponentIdentity) {
517 self.logs_data_store.remove(identity);
518 }
519}
520
521type LiveIteratorsMap = HashMap<usize, (StreamMode, Box<dyn MultiplexerHandleAction + Send>)>;
522
523pub struct MultiplexerBroker {
525 live_iterators: Arc<Mutex<LiveIteratorsMap>>,
526 cleanup_sender: mpsc::UnboundedSender<usize>,
527 _live_iterators_cleanup_task: fasync::Task<()>,
528}
529
530impl MultiplexerBroker {
531 fn new() -> Self {
532 let (cleanup_sender, mut receiver) = mpsc::unbounded();
533 let live_iterators = Arc::new(Mutex::new(HashMap::new()));
534 let live_iterators_clone = Arc::clone(&live_iterators);
535 Self {
536 live_iterators,
537 cleanup_sender,
538 _live_iterators_cleanup_task: fasync::Task::spawn(async move {
539 while let Some(id) = receiver.next().await {
540 live_iterators_clone.lock().remove(&id);
541 }
542 }),
543 }
544 }
545
546 fn cleanup_sender(&self) -> mpsc::UnboundedSender<usize> {
547 self.cleanup_sender.clone()
548 }
549
550 fn add(&mut self, mode: StreamMode, recipient: Box<dyn MultiplexerHandleAction + Send>) {
553 match mode {
554 StreamMode::Snapshot => recipient.close(),
556 StreamMode::SnapshotThenSubscribe | StreamMode::Subscribe => {
557 self.live_iterators.lock().insert(recipient.multiplexer_id(), (mode, recipient));
558 }
559 }
560 }
561
562 pub fn send(&mut self, container: &Arc<LogsArtifactsContainer>) {
565 self.live_iterators
566 .lock()
567 .retain(|_, (mode, recipient)| recipient.send_cursor_from(*mode, container));
568 }
569
570 fn terminate(&mut self) {
572 for (_, (_, recipient)) in self.live_iterators.lock().drain() {
573 recipient.close();
574 }
575 }
576}
577
578#[cfg(test)]
579mod tests {
580 use super::*;
581 use crate::logs::shared_buffer::create_ring_buffer;
582 use crate::logs::testing::make_message;
583 use fidl_fuchsia_logger::LogSinkMarker;
584
585 use moniker::ExtendedMoniker;
586 use selectors::FastError;
587 use std::time::Duration;
588
589 #[fuchsia::test]
590 async fn data_repo_filters_logs_by_selectors() {
591 let repo = LogsRepository::for_test(fasync::Scope::new());
592 let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
593 ExtendedMoniker::parse_str("./foo").unwrap(),
594 "fuchsia-pkg://foo",
595 )));
596 let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
597 ExtendedMoniker::parse_str("./bar").unwrap(),
598 "fuchsia-pkg://bar",
599 )));
600
601 foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
602 bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
603 foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));
604
605 let stream = repo.logs_cursor(StreamMode::Snapshot, None, ftrace::Id::random());
606
607 let results =
608 stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
609 assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);
610
611 let filtered_stream = repo.logs_cursor(
612 StreamMode::Snapshot,
613 Some(vec![selectors::parse_selector::<FastError>("foo:root").unwrap()]),
614 ftrace::Id::random(),
615 );
616
617 let results =
618 filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
619 assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
620 }
621
622 #[fuchsia::test]
623 async fn multiplexer_broker_cleanup() {
624 let repo = LogsRepository::for_test(fasync::Scope::new());
625 let stream =
626 repo.logs_cursor(StreamMode::SnapshotThenSubscribe, None, ftrace::Id::random());
627
628 assert_eq!(repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().len(), 1);
629
630 drop(stream);
632 loop {
633 fasync::Timer::new(Duration::from_millis(100)).await;
634 if repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().is_empty() {
635 break;
636 }
637 }
638 }
639
640 #[fuchsia::test]
641 async fn data_repo_correctly_sets_initial_interests() {
642 let repo = LogsRepository::new(
643 create_ring_buffer(100000),
644 [
645 ComponentInitialInterest {
646 component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
647 log_severity: Severity::Info,
648 },
649 ComponentInitialInterest {
650 component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
651 log_severity: Severity::Warn,
652 },
653 ComponentInitialInterest {
654 component: UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()),
655 log_severity: Severity::Error,
656 },
657 ComponentInitialInterest {
658 component: UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()),
659 log_severity: Severity::Debug,
660 },
661 ]
662 .into_iter(),
663 &fuchsia_inspect::Node::default(),
664 fasync::Scope::new(),
665 );
666
667 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
669 ExtendedMoniker::parse_str("core/foo").unwrap(),
670 "fuchsia-pkg://foo",
671 )));
672 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
673 .await;
674
675 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
677 ExtendedMoniker::parse_str("core/baz").unwrap(),
678 "fuchsia-pkg://baz",
679 )));
680 expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
681 .await;
682
683 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
686 ExtendedMoniker::parse_str("core/bar").unwrap(),
687 "fuchsia-pkg://bar",
688 )));
689 expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
690 .await;
691
692 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
695 ExtendedMoniker::parse_str("core/quux").unwrap(),
696 "fuchsia-pkg://quux",
697 )));
698 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
699 }
700
701 #[fuchsia::test]
702 async fn data_repo_correctly_handles_partial_matching() {
703 let repo = LogsRepository::new(
704 create_ring_buffer(100000),
705 [
706 "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO".parse(),
707 "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN".parse(),
708 "/core/bust:DEBUG".parse(),
709 "core/bar:ERROR".parse(),
710 "foo:DEBUG".parse(),
711 "both:TRACE".parse(),
712 ]
713 .into_iter()
714 .map(Result::unwrap),
715 &fuchsia_inspect::Node::default(),
716 fasync::Scope::new(),
717 );
718
719 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
721 ExtendedMoniker::parse_str("core/foo").unwrap(),
722 "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
723 )));
724 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
725 .await;
726
727 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
729 ExtendedMoniker::parse_str("core/not-foo").unwrap(),
730 "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm",
731 )));
732 expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
733 .await;
734
735 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
737 ExtendedMoniker::parse_str("core/baz").unwrap(),
738 "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm",
739 )));
740 expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
741 .await;
742
743 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
746 ExtendedMoniker::parse_str("core/bar").unwrap(),
747 "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm",
748 )));
749 expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
750 .await;
751
752 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
755 ExtendedMoniker::parse_str("core/quux").unwrap(),
756 "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm",
757 )));
758 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
759
760 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
762 ExtendedMoniker::parse_str("core/both").unwrap(),
763 "fuchsia-pkg://fuchsia.com/both#meta/both.cm",
764 )));
765 expect_initial_interest(Some(FidlSeverity::Trace), container, repo.scope_handle.clone())
766 .await;
767
768 let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
770 ExtendedMoniker::parse_str("core/bust/testing").unwrap(),
771 "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm",
772 )));
773 expect_initial_interest(None, container, repo.scope_handle.clone()).await;
774 }
775
776 async fn expect_initial_interest(
777 expected_severity: Option<FidlSeverity>,
778 container: Arc<LogsArtifactsContainer>,
779 scope: fasync::ScopeHandle,
780 ) {
781 let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
782 container.handle_log_sink(stream, scope);
783 let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
784 assert_eq!(initial_interest.min_severity, expected_severity);
785 }
786}