fuchsia_async/runtime/fuchsia/
timer.rs1use super::executor::Executor;
11use crate::runtime::{BootInstant, EHandle, MonotonicInstant, WakeupTime};
12use crate::{PacketReceiver, ReceiverRegistration};
13use fuchsia_sync::Mutex;
14
15use futures::future::FusedFuture;
16use futures::stream::FusedStream;
17use futures::task::{AtomicWaker, Context};
18use futures::{FutureExt, Stream};
19use std::cell::UnsafeCell;
20use std::fmt;
21use std::future::Future;
22use std::marker::PhantomPinned;
23use std::pin::Pin;
24use std::sync::atomic::{AtomicU8, Ordering};
25use std::sync::Arc;
26use std::task::{ready, Poll, Waker};
27use zx::AsHandleRef as _;
28
/// Abstraction over the timelines a timer heap can run on.
///
/// Times cross this interface as raw `i64` nanosecond counts. This file implements the trait for
/// `MonotonicInstant` and `BootInstant`.
pub trait TimeInterface:
    Clone + Copy + fmt::Debug + PartialEq + PartialOrd + Ord + Send + Sync + 'static
{
    /// The Zircon timeline that instants and timers of this type belong to.
    type Timeline: zx::Timeline + Send + Sync + 'static;

    /// Builds an instant from a nanosecond count.
    fn from_nanos(nanos: i64) -> Self;
    /// Converts this instant into a nanosecond count.
    fn into_nanos(self) -> i64;
    /// Builds the `zx::Instant` on `Self::Timeline` for a nanosecond count.
    fn zx_instant(nanos: i64) -> zx::Instant<Self::Timeline>;
    /// Returns the current time on this timeline, in nanoseconds, as reported by the local
    /// executor.
    fn now() -> i64;
    /// Creates a Zircon timer object on `Self::Timeline`.
    fn create_timer() -> zx::Timer<Self::Timeline>;
}
40
41impl TimeInterface for MonotonicInstant {
42 type Timeline = zx::MonotonicTimeline;
43
44 fn from_nanos(nanos: i64) -> Self {
45 Self::from_nanos(nanos)
46 }
47
48 fn into_nanos(self) -> i64 {
49 self.into_nanos()
50 }
51
52 fn zx_instant(nanos: i64) -> zx::MonotonicInstant {
53 zx::MonotonicInstant::from_nanos(nanos)
54 }
55
56 fn now() -> i64 {
57 EHandle::local().inner().now().into_nanos()
58 }
59
60 fn create_timer() -> zx::Timer<Self::Timeline> {
61 zx::Timer::<Self::Timeline>::create()
62 }
63}
64
65impl TimeInterface for BootInstant {
66 type Timeline = zx::BootTimeline;
67
68 fn from_nanos(nanos: i64) -> Self {
69 Self::from_nanos(nanos)
70 }
71
72 fn into_nanos(self) -> i64 {
73 self.into_nanos()
74 }
75
76 fn zx_instant(nanos: i64) -> zx::BootInstant {
77 zx::BootInstant::from_nanos(nanos)
78 }
79
80 fn now() -> i64 {
81 EHandle::local().inner().boot_now().into_nanos()
82 }
83
84 fn create_timer() -> zx::Timer<Self::Timeline> {
85 zx::Timer::<Self::Timeline>::create()
86 }
87}
88
89impl WakeupTime for std::time::Instant {
90 fn into_timer(self) -> Timer {
91 let now_as_instant = std::time::Instant::now();
92 let now_as_time = MonotonicInstant::now();
93 EHandle::local()
94 .mono_timers()
95 .new_timer(now_as_time + self.saturating_duration_since(now_as_instant).into())
96 }
97}
98
99impl WakeupTime for MonotonicInstant {
100 fn into_timer(self) -> Timer {
101 EHandle::local().mono_timers().new_timer(self)
102 }
103}
104
105impl WakeupTime for BootInstant {
106 fn into_timer(self) -> Timer {
107 EHandle::local().boot_timers().new_timer(self)
108 }
109}
110
111impl WakeupTime for zx::MonotonicInstant {
112 fn into_timer(self) -> Timer {
113 EHandle::local().mono_timers().new_timer(self.into())
114 }
115}
116
117impl WakeupTime for zx::BootInstant {
118 fn into_timer(self) -> Timer {
119 EHandle::local().boot_timers().new_timer(self.into())
120 }
121}
122
/// A future that completes once its deadline has been reached.
///
/// The wrapped `TimerState` is `!Unpin`, so a `Timer` must be pinned before it can be polled or
/// reset.
#[must_use = "futures do nothing unless polled"]
pub struct Timer(TimerState);
126
127impl Timer {
128 pub fn new(time: impl WakeupTime) -> Self {
130 time.into_timer()
131 }
132
133 pub fn reset(self: Pin<&mut Self>, time: MonotonicInstant) {
135 let nanos = time.into_nanos();
136 if self.0.state.load(Ordering::Relaxed) == UNREGISTERED
147 || !self.0.timers.try_reset_timer(&self.0, nanos)
148 {
149 unsafe { *self.0.nanos.get() = nanos };
152 self.0.state.store(UNREGISTERED, Ordering::Relaxed);
153 }
154 }
155}
156
157impl fmt::Debug for Timer {
158 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
159 f.debug_struct("Timer").field("time", &self.0.nanos).finish()
160 }
161}
162
163impl Drop for Timer {
164 fn drop(&mut self) {
165 self.0.timers.unregister(&self.0);
166 }
167}
168
169impl Future for Timer {
170 type Output = ();
171 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
172 unsafe { self.0.timers.poll(self.as_ref(), cx) }
174 }
175}
176
/// Per-timer state. The timer heap refers to this by raw pointer (see `StateRef`) while the timer
/// is registered.
struct TimerState {
    /// The timer heap that created this timer.
    timers: Arc<dyn TimersInterface>,

