1use alloc::collections::{binary_heap, BinaryHeap};
8use core::hash::Hash;
9use core::time::Duration;
10
11use netstack3_hashmap::{hash_map, HashMap};
12
13use crate::{CoreTimerContext, Instant, InstantBindingsTypes, TimerBindingsTypes, TimerContext};
14
/// A heap of keyed timers backed by a single bindings timer.
///
/// `K` identifies each timer and `V` is a value stored alongside it. Whenever
/// an operation may have changed the earliest pending firing time, the
/// `next_wakeup` bindings timer is rescheduled for that time (or cancelled when
/// no timers remain).
#[derive(Debug)]
pub struct LocalTimerHeap<K, V, BT: TimerBindingsTypes + InstantBindingsTypes> {
    /// The bindings timer kept armed for the earliest pending firing time.
    next_wakeup: BT::Timer,
    /// The pending timers, keyed by `K`.
    heap: KeyedHeap<K, V, BT::Instant>,
}
29
30impl<K, V, BC> LocalTimerHeap<K, V, BC>
31where
32 K: Hash + Eq + Clone,
33 BC: TimerContext,
34{
35 pub fn new(bindings_ctx: &mut BC, dispatch_id: BC::DispatchId) -> Self {
37 let next_wakeup = bindings_ctx.new_timer(dispatch_id);
38 Self { next_wakeup, heap: KeyedHeap::new() }
39 }
40
41 pub fn new_with_context<D, CC: CoreTimerContext<D, BC>>(
43 bindings_ctx: &mut BC,
44 dispatch_id: D,
45 ) -> Self {
46 Self::new(bindings_ctx, CC::convert_timer(dispatch_id))
47 }
48
49 pub fn schedule_instant(
54 &mut self,
55 bindings_ctx: &mut BC,
56 timer: K,
57 value: V,
58 at: BC::Instant,
59 ) -> Option<(BC::Instant, V)> {
60 let (prev_value, dirty) = self.heap.schedule(timer, value, at);
61 if dirty {
62 self.heal_and_reschedule(bindings_ctx);
63 }
64 prev_value
65 }
66
67 pub fn schedule_after(
74 &mut self,
75 bindings_ctx: &mut BC,
76 timer: K,
77 value: V,
78 after: Duration,
79 ) -> Option<(BC::Instant, V)> {
80 let time = bindings_ctx.now().checked_add(after).unwrap();
81 self.schedule_instant(bindings_ctx, timer, value, time)
82 }
83
84 pub fn pop(&mut self, bindings_ctx: &mut BC) -> Option<(K, V)> {
86 let Self { next_wakeup: _, heap } = self;
87 let (popped, dirty) = heap.pop_if(|t| t <= bindings_ctx.now());
88 if dirty {
89 self.heal_and_reschedule(bindings_ctx);
90 }
91 popped
92 }
93
94 pub fn get(&self, timer: &K) -> Option<(BC::Instant, &V)> {
97 self.heap.map.get(timer).map(|MapEntry { time, value }| (*time, value))
98 }
99
100 pub fn cancel(&mut self, bindings_ctx: &mut BC, timer: &K) -> Option<(BC::Instant, V)> {
103 let (scheduled, dirty) = self.heap.cancel(timer);
104 if dirty {
105 self.heal_and_reschedule(bindings_ctx);
106 }
107 scheduled
108 }
109
110 pub fn iter(&self) -> impl Iterator<Item = (&K, &V, &BC::Instant)> {
112 self.heap.map.iter().map(|(k, MapEntry { time, value })| (k, value, time))
113 }
114
115 fn heal_and_reschedule(&mut self, bindings_ctx: &mut BC) {
116 let Self { next_wakeup, heap } = self;
117 let mut new_top = None;
118 let _ = heap.pop_if(|t| {
119 new_top = Some(t);
123 false
124 });
125 let _: Option<BC::Instant> = match new_top {
126 Some(time) => bindings_ctx.schedule_timer_instant(time, next_wakeup),
127 None => bindings_ctx.cancel_timer(next_wakeup),
128 };
129 }
130
131 pub fn clear(&mut self, bindings_ctx: &mut BC) {
133 let Self { next_wakeup, heap } = self;
134 heap.clear();
135 let _: Option<BC::Instant> = bindings_ctx.cancel_timer(next_wakeup);
136 }
137
138 pub fn is_empty(&self) -> bool {
140 self.heap.map.is_empty()
141 }
142}
143
/// A heap of timers keyed on `K`, with value `V` and firing time `T`.
#[derive(Debug)]
struct KeyedHeap<K, V, T> {
    // The map holds the current firing time and value for each key. `pop_if`
    // always checks a heap entry against the map before acting on it, so the
    // heap may temporarily hold entries for cancelled or rescheduled keys.
    map: HashMap<K, MapEntry<T, V>>,
    // Orders keys by firing time; entries may be out of date relative to
    // `map`.
    heap: BinaryHeap<HeapEntry<T, K>>,
}
158
159impl<K: Hash + Eq + Clone, V, T: Instant> KeyedHeap<K, V, T> {
160 fn new() -> Self {
161 Self { map: HashMap::new(), heap: BinaryHeap::new() }
162 }
163
164 fn schedule(&mut self, key: K, value: V, at: T) -> (Option<(T, V)>, bool) {
170 let Self { map, heap } = self;
171 let dirty = heap
176 .peek()
177 .map(|HeapEntry { time, key: top_key }| top_key == &key || at < *time)
178 .unwrap_or(true);
179 let (heap_entry, prev) = match map.entry(key) {
180 hash_map::Entry::Occupied(mut o) => {
181 let MapEntry { time, value } = o.insert(MapEntry { time: at, value });
182 let heap_entry = (at < time).then(|| HeapEntry { time: at, key: o.key().clone() });
185 (heap_entry, Some((time, value)))
186 }
187 hash_map::Entry::Vacant(v) => {
188 let heap_entry = Some(HeapEntry { time: at, key: v.key().clone() });
189 let _: &mut MapEntry<_, _> = v.insert(MapEntry { time: at, value });
190 (heap_entry, None)
191 }
192 };
193 if let Some(heap_entry) = heap_entry {
194 heap.push(heap_entry);
195 }
196 (prev, dirty)
197 }
198
199 fn cancel(&mut self, key: &K) -> (Option<(T, V)>, bool) {
205 let Self { heap, map } = self;
206 let was_front = heap.peek().is_some_and(|HeapEntry { time: _, key: top }| key == top);
208 let prev = map.remove(key).map(|MapEntry { time, value }| (time, value));
209 (prev, was_front)
210 }
211
212 fn pop_if<F: FnOnce(T) -> bool>(&mut self, f: F) -> (Option<(K, V)>, bool) {
222 let mut changed_heap = false;
223 let popped = loop {
224 let Self { heap, map } = self;
225 let Some(peek_mut) = heap.peek_mut() else {
226 break None;
227 };
228 let HeapEntry { time: heap_time, key } = &*peek_mut;
229 match map.entry(key.clone()) {
235 hash_map::Entry::Vacant(_) => {
236 let _: HeapEntry<_, _> = binary_heap::PeekMut::pop(peek_mut);
238 changed_heap = true;
239 }
240 hash_map::Entry::Occupied(map_entry) => {
241 let MapEntry { time: scheduled_for, value: _ } = map_entry.get();
242
243 match heap_time.cmp(scheduled_for) {
244 core::cmp::Ordering::Equal => {
245 break f(*scheduled_for).then(|| {
248 let HeapEntry { time: _, key } =
249 binary_heap::PeekMut::pop(peek_mut);
250 changed_heap = true;
251 let MapEntry { time: _, value } = map_entry.remove();
252 (key, value)
253 });
254 }
255 core::cmp::Ordering::Less => {
256 let HeapEntry { time: _, key } = binary_heap::PeekMut::pop(peek_mut);
261 heap.push(HeapEntry { time: *scheduled_for, key });
262 changed_heap = true;
263 }
264 core::cmp::Ordering::Greater => {
265 unreachable!(
273 "observed heap time: {:?} later than the scheduled time {:?}",
274 heap_time, scheduled_for
275 );
276 }
277 }
278 }
279 }
280 };
281 (popped, changed_heap)
282 }
283
284 fn clear(&mut self) {
285 let Self { map, heap } = self;
286 map.clear();
287 heap.clear();
288 }
289}
290
/// The value stored per key in [`KeyedHeap`]'s map.
#[derive(Debug, Eq, PartialEq)]
struct MapEntry<T, V> {
    /// The time the key is currently scheduled for.
    time: T,
    /// The caller-provided value kept with the key.
    value: V,
}
297
/// An entry in [`KeyedHeap`]'s binary heap.
///
/// The comparison trait impls for this type look at `time` only.
#[derive(Debug)]
struct HeapEntry<T, K> {
    /// The firing time this entry was pushed with.
    time: T,
    /// The key this entry refers to.
    key: K,
}
306
307impl<T: Instant, K> PartialEq for HeapEntry<T, K> {
309 fn eq(&self, other: &Self) -> bool {
310 self.time == other.time
311 }
312}
313
// Marker impl: equality for `HeapEntry` is decided by `time` alone (see the
// `PartialEq` impl); `Eq` is also needed for the `Ord` impl.
impl<T: Instant, K> Eq for HeapEntry<T, K> {}
315
316impl<T: Instant, K> Ord for HeapEntry<T, K> {
317 fn cmp(&self, other: &Self) -> core::cmp::Ordering {
318 Ord::cmp(&other.time, &self.time)
321 }
322}
323
324impl<T: Instant, K> PartialOrd for HeapEntry<T, K> {
325 fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
326 Some(Ord::cmp(self, other))
327 }
328}
329
#[cfg(any(test, feature = "testutils"))]
mod testutil {
    use core::fmt::Debug;
    use core::ops::RangeBounds;

