fuchsia_async/handle/zircon/
on_signals.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use std::task::Poll;
11use std::fmt;
12
13use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
14use futures::task::{AtomicWaker, Context};
15use zx::{self as zx, AsHandleRef};
16
17struct OnSignalsReceiver {
18    maybe_signals: AtomicUsize,
19    task: AtomicWaker,
20}
21
22impl OnSignalsReceiver {
23    fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
24        let mut signals = self.maybe_signals.load(Ordering::Relaxed);
25        if signals == 0 {
26            // No signals were received-- register to receive a wakeup when they arrive.
27            self.task.register(cx.waker());
28            // Check again for signals after registering for a wakeup in case signals
29            // arrived between registering and the initial load of signals
30            signals = self.maybe_signals.load(Ordering::SeqCst);
31        }
32        if signals == 0 {
33            Poll::Pending
34        } else {
35            Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
36        }
37    }
38
39    fn set_signals(&self, signals: zx::Signals) {
40        self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
41        self.task.wake();
42    }
43}
44
45impl PacketReceiver for OnSignalsReceiver {
46    fn receive_packet(&self, packet: zx::Packet) {
47        let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
48            p.observed()
49        } else {
50            return;
51        };
52
53        self.set_signals(observed);
54    }
55}
56
57/// A future that completes when some set of signals become available on a Handle.
58#[must_use = "futures do nothing unless polled"]
59pub struct OnSignals<'a, H: AsHandleRef> {
60    handle: H,
61    signals: zx::Signals,
62    registration: Option<ReceiverRegistration<OnSignalsReceiver>>,
63    phantom: PhantomData<&'a H>,
64}
65
66/// Alias for the common case where OnSignals is used with zx::HandleRef.
67pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
68
69impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
70    /// Creates a new `OnSignals` object which will receive notifications when
71    /// any signals in `signals` occur on `handle`.
72    pub fn new(handle: H, signals: zx::Signals) -> Self {
73        // We don't register for the signals until first polled.  When we are first polled, we'll
74        // check to see if the signals are set and if they are, we're done.  If they aren't, we then
75        // register for an asynchronous notification via the port.
76        //
77        // We could change the code to register for the asynchronous notification here, but then
78        // when first polled, if the notification hasn't arrived, we'll still check to see if the
79        // signals are set (see below for the reason why).  Given that the time between construction
80        // and when we first poll is typically small, registering here probably won't make much
81        // difference (and on a single-threaded executor, a notification is unlikely to be processed
82        // before the first poll anyway).  The way we have it now means we don't have to register at
83        // all if the signals are already set, which will be a win some of the time.
84        OnSignals { handle, signals, registration: None, phantom: PhantomData }
85    }
86
87    /// Takes the handle.
88    pub fn take_handle(mut self) -> H
89    where
90        H: zx::HandleBased,
91    {
92        self.unregister();
93        std::mem::replace(&mut self.handle, zx::Handle::invalid().into())
94    }
95
96    fn register(
97        &self,
98        cx: Option<&mut Context<'_>>,
99    ) -> Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status> {
100        let registration = EHandle::local().register_receiver(Arc::new(OnSignalsReceiver {
101            maybe_signals: AtomicUsize::new(0),
102            task: AtomicWaker::new(),
103        }));
104
105        // If a context has been supplied, we must register it now before calling
106        // `wait_async_handle` below to avoid races.
107        if let Some(cx) = cx {
108            registration.task.register(cx.waker());
109        }
110
111        self.handle.wait_async_handle(
112            registration.port(),
113            registration.key(),
114            self.signals,
115            zx::WaitAsyncOpts::empty(),
116        )?;
117
118        Ok(registration)
119    }
120
121    fn unregister(&mut self) {
122        if let Some(registration) = self.registration.take() {
123            if registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0 {
124                // Ignore the error from zx_port_cancel, because it might just be a race condition.
125                // If the packet is handled between the above maybe_signals check and the port
126                // cancel, it will fail with ZX_ERR_NOT_FOUND, and we can't do anything about it.
127                let _ = registration.port().cancel(&self.handle, registration.key());
128            }
129        }
130    }
131}
132
133impl<H: AsHandleRef + Unpin> Future for OnSignals<'_, H> {
134    type Output = Result<zx::Signals, zx::Status>;
135    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136        match &self.registration {
137            None => {
138                match self
139                    .handle
140                    .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
141                    .to_result()
142                {
143                    Ok(signals) => Poll::Ready(Ok(signals)),
144                    Err(zx::Status::TIMED_OUT) => {
145                        let registration = self.register(Some(cx))?;
146                        self.get_mut().registration = Some(registration);
147                        Poll::Pending
148                    }
149                    Err(e) => Poll::Ready(Err(e)),
150                }
151            }
152            Some(r) => match r.receiver().get_signals(cx) {
153                Poll::Ready(signals) => Poll::Ready(Ok(signals)),
154                Poll::Pending => {
155                    // We haven't received a notification for the signals, but we still want to poll
156                    // the kernel in case the notification hasn't been processed yet by the
157                    // executor.  This behaviour is relied upon in some cases: in Component Manager,
158                    // in some shutdown paths, it wants to drain and process all messages in
159                    // channels before it closes them.  There is no other reliable way to flush a
160                    // pending notification (particularly on a multi-threaded executor).  This will
161                    // incur a small performance penalty in the case that this future has been
162                    // polled when no notification was actually received (such as can be the case
163                    // with some futures combinators).
164                    match self
165                        .handle
166                        .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
167                        .to_result()
168                    {
169                        Ok(signals) => Poll::Ready(Ok(signals)),
170                        Err(_) => Poll::Pending,
171                    }
172                }
173            },
174        }
175    }
176}
177
178impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
179    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180        write!(f, "OnSignals")
181    }
182}
183
184impl<H: AsHandleRef> Drop for OnSignals<'_, H> {
185    fn drop(&mut self) {
186        self.unregister();
187    }
188}
189
190impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
191    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
192        self.handle.as_handle_ref()
193    }
194}
195
196impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
197    fn as_ref(&self) -> &H {
198        &self.handle
199    }
200}
201
#[cfg(test)]
mod test {
    use super::*;
    use crate::TestExecutor;
    use assert_matches::assert_matches;
    use futures::future::{pending, FutureExt};
    use futures::task::{waker, ArcWake};
    use std::pin::pin;

