1use core::future::Future;
8use std::mem::ManuallyDrop;
9use std::sync::Arc;
10use zx::Status;
11
12use crate::arena::{Arena, ArenaBox};
13use crate::futures::{ReadMessageState, ReadMessageStateOp};
14use crate::message::Message;
15use fdf_core::dispatcher::OnDispatcher;
16use fdf_core::handle::{DriverHandle, MixedHandle};
17use fdf_sys::*;
18
19use core::marker::PhantomData;
20use core::mem::{MaybeUninit, size_of_val};
21use core::num::NonZero;
22use core::pin::Pin;
23use core::ptr::{NonNull, null_mut};
24use core::task::{Context, Poll};
25
26pub use fdf_sys::fdf_handle_t;
27
/// A typed wrapper around a [`DriverHandle`] through which [`Message<T>`] values are
/// written and read.
#[derive(Debug)]
pub struct Channel<T: ?Sized + 'static> {
    /// The wrapped handle. It is held in a `ManuallyDrop` because the `Drop` impl for this
    /// type decides, based on `wait_state`, whether the handle may be dropped there.
    pub(crate) handle: ManuallyDrop<DriverHandle>,
    /// Shared state of a read wait registered on this channel, if there is one. Channels
    /// built by the constructors in this file start out with `None`.
    pub(crate) wait_state: Option<Arc<ReadMessageStateOp>>,
    /// Zero-sized marker recording the message type this channel carries.
    _p: PhantomData<Message<T>>,
}
37
38impl<T: ?Sized> Drop for Channel<T> {
39 fn drop(&mut self) {
40 let mut can_drop = true;
41
42 if let Some(current_wait) = &self.wait_state {
43 can_drop = current_wait.set_channel_dropped();
46 }
47
48 if can_drop {
49 unsafe {
52 ManuallyDrop::drop(&mut self.handle);
53 }
54 };
55 }
56}
57
58impl<T: ?Sized + 'static> Channel<T> {
59 pub fn create() -> (Self, Self) {
62 let mut channel1 = 0;
63 let mut channel2 = 0;
64 Status::ok(unsafe { fdf_channel_create(0, &mut channel1, &mut channel2) })
67 .expect("failed to create channel pair");
68 unsafe {
71 (
72 Self::from_handle_unchecked(NonZero::new_unchecked(channel1)),
73 Self::from_handle_unchecked(NonZero::new_unchecked(channel2)),
74 )
75 }
76 }
77
78 pub fn driver_handle(&self) -> &DriverHandle {
80 &self.handle
81 }
82
83 pub fn into_driver_handle(self) -> DriverHandle {
91 assert!(
92 self.wait_state.is_none(),
93 "A read wait has been registered on this channel so it can't be destructured"
94 );
95
96 let handle = unsafe { self.handle.get_raw() };
100
101 std::mem::forget(self);
104
105 unsafe { DriverHandle::new_unchecked(handle) }
108 }
109
110 unsafe fn from_handle_unchecked(handle: NonZero<fdf_handle_t>) -> Self {
117 Self {
119 handle: ManuallyDrop::new(unsafe { DriverHandle::new_unchecked(handle) }),
120 wait_state: None,
121 _p: PhantomData,
122 }
123 }
124
125 pub unsafe fn from_driver_handle(handle: DriverHandle) -> Self {
133 Self { handle: ManuallyDrop::new(handle), wait_state: None, _p: PhantomData }
134 }
135
136 pub fn write(&self, message: Message<T>) -> Result<(), Status> {
141 let data_len = message.data().map_or(0, |data| size_of_val(data) as u32);
143 let handles_count = message.handles().map_or(0, |handles| handles.len() as u32);
144
145 let (arena, data, handles) = message.into_raw();
146
147 let data_ptr = data.map_or(null_mut(), |data| data.cast().as_ptr());
149 let handles_ptr = handles.map_or(null_mut(), |handles| handles.cast().as_ptr());
150
151 Status::ok(unsafe {
159 fdf_channel_write(
160 self.handle.get_raw().get(),
161 0,
162 arena.as_ptr(),
163 data_ptr,
164 data_len,
165 handles_ptr,
166 handles_count,
167 )
168 })?;
169
170 unsafe { fdf_arena_drop_ref(arena.as_ptr()) };
174 Ok(())
175 }
176
177 pub fn write_with<F>(&self, arena: Arena, f: F) -> Result<(), Status>
179 where
180 F: for<'a> FnOnce(
181 &'a Arena,
182 )
183 -> (Option<ArenaBox<'a, T>>, Option<ArenaBox<'a, [Option<MixedHandle>]>>),
184 {
185 self.write(Message::new_with(arena, f))
186 }
187
188 pub fn write_with_data<F>(&self, arena: Arena, f: F) -> Result<(), Status>
190 where
191 F: for<'a> FnOnce(&'a Arena) -> ArenaBox<'a, T>,
192 {
193 self.write(Message::new_with_data(arena, f))
194 }
195}
196
197pub(crate) fn try_read_raw(
201 channel: &DriverHandle,
202) -> Result<Option<Message<[MaybeUninit<u8>]>>, Status> {
203 let mut out_arena = null_mut();
204 let mut out_data = null_mut();
205 let mut out_num_bytes = 0;
206 let mut out_handles = null_mut();
207 let mut out_num_handles = 0;
208 Status::ok(unsafe {
209 fdf_channel_read(
210 channel.get_raw().get(),
211 0,
212 &mut out_arena,
213 &mut out_data,
214 &mut out_num_bytes,
215 &mut out_handles,
216 &mut out_num_handles,
217 )
218 })?;
219 if out_arena.is_null() {
221 return Ok(None);
222 }
223 let arena = Arena(unsafe { NonNull::new_unchecked(out_arena) });
225 let data_ptr = if !out_data.is_null() {
226 let ptr = core::ptr::slice_from_raw_parts_mut(out_data.cast(), out_num_bytes as usize);
227 Some(unsafe { ArenaBox::new(NonNull::new_unchecked(ptr)) })
230 } else {
231 None
232 };
233 let handles_ptr = if !out_handles.is_null() {
234 let ptr = core::ptr::slice_from_raw_parts_mut(out_handles.cast(), out_num_handles as usize);
235 Some(unsafe { ArenaBox::new(NonNull::new_unchecked(ptr)) })
238 } else {
239 None
240 };
241 Ok(Some(unsafe { Message::new_unchecked(arena, data_ptr, handles_ptr) }))
242}
243
244pub(crate) unsafe fn read_raw<T: ?Sized, D>(
255 channel: &mut Channel<T>,
256 dispatcher: D,
257) -> ReadMessageRawFut<D> {
258 let raw_fut = unsafe { ReadMessageState::register_read_wait(channel) };
260 ReadMessageRawFut { raw_fut, dispatcher }
261}
262
263impl<T> Channel<T> {
264 pub fn try_read(&self) -> Result<Option<Message<T>>, Status> {
266 let Some(message) = try_read_raw(&self.handle)? else {
268 return Ok(None);
269 };
270 Ok(Some(unsafe { message.cast_unchecked() }))
273 }
274
275 pub async fn read<D: OnDispatcher>(
277 &mut self,
278 dispatcher: D,
279 ) -> Result<Option<Message<T>>, Status> {
280 let Some(message) = unsafe { read_raw(self, dispatcher) }.await? else {
284 return Ok(None);
285 };
286 Ok(Some(unsafe { message.cast_unchecked() }))
289 }
290}
291
292impl Channel<[u8]> {
293 pub fn try_read_bytes(&self) -> Result<Option<Message<[u8]>>, Status> {
295 let Some(message) = try_read_raw(&self.handle)? else {
297 return Ok(None);
298 };
299 Ok(Some(unsafe { message.assume_init() }))
302 }
303
304 pub async fn read_bytes<D: OnDispatcher>(
306 &mut self,
307 dispatcher: D,
308 ) -> Result<Option<Message<[u8]>>, Status> {
309 let Some(message) = unsafe { read_raw(self, dispatcher) }.await? else {
314 return Ok(None);
315 };
316 Ok(Some(unsafe { message.assume_init() }))
319 }
320}
321
322impl<T> From<Channel<T>> for MixedHandle {
323 fn from(value: Channel<T>) -> Self {
324 MixedHandle::from(value.into_driver_handle())
325 }
326}
327
328impl<T: ?Sized> std::cmp::Ord for Channel<T> {
329 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
330 self.handle.cmp(&other.handle)
331 }
332}
333
334impl<T: ?Sized> std::cmp::PartialOrd for Channel<T> {
335 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
336 Some(self.cmp(other))
337 }
338}
339
340impl<T: ?Sized> std::cmp::PartialEq for Channel<T> {
341 fn eq(&self, other: &Self) -> bool {
342 self.handle.eq(&other.handle)
343 }
344}
345
// Marker impl only: equality itself is defined by the `PartialEq` impl, which compares the
// wrapped handles.
impl<T: ?Sized> std::cmp::Eq for Channel<T> {}
347
348impl<T: ?Sized> std::hash::Hash for Channel<T> {
349 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
350 self.handle.hash(state);
351 }
352}
353
/// Future returned by [`read_raw`]: pairs the read state registered on a channel with the
/// dispatcher it should be polled with.
pub(crate) struct ReadMessageRawFut<D> {
    /// The read state produced by `ReadMessageState::register_read_wait`.
    pub(crate) raw_fut: ReadMessageState,
    /// Dispatcher handed (as a clone) to `raw_fut` on every poll.
    dispatcher: D,
}
358
impl<D: OnDispatcher> Future for ReadMessageRawFut<D> {
    // Same shape as the result of `try_read_raw`: an optional message of uninitialized bytes.
    type Output = Result<Option<Message<[MaybeUninit<u8>]>>, Status>;

