fuchsia_async/runtime/fuchsia/
timer.rs1use crate::runtime::{BootInstant, EHandle, MonotonicInstant, WakeupTime};
11use crate::PacketReceiver;
12use fuchsia_sync::Mutex;
13
14use futures::future::FusedFuture;
15use futures::stream::FusedStream;
16use futures::task::{AtomicWaker, Context};
17use futures::{FutureExt, Stream};
18use std::cell::UnsafeCell;
19use std::fmt;
20use std::future::Future;
21use std::marker::PhantomPinned;
22use std::pin::Pin;
23use std::sync::atomic::{AtomicU8, Ordering};
24use std::sync::Arc;
25use std::task::{ready, Poll, Waker};
26use zx::AsHandleRef as _;
27
28pub trait TimeInterface:
29 Clone + Copy + fmt::Debug + PartialEq + PartialOrd + Ord + Send + Sync + 'static
30{
31 type Timeline: zx::Timeline + Send + Sync + 'static;
32
33 fn from_nanos(nanos: i64) -> Self;
34 fn into_nanos(self) -> i64;
35 fn zx_instant(nanos: i64) -> zx::Instant<Self::Timeline>;
36 fn now() -> i64;
37 fn create_timer() -> zx::Timer<Self::Timeline>;
38}
39
40impl TimeInterface for MonotonicInstant {
41 type Timeline = zx::MonotonicTimeline;
42
43 fn from_nanos(nanos: i64) -> Self {
44 Self::from_nanos(nanos)
45 }
46
47 fn into_nanos(self) -> i64 {
48 self.into_nanos()
49 }
50
51 fn zx_instant(nanos: i64) -> zx::MonotonicInstant {
52 zx::MonotonicInstant::from_nanos(nanos)
53 }
54
55 fn now() -> i64 {
56 EHandle::local().inner().now().into_nanos()
57 }
58
59 fn create_timer() -> zx::Timer<Self::Timeline> {
60 zx::Timer::<Self::Timeline>::create()
61 }
62}
63
64impl TimeInterface for BootInstant {
65 type Timeline = zx::BootTimeline;
66
67 fn from_nanos(nanos: i64) -> Self {
68 Self::from_nanos(nanos)
69 }
70
71 fn into_nanos(self) -> i64 {
72 self.into_nanos()
73 }
74
75 fn zx_instant(nanos: i64) -> zx::BootInstant {
76 zx::BootInstant::from_nanos(nanos)
77 }
78
79 fn now() -> i64 {
80 EHandle::local().inner().boot_now().into_nanos()
81 }
82
83 fn create_timer() -> zx::Timer<Self::Timeline> {
84 zx::Timer::<Self::Timeline>::create()
85 }
86}
87
88impl WakeupTime for std::time::Instant {
89 fn into_timer(self) -> Timer {
90 let now_as_instant = std::time::Instant::now();
91 let now_as_time = MonotonicInstant::now();
92 EHandle::local()
93 .mono_timers()
94 .new_timer(now_as_time + self.saturating_duration_since(now_as_instant).into())
95 }
96}
97
98impl WakeupTime for MonotonicInstant {
99 fn into_timer(self) -> Timer {
100 EHandle::local().mono_timers().new_timer(self)
101 }
102}
103
104impl WakeupTime for BootInstant {
105 fn into_timer(self) -> Timer {
106 EHandle::local().boot_timers().new_timer(self)
107 }
108}
109
110impl WakeupTime for zx::MonotonicInstant {
111 fn into_timer(self) -> Timer {
112 EHandle::local().mono_timers().new_timer(self.into())
113 }
114}
115
116impl WakeupTime for zx::BootInstant {
117 fn into_timer(self) -> Timer {
118 EHandle::local().boot_timers().new_timer(self.into())
119 }
120}
121
122#[must_use = "futures do nothing unless polled"]
124pub struct Timer(TimerState);
125
126impl Timer {
127 pub fn new(time: impl WakeupTime) -> Self {
129 time.into_timer()
130 }
131
132 pub fn reset(self: Pin<&mut Self>, time: MonotonicInstant) {
134 let nanos = time.into_nanos();
135 if self.0.state.load(Ordering::Relaxed) != REGISTERED
140 || !self.0.timers.try_reset_timer(&self.0, nanos)
141 {
142 unsafe { *self.0.nanos.get() = nanos };
145 self.0.state.store(UNREGISTERED, Ordering::Relaxed);
146 }
147 }
148}
149
150impl fmt::Debug for Timer {
151 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
152 f.debug_struct("Timer").field("time", &self.0.nanos).finish()
153 }
154}
155
156impl Drop for Timer {
157 fn drop(&mut self) {
158 self.0.timers.unregister(&self.0);
159 }
160}
161
162impl Future for Timer {
163 type Output = ();
164 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
165 unsafe { self.0.timers.poll(self.as_ref(), cx) }
167 }
168}
169
170struct TimerState {
171 timers: Arc<dyn TimersInterface>,
172
173 nanos: UnsafeCell<i64>,
175
176 waker: AtomicWaker,
177 state: AtomicU8,
178
179 index: UnsafeCell<HeapIndex>,
182
183 _pinned: PhantomPinned,
185}
186
187unsafe impl Send for TimerState {}
189unsafe impl Sync for TimerState {}
190
/// The timer is not in the heap. This is the initial state, and the state after `reset` on an
/// unregistered timer or after `unregister` removed the timer from the heap.
const UNREGISTERED: u8 = 0;

