1use crate::diagnostics::TRACE_CATEGORY;
6use crate::identity::ComponentIdentity;
7use crate::logs::multiplex::PinStream;
8use crate::logs::shared_buffer::{self, ContainerBuffer, LazyItem};
9use crate::logs::stats::LogStreamStats;
10use crate::logs::stored_message::StoredMessage;
11use derivative::Derivative;
12use diagnostics_data::{BuilderArgs, Data, LogError, Logs, LogsData, LogsDataBuilder};
13use fidl_fuchsia_diagnostics::{LogInterestSelector, StreamMode};
14use fidl_fuchsia_diagnostics_types::{Interest as FidlInterest, Severity as FidlSeverity};
15use fidl_fuchsia_logger::{LogSinkRequest, LogSinkRequestStream};
16use fuchsia_async::condition::Condition;
17use futures::future::{Fuse, FusedFuture};
18use futures::prelude::*;
19use futures::select;
20use futures::stream::StreamExt;
21use log::{debug, error};
22use selectors::SelectorExt;
23use std::cmp::Ordering;
24use std::collections::BTreeMap;
25use std::pin::pin;
26use std::sync::Arc;
27use std::task::Poll;
28use {fuchsia_async as fasync, fuchsia_trace as ftrace};
29
/// Callback type stored by a [`LogsArtifactsContainer`]. `check_inactive` calls it with the
/// container itself whenever `is_active()` reports `false`.
pub type OnInactive = Box<dyn Fn(&LogsArtifactsContainer) + Send + Sync>;
31
/// Per-component log state: the component's identity, its ingestion statistics, a handle on
/// the buffer that stores its messages, and the interest/liveness bookkeeping.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct LogsArtifactsContainer {
    /// Identity of the component. Its moniker is matched against interest selectors and the
    /// identity is attached to every item produced by the cursors.
    pub identity: Arc<ComponentIdentity>,

    /// Statistics updated with the size and severity of each message passed to
    /// `ingest_message`.
    pub stats: Arc<LogStreamStats>,

    /// Buffer handle: messages are pushed into it, LogSink sockets are registered with it and
    /// cursors are created from it.
    #[derivative(Debug = "ignore")]
    buffer: ContainerBuffer,

    /// Mutable state. It lives in a `Condition` so that the LogSink handling task can wait for
    /// the minimum interest to change.
    #[derivative(Debug = "ignore")]
    state: Arc<Condition<ContainerState>>,

    /// Optional callback run by `check_inactive` when the container is no longer active.
    #[derivative(Debug = "ignore")]
    on_inactive: Option<OnInactive>,
}
54
/// Mutable portion of a `LogsArtifactsContainer`, kept behind its `Condition`.
#[derive(Debug)]
struct ContainerState {
    /// Number of LogSink channels currently attached. `handle_log_sink` increments it and the
    /// handling task decrements it once the channel closes.
    num_active_channels: u64,

    /// Requested interests mapped to the number of outstanding requests for each one. Keys are
    /// ordered by `min_severity`, so the first key is the minimum interest.
    interests: BTreeMap<Interest, usize>,

