stream_processor_test/
input_packet_stream.rs

1use crate::buffer_set::*;
2use crate::elementary_stream::*;
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
6
7use fidl_fuchsia_media::*;
8use fuchsia_stream_processors::*;
9
10use std::collections::HashMap;
11use std::fmt;
12use thiserror::Error;
13
/// Index identifying a packet; used as the key of the packet/buffer pairing map.
type PacketIdx = u32;
/// Index of a buffer within the buffer set's buffer list.
type BufferIdx = u32;
16
17/// A stream converting elementary stream chunks into input packets for a stream processor.
18pub struct InputPacketStream<I> {
19    packet_and_buffer_pairs: HashMap<PacketIdx, (BufferIdx, UsageStatus)>,
20    buffer_set: BufferSet,
21    stream_lifetime_ordinal: u64,
22    stream: I,
23    sent_eos: bool,
24}
25
/// Whether a packet/buffer pair is available for a new chunk.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum UsageStatus {
    /// The pair may be claimed for the next chunk.
    Free,
    /// The pair has been claimed and has not been released yet.
    InUse,
}
31
32#[derive(Debug, Error)]
33pub enum Error {
34    PacketRefersToInvalidBuffer,
35    BufferTooSmall { buffer_size: usize, stream_chunk_size: usize },
36    VmoWriteFail(zx::Status),
37}
38
39impl fmt::Display for Error {
40    fn fmt(&self, w: &mut fmt::Formatter<'_>) -> fmt::Result {
41        fmt::Debug::fmt(&self, w)
42    }
43}
44
45pub enum PacketPoll {
46    Ready(Packet),
47    Eos,
48    NotReady,
49}
50
51impl<'a, I: Iterator<Item = ElementaryStreamChunk>> InputPacketStream<I> {
52    pub fn new(buffer_set: BufferSet, stream: I, stream_lifetime_ordinal: u64) -> Self {
53        // The official # of packets / usable packet_index values can be greater than this (for
54        // now), but we don't need to use more packets than buffers, and we know # of packets will
55        // be at least buffer_count.
56        let packets = 0..(buffer_set.buffers.len() as u32);
57        let buffers = packets.clone().rev().map(|idx| (idx, UsageStatus::Free));
58        Self {
59            packet_and_buffer_pairs: packets.zip(buffers).collect(),
60            buffer_set,
61            stream_lifetime_ordinal,
62            stream,
63            sent_eos: false,
64        }
65    }
66
67    pub fn add_free_packet(&mut self, packet: ValidPacketHeader) -> Result<(), Error> {
68        let (_, ref mut status) = *self
69            .packet_and_buffer_pairs
70            .get_mut(&packet.packet_index)
71            .ok_or(Error::PacketRefersToInvalidBuffer)?;
72        *status = UsageStatus::Free;
73        Ok(())
74    }
75
76    fn free_packet_and_buffer(&mut self) -> Option<(u32, u32)> {
77        // This is a linear search. This may not be appropriate in prod code.
78        self.packet_and_buffer_pairs.iter_mut().find_map(|(packet, (buffer, usage))| match usage {
79            UsageStatus::Free => {
80                *usage = UsageStatus::InUse;
81                Some((*packet, *buffer))
82            }
83            UsageStatus::InUse => None,
84        })
85    }
86
87    pub fn next_packet(&mut self) -> Result<PacketPoll, Error> {
88        let (packet_idx, buffer_idx) = if let Some(idxs) = self.free_packet_and_buffer() {
89            idxs
90        } else {
91            return Ok(PacketPoll::NotReady);
92        };
93
94        let chunk = if let Some(chunk) = self.stream.next() {
95            chunk
96        } else if !self.sent_eos {
97            self.sent_eos = true;
98            return Ok(PacketPoll::Eos);
99        } else {
100            return Ok(PacketPoll::NotReady);
101        };
102
103        let buffer = self
104            .buffer_set
105            .buffers
106            .get(buffer_idx as usize)
107            .ok_or(Error::PacketRefersToInvalidBuffer)?;
108
109        if (buffer.size as usize) < chunk.data.len() {
110            return Err(Error::BufferTooSmall {
111                buffer_size: buffer.size as usize,
112                stream_chunk_size: chunk.data.len(),
113            });
114        }
115
116        buffer.data.write(&chunk.data, 0).map_err(Error::VmoWriteFail)?;
117
118        Ok(PacketPoll::Ready(Packet {
119            header: Some(PacketHeader {
120                packet_index: Some(packet_idx),
121                buffer_lifetime_ordinal: Some(self.buffer_set.buffer_lifetime_ordinal),
122                ..Default::default()
123            }),
124            buffer_index: Some(buffer_idx),
125            stream_lifetime_ordinal: Some(self.stream_lifetime_ordinal),
126            start_offset: Some(0),
127            valid_length_bytes: Some(chunk.data.len() as u32),
128            timestamp_ish: chunk.timestamp,
129            start_access_unit: Some(chunk.start_access_unit),
130            known_end_access_unit: Some(chunk.known_end_access_unit),
131            ..Default::default()
132        }))
133    }
134
135    pub fn take_buffer_set(self) -> BufferSet {
136        self.buffer_set
137    }
138}