fuchsia_async/runtime/
mod.rs1#[cfg(target_os = "fuchsia")]
6mod fuchsia;
7#[cfg(target_os = "fuchsia")]
8use self::fuchsia as implementation;
9
10#[cfg(not(target_os = "fuchsia"))]
11mod portable;
12#[cfg(not(target_os = "fuchsia"))]
13use self::portable as implementation;
14
15pub use implementation::executor::{
17 LocalExecutor, MonotonicDuration, MonotonicInstant, SendExecutor, SpawnableFuture, TestExecutor,
18};
19pub use implementation::task::{unblock, JoinHandle, Task};
20pub use implementation::timer::Timer;
21
22mod task_group;
23pub use task_group::*;
24
25#[cfg(target_os = "fuchsia")]
27pub use self::fuchsia::{
28 executor::{BootInstant, EHandle, PacketReceiver, ReceiverRegistration},
29 timer::Interval,
30};
31
32pub mod scope {
36 pub use super::implementation::scope::{Scope, ScopeHandle};
37
38 #[cfg(target_os = "fuchsia")]
39 pub use super::implementation::scope::{Join, ScopeActiveGuard, ScopeStream, Spawnable};
40}
41
42pub use scope::{Scope, ScopeHandle};
43
44use futures::prelude::*;
45use pin_project_lite::pin_project;
46use std::pin::Pin;
47use std::task::{ready, Context, Poll};
48
49pub trait DurationExt {
51 fn after_now(self) -> MonotonicInstant;
56}
57
58pub trait WakeupTime {
60 fn into_timer(self) -> Timer;
65}
66
#[cfg(target_os = "fuchsia")]
impl WakeupTime for std::time::Duration {
    /// Builds a monotonic timer on the local executor that is due `self` after the current
    /// monotonic time.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        let deadline = MonotonicInstant::now() + self.into();
        executor.mono_timers().new_timer(deadline)
    }
}
73
74#[cfg(not(target_os = "fuchsia"))]
75impl WakeupTime for std::time::Duration {
76 fn into_timer(self) -> Timer {
77 Timer::from(self)
78 }
79}
80
#[cfg(target_os = "fuchsia")]
impl WakeupTime for MonotonicDuration {
    /// Builds a monotonic timer on the local executor whose deadline is
    /// `MonotonicInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        let deadline = MonotonicInstant::after(self);
        executor.mono_timers().new_timer(deadline)
    }
}
87
#[cfg(target_os = "fuchsia")]
impl WakeupTime for zx::BootDuration {
    /// Builds a boot-timeline timer on the local executor whose deadline is
    /// `BootInstant::after(self)`.
    fn into_timer(self) -> Timer {
        let executor = EHandle::local();
        let deadline = BootInstant::after(self);
        executor.boot_timers().new_timer(deadline)
    }
}
94
95impl DurationExt for std::time::Duration {
96 #[allow(clippy::useless_conversion)] fn after_now(self) -> MonotonicInstant {
98 MonotonicInstant::now() + self.into()
99 }
100}
101
102pub trait TimeoutExt: Future + Sized {
104 fn on_timeout<WT, OT>(self, time: WT, on_timeout: OT) -> OnTimeout<Self, OT>
107 where
108 WT: WakeupTime,
109 OT: FnOnce() -> Self::Output,
110 {
111 OnTimeout { timer: time.into_timer(), future: self, on_timeout: Some(on_timeout) }
112 }
113
114 fn on_stalled<OS>(self, timeout: std::time::Duration, on_stalled: OS) -> OnStalled<Self, OS>
119 where
120 OS: FnOnce() -> Self::Output,
121 {
122 OnStalled {
123 timer: timeout.into_timer(),
124 future: self,
125 timeout,
126 on_stalled: Some(on_stalled),
127 }
128 }
129}
130
131impl<F: Future + Sized> TimeoutExt for F {}
132
133pin_project! {
134 #[derive(Debug)]
136 #[must_use = "futures do nothing unless polled"]
137 pub struct OnTimeout<F, OT> {
138 #[pin]
139 timer: Timer,
140 #[pin]
141 future: F,
142 on_timeout: Option<OT>,
143 }
144}
145
146impl<F: Future, OT> Future for OnTimeout<F, OT>
147where
148 OT: FnOnce() -> F::Output,
149{
150 type Output = F::Output;
151
152 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
153 let this = self.project();
154 if let Poll::Ready(item) = this.future.poll(cx) {
155 return Poll::Ready(item);
156 }
157 if let Poll::Ready(()) = this.timer.poll(cx) {
158 let ot = this.on_timeout.take().expect("polled withtimeout after completion");
159 let item = (ot)();
160 return Poll::Ready(item);
161 }
162 Poll::Pending
163 }
164}
165
166pin_project! {
167 #[derive(Debug)]
170 #[must_use = "futures do nothing unless polled"]
171 pub struct OnStalled<F, OS> {
172 #[pin]
173 timer: Timer,
174 #[pin]
175 future: F,
176 timeout: std::time::Duration,
177 on_stalled: Option<OS>,
178 }
179}
180
181impl<F: Future, OS> Future for OnStalled<F, OS>
182where
183 OS: FnOnce() -> F::Output,
184{
185 type Output = F::Output;
186
187 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188 let mut this = self.project();
189 if let Poll::Ready(item) = this.future.poll(cx) {
190 return Poll::Ready(item);
191 }
192 match this.timer.as_mut().poll(cx) {
193 Poll::Ready(()) => {}
194 Poll::Pending => {
195 this.timer.set(this.timeout.into_timer());
196 ready!(this.timer.as_mut().poll(cx));
197 }
198 }
199 Poll::Ready((this.on_stalled.take().expect("polled after completion"))())
200 }
201}
202
#[cfg(test)]
mod task_tests {

