1use core::future::Future;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use futures::stream::{FusedStream, FuturesUnordered, Stream};
10use pin_project::pin_project;
11
#[pin_project]
/// A set of futures of type `F` whose outputs are yielded as a [`Stream`].
///
/// Zero or one future is kept in a single inline slot. Once a second future is
/// added (through [`OneOrMany::push`] or by collecting an iterator with two or
/// more items), the futures are held in a [`FuturesUnordered`] instead.
///
/// The type also implements [`FusedStream`]: it reports termination after
/// `poll_next` has returned `Poll::Ready(None)`, and pushing a new future
/// makes it non-terminated again.
pub struct OneOrMany<F>(#[pin] Impl<F>);
18
#[pin_project(project=OneInnerProj)]
/// State of the single-future slot used while at most one future is held.
enum OneInner<F> {
    /// A future is held and has not completed yet.
    Present(#[pin] F),
    /// No future is held, and `poll_next` has not yet reported `None` for
    /// this empty slot.
    Absent,
    /// No future is held, and `poll_next` has already returned
    /// `Poll::Ready(None)`; `is_terminated` reports `true` in this state.
    AbsentNoneYielded,
}
27
28impl<F> OneInner<F> {
29 fn take(&mut self) -> Option<F> {
30 let v = std::mem::replace(self, Self::Absent);
31 match v {
32 Self::Present(f) => Some(f),
33 Self::Absent => None,
34 Self::AbsentNoneYielded => {
35 *self = Self::AbsentNoneYielded;
37 None
38 }
39 }
40 }
41}
42
#[pin_project(project=OneOrManyProj)]
/// Internal representation of [`OneOrMany`].
enum Impl<F> {
    /// At most one future, stored inline together with the bookkeeping about
    /// whether `None` has been yielded.
    One(#[pin] OneInner<F>),
    /// Futures stored in a `FuturesUnordered`; entered when a second future is
    /// added while one is already held.
    Many(#[pin] FuturesUnordered<F>),
}
48
49impl<F> Default for OneOrMany<F> {
50 fn default() -> Self {
51 Self(Impl::One(OneInner::Absent))
52 }
53}
54
55impl<F> OneOrMany<F> {
56 pub fn new(f: F) -> Self {
63 Self(Impl::One(OneInner::Present(f)))
64 }
65
66 pub fn push(&mut self, f: F) {
73 let Self(this) = self;
74 match this {
75 Impl::One(o) => match o.take() {
76 None => *o = OneInner::Present(f),
77 Some(first) => *this = Impl::Many([first, f].into_iter().collect()),
78 },
79 Impl::Many(unordered) => {
80 if unordered.is_empty() {
81 *this = Impl::One(OneInner::Present(f))
89 } else {
90 unordered.push(f);
91 }
92 }
93 }
94 }
95
96 pub fn is_empty(&self) -> bool {
98 let Self(this) = self;
99 match this {
100 Impl::One(OneInner::Absent | OneInner::AbsentNoneYielded) => true,
101 Impl::One(OneInner::Present(_)) => false,
102 Impl::Many(many) => many.is_empty(),
103 }
104 }
105}
106
107impl<F: Future> Stream for OneOrMany<F> {
108 type Item = F::Output;
109
110 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111 let this = self.project();
112 match this.0.project() {
113 OneOrManyProj::One(mut p) => match p.as_mut().project() {
114 OneInnerProj::Absent | OneInnerProj::AbsentNoneYielded => {
115 p.set(OneInner::AbsentNoneYielded);
116 Poll::Ready(None)
117 }
118 OneInnerProj::Present(f) => match f.poll(cx) {
119 Poll::Ready(t) => {
120 let output = Poll::Ready(Some(t));
121 p.set(OneInner::Absent);
122 output
123 }
124 Poll::Pending => Poll::Pending,
125 },
126 },
127 OneOrManyProj::Many(unordered) => {
128 unordered.poll_next(cx)
139 }
140 }
141 }
142}
143
144impl<F: Future> FusedStream for OneOrMany<F> {
145 fn is_terminated(&self) -> bool {
146 let Self(this) = self;
147 match this {
148 Impl::One(OneInner::Present(_) | OneInner::Absent) => false,
149 Impl::One(OneInner::AbsentNoneYielded) => true,
150 Impl::Many(unordered) => unordered.is_terminated(),
151 }
152 }
153}
154
155impl<F> FromIterator<F> for OneOrMany<F> {
156 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
157 let mut iter = iter.into_iter();
158
159 Self(match iter.next() {
160 None => Impl::One(OneInner::Absent),
161 Some(first) => match iter.next() {
162 None => Impl::One(OneInner::Present(first)),
163 Some(second) => Impl::Many([first, second].into_iter().chain(iter).collect()),
164 },
165 })
166 }
167}
168
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use crate::event::Event;
    use assert_matches::assert_matches;
    use futures::future::Ready;
    use futures::{pin_mut, StreamExt as _};
    use futures_test::task::{new_count_waker, noop_waker};

    use super::*;

    /// A single pending future wakes the task once when its event is signaled,
    /// and its output is yielded on the next poll.
    #[test]
    fn one_or_many_one() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let event = Event::new();
        let one_or_many = OneOrMany::new(event.wait());
        pin_mut!(one_or_many);

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
        assert!(event.signal());
        assert_eq!(count, 1);

        assert_eq!(one_or_many.poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(count, 1);
    }

    /// After the only future completes, the stream yields `None`, becomes
    /// terminated and reports itself as empty.
    #[test]
    fn one_or_many_one_poll_exhausted() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let one_or_many = OneOrMany::new(futures::future::ready(()));
        pin_mut!(one_or_many);
        assert!(!one_or_many.is_terminated());
        assert!(!one_or_many.is_empty());

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert_eq!(count, 0);
        assert!(one_or_many.is_terminated());
        assert!(one_or_many.is_empty());
    }

    /// Pushing a second future before polling yields both outputs and then
    /// `None`.
    #[test]
    fn one_or_many_push_one() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let mut one_or_many = OneOrMany::new(futures::future::ready(()));
        one_or_many.push(futures::future::ready(()));
        pin_mut!(one_or_many);
        assert!(!one_or_many.is_terminated());
        assert!(!one_or_many.is_empty());

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());
        assert_eq!(count, 0);
        assert!(one_or_many.is_empty());
    }

    /// A future pushed after the first one was polled does not lose the
    /// first future's wakeup: signaling the first event still wakes the task.
    #[test]
    fn one_or_many_push_one_after_poll() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let event = Event::new();
        let one_or_many = OneOrMany::new(event.wait());
        pin_mut!(one_or_many);
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
        assert!(!one_or_many.is_empty());

        let other_event = Event::new();
        one_or_many.push(other_event.wait());

        assert_eq!(count, 0);
        assert!(event.signal());
        assert_eq!(count, 1);

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
        assert!(!one_or_many.is_empty());
    }

    /// Pushing a second future after the first became ready, but before it
    /// was polled again, still yields the first future's output.
    #[test]
    fn one_or_many_push_one_after_ready_before_poll() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let event = Event::new();
        let one_or_many = OneOrMany::new(event.wait());
        pin_mut!(one_or_many);
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);

        assert_eq!(count, 0);
        assert!(event.signal());

        let other_event = Event::new();
        one_or_many.push(other_event.wait());
        assert_eq!(count, 1);

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert_eq!(one_or_many.poll_next(&mut context), Poll::Pending);
    }

    /// Pushing onto a terminated single-future stream revives it; it yields
    /// the new output and then terminates again.
    #[test]
    fn one_or_many_one_exhausted_push() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let one_or_many = OneOrMany::new(futures::future::ready(1));
        pin_mut!(one_or_many);
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(1)));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());

        one_or_many.push(futures::future::ready(2));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(2)));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());
        assert_eq!(count, 0);
    }

    /// Pushing onto a terminated stream that held several futures revives it
    /// in the same way.
    #[test]
    fn one_or_many_many_exhausted_push() {
        let (waker, count) = new_count_waker();
        let mut context = Context::from_waker(&waker);

        let one_or_many: OneOrMany<_> = [1, 2].into_iter().map(futures::future::ready).collect();
        pin_mut!(one_or_many);

        // The two outputs may arrive in either order, so sort before comparing.
        let mut values = [(); 2].map(|()| {
            let poll = one_or_many.as_mut().poll_next(&mut context);
            assert_matches!(poll, Poll::Ready(Some(i)) => i)
        });
        values.sort();
        assert_eq!(values, [1, 2]);
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());

        one_or_many.push(futures::future::ready(3));
        assert!(!one_or_many.is_terminated());

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(3)));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());
        assert_eq!(count, 0);
    }

    /// Collecting an empty iterator gives an empty stream that yields `None`.
    #[test]
    fn one_or_many_collect_none() {
        let waker = noop_waker();
        let mut context = Context::from_waker(&waker);

        let one_or_many: OneOrMany<Ready<()>> = std::iter::empty().collect();
        pin_mut!(one_or_many);

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_empty());
    }

    /// Collecting a single future yields its output and then `None`.
    #[test]
    fn one_or_many_collect_one() {
        let waker = noop_waker();
        let mut context = Context::from_waker(&waker);

        let one_or_many: OneOrMany<_> = std::iter::once(futures::future::ready(1)).collect();
        pin_mut!(one_or_many);

        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(1)));
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_empty());
    }

    /// Collecting several futures yields every output exactly once; the order
    /// is not checked.
    #[test]
    fn one_or_many_collect_multiple() {
        let waker = noop_waker();
        let mut context = Context::from_waker(&waker);

        let one_or_many: OneOrMany<_> = (1..=5).map(futures::future::ready).collect();

        let fut = one_or_many.collect();
        pin_mut!(fut);
        let all: HashSet<_> = assert_matches!(fut.poll(&mut context), Poll::Ready(x) => x);
        assert_eq!(all, HashSet::from_iter(1..=5));
    }

    /// `is_terminated` becomes `true` only after `None` has been yielded and
    /// is reset by `push`, both with a single future and with several.
    #[test]
    fn fused_stream() {
        // None of the futures below is ever pending; the waker panics if it
        // is used.
        let waker = futures_test::task::panic_waker();
        let mut context = Context::from_waker(&waker);

        let one_or_many = OneOrMany::<_>::default();
        pin_mut!(one_or_many);
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());

        one_or_many.as_mut().push(futures::future::ready(()));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());

        // Same checks with two futures pushed.
        one_or_many.as_mut().push(futures::future::ready(()));
        assert!(!one_or_many.is_terminated());
        one_or_many.as_mut().push(futures::future::ready(()));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
        assert!(!one_or_many.is_terminated());
        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
        assert!(one_or_many.is_terminated());
    }
}