    /// Deadline in nanoseconds. Written by `Timer::reset` (directly or through the heap) and read
    /// by the heap when ordering timers.
    nanos: UnsafeCell<i64>,

    /// Waker registered by `poll`; taken when the timer fires.
    waker: AtomicWaker,
    /// One of `UNREGISTERED`, `REGISTERED`, `FIRED` or `TERMINATED`.
    state: AtomicU8,

    /// Position of this timer in the heap, or `HeapIndex::NULL` when it is not in the heap.
    /// Maintained by `Heap`.
    index: UnsafeCell<HeapIndex>,

    /// Makes the type `!Unpin`: the heap stores the address of this state, so it must not move.
    _pinned: PhantomPinned,
}

// NOTE(review): these impls are needed because of the `UnsafeCell` fields. Soundness presumably
// relies on those fields only being touched under the `Timers::inner` lock or with exclusive
// access to the timer; confirm against every access site.
unsafe impl Send for TimerState {}
unsafe impl Sync for TimerState {}
197
/// The timer is not in the heap. This is the initial state; `reset` and `unregister` can also
/// return a timer to it.
const UNREGISTERED: u8 = 0;

/// The timer has been pushed onto the heap and is waiting for its deadline.
const REGISTERED: u8 = 1;

/// The deadline was reached. Set when the heap pops the timer to wake it, or by `poll` when it
/// finds the deadline has already passed.
const FIRED: u8 = 2;

/// `poll` observed `FIRED` and returned `Poll::Ready`; `FusedFuture::is_terminated` reports true.
const TERMINATED: u8 = 3;
209
/// Position of a timer inside the heap's backing vector, with a sentinel for "not in the heap".
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct HeapIndex(usize);

impl HeapIndex {
    /// Sentinel meaning the timer is not currently in the heap.
    const NULL: HeapIndex = HeapIndex(usize::MAX);

    /// Returns the position as a plain `usize`, or `None` for the `NULL` sentinel.
    fn get(&self) -> Option<usize> {
        let HeapIndex(raw) = *self;
        (raw != Self::NULL.0).then_some(raw)
    }
}

impl From<usize> for HeapIndex {
    /// Wraps a raw position; no check is made against the sentinel value.
    fn from(value: usize) -> Self {
        HeapIndex(value)
    }
}
231
232impl FusedFuture for Timer {
233 fn is_terminated(&self) -> bool {
234 self.0.state.load(Ordering::Relaxed) == TERMINATED
235 }
236}
237
/// Raw pointer to a `TimerState`, as stored in the timer heap.
///
/// The pointer is created from a pinned `Timer` when it is registered. `Timer::drop` unregisters
/// the timer, which removes its entry from the heap.
#[derive(Copy, Clone, Debug)]
struct StateRef(*const TimerState);

