fuchsia_async/handle/zircon/
on_signals.rs1use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::task::Poll;
10use std::{fmt, mem};
11
12use crate::runtime::{EHandle, PacketReceiver, RawReceiverRegistration};
13use futures::task::{AtomicWaker, Context};
14use zx::{self as zx, AsHandleRef};
15
16struct OnSignalsReceiver {
17 maybe_signals: AtomicUsize,
18 task: AtomicWaker,
19}
20
21impl OnSignalsReceiver {
22 fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
23 let mut signals = self.maybe_signals.load(Ordering::Relaxed);
24 if signals == 0 {
25 self.task.register(cx.waker());
27 signals = self.maybe_signals.load(Ordering::SeqCst);
30 }
31 if signals == 0 {
32 Poll::Pending
33 } else {
34 Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
35 }
36 }
37
38 fn set_signals(&self, signals: zx::Signals) {
39 self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
40 self.task.wake();
41 }
42}
43
44impl PacketReceiver for OnSignalsReceiver {
45 fn receive_packet(&self, packet: zx::Packet) {
46 let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
47 p.observed()
48 } else {
49 return;
50 };
51
52 self.set_signals(observed);
53 }
54}
55
56pin_project_lite::pin_project! {
57 #[must_use = "futures do nothing unless polled"]
59 pub struct OnSignalsFuture<'a, H: AsHandleRef> {
60 handle: H,
61 signals: zx::Signals,
62 #[pin]
63 registration: RawReceiverRegistration<OnSignalsReceiver>,
64 phantom: PhantomData<&'a H>,
65 }
66
67 impl<'a, H: AsHandleRef> PinnedDrop for OnSignalsFuture<'a, H> {
68 fn drop(mut this: Pin<&mut Self>) {
69 this.unregister();
70 }
71 }
72}
73
74impl<'a, H: AsHandleRef + 'a> OnSignalsFuture<'a, H> {
75 pub fn new(handle: H, signals: zx::Signals) -> Self {
78 OnSignalsFuture {
90 handle,
91 signals,
92 registration: RawReceiverRegistration::new(OnSignalsReceiver {
93 maybe_signals: AtomicUsize::new(0),
94 task: AtomicWaker::new(),
95 }),
96 phantom: PhantomData,
97 }
98 }
99
100 pub fn take_handle(mut self: Pin<&mut Self>) -> H
102 where
103 H: zx::HandleBased,
104 {
105 if self.registration.is_registered() {
106 self.as_mut().unregister();
107 }
108 mem::replace(self.project().handle, zx::Handle::invalid().into())
109 }
110
111 fn register(
112 mut registration: Pin<&mut RawReceiverRegistration<OnSignalsReceiver>>,
113 handle: &H,
114 signals: zx::Signals,
115 cx: Option<&mut Context<'_>>,
116 ) -> Result<(), zx::Status> {
117 registration.as_mut().register(EHandle::local());
118
119 if let Some(cx) = cx {
122 registration.receiver().task.register(cx.waker());
123 }
124
125 handle.wait_async_handle(
126 registration.port().unwrap(),
127 registration.key().unwrap(),
128 signals,
129 zx::WaitAsyncOpts::empty(),
130 )?;
131
132 Ok(())
133 }
134
135 fn unregister(self: Pin<&mut Self>) {
136 let mut this = self.project();
137 if let Some((ehandle, key)) = this.registration.as_mut().unregister() {
138 if this.registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0 {
139 let _ = ehandle.port().cancel(this.handle, key);
143 }
144 }
145 }
146}
147
148impl<H: AsHandleRef> Future for OnSignalsFuture<'_, H> {
149 type Output = Result<zx::Signals, zx::Status>;
150 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
151 if !self.registration.is_registered() {
152 match self
153 .handle
154 .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
155 .to_result()
156 {
157 Ok(signals) => Poll::Ready(Ok(signals)),
158 Err(zx::Status::TIMED_OUT) => {
159 let mut this = self.project();
160 Self::register(
161 this.registration.as_mut(),
162 this.handle,
163 *this.signals,
164 Some(cx),
165 )?;
166 Poll::Pending
167 }
168 Err(e) => Poll::Ready(Err(e)),
169 }
170 } else {
171 match self.registration.receiver().get_signals(cx) {
172 Poll::Ready(signals) => Poll::Ready(Ok(signals)),
173 Poll::Pending => {
174 match self
184 .handle
185 .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
186 .to_result()
187 {
188 Ok(signals) => Poll::Ready(Ok(signals)),
189 Err(_) => Poll::Pending,
190 }
191 }
192 }
193 }
194 }
195}
196
197impl<H: AsHandleRef> fmt::Debug for OnSignalsFuture<'_, H> {
198 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199 write!(f, "OnSignals")
200 }
201}
202
203impl<H: AsHandleRef> AsHandleRef for OnSignalsFuture<'_, H> {
204 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
205 self.handle.as_handle_ref()
206 }
207}
208
209impl<H: AsHandleRef> AsRef<H> for OnSignalsFuture<'_, H> {
210 fn as_ref(&self) -> &H {
211 &self.handle
212 }
213}
214
215#[must_use = "futures do nothing unless polled"]
217pub struct OnSignals<'a, H: AsHandleRef> {
218 future: Pin<Box<OnSignalsFuture<'a, H>>>,
219}
220
221impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
222 pub fn new(handle: H, signals: zx::Signals) -> Self {
225 Self { future: Box::pin(OnSignalsFuture::new(handle, signals)) }
226 }
227
228 pub fn take_handle(mut self) -> H
230 where
231 H: zx::HandleBased,
232 {
233 self.future.as_mut().take_handle()
234 }
235}
236
237impl<H: AsHandleRef> Future for OnSignals<'_, H> {
238 type Output = Result<zx::Signals, zx::Status>;
239 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
240 Pin::into_inner(self).future.as_mut().poll(cx)
241 }
242}
243
244impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 write!(f, "OnSignals")
247 }
248}
249
250impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
251 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
252 self.future.as_handle_ref()
253 }
254}
255
256impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
257 fn as_ref(&self) -> &H {
258 &self.future.handle
259 }
260}
261
262pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
264
#[cfg(test)]
mod test {
    use super::*;
    use crate::TestExecutor;
    use assert_matches::assert_matches;
    use futures::future::{pending, FutureExt};
    use futures::task::{waker, ArcWake};
    use std::pin::pin;
    use std::sync::Arc;

