fuchsia_async/runtime/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(not(target_os = "fuchsia"))]
11mod portable;
12#[cfg(not(target_os = "fuchsia"))]
13use self::portable as implementation;
14
15// Exports common to all target os.
16pub use implementation::executor::{
17    LocalExecutor, MonotonicDuration, MonotonicInstant, SendExecutor, SpawnableFuture, TestExecutor,
18};
19pub use implementation::task::{unblock, yield_now, JoinHandle, Task};
20pub use implementation::timer::Timer;
21
22mod task_group;
23pub use task_group::*;
24
25// Fuchsia specific exports.
26#[cfg(target_os = "fuchsia")]
27pub use self::fuchsia::{
28    executor::{BootInstant, EHandle, PacketReceiver, ReceiverRegistration},
29    timer::Interval,
30};
31
32/// Structured concurrency API for fuchsia-async.
33///
34/// See the [`Scope`] documentation for details.
35pub mod scope;
36
37pub use scope::{Scope, ScopeHandle};
38
39use futures::prelude::*;
40use pin_project_lite::pin_project;
41use std::pin::Pin;
42use std::task::{ready, Context, Poll};
43
44/// An extension trait to provide `after_now` on `zx::MonotonicDuration`.
45pub trait DurationExt {
46    /// Return a `MonotonicInstant` which is a `MonotonicDuration` after the current time.
47    /// `duration.after_now()` is equivalent to `MonotonicInstant::after(duration)`.
48    ///
49    /// This method requires that an executor has been set up.
50    fn after_now(self) -> MonotonicInstant;
51}
52
53/// The time when a Timer should wakeup.
54pub trait WakeupTime {
55    /// Create a timer based on this time.
56    ///
57    /// This is allowed to be inaccurate, but the inaccuracy must make the wakeup time later,
58    /// never earlier.
59    fn into_timer(self) -> Timer;
60}
61
62#[cfg(target_os = "fuchsia")]
63impl WakeupTime for std::time::Duration {
64    fn into_timer(self) -> Timer {
65        EHandle::local().mono_timers().new_timer(MonotonicInstant::now() + self.into())
66    }
67}
68
69#[cfg(not(target_os = "fuchsia"))]
70impl WakeupTime for std::time::Duration {
71    fn into_timer(self) -> Timer {
72        Timer::from(self)
73    }
74}
75
76#[cfg(target_os = "fuchsia")]
77impl WakeupTime for MonotonicDuration {
78    fn into_timer(self) -> Timer {
79        EHandle::local().mono_timers().new_timer(MonotonicInstant::after(self))
80    }
81}
82
83#[cfg(target_os = "fuchsia")]
84impl WakeupTime for zx::BootDuration {
85    fn into_timer(self) -> Timer {
86        EHandle::local().boot_timers().new_timer(BootInstant::after(self))
87    }
88}
89
90impl DurationExt for std::time::Duration {
91    #[allow(clippy::useless_conversion)] // Conversion is optionally needed on Fuchsia.
92    fn after_now(self) -> MonotonicInstant {
93        MonotonicInstant::now() + self.into()
94    }
95}
96
97/// A trait which allows futures to be easily wrapped in a timeout.
98pub trait TimeoutExt: Future + Sized {
99    /// Wraps the future in a timeout, calling `on_timeout` to produce a result
100    /// when the timeout occurs.
101    fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
102    where
103        WT: WakeupTime,
104        OT: FnOnce() -> Self::Output,
105    {
106        OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
107    }
108
109    /// Wraps the future in a stall-guard, calling `on_stalled` to produce a result
110    /// when the future hasn't been otherwise polled within the `timeout`.
111    /// This is a heuristic - spurious wakeups will keep the detection from triggering,
112    /// and moving all work to external tasks or threads with force the triggering early.
113    fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
114    where
115        OS: FnOnce() -> Self::Output,
116    {
117        OnStalled {
118            timer: timeout.into_timer(),
119            future: self,
120            timeout,
121            on_stalled: Some(on_stalled),
122        }
123    }
124}
125
126impl<F: Future + Sized> TimeoutExt for F {}
127
128pin_project! {
129    /// A wrapper for a future which will complete with a provided closure when a timeout occurs.
130    #[derive(Debug)]
131    #[must_use = "futures do nothing unless polled"]
132    pub struct OnTimeout<F, OT> {
133        #[pin]
134        timer: Timer,
135        #[pin]
136        future: F,
137        on_timeout: Option<OT>,
138    }
139}
140
141impl<F: Future, OT> Future for OnTimeout<F, OT>
142where
143    OT: FnOnce() -> F::Output,
144{
145    type Output = F::Output;
146
147    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
148        let this = self.project();
149        if let Poll::Ready(item) = this.future.poll(cx) {
150            return Poll::Ready(item);
151        }
152        if let Poll::Ready(()) = this.timer.poll(cx) {
153            let ot = this.on_timeout.take().expect("polled withtimeout after completion");
154            let item = (ot)();
155            return Poll::Ready(item);
156        }
157        Poll::Pending
158    }
159}
160
161pin_project! {
162    /// A wrapper for a future who's steady progress is monitored and will complete with the
163    /// provided closure if no progress is made before the timeout.
164    #[derive(Debug)]
165    #[must_use = "futures do nothing unless polled"]
166    pub struct OnStalled<F, OS> {
167        #[pin]
168        timer: Timer,
169        #[pin]
170        future: F,
171        timeout: std::time::Duration,
172        on_stalled: Option<OS>,
173    }
174}
175
176impl<F: Future, OS> Future for OnStalled<F, OS>
177where
178    OS: FnOnce() -> F::Output,
179{
180    type Output = F::Output;
181
182    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
183        let mut this = self.project();
184        if let Poll::Ready(item) = this.future.poll(cx) {
185            return Poll::Ready(item);
186        }
187        match this.timer.as_mut().poll(cx) {
188            Poll::Ready(()) => {}
189            Poll::Pending => {
190                this.timer.set(this.timeout.into_timer());
191                ready!(this.timer.as_mut().poll(cx));
192            }
193        }
194        Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
195    }
196}
197
198#[cfg(test)]
199mod task_tests {
200
201    use super::*;
202    use futures::channel::oneshot;
203
204    fn run(f: impl Send + 'static + Future<Output = ()>) {
205        const TEST_THREADS: u8 = 2;
206        SendExecutor::new(TEST_THREADS).run(f)
207    }
208
209    #[test]
210    fn can_detach() {
211        run(async move {
212            let (tx_started, rx_started) = oneshot::channel();
213            let (tx_continue, rx_continue) = oneshot::channel();
214            let (tx_done, rx_done) = oneshot::channel();
215            {
216                // spawn a task and detach it
217                // the task will wait for a signal, signal it received it, and then wait for another
218                Task::spawn(async move {
219                    tx_started.send(()).unwrap();
220                    rx_continue.await.unwrap();
221                    tx_done.send(()).unwrap();
222                })
223                .detach();
224            }
225            // task is detached, have a short conversation with it
226            rx_started.await.unwrap();
227            tx_continue.send(()).unwrap();
228            rx_done.await.unwrap();
229        });
230    }
231
232    #[test]
233    fn can_join() {
234        // can we spawn, then join a task
235        run(async move {
236            assert_eq!(42, Task::spawn(async move { 42u8 }).await);
237        })
238    }
239
240    #[test]
241    fn can_join_unblock() {
242        // can we poll a blocked task
243        run(async move {
244            assert_eq!(42, unblock(|| 42u8).await);
245        })
246    }
247
248    #[test]
249    fn can_join_unblock_local() {
250        // can we poll a blocked task in a local executor
251        LocalExecutor::new().run_singlethreaded(async move {
252            assert_eq!(42, unblock(|| 42u8).await);
253        });
254    }
255
256    #[test]
257    #[should_panic]
258    // TODO(https://fxbug.dev/42169733): delete the below
259    #[cfg_attr(feature = "variant_asan", ignore)]
260    fn unblock_fn_panics() {
261        run(async move {
262            unblock(|| panic!("bad")).await;
263        })
264    }
265
266    #[test]
267    fn can_join_local() {
268        // can we spawn, then join a task locally
269        LocalExecutor::new().run_singlethreaded(async move {
270            assert_eq!(42, Task::local(async move { 42u8 }).await);
271        })
272    }
273
274    #[test]
275    fn can_cancel() {
276        run(async move {
277            let (_tx_start, rx_start) = oneshot::channel::<()>();
278            let (tx_done, rx_done) = oneshot::channel();
279            // Start and immediately cancel the task (by dropping it).
280            drop(Task::spawn(async move {
281                rx_start.await.unwrap();
282                tx_done.send(()).unwrap();
283            }));
284            // we should see an error on receive
285            rx_done.await.expect_err("done should not be sent");
286        })
287    }
288}
289
290#[cfg(test)]
291mod timer_tests {
292    use super::*;
293    use futures::future::Either;
294    use std::pin::pin;
295
296    #[test]
297    fn shorter_fires_first_instant() {
298        use std::time::{Duration, Instant};
299        let mut exec = LocalExecutor::new();
300        let now = Instant::now();
301        let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
302        let longer = pin!(Timer::new(now + Duration::from_secs(1)));
303        match exec.run_singlethreaded(future::select(shorter, longer)) {
304            Either::Left((_, _)) => {}
305            Either::Right((_, _)) => panic!("wrong timer fired"),
306        }
307    }
308
309    #[cfg(target_os = "fuchsia")]
310    #[test]
311    fn can_use_zx_duration() {
312        let mut exec = LocalExecutor::new();
313        let start = MonotonicInstant::now();
314        let timer = Timer::new(MonotonicDuration::from_millis(100));
315        exec.run_singlethreaded(timer);
316        let end = MonotonicInstant::now();
317        assert!(end - start > MonotonicDuration::from_millis(100));
318    }
319
320    #[test]
321    fn can_detect_stalls() {
322        use std::sync::atomic::{AtomicU64, Ordering};
323        use std::sync::Arc;
324        use std::time::Duration;
325        let runs = Arc::new(AtomicU64::new(0));
326        assert_eq!(
327            {
328                let runs = runs.clone();
329                LocalExecutor::new().run_singlethreaded(
330                    async move {
331                        let mut sleep = Duration::from_millis(1);
332                        loop {
333                            Timer::new(sleep).await;
334                            sleep *= 2;
335                            runs.fetch_add(1, Ordering::SeqCst);
336                        }
337                    }
338                    .on_stalled(Duration::from_secs(1), || 1u8),
339                )
340            },
341            1u8
342        );
343        assert!(runs.load(Ordering::SeqCst) >= 9);
344    }
345}