1use anyhow::anyhow;
6use fuchsia_audio_device::codec;
7use fuchsia_audio_device::codec::CodecRequest;
8use fuchsia_bluetooth::types::{PeerId, peer_audio_stream_id};
9use fuchsia_sync::Mutex;
10use futures::stream::BoxStream;
11use futures::{SinkExt, StreamExt};
12use log::{info, warn};
13use std::sync::Arc;
14use {
15 fidl_fuchsia_audio_device as audio_device, fidl_fuchsia_hardware_audio as audio,
16 fuchsia_async as fasync,
17};
18
19use super::{Control, ControlEvent, Error, HF_INPUT_UUID};
20use crate::codec_id::CodecId;
21use crate::sco;
22
23struct CodecControlInner {
24 start_request:
25 Option<Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>>,
26 stop_request:
27 Option<Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>>,
28 earliest_start_time: fasync::MonotonicInstant,
31}
32
33impl Default for CodecControlInner {
34 fn default() -> Self {
35 Self {
36 start_request: None,
37 stop_request: None,
38 earliest_start_time: fasync::MonotonicInstant::INFINITE_PAST,
39 }
40 }
41}
42
43impl CodecControlInner {
44 fn clear(&mut self) {
45 *self = Self::default();
46 }
47
48 fn set_start_delay(&mut self) {
49 self.earliest_start_time = fasync::MonotonicInstant::after(DELAY_AFTER_STOP);
50 }
51}
52
53const DELAY_AFTER_STOP: fasync::MonotonicDuration = fasync::MonotonicDuration::from_seconds(1);
54
55pub struct CodecControl {
60 provider: audio_device::ProviderProxy,
61 codec_task: Option<fasync::Task<()>>,
62 events_sender: futures::channel::mpsc::Sender<ControlEvent>,
63 events_receiver: Mutex<Option<futures::channel::mpsc::Receiver<ControlEvent>>>,
64 codec_id: Option<CodecId>,
65 connection: Option<sco::Connection>,
66 connected_peer: Option<PeerId>,
67 inner: Arc<Mutex<CodecControlInner>>,
68}
69
70impl Control for CodecControl {
71 fn start(
72 &mut self,
73 id: PeerId,
74 connection: sco::Connection,
75 codec: crate::codec_id::CodecId,
76 ) -> Result<(), Error> {
77 if self.connection.is_some() {
78 return Err(Error::AlreadyStarted);
79 }
80 if Some(codec) != self.codec_id {
81 return Err(Error::UnsupportedParameters {
82 source: anyhow!("CodecId must match connected CodecId"),
83 });
84 }
85 if Some(id) != self.connected_peer {
86 return Err(Error::UnsupportedParameters {
87 source: anyhow!("Can't start a non-connected peer"),
88 });
89 };
90 let Some(start_request) = self.inner.lock().start_request.take() else {
91 return Err(Error::UnsupportedParameters {
92 source: anyhow!("Can only start in response to request"),
93 });
94 };
95 self.connection = Some(connection);
96 start_request(Ok(fuchsia_async::MonotonicInstant::now().into()));
97 Ok(())
98 }
99
100 fn stop(&mut self, id: PeerId) -> Result<(), Error> {
101 if self.connection.is_none() {
102 return Err(Error::NotStarted);
103 }
104 if Some(id) != self.connected_peer {
105 return Err(Error::UnsupportedParameters {
106 source: anyhow!("Can't stop a non-connected peer"),
107 });
108 }
109 let Some(stop_request) = self.inner.lock().stop_request.take() else {
110 return Err(Error::UnsupportedParameters {
111 source: anyhow!("Codec can only stop in response to request"),
112 });
113 };
114 self.connection = None;
115 self.inner.lock().set_start_delay();
116 stop_request(Ok(fuchsia_async::MonotonicInstant::now().into()));
117 Ok(())
118 }
119
120 fn connect(&mut self, id: PeerId, supported_codecs: &[CodecId]) {
121 let supported_formats: audio::DaiSupportedFormats;
122 if supported_codecs.contains(&CodecId::MSBC) {
123 self.codec_id = Some(CodecId::MSBC);
124 supported_formats = CodecId::MSBC.try_into().unwrap();
125 } else {
126 self.codec_id = Some(CodecId::CVSD);
127 supported_formats = CodecId::CVSD.try_into().unwrap();
128 };
129 let audio_dev_id = peer_audio_stream_id(id, HF_INPUT_UUID);
130 let (codec, client) = codec::SoftCodec::create(
131 Some(&audio_dev_id),
132 "Fuchsia",
133 super::DEVICE_NAME,
134 codec::CodecDirection::Duplex,
135 supported_formats.clone(),
136 true,
137 );
138 self.codec_task = Some(fasync::Task::local(codec_task(
139 id,
140 self.provider.clone(),
141 codec,
142 supported_formats,
143 client,
144 self.events_sender.clone(),
145 self.inner.clone(),
146 )));
147 self.connected_peer = Some(id);
148 }
149
150 fn disconnect(&mut self, id: PeerId) {
151 info!("Codec peer {id} disconnected, cleaning up");
152 self.codec_task = None;
153 self.connected_peer = None;
154 self.connection = None;
155 self.codec_id = None;
156 self.inner.lock().clear();
157 }
158
159 fn take_events(&self) -> BoxStream<'static, ControlEvent> {
160 self.events_receiver.lock().take().unwrap().boxed()
161 }
162
163 fn failed_request(&self, request: ControlEvent, _error: Error) {
164 match request {
165 ControlEvent::RequestStart { id: _ } => {
166 let Some(start_request) = self.inner.lock().start_request.take() else {
167 return;
168 };
169 start_request(Err(zx::Status::INTERNAL));
170 }
171 ControlEvent::RequestStop { id: _ } => {
172 let Some(stop_request) = self.inner.lock().start_request.take() else {
173 return;
174 };
175 stop_request(Err(zx::Status::INTERNAL));
176 }
177 _ => unreachable!(),
178 }
179 }
180}
181
182async fn codec_task(
183 id: PeerId,
184 provider: audio_device::ProviderProxy,
185 mut codec: codec::SoftCodec,
186 supported_formats: audio::DaiSupportedFormats,
187 client: fidl::endpoints::ClientEnd<audio::CodecMarker>,
188 mut event_sender: futures::channel::mpsc::Sender<ControlEvent>,
189 inner: Arc<Mutex<CodecControlInner>>,
190) {
191 let result = provider
192 .add_device(audio_device::ProviderAddDeviceRequest {
193 device_name: Some(super::DEVICE_NAME.into()),
194 device_type: Some(audio_device::DeviceType::Codec),
195 driver_client: Some(audio_device::DriverClient::Codec(client)),
196 ..Default::default()
197 })
198 .await;
199 match result {
200 Err(e) => {
201 warn!("FIDL Error adding device: {e:?}");
202 return;
203 }
204 Ok(Err(e)) => {
205 warn!("Failed to add device: {e:?}");
206 return;
207 }
208 Ok(Ok(_)) => {}
209 };
210 info!("Added Codec device!");
211 while let Some(event) = codec.next().await {
212 let Ok(event) = event else {
213 let _ = event_sender
214 .send(ControlEvent::Stopped {
215 id,
216 error: Some(Error::audio_core(event.err().unwrap().into())),
217 })
218 .await;
219 return;
220 };
221 info!("Codec request: {event:?}");
222 let audio_event = match event {
223 CodecRequest::SetFormat { format, responder } => {
224 if supported_formats.number_of_channels.contains(&format.number_of_channels)
225 && supported_formats.frame_formats.contains(&format.frame_format)
226 && supported_formats.sample_formats.contains(&format.sample_format)
227 && supported_formats.frame_rates.contains(&format.frame_rate)
228 && supported_formats.bits_per_slot.contains(&format.bits_per_slot)
229 && supported_formats.bits_per_sample.contains(&format.bits_per_sample)
230 {
231 responder(Ok(()));
232 } else {
233 responder(Err(zx::Status::NOT_SUPPORTED));
234 }
235 continue;
236 }
237 CodecRequest::Start { responder } => {
238 if inner.lock().start_request.is_some() {
239 responder(Err(zx::Status::ALREADY_EXISTS));
240 continue;
241 }
242 inner.lock().start_request = Some(responder);
243 let deadline = inner.lock().earliest_start_time.clone();
244 let now = fasync::MonotonicInstant::now();
245 if deadline > now {
246 info!(
247 "Delaying start request {}ms due to too-fast stop->start cycle",
248 (deadline - now).into_millis()
249 );
250 }
251 fasync::Timer::new(deadline).await;
252 ControlEvent::RequestStart { id }
253 }
254 CodecRequest::Stop { responder } => {
255 if inner.lock().stop_request.is_some() {
256 responder(Err(zx::Status::ALREADY_EXISTS));
257 continue;
258 }
259 inner.lock().stop_request = Some(responder);
260 ControlEvent::RequestStop { id }
261 }
262 };
263 let _ = event_sender.send(audio_event).await;
264 }
265 warn!("Codec device finished, dropping..!");
266}
267
268impl CodecControl {
269 pub fn new(provider: audio_device::ProviderProxy) -> Self {
270 let (events_sender, receiver) = futures::channel::mpsc::channel(1);
271 Self {
272 provider,
273 codec_task: None,
274 events_sender,
275 events_receiver: Mutex::new(Some(receiver)),
276 inner: Default::default(),
277 codec_id: None,
278 connection: None,
279 connected_peer: None,
280 }
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use super::*;
287
288 use fidl::endpoints::Proxy;
289 use fidl_fuchsia_audio_device::ProviderRequestStream;
290 use fixture::fixture;
291 use futures::FutureExt;
292 use futures::task::{Context, Poll};
293
294 use crate::sco::test_utils::connection_for_codec;
295
296 async fn connect_peer_to_codec(
297 codec: &mut CodecControl,
298 provider_requests: &mut ProviderRequestStream,
299 codecs: &[CodecId],
300 ) -> audio::CodecProxy {
301 codec.connect(PeerId(1), codecs);
302
303 let Some(Ok(audio_device::ProviderRequest::AddDevice {
304 payload:
305 audio_device::ProviderAddDeviceRequest {
306 driver_client: Some(client),
307 device_name: Some(_name),
308 device_type: Some(device_type),
309 ..
310 },
311 responder,
312 })) = provider_requests.next().await
313 else {
314 panic!("Expected a request from the connect");
315 };
316
317 assert_eq!(device_type, audio_device::DeviceType::Codec);
318
319 responder.send(Ok(&Default::default())).expect("response to succeed");
320
321 let audio_device::DriverClient::Codec(codec_client) = client else {
322 panic!("Should have provided a codec client");
323 };
324 codec_client.into_proxy()
325 }
326
327 async fn codec_setup_connected<F, Fut>(_test_name: &str, test: F)
328 where
329 F: FnOnce(audio::CodecProxy, CodecControl, ProviderRequestStream) -> Fut,
330 Fut: futures::Future<Output = ()>,
331 {
332 let (provider_proxy, mut provider_requests) =
333 fidl::endpoints::create_proxy_and_stream::<audio_device::ProviderMarker>();
334 let mut codec = CodecControl::new(provider_proxy);
335
336 let codec_proxy =
337 connect_peer_to_codec(&mut codec, &mut provider_requests, &[CodecId::MSBC]).await;
338
339 test(codec_proxy, codec, provider_requests).await
340 }
341
342 #[fixture(codec_setup_connected)]
343 #[fuchsia::test]
344 async fn publishes_on_connect(
345 codec_client: audio::CodecProxy,
346 codec: CodecControl,
347 _provider_requests: ProviderRequestStream,
348 ) {
349 let _properties = codec_client.get_properties().await.unwrap();
350 let audio::CodecGetDaiFormatsResult::Ok(formats) =
351 codec_client.get_dai_formats().await.unwrap()
352 else {
353 panic!("Expected formats from get_dai_formats");
354 };
355
356 assert_eq!(formats.len(), 1);
357 assert_eq!(formats[0].frame_rates[0], 16000);
359 drop(codec);
360 }
361
362 #[fixture(codec_setup_connected)]
363 #[fuchsia::test]
364 async fn removed_on_disconnect(
365 codec_client: audio::CodecProxy,
366 mut codec: CodecControl,
367 _provider_requests: ProviderRequestStream,
368 ) {
369 codec.disconnect(PeerId(1));
370 let _ = codec_client.on_closed().await;
371 }
372
373 #[fixture(codec_setup_connected)]
374 #[fuchsia::test]
375 async fn start_request_lifetime_test(
376 codec_client: audio::CodecProxy,
377 codec: CodecControl,
378 provider_requests: ProviderRequestStream,
379 ) {
380 start_request_lifetime(codec_client, codec, CodecId::MSBC, provider_requests).await
381 }
382
383 async fn start_request_lifetime(
384 codec_client: audio::CodecProxy,
385 mut codec: CodecControl,
386 good_codec: CodecId,
387 _provider_requests: ProviderRequestStream,
388 ) {
389 let mut event_stream = codec.take_events();
390 let (connection, _stream) = connection_for_codec(PeerId(1), good_codec, false);
392 let start_result = codec.start(PeerId(1), connection, good_codec);
393 let Err(Error::UnsupportedParameters { .. }) = start_result else {
394 panic!("Expected error from start before request");
395 };
396
397 let mut start_fut = codec_client.start();
399 let (waker, wake_count) = futures_test::task::new_count_waker();
400 let Poll::Pending = start_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
401 panic!("Expected start to be pending");
402 };
403
404 let Some(ControlEvent::RequestStart { id }) = event_stream.next().await else {
405 panic!("Expected start request from event stream");
406 };
407 assert_eq!(id, PeerId(1));
408
409 let bad_codec: CodecId = 100u8.into();
410
411 let (connection, _stream) = connection_for_codec(PeerId(1), bad_codec, false);
413 let start_result = codec.start(PeerId(1), connection, bad_codec);
414 let Err(Error::UnsupportedParameters { .. }) = start_result else {
415 panic!("Expected error from start before request");
416 };
417 assert_eq!(wake_count.get(), 0);
418
419 let (connection, _stream) = connection_for_codec(PeerId(1), good_codec, false);
421 codec.start(PeerId(1), connection, good_codec).expect("should start ok");
422
423 let Poll::Ready(_) = start_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
424 panic!("Expected to get response back from start");
425 };
426
427 let (connection, _stream) = connection_for_codec(PeerId(1), good_codec, false);
429 let start_result = codec.start(PeerId(1), connection, good_codec);
430 let Err(Error::AlreadyStarted) = start_result else {
431 panic!("Expected error from start while started");
432 };
433 }
434
435 #[fixture(codec_setup_connected)]
436 #[fuchsia::test]
437 async fn stop_request_lifetime(
438 codec_client: audio::CodecProxy,
439 mut codec: CodecControl,
440 _provider_requests: ProviderRequestStream,
441 ) {
442 let mut event_stream = codec.take_events();
443 let Err(Error::NotStarted) = codec.stop(PeerId(1)) else {
445 panic!("Expected to not be able tp start when stopped");
446 };
447
448 let start_fut = codec_client.start();
450
451 let Some(ControlEvent::RequestStart { .. }) = event_stream.next().await else {
452 panic!("Expected start request from event stream");
453 };
454
455 let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::MSBC, false);
457 codec.start(PeerId(1), connection, CodecId::MSBC).expect("should start ok");
458 let _ = start_fut.await.expect("start to succeed");
459
460 let Err(Error::UnsupportedParameters { .. }) = codec.stop(PeerId(1)) else {
462 panic!("expected to not be able to stop without a request");
463 };
464
465 let mut stop_fut = codec_client.stop();
467 let (waker, _wake_count) = futures_test::task::new_count_waker();
468 let Poll::Pending = stop_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
469 panic!("Expected stop to be pending");
470 };
471
472 let Some(ControlEvent::RequestStop { id }) = event_stream.next().await else {
473 panic!("Expected stop request from event stream");
474 };
475 assert_eq!(id, PeerId(1));
476
477 let _ = codec.stop(PeerId(2)).expect_err("shouldn't be able to stop a different peer");
479
480 let stop_request_time = fasync::MonotonicInstant::now();
482 codec.stop(PeerId(1)).expect("should be able to stop");
483
484 let Poll::Ready(_) = stop_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
485 panic!("Expected to get response back from start");
486 };
487 let Err(Error::NotStarted) = codec.stop(PeerId(1)) else {
490 panic!("Expected to not be able tp start when stopped");
491 };
492
493 let _start_fut = codec_client.start();
495
496 assert!(event_stream.next().now_or_never().is_none());
497
498 let Some(ControlEvent::RequestStart { .. }) = event_stream.next().await else {
499 panic!("Expected start request from event stream");
500 };
501
502 let fulfilled_time = fasync::MonotonicInstant::now();
503
504 let actual_delay = fulfilled_time - stop_request_time;
505 assert!(
506 actual_delay > DELAY_AFTER_STOP,
507 "expected delay before relaying request to start, only {}ms",
508 actual_delay.into_millis()
509 );
510 }
511
512 #[fixture(codec_setup_connected)]
513 #[fuchsia::test]
514 async fn disconnect_and_reconnect(
515 mut codec_client: audio::CodecProxy,
516 mut codec: CodecControl,
517 mut provider_requests: ProviderRequestStream,
518 ) {
519 codec.disconnect(PeerId(1));
520
521 let _ = codec_client.on_closed().await;
522
523 codec_client =
525 connect_peer_to_codec(&mut codec, &mut provider_requests, &[CodecId::CVSD]).await;
526 start_request_lifetime(codec_client, codec, CodecId::CVSD, provider_requests).await
528 }
529}