fdf_channel/channel.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Safe bindings for the driver runtime channel stable ABI

use core::future::Future;
use std::mem::ManuallyDrop;
use std::sync::Arc;
use zx::Status;

use crate::arena::{Arena, ArenaBox};
use crate::futures::{ReadMessageState, ReadMessageStateOp};
use crate::message::Message;
use fdf_core::dispatcher::OnDispatcher;
use fdf_core::handle::{DriverHandle, MixedHandle};
use fdf_sys::*;

use core::marker::PhantomData;
use core::mem::{MaybeUninit, size_of_val};
use core::num::NonZero;
use core::pin::Pin;
use core::ptr::{NonNull, null_mut};
use core::task::{Context, Poll};

pub use fdf_sys::fdf_handle_t;

/// Implements a message channel through the Fuchsia Driver Runtime
#[derive(Debug)]
pub struct Channel<T: ?Sized + 'static> {
    // Note: if we're waiting on a callback we can't drop the handle until
    // that callback has fired.
    pub(crate) handle: ManuallyDrop<DriverHandle>,
    pub(crate) wait_state: Option<Arc<ReadMessageStateOp>>,
    _p: PhantomData<Message<T>>,
}

impl<T: ?Sized> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut can_drop = true;

        if let Some(current_wait) = &self.wait_state {
            // set_channel_dropped() will return true if we can drop the handle ourselves.
            // Otherwise the handle should not be dropped until the callback has been called.
            can_drop = current_wait.set_channel_dropped();
        }

        if can_drop {
            // SAFETY: If there's no current wait active, we are the only
            // owner of the handle.
            unsafe {
                ManuallyDrop::drop(&mut self.handle);
            }
        };
    }
}

impl<T: ?Sized + 'static> Channel<T> {
    /// Creates a new channel pair that can be used to send messages of type `T`
    /// between threads managed by the driver runtime.
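    ///
    /// A minimal usage sketch (assuming the driver runtime is available to the process, as it
    /// is in this module's tests):
    ///
    /// ```ignore
    /// let (first, second) = Channel::<[u8]>::create();
    /// let arena = Arena::new();
    /// // Write a byte slice into one end and read it back out of the other.
    /// first.write_with_data(arena, |arena| arena.insert_slice(&[1, 2, 3, 4])).unwrap();
    /// let message = second.try_read_bytes().unwrap().unwrap();
    /// assert_eq!(message.data().unwrap(), &[1, 2, 3, 4]);
    /// ```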
    pub fn create() -> (Self, Self) {
        let mut channel1 = 0;
        let mut channel2 = 0;
        // This call cannot fail as the only reason it would fail is due to invalid
        // option flags, and 0 is a valid option.
        Status::ok(unsafe { fdf_channel_create(0, &mut channel1, &mut channel2) })
            .expect("failed to create channel pair");
        // SAFETY: if fdf_channel_create returned ZX_OK, it will have placed
        // valid channel handles that must be non-zero.
        unsafe {
            (
                Self::from_handle_unchecked(NonZero::new_unchecked(channel1)),
                Self::from_handle_unchecked(NonZero::new_unchecked(channel2)),
            )
        }
    }

    /// Returns a reference to the inner handle of the channel.
    pub fn driver_handle(&self) -> &DriverHandle {
        &self.handle
    }

    /// Takes the inner handle to the channel. The caller is responsible for ensuring
    /// that the handle is freed.
    ///
    /// # Panics
    ///
    /// This function will panic if the channel has previously had a read wait
    /// registered on it.
    pub fn into_driver_handle(self) -> DriverHandle {
        assert!(
            self.wait_state.is_none(),
            "A read wait has been registered on this channel so it can't be destructured"
        );

        // SAFETY: We will be forgetting `self` after this, so we can safely
        // take ownership of the raw handle for reconstituting into a `DriverHandle`
        // object after.
        let handle = unsafe { self.handle.get_raw() };

        // we don't want to call drop here because we've taken the handle out of the
        // object.
        std::mem::forget(self);

        // SAFETY: We just took this handle from the object we just forgot, so we
        // are the only owner of it.
        unsafe { DriverHandle::new_unchecked(handle) }
    }

    /// Initializes a [`Channel`] object from the given non-zero handle.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the handle is not invalid and that it is
    /// part of a driver runtime channel pair of type `T`.
    unsafe fn from_handle_unchecked(handle: NonZero<fdf_handle_t>) -> Self {
        // SAFETY: caller is responsible for ensuring that it is a valid channel
        Self {
            handle: ManuallyDrop::new(unsafe { DriverHandle::new_unchecked(handle) }),
            wait_state: None,
            _p: PhantomData,
        }
    }

    /// Initializes a [`Channel`] object from the given [`DriverHandle`],
    /// assuming that it is a channel of type `T`.
    ///
    /// # Safety
    ///
    /// The caller must ensure that the handle is a [`Channel`]-based handle that is
    /// using type `T` as its wire format.
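    ///
    /// A sketch of a round trip through [`Self::into_driver_handle`] and back, assuming
    /// `channel` is a hypothetical `Channel<u8>` with no read wait registered on it:
    ///
    /// ```ignore
    /// // Take the raw driver handle out of the typed channel...
    /// let handle = channel.into_driver_handle();
    /// // ...and reconstitute it, asserting that it still carries `u8` messages.
    /// let channel = unsafe { Channel::<u8>::from_driver_handle(handle) };
    /// ```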
    pub unsafe fn from_driver_handle(handle: DriverHandle) -> Self {
        Self { handle: ManuallyDrop::new(handle), wait_state: None, _p: PhantomData }
    }

    /// Writes the [`Message`] given to the channel. This will complete asynchronously and can't
    /// be cancelled.
    ///
    /// The channel will take ownership of the data and handles passed in.
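    ///
    /// A minimal sketch of building and sending a message (assuming the driver runtime is
    /// available and `channel` is a hypothetical `Channel<u8>`):
    ///
    /// ```ignore
    /// let arena = Arena::new();
    /// // Build a message whose data is allocated from `arena`, then send it.
    /// let message = Message::new_with_data(arena, |arena| arena.insert(42u8));
    /// channel.write(message).unwrap();
    /// ```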
    pub fn write(&self, message: Message<T>) -> Result<(), Status> {
        // get the sizes while we still have refs to the data and handles
        let data_len = message.data().map_or(0, |data| size_of_val(data) as u32);
        let handles_count = message.handles().map_or(0, |handles| handles.len() as u32);

        let (arena, data, handles) = message.into_raw();

        // transform the `Option<NonNull<T>>` into just `*mut T`
        let data_ptr = data.map_or(null_mut(), |data| data.cast().as_ptr());
        let handles_ptr = handles.map_or(null_mut(), |handles| handles.cast().as_ptr());

        // SAFETY:
        // - Normally, we could be reading uninit bytes here. However, as long as
        //   `fdf_channel_write` doesn't allow cross-LTO, it won't care whether the bytes are
        //   initialized.
        // - A `Message` will generally only construct correctly if the data and handles pointers
        //   inside it come from the arena it holds, but `fdf_channel_write` checks that they
        //   belong to the arena we pass anyway, so we do not need to re-verify that here.
        Status::ok(unsafe {
            fdf_channel_write(
                self.handle.get_raw().get(),
                0,
                arena.as_ptr(),
                data_ptr,
                data_len,
                handles_ptr,
                handles_count,
            )
        })?;

        // SAFETY: this is the valid-by-construction arena we were passed in through the [`Message`]
        // object, and now that we have completed `fdf_channel_write` it is safe to drop our copy
        // of it.
        unsafe { fdf_arena_drop_ref(arena.as_ptr()) };
        Ok(())
    }

    /// Shorthand for calling [`Self::write`] with the result of [`Message::new_with`]
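    ///
    /// A sketch of transferring a handle across the channel, assuming `first` is a hypothetical
    /// `Channel<()>` and `inner` is another channel end being sent over it (mirroring this
    /// module's handle-transfer test):
    ///
    /// ```ignore
    /// first
    ///     .write_with(Arena::new(), |arena| {
    ///         // No data payload, just a single handle converted into a `MixedHandle`.
    ///         (None, Some(arena.insert_boxed_slice(Box::new([Some(inner.into())]))))
    ///     })
    ///     .unwrap();
    /// ```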
    pub fn write_with<F>(&self, arena: Arena, f: F) -> Result<(), Status>
    where
        F: for<'a> FnOnce(
            &'a Arena,
        )
            -> (Option<ArenaBox<'a, T>>, Option<ArenaBox<'a, [Option<MixedHandle>]>>),
    {
        self.write(Message::new_with(arena, f))
    }

    /// Shorthand for calling [`Self::write`] with the result of [`Message::new_with_data`]
    pub fn write_with_data<F>(&self, arena: Arena, f: F) -> Result<(), Status>
    where
        F: for<'a> FnOnce(&'a Arena) -> ArenaBox<'a, T>,
    {
        self.write(Message::new_with_data(arena, f))
    }
}

/// Attempts to read from the channel, returning a [`Message`] object that can be used to
/// access or take the data received if there was any. This is the basic building block
/// on which the other `try_read_*` methods are built.
pub(crate) fn try_read_raw(
    channel: &DriverHandle,
) -> Result<Option<Message<[MaybeUninit<u8>]>>, Status> {
    let mut out_arena = null_mut();
    let mut out_data = null_mut();
    let mut out_num_bytes = 0;
    let mut out_handles = null_mut();
    let mut out_num_handles = 0;
    Status::ok(unsafe {
        fdf_channel_read(
            channel.get_raw().get(),
            0,
            &mut out_arena,
            &mut out_data,
            &mut out_num_bytes,
            &mut out_handles,
            &mut out_num_handles,
        )
    })?;
    // if no arena was returned, that means no data was returned.
    if out_arena.is_null() {
        return Ok(None);
    }
    // SAFETY: we just checked that the `out_arena` is non-null
    let arena = Arena(unsafe { NonNull::new_unchecked(out_arena) });
    let data_ptr = if !out_data.is_null() {
        let ptr = core::ptr::slice_from_raw_parts_mut(out_data.cast(), out_num_bytes as usize);
        // SAFETY: we just checked that the pointer was non-null, the slice version of it should
        // be too.
        Some(unsafe { ArenaBox::new(NonNull::new_unchecked(ptr)) })
    } else {
        None
    };
    let handles_ptr = if !out_handles.is_null() {
        let ptr = core::ptr::slice_from_raw_parts_mut(out_handles.cast(), out_num_handles as usize);
        // SAFETY: we just checked that the pointer was non-null, the slice version of it should
        // be too.
        Some(unsafe { ArenaBox::new(NonNull::new_unchecked(ptr)) })
    } else {
        None
    };
    // SAFETY: these pointers came from `fdf_channel_read` and are allocations owned by `arena`,
    // which the returned message keeps alive.
    Ok(Some(unsafe { Message::new_unchecked(arena, data_ptr, handles_ptr) }))
}

/// Reads a message from the channel asynchronously
///
/// # Panics
///
/// Panics if this is not run from a driver framework dispatcher.
///
/// # Safety
///
/// The caller is responsible for ensuring that the channel object's
/// handle lifetime is longer than the returned future.
pub(crate) unsafe fn read_raw<T: ?Sized, D>(
    channel: &mut Channel<T>,
    dispatcher: D,
) -> ReadMessageRawFut<D> {
    // SAFETY: The caller promises that the message state object can't outlive the handle.
    let raw_fut = unsafe { ReadMessageState::register_read_wait(channel) };
    ReadMessageRawFut { raw_fut, dispatcher }
}

impl<T> Channel<T> {
    /// Attempts to read an object of type `T` and a handle set from the channel
    pub fn try_read(&self) -> Result<Option<Message<T>>, Status> {
        // read a message from the channel
        let Some(message) = try_read_raw(&self.handle)? else {
            return Ok(None);
        };
        // SAFETY: It is an invariant of Channel<T> that messages sent or received are always of
        // type T.
        Ok(Some(unsafe { message.cast_unchecked() }))
    }

    /// Reads an object of type `T` and a handle set from the channel asynchronously
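    ///
    /// A sketch of reading on the current driver dispatcher (assuming this runs inside a
    /// driver-framework task, e.g. under `fdf_env::test::spawn_in_driver` as in this module's
    /// tests):
    ///
    /// ```ignore
    /// let (mut first, second) = Channel::<u8>::create();
    /// second.write_with_data(Arena::new(), |arena| arena.insert(1u8)).unwrap();
    /// // Await the message on whichever dispatcher is current.
    /// let message = first.read(CurrentDispatcher).await.unwrap().unwrap();
    /// assert_eq!(*message.data().unwrap(), 1u8);
    /// ```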
    pub async fn read<D: OnDispatcher>(
        &mut self,
        dispatcher: D,
    ) -> Result<Option<Message<T>>, Status> {
        // SAFETY: By calling `read_raw` in an async context that holds this channel's lifetime open
        // beyond the resolution of the future, we ensure that the channel handle outlives the
        // future state object.
        let Some(message) = unsafe { read_raw(self, dispatcher) }.await? else {
            return Ok(None);
        };
        // SAFETY: It is an invariant of Channel<T> that messages sent or received are always of
        // type T.
        Ok(Some(unsafe { message.cast_unchecked() }))
    }
}

impl Channel<[u8]> {
    /// Attempts to read a slice of bytes and a handle set from the channel
    pub fn try_read_bytes(&self) -> Result<Option<Message<[u8]>>, Status> {
        // read a message from the channel
        let Some(message) = try_read_raw(&self.handle)? else {
            return Ok(None);
        };
        // SAFETY: It is an invariant of Channel<[u8]> that messages sent or received are always of
        // type [u8].
        Ok(Some(unsafe { message.assume_init() }))
    }

    /// Reads a slice of bytes and a handle set from the channel asynchronously
    pub async fn read_bytes<D: OnDispatcher>(
        &mut self,
        dispatcher: D,
    ) -> Result<Option<Message<[u8]>>, Status> {
        // read a message from the channel
        // SAFETY: By calling `read_raw` in an async context that holds this channel's lifetime open
        // beyond the resolution of the future, we ensure that the channel handle outlives the
        // future state object.
        let Some(message) = unsafe { read_raw(self, dispatcher) }.await? else {
            return Ok(None);
        };
        // SAFETY: It is an invariant of Channel<[u8]> that messages sent or received are always of
        // type [u8].
        Ok(Some(unsafe { message.assume_init() }))
    }
}

impl<T> From<Channel<T>> for MixedHandle {
    fn from(value: Channel<T>) -> Self {
        MixedHandle::from(value.into_driver_handle())
    }
}

impl<T: ?Sized> std::cmp::Ord for Channel<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.handle.cmp(&other.handle)
    }
}

impl<T: ?Sized> std::cmp::PartialOrd for Channel<T> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: ?Sized> std::cmp::PartialEq for Channel<T> {
    fn eq(&self, other: &Self) -> bool {
        self.handle.eq(&other.handle)
    }
}

impl<T: ?Sized> std::cmp::Eq for Channel<T> {}

impl<T: ?Sized> std::hash::Hash for Channel<T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.handle.hash(state);
    }
}

pub(crate) struct ReadMessageRawFut<D> {
    pub(crate) raw_fut: ReadMessageState,
    dispatcher: D,
}

impl<D: OnDispatcher> Future for ReadMessageRawFut<D> {
    type Output = Result<Option<Message<[MaybeUninit<u8>]>>, Status>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let dispatcher = self.dispatcher.clone();
        self.as_mut().raw_fut.poll_with_dispatcher(cx, dispatcher)
    }
}

#[cfg(test)]
mod tests {
    use std::io::{Write, stdout};
    use std::pin::pin;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, mpsc};

    use fdf_core::dispatcher::{
        CurrentDispatcher, Dispatcher, DispatcherBuilder, DispatcherRef, OnDispatcher,
    };
    use fdf_core::handle::MixedHandleType;
    use fdf_env::test::spawn_in_driver;
    use futures::channel::oneshot;
    use futures::poll;

    use super::*;
    use crate::test_utils::*;

    #[test]
    fn send_and_receive_bytes_synchronously() {
        let (first, second) = Channel::create();
        let arena = Arena::new();
        assert_eq!(first.try_read_bytes().unwrap_err(), Status::from_raw(ZX_ERR_SHOULD_WAIT));
        first.write_with_data(arena.clone(), |arena| arena.insert_slice(&[1, 2, 3, 4])).unwrap();
        assert_eq!(second.try_read_bytes().unwrap().unwrap().data().unwrap(), &[1, 2, 3, 4]);
        assert_eq!(second.try_read_bytes().unwrap_err(), Status::from_raw(ZX_ERR_SHOULD_WAIT));
        second.write_with_data(arena.clone(), |arena| arena.insert_slice(&[5, 6, 7, 8])).unwrap();
        assert_eq!(first.try_read_bytes().unwrap().unwrap().data().unwrap(), &[5, 6, 7, 8]);
        assert_eq!(first.try_read_bytes().unwrap_err(), Status::from_raw(ZX_ERR_SHOULD_WAIT));
        assert_eq!(second.try_read_bytes().unwrap_err(), Status::from_raw(ZX_ERR_SHOULD_WAIT));
        drop(second);
        assert_eq!(
            first.write_with_data(arena.clone(), |arena| arena.insert_slice(&[9, 10, 11, 12])),
            Err(Status::from_raw(ZX_ERR_PEER_CLOSED))
        );
    }

    #[test]
    fn send_and_receive_bytes_asynchronously() {
        spawn_in_driver("channel async", async {
            let arena = Arena::new();
            let (mut first, second) = Channel::create();

            assert!(poll!(pin!(first.read_bytes(CurrentDispatcher))).is_pending());
            second.write_with_data(arena, |arena| arena.insert_slice(&[1, 2, 3, 4])).unwrap();
            assert_eq!(
                first.read_bytes(CurrentDispatcher).await.unwrap().unwrap().data().unwrap(),
                &[1, 2, 3, 4]
            );
        });
    }

    #[test]
    fn send_and_receive_objects_synchronously() {
        let arena = Arena::new();
        let (first, second) = Channel::create();
        let (tx, rx) = mpsc::channel();
        first
            .write_with_data(arena.clone(), |arena| arena.insert(DropSender::new(1, tx.clone())))
            .unwrap();
        rx.try_recv().expect_err("should not drop the object when sent");
        let message = second.try_read().unwrap().unwrap();
        assert_eq!(message.data().unwrap().0, 1);
        rx.try_recv().expect_err("should not drop the object when received");
        drop(message);
        rx.try_recv().expect("dropped when received");
    }

    #[test]
    fn send_and_receive_handles_synchronously() {
        println!("Create channels and write one end of one of the channel pairs to the other");
        let (first, second) = Channel::<()>::create();
        let (inner_first, inner_second) = Channel::<String>::create();
        let message = Message::new_with(Arena::new(), |arena| {
            (None, Some(arena.insert_boxed_slice(Box::new([Some(inner_first.into())]))))
        });
        first.write(message).unwrap();

        println!("Receive the channel back on the other end of the first channel pair.");
        let mut arena = None;
        let message =
            second.try_read().unwrap().expect("Expected a message with contents to be received");
        let (_, received_handles) = message.into_arena_boxes(&mut arena);
        let mut first_handle_received =
            ArenaBox::take_boxed_slice(received_handles.expect("expected handles in the message"));
        let first_handle_received = first_handle_received
            .first_mut()
            .expect("expected one handle in the handle set")
            .take()
            .expect("expected the first handle to be non-null");
        let first_handle_received = first_handle_received.resolve();
        let MixedHandleType::Driver(driver_handle) = first_handle_received else {
            panic!("Got a non-driver handle when we sent a driver handle");
        };
        let inner_first_received = unsafe { Channel::from_driver_handle(driver_handle) };

        println!("Send and receive a string across the now-transmitted channel pair.");
        inner_first_received
            .write_with_data(Arena::new(), |arena| arena.insert("boom".to_string()))
            .unwrap();
        assert_eq!(inner_second.try_read().unwrap().unwrap().data().unwrap(), &"boom".to_string());
    }

    async fn ping(mut chan: Channel<u8>) {
        println!("starting ping!");
        chan.write_with_data(Arena::new(), |arena| arena.insert(0)).unwrap();
        while let Ok(Some(msg)) = chan.read(CurrentDispatcher).await {
            let next = *msg.data().unwrap();
            println!("ping! {next}");
            chan.write_with_data(msg.take_arena(), |arena| arena.insert(next + 1)).unwrap();
        }
    }

    async fn pong(mut chan: Channel<u8>) {
        println!("starting pong!");
        while let Some(msg) = chan.read(CurrentDispatcher).await.unwrap() {
            let next = *msg.data().unwrap();
            println!("pong! {next}");
            if next > 10 {
                println!("bye!");
                break;
            }
            chan.write_with_data(msg.take_arena(), |arena| arena.insert(next + 1)).unwrap();
        }
    }

    #[test]
    fn async_ping_pong() {
        spawn_in_driver("async ping pong", async {
            let (ping_chan, pong_chan) = Channel::create();
            CurrentDispatcher.spawn_task(ping(ping_chan)).unwrap();
            pong(pong_chan).await;
        });
    }

    #[test]
    fn async_ping_pong_on_fuchsia_async() {
        spawn_in_driver("async ping pong", async {
            let (ping_chan, pong_chan) = Channel::create();

            let fdf_dispatcher = DispatcherBuilder::new()
                .name("fdf-async")
                .create()
                .expect("failure creating non-blocking dispatcher for fdf operations on rust-async dispatcher")
                .release();

            let rust_async_dispatcher = DispatcherBuilder::new()
                .name("fuchsia-async")
                .allow_thread_blocking()
                .create()
                .expect("failure creating blocking dispatcher for rust async")
                .release();

            rust_async_dispatcher
                .post_task_sync(move |_| {
                    Dispatcher::override_current(fdf_dispatcher, || {
                        let mut executor = fuchsia_async::LocalExecutor::default();
                        executor.run_singlethreaded(ping(ping_chan));
                    });
                })
                .unwrap();

            pong(pong_chan).await
        });
    }

    async fn recv_lots_of_bytes_with_cancellations(
        mut rx: Channel<[u8]>,
        fin_tx: oneshot::Sender<()>,
        pending_count: Arc<AtomicU64>,
    ) {
        let mut immediate_count = 0;
        let mut count = 0;
        loop {
            // Try to read as fast as we can, but any time we get a pending result, drop the
            // future and then re-try with a proper await so we re-read and get it. This tests
            // the reliability of the channel read's drop cancellation.
            let mut next_fut = Box::pin(rx.read_bytes(CurrentDispatcher));
            let next = match futures::poll!(&mut next_fut) {
                Poll::Pending => {
                    pending_count.fetch_add(1, Ordering::Relaxed);
                    drop(next_fut);
                    rx.read_bytes(CurrentDispatcher).await
                }
                Poll::Ready(r) => {
                    immediate_count += 1;
                    r
                }
            };
            match next {
                Err(Status::PEER_CLOSED) | Ok(None) => break,
                Err(_) => {
                    next.unwrap();
                }
                Ok(Some(msg)) => {
                    assert_eq!(msg.data().unwrap(), &[count as u8; 100]);
                    count += 1;
                }
            }
        }
        println!("read total: {count}, immediate: {immediate_count}, pending: {pending_count:?}");
        // signal the sender that we're done reading so it can finish
        fin_tx.send(()).unwrap();
    }

    async fn send_lots_of_bytes(
        tx: Channel<[u8]>,
        fin_rx: oneshot::Receiver<()>,
        pending_count: Arc<AtomicU64>,
    ) {
        // The potential failure modes here are not entirely deterministic, so we want to
        // make sure that we get enough runs through the danger path (a pending read that is
        // dropped) so that we exercise it thoroughly. To that end, we will do up to 10,000
        // writes but stop early if we have 500 pending events.
        let arena = Arena::new();
        print!("writing: ");
        for i in 0..10000 {
            tx.write_with_data(arena.clone(), |arena| arena.insert_slice(&[i as u8; 100])).unwrap();
            // the following print and flush is not just aesthetic. It helps slow down the
            // writes a bit so that the reader dispatcher is more likely to have to wait for
            // further data.
            print!(".");
            stdout().flush().unwrap();
            if pending_count.load(Ordering::Relaxed) > 500 {
                break;
            }
        }
        drop(tx);
        fin_rx.await.unwrap();
    }

    async fn send_and_recv_lots_of_bytes_with_cancellations(dispatcher: DispatcherRef<'static>) {
        let (tx, rx) = Channel::create();
        let (fin_tx, fin_rx) = oneshot::channel();
        let pending_count = Arc::new(AtomicU64::new(0));
        dispatcher
            .spawn_task(recv_lots_of_bytes_with_cancellations(rx, fin_tx, pending_count.clone()))
            .unwrap();

        send_lots_of_bytes(tx, fin_rx, pending_count).await;
    }

    #[test]
    fn send_and_recv_lots_of_bytes_with_cancellations_on_synchronized_dispatcher() {
        spawn_in_driver(
            "lots of bytes and with some cancellations on a synchronized dispatcher",
            async {
                let dispatcher =
                    DispatcherBuilder::new().name("fdf-synchronized").create().unwrap().release();

                send_and_recv_lots_of_bytes_with_cancellations(dispatcher).await;
            },
        );
    }

    #[test]
    fn send_and_recv_lots_of_bytes_with_cancellations_on_unsynchronized_dispatcher() {
        spawn_in_driver(
            "lots of bytes and with some cancellations on an unsynchronized dispatcher",
            async {
                let dispatcher = DispatcherBuilder::new()
                    .name("fdf-unsynchronized")
                    .unsynchronized()
                    .create()
                    .unwrap()
                    .release();

                send_and_recv_lots_of_bytes_with_cancellations(dispatcher).await;
            },
        );
    }

    #[test]
    fn send_and_recv_lots_of_bytes_with_cancellations_on_fuchsia_async_dispatcher() {
        spawn_in_driver(
            "lots of bytes and with some cancellations on a fuchsia-async overridden dispatcher",
            async {
                let fdf_dispatcher = DispatcherBuilder::new()
                    .name("fdf-async")
                    .create()
                    .expect("failure creating non-blocking dispatcher for fdf operations on rust-async dispatcher")
                    .release();

                let dispatcher = DispatcherBuilder::new()
                    .name("fdf-fuchsia-async")
                    .allow_thread_blocking()
                    .create()
                    .expect("failure creating blocking dispatcher for rust async")
                    .release();

                let (tx, rx) = Channel::create();
                let (fin_tx, fin_rx) = oneshot::channel();
                let pending_count = Arc::new(AtomicU64::new(0));

                let pending_count_clone = pending_count.clone();
                dispatcher
                    .post_task_sync(move |_| {
                        Dispatcher::override_current(fdf_dispatcher, || {
                            let mut executor = fuchsia_async::LocalExecutor::default();
                            executor.run_singlethreaded(recv_lots_of_bytes_with_cancellations(
                                rx,
                                fin_tx,
                                pending_count_clone,
                            ));
                        });
                    })
                    .unwrap();

                send_lots_of_bytes(tx, fin_rx, pending_count).await;
            },
        );
    }
}