1use crate::diagnostics::TRACE_CATEGORY;
6use crate::identity::ComponentIdentity;
7use crate::logs::multiplex::PinStream;
8use crate::logs::shared_buffer::{self, ContainerBuffer, LazyItem};
9use crate::logs::stats::LogStreamStats;
10use crate::logs::stored_message::StoredMessage;
11use derivative::Derivative;
12use diagnostics_data::{BuilderArgs, Data, LogError, Logs, LogsData, LogsDataBuilder};
13use fidl::endpoints::RequestStream;
14use fidl_fuchsia_diagnostics::{LogInterestSelector, StreamMode};
15use fidl_fuchsia_diagnostics_types::{Interest as FidlInterest, Severity as FidlSeverity};
16use fidl_fuchsia_logger::{LogSinkOnInitRequest, LogSinkRequest, LogSinkRequestStream};
17use fuchsia_async::condition::Condition;
18use futures::future::{Fuse, FusedFuture};
19use futures::prelude::*;
20use futures::select;
21use futures::stream::StreamExt;
22use log::{debug, error};
23use selectors::SelectorExt;
24use std::cmp::Ordering;
25use std::collections::BTreeMap;
26use std::pin::pin;
27use std::sync::Arc;
28use std::task::Poll;
29use {fuchsia_async as fasync, fuchsia_trace as ftrace};
30
/// Callback stored on a [`LogsArtifactsContainer`] and invoked with that container when an
/// inactivity check finds it is no longer active.
pub type OnInactive = Box<dyn Fn(&LogsArtifactsContainer) + Send + Sync>;
32
/// Holds the log-related artifacts of a single component: its identity, its slice of the log
/// buffer, and the bookkeeping needed to serve `LogSink` connections and interest updates.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct LogsArtifactsContainer {
    /// Identity of the component these logs belong to. Its moniker is matched against interest
    /// selectors and it is attached to every item produced by the cursors.
    pub identity: Arc<ComponentIdentity>,

    /// Statistics handle for this log stream. It is only stored by this type; nothing in this
    /// file reads it.
    pub stats: Arc<LogStreamStats>,

    /// Buffer handle used to ingest messages, register sockets and create cursors.
    #[derivative(Debug = "ignore")]
    buffer: ContainerBuffer,

    /// Mutable state. The `Condition` wrapper lets `LogSink` handlers wait until the minimum
    /// interest changes.
    #[derivative(Debug = "ignore")]
    state: Arc<Condition<ContainerState>>,

    /// Optional callback run by `check_inactive` once the container is no longer active.
    #[derivative(Debug = "ignore")]
    on_inactive: Option<OnInactive>,
}
55
/// Mutable bookkeeping for a [`LogsArtifactsContainer`], guarded by its `Condition`.
#[derive(Debug)]
struct ContainerState {
    /// Number of `LogSink` channels currently being served. Incremented in `handle_log_sink`
    /// and decremented when the corresponding request stream ends.
    num_active_channels: u64,

    /// Requested interests mapped to the number of outstanding requests for each. Because the
    /// map is ordered by minimum severity, its first key is the current minimum interest.
    interests: BTreeMap<Interest, usize>,

