archivist_lib/logs/container.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::diagnostics::TRACE_CATEGORY;
use crate::identity::ComponentIdentity;
use crate::logs::multiplex::PinStream;
use crate::logs::shared_buffer::{self, ContainerBuffer, LazyItem};
use crate::logs::stats::LogStreamStats;
use crate::logs::stored_message::StoredMessage;
use derivative::Derivative;
use diagnostics_data::{BuilderArgs, Data, LogError, Logs, LogsData, LogsDataBuilder};
use fidl_fuchsia_diagnostics::{LogInterestSelector, StreamMode};
use fidl_fuchsia_diagnostics_types::{Interest as FidlInterest, Severity as FidlSeverity};
use fidl_fuchsia_logger::{LogSinkRequest, LogSinkRequestStream};
use fuchsia_async::condition::Condition;
use futures::future::{Fuse, FusedFuture};
use futures::prelude::*;
use futures::select;
use futures::stream::StreamExt;
use log::{debug, error};
use selectors::SelectorExt;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::pin::pin;
use std::sync::Arc;
use std::task::Poll;
use {fuchsia_async as fasync, fuchsia_trace as ftrace};

pub type OnInactive = Box<dyn Fn(&LogsArtifactsContainer) + Send + Sync>;

#[derive(Derivative)]
#[derivative(Debug)]
pub struct LogsArtifactsContainer {
    /// The source of logs in this container.
    pub identity: Arc<ComponentIdentity>,

    /// Inspect instrumentation.
    pub stats: Arc<LogStreamStats>,

    /// Buffer for all log messages.
    #[derivative(Debug = "ignore")]
    buffer: ContainerBuffer,

    /// Mutable state for the container.
    #[derivative(Debug = "ignore")]
    state: Arc<Condition<ContainerState>>,

    /// A callback invoked when the container is inactive, i.e. it has no channels, sockets, or
    /// stored logs.
    #[derivative(Debug = "ignore")]
    on_inactive: Option<OnInactive>,
}

#[derive(Debug)]
struct ContainerState {
    /// Number of LogSink channels currently being listened to for this component.
    num_active_channels: u64,

    /// Current interest for this component.
    interests: BTreeMap<Interest, usize>,

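    /// True until the component connects its first `LogSink` channel (or, in tests, is marked
    /// stopped); keeps the container considered active while the component is still starting up.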
    is_initializing: bool,
}

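/// A single item yielded by a raw cursor: the stored message, the identity of its source
/// component, and the number of messages rolled out of the buffer just before it.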
#[derive(Debug, PartialEq)]
pub struct CursorItem {
    pub rolled_out: u64,
    pub message: Arc<StoredMessage>,
    pub identity: Arc<ComponentIdentity>,
}

impl Eq for CursorItem {}

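// Ordered by message timestamp alone so that cursors over multiple containers can be merged
// into a single chronological stream; `rolled_out` and `identity` do not affect the order.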
impl Ord for CursorItem {
    fn cmp(&self, other: &Self) -> Ordering {
        self.message.timestamp().cmp(&other.message.timestamp())
    }
}

impl PartialOrd for CursorItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl LogsArtifactsContainer {
    pub fn new<'a>(
        identity: Arc<ComponentIdentity>,
        interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
        initial_interest: Option<FidlSeverity>,
        stats: Arc<LogStreamStats>,
        buffer: ContainerBuffer,
        on_inactive: Option<OnInactive>,
    ) -> Self {
        let mut interests = BTreeMap::new();
        if let Some(severity) = initial_interest {
            interests.insert(Interest::from(severity), 1);
        }
        let new = Self {
            identity,
            buffer,
            state: Arc::new(Condition::new(ContainerState {
                num_active_channels: 0,
                interests,
                is_initializing: true,
            })),
            stats,
            on_inactive,
        };

        // There are no control handles yet, so this won't notify anyone.
        new.update_interest(interest_selectors, &[]);

        new
    }

    fn create_raw_cursor(
        &self,
        buffer_cursor: shared_buffer::Cursor,
    ) -> impl Stream<Item = CursorItem> {
        let identity = Arc::clone(&self.identity);
        buffer_cursor
            .enumerate()
            .scan((zx::BootInstant::ZERO, 0u64), move |(last_timestamp, rolled_out), (i, item)| {
                futures::future::ready(match item {
                    LazyItem::Next(message) => {
                        *last_timestamp = message.timestamp();
                        Some(Some(CursorItem {
                            message,
                            identity: Arc::clone(&identity),
                            rolled_out: *rolled_out,
                        }))
                    }
                    LazyItem::ItemsRolledOut(rolled_out_count, timestamp) => {
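                        // A roll-out notice at index 0 precedes the cursor's first item; those
                        // messages were never visible to this cursor, so they aren't counted.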
                        if i > 0 {
                            *rolled_out += rolled_out_count;
                        }
                        *last_timestamp = timestamp;
                        Some(None)
                    }
                })
            })
            .filter_map(future::ready)
    }

    /// Returns a stream of this component's log messages. These are the raw messages in FXT
    /// format.
    ///
    /// # Rolled out logs
    ///
    /// When messages are evicted from our internal buffers before a client can read them, they
    /// are counted as rolled-out messages, and the count is appended to the metadata of the next
    /// message. If there is no next message, there is no way to know how many messages were
    /// rolled out.
    pub fn cursor_raw(&self, mode: StreamMode) -> PinStream<CursorItem> {
        let Some(buffer_cursor) = self.buffer.cursor(mode) else {
            return Box::pin(futures::stream::empty());
        };
        Box::pin(self.create_raw_cursor(buffer_cursor))
    }

    /// Returns a stream of this component's log messages.
    ///
    /// # Rolled out logs
    ///
    /// When messages are evicted from our internal buffers before a client can read them, they
    /// are counted as rolled-out messages, and the count is appended to the metadata of the next
    /// message. If there is no next message, there is no way to know how many messages were
    /// rolled out.
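    ///
    /// # Example
    ///
    /// A minimal sketch of draining a snapshot of the buffer (assumes an existing
    /// `container: &LogsArtifactsContainer`):
    ///
    /// ```ignore
    /// let mut cursor = container.cursor(StreamMode::Snapshot, ftrace::Id::random());
    /// while let Some(log) = cursor.next().await {
    ///     // Each item is an Arc<LogsData> carrying the parsed message and its metadata.
    ///     println!("[{}] {:?}", log.moniker, log.msg());
    /// }
    /// ```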
    pub fn cursor(
        &self,
        mode: StreamMode,
        parent_trace_id: ftrace::Id,
    ) -> PinStream<Arc<LogsData>> {
        let Some(buffer_cursor) = self.buffer.cursor(mode) else {
            return Box::pin(futures::stream::empty());
        };
        let mut rolled_out_count = 0;
        Box::pin(self.create_raw_cursor(buffer_cursor).map(
            move |CursorItem { message, identity, rolled_out }| {
                rolled_out_count += rolled_out;
                let trace_id = ftrace::Id::random();
                let _trace_guard = ftrace::async_enter!(
                    trace_id,
                    TRACE_CATEGORY,
                    c"LogContainer::cursor.parse_message",
                    // An async duration cannot have multiple concurrent child async durations,
                    // so we include the nonce as metadata to manually determine the relationship.
                    "parent_trace_id" => u64::from(parent_trace_id),
                    "trace_id" => u64::from(trace_id)
                );
                match message.parse(&identity) {
                    Ok(m) => Arc::new(maybe_add_rolled_out_error(&mut rolled_out_count, m)),
                    Err(err) => {
                        let data = maybe_add_rolled_out_error(
                            &mut rolled_out_count,
                            LogsDataBuilder::new(BuilderArgs {
                                moniker: identity.moniker.clone(),
                                timestamp: message.timestamp(),
                                component_url: Some(identity.url.clone()),
                                severity: diagnostics_data::Severity::Warn,
                            })
                            .add_error(diagnostics_data::LogError::FailedToParseRecord(format!(
                                "{err:?}"
                            )))
                            .build(),
                        );
                        Arc::new(data)
                    }
                }
            },
        ))
    }

    /// Handles the `LogSink` protocol on `stream`. The connection is drained by a task spawned
    /// on `scope` which does not complete until the channel is closed. Sockets received via
    /// `ConnectStructured` are forwarded to the container's shared buffer to be drained.
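    ///
    /// A minimal sketch of wiring up a connection (assumes an existing `container` and an async
    /// `scope`, mirroring the test helper below):
    ///
    /// ```ignore
    /// let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
    /// container.handle_log_sink(stream, scope.to_handle());
    /// ```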
    pub fn handle_log_sink(
        self: &Arc<Self>,
        stream: LogSinkRequestStream,
        scope: fasync::ScopeHandle,
    ) {
        {
            let mut guard = self.state.lock();
            guard.num_active_channels += 1;
            guard.is_initializing = false;
        }
        scope.spawn(Arc::clone(self).actually_handle_log_sink(stream));
    }

    /// This function does not return until the channel is closed.
    async fn actually_handle_log_sink(self: Arc<Self>, mut stream: LogSinkRequestStream) {
        let mut previous_interest_sent = None;
        debug!(identity:% = self.identity; "Draining LogSink channel.");

        let mut hanging_gets = Vec::new();
        let mut interest_changed = pin!(Fuse::terminated());

        loop {
            select! {
                next = stream.next() => {
                    let Some(next) = next else { break };
                    match next {
                        Ok(LogSinkRequest::Connect { .. }) => {
                            error!("Received unexpected Connect message from {:?}", self.identity);
                            return;
                        }
                        Ok(LogSinkRequest::ConnectStructured { socket, .. }) => {
                            self.buffer.add_socket(socket);
                        }
                        Ok(LogSinkRequest::WaitForInterestChange { responder }) => {
                            // If the interest has changed since we last reported it, we'll report
                            // it below.
                            hanging_gets.push(responder);
                        }
                        Err(e) => error!(identity:% = self.identity, e:%; "error handling log sink"),
                        Ok(LogSinkRequest::_UnknownMethod { .. }) => {}
                    }
                }
                _ = interest_changed => {}
            }

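            // Drain pending hanging gets only when the minimum interest differs from the last
            // value we sent; otherwise arm `interest_changed` to wake us on the next change.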
            if !hanging_gets.is_empty() {
                let min_interest = self.state.lock().min_interest();
                if Some(&min_interest) != previous_interest_sent.as_ref() {
                    // Send updates to all outstanding hanging gets.
                    for responder in hanging_gets.drain(..) {
                        let _ = responder.send(Ok(&min_interest));
                    }
                    interest_changed.set(Fuse::terminated());
                    previous_interest_sent = Some(min_interest);
                } else if interest_changed.is_terminated() {
                    // Set ourselves up to be woken when the interest changes.
                    let previous_interest_sent = previous_interest_sent.clone();
                    interest_changed.set(
                        self.state
                            .when(move |state| {
                                if previous_interest_sent != Some(state.min_interest()) {
                                    Poll::Ready(())
                                } else {
                                    Poll::Pending
                                }
                            })
                            .fuse(),
                    );
                }
            }
        }

        debug!(identity:% = self.identity; "LogSink channel closed.");
        self.state.lock().num_active_channels -= 1;
        self.check_inactive();
    }

    /// Updates log stats in Inspect and pushes the message onto the container's buffer.
    pub fn ingest_message(&self, message: StoredMessage) {
        self.stats.ingest_message(message.size(), message.severity());
        self.buffer.push_back(message.bytes());
    }

    /// Sets the `Interest` for this component, notifying all active
    /// `LogSink/WaitForInterestChange` hanging gets with the new interest if it is a change from
    /// the previous interest. For any match that is also contained in `previous_selectors`, the
    /// previous values are removed from the set of interests.
    pub fn update_interest<'a>(
        &self,
        interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
        previous_selectors: &[LogInterestSelector],
    ) {
        let mut new_interest = FidlInterest::default();
        let mut remove_interest = FidlInterest::default();
        for selector in interest_selectors {
            if self
                .identity
                .moniker
                .matches_component_selector(&selector.selector)
                .unwrap_or_default()
            {
                new_interest = selector.interest.clone();
                // If there are more matches, ignore them; we pick the first match.
                break;
            }
        }

        if let Some(previous_selector) = previous_selectors.iter().find(|s| {
            self.identity.moniker.matches_component_selector(&s.selector).unwrap_or_default()
        }) {
            remove_interest = previous_selector.interest.clone();
        }

        let mut state = self.state.lock();
        // Unfortunately we cannot use a match statement since `FidlInterest` doesn't derive Eq.
        // It does derive PartialEq though. All these branches will send an interest update if the
        // minimum interest changes after performing the required actions.
        if new_interest == FidlInterest::default() && remove_interest != FidlInterest::default() {
            state.erase(&remove_interest);
        } else if new_interest != FidlInterest::default()
            && remove_interest == FidlInterest::default()
        {
            state.push_interest(new_interest);
        } else if new_interest != FidlInterest::default()
            && remove_interest != FidlInterest::default()
        {
            state.erase(&remove_interest);
            state.push_interest(new_interest);
        } else {
            return;
        }

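        // Wake any tasks parked in `Condition::when` (e.g. pending `WaitForInterestChange`
        // responses) so they can observe the new minimum interest.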
        for waker in state.drain_wakers() {
            waker.wake();
        }
    }

    /// Resets the `Interest` for this component, notifying all active
    /// `LogSink/WaitForInterestChange` hanging gets with the lowest interest found in the set of
    /// requested interests for all control handles.
    pub fn reset_interest(&self, interest_selectors: &[LogInterestSelector]) {
        for selector in interest_selectors {
            if self
                .identity
                .moniker
                .matches_component_selector(&selector.selector)
                .unwrap_or_default()
            {
                let mut state = self.state.lock();
                state.erase(&selector.interest);
                for waker in state.drain_wakers() {
                    waker.wake();
                }
                return;
            }
        }
    }

    /// Returns `true` if this container corresponds to a running component, or still has pending
    /// objects to drain.
    pub fn is_active(&self) -> bool {
        let state = self.state.lock();
        state.is_initializing || state.num_active_channels > 0 || self.buffer.is_active()
    }

    /// Called whenever there's a transition that means the component might no longer be active.
    fn check_inactive(&self) {
        if !self.is_active() {
            if let Some(on_inactive) = &self.on_inactive {
                on_inactive(self);
            }
        }
    }

    /// Stop accepting new messages, ensuring that pending Cursors return Poll::Ready(None) after
    /// consuming any messages received before this call.
    pub fn terminate(&self) {
        self.buffer.terminate();
    }

    #[cfg(test)]
    pub fn mark_stopped(&self) {
        self.state.lock().is_initializing = false;
        self.check_inactive();
    }
}

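/// Attaches a `RolledOutLogs` error to `msg` if any messages were rolled out since the last
/// emitted message, then resets the counter.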
fn maybe_add_rolled_out_error(rolled_out_messages: &mut u64, mut msg: Data<Logs>) -> Data<Logs> {
    if *rolled_out_messages != 0 {
        // Add rolled out metadata
        msg.metadata
            .errors
            .get_or_insert(vec![])
            .push(LogError::RolledOutLogs { count: *rolled_out_messages });
    }
    *rolled_out_messages = 0;
    msg
}


impl ContainerState {
    /// Pushes the given `interest` to the set.
    fn push_interest(&mut self, interest: FidlInterest) {
        if interest != FidlInterest::default() {
            let count = self.interests.entry(interest.into()).or_insert(0);
            *count += 1;
        }
    }

    /// Removes the given `interest` from the set.
    fn erase(&mut self, interest: &FidlInterest) {
        let interest = interest.clone().into();
        if let Some(count) = self.interests.get_mut(&interest) {
            if *count <= 1 {
                self.interests.remove(&interest);
            } else {
                *count -= 1;
            }
        }
    }

    /// Returns a copy of the lowest interest in the set. If the set is empty, a default (empty)
    /// interest is returned.
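    /// For example, if interests with minimum severities INFO and WARN are both tracked, the
    /// INFO interest is returned, since `Interest` orders by `min_severity`.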
    fn min_interest(&self) -> FidlInterest {
        // BTreeMap keys are sorted in ascending order, so the first key is the minimum.
        self.interests.keys().next().map(|i| i.0.clone()).unwrap_or_default()
    }
}

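/// Wrapper around `FidlInterest` that supplies the `Eq` and `Ord` implementations (keyed on
/// `min_severity`) needed to store interests in a `BTreeMap`.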
#[derive(Debug, PartialEq)]
struct Interest(FidlInterest);

impl From<FidlInterest> for Interest {
    fn from(interest: FidlInterest) -> Interest {
        Interest(interest)
    }
}

impl From<FidlSeverity> for Interest {
    fn from(severity: FidlSeverity) -> Interest {
        Interest(FidlInterest { min_severity: Some(severity), ..Default::default() })
    }
}

impl std::ops::Deref for Interest {
    type Target = FidlInterest;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Eq for Interest {}

impl Ord for Interest {
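    // `Option`'s ordering places `None` before `Some(_)`, so an interest without a minimum
    // severity sorts lowest.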
    fn cmp(&self, other: &Self) -> Ordering {
        self.min_severity.cmp(&other.min_severity)
    }
}

impl PartialOrd for Interest {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::shared_buffer::SharedBuffer;
    use fidl_fuchsia_diagnostics::{ComponentSelector, StringSelector};
    use fidl_fuchsia_diagnostics_types::Severity;
    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy};
    use fuchsia_async::{Task, TestExecutor};
    use fuchsia_inspect as inspect;
    use fuchsia_inspect_derive::WithInspect;
    use moniker::ExtendedMoniker;

    fn initialize_container(
        severity: Option<Severity>,
        scope: fasync::ScopeHandle,
    ) -> (Arc<LogsArtifactsContainer>, LogSinkProxy) {
        let identity = Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("/foo/bar").unwrap(),
            "fuchsia-pkg://test",
        ));
        let stats = Arc::new(
            LogStreamStats::default()
                .with_inspect(inspect::component::inspector().root(), identity.moniker.to_string())
                .expect("failed to attach component log stats"),
        );
        let buffer = SharedBuffer::new(1024 * 1024, Box::new(|_| {}));
        let container = Arc::new(LogsArtifactsContainer::new(
            identity,
            std::iter::empty(),
            severity,
            Arc::clone(&stats),
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats),
            None,
        ));
        // Connect our LogSink under test and take its proxy and request stream.
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        container.handle_log_sink(stream, scope);
        (container, proxy)
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn update_interest() {
        // Sync path test (initial interest).
        let scope = fasync::Scope::new();
        let (container, log_sink) = initialize_container(None, scope.to_handle());

        // Get the initial interest.
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, None);

        // Async (blocking) path test.
        let log_sink_clone = log_sink.clone();
        let mut interest_future =
            Task::spawn(async move { log_sink_clone.wait_for_interest_change().await });

        // The call should be blocked.
        assert!(TestExecutor::poll_until_stalled(&mut interest_future).await.is_pending());

        // We should see this interest update. This should unblock the hanging get.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);

        // Verify we see the last interest we set.
        assert_eq!(interest_future.await.unwrap().unwrap().min_severity, Some(Severity::Info));
    }

    #[fuchsia::test]
    async fn initial_interest() {
        let scope = fasync::Scope::new();
        let (_container, log_sink) = initialize_container(Some(Severity::Info), scope.to_handle());
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, Some(Severity::Info));
    }

    #[fuchsia::test]
    async fn interest_severity_semantics() {
        let scope = fasync::Scope::new();
        let (container, log_sink) = initialize_container(None, scope.to_handle());
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, None);
        // Set some interest.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1)]);

        // Sending a higher interest (WARN > INFO) has no visible effect, although the new
        // interest (WARN) is tracked internally until reset.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Warn))].iter(), &[]);
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // Sending a lower interest (DEBUG < INFO) updates the previous one.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_severity(&log_sink, Severity::Debug).await;
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Sending the same interest leads to tracking it twice, but no updates are sent since
        // it's the same minimum interest.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_interests(
            &container,
            [(Severity::Debug, 2), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // The first reset does nothing, since the new minimum interest remains the same (we had
        // inserted twice, therefore we need to reset twice).
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // The second reset causes a change in minimum interest -> now INFO.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // If we pass a previous severity (INFO), then we undo it and set the new one (ERROR).
        // However, we get WARN since that's the minimum severity in the set.
        container.update_interest(
            [interest(&["foo", "bar"], Some(Severity::Error))].iter(),
            &[interest(&["foo", "bar"], Some(Severity::Info))],
        );
        assert_severity(&log_sink, Severity::Warn).await;
        assert_interests(&container, [(Severity::Error, 1), (Severity::Warn, 1)]);

        // When we reset WARN, we get ERROR since that's the minimum severity left in the set.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Warn))]);
        assert_severity(&log_sink, Severity::Error).await;
        assert_interests(&container, [(Severity::Error, 1)]);

        // When we reset ERROR, we get back to an empty interest since we have removed all
        // interests from the set.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Error))]);
        assert_eq!(
            log_sink.wait_for_interest_change().await.unwrap().unwrap(),
            FidlInterest::default()
        );

        assert_interests(&container, []);
    }

    fn interest(moniker: &[&str], min_severity: Option<Severity>) -> LogInterestSelector {
        LogInterestSelector {
            selector: ComponentSelector {
                moniker_segments: Some(
                    moniker.iter().map(|s| StringSelector::ExactMatch(s.to_string())).collect(),
                ),
                ..Default::default()
            },
            interest: FidlInterest { min_severity, ..Default::default() },
        }
    }

    async fn assert_severity(proxy: &LogSinkProxy, severity: Severity) {
        assert_eq!(
            proxy.wait_for_interest_change().await.unwrap().unwrap().min_severity.unwrap(),
            severity
        );
    }

    fn assert_interests<const N: usize>(
        container: &LogsArtifactsContainer,
        severities: [(Severity, usize); N],
    ) {
        let mut expected_map = BTreeMap::new();
        expected_map.extend(IntoIterator::into_iter(severities).map(|(s, c)| {
            let interest = FidlInterest { min_severity: Some(s), ..Default::default() };
            (interest.into(), c)
        }));
        assert_eq!(expected_map, container.state.lock().interests);
    }
}