stream_processor_test/
buffer_set.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Handles negotiating buffer sets with the codec server and sysmem.
6
7use crate::buffer_collection_constraints::*;
8use crate::Result;
9use anyhow::Context as _;
10use fidl::endpoints::{create_endpoints, ClientEnd, Proxy};
11use fidl_fuchsia_media::*;
12use fidl_fuchsia_sysmem2::*;
13use fuchsia_component::client;
14use fuchsia_stream_processors::*;
15use log::debug;
16use std::fmt;
17use std::iter::StepBy;
18use std::ops::RangeFrom;
19use thiserror::Error;
20use zx::{self as zx, AsHandleRef};
21
22#[derive(Debug, Error)]
23pub enum Error {
24    ReclaimClientTokenChannel,
25    ServerOmittedBufferVmo,
26    PacketReferencesInvalidBuffer,
27    VmoReadFail(zx::Status),
28}
29
30impl fmt::Display for Error {
31    fn fmt(&self, w: &mut fmt::Formatter<'_>) -> fmt::Result {
32        fmt::Debug::fmt(&self, w)
33    }
34}
35
/// Selects which sequence of ordinals an ordinal iterator yields.
#[derive(Debug, Clone, Copy)]
pub enum OrdinalPattern {
    /// Only odd ordinals, beginning at 1: [1,3,5..]
    Odd,
    /// Every ordinal, beginning at 1: [1,2,3..]
    All,
}

impl IntoIterator for OrdinalPattern {
    type Item = u64;
    type IntoIter = StepBy<RangeFrom<Self::Item>>;

