// fuchsia_async/runtime/mod.rs
#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(all(not(target_os = "fuchsia"), not(target_arch = "wasm32")))]
11mod portable;
12#[cfg(all(not(target_os = "fuchsia"), not(target_arch = "wasm32")))]
13use self::portable as implementation;
14
15#[cfg(all(not(target_os = "fuchsia"), target_arch = "wasm32"))]
16mod stub;
17#[cfg(all(not(target_os = "fuchsia"), target_arch = "wasm32"))]
18use self::stub as implementation;
19
20pub use implementation::executor::{
22 LocalExecutor, MonotonicDuration, MonotonicInstant, SendExecutor, SpawnableFuture, TestExecutor,
23};
24pub use implementation::task::{unblock, JoinHandle, Task};
25pub use implementation::timer::Timer;
26
27#[cfg(not(target_arch = "wasm32"))]
28mod task_group;
29#[cfg(not(target_arch = "wasm32"))]
30pub use task_group::*;
31
32#[cfg(target_os = "fuchsia")]
34pub use self::fuchsia::{
35 executor::{BootInstant, EHandle, PacketReceiver, ReceiverRegistration},
36 timer::Interval,
37};
38
39pub mod scope {
43 pub use super::implementation::scope::{Scope, ScopeHandle};
44
45 #[cfg(target_os = "fuchsia")]
46 pub use super::implementation::scope::{Join, ScopeStream, Spawnable};
47}
48
49pub use scope::{Scope, ScopeHandle};
50
51use futures::prelude::*;
52use pin_project_lite::pin_project;
53use std::pin::Pin;
54use std::task::{ready, Context, Poll};
55
56pub trait DurationExt {
58 fn after_now(self) -> MonotonicInstant;
63}
64
65pub trait WakeupTime {
67 fn into_timer(self) -> Timer;
72}
73
/// Fuchsia: a `std::time::Duration` becomes a timer on the local executor's monotonic
/// timers, with a deadline of "monotonic now + duration".
#[cfg(target_os = "fuchsia")]
impl WakeupTime for std::time::Duration {
    fn into_timer(self) -> Timer {
        // Obtain the executor handle first, exactly as the chained form does, and only
        // then compute the deadline.
        let executor = EHandle::local();
        executor.mono_timers().new_timer(MonotonicInstant::now() + self.into())
    }
}
80
81#[cfg(not(target_os = "fuchsia"))]
82impl WakeupTime for std::time::Duration {
83 fn into_timer(self) -> Timer {
84 Timer::from(self)
85 }
86}
87
/// Fuchsia: a `MonotonicDuration` becomes a timer on the local executor's monotonic
/// timers, with its deadline taken from `MonotonicInstant::after`.
#[cfg(target_os = "fuchsia")]
impl WakeupTime for MonotonicDuration {
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        executor.mono_timers().new_timer(MonotonicInstant::after(self))
    }
}
94
/// Fuchsia: a `zx::BootDuration` becomes a timer on the local executor's boot timers,
/// with its deadline taken from `BootInstant::after`.
#[cfg(target_os = "fuchsia")]
impl WakeupTime for zx::BootDuration {
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        executor.boot_timers().new_timer(BootInstant::after(self))
    }
}
101
102impl DurationExt for std::time::Duration {
103 fn after_now(self) -> MonotonicInstant {
104 MonotonicInstant::now() + self.into()
105 }
106}
107
108pub trait TimeoutExt: Future + Sized {
110 fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
113 where
114 WT: WakeupTime,
115 OT: FnOnce() -> Self::Output,
116 {
117 OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
118 }
119
120 fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
125 where
126 OS: FnOnce() -> Self::Output,
127 {
128 OnStalled {
129 timer: timeout.into_timer(),
130 future: self,
131 timeout,
132 on_stalled: Some(on_stalled),
133 }
134 }
135}
136
137impl<F: Future + Sized> TimeoutExt for F {}
138
139pin_project! {
140 #[derive(Debug)]
142 #[must_use = "futures do nothing unless polled"]
143 pub struct OnTimeout<F, OT> {
144 #[pin]
145 timer: Timer,
146 #[pin]
147 future: F,
148 on_timeout: Option<OT>,
149 }
150}
151
152impl<F: Future, OT> Future for OnTimeout<F, OT>
153where
154 OT: FnOnce() -> F::Output,
155{
156 type Output = F::Output;
157
158 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
159 let this = self.project();
160 if let Poll::Ready(item) = this.future.poll(cx) {
161 return Poll::Ready(item);
162 }
163 if let Poll::Ready(()) = this.timer.poll(cx) {
164 let ot = this.on_timeout.take().expect("polled withtimeout after completion");
165 let item = (ot)();
166 return Poll::Ready(item);
167 }
168 Poll::Pending
169 }
170}
171
172pin_project! {
173 #[derive(Debug)]
176 #[must_use = "futures do nothing unless polled"]
177 pub struct OnStalled<F, OS> {
178 #[pin]
179 timer: Timer,
180 #[pin]
181 future: F,
182 timeout: std::time::Duration,
183 on_stalled: Option<OS>,
184 }
185}
186
187impl<F: Future, OS> Future for OnStalled<F, OS>
188where
189 OS: FnOnce() -> F::Output,
190{
191 type Output = F::Output;
192
193 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
194 let mut this = self.project();
195 if let Poll::Ready(item) = this.future.poll(cx) {
196 return Poll::Ready(item);
197 }
198 match this.timer.as_mut().poll(cx) {
199 Poll::Ready(()) => {}
200 Poll::Pending => {
201 this.timer.set(this.timeout.into_timer());
202 ready!(this.timer.as_mut().poll(cx));
203 }
204 }
205 Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
206 }
207}
208
#[cfg(test)]
mod task_tests {
    use super::*;
    use futures::channel::oneshot;

