1use crate::diagnostics::TRACE_CATEGORY;
6use crate::identity::ComponentIdentity;
7use crate::logs::multiplex::PinStream;
8use crate::logs::shared_buffer::{self, ContainerBuffer, LazyItem};
9use crate::logs::socket::{Encoding, LogMessageSocket};
10use crate::logs::stats::LogStreamStats;
11use crate::logs::stored_message::StoredMessage;
12use derivative::Derivative;
13use diagnostics_data::{BuilderArgs, Data, LogError, Logs, LogsData, LogsDataBuilder};
14use fidl_fuchsia_diagnostics::{LogInterestSelector, StreamMode};
15use fidl_fuchsia_diagnostics_types::{Interest as FidlInterest, Severity as FidlSeverity};
16use fidl_fuchsia_logger::{LogSinkRequest, LogSinkRequestStream};
17use fuchsia_async::condition::Condition;
18use futures::future::{Fuse, FusedFuture};
19use futures::prelude::*;
20use futures::select;
21use futures::stream::StreamExt;
22use log::{debug, error, warn};
23use selectors::SelectorExt;
24use std::cmp::Ordering;
25use std::collections::BTreeMap;
26use std::pin::pin;
27use std::sync::Arc;
28use std::task::Poll;
29use {fuchsia_async as fasync, fuchsia_trace as ftrace};
30
/// Callback invoked with the container when `LogsArtifactsContainer::check_inactive` finds that
/// the container is no longer active.
pub type OnInactive = Box<dyn Fn(&LogsArtifactsContainer) + Send + Sync>;
32
/// Per-component log state: the component's identity, its ingestion statistics, the buffer that
/// stores its messages, and the bookkeeping for its live `LogSink` connections and interests.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct LogsArtifactsContainer {
    /// Identity of the component whose logs this container holds.
    pub identity: Arc<ComponentIdentity>,

    /// Statistics updated for every message passed to `ingest_message`.
    pub stats: Arc<LogStreamStats>,

    /// Buffer that receives this container's messages and hands out cursors over them.
    #[derivative(Debug = "ignore")]
    buffer: ContainerBuffer,

    /// Connection counters and registered interests. Wrapped in a `Condition` so tasks can wait
    /// (via `when`) for the state to change and be woken (via `drain_wakers`) when it does.
    #[derivative(Debug = "ignore")]
    state: Arc<Condition<ContainerState>>,

    /// Optional callback run by `check_inactive` once the container is no longer active.
    #[derivative(Debug = "ignore")]
    on_inactive: Option<OnInactive>,
}
55
/// Mutable state of a `LogsArtifactsContainer`, guarded by its `Condition`.
#[derive(Debug)]
struct ContainerState {
    /// Number of sockets received through `LogSink.Connect` that are still being drained by
    /// `drain_messages`.
    num_active_legacy_sockets: u64,

    /// Number of `LogSink` request streams currently being served.
    num_active_channels: u64,

    /// Registered interests mapped to the number of times each one was registered. Keys are
    /// ordered by minimum severity, so the first key is the current minimum interest.
    interests: BTreeMap<Interest, usize>,

