fuchsia_async/runtime/fuchsia/timer.rs

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Support for creating futures that represent timers.
//!
//! This module contains the `Timer` type which is a future that will resolve
//! at a particular point in the future.

use crate::runtime::{BootInstant, EHandle, MonotonicInstant, WakeupTime};
use crate::PacketReceiver;
use fuchsia_sync::Mutex;

use futures::future::FusedFuture;
use futures::stream::FusedStream;
use futures::task::{AtomicWaker, Context};
use futures::{FutureExt, Stream};
use std::cell::UnsafeCell;
use std::fmt;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::task::{ready, Poll, Waker};
use zx::AsHandleRef as _;

pub trait TimeInterface:
    Clone + Copy + fmt::Debug + PartialEq + PartialOrd + Ord + Send + Sync + 'static
{
    type Timeline: zx::Timeline + Send + Sync + 'static;

    fn from_nanos(nanos: i64) -> Self;
    fn into_nanos(self) -> i64;
    fn zx_instant(nanos: i64) -> zx::Instant<Self::Timeline>;
    fn now() -> i64;
    fn create_timer() -> zx::Timer<Self::Timeline>;
}

impl TimeInterface for MonotonicInstant {
    type Timeline = zx::MonotonicTimeline;

    fn from_nanos(nanos: i64) -> Self {
        Self::from_nanos(nanos)
    }

    fn into_nanos(self) -> i64 {
        self.into_nanos()
    }

    fn zx_instant(nanos: i64) -> zx::MonotonicInstant {
        zx::MonotonicInstant::from_nanos(nanos)
    }

    fn now() -> i64 {
        EHandle::local().inner().now().into_nanos()
    }

    fn create_timer() -> zx::Timer<Self::Timeline> {
        zx::Timer::<Self::Timeline>::create()
    }
}

impl TimeInterface for BootInstant {
    type Timeline = zx::BootTimeline;

    fn from_nanos(nanos: i64) -> Self {
        Self::from_nanos(nanos)
    }

    fn into_nanos(self) -> i64 {
        self.into_nanos()
    }

    fn zx_instant(nanos: i64) -> zx::BootInstant {
        zx::BootInstant::from_nanos(nanos)
    }

    fn now() -> i64 {
        EHandle::local().inner().boot_now().into_nanos()
    }

    fn create_timer() -> zx::Timer<Self::Timeline> {
        zx::Timer::<Self::Timeline>::create()
    }
}

impl WakeupTime for std::time::Instant {
    fn into_timer(self) -> Timer {
        let now_as_instant = std::time::Instant::now();
        let now_as_time = MonotonicInstant::now();
        EHandle::local()
            .mono_timers()
            .new_timer(now_as_time + self.saturating_duration_since(now_as_instant).into())
    }
}
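
// Note: any `WakeupTime` can be passed to `Timer::new`, so the impl above lets
// a timer be built directly from a standard library instant. Illustrative
// sketch (assumes a running executor):
//
//     Timer::new(std::time::Instant::now() + std::time::Duration::from_millis(100)).await;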

impl WakeupTime for MonotonicInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().mono_timers().new_timer(self)
    }
}

impl WakeupTime for BootInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().boot_timers().new_timer(self)
    }
}

impl WakeupTime for zx::MonotonicInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().mono_timers().new_timer(self.into())
    }
}

impl WakeupTime for zx::BootInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().boot_timers().new_timer(self.into())
    }
}

/// An asynchronous timer.
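///
/// # Example
///
/// A minimal sketch of awaiting a timer (assumes a running fuchsia-async
/// executor):
///
/// ```ignore
/// // Resolves 100ms from now.
/// Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_millis(100))).await;
/// ```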
#[must_use = "futures do nothing unless polled"]
pub struct Timer(TimerState);

impl Timer {
    /// Create a new timer scheduled to fire at `time`.
    pub fn new(time: impl WakeupTime) -> Self {
        time.into_timer()
    }

    /// Reset the `Timer` to fire at a new time.
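    ///
    /// # Example
    ///
    /// A minimal sketch (assumes a running executor and some
    /// `zx::MonotonicDuration` value `dur`):
    ///
    /// ```ignore
    /// let mut timer = pin!(Timer::new(MonotonicInstant::after(dur)));
    /// // Move the deadline out; works whether or not the timer has already fired.
    /// timer.as_mut().reset(MonotonicInstant::after(dur));
    /// ```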
    pub fn reset(self: Pin<&mut Self>, time: MonotonicInstant) {
        let nanos = time.into_nanos();
        // If in the UNREGISTERED state, we can skip the call to `try_reset_timer` because the timer
        // has *never* been registered and so there's no danger of another thread having access to
        // this timer. In all other states, including the FIRED and TERMINATED states, we must use
        // `try_reset_timer` as that will take a lock and guarantee that other threads are not
        // concurrently accessing the timer.
        //
        // This can be Relaxed because there are no loads or stores that follow that could
        // possibly be reordered before here that matter: the first thing `try_reset_timer` does is
        // take a lock which will have its own memory barriers, and the store to the time is next
        // going to be read by this same task prior to taking the lock in `Timers::inner`.
        if self.0.state.load(Ordering::Relaxed) == UNREGISTERED
            || !self.0.timers.try_reset_timer(&self.0, nanos)
        {
            // SAFETY: This is safe because we know the timer isn't registered which means we truly
            // have exclusive access to TimerState.
            unsafe { *self.0.nanos.get() = nanos };
            self.0.state.store(UNREGISTERED, Ordering::Relaxed);
        }
    }
}

impl fmt::Debug for Timer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("Timer").field("time", &self.0.nanos).finish()
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        self.0.timers.unregister(&self.0);
    }
}

impl Future for Timer {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: We call `unregister` when `Timer` is dropped.
        unsafe { self.0.timers.poll(self.as_ref(), cx) }
    }
}

struct TimerState {
    timers: Arc<dyn TimersInterface>,

    // This is safe to access/mutate if the lock on `Timers::inner` is held.
    nanos: UnsafeCell<i64>,

    waker: AtomicWaker,
    state: AtomicU8,

    // Holds the index in the heap.  This is safe to access/mutate if the lock on `Timers::inner` is
    // held.
    index: UnsafeCell<HeapIndex>,

    // StateRef stores a pointer to `TimerState`, so this must be pinned.
    _pinned: PhantomPinned,
}

// SAFETY: TimerState is thread-safe.  See the safety comments elsewhere.
unsafe impl Send for TimerState {}
unsafe impl Sync for TimerState {}

// Set when the timer is not registered in the heap.
const UNREGISTERED: u8 = 0;

// Set when the timer is in the heap.
const REGISTERED: u8 = 1;

// Set when the timer has fired.
const FIRED: u8 = 2;

// Set when the timer is terminated.
const TERMINATED: u8 = 3;

/// An index in the heap.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct HeapIndex(usize);

impl HeapIndex {
    const NULL: HeapIndex = HeapIndex(usize::MAX);

    fn get(&self) -> Option<usize> {
        if *self == HeapIndex::NULL {
            None
        } else {
            Some(self.0)
        }
    }
}

impl From<usize> for HeapIndex {
    fn from(value: usize) -> Self {
        Self(value)
    }
}

impl FusedFuture for Timer {
    fn is_terminated(&self) -> bool {
        self.0.state.load(Ordering::Relaxed) == TERMINATED
    }
}

// A note on safety:
//
//  1. We remove the timer from the heap before we drop TimerState, and TimerState is pinned, so
//     it's safe to store pointers in the heap i.e. the pointers are live since we make sure we
//     remove them before dropping `TimerState`.
//
//  2. Provided we do #1, it is always safe to access the atomic fields of TimerState.
//
//  3. Once the timer has been registered, it is safe to access the non-atomic fields of TimerState
//     whilst holding the lock on `Timers::inner`.
#[derive(Copy, Clone, Debug)]
struct StateRef(*const TimerState);

// SAFETY: See the notes above regarding safety.
unsafe impl Send for StateRef {}
unsafe impl Sync for StateRef {}

impl StateRef {
    fn into_waker(self, _inner: &mut Inner) -> Option<Waker> {
        // SAFETY: `inner` is locked.
        unsafe {
            // As soon as we set the state to FIRED, the heap no longer owns the timer and it might
            // be re-registered.  This store is safe to be Relaxed because `AtomicWaker::take` has a
            // Release barrier, so the store can't be reordered after it, and therefore we can be
            // certain that another thread which re-registers the waker will see the state is FIRED
            // (and will interpret that as meaning that the task should not block and instead
            // immediately complete; see `Timers::poll`).
            (*self.0).state.store(FIRED, Ordering::Relaxed);
            (*self.0).waker.take()
        }
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn nanos(&self) -> i64 {
        *(*self.0).nanos.get()
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn nanos_mut(&mut self) -> &mut i64 {
        &mut *(*self.0).nanos.get()
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn set_index(&mut self, index: HeapIndex) -> HeapIndex {
        std::mem::replace(&mut *(*self.0).index.get(), index)
    }
}

/// An asynchronous interval timer.
///
/// This is a stream of events resolving at a rate of once per interval.  This generates an event
/// for *every* elapsed duration, even if multiple have elapsed since last polled.
///
/// TODO(https://fxbug.dev/375632319): This lacks BootInstant support.
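///
/// # Example
///
/// A minimal sketch (assumes a running executor):
///
/// ```ignore
/// use futures::StreamExt as _;
/// let mut ticks = Interval::new(zx::MonotonicDuration::from_seconds(1));
/// ticks.next().await; // Yields `Some(())` roughly once per second.
/// ```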
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Interval {
    timer: Pin<Box<Timer>>,
    next: MonotonicInstant,
    duration: zx::MonotonicDuration,
}

impl Interval {
    /// Create a new `Interval` which yields every `duration`.
    pub fn new(duration: zx::MonotonicDuration) -> Self {
        let next = MonotonicInstant::after(duration);
        Interval { timer: Box::pin(Timer::new(next)), next, duration }
    }
}

impl FusedStream for Interval {
    fn is_terminated(&self) -> bool {
        // `Interval` never yields `None`
        false
    }
}

impl Stream for Interval {
    type Item = ();
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        ready!(self.timer.poll_unpin(cx));
        let next = self.next + self.duration;
        self.timer.as_mut().reset(next);
        self.next = next;
        Poll::Ready(Some(()))
    }
}

pub(crate) struct Timers<T: TimeInterface> {
    inner: Mutex<Inner>,

    // We can't easily use ReceiverRegistration because there would be a circular dependency we'd
    // have to break: Executor -> Timers -> ReceiverRegistration -> EHandle -> ScopeRef ->
    // Executor, so instead we just store the port key and then change the executor to drop the
    // registration at the appropriate place.
    port_key: u64,

    fake: bool,

    timer: zx::Timer<T::Timeline>,
}

struct Inner {
    // The queue of active timer objects.
    timers: Heap,

    // The last deadline we set on the zircon timer, or None if the queue was empty and the timer
    // was canceled most recently.
    last_deadline: Option<i64>,

    // True if there's a pending async_wait.
    async_wait: bool,
}

impl<T: TimeInterface> Timers<T> {
    pub fn new(port_key: u64, fake: bool) -> Self {
        Self {
            inner: Mutex::new(Inner {
                timers: Heap::default(),
                last_deadline: None,
                async_wait: false,
            }),
            port_key,
            fake,
            timer: T::create_timer(),
        }
    }

    pub fn new_timer(self: &Arc<Self>, time: T) -> Timer {
        let nanos = time.into_nanos();
        Timer(TimerState {
            timers: self.clone(),
            nanos: UnsafeCell::new(nanos),
            waker: AtomicWaker::new(),
            state: AtomicU8::new(UNREGISTERED),
            index: UnsafeCell::new(HeapIndex::NULL),
            _pinned: PhantomPinned,
        })
    }

    /// Ensures the underlying Zircon timer has been correctly set or canceled after
    /// the queue has been updated.
    ///
    /// # Safety
    ///
    /// Callers must ensure that `self.inner` is locked before calling this method.
    fn setup_zircon_timer(&self, inner: &mut Inner, from_receive_packet: bool) {
        // Our new deadline is the deadline for the front of the queue, or no deadline (infinite) if
        // there is no front of the queue.
        let new_deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });

        // If the effective deadline of the queue has changed, reprogram the zircon timer's
        // deadline.
        if new_deadline != inner.last_deadline {
            inner.last_deadline = new_deadline;
            match inner.last_deadline {
                Some(deadline) => {
                    self.timer.set(T::zx_instant(deadline), zx::Duration::ZERO).unwrap()
                }
                None => self.timer.cancel().unwrap(),
            }
        }

        // If this is being called while processing the timer packet from a previous async wait,
        // then clear the async wait flag.  This is the very last thing we need to do, so this async
        // wait operation is effectively over.
        if from_receive_packet {
            inner.async_wait = false;
        }

        // If we have a valid timeout, but we have no in-flight async wait operation, post a new
        // one.
        if inner.last_deadline.is_some() && !inner.async_wait {
            if self.fake {
                // Clear the signal used for fake timers so that we can use it to trigger
                // next time.
                self.timer.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
            }

            self.timer
                .wait_async_handle(
                    EHandle::local().port(),
                    self.port_key,
                    if self.fake { zx::Signals::USER_0 } else { zx::Signals::TIMER_SIGNALED },
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();

            inner.async_wait = true;
        }
    }

    pub fn port_key(&self) -> u64 {
        self.port_key
    }

    /// Wakes timers that should be firing now.  Returns true if any timers were woken.
    pub fn wake_timers(&self) -> bool {
        self.wake_timers_impl(false)
    }

    fn wake_timers_impl(&self, from_receive_packet: bool) -> bool {
        let now = T::now();
        let mut timers_woken = false;

        loop {
            let waker = {
                let mut inner = self.inner.lock();

                // SAFETY: `inner` is locked.
                let deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
                if deadline.is_some_and(|d| d <= now) {
                    let timer = inner.timers.pop().unwrap();
                    timer.into_waker(&mut inner)
                } else {
                    // We are now finished (one way or the other). Set up the underlying Zircon
                    // timer to reflect the new state of the queue, and break out of the loop.
                    //
                    // When processing a timer packet (from_receive_packet is true), it is very
                    // important that this always be the last thing we do before breaking out of the
                    // loop (dropping the lock in the process) for good.
                    //
                    // Failing to do this at the end can lead to the timer queue stalling.  Doing it
                    // early (when we are dispatching expired timers) can lead to an ever
                    // multiplying army of posted async waits.
                    //
                    // See https://g-issues.fuchsia.dev/issues/396173066 for details.
                    self.setup_zircon_timer(&mut inner, from_receive_packet);
                    break;
                }
            };
            if let Some(waker) = waker {
                waker.wake()
            }
            timers_woken = true;
        }
        timers_woken
    }

    /// Wakes the next timer and returns its time.
    pub fn wake_next_timer(&self) -> Option<T> {
        let (nanos, waker) = {
            let mut inner = self.inner.lock();
            let timer = inner.timers.pop()?;
            // SAFETY: `inner` is locked.
            let nanos = unsafe { timer.nanos() };
            (nanos, timer.into_waker(&mut inner))
        };
        if let Some(waker) = waker {
            waker.wake();
        }
        Some(T::from_nanos(nanos))
    }

    /// Returns the next timer due to expire.
    pub fn next_timer(&self) -> Option<T> {
        // SAFETY: `inner` is locked.
        self.inner.lock().timers.peek().map(|state| T::from_nanos(unsafe { state.nanos() }))
    }

    /// If there's a timer ready, sends a notification to wake up the receiver.
    ///
    /// # Panics
    ///
    /// This will panic if we are not using fake time.
    pub fn maybe_notify(&self, now: T) {
        assert!(self.fake, "calling this function requires using fake time.");
        // SAFETY: `inner` is locked.
        if self
            .inner
            .lock()
            .timers
            .peek()
            .is_some_and(|state| unsafe { state.nanos() } <= now.into_nanos())
        {
            self.timer.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        }
    }
}

impl<T: TimeInterface> PacketReceiver for Timers<T> {
    fn receive_packet(&self, _packet: zx::Packet) {
        self.wake_timers_impl(true);
    }
}

// See comments on the implementation below.
trait TimersInterface: Send + Sync + 'static {
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()>;
    fn unregister(&self, state: &TimerState);
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool;
}

impl<T: TimeInterface> TimersInterface for Timers<T> {
    // # Safety
    //
    // `unregister` must be called before `Timer` is dropped.
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()> {
        // See https://docs.rs/futures/0.3.5/futures/task/struct.AtomicWaker.html
        // for more information.
        // Quick check to avoid registration if already done.
        //
        // This is safe to be Relaxed because `AtomicWaker::register` has barriers which means that
        // the load further down can't be moved before registering the waker, which means we can't
        // miss the timer firing.  If the timer isn't registered, the time might have been reset but
        // that would have been by the same task, so there should be no ordering issue there.  If we
        // then try and register the timer, we take the lock on `inner` so there will be barriers
        // there.
        let state = timer.0.state.load(Ordering::Relaxed);

        if state == TERMINATED {
            return Poll::Ready(());
        }

        if state == FIRED {
            timer.0.state.store(TERMINATED, Ordering::Relaxed);
            return Poll::Ready(());
        }

        if state == UNREGISTERED {
            // SAFETY: The state is UNREGISTERED, so we have exclusive access.
            let nanos = unsafe { *timer.0.nanos.get() };
            if nanos <= T::now() {
                timer.0.state.store(FIRED, Ordering::Relaxed);
                return Poll::Ready(());
            }
            let mut inner = self.inner.lock();

            // We store a pointer to `timer` here. This is safe to do because `timer` is pinned, and
            // we always make sure we call `unregister` before `timer` is dropped.
            inner.timers.push(StateRef(&timer.0));

            // Now that we have added a new timer to the queue, set up the
            // underlying zircon timer to reflect the new state of the queue.
            self.setup_zircon_timer(&mut inner, false);

            timer.0.state.store(REGISTERED, Ordering::Relaxed);
        }

        timer.0.waker.register(cx.waker());

        // Now that we've registered a waker, we need to check to see if the timer has been marked
        // as FIRED by another thread in the meantime (e.g. in StateRef::into_waker).  In that case
        // the timer is never going to fire again as it is no longer managed by the timer heap, so
        // the timer's task would become Pending but nothing would wake it up later.
        // Loading the state *must* happen after the above `AtomicWaker::register` (which
        // establishes an Acquire barrier, preventing the below load from being reordered above it),
        // or else we could racily hit the above scenario.
        let state = timer.0.state.load(Ordering::Relaxed);
        match state {
            FIRED => {
                timer.0.state.store(TERMINATED, Ordering::Relaxed);
                Poll::Ready(())
            }
            REGISTERED => Poll::Pending,
            // TERMINATED is only set in `poll` which has exclusive access to the task (&mut
            // Context).
            // UNREGISTERED would indicate a logic bug somewhere.
            _ => {
                unreachable!();
            }
        }
    }

    fn unregister(&self, timer: &TimerState) {
        if timer.state.load(Ordering::Relaxed) == UNREGISTERED {
            // If the timer was never registered, then we have exclusive access and we can skip the
            // rest of this (avoiding the lock on `inner`).
            // We cannot early-exit if the timer is FIRED or TERMINATED because then we could race
            // with another thread that is actively using the timer object, and if this call
            // completes before it blocks on `inner`, then the timer's resources could be
            // deallocated, which would result in a use-after-free on the other thread.
            return;
        }
        let mut inner = self.inner.lock();
        // SAFETY: `inner` is locked.
        let index = unsafe { *timer.index.get() };
        if let Some(index) = index.get() {
            inner.timers.remove(index);
            if index == 0 {
                // The front of the queue just changed.  Make sure to update the underlying zircon
                // timer state to match the new queue state.
                self.setup_zircon_timer(&mut inner, false);
            }
            timer.state.store(UNREGISTERED, Ordering::Relaxed);
        }
    }

    /// Returns true if the timer was successfully reset.
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool {
        let mut inner = self.inner.lock();
        // SAFETY: `inner` is locked.
        let index = unsafe { *timer.index.get() };
        if let Some(old_index) = index.get() {
            if (inner.timers.reset(old_index, nanos) == 0) || (old_index == 0) {
                // If the timer has moved into or out of the front queue position, update the
                // underlying zircon timer to reflect the new queue state.
                self.setup_zircon_timer(&mut inner, false);
            }
            timer.state.store(REGISTERED, Ordering::Relaxed);
            true
        } else {
            false
        }
    }
}

#[derive(Default)]
struct Heap(Vec<StateRef>);

// BinaryHeap doesn't support removal, and BTreeSet ends up increasing binary size significantly,
// so we roll our own binary heap.
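//
// Layout recap (standard array-backed binary min-heap; the earliest deadline
// is at index 0). For an element at index `i`:
//
//     parent(i) = (i - 1) / 2
//     left(i)   = 2 * i + 1
//     right(i)  = 2 * i + 2
//
// `fix_up` and `fix_down` below walk exactly these relationships.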
impl Heap {
    fn push(&mut self, mut timer: StateRef) {
        let index = self.0.len();
        self.0.push(timer);
        // SAFETY: `inner` is locked.
        unsafe {
            timer.set_index(index.into());
        }
        self.fix_up(index);
    }

    fn peek(&self) -> Option<&StateRef> {
        self.0.first()
    }

    fn pop(&mut self) -> Option<StateRef> {
        if let Some(&first) = self.0.first() {
            self.remove(0);
            Some(first)
        } else {
            None
        }
    }

    fn swap(&mut self, a: usize, b: usize) {
        self.0.swap(a, b);
        // SAFETY: `inner` is locked.
        unsafe {
            self.0[a].set_index(a.into());
            self.0[b].set_index(b.into());
        }
    }

    /// Resets the timer at the given index to the new time and returns the new index.
    fn reset(&mut self, index: usize, nanos: i64) -> usize {
        // SAFETY: `inner` is locked.
        if nanos < std::mem::replace(unsafe { self.0[index].nanos_mut() }, nanos) {
            self.fix_up(index)
        } else {
            self.fix_down(index)
        }
    }

    fn remove(&mut self, index: usize) {
        // SAFETY: `inner` is locked.
        unsafe {
            let old_index = self.0[index].set_index(HeapIndex::NULL);
            debug_assert_eq!(old_index, index.into());
        }

        // Swap the item at slot `index` to the end of the vector so we can truncate it away, and
        // then swap the previously last item into the correct spot.
        let last = self.0.len() - 1;
        if index < last {
            let fix_up;
            unsafe {
                // SAFETY: `inner` is locked.
                fix_up = self.0[last].nanos() < self.0[index].nanos();
                self.0[index] = self.0[last];
                self.0[index].set_index(index.into());
            };
            self.0.truncate(last);
            if fix_up {
                self.fix_up(index);
            } else {
                self.fix_down(index);
            }
        } else {
            self.0.truncate(last);
        };
    }

    /// Returns the new index
    fn fix_up(&mut self, mut index: usize) -> usize {
        while index > 0 {
            let parent = (index - 1) / 2;
            // SAFETY: `inner` is locked.
            if unsafe { self.0[parent].nanos() <= self.0[index].nanos() } {
                return index;
            }
            self.swap(parent, index);
            index = parent;
        }
        index
    }

    /// Returns the new index
    fn fix_down(&mut self, mut index: usize) -> usize {
        let len = self.0.len();
        loop {
            let left = index * 2 + 1;
            if left >= len {
                return index;
            }

            let mut swap_with = None;

            // SAFETY: `inner` is locked.
            unsafe {
                let mut nanos = self.0[index].nanos();
                let left_nanos = self.0[left].nanos();
                if left_nanos < nanos {
                    swap_with = Some(left);
                    nanos = left_nanos;
                }
                let right = left + 1;
                if right < len && self.0[right].nanos() < nanos {
                    swap_with = Some(right);
                }
            }

            let Some(swap_with) = swap_with else { return index };
            self.swap(index, swap_with);
            index = swap_with;
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{LocalExecutor, SendExecutor, Task, TestExecutor};
    use assert_matches::assert_matches;
    use futures::channel::oneshot::channel;
    use futures::future::Either;
    use futures::prelude::*;
    use rand::seq::SliceRandom;
    use rand::{thread_rng, Rng};
    use std::future::poll_fn;
    use std::pin::pin;
    use zx::MonotonicDuration;

    trait TestTimeInterface:
        TimeInterface
        + WakeupTime
        + std::ops::Sub<zx::Duration<Self::Timeline>, Output = Self>
        + std::ops::Add<zx::Duration<Self::Timeline>, Output = Self>
    {
        fn after(duration: zx::Duration<Self::Timeline>) -> Self;
    }

    impl TestTimeInterface for MonotonicInstant {
        fn after(duration: zx::MonotonicDuration) -> Self {
            Self::after(duration)
        }
    }

    impl TestTimeInterface for BootInstant {
        fn after(duration: zx::BootDuration) -> Self {
            Self::after(duration)
        }
    }

    fn test_shorter_fires_first<T: TestTimeInterface>() {
        let mut exec = LocalExecutor::new();
        let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
        let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
        match exec.run_singlethreaded(future::select(shorter, longer)) {
            Either::Left(_) => {}
            Either::Right(_) => panic!("wrong timer fired"),
        }
    }

    #[test]
    fn shorter_fires_first() {
        test_shorter_fires_first::<MonotonicInstant>();
        test_shorter_fires_first::<BootInstant>();
    }

    fn test_shorter_fires_first_multithreaded<T: TestTimeInterface>() {
        SendExecutor::new(4).run(async {
            let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
            let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
            match future::select(shorter, longer).await {
                Either::Left(_) => {}
                Either::Right(_) => panic!("wrong timer fired"),
            }
        });
    }

    #[test]
    fn shorter_fires_first_multithreaded() {
        test_shorter_fires_first_multithreaded::<MonotonicInstant>();
        test_shorter_fires_first_multithreaded::<BootInstant>();
    }

    fn test_timer_before_now_fires_immediately<T: TestTimeInterface>() {
        let mut exec = TestExecutor::new();
        let now = T::now();
        let before = pin!(Timer::new(T::from_nanos(now - 1)));
        let after = pin!(Timer::new(T::from_nanos(now + 1)));
        assert_matches!(
            exec.run_singlethreaded(futures::future::select(before, after)),
            Either::Left(_),
            "Timer in the past should fire first"
        );
    }

    #[test]
    fn timer_before_now_fires_immediately() {
        test_timer_before_now_fires_immediately::<MonotonicInstant>();
        test_timer_before_now_fires_immediately::<BootInstant>();
    }

    #[test]
    fn fires_after_timeout() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));
        let deadline = MonotonicInstant::after(MonotonicDuration::from_seconds(5));
        let mut future = pin!(Timer::new(deadline));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        exec.set_fake_time(deadline);
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
    }

    #[test]
    fn interval() {
        let mut exec = TestExecutor::new_with_fake_time();
        let start = MonotonicInstant::from_nanos(0);
        exec.set_fake_time(start);

        let counter = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
        let mut future = pin!({
            let counter = counter.clone();
            Interval::new(MonotonicDuration::from_seconds(5))
                .map(move |()| {
                    counter.fetch_add(1, Ordering::SeqCst);
                })
                .collect::<()>()
        });

        // Poll for the first time before the timer runs
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(0, counter.load(Ordering::SeqCst));

        // Pretend to wait until the next timer
        let first_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (1)");
        assert!(first_deadline >= MonotonicDuration::from_seconds(5) + start);
        exec.set_fake_time(first_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Polling again before the timer runs shouldn't produce another item from the stream
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // "Wait" until the next timeout and poll again: expect another item from the stream
        let second_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (2)");
        exec.set_fake_time(second_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(2, counter.load(Ordering::SeqCst));

        assert_eq!(second_deadline, first_deadline + MonotonicDuration::from_seconds(5));
    }

    #[test]
    fn timer_fake_time() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(MonotonicDuration::from_seconds(1))));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));

        exec.set_fake_time(MonotonicInstant::after(MonotonicDuration::from_seconds(1)));
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
    }

    fn create_timers(
        timers: &Arc<Timers<MonotonicInstant>>,
        nanos: &[i64],
        timer_futures: &mut Vec<Pin<Box<Timer>>>,
    ) {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        for &n in nanos {
            let mut timer = Box::pin(timers.new_timer(MonotonicInstant::from_nanos(n)));
            let _ = timer.poll_unpin(&mut cx);
            timer_futures.push(timer);
        }
    }

    #[test]
    fn timer_heap() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(0, true));

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (0..1000).collect();
        let mut rng = thread_rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Make sure the timers fire in the correct order.
        for i in 0..1000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timer_futures.clear();
        create_timers(&timers, &nanos, &mut timer_futures);

        // Remove half of them in random order, and ensure the remaining timers are correctly
        // ordered.
        timer_futures.shuffle(&mut rng);
        timer_futures.truncate(500);
        let mut last_time = None;
        for _ in 0..500 {
            let time = timers.wake_next_timer().unwrap();
            if let Some(last_time) = last_time {
                assert!(last_time <= time);
            }
            last_time = Some(time);
        }
        assert_eq!(timers.wake_next_timer(), None);

        timer_futures = vec![];
        create_timers(&timers, &nanos, &mut timer_futures);

        // Replace them all in random order.
        timer_futures.shuffle(&mut rng);
        let mut nanos: Vec<_> = (1000..2000).collect();
        nanos.shuffle(&mut rng);

        for (fut, n) in timer_futures.iter_mut().zip(nanos) {
            fut.as_mut().reset(MonotonicInstant::from_nanos(n));
        }

        // Check they all get changed and now fire in the correct order.
        for i in 1000..2000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }
    }

    #[test]
    fn timer_heap_with_same_time() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(0, true));

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (1..100).collect();
        let mut rng = thread_rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Create some timers with the same time.
        let time = rng.gen_range(0..101);
        let same_time = [time; 100];
        create_timers(&timers, &same_time, &mut timer_futures);

        nanos.extend(&same_time);
        nanos.sort();

        for n in nanos {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(n)));
        }
    }

    #[test]
    fn timer_reset_to_earlier_time() {
        let mut exec = LocalExecutor::new();

        for _ in 0..100 {
            let instant = MonotonicInstant::after(MonotonicDuration::from_millis(100));
            let (sender, receiver) = channel();
            let task = Task::spawn(async move {
                let mut timer = pin!(Timer::new(instant));
                let mut receiver = pin!(receiver.fuse());
                poll_fn(|cx| loop {
                    if timer.as_mut().poll_unpin(cx).is_ready() {
                        return Poll::Ready(());
                    }
                    if !receiver.is_terminated() && receiver.poll_unpin(cx).is_ready() {
                        timer
                            .as_mut()
                            .reset(MonotonicInstant::after(MonotonicDuration::from_millis(1)));
                    } else {
                        return Poll::Pending;
                    }
                })
                .await;
            });
            sender.send(()).unwrap();

            exec.run_singlethreaded(task);

            if MonotonicInstant::after(MonotonicDuration::from_millis(1)) < instant {
                return;
            }
        }

        panic!("Timer fired late in all 100 attempts");
    }

    #[test]
    fn test_reset() {
        // This is a test for https://fxbug.dev/418235546.
        SendExecutor::new(2).run(async {
            const TIMER_DELAY: zx::MonotonicDuration = zx::Duration::from_micros(100);
            let mut timer = pin!(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            for _ in 0..10000 {
                let _ = futures::poll!(timer.as_mut());
                std::thread::sleep(std::time::Duration::from_micros(
                    rand::thread_rng().gen_range(80..120),
                ));
                timer.as_mut().reset(MonotonicInstant::after(TIMER_DELAY));
                timer.set(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            }
        });
    }
}