fuchsia_async/runtime/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(target_os = "fuchsia")]
11pub mod instrument;
12
13#[cfg(not(target_os = "fuchsia"))]
14mod portable;
15#[cfg(not(target_os = "fuchsia"))]
16use self::portable as implementation;
17
18// Exports common to all target os.
19pub use implementation::executor::{
20    LocalExecutor, LocalExecutorBuilder, MonotonicDuration, MonotonicInstant, SendExecutor,
21    SendExecutorBuilder, SpawnableFuture, TestExecutor, TestExecutorBuilder,
22};
23pub use implementation::task::{unblock, yield_now, JoinHandle, Task};
24pub use implementation::timer::Timer;
25
26mod task_group;
27pub use task_group::*;
28
29// Fuchsia specific exports.
30#[cfg(target_os = "fuchsia")]
31pub use self::fuchsia::{
32    executor::{
33        BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
34    },
35    timer::Interval,
36};
37
38/// Structured concurrency API for fuchsia-async.
39///
40/// See the [`Scope`] documentation for details.
41pub mod scope;
42
43pub use scope::{Scope, ScopeHandle};
44
45use futures::prelude::*;
46use pin_project_lite::pin_project;
47use std::pin::Pin;
48use std::task::{ready, Context, Poll};
49
50/// An extension trait to provide `after_now` on `zx::MonotonicDuration`.
51pub trait DurationExt {
52    /// Return a `MonotonicInstant` which is a `MonotonicDuration` after the current time.
53    /// `duration.after_now()` is equivalent to `MonotonicInstant::after(duration)`.
54    ///
55    /// This method requires that an executor has been set up.
56    fn after_now(self) -> MonotonicInstant;
57}
58
59/// The time when a Timer should wakeup.
60pub trait WakeupTime {
61    /// Create a timer based on this time.
62    ///
63    /// This is allowed to be inaccurate, but the inaccuracy must make the wakeup time later,
64    /// never earlier.
65    fn into_timer(self) -> Timer;
66}
67
#[cfg(target_os = "fuchsia")]
impl WakeupTime for std::time::Duration {
    /// Creates a monotonic timer on the local executor that fires once this duration
    /// has elapsed, measured from the current monotonic time.
    fn into_timer(self) -> Timer {
        // Look up the executor first, then read the clock, exactly as a single
        // chained expression would.
        let executor = EHandle::local();
        let deadline = MonotonicInstant::now() + self.into();
        executor.mono_timers().new_timer(deadline)
    }
}
74
75#[cfg(not(target_os = "fuchsia"))]
76impl WakeupTime for std::time::Duration {
77    fn into_timer(self) -> Timer {
78        Timer::from(self)
79    }
80}
81
#[cfg(target_os = "fuchsia")]
impl WakeupTime for MonotonicDuration {
    /// Creates a monotonic timer on the local executor with its deadline set to
    /// `MonotonicInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        let deadline = MonotonicInstant::after(self);
        executor.mono_timers().new_timer(deadline)
    }
}
88
#[cfg(target_os = "fuchsia")]
impl WakeupTime for zx::BootDuration {
    /// Creates a boot-timeline timer on the local executor with its deadline set to
    /// `BootInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        let deadline = BootInstant::after(self);
        executor.boot_timers().new_timer(deadline)
    }
}
95
96impl DurationExt for std::time::Duration {
97    #[allow(clippy::useless_conversion)] // Conversion is optionally needed on Fuchsia.
98    fn after_now(self) -> MonotonicInstant {
99        MonotonicInstant::now() + self.into()
100    }
101}
102
103/// A trait which allows futures to be easily wrapped in a timeout.
104pub trait TimeoutExt: Future + Sized {
105    /// Wraps the future in a timeout, calling `on_timeout` to produce a result
106    /// when the timeout occurs.
107    fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
108    where
109        WT: WakeupTime,
110        OT: FnOnce() -> Self::Output,
111    {
112        OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
113    }
114
115    /// Wraps the future in a stall-guard, calling `on_stalled` to produce a result
116    /// when the future hasn't been otherwise polled within the `timeout`.
117    /// This is a heuristic - spurious wakeups will keep the detection from triggering,
118    /// and moving all work to external tasks or threads with force the triggering early.
119    fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
120    where
121        OS: FnOnce() -> Self::Output,
122    {
123        OnStalled {
124            timer: timeout.into_timer(),
125            future: self,
126            timeout,
127            on_stalled: Some(on_stalled),
128        }
129    }
130}
131
132impl<F: Future + Sized> TimeoutExt for F {}
133
134pin_project! {
135    /// A wrapper for a future which will complete with a provided closure when a timeout occurs.
136    #[derive(Debug)]
137    #[must_use = "futures do nothing unless polled"]
138    pub struct OnTimeout<F, OT> {
139        #[pin]
140        timer: Timer,
141        #[pin]
142        future: F,
143        on_timeout: Option<OT>,
144    }
145}
146
147impl<F: Future, OT> Future for OnTimeout<F, OT>
148where
149    OT: FnOnce() -> F::Output,
150{
151    type Output = F::Output;
152
153    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
154        let this = self.project();
155        if let Poll::Ready(item) = this.future.poll(cx) {
156            return Poll::Ready(item);
157        }
158        if let Poll::Ready(()) = this.timer.poll(cx) {
159            let ot = this.on_timeout.take().expect("polled withtimeout after completion");
160            let item = (ot)();
161            return Poll::Ready(item);
162        }
163        Poll::Pending
164    }
165}
166
167pin_project! {
168    /// A wrapper for a future who's steady progress is monitored and will complete with the
169    /// provided closure if no progress is made before the timeout.
170    #[derive(Debug)]
171    #[must_use = "futures do nothing unless polled"]
172    pub struct OnStalled<F, OS> {
173        #[pin]
174        timer: Timer,
175        #[pin]
176        future: F,
177        timeout: std::time::Duration,
178        on_stalled: Option<OS>,
179    }
180}
181
182impl<F: Future, OS> Future for OnStalled<F, OS>
183where
184    OS: FnOnce() -> F::Output,
185{
186    type Output = F::Output;
187
188    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189        let mut this = self.project();
190        if let Poll::Ready(item) = this.future.poll(cx) {
191            return Poll::Ready(item);
192        }
193        match this.timer.as_mut().poll(cx) {
194            Poll::Ready(()) => {}
195            Poll::Pending => {
196                this.timer.set(this.timeout.into_timer());
197                ready!(this.timer.as_mut().poll(cx));
198            }
199        }
200        Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
201    }
202}
203
#[cfg(test)]
mod task_tests {

