stream_processor_test/
stream_runner.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::buffer_set::*;
6use crate::elementary_stream::*;
7use crate::input_packet_stream::*;
8use crate::output_validator::*;
9use crate::stream::*;
10use crate::Result;
11use fidl_fuchsia_media::*;
12use futures::TryStreamExt;
13use log::debug;
14use std::rc::Rc;
15
16/// Runs elementary streams through a stream processor.
17pub struct StreamRunner {
18    input_buffer_ordinals: OrdinalSequence,
19    output_buffer_ordinals: OrdinalSequence,
20    stream_lifetime_ordinals: OrdinalSequence,
21    format_details_ordinals: OrdinalSequence,
22    output_buffer_set: Option<BufferSet>,
23    input_buffer_set: Option<BufferSet>,
24    stream_processor: StreamProcessorProxy,
25}
26
27impl StreamRunner {
28    pub fn new(stream_processor: StreamProcessorProxy) -> Self {
29        Self {
30            input_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
31            output_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
32            stream_lifetime_ordinals: OrdinalPattern::Odd.into_iter(),
33            format_details_ordinals: OrdinalPattern::All.into_iter(),
34            input_buffer_set: None,
35            output_buffer_set: None,
36            stream_processor,
37        }
38    }
39
40    pub async fn run_stream(
41        &mut self,
42        stream: Rc<dyn ElementaryStream>,
43        options: StreamOptions,
44    ) -> Result<Vec<Output>> {
45        let format_details_version_ordinal = get_ordinal(&mut self.format_details_ordinals);
46        let stream_lifetime_ordinal = get_ordinal(&mut self.stream_lifetime_ordinals);
47
48        debug!(stream_lifetime_ordinal:%, format_details_version_ordinal:%; "Starting a stream");
49
50        let mut events = self.stream_processor.take_event_stream();
51
52        let output = {
53            let mut stream = Stream {
54                format_details_version_ordinal,
55                stream_lifetime_ordinal,
56                input_buffer_ordinals: &mut self.input_buffer_ordinals,
57                input_packet_stream: self.input_buffer_set.take().map(|buffer_set| {
58                    InputPacketStream::new(buffer_set, stream.stream(), stream_lifetime_ordinal)
59                }),
60                output_buffer_ordinals: &mut self.output_buffer_ordinals,
61                output_buffer_set: self.output_buffer_set.take(),
62                current_output_format: None,
63                stream_processor: &mut self.stream_processor,
64                stream: stream.as_ref(),
65                options: options.clone(),
66                output: vec![],
67            };
68
69            stream.start().await?;
70
71            let channel_closed = loop {
72                let Some(event) = events.try_next().await? else {
73                    break true;
74                };
75                #[allow(clippy::large_futures)]
76                let control_flow = stream.handle_event(event).await?;
77                match control_flow {
78                    StreamControlFlow::Continue => {}
79                    StreamControlFlow::Stop => break false,
80                };
81            };
82
83            let mut output = stream.output;
84            if channel_closed {
85                output.push(Output::CodecChannelClose);
86            }
87
88            self.input_buffer_set =
89                stream.input_packet_stream.map(|stream| stream.take_buffer_set());
90            self.output_buffer_set = stream.output_buffer_set;
91
92            output
93        };
94
95        if options.release_input_buffers_at_end {
96            self.input_buffer_set = None;
97        }
98
99        if options.release_output_buffers_at_end {
100            self.output_buffer_set = None;
101        }
102
103        Ok(output)
104    }
105}