fuchsia_async/net/fuchsia/mod.rs

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#![allow(missing_docs)]

mod tcp;
pub use self::tcp::*;

mod udp;
pub use self::udp::*;

use futures::io::{self, AsyncRead, AsyncWrite};
use futures::ready;
use futures::task::{AtomicWaker, Context};
use zx::{self as zx, AsHandleRef};

use std::convert::{AsMut, AsRef};
use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Poll;
use std::{fmt, mem};

use crate::runtime::{EHandle, PacketReceiver, ReceiverRegistration};

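// fdio reports fd readiness using epoll-style event bits, so the cached
// readiness state below reuses the same encoding.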
const READABLE: usize = libc::EPOLLIN as usize;
const WRITABLE: usize = libc::EPOLLOUT as usize;
const ERROR: usize = libc::EPOLLERR as usize;
const HUP: usize = libc::EPOLLHUP as usize;

// Unsafe to use. `receive_packet` must not be called after
// `fdio` is invalidated.
pub(crate) struct EventedFdPacketReceiver {
    fdio: *const syscall::fdio_t,
    signals: AtomicUsize,
    read_task: AtomicWaker,
    write_task: AtomicWaker,
}

// Needed because of the fdio pointer.
// It is safe to send because the `EventedFdPacketReceiver` must be
// deregistered (and therefore `receive_packet` never called again)
// before `fdio_unsafe_release` is called.
unsafe impl Send for EventedFdPacketReceiver {}
unsafe impl Sync for EventedFdPacketReceiver {}

impl PacketReceiver for EventedFdPacketReceiver {
    fn receive_packet(&self, packet: zx::Packet) {
        let observed_signals = if let zx::PacketContents::SignalOne(p) = packet.contents() {
            p.observed()
        } else {
            return;
        };

        let mut events: u32 = 0;
        unsafe {
            syscall::fdio_unsafe_wait_end(self.fdio, observed_signals.bits(), &mut events);
        }
        let events = events as usize;

        let old = self.signals.fetch_or(events, Ordering::SeqCst);
        let became_readable = ((events & READABLE) != 0) && ((old & READABLE) == 0);
        let became_writable = ((events & WRITABLE) != 0) && ((old & WRITABLE) == 0);
        let err_occurred = (events & (ERROR | HUP)) != 0;

        if became_readable || err_occurred {
            self.read_task.wake();
        }
        if became_writable || err_occurred {
            self.write_task.wake();
        }
    }
}

/// A type which can be used for receiving IO events for a file descriptor.
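///
/// Readiness is tracked in a cached signal bitset: `receive_packet` ORs in
/// newly observed events and wakes any registered read/write tasks, while
/// `need_read`/`need_write` clear the corresponding bit and re-arm the port
/// wait via `schedule_packet`.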
pub struct EventedFd<T> {
    inner: T,
    // Must be valid, acquired from `fdio_unsafe_fd_to_io`
    fdio: *const syscall::fdio_t,
    // Must be dropped before `fdio_unsafe_release` is called
    signal_receiver: mem::ManuallyDrop<ReceiverRegistration<EventedFdPacketReceiver>>,
}

unsafe impl<T> Send for EventedFd<T> where T: Send {}
unsafe impl<T> Sync for EventedFd<T> where T: Sync {}

impl<T> Unpin for EventedFd<T> {}

impl<T> Drop for EventedFd<T> {
    fn drop(&mut self) {
        unsafe {
            // Drop the receiver so that `receive_packet` cannot be called again.
            mem::ManuallyDrop::drop(&mut self.signal_receiver);

            // Release the fdio
            syscall::fdio_unsafe_release(self.fdio);
        }

        // Then `inner` gets dropped
    }
}

impl<T: fmt::Debug> fmt::Debug for EventedFd<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // FIXME(https://github.com/rust-lang/rust/issues/67364): This could be
        // better written with `.finish_non_exhaustive()` once that feature is
        // stabilized.
        f.debug_struct("EventedFd").field("inner", &self.inner).finish()
    }
}

impl<T> AsRef<T> for EventedFd<T>
where
    T: AsRawFd,
{
    // Returns a reference to the underlying IO object.
    fn as_ref(&self) -> &T {
        &self.inner
    }
}

impl<T> AsMut<T> for EventedFd<T>
where
    T: AsRawFd,
{
    // Returns a mutable reference to the underlying IO object.
    fn as_mut(&mut self) -> &mut T {
        &mut self.inner
    }
}

impl<T> EventedFd<T>
where
    T: AsRawFd,
{
    /// Creates a new EventedFd.
    ///
    /// # Safety
    ///
    /// The raw file descriptor returned from `inner.as_raw_fd()` must not be
    /// closed until the returned `EventedFd` is dropped.
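    ///
    /// A minimal usage sketch (hypothetical; assumes a running executor and a
    /// `std::net::TcpStream` named `stream`):
    ///
    /// ```ignore
    /// stream.set_nonblocking(true)?;
    /// // Safety: `EventedFd` takes ownership of `stream`, so the underlying
    /// // fd stays open until the `EventedFd` is dropped.
    /// let evented = unsafe { EventedFd::new(stream)? };
    /// ```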
    pub unsafe fn new(inner: T) -> io::Result<Self> {
        let fdio = syscall::fdio_unsafe_fd_to_io(inner.as_raw_fd());
        let signal_receiver = EHandle::local().register_receiver(EventedFdPacketReceiver {
            fdio,
            // Optimistically assume that the fd is readable and writable.
            // Reads and writes will be attempted before queueing a packet.
            // This makes fds slightly faster to read/write the first time
            // they're accessed after being created, provided they start off as
            // readable or writable. In return, there will be an extra wasted
            // syscall per read/write if the fd is not readable or writable.
            signals: AtomicUsize::new(READABLE | WRITABLE),
            read_task: AtomicWaker::new(),
            write_task: AtomicWaker::new(),
        });

        let evented_fd =
            EventedFd { inner, fdio, signal_receiver: mem::ManuallyDrop::new(signal_receiver) };

        // Make sure a packet is delivered if an error or closure occurs.
        evented_fd.schedule_packet(ERROR | HUP);

        // Need to schedule packets to maintain the invariant that
        // if !READABLE or !WRITABLE a packet has been scheduled.
        evented_fd.schedule_packet(READABLE);
        evented_fd.schedule_packet(WRITABLE);

        Ok(evented_fd)
    }

    /// Tests to see if this resource is ready to be read from.
    /// If it is not, it arranges for the current task to receive a notification
    /// when a "readable" signal arrives.
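    ///
    /// A sketch of polling this by hand (hypothetical; `evented` is an
    /// `EventedFd<T>`):
    ///
    /// ```ignore
    /// use futures::future::poll_fn;
    /// // Resolves once the fd is readable (or an error/hangup was observed).
    /// poll_fn(|cx| evented.poll_readable(cx)).await?;
    /// ```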
    pub fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        let receiver = self.signal_receiver.receiver();
        if (receiver.signals.load(Ordering::SeqCst) & (READABLE | ERROR | HUP)) != 0 {
            Poll::Ready(Ok(()))
        } else {
            self.need_read(cx);
            Poll::Pending
        }
    }

    /// Tests to see if this resource is ready to be written to.
    /// If it is not, it arranges for the current task to receive a notification
    /// when a "writable" signal arrives.
    pub fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<Result<(), zx::Status>> {
        let receiver = self.signal_receiver.receiver();
        if (receiver.signals.load(Ordering::SeqCst) & (WRITABLE | ERROR | HUP)) != 0 {
            Poll::Ready(Ok(()))
        } else {
            self.need_write(cx);
            Poll::Pending
        }
    }

    /// Arranges for the current task to receive a notification when a "readable"
    /// signal arrives.
    pub fn need_read(&self, cx: &mut Context<'_>) {
        let receiver = self.signal_receiver.receiver();
        receiver.read_task.register(cx.waker());
        let old = receiver.signals.fetch_and(!READABLE, Ordering::SeqCst);
        // We only need to schedule a new packet if one isn't already scheduled.
        // If READABLE was already false, a packet was already scheduled.
        if (old & READABLE) != 0 {
            self.schedule_packet(READABLE);
        }
    }

    /// Arranges for the current task to receive a notification when a "writable"
    /// signal arrives.
    pub fn need_write(&self, cx: &mut Context<'_>) {
        let receiver = self.signal_receiver.receiver();
        receiver.write_task.register(cx.waker());
        let old = receiver.signals.fetch_and(!WRITABLE, Ordering::SeqCst);
        // We only need to schedule a new packet if one isn't already scheduled.
        // If WRITABLE was already false, a packet was already scheduled.
        if (old & WRITABLE) != 0 {
            self.schedule_packet(WRITABLE);
        }
    }

    fn schedule_packet(&self, signals: usize) {
        unsafe {
            let (mut raw_handle, mut raw_signals) = (0, 0);
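            // `fdio_unsafe_wait_begin` maps the epoll-style `signals` onto the
            // fd's underlying kernel handle and the zircon signal bits to wait
            // for; `receive_packet` performs the reverse mapping with
            // `fdio_unsafe_wait_end`.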
            syscall::fdio_unsafe_wait_begin(
                self.fdio,
                signals as u32,
                &mut raw_handle,
                &mut raw_signals,
            );

            let handle = zx::Handle::from_raw(raw_handle);
            let signals = zx::Signals::from_bits_truncate(raw_signals);

            let res = handle.wait_async_handle(
                self.signal_receiver.port(),
                self.signal_receiver.key(),
                signals,
                zx::WaitAsyncOpts::empty(),
            );

            // The handle is borrowed, so we cannot drop it.
            mem::forget(handle);
            res.expect("Error scheduling EventedFd notification");
        }
    }

    /// Clears all incoming signals.
    pub fn clear(&self) {
        self.signal_receiver.receiver().signals.store(0, Ordering::SeqCst);
    }
}

impl<T: AsRawFd> AsRawFd for EventedFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.as_ref().as_raw_fd()
    }
}

impl<T: AsRawFd + Read> AsyncRead for EventedFd<T> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_readable(&*self, cx))?;
        let res = (*self).as_mut().read(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_read(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res)
    }

    // TODO: override poll_read_vectored and call readv on the underlying handle
}

impl<T: AsRawFd + Write> AsyncWrite for EventedFd<T> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_writable(&*self, cx))?;
        let res = (*self).as_mut().write(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_write(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    // TODO: override poll_write_vectored and call writev on the underlying handle
}
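
// With the `AsyncRead`/`AsyncWrite` impls above, the standard `futures`
// combinators apply. A sketch (hypothetical; `evented` is an `EventedFd<T>`
// with `T: AsRawFd + Read + Write`):
//
//     use futures::io::{AsyncReadExt, AsyncWriteExt};
//     evented.write_all(b"ping").await?;
//     let mut buf = [0u8; 16];
//     let n = evented.read(&mut buf).await?;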

impl<T> AsyncRead for &EventedFd<T>
where
    T: AsRawFd,
    for<'b> &'b T: Read,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_readable(*self, cx))?;
        let res = (*self).as_ref().read(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_read(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res)
    }
}

impl<T> AsyncWrite for &EventedFd<T>
where
    T: AsRawFd,
    for<'b> &'b T: Write,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        ready!(EventedFd::poll_writable(*self, cx))?;
        let res = (*self).as_ref().write(buf);
        if let Err(e) = &res {
            if e.kind() == io::ErrorKind::WouldBlock {
                self.need_write(cx);
                return Poll::Pending;
            }
        }
        Poll::Ready(res)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }
}

mod syscall {
    #![allow(non_camel_case_types, improper_ctypes)]
    use std::os::unix::io::RawFd;
    pub use zx::sys::{zx_handle_t, zx_signals_t};

    // `fdio_t` is opaque to Rust; `()` stands in for it, which is why
    // `improper_ctypes` is allowed above.
    pub type fdio_t = ();

    // From libfdio.so
    extern "C" {
        pub fn fdio_unsafe_fd_to_io(fd: RawFd) -> *const fdio_t;
        pub fn fdio_unsafe_release(io: *const fdio_t);

        pub fn fdio_unsafe_wait_begin(
            io: *const fdio_t,
            events: u32,
            handle_out: &mut zx_handle_t,
            signals_out: &mut zx_signals_t,
        );

        pub fn fdio_unsafe_wait_end(io: *const fdio_t, signals: zx_signals_t, events_out: &mut u32);
    }
}