1use crate::Result;
8use crate::buffer_collection_constraints::*;
9use anyhow::Context as _;
10use fidl::endpoints::{ClientEnd, Proxy, create_endpoints};
11use fidl_fuchsia_media::*;
12use fidl_fuchsia_sysmem2::*;
13use fuchsia_component::client;
14use fuchsia_stream_processors::*;
15use log::debug;
16use std::fmt;
17use std::iter::StepBy;
18use std::ops::RangeFrom;
19use thiserror::Error;
20
21#[derive(Debug, Error)]
22pub enum Error {
23 ReclaimClientTokenChannel,
24 ServerOmittedBufferVmo,
25 PacketReferencesInvalidBuffer,
26 VmoReadFail(zx::Status),
27}
28
29impl fmt::Display for Error {
30 fn fmt(&self, w: &mut fmt::Formatter<'_>) -> fmt::Result {
31 fmt::Debug::fmt(&self, w)
32 }
33}
34
/// Selects which sequence of ordinals is produced when the pattern is iterated.
#[derive(Debug, Clone, Copy)]
pub enum OrdinalPattern {
    /// Starts at 1 and advances by 2: `1, 3, 5, ...`
    Odd,
    /// Starts at 1 and advances by 1: `1, 2, 3, ...`
    All,
}

impl IntoIterator for OrdinalPattern {
    type Item = u64;
    type IntoIter = StepBy<RangeFrom<Self::Item>>;