    /// True from construction until the first `handle_log_sink` call (or `mark_stopped` in
    /// tests). While set, the container is considered active.
    is_initializing: bool,
}
70
/// A single message yielded by a raw container cursor.
///
/// NOTE(review): the derived `PartialEq` compares every field, while the `Ord` implementation
/// compares message timestamps only — confirm that this asymmetry is intended.
#[derive(Debug, PartialEq)]
pub struct CursorItem {
    /// Running count of rolled-out messages the cursor had observed when this item was produced.
    pub rolled_out: u64,
    /// The stored message itself.
    pub message: Arc<StoredMessage>,
    /// Identity of the component that the message belongs to.
    pub identity: Arc<ComponentIdentity>,
}
77
78impl Eq for CursorItem {}
79
80impl Ord for CursorItem {
81 fn cmp(&self, other: &Self) -> Ordering {
82 self.message.timestamp().cmp(&other.message.timestamp())
83 }
84}
85
86impl PartialOrd for CursorItem {
87 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
88 Some(self.message.timestamp().cmp(&other.message.timestamp()))
89 }
90}
91
92impl LogsArtifactsContainer {
93 pub fn new<'a>(
94 identity: Arc<ComponentIdentity>,
95 interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
96 initial_interest: Option<FidlSeverity>,
97 stats: Arc<LogStreamStats>,
98 buffer: ContainerBuffer,
99 on_inactive: Option<OnInactive>,
100 ) -> Self {
101 let mut interests = BTreeMap::new();
102 if let Some(severity) = initial_interest {
103 interests.insert(Interest::from(severity), 1);
104 }
105 let new = Self {
106 identity,
107 buffer,
108 state: Arc::new(Condition::new(ContainerState {
109 num_active_channels: 0,
110 num_active_legacy_sockets: 0,
111 interests,
112 is_initializing: true,
113 })),
114 stats,
115 on_inactive,
116 };
117
118 new.update_interest(interest_selectors, &[]);
120
121 new
122 }
123
124 fn create_raw_cursor(
125 &self,
126 buffer_cursor: shared_buffer::Cursor,
127 ) -> impl Stream<Item = CursorItem> {
128 let identity = Arc::clone(&self.identity);
129 buffer_cursor
130 .enumerate()
131 .scan((zx::BootInstant::ZERO, 0u64), move |(last_timestamp, rolled_out), (i, item)| {
132 futures::future::ready(match item {
133 LazyItem::Next(message) => {
134 *last_timestamp = message.timestamp();
135 Some(Some(CursorItem {
136 message,
137 identity: Arc::clone(&identity),
138 rolled_out: *rolled_out,
139 }))
140 }
141 LazyItem::ItemsRolledOut(rolled_out_count, timestamp) => {
142 if i > 0 {
143 *rolled_out += rolled_out_count;
144 }
145 *last_timestamp = timestamp;
146 Some(None)
147 }
148 })
149 })
150 .filter_map(future::ready)
151 }
152
153 pub fn cursor_raw(&self, mode: StreamMode) -> PinStream<CursorItem> {
161 let Some(buffer_cursor) = self.buffer.cursor(mode) else {
162 return Box::pin(futures::stream::empty());
163 };
164 Box::pin(self.create_raw_cursor(buffer_cursor))
165 }
166
167 pub fn cursor(
175 &self,
176 mode: StreamMode,
177 parent_trace_id: ftrace::Id,
178 ) -> PinStream<Arc<LogsData>> {
179 let Some(buffer_cursor) = self.buffer.cursor(mode) else {
180 return Box::pin(futures::stream::empty());
181 };
182 let mut rolled_out_count = 0;
183 Box::pin(self.create_raw_cursor(buffer_cursor).map(
184 move |CursorItem { message, identity, rolled_out }| {
185 rolled_out_count += rolled_out;
186 let trace_id = ftrace::Id::random();
187 let _trace_guard = ftrace::async_enter!(
188 trace_id,
189 TRACE_CATEGORY,
190 c"LogContainer::cursor.parse_message",
191 "parent_trace_id" => u64::from(parent_trace_id),
194 "trace_id" => u64::from(trace_id)
195 );
196 match message.parse(&identity) {
197 Ok(m) => Arc::new(maybe_add_rolled_out_error(&mut rolled_out_count, m)),
198 Err(err) => {
199 let data = maybe_add_rolled_out_error(
200 &mut rolled_out_count,
201 LogsDataBuilder::new(BuilderArgs {
202 moniker: identity.moniker.clone(),
203 timestamp: message.timestamp(),
204 component_url: Some(identity.url.clone()),
205 severity: diagnostics_data::Severity::Warn,
206 })
207 .add_error(diagnostics_data::LogError::FailedToParseRecord(format!(
208 "{err:?}"
209 )))
210 .build(),
211 );
212 Arc::new(data)
213 }
214 }
215 },
216 ))
217 }
218
219 pub fn handle_log_sink(
223 self: &Arc<Self>,
224 stream: LogSinkRequestStream,
225 scope: fasync::ScopeHandle,
226 ) {
227 {
228 let mut guard = self.state.lock();
229 guard.num_active_channels += 1;
230 guard.is_initializing = false;
231 }
232 scope.spawn(Arc::clone(self).actually_handle_log_sink(stream, scope.clone()));
233 }
234
    /// Serves a `LogSink` request stream until the channel closes.
    ///
    /// Handles socket connection requests and `WaitForInterestChange` hanging gets. When the
    /// stream ends, the active channel count is decremented and the inactivity check runs.
    async fn actually_handle_log_sink(
        self: Arc<Self>,
        mut stream: LogSinkRequestStream,
        scope: fasync::ScopeHandle,
    ) {
        // The interest most recently sent to the client; `None` until the first response.
        let mut previous_interest_sent = None;
        debug!(identity:% = self.identity; "Draining LogSink channel.");

        // Responders of `WaitForInterestChange` calls that have not been answered yet.
        let mut hanging_gets = Vec::new();
        // Completes when the minimum interest differs from the last one sent. It starts out
        // terminated and is only armed below while there are unanswered hanging gets.
        let mut interest_changed = pin!(Fuse::terminated());

        loop {
            select! {
                next = stream.next() => {
                    // `None` means the channel was closed.
                    let Some(next) = next else { break };
                    match next {
                        Ok(LogSinkRequest::Connect { socket, .. }) => {
                            // Sockets from `Connect` are counted as legacy sockets and drained
                            // by a separate task spawned on `scope`.
                            let socket = fasync::Socket::from_socket(socket);
                            let log_stream = LogMessageSocket::new(socket, Arc::clone(&self.stats));
                            self.state.lock().num_active_legacy_sockets += 1;
                            scope.spawn(Arc::clone(&self).drain_messages(log_stream));
                        }
                        Ok(LogSinkRequest::ConnectStructured { socket, .. }) => {
                            // Structured sockets are handed directly to the buffer.
                            self.buffer.add_socket(socket);
                        }
                        Ok(LogSinkRequest::WaitForInterestChange { responder }) => {
                            // Answered below if the interest differs from the last one sent.
                            hanging_gets.push(responder);
                        }
                        Err(e) => error!(identity:% = self.identity, e:%; "error handling log sink"),
                        Ok(LogSinkRequest::_UnknownMethod { .. }) => {}
                    }
                }
                // Nothing to do here: the check below responds to the hanging gets.
                _ = interest_changed => {}
            }

            if !hanging_gets.is_empty() {
                let min_interest = self.state.lock().min_interest();
                if Some(&min_interest) != previous_interest_sent.as_ref() {
                    // The interest changed (or none was sent yet): answer every hanging get.
                    for responder in hanging_gets.drain(..) {
                        // A failed send is deliberately ignored.
                        let _ = responder.send(Ok(&min_interest));
                    }
                    // Nothing is waiting any more, so disarm the change notification.
                    interest_changed.set(Fuse::terminated());
                    previous_interest_sent = Some(min_interest);
                } else if interest_changed.is_terminated() {
                    // Nothing new to report yet: arm a future that completes once the minimum
                    // interest differs from the one already sent.
                    let previous_interest_sent = previous_interest_sent.clone();
                    interest_changed.set(
                        self.state
                            .when(move |state| {
                                if previous_interest_sent != Some(state.min_interest()) {
                                    Poll::Ready(())
                                } else {
                                    Poll::Pending
                                }
                            })
                            .fuse(),
                    );
                }
            }
        }

        debug!(identity:% = self.identity; "LogSink channel closed.");
        self.state.lock().num_active_channels -= 1;
        self.check_inactive();
    }
