fuchsia_async/handle/zircon/
on_signals.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::task::Poll;
10use std::{fmt, mem};
11
12use crate::runtime::{EHandle, PacketReceiver, RawReceiverRegistration};
13use futures::task::{AtomicWaker, Context};
14use zx::{self as zx, AsHandleRef};
15
16struct OnSignalsReceiver {
17    maybe_signals: AtomicUsize,
18    task: AtomicWaker,
19}
20
21impl OnSignalsReceiver {
22    fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
23        let mut signals = self.maybe_signals.load(Ordering::Relaxed);
24        if signals == 0 {
25            // No signals were received-- register to receive a wakeup when they arrive.
26            self.task.register(cx.waker());
27            // Check again for signals after registering for a wakeup in case signals
28            // arrived between registering and the initial load of signals
29            signals = self.maybe_signals.load(Ordering::SeqCst);
30        }
31        if signals == 0 {
32            Poll::Pending
33        } else {
34            Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
35        }
36    }
37
38    fn set_signals(&self, signals: zx::Signals) {
39        self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
40        self.task.wake();
41    }
42}
43
44impl PacketReceiver for OnSignalsReceiver {
45    fn receive_packet(&self, packet: zx::Packet) {
46        let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
47            p.observed()
48        } else {
49            return;
50        };
51
52        self.set_signals(observed);
53    }
54}
55
56pin_project_lite::pin_project! {
57    /// A future that completes when some set of signals become available on a Handle.
58    #[must_use = "futures do nothing unless polled"]
59    pub struct OnSignalsFuture<'a, H: AsHandleRef> {
60        handle: H,
61        signals: zx::Signals,
62        #[pin]
63        registration: RawReceiverRegistration<OnSignalsReceiver>,
64        phantom: PhantomData<&'a H>,
65    }
66
67    impl<'a, H: AsHandleRef> PinnedDrop for OnSignalsFuture<'a, H> {
68        fn drop(mut this: Pin<&mut Self>) {
69            this.unregister();
70        }
71    }
72}
73
74impl<'a, H: AsHandleRef + 'a> OnSignalsFuture<'a, H> {
75    /// Creates a new `OnSignals` object which will receive notifications when
76    /// any signals in `signals` occur on `handle`.
77    pub fn new(handle: H, signals: zx::Signals) -> Self {
78        // We don't register for the signals until first polled.  When we are first polled, we'll
79        // check to see if the signals are set and if they are, we're done.  If they aren't, we then
80        // register for an asynchronous notification via the port.
81        //
82        // We could change the code to register for the asynchronous notification here, but then
83        // when first polled, if the notification hasn't arrived, we'll still check to see if the
84        // signals are set (see below for the reason why).  Given that the time between construction
85        // and when we first poll is typically small, registering here probably won't make much
86        // difference (and on a single-threaded executor, a notification is unlikely to be processed
87        // before the first poll anyway).  The way we have it now means we don't have to register at
88        // all if the signals are already set, which will be a win some of the time.
89        OnSignalsFuture {
90            handle,
91            signals,
92            registration: RawReceiverRegistration::new(OnSignalsReceiver {
93                maybe_signals: AtomicUsize::new(0),
94                task: AtomicWaker::new(),
95            }),
96            phantom: PhantomData,
97        }
98    }
99
100    /// Takes the handle.
101    pub fn take_handle(mut self: Pin<&mut Self>) -> H
102    where
103        H: zx::HandleBased,
104    {
105        if self.registration.is_registered() {
106            self.as_mut().unregister();
107        }
108        mem::replace(self.project().handle, zx::Handle::invalid().into())
109    }
110
111    fn register(
112        mut registration: Pin<&mut RawReceiverRegistration<OnSignalsReceiver>>,
113        handle: &H,
114        signals: zx::Signals,
115        cx: Option<&mut Context<'_>>,
116    ) -> Result<(), zx::Status> {
117        registration.as_mut().register(EHandle::local());
118
119        // If a context has been supplied, we must register it now before calling
120        // `wait_async_handle` below to avoid races.
121        if let Some(cx) = cx {
122            registration.receiver().task.register(cx.waker());
123        }
124
125        handle.wait_async_handle(
126            registration.port().unwrap(),
127            registration.key().unwrap(),
128            signals,
129            zx::WaitAsyncOpts::empty(),
130        )?;
131
132        Ok(())
133    }
134
135    fn unregister(self: Pin<&mut Self>) {
136        let mut this = self.project();
137        if let Some((ehandle, key)) = this.registration.as_mut().unregister() {
138            if this.registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0 {
139                // Ignore the error from zx_port_cancel, because it might just be a race condition.
140                // If the packet is handled between the above maybe_signals check and the port
141                // cancel, it will fail with ZX_ERR_NOT_FOUND, and we can't do anything about it.
142                let _ = ehandle.port().cancel(this.handle, key);
143            }
144        }
145    }
146}
147
148impl<H: AsHandleRef> Future for OnSignalsFuture<'_, H> {
149    type Output = Result<zx::Signals, zx::Status>;
150    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151        if !self.registration.is_registered() {
152            match self
153                .handle
154                .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
155                .to_result()
156            {
157                Ok(signals) => Poll::Ready(Ok(signals)),
158                Err(zx::Status::TIMED_OUT) => {
159                    let mut this = self.project();
160                    Self::register(
161                        this.registration.as_mut(),
162                        this.handle,
163                        *this.signals,
164                        Some(cx),
165                    )?;
166                    Poll::Pending
167                }
168                Err(e) => Poll::Ready(Err(e)),
169            }
170        } else {
171            match self.registration.receiver().get_signals(cx) {
172                Poll::Ready(signals) => Poll::Ready(Ok(signals)),
173                Poll::Pending => {
174                    // We haven't received a notification for the signals, but we still want to poll
175                    // the kernel in case the notification hasn't been processed yet by the
176                    // executor.  This behaviour is relied upon in some cases: in Component Manager,
177                    // in some shutdown paths, it wants to drain and process all messages in
178                    // channels before it closes them.  There is no other reliable way to flush a
179                    // pending notification (particularly on a multi-threaded executor).  This will
180                    // incur a small performance penalty in the case that this future has been
181                    // polled when no notification was actually received (such as can be the case
182                    // with some futures combinators).
183                    match self
184                        .handle
185                        .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
186                        .to_result()
187                    {
188                        Ok(signals) => Poll::Ready(Ok(signals)),
189                        Err(_) => Poll::Pending,
190                    }
191                }
192            }
193        }
194    }
195}
196
197impl<H: AsHandleRef> fmt::Debug for OnSignalsFuture<'_, H> {
198    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199        write!(f, "OnSignals")
200    }
201}
202
203impl<H: AsHandleRef> AsHandleRef for OnSignalsFuture<'_, H> {
204    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
205        self.handle.as_handle_ref()
206    }
207}
208
209impl<H: AsHandleRef> AsRef<H> for OnSignalsFuture<'_, H> {
210    fn as_ref(&self) -> &H {
211        &self.handle
212    }
213}
214
215/// A future that completes when some set of signals become available on a Handle.
216#[must_use = "futures do nothing unless polled"]
217pub struct OnSignals<'a, H: AsHandleRef> {
218    future: Pin<Box<OnSignalsFuture<'a, H>>>,
219}
220
221impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
222    /// Creates a new `OnSignals` object which will receive notifications when
223    /// any signals in `signals` occur on `handle`.
224    pub fn new(handle: H, signals: zx::Signals) -> Self {
225        Self { future: Box::pin(OnSignalsFuture::new(handle, signals)) }
226    }
227
228    /// Takes the handle.
229    pub fn take_handle(mut self) -> H
230    where
231        H: zx::HandleBased,
232    {
233        self.future.as_mut().take_handle()
234    }
235}
236
237impl<H: AsHandleRef> Future for OnSignals<'_, H> {
238    type Output = Result<zx::Signals, zx::Status>;
239    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
240        Pin::into_inner(self).future.as_mut().poll(cx)
241    }
242}
243
244impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
245    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246        write!(f, "OnSignals")
247    }
248}
249
250impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
251    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
252        self.future.as_handle_ref()
253    }
254}
255
256impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
257    fn as_ref(&self) -> &H {
258        &self.future.handle
259    }
260}
261
262/// Alias for the common case where OnSignals is used with zx::HandleRef.
263pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
264
265#[cfg(test)]
266mod test {
267    use super::*;
268    use crate::TestExecutor;
269    use assert_matches::assert_matches;
270    use futures::future::{pending, FutureExt};
271    use futures::task::{waker, ArcWake};
272    use std::pin::pin;
273    use std::sync::Arc;
274
275    #[test]
276    fn wait_for_event() -> Result<(), zx::Status> {
277        let mut exec = crate::TestExecutor::new();
278        let mut deliver_events =
279            || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());
280
281        let event = zx::Event::create();
282        let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
283        let (waker, waker_count) = futures_test::task::new_count_waker();
284        let cx = &mut std::task::Context::from_waker(&waker);
285
286        // Check that `signals` is still pending before the event has been signaled
287        assert_eq!(signals.poll_unpin(cx), Poll::Pending);
288        deliver_events();
289        assert_eq!(waker_count, 0);
290        assert_eq!(signals.poll_unpin(cx), Poll::Pending);
291
292        // signal the event and check that `signals` has been woken up and is
293        // no longer pending
294        event.signal_handle(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
295        deliver_events();
296        assert_eq!(waker_count, 1);
297        assert_eq!(signals.poll_unpin(cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));
298
299        Ok(())
300    }
301
302    #[test]
303    fn drop_before_event() {
304        let mut fut = std::pin::pin!(async {
305            let ehandle = EHandle::local();
306
307            let event = zx::Event::create();
308            let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
309            assert_eq!(futures::poll!(&mut signals), Poll::Pending);
310            let key = signals.future.registration.key().unwrap();
311
312            std::mem::drop(signals);
313            assert!(ehandle.port().cancel(&event, key) == Err(zx::Status::NOT_FOUND));
314        });
315
316        assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
317    }
318
319    #[test]
320    fn test_always_polls() {
321        let mut exec = TestExecutor::new();
322
323        let (rx, tx) = zx::Channel::create();
324
325        let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));
326
327        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
328
329        tx.write(b"hello", &mut []).expect("write failed");
330
331        struct Waker;
332        impl ArcWake for Waker {
333            fn wake_by_ref(_arc_self: &Arc<Self>) {}
334        }
335
336        // Poll the future directly which guarantees the port notification for the write hasn't
337        // arrived.
338        assert_matches!(
339            fut.poll(&mut Context::from_waker(&waker(Arc::new(Waker)))),
340            Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
341        );
342    }
343
344    #[test]
345    fn test_take_handle() {
346        let mut exec = TestExecutor::new();
347
348        let (rx, tx) = zx::Channel::create();
349
350        let mut fut = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);
351
352        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
353
354        tx.write(b"hello", &mut []).expect("write failed");
355
356        assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));
357
358        let mut message = zx::MessageBuf::new();
359        fut.take_handle().read(&mut message).unwrap();
360
361        assert_eq!(message.bytes(), b"hello");
362    }
363}