fidl_next_protocol/
mpsc.rs1use core::fmt;
8use core::mem::{ManuallyDrop, take};
9use core::pin::Pin;
10use core::task::{Context, Poll};
11
12use fidl_next_codec::Chunk;
13use fuchsia_loom::future::AtomicWaker;
14use fuchsia_loom::sync::atomic::{AtomicBool, Ordering};
15use fuchsia_loom::sync::{Arc, mpsc};
16
17use crate::{NonBlockingTransport, Transport};
18
19pub struct Mpsc {
21 shared: Shared,
22 exclusive: Exclusive,
23}
24
25impl Mpsc {
26 pub fn new() -> (Self, Self) {
28 let state = Arc::new(State {
29 send_wakers: [AtomicWaker::new(), AtomicWaker::new()],
30 is_closed: AtomicBool::new(false),
31 });
32 let (a_send, a_recv) = mpsc::channel();
33 let (b_send, b_recv) = mpsc::channel();
34 (
35 Mpsc {
36 shared: Shared { state: state.clone(), end: 0, sender: ManuallyDrop::new(a_send) },
37 exclusive: Exclusive { receiver: b_recv },
38 },
39 Mpsc {
40 shared: Shared { state, end: 1, sender: ManuallyDrop::new(b_send) },
41 exclusive: Exclusive { receiver: a_recv },
42 },
43 )
44 }
45}
46
/// The error type for `Mpsc` transports.
///
/// This enum has no variants, so a value of this type can never be
/// constructed.
#[derive(Debug, Clone)]
pub enum Error {}
50
51impl fmt::Display for Error {
52 fn fmt(&self, _: &mut fmt::Formatter<'_>) -> fmt::Result {
53 match *self {}
54 }
55}
56
57impl core::error::Error for Error {}
58
59struct State {
60 send_wakers: [AtomicWaker; 2],
61 is_closed: AtomicBool,
62}
63
64pub struct Shared {
66 state: Arc<State>,
67 end: usize,
68 sender: ManuallyDrop<mpsc::Sender<Vec<Chunk>>>,
69}
70
71impl Drop for Shared {
72 fn drop(&mut self) {
73 unsafe {
75 ManuallyDrop::drop(&mut self.sender);
76 }
77 self.state.is_closed.store(true, Ordering::Relaxed);
78 self.state.send_wakers[self.end].wake();
79 }
80}
81
82pub struct SendFutureState {
84 buffer: Vec<Chunk>,
85}
86
87pub struct Exclusive {
89 receiver: mpsc::Receiver<Vec<Chunk>>,
90}
91
92impl Transport for Mpsc {
93 type Error = Error;
94
95 fn split(self) -> (Self::Shared, Self::Exclusive) {
96 (self.shared, self.exclusive)
97 }
98
99 type Shared = Shared;
100 type SendBuffer = Vec<Chunk>;
101 type SendFutureState = SendFutureState;
102
103 fn acquire(_: &Self::Shared) -> Self::SendBuffer {
104 Vec::new()
105 }
106
107 fn begin_send(_: &Self::Shared, buffer: Self::SendBuffer) -> Self::SendFutureState {
108 SendFutureState { buffer }
109 }
110
111 fn poll_send(
112 future_state: Pin<&mut SendFutureState>,
113 _: &mut Context<'_>,
114 shared: &Self::Shared,
115 ) -> Poll<Result<(), Option<Error>>> {
116 Poll::Ready(Self::send_immediately(future_state.get_mut(), shared))
117 }
118
119 type Exclusive = Exclusive;
120 type RecvFutureState = ();
121 type RecvBuffer = Vec<Chunk>;
122
123 fn begin_recv(_: &Self::Shared, _: &mut Self::Exclusive) -> Self::RecvFutureState {}
124
125 fn poll_recv(
126 _: Pin<&mut Self::RecvFutureState>,
127 cx: &mut Context<'_>,
128 shared: &Self::Shared,
129 exclusive: &mut Self::Exclusive,
130 ) -> Poll<Result<Self::RecvBuffer, Option<Self::Error>>> {
131 shared.state.send_wakers[1 - shared.end].register_by_ref(cx.waker());
132 if shared.state.is_closed.load(Ordering::Relaxed) {
133 return Poll::Ready(Err(None));
134 }
135
136 match exclusive.receiver.try_recv() {
137 Ok(chunks) => Poll::Ready(Ok(chunks)),
138 Err(mpsc::TryRecvError::Empty) => Poll::Pending,
139 Err(mpsc::TryRecvError::Disconnected) => Poll::Ready(Err(None)),
140 }
141 }
142}
143
144impl NonBlockingTransport for Mpsc {
145 fn send_immediately(
146 future_state: &mut Self::SendFutureState,
147 shared: &Self::Shared,
148 ) -> Result<(), Option<Self::Error>> {
149 let chunks = take(&mut future_state.buffer);
150 match shared.sender.send(chunks) {
151 Ok(()) => {
152 shared.state.send_wakers[shared.end].wake();
153 Ok(())
154 }
155 Err(_) => Err(None),
156 }
157 }
158}
159
#[cfg(test)]
mod tests {
    use fuchsia_async as fasync;

    use super::Mpsc;
    use crate::testing::*;

    /// Expands each `name => helper` pair into a single-threaded async test
    /// named `name` that awaits `helper(Mpsc::new)`.
    macro_rules! mpsc_tests {
        ($($name:ident => $helper:ident),* $(,)?) => {
            $(
                #[fasync::run_singlethreaded(test)]
                async fn $name() {
                    $helper(Mpsc::new).await;
                }
            )*
        };
    }

    mpsc_tests! {
        close_on_drop => test_close_on_drop,
        one_way => test_one_way,
        one_way_nonblocking => test_one_way_nonblocking,
        two_way => test_two_way,
        multiple_two_way => test_multiple_two_way,
        event => test_event,
    }
}