fuchsia_async/handle/zircon/
on_signals.rs1use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use std::task::Poll;
11use std::{fmt, mem};
12
13use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
14use futures::task::{AtomicWaker, Context};
15use zx::{self as zx, AsHandleRef};
16
17struct OnSignalsReceiver {
18 maybe_signals: AtomicUsize,
19 task: AtomicWaker,
20}
21
22impl OnSignalsReceiver {
23 fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
24 let mut signals = self.maybe_signals.load(Ordering::Relaxed);
25 if signals == 0 {
26 self.task.register(cx.waker());
28 signals = self.maybe_signals.load(Ordering::SeqCst);
31 }
32 if signals == 0 {
33 Poll::Pending
34 } else {
35 Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
36 }
37 }
38
39 fn set_signals(&self, signals: zx::Signals) {
40 self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
41 self.task.wake();
42 }
43}
44
45impl PacketReceiver for OnSignalsReceiver {
46 fn receive_packet(&self, packet: zx::Packet) {
47 let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
48 p.observed()
49 } else {
50 return;
51 };
52
53 self.set_signals(observed);
54 }
55}
56
57#[must_use = "futures do nothing unless polled"]
59pub struct OnSignals<'a, H: AsHandleRef> {
60 handle: H,
61 signals: zx::Signals,
62 registration: Option<ReceiverRegistration<OnSignalsReceiver>>,
63 phantom: PhantomData<&'a H>,
64}
65
66pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
68
69impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
70 pub fn new(handle: H, signals: zx::Signals) -> Self {
73 OnSignals { handle, signals, registration: None, phantom: PhantomData }
85 }
86
87 pub fn take_handle(mut self) -> H
89 where
90 H: zx::HandleBased,
91 {
92 self.unregister();
93 std::mem::replace(&mut self.handle, zx::Handle::invalid().into())
94 }
95
96 pub fn extend_lifetime(mut self) -> LeakedOnSignals {
115 match self.registration.take() {
116 Some(r) => LeakedOnSignals { registration: Ok(r) },
117 None => LeakedOnSignals { registration: self.register(None) },
118 }
119 }
120
121 fn register(
122 &self,
123 cx: Option<&mut Context<'_>>,
124 ) -> Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status> {
125 let registration = EHandle::local().register_receiver(Arc::new(OnSignalsReceiver {
126 maybe_signals: AtomicUsize::new(0),
127 task: AtomicWaker::new(),
128 }));
129
130 if let Some(cx) = cx {
133 registration.task.register(cx.waker());
134 }
135
136 self.handle.wait_async_handle(
137 registration.port(),
138 registration.key(),
139 self.signals,
140 zx::WaitAsyncOpts::empty(),
141 )?;
142
143 Ok(registration)
144 }
145
146 fn unregister(&mut self) {
147 if let Some(registration) = self.registration.take() {
148 if registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0 {
149 let _ = registration.port().cancel(&self.handle, registration.key());
153 }
154 }
155 }
156}
157
158impl<H: AsHandleRef + Unpin> Future for OnSignals<'_, H> {
159 type Output = Result<zx::Signals, zx::Status>;
160 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
161 match &self.registration {
162 None => {
163 match self.handle.wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST) {
164 Ok(signals) => Poll::Ready(Ok(signals)),
165 Err(zx::Status::TIMED_OUT) => {
166 let registration = self.register(Some(cx))?;
167 self.get_mut().registration = Some(registration);
168 Poll::Pending
169 }
170 Err(e) => Poll::Ready(Err(e)),
171 }
172 }
173 Some(r) => match r.receiver().get_signals(cx) {
174 Poll::Ready(signals) => Poll::Ready(Ok(signals)),
175 Poll::Pending => {
176 match self.handle.wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
186 {
187 Ok(signals) => Poll::Ready(Ok(signals)),
188 Err(_) => Poll::Pending,
189 }
190 }
191 },
192 }
193 }
194}
195
196impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
197 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198 write!(f, "OnSignals")
199 }
200}
201
202impl<H: AsHandleRef> Drop for OnSignals<'_, H> {
203 fn drop(&mut self) {
204 self.unregister();
205 }
206}
207
208impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
209 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
210 self.handle.as_handle_ref()
211 }
212}
213
214impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
215 fn as_ref(&self) -> &H {
216 &self.handle
217 }
218}
219
220pub struct LeakedOnSignals {
221 registration: Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status>,
222}
223
224impl Future for LeakedOnSignals {
225 type Output = Result<zx::Signals, zx::Status>;
226 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
227 let reg = self.registration.as_mut().map_err(|e| mem::replace(e, zx::Status::OK))?;
228 reg.receiver().get_signals(cx).map(Ok)
229 }
230}
231
#[cfg(test)]
mod test {
    use super::*;
    use crate::TestExecutor;
    use assert_matches::assert_matches;
    use futures::future::{pending, FutureExt};
    use futures::task::{waker, ArcWake};
    use std::pin::pin;

