1use fuchsia_sync::Mutex;
8use futures::future::{FusedFuture, Future};
9use slab::Slab;
10use std::fmt;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll, Waker};
14
/// Sentinel waker key meaning "no waker is currently registered for this waiter".
///
/// Uses the `usize::MAX` associated constant rather than the legacy `usize::max_value()`.
const NULL_WAKER_KEY: usize = usize::MAX;
16
17#[derive(Clone)]
21pub struct Event {
22 inner: Arc<EventSignaler>,
23}
24
25impl Event {
26 pub fn new() -> Self {
28 Self {
29 inner: Arc::new(EventSignaler {
30 inner: Arc::new(Mutex::new(EventState {
31 state: State::Waiting,
32 wakers: Slab::new(),
33 })),
34 }),
35 }
36 }
37
38 pub fn signal(&self) -> bool {
43 self.inner.set(State::Signaled)
44 }
45
46 pub fn signaled(&self) -> bool {
48 self.inner.inner.lock().state == State::Signaled
49 }
50
51 pub fn wait(&self) -> EventWait {
55 EventWait { inner: self.wait_or_dropped() }
56 }
57
58 pub fn wait_or_dropped(&self) -> EventWaitResult {
63 EventWaitResult {
64 inner: (*self.inner).inner.clone(),
65 waker_key: NULL_WAKER_KEY,
66 terminated: false,
67 }
68 }
69}
70
71impl fmt::Debug for Event {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 write!(f, "Event {{ state: {:?} }}", self.inner.inner.lock().state)
74 }
75}
76
/// Lifecycle state of an event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// The event has not been signaled yet; this is the initial state.
    Waiting,
    /// The event has been signaled.
    Signaled,
    /// The signaler was dropped before the event was ever signaled.
    Dropped,
}
88
89struct EventState {
91 pub state: State,
92 pub wakers: Slab<Waker>,
93}
94
95struct EventSignaler {
99 inner: Arc<Mutex<EventState>>,
100}
101
102impl EventSignaler {
103 fn set(&self, state: State) -> bool {
106 assert!(state != State::Waiting, "Cannot reset the state to Waiting");
107 let mut guard = self.inner.lock();
108 if let State::Signaled = guard.state {
109 if !std::thread::panicking() {
111 assert!(
112 guard.wakers.is_empty(),
113 "If there are wakers, a race condition is present"
114 );
115 }
116 false
117 } else {
118 let mut wakers = std::mem::replace(&mut guard.wakers, Slab::new());
119 guard.state = state;
120 drop(guard);
121 for waker in wakers.drain() {
122 waker.wake();
123 }
124 true
125 }
126 }
127}
128
129impl Drop for EventSignaler {
130 fn drop(&mut self) {
131 let _: bool = self.set(State::Dropped);
134 }
135}
136
137#[must_use = "futures do nothing unless polled"]
139pub struct EventWaitResult {
140 inner: Arc<Mutex<EventState>>,
141 waker_key: usize,
142 terminated: bool,
143}
144
145impl Future for EventWaitResult {
146 type Output = Result<(), Dropped>;
147
148 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
149 let this = self.get_mut();
152 let mut guard = this.inner.lock();
153
154 match guard.state {
155 State::Waiting => {
156 let mut new_key = None;
157 if this.waker_key == NULL_WAKER_KEY || !guard.wakers.contains(this.waker_key) {
158 new_key = Some(guard.wakers.insert(cx.waker().clone()));
159 } else {
160 guard.wakers[this.waker_key] = cx.waker().clone();
161 }
162
163 if let Some(key) = new_key {
164 this.waker_key = key;
165 }
166
167 Poll::Pending
168 }
169 State::Signaled => {
170 this.terminated = true;
171 this.waker_key = NULL_WAKER_KEY;
172 Poll::Ready(Ok(()))
173 }
174 State::Dropped => {
175 this.terminated = true;
176 this.waker_key = NULL_WAKER_KEY;
177 Poll::Ready(Err(Dropped))
178 }
179 }
180 }
181}
182
183impl FusedFuture for EventWaitResult {
184 fn is_terminated(&self) -> bool {
185 self.terminated
186 }
187}
188
189impl Unpin for EventWaitResult {}
190
191impl Drop for EventWaitResult {
192 fn drop(&mut self) {
193 if self.waker_key != NULL_WAKER_KEY {
194 let mut guard = self.inner.lock();
196 if guard.wakers.contains(self.waker_key) {
197 let _ = guard.wakers.remove(self.waker_key);
198 }
199 }
200 }
201}
202
203#[must_use = "futures do nothing unless polled"]
206pub struct EventWait {
207 inner: EventWaitResult,
208}
209
210impl Future for EventWait {
211 type Output = ();
212
213 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
214 match Pin::new(&mut self.inner).poll(cx) {
215 Poll::Ready(Ok(())) => Poll::Ready(()),
216 _ => Poll::Pending,
217 }
218 }
219}
220
221impl FusedFuture for EventWait {
222 fn is_terminated(&self) -> bool {
223 self.inner.is_terminated()
224 }
225}
226
227impl Unpin for EventWait {}
228
/// Error produced by an `EventWaitResult` when the event was dropped without being signaled.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Dropped;
232
233impl fmt::Display for Dropped {
234 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235 write!(f, "event dropped")
236 }
237}
238
239impl std::error::Error for Dropped {}
240
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;

    /// Signaling through one handle is visible through every clone.
    #[test]
    fn signaled_method_respects_signaling() {
        let event = Event::new();
        let event_clone = event.clone();

        assert!(!event.signaled());
        assert!(!event_clone.signaled());

        assert!(event.signal());

        assert!(event.signaled());
        assert!(event_clone.signaled());
    }

    /// Both waiter kinds stay pending while the event is unsignaled.
    #[test]
    fn unsignaled_event_is_pending() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
    }

    /// Waiters first polled after the signal complete immediately.
    #[test]
    fn signaled_event_is_ready() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(event.signal());
        assert!(ex.run_until_stalled(&mut wait).is_ready());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
    }

    /// Waiters that registered a waker before the signal complete afterwards.
    #[test]
    fn event_is_ready_and_wakes_after_stalled() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(event.signal());
        assert!(ex.run_until_stalled(&mut wait).is_ready());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
    }

    /// Several waiters on one event all complete, whether or not they polled beforehand.
    #[test]
    fn signaling_event_registers_and_wakes_multiple_waiters_properly() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait_1 = event.wait();
        let mut wait_2 = event.wait();
        let mut wait_3 = event.wait();

        // Only the first two waiters register a waker; `wait_3` is deliberately not polled
        // before the signal.
        assert!(ex.run_until_stalled(&mut wait_1).is_pending());
        assert!(ex.run_until_stalled(&mut wait_2).is_pending());

        assert!(event.signal());

        assert!(ex.run_until_stalled(&mut wait_1).is_ready());
        assert!(ex.run_until_stalled(&mut wait_2).is_ready());
        assert!(ex.run_until_stalled(&mut wait_3).is_ready());
    }

    /// `is_terminated` flips to `true` only after the waiters have completed.
    #[test]
    fn event_is_terminated_after_complete() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(!wait.is_terminated());
        assert!(!wait_or_dropped.is_terminated());
        assert!(event.signal());
        assert!(ex.run_until_stalled(&mut wait).is_ready());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
        assert!(wait.is_terminated());
        assert!(wait_or_dropped.is_terminated());
    }

    /// Dropping registered waiters of both kinds leaves the event in a signalable state.
    #[test]
    fn waiter_drops_gracefully() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        // Exercise both waiter kinds (previously both were created with `wait()`).
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(!wait.is_terminated());
        assert!(!wait_or_dropped.is_terminated());
        drop(wait);
        drop(wait_or_dropped);
        assert!(event.signal());
    }

    /// The dropped outcome is only reported once every clone of the event is gone.
    #[test]
    fn waiter_completes_after_all_events_drop() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        // A real clone sharing the signaler (previously an unrelated `Event::new()`).
        let event_clone = event.clone();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(!wait.is_terminated());
        assert!(!wait_or_dropped.is_terminated());
        drop(event);
        // One handle is still alive, so the event must not be reported as dropped yet.
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        drop(event_clone);
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
    }

    /// Dropping a registered waiter after the event itself was dropped must not panic.
    #[test]
    fn drop_receiver_after_poll_without_event_signal() {
        let mut exec = fasync::TestExecutor::new();
        let event = Event::new();
        let mut waiter = event.wait_or_dropped();
        assert!(exec.run_until_stalled(&mut waiter).is_pending());
        drop(event);
        drop(waiter);
    }

    /// Dropping a waiter that still holds a stale waker key after a signal must not panic.
    #[test]
    fn drop_receiver_after_event_signal_without_repoll() {
        let mut exec = fasync::TestExecutor::new();
        let event = Event::new();
        let mut waiter = event.wait_or_dropped();
        assert_eq!(event.inner.inner.lock().wakers.len(), 0);

        // Polling registers the waiter's waker.
        assert!(exec.run_until_stalled(&mut waiter).is_pending());
        assert_eq!(event.inner.inner.lock().wakers.len(), 1);

        // Signaling takes every waker out of the shared state.
        assert!(event.signal());
        assert_eq!(event.inner.inner.lock().wakers.len(), 0);

        // The waiter was not polled again, so it still carries its old key.
        drop(waiter);
    }
}