// fuchsia_async/handle/zircon/rwhandle.rs

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
6use crate::OnSignalsRef;
7use std::marker::PhantomData;
8use std::sync::{Arc, Mutex};
9use std::task::{ready, Context, Poll, Waker};
10use zx::{self as zx, AsHandleRef};
11
12const OBJECT_PEER_CLOSED: zx::Signals = zx::Signals::OBJECT_PEER_CLOSED;
13const OBJECT_READABLE: zx::Signals = zx::Signals::OBJECT_READABLE;
14const OBJECT_WRITABLE: zx::Signals = zx::Signals::OBJECT_WRITABLE;
15
/// State of an object when it is ready for reading.
///
/// Returned by [`ReadableHandle::poll_readable`].
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ReadableState {
    /// Received `OBJECT_READABLE`, or optimistically assuming the object is readable.
    Readable,
    /// Received `OBJECT_PEER_CLOSED`.  The object might also be readable.
    MaybeReadableAndClosed,
}
24
/// State of an object when it is ready for writing.
///
/// Returned by [`WritableHandle::poll_writable`].
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum WritableState {
    /// Received `OBJECT_WRITABLE`, or optimistically assuming the object is writable.
    Writable,
    /// Received `OBJECT_PEER_CLOSED`.
    Closed,
}
33
34/// A `Handle` that receives notifications when it is readable.
35///
36/// # Examples
37///
38/// ```
39/// loop {
40///     ready!(self.poll_readable(cx))?;
41///     match /* make read syscall */ {
42///         Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
43///         status => return Poll::Ready(status),
44///     }
45/// }
46/// ```
47pub trait ReadableHandle {
48    /// If the object is ready for reading, returns `Ready` with the readable
49    /// state. If the implementor returns Pending, it should first ensure that
50    /// `need_readable` is called.
51    ///
52    /// This should be called in a poll function. If the syscall returns
53    /// `SHOULD_WAIT`, you must call `need_readable` to schedule wakeup when the
54    /// object is readable.
55    ///
56    /// The returned `ReadableState` does not necessarily reflect an observed
57    /// `OBJECT_READABLE` signal. We optimistically assume the object remains
58    /// readable until `need_readable` is called.
59    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>>;
60
61    /// Arranges for the current task to be woken when the object receives an
62    /// `OBJECT_READABLE` or `OBJECT_PEER_CLOSED` signal.  This can return
63    /// Poll::Ready if the object has already been signaled in which case the
64    /// waker *will* not be woken and it is the caller's responsibility to not
65    /// lose the signal.
66    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
67}
68
69/// A `Handle` that receives notifications when it is writable.
70///
71/// # Examples
72///
73/// ```
74/// loop {
75///     ready!(self.poll_writable(cx))?;
76///     match /* make write syscall */ {
77///         Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
78///         status => Poll::Ready(status),
79///     }
80/// }
81/// ```
82pub trait WritableHandle {
83    /// If the object is ready for writing, returns `Ready` with the writable
84    /// state. If the implementor returns Pending, it should first ensure that
85    /// `need_writable` is called.
86    ///
87    /// This should be called in a poll function. If the syscall returns
88    /// `SHOULD_WAIT`, you must call `need_writable` to schedule wakeup when the
89    /// object is writable.
90    ///
91    /// The returned `WritableState` does not necessarily reflect an observed
92    /// `OBJECT_WRITABLE` signal. We optimistically assume the object remains
93    /// writable until `need_writable` is called.
94    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>>;
95
96    /// Arranges for the current task to be woken when the object receives an
97    /// `OBJECT_WRITABLE` or `OBJECT_PEER_CLOSED` signal. This can return
98    /// Poll::Ready if the object has already been signaled in which case the
99    /// waker *will* not be woken and it is the caller's responsibility to not
100    /// lose the signal.
101    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>>;
102}
103
104struct RWPacketReceiver<S: RWHandleSpec> {
105    inner: Mutex<Inner>,
106    _marker: PhantomData<S>,
107}
108
109struct Inner {
110    signals: zx::Signals,
111    read_task: Option<Waker>,
112    write_task: Option<Waker>,
113}
114
115impl<S: RWHandleSpec> PacketReceiver for RWPacketReceiver<S> {
116    fn receive_packet(&self, packet: zx::Packet) {
117        let new = if let zx::PacketContents::SignalOne(p) = packet.contents() {
118            // Only consider the signals that were part of the trigger. This
119            // ensures that only the packets generated by the correct signal
120            // observer (Read or Write) can raise the corresponding inner signal
121            // bit.
122            //
123            // Without this, we can lose track of how many port observers are
124            // installed.
125            p.observed() & p.trigger()
126        } else {
127            return;
128        };
129
130        // We wake the tasks when the lock isn't held in case the wakers need the same lock.
131        let mut read_task = None;
132        let mut write_task = None;
133        {
134            let mut inner = self.inner.lock().unwrap();
135            let old = inner.signals;
136            inner.signals |= new;
137
138            let became_readable =
139                new.intersection(S::READABLE_SIGNALS) != old.intersection(S::READABLE_SIGNALS);
140            let became_writable =
141                new.intersection(S::WRITABLE_SIGNALS) != old.intersection(S::WRITABLE_SIGNALS);
142            let became_closed =
143                new.contains(OBJECT_PEER_CLOSED) && !old.contains(OBJECT_PEER_CLOSED);
144            if became_readable || became_closed {
145                read_task = inner.read_task.take();
146            }
147            if became_writable || became_closed {
148                write_task = inner.write_task.take();
149            }
150        }
151        // *NOTE*: This is the only safe place to wake wakers.  In any other location, there is a
152        // risk that locks are held which might be required when the waker is woken.  It is safe to
153        // wake here because this is called from the executor when no locks are held.
154        if let Some(read_task) = read_task {
155            read_task.wake();
156        }
157        if let Some(write_task) = write_task {
158            write_task.wake();
159        }
160    }
161}
162
163/// A `Handle` that receives notifications when it is readable/writable.
164pub struct RWHandle<T, S: RWHandleSpec = DefaultRWHandleSpec> {
165    handle: T,
166    receiver: ReceiverRegistration<RWPacketReceiver<S>>,
167}
168
169impl<T> RWHandle<T, DefaultRWHandleSpec>
170where
171    T: AsHandleRef,
172{
173    /// Creates a new `RWHandle` object which will receive notifications when
174    /// the underlying handle becomes readable, writable, or closes.
175    ///
176    /// # Panics
177    ///
178    /// If called outside the context of an active async executor.
179    pub fn new(handle: T) -> Self {
180        Self::new_with_spec(handle)
181    }
182}
183
184impl<T, S> RWHandle<T, S>
185where
186    T: AsHandleRef,
187    S: RWHandleSpec,
188{
189    /// Creates a new `RWHandle` with a non-default spec and an object which
190    /// will receive notifications when the underlying handle becomes readable,
191    /// writable, or closes.
192    ///
193    /// # Panics
194    ///
195    /// If called outside the context of an active async executor.
196    ///
197    /// If `S::READABLE_SIGNALS` or `S::WRITABLE_SIGNALS` contain
198    /// [`zx::Signals::OBJECT_PEER_CLOSED`] or have a non-empty intersection.
199    pub fn new_with_spec(handle: T) -> Self {
200        let ehandle = EHandle::local();
201        let internal_signals = OBJECT_PEER_CLOSED;
202        assert!(
203            !S::READABLE_SIGNALS.contains(internal_signals),
204            "readable signals may not contain ({internal_signals:?})"
205        );
206        assert!(
207            !S::WRITABLE_SIGNALS.contains(internal_signals),
208            "writable signals may not contain ({internal_signals:?})"
209        );
210        assert!(!S::WRITABLE_SIGNALS.intersects(S::READABLE_SIGNALS), "signals may not intersect");
211        let initial_signals = S::WRITABLE_SIGNALS | S::READABLE_SIGNALS;
212        let receiver = ehandle.register_receiver(Arc::new(RWPacketReceiver {
213            inner: Mutex::new(Inner {
214                // Optimistically assume that the handle is readable and writable.
215                // Reads and writes will be attempted before queueing a packet.
216                // This makes handles slightly faster to read/write the first time
217                // they're accessed after being created, provided they start off as
218                // readable or writable. In return, there will be an extra wasted
219                // syscall per read/write if the handle is not readable or writable.
220                signals: initial_signals,
221                read_task: None,
222                write_task: None,
223            }),
224            _marker: PhantomData,
225        }));
226
227        RWHandle { handle, receiver }
228    }
229
230    /// Returns a reference to the underlying handle.
231    pub fn get_ref(&self) -> &T {
232        &self.handle
233    }
234
235    /// Returns a mutable reference to the underlying handle.
236    pub fn get_mut(&mut self) -> &mut T {
237        &mut self.handle
238    }
239
240    /// Consumes `self` and returns the underlying handle.
241    pub fn into_inner(self) -> T {
242        self.handle
243    }
244
245    /// Returns true if the object received the `OBJECT_PEER_CLOSED` signal.
246    pub fn is_closed(&self) -> bool {
247        let signals = self.receiver().inner.lock().unwrap().signals;
248        if signals.contains(OBJECT_PEER_CLOSED) {
249            return true;
250        }
251
252        // The signals bitset might not be updated if we haven't gotten around to processing the
253        // packet telling us that yet. To provide an up-to-date response, we query the current
254        // state of the signal.
255        //
256        // Note: we _could_ update the bitset with what we find here, if we're careful to also
257        // update READABLE + WRITEABLE at the same time, and also wakeup the tasks as necessary.
258        // But having `is_closed` wakeup tasks if it discovered a signal change seems too weird, so
259        // we just leave the bitset as-is and let the regular notification mechanism get around to
260        // it when it gets around to it.
261        match self
262            .handle
263            .wait_handle(OBJECT_PEER_CLOSED, zx::MonotonicInstant::INFINITE_PAST)
264            .to_result()
265        {
266            Ok(_) => true,
267            Err(zx::Status::TIMED_OUT) => false,
268            Err(status) => {
269                // None of the other documented error statuses should be possible, either the type
270                // system doesn't allow it or the wait from `RWHandle::new()` would have already
271                // failed.
272                unreachable!("status: {status}")
273            }
274        }
275    }
276
277    /// Returns a future that completes when `is_closed()` is true.
278    pub fn on_closed(&self) -> OnSignalsRef<'_> {
279        OnSignalsRef::new(self.handle.as_handle_ref(), OBJECT_PEER_CLOSED)
280    }
281
282    fn receiver(&self) -> &RWPacketReceiver<S> {
283        self.receiver.receiver()
284    }
285
286    fn need_signal(&self, cx: &mut Context<'_>, signal: Signal) -> Poll<Result<(), zx::Status>> {
287        let mut inner = self.receiver.inner.lock().unwrap();
288        let old = inner.signals;
289        if old.contains(OBJECT_PEER_CLOSED) {
290            // We don't want to return an error here because even though the peer has closed, the
291            // object could still have queued messages that can be read.
292            Poll::Ready(Ok(()))
293        } else {
294            let waker = cx.waker().clone();
295            let signal = match signal {
296                Signal::Read => {
297                    inner.read_task = Some(waker);
298                    S::READABLE_SIGNALS
299                }
300                Signal::Write => {
301                    inner.write_task = Some(waker);
302                    S::WRITABLE_SIGNALS
303                }
304            };
305            if old.intersects(signal) {
306                inner.signals &= !signal;
307                std::mem::drop(inner);
308                self.handle.wait_async_handle(
309                    self.receiver.port(),
310                    self.receiver.key(),
311                    signal | OBJECT_PEER_CLOSED,
312                    zx::WaitAsyncOpts::empty(),
313                )?;
314            }
315            Poll::Pending
316        }
317    }
318
319    fn poll_signal(
320        &self,
321        cx: &mut Context<'_>,
322        signal: Signal,
323    ) -> Poll<Result<zx::Signals, zx::Status>> {
324        let mask = match signal {
325            Signal::Read => S::READABLE_SIGNALS,
326            Signal::Write => S::WRITABLE_SIGNALS,
327        } | OBJECT_PEER_CLOSED;
328
329        loop {
330            let signals = self.receiver().inner.lock().unwrap().signals;
331            let asserted = signals.intersection(mask);
332            if !asserted.is_empty() {
333                return Poll::Ready(Ok(asserted));
334            }
335            ready!(self.need_signal(cx, signal)?);
336        }
337    }
338}
339
/// Selects which side of an `RWHandle` an internal operation applies to.
#[derive(Copy, Clone)]
enum Signal {
    /// The readable side.
    Read,
    /// The writable side.
    Write,
}
345
346impl<T, S> ReadableHandle for RWHandle<T, S>
347where
348    T: AsHandleRef,
349    S: RWHandleSpec,
350{
351    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
352        let signals = ready!(self.poll_signal(cx, Signal::Read)?);
353        if signals.contains(OBJECT_PEER_CLOSED) {
354            return Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed));
355        }
356        Poll::Ready(Ok(ReadableState::Readable))
357    }
358
359    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
360        self.need_signal(cx, Signal::Read)
361    }
362}
363
364impl<T, S> WritableHandle for RWHandle<T, S>
365where
366    T: AsHandleRef,
367    S: RWHandleSpec,
368{
369    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
370        let signals = ready!(self.poll_signal(cx, Signal::Write)?);
371        if signals.contains(OBJECT_PEER_CLOSED) {
372            return Poll::Ready(Ok(WritableState::Closed));
373        }
374        Poll::Ready(Ok(WritableState::Writable))
375    }
376
377    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
378        self.need_signal(cx, Signal::Write)
379    }
380}
381
382/// A trait specifying the behavior of [`RWHandle`].
383///
384/// The default behavior, listening for [`zx::Signals::OBJECT_READABLE`] and
385/// [`zx::Signals::OBJECT_WRITABLE`] is provided by [`DefaultRWHandleSpec`].
386pub trait RWHandleSpec: Send + Sync + 'static {
387    /// Signals asserted when the handle is readable.
388    ///
389    /// [`RWHandle`] installs wait for these signals and if any of them are
390    /// asserted, the handle is considered readable.
391    const READABLE_SIGNALS: zx::Signals;
392    /// Signals asserted when the handle is writable.
393    ///
394    /// [`RWHandle`] installs wait for these signals and if any of them are
395    /// asserted, the handle is considered writable.
396    const WRITABLE_SIGNALS: zx::Signals;
397}
398
/// The default behavior for [`RWHandle`].
///
/// Considers the handle readable when [`zx::Signals::OBJECT_READABLE`] is set,
/// and writable when [`zx::Signals::OBJECT_WRITABLE`] is set.
pub struct DefaultRWHandleSpec;
404
405impl RWHandleSpec for DefaultRWHandleSpec {
406    const READABLE_SIGNALS: zx::Signals = OBJECT_READABLE;
407    const WRITABLE_SIGNALS: zx::Signals = OBJECT_WRITABLE;
408}
409
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TestExecutor;

    /// `is_closed` must report peer closure right away, even before the
    /// executor has processed the port packet that updates the cached state.
    #[test]
    fn is_closed_immediately_after_close() {
        let mut exec = TestExecutor::new();
        let (tx, rx) = zx::Channel::create();
        let rx_rw_handle = RWHandle::new(rx);
        let mut noop_ctx = Context::from_waker(futures::task::noop_waker_ref());
        // Clear optimistic readable state
        assert!(rx_rw_handle.need_readable(&mut noop_ctx).is_pending());
        // Starting state: the channel is not closed (because we haven't closed it yet)
        assert!(!rx_rw_handle.is_closed());
        // we will never set readable, so this should be Pending until we close
        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);

        drop(tx);

        // Implementation note: the cached state will not be updated yet
        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
        // But is_closed should return true immediately
        assert!(rx_rw_handle.is_closed());
        // Still not updated, and won't be until we let the executor process port packets
        assert_eq!(rx_rw_handle.poll_readable(&mut noop_ctx), Poll::Pending);
        // So we do
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        // And now it is updated, so we observe Closed
        assert_eq!(
            rx_rw_handle.poll_readable(&mut noop_ctx),
            Poll::Ready(Ok(ReadableState::MaybeReadableAndClosed))
        );
        // And is_closed should still be true, of course.
        assert!(rx_rw_handle.is_closed());
    }

    // Regression test for https://fxbug.dev/417333384.
    //
    // Drives a reader and a writer on the same handle at once and then checks
    // that exactly one port wait is left pending for each side.
    #[test]
    fn simultaneous_read_and_write() {
        let mut exec = TestExecutor::new();
        let (peer, local) = zx::Socket::create_stream();
        let mut buff = [0u8; 1024];
        // Fill the socket so that `local` starts out not writable.
        while local.write(&buff[..]).is_ok() {}

        let rw_handle = RWHandle::new(local);
        let read_fut = futures::future::poll_fn(|cx| {
            let readable = ready!(rw_handle.poll_readable(cx));
            assert_eq!(readable, Ok(ReadableState::Readable));
            let mut buf = [0u8; 2];
            // Drain everything that is currently readable.
            loop {
                match rw_handle.get_ref().read(&mut buf[..]) {
                    Ok(r) => assert_eq!(r, 1),
                    Err(e) => {
                        assert_eq!(e, zx::Status::SHOULD_WAIT);
                        break;
                    }
                }
            }
            assert_eq!(rw_handle.need_readable(cx), Poll::Pending);
            Poll::<()>::Pending
        });

        let write_fut = futures::future::poll_fn(|cx| {
            let writable = ready!(rw_handle.poll_writable(cx));
            assert_eq!(writable, Ok(WritableState::Writable));
            let buf = [0u8; 1];
            // Write until the socket is full again.
            loop {
                match rw_handle.get_ref().write(&buf[..]) {
                    Ok(r) => assert_eq!(r, 1),
                    Err(e) => {
                        assert_eq!(e, zx::Status::SHOULD_WAIT);
                        break;
                    }
                }
            }
            assert_eq!(rw_handle.need_writable(cx), Poll::Pending);
            Poll::<()>::Pending
        });

        let mut fut = std::pin::pin!(futures::future::join(write_fut, read_fut));
        for _ in 0..5 {
            assert_eq!(exec.run_until_stalled(&mut fut), Poll::Pending);
            // Make one byte of room for the writer and one byte of data for the reader.
            assert_eq!(peer.read(&mut buff[0..1]), Ok(1));
            assert_eq!(peer.write(&buff[0..1]), Ok(1));
        }

        // Drain the port directly and count the packets addressed to this handle.
        let mut read_waits = 0;
        let mut write_waits = 0;
        while let Ok(p) = exec.port().wait(zx::MonotonicInstant::INFINITE_PAST) {
            if p.key() != rw_handle.receiver.key() {
                continue;
            }
            let p = match p.contents() {
                zx::PacketContents::SignalOne(p) => p,
                e => panic!("unexpected packet {e:?}"),
            };
            if p.trigger().contains(zx::Signals::OBJECT_READABLE) {
                read_waits += 1;
            }
            if p.trigger().contains(zx::Signals::OBJECT_WRITABLE) {
                write_waits += 1;
            }
        }
        // We should not have installed more than 1 waiter for each side of the
        // operation.
        assert_eq!((read_waits, write_waits), (1, 1));
    }
}