306
307 pub async fn drain_messages<E>(self: Arc<Self>, mut log_stream: LogMessageSocket<E>)
310 where
311 E: Encoding + Unpin,
312 {
313 debug!(identity:% = self.identity; "Draining messages from a socket.");
314 loop {
315 match log_stream.next().await {
316 Some(Ok(message)) => self.ingest_message(message),
317 Some(Err(err)) => {
318 warn!(source:% = self.identity, err:%; "closing socket");
319 break;
320 }
321 None => break,
322 }
323 }
324 debug!(identity:% = self.identity; "Socket closed.");
325 self.state.lock().num_active_legacy_sockets -= 1;
326 self.check_inactive();
327 }
328
329 pub fn ingest_message(&self, message: StoredMessage) {
331 self.stats.ingest_message(message.size(), message.severity());
332 self.buffer.push_back(message.bytes());
333 }
334
335 pub fn update_interest<'a>(
340 &self,
341 interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
342 previous_selectors: &[LogInterestSelector],
343 ) {
344 let mut new_interest = FidlInterest::default();
345 let mut remove_interest = FidlInterest::default();
346 for selector in interest_selectors {
347 if self
348 .identity
349 .moniker
350 .matches_component_selector(&selector.selector)
351 .unwrap_or_default()
352 {
353 new_interest = selector.interest.clone();
354 break;
356 }
357 }
358
359 if let Some(previous_selector) = previous_selectors.iter().find(|s| {
360 self.identity.moniker.matches_component_selector(&s.selector).unwrap_or_default()
361 }) {
362 remove_interest = previous_selector.interest.clone();
363 }
364
365 let mut state = self.state.lock();
366 if new_interest == FidlInterest::default() && remove_interest != FidlInterest::default() {
370 state.erase(&remove_interest);
371 } else if new_interest != FidlInterest::default()
372 && remove_interest == FidlInterest::default()
373 {
374 state.push_interest(new_interest);
375 } else if new_interest != FidlInterest::default()
376 && remove_interest != FidlInterest::default()
377 {
378 state.erase(&remove_interest);
379 state.push_interest(new_interest);
380 } else {
381 return;
382 }
383
384 for waker in state.drain_wakers() {
385 waker.wake();
386 }
387 }
388
389 pub fn reset_interest(&self, interest_selectors: &[LogInterestSelector]) {
393 for selector in interest_selectors {
394 if self
395 .identity
396 .moniker
397 .matches_component_selector(&selector.selector)
398 .unwrap_or_default()
399 {
400 let mut state = self.state.lock();
401 state.erase(&selector.interest);
402 for waker in state.drain_wakers() {
403 waker.wake();
404 }
405 return;
406 }
407 }
408 }
409
410 pub fn is_active(&self) -> bool {
413 let state = self.state.lock();
414 state.is_initializing
415 || state.num_active_legacy_sockets > 0
416 || state.num_active_channels > 0
417 || self.buffer.is_active()
418 }
419
420 fn check_inactive(&self) {
422 if !self.is_active() {
423 if let Some(on_inactive) = &self.on_inactive {
424 on_inactive(self);
425 }
426 }
427 }
428
    /// Terminates this container's buffer by delegating to `ContainerBuffer::terminate`.
    pub fn terminate(&self) {
        self.buffer.terminate();
    }
