1use anyhow::{format_err, Error};
22use fuchsia_sync::Mutex;
23use futures::channel::oneshot;
24use futures::future::FusedFuture;
25use futures::task::{Context, Poll};
26use futures::{ready, Future, FutureExt};
27use slab::Slab;
28use std::collections::VecDeque;
29use std::pin::Pin;
30use std::sync::atomic::{AtomicBool, Ordering};
31use std::sync::{Arc, Weak};
32
/// Boxed one-shot callback attached to a revokable permit.
///
/// Calling it yields a [`Permit`]; `Permits::take` and `Permits::seize` pass that permit on to
/// their caller.  The `Send` bound lets the callback be stored in state shared across threads.
type BoxRevokeFn = Box<dyn FnOnce() -> Permit + Send>;
34
/// State shared between a [`Permit`] and the pool that issued it: the optional revocation
/// callback and a label used in debug output.
///
/// Each field sits behind its own mutex so it can be changed through a shared reference.
struct RevokeFnHolder {
    /// Revocation callback.  `None` when the permit is not revokable or the callback has
    /// already been taken.
    f: Mutex<Option<BoxRevokeFn>>,
    /// Label shown in `Debug` output; empty until `relabel` is called.
    label: Mutex<String>,
}
39
40impl RevokeFnHolder {
41 fn new(f: Option<BoxRevokeFn>) -> Arc<Self> {
42 Arc::new(Self { f: Mutex::new(f), label: Mutex::new(String::default()) })
43 }
44
45 fn replace(&self, f: BoxRevokeFn) -> Option<BoxRevokeFn> {
47 self.f.lock().replace(f)
48 }
49
50 fn relabel(&self, label: String) {
52 *(self.label.lock()) = label;
53 }
54
55 fn label(&self) -> String {
56 self.label.lock().clone()
57 }
58
59 fn take(&self) -> Option<BoxRevokeFn> {
60 self.f.lock().take()
61 }
62
63 fn is_revokable(&self) -> bool {
64 self.f.lock().is_some()
65 }
66
67 fn extract(weak: &Weak<Self>) -> BoxRevokeFn {
68 weak.upgrade().expect("should be resolvable").take().expect("revokable fn missing")
69 }
70}
71
72impl std::fmt::Debug for RevokeFnHolder {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct("RevokeFnHolder")
75 .field("revokable", &self.is_revokable())
76 .field("label", &self.label())
77 .finish()
78 }
79}
80
/// A queued reservation: the sending half of the channel on which its permit will be delivered.
struct WaitingReservation {
    /// Delivers the permit to the matching `Reservation`'s receiver.
    sender: futures::channel::oneshot::Sender<Permit>,
}
84
/// Bookkeeping shared by a `Permits` pool and every `Permit` it has issued.
struct PermitsInner {
    /// Number of permits the pool was created with; also the initial capacity of `out`.
    limit: usize,
    /// Weak handles to the holders of the permits that are currently out, indexed by the key
    /// stored in each `Permit`.
    out: Slab<Weak<RevokeFnHolder>>,
    /// Reservations waiting for a permit, served front to back.
    waiting: VecDeque<WaitingReservation>,
    /// Keys of outstanding permits that have a revocation callback, in registration order.
    revocations: VecDeque<usize>,
    /// Weak reference to the `Arc` that owns this state, upgraded when a permit is created.
    weak: Weak<Mutex<Self>>,
}
98
99impl std::fmt::Debug for PermitsInner {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 let mut debug = f.debug_struct("PermitsInner");
102 let _ = debug.field("limit", &self.limit).field("waiting", &self.waiting.len());
103 for (k, holder) in &self.out {
104 let h = holder.upgrade().unwrap();
105 let holder_str = format!("{}: {}", if h.is_revokable() { "R" } else { "I" }, h.label());
106 let _ = debug.field(format!("permit{k}").as_str(), &holder_str);
107 }
108 debug.finish()
109 }
110}
111
112impl PermitsInner {
113 fn new(limit: usize) -> Arc<Mutex<Self>> {
114 Arc::new_cyclic(|weak| {
115 Mutex::new(Self {
116 limit,
117 out: Slab::with_capacity(limit),
118 waiting: VecDeque::new(),
119 revocations: VecDeque::new(),
120 weak: weak.clone(),
121 })
122 })
123 }
124
125 fn try_get(&mut self, revoke_fn: Option<BoxRevokeFn>) -> Result<Permit, Error> {
128 if self.out.len() == self.out.capacity() {
129 return Err(format_err!("No permits left"));
130 }
131 let is_revokable = revoke_fn.is_some();
132 let fn_holder = RevokeFnHolder::new(revoke_fn);
133 let key = self.out.insert(Arc::downgrade(&fn_holder));
134 if is_revokable {
135 self.revocations.push_back(key);
136 }
137 Ok(Permit {
138 inner: Some(self.weak.upgrade().unwrap()),
139 committed: Arc::new(AtomicBool::new(true)),
140 fn_holder,
141 key,
142 })
143 }
144
145 fn release(&mut self, key: usize) {
151 self.revocations.retain(|k| *k != key);
153 let holder = self.out.get(key).expect("reservation present").upgrade().unwrap();
155 drop(holder.take());
157 let this = self.weak.upgrade().unwrap();
158 while let Some(sender) = self.waiting.pop_front() {
159 if let Ok(()) = Permit::handoff(sender, this.clone(), holder.clone(), key) {
160 return;
161 }
162 }
163 drop(self.out.remove(key));
165 }
166
167 fn reservation(&mut self, revoke_fn: Option<BoxRevokeFn>) -> Reservation {
169 let (sender, receiver) = oneshot::channel();
170 match self.try_get(None).ok() {
172 Some(permit) => sender.send(permit).ok().unwrap(),
173 None => self.waiting.push_back(WaitingReservation { sender }),
174 }
175 Reservation { receiver, revoke_fn, inner: self.weak.clone() }
176 }
177
178 fn make_revokable(&mut self, key: usize, revoke_fn: BoxRevokeFn) {
180 let prev = self
181 .out
182 .get(key)
183 .expect("reservation should be out")
184 .upgrade()
185 .expect("holder should resolve")
186 .replace(revoke_fn);
187 assert!(prev.is_none(), "shouldn't be replacing a previous revocation function");
188 self.revocations.push_back(key);
189 }
190
191 fn pop_revoke(&mut self) -> Option<BoxRevokeFn> {
193 self.revocations.pop_front().map(|idx| RevokeFnHolder::extract(&self.out[idx]))
194 }
195
196 fn revoke_all(&mut self) -> Vec<BoxRevokeFn> {
198 let mut indices = std::mem::take(&mut self.revocations);
199 indices.drain(..).map(|idx| RevokeFnHolder::extract(&self.out[idx])).collect()
200 }
201}
202
/// A future that resolves to a [`Permit`] once one has been delivered to it.
pub struct Reservation {
    /// Receives the permit from the pool.
    receiver: oneshot::Receiver<Permit>,
    /// Revocation callback to attach to the permit when the reservation resolves, if any.
    revoke_fn: Option<BoxRevokeFn>,
    /// Weak reference to the pool state, used to register the revocation callback.
    inner: Weak<Mutex<PermitsInner>>,
}
212
213impl Future for Reservation {
214 type Output = Permit;
215 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
216 let res = ready!(self.receiver.poll_unpin(cx));
217 let permit = res.expect("sender shouldn't be dropped, polled after termination?");
218 if let (Some(f), Some(inner)) = (self.revoke_fn.take(), self.inner.upgrade()) {
219 inner.lock().make_revokable(permit.key, f);
220 }
221 Poll::Ready(permit)
222 }
223}
224
225impl FusedFuture for Reservation {
226 fn is_terminated(&self) -> bool {
227 self.receiver.is_terminated()
228 }
229}
230
/// A cloneable handle to a pool with a fixed number of permits.
///
/// Clones share the same underlying pool state.
#[derive(Debug, Clone)]
pub struct Permits {
    /// Pool state shared with every clone and every issued permit.
    inner: Arc<Mutex<PermitsInner>>,
    /// The limit the pool was created with, kept here so `limit()` needs no lock.
    limit: usize,
}
236
237impl Permits {
238 pub fn new(limit: usize) -> Self {
240 Self { inner: PermitsInner::new(limit), limit }
241 }
242
243 pub fn limit(&self) -> usize {
245 self.limit
246 }
247
248 pub fn get(&self) -> Option<Permit> {
250 Permit::try_issue(self.inner.clone(), None)
251 }
252
253 pub fn get_revokable(
257 &self,
258 revoked_fn: impl FnOnce() -> Permit + 'static + Send,
259 ) -> Option<Permit> {
260 Permit::try_issue(self.inner.clone(), Some(Box::new(revoked_fn)))
261 }
262
263 pub fn take(&self) -> Option<Permit> {
267 if let Some(permit) = self.get() {
268 return Some(permit);
269 }
270 let revoke_fn = self.inner.lock().pop_revoke();
271 revoke_fn.map(|f| f())
272 }
273
274 pub fn seize(&self) -> Vec<Permit> {
277 let mut bunch = Vec::new();
278 let mut revoke_fns = {
279 let mut lock = self.inner.lock();
280 loop {
282 match lock.try_get(None).ok() {
283 Some(permit) => bunch.push(permit),
284 None => break,
285 }
286 }
287 lock.revoke_all()
288 };
289 for f in revoke_fns.drain(..) {
290 bunch.push(f())
291 }
292 bunch
293 }
294
295 pub fn reserve(&self) -> Reservation {
299 self.inner.lock().reservation(None)
300 }
301
302 pub fn reserve_revokable(
307 &self,
308 revoked_fn: impl FnOnce() -> Permit + 'static + Send,
309 ) -> Reservation {
310 self.inner.lock().reservation(Some(Box::new(revoked_fn)))
311 }
312}
313
/// One permit out of a [`Permits`] pool.  Dropping it gives the permit back to the pool.
#[derive(Debug)]
pub struct Permit {
    /// The pool state this permit belongs to; taken out when the permit is dropped.
    inner: Option<Arc<Mutex<PermitsInner>>>,
    /// Whether dropping this permit should release it; read in `Drop`.
    committed: Arc<AtomicBool>,
    /// Revocation callback and label for this permit, referenced weakly by the pool.
    fn_holder: Arc<RevokeFnHolder>,
    /// Index of this permit in the pool's table of outstanding permits.
    key: usize,
}
322
323impl Permit {
324 pub fn relabel(&self, new_label: String) {
326 self.fn_holder.relabel(new_label);
327 }
328
329 fn try_issue(inner: Arc<Mutex<PermitsInner>>, revoke_fn: Option<BoxRevokeFn>) -> Option<Self> {
332 inner.lock().try_get(revoke_fn).ok()
333 }
334
335 fn handoff(
339 waiting: WaitingReservation,
340 inner: Arc<Mutex<PermitsInner>>,
341 fn_holder: Arc<RevokeFnHolder>,
342 key: usize,
343 ) -> Result<(), Error> {
344 let committed = Arc::new(AtomicBool::new(false));
345 let commit_clone = committed.clone();
346 let potential = Self { inner: Some(inner), committed, key, fn_holder };
347 match waiting.sender.send(potential) {
348 Ok(()) => {
349 commit_clone.store(true, Ordering::Relaxed);
350 Ok(())
351 }
352 Err(_) => Err(format_err!("failed to handoff")),
353 }
354 }
355}
356
357impl Drop for Permit {
358 fn drop(&mut self) {
359 let inner = match self.inner.take() {
360 None => return, Some(inner) => inner,
362 };
363 let committed = self.committed.load(Ordering::Relaxed);
364 if committed {
365 inner.lock().release(self.key);
366 }
367 }
368}
369
#[cfg(test)]
mod tests {
    use super::*;

