fuchsia_audio_codec/
stream_processor.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{format_err, Context as _, Error};
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_media::*;
8use fidl_fuchsia_mediacodec::*;
9use fidl_fuchsia_sysmem2::*;
10use fuchsia_stream_processors::*;
11use fuchsia_sync::{Mutex, RwLock};
12use futures::future::{maybe_done, MaybeDone};
13use futures::io::{self, AsyncWrite};
14use futures::stream::{FusedStream, Stream};
15use futures::task::{Context, Poll, Waker};
16use futures::{ready, Future, StreamExt};
17use log::{trace, warn};
18use std::collections::{HashSet, VecDeque};
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use crate::buffer_collection_constraints::buffer_collection_constraints_default;
24use crate::sysmem_allocator::{BufferName, SysmemAllocatedBuffers, SysmemAllocation};
25
26fn fidl_error_to_io_error(e: fidl::Error) -> io::Error {
27    io::Error::other(format_err!("Fidl Error: {}", e))
28}
29
/// Listener is a three-valued Option that captures the waker that a listener needs to be woken
/// upon when it polls the future instead of at registration time.
///
/// The default value is `Listener::None`.
#[derive(Debug, Default)]
enum Listener {
    /// No one is listening.
    #[default]
    None,
    /// Someone is listening, but has either been woken and not repolled, or has never polled yet.
    New,
    /// Someone is listening, and can be woken with the waker.
    Some(Waker),
}

impl Listener {
    /// Adds a waker to be awoken with `Listener::wake`, replacing any previously stored waker.
    ///
    /// # Panics
    ///
    /// Panics if no one is listening (the state is `Listener::None`).
    fn register(&mut self, waker: Waker) {
        assert!(!matches!(self, Listener::None), "Polled a listener with no pollers");
        *self = Listener::Some(waker);
    }

    /// If a listener has polled, wake the listener and replace it with `New`.
    /// Noop if no one is listening; a listener that has not polled yet stays `New`.
    fn wake(&mut self) {
        if matches!(self, Listener::None) {
            return;
        }
        // A listener exists: reset it to `New` and wake the stored waker, if any.
        if let Listener::Some(waker) = mem::replace(self, Listener::New) {
            waker.wake();
        }
    }