434
435 #[cfg(test)]
436 pub fn mark_stopped(&self) {
437 self.state.lock().is_initializing = false;
438 self.check_inactive();
439 }
440}
441
442fn maybe_add_rolled_out_error(rolled_out_messages: &mut u64, mut msg: Data<Logs>) -> Data<Logs> {
443 if *rolled_out_messages != 0 {
444 msg.metadata
446 .errors
447 .get_or_insert(vec![])
448 .push(LogError::RolledOutLogs { count: *rolled_out_messages });
449 }
450 *rolled_out_messages = 0;
451 msg
452}
453
454impl ContainerState {
455 fn push_interest(&mut self, interest: FidlInterest) {
457 if interest != FidlInterest::default() {
458 let count = self.interests.entry(interest.into()).or_insert(0);
459 *count += 1;
460 }
461 }
462
463 fn erase(&mut self, interest: &FidlInterest) {
465 let interest = interest.clone().into();
466 if let Some(count) = self.interests.get_mut(&interest) {
467 if *count <= 1 {
468 self.interests.remove(&interest);
469 } else {
470 *count -= 1;
471 }
472 }
473 }
474
475 fn min_interest(&self) -> FidlInterest {
478 self.interests.keys().next().map(|i| i.0.clone()).unwrap_or_default()
480 }
481}
482
/// Wrapper around `FidlInterest` that adds a total order (by minimum severity) so interests can
/// be used as `BTreeMap` keys.
#[derive(Debug, PartialEq)]
struct Interest(FidlInterest);
485
486impl From<FidlInterest> for Interest {
487 fn from(interest: FidlInterest) -> Interest {
488 Interest(interest)
489 }
490}
491
492impl From<FidlSeverity> for Interest {
493 fn from(severity: FidlSeverity) -> Interest {
494 Interest(FidlInterest { min_severity: Some(severity), ..Default::default() })
495 }
496}
497
/// Exposes the wrapped `FidlInterest` so its fields (e.g. `min_severity`) can be read directly
/// on an `Interest`.
impl std::ops::Deref for Interest {
    type Target = FidlInterest;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
504
505impl Eq for Interest {}
506
507impl Ord for Interest {
508 fn cmp(&self, other: &Self) -> Ordering {
509 self.min_severity.cmp(&other.min_severity)
510 }
511}
512
513impl PartialOrd for Interest {
514 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
515 Some(self.cmp(other))
516 }
517}
518
519#[cfg(test)]
520mod tests {
521 use super::*;
522 use crate::logs::shared_buffer::SharedBuffer;
523 use fidl_fuchsia_diagnostics::{ComponentSelector, StringSelector};
524 use fidl_fuchsia_diagnostics_types::Severity;
525 use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy};
526 use fuchsia_async::{Task, TestExecutor};
527 use fuchsia_inspect as inspect;
528 use fuchsia_inspect_derive::WithInspect;
529 use moniker::ExtendedMoniker;
530
531 fn initialize_container(
532 severity: Option<Severity>,
533 scope: fasync::ScopeHandle,
534 ) -> (Arc<LogsArtifactsContainer>, LogSinkProxy) {
535 let identity = Arc::new(ComponentIdentity::new(
536 ExtendedMoniker::parse_str("/foo/bar").unwrap(),
537 "fuchsia-pkg://test",
538 ));
539 let stats = Arc::new(
540 LogStreamStats::default()
541 .with_inspect(inspect::component::inspector().root(), identity.moniker.to_string())
542 .expect("failed to attach component log stats"),
543 );
544 let buffer = SharedBuffer::new(1024 * 1024, Box::new(|_| {}));
545 let container = Arc::new(LogsArtifactsContainer::new(
546 identity,
547 std::iter::empty(),
548 severity,
549 Arc::clone(&stats),
550 buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats),
551 None,
552 ));
553 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
555 container.handle_log_sink(stream, scope);
556 (container, proxy)
557 }
558
    /// A pending `WaitForInterestChange` call completes once a matching selector changes the
    /// container's minimum interest.
    #[fuchsia::test(allow_stalls = false)]
    async fn update_interest() {
        let scope = fasync::Scope::new();
        // No initial severity, so the container starts without any registered interest.
        let (container, log_sink) = initialize_container(None, scope.to_handle());

        // Nothing has been sent to this client yet, so the first call is answered right away.
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();

        assert_eq!(initial_interest.min_severity, None);
        // A second call has nothing new to report and must stay pending.
        let log_sink_clone = log_sink.clone();
        let mut interest_future =
            Task::spawn(async move { log_sink_clone.wait_for_interest_change().await });

        assert!(TestExecutor::poll_until_stalled(&mut interest_future).await.is_pending());

        // Registering an interest through a selector that matches `/foo/bar`...
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);

        // ...completes the hanging get with the new minimum severity.
        assert_eq!(interest_future.await.unwrap().unwrap().min_severity, Some(Severity::Info));
    }
