1use anyhow::{format_err, Context as _, Error};
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_media::*;
8use fidl_fuchsia_mediacodec::*;
9use fidl_fuchsia_sysmem2::*;
10use fuchsia_stream_processors::*;
11use fuchsia_sync::{Mutex, RwLock};
12use futures::future::{maybe_done, MaybeDone};
13use futures::io::{self, AsyncWrite};
14use futures::stream::{FusedStream, Stream};
15use futures::task::{Context, Poll, Waker};
16use futures::{ready, Future, StreamExt};
17use log::{trace, warn};
18use std::collections::{HashSet, VecDeque};
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use crate::buffer_collection_constraints::buffer_collection_constraints_default;
24use crate::sysmem_allocator::{BufferName, SysmemAllocatedBuffers, SysmemAllocation};
25
26fn fidl_error_to_io_error(e: fidl::Error) -> io::Error {
27 io::Error::other(format_err!("Fidl Error: {}", e))
28}
29
/// Tracks whether anyone is consuming the output queue and, if so, how to notify them.
#[derive(Debug, Default)]
enum Listener {
    /// Initial state: no listener has been set up.
    #[default]
    None,
    /// A listener has been set up but there is currently no waker stored for it.
    New,
    /// A listener has been set up and left this waker to be notified with.
    Some(Waker),
}

impl Listener {
    /// Stores `waker` so that the next call to `wake` notifies it, replacing any waker that was
    /// stored before.
    ///
    /// # Panics
    ///
    /// Panics if the listener is still in the `None` state.
    fn register(&mut self, waker: Waker) {
        if matches!(self, Listener::None) {
            panic!("Polled a listener with no pollers");
        }
        *self = Listener::Some(waker);
    }

    /// Wakes the stored waker, if any, and resets the state to `New`.
    ///
    /// Does nothing while the state is `None`.
    fn wake(&mut self) {
        if matches!(self, Listener::None) {
            return;
        }
        // The `None` case was ruled out above, so only a stored waker needs handling here.
        if let Listener::Some(waker) = mem::replace(self, Listener::New) {
            waker.wake();
        }
    }

