fuchsia_async/runtime/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(all(not(target_os = "fuchsia"), not(target_arch = "wasm32")))]
11mod portable;
12#[cfg(all(not(target_os = "fuchsia"), not(target_arch = "wasm32")))]
13use self::portable as implementation;
14
15#[cfg(all(not(target_os = "fuchsia"), target_arch = "wasm32"))]
16mod stub;
17#[cfg(all(not(target_os = "fuchsia"), target_arch = "wasm32"))]
18use self::stub as implementation;
19
20// Exports common to all target os.
21pub use implementation::executor::{
22    LocalExecutor, MonotonicDuration, MonotonicInstant, SendExecutor, SpawnableFuture, TestExecutor,
23};
24pub use implementation::task::{unblock, JoinHandle, Task};
25pub use implementation::timer::Timer;
26
27#[cfg(not(target_arch = "wasm32"))]
28mod task_group;
29#[cfg(not(target_arch = "wasm32"))]
30pub use task_group::*;
31
32// Fuchsia specific exports.
33#[cfg(target_os = "fuchsia")]
34pub use self::fuchsia::{
35    executor::{BootInstant, EHandle, PacketReceiver, ReceiverRegistration},
36    timer::Interval,
37};
38
/// Structured concurrency API for fuchsia-async.
///
/// See the [`Scope`] documentation for details.
pub mod scope {
    // Re-exported from whichever platform module is aliased as `implementation`.
    pub use super::implementation::scope::{Scope, ScopeHandle};