/// `poll` has pushed the timer onto the heap and it has not fired yet.
const REGISTERED: u8 = 1;

/// The deadline was reached, but a later poll has not yet moved the timer to `TERMINATED`.
const FIRED: u8 = 2;

/// The timer has completed; `is_terminated` returns true in this state.
const TERMINATED: u8 = 3;
202
/// A timer's position in the timer heap, or [`HeapIndex::NULL`] when it has none.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct HeapIndex(usize);

impl HeapIndex {
    /// Sentinel meaning "no position".
    const NULL: HeapIndex = HeapIndex(usize::MAX);

    /// Returns the position, or `None` for [`HeapIndex::NULL`].
    fn get(&self) -> Option<usize> {
        (*self != Self::NULL).then_some(self.0)
    }
}

impl From<usize> for HeapIndex {
    /// Wraps a raw position. `usize::MAX` yields [`HeapIndex::NULL`].
    fn from(value: usize) -> Self {
        HeapIndex(value)
    }
}
224
225impl FusedFuture for Timer {
226 fn is_terminated(&self) -> bool {
227 self.0.state.load(Ordering::Relaxed) == TERMINATED
228 }
229}
230
231#[derive(Copy, Clone, Debug)]
242struct StateRef(*const TimerState);
243
244unsafe impl Send for StateRef {}
246unsafe impl Sync for StateRef {}
247
248impl StateRef {
249 fn into_waker(self, _inner: &mut Inner) -> Option<Waker> {
250 unsafe {
252 (*self.0).state.store(FIRED, Ordering::Relaxed);
259 (*self.0).waker.take()
260 }
261 }
262
263 unsafe fn nanos(&self) -> i64 {
267 *(*self.0).nanos.get()
268 }
269
270 unsafe fn nanos_mut(&mut self) -> &mut i64 {
274 &mut *(*self.0).nanos.get()
275 }
276
277 unsafe fn set_index(&mut self, index: HeapIndex) -> HeapIndex {
281 std::mem::replace(&mut *(*self.0).index.get(), index)
282 }
283}
284
285#[derive(Debug)]
292#[must_use = "streams do nothing unless polled"]
293pub struct Interval {
294 timer: Pin<Box<Timer>>,
295 next: MonotonicInstant,
296 duration: zx::MonotonicDuration,
297}
298
299impl Interval {
300 pub fn new(duration: zx::MonotonicDuration) -> Self {
302 let next = MonotonicInstant::after(duration);
303 Interval { timer: Box::pin(Timer::new(next)), next, duration }
304 }
305}
306
307impl FusedStream for Interval {
308 fn is_terminated(&self) -> bool {
309 false
311 }
312}
313
314impl Stream for Interval {
315 type Item = ();
316 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
317 ready!(self.timer.poll_unpin(cx));
318 let next = self.next + self.duration;
319 self.timer.as_mut().reset(next);
320 self.next = next;
321 Poll::Ready(Some(()))
322 }
323}
324
325pub(crate) struct Timers<T: TimeInterface> {
326 inner: Mutex<Inner>,
327
328 port_key: u64,
333
334 fake: bool,
335
336 timer: zx::Timer<T::Timeline>,
337}
338
339struct Inner {
340 timers: Heap,
342
343 last_deadline: Option<i64>,
346
347 async_wait: bool,
349}
350
351impl<T: TimeInterface> Timers<T> {
352 pub fn new(port_key: u64, fake: bool) -> Self {
353 Self {
354 inner: Mutex::new(Inner {
355 timers: Heap::default(),
356 last_deadline: None,
357 async_wait: false,
358 }),
359 port_key,
360 fake,
361 timer: T::create_timer(),
362 }
363 }
364
365 pub fn new_timer(self: &Arc<Self>, time: T) -> Timer {
366 let nanos = time.into_nanos();
367 Timer(TimerState {
368 timers: self.clone(),
369 nanos: UnsafeCell::new(nanos),
370 waker: AtomicWaker::new(),
371 state: AtomicU8::new(UNREGISTERED),
372 index: UnsafeCell::new(HeapIndex::NULL),
373 _pinned: PhantomPinned,
374 })
375 }
376
377 fn setup_zircon_timer(&self, inner: &mut Inner, from_receive_packet: bool) {
384 let new_deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
387
388 if new_deadline != inner.last_deadline {
391 inner.last_deadline = new_deadline;
392 match inner.last_deadline {
393 Some(deadline) => {
394 self.timer.set(T::zx_instant(deadline), zx::Duration::ZERO).unwrap()
395 }
396 None => self.timer.cancel().unwrap(),
397 }
398 }
399
400 if from_receive_packet {
404 inner.async_wait = false;
405 }
406
407 if inner.last_deadline.is_some() && !inner.async_wait {
410 if self.fake {
411 self.timer.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
414 }
415
416 self.timer
417 .wait_async_handle(
418 EHandle::local().port(),
419 self.port_key,
420 if self.fake { zx::Signals::USER_0 } else { zx::Signals::TIMER_SIGNALED },
421 zx::WaitAsyncOpts::empty(),
422 )
423 .unwrap();
424
425 inner.async_wait = true;
426 }
427 }
428
429 pub fn port_key(&self) -> u64 {
430 self.port_key
431 }
432
433 pub fn wake_timers(&self) -> bool {
435 self.wake_timers_impl(false)
436 }
437
438 fn wake_timers_impl(&self, from_receive_packet: bool) -> bool {
439 let now = T::now();
440 let mut timers_woken = false;
441
442 loop {
443 let waker = {
444 let mut inner = self.inner.lock();
445
446 let deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
448 if deadline.is_some_and(|d| d <= now) {
449 let timer = inner.timers.pop().unwrap();
450 timer.into_waker(&mut inner)
451 } else {
452 self.setup_zircon_timer(&mut inner, from_receive_packet);
465 break;
466 }
467 };
468 if let Some(waker) = waker {
469 waker.wake()
470 }
471 timers_woken = true;
472 }
473 timers_woken
474 }
475
476 pub fn wake_next_timer(&self) -> Option<T> {
478 let (nanos, waker) = {
479 let mut inner = self.inner.lock();
480 let Some(timer) = inner.timers.pop() else { return None };
481 let nanos = unsafe { timer.nanos() };
483 (nanos, timer.into_waker(&mut inner))
484 };
485 if let Some(waker) = waker {
486 waker.wake();
487 }
488 Some(T::from_nanos(nanos))
489 }
490
491 pub fn next_timer(&self) -> Option<T> {
493 self.inner.lock().timers.peek().map(|state| T::from_nanos(unsafe { state.nanos() }))
495 }
496
497 pub fn maybe_notify(&self, now: T) {
503 assert!(self.fake, "calling this function requires using fake time.");
504 if self
506 .inner
507 .lock()
508 .timers
509 .peek()
510 .map_or(false, |state| unsafe { state.nanos() } <= now.into_nanos())
511 {
512 self.timer.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
513 }
514 }
515}
516
517impl<T: TimeInterface> PacketReceiver for Timers<T> {
518 fn receive_packet(&self, _packet: zx::Packet) {
519 self.wake_timers_impl(true);
520 }
521}
522
523trait TimersInterface: Send + Sync + 'static {
525 unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()>;
526 fn unregister(&self, state: &TimerState);
527 fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool;
528}
529
530impl<T: TimeInterface> TimersInterface for Timers<T> {
531 unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()> {
535 let state = timer.0.state.load(Ordering::Relaxed);
546
547 if state == TERMINATED {
548 return Poll::Ready(());
549 }
550
551 if state == FIRED {
552 timer.0.state.store(TERMINATED, Ordering::Relaxed);
553 return Poll::Ready(());
554 }
555
556 if state == UNREGISTERED {
557 let nanos = unsafe { *timer.0.nanos.get() };
559 if nanos <= T::now() {
560 timer.0.state.store(FIRED, Ordering::Relaxed);
561 return Poll::Ready(());
562 }
563 let mut inner = self.inner.lock();
564
565 inner.timers.push(StateRef(&timer.0));
568
569 self.setup_zircon_timer(&mut inner, false);
572
573 timer.0.state.store(REGISTERED, Ordering::Relaxed);
574 }
575
576 timer.0.waker.register(cx.waker());
577
578 let state = timer.0.state.load(Ordering::Relaxed);
586 match state {
587 FIRED => {
588 timer.0.state.store(TERMINATED, Ordering::Relaxed);
589 Poll::Ready(())
590 }
591 REGISTERED => Poll::Pending,
592 _ => {
596 unreachable!();
597 }
598 }
599 }
600
601 fn unregister(&self, timer: &TimerState) {
602 if timer.state.load(Ordering::Relaxed) == UNREGISTERED {
603 return;
610 }
611 let mut inner = self.inner.lock();
612 let index = unsafe { *timer.index.get() };
614 if let Some(index) = index.get() {
615 inner.timers.remove(index);
616 if index == 0 {
617 self.setup_zircon_timer(&mut inner, false);
620 }
621 timer.state.store(UNREGISTERED, Ordering::Relaxed);
622 }
623 }
624
625 fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool {
627 let mut inner = self.inner.lock();
628 let index = unsafe { *timer.index.get() };
630 if let Some(old_index) = index.get() {
631 if (inner.timers.reset(old_index, nanos) == 0) || (old_index == 0) {
632 self.setup_zircon_timer(&mut inner, false);
635 }
636 timer.state.store(REGISTERED, Ordering::Relaxed);
637 true
638 } else {
639 false
640 }
641 }
642}
643
644#[derive(Default)]
645struct Heap(Vec<StateRef>);
646
647impl Heap {
650 fn push(&mut self, mut timer: StateRef) {
651 let index = self.0.len();
652 self.0.push(timer);
653 unsafe {
655 timer.set_index(index.into());
656 }
657 self.fix_up(index);
658 }
659
660 fn peek(&self) -> Option<&StateRef> {
661 self.0.first()
662 }
663
664 fn pop(&mut self) -> Option<StateRef> {
665 if let Some(&first) = self.0.first() {
666 self.remove(0);
667 Some(first)
668 } else {
669 None
670 }
671 }
672
673 fn swap(&mut self, a: usize, b: usize) {
674 self.0.swap(a, b);
675 unsafe {
677 self.0[a].set_index(a.into());
678 self.0[b].set_index(b.into());
679 }
680 }
681
682 fn reset(&mut self, index: usize, nanos: i64) -> usize {
684 if nanos < std::mem::replace(unsafe { self.0[index].nanos_mut() }, nanos) {
686 self.fix_up(index)
687 } else {
688 self.fix_down(index)
689 }
690 }
691
692 fn remove(&mut self, index: usize) {
693 unsafe {
695 let old_index = self.0[index].set_index(HeapIndex::NULL);
696 debug_assert_eq!(old_index, index.into());
697 }
698
699 let last = self.0.len() - 1;
702 if index < last {
703 let fix_up;
704 unsafe {
705 fix_up = self.0[last].nanos() < self.0[index].nanos();
707 self.0[index] = self.0[last];
708 self.0[index].set_index(index.into());
709 };
710 self.0.truncate(last);
711 if fix_up {
712 self.fix_up(index);
713 } else {
714 self.fix_down(index);
715 }
716 } else {
717 self.0.truncate(last);
718 };
719 }
720
721 fn fix_up(&mut self, mut index: usize) -> usize {
723 while index > 0 {
724 let parent = (index - 1) / 2;
725 if unsafe { self.0[parent].nanos() <= self.0[index].nanos() } {
727 return index;
728 }
729 self.swap(parent, index);
730 index = parent;
731 }
732 index
733 }
734
735 fn fix_down(&mut self, mut index: usize) -> usize {
737 let len = self.0.len();
738 loop {
739 let left = index * 2 + 1;
740 if left >= len {
741 return index;
742 }
743
744 let mut swap_with = None;
745
746 unsafe {
748 let mut nanos = self.0[index].nanos();
749 let left_nanos = self.0[left].nanos();
750 if left_nanos < nanos {
751 swap_with = Some(left);
752 nanos = left_nanos;
753 }
754 let right = left + 1;
755 if right < len && self.0[right].nanos() < nanos {
756 swap_with = Some(right);
757 }
758 }
759
760 let Some(swap_with) = swap_with else { return index };
761 self.swap(index, swap_with);
762 index = swap_with;
763 }
764 }
765}
766
#[cfg(test)]
mod test {
    use super::*;
    use crate::{LocalExecutor, SendExecutor, Task, TestExecutor};
    use assert_matches::assert_matches;
    use futures::channel::oneshot::channel;
    use futures::future::Either;
    use futures::prelude::*;
    use rand::seq::SliceRandom;
    use rand::{thread_rng, Rng};
    use std::future::poll_fn;
    use std::pin::pin;
    use zx::MonotonicDuration;

