async_utils/stream/future_map.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::stream::{FusedStream, Stream};
6use futures::Future;
7use std::collections::HashMap;
8use std::hash::Hash;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
/// A keyed set of in-flight futures that is driven as a single stream.
///
/// Each future is stored under a key of type `K`, which allows an individual future to be looked
/// up or removed before it completes. Polling the `FutureMap` polls its member futures and yields
/// the `Fut::Output` of one that is ready.
///
/// `Fut` may be `!Unpin`: every future is boxed and pinned when it is stored in the map. `K` must
/// be `Unpin`; it is unlikely that an `!Unpin` type would ever be needed as a key.
pub struct FutureMap<K, Fut> {
    /// Termination flag reported through `FusedStream::is_terminated`.
    is_terminated: bool,
    /// The stored futures, each pinned on the heap and indexed by its key.
    inner: HashMap<K, Pin<Box<Fut>>>,
}
21
22impl<K, Fut> Default for FutureMap<K, Fut> {
23 fn default() -> Self {
24 Self { inner: Default::default(), is_terminated: false }
25 }
26}
27
28impl<K: Unpin, Fut> Unpin for FutureMap<K, Fut> {}
29
30impl<K: Eq + Hash + Unpin, Fut: Future> FutureMap<K, Fut> {
31 /// Returns an empty `FutureMap`.
32 pub fn new() -> Self {
33 Self::default()
34 }
35
36 /// Insert a future identified by `key` to the map.
37 ///
38 /// This method will not call `poll` on the submitted stream. The caller must ensure
39 /// that `poll_next` is called in order to receive wake-up notifications for the given
40 /// stream.
41 pub fn insert(&mut self, key: K, future: Fut) -> Option<Pin<Box<Fut>>> {
42 let Self { inner, is_terminated } = self;
43 *is_terminated = false;
44 inner.insert(key, Box::new(future).into())
45 }
46
47 /// Returns `true` if the `FutureMap` contains `key`.
48 pub fn contains_key(&self, key: &K) -> bool {
49 self.inner.contains_key(key)
50 }
51
52 /// Remove the future identified by `key`, returning it if it exists.
53 pub fn remove(&mut self, key: &K) -> Option<Pin<Box<Fut>>> {
54 self.inner.remove(key)
55 }
56
57 /// Provide mutable access to the inner hashmap.
58 /// This is safe as if the future were being polled, we would not be able to access a mutable
59 /// reference to self to pass to this method.
60 pub fn inner(&mut self) -> &mut HashMap<K, Pin<Box<Fut>>> {
61 &mut self.inner
62 }
63}
64
65impl<K: Clone + Eq + Hash + Unpin, Fut: Future> Stream for FutureMap<K, Fut> {
66 type Item = Fut::Output;
67
68 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
69 // We can pull the inner value out as FutureMap is `Unpin`
70 let Self { inner, is_terminated } = Pin::into_inner(self);
71
72 if inner.is_empty() {
73 *is_terminated = true;
74 Poll::Ready(None)
75 } else {
76 match inner.iter_mut().find_map(|(key, future)| match Pin::new(future).poll(cx) {
77 Poll::Ready(req) => Some((key.clone(), req)),
78 Poll::Pending => None,
79 }) {
80 Some((key, req)) => {
81 assert!(inner.remove(&key).is_some());
82 Poll::Ready(Some(req))
83 }
84 None => Poll::Pending,
85 }
86 }
87 }
88}
89
90impl<K: Clone + Eq + Hash + Unpin, Fut: Future> FusedStream for FutureMap<K, Fut> {
91 fn is_terminated(&self) -> bool {
92 let Self { inner: _, is_terminated } = self;
93 *is_terminated
94 }
95}
96
#[cfg(test)]
mod test {
    //! We validate the behavior of the FutureMap stream by enumerating all possible external
    //! events, and then generating permutations of valid sequences of those events. These model
    //! the possible execution sequences the stream could go through in program execution. We
    //! then assert that:
    //!   a) At all points during execution, all invariants are held
    //!   b) The final result is as expected
    //!
    //! In this case, the invariants are:
    //!   * If the map yields `None`, it is empty
    //!   * A future that has been yielded is no longer in the map
    //!
    //! The result is:
    //!   * All test messages have been injected
    //!   * All test messages have been yielded
    //!   * All test futures have terminated
    //!   * No event is yielded with a given key after the future for that key has terminated
    //!
    //! Together these show:
    //!   * Progress is always eventually made - the Stream cannot be stalled
    //!   * All inserted elements will eventually be yielded
    //!   * Elements are never duplicated
    use super::*;
    use crate::stream::WithTag;
    use futures::channel::oneshot;
    use futures::StreamExt;
    use proptest::prelude::*;
    use std::collections::HashSet;
    use std::fmt::Debug;

