test_manager_lib/
utilities.rs1use futures::StreamExt;
6use futures::prelude::*;
7
8pub fn stream_fn<F, T, E, Fut>(query_fn: F) -> impl Stream<Item = Result<T, E>>
14where
15 F: 'static + FnMut() -> Fut + Unpin + Send + Sync,
16 Fut: Future<Output = Result<Vec<T>, E>> + Unpin + Send + Sync,
17{
18 futures::stream::try_unfold(query_fn, |mut query_fn| async move {
19 Ok(Some((query_fn().await?, query_fn)))
20 })
21 .try_take_while(|vec| futures::future::ready(Ok(!vec.is_empty())))
22 .map_ok(|vec| futures::stream::iter(vec).map(Ok))
23 .try_flatten()
24}