    use async_utils::PollExt;
    use fuchsia_async as fasync;

    /// Fails the calling test with `msg` if `opt` contains a value.
    #[track_caller]
    fn expect_none<T>(opt: Option<T>, msg: &str) {
        if opt.is_some() {
            panic!("{}", msg);
        }
    }

    /// Runs `reservation` until it stalls and requires that it is still pending.
    #[track_caller]
    fn expect_no_permits(exec: &mut fasync::TestExecutor, reservation: &mut Reservation) {
        let outcome = exec.run_until_stalled(reservation);
        outcome.expect_pending("expected reservation to have no permits");
    }

    /// Runs `reservation` until it stalls and returns the permit it resolved to.
    #[track_caller]
    fn expect_permit_available(
        exec: &mut fasync::TestExecutor,
        reservation: &mut Reservation,
    ) -> Permit {
        let outcome = exec.run_until_stalled(reservation);
        outcome.expect("reservation to have available permit")
    }

    #[test]
    fn no_permits_available() {
        let empty_pool = Permits::new(0);

        expect_none(empty_pool.get(), "shouldn't be able to get a permit");

        // Creating a reservation on an empty pool must still work.
        let _queued = empty_pool.reserve();
    }

    #[test]
    fn permit_dropping() {
        let pool = Permits::new(2);

        assert_eq!(2, pool.limit());

        let first = pool.get().expect("first permit");
        let second = pool.get().expect("second permit");
        expect_none(pool.get(), "shouln't get a third permit");

        // Dropping a permit makes it available again.
        drop(second);

        let third = pool.get().expect("third permit");
        drop(first);
        let fourth = pool.get().expect("fourth permit");

        drop(third);
        drop(fourth);

        let _fifth = pool.get().expect("fifth permit");
    }