    use super::*;

    impl<K, V, BC> LocalTimerHeap<K, V, BC>
    where
        K: Hash + Eq + Clone + Debug,
        V: Debug + Eq + PartialEq,
        BC: TimerContext,
    {
        /// Asserts that the installed timers are exactly `timers`, given as
        /// `(key, value, instant)` triples.
        #[track_caller]
        pub fn assert_timers(&self, timers: impl IntoIterator<Item = (K, V, BC::Instant)>) {
            let expected: HashMap<_, _> = timers
                .into_iter()
                .map(|(key, value, time)| (key, MapEntry { value, time }))
                .collect();
            assert_eq!(&self.heap.map, &expected);
        }

        /// Like [`Self::assert_timers`], but each timer is given as a
        /// duration relative to `bindings_ctx.now()`.
        #[track_caller]
        pub fn assert_timers_after(
            &self,
            bindings_ctx: &mut BC,
            timers: impl IntoIterator<Item = (K, V, Duration)>,
        ) {
            let now = bindings_ctx.now();
            let absolute = timers
                .into_iter()
                .map(|(key, value, delay)| (key, value, now.panicking_add(delay)));
            self.assert_timers(absolute)
        }

        /// Asserts that the timer with the earliest scheduled instant has
        /// `key` and `value`.
        #[track_caller]
        pub fn assert_top(&mut self, key: &K, value: &V) {
            // The minimum is searched for in the map rather than read from the
            // binary heap, because heap entries can be out of date with
            // respect to the map.
            let top = self
                .heap
                .map
                .iter()
                .min_by_key(|(_key, MapEntry { time, .. })| time)
                .map(|(key, MapEntry { time: _, value })| (key, value));
            assert_eq!(top, Some((key, value)));
        }

        /// Asserts that every timer in `expect` is installed and scheduled
        /// within its paired range.
        ///
        /// # Panics
        ///
        /// Panics if a timer is missing or scheduled outside its range.
        #[track_caller]
        pub fn assert_range<
            'a,
            R: RangeBounds<BC::Instant> + Debug,
            I: IntoIterator<Item = (&'a K, R)>,
        >(
            &'a self,
            expect: I,
        ) {
            for (timer, range) in expect {
                // Panic from the function body, not from a closure: closures
                // do not inherit `#[track_caller]`, so a panic raised inside
                // one would be reported at this helper instead of at the
                // calling test.
                let Some((time, _value)) = self.get(timer) else {
                    panic!("timer {timer:?} not present");
                };
                assert!(range.contains(&time), "timer {timer:?} is at {time:?} not in {range:?}");
            }
        }

        /// Asserts that `timer` is installed and scheduled within `range`,
        /// returning its scheduled instant and value.
        ///
        /// # Panics
        ///
        /// Panics if `timer` is missing or scheduled outside `range`.
        #[track_caller]
        pub fn assert_range_single<'a, R: RangeBounds<BC::Instant> + Debug>(
            &'a self,
            timer: &K,
            range: R,
        ) -> (BC::Instant, &V) {
            // See `assert_range` for why this is not an `unwrap_or_else`.
            let Some((time, value)) = self.get(timer) else {
                panic!("timer {timer:?} not present");
            };
            assert!(range.contains(&time), "timer {timer:?} is at {time:?} not in {range:?}");
            (time, value)
        }
    }
}
416
#[cfg(test)]
mod tests {
    use alloc::vec::Vec;
    use core::convert::Infallible as Never;