    /// Returns the stored waker, if one is registered.
    fn waker(&self) -> Option<&Waker> {
        match self {
            Listener::Some(waker) => Some(waker),
            Listener::None | Listener::New => None,
        }
    }
}
80
81struct OutputQueue {
83 listener: Listener,
85 queue: VecDeque<Packet>,
87 ended: bool,
90}
91
92impl OutputQueue {
93 fn enqueue(&mut self, packet: Packet) {
95 self.queue.push_back(packet);
96 self.listener.wake();
97 }
98
99 fn mark_ended(&mut self) {
102 self.ended = true;
103 self.listener.wake();
104 }
105
106 fn waker(&self) -> Option<&Waker> {
107 self.listener.waker()
108 }
109
110 fn wake(&mut self) {
112 self.listener.wake();
113 }
114}
115
116impl Default for OutputQueue {
117 fn default() -> Self {
118 OutputQueue { listener: Listener::default(), queue: VecDeque::new(), ended: false }
119 }
120}
121
122impl Stream for OutputQueue {
123 type Item = Packet;
124
125 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126 match self.queue.pop_front() {
127 Some(packet) => Poll::Ready(Some(packet)),
128 None if self.ended => Poll::Ready(None),
129 None => {
130 self.listener.register(cx.waker().clone());
131 Poll::Pending
132 }
133 }
134 }
135}
136
/// Value used as `min_size_bytes` in the sysmem constraints for the input buffer collection.
const MIN_INPUT_BUFFER_SIZE: u32 = 4096;
/// Value used as `min_size_bytes` in the sysmem constraints for the output buffer collection.
const MIN_OUTPUT_BUFFER_SIZE: u32 = 0;
142
/// Index of an input buffer. The same value is used as both the packet index and the buffer
/// index when an input packet is queued.
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
struct InputBufferIndex(u32);
146
/// State shared between a `StreamProcessor` and the `StreamProcessorOutputStream` taken from it.
struct StreamProcessorInner {
    /// Proxy used to send commands to the stream processor.
    processor: StreamProcessorProxy,
    /// Allocator proxy handed to each sysmem allocation.
    sysmem_client: AllocatorProxy,
    /// Event stream taken from `processor`.
    events: StreamProcessorEventStream,
    /// Capacity in bytes of one input packet; set from the allocated input buffer settings.
    input_packet_size: u64,
    /// Input buffers that are free for the client to fill.
    client_owned: HashSet<InputBufferIndex>,
    /// Input buffer currently being filled and the number of bytes already written to it.
    input_cursor: Option<(InputBufferIndex, u64)>,
    /// Output packets received from the processor that have not been read yet.
    output_queue: Mutex<OutputQueue>,
    /// Waker of the task waiting for an input buffer to become writable.
    input_waker: Option<Waker>,
    /// Allocation of the input buffer collection.
    input_allocation: MaybeDone<SysmemAllocation>,
    /// Allocation of the output buffer collection.
    output_allocation: MaybeDone<SysmemAllocation>,
}
174
175impl StreamProcessorInner {
    /// Reacts to a single event received from the stream processor.
    ///
    /// Constraint events start a sysmem allocation for the matching side, output packets and the
    /// end-of-stream marker are forwarded to the output queue, and freed input packets are
    /// returned to the set of client-owned buffers. `OnOutputFormat` is ignored and any other
    /// event is only logged at trace level.
    ///
    /// Returns an error when constraints fail validation or when starting an allocation fails.
    fn handle_event(&mut self, evt: StreamProcessorEvent) -> Result<(), Error> {
        match evt {
            StreamProcessorEvent::OnInputConstraints { input_constraints } => {
                // The constraints are validated, but their contents are not used below.
                let _input_constraints = ValidStreamBufferConstraints::try_from(input_constraints)?;
                let buffer_constraints =
                    Self::buffer_constraints_from_min_size(MIN_INPUT_BUFFER_SIZE);
                let processor = self.processor.clone();
                let mut partial_settings = Self::partial_settings();
                // Passed to the allocation, which supplies the token to forward to the processor.
                let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
                    // Re-wrap the channel as the token type the processor settings expect.
                    partial_settings.sysmem_token =
                        Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
                            token.into_channel(),
                        ));
                    if let Err(e) = processor.set_input_buffer_partial_settings(partial_settings) {
                        warn!("Couldn't set input buffer settings: {:?}", e);
                    }
                };
                self.input_allocation = maybe_done(SysmemAllocation::allocate(
                    self.sysmem_client.clone(),
                    BufferName { name: "StreamProcessorInput", priority: 1 },
                    None,
                    buffer_constraints,
                    token_fn,
                )?);
            }
            StreamProcessorEvent::OnOutputConstraints { output_config } => {
                let output_constraints = ValidStreamOutputConstraints::try_from(output_config)?;
                // Nothing to allocate unless the processor asks for new buffers.
                if !output_constraints.buffer_constraints_action_required {
                    return Ok(());
                }
                let buffer_constraints =
                    Self::buffer_constraints_from_min_size(MIN_OUTPUT_BUFFER_SIZE);
                let processor = self.processor.clone();
                let mut partial_settings = Self::partial_settings();
                // Same token hand-off as for the input side, using the output settings call.
                let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
                    partial_settings.sysmem_token =
                        Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
                            token.into_channel(),
                        ));
                    if let Err(e) = processor.set_output_buffer_partial_settings(partial_settings) {
                        warn!("Couldn't set output buffer settings: {:?}", e);
                    }
                };

                self.output_allocation = maybe_done(SysmemAllocation::allocate(
                    self.sysmem_client.clone(),
                    BufferName { name: "StreamProcessorOutput", priority: 1 },
                    None,
                    buffer_constraints,
                    token_fn,
                )?);
            }
            StreamProcessorEvent::OnOutputPacket { output_packet, .. } => {
                // Enqueueing also wakes a registered output listener.
                let mut lock = self.output_queue.lock();
                lock.enqueue(output_packet);
            }
            StreamProcessorEvent::OnFreeInputPacket {
                free_input_packet: PacketHeader { packet_index: Some(idx), .. },
            } => {
                // `insert` returns false when the index was already in the set.
                if !self.client_owned.insert(InputBufferIndex(idx)) {
                    warn!("Freed an input packet that was already freed: {:?}", idx);
                }
                // A buffer is available again, so a waiting writer may be able to proceed.
                self.setup_input_cursor();
            }
            StreamProcessorEvent::OnOutputEndOfStream { .. } => {
                let mut lock = self.output_queue.lock();
                lock.mark_ended();
            }
            StreamProcessorEvent::OnOutputFormat { .. } => {}
            e => trace!("Unhandled stream processor event: {:?}", e),
        }
        Ok(())
    }
