fuchsia_async/runtime/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(not(target_os = "fuchsia"))]
11mod portable;
12#[cfg(not(target_os = "fuchsia"))]
13use self::portable as implementation;
14
15// Exports common to all target os.
16pub use implementation::executor::{
17    LocalExecutor, MonotonicDuration, MonotonicInstant, SendExecutor, SpawnableFuture, TestExecutor,
18};
19pub use implementation::task::{unblock, JoinHandle, Task};
20pub use implementation::timer::Timer;
21
22mod task_group;
23pub use task_group::*;
24
25// Fuchsia specific exports.
26#[cfg(target_os = "fuchsia")]
27pub use self::fuchsia::{
28    executor::{BootInstant, EHandle, PacketReceiver, ReceiverRegistration},
29    timer::Interval,
30};
31
32/// Structured concurrency API for fuchsia-async.
33///
34/// See the [`Scope`] documentation for details.
35pub mod scope {
36    pub use super::implementation::scope::{Scope, ScopeHandle};
37
38    #[cfg(target_os = "fuchsia")]
39    pub use super::implementation::scope::{Join, ScopeActiveGuard, ScopeStream, Spawnable};
40}
41
42pub use scope::{Scope, ScopeHandle};
43
44use futures::prelude::*;
45use pin_project_lite::pin_project;
46use std::pin::Pin;
47use std::task::{ready, Context, Poll};
48
49/// An extension trait to provide `after_now` on `zx::MonotonicDuration`.
50pub trait DurationExt {
51    /// Return a `MonotonicInstant` which is a `MonotonicDuration` after the current time.
52    /// `duration.after_now()` is equivalent to `MonotonicInstant::after(duration)`.
53    ///
54    /// This method requires that an executor has been set up.
55    fn after_now(self) -> MonotonicInstant;
56}
57
58/// The time when a Timer should wakeup.
59pub trait WakeupTime {
60    /// Create a timer based on this time.
61    ///
62    /// This is allowed to be inaccurate, but the inaccuracy must make the wakeup time later,
63    /// never earlier.
64    fn into_timer(self) -> Timer;
65}
66
67#[cfg(target_os = "fuchsia")]
68impl WakeupTime for std::time::Duration {
69    fn into_timer(self) -> Timer {
70        EHandle::local().mono_timers().new_timer(MonotonicInstant::now() + self.into())
71    }
72}
73
74#[cfg(not(target_os = "fuchsia"))]
75impl WakeupTime for std::time::Duration {
76    fn into_timer(self) -> Timer {
77        Timer::from(self)
78    }
79}
80
81#[cfg(target_os = "fuchsia")]
82impl WakeupTime for MonotonicDuration {
83    fn into_timer(self) -> Timer {
84        EHandle::local().mono_timers().new_timer(MonotonicInstant::after(self))
85    }
86}
87
88#[cfg(target_os = "fuchsia")]
89impl WakeupTime for zx::BootDuration {
90    fn into_timer(self) -> Timer {
91        EHandle::local().boot_timers().new_timer(BootInstant::after(self))
92    }
93}
94
95impl DurationExt for std::time::Duration {
96    #[allow(clippy::useless_conversion)] // Conversion is optionally needed on Fuchsia.
97    fn after_now(self) -> MonotonicInstant {
98        MonotonicInstant::now() + self.into()
99    }
100}
101
102/// A trait which allows futures to be easily wrapped in a timeout.
103pub trait TimeoutExt: Future + Sized {
104    /// Wraps the future in a timeout, calling `on_timeout` to produce a result
105    /// when the timeout occurs.
106    fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
107    where
108        WT: WakeupTime,
109        OT: FnOnce() -> Self::Output,
110    {
111        OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
112    }
113
114    /// Wraps the future in a stall-guard, calling `on_stalled` to produce a result
115    /// when the future hasn't been otherwise polled within the `timeout`.
116    /// This is a heuristic - spurious wakeups will keep the detection from triggering,
117    /// and moving all work to external tasks or threads with force the triggering early.
118    fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
119    where
120        OS: FnOnce() -> Self::Output,
121    {
122        OnStalled {
123            timer: timeout.into_timer(),
124            future: self,
125            timeout,
126            on_stalled: Some(on_stalled),
127        }
128    }
129}
130
131impl<F: Future + Sized> TimeoutExt for F {}
132
133pin_project! {
134    /// A wrapper for a future which will complete with a provided closure when a timeout occurs.
135    #[derive(Debug)]
136    #[must_use = "futures do nothing unless polled"]
137    pub struct OnTimeout<F, OT> {
138        #[pin]
139        timer: Timer,
140        #[pin]
141        future: F,
142        on_timeout: Option<OT>,
143    }
144}
145
146impl<F: Future, OT> Future for OnTimeout<F, OT>
147where
148    OT: FnOnce() -> F::Output,
149{
150    type Output = F::Output;
151
152    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
153        let this = self.project();
154        if let Poll::Ready(item) = this.future.poll(cx) {
155            return Poll::Ready(item);
156        }
157        if let Poll::Ready(()) = this.timer.poll(cx) {
158            let ot = this.on_timeout.take().expect("polled withtimeout after completion");
159            let item = (ot)();
160            return Poll::Ready(item);
161        }
162        Poll::Pending
163    }
164}
165
166pin_project! {
167    /// A wrapper for a future who's steady progress is monitored and will complete with the
168    /// provided closure if no progress is made before the timeout.
169    #[derive(Debug)]
170    #[must_use = "futures do nothing unless polled"]
171    pub struct OnStalled<F, OS> {
172        #[pin]
173        timer: Timer,
174        #[pin]
175        future: F,
176        timeout: std::time::Duration,
177        on_stalled: Option<OS>,
178    }
179}
180
181impl<F: Future, OS> Future for OnStalled<F, OS>
182where
183    OS: FnOnce() -> F::Output,
184{
185    type Output = F::Output;
186
187    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188        let mut this = self.project();
189        if let Poll::Ready(item) = this.future.poll(cx) {
190            return Poll::Ready(item);
191        }
192        match this.timer.as_mut().poll(cx) {
193            Poll::Ready(()) => {}
194            Poll::Pending => {
195                this.timer.set(this.timeout.into_timer());
196                ready!(this.timer.as_mut().poll(cx));
197            }
198        }
199        Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
200    }
201}
202
203#[cfg(test)]
204mod task_tests {
205
206    use super::*;
207    use futures::channel::oneshot;
208
209    fn run(f: impl Send + 'static + Future<Output = ()>) {
210        const TEST_THREADS: u8 = 2;
211        SendExecutor::new(TEST_THREADS).run(f)
212    }
213
214    #[test]
215    fn can_detach() {
216        run(async move {
217            let (tx_started, rx_started) = oneshot::channel();
218            let (tx_continue, rx_continue) = oneshot::channel();
219            let (tx_done, rx_done) = oneshot::channel();
220            {
221                // spawn a task and detach it
222                // the task will wait for a signal, signal it received it, and then wait for another
223                Task::spawn(async move {
224                    tx_started.send(()).unwrap();
225                    rx_continue.await.unwrap();
226                    tx_done.send(()).unwrap();
227                })
228                .detach();
229            }
230            // task is detached, have a short conversation with it
231            rx_started.await.unwrap();
232            tx_continue.send(()).unwrap();
233            rx_done.await.unwrap();
234        });
235    }
236
237    #[test]
238    fn can_join() {
239        // can we spawn, then join a task
240        run(async move {
241            assert_eq!(42, Task::spawn(async move { 42u8 }).await);
242        })
243    }
244
245    #[test]
246    fn can_join_unblock() {
247        // can we poll a blocked task
248        run(async move {
249            assert_eq!(42, unblock(|| 42u8).await);
250        })
251    }
252
253    #[test]
254    fn can_join_unblock_local() {
255        // can we poll a blocked task in a local executor
256        LocalExecutor::new().run_singlethreaded(async move {
257            assert_eq!(42, unblock(|| 42u8).await);
258        });
259    }
260
261    #[test]
262    #[should_panic]
263    // TODO(https://fxbug.dev/42169733): delete the below
264    #[cfg_attr(feature = "variant_asan", ignore)]
265    fn unblock_fn_panics() {
266        run(async move {
267            unblock(|| panic!("bad")).await;
268        })
269    }
270
271    #[test]
272    fn can_join_local() {
273        // can we spawn, then join a task locally
274        LocalExecutor::new().run_singlethreaded(async move {
275            assert_eq!(42, Task::local(async move { 42u8 }).await);
276        })
277    }
278
279    #[test]
280    fn can_cancel() {
281        run(async move {
282            let (_tx_start, rx_start) = oneshot::channel::<()>();
283            let (tx_done, rx_done) = oneshot::channel();
284            // Start and immediately cancel the task (by dropping it).
285            drop(Task::spawn(async move {
286                rx_start.await.unwrap();
287                tx_done.send(()).unwrap();
288            }));
289            // we should see an error on receive
290            rx_done.await.expect_err("done should not be sent");
291        })
292    }
293}
294
295#[cfg(test)]
296mod timer_tests {
297    use super::*;
298    use futures::future::Either;
299    use std::pin::pin;
300
301    #[test]
302    fn shorter_fires_first_instant() {
303        use std::time::{Duration, Instant};
304        let mut exec = LocalExecutor::new();
305        let now = Instant::now();
306        let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
307        let longer = pin!(Timer::new(now + Duration::from_secs(1)));
308        match exec.run_singlethreaded(future::select(shorter, longer)) {
309            Either::Left((_, _)) => {}
310            Either::Right((_, _)) => panic!("wrong timer fired"),
311        }
312    }
313
314    #[cfg(target_os = "fuchsia")]
315    #[test]
316    fn can_use_zx_duration() {
317        let mut exec = LocalExecutor::new();
318        let start = MonotonicInstant::now();
319        let timer = Timer::new(MonotonicDuration::from_millis(100));
320        exec.run_singlethreaded(timer);
321        let end = MonotonicInstant::now();
322        assert!(end - start > MonotonicDuration::from_millis(100));
323    }
324
325    #[test]
326    fn can_detect_stalls() {
327        use std::sync::atomic::{AtomicU64, Ordering};
328        use std::sync::Arc;
329        use std::time::Duration;
330        let runs = Arc::new(AtomicU64::new(0));
331        assert_eq!(
332            {
333                let runs = runs.clone();
334                LocalExecutor::new().run_singlethreaded(
335                    async move {
336                        let mut sleep = Duration::from_millis(1);
337                        loop {
338                            Timer::new(sleep).await;
339                            sleep *= 2;
340                            runs.fetch_add(1, Ordering::SeqCst);
341                        }
342                    }
343                    .on_stalled(Duration::from_secs(1), || 1u8),
344                )
345            },
346            1u8
347        );
348        assert!(runs.load(Ordering::SeqCst) >= 9);
349    }
350}