    /// Polls the wrapped [`ReadMessageState`], passing it a clone of the stored dispatcher.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Cloned into a local first. NOTE(review): presumably because `self.as_mut()` below
        // keeps `self` mutably borrowed for the whole call, so the dispatcher could not be
        // read inside the argument list.
        let dispatcher = self.dispatcher.clone();
        self.as_mut().raw_fut.poll_with_dispatcher(cx, dispatcher)
    }
}
367
368#[cfg(test)]
369mod tests {
370 use std::io::{Write, stdout};
371 use std::pin::pin;
372 use std::sync::atomic::{AtomicU64, Ordering};
373 use std::sync::{Arc, mpsc};
374
375 use fdf_core::dispatcher::{
376 CurrentDispatcher, Dispatcher, DispatcherBuilder, DispatcherRef, OnDispatcher,
377 };
378 use fdf_core::handle::MixedHandleType;
379 use fdf_env::test::spawn_in_driver;
380 use futures::channel::oneshot;
381 use futures::poll;
382
383 use super::*;
384 use crate::test_utils::*;
385
/// Exchanges byte messages in both directions with the non-waiting read API, checking
/// that empty channels report `ZX_ERR_SHOULD_WAIT` and that writing to a channel whose
/// peer was dropped reports `ZX_ERR_PEER_CLOSED`.
#[test]
fn send_and_receive_bytes_synchronously() {
    // A non-waiting read with nothing queued must fail with `ZX_ERR_SHOULD_WAIT`.
    let assert_nothing_to_read = |chan: &Channel<[u8]>| {
        assert_eq!(chan.try_read_bytes().unwrap_err(), Status::from_raw(ZX_ERR_SHOULD_WAIT));
    };

    let (left, right) = Channel::create();
    let arena = Arena::new();

    // left -> right
    assert_nothing_to_read(&left);
    left.write_with_data(arena.clone(), |arena| arena.insert_slice(&[1, 2, 3, 4])).unwrap();
    assert_eq!(right.try_read_bytes().unwrap().unwrap().data().unwrap(), &[1, 2, 3, 4]);
    assert_nothing_to_read(&right);

    // right -> left
    right.write_with_data(arena.clone(), |arena| arena.insert_slice(&[5, 6, 7, 8])).unwrap();
    assert_eq!(left.try_read_bytes().unwrap().unwrap().data().unwrap(), &[5, 6, 7, 8]);
    assert_nothing_to_read(&left);
    assert_nothing_to_read(&right);

    // Once the peer is gone, writes are rejected.
    drop(right);
    let write_after_close =
        left.write_with_data(arena.clone(), |arena| arena.insert_slice(&[9, 10, 11, 12]));
    assert_eq!(write_after_close, Err(Status::from_raw(ZX_ERR_PEER_CLOSED)));
}
404
/// Polls a read once before anything is written (it must be pending), abandons that
/// read, then writes and awaits a fresh read for the same bytes.
#[test]
fn send_and_receive_bytes_asynchronously() {
    spawn_in_driver("channel async", async {
        let arena = Arena::new();
        let (mut first, second) = Channel::create();

        {
            // Scoped so the pending read is dropped before the write below.
            let mut pending_read = pin!(first.read_bytes(CurrentDispatcher));
            assert!(poll!(&mut pending_read).is_pending());
        }

        second.write_with_data(arena, |arena| arena.insert_slice(&[1, 2, 3, 4])).unwrap();

        let message = first.read_bytes(CurrentDispatcher).await.unwrap().unwrap();
        assert_eq!(message.data().unwrap(), &[1, 2, 3, 4]);
    });
}
419
/// Sends a `DropSender` through a channel and checks that it is dropped only when the
/// received message is dropped, not when it is written or read.
#[test]
fn send_and_receive_objects_synchronously() {
    let arena = Arena::new();
    let (sender, receiver) = Channel::create();
    let (drop_tx, drop_rx) = mpsc::channel();

    let written = sender
        .write_with_data(arena.clone(), |arena| arena.insert(DropSender::new(1, drop_tx.clone())));
    written.unwrap();
    drop_rx.try_recv().expect_err("should not drop the object when sent");

    let message = receiver.try_read().unwrap().unwrap();
    assert_eq!(message.data().unwrap().0, 1);
    drop_rx.try_recv().expect_err("should not drop the object when received");

    drop(message);
    drop_rx.try_recv().expect("dropped when received");
}
435
/// Transfers one end of a `Channel<String>` pair through a `Channel<()>` pair as a
/// handle, rebuilds a channel from the received handle and uses it to send a string.
#[test]
fn send_and_receive_handles_synchronously() {
    println!("Create channels and write one end of one of the channel pairs to the other");
    let (outer_tx, outer_rx) = Channel::<()>::create();
    let (inner_first, inner_second) = Channel::<String>::create();
    // The message carries no data, only a single-element handle table.
    let outgoing = Message::new_with(Arena::new(), |arena| {
        (None, Some(arena.insert_boxed_slice(Box::new([Some(inner_first.into())]))))
    });
    outer_tx.write(outgoing).unwrap();

    println!("Receive the channel back on the other end of the first channel pair.");
    let mut arena = None;
    let incoming =
        outer_rx.try_read().unwrap().expect("Expected a message with contents to be received");
    let (_, received_handles) = incoming.into_arena_boxes(&mut arena);
    let mut handle_slots =
        ArenaBox::take_boxed_slice(received_handles.expect("expected handles in the message"));
    let mixed_handle = handle_slots
        .first_mut()
        .expect("expected one handle in the handle set")
        .take()
        .expect("expected the first handle to be non-null");
    let MixedHandleType::Driver(driver_handle) = mixed_handle.resolve() else {
        panic!("Got a non-driver handle when we sent a driver handle");
    };
    // SAFETY: this handle is the `Channel<String>` end that was sent above.
    let inner_first_received = unsafe { Channel::from_driver_handle(driver_handle) };

    println!("Send and receive a string across the now-transmitted channel pair.");
    inner_first_received
        .write_with_data(Arena::new(), |arena| arena.insert("boom".to_string()))
        .unwrap();
    let echoed = inner_second.try_read().unwrap().unwrap();
    assert_eq!(echoed.data().unwrap(), &"boom".to_string());
}
470
471 async fn ping(mut chan: Channel<u8>) {
472 println!("starting ping!");
473 chan.write_with_data(Arena::new(), |arena| arena.insert(0)).unwrap();
474 while let Ok(Some(msg)) = chan.read(CurrentDispatcher).await {
475 let next = *msg.data().unwrap();
476 println!("ping! {next}");
477 chan.write_with_data(msg.take_arena(), |arena| arena.insert(next + 1)).unwrap();
478 }
479 }
480
481 async fn pong(mut chan: Channel<u8>) {
482 println!("starting pong!");
483 while let Some(msg) = chan.read(CurrentDispatcher).await.unwrap() {
484 let next = *msg.data().unwrap();
485 println!("pong! {next}");
486 if next > 10 {
487 println!("bye!");
488 break;
489 }
490 chan.write_with_data(msg.take_arena(), |arena| arena.insert(next + 1)).unwrap();
491 }
492 }
493
/// Runs `ping` as a task spawned on the current dispatcher while `pong` is awaited inline.
#[test]
fn async_ping_pong() {
    spawn_in_driver("async ping pong", async {
        let (ping_end, pong_end) = Channel::create();
        let ping_task = ping(ping_end);
        CurrentDispatcher.spawn_task(ping_task).unwrap();
        pong(pong_end).await;
    });
}
502
/// Runs `ping` on a `fuchsia_async::LocalExecutor` inside a task posted to a dispatcher
/// that allows thread blocking, with the current dispatcher overridden to a second one,
/// while `pong` is awaited inline.
#[test]
fn async_ping_pong_on_fuchsia_async() {
    spawn_in_driver("async ping pong", async {
        let (ping_chan, pong_chan) = Channel::create();

        // Dispatcher installed as "current" while the executor below is running.
        let override_target = DispatcherBuilder::new()
            .name("fdf-async")
            .create()
            .expect("failure creating non-blocking dispatcher for fdf operations on rust-async dispatcher")
            .release();

        // Dispatcher whose thread runs the executor to completion.
        let blocking_dispatcher = DispatcherBuilder::new()
            .name("fuchsia-async")
            .allow_thread_blocking()
            .create()
            .expect("failure creating blocking dispatcher for rust async")
            .release();

        blocking_dispatcher
            .post_task_sync(move |_| {
                Dispatcher::override_current(override_target, || {
                    fuchsia_async::LocalExecutor::default().run_singlethreaded(ping(ping_chan));
                });
            })
            .unwrap();

        pong(pong_chan).await
    });
}
533
534 async fn recv_lots_of_bytes_with_cancellations(
535 mut rx: Channel<[u8]>,
536 fin_tx: oneshot::Sender<()>,
537 pending_count: Arc<AtomicU64>,
538 ) {
539 let mut immediate_count = 0;
540 let mut count = 0;
541 loop {
542 let mut next_fut = Box::pin(rx.read_bytes(CurrentDispatcher));
546 let next = match futures::poll!(&mut next_fut) {
547 Poll::Pending => {
548 pending_count.fetch_add(1, Ordering::Relaxed);
549 drop(next_fut);
550 rx.read_bytes(CurrentDispatcher).await
551 }
552 Poll::Ready(r) => {
553 immediate_count += 1;
554 r
555 }
556 };
557 match next {
558 Err(Status::PEER_CLOSED) | Ok(None) => break,
559 Err(_) => {
560 next.unwrap();
561 }
562 Ok(Some(msg)) => {
563 assert_eq!(msg.data().unwrap(), &[count as u8; 100]);
564 count += 1;
565 }
566 }
567 }
568 println!("read total: {count}, immediate: {immediate_count}, pending: {pending_count:?}");
569 fin_tx.send(()).unwrap();
571 }
572
573 async fn send_lots_of_bytes(
574 tx: Channel<[u8]>,
575 fin_rx: oneshot::Receiver<()>,
576 pending_count: Arc<AtomicU64>,
577 ) {
578 let arena = Arena::new();
583 print!("writing: ");
584 for i in 0..10000 {
585 tx.write_with_data(arena.clone(), |arena| arena.insert_slice(&[i as u8; 100])).unwrap();
586 print!(".");
590 stdout().flush().unwrap();
591 if pending_count.load(Ordering::Relaxed) > 500 {
592 break;
593 }
594 }
595 drop(tx);
596 fin_rx.await.unwrap();
597 }
598
599 async fn send_and_recv_lots_of_bytes_with_cancellations(dispatcher: DispatcherRef<'static>) {
600 let (tx, rx) = Channel::create();
601 let (fin_tx, fin_rx) = oneshot::channel();
602 let pending_count = Arc::new(AtomicU64::new(0));
603 dispatcher
604 .spawn_task(recv_lots_of_bytes_with_cancellations(rx, fin_tx, pending_count.clone()))
605 .unwrap();
606
607 send_lots_of_bytes(tx, fin_rx, pending_count).await;
608 }
609
/// Runs the send/receive-with-cancellations scenario with the receiver spawned on a
/// dispatcher built with default options.
#[test]
fn send_and_recv_lots_of_bytes_with_cancellations_on_synchronized_dispatcher() {
    let scenario = async {
        let dispatcher =
            DispatcherBuilder::new().name("fdf-synchronized").create().unwrap().release();

        send_and_recv_lots_of_bytes_with_cancellations(dispatcher).await;
    };
    spawn_in_driver(
        "lots of bytes and with some cancellations on a synchronized dispatcher",
        scenario,
    );
}
622
/// Runs the send/receive-with-cancellations scenario with the receiver spawned on a
/// dispatcher built with `.unsynchronized()`.
#[test]
fn send_and_recv_lots_of_bytes_with_cancellations_on_unsynchronized_dispatcher() {
    let scenario = async {
        let builder = DispatcherBuilder::new().name("fdf-unsynchronized").unsynchronized();
        let dispatcher = builder.create().unwrap().release();

        send_and_recv_lots_of_bytes_with_cancellations(dispatcher).await;
    };
    spawn_in_driver(
        "lots of bytes and with some cancellations on an unsynchronized dispatcher",
        scenario,
    );
}
639
/// Runs the send/receive-with-cancellations scenario with the receiver driven by a
/// `fuchsia_async::LocalExecutor` inside a task posted to a dispatcher that allows thread
/// blocking, with the current dispatcher overridden to a second one. The sender is
/// awaited inline.
#[test]
fn send_and_recv_lots_of_bytes_with_cancellations_on_fuchsia_async_dispatcher() {
    spawn_in_driver(
        "lots of bytes and with some cancellations on a fuchsia-async overridden dispatcher",
        async {
            // Dispatcher installed as "current" while the executor below is running.
            let override_target = DispatcherBuilder::new()
                .name("fdf-async")
                .create()
                .expect("failure creating non-blocking dispatcher for fdf operations on rust-async dispatcher")
                .release();

            // Dispatcher whose thread runs the executor to completion.
            let blocking_dispatcher = DispatcherBuilder::new()
                .name("fdf-fuchsia-async")
                .allow_thread_blocking()
                .create()
                .expect("failure creating blocking dispatcher for rust async")
                .release();

            let (tx, rx) = Channel::create();
            let (fin_tx, fin_rx) = oneshot::channel();
            let pending_count = Arc::new(AtomicU64::new(0));

            // The receiver gets its own handle on the shared counter.
            let receiver_pending = Arc::clone(&pending_count);
            blocking_dispatcher
                .post_task_sync(move |_| {
                    Dispatcher::override_current(override_target, || {
                        fuchsia_async::LocalExecutor::default().run_singlethreaded(
                            recv_lots_of_bytes_with_cancellations(rx, fin_tx, receiver_pending),
                        );
                    });
                })
                .unwrap();

            send_lots_of_bytes(tx, fin_rx, pending_count).await;
        },
    );
}
680}