1use crate::events::types::*;
6use crate::identity::ComponentIdentity;
7use fuchsia_inspect::{self as inspect, NumericProperty};
8use fuchsia_inspect_contrib::inspect_log;
9use fuchsia_inspect_contrib::nodes::BoundedListNode;
10use futures::channel::{mpsc, oneshot};
11use futures::task::{Context, Poll};
12use futures::{Future, Stream, StreamExt};
13use log::{debug, error};
14use pin_project::pin_project;
15use std::collections::{BTreeMap, BTreeSet};
16use std::pin::Pin;
17use std::sync::{Arc, Weak};
18use thiserror::Error;
19
20const RECENT_EVENT_LIMIT: usize = 200;
21
22pub struct EventRouter {
25 consumers: BTreeMap<EventType, Vec<Weak<dyn EventConsumer + Send + Sync>>>,
27 producers_registered: BTreeSet<EventType>,
29
30 sender: mpsc::UnboundedSender<Event>,
32 receiver: mpsc::UnboundedReceiver<Event>,
33
34 inspect_logger: EventStreamLogger,
35}
36
37impl EventRouter {
38 pub fn new(node: inspect::Node) -> Self {
40 let (sender, receiver) = mpsc::unbounded();
41 Self {
42 consumers: BTreeMap::new(),
43 sender,
44 receiver,
45 producers_registered: BTreeSet::new(),
46 inspect_logger: EventStreamLogger::new(node),
47 }
48 }
49
50 pub fn add_producer<T>(&mut self, config: ProducerConfig<'_, T>)
53 where
54 T: EventProducer,
55 {
56 let events: BTreeSet<_> = config.events.into_iter().collect();
57 self.producers_registered.append(&mut events.clone());
58 let dispatcher = Dispatcher::new(events, self.sender.clone());
59 config.producer.set_dispatcher(dispatcher);
60 }
61
62 pub fn add_consumer<T>(&mut self, config: ConsumerConfig<'_, T>)
65 where
66 T: EventConsumer + Send + Sync + 'static,
67 {
68 let subscriber_weak = Arc::downgrade(config.consumer);
69 for event_type in config.events {
70 self.consumers
71 .entry(event_type)
72 .or_default()
73 .push(Weak::clone(&subscriber_weak) as Weak<dyn EventConsumer + Send + Sync>);
74 }
75 }
76
77 pub fn start(mut self) -> Result<(TerminateHandle, impl Future<Output = ()>), RouterError> {
88 self.validate_routing()?;
89
90 let (terminate_handle, mut stream) = EventStream::new(self.receiver);
91 let mut consumers = self.consumers;
92 let mut inspect_logger = self.inspect_logger;
93
94 let fut = async move {
95 loop {
96 match stream.next().await {
97 None => {
98 debug!("Event ingestion finished");
99 break;
100 }
101 Some(event) => {
102 inspect_logger.log(&event);
103
104 let event_type = event.ty();
105 let weak_consumers = match consumers.remove(&event_type) {
106 Some(c) => c,
107 None => continue,
108 };
109
110 let mut singleton_event = Some(event);
111
112 let mut active_consumers = vec![];
114 for weak_consumer in weak_consumers {
115 if let Some(consumer) = weak_consumer.upgrade() {
116 active_consumers.push(weak_consumer);
117 if let Some(e) = singleton_event.take() {
118 consumer.handle(e);
119 };
120 }
121 }
122
123 consumers.insert(event_type, active_consumers);
125 }
126 }
127 }
128 };
129 Ok((terminate_handle, fut))
130 }
131
132 fn validate_routing(&mut self) -> Result<(), RouterError> {
133 for consumed_event in self.consumers.keys() {
134 if !self.producers_registered.contains(consumed_event) {
135 return Err(RouterError::MissingProducer(consumed_event.clone()));
136 }
137 }
138 for produced_event in &self.producers_registered {
139 if !self.consumers.contains_key(produced_event) {
140 return Err(RouterError::MissingConsumer(produced_event.clone()));
141 }
142 }
143 Ok(())
144 }
145}
146
147#[pin_project]
150struct EventStream {
151 #[pin]
153 receiver: mpsc::UnboundedReceiver<Event>,
154
155 #[pin]
158 on_terminate: oneshot::Receiver<()>,
159
160 on_drained: Option<oneshot::Sender<()>>,
162}
163
164impl EventStream {
165 fn new(receiver: mpsc::UnboundedReceiver<Event>) -> (TerminateHandle, Self) {
166 let (snd, rcv) = oneshot::channel();
167 let (drain_snd, drain_rcv) = oneshot::channel();
168 (
169 TerminateHandle { snd, drained: drain_rcv },
170 Self { receiver, on_terminate: rcv, on_drained: Some(drain_snd) },
171 )
172 }
173}
174
175impl Stream for EventStream {
176 type Item = Event;
177
178 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
188 let mut this = self.project();
189
190 match this.on_terminate.poll(cx) {
199 Poll::Pending => {}
200 Poll::Ready(_) => {
201 this.receiver.close();
202 }
203 }
204 match this.receiver.poll_next(cx) {
206 Poll::Pending => Poll::Pending,
207 Poll::Ready(None) => {
208 if let Some(snd) = this.on_drained.take() {
210 snd.send(()).unwrap_or_else(|err| {
211 error!(err:?; "Failed to notify the events have been drained.");
212 });
213 };
214 Poll::Ready(None)
215 }
216 res @ Poll::Ready(Some(_)) => res,
217 }
218 }
219}
220
221pub struct TerminateHandle {
223 snd: oneshot::Sender<()>,
224 drained: oneshot::Receiver<()>,
225}
226
227impl TerminateHandle {
228 pub async fn terminate(self) {
231 self.snd.send(()).unwrap_or_else(|err| {
232 error!(err:?; "Failed to terminate the event ingestion.");
233 });
234 self.drained
235 .await
236 .unwrap_or_else(|err| error!(err:?; "Error waiting for events to be drained."));
237 }
238}
239
240pub struct Dispatcher {
245 allowed_events: BTreeSet<EventType>,
246 sender: Option<mpsc::UnboundedSender<Event>>,
247}
248
249impl Default for Dispatcher {
251 fn default() -> Self {
252 Self { allowed_events: BTreeSet::new(), sender: None }
253 }
254}
255
256impl Dispatcher {
257 fn new(allowed_events: BTreeSet<EventType>, sender: mpsc::UnboundedSender<Event>) -> Self {
258 Self { allowed_events, sender: Some(sender) }
259 }
260
261 pub fn emit(&mut self, event: Event) -> Result<(), mpsc::TrySendError<Event>> {
264 if let Some(sender) = &mut self.sender {
265 if self.allowed_events.contains(&event.ty()) {
266 sender.unbounded_send(event)?;
267 }
268 }
269 Ok(())
270 }
271
272 #[cfg(test)]
273 pub fn new_for_test(
274 allowed_events: BTreeSet<EventType>,
275 ) -> (mpsc::UnboundedReceiver<Event>, Self) {
276 let (sender, receiver) = mpsc::unbounded();
277 (receiver, Self::new(allowed_events, sender))
278 }
279}
280
281struct EventStreamLogger {
282 counters: BTreeMap<EventType, inspect::UintProperty>,
283 component_log_node: BoundedListNode,
284 counters_node: inspect::Node,
285 _node: inspect::Node,
286}
287
288impl EventStreamLogger {
289 pub fn new(node: inspect::Node) -> Self {
291 let counters_node = node.create_child("event_counts");
292 let recent_events_node = node.create_child("recent_events");
293 Self {
294 _node: node,
295 counters: BTreeMap::new(),
296 counters_node,
297 component_log_node: BoundedListNode::new(recent_events_node, RECENT_EVENT_LIMIT),
298 }
299 }
300
301 pub fn log(&mut self, event: &Event) {
303 let ty = event.ty();
304 if self.counters.contains_key(&ty) {
305 self.counters.get_mut(&ty).unwrap().add(1);
306 } else {
307 let counter = self.counters_node.create_uint(ty.as_ref(), 1);
308 self.counters.insert(ty.clone(), counter);
309 }
310 match &event.payload {
312 EventPayload::LogSinkRequested(LogSinkRequestedPayload { component, .. })
313 | EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
314 component, ..
315 }) => {
316 self.log_inspect(ty.as_ref(), component);
317 }
318 }
319 }
320
321 fn log_inspect(&mut self, event_name: &str, identity: &ComponentIdentity) {
322 inspect_log!(self.component_log_node,
324 "event" => event_name,
325 "moniker" => identity.moniker.to_string(),
326 );
327 }
328}
329
330#[derive(Debug, Error)]
332pub enum RouterError {
333 #[error("Missing consumer for event type {0:?}")]
334 MissingConsumer(EventType),
335
336 #[error("Missing producer for event type {0:?}")]
337 MissingProducer(EventType),
338}
339
340pub struct ProducerConfig<'a, T> {
342 pub producer: &'a mut T,
344
345 pub events: Vec<EventType>,
347}
348
349pub struct ConsumerConfig<'a, T> {
351 pub consumer: &'a Arc<T>,
353
354 pub events: Vec<EventType>,
356}
357
358pub trait EventConsumer {
360 fn handle(self: Arc<Self>, event: Event);
363}
364
365pub trait EventProducer {
367 fn set_dispatcher(&mut self, dispatcher: Dispatcher);
370}
371
372#[cfg(test)]
373mod tests {
374 use super::*;
375 use assert_matches::assert_matches;
376 use diagnostics_assertions::{assert_data_tree, AnyProperty};
377 use fidl::encoding::ProxyChannelBox;
378 use fidl::endpoints::RequestStream;
379 use fidl_fuchsia_inspect::InspectSinkMarker;
380 use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequestStream};
381 use fuchsia_async as fasync;
382 use fuchsia_sync::Mutex;
383 use futures::FutureExt;
384 use moniker::ExtendedMoniker;
385 use std::sync::LazyLock;
386 use zx::AsHandleRef;
387
388 const TEST_URL: &str = "NO-OP URL";
389 const FAKE_TIMESTAMP: i64 = 5;
390 static IDENTITY: LazyLock<Arc<ComponentIdentity>> = LazyLock::new(|| {
391 Arc::new(ComponentIdentity::new(ExtendedMoniker::parse_str("./a/b").unwrap(), TEST_URL))
392 });
393
394 #[derive(Default)]
395 struct TestEventProducer {
396 dispatcher: Dispatcher,
397 }
398
399 impl TestEventProducer {
400 fn emit(&mut self, event_type: EventType, identity: Arc<ComponentIdentity>) {
401 let event = match event_type {
402 EventType::LogSinkRequested => {
403 let (_, request_stream) =
404 fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
405 Event {
406 timestamp: zx::BootInstant::from_nanos(FAKE_TIMESTAMP),
407 payload: EventPayload::LogSinkRequested(LogSinkRequestedPayload {
408 component: identity,
409 request_stream,
410 }),
411 }
412 }
413 EventType::InspectSinkRequested => {
414 let (_, request_stream) =
415 fidl::endpoints::create_proxy_and_stream::<InspectSinkMarker>();
416 Event {
417 timestamp: zx::BootInstant::from_nanos(FAKE_TIMESTAMP),
418 payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
419 component: identity,
420 request_stream,
421 }),
422 }
423 }
424 };
425 let _ = self.dispatcher.emit(event);
426 }
427 }
428
429 impl EventProducer for TestEventProducer {
430 fn set_dispatcher(&mut self, dispatcher: Dispatcher) {
431 self.dispatcher = dispatcher;
432 }
433 }
434
435 struct TestEventConsumer {
436 event_sender: Mutex<mpsc::UnboundedSender<Event>>,
437 }
438
439 impl TestEventConsumer {
440 fn new() -> (mpsc::UnboundedReceiver<Event>, Arc<Self>) {
441 let (event_sender, event_receiver) = mpsc::unbounded();
442 (event_receiver, Arc::new(Self { event_sender: Mutex::new(event_sender) }))
443 }
444 }
445
446 impl EventConsumer for TestEventConsumer {
447 fn handle(self: Arc<Self>, event: Event) {
448 self.event_sender.lock().unbounded_send(event).unwrap();
449 }
450 }
451
452 #[fuchsia::test]
453 fn invalid_routing() {
454 let mut producer = TestEventProducer::default();
455 let (_receiver, consumer) = TestEventConsumer::new();
456 let mut router = EventRouter::new(inspect::Node::default());
457 router.add_producer(ProducerConfig {
458 producer: &mut producer,
459 events: vec![EventType::InspectSinkRequested],
460 });
461 router.add_consumer(ConsumerConfig {
462 consumer: &consumer,
463 events: vec![EventType::InspectSinkRequested, EventType::LogSinkRequested],
464 });
465
466 match router.start() {
470 Err(err) => {
471 assert_matches!(err, RouterError::MissingProducer(EventType::LogSinkRequested));
472 }
473 Ok(_) => panic!("expected an error from routing events"),
474 }
475
476 let mut producer = TestEventProducer::default();
477 let (_receiver, consumer) = TestEventConsumer::new();
478 let mut router = EventRouter::new(inspect::Node::default());
479 router.add_producer(ProducerConfig {
480 producer: &mut producer,
481 events: vec![EventType::InspectSinkRequested],
482 });
483 router.add_consumer(ConsumerConfig {
484 consumer: &consumer,
485 events: vec![EventType::LogSinkRequested],
486 });
487
488 match router.start() {
489 Err(err) => {
490 assert_matches!(
491 err,
492 RouterError::MissingConsumer(EventType::InspectSinkRequested)
493 | RouterError::MissingProducer(EventType::LogSinkRequested)
494 );
495 }
496 Ok(_) => panic!("expected an error from routing events"),
497 }
498 }
499
500 #[fuchsia::test]
501 async fn event_subscription() {
502 let mut producer = TestEventProducer::default();
503 let (mut first_receiver, first_consumer) = TestEventConsumer::new();
504 let (mut second_receiver, second_consumer) = TestEventConsumer::new();
505 let mut router = EventRouter::new(inspect::Node::default());
506 router.add_producer(ProducerConfig {
507 producer: &mut producer,
508 events: vec![EventType::LogSinkRequested],
509 });
510 router.add_consumer(ConsumerConfig {
511 consumer: &first_consumer,
512 events: vec![EventType::LogSinkRequested],
513 });
514 router.add_consumer(ConsumerConfig {
515 consumer: &second_consumer,
516 events: vec![EventType::LogSinkRequested],
517 });
518
519 let (_terminate_handle, fut) = router.start().unwrap();
520 let _router_task = fasync::Task::spawn(fut);
521
522 let (_, server_end) = fidl::endpoints::create_proxy::<LogSinkMarker>();
524 let request_stream_koid = server_end.as_handle_ref().get_koid().unwrap();
525 let request_stream = LogSinkRequestStream::from_channel(fidl::AsyncChannel::from_channel(
526 server_end.into_channel(),
527 ));
528 let timestamp = zx::BootInstant::get();
529 producer
530 .dispatcher
531 .emit(Event {
532 timestamp,
533 payload: EventPayload::LogSinkRequested(LogSinkRequestedPayload {
534 component: IDENTITY.clone(),
535 request_stream,
536 }),
537 })
538 .unwrap();
539
540 let first_event = first_receiver.next().await.unwrap();
543 assert_matches!(first_event, Event {
544 payload: EventPayload::LogSinkRequested(payload),
545 ..
546 } => {
547 assert_eq!(payload.component, *IDENTITY);
548 let actual_koid = payload.request_stream
549 .into_inner().0.channel().as_channel().as_handle_ref().get_koid().unwrap();
550 assert_eq!(actual_koid, request_stream_koid);
551 });
552 assert!(second_receiver.next().now_or_never().is_none());
553 }
554
555 #[fuchsia::test]
556 async fn consumers_cleanup() {
557 let mut producer = TestEventProducer::default();
558 let (mut first_receiver, first_consumer) = TestEventConsumer::new();
559 let (mut second_receiver, second_consumer) = TestEventConsumer::new();
560 let (mut third_receiver, third_consumer) = TestEventConsumer::new();
561 let mut router = EventRouter::new(inspect::Node::default());
562 router.add_producer(ProducerConfig {
563 producer: &mut producer,
564 events: vec![EventType::InspectSinkRequested],
565 });
566 router.add_consumer(ConsumerConfig {
567 consumer: &first_consumer,
568 events: vec![EventType::InspectSinkRequested],
569 });
570 router.add_consumer(ConsumerConfig {
571 consumer: &second_consumer,
572 events: vec![EventType::InspectSinkRequested],
573 });
574 router.add_consumer(ConsumerConfig {
575 consumer: &third_consumer,
576 events: vec![EventType::InspectSinkRequested],
577 });
578
579 drop(first_consumer);
580 drop(third_consumer);
581
582 let (_terminate_handle, fut) = router.start().unwrap();
583 let _router_task = fasync::Task::spawn(fut);
584
585 let (_, request_stream) = fidl::endpoints::create_proxy_and_stream::<InspectSinkMarker>();
587 producer
588 .dispatcher
589 .emit(Event {
590 timestamp: zx::BootInstant::get(),
591 payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
592 component: IDENTITY.clone(),
593 request_stream,
594 }),
595 })
596 .unwrap();
597
598 let event = second_receiver.next().await.unwrap();
600 assert_matches!(event.payload, EventPayload::InspectSinkRequested(_));
601 assert!(first_receiver.next().now_or_never().unwrap().is_none());
602 assert!(third_receiver.next().now_or_never().unwrap().is_none());
603
604 let (_, request_stream) = fidl::endpoints::create_proxy_and_stream::<InspectSinkMarker>();
606 producer
607 .dispatcher
608 .emit(Event {
609 timestamp: zx::BootInstant::get(),
610 payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
611 component: IDENTITY.clone(),
612 request_stream,
613 }),
614 })
615 .unwrap();
616 let event = second_receiver.next().await.unwrap();
617 assert_matches!(event.payload, EventPayload::InspectSinkRequested(_));
618 assert!(first_receiver.next().now_or_never().unwrap().is_none());
619 assert!(third_receiver.next().now_or_never().unwrap().is_none());
620 }
621
622 #[fuchsia::test]
623 async fn inspect_log() {
624 let inspector = inspect::Inspector::default();
625 let mut router = EventRouter::new(inspector.root().create_child("events"));
626 let mut producer1 = TestEventProducer::default();
627 let mut producer2 = TestEventProducer::default();
628 let (receiver, consumer) = TestEventConsumer::new();
629 router.add_consumer(ConsumerConfig {
630 consumer: &consumer,
631 events: vec![EventType::InspectSinkRequested, EventType::LogSinkRequested],
632 });
633 router.add_producer(ProducerConfig {
634 producer: &mut producer1,
635 events: vec![EventType::LogSinkRequested],
636 });
637 router.add_producer(ProducerConfig {
638 producer: &mut producer2,
639 events: vec![EventType::InspectSinkRequested],
640 });
641
642 producer1.emit(EventType::LogSinkRequested, IDENTITY.clone());
643 producer2.emit(EventType::InspectSinkRequested, IDENTITY.clone());
644
645 let (_terminate_handle, fut) = router.start().unwrap();
647 let _router_task = fasync::Task::spawn(fut);
648 receiver.take(2).collect::<Vec<_>>().await;
649
650 assert_data_tree!(inspector, root: {
651 events: {
652 event_counts: {
653 log_sink_requested: 1u64,
654 inspect_sink_requested: 1u64,
655 },
656 recent_events: {
657 "0": {
658 "@time": AnyProperty,
659 event: "log_sink_requested",
660 moniker: "a/b"
661 },
662 "1": {
663 "@time": AnyProperty,
664 event: "inspect_sink_requested",
665 moniker: "a/b"
666 },
667 }
668 }
669 });
670 }
671
672 #[fuchsia::test]
673 async fn event_stream_semantics() {
674 let inspector = inspect::Inspector::default();
675 let mut router = EventRouter::new(inspector.root().create_child("events"));
676 let mut producer1 = TestEventProducer::default();
677 let mut producer2 = TestEventProducer::default();
678 let (receiver, consumer) = TestEventConsumer::new();
679 router.add_consumer(ConsumerConfig {
680 consumer: &consumer,
681 events: vec![EventType::InspectSinkRequested],
682 });
683 router.add_producer(ProducerConfig {
684 producer: &mut producer1,
685 events: vec![EventType::InspectSinkRequested],
686 });
687 router.add_producer(ProducerConfig {
688 producer: &mut producer2,
689 events: vec![EventType::InspectSinkRequested],
690 });
691
692 let identity = |moniker| {
693 Arc::new(ComponentIdentity::new(ExtendedMoniker::parse_str(moniker).unwrap(), TEST_URL))
694 };
695
696 producer1.emit(EventType::InspectSinkRequested, identity("./a"));
697 producer2.emit(EventType::InspectSinkRequested, identity("./b"));
698 producer1.emit(EventType::InspectSinkRequested, identity("./c"));
699 producer2.emit(EventType::InspectSinkRequested, identity("./d"));
700
701 let (_terminate_handle, fut) = router.start().unwrap();
703 let _router_task = fasync::Task::spawn(fut);
704 let events = receiver.take(4).collect::<Vec<_>>().await;
705
706 let expected_events = vec![
707 inspect_sink_requested(identity("./a")),
708 inspect_sink_requested(identity("./b")),
709 inspect_sink_requested(identity("./c")),
710 inspect_sink_requested(identity("./d")),
711 ];
712 assert_eq!(events.len(), expected_events.len());
713 for (event, expected_event) in std::iter::zip(events, expected_events) {
714 assert_event(event, expected_event);
715 }
716 }
717
718 #[fuchsia::test]
719 async fn stream_draining() {
720 let inspector = inspect::Inspector::default();
721 let mut router = EventRouter::new(inspector.root().create_child("events"));
722 let mut producer = TestEventProducer::default();
723 let (mut receiver, consumer) = TestEventConsumer::new();
724 router.add_consumer(ConsumerConfig {
725 consumer: &consumer,
726 events: vec![EventType::InspectSinkRequested],
727 });
728 router.add_producer(ProducerConfig {
729 producer: &mut producer,
730 events: vec![EventType::InspectSinkRequested],
731 });
732 router.add_producer(ProducerConfig {
733 producer: &mut producer,
734 events: vec![EventType::InspectSinkRequested],
735 });
736
737 producer.emit(EventType::InspectSinkRequested, IDENTITY.clone());
738
739 let (terminate_handle, fut) = router.start().unwrap();
740 let _router_task = fasync::Task::spawn(fut);
741 let on_drained = terminate_handle.terminate();
742 let drain_finished = fasync::Task::spawn(on_drained);
743
744 assert_event(receiver.next().await.unwrap(), inspect_sink_requested(IDENTITY.clone()));
745
746 drain_finished.await;
748
749 producer.emit(EventType::InspectSinkRequested, IDENTITY.clone());
751 assert!(receiver.next().now_or_never().is_none());
752 }
753
754 fn assert_event(event: Event, other: Event) {
755 assert_eq!(event.timestamp, other.timestamp);
756 match (event.payload, other.payload) {
757 (
758 EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
759 component: this_identity,
760 ..
761 }),
762 EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
763 component: other_identity,
764 ..
765 }),
766 ) => {
767 assert_eq!(this_identity, other_identity);
768 }
769 _ => unimplemented!("no other combinations are expected in these tests"),
770 }
771 }
772
773 fn inspect_sink_requested(identity: Arc<ComponentIdentity>) -> Event {
774 let (_proxy, request_stream) =
775 fidl::endpoints::create_proxy_and_stream::<InspectSinkMarker>();
776 Event {
777 timestamp: zx::BootInstant::from_nanos(FAKE_TIMESTAMP),
778 payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
779 component: identity,
780 request_stream,
781 }),
782 }
783 }
784}