fuchsia_async/handle/zircon/
socket.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::on_signals::OnSignalsRef;
6use super::rwhandle::{RWHandle, ReadableHandle, ReadableState, WritableHandle, WritableState};
7use futures::future::poll_fn;
8use futures::io::{self, AsyncRead, AsyncWrite};
9use futures::ready;
10use futures::stream::Stream;
11use futures::task::Context;
12use std::fmt;
13use std::pin::Pin;
14use std::task::Poll;
15use zx::{self as zx, AsHandleRef};
16
/// An I/O object representing a `Socket`.
///
/// This is a newtype around an `RWHandle<zx::Socket>`. The readiness polling used by the
/// read and write methods in this file is delegated to that inner handle.
pub struct Socket(RWHandle<zx::Socket>);
19
20impl AsRef<zx::Socket> for Socket {
21    fn as_ref(&self) -> &zx::Socket {
22        &self.0.get_ref()
23    }
24}
25
26impl AsHandleRef for Socket {
27    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
28        self.0.get_ref().as_handle_ref()
29    }
30}
31
32impl Socket {
33    /// Create a new `Socket` from a previously-created `zx::Socket`.
34    ///
35    /// # Panics
36    ///
37    /// If called outside the context of an active async executor.
38    pub fn from_socket(socket: zx::Socket) -> Self {
39        Socket(RWHandle::new(socket))
40    }
41
42    /// Consumes `self` and returns the underlying `zx::Socket`.
43    pub fn into_zx_socket(self) -> zx::Socket {
44        self.0.into_inner()
45    }
46
47    /// Returns true if the socket received the `OBJECT_PEER_CLOSED` signal.
48    pub fn is_closed(&self) -> bool {
49        self.0.is_closed()
50    }
51
52    /// Returns a future that completes when the socket received the `OBJECT_PEER_CLOSED` signal.
53    pub fn on_closed(&self) -> OnSignalsRef<'_> {
54        self.0.on_closed()
55    }
56
57    /// Attempt to read from the socket, registering for wakeup if the socket doesn't have any
58    /// contents available. Used internally in the `AsyncRead` implementation, exposed for users
59    /// who know the concrete type they're using and don't want to pin the socket.
60    ///
61    /// Note: this function will never return `PEER_CLOSED` as an error. Instead, it will return
62    /// `Ok(0)` when the peer closes, to match the contract of `std::io::Read`.
63    pub fn poll_read_ref(
64        &self,
65        cx: &mut Context<'_>,
66        buf: &mut [u8],
67    ) -> Poll<Result<usize, zx::Status>> {
68        ready!(self.poll_readable(cx))?;
69        loop {
70            let res = self.0.get_ref().read(buf);
71            match res {
72                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
73                Err(zx::Status::PEER_CLOSED) => return Poll::Ready(Ok(0)),
74                _ => return Poll::Ready(res),
75            }
76        }
77    }
78
79    /// Attempt to write into the socket, registering for wakeup if the socket is not ready. Used
80    /// internally in the `AsyncWrite` implementation, exposed for users who know the concrete type
81    /// they're using and don't want to pin the socket.
82    pub fn poll_write_ref(
83        &self,
84        cx: &mut Context<'_>,
85        buf: &[u8],
86    ) -> Poll<Result<usize, zx::Status>> {
87        ready!(self.poll_writable(cx))?;
88        loop {
89            let res = self.0.get_ref().write(buf);
90            match res {
91                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
92                _ => return Poll::Ready(res),
93            }
94        }
95    }
96
97    /// Polls for the next data on the socket, appending it to the end of |out| if it has arrived.
98    /// Not very useful for a non-datagram socket as it will return all available data
99    /// on the socket.
100    pub fn poll_datagram(
101        &self,
102        cx: &mut Context<'_>,
103        out: &mut Vec<u8>,
104    ) -> Poll<Result<usize, zx::Status>> {
105        ready!(self.poll_readable(cx))?;
106        let avail = self.0.get_ref().outstanding_read_bytes()?;
107        let len = out.len();
108        out.resize(len + avail, 0);
109        let (_, mut tail) = out.split_at_mut(len);
110        loop {
111            match self.0.get_ref().read(&mut tail) {
112                Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
113                Err(e) => return Poll::Ready(Err(e)),
114                Ok(bytes) => {
115                    return if bytes == avail {
116                        Poll::Ready(Ok(bytes))
117                    } else {
118                        Poll::Ready(Err(zx::Status::BAD_STATE))
119                    }
120                }
121            }
122        }
123    }
124
125    /// Reads the next datagram that becomes available onto the end of |out|.  Note: Using this
126    /// multiple times concurrently is an error and the first one will never complete.
127    pub async fn read_datagram<'a>(&'a self, out: &'a mut Vec<u8>) -> Result<usize, zx::Status> {
128        poll_fn(move |cx| self.poll_datagram(cx, out)).await
129    }
130
131    /// Use this socket as a stream of `Result<Vec<u8>, zx::Status>` datagrams.
132    ///
133    /// Note: multiple concurrent streams from the same socket are not supported.
134    pub fn as_datagram_stream<'a>(&'a self) -> DatagramStream<&'a Self> {
135        DatagramStream(self)
136    }
137
138    /// Convert this socket into a stream of `Result<Vec<u8>, zx::Status>` datagrams.
139    pub fn into_datagram_stream(self) -> DatagramStream<Self> {
140        DatagramStream(self)
141    }
142}
143
144impl ReadableHandle for Socket {
145    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
146        self.0.poll_readable(cx)
147    }
148
149    fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
150        self.0.need_readable(cx)
151    }
152}
153
154impl WritableHandle for Socket {
155    fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
156        self.0.poll_writable(cx)
157    }
158
159    fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
160        self.0.need_writable(cx)
161    }
162}
163
164impl fmt::Debug for Socket {
165    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166        self.0.get_ref().fmt(f)
167    }
168}
169
170impl AsyncRead for Socket {
171    /// Note: this function will never return `PEER_CLOSED` as an error. Instead, it will return
172    /// `Ok(0)` when the peer closes, to match the contract of `std::io::Read`.
173    fn poll_read(
174        self: Pin<&mut Self>,
175        cx: &mut Context<'_>,
176        buf: &mut [u8],
177    ) -> Poll<io::Result<usize>> {
178        self.poll_read_ref(cx, buf).map_err(Into::into)
179    }
180}
181
182impl AsyncWrite for Socket {
183    fn poll_write(
184        self: Pin<&mut Self>,
185        cx: &mut Context<'_>,
186        buf: &[u8],
187    ) -> Poll<io::Result<usize>> {
188        self.poll_write_ref(cx, buf).map_err(Into::into)
189    }
190
191    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
192        Poll::Ready(Ok(()))
193    }
194
195    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
196        Poll::Ready(Ok(()))
197    }
198}
199
200impl<'a> AsyncRead for &'a Socket {
201    /// Note: this function will never return `PEER_CLOSED` as an error. Instead, it will return
202    /// `Ok(0)` when the peer closes, to match the contract of `std::io::Read`.
203    fn poll_read(
204        self: Pin<&mut Self>,
205        cx: &mut Context<'_>,
206        buf: &mut [u8],
207    ) -> Poll<io::Result<usize>> {
208        self.poll_read_ref(cx, buf).map_err(Into::into)
209    }
210}
211
212impl<'a> AsyncWrite for &'a Socket {
213    fn poll_write(
214        self: Pin<&mut Self>,
215        cx: &mut Context<'_>,
216        buf: &[u8],
217    ) -> Poll<io::Result<usize>> {
218        self.poll_write_ref(cx, buf).map_err(Into::into)
219    }
220
221    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
222        Poll::Ready(Ok(()))
223    }
224
225    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
226        Poll::Ready(Ok(()))
227    }
228}
229
/// A datagram stream from a `Socket`.
///
/// `S` is the socket holder. `Stream` is implemented in this file for `DatagramStream<Socket>`
/// (owned) and `DatagramStream<&Socket>` (borrowed); both yield
/// `Result<Vec<u8>, zx::Status>` items.
#[derive(Debug)]
pub struct DatagramStream<S>(pub S);
233
234fn poll_datagram_as_stream(
235    socket: &Socket,
236    cx: &mut Context<'_>,
237) -> Poll<Option<Result<Vec<u8>, zx::Status>>> {
238    let mut res = Vec::<u8>::new();
239    Poll::Ready(match ready!(socket.poll_datagram(cx, &mut res)) {
240        Ok(_size) => Some(Ok(res)),
241        Err(zx::Status::PEER_CLOSED) => None,
242        Err(e) => Some(Err(e)),
243    })
244}
245
246impl Stream for DatagramStream<Socket> {
247    type Item = Result<Vec<u8>, zx::Status>;
248
249    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
250        poll_datagram_as_stream(&self.0, cx)
251    }
252}
253
254impl Stream for DatagramStream<&Socket> {
255    type Item = Result<Vec<u8>, zx::Status>;
256
257    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
258        poll_datagram_as_stream(self.0, cx)
259    }
260}
261
262#[cfg(test)]
263mod tests {
264    use super::*;
265    use crate::{MonotonicInstant, TestExecutor, TimeoutExt, Timer};
266
267    use futures::future::{self, join};
268    use futures::io::{AsyncReadExt as _, AsyncWriteExt as _};
269    use futures::stream::TryStreamExt;
270    use futures::task::noop_waker_ref;
271    use futures::FutureExt;
272    use std::pin::pin;
273
274    #[test]
275    fn can_read_write() {
276        let mut exec = TestExecutor::new();
277        let bytes = &[0, 1, 2, 3];
278
279        let (tx, rx) = zx::Socket::create_stream();
280        let (mut tx, mut rx) = (Socket::from_socket(tx), Socket::from_socket(rx));
281
282        let receive_future = async {
283            let mut buf = vec![];
284            rx.read_to_end(&mut buf).await.expect("reading socket");
285            assert_eq!(&*buf, bytes);
286        };
287
288        // add a timeout to receiver so if test is broken it doesn't take forever
289        // Note: if debugging a hang, you may want to lower the timeout to `300.millis()` to get
290        // faster feedback. This is set to 10s rather than something shorter to avoid triggering
291        // flakes if things happen to be slow.
292        let receiver = receive_future
293            .on_timeout(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10)), || {
294                panic!("timeout")
295            });
296
297        // Sends a message after the timeout has passed
298        let sender = async move {
299            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_millis(100))).await;
300            tx.write_all(bytes).await.expect("writing into socket");
301            // close socket to signal no more bytes will be written
302            drop(tx);
303        };
304
305        let done = join(receiver, sender);
306        exec.run_singlethreaded(done);
307    }
308
309    #[test]
310    fn can_read_datagram() {
311        let mut exec = TestExecutor::new();
312
313        let (one, two) = (&[0, 1], &[2, 3, 4, 5]);
314
315        let (tx, rx) = zx::Socket::create_datagram();
316        let rx = Socket::from_socket(rx);
317
318        let mut out = vec![50];
319
320        assert!(tx.write(one).is_ok());
321        assert!(tx.write(two).is_ok());
322
323        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
324
325        assert!(size.is_ok());
326        assert_eq!(one.len(), size.unwrap());
327
328        assert_eq!([50, 0, 1], out.as_slice());
329
330        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
331
332        assert!(size.is_ok());
333        assert_eq!(two.len(), size.unwrap());
334
335        assert_eq!([50, 0, 1, 2, 3, 4, 5], out.as_slice());
336    }
337
338    #[test]
339    fn stream_datagram() {
340        let mut exec = TestExecutor::new();
341
342        let (tx, rx) = zx::Socket::create_datagram();
343        let mut rx = Socket::from_socket(rx).into_datagram_stream();
344
345        let packets = 20;
346
347        for size in 1..packets + 1 {
348            let mut vec = Vec::<u8>::new();
349            vec.resize(size, size as u8);
350            assert!(tx.write(&vec).is_ok());
351        }
352
353        // Close the socket.
354        drop(tx);
355
356        let stream_read_fut = async move {
357            let mut count = 0;
358            while let Some(packet) = rx.try_next().await.expect("received error from stream") {
359                count = count + 1;
360                assert_eq!(packet.len(), count);
361                assert!(packet.iter().all(|&x| x == count as u8));
362            }
363            assert_eq!(packets, count);
364        };
365
366        exec.run_singlethreaded(stream_read_fut);
367    }
368
369    #[test]
370    fn peer_closed_signal_raised() {
371        let mut executor = TestExecutor::new();
372
373        let (s1, s2) = zx::Socket::create_stream();
374        let mut async_s2 = Socket::from_socket(s2);
375
376        // The socket won't start watching for peer-closed until we actually try reading from it.
377        let _ = executor.run_until_stalled(&mut pin!(async {
378            let mut buf = [0; 16];
379            let _ = async_s2.read(&mut buf).await;
380        }));
381
382        let on_closed_fut = async_s2.on_closed();
383
384        drop(s1);
385
386        // Now make sure all packets get processed before we poll the socket.
387        let _ = executor.run_until_stalled(&mut future::pending::<()>());
388
389        // Dropping s1 raises a closed signal on s2 when the executor next polls the signal port.
390        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
391
392        if let Poll::Ready(Ok(state)) = executor.run_until_stalled(&mut rx_fut) {
393            assert_eq!(state, ReadableState::MaybeReadableAndClosed);
394        } else {
395            panic!("Expected future to be ready and Ok");
396        }
397        assert!(async_s2.is_closed());
398        assert_eq!(on_closed_fut.now_or_never(), Some(Ok(zx::Signals::CHANNEL_PEER_CLOSED)));
399    }
400
401    #[test]
402    fn need_read_ensures_freshness() {
403        let mut executor = TestExecutor::new();
404
405        let (s1, s2) = zx::Socket::create_stream();
406        let async_s2 = Socket::from_socket(s2);
407
408        // The read signal is optimistically set on socket creation, so even though there is
409        // nothing to read, poll_readable returns Ready.
410        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
411        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
412
413        // Call need_readable to reacquire the read signal. The socket now knows
414        // that the signal is not actually set, so returns Pending.
415        assert!(async_s2.need_readable(&mut Context::from_waker(noop_waker_ref())).is_pending());
416        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
417        assert!(executor.run_until_stalled(&mut rx_fut).is_pending());
418
419        assert_eq!(s1.write(b"hello!").expect("failed to write 6 bytes"), 6);
420
421        // After writing to s1, its peer now has an actual read signal and is Ready.
422        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
423    }
424
425    #[test]
426    fn need_write_ensures_freshness() {
427        let mut executor = TestExecutor::new();
428
429        let (s1, s2) = zx::Socket::create_stream();
430
431        // Completely fill the transmit buffer. This socket is no longer writable.
432        let socket_info = s2.info().expect("failed to get socket info");
433        let bytes = vec![0u8; socket_info.tx_buf_max];
434        assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));
435
436        let async_s2 = Socket::from_socket(s2);
437
438        // The write signal is optimistically set on socket creation, so even though it's not
439        // possible to write, poll_writable returns Ready.
440        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
441        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
442
443        // Call need_writable to reacquire the write signal. The socket now
444        // knows that the signal is not actually set, so returns Pending.
445        assert!(async_s2.need_writable(&mut Context::from_waker(noop_waker_ref())).is_pending());
446        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
447        assert!(executor.run_until_stalled(&mut tx_fut).is_pending());
448
449        let mut buffer = [0u8; 5];
450        assert_eq!(s1.read(&mut buffer).expect("failed to read 5 bytes"), 5);
451
452        // After reading from s1, its peer is now able to write and should have a write signal.
453        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
454    }
455}