fuchsia_async/runtime/fuchsia/executor/
local.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle, MAIN_TASK_ID};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use futures::future::{self, Either};
12use futures::task::AtomicWaker;
13use std::fmt;
14use std::future::{poll_fn, Future};
15use std::pin::pin;
16use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20/// A single-threaded port-based executor for Fuchsia.
21///
22/// Having a `LocalExecutor` in scope allows the creation and polling of zircon objects, such as
23/// [`fuchsia_async::Channel`].
24///
25/// # Panics
26///
27/// `LocalExecutor` will panic on drop if any zircon objects attached to it are still alive. In
28/// other words, zircon objects backed by a `LocalExecutor` must be dropped before it.
29pub struct LocalExecutor {
30    // LINT.IfChange
31    /// The inner executor state.
32    pub(crate) ehandle: EHandle,
33    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
34}
35
36impl fmt::Debug for LocalExecutor {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
39    }
40}
41
42impl Default for LocalExecutor {
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48impl LocalExecutor {
49    /// Create a new single-threaded executor running with actual time.
50    pub fn new() -> Self {
51        let inner = Arc::new(Executor::new(
52            ExecutorTime::RealTime,
53            /* is_local */ true,
54            /* num_threads */ 1,
55        ));
56        let root_scope = ScopeHandle::root(inner);
57        Executor::set_local(root_scope.clone());
58        Self { ehandle: EHandle { root_scope } }
59    }
60
61    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
62    pub fn port(&self) -> &zx::Port {
63        self.ehandle.port()
64    }
65
66    /// Run a single future to completion on a single thread, also polling other active tasks.
67    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
68    where
69        F: Future,
70    {
71        assert!(
72            self.ehandle.inner().is_real_time(),
73            "Error: called `run_singlethreaded` on an executor using fake time"
74        );
75
76        let Poll::Ready(result) = self.run::</* UNTIL_STALLED: */ false, _>(main_future) else {
77            unreachable!()
78        };
79        result
80    }
81
82    fn run<const UNTIL_STALLED: bool, Fut: Future>(
83        &mut self,
84        main_future: Fut,
85    ) -> Poll<Fut::Output> {
86        /// # Safety
87        ///
88        /// See the comment below.
89        unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
90            std::mem::transmute(obj)
91        }
92
93        let scope = &self.ehandle.root_scope;
94        let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);
95
96        // SAFETY: Erasing the lifetime is safe because we make sure to drop the main task within
97        // the required lifetime.
98        unsafe {
99            scope.insert_task(remove_lifetime(task), false);
100        }
101
102        struct DropMainTask<'a>(&'a EHandle);
103        impl Drop for DropMainTask<'_> {
104            fn drop(&mut self) {
105                // SAFETY: drop_main_tasks requires that the executor isn't running
106                // i.e. worker_lifecycle isn't running, which will be the case when this runs.
107                unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
108            }
109        }
110        let _drop_main_task = DropMainTask(&self.ehandle);
111
112        self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();
113
114        // SAFETY: We spawned the task earlier, so `R` (the return type) will be the correct type
115        // here.
116        unsafe {
117            self.ehandle.global_scope().poll_join_result(
118                MAIN_TASK_ID,
119                &mut Context::from_waker(&futures::task::noop_waker()),
120            )
121        }
122    }
123
124    #[doc(hidden)]
125    /// Returns the root scope of the executor.
126    pub fn root_scope(&self) -> &ScopeHandle {
127        self.ehandle.global_scope()
128    }
129}
130
131impl Drop for LocalExecutor {
132    fn drop(&mut self) {
133        self.ehandle.inner().mark_done();
134        self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
135    }
136}
137
138/// A single-threaded executor for testing. Exposes additional APIs for manipulating executor state
139/// and validating behavior of executed tasks.
140///
141/// TODO(https://fxbug.dev/375631801): This is lack of BootInstant support.
142pub struct TestExecutor {
143    /// LocalExecutor used under the hood, since most of the logic is shared.
144    local: LocalExecutor,
145}
146
147impl Default for TestExecutor {
148    fn default() -> Self {
149        Self::new()
150    }
151}
152
153impl TestExecutor {
154    /// Create a new executor for testing.
155    pub fn new() -> Self {
156        Self { local: LocalExecutor::new() }
157    }
158
159    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
160    pub fn port(&self) -> &zx::Port {
161        self.local.port()
162    }
163
164    /// Create a new single-threaded executor running with fake time.
165    pub fn new_with_fake_time() -> Self {
166        let inner = Arc::new(Executor::new(
167            ExecutorTime::FakeTime {
168                mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
169                mono_to_boot_offset_ns: AtomicI64::new(0),
170            },
171            /* is_local */ true,
172            /* num_threads */ 1,
173        ));
174        let root_scope = ScopeHandle::root(inner);
175        Executor::set_local(root_scope.clone());
176        Self { local: LocalExecutor { ehandle: EHandle { root_scope } } }
177    }
178
179    /// Return the current time according to the executor.
180    pub fn now(&self) -> MonotonicInstant {
181        self.local.ehandle.inner().now()
182    }
183
184    /// Return the current time on the boot timeline, according to the executor.
185    pub fn boot_now(&self) -> BootInstant {
186        self.local.ehandle.inner().boot_now()
187    }
188
189    /// Set the fake time to a given value.
190    ///
191    /// # Panics
192    ///
193    /// If the executor was not created with fake time.
194    pub fn set_fake_time(&self, t: MonotonicInstant) {
195        self.local.ehandle.inner().set_fake_time(t)
196    }
197
198    /// Set the offset between the reading of the monotonic and the boot
199    /// clocks.
200    ///
201    /// This is useful to test the situations in which the boot and monotonic
202    /// offsets diverge.  In realistic scenarios, the offset can only grow,
203    /// and testers should keep that in view when setting duration.
204    ///
205    /// # Panics
206    ///
207    /// If the executor was not created with fake time.
208    pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
209        self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
210    }
211
212    /// Get the global scope of the executor.
213    pub fn global_scope(&self) -> &ScopeHandle {
214        self.local.root_scope()
215    }
216
217    /// Run a single future to completion on a single thread, also polling other active tasks.
218    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
219    where
220        F: Future,
221    {
222        self.local.run_singlethreaded(main_future)
223    }
224
225    /// Poll the future. If it is not ready, dispatch available packets and possibly try
226    /// again. Timers will only fire if this executor uses fake time. Never blocks.
227    ///
228    /// This function is for testing. DO NOT use this function in tests or applications that
229    /// involve any interaction with other threads or processes, as those interactions
230    /// may become stalled waiting for signals from "the outside world" which is beyond
231    /// the knowledge of the executor.
232    ///
233    /// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
234    /// futures must first be pinned using the `pin!` macro.
235    pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
236    where
237        F: Future + Unpin,
238    {
239        let mut main_future = pin!(main_future);
240
241        // Set up an instance of UntilStalledData that works with `poll_until_stalled`.
242        struct Cleanup(Arc<Executor>);
243        impl Drop for Cleanup {
244            fn drop(&mut self) {
245                *self.0.owner_data.lock() = None;
246            }
247        }
248        let _cleanup = Cleanup(self.local.ehandle.inner().clone());
249        *self.local.ehandle.inner().owner_data.lock() =
250            Some(Box::new(UntilStalledData { watcher: None }));
251
252        loop {
253            let result = self.local.run::</* UNTIL_STALLED: */ true, _>(main_future.as_mut());
254            if result.is_ready() {
255                return result;
256            }
257
258            // If a waker was set by `poll_until_stalled`, disarm, wake, and loop.
259            if let Some(watcher) = with_data(|data| data.watcher.take()) {
260                watcher.waker.wake();
261                // Relaxed ordering is fine here because this atomic is only ever access from the
262                // main thread.
263                watcher.done.store(true, Ordering::Relaxed);
264            } else {
265                break;
266            }
267        }
268
269        Poll::Pending
270    }
271
272    /// Wake all tasks waiting for expired timers, and return `true` if any task was woken.
273    ///
274    /// This is intended for use in test code in conjunction with fake time.
275    ///
276    /// The wake will have effect on both the monotonic and the boot timers.
277    pub fn wake_expired_timers(&mut self) -> bool {
278        self.local.ehandle.inner().monotonic_timers().wake_timers()
279            || self.local.ehandle.inner().boot_timers().wake_timers()
280    }
281
282    /// Wake up the next task waiting for a timer, if any, and return the time for which the
283    /// timer was scheduled.
284    ///
285    /// This is intended for use in test code in conjunction with `run_until_stalled`.
286    /// For example, here is how one could test that the Timer future fires after the given
287    /// timeout:
288    ///
289    ///     let deadline = zx::MonotonicDuration::from_seconds(5).after_now();
290    ///     let mut future = Timer::<Never>::new(deadline);
291    ///     assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
292    ///     assert_eq!(Some(deadline), exec.wake_next_timer());
293    ///     assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
294    pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
295        self.local.ehandle.inner().monotonic_timers().wake_next_timer()
296    }
297
298    /// Similar to [wake_next_timer], but operates on the timers on the boot
299    /// timeline.
300    pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
301        self.local.ehandle.inner().boot_timers().wake_next_timer()
302    }
303
304    /// Returns the deadline for the next timer due to expire.
305    pub fn next_timer() -> Option<MonotonicInstant> {
306        EHandle::local().inner().monotonic_timers().next_timer()
307    }
308
309    /// Returns the deadline for the next boot timeline timer due to expire.
310    pub fn next_boot_timer() -> Option<BootInstant> {
311        EHandle::local().inner().boot_timers().next_timer()
312    }
313
314    /// Advances fake time to the specified time.  This will only work if the executor is being run
315    /// via `TestExecutor::run_until_stalled` and can only be called by one task at a time.  This
316    /// will make sure that repeating timers fire as expected.
317    ///
318    /// # Panics
319    ///
320    /// Panics if the executor was not created with fake time, and for the same reasons
321    /// `poll_until_stalled` can below.
322    pub async fn advance_to(time: MonotonicInstant) {
323        let ehandle = EHandle::local();
324        loop {
325            let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
326            if let Some(next_timer) = Self::next_timer() {
327                if next_timer <= time {
328                    ehandle.inner().set_fake_time(next_timer);
329                    continue;
330                }
331            }
332            ehandle.inner().set_fake_time(time);
333            break;
334        }
335    }
336
337    /// Runs the future until it is ready or the executor is stalled. Returns the state of the
338    /// future.
339    ///
340    /// This will only work if the executor is being run via `TestExecutor::run_until_stalled` and
341    /// can only be called by one task at a time.
342    ///
343    /// This can be used in tests to assert that a future should be pending:
344    /// ```
345    /// assert!(
346    ///     TestExecutor::poll_until_stalled(my_fut).await.is_pending(),
347    ///     "my_fut should not be ready!"
348    /// );
349    /// ```
350    ///
351    /// If you just want to know when the executor is stalled, you can do:
352    /// ```
353    /// let _: Poll<()> = TestExecutor::poll_until_stalled(future::pending::<()>()).await;
354    /// ```
355    ///
356    /// # Panics
357    ///
358    /// Panics if another task is currently trying to use `run_until_stalled`, or the executor is
359    /// not using `TestExecutor::run_until_stalled`.
360    pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
361        let watcher =
362            Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
363
364        assert!(
365            with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
366            "Error: Another task has called `poll_until_stalled`."
367        );
368
369        struct Watcher(Arc<StalledWatcher>);
370
371        // Make sure we clean up if we're dropped.
372        impl Drop for Watcher {
373            fn drop(&mut self) {
374                if !self.0.done.swap(true, Ordering::Relaxed) {
375                    with_data(|data| data.watcher = None);
376                }
377            }
378        }
379
380        let watcher = Watcher(watcher);
381
382        let poll_fn = poll_fn(|cx: &mut Context<'_>| {
383            if watcher.0.done.load(Ordering::Relaxed) {
384                Poll::Ready(())
385            } else {
386                watcher.0.waker.register(cx.waker());
387                Poll::Pending
388            }
389        });
390        match future::select(poll_fn, fut).await {
391            Either::Left(_) => Poll::Pending,
392            Either::Right((value, _)) => Poll::Ready(value),
393        }
394    }
395}
396
397struct StalledWatcher {
398    waker: AtomicWaker,
399    done: AtomicBool,
400}
401
402struct UntilStalledData {
403    watcher: Option<Arc<StalledWatcher>>,
404}
405
406/// Calls `f` with `&mut UntilStalledData` that is stored in `owner_data`.
407///
408/// # Panics
409///
410/// Panics if `owner_data` isn't an instance of `UntilStalledData`.
411fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
412    const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
413                           with TestExecutor::run_until_stalled";
414    f(EHandle::local()
415        .inner()
416        .owner_data
417        .lock()
418        .as_mut()
419        .expect(MESSAGE)
420        .downcast_mut::<UntilStalledData>()
421        .expect(MESSAGE))
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle::on_signals::OnSignals;
    use crate::{Interval, Timer, WakeupTime};
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::task::Waker;
    use zx::{self as zx, AsHandleRef};

    /// Spawns `future` as a detached task on the current thread's executor.
    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
        crate::EHandle::local().spawn_detached(future);
    }

    // Runs a future that suspends and returns after being resumed.
    #[test]
    fn stepwise_two_steps() {
        let fut_step = Rc::new(Cell::new(0));
        let fut_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
        let fut_waker_clone = fut_waker.clone();
        let fut_step_clone = fut_step.clone();
        let fut_fn = move |cx: &mut Context<'_>| {
            // Stash the waker so the test body can resume the future by hand.
            fut_waker_clone.borrow_mut().replace(cx.waker().clone());
            match fut_step_clone.get() {
                0 => {
                    fut_step_clone.set(1);
                    Poll::Pending
                }
                1 => {
                    fut_step_clone.set(2);
                    Poll::Ready(())
                }
                _ => panic!("future called after done"),
            }
        };
        let fut = Box::new(future::poll_fn(fut_fn));
        let mut executor = TestExecutor::new_with_fake_time();
        // Spawn the future rather than making it the main task because run_until_stalled will
        // wake the main future on every call, and we want to wake it ourselves using the waker.
        executor.local.ehandle.spawn_local_detached(fut);
        assert_eq!(fut_step.get(), 0);
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(fut_step.get(), 1);

        fut_waker.borrow_mut().take().unwrap().wake();
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(fut_step.get(), 2);
    }

    // Runs a future that waits on a timer.
    #[test]
    fn stepwise_timer() {
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));
        let mut fut =
            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));

        // Running the executor must not move fake time forward by itself.
        let _ = executor.run_until_stalled(&mut fut);
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));

        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }

    // Runs a future that waits on an event.
    #[test]
    fn stepwise_event() {
        let mut executor = TestExecutor::new_with_fake_time();
        let event = zx::Event::create();
        let mut fut = pin!(OnSignals::new(&event, zx::Signals::USER_0));

        let _ = executor.run_until_stalled(&mut fut);

        event.signal_handle(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
        assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(zx::Signals::USER_0)));
    }

    // Using `run_until_stalled` does not modify the order of events
    // compared to normal execution.
    #[test]
    fn run_until_stalled_preserves_order() {
        let mut executor = TestExecutor::new_with_fake_time();
        let spawned_fut_completed = Arc::new(AtomicBool::new(false));
        let spawned_fut_completed_writer = spawned_fut_completed.clone();
        let spawned_fut = Box::pin(async move {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
            spawned_fut_completed_writer.store(true, Ordering::SeqCst);
        });
        let mut main_fut = pin!(async {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
        });
        spawn(spawned_fut);
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
        // The timer in `spawned_fut` should fire first, then the
        // timer in `main_fut`.
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
        assert!(spawned_fut_completed.load(Ordering::SeqCst));
    }

    // Dropping the executor drops pending tasks, including tasks spawned while dropping them.
    #[test]
    fn task_destruction() {
        struct DropSpawner {
            dropped: Arc<AtomicBool>,
        }
        impl Drop for DropSpawner {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::SeqCst);
                let dropped_clone = self.dropped.clone();
                spawn(async {
                    // Hold on to a reference here to verify that it, too, is destroyed later
                    let _dropped_clone = dropped_clone;
                    panic!("task spawned in drop shouldn't be polled");
                });
            }
        }
        let mut dropped = Arc::new(AtomicBool::new(false));
        let drop_spawner = DropSpawner { dropped: dropped.clone() };
        let mut executor = TestExecutor::new();
        let mut main_fut = pin!(async move {
            spawn(async move {
                // Take ownership of the drop spawner
                let _drop_spawner = drop_spawner;
                future::pending::<()>().await;
            });
        });
        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
        assert!(
            !dropped.load(Ordering::SeqCst),
            "executor dropped pending task before destruction"
        );

        // Should drop the pending task and its owned drop spawner,
        // as well as gracefully drop the future spawned from the drop spawner.
        drop(executor);
        // `Arc::get_mut` only succeeds when no other reference is left.
        let dropped = Arc::get_mut(&mut dropped)
            .expect("someone else is unexpectedly still holding on to a reference");
        assert!(
            dropped.load(Ordering::SeqCst),
            "executor did not drop pending task during destruction"
        );
    }

    // With a real-time executor, `MonotonicInstant::now` is bracketed by two zircon readings.
    #[test]
    fn time_now_real_time() {
        let _executor = LocalExecutor::new();
        let t1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        let t2 = MonotonicInstant::now().into_zx();
        let t3 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        assert!(t1 <= t2);
        assert!(t2 <= t3);
    }

    // With a fake-time executor, `MonotonicInstant::now` reports whatever was set.
    #[test]
    fn time_now_fake_time() {
        let executor = TestExecutor::new_with_fake_time();
        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(t1);
        assert_eq!(MonotonicInstant::now(), t1);

        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(t2);
        assert_eq!(MonotonicInstant::now(), t2);
    }

    // With a fake-time executor, `BootInstant::now` follows the fake monotonic time plus the
    // configured offset.
    #[test]
    fn time_now_fake_time_boot() {
        let executor = TestExecutor::new_with_fake_time();
        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(t1);
        assert_eq!(MonotonicInstant::now(), t1);
        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());

        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(t2);
        assert_eq!(MonotonicInstant::now(), t2);
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());

        const TEST_BOOT_OFFSET: i64 = 42;

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
    }

    // Same scenario as `time_now_fake_time_boot`, but reading the boot time through
    // `TestExecutor::boot_now` rather than `BootInstant::now`.
    #[test]
    fn time_boot_now() {
        let executor = TestExecutor::new_with_fake_time();
        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(t1);
        assert_eq!(executor.now(), t1);
        assert_eq!(executor.boot_now().into_nanos(), t1.into_nanos());

        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(t2);
        assert_eq!(executor.now(), t2);
        assert_eq!(executor.boot_now().into_nanos(), t2.into_nanos());

        const TEST_BOOT_OFFSET: i64 = 42;

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(executor.boot_now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
    }

    // `MonotonicInstant::after` saturates at both ends of the timeline instead of overflowing.
    #[test]
    fn time_after_overflow() {
        let executor = TestExecutor::new_with_fake_time();

        executor.set_fake_time(MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100));
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
            MonotonicInstant::INFINITE
        );

        executor.set_fake_time(
            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
        );
        assert_eq!(
            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
            MonotonicInstant::INFINITE_PAST
        );
    }

    // This future wakes itself up a number of times (`n - 1`) during the same cycle: the first
    // poll issues the wake-ups and returns pending, the second poll completes.
    async fn multi_wake(n: usize) {
        let mut done = false;
        futures::future::poll_fn(|cx| {
            if done {
                return Poll::Ready(());
            }
            for _ in 1..n {
                cx.waker().wake_by_ref()
            }
            done = true;
            Poll::Pending
        })
        .await;
    }

    // The fake boot time follows the fake monotonic time, and can additionally be offset.
    #[test]
    fn test_boot_time_tracks_mono_time() {
        const FAKE_TIME: i64 = 42;
        let executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced"
        );

        // Now advance boot without mono.
        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
        assert_eq!(
            BootInstant::from_nanos(2 * FAKE_TIME),
            executor.boot_now(),
            "boot time should have advanced again"
        );
    }

    // Ensure that a large amount of wakeups does not exhaust kernel resources,
    // such as the zx port queue limit.
    #[test]
    fn many_wakeups() {
        let mut executor = LocalExecutor::new();
        executor.run_singlethreaded(multi_wake(4096 * 2));
    }

    /// Shared body of the `advance_to` tests. `timer_duration` is the deadline handed to the
    /// oneshot timer; the callers pass one second on the monotonic and on the boot timeline.
    fn advance_to_with(timer_duration: impl WakeupTime) {
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut fut = pin!(async {
            let timer_fired = Arc::new(AtomicBool::new(false));
            futures::join!(
                async {
                    // Oneshot timer.
                    Timer::new(timer_duration).await;
                    timer_fired.store(true, Ordering::SeqCst);
                },
                async {
                    // Interval timer, fires periodically.
                    let mut fired = 0;
                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
                    while interval.next().await.is_some() {
                        fired += 1;
                        if fired == 3 {
                            break;
                        }
                    }
                    assert_eq!(fired, 3, "interval timer should have fired multiple times.");
                },
                async {
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;
                    // Timer still shouldn't be fired.
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;

                    assert!(
                        timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer should have fired"
                    );

                    // The interval timer should have fired once.  Make it fire twice more.
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_seconds(2),
                    ))
                    .await;
                }
            )
        });
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }

    #[test]
    fn test_advance_to() {
        advance_to_with(zx::MonotonicDuration::from_seconds(1));
    }

    #[test]
    fn test_advance_to_boot() {
        advance_to_with(zx::BootDuration::from_seconds(1));
    }
}