    /// True from construction until the first `LogSink` stream is accepted (or, in tests, until
    /// `mark_stopped` is called). While set, the container is considered active.
    is_initializing: bool,
}
66
/// One item yielded by a raw log cursor.
///
/// Note that the derived `PartialEq` compares every field, whereas the `Ord` implementation
/// for this type only compares message timestamps.
#[derive(Debug, PartialEq)]
pub struct CursorItem {
    /// Rolled-out message count attached to this item by the cursor that produced it.
    pub rolled_out: u64,
    /// The stored (unparsed) log message.
    pub message: Arc<StoredMessage>,
    /// Identity of the component that the message belongs to.
    pub identity: Arc<ComponentIdentity>,
}
73
// Marker impl required by `Ord`; equality itself comes from the derived `PartialEq`.
impl Eq for CursorItem {}
75
76impl Ord for CursorItem {
77 fn cmp(&self, other: &Self) -> Ordering {
78 self.message.timestamp().cmp(&other.message.timestamp())
79 }
80}
81
82impl PartialOrd for CursorItem {
83 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
84 Some(self.cmp(other))
85 }
86}
87
88impl LogsArtifactsContainer {
89 pub fn new<'a>(
90 identity: Arc<ComponentIdentity>,
91 interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
92 initial_interest: Option<FidlSeverity>,
93 stats: Arc<LogStreamStats>,
94 buffer: ContainerBuffer,
95 on_inactive: Option<OnInactive>,
96 ) -> Self {
97 let mut interests = BTreeMap::new();
98 if let Some(severity) = initial_interest {
99 interests.insert(Interest::from(severity), 1);
100 }
101 let new = Self {
102 identity,
103 buffer,
104 state: Arc::new(Condition::new(ContainerState {
105 num_active_channels: 0,
106 interests,
107 is_initializing: true,
108 })),
109 stats,
110 on_inactive,
111 };
112
113 new.update_interest(interest_selectors, &[]);
115
116 new
117 }
118
119 fn create_raw_cursor(
120 &self,
121 buffer_cursor: shared_buffer::Cursor,
122 ) -> impl Stream<Item = CursorItem> {
123 let identity = Arc::clone(&self.identity);
124 buffer_cursor
125 .enumerate()
126 .scan((zx::BootInstant::ZERO, 0u64), move |(last_timestamp, rolled_out), (i, item)| {
127 futures::future::ready(match item {
128 LazyItem::Next(message) => {
129 *last_timestamp = message.timestamp();
130 Some(Some(CursorItem {
131 message,
132 identity: Arc::clone(&identity),
133 rolled_out: *rolled_out,
134 }))
135 }
136 LazyItem::ItemsRolledOut(rolled_out_count, timestamp) => {
137 if i > 0 {
138 *rolled_out += rolled_out_count;
139 }
140 *last_timestamp = timestamp;
141 Some(None)
142 }
143 })
144 })
145 .filter_map(future::ready)
146 }
147
148 pub fn cursor_raw(&self, mode: StreamMode) -> PinStream<CursorItem> {
156 let Some(buffer_cursor) = self.buffer.cursor(mode) else {
157 return Box::pin(futures::stream::empty());
158 };
159 Box::pin(self.create_raw_cursor(buffer_cursor))
160 }
161
162 pub fn cursor(
170 &self,
171 mode: StreamMode,
172 parent_trace_id: ftrace::Id,
173 ) -> PinStream<Arc<LogsData>> {
174 let Some(buffer_cursor) = self.buffer.cursor(mode) else {
175 return Box::pin(futures::stream::empty());
176 };
177 let mut rolled_out_count = 0;
178 Box::pin(self.create_raw_cursor(buffer_cursor).map(
179 move |CursorItem { message, identity, rolled_out }| {
180 rolled_out_count += rolled_out;
181 let trace_id = ftrace::Id::random();
182 let _trace_guard = ftrace::async_enter!(
183 trace_id,
184 TRACE_CATEGORY,
185 c"LogContainer::cursor.parse_message",
186 "parent_trace_id" => u64::from(parent_trace_id),
189 "trace_id" => u64::from(trace_id)
190 );
191 match message.parse(&identity) {
192 Ok(m) => Arc::new(maybe_add_rolled_out_error(&mut rolled_out_count, m)),
193 Err(err) => {
194 let data = maybe_add_rolled_out_error(
195 &mut rolled_out_count,
196 LogsDataBuilder::new(BuilderArgs {
197 moniker: identity.moniker.clone(),
198 timestamp: message.timestamp(),
199 component_url: Some(identity.url.clone()),
200 severity: diagnostics_data::Severity::Warn,
201 })
202 .add_error(diagnostics_data::LogError::FailedToParseRecord(format!(
203 "{err:?}"
204 )))
205 .build(),
206 );
207 Arc::new(data)
208 }
209 }
210 },
211 ))
212 }
213
214 pub fn handle_log_sink(
218 self: &Arc<Self>,
219 stream: LogSinkRequestStream,
220 scope: fasync::ScopeHandle,
221 ) {
222 if stream
223 .control_handle()
224 .send_on_init(LogSinkOnInitRequest {
225 buffer: Some(self.buffer.iob()),
226 interest: Some(self.state.lock().min_interest()),
227 ..Default::default()
228 })
229 .is_err()
230 {
231 return;
232 }
233
234 {
235 let mut guard = self.state.lock();
236 guard.num_active_channels += 1;
237 guard.is_initializing = false;
238 }
239 scope.spawn(Arc::clone(self).actually_handle_log_sink(stream));
240 }
241
    /// Serves one `LogSink` channel until its request stream ends.
    ///
    /// Structured sockets are handed to the buffer. `WaitForInterestChange` requests are
    /// parked as hanging gets and answered whenever the container's minimum interest differs
    /// from the value last sent on this channel. When the stream ends, the active-channel
    /// count is decremented and the inactivity check runs.
    async fn actually_handle_log_sink(self: Arc<Self>, mut stream: LogSinkRequestStream) {
        // `None` until a response has been sent, so the first hanging get is answered
        // immediately with the current interest.
        let mut previous_interest_sent = None;
        debug!(identity:% = self.identity; "Draining LogSink channel.");

        let mut hanging_gets = Vec::new();
        // Starts out terminated; it is only armed below while hanging gets are waiting for
        // the interest to change. `select!` skips futures that report being terminated.
        let mut interest_changed = pin!(Fuse::terminated());

        loop {
            select! {
                next = stream.next() => {
                    // End of stream: the client closed the channel.
                    let Some(next) = next else { break };
                    match next {
                        Ok(LogSinkRequest::ConnectStructured { socket, .. }) => {
                            self.buffer.add_socket(socket);
                        }
                        Ok(LogSinkRequest::WaitForInterestChange { responder }) => {
                            // Answered (or deferred) by the block after the `select!`.
                            hanging_gets.push(responder);
                        }
                        // Errors are logged and the loop keeps polling the stream.
                        Err(e) => error!(identity:% = self.identity, e:%; "error handling log sink"),
                        Ok(LogSinkRequest::_UnknownMethod { .. }) => {}
                    }
                }
                // Only wakes the loop up; the interest is re-read below.
                _ = interest_changed => {}
            }

            if !hanging_gets.is_empty() {
                let min_interest = self.state.lock().min_interest();
                if Some(&min_interest) != previous_interest_sent.as_ref() {
                    // The interest differs from what this client last saw: answer every
                    // parked request. Send failures are deliberately ignored.
                    for responder in hanging_gets.drain(..) {
                        let _ = responder.send(Ok(&min_interest));
                    }
                    // No request is waiting any more, so disarm the change notification.
                    interest_changed.set(Fuse::terminated());
                    previous_interest_sent = Some(min_interest);
                } else if interest_changed.is_terminated() {
                    // Nothing new to report and no waiter armed yet: wait on the state
                    // condition until the minimum interest differs from the last value sent.
                    let previous_interest_sent = previous_interest_sent.clone();
                    interest_changed.set(
                        self.state
                            .when(move |state| {
                                if previous_interest_sent != Some(state.min_interest()) {
                                    Poll::Ready(())
                                } else {
                                    Poll::Pending
                                }
                            })
                            .fuse(),
                    );
                }
            }
        }

        debug!(identity:% = self.identity; "LogSink channel closed.");
        self.state.lock().num_active_channels -= 1;
        self.check_inactive();
    }
