fuchsia_async/handle/zircon/
rwhandle.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
6use crate::OnSignalsRef;
7use std::sync::{Arc, Mutex};
8use std::task::{ready, Context, Poll, Waker};
9use zx::{self as zx, AsHandleRef};
10
// Module-local shorthands for the three Zircon signal bits this file tracks.
const OBJECT_PEER_CLOSED: zx::Signals = zx::Signals::OBJECT_PEER_CLOSED;
const OBJECT_READABLE: zx::Signals = zx::Signals::OBJECT_READABLE;
const OBJECT_WRITABLE: zx::Signals = zx::Signals::OBJECT_WRITABLE;
14
/// What a reader learns once the object is considered ready for reading.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReadableState {
    /// `OBJECT_READABLE` was observed, or readability is being assumed optimistically.
    Readable,
    /// `OBJECT_PEER_CLOSED` was observed.  There may still be data left to read.
    MaybeReadableAndClosed,
}
23
/// What a writer learns once the object is considered ready for writing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WritableState {
    /// `OBJECT_WRITABLE` was observed, or writability is being assumed optimistically.
    Writable,
    /// `OBJECT_PEER_CLOSED` was observed.
    Closed,
}
32
/// A `Handle` that receives notifications when it is readable.
///
/// # Examples
///
/// Typical shape of a read loop inside a poll function (pseudo-code: the
/// syscall itself is elided, so the snippet is not compiled as a doctest):
///
/// ```ignore
/// loop {
///     ready!(self.poll_readable(cx))?;
///     match /* make read syscall */ {
///         Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
///         status => return Poll::Ready(status),
///     }
/// }
/// ```
pub trait ReadableHandle {
    /// If the object is ready for reading, returns `Ready` with the readable
    /// state. If the implementor returns `Pending`, it should first ensure that
    /// `need_readable` is called.
    ///
    /// This should be called in a poll function. If the syscall returns
    /// `SHOULD_WAIT`, you must call `need_readable` to schedule wakeup when the
    /// object is readable.
    ///
    /// The returned `ReadableState` does not necessarily reflect an observed
    /// `OBJECT_READABLE` signal. We optimistically assume the object remains
    /// readable until `need_readable` is called.
    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>>;

    /// Arranges for the current task to be woken when the object receives an
    /// `OBJECT_READABLE` or `OBJECT_PEER_CLOSED` signal.
    ///
    /// This can return `Poll::Ready` if the object has already been signaled,
    /// in which case the waker will *not* be woken and it is the caller's
    /// responsibility not to lose the signal.
    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
}
67
/// A `Handle` that receives notifications when it is writable.
///
/// # Examples
///
/// Typical shape of a write loop inside a poll function (pseudo-code: the
/// syscall itself is elided, so the snippet is not compiled as a doctest):
///
/// ```ignore
/// loop {
///     ready!(self.poll_writable(cx))?;
///     match /* make write syscall */ {
///         Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
///         status => return Poll::Ready(status),
///     }
/// }
/// ```
pub trait WritableHandle {
    /// If the object is ready for writing, returns `Ready` with the writable
    /// state. If the implementor returns `Pending`, it should first ensure that
    /// `need_writable` is called.
    ///
    /// This should be called in a poll function. If the syscall returns
    /// `SHOULD_WAIT`, you must call `need_writable` to schedule wakeup when the
    /// object is writable.
    ///
    /// The returned `WritableState` does not necessarily reflect an observed
    /// `OBJECT_WRITABLE` signal. We optimistically assume the object remains
    /// writable until `need_writable` is called.
    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>>;

