settings/message/
receptor.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::message::action_fuse::ActionFuseHandle;
6use crate::message::base::{MessageEvent, Signature, Status};
7use crate::message::message_client::MessageClient;
8use anyhow::{format_err, Error};
9use futures::channel::mpsc::UnboundedReceiver;
10use futures::task::{Context, Poll};
11use futures::{Stream, StreamExt};
12use std::pin::Pin;
13
14type EventReceiver = UnboundedReceiver<MessageEvent>;
15
16/// A Receptor is a wrapper around a channel dedicated towards either receiving
17/// top-level messages delivered to the recipient's address or replies to a
18/// message the recipient sent previously. Receptors are always paired with a
19/// Beacon.
20///
21/// Clients interact with the Receptor similar to a Receiver, waiting on a new
22/// MessageEvent via the watch method.
23pub struct Receptor {
24    signature: Signature,
25    event_rx: EventReceiver,
26    // Fuse to be triggered when all receptors go out of scope.
27    _fuse: ActionFuseHandle,
28    _chained_fuse: Option<ActionFuseHandle>,
29}
30
31impl Stream for Receptor {
32    type Item = MessageEvent;
33
34    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
35        self.event_rx.poll_next_unpin(cx)
36    }
37}
38
39impl Receptor {
40    pub(super) fn new(
41        signature: Signature,
42        event_rx: EventReceiver,
43        fuse: ActionFuseHandle,
44        chained_fuse: Option<ActionFuseHandle>,
45    ) -> Self {
46        Self { signature, event_rx, _fuse: fuse, _chained_fuse: chained_fuse }
47    }
48
49    /// Returns the signature associated the top level messenger associated with
50    /// this receptor.
51    pub(crate) fn get_signature(&self) -> Signature {
52        self.signature
53    }
54
55    /// Returns the next pending payload, returning an Error if the origin
56    /// message (if any) was not deliverable or another error was encountered.
57    pub(crate) async fn next_payload(&mut self) -> Result<(crate::Payload, MessageClient), Error> {
58        while let Some(event) = self.next().await {
59            match event {
60                MessageEvent::Message(payload, client) => {
61                    return Ok((payload, client));
62                }
63                MessageEvent::Status(Status::Undeliverable) => {
64                    return Err(format_err!("origin message not delivered"));
65                }
66                _ => {}
67            }
68        }
69
70        Err(format_err!("could not retrieve payload"))
71    }
72
73    pub(crate) async fn next_of<T: TryFrom<crate::Payload>>(
74        &mut self,
75    ) -> Result<(T, MessageClient), Error>
76    where
77        <T as std::convert::TryFrom<crate::Payload>>::Error: std::fmt::Debug,
78    {
79        let (payload, client) = self.next_payload().await?;
80
81        let converted_payload = T::try_from(payload.clone())
82            .map(move |converted_payload| (converted_payload, client))
83            .map_err(|err| format_err!("conversion failed: {:?}", err));
84
85        // Treat any conversion failures as fatal.
86        if converted_payload.is_err() {
87            panic!("did not receive payload of expected type {payload:?}");
88        }
89
90        converted_payload
91    }
92
93    /// Loops until a message of the given type is received ignoring unmatched messages.
94    #[cfg(test)]
95    pub(crate) async fn next_of_type<T: TryFrom<crate::Payload>>(
96        &mut self,
97    ) -> Result<(T, MessageClient), Error>
98    where
99        <T as std::convert::TryFrom<crate::Payload>>::Error: std::fmt::Debug,
100    {
101        loop {
102            let (payload, client) = self.next_payload().await?;
103            let converted_payload = T::try_from(payload.clone())
104                .map(move |converted_payload| (converted_payload, client))
105                .map_err(|err| format_err!("conversion failed: {:?}", err));
106
107            if converted_payload.is_ok() {
108                return converted_payload;
109            }
110
111            // Just go on to the next message if not matched.
112        }
113    }
114
115    #[cfg(test)]
116    pub(crate) async fn wait_for_acknowledge(&mut self) -> Result<(), Error> {
117        while let Some(event) = self.next().await {
118            match event {
119                MessageEvent::Status(Status::Acknowledged) => {
120                    return Ok(());
121                }
122                MessageEvent::Status(Status::Undeliverable) => {
123                    return Err(format_err!("origin message not delivered"));
124                }
125                _ => {}
126            }
127        }
128
129        Err(format_err!("did not encounter acknowledged status"))
130    }
131}
132
133/// Extracts the payload from a given `MessageEvent`. Such event is provided
134/// in an optional argument to match the return value from `Receptor` stream.
135pub(crate) fn extract_payload(event: Option<MessageEvent>) -> Option<crate::Payload> {
136    if let Some(MessageEvent::Message(payload, _)) = event {
137        Some(payload)
138    } else {
139        None
140    }
141}