301
302 pub fn ingest_message(&self, message: StoredMessage) {
304 self.buffer.push_back(message.bytes());
305 }
306
307 pub fn update_interest<'a>(
312 &self,
313 interest_selectors: impl Iterator<Item = &'a LogInterestSelector>,
314 previous_selectors: &[LogInterestSelector],
315 ) {
316 let mut new_interest = FidlInterest::default();
317 let mut remove_interest = FidlInterest::default();
318 for selector in interest_selectors {
319 if self
320 .identity
321 .moniker
322 .matches_component_selector(&selector.selector)
323 .unwrap_or_default()
324 {
325 new_interest = selector.interest.clone();
326 break;
328 }
329 }
330
331 if let Some(previous_selector) = previous_selectors.iter().find(|s| {
332 self.identity.moniker.matches_component_selector(&s.selector).unwrap_or_default()
333 }) {
334 remove_interest = previous_selector.interest.clone();
335 }
336
337 let mut state = self.state.lock();
338 if new_interest == FidlInterest::default() && remove_interest != FidlInterest::default() {
342 state.erase(&remove_interest);
343 } else if new_interest != FidlInterest::default()
344 && remove_interest == FidlInterest::default()
345 {
346 state.push_interest(new_interest);
347 } else if new_interest != FidlInterest::default()
348 && remove_interest != FidlInterest::default()
349 {
350 state.erase(&remove_interest);
351 state.push_interest(new_interest);
352 } else {
353 return;
354 }
355
356 for waker in state.drain_wakers() {
357 waker.wake();
358 }
359 }
360
361 pub fn reset_interest(&self, interest_selectors: &[LogInterestSelector]) {
365 for selector in interest_selectors {
366 if self
367 .identity
368 .moniker
369 .matches_component_selector(&selector.selector)
370 .unwrap_or_default()
371 {
372 let mut state = self.state.lock();
373 state.erase(&selector.interest);
374 for waker in state.drain_wakers() {
375 waker.wake();
376 }
377 return;
378 }
379 }
380 }
381
382 pub fn is_active(&self) -> bool {
385 let state = self.state.lock();
386 state.is_initializing || state.num_active_channels > 0 || self.buffer.is_active()
387 }
388
389 fn check_inactive(&self) {
391 if !self.is_active() {
392 if let Some(on_inactive) = &self.on_inactive {
393 on_inactive(self);
394 }
395 }
396 }
397
    /// Terminates this container's buffer by forwarding to `ContainerBuffer::terminate`.
    pub fn terminate(&self) {
        self.buffer.terminate();
    }
