// archivist_lib/logs/repository.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::{CursorItem, LogsArtifactsContainer};
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::multiplex::{Multiplexer, MultiplexerHandleAction};
11use crate::logs::shared_buffer::SharedBuffer;
12use crate::logs::stats::LogStreamStats;
13use anyhow::format_err;
14use diagnostics_data::{LogsData, Severity};
15use fidl_fuchsia_diagnostics::{LogInterestSelector, Selector, StreamMode};
16use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
17use flyweights::FlyStr;
18use fuchsia_inspect_derive::WithInspect;
19use fuchsia_sync::Mutex;
20use fuchsia_url::boot_url::BootUrl;
21use fuchsia_url::AbsoluteComponentUrl;
22use futures::channel::mpsc;
23use futures::prelude::*;
24use log::{debug, error, LevelFilter};
25use moniker::{ExtendedMoniker, Moniker};
26use selectors::SelectorExt;
27use std::collections::{BTreeMap, HashMap};
28use std::str::FromStr;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::{Arc, LazyLock, Weak};
31use {fuchsia_async as fasync, fuchsia_inspect as inspect, fuchsia_trace as ftrace};
32
// LINT.IfChange
/// A configured minimum log severity for one component. See the [`FromStr`] impl
/// for the accepted `<url-or-moniker>:<severity>` string form.
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub struct ComponentInitialInterest {
    /// The URL or moniker for the component which should receive the initial interest.
    component: UrlOrMoniker,
    /// The log severity the initial interest should specify.
    log_severity: Severity,
}
// LINT.ThenChange(/src/lib/assembly/config_schema/src/platform_config/diagnostics_config.rs)
42
43impl FromStr for ComponentInitialInterest {
44    type Err = anyhow::Error;
45    fn from_str(s: &str) -> Result<Self, Self::Err> {
46        let mut split = s.rsplitn(2, ":");
47        match (split.next(), split.next()) {
48            (Some(severity), Some(url_or_moniker)) => {
49                let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
50                    return Err(format_err!("invalid url or moniker"));
51                };
52                let Ok(severity) = Severity::from_str(severity) else {
53                    return Err(format_err!("invalid severity"));
54                };
55                Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
56            }
57            _ => Err(format_err!("invalid interest")),
58        }
59    }
60}
61
/// Identifies a component either by its URL or by its absolute moniker.
#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub enum UrlOrMoniker {
    /// An absolute fuchsia url to a component.
    Url(FlyStr),
    /// The absolute moniker for a component.
    Moniker(ExtendedMoniker),
}
69
70impl FromStr for UrlOrMoniker {
71    type Err = ();
72    fn from_str(s: &str) -> Result<Self, Self::Err> {
73        if AbsoluteComponentUrl::from_str(s).is_ok() || BootUrl::parse(s).is_ok() {
74            Ok(UrlOrMoniker::Url(s.into()))
75        } else if let Ok(moniker) = Moniker::from_str(s) {
76            Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
77        } else {
78            Err(())
79        }
80    }
81}
82
/// Static ID, used for persistent changes to interest settings.
pub const STATIC_CONNECTION_ID: usize = 0;
// Monotonic source of IDs for dynamic interest connections; starts just past the static ID.
static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);
// Archivist's own moniker, used to recognize interest selectors that target archivist itself.
static ARCHIVIST_MONIKER: LazyLock<Moniker> =
    LazyLock::new(|| Moniker::parse_str("bootstrap/archivist").unwrap());
88
/// LogsRepository holds all diagnostics data and is a singleton wrapped by multiple
/// [`pipeline::Pipeline`]s in a given Archivist instance.
pub struct LogsRepository {
    // All mutable repository state, guarded by a single mutex.
    mutable_state: Mutex<LogsRepositoryState>,
    // Log buffer shared by all per-component containers.
    shared_buffer: Arc<SharedBuffer>,
    // Handle to the scope hosting log ingestion tasks; used to spawn LogSink handlers
    // and closed to stop accepting new ones.
    scope_handle: fasync::ScopeHandle,
}
96
impl LogsRepository {
    /// Constructs a new repository.
    ///
    /// `logs_max_cached_original_bytes` caps the shared log buffer, `initial_interests`
    /// seeds per-component minimum severities, `parent` is the inspect node under which
    /// per-component log stats are published, and `scope` hosts log ingestion tasks.
    pub fn new(
        logs_max_cached_original_bytes: u64,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        parent: &fuchsia_inspect::Node,
        scope: fasync::Scope,
    ) -> Arc<Self> {
        let scope_handle = scope.to_handle();
        // Arc::new_cyclic lets the shared buffer's inactivity callback keep a Weak
        // back-reference to the repository without forming a reference cycle.
        Arc::new_cyclic(|me: &Weak<LogsRepository>| {
            let me = Weak::clone(me);
            LogsRepository {
                scope_handle,
                mutable_state: Mutex::new(LogsRepositoryState::new(
                    parent,
                    initial_interests,
                    scope,
                )),
                shared_buffer: SharedBuffer::new(
                    logs_max_cached_original_bytes as usize,
                    Box::new(move |identity| {
                        if let Some(this) = me.upgrade() {
                            this.on_container_inactive(&identity);
                        }
                    }),
                ),
            }
        })
    }

