fuchsia_async/runtime/fuchsia/
timer.rs1use crate::runtime::{BootInstant, EHandle, MonotonicInstant, WakeupTime};
11use crate::PacketReceiver;
12use fuchsia_sync::Mutex;
13
14use futures::future::FusedFuture;
15use futures::stream::FusedStream;
16use futures::task::{AtomicWaker, Context};
17use futures::{FutureExt, Stream};
18use std::cell::UnsafeCell;
19use std::fmt;
20use std::future::Future;
21use std::marker::PhantomPinned;
22use std::pin::Pin;
23use std::sync::atomic::{AtomicU8, Ordering};
24use std::sync::Arc;
25use std::task::{ready, Poll, Waker};
26use zx::AsHandleRef as _;
27
28pub trait TimeInterface:
29 Clone + Copy + fmt::Debug + PartialEq + PartialOrd + Ord + Send + Sync + 'static
30{
31 type Timeline: zx::Timeline + Send + Sync + 'static;
32
33 fn from_nanos(nanos: i64) -> Self;
34 fn into_nanos(self) -> i64;
35 fn zx_instant(nanos: i64) -> zx::Instant<Self::Timeline>;
36 fn now() -> i64;
37 fn create_timer() -> zx::Timer<Self::Timeline>;
38}
39
40impl TimeInterface for MonotonicInstant {
41 type Timeline = zx::MonotonicTimeline;
42
43 fn from_nanos(nanos: i64) -> Self {
44 Self::from_nanos(nanos)
45 }
46
47 fn into_nanos(self) -> i64 {
48 self.into_nanos()
49 }
50
51 fn zx_instant(nanos: i64) -> zx::MonotonicInstant {
52 zx::MonotonicInstant::from_nanos(nanos)
53 }
54
55 fn now() -> i64 {
56 EHandle::local().inner().now().into_nanos()
57 }
58
59 fn create_timer() -> zx::Timer<Self::Timeline> {
60 zx::Timer::<Self::Timeline>::create()
61 }
62}
63
64impl TimeInterface for BootInstant {
65 type Timeline = zx::BootTimeline;
66
67 fn from_nanos(nanos: i64) -> Self {
68 Self::from_nanos(nanos)
69 }
70
71 fn into_nanos(self) -> i64 {
72 self.into_nanos()
73 }
74
75 fn zx_instant(nanos: i64) -> zx::BootInstant {
76 zx::BootInstant::from_nanos(nanos)
77 }
78
79 fn now() -> i64 {
80 EHandle::local().inner().boot_now().into_nanos()
81 }
82
83 fn create_timer() -> zx::Timer<Self::Timeline> {
84 zx::Timer::<Self::Timeline>::create()
85 }
86}
87
88impl WakeupTime for std::time::Instant {
89 fn into_timer(self) -> Timer {
90 let now_as_instant = std::time::Instant::now();
91 let now_as_time = MonotonicInstant::now();
92 EHandle::local()
93 .mono_timers()
94 .new_timer(now_as_time + self.saturating_duration_since(now_as_instant).into())
95 }
96}
97
98impl WakeupTime for MonotonicInstant {
99 fn into_timer(self) -> Timer {
100 EHandle::local().mono_timers().new_timer(self)
101 }
102}
103
104impl WakeupTime for BootInstant {
105 fn into_timer(self) -> Timer {
106 EHandle::local().boot_timers().new_timer(self)
107 }
108}
109
110impl WakeupTime for zx::MonotonicInstant {
111 fn into_timer(self) -> Timer {
112 EHandle::local().mono_timers().new_timer(self.into())
113 }
114}
115
116impl WakeupTime for zx::BootInstant {
117 fn into_timer(self) -> Timer {
118 EHandle::local().boot_timers().new_timer(self.into())
119 }
120}
121
122#[must_use = "futures do nothing unless polled"]
124pub struct Timer(TimerState);
125
126impl Timer {
127 pub fn new(time: impl WakeupTime) -> Self {
129 time.into_timer()
130 }
131
132 pub fn reset(self: Pin<&mut Self>, time: MonotonicInstant) {
134 let nanos = time.into_nanos();
135 if self.0.state.load(Ordering::Relaxed) == UNREGISTERED
146 || !self.0.timers.try_reset_timer(&self.0, nanos)
147 {
148 unsafe { *self.0.nanos.get() = nanos };
151 self.0.state.store(UNREGISTERED, Ordering::Relaxed);
152 }
153 }
154}
155
156impl fmt::Debug for Timer {
157 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
158 f.debug_struct("Timer").field("time", &self.0.nanos).finish()
159 }
160}
161
162impl Drop for Timer {
163 fn drop(&mut self) {
164 self.0.timers.unregister(&self.0);
165 }
166}
167
168impl Future for Timer {
169 type Output = ();
170 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171 unsafe { self.0.timers.poll(self.as_ref(), cx) }
173 }
174}
175
176struct TimerState {
177 timers: Arc<dyn TimersInterface>,
178
179 nanos: UnsafeCell<i64>,
181
182 waker: AtomicWaker,
183 state: AtomicU8,
184
185 index: UnsafeCell<HeapIndex>,
188
189 _pinned: PhantomPinned,
191}
192
193unsafe impl Send for TimerState {}
195unsafe impl Sync for TimerState {}
196
/// The timer is not in the heap. This is the initial state, and the state a timer returns to
/// when it is unregistered or reset while not in the heap.
const UNREGISTERED: u8 = 0;

