settings/message/
message_hub.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::message::action_fuse::ActionFuse;
6use crate::message::base::{
7    ActionSender, Attribution, Audience, Filter, Fingerprint, Message, MessageAction,
8    MessageClientId, MessageError, MessageType, MessengerAction, MessengerId, MessengerType,
9    Signature, Status,
10};
11use crate::message::beacon::{Beacon, BeaconBuilder};
12use crate::message::delegate::Delegate;
13use crate::message::messenger::{Messenger, MessengerClient};
14use crate::{trace, trace_guard};
15use anyhow::format_err;
16use futures::StreamExt;
17use std::borrow::Cow;
18use std::collections::{HashMap, HashSet};
19use std::rc::Rc;
20use {fuchsia_async as fasync, fuchsia_trace as ftrace};
21
/// Type definition for exit message sender: the sending half of the unbounded `()` channel
/// whose receiving half is polled by the hub's processing loop to decide when to stop.
type ExitSender = futures::channel::mpsc::UnboundedSender<()>;
24
/// Errors reported by the `MessageHub`.
#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
    /// A response could not be sent for the operation named by the payload.
    /// NOTE(review): this variant is not constructed by the hub code in this file; presumably
    /// it is produced elsewhere in the crate — confirm.
    #[error("Failed to send response for operation: {0:?}")]
    ResponseSendFail(Cow<'static, str>),
    /// A signature could not be resolved to a registered messenger; returned by
    /// `MessageHub::resolve_messenger_id` when an address has no entry.
    #[error("Messenger not present")]
    MessengerNotFound,
}
32
/// `Broker` captures the information necessary to process messages to a broker.
///
/// Brokers are kept by the hub in registration order; that order determines which broker a
/// message is handed to next.
#[derive(Clone)]
struct Broker {
    /// The `MessengerId` associated with the broker so that it can be distinguished from other
    /// messengers.
    messenger_id: MessengerId,
    /// A condition that is applied to a message to determine whether it should be directed to the
    /// broker. It is invoked with a reference to the message and yields `true` when the broker
    /// should take part in routing it.
    filter: Filter,
}
43
44impl PartialEq for Broker {
45    fn eq(&self, other: &Self) -> bool {
46        // Since each broker has a unique [`MessengerId`], it is implied that any brokers that share
47        // the same [`MessengerId`] are the same, having matching filters as well.
48        self.messenger_id == other.messenger_id
49    }
50}
51
/// The MessageHub controls the message flow for a set of messengers. It
/// processes actions upon messages, incorporates brokers, and signals receipt
/// of messages.
///
/// An instance is owned by the local task spawned in `MessageHub::create`; callers interact
/// with it through the channels handed out at creation time rather than directly.
pub struct MessageHub {
    /// A sender given to messengers to signal actions upon the MessageHub.
    action_tx: ActionSender,
    /// Address mapping for looking up messengers. Used for sending messages
    /// to an addressable recipient.
    addresses: HashMap<crate::Address, MessengerId>,
    /// Set of messengers acting as event sinks. Only populated in test builds, where the
    /// `EventSink` messenger type is available.
    sinks: HashSet<MessengerId>,
    /// Mapping of registered messengers (including brokers) to beacons. Used for
    /// delivering messages from a resolved address or a list of participants.
    beacons: HashMap<MessengerId, Beacon>,
    /// An ordered set of messengers who will be forwarded messages. Order is registration
    /// order and determines the sequence in which brokers see a message.
    brokers: Vec<Broker>,
    /// The next id to be given to a messenger. Incremented each time a messenger is created.
    next_id: MessengerId,
    /// The next id to be given to a `MessageClient`. Incremented after each successful
    /// delivery.
    next_message_client_id: MessageClientId,
    /// Indicates whether the messenger channel has closed.
    messenger_channel_closed: bool,
    /// Sender to signal when the hub should exit.
    exit_tx: ExitSender,
}
77
78impl MessageHub {
79    /// Returns a new MessageHub for the given types.
80    pub(crate) fn create() -> Delegate {
81        let (action_tx, mut action_rx) =
82            futures::channel::mpsc::unbounded::<(Fingerprint, MessageAction, Option<Beacon>)>();
83        let (messenger_tx, mut messenger_rx) =
84            futures::channel::mpsc::unbounded::<MessengerAction>();
85
86        let (exit_tx, mut exit_rx) = futures::channel::mpsc::unbounded::<()>();
87
88        let mut hub = MessageHub {
89            next_id: 0,
90            next_message_client_id: 0,
91            action_tx,
92            beacons: HashMap::new(),
93            addresses: HashMap::new(),
94            sinks: HashSet::new(),
95            brokers: Vec::new(),
96            messenger_channel_closed: false,
97            exit_tx,
98        };
99
100        fasync::Task::local(async move {
101            let id = ftrace::Id::new();
102
103            trace!(id, c"message hub");
104            loop {
105                // We must prioritize the action futures. Exit actions
106                // take absolute priority. Message actions are ordered before
107                // messenger in case the messenger is subsequently deleted.
108                futures::select_biased! {
109                    _ = exit_rx.next() => {
110                        break;
111                    }
112                    message_action = action_rx.select_next_some() => {
113                        trace!(
114                            id,
115                            c"message action"
116                        );
117                        let (fingerprint, action, beacon) = message_action;
118                        hub.process_request(id, fingerprint, action, beacon).await;
119                    }
120                    messenger_action = messenger_rx.next() => {
121                        trace!(
122                            id,
123                            c"messenger action"
124                        );
125                        match messenger_action {
126                            Some(action) => {
127                                hub.process_messenger_request(id, action).await;
128                            }
129                            None => {
130                                hub.messenger_channel_closed = true;
131                                hub.check_exit();
132                            }
133                        }
134                    }
135                }
136            }
137        })
138        .detach();
139
140        Delegate::new(messenger_tx)
141    }
142
143    fn check_exit(&self) {
144        if self.messenger_channel_closed && self.beacons.is_empty() {
145            // We can ignore the result. If exit_tx fails to send, the task has already ended.
146            let _ = self.exit_tx.unbounded_send(());
147        }
148    }
149
150    // Determines whether the beacon belongs to a broker.
151    fn is_broker(&self, messenger_id: MessengerId) -> bool {
152        self.brokers.iter().any(|broker| broker.messenger_id == messenger_id)
153    }
154
155    // Derives the underlying MessengerId from a Signature.
156    fn resolve_messenger_id(&self, signature: &Signature) -> Result<MessengerId, Error> {
157        Ok(match signature {
158            Signature::Anonymous(id) => *id,
159            Signature::Address(address) => {
160                *self.addresses.get(address).ok_or(Error::MessengerNotFound)?
161            }
162        })
163    }
164
165    /// Internally routes a message to the next appropriate receiver. New messages
166    /// are routed based on the intended recipient(s), while replies follow the
167    /// return path of the source message. The provided sender id represents the
168    /// id of the current messenger possessing the message and not necessarily
169    /// the original author.
170    async fn send_to_next(&mut self, id: ftrace::Id, sender_id: MessengerId, message: Message) {
171        trace!(id, c"send_to_next");
172        let mut recipients = vec![];
173
174        let message_type = message.get_type();
175
176        let mut require_delivery = false;
177
178        // Replies have a predetermined return path.
179        if let MessageType::Reply(source) = message_type {
180            // The original author of the reply will be the first participant after brokers in
181            // the reply's return path. Otherwise, identify current sender in the source return path
182            // and forward to next participant.
183            let source_return_path = source.get_return_path();
184            let mut target_index = None;
185
186            let source_return_path_messenger_ids: HashSet<MessengerId> =
187                source_return_path.iter().map(|beacon| beacon.get_messenger_id()).collect();
188
189            // Identify participating brokers. This brokers must:
190            // 1. Not be already participating in the return path (with a spawned observer)
191            // 2. Not be the author of the reply.
192            // 3. Have a matching filter.
193            let broker_ids: Vec<_> = self
194                .brokers
195                .iter()
196                .filter(|broker| {
197                    !source_return_path_messenger_ids.contains(&broker.messenger_id)
198                        && self
199                            .resolve_messenger_id(&message.get_author())
200                            .map_or(true, |id| id != broker.messenger_id)
201                        && (broker.filter)(&message)
202                })
203                .map(|broker| broker.messenger_id)
204                .collect();
205
206            let mut return_path: Vec<Beacon> = broker_ids
207                .iter()
208                .map(|broker_id| {
209                    self.beacons.get(broker_id).expect("beacon should resolve").clone()
210                })
211                .collect();
212
213            // The return path places the participating brokers before the participants from the
214            // source message's reply path.
215            return_path.extend(source_return_path.iter().cloned());
216            let last_index = return_path.len() - 1;
217
218            if self.is_broker(sender_id) && !source_return_path_messenger_ids.contains(&sender_id) {
219                // If the sender is in the return path as a broker and not a participant in the
220                // source message's return path, determine next broker to forward to.
221                let mut candidate_index = self
222                    .brokers
223                    .iter()
224                    .position(|broker| broker.messenger_id == sender_id)
225                    .expect("broker should be found")
226                    + 1;
227
228                // A candidate broker is one that is after the sending broker, has a filter
229                // matching the current message, and is not in the return path already.
230                while candidate_index < self.brokers.len() && target_index.is_none() {
231                    target_index = broker_ids.iter().position(|broker_id| {
232                        *broker_id == self.brokers[candidate_index].messenger_id
233                            && !source_return_path_messenger_ids.contains(broker_id)
234                    });
235
236                    candidate_index += 1;
237                }
238
239                // If we can't find a next broker, we should skip over those considered.
240                if target_index.is_none() {
241                    target_index = Some(broker_ids.len());
242                }
243            } else if sender_id == message.get_return_path()[0].get_messenger_id()
244                && !matches!(message.get_attribution(), Attribution::Derived(..))
245            {
246                // If this is the reply's original author, send to the first
247                // messenger in the original message's return path.
248                target_index = Some(0);
249
250                // Mark source message as delivered. In the case the sender is the
251                // original intended audience, this will be a no-op. However, if the
252                // reply comes beforehand, this will ensure the message is properly
253                // acknowledged.
254                source.report_status(Status::Received).await;
255            } else {
256                for (index, beacon) in return_path.iter().enumerate().take(last_index) {
257                    if beacon.get_messenger_id() == sender_id {
258                        target_index = Some(index + 1);
259                    }
260                }
261            }
262
263            if let Some(index) = target_index {
264                recipients.push(return_path.swap_remove(index));
265
266                if index == last_index {
267                    // Ack current message if being sent to intended recipient.
268                    message.report_status(Status::Received).await;
269                }
270            }
271        } else if let Some(beacon) = self.beacons.get(&sender_id) {
272            let author_id = self
273                .resolve_messenger_id(&message.get_author())
274                .expect("messenger should be present");
275
276            // If the message is not a reply, determine if the current sender is a broker.
277            // In the case of a broker, the message should be forwarded to the next
278            // broker.
279            let mut target_messengers: Vec<_> = {
280                // The author cannot participate as a broker.
281                let iter = self
282                    .brokers
283                    .iter()
284                    .filter(|&broker| broker.messenger_id != author_id && (broker.filter)(&message))
285                    .map(|broker| broker.messenger_id);
286
287                let should_find_sender = {
288                    let beacon_messenger_id = beacon.get_messenger_id();
289                    beacon_messenger_id != author_id && self.is_broker(beacon_messenger_id)
290                };
291
292                if should_find_sender {
293                    // Ignore until we find the matching broker, then move the next broker one over.
294                    iter.skip_while(|&id| id != sender_id).skip(1).take(1).collect()
295                } else {
296                    iter.take(1).collect()
297                }
298            };
299
300            // If no broker was added, the original target now should participate.
301            if target_messengers.is_empty() {
302                if let MessageType::Origin(audience) = message_type {
303                    if let Ok((resolved_messengers, delivery_required)) =
304                        self.resolve_audience(sender_id, audience)
305                    {
306                        target_messengers.append(&mut Vec::from_iter(resolved_messengers));
307                        require_delivery |= delivery_required;
308                    } else {
309                        // This error will occur if the sender specifies a non-existent
310                        // address.
311                        message.report_status(Status::Undeliverable).await;
312                    }
313
314                    if let Audience::Broadcast = audience {
315                        // Broadcasts don't require any audience.
316                        message.report_status(Status::Broadcasted).await;
317                    }
318                }
319            }
320
321            // Translate selected messengers into beacon
322            for messenger in target_messengers {
323                if let Some(beacon) = self.beacons.get(&messenger) {
324                    recipients.push(beacon.clone());
325                }
326            }
327        }
328
329        let mut successful_delivery = None;
330        // Send message to each specified recipient.
331        for recipient in recipients {
332            if recipient.deliver(message.clone(), self.next_message_client_id).await.is_ok() {
333                self.next_message_client_id += 1;
334                if successful_delivery.is_none() {
335                    successful_delivery = Some(true);
336                }
337            } else {
338                successful_delivery = Some(false);
339            }
340        }
341
342        if require_delivery {
343            message
344                .report_status(if let Some(true) = successful_delivery {
345                    Status::Received
346                } else {
347                    Status::Undeliverable
348                })
349                .await;
350        }
351    }
352
353    /// Resolves the audience into a set of MessengerIds. Also returns whether
354    /// delivery is required (broadcasts for example don't require delivery
355    /// confirmation). If there is an issue resolving an audience, an error
356    /// is returned. Errors should halt any further processing on the audience
357    /// set.
358    fn resolve_audience(
359        &self,
360        sender_id: MessengerId,
361        audience: &Audience,
362    ) -> Result<(HashSet<MessengerId>, bool), anyhow::Error> {
363        let mut return_set = HashSet::new();
364        let mut delivery_required = false;
365
366        match audience {
367            Audience::Address(address) => {
368                delivery_required = true;
369                if let Some(&messenger_id) = self.addresses.get(address) {
370                    let _ = return_set.insert(messenger_id);
371                } else {
372                    return Err(format_err!("could not resolve address"));
373                }
374            }
375            Audience::EventSink => {
376                return_set.extend(self.sinks.iter());
377            }
378            Audience::Messenger(signature) => {
379                delivery_required = true;
380                match signature {
381                    Signature::Address(address) => {
382                        if let Some(&messenger_id) = self.addresses.get(address) {
383                            let _ = return_set.insert(messenger_id);
384                        } else {
385                            return Err(format_err!("could not resolve signature"));
386                        }
387                    }
388                    Signature::Anonymous(id) => {
389                        let _ = return_set.insert(*id);
390                    }
391                }
392            }
393            Audience::Broadcast => {
394                // Gather all messengers
395                for &id in self.beacons.keys() {
396                    if id != sender_id && !self.is_broker(id) {
397                        let _ = return_set.insert(id);
398                    }
399                }
400            }
401        }
402
403        Ok((return_set, delivery_required))
404    }
405
406    async fn process_messenger_request(&mut self, id: ftrace::Id, action: MessengerAction) {
407        match action {
408            MessengerAction::Create(messenger_descriptor, responder, messenger_tx) => {
409                trace!(
410                    id,
411                    c"process messenger request create",
412                    "messenger_type" => format!("{:?}", messenger_descriptor.messenger_type).as_str()
413                );
414
415                let mut optional_address = None;
416                if let MessengerType::Addressable(address) = messenger_descriptor.messenger_type {
417                    if self.addresses.contains_key(&address) {
418                        // Ignore the result since an error would imply the other side is already
419                        // closed.
420                        let _ = responder.send(Err(MessageError::AddressConflict { address }));
421                        return;
422                    }
423                    optional_address = Some(address);
424                }
425
426                let id = self.next_id;
427                let signature = if let Some(address) = optional_address {
428                    Signature::Address(address)
429                } else {
430                    Signature::Anonymous(id)
431                };
432
433                let messenger =
434                    Messenger::new(Fingerprint { id, signature }, self.action_tx.clone());
435
436                // Create fuse to delete Messenger.
437                let fuse = ActionFuse::create(Box::new(move || {
438                    // Do not send deletion request if other side is closed.
439                    if messenger_tx.is_closed() {
440                        return;
441                    }
442
443                    // ActionFuse drop method might cause the send failed.
444                    messenger_tx
445                        .unbounded_send(MessengerAction::DeleteBySignature(signature))
446                        .unwrap_or_else(|_| {
447                            log::warn!(
448                                "messenger_tx failed to send delete action for signature: {:?}",
449                                signature
450                            )
451                        });
452                }));
453
454                self.next_id += 1;
455                let (beacon, receptor) =
456                    BeaconBuilder::new(messenger.clone()).add_fuse(Rc::clone(&fuse)).build();
457                let _ = self.beacons.insert(id, beacon);
458
459                match messenger_descriptor.messenger_type {
460                    MessengerType::Broker(filter) => {
461                        self.brokers.push(Broker { messenger_id: id, filter });
462                    }
463                    MessengerType::Addressable(address) => {
464                        let _ = self.addresses.insert(address, id);
465                    }
466                    #[cfg(test)]
467                    MessengerType::EventSink => {
468                        let _ = self.sinks.insert(id);
469                    }
470                    MessengerType::Unbound => {
471                        // We do not track Unbounded messengers.
472                    }
473                }
474
475                let response_result =
476                    responder.send(Ok((MessengerClient::new(messenger, fuse), receptor)));
477                #[allow(clippy::redundant_pattern_matching)]
478                if let Err(_) = response_result {
479                    log::warn!(
480                        "Receiving end of oneshot closed while trying to create messenger client \
481                            for client with id: {}",
482                        id,
483                    );
484                }
485            }
486            #[cfg(test)]
487            MessengerAction::CheckPresence(signature, responder) => {
488                trace!(id, c"process messenger request check presence");
489                let _ = responder.send(Ok(self.resolve_messenger_id(&signature).is_ok()));
490            }
491            MessengerAction::DeleteBySignature(signature) => {
492                trace!(id, c"process messenger request delete");
493                self.delete_by_signature(signature)
494            }
495        }
496    }
497
498    fn delete_by_signature(&mut self, signature: Signature) {
499        let id = self.resolve_messenger_id(&signature).expect("messenger should be present");
500
501        // Clean up roles
502        let _ = self.sinks.remove(&id);
503
504        // These are all safe if the containers don't contain any items matching `id`.
505        let _ = self.beacons.remove(&id);
506        self.brokers.retain(|broker| id != broker.messenger_id);
507        if let Signature::Address(address) = signature {
508            let _ = self.addresses.remove(&address);
509        }
510
511        self.check_exit();
512    }
513
514    // Translates messenger requests into actions upon the MessageHub.
515    async fn process_request(
516        &mut self,
517        id: ftrace::Id,
518        fingerprint: Fingerprint,
519        action: MessageAction,
520        beacon: Option<Beacon>,
521    ) {
522        let (mut outgoing_message, _guard) = match action {
523            MessageAction::Send(payload, message_type) => {
524                let guard = trace_guard!(
525                    id,
526                    c"process request send",
527                    "payload" => format!("{payload:?}").as_str()
528                );
529                (Message::new(fingerprint, payload, message_type), guard)
530            }
531            MessageAction::Forward(forwarded_message) => {
532                let guard = trace_guard!(id, c"process request forward");
533                if let Some(beacon) = self.beacons.get(&fingerprint.id) {
534                    match forwarded_message.get_type() {
535                        MessageType::Origin(audience) => {
536                            // Can't forward messages meant for forwarder
537                            if Audience::Messenger(fingerprint.signature) == *audience {
538                                return;
539                            }
540                            // Ignore forward requests from leafs in broadcast
541                            if !self.is_broker(beacon.get_messenger_id()) {
542                                return;
543                            }
544                        }
545                        MessageType::Reply(source) => {
546                            if let Some(recipient) = source.get_return_path().last() {
547                                // If the reply recipient drops the message, do not forward.
548                                if recipient.get_messenger_id() == fingerprint.id {
549                                    return;
550                                }
551                            } else {
552                                // Every reply should have a return path.
553                                forwarded_message.report_status(Status::Undeliverable).await;
554                                return;
555                            }
556                        }
557                    }
558                } else {
559                    forwarded_message.report_status(Status::Undeliverable).await;
560                    return;
561                }
562                (forwarded_message, guard)
563            }
564        };
565
566        if let Some(handle) = beacon {
567            outgoing_message.add_participant(handle);
568        }
569
570        self.send_to_next(id, fingerprint.id, outgoing_message).await;
571    }
572}