    // These additional scope items are only re-exported when building for Fuchsia.
    #[cfg(target_os = "fuchsia")]
    pub use super::implementation::scope::{Join, ScopeStream, Spawnable};
}
48
49pub use scope::{Scope, ScopeHandle};
50
51use futures::prelude::*;
52use pin_project_lite::pin_project;
53use std::pin::Pin;
54use std::task::{ready, Context, Poll};
55
/// An extension trait to provide `after_now` on `zx::MonotonicDuration`.
///
/// This file also implements the trait for `std::time::Duration`.
pub trait DurationExt {
    /// Return a `MonotonicInstant` which is a `MonotonicDuration` after the current time.
    /// `duration.after_now()` is equivalent to `MonotonicInstant::after(duration)`.
    ///
    /// This method requires that an executor has been set up.
    fn after_now(self) -> MonotonicInstant;
}
64
/// The time when a Timer should wakeup.
///
/// In this file the trait is implemented for `std::time::Duration` on every target, and
/// additionally for `MonotonicDuration` and `zx::BootDuration` when building for Fuchsia.
pub trait WakeupTime {
    /// Create a timer based on this time.
    ///
    /// This is allowed to be inaccurate, but the inaccuracy must make the wakeup time later,
    /// never earlier.
    fn into_timer(self) -> Timer;
}
73
#[cfg(target_os = "fuchsia")]
impl WakeupTime for std::time::Duration {
    /// Builds a timer on the local executor's monotonic timers that is due this
    /// duration after the current monotonic instant.
    fn into_timer(self) -> Timer {
        // Look up the executor handle first, exactly as the chained form did.
        let executor = EHandle::local();
        executor.mono_timers().new_timer(MonotonicInstant::now() + self.into())
    }
}
80
81#[cfg(not(target_os = "fuchsia"))]
82impl WakeupTime for std::time::Duration {
83    fn into_timer(self) -> Timer {
84        Timer::from(self)
85    }
86}
87
#[cfg(target_os = "fuchsia")]
impl WakeupTime for MonotonicDuration {
    /// Builds a timer on the local executor's monotonic timers with the deadline
    /// `MonotonicInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        executor.mono_timers().new_timer(MonotonicInstant::after(self))
    }
}
94
#[cfg(target_os = "fuchsia")]
impl WakeupTime for zx::BootDuration {
    /// Builds a timer on the local executor's boot timers with the deadline
    /// `BootInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        executor.boot_timers().new_timer(BootInstant::after(self))
    }
}
101
102impl DurationExt for std::time::Duration {
103    fn after_now(self) -> MonotonicInstant {
104        MonotonicInstant::now() + self.into()
105    }
106}
107
108/// A trait which allows futures to be easily wrapped in a timeout.
109pub trait TimeoutExt: Future + Sized {
110    /// Wraps the future in a timeout, calling `on_timeout` to produce a result
111    /// when the timeout occurs.
112    fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
113    where
114        WT: WakeupTime,
115        OT: FnOnce() -> Self::Output,
116    {
117        OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
118    }
119
120    /// Wraps the future in a stall-guard, calling `on_stalled` to produce a result
121    /// when the future hasn't been otherwise polled within the `timeout`.
122    /// This is a heuristic - spurious wakeups will keep the detection from triggering,
123    /// and moving all work to external tasks or threads with force the triggering early.
124    fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
125    where
126        OS: FnOnce() -> Self::Output,
127    {
128        OnStalled {
129            timer: timeout.into_timer(),
130            future: self,
131            timeout,
132            on_stalled: Some(on_stalled),
133        }
134    }
135}
136
137impl<F: Future + Sized> TimeoutExt for F {}
138
pin_project! {
    /// A wrapper for a future which will complete with a provided closure when a timeout occurs.
    ///
    /// Created by [`TimeoutExt::on_timeout`].
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub struct OnTimeout<F, OT> {
        // Polled only after `future` reports pending; its completion triggers `on_timeout`.
        #[pin]
        timer: Timer,
        // The wrapped future; it is polled first on every poll.
        #[pin]
        future: F,
        // Held in an `Option` so the `FnOnce` can be taken out and called by value.
        on_timeout: Option<OT>,
    }
}
151
152impl<F: Future, OT> Future for OnTimeout<F, OT>
153where
154    OT: FnOnce() -> F::Output,
155{
156    type Output = F::Output;
157
158    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
159        let this = self.project();
160        if let Poll::Ready(item) = this.future.poll(cx) {
161            return Poll::Ready(item);
162        }
163        if let Poll::Ready(()) = this.timer.poll(cx) {
164            let ot = this.on_timeout.take().expect("polled withtimeout after completion");
165            let item = (ot)();
166            return Poll::Ready(item);
167        }
168        Poll::Pending
169    }
170}
171
pin_project! {
    /// A wrapper for a future whose steady progress is monitored and which will complete with
    /// the provided closure if no progress is made before the timeout.
    ///
    /// Created by [`TimeoutExt::on_stalled`].
    #[derive(Debug)]
    #[must_use = "futures do nothing unless polled"]
    pub struct OnStalled<F, OS> {
        // Stall timer; replaced with a fresh one on each poll in which it has not yet fired.
        #[pin]
        timer: Timer,
        // The wrapped future; it is polled first on every poll.
        #[pin]
        future: F,
        // Duration used to build each replacement `timer`.
        timeout: std::time::Duration,
        // Held in an `Option` so the `FnOnce` can be taken out and called by value.
        on_stalled: Option<OS>,
    }
}
186
187impl<F: Future, OS> Future for OnStalled<F, OS>
188where
189    OS: FnOnce() -> F::Output,
190{
191    type Output = F::Output;
192
193    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
194        let mut this = self.project();
195        if let Poll::Ready(item) = this.future.poll(cx) {
196            return Poll::Ready(item);
197        }
198        match this.timer.as_mut().poll(cx) {
199            Poll::Ready(()) => {}
200            Poll::Pending => {
201                this.timer.set(this.timeout.into_timer());
202                ready!(this.timer.as_mut().poll(cx));
203            }
204        }
205        Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
206    }
207}
208
#[cfg(test)]
mod task_tests {

    use super::*;
    use futures::channel::oneshot;

    /// Runs `f` to completion on a `SendExecutor` created with `TEST_THREADS` threads.
    fn run<F>(f: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        const TEST_THREADS: u8 = 2;
        SendExecutor::new(TEST_THREADS).run(f)
    }

