fuchsia_async/handle/zircon/
fifo.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::rwhandle::{RWHandle, ReadableHandle as _, WritableHandle as _};
6use futures::ready;
7use std::fmt;
8use std::future::poll_fn;
9use std::mem::MaybeUninit;
10use std::task::{Context, Poll};
11use zerocopy::{FromBytes, Immutable, IntoBytes};
12use zx::{self as zx, AsHandleRef};
13
/// Marker trait for types that can be read/written with a `Fifo`.
///
/// The trait declares no methods of its own; it only gathers the
/// [`IntoBytes`], [`FromBytes`], and [`Immutable`] bounds under a single name.
///
/// An implementation is provided for all types that implement
/// [`IntoBytes`], [`FromBytes`], and [`Immutable`].
pub trait FifoEntry: IntoBytes + FromBytes + Immutable {}
19
20impl<O: IntoBytes + FromBytes + Immutable> FifoEntry for O {}
21
/// A buffer used to write `T` into [`Fifo`] objects.
pub trait FifoWriteBuffer<T> {
    /// Returns the entries held by this buffer as a contiguous slice.
    ///
    /// [`Fifo::try_write`] passes this slice's pointer and length to the
    /// underlying FIFO write call.
    fn as_slice(&self) -> &[T];
}
26
/// A buffer used to read `T` from [`Fifo`] objects.
///
/// # Safety
///
/// This trait is unsafe because the compiler cannot verify a correct
/// implementation of `as_mut_ptr`. See [`FifoReadBuffer::as_mut_ptr`] for
/// safety notes.
pub unsafe trait FifoReadBuffer<T> {
    /// Returns the number of slots available in the buffer to be received.
    fn count(&self) -> usize;
    /// Returns a mutable pointer to the buffer contents where FIFO entries must
    /// be written into.
    ///
    /// # Safety
    ///
    /// The returned memory *must* be at least `count() * size_of::<T>()` bytes
    /// long.
    fn as_mut_ptr(&mut self) -> *mut T;
}
46
47impl<T: FifoEntry> FifoWriteBuffer<T> for [T] {
48    fn as_slice(&self) -> &[T] {
49        self
50    }
51}
52
53unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [T] {
54    fn count(&self) -> usize {
55        self.len()
56    }
57
58    fn as_mut_ptr(&mut self) -> *mut T {
59        self.as_mut_ptr()
60    }
61}
62
63impl<T: FifoEntry> FifoWriteBuffer<T> for T {
64    fn as_slice(&self) -> &[T] {
65        std::slice::from_ref(self)
66    }
67}
68
69unsafe impl<T: FifoEntry> FifoReadBuffer<T> for T {
70    fn count(&self) -> usize {
71        1
72    }
73
74    fn as_mut_ptr(&mut self) -> *mut T {
75        self as *mut T
76    }
77}
78
79unsafe impl<T: FifoEntry> FifoReadBuffer<T> for MaybeUninit<T> {
80    fn count(&self) -> usize {
81        1
82    }
83
84    fn as_mut_ptr(&mut self) -> *mut T {
85        self.as_mut_ptr()
86    }
87}
88
89unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [MaybeUninit<T>] {
90    fn count(&self) -> usize {
91        self.len()
92    }
93
94    fn as_mut_ptr(&mut self) -> *mut T {
95        // TODO(https://github.com/rust-lang/rust/issues/63569): Use
96        // `MaybeUninit::slice_as_mut_ptr` once stable.
97        self.as_mut_ptr() as *mut T
98    }
99}
100
/// An I/O object representing a `Fifo`.
///
/// Wraps a `zx::Fifo<R, W>` in an [`RWHandle`]. `R` is the entry type used by
/// the read methods and `W` is the entry type used by the write methods; `W`
/// defaults to `R` when omitted.
pub struct Fifo<R, W = R> {
    // The underlying FIFO; the wrapper is what the read/write methods poll for
    // readiness and register wakers on.
    handle: RWHandle<zx::Fifo<R, W>>,
}
105
106impl<R, W> AsRef<zx::Fifo<R, W>> for Fifo<R, W> {
107    fn as_ref(&self) -> &zx::Fifo<R, W> {
108        self.handle.get_ref()
109    }
110}
111
112impl<R, W> AsHandleRef for Fifo<R, W> {
113    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
114        self.handle.get_ref().as_handle_ref()
115    }
116}
117
118impl<R, W> From<Fifo<R, W>> for zx::Fifo<R, W> {
119    fn from(fifo: Fifo<R, W>) -> zx::Fifo<R, W> {
120        fifo.handle.into_inner()
121    }
122}
123
124impl<R: FifoEntry, W: FifoEntry> Fifo<R, W> {
125    /// Creates a new `Fifo` from a previously-created `zx::Fifo`.
126    ///
127    /// # Panics
128    ///
129    /// If called on a thread that does not have a current async executor.
130    pub fn from_fifo(fifo: impl Into<zx::Fifo<R, W>>) -> Self {
131        Fifo { handle: RWHandle::new(fifo.into()) }
132    }
133
134    /// Writes entries to the fifo and registers this `Fifo` as needing a write on receiving a
135    /// `zx::Status::SHOULD_WAIT`.
136    ///
137    /// Returns the number of elements processed.
138    ///
139    /// NOTE: Only one writer is supported; this will overwrite any waker registered with a previous
140    /// invocation to `try_write`.
141    pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
142        &self,
143        cx: &mut Context<'_>,
144        entries: &B,
145    ) -> Poll<Result<usize, zx::Status>> {
146        ready!(self.handle.poll_writable(cx)?);
147
148        let entries = entries.as_slice();
149        let fifo = self.as_ref();
150        // SAFETY: Safety relies on us keeping the slice alive over the call to `write_raw`, which
151        // we do.
152        loop {
153            let result = unsafe { fifo.write_raw(entries.as_ptr(), entries.len()) };
154            match result {
155                Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_writable(cx)?),
156                Err(e) => return Poll::Ready(Err(e)),
157                Ok(count) => return Poll::Ready(Ok(count)),
158            }
159        }
160    }
161
162    /// Reads entries from the fifo into `entries` and registers this `Fifo` as needing a read on
163    /// receiving a `zx::Status::SHOULD_WAIT`.
164    ///
165    /// NOTE: Only one reader is supported; this will overwrite any waker registered with a previous
166    /// invocation to `try_read`.
167    pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
168        &self,
169        cx: &mut Context<'_>,
170        entries: &mut B,
171    ) -> Poll<Result<usize, zx::Status>> {
172        ready!(self.handle.poll_readable(cx)?);
173
174        let buf = entries.as_mut_ptr();
175        let count = entries.count();
176        let fifo = self.as_ref();
177
178        loop {
179            // SAFETY: Safety relies on the pointer returned by `B` being valid,
180            // which itself depends on a correct implementation of `FifoEntry` for
181            // `R`.
182            let result = unsafe { fifo.read_raw(buf, count) };
183
184            match result {
185                Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_readable(cx)?),
186                Err(e) => return Poll::Ready(Err(e)),
187                Ok(count) => return Poll::Ready(Ok(count)),
188            }
189        }
190    }
191
192    /// Returns a reader and writer which have async functions that can be used to read and write
193    /// requests.
194    pub fn async_io(&mut self) -> (FifoReader<'_, R, W>, FifoWriter<'_, R, W>) {
195        (FifoReader(self), FifoWriter(self))
196    }
197}
198
/// The write half returned by [`Fifo::async_io`].
///
/// Holds a shared borrow of the `Fifo` it was created from.
pub struct FifoWriter<'a, R, W>(&'a Fifo<R, W>);
200
201impl<R: FifoEntry, W: FifoEntry> FifoWriter<'_, R, W> {
202    /// NOTE: If this future is dropped or there is an error, there is no indication how many
203    /// entries were successfully written.
204    pub async fn write_entries(
205        &mut self,
206        entries: &(impl ?Sized + FifoWriteBuffer<W>),
207    ) -> Result<(), zx::Status> {
208        let mut entries = entries.as_slice();
209        poll_fn(|cx| {
210            while !entries.is_empty() {
211                match ready!(self.0.try_write(cx, entries)) {
212                    Ok(count) => entries = &entries[count..],
213                    Err(status) => return Poll::Ready(Err(status)),
214                }
215            }
216            Poll::Ready(Ok(()))
217        })
218        .await
219    }
220
221    /// Same as Fifo::try_write.
222    pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
223        &mut self,
224        cx: &mut Context<'_>,
225        entries: &B,
226    ) -> Poll<Result<usize, zx::Status>> {
227        self.0.try_write(cx, entries)
228    }
229}
230
/// The read half returned by [`Fifo::async_io`].
///
/// Holds a shared borrow of the `Fifo` it was created from.
pub struct FifoReader<'a, R, W>(&'a Fifo<R, W>);
232
233impl<R: FifoEntry, W: FifoEntry> FifoReader<'_, R, W> {
234    pub async fn read_entries(
235        &mut self,
236        entries: &mut (impl ?Sized + FifoReadBuffer<R>),
237    ) -> Result<usize, zx::Status> {
238        poll_fn(|cx| self.0.try_read(cx, entries)).await
239    }
240
241    /// Same as Fifo::try_read.
242    pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
243        &mut self,
244        cx: &mut Context<'_>,
245        entries: &mut B,
246    ) -> Poll<Result<usize, zx::Status>> {
247        self.0.try_read(cx, entries)
248    }
249}
250
251impl<R, W> fmt::Debug for Fifo<R, W> {
252    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253        self.handle.get_ref().fmt(f)
254    }
255}
256
257#[cfg(test)]
258mod tests {
259    use super::*;
260    use crate::{DurationExt, TestExecutor, TimeoutExt, Timer};
261    use futures::future::try_join;
262    use futures::prelude::*;
263    use zerocopy::{Immutable, KnownLayout};
264    use zx::prelude::*;
265
    /// FIFO element used by the tests: two `u32` fields in C layout.
    #[derive(
        Copy, Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
    )]
    #[repr(C)]
    struct Entry {
        a: u32,
        b: u32,
    }
