use crate::events::router::EventConsumer;
use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
use crate::identity::ComponentIdentity;
use crate::logs::container::{CursorItem, LogsArtifactsContainer};
use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
use crate::logs::multiplex::{Multiplexer, MultiplexerHandleAction};
use crate::logs::shared_buffer::SharedBuffer;
use crate::logs::stats::LogStreamStats;
use anyhow::format_err;
use diagnostics_data::{LogsData, Severity};
use fidl_fuchsia_diagnostics::{LogInterestSelector, Selector, StreamMode};
use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
use flyweights::FlyStr;
use fuchsia_inspect_derive::WithInspect;
use fuchsia_sync::Mutex;
use fuchsia_url::boot_url::BootUrl;
use fuchsia_url::AbsoluteComponentUrl;
use futures::channel::mpsc;
use futures::prelude::*;
use log::{debug, error, LevelFilter};
use moniker::{ExtendedMoniker, Moniker};
use selectors::SelectorExt;
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, Weak};
use {fuchsia_async as fasync, fuchsia_inspect as inspect, fuchsia_trace as ftrace};

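/// Initial minimum log severity for a component, parsed from a `<component>:<severity>`
/// string such as `/core/foo:DEBUG` or `fuchsia-pkg://fuchsia.com/foo#meta/foo.cm:WARN`.
/// The component part may also be a partial string that is matched against both the URL and
/// the moniker (see [`UrlOrMoniker::Partial`]).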
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub struct ComponentInitialInterest {
    component: UrlOrMoniker,
    log_severity: Severity,
}

impl FromStr for ComponentInitialInterest {
    type Err = anyhow::Error;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let mut split = s.rsplitn(2, ":");
        match (split.next(), split.next()) {
            (Some(severity), Some(url_or_moniker)) => {
                let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
                    return Err(format_err!("invalid url or moniker"));
                };
                let Ok(severity) = Severity::from_str(severity) else {
                    return Err(format_err!("invalid severity"));
                };
                Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
            }
            _ => Err(format_err!("invalid interest")),
        }
    }
}

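/// Identifies the component an initial interest applies to: a full component URL, an absolute
/// moniker, or a partial string matched against both URL and moniker.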
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub enum UrlOrMoniker {
    Url(FlyStr),
    Moniker(ExtendedMoniker),
    Partial(FlyStr),
}

impl FromStr for UrlOrMoniker {
    type Err = ();
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if AbsoluteComponentUrl::from_str(s).is_ok() || BootUrl::parse(s).is_ok() {
            Ok(UrlOrMoniker::Url(s.into()))
        } else if s.starts_with("/") {
            if let Ok(moniker) = Moniker::from_str(s) {
                Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
            } else {
                Err(())
            }
        } else {
            Ok(UrlOrMoniker::Partial(s.into()))
        }
    }
}

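/// Connection id reserved for interest selectors that are registered statically rather than
/// through a live interest connection; ids handed out by `new_interest_connection` start
/// above it.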
pub const STATIC_CONNECTION_ID: usize = 0;
static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);

static ARCHIVIST_MONIKER: LazyLock<Moniker> =
    LazyLock::new(|| Moniker::parse_str("bootstrap/archivist").unwrap());

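/// Central repository of log data for Archivist: owns the per-component
/// `LogsArtifactsContainer`s, the shared log buffer, and the interest and multiplexer
/// bookkeeping held behind `mutable_state`.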
pub struct LogsRepository {
    mutable_state: Mutex<LogsRepositoryState>,
    shared_buffer: Arc<SharedBuffer>,
    scope_handle: fasync::ScopeHandle,
}

impl LogsRepository {
    pub fn new(
        logs_max_cached_original_bytes: u64,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        parent: &fuchsia_inspect::Node,
        scope: fasync::Scope,
    ) -> Arc<Self> {
        let scope_handle = scope.to_handle();
        Arc::new_cyclic(|me: &Weak<LogsRepository>| {
            let me = Weak::clone(me);
            LogsRepository {
                scope_handle,
                mutable_state: Mutex::new(LogsRepositoryState::new(
                    parent,
                    initial_interests,
                    scope,
                )),
                shared_buffer: SharedBuffer::new(
                    logs_max_cached_original_bytes as usize,
                    Box::new(move |identity| {
                        if let Some(this) = me.upgrade() {
                            this.on_container_inactive(&identity);
                        }
                    }),
                ),
            }
        })
    }

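    /// Drains the kernel debug log into the kernel log container. Existing messages are read,
    /// sorted by timestamp, and ingested, after which the spawned task keeps listening for new
    /// messages. Subsequent calls are no-ops.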
    pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
    where
        K: DebugLog + Send + Sync + 'static,
    {
        let mut mutable_state = self.mutable_state.lock();

        if mutable_state.draining_klog {
            return;
        }
        mutable_state.draining_klog = true;

        let container =
            mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
        let Some(ref scope) = mutable_state.scope else {
            return;
        };
        scope.spawn(async move {
            debug!("Draining debuglog.");
            let mut kernel_logger = DebugLogBridge::create(klog_reader);
            let mut messages = match kernel_logger.existing_logs() {
                Ok(messages) => messages,
                Err(e) => {
                    error!(e:%; "failed to read from kernel log, important logs may be missing");
                    return;
                }
            };
            messages.sort_by_key(|m| m.timestamp());
            for message in messages {
                container.ingest_message(message);
            }

            let res = kernel_logger
                .listen()
                .try_for_each(|message| async {
                    container.ingest_message(message);
                    Ok(())
                })
                .await;
            if let Err(e) = res {
                error!(e:%; "failed to drain kernel log, important logs may be missing");
            }
        });
    }

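    /// Returns a merged stream of raw `CursorItem`s over all current containers in the given
    /// mode, optionally filtered by `selectors`. For subscribing modes, the stream also picks
    /// up containers created after this call via the multiplexer broker.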
    pub fn logs_cursor_raw(
        &self,
        mode: StreamMode,
        selectors: Option<Vec<Selector>>,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = CursorItem> + Send {
        let mut repo = self.mutable_state.lock();
        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
            let cursor = c.cursor_raw(mode);
            (Arc::clone(identity), cursor)
        });
        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
        merged
    }

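    /// Same as [`LogsRepository::logs_cursor_raw`], but yields decoded `LogsData` instead of
    /// raw cursor items.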
    pub fn logs_cursor(
        &self,
        mode: StreamMode,
        selectors: Option<Vec<Selector>>,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = Arc<LogsData>> + Send + 'static {
        let mut repo = self.mutable_state.lock();
        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
            let cursor = c.cursor(mode, parent_trace_id);
            (Arc::clone(identity), cursor)
        });
        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
        merged
    }

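    /// Returns the log container for the given component identity, creating it (and wiring it
    /// to the shared buffer and live multiplexers) if it does not exist yet.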
    pub fn get_log_container(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
    ) -> Arc<LogsArtifactsContainer> {
        self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
    }

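    /// Waits for the ingestion scope to finish, then terminates the shared buffer, every log
    /// container, and all live multiplexers. Logs an error if called more than once.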
    pub async fn wait_for_termination(&self) {
        let Some(scope) = self.mutable_state.lock().scope.take() else {
            error!("Attempted to terminate twice");
            return;
        };
        scope.join().await;
        debug!("Log ingestion stopped.");
        self.shared_buffer.terminate().await;
        let mut repo = self.mutable_state.lock();
        for container in repo.logs_data_store.values() {
            container.terminate();
        }
        repo.logs_multiplexers.terminate();
    }

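    /// Closes the scope handle used to serve `LogSink` connections so that no new log sinks
    /// are accepted.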
    pub fn stop_accepting_new_log_sinks(&self) {
        self.scope_handle.close();
    }

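    /// Allocates a unique id for a new log interest connection.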
    pub fn new_interest_connection(&self) -> usize {
        INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
    }

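    /// Registers the interest selectors associated with `connection_id` and propagates the
    /// resulting interest changes to all log containers.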
    pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.mutable_state.lock().update_logs_interest(connection_id, selectors);
    }

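    /// Removes the interest selectors associated with `connection_id` and resets the interest
    /// of affected log containers.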
    pub fn finish_interest_connection(&self, connection_id: usize) {
        self.mutable_state.lock().finish_interest_connection(connection_id);
    }

    fn on_container_inactive(&self, identity: &ComponentIdentity) {
        let mut repo = self.mutable_state.lock();
        if !repo.is_live(identity) {
            repo.remove(identity);
        }
    }
}

#[cfg(test)]
impl LogsRepository {
    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
        LogsRepository::new(
            crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES,
            std::iter::empty(),
            &Default::default(),
            scope,
        )
    }
}

impl EventConsumer for LogsRepository {
    fn handle(self: Arc<Self>, event: Event) {
        match event.payload {
            EventPayload::LogSinkRequested(LogSinkRequestedPayload {
                component,
                request_stream,
            }) => {
                debug!(identity:% = component; "LogSink requested.");
                let container = self.get_log_container(component);
                container.handle_log_sink(request_stream, self.scope_handle.clone());
            }
            _ => unreachable!("Archivist state just subscribes to log sink requested"),
        }
    }
}

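/// Mutable state of the [`LogsRepository`], guarded by a mutex: the per-component containers,
/// the inspect node they hang off of, live multiplexers, interest registrations keyed by
/// connection id, and the statically configured initial interests.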
pub struct LogsRepositoryState {
    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
    inspect_node: inspect::Node,
    logs_multiplexers: MultiplexerBroker,
    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,
    draining_klog: bool,
    scope: Option<fasync::Scope>,
    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
}

impl LogsRepositoryState {
    fn new(
        parent: &fuchsia_inspect::Node,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        scope: fasync::Scope,
    ) -> Self {
        Self {
            inspect_node: parent.create_child("log_sources"),
            logs_data_store: HashMap::new(),
            logs_multiplexers: MultiplexerBroker::new(),
            interest_registrations: BTreeMap::new(),
            draining_klog: false,
            initial_interests: initial_interests
                .map(|ComponentInitialInterest { component, log_severity }| {
                    (component, log_severity)
                })
                .collect(),
            scope: Some(scope),
        }
    }

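    /// Returns the existing container for `identity`, or creates one: sets up its inspect
    /// stats, applies any configured initial interest and currently registered selectors,
    /// attaches a buffer from the shared buffer, and notifies live multiplexers.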
    pub fn get_log_container(
        &mut self,
        identity: Arc<ComponentIdentity>,
        shared_buffer: &Arc<SharedBuffer>,
        repo: &Arc<LogsRepository>,
    ) -> Arc<LogsArtifactsContainer> {
        match self.logs_data_store.get(&identity) {
            None => {
                let initial_interest = self.get_initial_interest(identity.as_ref());
                let weak_repo = Arc::downgrade(repo);
                let stats = LogStreamStats::default()
                    .with_inspect(&self.inspect_node, identity.moniker.to_string())
                    .expect("failed to attach component log stats");
                stats.set_url(&identity.url);
                let stats = Arc::new(stats);
                let container = Arc::new(LogsArtifactsContainer::new(
                    Arc::clone(&identity),
                    self.interest_registrations.values().flat_map(|s| s.iter()),
                    initial_interest,
                    Arc::clone(&stats),
                    shared_buffer.new_container_buffer(Arc::clone(&identity), stats),
                    Some(Box::new(move |c| {
                        if let Some(repo) = weak_repo.upgrade() {
                            repo.on_container_inactive(&c.identity)
                        }
                    })),
                ));
                self.logs_data_store.insert(identity, Arc::clone(&container));
                self.logs_multiplexers.send(&container);
                container
            }
            Some(existing) => Arc::clone(existing),
        }
    }

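    /// Resolves the initial interest for a component by taking the minimum severity among the
    /// exact URL match, the exact moniker match, and any partial matches against the URL or
    /// moniker string.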
    fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
        let exact_url_severity =
            self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
        let exact_moniker_severity =
            self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();

        let partial_severity = self
            .initial_interests
            .iter()
            .filter_map(|(uom, severity)| match uom {
                UrlOrMoniker::Partial(p) => {
                    if identity.url.contains(p.as_str())
                        || identity.moniker.to_string().contains(p.as_str())
                    {
                        Some(*severity)
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .min();

        [exact_url_severity, exact_moniker_severity, partial_severity]
            .into_iter()
            .flatten()
            .min()
            .map(FidlSeverity::from)
    }

    fn is_live(&self, identity: &ComponentIdentity) -> bool {
        match self.logs_data_store.get(identity) {
            Some(container) => container.is_active(),
            None => false,
        }
    }

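    /// If any of the given selectors match Archivist's own moniker, adjusts the process-wide
    /// `log` max level to the lowest requested severity, or back to `Info` when
    /// `clear_interest` is set.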
    fn maybe_update_own_logs_interest(
        &mut self,
        selectors: &[LogInterestSelector],
        clear_interest: bool,
    ) {
        let lowest_selector = selectors
            .iter()
            .filter(|selector| {
                ARCHIVIST_MONIKER.matches_component_selector(&selector.selector).unwrap_or(false)
            })
            .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
        if let Some(selector) = lowest_selector {
            if clear_interest {
                log::set_max_level(LevelFilter::Info);
            } else {
                log::set_max_level(
                    match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
                        FidlSeverity::Trace => LevelFilter::Trace,
                        FidlSeverity::Debug => LevelFilter::Debug,
                        FidlSeverity::Info => LevelFilter::Info,
                        FidlSeverity::Warn => LevelFilter::Warn,
                        FidlSeverity::Error => LevelFilter::Error,
                        FidlSeverity::Fatal => LevelFilter::Error,
                        FidlSeverity::__SourceBreaking { .. } => return,
                    },
                );
            }
        }
    }

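    /// Replaces the selectors registered under `connection_id` and pushes the change (new vs.
    /// previous selectors) to every log container, updating Archivist's own level as needed.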
    fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.maybe_update_own_logs_interest(&selectors, false);
        let previous_selectors =
            self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
        let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
        for logs_data in self.logs_data_store.values() {
            logs_data.update_interest(new_selectors.iter(), &previous_selectors);
        }
    }

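    /// Drops the selectors registered under `connection_id`, resetting the interest of every
    /// log container and clearing Archivist's own interest if it was affected.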
    pub fn finish_interest_connection(&mut self, connection_id: usize) {
        let selectors = self.interest_registrations.remove(&connection_id);
        if let Some(selectors) = selectors {
            self.maybe_update_own_logs_interest(&selectors, true);
            for logs_data in self.logs_data_store.values() {
                logs_data.reset_interest(&selectors);
            }
        }
    }

    pub fn remove(&mut self, identity: &ComponentIdentity) {
        self.logs_data_store.remove(identity);
    }
}

type LiveIteratorsMap = HashMap<usize, (StreamMode, Box<dyn MultiplexerHandleAction + Send>)>;

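/// Tracks the live log-cursor multiplexers so that newly created containers can be attached to
/// them. A background task removes a multiplexer's entry once its on-drop id arrives on the
/// cleanup channel.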
pub struct MultiplexerBroker {
    live_iterators: Arc<Mutex<LiveIteratorsMap>>,
    cleanup_sender: mpsc::UnboundedSender<usize>,
    _live_iterators_cleanup_task: fasync::Task<()>,
}

impl MultiplexerBroker {
    fn new() -> Self {
        let (cleanup_sender, mut receiver) = mpsc::unbounded();
        let live_iterators = Arc::new(Mutex::new(HashMap::new()));
        let live_iterators_clone = Arc::clone(&live_iterators);
        Self {
            live_iterators,
            cleanup_sender,
            _live_iterators_cleanup_task: fasync::Task::spawn(async move {
                while let Some(id) = receiver.next().await {
                    live_iterators_clone.lock().remove(&id);
                }
            }),
        }
    }

    fn cleanup_sender(&self) -> mpsc::UnboundedSender<usize> {
        self.cleanup_sender.clone()
    }

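    /// Registers a multiplexer. Subscribing multiplexers are kept so future containers can be
    /// sent to them; snapshot-only multiplexers are closed right away since they will never
    /// receive new containers.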
    fn add(&mut self, mode: StreamMode, recipient: Box<dyn MultiplexerHandleAction + Send>) {
        match mode {
            StreamMode::Snapshot => recipient.close(),
            StreamMode::SnapshotThenSubscribe | StreamMode::Subscribe => {
                self.live_iterators.lock().insert(recipient.multiplexer_id(), (mode, recipient));
            }
        }
    }

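    /// Hands a newly created container to every live multiplexer, dropping any multiplexer
    /// that reports it can no longer accept cursors.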
    pub fn send(&mut self, container: &Arc<LogsArtifactsContainer>) {
        self.live_iterators
            .lock()
            .retain(|_, (mode, recipient)| recipient.send_cursor_from(*mode, container));
    }

    fn terminate(&mut self) {
        for (_, (_, recipient)) in self.live_iterators.lock().drain() {
            recipient.close();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::testing::make_message;
    use fidl_fuchsia_logger::LogSinkMarker;
    use moniker::ExtendedMoniker;
    use selectors::FastError;
    use std::time::Duration;

    #[fuchsia::test]
    async fn data_repo_filters_logs_by_selectors() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bar").unwrap(),
            "fuchsia-pkg://bar",
        )));

        foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
        bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
        foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));

        let stream = repo.logs_cursor(StreamMode::Snapshot, None, ftrace::Id::random());

        let results =
            stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);

        let filtered_stream = repo.logs_cursor(
            StreamMode::Snapshot,
            Some(vec![selectors::parse_selector::<FastError>("foo:root").unwrap()]),
            ftrace::Id::random(),
        );

        let results =
            filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
    }

    #[fuchsia::test]
    async fn multiplexer_broker_cleanup() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let stream =
            repo.logs_cursor(StreamMode::SnapshotThenSubscribe, None, ftrace::Id::random());

        assert_eq!(repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().len(), 1);

        drop(stream);
        loop {
            fasync::Timer::new(Duration::from_millis(100)).await;
            if repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().is_empty() {
                break;
            }
        }
    }

    #[fuchsia::test]
    async fn data_repo_correctly_sets_initial_interests() {
        let repo = LogsRepository::new(
            100000,
            [
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
                    log_severity: Severity::Info,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
                    log_severity: Severity::Warn,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()),
                    log_severity: Severity::Error,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()),
                    log_severity: Severity::Debug,
                },
            ]
            .into_iter(),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;

        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/baz").unwrap(),
            "fuchsia-pkg://baz",
        )));
        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
            .await;

        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bar").unwrap(),
            "fuchsia-pkg://bar",
        )));
        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
            .await;

        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/quux").unwrap(),
            "fuchsia-pkg://quux",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
    }

    #[fuchsia::test]
    async fn data_repo_correctly_handles_partial_matching() {
        let repo = LogsRepository::new(
            100000,
            [
                "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO".parse(),
                "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN".parse(),
                "/core/bust:DEBUG".parse(),
                "core/bar:ERROR".parse(),
                "foo:DEBUG".parse(),
                "both:TRACE".parse(),
            ]
            .into_iter()
            .map(Result::unwrap),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/foo").unwrap(),
            "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;

        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/not-foo").unwrap(),
            "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;

        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/baz").unwrap(),
            "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
            .await;

        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bar").unwrap(),
            "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
            .await;

        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/quux").unwrap(),
            "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;

        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/both").unwrap(),
            "fuchsia-pkg://fuchsia.com/both#meta/both.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Trace), container, repo.scope_handle.clone())
            .await;

        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bust/testing").unwrap(),
            "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
    }

    async fn expect_initial_interest(
        expected_severity: Option<FidlSeverity>,
        container: Arc<LogsArtifactsContainer>,
        scope: fasync::ScopeHandle,
    ) {
        let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(stream, scope);
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, expected_severity);
    }
}