fuchsia_async/handle/zircon/
rwhandle.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
6use crate::OnSignalsRef;
7use std::sync::{Arc, Mutex};
8use std::task::{ready, Context, Poll, Waker};
9use zx::{self as zx, AsHandleRef};
10
11const OBJECT_PEER_CLOSED: zx::Signals = zx::Signals::OBJECT_PEER_CLOSED;
12const OBJECT_READABLE: zx::Signals = zx::Signals::OBJECT_READABLE;
13const OBJECT_WRITABLE: zx::Signals = zx::Signals::OBJECT_WRITABLE;
14
15/// State of an object when it is ready for reading.
16#[derive(Debug, PartialEq, Eq, Copy, Clone)]
17pub enum ReadableState {
18    /// Received `OBJECT_READABLE`, or optimistically assuming the object is readable.
19    Readable,
20    /// Received `OBJECT_PEER_CLOSED`.  The object might also be readable.
21    MaybeReadableAndClosed,
22}
23
24/// State of an object when it is ready for writing.
25#[derive(Debug, PartialEq, Eq, Copy, Clone)]
26pub enum WritableState {
27    /// Received `OBJECT_WRITABLE`, or optimistically assuming the object is writable.
28    Writable,
29    /// Received `OBJECT_PEER_CLOSED`.
30    Closed,
31}
32
33/// A `Handle` that receives notifications when it is readable.
34///
35/// # Examples
36///
37/// ```
38/// loop {
39///     ready!(self.poll_readable(cx))?;
40///     match /* make read syscall */ {
41///         Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
42///         status => return Poll::Ready(status),
43///     }
44/// }
45/// ```
46pub trait ReadableHandle {
47    /// If the object is ready for reading, returns `Ready` with the readable
48    /// state. If the implementor returns Pending, it should first ensure that
49    /// `need_readable` is called.
50    ///
51    /// This should be called in a poll function. If the syscall returns
52    /// `SHOULD_WAIT`, you must call `need_readable` to schedule wakeup when the
53    /// object is readable.
54    ///
55    /// The returned `ReadableState` does not necessarily reflect an observed
56    /// `OBJECT_READABLE` signal. We optimistically assume the object remains
57    /// readable until `need_readable` is called.
58    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>>;
59
60    /// Arranges for the current task to be woken when the object receives an
61    /// `OBJECT_READABLE` or `OBJECT_PEER_CLOSED` signal.  This can return
62    /// Poll::Ready if the object has already been signaled in which case the
63    /// waker *will* not be woken and it is the caller's responsibility to not
64    /// lose the signal.
65    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
66}
67
68/// A `Handle` that receives notifications when it is writable.
69///
70/// # Examples
71///
72/// ```
73/// loop {
74///     ready!(self.poll_writable(cx))?;
75///     match /* make write syscall */ {
76///         Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
77///         status => Poll::Ready(status),
78///     }
79/// }
80/// ```
81pub trait WritableHandle {
82    /// If the object is ready for writing, returns `Ready` with the writable
83    /// state. If the implementor returns Pending, it should first ensure that
84    /// `need_writable` is called.
85    ///
86    /// This should be called in a poll function. If the syscall returns
87    /// `SHOULD_WAIT`, you must call `need_writable` to schedule wakeup when the
88    /// object is writable.
89    ///
90    /// The returned `WritableState` does not necessarily reflect an observed
91    /// `OBJECT_WRITABLE` signal. We optimistically assume the object remains
92    /// writable until `need_writable` is called.
93    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>>;
94
95    /// Arranges for the current task to be woken when the object receives an
96    /// `OBJECT_WRITABLE` or `OBJECT_PEER_CLOSED` signal. This can return
97    /// Poll::Ready if the object has already been signaled in which case the
98    /// waker *will* not be woken and it is the caller's responsibility to not
99    /// lose the signal.
100    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
101}
102
103struct RWPacketReceiver(Mutex<Inner>);
104
105struct Inner {
106    signals: zx::Signals,
107    read_task: Option<Waker>,
108    write_task: Option<Waker>,
109}
110
111impl PacketReceiver for RWPacketReceiver {
112    fn receive_packet(&self, packet: zx::Packet) {
113        let new = if let zx::PacketContents::SignalOne(p) = packet.contents() {
114            p.observed()
115        } else {
116            return;
117        };
118
119        // We wake the tasks when the lock isn't held in case the wakers need the same lock.
120        let mut read_task = None;
121        let mut write_task = None;
122        {
123            let mut inner = self.0.lock().unwrap();
124            let old = inner.signals;
125            inner.signals |= new;
126
127            let became_readable = new.contains(OBJECT_READABLE) && !old.contains(OBJECT_READABLE);
128            let became_writable = new.contains(OBJECT_WRITABLE) && !old.contains(OBJECT_WRITABLE);
129            let became_closed =
130                new.contains(OBJECT_PEER_CLOSED) && !old.contains(OBJECT_PEER_CLOSED);
131
132            if became_readable || became_closed {
133                read_task = inner.read_task.take();
134            }
135            if became_writable || became_closed {
136                write_task = inner.write_task.take();
137            }
138        }
139        // *NOTE*: This is the only safe place to wake wakers.  In any other location, there is a
140        // risk that locks are held which might be required when the waker is woken.  It is safe to
141        // wake here because this is called from the executor when no locks are held.
142        if let Some(read_task) = read_task {
143            read_task.wake();
144        }
145        if let Some(write_task) = write_task {
146            write_task.wake();
147        }
148    }
149}
150
151/// A `Handle` that receives notifications when it is readable/writable.
152pub struct RWHandle<T> {
153    handle: T,
154    receiver: ReceiverRegistration<RWPacketReceiver>,
155}
156
157impl<T> RWHandle<T>
158where
159    T: AsHandleRef,
160{
161    /// Creates a new `RWHandle` object which will receive notifications when
162    /// the underlying handle becomes readable, writable, or closes.
163    ///
164    /// # Panics
165    ///
166    /// If called outside the context of an active async executor.
167    pub fn new(handle: T) -> Self {
168        let ehandle = EHandle::local();
169
170        let initial_signals = OBJECT_READABLE | OBJECT_WRITABLE;
171        let receiver = ehandle.register_receiver(Arc::new(RWPacketReceiver(Mutex::new(Inner {
172            // Optimistically assume that the handle is readable and writable.
173            // Reads and writes will be attempted before queueing a packet.
174            // This makes handles slightly faster to read/write the first time
175            // they're accessed after being created, provided they start off as
176            // readable or writable. In return, there will be an extra wasted
177            // syscall per read/write if the handle is not readable or writable.
178            signals: initial_signals,
179            read_task: None,
180            write_task: None,
181        }))));
182
183        RWHandle { handle, receiver }
184    }
185
186    /// Returns a reference to the underlying handle.
187    pub fn get_ref(&self) -> &T {
188        &self.handle
189    }
190
191    /// Returns a mutable reference to the underlying handle.
192    pub fn get_mut(&mut self) -> &mut T {
193        &mut self.handle
194    }
195
196    /// Consumes `self` and returns the underlying handle.
197    pub fn into_inner(self) -> T {
198        self.handle
199    }
200
201    /// Returns true if the object received the `OBJECT_PEER_CLOSED` signal.
202    pub fn is_closed(&self) -> bool {
203        let signals = self.receiver().0.lock().unwrap().signals;
204        if signals.contains(OBJECT_PEER_CLOSED) {
205            return true;
206        }
207
208        // The signals bitset might not be updated if we haven't gotten around to processing the
209        // packet telling us that yet. To provide an up-to-date response, we query the current
210        // state of the signal.
211        //
212        // Note: we _could_ update the bitset with what we find here, if we're careful to also
213        // update READABLE + WRITEABLE at the same time, and also wakeup the tasks as necessary.
214        // But having `is_closed` wakeup tasks if it discovered a signal change seems too weird, so
215        // we just leave the bitset as-is and let the regular notification mechanism get around to
216        // it when it gets around to it.
217        match self
218            .handle
219            .wait_handle(OBJECT_PEER_CLOSED, zx::MonotonicInstant::INFINITE_PAST)
220            .to_result()
221        {
222            Ok(_) => true,
223            Err(zx::Status::TIMED_OUT) => false,
224            Err(status) => {
225                // None of the other documented error statuses should be possible, either the type
226                // system doesn't allow it or the wait from `RWHandle::new()` would have already
227                // failed.
228                unreachable!("status: {status}")
229            }
230        }
231    }
232
233    /// Returns a future that completes when `is_closed()` is true.
234    pub fn on_closed(&self) -> OnSignalsRef<'_> {
235        OnSignalsRef::new(self.handle.as_handle_ref(), OBJECT_PEER_CLOSED)
236    }
237
238    fn receiver(&self) -> &RWPacketReceiver {
239        self.receiver.receiver()
240    }
241
242    fn need_signal(
243        &self,
244        cx: &mut Context<'_>,
245        for_read: bool,
246        signal: zx::Signals,
247    ) -> Poll<Result<(), zx::Status>> {
248        let mut inner = self.receiver.0.lock().unwrap();
249        let old = inner.signals;
250        if old.contains(zx::Signals::OBJECT_PEER_CLOSED) {
251            // We don't want to return an error here because even though the peer has closed, the
252            // object could still have queued messages that can be read.
253            Poll::Ready(Ok(()))
254        } else {
255            let waker = cx.waker().clone();
256            if for_read {
257                inner.read_task = Some(waker);
258            } else {
259                inner.write_task = Some(waker);
260            }
261            if old.contains(signal) {
262                inner.signals &= !signal;
263                std::mem::drop(inner);
264                self.handle.wait_async_handle(
265                    self.receiver.port(),
266                    self.receiver.key(),
267                    signal | zx::Signals::OBJECT_PEER_CLOSED,
268                    zx::WaitAsyncOpts::empty(),
269                )?;
270            }
271            Poll::Pending
272        }
273    }
274}
275
276impl<T> ReadableHandle for RWHandle<T>
277where
278    T: AsHandleRef,
279{
280    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
281        loop {
282            let signals = self.receiver().0.lock().unwrap().signals;
283            match (signals.contains(OBJECT_READABLE), signals.contains(OBJECT_PEER_CLOSED)) {
284                (true, false) => return Poll::Ready(Ok(ReadableState::Readable)),
285                (_, true) => return Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed)),
286                (false, false) => {
287                    ready!(self.need_signal(cx, true, OBJECT_READABLE)?)
288                }
289            }
290        }
291    }
292
293    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
294        self.need_signal(cx, true, OBJECT_READABLE)
295    }
296}
297
298impl<T> WritableHandle for RWHandle<T>
299where
300    T: AsHandleRef,
301{
302    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
303        loop {
304            let signals = self.receiver().0.lock().unwrap().signals;
305            match (signals.contains(OBJECT_WRITABLE), signals.contains(OBJECT_PEER_CLOSED)) {
306                (_, true) => return Poll::Ready(Ok(WritableState::Closed)),
307                (true, _) => return Poll::Ready(Ok(WritableState::Writable)),
308                (false, false) => {
309                    ready!(self.need_signal(cx, false, OBJECT_WRITABLE)?)
310                }
311            }
312        }
313    }
314
315    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
316        self.need_signal(cx, false, OBJECT_WRITABLE)
317    }
318}
319
320#[cfg(test)]
321mod tests {
322    use super::*;
323    use crate::TestExecutor;
324
325    #[test]
326    fn is_closed_immediately_after_close() {
327        let mut exec = TestExecutor::new();
328        let (tx, rx) = zx::Channel::create();
329        let rx_rw_handle = RWHandle::new(rx);
330        let mut noop_ctx = Context::from_waker(futures::task::noop_waker_ref());
331        // Clear optimistic readable state
332        assert!(rx_rw_handle.need_readable(&mut noop_ctx).is_pending());
333        // Starting state: the channel is not closed (because we haven't closed it yet)
334        assert_eq!(rx_rw_handle.is_closed(), false);
335        // we will never set readable, so this should be Pending until we close
336        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
337
338        drop(tx);
339
340        // Implementation note: the cached state will not be updated yet
341        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
342        // But is_closed should return true immediately
343        assert_eq!(rx_rw_handle.is_closed(), true);
344        // Still not updated, and won't be until we let the executor process port packets
345        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
346        // So we do
347        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
348        // And now it is updated, so we observe Closed
349        assert_eq!(
350            rx_rw_handle.poll_readable(&mut noop_ctx),
351            Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed))
352        );
353        // And is_closed should still be true, of course.
354        assert_eq!(rx_rw_handle.is_closed(), true);
355    }
356}