1use super::rwhandle::{RWHandle, ReadableHandle as _, WritableHandle as _};
6use futures::ready;
7use std::fmt;
8use std::future::poll_fn;
9use std::mem::MaybeUninit;
10use std::task::{Context, Poll};
11use zerocopy::{FromBytes, Immutable, IntoBytes};
12use zx::{self as zx, AsHandleRef};
13
14pub trait FifoEntry: IntoBytes + FromBytes + Immutable {}
19
20impl<O: IntoBytes + FromBytes + Immutable> FifoEntry for O {}
21
/// A source of `T` entries that can be handed to a FIFO write.
pub trait FifoWriteBuffer<T> {
    /// Borrows the entries to be written as one contiguous slice.
    fn as_slice(&self) -> &[T];
}
26
/// A destination that FIFO reads can deposit `T` entries into.
///
/// # Safety
///
/// [`Fifo::try_read`] hands the pointer returned by `as_mut_ptr` and the value
/// returned by `count` to a raw FIFO read inside an `unsafe` block.
/// Implementors are therefore responsible for making sure the pointer is valid
/// for writing `count` entries of `T`.
pub unsafe trait FifoReadBuffer<T> {
    /// Number of `T` slots this buffer offers to a read.
    fn count(&self) -> usize;

    /// Raw pointer to the first slot entries are written into.
    fn as_mut_ptr(&mut self) -> *mut T;
}
46
47impl<T: FifoEntry, const N: usize> FifoWriteBuffer<T> for [T; N] {
48 fn as_slice(&self) -> &[T] {
49 self
50 }
51}
52
53impl<T: FifoEntry> FifoWriteBuffer<T> for [T] {
54 fn as_slice(&self) -> &[T] {
55 self
56 }
57}
58
59unsafe impl<T: FifoEntry, const N: usize> FifoReadBuffer<T> for [T; N] {
60 fn count(&self) -> usize {
61 N
62 }
63
64 fn as_mut_ptr(&mut self) -> *mut T {
65 self.as_mut_slice().as_mut_ptr()
66 }
67}
68
69unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [T] {
70 fn count(&self) -> usize {
71 self.len()
72 }
73
74 fn as_mut_ptr(&mut self) -> *mut T {
75 self.as_mut_ptr()
76 }
77}
78
79impl<T: FifoEntry> FifoWriteBuffer<T> for T {
80 fn as_slice(&self) -> &[T] {
81 std::slice::from_ref(self)
82 }
83}
84
85unsafe impl<T: FifoEntry> FifoReadBuffer<T> for T {
86 fn count(&self) -> usize {
87 1
88 }
89
90 fn as_mut_ptr(&mut self) -> *mut T {
91 self as *mut T
92 }
93}
94
95unsafe impl<T: FifoEntry> FifoReadBuffer<T> for MaybeUninit<T> {
96 fn count(&self) -> usize {
97 1
98 }
99
100 fn as_mut_ptr(&mut self) -> *mut T {
101 self.as_mut_ptr()
102 }
103}
104
105unsafe impl<T: FifoEntry> FifoReadBuffer<T> for [MaybeUninit<T>] {
106 fn count(&self) -> usize {
107 self.len()
108 }
109
110 fn as_mut_ptr(&mut self) -> *mut T {
111 self.as_mut_ptr() as *mut T
114 }
115}
116
117pub struct Fifo<R, W = R> {
119 handle: RWHandle<zx::Fifo<R, W>>,
120}
121
122impl<R, W> AsRef<zx::Fifo<R, W>> for Fifo<R, W> {
123 fn as_ref(&self) -> &zx::Fifo<R, W> {
124 self.handle.get_ref()
125 }
126}
127
128impl<R, W> AsHandleRef for Fifo<R, W> {
129 fn as_handle_ref(&self) -> zx::HandleRef<'_> {
130 self.handle.get_ref().as_handle_ref()
131 }
132}
133
134impl<R, W> From<Fifo<R, W>> for zx::Fifo<R, W> {
135 fn from(fifo: Fifo<R, W>) -> zx::Fifo<R, W> {
136 fifo.handle.into_inner()
137 }
138}
139
140impl<R: FifoEntry, W: FifoEntry> Fifo<R, W> {
141 pub fn from_fifo(fifo: impl Into<zx::Fifo<R, W>>) -> Self {
147 Fifo { handle: RWHandle::new(fifo.into()) }
148 }
149
150 pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
158 &self,
159 cx: &mut Context<'_>,
160 entries: &B,
161 ) -> Poll<Result<usize, zx::Status>> {
162 ready!(self.handle.poll_writable(cx)?);
163
164 let entries = entries.as_slice();
165 let fifo = self.as_ref();
166 loop {
169 let result = unsafe { fifo.write_raw(entries.as_ptr(), entries.len()) };
170 match result {
171 Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_writable(cx)?),
172 Err(e) => return Poll::Ready(Err(e)),
173 Ok(count) => return Poll::Ready(Ok(count)),
174 }
175 }
176 }
177
178 pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
184 &self,
185 cx: &mut Context<'_>,
186 entries: &mut B,
187 ) -> Poll<Result<usize, zx::Status>> {
188 ready!(self.handle.poll_readable(cx)?);
189
190 let buf = entries.as_mut_ptr();
191 let count = entries.count();
192 let fifo = self.as_ref();
193
194 loop {
195 let result = unsafe { fifo.read_raw(buf, count) };
199
200 match result {
201 Err(zx::Status::SHOULD_WAIT) => ready!(self.handle.need_readable(cx)?),
202 Err(e) => return Poll::Ready(Err(e)),
203 Ok(count) => return Poll::Ready(Ok(count)),
204 }
205 }
206 }
207
208 pub fn async_io(&mut self) -> (FifoReader<'_, R, W>, FifoWriter<'_, R, W>) {
211 (FifoReader(self), FifoWriter(self))
212 }
213}
214
215pub struct FifoWriter<'a, R, W>(&'a Fifo<R, W>);
216
217impl<R: FifoEntry, W: FifoEntry> FifoWriter<'_, R, W> {
218 pub async fn write_entries(
221 &mut self,
222 entries: &(impl ?Sized + FifoWriteBuffer<W>),
223 ) -> Result<(), zx::Status> {
224 let mut entries = entries.as_slice();
225 poll_fn(|cx| {
226 while !entries.is_empty() {
227 match ready!(self.0.try_write(cx, entries)) {
228 Ok(count) => entries = &entries[count..],
229 Err(status) => return Poll::Ready(Err(status)),
230 }
231 }
232 Poll::Ready(Ok(()))
233 })
234 .await
235 }
236
237 pub fn try_write<B: ?Sized + FifoWriteBuffer<W>>(
239 &mut self,
240 cx: &mut Context<'_>,
241 entries: &B,
242 ) -> Poll<Result<usize, zx::Status>> {
243 self.0.try_write(cx, entries)
244 }
245}
246
247pub struct FifoReader<'a, R, W>(&'a Fifo<R, W>);
248
249impl<R: FifoEntry, W: FifoEntry> FifoReader<'_, R, W> {
250 pub async fn read_entries(
251 &mut self,
252 entries: &mut (impl ?Sized + FifoReadBuffer<R>),
253 ) -> Result<usize, zx::Status> {
254 poll_fn(|cx| self.0.try_read(cx, entries)).await
255 }
256
257 pub fn try_read<B: ?Sized + FifoReadBuffer<R>>(
259 &mut self,
260 cx: &mut Context<'_>,
261 entries: &mut B,
262 ) -> Poll<Result<usize, zx::Status>> {
263 self.0.try_read(cx, entries)
264 }
265}
266
267impl<R, W> fmt::Debug for Fifo<R, W> {
268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 self.handle.get_ref().fmt(f)
270 }
271}
272
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{DurationExt, TestExecutor, TimeoutExt, Timer};
    use futures::future::try_join;
    use futures::prelude::*;
    use zerocopy::{Immutable, KnownLayout};
    use zx::prelude::*;

    /// Entry type the test fifos are created with (two `u32` fields).
    #[derive(
        Copy, Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
    )]
    #[repr(C)]
    struct Entry {
        a: u32,
        b: u32,
    }

    /// Entry type of a different size than `Entry`, used to provoke size
    /// mismatches against a fifo created for `Entry`.
    #[derive(
        Clone, Debug, PartialEq, Eq, Default, IntoBytes, KnownLayout, FromBytes, Immutable,
    )]
    #[repr(C)]
    struct WrongEntry {
        a: u16,
    }

    /// The three sample entries shared by the multi-entry tests.
    const SAMPLE_ENTRIES: &[Entry] =
        &[Entry { a: 10, b: 20 }, Entry { a: 30, b: 40 }, Entry { a: 50, b: 60 }];

    /// One entry written after a short delay is read back unchanged.
    #[test]
    fn can_read_write() {
        let mut exec = TestExecutor::new();
        let sent = Entry { a: 10, b: 20 };

        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(tx);
        let mut rx_fifo = Fifo::from_fifo(rx);
        let (_, mut writer) = tx_fifo.async_io();
        let (mut reader, _) = rx_fifo.async_io();

        let mut received = Entry::default();
        let read_one = reader.read_entries(&mut received).map_ok(|count| {
            assert_eq!(count, 1);
        });

        // The timeout turns a hung read into a panic instead of a stuck test.
        let receiver = read_one
            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));

        // The write only starts once the 10 ms timer has fired.
        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
            .then(|()| writer.write_entries(&sent));

        exec.run_singlethreaded(try_join(receiver, sender))
            .expect("failed to run receive future on executor");
        assert_eq!(received, sent);
    }

    /// Reading with an entry type of the wrong size must fail with
    /// `OUT_OF_RANGE`.
    #[test]
    fn read_wrong_size() {
        let mut exec = TestExecutor::new();
        let to_send = &[Entry { a: 10, b: 20 }][..];

        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        // Re-type the receiving end so that it reads `WrongEntry` values.
        let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
        let mut tx_fifo = Fifo::from_fifo(tx);
        let mut rx_fifo = Fifo::from_fifo(wrong_rx);
        let (_, mut writer) = tx_fifo.async_io();
        let (mut reader, _) = rx_fifo.async_io();

        let mut received = WrongEntry::default();
        let read_one = reader
            .read_entries(&mut received)
            .map_ok(|count| panic!("read should have failed, got {count}"));

        // The timeout turns a hung read into a panic instead of a stuck test.
        let receiver = read_one
            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));

        // The write only starts once the 10 ms timer has fired.
        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
            .then(|()| writer.write_entries(to_send));

        let outcome = exec.run_singlethreaded(try_join(receiver, sender));
        assert!(
            matches!(outcome, Err(zx::Status::OUT_OF_RANGE)),
            "did not get out-of-range error"
        );
    }

    /// Writing with an entry type of the wrong size must fail with
    /// `OUT_OF_RANGE`.
    #[test]
    fn write_wrong_size() {
        let mut exec = TestExecutor::new();
        let to_send = &[WrongEntry { a: 10 }][..];

        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        // Re-type both ends so that they carry `WrongEntry` values.
        let wrong_tx = zx::Fifo::<WrongEntry>::from(tx.into_handle());
        let wrong_rx = zx::Fifo::<WrongEntry>::from(rx.into_handle());
        let mut tx_fifo = Fifo::from_fifo(wrong_tx);
        // Bound to a name so the receiving end stays alive for the whole test.
        let _rx_fifo = Fifo::from_fifo(wrong_rx);
        let (_, mut writer) = tx_fifo.async_io();

        let sender = Timer::new(zx::MonotonicDuration::from_millis(10).after_now())
            .then(|()| writer.write_entries(to_send));

        let outcome = exec.run_singlethreaded(sender);
        assert!(
            matches!(outcome, Err(zx::Status::OUT_OF_RANGE)),
            "did not get out-of-range error"
        );
    }

    /// Two writes against a fifo created with `create(2)`: the assertions
    /// below pin how many of them have completed at each read.
    #[test]
    fn write_into_full() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let mut exec = TestExecutor::new();
        let elements = SAMPLE_ENTRIES;
        let (first_two, last_one) = elements.split_at(2);

        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(tx);
        let mut rx_fifo = Fifo::from_fifo(rx);

        // Counts how many `write_entries` calls have finished so far.
        let writes_completed = AtomicUsize::new(0);
        let sender = async {
            let (_, mut writer) = tx_fifo.async_io();
            writer.write_entries(first_two).await?;
            writes_completed.fetch_add(1, Ordering::SeqCst);
            writer.write_entries(last_one).await?;
            writes_completed.fetch_add(1, Ordering::SeqCst);
            Ok::<(), zx::Status>(())
        };

        let read_all = async {
            // Start reading only after a 10 ms delay.
            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
            let mut slot = Entry::default();
            let (mut reader, _) = rx_fifo.async_io();

            let count = reader.read_entries(&mut slot).await?;
            assert_eq!(writes_completed.load(Ordering::SeqCst), 1);
            assert_eq!(count, 1);
            assert_eq!(slot, elements[0]);

            // No assertion on `writes_completed` for the second read.
            let count = reader.read_entries(&mut slot).await?;
            assert_eq!(count, 1);
            assert_eq!(slot, elements[1]);

            let count = reader.read_entries(&mut slot).await?;
            assert_eq!(writes_completed.load(Ordering::SeqCst), 2);
            assert_eq!(count, 1);
            assert_eq!(slot, elements[2]);
            Ok::<(), zx::Status>(())
        };

        // The timeout turns a hung read into a panic instead of a stuck test.
        let receiver = read_all
            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));

        exec.run_singlethreaded(try_join(receiver, sender))
            .expect("failed to run receive future on executor");
    }

    /// A single `write_entries` call with three entries against a fifo created
    /// with `create(2)` still delivers all three, in order.
    #[test]
    fn write_more_than_full() {
        let mut exec = TestExecutor::new();
        let elements = SAMPLE_ENTRIES;

        let (tx, rx) = zx::Fifo::<Entry>::create(2).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(tx);
        let mut rx_fifo = Fifo::from_fifo(rx);
        let (_, mut writer) = tx_fifo.async_io();
        let (mut reader, _) = rx_fifo.async_io();

        let sender = writer.write_entries(elements);

        let read_all = async {
            // Start reading only after a 10 ms delay.
            Timer::new(zx::MonotonicDuration::from_millis(10).after_now()).await;
            for expected in elements {
                let mut slot = [Entry::default(); 1];
                let count = reader.read_entries(&mut slot[..]).await?;
                assert_eq!(count, 1);
                assert_eq!(&slot[0], expected);
            }
            Ok::<(), zx::Status>(())
        };

        // The timeout turns a hung read into a panic instead of a stuck test.
        let receiver = read_all
            .on_timeout(zx::MonotonicDuration::from_millis(300).after_now(), || panic!("timeout"));

        exec.run_singlethreaded(try_join(receiver, sender))
            .expect("failed to run receive future on executor");
    }

    /// One read into a five-slot buffer picks up all three written entries.
    #[test]
    fn read_multiple() {
        let mut exec = TestExecutor::new();
        let elements = SAMPLE_ENTRIES;
        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(tx);
        let mut rx_fifo = Fifo::from_fifo(rx);

        let write_fut = async {
            let (_, mut writer) = tx_fifo.async_io();
            writer.write_entries(elements).await.expect("failed write entries");
        };
        let read_fut = async {
            // Deliberately larger than the number of entries written.
            let mut slots = [Entry::default(); 5];
            let (mut reader, _) = rx_fifo.async_io();
            let count = reader.read_entries(&mut slots[..]).await.expect("failed to read entries");
            assert_eq!(count, elements.len());
            assert_eq!(&slots[..count], elements);
        };
        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
    }

    /// Reading into a single `Entry` yields the written entries one at a time.
    #[test]
    fn read_one() {
        let mut exec = TestExecutor::new();
        let elements = SAMPLE_ENTRIES;
        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(tx);
        let mut rx_fifo = Fifo::from_fifo(rx);

        let write_fut = async {
            let (_, mut writer) = tx_fifo.async_io();
            writer.write_entries(elements).await.expect("failed write entries");
        };
        let read_fut = async {
            let (mut reader, _) = rx_fifo.async_io();
            for expected in elements {
                let mut slot = Entry::default();
                let count = reader.read_entries(&mut slot).await.expect("failed to read entry");
                assert_eq!(count, 1);
                assert_eq!(&slot, expected);
            }
        };
        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
    }

    /// A single `MaybeUninit<Entry>` works as a read buffer.
    #[test]
    fn maybe_uninit_single() {
        let mut exec = TestExecutor::new();
        let sent = Entry { a: 10, b: 20 };
        let (tx, rx) = zx::Fifo::<Entry>::create(1).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(tx);
        let mut rx_fifo = Fifo::from_fifo(rx);

        let write_fut = async {
            let (_, mut writer) = tx_fifo.async_io();
            writer.write_entries(&sent).await.expect("failed write entries");
        };
        let read_fut = async {
            let mut slot = MaybeUninit::<Entry>::uninit();
            let (mut reader, _) = rx_fifo.async_io();
            let count = reader.read_entries(&mut slot).await.expect("failed to read entries");
            assert_eq!(count, 1);
            // SAFETY: the read reported one entry, so the test treats the
            // single slot as initialized.
            let received = unsafe { slot.assume_init() };
            assert_eq!(received, sent);
        };
        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
    }

    /// A slice of `MaybeUninit<Entry>` works as a read buffer.
    #[test]
    fn maybe_uninit_slice() {
        let mut exec = TestExecutor::new();
        let elements = SAMPLE_ENTRIES;
        let (tx, rx) = zx::Fifo::<Entry>::create(elements.len()).expect("failed to create zx fifo");
        let mut tx_fifo = Fifo::from_fifo(tx);
        let mut rx_fifo = Fifo::from_fifo(rx);

        let write_fut = async {
            let (_, mut writer) = tx_fifo.async_io();
            writer.write_entries(elements).await.expect("failed write entries");
        };
        let read_fut = async {
            // Deliberately larger than the number of entries written.
            let mut slots = [MaybeUninit::<Entry>::uninit(); 15];
            let (mut reader, _) = rx_fifo.async_io();
            let count = reader.read_entries(&mut slots[..]).await.expect("failed to read entries");
            assert_eq!(count, elements.len());

            // `count == elements.len()` was just asserted, so zipping visits
            // every slot in `..count` exactly once.
            for (slot, expected) in slots[..count].iter_mut().zip(elements) {
                // SAFETY: the read reported `count` entries, so the test
                // treats the first `count` slots as initialized.
                let received = unsafe { slot.assume_init_ref() };
                assert_eq!(received, expected);
                // SAFETY: same slot as above; it is not touched again after
                // being dropped here.
                unsafe {
                    slot.assume_init_drop();
                }
            }
        };
        let ((), ()) = exec.run_singlethreaded(futures::future::join(write_fut, read_fut));
    }
}