258
259 fn process_event(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
263 match ready!(self.events.poll_next_unpin(cx)) {
264 Some(Err(e)) => Poll::Ready(Err(e.into())),
265 Some(Ok(event)) => Poll::Ready(self.handle_event(event)),
266 None => Poll::Ready(Err(format_err!("Client disconnected"))),
267 }
268 }
269
270 fn buffer_constraints_from_min_size(min_buffer_size: u32) -> BufferCollectionConstraints {
271 BufferCollectionConstraints {
272 buffer_memory_constraints: Some(BufferMemoryConstraints {
273 min_size_bytes: Some(min_buffer_size as u64),
274 ..Default::default()
275 }),
276 ..buffer_collection_constraints_default()
277 }
278 }
279
280 fn partial_settings() -> StreamBufferPartialSettings {
281 StreamBufferPartialSettings {
282 buffer_lifetime_ordinal: Some(1),
283 buffer_constraints_version_ordinal: Some(1),
284 sysmem_token: None,
285 ..Default::default()
286 }
287 }
288
289 fn input_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
290 Pin::new(&mut self.input_allocation)
291 .output_mut()
292 .expect("allocation completed")
293 .as_mut()
294 .expect("succcessful allocation")
295 }
296
297 fn output_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
298 Pin::new(&mut self.output_allocation)
299 .output_mut()
300 .expect("allocation completed")
301 .as_mut()
302 .expect("succcessful allocation")
303 }
304
305 fn input_allocation_complete(&mut self) -> Result<(), Error> {
308 let _ = Pin::new(&mut self.input_allocation)
309 .output_mut()
310 .ok_or_else(|| format_err!("allocation isn't complete"))?;
311
312 let settings = self.input_buffers().settings();
313 self.input_packet_size = (*settings.size_bytes.as_ref().unwrap()).try_into()?;
314 let buffer_count = self.input_buffers().len();
315 for i in 0..buffer_count {
316 let _ = self.client_owned.insert(InputBufferIndex(i.try_into()?));
317 }
318 self.setup_input_cursor();
320 Ok(())
321 }
322
323 fn output_allocation_complete(&mut self) -> Result<(), Error> {
327 let _ = Pin::new(&mut self.output_allocation)
328 .output_mut()
329 .ok_or_else(|| format_err!("allocation isn't complete"))?;
330 self.processor
331 .complete_output_buffer_partial_settings(1)
332 .context("setting output buffer settings")?;
333 Ok(())
334 }
335
336 fn poll_buffer_allocation(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
339 if let MaybeDone::Future(_) = self.input_allocation {
340 match Pin::new(&mut self.input_allocation).poll(cx) {
341 Poll::Ready(()) => {
342 if let Err(e) = self.input_allocation_complete() {
343 return Poll::Ready(Err(e));
344 }
345 }
346 Poll::Pending => {}
347 };
348 }
349 if let MaybeDone::Future(_) = self.output_allocation {
350 match Pin::new(&mut self.output_allocation).poll(cx) {
351 Poll::Ready(()) => {
352 if let Err(e) = self.output_allocation_complete() {
353 return Poll::Ready(Err(e));
354 }
355 }
356 Poll::Pending => {}
357 };
358 }
359 Poll::Pending
360 }
361
362 fn waiting_waker(&self) -> Option<Waker> {
364 match (self.output_queue.lock().waker(), &self.input_waker) {
365 (None, None) => None,
367 (Some(waker), _) => Some(waker.clone()),
368 (_, Some(waker)) => Some(waker.clone()),
369 }
370 }
371
    /// Handles every event that is immediately available, then polls the buffer allocations.
    ///
    /// Returns `Ok(())` without doing anything when neither an output nor an input waker is
    /// registered. Otherwise the waker returned by `waiting_waker` is used as the polling
    /// context, so it is the one notified when the event stream or an allocation makes progress.
    ///
    /// Returns an error, after logging it, if event processing or buffer allocation fails.
    fn poll_events(&mut self) -> Result<(), Error> {
        let waker = loop {
            // Re-read the waker on every iteration: handling an event can wake (and thereby
            // clear) the waker that was used for the previous poll.
            let waker = match self.waiting_waker() {
                None => return Ok(()),
                Some(waker) => waker,
            };
            match self.process_event(&mut Context::from_waker(&waker)) {
                // No more events for now; keep the waker that the event stream was polled with.
                Poll::Pending => break waker,
                Poll::Ready(Err(e)) => {
                    warn!("Stream processing error: {:?}", e);
                    return Err(e.into());
                }
                // An event was handled; loop to look for the next one.
                Poll::Ready(Ok(())) => {}
            }
        };

        if let Poll::Ready(Err(e)) = self.poll_buffer_allocation(&mut Context::from_waker(&waker)) {
            warn!("Stream buffer allocation error: {:?}", e);
            return Err(e.into());
        }
        Ok(())
    }