    use super::*;
    use futures::channel::oneshot;

    /// Runs `fut` to completion on a `SendExecutor` created with two threads.
    fn run(fut: impl Send + 'static + Future<Output = ()>) {
        const TEST_THREADS: u8 = 2;
        SendExecutor::new(TEST_THREADS).run(fut)
    }

    /// A detached task keeps running after its handle is gone.
    #[test]
    fn can_detach() {
        run(async move {
            let (started_tx, started_rx) = oneshot::channel();
            let (resume_tx, resume_rx) = oneshot::channel();
            let (finished_tx, finished_rx) = oneshot::channel();

            let background = Task::spawn(async move {
                started_tx.send(()).unwrap();
                resume_rx.await.unwrap();
                finished_tx.send(()).unwrap();
            });
            background.detach();

            // Handshake with the detached task: wait for it to start, let it continue,
            // then wait for it to finish.
            started_rx.await.unwrap();
            resume_tx.send(()).unwrap();
            finished_rx.await.unwrap();
        });
    }

    /// Awaiting a spawned task yields its output.
    #[test]
    fn can_join() {
        run(async move {
            let answer = Task::spawn(async move { 42u8 }).await;
            assert_eq!(42, answer);
        })
    }

    /// Awaiting `unblock` yields the closure's return value on the multi-threaded executor.
    #[test]
    fn can_join_unblock() {
        run(async move {
            let answer = unblock(|| 42u8).await;
            assert_eq!(42, answer);
        })
    }

    /// Same as `can_join_unblock`, but driven by a single-threaded `LocalExecutor`.
    #[test]
    fn can_join_unblock_local() {
        LocalExecutor::new().run_singlethreaded(async move {
            let answer = unblock(|| 42u8).await;
            assert_eq!(42, answer);
        });
    }

    /// A panic inside the `unblock` closure must fail the test.
    #[test]
    #[should_panic]
    #[cfg_attr(feature = "variant_asan", ignore)]
    fn unblock_fn_panics() {
        run(async move {
            unblock(|| panic!("bad")).await;
        })
    }

    /// Awaiting a task created with `Task::local` yields its output.
    #[test]
    fn can_join_local() {
        LocalExecutor::new().run_singlethreaded(async move {
            let answer = Task::local(async move { 42u8 }).await;
            assert_eq!(42, answer);
        })
    }

    /// Dropping a task handle stops the task before it can send on its channel.
    #[test]
    fn can_cancel() {
        run(async move {
            // The sender is never used, but it is kept alive so the task's receiver
            // does not resolve with a cancellation error.
            let (_start_tx, start_rx) = oneshot::channel::<()>();
            let (finished_tx, finished_rx) = oneshot::channel();

            let task = Task::spawn(async move {
                start_rx.await.unwrap();
                finished_tx.send(()).unwrap();
            });
            drop(task);

            finished_rx.await.expect_err("done should not be sent");
        })
    }
}
294
#[cfg(test)]
mod timer_tests {
    use super::*;
    use futures::future::Either;
    use std::pin::pin;

    /// Of two timers created from `std::time::Instant` deadlines, the earlier one must
    /// complete first.
    #[test]
    fn shorter_fires_first_instant() {
        use std::time::{Duration, Instant};
        let mut exec = LocalExecutor::new();
        let now = Instant::now();
        let shorter = pin!(Timer::new(now + Duration::from_millis(100)));
        let longer = pin!(Timer::new(now + Duration::from_secs(1)));
        let winner = exec.run_singlethreaded(future::select(shorter, longer));
        if let Either::Right((_, _)) = winner {
            panic!("wrong timer fired");
        }
    }

    /// A timer built from a `MonotonicDuration` does not complete before that much
    /// monotonic time has passed.
    #[cfg(target_os = "fuchsia")]
    #[test]
    fn can_use_zx_duration() {
        let mut exec = LocalExecutor::new();
        let start = MonotonicInstant::now();
        let timer = Timer::new(MonotonicDuration::from_millis(100));
        exec.run_singlethreaded(timer);
        let end = MonotonicInstant::now();
        // A timer only promises not to fire early, so an elapsed time of exactly the
        // requested duration is valid; use `>=` to avoid asserting more than that.
        assert!(end - start >= MonotonicDuration::from_millis(100));
    }

    /// `on_stalled` produces its fallback value once the wrapped future stops being polled
    /// for longer than the stall timeout.
    #[test]
    fn can_detect_stalls() {
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::sync::Arc;
        use std::time::Duration;

        let runs = Arc::new(AtomicU64::new(0));
        let counter = runs.clone();
        let outcome = LocalExecutor::new().run_singlethreaded(
            async move {
                // The sleep doubles from 1 ms on every iteration, so the loop completes
                // several rounds before a single sleep outlasts the 1 s stall timeout.
                let mut sleep = Duration::from_millis(1);
                loop {
                    Timer::new(sleep).await;
                    sleep *= 2;
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            }
            .on_stalled(Duration::from_secs(1), || 1u8),
        );

        assert_eq!(outcome, 1u8);
        assert!(runs.load(Ordering::SeqCst) >= 9);
    }
}