    /// The future stays pending until the event is signaled, and the registered waker is
    /// woken exactly once when it is.
    #[test]
    fn wait_for_event() -> Result<(), zx::Status> {
        let mut executor = TestExecutor::new();
        // Driving a never-ready future until it stalls gives the executor a chance to
        // process whatever is queued on its port.
        let mut deliver_events = || {
            let stalled = executor.run_until_stalled(&mut pending::<()>());
            assert!(stalled.is_pending());
        };

        let event = zx::Event::create();
        let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
        let (waker, waker_count) = futures_test::task::new_count_waker();
        let mut cx = Context::from_waker(&waker);

        // Nothing has been signaled yet, so the future must remain pending and the waker
        // must not have fired.
        assert_eq!(signals.poll_unpin(&mut cx), Poll::Pending);
        deliver_events();
        assert_eq!(waker_count, 0);
        assert_eq!(signals.poll_unpin(&mut cx), Poll::Pending);

        // Once the event is signaled the waker fires and the future resolves.
        event.signal_handle(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
        deliver_events();
        assert_eq!(waker_count, 1);
        assert_eq!(signals.poll_unpin(&mut cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));

        Ok(())
    }

    /// Dropping a registered future cancels its port wait, so cancelling the same key again
    /// reports `NOT_FOUND`.
    #[test]
    fn drop_before_event() {
        let mut fut = pin!(async {
            let ehandle = EHandle::local();

            let event = zx::Event::create();
            let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
            assert_eq!(futures::poll!(&mut signals), Poll::Pending);
            let key = signals.registration.as_ref().unwrap().key();

            drop(signals);
            let second_cancel = ehandle.port().cancel(&event, key);
            assert!(second_cancel == Err(zx::Status::NOT_FOUND));
        });

        let outcome = TestExecutor::new().run_until_stalled(&mut fut);
        assert!(outcome.is_ready());
    }

    /// A registered future still queries the kernel when polled before its notification has
    /// been processed.
    #[test]
    fn test_always_polls() {
        let mut executor = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));

        assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        /// A waker that does nothing when woken.
        struct NoopWake;
        impl ArcWake for NoopWake {
            fn wake_by_ref(_arc_self: &Arc<Self>) {}
        }

        // Polling the future directly, rather than through the executor, guarantees that the
        // port notification for the write has not arrived.
        let noop_waker = waker(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&noop_waker);
        assert_matches!(
            fut.poll(&mut cx),
            Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
        );
    }

    /// `take_handle` returns the original, still usable, handle after the future resolves.
    #[test]
    fn test_take_handle() {
        let mut executor = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);

        assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        assert_matches!(executor.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));

        let mut message = zx::MessageBuf::new();
        let channel = fut.take_handle();
        channel.read(&mut message).unwrap();

        assert_eq!(message.bytes(), b"hello");
    }
}
299}