// archivist_lib/logs/container.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

5use crate::diagnostics::TRACE_CATEGORY;
6use crate::identity::ComponentIdentity;
7use crate::logs::multiplex::PinStream;
8use crate::logs::shared_buffer::{self, ContainerBuffer, LazyItem};
9use crate::logs::socket::{Encoding, LogMessageSocket};
10use crate::logs::stats::LogStreamStats;
11use crate::logs::stored_message::StoredMessage;
12use derivative::Derivative;
13use diagnostics_data::{BuilderArgs, Data, LogError, Logs, LogsData, LogsDataBuilder};
14use fidl_fuchsia_diagnostics::{LogInterestSelector, StreamMode};
15use fidl_fuchsia_diagnostics_types::{Interest as FidlInterest, Severity as FidlSeverity};
16use fidl_fuchsia_logger::{LogSinkRequest, LogSinkRequestStream};
17use fuchsia_async::condition::Condition;
18use futures::future::{Fuse, FusedFuture};
19use futures::prelude::*;
20use futures::select;
21use futures::stream::StreamExt;
22use log::{debug, error, warn};
23use selectors::SelectorExt;
24use std::cmp::Ordering;
25use std::collections::BTreeMap;
26use std::pin::pin;
27use std::sync::Arc;
28use std::task::Poll;
29use {fuchsia_async as fasync, fuchsia_trace as ftrace};
30
/// Callback invoked when a container becomes inactive, i.e. it has no channels, sockets,
/// or stored logs (see `LogsArtifactsContainer::check_inactive`).
pub type OnInactive = Box<dyn Fn(&LogsArtifactsContainer) + Send + Sync>;
32
/// Everything tracked for a single component's logs: its identity, inspect stats,
/// the shared log buffer handle, and mutable connection/interest state.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct LogsArtifactsContainer {
    /// The source of logs in this container.
    pub identity: Arc<ComponentIdentity>,

    /// Inspect instrumentation.
    pub stats: Arc<LogStreamStats>,

    /// Buffer for all log messages.
    #[derivative(Debug = "ignore")]
    buffer: ContainerBuffer,

    /// Mutable state for the container.
    #[derivative(Debug = "ignore")]
    state: Arc<Condition<ContainerState>>,

    /// A callback which is called when the container is inactive i.e. has no channels, sockets or
    /// stored logs.
    #[derivative(Debug = "ignore")]
    on_inactive: Option<OnInactive>,
}
55
/// Mutable state guarded by the container's `Condition`: connection counters and the
/// reference-counted set of requested interests.
#[derive(Debug)]
struct ContainerState {
    /// Number of legacy sockets currently being drained for this component.  Sockets that use
    /// structured messages use the buffer's socket handling.
    num_active_legacy_sockets: u64,

    /// Number of LogSink channels currently being listened to for this component.
    num_active_channels: u64,

    /// Current interest for this component.
    ///
    /// Maps each requested interest to the number of times it was requested; the smallest
    /// key is the effective (minimum) interest reported to clients.
    interests: BTreeMap<Interest, usize>,

