test_manager_lib/
utilities.rs1use futures::prelude::*;
6use futures::StreamExt;
7use log::info;
8
9pub fn stream_fn<F, T, E, Fut>(query_fn: F) -> impl Stream<Item = Result<T, E>>
15where
16 F: 'static + FnMut() -> Fut + Unpin + Send + Sync,
17 Fut: Future<Output = Result<Vec<T>, E>> + Unpin + Send + Sync,
18{
19 futures::stream::try_unfold(query_fn, |mut query_fn| async move {
20 Ok(Some((query_fn().await?, query_fn)))
21 })
22 .try_take_while(|vec| futures::future::ready(Ok(!vec.is_empty())))
23 .map_ok(|vec| futures::stream::iter(vec).map(Ok))
24 .try_flatten()
25}
26
27pub struct LogOnDrop(pub &'static str);
29
30impl std::ops::Drop for LogOnDrop {
31 fn drop(&mut self) {
32 info!("{}", self.0);
33 }
34}