    /// What the generic tests need from a timeline on top of [`TimeInterface`].
    trait TestTimeInterface:
        TimeInterface
        + WakeupTime
        + std::ops::Sub<zx::Duration<Self::Timeline>, Output = Self>
        + std::ops::Add<zx::Duration<Self::Timeline>, Output = Self>
    {
        /// Returns the instant `duration` from now.
        fn after(duration: zx::Duration<Self::Timeline>) -> Self;
    }

    impl TestTimeInterface for MonotonicInstant {
        fn after(duration: zx::MonotonicDuration) -> Self {
            MonotonicInstant::after(duration)
        }
    }

    impl TestTimeInterface for BootInstant {
        fn after(duration: zx::BootDuration) -> Self {
            BootInstant::after(duration)
        }
    }

    /// Races a 100ms timer against a 1s timer on a single-threaded executor.
    fn test_shorter_fires_first<T: TestTimeInterface>() {
        let mut exec = LocalExecutor::new();
        let short_timer =
            pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
        let long_timer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
        let winner = exec.run_singlethreaded(future::select(short_timer, long_timer));
        if let Either::Right(_) = winner {
            panic!("wrong timer fired");
        }
    }

    #[test]
    fn shorter_fires_first() {
        test_shorter_fires_first::<MonotonicInstant>();
        test_shorter_fires_first::<BootInstant>();
    }

