fuchsia_async/handle/zircon/
on_signals.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use std::task::Poll;
11use std::{fmt, mem};
12
13use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
14use futures::task::{AtomicWaker, Context};
15use zx::{self as zx, AsHandleRef};
16
/// State shared between the packet-receiving side and the future waiting on it.
///
/// `set_signals` stores the observed signal bits and then wakes the registered task;
/// `get_signals` reads them back and registers a waker while nothing has been stored yet.
struct OnSignalsReceiver {
    /// Bits of the observed signals. A value of 0 is treated as "no signals received yet".
    maybe_signals: AtomicUsize,
    /// Waker of the task to notify once signals have been stored.
    task: AtomicWaker,
}
21
22impl OnSignalsReceiver {
23    fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
24        let mut signals = self.maybe_signals.load(Ordering::Relaxed);
25        if signals == 0 {
26            // No signals were received-- register to receive a wakeup when they arrive.
27            self.task.register(cx.waker());
28            // Check again for signals after registering for a wakeup in case signals
29            // arrived between registering and the initial load of signals
30            signals = self.maybe_signals.load(Ordering::SeqCst);
31        }
32        if signals == 0 {
33            Poll::Pending
34        } else {
35            Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
36        }
37    }
38
39    fn set_signals(&self, signals: zx::Signals) {
40        self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
41        self.task.wake();
42    }
43}
44
45impl PacketReceiver for OnSignalsReceiver {
46    fn receive_packet(&self, packet: zx::Packet) {
47        let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
48            p.observed()
49        } else {
50            return;
51        };
52
53        self.set_signals(observed);
54    }
55}
56
/// A future that completes when some set of signals become available on a Handle.
///
/// The future resolves to the observed signals, or to the `zx::Status` error encountered while
/// checking or registering for them.
#[must_use = "futures do nothing unless polled"]
pub struct OnSignals<'a, H: AsHandleRef> {
    /// The handle (or handle reference) whose signals are being waited on.
    handle: H,
    /// The set of signals to wait for.
    signals: zx::Signals,
    /// `Some` once an asynchronous wait has been registered; `None` before that, and again after
    /// the registration has been taken by `unregister` or `extend_lifetime`.
    registration: Option<ReceiverRegistration<OnSignalsReceiver>>,
    /// Carries the `'a` lifetime parameter; no reference is actually stored.
    phantom: PhantomData<&'a H>,
}
65
/// Alias for the common case where [`OnSignals`] is used with a borrowed [`zx::HandleRef`].
pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
68
69impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
70    /// Creates a new `OnSignals` object which will receive notifications when
71    /// any signals in `signals` occur on `handle`.
72    pub fn new(handle: H, signals: zx::Signals) -> Self {
73        // We don't register for the signals until first polled.  When we are first polled, we'll
74        // check to see if the signals are set and if they are, we're done.  If they aren't, we then
75        // register for an asynchronous notification via the port.
76        //
77        // We could change the code to register for the asynchronous notification here, but then
78        // when first polled, if the notification hasn't arrived, we'll still check to see if the
79        // signals are set (see below for the reason why).  Given that the time between construction
80        // and when we first poll is typically small, registering here probably won't make much
81        // difference (and on a single-threaded executor, a notification is unlikely to be processed
82        // before the first poll anyway).  The way we have it now means we don't have to register at
83        // all if the signals are already set, which will be a win some of the time.
84        OnSignals { handle, signals, registration: None, phantom: PhantomData }
85    }
86
87    /// Takes the handle.
88    pub fn take_handle(mut self) -> H
89    where
90        H: zx::HandleBased,
91    {
92        self.unregister();
93        std::mem::replace(&mut self.handle, zx::Handle::invalid().into())
94    }
95
96    /// This function allows the `OnSignals` object to live for the `'static` lifetime, at the cost
97    /// of disabling automatic cleanup of the port wait.
98    ///
99    /// WARNING: Do not use unless you can guarantee that either:
100    /// - The future is not dropped before it completes, or
101    /// - The handle is dropped without creating additional OnSignals futures for it.
102    ///
103    /// Creating an OnSignals calls zx_object_wait_async, which consumes a small amount of kernel
104    /// resources. Dropping the OnSignals calls zx_port_cancel to clean up. But calling
105    /// extend_lifetime disables this cleanup, since the zx_port_wait call requires a reference to
106    /// the handle. The port registration can also be cleaned up by closing the handle or by
107    /// waiting for the signal to be triggered. But if neither of these happens, the registration
108    /// is leaked. This wastes kernel memory and the kernel will eventually kill your process to
109    /// force a cleanup.
110    ///
111    /// Note that `OnSignals` will not fire if the handle that was used to create it is dropped or
112    /// transferred to another process.
113    // TODO(https://fxbug.dev/42182035): Try to remove this footgun.
114    pub fn extend_lifetime(mut self) -> LeakedOnSignals {
115        match self.registration.take() {
116            Some(r) => LeakedOnSignals { registration: Ok(r) },
117            None => LeakedOnSignals { registration: self.register(None) },
118        }
119    }
120
121    fn register(
122        &self,
123        cx: Option<&mut Context<'_>>,
124    ) -> Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status> {
125        let registration = EHandle::local().register_receiver(Arc::new(OnSignalsReceiver {
126            maybe_signals: AtomicUsize::new(0),
127            task: AtomicWaker::new(),
128        }));
129
130        // If a context has been supplied, we must register it now before calling
131        // `wait_async_handle` below to avoid races.
132        if let Some(cx) = cx {
133            registration.task.register(cx.waker());
134        }
135
136        self.handle.wait_async_handle(
137            registration.port(),
138            registration.key(),
139            self.signals,
140            zx::WaitAsyncOpts::empty(),
141        )?;
142
143        Ok(registration)
144    }
145
146    fn unregister(&mut self) {
147        if let Some(registration) = self.registration.take() {
148            if registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0 {
149                // Ignore the error from zx_port_cancel, because it might just be a race condition.
150                // If the packet is handled between the above maybe_signals check and the port
151                // cancel, it will fail with ZX_ERR_NOT_FOUND, and we can't do anything about it.
152                let _ = registration.port().cancel(&self.handle, registration.key());
153            }
154        }
155    }
156}
157
158impl<H: AsHandleRef + Unpin> Future for OnSignals<'_, H> {
159    type Output = Result<zx::Signals, zx::Status>;
160    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161        match &self.registration {
162            None => {
163                match self
164                    .handle
165                    .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
166                    .to_result()
167                {
168                    Ok(signals) => Poll::Ready(Ok(signals)),
169                    Err(zx::Status::TIMED_OUT) => {
170                        let registration = self.register(Some(cx))?;
171                        self.get_mut().registration = Some(registration);
172                        Poll::Pending
173                    }
174                    Err(e) => Poll::Ready(Err(e)),
175                }
176            }
177            Some(r) => match r.receiver().get_signals(cx) {
178                Poll::Ready(signals) => Poll::Ready(Ok(signals)),
179                Poll::Pending => {
180                    // We haven't received a notification for the signals, but we still want to poll
181                    // the kernel in case the notification hasn't been processed yet by the
182                    // executor.  This behaviour is relied upon in some cases: in Component Manager,
183                    // in some shutdown paths, it wants to drain and process all messages in
184                    // channels before it closes them.  There is no other reliable way to flush a
185                    // pending notification (particularly on a multi-threaded executor).  This will
186                    // incur a small performance penalty in the case that this future has been
187                    // polled when no notification was actually received (such as can be the case
188                    // with some futures combinators).
189                    match self
190                        .handle
191                        .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
192                        .to_result()
193                    {
194                        Ok(signals) => Poll::Ready(Ok(signals)),
195                        Err(_) => Poll::Pending,
196                    }
197                }
198            },
199        }
200    }
201}
202
203impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
204    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205        write!(f, "OnSignals")
206    }
207}
208
impl<H: AsHandleRef> Drop for OnSignals<'_, H> {
    /// Cleans up any outstanding registration when the future is dropped.
    fn drop(&mut self) {
        // `unregister` does nothing if the registration has already been taken.
        self.unregister();
    }
}
214
impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
    /// Returns a reference to the handle this future is waiting on.
    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
        self.handle.as_handle_ref()
    }
}
220
impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
    /// Borrows the handle value this future was created with.
    fn as_ref(&self) -> &H {
        &self.handle
    }
}
226
/// The future returned by `OnSignals::extend_lifetime`.
///
/// It resolves to the observed signals once they have been received, or to the error that
/// occurred while registering for them. It holds no reference to the handle being waited on.
pub struct LeakedOnSignals {
    /// The registration carried over from (or created by) `extend_lifetime`, or the error
    /// returned when registering failed.
    registration: Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status>,
}
230
231impl Future for LeakedOnSignals {
232    type Output = Result<zx::Signals, zx::Status>;
233    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
234        let reg = self.registration.as_mut().map_err(|e| mem::replace(e, zx::Status::OK))?;
235        reg.receiver().get_signals(cx).map(Ok)
236    }
237}
238
#[cfg(test)]
mod test {
    use super::*;
    use crate::TestExecutor;
    use assert_matches::assert_matches;
    use futures::future::{pending, FutureExt};
    use futures::task::{waker, ArcWake};
    use std::pin::pin;

