fuchsia_async/runtime/fuchsia/executor/
local.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::atomic_future::AtomicFutureHandle;
6use super::common::{EHandle, Executor, ExecutorTime, TaskHandle, MAIN_TASK_ID};
7use super::scope::ScopeHandle;
8use super::time::{BootInstant, MonotonicInstant};
9use zx::BootDuration;
10
11use crate::runtime::instrument::TaskInstrument;
12use futures::future::{self, Either};
13use futures::task::AtomicWaker;
14use std::fmt;
15use std::future::{poll_fn, Future};
16use std::pin::pin;
17use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
/// A single-threaded port-based executor for Fuchsia.
///
/// Having a `LocalExecutor` in scope allows the creation and polling of zircon objects, such as
/// [`fuchsia_async::Channel`].
///
/// Futures are driven with [`LocalExecutor::run_singlethreaded`].
///
/// # Panics
///
/// `LocalExecutor` will panic on drop if any zircon objects attached to it are still alive. In
/// other words, zircon objects backed by a `LocalExecutor` must be dropped before it.
pub struct LocalExecutor {
    // NOTE: the LINT markers below pair this field with the referenced zxdb source file; keep
    // the two in sync when changing this struct.
    // LINT.IfChange
    /// The inner executor state.
    pub(crate) ehandle: EHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
36
37impl fmt::Debug for LocalExecutor {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.debug_struct("LocalExecutor").field("port", &self.ehandle.inner().port).finish()
40    }
41}
42
43impl Default for LocalExecutor {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49impl LocalExecutor {
50    /// Create a new single-threaded executor running with actual time.
51    pub fn new() -> Self {
52        Self::new_with_port(zx::Port::create(), None)
53    }
54
55    /// Create a new single-threaded executor running with actual time, with a port
56    /// and instrumentation.
57    pub(crate) fn new_with_port(
58        port: zx::Port,
59        instrument: Option<Arc<dyn TaskInstrument>>,
60    ) -> Self {
61        let inner = Arc::new(Executor::new_with_port(
62            ExecutorTime::RealTime,
63            /* is_local */ true,
64            /* num_threads */ 1,
65            port,
66            instrument,
67        ));
68        let root_scope = ScopeHandle::root(inner);
69        Executor::set_local(root_scope.clone());
70        Self { ehandle: EHandle { root_scope } }
71    }
72
73    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
74    pub fn port(&self) -> &zx::Port {
75        self.ehandle.port()
76    }
77
78    /// Run a single future to completion on a single thread, also polling other active tasks.
79    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
80    where
81        F: Future,
82    {
83        assert!(
84            self.ehandle.inner().is_real_time(),
85            "Error: called `run_singlethreaded` on an executor using fake time"
86        );
87
88        let Poll::Ready(result) = self.run::</* UNTIL_STALLED: */ false, _>(main_future) else {
89            unreachable!()
90        };
91        result
92    }
93
    /// Runs `main_future` as the executor's main task and returns the result of polling it.
    ///
    /// The future is inserted into the root scope under `MAIN_TASK_ID`, then
    /// `worker_lifecycle::<UNTIL_STALLED>()` is called, and finally the main task's join result is
    /// polled once with a no-op waker. A drop guard removes the main task again when this
    /// function exits.
    fn run<const UNTIL_STALLED: bool, Fut: Future>(
        &mut self,
        main_future: Fut,
    ) -> Poll<Fut::Output> {
        /// Erases the lifetime of `obj` so that it can be stored as a `TaskHandle`.
        ///
        /// # Safety
        ///
        /// See the `SAFETY` comment at the call site below.
        unsafe fn remove_lifetime(obj: AtomicFutureHandle<'_>) -> TaskHandle {
            std::mem::transmute(obj)
        }

        let scope = &self.ehandle.root_scope;
        let task = scope.new_local_task(Some(MAIN_TASK_ID), main_future);

        // SAFETY: Erasing the lifetime is safe because we make sure to drop the main task within
        // the required lifetime.
        unsafe {
            scope.insert_task(remove_lifetime(task), false);
        }

        // Drop guard: drops the main task when `run` exits, whichever way it exits.
        struct DropMainTask<'a>(&'a EHandle);
        impl Drop for DropMainTask<'_> {
            fn drop(&mut self) {
                // SAFETY: `drop_main_task` requires that the executor isn't running
                // i.e. worker_lifecycle isn't running, which will be the case when this runs.
                unsafe { self.0.inner().drop_main_task(&self.0.root_scope) };
            }
        }
        let _drop_main_task = DropMainTask(&self.ehandle);

        self.ehandle.inner().worker_lifecycle::<UNTIL_STALLED>();

        // SAFETY: We spawned the task earlier, so `R` (the return type) will be the correct type
        // here.
        unsafe {
            self.ehandle.global_scope().poll_join_result(
                MAIN_TASK_ID,
                &mut Context::from_waker(&futures::task::noop_waker()),
            )
        }
    }
