archivist_lib/logs/repository.rs

// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::events::router::EventConsumer;
use crate::events::types::{Event, EventPayload, LogSinkRequestedPayload};
use crate::identity::ComponentIdentity;
use crate::logs::container::{CursorItem, LogsArtifactsContainer};
use crate::logs::debuglog::{DebugLog, DebugLogBridge, KERNEL_IDENTITY};
use crate::logs::multiplex::{Multiplexer, MultiplexerHandleAction};
use crate::logs::shared_buffer::SharedBuffer;
use crate::logs::stats::LogStreamStats;
use anyhow::format_err;
use diagnostics_data::{LogsData, Severity};
use fidl_fuchsia_diagnostics::{LogInterestSelector, Selector, StreamMode, StringSelector};
use fidl_fuchsia_diagnostics_types::Severity as FidlSeverity;
use flyweights::FlyStr;
use fuchsia_inspect_derive::WithInspect;
use fuchsia_sync::Mutex;
use fuchsia_url::AbsoluteComponentUrl;
use fuchsia_url::boot_url::BootUrl;
use futures::channel::mpsc;
use futures::prelude::*;
use log::{LevelFilter, debug, error};
use moniker::{ExtendedMoniker, Moniker};
use selectors::SelectorExt;
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock, Weak};
use {fuchsia_async as fasync, fuchsia_inspect as inspect, fuchsia_trace as ftrace};

// LINT.IfChange
#[derive(Ord, PartialOrd, Eq, PartialEq)]
pub struct ComponentInitialInterest {
    /// The URL or moniker for the component which should receive the initial interest.
    component: UrlOrMoniker,
    /// The log severity the initial interest should specify.
    log_severity: Severity,
}
// LINT.ThenChange(/src/lib/assembly/config_schema/src/platform_config/diagnostics_config.rs)

impl FromStr for ComponentInitialInterest {
    type Err = anyhow::Error;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
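        // Interests are written as "<url_or_moniker>:<severity>", e.g. "/core/foo:DEBUG".
        // Splitting from the right keeps any ':' inside a component URL intact.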
        let mut split = s.rsplitn(2, ":");
        match (split.next(), split.next()) {
            (Some(severity), Some(url_or_moniker)) => {
                let Ok(url_or_moniker) = UrlOrMoniker::from_str(url_or_moniker) else {
                    return Err(format_err!("invalid url or moniker"));
                };
                let Ok(severity) = Severity::from_str(severity) else {
                    return Err(format_err!("invalid severity"));
                };
                Ok(ComponentInitialInterest { log_severity: severity, component: url_or_moniker })
            }
            _ => Err(format_err!("invalid interest")),
        }
    }
}

#[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
pub enum UrlOrMoniker {
    /// An absolute fuchsia url to a component.
    Url(FlyStr),
    /// The absolute moniker for a component.
    Moniker(ExtendedMoniker),
    /// A partial string to match against url or moniker.
    Partial(FlyStr),
}

impl FromStr for UrlOrMoniker {
    type Err = ();
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if AbsoluteComponentUrl::from_str(s).is_ok() || BootUrl::parse(s).is_ok() {
            Ok(UrlOrMoniker::Url(s.into()))
        } else if s.starts_with("/") {
            if let Ok(moniker) = Moniker::from_str(s) {
                Ok(UrlOrMoniker::Moniker(ExtendedMoniker::ComponentInstance(moniker)))
            } else {
                Err(())
            }
        } else {
            Ok(UrlOrMoniker::Partial(s.into()))
        }
    }
}
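
// Illustrative inputs and the variant each parses to (per the branches above):
//   "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm" => UrlOrMoniker::Url(..)
//   "/core/foo"                                 => UrlOrMoniker::Moniker(..)
//   "foo"                                       => UrlOrMoniker::Partial(..)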

/// Static ID, used for persistent changes to interest settings.
pub const STATIC_CONNECTION_ID: usize = 0;
static INTEREST_CONNECTION_ID: AtomicUsize = AtomicUsize::new(STATIC_CONNECTION_ID + 1);
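// Dynamically allocated interest connection IDs therefore start at 1; ID 0 is
// reserved for persistent interest settings.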

/// This is set when Archivist first starts. This might not be set for unit tests.
pub static ARCHIVIST_MONIKER: OnceLock<Moniker> = OnceLock::new();

/// The IOBuffer writer tag used for Archivist logs. We rely on the first container we create having
/// this tag, so the Archivist container must be the first container created after creating the
/// shared buffer. This tag is used before we create the container because we want to set up logging
/// at the earliest possible moment after a binary starts.
pub const ARCHIVIST_TAG: u64 = 0;

/// LogsRepository holds all diagnostics data and is a singleton wrapped by multiple
/// [`pipeline::Pipeline`]s in a given Archivist instance.
pub struct LogsRepository {
    mutable_state: Mutex<LogsRepositoryState>,
    shared_buffer: Arc<SharedBuffer>,
    scope_handle: fasync::ScopeHandle,
}

impl LogsRepository {
    pub fn new(
        ring_buffer: ring_buffer::Reader,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        parent: &fuchsia_inspect::Node,
        scope: fasync::Scope,
    ) -> Arc<Self> {
        let scope_handle = scope.to_handle();
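        // `Arc::new_cyclic` hands out a weak reference to the repository before it is
        // fully constructed, so the shared buffer's inactivity callback below can point
        // back at the repository without creating a reference cycle.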
        Arc::new_cyclic(|me: &Weak<LogsRepository>| {
            let mut mutable_state = LogsRepositoryState::new(parent, initial_interests, scope);
            let me_clone = Weak::clone(me);
            let shared_buffer = SharedBuffer::new(
                ring_buffer,
                Box::new(move |identity| {
                    if let Some(this) = me_clone.upgrade() {
                        this.on_container_inactive(&identity);
                    }
                }),
                Default::default(),
            );
            if let Some(m) = ARCHIVIST_MONIKER.get() {
                let archivist_container = mutable_state.create_log_container(
                    Arc::new(ComponentIdentity::new(
                        ExtendedMoniker::ComponentInstance(m.clone()),
                        "fuchsia-pkg://UNKNOWN",
                    )),
                    &shared_buffer,
                    Weak::clone(me),
                );
                // We rely on the first container we create ending up with the correct tag.
                assert_eq!(archivist_container.buffer().iob_tag(), ARCHIVIST_TAG);
            }
            LogsRepository { scope_handle, mutable_state: Mutex::new(mutable_state), shared_buffer }
        })
    }

    pub async fn flush(&self) {
        self.shared_buffer.flush().await;
    }

    /// Drain the kernel's debug log. Spawns a task that ingests all existing
    /// messages and then continues to listen for new ones.
    pub fn drain_debuglog<K>(self: &Arc<Self>, klog_reader: K)
    where
        K: DebugLog + Send + Sync + 'static,
    {
        let mut mutable_state = self.mutable_state.lock();

        // We can only have one klog reader; if this flag is already set, we are
        // already draining klog.
        if mutable_state.draining_klog {
            return;
        }
        mutable_state.draining_klog = true;

        let container =
            mutable_state.get_log_container(KERNEL_IDENTITY.clone(), &self.shared_buffer, self);
        let Some(ref scope) = mutable_state.scope else {
            return;
        };
        scope.spawn(async move {
            debug!("Draining debuglog.");
            let mut kernel_logger = DebugLogBridge::create(klog_reader);
            let mut messages = match kernel_logger.existing_logs() {
                Ok(messages) => messages,
                Err(e) => {
                    error!(e:%; "failed to read from kernel log, important logs may be missing");
                    return;
                }
            };
            messages.sort_by_key(|m| m.timestamp());
            for message in messages {
                container.ingest_message(message);
            }

            let res = kernel_logger
                .listen()
                .try_for_each(|message| async {
                    container.ingest_message(message);
                    Ok(())
                })
                .await;
            if let Err(e) = res {
                error!(e:%; "failed to drain kernel log, important logs may be missing");
            }
        });
    }

    pub fn logs_cursor_raw(
        &self,
        mode: StreamMode,
        selectors: Option<Vec<Selector>>,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = CursorItem> + Send {
        let mut repo = self.mutable_state.lock();
        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
            let cursor = c.cursor_raw(mode);
            (Arc::clone(identity), cursor)
        });
        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
        merged
    }

    pub fn logs_cursor(
        &self,
        mode: StreamMode,
        selectors: Option<Vec<Selector>>,
        parent_trace_id: ftrace::Id,
    ) -> impl Stream<Item = Arc<LogsData>> + Send + 'static {
        let mut repo = self.mutable_state.lock();
        let substreams = repo.logs_data_store.iter().map(|(identity, c)| {
            let cursor = c.cursor(mode, parent_trace_id);
            (Arc::clone(identity), cursor)
        });
        let (mut merged, mpx_handle) = Multiplexer::new(parent_trace_id, selectors, substreams);
        repo.logs_multiplexers.add(mode, Box::new(mpx_handle));
        merged.set_on_drop_id_sender(repo.logs_multiplexers.cleanup_sender());
        merged
    }

    pub fn get_log_container(
        self: &Arc<Self>,
        identity: Arc<ComponentIdentity>,
    ) -> Arc<LogsArtifactsContainer> {
        self.mutable_state.lock().get_log_container(identity, &self.shared_buffer, self)
    }

    /// Waits until `stop_accepting_new_log_sinks` is called and all log sink tasks have completed.
    /// After that, any pending Cursors will return Poll::Ready(None).
    pub async fn wait_for_termination(&self) {
        let Some(scope) = self.mutable_state.lock().scope.take() else {
            error!("Attempted to terminate twice");
            return;
        };
        scope.join().await;
        // At this point all log sink tasks have completed.
        debug!("Log ingestion stopped.");
        // Terminate the shared buffer first so that pending messages are processed before we
        // terminate all the containers.
        self.shared_buffer.terminate().await;
        let mut repo = self.mutable_state.lock();
        for container in repo.logs_data_store.values() {
            container.terminate();
        }
        repo.logs_multiplexers.terminate();
    }

    /// Closes the scope in which new log sink tasks are spawned. Once this is called, no more
    /// log sink connections are accepted and log termination can proceed.
    pub fn stop_accepting_new_log_sinks(&self) {
        self.scope_handle.close();
    }

    /// Returns an id to use for a new interest connection. Used by both LogSettings and Log
    /// so that connection ids are unique across the two protocols.
    pub fn new_interest_connection(&self) -> usize {
        INTEREST_CONNECTION_ID.fetch_add(1, Ordering::Relaxed)
    }

    /// Updates log selectors associated with an interest connection.
    pub fn update_logs_interest(&self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.mutable_state.lock().update_logs_interest(connection_id, selectors);
    }

    /// Indicates that the connection associated with the given ID is now done.
    pub fn finish_interest_connection(&self, connection_id: usize) {
        self.mutable_state.lock().finish_interest_connection(connection_id);
    }

    fn on_container_inactive(&self, identity: &ComponentIdentity) {
        let mut repo = self.mutable_state.lock();
        if !repo.is_live(identity) {
            repo.remove(identity);
        }
    }
}

#[cfg(test)]
impl LogsRepository {
    pub fn for_test(scope: fasync::Scope) -> Arc<Self> {
        use crate::logs::shared_buffer::create_ring_buffer;

        LogsRepository::new(
            create_ring_buffer(crate::constants::LEGACY_DEFAULT_MAXIMUM_CACHED_LOGS_BYTES as usize),
            std::iter::empty(),
            &Default::default(),
            scope,
        )
    }
}

impl EventConsumer for LogsRepository {
    fn handle(self: Arc<Self>, event: Event) {
        match event.payload {
            EventPayload::LogSinkRequested(LogSinkRequestedPayload {
                component,
                request_stream,
            }) => {
                debug!(identity:% = component; "LogSink requested.");
                let container = self.get_log_container(component);
                container.handle_log_sink(request_stream, self.scope_handle.clone());
            }
            _ => unreachable!("Archivist state just subscribes to log sink requested"),
        }
    }
}

pub struct LogsRepositoryState {
    logs_data_store: HashMap<Arc<ComponentIdentity>, Arc<LogsArtifactsContainer>>,
    inspect_node: inspect::Node,

    /// BatchIterators for logs need to be made aware of new components starting and their logs.
    logs_multiplexers: MultiplexerBroker,

    /// Interest registrations that we have received through fuchsia.logger.Log/ListWithSelectors
    /// or through fuchsia.logger.LogSettings/SetInterest.
    interest_registrations: BTreeMap<usize, Vec<LogInterestSelector>>,

    /// Whether or not we are draining the kernel log.
    draining_klog: bool,

    /// Scope where log ingestion tasks are running.
    scope: Option<fasync::Scope>,

    /// The initial log interests with which archivist was configured.
    initial_interests: BTreeMap<UrlOrMoniker, Severity>,
}

impl LogsRepositoryState {
    fn new(
        parent: &fuchsia_inspect::Node,
        initial_interests: impl Iterator<Item = ComponentInitialInterest>,
        scope: fasync::Scope,
    ) -> Self {
        Self {
            inspect_node: parent.create_child("log_sources"),
            logs_data_store: HashMap::new(),
            logs_multiplexers: MultiplexerBroker::new(),
            interest_registrations: BTreeMap::new(),
            draining_klog: false,
            initial_interests: initial_interests
                .map(|ComponentInitialInterest { component, log_severity }| {
                    (component, log_severity)
                })
                .collect(),
            scope: Some(scope),
        }
    }

    /// Returns a container for logs artifacts, constructing one and adding it to the map if
    /// necessary.
    pub fn get_log_container(
        &mut self,
        identity: Arc<ComponentIdentity>,
        shared_buffer: &Arc<SharedBuffer>,
        repo: &Arc<LogsRepository>,
    ) -> Arc<LogsArtifactsContainer> {
        match self.logs_data_store.get(&identity) {
            None => self.create_log_container(identity, shared_buffer, Arc::downgrade(repo)),
            Some(existing) => Arc::clone(existing),
        }
    }

    fn create_log_container(
        &mut self,
        identity: Arc<ComponentIdentity>,
        shared_buffer: &Arc<SharedBuffer>,
        repo: Weak<LogsRepository>,
    ) -> Arc<LogsArtifactsContainer> {
        let initial_interest = self.get_initial_interest(identity.as_ref());
        let stats = LogStreamStats::default()
            .with_inspect(&self.inspect_node, identity.moniker.to_string())
            .expect("failed to attach component log stats");
        stats.set_url(&identity.url);
        let stats = Arc::new(stats);
        let container = Arc::new(LogsArtifactsContainer::new(
            Arc::clone(&identity),
            self.interest_registrations.values().flat_map(|s| s.iter()),
            initial_interest,
            Arc::clone(&stats),
            shared_buffer.new_container_buffer(Arc::clone(&identity), stats),
            Some(Box::new(move |c| {
                if let Some(repo) = repo.upgrade() {
                    repo.on_container_inactive(&c.identity)
                }
            })),
        ));
        self.logs_data_store.insert(identity, Arc::clone(&container));
        self.logs_multiplexers.send(&container);
        container
    }

    fn get_initial_interest(&self, identity: &ComponentIdentity) -> Option<FidlSeverity> {
        let exact_url_severity =
            self.initial_interests.get(&UrlOrMoniker::Url(identity.url.clone())).copied();
        let exact_moniker_severity =
            self.initial_interests.get(&UrlOrMoniker::Moniker(identity.moniker.clone())).copied();

        let partial_severity = self
            .initial_interests
            .iter()
            .filter_map(|(uom, severity)| match uom {
                UrlOrMoniker::Partial(p) => {
                    if identity.url.contains(p.as_str())
                        || identity.moniker.to_string().contains(p.as_str())
                    {
                        Some(*severity)
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .min();

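        // The initial interest is the minimum severity across all matching entries;
        // e.g. an exact URL match at Info and an exact moniker match at Error resolve
        // to Info (see the initial-interest tests below).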
        [exact_url_severity, exact_moniker_severity, partial_severity]
            .into_iter()
            .flatten()
            .min()
            .map(FidlSeverity::from)
    }

    fn is_live(&self, identity: &ComponentIdentity) -> bool {
        match self.logs_data_store.get(identity) {
            Some(container) => container.is_active(),
            None => false,
        }
    }

    /// Updates our own log interest if we are the root Archivist and logging
    /// to klog.
    fn maybe_update_own_logs_interest(
        &mut self,
        selectors: &[LogInterestSelector],
        clear_interest: bool,
    ) {
        let Some(moniker) = ARCHIVIST_MONIKER.get() else { return };
        let lowest_selector = selectors
            .iter()
            .filter(|selector| {
                // If this is an embedded archivist, the wildcard selector "**" is used (in
                // tests at least) to change the interest level on all components. Since
                // archivist logs to itself, this creates a situation where archivist logs can
                // get included which is undesirable in the vast majority of cases. To address
                // this, we prevent the global wildcard patterns "**" and "*" from matching
                // archivist. This is clearly a bit of a hack, but it is balanced by it being
                // the behavior that the vast majority of users will want. It is still possible
                // to change the interest level for archivist by using an exact match. Note that
                // this will apply to both the embedded and system archivist to keep things
                // consistent.
                if selector.selector.moniker_segments.as_ref().is_some_and(|s| {
                    matches!(
                        &s[..],
                        [StringSelector::StringPattern(s)] if s == "**" || s == "*"
                    )
                }) {
                    return false;
                }

                moniker.matches_component_selector(&selector.selector).unwrap_or(false)
            })
            .min_by_key(|selector| selector.interest.min_severity.unwrap_or(FidlSeverity::Info));
        if let Some(selector) = lowest_selector {
            if clear_interest {
                log::set_max_level(LevelFilter::Info);
            } else {
                log::set_max_level(
                    match selector.interest.min_severity.unwrap_or(FidlSeverity::Info) {
                        FidlSeverity::Trace => LevelFilter::Trace,
                        FidlSeverity::Debug => LevelFilter::Debug,
                        FidlSeverity::Info => LevelFilter::Info,
                        FidlSeverity::Warn => LevelFilter::Warn,
                        FidlSeverity::Error => LevelFilter::Error,
                        // Log has no "Fatal" level, so set it to Error
                        // instead.
                        FidlSeverity::Fatal => LevelFilter::Error,
                        FidlSeverity::__SourceBreaking { .. } => return,
                    },
                );
            }
        }
    }

    fn update_logs_interest(&mut self, connection_id: usize, selectors: Vec<LogInterestSelector>) {
        self.maybe_update_own_logs_interest(&selectors, false);
        let previous_selectors =
            self.interest_registrations.insert(connection_id, selectors).unwrap_or_default();
        // unwrap safe, we just inserted.
        let new_selectors = self.interest_registrations.get(&connection_id).unwrap();
        for logs_data in self.logs_data_store.values() {
            logs_data.update_interest(new_selectors.iter(), &previous_selectors);
        }
    }

    pub fn finish_interest_connection(&mut self, connection_id: usize) {
        let selectors = self.interest_registrations.remove(&connection_id);
        if let Some(selectors) = selectors {
            self.maybe_update_own_logs_interest(&selectors, true);
            for logs_data in self.logs_data_store.values() {
                logs_data.reset_interest(&selectors);
            }
        }
    }

    pub fn remove(&mut self, identity: &ComponentIdentity) {
        self.logs_data_store.remove(identity);
    }
}

type LiveIteratorsMap = HashMap<usize, (StreamMode, Box<dyn MultiplexerHandleAction + Send>)>;

/// Ensures that BatchIterators get access to logs from newly started components.
pub struct MultiplexerBroker {
    live_iterators: Arc<Mutex<LiveIteratorsMap>>,
    cleanup_sender: mpsc::UnboundedSender<usize>,
    _live_iterators_cleanup_task: fasync::Task<()>,
}

impl MultiplexerBroker {
    fn new() -> Self {
        let (cleanup_sender, mut receiver) = mpsc::unbounded();
        let live_iterators = Arc::new(Mutex::new(HashMap::new()));
        let live_iterators_clone = Arc::clone(&live_iterators);
        Self {
            live_iterators,
            cleanup_sender,
            _live_iterators_cleanup_task: fasync::Task::spawn(async move {
                while let Some(id) = receiver.next().await {
                    live_iterators_clone.lock().remove(&id);
                }
            }),
        }
    }

    fn cleanup_sender(&self) -> mpsc::UnboundedSender<usize> {
        self.cleanup_sender.clone()
    }

    /// A new BatchIterator has been created and must be notified when future log containers are
    /// created.
    fn add(&mut self, mode: StreamMode, recipient: Box<dyn MultiplexerHandleAction + Send>) {
        match mode {
            // snapshot streams only want to know about what's currently available
            StreamMode::Snapshot => recipient.close(),
            StreamMode::SnapshotThenSubscribe | StreamMode::Subscribe => {
                self.live_iterators.lock().insert(recipient.multiplexer_id(), (mode, recipient));
            }
        }
    }

    /// Notify existing BatchIterators of a new logs container so they can include its messages
    /// in their results.
    pub fn send(&mut self, container: &Arc<LogsArtifactsContainer>) {
        self.live_iterators
            .lock()
            .retain(|_, (mode, recipient)| recipient.send_cursor_from(*mode, container));
    }

    /// Notify all multiplexers to terminate their streams once their substreams have terminated.
    fn terminate(&mut self) {
        for (_, (_, recipient)) in self.live_iterators.lock().drain() {
            recipient.close();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::shared_buffer::create_ring_buffer;
    use crate::logs::testing::make_message;
    use fidl_fuchsia_logger::LogSinkMarker;

    use moniker::ExtendedMoniker;
    use selectors::FastError;
    use std::time::Duration;

    #[fuchsia::test]
    async fn data_repo_filters_logs_by_selectors() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        let bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bar").unwrap(),
            "fuchsia-pkg://bar",
        )));

        foo_container.ingest_message(make_message("a", None, zx::BootInstant::from_nanos(1)));
        bar_container.ingest_message(make_message("b", None, zx::BootInstant::from_nanos(2)));
        foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(3)));

        let stream = repo.logs_cursor(StreamMode::Snapshot, None, ftrace::Id::random());

        let results =
            stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "b".to_string(), "c".to_string()]);

        let filtered_stream = repo.logs_cursor(
            StreamMode::Snapshot,
            Some(vec![selectors::parse_selector::<FastError>("foo:root").unwrap()]),
            ftrace::Id::random(),
        );

        let results =
            filtered_stream.map(|value| value.msg().unwrap().to_string()).collect::<Vec<_>>().await;
        assert_eq!(results, vec!["a".to_string(), "c".to_string()]);
    }

    #[fuchsia::test]
    async fn multiplexer_broker_cleanup() {
        let repo = LogsRepository::for_test(fasync::Scope::new());
        let stream =
            repo.logs_cursor(StreamMode::SnapshotThenSubscribe, None, ftrace::Id::random());

        assert_eq!(repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().len(), 1);

        // When the multiplexer goes away it must be forgotten by the broker.
        drop(stream);
        loop {
            fasync::Timer::new(Duration::from_millis(100)).await;
            if repo.mutable_state.lock().logs_multiplexers.live_iterators.lock().is_empty() {
                break;
            }
        }
    }
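
    // A minimal parsing sketch added here for illustration (not part of the
    // original suite): exercises the `FromStr` impls above under the
    // "<url_or_moniker>:<severity>" format.
    #[fuchsia::test]
    fn component_initial_interest_parsing() {
        // An absolute moniker with a severity parses to a Moniker interest.
        let interest: ComponentInitialInterest = "/core/foo:DEBUG".parse().unwrap();
        assert_eq!(interest.log_severity, Severity::Debug);
        assert_eq!(interest.component, UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()));

        // Anything that is neither an absolute URL nor an absolute moniker is
        // treated as a partial match string.
        let interest: ComponentInitialInterest = "foo:INFO".parse().unwrap();
        assert_eq!(interest.component, UrlOrMoniker::Partial("foo".into()));

        // A trailing ":<severity>" is required.
        assert!("/core/foo".parse::<ComponentInitialInterest>().is_err());
    }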

    #[fuchsia::test]
    async fn data_repo_correctly_sets_initial_interests() {
        let repo = LogsRepository::new(
            create_ring_buffer(100000),
            [
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://bar".into()),
                    log_severity: Severity::Info,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Url("fuchsia-pkg://baz".into()),
                    log_severity: Severity::Warn,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("/core/bar".try_into().unwrap()),
                    log_severity: Severity::Error,
                },
                ComponentInitialInterest {
                    component: UrlOrMoniker::Moniker("/core/foo".try_into().unwrap()),
                    log_severity: Severity::Debug,
                },
            ]
            .into_iter(),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        // We have the moniker configured; use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/foo").unwrap(),
            "fuchsia-pkg://foo",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;

        // We have the URL configured; use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/baz").unwrap(),
            "fuchsia-pkg://baz",
        )));
        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
            .await;

        // We have both a URL and a moniker in the config. Pick the minimum one, in this case Info
        // for the URL over Error for the moniker.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bar").unwrap(),
            "fuchsia-pkg://bar",
        )));
        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
            .await;

        // Neither the moniker nor the URL has an associated severity, so no minimum
        // severity is set.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/quux").unwrap(),
            "fuchsia-pkg://quux",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
    }

    #[fuchsia::test]
    async fn data_repo_correctly_handles_partial_matching() {
        let repo = LogsRepository::new(
            create_ring_buffer(100000),
            [
                "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm:INFO".parse(),
                "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm:WARN".parse(),
                "/core/bust:DEBUG".parse(),
                "core/bar:ERROR".parse(),
                "foo:DEBUG".parse(),
                "both:TRACE".parse(),
            ]
            .into_iter()
            .map(Result::unwrap),
            &fuchsia_inspect::Node::default(),
            fasync::Scope::new(),
        );

        // We have a partial moniker configured; use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/foo").unwrap(),
            "fuchsia-pkg://fuchsia.com/not-foo#meta/not-foo.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;

        // We have a partial url configured; use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/not-foo").unwrap(),
            "fuchsia-pkg://fuchsia.com/foo#meta/foo.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Debug), container, repo.scope_handle.clone())
            .await;

        // We have the URL configured; use the associated severity.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/baz").unwrap(),
            "fuchsia-pkg://fuchsia.com/baz#meta/baz.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Warn), container, repo.scope_handle.clone())
            .await;

        // We have both a URL and a moniker in the config. Pick the minimum one, in this case Info
        // for the URL over Error for the moniker.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bar").unwrap(),
            "fuchsia-pkg://fuchsia.com/bar#meta/bar.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Info), container, repo.scope_handle.clone())
            .await;

        // Neither the moniker nor the URL has an associated severity, so no minimum
        // severity is set.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/quux").unwrap(),
            "fuchsia-pkg://fuchsia.com/quux#meta/quux.cm",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;

        // We have a partial match for both moniker and url; this should still work.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/both").unwrap(),
            "fuchsia-pkg://fuchsia.com/both#meta/both.cm",
        )));
        expect_initial_interest(Some(FidlSeverity::Trace), container, repo.scope_handle.clone())
            .await;

        // An exact moniker match should not match sub-monikers.
        let container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("core/bust/testing").unwrap(),
            "fuchsia-pkg://fuchsia.com/busted#meta/busted.cm",
        )));
        expect_initial_interest(None, container, repo.scope_handle.clone()).await;
    }

    async fn expect_initial_interest(
        expected_severity: Option<FidlSeverity>,
        container: Arc<LogsArtifactsContainer>,
        scope: fasync::ScopeHandle,
    ) {
        let (log_sink, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(stream, scope);
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, expected_severity);
    }
}
786}