settings/message/
beacon.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::message::action_fuse::{ActionFuse, ActionFuseHandle};
6use crate::message::base::{Message, MessageClientId, MessageEvent, MessengerId, Status};
7use crate::message::message_client::MessageClient;
8use crate::message::messenger::Messenger;
9use crate::message::receptor::Receptor;
10use anyhow::{format_err, Error};
11use fuchsia_async::{self as fasync, DurationExt};
12use futures::channel::mpsc::UnboundedSender;
13use futures::future::{AbortHandle, Abortable, TryFutureExt};
14use futures::lock::Mutex;
15use std::rc::Rc;
16use zx::MonotonicDuration;
17
/// Helper for creating a beacon. The builder gathers the optional pieces (a
/// chained fuse and a timeout) that are handed to `Beacon::create` when
/// `build` is called.
pub struct BeaconBuilder {
    /// Messenger passed through to `Beacon::create`.
    messenger: Messenger,
    /// Optional fuse passed through to `Beacon::create`. Only a single handle
    /// is stored.
    chained_fuses: Option<ActionFuseHandle>,
    /// Optional timeout passed through to `Beacon::create`; `None` means no
    /// timeout task is started.
    timeout: Option<MonotonicDuration>,
}
24
25impl BeaconBuilder {
26    pub(super) fn new(messenger: Messenger) -> Self {
27        Self { messenger, chained_fuses: None, timeout: None }
28    }
29
30    pub(super) fn add_fuse(mut self, fuse: ActionFuseHandle) -> Self {
31        self.chained_fuses = Some(fuse);
32        self
33    }
34
35    pub(super) fn set_timeout(mut self, duration: Option<MonotonicDuration>) -> Self {
36        self.timeout = duration;
37        self
38    }
39
40    pub(super) fn build(self) -> (Beacon, Receptor) {
41        Beacon::create(self.messenger, self.chained_fuses, self.timeout)
42    }
43}
44
/// A Beacon is the conduit for sending messages to a particular Receptor. An
/// instance may be cloned and passed around to other components. All copies of
/// a particular Beacon share a reference to a flag that signals whether the
/// Receptor is active, which controls whether future messages will be sent.
///
/// It is important to note that Beacons spawn from sending a Message. Status
/// and other context sent through the Beacon are in relation to this original
/// Message (either an origin or reply).
#[derive(Clone, Debug)]
pub struct Beacon {
    /// A reference to the associated Messenger. This is only used when delivering
    /// a new message to a beacon, where a MessageClient (which references both
    /// the recipient's Messenger and the message) must be created.
    messenger: Messenger,
    /// The sender half of an internal channel established between the Beacon and
    /// Receptor.
    event_sender: UnboundedSender<MessageEvent>,
    /// Sentinel holding secondary ActionFuses. It is reference-counted, so
    /// every clone of this Beacon refers to the same Sentinel.
    sentinel: Rc<Mutex<Sentinel>>,
    /// Handle used to cancel the pending timeout task. It is aborted when a
    /// message is delivered or when the Receptor's fuse fires.
    timeout_abort_client: AbortHandle,
}
67
68impl Beacon {
69    /// Creates a Beacon, Receptor tuple. The Messenger provided as an argument
70    /// will be associated with any delivered Message for reply purposes.
71    fn create(
72        messenger: Messenger,
73        fuses: Option<ActionFuseHandle>,
74        timeout: Option<MonotonicDuration>,
75    ) -> (Beacon, Receptor) {
76        let sentinel = Rc::new(Mutex::new(Sentinel::new()));
77        let (event_tx, event_rx) = futures::channel::mpsc::unbounded::<MessageEvent>();
78        let (timeout_abort_client, timeout_abort_server) = AbortHandle::new_pair();
79        let signature = messenger.get_signature();
80        let beacon = Beacon {
81            messenger,
82            event_sender: event_tx.clone(),
83            sentinel: sentinel.clone(),
84            timeout_abort_client: timeout_abort_client.clone(),
85        };
86
87        // pass fuse to receptor to hold and set when it goes out of scope.
88        let receptor = Receptor::new(
89            signature,
90            event_rx,
91            ActionFuse::create(Box::new(move || {
92                let sentinel = sentinel.clone();
93                fasync::Task::local(async move {
94                    timeout_abort_client.abort();
95                    sentinel.lock().await.trigger().await;
96                })
97                .detach();
98            })),
99            fuses,
100        );
101
102        if let Some(duration) = timeout {
103            let abortable_timeout = Abortable::new(
104                async move {
105                    fuchsia_async::Timer::new(duration.after_now()).await;
106                    // Panic if send failed, otherwise the client cannot abort processes.
107                    event_tx
108                        .unbounded_send(MessageEvent::Status(Status::Timeout))
109                        .expect("Beacon::create, event_tx failed to send Timeout status message");
110                },
111                timeout_abort_server,
112            );
113
114            fasync::Task::local(abortable_timeout.unwrap_or_else(|_| ())).detach();
115        }
116        (beacon, receptor)
117    }
118
119    /// Sends the Status associated with the original message that spawned
120    /// this beacon.
121    pub(super) async fn status(&self, status: Status) -> Result<(), Error> {
122        if self.event_sender.unbounded_send(MessageEvent::Status(status)).is_err() {
123            return Err(format_err!("failed to deliver status"));
124        }
125
126        Ok(())
127    }
128
129    /// Delivers a response to the original message that spawned this Beacon.
130    pub(super) async fn deliver(
131        &self,
132        message: Message,
133        client_id: MessageClientId,
134    ) -> Result<(), Error> {
135        self.timeout_abort_client.abort();
136        if self
137            .event_sender
138            .unbounded_send(MessageEvent::Message(
139                message.payload().clone(),
140                MessageClient::new(client_id, message, self.messenger.clone()),
141            ))
142            .is_err()
143        {
144            return Err(format_err!("failed to deliver message"));
145        }
146
147        Ok(())
148    }
149
150    /// Adds the specified fuse to the beacon's sentinel.
151    pub(super) async fn add_fuse(&mut self, fuse: ActionFuseHandle) {
152        self.sentinel.lock().await.add_fuse(fuse);
153    }
154
155    /// Returns the identifier for the associated Messenger.
156    pub(super) fn get_messenger_id(&self) -> MessengerId {
157        self.messenger.get_id()
158    }
159}
160
/// Sentinel gathers action fuses from other sources and releases them
/// on-demand.
struct Sentinel {
    /// `true` until `trigger` is called; while `false`, `add_fuse` no longer
    /// stores new fuses.
    active: bool,
    /// Fuse handles held until `trigger` clears them.
    fuses: Vec<ActionFuseHandle>,
}
167
168impl Sentinel {
169    /// Generates a new Sentinel.
170    fn new() -> Self {
171        Self { active: true, fuses: vec![] }
172    }
173
174    /// Adds a fuse if still active.
175    fn add_fuse(&mut self, fuse: ActionFuseHandle) {
176        // In the case we're not active anymore, do not add fuse.
177        if !self.active {
178            return;
179        }
180
181        self.fuses.push(fuse);
182    }
183
184    /// Removes all pending fuses.
185    async fn trigger(&mut self) {
186        self.active = false;
187        // Clear fuses, triggering them.
188        self.fuses.clear();
189    }
190}