fuchsia_async/runtime/
mod.rs1#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(target_os = "fuchsia")]
11pub mod instrument;
12
13#[cfg(not(target_os = "fuchsia"))]
14mod portable;
15#[cfg(not(target_os = "fuchsia"))]
16use self::portable as implementation;
17
18pub use implementation::executor::{
20 LocalExecutor, LocalExecutorBuilder, MonotonicDuration, MonotonicInstant, SendExecutor,
21 SendExecutorBuilder, SpawnableFuture, TestExecutor, TestExecutorBuilder,
22};
23pub use implementation::task::{unblock, yield_now, JoinHandle, Task};
24pub use implementation::timer::Timer;
25
26mod task_group;
27pub use task_group::*;
28
29#[cfg(target_os = "fuchsia")]
31pub use self::fuchsia::{
32 executor::{
33 BootInstant, EHandle, PacketReceiver, RawReceiverRegistration, ReceiverRegistration,
34 },
35 timer::Interval,
36};
37
38pub mod scope;
42
43pub use scope::{Scope, ScopeHandle};
44
45use futures::prelude::*;
46use pin_project_lite::pin_project;
47use std::pin::Pin;
48use std::task::{ready, Context, Poll};
49
50pub trait DurationExt {
52 fn after_now(self) -> MonotonicInstant;
57}
58
59pub trait WakeupTime {
61 fn into_timer(self) -> Timer;
66}
67
68#[cfg(target_os = "fuchsia")]
69impl WakeupTime for std::time::Duration {
70 fn into_timer(self) -> Timer {
71 EHandle::local().mono_timers().new_timer(MonotonicInstant::now() + self.into())
72 }
73}
74
75#[cfg(not(target_os = "fuchsia"))]
76impl WakeupTime for std::time::Duration {
77 fn into_timer(self) -> Timer {
78 Timer::from(self)
79 }
80}
81
82#[cfg(target_os = "fuchsia")]
83impl WakeupTime for MonotonicDuration {
84 fn into_timer(self) -> Timer {
85 EHandle::local().mono_timers().new_timer(MonotonicInstant::after(self))
86 }
87}
88
89#[cfg(target_os = "fuchsia")]
90impl WakeupTime for zx::BootDuration {
91 fn into_timer(self) -> Timer {
92 EHandle::local().boot_timers().new_timer(BootInstant::after(self))
93 }
94}
95
96impl DurationExt for std::time::Duration {
97 #[allow(clippy::useless_conversion)] fn after_now(self) -> MonotonicInstant {
99 MonotonicInstant::now() + self.into()
100 }
101}
102
103pub trait TimeoutExt: Future + Sized {
105 fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
108 where
109 WT: WakeupTime,
110 OT: FnOnce() -> Self::Output,
111 {
112 OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
113 }
114
115 fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
120 where
121 OS: FnOnce() -> Self::Output,
122 {
123 OnStalled {
124 timer: timeout.into_timer(),
125 future: self,
126 timeout,
127 on_stalled: Some(on_stalled),
128 }
129 }
130}
131
132impl<F: Future + Sized> TimeoutExt for F {}
133
134pin_project! {
135 #[derive(Debug)]
137 #[must_use = "futures do nothing unless polled"]
138 pub struct OnTimeout<F, OT> {
139 #[pin]
140 timer: Timer,
141 #[pin]
142 future: F,
143 on_timeout: Option<OT>,
144 }
145}
146
147impl<F: Future, OT> Future for OnTimeout<F, OT>
148where
149 OT: FnOnce() -> F::Output,
150{
151 type Output = F::Output;
152
153 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
154 let this = self.project();
155 if let Poll::Ready(item) = this.future.poll(cx) {
156 return Poll::Ready(item);
157 }
158 if let Poll::Ready(()) = this.timer.poll(cx) {
159 let ot = this.on_timeout.take().expect("polled withtimeout after completion");
160 let item = (ot)();
161 return Poll::Ready(item);
162 }
163 Poll::Pending
164 }
165}
166
167pin_project! {
168 #[derive(Debug)]
171 #[must_use = "futures do nothing unless polled"]
172 pub struct OnStalled<F, OS> {
173 #[pin]
174 timer: Timer,
175 #[pin]
176 future: F,
177 timeout: std::time::Duration,
178 on_stalled: Option<OS>,
179 }
180}
181
182impl<F: Future, OS> Future for OnStalled<F, OS>
183where
184 OS: FnOnce() -> F::Output,
185{
186 type Output = F::Output;
187
188 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189 let mut this = self.project();
190 if let Poll::Ready(item) = this.future.poll(cx) {
191 return Poll::Ready(item);
192 }
193 match this.timer.as_mut().poll(cx) {
194 Poll::Ready(()) => {}
195 Poll::Pending => {
196 this.timer.set(this.timeout.into_timer());
197 ready!(this.timer.as_mut().poll(cx));
198 }
199 }
200 Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
201 }
202}
203
204#[cfg(test)]
205mod task_tests {
206
207 use super::*;
208 use futures::channel::oneshot;
209
210 fn run(f: impl Send + 'static + Future<Output = ()>) {
211 const TEST_THREADS: u8 = 2;
212 SendExecutorBuilder::new().num_threads(TEST_THREADS).build().run(f)
213 }
214
215 #[test]
216 fn can_detach() {
217 run(async move {
218 let (tx_started, rx_started) = oneshot::channel();
219 let (tx_continue, rx_continue) = oneshot::channel();
220 let (tx_done, rx_done) = oneshot::channel();
221 {
222 Task::spawn(async move {
225 tx_started.send(()).unwrap();
226 rx_continue.await.unwrap();
227 tx_done.send(()).unwrap();
228 })
229 .detach();
230 }
231 rx_started.await.unwrap();
233 tx_continue.send(()).unwrap();
234 rx_done.await.unwrap();
235 });
236 }
237
238 #[test]
239 fn can_join() {
240 run(async move {
242 assert_eq!(42, Task::spawn(async move { 42u8 }).await);
243 })
244 }
245
246 #[test]
247 fn can_join_unblock() {
248 run(async move {
250 assert_eq!(42, unblock(|| 42u8).await);
251 })
252 }
253
254 #[test]
255 fn can_join_unblock_local() {
256 LocalExecutorBuilder::new().build().run_singlethreaded(async move {
258 assert_eq!(42, unblock(|| 42u8).await);
259 });
260 }
261
262 #[test]
263 #[should_panic]
264 #[cfg_attr(feature = "variant_asan", ignore)]
266 fn unblock_fn_panics() {
267 run(async move {
268 unblock(|| panic!("bad")).await;
269 })
270 }
271
272 #[test]
273 fn can_join_local() {
274 LocalExecutorBuilder::new().build().run_singlethreaded(async move {
276 assert_eq!(42, Task::local(async move { 42u8 }).await);
277 })
278 }
279
280 #[test]
281 fn can_cancel() {
282 run(async move {
283 let (_tx_start, rx_start) = oneshot::channel::<()>();
284 let (tx_done, rx_done) = oneshot::channel();
285 drop(Task::spawn(async move {
287 rx_start.await.unwrap();
288 tx_done.send(()).unwrap();
289 }));
290 rx_done.await.expect_err("done should not be sent");
292 })
293 }
294}
295
296#[cfg(test)]
297mod timer_tests {
298 use super::*;
299 use futures::future::Either;
300 use std::pin::pin;
301
302 #[test]
303 fn shorter_fires_first_instant() {
304 use std::time::{Duration, Instant};
305 let mut exec = LocalExecutorBuilder::new().build();
306 let now = Instant::now();
307 let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
308 let longer = pin!(Timer::new(now + Duration::from_secs(1)));
309 match exec.run_singlethreaded(future::select(shorter, longer)) {
310 Either::Left((_, _)) => {}
311 Either::Right((_, _)) => panic!("wrong timer fired"),
312 }
313 }
314
315 #[cfg(target_os = "fuchsia")]
316 #[test]
317 fn can_use_zx_duration() {
318 let mut exec = LocalExecutorBuilder::new().build();
319 let start = MonotonicInstant::now();
320 let timer = Timer::new(MonotonicDuration::from_millis(100));
321 exec.run_singlethreaded(timer);
322 let end = MonotonicInstant::now();
323 assert!(end - start > MonotonicDuration::from_millis(100));
324 }
325
326 #[test]
327 fn can_detect_stalls() {
328 use std::sync::atomic::{AtomicU64, Ordering};
329 use std::sync::Arc;
330 use std::time::Duration;
331 let runs = Arc::new(AtomicU64::new(0));
332 assert_eq!(
333 {
334 let runs = runs.clone();
335 LocalExecutorBuilder::new().build().run_singlethreaded(
336 async move {
337 let mut sleep = Duration::from_millis(1);
338 loop {
339 Timer::new(sleep).await;
340 sleep *= 2;
341 runs.fetch_add(1, Ordering::SeqCst);
342 }
343 }
344 .on_stalled(Duration::from_secs(1), || 1u8),
345 )
346 },
347 1u8
348 );
349 assert!(runs.load(Ordering::SeqCst) >= 9);
350 }
351}