fuchsia_async/runtime/fuchsia/executor/
local.rs1use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, MAIN_TASK_ID, TaskHandle};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use crate::runtime::instrument::TaskInstrument;
12use futures::future::{self, Either};
13use futures::task::AtomicWaker;
14use std::fmt;
15use std::future::{Future, poll_fn};
16use std::pin::pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
19use std::task::{Context, Poll};
20
21pub struct LocalExecutor {
31 pub(crate) ehandle: EHandle,
34 }
36
37impl fmt::Debug for LocalExecutor {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
40 }
41}
42
43impl Default for LocalExecutor {
44 fn default() -> Self {
46 Self::new_with_port(zx::Port::create(), None)
47 }
48}
49
50impl LocalExecutor {
51 pub(crate) fn new_with_port(
54 port: zx::Port,
55 instrument: Option<Arc<dyn TaskInstrument>>,
56 ) -> Self {
57 let inner = Arc::new(Executor::new_with_port(
58 ExecutorTime::RealTime,
59 true,
60 1,
61 port,
62 instrument,
63 ));
64 let root_scope = ScopeHandle::root(inner);
65 Executor::set_local(root_scope.clone());
66 Self { ehandle: EHandle { root_scope } }
67 }
68
69 pub fn port(&self) -> &zx::Port {
71 self.ehandle.port()
72 }
73
74 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
76 where
77 F: Future,
78 {
79 assert!(
80 self.ehandle.inner().is_real_time(),
81 "Error: called `run_singlethreaded` on an executor using fake time"
82 );
83
84 let Poll::Ready(result) = self.run::<false, _>(main_future) else {
85 unreachable!()
86 };
87 result
88 }
89
90 fn run<const UNTIL_STALLED: bool, Fut: Future>(
91 &mut self,
92 main_future: Fut,
93 ) -> Poll<Fut::Output> {
94 unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
98 unsafe { std::mem::transmute(obj) }
99 }
100
101 let scope = &self.ehandle.root_scope;
102 let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);
103
104 unsafe {
107 scope.insert_task(remove_lifetime(task), false);
108 }
109
110 struct DropMainTask<'a>(&'a EHandle);
111 impl Drop for DropMainTask<'_> {
112 fn drop(&mut self) {
113 unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
116 }
117 }
118 let _drop_main_task = DropMainTask(&self.ehandle);
119
120 self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();
121
122 unsafe {
125 self.ehandle.global_scope().poll_join_result(
126 MAIN_TASK_ID,
127 &mut Context::from_waker(&futures::task::noop_waker()),
128 )
129 }
130 }
131
132 #[doc(hidden)]
133 pub fn root_scope(&self) -> &ScopeHandle {
135 self.ehandle.global_scope()
136 }
137}
138
139impl Drop for LocalExecutor {
140 fn drop(&mut self) {
141 self.ehandle.inner().mark_done();
142 self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
143 }
144}
145
146#[derive(Default)]
148pub struct LocalExecutorBuilder {
149 port: Option<zx::Port>,
150 instrument: Option<Arc<dyn TaskInstrument>>,
151}
152
153impl LocalExecutorBuilder {
154 pub fn new() -> Self {
156 Self::default()
157 }
158
159 pub fn port(mut self, port: zx::Port) -> Self {
161 self.port = Some(port);
162 self
163 }
164
165 pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
167 self.instrument = instrument;
168 self
169 }
170
171 pub fn build(self) -> LocalExecutor {
173 match self.port {
174 Some(port) => LocalExecutor::new_with_port(port, self.instrument),
175 None => LocalExecutor::default(),
176 }
177 }
178}
179
180pub struct TestExecutor {
185 local: LocalExecutor,
187}
188
189impl Default for TestExecutor {
190 fn default() -> Self {
191 Self::new()
192 }
193}
194
195impl TestExecutor {
196 pub fn new() -> Self {
198 Self::builder().build()
199 }
200
201 pub fn new_with_fake_time() -> Self {
203 Self::builder().fake_time(true).build()
204 }
205
206 pub fn builder() -> TestExecutorBuilder {
208 TestExecutorBuilder::new()
209 }
210
211 pub fn port(&self) -> &zx::Port {
213 self.local.port()
214 }
215
216 pub fn now(&self) -> MonotonicInstant {
218 self.local.ehandle.inner().now()
219 }
220
221 pub fn boot_now(&self) -> BootInstant {
223 self.local.ehandle.inner().boot_now()
224 }
225
226 pub fn set_fake_time(&self, t: MonotonicInstant) {
232 self.local.ehandle.inner().set_fake_time(t)
233 }
234
235 pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
246 self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
247 }
248
249 pub fn global_scope(&self) -> &ScopeHandle {
251 self.local.root_scope()
252 }
253
254 pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
256 where
257 F: Future,
258 {
259 self.local.run_singlethreaded(main_future)
260 }
261
262 pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
273 where
274 F: Future + Unpin,
275 {
276 let mut main_future = pin!(main_future);
277
278 struct Cleanup(Arc<Executor>);
280 impl Drop for Cleanup {
281 fn drop(&mut self) {
282 *self.0.owner_data.lock() = None;
283 }
284 }
285 let _cleanup = Cleanup(self.local.ehandle.inner().clone());
286 *self.local.ehandle.inner().owner_data.lock() =
287 Some(Box::new(UntilStalledData { watcher: None }));
288
289 loop {
290 let result = self.local.run::<true, _>(main_future.as_mut());
291 if result.is_ready() {
292 return result;
293 }
294
295 if let Some(watcher) = with_data(|data| data.watcher.take()) {
297 watcher.waker.wake();
298 watcher.done.store(true, Ordering::Relaxed);
301 } else {
302 break;
303 }
304 }
305
306 Poll::Pending
307 }
308
309 pub fn wake_expired_timers(&mut self) -> bool {
315 self.local.ehandle.inner().monotonic_timers().wake_timers()
316 || self.local.ehandle.inner().boot_timers().wake_timers()
317 }
318
319 pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
332 self.local.ehandle.inner().monotonic_timers().wake_next_timer()
333 }
334
335 pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
338 self.local.ehandle.inner().boot_timers().wake_next_timer()
339 }
340
341 pub fn next_timer() -> Option<MonotonicInstant> {
343 EHandle::local().inner().monotonic_timers().next_timer()
344 }
345
346 pub fn next_boot_timer() -> Option<BootInstant> {
348 EHandle::local().inner().boot_timers().next_timer()
349 }
350
351 pub async fn advance_to(time: MonotonicInstant) {
360 let ehandle = EHandle::local();
361 loop {
362 let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
363 if let Some(next_timer) = Self::next_timer()
364 && next_timer <= time
365 {
366 ehandle.inner().set_fake_time(next_timer);
367 continue;
368 }
369 ehandle.inner().set_fake_time(time);
370 break;
371 }
372 }
373
374 pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
398 let watcher =
399 Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
400
401 assert!(
402 with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
403 "Error: Another task has called `poll_until_stalled`."
404 );
405
406 struct Watcher(Arc<StalledWatcher>);
407
408 impl Drop for Watcher {
410 fn drop(&mut self) {
411 if !self.0.done.swap(true, Ordering::Relaxed) {
412 with_data(|data| data.watcher = None);
413 }
414 }
415 }
416
417 let watcher = Watcher(watcher);
418
419 let poll_fn = poll_fn(|cx: &mut Context<'_>| {
420 if watcher.0.done.load(Ordering::Relaxed) {
421 Poll::Ready(())
422 } else {
423 watcher.0.waker.register(cx.waker());
424 Poll::Pending
425 }
426 });
427 match future::select(poll_fn, fut).await {
428 Either::Left(_) => Poll::Pending,
429 Either::Right((value, _)) => Poll::Ready(value),
430 }
431 }
432}
433
434#[derive(Default)]
436pub struct TestExecutorBuilder {
437 port: Option<zx::Port>,
438 fake_time: bool,
439 instrument: Option<Arc<dyn TaskInstrument>>,
440}
441
442impl TestExecutorBuilder {
443 pub fn new() -> Self {
445 Self::default()
446 }
447
448 pub fn port(mut self, port: zx::Port) -> Self {
450 self.port = Some(port);
451 self
452 }
453
454 pub fn fake_time(mut self, fake_time: bool) -> Self {
456 self.fake_time = fake_time;
457 self
458 }
459
460 pub fn instrument(mut self, instrument: Arc<dyn TaskInstrument>) -> Self {
462 self.instrument = Some(instrument);
463 self
464 }
465
466 pub fn build(self) -> TestExecutor {
468 let time = if self.fake_time {
469 ExecutorTime::FakeTime {
470 mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
471 mono_to_boot_offset_ns: AtomicI64::new(0),
472 }
473 } else {
474 ExecutorTime::RealTime
475 };
476 let port = self.port.unwrap_or_else(zx::Port::create);
477 let inner = Arc::new(Executor::new_with_port(
478 time,
479 true,
480 1,
481 port,
482 self.instrument,
483 ));
484 let root_scope = ScopeHandle::root(inner);
485 Executor::set_local(root_scope.clone());
486 let local = LocalExecutor { ehandle: EHandle { root_scope } };
487 TestExecutor { local }
488 }
489}
490
491struct StalledWatcher {
492 waker: AtomicWaker,
493 done: AtomicBool,
494}
495
496struct UntilStalledData {
497 watcher: Option<Arc<StalledWatcher>>,
498}
499
500fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
506 const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
507 with TestExecutor::run_until_stalled";
508 f(EHandle::local()
509 .inner()
510 .owner_data
511 .lock()
512 .as_mut()
513 .expect(MESSAGE)
514 .downcast_mut::<UntilStalledData>()
515 .expect(MESSAGE))
516}
517
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle::on_signals::OnSignalsFuture;
    use crate::{Interval, Timer, WakeupTime};
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::task::Waker;

    /// Spawns `future` as a detached task on the current executor.
    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
        crate::EHandle::local().spawn_detached(future);
    }

    /// A detached task that needs two polls advances exactly one step per wake-up.
    #[test]
    fn stepwise_two_steps() {
        let step = Rc::new(Cell::new(0));
        let saved_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));

        let task = {
            let step = step.clone();
            let saved_waker = saved_waker.clone();
            Box::new(future::poll_fn(move |cx: &mut Context<'_>| {
                saved_waker.borrow_mut().replace(cx.waker().clone());
                match step.get() {
                    0 => {
                        step.set(1);
                        Poll::Pending
                    }
                    1 => {
                        step.set(2);
                        Poll::Ready(())
                    }
                    _ => panic!("future called after done"),
                }
            }))
        };

        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.local.ehandle.spawn_local_detached(task);
        // Spawning on its own must not poll the task.
        assert_eq!(step.get(), 0);
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 1);

        // The second step only happens after an explicit wake-up.
        saved_waker.borrow_mut().take().unwrap().wake();
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 2);
    }

    /// A timer stays pending until fake time reaches its deadline.
    #[test]
    fn stepwise_timer() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        let start = MonotonicInstant::from_nanos(0);
        let deadline = MonotonicInstant::from_nanos(1000);

        executor.set_fake_time(start);
        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));

        let _ = executor.run_until_stalled(&mut timer);
        assert_eq!(MonotonicInstant::now(), start);

        executor.set_fake_time(deadline);
        assert_eq!(MonotonicInstant::now(), deadline);
        assert!(executor.run_until_stalled(&mut timer).is_ready());
    }

    /// A signal wait completes once the awaited signal is raised on the event.
    #[test]
    fn stepwise_event() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        let event = zx::Event::create();
        let mut signalled = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));

        let _ = executor.run_until_stalled(&mut signalled);

        event.signal(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
        assert_matches!(
            executor.run_until_stalled(&mut signalled),
            Poll::Ready(Ok(zx::Signals::USER_0))
        );
    }

    /// After a jump in fake time past both deadlines, the spawned task's earlier timer has
    /// completed by the time the main future is reported ready.
    #[test]
    fn run_until_stalled_preserves_order() {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        let spawned_done = Arc::new(AtomicBool::new(false));
        let spawned = Box::pin({
            let spawned_done = spawned_done.clone();
            async move {
                Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
                spawned_done.store(true, Ordering::SeqCst);
            }
        });
        let mut main_fut = pin!(async {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
        });
        spawn(spawned);
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);

        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
        assert!(spawned_done.load(Ordering::SeqCst));
    }

    /// Pending tasks are dropped when the executor is dropped, and a task spawned from
    /// such a drop is itself dropped without ever being polled.
    #[test]
    fn task_destruction() {
        struct DropSpawner {
            dropped: Arc<AtomicBool>,
        }
        impl Drop for DropSpawner {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::SeqCst);
                let flag = self.dropped.clone();
                spawn(async move {
                    // Keep the clone alive inside the task so a leaked task is detectable.
                    let _flag = flag;
                    panic!("task spawned in drop shouldn't be polled");
                });
            }
        }

        let mut was_dropped = Arc::new(AtomicBool::new(false));
        let drop_spawner = DropSpawner { dropped: was_dropped.clone() };
        let mut executor = TestExecutorBuilder::new().build();
        let mut main_fut = pin!(async move {
            spawn(async move {
                // The never-finishing task owns the drop spawner.
                let _drop_spawner = drop_spawner;
                future::pending::<()>().await;
            });
        });
        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
        assert!(
            !was_dropped.load(Ordering::SeqCst),
            "executor dropped pending task before destruction"
        );

        drop(executor);
        // `get_mut` only succeeds if every other clone of the flag is gone.
        let flag = Arc::get_mut(&mut was_dropped)
            .expect("someone else is unexpectedly still holding on to a reference");
        assert!(
            flag.load(Ordering::SeqCst),
            "executor did not drop pending task during destruction"
        );
    }

    /// With a real-time executor, `MonotonicInstant::now` lies between two kernel readings.
    #[test]
    fn time_now_real_time() {
        let _executor = LocalExecutorBuilder::new().build();
        let zx_now = || zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));

        let before = zx_now();
        let observed = MonotonicInstant::now().into_zx();
        let after = zx_now();

        assert!(before <= observed);
        assert!(observed <= after);
    }

    /// With fake time, `MonotonicInstant::now` returns exactly what was set.
    #[test]
    fn time_now_fake_time() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();
        for nanos in [0, 1000] {
            let instant = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(nanos));
            executor.set_fake_time(instant);
            assert_eq!(MonotonicInstant::now(), instant);
        }
    }

    /// Shared body of the fake boot-time tests: boot time equals fake monotonic time until
    /// an offset is set, and is shifted by that offset afterwards.
    fn check_fake_boot_time() {
        const TEST_BOOT_OFFSET: i64 = 42;

        let executor = TestExecutorBuilder::new().fake_time(true).build();
        let first = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        let second = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));

        for instant in [first, second] {
            executor.set_fake_time(instant);
            assert_eq!(MonotonicInstant::now(), instant);
            assert_eq!(BootInstant::now().into_nanos(), instant.into_nanos());
        }

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos() + TEST_BOOT_OFFSET);
    }

    #[test]
    fn time_now_fake_time_boot() {
        check_fake_boot_time();
    }

    #[test]
    fn time_boot_now() {
        check_fake_boot_time();
    }

    /// `MonotonicInstant::after` saturates at both ends of the timeline.
    #[test]
    fn time_after_overflow() {
        let executor = TestExecutorBuilder::new().fake_time(true).build();

        let near_max = MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100);
        executor.set_fake_time(near_max);
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
            MonotonicInstant::INFINITE
        );

        let near_min = MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100);
        executor.set_fake_time(near_min);
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
            MonotonicInstant::INFINITE_PAST
        );
    }

    /// Future that wakes itself `n - 1` times during its first poll and completes on the
    /// second.
    async fn multi_wake(n: usize) {
        let mut polled_once = false;
        futures::future::poll_fn(|cx| {
            if polled_once {
                Poll::Ready(())
            } else {
                (1..n).for_each(|_| cx.waker().wake_by_ref());
                polled_once = true;
                Poll::Pending
            }
        })
        .await;
    }

    /// Boot time follows fake monotonic time and is shifted by the configured offset.
    #[test]
    fn test_boot_time_tracks_mono_time() {
        const FAKE_TIME: i64 = 42;
        let executor = TestExecutorBuilder::new().fake_time(true).build();

        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced"
        );

        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(2 * FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced again"
        );
    }

    /// A task that wakes itself thousands of times within one poll still completes.
    #[test]
    fn many_wakeups() {
        let mut executor = LocalExecutorBuilder::new().build();
        executor.run_singlethreaded(multi_wake(4096 * 2));
    }

    /// Drives a one-shot timer (armed with `timer_duration`), a one-second interval and a
    /// sequence of `advance_to` calls concurrently, and checks when the one-shot timer
    /// fires.
    fn advance_to_with(timer_duration: impl WakeupTime) {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut scenario = pin!(async {
            let oneshot_fired = Arc::new(AtomicBool::new(false));
            futures::join!(
                // One-shot timer under test.
                async {
                    Timer::new(timer_duration).await;
                    oneshot_fired.store(true, Ordering::SeqCst);
                },
                // Repeating timer that has to fire three times along the way.
                async {
                    let mut fired = 0;
                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
                    while fired < 3 && interval.next().await.is_some() {
                        fired += 1;
                    }
                    assert_eq!(fired, 3, "interval timer should have fired multiple times.");
                },
                // Driver that moves fake time forward in three steps.
                async {
                    assert!(
                        !oneshot_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;
                    assert!(
                        !oneshot_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;

                    assert!(
                        oneshot_fired.load(Ordering::SeqCst),
                        "the oneshot timer should have fired"
                    );

                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_seconds(2),
                    ))
                    .await;
                }
            )
        });
        assert!(executor.run_until_stalled(&mut scenario).is_ready());
    }

    #[test]
    fn test_advance_to() {
        advance_to_with(zx::MonotonicDuration::from_seconds(1));
    }

    #[test]
    fn test_advance_to_boot() {
        advance_to_with(zx::BootDuration::from_seconds(1));
    }
}