    #[test]
    fn can_detach() {
        run(async move {
            let (started_tx, started_rx) = oneshot::channel();
            let (resume_tx, resume_rx) = oneshot::channel();
            let (finished_tx, finished_rx) = oneshot::channel();

            // The worker announces that it started, waits for the go-ahead, then reports
            // completion. It is detached straight after being spawned.
            let worker = Task::spawn(async move {
                started_tx.send(()).unwrap();
                resume_rx.await.unwrap();
                finished_tx.send(()).unwrap();
            });
            worker.detach();

            // Exchange the three messages with the detached worker.
            started_rx.await.unwrap();
            resume_tx.send(()).unwrap();
            finished_rx.await.unwrap();
        });
    }

    #[test]
    fn can_join() {
        // Spawn a task and await its result.
        run(async move {
            let joined = Task::spawn(async move { 42u8 }).await;
            assert_eq!(42, joined);
        })
    }

    #[test]
    fn can_join_unblock() {
        // Await the result of a closure handed to `unblock`.
        run(async move {
            let joined = unblock(|| 42u8).await;
            assert_eq!(42, joined);
        })
    }

    #[test]
    fn can_join_unblock_local() {
        // Same as `can_join_unblock`, but driven by a local executor.
        let mut executor = LocalExecutor::new();
        executor.run_singlethreaded(async move {
            let joined = unblock(|| 42u8).await;
            assert_eq!(42, joined);
        });
    }

    #[test]
    #[should_panic]
    // TODO(https://fxbug.dev/42169733): delete the below
    #[cfg_attr(feature = "variant_asan", ignore)]
    fn unblock_fn_panics() {
        run(async move {
            unblock(|| panic!("bad")).await;
        })
    }

    #[test]
    fn can_join_local() {
        // Spawn a local task and await its result.
        let mut executor = LocalExecutor::new();
        executor.run_singlethreaded(async move {
            let joined = Task::local(async move { 42u8 }).await;
            assert_eq!(42, joined);
        })
    }

    #[test]
    fn can_cancel() {
        run(async move {
            // The start sender stays bound for the whole test and is never used.
            let (_start_tx, start_rx) = oneshot::channel::<()>();
            let (done_tx, done_rx) = oneshot::channel();
            // Dropping the task right after spawning it cancels it.
            drop(Task::spawn(async move {
                start_rx.await.unwrap();
                done_tx.send(()).unwrap();
            }));
            // The receive must fail, because `done_tx` never sends.
            done_rx.await.expect_err("done should not be sent");
        })
    }
}
300
#[cfg(test)]
mod timer_tests {
    use super::*;
    use futures::future::Either;
    use std::pin::pin;

    #[test]
    fn shorter_fires_first_instant() {
        use std::time::{Duration, Instant};
        let mut exec = LocalExecutor::new();
        let base = Instant::now();
        let short_timer = pin!(Timer::new(base + Duration::from_millis(100)));
        let long_timer = pin!(Timer::new(base + Duration::from_secs(1)));
        // `select` resolves to `Left` when the first argument finishes first.
        let winner = exec.run_singlethreaded(future::select(short_timer, long_timer));
        if let Either::Right(_) = winner {
            panic!("wrong timer fired");
        }
    }

    #[cfg(target_os = "fuchsia")]
    #[test]
    fn can_use_zx_duration() {
        let mut exec = LocalExecutor::new();
        let start = MonotonicInstant::now();
        exec.run_singlethreaded(Timer::new(MonotonicDuration::from_millis(100)));
        let end = MonotonicInstant::now();
        assert!(end - start > MonotonicDuration::from_millis(100));
    }

    #[test]
    fn can_detect_stalls() {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::sync::Arc;
        use std::time::Duration;
        let runs = Arc::new(AtomicU64::new(0));
        let counter = Arc::clone(&runs);
        // The executor is created before the guarded future, as in the nested original form.
        let mut executor = LocalExecutor::new();
        let outcome = executor.run_singlethreaded(
            async move {
                // Each sleep is twice as long as the previous one, starting at 1ms.
                let mut sleep = Duration::from_millis(1);
                loop {
                    Timer::new(sleep).await;
                    sleep *= 2;
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            }
            .on_stalled(Duration::from_secs(1), || 1u8),
        );
        assert_eq!(outcome, 1u8);
        assert!(runs.load(Ordering::SeqCst) >= 9);
    }
}
356}