fuchsia_async/runtime/
mod.rs1#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(not(target_os = "fuchsia"))]
11mod portable;
12#[cfg(not(target_os = "fuchsia"))]
13use self::portable as implementation;
14
15pub use implementation::executor::{
17 LocalExecutor, MonotonicDuration, MonotonicInstant, SendExecutor, SpawnableFuture, TestExecutor,
18};
19pub use implementation::task::{unblock, yield_now, JoinHandle, Task};
20pub use implementation::timer::Timer;
21
22mod task_group;
23pub use task_group::*;
24
25#[cfg(target_os = "fuchsia")]
27pub use self::fuchsia::{
28 executor::{BootInstant, EHandle, PacketReceiver, ReceiverRegistration},
29 timer::Interval,
30};
31
32pub mod scope;
36
37pub use scope::{Scope, ScopeHandle};
38
39use futures::prelude::*;
40use pin_project_lite::pin_project;
41use std::pin::Pin;
42use std::task::{ready, Context, Poll};
43
44pub trait DurationExt {
46 fn after_now(self) -> MonotonicInstant;
51}
52
53pub trait WakeupTime {
55 fn into_timer(self) -> Timer;
60}
61
62#[cfg(target_os = "fuchsia")]
63impl WakeupTime for std::time::Duration {
64 fn into_timer(self) -> Timer {
65 EHandle::local().mono_timers().new_timer(MonotonicInstant::now() + self.into())
66 }
67}
68
69#[cfg(not(target_os = "fuchsia"))]
70impl WakeupTime for std::time::Duration {
71 fn into_timer(self) -> Timer {
72 Timer::from(self)
73 }
74}
75
76#[cfg(target_os = "fuchsia")]
77impl WakeupTime for MonotonicDuration {
78 fn into_timer(self) -> Timer {
79 EHandle::local().mono_timers().new_timer(MonotonicInstant::after(self))
80 }
81}
82
83#[cfg(target_os = "fuchsia")]
84impl WakeupTime for zx::BootDuration {
85 fn into_timer(self) -> Timer {
86 EHandle::local().boot_timers().new_timer(BootInstant::after(self))
87 }
88}
89
90impl DurationExt for std::time::Duration {
91 #[allow(clippy::useless_conversion)] fn after_now(self) -> MonotonicInstant {
93 MonotonicInstant::now() + self.into()
94 }
95}
96
97pub trait TimeoutExt: Future + Sized {
99 fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
102 where
103 WT: WakeupTime,
104 OT: FnOnce() -> Self::Output,
105 {
106 OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
107 }
108
109 fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
114 where
115 OS: FnOnce() -> Self::Output,
116 {
117 OnStalled {
118 timer: timeout.into_timer(),
119 future: self,
120 timeout,
121 on_stalled: Some(on_stalled),
122 }
123 }
124}
125
126impl<F: Future + Sized> TimeoutExt for F {}
127
128pin_project! {
129 #[derive(Debug)]
131 #[must_use = "futures do nothing unless polled"]
132 pub struct OnTimeout<F, OT> {
133 #[pin]
134 timer: Timer,
135 #[pin]
136 future: F,
137 on_timeout: Option<OT>,
138 }
139}
140
141impl<F: Future, OT> Future for OnTimeout<F, OT>
142where
143 OT: FnOnce() -> F::Output,
144{
145 type Output = F::Output;
146
147 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
148 let this = self.project();
149 if let Poll::Ready(item) = this.future.poll(cx) {
150 return Poll::Ready(item);
151 }
152 if let Poll::Ready(()) = this.timer.poll(cx) {
153 let ot = this.on_timeout.take().expect("polled withtimeout after completion");
154 let item = (ot)();
155 return Poll::Ready(item);
156 }
157 Poll::Pending
158 }
159}
160
161pin_project! {
162 #[derive(Debug)]
165 #[must_use = "futures do nothing unless polled"]
166 pub struct OnStalled<F, OS> {
167 #[pin]
168 timer: Timer,
169 #[pin]
170 future: F,
171 timeout: std::time::Duration,
172 on_stalled: Option<OS>,
173 }
174}
175
176impl<F: Future, OS> Future for OnStalled<F, OS>
177where
178 OS: FnOnce() -> F::Output,
179{
180 type Output = F::Output;
181
182 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
183 let mut this = self.project();
184 if let Poll::Ready(item) = this.future.poll(cx) {
185 return Poll::Ready(item);
186 }
187 match this.timer.as_mut().poll(cx) {
188 Poll::Ready(()) => {}
189 Poll::Pending => {
190 this.timer.set(this.timeout.into_timer());
191 ready!(this.timer.as_mut().poll(cx));
192 }
193 }
194 Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
195 }
196}
197
198#[cfg(test)]
199mod task_tests {
200
201 use super::*;
202 use futures::channel::oneshot;
203
204 fn run(f: impl Send + 'static + Future<Output = ()>) {
205 const TEST_THREADS: u8 = 2;
206 SendExecutor::new(TEST_THREADS).run(f)
207 }
208
209 #[test]
210 fn can_detach() {
211 run(async move {
212 let (tx_started, rx_started) = oneshot::channel();
213 let (tx_continue, rx_continue) = oneshot::channel();
214 let (tx_done, rx_done) = oneshot::channel();
215 {
216 Task::spawn(async move {
219 tx_started.send(()).unwrap();
220 rx_continue.await.unwrap();
221 tx_done.send(()).unwrap();
222 })
223 .detach();
224 }
225 rx_started.await.unwrap();
227 tx_continue.send(()).unwrap();
228 rx_done.await.unwrap();
229 });
230 }
231
232 #[test]
233 fn can_join() {
234 run(async move {
236 assert_eq!(42, Task::spawn(async move { 42u8 }).await);
237 })
238 }
239
240 #[test]
241 fn can_join_unblock() {
242 run(async move {
244 assert_eq!(42, unblock(|| 42u8).await);
245 })
246 }
247
248 #[test]
249 fn can_join_unblock_local() {
250 LocalExecutor::new().run_singlethreaded(async move {
252 assert_eq!(42, unblock(|| 42u8).await);
253 });
254 }
255
256 #[test]
257 #[should_panic]
258 #[cfg_attr(feature = "variant_asan", ignore)]
260 fn unblock_fn_panics() {
261 run(async move {
262 unblock(|| panic!("bad")).await;
263 })
264 }
265
266 #[test]
267 fn can_join_local() {
268 LocalExecutor::new().run_singlethreaded(async move {
270 assert_eq!(42, Task::local(async move { 42u8 }).await);
271 })
272 }
273
274 #[test]
275 fn can_cancel() {
276 run(async move {
277 let (_tx_start, rx_start) = oneshot::channel::<()>();
278 let (tx_done, rx_done) = oneshot::channel();
279 drop(Task::spawn(async move {
281 rx_start.await.unwrap();
282 tx_done.send(()).unwrap();
283 }));
284 rx_done.await.expect_err("done should not be sent");
286 })
287 }
288}
289
290#[cfg(test)]
291mod timer_tests {
292 use super::*;
293 use futures::future::Either;
294 use std::pin::pin;
295
296 #[test]
297 fn shorter_fires_first_instant() {
298 use std::time::{Duration, Instant};
299 let mut exec = LocalExecutor::new();
300 let now = Instant::now();
301 let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
302 let longer = pin!(Timer::new(now + Duration::from_secs(1)));
303 match exec.run_singlethreaded(future::select(shorter, longer)) {
304 Either::Left((_, _)) => {}
305 Either::Right((_, _)) => panic!("wrong timer fired"),
306 }
307 }
308
309 #[cfg(target_os = "fuchsia")]
310 #[test]
311 fn can_use_zx_duration() {
312 let mut exec = LocalExecutor::new();
313 let start = MonotonicInstant::now();
314 let timer = Timer::new(MonotonicDuration::from_millis(100));
315 exec.run_singlethreaded(timer);
316 let end = MonotonicInstant::now();
317 assert!(end - start > MonotonicDuration::from_millis(100));
318 }
319
320 #[test]
321 fn can_detect_stalls() {
322 use std::sync::atomic::{AtomicU64, Ordering};
323 use std::sync::Arc;
324 use std::time::Duration;
325 let runs = Arc::new(AtomicU64::new(0));
326 assert_eq!(
327 {
328 let runs = runs.clone();
329 LocalExecutor::new().run_singlethreaded(
330 async move {
331 let mut sleep = Duration::from_millis(1);
332 loop {
333 Timer::new(sleep).await;
334 sleep *= 2;
335 runs.fetch_add(1, Ordering::SeqCst);
336 }
337 }
338 .on_stalled(Duration::from_secs(1), || 1u8),
339 )
340 },
341 1u8
342 );
343 assert!(runs.load(Ordering::SeqCst) >= 9);
344 }
345}