// NOTE(review): raw pointers are neither `Send` nor `Sync` by default. These impls presumably rely
// on `StateRef` only being dereferenced while the pointee is still registered; confirm.
unsafe impl Send for StateRef {}
unsafe impl Sync for StateRef {}
254
impl StateRef {
    /// Marks the timer as `FIRED` and takes its registered waker, if any.
    ///
    /// `_inner` is unused in the body. NOTE(review): it presumably exists to prove the caller
    /// holds the `Timers::inner` lock; confirm.
    fn into_waker(self, _inner: &mut Inner) -> Option<Waker> {
        // SAFETY: NOTE(review): assumes the pointee is still alive, i.e. this `StateRef` was just
        // taken out of the heap under the lock; confirm.
        unsafe {
            (*self.0).state.store(FIRED, Ordering::Relaxed);
            (*self.0).waker.take()
        }
    }

    /// Reads the timer's deadline in nanoseconds.
    ///
    /// # Safety
    ///
    /// NOTE(review): the original contract is not visible here. The pointee must be alive and
    /// there must be no concurrent write to `nanos`; confirm.
    unsafe fn nanos(&self) -> i64 {
        *(*self.0).nanos.get()
    }

    /// Returns a mutable reference to the timer's deadline.
    ///
    /// # Safety
    ///
    /// NOTE(review): the original contract is not visible here. The pointee must be alive and
    /// nothing else may access `nanos` while the reference exists; confirm.
    unsafe fn nanos_mut(&mut self) -> &mut i64 {
        &mut *(*self.0).nanos.get()
    }

    /// Stores `index` as the timer's heap position and returns the previous one.
    ///
    /// # Safety
    ///
    /// NOTE(review): the original contract is not visible here. The pointee must be alive and
    /// nothing else may access `index` at the same time; confirm.
    unsafe fn set_index(&mut self, index: HeapIndex) -> HeapIndex {
        std::mem::replace(&mut *(*self.0).index.get(), index)
    }
}
291
/// A stream that yields `()` each time a repeating monotonic timer fires.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Interval {
    /// The underlying timer; re-armed after every tick.
    timer: Pin<Box<Timer>>,
    /// Deadline the timer is currently armed for.
    next: MonotonicInstant,
    /// Spacing between consecutive deadlines.
    duration: zx::MonotonicDuration,
}
305
306impl Interval {
307 pub fn new(duration: zx::MonotonicDuration) -> Self {
309 let next = MonotonicInstant::after(duration);
310 Interval { timer: Box::pin(Timer::new(next)), next, duration }
311 }
312}
313
impl FusedStream for Interval {
    /// Always `false`: `poll_next` never yields `None`, so the stream never terminates.
    fn is_terminated(&self) -> bool {
        false
    }
}
320
321impl Stream for Interval {
322 type Item = ();
323 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
324 ready!(self.timer.poll_unpin(cx));
325 let next = self.next + self.duration;
326 self.timer.as_mut().reset(next);
327 self.next = next;
328 Poll::Ready(Some(()))
329 }
330}
331
/// The set of pending timers for one timeline, backed by a single Zircon timer object.
pub(crate) struct Timers<T: TimeInterface> {
    /// Heap of registered timers plus bookkeeping for the Zircon timer.
    inner: Mutex<Inner>,

    /// Whether fake time is in use. When set, the `USER_0` signal on `timer` is used in place of
    /// `TIMER_SIGNALED`.
    fake: bool,

    /// Zircon timer that is set to the earliest deadline in the heap.
    timer: zx::Timer<T::Timeline>,

    /// Registration with the executor's packet receivers; set by `register` and cleared by
    /// `deregister`.
    receiver_registration: Mutex<Option<ReceiverRegistration<Arc<Self>>>>,
}
342
/// State of `Timers` that is protected by its mutex.
struct Inner {
    /// Min-heap of registered timers, ordered by deadline.
    timers: Heap,

    /// Deadline most recently programmed into the Zircon timer, or `None` if it was cancelled or
    /// never set.
    last_deadline: Option<i64>,

    /// True while an async wait on the Zircon timer is outstanding on the port.
    async_wait: bool,

    /// Port key obtained from the receiver registration; used when issuing async waits.
    port_key: u64,
}
357
impl<T: TimeInterface> Timers<T> {
    /// Creates an empty timer set. `fake` selects fake-time signalling (see `maybe_notify`).
    pub fn new(fake: bool) -> Self {
        Self {
            inner: Mutex::new(Inner {
                timers: Heap::default(),
                last_deadline: None,
                async_wait: false,
                port_key: 0,
            }),
            fake,
            timer: T::create_timer(),
            receiver_registration: Mutex::default(),
        }
    }

    /// Creates a new, unregistered timer with deadline `time` that belongs to this timer set.
    /// The timer is only added to the heap when it is first polled.
    pub fn new_timer(self: &Arc<Self>, time: T) -> Timer {
        let nanos = time.into_nanos();
        Timer(TimerState {
            timers: self.clone(),
            nanos: UnsafeCell::new(nanos),
            waker: AtomicWaker::new(),
            state: AtomicU8::new(UNREGISTERED),
            index: UnsafeCell::new(HeapIndex::NULL),
            _pinned: PhantomPinned,
        })
    }