    #[test]
    fn permit_reservations() {
        let mut exec = fasync::TestExecutor::new();

        let permits = Permits::new(2);

        // Exhaust the pool.
        let held_one = permits.get().expect("permit one should be available");
        let held_two = permits.get().expect("second permit is also okay");
        expect_none(permits.get(), "can't get a third item");

        let mut res_first = permits.reserve();
        let res_second = permits.reserve();
        let mut res_third = permits.reserve();
        let mut res_fourth = permits.reserve();

        // A dropped reservation is skipped when permits are handed out.
        drop(res_second);

        expect_no_permits(&mut exec, &mut res_first);
        expect_no_permits(&mut exec, &mut res_third);
        expect_no_permits(&mut exec, &mut res_fourth);

        drop(held_one);

        // The released permit goes to the reservation that has waited longest.
        let first_out = expect_permit_available(&mut exec, &mut res_first);
        expect_no_permits(&mut exec, &mut res_third);
        expect_no_permits(&mut exec, &mut res_fourth);

        drop(first_out);

        let third_out = expect_permit_available(&mut exec, &mut res_third);
        expect_no_permits(&mut exec, &mut res_fourth);

        drop(res_fourth);

        // Nobody is waiting any more, so this permit goes back to the pool.
        drop(held_two);

        // With a permit free, a new reservation is satisfied right away.
        let mut res_fifth = permits.reserve();

        expect_none(permits.get(), "no items should be available");

        let res_sixth = permits.reserve();
        let mut res_seventh = permits.reserve();
        let _res_eighth = permits.reserve();

        // Reservations and permits keep working after the pool handle itself is gone.
        drop(permits);

        let _fifth_out = expect_permit_available(&mut exec, &mut res_fifth);

        drop(third_out);

        // Dropping a reservation that already received a permit passes the permit on.
        drop(res_sixth);

        let _seventh_out = expect_permit_available(&mut exec, &mut res_seventh);
    }

