async_utils/stream/
future_map.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::stream::{FusedStream, Stream};
6use futures::Future;
7use std::collections::HashMap;
8use std::hash::Hash;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
/// A keyed collection of futures that is polled as a single stream.
///
/// Every future is stored under a key of type `K`, so an individual future can be looked up or
/// removed by key. Polling the map polls its member futures and yields the output of one that
/// is ready; that future is then removed from the map. The item type is `Fut::Output`.
///
/// `Fut` may be `?Unpin`, because each future is kept behind a `Pin<Box<_>>`. `K` must be
/// `Unpin`; a key type that is `!Unpin` is unlikely to ever be needed.
pub struct FutureMap<K, Fut> {
    /// The stored futures, each pinned on the heap and addressed by its key.
    inner: HashMap<K, Pin<Box<Fut>>>,
    /// Set when the map is polled while empty; cleared again by `insert`.
    is_terminated: bool,
}
21
22impl<K, Fut> Default for FutureMap<K, Fut> {
23    fn default() -> Self {
24        Self { inner: Default::default(), is_terminated: false }
25    }
26}
27
28impl<K: Unpin, Fut> Unpin for FutureMap<K, Fut> {}
29
30impl<K: Eq + Hash + Unpin, Fut: Future> FutureMap<K, Fut> {
31    /// Returns an empty `FutureMap`.
32    pub fn new() -> Self {
33        Self::default()
34    }
35
36    /// Insert a future identified by `key` to the map.
37    ///
38    /// This method will not call `poll` on the submitted stream. The caller must ensure
39    /// that `poll_next` is called in order to receive wake-up notifications for the given
40    /// stream.
41    pub fn insert(&mut self, key: K, future: Fut) -> Option<Pin<Box<Fut>>> {
42        let Self { inner, is_terminated } = self;
43        *is_terminated = false;
44        inner.insert(key, Box::new(future).into())
45    }
46
47    /// Returns `true` if the `FutureMap` contains `key`.
48    pub fn contains_key(&self, key: &K) -> bool {
49        self.inner.contains_key(key)
50    }
51
52    /// Remove the future identified by `key`, returning it if it exists.
53    pub fn remove(&mut self, key: &K) -> Option<Pin<Box<Fut>>> {
54        self.inner.remove(key)
55    }
56
57    /// Provide mutable access to the inner hashmap.
58    /// This is safe as if the future were being polled, we would not be able to access a mutable
59    /// reference to self to pass to this method.
60    pub fn inner(&mut self) -> &mut HashMap<K, Pin<Box<Fut>>> {
61        &mut self.inner
62    }
63}
64
65impl<K: Clone + Eq + Hash + Unpin, Fut: Future> Stream for FutureMap<K, Fut> {
66    type Item = Fut::Output;
67
68    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
69        // We can pull the inner value out as FutureMap is `Unpin`
70        let Self { inner, is_terminated } = Pin::into_inner(self);
71
72        if inner.is_empty() {
73            *is_terminated = true;
74            Poll::Ready(None)
75        } else {
76            match inner.iter_mut().find_map(|(key, future)| match Pin::new(future).poll(cx) {
77                Poll::Ready(req) => Some((key.clone(), req)),
78                Poll::Pending => None,
79            }) {
80                Some((key, req)) => {
81                    assert!(inner.remove(&key).is_some());
82                    Poll::Ready(Some(req))
83                }
84                None => Poll::Pending,
85            }
86        }
87    }
88}
89
90impl<K: Clone + Eq + Hash + Unpin, Fut: Future> FusedStream for FutureMap<K, Fut> {
91    fn is_terminated(&self) -> bool {
92        let Self { inner: _, is_terminated } = self;
93        *is_terminated
94    }
95}
96
#[cfg(test)]
mod test {
    //! We validate the behavior of the FutureMap stream by enumerating all possible external
    //! events, and then generating permutations of valid sequences of those events. These model
    //! the possible execution sequences the stream could go through in program execution. We
    //! then assert that:
    //!   a) At all points during execution, all invariants are held
    //!   b) The final result is as expected
    //!
    //! In this case, the invariants are:
    //!   * The map yields `None` only when it is empty
    //!   * If all futures are pending, the map is pending
    //!   * otherwise the map is ready, and the future that was yielded is no longer in the map
    //!
    //! The result is:
    //!   * All test messages have been injected
    //!   * All test messages have been yielded
    //!   * All test futures have terminated
    //!   * No event is yielded with a given key after the future for that key has terminated
    //!
    //! Together these show:
    //!   * Progress is always eventually made - the Stream cannot be stalled
    //!   * All inserted elements will eventually be yielded
    //!   * Elements are never duplicated
    use super::*;
    use crate::stream::WithTag;
    use futures::channel::oneshot;
    use futures::StreamExt;
    use proptest::prelude::*;
    use std::collections::HashSet;
    use std::fmt::Debug;

