fuchsia_async/runtime/fuchsia/executor/
local.rs1use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle, MAIN_TASK_ID};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use futures::future::{self, Either};
12use futures::task::AtomicWaker;
13use std::fmt;
14use std::future::{poll_fn, Future};
15use std::pin::pin;
16use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
/// An executor that is driven from the calling thread via
/// [`LocalExecutor::run_singlethreaded`].
///
/// All of its state is reached through the wrapped [`EHandle`]. Dropping a `LocalExecutor`
/// marks the underlying executor as done (see the `Drop` implementation).
pub struct LocalExecutor {
    /// Handle holding the executor's root scope, through which the shared executor state is
    /// reached (`ehandle.inner()`).
    pub(crate) ehandle: EHandle,
}
35
36impl fmt::Debug for LocalExecutor {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
39 }
40}
41
42impl Default for LocalExecutor {
43 fn default() -> Self {
44 Self::new()
45 }
46}
47
48impl LocalExecutor {
49 pub fn new() -> Self {
51 Self::new_with_port(zx::Port::create())
52 }
53
54 pub fn new_with_port(port: zx::Port) -> Self {
56 let inner = Arc::new(Executor::new_with_port(
57 ExecutorTime::RealTime,
58 true,
59 1,
60 port,
61 ));
62 let root_scope = ScopeHandle::root(inner);
63 Executor::set_local(root_scope.clone());
64 Self { ehandle: EHandle { root_scope } }
65 }
66
67 pub fn port(&self) -> &zx::Port {
69 self.ehandle.port()
70 }
71
72 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
74 where
75 F: Future,
76 {
77 assert!(
78 self.ehandle.inner().is_real_time(),
79 "Error: called `run_singlethreaded` on an executor using fake time"
80 );
81
82 let Poll::Ready(result) = self.run::<false, _>(main_future) else {
83 unreachable!()
84 };
85 result
86 }
87
88 fn run<const UNTIL_STALLED: bool, Fut: Future>(
89 &mut self,
90 main_future: Fut,
91 ) -> Poll<Fut::Output> {
92 unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
96 std::mem::transmute(obj)
97 }
98
99 let scope = &self.ehandle.root_scope;
100 let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);
101
102 unsafe {
105 scope.insert_task(remove_lifetime(task), false);
106 }
107
108 struct DropMainTask<'a>(&'a EHandle);
109 impl Drop for DropMainTask<'_> {
110 fn drop(&mut self) {
111 unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
114 }
115 }
116 let _drop_main_task = DropMainTask(&self.ehandle);
117
118 self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();
119
120 unsafe {
123 self.ehandle.global_scope().poll_join_result(
124 MAIN_TASK_ID,
125 &mut Context::from_waker(&futures::task::noop_waker()),
126 )
127 }
128 }
129
130 #[doc(hidden)]
131 pub fn root_scope(&self) -> &ScopeHandle {
133 self.ehandle.global_scope()
134 }
135}
136
137impl Drop for LocalExecutor {
138 fn drop(&mut self) {
139 self.ehandle.inner().mark_done();
140 self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
141 }
142}
143
/// A wrapper around [`LocalExecutor`] with extra controls intended for tests: stepping the
/// executor with `run_until_stalled`, fake time, and manual timer wake-ups.
pub struct TestExecutor {
    /// The wrapped executor that all operations are delegated to.
    local: LocalExecutor,
}
152
153impl Default for TestExecutor {
154 fn default() -> Self {
155 Self::new()
156 }
157}
158
159impl TestExecutor {
160 pub fn new() -> Self {
162 Self { local: LocalExecutor::new() }
163 }
164
165 pub fn new_with_port(port: zx::Port) -> Self {
167 Self { local: LocalExecutor::new_with_port(port) }
168 }
169
170 pub fn port(&self) -> &zx::Port {
172 self.local.port()
173 }
174
175 pub fn new_with_fake_time() -> Self {
177 let inner = Arc::new(Executor::new(
178 ExecutorTime::FakeTime {
179 mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
180 mono_to_boot_offset_ns: AtomicI64::new(0),
181 },
182 true,
183 1,
184 ));
185 let root_scope = ScopeHandle::root(inner);
186 Executor::set_local(root_scope.clone());
187 Self { local: LocalExecutor { ehandle: EHandle { root_scope } } }
188 }
189
190 pub fn now(&self) -> MonotonicInstant {
192 self.local.ehandle.inner().now()
193 }
194
195 pub fn boot_now(&self) -> BootInstant {
197 self.local.ehandle.inner().boot_now()
198 }
199
200 pub fn set_fake_time(&self, t: MonotonicInstant) {
206 self.local.ehandle.inner().set_fake_time(t)
207 }
208
209 pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
220 self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
221 }
222
223 pub fn global_scope(&self) -> &ScopeHandle {
225 self.local.root_scope()
226 }
227
228 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
230 where
231 F: Future,
232 {
233 self.local.run_singlethreaded(main_future)
234 }
235
236 pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
247 where
248 F: Future + Unpin,
249 {
250 let mut main_future = pin!(main_future);
251
252 struct Cleanup(Arc<Executor>);
254 impl Drop for Cleanup {
255 fn drop(&mut self) {
256 *self.0.owner_data.lock() = None;
257 }
258 }
259 let _cleanup = Cleanup(self.local.ehandle.inner().clone());
260 *self.local.ehandle.inner().owner_data.lock() =
261 Some(Box::new(UntilStalledData { watcher: None }));
262
263 loop {
264 let result = self.local.run::<true, _>(main_future.as_mut());
265 if result.is_ready() {
266 return result;
267 }
268
269 if let Some(watcher) = with_data(|data| data.watcher.take()) {
271 watcher.waker.wake();
272 watcher.done.store(true, Ordering::Relaxed);
275 } else {
276 break;
277 }
278 }
279
280 Poll::Pending
281 }
282
283 pub fn wake_expired_timers(&mut self) -> bool {
289 self.local.ehandle.inner().monotonic_timers().wake_timers()
290 || self.local.ehandle.inner().boot_timers().wake_timers()
291 }
292
293 pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
306 self.local.ehandle.inner().monotonic_timers().wake_next_timer()
307 }
308
309 pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
312 self.local.ehandle.inner().boot_timers().wake_next_timer()
313 }
314
315 pub fn next_timer() -> Option<MonotonicInstant> {
317 EHandle::local().inner().monotonic_timers().next_timer()
318 }
319
320 pub fn next_boot_timer() -> Option<BootInstant> {
322 EHandle::local().inner().boot_timers().next_timer()
323 }
324
325 pub async fn advance_to(time: MonotonicInstant) {
334 let ehandle = EHandle::local();
335 loop {
336 let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
337 if let Some(next_timer) = Self::next_timer() {
338 if next_timer <= time {
339 ehandle.inner().set_fake_time(next_timer);
340 continue;
341 }
342 }
343 ehandle.inner().set_fake_time(time);
344 break;
345 }
346 }
347
348 pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
372 let watcher =
373 Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
374
375 assert!(
376 with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
377 "Error: Another task has called `poll_until_stalled`."
378 );
379
380 struct Watcher(Arc<StalledWatcher>);
381
382 impl Drop for Watcher {
384 fn drop(&mut self) {
385 if !self.0.done.swap(true, Ordering::Relaxed) {
386 with_data(|data| data.watcher = None);
387 }
388 }
389 }
390
391 let watcher = Watcher(watcher);
392
393 let poll_fn = poll_fn(|cx: &mut Context<'_>| {
394 if watcher.0.done.load(Ordering::Relaxed) {
395 Poll::Ready(())
396 } else {
397 watcher.0.waker.register(cx.waker());
398 Poll::Pending
399 }
400 });
401 match future::select(poll_fn, fut).await {
402 Either::Left(_) => Poll::Pending,
403 Either::Right((value, _)) => Poll::Ready(value),
404 }
405 }
406}
407
/// State shared between a pending `TestExecutor::poll_until_stalled` call and
/// `TestExecutor::run_until_stalled`.
struct StalledWatcher {
    /// Waker registered by the task inside `poll_until_stalled`; woken by `run_until_stalled`
    /// when the executor stalls.
    waker: AtomicWaker,
    /// Set by `run_until_stalled` when it reports the stall, or by the waiting future's drop
    /// guard when that future goes away.
    done: AtomicBool,
}
412
/// State that `TestExecutor::run_until_stalled` installs in the executor's `owner_data` slot
/// for the duration of the call.
struct UntilStalledData {
    /// Watcher of the task currently waiting in `poll_until_stalled`, if any. Only one task may
    /// be registered at a time.
    watcher: Option<Arc<StalledWatcher>>,
}
416
417fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
423 const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
424 with TestExecutor::run_until_stalled";
425 f(EHandle::local()
426 .inner()
427 .owner_data
428 .lock()
429 .as_mut()
430 .expect(MESSAGE)
431 .downcast_mut::<UntilStalledData>()
432 .expect(MESSAGE))
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle::on_signals::OnSignalsFuture;
    use crate::{Interval, Timer, WakeupTime};
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::task::Waker;
    use zx::{self as zx, AsHandleRef};

    /// Spawns `future` as a detached task on the executor local to the current thread.
    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
        let ehandle = crate::EHandle::local();
        ehandle.spawn_detached(future);
    }

    /// A spawned task is only polled when the executor is stepped, and a second time only after
    /// its waker has been invoked.
    #[test]
    fn stepwise_two_steps() {
        let step = Rc::new(Cell::new(0));
        let saved_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));

        // The future records its waker on every poll and needs two polls to complete.
        let fut = {
            let saved_waker = saved_waker.clone();
            let step = step.clone();
            Box::new(future::poll_fn(move |cx: &mut Context<'_>| {
                saved_waker.borrow_mut().replace(cx.waker().clone());
                match step.get() {
                    0 => {
                        step.set(1);
                        Poll::Pending
                    }
                    1 => {
                        step.set(2);
                        Poll::Ready(())
                    }
                    _ => panic!("future called after done"),
                }
            }))
        };

        let mut executor = TestExecutor::new_with_fake_time();
        executor.local.ehandle.spawn_local_detached(fut);

        // Nothing has been polled before the executor is run.
        assert_eq!(step.get(), 0);
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 1);

        // Waking the task by hand lets the next step finish it.
        saved_waker.borrow_mut().take().unwrap().wake();
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 2);
    }

    /// A timer only fires after fake time has been moved to its deadline.
    #[test]
    fn stepwise_timer() {
        let mut executor = TestExecutor::new_with_fake_time();
        let start = MonotonicInstant::from_nanos(0);
        executor.set_fake_time(start);
        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));

        // Stepping the executor must not move fake time by itself.
        let _ = executor.run_until_stalled(&mut timer);
        assert_eq!(MonotonicInstant::now(), start);

        let deadline = MonotonicInstant::from_nanos(1000);
        executor.set_fake_time(deadline);
        assert_eq!(MonotonicInstant::now(), deadline);
        assert!(executor.run_until_stalled(&mut timer).is_ready());
    }

    /// A signal raised between two steps is observed by the second step.
    #[test]
    fn stepwise_event() {
        let mut executor = TestExecutor::new_with_fake_time();
        let event = zx::Event::create();
        let mut on_signal = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));

        let _ = executor.run_until_stalled(&mut on_signal);

        event.signal_handle(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
        assert_matches!(
            executor.run_until_stalled(&mut on_signal),
            Poll::Ready(Ok(zx::Signals::USER_0))
        );
    }

    /// When both a spawned task's timer and the main future's timer have expired, the spawned
    /// task has completed by the time the main future reports ready.
    #[test]
    fn run_until_stalled_preserves_order() {
        let mut executor = TestExecutor::new_with_fake_time();
        let spawned_done = Arc::new(AtomicBool::new(false));
        let spawned_fut = {
            let spawned_done = spawned_done.clone();
            Box::pin(async move {
                Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
                spawned_done.store(true, Ordering::SeqCst);
            })
        };
        let mut main_fut = pin!(async {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
        });
        spawn(spawned_fut);

        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);

        // Jump past both deadlines at once.
        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
        assert!(spawned_done.load(Ordering::SeqCst));
    }

    /// Pending tasks are kept alive until the executor is dropped, and a task spawned from a
    /// destructor during that teardown is never polled.
    #[test]
    fn task_destruction() {
        /// Flags its own destruction and tries to spawn a new task while being dropped.
        struct DropSpawner {
            dropped: Arc<AtomicBool>,
        }
        impl Drop for DropSpawner {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::SeqCst);
                let flag = self.dropped.clone();
                spawn(async {
                    // Hold on to the flag so a leaked task would show up in `Arc::get_mut`.
                    let _flag = flag;
                    panic!("task spawned in drop shouldn't be polled");
                });
            }
        }

        let mut dropped = Arc::new(AtomicBool::new(false));
        let drop_spawner = DropSpawner { dropped: dropped.clone() };
        let mut executor = TestExecutor::new();
        let mut main_fut = pin!(async move {
            spawn(async move {
                // Owned by a task that never completes.
                let _drop_spawner = drop_spawner;
                future::pending::<()>().await;
            });
        });
        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
        assert!(
            !dropped.load(Ordering::SeqCst),
            "executor dropped pending task before destruction"
        );

        drop(executor);

        // Every clone of the flag, including the one moved into the task spawned during
        // teardown, must be gone by now.
        let dropped = Arc::get_mut(&mut dropped)
            .expect("someone else is unexpectedly still holding on to a reference");
        assert!(
            dropped.load(Ordering::SeqCst),
            "executor did not drop pending task during destruction"
        );
    }

    /// With a real-time executor, `MonotonicInstant::now` is bracketed by two kernel readings.
    #[test]
    fn time_now_real_time() {
        let _executor = LocalExecutor::new();
        let before = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        let observed = MonotonicInstant::now().into_zx();
        let after = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        assert!(before <= observed);
        assert!(observed <= after);
    }

    /// `MonotonicInstant::now` reports exactly the fake time that was set.
    #[test]
    fn time_now_fake_time() {
        let executor = TestExecutor::new_with_fake_time();

        let first = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(first);
        assert_eq!(MonotonicInstant::now(), first);

        let second = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(second);
        assert_eq!(MonotonicInstant::now(), second);
    }

    /// Fake boot time follows fake monotonic time, shifted by the configured offset.
    #[test]
    fn time_now_fake_time_boot() {
        const TEST_BOOT_OFFSET: i64 = 42;

        let executor = TestExecutor::new_with_fake_time();

        let first = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(first);
        assert_eq!(MonotonicInstant::now(), first);
        assert_eq!(BootInstant::now().into_nanos(), first.into_nanos());

        let second = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(second);
        assert_eq!(MonotonicInstant::now(), second);
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos());

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos() + TEST_BOOT_OFFSET);
    }

    /// Same checks as `time_now_fake_time_boot`: boot time tracks monotonic time plus offset.
    #[test]
    fn time_boot_now() {
        const TEST_BOOT_OFFSET: i64 = 42;

        let executor = TestExecutor::new_with_fake_time();

        let first = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(first);
        assert_eq!(MonotonicInstant::now(), first);
        assert_eq!(BootInstant::now().into_nanos(), first.into_nanos());

        let second = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(second);
        assert_eq!(MonotonicInstant::now(), second);
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos());

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos() + TEST_BOOT_OFFSET);
    }

    /// `MonotonicInstant::after` saturates at both ends of the timeline instead of overflowing.
    #[test]
    fn time_after_overflow() {
        let executor = TestExecutor::new_with_fake_time();

        let near_future_end = MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100);
        executor.set_fake_time(near_future_end);
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
            MonotonicInstant::INFINITE
        );

        let near_past_end =
            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100);
        executor.set_fake_time(near_past_end);
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
            MonotonicInstant::INFINITE_PAST
        );
    }

    /// A future that, on its first poll, invokes its own waker once per iteration of `1..n`
    /// and returns pending; it completes on the following poll.
    async fn multi_wake(n: usize) {
        let mut already_polled = false;
        futures::future::poll_fn(|cx| {
            if !already_polled {
                for _ in 1..n {
                    cx.waker().wake_by_ref();
                }
                already_polled = true;
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        })
        .await;
    }

    /// `TestExecutor::boot_now` follows the fake monotonic time and the boot offset.
    #[test]
    fn test_boot_time_tracks_mono_time() {
        const FAKE_TIME: i64 = 42;
        let executor = TestExecutor::new_with_fake_time();

        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced"
        );

        // Applying an offset of the same size doubles the boot reading.
        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(2 * FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced again"
        );
    }

    /// A task that wakes itself thousands of times in one poll still runs to completion.
    #[test]
    fn many_wakeups() {
        let wake_count = 4096 * 2;
        let mut executor = LocalExecutor::new();
        executor.run_singlethreaded(multi_wake(wake_count));
    }

    /// Drives a one-shot timer (`timer_duration`), a one-second interval, and a sequence of
    /// `advance_to` calls concurrently under fake time.
    fn advance_to_with(timer_duration: impl WakeupTime) {
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut fut = pin!(async {
            let timer_fired = Arc::new(AtomicBool::new(false));
            futures::join!(
                // One-shot timer under test.
                async {
                    Timer::new(timer_duration).await;
                    timer_fired.store(true, Ordering::SeqCst);
                },
                // Interval that has to tick three times.
                async {
                    let mut ticks = 0;
                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
                    while interval.next().await.is_some() {
                        ticks += 1;
                        if ticks == 3 {
                            break;
                        }
                    }
                    assert_eq!(ticks, 3, "interval timer should have fired multiple times.");
                },
                // Clock driver: two half-second hops, then a two-second hop.
                async {
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;
                    // Only half of the one-second wait has elapsed.
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;

                    assert!(
                        timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer should have fired"
                    );

                    // Give the interval enough time for its remaining ticks.
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_seconds(2),
                    ))
                    .await;
                }
            )
        });
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }

    /// `advance_to` with a one-shot timer expressed on the monotonic timeline.
    #[test]
    fn test_advance_to() {
        let one_second = zx::MonotonicDuration::from_seconds(1);
        advance_to_with(one_second);
    }

    /// `advance_to` with a one-shot timer expressed on the boot timeline.
    #[test]
    fn test_advance_to_boot() {
        let one_second = zx::BootDuration::from_seconds(1);
        advance_to_with(one_second);
    }
}