fuchsia_async/runtime/fuchsia/executor/
local.rs1use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle, MAIN_TASK_ID};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use crate::runtime::instrument::TaskInstrument;
12use futures::future::{self, Either};
13use futures::task::AtomicWaker;
14use std::fmt;
15use std::future::{poll_fn, Future};
16use std::pin::pin;
17use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
/// An executor handle that is driven from the calling thread via
/// [`LocalExecutor::run_singlethreaded`].
///
/// Dropping the value marks the underlying executor as done and notifies it that its parent
/// was dropped (see the `Drop` impl for this type).
pub struct LocalExecutor {
    /// Handle to the underlying executor; it carries the executor's root scope.
    pub(crate) ehandle: EHandle,
}
36
37impl fmt::Debug for LocalExecutor {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
40 }
41}
42
43impl Default for LocalExecutor {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
impl LocalExecutor {
    /// Creates an executor backed by a freshly created [`zx::Port`] and without any task
    /// instrumentation.
    pub fn new() -> Self {
        Self::new_with_port(zx::Port::create(), None)
    }

    /// Creates an executor on top of `port`, optionally attaching `instrument`.
    ///
    /// The executor is constructed with [`ExecutorTime::RealTime`], wrapped in a root scope, and
    /// that scope is registered through `Executor::set_local` before the handle is returned.
    pub(crate) fn new_with_port(
        port: zx::Port,
        instrument: Option<Arc<dyn TaskInstrument>>,
    ) -> Self {
        let inner = Arc::new(Executor::new_with_port(
            ExecutorTime::RealTime,
            // NOTE(review): the meaning of the next two positional arguments is defined by
            // `Executor::new_with_port`, which is not visible here (presumably an "is local"
            // flag and a thread count) — confirm against that constructor.
            true,
            1,
            port,
            instrument,
        ));
        let root_scope = ScopeHandle::root(inner);
        Executor::set_local(root_scope.clone());
        Self { ehandle: EHandle { root_scope } }
    }

    /// Returns a reference to the port used by this executor.
    pub fn port(&self) -> &zx::Port {
        self.ehandle.port()
    }

    /// Runs `main_future` on the calling thread and returns its output.
    ///
    /// # Panics
    ///
    /// Panics if the executor is not using real time.
    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
    where
        F: Future,
    {
        assert!(
            self.ehandle.inner().is_real_time(),
            "Error: called `run_singlethreaded` on an executor using fake time"
        );

        // With `UNTIL_STALLED == false` the run is expected to return only once the main future
        // has produced its output, hence the `unreachable!()` on `Poll::Pending`.
        let Poll::Ready(result) = self.run::<false, _>(main_future) else {
            unreachable!()
        };
        result
    }

    /// Installs `main_future` as the main task, drives the executor's worker loop, and then
    /// polls once for the main task's result.
    ///
    /// `UNTIL_STALLED` is forwarded to `Executor::worker_lifecycle`. The main task is removed
    /// again (through a drop guard) before this function returns, including on unwind.
    fn run<const UNTIL_STALLED: bool, Fut: Future>(
        &mut self,
        main_future: Fut,
    ) -> Poll<Fut::Output> {
        /// Erases the lifetime of a task handle so that it can be stored in the scope.
        ///
        /// # Safety
        ///
        /// The caller must ensure the returned handle does not outlive the original lifetime.
        unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
            std::mem::transmute(obj)
        }

        let scope = &self.ehandle.root_scope;
        let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);

        // SAFETY: the lifetime is erased only for the duration of this call; the
        // `DropMainTask` guard below removes the main task before `run` returns.
        // NOTE(review): relies on `drop_main_task` actually dropping the future — confirm.
        unsafe {
            scope.insert_task(remove_lifetime(task), false);
        }

        // Guard that removes the main task when `run` exits, whether normally or by panic.
        struct DropMainTask<'a>(&'a EHandle);
        impl Drop for DropMainTask<'_> {
            fn drop(&mut self) {
                // SAFETY: NOTE(review) — presumably `drop_main_task` requires that the worker
                // loop is not running; the guard is dropped only after `worker_lifecycle` has
                // returned or unwound. Confirm against `Executor::drop_main_task`.
                unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
            }
        }
        let _drop_main_task = DropMainTask(&self.ehandle);

        self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();

        // SAFETY: NOTE(review) — presumably `poll_join_result` requires the requested result
        // type to match the task's output type; the task registered under `MAIN_TASK_ID` above
        // wraps `main_future`, whose output is `Fut::Output`. Confirm against
        // `ScopeHandle::poll_join_result`.
        unsafe {
            self.ehandle.global_scope().poll_join_result(
                MAIN_TASK_ID,
                // The result is polled exactly once, so no real waker is needed.
                &mut Context::from_waker(&futures::task::noop_waker()),
            )
        }
    }

    /// Returns the scope obtained from the executor handle's `global_scope()`.
    #[doc(hidden)]
    pub fn root_scope(&self) -> &ScopeHandle {
        self.ehandle.global_scope()
    }
}
142
143impl Drop for LocalExecutor {
144 fn drop(&mut self) {
145 self.ehandle.inner().mark_done();
146 self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
147 }
148}
149
/// Builder for [`LocalExecutor`].
#[derive(Default)]
pub struct LocalExecutorBuilder {
    /// Port to build the executor on; a new one is created when this is `None`.
    port: Option<zx::Port>,
    /// Optional task instrumentation hook to hand to the executor.
    instrument: Option<Arc<dyn TaskInstrument>>,
}
156
157impl LocalExecutorBuilder {
158 pub fn new() -> Self {
160 Self::default()
161 }
162
163 pub fn port(mut self, port: zx::Port) -> Self {
165 self.port = Some(port);
166 self
167 }
168
169 pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
171 self.instrument = instrument;
172 self
173 }
174
175 pub fn build(self) -> LocalExecutor {
177 match self.port {
178 Some(port) => LocalExecutor::new_with_port(port, self.instrument),
179 None => LocalExecutor::new(),
180 }
181 }
182}
183
/// An executor for tests.
///
/// Wraps a [`LocalExecutor`] and adds stepping via [`TestExecutor::run_until_stalled`] as well
/// as fake-time controls such as [`TestExecutor::set_fake_time`].
pub struct TestExecutor {
    /// The wrapped executor that all operations are forwarded to.
    local: LocalExecutor,
}
192
193impl Default for TestExecutor {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
199impl TestExecutor {
200 pub fn new() -> Self {
202 Self::builder().build()
203 }
204
205 pub fn new_with_fake_time() -> Self {
207 Self::builder().fake_time(true).build()
208 }
209
210 pub fn builder() -> TestExecutorBuilder {
212 TestExecutorBuilder::new()
213 }
214
215 pub fn port(&self) -> &zx::Port {
217 self.local.port()
218 }
219
220 pub fn now(&self) -> MonotonicInstant {
222 self.local.ehandle.inner().now()
223 }
224
225 pub fn boot_now(&self) -> BootInstant {
227 self.local.ehandle.inner().boot_now()
228 }
229
230 pub fn set_fake_time(&self, t: MonotonicInstant) {
236 self.local.ehandle.inner().set_fake_time(t)
237 }
238
239 pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
250 self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
251 }
252
253 pub fn global_scope(&self) -> &ScopeHandle {
255 self.local.root_scope()
256 }
257
258 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
260 where
261 F: Future,
262 {
263 self.local.run_singlethreaded(main_future)
264 }
265
266 pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
277 where
278 F: Future + Unpin,
279 {
280 let mut main_future = pin!(main_future);
281
282 struct Cleanup(Arc<Executor>);
284 impl Drop for Cleanup {
285 fn drop(&mut self) {
286 *self.0.owner_data.lock() = None;
287 }
288 }
289 let _cleanup = Cleanup(self.local.ehandle.inner().clone());
290 *self.local.ehandle.inner().owner_data.lock() =
291 Some(Box::new(UntilStalledData { watcher: None }));
292
293 loop {
294 let result = self.local.run::<true, _>(main_future.as_mut());
295 if result.is_ready() {
296 return result;
297 }
298
299 if let Some(watcher) = with_data(|data| data.watcher.take()) {
301 watcher.waker.wake();
302 watcher.done.store(true, Ordering::Relaxed);
305 } else {
306 break;
307 }
308 }
309
310 Poll::Pending
311 }
312
313 pub fn wake_expired_timers(&mut self) -> bool {
319 self.local.ehandle.inner().monotonic_timers().wake_timers()
320 || self.local.ehandle.inner().boot_timers().wake_timers()
321 }
322
323 pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
336 self.local.ehandle.inner().monotonic_timers().wake_next_timer()
337 }
338
339 pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
342 self.local.ehandle.inner().boot_timers().wake_next_timer()
343 }
344
345 pub fn next_timer() -> Option<MonotonicInstant> {
347 EHandle::local().inner().monotonic_timers().next_timer()
348 }
349
350 pub fn next_boot_timer() -> Option<BootInstant> {
352 EHandle::local().inner().boot_timers().next_timer()
353 }
354
355 pub async fn advance_to(time: MonotonicInstant) {
364 let ehandle = EHandle::local();
365 loop {
366 let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
367 if let Some(next_timer) = Self::next_timer() {
368 if next_timer <= time {
369 ehandle.inner().set_fake_time(next_timer);
370 continue;
371 }
372 }
373 ehandle.inner().set_fake_time(time);
374 break;
375 }
376 }
377
378 pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
402 let watcher =
403 Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
404
405 assert!(
406 with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
407 "Error: Another task has called `poll_until_stalled`."
408 );
409
410 struct Watcher(Arc<StalledWatcher>);
411
412 impl Drop for Watcher {
414 fn drop(&mut self) {
415 if !self.0.done.swap(true, Ordering::Relaxed) {
416 with_data(|data| data.watcher = None);
417 }
418 }
419 }
420
421 let watcher = Watcher(watcher);
422
423 let poll_fn = poll_fn(|cx: &mut Context<'_>| {
424 if watcher.0.done.load(Ordering::Relaxed) {
425 Poll::Ready(())
426 } else {
427 watcher.0.waker.register(cx.waker());
428 Poll::Pending
429 }
430 });
431 match future::select(poll_fn, fut).await {
432 Either::Left(_) => Poll::Pending,
433 Either::Right((value, _)) => Poll::Ready(value),
434 }
435 }
436}
437
/// Builder for [`TestExecutor`].
#[derive(Default)]
pub struct TestExecutorBuilder {
    /// Port to build the executor on; a new one is created when this is `None`.
    port: Option<zx::Port>,
    /// Whether the executor uses fake time instead of real time.
    fake_time: bool,
    /// Optional task instrumentation hook to hand to the executor.
    instrument: Option<Arc<dyn TaskInstrument>>,
}
445
446impl TestExecutorBuilder {
447 pub fn new() -> Self {
449 Self::default()
450 }
451
452 pub fn port(mut self, port: zx::Port) -> Self {
454 self.port = Some(port);
455 self
456 }
457
458 pub fn fake_time(mut self, fake_time: bool) -> Self {
460 self.fake_time = fake_time;
461 self
462 }
463
464 pub fn instrument(mut self, instrument: Arc<dyn TaskInstrument>) -> Self {
466 self.instrument = Some(instrument);
467 self
468 }
469
470 pub fn build(self) -> TestExecutor {
472 let time = if self.fake_time {
473 ExecutorTime::FakeTime {
474 mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
475 mono_to_boot_offset_ns: AtomicI64::new(0),
476 }
477 } else {
478 ExecutorTime::RealTime
479 };
480 let port = self.port.unwrap_or_else(zx::Port::create);
481 let inner = Arc::new(Executor::new_with_port(
482 time,
483 true,
484 1,
485 port,
486 self.instrument,
487 ));
488 let root_scope = ScopeHandle::root(inner);
489 Executor::set_local(root_scope.clone());
490 let local = LocalExecutor { ehandle: EHandle { root_scope } };
491 TestExecutor { local }
492 }
493}
494
/// State shared between a task waiting in `TestExecutor::poll_until_stalled` and
/// `TestExecutor::run_until_stalled`.
struct StalledWatcher {
    /// Waker of the waiting task; woken when the executor stalls.
    waker: AtomicWaker,
    /// Set once the stall has been signalled, or once the waiting side has been dropped.
    done: AtomicBool,
}
499
/// Data stored in the executor's `owner_data` slot for the duration of
/// `TestExecutor::run_until_stalled`.
struct UntilStalledData {
    /// The watcher registered by `TestExecutor::poll_until_stalled`, if any.
    watcher: Option<Arc<StalledWatcher>>,
}
503
504fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
510 const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
511 with TestExecutor::run_until_stalled";
512 f(EHandle::local()
513 .inner()
514 .owner_data
515 .lock()
516 .as_mut()
517 .expect(MESSAGE)
518 .downcast_mut::<UntilStalledData>()
519 .expect(MESSAGE))
520}
521
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle::on_signals::OnSignalsFuture;
    use crate::{Interval, Timer, WakeupTime};
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::task::Waker;
    use zx::{self as zx, AsHandleRef};

    /// Spawns `future` as a detached task on the executor local to the calling thread.
    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
        crate::EHandle::local().spawn_detached(future);
    }

    /// A spawned future that needs two polls is advanced by exactly one poll per
    /// `run_until_stalled` call, and only after it has been woken.
    #[test]
    fn stepwise_two_steps() {
        let fut_step = Rc::new(Cell::new(0));
        let fut_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
        let fut_waker_clone = fut_waker.clone();
        let fut_step_clone = fut_step.clone();
        let fut_fn = move |cx: &mut Context<'_>| {
            // Record the waker so the test can wake the task explicitly.
            fut_waker_clone.borrow_mut().replace(cx.waker().clone());
            match fut_step_clone.get() {
                0 => {
                    fut_step_clone.set(1);
                    Poll::Pending
                }
                1 => {
                    fut_step_clone.set(2);
                    Poll::Ready(())
                }
                _ => panic!("future called after done"),
            }
        };
        let fut = Box::new(future::poll_fn(fut_fn));
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        // The future is spawned as its own task; the main future passed to
        // `run_until_stalled` below is a fresh `pending()` each time.
        executor.local.ehandle.spawn_local_detached(fut);
        // Spawning alone must not poll the future.
        assert_eq!(fut_step.get(), 0);
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(fut_step.get(), 1);

        fut_waker.borrow_mut().take().unwrap().wake();
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(fut_step.get(), 2);
    }

    /// A timer only completes after fake time has been moved to its deadline.
    #[test]
    fn stepwise_timer() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));
        let mut fut =
            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));

        // Running the executor must not advance fake time by itself.
        let _ = executor.run_until_stalled(&mut fut);
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));

        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }

    /// A signal wait completes on the `run_until_stalled` call that follows the signal.
    #[test]
    fn stepwise_event() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        let event = zx::Event::create();
        let mut fut = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));

        let _ = executor.run_until_stalled(&mut fut);

        event.signal_handle(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
        assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(zx::Signals::USER_0)));
    }

    /// When fake time jumps past both deadlines, the spawned task with the earlier deadline
    /// has already completed by the time the main future is reported ready.
    #[test]
    fn run_until_stalled_preserves_order() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        let spawned_fut_completed = Arc::new(AtomicBool::new(false));
        let spawned_fut_completed_writer = spawned_fut_completed.clone();
        let spawned_fut = Box::pin(async move {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
            spawned_fut_completed_writer.store(true, Ordering::SeqCst);
        });
        let mut main_fut = pin!(async {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
        });
        spawn(spawned_fut);
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
        // The spawned task's timer (5s) expires before the main future's (10s).
        assert!(spawned_fut_completed.load(Ordering::SeqCst));
    }

    /// Pending tasks are dropped when the executor is dropped (not earlier), and a task
    /// spawned from within such a destructor is never polled.
    #[test]
    fn task_destruction() {
        struct DropSpawner {
            dropped: Arc<AtomicBool>,
        }
        impl Drop for DropSpawner {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::SeqCst);
                let dropped_clone = self.dropped.clone();
                spawn(async {
                    // Hold on to a clone so the test can detect whether this task was released.
                    let _dropped_clone = dropped_clone;
                    panic!("task spawned in drop shouldn't be polled");
                });
            }
        }
        let mut dropped = Arc::new(AtomicBool::new(false));
        let drop_spawner = DropSpawner { dropped: dropped.clone() };
        let mut executor = TestExecutorBuilder::new().build();
        let mut main_fut = pin!(async move {
            spawn(async move {
                // Keep the spawner alive for as long as this never-completing task exists.
                let _drop_spawner = drop_spawner;
                future::pending::<()>().await;
            });
        });
        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
        assert!(
            !dropped.load(Ordering::SeqCst),
            "executor dropped pending task before destruction"
        );

        drop(executor);
        // `Arc::get_mut` only succeeds if no other clone is alive, i.e. the task spawned in
        // `DropSpawner::drop` has been released as well.
        let dropped = Arc::get_mut(&mut dropped)
            .expect("someone else is unexpectedly still holding on to a reference");
        assert!(
            dropped.load(Ordering::SeqCst),
            "executor did not drop pending task during destruction"
        );
    }

    /// With real time, `MonotonicInstant::now()` lies between two kernel clock readings.
    #[test]
    fn time_now_real_time() {
        let _executor = LocalExecutorBuilder::new().build();
        let t1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        let t2 = MonotonicInstant::now().into_zx();
        let t3 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        assert!(t1 <= t2);
        assert!(t2 <= t3);
    }

    /// With fake time, `MonotonicInstant::now()` returns whatever was last set.
    #[test]
    fn time_now_fake_time() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();
        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(t1);
        assert_eq!(MonotonicInstant::now(), t1);

        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(t2);
        assert_eq!(MonotonicInstant::now(), t2);
    }

    /// Fake boot time follows fake monotonic time, shifted by the configured offset.
    #[test]
    fn time_now_fake_time_boot() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();
        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(t1);
        assert_eq!(MonotonicInstant::now(), t1);
        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());

        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(t2);
        assert_eq!(MonotonicInstant::now(), t2);
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());

        const TEST_BOOT_OFFSET: i64 = 42;

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
    }

    /// Same checks as `time_now_fake_time_boot`.
    // NOTE(review): the body is identical to `time_now_fake_time_boot`; consider deduplicating.
    #[test]
    fn time_boot_now() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();
        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(t1);
        assert_eq!(MonotonicInstant::now(), t1);
        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());

        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(t2);
        assert_eq!(MonotonicInstant::now(), t2);
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());

        const TEST_BOOT_OFFSET: i64 = 42;

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
    }

    /// `MonotonicInstant::after` saturates at the timeline bounds instead of overflowing.
    #[test]
    fn time_after_overflow() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();

        executor.set_fake_time(MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100));
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
            MonotonicInstant::INFINITE
        );

        executor.set_fake_time(
            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
        );
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
            MonotonicInstant::INFINITE_PAST
        );
    }

    /// A future whose first poll wakes its own waker once per element of `1..n` and returns
    /// `Pending`; the second poll completes it.
    async fn multi_wake(n: usize) {
        let mut done = false;
        futures::future::poll_fn(|cx| {
            if done {
                return Poll::Ready(());
            }
            for _ in 1..n {
                cx.waker().wake_by_ref()
            }
            done = true;
            Poll::Pending
        })
        .await;
    }

    /// `boot_now()` follows the fake monotonic time and the configured boot offset.
    #[test]
    fn test_boot_time_tracks_mono_time() {
        const FAKE_TIME: i64 = 42;
        let executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced"
        );

        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
        // Boot time is now the monotonic reading plus the offset.
        assert_eq!(
            BootInstant::from_nanos(2 * FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced again"
        );
    }

    /// A task that wakes itself thousands of times within one poll still runs to completion.
    #[test]
    fn many_wakeups() {
        let mut executor = LocalExecutorBuilder::new().build();
        executor.run_singlethreaded(multi_wake(4096 * 2));
    }

    /// Exercises `TestExecutor::advance_to` with a one-shot timer created from
    /// `timer_duration` running alongside a one-second interval timer.
    fn advance_to_with(timer_duration: impl WakeupTime) {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut fut = pin!(async {
            let timer_fired = Arc::new(AtomicBool::new(false));
            futures::join!(
                async {
                    Timer::new(timer_duration).await;
                    // Record that the one-shot timer has fired.
                    timer_fired.store(true, Ordering::SeqCst);
                },
                async {
                    let mut fired = 0;
                    // Count interval ticks; stop after the third.
                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
                    while interval.next().await.is_some() {
                        fired += 1;
                        if fired == 3 {
                            break;
                        }
                    }
                    assert_eq!(fired, 3, "interval timer should have fired multiple times.");
                },
                async {
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;
                    // Only 500ms have elapsed so far.
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;

                    assert!(
                        timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer should have fired"
                    );

                    // Advance another two seconds so the interval can reach its third tick.
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_seconds(2),
                    ))
                    .await;
                }
            )
        });
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }

    /// `advance_to` with a one-shot timer specified as a monotonic duration.
    #[test]
    fn test_advance_to() {
        advance_to_with(zx::MonotonicDuration::from_seconds(1));
    }

    /// `advance_to` with a one-shot timer specified as a boot duration.
    #[test]
    fn test_advance_to_boot() {
        advance_to_with(zx::BootDuration::from_seconds(1));
    }
}