    /// An `OnSignals` future stays pending until the event is signaled, after which its waker
    /// has been woken exactly once and the future resolves with the signaled state.
    #[test]
    fn wait_for_event() -> Result<(), zx::Status> {
        let mut exec = crate::TestExecutor::new();
        // Runs the executor on a never-completing future until it stalls, giving it a chance to
        // process any pending port packets.
        let mut deliver_events =
            || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());

        let event = zx::Event::create();
        let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
        let (waker, waker_count) = futures_test::task::new_count_waker();
        let cx = &mut std::task::Context::from_waker(&waker);

        // Check that `signals` is still pending before the event has been signaled.
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);
        deliver_events();
        assert_eq!(waker_count, 0);
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);

        // Signal the event and check that `signals` has been woken up and is
        // no longer pending.
        event.signal_handle(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
        deliver_events();
        assert_eq!(waker_count, 1);
        assert_eq!(signals.poll_unpin(cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));

        Ok(())
    }

    /// Dropping a registered `OnSignals` cancels its port wait, whereas dropping the future
    /// returned by `extend_lifetime` leaves the wait in place.
    #[test]
    fn drop_before_event() {
        let mut fut = std::pin::pin!(async {
            let ehandle = EHandle::local();

            let event = zx::Event::create();
            let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
            // The first poll finds the event unsignaled and registers with the port.
            assert_eq!(futures::poll!(&mut signals), Poll::Pending);
            let key = signals.registration.as_ref().unwrap().key();

            // Dropping the future already cancelled the wait, so cancelling again finds nothing.
            std::mem::drop(signals);
            assert!(ehandle.port().cancel(&event, key) == Err(zx::Status::NOT_FOUND));

            // Try again but with extend_lifetime: the wait survives the drop, so the explicit
            // cancel succeeds.
            let signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED).extend_lifetime();
            let key = signals.registration.as_ref().unwrap().key();
            std::mem::drop(signals);
            assert!(ehandle.port().cancel(&event, key) == Ok(()));
        });

        assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
    }

    /// A registered future that is polled before its port notification has been processed still
    /// observes the signal, because `poll` re-checks the handle directly.
    #[test]
    fn test_always_polls() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));

        // Nothing has been written yet, so the future registers and stays pending.
        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        // A waker that does nothing; only needed to build a `Context` for the direct poll below.
        struct Waker;
        impl ArcWake for Waker {
            fn wake_by_ref(_arc_self: &Arc<Self>) {}
        }

        // Poll the future directly which guarantees the port notification for the write hasn't
        // arrived.
        assert_matches!(
            fut.poll(&mut Context::from_waker(&waker(Arc::new(Waker)))),
            Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
        );
    }

    /// `take_handle` gives back a usable handle after the future has completed.
    #[test]
    fn test_take_handle() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        // The future owns the receiving end of the channel here.
        let mut fut = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));

        // The handle recovered from the future can still read the message that was written.
        let mut message = zx::MessageBuf::new();
        fut.take_handle().read(&mut message).unwrap();

        assert_eq!(message.bytes(), b"hello");
    }
}