    /// Arranges for the current task to be woken when the object receives an
    /// `OBJECT_WRITABLE` or `OBJECT_PEER_CLOSED` signal.
    ///
    /// This can return `Poll::Ready` if the object has already been signaled,
    /// in which case the waker will *not* be woken and it is the caller's
    /// responsibility not to lose the signal.
    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
}
102
/// Packet receiver behind an `RWHandle`: the cached signal bits and the parked
/// read/write wakers, all guarded by one mutex.
struct RWPacketReceiver(Mutex<Inner>);
104
/// Mutable state shared between an `RWHandle` and its packet receiver.
struct Inner {
    /// Signal bits currently assumed to be asserted. `RWHandle::new` seeds this
    /// with readable + writable; `receive_packet` ORs in newly delivered bits and
    /// `RWHandle::need_signal` clears a bit when it arms a wait for it.
    signals: zx::Signals,
    /// Waker stored by `need_signal(Signal::Read)`, taken and woken by
    /// `receive_packet`.
    read_task: Option<Waker>,
    /// Waker stored by `need_signal(Signal::Write)`, taken and woken by
    /// `receive_packet`.
    write_task: Option<Waker>,
}
110
111impl PacketReceiver for RWPacketReceiver {
112    fn receive_packet(&self, packet: zx::Packet) {
113        let new = if let zx::PacketContents::SignalOne(p) = packet.contents() {
114            // Only consider the signals that were part of the trigger. This
115            // ensures that only the packets generated by the correct signal
116            // observer (Read or Write) can raise the corresponding inner signal
117            // bit.
118            //
119            // Without this, we can lose track of how many port observers are
120            // installed.
121            p.observed() & p.trigger()
122        } else {
123            return;
124        };
125
126        // We wake the tasks when the lock isn't held in case the wakers need the same lock.
127        let mut read_task = None;
128        let mut write_task = None;
129        {
130            let mut inner = self.0.lock().unwrap();
131            let old = inner.signals;
132            inner.signals |= new;
133
134            let became_readable = new.contains(OBJECT_READABLE) && !old.contains(OBJECT_READABLE);
135            let became_writable = new.contains(OBJECT_WRITABLE) && !old.contains(OBJECT_WRITABLE);
136            let became_closed =
137                new.contains(OBJECT_PEER_CLOSED) && !old.contains(OBJECT_PEER_CLOSED);
138
139            if became_readable || became_closed {
140                read_task = inner.read_task.take();
141            }
142            if became_writable || became_closed {
143                write_task = inner.write_task.take();
144            }
145        }
146        // *NOTE*: This is the only safe place to wake wakers.  In any other location, there is a
147        // risk that locks are held which might be required when the waker is woken.  It is safe to
148        // wake here because this is called from the executor when no locks are held.
149        if let Some(read_task) = read_task {
150            read_task.wake();
151        }
152        if let Some(write_task) = write_task {
153            write_task.wake();
154        }
155    }
156}
157
/// A `Handle` that receives notifications when it is readable/writable.
pub struct RWHandle<T> {
    /// The wrapped object whose signals are being tracked.
    handle: T,
    /// Registration with the executor; its receiver holds the cached signal
    /// bits and the parked read/write wakers.
    receiver: ReceiverRegistration<RWPacketReceiver>,
}
163
164impl<T> RWHandle<T>
165where
166    T: AsHandleRef,
167{
168    /// Creates a new `RWHandle` object which will receive notifications when
169    /// the underlying handle becomes readable, writable, or closes.
170    ///
171    /// # Panics
172    ///
173    /// If called outside the context of an active async executor.
174    pub fn new(handle: T) -> Self {
175        let ehandle = EHandle::local();
176
177        let initial_signals = OBJECT_READABLE | OBJECT_WRITABLE;
178        let receiver = ehandle.register_receiver(Arc::new(RWPacketReceiver(Mutex::new(Inner {
179            // Optimistically assume that the handle is readable and writable.
180            // Reads and writes will be attempted before queueing a packet.
181            // This makes handles slightly faster to read/write the first time
182            // they're accessed after being created, provided they start off as
183            // readable or writable. In return, there will be an extra wasted
184            // syscall per read/write if the handle is not readable or writable.
185            signals: initial_signals,
186            read_task: None,
187            write_task: None,
188        }))));
189
190        RWHandle { handle, receiver }
191    }
192
193    /// Returns a reference to the underlying handle.
194    pub fn get_ref(&self) -> &T {
195        &self.handle
196    }
197
198    /// Returns a mutable reference to the underlying handle.
199    pub fn get_mut(&mut self) -> &mut T {
200        &mut self.handle
201    }
202
203    /// Consumes `self` and returns the underlying handle.
204    pub fn into_inner(self) -> T {
205        self.handle
206    }
207
208    /// Returns true if the object received the `OBJECT_PEER_CLOSED` signal.
209    pub fn is_closed(&self) -> bool {
210        let signals = self.receiver().0.lock().unwrap().signals;
211        if signals.contains(OBJECT_PEER_CLOSED) {
212            return true;
213        }
214
215        // The signals bitset might not be updated if we haven't gotten around to processing the
216        // packet telling us that yet. To provide an up-to-date response, we query the current
217        // state of the signal.
218        //
219        // Note: we _could_ update the bitset with what we find here, if we're careful to also
220        // update READABLE + WRITEABLE at the same time, and also wakeup the tasks as necessary.
221        // But having `is_closed` wakeup tasks if it discovered a signal change seems too weird, so
222        // we just leave the bitset as-is and let the regular notification mechanism get around to
223        // it when it gets around to it.
224        match self
225            .handle
226            .wait_handle(OBJECT_PEER_CLOSED, zx::MonotonicInstant::INFINITE_PAST)
227            .to_result()
228        {
229            Ok(_) => true,
230            Err(zx::Status::TIMED_OUT) => false,
231            Err(status) => {
232                // None of the other documented error statuses should be possible, either the type
233                // system doesn't allow it or the wait from `RWHandle::new()` would have already
234                // failed.
235                unreachable!("status: {status}")
236            }
237        }
238    }
239
240    /// Returns a future that completes when `is_closed()` is true.
241    pub fn on_closed(&self) -> OnSignalsRef<'_> {
242        OnSignalsRef::new(self.handle.as_handle_ref(), OBJECT_PEER_CLOSED)
243    }
244
245    fn receiver(&self) -> &RWPacketReceiver {
246        self.receiver.receiver()
247    }
248
249    fn need_signal(&self, cx: &mut Context<'_>, signal: Signal) -> Poll<Result<(), zx::Status>> {
250        let mut inner = self.receiver.0.lock().unwrap();
251        let old = inner.signals;
252        if old.contains(zx::Signals::OBJECT_PEER_CLOSED) {
253            // We don't want to return an error here because even though the peer has closed, the
254            // object could still have queued messages that can be read.
255            Poll::Ready(Ok(()))
256        } else {
257            let waker = cx.waker().clone();
258            let signal = match signal {
259                Signal::Read => {
260                    inner.read_task = Some(waker);
261                    zx::Signals::OBJECT_READABLE
262                }
263                Signal::Write => {
264                    inner.write_task = Some(waker);
265                    zx::Signals::OBJECT_WRITABLE
266                }
267            };
268            if old.contains(signal) {
269                inner.signals &= !signal;
270                std::mem::drop(inner);
271                self.handle.wait_async_handle(
272                    self.receiver.port(),
273                    self.receiver.key(),
274                    signal | zx::Signals::OBJECT_PEER_CLOSED,
275                    zx::WaitAsyncOpts::empty(),
276                )?;
277            }
278            Poll::Pending
279        }
280    }
281}
282
/// Selects which direction `RWHandle::need_signal` should wait for.
enum Signal {
    /// Wait for `OBJECT_READABLE`; the waker is stored in `Inner::read_task`.
    Read,
    /// Wait for `OBJECT_WRITABLE`; the waker is stored in `Inner::write_task`.
    Write,
}
287
288impl<T> ReadableHandle for RWHandle<T>
289where
290    T: AsHandleRef,
291{
292    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
293        loop {
294            let signals = self.receiver().0.lock().unwrap().signals;
295            match (signals.contains(OBJECT_READABLE), signals.contains(OBJECT_PEER_CLOSED)) {
296                (true, false) => return Poll::Ready(Ok(ReadableState::Readable)),
297                (_, true) => return Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed)),
298                (false, false) => {
299                    ready!(self.need_signal(cx, Signal::Read)?)
300                }
301            }
302        }
303    }
304
305    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
306        self.need_signal(cx, Signal::Read)
307    }
308}
309
310impl<T> WritableHandle for RWHandle<T>
311where
312    T: AsHandleRef,
313{
314    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
315        loop {
316            let signals = self.receiver().0.lock().unwrap().signals;
317            match (signals.contains(OBJECT_WRITABLE), signals.contains(OBJECT_PEER_CLOSED)) {
318                (_, true) => return Poll::Ready(Ok(WritableState::Closed)),
319                (true, _) => return Poll::Ready(Ok(WritableState::Writable)),
320                (false, false) => {
321                    ready!(self.need_signal(cx, Signal::Write)?)
322                }
323            }
324        }
325    }
326
327    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
328        self.need_signal(cx, Signal::Write)
329    }
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TestExecutor;

    /// `is_closed()` must report the peer closing right away, even though the
    /// cached signal state (as seen through `poll_readable`) only catches up
    /// once the executor has processed the port packet.
    #[test]
    fn is_closed_immediately_after_close() {
        let mut exec = TestExecutor::new();
        let (tx, rx) = zx::Channel::create();
        let rx_rw_handle = RWHandle::new(rx);
        let mut noop_ctx = Context::from_waker(futures::task::noop_waker_ref());
        // Clear optimistic readable state
        assert!(rx_rw_handle.need_readable(&mut noop_ctx).is_pending());
        // Starting state: the channel is not closed (because we haven't closed it yet)
        assert!(!rx_rw_handle.is_closed());
        // we will never set readable, so this should be Pending until we close
        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);

        drop(tx);

        // Implementation note: the cached state will not be updated yet
        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
        // But is_closed should return true immediately
        assert!(rx_rw_handle.is_closed());
        // Still not updated, and won't be until we let the executor process port packets
        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
        // So we do
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        // And now it is updated, so we observe Closed
        assert_eq!(
            rx_rw_handle.poll_readable(&mut noop_ctx),
            Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed))
        );
        // And is_closed should still be true, of course.
        assert!(rx_rw_handle.is_closed());
    }

    // Regression test for https://fxbug.dev/417333384.
    //
    // Drives a reader and a writer on the same `RWHandle` at once and checks that
    // exactly one wait per direction is left installed on the port afterwards.
    #[test]
    fn simultaneous_read_and_write() {
        let mut exec = TestExecutor::new();
        let (peer, local) = zx::Socket::create_stream();
        let mut buff = [0u8; 1024];
        // Keep writing until a write fails, so the first write attempt made by
        // `write_fut` hits `SHOULD_WAIT`.
        while local.write(&buff[..]).is_ok() {}

        let rw_handle = RWHandle::new(local);
        // Reader: drain whatever is available, then ask to be woken again. It
        // never completes.
        let read_fut = futures::future::poll_fn(|cx| {
            let readable = ready!(rw_handle.poll_readable(cx));
            assert_eq!(readable, Ok(ReadableState::Readable));
            let mut buf = [0u8; 2];
            loop {
                match rw_handle.get_ref().read(&mut buf[..]) {
                    // The peer only ever writes one byte at a time.
                    Ok(r) => assert_eq!(r, 1),
                    Err(e) => {
                        assert_eq!(e, zx::Status::SHOULD_WAIT);
                        break;
                    }
                }
            }
            assert_eq!(rw_handle.need_readable(cx), Poll::Pending);
            Poll::<()>::Pending
        });

        // Writer: write single bytes until the socket pushes back, then ask to be
        // woken again. It never completes.
        let write_fut = futures::future::poll_fn(|cx| {
            let writable = ready!(rw_handle.poll_writable(cx));
            assert_eq!(writable, Ok(WritableState::Writable));
            let buf = [0u8; 1];
            loop {
                match rw_handle.get_ref().write(&buf[..]) {
                    Ok(r) => assert_eq!(r, 1),
                    Err(e) => {
                        assert_eq!(e, zx::Status::SHOULD_WAIT);
                        break;
                    }
                }
            }
            assert_eq!(rw_handle.need_writable(cx), Poll::Pending);
            Poll::<()>::Pending
        });

        let mut fut = std::pin::pin!(futures::future::join(write_fut, read_fut));
        // Each round the peer reads one byte and writes one byte, making the
        // local end writable and readable again at the same time.
        for _ in 0..5 {
            assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
            assert_eq!(peer.read(&mut buff[0..1]), Ok(1));
            assert_eq!(peer.write(&buff[0..1]), Ok(1));
        }

        // Drain the port without blocking and count the packets addressed to this
        // handle's receiver, split by the signal that triggered them.
        let mut read_waits = 0;
        let mut write_waits = 0;
        while let Ok(p) = exec.port().wait(zx::MonotonicInstant::INFINITE_PAST) {
            if p.key() != rw_handle.receiver.key() {
                continue;
            }
            let p = match p.contents() {
                zx::PacketContents::SignalOne(p) => p,
                e => panic!("unexpected packet {e:?}"),
            };
            if p.trigger().contains(zx::Signals::OBJECT_READABLE) {
                read_waits += 1;
            }
            if p.trigger().contains(zx::Signals::OBJECT_WRITABLE) {
                write_waits += 1;
            }
        }
        // We should not have installed more than 1 waiter for each side of the
        // operation.
        assert_eq!((read_waits, write_waits), (1, 1));
    }
}