fuchsia_async/handle/zircon/
socket.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::on_signals::OnSignalsRef;
6use super::rwhandle::{
7    RWHandle, RWHandleSpec, ReadableHandle, ReadableState, WritableHandle, WritableState,
8};
9use futures::future::poll_fn;
10use futures::io::{self, AsyncRead, AsyncWrite};
11use futures::ready;
12use futures::stream::Stream;
13use futures::task::Context;
14use std::fmt;
15use std::pin::Pin;
16use std::task::Poll;
17use zx::{self as zx, AsHandleRef};
18
19/// An I/O object representing a `Socket`.
20pub struct Socket(RWHandle<zx::Socket, SocketRWHandleSpec>);
21
22impl AsRef<zx::Socket> for Socket {
23    fn as_ref(&self) -> &zx::Socket {
24        self.0.get_ref()
25    }
26}
27
28impl AsHandleRef for Socket {
29    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
30        self.0.get_ref().as_handle_ref()
31    }
32}
33
34impl Socket {
35    /// Create a new `Socket` from a previously-created `zx::Socket`.
36    ///
37    /// # Panics
38    ///
39    /// If called outside the context of an active async executor.
40    pub fn from_socket(socket: zx::Socket) -> Self {
41        Socket(RWHandle::new_with_spec(socket))
42    }
43
44    /// Consumes `self` and returns the underlying `zx::Socket`.
45    pub fn into_zx_socket(self) -> zx::Socket {
46        self.0.into_inner()
47    }
48
49    /// Returns true if the socket received the `OBJECT_PEER_CLOSED` signal.
50    pub fn is_closed(&self) -> bool {
51        self.0.is_closed()
52    }
53
54    /// Returns a future that completes when the socket received the `OBJECT_PEER_CLOSED` signal.
55    pub fn on_closed(&self) -> OnSignalsRef<'_> {
56        self.0.on_closed()
57    }
58
59    /// Attempt to read from the socket, registering for wakeup if the socket doesn't have any
60    /// contents available. Used internally in the `AsyncRead` implementation, exposed for users
61    /// who know the concrete type they're using and don't want to pin the socket.
62    ///
63    /// Note: this function will never return `PEER_CLOSED` as an error. Instead, it will return
64    /// `Ok(0)` when the peer closes, to match the contract of `std::io::Read`.
65    pub fn poll_read_ref(
66        &self,
67        cx: &mut Context<'_>,
68        buf: &mut [u8],
69    ) -> Poll<Result<usize, zx::Status>> {
70        ready!(self.poll_readable(cx))?;
71        loop {
72            let res = self.0.get_ref().read(buf);
73            match res {
74                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
75                Err(zx::Status::BAD_STATE) => {
76                    // BAD_STATE indicates our peer is closed for writes.
77                    return Poll::Ready(Ok(0));
78                }
79                Err(zx::Status::PEER_CLOSED) => return Poll::Ready(Ok(0)),
80                _ => return Poll::Ready(res),
81            }
82        }
83    }
84
85    /// Attempt to write into the socket, registering for wakeup if the socket is not ready. Used
86    /// internally in the `AsyncWrite` implementation, exposed for users who know the concrete type
87    /// they're using and don't want to pin the socket.
88    pub fn poll_write_ref(
89        &self,
90        cx: &mut Context<'_>,
91        buf: &[u8],
92    ) -> Poll<Result<usize, zx::Status>> {
93        ready!(self.poll_writable(cx))?;
94        loop {
95            let res = self.0.get_ref().write(buf);
96            match res {
97                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
98                Err(zx::Status::BAD_STATE) => {
99                    // BAD_STATE indicates we're closed for writes.
100                    return Poll::Ready(Err(zx::Status::BAD_STATE));
101                }
102                _ => return Poll::Ready(res),
103            }
104        }
105    }
106
107    /// Polls for the next data on the socket, appending it to the end of |out| if it has arrived.
108    /// Not very useful for a non-datagram socket as it will return all available data
109    /// on the socket.
110    pub fn poll_datagram(
111        &self,
112        cx: &mut Context<'_>,
113        out: &mut Vec<u8>,
114    ) -> Poll<Result<usize, zx::Status>> {
115        ready!(self.poll_readable(cx))?;
116        let avail = self.0.get_ref().outstanding_read_bytes()?;
117        let len = out.len();
118        out.resize(len + avail, 0);
119        let (_, tail) = out.split_at_mut(len);
120        loop {
121            match self.0.get_ref().read(tail) {
122                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
123                Err(e) => return Poll::Ready(Err(e)),
124                Ok(bytes) => {
125                    return if bytes == avail {
126                        Poll::Ready(Ok(bytes))
127                    } else {
128                        Poll::Ready(Err(zx::Status::IO_DATA_LOSS))
129                    }
130                }
131            }
132        }
133    }
134
135    /// Reads the next datagram that becomes available onto the end of |out|.  Note: Using this
136    /// multiple times concurrently is an error and the first one will never complete.
137    pub async fn read_datagram<'a>(&'a self, out: &'a mut Vec<u8>) -> Result<usize, zx::Status> {
138        poll_fn(move |cx| self.poll_datagram(cx, out)).await
139    }
140
141    /// Use this socket as a stream of `Result<Vec<u8>, zx::Status>` datagrams.
142    ///
143    /// Note: multiple concurrent streams from the same socket are not supported.
144    pub fn as_datagram_stream(&self) -> DatagramStream<&Self> {
145        DatagramStream(self)
146    }
147
148    /// Convert this socket into a stream of `Result<Vec<u8>, zx::Status>` datagrams.
149    pub fn into_datagram_stream(self) -> DatagramStream<Self> {
150        DatagramStream(self)
151    }
152}
153
154impl ReadableHandle for Socket {
155    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
156        self.0.poll_readable(cx)
157    }
158
159    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
160        self.0.need_readable(cx)
161    }
162}
163
164impl WritableHandle for Socket {
165    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
166        self.0.poll_writable(cx)
167    }
168
169    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
170        self.0.need_writable(cx)
171    }
172}
173
174impl fmt::Debug for Socket {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        self.0.get_ref().fmt(f)
177    }
178}
179
180impl AsyncRead for Socket {
181    /// Note: this function will never return `PEER_CLOSED` as an error. Instead, it will return
182    /// `Ok(0)` when the peer closes, to match the contract of `std::io::Read`.
183    fn poll_read(
184        self: Pin<&mut Self>,
185        cx: &mut Context<'_>,
186        buf: &mut [u8],
187    ) -> Poll<io::Result<usize>> {
188        self.poll_read_ref(cx, buf).map_err(Into::into)
189    }
190}
191
192impl AsyncWrite for Socket {
193    fn poll_write(
194        self: Pin<&mut Self>,
195        cx: &mut Context<'_>,
196        buf: &[u8],
197    ) -> Poll<io::Result<usize>> {
198        self.poll_write_ref(cx, buf).map_err(Into::into)
199    }
200
201    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
202        Poll::Ready(Ok(()))
203    }
204
205    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
206        Poll::Ready(Ok(()))
207    }
208}
209
210impl AsyncRead for &Socket {
211    /// Note: this function will never return `PEER_CLOSED` as an error. Instead, it will return
212    /// `Ok(0)` when the peer closes, to match the contract of `std::io::Read`.
213    fn poll_read(
214        self: Pin<&mut Self>,
215        cx: &mut Context<'_>,
216        buf: &mut [u8],
217    ) -> Poll<io::Result<usize>> {
218        self.poll_read_ref(cx, buf).map_err(Into::into)
219    }
220}
221
222impl AsyncWrite for &Socket {
223    fn poll_write(
224        self: Pin<&mut Self>,
225        cx: &mut Context<'_>,
226        buf: &[u8],
227    ) -> Poll<io::Result<usize>> {
228        self.poll_write_ref(cx, buf).map_err(Into::into)
229    }
230
231    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
232        Poll::Ready(Ok(()))
233    }
234
235    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
236        Poll::Ready(Ok(()))
237    }
238}
239
240/// A datagram stream from a `Socket`.
241#[derive(Debug)]
242pub struct DatagramStream<S>(pub S);
243
244fn poll_datagram_as_stream(
245    socket: &Socket,
246    cx: &mut Context<'_>,
247) -> Poll<Option<Result<Vec<u8>, zx::Status>>> {
248    let mut res = Vec::<u8>::new();
249    Poll::Ready(match ready!(socket.poll_datagram(cx, &mut res)) {
250        Ok(_size) => Some(Ok(res)),
251        Err(zx::Status::PEER_CLOSED) => None,
252        Err(e) => Some(Err(e)),
253    })
254}
255
256impl Stream for DatagramStream<Socket> {
257    type Item = Result<Vec<u8>, zx::Status>;
258
259    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
260        poll_datagram_as_stream(&self.0, cx)
261    }
262}
263
264impl Stream for DatagramStream<&Socket> {
265    type Item = Result<Vec<u8>, zx::Status>;
266
267    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
268        poll_datagram_as_stream(self.0, cx)
269    }
270}
271
272struct SocketRWHandleSpec;
273impl RWHandleSpec for SocketRWHandleSpec {
274    const READABLE_SIGNALS: zx::Signals =
275        zx::Signals::SOCKET_READABLE.union(zx::Signals::SOCKET_PEER_WRITE_DISABLED);
276    const WRITABLE_SIGNALS: zx::Signals =
277        zx::Signals::SOCKET_WRITABLE.union(zx::Signals::SOCKET_WRITE_DISABLED);
278}
279
280#[cfg(test)]
281mod tests {
282    use super::*;
283    use crate::{MonotonicInstant, TestExecutor, TimeoutExt, Timer};
284
285    use futures::future::{self, join};
286    use futures::io::{AsyncReadExt as _, AsyncWriteExt as _};
287    use futures::stream::TryStreamExt;
288    use futures::task::noop_waker_ref;
289    use futures::FutureExt;
290    use std::pin::pin;
291    use zx::SocketWriteDisposition;
292
293    #[test]
294    fn can_read_write() {
295        let mut exec = TestExecutor::new();
296        let bytes = &[0, 1, 2, 3];
297
298        let (tx, rx) = zx::Socket::create_stream();
299        let (mut tx, mut rx) = (Socket::from_socket(tx), Socket::from_socket(rx));
300
301        let receive_future = async {
302            let mut buf = vec![];
303            rx.read_to_end(&mut buf).await.expect("reading socket");
304            assert_eq!(&*buf, bytes);
305        };
306
307        // add a timeout to receiver so if test is broken it doesn't take forever
308        // Note: if debugging a hang, you may want to lower the timeout to `300.millis()` to get
309        // faster feedback. This is set to 10s rather than something shorter to avoid triggering
310        // flakes if things happen to be slow.
311        let receiver = receive_future
312            .on_timeout(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10)), || {
313                panic!("timeout")
314            });
315
316        // Sends a message after the timeout has passed
317        let sender = async move {
318            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_millis(100))).await;
319            tx.write_all(bytes).await.expect("writing into socket");
320            // close socket to signal no more bytes will be written
321            drop(tx);
322        };
323
324        let done = join(receiver, sender);
325        exec.run_singlethreaded(done);
326    }
327
328    #[test]
329    fn can_read_datagram() {
330        let mut exec = TestExecutor::new();
331
332        let (one, two) = (&[0, 1], &[2, 3, 4, 5]);
333
334        let (tx, rx) = zx::Socket::create_datagram();
335        let rx = Socket::from_socket(rx);
336
337        let mut out = vec![50];
338
339        assert!(tx.write(one).is_ok());
340        assert!(tx.write(two).is_ok());
341
342        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
343
344        assert!(size.is_ok());
345        assert_eq!(one.len(), size.unwrap());
346
347        assert_eq!([50, 0, 1], out.as_slice());
348
349        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
350
351        assert!(size.is_ok());
352        assert_eq!(two.len(), size.unwrap());
353
354        assert_eq!([50, 0, 1, 2, 3, 4, 5], out.as_slice());
355    }
356
357    #[test]
358    fn stream_datagram() {
359        let mut exec = TestExecutor::new();
360
361        let (tx, rx) = zx::Socket::create_datagram();
362        let mut rx = Socket::from_socket(rx).into_datagram_stream();
363
364        let packets = 20;
365
366        for size in 1..packets + 1 {
367            let mut vec = Vec::<u8>::new();
368            vec.resize(size, size as u8);
369            assert!(tx.write(&vec).is_ok());
370        }
371
372        // Close the socket.
373        drop(tx);
374
375        let stream_read_fut = async move {
376            let mut count = 0;
377            while let Some(packet) = rx.try_next().await.expect("received error from stream") {
378                count += 1;
379                assert_eq!(packet.len(), count);
380                assert!(packet.iter().all(|&x| x == count as u8));
381            }
382            assert_eq!(packets, count);
383        };
384
385        exec.run_singlethreaded(stream_read_fut);
386    }
387
388    #[test]
389    fn peer_closed_signal_raised() {
390        let mut executor = TestExecutor::new();
391
392        let (s1, s2) = zx::Socket::create_stream();
393        let mut async_s2 = Socket::from_socket(s2);
394
395        // The socket won't start watching for peer-closed until we actually try reading from it.
396        let _ = executor.run_until_stalled(&mut pin!(async {
397            let mut buf = [0; 16];
398            let _ = async_s2.read(&mut buf).await;
399        }));
400
401        let on_closed_fut = async_s2.on_closed();
402
403        drop(s1);
404
405        // Now make sure all packets get processed before we poll the socket.
406        let _ = executor.run_until_stalled(&mut future::pending::<()>());
407
408        // Dropping s1 raises a closed signal on s2 when the executor next polls the signal port.
409        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
410
411        if let Poll::Ready(Ok(state)) = executor.run_until_stalled(&mut rx_fut) {
412            assert_eq!(state, ReadableState::MaybeReadableAndClosed);
413        } else {
414            panic!("Expected future to be ready and Ok");
415        }
416        assert!(async_s2.is_closed());
417        assert_eq!(on_closed_fut.now_or_never(), Some(Ok(zx::Signals::CHANNEL_PEER_CLOSED)));
418    }
419
420    #[test]
421    fn need_read_ensures_freshness() {
422        let mut executor = TestExecutor::new();
423
424        let (s1, s2) = zx::Socket::create_stream();
425        let async_s2 = Socket::from_socket(s2);
426
427        // The read signal is optimistically set on socket creation, so even though there is
428        // nothing to read, poll_readable returns Ready.
429        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
430        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
431
432        // Call need_readable to reacquire the read signal. The socket now knows
433        // that the signal is not actually set, so returns Pending.
434        assert!(async_s2.need_readable(&mut Context::from_waker(noop_waker_ref())).is_pending());
435        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
436        assert!(executor.run_until_stalled(&mut rx_fut).is_pending());
437
438        assert_eq!(s1.write(b"hello!").expect("failed to write 6 bytes"), 6);
439
440        // After writing to s1, its peer now has an actual read signal and is Ready.
441        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
442    }
443
444    #[test]
445    fn need_write_ensures_freshness() {
446        let mut executor = TestExecutor::new();
447
448        let (s1, s2) = zx::Socket::create_stream();
449
450        // Completely fill the transmit buffer. This socket is no longer writable.
451        let socket_info = s2.info().expect("failed to get socket info");
452        let bytes = vec![0u8; socket_info.tx_buf_max];
453        assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));
454
455        let async_s2 = Socket::from_socket(s2);
456
457        // The write signal is optimistically set on socket creation, so even though it's not
458        // possible to write, poll_writable returns Ready.
459        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
460        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
461
462        // Call need_writable to reacquire the write signal. The socket now
463        // knows that the signal is not actually set, so returns Pending.
464        assert!(async_s2.need_writable(&mut Context::from_waker(noop_waker_ref())).is_pending());
465        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
466        assert!(executor.run_until_stalled(&mut tx_fut).is_pending());
467
468        let mut buffer = [0u8; 5];
469        assert_eq!(s1.read(&mut buffer).expect("failed to read 5 bytes"), 5);
470
471        // After reading from s1, its peer is now able to write and should have a write signal.
472        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
473    }
474
475    #[test]
476    fn half_closed_for_writes() {
477        let mut executor = TestExecutor::new();
478
479        let (s1, s2) = zx::Socket::create_stream();
480
481        // Completely fill the transmit buffer. This socket is no longer writable.
482        let socket_info = s2.info().expect("failed to get socket info");
483        let bytes = vec![0u8; socket_info.tx_buf_max];
484        assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));
485
486        let async_s2 = Socket::from_socket(s2);
487        let mut tx_fut = poll_fn(|cx| async_s2.poll_write_ref(cx, &bytes[..]));
488        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Pending);
489
490        s1.set_disposition(None, Some(SocketWriteDisposition::Disabled)).expect("set disposition");
491        assert_eq!(
492            executor.run_until_stalled(&mut tx_fut),
493            Poll::Ready(Err::<usize, _>(zx::Status::BAD_STATE))
494        );
495
496        // Drain the socket so we can reopen it.
497        let mut readbuf = vec![0u8; bytes.len()];
498        assert_eq!(s1.read(&mut readbuf[..]), Ok(readbuf.len()));
499        s1.set_disposition(None, Some(SocketWriteDisposition::Enabled)).expect("set disposition");
500
501        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Ready(Ok(bytes.len())));
502    }
503
504    #[test]
505    fn half_closed_for_reads() {
506        let mut executor = TestExecutor::new();
507
508        let (s1, s2) = zx::Socket::create_stream();
509        let async_s2 = Socket::from_socket(s2);
510        let mut bytes = [0u8; 10];
511        let mut tx_fut = poll_fn(|cx| async_s2.poll_read_ref(cx, &mut bytes[..]));
512        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Pending);
513
514        // Write a message and then half close.
515        let msg = b"hello";
516        assert_eq!(s1.write(msg), Ok(msg.len()));
517        s1.set_disposition(Some(SocketWriteDisposition::Disabled), None).expect("set disposition");
518        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Ready(Ok(msg.len())));
519        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Ready(Ok(0)));
520
521        // Reopen.
522        s1.set_disposition(Some(SocketWriteDisposition::Enabled), None).expect("set disposition");
523        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Pending);
524
525        // Close once more, without any bytes this time.
526        s1.set_disposition(Some(SocketWriteDisposition::Disabled), None).expect("set disposition");
527        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Ready(Ok(0)));
528    }
529}