archivist_lib/events/
router.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::events::types::*;
6use crate::identity::ComponentIdentity;
7use fuchsia_inspect::{self as inspect, NumericProperty};
8use fuchsia_inspect_contrib::inspect_log;
9use fuchsia_inspect_contrib::nodes::BoundedListNode;
10use futures::channel::{mpsc, oneshot};
11use futures::task::{Context, Poll};
12use futures::{Future, Stream, StreamExt};
13use log::{debug, error};
14use pin_project::pin_project;
15use std::collections::{BTreeMap, BTreeSet};
16use std::pin::Pin;
17use std::sync::{Arc, Weak};
18use thiserror::Error;
19
/// Capacity given to the `recent_events` bounded inspect list created in
/// `EventStreamLogger::new`.
const RECENT_EVENT_LIMIT: usize = 200;
21
22/// Core archivist internal event router that supports multiple event producers and multiple event
23/// consumers.
24pub struct EventRouter {
25    // All the consumers that have been registered for an event.
26    consumers: BTreeMap<EventType, Vec<Weak<dyn EventConsumer + Send + Sync>>>,
27    // The types of all events that can be produced. Used only for validation.
28    producers_registered: BTreeSet<EventType>,
29
30    // Ends of the channel used by event producers.
31    sender: mpsc::UnboundedSender<Event>,
32    receiver: mpsc::UnboundedReceiver<Event>,
33
34    inspect_logger: EventStreamLogger,
35}
36
37impl EventRouter {
38    /// Creates a new empty event router.
39    pub fn new(node: inspect::Node) -> Self {
40        let (sender, receiver) = mpsc::unbounded();
41        Self {
42            consumers: BTreeMap::new(),
43            sender,
44            receiver,
45            producers_registered: BTreeSet::new(),
46            inspect_logger: EventStreamLogger::new(node),
47        }
48    }
49
50    /// Registers an event producer with the given configuration specifying the types of events the
51    /// given producer is allowed to emit.
52    pub fn add_producer<T>(&mut self, config: ProducerConfig<'_, T>)
53    where
54        T: EventProducer,
55    {
56        let events: BTreeSet<_> = config.events.into_iter().collect();
57        self.producers_registered.append(&mut events.clone());
58        let dispatcher = Dispatcher::new(events, self.sender.clone());
59        config.producer.set_dispatcher(dispatcher);
60    }
61
62    /// Registers an event consumer with the given configuration specifying the types of events the
63    /// given consumer will receive.
64    pub fn add_consumer<T>(&mut self, config: ConsumerConfig<'_, T>)
65    where
66        T: EventConsumer + Send + Sync + 'static,
67    {
68        let subscriber_weak = Arc::downgrade(config.consumer);
69        for event_type in config.events {
70            self.consumers
71                .entry(event_type)
72                .or_default()
73                .push(Weak::clone(&subscriber_weak) as Weak<dyn EventConsumer + Send + Sync>);
74        }
75    }
76
77    /// Starts listening for events emitted by the registered producers and dispatching them to
78    /// registered consumers.
79    ///
80    /// First, validates that for every event type that will be dispatched, there exists at least
81    /// one consumer. And that for every event that will be consumed, there exists at least one
82    /// producer.
83    ///
84    /// Afterwards, listens to events emitted by producers. When an event arrives it sends it to
85    /// all consumers of the event. Since all events are singletons, the first consumer that was
86    /// registered will get the singleton data and the rest won't.
87    pub fn start(mut self) -> Result<(TerminateHandle, impl Future<Output = ()>), RouterError> {
88        self.validate_routing()?;
89
90        let (terminate_handle, mut stream) = EventStream::new(self.receiver);
91        let mut consumers = self.consumers;
92        let mut inspect_logger = self.inspect_logger;
93
94        let fut = async move {
95            loop {
96                match stream.next().await {
97                    None => {
98                        debug!("Event ingestion finished");
99                        break;
100                    }
101                    Some(event) => {
102                        inspect_logger.log(&event);
103
104                        let event_type = event.ty();
105                        let weak_consumers = match consumers.remove(&event_type) {
106                            Some(c) => c,
107                            None => continue,
108                        };
109
110                        let mut singleton_event = Some(event);
111
112                        // Consumers which weak reference could be upgraded will be stored here.
113                        let mut active_consumers = vec![];
114                        for weak_consumer in weak_consumers {
115                            if let Some(consumer) = weak_consumer.upgrade() {
116                                active_consumers.push(weak_consumer);
117                                if let Some(e) = singleton_event.take() {
118                                    consumer.handle(e);
119                                };
120                            }
121                        }
122
123                        // We insert the list of active consumers back in the map.
124                        consumers.insert(event_type, active_consumers);
125                    }
126                }
127            }
128        };
129        Ok((terminate_handle, fut))
130    }
131
132    fn validate_routing(&mut self) -> Result<(), RouterError> {
133        for consumed_event in self.consumers.keys() {
134            if !self.producers_registered.contains(consumed_event) {
135                return Err(RouterError::MissingProducer(consumed_event.clone()));
136            }
137        }
138        for produced_event in &self.producers_registered {
139            if !self.consumers.contains_key(produced_event) {
140                return Err(RouterError::MissingConsumer(produced_event.clone()));
141            }
142        }
143        Ok(())
144    }
145}
146
147/// Stream of events that  provides the mechanisms used to notify when the events have
148/// been drained.
149#[pin_project]
150struct EventStream {
151    /// The stream containing events.
152    #[pin]
153    receiver: mpsc::UnboundedReceiver<Event>,
154
155    /// When this future is ready, the stream will be closed. Messages still in the buffer
156    /// will be drained.
157    #[pin]
158    on_terminate: oneshot::Receiver<()>,
159
160    /// When the stream has been drained a notification will be sent through this channel.
161    on_drained: Option<oneshot::Sender<()>>,
162}
163
164impl EventStream {
165    fn new(receiver: mpsc::UnboundedReceiver<Event>) -> (TerminateHandle, Self) {
166        let (snd, rcv) = oneshot::channel();
167        let (drain_snd, drain_rcv) = oneshot::channel();
168        (
169            TerminateHandle { snd, drained: drain_rcv },
170            Self { receiver, on_terminate: rcv, on_drained: Some(drain_snd) },
171        )
172    }
173}
174
175impl Stream for EventStream {
176    type Item = Event;
177
178    /// This stream implementation merges two streams into a single one polling from each of them
179    /// in a round robin fashion. When one stream finishes, this will keep polling from the
180    /// remaining one.
181    ///
182    /// When receiving a request for termination, the event stream will be
183    /// closed so that no new messages can be sent through that channel, but it'll still be drained.
184    ///
185    /// When the stream has been drained, a message is sent through the appropriate
186    /// channel.
187    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
188        let mut this = self.project();
189
190        // First check if request to terminate the event ingestion has been requested, if
191        // it has, then close the channel to which events are sent. This will prevent
192        // further messages to be sent, but it remains possible to drain the channel
193        // buffer.
194        //
195        // IMPORTANT: If we ever use this event stream for events emitted internally, then we
196        // should bring back the changed undone in https://fxrev.dev/744413 as internal event
197        // streams shouldn't be closed on termination.
198        match this.on_terminate.poll(cx) {
199            Poll::Pending => {}
200            Poll::Ready(_) => {
201                this.receiver.close();
202            }
203        }
204        // Poll the stream and track whether it's drained or not.
205        match this.receiver.poll_next(cx) {
206            Poll::Pending => Poll::Pending,
207            Poll::Ready(None) => {
208                // Notify once that it has been drained.
209                if let Some(snd) = this.on_drained.take() {
210                    snd.send(()).unwrap_or_else(|err| {
211                        error!(err:?; "Failed to notify the events have been drained.");
212                    });
213                };
214                Poll::Ready(None)
215            }
216            res @ Poll::Ready(Some(_)) => res,
217        }
218    }
219}
220
221/// Allows to termiante event ingestion.
222pub struct TerminateHandle {
223    snd: oneshot::Sender<()>,
224    drained: oneshot::Receiver<()>,
225}
226
227impl TerminateHandle {
228    /// Terminates event ingestion. Buffered events will be drained. The returned future
229    /// will complete once all buffered events have been drained.
230    pub async fn terminate(self) {
231        self.snd.send(()).unwrap_or_else(|err| {
232            error!(err:?; "Failed to terminate the event ingestion.");
233        });
234        self.drained
235            .await
236            .unwrap_or_else(|err| error!(err:?; "Error waiting for events to be drained."));
237    }
238}
239
240/// Allows to emit events of a restricted set of types.
241///
242/// Event producers will receive a `Dispatcher` instance that will allow them to emit events of
243/// restricted set of types.
244pub struct Dispatcher {
245    allowed_events: BTreeSet<EventType>,
246    sender: Option<mpsc::UnboundedSender<Event>>,
247}
248
249/// Returns a no-op dispatcher.
250impl Default for Dispatcher {
251    fn default() -> Self {
252        Self { allowed_events: BTreeSet::new(), sender: None }
253    }
254}
255
256impl Dispatcher {
257    fn new(allowed_events: BTreeSet<EventType>, sender: mpsc::UnboundedSender<Event>) -> Self {
258        Self { allowed_events, sender: Some(sender) }
259    }
260
261    /// Emits an event. If the event isn't in the restricted set of allowed types, this operation
262    /// is a no-op. An error is returned when sending the event into the channel fails.
263    pub fn emit(&mut self, event: Event) -> Result<(), mpsc::TrySendError<Event>> {
264        if let Some(sender) = &mut self.sender {
265            if self.allowed_events.contains(&event.ty()) {
266                sender.unbounded_send(event)?;
267            }
268        }
269        Ok(())
270    }
271
272    #[cfg(test)]
273    pub fn new_for_test(
274        allowed_events: BTreeSet<EventType>,
275    ) -> (mpsc::UnboundedReceiver<Event>, Self) {
276        let (sender, receiver) = mpsc::unbounded();
277        (receiver, Self::new(allowed_events, sender))
278    }
279}
280
281struct EventStreamLogger {
282    counters: BTreeMap<EventType, inspect::UintProperty>,
283    component_log_node: BoundedListNode,
284    counters_node: inspect::Node,
285    _node: inspect::Node,
286}
287
288impl EventStreamLogger {
289    /// Creates a new event logger. All inspect data will be written as children of `parent`.
290    pub fn new(node: inspect::Node) -> Self {
291        let counters_node = node.create_child("event_counts");
292        let recent_events_node = node.create_child("recent_events");
293        Self {
294            _node: node,
295            counters: BTreeMap::new(),
296            counters_node,
297            component_log_node: BoundedListNode::new(recent_events_node, RECENT_EVENT_LIMIT),
298        }
299    }
300
301    /// Log a new component event to inspect.
302    pub fn log(&mut self, event: &Event) {
303        let ty = event.ty();
304        if self.counters.contains_key(&ty) {
305            self.counters.get_mut(&ty).unwrap().add(1);
306        } else {
307            let counter = self.counters_node.create_uint(ty.as_ref(), 1);
308            self.counters.insert(ty.clone(), counter);
309        }
310        // TODO(https://fxbug.dev/42174041): leverage string references for the payload.
311        match &event.payload {
312            EventPayload::LogSinkRequested(LogSinkRequestedPayload { component, .. })
313            | EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
314                component, ..
315            }) => {
316                self.log_inspect(ty.as_ref(), component);
317            }
318        }
319    }
320
321    fn log_inspect(&mut self, event_name: &str, identity: &ComponentIdentity) {
322        // TODO(https://fxbug.dev/42174041): leverage string references for the `event_name`.
323        inspect_log!(self.component_log_node,
324            "event" => event_name,
325            "moniker" => identity.moniker.to_string(),
326        );
327    }
328}
329
330/// Set of errors that can happen when setting up an event router and executing its dispatching loop.
331#[derive(Debug, Error)]
332pub enum RouterError {
333    #[error("Missing consumer for event type {0:?}")]
334    MissingConsumer(EventType),
335
336    #[error("Missing producer for event type {0:?}")]
337    MissingProducer(EventType),
338}
339
340/// Configuration for an event producer.
341pub struct ProducerConfig<'a, T> {
342    /// The event producer that will receive a `Dispatcher`
343    pub producer: &'a mut T,
344
345    /// The set of events that the `producer` will be allowed to emit.
346    pub events: Vec<EventType>,
347}
348
349/// Configuration for an event consumer.
350pub struct ConsumerConfig<'a, T> {
351    /// The event consumer that will receive events when they are emitted by producers.
352    pub consumer: &'a Arc<T>,
353
354    /// The set of event types that the `consumer` will receive.
355    pub events: Vec<EventType>,
356}
357
358/// Trait implemented by data types which receive events.
359pub trait EventConsumer {
360    /// Event consumers will receive a call on this method when an event they are interested on
361    /// happens.
362    fn handle(self: Arc<Self>, event: Event);
363}
364
365/// Trait implemented by data types which emit events.
366pub trait EventProducer {
367    /// Whent registered, event producers will receive a call on this method with the `dispatcher`
368    /// they can use to emit events.
369    fn set_dispatcher(&mut self, dispatcher: Dispatcher);
370}
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375    use assert_matches::assert_matches;
376    use diagnostics_assertions::{assert_data_tree, AnyProperty};
377    use fidl::encoding::ProxyChannelBox;
378    use fidl::endpoints::RequestStream;
379    use fidl_fuchsia_inspect::InspectSinkMarker;
380    use fidl_fuchsia_logger::{LogSinkMarker, LogSinkRequestStream};
381    use fuchsia_async as fasync;
382    use fuchsia_sync::Mutex;
383    use futures::FutureExt;
384    use moniker::ExtendedMoniker;
385    use std::sync::LazyLock;
386    use zx::AsHandleRef;
387
    // URL given to every component identity built by these tests.
    const TEST_URL: &str = "NO-OP URL";
    // Timestamp, in nanoseconds, of the events built by the test helpers.
    const FAKE_TIMESTAMP: i64 = 5;
    // Shared identity for the component with moniker `./a/b`, built on first use.
    static IDENTITY: LazyLock<Arc<ComponentIdentity>> = LazyLock::new(|| {
        Arc::new(ComponentIdentity::new(ExtendedMoniker::parse_str("./a/b").unwrap(), TEST_URL))
    });