    /// Same race as above, but on a four-thread executor.
    fn test_shorter_fires_first_multithreaded<T: TestTimeInterface>() {
        SendExecutor::new(4).run(async {
            let short_timer =
                pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
            let long_timer =
                pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
            let winner = future::select(short_timer, long_timer).await;
            if let Either::Right(_) = winner {
                panic!("wrong timer fired");
            }
        });
    }

    #[test]
    fn shorter_fires_first_multithreaded() {
        test_shorter_fires_first_multithreaded::<MonotonicInstant>();
        test_shorter_fires_first_multithreaded::<BootInstant>();
    }

    /// A deadline one nanosecond in the past must win against one a nanosecond ahead.
    fn test_timer_before_now_fires_immediately<T: TestTimeInterface>() {
        let mut exec = TestExecutor::new();
        let now = T::now();
        let past = pin!(Timer::new(T::from_nanos(now - 1)));
        let future_deadline = pin!(Timer::new(T::from_nanos(now + 1)));
        assert_matches!(
            exec.run_singlethreaded(futures::future::select(past, future_deadline)),
            Either::Left(_),
            "Timer in the past should fire first"
        );
    }

    #[test]
    fn timer_before_now_fires_immediately() {
        test_timer_before_now_fires_immediately::<MonotonicInstant>();
        test_timer_before_now_fires_immediately::<BootInstant>();
    }

