settings/message/base.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::message::action_fuse::ActionFuseHandle;
6use crate::message::beacon::Beacon;
7use crate::message::message_client::MessageClient;
8use crate::message::messenger::MessengerClient;
9use crate::message::receptor::Receptor;
10use futures::channel::mpsc::UnboundedSender;
11use futures::channel::oneshot::Sender;
12use std::fmt::{Debug, Formatter};
13use std::hash::Hash;
14use std::rc::Rc;
15use thiserror::Error;
16
/// Marker trait for data that can travel as the payload of a message in a
/// MessageHub. A payload type must be cloneable and printable with `Debug`.
pub trait Payload: Clone + Debug {}

/// Blanket implementation: every `Clone + Debug` type is a `Payload`, so the
/// trait acts as an alias for that pair of bounds.
impl<T> Payload for T where T: Clone + Debug {}
21
/// Marker trait for data that can serve as an address in a MessageHub. An
/// address type must be `Copy`, printable with `Debug`, comparable for
/// equality, hashable and `Unpin`.
pub trait Address: Copy + Debug + Eq + Hash + Unpin {}

/// Blanket implementation: every type meeting the bounds above is an
/// `Address`, so the trait acts as an alias for that set of bounds.
impl<T> Address for T where T: Copy + Debug + Eq + Hash + Unpin {}
26
27/// `Filter` is used by the `MessageHub` to determine whether an incoming
28/// message should be directed to associated broker.
29pub type Filter = Rc<dyn Fn(&Message) -> bool>;
30
31/// A mod for housing common definitions for messengers. Messengers are
32/// MessageHub participants, which are capable of sending and receiving
33/// messages.
34pub(super) mod messenger {
35 use super::MessengerType;
36
37 /// `Descriptor` is a blueprint for creating a messenger. It is sent to the
38 /// MessageHub by clients, which interprets the information to build the
39 /// messenger.
40 #[derive(Clone)]
41 pub struct Descriptor {
42 /// The type of messenger to be created. This determines how messages
43 /// can be directed to a messenger created from this Descriptor.
44 /// Please reference [Audience](crate::message::base::Audience) to see how these types map
45 /// to audience targets.
46 pub messenger_type: MessengerType,
47 }
48}
49
50#[allow(clippy::large_enum_variant)] // TODO(https://fxbug.dev/401086933)
51/// A MessageEvent defines the data that can be returned through a message
52/// receptor.
53#[derive(Debug, PartialEq)]
54pub enum MessageEvent {
55 /// A message that has been delivered, either as a new message directed at to
56 /// the recipient's address or a reply to a previously sent message
57 /// (dependent on the receptor's context).
58 Message(crate::Payload, MessageClient),
59 /// A status update for the message that spawned the receptor delivering this
60 /// update.
61 Status(Status),
62}
63
64#[derive(Error, Debug, Clone)]
65pub enum MessageError {
66 #[error("Address conflig:{address:?} already exists")]
67 AddressConflict { address: crate::Address },
68 #[error("Unexpected Error")]
69 Unexpected,
70}
71
/// The types of results possible from sending or replying.
///
/// `Status` is a plain field-less enum, so it derives `Eq` alongside
/// `PartialEq`; equality on it is total.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub enum Status {
    /// Sent to some audience, potentially no one.
    Broadcasted,
    /// Received by the intended address.
    Received,
    /// Could not be delivered to the specified target.
    Undeliverable,
    /// Acknowledged by a recipient.
    Acknowledged,
    /// The message timed out.
    // NOTE(review): the condition that produces this status is decided
    // outside this file -- confirm the exact semantics against the hub.
    Timeout,
}
85
86/// The intended recipients for a message.
87#[derive(Clone, Debug, PartialEq, Hash, Eq)]
88pub enum Audience {
89 // All non-broker messengers outside of the sender.
90 Broadcast,
91 // The messenger at the specified address.
92 Address(crate::Address),
93 // The messenger with the specified signature.
94 Messenger(Signature),
95 // A messenger that is an event sink.
96 EventSink,
97}
98
99/// An identifier that can be used to send messages directly to a Messenger.
100/// Included with Message instances.
101#[derive(Copy, Clone, Debug, PartialEq, Hash, Eq)]
102pub enum Signature {
103 // Messenger at a given address.
104 Address(crate::Address),
105
106 // The messenger cannot be directly addressed.
107 Anonymous(MessengerId),
108}
109
110#[derive(Copy, Clone, Debug)]
111pub struct Fingerprint {
112 pub id: MessengerId,
113 pub signature: Signature,
114}
115
116/// The messengers that can participate in messaging
117#[derive(Clone)]
118pub enum MessengerType {
119 /// An endpoint in the messenger graph. Can have messages specifically
120 /// addressed to it and can author new messages.
121 Addressable(crate::Address),
122 /// A intermediary messenger. Will receive every forwarded message. Brokers
123 /// are able to send and reply to messages, but the main purpose is to observe
124 /// messages. An optional filter may be specified, which limits the messages
125 /// directed to this broker.
126 Broker(Filter),
127 /// Used in tests to observe broadcast events.
128 #[cfg(test)]
129 EventSink,
130 /// A messenger that cannot be reached by an address.
131 Unbound,
132}
133
134/// We must implement Debug since the `Filter` does not provide
135/// a `Debug` implementation.
136impl Debug for MessengerType {
137 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
138 match self {
139 MessengerType::Addressable(addr) => write!(f, "Addressable({addr:?})"),
140 MessengerType::Broker(_) => write!(f, "Broker"),
141 #[cfg(test)]
142 MessengerType::EventSink => write!(f, "EventSink"),
143 MessengerType::Unbound => write!(f, "Unbound"),
144 }
145 }
146}
147
148/// MessageType captures details about the Message's source.
149#[derive(Clone, Debug)]
150pub enum MessageType {
151 /// A completely new message that is intended for the specified audience.
152 Origin(Audience),
153 /// A response to a previously received message. Note that the value must
154 /// be boxed to mitigate recursive sizing issues as MessageType is held by
155 /// Message.
156 Reply(Box<Message>),
157}
158
159/// `Attribution` describes the relationship of the message path in relation
160/// to the author.
161#[derive(Clone, Debug)]
162pub enum Attribution {
163 /// `Source` attributed messages are the original messages to be sent on a
164 /// path. For example, a source attribution for an origin message type will
165 /// be authored by the original sender. In a reply message type, a source
166 /// attribution means the reply was authored by the original message's
167 /// intended target.
168 Source(MessageType),
169 /// `Derived` attributed messages are messages that have been modified by
170 /// someone in the message path. They follow the same trajectory (audience
171 /// or return path), but their message has been altered. The supplied
172 /// signature is the messenger that modified the specified message.
173 Derived(Box<Message>, Signature),
174}
175
176/// The core messaging unit. A Message may be annotated by messengers, but is
177/// not associated with a particular Messenger instance.
178#[derive(Clone, Debug)]
179pub struct Message {
180 author: Fingerprint,
181 payload: crate::Payload,
182 attribution: Attribution,
183 // The return path is generated while the message is passed from messenger
184 // to messenger on the way to the intended recipient. It indicates the
185 // messengers that would like to be informed of replies to this message.
186 // The message author is always the last element in this vector. New
187 // participants are pushed to the front.
188 return_path: Vec<Beacon>,
189}
190
191impl Message {
192 /// Returns a new Message instance. Only the MessageHub can mint new messages.
193 pub(super) fn new(
194 author: Fingerprint,
195 payload: crate::Payload,
196 attribution: Attribution,
197 ) -> Message {
198 let mut return_path = vec![];
199
200 // A derived message adopts the return path of the original message.
201 if let Attribution::Derived(message, _) = &attribution {
202 return_path.extend(message.get_return_path().iter().cloned());
203 }
204
205 Message { author, payload, attribution, return_path }
206 }
207
208 /// Adds an entity to be notified on any replies.
209 pub(super) fn add_participant(&mut self, participant: Beacon) {
210 self.return_path.insert(0, participant);
211 }
212
213 /// Returns the Signatures of messengers who have modified this message
214 /// through propagation.
215 #[cfg(test)]
216 pub(super) fn get_modifiers(&self) -> Vec<Signature> {
217 let mut modifiers = vec![];
218
219 if let Attribution::Derived(origin, signature) = &self.attribution {
220 modifiers.push(*signature);
221 modifiers.extend(origin.get_modifiers());
222 }
223
224 modifiers
225 }
226
227 pub(crate) fn get_author(&self) -> Signature {
228 match &self.attribution {
229 Attribution::Source(_) => self.author.signature,
230 Attribution::Derived(message, _) => message.get_author(),
231 }
232 }
233
234 /// Binds the action fuse to the author's receptor. The fuse will fire
235 /// once that receptor is released.
236 pub(super) async fn bind_to_author(&mut self, fuse: ActionFuseHandle) {
237 if let Some(beacon) = self.return_path.last_mut() {
238 beacon.add_fuse(fuse).await;
239 }
240 }
241
242 /// Returns the list of participants for the reply return path.
243 pub(super) fn get_return_path(&self) -> &Vec<Beacon> {
244 &self.return_path
245 }
246
247 /// Returns the message's attribution, which identifies whether it has been modified by a source
248 /// other than the original author.
249 pub(crate) fn get_attribution(&self) -> &Attribution {
250 &self.attribution
251 }
252
253 /// Returns the message's type.
254 pub(crate) fn get_type(&self) -> &MessageType {
255 match &self.attribution {
256 Attribution::Source(message_type) => message_type,
257 Attribution::Derived(message, _) => message.get_type(),
258 }
259 }
260
261 /// Returns a reference to the message's payload.
262 pub(crate) fn payload(&self) -> &crate::Payload {
263 &self.payload
264 }
265
266 /// Delivers the supplied status to all participants in the return path.
267 pub(super) async fn report_status(&self, status: Status) {
268 for beacon in &self.return_path {
269 // It's ok if the other end is already closed and does not get an update, so we can
270 // safely ignore the result here.
271 let _ = beacon.status(status).await;
272 }
273 }
274}
275
276/// Type definition for a sender handed by the MessageHub to messengers to
277/// send actions.
278pub(super) type ActionSender = UnboundedSender<(Fingerprint, MessageAction, Option<Beacon>)>;
279
280/// An internal identifier used by the MessageHub to identify messengers.
281pub(super) type MessengerId = usize;
282
283/// An internal identifier used by the `MessageHub` to identify `MessageClient`.
284pub(super) type MessageClientId = usize;
285
286pub(super) type CreateMessengerResult = Result<(MessengerClient, Receptor), MessageError>;
287
288/// Callback for handing back a messenger
289pub(super) type MessengerSender = Sender<CreateMessengerResult>;
290
291/// Callback for checking on messenger presence
292#[cfg(test)]
293pub(super) type MessengerPresenceSender = Sender<MessengerPresenceResult>;
294#[cfg(test)]
295pub(super) type MessengerPresenceResult = Result<bool, MessageError>;
296
297/// Type definition for a sender handed by the MessageHub to spawned components
298/// (messenger factories and messengers) to control messengers.
299pub(super) type MessengerActionSender = UnboundedSender<MessengerAction>;
300
301/// Internal representation of possible actions around a messenger.
302pub(super) enum MessengerAction {
303 /// Creates a top level messenger
304 Create(messenger::Descriptor, MessengerSender, MessengerActionSender),
305 #[cfg(test)]
306 /// Check whether a messenger exists for the given [`Signature`]
307 CheckPresence(Signature, MessengerPresenceSender),
308 /// Deletes a messenger by its [`Signature`]
309 DeleteBySignature(Signature),
310}
311
312/// Internal representation for possible actions on a message.
313#[derive(Debug)]
314pub(super) enum MessageAction {
315 // A new message sent to the specified audience.
316 Send(crate::Payload, Attribution),
317 // The message has been forwarded by the current holder.
318 Forward(Message),
319}