    use super::*;
    use futures::channel::oneshot;

    /// Runs `f` to completion on a `SendExecutor` built with a small, fixed number of
    /// threads.
    fn run(f: impl Send + 'static + Future<Output = ()>) {
        const TEST_THREADS: u8 = 2;
        SendExecutorBuilder::new().num_threads(TEST_THREADS).build().run(f)
    }

    /// A detached task keeps running and can be talked to over channels.
    #[test]
    fn can_detach() {
        run(async move {
            let (started_tx, started_rx) = oneshot::channel();
            let (resume_tx, resume_rx) = oneshot::channel();
            let (finished_tx, finished_rx) = oneshot::channel();

            // The spawned task announces that it started, waits for permission to
            // continue, and finally reports that it finished. It is detached right away.
            let task = Task::spawn(async move {
                started_tx.send(()).unwrap();
                resume_rx.await.unwrap();
                finished_tx.send(()).unwrap();
            });
            task.detach();

            // With the task detached, walk through the short handshake with it.
            started_rx.await.unwrap();
            resume_tx.send(()).unwrap();
            finished_rx.await.unwrap();
        });
    }

    /// A spawned task can be awaited for its result.
    #[test]
    fn can_join() {
        run(async move {
            let answer = Task::spawn(async move { 42u8 }).await;
            assert_eq!(42, answer);
        })
    }

    /// The result of an `unblock` closure can be awaited on a multi-threaded executor.
    #[test]
    fn can_join_unblock() {
        run(async move {
            let answer = unblock(|| 42u8).await;
            assert_eq!(42, answer);
        })
    }

