fuchsia_audio_codec/
sysmem_allocator.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{format_err, Context as _, Error};
6use fidl::client::QueryResponseFut;
7use fidl::endpoints::{ClientEnd, Proxy};
8use fidl_fuchsia_sysmem2::{
9    AllocatorAllocateSharedCollectionRequest, AllocatorBindSharedCollectionRequest, AllocatorProxy,
10    AllocatorSetDebugClientInfoRequest, BufferCollectionConstraints, BufferCollectionMarker,
11    BufferCollectionProxy, BufferCollectionSetConstraintsRequest,
12    BufferCollectionTokenDuplicateRequest, BufferCollectionTokenMarker,
13    BufferCollectionWaitForAllBuffersAllocatedResponse,
14    BufferCollectionWaitForAllBuffersAllocatedResult, BufferMemorySettings, NodeSetNameRequest,
15};
16use futures::future::{FusedFuture, Future};
17use futures::task::{Context, Poll};
18use futures::{ready, FutureExt};
19use log::error;
20use std::pin::Pin;
21use zx::{self as zx, AsHandleRef};
22
23/// A set of buffers that have been allocated with the SysmemAllocator.
24#[derive(Debug)]
25pub struct SysmemAllocatedBuffers {
26    buffers: Vec<zx::Vmo>,
27    settings: BufferMemorySettings,
28    _buffer_collection: BufferCollectionProxy,
29}
30
31#[derive(Debug)]
32pub struct BufferName<'a> {
33    pub name: &'a str,
34    pub priority: u32,
35}
36
37#[derive(Debug)]
38pub struct AllocatorDebugInfo {
39    pub name: String,
40    pub id: u64,
41}
42
43fn default_allocator_name() -> Result<AllocatorDebugInfo, Error> {
44    let name = fuchsia_runtime::process_self().get_name()?;
45    let koid = fuchsia_runtime::process_self().get_koid()?;
46    Ok(AllocatorDebugInfo { name: name.to_string(), id: koid.raw_koid() })
47}
48
49fn set_allocator_name(
50    sysmem_client: &AllocatorProxy,
51    debug_info: Option<AllocatorDebugInfo>,
52) -> Result<(), Error> {
53    let unwrapped_debug_info = match debug_info {
54        Some(x) => x,
55        None => default_allocator_name()?,
56    };
57    Ok(sysmem_client.set_debug_client_info(&AllocatorSetDebugClientInfoRequest {
58        name: Some(unwrapped_debug_info.name),
59        id: Some(unwrapped_debug_info.id),
60        ..Default::default()
61    })?)
62}
63
64impl SysmemAllocatedBuffers {
65    /// Settings of the buffers that are available through `SysmemAllocator::get`
66    /// Returns None if the buffers are not allocated yet.
67    pub fn settings(&self) -> &BufferMemorySettings {
68        &self.settings
69    }
70
71    /// Get a VMO which has been allocated from the
72    pub fn get_mut(&mut self, idx: u32) -> Option<&mut zx::Vmo> {
73        let idx = idx as usize;
74        return self.buffers.get_mut(idx);
75    }
76
77    /// Get the number of VMOs that have been allocated.
78    /// Returns None if the allocation is not complete yet.
79    pub fn len(&self) -> u32 {
80        self.buffers.len().try_into().expect("buffers should fit in u32")
81    }
82}
83
84#[allow(clippy::large_enum_variant)] // TODO(https://fxbug.dev/401087115)
85/// A Future that communicates with the `fuchsia.sysmem2.Allocator` service to allocate shared
86/// buffers.
87pub enum SysmemAllocation {
88    Pending,
89    /// Waiting for the Sync response from the Allocator
90    WaitingForSync {
91        future: QueryResponseFut<()>,
92        token_fn: Option<Box<dyn FnOnce() -> () + Send + Sync>>,
93        buffer_collection: BufferCollectionProxy,
94    },
95    /// Waiting for the buffers to be allocated, which should eventually happen after delivering the token.
96    WaitingForAllocation(
97        QueryResponseFut<BufferCollectionWaitForAllBuffersAllocatedResult>,
98        BufferCollectionProxy,
99    ),
100    /// Allocation is completed. The result here represents whether it completed successfully or an
101    /// error.
102    Done(Result<(), fidl_fuchsia_sysmem2::Error>),
103}
104
105impl SysmemAllocation {
106    /// A pending allocation which has not been started, and will never finish.
107    pub fn pending() -> Self {
108        Self::Pending
109    }
110
111    /// Allocate a new shared memory collection, using `allocator` to communicate with the Allocator
112    /// service. `constraints` will be used to allocate the collection. A shared collection token
113    /// client end will be provided to the `token_target_fn` once the request has been synced with
114    /// the collection. This token can be used with `SysmemAllocation::shared` to finish allocating
115    /// the shared buffers or provided to another service to share allocation, or duplicated to
116    /// share this memory with more than one other client.
117    pub fn allocate<
118        F: FnOnce(ClientEnd<BufferCollectionTokenMarker>) -> () + 'static + Send + Sync,
119    >(
120        allocator: AllocatorProxy,
121        name: BufferName<'_>,
122        debug_info: Option<AllocatorDebugInfo>,
123        constraints: BufferCollectionConstraints,
124        token_target_fn: F,
125    ) -> Result<Self, Error> {
126        // Ignore errors since only debug information is being sent.
127        set_allocator_name(&allocator, debug_info).context("Setting alloocator name")?;
128        let (client_token, client_token_request) =
129            fidl::endpoints::create_proxy::<BufferCollectionTokenMarker>();
130        allocator
131            .allocate_shared_collection(AllocatorAllocateSharedCollectionRequest {
132                token_request: Some(client_token_request),
133                ..Default::default()
134            })
135            .context("Allocating shared collection")?;
136
137        // Duplicate to get another BufferCollectionToken to the same collection.
138        let (token, token_request) = fidl::endpoints::create_endpoints();
139        client_token.duplicate(BufferCollectionTokenDuplicateRequest {
140            rights_attenuation_mask: Some(fidl::Rights::SAME_RIGHTS),
141            token_request: Some(token_request),
142            ..Default::default()
143        })?;
144
145        client_token
146            .set_name(&NodeSetNameRequest {
147                priority: Some(name.priority),
148                name: Some(name.name.to_string()),
149                ..Default::default()
150            })
151            .context("set_name on BufferCollectionToken")?;
152
153        let client_end_token = client_token.into_client_end().unwrap();
154
155        let mut res = Self::bind(allocator, client_end_token, constraints)?;
156
157        if let Self::WaitingForSync { token_fn, .. } = &mut res {
158            *token_fn = Some(Box::new(move || token_target_fn(token)));
159        }
160
161        Ok(res)
162    }
163
164    /// Bind to a shared memory collection, using `allocator` to communicate with the Allocator
165    /// service and a `token` which has already been allocated. `constraints` is set to communicate
166    /// the requirements of this client.
167    pub fn bind(
168        allocator: AllocatorProxy,
169        token: ClientEnd<BufferCollectionTokenMarker>,
170        constraints: BufferCollectionConstraints,
171    ) -> Result<Self, Error> {
172        let (buffer_collection, collection_request) =
173            fidl::endpoints::create_proxy::<BufferCollectionMarker>();
174        allocator.bind_shared_collection(AllocatorBindSharedCollectionRequest {
175            token: Some(token),
176            buffer_collection_request: Some(collection_request),
177            ..Default::default()
178        })?;
179
180        buffer_collection
181            .set_constraints(BufferCollectionSetConstraintsRequest {
182                constraints: Some(constraints),
183                ..Default::default()
184            })
185            .context("sending constraints to sysmem")?;
186
187        Ok(Self::WaitingForSync {
188            future: buffer_collection.sync(),
189            token_fn: None,
190            buffer_collection,
191        })
192    }
193
194    /// Advances a synced collection to wait for the allocation of the buffers, after synced.
195    /// Delivers the token to the target as the collection is aware of it now and can reliably
196    /// detect when all tokens have been turned in and constraints have been set.
197    fn synced(&mut self) -> Result<(), Error> {
198        *self = match std::mem::replace(self, Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)))
199        {
200            Self::WaitingForSync { future: _, token_fn, buffer_collection } => {
201                if let Some(deliver_token_fn) = token_fn {
202                    deliver_token_fn();
203                }
204                Self::WaitingForAllocation(
205                    buffer_collection.wait_for_all_buffers_allocated(),
206                    buffer_collection,
207                )
208            }
209            _ => Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)),
210        };
211        if let Self::Done(_) = self {
212            return Err(format_err!("bad state in synced"));
213        }
214        Ok(())
215    }
216
217    /// Finish once the allocation has completed.  Returns the buffers and marks the allocation as
218    /// complete.
219    fn allocated(
220        &mut self,
221        response_result: Result<
222            BufferCollectionWaitForAllBuffersAllocatedResponse,
223            fidl_fuchsia_sysmem2::Error,
224        >,
225    ) -> Result<SysmemAllocatedBuffers, Error> {
226        let done_result = response_result.as_ref().map(|_| ()).map_err(|err| *err);
227        match std::mem::replace(self, Self::Done(done_result)) {
228            Self::WaitingForAllocation(_, buffer_collection) => {
229                let response =
230                    response_result.map_err(|err| format_err!("allocation failed: {:?}", err))?;
231                let buffer_info = response.buffer_collection_info.unwrap();
232                let buffers = buffer_info
233                    .buffers
234                    .unwrap()
235                    .iter_mut()
236                    .map(|buffer| buffer.vmo.take().expect("missing buffer"))
237                    .collect();
238                let settings = buffer_info.settings.unwrap().buffer_settings.unwrap();
239                Ok(SysmemAllocatedBuffers {
240                    buffers,
241                    settings,
242                    _buffer_collection: buffer_collection,
243                })
244            }
245            _ => Err(format_err!("allocation complete but not in the right state")),
246        }
247    }
248}
249
250impl FusedFuture for SysmemAllocation {
251    fn is_terminated(&self) -> bool {
252        match self {
253            Self::Done(_) => true,
254            _ => false,
255        }
256    }
257}
258
259impl Future for SysmemAllocation {
260    type Output = Result<SysmemAllocatedBuffers, Error>;
261
262    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
263        let s = Pin::into_inner(self);
264        if let Self::WaitingForSync { future, .. } = s {
265            match ready!(future.poll_unpin(cx)) {
266                Err(e) => {
267                    error!("SysmemAllocator error: {:?}", e);
268                    return Poll::Ready(Err(e.into()));
269                }
270                Ok(()) => {
271                    if let Err(e) = s.synced() {
272                        return Poll::Ready(Err(e));
273                    }
274                }
275            };
276        }
277        if let Self::WaitingForAllocation(future, _) = s {
278            match ready!(future.poll_unpin(cx)) {
279                Ok(response_result) => return Poll::Ready(s.allocated(response_result)),
280                Err(e) => {
281                    error!("SysmemAllocator waiting error: {:?}", e);
282                    Poll::Ready(Err(e.into()))
283                }
284            }
285        } else {
286            Poll::Pending
287        }
288    }
289}
290
291#[cfg(test)]
292mod tests {
293    use super::*;
294
295    use fidl_fuchsia_sysmem2::{
296        AllocatorMarker, AllocatorRequest, BufferCollectionInfo, BufferCollectionRequest,
297        BufferCollectionTokenProxy, BufferCollectionTokenRequest,
298        BufferCollectionTokenRequestStream, BufferMemoryConstraints, BufferUsage, CoherencyDomain,
299        Heap, SingleBufferSettings, VmoBuffer, CPU_USAGE_READ, VIDEO_USAGE_HW_DECODER,
300    };
301    use fuchsia_async as fasync;
302    use futures::StreamExt;
303    use std::pin::pin;
304
305    use crate::buffer_collection_constraints::buffer_collection_constraints_default;
306
307    fn assert_tokens_connected(
308        exec: &mut fasync::TestExecutor,
309        proxy: &BufferCollectionTokenProxy,
310        requests: &mut BufferCollectionTokenRequestStream,
311    ) {
312        let mut sync_fut = proxy.sync();
313
314        match exec.run_until_stalled(&mut requests.next()) {
315            Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Sync { responder }))) => {
316                responder.send().expect("respond to sync")
317            }
318            x => panic!("Expected vended token to be connected, got {:?}", x),
319        };
320
321        // The sync future is ready now.
322        assert!(exec.run_until_stalled(&mut sync_fut).is_ready());
323    }
324
325    #[fuchsia::test]
326    fn allocate_future() {
327        let mut exec = fasync::TestExecutor::new();
328
329        let (proxy, mut allocator_requests) =
330            fidl::endpoints::create_proxy_and_stream::<AllocatorMarker>();
331
332        let (sender, mut receiver) = futures::channel::oneshot::channel();
333
334        let token_fn = move |token| {
335            sender.send(token).expect("should be able to send token");
336        };
337
338        let mut allocation = SysmemAllocation::allocate(
339            proxy,
340            BufferName { name: "audio-codec.allocate_future", priority: 100 },
341            None,
342            BufferCollectionConstraints {
343                usage: Some(BufferUsage {
344                    cpu: Some(CPU_USAGE_READ),
345                    video: Some(VIDEO_USAGE_HW_DECODER),
346                    ..Default::default()
347                }),
348                min_buffer_count_for_camping: Some(1),
349                ..Default::default()
350            },
351            token_fn,
352        )
353        .expect("starting should work");
354        match exec.run_until_stalled(&mut allocator_requests.next()) {
355            Poll::Ready(Some(Ok(AllocatorRequest::SetDebugClientInfo { .. }))) => (),
356            x => panic!("Expected debug client info, got {:?}", x),
357        };
358
359        let mut token_requests_1 = match exec.run_until_stalled(&mut allocator_requests.next()) {
360            Poll::Ready(Some(Ok(AllocatorRequest::AllocateSharedCollection {
361                payload, ..
362            }))) => payload.token_request.unwrap().into_stream(),
363            x => panic!("Expected a shared allocation request, got {:?}", x),
364        };
365
366        let mut token_requests_2 = match exec.run_until_stalled(&mut token_requests_1.next()) {
367            Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Duplicate { payload, .. }))) => {
368                payload.token_request.unwrap().into_stream()
369            }
370            x => panic!("Expected a duplication request, got {:?}", x),
371        };
372
373        let (token_client_1, mut collection_requests_1) = match exec
374            .run_until_stalled(&mut allocator_requests.next())
375        {
376            Poll::Ready(Some(Ok(AllocatorRequest::BindSharedCollection { payload, .. }))) => (
377                payload.token.unwrap().into_proxy(),
378                payload.buffer_collection_request.unwrap().into_stream(),
379            ),
380            x => panic!("Expected Bind Shared Collection, got: {:?}", x),
381        };
382
383        match exec.run_until_stalled(&mut token_requests_1.next()) {
384            Poll::Ready(Some(Ok(BufferCollectionTokenRequest::SetName { .. }))) => {}
385            x => panic!("Expected setname {:?}", x),
386        };
387
388        // The token turned into the allocator for binding should be connected to the server on allocating.
389        assert_tokens_connected(&mut exec, &token_client_1, &mut token_requests_1);
390
391        match exec.run_until_stalled(&mut collection_requests_1.next()) {
392            Poll::Ready(Some(Ok(BufferCollectionRequest::SetConstraints { .. }))) => {}
393            x => panic!("Expected buffer constraints request, got {:?}", x),
394        };
395
396        let sync_responder = match exec.run_until_stalled(&mut collection_requests_1.next()) {
397            Poll::Ready(Some(Ok(BufferCollectionRequest::Sync { responder }))) => responder,
398            x => panic!("Expected a sync request, got {:?}", x),
399        };
400
401        // The sysmem allocator is now waiting for the sync from the collection
402
403        assert!(exec.run_until_stalled(&mut allocation).is_pending());
404
405        // When it gets a response that the collection is synced, it vends the token out
406        sync_responder.send().expect("respond to sync request");
407
408        assert!(exec.run_until_stalled(&mut allocation).is_pending());
409
410        let token_client_2 = match receiver.try_recv() {
411            Ok(Some(token)) => token.into_proxy(),
412            x => panic!("Should have a token sent to the fn, got {:?}", x),
413        };
414
415        // token_client_2 should be attached to the token_requests_2 that we handed over to sysmem
416        // (in the token duplicate)
417        assert_tokens_connected(&mut exec, &token_client_2, &mut token_requests_2);
418
419        // We should have received a wait for the buffers to be allocated in our collection
420        const SIZE_BYTES: u64 = 1024;
421        let buffer_settings = BufferMemorySettings {
422            size_bytes: Some(SIZE_BYTES),
423            is_physically_contiguous: Some(true),
424            is_secure: Some(false),
425            coherency_domain: Some(CoherencyDomain::Ram),
426            heap: Some(Heap {
427                heap_type: Some(bind_fuchsia_sysmem_heap::HEAP_TYPE_SYSTEM_RAM.into()),
428                ..Default::default()
429            }),
430            ..Default::default()
431        };
432
433        match exec.run_until_stalled(&mut collection_requests_1.next()) {
434            Poll::Ready(Some(Ok(BufferCollectionRequest::WaitForAllBuffersAllocated {
435                responder,
436            }))) => {
437                let single_buffer_settings = SingleBufferSettings {
438                    buffer_settings: Some(buffer_settings.clone()),
439                    ..Default::default()
440                };
441                let buffer_collection_info = BufferCollectionInfo {
442                    settings: Some(single_buffer_settings),
443                    buffers: Some(vec![VmoBuffer {
444                        vmo: Some(zx::Vmo::create(SIZE_BYTES.into()).expect("vmo creation")),
445                        vmo_usable_start: Some(0),
446                        ..Default::default()
447                    }]),
448                    ..Default::default()
449                };
450                let response = BufferCollectionWaitForAllBuffersAllocatedResponse {
451                    buffer_collection_info: Some(buffer_collection_info),
452                    ..Default::default()
453                };
454                responder.send(Ok(response)).expect("send collection response")
455            }
456            x => panic!("Expected WaitForBuffersAllocated, got {:?}", x),
457        };
458
459        // The allocator should now be finished!
460        let mut buffers = match exec.run_until_stalled(&mut allocation) {
461            Poll::Pending => panic!("allocation should be done"),
462            Poll::Ready(res) => res.expect("successful allocation"),
463        };
464
465        assert_eq!(1, buffers.len());
466        assert!(buffers.get_mut(0).is_some());
467        assert_eq!(buffers.settings(), &buffer_settings);
468    }
469
470    #[fuchsia::test]
471    fn with_system_allocator() {
472        let mut exec = fasync::TestExecutor::new();
473        let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
474            .expect("connect to allocator");
475
476        let buffer_constraints = BufferCollectionConstraints {
477            min_buffer_count: Some(2),
478            buffer_memory_constraints: Some(BufferMemoryConstraints {
479                min_size_bytes: Some(4096),
480                ..Default::default()
481            }),
482            ..buffer_collection_constraints_default()
483        };
484
485        let (sender, mut receiver) = futures::channel::oneshot::channel();
486        let token_fn = move |token| {
487            sender.send(token).expect("should be able to send token");
488        };
489
490        let mut allocation = SysmemAllocation::allocate(
491            sysmem_client.clone(),
492            BufferName { name: "audio-codec.allocate_future", priority: 100 },
493            None,
494            buffer_constraints.clone(),
495            token_fn,
496        )
497        .expect("start allocator");
498
499        // Receive the token.  From here on, using the token, the test becomes the other client to
500        // the Allocator sharing the memory.
501        let token = loop {
502            assert!(exec.run_until_stalled(&mut allocation).is_pending());
503            if let Poll::Ready(x) = exec.run_until_stalled(&mut receiver) {
504                break x;
505            }
506        };
507        let token = token.expect("receive token");
508
509        let (buffer_collection_client, buffer_collection_requests) =
510            fidl::endpoints::create_proxy::<BufferCollectionMarker>();
511        sysmem_client
512            .bind_shared_collection(AllocatorBindSharedCollectionRequest {
513                token: Some(token),
514                buffer_collection_request: Some(buffer_collection_requests),
515                ..Default::default()
516            })
517            .expect("bind okay");
518
519        buffer_collection_client
520            .set_constraints(BufferCollectionSetConstraintsRequest {
521                constraints: Some(buffer_constraints),
522                ..Default::default()
523            })
524            .expect("constraints should send okay");
525
526        let mut allocation_fut = pin!(buffer_collection_client.wait_for_all_buffers_allocated());
527
528        let allocation_result =
529            exec.run_singlethreaded(&mut allocation_fut).expect("allocation success");
530
531        assert!(allocation_result.is_ok());
532
533        // Allocator should be ready now.
534        let allocated_buffers = match exec.run_until_stalled(&mut allocation) {
535            Poll::Ready(bufs) => bufs.expect("allocation success"),
536            x => panic!("Expected ready, got {:?}", x),
537        };
538
539        let _allocator_settings = allocated_buffers.settings();
540
541        let buffers = allocation_result.unwrap().buffer_collection_info.unwrap().buffers.unwrap();
542
543        assert_eq!(buffers.len(), allocated_buffers.len() as usize);
544    }
545}