fuchsia_async/handle/zircon/
on_signals.rs1use std::future::Future;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use std::task::Poll;
11use std::fmt;
12
13use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
14use futures::task::{AtomicWaker, Context};
15use zx::{self as zx, AsHandleRef};
16
17struct OnSignalsReceiver {
18 maybe_signals: AtomicUsize,
19 task: AtomicWaker,
20}
21
22impl OnSignalsReceiver {
23 fn get_signals(&self, cx: &mut Context<'_>) -> Poll<zx::Signals> {
24 let mut signals = self.maybe_signals.load(Ordering::Relaxed);
25 if signals == 0 {
26 self.task.register(cx.waker());
28 signals = self.maybe_signals.load(Ordering::SeqCst);
31 }
32 if signals == 0 {
33 Poll::Pending
34 } else {
35 Poll::Ready(zx::Signals::from_bits_truncate(signals as u32))
36 }
37 }
38
39 fn set_signals(&self, signals: zx::Signals) {
40 self.maybe_signals.store(signals.bits() as usize, Ordering::SeqCst);
41 self.task.wake();
42 }
43}
44
45impl PacketReceiver for OnSignalsReceiver {
46 fn receive_packet(&self, packet: zx::Packet) {
47 let observed = if let zx::PacketContents::SignalOne(p) = packet.contents() {
48 p.observed()
49 } else {
50 return;
51 };
52
53 self.set_signals(observed);
54 }
55}
56
57#[must_use = "futures do nothing unless polled"]
59pub struct OnSignals<'a, H: AsHandleRef> {
60 handle: H,
61 signals: zx::Signals,
62 registration: Option<ReceiverRegistration<OnSignalsReceiver>>,
63 phantom: PhantomData<&'a H>,
64}
65
66pub type OnSignalsRef<'a> = OnSignals<'a, zx::HandleRef<'a>>;
68
69impl<'a, H: AsHandleRef + 'a> OnSignals<'a, H> {
70 pub fn new(handle: H, signals: zx::Signals) -> Self {
73 OnSignals { handle, signals, registration: None, phantom: PhantomData }
85 }
86
87 pub fn take_handle(mut self) -> H
89 where
90 H: zx::HandleBased,
91 {
92 self.unregister();
93 std::mem::replace(&mut self.handle, zx::Handle::invalid().into())
94 }
95
96 fn register(
97 &self,
98 cx: Option<&mut Context<'_>>,
99 ) -> Result<ReceiverRegistration<OnSignalsReceiver>, zx::Status> {
100 let registration = EHandle::local().register_receiver(Arc::new(OnSignalsReceiver {
101 maybe_signals: AtomicUsize::new(0),
102 task: AtomicWaker::new(),
103 }));
104
105 if let Some(cx) = cx {
108 registration.task.register(cx.waker());
109 }
110
111 self.handle.wait_async_handle(
112 registration.port(),
113 registration.key(),
114 self.signals,
115 zx::WaitAsyncOpts::empty(),
116 )?;
117
118 Ok(registration)
119 }
120
121 fn unregister(&mut self) {
122 if let Some(registration) = self.registration.take() {
123 if registration.receiver().maybe_signals.load(Ordering::SeqCst) == 0 {
124 let _ = registration.port().cancel(&self.handle, registration.key());
128 }
129 }
130 }
131}
132
133impl<H: AsHandleRef + Unpin> Future for OnSignals<'_, H> {
134 type Output = Result<zx::Signals, zx::Status>;
135 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136 match &self.registration {
137 None => {
138 match self
139 .handle
140 .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
141 .to_result()
142 {
143 Ok(signals) => Poll::Ready(Ok(signals)),
144 Err(zx::Status::TIMED_OUT) => {
145 let registration = self.register(Some(cx))?;
146 self.get_mut().registration = Some(registration);
147 Poll::Pending
148 }
149 Err(e) => Poll::Ready(Err(e)),
150 }
151 }
152 Some(r) => match r.receiver().get_signals(cx) {
153 Poll::Ready(signals) => Poll::Ready(Ok(signals)),
154 Poll::Pending => {
155 match self
165 .handle
166 .wait_handle(self.signals, zx::MonotonicInstant::INFINITE_PAST)
167 .to_result()
168 {
169 Ok(signals) => Poll::Ready(Ok(signals)),
170 Err(_) => Poll::Pending,
171 }
172 }
173 },
174 }
175 }
176}
177
178impl<H: AsHandleRef> fmt::Debug for OnSignals<'_, H> {
179 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180 write!(f, "OnSignals")
181 }
182}
183
184impl<H: AsHandleRef> Drop for OnSignals<'_, H> {
185 fn drop(&mut self) {
186 self.unregister();
187 }
188}
189
190impl<H: AsHandleRef> AsHandleRef for OnSignals<'_, H> {
191 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
192 self.handle.as_handle_ref()
193 }
194}
195
196impl<H: AsHandleRef> AsRef<H> for OnSignals<'_, H> {
197 fn as_ref(&self) -> &H {
198 &self.handle
199 }
200}
201
#[cfg(test)]
mod test {
    use super::*;
    use crate::TestExecutor;
    use assert_matches::assert_matches;
    use futures::future::{pending, FutureExt};
    use futures::task::{waker, ArcWake};
    use std::pin::pin;

