settings/message/
base.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use crate::message::action_fuse::ActionFuseHandle;
6use crate::message::beacon::Beacon;
7use crate::message::message_client::MessageClient;
8use crate::message::messenger::MessengerClient;
9use crate::message::receptor::Receptor;
10use futures::channel::mpsc::UnboundedSender;
11use futures::channel::oneshot::Sender;
12use std::fmt::{Debug, Formatter};
13use std::hash::Hash;
14use std::rc::Rc;
15use thiserror::Error;
16
/// Marker trait naming the requirements on data carried as the payload in a
/// MessageHub. It declares no methods of its own; it only bundles the
/// `Clone` and `Debug` bounds under a single name.
pub trait Payload
where
    Self: Clone + Debug,
{
}

/// Blanket implementation: every `Clone + Debug` type is a `Payload`.
impl<T> Payload for T where T: Clone + Debug {}
21
/// Marker trait naming the requirements on data used as an address in a
/// MessageHub. It declares no methods of its own; it only bundles the
/// `Copy`, `Debug`, `Eq`, `Hash` and `Unpin` bounds under a single name.
pub trait Address
where
    Self: Copy + Debug + Eq + Hash + Unpin,
{
}

/// Blanket implementation: every type meeting the bounds is an `Address`.
impl<T> Address for T where T: Copy + Debug + Eq + Hash + Unpin {}
26
27/// `Filter` is used by the `MessageHub` to determine whether an incoming
28/// message should be directed to associated broker.
29pub type Filter = Rc<dyn Fn(&Message) -> bool>;
30
31/// A mod for housing common definitions for messengers. Messengers are
32/// MessageHub participants, which are capable of sending and receiving
33/// messages.
34pub(super) mod messenger {
35    use super::MessengerType;
36
37    /// `Descriptor` is a blueprint for creating a messenger. It is sent to the
38    /// MessageHub by clients, which interprets the information to build the
39    /// messenger.
40    #[derive(Clone)]
41    pub struct Descriptor {
42        /// The type of messenger to be created. This determines how messages
43        /// can be directed to a messenger created from this Descriptor.
44        /// Please reference [Audience](crate::message::base::Audience) to see how these types map
45        /// to audience targets.
46        pub messenger_type: MessengerType,
47    }
48}
49
50#[allow(clippy::large_enum_variant)] // TODO(https://fxbug.dev/401086933)
51/// A MessageEvent defines the data that can be returned through a message
52/// receptor.
53#[derive(Debug, PartialEq)]
54pub enum MessageEvent {
55    /// A message that has been delivered, either as a new message directed at to
56    /// the recipient's address or a reply to a previously sent message
57    /// (dependent on the receptor's context).
58    Message(crate::Payload, MessageClient),
59    /// A status update for the message that spawned the receptor delivering this
60    /// update.
61    Status(Status),
62}
63
64#[derive(Error, Debug, Clone)]
65pub enum MessageError {
66    #[error("Address conflig:{address:?} already exists")]
67    AddressConflict { address: crate::Address },
68    #[error("Unexpected Error")]
69    Unexpected,
70}
71
/// The outcomes that can be reported for a sent message or a reply.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Status {
    /// Sent to some audience, potentially no one.
    Broadcasted,
    /// Received by the intended address.
    Received,
    /// Could not be delivered to the specified target.
    Undeliverable,
    /// Acknowledged by a recipient.
    Acknowledged,
    /// NOTE(review): this variant carried no description; presumably it is
    /// reported when no response arrived within an expected time window.
    /// Confirm against the code that emits it.
    Timeout,
}
85
86/// The intended recipients for a message.
87#[derive(Clone, Debug, PartialEq, Hash, Eq)]
88pub enum Audience {
89    // All non-broker messengers outside of the sender.
90    Broadcast,
91    // The messenger at the specified address.
92    Address(crate::Address),
93    // The messenger with the specified signature.
94    Messenger(Signature),
95    // A messenger that is an event sink.
96    EventSink,
97}
98
99/// An identifier that can be used to send messages directly to a Messenger.
100/// Included with Message instances.
101#[derive(Copy, Clone, Debug, PartialEq, Hash, Eq)]
102pub enum Signature {
103    // Messenger at a given address.
104    Address(crate::Address),
105
106    // The messenger cannot be directly addressed.
107    Anonymous(MessengerId),
108}
109
110#[derive(Copy, Clone, Debug)]
111pub struct Fingerprint {
112    pub id: MessengerId,
113    pub signature: Signature,
114}
115
116/// The messengers that can participate in messaging
117#[derive(Clone)]
118pub enum MessengerType {
119    /// An endpoint in the messenger graph. Can have messages specifically
120    /// addressed to it and can author new messages.
121    Addressable(crate::Address),
122    /// A intermediary messenger. Will receive every forwarded message. Brokers
123    /// are able to send and reply to messages, but the main purpose is to observe
124    /// messages. An optional filter may be specified, which limits the messages
125    /// directed to this broker.
126    Broker(Filter),
127    /// Used in tests to observe broadcast events.
128    #[cfg(test)]
129    EventSink,
130    /// A messenger that cannot be reached by an address.
131    Unbound,
132}
133
134/// We must implement Debug since the `Filter` does not provide
135/// a `Debug` implementation.
136impl Debug for MessengerType {
137    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
138        match self {
139            MessengerType::Addressable(addr) => write!(f, "Addressable({addr:?})"),
140            MessengerType::Broker(_) => write!(f, "Broker"),
141            #[cfg(test)]
142            MessengerType::EventSink => write!(f, "EventSink"),
143            MessengerType::Unbound => write!(f, "Unbound"),
144        }
145    }
146}
147
148/// MessageType captures details about the Message's source.
149#[derive(Clone, Debug)]
150pub enum MessageType {
151    /// A completely new message that is intended for the specified audience.
152    Origin(Audience),
153    /// A response to a previously received message. Note that the value must
154    /// be boxed to mitigate recursive sizing issues as MessageType is held by
155    /// Message.
156    Reply(Box<Message>),
157}
158
159/// `Attribution` describes the relationship of the message path in relation
160/// to the author.
161#[derive(Clone, Debug)]
162pub enum Attribution {
163    /// `Source` attributed messages are the original messages to be sent on a
164    /// path. For example, a source attribution for an origin message type will
165    /// be authored by the original sender. In a reply message type, a source
166    /// attribution means the reply was authored by the original message's
167    /// intended target.
168    Source(MessageType),
169    /// `Derived` attributed messages are messages that have been modified by
170    /// someone in the message path. They follow the same trajectory (audience
171    /// or return path), but their message has been altered. The supplied
172    /// signature is the messenger that modified the specified message.
173    Derived(Box<Message>, Signature),
174}
175
176/// The core messaging unit. A Message may be annotated by messengers, but is
177/// not associated with a particular Messenger instance.
178#[derive(Clone, Debug)]
179pub struct Message {
180    author: Fingerprint,
181    payload: crate::Payload,
182    attribution: Attribution,
183    // The return path is generated while the message is passed from messenger
184    // to messenger on the way to the intended recipient. It indicates the
185    // messengers that would like to be informed of replies to this message.
186    // The message author is always the last element in this vector. New
187    // participants are pushed to the front.
188    return_path: Vec<Beacon>,
189}
190
191impl Message {
192    /// Returns a new Message instance. Only the MessageHub can mint new messages.
193    pub(super) fn new(
194        author: Fingerprint,
195        payload: crate::Payload,
196        attribution: Attribution,
197    ) -> Message {
198        let mut return_path = vec![];
199
200        // A derived message adopts the return path of the original message.
201        if let Attribution::Derived(message, _) = &attribution {
202            return_path.extend(message.get_return_path().iter().cloned());
203        }
204
205        Message { author, payload, attribution, return_path }
206    }
207
208    /// Adds an entity to be notified on any replies.
209    pub(super) fn add_participant(&mut self, participant: Beacon) {
210        self.return_path.insert(0, participant);
211    }
212
213    /// Returns the Signatures of messengers who have modified this message
214    /// through propagation.
215    #[cfg(test)]
216    pub(super) fn get_modifiers(&self) -> Vec<Signature> {
217        let mut modifiers = vec![];
218
219        if let Attribution::Derived(origin, signature) = &self.attribution {
220            modifiers.push(*signature);
221            modifiers.extend(origin.get_modifiers());
222        }
223
224        modifiers
225    }
226
227    pub(crate) fn get_author(&self) -> Signature {
228        match &self.attribution {
229            Attribution::Source(_) => self.author.signature,
230            Attribution::Derived(message, _) => message.get_author(),
231        }
232    }
233
234    /// Binds the action fuse to the author's receptor. The fuse will fire
235    /// once that receptor is released.
236    pub(super) async fn bind_to_author(&mut self, fuse: ActionFuseHandle) {
237        if let Some(beacon) = self.return_path.last_mut() {
238            beacon.add_fuse(fuse).await;
239        }
240    }
241
242    /// Returns the list of participants for the reply return path.
243    pub(super) fn get_return_path(&self) -> &Vec<Beacon> {
244        &self.return_path
245    }
246
247    /// Returns the message's attribution, which identifies whether it has been modified by a source
248    /// other than the original author.
249    pub(crate) fn get_attribution(&self) -> &Attribution {
250        &self.attribution
251    }
252
253    /// Returns the message's type.
254    pub(crate) fn get_type(&self) -> &MessageType {
255        match &self.attribution {
256            Attribution::Source(message_type) => message_type,
257            Attribution::Derived(message, _) => message.get_type(),
258        }
259    }
260
261    /// Returns a reference to the message's payload.
262    pub(crate) fn payload(&self) -> &crate::Payload {
263        &self.payload
264    }
265
266    /// Delivers the supplied status to all participants in the return path.
267    pub(super) async fn report_status(&self, status: Status) {
268        for beacon in &self.return_path {
269            // It's ok if the other end is already closed and does not get an update, so we can
270            // safely ignore the result here.
271            let _ = beacon.status(status).await;
272        }
273    }
274}
275
276/// Type definition for a sender handed by the MessageHub to messengers to
277/// send actions.
278pub(super) type ActionSender = UnboundedSender<(Fingerprint, MessageAction, Option<Beacon>)>;
279
280/// An internal identifier used by the MessageHub to identify messengers.
281pub(super) type MessengerId = usize;
282
283/// An internal identifier used by the `MessageHub` to identify `MessageClient`.
284pub(super) type MessageClientId = usize;
285
286pub(super) type CreateMessengerResult = Result<(MessengerClient, Receptor), MessageError>;
287
288/// Callback for handing back a messenger
289pub(super) type MessengerSender = Sender<CreateMessengerResult>;
290
291/// Callback for checking on messenger presence
292#[cfg(test)]
293pub(super) type MessengerPresenceSender = Sender<MessengerPresenceResult>;
294#[cfg(test)]
295pub(super) type MessengerPresenceResult = Result<bool, MessageError>;
296
297/// Type definition for a sender handed by the MessageHub to spawned components
298/// (messenger factories and messengers) to control messengers.
299pub(super) type MessengerActionSender = UnboundedSender<MessengerAction>;
300
301/// Internal representation of possible actions around a messenger.
302pub(super) enum MessengerAction {
303    /// Creates a top level messenger
304    Create(messenger::Descriptor, MessengerSender, MessengerActionSender),
305    #[cfg(test)]
306    /// Check whether a messenger exists for the given [`Signature`]
307    CheckPresence(Signature, MessengerPresenceSender),
308    /// Deletes a messenger by its [`Signature`]
309    DeleteBySignature(Signature),
310}
311
312/// Internal representation for possible actions on a message.
313#[derive(Debug)]
314pub(super) enum MessageAction {
315    // A new message sent to the specified audience.
316    Send(crate::Payload, Attribution),
317    // The message has been forwarded by the current holder.
318    Forward(Message),
319}