async_utils/stream/
one_or_many.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use core::future::Future;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8
9use futures::stream::{FusedStream, FuturesUnordered, Stream};
10use pin_project::pin_project;
11
12/// A collection of multiple futures that optimizes for the single-future case.
13///
14/// Instances of `OneOrMany` can be created with `Default`, [`OneOrMany::new`],
15/// or as the result of `.collect()`ing from an iterator.
16#[pin_project]
17pub struct OneOrMany<F>(#[pin] Impl<F>);
18
19/// Maintains internal state for the [`Impl::One`] case, keeping track of when
20/// `None` is already yielded to provide a correct `FusedFuture` implementation.
21#[pin_project(project=OneInnerProj)]
22enum OneInner<F> {
23    Present(#[pin] F),
24    Absent,
25    AbsentNoneYielded,
26}
27
28impl<F> OneInner<F> {
29    fn take(&mut self) -> Option<F> {
30        let v = std::mem::replace(self, Self::Absent);
31        match v {
32            Self::Present(f) => Some(f),
33            Self::Absent => None,
34            Self::AbsentNoneYielded => {
35                // Restore back the NoneYieldedState.
36                *self = Self::AbsentNoneYielded;
37                None
38            }
39        }
40    }
41}
42
43#[pin_project(project=OneOrManyProj)]
44enum Impl<F> {
45    One(#[pin] OneInner<F>),
46    Many(#[pin] FuturesUnordered<F>),
47}
48
49impl<F> Default for OneOrMany<F> {
50    fn default() -> Self {
51        Self(Impl::One(OneInner::Absent))
52    }
53}
54
55impl<F> OneOrMany<F> {
56    /// Constructs a `OneOrMany` with a single future.
57    ///
58    /// Constructs a new `OneOrMany` with exactly one future. If no additional
59    /// futures are added via [`push`](OneOrMany::push), this is behaviorally
60    /// identical to constructing a stream by providing the future to
61    /// [`futures::stream::once`].
62    pub fn new(f: F) -> Self {
63        Self(Impl::One(OneInner::Present(f)))
64    }
65
66    /// Appends a new future to the set of pending futures.
67    ///
68    /// Like [`FuturesUnordered::push`], this doesn't call
69    /// [`poll`](Future::poll) on the provided future. The caller must ensure
70    /// that [`poll_next`](Stream::poll_next) is called in order to receive
71    /// wake-up notifications for the provided future.
72    pub fn push(&mut self, f: F) {
73        let Self(this) = self;
74        match this {
75            Impl::One(o) => match o.take() {
76                None => *o = OneInner::Present(f),
77                Some(first) => *this = Impl::Many([first, f].into_iter().collect()),
78            },
79            Impl::Many(unordered) => {
80                if unordered.is_empty() {
81                    // Opportunistically switch back to `One`, but only if there
82                    // are no more futures. This is expensive in the short term
83                    // but more performant on average assuming most of the time
84                    // this `OneOrMany` is holding zero or one futures.
85                    // Otherwise the cost of allocating and deallocating a
86                    // `FuturesUnordered` would outweigh the gains of less
87                    // indirection.
88                    *this = Impl::One(OneInner::Present(f))
89                } else {
90                    unordered.push(f);
91                }
92            }
93        }
94    }
95
96    /// Returns true if and only if there are no futures held.
97    pub fn is_empty(&self) -> bool {
98        let Self(this) = self;
99        match this {
100            Impl::One(OneInner::Absent | OneInner::AbsentNoneYielded) => true,
101            Impl::One(OneInner::Present(_)) => false,
102            Impl::Many(many) => many.is_empty(),
103        }
104    }
105}
106
107impl<F: Future> Stream for OneOrMany<F> {
108    type Item = F::Output;
109
110    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
111        let this = self.project();
112        match this.0.project() {
113            OneOrManyProj::One(mut p) => match p.as_mut().project() {
114                OneInnerProj::Absent | OneInnerProj::AbsentNoneYielded => {
115                    p.set(OneInner::AbsentNoneYielded);
116                    Poll::Ready(None)
117                }
118                OneInnerProj::Present(f) => match f.poll(cx) {
119                    Poll::Ready(t) => {
120                        let output = Poll::Ready(Some(t));
121                        p.set(OneInner::Absent);
122                        output
123                    }
124                    Poll::Pending => Poll::Pending,
125                },
126            },
127            OneOrManyProj::Many(unordered) => {
128                // Instead of returning the value directly, we could check
129                // whether `unordered` contains a single element and, if so,
130                // return `this` to the `Impl::One` case. We avoid doing that
131                // because it's unfriendly to an expected common pattern, where
132                // the OneOrMany holds two futures and a new one is added every
133                // time one of them completes (e.g. if the futures are
134                // constructed from streams). Instead we implement a little bit
135                // of hysteresis here and in the `Impl::Many` case in `push`. by
136                // requiring `unordered` to be completely empty before reverting
137                // to `Impl::One`.
138                unordered.poll_next(cx)
139            }
140        }
141    }
142}
143
144impl<F: Future> FusedStream for OneOrMany<F> {
145    fn is_terminated(&self) -> bool {
146        let Self(this) = self;
147        match this {
148            Impl::One(OneInner::Present(_) | OneInner::Absent) => false,
149            Impl::One(OneInner::AbsentNoneYielded) => true,
150            Impl::Many(unordered) => unordered.is_terminated(),
151        }
152    }
153}
154
155impl<F> FromIterator<F> for OneOrMany<F> {
156    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
157        let mut iter = iter.into_iter();
158
159        Self(match iter.next() {
160            None => Impl::One(OneInner::Absent),
161            Some(first) => match iter.next() {
162                None => Impl::One(OneInner::Present(first)),
163                Some(second) => Impl::Many([first, second].into_iter().chain(iter).collect()),
164            },
165        })
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use std::collections::HashSet;
172
173    use crate::event::Event;
174    use assert_matches::assert_matches;
175    use futures::future::Ready;
176    use futures::{pin_mut, StreamExt as _};
177    use futures_test::task::{new_count_waker, noop_waker};
178
179    use super::*;
180
181    #[test]
182    fn one_or_many_one() {
183        let (waker, count) = new_count_waker();
184        let mut context = Context::from_waker(&waker);
185
186        let event = Event::new();
187        let one_or_many = OneOrMany::new(event.wait());
188        pin_mut!(one_or_many);
189
190        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
191        assert_eq!(event.signal(), true);
192        assert_eq!(count, 1);
193
194        assert_eq!(one_or_many.poll_next(&mut context), Poll::Ready(Some(())));
195        assert_eq!(count, 1);
196    }
197
198    #[test]
199    fn one_or_many_one_poll_exhausted() {
200        let (waker, count) = new_count_waker();
201        let mut context = Context::from_waker(&waker);
202
203        let one_or_many = OneOrMany::new(futures::future::ready(()));
204        pin_mut!(one_or_many);
205        assert_eq!(one_or_many.is_terminated(), false);
206        assert_eq!(one_or_many.is_empty(), false);
207
208        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
209        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
210        assert_eq!(count, 0);
211        assert_eq!(one_or_many.is_terminated(), true);
212        assert_eq!(one_or_many.is_empty(), true);
213    }
214
215    #[test]
216    fn one_or_many_push_one() {
217        let (waker, count) = new_count_waker();
218        let mut context = Context::from_waker(&waker);
219
220        let mut one_or_many = OneOrMany::new(futures::future::ready(()));
221        one_or_many.push(futures::future::ready(()));
222        pin_mut!(one_or_many);
223        assert_eq!(one_or_many.is_terminated(), false);
224        assert_eq!(one_or_many.is_empty(), false);
225
226        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
227        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
228        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
229        assert_eq!(one_or_many.is_terminated(), true);
230        assert_eq!(count, 0);
231        assert_eq!(one_or_many.is_empty(), true);
232    }
233
234    #[test]
235    fn one_or_many_push_one_after_poll() {
236        let (waker, count) = new_count_waker();
237        let mut context = Context::from_waker(&waker);
238
239        let event = Event::new();
240        let one_or_many = OneOrMany::new(event.wait());
241        pin_mut!(one_or_many);
242        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
243        assert_eq!(one_or_many.is_empty(), false);
244
245        let other_event = Event::new();
246        one_or_many.push(other_event.wait());
247
248        assert_eq!(count, 0);
249        assert_eq!(event.signal(), true);
250        assert_eq!(count, 1);
251
252        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
253        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
254        assert_eq!(one_or_many.is_empty(), false);
255    }
256
257    #[test]
258    fn one_or_many_push_one_after_ready_before_poll() {
259        let (waker, count) = new_count_waker();
260        let mut context = Context::from_waker(&waker);
261
262        let event = Event::new();
263        let one_or_many = OneOrMany::new(event.wait());
264        pin_mut!(one_or_many);
265        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Pending);
266
267        assert_eq!(count, 0);
268        assert_eq!(event.signal(), true);
269
270        let other_event = Event::new();
271        one_or_many.push(other_event.wait());
272        assert_eq!(count, 1);
273
274        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
275        assert_eq!(one_or_many.poll_next(&mut context), Poll::Pending);
276    }
277
278    #[test]
279    fn one_or_many_one_exhausted_push() {
280        let (waker, count) = new_count_waker();
281        let mut context = Context::from_waker(&waker);
282
283        let one_or_many = OneOrMany::new(futures::future::ready(1));
284        pin_mut!(one_or_many);
285        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(1)));
286        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
287        assert_eq!(one_or_many.is_terminated(), true);
288
289        one_or_many.push(futures::future::ready(2));
290        assert_eq!(one_or_many.is_terminated(), false);
291        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(2)));
292        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
293        assert_eq!(one_or_many.is_terminated(), true);
294        assert_eq!(count, 0);
295    }
296
297    #[test]
298    fn one_or_many_many_exhausted_push() {
299        let (waker, count) = new_count_waker();
300        let mut context = Context::from_waker(&waker);
301
302        let one_or_many: OneOrMany<_> = [1, 2].into_iter().map(futures::future::ready).collect();
303        pin_mut!(one_or_many);
304
305        let mut values = [(); 2].map(|()| {
306            let poll = one_or_many.as_mut().poll_next(&mut context);
307            assert_matches!(poll, Poll::Ready(Some(i)) => i)
308        });
309        values.sort();
310        assert_eq!(values, [1, 2]);
311        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
312        assert_eq!(one_or_many.is_terminated(), true);
313
314        one_or_many.push(futures::future::ready(3));
315        assert_eq!(one_or_many.is_terminated(), false);
316
317        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(3)));
318        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
319        assert_eq!(one_or_many.is_terminated(), true);
320        assert_eq!(count, 0)
321    }
322
323    #[test]
324    fn one_or_many_collect_none() {
325        let waker = noop_waker();
326        let mut context = Context::from_waker(&waker);
327
328        let one_or_many: OneOrMany<Ready<()>> = std::iter::empty().collect();
329        pin_mut!(one_or_many);
330
331        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
332        assert_eq!(one_or_many.is_empty(), true);
333    }
334
335    #[test]
336    fn one_or_many_collect_one() {
337        let waker = noop_waker();
338        let mut context = Context::from_waker(&waker);
339
340        let one_or_many: OneOrMany<_> = std::iter::once(futures::future::ready(1)).collect();
341        pin_mut!(one_or_many);
342
343        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(1)));
344        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
345        assert_eq!(one_or_many.is_empty(), true);
346    }
347
348    #[test]
349    fn one_or_many_collect_multiple() {
350        let waker = noop_waker();
351        let mut context = Context::from_waker(&waker);
352
353        let one_or_many: OneOrMany<_> =
354            (1..=5).into_iter().map(|i| futures::future::ready(i)).collect();
355
356        let fut = one_or_many.collect();
357        pin_mut!(fut);
358        let all: HashSet<_> = assert_matches!(fut.poll(&mut context), Poll::Ready(x) => x);
359        assert_eq!(all, HashSet::from_iter(1..=5));
360    }
361
362    #[test]
363    fn fused_stream() {
364        let waker = futures_test::task::panic_waker();
365        let mut context = Context::from_waker(&waker);
366
367        let one_or_many = OneOrMany::<_>::default();
368        pin_mut!(one_or_many);
369        assert!(!one_or_many.is_terminated());
370        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
371        assert!(one_or_many.is_terminated());
372
373        one_or_many.as_mut().push(futures::future::ready(()));
374        assert!(!one_or_many.is_terminated());
375        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
376        assert!(!one_or_many.is_terminated());
377        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
378        assert!(one_or_many.is_terminated());
379
380        // Do it again with two futures to test the FuturesUnordered passthrough
381        // case.
382        one_or_many.as_mut().push(futures::future::ready(()));
383        assert!(!one_or_many.is_terminated());
384        one_or_many.as_mut().push(futures::future::ready(()));
385        assert!(!one_or_many.is_terminated());
386        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
387        assert!(!one_or_many.is_terminated());
388        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(Some(())));
389        assert!(!one_or_many.is_terminated());
390        assert_eq!(one_or_many.as_mut().poll_next(&mut context), Poll::Ready(None));
391        assert!(one_or_many.is_terminated());
392    }
393}