// settings/message/beacon.rs
use crate::message::action_fuse::{ActionFuse, ActionFuseHandle};
use crate::message::base::{Message, MessageClientId, MessageEvent, MessengerId, Status};
use crate::message::message_client::MessageClient;
use crate::message::messenger::Messenger;
use crate::message::receptor::Receptor;
use anyhow::{format_err, Error};
use fuchsia_async::{self as fasync, DurationExt};
use futures::channel::mpsc::UnboundedSender;
use futures::future::{AbortHandle, Abortable, TryFutureExt};
use futures::lock::Mutex;
use std::rc::Rc;
use zx::MonotonicDuration;

18pub struct BeaconBuilder {
20 messenger: Messenger,
21 chained_fuses: Option<ActionFuseHandle>,
22 timeout: Option<MonotonicDuration>,
23}
24
25impl BeaconBuilder {
26 pub(super) fn new(messenger: Messenger) -> Self {
27 Self { messenger, chained_fuses: None, timeout: None }
28 }
29
30 pub(super) fn add_fuse(mut self, fuse: ActionFuseHandle) -> Self {
31 self.chained_fuses = Some(fuse);
32 self
33 }
34
35 pub(super) fn set_timeout(mut self, duration: Option<MonotonicDuration>) -> Self {
36 self.timeout = duration;
37 self
38 }
39
40 pub(super) fn build(self) -> (Beacon, Receptor) {
41 Beacon::create(self.messenger, self.chained_fuses, self.timeout)
42 }
43}
44
45#[derive(Clone, Debug)]
54pub struct Beacon {
55 messenger: Messenger,
59 event_sender: UnboundedSender<MessageEvent>,
62 sentinel: Rc<Mutex<Sentinel>>,
64 timeout_abort_client: AbortHandle,
66}
67
68impl Beacon {
69 fn create(
72 messenger: Messenger,
73 fuses: Option<ActionFuseHandle>,
74 timeout: Option<MonotonicDuration>,
75 ) -> (Beacon, Receptor) {
76 let sentinel = Rc::new(Mutex::new(Sentinel::new()));
77 let (event_tx, event_rx) = futures::channel::mpsc::unbounded::<MessageEvent>();
78 let (timeout_abort_client, timeout_abort_server) = AbortHandle::new_pair();
79 let signature = messenger.get_signature();
80 let beacon = Beacon {
81 messenger,
82 event_sender: event_tx.clone(),
83 sentinel: sentinel.clone(),
84 timeout_abort_client: timeout_abort_client.clone(),
85 };
86
87 let receptor = Receptor::new(
89 signature,
90 event_rx,
91 ActionFuse::create(Box::new(move || {
92 let sentinel = sentinel.clone();
93 fasync::Task::local(async move {
94 timeout_abort_client.abort();
95 sentinel.lock().await.trigger().await;
96 })
97 .detach();
98 })),
99 fuses,
100 );
101
102 if let Some(duration) = timeout {
103 let abortable_timeout = Abortable::new(
104 async move {
105 fuchsia_async::Timer::new(duration.after_now()).await;
106 event_tx
108 .unbounded_send(MessageEvent::Status(Status::Timeout))
109 .expect("Beacon::create, event_tx failed to send Timeout status message");
110 },
111 timeout_abort_server,
112 );
113
114 fasync::Task::local(abortable_timeout.unwrap_or_else(|_| ())).detach();
115 }
116 (beacon, receptor)
117 }
118
119 pub(super) async fn status(&self, status: Status) -> Result<(), Error> {
122 if self.event_sender.unbounded_send(MessageEvent::Status(status)).is_err() {
123 return Err(format_err!("failed to deliver status"));
124 }
125
126 Ok(())
127 }
128
129 pub(super) async fn deliver(
131 &self,
132 message: Message,
133 client_id: MessageClientId,
134 ) -> Result<(), Error> {
135 self.timeout_abort_client.abort();
136 if self
137 .event_sender
138 .unbounded_send(MessageEvent::Message(
139 message.payload().clone(),
140 MessageClient::new(client_id, message, self.messenger.clone()),
141 ))
142 .is_err()
143 {
144 return Err(format_err!("failed to deliver message"));
145 }
146
147 Ok(())
148 }
149
150 pub(super) async fn add_fuse(&mut self, fuse: ActionFuseHandle) {
152 self.sentinel.lock().await.add_fuse(fuse);
153 }
154
155 pub(super) fn get_messenger_id(&self) -> MessengerId {
157 self.messenger.get_id()
158 }
159}
160
161struct Sentinel {
164 active: bool,
165 fuses: Vec<ActionFuseHandle>,
166}
167
168impl Sentinel {
169 fn new() -> Self {
171 Self { active: true, fuses: vec![] }
172 }
173
174 fn add_fuse(&mut self, fuse: ActionFuseHandle) {
176 if !self.active {
178 return;
179 }
180
181 self.fuses.push(fuse);
182 }
183
184 async fn trigger(&mut self) {
186 self.active = false;
187 self.fuses.clear();
189 }
190}