    /// Returns an unbounded iterator over the ordinals described by this pattern.
    fn into_iter(self) -> Self::IntoIter {
        // Both patterns begin at 1; only the stride between ordinals differs.
        let stride = match self {
            OrdinalPattern::Odd => 2,
            OrdinalPattern::All => 1,
        };
        (1..).step_by(stride)
    }
}
55
56pub fn get_ordinal(pattern: &mut <OrdinalPattern as IntoIterator>::IntoIter) -> u64 {
57 pattern.next().expect("Getting next item in infinite pattern")
58}
59
/// Which side of the stream processor a buffer set is allocated for.
///
/// Derives the common traits so callers can log, copy and compare the value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferSetType {
    /// Buffers are registered with the codec through its input partial settings.
    Input,
    /// Buffers are registered with the codec through its output partial settings, which are
    /// then completed once allocation has finished.
    Output,
}
64
/// Stateless namespace for the associated functions that allocate a [`BufferSet`].
pub struct BufferSetFactory;
66
67fn set_allocator_name(sysmem_client: &fidl_fuchsia_sysmem2::AllocatorProxy) -> Result<()> {
68 let name = fuchsia_runtime::process_self().get_name()?;
69 let koid = fuchsia_runtime::process_self().koid()?;
70 Ok(sysmem_client.set_debug_client_info(&AllocatorSetDebugClientInfoRequest {
71 name: Some(name.to_string()),
72 id: Some(koid.raw_koid()),
73 ..Default::default()
74 })?)
75}
76
/// Value written to `min_buffer_count_for_camping` in the buffer collection constraints that
/// `BufferSetFactory::settings` sends to sysmem.
const MIN_BUFFER_COUNT_FOR_CAMPING: u32 = 1;
80
81impl BufferSetFactory {
82 pub async fn buffer_set(
83 buffer_lifetime_ordinal: u64,
84 constraints: ValidStreamBufferConstraints,
85 codec: &mut StreamProcessorProxy,
86 buffer_set_type: BufferSetType,
87 buffer_collection_constraints: Option<BufferCollectionConstraints>,
88 ) -> Result<BufferSet> {
89 let (collection_client, settings) =
90 Self::settings(buffer_lifetime_ordinal, constraints, buffer_collection_constraints)
91 .await?;
92
93 debug!("Got settings; waiting for buffers. {:?}", settings);
94
95 match buffer_set_type {
96 BufferSetType::Input => codec
97 .set_input_buffer_partial_settings(settings)
98 .context("Sending input partial settings to codec")?,
99 BufferSetType::Output => codec
100 .set_output_buffer_partial_settings(settings)
101 .context("Sending output partial settings to codec")?,
102 };
103
104 let wait_result = collection_client
105 .wait_for_all_buffers_allocated()
106 .await
107 .context("Waiting for buffers")?;
108 debug!("Sysmem responded (None is success): {:?}", wait_result.as_ref().err());
109 let collection_info = wait_result
110 .map_err(|err| anyhow::format_err!("sysmem allocation error: {:?}", err))?
111 .buffer_collection_info
112 .unwrap();
113
114 if let BufferSetType::Output = buffer_set_type {
115 debug!("Completing settings for output.");
116 codec.complete_output_buffer_partial_settings(buffer_lifetime_ordinal)?;
117 }
118
119 debug!(
120 "Got {} buffers of size {:?}",
121 collection_info.buffers.as_ref().unwrap().len(),
122 collection_info
123 .settings
124 .as_ref()
125 .unwrap()
126 .buffer_settings
127 .as_ref()
128 .unwrap()
129 .size_bytes
130 .as_ref()
131 .unwrap()
132 );
133 debug!("Buffer collection is: {:#?}", collection_info.settings.as_ref().unwrap());
134 for (i, buffer) in collection_info.buffers.as_ref().unwrap().iter().enumerate() {
135 debug!("Buffer {} is : {:#?}", i, buffer);
136 }
137
138 Ok(BufferSet::try_from(BufferSetSpec {
139 proxy: collection_client,
140 buffer_lifetime_ordinal,
141 collection_info,
142 })?)
143 }
144
145 async fn settings(
146 buffer_lifetime_ordinal: u64,
147 constraints: ValidStreamBufferConstraints,
148 buffer_collection_constraints: Option<BufferCollectionConstraints>,
149 ) -> Result<(BufferCollectionProxy, StreamBufferPartialSettings)> {
150 let (client_token, client_token_request) =
151 create_endpoints::<BufferCollectionTokenMarker>();
152 let (codec_token, codec_token_request) = create_endpoints::<BufferCollectionTokenMarker>();
153 let client_token = client_token.into_proxy();
154
155 let sysmem_client =
156 client::connect_to_protocol::<AllocatorMarker>().context("Connecting to sysmem")?;
157
158 set_allocator_name(&sysmem_client).context("Setting sysmem allocator name")?;
159
160 sysmem_client
161 .allocate_shared_collection(AllocatorAllocateSharedCollectionRequest {
162 token_request: Some(client_token_request),
163 ..Default::default()
164 })
165 .context("Allocating shared collection")?;
166 client_token.duplicate(BufferCollectionTokenDuplicateRequest {
167 rights_attenuation_mask: Some(fidl::Rights::SAME_RIGHTS),
168 token_request: Some(codec_token_request),
169 ..Default::default()
170 })?;
171
172 let (collection_client, collection_request) = create_endpoints::<BufferCollectionMarker>();
173 sysmem_client.bind_shared_collection(AllocatorBindSharedCollectionRequest {
174 token: Some(
175 client_token.into_client_end().map_err(|_| Error::ReclaimClientTokenChannel)?,
176 ),
177 buffer_collection_request: Some(collection_request),
178 ..Default::default()
179 })?;
180 let collection_client = collection_client.into_proxy();
181 collection_client.sync().await.context("Syncing codec_token_request with sysmem")?;
182
183 let mut collection_constraints = buffer_collection_constraints
184 .unwrap_or_else(|| buffer_collection_constraints_default());
185 assert!(
186 collection_constraints.min_buffer_count_for_camping.is_none(),
187 "min_buffer_count_for_camping should be un-set before we've set it"
188 );
189 collection_constraints.min_buffer_count_for_camping = Some(MIN_BUFFER_COUNT_FOR_CAMPING);
190
191 debug!("Our buffer collection constraints are: {:#?}", collection_constraints);
192
193 collection_client
194 .set_constraints(BufferCollectionSetConstraintsRequest {
195 constraints: Some(collection_constraints),
196 ..Default::default()
197 })
198 .context("Sending buffer constraints to sysmem")?;
199
200 Ok((
201 collection_client,
202 StreamBufferPartialSettings {
203 buffer_lifetime_ordinal: Some(buffer_lifetime_ordinal),
204 buffer_constraints_version_ordinal: Some(
205 constraints.buffer_constraints_version_ordinal,
206 ),
207 sysmem_token: Some(
210 ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
211 codec_token.into_channel(),
212 ),
213 ),
214 ..Default::default()
215 },
216 ))
217 }
218}
219
220struct BufferSetSpec {
221 proxy: BufferCollectionProxy,
222 buffer_lifetime_ordinal: u64,
223 collection_info: BufferCollectionInfo,
224}
225
226#[derive(Debug, PartialEq)]
227pub struct Buffer {
228 pub data: zx::Vmo,
229 pub start: u64,
230 pub size: u64,
231}
232
233#[derive(Debug)]
234pub struct BufferSet {
235 pub proxy: BufferCollectionProxy,
236 pub buffers: Vec<Buffer>,
237 pub buffer_lifetime_ordinal: u64,
238 pub buffer_size: usize,
239}
240
241impl TryFrom<BufferSetSpec> for BufferSet {
242 type Error = anyhow::Error;
243 fn try_from(mut src: BufferSetSpec) -> std::result::Result<Self, Self::Error> {
244 let buffer_size = *src
245 .collection_info
246 .settings
247 .as_ref()
248 .unwrap()
249 .buffer_settings
250 .as_ref()
251 .unwrap()
252 .size_bytes
253 .as_ref()
254 .unwrap();
255 let buffer_count = src.collection_info.buffers.as_ref().unwrap().len();
256
257 let mut buffers = vec![];
258 for (i, buffer) in src.collection_info.buffers.as_mut().unwrap().iter_mut().enumerate() {
259 buffers.push(Buffer {
260 data: buffer.vmo.take().ok_or(Error::ServerOmittedBufferVmo).with_context(
261 || {
262 format!(
263 "Trying to ingest {}th buffer of {}: {:#?}",
264 i, buffer_count, buffer
265 )
266 },
267 )?,
268 start: *buffer.vmo_usable_start.as_ref().unwrap(),
269 size: buffer_size,
270 });
271 }
272
273 Ok(Self {
274 proxy: src.proxy,
275 buffers,
276 buffer_lifetime_ordinal: src.buffer_lifetime_ordinal,
277 buffer_size: buffer_size as usize,
278 })
279 }
280}
281
282impl BufferSet {
283 pub fn read_packet(&self, packet: &ValidPacket) -> Result<Vec<u8>> {
284 let buffer = self
285 .buffers
286 .get(packet.buffer_index as usize)
287 .ok_or(Error::PacketReferencesInvalidBuffer)?;
288 let mut dest = vec![0; packet.valid_length_bytes as usize];
289 buffer.data.read(&mut dest, packet.start_offset as u64).map_err(Error::VmoReadFail)?;
290 Ok(dest)
291 }
292}