393
394    #[derive(Default)]
395    struct TestEventProducer {
396        dispatcher: Dispatcher,
397    }
398
399    impl TestEventProducer {
400        fn emit(&mut self, event_type: EventType, identity: Arc<ComponentIdentity>) {
401            let event = match event_type {
402                EventType::LogSinkRequested => {
403                    let (_, request_stream) =
404                        fidl::endpoints::create_proxy_and_stream::<LogSinkMarker>();
405                    Event {
406                        timestamp: zx::BootInstant::from_nanos(FAKE_TIMESTAMP),
407                        payload: EventPayload::LogSinkRequested(LogSinkRequestedPayload {
408                            component: identity,
409                            request_stream,
410                        }),
411                    }
412                }
413                EventType::InspectSinkRequested => {
414                    let (_, request_stream) =
415                        fidl::endpoints::create_proxy_and_stream::<InspectSinkMarker>();
416                    Event {
417                        timestamp: zx::BootInstant::from_nanos(FAKE_TIMESTAMP),
418                        payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
419                            component: identity,
420                            request_stream,
421                        }),
422                    }
423                }
424            };
425            let _ = self.dispatcher.emit(event);
426        }
427    }
428
429    impl EventProducer for TestEventProducer {
430        fn set_dispatcher(&mut self, dispatcher: Dispatcher) {
431            self.dispatcher = dispatcher;
432        }
433    }
434
435    struct TestEventConsumer {
436        event_sender: Mutex<mpsc::UnboundedSender<Event>>,
437    }
438
439    impl TestEventConsumer {
440        fn new() -> (mpsc::UnboundedReceiver<Event>, Arc<Self>) {
441            let (event_sender, event_receiver) = mpsc::unbounded();
442            (event_receiver, Arc::new(Self { event_sender: Mutex::new(event_sender) }))
443        }
444    }
445
446    impl EventConsumer for TestEventConsumer {
447        fn handle(self: Arc<Self>, event: Event) {
448            self.event_sender.lock().unbounded_send(event).unwrap();
449        }
450    }
451
452    #[fuchsia::test]
453    fn invalid_routing() {
454        let mut producer = TestEventProducer::default();
455        let (_receiver, consumer) = TestEventConsumer::new();
456        let mut router = EventRouter::new(inspect::Node::default());
457        router.add_producer(ProducerConfig {
458            producer: &mut producer,
459            events: vec![EventType::InspectSinkRequested],
460        });
461        router.add_consumer(ConsumerConfig {
462            consumer: &consumer,
463            events: vec![EventType::InspectSinkRequested, EventType::LogSinkRequested],
464        });
465
466        // An explicit match is needed here since unwrap_err requires Debug implemented for both T
467        // and E in Result<T, E> and T is a pair which second element is `impl Future` which
468        // doesn't implement Debug.
469        match router.start() {
470            Err(err) => {
471                assert_matches!(err, RouterError::MissingProducer(EventType::LogSinkRequested));
472            }
473            Ok(_) => panic!("expected an error from routing events"),
474        }
475
476        let mut producer = TestEventProducer::default();
477        let (_receiver, consumer) = TestEventConsumer::new();
478        let mut router = EventRouter::new(inspect::Node::default());
479        router.add_producer(ProducerConfig {
480            producer: &mut producer,
481            events: vec![EventType::InspectSinkRequested],
482        });
483        router.add_consumer(ConsumerConfig {
484            consumer: &consumer,
485            events: vec![EventType::LogSinkRequested],
486        });
487
488        match router.start() {
489            Err(err) => {
490                assert_matches!(
491                    err,
492                    RouterError::MissingConsumer(EventType::InspectSinkRequested)
493                        | RouterError::MissingProducer(EventType::LogSinkRequested)
494                );
495            }
496            Ok(_) => panic!("expected an error from routing events"),
497        }
498    }
499
500    #[fuchsia::test]
501    async fn event_subscription() {
502        let mut producer = TestEventProducer::default();
503        let (mut first_receiver, first_consumer) = TestEventConsumer::new();
504        let (mut second_receiver, second_consumer) = TestEventConsumer::new();
505        let mut router = EventRouter::new(inspect::Node::default());
506        router.add_producer(ProducerConfig {
507            producer: &mut producer,
508            events: vec![EventType::LogSinkRequested],
509        });
510        router.add_consumer(ConsumerConfig {
511            consumer: &first_consumer,
512            events: vec![EventType::LogSinkRequested],
513        });
514        router.add_consumer(ConsumerConfig {
515            consumer: &second_consumer,
516            events: vec![EventType::LogSinkRequested],
517        });
518
519        let (_terminate_handle, fut) = router.start().unwrap();
520        let _router_task = fasync::Task::spawn(fut);
521
522        // Emit an event
523        let (_, server_end) = fidl::endpoints::create_proxy::<LogSinkMarker>();
524        let request_stream_koid = server_end.as_handle_ref().get_koid().unwrap();
525        let request_stream = LogSinkRequestStream::from_channel(fidl::AsyncChannel::from_channel(
526            server_end.into_channel(),
527        ));
528        let timestamp = zx::BootInstant::get();
529        producer
530            .dispatcher
531            .emit(Event {
532                timestamp,
533                payload: EventPayload::LogSinkRequested(LogSinkRequestedPayload {
534                    component: IDENTITY.clone(),
535                    request_stream,
536                }),
537            })
538            .unwrap();
539
540        // The first consumer that was registered receives the request stream. The second one
541        // receives nothing.
542        let first_event = first_receiver.next().await.unwrap();
543        assert_matches!(first_event, Event {
544            payload: EventPayload::LogSinkRequested(payload),
545            ..
546        } => {
547            assert_eq!(payload.component, *IDENTITY);
548            let actual_koid = payload.request_stream
549                .into_inner().0.channel().as_channel().as_handle_ref().get_koid().unwrap();
550            assert_eq!(actual_koid, request_stream_koid);
551        });
552        assert!(second_receiver.next().now_or_never().is_none());
553    }
554
555    #[fuchsia::test]
556    async fn consumers_cleanup() {
557        let mut producer = TestEventProducer::default();
558        let (mut first_receiver, first_consumer) = TestEventConsumer::new();
559        let (mut second_receiver, second_consumer) = TestEventConsumer::new();
560        let (mut third_receiver, third_consumer) = TestEventConsumer::new();
561        let mut router = EventRouter::new(inspect::Node::default());
562        router.add_producer(ProducerConfig {
563            producer: &mut producer,
564            events: vec![EventType::InspectSinkRequested],
565        });
566        router.add_consumer(ConsumerConfig {
567            consumer: &first_consumer,
568            events: vec![EventType::InspectSinkRequested],
569        });
570        router.add_consumer(ConsumerConfig {
571            consumer: &second_consumer,
572            events: vec![EventType::InspectSinkRequested],
573        });
574        router.add_consumer(ConsumerConfig {
575            consumer: &third_consumer,
576            events: vec![EventType::InspectSinkRequested],
577        });
578
579        drop(first_consumer);
580        drop(third_consumer);
581
582        let (_terminate_handle, fut) = router.start().unwrap();
583        let _router_task = fasync::Task::spawn(fut);
584
585        // Emit an event
586        let (_, request_stream) = fidl::endpoints::create_proxy_and_stream::<InspectSinkMarker>();
587        producer
588            .dispatcher
589            .emit(Event {
590                timestamp: zx::BootInstant::get(),
591                payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
592                    component: IDENTITY.clone(),
593                    request_stream,
594                }),
595            })
596            .unwrap();
597
598        // We see the event only in the receiver which consumer wasn't dropped.
599        let event = second_receiver.next().await.unwrap();
600        assert_matches!(event.payload, EventPayload::InspectSinkRequested(_));
601        assert!(first_receiver.next().now_or_never().unwrap().is_none());
602        assert!(third_receiver.next().now_or_never().unwrap().is_none());
603
604        // We see additional events in the second receiver which remains alive.
605        let (_, request_stream) = fidl::endpoints::create_proxy_and_stream::<InspectSinkMarker>();
606        producer
607            .dispatcher
608            .emit(Event {
609                timestamp: zx::BootInstant::get(),
610                payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
611                    component: IDENTITY.clone(),
612                    request_stream,
613                }),
614            })
615            .unwrap();
616        let event = second_receiver.next().await.unwrap();
617        assert_matches!(event.payload, EventPayload::InspectSinkRequested(_));
618        assert!(first_receiver.next().now_or_never().unwrap().is_none());
619        assert!(third_receiver.next().now_or_never().unwrap().is_none());
620    }
621
622    #[fuchsia::test]
623    async fn inspect_log() {
624        let inspector = inspect::Inspector::default();
625        let mut router = EventRouter::new(inspector.root().create_child("events"));
626        let mut producer1 = TestEventProducer::default();
627        let mut producer2 = TestEventProducer::default();
628        let (receiver, consumer) = TestEventConsumer::new();
629        router.add_consumer(ConsumerConfig {
630            consumer: &consumer,
631            events: vec![EventType::InspectSinkRequested, EventType::LogSinkRequested],
632        });
633        router.add_producer(ProducerConfig {
634            producer: &mut producer1,
635            events: vec![EventType::LogSinkRequested],
636        });
637        router.add_producer(ProducerConfig {
638            producer: &mut producer2,
639            events: vec![EventType::InspectSinkRequested],
640        });
641
642        producer1.emit(EventType::LogSinkRequested, IDENTITY.clone());
643        producer2.emit(EventType::InspectSinkRequested, IDENTITY.clone());
644
645        // Consume the events.
646        let (_terminate_handle, fut) = router.start().unwrap();
647        let _router_task = fasync::Task::spawn(fut);
648        receiver.take(2).collect::<Vec<_>>().await;
649
650        assert_data_tree!(inspector, root: {
651            events: {
652                event_counts: {
653                    log_sink_requested: 1u64,
654                    inspect_sink_requested: 1u64,
655                },
656                recent_events: {
657                    "0": {
658                        "@time": AnyProperty,
659                        event: "log_sink_requested",
660                        moniker: "a/b"
661                    },
662                    "1": {
663                        "@time": AnyProperty,
664                        event: "inspect_sink_requested",
665                        moniker: "a/b"
666                    },
667                }
668            }
669        });
670    }
671
672    #[fuchsia::test]
673    async fn event_stream_semantics() {
674        let inspector = inspect::Inspector::default();
675        let mut router = EventRouter::new(inspector.root().create_child("events"));
676        let mut producer1 = TestEventProducer::default();
677        let mut producer2 = TestEventProducer::default();
678        let (receiver, consumer) = TestEventConsumer::new();
679        router.add_consumer(ConsumerConfig {
680            consumer: &consumer,
681            events: vec![EventType::InspectSinkRequested],
682        });
683        router.add_producer(ProducerConfig {
684            producer: &mut producer1,
685            events: vec![EventType::InspectSinkRequested],
686        });
687        router.add_producer(ProducerConfig {
688            producer: &mut producer2,
689            events: vec![EventType::InspectSinkRequested],
690        });
691
692        let identity = |moniker| {
693            Arc::new(ComponentIdentity::new(ExtendedMoniker::parse_str(moniker).unwrap(), TEST_URL))
694        };
695
696        producer1.emit(EventType::InspectSinkRequested, identity("./a"));
697        producer2.emit(EventType::InspectSinkRequested, identity("./b"));
698        producer1.emit(EventType::InspectSinkRequested, identity("./c"));
699        producer2.emit(EventType::InspectSinkRequested, identity("./d"));
700
701        // We should see the events in order of emission.
702        let (_terminate_handle, fut) = router.start().unwrap();
703        let _router_task = fasync::Task::spawn(fut);
704        let events = receiver.take(4).collect::<Vec<_>>().await;
705
706        let expected_events = vec![
707            inspect_sink_requested(identity("./a")),
708            inspect_sink_requested(identity("./b")),
709            inspect_sink_requested(identity("./c")),
710            inspect_sink_requested(identity("./d")),
711        ];
712        assert_eq!(events.len(), expected_events.len());
713        for (event, expected_event) in std::iter::zip(events, expected_events) {
714            assert_event(event, expected_event);
715        }
716    }
717
718    #[fuchsia::test]
719    async fn stream_draining() {
720        let inspector = inspect::Inspector::default();
721        let mut router = EventRouter::new(inspector.root().create_child("events"));
722        let mut producer = TestEventProducer::default();
723        let (mut receiver, consumer) = TestEventConsumer::new();
724        router.add_consumer(ConsumerConfig {
725            consumer: &consumer,
726            events: vec![EventType::InspectSinkRequested],
727        });
728        router.add_producer(ProducerConfig {
729            producer: &mut producer,
730            events: vec![EventType::InspectSinkRequested],
731        });
732        router.add_producer(ProducerConfig {
733            producer: &mut producer,
734            events: vec![EventType::InspectSinkRequested],
735        });
736
737        producer.emit(EventType::InspectSinkRequested, IDENTITY.clone());
738
739        let (terminate_handle, fut) = router.start().unwrap();
740        let _router_task = fasync::Task::spawn(fut);
741        let on_drained = terminate_handle.terminate();
742        let drain_finished = fasync::Task::spawn(on_drained);
743
744        assert_event(receiver.next().await.unwrap(), inspect_sink_requested(IDENTITY.clone()));
745
746        // This future must be complete now.
747        drain_finished.await;
748
749        // We must never see any new event emitted by the producer.
750        producer.emit(EventType::InspectSinkRequested, IDENTITY.clone());
751        assert!(receiver.next().now_or_never().is_none());
752    }
753
754    fn assert_event(event: Event, other: Event) {
755        assert_eq!(event.timestamp, other.timestamp);
756        match (event.payload, other.payload) {
757            (
758                EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
759                    component: this_identity,
760                    ..
761                }),
762                EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
763                    component: other_identity,
764                    ..
765                }),
766            ) => {
767                assert_eq!(this_identity, other_identity);
768            }
769            _ => unimplemented!("no other combinations are expected in these tests"),
770        }
771    }
772
773    fn inspect_sink_requested(identity: Arc<ComponentIdentity>) -> Event {
774        let (_proxy, request_stream) =
775            fidl::endpoints::create_proxy_and_stream::<InspectSinkMarker>();
776        Event {
777            timestamp: zx::BootInstant::from_nanos(FAKE_TIMESTAMP),
778            payload: EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
779                component: identity,
780                request_stream,
781            }),
782        }
783    }
784}