    /// Registers this timer set as a packet receiver with `executor` (if not already registered)
    /// and records the resulting port key.
    pub fn register(self: &Arc<Self>, executor: &Arc<Executor>) {
        // The registration lock is released at the end of this statement, before `inner` is
        // locked below.
        let key = self
            .receiver_registration
            .lock()
            .get_or_insert_with(|| executor.receivers.register(executor.clone(), self.clone()))
            .key();
        self.inner.lock().port_key = key;
    }

    /// Drops the receiver registration, if any.
    pub fn deregister(&self) {
        *self.receiver_registration.lock() = None;
    }

    /// Brings the Zircon timer and the async wait in line with the current head of the heap.
    ///
    /// Must be called with the `inner` lock held (`inner` is the locked state).
    /// `from_receive_packet` indicates the call was triggered by a port packet, which means the
    /// previous async wait has been consumed.
    fn setup_zircon_timer(&self, inner: &mut Inner, from_receive_packet: bool) {
        // SAFETY: NOTE(review): heap entries are read while holding the lock; this assumes every
        // entry points at a live `TimerState` (timers unregister themselves on drop).
        let new_deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });

        // Only touch the Zircon timer when the earliest deadline actually changed.
        if new_deadline != inner.last_deadline {
            inner.last_deadline = new_deadline;
            match inner.last_deadline {
                Some(deadline) => {
                    self.timer.set(T::zx_instant(deadline), zx::Duration::ZERO).unwrap()
                }
                None => self.timer.cancel().unwrap(),
            }
        }

        if from_receive_packet {
            inner.async_wait = false;
        }

        // Issue a new async wait if there is a deadline and no wait is outstanding.
        if inner.last_deadline.is_some() && !inner.async_wait {
            if self.fake {
                // NOTE(review): presumably clears `USER_0` (first argument being the clear mask)
                // so the wait below only completes after a later `maybe_notify`; confirm.
                self.timer.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
            }

            self.timer
                .wait_async_handle(
                    EHandle::local().port(),
                    inner.port_key,
                    if self.fake { zx::Signals::USER_0 } else { zx::Signals::TIMER_SIGNALED },
                    zx::WaitAsyncOpts::empty(),
                )
                .unwrap();

            inner.async_wait = true;
        }
    }

    /// Wakes every timer whose deadline is at or before the current time. Returns true if at
    /// least one timer was popped.
    pub fn wake_timers(&self) -> bool {
        self.wake_timers_impl(false)
    }

    /// Implementation of `wake_timers`; `from_receive_packet` is forwarded to
    /// `setup_zircon_timer` once no more timers are due.
    fn wake_timers_impl(&self, from_receive_packet: bool) -> bool {
        // "Now" is sampled once; timers are compared against this value for the whole loop.
        let now = T::now();
        let mut timers_woken = false;

        loop {
            // The lock is held only inside this block, so the waker is invoked without it.
            let waker = {
                let mut inner = self.inner.lock();

                // SAFETY: NOTE(review): read under the lock; assumes heap entries are live.
                let deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
                if deadline.is_some_and(|d| d <= now) {
                    let timer = inner.timers.pop().unwrap();
                    timer.into_waker(&mut inner)
                } else {
                    // Nothing (more) is due: re-arm the Zircon timer for the new head and stop.
                    self.setup_zircon_timer(&mut inner, from_receive_packet);
                    break;
                }
            };
            if let Some(waker) = waker {
                waker.wake()
            }
            timers_woken = true;
        }
        timers_woken
    }

    /// Pops the earliest timer regardless of the current time, wakes it, and returns its
    /// deadline. Returns `None` if the heap is empty.
    pub fn wake_next_timer(&self) -> Option<T> {
        let (nanos, waker) = {
            let mut inner = self.inner.lock();
            let timer = inner.timers.pop()?;
            // SAFETY: NOTE(review): the entry was just popped under the lock; assumes it is live.
            let nanos = unsafe { timer.nanos() };
            (nanos, timer.into_waker(&mut inner))
        };
        // Wake outside the lock.
        if let Some(waker) = waker {
            waker.wake();
        }
        Some(T::from_nanos(nanos))
    }

    /// Returns the deadline of the earliest registered timer, if any.
    pub fn next_timer(&self) -> Option<T> {
        // SAFETY: NOTE(review): read under the lock; assumes heap entries are live.
        self.inner.lock().timers.peek().map(|state| T::from_nanos(unsafe { state.nanos() }))
    }

    /// For fake time: if the earliest timer is due at `now`, raises `USER_0` on the Zircon timer
    /// so that the pending async wait completes.
    ///
    /// # Panics
    ///
    /// Panics if this timer set was not created with fake time enabled.
    pub fn maybe_notify(&self, now: T) {
        assert!(self.fake, "calling this function requires using fake time.");
        if self
            .inner
            .lock()
            .timers
            .peek()
            .is_some_and(|state| unsafe { state.nanos() } <= now.into_nanos())
        {
            self.timer.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
        }
    }
}
536
537impl<T: TimeInterface> PacketReceiver for Timers<T> {
538 fn receive_packet(&self, _packet: zx::Packet) {
539 self.wake_timers_impl(true);
540 }
541}
542
/// Type-erased view of `Timers<T>`, so that `Timer` does not need to be generic over the
/// timeline.
trait TimersInterface: Send + Sync + 'static {
    /// Polls `timer`, registering it in the heap if necessary.
    ///
    /// # Safety
    ///
    /// NOTE(review): the original contract is not visible here. Presumably `timer` must have been
    /// created by this timer set; confirm.
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()>;
    /// Removes the timer from the heap if it is there.
    fn unregister(&self, state: &TimerState);
    /// Updates the deadline of a timer that is in the heap. Returns false, changing nothing, if
    /// the timer is not in the heap.
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool;
}
549
impl<T: TimeInterface> TimersInterface for Timers<T> {
    /// Drives the timer's state machine: returns `Ready` if it has fired (or its deadline has
    /// already passed), otherwise makes sure it is in the heap and has the current waker.
    unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()> {
        // Quick check before doing any registration work.
        let state = timer.0.state.load(Ordering::Relaxed);

        if state == TERMINATED {
            return Poll::Ready(());
        }

        if state == FIRED {
            timer.0.state.store(TERMINATED, Ordering::Relaxed);
            return Poll::Ready(());
        }

        if state == UNREGISTERED {
            // SAFETY: NOTE(review): the timer is not in the heap, so presumably nothing else can
            // be accessing `nanos`; confirm.
            let nanos = unsafe { *timer.0.nanos.get() };
            // Deadline already reached: complete without touching the heap.
            if nanos <= T::now() {
                timer.0.state.store(FIRED, Ordering::Relaxed);
                return Poll::Ready(());
            }
            let mut inner = self.inner.lock();

            // The heap keeps a raw pointer to the pinned state.
            inner.timers.push(StateRef(&timer.0));

            // The new timer may be the earliest one, so the Zircon timer may need re-arming.
            self.setup_zircon_timer(&mut inner, false);

            timer.0.state.store(REGISTERED, Ordering::Relaxed);
        }

        timer.0.waker.register(cx.waker());

        // Re-check after registering the waker: the timer may have fired in the meantime.
        let state = timer.0.state.load(Ordering::Relaxed);
        match state {
            FIRED => {
                timer.0.state.store(TERMINATED, Ordering::Relaxed);
                Poll::Ready(())
            }
            REGISTERED => Poll::Pending,
            _ => {
                // UNREGISTERED and TERMINATED are not expected at this point.
                unreachable!();
            }
        }
    }