/// The timer has been pushed onto the heap and is waiting for its deadline.
const REGISTERED: u8 = 1;

/// The deadline has been reached (the timer was popped from the heap, or its deadline had
/// already passed when it was polled), but a poll has not yet moved it to `TERMINATED`.
const FIRED: u8 = 2;

/// A poll has observed `FIRED` and returned `Poll::Ready`; the future is finished.
const TERMINATED: u8 = 3;
208
/// A position in the timer heap, with `usize::MAX` reserved to mean "not in the heap".
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct HeapIndex(usize);

impl HeapIndex {
    /// Sentinel for a timer that is not in the heap.
    const NULL: HeapIndex = HeapIndex(usize::MAX);

    /// Returns the position as `Some(index)`, or `None` for the `NULL` sentinel.
    fn get(&self) -> Option<usize> {
        if *self == HeapIndex::NULL {
            None
        } else {
            Some(self.0)
        }
    }
}

impl From<usize> for HeapIndex {
    /// Wraps a raw position. `usize::MAX` maps onto the `NULL` sentinel.
    fn from(value: usize) -> Self {
        Self(value)
    }
}
230
231impl FusedFuture for Timer {
232 fn is_terminated(&self) -> bool {
233 self.0.state.load(Ordering::Relaxed) == TERMINATED
234 }
235}
236
237#[derive(Copy, Clone, Debug)]
248struct StateRef(*const TimerState);
249
250unsafe impl Send for StateRef {}
252unsafe impl Sync for StateRef {}
253
254impl StateRef {
255 fn into_waker(self, _inner: &mut Inner) -> Option<Waker> {
256 unsafe {
258 (*self.0).state.store(FIRED, Ordering::Relaxed);
265 (*self.0).waker.take()
266 }
267 }
268
269 unsafe fn nanos(&self) -> i64 {
273 *(*self.0).nanos.get()
274 }
275
276 unsafe fn nanos_mut(&mut self) -> &mut i64 {
280 &mut *(*self.0).nanos.get()
281 }
282
283 unsafe fn set_index(&mut self, index: HeapIndex) -> HeapIndex {
287 std::mem::replace(&mut *(*self.0).index.get(), index)
288 }
289}
290
291#[derive(Debug)]
298#[must_use = "streams do nothing unless polled"]
299pub struct Interval {
300 timer: Pin<Box<Timer>>,
301 next: MonotonicInstant,
302 duration: zx::MonotonicDuration,
303}
304
305impl Interval {
306 pub fn new(duration: zx::MonotonicDuration) -> Self {
308 let next = MonotonicInstant::after(duration);
309 Interval { timer: Box::pin(Timer::new(next)), next, duration }
310 }
311}
312
313impl FusedStream for Interval {
314 fn is_terminated(&self) -> bool {
315 false
317 }
318}
319
320impl Stream for Interval {
321 type Item = ();
322 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
323 ready!(self.timer.poll_unpin(cx));
324 let next = self.next + self.duration;
325 self.timer.as_mut().reset(next);
326 self.next = next;
327 Poll::Ready(Some(()))
328 }
329}
330
331pub(crate) struct Timers<T: TimeInterface> {
332 inner: Mutex<Inner>,
333
334 port_key: u64,
339
340 fake: bool,
341
342 timer: zx::Timer<T::Timeline>,
343}
344
345struct Inner {
346 timers: Heap,
348
349 last_deadline: Option<i64>,
352
353 async_wait: bool,
355}
356
357impl<T: TimeInterface> Timers<T> {
358 pub fn new(port_key: u64, fake: bool) -> Self {
359 Self {
360 inner: Mutex::new(Inner {
361 timers: Heap::default(),
362 last_deadline: None,
363 async_wait: false,
364 }),
365 port_key,
366 fake,
367 timer: T::create_timer(),
368 }
369 }
370
371 pub fn new_timer(self: &Arc<Self>, time: T) -> Timer {
372 let nanos = time.into_nanos();
373 Timer(TimerState {
374 timers: self.clone(),
375 nanos: UnsafeCell::new(nanos),
376 waker: AtomicWaker::new(),
377 state: AtomicU8::new(UNREGISTERED),
378 index: UnsafeCell::new(HeapIndex::NULL),
379 _pinned: PhantomPinned,
380 })
381 }
382
383 fn setup_zircon_timer(&self, inner: &mut Inner, from_receive_packet: bool) {
390 let new_deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
393
394 if new_deadline != inner.last_deadline {
397 inner.last_deadline = new_deadline;
398 match inner.last_deadline {
399 Some(deadline) => {
400 self.timer.set(T::zx_instant(deadline), zx::Duration::ZERO).unwrap()
401 }
402 None => self.timer.cancel().unwrap(),
403 }
404 }
405
406 if from_receive_packet {
410 inner.async_wait = false;
411 }
412
413 if inner.last_deadline.is_some() && !inner.async_wait {
416 if self.fake {
417 self.timer.signal_handle(zx::Signals::USER_0, zx::Signals::empty()).unwrap();
420 }
421
422 self.timer
423 .wait_async_handle(
424 EHandle::local().port(),
425 self.port_key,
426 if self.fake { zx::Signals::USER_0 } else { zx::Signals::TIMER_SIGNALED },
427 zx::WaitAsyncOpts::empty(),
428 )
429 .unwrap();
430
431 inner.async_wait = true;
432 }
433 }
434
435 pub fn port_key(&self) -> u64 {
436 self.port_key
437 }
438
439 pub fn wake_timers(&self) -> bool {
441 self.wake_timers_impl(false)
442 }
443
444 fn wake_timers_impl(&self, from_receive_packet: bool) -> bool {
445 let now = T::now();
446 let mut timers_woken = false;
447
448 loop {
449 let waker = {
450 let mut inner = self.inner.lock();
451
452 let deadline = inner.timers.peek().map(|timer| unsafe { timer.nanos() });
454 if deadline.is_some_and(|d| d <= now) {
455 let timer = inner.timers.pop().unwrap();
456 timer.into_waker(&mut inner)
457 } else {
458 self.setup_zircon_timer(&mut inner, from_receive_packet);
471 break;
472 }
473 };
474 if let Some(waker) = waker {
475 waker.wake()
476 }
477 timers_woken = true;
478 }
479 timers_woken
480 }
481
482 pub fn wake_next_timer(&self) -> Option<T> {
484 let (nanos, waker) = {
485 let mut inner = self.inner.lock();
486 let timer = inner.timers.pop()?;
487 let nanos = unsafe { timer.nanos() };
489 (nanos, timer.into_waker(&mut inner))
490 };
491 if let Some(waker) = waker {
492 waker.wake();
493 }
494 Some(T::from_nanos(nanos))
495 }
496
497 pub fn next_timer(&self) -> Option<T> {
499 self.inner.lock().timers.peek().map(|state| T::from_nanos(unsafe { state.nanos() }))
501 }
502
503 pub fn maybe_notify(&self, now: T) {
509 assert!(self.fake, "calling this function requires using fake time.");
510 if self
512 .inner
513 .lock()
514 .timers
515 .peek()
516 .is_some_and(|state| unsafe { state.nanos() } <= now.into_nanos())
517 {
518 self.timer.signal_handle(zx::Signals::empty(), zx::Signals::USER_0).unwrap();
519 }
520 }
521}
522
523impl<T: TimeInterface> PacketReceiver for Timers<T> {
524 fn receive_packet(&self, _packet: zx::Packet) {
525 self.wake_timers_impl(true);
526 }
527}
528
529trait TimersInterface: Send + Sync + 'static {
531 unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()>;
532 fn unregister(&self, state: &TimerState);
533 fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool;
534}
535
536impl<T: TimeInterface> TimersInterface for Timers<T> {
537 unsafe fn poll(&self, timer: Pin<&Timer>, cx: &mut Context<'_>) -> Poll<()> {
541 let state = timer.0.state.load(Ordering::Relaxed);
552
553 if state == TERMINATED {
554 return Poll::Ready(());
555 }
556
557 if state == FIRED {
558 timer.0.state.store(TERMINATED, Ordering::Relaxed);
559 return Poll::Ready(());
560 }
561
562 if state == UNREGISTERED {
563 let nanos = unsafe { *timer.0.nanos.get() };
565 if nanos <= T::now() {
566 timer.0.state.store(FIRED, Ordering::Relaxed);
567 return Poll::Ready(());
568 }
569 let mut inner = self.inner.lock();
570
571 inner.timers.push(StateRef(&timer.0));
574
575 self.setup_zircon_timer(&mut inner, false);
578
579 timer.0.state.store(REGISTERED, Ordering::Relaxed);
580 }
581
582 timer.0.waker.register(cx.waker());
583
584 let state = timer.0.state.load(Ordering::Relaxed);
592 match state {
593 FIRED => {
594 timer.0.state.store(TERMINATED, Ordering::Relaxed);
595 Poll::Ready(())
596 }
597 REGISTERED => Poll::Pending,
598 _ => {
602 unreachable!();
603 }
604 }
605 }
606
607 fn unregister(&self, timer: &TimerState) {
608 if timer.state.load(Ordering::Relaxed) == UNREGISTERED {
609 return;
616 }
617 let mut inner = self.inner.lock();
618 let index = unsafe { *timer.index.get() };
620 if let Some(index) = index.get() {
621 inner.timers.remove(index);
622 if index == 0 {
623 self.setup_zircon_timer(&mut inner, false);
626 }
627 timer.state.store(UNREGISTERED, Ordering::Relaxed);
628 }
629 }
630
631 fn try_reset_timer(&self, timer: &TimerState, nanos: i64) -> bool {
633 let mut inner = self.inner.lock();
634 let index = unsafe { *timer.index.get() };
636 if let Some(old_index) = index.get() {
637 if (inner.timers.reset(old_index, nanos) == 0) || (old_index == 0) {
638 self.setup_zircon_timer(&mut inner, false);
641 }
642 timer.state.store(REGISTERED, Ordering::Relaxed);
643 true
644 } else {
645 false
646 }
647 }
648}
649
650#[derive(Default)]
651struct Heap(Vec<StateRef>);
652
653impl Heap {
656 fn push(&mut self, mut timer: StateRef) {
657 let index = self.0.len();
658 self.0.push(timer);
659 unsafe {
661 timer.set_index(index.into());
662 }
663 self.fix_up(index);
664 }
665
666 fn peek(&self) -> Option<&StateRef> {
667 self.0.first()
668 }
669
670 fn pop(&mut self) -> Option<StateRef> {
671 if let Some(&first) = self.0.first() {
672 self.remove(0);
673 Some(first)
674 } else {
675 None
676 }
677 }
678
679 fn swap(&mut self, a: usize, b: usize) {
680 self.0.swap(a, b);
681 unsafe {
683 self.0[a].set_index(a.into());
684 self.0[b].set_index(b.into());
685 }
686 }
687
688 fn reset(&mut self, index: usize, nanos: i64) -> usize {
690 if nanos < std::mem::replace(unsafe { self.0[index].nanos_mut() }, nanos) {
692 self.fix_up(index)
693 } else {
694 self.fix_down(index)
695 }
696 }
697
698 fn remove(&mut self, index: usize) {
699 unsafe {
701 let old_index = self.0[index].set_index(HeapIndex::NULL);
702 debug_assert_eq!(old_index, index.into());
703 }
704
705 let last = self.0.len() - 1;
708 if index < last {
709 let fix_up;
710 unsafe {
711 fix_up = self.0[last].nanos() < self.0[index].nanos();
713 self.0[index] = self.0[last];
714 self.0[index].set_index(index.into());
715 };
716 self.0.truncate(last);
717 if fix_up {
718 self.fix_up(index);
719 } else {
720 self.fix_down(index);
721 }
722 } else {
723 self.0.truncate(last);
724 };
725 }
726
727 fn fix_up(&mut self, mut index: usize) -> usize {
729 while index > 0 {
730 let parent = (index - 1) / 2;
731 if unsafe { self.0[parent].nanos() <= self.0[index].nanos() } {
733 return index;
734 }
735 self.swap(parent, index);
736 index = parent;
737 }
738 index
739 }
740
741 fn fix_down(&mut self, mut index: usize) -> usize {
743 let len = self.0.len();
744 loop {
745 let left = index * 2 + 1;
746 if left >= len {
747 return index;
748 }
749
750 let mut swap_with = None;
751
752 unsafe {
754 let mut nanos = self.0[index].nanos();
755 let left_nanos = self.0[left].nanos();
756 if left_nanos < nanos {
757 swap_with = Some(left);
758 nanos = left_nanos;
759 }
760 let right = left + 1;
761 if right < len && self.0[right].nanos() < nanos {
762 swap_with = Some(right);
763 }
764 }
765
766 let Some(swap_with) = swap_with else { return index };
767 self.swap(index, swap_with);
768 index = swap_with;
769 }
770 }
771}
772
#[cfg(test)]
mod test {
    use super::*;
    use crate::{LocalExecutor, SendExecutor, Task, TestExecutor};
    use assert_matches::assert_matches;
    use futures::channel::oneshot::channel;
    use futures::future::Either;
    use futures::prelude::*;
    use rand::seq::SliceRandom;
    use rand::{thread_rng, Rng};
    use std::future::poll_fn;
    use std::pin::pin;
    use zx::MonotonicDuration;