    #[test]
    fn revoke_permits() {
        const TOTAL_PERMITS: usize = 2;
        let permits = Permits::new(TOTAL_PERMITS);

        let slot = Arc::new(Mutex::new(None));

        // Revocation callback: give up whatever permit is stored in `slot`.
        let give_back = {
            let holder = slot.clone();
            move || holder.lock().take().expect("should be holding Permit")
        };

        let held = permits.get_revokable(give_back.clone()).expect("permit available");
        *slot.lock() = Some(held);

        let seized = permits.seize();

        // One permit was free, the other one was revoked.
        assert_eq!(TOTAL_PERMITS, seized.len());
        assert!(slot.lock().is_none());

        drop(seized);

        let _nonrevokable_permit = permits.take().expect("permit available");
        let held = permits.get_revokable(give_back.clone()).expect("two permits");
        *slot.lock() = Some(held);

        // Only the revokable permit can be seized.
        let seized = permits.seize();
        assert_eq!(1, seized.len());
        assert!(slot.lock().is_none());

        drop(seized);

        let held = permits.get_revokable(give_back).expect("permit");
        *slot.lock() = Some(held);

        // Nothing is free, so `take` has to revoke.
        let _taken = permits.take().expect("can take the permit");
        assert!(slot.lock().is_none());

        assert!(permits.take().is_none());
    }

    #[test]
    fn revokable_dropped_before_revokation() {
        const TOTAL_PERMITS: usize = 2;
        let permits = Permits::new(TOTAL_PERMITS);

        let slot = Arc::new(Mutex::new(None));

        // This callback would panic if it ran, because `slot` is never filled.
        let give_back = {
            let holder = slot.clone();
            move || holder.lock().take().expect("should be holding Permit when revoked")
        };

        let held = permits.get_revokable(give_back).expect("permit available");
        drop(held);

        // Both permits are free again, so seizing doesn't need to revoke anything.
        let seized = permits.seize();
        assert_eq!(TOTAL_PERMITS, seized.len());
    }