    /// Removes `timer` from the heap if present and marks it `UNREGISTERED`.
    fn unregister(&self, timer: &TimerState) {
        // Fast path: a timer that was never registered is not in the heap.
        if timer.state.load(Ordering::Relaxed) == UNREGISTERED {
            return;
        }
        let mut inner = self.inner.lock();
        // SAFETY: NOTE(review): `index` is read with the lock held; confirm all writers hold it.
        let index = unsafe { *timer.index.get() };
        if let Some(index) = index.get() {
            inner.timers.remove(index);
            // Removing the head changes the earliest deadline.
            if index == 0 {
                self.setup_zircon_timer(&mut inner, false);
            }
            timer.state.store(UNREGISTERED, Ordering::Relaxed);
        }
    }

    /// Changes the deadline of a timer that is in the heap to `nanos` and marks it `REGISTERED`.
    /// Returns false if the timer is not in the heap.
    fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool {
        let mut inner = self.inner.lock();
        // SAFETY: NOTE(review): `index` is read with the lock held; confirm all writers hold it.
        let index = unsafe { *timer.index.get() };
        if let Some(old_index) = index.get() {
            // Re-arm the Zircon timer if the timer became, or previously was, the head.
            if (inner.timers.reset(old_index, nanos) == 0) || (old_index == 0) {
                self.setup_zircon_timer(&mut inner, false);
            }
            timer.state.store(REGISTERED, Ordering::Relaxed);
            true
        } else {
            false
        }
    }
}
663
/// Binary min-heap of timers ordered by deadline. Each timer's `index` field is kept equal to its
/// position in the vector so it can be removed or re-ordered without searching.
#[derive(Default)]
struct Heap(Vec<StateRef>);
666
// NOTE(review): the `unsafe` blocks below dereference the raw pointers held in the heap. They
// assume every entry points at a live `TimerState` and that the caller holds the lock protecting
// the heap; confirm at call sites.
impl Heap {
    /// Adds `timer` to the heap and records its position in the timer.
    fn push(&mut self, mut timer: StateRef) {
        let index = self.0.len();
        self.0.push(timer);
        unsafe {
            timer.set_index(index.into());
        }
        self.fix_up(index);
    }