    use crate::testutil::{FakeAtomicInstant, FakeInstant, FakeInstantCtx};
    use crate::InstantContext;

    use super::*;

    // A minimal bindings context for these tests: it provides the current
    // time and records the wakeup time inside the `FakeTimer` itself.
    #[derive(Default)]
    struct FakeTimerCtx {
        instant: FakeInstantCtx,
    }

    impl InstantBindingsTypes for FakeTimerCtx {
        type Instant = FakeInstant;
        type AtomicInstant = FakeAtomicInstant;
    }

    impl InstantContext for FakeTimerCtx {
        fn now(&self) -> Self::Instant {
            self.instant.now()
        }
    }

    impl TimerBindingsTypes for FakeTimerCtx {
        type Timer = FakeTimer;
        type DispatchId = ();
        type UniqueTimerId = Never;
    }

    impl TimerContext for FakeTimerCtx {
        fn new_timer(&mut self, (): Self::DispatchId) -> Self::Timer {
            FakeTimer::default()
        }

        // Records `time` in the timer, returning what was recorded before.
        fn schedule_timer_instant(
            &mut self,
            time: Self::Instant,
            timer: &mut Self::Timer,
        ) -> Option<Self::Instant> {
            timer.scheduled.replace(time)
        }

        // Clears the recorded time, returning what was recorded before.
        fn cancel_timer(&mut self, timer: &mut Self::Timer) -> Option<Self::Instant> {
            timer.scheduled.take()
        }

