1use crate::buffer_collection_constraints::*;
8use crate::Result;
9use anyhow::Context as _;
10use fidl::endpoints::{create_endpoints, ClientEnd, Proxy};
11use fidl_fuchsia_media::*;
12use fidl_fuchsia_sysmem2::*;
13use fuchsia_component::client;
14use fuchsia_stream_processors::*;
15use log::debug;
16use std::fmt;
17use std::iter::StepBy;
18use std::ops::RangeFrom;
19use thiserror::Error;
20use zx::{self as zx, AsHandleRef};
21
22#[derive(Debug, Error)]
23pub enum Error {
24 ReclaimClientTokenChannel,
25 ServerOmittedBufferVmo,
26 PacketReferencesInvalidBuffer,
27 VmoReadFail(zx::Status),
28}
29
30impl fmt::Display for Error {
31 fn fmt(&self, w: &mut fmt::Formatter<'_>) -> fmt::Result {
32 fmt::Debug::fmt(&self, w)
33 }
34}
35
36#[derive(Debug, Clone, Copy)]
38pub enum OrdinalPattern {
39 Odd,
41 All,
43}
44
45impl IntoIterator for OrdinalPattern {
46 type Item = u64;
47 type IntoIter = StepBy<RangeFrom<Self::Item>>;
48 fn into_iter(self) -> Self::IntoIter {
49 let (start, step) = match self {
50 OrdinalPattern::Odd => (1, 2),
51 OrdinalPattern::All => (1, 1),
52 };
53 (start..).step_by(step)
54 }
55}
56
57pub fn get_ordinal(pattern: &mut <OrdinalPattern as IntoIterator>::IntoIter) -> u64 {
58 pattern.next().expect("Getting next item in infinite pattern")
59}
60
61pub enum BufferSetType {
62 Input,
63 Output,
64}
65
66pub struct BufferSetFactory;
67
68fn set_allocator_name(sysmem_client: &fidl_fuchsia_sysmem2::AllocatorProxy) -> Result<()> {
69 let name = fuchsia_runtime::process_self().get_name()?;
70 let koid = fuchsia_runtime::process_self().get_koid()?;
71 Ok(sysmem_client.set_debug_client_info(&AllocatorSetDebugClientInfoRequest {
72 name: Some(name.to_string()),
73 id: Some(koid.raw_koid()),
74 ..Default::default()
75 })?)
76}
77
78const MIN_BUFFER_COUNT_FOR_CAMPING: u32 = 1;
81
82impl BufferSetFactory {
83 pub async fn buffer_set(
84 buffer_lifetime_ordinal: u64,
85 constraints: ValidStreamBufferConstraints,
86 codec: &mut StreamProcessorProxy,
87 buffer_set_type: BufferSetType,
88 buffer_collection_constraints: Option<BufferCollectionConstraints>,
89 ) -> Result<BufferSet> {
90 let (collection_client, settings) =
91 Self::settings(buffer_lifetime_ordinal, constraints, buffer_collection_constraints)
92 .await?;
93
94 debug!("Got settings; waiting for buffers. {:?}", settings);
95
96 match buffer_set_type {
97 BufferSetType::Input => codec
98 .set_input_buffer_partial_settings(settings)
99 .context("Sending input partial settings to codec")?,
100 BufferSetType::Output => codec
101 .set_output_buffer_partial_settings(settings)
102 .context("Sending output partial settings to codec")?,
103 };
104
105 let wait_result = collection_client
106 .wait_for_all_buffers_allocated()
107 .await
108 .context("Waiting for buffers")?;
109 debug!("Sysmem responded (None is success): {:?}", wait_result.as_ref().err());
110 let collection_info = wait_result
111 .map_err(|err| anyhow::format_err!("sysmem allocation error: {:?}", err))?
112 .buffer_collection_info
113 .unwrap();
114
115 if let BufferSetType::Output = buffer_set_type {
116 debug!("Completing settings for output.");
117 codec.complete_output_buffer_partial_settings(buffer_lifetime_ordinal)?;
118 }
119
120 debug!(
121 "Got {} buffers of size {:?}",
122 collection_info.buffers.as_ref().unwrap().len(),
123 collection_info
124 .settings
125 .as_ref()
126 .unwrap()
127 .buffer_settings
128 .as_ref()
129 .unwrap()
130 .size_bytes
131 .as_ref()
132 .unwrap()
133 );
134 debug!("Buffer collection is: {:#?}", collection_info.settings.as_ref().unwrap());
135 for (i, buffer) in collection_info.buffers.as_ref().unwrap().iter().enumerate() {
136 debug!("Buffer {} is : {:#?}", i, buffer);
137 }
138
139 Ok(BufferSet::try_from(BufferSetSpec {
140 proxy: collection_client,
141 buffer_lifetime_ordinal,
142 collection_info,
143 })?)
144 }
145
146 async fn settings(
147 buffer_lifetime_ordinal: u64,
148 constraints: ValidStreamBufferConstraints,
149 buffer_collection_constraints: Option<BufferCollectionConstraints>,
150 ) -> Result<(BufferCollectionProxy, StreamBufferPartialSettings)> {
151 let (client_token, client_token_request) =
152 create_endpoints::<BufferCollectionTokenMarker>();
153 let (codec_token, codec_token_request) = create_endpoints::<BufferCollectionTokenMarker>();
154 let client_token = client_token.into_proxy();
155
156 let sysmem_client =
157 client::connect_to_protocol::<AllocatorMarker>().context("Connecting to sysmem")?;
158
159 set_allocator_name(&sysmem_client).context("Setting sysmem allocator name")?;
160
161 sysmem_client
162 .allocate_shared_collection(AllocatorAllocateSharedCollectionRequest {
163 token_request: Some(client_token_request),
164 ..Default::default()
165 })
166 .context("Allocating shared collection")?;
167 client_token.duplicate(BufferCollectionTokenDuplicateRequest {
168 rights_attenuation_mask: Some(fidl::Rights::SAME_RIGHTS),
169 token_request: Some(codec_token_request),
170 ..Default::default()
171 })?;
172
173 let (collection_client, collection_request) = create_endpoints::<BufferCollectionMarker>();
174 sysmem_client.bind_shared_collection(AllocatorBindSharedCollectionRequest {
175 token: Some(
176 client_token.into_client_end().map_err(|_| Error::ReclaimClientTokenChannel)?,
177 ),
178 buffer_collection_request: Some(collection_request),
179 ..Default::default()
180 })?;
181 let collection_client = collection_client.into_proxy();
182 collection_client.sync().await.context("Syncing codec_token_request with sysmem")?;
183
184 let mut collection_constraints = buffer_collection_constraints
185 .unwrap_or_else(|| buffer_collection_constraints_default());
186 assert!(
187 collection_constraints.min_buffer_count_for_camping.is_none(),
188 "min_buffer_count_for_camping should be un-set before we've set it"
189 );
190 collection_constraints.min_buffer_count_for_camping = Some(MIN_BUFFER_COUNT_FOR_CAMPING);
191
192 debug!("Our buffer collection constraints are: {:#?}", collection_constraints);
193
194 collection_client
195 .set_constraints(BufferCollectionSetConstraintsRequest {
196 constraints: Some(collection_constraints),
197 ..Default::default()
198 })
199 .context("Sending buffer constraints to sysmem")?;
200
201 Ok((
202 collection_client,
203 StreamBufferPartialSettings {
204 buffer_lifetime_ordinal: Some(buffer_lifetime_ordinal),
205 buffer_constraints_version_ordinal: Some(
206 constraints.buffer_constraints_version_ordinal,
207 ),
208 sysmem_token: Some(
211 ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
212 codec_token.into_channel(),
213 ),
214 ),
215 ..Default::default()
216 },
217 ))
218 }
219}
220
221struct BufferSetSpec {
222 proxy: BufferCollectionProxy,
223 buffer_lifetime_ordinal: u64,
224 collection_info: BufferCollectionInfo,
225}
226
227#[derive(Debug, PartialEq)]
228pub struct Buffer {
229 pub data: zx::Vmo,
230 pub start: u64,
231 pub size: u64,
232}
233
234#[derive(Debug)]
235pub struct BufferSet {
236 pub proxy: BufferCollectionProxy,
237 pub buffers: Vec<Buffer>,
238 pub buffer_lifetime_ordinal: u64,
239 pub buffer_size: usize,
240}
241
242impl TryFrom<BufferSetSpec> for BufferSet {
243 type Error = anyhow::Error;
244 fn try_from(mut src: BufferSetSpec) -> std::result::Result<Self, Self::Error> {
245 let buffer_size = *src
246 .collection_info
247 .settings
248 .as_ref()
249 .unwrap()
250 .buffer_settings
251 .as_ref()
252 .unwrap()
253 .size_bytes
254 .as_ref()
255 .unwrap();
256 let buffer_count = src.collection_info.buffers.as_ref().unwrap().len();
257
258 let mut buffers = vec![];
259 for (i, buffer) in src.collection_info.buffers.as_mut().unwrap().iter_mut().enumerate() {
260 buffers.push(Buffer {
261 data: buffer.vmo.take().ok_or(Error::ServerOmittedBufferVmo).with_context(
262 || {
263 format!(
264 "Trying to ingest {}th buffer of {}: {:#?}",
265 i, buffer_count, buffer
266 )
267 },
268 )?,
269 start: *buffer.vmo_usable_start.as_ref().unwrap(),
270 size: buffer_size,
271 });
272 }
273
274 Ok(Self {
275 proxy: src.proxy,
276 buffers,
277 buffer_lifetime_ordinal: src.buffer_lifetime_ordinal,
278 buffer_size: buffer_size as usize,
279 })
280 }
281}
282
283impl BufferSet {
284 pub fn read_packet(&self, packet: &ValidPacket) -> Result<Vec<u8>> {
285 let buffer = self
286 .buffers
287 .get(packet.buffer_index as usize)
288 .ok_or(Error::PacketReferencesInvalidBuffer)?;
289 let mut dest = vec![0; packet.valid_length_bytes as usize];
290 buffer.data.read(&mut dest, packet.start_offset as u64).map_err(Error::VmoReadFail)?;
291 Ok(dest)
292 }
293}