    /// The future stays pending and does not wake its task until the event is signaled; once
    /// the packet is delivered the task is woken exactly once and the future resolves.
    #[test]
    fn wait_for_event() -> Result<(), zx::Status> {
        let mut exec = TestExecutor::new();
        // Runs the executor until it stalls so that any queued port packets are dispatched.
        let mut deliver_events =
            || assert!(exec.run_until_stalled(&mut pending::<()>()).is_pending());

        let event = zx::Event::create();
        let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
        let (waker, waker_count) = futures_test::task::new_count_waker();
        let cx = &mut std::task::Context::from_waker(&waker);

        // Nothing has been signaled yet: pending, and no wakeups after delivering events.
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);
        deliver_events();
        assert_eq!(waker_count, 0);
        assert_eq!(signals.poll_unpin(cx), Poll::Pending);

        // Signal the event; delivering the packet wakes the task once and the poll completes.
        event.signal_handle(zx::Signals::NONE, zx::Signals::EVENT_SIGNALED)?;
        deliver_events();
        assert_eq!(waker_count, 1);
        assert_eq!(signals.poll_unpin(cx), Poll::Ready(Ok(zx::Signals::EVENT_SIGNALED)));

        Ok(())
    }

    /// Dropping a registered future cancels its port wait, so cancelling the same key again
    /// afterwards reports `NOT_FOUND`.
    #[test]
    fn drop_before_event() {
        let mut fut = pin!(async {
            let ehandle = EHandle::local();

            let event = zx::Event::create();
            let mut signals = OnSignals::new(&event, zx::Signals::EVENT_SIGNALED);
            assert_eq!(futures::poll!(&mut signals), Poll::Pending);
            let key = signals.registration.as_ref().unwrap().key();

            std::mem::drop(signals);
            // `assert_eq!` prints the actual status if the wait was unexpectedly still armed.
            assert_eq!(ehandle.port().cancel(&event, key), Err(zx::Status::NOT_FOUND));
        });

        assert!(TestExecutor::new().run_until_stalled(&mut fut).is_ready());
    }

    /// A registered future polled directly (without the executor delivering the packet) still
    /// notices that the channel has become readable.
    #[test]
    fn test_always_polls() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = pin!(OnSignals::new(&rx, zx::Signals::CHANNEL_READABLE));

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        // A waker that does nothing; the poll below bypasses the executor entirely.
        struct Waker;
        impl ArcWake for Waker {
            fn wake_by_ref(_arc_self: &Arc<Self>) {}
        }

        assert_matches!(
            fut.poll(&mut Context::from_waker(&waker(Arc::new(Waker)))),
            Poll::Ready(Ok(signals)) if signals.contains(zx::Signals::CHANNEL_READABLE)
        );
    }

    /// After the future completes, `take_handle` returns the original channel, which can
    /// still be read from.
    #[test]
    fn test_take_handle() {
        let mut exec = TestExecutor::new();

        let (rx, tx) = zx::Channel::create();

        let mut fut = OnSignals::new(rx, zx::Signals::CHANNEL_READABLE);

        assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);

        tx.write(b"hello", &mut []).expect("write failed");

        assert_matches!(exec.run_until_stalled(&mut fut), Poll::Ready(Ok(_)));

        let mut message = zx::MessageBuf::new();
        fut.take_handle().read(&mut message).unwrap();

        assert_eq!(message.bytes(), b"hello");
    }
}