135
136    #[doc(hidden)]
137    /// Returns the root scope of the executor.
138    pub fn root_scope(&self) -> &ScopeHandle {
139        self.ehandle.global_scope()
140    }
141}
142
143impl Drop for LocalExecutor {
144    fn drop(&mut self) {
145        self.ehandle.inner().mark_done();
146        self.ehandle.inner().on_parent_drop(&self.ehandle.root_scope);
147    }
148}
149
/// A builder for `LocalExecutor`.
///
/// Both settings are optional; `LocalExecutorBuilder::new().build()` yields an executor with a
/// freshly created port.
#[derive(Default)]
pub struct LocalExecutorBuilder {
    /// Port supplied through `port()`, if any.
    port: Option<zx::Port>,
    /// Task instrumentation hook supplied through `instrument()`, if any.
    instrument: Option<Arc<dyn TaskInstrument>>,
}
156
157impl LocalExecutorBuilder {
158    /// Creates a new builder used for constructing a `LocalExecutor`.
159    pub fn new() -> Self {
160        Self::default()
161    }
162
163    /// Sets the port for the executor.
164    pub fn port(mut self, port: zx::Port) -> Self {
165        self.port = Some(port);
166        self
167    }
168
169    /// Sets the instrumentation hook.
170    pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
171        self.instrument = instrument;
172        self
173    }
174
175    /// Builds the `LocalExecutor`, consuming this `LocalExecutorBuilder`.
176    pub fn build(self) -> LocalExecutor {
177        match self.port {
178            Some(port) => LocalExecutor::new_with_port(port, self.instrument),
179            None => LocalExecutor::new(),
180        }
181    }
182}
183
/// A single-threaded executor for testing. Exposes additional APIs for manipulating executor state
/// and validating behavior of executed tasks.
///
/// TODO(https://fxbug.dev/375631801): This lacks full `BootInstant` support.
pub struct TestExecutor {
    /// LocalExecutor used under the hood, since most of the logic is shared.
    local: LocalExecutor,
}
192
193impl Default for TestExecutor {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
199impl TestExecutor {
200    /// Create a new executor for testing.
201    pub fn new() -> Self {
202        Self::builder().build()
203    }
204
205    /// Create a new single-threaded executor running with fake time.
206    pub fn new_with_fake_time() -> Self {
207        Self::builder().fake_time(true).build()
208    }
209
210    /// Creates a new builder for a `TestExecutor`.
211    pub fn builder() -> TestExecutorBuilder {
212        TestExecutorBuilder::new()
213    }
214
215    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
216    pub fn port(&self) -> &zx::Port {
217        self.local.port()
218    }
219
220    /// Return the current time according to the executor.
221    pub fn now(&self) -> MonotonicInstant {
222        self.local.ehandle.inner().now()
223    }
224
225    /// Return the current time on the boot timeline, according to the executor.
226    pub fn boot_now(&self) -> BootInstant {
227        self.local.ehandle.inner().boot_now()
228    }
229
230    /// Set the fake time to a given value.
231    ///
232    /// # Panics
233    ///
234    /// If the executor was not created with fake time.
235    pub fn set_fake_time(&self, t: MonotonicInstant) {
236        self.local.ehandle.inner().set_fake_time(t)
237    }
238
239    /// Set the offset between the reading of the monotonic and the boot
240    /// clocks.
241    ///
242    /// This is useful to test the situations in which the boot and monotonic
243    /// offsets diverge.  In realistic scenarios, the offset can only grow,
244    /// and testers should keep that in view when setting duration.
245    ///
246    /// # Panics
247    ///
248    /// If the executor was not created with fake time.
249    pub fn set_fake_boot_to_mono_offset(&self, d: BootDuration) {
250        self.local.ehandle.inner().set_fake_boot_to_mono_offset(d)
251    }
252
253    /// Get the global scope of the executor.
254    pub fn global_scope(&self) -> &ScopeHandle {
255        self.local.root_scope()
256    }
257
258    /// Run a single future to completion on a single thread, also polling other active tasks.
259    pub fn run_singlethreaded<F>(&mut self, main_future: F) -> F::Output
260    where
261        F: Future,
262    {
263        self.local.run_singlethreaded(main_future)
264    }
265
    /// Poll the future. If it is not ready, dispatch available packets and possibly try
    /// again. Timers will only fire if this executor uses fake time. Never blocks.
    ///
    /// Returns `Poll::Ready` with the future's output once it completes, or `Poll::Pending` when
    /// the executor has stalled without the future completing.
    ///
    /// This function is for testing. DO NOT use this function in tests or applications that
    /// involve any interaction with other threads or processes, as those interactions
    /// may become stalled waiting for signals from "the outside world" which is beyond
    /// the knowledge of the executor.
    ///
    /// Unpin: this function requires all futures to be `Unpin`able, so any `!Unpin`
    /// futures must first be pinned using the `pin!` macro.
    pub fn run_until_stalled<F>(&mut self, main_future: &mut F) -> Poll<F::Output>
    where
        F: Future + Unpin,
    {
        let mut main_future = pin!(main_future);

        // Set up an instance of UntilStalledData that works with `poll_until_stalled`.
        // `Cleanup` resets `owner_data` to `None` when this function exits.
        struct Cleanup(Arc<Executor>);
        impl Drop for Cleanup {
            fn drop(&mut self) {
                *self.0.owner_data.lock() = None;
            }
        }
        let _cleanup = Cleanup(self.local.ehandle.inner().clone());
        *self.local.ehandle.inner().owner_data.lock() =
            Some(Box::new(UntilStalledData { watcher: None }));

        loop {
            let result = self.local.run::</* UNTIL_STALLED: */ true, _>(main_future.as_mut());
            if result.is_ready() {
                return result;
            }

            // If a waker was set by `poll_until_stalled`, disarm, wake, and loop.
            if let Some(watcher) = with_data(|data| data.watcher.take()) {
                watcher.waker.wake();
                // Relaxed ordering is fine here because this atomic is only ever accessed from
                // the main thread.
                watcher.done.store(true, Ordering::Relaxed);
            } else {
                // Nobody is waiting on a stall notification, so the executor is truly stalled.
                break;
            }
        }

        Poll::Pending
    }
312
313    /// Wake all tasks waiting for expired timers, and return `true` if any task was woken.
314    ///
315    /// This is intended for use in test code in conjunction with fake time.
316    ///
317    /// The wake will have effect on both the monotonic and the boot timers.
318    pub fn wake_expired_timers(&mut self) -> bool {
319        self.local.ehandle.inner().monotonic_timers().wake_timers()
320            || self.local.ehandle.inner().boot_timers().wake_timers()
321    }
322
323    /// Wake up the next task waiting for a timer, if any, and return the time for which the
324    /// timer was scheduled.
325    ///
326    /// This is intended for use in test code in conjunction with `run_until_stalled`.
327    /// For example, here is how one could test that the Timer future fires after the given
328    /// timeout:
329    ///
330    ///     let deadline = zx::MonotonicDuration::from_seconds(5).after_now();
331    ///     let mut future = Timer::<Never>::new(deadline);
332    ///     assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
333    ///     assert_eq!(Some(deadline), exec.wake_next_timer());
334    ///     assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
335    pub fn wake_next_timer(&mut self) -> Option<MonotonicInstant> {
336        self.local.ehandle.inner().monotonic_timers().wake_next_timer()
337    }
338
339    /// Similar to [wake_next_timer], but operates on the timers on the boot
340    /// timeline.
341    pub fn wake_next_boot_timer(&mut self) -> Option<BootInstant> {
342        self.local.ehandle.inner().boot_timers().wake_next_timer()
343    }
344
345    /// Returns the deadline for the next timer due to expire.
346    pub fn next_timer() -> Option<MonotonicInstant> {
347        EHandle::local().inner().monotonic_timers().next_timer()
348    }
349
350    /// Returns the deadline for the next boot timeline timer due to expire.
351    pub fn next_boot_timer() -> Option<BootInstant> {
352        EHandle::local().inner().boot_timers().next_timer()
353    }
354
355    /// Advances fake time to the specified time.  This will only work if the executor is being run
356    /// via `TestExecutor::run_until_stalled` and can only be called by one task at a time.  This
357    /// will make sure that repeating timers fire as expected.
358    ///
359    /// # Panics
360    ///
361    /// Panics if the executor was not created with fake time, and for the same reasons
362    /// `poll_until_stalled` can below.
363    pub async fn advance_to(time: MonotonicInstant) {
364        let ehandle = EHandle::local();
365        loop {
366            let _: Poll<_> = Self::poll_until_stalled(future::pending::<()>()).await;
367            if let Some(next_timer) = Self::next_timer() {
368                if next_timer <= time {
369                    ehandle.inner().set_fake_time(next_timer);
370                    continue;
371                }
372            }
373            ehandle.inner().set_fake_time(time);
374            break;
375        }
376    }
377
    /// Runs the future until it is ready or the executor is stalled. Returns the state of the
    /// future.
    ///
    /// This will only work if the executor is being run via `TestExecutor::run_until_stalled` and
    /// can only be called by one task at a time.
    ///
    /// This can be used in tests to assert that a future should be pending:
    /// ```
    /// assert!(
    ///     TestExecutor::poll_until_stalled(my_fut).await.is_pending(),
    ///     "my_fut should not be ready!"
    /// );
    /// ```
    ///
    /// If you just want to know when the executor is stalled, you can do:
    /// ```
    /// let _: Poll<()> = TestExecutor::poll_until_stalled(future::pending::<()>()).await;
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if another task is currently using `poll_until_stalled`, or the executor is
    /// not being run via `TestExecutor::run_until_stalled`.
    pub async fn poll_until_stalled<T>(fut: impl Future<Output = T> + Unpin) -> Poll<T> {
        let watcher =
            Arc::new(StalledWatcher { waker: AtomicWaker::new(), done: AtomicBool::new(false) });

        // Register the watcher; a previously registered one means another task got here first.
        assert!(
            with_data(|data| data.watcher.replace(watcher.clone())).is_none(),
            "Error: Another task has called `poll_until_stalled`."
        );

        struct Watcher(Arc<StalledWatcher>);

        // Make sure we clean up if we're dropped.
        impl Drop for Watcher {
            fn drop(&mut self) {
                // Only unregister if `run_until_stalled` has not already taken the watcher and
                // marked it done.
                if !self.0.done.swap(true, Ordering::Relaxed) {
                    with_data(|data| data.watcher = None);
                }
            }
        }

        let watcher = Watcher(watcher);

        // Resolves once `run_until_stalled` has flagged the watcher as done.
        let poll_fn = poll_fn(|cx: &mut Context<'_>| {
            if watcher.0.done.load(Ordering::Relaxed) {
                Poll::Ready(())
            } else {
                watcher.0.waker.register(cx.waker());
                Poll::Pending
            }
        });
        // Whichever finishes first wins: the stall notification or the caller's future.
        match future::select(poll_fn, fut).await {
            Either::Left(_) => Poll::Pending,
            Either::Right((value, _)) => Poll::Ready(value),
        }
    }
436}
437
/// A builder for `TestExecutor`.
#[derive(Default)]
pub struct TestExecutorBuilder {
    /// Port supplied through `port()`; when absent, `build` creates one.
    port: Option<zx::Port>,
    /// Whether the built executor runs with fake time instead of real time.
    fake_time: bool,
    /// Task instrumentation supplied through `instrument()`, if any.
    instrument: Option<Arc<dyn TaskInstrument>>,
}
445
446impl TestExecutorBuilder {
447    /// Creates a new builder used for constructing a `TestExecutor`.
448    pub fn new() -> Self {
449        Self::default()
450    }
451
452    /// Sets the port for the executor.
453    pub fn port(mut self, port: zx::Port) -> Self {
454        self.port = Some(port);
455        self
456    }
457
458    /// Sets whether the executor should use fake time.
459    pub fn fake_time(mut self, fake_time: bool) -> Self {
460        self.fake_time = fake_time;
461        self
462    }
463
464    /// Sets the task instrumentation.
465    pub fn instrument(mut self, instrument: Arc<dyn TaskInstrument>) -> Self {
466        self.instrument = Some(instrument);
467        self
468    }
469
470    /// Builds the `TestExecutor`, consuming this `TestExecutorBuilder`.
471    pub fn build(self) -> TestExecutor {
472        let time = if self.fake_time {
473            ExecutorTime::FakeTime {
474                mono_reading_ns: AtomicI64::new(zx::MonotonicInstant::INFINITE_PAST.into_nanos()),
475                mono_to_boot_offset_ns: AtomicI64::new(0),
476            }
477        } else {
478            ExecutorTime::RealTime
479        };
480        let port = self.port.unwrap_or_else(zx::Port::create);
481        let inner = Arc::new(Executor::new_with_port(
482            time,
483            /* is_local */ true,
484            /* num_threads */ 1,
485            port,
486            self.instrument,
487        ));
488        let root_scope = ScopeHandle::root(inner);
489        Executor::set_local(root_scope.clone());
490        let local = LocalExecutor { ehandle: EHandle { root_scope } };
491        TestExecutor { local }
492    }
493}
494
/// State shared between a task in `TestExecutor::poll_until_stalled` and
/// `TestExecutor::run_until_stalled`.
struct StalledWatcher {
    /// Waker registered by `poll_until_stalled` and woken by `run_until_stalled` on a stall.
    waker: AtomicWaker,
    /// Set once the stall has been reported, or once the watching task has been dropped.
    done: AtomicBool,
}
499
/// Data stored in the executor's `owner_data` for the duration of
/// `TestExecutor::run_until_stalled`.
struct UntilStalledData {
    /// The watcher registered by `poll_until_stalled`, if a task is currently waiting on one.
    watcher: Option<Arc<StalledWatcher>>,
}
503
504/// Calls `f` with `&mut UntilStalledData` that is stored in `owner_data`.
505///
506/// # Panics
507///
508/// Panics if `owner_data` isn't an instance of `UntilStalledData`.
509fn with_data<R>(f: impl Fn(&mut UntilStalledData) -> R) -> R {
510    const MESSAGE: &str = "poll_until_stalled only works if the executor is being run \
511                           with TestExecutor::run_until_stalled";
512    f(EHandle::local()
513        .inner()
514        .owner_data
515        .lock()
516        .as_mut()
517        .expect(MESSAGE)
518        .downcast_mut::<UntilStalledData>()
519        .expect(MESSAGE))
520}
521
522#[cfg(test)]
523mod tests {
524    use super::*;
525    use crate::handle::on_signals::OnSignalsFuture;
526    use crate::{Interval, Timer, WakeupTime};
527    use assert_matches::assert_matches;
528    use futures::StreamExt;
529    use std::cell::{Cell, RefCell};
530    use std::rc::Rc;
531    use std::task::Waker;
532    use zx::{self as zx, AsHandleRef};
533
534    fn spawn(future: impl Future<Output = ()> + Send + 'static) {
535        crate::EHandle::local().spawn_detached(future);
536    }
537
    // Runs a future that suspends and returns after being resumed.
    #[test]
    fn stepwise_two_steps() {
        // `fut_step` tracks how many times the future has been polled; `fut_waker` holds the
        // waker captured on the most recent poll so the test can wake the future by hand.
        let fut_step = Rc::new(Cell::new(0));
        let fut_waker: Rc<RefCell<Option<Waker>>> = Rc::new(RefCell::new(None));
        let fut_waker_clone = fut_waker.clone();
        let fut_step_clone = fut_step.clone();
        let fut_fn = move |cx: &mut Context<'_>| {
            fut_waker_clone.borrow_mut().replace(cx.waker().clone());
            match fut_step_clone.get() {
                0 => {
                    fut_step_clone.set(1);
                    Poll::Pending
                }
                1 => {
                    fut_step_clone.set(2);
                    Poll::Ready(())
                }
                _ => panic!("future called after done"),
            }
        };
        let fut = Box::new(future::poll_fn(fut_fn));
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        // Spawn the future rather than making it the main task because run_until_stalled will
        // wake the main future on every call, and we want to wake it ourselves using the waker.
        executor.local.ehandle.spawn_local_detached(fut);
        assert_eq!(fut_step.get(), 0);
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(fut_step.get(), 1);

        fut_waker.borrow_mut().take().unwrap().wake();
        assert_eq!(executor.run_until_stalled(&mut future::pending::<()>()), Poll::Pending);
        assert_eq!(fut_step.get(), 2);
    }
