stream_processor_test/
stream.rs1#![allow(clippy::large_futures)]
6
7use crate::buffer_set::*;
8use crate::elementary_stream::*;
9use crate::input_packet_stream::*;
10use crate::output_validator::*;
11use crate::{FatalError, Result};
12use fidl_fuchsia_media::*;
13use fidl_fuchsia_sysmem2::BufferCollectionConstraints;
14use fuchsia_stream_processors::*;
15use log::debug;
16use std::rc::Rc;
17
18pub type OrdinalSequence = <OrdinalPattern as IntoIterator>::IntoIter;
19
20#[derive(Debug, Clone)]
21pub struct StreamOptions {
22 pub queue_format_details: bool,
25 pub release_input_buffers_at_end: bool,
26 pub release_output_buffers_at_end: bool,
27 pub input_buffer_collection_constraints: Option<BufferCollectionConstraints>,
28 pub output_buffer_collection_constraints: Option<BufferCollectionConstraints>,
29 pub stop_after_first_output: bool,
30}
31
32impl Default for StreamOptions {
33 fn default() -> Self {
34 Self {
35 queue_format_details: true,
36 release_input_buffers_at_end: false,
37 release_output_buffers_at_end: false,
38 input_buffer_collection_constraints: None,
39 output_buffer_collection_constraints: None,
40 stop_after_first_output: false,
41 }
42 }
43}
44
45pub struct Stream<'a> {
46 pub format_details_version_ordinal: u64,
47 pub stream_lifetime_ordinal: u64,
48 pub input_buffer_ordinals: &'a mut OrdinalSequence,
49 pub input_packet_stream:
50 Option<InputPacketStream<Box<dyn Iterator<Item = ElementaryStreamChunk> + 'a>>>,
51 pub output_buffer_ordinals: &'a mut OrdinalSequence,
52 pub output_buffer_set: Option<BufferSet>,
53 pub current_output_format: Option<Rc<ValidStreamOutputFormat>>,
54 pub stream_processor: &'a mut StreamProcessorProxy,
55 pub stream: &'a dyn ElementaryStream,
56 pub options: StreamOptions,
57 pub output: Vec<Output>,
58}
59
60pub enum StreamControlFlow {
61 Continue,
62 Stop,
63}
64
65impl<'a: 'b, 'b> Stream<'a> {
66 pub async fn start(&'b mut self) -> Result<()> {
67 if self.options.queue_format_details && self.input_packet_stream.is_some() {
68 debug!("Sending input format details for follow-up stream.");
69 self.stream_processor.queue_input_format_details(
70 self.stream_lifetime_ordinal,
71 &self.stream.format_details(self.format_details_version_ordinal),
72 )?;
73 }
74
75 self.send_available_input()?;
76
77 Ok(())
78 }
79
80 pub async fn handle_event(
81 &'b mut self,
82 event: StreamProcessorEvent,
83 ) -> Result<StreamControlFlow> {
84 match event {
85 StreamProcessorEvent::OnInputConstraints { input_constraints } => {
86 debug!("Received input constraints.");
87 debug!("Input constraints are: {:#?}", input_constraints);
88
89 let buffer_set = BufferSetFactory::buffer_set(
90 get_ordinal(self.input_buffer_ordinals),
91 ValidStreamBufferConstraints::try_from(input_constraints)?,
92 self.stream_processor,
93 BufferSetType::Input,
94 self.options.input_buffer_collection_constraints.clone(),
95 )
96 .await?;
97
98 debug!("Sending input format details in response to input constraints.");
99 self.stream_processor.queue_input_format_details(
100 self.stream_lifetime_ordinal,
101 &self.stream.format_details(self.format_details_version_ordinal),
102 )?;
103
104 let chunk_stream = self.stream.capped_chunks(buffer_set.buffer_size);
105 self.input_packet_stream = Some(InputPacketStream::new(
106 buffer_set,
107 chunk_stream,
108 self.stream_lifetime_ordinal,
109 ));
110 self.send_available_input()?;
111 }
112 StreamProcessorEvent::OnOutputConstraints { output_config } => {
113 debug!("Received output constraints.");
114 debug!("Output constraints are: {:#?}", output_config);
115
116 let constraints = ValidStreamOutputConstraints::try_from(output_config)?;
117 if constraints.buffer_constraints_action_required {
118 self.output_buffer_set = Some(
119 BufferSetFactory::buffer_set(
120 get_ordinal(self.output_buffer_ordinals),
121 constraints.buffer_constraints,
122 self.stream_processor,
123 BufferSetType::Output,
124 self.options.output_buffer_collection_constraints.clone(),
125 )
126 .await?,
127 );
128 }
129 }
130 StreamProcessorEvent::OnFreeInputPacket { free_input_packet } => {
131 debug!("Received freed input packet.");
132 debug!("Freed input packet is: {:#?}", free_input_packet);
133
134 let free_input_packet = ValidPacketHeader::try_from(free_input_packet)?;
135 let input_packet_stream = self.input_packet_stream.as_mut().expect(concat!(
136 "Unwrapping packet stream; ",
137 "it should be set before we ",
138 "get free input packets back."
139 ));
140 input_packet_stream.add_free_packet(free_input_packet)?;
141 self.send_available_input()?;
142 }
143 StreamProcessorEvent::OnOutputFormat { output_format } => {
144 debug!("Received output format.");
145 debug!("Output format is: {:#?}", output_format);
146
147 let output_format = ValidStreamOutputFormat::try_from(output_format)?;
148 assert_eq!(output_format.stream_lifetime_ordinal, self.stream_lifetime_ordinal);
149 self.current_output_format = Some(Rc::new(output_format));
150 }
151 StreamProcessorEvent::OnOutputPacket {
152 output_packet,
153 error_detected_before,
154 error_detected_during,
155 } => {
156 assert!(!error_detected_before);
157 assert!(!error_detected_during);
158 debug!("Received output packet.");
159 debug!("Output packet is: {:#?}", output_packet);
160
161 let output_packet = ValidPacket::try_from(output_packet)?;
162 self.output.push(Output::Packet(OutputPacket {
163 data: self
164 .output_buffer_set
165 .as_ref()
166 .ok_or(FatalError(String::from(concat!(
167 "There should be an output buffer set ",
168 "if we are receiving output packets"
169 ))))?
170 .read_packet(&output_packet)?,
171 format: self.current_output_format.clone().ok_or(FatalError(String::from(
172 concat!(
173 "There should be an output format set ",
174 "if we are receiving output packets"
175 ),
176 )))?,
177 packet: output_packet,
178 }));
179
180 self.stream_processor.recycle_output_packet(&PacketHeader {
181 buffer_lifetime_ordinal: Some(output_packet.header.buffer_lifetime_ordinal),
182 packet_index: Some(output_packet.header.packet_index),
183 ..Default::default()
184 })?;
185
186 if self.options.stop_after_first_output {
187 return Ok(StreamControlFlow::Stop);
188 }
189 }
190 StreamProcessorEvent::OnOutputEndOfStream {
191 stream_lifetime_ordinal,
192 error_detected_before,
193 } => {
194 assert!(!error_detected_before);
195 debug!("Received output end of stream.");
196 debug!("End of stream is for stream {}", stream_lifetime_ordinal);
197
198 self.output.push(Output::Eos { stream_lifetime_ordinal });
200 self.stream_processor.close_current_stream(
201 self.stream_lifetime_ordinal,
202 self.options.release_input_buffers_at_end,
203 self.options.release_output_buffers_at_end,
204 )?;
205 self.stream_processor.sync().await?;
206
207 return Ok(StreamControlFlow::Stop);
212 }
213 e => {
214 debug!("Got other event: {:#?}", e);
215 }
216 }
217
218 Ok(StreamControlFlow::Continue)
219 }
220
221 fn send_available_input(&'b mut self) -> Result<()> {
222 let input_packet_stream =
223 if let Some(input_packet_stream) = self.input_packet_stream.as_mut() {
224 input_packet_stream
225 } else {
226 return Ok(());
227 };
228
229 loop {
230 match input_packet_stream.next_packet()? {
231 PacketPoll::Ready(input_packet) => {
232 debug!("Sending input packet. {:?}", input_packet.valid_length_bytes);
233 self.stream_processor.queue_input_packet(&input_packet)?;
234 }
235 PacketPoll::Eos => {
236 debug!("Sending end of stream.");
237 break Ok(self
238 .stream_processor
239 .queue_input_end_of_stream(self.stream_lifetime_ordinal)?);
240 }
241 PacketPoll::NotReady => break Ok(()),
242 }
243 }
244 }
245}