stream_processor_test/
test_spec.rs1#![allow(clippy::large_futures)]
6
7use crate::elementary_stream::*;
8use crate::output_validator::*;
9use crate::stream::*;
10use crate::stream_runner::*;
11use crate::{FatalError, Result};
12use anyhow::Context as _;
13use fidl_fuchsia_media::StreamProcessorProxy;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::TryStreamExt;
17use std::rc::Rc;
18
/// A size of stream processor output, expressed in one of two units.
///
/// NOTE(review): nothing in this file consumes this type; confirm the exact semantics against
/// the code that does.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputSize {
    /// The size given as a count of packets.
    PacketCount(usize),
    /// The size given as a count of raw bytes.
    RawBytesCount(usize),
}
25
26const FIRST_FORMAT_DETAILS_VERSION_ORDINAL: u64 = 1;
27
28pub type TestCaseOutputs = Vec<Output>;
29
30pub trait StreamProcessorFactory {
31 fn connect_to_stream_processor(
32 &self,
33 stream: &dyn ElementaryStream,
34 format_details_version_ordinal: u64,
35 ) -> BoxFuture<'_, Result<StreamProcessorProxy>>;
36}
37
38pub struct TestSpec {
41 pub cases: Vec<TestCase>,
42 pub relation: CaseRelation,
43 pub stream_processor_factory: Rc<dyn StreamProcessorFactory>,
44}
45
/// The temporal relationship between the cases of a [`TestSpec`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CaseRelation {
    /// Cases run in sequence over one shared stream processor connection. The outputs of every
    /// case are returned from [`TestSpec::run`].
    Serial,
    /// Cases run concurrently, each over its own stream processor connection. Outputs are
    /// discarded; [`TestSpec::run`] returns `None` on success.
    Concurrent,
}
55
56pub struct TestCase {
60 pub name: &'static str,
61 pub stream: Rc<dyn ElementaryStream>,
62 pub validators: Vec<Rc<dyn OutputValidator>>,
63 pub stream_options: Option<StreamOptions>,
64}
65
66impl TestSpec {
67 pub async fn run(self) -> Result<Option<Vec<TestCaseOutputs>>> {
68 let res = match self.relation {
69 CaseRelation::Serial => {
70 Some(run_cases_serially(self.stream_processor_factory.as_ref(), self.cases).await?)
71 }
72 CaseRelation::Concurrent => {
73 run_cases_concurrently(self.stream_processor_factory.as_ref(), self.cases).await?;
74 None
75 }
76 };
77 Ok(res)
78 }
79}
80
81async fn run_cases_serially(
82 stream_processor_factory: &dyn StreamProcessorFactory,
83 cases: Vec<TestCase>,
84) -> Result<Vec<TestCaseOutputs>> {
85 let stream_processor =
86 if let Some(stream) = cases.first().as_ref().map(|case| case.stream.as_ref()) {
87 stream_processor_factory
88 .connect_to_stream_processor(stream, FIRST_FORMAT_DETAILS_VERSION_ORDINAL)
89 .await?
90 } else {
91 return Err(FatalError(String::from("No test cases provided.")).into());
92 };
93 let mut stream_runner = StreamRunner::new(stream_processor);
94
95 let mut all_outputs = Vec::new();
96 for case in cases {
97 let output = stream_runner
98 .run_stream(case.stream, case.stream_options.unwrap_or_default())
99 .await
100 .context(format!("Running case {}", case.name))?;
101 for validator in case.validators {
102 validator.validate(&output).await.context(format!("Validating case {}", case.name))?;
103 }
104 all_outputs.push(output);
105 }
106 Ok(all_outputs)
107}
108
109async fn run_cases_concurrently(
110 stream_processor_factory: &dyn StreamProcessorFactory,
111 cases: Vec<TestCase>,
112) -> Result<()> {
113 let mut unordered = FuturesUnordered::new();
114 for case in cases {
115 unordered.push(run_cases_serially(stream_processor_factory, vec![case]))
116 }
117
118 while let Some(_) = unordered.try_next().await? {}
119
120 Ok(())
121}
122
123pub fn with_large_stack(f: fn() -> Result<()>) -> Result<()> {
124 const MEGABYTE: usize = 1024 * 1024;
126 const STACK_SIZE: usize = 4 * MEGABYTE;
127 std::thread::Builder::new().stack_size(STACK_SIZE).spawn(f).unwrap().join().unwrap()
128}