1use anyhow::{format_err, Context as _, Error};
6use fidl::client::QueryResponseFut;
7use fidl::endpoints::{ClientEnd, Proxy};
8use fidl_fuchsia_sysmem2::{
9 AllocatorAllocateSharedCollectionRequest, AllocatorBindSharedCollectionRequest, AllocatorProxy,
10 AllocatorSetDebugClientInfoRequest, BufferCollectionConstraints, BufferCollectionMarker,
11 BufferCollectionProxy, BufferCollectionSetConstraintsRequest,
12 BufferCollectionTokenDuplicateRequest, BufferCollectionTokenMarker,
13 BufferCollectionWaitForAllBuffersAllocatedResponse,
14 BufferCollectionWaitForAllBuffersAllocatedResult, BufferMemorySettings, NodeSetNameRequest,
15};
16use futures::future::{FusedFuture, Future};
17use futures::task::{Context, Poll};
18use futures::{ready, FutureExt};
19use log::error;
20use std::pin::Pin;
21use zx::{self as zx, AsHandleRef};
22
23#[derive(Debug)]
25pub struct SysmemAllocatedBuffers {
26 buffers: Vec<zx::Vmo>,
27 settings: BufferMemorySettings,
28 _buffer_collection: BufferCollectionProxy,
29}
30
31#[derive(Debug)]
32pub struct BufferName<'a> {
33 pub name: &'a str,
34 pub priority: u32,
35}
36
37#[derive(Debug)]
38pub struct AllocatorDebugInfo {
39 pub name: String,
40 pub id: u64,
41}
42
43fn default_allocator_name() -> Result<AllocatorDebugInfo, Error> {
44 let name = fuchsia_runtime::process_self().get_name()?;
45 let koid = fuchsia_runtime::process_self().get_koid()?;
46 Ok(AllocatorDebugInfo { name: name.to_string(), id: koid.raw_koid() })
47}
48
49fn set_allocator_name(
50 sysmem_client: &AllocatorProxy,
51 debug_info: Option<AllocatorDebugInfo>,
52) -> Result<(), Error> {
53 let unwrapped_debug_info = match debug_info {
54 Some(x) => x,
55 None => default_allocator_name()?,
56 };
57 Ok(sysmem_client.set_debug_client_info(&AllocatorSetDebugClientInfoRequest {
58 name: Some(unwrapped_debug_info.name),
59 id: Some(unwrapped_debug_info.id),
60 ..Default::default()
61 })?)
62}
63
64impl SysmemAllocatedBuffers {
65 pub fn settings(&self) -> &BufferMemorySettings {
68 &self.settings
69 }
70
71 pub fn get_mut(&mut self, idx: u32) -> Option<&mut zx::Vmo> {
73 let idx = idx as usize;
74 return self.buffers.get_mut(idx);
75 }
76
77 pub fn len(&self) -> u32 {
80 self.buffers.len().try_into().expect("buffers should fit in u32")
81 }
82}
83
84#[allow(clippy::large_enum_variant)] pub enum SysmemAllocation {
88 Pending,
89 WaitingForSync {
91 future: QueryResponseFut<()>,
92 token_fn: Option<Box<dyn FnOnce() -> () + Send + Sync>>,
93 buffer_collection: BufferCollectionProxy,
94 },
95 WaitingForAllocation(
97 QueryResponseFut<BufferCollectionWaitForAllBuffersAllocatedResult>,
98 BufferCollectionProxy,
99 ),
100 Done(Result<(), fidl_fuchsia_sysmem2::Error>),
103}
104
105impl SysmemAllocation {
106 pub fn pending() -> Self {
108 Self::Pending
109 }
110
111 pub fn allocate<
118 F: FnOnce(ClientEnd<BufferCollectionTokenMarker>) -> () + 'static + Send + Sync,
119 >(
120 allocator: AllocatorProxy,
121 name: BufferName<'_>,
122 debug_info: Option<AllocatorDebugInfo>,
123 constraints: BufferCollectionConstraints,
124 token_target_fn: F,
125 ) -> Result<Self, Error> {
126 set_allocator_name(&allocator, debug_info).context("Setting alloocator name")?;
128 let (client_token, client_token_request) =
129 fidl::endpoints::create_proxy::<BufferCollectionTokenMarker>();
130 allocator
131 .allocate_shared_collection(AllocatorAllocateSharedCollectionRequest {
132 token_request: Some(client_token_request),
133 ..Default::default()
134 })
135 .context("Allocating shared collection")?;
136
137 let (token, token_request) = fidl::endpoints::create_endpoints();
139 client_token.duplicate(BufferCollectionTokenDuplicateRequest {
140 rights_attenuation_mask: Some(fidl::Rights::SAME_RIGHTS),
141 token_request: Some(token_request),
142 ..Default::default()
143 })?;
144
145 client_token
146 .set_name(&NodeSetNameRequest {
147 priority: Some(name.priority),
148 name: Some(name.name.to_string()),
149 ..Default::default()
150 })
151 .context("set_name on BufferCollectionToken")?;
152
153 let client_end_token = client_token.into_client_end().unwrap();
154
155 let mut res = Self::bind(allocator, client_end_token, constraints)?;
156
157 if let Self::WaitingForSync { token_fn, .. } = &mut res {
158 *token_fn = Some(Box::new(move || token_target_fn(token)));
159 }
160
161 Ok(res)
162 }
163
164 pub fn bind(
168 allocator: AllocatorProxy,
169 token: ClientEnd<BufferCollectionTokenMarker>,
170 constraints: BufferCollectionConstraints,
171 ) -> Result<Self, Error> {
172 let (buffer_collection, collection_request) =
173 fidl::endpoints::create_proxy::<BufferCollectionMarker>();
174 allocator.bind_shared_collection(AllocatorBindSharedCollectionRequest {
175 token: Some(token),
176 buffer_collection_request: Some(collection_request),
177 ..Default::default()
178 })?;
179
180 buffer_collection
181 .set_constraints(BufferCollectionSetConstraintsRequest {
182 constraints: Some(constraints),
183 ..Default::default()
184 })
185 .context("sending constraints to sysmem")?;
186
187 Ok(Self::WaitingForSync {
188 future: buffer_collection.sync(),
189 token_fn: None,
190 buffer_collection,
191 })
192 }
193
194 fn synced(&mut self) -> Result<(), Error> {
198 *self = match std::mem::replace(self, Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)))
199 {
200 Self::WaitingForSync { future: _, token_fn, buffer_collection } => {
201 if let Some(deliver_token_fn) = token_fn {
202 deliver_token_fn();
203 }
204 Self::WaitingForAllocation(
205 buffer_collection.wait_for_all_buffers_allocated(),
206 buffer_collection,
207 )
208 }
209 _ => Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)),
210 };
211 if let Self::Done(_) = self {
212 return Err(format_err!("bad state in synced"));
213 }
214 Ok(())
215 }
216
217 fn allocated(
220 &mut self,
221 response_result: Result<
222 BufferCollectionWaitForAllBuffersAllocatedResponse,
223 fidl_fuchsia_sysmem2::Error,
224 >,
225 ) -> Result<SysmemAllocatedBuffers, Error> {
226 let done_result = response_result.as_ref().map(|_| ()).map_err(|err| *err);
227 match std::mem::replace(self, Self::Done(done_result)) {
228 Self::WaitingForAllocation(_, buffer_collection) => {
229 let response =
230 response_result.map_err(|err| format_err!("allocation failed: {:?}", err))?;
231 let buffer_info = response.buffer_collection_info.unwrap();
232 let buffers = buffer_info
233 .buffers
234 .unwrap()
235 .iter_mut()
236 .map(|buffer| buffer.vmo.take().expect("missing buffer"))
237 .collect();
238 let settings = buffer_info.settings.unwrap().buffer_settings.unwrap();
239 Ok(SysmemAllocatedBuffers {
240 buffers,
241 settings,
242 _buffer_collection: buffer_collection,
243 })
244 }
245 _ => Err(format_err!("allocation complete but not in the right state")),
246 }
247 }
248}
249
250impl FusedFuture for SysmemAllocation {
251 fn is_terminated(&self) -> bool {
252 match self {
253 Self::Done(_) => true,
254 _ => false,
255 }
256 }
257}
258
259impl Future for SysmemAllocation {
260 type Output = Result<SysmemAllocatedBuffers, Error>;
261
262 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
263 let s = Pin::into_inner(self);
264 if let Self::WaitingForSync { future, .. } = s {
265 match ready!(future.poll_unpin(cx)) {
266 Err(e) => {
267 error!("SysmemAllocator error: {:?}", e);
268 return Poll::Ready(Err(e.into()));
269 }
270 Ok(()) => {
271 if let Err(e) = s.synced() {
272 return Poll::Ready(Err(e));
273 }
274 }
275 };
276 }
277 if let Self::WaitingForAllocation(future, _) = s {
278 match ready!(future.poll_unpin(cx)) {
279 Ok(response_result) => return Poll::Ready(s.allocated(response_result)),
280 Err(e) => {
281 error!("SysmemAllocator waiting error: {:?}", e);
282 Poll::Ready(Err(e.into()))
283 }
284 }
285 } else {
286 Poll::Pending
287 }
288 }
289}
290
291#[cfg(test)]
292mod tests {
293 use super::*;
294
295 use fidl_fuchsia_sysmem2::{
296 AllocatorMarker, AllocatorRequest, BufferCollectionInfo, BufferCollectionRequest,
297 BufferCollectionTokenProxy, BufferCollectionTokenRequest,
298 BufferCollectionTokenRequestStream, BufferMemoryConstraints, BufferUsage, CoherencyDomain,
299 Heap, SingleBufferSettings, VmoBuffer, CPU_USAGE_READ, VIDEO_USAGE_HW_DECODER,
300 };
301 use fuchsia_async as fasync;
302 use futures::StreamExt;
303 use std::pin::pin;
304
305 use crate::buffer_collection_constraints::buffer_collection_constraints_default;
306
307 fn assert_tokens_connected(
308 exec: &mut fasync::TestExecutor,
309 proxy: &BufferCollectionTokenProxy,
310 requests: &mut BufferCollectionTokenRequestStream,
311 ) {
312 let mut sync_fut = proxy.sync();
313
314 match exec.run_until_stalled(&mut requests.next()) {
315 Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Sync { responder }))) => {
316 responder.send().expect("respond to sync")
317 }
318 x => panic!("Expected vended token to be connected, got {:?}", x),
319 };
320
321 assert!(exec.run_until_stalled(&mut sync_fut).is_ready());
323 }
324
325 #[fuchsia::test]
326 fn allocate_future() {
327 let mut exec = fasync::TestExecutor::new();
328
329 let (proxy, mut allocator_requests) =
330 fidl::endpoints::create_proxy_and_stream::<AllocatorMarker>();
331
332 let (sender, mut receiver) = futures::channel::oneshot::channel();
333
334 let token_fn = move |token| {
335 sender.send(token).expect("should be able to send token");
336 };
337
338 let mut allocation = SysmemAllocation::allocate(
339 proxy,
340 BufferName { name: "audio-codec.allocate_future", priority: 100 },
341 None,
342 BufferCollectionConstraints {
343 usage: Some(BufferUsage {
344 cpu: Some(CPU_USAGE_READ),
345 video: Some(VIDEO_USAGE_HW_DECODER),
346 ..Default::default()
347 }),
348 min_buffer_count_for_camping: Some(1),
349 ..Default::default()
350 },
351 token_fn,
352 )
353 .expect("starting should work");
354 match exec.run_until_stalled(&mut allocator_requests.next()) {
355 Poll::Ready(Some(Ok(AllocatorRequest::SetDebugClientInfo { .. }))) => (),
356 x => panic!("Expected debug client info, got {:?}", x),
357 };
358
359 let mut token_requests_1 = match exec.run_until_stalled(&mut allocator_requests.next()) {
360 Poll::Ready(Some(Ok(AllocatorRequest::AllocateSharedCollection {
361 payload, ..
362 }))) => payload.token_request.unwrap().into_stream(),
363 x => panic!("Expected a shared allocation request, got {:?}", x),
364 };
365
366 let mut token_requests_2 = match exec.run_until_stalled(&mut token_requests_1.next()) {
367 Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Duplicate { payload, .. }))) => {
368 payload.token_request.unwrap().into_stream()
369 }
370 x => panic!("Expected a duplication request, got {:?}", x),
371 };
372
373 let (token_client_1, mut collection_requests_1) = match exec
374 .run_until_stalled(&mut allocator_requests.next())
375 {
376 Poll::Ready(Some(Ok(AllocatorRequest::BindSharedCollection { payload, .. }))) => (
377 payload.token.unwrap().into_proxy(),
378 payload.buffer_collection_request.unwrap().into_stream(),
379 ),
380 x => panic!("Expected Bind Shared Collection, got: {:?}", x),
381 };
382
383 match exec.run_until_stalled(&mut token_requests_1.next()) {
384 Poll::Ready(Some(Ok(BufferCollectionTokenRequest::SetName { .. }))) => {}
385 x => panic!("Expected setname {:?}", x),
386 };
387
388 assert_tokens_connected(&mut exec, &token_client_1, &mut token_requests_1);
390
391 match exec.run_until_stalled(&mut collection_requests_1.next()) {
392 Poll::Ready(Some(Ok(BufferCollectionRequest::SetConstraints { .. }))) => {}
393 x => panic!("Expected buffer constraints request, got {:?}", x),
394 };
395
396 let sync_responder = match exec.run_until_stalled(&mut collection_requests_1.next()) {
397 Poll::Ready(Some(Ok(BufferCollectionRequest::Sync { responder }))) => responder,
398 x => panic!("Expected a sync request, got {:?}", x),
399 };
400
401 assert!(exec.run_until_stalled(&mut allocation).is_pending());
404
405 sync_responder.send().expect("respond to sync request");
407
408 assert!(exec.run_until_stalled(&mut allocation).is_pending());
409
410 let token_client_2 = match receiver.try_recv() {
411 Ok(Some(token)) => token.into_proxy(),
412 x => panic!("Should have a token sent to the fn, got {:?}", x),
413 };
414
415 assert_tokens_connected(&mut exec, &token_client_2, &mut token_requests_2);
418
419 const SIZE_BYTES: u64 = 1024;
421 let buffer_settings = BufferMemorySettings {
422 size_bytes: Some(SIZE_BYTES),
423 is_physically_contiguous: Some(true),
424 is_secure: Some(false),
425 coherency_domain: Some(CoherencyDomain::Ram),
426 heap: Some(Heap {
427 heap_type: Some(bind_fuchsia_sysmem_heap::HEAP_TYPE_SYSTEM_RAM.into()),
428 ..Default::default()
429 }),
430 ..Default::default()
431 };
432
433 match exec.run_until_stalled(&mut collection_requests_1.next()) {
434 Poll::Ready(Some(Ok(BufferCollectionRequest::WaitForAllBuffersAllocated {
435 responder,
436 }))) => {
437 let single_buffer_settings = SingleBufferSettings {
438 buffer_settings: Some(buffer_settings.clone()),
439 ..Default::default()
440 };
441 let buffer_collection_info = BufferCollectionInfo {
442 settings: Some(single_buffer_settings),
443 buffers: Some(vec![VmoBuffer {
444 vmo: Some(zx::Vmo::create(SIZE_BYTES.into()).expect("vmo creation")),
445 vmo_usable_start: Some(0),
446 ..Default::default()
447 }]),
448 ..Default::default()
449 };
450 let response = BufferCollectionWaitForAllBuffersAllocatedResponse {
451 buffer_collection_info: Some(buffer_collection_info),
452 ..Default::default()
453 };
454 responder.send(Ok(response)).expect("send collection response")
455 }
456 x => panic!("Expected WaitForBuffersAllocated, got {:?}", x),
457 };
458
459 let mut buffers = match exec.run_until_stalled(&mut allocation) {
461 Poll::Pending => panic!("allocation should be done"),
462 Poll::Ready(res) => res.expect("successful allocation"),
463 };
464
465 assert_eq!(1, buffers.len());
466 assert!(buffers.get_mut(0).is_some());
467 assert_eq!(buffers.settings(), &buffer_settings);
468 }
469
470 #[fuchsia::test]
471 fn with_system_allocator() {
472 let mut exec = fasync::TestExecutor::new();
473 let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
474 .expect("connect to allocator");
475
476 let buffer_constraints = BufferCollectionConstraints {
477 min_buffer_count: Some(2),
478 buffer_memory_constraints: Some(BufferMemoryConstraints {
479 min_size_bytes: Some(4096),
480 ..Default::default()
481 }),
482 ..buffer_collection_constraints_default()
483 };
484
485 let (sender, mut receiver) = futures::channel::oneshot::channel();
486 let token_fn = move |token| {
487 sender.send(token).expect("should be able to send token");
488 };
489
490 let mut allocation = SysmemAllocation::allocate(
491 sysmem_client.clone(),
492 BufferName { name: "audio-codec.allocate_future", priority: 100 },
493 None,
494 buffer_constraints.clone(),
495 token_fn,
496 )
497 .expect("start allocator");
498
499 let token = loop {
502 assert!(exec.run_until_stalled(&mut allocation).is_pending());
503 if let Poll::Ready(x) = exec.run_until_stalled(&mut receiver) {
504 break x;
505 }
506 };
507 let token = token.expect("receive token");
508
509 let (buffer_collection_client, buffer_collection_requests) =
510 fidl::endpoints::create_proxy::<BufferCollectionMarker>();
511 sysmem_client
512 .bind_shared_collection(AllocatorBindSharedCollectionRequest {
513 token: Some(token),
514 buffer_collection_request: Some(buffer_collection_requests),
515 ..Default::default()
516 })
517 .expect("bind okay");
518
519 buffer_collection_client
520 .set_constraints(BufferCollectionSetConstraintsRequest {
521 constraints: Some(buffer_constraints),
522 ..Default::default()
523 })
524 .expect("constraints should send okay");
525
526 let mut allocation_fut = pin!(buffer_collection_client.wait_for_all_buffers_allocated());
527
528 let allocation_result =
529 exec.run_singlethreaded(&mut allocation_fut).expect("allocation success");
530
531 assert!(allocation_result.is_ok());
532
533 let allocated_buffers = match exec.run_until_stalled(&mut allocation) {
535 Poll::Ready(bufs) => bufs.expect("allocation success"),
536 x => panic!("Expected ready, got {:?}", x),
537 };
538
539 let _allocator_settings = allocated_buffers.settings();
540
541 let buffers = allocation_result.unwrap().buffer_collection_info.unwrap().buffers.unwrap();
542
543 assert_eq!(buffers.len(), allocated_buffers.len() as usize);
544 }
545}