fuchsia_async/net/fuchsia/
mod.rs1#![allow(missing_docs)]
6
7mod tcp;
8pub use self::tcp::*;
9
10mod udp;
11pub use self::udp::*;
12
13use futures::io::{self, AsyncRead, AsyncWrite};
14use futures::ready;
15use futures::task::{AtomicWaker, Context};
16use zx::{self as zx, AsHandleRef};
17
18use std::convert::{AsMut, AsRef};
19use std::io::{Read, Write};
20use std::os::unix::io::{AsRawFd, RawFd};
21use std::pin::Pin;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::task::Poll;
24use std::{fmt, mem};
25
26use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};
27
28const READABLE: usize = libc::EPOLLIN as usize;
29const WRITABLE: usize = libc::EPOLLOUT as usize;
30const ERROR: usize = libc::EPOLLERR as usize;
31const HUP: usize = libc::EPOLLHUP as usize;
32
33pub(crate) struct EventedFdPacketReceiver {
36 fdio: *const syscall::fdio_t,
37 signals: AtomicUsize,
38 read_task: AtomicWaker,
39 write_task: AtomicWaker,
40}
41
42unsafe impl Send for EventedFdPacketReceiver {}
47unsafe impl Sync for EventedFdPacketReceiver {}
48
49impl PacketReceiver for EventedFdPacketReceiver {
50 fn receive_packet(&self, packet: zx::Packet) {
51 let observed_signals = if let zx::PacketContents::SignalOne(p) = packet.contents() {
52 p.observed()
53 } else {
54 return;
55 };
56
57 let mut events: u32 = 0;
58 unsafe {
59 syscall::fdio_unsafe_wait_end(self.fdio, observed_signals.bits(), &mut events);
60 }
61 let events = events as usize;
62
63 let old = self.signals.fetch_or(events, Ordering::SeqCst);
64 let became_readable = ((events & READABLE) != 0) && ((old & READABLE) == 0);
65 let became_writable = ((events & WRITABLE) != 0) && ((old & WRITABLE) == 0);
66 let err_occurred = (events & (ERROR | HUP)) != 0;
67
68 if became_readable || err_occurred {
69 self.read_task.wake();
70 }
71 if became_writable || err_occurred {
72 self.write_task.wake();
73 }
74 }
75}
76
77pub struct EventedFd<T> {
79 inner: T,
80 fdio: *const syscall::fdio_t,
82 signal_receiver: mem::ManuallyDrop<ReceiverRegistration<EventedFdPacketReceiver>>,
84}
85
86unsafe impl<T> Send for EventedFd<T> where T: Send {}
87unsafe impl<T> Sync for EventedFd<T> where T: Sync {}
88
89impl<T> Unpin for EventedFd<T> {}
90
91impl<T> Drop for EventedFd<T> {
92 fn drop(&mut self) {
93 unsafe {
94 mem::ManuallyDrop::drop(&mut self.signal_receiver);
96
97 syscall::fdio_unsafe_release(self.fdio);
99 }
100
101 }
103}
104
105impl<T: fmt::Debug> fmt::Debug for EventedFd<T> {
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 f.debug_struct("EventedFd").field("inner", &self.inner).finish()
111 }
112}
113
114impl<T> AsRef<T> for EventedFd<T>
115where
116 T: AsRawFd,
117{
118 fn as_ref(&self) -> &T {
120 &self.inner
121 }
122}
123
124impl<T> AsMut<T> for EventedFd<T>
125where
126 T: AsRawFd,
127{
128 fn as_mut(&mut self) -> &mut T {
130 &mut self.inner
131 }
132}
133
134impl<T> EventedFd<T>
135where
136 T: AsRawFd,
137{
138 pub unsafe fn new(inner: T) -> io::Result<Self> {
145 let fdio = syscall::fdio_unsafe_fd_to_io(inner.as_raw_fd());
146 let signal_receiver = EHandle::local().register_receiver(EventedFdPacketReceiver {
147 fdio,
148 signals: AtomicUsize::new(READABLE | WRITABLE),
155 read_task: AtomicWaker::new(),
156 write_task: AtomicWaker::new(),
157 });
158
159 let evented_fd =
160 EventedFd { inner, fdio, signal_receiver: mem::ManuallyDrop::new(signal_receiver) };
161
162 evented_fd.schedule_packet(ERROR | HUP);
164
165 evented_fd.schedule_packet(READABLE);
168 evented_fd.schedule_packet(WRITABLE);
169
170 Ok(evented_fd)
171 }
172 pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
176 let receiver = self.signal_receiver.receiver();
177 if (receiver.signals.load(Ordering::SeqCst) & (READABLE | ERROR | HUP)) != 0 {
178 Poll::Ready(Ok(()))
179 } else {
180 self.need_read(cx);
181 Poll::Pending
182 }
183 }
184
185 pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
189 let receiver = self.signal_receiver.receiver();
190 if (receiver.signals.load(Ordering::SeqCst) & (WRITABLE | ERROR | HUP)) != 0 {
191 Poll::Ready(Ok(()))
192 } else {
193 self.need_write(cx);
194 Poll::Pending
195 }
196 }
197
198 pub fn need_read(&self, cx: &mut Context<'_>) {
201 let receiver = self.signal_receiver.receiver();
202 receiver.read_task.register(cx.waker());
203 let old = receiver.signals.fetch_and(!READABLE, Ordering::SeqCst);
204 if (old & READABLE) != 0 {
207 self.schedule_packet(READABLE);
208 }
209 }
210
211 pub fn need_write(&self, cx: &mut Context<'_>) {
214 let receiver = self.signal_receiver.receiver();
215 receiver.write_task.register(cx.waker());
216 let old = receiver.signals.fetch_and(!WRITABLE, Ordering::SeqCst);
217 if (old & WRITABLE) != 0 {
220 self.schedule_packet(WRITABLE);
221 }
222 }
223
224 fn schedule_packet(&self, signals: usize) {
225 unsafe {
226 let (mut raw_handle, mut raw_signals) = (0, 0);
227 syscall::fdio_unsafe_wait_begin(
228 self.fdio,
229 signals as u32,
230 &mut raw_handle,
231 &mut raw_signals,
232 );
233
234 let handle = zx::Handle::from_raw(raw_handle);
235 let signals = zx::Signals::from_bits_truncate(raw_signals);
236
237 let res = handle.wait_async_handle(
238 self.signal_receiver.port(),
239 self.signal_receiver.key(),
240 signals,
241 zx::WaitAsyncOpts::empty(),
242 );
243
244 mem::forget(handle);
246 res.expect("Error scheduling EventedFd notification");
247 }
248 }
249
250 pub fn clear(&self) {
252 self.signal_receiver.receiver().signals.store(0, Ordering::SeqCst);
253 }
254}
255
256impl<T: AsRawFd> AsRawFd for EventedFd<T> {
257 fn as_raw_fd(&self) -> RawFd {
258 self.as_ref().as_raw_fd()
259 }
260}
261
262impl<T: AsRawFd + Read> AsyncRead for EventedFd<T> {
263 fn poll_read(
264 mut self: Pin<&mut Self>,
265 cx: &mut Context<'_>,
266 buf: &mut [u8],
267 ) -> Poll<Result<usize, io::Error>> {
268 ready!(EventedFd::poll_readable(&*self, cx))?;
269 let res = (*self).as_mut().read(buf);
270 if let Err(e) = &res {
271 if e.kind() == io::ErrorKind::WouldBlock {
272 self.need_read(cx);
273 return Poll::Pending;
274 }
275 }
276 Poll::Ready(res)
277 }
278
279 }
281
282impl<T: AsRawFd + Write> AsyncWrite for EventedFd<T> {
283 fn poll_write(
284 mut self: Pin<&mut Self>,
285 cx: &mut Context<'_>,
286 buf: &[u8],
287 ) -> Poll<Result<usize, io::Error>> {
288 ready!(EventedFd::poll_writable(&*self, cx))?;
289 let res = (*self).as_mut().write(buf);
290 if let Err(e) = &res {
291 if e.kind() == io::ErrorKind::WouldBlock {
292 self.need_write(cx);
293 return Poll::Pending;
294 }
295 }
296 Poll::Ready(res)
297 }
298
299 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
300 Poll::Ready(Ok(()))
301 }
302
303 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
304 Poll::Ready(Ok(()))
305 }
306
307 }
309
310impl<T> AsyncRead for &EventedFd<T>
311where
312 T: AsRawFd,
313 for<'b> &'b T: Read,
314{
315 fn poll_read(
316 self: Pin<&mut Self>,
317 cx: &mut Context<'_>,
318 buf: &mut [u8],
319 ) -> Poll<Result<usize, io::Error>> {
320 ready!(EventedFd::poll_readable(*self, cx))?;
321 let res = (*self).as_ref().read(buf);
322 if let Err(e) = &res {
323 if e.kind() == io::ErrorKind::WouldBlock {
324 self.need_read(cx);
325 return Poll::Pending;
326 }
327 }
328 Poll::Ready(res)
329 }
330}
331
332impl<T> AsyncWrite for &EventedFd<T>
333where
334 T: AsRawFd,
335 for<'b> &'b T: Write,
336{
337 fn poll_write(
338 self: Pin<&mut Self>,
339 cx: &mut Context<'_>,
340 buf: &[u8],
341 ) -> Poll<Result<usize, io::Error>> {
342 ready!(EventedFd::poll_writable(*self, cx))?;
343 let res = (*self).as_ref().write(buf);
344 if let Err(e) = &res {
345 if e.kind() == io::ErrorKind::WouldBlock {
346 self.need_write(cx);
347 return Poll::Pending;
348 }
349 }
350 Poll::Ready(res)
351 }
352
353 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
354 Poll::Ready(Ok(()))
355 }
356
357 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
358 Poll::Ready(Ok(()))
359 }
360}
361
362mod syscall {
363 #![allow(non_camel_case_types, improper_ctypes)]
364 use std::os::unix::io::RawFd;
365 pub use zx::sys::{zx_handle_t, zx_signals_t};
366
367 pub type fdio_t = ();
369
370 extern "C" {
372 pub fn fdio_unsafe_fd_to_io(fd: RawFd) -> *const fdio_t;
373 pub fn fdio_unsafe_release(io: *const fdio_t);
374
375 pub fn fdio_unsafe_wait_begin(
376 io: *const fdio_t,
377 events: u32,
378 handle_out: &mut zx_handle_t,
379 signals_out: &mut zx_signals_t,
380 );
381
382 pub fn fdio_unsafe_wait_end(io: *const fdio_t, signals: zx_signals_t, events_out: &mut u32);
383 }
384}