fuchsia_async/runtime/
mod.rs1#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(not(target_os = "fuchsia"))]
11mod portable;
12#[cfg(not(target_os = "fuchsia"))]
13use self::portable as implementation;
14
15pub use implementation::executor::{
17 LocalExecutor, MonotonicDuration, MonotonicInstant, SendExecutor, SpawnableFuture, TestExecutor,
18};
19pub use implementation::task::{unblock, yield_now, JoinHandle, Task};
20pub use implementation::timer::Timer;
21
22mod task_group;
23pub use task_group::*;
24
25#[cfg(target_os = "fuchsia")]
27pub use self::fuchsia::{
28 executor::{
29 BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
30 },
31 timer::Interval,
32};
33
34pub mod scope;
38
39pub use scope::{Scope, ScopeHandle};
40
41use futures::prelude::*;
42use pin_project_lite::pin_project;
43use std::pin::Pin;
44use std::task::{ready, Context, Poll};
45
46pub trait DurationExt {
48 fn after_now(self) -> MonotonicInstant;
53}
54
55pub trait WakeupTime {
57 fn into_timer(self) -> Timer;
62}
63
#[cfg(target_os = "fuchsia")]
impl WakeupTime for std::time::Duration {
    /// Builds a timer from `EHandle::local()`'s `mono_timers()` whose deadline
    /// is `MonotonicInstant::now()` plus this duration.
    fn into_timer(self) -> Timer {
        // The handle is obtained before the clock is sampled, so the deadline
        // is computed only once the handle is available.
        let ehandle = EHandle::local();
        ehandle.mono_timers().new_timer(MonotonicInstant::now() + self.into())
    }
}
70
71#[cfg(not(target_os = "fuchsia"))]
72impl WakeupTime for std::time::Duration {
73 fn into_timer(self) -> Timer {
74 Timer::from(self)
75 }
76}
77
#[cfg(target_os = "fuchsia")]
impl WakeupTime for MonotonicDuration {
    /// Builds a timer from `EHandle::local()`'s `mono_timers()` with the
    /// deadline `MonotonicInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let ehandle = EHandle::local();
        ehandle.mono_timers().new_timer(MonotonicInstant::after(self))
    }
}
84
#[cfg(target_os = "fuchsia")]
impl WakeupTime for zx::BootDuration {
    /// Builds a timer from `EHandle::local()`'s `boot_timers()` with the
    /// deadline `BootInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let ehandle = EHandle::local();
        ehandle.boot_timers().new_timer(BootInstant::after(self))
    }
}
91
92impl DurationExt for std::time::Duration {
93 #[allow(clippy::useless_conversion)] fn after_now(self) -> MonotonicInstant {
95 MonotonicInstant::now() + self.into()
96 }
97}
98
99pub trait TimeoutExt: Future + Sized {
101 fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
104 where
105 WT: WakeupTime,
106 OT: FnOnce() -> Self::Output,
107 {
108 OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
109 }
110
111 fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
116 where
117 OS: FnOnce() -> Self::Output,
118 {
119 OnStalled {
120 timer: timeout.into_timer(),
121 future: self,
122 timeout,
123 on_stalled: Some(on_stalled),
124 }
125 }
126}
127
128impl<F: Future + Sized> TimeoutExt for F {}
129
130pin_project! {
131 #[derive(Debug)]
133 #[must_use = "futures do nothing unless polled"]
134 pub struct OnTimeout<F, OT> {
135 #[pin]
136 timer: Timer,
137 #[pin]
138 future: F,
139 on_timeout: Option<OT>,
140 }
141}
142
143impl<F: Future, OT> Future for OnTimeout<F, OT>
144where
145 OT: FnOnce() -> F::Output,
146{
147 type Output = F::Output;
148
149 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
150 let this = self.project();
151 if let Poll::Ready(item) = this.future.poll(cx) {
152 return Poll::Ready(item);
153 }
154 if let Poll::Ready(()) = this.timer.poll(cx) {
155 let ot = this.on_timeout.take().expect("polled withtimeout after completion");
156 let item = (ot)();
157 return Poll::Ready(item);
158 }
159 Poll::Pending
160 }
161}
162
163pin_project! {
164 #[derive(Debug)]
167 #[must_use = "futures do nothing unless polled"]
168 pub struct OnStalled<F, OS> {
169 #[pin]
170 timer: Timer,
171 #[pin]
172 future: F,
173 timeout: std::time::Duration,
174 on_stalled: Option<OS>,
175 }
176}
177
178impl<F: Future, OS> Future for OnStalled<F, OS>
179where
180 OS: FnOnce() -> F::Output,
181{
182 type Output = F::Output;
183
184 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
185 let mut this = self.project();
186 if let Poll::Ready(item) = this.future.poll(cx) {
187 return Poll::Ready(item);
188 }
189 match this.timer.as_mut().poll(cx) {
190 Poll::Ready(()) => {}
191 Poll::Pending => {
192 this.timer.set(this.timeout.into_timer());
193 ready!(this.timer.as_mut().poll(cx));
194 }
195 }
196 Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
197 }
198}
199
#[cfg(test)]
mod task_tests {

