fuchsia_async/runtime/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(not(target_os = "fuchsia"))]
11mod portable;
12#[cfg(not(target_os = "fuchsia"))]
13use self::portable as implementation;
14
15// Exports common to all target os.
16pub use implementation::executor::{
17    LocalExecutor, MonotonicDuration, MonotonicInstant, SendExecutor, SpawnableFuture, TestExecutor,
18};
19pub use implementation::task::{unblock, yield_now, JoinHandle, Task};
20pub use implementation::timer::Timer;
21
22mod task_group;
23pub use task_group::*;
24
25// Fuchsia specific exports.
26#[cfg(target_os = "fuchsia")]
27pub use self::fuchsia::{
28    executor::{
29        BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
30    },
31    timer::Interval,
32};
33
34/// Structured concurrency API for fuchsia-async.
35///
36/// See the [`Scope`] documentation for details.
37pub mod scope;
38
39pub use scope::{Scope, ScopeHandle};
40
41use futures::prelude::*;
42use pin_project_lite::pin_project;
43use std::pin::Pin;
44use std::task::{ready, Context, Poll};
45
46/// An extension trait to provide `after_now` on `zx::MonotonicDuration`.
47pub trait DurationExt {
48    /// Return a `MonotonicInstant` which is a `MonotonicDuration` after the current time.
49    /// `duration.after_now()` is equivalent to `MonotonicInstant::after(duration)`.
50    ///
51    /// This method requires that an executor has been set up.
52    fn after_now(self) -> MonotonicInstant;
53}
54
55/// The time when a Timer should wakeup.
56pub trait WakeupTime {
57    /// Create a timer based on this time.
58    ///
59    /// This is allowed to be inaccurate, but the inaccuracy must make the wakeup time later,
60    /// never earlier.
61    fn into_timer(self) -> Timer;
62}
63
#[cfg(target_os = "fuchsia")]
impl WakeupTime for std::time::Duration {
    /// Requests a timer from the local executor handle's monotonic timers, with a deadline
    /// of the current monotonic time plus this duration.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        // `into()` converts the std duration into the type `MonotonicInstant` can add.
        let deadline = MonotonicInstant::now() + self.into();
        executor.mono_timers().new_timer(deadline)
    }
}
70
71#[cfg(not(target_os = "fuchsia"))]
72impl WakeupTime for std::time::Duration {
73    fn into_timer(self) -> Timer {
74        Timer::from(self)
75    }
76}
77
#[cfg(target_os = "fuchsia")]
impl WakeupTime for MonotonicDuration {
    /// Requests a timer from the local executor handle's monotonic timers, with the
    /// deadline `MonotonicInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        let deadline = MonotonicInstant::after(self);
        executor.mono_timers().new_timer(deadline)
    }
}
84
#[cfg(target_os = "fuchsia")]
impl WakeupTime for zx::BootDuration {
    /// Requests a timer from the local executor handle's boot timers, with the deadline
    /// `BootInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        let deadline = BootInstant::after(self);
        executor.boot_timers().new_timer(deadline)
    }
}
91
92impl DurationExt for std::time::Duration {
93    #[allow(clippy::useless_conversion)] // Conversion is optionally needed on Fuchsia.
94    fn after_now(self) -> MonotonicInstant {
95        MonotonicInstant::now() + self.into()
96    }
97}
98
99/// A trait which allows futures to be easily wrapped in a timeout.
100pub trait TimeoutExt: Future + Sized {
101    /// Wraps the future in a timeout, calling `on_timeout` to produce a result
102    /// when the timeout occurs.
103    fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
104    where
105        WT: WakeupTime,
106        OT: FnOnce() -> Self::Output,
107    {
108        OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
109    }
110
111    /// Wraps the future in a stall-guard, calling `on_stalled` to produce a result
112    /// when the future hasn't been otherwise polled within the `timeout`.
113    /// This is a heuristic - spurious wakeups will keep the detection from triggering,
114    /// and moving all work to external tasks or threads with force the triggering early.
115    fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
116    where
117        OS: FnOnce() -> Self::Output,
118    {
119        OnStalled {
120            timer: timeout.into_timer(),
121            future: self,
122            timeout,
123            on_stalled: Some(on_stalled),
124        }
125    }
126}
127
128impl<F: Future + Sized> TimeoutExt for F {}
129
130pin_project! {
131    /// A wrapper for a future which will complete with a provided closure when a timeout occurs.
132    #[derive(Debug)]
133    #[must_use = "futures do nothing unless polled"]
134    pub struct OnTimeout<F, OT> {
135        #[pin]
136        timer: Timer,
137        #[pin]
138        future: F,
139        on_timeout: Option<OT>,
140    }
141}
142
143impl<F: Future, OT> Future for OnTimeout<F, OT>
144where
145    OT: FnOnce() -> F::Output,
146{
147    type Output = F::Output;
148
149    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
150        let this = self.project();
151        if let Poll::Ready(item) = this.future.poll(cx) {
152            return Poll::Ready(item);
153        }
154        if let Poll::Ready(()) = this.timer.poll(cx) {
155            let ot = this.on_timeout.take().expect("polled withtimeout after completion");
156            let item = (ot)();
157            return Poll::Ready(item);
158        }
159        Poll::Pending
160    }
161}
162
163pin_project! {
164    /// A wrapper for a future who's steady progress is monitored and will complete with the
165    /// provided closure if no progress is made before the timeout.
166    #[derive(Debug)]
167    #[must_use = "futures do nothing unless polled"]
168    pub struct OnStalled<F, OS> {
169        #[pin]
170        timer: Timer,
171        #[pin]
172        future: F,
173        timeout: std::time::Duration,
174        on_stalled: Option<OS>,
175    }
176}
177
178impl<F: Future, OS> Future for OnStalled<F, OS>
179where
180    OS: FnOnce() -> F::Output,
181{
182    type Output = F::Output;
183
184    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
185        let mut this = self.project();
186        if let Poll::Ready(item) = this.future.poll(cx) {
187            return Poll::Ready(item);
188        }
189        match this.timer.as_mut().poll(cx) {
190            Poll::Ready(()) => {}
191            Poll::Pending => {
192                this.timer.set(this.timeout.into_timer());
193                ready!(this.timer.as_mut().poll(cx));
194            }
195        }
196        Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
197    }
198}
199
#[cfg(test)]
mod task_tests {