    /// True until the first `LogSink` connection is handled (`handle_log_sink` clears it);
    /// keeps the container counted as active before any connection arrives.
    is_initializing: bool,
}
70
/// A single item yielded by a raw log cursor (see `LogsArtifactsContainer::cursor_raw`).
#[derive(Debug, PartialEq)]
pub struct CursorItem {
    /// Count of messages rolled out of the buffer before this item could be read; reported
    /// in the metadata of the next parsed message (see `LogsArtifactsContainer::cursor`).
    pub rolled_out: u64,
    /// The raw stored log message.
    pub message: Arc<StoredMessage>,
    /// Identity of the component that produced `message`.
    pub identity: Arc<ComponentIdentity>,
}
77
// Marker impl: the derived `PartialEq` over these fields is a total equivalence relation.
impl Eq for CursorItem {}
79
impl Ord for CursorItem {
    // Order by message timestamp only — presumably so cursors from multiple containers can
    // be merged in time order (see `logs::multiplex`); confirm.
    //
    // NOTE(review): this is inconsistent with the derived `PartialEq`, which compares all
    // fields: two items with equal timestamps but different messages are `Ordering::Equal`
    // here yet unequal under `==`. Verify no consumer relies on `Ord`/`Eq` agreement.
    fn cmp(&self, other: &Self) -> Ordering {
        self.message.timestamp().cmp(&other.message.timestamp())
    }
}
85
86impl PartialOrd for CursorItem {
87    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
88        Some(self.message.timestamp().cmp(&other.message.timestamp()))
89    }
90}
91
92impl LogsArtifactsContainer {
93    pub fn new<'a>(
94        identity: Arc<ComponentIdentity>,
95        interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
96        initial_interest: Option<FidlSeverity>,
97        stats: Arc<LogStreamStats>,
98        buffer: ContainerBuffer,
99        on_inactive: Option<OnInactive>,
100    ) -> Self {
101        let mut interests = BTreeMap::new();
102        if let Some(severity) = initial_interest {
103            interests.insert(Interest::from(severity), 1);
104        }
105        let new = Self {
106            identity,
107            buffer,
108            state: Arc::new(Condition::new(ContainerState {
109                num_active_channels: 0,
110                num_active_legacy_sockets: 0,
111                interests,
112                is_initializing: true,
113            })),
114            stats,
115            on_inactive,
116        };
117
118        // there are no control handles so this won't notify anyone
119        new.update_interest(interest_selectors, &[]);
120
121        new
122    }
123
124    fn create_raw_cursor(
125        &self,
126        buffer_cursor: shared_buffer::Cursor,
127    ) -> impl Stream<Item = CursorItem> {
128        let identity = Arc::clone(&self.identity);
129        buffer_cursor
130            .enumerate()
131            .scan((zx::BootInstant::ZERO, 0u64), move |(last_timestamp, rolled_out), (i, item)| {
132                futures::future::ready(match item {
133                    LazyItem::Next(message) => {
134                        *last_timestamp = message.timestamp();
135                        Some(Some(CursorItem {
136                            message,
137                            identity: Arc::clone(&identity),
138                            rolled_out: *rolled_out,
139                        }))
140                    }
141                    LazyItem::ItemsRolledOut(rolled_out_count, timestamp) => {
142                        if i > 0 {
143                            *rolled_out += rolled_out_count;
144                        }
145                        *last_timestamp = timestamp;
146                        Some(None)
147                    }
148                })
149            })
150            .filter_map(future::ready)
151    }
152
153    /// Returns a stream of this component's log messages. These are the raw messages in FXT format.
154    ///
155    /// # Rolled out logs
156    ///
157    /// When messages are evicted from our internal buffers before a client can read them, they
158    /// are counted as rolled out messages which gets appended to the metadata of the next message.
159    /// If there is no next message, there is no way to know how many messages were rolled out.
160    pub fn cursor_raw(&self, mode: StreamMode) -> PinStream<CursorItem> {
161        let Some(buffer_cursor) = self.buffer.cursor(mode) else {
162            return Box::pin(futures::stream::empty());
163        };
164        Box::pin(self.create_raw_cursor(buffer_cursor))
165    }
166
167    /// Returns a stream of this component's log messages.
168    ///
169    /// # Rolled out logs
170    ///
171    /// When messages are evicted from our internal buffers before a client can read them, they
172    /// are counted as rolled out messages which gets appended to the metadata of the next message.
173    /// If there is no next message, there is no way to know how many messages were rolled out.
174    pub fn cursor(
175        &self,
176        mode: StreamMode,
177        parent_trace_id: ftrace::Id,
178    ) -> PinStream<Arc<LogsData>> {
179        let Some(buffer_cursor) = self.buffer.cursor(mode) else {
180            return Box::pin(futures::stream::empty());
181        };
182        let mut rolled_out_count = 0;
183        Box::pin(self.create_raw_cursor(buffer_cursor).map(
184            move |CursorItem { message, identity, rolled_out }| {
185                rolled_out_count += rolled_out;
186                let trace_id = ftrace::Id::random();
187                let _trace_guard = ftrace::async_enter!(
188                    trace_id,
189                    TRACE_CATEGORY,
190                    c"LogContainer::cursor.parse_message",
191                    // An async duration cannot have multiple concurrent child async durations
192                    // so we include the nonce as metadata to manually determine relationship.
193                    "parent_trace_id" => u64::from(parent_trace_id),
194                    "trace_id" => u64::from(trace_id)
195                );
196                match message.parse(&identity) {
197                    Ok(m) => Arc::new(maybe_add_rolled_out_error(&mut rolled_out_count, m)),
198                    Err(err) => {
199                        let data = maybe_add_rolled_out_error(
200                            &mut rolled_out_count,
201                            LogsDataBuilder::new(BuilderArgs {
202                                moniker: identity.moniker.clone(),
203                                timestamp: message.timestamp(),
204                                component_url: Some(identity.url.clone()),
205                                severity: diagnostics_data::Severity::Warn,
206                            })
207                            .add_error(diagnostics_data::LogError::FailedToParseRecord(format!(
208                                "{err:?}"
209                            )))
210                            .build(),
211                        );
212                        Arc::new(data)
213                    }
214                }
215            },
216        ))
217    }
218
219    /// Handle `LogSink` protocol on `stream`. Each socket received from the `LogSink` client is
220    /// drained by a `Task` which is sent on `sender`. The `Task`s do not complete until their
221    /// sockets have been closed.
222    pub fn handle_log_sink(
223        self: &Arc<Self>,
224        stream: LogSinkRequestStream,
225        scope: fasync::ScopeHandle,
226    ) {
227        {
228            let mut guard = self.state.lock();
229            guard.num_active_channels += 1;
230            guard.is_initializing = false;
231        }
232        scope.spawn(Arc::clone(self).actually_handle_log_sink(stream, scope.clone()));
233    }
234
235    /// This function does not return until the channel is closed.
236    async fn actually_handle_log_sink(
237        self: Arc<Self>,
238        mut stream: LogSinkRequestStream,
239        scope: fasync::ScopeHandle,
240    ) {
241        let mut previous_interest_sent = None;
242        debug!(identity:% = self.identity; "Draining LogSink channel.");
243
244        let mut hanging_gets = Vec::new();
245        let mut interest_changed = pin!(Fuse::terminated());
246
247        loop {
248            select! {
249                next = stream.next() => {
250                    let Some(next) = next else { break };
251                    match next {
252                        Ok(LogSinkRequest::Connect { socket, .. }) => {
253                            // TODO(https://fxbug.dev/378977533): Add support for ingesting
254                            // the legacy log format directly to the shared buffer.
255                            let socket = fasync::Socket::from_socket(socket);
256                            let log_stream = LogMessageSocket::new(socket, Arc::clone(&self.stats));
257                            self.state.lock().num_active_legacy_sockets += 1;
258                            scope.spawn(Arc::clone(&self).drain_messages(log_stream));
259                        }
260                        Ok(LogSinkRequest::ConnectStructured { socket, .. }) => {
261                            self.buffer.add_socket(socket);
262                        }
263                        Ok(LogSinkRequest::WaitForInterestChange { responder }) => {
264                            // If the interest has changed since we last reported it, we'll report
265                            // it below.
266                            hanging_gets.push(responder);
267                        }
268                        Err(e) => error!(identity:% = self.identity, e:%; "error handling log sink"),
269                        Ok(LogSinkRequest::_UnknownMethod { .. }) => {}
270                    }
271                }
272                _ = interest_changed => {}
273            }
274
275            if !hanging_gets.is_empty() {
276                let min_interest = self.state.lock().min_interest();
277                if Some(&min_interest) != previous_interest_sent.as_ref() {
278                    // Send updates to all outstanding hanging gets.
279                    for responder in hanging_gets.drain(..) {
280                        let _ = responder.send(Ok(&min_interest));
281                    }
282                    interest_changed.set(Fuse::terminated());
283                    previous_interest_sent = Some(min_interest);
284                } else if interest_changed.is_terminated() {
285                    // Set ourselves up to be woken when the interest changes.
286                    let previous_interest_sent = previous_interest_sent.clone();
287                    interest_changed.set(
288                        self.state
289                            .when(move |state| {
290                                if previous_interest_sent != Some(state.min_interest()) {
291                                    Poll::Ready(())
292                                } else {
293                                    Poll::Pending
294                                }
295                            })
296                            .fuse(),
297                    );
298                }
299            }
300        }
301
302        debug!(identity:% = self.identity; "LogSink channel closed.");
303        self.state.lock().num_active_channels -= 1;
304        self.check_inactive();
305    }
306
307    /// Drain a `LogMessageSocket` which wraps a socket from a component
308    /// generating logs.
309    pub async fn drain_messages<E>(self: Arc<Self>, mut log_stream: LogMessageSocket<E>)
310    where
311        E: Encoding + Unpin,
312    {
313        debug!(identity:% = self.identity; "Draining messages from a socket.");
314        loop {
315            match log_stream.next().await {
316                Some(Ok(message)) => self.ingest_message(message),
317                Some(Err(err)) => {
318                    warn!(source:% = self.identity, err:%; "closing socket");
319                    break;
320                }
321                None => break,
322            }
323        }
324        debug!(identity:% = self.identity; "Socket closed.");
325        self.state.lock().num_active_legacy_sockets -= 1;
326        self.check_inactive();
327    }
328
329    /// Updates log stats in inspect and push the message onto the container's buffer.
330    pub fn ingest_message(&self, message: StoredMessage) {
331        self.stats.ingest_message(message.size(), message.severity());
332        self.buffer.push_back(message.bytes());
333    }
334
335    /// Set the `Interest` for this component, notifying all active `LogSink/WaitForInterestChange`
336    /// hanging gets with the new interset if it is a change from the previous interest.
337    /// For any match that is also contained in `previous_selectors`, the previous values will be
338    /// removed from the set of interests.
339    pub fn update_interest<'a>(
340        &self,
341        interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
342        previous_selectors: &[LogInterestSelector],
343    ) {
344        let mut new_interest = FidlInterest::default();
345        let mut remove_interest = FidlInterest::default();
346        for selector in interest_selectors {
347            if self
348                .identity
349                .moniker
350                .matches_component_selector(&selector.selector)
351                .unwrap_or_default()
352            {
353                new_interest = selector.interest.clone();
354                // If there are more matches, ignore them, we'll pick the first match.
355                break;
356            }
357        }
358
359        if let Some(previous_selector) = previous_selectors.iter().find(|s| {
360            self.identity.moniker.matches_component_selector(&s.selector).unwrap_or_default()
361        }) {
362            remove_interest = previous_selector.interest.clone();
363        }
364
365        let mut state = self.state.lock();
366        // Unfortunately we cannot use a match statement since `FidlInterest` doesn't derive Eq.
367        // It does derive PartialEq though. All these branches will send an interest update if the
368        // minimum interest changes after performing the required actions.
369        if new_interest == FidlInterest::default() && remove_interest != FidlInterest::default() {
370            state.erase(&remove_interest);
371        } else if new_interest != FidlInterest::default()
372            && remove_interest == FidlInterest::default()
373        {
374            state.push_interest(new_interest);
375        } else if new_interest != FidlInterest::default()
376            && remove_interest != FidlInterest::default()
377        {
378            state.erase(&remove_interest);
379            state.push_interest(new_interest);
380        } else {
381            return;
382        }
383
384        for waker in state.drain_wakers() {
385            waker.wake();
386        }
387    }
388
389    /// Resets the `Interest` for this component, notifying all active
390    /// `LogSink/WaitForInterestChange` hanging gets with the lowest interest found in the set of
391    /// requested interests for all control handles.
392    pub fn reset_interest(&self, interest_selectors: &[LogInterestSelector]) {
393        for selector in interest_selectors {
394            if self
395                .identity
396                .moniker
397                .matches_component_selector(&selector.selector)
398                .unwrap_or_default()
399            {
400                let mut state = self.state.lock();
401                state.erase(&selector.interest);
402                for waker in state.drain_wakers() {
403                    waker.wake();
404                }
405                return;
406            }
407        }
408    }
409
410    /// Returns `true` if this container corresponds to a running component, or still has pending
411    /// objects to drain.
412    pub fn is_active(&self) -> bool {
413        let state = self.state.lock();
414        state.is_initializing
415            || state.num_active_legacy_sockets > 0
416            || state.num_active_channels > 0
417            || self.buffer.is_active()
418    }
419
420    /// Called whenever there's a transition that means the component might no longer be active.
421    fn check_inactive(&self) {
422        if !self.is_active() {
423            if let Some(on_inactive) = &self.on_inactive {
424                on_inactive(self);
425            }
426        }
427    }
428
429    /// Stop accepting new messages, ensuring that pending Cursors return Poll::Ready(None) after
430    /// consuming any messages received before this call.
431    pub fn terminate(&self) {
432        self.buffer.terminate();
433    }
434
435    #[cfg(test)]
436    pub fn mark_stopped(&self) {
437        self.state.lock().is_initializing = false;
438        self.check_inactive();
439    }
440}
441
442fn maybe_add_rolled_out_error(rolled_out_messages: &mut u64, mut msg: Data<Logs>) -> Data<Logs> {
443    if *rolled_out_messages != 0 {
444        // Add rolled out metadata
445        msg.metadata
446            .errors
447            .get_or_insert(vec![])
448            .push(LogError::RolledOutLogs { count: *rolled_out_messages });
449    }
450    *rolled_out_messages = 0;
451    msg
452}
453
454impl ContainerState {
455    /// Pushes the given `interest` to the set.
456    fn push_interest(&mut self, interest: FidlInterest) {
457        if interest != FidlInterest::default() {
458            let count = self.interests.entry(interest.into()).or_insert(0);
459            *count += 1;
460        }
461    }
462
463    /// Removes the given `interest` from the set
464    fn erase(&mut self, interest: &FidlInterest) {
465        let interest = interest.clone().into();
466        if let Some(count) = self.interests.get_mut(&interest) {
467            if *count <= 1 {
468                self.interests.remove(&interest);
469            } else {
470                *count -= 1;
471            }
472        }
473    }
474
475    /// Returns a copy of the lowest interest in the set. If the set is empty, an EMPTY interest is
476    /// returned.
477    fn min_interest(&self) -> FidlInterest {
478        // btreemap: keys are sorted and ascending.
479        self.interests.keys().next().map(|i| i.0.clone()).unwrap_or_default()
480    }
481}
482
/// Newtype over [`FidlInterest`] adding the ordering (by `min_severity`) required to use
/// interests as `BTreeMap` keys in `ContainerState::interests`.
#[derive(Debug, PartialEq)]
struct Interest(FidlInterest);
485
486impl From<FidlInterest> for Interest {
487    fn from(interest: FidlInterest) -> Interest {
488        Interest(interest)
489    }
490}
491
492impl From<FidlSeverity> for Interest {
493    fn from(severity: FidlSeverity) -> Interest {
494        Interest(FidlInterest { min_severity: Some(severity), ..Default::default() })
495    }
496}
497
impl std::ops::Deref for Interest {
    type Target = FidlInterest;
    // Exposes `FidlInterest` fields (e.g. `min_severity`) directly on `Interest`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
504
// Marker impl: `FidlInterest`'s `PartialEq` behaves as a total equivalence relation here.
impl Eq for Interest {}
506
impl Ord for Interest {
    // Order by minimum severity only. `None` (empty interest) sorts before any `Some`,
    // so `ContainerState::min_interest` — which takes the first BTreeMap key — always
    // yields the least restrictive interest.
    fn cmp(&self, other: &Self) -> Ordering {
        self.min_severity.cmp(&other.min_severity)
    }
}
512
impl PartialOrd for Interest {
    // Canonical delegation to `Ord` keeps the two orderings consistent.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
518
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::shared_buffer::SharedBuffer;
    use fidl_fuchsia_diagnostics::{ComponentSelector, StringSelector};
    use fidl_fuchsia_diagnostics_types::Severity;
    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy};
    use fuchsia_async::{Task, TestExecutor};
    use fuchsia_inspect as inspect;
    use fuchsia_inspect_derive::WithInspect;
    use moniker::ExtendedMoniker;

