stream_processor_test/
test_spec.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![allow(clippy::large_futures)]
6
7use crate::elementary_stream::*;
8use crate::output_validator::*;
9use crate::stream::*;
10use crate::stream_runner::*;
11use crate::{FatalError, Result};
12use anyhow::Context as _;
13use fidl_fuchsia_media::StreamProcessorProxy;
14use futures::future::BoxFuture;
15use futures::stream::FuturesUnordered;
16use futures::TryStreamExt;
17use std::rc::Rc;
18
19pub enum OutputSize {
20    // Size of output in terms of packets.
21    PacketCount(usize),
22    // Size of output in terms of number of raw bytes.
23    RawBytesCount(usize),
24}
25
26const FIRST_FORMAT_DETAILS_VERSION_ORDINAL: u64 = 1;
27
28pub type TestCaseOutputs = Vec<Output>;
29
30pub trait StreamProcessorFactory {
31    fn connect_to_stream_processor(
32        &self,
33        stream: &dyn ElementaryStream,
34        format_details_version_ordinal: u64,
35    ) -> BoxFuture<'_, Result<StreamProcessorProxy>>;
36}
37
38/// A test spec describes all the cases that will run and the circumstances in which
39/// they will run.
40pub struct TestSpec {
41    pub cases: Vec<TestCase>,
42    pub relation: CaseRelation,
43    pub stream_processor_factory: Rc<dyn StreamProcessorFactory>,
44}
45
46/// A case relation describes the temporal relationship between two test cases.
47pub enum CaseRelation {
48    /// With serial relation, test cases will be run in sequence using the same codec server.
49    /// For serial relation, outputs from test cases will be returned.
50    Serial,
51    /// With concurrent relation, test cases will run concurrently using two or more codec servers.
52    /// For concurrent relation, outputs from test cases will not be returned.
53    Concurrent,
54}
55
56/// A test cases describes a sequence of elementary stream chunks that should be fed into a codec
57/// server, and a set of validators to check the output. To pass, all validations must pass for all
58/// output from the stream.
59pub struct TestCase {
60    pub name: &'static str,
61    pub stream: Rc<dyn ElementaryStream>,
62    pub validators: Vec<Rc<dyn OutputValidator>>,
63    pub stream_options: Option<StreamOptions>,
64}
65
66impl TestSpec {
67    pub async fn run(self) -> Result<Option<Vec<TestCaseOutputs>>> {
68        let res = match self.relation {
69            CaseRelation::Serial => {
70                Some(run_cases_serially(self.stream_processor_factory.as_ref(), self.cases).await?)
71            }
72            CaseRelation::Concurrent => {
73                run_cases_concurrently(self.stream_processor_factory.as_ref(), self.cases).await?;
74                None
75            }
76        };
77        Ok(res)
78    }
79}
80
81async fn run_cases_serially(
82    stream_processor_factory: &dyn StreamProcessorFactory,
83    cases: Vec<TestCase>,
84) -> Result<Vec<TestCaseOutputs>> {
85    let stream_processor =
86        if let Some(stream) = cases.first().as_ref().map(|case| case.stream.as_ref()) {
87            stream_processor_factory
88                .connect_to_stream_processor(stream, FIRST_FORMAT_DETAILS_VERSION_ORDINAL)
89                .await?
90        } else {
91            return Err(FatalError(String::from("No test cases provided.")).into());
92        };
93    let mut stream_runner = StreamRunner::new(stream_processor);
94
95    let mut all_outputs = Vec::new();
96    for case in cases {
97        let output = stream_runner
98            .run_stream(case.stream, case.stream_options.unwrap_or_default())
99            .await
100            .context(format!("Running case {}", case.name))?;
101        for validator in case.validators {
102            validator.validate(&output).await.context(format!("Validating case {}", case.name))?;
103        }
104        all_outputs.push(output);
105    }
106    Ok(all_outputs)
107}
108
109async fn run_cases_concurrently(
110    stream_processor_factory: &dyn StreamProcessorFactory,
111    cases: Vec<TestCase>,
112) -> Result<()> {
113    let mut unordered = FuturesUnordered::new();
114    for case in cases {
115        unordered.push(run_cases_serially(stream_processor_factory, vec![case]))
116    }
117
118    while let Some(_) = unordered.try_next().await? {}
119
120    Ok(())
121}
122
123pub fn with_large_stack(f: fn() -> Result<()>) -> Result<()> {
124    // The TestSpec futures are too big to fit on Fuchsia's default stack.
125    const MEGABYTE: usize = 1024 * 1024;
126    const STACK_SIZE: usize = 4 * MEGABYTE;
127    std::thread::Builder::new().stack_size(STACK_SIZE).spawn(f).unwrap().join().unwrap()
128}