    /// `true` from construction until the first call to `handle_log_sink` (or `mark_stopped`
    /// in tests). While set, `is_active` reports the container as active.
    is_initializing: bool,
}
65
66#[derive(Debug, PartialEq)]
67pub struct CursorItem {
68 pub rolled_out: u64,
69 pub message: Arc<StoredMessage>,
70 pub identity: Arc<ComponentIdentity>,
71}
72
73impl Eq for CursorItem {}
74
75impl Ord for CursorItem {
76 fn cmp(&self, other: &Self) -> Ordering {
77 self.message.timestamp().cmp(&other.message.timestamp())
78 }
79}
80
81impl PartialOrd for CursorItem {
82 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
83 Some(self.cmp(other))
84 }
85}
86
87impl LogsArtifactsContainer {
88 pub fn new<'a>(
89 identity: Arc<ComponentIdentity>,
90 interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
91 initial_interest: Option<FidlSeverity>,
92 stats: Arc<LogStreamStats>,
93 buffer: ContainerBuffer,
94 on_inactive: Option<OnInactive>,
95 ) -> Self {
96 let mut interests = BTreeMap::new();
97 if let Some(severity) = initial_interest {
98 interests.insert(Interest::from(severity), 1);
99 }
100 let new = Self {
101 identity,
102 buffer,
103 state: Arc::new(Condition::new(ContainerState {
104 num_active_channels: 0,
105 interests,
106 is_initializing: true,
107 })),
108 stats,
109 on_inactive,
110 };
111
112 new.update_interest(interest_selectors, &[]);
114
115 new
116 }
117
118 fn create_raw_cursor(
119 &self,
120 buffer_cursor: shared_buffer::Cursor,
121 ) -> impl Stream<Item = CursorItem> {
122 let identity = Arc::clone(&self.identity);
123 buffer_cursor
124 .enumerate()
125 .scan((zx::BootInstant::ZERO, 0u64), move |(last_timestamp, rolled_out), (i, item)| {
126 futures::future::ready(match item {
127 LazyItem::Next(message) => {
128 *last_timestamp = message.timestamp();
129 Some(Some(CursorItem {
130 message,
131 identity: Arc::clone(&identity),
132 rolled_out: *rolled_out,
133 }))
134 }
135 LazyItem::ItemsRolledOut(rolled_out_count, timestamp) => {
136 if i > 0 {
137 *rolled_out += rolled_out_count;
138 }
139 *last_timestamp = timestamp;
140 Some(None)
141 }
142 })
143 })
144 .filter_map(future::ready)
145 }
146
147 pub fn cursor_raw(&self, mode: StreamMode) -> PinStream<CursorItem> {
155 let Some(buffer_cursor) = self.buffer.cursor(mode) else {
156 return Box::pin(futures::stream::empty());
157 };
158 Box::pin(self.create_raw_cursor(buffer_cursor))
159 }
160
161 pub fn cursor(
169 &self,
170 mode: StreamMode,
171 parent_trace_id: ftrace::Id,
172 ) -> PinStream<Arc<LogsData>> {
173 let Some(buffer_cursor) = self.buffer.cursor(mode) else {
174 return Box::pin(futures::stream::empty());
175 };
176 let mut rolled_out_count = 0;
177 Box::pin(self.create_raw_cursor(buffer_cursor).map(
178 move |CursorItem { message, identity, rolled_out }| {
179 rolled_out_count += rolled_out;
180 let trace_id = ftrace::Id::random();
181 let _trace_guard = ftrace::async_enter!(
182 trace_id,
183 TRACE_CATEGORY,
184 c"LogContainer::cursor.parse_message",
185 "parent_trace_id" => u64::from(parent_trace_id),
188 "trace_id" => u64::from(trace_id)
189 );
190 match message.parse(&identity) {
191 Ok(m) => Arc::new(maybe_add_rolled_out_error(&mut rolled_out_count, m)),
192 Err(err) => {
193 let data = maybe_add_rolled_out_error(
194 &mut rolled_out_count,
195 LogsDataBuilder::new(BuilderArgs {
196 moniker: identity.moniker.clone(),
197 timestamp: message.timestamp(),
198 component_url: Some(identity.url.clone()),
199 severity: diagnostics_data::Severity::Warn,
200 })
201 .add_error(diagnostics_data::LogError::FailedToParseRecord(format!(
202 "{err:?}"
203 )))
204 .build(),
205 );
206 Arc::new(data)
207 }
208 }
209 },
210 ))
211 }
212
213 pub fn handle_log_sink(
217 self: &Arc<Self>,
218 stream: LogSinkRequestStream,
219 scope: fasync::ScopeHandle,
220 ) {
221 {
222 let mut guard = self.state.lock();
223 guard.num_active_channels += 1;
224 guard.is_initializing = false;
225 }
226 scope.spawn(Arc::clone(self).actually_handle_log_sink(stream));
227 }
228
229 async fn actually_handle_log_sink(self: Arc<Self>, mut stream: LogSinkRequestStream) {
231 let mut previous_interest_sent = None;
232 debug!(identity:% = self.identity; "Draining LogSink channel.");
233
234 let mut hanging_gets = Vec::new();
235 let mut interest_changed = pin!(Fuse::terminated());
236
237 loop {
238 select! {
239 next = stream.next() => {
240 let Some(next) = next else { break };
241 match next {
242 Ok(LogSinkRequest::Connect { .. }) => {
243 error!("Received unexpected Connect message from {:?}", self.identity);
244 return;
245 }
246 Ok(LogSinkRequest::ConnectStructured { socket, .. }) => {
247 self.buffer.add_socket(socket);
248 }
249 Ok(LogSinkRequest::WaitForInterestChange { responder }) => {
250 hanging_gets.push(responder);
253 }
254 Err(e) => error!(identity:% = self.identity, e:%; "error handling log sink"),
255 Ok(LogSinkRequest::_UnknownMethod { .. }) => {}
256 }
257 }
258 _ = interest_changed => {}
259 }
260
261 if !hanging_gets.is_empty() {
262 let min_interest = self.state.lock().min_interest();
263 if Some(&min_interest) != previous_interest_sent.as_ref() {
264 for responder in hanging_gets.drain(..) {
266 let _ = responder.send(Ok(&min_interest));
267 }
268 interest_changed.set(Fuse::terminated());
269 previous_interest_sent = Some(min_interest);
270 } else if interest_changed.is_terminated() {
271 let previous_interest_sent = previous_interest_sent.clone();
273 interest_changed.set(
274 self.state
275 .when(move |state| {
276 if previous_interest_sent != Some(state.min_interest()) {
277 Poll::Ready(())
278 } else {
279 Poll::Pending
280 }
281 })
282 .fuse(),
283 );
284 }
285 }
286 }
287
288 debug!(identity:% = self.identity; "LogSink channel closed.");
289 self.state.lock().num_active_channels -= 1;
290 self.check_inactive();
291 }
292
293 pub fn ingest_message(&self, message: StoredMessage) {
295 self.stats.ingest_message(message.size(), message.severity());
296 self.buffer.push_back(message.bytes());
297 }
298
299 pub fn update_interest<'a>(
304 &self,
305 interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
306 previous_selectors: &[LogInterestSelector],
307 ) {
308 let mut new_interest = FidlInterest::default();
309 let mut remove_interest = FidlInterest::default();
310 for selector in interest_selectors {
311 if self
312 .identity
313 .moniker
314 .matches_component_selector(&selector.selector)
315 .unwrap_or_default()
316 {
317 new_interest = selector.interest.clone();
318 break;
320 }
321 }
322
323 if let Some(previous_selector) = previous_selectors.iter().find(|s| {
324 self.identity.moniker.matches_component_selector(&s.selector).unwrap_or_default()
325 }) {
326 remove_interest = previous_selector.interest.clone();
327 }
328
329 let mut state = self.state.lock();
330 if new_interest == FidlInterest::default() && remove_interest != FidlInterest::default() {
334 state.erase(&remove_interest);
335 } else if new_interest != FidlInterest::default()
336 && remove_interest == FidlInterest::default()
337 {
338 state.push_interest(new_interest);
339 } else if new_interest != FidlInterest::default()
340 && remove_interest != FidlInterest::default()
341 {
342 state.erase(&remove_interest);
343 state.push_interest(new_interest);
344 } else {
345 return;
346 }
347
348 for waker in state.drain_wakers() {
349 waker.wake();
350 }
351 }
352
353 pub fn reset_interest(&self, interest_selectors: &[LogInterestSelector]) {
357 for selector in interest_selectors {
358 if self
359 .identity
360 .moniker
361 .matches_component_selector(&selector.selector)
362 .unwrap_or_default()
363 {
364 let mut state = self.state.lock();
365 state.erase(&selector.interest);
366 for waker in state.drain_wakers() {
367 waker.wake();
368 }
369 return;
370 }
371 }
372 }
373
374 pub fn is_active(&self) -> bool {
377 let state = self.state.lock();
378 state.is_initializing || state.num_active_channels > 0 || self.buffer.is_active()
379 }
380
381 fn check_inactive(&self) {
383 if !self.is_active() {
384 if let Some(on_inactive) = &self.on_inactive {
385 on_inactive(self);
386 }
387 }
388 }
389
390 pub fn terminate(&self) {
393 self.buffer.terminate();
394 }
395
396 #[cfg(test)]
397 pub fn mark_stopped(&self) {
398 self.state.lock().is_initializing = false;
399 self.check_inactive();
400 }
401}
402
403fn maybe_add_rolled_out_error(rolled_out_messages: &mut u64, mut msg: Data<Logs>) -> Data<Logs> {
404 if *rolled_out_messages != 0 {
405 msg.metadata
407 .errors
408 .get_or_insert(vec![])
409 .push(LogError::RolledOutLogs { count: *rolled_out_messages });
410 }
411 *rolled_out_messages = 0;
412 msg
413}
414
415impl ContainerState {
416 fn push_interest(&mut self, interest: FidlInterest) {
418 if interest != FidlInterest::default() {
419 let count = self.interests.entry(interest.into()).or_insert(0);
420 *count += 1;
421 }
422 }
423
424 fn erase(&mut self, interest: &FidlInterest) {
426 let interest = interest.clone().into();
427 if let Some(count) = self.interests.get_mut(&interest) {
428 if *count <= 1 {
429 self.interests.remove(&interest);
430 } else {
431 *count -= 1;
432 }
433 }
434 }
435
436 fn min_interest(&self) -> FidlInterest {
439 self.interests.keys().next().map(|i| i.0.clone()).unwrap_or_default()
441 }
442}
443
444#[derive(Debug, PartialEq)]
445struct Interest(FidlInterest);
446
447impl From<FidlInterest> for Interest {
448 fn from(interest: FidlInterest) -> Interest {
449 Interest(interest)
450 }
451}
452
453impl From<FidlSeverity> for Interest {
454 fn from(severity: FidlSeverity) -> Interest {
455 Interest(FidlInterest { min_severity: Some(severity), ..Default::default() })
456 }
457}
458
459impl std::ops::Deref for Interest {
460 type Target = FidlInterest;
461 fn deref(&self) -> &Self::Target {
462 &self.0
463 }
464}
465
466impl Eq for Interest {}
467
468impl Ord for Interest {
469 fn cmp(&self, other: &Self) -> Ordering {
470 self.min_severity.cmp(&other.min_severity)
471 }
472}
473
474impl PartialOrd for Interest {
475 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
476 Some(self.cmp(other))
477 }
478}
479
#[cfg(test)]
mod tests {
    use super::*;
    use crate::logs::shared_buffer::SharedBuffer;
    use fidl_fuchsia_diagnostics::{ComponentSelector, StringSelector};
    use fidl_fuchsia_diagnostics_types::Severity;
    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy};
    use fuchsia_async::{Task, TestExecutor};
    use fuchsia_inspect as inspect;
    use fuchsia_inspect_derive::WithInspect;
    use moniker::ExtendedMoniker;

    /// Builds a container for the component `/foo/bar` with the given initial severity and no
    /// interest selectors, then connects a LogSink proxy whose requests are served on `scope`.
    fn initialize_container(
        severity: Option<Severity>,
        scope: fasync::ScopeHandle,
    ) -> (Arc<LogsArtifactsContainer>, LogSinkProxy) {
        let identity = Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("/foo/bar").unwrap(),
            "fuchsia-pkg://test",
        ));
        let stats = Arc::new(
            LogStreamStats::default()
                .with_inspect(inspect::component::inspector().root(), identity.moniker.to_string())
                .expect("failed to attach component log stats"),
        );
        let buffer = SharedBuffer::new(1024 * 1024, Box::new(|_| {}));
        let container = Arc::new(LogsArtifactsContainer::new(
            identity,
            std::iter::empty(),
            severity,
            Arc::clone(&stats),
            buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats),
            None,
        ));
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
        // Serve the LogSink requests in the background for the lifetime of `scope`.
        container.handle_log_sink(stream, scope);
        (container, proxy)
    }

    /// The first hanging get returns right away; the next one only completes once the
    /// interest actually changes.
    #[fuchsia::test(allow_stalls = false)]
    async fn update_interest() {
        // The scope must outlive the test body so that the server task keeps running.
        let scope = fasync::Scope::new();
        let (container, log_sink) = initialize_container(None, scope.to_handle());

        // Nothing has been sent on this channel yet, so the first call is answered immediately.
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();

        // No initial interest and no selectors: the reported interest is empty.
        assert_eq!(initial_interest.min_severity, None);
        let log_sink_clone = log_sink.clone();
        let mut interest_future =
            Task::spawn(async move { log_sink_clone.wait_for_interest_change().await });

        // The interest has not changed, so the second call must stay pending.
        assert!(TestExecutor::poll_until_stalled(&mut interest_future).await.is_pending());

        // Register an interest that matches the component's moniker.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);

        // The parked call now completes with the new minimum severity.
        assert_eq!(interest_future.await.unwrap().unwrap().min_severity, Some(Severity::Info));
    }

    /// The severity given at construction time is reported by the first hanging get.
    #[fuchsia::test]
    async fn initial_interest() {
        let scope = fasync::Scope::new();
        let (_container, log_sink) = initialize_container(Some(Severity::Info), scope.to_handle());
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, Some(Severity::Info));
    }

    /// Walks through additions and removals of interests, checking both the reference counts
    /// kept by the container and the minimum severity reported to the client.
    #[fuchsia::test]
    async fn interest_serverity_semantics() {
        let scope = fasync::Scope::new();
        let (container, log_sink) = initialize_container(None, scope.to_handle());
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, None);
        // Set some interest.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1)]);

        // A higher severity is recorded, but the minimum stays at Info, so no hanging get is
        // issued here (it would not complete).
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Warn))].iter(), &[]);
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // A lower severity becomes the new minimum.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_severity(&log_sink, Severity::Debug).await;
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Requesting the same severity again only bumps its count; the minimum is unchanged.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_interests(
            &container,
            [(Severity::Debug, 2), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Dropping one of the two Debug requests leaves one behind; the minimum is unchanged.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Dropping the last Debug request raises the minimum back to Info.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // Replace Info with Error in a single update: Info is erased, Error is added and the
        // minimum becomes Warn.
        container.update_interest(
            [interest(&["foo", "bar"], Some(Severity::Error))].iter(),
            &[interest(&["foo", "bar"], Some(Severity::Info))],
        );
        assert_severity(&log_sink, Severity::Warn).await;
        assert_interests(&container, [(Severity::Error, 1), (Severity::Warn, 1)]);

        // Removing Warn leaves Error as the minimum.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Warn))]);
        assert_severity(&log_sink, Severity::Error).await;
        assert_interests(&container, [(Severity::Error, 1)]);

        // Removing the last interest makes the container report the default (empty) interest.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Error))]);
        assert_eq!(
            log_sink.wait_for_interest_change().await.unwrap().unwrap(),
            FidlInterest::default()
        );

        assert_interests(&container, []);
    }

    /// Builds a selector that exactly matches the given moniker segments and carries the given
    /// minimum severity.
    fn interest(moniker: &[&str], min_severity: Option<Severity>) -> LogInterestSelector {
        LogInterestSelector {
            selector: ComponentSelector {
                moniker_segments: Some(
                    moniker.iter().map(|s| StringSelector::ExactMatch(s.to_string())).collect(),
                ),
                ..Default::default()
            },
            interest: FidlInterest { min_severity, ..Default::default() },
        }
    }

    /// Waits for the next interest change on `proxy` and asserts that it carries `severity`.
    async fn assert_severity(proxy: &LogSinkProxy, severity: Severity) {
        assert_eq!(
            proxy.wait_for_interest_change().await.unwrap().unwrap().min_severity.unwrap(),
            severity
        );
    }

    /// Asserts that the container's interest table holds exactly the given
    /// `(severity, count)` pairs.
    fn assert_interests<const N: usize>(
        container: &LogsArtifactsContainer,
        severities: [(Severity, usize); N],
    ) {
        let mut expected_map = BTreeMap::new();
        expected_map.extend(IntoIterator::into_iter(severities).map(|(s, c)| {
            let interest = FidlInterest { min_severity: Some(s), ..Default::default() };
            (interest.into(), c)
        }));
        assert_eq!(expected_map, container.state.lock().interests);
    }
}