400
401 fn wake_output(&mut self) {
402 self.output_queue.lock().wake();
403 }
404
405 fn wake_input(&mut self) {
406 if let Some(w) = self.input_waker.take() {
407 w.wake();
408 }
409 }
410
411 fn setup_input_cursor(&mut self) {
414 if self.input_cursor.is_some() {
415 return;
417 }
418 let next_idx = match self.client_owned.iter().next() {
419 None => return,
420 Some(idx) => idx.clone(),
421 };
422 let _ = self.client_owned.remove(&next_idx);
423 self.input_cursor = Some((next_idx, 0));
424 self.wake_input();
425 }
426
427 fn read_output_packet(&mut self, packet: Packet) -> Result<Vec<u8>, Error> {
430 let packet = ValidPacket::try_from(packet)?;
431
432 let output_size = packet.valid_length_bytes as usize;
433 let offset = packet.start_offset as u64;
434 let mut output = vec![0; output_size];
435 let buf_idx = packet.buffer_index;
436 let vmo = self.output_buffers().get_mut(buf_idx).expect("output vmo should exist");
437 vmo.read(&mut output, offset)?;
438 self.processor.recycle_output_packet(&packet.header.into())?;
439 Ok(output)
440 }
441}
442
/// Handle to a stream processor (encoder or decoder) created through the codec factory.
///
/// Input is written through the `AsyncWrite` implementation; output is read from the stream
/// returned by `take_output_stream`.
pub struct StreamProcessor {
    /// State shared with the output stream.
    inner: Arc<RwLock<StreamProcessorInner>>,
}
450
/// Stream of output produced by a `StreamProcessor`; each item is the payload of one output
/// packet.
pub struct StreamProcessorOutputStream {
    /// State shared with the `StreamProcessor` this stream was taken from.
    inner: Arc<RwLock<StreamProcessorInner>>,
}
456
457impl StreamProcessor {
458 fn create(processor: StreamProcessorProxy, sysmem_client: AllocatorProxy) -> Self {
461 let events = processor.take_event_stream();
462 Self {
463 inner: Arc::new(RwLock::new(StreamProcessorInner {
464 processor,
465 sysmem_client,
466 events,
467 input_packet_size: 0,
468 client_owned: HashSet::new(),
469 input_cursor: None,
470 output_queue: Default::default(),
471 input_waker: None,
472 input_allocation: maybe_done(SysmemAllocation::pending()),
473 output_allocation: maybe_done(SysmemAllocation::pending()),
474 })),
475 }
476 }
477
478 pub fn create_encoder(
482 input_domain: DomainFormat,
483 encoder_settings: EncoderSettings,
484 ) -> Result<StreamProcessor, Error> {
485 let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
486 .context("Connecting to sysmem")?;
487
488 let format_details = FormatDetails {
489 domain: Some(input_domain),
490 encoder_settings: Some(encoder_settings),
491 format_details_version_ordinal: Some(1),
492 mime_type: Some("audio/pcm".to_string()),
493 oob_bytes: None,
494 pass_through_parameters: None,
495 timebase: None,
496 ..Default::default()
497 };
498
499 let encoder_params = CreateEncoderParams {
500 input_details: Some(format_details),
501 require_hw: Some(false),
502 ..Default::default()
503 };
504
505 let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
506 .context("Failed to connect to Codec Factory")?;
507
508 let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
509
510 codec_svc.create_encoder(&encoder_params, stream_processor_serverend)?;
511
512 Ok(StreamProcessor::create(processor, sysmem_client))
513 }
514
515 pub fn create_decoder(
519 mime_type: &str,
520 oob_bytes: Option<Vec<u8>>,
521 ) -> Result<StreamProcessor, Error> {
522 let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
523 .context("Connecting to sysmem")?;
524
525 let format_details = FormatDetails {
526 mime_type: Some(mime_type.to_string()),
527 oob_bytes: oob_bytes,
528 format_details_version_ordinal: Some(1),
529 encoder_settings: None,
530 domain: None,
531 pass_through_parameters: None,
532 timebase: None,
533 ..Default::default()
534 };
535
536 let decoder_params = CreateDecoderParams {
537 input_details: Some(format_details),
538 permit_lack_of_split_header_handling: Some(true),
539 ..Default::default()
540 };
541
542 let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
543 .context("Failed to connect to Codec Factory")?;
544
545 let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
546
547 codec_svc.create_decoder(&decoder_params, stream_processor_serverend)?;
548
549 Ok(StreamProcessor::create(processor, sysmem_client))
550 }
551
552 pub fn take_output_stream(&mut self) -> Result<StreamProcessorOutputStream, Error> {
556 {
557 let read = self.inner.read();
558 let mut lock = read.output_queue.lock();
559 if let Listener::None = lock.listener {
560 lock.listener = Listener::New;
561 } else {
562 return Err(format_err!("Output stream already taken"));
563 }
564 }
565 Ok(StreamProcessorOutputStream { inner: self.inner.clone() })
566 }
567
    /// Copies as much of `bytes` as possible into input buffers, sending each buffer to the
    /// processor as soon as it is full.
    ///
    /// Returns the number of bytes consumed, which is less than `bytes.len()` when no input
    /// buffer is available to continue. A partially filled buffer is kept as the input cursor and
    /// is only sent by a later write that fills it or by `send_packet`.
    ///
    /// # Panics
    ///
    /// Panics if the cursor refers to an input buffer that does not exist.
    fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, io::Error> {
        let mut bytes_idx = 0;
        while bytes.len() > bytes_idx {
            // Scoped so the write lock is released before `send_packet` takes it again.
            {
                let mut write = self.inner.write();
                let (idx, size) = match write.input_cursor.take() {
                    None => return Ok(bytes_idx),
                    Some(x) => x,
                };
                let space_left = write.input_packet_size - size;
                let left_to_write = bytes.len() - bytes_idx;
                let buffer_vmo = write.input_buffers().get_mut(idx.0).expect("need buffer vmo");
                // NOTE(review): if either `buffer_vmo.write` below fails, the cursor taken above
                // is not restored, so this buffer index appears to be lost - confirm intended.
                if space_left as usize > left_to_write {
                    // Everything that remains fits with room to spare: keep the buffer open.
                    let write_buf = &bytes[bytes_idx..];
                    let write_len = write_buf.len();
                    buffer_vmo.write(write_buf, size)?;
                    bytes_idx += write_len;
                    write.input_cursor = Some((idx, size + write_len as u64));
                    assert!(bytes.len() == bytes_idx);
                    return Ok(bytes_idx);
                }
                // Fill the buffer exactly to capacity.
                let end_idx = bytes_idx + space_left as usize;
                let write_buf = &bytes[bytes_idx..end_idx];
                let write_len = write_buf.len();
                buffer_vmo.write(write_buf, size)?;
                bytes_idx += write_len;
                assert_eq!(size + write_len as u64, write.input_packet_size);
                // Restore the cursor, now full, so `send_packet` below can queue it.
                write.input_cursor = Some((idx, write.input_packet_size));
            }
            self.send_packet()?;
        }
        Ok(bytes_idx)
    }
603
604 pub fn send_packet(&mut self) -> Result<(), io::Error> {
608 let mut write = self.inner.write();
609 if write.input_cursor.is_none() {
610 return Ok(());
612 }
613 let (idx, size) = write.input_cursor.take().expect("input cursor is none");
614 if size == 0 {
615 write.input_cursor = Some((idx, size));
617 return Ok(());
618 }
619 let packet = Packet {
620 header: Some(PacketHeader {
621 buffer_lifetime_ordinal: Some(1),
622 packet_index: Some(idx.0),
623 ..Default::default()
624 }),
625 buffer_index: Some(idx.0),
626 stream_lifetime_ordinal: Some(1),
627 start_offset: Some(0),
628 valid_length_bytes: Some(size as u32),
629 start_access_unit: Some(true),
630 known_end_access_unit: Some(true),
631 ..Default::default()
632 };
633 write.processor.queue_input_packet(&packet).map_err(fidl_error_to_io_error)?;
634 write.setup_input_cursor();
636 Ok(())
637 }
638
639 fn poll_writable(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
643 let mut write = self.inner.write();
644 write.input_waker = None;
647 if write.input_cursor.is_some() {
648 return Poll::Ready(Ok(()));
649 }
650 write.input_waker = Some(cx.waker().clone());
651 if let Err(e) = write.poll_events() {
656 return Poll::Ready(Err(io::Error::other(e)));
657 }
658 Poll::Pending
659 }
660
661 pub fn close(&mut self) -> Result<(), io::Error> {
662 self.send_packet()?;
663
664 let mut write = self.inner.write();
665
666 write.processor.queue_input_end_of_stream(1).map_err(fidl_error_to_io_error)?;
667 write.input_cursor = None;
670 write.wake_input();
671 write.wake_output();
672 Ok(())
673 }
674}
675
676impl AsyncWrite for StreamProcessor {
677 fn poll_write(
678 mut self: Pin<&mut Self>,
679 cx: &mut Context<'_>,
680 buf: &[u8],
681 ) -> Poll<io::Result<usize>> {
682 ready!(self.poll_writable(cx))?;
683 match self.write_bytes(buf) {
684 Ok(written) => Poll::Ready(Ok(written)),
685 Err(e) => Poll::Ready(Err(e.into())),
686 }
687 }
688
689 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
690 Poll::Ready(self.send_packet())
691 }
692
693 fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
694 Poll::Ready(self.send_packet())
695 }
696}
697
698impl Stream for StreamProcessorOutputStream {
699 type Item = Result<Vec<u8>, Error>;
700
701 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
702 let mut write = self.inner.write();
703 let packet = {
705 let mut queue = write.output_queue.lock();
706 match queue.poll_next_unpin(cx) {
707 Poll::Ready(Some(packet)) => Some(Some(packet)),
708 Poll::Ready(None) => Some(None),
709 Poll::Pending => {
710 None
713 }
714 }
715 };
716 if let Err(e) = write.poll_events() {
719 return Poll::Ready(Some(Err(e.into())));
720 }
721 match packet {
722 Some(Some(packet)) => Poll::Ready(Some(write.read_output_packet(packet))),
723 Some(None) => Poll::Ready(None),
724 None => Poll::Pending,
725 }
726 }
727}
728
729impl FusedStream for StreamProcessorOutputStream {
730 fn is_terminated(&self) -> bool {
731 self.inner.read().output_queue.lock().ended
732 }
733}
734
735#[cfg(test)]
736mod tests {
737 use super::*;
738
739 use async_test_helpers::run_while;
740 use byteorder::{ByteOrder, NativeEndian};
741 use fixture::fixture;
742 use fuchsia_async as fasync;
743 use futures::io::AsyncWriteExt;
744 use futures::FutureExt;
745 use futures_test::task::new_count_waker;
746 use sha2::{Digest as _, Sha256};
747 use std::fs::File;
748 use std::io::{Read, Write};
749 use std::pin::pin;
750
751 use stream_processor_test::ExpectedDigest;
752
753 const PCM_SAMPLE_SIZE: usize = 2;
754
755 #[derive(Clone, Debug)]
756 pub struct PcmAudio {
757 pcm_format: PcmFormat,
758 buffer: Vec<u8>,
759 }
760
761 impl PcmAudio {
762 pub fn create_saw_wave(pcm_format: PcmFormat, frame_count: usize) -> Self {
763 const FREQUENCY: f32 = 20.0;
764 const AMPLITUDE: f32 = 0.2;
765
766 let pcm_frame_size = PCM_SAMPLE_SIZE * pcm_format.channel_map.len();
767 let samples_per_frame = pcm_format.channel_map.len();
768 let sample_count = frame_count * samples_per_frame;
769
770 let mut buffer = vec![0; frame_count * pcm_frame_size];
771
772 for i in 0..sample_count {
773 let frame = (i / samples_per_frame) as f32;
774 let value =
775 ((frame * FREQUENCY / (pcm_format.frames_per_second as f32)) % 1.0) * AMPLITUDE;
776 let sample = (value * i16::max_value() as f32) as i16;
777
778 let mut sample_bytes = [0; std::mem::size_of::<i16>()];
779 NativeEndian::write_i16(&mut sample_bytes, sample);
780
781 let offset = i * PCM_SAMPLE_SIZE;
782 buffer[offset] = sample_bytes[0];
783 buffer[offset + 1] = sample_bytes[1];
784 }
785
786 Self { pcm_format, buffer }
787 }
788
789 pub fn frame_size(&self) -> usize {
790 self.pcm_format.channel_map.len() * PCM_SAMPLE_SIZE
791 }
792 }
793
794 pub struct BytesValidator {
797 pub output_file: Option<&'static str>,
798 pub expected_digest: ExpectedDigest,
799 }
800
801 impl BytesValidator {
802 fn write_and_hash(&self, mut file: impl Write, bytes: &[u8]) -> Result<(), Error> {
803 let mut hasher = Sha256::default();
804
805 file.write_all(&bytes)?;
806 hasher.update(&bytes);
807
808 let digest: [u8; 32] = hasher.finalize().into();
809 if self.expected_digest.bytes != digest {
810 return Err(format_err!(
811 "Expected {}; got {}",
812 self.expected_digest,
813 hex::encode(digest)
814 ))
815 .into();
816 }
817
818 Ok(())
819 }
820
821 fn output_file(&self) -> Result<impl Write, Error> {
822 Ok(if let Some(file) = self.output_file {
823 Box::new(std::fs::File::create(file)?) as Box<dyn Write>
824 } else {
825 Box::new(std::io::sink()) as Box<dyn Write>
826 })
827 }
828
829 fn validate(&self, bytes: &[u8]) -> Result<(), Error> {
830 self.write_and_hash(self.output_file()?, &bytes)
831 }
832 }
833
    /// Encodes a generated saw wave to SBC and checks the size and digest of the output, as well
    /// as that the output stream is woken once encoded data is available.
    #[fuchsia::test]
    fn encode_sbc() {
        let mut exec = fasync::TestExecutor::new();

        let pcm_format = PcmFormat {
            pcm_mode: AudioPcmMode::Linear,
            bits_per_sample: 16,
            frames_per_second: 44100,
            channel_map: vec![AudioChannelId::Cf],
        };

        let sub_bands = SbcSubBands::SubBands4;
        let block_count = SbcBlockCount::BlockCount8;

        let input_frames = 3000;

        let pcm_audio = PcmAudio::create_saw_wave(pcm_format.clone(), input_frames);

        let sbc_encoder_settings = EncoderSettings::Sbc(SbcEncoderSettings {
            sub_bands,
            block_count,
            allocation: SbcAllocation::AllocLoudness,
            channel_mode: SbcChannelMode::Mono,
            bit_pool: 59,
        });

        let input_domain = DomainFormat::Audio(AudioFormat::Uncompressed(
            AudioUncompressedFormat::Pcm(pcm_format),
        ));

        let mut encoder = StreamProcessor::create_encoder(input_domain, sbc_encoder_settings)
            .expect("to create Encoder");

        let frames_per_packet: usize = 8;
        let packet_size = pcm_audio.frame_size() * frames_per_packet;
        let mut packets = pcm_audio.buffer.as_slice().chunks(packet_size);
        let first_packet = packets.next().unwrap();

        // Write the first packet before anything is listening for output.
        let written =
            exec.run_singlethreaded(&mut encoder.write(first_packet)).expect("successful write");
        assert_eq!(written, first_packet.len());

        let mut encoded_stream = encoder.take_output_stream().expect("Stream should be taken");

        // The output stream can only be taken once.
        assert!(encoder.take_output_stream().is_err());

        let encoded_fut = pin!(encoded_stream.next());

        // Poll the output with a counting waker so that wake-ups can be observed below.
        let (waker, encoder_fut_wake_count) = new_count_waker();
        let mut counting_ctx = Context::from_waker(&waker);

        assert!(encoded_fut.poll(&mut counting_ctx).is_pending());

        let mut frames_sent = first_packet.len() / pcm_audio.frame_size();

        for packet in packets {
            let mut written_fut = encoder.write(&packet);

            let written_bytes =
                exec.run_singlethreaded(&mut written_fut).expect("to write to encoder");

            assert_eq!(packet.len(), written_bytes);
            frames_sent += packet.len() / pcm_audio.frame_size();
        }

        encoder.close().expect("stream should always be closable");

        assert_eq!(input_frames, frames_sent);

        // Run the executor until the output waker is woken exactly one more time.
        let woke_count = encoder_fut_wake_count.get();
        while encoder_fut_wake_count.get() == woke_count {
            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        }
        assert_eq!(encoder_fut_wake_count.get(), woke_count + 1);

        let mut encoded = Vec::new();

        // Drain the output stream until it ends.
        loop {
            let mut encoded_fut = encoded_stream.next();

            match exec.run_singlethreaded(&mut encoded_fut) {
                Some(Ok(enc_data)) => {
                    assert!(!enc_data.is_empty());
                    encoded.extend_from_slice(&enc_data);
                }
                Some(Err(e)) => {
                    panic!("Unexpected error when polling encoded data: {}", e);
                }
                None => {
                    break;
                }
            }
        }

        // NOTE(review): the label below says "bitpool 56" while the encoder settings above use
        // `bit_pool: 59` - confirm which one is intended.
        let expected_digest = ExpectedDigest::new(
            "Sbc: 44.1kHz/Loudness/Mono/bitpool 56/blocks 8/subbands 4",
            "5c65a88bda3f132538966d87df34aa8675f85c9892b7f9f5571f76f3c7813562",
        );
        let hash_validator = BytesValidator { output_file: None, expected_digest };

        assert_eq!(6110, encoded.len(), "Encoded size should be equal");

        let validated = hash_validator.validate(encoded.as_slice());
        assert!(validated.is_ok(), "Failed hash: {:?}", validated);
    }
948
949 fn fix_sbc_test_file<F>(_name: &str, test: F)
950 where
951 F: FnOnce(Vec<u8>) -> (),
952 {
953 const SBC_TEST_FILE: &str = "/pkg/data/s16le44100mono.sbc";
954
955 let mut sbc_data = Vec::new();
956 let _ = File::open(SBC_TEST_FILE)
957 .expect("open test file")
958 .read_to_end(&mut sbc_data)
959 .expect("read test file");
960
961 test(sbc_data)
962 }
963
    /// Decodes the SBC test file one frame at a time and checks the size and digest of the
    /// decoded output.
    #[fixture(fix_sbc_test_file)]
    #[fuchsia::test]
    fn decode_sbc(sbc_data: Vec<u8>) {
        let mut exec = fasync::TestExecutor::new();

        const SBC_FRAME_SIZE: usize = 72;
        const INPUT_FRAMES: usize = 23;

        // Out-of-band data handed to the decoder along with the mime type.
        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
        let mut decoder =
            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");

        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");

        // The output stream can only be taken once.
        assert!(decoder.take_output_stream().is_err());

        let mut frames_sent = 0;

        let frames_per_packet: usize = 1;
        let packet_size = SBC_FRAME_SIZE * frames_per_packet;

        for frames in sbc_data.as_slice().chunks(packet_size) {
            let mut written_fut = decoder.write(&frames);

            let written_bytes =
                exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");

            assert_eq!(frames.len(), written_bytes);
            frames_sent += frames.len() / SBC_FRAME_SIZE;
        }

        assert_eq!(INPUT_FRAMES, frames_sent);

        // Send whatever is still buffered before closing the input.
        let mut flush_fut = pin!(decoder.flush());
        exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");

        decoder.close().expect("stream should always be closable");

        let mut decoded = Vec::new();

        // Drain the output stream until it ends.
        loop {
            let mut decoded_fut = decoded_stream.next();

            match exec.run_singlethreaded(&mut decoded_fut) {
                Some(Ok(dec_data)) => {
                    assert!(!dec_data.is_empty());
                    decoded.extend_from_slice(&dec_data);
                }
                Some(Err(e)) => {
                    panic!("Unexpected error when polling decoded data: {}", e);
                }
                None => {
                    break;
                }
            }
        }

        let expected_digest = ExpectedDigest::new(
            "Pcm: 44.1kHz/16bit/Mono",
            "ff2e7afea51217886d3df15b9a623b4e49c9bd9bd79c58ac01bc94c5511e08d6",
        );
        let hash_validator = BytesValidator { output_file: None, expected_digest };

        assert_eq!(256 * INPUT_FRAMES, decoded.len(), "Decoded size should be equal");

        let validated = hash_validator.validate(decoded.as_slice());
        assert!(validated.is_ok(), "Failed hash: {:?}", validated);
    }
1036
    /// Checks that a pending output future is woken by processor events and not by writes made
    /// on the input side.
    #[fixture(fix_sbc_test_file)]
    #[fuchsia::test]
    fn decode_sbc_wakes_output_to_process_events(sbc_data: Vec<u8>) {
        let mut exec = fasync::TestExecutor::new();
        const SBC_FRAME_SIZE: usize = 72;

        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
        let mut decoder =
            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");

        let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
        let next_frame = chunks.next().unwrap();

        // Write one frame before anything is listening for output.
        let written =
            exec.run_singlethreaded(&mut decoder.write(next_frame)).expect("successful write");
        assert_eq!(written, next_frame.len());

        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");

        let decoded_fut = pin!(decoded_stream.next());

        // Poll the output with a counting waker so that wake-ups can be observed below.
        let (waker, decoder_fut_wake_count) = new_count_waker();
        let mut counting_ctx = Context::from_waker(&waker);

        assert!(decoded_fut.poll(&mut counting_ctx).is_pending());

        // Write and flush a second frame while the output future is pending.
        let frame = chunks.next().unwrap();
        let mut written_fut = decoder.write(&frame);
        let written_bytes = exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
        assert_eq!(frame.len(), written_bytes);

        let mut flush_fut = pin!(decoder.flush());
        exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");

        // Writing and flushing alone must not have woken the output waker.
        assert_eq!(decoder_fut_wake_count.get(), 0);
        // Run the executor until the output waker is woken, then check it happened exactly once.
        while decoder_fut_wake_count.get() == 0 {
            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        }
        assert_eq!(decoder_fut_wake_count.get(), 1);

        let mut decoded = Vec::new();
        let mut decoded_fut = decoded_stream.next();

        match exec.run_singlethreaded(&mut decoded_fut) {
            Some(Ok(dec_data)) => {
                assert!(!dec_data.is_empty());
                decoded.extend_from_slice(&dec_data);
            }
            x => panic!("Expected decoded frame, got {:?}", x),
        }

        assert_eq!(512, decoded.len(), "Decoded size should be equal to one frame");
    }
1100
    /// Checks that a writer that stalled because no input buffer was available is woken again
    /// once the executor runs.
    #[fixture(fix_sbc_test_file)]
    #[fuchsia::test]
    fn decode_sbc_wakes_input_to_process_events(sbc_data: Vec<u8>) {
        let mut exec = fasync::TestExecutor::new();
        const SBC_FRAME_SIZE: usize = 72;

        let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
        let mut decoder =
            StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");

        let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");

        let decoded_fut = pin!(decoded_stream.next());

        let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
        let next_frame = chunks.next().unwrap();

        // Drive the first write to completion while the output future runs alongside it.
        let (written_res, mut decoded_fut) =
            run_while(&mut exec, decoded_fut, decoder.write(next_frame));
        assert_eq!(written_res.expect("initial write should succeed"), next_frame.len());

        // Poll subsequent writes with a counting waker so that wake-ups can be observed.
        let (waker, write_fut_wake_count) = new_count_waker();
        let mut counting_ctx = Context::from_waker(&waker);

        // Write frames until a write stays pending without its waker having been woken.
        let mut wake_count_before_stall = 0;
        for frame in chunks {
            wake_count_before_stall = write_fut_wake_count.get();
            let mut written_fut = decoder.write(&frame);
            if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
                // Woken during the poll itself: not stalled yet, move on to the next frame.
                if write_fut_wake_count.get() != wake_count_before_stall {
                    continue;
                }
                break;
            }
            let mut flush_fut = pin!(decoder.flush());
            exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
        }

        // Read one decoded item from the output stream.
        let decoded_frame = exec.run_singlethreaded(&mut decoded_fut);
        assert_eq!(512, decoded_frame.unwrap().unwrap().len(), "Decoded frame size wrong");

        // Write frames from the start of the data again until the writer stalls once more.
        let chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
        for frame in chunks {
            wake_count_before_stall = write_fut_wake_count.get();
            let mut written_fut = decoder.write(&frame);
            if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
                if write_fut_wake_count.get() != wake_count_before_stall {
                    continue;
                }
                break;
            }
            let mut flush_fut = pin!(decoder.flush());
            exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
        }

        // Run the executor until the write waker is woken.
        while write_fut_wake_count.get() == wake_count_before_stall {
            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        }

    }
1186}