fuchsia_async/runtime/fuchsia/timer.rs

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Support for creating futures that represent timers.
//!
//! This module contains the `Timer` type which is a future that will resolve
//! at a particular point in the future.
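//!
//! A minimal usage sketch (assuming the public `fuchsia_async` re-exports of `Timer` and
//! `MonotonicInstant`, and the `zx` crate as used elsewhere in this file):
//!
//! ```no_run
//! use fuchsia_async as fasync;
//!
//! async fn pause_briefly() {
//!     // Resolves once the monotonic clock passes the deadline.
//!     fasync::Timer::new(fasync::MonotonicInstant::after(
//!         zx::MonotonicDuration::from_millis(100),
//!     ))
//!     .await;
//! }
//! ```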

use super::executor::Executor;
use crate::runtime::{BootInstant, EHandle, MonotonicInstant, WakeupTime};
use crate::{PacketReceiver, ReceiverRegistration};
use fuchsia_sync::Mutex;

use futures::future::FusedFuture;
use futures::stream::FusedStream;
use futures::task::{AtomicWaker, Context};
use futures::{FutureExt, Stream};
use std::cell::UnsafeCell;
use std::fmt;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use std::task::{ready, Poll, Waker};
use zx::AsHandleRef as _;

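/// Abstraction over a zircon timeline (monotonic or boot), letting `Timers` be instantiated once
/// per clock that the executor supports.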
pub trait TimeInterface:
    Clone + Copy + fmt::Debug + PartialEq + PartialOrd + Ord + Send + Sync + 'static
{
    type Timeline: zx::Timeline + Send + Sync + 'static;

    fn from_nanos(nanos: i64) -> Self;
    fn into_nanos(self) -> i64;
    fn zx_instant(nanos: i64) -> zx::Instant<Self::Timeline>;
    fn now() -> i64;
    fn create_timer() -> zx::Timer<Self::Timeline>;
}

impl TimeInterface for MonotonicInstant {
    type Timeline = zx::MonotonicTimeline;

    fn from_nanos(nanos: i64) -> Self {
        Self::from_nanos(nanos)
    }

    fn into_nanos(self) -> i64 {
        self.into_nanos()
    }

    fn zx_instant(nanos: i64) -> zx::MonotonicInstant {
        zx::MonotonicInstant::from_nanos(nanos)
    }

    fn now() -> i64 {
        EHandle::local().inner().now().into_nanos()
    }

    fn create_timer() -> zx::Timer<Self::Timeline> {
        zx::Timer::<Self::Timeline>::create()
    }
}

impl TimeInterface for BootInstant {
    type Timeline = zx::BootTimeline;

    fn from_nanos(nanos: i64) -> Self {
        Self::from_nanos(nanos)
    }

    fn into_nanos(self) -> i64 {
        self.into_nanos()
    }

    fn zx_instant(nanos: i64) -> zx::BootInstant {
        zx::BootInstant::from_nanos(nanos)
    }

    fn now() -> i64 {
        EHandle::local().inner().boot_now().into_nanos()
    }

    fn create_timer() -> zx::Timer<Self::Timeline> {
        zx::Timer::<Self::Timeline>::create()
    }
}

impl WakeupTime for std::time::Instant {
    fn into_timer(self) -> Timer {
        let now_as_instant = std::time::Instant::now();
        let now_as_time = MonotonicInstant::now();
        EHandle::local()
            .mono_timers()
            .new_timer(now_as_time + self.saturating_duration_since(now_as_instant).into())
    }
}

impl WakeupTime for MonotonicInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().mono_timers().new_timer(self)
    }
}

impl WakeupTime for BootInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().boot_timers().new_timer(self)
    }
}

impl WakeupTime for zx::MonotonicInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().mono_timers().new_timer(self.into())
    }
}

impl WakeupTime for zx::BootInstant {
    fn into_timer(self) -> Timer {
        EHandle::local().boot_timers().new_timer(self.into())
    }
}

/// An asynchronous timer.
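///
/// `Timer::new` accepts any [`WakeupTime`]: `MonotonicInstant`, `BootInstant`, their `zx`
/// counterparts, or `std::time::Instant` (see the impls above).
///
/// A minimal sketch of a select-based timeout (assuming a running executor; mirrors the tests
/// below):
///
/// ```no_run
/// use futures::future::{self, Either};
/// use std::pin::pin;
///
/// async fn with_timeout(fut: impl std::future::Future<Output = ()>) {
///     let timeout = pin!(fuchsia_async::Timer::new(fuchsia_async::MonotonicInstant::after(
///         zx::MonotonicDuration::from_seconds(1),
///     )));
///     match future::select(pin!(fut), timeout).await {
///         Either::Left(_) => {}                    // `fut` completed first.
///         Either::Right(_) => panic!("timed out"), // The timer fired first.
///     }
/// }
/// ```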
#[must_use = "futures do nothing unless polled"]
pub struct Timer(TimerState);

impl Timer {
    /// Create a new timer scheduled to fire at `time`.
    pub fn new(time: impl WakeupTime) -> Self {
        time.into_timer()
    }

    /// Reset the `Timer` to fire at a new time.
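    ///
    /// A minimal sketch of pushing a deadline out (assuming a running executor; `reset` takes
    /// the timer via `Pin<&mut Self>`, so pin it first):
    ///
    /// ```no_run
    /// use std::pin::pin;
    ///
    /// async fn debounce() {
    ///     let mut timer = pin!(fuchsia_async::Timer::new(fuchsia_async::MonotonicInstant::after(
    ///         zx::MonotonicDuration::from_millis(100),
    ///     )));
    ///     // Push the deadline out; this works whether or not the timer has been polled yet.
    ///     timer.as_mut().reset(fuchsia_async::MonotonicInstant::after(
    ///         zx::MonotonicDuration::from_millis(200),
    ///     ));
    ///     timer.await;
    /// }
    /// ```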
    pub fn reset(self: Pin<&mut Self>, time: MonotonicInstant) {
        let nanos = time.into_nanos();
        // If in the UNREGISTERED state, we can skip the call to `try_reset_timer` because the timer
        // has *never* been registered and so there's no danger of another thread having access to
        // this timer. In all other states, including the FIRED and TERMINATED states, we must use
        // `try_reset_timer` as that will take a lock and guarantee that other threads are not
        // concurrently accessing the timer.
        //
        // This can be Relaxed because there are no loads or stores that follow that could
        // possibly be reordered before here that matter: the first thing `try_reset_timer` does is
        // take a lock which will have its own memory barriers, and the store to the time is next
        // going to be read by this same task prior to taking the lock in `Timers::inner`.
        if self.0.state.load(Ordering::Relaxed) == UNREGISTERED
            || !self.0.timers.try_reset_timer(&self.0, nanos)
        {
            // SAFETY: This is safe because we know the timer isn't registered which means we truly
            // have exclusive access to TimerState.
            unsafe { *self.0.nanos.get() = nanos };
            self.0.state.store(UNREGISTERED, Ordering::Relaxed);
        }
    }
}

impl fmt::Debug for Timer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("Timer").field("time", &self.0.nanos).finish()
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        self.0.timers.unregister(&self.0);
    }
}

impl Future for Timer {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: We call `unregister` when `Timer` is dropped.
        unsafe { self.0.timers.poll(self.as_ref(), cx) }
    }
}

struct TimerState {
    timers: Arc<dyn TimersInterface>,

    // This is safe to access/mutate if the lock on `Timers::inner` is held.
    nanos: UnsafeCell<i64>,

    waker: AtomicWaker,
    state: AtomicU8,

    // Holds the index in the heap.  This is safe to access/mutate if the lock on `Timers::inner` is
    // held.
    index: UnsafeCell<HeapIndex>,

    // StateRef stores a pointer to `TimerState`, so this must be pinned.
    _pinned: PhantomPinned,
}

// SAFETY: TimerState is thread-safe.  See the safety comments elsewhere.
unsafe impl Send for TimerState {}
unsafe impl Sync for TimerState {}

// Set when the timer is not registered in the heap.
const UNREGISTERED: u8 = 0;

// Set when the timer is in the heap.
const REGISTERED: u8 = 1;

// Set when the timer has fired.
const FIRED: u8 = 2;

// Set when the timer is terminated.
const TERMINATED: u8 = 3;
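
// For reference, the transitions implemented below: UNREGISTERED -> REGISTERED (or directly to
// FIRED if the deadline has already passed) on first poll; REGISTERED -> FIRED when the heap
// dispatches the timer (`StateRef::into_waker`); FIRED -> TERMINATED on the next poll; and back
// to UNREGISTERED (or REGISTERED) via `Timer::reset` and `unregister`.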

/// An index in the heap.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct HeapIndex(usize);

impl HeapIndex {
    const NULL: HeapIndex = HeapIndex(usize::MAX);

    fn get(&self) -> Option<usize> {
        if *self == HeapIndex::NULL {
            None
        } else {
            Some(self.0)
        }
    }
}

impl From<usize> for HeapIndex {
    fn from(value: usize) -> Self {
        Self(value)
    }
}

impl FusedFuture for Timer {
    fn is_terminated(&self) -> bool {
        self.0.state.load(Ordering::Relaxed) == TERMINATED
    }
}

// A note on safety:
//
//  1. We remove the timer from the heap before we drop TimerState, and TimerState is pinned, so
//     it's safe to store pointers in the heap i.e. the pointers are live since we make sure we
//     remove them before dropping `TimerState`.
//
//  2. Provided we do #1, it is always safe to access the atomic fields of TimerState.
//
//  3. Once the timer has been registered, it is safe to access the non-atomic fields of TimerState
//     whilst holding the lock on `Timers::inner`.
#[derive(Copy, Clone, Debug)]
struct StateRef(*const TimerState);

// SAFETY: See the notes above regarding safety.
unsafe impl Send for StateRef {}
unsafe impl Sync for StateRef {}

impl StateRef {
    fn into_waker(self, _inner: &mut Inner) -> Option<Waker> {
        // SAFETY: `inner` is locked.
        unsafe {
            // As soon as we set the state to FIRED, the heap no longer owns the timer and it might
            // be re-registered.  This store is safe to be Relaxed because `AtomicWaker::take` has a
            // Release barrier, so the store can't be reordered after it, and therefore we can be
            // certain that another thread which re-registers the waker will see the state is FIRED
            // (and will interpret that as meaning that the task should not block and instead
            // immediately complete; see `Timers::poll`).
            (*self.0).state.store(FIRED, Ordering::Relaxed);
            (*self.0).waker.take()
        }
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn nanos(&self) -> i64 {
        *(*self.0).nanos.get()
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn nanos_mut(&mut self) -> &mut i64 {
        &mut *(*self.0).nanos.get()
    }

    // # Safety
    //
    // `Timers::inner` must be locked.
    unsafe fn set_index(&mut self, index: HeapIndex) -> HeapIndex {
        std::mem::replace(&mut *(*self.0).index.get(), index)
    }
}

/// An asynchronous interval timer.
///
/// This is a stream of events resolving once per interval.  This generates an event for *every*
/// elapsed duration, even if multiple have elapsed since last polled.
///
/// TODO(https://fxbug.dev/375632319): This lacks BootInstant support.
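///
/// A minimal sketch (assuming a running executor and the public `fuchsia_async::Interval`
/// re-export):
///
/// ```no_run
/// use futures::StreamExt as _;
///
/// async fn tick() {
///     let mut interval = fuchsia_async::Interval::new(zx::MonotonicDuration::from_seconds(1));
///     // Yields `()` roughly once per second; missed intervals are still delivered.
///     while let Some(()) = interval.next().await {
///         // Do periodic work here.
///     }
/// }
/// ```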
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Interval {
    timer: Pin<Box<Timer>>,
    next: MonotonicInstant,
    duration: zx::MonotonicDuration,
}

impl Interval {
    /// Create a new `Interval` which yields every `duration`.
    pub fn new(duration: zx::MonotonicDuration) -> Self {
        let next = MonotonicInstant::after(duration);
        Interval { timer: Box::pin(Timer::new(next)), next, duration }
    }
}

impl FusedStream for Interval {
    fn is_terminated(&self) -> bool {
        // `Interval` never yields `None`
        false
    }
}

impl Stream for Interval {
    type Item = ();
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        ready!(self.timer.poll_unpin(cx));
        let next = self.next + self.duration;
        self.timer.as_mut().reset(next);
        self.next = next;
        Poll::Ready(Some(()))
    }
}

pub(crate) struct Timers<T: TimeInterface> {
    inner: Mutex<Inner>,

    fake: bool,

    timer: zx::Timer<T::Timeline>,

    // This will form a reference cycle which the caller *must* break by calling `deregister`.
    receiver_registration: Mutex<Option<ReceiverRegistration<Arc<Self>>>>,
}

struct Inner {
    // The queue of active timer objects.
    timers: Heap,

    // The last deadline we set on the zircon timer, or None if the queue was empty and the timer
    // was canceled most recently.
    last_deadline: Option<i64>,

    // True if there's a pending async_wait.
    async_wait: bool,

    // The port key.
    port_key: u64,
}

impl<T: TimeInterface> Timers<T> {
    pub fn new(fake: bool) -> Self {
        Self {
            inner: Mutex::new(Inner {
                timers: Heap::default(),
                last_deadline: None,
                async_wait: false,
                port_key: 0,
            }),
            fake,
            timer: T::create_timer(),
            receiver_registration: Mutex::default(),
        }
    }

    pub fn new_timer(self: &Arc<Self>, time: T) -> Timer {
        let nanos = time.into_nanos();
        Timer(TimerState {
            timers: self.clone(),
            nanos: UnsafeCell::new(nanos),
            waker: AtomicWaker::new(),
            state: AtomicU8::new(UNREGISTERED),
            index: UnsafeCell::new(HeapIndex::NULL),
            _pinned: PhantomPinned,
        })
    }

    /// Registers the timers to receive packets.  This will establish a reference cycle that
    /// the caller must break by calling `deregister`.
    pub fn register(self: &Arc<Self>, executor: &Arc<Executor>) {
        let key = self
            .receiver_registration
            .lock()
            .get_or_insert_with(|| executor.receivers.register(executor.clone(), self.clone()))
            .key();
        self.inner.lock().port_key = key;
    }

    /// Deregisters the timers and breaks the reference cycle.
    pub fn deregister(&self) {
        *self.receiver_registration.lock() = None;
    }

    /// Ensures the underlying Zircon timer has been correctly set or canceled after
    /// the queue has been updated.
    ///
    /// # Safety
    ///
    /// Callers must ensure that `self.inner` is locked before calling this method.
    fn setup_zircon_timer(&self, inner: &mut Inner, from_receive_packet: bool) {
        // Our new deadline is the deadline for the front of the queue, or no deadline (infinite) if
        // there is no front of the queue.
        let new_deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });

        // If the effective deadline of the queue has changed, reprogram the zircon timer's
        // deadline.
        if new_deadline != inner.last_deadline {
            inner.last_deadline = new_deadline;
            match inner.last_deadline {
                Some(deadline) => {
                    self.timer.set(T::zx_instant(deadline), zx::Duration::ZERO).unwrap()
                }
                None => self.timer.cancel().unwrap(),
            }
        }

        // If this is being called while processing the timer packet from a previous async wait,
        // then clear the async wait flag.  This is the very last thing we need to do, so this async
        // wait operation is effectively over.
        if from_receive_packet {
            inner.async_wait = false;
        }

        // If we have a valid timeout, but we have no in-flight async wait operation, post a new
        // one.
        if inner.last_deadline.is_some() && !inner.async_wait {
            if self.fake {
                // Clear the signal used for fake timers so that we can use it to trigger
                // next time.
                self.timer.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
            }

            self.timer
                .wait_async_handle(
                    EHandle::local().port(),
                    inner.port_key,
                    if self.fake { zx::Signals::USER_0 } else { zx::Signals::TIMER_SIGNALED },
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();

            inner.async_wait = true;
        }
    }

    /// Wakes timers that should be firing now.  Returns true if any timers were woken.
    pub fn wake_timers(&self) -> bool {
        self.wake_timers_impl(false)
    }

    fn wake_timers_impl(&self, from_receive_packet: bool) -> bool {
        let now = T::now();
        let mut timers_woken = false;

        loop {
            let waker = {
                let mut inner = self.inner.lock();

                // SAFETY: `inner` is locked.
                let deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
                if deadline.is_some_and(|d| d <= now) {
                    let timer = inner.timers.pop().unwrap();
                    timer.into_waker(&mut inner)
                } else {
                    // We are now finished (one way or the other). Set up the underlying Zircon
                    // timer to reflect the new state of the queue, and break out of the loop.
                    //
                    // When processing a timer packet (from_receive_packet is true), it is very
                    // important that this always be the last thing we do before breaking out of the
                    // loop (dropping the lock in the process) for good.
                    //
                    // Failing to do this at the end can lead to the timer queue stalling.  Doing it
                    // early (when we are dispatching expired timers) can lead to an ever
                    // multiplying army of posted async waits.
                    //
                    // See https://g-issues.fuchsia.dev/issues/396173066 for details.
                    self.setup_zircon_timer(&mut inner, from_receive_packet);
                    break;
                }
            };
            if let Some(waker) = waker {
                waker.wake()
            }
            timers_woken = true;
        }
        timers_woken
    }

    /// Wakes the next timer and returns its time.
    pub fn wake_next_timer(&self) -> Option<T> {
        let (nanos, waker) = {
            let mut inner = self.inner.lock();
            let timer = inner.timers.pop()?;
            // SAFETY: `inner` is locked.
            let nanos = unsafe { timer.nanos() };
            (nanos, timer.into_waker(&mut inner))
        };
        if let Some(waker) = waker {
            waker.wake();
        }
        Some(T::from_nanos(nanos))
    }

    /// Returns the next timer due to expire.
    pub fn next_timer(&self) -> Option<T> {
        // SAFETY: `inner` is locked.
        self.inner.lock().timers.peek().map(|state| T::from_nanos(unsafe { state.nanos() }))
    }

    /// If there's a timer ready, sends a notification to wake up the receiver.
    ///
    /// # Panics
    ///
    /// This will panic if we are not using fake time.
    pub fn maybe_notify(&self, now: T) {
        assert!(self.fake, "calling this function requires using fake time.");
        // SAFETY: `inner` is locked.
        if self
            .inner
            .lock()
            .timers
            .peek()
            .is_some_and(|state| unsafe { state.nanos() } <= now.into_nanos())
        {
            self.timer.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        }
    }
}

impl<T: TimeInterface> PacketReceiver for Timers<T> {
    fn receive_packet(&self, _packet: zx::Packet) {
        self.wake_timers_impl(true);
    }
}

// See comments on the implementation below.
trait TimersInterface: Send + Sync + 'static {
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()>;
    fn unregister(&self, state: &TimerState);
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool;
}

impl<T: TimeInterface> TimersInterface for Timers<T> {
    // # Safety
    //
    // `unregister` must be called before `Timer` is dropped.
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()> {
        // See https://docs.rs/futures/0.3.5/futures/task/struct.AtomicWaker.html
        // for more information.
        // Quick check to avoid registration if already done.
        //
        // This is safe to be Relaxed because `AtomicWaker::register` has barriers which means that
        // the load further down can't be moved before registering the waker, which means we can't
        // miss the timer firing.  If the timer isn't registered, the time might have been reset but
        // that would have been by the same task, so there should be no ordering issue there.  If we
        // then try and register the timer, we take the lock on `inner` so there will be barriers
        // there.
        let state = timer.0.state.load(Ordering::Relaxed);

        if state == TERMINATED {
            return Poll::Ready(());
        }

        if state == FIRED {
            timer.0.state.store(TERMINATED, Ordering::Relaxed);
            return Poll::Ready(());
        }

        if state == UNREGISTERED {
            // SAFETY: The state is UNREGISTERED, so we have exclusive access.
            let nanos = unsafe { *timer.0.nanos.get() };
            if nanos <= T::now() {
                timer.0.state.store(FIRED, Ordering::Relaxed);
                return Poll::Ready(());
            }
            let mut inner = self.inner.lock();

            // We store a pointer to `timer` here. This is safe to do because `timer` is pinned, and
            // we always make sure we call `unregister` before `timer` is dropped.
            inner.timers.push(StateRef(&timer.0));

            // Now that we have added a new timer to the queue, set up the
            // underlying zircon timer to reflect the new state of the queue.
            self.setup_zircon_timer(&mut inner, false);

            timer.0.state.store(REGISTERED, Ordering::Relaxed);
        }

        timer.0.waker.register(cx.waker());

        // Now that we've registered a waker, we need to check to see if the timer has been marked
        // as FIRED by another thread in the meantime (e.g. in StateRef::into_waker).  In that case
        // the timer is never going to fire again as it is no longer managed by the timer heap, so
        // the timer's task would become Pending but nothing would wake it up later.
        // Loading the state *must* happen after the above `AtomicWaker::register` (which
        // establishes an Acquire barrier, preventing the below load from being reordered above it),
        // or else we could racily hit the above scenario.
        let state = timer.0.state.load(Ordering::Relaxed);
        match state {
            FIRED => {
                timer.0.state.store(TERMINATED, Ordering::Relaxed);
                Poll::Ready(())
            }
            REGISTERED => Poll::Pending,
            // TERMINATED is only set in `poll` which has exclusive access to the task (&mut
            // Context).
            // UNREGISTERED would indicate a logic bug somewhere.
            _ => {
                unreachable!();
            }
        }
    }

    fn unregister(&self, timer: &TimerState) {
        if timer.state.load(Ordering::Relaxed) == UNREGISTERED {
            // If the timer was never registered, then we have exclusive access and we can skip the
            // rest of this (avoiding the lock on `inner`).
            // We cannot early-exit if the timer is FIRED or TERMINATED because then we could race
            // with another thread that is actively using the timer object, and if this call
            // completes before it blocks on `inner`, then the timer's resources could be
            // deallocated, which would result in a use-after-free on the other thread.
            return;
        }
        let mut inner = self.inner.lock();
        // SAFETY: `inner` is locked.
        let index = unsafe { *timer.index.get() };
        if let Some(index) = index.get() {
            inner.timers.remove(index);
            if index == 0 {
                // The front of the queue just changed.  Make sure to update the underlying zircon
                // timer state to match the new queue state.
                self.setup_zircon_timer(&mut inner, false);
            }
            timer.state.store(UNREGISTERED, Ordering::Relaxed);
        }
    }

    /// Returns true if the timer was successfully reset.
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool {
        let mut inner = self.inner.lock();
        // SAFETY: `inner` is locked.
        let index = unsafe { *timer.index.get() };
        if let Some(old_index) = index.get() {
            if (inner.timers.reset(old_index, nanos) == 0) || (old_index == 0) {
                // If the timer has moved into or out of the front queue position, update the
                // underlying zircon timer to reflect the new queue state.
                self.setup_zircon_timer(&mut inner, false);
            }
            timer.state.store(REGISTERED, Ordering::Relaxed);
            true
        } else {
            false
        }
    }
}

#[derive(Default)]
struct Heap(Vec<StateRef>);

// BinaryHeap doesn't support removal, and BTreeSet ends up increasing binary size significantly,
// so we roll our own binary heap.
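// The heap uses the standard implicit array layout: the node at index `i` has its parent at
// `(i - 1) / 2` and its children at `2 * i + 1` and `2 * i + 2` (see `fix_up` and `fix_down`
// below).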
impl Heap {
    fn push(&mut self, mut timer: StateRef) {
        let index = self.0.len();
        self.0.push(timer);
        // SAFETY: `inner` is locked.
        unsafe {
            timer.set_index(index.into());
        }
        self.fix_up(index);
    }

    fn peek(&self) -> Option<&StateRef> {
        self.0.first()
    }

    fn pop(&mut self) -> Option<StateRef> {
        if let Some(&first) = self.0.first() {
            self.remove(0);
            Some(first)
        } else {
            None
        }
    }

    fn swap(&mut self, a: usize, b: usize) {
        self.0.swap(a, b);
        // SAFETY: `inner` is locked.
        unsafe {
            self.0[a].set_index(a.into());
            self.0[b].set_index(b.into());
        }
    }

    /// Resets the timer at the given index to the new time and returns the new index.
    fn reset(&mut self, index: usize, nanos: i64) -> usize {
        // SAFETY: `inner` is locked.
        if nanos < std::mem::replace(unsafe { self.0[index].nanos_mut() }, nanos) {
            self.fix_up(index)
        } else {
            self.fix_down(index)
        }
    }

    fn remove(&mut self, index: usize) {
        // SAFETY: `inner` is locked.
        unsafe {
            let old_index = self.0[index].set_index(HeapIndex::NULL);
            debug_assert_eq!(old_index, index.into());
        }

        // Swap the item at slot `index` to the end of the vector so we can truncate it away, and
        // then swap the previously last item into the correct spot.
        let last = self.0.len() - 1;
        if index < last {
            let fix_up;
            unsafe {
                // SAFETY: `inner` is locked.
                fix_up = self.0[last].nanos() < self.0[index].nanos();
                self.0[index] = self.0[last];
                self.0[index].set_index(index.into());
            };
            self.0.truncate(last);
            if fix_up {
                self.fix_up(index);
            } else {
                self.fix_down(index);
            }
        } else {
            self.0.truncate(last);
        };
    }

    /// Returns the new index
    fn fix_up(&mut self, mut index: usize) -> usize {
        while index > 0 {
            let parent = (index - 1) / 2;
            // SAFETY: `inner` is locked.
            if unsafe { self.0[parent].nanos() <= self.0[index].nanos() } {
                return index;
            }
            self.swap(parent, index);
            index = parent;
        }
        index
    }

    /// Returns the new index
    fn fix_down(&mut self, mut index: usize) -> usize {
        let len = self.0.len();
        loop {
            let left = index * 2 + 1;
            if left >= len {
                return index;
            }

            let mut swap_with = None;

            // SAFETY: `inner` is locked.
            unsafe {
                let mut nanos = self.0[index].nanos();
                let left_nanos = self.0[left].nanos();
                if left_nanos < nanos {
                    swap_with = Some(left);
                    nanos = left_nanos;
                }
                let right = left + 1;
                if right < len && self.0[right].nanos() < nanos {
                    swap_with = Some(right);
                }
            }

            let Some(swap_with) = swap_with else { return index };
            self.swap(index, swap_with);
            index = swap_with;
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{LocalExecutor, SendExecutor, Task, TestExecutor};
    use assert_matches::assert_matches;
    use futures::channel::oneshot::channel;
    use futures::future::Either;
    use futures::prelude::*;
    use rand::seq::SliceRandom;
    use rand::{rng, Rng};
    use std::future::poll_fn;
    use std::pin::pin;
    use zx::MonotonicDuration;

    trait TestTimeInterface:
        TimeInterface
        + WakeupTime
        + std::ops::Sub<zx::Duration<Self::Timeline>, Output = Self>
        + std::ops::Add<zx::Duration<Self::Timeline>, Output = Self>
    {
        fn after(duration: zx::Duration<Self::Timeline>) -> Self;
    }

    impl TestTimeInterface for MonotonicInstant {
        fn after(duration: zx::MonotonicDuration) -> Self {
            Self::after(duration)
        }
    }

    impl TestTimeInterface for BootInstant {
        fn after(duration: zx::BootDuration) -> Self {
            Self::after(duration)
        }
    }

    fn test_shorter_fires_first<T: TestTimeInterface>() {
        let mut exec = LocalExecutor::new();
        let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
        let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
        match exec.run_singlethreaded(future::select(shorter, longer)) {
            Either::Left(_) => {}
            Either::Right(_) => panic!("wrong timer fired"),
        }
    }

    #[test]
    fn shorter_fires_first() {
        test_shorter_fires_first::<MonotonicInstant>();
        test_shorter_fires_first::<BootInstant>();
    }

    fn test_shorter_fires_first_multithreaded<T: TestTimeInterface>() {
        SendExecutor::new(4).run(async {
            let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
            let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
            match future::select(shorter, longer).await {
                Either::Left(_) => {}
                Either::Right(_) => panic!("wrong timer fired"),
            }
        });
    }

    #[test]
    fn shorter_fires_first_multithreaded() {
        test_shorter_fires_first_multithreaded::<MonotonicInstant>();
        test_shorter_fires_first_multithreaded::<BootInstant>();
    }

    fn test_timer_before_now_fires_immediately<T: TestTimeInterface>() {
        let mut exec = TestExecutor::new();
        let now = T::now();
        let before = pin!(Timer::new(T::from_nanos(now - 1)));
        let after = pin!(Timer::new(T::from_nanos(now + 1)));
        assert_matches!(
            exec.run_singlethreaded(futures::future::select(before, after)),
            Either::Left(_),
            "Timer in the past should fire first"
        );
    }

    #[test]
    fn timer_before_now_fires_immediately() {
        test_timer_before_now_fires_immediately::<MonotonicInstant>();
        test_timer_before_now_fires_immediately::<BootInstant>();
    }

    #[test]
    fn fires_after_timeout() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));
        let deadline = MonotonicInstant::after(MonotonicDuration::from_seconds(5));
        let mut future = pin!(Timer::new(deadline));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        exec.set_fake_time(deadline);
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
    }

    #[test]
    fn interval() {
        let mut exec = TestExecutor::new_with_fake_time();
        let start = MonotonicInstant::from_nanos(0);
        exec.set_fake_time(start);

        let counter = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
        let mut future = pin!({
            let counter = counter.clone();
            Interval::new(MonotonicDuration::from_seconds(5))
                .map(move |()| {
                    counter.fetch_add(1, Ordering::SeqCst);
                })
                .collect::<()>()
        });

        // Poll for the first time before the timer runs
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(0, counter.load(Ordering::SeqCst));

        // Pretend to wait until the next timer
        let first_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (1)");
        assert!(first_deadline >= MonotonicDuration::from_seconds(5) + start);
        exec.set_fake_time(first_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Polling again before the timer runs shouldn't produce another item from the stream
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // "Wait" until the next timeout and poll again: expect another item from the stream
        let second_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (2)");
        exec.set_fake_time(second_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(2, counter.load(Ordering::SeqCst));

        assert_eq!(second_deadline, first_deadline + MonotonicDuration::from_seconds(5));
    }

    #[test]
    fn timer_fake_time() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(MonotonicDuration::from_seconds(1))));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));

        exec.set_fake_time(MonotonicInstant::after(MonotonicDuration::from_seconds(1)));
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
    }

    fn create_timers(
        timers: &Arc<Timers<MonotonicInstant>>,
        nanos: &[i64],
        timer_futures: &mut Vec<Pin<Box<Timer>>>,
    ) {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        for &n in nanos {
            let mut timer = Box::pin(timers.new_timer(MonotonicInstant::from_nanos(n)));
            let _ = timer.poll_unpin(&mut cx);
            timer_futures.push(timer);
        }
    }

    #[test]
    fn timer_heap() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
        timers.register(EHandle::local().inner());

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (0..1000).collect();
        let mut rng = rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Make sure the timers fire in the correct order.
        for i in 0..1000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timer_futures.clear();
        create_timers(&timers, &nanos, &mut timer_futures);

        // Remove half of them in random order, and ensure the remaining timers are correctly
        // ordered.
        timer_futures.shuffle(&mut rng);
        timer_futures.truncate(500);
        let mut last_time = None;
        for _ in 0..500 {
            let time = timers.wake_next_timer().unwrap();
            if let Some(last_time) = last_time {
                assert!(last_time <= time);
            }
            last_time = Some(time);
        }
        assert_eq!(timers.wake_next_timer(), None);

        timer_futures = vec![];
        create_timers(&timers, &nanos, &mut timer_futures);

        // Replace them all in random order.
        timer_futures.shuffle(&mut rng);
        let mut nanos: Vec<_> = (1000..2000).collect();
        nanos.shuffle(&mut rng);

        for (fut, n) in timer_futures.iter_mut().zip(nanos) {
            fut.as_mut().reset(MonotonicInstant::from_nanos(n));
        }

        // Check they all get changed and now fire in the correct order.
        for i in 1000..2000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timers.deregister();
    }

    #[test]
    fn timer_heap_with_same_time() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
        timers.register(EHandle::local().inner());

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (1..100).collect();
        let mut rng = rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Create some timers with the same time.
        let time = rng.random_range(0..101);
        let same_time = [time; 100];
        create_timers(&timers, &same_time, &mut timer_futures);

        nanos.extend(&same_time);
        nanos.sort();

        for n in nanos {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(n)));
        }

        timers.deregister();
    }

    #[test]
    fn timer_reset_to_earlier_time() {
        let mut exec = LocalExecutor::new();

        for _ in 0..100 {
            let instant = MonotonicInstant::after(MonotonicDuration::from_millis(100));
            let (sender, receiver) = channel();
            let task = Task::spawn(async move {
                let mut timer = pin!(Timer::new(instant));
                let mut receiver = pin!(receiver.fuse());
                poll_fn(|cx| loop {
                    if timer.as_mut().poll_unpin(cx).is_ready() {
                        return Poll::Ready(());
                    }
                    if !receiver.is_terminated() && receiver.poll_unpin(cx).is_ready() {
                        timer
                            .as_mut()
                            .reset(MonotonicInstant::after(MonotonicDuration::from_millis(1)));
                    } else {
                        return Poll::Pending;
                    }
                })
                .await;
            });
            sender.send(()).unwrap();

            exec.run_singlethreaded(task);

            if MonotonicInstant::after(MonotonicDuration::from_millis(1)) < instant {
                return;
            }
        }

        panic!("Timer fired late in all 100 attempts");
    }

    #[test]
    fn test_reset() {
        // This is a test for https://fxbug.dev/418235546.
        SendExecutor::new(2).run(async {
            const TIMER_DELAY: zx::MonotonicDuration = zx::Duration::from_micros(100);
            let mut timer = pin!(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            for _ in 0..10000 {
                let _ = futures::poll!(timer.as_mut());
                std::thread::sleep(std::time::Duration::from_micros(rand::random_range(80..120)));
                timer.as_mut().reset(MonotonicInstant::after(TIMER_DELAY));
                timer.set(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            }
        });
    }
}