async_utils/stream/
stream_map.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use core::hash::Hash;
6use core::pin::Pin;
7use core::task::{Context, Poll};
8use futures::stream::{FusedStream, Stream, StreamExt};
9use futures::Future;
10use std::collections::HashMap;
11
/// A keyed set of streams that can itself be polled as a single stream.
///
/// Polling a `StreamMap` polls its member streams and yields an item from the first member
/// found to be ready. Each member is addressed by a key of type `K`, which allows an
/// individual stream to be removed again later.
///
/// Every member is stored as a `Pin<Box<St>>`, so the stream type `St` is allowed to be
/// `!Unpin`. The key type `K` has to be `Unpin`; a key type that is `!Unpin` is not expected
/// to be needed in practice.
///
/// Items are yielded as plain `St::Item` values. To receive items that are tagged with the key
/// of the stream that produced them, consider the `IndexedStreams` type alias or the `Tagged`
/// combinator.
pub struct StreamMap<K, St> {
    /// The member streams, each pinned on the heap and indexed by its key `K`.
    inner: HashMap<K, Pin<Box<St>>>,
}
22
23impl<K: Unpin, St> Unpin for StreamMap<K, St> {}
24
25impl<K: Eq + Hash + Unpin, St: Stream> StreamMap<K, St> {
26    /// Returns an empty `StreamMap`.
27    pub fn empty() -> StreamMap<K, St> {
28        StreamMap { inner: HashMap::new() }
29    }
30
31    /// Insert a stream identified by `key` to the map.
32    ///
33    /// This method will not call `poll` on the submitted stream. The caller must ensure
34    /// that `poll_next` is called in order to receive wake-up notifications for the given
35    /// stream.
36    pub fn insert(&mut self, key: K, stream: St) -> Option<Pin<Box<St>>> {
37        self.inner.insert(key, Box::new(stream).into())
38    }
39
40    /// Returns `true` if the `StreamMap` contains `key`.
41    pub fn contains_key(&self, key: &K) -> bool {
42        self.inner.contains_key(key)
43    }
44
45    /// Remove the stream identified by `key`, returning it if it exists.
46    pub fn remove(&mut self, key: &K) -> Option<Pin<Box<St>>> {
47        self.inner.remove(key)
48    }
49
50    /// Provide mutable access to the inner hashmap.
51    ///
52    /// This is safe as if the stream were being polled, we would not be able to access a mutable
53    /// reference to self to pass to this method.
54    pub fn inner_mut(&mut self) -> &mut HashMap<K, Pin<Box<St>>> {
55        &mut self.inner
56    }
57
58    /// Provide immutable access to the inner hashmap.
59    ///
60    /// This is safe as if the stream were being polled, we would not be able to access a
61    /// reference to self to pass to this method.
62    pub fn inner(&self) -> &HashMap<K, Pin<Box<St>>> {
63        &self.inner
64    }
65}
66
67impl<K: Clone + Eq + Hash + Unpin, St: Stream> Stream for StreamMap<K, St> {
68    type Item = St::Item;
69
70    // TODO(https://fxbug.dev/42129310) - This implementation is a simple one, which is convenient to write but
71    // suffers from a couple of known issues:
72    // * The implementation is O(n) wrt the number of streams in the map. We should
73    //   be able to produce an O(1) implementation at the cost of internal complexity by
74    //   implementing a ready-to-run queue similarly to futures::stream::FuturesUnordered
75    // * The implementation uses a stable order of iteration which could result in one particular
76    //   stream starving following streams from ever being polled. The implementation makes no
77    //   promises about fairness but clients may well expect a fairer distribution. We should be
78    //   able to provide a round-robin implementation using a similar transformation as resolves the
79    //   O(1) issue
80    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81        let mut result = Poll::Pending;
82        let mut to_remove = Vec::new();
83        // We can pull the inner value out as StreamMap is `Unpin`
84        let streams = Pin::into_inner(self);
85        for (key, stream) in streams.inner.iter_mut() {
86            match Pin::new(&mut stream.next()).poll(cx) {
87                Poll::Ready(Some(req)) => {
88                    result = Poll::Ready(Some(req));
89                    break;
90                }
91                // if a stream returns None, remove it and continue
92                Poll::Ready(None) => {
93                    to_remove.push(key.clone());
94                }
95                Poll::Pending => (),
96            }
97        }
98        for key in to_remove {
99            assert!(streams.remove(&key).is_some());
100        }
101        result
102    }
103}
104
105// StreamMap never returns None, because a new stream could always be inserted with items.
106impl<K: Clone + Eq + Hash + Unpin, St: Stream> FusedStream for StreamMap<K, St> {
107    fn is_terminated(&self) -> bool {
108        false
109    }
110}
111
#[cfg(test)]
mod test {
    //! We validate the behavior of the StreamMap stream by enumerating all possible external
    //! events, and then generating permutations of valid sequences of those events. These model
    //! the possible execution sequences the stream could go through in program execution. We
    //! then assert that:
    //!   a) At all points during execution, all invariants are held
    //!   b) The final result is as expected
    //!
    //! In this case, the invariants are:
    //!   * If the map is empty, it is pending
    //!   * If all streams are pending, the map is pending
    //!   * otherwise the map is ready
    //!
    //! The result is:
    //!   * All test messages have been injected
    //!   * All test messages have been yielded
    //!   * All test streams have terminated
    //!   * No event is yielded with a given key after the stream for that key has terminated
    //!
    //! Together these show:
    //!   * Progress is always eventually made - the Stream cannot be stalled
    //!   * All inserted elements will eventually be yielded
    //!   * Elements are never duplicated
    use super::*;
    use crate::stream::{StreamItem, WithEpitaph, WithTag};
    use futures::channel::mpsc;
    use proptest::prelude::*;
    use std::collections::HashSet;
    use std::fmt::Debug;