    /// Returns the timer with the earliest deadline without removing it.
    fn peek(&self) -> Option<&StateRef> {
        self.0.first()
    }

    /// Removes and returns the timer with the earliest deadline.
    fn pop(&mut self) -> Option<StateRef> {
        if let Some(&first) = self.0.first() {
            self.remove(0);
            Some(first)
        } else {
            None
        }
    }

    /// Swaps two entries and updates the positions recorded in both timers.
    fn swap(&mut self, a: usize, b: usize) {
        self.0.swap(a, b);
        unsafe {
            self.0[a].set_index(a.into());
            self.0[b].set_index(b.into());
        }
    }

    /// Changes the deadline of the entry at `index` to `nanos`, restores heap order, and returns
    /// the entry's new position.
    fn reset(&mut self, index: usize, nanos: i64) -> usize {
        // An earlier deadline can only move towards the root; otherwise it can only move down.
        if nanos < std::mem::replace(unsafe { self.0[index].nanos_mut() }, nanos) {
            self.fix_up(index)
        } else {
            self.fix_down(index)
        }
    }

    /// Removes the entry at `index` and marks the removed timer as not being in the heap.
    ///
    /// Panics if `index` is out of bounds.
    fn remove(&mut self, index: usize) {
        unsafe {
            let old_index = self.0[index].set_index(HeapIndex::NULL);
            debug_assert_eq!(old_index, index.into());
        }

        let last = self.0.len() - 1;
        // Fill the hole with the last entry, then sift it in whichever direction is needed.
        if index < last {
            let fix_up;
            unsafe {
                fix_up = self.0[last].nanos() < self.0[index].nanos();
                self.0[index] = self.0[last];
                self.0[index].set_index(index.into());
            };
            self.0.truncate(last);
            if fix_up {
                self.fix_up(index);
            } else {
                self.fix_down(index);
            }
        } else {
            // Removing the last entry needs no re-ordering.
            self.0.truncate(last);
        };
    }

    /// Moves the entry at `index` towards the root until its parent's deadline is not later than
    /// its own. Returns the entry's final position.
    fn fix_up(&mut self, mut index: usize) -> usize {
        while index > 0 {
            let parent = (index - 1) / 2;
            if unsafe { self.0[parent].nanos() <= self.0[index].nanos() } {
                return index;
            }
            self.swap(parent, index);
            index = parent;
        }
        index
    }

    /// Moves the entry at `index` away from the root, swapping with its earliest child, until
    /// neither child has an earlier deadline. Returns the entry's final position.
    fn fix_down(&mut self, mut index: usize) -> usize {
        let len = self.0.len();
        loop {
            let left = index * 2 + 1;
            if left >= len {
                return index;
            }

            let mut swap_with = None;

            unsafe {
                // `nanos` tracks the smallest deadline seen among the entry and its children.
                let mut nanos = self.0[index].nanos();
                let left_nanos = self.0[left].nanos();
                if left_nanos < nanos {
                    swap_with = Some(left);
                    nanos = left_nanos;
                }
                let right = left + 1;
                if right < len && self.0[right].nanos() < nanos {
                    swap_with = Some(right);
                }
            }

            let Some(swap_with) = swap_with else { return index };
            self.swap(index, swap_with);
            index = swap_with;
        }
    }
}
786
#[cfg(test)]
mod test {
    use super::*;
    use crate::{LocalExecutor, SendExecutor, Task, TestExecutor};
    use assert_matches::assert_matches;
    use futures::channel::oneshot::channel;
    use futures::future::Either;
    use futures::prelude::*;
    use rand::seq::SliceRandom;
    use rand::{rng, Rng};
    use std::future::poll_fn;
    use std::pin::pin;
    use zx::MonotonicDuration;

