fuchsia_async/runtime/fuchsia/executor/
local.rs1use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle, MAIN_TASK_ID};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use futures::future::{self, Either};
12use futures::task::AtomicWaker;
13use std::fmt;
14use std::future::{poll_fn, Future};
15use std::pin::pin;
16use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20pub struct LocalExecutor {
30 pub(crate) ehandle: EHandle,
33 }
35
36impl fmt::Debug for LocalExecutor {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
39 }
40}
41
42impl LocalExecutor {
43 pub fn new() -> Self {
45 let inner = Arc::new(Executor::new(
46 ExecutorTime::RealTime,
47 true,
48 1,
49 ));
50 let root_scope = ScopeHandle::root(inner);
51 Executor::set_local(root_scope.clone());
52 Self { ehandle: EHandle { root_scope } }
53 }
54
55 pub fn port(&self) -> &zx::Port {
57 self.ehandle.port()
58 }
59
60 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
62 where
63 F: Future,
64 {
65 assert!(
66 self.ehandle.inner().is_real_time(),
67 "Error: called `run_singlethreaded` on an executor using fake time"
68 );
69
70 let Poll::Ready(result) = self.run::<false, _>(main_future) else {
71 unreachable!()
72 };
73 result
74 }
75
76 fn run<const UNTIL_STALLED: bool, Fut: Future>(
77 &mut self,
78 main_future: Fut,
79 ) -> Poll<Fut::Output> {
80 unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
84 std::mem::transmute(obj)
85 }
86
87 let scope = &self.ehandle.root_scope;
88 let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);
89
90 unsafe {
93 scope.insert_task(remove_lifetime(task), false);
94 }
95
96 struct DropMainTask<'a>(&'a EHandle);
97 impl Drop for DropMainTask<'_> {
98 fn drop(&mut self) {
99 unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
102 }
103 }
104 let _drop_main_task = DropMainTask(&self.ehandle);
105
106 self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();
107
108 unsafe {
111 self.ehandle.global_scope().poll_join_result(
112 MAIN_TASK_ID,
113 &mut Context::from_waker(&futures::task::noop_waker()),
114 )
115 }
116 }
117
118 #[doc(hidden)]
119 pub fn root_scope(&self) -> &ScopeHandle {
121 self.ehandle.global_scope()
122 }
123}
124
125impl Drop for LocalExecutor {
126 fn drop(&mut self) {
127 self.ehandle.inner().mark_done();
128 self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
129 }
130}
131
132pub struct TestExecutor {
137 local: LocalExecutor,
139}
140
141impl TestExecutor {
142 pub fn new() -> Self {
144 Self { local: LocalExecutor::new() }
145 }
146
147 pub fn port(&self) -> &zx::Port {
149 self.local.port()
150 }
151
152 pub fn new_with_fake_time() -> Self {
154 let inner = Arc::new(Executor::new(
155 ExecutorTime::FakeTime {
156 mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
157 mono_to_boot_offset_ns: AtomicI64::new(0),
158 },
159 true,
160 1,
161 ));
162 let root_scope = ScopeHandle::root(inner);
163 Executor::set_local(root_scope.clone());
164 Self { local: LocalExecutor { ehandle: EHandle { root_scope } } }
165 }
166
167 pub fn now(&self) -> MonotonicInstant {
169 self.local.ehandle.inner().now()
170 }
171
172 pub fn boot_now(&self) -> BootInstant {
174 self.local.ehandle.inner().boot_now()
175 }
176
177 pub fn set_fake_time(&self, t: MonotonicInstant) {
183 self.local.ehandle.inner().set_fake_time(t)
184 }
185
186 pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
197 self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
198 }
199
200 pub fn global_scope(&self) -> &ScopeHandle {
202 self.local.root_scope()
203 }
204
205 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
207 where
208 F: Future,
209 {
210 self.local.run_singlethreaded(main_future)
211 }
212
213 pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
224 where
225 F: Future + Unpin,
226 {
227 let mut main_future = pin!(main_future);
228
229 struct Cleanup(Arc<Executor>);
231 impl Drop for Cleanup {
232 fn drop(&mut self) {
233 *self.0.owner_data.lock() = None;
234 }
235 }
236 let _cleanup = Cleanup(self.local.ehandle.inner().clone());
237 *self.local.ehandle.inner().owner_data.lock() =
238 Some(Box::new(UntilStalledData { watcher: None }));
239
240 loop {
241 let result = self.local.run::<true, _>(main_future.as_mut());
242 if result.is_ready() {
243 return result;
244 }
245
246 if let Some(watcher) = with_data(|data| data.watcher.take()) {
248 watcher.waker.wake();
249 watcher.done.store(true, Ordering::Relaxed);
252 } else {
253 break;
254 }
255 }
256
257 Poll::Pending
258 }
259
260 pub fn wake_expired_timers(&mut self) -> bool {
266 self.local.ehandle.inner().monotonic_timers().wake_timers()
267 || self.local.ehandle.inner().boot_timers().wake_timers()
268 }
269
270 pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
283 self.local.ehandle.inner().monotonic_timers().wake_next_timer()
284 }
285
286 pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
289 self.local.ehandle.inner().boot_timers().wake_next_timer()
290 }
291
292 pub fn next_timer() -> Option<MonotonicInstant> {
294 EHandle::local().inner().monotonic_timers().next_timer()
295 }
296
297 pub fn next_boot_timer() -> Option<BootInstant> {
299 EHandle::local().inner().boot_timers().next_timer()
300 }
301
302 pub async fn advance_to(time: MonotonicInstant) {
311 let ehandle = EHandle::local();
312 loop {
313 let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
314 if let Some(next_timer) = Self::next_timer() {
315 if next_timer <= time {
316 ehandle.inner().set_fake_time(next_timer);
317 continue;
318 }
319 }
320 ehandle.inner().set_fake_time(time);
321 break;
322 }
323 }
324
325 pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
349 let watcher =
350 Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
351
352 assert!(
353 with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
354 "Error: Another task has called `poll_until_stalled`."
355 );
356
357 struct Watcher(Arc<StalledWatcher>);
358
359 impl Drop for Watcher {
361 fn drop(&mut self) {
362 if !self.0.done.swap(true, Ordering::Relaxed) {
363 with_data(|data| data.watcher = None);
364 }
365 }
366 }
367
368 let watcher = Watcher(watcher);
369
370 let poll_fn = poll_fn(|cx: &mut Context<'_>| {
371 if watcher.0.done.load(Ordering::Relaxed) {
372 Poll::Ready(())
373 } else {
374 watcher.0.waker.register(cx.waker());
375 Poll::Pending
376 }
377 });
378 match future::select(poll_fn, fut).await {
379 Either::Left(_) => Poll::Pending,
380 Either::Right((value, _)) => Poll::Ready(value),
381 }
382 }
383}
384
385struct StalledWatcher {
386 waker: AtomicWaker,
387 done: AtomicBool,
388}
389
390struct UntilStalledData {
391 watcher: Option<Arc<StalledWatcher>>,
392}
393
394fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
400 const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
401 with TestExecutor::run_until_stalled";
402 f(&mut EHandle::local()
403 .inner()
404 .owner_data
405 .lock()
406 .as_mut()
407 .expect(MESSAGE)
408 .downcast_mut::<UntilStalledData>()
409 .expect(MESSAGE))
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle::on_signals::OnSignals;
    use crate::{Interval, Timer, WakeupTime};
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use std::cell::{Cell, RefCell};
    use std::task::Waker;
    use zx::{self as zx, AsHandleRef};

    /// Spawns `future` as a detached task on the executor returned by `EHandle::local()`.
    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
        crate::EHandle::local().spawn_detached(future);
    }

    /// A spawned two-step future advances exactly one step per `run_until_stalled` call, and
    /// only after it has been woken.
    #[test]
    fn stepwise_two_steps() {
        let fut_step = Arc::new(Cell::new(0));
        let fut_waker: Arc<RefCell<Option<Waker>>> = Arc::new(RefCell::new(None));
        let fut_waker_clone = fut_waker.clone();
        let fut_step_clone = fut_step.clone();
        let fut_fn = move |cx: &mut Context<'_>| {
            // Stash the waker so the test body can wake the task between runs.
            fut_waker_clone.borrow_mut().replace(cx.waker().clone());
            match fut_step_clone.get() {
                0 => {
                    fut_step_clone.set(1);
                    Poll::Pending
                }
                1 => {
                    fut_step_clone.set(2);
                    Poll::Ready(())
                }
                _ => panic!("future called after done"),
            }
        };
        let fut = Box::new(future::poll_fn(fut_fn));
        let mut executor = TestExecutor::new_with_fake_time();
        executor.local.ehandle.spawn_local_detached(fut);
        // Spawning alone must not poll the task.
        assert_eq!(fut_step.get(), 0);
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(fut_step.get(), 1);

        fut_waker.borrow_mut().take().unwrap().wake();
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(fut_step.get(), 2);
    }

    /// A timer future becomes ready once fake time has been moved to its deadline.
    #[test]
    fn stepwise_timer() {
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));
        let mut fut =
            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));

        let _ = executor.run_until_stalled(&mut fut);
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));

        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }

    /// A signal wait becomes ready after the awaited signal has been raised on the event.
    #[test]
    fn stepwise_event() {
        let mut executor = TestExecutor::new_with_fake_time();
        let event = zx::Event::create();
        let mut fut = pin!(OnSignals::new(&event, zx::Signals::USER_0));

        let _ = executor.run_until_stalled(&mut fut);

        event.signal_handle(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
        assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(zx::Signals::USER_0)));
    }

    /// When fake time jumps past two deadlines, the spawned task with the earlier deadline has
    /// completed by the time the main future (later deadline) is reported ready.
    #[test]
    fn run_until_stalled_preserves_order() {
        let mut executor = TestExecutor::new_with_fake_time();
        let spawned_fut_completed = Arc::new(AtomicBool::new(false));
        let spawned_fut_completed_writer = spawned_fut_completed.clone();
        let spawned_fut = Box::pin(async move {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
            spawned_fut_completed_writer.store(true, Ordering::SeqCst);
        });
        let mut main_fut = pin!(async {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
        });
        spawn(spawned_fut);
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
        assert!(spawned_fut_completed.load(Ordering::SeqCst));
    }

    /// A pending task stays alive until the executor is dropped, and a task spawned from that
    /// task's destructor is dropped without ever being polled.
    #[test]
    fn task_destruction() {
        struct DropSpawner {
            dropped: Arc<AtomicBool>,
        }
        impl Drop for DropSpawner {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::SeqCst);
                let dropped_clone = self.dropped.clone();
                spawn(async {
                    // Hold a reference so the final `Arc::get_mut` check also proves that this
                    // task has been destroyed.
                    let _dropped_clone = dropped_clone;
                    panic!("task spawned in drop shouldn't be polled");
                });
            }
        }
        let mut dropped = Arc::new(AtomicBool::new(false));
        let drop_spawner = DropSpawner { dropped: dropped.clone() };
        let mut executor = TestExecutor::new();
        let mut main_fut = pin!(async move {
            spawn(async move {
                let _drop_spawner = drop_spawner;
                future::pending::<()>().await;
            });
        });
        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
        assert!(
            !dropped.load(Ordering::SeqCst),
            "executor dropped pending task before destruction"
        );

        drop(executor);
        // `get_mut` only succeeds when every other clone of the flag has been dropped.
        let dropped = Arc::get_mut(&mut dropped)
            .expect("someone else is unexpectedly still holding on to a reference");
        assert!(
            dropped.load(Ordering::SeqCst),
            "executor did not drop pending task during destruction"
        );
    }

    /// With a real-time executor, `MonotonicInstant::now()` lies between two kernel clock reads.
    #[test]
    fn time_now_real_time() {
        let _executor = LocalExecutor::new();
        let t1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        let t2 = MonotonicInstant::now().into_zx();
        let t3 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        assert!(t1 <= t2);
        assert!(t2 <= t3);
    }

    /// With a fake-time executor, `MonotonicInstant::now()` returns the last fake time set.
    #[test]
    fn time_now_fake_time() {
        let executor = TestExecutor::new_with_fake_time();
        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(t1);
        assert_eq!(MonotonicInstant::now(), t1);

        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(t2);
        assert_eq!(MonotonicInstant::now(), t2);
    }

    /// Shared body of `time_now_fake_time_boot` and `time_boot_now`: the fake boot clock follows
    /// the fake monotonic clock and is shifted by the configured boot-to-mono offset.
    fn check_fake_boot_time_follows_mono_time() {
        let executor = TestExecutor::new_with_fake_time();
        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(t1);
        assert_eq!(MonotonicInstant::now(), t1);
        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());

        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(t2);
        assert_eq!(MonotonicInstant::now(), t2);
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());

        const TEST_BOOT_OFFSET: i64 = 42;

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
    }

    /// See `check_fake_boot_time_follows_mono_time`.
    #[test]
    fn time_now_fake_time_boot() {
        check_fake_boot_time_follows_mono_time();
    }

    /// See `check_fake_boot_time_follows_mono_time`.
    #[test]
    fn time_boot_now() {
        check_fake_boot_time_follows_mono_time();
    }

    /// `MonotonicInstant::after` saturates at both ends of the timeline instead of overflowing.
    #[test]
    fn time_after_overflow() {
        let executor = TestExecutor::new_with_fake_time();

        executor.set_fake_time(MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100));
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
            MonotonicInstant::INFINITE
        );

        executor.set_fake_time(
            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
        );
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
            MonotonicInstant::INFINITE_PAST
        );
    }

    /// Wakes its own waker `n - 1` times (the loop runs over `1..n`) on the first poll, then
    /// completes on the next poll.
    async fn multi_wake(n: usize) {
        let mut done = false;
        futures::future::poll_fn(|cx| {
            if done {
                return Poll::Ready(());
            }
            for _ in 1..n {
                cx.waker().wake_by_ref();
            }
            done = true;
            Poll::Pending
        })
        .await;
    }

    /// The fake boot clock follows the fake monotonic clock and the boot-to-mono offset, as
    /// observed through `TestExecutor::boot_now`.
    #[test]
    fn test_boot_time_tracks_mono_time() {
        const FAKE_TIME: i64 = 42;
        let executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced"
        );

        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(2 * FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced again"
        );
    }

    /// A task that wakes itself thousands of times within a single poll still completes.
    #[test]
    fn many_wakeups() {
        let mut executor = LocalExecutor::new();
        executor.run_singlethreaded(multi_wake(4096 * 2));
    }

    /// Exercises `TestExecutor::advance_to` with a one-shot timer built from `timer_duration`
    /// running next to a one-second interval timer.
    fn advance_to_with(timer_duration: impl WakeupTime) {
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut fut = pin!(async {
            let timer_fired = Arc::new(AtomicBool::new(false));
            futures::join!(
                // One-shot timer under test.
                async {
                    Timer::new(timer_duration).await;
                    timer_fired.store(true, Ordering::SeqCst);
                },
                // Interval timer that has to tick three times.
                async {
                    let mut fired = 0;
                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
                    while interval.next().await.is_some() {
                        fired += 1;
                        if fired == 3 {
                            break;
                        }
                    }
                    assert_eq!(fired, 3, "interval timer should have fired multiple times.");
                },
                // Driver: advances fake time in steps and checks the one-shot timer.
                async {
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;

                    assert!(
                        timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer should have fired"
                    );

                    // Two more seconds let the interval timer reach its third tick.
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_seconds(2),
                    ))
                    .await;
                }
            )
        });
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }

    /// `advance_to` with a one-shot timer given as a monotonic duration.
    #[test]
    fn test_advance_to() {
        advance_to_with(zx::MonotonicDuration::from_seconds(1));
    }

    /// `advance_to` with a one-shot timer given as a boot duration.
    #[test]
    fn test_advance_to_boot() {
        advance_to_with(zx::BootDuration::from_seconds(1));
    }
}