stream_processor_test/
input_packet_stream.rs1use crate::buffer_set::*;
2use crate::elementary_stream::*;
3use fidl_fuchsia_media::*;
8use fuchsia_stream_processors::*;
9
10use std::collections::HashMap;
11use std::fmt;
12use thiserror::Error;
13
/// Index identifying a packet within the packet/buffer table.
type PacketIdx = u32;

/// Index identifying a buffer within the owned `BufferSet`.
type BufferIdx = u32;
16
17pub struct InputPacketStream<I> {
19 packet_and_buffer_pairs: HashMap<PacketIdx, (BufferIdx, UsageStatus)>,
20 buffer_set: BufferSet,
21 stream_lifetime_ordinal: u64,
22 stream: I,
23 sent_eos: bool,
24}
25
/// Availability of a packet/buffer pair.
#[derive(Copy, Clone, PartialEq, Debug)]
enum UsageStatus {
    /// The pair may be handed out for a new packet.
    Free,
    /// The pair has been handed out and has not been returned yet.
    InUse,
}
31
32#[derive(Debug, Error)]
33pub enum Error {
34 PacketRefersToInvalidBuffer,
35 BufferTooSmall { buffer_size: usize, stream_chunk_size: usize },
36 VmoWriteFail(zx::Status),
37}
38
39impl fmt::Display for Error {
40 fn fmt(&self, w: &mut fmt::Formatter<'_>) -> fmt::Result {
41 fmt::Debug::fmt(&self, w)
42 }
43}
44
45pub enum PacketPoll {
46 Ready(Packet),
47 Eos,
48 NotReady,
49}
50
51impl<'a, I: Iterator<Item = ElementaryStreamChunk>> InputPacketStream<I> {
52 pub fn new(buffer_set: BufferSet, stream: I, stream_lifetime_ordinal: u64) -> Self {
53 let packets = 0..(buffer_set.buffers.len() as u32);
57 let buffers = packets.clone().rev().map(|idx| (idx, UsageStatus::Free));
58 Self {
59 packet_and_buffer_pairs: packets.zip(buffers).collect(),
60 buffer_set,
61 stream_lifetime_ordinal,
62 stream,
63 sent_eos: false,
64 }
65 }
66
67 pub fn add_free_packet(&mut self, packet: ValidPacketHeader) -> Result<(), Error> {
68 let (_, ref mut status) = *self
69 .packet_and_buffer_pairs
70 .get_mut(&packet.packet_index)
71 .ok_or(Error::PacketRefersToInvalidBuffer)?;
72 *status = UsageStatus::Free;
73 Ok(())
74 }
75
76 fn free_packet_and_buffer(&mut self) -> Option<(u32, u32)> {
77 self.packet_and_buffer_pairs.iter_mut().find_map(|(packet, (buffer, usage))| match usage {
79 UsageStatus::Free => {
80 *usage = UsageStatus::InUse;
81 Some((*packet, *buffer))
82 }
83 UsageStatus::InUse => None,
84 })
85 }
86
87 pub fn next_packet(&mut self) -> Result<PacketPoll, Error> {
88 let (packet_idx, buffer_idx) = if let Some(idxs) = self.free_packet_and_buffer() {
89 idxs
90 } else {
91 return Ok(PacketPoll::NotReady);
92 };
93
94 let chunk = if let Some(chunk) = self.stream.next() {
95 chunk
96 } else if !self.sent_eos {
97 self.sent_eos = true;
98 return Ok(PacketPoll::Eos);
99 } else {
100 return Ok(PacketPoll::NotReady);
101 };
102
103 let buffer = self
104 .buffer_set
105 .buffers
106 .get(buffer_idx as usize)
107 .ok_or(Error::PacketRefersToInvalidBuffer)?;
108
109 if (buffer.size as usize) < chunk.data.len() {
110 return Err(Error::BufferTooSmall {
111 buffer_size: buffer.size as usize,
112 stream_chunk_size: chunk.data.len(),
113 });
114 }
115
116 buffer.data.write(&chunk.data, 0).map_err(Error::VmoWriteFail)?;
117
118 Ok(PacketPoll::Ready(Packet {
119 header: Some(PacketHeader {
120 packet_index: Some(packet_idx),
121 buffer_lifetime_ordinal: Some(self.buffer_set.buffer_lifetime_ordinal),
122 ..Default::default()
123 }),
124 buffer_index: Some(buffer_idx),
125 stream_lifetime_ordinal: Some(self.stream_lifetime_ordinal),
126 start_offset: Some(0),
127 valid_length_bytes: Some(chunk.data.len() as u32),
128 timestamp_ish: chunk.timestamp,
129 start_access_unit: Some(chunk.start_access_unit),
130 known_end_access_unit: Some(chunk.known_end_access_unit),
131 ..Default::default()
132 }))
133 }
134
135 pub fn take_buffer_set(self) -> BufferSet {
136 self.buffer_set
137 }
138}