    /// Extra operations the generic tests need on top of `TimeInterface`.
    trait TestTimeInterface:
        TimeInterface
        + WakeupTime
        + std::ops::Sub<zx::Duration<Self::Timeline>, Output = Self>
        + std::ops::Add<zx::Duration<Self::Timeline>, Output = Self>
    {
        /// Returns the instant `duration` from now.
        fn after(duration: zx::Duration<Self::Timeline>) -> Self;
    }

    impl TestTimeInterface for MonotonicInstant {
        fn after(duration: zx::MonotonicDuration) -> Self {
            Self::after(duration)
        }
    }

    impl TestTimeInterface for BootInstant {
        fn after(duration: zx::BootDuration) -> Self {
            Self::after(duration)
        }
    }

    /// A 100ms timer must win a `select` against a 1s timer on a single-threaded executor.
    fn test_shorter_fires_first<T: TestTimeInterface>() {
        let mut exec = LocalExecutor::new();
        let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
        let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
        match exec.run_singlethreaded(future::select(shorter, longer)) {
            Either::Left(_) => {}
            Either::Right(_) => panic!("wrong timer fired"),
        }
    }

    #[test]
    fn shorter_fires_first() {
        test_shorter_fires_first::<MonotonicInstant>();
        test_shorter_fires_first::<BootInstant>();
    }

    /// Same as `test_shorter_fires_first`, but on a four-thread executor.
    fn test_shorter_fires_first_multithreaded<T: TestTimeInterface>() {
        SendExecutor::new(4).run(async {
            let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
            let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
            match future::select(shorter, longer).await {
                Either::Left(_) => {}
                Either::Right(_) => panic!("wrong timer fired"),
            }
        });
    }

    #[test]
    fn shorter_fires_first_multithreaded() {
        test_shorter_fires_first_multithreaded::<MonotonicInstant>();
        test_shorter_fires_first_multithreaded::<BootInstant>();
    }

    /// A timer whose deadline is already in the past must complete before one in the future.
    fn test_timer_before_now_fires_immediately<T: TestTimeInterface>() {
        let mut exec = TestExecutor::new();
        let now = T::now();
        let before = pin!(Timer::new(T::from_nanos(now - 1)));
        let after = pin!(Timer::new(T::from_nanos(now + 1)));
        assert_matches!(
            exec.run_singlethreaded(futures::future::select(before, after)),
            Either::Left(_),
            "Timer in the past should fire first"
        );
    }

    #[test]
    fn timer_before_now_fires_immediately() {
        test_timer_before_now_fires_immediately::<MonotonicInstant>();
        test_timer_before_now_fires_immediately::<BootInstant>();
    }

