1use anyhow::format_err;
6use fuchsia_audio_codec::{StreamProcessor, StreamProcessorOutputStream};
7use fuchsia_audio_device::stream_config::SoftStreamConfig;
8use fuchsia_audio_device::{AudioFrameSink, AudioFrameStream};
9use fuchsia_bluetooth::types::{peer_audio_stream_id, PeerId};
10use fuchsia_sync::Mutex;
11use futures::stream::BoxStream;
12use futures::task::Context;
13use futures::{AsyncWriteExt, FutureExt, StreamExt};
14use log::{error, info, warn};
15use media::AudioDeviceEnumeratorProxy;
16use std::pin::pin;
17use {fidl_fuchsia_bluetooth_bredr as bredr, fidl_fuchsia_media as media, fuchsia_async as fasync};
18
19use crate::audio::{Control, ControlEvent, Error, HF_INPUT_UUID, HF_OUTPUT_UUID};
20use crate::codec_id::CodecId;
21use crate::sco;
22
23pub struct InbandControl {
26 audio_core: media::AudioDeviceEnumeratorProxy,
27 session_task: Option<(PeerId, fasync::Task<()>)>,
28 event_sender: Mutex<futures::channel::mpsc::Sender<ControlEvent>>,
29 stream: Mutex<Option<futures::channel::mpsc::Receiver<ControlEvent>>>,
30}
31
32struct AudioSession {
37 audio_frame_sink: AudioFrameSink,
38 audio_frame_stream: AudioFrameStream,
39 sco: sco::Connection,
40 codec: CodecId,
41 decoder: StreamProcessor,
42 encoder: StreamProcessor,
43 event_sender: futures::channel::mpsc::Sender<ControlEvent>,
44}
45
46impl AudioSession {
47 fn setup(
48 connection: sco::Connection,
49 codec: CodecId,
50 audio_frame_sink: AudioFrameSink,
51 audio_frame_stream: AudioFrameStream,
52 event_sender: futures::channel::mpsc::Sender<ControlEvent>,
53 ) -> Result<Self, Error> {
54 if !codec.is_supported() {
55 return Err(Error::UnsupportedParameters {
56 source: format_err!("unsupported codec {codec}"),
57 });
58 }
59 let decoder = StreamProcessor::create_decoder(codec.mime_type()?, Some(codec.oob_bytes()))
60 .map_err(|e| Error::audio_core(format_err!("creating decoder: {e:?}")))?;
61 let encoder = StreamProcessor::create_encoder(codec.try_into()?, codec.try_into()?)
62 .map_err(|e| Error::audio_core(format_err!("creating encoder: {e:?}")))?;
63 Ok(Self {
64 sco: connection,
65 decoder,
66 encoder,
67 audio_frame_sink,
68 audio_frame_stream,
69 codec,
70 event_sender,
71 })
72 }
73
74 async fn encoder_to_sco(
75 mut encoded_stream: StreamProcessorOutputStream,
76 proxy: bredr::ScoConnectionProxy,
77 codec: CodecId,
78 ) -> Error {
79 let packet: Vec<u8> = vec![0; 60]; let mut request =
82 bredr::ScoConnectionWriteRequest { data: Some(packet), ..Default::default() };
83
84 const MSBC_ENCODED_LEN: usize = 57; if codec == CodecId::MSBC {
86 let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
87 packet[0] = 0x01; }
90 let mut h2_marker = [0x08u8, 0x38, 0xc8, 0xf8].iter().cycle();
92 loop {
93 match encoded_stream.next().await {
94 Some(Ok(encoded)) => {
95 if codec == CodecId::MSBC {
96 if encoded.len() % MSBC_ENCODED_LEN != 0 {
97 warn!("Got {} bytes, uneven number of packets", encoded.len());
98 }
99 for sbc_packet in encoded.as_slice().chunks_exact(MSBC_ENCODED_LEN) {
100 let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
101 packet[1] = *h2_marker.next().unwrap();
102 packet[2..59].copy_from_slice(sbc_packet);
103 if let Err(e) = proxy.write(&request).await {
104 return e.into();
105 }
106 }
107 } else {
108 for cvsd_packet in encoded.as_slice().chunks_exact(60) {
111 let packet: &mut [u8] = request.data.as_mut().unwrap().as_mut_slice();
112 packet.copy_from_slice(cvsd_packet);
113 if let Err(e) = proxy.write(&request).await {
114 return e.into();
115 }
116 }
117 }
118 }
119 Some(Err(e)) => {
120 warn!("Error in encoding: {e:?}");
121 return Error::audio_core(format_err!("Couldn't read encoded: {e:?}"));
122 }
123 None => {
124 warn!("Error in encoding: Stream is ended!");
125 return Error::audio_core(format_err!("Encoder stream ended early"));
126 }
127 }
128 }
129 }
130
131 async fn pcm_to_encoder(mut encoder: StreamProcessor, mut stream: AudioFrameStream) -> Error {
132 loop {
133 match stream.next().await {
134 Some(Ok(pcm)) => {
135 if let Err(e) = encoder.write_all(pcm.as_slice()).await {
136 return Error::audio_core(format_err!("write to encoder: {e:?}"));
137 }
138 if let Err(e) = encoder.flush().await {
140 return Error::audio_core(format_err!("flush encoder: {e:?}"));
141 }
142 }
143 Some(Err(e)) => {
144 warn!("Audio output error: {e:?}");
145 return Error::audio_core(format_err!("output error: {e:?}"));
146 }
147 None => {
148 warn!("Ran out of audio input!");
149 return Error::audio_core(format_err!("Audio input end"));
150 }
151 }
152 }
153 }
154
155 async fn decoder_to_pcm(
156 mut decoded_stream: StreamProcessorOutputStream,
157 mut sink: AudioFrameSink,
158 ) -> Error {
159 let mut decoded_packets = 0;
160 loop {
161 match decoded_stream.next().await {
162 Some(Ok(decoded)) => {
163 decoded_packets += 1;
164 if decoded_packets % 500 == 0 {
165 info!(
166 "Got {} decoded bytes from decoder: {decoded_packets} packets",
167 decoded.len()
168 );
169 }
170 if let Err(e) = sink.write_all(decoded.as_slice()).await {
171 warn!("Error sending to sink: {e:?}");
172 return Error::audio_core(format_err!("send to sink: {e:?}"));
173 }
174 }
175 Some(Err(e)) => {
176 warn!("Error in decoding: {e:?}");
177 return Error::audio_core(format_err!("Couldn't read decoder: {e:?}"));
178 }
179 None => {
180 warn!("Error in decoding: Stream is ended!");
181 return Error::audio_core(format_err!("Decoder stream ended early"));
182 }
183 }
184 }
185 }
186
187 async fn sco_to_decoder(
188 proxy: bredr::ScoConnectionProxy,
189 mut decoder: StreamProcessor,
190 codec: CodecId,
191 ) -> Error {
192 loop {
193 let data = match proxy.read().await {
194 Ok(bredr::ScoConnectionReadResponse { data: Some(data), .. }) => data,
195 Ok(_) => return Error::audio_core(format_err!("Invalid Read response")),
196 Err(e) => return e.into(),
197 };
198 let packet = match codec {
199 CodecId::CVSD => data.as_slice(),
200 CodecId::MSBC => {
201 let (_header, packet) = data.as_slice().split_at(2);
203 if packet[0] != 0xad {
204 info!(
205 "Packet didn't start with syncword: {:#02x} {}",
206 packet[0],
207 packet.len()
208 );
209 }
210 packet
211 }
212 _ => {
213 return Error::UnsupportedParameters {
214 source: format_err!("Unknown CodecId: {codec:?}"),
215 }
216 }
217 };
218 if let Err(e) = decoder.write_all(packet).await {
219 return Error::audio_core(format_err!("Failed to write to decoder: {e:?}"));
220 }
221 if let Err(e) = decoder.flush().await {
224 return Error::audio_core(format_err!("Failed to flush decoder: {e:?}"));
225 }
226 }
227 }
228
229 async fn run(mut self) {
230 let peer_id = self.sco.peer_id;
231 let Ok(encoded_stream) = self.encoder.take_output_stream() else {
232 error!("Couldn't take encoder output stream");
233 return;
234 };
235 let sco_write =
236 AudioSession::encoder_to_sco(encoded_stream, self.sco.proxy.clone(), self.codec);
237 let sco_write = pin!(sco_write);
238 let audio_to_encoder = AudioSession::pcm_to_encoder(self.encoder, self.audio_frame_stream);
239 let audio_to_encoder = pin!(audio_to_encoder);
240
241 let Ok(decoded_stream) = self.decoder.take_output_stream() else {
242 error!("Couldn't take decoder output stream");
243 return;
244 };
245 let decoder_to_sink =
246 pin!(AudioSession::decoder_to_pcm(decoded_stream, self.audio_frame_sink));
247 let sco_read =
248 AudioSession::sco_to_decoder(self.sco.proxy.clone(), self.decoder, self.codec);
249 let sco_read = pin!(sco_read);
250 let e = futures::select! {
251 e = audio_to_encoder.fuse() => { warn!(e:?; "PCM to encoder write"); e},
252 e = sco_write.fuse() => { warn!(e:?; "Write encoded to SCO"); e},
253 e = sco_read.fuse() => { warn!(e:?; "SCO read to decoder"); e},
254 e = decoder_to_sink.fuse() => { warn!(e:?; "SCO decoder to PCM"); e},
255 };
256 let _ = self.event_sender.try_send(ControlEvent::Stopped { id: peer_id, error: Some(e) });
257 }
258
259 fn start(self) -> fasync::Task<()> {
260 fasync::Task::spawn(self.run())
261 }
262}
263
264impl InbandControl {
265 pub fn create(proxy: AudioDeviceEnumeratorProxy) -> Result<Self, Error> {
266 let (sender, receiver) = futures::channel::mpsc::channel(1);
267 Ok(Self {
268 audio_core: proxy,
269 session_task: None,
270 event_sender: Mutex::new(sender),
271 stream: Mutex::new(Some(receiver)),
272 })
273 }
274
275 fn running_id(&mut self) -> Option<PeerId> {
276 self.session_task
277 .as_mut()
278 .and_then(|(running, task)| {
279 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
280 task.poll_unpin(&mut cx).is_pending().then_some(running)
283 })
284 .copied()
285 }
286
287 const LOCAL_MONOTONIC_CLOCK_DOMAIN: u32 = 0;
288
289 const AUDIO_BUFFER_DURATION: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(15);
292
293 fn start_input(&mut self, peer_id: PeerId, codec_id: CodecId) -> Result<AudioFrameSink, Error> {
294 let audio_dev_id = peer_audio_stream_id(peer_id, HF_INPUT_UUID);
295 let (client, sink) = SoftStreamConfig::create_input(
296 &audio_dev_id,
297 "Fuchsia",
298 super::DEVICE_NAME,
299 Self::LOCAL_MONOTONIC_CLOCK_DOMAIN,
300 codec_id.try_into()?,
301 Self::AUDIO_BUFFER_DURATION,
302 )
303 .map_err(|e| Error::audio_core(format_err!("Couldn't create input: {e:?}")))?;
304
305 self.audio_core.add_device_by_channel(super::DEVICE_NAME, true, client)?;
306 Ok(sink)
307 }
308
309 fn start_output(
310 &mut self,
311 peer_id: PeerId,
312 codec_id: CodecId,
313 ) -> Result<AudioFrameStream, Error> {
314 let audio_dev_id = peer_audio_stream_id(peer_id, HF_OUTPUT_UUID);
315 let (client, stream) = SoftStreamConfig::create_output(
316 &audio_dev_id,
317 "Fuchsia",
318 super::DEVICE_NAME,
319 Self::LOCAL_MONOTONIC_CLOCK_DOMAIN,
320 codec_id.try_into()?,
321 Self::AUDIO_BUFFER_DURATION,
322 zx::MonotonicDuration::from_millis(0),
323 )
324 .map_err(|e| Error::audio_core(format_err!("Couldn't create output: {e:?}")))?;
325 self.audio_core.add_device_by_channel(super::DEVICE_NAME, false, client)?;
326 Ok(stream)
327 }
328}
329
330impl Control for InbandControl {
331 fn start(
332 &mut self,
333 id: PeerId,
334 connection: sco::Connection,
335 codec: CodecId,
336 ) -> Result<(), Error> {
337 if let Some(running) = self.running_id() {
338 if running == id {
339 return Err(Error::AlreadyStarted);
340 }
341 return Err(Error::UnsupportedParameters {
342 source: format_err!("Only one peer can be started inband at once"),
343 });
344 }
345 let frame_sink = self.start_input(id, codec)?;
346 let frame_stream = self.start_output(id, codec)?;
347 let session = AudioSession::setup(
348 connection,
349 codec,
350 frame_sink,
351 frame_stream,
352 self.event_sender.lock().clone(),
353 )?;
354 self.session_task = Some((id, session.start()));
355 Ok(())
356 }
357
358 fn stop(&mut self, id: PeerId) -> Result<(), Error> {
359 if self.running_id() != Some(id) {
360 return Err(Error::NotStarted);
361 }
362 self.session_task = None;
363 let _ = self.event_sender.get_mut().try_send(ControlEvent::Stopped { id, error: None });
364 Ok(())
365 }
366
367 fn connect(&mut self, _id: PeerId, _supported_codecs: &[CodecId]) {
368 }
370
371 fn disconnect(&mut self, id: PeerId) {
372 let _ = self.stop(id);
373 }
374
375 fn take_events(&self) -> BoxStream<'static, ControlEvent> {
376 self.stream.lock().take().unwrap().boxed()
377 }
378
379 fn failed_request(&self, _request: ControlEvent, _error: Error) {
380 }
382}
383
384#[cfg(test)]
385mod tests {
386 use super::*;
387
388 use fidl_fuchsia_bluetooth_bredr::ScoConnectionRequestStream;
389
390 use crate::sco::test_utils::connection_for_codec;
391
392 const ZERO_INPUT_SBC_PACKET: [u8; 60] = [
395 0x80, 0x10, 0xad, 0x00, 0x00, 0xc5, 0x00, 0x00, 0x00, 0x00, 0x77, 0x6d, 0xb6, 0xdd, 0xdb,
396 0x6d, 0xb7, 0x76, 0xdb, 0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7,
397 0x76, 0xdb, 0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7, 0x76, 0xdb,
398 0x6d, 0xdd, 0xb6, 0xdb, 0x77, 0x6d, 0xb6, 0xdd, 0xdb, 0x6d, 0xb7, 0x76, 0xdb, 0x6c, 0x00,
399 ];
400
401 const ZERO_INPUT_CVSD_PACKET: [u8; 60] = [0x55; 60];
403
404 #[derive(PartialEq, Debug)]
405 enum ProcessedRequest {
406 ScoRead,
407 ScoWrite(Vec<u8>),
408 }
409
410 async fn process_sco_request(
412 sco_request_stream: &mut ScoConnectionRequestStream,
413 read_data: Vec<u8>,
414 ) -> Option<ProcessedRequest> {
415 match sco_request_stream.next().await {
416 Some(Ok(bredr::ScoConnectionRequest::Read { responder })) => {
417 let response = bredr::ScoConnectionReadResponse {
418 status_flag: Some(bredr::RxPacketStatus::CorrectlyReceivedData),
419 data: Some(read_data),
420 ..Default::default()
421 };
422 responder.send(&response).expect("sends okay");
423 Some(ProcessedRequest::ScoRead)
424 }
425 Some(Ok(bredr::ScoConnectionRequest::Write { payload, responder })) => {
426 responder.send().expect("response to write");
427 Some(ProcessedRequest::ScoWrite(payload.data.unwrap()))
428 }
429 None => None,
430 x => panic!("Expected read or write requests, got {x:?}"),
431 }
432 }
433
434 #[fuchsia::test]
435 async fn reads_audio_from_connection() {
436 let (proxy, _audio_enumerator_requests) =
437 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
438 let mut control = InbandControl::create(proxy).unwrap();
439
440 let (connection, mut sco_request_stream) =
441 connection_for_codec(PeerId(1), CodecId::MSBC, true);
442
443 control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
444
445 let (connection2, _request_stream) = connection_for_codec(PeerId(1), CodecId::MSBC, true);
446 let _ = control
447 .start(PeerId(1), connection2, CodecId::MSBC)
448 .expect_err("Starting twice shouldn't be allowed");
449
450 for _ in 1..10 {
453 assert_eq!(
454 Some(ProcessedRequest::ScoRead),
455 process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
456 );
457 }
458
459 control.stop(PeerId(1)).expect("should be able to stop");
460 let _ = control.stop(PeerId(1)).expect_err("can't stop a stopped thing");
461
462 let mut extra_requests = 0;
464 while let Some(r) =
465 process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
466 {
467 assert_eq!(ProcessedRequest::ScoRead, r);
468 extra_requests += 1;
469 }
470
471 info!("Got {extra_requests} extra ScoConnectionProxy Requests after stop");
472 }
473
474 #[fuchsia::test]
475 async fn audio_setup_error_bad_codec() {
476 let (proxy, _) =
477 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
478 let mut control = InbandControl::create(proxy).unwrap();
479
480 let (connection, _sco_request_stream) =
481 connection_for_codec(PeerId(1), CodecId::MSBC, true);
482 let res = control.start(PeerId(1), connection, 0xD0u8.into());
483 assert!(res.is_err());
484 }
485
486 #[fuchsia::test]
487 async fn decode_sco_audio_path() {
488 use fidl_fuchsia_hardware_audio as audio;
489 let (proxy, mut audio_enumerator_requests) =
490 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
491 let mut control = InbandControl::create(proxy).unwrap();
492
493 let (connection, mut sco_request_stream) =
494 connection_for_codec(PeerId(1), CodecId::MSBC, true);
495
496 control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
497
498 let audio_input_stream_config;
499 let mut _audio_output_stream_config;
500 loop {
501 match audio_enumerator_requests.next().await {
502 Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
503 is_input,
504 channel,
505 ..
506 })) => {
507 if is_input {
508 audio_input_stream_config = channel.into_proxy();
509 break;
510 } else {
511 _audio_output_stream_config = channel.into_proxy();
512 }
513 }
514 x => panic!("Expected audio device by channel, got {x:?}"),
515 }
516 }
517
518 let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
519 audio_input_stream_config
520 .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
521 .expect("create ring buffer");
522
523 assert_eq!(
525 Some(ProcessedRequest::ScoRead),
526 process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
527 );
528
529 let notifications_per_ring = 20;
530 let (frames, _vmo) = ring_buffer
533 .get_vmo(16000, notifications_per_ring)
534 .await
535 .expect("fidl")
536 .expect("response");
537
538 let mut position_info = ring_buffer.watch_clock_recovery_position_info();
540 let mut position_notifications = 0;
541
542 let _ = ring_buffer.start().await;
543
544 let frames_per_notification = frames / notifications_per_ring;
546 let expected_notifications = 12000 / frames_per_notification;
549
550 if position_info
553 .poll_unpin(&mut Context::from_waker(futures::task::noop_waker_ref()))
554 .is_ready()
555 {
556 position_notifications += 1;
557 position_info = ring_buffer.watch_clock_recovery_position_info();
558 }
559 for _ in 1..100 {
560 assert_eq!(
561 Some(ProcessedRequest::ScoRead),
562 process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec()).await
563 );
564 if position_info
566 .poll_unpin(&mut Context::from_waker(futures::task::noop_waker_ref()))
567 .is_ready()
568 {
569 position_notifications += 1;
570 position_info = ring_buffer.watch_clock_recovery_position_info();
571 }
572 }
573
574 assert!(position_notifications >= expected_notifications);
578 assert!(position_notifications <= expected_notifications + 1);
579 }
580
581 #[fuchsia::test]
582 async fn encode_sco_audio_path_msbc() {
583 use fidl_fuchsia_hardware_audio as audio;
584 let (proxy, mut audio_enumerator_requests) =
585 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
586 let mut control = InbandControl::create(proxy).unwrap();
587
588 let (connection, mut sco_request_stream) =
589 connection_for_codec(PeerId(1), CodecId::MSBC, true);
590
591 control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
592
593 let audio_output_stream_config;
594 let mut _audio_input_stream_config;
595 loop {
596 match audio_enumerator_requests.next().await {
597 Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
598 is_input,
599 channel,
600 ..
601 })) => {
602 if !is_input {
603 audio_output_stream_config = channel.into_proxy();
604 break;
605 } else {
606 _audio_input_stream_config = channel.into_proxy();
607 }
608 }
609 x => panic!("Expected audio device by channel, got {x:?}"),
610 }
611 }
612
613 let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
614 audio_output_stream_config
615 .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
616 .unwrap();
617
618 let notifications_per_ring = 20;
622 let (_frames, _vmo) = ring_buffer
624 .get_vmo(16000, notifications_per_ring)
625 .await
626 .expect("fidl")
627 .expect("response");
628
629 let _ = ring_buffer.start().await;
630
631 let next_header = &mut [0x01, 0x08];
633 for _sco_frame in 1..100 {
634 'sco: loop {
635 match process_sco_request(&mut sco_request_stream, ZERO_INPUT_SBC_PACKET.to_vec())
636 .await
637 {
638 Some(ProcessedRequest::ScoRead) => continue 'sco,
639 Some(ProcessedRequest::ScoWrite(data)) => {
640 assert_eq!(60, data.len());
641 assert_eq!(&ZERO_INPUT_SBC_PACKET[2..], &data[2..]);
643 assert_eq!(next_header, &data[0..2]);
644 match next_header[1] {
646 0x08 => next_header[1] = 0x38,
647 0x38 => next_header[1] = 0xc8,
648 0xc8 => next_header[1] = 0xf8,
649 0xf8 => next_header[1] = 0x08,
650 _ => unreachable!(),
651 };
652 break 'sco;
653 }
654 x => panic!("Expected read or write but got {x:?}"),
655 };
656 }
657 }
658 }
659
660 #[fuchsia::test]
661 async fn encode_sco_audio_path_cvsd() {
662 use fidl_fuchsia_hardware_audio as audio;
663 let (proxy, mut audio_enumerator_requests) =
664 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
665 let mut control = InbandControl::create(proxy).unwrap();
666
667 let (connection, mut sco_request_stream) =
668 connection_for_codec(PeerId(1), CodecId::CVSD, true);
669
670 control.start(PeerId(1), connection, CodecId::CVSD).expect("should be able to start");
671
672 let audio_output_stream_config;
673 let mut _audio_input_stream_config;
674 loop {
675 match audio_enumerator_requests.next().await {
676 Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
677 is_input,
678 channel,
679 ..
680 })) => {
681 if !is_input {
682 audio_output_stream_config = channel.into_proxy();
683 break;
684 } else {
685 _audio_input_stream_config = channel.into_proxy();
686 }
687 }
688 x => panic!("Expected audio device by channel, got {x:?}"),
689 }
690 }
691
692 let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
693 audio_output_stream_config
694 .create_ring_buffer(&CodecId::CVSD.try_into().unwrap(), server)
695 .unwrap();
696
697 let notifications_per_ring = 10;
701 let (_frames, _vmo) = ring_buffer
703 .get_vmo(64000, notifications_per_ring)
704 .await
705 .expect("fidl")
706 .expect("response");
707
708 let _ = ring_buffer.start().await;
709
710 for _sco_frame in 1..100 {
712 'sco: loop {
713 match process_sco_request(&mut sco_request_stream, ZERO_INPUT_CVSD_PACKET.to_vec())
714 .await
715 {
716 Some(ProcessedRequest::ScoRead) => continue 'sco,
717 Some(ProcessedRequest::ScoWrite(data)) => {
718 assert_eq!(60, data.len());
720 assert_eq!(&ZERO_INPUT_CVSD_PACKET, data.as_slice());
721 break 'sco;
722 }
723 x => panic!("Expected read or write but got {x:?}"),
724 };
725 }
726 }
727 }
728
729 #[fuchsia::test]
730 async fn read_from_audio_output() {
731 use fidl_fuchsia_hardware_audio as audio;
732 let (proxy, mut audio_enumerator_requests) =
733 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
734 let mut control = InbandControl::create(proxy).unwrap();
735
736 let (connection, mut sco_request_stream) =
737 connection_for_codec(PeerId(1), CodecId::MSBC, true);
738
739 control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
740
741 let audio_output_stream_config;
742 let mut _audio_input_stream_config;
743 loop {
744 match audio_enumerator_requests.next().await {
745 Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
746 is_input,
747 channel,
748 ..
749 })) => {
750 if !is_input {
751 audio_output_stream_config = channel.into_proxy();
752 break;
753 } else {
754 _audio_input_stream_config = channel.into_proxy();
755 }
756 }
757 x => panic!("Expected audio device by channel, got {x:?}"),
758 }
759 }
760
761 let (ring_buffer, server) = fidl::endpoints::create_proxy::<audio::RingBufferMarker>();
762 audio_output_stream_config
763 .create_ring_buffer(&CodecId::MSBC.try_into().unwrap(), server)
764 .expect("create ring buffer");
765
766 let notifications_per_ring = 20;
767 let (_frames, _vmo) = ring_buffer
769 .get_vmo(16000, notifications_per_ring)
770 .await
771 .expect("fidl")
772 .expect("response");
773
774 let _ = ring_buffer.start().await;
775
776 'position_notifications: for i in 1..20 {
779 let mut position_info = ring_buffer.watch_clock_recovery_position_info();
780 loop {
781 let sco_activity = Box::pin(process_sco_request(
782 &mut sco_request_stream,
783 ZERO_INPUT_SBC_PACKET.to_vec(),
784 ));
785 use futures::future::Either;
786 match futures::future::select(position_info, sco_activity).await {
787 Either::Left((result, _sco_fut)) => {
788 assert!(result.is_ok(), "Position Info failed at {i}");
789 continue 'position_notifications;
790 }
791 Either::Right((_sco_pkt, position_info_fut)) => {
792 position_info = position_info_fut;
793 }
794 }
795 }
796 }
797 }
798
799 #[fuchsia::test]
800 async fn audio_output_error_sends_to_events() {
801 let (proxy, mut audio_enumerator_requests) =
802 fidl::endpoints::create_proxy_and_stream::<media::AudioDeviceEnumeratorMarker>();
803 let mut control = InbandControl::create(proxy).unwrap();
804 let mut events = control.take_events();
805
806 let (connection, _sco_request_stream) =
807 connection_for_codec(PeerId(1), CodecId::MSBC, true);
808
809 control.start(PeerId(1), connection, CodecId::MSBC).expect("should be able to start");
810
811 let audio_output_stream_config;
812 let mut _audio_input_stream_config;
813 loop {
814 match audio_enumerator_requests.next().await {
815 Some(Ok(media::AudioDeviceEnumeratorRequest::AddDeviceByChannel {
816 is_input,
817 channel,
818 ..
819 })) => {
820 if !is_input {
821 audio_output_stream_config = channel.into_proxy();
822 break;
823 } else {
824 _audio_input_stream_config = channel.into_proxy();
825 }
826 }
827 x => panic!("Expected audio device by channel, got {x:?}"),
828 }
829 }
830
831 drop(audio_output_stream_config);
832
833 match events.next().await {
835 Some(ControlEvent::Stopped { id, error: Some(_) }) => {
836 assert_eq!(PeerId(1), id);
837 }
838 x => panic!("Expected the peer to have error stop, but got {x:?}"),
839 };
840 }
841}