    /// Get a reference to the waker, if there is one waiting.
    fn waker(&self) -> Option<&Waker> {
        match self {
            Listener::Some(waker) => Some(waker),
            Listener::None | Listener::New => None,
        }
    }
}
80
81/// A queue of encoded packets, to be sent to the `listener` when it polls next.
82struct OutputQueue {
83    /// The listener. Woken when a packet arrives after a previous poll() returned Pending.
84    listener: Listener,
85    /// A queue of encoded packets to be delivered to the receiver.
86    queue: VecDeque<Packet>,
87    /// True when the stream has received an end-of-stream message. The stream will return None
88    /// after the `queue` is empty.
89    ended: bool,
90}
91
92impl OutputQueue {
93    /// Adds a packet to the queue and wakes the listener if necessary.
94    fn enqueue(&mut self, packet: Packet) {
95        self.queue.push_back(packet);
96        self.listener.wake();
97    }
98
99    /// Signals the end of the stream has happened.
100    /// Wakes the listener if necessary.
101    fn mark_ended(&mut self) {
102        self.ended = true;
103        self.listener.wake();
104    }
105
106    fn waker(&self) -> Option<&Waker> {
107        self.listener.waker()
108    }
109
110    /// Wakes the listener so that it will repoll, if it is waiting.
111    fn wake(&mut self) {
112        self.listener.wake();
113    }
114}
115
116impl Default for OutputQueue {
117    fn default() -> Self {
118        OutputQueue { listener: Listener::default(), queue: VecDeque::new(), ended: false }
119    }
120}
121
122impl Stream for OutputQueue {
123    type Item = Packet;
124
125    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126        match self.queue.pop_front() {
127            Some(packet) => Poll::Ready(Some(packet)),
128            None if self.ended => Poll::Ready(None),
129            None => {
130                self.listener.register(cx.waker().clone());
131                Poll::Pending
132            }
133        }
134    }
135}
136
/// Minimum input buffer size requested from sysmem. The minimum specified by the codec is too
/// small to contain the typical pcm frame chunk size for the encoder case (1024), so a larger,
/// reasonable amount is requested instead.
const MIN_INPUT_BUFFER_SIZE: u32 = 4096;
/// Minimum output buffer size requested from sysmem. Zero defers to the codec default, for frame
/// alignment.
const MIN_OUTPUT_BUFFER_SIZE: u32 = 0;
142
/// Identifies one of the input buffers shared between the client and the StreamProcessor.
/// Wraps the raw `u32` index.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct InputBufferIndex(u32);
146
147/// The StreamProcessorInner handles the events that come from the StreamProcessor, mostly related
148/// to setup of the buffers and handling the output packets as they arrive.
149struct StreamProcessorInner {
150    /// The proxy to the stream processor.
151    processor: StreamProcessorProxy,
152    /// The proxy to the sysmem allocator.
153    sysmem_client: AllocatorProxy,
154    /// The event stream from the StreamProcessor.  We handle these internally.
155    events: StreamProcessorEventStream,
156    /// The size in bytes of each input packet
157    input_packet_size: u64,
158    /// The set of input buffers that are available for writing by the client, without the one
159    /// possibly being used by the input_cursor.
160    client_owned: HashSet<InputBufferIndex>,
161    /// A cursor on the next input buffer location to be written to when new input data arrives.
162    input_cursor: Option<(InputBufferIndex, u64)>,
163    /// An queue of the indexes of output buffers that have been filled by the processor and a
164    /// waiter if someone is waiting on it.
165    /// Also holds the output waker, if it is registered.
166    output_queue: Mutex<OutputQueue>,
167    /// Waker that is waiting on input to be ready.
168    input_waker: Option<Waker>,
169    /// Allocation for the input buffers.
170    input_allocation: MaybeDone<SysmemAllocation>,
171    /// Allocation for the output buffers.
172    output_allocation: MaybeDone<SysmemAllocation>,
173}
174
175impl StreamProcessorInner {
176    /// Handles an event from the StreamProcessor. A number of these events come on stream start to
177    /// setup the input and output buffers, and from then on the output packets and end of stream
178    /// marker, and the input packets are marked as usable after they are processed.
179    fn handle_event(&mut self, evt: StreamProcessorEvent) -> Result<(), Error> {
180        match evt {
181            StreamProcessorEvent::OnInputConstraints { input_constraints } => {
182                let _input_constraints = ValidStreamBufferConstraints::try_from(input_constraints)?;
183                let buffer_constraints =
184                    Self::buffer_constraints_from_min_size(MIN_INPUT_BUFFER_SIZE);
185                let processor = self.processor.clone();
186                let mut partial_settings = Self::partial_settings();
187                let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
188                    // A sysmem token channel serves both sysmem(1) and sysmem2 token protocols, so
189                    // we can convert here until StreamProcessor has a sysmem2 token field.
190                    partial_settings.sysmem_token =
191                        Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
192                            token.into_channel(),
193                        ));
194                    // FIDL failures will be caught via the request stream.
195                    if let Err(e) = processor.set_input_buffer_partial_settings(partial_settings) {
196                        warn!("Couldn't set input buffer settings: {:?}", e);
197                    }
198                };
199                self.input_allocation = maybe_done(SysmemAllocation::allocate(
200                    self.sysmem_client.clone(),
201                    BufferName { name: "StreamProcessorInput", priority: 1 },
202                    None,
203                    buffer_constraints,
204                    token_fn,
205                )?);
206            }
207            StreamProcessorEvent::OnOutputConstraints { output_config } => {
208                let output_constraints = ValidStreamOutputConstraints::try_from(output_config)?;
209                if !output_constraints.buffer_constraints_action_required {
210                    return Ok(());
211                }
212                let buffer_constraints =
213                    Self::buffer_constraints_from_min_size(MIN_OUTPUT_BUFFER_SIZE);
214                let processor = self.processor.clone();
215                let mut partial_settings = Self::partial_settings();
216                let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
217                    // A sysmem token channel serves both sysmem(1) and sysmem2 token protocols, so
218                    // we can convert here until StreamProcessor has a sysmem2 token field.
219                    partial_settings.sysmem_token =
220                        Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
221                            token.into_channel(),
222                        ));
223                    // FIDL failures will be caught via the request stream.
224                    if let Err(e) = processor.set_output_buffer_partial_settings(partial_settings) {
225                        warn!("Couldn't set output buffer settings: {:?}", e);
226                    }
227                };
228
229                self.output_allocation = maybe_done(SysmemAllocation::allocate(
230                    self.sysmem_client.clone(),
231                    BufferName { name: "StreamProcessorOutput", priority: 1 },
232                    None,
233                    buffer_constraints,
234                    token_fn,
235                )?);
236            }
237            StreamProcessorEvent::OnOutputPacket { output_packet, .. } => {
238                let mut lock = self.output_queue.lock();
239                lock.enqueue(output_packet);
240            }
241            StreamProcessorEvent::OnFreeInputPacket {
242                free_input_packet: PacketHeader { packet_index: Some(idx), .. },
243            } => {
244                if !self.client_owned.insert(InputBufferIndex(idx)) {
245                    warn!("Freed an input packet that was already freed: {:?}", idx);
246                }
247                self.setup_input_cursor();
248            }
249            StreamProcessorEvent::OnOutputEndOfStream { .. } => {
250                let mut lock = self.output_queue.lock();
251                lock.mark_ended();
252            }
253            StreamProcessorEvent::OnOutputFormat { .. } => {}
254            e => trace!("Unhandled stream processor event: {:?}", e),
255        }
256        Ok(())
257    }
258
259    /// Process one event, and return Poll::Ready if the item has been processed,
260    /// and Poll::Pending if no event has been processed and the waker will be woken if
261    /// another event happens.
262    fn process_event(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
263        match ready!(self.events.poll_next_unpin(cx)) {
264            Some(Err(e)) => Poll::Ready(Err(e.into())),
265            Some(Ok(event)) => Poll::Ready(self.handle_event(event)),
266            None => Poll::Ready(Err(format_err!("Client disconnected"))),
267        }
268    }
269
270    fn buffer_constraints_from_min_size(min_buffer_size: u32) -> BufferCollectionConstraints {
271        BufferCollectionConstraints {
272            buffer_memory_constraints: Some(BufferMemoryConstraints {
273                min_size_bytes: Some(min_buffer_size as u64),
274                ..Default::default()
275            }),
276            ..buffer_collection_constraints_default()
277        }
278    }
279
280    fn partial_settings() -> StreamBufferPartialSettings {
281        StreamBufferPartialSettings {
282            buffer_lifetime_ordinal: Some(1),
283            buffer_constraints_version_ordinal: Some(1),
284            sysmem_token: None,
285            ..Default::default()
286        }
287    }
288
289    fn input_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
290        Pin::new(&mut self.input_allocation)
291            .output_mut()
292            .expect("allocation completed")
293            .as_mut()
294            .expect("succcessful allocation")
295    }
296
297    fn output_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
298        Pin::new(&mut self.output_allocation)
299            .output_mut()
300            .expect("allocation completed")
301            .as_mut()
302            .expect("succcessful allocation")
303    }
304
305    /// Called when the input_allocation future finishes.
306    /// Takes the buffers out of the allocator, and sets up the input cursor to accept data.
307    fn input_allocation_complete(&mut self) -> Result<(), Error> {
308        let _ = Pin::new(&mut self.input_allocation)
309            .output_mut()
310            .ok_or_else(|| format_err!("allocation isn't complete"))?;
311
312        let settings = self.input_buffers().settings();
313        self.input_packet_size = (*settings.size_bytes.as_ref().unwrap()).try_into()?;
314        let buffer_count = self.input_buffers().len();
315        for i in 0..buffer_count {
316            let _ = self.client_owned.insert(InputBufferIndex(i.try_into()?));
317        }
318        // allocation is complete, and we can write to the input.
319        self.setup_input_cursor();
320        Ok(())
321    }
322
323    /// Called when the output allocation future finishes.
324    /// Takes the buffers out of the allocator, and sets up the output buffers for retrieval of output,
325    /// signaling to the processor that the output buffers are set.
326    fn output_allocation_complete(&mut self) -> Result<(), Error> {
327        let _ = Pin::new(&mut self.output_allocation)
328            .output_mut()
329            .ok_or_else(|| format_err!("allocation isn't complete"))?;
330        self.processor
331            .complete_output_buffer_partial_settings(/*buffer_lifetime_ordinal=*/ 1)
332            .context("setting output buffer settings")?;
333        Ok(())
334    }
335
336    /// Poll any of the allocations that are waiting to complete, returning Pending if
337    /// any are still waiting to finish, and Ready if one has failed or both have completed.
338    fn poll_buffer_allocation(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
339        if let MaybeDone::Future(_) = self.input_allocation {
340            match Pin::new(&mut self.input_allocation).poll(cx) {
341                Poll::Ready(()) => {
342                    if let Err(e) = self.input_allocation_complete() {
343                        return Poll::Ready(Err(e));
344                    }
345                }
346                Poll::Pending => {}
347            };
348        }
349        if let MaybeDone::Future(_) = self.output_allocation {
350            match Pin::new(&mut self.output_allocation).poll(cx) {
351                Poll::Ready(()) => {
352                    if let Err(e) = self.output_allocation_complete() {
353                        return Poll::Ready(Err(e));
354                    }
355                }
356                Poll::Pending => {}
357            };
358        }
359        Poll::Pending
360    }
361
362    /// Provides the current registered waiting context with priority given to the output waker.
363    fn waiting_waker(&self) -> Option<Waker> {
364        match (self.output_queue.lock().waker(), &self.input_waker) {
365            // No one is waiting.
366            (None, None) => None,
367            (Some(waker), _) => Some(waker.clone()),
368            (_, Some(waker)) => Some(waker.clone()),
369        }
370    }
371
372    /// Process all the events that are currently available from the StreamProcessor and Allocators,
373    /// waking any known waker to be woken when another event arrives.
374    /// Returns Ok(()) if this was accomplished or Err() if an error occurred while processing.
375    fn poll_events(&mut self) -> Result<(), Error> {
376        let waker = loop {
377            let waker = match self.waiting_waker() {
378                // No one still needs to be woken.  This means all the wakers have been awoke,
379                // and will repoll.
380                None => return Ok(()),
381                Some(waker) => waker,
382            };
383            match self.process_event(&mut Context::from_waker(&waker)) {
384                Poll::Pending => break waker,
385                Poll::Ready(Err(e)) => {
386                    warn!("Stream processing error: {:?}", e);
387                    return Err(e.into());
388                }
389                // Didn't set the waker to be awoken, so let's try again.
390                Poll::Ready(Ok(())) => {}
391            }
392        };
393
394        if let Poll::Ready(Err(e)) = self.poll_buffer_allocation(&mut Context::from_waker(&waker)) {
395            warn!("Stream buffer allocation error: {:?}", e);
396            return Err(e.into());
397        }
398        Ok(())
399    }
400
401    fn wake_output(&mut self) {
402        self.output_queue.lock().wake();
403    }
404
405    fn wake_input(&mut self) {
406        if let Some(w) = self.input_waker.take() {
407            w.wake();
408        }
409    }
410
411    /// Attempts to set up a new input cursor, out of the current set of client owned input buffers.
412    /// If the cursor is already set, this does nothing.
413    fn setup_input_cursor(&mut self) {
414        if self.input_cursor.is_some() {
415            // Nothing to be done
416            return;
417        }
418        let next_idx = match self.client_owned.iter().next() {
419            None => return,
420            Some(idx) => idx.clone(),
421        };
422        let _ = self.client_owned.remove(&next_idx);
423        self.input_cursor = Some((next_idx, 0));
424        self.wake_input();
425    }
426
427    /// Reads an output packet from the output buffers, and marks the packets as recycled so the
428    /// output buffer can be reused. Allocates a new vector to hold the data.
429    fn read_output_packet(&mut self, packet: Packet) -> Result<Vec<u8>, Error> {
430        let packet = ValidPacket::try_from(packet)?;
431
432        let output_size = packet.valid_length_bytes as usize;
433        let offset = packet.start_offset as u64;
434        let mut output = vec![0; output_size];
435        let buf_idx = packet.buffer_index;
436        let vmo = self.output_buffers().get_mut(buf_idx).expect("output vmo should exist");
437        vmo.read(&mut output, offset)?;
438        self.processor.recycle_output_packet(&packet.header.into())?;
439        Ok(output)
440    }
441}
442
443/// Struct representing a CodecFactory .
444/// Input sent to the encoder via `StreamProcessor::write_bytes` is queued for delivery, and delivered
445/// whenever a packet is full or `StreamProcessor::send_packet` is called.  Output can be retrieved using
446/// an `StreamProcessorStream` from `StreamProcessor::take_output_stream`.
447pub struct StreamProcessor {
448    inner: Arc<RwLock<StreamProcessorInner>>,
449}
450
451/// An StreamProcessorStream is a Stream of processed data from a stream processor.
452/// Returned from `StreamProcessor::take_output_stream`.
453pub struct StreamProcessorOutputStream {
454    inner: Arc<RwLock<StreamProcessorInner>>,
455}
456
457impl StreamProcessor {
458    /// Create a new StreamProcessor given the proxy.
459    /// Takes the event stream of the proxy.
460    fn create(processor: StreamProcessorProxy, sysmem_client: AllocatorProxy) -> Self {
461        let events = processor.take_event_stream();
462        Self {
463            inner: Arc::new(RwLock::new(StreamProcessorInner {
464                processor,
465                sysmem_client,
466                events,
467                input_packet_size: 0,
468                client_owned: HashSet::new(),
469                input_cursor: None,
470                output_queue: Default::default(),
471                input_waker: None,
472                input_allocation: maybe_done(SysmemAllocation::pending()),
473                output_allocation: maybe_done(SysmemAllocation::pending()),
474            })),
475        }
476    }
477
478    /// Create a new StreamProcessor encoder, with the given `input_domain` and `encoder_settings`.  See
479    /// stream_processor.fidl for descriptions of these parameters.  This is only meant for audio
480    /// encoding.
481    pub fn create_encoder(
482        input_domain: DomainFormat,
483        encoder_settings: EncoderSettings,
484    ) -> Result<StreamProcessor, Error> {
485        let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
486            .context("Connecting to sysmem")?;
487
488        let format_details = FormatDetails {
489            domain: Some(input_domain),
490            encoder_settings: Some(encoder_settings),
491            format_details_version_ordinal: Some(1),
492            mime_type: Some("audio/pcm".to_string()),
493            oob_bytes: None,
494            pass_through_parameters: None,
495            timebase: None,
496            ..Default::default()
497        };
498
499        let encoder_params = CreateEncoderParams {
500            input_details: Some(format_details),
501            require_hw: Some(false),
502            ..Default::default()
503        };
504
505        let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
506            .context("Failed to connect to Codec Factory")?;
507
508        let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
509
510        codec_svc.create_encoder(&encoder_params, stream_processor_serverend)?;
511
512        Ok(StreamProcessor::create(processor, sysmem_client))
513    }
514
515    /// Create a new StreamProcessor decoder, with the given `mime_type` and optional `oob_bytes`.  See
516    /// stream_processor.fidl for descriptions of these parameters.  This is only meant for audio
517    /// decoding.
518    pub fn create_decoder(
519        mime_type: &str,
520        oob_bytes: Option<Vec<u8>>,
521    ) -> Result<StreamProcessor, Error> {
522        let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
523            .context("Connecting to sysmem")?;
524
525        let format_details = FormatDetails {
526            mime_type: Some(mime_type.to_string()),
527            oob_bytes: oob_bytes,
528            format_details_version_ordinal: Some(1),
529            encoder_settings: None,
530            domain: None,
531            pass_through_parameters: None,
532            timebase: None,
533            ..Default::default()
534        };
535
536        let decoder_params = CreateDecoderParams {
537            input_details: Some(format_details),
538            permit_lack_of_split_header_handling: Some(true),
539            ..Default::default()
540        };
541
542        let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
543            .context("Failed to connect to Codec Factory")?;
544
545        let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
546
547        codec_svc.create_decoder(&decoder_params, stream_processor_serverend)?;
548
549        Ok(StreamProcessor::create(processor, sysmem_client))
550    }
551
552    /// Take a stream object which will produce the output of the processor.
553    /// Only one StreamProcessorOutputStream object can exist at a time, and this will return an Error if it is
554    /// already taken.
555    pub fn take_output_stream(&mut self) -> Result<StreamProcessorOutputStream, Error> {
556        {
557            let read = self.inner.read();
558            let mut lock = read.output_queue.lock();
559            if let Listener::None = lock.listener {
560                lock.listener = Listener::New;
561            } else {
562                return Err(format_err!("Output stream already taken"));
563            }
564        }
565        Ok(StreamProcessorOutputStream { inner: self.inner.clone() })
566    }
567
568    /// Deliver input to the stream processor.  Returns the number of bytes delivered.
569    fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, io::Error> {
570        let mut bytes_idx = 0;
571        while bytes.len() > bytes_idx {
572            {
573                let mut write = self.inner.write();
574                let (idx, size) = match write.input_cursor.take() {
575                    None => return Ok(bytes_idx),
576                    Some(x) => x,
577                };
578                let space_left = write.input_packet_size - size;
579                let left_to_write = bytes.len() - bytes_idx;
580                let buffer_vmo = write.input_buffers().get_mut(idx.0).expect("need buffer vmo");
581                if space_left as usize > left_to_write {
582                    let write_buf = &bytes[bytes_idx..];
583                    let write_len = write_buf.len();
584                    buffer_vmo.write(write_buf, size)?;
585                    bytes_idx += write_len;
586                    write.input_cursor = Some((idx, size + write_len as u64));
587                    assert!(bytes.len() == bytes_idx);
588                    return Ok(bytes_idx);
589                }
590                let end_idx = bytes_idx + space_left as usize;
591                let write_buf = &bytes[bytes_idx..end_idx];
592                let write_len = write_buf.len();
593                buffer_vmo.write(write_buf, size)?;
594                bytes_idx += write_len;
595                // this buffer is done, ship it!
596                assert_eq!(size + write_len as u64, write.input_packet_size);
597                write.input_cursor = Some((idx, write.input_packet_size));
598            }
599            self.send_packet()?;
600        }
601        Ok(bytes_idx)
602    }
603
604    /// Flush the input buffer to the processor, relinquishing the ownership of the buffer
605    /// currently in the input cursor, and picking a new input buffer.  If there is no input
606    /// buffer left, the input cursor is left as None.
607    pub fn send_packet(&mut self) -> Result<(), io::Error> {
608        let mut write = self.inner.write();
609        if write.input_cursor.is_none() {
610            // Nothing to flush, nothing can have been written to an empty input cursor.
611            return Ok(());
612        }
613        let (idx, size) = write.input_cursor.take().expect("input cursor is none");
614        if size == 0 {
615            // Can't send empty packet to processor.
616            write.input_cursor = Some((idx, size));
617            return Ok(());
618        }
619        let packet = Packet {
620            header: Some(PacketHeader {
621                buffer_lifetime_ordinal: Some(1),
622                packet_index: Some(idx.0),
623                ..Default::default()
624            }),
625            buffer_index: Some(idx.0),
626            stream_lifetime_ordinal: Some(1),
627            start_offset: Some(0),
628            valid_length_bytes: Some(size as u32),
629            start_access_unit: Some(true),
630            known_end_access_unit: Some(true),
631            ..Default::default()
632        };
633        write.processor.queue_input_packet(&packet).map_err(fidl_error_to_io_error)?;
634        // pick another buffer for the input cursor
635        write.setup_input_cursor();
636        Ok(())
637    }
638
639    /// Test whether it is possible to write to the StreamProcessor. If there are no input buffers
640    /// available, returns Poll::Pending and arranges for the input task to receive a
641    /// notification when an input buffer may be available or the encoder is closed.
642    fn poll_writable(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
643        let mut write = self.inner.write();
644        // Drop the current input waker, since we have a new one.
645        // If the output waker is set, it should already be queued to be woken for the codec.
646        write.input_waker = None;
647        if write.input_cursor.is_some() {
648            return Poll::Ready(Ok(()));
649        }
650        write.input_waker = Some(cx.waker().clone());
651        // This can:
652        //  - wake the input waker (somehow received a input packet)
653        //  - poll with the output waker, setting it up to be woken
654        //  - poll with the input waker to be woken
655        if let Err(e) = write.poll_events() {
656            return Poll::Ready(Err(io::Error::other(e)));
657        }
658        Poll::Pending
659    }
660
661    pub fn close(&mut self) -> Result<(), io::Error> {
662        self.send_packet()?;
663
664        let mut write = self.inner.write();
665
666        write.processor.queue_input_end_of_stream(1).map_err(fidl_error_to_io_error)?;
667        // TODO: indicate this another way so that we can send an error if someone tries to write
668        // it after it's closed.
669        write.input_cursor = None;
670        write.wake_input();
671        write.wake_output();
672        Ok(())
673    }
674}
675
676impl AsyncWrite for StreamProcessor {
677    fn poll_write(
678        mut self: Pin<&mut Self>,
679        cx: &mut Context<'_>,
680        buf: &[u8],
681    ) -> Poll<io::Result<usize>> {
682        ready!(self.poll_writable(cx))?;
683        match self.write_bytes(buf) {
684            Ok(written) => Poll::Ready(Ok(written)),
685            Err(e) => Poll::Ready(Err(e.into())),
686        }
687    }
688
689    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
690        Poll::Ready(self.send_packet())
691    }
692
693    fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
694        Poll::Ready(self.send_packet())
695    }
696}
697
698impl Stream for StreamProcessorOutputStream {
699    type Item = Result<Vec<u8>, Error>;
700
701    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
702        let mut write = self.inner.write();
703        // If we have a item ready, just return it.
704        let packet = {
705            let mut queue = write.output_queue.lock();
706            match queue.poll_next_unpin(cx) {
707                Poll::Ready(Some(packet)) => Some(Some(packet)),
708                Poll::Ready(None) => Some(None),
709                Poll::Pending => {
710                    // The waker has been set for when the queue gets data.
711                    // We also need to set the same waker if an event happens.
712                    None
713                }
714            }
715        };
716        // We always need to set a waker for the events loop (this may be the same waker as above,
717        // or the input waker if the stream returned a packet)
718        if let Err(e) = write.poll_events() {
719            return Poll::Ready(Some(Err(e.into())));
720        }
721        match packet {
722            Some(Some(packet)) => Poll::Ready(Some(write.read_output_packet(packet))),
723            Some(None) => Poll::Ready(None),
724            None => Poll::Pending,
725        }
726    }
727}
728
729impl FusedStream for StreamProcessorOutputStream {
730    fn is_terminated(&self) -> bool {
731        self.inner.read().output_queue.lock().ended
732    }
733}
734
735#[cfg(test)]
736mod tests {
737    use super::*;
738
739    use async_test_helpers::run_while;
740    use byteorder::{ByteOrder, NativeEndian};
741    use fixture::fixture;
742    use fuchsia_async as fasync;
743    use futures::io::AsyncWriteExt;
744    use futures::FutureExt;
745    use futures_test::task::new_count_waker;
746    use sha2::{Digest as _, Sha256};
747    use std::fs::File;
748    use std::io::{Read, Write};
749    use std::pin::pin;
750
751    use stream_processor_test::ExpectedDigest;
752
753    const PCM_SAMPLE_SIZE: usize = 2;
754
755    #[derive(Clone, Debug)]
756    pub struct PcmAudio {
757        pcm_format: PcmFormat,
758        buffer: Vec<u8>,
759    }
760
761    impl PcmAudio {
762        pub fn create_saw_wave(pcm_format: PcmFormat, frame_count: usize) -> Self {
763            const FREQUENCY: f32 = 20.0;
764            const AMPLITUDE: f32 = 0.2;
765
766            let pcm_frame_size = PCM_SAMPLE_SIZE * pcm_format.channel_map.len();
767            let samples_per_frame = pcm_format.channel_map.len();
768            let sample_count = frame_count * samples_per_frame;
769
770            let mut buffer = vec![0; frame_count * pcm_frame_size];
771
772            for i in 0..sample_count {
773                let frame = (i / samples_per_frame) as f32;
774                let value =
775                    ((frame * FREQUENCY / (pcm_format.frames_per_second as f32)) % 1.0) * AMPLITUDE;
776                let sample = (value * i16::max_value() as f32) as i16;
777
778                let mut sample_bytes = [0; std::mem::size_of::<i16>()];
779                NativeEndian::write_i16(&mut sample_bytes, sample);
780
781                let offset = i * PCM_SAMPLE_SIZE;
782                buffer[offset] = sample_bytes[0];
783                buffer[offset + 1] = sample_bytes[1];
784            }
785
786            Self { pcm_format, buffer }
787        }
788
789        pub fn frame_size(&self) -> usize {
790            self.pcm_format.channel_map.len() * PCM_SAMPLE_SIZE
791        }
792    }
793
794    // Note: stolen from audio_encoder_test, update to stream_processor_test lib when this gets
795    // moved.
796    pub struct BytesValidator {
797        pub output_file: Option<&'static str>,
798        pub expected_digest: ExpectedDigest,
799    }
800
801    impl BytesValidator {
802        fn write_and_hash(&self, mut file: impl Write, bytes: &[u8]) -> Result<(), Error> {
803            let mut hasher = Sha256::default();
804
805            file.write_all(&bytes)?;
806            hasher.update(&bytes);
807
808            let digest: [u8; 32] = hasher.finalize().into();
809            if self.expected_digest.bytes != digest {
810                return Err(format_err!(
811                    "Expected {}; got {}",
812                    self.expected_digest,
813                    hex::encode(digest)
814                ))
815                .into();
816            }
817
818            Ok(())
819        }
820
821        fn output_file(&self) -> Result<impl Write, Error> {
822            Ok(if let Some(file) = self.output_file {
823                Box::new(std::fs::File::create(file)?) as Box<dyn Write>
824            } else {
825                Box::new(std::io::sink()) as Box<dyn Write>
826            })
827        }
828
829        fn validate(&self, bytes: &[u8]) -> Result<(), Error> {
830            self.write_and_hash(self.output_file()?, &bytes)
831        }
832    }
833
834    #[fuchsia::test]
835    fn encode_sbc() {
836        let mut exec = fasync::TestExecutor::new();
837
838        let pcm_format = PcmFormat {
839            pcm_mode: AudioPcmMode::Linear,
840            bits_per_sample: 16,
841            frames_per_second: 44100,
842            channel_map: vec![AudioChannelId::Cf],
843        };
844
845        let sub_bands = SbcSubBands::SubBands4;
846        let block_count = SbcBlockCount::BlockCount8;
847
848        let input_frames = 3000;
849
850        let pcm_audio = PcmAudio::create_saw_wave(pcm_format.clone(), input_frames);
851
852        let sbc_encoder_settings = EncoderSettings::Sbc(SbcEncoderSettings {
853            sub_bands,
854            block_count,
855            allocation: SbcAllocation::AllocLoudness,
856            channel_mode: SbcChannelMode::Mono,
857            bit_pool: 59, // Recommended from the SBC spec for these parameters.
858        });
859
860        let input_domain = DomainFormat::Audio(AudioFormat::Uncompressed(
861            AudioUncompressedFormat::Pcm(pcm_format),
862        ));
863
864        let mut encoder = StreamProcessor::create_encoder(input_domain, sbc_encoder_settings)
865            .expect("to create Encoder");
866
867        let frames_per_packet: usize = 8; // Randomly chosen by fair d10 roll.
868        let packet_size = pcm_audio.frame_size() * frames_per_packet;
869        let mut packets = pcm_audio.buffer.as_slice().chunks(packet_size);
870        let first_packet = packets.next().unwrap();
871
872        // Write an initial frame to the encoder.
873        // This is required to get past allocating the input/output buffers.
874        let written =
875            exec.run_singlethreaded(&mut encoder.write(first_packet)).expect("successful write");
876        assert_eq!(written, first_packet.len());
877
878        let mut encoded_stream = encoder.take_output_stream().expect("Stream should be taken");
879
880        // Shouldn't be able to take the stream twice
881        assert!(encoder.take_output_stream().is_err());
882
883        // Polling the encoded stream before the encoder has started up should wake it when
884        // output starts happening, set up the poll here.
885        let encoded_fut = pin!(encoded_stream.next());
886
887        let (waker, encoder_fut_wake_count) = new_count_waker();
888        let mut counting_ctx = Context::from_waker(&waker);
889
890        assert!(encoded_fut.poll(&mut counting_ctx).is_pending());
891
892        let mut frames_sent = first_packet.len() / pcm_audio.frame_size();
893
894        for packet in packets {
895            let mut written_fut = encoder.write(&packet);
896
897            let written_bytes =
898                exec.run_singlethreaded(&mut written_fut).expect("to write to encoder");
899
900            assert_eq!(packet.len(), written_bytes);
901            frames_sent += packet.len() / pcm_audio.frame_size();
902        }
903
904        encoder.close().expect("stream should always be closable");
905
906        assert_eq!(input_frames, frames_sent);
907
908        // When an unprocessed event has happened on the stream, even if intervening events have been
909        // procesed by the input processes, it should wake the output future to process the events.
910        let woke_count = encoder_fut_wake_count.get();
911        while encoder_fut_wake_count.get() == woke_count {
912            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
913        }
914        assert_eq!(encoder_fut_wake_count.get(), woke_count + 1);
915
916        // Get data from the output now.
917        let mut encoded = Vec::new();
918
919        loop {
920            let mut encoded_fut = encoded_stream.next();
921
922            match exec.run_singlethreaded(&mut encoded_fut) {
923                Some(Ok(enc_data)) => {
924                    assert!(!enc_data.is_empty());
925                    encoded.extend_from_slice(&enc_data);
926                }
927                Some(Err(e)) => {
928                    panic!("Unexpected error when polling encoded data: {}", e);
929                }
930                None => {
931                    break;
932                }
933            }
934        }
935
936        // Match the encoded data to the known hash.
937        let expected_digest = ExpectedDigest::new(
938            "Sbc: 44.1kHz/Loudness/Mono/bitpool 56/blocks 8/subbands 4",
939            "5c65a88bda3f132538966d87df34aa8675f85c9892b7f9f5571f76f3c7813562",
940        );
941        let hash_validator = BytesValidator { output_file: None, expected_digest };
942
943        assert_eq!(6110, encoded.len(), "Encoded size should be equal");
944
945        let validated = hash_validator.validate(encoded.as_slice());
946        assert!(validated.is_ok(), "Failed hash: {:?}", validated);
947    }
948
949    fn fix_sbc_test_file<F>(_name: &str, test: F)
950    where
951        F: FnOnce(Vec<u8>) -> (),
952    {
953        const SBC_TEST_FILE: &str = "/pkg/data/s16le44100mono.sbc";
954
955        let mut sbc_data = Vec::new();
956        let _ = File::open(SBC_TEST_FILE)
957            .expect("open test file")
958            .read_to_end(&mut sbc_data)
959            .expect("read test file");
960
961        test(sbc_data)
962    }
963
964    #[fixture(fix_sbc_test_file)]
965    #[fuchsia::test]
966    fn decode_sbc(sbc_data: Vec<u8>) {
967        let mut exec = fasync::TestExecutor::new();
968
969        const SBC_FRAME_SIZE: usize = 72;
970        const INPUT_FRAMES: usize = 23;
971
972        // SBC codec info corresponding to Mono reference stream.
973        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
974        let mut decoder =
975            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
976
977        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
978
979        // Shouldn't be able to take the stream twice
980        assert!(decoder.take_output_stream().is_err());
981
982        let mut frames_sent = 0;
983
984        let frames_per_packet: usize = 1; // Randomly chosen by fair d10 roll.
985        let packet_size = SBC_FRAME_SIZE * frames_per_packet;
986
987        for frames in sbc_data.as_slice().chunks(packet_size) {
988            let mut written_fut = decoder.write(&frames);
989
990            let written_bytes =
991                exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
992
993            assert_eq!(frames.len(), written_bytes);
994            frames_sent += frames.len() / SBC_FRAME_SIZE;
995        }
996
997        assert_eq!(INPUT_FRAMES, frames_sent);
998
999        let mut flush_fut = pin!(decoder.flush());
1000        exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
1001
1002        decoder.close().expect("stream should always be closable");
1003
1004        // Get data from the output now.
1005        let mut decoded = Vec::new();
1006
1007        loop {
1008            let mut decoded_fut = decoded_stream.next();
1009
1010            match exec.run_singlethreaded(&mut decoded_fut) {
1011                Some(Ok(dec_data)) => {
1012                    assert!(!dec_data.is_empty());
1013                    decoded.extend_from_slice(&dec_data);
1014                }
1015                Some(Err(e)) => {
1016                    panic!("Unexpected error when polling decoded data: {}", e);
1017                }
1018                None => {
1019                    break;
1020                }
1021            }
1022        }
1023
1024        // Match the decoded data to the known hash.
1025        let expected_digest = ExpectedDigest::new(
1026            "Pcm: 44.1kHz/16bit/Mono",
1027            "ff2e7afea51217886d3df15b9a623b4e49c9bd9bd79c58ac01bc94c5511e08d6",
1028        );
1029        let hash_validator = BytesValidator { output_file: None, expected_digest };
1030
1031        assert_eq!(256 * INPUT_FRAMES, decoded.len(), "Decoded size should be equal");
1032
1033        let validated = hash_validator.validate(decoded.as_slice());
1034        assert!(validated.is_ok(), "Failed hash: {:?}", validated);
1035    }
1036
1037    #[fixture(fix_sbc_test_file)]
1038    #[fuchsia::test]
1039    fn decode_sbc_wakes_output_to_process_events(sbc_data: Vec<u8>) {
1040        let mut exec = fasync::TestExecutor::new();
1041        const SBC_FRAME_SIZE: usize = 72;
1042
1043        // SBC codec info corresponding to Mono reference stream.
1044        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
1045        let mut decoder =
1046            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
1047
1048        let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
1049        let next_frame = chunks.next().unwrap();
1050
1051        // Write an initial frame to the encoder.
1052        // This is required to get past allocating the input/output buffers.
1053        let written =
1054            exec.run_singlethreaded(&mut decoder.write(next_frame)).expect("successful write");
1055        assert_eq!(written, next_frame.len());
1056
1057        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
1058
1059        // Polling the decoded stream before the decoder has started up should wake it when
1060        // output starts happening, set up the poll here.
1061        let decoded_fut = pin!(decoded_stream.next());
1062
1063        let (waker, decoder_fut_wake_count) = new_count_waker();
1064        let mut counting_ctx = Context::from_waker(&waker);
1065
1066        assert!(decoded_fut.poll(&mut counting_ctx).is_pending());
1067
1068        // Send only one frame. This is not eneough to automatically cause output to be generated
1069        // by pushing data.
1070        let frame = chunks.next().unwrap();
1071        let mut written_fut = decoder.write(&frame);
1072        let written_bytes = exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
1073        assert_eq!(frame.len(), written_bytes);
1074
1075        let mut flush_fut = pin!(decoder.flush());
1076        exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
1077
1078        // When an unprocessed event has happened on the stream, even if intervening events have been
1079        // procesed by the input processes, it should wake the output future to process the events.
1080        assert_eq!(decoder_fut_wake_count.get(), 0);
1081        while decoder_fut_wake_count.get() == 0 {
1082            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1083        }
1084        assert_eq!(decoder_fut_wake_count.get(), 1);
1085
1086        let mut decoded = Vec::new();
1087        // Drops the previous decoder future, which is fine.
1088        let mut decoded_fut = decoded_stream.next();
1089
1090        match exec.run_singlethreaded(&mut decoded_fut) {
1091            Some(Ok(dec_data)) => {
1092                assert!(!dec_data.is_empty());
1093                decoded.extend_from_slice(&dec_data);
1094            }
1095            x => panic!("Expected decoded frame, got {:?}", x),
1096        }
1097
1098        assert_eq!(512, decoded.len(), "Decoded size should be equal to one frame");
1099    }
1100
1101    #[fixture(fix_sbc_test_file)]
1102    #[fuchsia::test]
1103    fn decode_sbc_wakes_input_to_process_events(sbc_data: Vec<u8>) {
1104        let mut exec = fasync::TestExecutor::new();
1105        const SBC_FRAME_SIZE: usize = 72;
1106
1107        // SBC codec info corresponding to Mono reference stream.
1108        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
1109        let mut decoder =
1110            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
1111
1112        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
1113
1114        let decoded_fut = pin!(decoded_stream.next());
1115
1116        let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
1117        let next_frame = chunks.next().unwrap();
1118
1119        // Write an initial frame to the encoder.
1120        // This is to get past allocating the input/output buffers stage.
1121        // TODO(https://fxbug.dev/42081385): Both futures need to be polled here even though it's only the
1122        // writer we really care about because currently decoded_fut is needed to drive the
1123        // allocation process.
1124        let (written_res, mut decoded_fut) =
1125            run_while(&mut exec, decoded_fut, decoder.write(next_frame));
1126        assert_eq!(written_res.expect("initial write should succeed"), next_frame.len());
1127
1128        // Write to the encoder until we cannot write anymore, because there are no input buffers
1129        // available.  This should happen when all the input buffers are full and and the input
1130        // buffers are waiting to be written.
1131        let (waker, write_fut_wake_count) = new_count_waker();
1132        let mut counting_ctx = Context::from_waker(&waker);
1133
1134        let mut wake_count_before_stall = 0;
1135        for frame in chunks {
1136            wake_count_before_stall = write_fut_wake_count.get();
1137            let mut written_fut = decoder.write(&frame);
1138            if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
1139                // The poll_unpin can wake the input waker if an event arrived for it, meaning we should
1140                // continue filling.
1141                if write_fut_wake_count.get() != wake_count_before_stall {
1142                    continue;
1143                }
1144                // We should have never been woken until now, because we always were ready before,
1145                // and the output waker is not registered (so can't progress)
1146                break;
1147            }
1148            // Flush the packet, to make input buffers get spent faster.
1149            let mut flush_fut = pin!(decoder.flush());
1150            exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
1151        }
1152
1153        // We should be able to get a decoded output, once the codec does it's thing.
1154        let decoded_frame = exec.run_singlethreaded(&mut decoded_fut);
1155        assert_eq!(512, decoded_frame.unwrap().unwrap().len(), "Decoded frame size wrong");
1156
1157        // Fill the input buffer again so the input waker is registered.
1158        let chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
1159        for frame in chunks {
1160            wake_count_before_stall = write_fut_wake_count.get();
1161            let mut written_fut = decoder.write(&frame);
1162            if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
1163                // The poll_unpin can wake the input waker if an event arrived for it, meaning we should
1164                // continue filling.
1165                if write_fut_wake_count.get() != wake_count_before_stall {
1166                    continue;
1167                }
1168                break;
1169            }
1170            // Flush the packet, to make input buffers get spent faster.
1171            let mut flush_fut = pin!(decoder.flush());
1172            exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
1173        }
1174
1175        // The input waker should be the one waiting on events from the codec and get woken up,
1176        // even if an output event happens.
1177        // At some point, we will get an event from the encoder, with no output waker set, and this
1178        // should wake the input waker, which is waiting to be woken up.
1179        while write_fut_wake_count.get() == wake_count_before_stall {
1180            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1181        }
1182
1183        // Note: at this point, we may not be able to write another frame, but the waiter should
1184        // repoll, and set the waker again.
1185    }
1186}