async_helpers/
responding_channel.rs1use anyhow::Error;
23use futures::channel::{mpsc, oneshot};
24use futures::stream::{FusedStream, Stream};
25use futures::SinkExt;
26use std::pin::Pin;
27use std::task::{Context, Poll};
28
29pub struct Sender<Req, Resp> {
31 inner: mpsc::Sender<(Req, Responder<Resp>)>,
32}
33
34impl<Req, Resp> Clone for Sender<Req, Resp> {
35 fn clone(&self) -> Self {
36 Self { inner: self.inner.clone() }
37 }
38}
39
40impl<Req, Resp> Sender<Req, Resp> {
41 pub async fn request(&mut self, value: Req) -> Result<Resp, Error> {
46 let (responder, response) = oneshot::channel();
47 self.inner.send((value, Responder { inner: responder })).await?;
48 Ok(response.await?)
49 }
50}
51
52pub struct Responder<Resp> {
54 inner: oneshot::Sender<Resp>,
55}
56
57impl<Resp> Responder<Resp> {
58 pub fn respond(self, value: Resp) -> Result<(), Resp> {
62 self.inner.send(value)
63 }
64}
65
66pub struct Receiver<Req, Resp> {
71 inner: mpsc::Receiver<(Req, Responder<Resp>)>,
72}
73
74impl<Req, Resp> Receiver<Req, Resp> {
75 pub fn close(&mut self) {
80 self.inner.close();
81 }
82
83 pub fn try_receive(&mut self) -> Result<Option<(Req, Responder<Resp>)>, Error> {
88 Ok(self.inner.try_next()?)
89 }
90}
91
92impl<Req, Resp> Stream for Receiver<Req, Resp> {
93 type Item = (Req, Responder<Resp>);
94
95 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96 Pin::new(&mut self.inner).poll_next(cx)
97 }
98}
99
100impl<Req, Resp> FusedStream for Receiver<Req, Resp> {
101 fn is_terminated(&self) -> bool {
102 self.inner.is_terminated()
103 }
104}
105
106pub fn channel<Req, Resp>(buffer: usize) -> (Sender<Req, Resp>, Receiver<Req, Resp>) {
111 let (inner_sender, inner_receiver) = mpsc::channel(buffer);
112 (Sender { inner: inner_sender }, Receiver { inner: inner_receiver })
113}
114
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;
    use futures::StreamExt;
    use std::pin::pin;

    /// Extracts the value from a `Poll::Ready`, panicking on `Poll::Pending`.
    macro_rules! unwrap_ready {
        ($poll:expr) => {
            match $poll {
                Poll::Pending => panic!("not ready"),
                Poll::Ready(value) => value,
            }
        };
    }

    /// Full round trip: request is sent, received, answered, and completes.
    #[test]
    fn sender_receives_response() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);

        // Nothing has been requested yet, so the receiver has nothing to yield.
        let mut next_request = pin!(receiver.next());
        assert!(exec.run_until_stalled(&mut next_request).is_pending());

        // The request stays pending until somebody responds to it.
        let mut pending_request = pin!(sender.request(()));
        assert!(exec.run_until_stalled(&mut pending_request).is_pending());

        let ((), responder) = unwrap_ready!(exec.run_until_stalled(&mut next_request)).unwrap();

        // Receiving the request alone does not complete it.
        assert!(exec.run_until_stalled(&mut pending_request).is_pending());

        responder.respond(()).unwrap();

        unwrap_ready!(exec.run_until_stalled(&mut pending_request)).unwrap();
    }

    /// Requests made through a sender and through its clone both arrive at
    /// the single receiver.
    #[test]
    fn cloned_senders_go_to_same_receiver() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);
        let mut sender_clone = sender.clone();

        // First round trip, through the original sender.
        let mut first_received = pin!(receiver.next());
        assert!(exec.run_until_stalled(&mut first_received).is_pending());

        let mut first_request = pin!(sender.request(()));
        assert!(exec.run_until_stalled(&mut first_request).is_pending());

        let ((), responder) = unwrap_ready!(exec.run_until_stalled(&mut first_received)).unwrap();

        assert!(exec.run_until_stalled(&mut first_request).is_pending());

        responder.respond(()).unwrap();

        unwrap_ready!(exec.run_until_stalled(&mut first_request)).unwrap();

        // Second round trip, through the cloned sender.
        let mut second_received = pin!(receiver.next());
        assert!(exec.run_until_stalled(&mut second_received).is_pending());

        let mut second_request = pin!(sender_clone.request(()));
        assert!(exec.run_until_stalled(&mut second_request).is_pending());

        let ((), responder) = unwrap_ready!(exec.run_until_stalled(&mut second_received)).unwrap();

        assert!(exec.run_until_stalled(&mut second_request).is_pending());

        responder.respond(()).unwrap();

        unwrap_ready!(exec.run_until_stalled(&mut second_request)).unwrap();
    }

    /// A pending request resolves to an error once the receiver is dropped.
    #[test]
    fn sender_receives_error_on_dropped_receiver() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, receiver) = channel::<(), ()>(0);

        let mut pending_request = pin!(sender.request(()));
        assert!(exec.run_until_stalled(&mut pending_request).is_pending());

        drop(receiver);

        assert!(unwrap_ready!(exec.run_until_stalled(&mut pending_request)).is_err());
    }

    /// A pending request resolves to an error when its responder is dropped
    /// without responding.
    #[test]
    fn sender_receives_error_on_dropped_responder() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel::<(), ()>(0);

        let mut pending_request = pin!(sender.request(()));
        assert!(exec.run_until_stalled(&mut pending_request).is_pending());

        let mut next_request = pin!(receiver.next());
        let ((), responder) = unwrap_ready!(exec.run_until_stalled(&mut next_request)).unwrap();

        assert!(exec.run_until_stalled(&mut pending_request).is_pending());
        drop(responder);

        assert!(unwrap_ready!(exec.run_until_stalled(&mut pending_request)).is_err());
    }

    /// The receiver's stream ends (`None`) once the sender is dropped.
    #[test]
    fn receiver_receives_error_on_dropped_sender() {
        let mut exec = fasync::TestExecutor::new();
        let (sender, mut receiver) = channel::<(), ()>(0);

        let mut next_request = pin!(receiver.next());
        assert!(exec.run_until_stalled(&mut next_request).is_pending());

        drop(sender);

        assert!(unwrap_ready!(exec.run_until_stalled(&mut next_request)).is_none());
    }

    /// Responding fails once the requesting side has gone away.
    #[test]
    fn responder_returns_error_on_dropped_sender() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel(0);

        {
            // Scoped so the request future is dropped at the end of the
            // block, which also releases its borrow of `sender`.
            let mut pending_request = pin!(sender.request(()));
            assert!(exec.run_until_stalled(&mut pending_request).is_pending());
        }

        let mut next_request = pin!(receiver.next());
        let ((), responder) = unwrap_ready!(exec.run_until_stalled(&mut next_request)).unwrap();

        drop(sender);

        assert!(responder.respond(()).is_err());
    }

    /// Requests are rejected after the receiver has been closed.
    #[fasync::run_until_stalled(test)]
    async fn cannot_request_after_receiver_closed() {
        let (mut sender, mut receiver) = channel::<(), ()>(0);
        receiver.close();
        assert!(sender.request(()).await.is_err());
    }

    /// NOTE(review): the `_` pattern drops the sender immediately, so this
    /// exercises an empty channel whose sender is already gone.
    #[test]
    fn try_receive_returns_none_when_channel_is_empty() {
        let (_, mut receiver) = channel::<(), ()>(0);
        assert!(receiver.try_receive().unwrap().is_none());
    }

    /// Calling `try_receive` again after it returned `None` still returns
    /// `None`.
    #[test]
    fn try_receive_returns_none_after_none_result() {
        let (_, mut receiver) = channel::<(), ()>(0);
        assert!(receiver.try_receive().unwrap().is_none());
        assert!(receiver.try_receive().unwrap().is_none());
    }

    /// A request that has been sent is visible to `try_receive`.
    #[test]
    fn try_receive_returns_value_when_channel_has_value() {
        let mut exec = fasync::TestExecutor::new();
        let (mut sender, mut receiver) = channel::<(), ()>(0);

        // Polling the request once is enough to place it on the channel.
        let mut pending_request = pin!(sender.request(()));
        assert!(exec.run_until_stalled(&mut pending_request).is_pending());

        assert!(receiver.try_receive().unwrap().is_some());
    }
}