1use core::pin::Pin;
8use core::task::{Context, Poll};
9use futures::channel::mpsc;
10use futures::{ready, Future};
11
12pub trait TrySend<Item> {
14 fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item>;
25}
26
27impl<Item> TrySend<Item> for mpsc::Sender<Item> {
28 fn try_send_fut(&mut self, item: Item) -> TrySendFut<'_, Item> {
29 TrySendFut::new(item, self)
30 }
31}
32
33#[must_use]
36pub struct TrySendFut<'a, Item> {
37 item: Option<Item>,
39 channel: &'a mut mpsc::Sender<Item>,
40}
41
42impl<'a, Item> TrySendFut<'a, Item> {
45 fn new(item: Item, channel: &'a mut mpsc::Sender<Item>) -> Self {
47 Self { item: Some(item), channel }
48 }
49}
50
51impl<'a, Item> Unpin for TrySendFut<'a, Item> {}
52
53impl<'a, Item> Future for TrySendFut<'a, Item> {
54 type Output = Result<(), Item>;
55 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
56 loop {
57 let ready = ready!(self.channel.poll_ready(cx));
58 let item = self.item.take().expect("Cannot poll without `Some` item");
61 match ready {
62 Err(e) => {
63 assert!(e.is_disconnected(), "{}", e);
65 return Poll::Ready(Err(item));
66 }
67 Ok(()) => {}
68 }
69 match self.channel.try_send(item) {
70 Ok(()) => return Poll::Ready(Ok(())),
71 Err(e) => {
72 if e.is_disconnected() {
73 return Poll::Ready(Err(e.into_inner()));
74 } else {
75 assert!(e.is_full(), "{}", e);
78 self.item = Some(e.into_inner());
79 continue;
80 }
81 }
82 }
83 }
84 }
85}
86
87#[cfg(test)]
88mod tests {
89 use super::*;
90 use fuchsia_async as fasync;
91 use futures::future::join;
92 use futures::StreamExt;
93
94 #[fasync::run_until_stalled(test)]
95 async fn item_future_completes_on_receive() {
96 let (mut sender, mut receiver) = mpsc::channel(0);
99 let (send_result, receive_result) =
100 join(sender.try_send_fut(vec![1]), receiver.next()).await;
101 assert_eq!(send_result, Ok(()));
102 assert_eq!(receive_result, Some(vec![1]));
103 }
104
105 #[fasync::run_until_stalled(test)]
106 async fn item_future_errors_on_receiver_closed() {
107 let (mut sender, receiver) = mpsc::channel(0);
108 drop(receiver);
110 let send_result = sender.try_send_fut(vec![1]).await;
111 assert_eq!(send_result, Err(vec![1]));
112 }
113
114 #[test]
115 fn item_future_pending_on_buffer_full() {
116 let mut exec = fasync::TestExecutor::new();
117 let (mut sender, mut receiver) = mpsc::channel(0);
118
119 let send_result = exec.run_singlethreaded(sender.try_send_fut(vec![1]));
120 assert_eq!(send_result, Ok(()));
121
122 let mut send_fut = sender.try_send_fut(vec![2]);
124 let send_poll_result = exec.run_until_stalled(&mut send_fut);
125 assert_eq!(send_poll_result, Poll::Pending);
126
127 let receive_poll_result = exec.run_until_stalled(&mut receiver.next());
129 assert_eq!(receive_poll_result, Poll::Ready(Some(vec![1])));
130
131 let send_poll_result = exec.run_until_stalled(&mut send_fut);
133 assert_eq!(send_poll_result, Poll::Ready(Ok(())));
134
135 let receive_poll_result = exec.run_until_stalled(&mut receiver.next());
137 assert_eq!(receive_poll_result, Poll::Ready(Some(vec![2])));
138 }
139}