    /// The result of an `unblock` closure can be awaited on a local executor.
    #[test]
    fn can_join_unblock_local() {
        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
            let answer = unblock(|| 42u8).await;
            assert_eq!(42, answer);
        });
    }

    /// A panic inside an `unblock` closure makes the test panic.
    #[test]
    #[should_panic]
    // TODO(https://fxbug.dev/42169733): delete the below
    #[cfg_attr(feature = "variant_asan", ignore)]
    fn unblock_fn_panics() {
        run(async move {
            unblock(|| panic!("bad")).await;
        })
    }

    /// A task spawned with `Task::local` can be awaited on a local executor.
    #[test]
    fn can_join_local() {
        LocalExecutorBuilder::new().build().run_singlethreaded(async move {
            let answer = Task::local(async move { 42u8 }).await;
            assert_eq!(42, answer);
        })
    }

    /// Dropping a task cancels it before it can send its completion signal.
    #[test]
    fn can_cancel() {
        run(async move {
            // The start sender is kept alive but never used, so the task can never get
            // past its first await on its own.
            let (_start_tx, start_rx) = oneshot::channel::<()>();
            let (finished_tx, finished_rx) = oneshot::channel();

            // Start the task and cancel it immediately by dropping its handle.
            let task = Task::spawn(async move {
                start_rx.await.unwrap();
                finished_tx.send(()).unwrap();
            });
            drop(task);

            // The completion sender went away with the task, so receiving must fail.
            finished_rx.await.expect_err("done should not be sent");
        })
    }
}
295
#[cfg(test)]
mod timer_tests {
    use super::*;
    use futures::future::Either;
    use std::pin::pin;

    /// Of two timers racing in a `select`, the one with the earlier deadline wins.
    #[test]
    fn shorter_fires_first_instant() {
        use std::time::{Duration, Instant};
        let mut exec = LocalExecutorBuilder::new().build();
        let now = Instant::now();
        let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
        let longer = pin!(Timer::new(now + Duration::from_secs(1)));
        match exec.run_singlethreaded(future::select(shorter, longer)) {
            Either::Left(_) => {}
            Either::Right(_) => panic!("wrong timer fired"),
        }
    }

    /// A timer built from a `MonotonicDuration` does not fire before that duration has
    /// elapsed.
    #[cfg(target_os = "fuchsia")]
    #[test]
    fn can_use_zx_duration() {
        let mut exec = LocalExecutorBuilder::new().build();
        let start = MonotonicInstant::now();
        let timer = Timer::new(MonotonicDuration::from_millis(100));
        exec.run_singlethreaded(timer);
        let end = MonotonicInstant::now();
        // A timer may wake up late but never early, so an elapsed time of exactly the
        // requested duration is a valid outcome and must not fail the test.
        assert!(end - start >= MonotonicDuration::from_millis(100));
    }

    /// `on_stalled` lets a future that keeps making progress run, and fires once the gap
    /// between polls exceeds the stall timeout.
    #[test]
    fn can_detect_stalls() {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::sync::Arc;
        use std::time::Duration;

        let runs = Arc::new(AtomicU64::new(0));
        let result = {
            let runs = Arc::clone(&runs);
            LocalExecutorBuilder::new().build().run_singlethreaded(
                async move {
                    // Sleep for 1 ms, 2 ms, 4 ms, ... and count each completed sleep.
                    // The loop never finishes by itself; only the stall guard ends it.
                    let mut sleep = Duration::from_millis(1);
                    loop {
                        Timer::new(sleep).await;
                        sleep *= 2;
                        runs.fetch_add(1, Ordering::SeqCst);
                    }
                }
                .on_stalled(Duration::from_secs(1), || 1u8),
            )
        };
        assert_eq!(result, 1u8);
        // The sleeps up to 512 ms are each shorter than the 1 s stall timeout, so roughly
        // ten iterations are expected before the guard fires; the bound leaves slack.
        assert!(runs.load(Ordering::SeqCst) >= 9);
    }
}