572
573    #[test]
574    // Runs a future that waits on a timer.
575    fn stepwise_timer() {
576        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
577        executor.set_fake_time(MonotonicInstant::from_nanos(0));
578        let mut fut =
579            pin!(Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_nanos(1000))));
580
581        let _ = executor.run_until_stalled(&mut fut);
582        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(0));
583
584        executor.set_fake_time(MonotonicInstant::from_nanos(1000));
585        assert_eq!(MonotonicInstant::now(), MonotonicInstant::from_nanos(1000));
586        assert!(executor.run_until_stalled(&mut fut).is_ready());
587    }
588
589    // Runs a future that waits on an event.
590    #[test]
591    fn stepwise_event() {
592        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
593        let event = zx::Event::create();
594        let mut fut = pin!(OnSignalsFuture::new(&event, zx::Signals::USER_0));
595
596        let _ = executor.run_until_stalled(&mut fut);
597
598        event.signal_handle(zx::Signals::NONE, zx::Signals::USER_0).unwrap();
599        assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(zx::Signals::USER_0)));
600    }
601
602    // Using `run_until_stalled` does not modify the order of events
603    // compared to normal execution.
604    #[test]
605    fn run_until_stalled_preserves_order() {
606        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
607        let spawned_fut_completed = Arc::new(AtomicBool::new(false));
608        let spawned_fut_completed_writer = spawned_fut_completed.clone();
609        let spawned_fut = Box::pin(async move {
610            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(5))).await;
611            spawned_fut_completed_writer.store(true, Ordering::SeqCst);
612        });
613        let mut main_fut = pin!(async {
614            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10))).await;
615        });
616        spawn(spawned_fut);
617        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Pending);
618        executor.set_fake_time(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(15)));
619        // The timer in `spawned_fut` should fire first, then the
620        // timer in `main_fut`.
621        assert_eq!(executor.run_until_stalled(&mut main_fut), Poll::Ready(()));
622        assert!(spawned_fut_completed.load(Ordering::SeqCst));
623    }
624
    // Verifies that dropping the executor drops pending tasks, and that a task spawned from a
    // destructor during that teardown is dropped without ever being polled.
    #[test]
    fn task_destruction() {
        struct DropSpawner {
            dropped: Arc<AtomicBool>,
        }
        impl Drop for DropSpawner {
            fn drop(&mut self) {
                self.dropped.store(true, Ordering::SeqCst);
                let dropped_clone = self.dropped.clone();
                spawn(async {
                    // Hold on to a reference here to verify that it, too, is destroyed later
                    let _dropped_clone = dropped_clone;
                    panic!("task spawned in drop shouldn't be polled");
                });
            }
        }
        let mut dropped = Arc::new(AtomicBool::new(false));
        let drop_spawner = DropSpawner { dropped: dropped.clone() };
        let mut executor = TestExecutorBuilder::new().build();
        let mut main_fut = pin!(async move {
            spawn(async move {
                // Take ownership of the drop spawner
                let _drop_spawner = drop_spawner;
                future::pending::<()>().await;
            });
        });
        assert!(executor.run_until_stalled(&mut main_fut).is_ready());
        assert!(
            !dropped.load(Ordering::SeqCst),
            "executor dropped pending task before destruction"
        );

        // Should drop the pending task and its owned drop spawner,
        // as well as gracefully drop the future spawned from the drop spawner.
        drop(executor);
        // `Arc::get_mut` only succeeds when no other strong reference is left.
        let dropped = Arc::get_mut(&mut dropped)
            .expect("someone else is unexpectedly still holding on to a reference");
        assert!(
            dropped.load(Ordering::SeqCst),
            "executor did not drop pending task during destruction"
        );
    }
