archivist_lib/events/sources/
event_source.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::events::router::{Dispatcher, EventProducer};
6use anyhow::Error;
7use fcomponent::EventStreamProxy;
8use fidl_fuchsia_component as fcomponent;
9use fuchsia_component::client::connect_to_protocol_at_path;
10use log::warn;
11
12pub struct EventSource {
13    dispatcher: Dispatcher,
14    event_stream: EventStreamProxy,
15}
16
17impl EventSource {
18    pub async fn new(event_stream_path: &str) -> Result<Self, Error> {
19        let event_stream =
20            connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(event_stream_path)?;
21        let _ = event_stream.wait_for_ready().await;
22        Ok(Self { event_stream, dispatcher: Dispatcher::default() })
23    }
24
25    #[cfg(test)]
26    fn new_for_test(event_stream: EventStreamProxy) -> Self {
27        // Connect to /events/event_stream which contains our newer FIDL protocol
28        Self { event_stream, dispatcher: Dispatcher::default() }
29    }
30
31    pub async fn spawn(mut self) -> Result<(), Error> {
32        while let Ok(events) = self.event_stream.get_next().await {
33            for event in events {
34                match event.try_into() {
35                    Ok(event) => {
36                        if let Err(err) = self.dispatcher.emit(event) {
37                            if err.is_disconnected() {
38                                break;
39                            }
40                        }
41                    }
42                    Err(err) => {
43                        warn!(err:?; "Failed to interpret event");
44                    }
45                }
46            }
47        }
48        Ok(())
49    }
50}
51
52impl EventProducer for EventSource {
53    fn set_dispatcher(&mut self, dispatcher: Dispatcher) {
54        self.dispatcher = dispatcher;
55    }
56}
57
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::events::types::*;
    use crate::identity::ComponentIdentity;
    use fuchsia_async as fasync;
    use futures::channel::mpsc::UnboundedSender;
    use futures::StreamExt;
    use moniker::ExtendedMoniker;
    use std::collections::BTreeSet;

    /// Moniker placed in the header of every fake event.
    const TEST_MONIKER: &str = "./foo/bar";
    /// Component URL placed in the header of every fake event.
    const TEST_URL: &str = "fuchsia-pkg://fuchsia.com/foo#meta/bar.cm";

    /// Checks that `CapabilityRequested` events for InspectSink and LogSink read from the
    /// event stream reach the dispatcher's consumer, in order, with the expected identity.
    #[fuchsia::test]
    async fn event_stream() {
        let events = BTreeSet::from([EventType::InspectSinkRequested, EventType::LogSinkRequested]);
        let (mut event_stream, dispatcher) = Dispatcher::new_for_test(events);
        let (stream_server, _server_task, sender) = spawn_fake_event_stream();
        let mut source = EventSource::new_for_test(stream_server);
        source.set_dispatcher(dispatcher);
        let _task = fasync::Task::spawn(async move { source.spawn().await });

        // Send an `InspectSinkRequested` event for diagnostics.
        sender
            .unbounded_send(capability_requested_event("fuchsia.inspect.InspectSink"))
            .expect("send inspectsink requested event ok");

        // Send a `LogSinkRequested` event.
        sender
            .unbounded_send(capability_requested_event("fuchsia.logger.LogSink"))
            .expect("send logsink requested event ok");

        let expected_component_id = ExtendedMoniker::parse_str(TEST_MONIKER).unwrap();
        let expected_identity = ComponentIdentity::new(expected_component_id, TEST_URL);

        // Assert the first received event was an InspectSinkRequested event.
        let event = event_stream.next().await.unwrap();
        match event.payload {
            EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
                component,
                request_stream: _,
            }) => {
                assert_eq!(*component, expected_identity)
            }
            other => panic!("unexpected event payload: {other:?}"),
        }

        // Assert the last received event was a LogSinkRequested event.
        let event = event_stream.next().await.unwrap();
        match event.payload {
            EventPayload::LogSinkRequested(LogSinkRequestedPayload { component, .. }) => {
                assert_eq!(*component, expected_identity)
            }
            other => panic!("unexpected event payload: {other:?}"),
        }
    }

    /// Builds a `CapabilityRequested` event whose header carries the test moniker and URL
    /// and whose payload requests the capability called `name` over a fresh channel.
    fn capability_requested_event(name: &str) -> fcomponent::Event {
        fcomponent::Event {
            header: Some(fcomponent::EventHeader {
                event_type: Some(fcomponent::EventType::CapabilityRequested),
                moniker: Some(TEST_MONIKER.to_string()),
                component_url: Some(TEST_URL.to_string()),
                timestamp: Some(zx::BootInstant::get()),
                ..Default::default()
            }),
            payload: Some(fcomponent::EventPayload::CapabilityRequested(
                fcomponent::CapabilityRequestedPayload {
                    name: Some(name.to_string()),
                    capability: Some(zx::Channel::create().0),
                    ..Default::default()
                },
            )),
            ..Default::default()
        }
    }

    /// Starts a fake `EventStream` server.
    ///
    /// Returns the client proxy, the task that runs the server, and a sender: every event
    /// pushed into the sender is returned, one per batch, by a `GetNext` call.
    fn spawn_fake_event_stream(
    ) -> (fcomponent::EventStreamProxy, fasync::Task<()>, UnboundedSender<fcomponent::Event>) {
        let (sender, mut receiver) = futures::channel::mpsc::unbounded::<fcomponent::Event>();
        let (proxy, server_end) = fidl::endpoints::create_proxy::<fcomponent::EventStreamMarker>();
        let task = fasync::Task::spawn(async move {
            let mut request_stream = server_end.into_stream();
            // Serve until the request stream ends or yields an error. Looping past that
            // point would spin forever, because there is nothing left to wait on.
            while let Some(Ok(request)) = request_stream.next().await {
                match request {
                    fcomponent::EventStreamRequest::GetNext { responder } => {
                        match receiver.next().await {
                            Some(event) => responder.send(vec![event]).unwrap(),
                            // All senders are gone: no more events will ever arrive.
                            None => break,
                        }
                    }
                    fcomponent::EventStreamRequest::WaitForReady { responder } => {
                        responder.send().unwrap();
                    }
                }
            }
        });
        (proxy, task, sender)
    }
}
173}