    #[test]
    fn fires_after_timeout() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));
        let deadline = MonotonicInstant::after(MonotonicDuration::from_seconds(5));
        let mut timer = pin!(Timer::new(deadline));

        // Before the deadline the timer must stay pending.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));

        // Advancing fake time to the deadline completes it.
        exec.set_fake_time(deadline);
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
    }

    #[test]
    fn interval() {
        let mut exec = TestExecutor::new_with_fake_time();
        let start = MonotonicInstant::from_nanos(0);
        exec.set_fake_time(start);

        let ticks = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
        let mut stream = pin!({
            let ticks = ticks.clone();
            Interval::new(MonotonicDuration::from_seconds(5))
                .map(move |()| {
                    ticks.fetch_add(1, Ordering::SeqCst);
                })
                .collect::<()>()
        });

        // No time has passed yet, so nothing has ticked.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut stream));
        assert_eq!(0, ticks.load(Ordering::SeqCst));

        // Jump to the first deadline: exactly one tick.
        let first_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (1)");
        assert!(first_deadline >= MonotonicDuration::from_seconds(5) + start);
        exec.set_fake_time(first_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut stream));
        assert_eq!(1, ticks.load(Ordering::SeqCst));

        // Polling again without advancing time must not produce another tick.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut stream));
        assert_eq!(1, ticks.load(Ordering::SeqCst));

        // Jump to the second deadline: a second tick.
        let second_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (2)");
        exec.set_fake_time(second_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut stream));
        assert_eq!(2, ticks.load(Ordering::SeqCst));

        // Deadlines are spaced by exactly the interval.
        assert_eq!(second_deadline, first_deadline + MonotonicDuration::from_seconds(5));
    }

    #[test]
    fn timer_fake_time() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(MonotonicDuration::from_seconds(1))));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));

        exec.set_fake_time(MonotonicInstant::after(MonotonicDuration::from_seconds(1)));
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
    }

    /// Creates one timer per entry of `nanos`, polls each once with a no-op waker, and appends
    /// them to `timer_futures`.
    fn create_timers(
        timers: &Arc<Timers<MonotonicInstant>>,
        nanos: &[i64],
        timer_futures: &mut Vec<Pin<Box<Timer>>>,
    ) {
        let noop = futures::task::noop_waker();
        let mut cx = Context::from_waker(&noop);
        for &deadline in nanos {
            let mut timer = Box::pin(timers.new_timer(MonotonicInstant::from_nanos(deadline)));
            let _ = timer.poll_unpin(&mut cx);
            timer_futures.push(timer);
        }
    }

    #[test]
    fn timer_heap() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(0, true));

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (0..1000).collect();
        let mut rng = thread_rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Timers registered in random order must come out in deadline order.
        for expected in 0..1000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(expected)));
        }

        timer_futures.clear();
        create_timers(&timers, &nanos, &mut timer_futures);

        // Drop a random half. The survivors must still come out in order.
        timer_futures.shuffle(&mut rng);
        timer_futures.truncate(500);
        let mut previous = None;
        for _ in 0..500 {
            let time = timers.wake_next_timer().unwrap();
            if let Some(previous) = previous {
                assert!(previous <= time);
            }
            previous = Some(time);
        }
        assert_eq!(timers.wake_next_timer(), None);

        timer_futures = vec![];
        create_timers(&timers, &nanos, &mut timer_futures);

        // Reset every timer to a new random deadline in 1000..2000.
        timer_futures.shuffle(&mut rng);
        let mut nanos: Vec<_> = (1000..2000).collect();
        nanos.shuffle(&mut rng);

        for (timer, deadline) in timer_futures.iter_mut().zip(nanos) {
            timer.as_mut().reset(MonotonicInstant::from_nanos(deadline));
        }

        // The reset deadlines must come out in order as well.
        for expected in 1000..2000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(expected)));
        }
    }

    #[test]
    fn timer_heap_with_same_time() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(0, true));

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (1..100).collect();
        let mut rng = thread_rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Add one hundred timers that all share a single random deadline.
        let time = rng.gen_range(0..101);
        let same_time = [time; 100];
        create_timers(&timers, &same_time, &mut timer_futures);

        nanos.extend(&same_time);
        nanos.sort();

        for expected in nanos {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(expected)));
        }
    }

    #[test]
    fn timer_reset_to_earlier_time() {
        let mut exec = LocalExecutor::new();

        for _ in 0..100 {
            let instant = MonotonicInstant::after(MonotonicDuration::from_millis(100));
            let (sender, receiver) = channel();
            let task = Task::spawn(async move {
                let mut timer = pin!(Timer::new(instant));
                let mut receiver = pin!(receiver.fuse());
                poll_fn(|cx| loop {
                    if timer.as_mut().poll_unpin(cx).is_ready() {
                        return Poll::Ready(());
                    }
                    if !receiver.is_terminated() && receiver.poll_unpin(cx).is_ready() {
                        // The message arrived: pull the deadline in to 1ms from now, then loop
                        // to poll the timer again.
                        timer
                            .as_mut()
                            .reset(MonotonicInstant::after(MonotonicDuration::from_millis(1)));
                    } else {
                        return Poll::Pending;
                    }
                })
                .await;
            });
            sender.send(()).unwrap();

            exec.run_singlethreaded(task);

            // Success if the task finished at least 1ms before the original deadline, which
            // means the earlier deadline took effect.
            if MonotonicInstant::after(MonotonicDuration::from_millis(1)) < instant {
                return;
            }
        }

        panic!("Timer fired late in all 100 attempts");
    }
}