    /// Spawns a task that drains the kernel's debug log into the kernel log container:
    /// existing messages are ingested in timestamp order first, then the task listens
    /// for new ones. Only the first call has any effect; later calls return immediately.
    pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
    where
        K: DebugLog + Send + Sync + 'static,
    {
        let mut mutable_state = self.mutable_state.lock();

        // We can only have one klog reader, if this is already set, it means we are already
        // draining klog.
        if mutable_state.draining_klog {
            return;
        }
        mutable_state.draining_klog = true;

        let container =
            mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
        // NOTE(review): if the scope was already taken by `wait_for_termination` we bail out
        // here, but `draining_klog` stays set so draining can never start later — confirm
        // that ordering cannot occur in practice.
        let Some(ref scope) = mutable_state.scope else {
            return;
        };
        scope.spawn(async move {
            debug!("Draining debuglog.");
            let mut kernel_logger = DebugLogBridge::create(klog_reader);
            let mut messages = match kernel_logger.existing_logs() {
                Ok(messages) => messages,
                Err(e) => {
                    error!(e:%; "failed to read from kernel log, important logs may be missing");
                    return;
                }
            };
            // Ingest the backlog in timestamp order.
            messages.sort_by_key(|m| m.timestamp());
            for message in messages {
                container.ingest_message(message);
            }

            // Then follow the debuglog until the reader fails or closes.
            let res = kernel_logger
                .listen()
                .try_for_each(|message| async {
                    container.ingest_message(message);
                    Ok(())
                })
                .await;
            if let Err(e) = res {
                error!(e:%; "failed to drain kernel log, important logs may be missing");
            }
        });
    }