667
668    #[test]
669    fn time_now_real_time() {
670        let _executor = LocalExecutorBuilder::new().build();
671        let t1 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
672        let t2 = MonotonicInstant::now().into_zx();
673        let t3 = zx::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(0));
674        assert!(t1 <= t2);
675        assert!(t2 <= t3);
676    }
677
678    #[test]
679    fn time_now_fake_time() {
680        let executor = TestExecutorBuilder::new().fake_time(true).build();
681        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
682        executor.set_fake_time(t1);
683        assert_eq!(MonotonicInstant::now(), t1);
684
685        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
686        executor.set_fake_time(t2);
687        assert_eq!(MonotonicInstant::now(), t2);
688    }
689
690    #[test]
691    fn time_now_fake_time_boot() {
692        let executor = TestExecutorBuilder::new().fake_time(true).build();
693        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
694        executor.set_fake_time(t1);
695        assert_eq!(MonotonicInstant::now(), t1);
696        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
697
698        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
699        executor.set_fake_time(t2);
700        assert_eq!(MonotonicInstant::now(), t2);
701        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
702
703        const TEST_BOOT_OFFSET: i64 = 42;
704
705        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
706        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
707    }
708
709    #[test]
710    fn time_boot_now() {
711        let executor = TestExecutorBuilder::new().fake_time(true).build();
712        let t1 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(0));
713        executor.set_fake_time(t1);
714        assert_eq!(MonotonicInstant::now(), t1);
715        assert_eq!(BootInstant::now().into_nanos(), t1.into_nanos());
716
717        let t2 = MonotonicInstant::from_zx(zx::MonotonicInstant::from_nanos(1000));
718        executor.set_fake_time(t2);
719        assert_eq!(MonotonicInstant::now(), t2);
720        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos());
721
722        const TEST_BOOT_OFFSET: i64 = 42;
723
724        executor.set_fake_boot_to_mono_offset(zx::BootDuration::from_nanos(TEST_BOOT_OFFSET));
725        assert_eq!(BootInstant::now().into_nanos(), t2.into_nanos() + TEST_BOOT_OFFSET);
726    }
727
728    #[test]
729    fn time_after_overflow() {
730        let executor = TestExecutorBuilder::new().fake_time(true).build();
731
732        executor.set_fake_time(MonotonicInstant::INFINITE - zx::MonotonicDuration::from_nanos(100));
733        assert_eq!(
734            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(200)),
735            MonotonicInstant::INFINITE
736        );
737
738        executor.set_fake_time(
739            MonotonicInstant::INFINITE_PAST + zx::MonotonicDuration::from_nanos(100),
740        );
741        assert_eq!(
742            MonotonicInstant::after(zx::MonotonicDuration::from_seconds(-200)),
743            MonotonicInstant::INFINITE_PAST
744        );
745    }
746
747    // This future wakes itself up a number of times during the same cycle
748    async fn multi_wake(n: usize) {
749        let mut done = false;
750        futures::future::poll_fn(|cx| {
751            if done {
752                return Poll::Ready(());
753            }
754            for _ in 1..n {
755                cx.waker().wake_by_ref()
756            }
757            done = true;
758            Poll::Pending
759        })
760        .await;
761    }
762
763    #[test]
764    fn test_boot_time_tracks_mono_time() {
765        const FAKE_TIME: i64 = 42;
766        let executor = TestExecutorBuilder::new().fake_time(true).build();
767        executor.set_fake_time(MonotonicInstant::from_nanos(FAKE_TIME));
768        assert_eq!(
769            BootInstant::from_nanos(FAKE_TIME),
770            executor.boot_now(),
771            "boot time should have advanced"
772        );
773
774        // Now advance boot without mono.
775        executor.set_fake_boot_to_mono_offset(BootDuration::from_nanos(FAKE_TIME));
776        assert_eq!(
777            BootInstant::from_nanos(2 * FAKE_TIME),
778            executor.boot_now(),
779            "boot time should have advanced again"
780        );
781    }
782
783    // Ensure that a large amount of wakeups does not exhaust kernel resources,
784    // such as the zx port queue limit.
785    #[test]
786    fn many_wakeups() {
787        let mut executor = LocalExecutorBuilder::new().build();
788        executor.run_singlethreaded(multi_wake(4096 * 2));
789    }
790
    // Shared body for the `advance_to` tests: runs a oneshot timer with the given duration, a
    // 1-second interval timer, and a driver task that advances fake time with
    // `TestExecutor::advance_to`, all joined into a single future.
    fn advance_to_with(timer_duration: impl WakeupTime) {
        let mut executor = TestExecutorBuilder::new().fake_time(true).build();
        executor.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut fut = pin!(async {
            let timer_fired = Arc::new(AtomicBool::new(false));
            futures::join!(
                async {
                    // Oneshot timer.
                    Timer::new(timer_duration).await;
                    timer_fired.store(true, Ordering::SeqCst);
                },
                async {
                    // Interval timer, fires periodically.
                    let mut fired = 0;
                    let mut interval = pin!(Interval::new(zx::MonotonicDuration::from_seconds(1)));
                    while interval.next().await.is_some() {
                        fired += 1;
                        if fired == 3 {
                            break;
                        }
                    }
                    assert_eq!(fired, 3, "interval timer should have fired multiple times.");
                },
                async {
                    // Driver: advances fake time in steps and checks the oneshot timer's state.
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;
                    // Timer still shouldn't be fired.
                    assert!(
                        !timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer shouldn't be fired"
                    );
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_millis(500),
                    ))
                    .await;

                    assert!(
                        timer_fired.load(Ordering::SeqCst),
                        "the oneshot timer should have fired"
                    );

                    // The interval timer should have fired once.  Make it fire twice more.
                    TestExecutor::advance_to(MonotonicInstant::after(
                        zx::MonotonicDuration::from_seconds(2),
                    ))
                    .await;
                }
            )
        });
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }
849
850    #[test]
851    fn test_advance_to() {
852        advance_to_with(zx::MonotonicDuration::from_seconds(1));
853    }
854
855    #[test]
856    fn test_advance_to_boot() {
857        advance_to_with(zx::BootDuration::from_seconds(1));
858    }
859}