archivist_lib/logs/
repository.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::events::router::EventConsumer;
6use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
7use crate::identity::ComponentIdentity;
8use crate::logs::container::{CursorItem, LogsArtifactsContainer};
9use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
10use crate::logs::multiplex::{Multiplexer, MultiplexerHandleAction};
11use crate::logs::shared_buffer::SharedBuffer;
12use crate::logs::stats::LogStreamStats;
13use anyhow::format_err;
14use diagnostics_data::{LogsData, Severity};
15use fidl_fuchsia_diagnostics::{LogInterestSelector, Selector, StreamMode};
16use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
17use flyweights::FlyStr;
18use fuchsia_inspect_derive::WithInspect;
19use fuchsia_sync::Mutex;
20use fuchsia_url::boot_url::BootUrl;
21use fuchsia_url::AbsoluteComponentUrl;
22use futures::channel::mpsc;
23use futures::prelude::*;
24use log::{debug, error, LevelFilter};
25use moniker::{ExtendedMoniker, Moniker};
26use selectors::SelectorExt;
27use std::collections::{BTreeMap, HashMap};
28use std::str::FromStr;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::{Arc, LazyLock, Weak};
31use {fuchsia_async as fasync, fuchsia_inspect as inspect, fuchsia_trace as ftrace};
32
33// LINT.IfChange
34#[derive(Ord, PartialOrd, Eq, PartialEq)]
35pub struct ComponentInitialInterest {
36    /// The URL or moniker for the component which should receive the initial interest.
37    component: UrlOrMoniker,
38    /// The log severity the initial interest should specify.
39    log_severity: Severity,
40}
41// LINT.ThenChange(/src/lib/assembly/config_schema/src/platform_config/diagnostics_config.rs)
42
43impl FromStr for ComponentInitialInterest {
44    type Err = anyhow::Error;
45    fn from_str(s: &str) -> Result<Self, Self::Err> {
46        let mut split = s.rsplitn(2, ":");
47        match (split.next(), split.next()) {
48            (Some(severity), Some(url_or_moniker)) => {
49                let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
50                    return Err(format_err!("invalid url or moniker"));
51                };
52                let Ok(severity) = Severity::from_str(severity) else {
53                    return Err(format_err!("invalid severity"));
54                };
55                Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
56            }
57            _ => Err(format_err!("invalid interest")),
58        }
59    }
60}
61
62#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
63pub enum UrlOrMoniker {
64    /// An absolute fuchsia url to a component.
65    Url(FlyStr),
66    /// The absolute moniker for a component.
67    Moniker(ExtendedMoniker),
68    /// A partial string to match against url or moniker.
69    Partial(FlyStr),
70}
71
72impl FromStr for UrlOrMoniker {
73    type Err = ();
74    fn from_str(s: &str) -> Result<Self, Self::Err> {
75        if AbsoluteComponentUrl::from_str(s).is_ok() || BootUrl::parse(s).is_ok() {
76            Ok(UrlOrMoniker::Url(s.into()))
77        } else if s.starts_with("/") {
78            if let Ok(moniker) = Moniker::from_str(s) {
79                Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
80            } else {
81                Err(())
82            }
83        } else {
84            Ok(UrlOrMoniker::Partial(s.into()))
85        }
86    }
87}
88
89/// Static ID, used for persistent changes to interest settings.
90pub const STATIC_CONNECTION_ID: usize = 0;
91static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);
92static ARCHIVIST_MONIKER: LazyLock<Moniker> =
93    LazyLock::new(|| Moniker::parse_str("bootstrap/archivist").unwrap());
94
95/// LogsRepository holds all diagnostics data and is a singleton wrapped by multiple
96/// [`pipeline::Pipeline`]s in a given Archivist instance.
97pub struct LogsRepository {
98    mutable_state: Mutex<LogsRepositoryState>,
99    shared_buffer: Arc<SharedBuffer>,
100    scope_handle: fasync::ScopeHandle,
101}
102
103impl LogsRepository {
104    pub fn new(
105        logs_max_cached_original_bytes: u64,
106        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
107        parent: &fuchsia_inspect::Node,
108        scope: fasync::Scope,
109    ) -> Arc<Self> {
110        let scope_handle = scope.to_handle();
111        Arc::new_cyclic(|me: &Weak<LogsRepository>| {
112            let me = Weak::clone(me);
113            LogsRepository {
114                scope_handle,
115                mutable_state: Mutex::new(LogsRepositoryState::new(
116                    parent,
117                    initial_interests,
118                    scope,
119                )),
120                shared_buffer: SharedBuffer::new(
121                    logs_max_cached_original_bytes as usize,
122                    Box::new(move |identity| {
123                        if let Some(this) = me.upgrade() {
124                            this.on_container_inactive(&identity);
125                        }
126                    }),
127                ),
128            }
129        })
130    }
131
132    /// Drain the kernel's debug log. The returned future completes once
133    /// existing messages have been ingested.
134    pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
135    where
136        K: DebugLog + Send + Sync + 'static,
137    {
138        let mut mutable_state = self.mutable_state.lock();
139
140        // We can only have one klog reader, if this is already set, it means we are already
141        // draining klog.
142        if mutable_state.draining_klog {
143            return;
144        }
145        mutable_state.draining_klog = true;
146
147        let container =
148            mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
149        let Some(ref scope) = mutable_state.scope else {
150            return;
151        };
152        scope.spawn(async move {
153            debug!("Draining debuglog.");
154            let mut kernel_logger = DebugLogBridge::create(klog_reader);
155            let mut messages = match kernel_logger.existing_logs() {
156                Ok(messages) => messages,
157                Err(e) => {
158                    error!(e:%; "failed to read from kernel log, important logs may be missing");
159                    return;
160                }
161            };
162            messages.sort_by_key(|m| m.timestamp());
163            for message in messages {
164                container.ingest_message(message);
165            }
166
167            let res = kernel_logger
168                .listen()
169                .try_for_each(|message| async {
170                    container.ingest_message(message);
171                    Ok(())
172                })
173                .await;
174            if let Err(e) = res {
175                error!(e:%; "failed to drain kernel log, important logs may be missing");
176            }
177        });
178    }
179
180    pub fn logs_cursor_raw(
181        &self,
182        mode: StreamMode,
183        selectors: Option<Vec<Selector>>,
184        parent_trace_id: ftrace::Id,
185    ) -> impl Stream<Item = CursorItem> + Send {
186        let mut repo = self.mutable_state.lock();
187        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
188            let cursor = c.cursor_raw(mode);
189            (Arc::clone(identity), cursor)
190        });
191        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
192        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
193        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
194        merged
195    }
196
197    pub fn logs_cursor(
198        &self,
199        mode: StreamMode,
200        selectors: Option<Vec<Selector>>,
201        parent_trace_id: ftrace::Id,
202    ) -> impl Stream<Item = Arc<LogsData>> + Send + 'static {
203        let mut repo = self.mutable_state.lock();
204        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
205            let cursor = c.cursor(mode, parent_trace_id);
206            (Arc::clone(identity), cursor)
207        });
208        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
209        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
210        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
211        merged
212    }
213
214    pub fn get_log_container(
215        self: &Arc<Self>,
216        identity: Arc<ComponentIdentity>,
217    ) -> Arc<LogsArtifactsContainer> {
218        self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
219    }
220
221    /// Waits until `stop_accepting_new_log_sinks` is called and all log sink tasks have completed.
222    /// After that, any pending Cursors will return Poll::Ready(None).
223    pub async fn wait_for_termination(&self) {
224        let Some(scope) = self.mutable_state.lock().scope.take() else {
225            error!("Attempted to terminate twice");
226            return;
227        };
228        scope.join().await;
229        // Process messages from log sink.
230        debug!("Log ingestion stopped.");
231        // Terminate the shared buffer first so that pending messages are processed before we
232        // terminate all the containers.
233        self.shared_buffer.terminate().await;
234        let mut repo = self.mutable_state.lock();
235        for container in repo.logs_data_store.values() {
236            container.terminate();
237        }
238        repo.logs_multiplexers.terminate();
239    }
240
241    /// Closes the connection in which new logger draining tasks are sent. No more logger tasks
242    /// will be accepted when this is called and we'll proceed to terminate logs.
243    pub fn stop_accepting_new_log_sinks(&self) {
244        self.scope_handle.close();
245    }
246
247    /// Returns an id to use for a new interest connection. Used by both LogSettings and Log, to
248    /// ensure shared uniqueness of their connections.
249    pub fn new_interest_connection(&self) -> usize {
250        INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
251    }
252
253    /// Updates log selectors associated with an interest connection.
254    pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
255        self.mutable_state.lock().update_logs_interest(connection_id, selectors);
256    }
257
258    /// Indicates that the connection associated with the given ID is now done.
259    pub fn finish_interest_connection(&self, connection_id: usize) {
260        self.mutable_state.lock().finish_interest_connection(connection_id);
261    }
262
263    fn on_container_inactive(&self, identity: &ComponentIdentity) {
264        let mut repo = self.mutable_state.lock();
265        if !repo.is_live(identity) {
266            repo.remove(identity);
267        }
268    }
269}
270
#[cfg(test)]
impl LogsRepository {
    /// Test-only constructor: legacy default cache size, no initial interests and a default
    /// inspect node as the parent.
    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
        let parent = inspect::Node::default();
        let no_interests = std::iter::empty();
        Self::new(
            crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES,
            no_interests,
            &parent,
            scope,
        )
    }
}
282
283impl EventConsumer for LogsRepository {
284    fn handle(self: Arc<Self>, event: Event) {
285        match event.payload {
286            EventPayload::LogSinkRequested(LogSinkRequestedPayload {
287                component,
288                request_stream,
289            }) => {
290                debug!(identity:% = component; "LogSink requested.");
291                let container = self.get_log_container(component);
292                container.handle_log_sink(request_stream, self.scope_handle.clone());
293            }
294            _ => unreachable!("Archivist state just subscribes to log sink requested"),
295        }
296    }
297}
298
299pub struct LogsRepositoryState {
300    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
301    inspect_node: inspect::Node,
302
303    /// BatchIterators for logs need to be made aware of new components starting and their logs.
304    logs_multiplexers: MultiplexerBroker,
305
306    /// Interest registrations that we have received through fuchsia.logger.Log/ListWithSelectors
307    /// or through fuchsia.logger.LogSettings/SetInterest.
308    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,
309
310    /// Whether or not we are draining the kernel log.
311    draining_klog: bool,
312
313    /// Scope where log ingestion tasks are running.
314    scope: Option<fasync::Scope>,
315
316    /// The initial log interests with which archivist was configured.
317    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
318}
319
320impl LogsRepositoryState {
321    fn new(
322        parent: &fuchsia_inspect::Node,
323        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
324        scope: fasync::Scope,
325    ) -> Self {
326        Self {
327            inspect_node: parent.create_child("log_sources"),
328            logs_data_store: HashMap::new(),
329            logs_multiplexers: MultiplexerBroker::new(),
330            interest_registrations: BTreeMap::new(),
331            draining_klog: false,
332            initial_interests: initial_interests
333                .map(|ComponentInitialInterest { component, log_severity }| {
334                    (component, log_severity)
335                })
336                .collect(),
337            scope: Some(scope),
338        }
339    }
340
341    /// Returns a container for logs artifacts, constructing one and adding it to the trie if
342    /// necessary.
343    pub fn get_log_container(
344        &mut self,
345        identity: Arc<ComponentIdentity>,
346        shared_buffer: &Arc<SharedBuffer>,
347        repo: &Arc<LogsRepository>,
348    ) -> Arc<LogsArtifactsContainer> {
349        match self.logs_data_store.get(&identity) {
350            None => {
351                let initial_interest = self.get_initial_interest(identity.as_ref());
352                let weak_repo = Arc::downgrade(repo);
353                let stats = LogStreamStats::default()
354                    .with_inspect(&self.inspect_node, identity.moniker.to_string())
355                    .expect("failed to attach component log stats");
356                stats.set_url(&identity.url);
357                let stats = Arc::new(stats);
358                let container = Arc::new(LogsArtifactsContainer::new(
359                    Arc::clone(&identity),
360                    self.interest_registrations.values().flat_map(|s| s.iter()),
361                    initial_interest,
362                    Arc::clone(&stats),
363                    shared_buffer.new_container_buffer(Arc::clone(&identity), stats),
364                    Some(Box::new(move |c| {
365                        if let Some(repo) = weak_repo.upgrade() {
366                            repo.on_container_inactive(&c.identity)
367                        }
368                    })),
369                ));
370                self.logs_data_store.insert(identity, Arc::clone(&container));
371                self.logs_multiplexers.send(&container);
372                container
373            }
374            Some(existing) => Arc::clone(existing),
375        }
376    }
377
378    fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
379        let exact_url_severity =
380            self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
381        let exact_moniker_severity =
382            self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();
383
384        let partial_severity = self
385            .initial_interests
386            .iter()
387            .filter_map(|(uom, severity)| match uom {
388                UrlOrMoniker::Partial(p) => {
389                    if identity.url.contains(p.as_str())
390                        || identity.moniker.to_string().contains(p.as_str())
391                    {
392                        Some(*severity)
393                    } else {
394                        None
395                    }
396                }
397                _ => None,
398            })
399            .min();
400
401        [exact_url_severity, exact_moniker_severity, partial_severity]
402            .into_iter()
403            .flatten()
404            .min()
405            .map(FidlSeverity::from)
406    }
407
408    fn is_live(&self, identity: &ComponentIdentity) -> bool {
409        match self.logs_data_store.get(identity) {
410            Some(container) => container.is_active(),
411            None => false,
412        }
413    }
414
415    /// Updates our own log interest if we are the root Archivist and logging
416    /// to klog.
417    fn maybe_update_own_logs_interest(
418        &mut self,
419        selectors: &[LogInterestSelector],
420        clear_interest: bool,
421    ) {
422        let lowest_selector = selectors
423            .iter()
424            .filter(|selector| {
425                ARCHIVIST_MONIKER.matches_component_selector(&selector.selector).unwrap_or(false)
426            })
427            .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
428        if let Some(selector) = lowest_selector {
429            if clear_interest {
430                log::set_max_level(LevelFilter::Info);
431            } else {
432                log::set_max_level(
433                    match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
434                        FidlSeverity::Trace => LevelFilter::Trace,
435                        FidlSeverity::Debug => LevelFilter::Debug,
436                        FidlSeverity::Info => LevelFilter::Info,
437                        FidlSeverity::Warn => LevelFilter::Warn,
438                        FidlSeverity::Error => LevelFilter::Error,
439                        // Log has no "Fatal" level, so set it to Error
440                        // instead.
441                        FidlSeverity::Fatal => LevelFilter::Error,
442                        FidlSeverity::__SourceBreaking { .. } => return,
443                    },
444                );
445            }
446        }
447    }
448
449    fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
450        self.maybe_update_own_logs_interest(&selectors, false);
451        let previous_selectors =
452            self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
453        // unwrap safe, we just inserted.
454        let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
455        for logs_data in self.logs_data_store.values() {
456            logs_data.update_interest(new_selectors.iter(), &previous_selectors);
457        }
458    }
459
460    pub fn finish_interest_connection(&mut self, connection_id: usize) {
461        let selectors = self.interest_registrations.remove(&connection_id);
462        if let Some(selectors) = selectors {
463            self.maybe_update_own_logs_interest(&selectors, true);
464            for logs_data in self.logs_data_store.values() {
465                logs_data.reset_interest(&selectors);
466            }
467        }
468    }
469
470    pub fn remove(&mut self, identity: &ComponentIdentity) {
471        self.logs_data_store.remove(identity);
472    }
473}
474
475type LiveIteratorsMap = HashMap<usize, (StreamMode, Box<dyn MultiplexerHandleAction + Send>)>;
476
477/// Ensures that BatchIterators get access to logs from newly started components.
478pub struct MultiplexerBroker {
479    live_iterators: Arc<Mutex<LiveIteratorsMap>>,
480    cleanup_sender: mpsc::UnboundedSender<usize>,
481    _live_iterators_cleanup_task: fasync::Task<()>,
482}
483
484impl MultiplexerBroker {
485    fn new() -> Self {
486        let (cleanup_sender, mut receiver) = mpsc::unbounded();
487        let live_iterators = Arc::new(Mutex::new(HashMap::new()));
488        let live_iterators_clone = Arc::clone(&live_iterators);
489        Self {
490            live_iterators,
491            cleanup_sender,
492            _live_iterators_cleanup_task: fasync::Task::spawn(async move {
493                while let Some(id) = receiver.next().await {
494                    live_iterators_clone.lock().remove(&id);
495                }
496            }),
497        }
498    }
499
500    fn cleanup_sender(&self) -> mpsc::UnboundedSender<usize> {
501        self.cleanup_sender.clone()
502    }
503
504    /// A new BatchIterator has been created and must be notified when future log containers are
505    /// created.
506    fn add(&mut self, mode: StreamMode, recipient: Box<dyn MultiplexerHandleAction + Send>) {
507        match mode {
508            // snapshot streams only want to know about what's currently available
509            StreamMode::Snapshot => recipient.close(),
510            StreamMode::SnapshotThenSubscribe | StreamMode::Subscribe => {
511                self.live_iterators.lock().insert(recipient.multiplexer_id(), (mode, recipient));
512            }
513        }
514    }
515
516    /// Notify existing BatchIterators of a new logs container so they can include its messages
517    /// in their results.
518    pub fn send(&mut self, container: &Arc<LogsArtifactsContainer>) {
519        self.live_iterators
520            .lock()
521            .retain(|_, (mode, recipient)| recipient.send_cursor_from(*mode, container));
522    }
523
524    /// Notify all multiplexers to terminate their streams once sub streams have terminated.
525    fn terminate(&mut self) {
526        for (_, (_, recipient)) in self.live_iterators.lock().drain() {
527            recipient.close();
528        }
529    }
530}
531
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::testing::make_message;
    use fidl_fuchsia_logger::LogSinkMarker;

    use moniker::ExtendedMoniker;
    use selectors::FastError;
    use std::time::Duration;

    /// Builds a shared component identity from a moniker string and a URL.
    fn make_identity(moniker: &str, url: &'static str) -> Arc<ComponentIdentity> {
        Arc::new(ComponentIdentity::new(ExtendedMoniker::parse_str(moniker).unwrap(), url))
    }

    /// Drains `stream` and returns the message text of every item, in order.
    async fn message_texts(stream: impl Stream<Item = Arc<LogsData>>) -> Vec<String> {
        stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await
    }

    #[fuchsia::test]
    async fn data_repo_filters_logs_by_selectors() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let foo_container = repo.get_log_container(make_identity("./foo", "fuchsia-pkg://foo"));
        let bar_container = repo.get_log_container(make_identity("./bar", "fuchsia-pkg://bar"));

        // Interleave messages from both components.
        let messages =
            [(&foo_container, "a", 1), (&bar_container, "b", 2), (&foo_container, "c", 3)];
        for (container, text, nanos) in messages {
            container.ingest_message(make_message(
                text,
                None,
                zx::BootInstant::from_nanos(nanos),
            ));
        }

        // Without selectors, everything is returned.
        let unfiltered = repo.logs_cursor(StreamMode::Snapshot, None, ftrace::Id::random());
        assert_eq!(
            message_texts(unfiltered).await,
            vec!["a".to_string(), "b".to_string(), "c".to_string()]
        );

        // With a selector for `foo`, only its messages are returned.
        let foo_selector = selectors::parse_selector::<FastError>("foo:root").unwrap();
        let filtered =
            repo.logs_cursor(StreamMode::Snapshot, Some(vec![foo_selector]), ftrace::Id::random());
        assert_eq!(message_texts(filtered).await, vec!["a".to_string(), "c".to_string()]);
    }

    #[fuchsia::test]
    async fn multiplexer_broker_cleanup() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let live_iterator_count =
            || repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().len();

        let stream =
            repo.logs_cursor(StreamMode::SnapshotThenSubscribe, None, ftrace::Id::random());
        assert_eq!(live_iterator_count(), 1);

        // When the multiplexer goes away it must be forgotten by the broker.
        drop(stream);
        loop {
            fasync::Timer::new(Duration::from_millis(100)).await;
            if live_iterator_count() == 0 {
                break;
            }
        }
    }

    #[fuchsia::test]
    async fn data_repo_correctly_sets_initial_interests() {
        let interest =
            |component, log_severity| ComponentInitialInterest { component, log_severity };
        let repo = LogsRepository::new(
            100000,
            [
                interest(UrlOrMoniker::Url("fuchsia-pkg://bar".into()), Severity::Info),
                interest(UrlOrMoniker::Url("fuchsia-pkg://baz".into()), Severity::Warn),
                interest(UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()), Severity::Error),
                interest(UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()), Severity::Debug),
            ]
            .into_iter(),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        let cases = [
            // We have the moniker configured, use the associated severity.
            ("core/foo", "fuchsia-pkg://foo", Some(FidlSeverity::Debug)),
            // We have the URL configured, use the associated severity.
            ("core/baz", "fuchsia-pkg://baz", Some(FidlSeverity::Warn)),
            // We have both a URL and a moniker in the config. Pick the minimum one, in this
            // case Info for the URL over Error for the moniker.
            ("core/bar", "fuchsia-pkg://bar", Some(FidlSeverity::Info)),
            // Neither the moniker nor the URL have an associated severity, therefore, the
            // minimum severity isn't set.
            ("core/quux", "fuchsia-pkg://quux", None),
        ];
        for (moniker, url, expected) in cases {
            let container = repo.get_log_container(make_identity(moniker, url));
            expect_initial_interest(expected, container, repo.scope_handle.clone()).await;
        }
    }

    #[fuchsia::test]
    async fn data_repo_correctly_handles_partial_matching() {
        let repo = LogsRepository::new(
            100000,
            [
                "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO",
                "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN",
                "/core/bust:DEBUG",
                "core/bar:ERROR",
                "foo:DEBUG",
                "both:TRACE",
            ]
            .into_iter()
            .map(|spec| spec.parse::<ComponentInitialInterest>().unwrap()),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        let cases = [
            // We have a partial moniker configured, use the associated severity.
            (
                "core/foo",
                "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
                Some(FidlSeverity::Debug),
            ),
            // We have a partial url configured, use the associated severity.
            ("core/not-foo", "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm", Some(FidlSeverity::Debug)),
            // We have the URL configured, use the associated severity.
            ("core/baz", "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm", Some(FidlSeverity::Warn)),
            // We have both a URL and a moniker in the config. Pick the minimum one, in this
            // case Info for the URL over Error for the moniker.
            ("core/bar", "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm", Some(FidlSeverity::Info)),
            // Neither the moniker nor the URL have an associated severity, therefore, the
            // minimum severity isn't set.
            ("core/quux", "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm", None),
            // We have a partial match for both moniker and url, should still work.
            ("core/both", "fuchsia-pkg://fuchsia.com/both#meta/both.cm", Some(FidlSeverity::Trace)),
            // Exact moniker match should not match sub-monikers.
            ("core/bust/testing", "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm", None),
        ];
        for (moniker, url, expected) in cases {
            let container = repo.get_log_container(make_identity(moniker, url));
            expect_initial_interest(expected, container, repo.scope_handle.clone()).await;
        }
    }

    /// Connects a fresh `LogSink` to `container` and asserts that the first interest it is
    /// told about carries `expected_severity` as its minimum severity.
    async fn expect_initial_interest(
        expected_severity: Option<FidlSeverity>,
        container: Arc<LogsArtifactsContainer>,
        scope: fasync::ScopeHandle,
    ) {
        let (proxy, request_stream) =
            fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(request_stream, scope);
        let interest = proxy.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(interest.min_severity, expected_severity);
    }
}