274
    /// Deliberately mismatched element type (a single `u16`), used by the
    /// `*_wrong_size` tests against FIFOs that were created for [`Entry`].
    #[derive(
        Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
    )]
    #[repr(C)]
    struct WrongEntry {
        a: u16,
    }
282
283    #[test]
284    fn can_read_write() {
285        let mut exec = TestExecutor::new();
286        let element = Entry { a: 10, b: 20 };
287
288        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
289        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
290        let (_, mut tx) = tx.async_io();
291        let (mut rx, _) = rx.async_io();
292
293        let mut buffer = Entry::default();
294        let receive_future = rx.read_entries(&mut buffer).map_ok(|count| {
295            assert_eq!(count, 1);
296        });
297
298        // add a timeout to receiver so if test is broken it doesn't take forever
299        let receiver = receive_future
300            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
301
302        // Sends an entry after the timeout has passed
303        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
304            .then(|()| tx.write_entries(&element));
305
306        let done = try_join(receiver, sender);
307        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
308        assert_eq!(buffer, element);
309    }
310
311    #[test]
312    fn read_wrong_size() {
313        let mut exec = TestExecutor::new();
314        let elements = &[Entry { a: 10, b: 20 }][..];
315
316        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
317        let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
318        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(wrong_rx));
319        let (_, mut tx) = tx.async_io();
320        let (mut rx, _) = rx.async_io();
321
322        let mut buffer = WrongEntry::default();
323        let receive_future = rx
324            .read_entries(&mut buffer)
325            .map_ok(|count| panic!("read should have failed, got {}", count));
326
327        // add a timeout to receiver so if test is broken it doesn't take forever
328        let receiver = receive_future
329            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
330
331        // Sends an entry after the timeout has passed
332        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
333            .then(|()| tx.write_entries(elements));
334
335        let done = try_join(receiver, sender);
336        let res = exec.run_singlethreaded(done);
337        match res {
338            Err(zx::Status::OUT_OF_RANGE) => (),
339            _ => panic!("did not get out-of-range error"),
340        }
341    }
342
343    #[test]
344    fn write_wrong_size() {
345        let mut exec = TestExecutor::new();
346        let elements = &[WrongEntry { a: 10 }][..];
347
348        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
349        let wrong_tx = zx::Fifo::<WrongEntry>::from(tx.into_handle());
350        let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
351        let (mut tx, _rx) = (Fifo::from_fifo(wrong_tx), Fifo::from_fifo(wrong_rx));
352        let (_, mut tx) = tx.async_io();
353
354        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
355            .then(|()| tx.write_entries(elements));
356
357        let res = exec.run_singlethreaded(sender);
358        match res {
359            Err(zx::Status::OUT_OF_RANGE) => (),
360            _ => panic!("did not get out-of-range error"),
361        }
362    }
363
364    #[test]
365    fn write_into_full() {
366        use std::sync::atomic::{AtomicUsize, Ordering};
367
368        let mut exec = TestExecutor::new();
369        let elements =
370            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
371
372        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
373        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
374
375        // Use `writes_completed` to verify that not all writes
376        // are transmitted at once, and the last write is actually blocked.
377        let writes_completed = AtomicUsize::new(0);
378        let sender = async {
379            let (_, mut writer) = tx.async_io();
380            writer.write_entries(&elements[..2]).await?;
381            writes_completed.fetch_add(1, Ordering::SeqCst);
382            writer.write_entries(&elements[2..]).await?;
383            writes_completed.fetch_add(1, Ordering::SeqCst);
384            Ok::<(), zx::Status>(())
385        };
386
387        // Wait 10 ms, then read the messages from the fifo.
388        let receive_future = async {
389            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
390            let mut buffer = Entry::default();
391            let (mut reader, _) = rx.async_io();
392            let count = reader.read_entries(&mut buffer).await?;
393            assert_eq!(writes_completed.load(Ordering::SeqCst), 1);
394            assert_eq!(count, 1);
395            assert_eq!(buffer, elements[0]);
396            let count = reader.read_entries(&mut buffer).await?;
397            // At this point, the last write may or may not have
398            // been written.
399            assert_eq!(count, 1);
400            assert_eq!(buffer, elements[1]);
401            let count = reader.read_entries(&mut buffer).await?;
402            assert_eq!(writes_completed.load(Ordering::SeqCst), 2);
403            assert_eq!(count, 1);
404            assert_eq!(buffer, elements[2]);
405            Ok::<(), zx::Status>(())
406        };
407
408        // add a timeout to receiver so if test is broken it doesn't take forever
409        let receiver = receive_future
410            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
411
412        let done = try_join(receiver, sender);
413
414        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
415    }
416
417    #[test]
418    fn write_more_than_full() {
419        let mut exec = TestExecutor::new();
420        let elements =
421            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
422
423        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
424        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
425        let (_, mut tx) = tx.async_io();
426        let (mut rx, _) = rx.async_io();
427
428        let sender = tx.write_entries(elements);
429
430        // Wait 10 ms, then read the messages from the fifo.
431        let receive_future = async {
432            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
433            for e in elements {
434                let mut buffer = [Entry::default(); 1];
435                let count = rx.read_entries(&mut buffer[..]).await?;
436                assert_eq!(count, 1);
437                assert_eq!(&buffer[0], e);
438            }
439            Ok::<(), zx::Status>(())
440        };
441
442        // add a timeout to receiver so if test is broken it doesn't take forever
443        let receiver = receive_future
444            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
445
446        let done = try_join(receiver, sender);
447
448        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
449    }
450
451    #[test]
452    fn read_multiple() {
453        let mut exec = TestExecutor::new();
454        let elements =
455            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
456        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
457        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
458
459        let write_fut = async {
460            tx.async_io().1.write_entries(&elements[..]).await.expect("failed write entries");
461        };
462        let read_fut = async {
463            // Use a larger buffer to show partial reads.
464            let mut buffer = [Entry::default(); 5];
465            let count = rx
466                .async_io()
467                .0
468                .read_entries(&mut buffer[..])
469                .await
470                .expect("failed to read entries");
471            assert_eq!(count, elements.len());
472            assert_eq!(&buffer[..count], &elements[..]);
473        };
474        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
475    }
476
477    #[test]
478    fn read_one() {
479        let mut exec = TestExecutor::new();
480        let elements =
481            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
482        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
483        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
484
485        let write_fut = async {
486            tx.async_io().1.write_entries(&elements[..]).await.expect("failed write entries");
487        };
488        let read_fut = async {
489            let (mut reader, _) = rx.async_io();
490            for e in elements {
491                let mut entry = Entry::default();
492                assert_eq!(reader.read_entries(&mut entry).await.expect("failed to read entry"), 1);
493                assert_eq!(&entry, e);
494            }
495        };
496        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
497    }
498
499    #[test]
500    fn maybe_uninit_single() {
501        let mut exec = TestExecutor::new();
502        let element = Entry { a: 10, b: 20 };
503        let (tx, rx) = zx::Fifo::<Entry>::create(1).expect("failed to create zx fifo");
504        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
505
506        let write_fut = async {
507            tx.async_io().1.write_entries(&element).await.expect("failed write entries");
508        };
509        let read_fut = async {
510            let mut buffer = MaybeUninit::<Entry>::uninit();
511            let count =
512                rx.async_io().0.read_entries(&mut buffer).await.expect("failed to read entries");
513            assert_eq!(count, 1);
514            // SAFETY: We just read a new entry into the buffer.
515            let read = unsafe { buffer.assume_init() };
516            assert_eq!(read, element);
517        };
518        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
519    }
520
521    #[test]
522    fn maybe_uninit_slice() {
523        let mut exec = TestExecutor::new();
524        let elements =
525            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
526        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
527        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
528
529        let write_fut = async {
530            tx.async_io().1.write_entries(&elements[..]).await.expect("failed write entries");
531        };
532        let read_fut = async {
533            // Use a larger buffer to show partial reads.
534            let mut buffer = [MaybeUninit::<Entry>::uninit(); 15];
535            let count = rx
536                .async_io()
537                .0
538                .read_entries(&mut buffer[..])
539                .await
540                .expect("failed to read entries");
541            assert_eq!(count, elements.len());
542            let read = &mut buffer[..count];
543            for (i, v) in read.iter_mut().enumerate() {
544                // SAFETY: This is the read region of the buffer, initialized by
545                // reading from the FIFO.
546                let read = unsafe { v.assume_init_ref() };
547                assert_eq!(read, &elements[i]);
548                // SAFETY: The buffer was partially initialized by reading from
549                // the FIFO, the correct thing to do here is to manually drop
550                // the elements that were initialized.
551                unsafe {
552                    v.assume_init_drop();
553                }
554            }
555        };
556        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
557    }
558}