583
    /// A container created with an initial severity reports it on the first
    /// `WaitForInterestChange` call.
    #[fuchsia::test]
    async fn initial_interest() {
        let scope = fasync::Scope::new();
        let (_container, log_sink) = initialize_container(Some(Severity::Info), scope.to_handle());
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, Some(Severity::Info));
    }
591
    /// Walks through a sequence of interest registrations and removals, checking after each
    /// step both the per-severity registration counts and, whenever the minimum changes, the
    /// severity reported to the client.
    ///
    /// NOTE(review): `serverity` in the test name looks like a typo for `severity`; the name is
    /// left unchanged here.
    #[fuchsia::test]
    async fn interest_serverity_semantics() {
        let scope = fasync::Scope::new();
        let (container, log_sink) = initialize_container(None, scope.to_handle());
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, None);
        // Register Info: the minimum becomes Info and the client is told.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1)]);

        // Register Warn: the minimum is still Info, so only the bookkeeping is checked.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Warn))].iter(), &[]);
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // Register Debug: the minimum drops to Debug and the client is told.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_severity(&log_sink, Severity::Debug).await;
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Register Debug a second time: only its count changes.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_interests(
            &container,
            [(Severity::Debug, 2), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Remove one Debug registration: one remains, so the minimum is unchanged.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Remove the last Debug registration: the minimum rises back to Info.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // Swap Info for Error in a single update: Info is erased, Error is registered, and the
        // minimum becomes Warn.
        container.update_interest(
            [interest(&["foo", "bar"], Some(Severity::Error))].iter(),
            &[interest(&["foo", "bar"], Some(Severity::Info))],
        );
        assert_severity(&log_sink, Severity::Warn).await;
        assert_interests(&container, [(Severity::Error, 1), (Severity::Warn, 1)]);

        // Remove Warn: Error is the only interest left.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Warn))]);
        assert_severity(&log_sink, Severity::Error).await;
        assert_interests(&container, [(Severity::Error, 1)]);

        // Remove Error: nothing is registered any more, so the default interest is reported.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Error))]);
        assert_eq!(
            log_sink.wait_for_interest_change().await.unwrap().unwrap(),
            FidlInterest::default()
        );

        assert_interests(&container, []);
    }
661
662 fn interest(moniker: &[&str], min_severity: Option<Severity>) -> LogInterestSelector {
663 LogInterestSelector {
664 selector: ComponentSelector {
665 moniker_segments: Some(
666 moniker.iter().map(|s| StringSelector::ExactMatch(s.to_string())).collect(),
667 ),
668 ..Default::default()
669 },
670 interest: FidlInterest { min_severity, ..Default::default() },
671 }
672 }
673
674 async fn assert_severity(proxy: &LogSinkProxy, severity: Severity) {
675 assert_eq!(
676 proxy.wait_for_interest_change().await.unwrap().unwrap().min_severity.unwrap(),
677 severity
678 );
679 }
680
681 fn assert_interests<const N: usize>(
682 container: &LogsArtifactsContainer,
683 severities: [(Severity, usize); N],
684 ) {
685 let mut expected_map = BTreeMap::new();
686 expected_map.extend(IntoIterator::into_iter(severities).map(|(s, c)| {
687 let interest = FidlInterest { min_severity: Some(s), ..Default::default() };
688 (interest.into(), c)
689 }));
690 assert_eq!(expected_map, container.state.lock().interests);
691 }
692}