1use anyhow::{format_err, Error};
6use fidl::endpoints::{ProtocolMarker, ServerEnd};
7use fuchsia_component::client::connect_to_protocol_at_path;
8use futures::task::{Context, Poll};
9use futures::{ready, TryFuture};
10use lazy_static::lazy_static;
11use pin_project_lite::pin_project;
12use std::collections::VecDeque;
13use thiserror::Error;
14use {fidl_fuchsia_component as fcomponent, fidl_fuchsia_io as fio};
15
// Lazily-initialized, process-wide `String` holding the literal
// "StartComponentTree". The value is built on first dereference.
lazy_static! {
    pub static ref START_COMPONENT_TREE_STREAM: String = "StartComponentTree".into();
}
21
22pub fn event_name(event_type: &fcomponent::EventType) -> String {
24 match event_type {
25 fcomponent::EventType::CapabilityRequested => "capability_requested",
26 fcomponent::EventType::Discovered => unreachable!("This isn't used anymore"),
27 fcomponent::EventType::Destroyed => "destroyed",
28 fcomponent::EventType::Resolved => "resolved",
29 fcomponent::EventType::Unresolved => "unresolved",
30 fcomponent::EventType::Started => "started",
31 fcomponent::EventType::Stopped => "stopped",
32 fcomponent::EventType::DebugStarted => "debug_started",
33 #[cfg(fuchsia_api_level_at_least = "HEAD")]
34 fcomponent::EventType::DirectoryReady => unreachable!("This isn't used anymore"),
35 }
36 .to_string()
37}
38
// Client-side wrapper around an `EventStreamProxy` that hands out events one
// at a time, buffering any extra events returned by a single `get_next` call.
// Declared through `pin_project!` so the pinned `fut` field can be projected
// in the `futures::Stream` implementation.
pin_project! {
    pub struct EventStream {
        // Proxy on which `get_next` requests are issued.
        stream: fcomponent::EventStreamProxy,
        // Events already received but not yet returned to the caller.
        buffer: VecDeque<fcomponent::Event>,
        // In-flight `get_next` request driven by `poll_next`; `None` when no
        // request is outstanding.
        #[pin]
        fut: Option<<fcomponent::EventStreamProxy as fcomponent::EventStreamProxyInterface>::GetNextResponseFut>,
    }
}
47
/// Errors returned by [`EventStream::next`].
#[derive(Debug, Error, Clone)]
pub enum EventStreamError {
    /// The `get_next` call failed or returned no events.
    #[error("Stream terminated unexpectedly")]
    StreamClosed,
}
53
54impl EventStream {
55 pub fn new(stream: fcomponent::EventStreamProxy) -> Self {
56 Self { stream, buffer: VecDeque::new(), fut: None }
57 }
58
59 pub fn open_at_path_pipelined(path: impl Into<String>) -> Result<Self, Error> {
60 Ok(Self::new(connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(&path.into())?))
61 }
62
63 pub async fn open_at_path(path: impl Into<String>) -> Result<Self, Error> {
64 let event_stream =
65 connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(&path.into())?;
66 event_stream.wait_for_ready().await?;
67 Ok(Self::new(event_stream))
68 }
69
70 pub async fn open() -> Result<Self, Error> {
71 let event_stream = connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(
72 "/svc/fuchsia.component.EventStream",
73 )?;
74 event_stream.wait_for_ready().await?;
75 Ok(Self::new(event_stream))
76 }
77
78 pub fn open_pipelined() -> Result<Self, Error> {
79 Ok(Self::new(connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(
80 "/svc/fuchsia.component.EventStream",
81 )?))
82 }
83
84 pub async fn next(&mut self) -> Result<fcomponent::Event, EventStreamError> {
85 if let Some(event) = self.buffer.pop_front() {
86 return Ok(event);
87 }
88 match self.stream.get_next().await {
89 Ok(events) => {
90 let mut iter = events.into_iter();
91 if let Some(real_event) = iter.next() {
92 let ret = real_event;
93 while let Some(value) = iter.next() {
94 self.buffer.push_back(value);
95 }
96 return Ok(ret);
97 } else {
98 Err(EventStreamError::StreamClosed)
101 }
102 }
103 Err(_) => Err(EventStreamError::StreamClosed),
104 }
105 }
106}
107
108impl futures::Stream for EventStream {
109 type Item = fcomponent::Event;
110
111 fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112 let mut this = self.project();
113
114 if let Some(event) = this.buffer.pop_front() {
116 return Poll::Ready(Some(event));
117 }
118
119 if let None = this.fut.as_mut().as_pin_mut() {
121 this.fut.set(Some(this.stream.get_next()));
122 }
123
124 let step = ready!(this.fut.as_mut().as_pin_mut().unwrap().try_poll(cx));
125 this.fut.set(None);
126
127 match step {
128 Ok(events) => {
129 let mut iter = events.into_iter();
130 let ret = iter.next().unwrap();
131 while let Some(leftover) = iter.next() {
133 this.buffer.push_back(leftover);
134 }
135 Poll::Ready(Some(ret))
136 }
137 Err(_) => Poll::Ready(None),
138 }
139 }
140}
141
/// Common interface of the typed event wrappers in this file. Every
/// implementor can be built from a raw `fcomponent::Event` via `TryFrom`.
pub trait Event: TryFrom<fcomponent::Event, Error = anyhow::Error> {
    /// The `fcomponent::EventType` this wrapper corresponds to.
    const TYPE: fcomponent::EventType;
    /// The textual name of the event.
    const NAME: &'static str;