    use super::*;
    use futures::channel::oneshot;

    /// Runs `f` to completion on a `SendExecutor` created with `TEST_THREADS` threads.
    fn run(f: impl Send + 'static + Future<Output = ()>) {
        const TEST_THREADS: u8 = 2;
        SendExecutor::new(TEST_THREADS).run(f)
    }

    #[test]
    fn can_detach() {
        run(async move {
            let (started_tx, started_rx) = oneshot::channel();
            let (proceed_tx, proceed_rx) = oneshot::channel();
            let (finished_tx, finished_rx) = oneshot::channel();
            // Spawn a task and detach it. The task announces that it has started, waits
            // for the go-ahead, and then announces that it has finished.
            let task = Task::spawn(async move {
                started_tx.send(()).unwrap();
                proceed_rx.await.unwrap();
                finished_tx.send(()).unwrap();
            });
            task.detach();
            // The task is detached; walk through the handshake with it.
            started_rx.await.unwrap();
            proceed_tx.send(()).unwrap();
            finished_rx.await.unwrap();
        });
    }

    #[test]
    fn can_join() {
        // A spawned task can be awaited for its result.
        run(async move {
            let joined = Task::spawn(async move { 42u8 }).await;
            assert_eq!(42, joined);
        })
    }

    #[test]
    fn can_join_unblock() {
        // The result of an `unblock` closure can be awaited.
        run(async move {
            let joined = unblock(|| 42u8).await;
            assert_eq!(42, joined);
        })
    }

    #[test]
    fn can_join_unblock_local() {
        // Same as `can_join_unblock`, but driven by a local executor.
        LocalExecutor::new().run_singlethreaded(async move {
            let joined = unblock(|| 42u8).await;
            assert_eq!(42, joined);
        });
    }

    #[test]
    #[should_panic]
    // TODO(https://fxbug.dev/42169733): delete the below
    #[cfg_attr(feature = "variant_asan", ignore)]
    fn unblock_fn_panics() {
        run(async move {
            unblock(|| panic!("bad")).await;
        })
    }

    #[test]
    fn can_join_local() {
        // A task spawned with `Task::local` can be awaited for its result.
        LocalExecutor::new().run_singlethreaded(async move {
            let joined = Task::local(async move { 42u8 }).await;
            assert_eq!(42, joined);
        })
    }

    #[test]
    fn can_cancel() {
        run(async move {
            // The start sender is kept alive but never used, so the task cannot get past
            // its first await.
            let (_start_tx, start_rx) = oneshot::channel::<()>();
            let (finished_tx, finished_rx) = oneshot::channel();
            // Start the task and immediately cancel it by dropping it.
            let task = Task::spawn(async move {
                start_rx.await.unwrap();
                finished_tx.send(()).unwrap();
            });
            drop(task);
            // The receive must fail because the done signal was never sent.
            finished_rx.await.expect_err("done should not be sent");
        })
    }
}
291
#[cfg(test)]
mod timer_tests {
    use super::*;
    use futures::future::Either;
    use std::pin::pin;

    #[test]
    fn shorter_fires_first_instant() {
        use std::time::{Duration, Instant};
        let mut exec = LocalExecutor::new();
        let now = Instant::now();
        let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
        let longer = pin!(Timer::new(now + Duration::from_secs(1)));
        // `select` resolves to `Left` when its first argument completes first.
        let winner = exec.run_singlethreaded(future::select(shorter, longer));
        if let Either::Right(_) = winner {
            panic!("wrong timer fired");
        }
    }

    #[cfg(target_os = "fuchsia")]
    #[test]
    fn can_use_zx_duration() {
        let mut exec = LocalExecutor::new();
        let start = MonotonicInstant::now();
        let timer = Timer::new(MonotonicDuration::from_millis(100));
        exec.run_singlethreaded(timer);
        let elapsed = MonotonicInstant::now() - start;
        assert!(elapsed > MonotonicDuration::from_millis(100));
    }

    #[test]
    fn can_detect_stalls() {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::sync::Arc;
        use std::time::Duration;

        let runs = Arc::new(AtomicU64::new(0));
        let counter = Arc::clone(&runs);
        // The loop sleeps for 1ms, 2ms, 4ms, ... and counts each completed sleep. It never
        // finishes by itself, so the only way out is the stall guard's closure.
        let outcome = LocalExecutor::new().run_singlethreaded(
            async move {
                let mut sleep = Duration::from_millis(1);
                loop {
                    Timer::new(sleep).await;
                    sleep *= 2;
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            }
            .on_stalled(Duration::from_secs(1), || 1u8),
        );
        assert_eq!(outcome, 1u8);
        assert!(runs.load(Ordering::SeqCst) >= 9);
    }
}