fuchsia_async/runtime/fuchsia/executor/
local.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle, MAIN_TASK_ID};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use futures::future::{self, Either};
12use futures::task::AtomicWaker;
13use std::fmt;
14use std::future::{poll_fn, Future};
15use std::pin::pin;
16use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20/// A single-threaded port-based executor for Fuchsia.
21///
22/// Having a `LocalExecutor` in scope allows the creation and polling of zircon objects, such as
23/// [`fuchsia_async::Channel`].
24///
25/// # Panics
26///
27/// `LocalExecutor` will panic on drop if any zircon objects attached to it are still alive. In
28/// other words, zircon objects backed by a `LocalExecutor` must be dropped before it.
29pub struct LocalExecutor {
30    // LINT.IfChange
31    /// The inner executor state.
32    pub(crate) ehandle: EHandle,
33    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
34}
35
36impl fmt::Debug for LocalExecutor {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
39    }
40}
41
42impl Default for LocalExecutor {
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48impl LocalExecutor {
49    /// Create a new single-threaded executor running with actual time.
50    pub fn new() -> Self {
51        Self::new_with_port(zx::Port::create())
52    }
53
54    /// Create a new single-threaded executor running with actual time, with a port.
55    pub fn new_with_port(port: zx::Port) -> Self {
56        let inner = Arc::new(Executor::new_with_port(
57            ExecutorTime::RealTime,
58            /* is_local */ true,
59            /* num_threads */ 1,
60            port,
61        ));
62        let root_scope = ScopeHandle::root(inner);
63        Executor::set_local(root_scope.clone());
64        Self { ehandle: EHandle { root_scope } }
65    }
66
67    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
68    pub fn port(&self) -> &zx::Port {
69        self.ehandle.port()
70    }
71
72    /// Run a single future to completion on a single thread, also polling other active tasks.
73    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
74    where
75        F: Future,
76    {
77        assert!(
78            self.ehandle.inner().is_real_time(),
79            "Error: called `run_singlethreaded` on an executor using fake time"
80        );
81
82        let Poll::Ready(result) = self.run::</* UNTIL_STALLED: */ false, _>(main_future) else {
83            unreachable!()
84        };
85        result
86    }
87
88    fn run<const UNTIL_STALLED: bool, Fut: Future>(
89        &mut self,
90        main_future: Fut,
91    ) -> Poll<Fut::Output> {
92        /// # Safety
93        ///
94        /// See the comment below.
95        unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
96            std::mem::transmute(obj)
97        }
98
99        let scope = &self.ehandle.root_scope;
100        let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);
101
102        // SAFETY: Erasing the lifetime is safe because we make sure to drop the main task within
103        // the required lifetime.
104        unsafe {
105            scope.insert_task(remove_lifetime(task), false);
106        }
107
108        struct DropMainTask<'a>(&'a EHandle);
109        impl Drop for DropMainTask<'_> {
110            fn drop(&mut self) {
111                // SAFETY: drop_main_tasks requires that the executor isn't running
112                // i.e. worker_lifecycle isn't running, which will be the case when this runs.
113                unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
114            }
115        }
116        let _drop_main_task = DropMainTask(&self.ehandle);
117
118        self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();
119
120        // SAFETY: We spawned the task earlier, so `R` (the return type) will be the correct type
121        // here.
122        unsafe {
123            self.ehandle.global_scope().poll_join_result(
124                MAIN_TASK_ID,
125                &mut Context::from_waker(&futures::task::noop_waker()),
126            )
127        }
128    }
129
130    #[doc(hidden)]
131    /// Returns the root scope of the executor.
132    pub fn root_scope(&self) -> &ScopeHandle {
133        self.ehandle.global_scope()
134    }
135}
136
137impl Drop for LocalExecutor {
138    fn drop(&mut self) {
139        self.ehandle.inner().mark_done();
140        self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
141    }
142}
143
144/// A single-threaded executor for testing. Exposes additional APIs for manipulating executor state
145/// and validating behavior of executed tasks.
146///
147/// TODO(https://fxbug.dev/375631801): This is lack of BootInstant support.
148pub struct TestExecutor {
149    /// LocalExecutor used under the hood, since most of the logic is shared.
150    local: LocalExecutor,
151}
152
153impl Default for TestExecutor {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
159impl TestExecutor {
160    /// Create a new executor for testing.
161    pub fn new() -> Self {
162        Self { local: LocalExecutor::new() }
163    }
164
165    /// Create a new executor for testing from a port.
166    pub fn new_with_port(port: zx::Port) -> Self {
167        Self { local: LocalExecutor::new_with_port(port) }
168    }
169
170    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
171    pub fn port(&self) -> &zx::Port {
172        self.local.port()
173    }
174
175    /// Create a new single-threaded executor running with fake time.
176    pub fn new_with_fake_time() -> Self {
177        let inner = Arc::new(Executor::new(
178            ExecutorTime::FakeTime {
179                mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
180                mono_to_boot_offset_ns: AtomicI64::new(0),
181            },
182            /* is_local */ true,
183            /* num_threads */ 1,
184        ));
185        let root_scope = ScopeHandle::root(inner);
186        Executor::set_local(root_scope.clone());
187        Self { local: LocalExecutor { ehandle: EHandle { root_scope } } }
188    }
189
190    /// Return the current time according to the executor.
191    pub fn now(&self) -> MonotonicInstant {
192        self.local.ehandle.inner().now()
193    }
194
195    /// Return the current time on the boot timeline, according to the executor.
196    pub fn boot_now(&self) -> BootInstant {
197        self.local.ehandle.inner().boot_now()
198    }
199
200    /// Set the fake time to a given value.
201    ///
202    /// # Panics
203    ///
204    /// If the executor was not created with fake time.
205    pub fn set_fake_time(&self, t: MonotonicInstant) {
206        self.local.ehandle.inner().set_fake_time(t)
207    }
208
209    /// Set the offset between the reading of the monotonic and the boot
210    /// clocks.
211    ///
212    /// This is useful to test the situations in which the boot and monotonic
213    /// offsets diverge.  In realistic scenarios, the offset can only grow,
214    /// and testers should keep that in view when setting duration.
215    ///
216    /// # Panics
217    ///
218    /// If the executor was not created with fake time.
219    pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
220        self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
221    }
222
223    /// Get the global scope of the executor.
224    pub fn global_scope(&self) -> &ScopeHandle {
225        self.local.root_scope()
226    }
227
228    /// Run a single future to completion on a single thread, also polling other active tasks.
229    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
230    where
231        F: Future,
232    {
233        self.local.run_singlethreaded(main_future)
234    }
235
236    /// Poll the future. If it is not ready, dispatch available packets and possibly try
237    /// again. Timers will only fire if this executor uses fake time. Never blocks.
238    ///
239    /// This function is for testing. DO NOT use this function in tests or applications that
240    /// involve any interaction with other threads or processes, as those interactions
241    /// may become stalled waiting for signals from "the outside world" which is beyond
242    /// the knowledge of the executor.
243    ///
244    /// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
245    /// futures must first be pinned using the `pin!` macro.
246    pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
247    where
248        F: Future + Unpin,
249    {
250        let mut main_future = pin!(main_future);
251
252        // Set up an instance of UntilStalledData that works with `poll_until_stalled`.
253        struct Cleanup(Arc<Executor>);
254        impl Drop for Cleanup {
255            fn drop(&mut self) {
256                *self.0.owner_data.lock() = None;
257            }
258        }
259        let _cleanup = Cleanup(self.local.ehandle.inner().clone());
260        *self.local.ehandle.inner().owner_data.lock() =
261            Some(Box::new(UntilStalledData { watcher: None }));
262
263        loop {
264            let result = self.local.run::</* UNTIL_STALLED: */ true, _>(main_future.as_mut());
265            if result.is_ready() {
266                return result;
267            }
268
269            // If a waker was set by `poll_until_stalled`, disarm, wake, and loop.
270            if let Some(watcher) = with_data(|data| data.watcher.take()) {
271                watcher.waker.wake();
272                // Relaxed ordering is fine here because this atomic is only ever access from the
273                // main thread.
274                watcher.done.store(true, Ordering::Relaxed);
275            } else {
276                break;
277            }
278        }
279
280        Poll::Pending
281    }
282
283    /// Wake all tasks waiting for expired timers, and return `true` if any task was woken.
284    ///
285    /// This is intended for use in test code in conjunction with fake time.
286    ///
287    /// The wake will have effect on both the monotonic and the boot timers.
288    pub fn wake_expired_timers(&mut self) -> bool {
289        self.local.ehandle.inner().monotonic_timers().wake_timers()
290            || self.local.ehandle.inner().boot_timers().wake_timers()
291    }
292
293    /// Wake up the next task waiting for a timer, if any, and return the time for which the
294    /// timer was scheduled.
295    ///
296    /// This is intended for use in test code in conjunction with `run_until_stalled`.
297    /// For example, here is how one could test that the Timer future fires after the given
298    /// timeout:
299    ///
300    ///     let deadline = zx::MonotonicDuration::from_seconds(5).after_now();
301    ///     let mut future = Timer::<Never>::new(deadline);
302    ///     assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
303    ///     assert_eq!(Some(deadline), exec.wake_next_timer());
304    ///     assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
305    pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
306        self.local.ehandle.inner().monotonic_timers().wake_next_timer()
307    }
308
309    /// Similar to [wake_next_timer], but operates on the timers on the boot
310    /// timeline.
311    pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
312        self.local.ehandle.inner().boot_timers().wake_next_timer()
313    }
314
315    /// Returns the deadline for the next timer due to expire.
316    pub fn next_timer() -> Option<MonotonicInstant> {
317        EHandle::local().inner().monotonic_timers().next_timer()
318    }
319
320    /// Returns the deadline for the next boot timeline timer due to expire.
321    pub fn next_boot_timer() -> Option<BootInstant> {
322        EHandle::local().inner().boot_timers().next_timer()
323    }
324
325    /// Advances fake time to the specified time.  This will only work if the executor is being run
326    /// via `TestExecutor::run_until_stalled` and can only be called by one task at a time.  This
327    /// will make sure that repeating timers fire as expected.
328    ///
329    /// # Panics
330    ///
331    /// Panics if the executor was not created with fake time, and for the same reasons
332    /// `poll_until_stalled` can below.
333    pub async fn advance_to(time: MonotonicInstant) {
334        let ehandle = EHandle::local();
335        loop {
336            let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
337            if let Some(next_timer) = Self::next_timer() {
338                if next_timer <= time {
339                    ehandle.inner().set_fake_time(next_timer);
340                    continue;
341                }
342            }
343            ehandle.inner().set_fake_time(time);
344            break;
345        }
346    }
347
348    /// Runs the future until it is ready or the executor is stalled. Returns the state of the
349    /// future.
350    ///
351    /// This will only work if the executor is being run via `TestExecutor::run_until_stalled` and
352    /// can only be called by one task at a time.
353    ///
354    /// This can be used in tests to assert that a future should be pending:
355    /// ```
356    /// assert!(
357    ///     TestExecutor::poll_until_stalled(my_fut).await.is_pending(),
358    ///     "my_fut should not be ready!"
359    /// );
360    /// ```
361    ///
362    /// If you just want to know when the executor is stalled, you can do:
363    /// ```
364    /// let _: Poll<()> = TestExecutor::poll_until_stalled(future::pending::<()>()).await;
365    /// ```
366    ///
367    /// # Panics
368    ///
369    /// Panics if another task is currently trying to use `run_until_stalled`, or the executor is
370    /// not using `TestExecutor::run_until_stalled`.
371    pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
372        let watcher =
373            Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
374
375        assert!(
376            with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
377            "Error: Another task has called `poll_until_stalled`."
378        );
379
380        struct Watcher(Arc<StalledWatcher>);
381
382        // Make sure we clean up if we're dropped.
383        impl Drop for Watcher {
384            fn drop(&mut self) {
385                if !self.0.done.swap(true, Ordering::Relaxed) {
386                    with_data(|data| data.watcher = None);
387                }
388            }
389        }
390
391        let watcher = Watcher(watcher);
392
393        let poll_fn = poll_fn(|cx: &mut Context<'_>| {
394            if watcher.0.done.load(Ordering::Relaxed) {
395                Poll::Ready(())
396            } else {
397                watcher.0.waker.register(cx.waker());
398                Poll::Pending
399            }
400        });
401        match future::select(poll_fn, fut).await {
402            Either::Left(_) => Poll::Pending,
403            Either::Right((value, _)) => Poll::Ready(value),
404        }
405    }
406}
407
408struct StalledWatcher {
409    waker: AtomicWaker,
410    done: AtomicBool,
411}
412
413struct UntilStalledData {
414    watcher: Option<Arc<StalledWatcher>>,
415}
416
417/// Calls `f` with `&mut UntilStalledData` that is stored in `owner_data`.
418///
419/// # Panics
420///
421/// Panics if `owner_data` isn't an instance of `UntilStalledData`.
422fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
423    const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
424                           with TestExecutor::run_until_stalled";
425    f(EHandle::local()
426        .inner()
427        .owner_data
428        .lock()
429        .as_mut()
430        .expect(MESSAGE)
431        .downcast_mut::<UntilStalledData>()
432        .expect(MESSAGE))
433}
434
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle::on_signals::OnSignalsFuture;
    use crate::{Interval, Timer, WakeupTime};
    use assert_matches::assert_matches;
    use futures::StreamExt;
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;
    use std::task::Waker;
    use zx::{self as zx, AsHandleRef};

    /// Spawns `future` as a detached task on the executor local to this thread.
    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
        let ehandle = crate::EHandle::local();
        ehandle.spawn_detached(future);
    }

    // A future that suspends once and completes after it has been resumed.
    #[test]
    fn stepwise_two_steps() {
        let step = Rc::new(Cell::new(0));
        let stored_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
        let fut = {
            let step = step.clone();
            let stored_waker = stored_waker.clone();
            Box::new(future::poll_fn(move |cx: &mut Context<'_>| {
                stored_waker.borrow_mut().replace(cx.waker().clone());
                match step.get() {
                    0 => {
                        step.set(1);
                        Poll::Pending
                    }
                    1 => {
                        step.set(2);
                        Poll::Ready(())
                    }
                    _ => panic!("future called after done"),
                }
            }))
        };
        let mut executor = TestExecutor::new_with_fake_time();
        // The future is spawned instead of being used as the main task: run_until_stalled wakes
        // the main future on every call, whereas here we want to wake it ourselves through the
        // stored waker.
        executor.local.ehandle.spawn_local_detached(fut);
        assert_eq!(step.get(), 0);
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 1);

        let waker = stored_waker.borrow_mut().take().unwrap();
        waker.wake();
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(step.get(), 2);
    }

    #[test]
    // A future that waits on a timer.
    fn stepwise_timer() {
        let mut executor = TestExecutor::new_with_fake_time();
        let start = MonotonicInstant::from_nanos(0);
        executor.set_fake_time(start);
        let deadline = MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000));
        let mut timer = pin!(Timer::new(deadline));

        let _ = executor.run_until_stalled(&mut timer);
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));

        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
        let outcome = executor.run_until_stalled(&mut timer);
        assert!(outcome.is_ready());
    }

    // A future that waits on an event.
    #[test]
    fn stepwise_event() {
        let mut executor = TestExecutor::new_with_fake_time();
        let event = zx::Event::create();
        let mut on_signal = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));

        let _ = executor.run_until_stalled(&mut on_signal);

        event.signal_handle(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
        assert_matches!(
            executor.run_until_stalled(&mut on_signal),
            Poll::Ready(Ok(zx::Signals::USER_0))
        );
    }

    // `run_until_stalled` must not change the order of events relative to normal execution.
    #[test]
    fn run_until_stalled_preserves_order() {
        let mut executor = TestExecutor::new_with_fake_time();
        let spawned_done = Arc::new(AtomicBool::new(false));
        let spawned_fut = {
            let spawned_done = spawned_done.clone();
            Box::pin(async move {
                Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
                spawned_done.store(true, Ordering::SeqCst);
            })
        };
        let mut main_fut = pin!(async {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
        });
        spawn(spawned_fut);
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
        // Expected order: first the timer in `spawned_fut`, then the one in `main_fut`.
        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
        assert!(spawned_done.load(Ordering::SeqCst));
    }

    #[test]
    fn task_destruction() {
        /// Records that it was dropped and tries to spawn a new task while being dropped.
        struct DropSpawner {
            dropped: Arc<AtomicBool>,
        }
        impl Drop for DropSpawner {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::SeqCst);
                let flag = self.dropped.clone();
                spawn(async {
                    // Keep a reference alive in here so we can verify it gets destroyed as well.
                    let _flag = flag;
                    panic!("task spawned in drop shouldn't be polled");
                });
            }
        }
        let mut dropped = Arc::new(AtomicBool::new(false));
        let drop_spawner = DropSpawner { dropped: dropped.clone() };
        let mut executor = TestExecutor::new();
        let mut main_fut = pin!(async move {
            spawn(async move {
                // This task now owns the drop spawner.
                let _owned_spawner = drop_spawner;
                future::pending::<()>().await;
            });
        });
        let outcome = executor.run_until_stalled(&mut main_fut);
        assert!(outcome.is_ready());
        assert!(
            !dropped.load(Ordering::SeqCst),
            "executor dropped pending task before destruction"
        );

        // Dropping the executor should drop the pending task together with the drop spawner it
        // owns, and should also gracefully drop the future spawned from the drop spawner.
        drop(executor);
        let dropped = Arc::get_mut(&mut dropped)
            .expect("someone else is unexpectedly still holding on to a reference");
        assert!(
            dropped.load(Ordering::SeqCst),
            "executor did not drop pending task during destruction"
        );
    }

    #[test]
    fn time_now_real_time() {
        let _executor = LocalExecutor::new();
        let before = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        let observed = MonotonicInstant::now().into_zx();
        let after = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
        assert!(before <= observed);
        assert!(observed <= after);
    }

    #[test]
    fn time_now_fake_time() {
        let executor = TestExecutor::new_with_fake_time();
        for nanos in [0, 1000] {
            let instant = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(nanos));
            executor.set_fake_time(instant);
            assert_eq!(MonotonicInstant::now(), instant);
        }
    }

    #[test]
    fn time_now_fake_time_boot() {
        let executor = TestExecutor::new_with_fake_time();
        let instants = [0, 1000]
            .map(|nanos| MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(nanos)));
        for instant in instants {
            executor.set_fake_time(instant);
            assert_eq!(MonotonicInstant::now(), instant);
            assert_eq!(BootInstant::now().into_nanos(), instant.into_nanos());
        }

        const TEST_BOOT_OFFSET: i64 = 42;
        let latest = instants[1];

        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
        assert_eq!(BootInstant::now().into_nanos(), latest.into_nanos() + TEST_BOOT_OFFSET);
    }

    #[test]
    fn time_boot_now() {
        const TEST_BOOT_OFFSET: i64 = 42;

        let executor = TestExecutor::new_with_fake_time();

        let first = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
        executor.set_fake_time(first);
        assert_eq!(MonotonicInstant::now(), first);
        assert_eq!(BootInstant::now().into_nanos(), first.into_nanos());

        let second = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
        executor.set_fake_time(second);
        assert_eq!(MonotonicInstant::now(), second);
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos());

        let offset = zx::BootDuration::from_nanos(TEST_BOOT_OFFSET);
        executor.set_fake_boot_to_mono_offset(offset);
        assert_eq!(BootInstant::now().into_nanos(), second.into_nanos() + TEST_BOOT_OFFSET);
    }

    #[test]
    fn time_after_overflow() {
        let executor = TestExecutor::new_with_fake_time();
        let margin = zx::MonotonicDuration::from_nanos(100);

        // Adding past the end of the timeline saturates at INFINITE.
        executor.set_fake_time(MonotonicInstant::INFINITE - margin);
        let far_future = MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200));
        assert_eq!(far_future, MonotonicInstant::INFINITE);

        // Subtracting past the start of the timeline saturates at INFINITE_PAST.
        executor.set_fake_time(
            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
        );
        let far_past = MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200));
        assert_eq!(far_past, MonotonicInstant::INFINITE_PAST);
    }

    // A future that wakes itself several times within the same cycle.
    async fn multi_wake(n: usize) {
        let mut already_polled = false;
        futures::future::poll_fn(|cx| {
            if already_polled {
                Poll::Ready(())
            } else {
                for _ in 1..n {
                    cx.waker().wake_by_ref()
                }
                already_polled = true;
                Poll::Pending
            }
        })
        .await;
    }

    #[test]
    fn test_boot_time_tracks_mono_time() {
        const FAKE_TIME: i64 = 42;
        let executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
        let expected_boot = BootInstant::from_nanos(FAKE_TIME);
        assert_eq!(expected_boot, executor.boot_now(), "boot time should have advanced");

        // Advance boot time while leaving mono time alone.
        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
        let expected_boot = BootInstant::from_nanos(2 * FAKE_TIME);
        assert_eq!(expected_boot, executor.boot_now(), "boot time should have advanced again");
    }

    // A large number of wakeups must not exhaust kernel resources such as the zx port queue
    // limit.
    #[test]
    fn many_wakeups() {
        let mut executor = LocalExecutor::new();
        let wake_count = 4096 * 2;
        executor.run_singlethreaded(multi_wake(wake_count));
    }

    /// Exercises `TestExecutor::advance_to` with a oneshot timer of `timer_duration` running
    /// next to a one-second interval timer.
    fn advance_to_with(timer_duration: impl WakeupTime) {
        let mut executor = TestExecutor::new_with_fake_time();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut fut = pin!(async {
            let oneshot_fired = Arc::new(AtomicBool::new(false));

            // Oneshot timer.
            let oneshot = async {
                Timer::new(timer_duration).await;
                oneshot_fired.store(true, Ordering::SeqCst);
            };

            // Interval timer, fires periodically.
            let periodic = async {
                let mut fired = 0;
                let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
                while fired < 3 && interval.next().await.is_some() {
                    fired += 1;
                }
                assert_eq!(fired, 3, "interval timer should have fired multiple times.");
            };

            // Drives the fake clock forward and checks the oneshot timer along the way.
            let driver = async {
                assert!(
                    !oneshot_fired.load(Ordering::SeqCst),
                    "the oneshot timer shouldn't be fired"
                );
                let half_second = MonotonicInstant::after(zx::MonotonicDuration::from_millis(500));
                TestExecutor::advance_to(half_second).await;
                // Timer still shouldn't be fired.
                assert!(
                    !oneshot_fired.load(Ordering::SeqCst),
                    "the oneshot timer shouldn't be fired"
                );
                let full_second = MonotonicInstant::after(zx::MonotonicDuration::from_millis(500));
                TestExecutor::advance_to(full_second).await;

                assert!(oneshot_fired.load(Ordering::SeqCst), "the oneshot timer should have fired");

                // The interval timer should have fired once.  Make it fire twice more.
                let two_more = MonotonicInstant::after(zx::MonotonicDuration::from_seconds(2));
                TestExecutor::advance_to(two_more).await;
            };

            futures::join!(oneshot, periodic, driver)
        });
        let outcome = executor.run_until_stalled(&mut fut);
        assert!(outcome.is_ready());
    }

    #[test]
    fn test_advance_to() {
        let one_second = zx::MonotonicDuration::from_seconds(1);
        advance_to_with(one_second);
    }

    #[test]
    fn test_advance_to_boot() {
        let one_second = zx::BootDuration::from_seconds(1);
        advance_to_with(one_second);
    }
}