fuchsia_async/handle/zircon/
fifo.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::rwhandle::{RWHandle, ReadableHandle as _, WritableHandle as _};
6use futures::ready;
7use std::fmt;
8use std::future::poll_fn;
9use std::mem::MaybeUninit;
10use std::task::{Context, Poll};
11use zerocopy::{FromBytes, Immutable, IntoBytes};
12use zx::{self as zx, AsHandleRef};
13
14/// Marker trait for types that can be read/written with a `Fifo`.
15///
16/// An implementation is provided for all types that implement
17/// [`IntoBytes`], [`FromBytes`], and [`Immutable`].
18pub trait FifoEntry: IntoBytes + FromBytes + Immutable {}
19
20impl<O: IntoBytes + FromBytes + Immutable> FifoEntry for O {}
21
22/// A buffer used to write `T` into [`Fifo`] objects.
23pub trait FifoWriteBuffer<T> {
24    fn as_slice(&self) -> &[T];
25}
26
27/// A buffer used to read `T` from [`Fifo`] objects.
28///
29/// # Safety
30///
31/// This trait is unsafe because the compiler cannot verify a correct
32/// implementation of `as_bytes_ptr_mut`. See
33/// [`FifoReadBuffer::as_bytes_ptr_mut`] for safety notes.
34pub unsafe trait FifoReadBuffer<T> {
35    /// Returns the number of slots available in the buffer to be rceived.
36    fn count(&self) -> usize;
37    /// Returns a mutable pointer to the buffer contents where FIFO entries must
38    /// be written into.
39    ///
40    /// # Safety
41    ///
42    /// The returned memory *must* be at least `count() * sizeof<T>()` bytes
43    /// long.
44    fn as_mut_ptr(&mut self) -> *mut T;
45}
46
47impl<T: FifoEntry, const N: usize> FifoWriteBuffer<T> for [T; N] {
48    fn as_slice(&self) -> &[T] {
49        self
50    }
51}
52
53impl<T: FifoEntry> FifoWriteBuffer<T> for [T] {
54    fn as_slice(&self) -> &[T] {
55        self
56    }
57}
58
59unsafe impl<T: FifoEntry, const N: usize> FifoReadBuffer<T> for [T; N] {
60    fn count(&self) -> usize {
61        N
62    }
63
64    fn as_mut_ptr(&mut self) -> *mut T {
65        self.as_mut_slice().as_mut_ptr()
66    }
67}
68
69unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [T] {
70    fn count(&self) -> usize {
71        self.len()
72    }
73
74    fn as_mut_ptr(&mut self) -> *mut T {
75        self.as_mut_ptr()
76    }
77}
78
79impl<T: FifoEntry> FifoWriteBuffer<T> for T {
80    fn as_slice(&self) -> &[T] {
81        std::slice::from_ref(self)
82    }
83}
84
85unsafe impl<T: FifoEntry> FifoReadBuffer<T> for T {
86    fn count(&self) -> usize {
87        1
88    }
89
90    fn as_mut_ptr(&mut self) -> *mut T {
91        self as *mut T
92    }
93}
94
95unsafe impl<T: FifoEntry> FifoReadBuffer<T> for MaybeUninit<T> {
96    fn count(&self) -> usize {
97        1
98    }
99
100    fn as_mut_ptr(&mut self) -> *mut T {
101        self.as_mut_ptr()
102    }
103}
104
105unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [MaybeUninit<T>] {
106    fn count(&self) -> usize {
107        self.len()
108    }
109
110    fn as_mut_ptr(&mut self) -> *mut T {
111        // TODO(https://github.com/rust-lang/rust/issues/63569): Use
112        // `MaybeUninit::slice_as_mut_ptr` once stable.
113        self.as_mut_ptr() as *mut T
114    }
115}
116
117/// An I/O object representing a `Fifo`.
118pub struct Fifo<R, W = R> {
119    handle: RWHandle<zx::Fifo<R, W>>,
120}
121
122impl<R, W> AsRef<zx::Fifo<R, W>> for Fifo<R, W> {
123    fn as_ref(&self) -> &zx::Fifo<R, W> {
124        self.handle.get_ref()
125    }
126}
127
128impl<R, W> AsHandleRef for Fifo<R, W> {
129    fn as_handle_ref(&self) -> zx::HandleRef<'_> {
130        self.handle.get_ref().as_handle_ref()
131    }
132}
133
134impl<R, W> From<Fifo<R, W>> for zx::Fifo<R, W> {
135    fn from(fifo: Fifo<R, W>) -> zx::Fifo<R, W> {
136        fifo.handle.into_inner()
137    }
138}
139
140impl<R: FifoEntry, W: FifoEntry> Fifo<R, W> {
141    /// Creates a new `Fifo` from a previously-created `zx::Fifo`.
142    ///
143    /// # Panics
144    ///
145    /// If called on a thread that does not have a current async executor.
146    pub fn from_fifo(fifo: impl Into<zx::Fifo<R, W>>) -> Self {
147        Fifo { handle: RWHandle::new(fifo.into()) }
148    }
149
150    /// Writes entries to the fifo and registers this `Fifo` as needing a write on receiving a
151    /// `zx::Status::SHOULD_WAIT`.
152    ///
153    /// Returns the number of elements processed.
154    ///
155    /// NOTE: Only one writer is supported; this will overwrite any waker registered with a previous
156    /// invocation to `try_write`.
157    pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
158        &self,
159        cx: &mut Context<'_>,
160        entries: &B,
161    ) -> Poll<Result<usize, zx::Status>> {
162        ready!(self.handle.poll_writable(cx)?);
163
164        let entries = entries.as_slice();
165        let fifo = self.as_ref();
166        // SAFETY: Safety relies on us keeping the slice alive over the call to `write_raw`, which
167        // we do.
168        loop {
169            let result = unsafe { fifo.write_raw(entries.as_ptr(), entries.len()) };
170            match result {
171                Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_writable(cx)?),
172                Err(e) => return Poll::Ready(Err(e)),
173                Ok(count) => return Poll::Ready(Ok(count)),
174            }
175        }
176    }
177
178    /// Reads entries from the fifo into `entries` and registers this `Fifo` as needing a read on
179    /// receiving a `zx::Status::SHOULD_WAIT`.
180    ///
181    /// NOTE: Only one reader is supported; this will overwrite any waker registered with a previous
182    /// invocation to `try_read`.
183    pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
184        &self,
185        cx: &mut Context<'_>,
186        entries: &mut B,
187    ) -> Poll<Result<usize, zx::Status>> {
188        ready!(self.handle.poll_readable(cx)?);
189
190        let buf = entries.as_mut_ptr();
191        let count = entries.count();
192        let fifo = self.as_ref();
193
194        loop {
195            // SAFETY: Safety relies on the pointer returned by `B` being valid,
196            // which itself depends on a correct implementation of `FifoEntry` for
197            // `R`.
198            let result = unsafe { fifo.read_raw(buf, count) };
199
200            match result {
201                Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_readable(cx)?),
202                Err(e) => return Poll::Ready(Err(e)),
203                Ok(count) => return Poll::Ready(Ok(count)),
204            }
205        }
206    }
207
208    /// Returns a reader and writer which have async functions that can be used to read and write
209    /// requests.
210    pub fn async_io(&mut self) -> (FifoReader<'_, R, W>, FifoWriter<'_, R, W>) {
211        (FifoReader(self), FifoWriter(self))
212    }
213}
214
215pub struct FifoWriter<'a, R, W>(&'a Fifo<R, W>);
216
217impl<R: FifoEntry, W: FifoEntry> FifoWriter<'_, R, W> {
218    /// NOTE: If this future is dropped or there is an error, there is no indication how many
219    /// entries were successfully written.
220    pub async fn write_entries(
221        &mut self,
222        entries: &(impl ?Sized + FifoWriteBuffer<W>),
223    ) -> Result<(), zx::Status> {
224        let mut entries = entries.as_slice();
225        poll_fn(|cx| {
226            while !entries.is_empty() {
227                match ready!(self.0.try_write(cx, entries)) {
228                    Ok(count) => entries = &entries[count..],
229                    Err(status) => return Poll::Ready(Err(status)),
230                }
231            }
232            Poll::Ready(Ok(()))
233        })
234        .await
235    }
236
237    /// Same as Fifo::try_write.
238    pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
239        &mut self,
240        cx: &mut Context<'_>,
241        entries: &B,
242    ) -> Poll<Result<usize, zx::Status>> {
243        self.0.try_write(cx, entries)
244    }
245}
246
247pub struct FifoReader<'a, R, W>(&'a Fifo<R, W>);
248
249impl<R: FifoEntry, W: FifoEntry> FifoReader<'_, R, W> {
250    pub async fn read_entries(
251        &mut self,
252        entries: &mut (impl ?Sized + FifoReadBuffer<R>),
253    ) -> Result<usize, zx::Status> {
254        poll_fn(|cx| self.0.try_read(cx, entries)).await
255    }
256
257    /// Same as Fifo::try_read.
258    pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
259        &mut self,
260        cx: &mut Context<'_>,
261        entries: &mut B,
262    ) -> Poll<Result<usize, zx::Status>> {
263        self.0.try_read(cx, entries)
264    }
265}
266
267impl<R, W> fmt::Debug for Fifo<R, W> {
268    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269        self.handle.get_ref().fmt(f)
270    }
271}
272
273#[cfg(test)]
274mod tests {
275    use super::*;
276    use crate::{DurationExt, TestExecutor, TimeoutExt, Timer};
277    use futures::future::try_join;
278    use futures::prelude::*;
279    use zerocopy::{Immutable, KnownLayout};
280    use zx::prelude::*;
281
282    #[derive(
283        Copy, Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
284    )]
285    #[repr(C)]
286    struct Entry {
287        a: u32,
288        b: u32,
289    }
290
291    #[derive(
292        Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
293    )]
294    #[repr(C)]
295    struct WrongEntry {
296        a: u16,
297    }
298
299    #[test]
300    fn can_read_write() {
301        let mut exec = TestExecutor::new();
302        let element = Entry { a: 10, b: 20 };
303
304        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
305        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
306        let (_, mut tx) = tx.async_io();
307        let (mut rx, _) = rx.async_io();
308
309        let mut buffer = Entry::default();
310        let receive_future = rx.read_entries(&mut buffer).map_ok(|count| {
311            assert_eq!(count, 1);
312        });
313
314        // add a timeout to receiver so if test is broken it doesn't take forever
315        let receiver = receive_future
316            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
317
318        // Sends an entry after the timeout has passed
319        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
320            .then(|()| tx.write_entries(&element));
321
322        let done = try_join(receiver, sender);
323        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
324        assert_eq!(buffer, element);
325    }
326
327    #[test]
328    fn read_wrong_size() {
329        let mut exec = TestExecutor::new();
330        let elements = &[Entry { a: 10, b: 20 }][..];
331
332        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
333        let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
334        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(wrong_rx));
335        let (_, mut tx) = tx.async_io();
336        let (mut rx, _) = rx.async_io();
337
338        let mut buffer = WrongEntry::default();
339        let receive_future = rx
340            .read_entries(&mut buffer)
341            .map_ok(|count| panic!("read should have failed, got {count}"));
342
343        // add a timeout to receiver so if test is broken it doesn't take forever
344        let receiver = receive_future
345            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
346
347        // Sends an entry after the timeout has passed
348        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
349            .then(|()| tx.write_entries(elements));
350
351        let done = try_join(receiver, sender);
352        let res = exec.run_singlethreaded(done);
353        match res {
354            Err(zx::Status::OUT_OF_RANGE) => (),
355            _ => panic!("did not get out-of-range error"),
356        }
357    }
358
359    #[test]
360    fn write_wrong_size() {
361        let mut exec = TestExecutor::new();
362        let elements = &[WrongEntry { a: 10 }][..];
363
364        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
365        let wrong_tx = zx::Fifo::<WrongEntry>::from(tx.into_handle());
366        let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
367        let (mut tx, _rx) = (Fifo::from_fifo(wrong_tx), Fifo::from_fifo(wrong_rx));
368        let (_, mut tx) = tx.async_io();
369
370        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
371            .then(|()| tx.write_entries(elements));
372
373        let res = exec.run_singlethreaded(sender);
374        match res {
375            Err(zx::Status::OUT_OF_RANGE) => (),
376            _ => panic!("did not get out-of-range error"),
377        }
378    }
379
380    #[test]
381    fn write_into_full() {
382        use std::sync::atomic::{AtomicUsize, Ordering};
383
384        let mut exec = TestExecutor::new();
385        let elements =
386            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
387
388        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
389        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
390
391        // Use `writes_completed` to verify that not all writes
392        // are transmitted at once, and the last write is actually blocked.
393        let writes_completed = AtomicUsize::new(0);
394        let sender = async {
395            let (_, mut writer) = tx.async_io();
396            writer.write_entries(&elements[..2]).await?;
397            writes_completed.fetch_add(1, Ordering::SeqCst);
398            writer.write_entries(&elements[2..]).await?;
399            writes_completed.fetch_add(1, Ordering::SeqCst);
400            Ok::<(), zx::Status>(())
401        };
402
403        // Wait 10 ms, then read the messages from the fifo.
404        let receive_future = async {
405            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
406            let mut buffer = Entry::default();
407            let (mut reader, _) = rx.async_io();
408            let count = reader.read_entries(&mut buffer).await?;
409            assert_eq!(writes_completed.load(Ordering::SeqCst), 1);
410            assert_eq!(count, 1);
411            assert_eq!(buffer, elements[0]);
412            let count = reader.read_entries(&mut buffer).await?;
413            // At this point, the last write may or may not have
414            // been written.
415            assert_eq!(count, 1);
416            assert_eq!(buffer, elements[1]);
417            let count = reader.read_entries(&mut buffer).await?;
418            assert_eq!(writes_completed.load(Ordering::SeqCst), 2);
419            assert_eq!(count, 1);
420            assert_eq!(buffer, elements[2]);
421            Ok::<(), zx::Status>(())
422        };
423
424        // add a timeout to receiver so if test is broken it doesn't take forever
425        let receiver = receive_future
426            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
427
428        let done = try_join(receiver, sender);
429
430        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
431    }
432
433    #[test]
434    fn write_more_than_full() {
435        let mut exec = TestExecutor::new();
436        let elements =
437            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
438
439        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
440        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
441        let (_, mut tx) = tx.async_io();
442        let (mut rx, _) = rx.async_io();
443
444        let sender = tx.write_entries(elements);
445
446        // Wait 10 ms, then read the messages from the fifo.
447        let receive_future = async {
448            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
449            for e in elements {
450                let mut buffer = [Entry::default(); 1];
451                let count = rx.read_entries(&mut buffer[..]).await?;
452                assert_eq!(count, 1);
453                assert_eq!(&buffer[0], e);
454            }
455            Ok::<(), zx::Status>(())
456        };
457
458        // add a timeout to receiver so if test is broken it doesn't take forever
459        let receiver = receive_future
460            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
461
462        let done = try_join(receiver, sender);
463
464        exec.run_singlethreaded(done).expect("failed to run receive future on executor");
465    }
466
467    #[test]
468    fn read_multiple() {
469        let mut exec = TestExecutor::new();
470        let elements =
471            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
472        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
473        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
474
475        let write_fut = async {
476            tx.async_io().1.write_entries(elements).await.expect("failed write entries");
477        };
478        let read_fut = async {
479            // Use a larger buffer to show partial reads.
480            let mut buffer = [Entry::default(); 5];
481            let count = rx
482                .async_io()
483                .0
484                .read_entries(&mut buffer[..])
485                .await
486                .expect("failed to read entries");
487            assert_eq!(count, elements.len());
488            assert_eq!(&buffer[..count], elements);
489        };
490        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
491    }
492
493    #[test]
494    fn read_one() {
495        let mut exec = TestExecutor::new();
496        let elements =
497            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
498        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
499        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
500
501        let write_fut = async {
502            tx.async_io().1.write_entries(elements).await.expect("failed write entries");
503        };
504        let read_fut = async {
505            let (mut reader, _) = rx.async_io();
506            for e in elements {
507                let mut entry = Entry::default();
508                assert_eq!(reader.read_entries(&mut entry).await.expect("failed to read entry"), 1);
509                assert_eq!(&entry, e);
510            }
511        };
512        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
513    }
514
515    #[test]
516    fn maybe_uninit_single() {
517        let mut exec = TestExecutor::new();
518        let element = Entry { a: 10, b: 20 };
519        let (tx, rx) = zx::Fifo::<Entry>::create(1).expect("failed to create zx fifo");
520        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
521
522        let write_fut = async {
523            tx.async_io().1.write_entries(&element).await.expect("failed write entries");
524        };
525        let read_fut = async {
526            let mut buffer = MaybeUninit::<Entry>::uninit();
527            let count =
528                rx.async_io().0.read_entries(&mut buffer).await.expect("failed to read entries");
529            assert_eq!(count, 1);
530            // SAFETY: We just read a new entry into the buffer.
531            let read = unsafe { buffer.assume_init() };
532            assert_eq!(read, element);
533        };
534        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
535    }
536
537    #[test]
538    fn maybe_uninit_slice() {
539        let mut exec = TestExecutor::new();
540        let elements =
541            &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
542        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
543        let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
544
545        let write_fut = async {
546            tx.async_io().1.write_entries(elements).await.expect("failed write entries");
547        };
548        let read_fut = async {
549            // Use a larger buffer to show partial reads.
550            let mut buffer = [MaybeUninit::<Entry>::uninit(); 15];
551            let count = rx
552                .async_io()
553                .0
554                .read_entries(&mut buffer[..])
555                .await
556                .expect("failed to read entries");
557            assert_eq!(count, elements.len());
558            let read = &mut buffer[..count];
559            for (i, v) in read.iter_mut().enumerate() {
560                // SAFETY: This is the read region of the buffer, initialized by
561                // reading from the FIFO.
562                let read = unsafe { v.assume_init_ref() };
563                assert_eq!(read, &elements[i]);
564                // SAFETY: The buffer was partially initialized by reading from
565                // the FIFO, the correct thing to do here is to manually drop
566                // the elements that were initialized.
567                unsafe {
568                    v.assume_init_drop();
569                }
570            }
571        };
572        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
573    }
574}