fuchsia_async/handle/zircon/
socket.rs1use super::on_signals::OnSignalsRef;
6use super::rwhandle::{
7 RWHandle, RWHandleSpec, ReadableHandle, ReadableState, WritableHandle, WritableState,
8};
9use futures::future::poll_fn;
10use futures::io::{self, AsyncRead, AsyncWrite};
11use futures::ready;
12use futures::stream::Stream;
13use futures::task::Context;
14use std::fmt;
15use std::pin::Pin;
16use std::task::Poll;
17use zx::{self as zx, AsHandleRef};
18
19pub struct Socket(RWHandle<zx::Socket, SocketRWHandleSpec>);
21
22impl AsRef<zx::Socket> for Socket {
23 fn as_ref(&self) -> &zx::Socket {
24 self.0.get_ref()
25 }
26}
27
28impl AsHandleRef for Socket {
29 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
30 self.0.get_ref().as_handle_ref()
31 }
32}
33
34impl Socket {
35 pub fn from_socket(socket: zx::Socket) -> Self {
41 Socket(RWHandle::new_with_spec(socket))
42 }
43
44 pub fn into_zx_socket(self) -> zx::Socket {
46 self.0.into_inner()
47 }
48
49 pub fn is_closed(&self) -> bool {
51 self.0.is_closed()
52 }
53
54 pub fn on_closed(&self) -> OnSignalsRef<'_> {
56 self.0.on_closed()
57 }
58
59 pub fn poll_read_ref(
66 &self,
67 cx: &mut Context<'_>,
68 buf: &mut [u8],
69 ) -> Poll<Result<usize, zx::Status>> {
70 ready!(self.poll_readable(cx))?;
71 loop {
72 let res = self.0.get_ref().read(buf);
73 match res {
74 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
75 Err(zx::Status::BAD_STATE) => {
76 return Poll::Ready(Ok(0));
78 }
79 Err(zx::Status::PEER_CLOSED) => return Poll::Ready(Ok(0)),
80 _ => return Poll::Ready(res),
81 }
82 }
83 }
84
85 pub fn poll_write_ref(
89 &self,
90 cx: &mut Context<'_>,
91 buf: &[u8],
92 ) -> Poll<Result<usize, zx::Status>> {
93 ready!(self.poll_writable(cx))?;
94 loop {
95 let res = self.0.get_ref().write(buf);
96 match res {
97 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_writable(cx)?),
98 Err(zx::Status::BAD_STATE) => {
99 return Poll::Ready(Err(zx::Status::BAD_STATE));
101 }
102 _ => return Poll::Ready(res),
103 }
104 }
105 }
106
107 pub fn poll_datagram(
111 &self,
112 cx: &mut Context<'_>,
113 out: &mut Vec<u8>,
114 ) -> Poll<Result<usize, zx::Status>> {
115 ready!(self.poll_readable(cx))?;
116 let avail = self.0.get_ref().outstanding_read_bytes()?;
117 let len = out.len();
118 out.resize(len + avail, 0);
119 let (_, tail) = out.split_at_mut(len);
120 loop {
121 match self.0.get_ref().read(tail) {
122 Err(zx::Status::SHOULD_WAIT) => ready!(self.need_readable(cx)?),
123 Err(e) => return Poll::Ready(Err(e)),
124 Ok(bytes) => {
125 return if bytes == avail {
126 Poll::Ready(Ok(bytes))
127 } else {
128 Poll::Ready(Err(zx::Status::IO_DATA_LOSS))
129 }
130 }
131 }
132 }
133 }
134
135 pub async fn read_datagram<'a>(&'a self, out: &'a mut Vec<u8>) -> Result<usize, zx::Status> {
138 poll_fn(move |cx| self.poll_datagram(cx, out)).await
139 }
140
141 pub fn as_datagram_stream(&self) -> DatagramStream<&Self> {
145 DatagramStream(self)
146 }
147
148 pub fn into_datagram_stream(self) -> DatagramStream<Self> {
150 DatagramStream(self)
151 }
152}
153
154impl ReadableHandle for Socket {
155 fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<ReadableState, zx::Status>> {
156 self.0.poll_readable(cx)
157 }
158
159 fn need_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
160 self.0.need_readable(cx)
161 }
162}
163
164impl WritableHandle for Socket {
165 fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<WritableState, zx::Status>> {
166 self.0.poll_writable(cx)
167 }
168
169 fn need_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
170 self.0.need_writable(cx)
171 }
172}
173
174impl fmt::Debug for Socket {
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 self.0.get_ref().fmt(f)
177 }
178}
179
180impl AsyncRead for Socket {
181 fn poll_read(
184 self: Pin<&mut Self>,
185 cx: &mut Context<'_>,
186 buf: &mut [u8],
187 ) -> Poll<io::Result<usize>> {
188 self.poll_read_ref(cx, buf).map_err(Into::into)
189 }
190}
191
192impl AsyncWrite for Socket {
193 fn poll_write(
194 self: Pin<&mut Self>,
195 cx: &mut Context<'_>,
196 buf: &[u8],
197 ) -> Poll<io::Result<usize>> {
198 self.poll_write_ref(cx, buf).map_err(Into::into)
199 }
200
201 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
202 Poll::Ready(Ok(()))
203 }
204
205 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
206 Poll::Ready(Ok(()))
207 }
208}
209
210impl AsyncRead for &Socket {
211 fn poll_read(
214 self: Pin<&mut Self>,
215 cx: &mut Context<'_>,
216 buf: &mut [u8],
217 ) -> Poll<io::Result<usize>> {
218 self.poll_read_ref(cx, buf).map_err(Into::into)
219 }
220}
221
222impl AsyncWrite for &Socket {
223 fn poll_write(
224 self: Pin<&mut Self>,
225 cx: &mut Context<'_>,
226 buf: &[u8],
227 ) -> Poll<io::Result<usize>> {
228 self.poll_write_ref(cx, buf).map_err(Into::into)
229 }
230
231 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
232 Poll::Ready(Ok(()))
233 }
234
235 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
236 Poll::Ready(Ok(()))
237 }
238}
239
/// A stream adapter over a socket (`Socket` or `&Socket`) that yields one
/// `Vec<u8>` per successful `Socket::poll_datagram` call.
///
/// The wrapped socket is exposed as the public tuple field.
#[derive(Debug)]
pub struct DatagramStream<S>(pub S);
243
244fn poll_datagram_as_stream(
245 socket: &Socket,
246 cx: &mut Context<'_>,
247) -> Poll<Option<Result<Vec<u8>, zx::Status>>> {
248 let mut res = Vec::<u8>::new();
249 Poll::Ready(match ready!(socket.poll_datagram(cx, &mut res)) {
250 Ok(_size) => Some(Ok(res)),
251 Err(zx::Status::PEER_CLOSED) => None,
252 Err(e) => Some(Err(e)),
253 })
254}
255
256impl Stream for DatagramStream<Socket> {
257 type Item = Result<Vec<u8>, zx::Status>;
258
259 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
260 poll_datagram_as_stream(&self.0, cx)
261 }
262}
263
264impl Stream for DatagramStream<&Socket> {
265 type Item = Result<Vec<u8>, zx::Status>;
266
267 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
268 poll_datagram_as_stream(self.0, cx)
269 }
270}
271
272struct SocketRWHandleSpec;
273impl RWHandleSpec for SocketRWHandleSpec {
274 const READABLE_SIGNALS: zx::Signals =
275 zx::Signals::SOCKET_READABLE.union(zx::Signals::SOCKET_PEER_WRITE_DISABLED);
276 const WRITABLE_SIGNALS: zx::Signals =
277 zx::Signals::SOCKET_WRITABLE.union(zx::Signals::SOCKET_WRITE_DISABLED);
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{MonotonicInstant, TestExecutor, TimeoutExt, Timer};

    use futures::future::{self, join};
    use futures::io::{AsyncReadExt as _, AsyncWriteExt as _};
    use futures::stream::TryStreamExt;
    use futures::task::noop_waker_ref;
    use futures::FutureExt;
    use std::pin::pin;
    use zx::SocketWriteDisposition;

    /// Bytes written on one end of a stream socket are read back on the other end,
    /// and dropping the writer terminates `read_to_end`.
    #[test]
    fn can_read_write() {
        let mut exec = TestExecutor::new();
        let bytes = &[0, 1, 2, 3];

        let (tx, rx) = zx::Socket::create_stream();
        let (mut tx, mut rx) = (Socket::from_socket(tx), Socket::from_socket(rx));

        let receive_future = async {
            let mut buf = vec![];
            rx.read_to_end(&mut buf).await.expect("reading socket");
            assert_eq!(&*buf, bytes);
        };

        // Bound the receiver with a generous timeout so a broken test fails instead of hanging.
        let receiver = receive_future
            .on_timeout(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(10)), || {
                panic!("timeout")
            });

        // Delay the write so the receiver is already waiting when the data arrives.
        let sender = async move {
            Timer::new(MonotonicInstant::after(zx::MonotonicDuration::from_millis(100))).await;
            tx.write_all(bytes).await.expect("writing into socket");
            // Dropping the writer lets `read_to_end` observe end of stream.
            drop(tx);
        };

        let done = join(receiver, sender);
        exec.run_singlethreaded(done);
    }

    /// `read_datagram` appends exactly one datagram per call to the output buffer.
    #[test]
    fn can_read_datagram() {
        let mut exec = TestExecutor::new();

        let (one, two) = (&[0, 1], &[2, 3, 4, 5]);

        let (tx, rx) = zx::Socket::create_datagram();
        let rx = Socket::from_socket(rx);

        // Start with existing contents to check that datagrams are appended, not overwritten.
        let mut out = vec![50];

        assert!(tx.write(one).is_ok());
        assert!(tx.write(two).is_ok());

        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
        assert_eq!(size, Ok(one.len()));
        assert_eq!([50, 0, 1], out.as_slice());

        let size = exec.run_singlethreaded(rx.read_datagram(&mut out));
        assert_eq!(size, Ok(two.len()));
        assert_eq!([50, 0, 1, 2, 3, 4, 5], out.as_slice());
    }

    /// A datagram stream yields every queued packet in order and ends once the peer is dropped.
    #[test]
    fn stream_datagram() {
        let mut exec = TestExecutor::new();

        let (tx, rx) = zx::Socket::create_datagram();
        let mut rx = Socket::from_socket(rx).into_datagram_stream();

        let packets = 20;

        // Packet `size` is `size` bytes long and filled with the byte value `size`.
        for size in 1..=packets {
            let packet = vec![size as u8; size];
            assert!(tx.write(&packet).is_ok());
        }

        // Drop the writer so the stream terminates after the queued packets.
        drop(tx);

        let stream_read_fut = async move {
            let mut count = 0;
            while let Some(packet) = rx.try_next().await.expect("received error from stream") {
                count += 1;
                assert_eq!(packet.len(), count);
                assert!(packet.iter().all(|&x| x == count as u8));
            }
            assert_eq!(packets, count);
        };

        exec.run_singlethreaded(stream_read_fut);
    }

    /// Dropping the peer is reported by `poll_readable`, `is_closed` and `on_closed`.
    #[test]
    fn peer_closed_signal_raised() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();
        let mut async_s2 = Socket::from_socket(s2);

        // Attempt a read first so the socket has been polled before the peer goes away.
        let _ = executor.run_until_stalled(&mut pin!(async {
            let mut buf = [0; 16];
            let _ = async_s2.read(&mut buf).await;
        }));

        let on_closed_fut = async_s2.on_closed();

        drop(s1);

        // Let the executor process anything pending before polling the socket again.
        let _ = executor.run_until_stalled(&mut future::pending::<()>());

        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));

        if let Poll::Ready(Ok(state)) = executor.run_until_stalled(&mut rx_fut) {
            assert_eq!(state, ReadableState::MaybeReadableAndClosed);
        } else {
            panic!("Expected future to be ready and Ok");
        }
        assert!(async_s2.is_closed());
        // Use the socket-specific name for the peer-closed signal of this object type.
        assert_eq!(on_closed_fut.now_or_never(), Some(Ok(zx::Signals::SOCKET_PEER_CLOSED)));
    }

    /// After `need_readable`, `poll_readable` stays pending until data actually arrives.
    #[test]
    fn need_read_ensures_freshness() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();
        let async_s2 = Socket::from_socket(s2);

        // A freshly wrapped socket reports ready even though nothing has been written yet.
        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());

        // Asking for a fresh readable signal reports pending, and so does the next poll.
        assert!(async_s2.need_readable(&mut Context::from_waker(noop_waker_ref())).is_pending());
        let mut rx_fut = poll_fn(|cx| async_s2.poll_readable(cx));
        assert!(executor.run_until_stalled(&mut rx_fut).is_pending());

        assert_eq!(s1.write(b"hello!").expect("failed to write 6 bytes"), 6);

        // With data queued by the peer, the socket becomes readable.
        assert!(executor.run_until_stalled(&mut rx_fut).is_ready());
    }

    /// After `need_writable`, `poll_writable` stays pending until buffer space frees up.
    #[test]
    fn need_write_ensures_freshness() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();

        // Fill the transmit buffer completely so that no further writes fit.
        let socket_info = s2.info().expect("failed to get socket info");
        let bytes = vec![0u8; socket_info.tx_buf_max];
        assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));

        let async_s2 = Socket::from_socket(s2);

        // A freshly wrapped socket reports ready even though its buffer is full.
        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());

        // Asking for a fresh writable signal reports pending, and so does the next poll.
        assert!(async_s2.need_writable(&mut Context::from_waker(noop_waker_ref())).is_pending());
        let mut tx_fut = poll_fn(|cx| async_s2.poll_writable(cx));
        assert!(executor.run_until_stalled(&mut tx_fut).is_pending());

        let mut buffer = [0u8; 5];
        assert_eq!(s1.read(&mut buffer).expect("failed to read 5 bytes"), 5);

        // The peer drained some bytes, so the socket becomes writable.
        assert!(executor.run_until_stalled(&mut tx_fut).is_ready());
    }

    /// Writes fail with `BAD_STATE` while disabled and succeed again once re-enabled.
    #[test]
    fn half_closed_for_writes() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();

        // Fill the transmit buffer completely so that the async write has to wait.
        let socket_info = s2.info().expect("failed to get socket info");
        let bytes = vec![0u8; socket_info.tx_buf_max];
        assert_eq!(socket_info.tx_buf_max, s2.write(&bytes).expect("failed to write to socket"));

        let async_s2 = Socket::from_socket(s2);
        let mut tx_fut = poll_fn(|cx| async_s2.poll_write_ref(cx, &bytes[..]));
        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Pending);

        // Disabling writes from the peer's side wakes the pending write with an error.
        s1.set_disposition(None, Some(SocketWriteDisposition::Disabled)).expect("set disposition");
        assert_eq!(
            executor.run_until_stalled(&mut tx_fut),
            Poll::Ready(Err::<usize, _>(zx::Status::BAD_STATE))
        );

        // Drain the buffered data and re-enable writes; the full write then succeeds.
        let mut readbuf = vec![0u8; bytes.len()];
        assert_eq!(s1.read(&mut readbuf[..]), Ok(readbuf.len()));
        s1.set_disposition(None, Some(SocketWriteDisposition::Enabled)).expect("set disposition");

        assert_eq!(executor.run_until_stalled(&mut tx_fut), Poll::Ready(Ok(bytes.len())));
    }

    /// Reads return buffered data, then `Ok(0)`, while the peer's writes are disabled.
    #[test]
    fn half_closed_for_reads() {
        let mut executor = TestExecutor::new();

        let (s1, s2) = zx::Socket::create_stream();
        let async_s2 = Socket::from_socket(s2);
        let mut bytes = [0u8; 10];
        let mut rx_fut = poll_fn(|cx| async_s2.poll_read_ref(cx, &mut bytes[..]));
        assert_eq!(executor.run_until_stalled(&mut rx_fut), Poll::Pending);

        // Data written before the peer disables its writes is still delivered,
        // after which reads report end of stream.
        let msg = b"hello";
        assert_eq!(s1.write(msg), Ok(msg.len()));
        s1.set_disposition(Some(SocketWriteDisposition::Disabled), None).expect("set disposition");
        assert_eq!(executor.run_until_stalled(&mut rx_fut), Poll::Ready(Ok(msg.len())));
        assert_eq!(executor.run_until_stalled(&mut rx_fut), Poll::Ready(Ok(0)));

        // Re-enabling the peer's writes makes reads wait for data again.
        s1.set_disposition(Some(SocketWriteDisposition::Enabled), None).expect("set disposition");
        assert_eq!(executor.run_until_stalled(&mut rx_fut), Poll::Pending);

        // Disabling them once more wakes the reader with end of stream.
        s1.set_disposition(Some(SocketWriteDisposition::Disabled), None).expect("set disposition");
        assert_eq!(executor.run_until_stalled(&mut rx_fut), Poll::Ready(Ok(0)));
    }
}