    /// With fake time, a timer stays pending until the fake clock reaches its deadline.
    #[test]
    fn fires_after_timeout() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));
        let deadline = MonotonicInstant::after(MonotonicDuration::from_seconds(5));
        let mut future = pin!(Timer::new(deadline));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        exec.set_fake_time(deadline);
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
    }

    /// An `Interval` ticks once per period, and consecutive deadlines are exactly one period
    /// apart.
    #[test]
    fn interval() {
        let mut exec = TestExecutor::new_with_fake_time();
        let start = MonotonicInstant::from_nanos(0);
        exec.set_fake_time(start);

        let counter = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
        let mut future = pin!({
            let counter = counter.clone();
            Interval::new(MonotonicDuration::from_seconds(5))
                .map(move |()| {
                    counter.fetch_add(1, Ordering::SeqCst);
                })
                .collect::<()>()
        });

        // Nothing fires before the first deadline.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(0, counter.load(Ordering::SeqCst));

        // Advance to the first deadline: exactly one tick.
        let first_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (1)");
        assert!(first_deadline >= MonotonicDuration::from_seconds(5) + start);
        exec.set_fake_time(first_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Polling again without advancing time must not produce another tick.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Advance to the second deadline: one more tick.
        let second_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (2)");
        exec.set_fake_time(second_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(2, counter.load(Ordering::SeqCst));

        assert_eq!(second_deadline, first_deadline + MonotonicDuration::from_seconds(5));
    }

    /// `MonotonicInstant::after` follows the fake clock, both when creating the timer and when
    /// advancing time.
    #[test]
    fn timer_fake_time() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(MonotonicDuration::from_seconds(1))));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));

        exec.set_fake_time(MonotonicInstant::after(MonotonicDuration::from_seconds(1)));
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
    }

    /// Creates one timer per entry of `nanos`, polls each once with a no-op waker, and appends
    /// them to `timer_futures`.
    fn create_timers(
        timers: &Arc<Timers<MonotonicInstant>>,
        nanos: &[i64],
        timer_futures: &mut Vec<Pin<Box<Timer>>>,
    ) {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        for &n in nanos {
            let mut timer = Box::pin(timers.new_timer(MonotonicInstant::from_nanos(n)));
            let _ = timer.poll_unpin(&mut cx);
            timer_futures.push(timer);
        }
    }

    /// Exercises heap ordering under insertion, removal (by dropping timers) and reset.
    #[test]
    fn timer_heap() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
        timers.register(EHandle::local().inner());

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (0..1000).collect();
        let mut rng = rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Timers inserted in random order must come out in deadline order.
        for i in 0..1000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timer_futures.clear();
        create_timers(&timers, &nanos, &mut timer_futures);

        // Drop a random half of the timers; the remaining ones must still come out in order.
        timer_futures.shuffle(&mut rng);
        timer_futures.truncate(500);
        let mut last_time = None;
        for _ in 0..500 {
            let time = timers.wake_next_timer().unwrap();
            if let Some(last_time) = last_time {
                assert!(last_time <= time);
            }
            last_time = Some(time);
        }
        assert_eq!(timers.wake_next_timer(), None);

        timer_futures = vec![];
        create_timers(&timers, &nanos, &mut timer_futures);

        // Reset every timer to a new random deadline; order must follow the new deadlines.
        timer_futures.shuffle(&mut rng);
        let mut nanos: Vec<_> = (1000..2000).collect();
        nanos.shuffle(&mut rng);

        for (fut, n) in timer_futures.iter_mut().zip(nanos) {
            fut.as_mut().reset(MonotonicInstant::from_nanos(n));
        }

        for i in 1000..2000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timers.deregister();
    }

    /// Heap ordering must hold when many timers share the same deadline.
    #[test]
    fn timer_heap_with_same_time() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(true));
        timers.register(EHandle::local().inner());

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (1..100).collect();
        let mut rng = rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Add 100 timers that all share one randomly chosen deadline.
        let time = rng.random_range(0..101);
        let same_time = [time; 100];
        create_timers(&timers, &same_time, &mut timer_futures);

        nanos.extend(&same_time);
        nanos.sort();

        for n in nanos {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(n)));
        }

        timers.deregister();
    }

    /// Resetting a registered timer to an earlier deadline must make it fire at the earlier
    /// time. Uses real time, so it retries up to 100 times to tolerate scheduling delays.
    #[test]
    fn timer_reset_to_earlier_time() {
        let mut exec = LocalExecutor::new();

        for _ in 0..100 {
            let instant = MonotonicInstant::after(MonotonicDuration::from_millis(100));
            let (sender, receiver) = channel();
            let task = Task::spawn(async move {
                let mut timer = pin!(Timer::new(instant));
                let mut receiver = pin!(receiver.fuse());
                poll_fn(|cx| loop {
                    if timer.as_mut().poll_unpin(cx).is_ready() {
                        return Poll::Ready(());
                    }
                    // Once the message arrives, pull the deadline in to 1ms from now and loop
                    // so the timer is polled again with the new deadline.
                    if !receiver.is_terminated() && receiver.poll_unpin(cx).is_ready() {
                        timer
                            .as_mut()
                            .reset(MonotonicInstant::after(MonotonicDuration::from_millis(1)));
                    } else {
                        return Poll::Pending;
                    }
                })
                .await;
            });
            sender.send(()).unwrap();

            exec.run_singlethreaded(task);

            // Success if the task finished well before the original 100ms deadline.
            if MonotonicInstant::after(MonotonicDuration::from_millis(1)) < instant {
                return;
            }
        }

        panic!("Timer fired late in all 100 attempts");
    }

    /// Stress test: repeatedly poll, reset and replace a timer around its firing time on a
    /// multithreaded executor.
    #[test]
    fn test_reset() {
        SendExecutor::new(2).run(async {
            const TIMER_DELAY: zx::MonotonicDuration = zx::Duration::from_micros(100);
            let mut timer = pin!(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            for _ in 0..10000 {
                let _ = futures::poll!(timer.as_mut());
                // Sleep for roughly the timer delay so resets race with the timer firing.
                std::thread::sleep(std::time::Duration::from_micros(rand::random_range(80..120)));
                timer.as_mut().reset(MonotonicInstant::after(TIMER_DELAY));
                timer.set(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            }
        });
    }
}