    use super::*;
    use futures::channel::oneshot;

    /// Runs `f` to completion on a `SendExecutor` created with two threads.
    fn run<F>(f: F)
    where
        F: Send + 'static + Future<Output = ()>,
    {
        const TEST_THREADS: u8 = 2;
        SendExecutor::new(TEST_THREADS).run(f)
    }

    /// A detached task keeps running after its handle is gone.
    #[test]
    fn can_detach() {
        run(async move {
            let (started_tx, started_rx) = oneshot::channel();
            let (continue_tx, continue_rx) = oneshot::channel();
            let (done_tx, done_rx) = oneshot::channel();
            {
                let task = Task::spawn(async move {
                    started_tx.send(()).unwrap();
                    continue_rx.await.unwrap();
                    done_tx.send(()).unwrap();
                });
                task.detach();
            }
            // Handshake with the detached task: wait for it to start, let it
            // continue, then wait for it to report completion.
            started_rx.await.unwrap();
            continue_tx.send(()).unwrap();
            done_rx.await.unwrap();
        });
    }

    /// Awaiting a spawned task yields its output.
    #[test]
    fn can_join() {
        run(async move {
            let joined = Task::spawn(async move { 42u8 }).await;
            assert_eq!(42, joined);
        })
    }

    /// Awaiting `unblock` yields the closure's return value.
    #[test]
    fn can_join_unblock() {
        run(async move {
            let joined = unblock(|| 42u8).await;
            assert_eq!(42, joined);
        })
    }

    /// `unblock` can also be awaited under a single-threaded `LocalExecutor`.
    #[test]
    fn can_join_unblock_local() {
        LocalExecutor::new().run_singlethreaded(async move {
            let joined = unblock(|| 42u8).await;
            assert_eq!(42, joined);
        });
    }

    /// A panic inside the `unblock` closure makes the test panic.
    #[test]
    #[should_panic]
    #[cfg_attr(feature = "variant_asan", ignore)]
    fn unblock_fn_panics() {
        run(async move {
            unblock(|| panic!("bad")).await;
        })
    }

    /// Awaiting a `Task::local` task yields its output.
    #[test]
    fn can_join_local() {
        LocalExecutor::new().run_singlethreaded(async move {
            let joined = Task::local(async move { 42u8 }).await;
            assert_eq!(42, joined);
        })
    }

    /// Dropping a task handle stops the task before it finishes.
    #[test]
    fn can_cancel() {
        run(async move {
            // The start sender is kept alive but never used, so the task can
            // only ever be parked on `start_rx`.
            let (_start_tx, start_rx) = oneshot::channel::<()>();
            let (done_tx, done_rx) = oneshot::channel();
            let task = Task::spawn(async move {
                start_rx.await.unwrap();
                done_tx.send(()).unwrap();
            });
            drop(task);
            // The receiver must see an error, i.e. nothing was ever sent.
            done_rx.await.expect_err("done should not be sent");
        })
    }
}
291
#[cfg(test)]
mod timer_tests {
    use super::*;
    use futures::future::Either;
    use std::pin::pin;

    /// Of two timers racing in a `select`, the one with the earlier deadline
    /// must complete first.
    #[test]
    fn shorter_fires_first_instant() {
        use std::time::{Duration, Instant};
        let mut exec = LocalExecutor::new();
        let now = Instant::now();
        let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
        let longer = pin!(Timer::new(now + Duration::from_secs(1)));
        match exec.run_singlethreaded(future::select(shorter, longer)) {
            Either::Left((_, _)) => {}
            Either::Right((_, _)) => panic!("wrong timer fired"),
        }
    }

    /// A timer built from a `MonotonicDuration` must not complete before that
    /// much monotonic time has passed.
    #[cfg(target_os = "fuchsia")]
    #[test]
    fn can_use_zx_duration() {
        let mut exec = LocalExecutor::new();
        let start = MonotonicInstant::now();
        let timer = Timer::new(MonotonicDuration::from_millis(100));
        exec.run_singlethreaded(timer);
        let end = MonotonicInstant::now();
        // A timer only promises "at least" the requested delay, so an elapsed
        // time of exactly 100ms is valid as well.
        assert!(end - start >= MonotonicDuration::from_millis(100));
    }

    /// `on_stalled` fires once the wrapped future has gone the full timeout
    /// without being polled.
    #[test]
    fn can_detect_stalls() {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::sync::Arc;
        use std::time::Duration;
        let runs = Arc::new(AtomicU64::new(0));
        assert_eq!(
            {
                let runs = runs.clone();
                LocalExecutor::new().run_singlethreaded(
                    async move {
                        // The sleep doubles every iteration, so it eventually
                        // exceeds the one-second stall timeout below.
                        let mut sleep = Duration::from_millis(1);
                        loop {
                            Timer::new(sleep).await;
                            sleep *= 2;
                            runs.fetch_add(1, Ordering::SeqCst);
                        }
                    }
                    .on_stalled(Duration::from_secs(1), || 1u8),
                )
            },
            1u8
        );
        // The loop never finishes on its own; check that it made progress
        // before the stall was reported.
        assert!(runs.load(Ordering::SeqCst) >= 9);
    }
}
347}