    /// Returns a merged stream of raw cursor items over all current (and, depending on
    /// `mode`, future) log containers, optionally filtered by `selectors`.
    pub fn logs_cursor_raw(
        &self,
        mode: StreamMode,
        selectors: Option<Vec<Selector>>,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = CursorItem> + Send {
        let mut repo = self.mutable_state.lock();
        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
            let cursor = c.cursor_raw(mode);
            (Arc::clone(identity), cursor)
        });
        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
        // Register the multiplexer so it learns about containers created later, and give it
        // a channel through which it deregisters itself on drop.
        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
        merged
    }

    /// Like [`LogsRepository::logs_cursor_raw`], but yields parsed [`LogsData`].
    pub fn logs_cursor(
        &self,
        mode: StreamMode,
        selectors: Option<Vec<Selector>>,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = Arc<LogsData>> + Send + 'static {
        let mut repo = self.mutable_state.lock();
        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
            let cursor = c.cursor(mode, parent_trace_id);
            (Arc::clone(identity), cursor)
        });
        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
        merged
    }

    /// Returns the log container for `identity`, creating one if it doesn't exist yet.
    pub fn get_log_container(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
    ) -> Arc<LogsArtifactsContainer> {
        self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
    }

    /// Waits until `stop_accepting_new_log_sinks` is called and all log sink tasks have completed.
    /// After that, any pending Cursors will return Poll::Ready(None).
    pub async fn wait_for_termination(&self) {
        // Taking the scope out makes this a one-shot operation.
        let Some(scope) = self.mutable_state.lock().scope.take() else {
            error!("Attempted to terminate twice");
            return;
        };
        scope.join().await;
        // Process messages from log sink.
        debug!("Log ingestion stopped.");
        // Terminate the shared buffer first so that pending messages are processed before we
        // terminate all the containers.
        self.shared_buffer.terminate().await;
        let mut repo = self.mutable_state.lock();
        for container in repo.logs_data_store.values() {
            container.terminate();
        }
        repo.logs_multiplexers.terminate();
    }

    /// Closes the connection in which new logger draining tasks are sent. No more logger tasks
    /// will be accepted when this is called and we'll proceed to terminate logs.
    pub fn stop_accepting_new_log_sinks(&self) {
        self.scope_handle.close();
    }

    /// Returns an id to use for a new interest connection. Used by both LogSettings and Log, to
    /// ensure shared uniqueness of their connections.
    pub fn new_interest_connection(&self) -> usize {
        INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
    }

    /// Updates log selectors associated with an interest connection.
    pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.mutable_state.lock().update_logs_interest(connection_id, selectors);
    }

    /// Indicates that the connection associated with the given ID is now done.
    pub fn finish_interest_connection(&self, connection_id: usize) {
        self.mutable_state.lock().finish_interest_connection(connection_id);
    }

    /// Drops the container for `identity` unless it became active again in the meantime.
    fn on_container_inactive(&self, identity: &ComponentIdentity) {
        let mut repo = self.mutable_state.lock();
        if !repo.is_live(identity) {
            repo.remove(identity);
        }
    }
}
264
#[cfg(test)]
impl LogsRepository {
    /// Builds a repository with test defaults: the legacy default buffer size, no
    /// initial interests, and a detached inspect node.
    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
        LogsRepository::new(
            crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES,
            std::iter::empty(),
            &Default::default(),
            scope,
        )
    }
}
276
277impl EventConsumer for LogsRepository {
278    fn handle(self: Arc<Self>, event: Event) {
279        match event.payload {
280            EventPayload::LogSinkRequested(LogSinkRequestedPayload {
281                component,
282                request_stream,
283            }) => {
284                debug!(identity:% = component; "LogSink requested.");
285                let container = self.get_log_container(component);
286                container.handle_log_sink(request_stream, self.scope_handle.clone());
287            }
288            _ => unreachable!("Archivist state just subscribes to log sink requested"),
289        }
290    }
291}
292
/// Mutable state of a [`LogsRepository`], always accessed under its mutex.
pub struct LogsRepositoryState {
    /// Per-component log containers, keyed by component identity.
    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
    /// The "log_sources" inspect node under which per-component log stats are attached.
    inspect_node: inspect::Node,

    /// BatchIterators for logs need to be made aware of new components starting and their logs.
    logs_multiplexers: MultiplexerBroker,

    /// Interest registrations that we have received through fuchsia.logger.Log/ListWithSelectors
    /// or through fuchsia.logger.LogSettings/SetInterest.
    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,

    /// Whether or not we are draining the kernel log.
    draining_klog: bool,

    /// Scope where log ingestion tasks are running. Taken (set to `None`) on termination.
    scope: Option<fasync::Scope>,

    /// The initial log interests with which archivist was configured.
    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
}
313
impl LogsRepositoryState {
    /// Builds the initial state: empty container map, a fresh multiplexer broker, and
    /// the configured initial interests indexed by component for fast lookup.
    fn new(
        parent: &fuchsia_inspect::Node,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        scope: fasync::Scope,
    ) -> Self {
        Self {
            inspect_node: parent.create_child("log_sources"),
            logs_data_store: HashMap::new(),
            logs_multiplexers: MultiplexerBroker::new(),
            interest_registrations: BTreeMap::new(),
            draining_klog: false,
            initial_interests: initial_interests
                .map(|ComponentInitialInterest { component, log_severity }| {
                    (component, log_severity)
                })
                .collect(),
            scope: Some(scope),
        }
    }

    /// Returns a container for logs artifacts, constructing one and adding it to the map if
    /// necessary.
    pub fn get_log_container(
        &mut self,
        identity: Arc<ComponentIdentity>,
        shared_buffer: &Arc<SharedBuffer>,
        repo: &Arc<LogsRepository>,
    ) -> Arc<LogsArtifactsContainer> {
        match self.logs_data_store.get(&identity) {
            None => {
                let initial_interest = self.get_initial_interest(identity.as_ref());
                // A Weak back-reference avoids a cycle: repo -> container -> callback -> repo.
                let weak_repo = Arc::downgrade(repo);
                let stats = LogStreamStats::default()
                    .with_inspect(&self.inspect_node, identity.moniker.to_string())
                    .expect("failed to attach component log stats");
                stats.set_url(&identity.url);
                let stats = Arc::new(stats);
                let container = Arc::new(LogsArtifactsContainer::new(
                    Arc::clone(&identity),
                    // Seed the container with every currently registered interest selector.
                    self.interest_registrations.values().flat_map(|s| s.iter()),
                    initial_interest,
                    Arc::clone(&stats),
                    shared_buffer.new_container_buffer(Arc::clone(&identity), stats),
                    Some(Box::new(move |c| {
                        if let Some(repo) = weak_repo.upgrade() {
                            repo.on_container_inactive(&c.identity)
                        }
                    })),
                ));
                self.logs_data_store.insert(identity, Arc::clone(&container));
                // Let live subscribers start streaming from the new container.
                self.logs_multiplexers.send(&container);
                container
            }
            Some(existing) => Arc::clone(existing),
        }
    }

