fuchsia_async/runtime/fuchsia/executor/
local.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, MAIN_TASK_ID, TaskHandle};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use crate::runtime::instrument::TaskInstrument;
12use futures::future::{self, Either};
13use futures::task::AtomicWaker;
14use std::fmt;
15use std::future::{Future, poll_fn};
16use std::pin::pin;
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
19use std::task::{Context, Poll};
20
/// A single-threaded port-based executor for Fuchsia.
///
/// Having a `LocalExecutor` in scope allows the creation and polling of zircon objects, such as
/// [`fuchsia_async::Channel`].
///
/// Futures are driven to completion with [`LocalExecutor::run_singlethreaded`].
///
/// # Panics
///
/// `LocalExecutor` will panic on drop if any zircon objects attached to it are still alive. In
/// other words, zircon objects backed by a `LocalExecutor` must be dropped before it.
pub struct LocalExecutor {
    // LINT.IfChange
    /// The inner executor state.
    pub(crate) ehandle: EHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
36
37impl fmt::Debug for LocalExecutor {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
40    }
41}
42
43impl Default for LocalExecutor {
44    /// Create a new single-threaded executor running with actual time.
45    fn default() -> Self {
46        Self::new_with_port(zx::Port::create(), None)
47    }
48}
49
50impl LocalExecutor {
51    /// Create a new single-threaded executor running with actual time, with a port
52    /// and instrumentation.
53    pub(crate) fn new_with_port(
54        port: zx::Port,
55        instrument: Option<Arc<dyn TaskInstrument>>,
56    ) -> Self {
57        let inner = Arc::new(Executor::new_with_port(
58            ExecutorTime::RealTime,
59            /* is_local */ true,
60            /* num_threads */ 1,
61            port,
62            instrument,
63        ));
64        let root_scope = ScopeHandle::root(inner);
65        Executor::set_local(root_scope.clone());
66        Self { ehandle: EHandle { root_scope } }
67    }
68
69    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
70    pub fn port(&self) -> &zx::Port {
71        self.ehandle.port()
72    }
73
74    /// Run a single future to completion on a single thread, also polling other active tasks.
75    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
76    where
77        F: Future,
78    {
79        assert!(
80            self.ehandle.inner().is_real_time(),
81            "Error: called `run_singlethreaded` on an executor using fake time"
82        );
83
84        let Poll::Ready(result) = self.run::</* UNTIL_STALLED: */ false, _>(main_future) else {
85            unreachable!()
86        };
87        result
88    }
89
90    fn run<const UNTIL_STALLED: bool, Fut: Future>(
91        &mut self,
92        main_future: Fut,
93    ) -> Poll<Fut::Output> {
94        /// # Safety
95        ///
96        /// See the comment below.
97        unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
98            unsafe { std::mem::transmute(obj) }
99        }
100
101        let scope = &self.ehandle.root_scope;
102        let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);
103
104        // SAFETY: Erasing the lifetime is safe because we make sure to drop the main task within
105        // the required lifetime.
106        unsafe {
107            scope.insert_task(remove_lifetime(task), false);
108        }
109
110        struct DropMainTask<'a>(&'a EHandle);
111        impl Drop for DropMainTask<'_> {
112            fn drop(&mut self) {
113                // SAFETY: drop_main_tasks requires that the executor isn't running
114                // i.e. worker_lifecycle isn't running, which will be the case when this runs.
115                unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
116            }
117        }
118        let _drop_main_task = DropMainTask(&self.ehandle);
119
120        self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();
121
122        // SAFETY: We spawned the task earlier, so `R` (the return type) will be the correct type
123        // here.
124        unsafe {
125            self.ehandle.global_scope().poll_join_result(
126                MAIN_TASK_ID,
127                &mut Context::from_waker(&futures::task::noop_waker()),
128            )
129        }
130    }
131
132    #[doc(hidden)]
133    /// Returns the root scope of the executor.
134    pub fn root_scope(&self) -> &ScopeHandle {
135        self.ehandle.global_scope()
136    }
137}
138
139impl Drop for LocalExecutor {
140    fn drop(&mut self) {
141        self.ehandle.inner().mark_done();
142        self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
143    }
144}
145
146/// A builder for `LocalExecutor`.
147#[derive(Default)]
148pub struct LocalExecutorBuilder {
149    port: Option<zx::Port>,
150    instrument: Option<Arc<dyn TaskInstrument>>,
151}
152
153impl LocalExecutorBuilder {
154    /// Creates a new builder used for constructing a `LocalExecutor`.
155    pub fn new() -> Self {
156        Self::default()
157    }
158
159    /// Sets the port for the executor.
160    pub fn port(mut self, port: zx::Port) -> Self {
161        self.port = Some(port);
162        self
163    }
164
165    /// Sets the instrumentation hook.
166    pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
167        self.instrument = instrument;
168        self
169    }
170
171    /// Builds the `LocalExecutor`, consuming this `LocalExecutorBuilder`.
172    pub fn build(self) -> LocalExecutor {
173        match self.port {
174            Some(port) => LocalExecutor::new_with_port(port, self.instrument),
175            None => LocalExecutor::default(),
176        }
177    }
178}
179
/// A single-threaded executor for testing. Exposes additional APIs for manipulating executor state
/// and validating behavior of executed tasks.
///
/// TODO(https://fxbug.dev/375631801): This lacks BootInstant support.
pub struct TestExecutor {
    /// LocalExecutor used under the hood, since most of the logic is shared.
    local: LocalExecutor,
}
188
189impl Default for TestExecutor {
190    fn default() -> Self {
191        Self::new()
192    }
193}
194
195impl TestExecutor {
196    /// Create a new executor for testing.
197    pub fn new() -> Self {
198        Self::builder().build()
199    }
200
201    /// Create a new single-threaded executor running with fake time.
202    pub fn new_with_fake_time() -> Self {
203        Self::builder().fake_time(true).build()
204    }
205
206    /// Creates a new builder for a `TestExecutor`.
207    pub fn builder() -> TestExecutorBuilder {
208        TestExecutorBuilder::new()
209    }
210
211    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
212    pub fn port(&self) -> &zx::Port {
213        self.local.port()
214    }
215
216    /// Return the current time according to the executor.
217    pub fn now(&self) -> MonotonicInstant {
218        self.local.ehandle.inner().now()
219    }
220
221    /// Return the current time on the boot timeline, according to the executor.
222    pub fn boot_now(&self) -> BootInstant {
223        self.local.ehandle.inner().boot_now()
224    }
225
226    /// Set the fake time to a given value.
227    ///
228    /// # Panics
229    ///
230    /// If the executor was not created with fake time.
231    pub fn set_fake_time(&self, t: MonotonicInstant) {
232        self.local.ehandle.inner().set_fake_time(t)
233    }
234
235    /// Set the offset between the reading of the monotonic and the boot
236    /// clocks.
237    ///
238    /// This is useful to test the situations in which the boot and monotonic
239    /// offsets diverge.  In realistic scenarios, the offset can only grow,
240    /// and testers should keep that in view when setting duration.
241    ///
242    /// # Panics
243    ///
244    /// If the executor was not created with fake time.
245    pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
246        self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
247    }
248
249    /// Get the global scope of the executor.
250    pub fn global_scope(&self) -> &ScopeHandle {
251        self.local.root_scope()
252    }
253
254    /// Run a single future to completion on a single thread, also polling other active tasks.
255    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
256    where
257        F: Future,
258    {
259        self.local.run_singlethreaded(main_future)
260    }
261
262    /// Poll the future. If it is not ready, dispatch available packets and possibly try
263    /// again. Timers will only fire if this executor uses fake time. Never blocks.
264    ///
265    /// This function is for testing. DO NOT use this function in tests or applications that
266    /// involve any interaction with other threads or processes, as those interactions
267    /// may become stalled waiting for signals from "the outside world" which is beyond
268    /// the knowledge of the executor.
269    ///
270    /// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
271    /// futures must first be pinned using the `pin!` macro.
272    pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
273    where
274        F: Future + Unpin,
275    {
276        let mut main_future = pin!(main_future);
277
278        // Set up an instance of UntilStalledData that works with `poll_until_stalled`.
279        struct Cleanup(Arc<Executor>);
280        impl Drop for Cleanup {
281            fn drop(&mut self) {
282                *self.0.owner_data.lock() = None;
283            }
284        }
285        let _cleanup = Cleanup(self.local.ehandle.inner().clone());
286        *self.local.ehandle.inner().owner_data.lock() =
287            Some(Box::new(UntilStalledData { watcher: None }));
288
289        loop {
290            let result = self.local.run::</* UNTIL_STALLED: */ true, _>(main_future.as_mut());
291            if result.is_ready() {
292                return result;
293            }
294
295            // If a waker was set by `poll_until_stalled`, disarm, wake, and loop.
296            if let Some(watcher) = with_data(|data| data.watcher.take()) {
297                watcher.waker.wake();
298                // Relaxed ordering is fine here because this atomic is only ever access from the
299                // main thread.
300                watcher.done.store(true, Ordering::Relaxed);
301            } else {
302                break;
303            }
304        }
305
306        Poll::Pending
307    }
308
309    /// Wake all tasks waiting for expired timers, and return `true` if any task was woken.
310    ///
311    /// This is intended for use in test code in conjunction with fake time.
312    ///
313    /// The wake will have effect on both the monotonic and the boot timers.
314    pub fn wake_expired_timers(&mut self) -> bool {
315        self.local.ehandle.inner().monotonic_timers().wake_timers()
316            || self.local.ehandle.inner().boot_timers().wake_timers()
317    }
318
319    /// Wake up the next task waiting for a timer, if any, and return the time for which the
320    /// timer was scheduled.
321    ///
322    /// This is intended for use in test code in conjunction with `run_until_stalled`.
323    /// For example, here is how one could test that the Timer future fires after the given
324    /// timeout:
325    ///
326    ///     let deadline = zx::MonotonicDuration::from_seconds(5).after_now();
327    ///     let mut future = Timer::<Never>::new(deadline);
328    ///     assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
329    ///     assert_eq!(Some(deadline), exec.wake_next_timer());
330    ///     assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
331    pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
332        self.local.ehandle.inner().monotonic_timers().wake_next_timer()
333    }
334
335    /// Similar to [wake_next_timer], but operates on the timers on the boot
336    /// timeline.
337    pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
338        self.local.ehandle.inner().boot_timers().wake_next_timer()
339    }
340
341    /// Returns the deadline for the next timer due to expire.
342    pub fn next_timer() -> Option<MonotonicInstant> {
343        EHandle::local().inner().monotonic_timers().next_timer()
344    }
345
346    /// Returns the deadline for the next boot timeline timer due to expire.
347    pub fn next_boot_timer() -> Option<BootInstant> {
348        EHandle::local().inner().boot_timers().next_timer()
349    }
350
351    /// Advances fake time to the specified time.  This will only work if the executor is being run
352    /// via `TestExecutor::run_until_stalled` and can only be called by one task at a time.  This
353    /// will make sure that repeating timers fire as expected.
354    ///
355    /// # Panics
356    ///
357    /// Panics if the executor was not created with fake time, and for the same reasons
358    /// `poll_until_stalled` can below.
359    pub async fn advance_to(time: MonotonicInstant) {
360        let ehandle = EHandle::local();
361        loop {
362            let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
363            if let Some(next_timer) = Self::next_timer()
364                && next_timer <= time
365            {
366                ehandle.inner().set_fake_time(next_timer);
367                continue;
368            }
369            ehandle.inner().set_fake_time(time);
370            break;
371        }
372    }
373
374    /// Runs the future until it is ready or the executor is stalled. Returns the state of the
375    /// future.
376    ///
377    /// This will only work if the executor is being run via `TestExecutor::run_until_stalled` and
378    /// can only be called by one task at a time.
379    ///
380    /// This can be used in tests to assert that a future should be pending:
381    /// ```
382    /// assert!(
383    ///     TestExecutor::poll_until_stalled(my_fut).await.is_pending(),
384    ///     "my_fut should not be ready!"
385    /// );
386    /// ```
387    ///
388    /// If you just want to know when the executor is stalled, you can do:
389    /// ```
390    /// let _: Poll<()> = TestExecutor::poll_until_stalled(future::pending::<()>()).await;
391    /// ```
392    ///
393    /// # Panics
394    ///
395    /// Panics if another task is currently trying to use `run_until_stalled`, or the executor is
396    /// not using `TestExecutor::run_until_stalled`.
397    pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
398        let watcher =
399            Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });
400
401        assert!(
402            with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
403            "Error: Another task has called `poll_until_stalled`."
404        );
405
406        struct Watcher(Arc<StalledWatcher>);
407
408        // Make sure we clean up if we're dropped.
409        impl Drop for Watcher {
410            fn drop(&mut self) {
411                if !self.0.done.swap(true, Ordering::Relaxed) {
412                    with_data(|data| data.watcher = None);
413                }
414            }
415        }
416
417        let watcher = Watcher(watcher);
418
419        let poll_fn = poll_fn(|cx: &mut Context<'_>| {
420            if watcher.0.done.load(Ordering::Relaxed) {
421                Poll::Ready(())
422            } else {
423                watcher.0.waker.register(cx.waker());
424                Poll::Pending
425            }
426        });
427        match future::select(poll_fn, fut).await {
428            Either::Left(_) => Poll::Pending,
429            Either::Right((value, _)) => Poll::Ready(value),
430        }
431    }
432}
433
434/// A builder for `TestExecutor`.
435#[derive(Default)]
436pub struct TestExecutorBuilder {
437    port: Option<zx::Port>,
438    fake_time: bool,
439    instrument: Option<Arc<dyn TaskInstrument>>,
440}
441
442impl TestExecutorBuilder {
443    /// Creates a new builder used for constructing a `TestExecutor`.
444    pub fn new() -> Self {
445        Self::default()
446    }
447
448    /// Sets the port for the executor.
449    pub fn port(mut self, port: zx::Port) -> Self {
450        self.port = Some(port);
451        self
452    }
453
454    /// Sets whether the executor should use fake time.
455    pub fn fake_time(mut self, fake_time: bool) -> Self {
456        self.fake_time = fake_time;
457        self
458    }
459
460    /// Sets the task instrumentation.
461    pub fn instrument(mut self, instrument: Arc<dyn TaskInstrument>) -> Self {
462        self.instrument = Some(instrument);
463        self
464    }
465
466    /// Builds the `TestExecutor`, consuming this `TestExecutorBuilder`.
467    pub fn build(self) -> TestExecutor {
468        let time = if self.fake_time {
469            ExecutorTime::FakeTime {
470                mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
471                mono_to_boot_offset_ns: AtomicI64::new(0),
472            }
473        } else {
474            ExecutorTime::RealTime
475        };
476        let port = self.port.unwrap_or_else(zx::Port::create);
477        let inner = Arc::new(Executor::new_with_port(
478            time,
479            /* is_local */ true,
480            /* num_threads */ 1,
481            port,
482            self.instrument,
483        ));
484        let root_scope = ScopeHandle::root(inner);
485        Executor::set_local(root_scope.clone());
486        let local = LocalExecutor { ehandle: EHandle { root_scope } };
487        TestExecutor { local }
488    }
489}
490
/// State shared between a `TestExecutor::poll_until_stalled` call and
/// `TestExecutor::run_until_stalled`.
struct StalledWatcher {
    /// Waker registered by `poll_until_stalled`; woken by `run_until_stalled` when the executor
    /// has stalled.
    waker: AtomicWaker,
    /// Set once the watcher has fired or the `poll_until_stalled` call was dropped.
    done: AtomicBool,
}
495
/// Data that `TestExecutor::run_until_stalled` stores in the executor's `owner_data` for the
/// duration of the call.
struct UntilStalledData {
    /// The watcher registered by `TestExecutor::poll_until_stalled`, if any.
    watcher: Option<Arc<StalledWatcher>>,
}
499
500/// Calls `f` with `&mut UntilStalledData` that is stored in `owner_data`.
501///
502/// # Panics
503///
504/// Panics if `owner_data` isn't an instance of `UntilStalledData`.
505fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
506    const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
507                           with TestExecutor::run_until_stalled";
508    f(EHandle::local()
509        .inner()
510        .owner_data
511        .lock()
512        .as_mut()
513        .expect(MESSAGE)
514        .downcast_mut::<UntilStalledData>()
515        .expect(MESSAGE))
516}
517
518#[cfg(test)]
519mod tests {
520    use super::*;
521    use crate::handle::on_signals::OnSignalsFuture;
522    use crate::{Interval, Timer, WakeupTime};
523    use assert_matches::assert_matches;
524    use futures::StreamExt;
525    use std::cell::{Cell, RefCell};
526    use std::rc::Rc;
527    use std::task::Waker;
528
529    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
530        crate::EHandle::local().spawn_detached(future);
531    }
532
533    // Runs a future that suspends and returns after being resumed.
534    #[test]
535    fn stepwise_two_steps() {
536        let fut_step = Rc::new(Cell::new(0));
537        let fut_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
538        let fut_waker_clone = fut_waker.clone();
539        let fut_step_clone = fut_step.clone();
540        let fut_fn = move |cx: &mut Context<'_>| {
541            fut_waker_clone.borrow_mut().replace(cx.waker().clone());
542            match fut_step_clone.get() {
543                0 => {
544                    fut_step_clone.set(1);
545                    Poll::Pending
546                }
547                1 => {
548                    fut_step_clone.set(2);
549                    Poll::Ready(())
550                }
551                _ => panic!("future called after done"),
552            }
553        };
554        let fut = Box::new(future::poll_fn(fut_fn));
555        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
556        // Spawn the future rather than waking it the main task because run_until_stalled will wake
557        // the main future on every call, and we want to wake it ourselves using the waker.
558        executor.local.ehandle.spawn_local_detached(fut);
559        assert_eq!(fut_step.get(), 0);
560        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
561        assert_eq!(fut_step.get(), 1);
562
563        fut_waker.borrow_mut().take().unwrap().wake();
564        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
565        assert_eq!(fut_step.get(), 2);
566    }
567
568    #[test]
569    // Runs a future that waits on a timer.
570    fn stepwise_timer() {
571        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
572        executor.set_fake_time(MonotonicInstant::from_nanos(0));
573        let mut fut =
574            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));
575
576        let _ = executor.run_until_stalled(&mut fut);
577        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));
578
579        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
580        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
581        assert!(executor.run_until_stalled(&mut fut).is_ready());
582    }
583
584    // Runs a future that waits on an event.
585    #[test]
586    fn stepwise_event() {
587        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
588        let event = zx::Event::create();
589        let mut fut = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));
590
591        let _ = executor.run_until_stalled(&mut fut);
592
593        event.signal(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
594        assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(zx::Signals::USER_0)));
595    }
596
597    // Using `run_until_stalled` does not modify the order of events
598    // compared to normal execution.
599    #[test]
600    fn run_until_stalled_preserves_order() {
601        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
602        let spawned_fut_completed = Arc::new(AtomicBool::new(false));
603        let spawned_fut_completed_writer = spawned_fut_completed.clone();
604        let spawned_fut = Box::pin(async move {
605            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
606            spawned_fut_completed_writer.store(true, Ordering::SeqCst);
607        });
608        let mut main_fut = pin!(async {
609            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
610        });
611        spawn(spawned_fut);
612        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
613        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
614        // The timer in `spawned_fut` should fire first, then the
615        // timer in `main_fut`.
616        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
617        assert!(spawned_fut_completed.load(Ordering::SeqCst));
618    }
619
620    #[test]
621    fn task_destruction() {
622        struct DropSpawner {
623            dropped: Arc<AtomicBool>,
624        }
625        impl Drop for DropSpawner {
626            fn drop(&mut self) {
627                self.dropped.store(true, Ordering::SeqCst);
628                let dropped_clone = self.dropped.clone();
629                spawn(async {
630                    // Hold on to a reference here to verify that it, too, is destroyed later
631                    let _dropped_clone = dropped_clone;
632                    panic!("task spawned in drop shouldn't be polled");
633                });
634            }
635        }
636        let mut dropped = Arc::new(AtomicBool::new(false));
637        let drop_spawner = DropSpawner { dropped: dropped.clone() };
638        let mut executor = TestExecutorBuilder::new().build();
639        let mut main_fut = pin!(async move {
640            spawn(async move {
641                // Take ownership of the drop spawner
642                let _drop_spawner = drop_spawner;
643                future::pending::<()>().await;
644            });
645        });
646        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
647        assert!(
648            !dropped.load(Ordering::SeqCst),
649            "executor dropped pending task before destruction"
650        );
651
652        // Should drop the pending task and it's owned drop spawner,
653        // as well as gracefully drop the future spawned from the drop spawner.
654        drop(executor);
655        let dropped = Arc::get_mut(&mut dropped)
656            .expect("someone else is unexpectedly still holding on to a reference");
657        assert!(
658            dropped.load(Ordering::SeqCst),
659            "executor did not drop pending task during destruction"
660        );
661    }
662
663    #[test]
664    fn time_now_real_time() {
665        let _executor = LocalExecutorBuilder::new().build();
666        let t1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
667        let t2 = MonotonicInstant::now().into_zx();
668        let t3 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
669        assert!(t1 <= t2);
670        assert!(t2 <= t3);
671    }
672
673    #[test]
674    fn time_now_fake_time() {
675        let executor = TestExecutorBuilder::new().fake_time(true).build();
676        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
677        executor.set_fake_time(t1);
678        assert_eq!(MonotonicInstant::now(), t1);
679
680        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
681        executor.set_fake_time(t2);
682        assert_eq!(MonotonicInstant::now(), t2);
683    }
684
685    #[test]
686    fn time_now_fake_time_boot() {
687        let executor = TestExecutorBuilder::new().fake_time(true).build();
688        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
689        executor.set_fake_time(t1);
690        assert_eq!(MonotonicInstant::now(), t1);
691        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
692
693        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
694        executor.set_fake_time(t2);
695        assert_eq!(MonotonicInstant::now(), t2);
696        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
697
698        const TEST_BOOT_OFFSET: i64 = 42;
699
700        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
701        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
702    }
703
704    #[test]
705    fn time_boot_now() {
706        let executor = TestExecutorBuilder::new().fake_time(true).build();
707        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
708        executor.set_fake_time(t1);
709        assert_eq!(MonotonicInstant::now(), t1);
710        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
711
712        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
713        executor.set_fake_time(t2);
714        assert_eq!(MonotonicInstant::now(), t2);
715        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
716
717        const TEST_BOOT_OFFSET: i64 = 42;
718
719        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
720        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
721    }
722
723    #[test]
724    fn time_after_overflow() {
725        let executor = TestExecutorBuilder::new().fake_time(true).build();
726
727        executor.set_fake_time(MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100));
728        assert_eq!(
729            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
730            MonotonicInstant::INFINITE
731        );
732
733        executor.set_fake_time(
734            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
735        );
736        assert_eq!(
737            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
738            MonotonicInstant::INFINITE_PAST
739        );
740    }
741
742    // This future wakes itself up a number of times during the same cycle
743    async fn multi_wake(n: usize) {
744        let mut done = false;
745        futures::future::poll_fn(|cx| {
746            if done {
747                return Poll::Ready(());
748            }
749            for _ in 1..n {
750                cx.waker().wake_by_ref()
751            }
752            done = true;
753            Poll::Pending
754        })
755        .await;
756    }
757
758    #[test]
759    fn test_boot_time_tracks_mono_time() {
760        const FAKE_TIME: i64 = 42;
761        let executor = TestExecutorBuilder::new().fake_time(true).build();
762        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
763        assert_eq!(
764            BootInstant::from_nanos(FAKE_TIME),
765            executor.boot_now(),
766            "boot time should have advanced"
767        );
768
769        // Now advance boot without mono.
770        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
771        assert_eq!(
772            BootInstant::from_nanos(2 * FAKE_TIME),
773            executor.boot_now(),
774            "boot time should have advanced again"
775        );
776    }
777
778    // Ensure that a large amount of wakeups does not exhaust kernel resources,
779    // such as the zx port queue limit.
780    #[test]
781    fn many_wakeups() {
782        let mut executor = LocalExecutorBuilder::new().build();
783        executor.run_singlethreaded(multi_wake(4096 * 2));
784    }
785
786    fn advance_to_with(timer_duration: impl WakeupTime) {
787        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
788        executor.set_fake_time(MonotonicInstant::from_nanos(0));
789
790        let mut fut = pin!(async {
791            let timer_fired = Arc::new(AtomicBool::new(false));
792            futures::join!(
793                async {
794                    // Oneshot timer.
795                    Timer::new(timer_duration).await;
796                    timer_fired.store(true, Ordering::SeqCst);
797                },
798                async {
799                    // Interval timer, fires periodically.
800                    let mut fired = 0;
801                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
802                    while interval.next().await.is_some() {
803                        fired += 1;
804                        if fired == 3 {
805                            break;
806                        }
807                    }
808                    assert_eq!(fired, 3, "interval timer should have fired multiple times.");
809                },
810                async {
811                    assert!(
812                        !timer_fired.load(Ordering::SeqCst),
813                        "the oneshot timer shouldn't be fired"
814                    );
815                    TestExecutor::advance_to(MonotonicInstant::after(
816                        zx::MonotonicDuration::from_millis(500),
817                    ))
818                    .await;
819                    // Timer still shouldn't be fired.
820                    assert!(
821                        !timer_fired.load(Ordering::SeqCst),
822                        "the oneshot timer shouldn't be fired"
823                    );
824                    TestExecutor::advance_to(MonotonicInstant::after(
825                        zx::MonotonicDuration::from_millis(500),
826                    ))
827                    .await;
828
829                    assert!(
830                        timer_fired.load(Ordering::SeqCst),
831                        "the oneshot timer should have fired"
832                    );
833
834                    // The interval timer should have fired once.  Make it fire twice more.
835                    TestExecutor::advance_to(MonotonicInstant::after(
836                        zx::MonotonicDuration::from_seconds(2),
837                    ))
838                    .await;
839                }
840            )
841        });
842        assert!(executor.run_until_stalled(&mut fut).is_ready());
843    }
844
845    #[test]
846    fn test_advance_to() {
847        advance_to_with(zx::MonotonicDuration::from_seconds(1));
848    }
849
850    #[test]
851    fn test_advance_to_boot() {
852        advance_to_with(zx::BootDuration::from_seconds(1));
853    }
854}