fuchsia_async/handle/zircon/
socket.rs1use super::on_signals::OnSignalsRef;
6use super::rwhandle::{RWHandle, ReadableHandle, ReadableState, WritableHandle, WritableState};
7use futures::future::poll_fn;
8use futures::io::{self, AsyncRead, AsyncWrite};
9use futures::ready;
10use futures::stream::Stream;
11use futures::task::Context;
12use std::fmt;
13use std::pin::Pin;
14use std::task::Poll;
15use zx::{self as zx, AsHandleRef};
16
17pub struct Socket(RWHandle<zx::Socket>);
19
20impl AsRef<zx::Socket> for Socket {
21 fn as_ref(&self) -> &zx::Socket {
22 &self.0.get_ref()
23 }
24}
25
26impl AsHandleRef for Socket {
27 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
28 self.0.get_ref().as_handle_ref()
29 }
30}
31
32impl Socket {
33 pub fn from_socket(socket: zx::Socket) -> Self {
39 Socket(RWHandle::new(socket))
40 }
41
42 pub fn into_zx_socket(self) -> zx::Socket {
44 self.0.into_inner()
45 }
46
47 pub fn is_closed(&self) -> bool {
49 self.0.is_closed()
50 }
51
52 pub fn on_closed(&self) -> OnSignalsRef<'_> {
54 self.0.on_closed()
55 }
56
57 pub fn poll_read_ref(
64 &self,
65 cx: &mut Context<'_>,
66 buf: &mut [u8],
67 ) -> Poll<Result<usize, zx::Status>> {
68 ready!(self.poll_readable(cx))?;
69 loop {
70 let res = self.0.get_ref().read(buf);
71 match res {
72 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
73 Err(zx::Status::PEER_CLOSED) => return Poll::Ready(Ok(0)),
74 _ => return Poll::Ready(res),
75 }
76 }
77 }
78
79 pub fn poll_write_ref(
83 &self,
84 cx: &mut Context<'_>,
85 buf: &[u8],
86 ) -> Poll<Result<usize, zx::Status>> {
87 ready!(self.poll_writable(cx))?;
88 loop {
89 let res = self.0.get_ref().write(buf);
90 match res {
91 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
92 _ => return Poll::Ready(res),
93 }
94 }
95 }
96
97 pub fn poll_datagram(
101 &self,
102 cx: &mut Context<'_>,
103 out: &mut Vec<u8>,
104 ) -> Poll<Result<usize, zx::Status>> {
105 ready!(self.poll_readable(cx))?;
106 let avail = self.0.get_ref().outstanding_read_bytes()?;
107 let len = out.len();
108 out.resize(len + avail, 0);
109 let (_, mut tail) = out.split_at_mut(len);
110 loop {
111 match self.0.get_ref().read(&mut tail) {
112 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
113 Err(e) => return Poll::Ready(Err(e)),
114 Ok(bytes) => {
115 return if bytes == avail {
116 Poll::Ready(Ok(bytes))
117 } else {
118 Poll::Ready(Err(zx::Status::BAD_STATE))
119 }
120 }
121 }
122 }
123 }
124
125 pub async fn read_datagram<'a>(&'a self, out: &'a mut Vec<u8>) -> Result<usize, zx::Status> {
128 poll_fn(move |cx| self.poll_datagram(cx, out)).await
129 }
130
131 pub fn as_datagram_stream<'a>(&'a self) -> DatagramStream<&'a Self> {
135 DatagramStream(self)
136 }
137
138 pub fn into_datagram_stream(self) -> DatagramStream<Self> {
140 DatagramStream(self)
141 }
142}
143
144impl ReadableHandle for Socket {
145 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
146 self.0.poll_readable(cx)
147 }
148
149 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
150 self.0.need_readable(cx)
151 }
152}
153
154impl WritableHandle for Socket {
155 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
156 self.0.poll_writable(cx)
157 }
158
159 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
160 self.0.need_writable(cx)
161 }
162}
163
164impl fmt::Debug for Socket {
165 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166 self.0.get_ref().fmt(f)
167 }
168}
169
170impl AsyncRead for Socket {
171 fn poll_read(
174 self: Pin<&mut Self>,
175 cx: &mut Context<'_>,
176 buf: &mut [u8],
177 ) -> Poll<io::Result<usize>> {
178 self.poll_read_ref(cx, buf).map_err(Into::into)
179 }
180}
181
182impl AsyncWrite for Socket {
183 fn poll_write(
184 self: Pin<&mut Self>,
185 cx: &mut Context<'_>,
186 buf: &[u8],
187 ) -> Poll<io::Result<usize>> {
188 self.poll_write_ref(cx, buf).map_err(Into::into)
189 }
190
191 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
192 Poll::Ready(Ok(()))
193 }
194
195 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
196 Poll::Ready(Ok(()))
197 }
198}
199
200impl<'a> AsyncRead for &'a Socket {
201 fn poll_read(
204 self: Pin<&mut Self>,
205 cx: &mut Context<'_>,
206 buf: &mut [u8],
207 ) -> Poll<io::Result<usize>> {
208 self.poll_read_ref(cx, buf).map_err(Into::into)
209 }
210}
211
212impl<'a> AsyncWrite for &'a Socket {
213 fn poll_write(
214 self: Pin<&mut Self>,
215 cx: &mut Context<'_>,
216 buf: &[u8],
217 ) -> Poll<io::Result<usize>> {
218 self.poll_write_ref(cx, buf).map_err(Into::into)
219 }
220
221 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
222 Poll::Ready(Ok(()))
223 }
224
225 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
226 Poll::Ready(Ok(()))
227 }
228}
229
230#[derive(Debug)]
232pub struct DatagramStream<S>(pub S);
233
234fn poll_datagram_as_stream(
235 socket: &Socket,
236 cx: &mut Context<'_>,
237) -> Poll<Option<Result<Vec<u8>, zx::Status>>> {
238 let mut res = Vec::<u8>::new();
239 Poll::Ready(match ready!(socket.poll_datagram(cx, &mut res)) {
240 Ok(_size) => Some(Ok(res)),
241 Err(zx::Status::PEER_CLOSED) => None,
242 Err(e) => Some(Err(e)),
243 })
244}
245
246impl Stream for DatagramStream<Socket> {
247 type Item = Result<Vec<u8>, zx::Status>;
248
249 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
250 poll_datagram_as_stream(&self.0, cx)
251 }
252}
253
254impl Stream for DatagramStream<&Socket> {
255 type Item = Result<Vec<u8>, zx::Status>;
256
257 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
258 poll_datagram_as_stream(self.0, cx)
259 }
260}
261
262#[cfg(test)]
263mod tests {
264 use super::*;
265 use crate::{MonotonicInstant, TestExecutor, TimeoutExt, Timer};
266
267 use futures::future::{self, join};
268 use futures::io::{AsyncReadExt as _, AsyncWriteExt as _};
269 use futures::stream::TryStreamExt;
270 use futures::task::noop_waker_ref;
271 use futures::FutureExt;
272 use std::pin::pin;
273
274 #[test]
275 fn can_read_write() {
276 let mut exec = TestExecutor::new();
277 let bytes = &[0, 1, 2, 3];
278
279 let (tx, rx) = zx::Socket::create_stream();
280 let (mut tx, mut rx) = (Socket::from_socket(tx), Socket::from_socket(rx));
281
282 let receive_future = async {
283 let mut buf = vec![];
284 rx.read_to_end(&mut buf).await.expect("reading socket");
285 assert_eq!(&*buf, bytes);
286 };
287
288 let receiver = receive_future
293 .on_timeout(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10)), || {
294 panic!("timeout")
295 });
296
297 let sender = async move {
299 Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_millis(100))).await;
300 tx.write_all(bytes).await.expect("writing into socket");
301 drop(tx);
303 };
304
305 let done = join(receiver, sender);
306 exec.run_singlethreaded(done);
307 }
308
309 #[test]
310 fn can_read_datagram() {
311 let mut exec = TestExecutor::new();
312
313 let (one, two) = (&[0, 1], &[2, 3, 4, 5]);
314
315 let (tx, rx) = zx::Socket::create_datagram();
316 let rx = Socket::from_socket(rx);
317
318 let mut out = vec![50];
319
320 assert!(tx.write(one).is_ok());
321 assert!(tx.write(two).is_ok());
322
323 let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
324
325 assert!(size.is_ok());
326 assert_eq!(one.len(), size.unwrap());
327
328 assert_eq!([50, 0, 1], out.as_slice());
329
330 let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
331
332 assert!(size.is_ok());
333 assert_eq!(two.len(), size.unwrap());
334
335 assert_eq!([50, 0, 1, 2, 3, 4, 5], out.as_slice());
336 }
337
338 #[test]
339 fn stream_datagram() {
340 let mut exec = TestExecutor::new();
341
342 let (tx, rx) = zx::Socket::create_datagram();
343 let mut rx = Socket::from_socket(rx).into_datagram_stream();
344
345 let packets = 20;
346
347 for size in 1..packets + 1 {
348 let mut vec = Vec::<u8>::new();
349 vec.resize(size, size as u8);
350 assert!(tx.write(&vec).is_ok());
351 }
352
353 drop(tx);
355
356 let stream_read_fut = async move {
357 let mut count = 0;
358 while let Some(packet) = rx.try_next().await.expect("received error from stream") {
359 count = count + 1;
360 assert_eq!(packet.len(), count);
361 assert!(packet.iter().all(|&x| x == count as u8));
362 }
363 assert_eq!(packets, count);
364 };
365
366 exec.run_singlethreaded(stream_read_fut);
367 }
368
369 #[test]
370 fn peer_closed_signal_raised() {
371 let mut executor = TestExecutor::new();
372
373 let (s1, s2) = zx::Socket::create_stream();
374 let mut async_s2 = Socket::from_socket(s2);
375
376 let _ = executor.run_until_stalled(&mut pin!(async {
378 let mut buf = [0; 16];
379 let _ = async_s2.read(&mut buf).await;
380 }));
381
382 let on_closed_fut = async_s2.on_closed();
383
384 drop(s1);
385
386 let _ = executor.run_until_stalled(&mut future::pending::<()>());
388
389 let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
391
392 if let Poll::Ready(Ok(state)) = executor.run_until_stalled(&mut rx_fut) {
393 assert_eq!(state, ReadableState::MaybeReadableAndClosed);
394 } else {
395 panic!("Expected future to be ready and Ok");
396 }
397 assert!(async_s2.is_closed());
398 assert_eq!(on_closed_fut.now_or_never(), Some(Ok(zx::Signals::CHANNEL_PEER_CLOSED)));
399 }
400
401 #[test]
402 fn need_read_ensures_freshness() {
403 let mut executor = TestExecutor::new();
404
405 let (s1, s2) = zx::Socket::create_stream();
406 let async_s2 = Socket::from_socket(s2);
407
408 let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
411 assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
412
413 assert!(async_s2.need_readable(&mut Context::from_waker(noop_waker_ref())).is_pending());
416 let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
417 assert!(executor.run_until_stalled(&mut rx_fut).is_pending());
418
419 assert_eq!(s1.write(b"hello!").expect("failed to write 6 bytes"), 6);
420
421 assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
423 }
424
425 #[test]
426 fn need_write_ensures_freshness() {
427 let mut executor = TestExecutor::new();
428
429 let (s1, s2) = zx::Socket::create_stream();
430
431 let socket_info = s2.info().expect("failed to get socket info");
433 let bytes = vec![0u8; socket_info.tx_buf_max];
434 assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));
435
436 let async_s2 = Socket::from_socket(s2);
437
438 let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
441 assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
442
443 assert!(async_s2.need_writable(&mut Context::from_waker(noop_waker_ref())).is_pending());
446 let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
447 assert!(executor.run_until_stalled(&mut tx_fut).is_pending());
448
449 let mut buffer = [0u8; 5];
450 assert_eq!(s1.read(&mut buffer).expect("failed to read 5 bytes"), 5);
451
452 assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
454 }
455}