stream_processor_test/
buffer_set.rs

// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
//! Handles negotiating buffer sets with the codec server and sysmem.
6
7use crate::Result;
8use crate::buffer_collection_constraints::*;
9use anyhow::Context as _;
10use fidl::endpoints::{ClientEnd, Proxy, create_endpoints};
11use fidl_fuchsia_media::*;
12use fidl_fuchsia_sysmem2::*;
13use fuchsia_component::client;
14use fuchsia_stream_processors::*;
15use log::debug;
16use std::fmt;
17use std::iter::StepBy;
18use std::ops::RangeFrom;
19use thiserror::Error;
20
/// Errors specific to negotiating and using a buffer set.
///
/// `Display` for this type is implemented manually elsewhere in this file and prints the
/// `Debug` representation.
#[derive(Debug, Error)]
pub enum Error {
    /// The client buffer collection token proxy could not be converted back into a client end
    /// when binding the shared collection.
    ReclaimClientTokenChannel,
    /// A buffer in the allocated collection did not carry a VMO.
    ServerOmittedBufferVmo,
    /// A packet's buffer index does not refer to any buffer in the set.
    PacketReferencesInvalidBuffer,
    /// Reading packet data out of a buffer VMO failed with the contained status.
    VmoReadFail(zx::Status),
}
28
29impl fmt::Display for Error {
30    fn fmt(&self, w: &mut fmt::Formatter<'_>) -> fmt::Result {
31        fmt::Debug::fmt(&self, w)
32    }
33}
34
/// The pattern to use when advancing ordinals.
///
/// Converting a pattern with `into_iter()` yields an unbounded sequence of `u64` ordinals.
#[derive(Debug, Clone, Copy)]
pub enum OrdinalPattern {
    /// Odd ordinal pattern starts at 1 and moves in increments of 2: [1,3,5..]
    Odd,
    /// All ordinal pattern starts at 1 and moves in increments of 1: [1,2,3..]
    All,
}
43
44impl IntoIterator for OrdinalPattern {
45    type Item = u64;
46    type IntoIter = StepBy<RangeFrom<Self::Item>>;
47    fn into_iter(self) -> Self::IntoIter {
48        let (start, step) = match self {
49            OrdinalPattern::Odd => (1, 2),
50            OrdinalPattern::All => (1, 1),
51        };
52        (start..).step_by(step)
53    }
54}
55
56pub fn get_ordinal(pattern: &mut <OrdinalPattern as IntoIterator>::IntoIter) -> u64 {
57    pattern.next().expect("Getting next item in infinite pattern")
58}
59
/// Selects which side of the stream processor a buffer set is negotiated for.
///
/// The variant decides which partial-settings call is sent to the codec when a buffer set is
/// built.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferSetType {
    /// Buffers that carry input to the codec.
    Input,
    /// Buffers that carry output from the codec.
    Output,
}
64
/// Stateless namespace for the associated functions that negotiate a [`BufferSet`] with sysmem
/// and the codec.
pub struct BufferSetFactory;
66
67fn set_allocator_name(sysmem_client: &fidl_fuchsia_sysmem2::AllocatorProxy) -> Result<()> {
68    let name = fuchsia_runtime::process_self().get_name()?;
69    let koid = fuchsia_runtime::process_self().koid()?;
70    Ok(sysmem_client.set_debug_client_info(&AllocatorSetDebugClientInfoRequest {
71        name: Some(name.to_string()),
72        id: Some(koid.raw_koid()),
73        ..Default::default()
74    })?)
75}
76
// Value used for `min_buffer_count_for_camping` in the buffer collection constraints this
// client sends to sysmem.
//
// This client only intends to be filling one input buffer or hashing one output buffer at any
// given time.
const MIN_BUFFER_COUNT_FOR_CAMPING: u32 = 1;
80
81impl BufferSetFactory {
82    pub async fn buffer_set(
83        buffer_lifetime_ordinal: u64,
84        constraints: ValidStreamBufferConstraints,
85        codec: &mut StreamProcessorProxy,
86        buffer_set_type: BufferSetType,
87        buffer_collection_constraints: Option<BufferCollectionConstraints>,
88    ) -> Result<BufferSet> {
89        let (collection_client, settings) =
90            Self::settings(buffer_lifetime_ordinal, constraints, buffer_collection_constraints)
91                .await?;
92
93        debug!("Got settings; waiting for buffers. {:?}", settings);
94
95        match buffer_set_type {
96            BufferSetType::Input => codec
97                .set_input_buffer_partial_settings(settings)
98                .context("Sending input partial settings to codec")?,
99            BufferSetType::Output => codec
100                .set_output_buffer_partial_settings(settings)
101                .context("Sending output partial settings to codec")?,
102        };
103
104        let wait_result = collection_client
105            .wait_for_all_buffers_allocated()
106            .await
107            .context("Waiting for buffers")?;
108        debug!("Sysmem responded (None is success): {:?}", wait_result.as_ref().err());
109        let collection_info = wait_result
110            .map_err(|err| anyhow::format_err!("sysmem allocation error: {:?}", err))?
111            .buffer_collection_info
112            .unwrap();
113
114        if let BufferSetType::Output = buffer_set_type {
115            debug!("Completing settings for output.");
116            codec.complete_output_buffer_partial_settings(buffer_lifetime_ordinal)?;
117        }
118
119        debug!(
120            "Got {} buffers of size {:?}",
121            collection_info.buffers.as_ref().unwrap().len(),
122            collection_info
123                .settings
124                .as_ref()
125                .unwrap()
126                .buffer_settings
127                .as_ref()
128                .unwrap()
129                .size_bytes
130                .as_ref()
131                .unwrap()
132        );
133        debug!("Buffer collection is: {:#?}", collection_info.settings.as_ref().unwrap());
134        for (i, buffer) in collection_info.buffers.as_ref().unwrap().iter().enumerate() {
135            debug!("Buffer {} is : {:#?}", i, buffer);
136        }
137
138        Ok(BufferSet::try_from(BufferSetSpec {
139            proxy: collection_client,
140            buffer_lifetime_ordinal,
141            collection_info,
142        })?)
143    }
144
145    async fn settings(
146        buffer_lifetime_ordinal: u64,
147        constraints: ValidStreamBufferConstraints,
148        buffer_collection_constraints: Option<BufferCollectionConstraints>,
149    ) -> Result<(BufferCollectionProxy, StreamBufferPartialSettings)> {
150        let (client_token, client_token_request) =
151            create_endpoints::<BufferCollectionTokenMarker>();
152        let (codec_token, codec_token_request) = create_endpoints::<BufferCollectionTokenMarker>();
153        let client_token = client_token.into_proxy();
154
155        let sysmem_client =
156            client::connect_to_protocol::<AllocatorMarker>().context("Connecting to sysmem")?;
157
158        set_allocator_name(&sysmem_client).context("Setting sysmem allocator name")?;
159
160        sysmem_client
161            .allocate_shared_collection(AllocatorAllocateSharedCollectionRequest {
162                token_request: Some(client_token_request),
163                ..Default::default()
164            })
165            .context("Allocating shared collection")?;
166        client_token.duplicate(BufferCollectionTokenDuplicateRequest {
167            rights_attenuation_mask: Some(fidl::Rights::SAME_RIGHTS),
168            token_request: Some(codec_token_request),
169            ..Default::default()
170        })?;
171
172        let (collection_client, collection_request) = create_endpoints::<BufferCollectionMarker>();
173        sysmem_client.bind_shared_collection(AllocatorBindSharedCollectionRequest {
174            token: Some(
175                client_token.into_client_end().map_err(|_| Error::ReclaimClientTokenChannel)?,
176            ),
177            buffer_collection_request: Some(collection_request),
178            ..Default::default()
179        })?;
180        let collection_client = collection_client.into_proxy();
181        collection_client.sync().await.context("Syncing codec_token_request with sysmem")?;
182
183        let mut collection_constraints = buffer_collection_constraints
184            .unwrap_or_else(|| buffer_collection_constraints_default());
185        assert!(
186            collection_constraints.min_buffer_count_for_camping.is_none(),
187            "min_buffer_count_for_camping should be un-set before we've set it"
188        );
189        collection_constraints.min_buffer_count_for_camping = Some(MIN_BUFFER_COUNT_FOR_CAMPING);
190
191        debug!("Our buffer collection constraints are: {:#?}", collection_constraints);
192
193        collection_client
194            .set_constraints(BufferCollectionSetConstraintsRequest {
195                constraints: Some(collection_constraints),
196                ..Default::default()
197            })
198            .context("Sending buffer constraints to sysmem")?;
199
200        Ok((
201            collection_client,
202            StreamBufferPartialSettings {
203                buffer_lifetime_ordinal: Some(buffer_lifetime_ordinal),
204                buffer_constraints_version_ordinal: Some(
205                    constraints.buffer_constraints_version_ordinal,
206                ),
207                // A sysmem token channel serves both sysmem(1) and sysmem2 token protocols, so we
208                // can convert here until StreamBufferPartialSettings has a sysmem2 token field.
209                sysmem_token: Some(
210                    ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
211                        codec_token.into_channel(),
212                    ),
213                ),
214                ..Default::default()
215            },
216        ))
217    }
218}
219
/// Everything needed to build a [`BufferSet`] once sysmem has allocated the collection.
struct BufferSetSpec {
    /// Connection to the buffer collection; moved into the resulting `BufferSet`.
    proxy: BufferCollectionProxy,
    /// Buffer lifetime ordinal the collection was negotiated under.
    buffer_lifetime_ordinal: u64,
    /// Collection info from sysmem; the buffer VMOs are taken out of it during conversion.
    collection_info: BufferCollectionInfo,
}
225
/// One buffer of an allocated collection.
#[derive(Debug, PartialEq)]
pub struct Buffer {
    /// The VMO that sysmem supplied for this buffer.
    pub data: zx::Vmo,
    /// The `vmo_usable_start` value sysmem reported for this buffer.
    pub start: u64,
    /// The collection-wide buffer size (`size_bytes`) reported by sysmem.
    pub size: u64,
}
232
/// A negotiated set of buffers together with the collection connection they came from.
#[derive(Debug)]
pub struct BufferSet {
    /// Connection to the sysmem buffer collection backing `buffers`.
    pub proxy: BufferCollectionProxy,
    /// The buffers, in the order sysmem listed them; packets refer to them by index.
    pub buffers: Vec<Buffer>,
    /// Buffer lifetime ordinal this set was negotiated under.
    pub buffer_lifetime_ordinal: u64,
    /// The collection-wide buffer size reported by sysmem, as a `usize`.
    pub buffer_size: usize,
}
240
241impl TryFrom<BufferSetSpec> for BufferSet {
242    type Error = anyhow::Error;
243    fn try_from(mut src: BufferSetSpec) -> std::result::Result<Self, Self::Error> {
244        let buffer_size = *src
245            .collection_info
246            .settings
247            .as_ref()
248            .unwrap()
249            .buffer_settings
250            .as_ref()
251            .unwrap()
252            .size_bytes
253            .as_ref()
254            .unwrap();
255        let buffer_count = src.collection_info.buffers.as_ref().unwrap().len();
256
257        let mut buffers = vec![];
258        for (i, buffer) in src.collection_info.buffers.as_mut().unwrap().iter_mut().enumerate() {
259            buffers.push(Buffer {
260                data: buffer.vmo.take().ok_or(Error::ServerOmittedBufferVmo).with_context(
261                    || {
262                        format!(
263                            "Trying to ingest {}th buffer of {}: {:#?}",
264                            i, buffer_count, buffer
265                        )
266                    },
267                )?,
268                start: *buffer.vmo_usable_start.as_ref().unwrap(),
269                size: buffer_size,
270            });
271        }
272
273        Ok(Self {
274            proxy: src.proxy,
275            buffers,
276            buffer_lifetime_ordinal: src.buffer_lifetime_ordinal,
277            buffer_size: buffer_size as usize,
278        })
279    }
280}
281
282impl BufferSet {
283    pub fn read_packet(&self, packet: &ValidPacket) -> Result<Vec<u8>> {
284        let buffer = self
285            .buffers
286            .get(packet.buffer_index as usize)
287            .ok_or(Error::PacketReferencesInvalidBuffer)?;
288        let mut dest = vec![0; packet.valid_length_bytes as usize];
289        buffer.data.read(&mut dest, packet.start_offset as u64).map_err(Error::VmoReadFail)?;
290        Ok(dest)
291    }
292}