fuchsia_async/handle/zircon/
on_signals.rs1use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use std::task::Poll;
11use std::{fmt, mem};
12
13use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
14use futures::task::{AtomicWaker, Context};
15use zx::{self as zx, AsHandleRef};
16
17struct OnSignalsReceiver {
18 maybe_signals: AtomicUsize,
19 task: AtomicWaker,
20}
21
22impl OnSignalsReceiver {
23 fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
24 let mut signals = self.maybe_signals.load(Ordering::Relaxed);
25 if signals == 0 {
26 self.task.register(cx.waker());
28 signals = self.maybe_signals.load(Ordering::SeqCst);
31 }
32 if signals == 0 {
33 Poll::Pending
34 } else {
35 Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
36 }
37 }
38
39 fn set_signals(&self, signals: zx::Signals) {
40 self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
41 self.task.wake();
42 }
43}
44
45impl PacketReceiver for OnSignalsReceiver {
46 fn receive_packet(&self, packet: zx::Packet) {
47 let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
48 p.observed()
49 } else {
50 return;
51 };
52
53 self.set_signals(observed);
54 }
55}
56
57#[must_use = "futures do nothing unless polled"]
59pub struct OnSignals<'a, H: AsHandleRef> {
60 handle: H,
61 signals: zx::Signals,
62 registration: Option<ReceiverRegistration<OnSignalsReceiver>>,
63 phantom: PhantomData<&'a H>,
64}
65
66pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
68
69impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
70 pub fn new(handle: H, signals: zx::Signals) -> Self {
73 OnSignals { handle, signals, registration: None, phantom: PhantomData }
85 }
86
87 pub fn take_handle(mut self) -> H
89 where
90 H: zx::HandleBased,
91 {
92 self.unregister();
93 std::mem::replace(&mut self.handle, zx::Handle::invalid().into())
94 }
95
96 pub fn extend_lifetime(mut self) -> LeakedOnSignals {
115 match self.registration.take() {
116 Some(r) => LeakedOnSignals { registration: Ok(r) },
117 None => LeakedOnSignals { registration: self.register(None) },
118 }
119 }
120
121 fn register(
122 &self,
123 cx: Option<&mut Context<'_>>,
124 ) -> Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status> {
125 let registration = EHandle::local().register_receiver(Arc::new(OnSignalsReceiver {
126 maybe_signals: AtomicUsize::new(0),
127 task: AtomicWaker::new(),
128 }));
129
130 if let Some(cx) = cx {
133 registration.task.register(cx.waker());
134 }
135
136 self.handle.wait_async_handle(
137 registration.port(),
138 registration.key(),
139 self.signals,
140 zx::WaitAsyncOpts::empty(),
141 )?;
142
143 Ok(registration)
144 }
145
146 fn unregister(&mut self) {
147 if let Some(registration) = self.registration.take() {
148 if registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0 {
149 let _ = registration.port().cancel(&self.handle, registration.key());
153 }
154 }
155 }
156}
157
158impl<H: AsHandleRef + Unpin> Future for OnSignals<'_, H> {
159 type Output = Result<zx::Signals, zx::Status>;
160 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161 match &self.registration {
162 None => {
163 match self
164 .handle
165 .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
166 .to_result()
167 {
168 Ok(signals) => Poll::Ready(Ok(signals)),
169 Err(zx::Status::TIMED_OUT) => {
170 let registration = self.register(Some(cx))?;
171 self.get_mut().registration = Some(registration);
172 Poll::Pending
173 }
174 Err(e) => Poll::Ready(Err(e)),
175 }
176 }
177 Some(r) => match r.receiver().get_signals(cx) {
178 Poll::Ready(signals) => Poll::Ready(Ok(signals)),
179 Poll::Pending => {
180 match self
190 .handle
191 .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
192 .to_result()
193 {
194 Ok(signals) => Poll::Ready(Ok(signals)),
195 Err(_) => Poll::Pending,
196 }
197 }
198 },
199 }
200 }
201}
202
203impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
204 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205 write!(f, "OnSignals")
206 }
207}
208
209impl<H: AsHandleRef> Drop for OnSignals<'_, H> {
210 fn drop(&mut self) {
211 self.unregister();
212 }
213}
214
215impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
216 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
217 self.handle.as_handle_ref()
218 }
219}
220
221impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
222 fn as_ref(&self) -> &H {
223 &self.handle
224 }
225}
226
227pub struct LeakedOnSignals {
228 registration: Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status>,
229}
230
231impl Future for LeakedOnSignals {
232 type Output = Result<zx::Signals, zx::Status>;
233 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
234 let reg = self.registration.as_mut().map_err(|e| mem::replace(e, zx::Status::OK))?;
235 reg.receiver().get_signals(cx).map(Ok)
236 }
237}
238
239#[cfg(test)]
240mod test {
241 use super::*;
242 use crate::TestExecutor;
243 use assert_matches::assert_matches;
244 use futures::future::{pending, FutureExt};
245 use futures::task::{waker, ArcWake};
246 use std::pin::pin;
247
248 #[test]
249 fn wait_for_event() -> Result<(), zx::Status> {
250 let mut exec = crate::TestExecutor::new();
251 let mut deliver_events =
252 || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());
253
254 let event = zx::Event::create();
255 let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
256 let (waker, waker_count) = futures_test::task::new_count_waker();
257 let cx = &mut std::task::Context::from_waker(&waker);
258
259 assert_eq!(signals.poll_unpin(cx), Poll::Pending);
261 deliver_events();
262 assert_eq!(waker_count, 0);
263 assert_eq!(signals.poll_unpin(cx), Poll::Pending);
264
265 event.signal_handle(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
268 deliver_events();
269 assert_eq!(waker_count, 1);
270 assert_eq!(signals.poll_unpin(cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));
271
272 Ok(())
273 }
274
275 #[test]
276 fn drop_before_event() {
277 let mut fut = std::pin::pin!(async {
278 let ehandle = EHandle::local();
279
280 let event = zx::Event::create();
281 let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
282 assert_eq!(futures::poll!(&mut signals), Poll::Pending);
283 let key = signals.registration.as_ref().unwrap().key();
284
285 std::mem::drop(signals);
286 assert!(ehandle.port().cancel(&event, key) == Err(zx::Status::NOT_FOUND));
287
288 let signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED).extend_lifetime();
290 let key = signals.registration.as_ref().unwrap().key();
291 std::mem::drop(signals);
292 assert!(ehandle.port().cancel(&event, key) == Ok(()));
293 });
294
295 assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
296 }
297
298 #[test]
299 fn test_always_polls() {
300 let mut exec = TestExecutor::new();
301
302 let (rx, tx) = zx::Channel::create();
303
304 let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));
305
306 assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
307
308 tx.write(b"hello", &mut []).expect("write failed");
309
310 struct Waker;
311 impl ArcWake for Waker {
312 fn wake_by_ref(_arc_self: &Arc<Self>) {}
313 }
314
315 assert_matches!(
318 fut.poll(&mut Context::from_waker(&waker(Arc::new(Waker)))),
319 Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
320 );
321 }
322
323 #[test]
324 fn test_take_handle() {
325 let mut exec = TestExecutor::new();
326
327 let (rx, tx) = zx::Channel::create();
328
329 let mut fut = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);
330
331 assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
332
333 tx.write(b"hello", &mut []).expect("write failed");
334
335 assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));
336
337 let mut message = zx::MessageBuf::new();
338 fut.take_handle().read(&mut message).unwrap();
339
340 assert_eq!(message.bytes(), b"hello");
341 }
342}