    /// Revocation callback that gives up the most recently stored permit from `holder` and
    /// immediately queues a new revokable reservation (with this same callback), stashing it in
    /// `reservations`.
    fn revoke_then_reserve_again(
        permits: Permits,
        holder: Arc<Mutex<Vec<Permit>>>,
        reservations: Arc<Mutex<Vec<Reservation>>>,
    ) -> Permit {
        let revoked = holder.lock().pop().expect("should have a permit");
        let again = {
            let permits = permits.clone();
            let reservations = reservations.clone();
            move || revoke_then_reserve_again(permits, holder, reservations)
        };
        let next = permits.reserve_revokable(again);
        reservations.lock().push(next);
        revoked
    }

    #[fuchsia::test]
    fn revokable_reservations() {
        let mut exec = fasync::TestExecutor::new();
        const TOTAL_PERMITS: usize = 2;
        let permits = Permits::new(TOTAL_PERMITS);

        let permits_holder = Arc::new(Mutex::new(Vec::new()));
        let reservations_holder = Arc::new(Mutex::new(Vec::new()));

        // Simple callback: give up the most recently stored permit.
        let revoke_from_holder_fn = {
            let holder = permits_holder.clone();
            move || holder.lock().pop().expect("should have a Permit")
        };

        let revokable =
            permits.get_revokable(revoke_from_holder_fn.clone()).expect("got revokable");
        permits_holder.lock().push(revokable);

        // Callback that reserves again while it is being revoked.
        let revoke_then_reserve_fn = {
            let permits = permits.clone();
            let holder = permits_holder.clone();
            let reservations = reservations_holder.clone();
            move || revoke_then_reserve_again(permits, holder, reservations)
        };

        let mut revokable_reservation = permits.reserve_revokable(revoke_then_reserve_fn);
        let granted = expect_permit_available(&mut exec, &mut revokable_reservation);
        permits_holder.lock().push(granted);

        // Both permits are revokable, so both are seized.
        let seized = permits.seize();
        assert_eq!(TOTAL_PERMITS, seized.len());
        assert_eq!(0, permits_holder.lock().len());

        // The revocation queued one new reservation; add a second one behind it.
        let mut another_reservation = reservations_holder.lock().pop().expect("reservation");
        let mut revokable_reservation_two = permits.reserve_revokable(revoke_from_holder_fn);

        // Releasing the seized permits satisfies both waiting reservations.
        drop(seized);

        let granted = expect_permit_available(&mut exec, &mut another_reservation);
        permits_holder.lock().push(granted);
        let granted = expect_permit_available(&mut exec, &mut revokable_reservation_two);
        permits_holder.lock().push(granted);

        let seized = permits.seize();
        assert_eq!(TOTAL_PERMITS, seized.len());
        assert_eq!(0, permits_holder.lock().len());
        let mut yet_another = reservations_holder.lock().pop().expect("reservation");
        expect_no_permits(&mut exec, &mut yet_another);

        // One permit goes to the waiting reservation, the other back to the pool.
        drop(seized);

        let one = permits.get().expect("one is available");

        expect_none(permits.get(), "none should be available");

        let granted = expect_permit_available(&mut exec, &mut yet_another);
        permits_holder.lock().push(granted);

        // `one` is not revokable, so only a single permit can be seized.
        let seized = permits.seize();
        assert_eq!(1, seized.len());
        assert_eq!(0, permits_holder.lock().len());

        let mut yet_another = reservations_holder.lock().pop().expect("reservation");
        expect_no_permits(&mut exec, &mut yet_another);

        drop(one);

        let granted = expect_permit_available(&mut exec, &mut yet_another);
        permits_holder.lock().push(granted);

        // Nothing is free, so this revokes the permit that was just granted.
        let taken_permit = permits.take().expect("should be able to take one");

        drop(taken_permit);
        // Clear the holders explicitly before dropping them and the pool.
        permits_holder.lock().clear();
        drop(permits_holder);
        reservations_holder.lock().clear();
        drop(reservations_holder);
        drop(permits);
    }
}