    /// Moniker recorded in the event header.
    fn target_moniker(&self) -> &str;
    /// Component URL recorded in the event header.
    fn component_url(&self) -> &str;
    /// Timestamp recorded in the event header.
    fn timestamp(&self) -> zx::BootInstant;
    /// Whether the event carries a success payload.
    fn is_ok(&self) -> bool;
    /// Whether the event carries an error payload.
    fn is_err(&self) -> bool;
}
153
/// Exit outcome derived from a raw `i32` status code.
#[derive(Copy, Debug, PartialEq, Eq, Clone, Ord, PartialOrd)]
pub enum ExitStatus {
    /// The raw status was `0`.
    Clean,
    /// The raw status was non-zero; the original value is preserved.
    Crash(i32),
}

impl From<i32> for ExitStatus {
    /// Maps `0` to [`ExitStatus::Clean`] and every other value to
    /// [`ExitStatus::Crash`] carrying that value.
    fn from(exit_status: i32) -> Self {
        if exit_status == 0 {
            Self::Clean
        } else {
            Self::Crash(exit_status)
        }
    }
}
170
/// Validated form of `fcomponent::EventHeader` in which every field is
/// guaranteed to be present.
#[derive(Debug)]
struct EventHeader {
    /// Kind of event this header belongs to.
    event_type: fcomponent::EventType,
    /// Component URL carried by the header.
    component_url: String,
    /// Moniker carried by the header.
    moniker: String,
    /// Timestamp carried by the header.
    timestamp: zx::BootInstant,
}
178
179impl TryFrom<fcomponent::EventHeader> for EventHeader {
180 type Error = anyhow::Error;
181
182 fn try_from(header: fcomponent::EventHeader) -> Result<Self, Self::Error> {
183 let event_type = header.event_type.ok_or_else(|| format_err!("No event type"))?;
184 let component_url = header.component_url.ok_or_else(|| format_err!("No component url"))?;
185 let moniker = header.moniker.ok_or_else(|| format_err!("No moniker"))?;
186 let timestamp = header
187 .timestamp
188 .ok_or_else(|| format_err!("Missing timestamp from the Event object"))?;
189 Ok(EventHeader { event_type, component_url, moniker, timestamp })
190 }
191}
192
/// Generic event error carrying only a human-readable description.
#[derive(Debug, PartialEq, Eq)]
pub struct EventError {
    /// Text describing the error.
    pub description: String,
}
197
/// Generates a typed wrapper for one `fcomponent::EventType` variant.
///
/// For an event type `Foo` the long form emits:
/// - `FooPayload`: one public field per `data`, `client_protocols` and
///   `server_protocols` entry (server protocols are stored as
///   `Option<zx::Channel>`);
/// - `FooError`: one public field per `error_payload` entry plus
///   `description: String`;
/// - `Foo`: the validated header together with
///   `Result<FooPayload, FooError>`, a `result()` accessor and one
///   `take_<name>()` method per server protocol;
/// - `impl Event for Foo` and `impl TryFrom<fcomponent::Event> for Foo`.
///
/// The short form `create_event!(Foo, foo)` expands to the long form with
/// every payload list empty.
macro_rules! create_event {
    (
        event_type: $event_type:ident,
        event_name: $event_name:ident,
        payload: {
            data: {$(
                {
                    name: $data_name:ident,
                    ty: $data_ty:ty,
                }
            )*},
            client_protocols: {$(
                {
                    name: $client_protocol_name:ident,
                    ty: $client_protocol_ty:ty,
                }
            )*},
            server_protocols: {$(
                {
                    name: $server_protocol_name:ident,
                }
            )*},
        },
        error_payload: {
            $(
                {
                    name: $error_data_name:ident,
                    ty: $error_data_ty:ty,
                }
            )*
        }
    ) => {
        paste::paste! {
            #[derive(Debug)]
            pub struct [<$event_type Payload>] {
                $(pub $client_protocol_name: $client_protocol_ty,)*
                $(pub $server_protocol_name: Option<zx::Channel>,)*
                $(pub $data_name: $data_ty,)*
            }

            #[derive(Debug)]
            pub struct [<$event_type Error>] {
                $(pub $error_data_name: $error_data_ty,)*
                pub description: String,
            }

            #[derive(Debug)]
            pub struct $event_type {
                header: EventHeader,
                result: Result<[<$event_type Payload>], [<$event_type Error>]>,
            }

            impl $event_type {
                // Borrowed view of the success payload or the error.
                pub fn result<'a>(&'a self) -> Result<&'a [<$event_type Payload>], &'a [<$event_type Error>]> {
                    self.result.as_ref()
                }

                $(
                    // Moves the server channel out of the payload (leaving
                    // `None` behind) and converts it into a request stream for
                    // protocol `T`. Returns `None` if the event is an error or
                    // the channel was already taken.
                    pub fn [<take_ $server_protocol_name>]<T: ProtocolMarker>(&mut self)
                            -> Option<T::RequestStream> {
                        self.result.as_mut()
                            .ok()
                            .and_then(|payload| payload.$server_protocol_name.take())
                            .map(|channel| {
                                let server_end = ServerEnd::<T>::new(channel);
                                server_end.into_stream()
                            })
                    }
                )*
            }

            impl Event for $event_type {
                const TYPE: fcomponent::EventType = fcomponent::EventType::$event_type;
                const NAME: &'static str = stringify!($event_name);

                fn target_moniker(&self) -> &str {
                    &self.header.moniker
                }

                fn component_url(&self) -> &str {
                    &self.header.component_url
                }

                fn timestamp(&self) -> zx::BootInstant {
                    self.header.timestamp
                }

                fn is_ok(&self) -> bool {
                    self.result.is_ok()
                }

                fn is_err(&self) -> bool {
                    self.result.is_err()
                }
            }

            impl TryFrom<fcomponent::Event> for $event_type {
                type Error = anyhow::Error;

                // Validates the payload variant, every declared payload field
                // and the header (including its event type). Errors are built
                // lazily, so a successful conversion allocates no
                // `anyhow::Error`.
                fn try_from(event: fcomponent::Event) -> Result<Self, Self::Error> {
                    let result = match event.payload {
                        Some(payload) => {
                            // Unused for events that declare no payload fields.
                            #[allow(unused)]
                            let payload = match payload {
                                fcomponent::EventPayload::$event_type(payload) => Ok(payload),
                                _ => Err(format_err!("Incorrect payload type, {:?}", payload)),
                            }?;

                            $(
                                // `coerce` adapts the wire field to the
                                // declared field type (see `Coerce`).
                                let $data_name: $data_ty = payload.$data_name.coerce().ok_or_else(|| {
                                    format_err!("Missing {} from {} object",
                                        stringify!($data_name), stringify!($event_type))
                                })?;
                            )*

                            $(
                                let $client_protocol_name: $client_protocol_ty = payload.$client_protocol_name.ok_or_else(|| {
                                    format_err!("Missing {} from {} object",
                                        stringify!($client_protocol_name), stringify!($event_type))
                                })?.into_proxy();
                            )*
                            $(
                                let $server_protocol_name: Option<zx::Channel> =
                                    Some(payload.$server_protocol_name.ok_or_else(|| {
                                        format_err!("Missing {} from {} object",
                                            stringify!($server_protocol_name), stringify!($event_type))
                                    })?);
                            )*

                            #[allow(dead_code)]
                            let payload = paste::paste! {
                                [<$event_type Payload>] {
                                    $($data_name,)*
                                    $($client_protocol_name,)*
                                    $($server_protocol_name,)*
                                }
                            };

                            Ok(Ok(payload))
                        },
                        None => Err(format_err!("Missing event_result from Event object")),
                    }?;

                    let event = {
                        let header = event.header
                            .ok_or_else(|| format_err!("Missing Event header"))
                            .and_then(|header| EventHeader::try_from(header))?;

                        // Reject events whose header disagrees with the
                        // wrapper being built.
                        if header.event_type != Self::TYPE {
                            return Err(format_err!("Incorrect event type"));
                        }

                        $event_type { header, result }
                    };
                    Ok(event)
                }
            }
        }
    };
    ($event_type:ident, $event_name:ident) => {
        create_event!(event_type: $event_type, event_name: $event_name,
                      payload: {
                          data: {},
                          client_protocols: {},
                          server_protocols: {},
                      },
                      error_payload: {});
    };
}
394
// Events whose payload declares no fields, defined via the short macro form.
create_event!(Destroyed, destroyed);
create_event!(Resolved, resolved);
create_event!(Unresolved, unresolved);
create_event!(Started, started);
// `Stopped` carries the exit outcome of the component:
// - `status` is exposed as an `ExitStatus`;
// - `exit_code` is exposed as `Option<i64>`, so an absent wire value is
//   accepted and surfaces as `None`.
create_event!(
    event_type: Stopped,
    event_name: stopped,
    payload: {
        data: {
            {
                name: status,
                ty: ExitStatus,
            }
            {
                name: exit_code,
                ty: Option<i64>,
            }
        },
        client_protocols: {},
        server_protocols: {},
    },
    error_payload: {}
);
// `CapabilityRequested` carries the capability `name` and the server channel
// `capability`; the macro generates `take_capability` to turn that channel
// into a request stream. Its error type also carries a `name` field.
create_event!(
    event_type: CapabilityRequested,
    event_name: capability_requested,
    payload: {
        data: {
            {
                name: name,
                ty: String,
            }
        },
        client_protocols: {},
        server_protocols: {
            {
                name: capability,
            }
        },
    },
    error_payload: {
        {
            name: name,
            ty: String,
        }
    }
);
// `DebugStarted` carries a `break_on_start` event pair and a `runtime_dir`
// client end, which the macro converts into an `fio::DirectoryProxy`.
create_event!(
    event_type: DebugStarted,
    event_name: debug_started,
    payload: {
        data: {
            {
                name: break_on_start,
                ty: zx::EventPair,
            }
        },
        client_protocols: {
            {
                name: runtime_dir,
                ty: fio::DirectoryProxy,
            }
        },
        server_protocols: {},
    },
    error_payload: {}
);
463
464trait Coerce<T> {
465 fn coerce(self) -> Option<T>;
466}
467
468impl<T> Coerce<T> for Option<T> {
469 fn coerce(self) -> Option<T> {
470 self
471 }
472}
473
474impl<T> Coerce<Option<T>> for Option<T> {
475 fn coerce(self) -> Option<Option<T>> {
476 Some(self)
477 }
478}
479
480impl Coerce<ExitStatus> for Option<i32> {
481 fn coerce(self) -> Option<ExitStatus> {
482 self.map(Into::into)
483 }
484}