403
404 #[cfg(test)]
405 pub fn mark_stopped(&self) {
406 self.state.lock().is_initializing = false;
407 self.check_inactive();
408 }
409
    /// Returns a reference to this container's buffer handle.
    pub fn buffer(&self) -> &ContainerBuffer {
        &self.buffer
    }
413}
414
415fn maybe_add_rolled_out_error(rolled_out_messages: &mut u64, mut msg: Data<Logs>) -> Data<Logs> {
416 if *rolled_out_messages != 0 {
417 msg.metadata
419 .errors
420 .get_or_insert(vec![])
421 .push(LogError::RolledOutLogs { count: *rolled_out_messages });
422 }
423 *rolled_out_messages = 0;
424 msg
425}
426
427impl ContainerState {
428 fn push_interest(&mut self, interest: FidlInterest) {
430 if interest != FidlInterest::default() {
431 let count = self.interests.entry(interest.into()).or_insert(0);
432 *count += 1;
433 }
434 }
435
436 fn erase(&mut self, interest: &FidlInterest) {
438 let interest = interest.clone().into();
439 if let Some(count) = self.interests.get_mut(&interest) {
440 if *count <= 1 {
441 self.interests.remove(&interest);
442 } else {
443 *count -= 1;
444 }
445 }
446 }
447
448 fn min_interest(&self) -> FidlInterest {
451 self.interests.keys().next().map(|i| i.0.clone()).unwrap_or_default()
453 }
454}
455
/// Newtype around `FidlInterest` that adds an ordering (by `min_severity`) so interests can be
/// used as `BTreeMap` keys.
#[derive(Debug, PartialEq)]
struct Interest(FidlInterest);
458
459impl From<FidlInterest> for Interest {
460 fn from(interest: FidlInterest) -> Interest {
461 Interest(interest)
462 }
463}
464
465impl From<FidlSeverity> for Interest {
466 fn from(severity: FidlSeverity) -> Interest {
467 Interest(FidlInterest { min_severity: Some(severity), ..Default::default() })
468 }
469}
470
471impl std::ops::Deref for Interest {
472 type Target = FidlInterest;
473 fn deref(&self) -> &Self::Target {
474 &self.0
475 }
476}
477
// Marker impl required by `Ord`; equality itself comes from the derived `PartialEq`.
impl Eq for Interest {}
479
480impl Ord for Interest {
481 fn cmp(&self, other: &Self) -> Ordering {
482 self.min_severity.cmp(&other.min_severity)
483 }
484}
485
486impl PartialOrd for Interest {
487 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
488 Some(self.cmp(other))
489 }
490}
491
492#[cfg(test)]
493mod tests {
494 use super::*;
495 use crate::logs::shared_buffer::{SharedBuffer, create_ring_buffer};
496 use fidl_fuchsia_diagnostics::{ComponentSelector, StringSelector};
497 use fidl_fuchsia_diagnostics_types::Severity;
498 use fidl_fuchsia_logger::{LogSinkMarker, LogSinkProxy};
499 use fuchsia_async::{Task, TestExecutor};
500 use fuchsia_inspect as inspect;
501 use fuchsia_inspect_derive::WithInspect;
502 use moniker::ExtendedMoniker;
503
504 fn initialize_container(
505 severity: Option<Severity>,
506 scope: fasync::ScopeHandle,
507 ) -> (Arc<LogsArtifactsContainer>, LogSinkProxy) {
508 let identity = Arc::new(ComponentIdentity::new(
509 ExtendedMoniker::parse_str("/foo/bar").unwrap(),
510 "fuchsia-pkg://test",
511 ));
512 let stats = Arc::new(
513 LogStreamStats::default()
514 .with_inspect(inspect::component::inspector().root(), identity.moniker.to_string())
515 .expect("failed to attach component log stats"),
516 );
517 let buffer = SharedBuffer::new(
518 create_ring_buffer(1024 * 1024),
519 Box::new(|_| {}),
520 Default::default(),
521 );
522 let container = Arc::new(LogsArtifactsContainer::new(
523 identity,
524 std::iter::empty(),
525 severity,
526 Arc::clone(&stats),
527 buffer.new_container_buffer(Arc::new(vec!["a"].into()), stats),
528 None,
529 ));
530 let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
532 container.handle_log_sink(stream, scope);
533 (container, proxy)
534 }
535
536 #[fuchsia::test(allow_stalls = false)]
537 async fn update_interest() {
538 let scope = fasync::Scope::new();
540 let (container, log_sink) = initialize_container(None, scope.to_handle());
541
542 let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
544
545 assert_eq!(initial_interest.min_severity, None);
547 let log_sink_clone = log_sink.clone();
548 let mut interest_future =
549 Task::spawn(async move { log_sink_clone.wait_for_interest_change().await });
550
551 assert!(TestExecutor::poll_until_stalled(&mut interest_future).await.is_pending());
553
554 container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);
556
557 assert_eq!(interest_future.await.unwrap().unwrap().min_severity, Some(Severity::Info));
559 }
560
561 #[fuchsia::test]
562 async fn initial_interest() {
563 let scope = fasync::Scope::new();
564 let (_container, log_sink) = initialize_container(Some(Severity::Info), scope.to_handle());
565 let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
566 assert_eq!(initial_interest.min_severity, Some(Severity::Info));
567 }
568
    /// Walks through a sequence of interest registrations and removals, checking after each
    /// step both the per-severity request counts and the minimum interest reported to the
    /// `LogSink` client.
    #[fuchsia::test]
    async fn interest_serverity_semantics() {
        let scope = fasync::Scope::new();
        let (container, log_sink) = initialize_container(None, scope.to_handle());
        // No interest registered yet.
        let initial_interest = log_sink.wait_for_interest_change().await.unwrap().unwrap();
        assert_eq!(initial_interest.min_severity, None);
        // First registration: Info becomes the minimum.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Info))].iter(), &[]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1)]);

        // Warn is recorded, but the minimum is still Info, so no client update is awaited.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Warn))].iter(), &[]);
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // Debug is lower than Info, so it becomes the new minimum.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_severity(&log_sink, Severity::Debug).await;
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Registering Debug again only bumps its count; the minimum is unchanged.
        container.update_interest([interest(&["foo", "bar"], Some(Severity::Debug))].iter(), &[]);
        assert_interests(
            &container,
            [(Severity::Debug, 2), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Removing one of the two Debug requests keeps Debug registered.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_interests(
            &container,
            [(Severity::Debug, 1), (Severity::Info, 1), (Severity::Warn, 1)],
        );

        // Removing the last Debug request drops it; the minimum falls back to Info.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Debug))]);
        assert_severity(&log_sink, Severity::Info).await;
        assert_interests(&container, [(Severity::Info, 1), (Severity::Warn, 1)]);

        // Replace Info with Error in a single update: Info is erased, Error is added, and
        // Warn is now the minimum.
        container.update_interest(
            [interest(&["foo", "bar"], Some(Severity::Error))].iter(),
            &[interest(&["foo", "bar"], Some(Severity::Info))],
        );
        assert_severity(&log_sink, Severity::Warn).await;
        assert_interests(&container, [(Severity::Error, 1), (Severity::Warn, 1)]);

        // Removing Warn leaves Error as the only (and therefore minimum) interest.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Warn))]);
        assert_severity(&log_sink, Severity::Error).await;
        assert_interests(&container, [(Severity::Error, 1)]);

        // Removing the last interest reports the default (unset) interest to the client.
        container.reset_interest(&[interest(&["foo", "bar"], Some(Severity::Error))]);
        assert_eq!(
            log_sink.wait_for_interest_change().await.unwrap().unwrap(),
            FidlInterest::default()
        );

        assert_interests(&container, []);
    }
638
639 fn interest(moniker: &[&str], min_severity: Option<Severity>) -> LogInterestSelector {
640 LogInterestSelector {
641 selector: ComponentSelector {
642 moniker_segments: Some(
643 moniker.iter().map(|s| StringSelector::ExactMatch(s.to_string())).collect(),
644 ),
645 ..Default::default()
646 },
647 interest: FidlInterest { min_severity, ..Default::default() },
648 }
649 }
650
651 async fn assert_severity(proxy: &LogSinkProxy, severity: Severity) {
652 assert_eq!(
653 proxy.wait_for_interest_change().await.unwrap().unwrap().min_severity.unwrap(),
654 severity
655 );
656 }
657
658 fn assert_interests<const N: usize>(
659 container: &LogsArtifactsContainer,
660 severities: [(Severity, usize); N],
661 ) {
662 let mut expected_map = BTreeMap::new();
663 expected_map.extend(IntoIterator::into_iter(severities).map(|(s, c)| {
664 let interest = FidlInterest { min_severity: Some(s), ..Default::default() };
665 (interest.into(), c)
666 }));
667 assert_eq!(expected_map, container.state.lock().interests);
668 }
669}