async_utils/
event.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! An event that can be signaled and waited on by multiple consumers.
6
7use fuchsia_sync::Mutex;
8use futures::future::{FusedFuture, Future};
9use slab::Slab;
10use std::fmt;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll, Waker};
14
/// Sentinel stored in `EventWaitResult::waker_key` while the future has no waker registered in
/// the shared waker slab.
const NULL_WAKER_KEY: usize = usize::MAX;
16
/// An `Event` is a clonable object that can be signaled once. Calls to `.wait()` produce a future,
/// `EventWait`, that can wait on that signal. Once the `Event` has been signaled, all futures will
/// complete immediately.
///
/// Clones share the same underlying state, so signaling any clone signals all of them.
#[derive(Clone)]
pub struct Event {
    // Shared by every clone of this `Event`; the `EventSignaler`'s `Drop` impl runs once the
    // last clone goes away.
    inner: Arc<EventSignaler>,
}
24
25impl Event {
26    /// Create a new `Event` that has not yet been signaled.
27    pub fn new() -> Self {
28        Self {
29            inner: Arc::new(EventSignaler {
30                inner: Arc::new(Mutex::new(EventState {
31                    state: State::Waiting,
32                    wakers: Slab::new(),
33                })),
34            }),
35        }
36    }
37
38    /// Signal the `Event`. Once this is done, it cannot be undone. Any tasks waiting on this
39    /// `Event` will be notified and its `Future` implementation will complete.
40    ///
41    /// Returns true if this `Event` was the one that performed the signal operation.
42    pub fn signal(&self) -> bool {
43        self.inner.set(State::Signaled)
44    }
45
46    /// Return true if `Event::signal` has already been called.
47    pub fn signaled(&self) -> bool {
48        self.inner.inner.lock().state == State::Signaled
49    }
50
51    /// Create a new `EventWait` future that will complete after this event has been signaled.
52    /// If all signalers are dropped, this future will continue to return `Poll::Pending`. To be
53    /// notified when all signalers are dropped without signaling, use `wait_or_dropped`.
54    pub fn wait(&self) -> EventWait {
55        EventWait { inner: self.wait_or_dropped() }
56    }
57
58    /// Create a new `EventWaitResult` future that will complete after this event has been
59    /// signaled or all `Event` clones have been dropped.
60    ///
61    /// This future will output a `Result<(), Dropped>` to indicate what has occurred.
62    pub fn wait_or_dropped(&self) -> EventWaitResult {
63        EventWaitResult {
64            inner: (*self.inner).inner.clone(),
65            waker_key: NULL_WAKER_KEY,
66            terminated: false,
67        }
68    }
69}
70
71impl fmt::Debug for Event {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        write!(f, "Event {{ state: {:?} }}", self.inner.inner.lock().state)
74    }
75}
76
/// `Event` state tracking enum.
///
/// An event starts out `Waiting` and is moved at most once, by `EventSignaler::set`, to either
/// `Signaled` or `Dropped`.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum State {
    /// The `Event` has not yet been signaled. This is the initial state of an `Event`.
    Waiting,
    /// The `signal` method has been called on an `Event`.
    Signaled,
    /// All clones of an `Event` have been dropped without the `signal` method being called. An
    /// `Event` can never move out of the `Dropped` state.
    Dropped,
}
88
/// Tracks state shared by all Event clones and futures.
struct EventState {
    /// Current lifecycle state of the event.
    pub state: State,
    /// Wakers registered by pending waiter futures, keyed by the slab index each future
    /// remembers in its `waker_key`.
    pub wakers: Slab<Waker>,
}
94
/// A handle shared between all `Event` structs for a given event. Once all `Event`s are dropped,
/// this will be dropped and will notify the `EventState` that it is unreachable by any signalers
/// and will never be signaled if it hasn't been already.
struct EventSignaler {
    // The same `Arc` is handed to every waiter future, which is why it is kept separate from the
    // `Arc<EventSignaler>` held by `Event`: waiters must not keep the signaler alive.
    inner: Arc<Mutex<EventState>>,
}
101
102impl EventSignaler {
103    /// Internal function to set the self.inner.state value if it has not already been set to
104    /// `State::Signaled`. Returns true if this function call changed the value of self.inner.state.
105    fn set(&self, state: State) -> bool {
106        assert!(state != State::Waiting, "Cannot reset the state to Waiting");
107        let mut guard = self.inner.lock();
108        if let State::Signaled = guard.state {
109            // Avoid double panicking.
110            if !std::thread::panicking() {
111                assert!(
112                    guard.wakers.is_empty(),
113                    "If there are wakers, a race condition is present"
114                );
115            }
116            false
117        } else {
118            let mut wakers = std::mem::replace(&mut guard.wakers, Slab::new());
119            guard.state = state;
120            drop(guard);
121            for waker in wakers.drain() {
122                waker.wake();
123            }
124            true
125        }
126    }
127}
128
impl Drop for EventSignaler {
    /// Runs when the last `Event` clone sharing this signaler is dropped.
    fn drop(&mut self) {
        // Indicate that all `Event` clones have been dropped so that pending waiters are woken.
        // `set` leaves the state untouched (and returns false) if it has already been set to
        // `State::Signaled`, so the result is intentionally ignored.
        let _: bool = self.set(State::Dropped);
    }
}
136
/// Future implementation for `Event::wait_or_dropped`.
///
/// Resolves to `Ok(())` once the event has been signaled, or to `Err(Dropped)` once every
/// `Event` clone has been dropped without signaling.
#[must_use = "futures do nothing unless polled"]
pub struct EventWaitResult {
    // State shared with the `Event` clones and every other waiter.
    inner: Arc<Mutex<EventState>>,
    // Key of this future's waker in `EventState::wakers`. It is `NULL_WAKER_KEY` until the first
    // pending poll and again after the future completes. It can refer to an already-drained
    // slot if the event fires between polls, so users must check `contains` before removing.
    waker_key: usize,
    // Set once `poll` has returned `Poll::Ready`; reported through `FusedFuture`.
    terminated: bool,
}
144
145impl Future for EventWaitResult {
146    type Output = Result<(), Dropped>;
147
148    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
149        // `this: &mut Self` allows the compiler to track access to individual fields of Self as
150        // distinct borrows.
151        let this = self.get_mut();
152        let mut guard = this.inner.lock();
153
154        match guard.state {
155            State::Waiting => {
156                let mut new_key = None;
157                if this.waker_key == NULL_WAKER_KEY || !guard.wakers.contains(this.waker_key) {
158                    new_key = Some(guard.wakers.insert(cx.waker().clone()));
159                } else {
160                    guard.wakers[this.waker_key] = cx.waker().clone();
161                }
162
163                if let Some(key) = new_key {
164                    this.waker_key = key;
165                }
166
167                Poll::Pending
168            }
169            State::Signaled => {
170                this.terminated = true;
171                this.waker_key = NULL_WAKER_KEY;
172                Poll::Ready(Ok(()))
173            }
174            State::Dropped => {
175                this.terminated = true;
176                this.waker_key = NULL_WAKER_KEY;
177                Poll::Ready(Err(Dropped))
178            }
179        }
180    }
181}
182
impl FusedFuture for EventWaitResult {
    /// Returns true once `poll` has produced `Poll::Ready`, whether with `Ok(())` or with
    /// `Err(Dropped)`.
    fn is_terminated(&self) -> bool {
        self.terminated
    }
}
188
// `EventWaitResult` holds only an `Arc`, a `usize` and a `bool`, none of which rely on a stable
// address, so it may be moved freely after being polled.
impl Unpin for EventWaitResult {}
190
191impl Drop for EventWaitResult {
192    fn drop(&mut self) {
193        if self.waker_key != NULL_WAKER_KEY {
194            // Cleanup the EventWaitResult's waker one is present in the wakers slab.
195            let mut guard = self.inner.lock();
196            if guard.wakers.contains(self.waker_key) {
197                let _ = guard.wakers.remove(self.waker_key);
198            }
199        }
200    }
201}
202
/// Future implementation for `Event::wait`. This future only completes when the event is signaled.
/// If all signalers are dropped, `EventWait` continues to return `Poll::Pending`.
#[must_use = "futures do nothing unless polled"]
pub struct EventWait {
    // The underlying future; `EventWait` forwards to it and hides its `Err(Dropped)` outcome.
    inner: EventWaitResult,
}
209
210impl Future for EventWait {
211    type Output = ();
212
213    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
214        match Pin::new(&mut self.inner).poll(cx) {
215            Poll::Ready(Ok(())) => Poll::Ready(()),
216            _ => Poll::Pending,
217        }
218    }
219}
220
impl FusedFuture for EventWait {
    /// Delegates to the wrapped `EventWaitResult`.
    ///
    /// As a consequence this also returns true after a poll has observed that all `Event` clones
    /// were dropped without signaling, even though `poll` returned `Poll::Pending` for it.
    fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }
}
226
// `EventWait` only wraps an `EventWaitResult`, which is itself `Unpin`.
impl Unpin for EventWait {}
228
/// Error produced by an `EventWaitResult` when every `Event` clone was dropped before the event
/// was signaled.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct Dropped;

