fuchsia_async/runtime/fuchsia/executor/
local.rs1use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle, MAIN_TASK_ID};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use futures::future::{self, Either};
12use futures::task::AtomicWaker;
13use std::fmt;
14use std::future::{poll_fn, Future};
15use std::pin::pin;
16use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20pub struct LocalExecutor {
30 pub(crate) ehandle: EHandle,
33 }
35
36impl fmt::Debug for LocalExecutor {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
39 }
40}
41
42impl Default for LocalExecutor {
43 fn default() -> Self {
44 Self::new()
45 }
46}
47
48impl LocalExecutor {
49 pub fn new() -> Self {
51 let inner = Arc::new(Executor::new(
52 ExecutorTime::RealTime,
53 true,
54 1,
55 ));
56 let root_scope = ScopeHandle::root(inner);
57 Executor::set_local(root_scope.clone());
58 Self { ehandle: EHandle { root_scope } }
59 }
60
61 pub fn port(&self) -> &zx::Port {
63 self.ehandle.port()
64 }
65
66 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
68 where
69 F: Future,
70 {
71 assert!(
72 self.ehandle.inner().is_real_time(),
73 "Error: called `run_singlethreaded` on an executor using fake time"
74 );
75
76 let Poll::Ready(result) = self.run::<false, _>(main_future) else {
77 unreachable!()
78 };
79 result
80 }
81
82 fn run<const UNTIL_STALLED: bool, Fut: Future>(
83 &mut self,
84 main_future: Fut,
85 ) -> Poll<Fut::Output> {
86 unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
90 std::mem::transmute(obj)
91 }
92
93 let scope = &self.ehandle.root_scope;
94 let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);
95
96 unsafe {
99 scope.insert_task(remove_lifetime(task), false);
100 }
101
102 struct DropMainTask<'a>(&'a EHandle);
103 impl Drop for DropMainTask<'_> {
104 fn drop(&mut self) {
105 unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
108 }
109 }
110 let _drop_main_task = DropMainTask(&self.ehandle);
111
112 self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();
113
114 unsafe {
117 self.ehandle.global_scope().poll_join_result(
118 MAIN_TASK_ID,
119 &mut Context::from_waker(&futures::task::noop_waker()),
120 )
121 }
122 }
123
124 #[doc(hidden)]
125 pub fn root_scope(&self) -> &ScopeHandle {
127 self.ehandle.global_scope()
128 }
129}
130
131impl Drop for LocalExecutor {
132 fn drop(&mut self) {
133 self.ehandle.inner().mark_done();
134 self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
135 }
136}
137
138pub struct TestExecutor {
143 local: LocalExecutor,
145}
146
147impl Default for TestExecutor {
148 fn default() -> Self {
149 Self::new()
150 }
151}
152
153impl TestExecutor {
154 pub fn new() -> Self {
156 Self { local: LocalExecutor::new() }
157 }
158
159 pub fn port(&self) -> &zx::Port {
161 self.local.port()
162 }
163
164 pub fn new_with_fake_time() -> Self {
166 let inner = Arc::new(Executor::new(
167 ExecutorTime::FakeTime {
168 mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
169 mono_to_boot_offset_ns: AtomicI64::new(0),
170 },
171 true,
172 1,
173 ));
174 let root_scope = ScopeHandle::root(inner);
175 Executor::set_local(root_scope.clone());
176 Self { local: LocalExecutor { ehandle: EHandle { root_scope } } }
177 }
178
179 pub fn now(&self) -> MonotonicInstant {
181 self.local.ehandle.inner().now()
182 }
183
184 pub fn boot_now(&self) -> BootInstant {
186 self.local.ehandle.inner().boot_now()
187 }
188
189 pub fn set_fake_time(&self, t: MonotonicInstant) {
195 self.local.ehandle.inner().set_fake_time(t)
196 }
197
198 pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
209 self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
210 }
211
212 pub fn global_scope(&self) -> &ScopeHandle {
214 self.local.root_scope()
215 }
216
217 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
219 where
220 F: Future,
221 {
222 self.local.run_singlethreaded(main_future)
223 }
224
225 pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
236 where
237 F: Future + Unpin,
238 {
239 let mut main_future = pin!(main_future);
240
241 struct Cleanup(Arc<Executor>);
243 impl Drop for Cleanup {
244 fn drop(&mut self) {
245 *self.0.owner_data.lock() = None;
246 }
247 }
248 let _cleanup = Cleanup(self.local.ehandle.inner().clone());
249 *self.local.ehandle.inner().owner_data.lock() =
250 Some(Box::new(UntilStalledData { watcher: None }));
251
252 loop {
253 let result = self.local.run::<true, _>(main_future.as_mut());
254 if result.is_ready() {
255 return result;
256 }
257
258 if let Some(watcher) = with_data(|data| data.watcher.take()) {
260 watcher.waker.wake();
261 watcher.done.store(true, Ordering::Relaxed);
264 } else {
265 break;
266 }
267 }
268
269 Poll::Pending
270 }
271
272 pub fn wake_expired_timers(&mut self) -> bool {
278 self.local.ehandle.inner().monotonic_timers().wake_timers()
279 || self.local.ehandle.inner().boot_timers().wake_timers()
280 }
281
282 pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
295 self.local.ehandle.inner().monotonic_timers().wake_next_timer()
296 }
297
298 pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
301 self.local.ehandle.inner().boot_timers().wake_next_timer()
302 }
303
304 pub fn next_timer() -> Option<MonotonicInstant> {
306 EHandle::local().inner().monotonic_timers().next_timer()
307 }
308
309 pub fn next_boot_timer() -> Option<BootInstant> {
311 EHandle::local().inner().boot_timers().next_timer()
312 }
313
314 pub async fn advance_to(time: MonotonicInstant) {
323 let ehandle = EHandle::local();
324 loop {
325 let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
326 if let Some(next_timer) = Self::next_timer() {
327 if next_timer <= time {
328 ehandle.inner().set_fake_time(next_timer);
329 continue;
330 }
331 }
332 ehandle.inner().set_fake_time(time);
333 break;
334 }
335 }
336
337 pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
361 let watcher =
362 Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
363
364 assert!(
365 with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
366 "Error: Another task has called `poll_until_stalled`."
367 );
368
369 struct Watcher(Arc<StalledWatcher>);
370
371 impl Drop for Watcher {
373 fn drop(&mut self) {
374 if !self.0.done.swap(true, Ordering::Relaxed) {
375 with_data(|data| data.watcher = None);
376 }
377 }
378 }
379
380 let watcher = Watcher(watcher);
381
382 let poll_fn = poll_fn(|cx: &mut Context<'_>| {
383 if watcher.0.done.load(Ordering::Relaxed) {
384 Poll::Ready(())
385 } else {
386 watcher.0.waker.register(cx.waker());
387 Poll::Pending
388 }
389 });
390 match future::select(poll_fn, fut).await {
391 Either::Left(_) => Poll::Pending,
392 Either::Right((value, _)) => Poll::Ready(value),
393 }
394 }
395}
396
397struct StalledWatcher {
398 waker: AtomicWaker,
399 done: AtomicBool,
400}
401
402struct UntilStalledData {
403 watcher: Option<Arc<StalledWatcher>>,
404}
405
406fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
412 const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
413 with TestExecutor::run_until_stalled";
414 f(EHandle::local()
415 .inner()
416 .owner_data
417 .lock()
418 .as_mut()
419 .expect(MESSAGE)
420 .downcast_mut::<UntilStalledData>()
421 .expect(MESSAGE))
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle::on_signals::OnSignals;
    use crate::{Interval, Timer, WakeupTime};
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::task::Waker;
    use zx::{self as zx, AsHandleRef};

    /// Spawns `future` as a detached task on the thread-local executor.
    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
        crate::EHandle::local().spawn_detached(future);
    }

    // A spawned future that needs two polls makes exactly one step per `run_until_stalled`.
    #[test]
    fn stepwise_two_steps() {
        let step = Rc::new(Cell::new(0));
        let saved_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));

        let fut = {
            let saved_waker = saved_waker.clone();
            let step = step.clone();
            Box::new(future::poll_fn(move |cx: &mut Context<'_>| {
                // Remember the waker so the test can wake the task by hand.
                saved_waker.borrow_mut().replace(cx.waker().clone());
                match step.get() {
                    0 => {
                        step.set(1);
                        Poll::Pending
                    }
                    1 => {
                        step.set(2);
                        Poll::Ready(())
                    }
                    _ => panic!("future called after done"),
                }
            }))
        };

        let mut executor = TestExecutor::new_with_fake_time();
        executor.local.ehandle.spawn_local_detached(fut);

        // Spawning alone must not poll the future.
        assert_eq!(step.get(), 0);
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 1);

        saved_waker.borrow_mut().take().unwrap().wake();
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 2);
    }

    // A timer becomes ready only after fake time has been moved to its deadline.
    #[test]
    fn stepwise_timer() {
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let deadline = MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000));
        let mut timer = pin!(Timer::new(deadline));

        let _ = executor.run_until_stalled(&mut timer);
        // Running the executor must not move fake time by itself.
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));

        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
        assert!(executor.run_until_stalled(&mut timer).is_ready());
    }

    // A signal wait completes on the run after the signal has been raised.
    #[test]
    fn stepwise_event() {
        let mut executor = TestExecutor::new_with_fake_time();
        let event = zx::Event::create();
        let mut on_signal = pin!(OnSignals::new(&event, zx::Signals::USER_0));

        let _ = executor.run_until_stalled(&mut on_signal);

        event.signal_handle(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
        assert_matches!(
            executor.run_until_stalled(&mut on_signal),
            Poll::Ready(Ok(zx::Signals::USER_0))
        );
    }

    // When both timers have expired, the spawned task (earlier deadline) must have completed by
    // the time the main future (later deadline) is reported ready.
    #[test]
    fn run_until_stalled_preserves_order() {
        let mut executor = TestExecutor::new_with_fake_time();

        let spawned_done = Arc::new(AtomicBool::new(false));
        let spawned_done_writer = spawned_done.clone();
        let spawned_task = Box::pin(async move {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
            spawned_done_writer.store(true, Ordering::SeqCst);
        });
        let mut main_task = pin!(async {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
        });

        spawn(spawned_task);
        assert_eq!(executor.run_until_stalled(&mut main_task), Poll::Pending);

        // Jump past both deadlines at once.
        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
        assert_eq!(executor.run_until_stalled(&mut main_task), Poll::Ready(()));
        assert!(spawned_done.load(Ordering::SeqCst));
    }

    // Pending tasks are dropped when the executor is dropped, and a task spawned from within
    // such a drop is never polled.
    #[test]
    fn task_destruction() {
        struct DropSpawner {
            dropped: Arc<AtomicBool>,
        }
        impl Drop for DropSpawner {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::SeqCst);
                let flag = self.dropped.clone();
                spawn(async {
                    // Hold a reference so the final `Arc::get_mut` check also proves this task
                    // was destroyed.
                    let _flag = flag;
                    panic!("task spawned in drop shouldn't be polled");
                });
            }
        }

        let mut dropped = Arc::new(AtomicBool::new(false));
        let drop_spawner = DropSpawner { dropped: dropped.clone() };
        let mut executor = TestExecutor::new();
        let mut main_task = pin!(async move {
            spawn(async move {
                // Keep the spawner alive inside a task that never finishes.
                let _held = drop_spawner;
                future::pending::<()>().await;
            });
        });

        assert!(executor.run_until_stalled(&mut main_task).is_ready());
        assert!(
            !dropped.load(Ordering::SeqCst),
            "executor dropped pending task before destruction"
        );

        drop(executor);

        // `get_mut` succeeds only if every other clone of the flag has been dropped.
        let flag = Arc::get_mut(&mut dropped)
            .expect("someone else is unexpectedly still holding on to a reference");
        assert!(
            flag.load(Ordering::SeqCst),
            "executor did not drop pending task during destruction"
        );
    }

    // With a real-time executor, `MonotonicInstant::now()` lies between two kernel readings.
    #[test]
    fn time_now_real_time() {
        let _executor = LocalExecutor::new();
        let before = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        let observed = MonotonicInstant::now().into_zx();
        let after = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        assert!(before <= observed);
        assert!(observed <= after);
    }

    // With fake time, `MonotonicInstant::now()` returns exactly what was set.
    #[test]
    fn time_now_fake_time() {
        let executor = TestExecutor::new_with_fake_time();

        let first = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(first);
        assert_eq!(MonotonicInstant::now(), first);

        let second = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(second);
        assert_eq!(MonotonicInstant::now(), second);
    }

    // Fake boot time follows fake monotonic time plus the configured offset.
    #[test]
    fn time_now_fake_time_boot() {
        const TEST_BOOT_OFFSET: i64 = 42;

        let executor = TestExecutor::new_with_fake_time();

        let first = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(first);
        assert_eq!(MonotonicInstant::now(), first);
        assert_eq!(BootInstant::now().into_nanos(), first.into_nanos());

        let second = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(second);
        assert_eq!(MonotonicInstant::now(), second);
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos());

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos() + TEST_BOOT_OFFSET);
    }

    // Same scenario as `time_now_fake_time_boot`, kept as a separate test.
    #[test]
    fn time_boot_now() {
        const TEST_BOOT_OFFSET: i64 = 42;

        let executor = TestExecutor::new_with_fake_time();

        let first = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(first);
        assert_eq!(MonotonicInstant::now(), first);
        assert_eq!(BootInstant::now().into_nanos(), first.into_nanos());

        let second = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(second);
        assert_eq!(MonotonicInstant::now(), second);
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos());

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos() + TEST_BOOT_OFFSET);
    }

    // `MonotonicInstant::after` saturates at both ends instead of overflowing.
    #[test]
    fn time_after_overflow() {
        let executor = TestExecutor::new_with_fake_time();

        let near_max = MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100);
        executor.set_fake_time(near_max);
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
            MonotonicInstant::INFINITE
        );

        let near_min = MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100);
        executor.set_fake_time(near_min);
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
            MonotonicInstant::INFINITE_PAST
        );
    }

    /// A future that, on its first poll, wakes itself once per element of `1..n` and returns
    /// `Pending`; it completes on the following poll.
    async fn multi_wake(n: usize) {
        let mut already_polled = false;
        futures::future::poll_fn(|cx| {
            if already_polled {
                return Poll::Ready(());
            }
            (1..n).for_each(|_| cx.waker().wake_by_ref());
            already_polled = true;
            Poll::Pending
        })
        .await;
    }

    // Boot time follows both the fake monotonic time and the boot-to-mono offset.
    #[test]
    fn test_boot_time_tracks_mono_time() {
        const FAKE_TIME: i64 = 42;
        let executor = TestExecutor::new_with_fake_time();

        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced"
        );

        // Adding an offset of the same size doubles the boot reading.
        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(2 * FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced again"
        );
    }

    // A task that wakes itself many times within one poll must still run to completion.
    #[test]
    fn many_wakeups() {
        let mut executor = LocalExecutor::new();
        executor.run_singlethreaded(multi_wake(4096 * 2));
    }

    /// Drives a one-shot timer (armed with `timer_duration`) and a one-second interval through
    /// `TestExecutor::advance_to`, checking when each of them fires.
    fn advance_to_with(timer_duration: impl WakeupTime) {
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut scenario = pin!(async {
            let timer_fired = Arc::new(AtomicBool::new(false));
            futures::join!(
                // One-shot timer: record when it fires.
                async {
                    Timer::new(timer_duration).await;
                    timer_fired.store(true, Ordering::SeqCst);
                },
                // Interval: wait for three ticks.
                async {
                    let mut ticks = 0;
                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
                    while interval.next().await.is_some() {
                        ticks += 1;
                        if ticks == 3 {
                            break;
                        }
                    }
                    assert_eq!(ticks, 3, "interval timer should have fired multiple times.");
                },
                // Driver: advance fake time in three steps.
                async {
                    // Nothing may fire before time has been advanced.
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;
                    // Half a second in: still too early for the one-shot timer.
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;

                    // One second in: the one-shot timer is due.
                    assert!(
                        timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer should have fired"
                    );

                    // Two more seconds let the interval reach its remaining ticks.
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_seconds(2),
                    ))
                    .await;
                }
            )
        });
        assert!(executor.run_until_stalled(&mut scenario).is_ready());
    }

    // `advance_to` with a timer armed from a monotonic duration.
    #[test]
    fn test_advance_to() {
        advance_to_with(zx::MonotonicDuration::from_seconds(1));
    }

    // `advance_to` with a timer armed from a boot duration.
    #[test]
    fn test_advance_to_boot() {
        advance_to_with(zx::BootDuration::from_seconds(1));
    }
}