    /// Builds the unbounded ordinal sequence for this pattern.
    ///
    /// Both patterns start at 1; only the stride between ordinals differs.
    fn into_iter(self) -> Self::IntoIter {
        let stride: usize = match self {
            OrdinalPattern::Odd => 2,
            OrdinalPattern::All => 1,
        };
        (1..).step_by(stride)
    }
}
56
57pub fn get_ordinal(pattern: &mut <OrdinalPattern as IntoIterator>::IntoIter) -> u64 {
58    pattern.next().expect("Getting next item in infinite pattern")
59}
60
/// Which side of the stream processor a buffer set is negotiated for.
///
/// The variants carry no data, so the type is `Copy` and comparable; deriving
/// `Debug` lets it appear in log and assertion messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferSetType {
    /// Buffers for the stream processor's input.
    Input,
    /// Buffers for the stream processor's output.
    Output,
}
65
/// Stateless entry point for negotiating a [`BufferSet`]; see
/// `BufferSetFactory::buffer_set`.
pub struct BufferSetFactory;
67
68fn set_allocator_name(sysmem_client: &fidl_fuchsia_sysmem2::AllocatorProxy) -> Result<()> {
69    let name = fuchsia_runtime::process_self().get_name()?;
70    let koid = fuchsia_runtime::process_self().get_koid()?;
71    Ok(sysmem_client.set_debug_client_info(&AllocatorSetDebugClientInfoRequest {
72        name: Some(name.to_string()),
73        id: Some(koid.raw_koid()),
74        ..Default::default()
75    })?)
76}
77
/// Value used for `min_buffer_count_for_camping` in our sysmem constraints.
///
/// At any given time this client is only filling a single input buffer or
/// hashing a single output buffer, so one buffer is enough.
const MIN_BUFFER_COUNT_FOR_CAMPING: u32 = 1;
81
82impl BufferSetFactory {
83    pub async fn buffer_set(
84        buffer_lifetime_ordinal: u64,
85        constraints: ValidStreamBufferConstraints,
86        codec: &mut StreamProcessorProxy,
87        buffer_set_type: BufferSetType,
88        buffer_collection_constraints: Option<BufferCollectionConstraints>,
89    ) -> Result<BufferSet> {
90        let (collection_client, settings) =
91            Self::settings(buffer_lifetime_ordinal, constraints, buffer_collection_constraints)
92                .await?;
93
94        debug!("Got settings; waiting for buffers. {:?}", settings);
95
96        match buffer_set_type {
97            BufferSetType::Input => codec
98                .set_input_buffer_partial_settings(settings)
99                .context("Sending input partial settings to codec")?,
100            BufferSetType::Output => codec
101                .set_output_buffer_partial_settings(settings)
102                .context("Sending output partial settings to codec")?,
103        };
104
105        let wait_result = collection_client
106            .wait_for_all_buffers_allocated()
107            .await
108            .context("Waiting for buffers")?;
109        debug!("Sysmem responded (None is success): {:?}", wait_result.as_ref().err());
110        let collection_info = wait_result
111            .map_err(|err| anyhow::format_err!("sysmem allocation error: {:?}", err))?
112            .buffer_collection_info
113            .unwrap();
114
115        if let BufferSetType::Output = buffer_set_type {
116            debug!("Completing settings for output.");
117            codec.complete_output_buffer_partial_settings(buffer_lifetime_ordinal)?;
118        }
119
120        debug!(
121            "Got {} buffers of size {:?}",
122            collection_info.buffers.as_ref().unwrap().len(),
123            collection_info
124                .settings
125                .as_ref()
126                .unwrap()
127                .buffer_settings
128                .as_ref()
129                .unwrap()
130                .size_bytes
131                .as_ref()
132                .unwrap()
133        );
134        debug!("Buffer collection is: {:#?}", collection_info.settings.as_ref().unwrap());
135        for (i, buffer) in collection_info.buffers.as_ref().unwrap().iter().enumerate() {
136            debug!("Buffer {} is : {:#?}", i, buffer);
137        }
138
139        Ok(BufferSet::try_from(BufferSetSpec {
140            proxy: collection_client,
141            buffer_lifetime_ordinal,
142            collection_info,
143        })?)
144    }
145
146    async fn settings(
147        buffer_lifetime_ordinal: u64,
148        constraints: ValidStreamBufferConstraints,
149        buffer_collection_constraints: Option<BufferCollectionConstraints>,
150    ) -> Result<(BufferCollectionProxy, StreamBufferPartialSettings)> {
151        let (client_token, client_token_request) =
152            create_endpoints::<BufferCollectionTokenMarker>();
153        let (codec_token, codec_token_request) = create_endpoints::<BufferCollectionTokenMarker>();
154        let client_token = client_token.into_proxy();
155
156        let sysmem_client =
157            client::connect_to_protocol::<AllocatorMarker>().context("Connecting to sysmem")?;
158
159        set_allocator_name(&sysmem_client).context("Setting sysmem allocator name")?;
160
161        sysmem_client
162            .allocate_shared_collection(AllocatorAllocateSharedCollectionRequest {
163                token_request: Some(client_token_request),
164                ..Default::default()
165            })
166            .context("Allocating shared collection")?;
167        client_token.duplicate(BufferCollectionTokenDuplicateRequest {
168            rights_attenuation_mask: Some(fidl::Rights::SAME_RIGHTS),
169            token_request: Some(codec_token_request),
170            ..Default::default()
171        })?;
172
173        let (collection_client, collection_request) = create_endpoints::<BufferCollectionMarker>();
174        sysmem_client.bind_shared_collection(AllocatorBindSharedCollectionRequest {
175            token: Some(
176                client_token.into_client_end().map_err(|_| Error::ReclaimClientTokenChannel)?,
177            ),
178            buffer_collection_request: Some(collection_request),
179            ..Default::default()
180        })?;
181        let collection_client = collection_client.into_proxy();
182        collection_client.sync().await.context("Syncing codec_token_request with sysmem")?;
183
184        let mut collection_constraints = buffer_collection_constraints
185            .unwrap_or_else(|| buffer_collection_constraints_default());
186        assert!(
187            collection_constraints.min_buffer_count_for_camping.is_none(),
188            "min_buffer_count_for_camping should be un-set before we've set it"
189        );
190        collection_constraints.min_buffer_count_for_camping = Some(MIN_BUFFER_COUNT_FOR_CAMPING);
191
192        debug!("Our buffer collection constraints are: {:#?}", collection_constraints);
193
194        collection_client
195            .set_constraints(BufferCollectionSetConstraintsRequest {
196                constraints: Some(collection_constraints),
197                ..Default::default()
198            })
199            .context("Sending buffer constraints to sysmem")?;
200
201        Ok((
202            collection_client,
203            StreamBufferPartialSettings {
204                buffer_lifetime_ordinal: Some(buffer_lifetime_ordinal),
205                buffer_constraints_version_ordinal: Some(
206                    constraints.buffer_constraints_version_ordinal,
207                ),
208                // A sysmem token channel serves both sysmem(1) and sysmem2 token protocols, so we
209                // can convert here until StreamBufferPartialSettings has a sysmem2 token field.
210                sysmem_token: Some(
211                    ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
212                        codec_token.into_channel(),
213                    ),
214                ),
215                ..Default::default()
216            },
217        ))
218    }
219}
220
221struct BufferSetSpec {
222    proxy: BufferCollectionProxy,
223    buffer_lifetime_ordinal: u64,
224    collection_info: BufferCollectionInfo,
225}
226
227#[derive(Debug, PartialEq)]
228pub struct Buffer {
229    pub data: zx::Vmo,
230    pub start: u64,
231    pub size: u64,
232}
233
234#[derive(Debug)]
235pub struct BufferSet {
236    pub proxy: BufferCollectionProxy,
237    pub buffers: Vec<Buffer>,
238    pub buffer_lifetime_ordinal: u64,
239    pub buffer_size: usize,
240}
241
242impl TryFrom<BufferSetSpec> for BufferSet {
243    type Error = anyhow::Error;
244    fn try_from(mut src: BufferSetSpec) -> std::result::Result<Self, Self::Error> {
245        let buffer_size = *src
246            .collection_info
247            .settings
248            .as_ref()
249            .unwrap()
250            .buffer_settings
251            .as_ref()
252            .unwrap()
253            .size_bytes
254            .as_ref()
255            .unwrap();
256        let buffer_count = src.collection_info.buffers.as_ref().unwrap().len();
257
258        let mut buffers = vec![];
259        for (i, buffer) in src.collection_info.buffers.as_mut().unwrap().iter_mut().enumerate() {
260            buffers.push(Buffer {
261                data: buffer.vmo.take().ok_or(Error::ServerOmittedBufferVmo).with_context(
262                    || {
263                        format!(
264                            "Trying to ingest {}th buffer of {}: {:#?}",
265                            i, buffer_count, buffer
266                        )
267                    },
268                )?,
269                start: *buffer.vmo_usable_start.as_ref().unwrap(),
270                size: buffer_size,
271            });
272        }
273
274        Ok(Self {
275            proxy: src.proxy,
276            buffers,
277            buffer_lifetime_ordinal: src.buffer_lifetime_ordinal,
278            buffer_size: buffer_size as usize,
279        })
280    }
281}
282
283impl BufferSet {
284    pub fn read_packet(&self, packet: &ValidPacket) -> Result<Vec<u8>> {
285        let buffer = self
286            .buffers
287            .get(packet.buffer_index as usize)
288            .ok_or(Error::PacketReferencesInvalidBuffer)?;
289        let mut dest = vec![0; packet.valid_length_bytes as usize];
290        buffer.data.read(&mut dest, packet.start_offset as u64).map_err(Error::VmoReadFail)?;
291        Ok(dest)
292    }
293}