        fn scheduled_instant(&self, timer: &mut Self::Timer) -> Option<Self::Instant> {
            timer.scheduled.clone()
        }

        // Not exercised by the tests in this module.
        fn unique_timer_id(&self, _: &Self::Timer) -> Self::UniqueTimerId {
            unimplemented!()
        }
    }

    // A fake timer that only remembers the instant it is scheduled for.
    #[derive(Default, Debug)]
    struct FakeTimer {
        scheduled: Option<FakeInstant>,
    }

    #[derive(Eq, PartialEq, Debug, Ord, PartialOrd, Copy, Clone, Hash)]
    struct TimerId(usize);

    type LocalTimerHeap = super::LocalTimerHeap<TimerId, (), FakeTimerCtx>;

    impl LocalTimerHeap {
        // Asserts on the raw contents of the binary heap, including entries
        // that are out of date with respect to the map. Order is ignored.
        #[track_caller]
        fn assert_heap_entries<I: IntoIterator<Item = (FakeInstant, TimerId)>>(&self, i: I) {
            let mut want = i.into_iter().collect::<Vec<_>>();
            want.sort();
            let mut got = self
                .heap
                .heap
                .iter()
                .map(|HeapEntry { time, key }| (*time, *key))
                .collect::<Vec<_>>();
            got.sort();
            assert_eq!(got, want);
        }

        // Asserts on the contents of the map, i.e. the timers that are
        // actually installed and the times they are scheduled for.
        #[track_caller]
        fn assert_map_entries<I: IntoIterator<Item = (FakeInstant, TimerId)>>(&self, i: I) {
            let want = i.into_iter().map(|(t, k)| (k, t)).collect::<HashMap<_, _>>();
            let got = self
                .heap
                .map
                .iter()
                .map(|(k, MapEntry { time, value: () })| (*k, *time))
                .collect::<HashMap<_, _>>();
            assert_eq!(got, want);
        }
    }

    const TIMER1: TimerId = TimerId(1);
    const TIMER2: TimerId = TimerId(2);
    const TIMER3: TimerId = TimerId(3);