    /// The future stays pending, without waking its task, until the event is
    /// signaled; afterwards the task is woken once and the future is ready.
    #[test]
    fn wait_for_event() -> Result<(), zx::Status> {
        let mut executor = TestExecutor::new();
        // Driving a never-completing future gives the executor a chance to
        // process anything queued for it.
        let mut deliver_events = || {
            let outcome = executor.run_until_stalled(&mut pending::<()>());
            assert!(outcome.is_pending());
        };

        let event = zx::Event::create();
        let mut on_signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
        let (count_waker, wake_count) = futures_test::task::new_count_waker();
        let mut context = Context::from_waker(&count_waker);

        // Not signaled yet: pending, and no wakeups are produced.
        assert_eq!(on_signals.poll_unpin(&mut context), Poll::Pending);
        deliver_events();
        assert_eq!(wake_count, 0);
        assert_eq!(on_signals.poll_unpin(&mut context), Poll::Pending);

        // Signal the event: exactly one wakeup, then the future resolves.
        event.signal_handle(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
        deliver_events();
        assert_eq!(wake_count, 1);
        assert_eq!(
            on_signals.poll_unpin(&mut context),
            Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED))
        );

        Ok(())
    }

    /// Dropping `OnSignals` cancels its port wait, whereas dropping a
    /// `LeakedOnSignals` leaves the wait in place.
    #[test]
    fn drop_before_event() {
        let mut scenario = pin!(async {
            let ehandle = EHandle::local();
            let event = zx::Event::create();

            let mut on_signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
            assert_eq!(futures::poll!(&mut on_signals), Poll::Pending);
            let key = on_signals.registration.as_ref().unwrap().key();

            // The drop already cancelled the wait, so there is nothing left to cancel.
            drop(on_signals);
            assert_eq!(ehandle.port().cancel(&event, key), Err(zx::Status::NOT_FOUND));

            let leaked = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED).extend_lifetime();
            let key = leaked.registration.as_ref().unwrap().key();

            // The leaked variant did not cancel, so the wait can still be cancelled here.
            drop(leaked);
            assert_eq!(ehandle.port().cancel(&event, key), Ok(()));
        });

        assert!(TestExecutor::new().run_until_stalled(&mut scenario).is_ready());
    }

    /// A registered future still checks the handle directly when polled, even
    /// if the executor has not run since the signal was raised.
    #[test]
    fn test_always_polls() {
        let mut executor = TestExecutor::new();
        let (rx, tx) = zx::Channel::create();
        let mut on_readable = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));

        assert_eq!(executor.run_until_stalled(&mut on_readable), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        // A waker that does nothing when woken.
        struct NoopWake;
        impl ArcWake for NoopWake {
            fn wake_by_ref(_arc_self: &Arc<Self>) {}
        }
        let noop = waker(Arc::new(NoopWake));
        let mut context = Context::from_waker(&noop);

        // Poll by hand, without going through the executor.
        assert_matches!(
            on_readable.poll(&mut context),
            Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
        );
    }

    /// After the future completes, `take_handle` returns a usable channel.
    #[test]
    fn test_take_handle() {
        let mut executor = TestExecutor::new();
        let (rx, tx) = zx::Channel::create();
        let mut on_readable = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);

        assert_eq!(executor.run_until_stalled(&mut on_readable), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        assert_matches!(executor.run_until_stalled(&mut on_readable), Poll::Ready(Ok(_)));

        let mut message = zx::MessageBuf::new();
        let channel = on_readable.take_handle();
        channel.read(&mut message).unwrap();

        assert_eq!(message.bytes(), b"hello");
    }
}