fuchsia_bluetooth/expectation/
asynchronous.rs1#![cfg(target_os = "fuchsia")]
30use anyhow::{format_err, Error};
31use fuchsia_async::{DurationExt, TimeoutExt};
32use fuchsia_sync::{MappedRwLockWriteGuard, RwLock, RwLockWriteGuard};
33use futures::future::BoxFuture;
34use futures::FutureExt;
35use slab::Slab;
36use std::future::Future;
37use std::pin::Pin;
38use std::sync::Arc;
39use std::task;
40use std::task::Poll;
41
42use crate::expectation::Predicate;
43
44#[must_use = "futures do nothing unless polled"]
47pub struct ExpectationFuture<T: ExpectableState> {
48 state: T,
49 expectation: Predicate<T::State>,
50 waker_key: Option<usize>,
51}
52
53impl<T: ExpectableState> ExpectationFuture<T> {
54 fn new(state: T, expectation: Predicate<T::State>) -> ExpectationFuture<T> {
55 ExpectationFuture { state, expectation, waker_key: None }
56 }
57
58 fn clear_waker(&mut self) {
59 if let Some(key) = self.waker_key {
60 self.state.remove_task(key);
61 self.waker_key = None;
62 }
63 }
64
65 fn store_task(&mut self, cx: &mut task::Context<'_>) {
66 let key = self.state.store_task(cx);
67 self.waker_key = Some(key);
68 }
69}
70
71impl<T: ExpectableState> std::marker::Unpin for ExpectationFuture<T> {}
72
73impl<T: ExpectableState> Future for ExpectationFuture<T> {
74 type Output = T::State;
75
76 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
77 self.clear_waker();
78 let state = self.state.read();
79 if self.expectation.satisfied(&state) {
80 Poll::Ready(state)
81 } else {
82 self.store_task(cx);
83 Poll::Pending
84 }
85 }
86}
87
88pub trait ExpectableState: Clone {
98 type State: 'static;
100
101 fn store_task(&mut self, cx: &mut task::Context<'_>) -> usize;
103
104 fn remove_task(&mut self, key: usize);
107
108 fn notify_state_changed(&self);
110
111 fn read(&self) -> Self::State;
113}
114
115pub trait ExpectableStateExt: ExpectableState + Sized {
116 fn when_satisfied(
124 &self,
125 expectation: Predicate<Self::State>,
126 timeout: zx::MonotonicDuration,
127 ) -> BoxFuture<'_, Result<Self::State, Error>>;
128}
129
130impl<T: ExpectableState + Sized> ExpectableStateExt for T
131where
132 T: Send + Sync + 'static,
133 T::State: Send + Sync + 'static,
134{
135 fn when_satisfied(
136 &self,
137 expectation: Predicate<T::State>,
138 timeout: zx::MonotonicDuration,
139 ) -> BoxFuture<'_, Result<Self::State, Error>> {
140 let state = self.clone();
141 let exp = expectation.clone();
142 ExpectationFuture::new(self.clone(), expectation)
143 .map(|s| Ok(s))
144 .on_timeout(timeout.after_now(), move || {
145 let state = state.read();
146 let result = exp.assert_satisfied(&state);
147 result.map(|_| state).map_err(|err| {
148 format_err!("Timed out waiting for expectation, last result:\n{:?}", err)
149 })
150 })
151 .boxed()
152 }
153}
154
155pub struct ExpectableInner<S, A> {
157 pub state: S,
159
160 tasks: Slab<task::Waker>,
162
163 pub aux: A,
165}
166
167pub type Expectable<S, A> = Arc<RwLock<ExpectableInner<S, A>>>;
170
171pub fn expectable<S, A>(state: S, aux: A) -> Expectable<S, A> {
172 Arc::new(RwLock::new(ExpectableInner { state, tasks: Slab::new(), aux }))
173}
174
175impl<S: Clone + 'static, A> ExpectableState for Expectable<S, A> {
176 type State = S;
177
178 fn store_task(&mut self, cx: &mut task::Context<'_>) -> usize {
180 self.write().tasks.insert(cx.waker().clone())
181 }
182
183 fn remove_task(&mut self, key: usize) {
185 let mut harness = self.write();
186 if harness.tasks.contains(key) {
187 drop(harness.tasks.remove(key));
188 }
189 }
190
191 fn notify_state_changed(&self) {
193 for task in &RwLock::read(self).tasks {
194 task.1.wake_by_ref();
195 }
196 self.write().tasks.clear()
197 }
198
199 fn read(&self) -> Self::State {
200 RwLock::read(self).state.clone()
201 }
202}
203
204pub trait ExpectableExt<S, A> {
206 fn aux(&self) -> MappedRwLockWriteGuard<'_, A>;
208
209 fn write_state(&self) -> MappedRwLockWriteGuard<'_, S>;
211}
212
213impl<S, A> ExpectableExt<S, A> for Expectable<S, A> {
216 fn aux(&self) -> MappedRwLockWriteGuard<'_, A> {
217 RwLockWriteGuard::map(self.write(), |harness| &mut harness.aux)
218 }
219
220 fn write_state(&self) -> MappedRwLockWriteGuard<'_, S> {
221 RwLockWriteGuard::map(self.write(), |harness| &mut harness.state)
222 }
223}