    /// Looks up the configured initial interest for `identity` by URL and by moniker.
    /// When both are configured, the lower (more verbose) severity wins.
    fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
        match (
            self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())),
            self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())),
        ) {
            (None, None) => None,
            (Some(severity), None) | (None, Some(severity)) => Some(FidlSeverity::from(*severity)),
            (Some(s1), Some(s2)) => Some(FidlSeverity::from(std::cmp::min(*s1, *s2))),
        }
    }

    /// Returns whether a container for `identity` exists and is still active.
    fn is_live(&self, identity: &ComponentIdentity) -> bool {
        match self.logs_data_store.get(identity) {
            Some(container) => container.is_active(),
            None => false,
        }
    }

    /// Adjusts archivist's own global log level when one of `selectors` targets
    /// archivist's moniker ("bootstrap/archivist"). When several match, the lowest
    /// requested severity wins. With `clear_interest`, the level resets to Info.
    fn maybe_update_own_logs_interest(
        &mut self,
        selectors: &[LogInterestSelector],
        clear_interest: bool,
    ) {
        let lowest_selector = selectors
            .iter()
            .filter(|selector| {
                ARCHIVIST_MONIKER.matches_component_selector(&selector.selector).unwrap_or(false)
            })
            .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
        if let Some(selector) = lowest_selector {
            if clear_interest {
                log::set_max_level(LevelFilter::Info);
            } else {
                log::set_max_level(
                    match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
                        FidlSeverity::Trace => LevelFilter::Trace,
                        FidlSeverity::Debug => LevelFilter::Debug,
                        FidlSeverity::Info => LevelFilter::Info,
                        FidlSeverity::Warn => LevelFilter::Warn,
                        FidlSeverity::Error => LevelFilter::Error,
                        // Log has no "Fatal" level, so set it to Error
                        // instead.
                        FidlSeverity::Fatal => LevelFilter::Error,
                        FidlSeverity::__SourceBreaking { .. } => return,
                    },
                );
            }
        }
    }

    /// Replaces the selectors registered under `connection_id` and propagates both the
    /// new and the previous selectors to every log container.
    fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.maybe_update_own_logs_interest(&selectors, false);
        let previous_selectors =
            self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
        // unwrap safe, we just inserted.
        let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
        for logs_data in self.logs_data_store.values() {
            logs_data.update_interest(new_selectors.iter(), &previous_selectors);
        }
    }

    /// Drops the selectors registered under `connection_id` and resets the interest they
    /// had applied, both to archivist itself and to every log container.
    pub fn finish_interest_connection(&mut self, connection_id: usize) {
        let selectors = self.interest_registrations.remove(&connection_id);
        if let Some(selectors) = selectors {
            self.maybe_update_own_logs_interest(&selectors, true);
            for logs_data in self.logs_data_store.values() {
                logs_data.reset_interest(&selectors);
            }
        }
    }

    /// Removes the container for `identity` from the repository.
    pub fn remove(&mut self, identity: &ComponentIdentity) {
        self.logs_data_store.remove(identity);
    }
}
449
/// Map from multiplexer id to that multiplexer's stream mode and handle.
type LiveIteratorsMap = HashMap<usize, (StreamMode, Box<dyn MultiplexerHandleAction + Send>)>;

