async_utils/
mutex_ticket.rs1use futures::lock::{Mutex, MutexGuard, MutexLockFuture};
6use futures::prelude::*;
7use std::task::{Context, Poll};
8
/// Helper for acquiring a [`Mutex`] through repeated, separate polls.
///
/// The ticket borrows the mutex and remembers the lock future of an attempt
/// that has not completed yet, so a later poll continues that same attempt
/// instead of creating a new lock future.
#[derive(Debug)]
pub struct MutexTicket<'a, T> {
    /// The mutex this ticket acquires.
    mutex: &'a Mutex<T>,
    /// Lock future carried over from a poll that returned `Poll::Pending`;
    /// `None` when no acquisition attempt is currently stored.
    lock: Option<MutexLockFuture<'a, T>>,
}
35
36impl<'a, T> MutexTicket<'a, T> {
37 pub fn new(mutex: &'a Mutex<T>) -> MutexTicket<'a, T> {
39 MutexTicket { mutex, lock: None }
40 }
41
42 pub fn poll(&mut self, ctx: &mut Context<'_>) -> Poll<MutexGuard<'a, T>> {
47 let mut lock_fut = match self.lock.take() {
48 None => self.mutex.lock(),
49 Some(lock_fut) => lock_fut,
50 };
51 match lock_fut.poll_unpin(ctx) {
52 Poll::Pending => {
53 self.lock = Some(lock_fut);
54 Poll::Pending
55 }
56 Poll::Ready(g) => Poll::Ready(g),
57 }
58 }
59
60 pub async fn lock(&mut self) -> MutexGuard<'a, T> {
64 match self.lock.take() {
65 None => self.mutex.lock(),
66 Some(lock_fut) => lock_fut,
67 }
68 .await
69 }
70}
71
#[cfg(test)]
mod tests {

    use super::MutexTicket;
    use anyhow::{format_err, Error};
    use assert_matches::assert_matches;
    use fuchsia_async::Timer;
    use futures::channel::oneshot;
    use futures::future::{poll_fn, try_join};
    use futures::lock::Mutex;
    use futures::task::noop_waker_ref;
    use std::task::{Context, Poll};
    use std::time::Duration;

    // Polling is `Ready` while the mutex is free, `Pending` while another
    // guard is held, and `Ready` again once that guard is dropped.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn basics(run: usize) {
        let mutex = Mutex::new(run);
        let mut ticket = MutexTicket::new(&mutex);
        let mut noop_ctx = Context::from_waker(noop_waker_ref());

        // Uncontended: every poll yields a guard, which is dropped at the end
        // of the assertion statement.
        for _ in 0..3 {
            assert_matches!(ticket.poll(&mut noop_ctx), Poll::Ready(_));
        }

        // Contended: someone else holds the lock.
        let held = mutex.lock().await;
        for _ in 0..2 {
            assert_matches!(ticket.poll(&mut noop_ctx), Poll::Pending);
        }

        // Released: the ticket can acquire the lock again.
        drop(held);
        assert_matches!(ticket.poll(&mut noop_ctx), Poll::Ready(_));
    }

    // A ticket that first observed `Pending` still completes once the guard
    // held elsewhere is released later on.
    #[fuchsia_async::run_singlethreaded(test)]
    async fn wakes_up(run: usize) -> Result<(), Error> {
        let mutex = Mutex::new(run);
        let (first_pending_tx, first_pending_rx) = oneshot::channel();
        let mut ticket = MutexTicket::new(&mutex);
        let held = mutex.lock().await;

        // Side that waits on the ticket.
        let waiter = async move {
            assert_matches!(
                ticket.poll(&mut Context::from_waker(noop_waker_ref())),
                Poll::Pending
            );
            // Tell the other side that the first `Pending` has been seen.
            first_pending_tx.send(()).map_err(|_| format_err!("cancelled"))?;
            assert_eq!(*poll_fn(|ctx| ticket.poll(ctx)).await, run);
            Ok::<(), Error>(())
        };

        // Side that holds the lock and releases it after a delay.
        let releaser = async move {
            first_pending_rx.await?;
            Timer::new(Duration::from_millis(300)).await;
            drop(held);
            Ok::<(), Error>(())
        };

        let ((), ()) = try_join(waiter, releaser).await?;
        Ok(())
    }
}