    const T1: FakeInstant = FakeInstant { offset: Duration::from_secs(1) };
    const T2: FakeInstant = FakeInstant { offset: Duration::from_secs(2) };
    const T3: FakeInstant = FakeInstant { offset: Duration::from_secs(3) };
    const T4: FakeInstant = FakeInstant { offset: Duration::from_secs(4) };

    #[test]
    fn schedule_instant() {
        let mut ctx = FakeTimerCtx::default();
        let mut heap = LocalTimerHeap::new(&mut ctx, ());
        assert_eq!(heap.next_wakeup.scheduled, None);
        heap.assert_heap_entries([]);

        // The first timer arms the wakeup.
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T2), None);
        heap.assert_heap_entries([(T2, TIMER2)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T2));

        // An earlier timer moves the wakeup forward.
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
        heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T1));

        // A later timer leaves the wakeup where it is.
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER3, (), T3), None);
        heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T1));
    }

    #[test]
    fn schedule_after() {
        let mut ctx = FakeTimerCtx::default();
        let mut heap = LocalTimerHeap::new(&mut ctx, ());
        assert_eq!(heap.next_wakeup.scheduled, None);
        let long_duration = Duration::from_secs(5);
        let short_duration = Duration::from_secs(1);

        let long_instant = ctx.now().checked_add(long_duration).unwrap();
        let short_instant = ctx.now().checked_add(short_duration).unwrap();

        assert_eq!(heap.schedule_after(&mut ctx, TIMER1, (), long_duration), None);
        assert_eq!(heap.next_wakeup.scheduled, Some(long_instant));
        heap.assert_heap_entries([(long_instant, TIMER1)]);
        heap.assert_map_entries([(long_instant, TIMER1)]);

        // Rescheduling the same timer earlier pushes a second heap entry; the
        // old one stays in the heap while the map holds only the new time.
        assert_eq!(
            heap.schedule_after(&mut ctx, TIMER1, (), short_duration),
            Some((long_instant, ()))
        );
        assert_eq!(heap.next_wakeup.scheduled, Some(short_instant));
        heap.assert_heap_entries([(short_instant, TIMER1), (long_instant, TIMER1)]);
        heap.assert_map_entries([(short_instant, TIMER1)]);
    }

    #[test]
    fn cancel() {
        let mut ctx = FakeTimerCtx::default();
        let mut heap = LocalTimerHeap::new(&mut ctx, ());
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T2), None);
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER3, (), T3), None);
        heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T1));

        // Cancelling the front timer removes its heap entry and re-arms the
        // wakeup for the next one.
        assert_eq!(heap.cancel(&mut ctx, &TIMER1), Some((T1, ())));
        heap.assert_heap_entries([(T2, TIMER2), (T3, TIMER3)]);
        heap.assert_map_entries([(T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T2));

        // Cancelling an already-cancelled timer reports nothing.
        assert_eq!(heap.cancel(&mut ctx, &TIMER1), None);

        // Cancelling a timer that is not at the front only updates the map;
        // its heap entry is left behind for now.
        assert_eq!(heap.cancel(&mut ctx, &TIMER3), Some((T3, ())));
        heap.assert_heap_entries([(T2, TIMER2), (T3, TIMER3)]);
        heap.assert_map_entries([(T2, TIMER2)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T2));

        // Cancelling the front also clears the leftover TIMER3 heap entry.
        assert_eq!(heap.cancel(&mut ctx, &TIMER2), Some((T2, ())));
        heap.assert_heap_entries([]);
        heap.assert_map_entries([]);
        assert_eq!(heap.next_wakeup.scheduled, None);
    }

    #[test]
    fn pop() {
        let mut ctx = FakeTimerCtx::default();
        let mut heap = LocalTimerHeap::new(&mut ctx, ());
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T2), None);
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER3, (), T3), None);
        heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        heap.assert_map_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T1));

        // Nothing has expired yet, so nothing is popped or changed.
        assert_eq!(heap.pop(&mut ctx), None);
        heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        heap.assert_map_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T1));

        // At T1 only TIMER1 is due.
        ctx.instant.time = T1;
        assert_eq!(heap.pop(&mut ctx), Some((TIMER1, ())));
        heap.assert_heap_entries([(T2, TIMER2), (T3, TIMER3)]);
        heap.assert_map_entries([(T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T2));
        assert_eq!(heap.pop(&mut ctx), None);
        assert_eq!(heap.next_wakeup.scheduled, Some(T2));

        // At T3 both remaining timers are due; they come out one per call,
        // earliest first.
        ctx.instant.time = T3;
        assert_eq!(heap.pop(&mut ctx), Some((TIMER2, ())));
        heap.assert_heap_entries([(T3, TIMER3)]);
        heap.assert_map_entries([(T3, TIMER3)]);

        assert_eq!(heap.next_wakeup.scheduled, Some(T3));
        assert_eq!(heap.pop(&mut ctx), Some((TIMER3, ())));
        heap.assert_heap_entries([]);
        heap.assert_map_entries([]);
        assert_eq!(heap.next_wakeup.scheduled, None);

        assert_eq!(heap.pop(&mut ctx), None);
    }

    #[test]
    fn reschedule() {
        let mut ctx = FakeTimerCtx::default();
        let mut heap = LocalTimerHeap::new(&mut ctx, ());
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T2), None);
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER3, (), T3), None);
        heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        heap.assert_map_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T1));

        // Rescheduling to a later time only updates the map; the heap still
        // holds the old (T2, TIMER2) entry.
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T4), Some((T2, ())));
        heap.assert_heap_entries([(T1, TIMER1), (T2, TIMER2), (T3, TIMER3)]);
        heap.assert_map_entries([(T1, TIMER1), (T4, TIMER2), (T3, TIMER3)]);

        ctx.instant.time = T4;
        // Popping TIMER1 exposes the old TIMER2 entry at the front, which is
        // then moved to T4 to match the map.
        assert_eq!(heap.pop(&mut ctx), Some((TIMER1, ())));
        heap.assert_heap_entries([(T4, TIMER2), (T3, TIMER3)]);
        heap.assert_map_entries([(T4, TIMER2), (T3, TIMER3)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T3));

        // Rescheduling to an earlier time pushes a new heap entry and keeps
        // the old one.
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T2), Some((T4, ())));
        heap.assert_heap_entries([(T2, TIMER2), (T4, TIMER2), (T3, TIMER3)]);
        heap.assert_map_entries([(T2, TIMER2), (T3, TIMER3)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T2));

        // The old (T4, TIMER2) entry is not at the front after this pop, so
        // it remains in the heap.
        assert_eq!(heap.pop(&mut ctx), Some((TIMER2, ())));
        heap.assert_heap_entries([(T4, TIMER2), (T3, TIMER3)]);
        heap.assert_map_entries([(T3, TIMER3)]);
        assert_eq!(heap.next_wakeup.scheduled, Some(T3));

        // Popping the last real timer also clears the leftover TIMER2 entry.
        assert_eq!(heap.pop(&mut ctx), Some((TIMER3, ())));
        heap.assert_heap_entries([]);
        heap.assert_map_entries([]);
        assert_eq!(heap.next_wakeup.scheduled, None);
        assert_eq!(heap.pop(&mut ctx), None);
    }

    // Checks that the wakeup is re-armed after a pop when another timer is
    // scheduled for the very same instant.
    #[test]
    fn multiple_timers_same_instant() {
        let mut ctx = FakeTimerCtx::default();
        let mut heap = LocalTimerHeap::new(&mut ctx, ());
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T1), None);
        // `take()` clears the fake timer so that the re-arming done by `pop`
        // below is observable.
        assert_eq!(heap.next_wakeup.scheduled.take(), Some(T1));

        ctx.instant.time = T1;

        // Which of the two timers comes out first is not asserted.
        assert!(heap.pop(&mut ctx).is_some());
        assert_eq!(heap.next_wakeup.scheduled, Some(T1));
        assert!(heap.pop(&mut ctx).is_some());
        assert_eq!(heap.next_wakeup.scheduled, None);
        assert_eq!(heap.pop(&mut ctx), None);
    }

    #[test]
    fn clear() {
        let mut ctx = FakeTimerCtx::default();
        let mut heap = LocalTimerHeap::new(&mut ctx, ());
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER1, (), T1), None);
        assert_eq!(heap.schedule_instant(&mut ctx, TIMER2, (), T1), None);
        heap.clear(&mut ctx);
        heap.assert_map_entries([]);
        assert_eq!(heap.next_wakeup.scheduled, None);
    }
}