    /// Runs `f` to completion on a `SendExecutor` constructed with `TEST_THREADS`.
    fn run<F>(f: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        const TEST_THREADS: u8 = 2;
        SendExecutor::new(TEST_THREADS).run(f)
    }

    #[test]
    fn can_detach() {
        run(async move {
            let (started_tx, started_rx) = oneshot::channel();
            let (resume_tx, resume_rx) = oneshot::channel();
            let (finished_tx, finished_rx) = oneshot::channel();

            // Spawn and detach straight away; the handshake below checks that the task
            // still runs to the end.
            let background = Task::spawn(async move {
                started_tx.send(()).unwrap();
                resume_rx.await.unwrap();
                finished_tx.send(()).unwrap();
            });
            background.detach();

            // Wait for the task to start, let it continue, then wait for it to finish.
            started_rx.await.unwrap();
            resume_tx.send(()).unwrap();
            finished_rx.await.unwrap();
        });
    }

    #[test]
    fn can_join() {
        run(async move {
            let joined = Task::spawn(async move { 42u8 }).await;
            assert_eq!(42, joined);
        })
    }

    #[test]
    fn can_join_unblock() {
        run(async move {
            let joined = unblock(|| 42u8).await;
            assert_eq!(42, joined);
        })
    }

    #[test]
    fn can_join_unblock_local() {
        LocalExecutor::new().run_singlethreaded(async move {
            let joined = unblock(|| 42u8).await;
            assert_eq!(42, joined);
        });
    }

    // The panic raised inside the `unblock` closure is expected to fail the test thread.
    #[test]
    #[should_panic]
    #[cfg_attr(feature = "variant_asan", ignore)]
    fn unblock_fn_panics() {
        run(async move {
            unblock(|| panic!("bad")).await;
        })
    }

    #[test]
    fn can_join_local() {
        LocalExecutor::new().run_singlethreaded(async move {
            let joined = Task::local(async move { 42u8 }).await;
            assert_eq!(42, joined);
        })
    }

    #[test]
    fn can_cancel() {
        run(async move {
            // `_start_tx` is kept alive but never used, so the task can never get past
            // its first await on its own.
            let (_start_tx, start_rx) = oneshot::channel::<()>();
            let (done_tx, done_rx) = oneshot::channel();

            // Drop the task handle immediately instead of detaching or awaiting it.
            drop(Task::spawn(async move {
                start_rx.await.unwrap();
                done_tx.send(()).unwrap();
            }));

            // The receiver must see an error rather than a value from the task.
            done_rx.await.expect_err("done should not be sent");
        })
    }
}
300
#[cfg(test)]
mod timer_tests {
    use super::*;
    use futures::future::Either;
    use std::pin::pin;

    #[test]
    fn shorter_fires_first_instant() {
        use std::time::{Duration, Instant};

        let mut exec = LocalExecutor::new();
        let base = Instant::now();
        let shorter = pin!(Timer::new(base + Duration::from_millis(100)));
        let longer = pin!(Timer::new(base + Duration::from_secs(1)));

        // `select` reports which side finished first; the 100 ms timer is on the left.
        let winner = exec.run_singlethreaded(future::select(shorter, longer));
        if let Either::Right((_, _)) = winner {
            panic!("wrong timer fired");
        }
    }

    #[cfg(target_os = "fuchsia")]
    #[test]
    fn can_use_zx_duration() {
        let mut exec = LocalExecutor::new();

        let start = MonotonicInstant::now();
        let timer = Timer::new(MonotonicDuration::from_millis(100));
        exec.run_singlethreaded(timer);
        let end = MonotonicInstant::now();

        // Running the timer must have taken longer than the requested duration.
        let elapsed = end - start;
        assert!(elapsed > MonotonicDuration::from_millis(100));
    }

    #[test]
    fn can_detect_stalls() {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::sync::Arc;
        use std::time::Duration;

        let runs = Arc::new(AtomicU64::new(0));

        let outcome = {
            let runs = runs.clone();
            LocalExecutor::new().run_singlethreaded(
                async move {
                    // Sleep 1 ms, 2 ms, 4 ms, ... so the gap between wakeups eventually
                    // exceeds the one-second stall threshold below.
                    let mut sleep = Duration::from_millis(1);
                    loop {
                        Timer::new(sleep).await;
                        sleep *= 2;
                        runs.fetch_add(1, Ordering::SeqCst);
                    }
                }
                .on_stalled(Duration::from_secs(1), || 1u8),
            )
        };

        // The loop never finishes on its own, so the value must come from the stall closure.
        assert_eq!(outcome, 1u8);
        // Several doubling sleeps must have completed before the stall was declared.
        assert!(runs.load(Ordering::SeqCst) >= 9);
    }
}