fuchsia_async/runtime/fuchsia/timer.rs

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Support for creating futures that represent timers.
//!
//! This module contains the `Timer` type which is a future that will resolve
//! at a particular point in the future.
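//!
//! A minimal usage sketch (hypothetical consumer code; assumes an executor from
//! this crate is already running, and the `zx` crate for duration types):
//!
//! ```no_run
//! use fuchsia_async as fasync;
//!
//! async fn pause_for_one_second() {
//!     // Resolves once the monotonic clock passes the deadline.
//!     fasync::Timer::new(fasync::MonotonicInstant::after(
//!         zx::MonotonicDuration::from_seconds(1),
//!     ))
//!     .await;
//! }
//! ```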

use crate::runtime::{BootInstant, EHandle, MonotonicInstant, WakeupTime};
use crate::PacketReceiver;
use fuchsia_sync::Mutex;

use futures::future::FusedFuture;
use futures::stream::FusedStream;
use futures::task::{AtomicWaker, Context};
use futures::{FutureExt, Stream};
use std::cell::UnsafeCell;
use std::fmt;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::task::{ready, Poll, Waker};
use zx::AsHandleRef as _;

pub trait TimeInterface:
    Clone + Copy + fmt::Debug + PartialEq + PartialOrd + Ord + Send + Sync + 'static
{
    type Timeline: zx::Timeline + Send + Sync + 'static;

    fn from_nanos(nanos: i64) -> Self;
    fn into_nanos(self) -> i64;
    fn zx_instant(nanos: i64) -> zx::Instant<Self::Timeline>;
    fn now() -> i64;
    fn create_timer() -> zx::Timer<Self::Timeline>;
}

impl TimeInterface for MonotonicInstant {
    type Timeline = zx::MonotonicTimeline;

    fn from_nanos(nanos: i64) -> Self {
        Self::from_nanos(nanos)
    }

    fn into_nanos(self) -> i64 {
        self.into_nanos()
    }

    fn zx_instant(nanos: i64) -> zx::MonotonicInstant {
        zx::MonotonicInstant::from_nanos(nanos)
    }

    fn now() -> i64 {
        EHandle::local().inner().now().into_nanos()
    }

    fn create_timer() -> zx::Timer<Self::Timeline> {
        zx::Timer::<Self::Timeline>::create()
    }
}

impl TimeInterface for BootInstant {
    type Timeline = zx::BootTimeline;

    fn from_nanos(nanos: i64) -> Self {
        Self::from_nanos(nanos)
    }

    fn into_nanos(self) -> i64 {
        self.into_nanos()
    }

    fn zx_instant(nanos: i64) -> zx::BootInstant {
        zx::BootInstant::from_nanos(nanos)
    }

    fn now() -> i64 {
        EHandle::local().inner().boot_now().into_nanos()
    }

    fn create_timer() -> zx::Timer<Self::Timeline> {
        zx::Timer::<Self::Timeline>::create()
    }
}

impl WakeupTime for std::time::Instant {
    fn into_timer(self) -> Timer {
        let now_as_instant = std::time::Instant::now();
        let now_as_time = MonotonicInstant::now();
        EHandle::local()
            .mono_timers()
            .new_timer(now_as_time + self.saturating_duration_since(now_as_instant).into())
    }
}

impl WakeupTime for MonotonicInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().mono_timers().new_timer(self)
    }
}

impl WakeupTime for BootInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().boot_timers().new_timer(self)
    }
}

impl WakeupTime for zx::MonotonicInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().mono_timers().new_timer(self.into())
    }
}

impl WakeupTime for zx::BootInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().boot_timers().new_timer(self.into())
    }
}

/// An asynchronous timer.
#[must_use = "futures do nothing unless polled"]
pub struct Timer(TimerState);

impl Timer {
    /// Create a new timer scheduled to fire at `time`.
    pub fn new(time: impl WakeupTime) -> Self {
        time.into_timer()
    }

    /// Reset the `Timer` to fire at a new time.
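    ///
    /// A small sketch (hypothetical caller; resetting requires a pinned timer):
    ///
    /// ```no_run
    /// use fuchsia_async as fasync;
    ///
    /// async fn extend_deadline() {
    ///     let mut timer = std::pin::pin!(fasync::Timer::new(
    ///         fasync::MonotonicInstant::after(zx::MonotonicDuration::from_seconds(1)),
    ///     ));
    ///     // Push the deadline out before awaiting.
    ///     timer.as_mut().reset(fasync::MonotonicInstant::after(
    ///         zx::MonotonicDuration::from_seconds(2),
    ///     ));
    ///     timer.await;
    /// }
    /// ```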
    pub fn reset(self: Pin<&mut Self>, time: MonotonicInstant) {
        let nanos = time.into_nanos();
        // This can be Relaxed because there are no subsequent loads or stores that matter which
        // could be reordered before this point: the first thing `try_reset_timer` does is take a
        // lock which has its own memory barriers, and the store to the time is next going to be
        // read by this same task prior to taking the lock in `Timers::inner`.
        if self.0.state.load(Ordering::Relaxed) != REGISTERED
            || !self.0.timers.try_reset_timer(&self.0, nanos)
        {
            // SAFETY: This is safe because we know the timer isn't registered which means we truly
            // have exclusive access to TimerState.
            unsafe { *self.0.nanos.get() = nanos };
            self.0.state.store(UNREGISTERED, Ordering::Relaxed);
        }
    }
}

impl fmt::Debug for Timer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("Timer").field("time", &self.0.nanos).finish()
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        self.0.timers.unregister(&self.0);
    }
}

impl Future for Timer {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: We call `unregister` when `Timer` is dropped.
        unsafe { self.0.timers.poll(self.as_ref(), cx) }
    }
}

struct TimerState {
    timers: Arc<dyn TimersInterface>,

    // This is safe to access/mutate if the lock on `Timers::inner` is held.
    nanos: UnsafeCell<i64>,

    waker: AtomicWaker,
    state: AtomicU8,

    // Holds the index in the heap.  This is safe to access/mutate if the lock on `Timers::inner` is
    // held.
    index: UnsafeCell<HeapIndex>,

    // StateRef stores a pointer to `TimerState`, so this must be pinned.
    _pinned: PhantomPinned,
}

// SAFETY: TimerState is thread-safe.  See the safety comments elsewhere.
unsafe impl Send for TimerState {}
unsafe impl Sync for TimerState {}

// Set when the timer is not registered in the heap.
const UNREGISTERED: u8 = 0;

// Set when the timer is in the heap.
const REGISTERED: u8 = 1;

// Set when the timer has fired.
const FIRED: u8 = 2;

// Set when the timer is terminated.
const TERMINATED: u8 = 3;
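
// A rough sketch of the state transitions (see the call sites below):
//
//   UNREGISTERED -> REGISTERED    `Timers::poll` pushed the timer onto the heap.
//   UNREGISTERED -> FIRED         `Timers::poll` saw a deadline already in the past.
//   REGISTERED   -> FIRED         `StateRef::into_waker` popped the expired timer from the heap.
//   FIRED        -> TERMINATED    the fired timer was polled again.
//   REGISTERED   -> UNREGISTERED  `unregister` removed the timer from the heap.
//   any state    -> UNREGISTERED  `Timer::reset` re-armed an unregistered, fired, or terminated
//                                 timer (a registered timer is instead rescheduled in place).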

/// An index in the heap.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct HeapIndex(usize);

impl HeapIndex {
    const NULL: HeapIndex = HeapIndex(usize::MAX);

    fn get(&self) -> Option<usize> {
        if *self == HeapIndex::NULL {
            None
        } else {
            Some(self.0)
        }
    }
}

impl From<usize> for HeapIndex {
    fn from(value: usize) -> Self {
        Self(value)
    }
}

impl FusedFuture for Timer {
    fn is_terminated(&self) -> bool {
        self.0.state.load(Ordering::Relaxed) == TERMINATED
    }
}

// A note on safety:
//
//  1. We remove the timer from the heap before we drop TimerState, and TimerState is pinned, so
//     it's safe to store pointers in the heap i.e. the pointers are live since we make sure we
//     remove them before dropping `TimerState`.
//
//  2. Provided we do #1, it is always safe to access the atomic fields of TimerState.
//
//  3. Once the timer has been registered, it is safe to access the non-atomic fields of TimerState
//     whilst holding the lock on `Timers::inner`.
#[derive(Copy, Clone, Debug)]
struct StateRef(*const TimerState);

// SAFETY: See the notes above regarding safety.
unsafe impl Send for StateRef {}
unsafe impl Sync for StateRef {}

impl StateRef {
    fn into_waker(self, _inner: &mut Inner) -> Option<Waker> {
        // SAFETY: `inner` is locked.
        unsafe {
            // As soon as we set the state to FIRED, the heap no longer owns the timer and it might
            // be re-registered.  This store is safe to be Relaxed because `AtomicWaker::take` has a
            // Release barrier, so the store can't be reordered after it, and therefore we can be
            // certain that another thread which re-registers the waker will see the state is FIRED
            // (and will interpret that as meaning that the task should not block and instead
            // immediately complete; see `Timers::poll`).
            (*self.0).state.store(FIRED, Ordering::Relaxed);
            (*self.0).waker.take()
        }
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn nanos(&self) -> i64 {
        *(*self.0).nanos.get()
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn nanos_mut(&mut self) -> &mut i64 {
        &mut *(*self.0).nanos.get()
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn set_index(&mut self, index: HeapIndex) -> HeapIndex {
        std::mem::replace(&mut *(*self.0).index.get(), index)
    }
}

/// An asynchronous interval timer.
///
/// This is a stream of events resolving at a rate of once per interval.  It generates an event
/// for *every* elapsed duration, even if multiple have elapsed since the stream was last polled.
///
/// TODO(https://fxbug.dev/375632319): This lacks BootInstant support.
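///
/// A minimal usage sketch (hypothetical consumer code):
///
/// ```no_run
/// use fuchsia_async as fasync;
/// use futures::StreamExt as _;
///
/// async fn tick_forever() {
///     let mut interval = fasync::Interval::new(zx::MonotonicDuration::from_seconds(1));
///     while let Some(()) = interval.next().await {
///         // Runs once per elapsed second.
///     }
/// }
/// ```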
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Interval {
    timer: Pin<Box<Timer>>,
    next: MonotonicInstant,
    duration: zx::MonotonicDuration,
}

impl Interval {
    /// Create a new `Interval` which yields every `duration`.
    pub fn new(duration: zx::MonotonicDuration) -> Self {
        let next = MonotonicInstant::after(duration);
        Interval { timer: Box::pin(Timer::new(next)), next, duration }
    }
}

impl FusedStream for Interval {
    fn is_terminated(&self) -> bool {
        // `Interval` never yields `None`
        false
    }
}

impl Stream for Interval {
    type Item = ();
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        ready!(self.timer.poll_unpin(cx));
        let next = self.next + self.duration;
        self.timer.as_mut().reset(next);
        self.next = next;
        Poll::Ready(Some(()))
    }
}

pub(crate) struct Timers<T: TimeInterface> {
    inner: Mutex<Inner>,

    // We can't easily use ReceiverRegistration because there would be a circular dependency we'd
    // have to break: Executor -> Timers -> ReceiverRegistration -> EHandle -> ScopeRef ->
    // Executor, so instead we just store the port key and then change the executor to drop the
    // registration at the appropriate place.
    port_key: u64,

    fake: bool,

    timer: zx::Timer<T::Timeline>,
}

struct Inner {
    // The queue of active timer objects.
    timers: Heap,

    // The last deadline we set on the zircon timer, or None if the queue was empty and the timer
    // was canceled most recently.
    last_deadline: Option<i64>,

    // True if there's a pending async_wait.
    async_wait: bool,
}

impl<T: TimeInterface> Timers<T> {
    pub fn new(port_key: u64, fake: bool) -> Self {
        Self {
            inner: Mutex::new(Inner {
                timers: Heap::default(),
                last_deadline: None,
                async_wait: false,
            }),
            port_key,
            fake,
            timer: T::create_timer(),
        }
    }

    pub fn new_timer(self: &Arc<Self>, time: T) -> Timer {
        let nanos = time.into_nanos();
        Timer(TimerState {
            timers: self.clone(),
            nanos: UnsafeCell::new(nanos),
            waker: AtomicWaker::new(),
            state: AtomicU8::new(UNREGISTERED),
            index: UnsafeCell::new(HeapIndex::NULL),
            _pinned: PhantomPinned,
        })
    }

    /// Ensures the underlying Zircon timer has been correctly set or canceled after
    /// the queue has been updated.
    ///
    /// # Safety
    ///
    /// Callers must ensure that `self.inner` is locked before calling this method.
    fn setup_zircon_timer(&self, inner: &mut Inner, from_receive_packet: bool) {
        // Our new deadline is the deadline for the front of the queue, or no deadline (infinite) if
        // there is no front of the queue.
        let new_deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });

        // If the effective deadline of the queue has changed, reprogram the zircon timer's
        // deadline.
        if new_deadline != inner.last_deadline {
            inner.last_deadline = new_deadline;
            match inner.last_deadline {
                Some(deadline) => {
                    self.timer.set(T::zx_instant(deadline), zx::Duration::ZERO).unwrap()
                }
                None => self.timer.cancel().unwrap(),
            }
        }

        // If this is being called while processing the timer packet from a previous async wait,
        // then clear the async wait flag.  This is the very last thing we need to do, so this async
        // wait operation is effectively over.
        if from_receive_packet {
            inner.async_wait = false;
        }

        // If we have a valid timeout, but we have no in-flight async wait operation, post a new
        // one.
        if inner.last_deadline.is_some() && !inner.async_wait {
            if self.fake {
                // Clear the signal used for fake timers so that we can use it to trigger
                // next time.
                self.timer.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
            }

            self.timer
                .wait_async_handle(
                    EHandle::local().port(),
                    self.port_key,
                    if self.fake { zx::Signals::USER_0 } else { zx::Signals::TIMER_SIGNALED },
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();

            inner.async_wait = true;
        }
    }

    pub fn port_key(&self) -> u64 {
        self.port_key
    }

    /// Wakes timers that should be firing now.  Returns true if any timers were woken.
    pub fn wake_timers(&self) -> bool {
        self.wake_timers_impl(false)
    }

    fn wake_timers_impl(&self, from_receive_packet: bool) -> bool {
        let now = T::now();
        let mut timers_woken = false;

        loop {
            let waker = {
                let mut inner = self.inner.lock();

                // SAFETY: `inner` is locked.
                let deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
                if deadline.is_some_and(|d| d <= now) {
                    let timer = inner.timers.pop().unwrap();
                    timer.into_waker(&mut inner)
                } else {
                    // We are now finished (one way or the other).  Set up the underlying Zircon
                    // timer to reflect the new state of the queue, and break out of the loop.
                    //
                    // When processing a timer packet (from_receive_packet is true), it is very
                    // important that this always be the last thing we do before breaking out of the
                    // loop (dropping the lock in the process) for good.
                    //
                    // Failing to do this at the end can lead to the timer queue stalling.  Doing it
                    // early (when we are dispatching expired timers) can lead to an ever-multiplying
                    // army of posted async waits.
                    //
                    // See https://g-issues.fuchsia.dev/issues/396173066 for details.
                    self.setup_zircon_timer(&mut inner, from_receive_packet);
                    break;
                }
            };
            if let Some(waker) = waker {
                waker.wake()
            }
            timers_woken = true;
        }
        timers_woken
    }

    /// Wakes the next timer and returns its time.
    pub fn wake_next_timer(&self) -> Option<T> {
        let (nanos, waker) = {
            let mut inner = self.inner.lock();
            let Some(timer) = inner.timers.pop() else { return None };
            // SAFETY: `inner` is locked.
            let nanos = unsafe { timer.nanos() };
            (nanos, timer.into_waker(&mut inner))
        };
        if let Some(waker) = waker {
            waker.wake();
        }
        Some(T::from_nanos(nanos))
    }

    /// Returns the next timer due to expire.
    pub fn next_timer(&self) -> Option<T> {
        // SAFETY: `inner` is locked.
        self.inner.lock().timers.peek().map(|state| T::from_nanos(unsafe { state.nanos() }))
    }

    /// If there's a timer ready, sends a notification to wake up the receiver.
    ///
    /// # Panics
    ///
    /// This will panic if we are not using fake time.
    pub fn maybe_notify(&self, now: T) {
        assert!(self.fake, "calling this function requires using fake time.");
        // SAFETY: `inner` is locked.
        if self
            .inner
            .lock()
            .timers
            .peek()
            .map_or(false, |state| unsafe { state.nanos() } <= now.into_nanos())
        {
            self.timer.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        }
    }
}

impl<T: TimeInterface> PacketReceiver for Timers<T> {
    fn receive_packet(&self, _packet: zx::Packet) {
        self.wake_timers_impl(true);
    }
}

// See comments on the implementation below.
trait TimersInterface: Send + Sync + 'static {
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()>;
    fn unregister(&self, state: &TimerState);
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool;
}

impl<T: TimeInterface> TimersInterface for Timers<T> {
    // # Safety
    //
    // `unregister` must be called before `Timer` is dropped.
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()> {
        // See https://docs.rs/futures/0.3.5/futures/task/struct.AtomicWaker.html
        // for more information.
        // Quick check to avoid registration if already done.
        //
        // This is safe to be Relaxed because `AtomicWaker::register` has barriers which mean that
        // the load further down can't be moved before registering the waker, so we can't miss the
        // timer firing.  If the timer isn't registered, the time might have been reset, but that
        // would have been by the same task, so there should be no ordering issue there.  If we
        // then try to register the timer, we take the lock on `inner` so there will be barriers
        // there.
        let state = timer.0.state.load(Ordering::Relaxed);

        if state == TERMINATED {
            return Poll::Ready(());
        }

        if state == FIRED {
            timer.0.state.store(TERMINATED, Ordering::Relaxed);
            return Poll::Ready(());
        }

        if state == UNREGISTERED {
            // SAFETY: The state is UNREGISTERED, so we have exclusive access.
            let nanos = unsafe { *timer.0.nanos.get() };
            if nanos <= T::now() {
                timer.0.state.store(FIRED, Ordering::Relaxed);
                return Poll::Ready(());
            }
            let mut inner = self.inner.lock();

            // We store a pointer to `timer` here. This is safe to do because `timer` is pinned, and
            // we always make sure we call `unregister` before `timer` is dropped.
            inner.timers.push(StateRef(&timer.0));

            // Now that we have added a new timer to the queue, set up the
            // underlying zircon timer to reflect the new state of the queue.
            self.setup_zircon_timer(&mut inner, false);

            timer.0.state.store(REGISTERED, Ordering::Relaxed);
        }

        timer.0.waker.register(cx.waker());

        // Now that we've registered a waker, we need to check to see if the timer has been marked
        // as FIRED by another thread in the meantime (e.g. in StateRef::into_waker).  In that case
        // the timer is never going to fire again as it is no longer managed by the timer heap, so
        // the timer's task would become Pending but nothing would wake it up later.
        // Loading the state *must* happen after the above `AtomicWaker::register` (which
        // establishes an Acquire barrier, preventing the below load from being reordered above it),
        // or else we could racily hit the above scenario.
        let state = timer.0.state.load(Ordering::Relaxed);
        match state {
            FIRED => {
                timer.0.state.store(TERMINATED, Ordering::Relaxed);
                Poll::Ready(())
            }
            REGISTERED => Poll::Pending,
            // TERMINATED is only set in `poll` which has exclusive access to the task (&mut
            // Context).
            // UNREGISTERED would indicate a logic bug somewhere.
            _ => {
                unreachable!();
            }
        }
    }

    fn unregister(&self, timer: &TimerState) {
        if timer.state.load(Ordering::Relaxed) == UNREGISTERED {
            // If the timer was never registered, then we have exclusive access and we can skip the
            // rest of this (avoiding the lock on `inner`).
            // We cannot early-exit if the timer is FIRED or TERMINATED because then we could race
            // with another thread that is actively using the timer object, and if this call
            // completes before it blocks on `inner`, then the timer's resources could be
            // deallocated, which would result in a use-after-free on the other thread.
            return;
        }
        let mut inner = self.inner.lock();
        // SAFETY: `inner` is locked.
        let index = unsafe { *timer.index.get() };
        if let Some(index) = index.get() {
            inner.timers.remove(index);
            if index == 0 {
                // The front of the queue just changed.  Make sure to update the underlying zircon
                // timer state to match the new queue state.
                self.setup_zircon_timer(&mut inner, false);
            }
            timer.state.store(UNREGISTERED, Ordering::Relaxed);
        }
    }

    /// Returns true if the timer was successfully reset.
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool {
        let mut inner = self.inner.lock();
        // SAFETY: `inner` is locked.
        let index = unsafe { *timer.index.get() };
        if let Some(old_index) = index.get() {
            if (inner.timers.reset(old_index, nanos) == 0) || (old_index == 0) {
                // If the timer has moved into or out of the front queue position, update the
                // underlying zircon timer to reflect the new queue state.
                self.setup_zircon_timer(&mut inner, false);
            }
            timer.state.store(REGISTERED, Ordering::Relaxed);
            true
        } else {
            false
        }
    }
}

#[derive(Default)]
struct Heap(Vec<StateRef>);

// BinaryHeap doesn't support removal, and BTreeSet ends up increasing binary size significantly,
// so we roll our own binary heap.
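//
// The standard array-backed min-heap layout applies: the node at index `i` has children at
// `2 * i + 1` and `2 * i + 2` and a parent at `(i - 1) / 2`, with the earliest deadline at
// index 0.  `fix_up` and `fix_down` below restore this ordering after an insertion, removal,
// or reset.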
impl Heap {
    fn push(&mut self, mut timer: StateRef) {
        let index = self.0.len();
        self.0.push(timer);
        // SAFETY: `inner` is locked.
        unsafe {
            timer.set_index(index.into());
        }
        self.fix_up(index);
    }

    fn peek(&self) -> Option<&StateRef> {
        self.0.first()
    }

    fn pop(&mut self) -> Option<StateRef> {
        if let Some(&first) = self.0.first() {
            self.remove(0);
            Some(first)
        } else {
            None
        }
    }

    fn swap(&mut self, a: usize, b: usize) {
        self.0.swap(a, b);
        // SAFETY: `inner` is locked.
        unsafe {
            self.0[a].set_index(a.into());
            self.0[b].set_index(b.into());
        }
    }

    /// Resets the timer at the given index to the new time and returns the new index.
    fn reset(&mut self, index: usize, nanos: i64) -> usize {
        // SAFETY: `inner` is locked.
        if nanos < std::mem::replace(unsafe { self.0[index].nanos_mut() }, nanos) {
            self.fix_up(index)
        } else {
            self.fix_down(index)
        }
    }

    fn remove(&mut self, index: usize) {
        // SAFETY: `inner` is locked.
        unsafe {
            let old_index = self.0[index].set_index(HeapIndex::NULL);
            debug_assert_eq!(old_index, index.into());
        }

        // Swap the item at slot `index` to the end of the vector so we can truncate it away, and
        // then swap the previously last item into the correct spot.
        let last = self.0.len() - 1;
        if index < last {
            let fix_up;
            unsafe {
                // SAFETY: `inner` is locked.
                fix_up = self.0[last].nanos() < self.0[index].nanos();
                self.0[index] = self.0[last];
                self.0[index].set_index(index.into());
            };
            self.0.truncate(last);
            if fix_up {
                self.fix_up(index);
            } else {
                self.fix_down(index);
            }
        } else {
            self.0.truncate(last);
        };
    }

    /// Returns the new index
    fn fix_up(&mut self, mut index: usize) -> usize {
        while index > 0 {
            let parent = (index - 1) / 2;
            // SAFETY: `inner` is locked.
            if unsafe { self.0[parent].nanos() <= self.0[index].nanos() } {
                return index;
            }
            self.swap(parent, index);
            index = parent;
        }
        index
    }

    /// Returns the new index
    fn fix_down(&mut self, mut index: usize) -> usize {
        let len = self.0.len();
        loop {
            let left = index * 2 + 1;
            if left >= len {
                return index;
            }

            let mut swap_with = None;

            // SAFETY: `inner` is locked.
            unsafe {
                let mut nanos = self.0[index].nanos();
                let left_nanos = self.0[left].nanos();
                if left_nanos < nanos {
                    swap_with = Some(left);
                    nanos = left_nanos;
                }
                let right = left + 1;
                if right < len && self.0[right].nanos() < nanos {
                    swap_with = Some(right);
                }
            }

            let Some(swap_with) = swap_with else { return index };
            self.swap(index, swap_with);
            index = swap_with;
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{LocalExecutor, SendExecutor, Task, TestExecutor};
    use assert_matches::assert_matches;
    use futures::channel::oneshot::channel;
    use futures::future::Either;
    use futures::prelude::*;
    use rand::seq::SliceRandom;
    use rand::{thread_rng, Rng};
    use std::future::poll_fn;
    use std::pin::pin;
    use zx::MonotonicDuration;

    trait TestTimeInterface:
        TimeInterface
        + WakeupTime
        + std::ops::Sub<zx::Duration<Self::Timeline>, Output = Self>
        + std::ops::Add<zx::Duration<Self::Timeline>, Output = Self>
    {
        fn after(duration: zx::Duration<Self::Timeline>) -> Self;
    }

    impl TestTimeInterface for MonotonicInstant {
        fn after(duration: zx::MonotonicDuration) -> Self {
            Self::after(duration)
        }
    }

    impl TestTimeInterface for BootInstant {
        fn after(duration: zx::BootDuration) -> Self {
            Self::after(duration)
        }
    }

    fn test_shorter_fires_first<T: TestTimeInterface>() {
        let mut exec = LocalExecutor::new();
        let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
        let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
        match exec.run_singlethreaded(future::select(shorter, longer)) {
            Either::Left(_) => {}
            Either::Right(_) => panic!("wrong timer fired"),
        }
    }

    #[test]
    fn shorter_fires_first() {
        test_shorter_fires_first::<MonotonicInstant>();
        test_shorter_fires_first::<BootInstant>();
    }

    fn test_shorter_fires_first_multithreaded<T: TestTimeInterface>() {
        SendExecutor::new(4).run(async {
            let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
            let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
            match future::select(shorter, longer).await {
                Either::Left(_) => {}
                Either::Right(_) => panic!("wrong timer fired"),
            }
        });
    }

    #[test]
    fn shorter_fires_first_multithreaded() {
        test_shorter_fires_first_multithreaded::<MonotonicInstant>();
        test_shorter_fires_first_multithreaded::<BootInstant>();
    }

    fn test_timer_before_now_fires_immediately<T: TestTimeInterface>() {
        let mut exec = TestExecutor::new();
        let now = T::now();
        let before = pin!(Timer::new(T::from_nanos(now - 1)));
        let after = pin!(Timer::new(T::from_nanos(now + 1)));
        assert_matches!(
            exec.run_singlethreaded(futures::future::select(before, after)),
            Either::Left(_),
            "Timer in the past should fire first"
        );
    }

    #[test]
    fn timer_before_now_fires_immediately() {
        test_timer_before_now_fires_immediately::<MonotonicInstant>();
        test_timer_before_now_fires_immediately::<BootInstant>();
    }

    #[test]
    fn fires_after_timeout() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));
        let deadline = MonotonicInstant::after(MonotonicDuration::from_seconds(5));
        let mut future = pin!(Timer::new(deadline));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        exec.set_fake_time(deadline);
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
    }

    #[test]
    fn interval() {
        let mut exec = TestExecutor::new_with_fake_time();
        let start = MonotonicInstant::from_nanos(0);
        exec.set_fake_time(start);

        let counter = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
        let mut future = pin!({
            let counter = counter.clone();
            Interval::new(MonotonicDuration::from_seconds(5))
                .map(move |()| {
                    counter.fetch_add(1, Ordering::SeqCst);
                })
                .collect::<()>()
        });

        // Poll for the first time before the timer runs
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(0, counter.load(Ordering::SeqCst));

        // Pretend to wait until the next timer
        let first_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (1)");
        assert!(first_deadline >= MonotonicDuration::from_seconds(5) + start);
        exec.set_fake_time(first_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Polling again before the timer runs shouldn't produce another item from the stream
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // "Wait" until the next timeout and poll again: expect another item from the stream
        let second_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (2)");
        exec.set_fake_time(second_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(2, counter.load(Ordering::SeqCst));

        assert_eq!(second_deadline, first_deadline + MonotonicDuration::from_seconds(5));
    }

    #[test]
    fn timer_fake_time() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(MonotonicDuration::from_seconds(1))));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));

        exec.set_fake_time(MonotonicInstant::after(MonotonicDuration::from_seconds(1)));
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
    }

    fn create_timers(
        timers: &Arc<Timers<MonotonicInstant>>,
        nanos: &[i64],
        timer_futures: &mut Vec<Pin<Box<Timer>>>,
    ) {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        for &n in nanos {
            let mut timer = Box::pin(timers.new_timer(MonotonicInstant::from_nanos(n)));
            let _ = timer.poll_unpin(&mut cx);
            timer_futures.push(timer);
        }
    }

    #[test]
    fn timer_heap() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(0, true));

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (0..1000).collect();
        let mut rng = thread_rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Make sure the timers fire in the correct order.
        for i in 0..1000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timer_futures.clear();
        create_timers(&timers, &nanos, &mut timer_futures);

        // Remove half of them in random order, and ensure the remaining timers are correctly
        // ordered.
        timer_futures.shuffle(&mut rng);
        timer_futures.truncate(500);
        let mut last_time = None;
        for _ in 0..500 {
            let time = timers.wake_next_timer().unwrap();
            if let Some(last_time) = last_time {
                assert!(last_time <= time);
            }
            last_time = Some(time);
        }
        assert_eq!(timers.wake_next_timer(), None);

        timer_futures = vec![];
        create_timers(&timers, &nanos, &mut timer_futures);

        // Replace them all in random order.
        timer_futures.shuffle(&mut rng);
        let mut nanos: Vec<_> = (1000..2000).collect();
        nanos.shuffle(&mut rng);

        for (fut, n) in timer_futures.iter_mut().zip(nanos) {
            fut.as_mut().reset(MonotonicInstant::from_nanos(n));
        }

        // Check they all get changed and now fire in the correct order.
        for i in 1000..2000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }
    }

    #[test]
    fn timer_heap_with_same_time() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(0, true));

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (1..100).collect();
        let mut rng = thread_rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Create some timers with the same time.
        let time = rng.gen_range(0..101);
        let same_time = [time; 100];
        create_timers(&timers, &same_time, &mut timer_futures);

        nanos.extend(&same_time);
        nanos.sort();

        for n in nanos {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(n)));
        }
    }

    #[test]
    fn timer_reset_to_earlier_time() {
        let mut exec = LocalExecutor::new();

        for _ in 0..100 {
            let instant = MonotonicInstant::after(MonotonicDuration::from_millis(100));
            let (sender, receiver) = channel();
            let task = Task::spawn(async move {
                let mut timer = pin!(Timer::new(instant));
                let mut receiver = pin!(receiver.fuse());
                poll_fn(|cx| loop {
                    if timer.as_mut().poll_unpin(cx).is_ready() {
                        return Poll::Ready(());
                    }
                    if !receiver.is_terminated() && receiver.poll_unpin(cx).is_ready() {
                        timer
                            .as_mut()
                            .reset(MonotonicInstant::after(MonotonicDuration::from_millis(1)));
                    } else {
                        return Poll::Pending;
                    }
                })
                .await;
            });
            sender.send(()).unwrap();

            exec.run_singlethreaded(task);

            if MonotonicInstant::after(MonotonicDuration::from_millis(1)) < instant {
                return;
            }
        }

        panic!("Timer fired late in all 100 attempts");
    }
}