async_test_helpers/
lib.rs1use async_utils::PollExt;
6use fuchsia_async as fasync;
7use futures::future::Either;
8use futures::stream::{Stream, StreamExt};
9use futures::task::Poll;
10use futures::Future;
11use std::pin::pin;
12
13pub fn run_while<BackgroundFut, ResultFut, Out>(
22 exec: &mut fasync::TestExecutor,
23 background_fut: BackgroundFut,
24 result_fut: ResultFut,
25) -> (Out, BackgroundFut)
26where
27 BackgroundFut: Future + Unpin,
28 ResultFut: Future<Output = Out>,
29{
30 let result_fut = pin!(result_fut);
31 let mut select_fut = futures::future::select(background_fut, result_fut);
32 loop {
33 match exec.run_until_stalled(&mut select_fut) {
34 Poll::Ready(Either::Right(r)) => return r,
35 Poll::Ready(Either::Left(_)) => panic!("Background future finished"),
36 Poll::Pending => {}
37 }
38 }
39}
40
41#[track_caller]
43pub fn expect_stream_item<S: Stream + Unpin>(
44 exec: &mut fasync::TestExecutor,
45 stream: &mut S,
46) -> S::Item {
47 exec.run_until_stalled(&mut stream.next()).expect("stream item").expect("not terminated")
48}
49
50#[track_caller]
52pub fn expect_stream_pending<S: Stream + Unpin>(exec: &mut fasync::TestExecutor, stream: &mut S) {
53 let _ = exec.run_until_stalled(&mut stream.next()).expect_pending("stream pending");
54}
55
56#[track_caller]
58pub fn expect_stream_terminated<S: Stream + Unpin>(
59 exec: &mut fasync::TestExecutor,
60 stream: &mut S,
61) {
62 let result = exec.run_until_stalled(&mut stream.next()).expect("stream item");
63 assert!(result.is_none());
64}