archivist_lib/events/sources/
event_source.rs1use crate::events::router::{Dispatcher, EventProducer};
6use anyhow::Error;
7use fcomponent::EventStreamProxy;
8use fidl_fuchsia_component as fcomponent;
9use fuchsia_component::client::connect_to_protocol_at_path;
10use log::warn;
11
12pub struct EventSource {
13 dispatcher: Dispatcher,
14 event_stream: EventStreamProxy,
15}
16
17impl EventSource {
18 pub async fn new(event_stream_path: &str) -> Result<Self, Error> {
19 let event_stream =
20 connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(event_stream_path)?;
21 let _ = event_stream.wait_for_ready().await;
22 Ok(Self { event_stream, dispatcher: Dispatcher::default() })
23 }
24
25 #[cfg(test)]
26 fn new_for_test(event_stream: EventStreamProxy) -> Self {
27 Self { event_stream, dispatcher: Dispatcher::default() }
29 }
30
31 pub async fn spawn(mut self) -> Result<(), Error> {
32 while let Ok(events) = self.event_stream.get_next().await {
33 for event in events {
34 match event.try_into() {
35 Ok(event) => {
36 if let Err(err) = self.dispatcher.emit(event) {
37 if err.is_disconnected() {
38 break;
39 }
40 }
41 }
42 Err(err) => {
43 warn!(err:?; "Failed to interpret event");
44 }
45 }
46 }
47 }
48 Ok(())
49 }
50}
51
52impl EventProducer for EventSource {
53 fn set_dispatcher(&mut self, dispatcher: Dispatcher) {
54 self.dispatcher = dispatcher;
55 }
56}
57
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::events::types::*;
    use crate::identity::ComponentIdentity;
    use fuchsia_async as fasync;
    use futures::channel::mpsc::UnboundedSender;
    use futures::StreamExt;
    use moniker::ExtendedMoniker;
    use std::collections::BTreeSet;

    /// Moniker attached to every fake event sent by these tests.
    const TEST_MONIKER: &str = "./foo/bar";

    /// Component URL attached to every fake event sent by these tests.
    const TEST_URL: &str = "fuchsia-pkg://fuchsia.com/foo#meta/bar.cm";

    /// Verifies that `CapabilityRequested` events for InspectSink and LogSink are converted
    /// and delivered to the dispatcher in the order they were sent.
    #[fuchsia::test]
    async fn event_stream() {
        let events = BTreeSet::from([EventType::InspectSinkRequested, EventType::LogSinkRequested]);
        let (mut event_stream, dispatcher) = Dispatcher::new_for_test(events);
        let (stream_server, _server_task, sender) = spawn_fake_event_stream();
        let mut source = EventSource::new_for_test(stream_server);
        source.set_dispatcher(dispatcher);
        let _task = fasync::Task::spawn(async move { source.spawn().await });

        sender
            .unbounded_send(capability_requested_event("fuchsia.inspect.InspectSink"))
            .expect("send inspectsink requested event ok");

        sender
            .unbounded_send(capability_requested_event("fuchsia.logger.LogSink"))
            .expect("send logsink requested event ok");

        let expected_component_id = ExtendedMoniker::parse_str(TEST_MONIKER).unwrap();
        let expected_identity = ComponentIdentity::new(expected_component_id, TEST_URL);

        let event = event_stream.next().await.unwrap();
        match event.payload {
            EventPayload::InspectSinkRequested(InspectSinkRequestedPayload {
                component,
                request_stream: _,
            }) => {
                assert_eq!(*component, expected_identity)
            }
            other => panic!("unexpected event payload: {other:?}"),
        }

        let event = event_stream.next().await.unwrap();
        match event.payload {
            EventPayload::LogSinkRequested(LogSinkRequestedPayload { component, .. }) => {
                assert_eq!(*component, expected_identity)
            }
            other => panic!("unexpected event payload: {other:?}"),
        }
    }

    /// Builds a `CapabilityRequested` event for `capability_name`, originating from
    /// `TEST_MONIKER` / `TEST_URL` and carrying a freshly created channel as the capability.
    fn capability_requested_event(capability_name: &str) -> fcomponent::Event {
        fcomponent::Event {
            header: Some(fcomponent::EventHeader {
                event_type: Some(fcomponent::EventType::CapabilityRequested),
                moniker: Some(TEST_MONIKER.to_string()),
                component_url: Some(TEST_URL.to_string()),
                timestamp: Some(zx::BootInstant::get()),
                ..Default::default()
            }),
            payload: Some(fcomponent::EventPayload::CapabilityRequested(
                fcomponent::CapabilityRequestedPayload {
                    name: Some(capability_name.to_string()),
                    capability: Some(zx::Channel::create().0),
                    ..Default::default()
                },
            )),
            ..Default::default()
        }
    }

    /// Starts a fake `EventStream` server.
    ///
    /// Returns the client proxy, the task running the server, and a sender whose events are
    /// handed out one per `GetNext` call. The server stops when the request stream ends or
    /// fails, or when the sender side is closed while a `GetNext` call is pending.
    fn spawn_fake_event_stream(
    ) -> (fcomponent::EventStreamProxy, fasync::Task<()>, UnboundedSender<fcomponent::Event>) {
        let (sender, mut receiver) = futures::channel::mpsc::unbounded::<fcomponent::Event>();
        let (proxy, server_end) = fidl::endpoints::create_proxy::<fcomponent::EventStreamMarker>();
        let task = fasync::Task::spawn(async move {
            let mut request_stream = server_end.into_stream();
            // `while let` (rather than `loop` + `if let`) so the task finishes once the
            // client goes away instead of re-polling an exhausted stream forever.
            while let Some(Ok(request)) = request_stream.next().await {
                match request {
                    fcomponent::EventStreamRequest::GetNext { responder } => {
                        match receiver.next().await {
                            Some(event) => responder.send(vec![event]).unwrap(),
                            // No more events will ever arrive; shut the server down.
                            None => break,
                        }
                    }
                    fcomponent::EventStreamRequest::WaitForReady { responder } => {
                        responder.send().unwrap();
                    }
                }
            }
        });
        (proxy, task, sender)
    }
}