1use crate::message::action_fuse::ActionFuse;
6use crate::message::base::{
7 ActionSender, Attribution, Audience, Filter, Fingerprint, Message, MessageAction,
8 MessageClientId, MessageError, MessageType, MessengerAction, MessengerId, MessengerType,
9 Signature, Status,
10};
11use crate::message::beacon::{Beacon, BeaconBuilder};
12use crate::message::delegate::Delegate;
13use crate::message::messenger::{Messenger, MessengerClient};
14use crate::{trace, trace_guard};
15use anyhow::format_err;
16use futures::StreamExt;
17use std::borrow::Cow;
18use std::collections::{HashMap, HashSet};
19use std::rc::Rc;
20use {fuchsia_async as fasync, fuchsia_trace as ftrace};
21
/// Sender half of the channel used to tell the hub's processing task to stop.
/// The payload carries no information; only the arrival of a value matters.
type ExitSender = futures::channel::mpsc::UnboundedSender<()>;
24
/// Errors raised by the message hub while servicing requests.
#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
    /// A response could not be sent; the payload names the operation involved.
    #[error("Failed to send response for operation: {0:?}")]
    ResponseSendFail(Cow<'static, str>),
    /// No messenger is registered for the requested signature.
    #[error("Messenger not present")]
    MessengerNotFound,
}
32
/// A messenger registered with the hub as a broker, paired with the predicate
/// that decides which messages it is offered.
#[derive(Clone)]
struct Broker {
    /// Id of the messenger acting as the broker.
    messenger_id: MessengerId,
    /// Predicate evaluated against each message; the broker is only considered
    /// as a recipient when it returns `true`.
    filter: Filter,
}
43
44impl PartialEq for Broker {
45 fn eq(&self, other: &Self) -> bool {
46 self.messenger_id == other.messenger_id
49 }
50}
51
/// Central router for messages: it keeps track of every messenger it created
/// and decides which messenger(s) receive each message.
pub struct MessageHub {
    /// Sender for message actions; a clone is handed to every messenger the
    /// hub creates.
    action_tx: ActionSender,
    /// Maps an address to the id of the messenger registered under it.
    addresses: HashMap<crate::Address, MessengerId>,
    /// Ids of the messengers registered as event sinks.
    sinks: HashSet<MessengerId>,
    /// Maps a messenger id to the beacon used to deliver messages to it.
    beacons: HashMap<MessengerId, Beacon>,
    /// Registered brokers, kept in the order in which they were added.
    brokers: Vec<Broker>,
    /// Id that will be assigned to the next messenger created.
    next_id: MessengerId,
    /// Id passed along with the next delivery; advanced after each successful
    /// delivery.
    next_message_client_id: MessageClientId,
    /// Set once the channel carrying messenger actions has yielded `None`.
    messenger_channel_closed: bool,
    /// Used to signal the processing task that it should exit.
    exit_tx: ExitSender,
}
77
78impl MessageHub {
79 pub(crate) fn create() -> Delegate {
81 let (action_tx, mut action_rx) =
82 futures::channel::mpsc::unbounded::<(Fingerprint, MessageAction, Option<Beacon>)>();
83 let (messenger_tx, mut messenger_rx) =
84 futures::channel::mpsc::unbounded::<MessengerAction>();
85
86 let (exit_tx, mut exit_rx) = futures::channel::mpsc::unbounded::<()>();
87
88 let mut hub = MessageHub {
89 next_id: 0,
90 next_message_client_id: 0,
91 action_tx,
92 beacons: HashMap::new(),
93 addresses: HashMap::new(),
94 sinks: HashSet::new(),
95 brokers: Vec::new(),
96 messenger_channel_closed: false,
97 exit_tx,
98 };
99
100 fasync::Task::local(async move {
101 let id = ftrace::Id::new();
102
103 trace!(id, c"message hub");
104 loop {
105 futures::select_biased! {
109 _ = exit_rx.next() => {
110 break;
111 }
112 message_action = action_rx.select_next_some() => {
113 trace!(
114 id,
115 c"message action"
116 );
117 let (fingerprint, action, beacon) = message_action;
118 hub.process_request(id, fingerprint, action, beacon).await;
119 }
120 messenger_action = messenger_rx.next() => {
121 trace!(
122 id,
123 c"messenger action"
124 );
125 match messenger_action {
126 Some(action) => {
127 hub.process_messenger_request(id, action).await;
128 }
129 None => {
130 hub.messenger_channel_closed = true;
131 hub.check_exit();
132 }
133 }
134 }
135 }
136 }
137 })
138 .detach();
139
140 Delegate::new(messenger_tx)
141 }
142
143 fn check_exit(&self) {
144 if self.messenger_channel_closed && self.beacons.is_empty() {
145 let _ = self.exit_tx.unbounded_send(());
147 }
148 }
149
150 fn is_broker(&self, messenger_id: MessengerId) -> bool {
152 self.brokers.iter().any(|broker| broker.messenger_id == messenger_id)
153 }
154
155 fn resolve_messenger_id(&self, signature: &Signature) -> Result<MessengerId, Error> {
157 Ok(match signature {
158 Signature::Anonymous(id) => *id,
159 Signature::Address(address) => {
160 *self.addresses.get(address).ok_or(Error::MessengerNotFound)?
161 }
162 })
163 }
164
165 async fn send_to_next(&mut self, id: ftrace::Id, sender_id: MessengerId, message: Message) {
171 trace!(id, c"send_to_next");
172 let mut recipients = vec![];
173
174 let message_type = message.get_type();
175
176 let mut require_delivery = false;
177
178 if let MessageType::Reply(source) = message_type {
180 let source_return_path = source.get_return_path();
184 let mut target_index = None;
185
186 let source_return_path_messenger_ids: HashSet<MessengerId> =
187 source_return_path.iter().map(|beacon| beacon.get_messenger_id()).collect();
188
189 let broker_ids: Vec<_> = self
194 .brokers
195 .iter()
196 .filter(|broker| {
197 !source_return_path_messenger_ids.contains(&broker.messenger_id)
198 && self
199 .resolve_messenger_id(&message.get_author())
200 .map_or(true, |id| id != broker.messenger_id)
201 && (broker.filter)(&message)
202 })
203 .map(|broker| broker.messenger_id)
204 .collect();
205
206 let mut return_path: Vec<Beacon> = broker_ids
207 .iter()
208 .map(|broker_id| {
209 self.beacons.get(broker_id).expect("beacon should resolve").clone()
210 })
211 .collect();
212
213 return_path.extend(source_return_path.iter().cloned());
216 let last_index = return_path.len() - 1;
217
218 if self.is_broker(sender_id) && !source_return_path_messenger_ids.contains(&sender_id) {
219 let mut candidate_index = self
222 .brokers
223 .iter()
224 .position(|broker| broker.messenger_id == sender_id)
225 .expect("broker should be found")
226 + 1;
227
228 while candidate_index < self.brokers.len() && target_index.is_none() {
231 target_index = broker_ids.iter().position(|broker_id| {
232 *broker_id == self.brokers[candidate_index].messenger_id
233 && !source_return_path_messenger_ids.contains(broker_id)
234 });
235
236 candidate_index += 1;
237 }
238
239 if target_index.is_none() {
241 target_index = Some(broker_ids.len());
242 }
243 } else if sender_id == message.get_return_path()[0].get_messenger_id()
244 && !matches!(message.get_attribution(), Attribution::Derived(..))
245 {
246 target_index = Some(0);
249
250 source.report_status(Status::Received).await;
255 } else {
256 for (index, beacon) in return_path.iter().enumerate().take(last_index) {
257 if beacon.get_messenger_id() == sender_id {
258 target_index = Some(index + 1);
259 }
260 }
261 }
262
263 if let Some(index) = target_index {
264 recipients.push(return_path.swap_remove(index));
265
266 if index == last_index {
267 message.report_status(Status::Received).await;
269 }
270 }
271 } else if let Some(beacon) = self.beacons.get(&sender_id) {
272 let author_id = self
273 .resolve_messenger_id(&message.get_author())
274 .expect("messenger should be present");
275
276 let mut target_messengers: Vec<_> = {
280 let iter = self
282 .brokers
283 .iter()
284 .filter(|&broker| broker.messenger_id != author_id && (broker.filter)(&message))
285 .map(|broker| broker.messenger_id);
286
287 let should_find_sender = {
288 let beacon_messenger_id = beacon.get_messenger_id();
289 beacon_messenger_id != author_id && self.is_broker(beacon_messenger_id)
290 };
291
292 if should_find_sender {
293 iter.skip_while(|&id| id != sender_id).skip(1).take(1).collect()
295 } else {
296 iter.take(1).collect()
297 }
298 };
299
300 if target_messengers.is_empty() {
302 if let MessageType::Origin(audience) = message_type {
303 if let Ok((resolved_messengers, delivery_required)) =
304 self.resolve_audience(sender_id, audience)
305 {
306 target_messengers.append(&mut Vec::from_iter(resolved_messengers));
307 require_delivery |= delivery_required;
308 } else {
309 message.report_status(Status::Undeliverable).await;
312 }
313
314 if let Audience::Broadcast = audience {
315 message.report_status(Status::Broadcasted).await;
317 }
318 }
319 }
320
321 for messenger in target_messengers {
323 if let Some(beacon) = self.beacons.get(&messenger) {
324 recipients.push(beacon.clone());
325 }
326 }
327 }
328
329 let mut successful_delivery = None;
330 for recipient in recipients {
332 if recipient.deliver(message.clone(), self.next_message_client_id).await.is_ok() {
333 self.next_message_client_id += 1;
334 if successful_delivery.is_none() {
335 successful_delivery = Some(true);
336 }
337 } else {
338 successful_delivery = Some(false);
339 }
340 }
341
342 if require_delivery {
343 message
344 .report_status(if let Some(true) = successful_delivery {
345 Status::Received
346 } else {
347 Status::Undeliverable
348 })
349 .await;
350 }
351 }
352
353 fn resolve_audience(
359 &self,
360 sender_id: MessengerId,
361 audience: &Audience,
362 ) -> Result<(HashSet<MessengerId>, bool), anyhow::Error> {
363 let mut return_set = HashSet::new();
364 let mut delivery_required = false;
365
366 match audience {
367 Audience::Address(address) => {
368 delivery_required = true;
369 if let Some(&messenger_id) = self.addresses.get(address) {
370 let _ = return_set.insert(messenger_id);
371 } else {
372 return Err(format_err!("could not resolve address"));
373 }
374 }
375 Audience::EventSink => {
376 return_set.extend(self.sinks.iter());
377 }
378 Audience::Messenger(signature) => {
379 delivery_required = true;
380 match signature {
381 Signature::Address(address) => {
382 if let Some(&messenger_id) = self.addresses.get(address) {
383 let _ = return_set.insert(messenger_id);
384 } else {
385 return Err(format_err!("could not resolve signature"));
386 }
387 }
388 Signature::Anonymous(id) => {
389 let _ = return_set.insert(*id);
390 }
391 }
392 }
393 Audience::Broadcast => {
394 for &id in self.beacons.keys() {
396 if id != sender_id && !self.is_broker(id) {
397 let _ = return_set.insert(id);
398 }
399 }
400 }
401 }
402
403 Ok((return_set, delivery_required))
404 }
405
406 async fn process_messenger_request(&mut self, id: ftrace::Id, action: MessengerAction) {
407 match action {
408 MessengerAction::Create(messenger_descriptor, responder, messenger_tx) => {
409 trace!(
410 id,
411 c"process messenger request create",
412 "messenger_type" => format!("{:?}", messenger_descriptor.messenger_type).as_str()
413 );
414
415 let mut optional_address = None;
416 if let MessengerType::Addressable(address) = messenger_descriptor.messenger_type {
417 if self.addresses.contains_key(&address) {
418 let _ = responder.send(Err(MessageError::AddressConflict { address }));
421 return;
422 }
423 optional_address = Some(address);
424 }
425
426 let id = self.next_id;
427 let signature = if let Some(address) = optional_address {
428 Signature::Address(address)
429 } else {
430 Signature::Anonymous(id)
431 };
432
433 let messenger =
434 Messenger::new(Fingerprint { id, signature }, self.action_tx.clone());
435
436 let fuse = ActionFuse::create(Box::new(move || {
438 if messenger_tx.is_closed() {
440 return;
441 }
442
443 messenger_tx
445 .unbounded_send(MessengerAction::DeleteBySignature(signature))
446 .unwrap_or_else(|_| {
447 log::warn!(
448 "messenger_tx failed to send delete action for signature: {:?}",
449 signature
450 )
451 });
452 }));
453
454 self.next_id += 1;
455 let (beacon, receptor) =
456 BeaconBuilder::new(messenger.clone()).add_fuse(Rc::clone(&fuse)).build();
457 let _ = self.beacons.insert(id, beacon);
458
459 match messenger_descriptor.messenger_type {
460 MessengerType::Broker(filter) => {
461 self.brokers.push(Broker { messenger_id: id, filter });
462 }
463 MessengerType::Addressable(address) => {
464 let _ = self.addresses.insert(address, id);
465 }
466 #[cfg(test)]
467 MessengerType::EventSink => {
468 let _ = self.sinks.insert(id);
469 }
470 MessengerType::Unbound => {
471 }
473 }
474
475 let response_result =
476 responder.send(Ok((MessengerClient::new(messenger, fuse), receptor)));
477 #[allow(clippy::redundant_pattern_matching)]
478 if let Err(_) = response_result {
479 log::warn!(
480 "Receiving end of oneshot closed while trying to create messenger client \
481 for client with id: {}",
482 id,
483 );
484 }
485 }
486 #[cfg(test)]
487 MessengerAction::CheckPresence(signature, responder) => {
488 trace!(id, c"process messenger request check presence");
489 let _ = responder.send(Ok(self.resolve_messenger_id(&signature).is_ok()));
490 }
491 MessengerAction::DeleteBySignature(signature) => {
492 trace!(id, c"process messenger request delete");
493 self.delete_by_signature(signature)
494 }
495 }
496 }
497
498 fn delete_by_signature(&mut self, signature: Signature) {
499 let id = self.resolve_messenger_id(&signature).expect("messenger should be present");
500
501 let _ = self.sinks.remove(&id);
503
504 let _ = self.beacons.remove(&id);
506 self.brokers.retain(|broker| id != broker.messenger_id);
507 if let Signature::Address(address) = signature {
508 let _ = self.addresses.remove(&address);
509 }
510
511 self.check_exit();
512 }
513
514 async fn process_request(
516 &mut self,
517 id: ftrace::Id,
518 fingerprint: Fingerprint,
519 action: MessageAction,
520 beacon: Option<Beacon>,
521 ) {
522 let (mut outgoing_message, _guard) = match action {
523 MessageAction::Send(payload, message_type) => {
524 let guard = trace_guard!(
525 id,
526 c"process request send",
527 "payload" => format!("{payload:?}").as_str()
528 );
529 (Message::new(fingerprint, payload, message_type), guard)
530 }
531 MessageAction::Forward(forwarded_message) => {
532 let guard = trace_guard!(id, c"process request forward");
533 if let Some(beacon) = self.beacons.get(&fingerprint.id) {
534 match forwarded_message.get_type() {
535 MessageType::Origin(audience) => {
536 if Audience::Messenger(fingerprint.signature) == *audience {
538 return;
539 }
540 if !self.is_broker(beacon.get_messenger_id()) {
542 return;
543 }
544 }
545 MessageType::Reply(source) => {
546 if let Some(recipient) = source.get_return_path().last() {
547 if recipient.get_messenger_id() == fingerprint.id {
549 return;
550 }
551 } else {
552 forwarded_message.report_status(Status::Undeliverable).await;
554 return;
555 }
556 }
557 }
558 } else {
559 forwarded_message.report_status(Status::Undeliverable).await;
560 return;
561 }
562 (forwarded_message, guard)
563 }
564 };
565
566 if let Some(handle) = beacon {
567 outgoing_message.add_participant(handle);
568 }
569
570 self.send_to_next(id, fingerprint.id, outgoing_message).await;
571 }
572}