impl fmt::Display for Dropped {
    /// Writes the fixed message `event dropped`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("event dropped")
    }
}

impl std::error::Error for Dropped {}
240
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;

    // TODO: Add tests to check waker count in EventWait and EventWaitResult.

    /// `signaled()` reflects a `signal()` call on every clone of the event.
    #[test]
    fn signaled_method_respects_signaling() {
        let event = Event::new();
        let event_clone = event.clone();

        assert!(!event.signaled());
        assert!(!event_clone.signaled());

        assert!(event.signal());

        assert!(event.signaled());
        assert!(event_clone.signaled());
    }

    /// Both waiter kinds stay pending while the event has not been signaled.
    #[test]
    fn unsignaled_event_is_pending() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
    }

    /// Waiters that are first polled after the signal complete immediately.
    #[test]
    fn signaled_event_is_ready() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(event.signal());
        assert!(ex.run_until_stalled(&mut wait).is_ready());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
    }

    /// Waiters that were already pending complete once the event is signaled.
    #[test]
    fn event_is_ready_and_wakes_after_stalled() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(event.signal());
        assert!(ex.run_until_stalled(&mut wait).is_ready());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
    }

    #[test]
    fn signaling_event_registers_and_wakes_multiple_waiters_properly() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait_1 = event.wait();
        let mut wait_2 = event.wait();
        let mut wait_3 = event.wait();

        // Multiple waiters events are pending correctly.
        assert!(ex.run_until_stalled(&mut wait_1).is_pending());
        assert!(ex.run_until_stalled(&mut wait_2).is_pending());

        assert!(event.signal());

        // Both previously registered and unregistered event waiters complete correctly.
        assert!(ex.run_until_stalled(&mut wait_1).is_ready());
        assert!(ex.run_until_stalled(&mut wait_2).is_ready());
        assert!(ex.run_until_stalled(&mut wait_3).is_ready());
    }

    /// `is_terminated` flips from false to true once the waiters have completed.
    #[test]
    fn event_is_terminated_after_complete() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(!wait.is_terminated());
        assert!(!wait_or_dropped.is_terminated());
        assert!(event.signal());
        assert!(ex.run_until_stalled(&mut wait).is_ready());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
        assert!(wait.is_terminated());
        assert!(wait_or_dropped.is_terminated());
    }

    /// Dropping pending waiters of both kinds unregisters them, so a later signal succeeds.
    #[test]
    fn waiter_drops_gracefully() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(!wait.is_terminated());
        assert!(!wait_or_dropped.is_terminated());
        drop(wait);
        drop(wait_or_dropped);
        assert!(event.signal());
    }

    /// Only once the event *and* its clone are gone does `wait_or_dropped` complete; `wait`
    /// stays pending forever.
    #[test]
    fn waiter_completes_after_all_events_drop() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let event_clone = event.clone();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(!wait.is_terminated());
        assert!(!wait_or_dropped.is_terminated());
        drop(event);
        drop(event_clone);
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
    }

    /// Dropping a polled waiter after the event itself was dropped must not panic.
    #[test]
    fn drop_receiver_after_poll_without_event_signal() {
        let mut exec = fasync::TestExecutor::new();
        let event = Event::new();
        let mut waiter = event.wait_or_dropped();
        assert!(exec.run_until_stalled(&mut waiter).is_pending());
        drop(event);
        drop(waiter);
    }

    #[test]
    fn drop_receiver_after_event_signal_without_repoll() {
        let mut exec = fasync::TestExecutor::new();
        let event = Event::new();
        let mut waiter = event.wait_or_dropped();
        assert_eq!(event.inner.inner.lock().wakers.len(), 0);

        // Polling the waiter will register a new waker.
        assert!(exec.run_until_stalled(&mut waiter).is_pending());
        assert_eq!(event.inner.inner.lock().wakers.len(), 1);

        // The waiter's waker is used.
        assert!(event.signal());
        assert_eq!(event.inner.inner.lock().wakers.len(), 0);

        // Dropping a waiter without polling it again after the signal is valid, even though its
        // stored waker key now refers to a drained slot.
        drop(waiter);
    }
}