    /// Extra operations the generic tests need on top of `TimeInterface`.
    trait TestTimeInterface:
        TimeInterface
        + WakeupTime
        + std::ops::Sub<zx::Duration<Self::Timeline>, Output = Self>
        + std::ops::Add<zx::Duration<Self::Timeline>, Output = Self>
    {
        fn after(duration: zx::Duration<Self::Timeline>) -> Self;
    }

    impl TestTimeInterface for MonotonicInstant {
        fn after(duration: zx::MonotonicDuration) -> Self {
            Self::after(duration)
        }
    }

    impl TestTimeInterface for BootInstant {
        fn after(duration: zx::BootDuration) -> Self {
            Self::after(duration)
        }
    }

    fn test_shorter_fires_first<T: TestTimeInterface>() {
        let mut exec = LocalExecutor::new();
        let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
        let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
        match exec.run_singlethreaded(future::select(shorter, longer)) {
            Either::Left(_) => {}
            Either::Right(_) => panic!("wrong timer fired"),
        }
    }

    #[test]
    fn shorter_fires_first() {
        test_shorter_fires_first::<MonotonicInstant>();
        test_shorter_fires_first::<BootInstant>();
    }

    fn test_shorter_fires_first_multithreaded<T: TestTimeInterface>() {
        SendExecutor::new(4).run(async {
            let shorter = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_millis(100))));
            let longer = pin!(Timer::new(T::after(zx::Duration::<T::Timeline>::from_seconds(1))));
            match future::select(shorter, longer).await {
                Either::Left(_) => {}
                Either::Right(_) => panic!("wrong timer fired"),
            }
        });
    }

    #[test]
    fn shorter_fires_first_multithreaded() {
        test_shorter_fires_first_multithreaded::<MonotonicInstant>();
        test_shorter_fires_first_multithreaded::<BootInstant>();
    }

    fn test_timer_before_now_fires_immediately<T: TestTimeInterface>() {
        let mut exec = TestExecutor::new();
        let now = T::now();
        let before = pin!(Timer::new(T::from_nanos(now - 1)));
        let after = pin!(Timer::new(T::from_nanos(now + 1)));
        assert_matches!(
            exec.run_singlethreaded(futures::future::select(before, after)),
            Either::Left(_),
            "Timer in the past should fire first"
        );
    }

    #[test]
    fn timer_before_now_fires_immediately() {
        test_timer_before_now_fires_immediately::<MonotonicInstant>();
        test_timer_before_now_fires_immediately::<BootInstant>();
    }

    #[test]
    fn fires_after_timeout() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));
        let deadline = MonotonicInstant::after(MonotonicDuration::from_seconds(5));
        let mut future = pin!(Timer::new(deadline));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        exec.set_fake_time(deadline);
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut future));
    }

    #[test]
    fn interval() {
        let mut exec = TestExecutor::new_with_fake_time();
        let start = MonotonicInstant::from_nanos(0);
        exec.set_fake_time(start);

        let counter = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
        let mut future = pin!({
            let counter = counter.clone();
            Interval::new(MonotonicDuration::from_seconds(5))
                .map(move |()| {
                    counter.fetch_add(1, Ordering::SeqCst);
                })
                .collect::<()>()
        });

        // Nothing has fired before the first deadline.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(0, counter.load(Ordering::SeqCst));

        // Advance to the first deadline: exactly one tick.
        let first_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (1)");
        assert!(first_deadline >= MonotonicDuration::from_seconds(5) + start);
        exec.set_fake_time(first_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Polling again without advancing time must not produce another tick.
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(1, counter.load(Ordering::SeqCst));

        // Advance to the second deadline: a second tick.
        let second_deadline = TestExecutor::next_timer().expect("Expected a pending timeout (2)");
        exec.set_fake_time(second_deadline);
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut future));
        assert_eq!(2, counter.load(Ordering::SeqCst));

        assert_eq!(second_deadline, first_deadline + MonotonicDuration::from_seconds(5));
    }

    #[test]
    fn timer_fake_time() {
        let mut exec = TestExecutor::new_with_fake_time();
        exec.set_fake_time(MonotonicInstant::from_nanos(0));

        let mut timer =
            pin!(Timer::new(MonotonicInstant::after(MonotonicDuration::from_seconds(1))));
        assert_eq!(Poll::Pending, exec.run_until_stalled(&mut timer));

        exec.set_fake_time(MonotonicInstant::after(MonotonicDuration::from_seconds(1)));
        assert_eq!(Poll::Ready(()), exec.run_until_stalled(&mut timer));
    }

    /// Creates one timer per entry of `nanos`, polls each once with a no-op waker so that it is
    /// registered, and appends it to `timer_futures`.
    fn create_timers(
        timers: &Arc<Timers<MonotonicInstant>>,
        nanos: &[i64],
        timer_futures: &mut Vec<Pin<Box<Timer>>>,
    ) {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        for &n in nanos {
            let mut timer = Box::pin(timers.new_timer(MonotonicInstant::from_nanos(n)));
            let _ = timer.poll_unpin(&mut cx);
            timer_futures.push(timer);
        }
    }

    #[test]
    fn timer_heap() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(0, true));

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (0..1000).collect();
        let mut rng = thread_rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Timers registered in random order come out in deadline order.
        for i in 0..1000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }

        timer_futures.clear();
        create_timers(&timers, &nanos, &mut timer_futures);

        // Drop a random half of the timers; the remaining half must still come out in order.
        timer_futures.shuffle(&mut rng);
        timer_futures.truncate(500);
        let mut last_time = None;
        for _ in 0..500 {
            let time = timers.wake_next_timer().unwrap();
            if let Some(last_time) = last_time {
                assert!(last_time <= time);
            }
            last_time = Some(time);
        }
        assert_eq!(timers.wake_next_timer(), None);

        timer_futures = vec![];
        create_timers(&timers, &nanos, &mut timer_futures);

        // Reset every timer to a new random deadline; order must follow the new deadlines.
        timer_futures.shuffle(&mut rng);
        let mut nanos: Vec<_> = (1000..2000).collect();
        nanos.shuffle(&mut rng);

        for (fut, n) in timer_futures.iter_mut().zip(nanos) {
            fut.as_mut().reset(MonotonicInstant::from_nanos(n));
        }

        for i in 1000..2000 {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(i)));
        }
    }

    #[test]
    fn timer_heap_with_same_time() {
        let _exec = TestExecutor::new_with_fake_time();
        let timers = Arc::new(Timers::<MonotonicInstant>::new(0, true));

        let mut timer_futures = Vec::new();
        let mut nanos: Vec<_> = (1..100).collect();
        let mut rng = thread_rng();
        nanos.shuffle(&mut rng);

        create_timers(&timers, &nanos, &mut timer_futures);

        // Add a batch of timers that all share one deadline.
        let time = rng.gen_range(0..101);
        let same_time = [time; 100];
        create_timers(&timers, &same_time, &mut timer_futures);

        nanos.extend(&same_time);
        nanos.sort();

        for n in nanos {
            assert_eq!(timers.wake_next_timer(), Some(MonotonicInstant::from_nanos(n)));
        }
    }

    #[test]
    fn timer_reset_to_earlier_time() {
        let mut exec = LocalExecutor::new();

        // Real time is involved, so retry: the test passes as soon as one attempt finishes well
        // before the original deadline.
        for _ in 0..100 {
            let instant = MonotonicInstant::after(MonotonicDuration::from_millis(100));
            let (sender, receiver) = channel();
            let task = Task::spawn(async move {
                let mut timer = pin!(Timer::new(instant));
                let mut receiver = pin!(receiver.fuse());
                poll_fn(|cx| loop {
                    if timer.as_mut().poll_unpin(cx).is_ready() {
                        return Poll::Ready(());
                    }
                    if !receiver.is_terminated() && receiver.poll_unpin(cx).is_ready() {
                        timer
                            .as_mut()
                            .reset(MonotonicInstant::after(MonotonicDuration::from_millis(1)));
                    } else {
                        return Poll::Pending;
                    }
                })
                .await;
            });
            sender.send(()).unwrap();

            exec.run_singlethreaded(task);

            if MonotonicInstant::after(MonotonicDuration::from_millis(1)) < instant {
                return;
            }
        }

        panic!("Timer fired late in all 100 attempts");
    }

    #[test]
    fn test_reset() {
        // Repeatedly poll, reset and replace a timer around the moment it is due to fire.
        SendExecutor::new(2).run(async {
            const TIMER_DELAY: zx::MonotonicDuration = zx::Duration::from_micros(100);
            let mut timer = pin!(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            for _ in 0..10000 {
                let _ = futures::poll!(timer.as_mut());
                std::thread::sleep(std::time::Duration::from_micros(
                    rand::thread_rng().gen_range(80..120),
                ));
                timer.as_mut().reset(MonotonicInstant::after(TIMER_DELAY));
                timer.set(Timer::new(MonotonicInstant::after(TIMER_DELAY)));
            }
        });
    }
}