/// Ensures that BatchIterators get access to logs from newly started components.
pub struct MultiplexerBroker {
    /// Currently live subscribing multiplexers; shared with the cleanup task.
    live_iterators: Arc<Mutex<LiveIteratorsMap>>,
    /// Sender handed to multiplexers so they can report their own id on drop.
    cleanup_sender: mpsc::UnboundedSender<usize>,
    /// Task that removes dropped multiplexers from `live_iterators`.
    _live_iterators_cleanup_task: fasync::Task<()>,
}
458
459impl MultiplexerBroker {
460    fn new() -> Self {
461        let (cleanup_sender, mut receiver) = mpsc::unbounded();
462        let live_iterators = Arc::new(Mutex::new(HashMap::new()));
463        let live_iterators_clone = Arc::clone(&live_iterators);
464        Self {
465            live_iterators,
466            cleanup_sender,
467            _live_iterators_cleanup_task: fasync::Task::spawn(async move {
468                while let Some(id) = receiver.next().await {
469                    live_iterators_clone.lock().remove(&id);
470                }
471            }),
472        }
473    }
474
475    fn cleanup_sender(&self) -> mpsc::UnboundedSender<usize> {
476        self.cleanup_sender.clone()
477    }
478
479    /// A new BatchIterator has been created and must be notified when future log containers are
480    /// created.
481    fn add(&mut self, mode: StreamMode, recipient: Box<dyn MultiplexerHandleAction + Send>) {
482        match mode {
483            // snapshot streams only want to know about what's currently available
484            StreamMode::Snapshot => recipient.close(),
485            StreamMode::SnapshotThenSubscribe | StreamMode::Subscribe => {
486                self.live_iterators.lock().insert(recipient.multiplexer_id(), (mode, recipient));
487            }
488        }
489    }
490
491    /// Notify existing BatchIterators of a new logs container so they can include its messages
492    /// in their results.
493    pub fn send(&mut self, container: &Arc<LogsArtifactsContainer>) {
494        self.live_iterators
495            .lock()
496            .retain(|_, (mode, recipient)| recipient.send_cursor_from(*mode, container));
497    }
498
499    /// Notify all multiplexers to terminate their streams once sub streams have terminated.
500    fn terminate(&mut self) {
501        for (_, (_, recipient)) in self.live_iterators.lock().drain() {
502            recipient.close();
503        }
504    }
505}
506
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::testing::make_message;
    use fidl_fuchsia_logger::LogSinkMarker;

    use moniker::ExtendedMoniker;
    use selectors::FastError;
    use std::time::Duration;

    #[fuchsia::test]
    async fn data_repo_filters_logs_by_selectors() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bar").unwrap(),
            "fuchsia-pkg://bar",
        )));

        foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
        bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
        foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));

        // An unfiltered snapshot yields all three messages in timestamp order.
        let stream = repo.logs_cursor(StreamMode::Snapshot, None, ftrace::Id::random());

        let results =
            stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);

        // Selecting only "foo" must drop bar's message.
        let filtered_stream = repo.logs_cursor(
            StreamMode::Snapshot,
            Some(vec![selectors::parse_selector::<FastError>("foo:root").unwrap()]),
            ftrace::Id::random(),
        );

        let results =
            filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
    }

    #[fuchsia::test]
    async fn multiplexer_broker_cleanup() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let stream =
            repo.logs_cursor(StreamMode::SnapshotThenSubscribe, None, ftrace::Id::random());

        // A subscribing cursor registers itself with the broker.
        assert_eq!(repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().len(), 1);

        // When the multiplexer goes away it must be forgotten by the broker.
        drop(stream);
        // Cleanup happens on a background task, so poll until it takes effect.
        loop {
            fasync::Timer::new(Duration::from_millis(100)).await;
            if repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().is_empty() {
                break;
            }
        }
    }

    #[fuchsia::test]
    async fn data_repo_correctly_sets_initial_interests() {
        let repo = LogsRepository::new(
            100000,
            [
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
                    log_severity: Severity::Info,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
                    log_severity: Severity::Warn,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("core/bar".try_into().unwrap()),
                    log_severity: Severity::Error,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("core/foo".try_into().unwrap()),
                    log_severity: Severity::Debug,
                },
            ]
            .into_iter(),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        // We have the moniker configured, use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;

        // We have the URL configured, use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/baz").unwrap(),
            "fuchsia-pkg://baz",
        )));
        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
            .await;

        // We have both a URL and a moniker in the config. Pick the minimum one, in this case Info
        // for the URL over Error for the moniker.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bar").unwrap(),
            "fuchsia-pkg://bar",
        )));
        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
            .await;

        // Neither the moniker nor the URL have an associated severity, therefore, the minimum
        // severity isn't set.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/quux").unwrap(),
            "fuchsia-pkg://quux",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
    }

    // Connects a LogSink to `container` and asserts the first interest update carries
    // `expected_severity`.
    async fn expect_initial_interest(
        expected_severity: Option<FidlSeverity>,
        container: Arc<LogsArtifactsContainer>,
        scope: fasync::ScopeHandle,
    ) {
        let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(stream, scope);
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, expected_severity);
    }
}