    /// Possible actions to take in evaluating the stream
    enum Event<K> {
        /// Insert a new future
        InsertFuture(K, oneshot::Receiver<Result<u64, ()>>),
        /// Send a value, completing a future.
        CompleteFuture(K, oneshot::Sender<Result<u64, ()>>),
        /// Schedule the executor. The executor will only run the task if awoken, otherwise it will
        /// do nothing
        Execute,
    }

    // Hand-written because the channel endpoints carried by the events are not worth printing;
    // only the variant name and key are shown.
    impl<K: Debug> Debug for Event<K> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            match self {
                Event::InsertFuture(k, _) => write!(f, "InsertFuture({:?})", k),
                Event::CompleteFuture(k, _) => write!(f, "CompleteFuture({:?})", k),
                Event::Execute => write!(f, "Execute"),
            }
        }
    }

    /// Build the pair of events for one future: inserting the receiving half of a fresh oneshot
    /// channel under `key`, and completing it through the sending half.
    fn stream_events<K: Clone + Eq + Hash>(key: K) -> Vec<Event<K>> {
        let (sender, receiver) = oneshot::channel::<Result<u64, ()>>();
        vec![Event::InsertFuture(key.clone(), receiver), Event::CompleteFuture(key, sender)]
    }

    /// Count the `CompleteFuture` events in `events`, i.e. how many values the stream is
    /// expected to yield. Panics if two completions share a key.
    fn expected_yield<K: Eq + Hash>(events: &[Event<K>]) -> usize {
        let mut completed = HashSet::new();
        for event in events {
            if let Event::CompleteFuture(k, _) = event {
                assert!(completed.insert(k), "There should be no more than one future per key");
            }
        }
        completed.len()
    }

    /// Strategy that produces random permutations of a set of events, corresponding to inserting
    /// and completing up to n different futures in random order, also interspersed with
    /// running the executor
    fn execution_sequences(n: u64) -> impl Strategy<Value = Vec<Event<u64>>> {
        fn generate_events(n: u64) -> Vec<Event<u64>> {
            let mut events = (0..n).flat_map(stream_events).collect::<Vec<_>>();
            events.extend(std::iter::repeat_with(|| Event::Execute).take((n * 3) as usize));
            events
        }

        // We want to produce random permutations of these events
        (0..n).prop_map(generate_events).prop_shuffle()
    }

    proptest! {
        #[test]
        fn test_invariants(mut execution in execution_sequences(4)) {
            let expected = expected_yield(&execution);

            // Add enough execution events to ensure we will complete, no matter the order
            execution.extend(std::iter::repeat_with(|| Event::Execute).take(expected * 3));

            let (waker, count) = futures_test::task::new_count_waker();
            let mut futures = FutureMap::new();
            let expected = expected as u64;
            let mut next_wake = 0;
            let mut yielded = 0;
            let mut inserted = 0;
            let mut events = vec![];
            for event in execution {
                match event {
                    Event::InsertFuture(key, future) => {
                        assert_matches::assert_matches!(futures.insert(key, future.tagged(key)), None);
                        // FutureMap does *not* wake on inserting new futures, matching the
                        // behavior of streams::SelectAll. The client *must* arrange for it to be
                        // polled again after a future is inserted; we model that here by forcing a
                        // wake up
                        next_wake = count.get();
                    }
                    Event::CompleteFuture(_, sender) => {
                        prop_assert_eq!(sender.send(Ok(1)), Ok(()));
                        inserted += 1;
                    }
                    Event::Execute if count.get() >= next_wake => {
                        match Pin::new(&mut futures.next()).poll(&mut Context::from_waker(&waker)) {
                            Poll::Ready(Some((k, v))) => {
                                events.push((k, v));
                                yielded += 1;
                                // Ensure that we wake up next time;
                                next_wake = count.get();
                                // Invariant: a future that has yielded must no longer be in the
                                // map
                                prop_assert!(!futures.contains_key(&k));
                            }
                            Poll::Ready(None) => {
                                // Invariant: the stream only reports exhaustion when it holds
                                // no futures
                                prop_assert!(futures.inner.is_empty());
                            }
                            Poll::Pending => {
                                next_wake = count.get() + 1;
                            }
                        };
                    }
                    Event::Execute => (),
                }
            }
            prop_assert_eq!(inserted, expected, "All expected requests inserted");
            // `next_wake` and `count` appear on both sides only so that their values are printed
            // if the comparison of `yielded` against `expected` fails.
            prop_assert_eq!((next_wake, count.get(), yielded), (next_wake, count.get(), expected), "All expected requests yielded");
            let all_keys = 0..expected;
            for k in all_keys {
                prop_assert!(!futures.contains_key(&k), "All futures should now have been removed");
            }
        }
    }
}
248}