async_test_helpers/
lib.rs1use async_utils::PollExt;
6use fuchsia_async as fasync;
7use futures::future::Either;
8use futures::stream::{Stream, StreamExt};
9use futures::task::Poll;
10use futures::Future;
11use std::pin::pin;
12
13pub fn run_while<BackgroundFut, ResultFut, Out>(
22    exec: &mut fasync::TestExecutor,
23    background_fut: BackgroundFut,
24    result_fut: ResultFut,
25) -> (Out, BackgroundFut)
26where
27    BackgroundFut: Future + Unpin,
28    ResultFut: Future<Output = Out>,
29{
30    let result_fut = pin!(result_fut);
31    let mut select_fut = futures::future::select(background_fut, result_fut);
32    loop {
33        match exec.run_until_stalled(&mut select_fut) {
34            Poll::Ready(Either::Right(r)) => return r,
35            Poll::Ready(Either::Left(_)) => panic!("Background future finished"),
36            Poll::Pending => {}
37        }
38    }
39}
40
41#[track_caller]
43pub fn expect_stream_item<S: Stream + Unpin>(
44    exec: &mut fasync::TestExecutor,
45    stream: &mut S,
46) -> S::Item {
47    exec.run_until_stalled(&mut stream.next()).expect("stream item").expect("not terminated")
48}
49
50#[track_caller]
52pub fn expect_stream_pending<S: Stream + Unpin>(exec: &mut fasync::TestExecutor, stream: &mut S) {
53    let _ = exec.run_until_stalled(&mut stream.next()).expect_pending("stream pending");
54}
55
56#[track_caller]
58pub fn expect_stream_terminated<S: Stream + Unpin>(
59    exec: &mut fasync::TestExecutor,
60    stream: &mut S,
61) {
62    let result = exec.run_until_stalled(&mut stream.next()).expect("stream item");
63    assert!(result.is_none());
64}