stream_processor_test/
stream.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![allow(clippy::large_futures)]
6
7use crate::buffer_set::*;
8use crate::elementary_stream::*;
9use crate::input_packet_stream::*;
10use crate::output_validator::*;
11use crate::{FatalError, Result};
12use fidl_fuchsia_media::*;
13use fidl_fuchsia_sysmem2::BufferCollectionConstraints;
14use fuchsia_stream_processors::*;
15use log::debug;
16use std::rc::Rc;
17
18pub type OrdinalSequence = <OrdinalPattern as IntoIterator>::IntoIter;
19
20#[derive(Debug, Clone)]
21pub struct StreamOptions {
22    /// When true, the stream runner will queue format details for each stream. Otherwise it will
23    /// inherit format details from the codec factory.
24    pub queue_format_details: bool,
25    pub release_input_buffers_at_end: bool,
26    pub release_output_buffers_at_end: bool,
27    pub input_buffer_collection_constraints: Option<BufferCollectionConstraints>,
28    pub output_buffer_collection_constraints: Option<BufferCollectionConstraints>,
29    pub stop_after_first_output: bool,
30}
31
32impl Default for StreamOptions {
33    fn default() -> Self {
34        Self {
35            queue_format_details: true,
36            release_input_buffers_at_end: false,
37            release_output_buffers_at_end: false,
38            input_buffer_collection_constraints: None,
39            output_buffer_collection_constraints: None,
40            stop_after_first_output: false,
41        }
42    }
43}
44
45pub struct Stream<'a> {
46    pub format_details_version_ordinal: u64,
47    pub stream_lifetime_ordinal: u64,
48    pub input_buffer_ordinals: &'a mut OrdinalSequence,
49    pub input_packet_stream:
50        Option<InputPacketStream<Box<dyn Iterator<Item = ElementaryStreamChunk> + 'a>>>,
51    pub output_buffer_ordinals: &'a mut OrdinalSequence,
52    pub output_buffer_set: Option<BufferSet>,
53    pub current_output_format: Option<Rc<ValidStreamOutputFormat>>,
54    pub stream_processor: &'a mut StreamProcessorProxy,
55    pub stream: &'a dyn ElementaryStream,
56    pub options: StreamOptions,
57    pub output: Vec<Output>,
58}
59
60pub enum StreamControlFlow {
61    Continue,
62    Stop,
63}
64
65impl<'a: 'b, 'b> Stream<'a> {
66    pub async fn start(&'b mut self) -> Result<()> {
67        if self.options.queue_format_details && self.input_packet_stream.is_some() {
68            debug!("Sending input format details for follow-up stream.");
69            self.stream_processor.queue_input_format_details(
70                self.stream_lifetime_ordinal,
71                &self.stream.format_details(self.format_details_version_ordinal),
72            )?;
73        }
74
75        self.send_available_input()?;
76
77        Ok(())
78    }
79
80    pub async fn handle_event(
81        &'b mut self,
82        event: StreamProcessorEvent,
83    ) -> Result<StreamControlFlow> {
84        match event {
85            StreamProcessorEvent::OnInputConstraints { input_constraints } => {
86                debug!("Received input constraints.");
87                debug!("Input constraints are: {:#?}", input_constraints);
88
89                let buffer_set = BufferSetFactory::buffer_set(
90                    get_ordinal(self.input_buffer_ordinals),
91                    ValidStreamBufferConstraints::try_from(input_constraints)?,
92                    self.stream_processor,
93                    BufferSetType::Input,
94                    self.options.input_buffer_collection_constraints.clone(),
95                )
96                .await?;
97
98                debug!("Sending input format details in response to input constraints.");
99                self.stream_processor.queue_input_format_details(
100                    self.stream_lifetime_ordinal,
101                    &self.stream.format_details(self.format_details_version_ordinal),
102                )?;
103
104                let chunk_stream = self.stream.capped_chunks(buffer_set.buffer_size);
105                self.input_packet_stream = Some(InputPacketStream::new(
106                    buffer_set,
107                    chunk_stream,
108                    self.stream_lifetime_ordinal,
109                ));
110                self.send_available_input()?;
111            }
112            StreamProcessorEvent::OnOutputConstraints { output_config } => {
113                debug!("Received output constraints.");
114                debug!("Output constraints are: {:#?}", output_config);
115
116                let constraints = ValidStreamOutputConstraints::try_from(output_config)?;
117                if constraints.buffer_constraints_action_required {
118                    self.output_buffer_set = Some(
119                        BufferSetFactory::buffer_set(
120                            get_ordinal(self.output_buffer_ordinals),
121                            constraints.buffer_constraints,
122                            self.stream_processor,
123                            BufferSetType::Output,
124                            self.options.output_buffer_collection_constraints.clone(),
125                        )
126                        .await?,
127                    );
128                }
129            }
130            StreamProcessorEvent::OnFreeInputPacket { free_input_packet } => {
131                debug!("Received freed input packet.");
132                debug!("Freed input packet is: {:#?}", free_input_packet);
133
134                let free_input_packet = ValidPacketHeader::try_from(free_input_packet)?;
135                let input_packet_stream = self.input_packet_stream.as_mut().expect(concat!(
136                    "Unwrapping packet stream; ",
137                    "it should be set before we ",
138                    "get free input packets back."
139                ));
140                input_packet_stream.add_free_packet(free_input_packet)?;
141                self.send_available_input()?;
142            }
143            StreamProcessorEvent::OnOutputFormat { output_format } => {
144                debug!("Received output format.");
145                debug!("Output format is: {:#?}", output_format);
146
147                let output_format = ValidStreamOutputFormat::try_from(output_format)?;
148                assert_eq!(output_format.stream_lifetime_ordinal, self.stream_lifetime_ordinal);
149                self.current_output_format = Some(Rc::new(output_format));
150            }
151            StreamProcessorEvent::OnOutputPacket {
152                output_packet,
153                error_detected_before,
154                error_detected_during,
155            } => {
156                assert!(!error_detected_before);
157                assert!(!error_detected_during);
158                debug!("Received output packet.");
159                debug!("Output packet is: {:#?}", output_packet);
160
161                let output_packet = ValidPacket::try_from(output_packet)?;
162                self.output.push(Output::Packet(OutputPacket {
163                    data: self
164                        .output_buffer_set
165                        .as_ref()
166                        .ok_or(FatalError(String::from(concat!(
167                            "There should be an output buffer set ",
168                            "if we are receiving output packets"
169                        ))))?
170                        .read_packet(&output_packet)?,
171                    format: self.current_output_format.clone().ok_or(FatalError(String::from(
172                        concat!(
173                            "There should be an output format set ",
174                            "if we are receiving output packets"
175                        ),
176                    )))?,
177                    packet: output_packet,
178                }));
179
180                self.stream_processor.recycle_output_packet(&PacketHeader {
181                    buffer_lifetime_ordinal: Some(output_packet.header.buffer_lifetime_ordinal),
182                    packet_index: Some(output_packet.header.packet_index),
183                    ..Default::default()
184                })?;
185
186                if self.options.stop_after_first_output {
187                    return Ok(StreamControlFlow::Stop);
188                }
189            }
190            StreamProcessorEvent::OnOutputEndOfStream {
191                stream_lifetime_ordinal,
192                error_detected_before,
193            } => {
194                assert!(!error_detected_before);
195                debug!("Received output end of stream.");
196                debug!("End of stream is for stream {}", stream_lifetime_ordinal);
197
198                // TODO(turnage): Enable the flush method of ending stream in options.
199                self.output.push(Output::Eos { stream_lifetime_ordinal });
200                self.stream_processor.close_current_stream(
201                    self.stream_lifetime_ordinal,
202                    self.options.release_input_buffers_at_end,
203                    self.options.release_output_buffers_at_end,
204                )?;
205                self.stream_processor.sync().await?;
206
207                // TODO(turnage): Some codecs return all input packets explicitly, not
208                //                implicitly. All codecs should return explicitly. For now
209                //                we forgive it but soon we want to check that all input
210                //                packets will come back.
211                return Ok(StreamControlFlow::Stop);
212            }
213            e => {
214                debug!("Got other event: {:#?}", e);
215            }
216        }
217
218        Ok(StreamControlFlow::Continue)
219    }
220
221    fn send_available_input(&'b mut self) -> Result<()> {
222        let input_packet_stream =
223            if let Some(input_packet_stream) = self.input_packet_stream.as_mut() {
224                input_packet_stream
225            } else {
226                return Ok(());
227            };
228
229        loop {
230            match input_packet_stream.next_packet()? {
231                PacketPoll::Ready(input_packet) => {
232                    debug!("Sending input packet. {:?}", input_packet.valid_length_bytes);
233                    self.stream_processor.queue_input_packet(&input_packet)?;
234                }
235                PacketPoll::Eos => {
236                    debug!("Sending end of stream.");
237                    break Ok(self
238                        .stream_processor
239                        .queue_input_end_of_stream(self.stream_lifetime_ordinal)?);
240                }
241                PacketPoll::NotReady => break Ok(()),
242            }
243        }
244    }
245}