fuchsia_async/handle/zircon/rwhandle.rs
1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
6use crate::OnSignalsRef;
7use std::marker::PhantomData;
8use std::sync::{Arc, Mutex};
9use std::task::{ready, Context, Poll, Waker};
10use zx::{self as zx, AsHandleRef};
11
12const OBJECT_PEER_CLOSED: zx::Signals = zx::Signals::OBJECT_PEER_CLOSED;
13const OBJECT_READABLE: zx::Signals = zx::Signals::OBJECT_READABLE;
14const OBJECT_WRITABLE: zx::Signals = zx::Signals::OBJECT_WRITABLE;
15
16/// State of an object when it is ready for reading.
17#[derive(Debug, PartialEq, Eq, Copy, Clone)]
18pub enum ReadableState {
19 /// Received `OBJECT_READABLE`, or optimistically assuming the object is readable.
20 Readable,
21 /// Received `OBJECT_PEER_CLOSED`. The object might also be readable.
22 MaybeReadableAndClosed,
23}
24
25/// State of an object when it is ready for writing.
26#[derive(Debug, PartialEq, Eq, Copy, Clone)]
27pub enum WritableState {
28 /// Received `OBJECT_WRITABLE`, or optimistically assuming the object is writable.
29 Writable,
30 /// Received `OBJECT_PEER_CLOSED`.
31 Closed,
32}
33
34/// A `Handle` that receives notifications when it is readable.
35///
36/// # Examples
37///
38/// ```
39/// loop {
40/// ready!(self.poll_readable(cx))?;
41/// match /* make read syscall */ {
42/// Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
43/// status => return Poll::Ready(status),
44/// }
45/// }
46/// ```
47pub trait ReadableHandle {
48 /// If the object is ready for reading, returns `Ready` with the readable
49 /// state. If the implementor returns Pending, it should first ensure that
50 /// `need_readable` is called.
51 ///
52 /// This should be called in a poll function. If the syscall returns
53 /// `SHOULD_WAIT`, you must call `need_readable` to schedule wakeup when the
54 /// object is readable.
55 ///
56 /// The returned `ReadableState` does not necessarily reflect an observed
57 /// `OBJECT_READABLE` signal. We optimistically assume the object remains
58 /// readable until `need_readable` is called.
59 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>>;
60
61 /// Arranges for the current task to be woken when the object receives an
62 /// `OBJECT_READABLE` or `OBJECT_PEER_CLOSED` signal. This can return
63 /// Poll::Ready if the object has already been signaled in which case the
64 /// waker *will* not be woken and it is the caller's responsibility to not
65 /// lose the signal.
66 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
67}
68
69/// A `Handle` that receives notifications when it is writable.
70///
71/// # Examples
72///
73/// ```
74/// loop {
75/// ready!(self.poll_writable(cx))?;
76/// match /* make write syscall */ {
77/// Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
78/// status => Poll::Ready(status),
79/// }
80/// }
81/// ```
82pub trait WritableHandle {
83 /// If the object is ready for writing, returns `Ready` with the writable
84 /// state. If the implementor returns Pending, it should first ensure that
85 /// `need_writable` is called.
86 ///
87 /// This should be called in a poll function. If the syscall returns
88 /// `SHOULD_WAIT`, you must call `need_writable` to schedule wakeup when the
89 /// object is writable.
90 ///
91 /// The returned `WritableState` does not necessarily reflect an observed
92 /// `OBJECT_WRITABLE` signal. We optimistically assume the object remains
93 /// writable until `need_writable` is called.
94 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>>;
95
96 /// Arranges for the current task to be woken when the object receives an
97 /// `OBJECT_WRITABLE` or `OBJECT_PEER_CLOSED` signal. This can return
98 /// Poll::Ready if the object has already been signaled in which case the
99 /// waker *will* not be woken and it is the caller's responsibility to not
100 /// lose the signal.
101 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
102}
103
104struct RWPacketReceiver<S: RWHandleSpec> {
105 inner: Mutex<Inner>,
106 _marker: PhantomData<S>,
107}
108
109struct Inner {
110 signals: zx::Signals,
111 read_task: Option<Waker>,
112 write_task: Option<Waker>,
113}
114
115impl<S: RWHandleSpec> PacketReceiver for RWPacketReceiver<S> {
116 fn receive_packet(&self, packet: zx::Packet) {
117 let new = if let zx::PacketContents::SignalOne(p) = packet.contents() {
118 // Only consider the signals that were part of the trigger. This
119 // ensures that only the packets generated by the correct signal
120 // observer (Read or Write) can raise the corresponding inner signal
121 // bit.
122 //
123 // Without this, we can lose track of how many port observers are
124 // installed.
125 p.observed() & p.trigger()
126 } else {
127 return;
128 };
129
130 // We wake the tasks when the lock isn't held in case the wakers need the same lock.
131 let mut read_task = None;
132 let mut write_task = None;
133 {
134 let mut inner = self.inner.lock().unwrap();
135 let old = inner.signals;
136 inner.signals |= new;
137
138 let became_readable =
139 new.intersection(S::READABLE_SIGNALS) != old.intersection(S::READABLE_SIGNALS);
140 let became_writable =
141 new.intersection(S::WRITABLE_SIGNALS) != old.intersection(S::WRITABLE_SIGNALS);
142 let became_closed =
143 new.contains(OBJECT_PEER_CLOSED) && !old.contains(OBJECT_PEER_CLOSED);
144 if became_readable || became_closed {
145 read_task = inner.read_task.take();
146 }
147 if became_writable || became_closed {
148 write_task = inner.write_task.take();
149 }
150 }
151 // *NOTE*: This is the only safe place to wake wakers. In any other location, there is a
152 // risk that locks are held which might be required when the waker is woken. It is safe to
153 // wake here because this is called from the executor when no locks are held.
154 if let Some(read_task) = read_task {
155 read_task.wake();
156 }
157 if let Some(write_task) = write_task {
158 write_task.wake();
159 }
160 }
161}
162
163/// A `Handle` that receives notifications when it is readable/writable.
164pub struct RWHandle<T, S: RWHandleSpec = DefaultRWHandleSpec> {
165 handle: T,
166 receiver: ReceiverRegistration<RWPacketReceiver<S>>,
167}
168
169impl<T> RWHandle<T, DefaultRWHandleSpec>
170where
171 T: AsHandleRef,
172{
173 /// Creates a new `RWHandle` object which will receive notifications when
174 /// the underlying handle becomes readable, writable, or closes.
175 ///
176 /// # Panics
177 ///
178 /// If called outside the context of an active async executor.
179 pub fn new(handle: T) -> Self {
180 Self::new_with_spec(handle)
181 }
182}
183
184impl<T, S> RWHandle<T, S>
185where
186 T: AsHandleRef,
187 S: RWHandleSpec,
188{
189 /// Creates a new `RWHandle` with a non-default spec and an object which
190 /// will receive notifications when the underlying handle becomes readable,
191 /// writable, or closes.
192 ///
193 /// # Panics
194 ///
195 /// If called outside the context of an active async executor.
196 ///
197 /// If `S::READABLE_SIGNALS` or `S::WRITABLE_SIGNALS` contain
198 /// [`zx::Signals::OBJECT_PEER_CLOSED`] or have a non-empty intersection.
199 pub fn new_with_spec(handle: T) -> Self {
200 let ehandle = EHandle::local();
201 let internal_signals = OBJECT_PEER_CLOSED;
202 assert!(
203 !S::READABLE_SIGNALS.contains(internal_signals),
204 "readable signals may not contain ({internal_signals:?})"
205 );
206 assert!(
207 !S::WRITABLE_SIGNALS.contains(internal_signals),
208 "writable signals may not contain ({internal_signals:?})"
209 );
210 assert!(!S::WRITABLE_SIGNALS.intersects(S::READABLE_SIGNALS), "signals may not intersect");
211 let initial_signals = S::WRITABLE_SIGNALS | S::READABLE_SIGNALS;
212 let receiver = ehandle.register_receiver(Arc::new(RWPacketReceiver {
213 inner: Mutex::new(Inner {
214 // Optimistically assume that the handle is readable and writable.
215 // Reads and writes will be attempted before queueing a packet.
216 // This makes handles slightly faster to read/write the first time
217 // they're accessed after being created, provided they start off as
218 // readable or writable. In return, there will be an extra wasted
219 // syscall per read/write if the handle is not readable or writable.
220 signals: initial_signals,
221 read_task: None,
222 write_task: None,
223 }),
224 _marker: PhantomData,
225 }));
226
227 RWHandle { handle, receiver }
228 }
229
230 /// Returns a reference to the underlying handle.
231 pub fn get_ref(&self) -> &T {
232 &self.handle
233 }
234
235 /// Returns a mutable reference to the underlying handle.
236 pub fn get_mut(&mut self) -> &mut T {
237 &mut self.handle
238 }
239
240 /// Consumes `self` and returns the underlying handle.
241 pub fn into_inner(self) -> T {
242 self.handle
243 }
244
245 /// Returns true if the object received the `OBJECT_PEER_CLOSED` signal.
246 pub fn is_closed(&self) -> bool {
247 let signals = self.receiver().inner.lock().unwrap().signals;
248 if signals.contains(OBJECT_PEER_CLOSED) {
249 return true;
250 }
251
252 // The signals bitset might not be updated if we haven't gotten around to processing the
253 // packet telling us that yet. To provide an up-to-date response, we query the current
254 // state of the signal.
255 //
256 // Note: we _could_ update the bitset with what we find here, if we're careful to also
257 // update READABLE + WRITEABLE at the same time, and also wakeup the tasks as necessary.
258 // But having `is_closed` wakeup tasks if it discovered a signal change seems too weird, so
259 // we just leave the bitset as-is and let the regular notification mechanism get around to
260 // it when it gets around to it.
261 match self
262 .handle
263 .wait_handle(OBJECT_PEER_CLOSED, zx::MonotonicInstant::INFINITE_PAST)
264 .to_result()
265 {
266 Ok(_) => true,
267 Err(zx::Status::TIMED_OUT) => false,
268 Err(status) => {
269 // None of the other documented error statuses should be possible, either the type
270 // system doesn't allow it or the wait from `RWHandle::new()` would have already
271 // failed.
272 unreachable!("status: {status}")
273 }
274 }
275 }
276
277 /// Returns a future that completes when `is_closed()` is true.
278 pub fn on_closed(&self) -> OnSignalsRef<'_> {
279 OnSignalsRef::new(self.handle.as_handle_ref(), OBJECT_PEER_CLOSED)
280 }
281
282 fn receiver(&self) -> &RWPacketReceiver<S> {
283 self.receiver.receiver()
284 }
285
286 fn need_signal(&self, cx: &mut Context<'_>, signal: Signal) -> Poll<Result<(), zx::Status>> {
287 let mut inner = self.receiver.inner.lock().unwrap();
288 let old = inner.signals;
289 if old.contains(OBJECT_PEER_CLOSED) {
290 // We don't want to return an error here because even though the peer has closed, the
291 // object could still have queued messages that can be read.
292 Poll::Ready(Ok(()))
293 } else {
294 let waker = cx.waker().clone();
295 let signal = match signal {
296 Signal::Read => {
297 inner.read_task = Some(waker);
298 S::READABLE_SIGNALS
299 }
300 Signal::Write => {
301 inner.write_task = Some(waker);
302 S::WRITABLE_SIGNALS
303 }
304 };
305 if old.intersects(signal) {
306 inner.signals &= !signal;
307 std::mem::drop(inner);
308 self.handle.wait_async_handle(
309 self.receiver.port(),
310 self.receiver.key(),
311 signal | OBJECT_PEER_CLOSED,
312 zx::WaitAsyncOpts::empty(),
313 )?;
314 }
315 Poll::Pending
316 }
317 }
318
319 fn poll_signal(
320 &self,
321 cx: &mut Context<'_>,
322 signal: Signal,
323 ) -> Poll<Result<zx::Signals, zx::Status>> {
324 let mask = match signal {
325 Signal::Read => S::READABLE_SIGNALS,
326 Signal::Write => S::WRITABLE_SIGNALS,
327 } | OBJECT_PEER_CLOSED;
328
329 loop {
330 let signals = self.receiver().inner.lock().unwrap().signals;
331 let asserted = signals.intersection(mask);
332 if !asserted.is_empty() {
333 return Poll::Ready(Ok(asserted));
334 }
335 ready!(self.need_signal(cx, signal)?);
336 }
337 }
338}
339
340#[derive(Copy, Clone)]
341enum Signal {
342 Read,
343 Write,
344}
345
346impl<T, S> ReadableHandle for RWHandle<T, S>
347where
348 T: AsHandleRef,
349 S: RWHandleSpec,
350{
351 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
352 let signals = ready!(self.poll_signal(cx, Signal::Read)?);
353 if signals.contains(OBJECT_PEER_CLOSED) {
354 return Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed));
355 }
356 Poll::Ready(Ok(ReadableState::Readable))
357 }
358
359 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
360 self.need_signal(cx, Signal::Read)
361 }
362}
363
364impl<T, S> WritableHandle for RWHandle<T, S>
365where
366 T: AsHandleRef,
367 S: RWHandleSpec,
368{
369 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
370 let signals = ready!(self.poll_signal(cx, Signal::Write)?);
371 if signals.contains(OBJECT_PEER_CLOSED) {
372 return Poll::Ready(Ok(WritableState::Closed));
373 }
374 Poll::Ready(Ok(WritableState::Writable))
375 }
376
377 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
378 self.need_signal(cx, Signal::Write)
379 }
380}
381
382/// A trait specifying the behavior of [`RWHandle`].
383///
384/// The default behavior, listening for [`zx::Signals::OBJECT_READABLE`] and
385/// [`zx::Signals::OBJECT_WRITABLE`] is provided by [`DefaultRWHandleSpec`].
386pub trait RWHandleSpec: Send + Sync + 'static {
387 /// Signals asserted when the handle is readable.
388 ///
389 /// [`RWHandle`] installs wait for these signals and if any of them are
390 /// asserted, the handle is considered readable.
391 const READABLE_SIGNALS: zx::Signals;
392 /// Signals asserted when the handle is writable.
393 ///
394 /// [`RWHandle`] installs wait for these signals and if any of them are
395 /// asserted, the handle is considered writable.
396 const WRITABLE_SIGNALS: zx::Signals;
397}
398
399/// The default behavior for [`RWHandle`].
400///
401/// Considers the handle readable when [`zx::Signals::OBJECT_READABLE`] is set,
402/// and writable when [`zx::Signals::OBJECT_WRITABLE`] is set.
403pub struct DefaultRWHandleSpec;
404
405impl RWHandleSpec for DefaultRWHandleSpec {
406 const READABLE_SIGNALS: zx::Signals = OBJECT_READABLE;
407 const WRITABLE_SIGNALS: zx::Signals = OBJECT_WRITABLE;
408}
409
410#[cfg(test)]
411mod tests {
412 use super::*;
413 use crate::TestExecutor;
414
415 #[test]
416 fn is_closed_immediately_after_close() {
417 let mut exec = TestExecutor::new();
418 let (tx, rx) = zx::Channel::create();
419 let rx_rw_handle = RWHandle::new(rx);
420 let mut noop_ctx = Context::from_waker(futures::task::noop_waker_ref());
421 // Clear optimistic readable state
422 assert!(rx_rw_handle.need_readable(&mut noop_ctx).is_pending());
423 // Starting state: the channel is not closed (because we haven't closed it yet)
424 assert!(!rx_rw_handle.is_closed());
425 // we will never set readable, so this should be Pending until we close
426 assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
427
428 drop(tx);
429
430 // Implementation note: the cached state will not be updated yet
431 assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
432 // But is_closed should return true immediately
433 assert!(rx_rw_handle.is_closed());
434 // Still not updated, and won't be until we let the executor process port packets
435 assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
436 // So we do
437 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
438 // And now it is updated, so we observe Closed
439 assert_eq!(
440 rx_rw_handle.poll_readable(&mut noop_ctx),
441 Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed))
442 );
443 // And is_closed should still be true, of course.
444 assert!(rx_rw_handle.is_closed());
445 }
446
447 // Regression test for https://fxbug.dev/417333384.
448 #[test]
449 fn simultaneous_read_and_write() {
450 let mut exec = TestExecutor::new();
451 let (peer, local) = zx::Socket::create_stream();
452 let mut buff = [0u8; 1024];
453 while local.write(&buff[..]).is_ok() {}
454
455 let rw_handle = RWHandle::new(local);
456 let read_fut = futures::future::poll_fn(|cx| {
457 let readable = ready!(rw_handle.poll_readable(cx));
458 assert_eq!(readable, Ok(ReadableState::Readable));
459 let mut buf = [0u8; 2];
460 loop {
461 match rw_handle.get_ref().read(&mut buf[..]) {
462 Ok(r) => assert_eq!(r, 1),
463 Err(e) => {
464 assert_eq!(e, zx::Status::SHOULD_WAIT);
465 break;
466 }
467 }
468 }
469 assert_eq!(rw_handle.need_readable(cx), Poll::Pending);
470 Poll::<()>::Pending
471 });
472
473 let write_fut = futures::future::poll_fn(|cx| {
474 let writable = ready!(rw_handle.poll_writable(cx));
475 assert_eq!(writable, Ok(WritableState::Writable));
476 let buf = [0u8; 1];
477 loop {
478 match rw_handle.get_ref().write(&buf[..]) {
479 Ok(r) => assert_eq!(r, 1),
480 Err(e) => {
481 assert_eq!(e, zx::Status::SHOULD_WAIT);
482 break;
483 }
484 }
485 }
486 assert_eq!(rw_handle.need_writable(cx), Poll::Pending);
487 Poll::<()>::Pending
488 });
489
490 let mut fut = std::pin::pin!(futures::future::join(write_fut, read_fut));
491 for _ in 0..5 {
492 assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
493 assert_eq!(peer.read(&mut buff[0..1]), Ok(1));
494 assert_eq!(peer.write(&buff[0..1]), Ok(1));
495 }
496
497 let mut read_waits = 0;
498 let mut write_waits = 0;
499 while let Ok(p) = exec.port().wait(zx::MonotonicInstant::INFINITE_PAST) {
500 if p.key() != rw_handle.receiver.key() {
501 continue;
502 }
503 let p = match p.contents() {
504 zx::PacketContents::SignalOne(p) => p,
505 e => panic!("unexpected packet {e:?}"),
506 };
507 if p.trigger().contains(zx::Signals::OBJECT_READABLE) {
508 read_waits += 1;
509 }
510 if p.trigger().contains(zx::Signals::OBJECT_WRITABLE) {
511 write_waits += 1;
512 }
513 }
514 // We should not have installed more than 1 waiter for each side of the
515 // operation.
516 assert_eq!((read_waits, write_waits), (1, 1));
517 }
518}