Skip to main content

fidl_next_protocol/
mpsc.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A basic [`Transport`] implementation based on MPSC channels.
6
7use core::fmt;
8use core::mem::{ManuallyDrop, take};
9use core::pin::Pin;
10use core::task::{Context, Poll};
11
12use fidl_next_codec::Chunk;
13use fuchsia_loom::future::AtomicWaker;
14use fuchsia_loom::sync::atomic::{AtomicBool, Ordering};
15use fuchsia_loom::sync::{Arc, mpsc};
16
17use crate::{NonBlockingTransport, Transport};
18
19/// A paired mpsc transport.
20pub struct Mpsc {
21    shared: Shared,
22    exclusive: Exclusive,
23}
24
25impl Mpsc {
26    /// Creates two mpscs which can communicate with each other.
27    pub fn new() -> (Self, Self) {
28        let state = Arc::new(State {
29            send_wakers: [AtomicWaker::new(), AtomicWaker::new()],
30            is_closed: AtomicBool::new(false),
31        });
32        let (a_send, a_recv) = mpsc::channel();
33        let (b_send, b_recv) = mpsc::channel();
34        (
35            Mpsc {
36                shared: Shared { state: state.clone(), end: 0, sender: ManuallyDrop::new(a_send) },
37                exclusive: Exclusive { receiver: b_recv },
38            },
39            Mpsc {
40                shared: Shared { state, end: 1, sender: ManuallyDrop::new(b_send) },
41                exclusive: Exclusive { receiver: a_recv },
42            },
43        )
44    }
45}
46
/// The error type for paired mpsc transports.
///
/// This enum has no variants, so a value of this type can never be constructed.
#[derive(Debug, Clone)]
pub enum Error {}

impl fmt::Display for Error {
    fn fmt(&self, _formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        // There are no variants to format: the empty match covers every case.
        match *self {}
    }
}

impl core::error::Error for Error {}
58
59struct State {
60    send_wakers: [AtomicWaker; 2],
61    is_closed: AtomicBool,
62}
63
64/// The shared part of a paired mpsc transport.
65pub struct Shared {
66    state: Arc<State>,
67    end: usize,
68    sender: ManuallyDrop<mpsc::Sender<Vec<Chunk>>>,
69}
70
71impl Drop for Shared {
72    fn drop(&mut self) {
73        // Make sure that the mpsc is closed before waking the other end
74        unsafe {
75            ManuallyDrop::drop(&mut self.sender);
76        }
77        self.state.is_closed.store(true, Ordering::Relaxed);
78        self.state.send_wakers[self.end].wake();
79    }
80}
81
82/// The send future for a paired mpsc transport.
83pub struct SendFutureState {
84    buffer: Vec<Chunk>,
85}
86
87/// The exclusive part of a paired mpsc transport.
88pub struct Exclusive {
89    receiver: mpsc::Receiver<Vec<Chunk>>,
90}
91
92impl Transport for Mpsc {
93    type Error = Error;
94
95    fn split(self) -> (Self::Shared, Self::Exclusive) {
96        (self.shared, self.exclusive)
97    }
98
99    type Shared = Shared;
100    type SendBuffer = Vec<Chunk>;
101    type SendFutureState = SendFutureState;
102
103    fn acquire(_: &Self::Shared) -> Self::SendBuffer {
104        Vec::new()
105    }
106
107    fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
108        SendFutureState { buffer }
109    }
110
111    fn poll_send(
112        future_state: Pin<&mut SendFutureState>,
113        _: &mut Context<'_>,
114        shared: &Self::Shared,
115    ) -> Poll<Result<(), Option<Error>>> {
116        Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
117    }
118
119    type Exclusive = Exclusive;
120    type RecvFutureState = ();
121    type RecvBuffer = Vec<Chunk>;
122
123    fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {}
124
125    fn poll_recv(
126        _: Pin<&mut Self::RecvFutureState>,
127        cx: &mut Context<'_>,
128        shared: &Self::Shared,
129        exclusive: &mut Self::Exclusive,
130    ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
131        shared.state.send_wakers[1 - shared.end].register_by_ref(cx.waker());
132        if shared.state.is_closed.load(Ordering::Relaxed) {
133            return Poll::Ready(Err(None));
134        }
135
136        match exclusive.receiver.try_recv() {
137            Ok(chunks) => Poll::Ready(Ok(chunks)),
138            Err(mpsc::TryRecvError::Empty) => Poll::Pending,
139            Err(mpsc::TryRecvError::Disconnected) => Poll::Ready(Err(None)),
140        }
141    }
142}
143
144impl NonBlockingTransport for Mpsc {
145    fn send_immediately(
146        future_state: &mut Self::SendFutureState,
147        shared: &Self::Shared,
148    ) -> Result<(), Option<Self::Error>> {
149        let chunks = take(&mut future_state.buffer);
150        match shared.sender.send(chunks) {
151            Ok(()) => {
152                shared.state.send_wakers[shared.end].wake();
153                Ok(())
154            }
155            Err(_) => Err(None),
156        }
157    }
158}
159
/// Runs the transport test helpers from `crate::testing` against [`Mpsc::new`].
#[cfg(test)]
mod tests {
    use super::Mpsc;
    use crate::testing::*;

    #[fuchsia_async::run_singlethreaded(test)]
    async fn close_on_drop() {
        test_close_on_drop(Mpsc::new).await;
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn one_way() {
        test_one_way(Mpsc::new).await;
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn one_way_nonblocking() {
        test_one_way_nonblocking(Mpsc::new).await;
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn two_way() {
        test_two_way(Mpsc::new).await;
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn multiple_two_way() {
        test_multiple_two_way(Mpsc::new).await;
    }

    #[fuchsia_async::run_singlethreaded(test)]
    async fn event() {
        test_event(Mpsc::new).await;
    }
}