archivist_lib/logs/container.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::diagnostics::TRACE_CATEGORY;
use crate::identity::ComponentIdentity;
use crate::logs::multiplex::PinStream;
use crate::logs::shared_buffer::{self, ContainerBuffer, LazyItem};
use crate::logs::stats::LogStreamStats;
use crate::logs::stored_message::StoredMessage;
use derivative::Derivative;
use diagnostics_data::{BuilderArgs, Data, LogError, Logs, LogsData, LogsDataBuilder};
use fidl::endpoints::RequestStream;
use fidl_fuchsia_diagnostics::{LogInterestSelector, StreamMode};
use fidl_fuchsia_diagnostics_types::{Interest as FidlInterest, Severity as FidlSeverity};
use fidl_fuchsia_logger::{LogSinkOnInitRequest, LogSinkRequest, LogSinkRequestStream};
use fuchsia_async::condition::Condition;
use futures::future::{Fuse, FusedFuture};
use futures::prelude::*;
use futures::select;
use futures::stream::StreamExt;
use log::{debug, error};
use selectors::SelectorExt;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::pin::pin;
use std::sync::Arc;
use std::task::Poll;
use {fuchsia_async as fasync, fuchsia_trace as ftrace};

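/// Callback invoked when a container transitions to inactive, i.e. it has no channels,
/// sockets, or stored logs remaining.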
pub type OnInactive = Box<dyn Fn(&LogsArtifactsContainer) + Send + Sync>;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct LogsArtifactsContainer {
    /// The source of logs in this container.
    pub identity: Arc<ComponentIdentity>,

    /// Inspect instrumentation.
    pub stats: Arc<LogStreamStats>,

    /// Buffer for all log messages.
    #[derivative(Debug = "ignore")]
    buffer: ContainerBuffer,

    /// Mutable state for the container.
    #[derivative(Debug = "ignore")]
    state: Arc<Condition<ContainerState>>,

    /// A callback invoked when the container becomes inactive, i.e. it has no channels,
    /// sockets, or stored logs.
    #[derivative(Debug = "ignore")]
    on_inactive: Option<OnInactive>,
}

#[derive(Debug)]
struct ContainerState {
    /// Number of LogSink channels currently being listened to for this component.
    num_active_channels: u64,

    /// Current interest for this component.
    interests: BTreeMap<Interest, usize>,

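    /// Whether the container is still initializing, i.e. no `LogSink` connection has been
    /// handled yet. While this is set, the container is considered active.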
    is_initializing: bool,
}

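/// An item yielded by the raw log cursor: a stored message, the identity of the component
/// that produced it, and the number of messages rolled out of the buffer since the
/// previous item.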
#[derive(Debug, PartialEq)]
pub struct CursorItem {
    pub rolled_out: u64,
    pub message: Arc<StoredMessage>,
    pub identity: Arc<ComponentIdentity>,
}

impl Eq for CursorItem {}

impl Ord for CursorItem {
    fn cmp(&self, other: &Self) -> Ordering {
        self.message.timestamp().cmp(&other.message.timestamp())
    }
}

impl PartialOrd for CursorItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl LogsArtifactsContainer {
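    /// Creates a new container for logs from `identity`. The interest set is seeded with
    /// `initial_interest` (if any) and with the first of `interest_selectors` that matches
    /// the component's moniker.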
    pub fn new<'a>(
        identity: Arc<ComponentIdentity>,
        interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
        initial_interest: Option<FidlSeverity>,
        stats: Arc<LogStreamStats>,
        buffer: ContainerBuffer,
        on_inactive: Option<OnInactive>,
    ) -> Self {
        let mut interests = BTreeMap::new();
        if let Some(severity) = initial_interest {
            interests.insert(Interest::from(severity), 1);
        }
        let new = Self {
            identity,
            buffer,
            state: Arc::new(Condition::new(ContainerState {
                num_active_channels: 0,
                interests,
                is_initializing: true,
            })),
            stats,
            on_inactive,
        };

        // There are no control handles yet, so this won't notify anyone.
        new.update_interest(interest_selectors, &[]);

        new
    }

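    /// Converts a cursor over the shared buffer into a stream of `CursorItem`s,
    /// accumulating rolled-out message counts so they can be attached to the next
    /// yielded item.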
    fn create_raw_cursor(
        &self,
        buffer_cursor: shared_buffer::Cursor,
    ) -> impl Stream<Item = CursorItem> {
        let identity = Arc::clone(&self.identity);
        buffer_cursor
            .enumerate()
            .scan((zx::BootInstant::ZERO, 0u64), move |(last_timestamp, rolled_out), (i, item)| {
                futures::future::ready(match item {
                    LazyItem::Next(message) => {
                        *last_timestamp = message.timestamp();
                        Some(Some(CursorItem {
                            message,
                            identity: Arc::clone(&identity),
                            rolled_out: *rolled_out,
                        }))
                    }
                    LazyItem::ItemsRolledOut(rolled_out_count, timestamp) => {
                        if i > 0 {
                            *rolled_out += rolled_out_count;
                        }
                        *last_timestamp = timestamp;
                        Some(None)
                    }
                })
            })
            .filter_map(future::ready)
    }

    /// Returns a stream of this component's log messages. These are the raw messages in FXT format.
    ///
    /// # Rolled out logs
    ///
    /// When messages are evicted from our internal buffers before a client can read them, they
    /// are counted as rolled-out messages, and that count is attached to the metadata of the
    /// next message. If there is no next message, there is no way to know how many messages
    /// were rolled out.
    pub fn cursor_raw(&self, mode: StreamMode) -> PinStream<CursorItem> {
        let Some(buffer_cursor) = self.buffer.cursor(mode) else {
            return Box::pin(futures::stream::empty());
        };
        Box::pin(self.create_raw_cursor(buffer_cursor))
    }

    /// Returns a stream of this component's log messages.
    ///
    /// # Rolled out logs
    ///
    /// When messages are evicted from our internal buffers before a client can read them, they
    /// are counted as rolled-out messages, and that count is attached to the metadata of the
    /// next message. If there is no next message, there is no way to know how many messages
    /// were rolled out.
    pub fn cursor(
        &self,
        mode: StreamMode,
        parent_trace_id: ftrace::Id,
    ) -> PinStream<Arc<LogsData>> {
        let Some(buffer_cursor) = self.buffer.cursor(mode) else {
            return Box::pin(futures::stream::empty());
        };
        let mut rolled_out_count = 0;
        Box::pin(self.create_raw_cursor(buffer_cursor).map(
            move |CursorItem { message, identity, rolled_out }| {
                rolled_out_count += rolled_out;
                let trace_id = ftrace::Id::random();
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"LogContainer::cursor.parse_message",
                    // An async duration cannot have multiple concurrent child async durations,
                    // so we include the nonce as metadata to determine the relationship manually.
                    "parent_trace_id" => u64::from(parent_trace_id),
                    "trace_id" => u64::from(trace_id)
                );
                match message.parse(&identity) {
                    Ok(m) => Arc::new(maybe_add_rolled_out_error(&mut rolled_out_count, m)),
                    Err(err) => {
                        let data = maybe_add_rolled_out_error(
                            &mut rolled_out_count,
                            LogsDataBuilder::new(BuilderArgs {
                                moniker: identity.moniker.clone(),
                                timestamp: message.timestamp(),
                                component_url: Some(identity.url.clone()),
                                severity: diagnostics_data::Severity::Warn,
                            })
                            .add_error(diagnostics_data::LogError::FailedToParseRecord(format!(
                                "{err:?}"
                            )))
                            .build(),
                        );
                        Arc::new(data)
                    }
                }
            },
        ))
    }

    /// Handles the `LogSink` protocol on `stream`. An `OnInit` event carrying the container's
    /// buffer and the current minimum interest is sent first; if that fails, the connection is
    /// dropped. The request stream is then drained by a task spawned on `scope`, which does not
    /// complete until the channel is closed. Sockets received from the client are forwarded to
    /// the container's buffer.
    pub fn handle_log_sink(
        self: &Arc<Self>,
        stream: LogSinkRequestStream,
        scope: fasync::ScopeHandle,
    ) {
        if stream
            .control_handle()
            .send_on_init(LogSinkOnInitRequest {
                buffer: Some(self.buffer.iob()),
                interest: Some(self.state.lock().min_interest()),
                ..Default::default()
            })
            .is_err()
        {
            return;
        }

        {
            let mut guard = self.state.lock();
            guard.num_active_channels += 1;
            guard.is_initializing = false;
        }
        scope.spawn(Arc::clone(self).actually_handle_log_sink(stream));
    }

    /// This function does not return until the channel is closed.
    async fn actually_handle_log_sink(self: Arc<Self>, mut stream: LogSinkRequestStream) {
        let mut previous_interest_sent = None;
        debug!(identity:% = self.identity; "Draining LogSink channel.");

        let mut hanging_gets = Vec::new();
        let mut interest_changed = pin!(Fuse::terminated());

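        // Process requests until the channel closes. The loop also wakes when the minimum
        // interest changes so that outstanding hanging gets can be completed.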
        loop {
            select! {
                next = stream.next() => {
                    let Some(next) = next else { break };
                    match next {
                        Ok(LogSinkRequest::ConnectStructured { socket, .. }) => {
                            self.buffer.add_socket(socket);
                        }
                        Ok(LogSinkRequest::WaitForInterestChange { responder }) => {
                            // If the interest has changed since we last reported it, we'll report
                            // it below.
                            hanging_gets.push(responder);
                        }
                        Err(e) => error!(identity:% = self.identity, e:%; "error handling log sink"),
                        Ok(LogSinkRequest::_UnknownMethod { .. }) => {}
                    }
                }
                _ = interest_changed => {}
            }

            if !hanging_gets.is_empty() {
                let min_interest = self.state.lock().min_interest();
                if Some(&min_interest) != previous_interest_sent.as_ref() {
                    // Send updates to all outstanding hanging gets.
                    for responder in hanging_gets.drain(..) {
                        let _ = responder.send(Ok(&min_interest));
                    }
                    interest_changed.set(Fuse::terminated());
                    previous_interest_sent = Some(min_interest);
                } else if interest_changed.is_terminated() {
                    // Set ourselves up to be woken when the interest changes.
                    let previous_interest_sent = previous_interest_sent.clone();
                    interest_changed.set(
                        self.state
                            .when(move |state| {
                                if previous_interest_sent != Some(state.min_interest()) {
                                    Poll::Ready(())
                                } else {
                                    Poll::Pending
                                }
                            })
                            .fuse(),
                    );
                }
            }
        }

        debug!(identity:% = self.identity; "LogSink channel closed.");
        self.state.lock().num_active_channels -= 1;
        self.check_inactive();
    }

    /// Updates log stats in Inspect and pushes the message onto the container's buffer.
    pub fn ingest_message(&self, message: StoredMessage) {
        self.buffer.push_back(message.bytes());
    }

    /// Sets the `Interest` for this component, notifying all active
    /// `LogSink/WaitForInterestChange` hanging gets with the new interest if it differs from
    /// the previous interest. For any match that is also contained in `previous_selectors`,
    /// the previous values are removed from the set of interests.
    pub fn update_interest<'a>(
        &self,
        interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
        previous_selectors: &[LogInterestSelector],
    ) {
        let mut new_interest = FidlInterest::default();
        let mut remove_interest = FidlInterest::default();
        for selector in interest_selectors {
            if self
                .identity
                .moniker
                .matches_component_selector(&selector.selector)
                .unwrap_or_default()
            {
                new_interest = selector.interest.clone();
                // If there are more matches, ignore them; we pick the first match.
                break;
            }
        }

        if let Some(previous_selector) = previous_selectors.iter().find(|s| {
            self.identity.moniker.matches_component_selector(&s.selector).unwrap_or_default()
        }) {
            remove_interest = previous_selector.interest.clone();
        }

        let mut state = self.state.lock();
        // Unfortunately we cannot use a match statement since `FidlInterest` doesn't derive Eq.
        // It does derive PartialEq though. All these branches will send an interest update if the
        // minimum interest changes after performing the required actions.
        if new_interest == FidlInterest::default() && remove_interest != FidlInterest::default() {
            state.erase(&remove_interest);
        } else if new_interest != FidlInterest::default()
            && remove_interest == FidlInterest::default()
        {
            state.push_interest(new_interest);
        } else if new_interest != FidlInterest::default()
            && remove_interest != FidlInterest::default()
        {
            state.erase(&remove_interest);
            state.push_interest(new_interest);
        } else {
            return;
        }

        for waker in state.drain_wakers() {
            waker.wake();
        }
    }

    /// Resets the `Interest` for this component, notifying all active
    /// `LogSink/WaitForInterestChange` hanging gets with the lowest interest found in the set of
    /// requested interests for all control handles.
    pub fn reset_interest(&self, interest_selectors: &[LogInterestSelector]) {
        for selector in interest_selectors {
            if self
                .identity
                .moniker
                .matches_component_selector(&selector.selector)
                .unwrap_or_default()
            {
                let mut state = self.state.lock();
                state.erase(&selector.interest);
                for waker in state.drain_wakers() {
                    waker.wake();
                }
                return;
            }
        }
    }

    /// Returns `true` if this container corresponds to a running component, or still has pending
    /// objects to drain.
    pub fn is_active(&self) -> bool {
        let state = self.state.lock();
        state.is_initializing || state.num_active_channels > 0 || self.buffer.is_active()
    }

    /// Called whenever there's a transition that means the component might no longer be active.
    fn check_inactive(&self) {
        if !self.is_active() {
            if let Some(on_inactive) = &self.on_inactive {
                on_inactive(self);
            }
        }
    }

    /// Stops accepting new messages, ensuring that pending cursors return `Poll::Ready(None)`
    /// after consuming any messages received before this call.
    pub fn terminate(&self) {
        self.buffer.terminate();
    }

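    /// Test-only helper that clears the initializing flag and runs the inactivity check.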
    #[cfg(test)]
    pub fn mark_stopped(&self) {
        self.state.lock().is_initializing = false;
        self.check_inactive();
    }

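    /// Returns a reference to the underlying container buffer.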
    pub fn buffer(&self) -> &ContainerBuffer {
        &self.buffer
    }
}

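/// If any messages were rolled out since the last emitted message, attaches a
/// `RolledOutLogs` error to `msg` and resets the counter.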
fn maybe_add_rolled_out_error(rolled_out_messages: &mut u64, mut msg: Data<Logs>) -> Data<Logs> {
    if *rolled_out_messages != 0 {
        // Add rolled out metadata
        msg.metadata
            .errors
            .get_or_insert(vec![])
            .push(LogError::RolledOutLogs { count: *rolled_out_messages });
    }
    *rolled_out_messages = 0;
    msg
}

impl ContainerState {
    /// Pushes the given `interest` to the set.
    fn push_interest(&mut self, interest: FidlInterest) {
        if interest != FidlInterest::default() {
            let count = self.interests.entry(interest.into()).or_insert(0);
            *count += 1;
        }
    }

    /// Removes the given `interest` from the set.
    fn erase(&mut self, interest: &FidlInterest) {
        let interest = interest.clone().into();
        if let Some(count) = self.interests.get_mut(&interest) {
            if *count <= 1 {
                self.interests.remove(&interest);
            } else {
                *count -= 1;
            }
        }
    }

    /// Returns a copy of the lowest interest in the set. If the set is empty, an EMPTY interest
    /// is returned.
    fn min_interest(&self) -> FidlInterest {
        // BTreeMap keys are sorted in ascending order, so the first key is the minimum.
        self.interests.keys().next().map(|i| i.0.clone()).unwrap_or_default()
    }
}

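/// Newtype wrapper around `FidlInterest` that supplies the `Eq`/`Ord` implementations
/// (ordered by minimum severity) required to use it as a `BTreeMap` key.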
#[derive(Debug, PartialEq)]
struct Interest(FidlInterest);

impl From<FidlInterest> for Interest {
    fn from(interest: FidlInterest) -> Interest {
        Interest(interest)
    }
}

impl From<FidlSeverity> for Interest {
    fn from(severity: FidlSeverity) -> Interest {
        Interest(FidlInterest { min_severity: Some(severity), ..Default::default() })
    }
}

impl std::ops::Deref for Interest {
    type Target = FidlInterest;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Eq for Interest {}

impl Ord for Interest {
    fn cmp(&self, other: &Self) -> Ordering {
        self.min_severity.cmp(&other.min_severity)
    }
}

impl PartialOrd for Interest {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::shared_buffer::{SharedBuffer, create_ring_buffer};
    use fidl_fuchsia_diagnostics::{ComponentSelector, StringSelector};
    use fidl_fuchsia_diagnostics_types::Severity;
    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy};
    use fuchsia_async::{Task, TestExecutor};
    use fuchsia_inspect as inspect;
    use fuchsia_inspect_derive::WithInspect;
    use moniker::ExtendedMoniker;

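    // Builds a container backed by a 1 MiB shared buffer, connects a LogSink client to it,
    // and returns both ends for the tests below.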
    fn initialize_container(
        severity: Option<Severity>,
        scope: fasync::ScopeHandle,
    ) -> (Arc<LogsArtifactsContainer>, LogSinkProxy) {
        let identity = Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("/foo/bar").unwrap(),
            "fuchsia-pkg://test",
        ));
        let stats = Arc::new(
            LogStreamStats::default()
                .with_inspect(inspect::component::inspector().root(), identity.moniker.to_string())
                .expect("failed to attach component log stats"),
        );
        let buffer = SharedBuffer::new(
            create_ring_buffer(1024 * 1024),
            Box::new(|_| {}),
            Default::default(),
        );
        let container = Arc::new(LogsArtifactsContainer::new(
            identity,
            std::iter::empty(),
            severity,
            Arc::clone(&stats),
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats),
            None,
        ));
        // Connect our LogSink under test and take its events channel.
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(stream, scope);
        (container, proxy)
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn update_interest() {
        // Synchronous path test: the initial interest is reported immediately.
        let scope = fasync::Scope::new();
        let (container, log_sink) = initialize_container(None, scope.to_handle());

        // Get the initial interest.
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, None);

        // Asynchronous (blocking) path test.
        let log_sink_clone = log_sink.clone();
        let mut interest_future =
            Task::spawn(async move { log_sink_clone.wait_for_interest_change().await });

        // The call should be blocked.
        assert!(TestExecutor::poll_until_stalled(&mut interest_future).await.is_pending());

        // We should see this interest update. This should unblock the hanging get.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);

        // Verify we see the last interest we set.
        assert_eq!(interest_future.await.unwrap().unwrap().min_severity, Some(Severity::Info));
    }

    #[fuchsia::test]
    async fn initial_interest() {
        let scope = fasync::Scope::new();
        let (_container, log_sink) = initialize_container(Some(Severity::Info), scope.to_handle());
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, Some(Severity::Info));
    }

    #[fuchsia::test]
    async fn interest_severity_semantics() {
        let scope = fasync::Scope::new();
        let (container, log_sink) = initialize_container(None, scope.to_handle());
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, None);
        // Set some interest.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1)]);

        // Sending a higher interest (WARN > INFO) has no visible effect, although the new
        // interest (WARN) is tracked internally until reset.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Warn))].iter(), &[]);
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // Sending a lower interest (DEBUG < INFO) updates the previous one.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_severity(&log_sink, Severity::Debug).await;
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Sending the same interest tracks it twice, but no update is sent since the minimum
        // interest is unchanged.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_interests(
            &container,
            [(Severity::Debug, 2), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // The first reset does nothing, since the minimum interest remains the same (we
        // inserted DEBUG twice, so we must reset it twice).
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // The second reset causes a change in minimum interest, which is now INFO.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // If we pass a previous severity (INFO), then we undo it and set the new one (ERROR).
        // However, we observe WARN since that's now the minimum severity in the set.
        container.update_interest(
            [interest(&["foo", "bar"], Some(Severity::Error))].iter(),
            &[interest(&["foo", "bar"], Some(Severity::Info))],
        );
        assert_severity(&log_sink, Severity::Warn).await;
        assert_interests(&container, [(Severity::Error, 1), (Severity::Warn, 1)]);

        // When we reset WARN, we observe ERROR since that's the minimum severity left in the set.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Warn))]);
        assert_severity(&log_sink, Severity::Error).await;
        assert_interests(&container, [(Severity::Error, 1)]);

        // When we reset ERROR, we get back to EMPTY since we have removed all interests from
        // the set.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Error))]);
        assert_eq!(
            log_sink.wait_for_interest_change().await.unwrap().unwrap(),
            FidlInterest::default()
        );

        assert_interests(&container, []);
    }

    fn interest(moniker: &[&str], min_severity: Option<Severity>) -> LogInterestSelector {
        LogInterestSelector {
            selector: ComponentSelector {
                moniker_segments: Some(
                    moniker.iter().map(|s| StringSelector::ExactMatch(s.to_string())).collect(),
                ),
                ..Default::default()
            },
            interest: FidlInterest { min_severity, ..Default::default() },
        }
    }

    async fn assert_severity(proxy: &LogSinkProxy, severity: Severity) {
        assert_eq!(
            proxy.wait_for_interest_change().await.unwrap().unwrap().min_severity.unwrap(),
            severity
        );
    }

    fn assert_interests<const N: usize>(
        container: &LogsArtifactsContainer,
        severities: [(Severity, usize); N],
    ) {
        let mut expected_map = BTreeMap::new();
        expected_map.extend(IntoIterator::into_iter(severities).map(|(s, c)| {
            let interest = FidlInterest { min_severity: Some(s), ..Default::default() };
            (interest.into(), c)
        }));
        assert_eq!(expected_map, container.state.lock().interests);
    }
}