component_events/
events.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{format_err, Error};
6use fidl::endpoints::{ProtocolMarker, ServerEnd};
7use fuchsia_component::client::connect_to_protocol_at_path;
8use futures::task::{Context, Poll};
9use futures::{ready, TryFuture};
10use lazy_static::lazy_static;
11use pin_project_lite::pin_project;
12use std::collections::VecDeque;
13use thiserror::Error;
14use {fidl_fuchsia_component as fcomponent, fidl_fuchsia_io as fio};
15
16lazy_static! {
17    /// The path of the static event stream that, by convention, synchronously listens for
18    /// Resolved events.
19    pub static ref START_COMPONENT_TREE_STREAM: String = "StartComponentTree".into();
20}
21
22/// Returns the string name for the given `event_type`
23pub fn event_name(event_type: &fcomponent::EventType) -> String {
24    match event_type {
25        fcomponent::EventType::CapabilityRequested => "capability_requested",
26        fcomponent::EventType::Discovered => unreachable!("This isn't used anymore"),
27        fcomponent::EventType::Destroyed => "destroyed",
28        fcomponent::EventType::Resolved => "resolved",
29        fcomponent::EventType::Unresolved => "unresolved",
30        fcomponent::EventType::Started => "started",
31        fcomponent::EventType::Stopped => "stopped",
32        fcomponent::EventType::DebugStarted => "debug_started",
33        #[cfg(fuchsia_api_level_at_least = "HEAD")]
34        fcomponent::EventType::DirectoryReady => unreachable!("This isn't used anymore"),
35    }
36    .to_string()
37}
38
39pin_project! {
40    pub struct EventStream {
41        stream: fcomponent::EventStreamProxy,
42        buffer: VecDeque<fcomponent::Event>,
43        #[pin]
44        fut: Option<<fcomponent::EventStreamProxy as fcomponent::EventStreamProxyInterface>::GetNextResponseFut>,
45    }
46}
47
48#[derive(Debug, Error, Clone)]
49pub enum EventStreamError {
50    #[error("Stream terminated unexpectedly")]
51    StreamClosed,
52}
53
54impl EventStream {
55    pub fn new(stream: fcomponent::EventStreamProxy) -> Self {
56        Self { stream, buffer: VecDeque::new(), fut: None }
57    }
58
59    pub fn open_at_path_pipelined(path: impl Into<String>) -> Result<Self, Error> {
60        Ok(Self::new(connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(&path.into())?))
61    }
62
63    pub async fn open_at_path(path: impl Into<String>) -> Result<Self, Error> {
64        let event_stream =
65            connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(&path.into())?;
66        event_stream.wait_for_ready().await?;
67        Ok(Self::new(event_stream))
68    }
69
70    pub async fn open() -> Result<Self, Error> {
71        let event_stream = connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(
72            "/svc/fuchsia.component.EventStream",
73        )?;
74        event_stream.wait_for_ready().await?;
75        Ok(Self::new(event_stream))
76    }
77
78    pub fn open_pipelined() -> Result<Self, Error> {
79        Ok(Self::new(connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(
80            "/svc/fuchsia.component.EventStream",
81        )?))
82    }
83
84    pub async fn next(&mut self) -> Result<fcomponent::Event, EventStreamError> {
85        if let Some(event) = self.buffer.pop_front() {
86            return Ok(event);
87        }
88        match self.stream.get_next().await {
89            Ok(events) => {
90                let mut iter = events.into_iter();
91                if let Some(real_event) = iter.next() {
92                    let ret = real_event;
93                    while let Some(value) = iter.next() {
94                        self.buffer.push_back(value);
95                    }
96                    return Ok(ret);
97                } else {
98                    // This should never happen, we should always
99                    // have at least one event.
100                    Err(EventStreamError::StreamClosed)
101                }
102            }
103            Err(_) => Err(EventStreamError::StreamClosed),
104        }
105    }
106}
107
108impl futures::Stream for EventStream {
109    type Item = fcomponent::Event;
110
111    fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
112        let mut this = self.project();
113
114        // Return queued up events when possible.
115        if let Some(event) = this.buffer.pop_front() {
116            return Poll::Ready(Some(event));
117        }
118
119        // Otherwise, listen for more events.
120        if let None = this.fut.as_mut().as_pin_mut() {
121            this.fut.set(Some(this.stream.get_next()));
122        }
123
124        let step = ready!(this.fut.as_mut().as_pin_mut().unwrap().try_poll(cx));
125        this.fut.set(None);
126
127        match step {
128            Ok(events) => {
129                let mut iter = events.into_iter();
130                let ret = iter.next().unwrap();
131                // Store leftover events for subsequent polls.
132                while let Some(leftover) = iter.next() {
133                    this.buffer.push_back(leftover);
134                }
135                Poll::Ready(Some(ret))
136            }
137            Err(_) => Poll::Ready(None),
138        }
139    }
140}
141
142/// Common features of any event - event type, target moniker, conversion function
143pub trait Event: TryFrom<fcomponent::Event, Error = anyhow::Error> {
144    const TYPE: fcomponent::EventType;
145    const NAME: &'static str;
146
147    fn target_moniker(&self) -> &str;
148    fn component_url(&self) -> &str;
149    fn timestamp(&self) -> zx::BootInstant;
150    fn is_ok(&self) -> bool;
151    fn is_err(&self) -> bool;
152}
153
154#[derive(Copy, Debug, PartialEq, Eq, Clone, Ord, PartialOrd)]
155/// Simplifies the exit status represented by an Event. All stop status values
156/// that indicate failure are crushed into `Crash`.
157pub enum ExitStatus {
158    Clean,
159    Crash(i32),
160}
161
162impl From<i32> for ExitStatus {
163    fn from(exit_status: i32) -> Self {
164        match exit_status {
165            0 => ExitStatus::Clean,
166            _ => ExitStatus::Crash(exit_status),
167        }
168    }
169}
170
171#[derive(Debug)]
172struct EventHeader {
173    event_type: fcomponent::EventType,
174    component_url: String,
175    moniker: String,
176    timestamp: zx::BootInstant,
177}
178
179impl TryFrom<fcomponent::EventHeader> for EventHeader {
180    type Error = anyhow::Error;
181
182    fn try_from(header: fcomponent::EventHeader) -> Result<Self, Self::Error> {
183        let event_type = header.event_type.ok_or_else(|| format_err!("No event type"))?;
184        let component_url = header.component_url.ok_or_else(|| format_err!("No component url"))?;
185        let moniker = header.moniker.ok_or_else(|| format_err!("No moniker"))?;
186        let timestamp = header
187            .timestamp
188            .ok_or_else(|| format_err!("Missing timestamp from the Event object"))?;
189        Ok(EventHeader { event_type, component_url, moniker, timestamp })
190    }
191}
192
193#[derive(Debug, PartialEq, Eq)]
194pub struct EventError {
195    pub description: String,
196}
197
198/// The macro defined below will automatically create event classes corresponding
199/// to their events.fidl and hooks.rs counterparts. Every event class implements
200/// the Event and Handler traits. These minimum requirements allow every event to
201/// be handled by the events client library.
202
203/// Creates an event class based on event type and an optional payload
204/// * event_type -> FIDL name for event type
205/// * payload -> If an event has a payload, describe the additional params:
206///   * name -> FIDL name for the payload
207///   * data -> If a payload contains data items, describe the additional params:
208///     * name -> FIDL name for the data item
209///     * ty -> Rust type for the data item
210///   * client_protocols -> If a payload contains client-side protocols, describe
211///     the additional params:
212///     * name -> FIDL name for the protocol
213///     * ty -> Rust type for the protocol proxy
214///   * server_protocols -> If a payload contains server-side protocols, describe
215///     the additional params:
216///     * name -> FIDL name for the protocol
217// TODO(https://fxbug.dev/42131403): This marco is getting complicated. Consider replacing it
218//                  with a procedural macro.
219macro_rules! create_event {
220    // Entry points
221    (
222        event_type: $event_type:ident,
223        event_name: $event_name:ident,
224        payload: {
225            data: {$(
226                {
227                    name: $data_name:ident,
228                    ty: $data_ty:ty,
229                }
230            )*},
231            client_protocols: {$(
232                {
233                    name: $client_protocol_name:ident,
234                    ty: $client_protocol_ty:ty,
235                }
236            )*},
237            server_protocols: {$(
238                {
239                    name: $server_protocol_name:ident,
240                }
241            )*},
242        },
243        error_payload: {
244            $(
245                {
246                    name: $error_data_name:ident,
247                    ty: $error_data_ty:ty,
248                }
249            )*
250        }
251    ) => {
252        paste::paste! {
253            #[derive(Debug)]
254            pub struct [<$event_type Payload>] {
255                $(pub $client_protocol_name: $client_protocol_ty,)*
256                $(pub $server_protocol_name: Option<zx::Channel>,)*
257                $(pub $data_name: $data_ty,)*
258            }
259
260            #[derive(Debug)]
261            pub struct [<$event_type Error>] {
262                $(pub $error_data_name: $error_data_ty,)*
263                pub description: String,
264            }
265
266            #[derive(Debug)]
267            pub struct $event_type {
268                header: EventHeader,
269                result: Result<[<$event_type Payload>], [<$event_type Error>]>,
270            }
271
272            impl $event_type {
273                pub fn result<'a>(&'a self) -> Result<&'a [<$event_type Payload>], &'a [<$event_type Error>]> {
274                    self.result.as_ref()
275                }
276
277                $(
278                    pub fn [<take_ $server_protocol_name>]<T: ProtocolMarker>(&mut self)
279                            -> Option<T::RequestStream> {
280                        self.result.as_mut()
281                            .ok()
282                            .and_then(|payload| payload.$server_protocol_name.take())
283                            .map(|channel| {
284                                let server_end = ServerEnd::<T>::new(channel);
285                                server_end.into_stream()
286                            })
287                    }
288                )*
289            }
290
291            impl Event for $event_type {
292                const TYPE: fcomponent::EventType = fcomponent::EventType::$event_type;
293                const NAME: &'static str = stringify!($event_name);
294
295                fn target_moniker(&self) -> &str {
296                    &self.header.moniker
297                }
298
299                fn component_url(&self) -> &str {
300                    &self.header.component_url
301                }
302
303                fn timestamp(&self) -> zx::BootInstant {
304                    self.header.timestamp
305                }
306
307                fn is_ok(&self) -> bool {
308                    self.result.is_ok()
309                }
310
311                fn is_err(&self) -> bool {
312                    self.result.is_err()
313                }
314            }
315
316            impl TryFrom<fcomponent::Event> for $event_type {
317                type Error = anyhow::Error;
318
319                fn try_from(event: fcomponent::Event) -> Result<Self, Self::Error> {
320                    // Extract the payload from the Event object.
321                    let result = match event.payload {
322                        Some(payload) => {
323                            // This payload will be unused for event types that have no additional
324                            // fields.
325                            #[allow(unused)]
326                            let payload = match payload {
327                                fcomponent::EventPayload::$event_type(payload) => Ok(payload),
328                                _ => Err(format_err!("Incorrect payload type, {:?}", payload)),
329                            }?;
330
331                            // Extract the additional data from the Payload object.
332                            $(
333                                let $data_name: $data_ty = payload.$data_name.coerce().ok_or(
334                                    format_err!("Missing {} from {} object",
335                                        stringify!($data_name), stringify!($event_type))
336                                )?;
337                            )*
338
339                            // Extract the additional protocols from the Payload object.
340                            $(
341                                let $client_protocol_name: $client_protocol_ty = payload.$client_protocol_name.ok_or(
342                                    format_err!("Missing {} from {} object",
343                                        stringify!($client_protocol_name), stringify!($event_type))
344                                )?.into_proxy();
345                            )*
346                            $(
347                                let $server_protocol_name: Option<zx::Channel> =
348                                    Some(payload.$server_protocol_name.ok_or(
349                                        format_err!("Missing {} from {} object",
350                                            stringify!($server_protocol_name), stringify!($event_type))
351                                    )?);
352                            )*
353
354                            #[allow(dead_code)]
355                            let payload = paste::paste! {
356                                [<$event_type Payload>] {
357                                    $($data_name,)*
358                                    $($client_protocol_name,)*
359                                    $($server_protocol_name,)*
360                                }
361                            };
362
363                            Ok(Ok(payload))
364                        },
365                        None => Err(format_err!("Missing event_result from Event object")),
366                    }?;
367
368                    let event = {
369                        let header = event.header
370                            .ok_or(format_err!("Missing Event header"))
371                            .and_then(|header| EventHeader::try_from(header))?;
372
373                        if header.event_type != Self::TYPE {
374                            return Err(format_err!("Incorrect event type"));
375                        }
376
377                        $event_type { header, result }
378                    };
379                    Ok(event)
380                }
381            }
382        }
383    };
384    ($event_type:ident, $event_name:ident) => {
385        create_event!(event_type: $event_type, event_name: $event_name,
386                      payload: {
387                          data: {},
388                          client_protocols: {},
389                          server_protocols: {},
390                      },
391                      error_payload: {});
392    };
393}
394
395// To create a class for an event, use the above macro here.
396create_event!(Destroyed, destroyed);
397create_event!(Resolved, resolved);
398create_event!(Unresolved, unresolved);
399create_event!(Started, started);
400create_event!(
401    event_type: Stopped,
402    event_name: stopped,
403    payload: {
404        data: {
405            {
406                name: status,
407                ty: ExitStatus,
408            }
409            {
410                name: exit_code,
411                ty: Option<i64>,
412            }
413        },
414        client_protocols: {},
415        server_protocols: {},
416    },
417    error_payload: {}
418);
419create_event!(
420    event_type: CapabilityRequested,
421    event_name: capability_requested,
422    payload: {
423        data: {
424            {
425                name: name,
426                ty: String,
427            }
428        },
429        client_protocols: {},
430        server_protocols: {
431            {
432                name: capability,
433            }
434        },
435    },
436    error_payload: {
437        {
438            name: name,
439            ty: String,
440        }
441    }
442);
443create_event!(
444    event_type: DebugStarted,
445    event_name: debug_started,
446    payload: {
447        data: {
448            {
449                name: break_on_start,
450                ty: zx::EventPair,
451            }
452        },
453        client_protocols: {
454            {
455                name: runtime_dir,
456                ty: fio::DirectoryProxy,
457            }
458        },
459        server_protocols: {},
460    },
461    error_payload: {}
462);
463
464trait Coerce<T> {
465    fn coerce(self) -> Option<T>;
466}
467
468impl<T> Coerce<T> for Option<T> {
469    fn coerce(self) -> Option<T> {
470        self
471    }
472}
473
474impl<T> Coerce<Option<T>> for Option<T> {
475    fn coerce(self) -> Option<Option<T>> {
476        Some(self)
477    }
478}
479
480impl Coerce<ExitStatus> for Option<i32> {
481    fn coerce(self) -> Option<ExitStatus> {
482        self.map(Into::into)
483    }
484}