    /// Possible actions to take in evaluating the stream
    enum Event<K> {
        /// Insert a new request stream
        InsertStream(K, mpsc::Receiver<Result<u64, ()>>),
        /// Send a new request
        SendRequest(K, mpsc::Sender<Result<u64, ()>>),
        /// Close an existing request stream
        CloseStream(K, mpsc::Sender<Result<u64, ()>>),
        /// Schedule the executor. The executor will only run the task if awoken, otherwise it will
        /// do nothing
        Execute,
    }

    // Written by hand so that only the event kind and key are printed; the channel endpoints
    // carried by the events are left out of the output.
    impl<K: Debug> Debug for Event<K> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            match self {
                Event::InsertStream(k, _) => write!(f, "InsertStream({:?})", k),
                Event::SendRequest(k, _) => write!(f, "SendRequest({:?})", k),
                Event::CloseStream(k, _) => write!(f, "CloseStream({:?})", k),
                Event::Execute => write!(f, "Execute"),
            }
        }
    }

    /// Produce the three events (insert, send, close) for a single stream identified by `key`.
    /// All three events share one freshly created channel.
    fn stream_events<K: Clone + Eq + Hash>(key: K) -> Vec<Event<K>> {
        // Ensure that the channel is big enough to always handle all the Sends we make
        let (sender, receiver) = mpsc::channel::<Result<u64, ()>>(10);
        vec![
            Event::InsertStream(key.clone(), receiver),
            Event::SendRequest(key.clone(), sender.clone()),
            Event::CloseStream(key, sender),
        ]
    }

    /// Determine how many events are sent on open channels (a channel is open if it has not been
    /// closed, even if it has not yet been inserted into the StreamMap)
    fn expected_yield<K: Eq + Hash>(events: &[Event<K>]) -> usize {
        // Keys whose channel has already been closed at the current point in the sequence
        let mut terminated = HashSet::new();
        // Number of sends seen so far on a channel that was still open
        let mut sent = 0;
        for event in events {
            match event {
                Event::CloseStream(k, _) => {
                    let _: bool = terminated.insert(k);
                }
                Event::SendRequest(k, _) if !terminated.contains(k) => sent += 1,
                _ => (),
            }
        }
        sent
    }

    /// Strategy that produces random permutations of a set of events, corresponding to inserting,
    /// sending and completing between 0 and `n - 1` different streams in random order, also
    /// interspersed with running the executor
    fn execution_sequences(n: u64) -> impl Strategy<Value = Vec<Event<u64>>> {
        fn generate_events(n: u64) -> Vec<Event<u64>> {
            let mut events = (0..n).flat_map(stream_events).collect::<Vec<_>>();
            events.extend(std::iter::repeat_with(|| Event::Execute).take((n * 3) as usize));
            events
        }

        // We want to produce random permutations of these events
        (0..n).prop_map(generate_events).prop_shuffle()
    }

    proptest! {
        #[test]
        fn test_invariants(mut execution in execution_sequences(4)) {
            let expected = expected_yield(&execution);
            // One CloseStream event is generated per stream, so this is the number of streams
            let expected_count: u64 = execution
                .iter()
                .filter(|event| matches!(event, Event::CloseStream(_, _)))
                .count() as u64;

            // Add enough execution events to ensure we will complete, no matter the order
            execution.extend(std::iter::repeat_with(|| Event::Execute).take((expected_count * 3) as usize));

            let (waker, count) = futures_test::task::new_count_waker();
            let send_waker = futures_test::task::noop_waker();
            let mut streams = StreamMap::empty();
            let mut next_wake = 0;
            let mut yielded = 0;
            let mut inserted = 0;
            let mut closed = 0;
            let mut events = vec![];
            for event in execution {
                match event {
                    Event::InsertStream(key, stream) => {
                        assert_matches::assert_matches!(streams.insert(key, stream.tagged(key).with_epitaph(key)), None);
                        // StreamMap does *not* wake on inserting new streams, matching the
                        // behavior of streams::SelectAll. The client *must* arrange for it to be
                        // polled again after a stream is inserted; we model that here by forcing a
                        // wake up
                        next_wake = count.get();
                    }
                    Event::SendRequest(_, mut sender) => {
                        // Only send when the channel accepts it; anything other than
                        // `Ready(Ok(()))` (e.g. the channel was already closed) is skipped
                        if let Poll::Ready(Ok(())) = sender.poll_ready(&mut Context::from_waker(&send_waker)) {
                            prop_assert_eq!(sender.start_send(Ok(1)), Ok(()));
                            inserted += 1;
                        }
                    }
                    Event::CloseStream(_, mut stream) => {
                        stream.close_channel();
                    }
                    Event::Execute if count.get() >= next_wake => {
                        match Pin::new(&mut streams.next()).poll(&mut Context::from_waker(&waker)) {
                            Poll::Ready(Some(StreamItem::Item((k, v)))) => {
                                events.push(StreamItem::Item((k, v)));
                                yielded += 1;
                                // Ensure that we wake up next time;
                                next_wake = count.get();
                                // Invariant: stream(k) must be in the map
                                prop_assert!(streams.contains_key(&k));
                            }
                            Poll::Ready(Some(StreamItem::Epitaph(k))) => {
                                events.push(StreamItem::Epitaph(k));
                                closed += 1;
                                // Ensure that we wake up next time;
                                next_wake = count.get();
                                // stream(k) is now terminated, but until polled again (Yielding
                                // `None`), will still be in the map
                            }
                            Poll::Ready(None) => {
                                // the Stream impl for StreamMap never completes
                                unreachable!()
                            }
                            Poll::Pending => {
                                // Do not run again until the waker has been invoked once more
                                next_wake = count.get() + 1;
                            }
                        };
                    }
                    // The task has not been awoken, so the executor does nothing
                    Event::Execute => (),
                }
            }
            prop_assert_eq!(inserted, expected, "All expected requests inserted");
            // `next_wake` and `count` appear on both sides, so only `yielded` vs `expected` can
            // make this fail; the extra values just show up in the failure output
            prop_assert_eq!((next_wake, count.get(), yielded), (next_wake, count.get(), expected), "All expected requests yielded");
            prop_assert_eq!(closed, expected_count, "All streams closed");
            // True for every recorded event except the epitaph of stream `key`
            let not_terminated = |key: u64, e: &StreamItem<(u64, Result<u64, ()>), u64>| {
                !matches!(e, StreamItem::Epitaph(k) if *k == key)
            };
            // True only for items yielded by stream `key`
            let event_of = |key: u64, e: &StreamItem<(u64, Result<u64, ()>), u64>| {
                matches!(e, StreamItem::Item((k, _)) if *k == key)
            };
            let all_keys = 0..expected_count;
            for k in all_keys {
                prop_assert!(!streams.contains_key(&k), "All streams should now have been removed");
                prop_assert!(!events.iter().skip_while(|e| not_terminated(k, e)).any(|e| event_of(k, e)), "No events should have been yielded from a stream after it terminated");
            }
        }
    }
}