stream_processor_test/
stream_runner.rs1use crate::buffer_set::*;
6use crate::elementary_stream::*;
7use crate::input_packet_stream::*;
8use crate::output_validator::*;
9use crate::stream::*;
10use crate::Result;
11use fidl_fuchsia_media::*;
12use futures::TryStreamExt;
13use log::debug;
14use std::rc::Rc;
15
16pub struct StreamRunner {
18 input_buffer_ordinals: OrdinalSequence,
19 output_buffer_ordinals: OrdinalSequence,
20 stream_lifetime_ordinals: OrdinalSequence,
21 format_details_ordinals: OrdinalSequence,
22 output_buffer_set: Option<BufferSet>,
23 input_buffer_set: Option<BufferSet>,
24 stream_processor: StreamProcessorProxy,
25}
26
27impl StreamRunner {
28 pub fn new(stream_processor: StreamProcessorProxy) -> Self {
29 Self {
30 input_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
31 output_buffer_ordinals: OrdinalPattern::Odd.into_iter(),
32 stream_lifetime_ordinals: OrdinalPattern::Odd.into_iter(),
33 format_details_ordinals: OrdinalPattern::All.into_iter(),
34 input_buffer_set: None,
35 output_buffer_set: None,
36 stream_processor,
37 }
38 }
39
40 pub async fn run_stream(
41 &mut self,
42 stream: Rc<dyn ElementaryStream>,
43 options: StreamOptions,
44 ) -> Result<Vec<Output>> {
45 let format_details_version_ordinal = get_ordinal(&mut self.format_details_ordinals);
46 let stream_lifetime_ordinal = get_ordinal(&mut self.stream_lifetime_ordinals);
47
48 debug!(stream_lifetime_ordinal:%, format_details_version_ordinal:%; "Starting a stream");
49
50 let mut events = self.stream_processor.take_event_stream();
51
52 let output = {
53 let mut stream = Stream {
54 format_details_version_ordinal,
55 stream_lifetime_ordinal,
56 input_buffer_ordinals: &mut self.input_buffer_ordinals,
57 input_packet_stream: self.input_buffer_set.take().map(|buffer_set| {
58 InputPacketStream::new(buffer_set, stream.stream(), stream_lifetime_ordinal)
59 }),
60 output_buffer_ordinals: &mut self.output_buffer_ordinals,
61 output_buffer_set: self.output_buffer_set.take(),
62 current_output_format: None,
63 stream_processor: &mut self.stream_processor,
64 stream: stream.as_ref(),
65 options: options.clone(),
66 output: vec![],
67 };
68
69 stream.start().await?;
70
71 let channel_closed = loop {
72 let Some(event) = events.try_next().await? else {
73 break true;
74 };
75 #[allow(clippy::large_futures)]
76 let control_flow = stream.handle_event(event).await?;
77 match control_flow {
78 StreamControlFlow::Continue => {}
79 StreamControlFlow::Stop => break false,
80 };
81 };
82
83 let mut output = stream.output;
84 if channel_closed {
85 output.push(Output::CodecChannelClose);
86 }
87
88 self.input_buffer_set =
89 stream.input_packet_stream.map(|stream| stream.take_buffer_set());
90 self.output_buffer_set = stream.output_buffer_set;
91
92 output
93 };
94
95 if options.release_input_buffers_at_end {
96 self.input_buffer_set = None;
97 }
98
99 if options.release_output_buffers_at_end {
100 self.output_buffer_set = None;
101 }
102
103 Ok(output)
104 }
105}