    /// Possible actions to take in evaluating the stream
    enum Event<K> {
        /// Insert a new future
        InsertFuture(K, oneshot::Receiver<Result<u64, ()>>),
        /// Send a value, completing a future.
        CompleteFuture(K, oneshot::Sender<Result<u64, ()>>),
        /// Schedule the executor. The executor will only run the task if awoken, otherwise it will
        /// do nothing
        Execute,
    }

    /// Prints only the variant name and key; the channel endpoints carry no useful detail.
    impl<K: Debug> Debug for Event<K> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            match self {
                Event::InsertFuture(k, _) => write!(f, "InsertFuture({:?})", k),
                Event::CompleteFuture(k, _) => write!(f, "CompleteFuture({:?})", k),
                Event::Execute => write!(f, "Execute"),
            }
        }
    }

    /// Creates the pair of events that drive a single future: inserting the receiving half of a
    /// fresh oneshot channel under `key`, and completing it through the sending half.
    fn stream_events<K: Clone + Eq + Hash>(key: K) -> Vec<Event<K>> {
        let (sender, receiver) = oneshot::channel::<Result<u64, ()>>();
        vec![Event::InsertFuture(key.clone(), receiver), Event::CompleteFuture(key, sender)]
    }

    /// Counts the `CompleteFuture` events in `events`, i.e. how many values the map is expected
    /// to yield.
    ///
    /// Panics if two completions share a key, as each key must identify exactly one future.
    fn expected_yield<K: Eq + Hash>(events: &[Event<K>]) -> usize {
        let mut completed = HashSet::new();
        for event in events {
            if let Event::CompleteFuture(k, _) = event {
                assert!(completed.insert(k), "There should be no more than one future per key");
            }
        }
        completed.len()
    }

    /// Strategy that produces random permutations of a set of events, corresponding to inserting
    /// and completing a randomly chosen number of futures (drawn from `0..n`) in random order,
    /// also interspersed with running the executor
    fn execution_sequences(n: u64) -> impl Strategy<Value = Vec<Event<u64>>> {
        fn generate_events(n: u64) -> Vec<Event<u64>> {
            let mut events = (0..n).flat_map(stream_events).collect::<Vec<_>>();
            events.extend(std::iter::repeat_with(|| Event::Execute).take((n * 3) as usize));
            events
        }

        // We want to produce random permutations of these events
        (0..n).prop_map(generate_events).prop_shuffle()
    }

    proptest! {
        #[test]
        fn test_invariants(mut execution in execution_sequences(4)) {
            let expected = expected_yield(&execution);

            // Add enough execution events to ensure we will complete, no matter the order
            execution.extend(std::iter::repeat_with(|| Event::Execute).take(expected * 3));

            let (waker, count) = futures_test::task::new_count_waker();
            let mut futures = FutureMap::new();
            let expected = expected as u64;
            let mut next_wake = 0;
            let mut yielded = 0;
            let mut inserted = 0;
            let mut events = vec![];
            for event in execution {
                match event {
                    Event::InsertFuture(key, future) => {
                        assert_matches::assert_matches!(
                            futures.insert(key, future.tagged(key)),
                            None
                        );
                        // FutureMap does *not* wake on inserting new futures, matching the
                        // behavior of streams::SelectAll. The client *must* arrange for it to be
                        // polled again after a future is inserted; we model that here by forcing a
                        // wake up
                        next_wake = count.get();
                    }
                    Event::CompleteFuture(_, sender) => {
                        prop_assert_eq!(sender.send(Ok(1)), Ok(()));
                        inserted += 1;
                    }
                    // Only poll when the task has been woken (or a wake-up has been forced).
                    Event::Execute if count.get() >= next_wake => {
                        match Pin::new(&mut futures.next()).poll(&mut Context::from_waker(&waker)) {
                            Poll::Ready(Some((k, v))) => {
                                events.push((k, v));
                                yielded += 1;
                                // Ensure that we wake up next time;
                                next_wake = count.get();
                                // Invariant: future(k) must have been removed from the map
                                prop_assert!(!futures.contains_key(&k));
                            }
                            Poll::Ready(None) => {
                                // The stream only reports termination when it holds no futures
                                prop_assert!(futures.inner.is_empty());
                            }
                            Poll::Pending => {
                                next_wake = count.get() + 1;
                            }
                        };
                    }
                    Event::Execute => (),
                }
            }
            prop_assert_eq!(inserted, expected, "All expected requests inserted");
            // `next_wake` and `count` appear on both sides purely so that they are printed if the
            // assertion on `yielded` fails.
            prop_assert_eq!(
                (next_wake, count.get(), yielded),
                (next_wake, count.get(), expected),
                "All expected requests yielded"
            );
            let distinct_keys = events.iter().map(|(k, _)| *k).collect::<HashSet<_>>();
            prop_assert_eq!(
                distinct_keys.len(),
                events.len(),
                "No key should be yielded more than once"
            );
            let all_keys = 0..expected;
            for k in all_keys {
                prop_assert!(!futures.contains_key(&k), "All futures should now have been removed");
            }
        }
    }
}