1use super::{Closed, TryMerge};
6use futures::channel::oneshot;
7use futures::future::{Either, Ready, Shared};
8use futures::prelude::*;
9use pin_project::pin_project;
10use std::collections::VecDeque;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14#[pin_project]
15pub(crate) struct TaskFuture<O> {
16 #[pin]
17 fut: Either<oneshot::Receiver<O>, Ready<Result<O, oneshot::Canceled>>>,
18}
19
20impl<O> Future for TaskFuture<O> {
21 type Output = Result<O, Closed>;
22
23 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
24 match self.project().fut.poll(cx) {
25 Poll::Pending => Poll::Pending,
26 Poll::Ready(res) => Poll::Ready(res.map_err(|oneshot::Canceled| Closed)),
27 }
28 }
29}
30
31pub struct PendingWorkInfo<C, O> {
34 context: C,
35 running: RunningWorkInfo<O>,
36}
37
38pub struct RunningWorkInfo<O> {
41 cb: oneshot::Sender<O>,
42 fut: Shared<TaskFuture<O>>,
43}
44
45pub struct TaskVariants<C, O> {
47 running: Option<RunningWorkInfo<O>>,
48 pending: VecDeque<PendingWorkInfo<C, O>>,
49}
50
51impl<C, O> TaskVariants<C, O>
52where
53 C: TryMerge,
54 O: Clone,
55{
56 pub(crate) fn new(context: C) -> (Self, Shared<TaskFuture<O>>) {
59 let mut res = Self { running: None, pending: VecDeque::new() };
60 let fut = res.push_back(context);
61 (res, fut)
62 }
63
64 pub(crate) fn push(&mut self, mut context: C) -> Shared<TaskFuture<O>> {
68 for info in self.pending.iter_mut() {
70 if let Err(unmerged) = info.context.try_merge(context) {
71 context = unmerged;
72 } else {
73 return info.running.fut.clone();
74 }
75 }
76
77 self.push_back(context)
79 }
80
81 fn push_back(&mut self, context: C) -> Shared<TaskFuture<O>> {
82 let (sender, fut) = make_broadcast_pair();
83
84 self.pending.push_back(PendingWorkInfo {
85 context,
86 running: RunningWorkInfo { cb: sender, fut: fut.clone() },
87 });
88 fut
89 }
90}
91
92impl<C, O> TaskVariants<C, O> {
93 pub(crate) fn running(&self) -> bool {
95 self.running.is_some()
96 }
97
98 pub(crate) fn pending(&self) -> usize {
100 self.pending.len()
101 }
102}
103
104impl<C, O> TaskVariants<C, O> {
105 pub(crate) fn start(&mut self) -> C {
111 self.try_start().expect("context to not yet be claimed")
112 }
113
114 pub(crate) fn done(&mut self, res: O) -> Option<C> {
121 let cb = self.running.take().expect("running item to mark done").cb;
122
123 let _ = cb.send(res);
126
127 self.try_start()
129 }
130
131 fn try_start(&mut self) -> Option<C> {
132 assert!(self.running.is_none());
133 if let Some(PendingWorkInfo { context, running }) = self.pending.pop_front() {
134 self.running = Some(running);
135 Some(context)
136 } else {
137 None
138 }
139 }
140}
141
142pub(crate) fn make_broadcast_pair<O>() -> (oneshot::Sender<O>, Shared<TaskFuture<O>>)
145where
146 O: Clone,
147{
148 let (sender, receiver) = oneshot::channel();
149 let fut = TaskFuture { fut: Either::Left(receiver) }.shared();
150
151 (sender, fut)
152}
153
154pub(crate) fn make_canceled_receiver<O>() -> Shared<TaskFuture<O>>
157where
158 O: Clone,
159{
160 TaskFuture { fut: Either::Right(futures::future::err(oneshot::Canceled)) }.shared()
161}
162
#[cfg(test)]
mod tests {
    use super::super::tests::MergeEqual;
    use super::*;
    use futures::executor::block_on;

    /// Walks a queue through several pushes and completions and checks which futures share a
    /// result and in which order contexts are handed out.
    #[test]
    fn merges() {
        let (mut queue, zero_a) = TaskVariants::<MergeEqual, i32>::new(MergeEqual(0));

        // Pushed before anything has started; the assertions below expect the repeated
        // `MergeEqual(0)` pushes to share the first item's result.
        let zero_b = queue.push(MergeEqual(0));
        let one = queue.push(MergeEqual(1));
        let zero_c = queue.push(MergeEqual(0));
        let two = queue.push(MergeEqual(2));

        assert_eq!(queue.start(), MergeEqual(0));
        // Pushed after the first item started, so it is expected to complete separately.
        let zero_d = queue.push(MergeEqual(0));

        // Finishing the first item resolves every future that was merged into it.
        assert_eq!(queue.done(0), Some(MergeEqual(1)));
        assert_eq!(block_on(zero_a), Ok(0));
        assert_eq!(block_on(zero_b), Ok(0));
        assert_eq!(block_on(zero_c), Ok(0));

        assert_eq!(queue.done(1), Some(MergeEqual(2)));
        assert_eq!(block_on(one), Ok(1));

        // The late `MergeEqual(0)` push comes out of the queue last.
        assert_eq!(queue.done(2), Some(MergeEqual(0)));
        assert_eq!(block_on(two), Ok(2));

        assert_eq!(queue.done(3), None);
        assert_eq!(block_on(zero_d), Ok(3));
    }

    /// Two `()` pushes before the start are expected to complete together: a single `done`
    /// leaves nothing to start and resolves both futures.
    #[test]
    fn task_variants() {
        let (mut queue, first) = TaskVariants::<(), ()>::new(());
        let second = queue.push(());

        let () = queue.start();
        assert_eq!(queue.done(()), None);

        block_on(async move {
            assert_eq!(first.await, Ok(()));
            assert_eq!(second.await, Ok(()));
        });
    }
}