fuchsia_async/handle/zircon/rwhandle.rs
1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
6use crate::OnSignalsRef;
7use std::sync::{Arc, Mutex};
8use std::task::{ready, Context, Poll, Waker};
9use zx::{self as zx, AsHandleRef};
10
11const OBJECT_PEER_CLOSED: zx::Signals = zx::Signals::OBJECT_PEER_CLOSED;
12const OBJECT_READABLE: zx::Signals = zx::Signals::OBJECT_READABLE;
13const OBJECT_WRITABLE: zx::Signals = zx::Signals::OBJECT_WRITABLE;
14
/// What is known about an object at the moment it is reported ready for reading.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadableState {
    /// `OBJECT_READABLE` was received, or the object is optimistically assumed to be readable.
    Readable,
    /// `OBJECT_PEER_CLOSED` was received. The object may still have data to read.
    MaybeReadableAndClosed,
}
23
/// What is known about an object at the moment it is reported ready for writing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WritableState {
    /// `OBJECT_WRITABLE` was received, or the object is optimistically assumed to be writable.
    Writable,
    /// `OBJECT_PEER_CLOSED` was received.
    Closed,
}
32
33/// A `Handle` that receives notifications when it is readable.
34///
35/// # Examples
36///
37/// ```
38/// loop {
39/// ready!(self.poll_readable(cx))?;
40/// match /* make read syscall */ {
41/// Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
42/// status => return Poll::Ready(status),
43/// }
44/// }
45/// ```
46pub trait ReadableHandle {
47 /// If the object is ready for reading, returns `Ready` with the readable
48 /// state. If the implementor returns Pending, it should first ensure that
49 /// `need_readable` is called.
50 ///
51 /// This should be called in a poll function. If the syscall returns
52 /// `SHOULD_WAIT`, you must call `need_readable` to schedule wakeup when the
53 /// object is readable.
54 ///
55 /// The returned `ReadableState` does not necessarily reflect an observed
56 /// `OBJECT_READABLE` signal. We optimistically assume the object remains
57 /// readable until `need_readable` is called.
58 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>>;
59
60 /// Arranges for the current task to be woken when the object receives an
61 /// `OBJECT_READABLE` or `OBJECT_PEER_CLOSED` signal. This can return
62 /// Poll::Ready if the object has already been signaled in which case the
63 /// waker *will* not be woken and it is the caller's responsibility to not
64 /// lose the signal.
65 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
66}
67
68/// A `Handle` that receives notifications when it is writable.
69///
70/// # Examples
71///
72/// ```
73/// loop {
74/// ready!(self.poll_writable(cx))?;
75/// match /* make write syscall */ {
76/// Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
77/// status => Poll::Ready(status),
78/// }
79/// }
80/// ```
81pub trait WritableHandle {
82 /// If the object is ready for writing, returns `Ready` with the writable
83 /// state. If the implementor returns Pending, it should first ensure that
84 /// `need_writable` is called.
85 ///
86 /// This should be called in a poll function. If the syscall returns
87 /// `SHOULD_WAIT`, you must call `need_writable` to schedule wakeup when the
88 /// object is writable.
89 ///
90 /// The returned `WritableState` does not necessarily reflect an observed
91 /// `OBJECT_WRITABLE` signal. We optimistically assume the object remains
92 /// writable until `need_writable` is called.
93 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>>;
94
95 /// Arranges for the current task to be woken when the object receives an
96 /// `OBJECT_WRITABLE` or `OBJECT_PEER_CLOSED` signal. This can return
97 /// Poll::Ready if the object has already been signaled in which case the
98 /// waker *will* not be woken and it is the caller's responsibility to not
99 /// lose the signal.
100 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
101}
102
103struct RWPacketReceiver(Mutex<Inner>);
104
105struct Inner {
106 signals: zx::Signals,
107 read_task: Option<Waker>,
108 write_task: Option<Waker>,
109}
110
111impl PacketReceiver for RWPacketReceiver {
112 fn receive_packet(&self, packet: zx::Packet) {
113 let new = if let zx::PacketContents::SignalOne(p) = packet.contents() {
114 // Only consider the signals that were part of the trigger. This
115 // ensures that only the packets generated by the correct signal
116 // observer (Read or Write) can raise the corresponding inner signal
117 // bit.
118 //
119 // Without this, we can lose track of how many port observers are
120 // installed.
121 p.observed() & p.trigger()
122 } else {
123 return;
124 };
125
126 // We wake the tasks when the lock isn't held in case the wakers need the same lock.
127 let mut read_task = None;
128 let mut write_task = None;
129 {
130 let mut inner = self.0.lock().unwrap();
131 let old = inner.signals;
132 inner.signals |= new;
133
134 let became_readable = new.contains(OBJECT_READABLE) && !old.contains(OBJECT_READABLE);
135 let became_writable = new.contains(OBJECT_WRITABLE) && !old.contains(OBJECT_WRITABLE);
136 let became_closed =
137 new.contains(OBJECT_PEER_CLOSED) && !old.contains(OBJECT_PEER_CLOSED);
138
139 if became_readable || became_closed {
140 read_task = inner.read_task.take();
141 }
142 if became_writable || became_closed {
143 write_task = inner.write_task.take();
144 }
145 }
146 // *NOTE*: This is the only safe place to wake wakers. In any other location, there is a
147 // risk that locks are held which might be required when the waker is woken. It is safe to
148 // wake here because this is called from the executor when no locks are held.
149 if let Some(read_task) = read_task {
150 read_task.wake();
151 }
152 if let Some(write_task) = write_task {
153 write_task.wake();
154 }
155 }
156}
157
158/// A `Handle` that receives notifications when it is readable/writable.
159pub struct RWHandle<T> {
160 handle: T,
161 receiver: ReceiverRegistration<RWPacketReceiver>,
162}
163
164impl<T> RWHandle<T>
165where
166 T: AsHandleRef,
167{
168 /// Creates a new `RWHandle` object which will receive notifications when
169 /// the underlying handle becomes readable, writable, or closes.
170 ///
171 /// # Panics
172 ///
173 /// If called outside the context of an active async executor.
174 pub fn new(handle: T) -> Self {
175 let ehandle = EHandle::local();
176
177 let initial_signals = OBJECT_READABLE | OBJECT_WRITABLE;
178 let receiver = ehandle.register_receiver(Arc::new(RWPacketReceiver(Mutex::new(Inner {
179 // Optimistically assume that the handle is readable and writable.
180 // Reads and writes will be attempted before queueing a packet.
181 // This makes handles slightly faster to read/write the first time
182 // they're accessed after being created, provided they start off as
183 // readable or writable. In return, there will be an extra wasted
184 // syscall per read/write if the handle is not readable or writable.
185 signals: initial_signals,
186 read_task: None,
187 write_task: None,
188 }))));
189
190 RWHandle { handle, receiver }
191 }
192
193 /// Returns a reference to the underlying handle.
194 pub fn get_ref(&self) -> &T {
195 &self.handle
196 }
197
198 /// Returns a mutable reference to the underlying handle.
199 pub fn get_mut(&mut self) -> &mut T {
200 &mut self.handle
201 }
202
203 /// Consumes `self` and returns the underlying handle.
204 pub fn into_inner(self) -> T {
205 self.handle
206 }
207
208 /// Returns true if the object received the `OBJECT_PEER_CLOSED` signal.
209 pub fn is_closed(&self) -> bool {
210 let signals = self.receiver().0.lock().unwrap().signals;
211 if signals.contains(OBJECT_PEER_CLOSED) {
212 return true;
213 }
214
215 // The signals bitset might not be updated if we haven't gotten around to processing the
216 // packet telling us that yet. To provide an up-to-date response, we query the current
217 // state of the signal.
218 //
219 // Note: we _could_ update the bitset with what we find here, if we're careful to also
220 // update READABLE + WRITEABLE at the same time, and also wakeup the tasks as necessary.
221 // But having `is_closed` wakeup tasks if it discovered a signal change seems too weird, so
222 // we just leave the bitset as-is and let the regular notification mechanism get around to
223 // it when it gets around to it.
224 match self
225 .handle
226 .wait_handle(OBJECT_PEER_CLOSED, zx::MonotonicInstant::INFINITE_PAST)
227 .to_result()
228 {
229 Ok(_) => true,
230 Err(zx::Status::TIMED_OUT) => false,
231 Err(status) => {
232 // None of the other documented error statuses should be possible, either the type
233 // system doesn't allow it or the wait from `RWHandle::new()` would have already
234 // failed.
235 unreachable!("status: {status}")
236 }
237 }
238 }
239
240 /// Returns a future that completes when `is_closed()` is true.
241 pub fn on_closed(&self) -> OnSignalsRef<'_> {
242 OnSignalsRef::new(self.handle.as_handle_ref(), OBJECT_PEER_CLOSED)
243 }
244
245 fn receiver(&self) -> &RWPacketReceiver {
246 self.receiver.receiver()
247 }
248
249 fn need_signal(&self, cx: &mut Context<'_>, signal: Signal) -> Poll<Result<(), zx::Status>> {
250 let mut inner = self.receiver.0.lock().unwrap();
251 let old = inner.signals;
252 if old.contains(zx::Signals::OBJECT_PEER_CLOSED) {
253 // We don't want to return an error here because even though the peer has closed, the
254 // object could still have queued messages that can be read.
255 Poll::Ready(Ok(()))
256 } else {
257 let waker = cx.waker().clone();
258 let signal = match signal {
259 Signal::Read => {
260 inner.read_task = Some(waker);
261 zx::Signals::OBJECT_READABLE
262 }
263 Signal::Write => {
264 inner.write_task = Some(waker);
265 zx::Signals::OBJECT_WRITABLE
266 }
267 };
268 if old.contains(signal) {
269 inner.signals &= !signal;
270 std::mem::drop(inner);
271 self.handle.wait_async_handle(
272 self.receiver.port(),
273 self.receiver.key(),
274 signal | zx::Signals::OBJECT_PEER_CLOSED,
275 zx::WaitAsyncOpts::empty(),
276 )?;
277 }
278 Poll::Pending
279 }
280 }
281}
282
/// Selects which direction a wait is being requested for.
enum Signal {
    /// Wait for the object to become readable.
    Read,
    /// Wait for the object to become writable.
    Write,
}
287
288impl<T> ReadableHandle for RWHandle<T>
289where
290 T: AsHandleRef,
291{
292 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
293 loop {
294 let signals = self.receiver().0.lock().unwrap().signals;
295 match (signals.contains(OBJECT_READABLE), signals.contains(OBJECT_PEER_CLOSED)) {
296 (true, false) => return Poll::Ready(Ok(ReadableState::Readable)),
297 (_, true) => return Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed)),
298 (false, false) => {
299 ready!(self.need_signal(cx, Signal::Read)?)
300 }
301 }
302 }
303 }
304
305 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
306 self.need_signal(cx, Signal::Read)
307 }
308}
309
310impl<T> WritableHandle for RWHandle<T>
311where
312 T: AsHandleRef,
313{
314 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
315 loop {
316 let signals = self.receiver().0.lock().unwrap().signals;
317 match (signals.contains(OBJECT_WRITABLE), signals.contains(OBJECT_PEER_CLOSED)) {
318 (_, true) => return Poll::Ready(Ok(WritableState::Closed)),
319 (true, _) => return Poll::Ready(Ok(WritableState::Writable)),
320 (false, false) => {
321 ready!(self.need_signal(cx, Signal::Write)?)
322 }
323 }
324 }
325 }
326
327 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
328 self.need_signal(cx, Signal::Write)
329 }
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TestExecutor;

    /// `is_closed` must report the close right away, even though the cached signal state is
    /// only refreshed once the executor has processed the port packet.
    #[test]
    fn is_closed_immediately_after_close() {
        let mut executor = TestExecutor::new();
        let (sender, receiver_end) = zx::Channel::create();
        let rw = RWHandle::new(receiver_end);
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());

        // Drop the optimistic readable state.
        assert!(rw.need_readable(&mut cx).is_pending());
        // Nothing has been closed yet, so the channel must not look closed.
        assert!(!rw.is_closed());
        // Readable is never raised, so this stays Pending until the close.
        assert_eq!(rw.poll_readable(&mut cx), Poll::Pending);

        drop(sender);

        // Implementation note: the cached state has not been refreshed yet...
        assert_eq!(rw.poll_readable(&mut cx), Poll::Pending);
        // ...but is_closed must already say true.
        assert!(rw.is_closed());
        // The cache stays stale until the executor gets to process port packets.
        assert_eq!(rw.poll_readable(&mut cx), Poll::Pending);
        // Let it do so.
        let mut never = futures::future::pending::<()>();
        let _ = executor.run_until_stalled(&mut never);
        // The cache is now refreshed and the close is observed.
        assert_eq!(
            rw.poll_readable(&mut cx),
            Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed))
        );
        // And is_closed keeps reporting true.
        assert!(rw.is_closed());
    }

    // Regression test for https://fxbug.dev/417333384.
    #[test]
    fn simultaneous_read_and_write() {
        let mut executor = TestExecutor::new();
        let (peer, local) = zx::Socket::create_stream();
        let mut scratch = [0u8; 1024];
        // Write until the socket stops accepting data.
        while local.write(&scratch[..]).is_ok() {}

        let rw = RWHandle::new(local);

        // Reads until the socket would block, then asks for a readable wakeup. Never completes.
        let reader = futures::future::poll_fn(|cx| {
            let state = ready!(rw.poll_readable(cx));
            assert_eq!(state, Ok(ReadableState::Readable));
            let mut chunk = [0u8; 2];
            let stop = loop {
                match rw.get_ref().read(&mut chunk[..]) {
                    Ok(count) => assert_eq!(count, 1),
                    Err(status) => break status,
                }
            };
            assert_eq!(stop, zx::Status::SHOULD_WAIT);
            assert_eq!(rw.need_readable(cx), Poll::Pending);
            Poll::<()>::Pending
        });

        // Writes until the socket would block, then asks for a writable wakeup. Never completes.
        let writer = futures::future::poll_fn(|cx| {
            let state = ready!(rw.poll_writable(cx));
            assert_eq!(state, Ok(WritableState::Writable));
            let byte = [0u8; 1];
            let stop = loop {
                match rw.get_ref().write(&byte[..]) {
                    Ok(count) => assert_eq!(count, 1),
                    Err(status) => break status,
                }
            };
            assert_eq!(stop, zx::Status::SHOULD_WAIT);
            assert_eq!(rw.need_writable(cx), Poll::Pending);
            Poll::<()>::Pending
        });

        let mut both = std::pin::pin!(futures::future::join(writer, reader));
        for _ in 0..5 {
            assert_eq!(executor.run_until_stalled(&mut both), Poll::Pending);
            assert_eq!(peer.read(&mut scratch[0..1]), Ok(1));
            assert_eq!(peer.write(&scratch[0..1]), Ok(1));
        }

        // Drain the port and count the packets for our key, by trigger.
        let mut read_waits = 0;
        let mut write_waits = 0;
        while let Ok(packet) = executor.port().wait(zx::MonotonicInstant::INFINITE_PAST) {
            if packet.key() != rw.receiver.key() {
                continue;
            }
            let signal_packet = match packet.contents() {
                zx::PacketContents::SignalOne(p) => p,
                e => panic!("unexpected packet {e:?}"),
            };
            let trigger = signal_packet.trigger();
            if trigger.contains(zx::Signals::OBJECT_READABLE) {
                read_waits += 1;
            }
            if trigger.contains(zx::Signals::OBJECT_WRITABLE) {
                write_waits += 1;
            }
        }
        // No more than one waiter may have been installed for each side of the operation.
        assert_eq!((read_waits, write_waits), (1, 1));
    }
}