async_utils/stream/stream_map.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use core::hash::Hash;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use futures::stream::{FusedStream, Stream, StreamExt};
9use futures::Future;
10use std::collections::HashMap;
11
12/// A collection of Stream indexed by key, allowing removal by Key. When polled, a StreamMap yields
13/// from whichever member stream is ready first.
14/// The Stream type `St` can be `?Unpin`, as all streams are stored as pins inside the map. The Key
15/// type `K` must be `Unpin`; it is unlikely that an `!Unpin` type would ever be needed as a Key.
16/// StreamMap yields items of type St::Item; For a stream that yields messages tagged with their
17/// Key, consider using the `IndexedStreams` type alias or using the `Tagged` combinator.
18pub struct StreamMap<K, St> {
19 /// Streams `St` identified by key `K`
20 inner: HashMap<K, Pin<Box<St>>>,
21}
22
23impl<K: Unpin, St> Unpin for StreamMap<K, St> {}
24
25impl<K: Eq + Hash + Unpin, St: Stream> StreamMap<K, St> {
26 /// Returns an empty `StreamMap`.
27 pub fn empty() -> StreamMap<K, St> {
28 StreamMap { inner: HashMap::new() }
29 }
30
31 /// Insert a stream identified by `key` to the map.
32 ///
33 /// This method will not call `poll` on the submitted stream. The caller must ensure
34 /// that `poll_next` is called in order to receive wake-up notifications for the given
35 /// stream.
36 pub fn insert(&mut self, key: K, stream: St) -> Option<Pin<Box<St>>> {
37 self.inner.insert(key, Box::new(stream).into())
38 }
39
40 /// Returns `true` if the `StreamMap` contains `key`.
41 pub fn contains_key(&self, key: &K) -> bool {
42 self.inner.contains_key(key)
43 }
44
45 /// Remove the stream identified by `key`, returning it if it exists.
46 pub fn remove(&mut self, key: &K) -> Option<Pin<Box<St>>> {
47 self.inner.remove(key)
48 }
49
50 /// Provide mutable access to the inner hashmap.
51 ///
52 /// This is safe as if the stream were being polled, we would not be able to access a mutable
53 /// reference to self to pass to this method.
54 pub fn inner_mut(&mut self) -> &mut HashMap<K, Pin<Box<St>>> {
55 &mut self.inner
56 }
57
58 /// Provide immutable access to the inner hashmap.
59 ///
60 /// This is safe as if the stream were being polled, we would not be able to access a
61 /// reference to self to pass to this method.
62 pub fn inner(&self) -> &HashMap<K, Pin<Box<St>>> {
63 &self.inner
64 }
65}
66
67impl<K: Clone + Eq + Hash + Unpin, St: Stream> Stream for StreamMap<K, St> {
68 type Item = St::Item;
69
70 // TODO(https://fxbug.dev/42129310) - This implementation is a simple one, which is convenient to write but
71 // suffers from a couple of known issues:
72 // * The implementation is O(n) wrt the number of streams in the map. We should
73 // be able to produce an O(1) implementation at the cost of internal complexity by
74 // implementing a ready-to-run queue similarly to futures::stream::FuturesUnordered
75 // * The implementation uses a stable order of iteration which could result in one particular
76 // stream starving following streams from ever being polled. The implementation makes no
77 // promises about fairness but clients may well expect a fairer distribution. We should be
78 // able to provide a round-robin implementation using a similar transformation as resolves the
79 // O(1) issue
80 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81 let mut result = Poll::Pending;
82 let mut to_remove = Vec::new();
83 // We can pull the inner value out as StreamMap is `Unpin`
84 let streams = Pin::into_inner(self);
85 for (key, stream) in streams.inner.iter_mut() {
86 match Pin::new(&mut stream.next()).poll(cx) {
87 Poll::Ready(Some(req)) => {
88 result = Poll::Ready(Some(req));
89 break;
90 }
91 // if a stream returns None, remove it and continue
92 Poll::Ready(None) => {
93 to_remove.push(key.clone());
94 }
95 Poll::Pending => (),
96 }
97 }
98 for key in to_remove {
99 assert!(streams.remove(&key).is_some());
100 }
101 result
102 }
103}
104
105// StreamMap never returns None, because a new stream could always be inserted with items.
106impl<K: Clone + Eq + Hash + Unpin, St: Stream> FusedStream for StreamMap<K, St> {
107 fn is_terminated(&self) -> bool {
108 false
109 }
110}
111
112#[cfg(test)]
113mod test {
114 //! We validate the behavior of the StreamMap stream by enumerating all possible external
115 //! events, and then generating permutations of valid sequences of those events. These model
116 //! the possible executions sequences the stream could go through in program execution. We
117 //! then assert that:
118 //! a) At all points during execution, all invariants are held
119 //! b) The final result is as expected
120 //!
121 //! In this case, the invariants are:
122 //! * If the map is empty, it is pending
123 //! * If all streams are pending, the map is pending
124 //! * otherwise the map is ready
125 //!
126 //! The result is:
127 //! * All test messages have been injected
128 //! * All test messages have been yielded
129 //! * All test streams have terminated
130 //! * No event is yielded with a given key after the stream for that key has terminated
131 //!
132 //! Together these show:
133 //! * Progress is always eventually made - the Stream cannot be stalled
134 //! * All inserted elements will eventually be yielded
135 //! * Elements are never duplicated
136 use super::*;
137 use crate::stream::{StreamItem, WithEpitaph, WithTag};
138 use futures::channel::mpsc;
139 use proptest::prelude::*;
140 use std::collections::HashSet;
141 use std::fmt::Debug;
142
143 // We validate the behavior of the StreamMap stream by enumerating all possible external
144 // events, and then generating permutations of valid sequences of those events. These model
145 // the possible executions sequences the stream could go through in program execution. We
146 // then assert that:
147 // a) At all points during execution, all invariants are held
148 // b) The final result is as expected
149 //
150 // In this case, the invariants are:
151 // * If the map is empty, it is pending
152 // * If all streams are pending, the map is pending
153 // * otherwise the map is ready
154 //
155 // The result is:
156 // * All test messages have been injected
157 // * All test messages have been yielded
158 // * All test streams have terminated
159 // * No event is yielded with a given key after the stream for that key has terminated
160 //
161 // Together these show:
162 // * Progress is always eventually made - the Stream cannot be stalled
163 // * All inserted elements will eventually be yielded
164 // * Elements are never duplicated
165
166 /// Possible actions to take in evaluating the stream
167 enum Event<K> {
168 /// Insert a new request stream
169 InsertStream(K, mpsc::Receiver<Result<u64, ()>>),
170 /// Send a new request
171 SendRequest(K, mpsc::Sender<Result<u64, ()>>),
172 /// Close an existing request stream
173 CloseStream(K, mpsc::Sender<Result<u64, ()>>),
174 /// Schedule the executor. The executor will only run the task if awoken, otherwise it will
175 /// do nothing
176 Execute,
177 }
178
179 impl<K: Debug> Debug for Event<K> {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 match self {
182 Event::InsertStream(k, _) => write!(f, "InsertStream({:?})", k),
183 Event::SendRequest(k, _) => write!(f, "SendRequest({:?})", k),
184 Event::CloseStream(k, _) => write!(f, "CloseStream({:?})", k),
185 Event::Execute => write!(f, "Execute"),
186 }
187 }
188 }
189
190 fn stream_events<K: Clone + Eq + Hash>(key: K) -> Vec<Event<K>> {
191 // Ensure that the channel is big enough to always handle all the Sends we make
192 let (sender, receiver) = mpsc::channel::<Result<u64, ()>>(10);
193 vec![
194 Event::InsertStream(key.clone(), receiver),
195 Event::SendRequest(key.clone(), sender.clone()),
196 Event::CloseStream(key, sender),
197 ]
198 }
199
200 /// Determine how many events are sent on open channels (a channel is open if it has not been
201 /// closed, even if it has not yet been inserted into the StreamMap)
202 fn expected_yield<K: Eq + Hash>(events: &Vec<Event<K>>) -> usize {
203 events
204 .iter()
205 .fold((HashSet::new(), 0), |(mut terminated, closed), event| match event {
206 Event::CloseStream(k, _) => {
207 let _: bool = terminated.insert(k);
208 (terminated, closed)
209 }
210 Event::SendRequest(k, _) if !terminated.contains(k) => (terminated, closed + 1),
211 _ => (terminated, closed),
212 })
213 .1
214 }
215
216 /// Strategy that produces random permutations of a set of events, corresponding to inserting,
217 /// sending and completing up to n different streams in random order, also interspersed with
218 /// running the executor
219 fn execution_sequences(n: u64) -> impl Strategy<Value = Vec<Event<u64>>> {
220 fn generate_events(n: u64) -> Vec<Event<u64>> {
221 let mut events = (0..n).flat_map(|n| stream_events(n)).collect::<Vec<_>>();
222 events.extend(std::iter::repeat_with(|| Event::Execute).take((n * 3) as usize));
223 events
224 }
225
226 // We want to produce random permutations of these events
227 (0..n).prop_map(generate_events).prop_shuffle()
228 }
229
230 proptest! {
231 #[test]
232 fn test_invariants(mut execution in execution_sequences(4)) {
233 let expected = expected_yield(&execution);
234 let expected_count:u64 = execution.iter()
235 .filter(|event| match event {
236 Event::CloseStream(_, _) => true,
237 _ => false,
238 }).count() as u64;
239
240 // Add enough execution events to ensure we will complete, no matter the order
241 execution.extend(std::iter::repeat_with(|| Event::Execute).take((expected_count * 3) as usize));
242
243 let (waker, count) = futures_test::task::new_count_waker();
244 let send_waker = futures_test::task::noop_waker();
245 let mut streams = StreamMap::empty();
246 let mut next_wake = 0;
247 let mut yielded = 0;
248 let mut inserted = 0;
249 let mut closed = 0;
250 let mut events = vec![];
251 for event in execution {
252 match event {
253 Event::InsertStream(key, stream) => {
254 assert_matches::assert_matches!(streams.insert(key, stream.tagged(key).with_epitaph(key)), None);
255 // StreamMap does *not* wake on inserting new streams, matching the
256 // behavior of streams::SelectAll. The client *must* arrange for it to be
257 // polled again after a stream is inserted; we model that here by forcing a
258 // wake up
259 next_wake = count.get();
260 }
261 Event::SendRequest(_, mut sender) => {
262 if let Poll::Ready(Ok(())) = sender.poll_ready(&mut Context::from_waker(&send_waker)) {
263 prop_assert_eq!(sender.start_send(Ok(1)), Ok(()));
264 inserted = inserted + 1;
265 }
266 }
267 Event::CloseStream(_, mut stream) => {
268 stream.close_channel();
269 }
270 Event::Execute if count.get() >= next_wake => {
271 match Pin::new(&mut streams.next()).poll(&mut Context::from_waker(&waker)) {
272 Poll::Ready(Some(StreamItem::Item((k, v)))) => {
273 events.push(StreamItem::Item((k, v)));
274 yielded = yielded + 1;
275 // Ensure that we wake up next time;
276 next_wake = count.get();
277 // Invariant: stream(k) must be in the map
278 prop_assert!(streams.contains_key(&k))
279 }
280 Poll::Ready(Some(StreamItem::Epitaph(k))) => {
281 events.push(StreamItem::Epitaph(k));
282 closed = closed + 1;
283 // Ensure that we wake up next time;
284 next_wake = count.get();
285 // stream(k) is now terminated, but until polled again (Yielding
286 // `None`), will still be in the map
287 }
288 Poll::Ready(None) => {
289 // the Stream impl for StreamMap never completes
290 unreachable!()
291 }
292 Poll::Pending => {
293 next_wake = count.get() + 1;
294 }
295 };
296 }
297 Event::Execute => (),
298 }
299 }
300 prop_assert_eq!(inserted, expected, "All expected requests inserted");
301 prop_assert_eq!((next_wake, count.get(), yielded), (next_wake, count.get(), expected), "All expected requests yielded");
302 prop_assert_eq!(closed, expected_count, "All streams closed");
303 let not_terminated =
304 |key: u64, e: &StreamItem<(u64, Result<u64, ()>), u64>| match e {
305 StreamItem::Epitaph(k) if *k == key => false,
306 _ => true,
307 };
308 let event_of =
309 |key: u64, e: &StreamItem<(u64, Result<u64, ()>), u64>| match e {
310 StreamItem::Item((k, _)) if *k == key => true,
311 _ => false,
312 };
313 let all_keys = 0..expected_count;
314 for k in all_keys {
315 prop_assert!(!streams.contains_key(&k), "All streams should now have been removed");
316 prop_assert!(!events.iter().skip_while(|e| not_terminated(k, e)).any(|e| event_of(k, e)), "No events should have been yielded from a stream after it terminated");
317 }
318 }
319 }
320}