1use super::rwhandle::{RWHandle, ReadableHandle as _, WritableHandle as _};
6use futures::ready;
7use std::fmt;
8use std::future::poll_fn;
9use std::mem::MaybeUninit;
10use std::task::{Context, Poll};
11use zerocopy::{FromBytes, Immutable, IntoBytes};
12use zx::{self as zx, AsHandleRef};
13
14pub trait FifoEntry: IntoBytes + FromBytes + Immutable {}
19
20impl<O: IntoBytes + FromBytes + Immutable> FifoEntry for O {}
21
22pub trait FifoWriteBuffer<T> {
24 fn as_slice(&self) -> &[T];
25}
26
27pub unsafe trait FifoReadBuffer<T> {
35 fn count(&self) -> usize;
37 fn as_mut_ptr(&mut self) -> *mut T;
45}
46
47impl<T: FifoEntry> FifoWriteBuffer<T> for [T] {
48 fn as_slice(&self) -> &[T] {
49 self
50 }
51}
52
53unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [T] {
54 fn count(&self) -> usize {
55 self.len()
56 }
57
58 fn as_mut_ptr(&mut self) -> *mut T {
59 self.as_mut_ptr()
60 }
61}
62
63impl<T: FifoEntry> FifoWriteBuffer<T> for T {
64 fn as_slice(&self) -> &[T] {
65 std::slice::from_ref(self)
66 }
67}
68
69unsafe impl<T: FifoEntry> FifoReadBuffer<T> for T {
70 fn count(&self) -> usize {
71 1
72 }
73
74 fn as_mut_ptr(&mut self) -> *mut T {
75 self as *mut T
76 }
77}
78
79unsafe impl<T: FifoEntry> FifoReadBuffer<T> for MaybeUninit<T> {
80 fn count(&self) -> usize {
81 1
82 }
83
84 fn as_mut_ptr(&mut self) -> *mut T {
85 self.as_mut_ptr()
86 }
87}
88
89unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [MaybeUninit<T>] {
90 fn count(&self) -> usize {
91 self.len()
92 }
93
94 fn as_mut_ptr(&mut self) -> *mut T {
95 self.as_mut_ptr() as *mut T
98 }
99}
100
101pub struct Fifo<R, W = R> {
103 handle: RWHandle<zx::Fifo<R, W>>,
104}
105
106impl<R, W> AsRef<zx::Fifo<R, W>> for Fifo<R, W> {
107 fn as_ref(&self) -> &zx::Fifo<R, W> {
108 self.handle.get_ref()
109 }
110}
111
112impl<R, W> AsHandleRef for Fifo<R, W> {
113 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
114 self.handle.get_ref().as_handle_ref()
115 }
116}
117
118impl<R, W> From<Fifo<R, W>> for zx::Fifo<R, W> {
119 fn from(fifo: Fifo<R, W>) -> zx::Fifo<R, W> {
120 fifo.handle.into_inner()
121 }
122}
123
124impl<R: FifoEntry, W: FifoEntry> Fifo<R, W> {
125 pub fn from_fifo(fifo: impl Into<zx::Fifo<R, W>>) -> Self {
131 Fifo { handle: RWHandle::new(fifo.into()) }
132 }
133
134 pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
142 &self,
143 cx: &mut Context<'_>,
144 entries: &B,
145 ) -> Poll<Result<usize, zx::Status>> {
146 ready!(self.handle.poll_writable(cx)?);
147
148 let entries = entries.as_slice();
149 let fifo = self.as_ref();
150 loop {
153 let result = unsafe { fifo.write_raw(entries.as_ptr(), entries.len()) };
154 match result {
155 Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_writable(cx)?),
156 Err(e) => return Poll::Ready(Err(e)),
157 Ok(count) => return Poll::Ready(Ok(count)),
158 }
159 }
160 }
161
162 pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
168 &self,
169 cx: &mut Context<'_>,
170 entries: &mut B,
171 ) -> Poll<Result<usize, zx::Status>> {
172 ready!(self.handle.poll_readable(cx)?);
173
174 let buf = entries.as_mut_ptr();
175 let count = entries.count();
176 let fifo = self.as_ref();
177
178 loop {
179 let result = unsafe { fifo.read_raw(buf, count) };
183
184 match result {
185 Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_readable(cx)?),
186 Err(e) => return Poll::Ready(Err(e)),
187 Ok(count) => return Poll::Ready(Ok(count)),
188 }
189 }
190 }
191
192 pub fn async_io(&mut self) -> (FifoReader<'_, R, W>, FifoWriter<'_, R, W>) {
195 (FifoReader(self), FifoWriter(self))
196 }
197}
198
199pub struct FifoWriter<'a, R, W>(&'a Fifo<R, W>);
200
201impl<R: FifoEntry, W: FifoEntry> FifoWriter<'_, R, W> {
202 pub async fn write_entries(
205 &mut self,
206 entries: &(impl ?Sized + FifoWriteBuffer<W>),
207 ) -> Result<(), zx::Status> {
208 let mut entries = entries.as_slice();
209 poll_fn(|cx| {
210 while !entries.is_empty() {
211 match ready!(self.0.try_write(cx, entries)) {
212 Ok(count) => entries = &entries[count..],
213 Err(status) => return Poll::Ready(Err(status)),
214 }
215 }
216 Poll::Ready(Ok(()))
217 })
218 .await
219 }
220
221 pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
223 &mut self,
224 cx: &mut Context<'_>,
225 entries: &B,
226 ) -> Poll<Result<usize, zx::Status>> {
227 self.0.try_write(cx, entries)
228 }
229}
230
231pub struct FifoReader<'a, R, W>(&'a Fifo<R, W>);
232
233impl<R: FifoEntry, W: FifoEntry> FifoReader<'_, R, W> {
234 pub async fn read_entries(
235 &mut self,
236 entries: &mut (impl ?Sized + FifoReadBuffer<R>),
237 ) -> Result<usize, zx::Status> {
238 poll_fn(|cx| self.0.try_read(cx, entries)).await
239 }
240
241 pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
243 &mut self,
244 cx: &mut Context<'_>,
245 entries: &mut B,
246 ) -> Poll<Result<usize, zx::Status>> {
247 self.0.try_read(cx, entries)
248 }
249}
250
251impl<R, W> fmt::Debug for Fifo<R, W> {
252 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
253 self.handle.get_ref().fmt(f)
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use super::*;
260 use crate::{DurationExt, TestExecutor, TimeoutExt, Timer};
261 use futures::future::try_join;
262 use futures::prelude::*;
263 use zerocopy::{Immutable, KnownLayout};
264 use zx::prelude::*;
265
266 #[derive(
267 Copy, Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
268 )]
269 #[repr(C)]
270 struct Entry {
271 a: u32,
272 b: u32,
273 }
274
275 #[derive(
276 Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
277 )]
278 #[repr(C)]
279 struct WrongEntry {
280 a: u16,
281 }
282
283 #[test]
284 fn can_read_write() {
285 let mut exec = TestExecutor::new();
286 let element = Entry { a: 10, b: 20 };
287
288 let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
289 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
290 let (_, mut tx) = tx.async_io();
291 let (mut rx, _) = rx.async_io();
292
293 let mut buffer = Entry::default();
294 let receive_future = rx.read_entries(&mut buffer).map_ok(|count| {
295 assert_eq!(count, 1);
296 });
297
298 let receiver = receive_future
300 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
301
302 let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
304 .then(|()| tx.write_entries(&element));
305
306 let done = try_join(receiver, sender);
307 exec.run_singlethreaded(done).expect("failed to run receive future on executor");
308 assert_eq!(buffer, element);
309 }
310
311 #[test]
312 fn read_wrong_size() {
313 let mut exec = TestExecutor::new();
314 let elements = &[Entry { a: 10, b: 20 }][..];
315
316 let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
317 let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
318 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(wrong_rx));
319 let (_, mut tx) = tx.async_io();
320 let (mut rx, _) = rx.async_io();
321
322 let mut buffer = WrongEntry::default();
323 let receive_future = rx
324 .read_entries(&mut buffer)
325 .map_ok(|count| panic!("read should have failed, got {}", count));
326
327 let receiver = receive_future
329 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
330
331 let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
333 .then(|()| tx.write_entries(elements));
334
335 let done = try_join(receiver, sender);
336 let res = exec.run_singlethreaded(done);
337 match res {
338 Err(zx::Status::OUT_OF_RANGE) => (),
339 _ => panic!("did not get out-of-range error"),
340 }
341 }
342
343 #[test]
344 fn write_wrong_size() {
345 let mut exec = TestExecutor::new();
346 let elements = &[WrongEntry { a: 10 }][..];
347
348 let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
349 let wrong_tx = zx::Fifo::<WrongEntry>::from(tx.into_handle());
350 let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
351 let (mut tx, _rx) = (Fifo::from_fifo(wrong_tx), Fifo::from_fifo(wrong_rx));
352 let (_, mut tx) = tx.async_io();
353
354 let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
355 .then(|()| tx.write_entries(elements));
356
357 let res = exec.run_singlethreaded(sender);
358 match res {
359 Err(zx::Status::OUT_OF_RANGE) => (),
360 _ => panic!("did not get out-of-range error"),
361 }
362 }
363
364 #[test]
365 fn write_into_full() {
366 use std::sync::atomic::{AtomicUsize, Ordering};
367
368 let mut exec = TestExecutor::new();
369 let elements =
370 &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
371
372 let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
373 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
374
375 let writes_completed = AtomicUsize::new(0);
378 let sender = async {
379 let (_, mut writer) = tx.async_io();
380 writer.write_entries(&elements[..2]).await?;
381 writes_completed.fetch_add(1, Ordering::SeqCst);
382 writer.write_entries(&elements[2..]).await?;
383 writes_completed.fetch_add(1, Ordering::SeqCst);
384 Ok::<(), zx::Status>(())
385 };
386
387 let receive_future = async {
389 Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
390 let mut buffer = Entry::default();
391 let (mut reader, _) = rx.async_io();
392 let count = reader.read_entries(&mut buffer).await?;
393 assert_eq!(writes_completed.load(Ordering::SeqCst), 1);
394 assert_eq!(count, 1);
395 assert_eq!(buffer, elements[0]);
396 let count = reader.read_entries(&mut buffer).await?;
397 assert_eq!(count, 1);
400 assert_eq!(buffer, elements[1]);
401 let count = reader.read_entries(&mut buffer).await?;
402 assert_eq!(writes_completed.load(Ordering::SeqCst), 2);
403 assert_eq!(count, 1);
404 assert_eq!(buffer, elements[2]);
405 Ok::<(), zx::Status>(())
406 };
407
408 let receiver = receive_future
410 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
411
412 let done = try_join(receiver, sender);
413
414 exec.run_singlethreaded(done).expect("failed to run receive future on executor");
415 }
416
417 #[test]
418 fn write_more_than_full() {
419 let mut exec = TestExecutor::new();
420 let elements =
421 &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
422
423 let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
424 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
425 let (_, mut tx) = tx.async_io();
426 let (mut rx, _) = rx.async_io();
427
428 let sender = tx.write_entries(elements);
429
430 let receive_future = async {
432 Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
433 for e in elements {
434 let mut buffer = [Entry::default(); 1];
435 let count = rx.read_entries(&mut buffer[..]).await?;
436 assert_eq!(count, 1);
437 assert_eq!(&buffer[0], e);
438 }
439 Ok::<(), zx::Status>(())
440 };
441
442 let receiver = receive_future
444 .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));
445
446 let done = try_join(receiver, sender);
447
448 exec.run_singlethreaded(done).expect("failed to run receive future on executor");
449 }
450
451 #[test]
452 fn read_multiple() {
453 let mut exec = TestExecutor::new();
454 let elements =
455 &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
456 let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
457 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
458
459 let write_fut = async {
460 tx.async_io().1.write_entries(&elements[..]).await.expect("failed write entries");
461 };
462 let read_fut = async {
463 let mut buffer = [Entry::default(); 5];
465 let count = rx
466 .async_io()
467 .0
468 .read_entries(&mut buffer[..])
469 .await
470 .expect("failed to read entries");
471 assert_eq!(count, elements.len());
472 assert_eq!(&buffer[..count], &elements[..]);
473 };
474 let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
475 }
476
477 #[test]
478 fn read_one() {
479 let mut exec = TestExecutor::new();
480 let elements =
481 &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
482 let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
483 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
484
485 let write_fut = async {
486 tx.async_io().1.write_entries(&elements[..]).await.expect("failed write entries");
487 };
488 let read_fut = async {
489 let (mut reader, _) = rx.async_io();
490 for e in elements {
491 let mut entry = Entry::default();
492 assert_eq!(reader.read_entries(&mut entry).await.expect("failed to read entry"), 1);
493 assert_eq!(&entry, e);
494 }
495 };
496 let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
497 }
498
499 #[test]
500 fn maybe_uninit_single() {
501 let mut exec = TestExecutor::new();
502 let element = Entry { a: 10, b: 20 };
503 let (tx, rx) = zx::Fifo::<Entry>::create(1).expect("failed to create zx fifo");
504 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
505
506 let write_fut = async {
507 tx.async_io().1.write_entries(&element).await.expect("failed write entries");
508 };
509 let read_fut = async {
510 let mut buffer = MaybeUninit::<Entry>::uninit();
511 let count =
512 rx.async_io().0.read_entries(&mut buffer).await.expect("failed to read entries");
513 assert_eq!(count, 1);
514 let read = unsafe { buffer.assume_init() };
516 assert_eq!(read, element);
517 };
518 let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
519 }
520
521 #[test]
522 fn maybe_uninit_slice() {
523 let mut exec = TestExecutor::new();
524 let elements =
525 &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }][..];
526 let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
527 let (mut tx, mut rx) = (Fifo::from_fifo(tx), Fifo::from_fifo(rx));
528
529 let write_fut = async {
530 tx.async_io().1.write_entries(&elements[..]).await.expect("failed write entries");
531 };
532 let read_fut = async {
533 let mut buffer = [MaybeUninit::<Entry>::uninit(); 15];
535 let count = rx
536 .async_io()
537 .0
538 .read_entries(&mut buffer[..])
539 .await
540 .expect("failed to read entries");
541 assert_eq!(count, elements.len());
542 let read = &mut buffer[..count];
543 for (i, v) in read.iter_mut().enumerate() {
544 let read = unsafe { v.assume_init_ref() };
547 assert_eq!(read, &elements[i]);
548 unsafe {
552 v.assume_init_drop();
553 }
554 }
555 };
556 let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
557 }
558}