    /// Builds a container with an optional initial interest and connects a `LogSink`
    /// client to it, returning both.
    fn initialize_container(
        severity: Option<Severity>,
        scope: fasync::ScopeHandle,
    ) -> (Arc<LogsArtifactsContainer>, LogSinkProxy) {
        let identity = Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("/foo/bar").unwrap(),
            "fuchsia-pkg://test",
        ));
        let stats = Arc::new(
            LogStreamStats::default()
                .with_inspect(inspect::component::inspector().root(), identity.moniker.to_string())
                .expect("failed to attach component log stats"),
        );
        let buffer = SharedBuffer::new(1024 * 1024, Box::new(|_| {}));
        let container = Arc::new(LogsArtifactsContainer::new(
            identity,
            std::iter::empty(),
            severity,
            Arc::clone(&stats),
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats),
            None,
        ));
        // Connect our LogSink under test and take its events channel.
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(stream, scope);
        (container, proxy)
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn update_interest() {
        // Sync path test (initial interest)
        let scope = fasync::Scope::new();
        let (container, log_sink) = initialize_container(None, scope.to_handle());

        // Get initial interest
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();

        // Async (blocking) path test.
        assert_eq!(initial_interest.min_severity, None);
        let log_sink_clone = log_sink.clone();
        let mut interest_future =
            Task::spawn(async move { log_sink_clone.wait_for_interest_change().await });

        // The call should be blocked.
        assert!(TestExecutor::poll_until_stalled(&mut interest_future).await.is_pending());

        // We should see this interest update. This should unblock the hanging get.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);

        // Verify we see the last interest we set.
        assert_eq!(interest_future.await.unwrap().unwrap().min_severity, Some(Severity::Info));
    }

    #[fuchsia::test]
    async fn initial_interest() {
        let scope = fasync::Scope::new();
        let (_container, log_sink) = initialize_container(Some(Severity::Info), scope.to_handle());
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, Some(Severity::Info));
    }

    // Renamed from `interest_serverity_semantics` (typo).
    #[fuchsia::test]
    async fn interest_severity_semantics() {
        let scope = fasync::Scope::new();
        let (container, log_sink) = initialize_container(None, scope.to_handle());
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, None);
        // Set some interest.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1)]);

        // Sending a higher interest (WARN > INFO) has no visible effect, even if the new interest
        // (WARN) will be tracked internally until reset.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Warn))].iter(), &[]);
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // Sending a lower interest (DEBUG < INFO) updates the previous one.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_severity(&log_sink, Severity::Debug).await;
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Sending the same interest leads to tracking it twice, but no updates are sent since it's
        // the same minimum interest.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_interests(
            &container,
            [(Severity::Debug, 2), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // The first reset does nothing, since the new minimum interest remains the same (we had
        // inserted twice, therefore we need to reset twice).
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // The second reset causes a change in minimum interest -> now INFO.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // If we pass a previous severity (INFO), then we undo it and set the new one (ERROR).
        // However, we get WARN since that's the minimum severity in the set.
        container.update_interest(
            [interest(&["foo", "bar"], Some(Severity::Error))].iter(),
            &[interest(&["foo", "bar"], Some(Severity::Info))],
        );
        assert_severity(&log_sink, Severity::Warn).await;
        assert_interests(&container, [(Severity::Error, 1), (Severity::Warn, 1)]);

        // When we reset warn, now we get ERROR since that's the minimum severity in the set.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Warn))]);
        assert_severity(&log_sink, Severity::Error).await;
        assert_interests(&container, [(Severity::Error, 1)]);

        // When we reset ERROR, we get back to EMPTY since we have removed all interests from the
        // set.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Error))]);
        assert_eq!(
            log_sink.wait_for_interest_change().await.unwrap().unwrap(),
            FidlInterest::default()
        );

        assert_interests(&container, []);
    }

    /// Builds a `LogInterestSelector` exactly matching `moniker` with the given severity.
    fn interest(moniker: &[&str], min_severity: Option<Severity>) -> LogInterestSelector {
        LogInterestSelector {
            selector: ComponentSelector {
                moniker_segments: Some(
                    moniker.iter().map(|s| StringSelector::ExactMatch(s.to_string())).collect(),
                ),
                ..Default::default()
            },
            interest: FidlInterest { min_severity, ..Default::default() },
        }
    }

    /// Asserts that the next reported interest change carries exactly `severity`.
    async fn assert_severity(proxy: &LogSinkProxy, severity: Severity) {
        assert_eq!(
            proxy.wait_for_interest_change().await.unwrap().unwrap().min_severity.unwrap(),
            severity
        );
    }

    /// Asserts that the container's tracked interest set is exactly `severities`
    /// (severity -> reference count).
    fn assert_interests<const N: usize>(
        container: &LogsArtifactsContainer,
        severities: [(Severity, usize); N],
    ) {
        let mut expected_map = BTreeMap::new();
        expected_map.extend(IntoIterator::into_iter(severities).map(|(s, c)| {
            let interest = FidlInterest { min_severity: Some(s), ..Default::default() };
            (interest.into(), c)
        }));
        assert_eq!(expected_map, container.state.lock().interests);
    }
}
692}