    /// The future stays pending and does not wake its task until the event is
    /// signaled; afterwards it wakes the task exactly once and yields the signal.
    #[test]
    fn wait_for_event() -> Result<(), zx::Status> {
        let mut exec = TestExecutor::new();
        // Runs the executor until it stalls on a future that never completes.
        let mut deliver_events =
            || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());

        let event = zx::Event::create();
        let mut on_signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
        let (count_waker, wake_count) = futures_test::task::new_count_waker();
        let mut cx = Context::from_waker(&count_waker);

        // Nothing has been signaled yet, so no wakeups are expected.
        assert_eq!(on_signals.poll_unpin(&mut cx), Poll::Pending);
        deliver_events();
        assert_eq!(wake_count, 0);
        assert_eq!(on_signals.poll_unpin(&mut cx), Poll::Pending);

        // Signal the event; one wakeup and a ready result are expected.
        event.signal_handle(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
        deliver_events();
        assert_eq!(wake_count, 1);
        assert_eq!(on_signals.poll_unpin(&mut cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));

        Ok(())
    }

    /// After a pending future is dropped, cancelling its key on the port
    /// reports `NOT_FOUND`.
    #[test]
    fn drop_before_event() {
        let mut fut = pin!(async {
            let ehandle = EHandle::local();

            let event = zx::Event::create();
            let mut on_signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
            assert_eq!(futures::poll!(&mut on_signals), Poll::Pending);
            let key = on_signals.future.registration.key().unwrap();

            drop(on_signals);
            let cancel_result = ehandle.port().cancel(&event, key);
            assert!(cancel_result == Err(zx::Status::NOT_FOUND));
        });

        assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
    }

    /// A direct poll with a do-nothing waker still observes a readable channel.
    #[test]
    fn test_always_polls() {
        // Waker whose wake operation does nothing.
        struct NoopWaker;
        impl ArcWake for NoopWaker {
            fn wake_by_ref(_arc_self: &Arc<Self>) {}
        }

        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        // Poll the future directly rather than through the executor.
        let noop = waker(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&noop);
        assert_matches!(
            fut.poll(&mut cx),
            Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
        );
    }

    /// The handle recovered with `take_handle` is still usable for reading.
    #[test]
    fn test_take_handle() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));

        let mut received = zx::MessageBuf::new();
        fut.take_handle().read(&mut received).unwrap();

        assert_eq!(received.bytes(), b"hello");
    }
}