test_manager_lib/
utilities.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::StreamExt;
6use futures::prelude::*;
7
8/// Convert iterator fidl method into stream of events.
9/// ie convert
10/// fidl_method() -> Future<Result<Vec<T>, E>>
11/// to
12/// Stream<Item=Result<T, E>>
13pub fn stream_fn<F, T, E, Fut>(query_fn: F) -> impl Stream<Item = Result<T, E>>
14where
15    F: 'static + FnMut() -> Fut + Unpin + Send + Sync,
16    Fut: Future<Output = Result<Vec<T>, E>> + Unpin + Send + Sync,
17{
18    futures::stream::try_unfold(query_fn, |mut query_fn| async move {
19        Ok(Some((query_fn().await?, query_fn)))
20    })
21    .try_take_while(|vec| futures::future::ready(Ok(!vec.is_empty())))
22    .map_ok(|vec| futures::stream::iter(vec).map(Ok))
23    .try_flatten()
24}