1use anyhow::anyhow;
6use fuchsia_audio_device::codec;
7use fuchsia_audio_device::codec::CodecRequest;
8use fuchsia_bluetooth::types::{peer_audio_stream_id, PeerId};
9use fuchsia_sync::Mutex;
10use futures::stream::BoxStream;
11use futures::{SinkExt, StreamExt};
12use log::{info, warn};
13use std::sync::Arc;
14use {
15 fidl_fuchsia_audio_device as audio_device, fidl_fuchsia_hardware_audio as audio,
16 fuchsia_async as fasync,
17};
18
19use super::{Control, ControlEvent, Error, HF_INPUT_UUID};
20use crate::codec_id::CodecId;
21use crate::sco;
22
23#[derive(Default)]
24struct CodecControlInner {
25 start_request:
26 Option<Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>>,
27 stop_request:
28 Option<Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>>,
29}
30
31pub struct CodecControl {
36 provider: audio_device::ProviderProxy,
37 codec_task: Option<fasync::Task<()>>,
38 events_sender: futures::channel::mpsc::Sender<ControlEvent>,
39 events_receiver: Mutex<Option<futures::channel::mpsc::Receiver<ControlEvent>>>,
40 codec_id: Option<CodecId>,
41 connection: Option<sco::Connection>,
42 connected_peer: Option<PeerId>,
43 inner: Arc<Mutex<CodecControlInner>>,
44}
45
46impl Control for CodecControl {
47 fn start(
48 &mut self,
49 id: PeerId,
50 connection: sco::Connection,
51 codec: crate::codec_id::CodecId,
52 ) -> Result<(), Error> {
53 if self.connection.is_some() {
54 return Err(Error::AlreadyStarted);
55 }
56 if Some(codec) != self.codec_id {
57 return Err(Error::UnsupportedParameters {
58 source: anyhow!("CodecId must match connected CodecId"),
59 });
60 }
61 if Some(id) != self.connected_peer {
62 return Err(Error::UnsupportedParameters {
63 source: anyhow!("Can't start a non-connected peer"),
64 });
65 };
66 let Some(start_request) = self.inner.lock().start_request.take() else {
67 return Err(Error::UnsupportedParameters {
68 source: anyhow!("Can only start in response to request"),
69 });
70 };
71 self.connection = Some(connection);
72 start_request(Ok(fuchsia_async::MonotonicInstant::now().into()));
73 Ok(())
74 }
75
76 fn stop(&mut self, id: PeerId) -> Result<(), Error> {
77 if self.connection.is_none() {
78 return Err(Error::NotStarted);
79 }
80 if Some(id) != self.connected_peer {
81 return Err(Error::UnsupportedParameters {
82 source: anyhow!("Can't stop a non-connected peer"),
83 });
84 }
85 let Some(stop_request) = self.inner.lock().stop_request.take() else {
86 return Err(Error::UnsupportedParameters {
87 source: anyhow!("Can only stop in response to request"),
88 });
89 };
90 self.connection = None;
91 stop_request(Ok(fuchsia_async::MonotonicInstant::now().into()));
92 Ok(())
93 }
94
95 fn connect(&mut self, id: PeerId, supported_codecs: &[CodecId]) {
96 let supported_formats: audio::DaiSupportedFormats;
97 if supported_codecs.contains(&CodecId::MSBC) {
98 self.codec_id = Some(CodecId::MSBC);
99 supported_formats = CodecId::MSBC.try_into().unwrap();
100 } else {
101 self.codec_id = Some(CodecId::CVSD);
102 supported_formats = CodecId::CVSD.try_into().unwrap();
103 };
104 let audio_dev_id = peer_audio_stream_id(id, HF_INPUT_UUID);
105 let (codec, client) = codec::SoftCodec::create(
106 Some(&audio_dev_id),
107 "Fuchsia",
108 super::DEVICE_NAME,
109 codec::CodecDirection::Duplex,
110 supported_formats.clone(),
111 true,
112 );
113 self.codec_task = Some(fasync::Task::local(codec_task(
114 id,
115 self.provider.clone(),
116 codec,
117 supported_formats,
118 client,
119 self.events_sender.clone(),
120 self.inner.clone(),
121 )));
122 self.connected_peer = Some(id);
123 }
124
125 fn disconnect(&mut self, _id: PeerId) {
126 self.codec_task = None;
127 self.connected_peer = None;
128 }
129
130 fn take_events(&self) -> BoxStream<'static, ControlEvent> {
131 self.events_receiver.lock().take().unwrap().boxed()
132 }
133
134 fn failed_request(&self, request: ControlEvent, _error: Error) {
135 match request {
136 ControlEvent::RequestStart { id: _ } => {
137 let Some(start_request) = self.inner.lock().start_request.take() else {
138 return;
139 };
140 start_request(Err(zx::Status::INTERNAL));
141 }
142 ControlEvent::RequestStop { id: _ } => {
143 let Some(stop_request) = self.inner.lock().start_request.take() else {
144 return;
145 };
146 stop_request(Err(zx::Status::INTERNAL));
147 }
148 _ => unreachable!(),
149 }
150 }
151}
152
153async fn codec_task(
154 id: PeerId,
155 provider: audio_device::ProviderProxy,
156 mut codec: codec::SoftCodec,
157 supported_formats: audio::DaiSupportedFormats,
158 client: fidl::endpoints::ClientEnd<audio::CodecMarker>,
159 mut event_sender: futures::channel::mpsc::Sender<ControlEvent>,
160 inner: Arc<Mutex<CodecControlInner>>,
161) {
162 let result = provider
163 .add_device(audio_device::ProviderAddDeviceRequest {
164 device_name: Some(super::DEVICE_NAME.into()),
165 device_type: Some(audio_device::DeviceType::Codec),
166 driver_client: Some(audio_device::DriverClient::Codec(client)),
167 ..Default::default()
168 })
169 .await;
170 match result {
171 Err(e) => {
172 warn!("FIDL Error adding device: {e:?}");
173 return;
174 }
175 Ok(Err(e)) => {
176 warn!("Failed to add device: {e:?}");
177 return;
178 }
179 Ok(Ok(_)) => {}
180 };
181 info!("Added Codec device!");
182 while let Some(event) = codec.next().await {
183 let Ok(event) = event else {
184 let _ = event_sender
185 .send(ControlEvent::Stopped {
186 id,
187 error: Some(Error::audio_core(event.err().unwrap().into())),
188 })
189 .await;
190 return;
191 };
192 info!("Codec request: {event:?}");
193 let audio_event = match event {
194 CodecRequest::SetFormat { format, responder } => {
195 if supported_formats.number_of_channels.contains(&format.number_of_channels)
196 && supported_formats.frame_formats.contains(&format.frame_format)
197 && supported_formats.sample_formats.contains(&format.sample_format)
198 && supported_formats.frame_rates.contains(&format.frame_rate)
199 && supported_formats.bits_per_slot.contains(&format.bits_per_slot)
200 && supported_formats.bits_per_sample.contains(&format.bits_per_sample)
201 {
202 responder(Ok(()));
203 } else {
204 responder(Err(zx::Status::NOT_SUPPORTED));
205 }
206 continue;
207 }
208 CodecRequest::Start { responder } => {
209 if inner.lock().start_request.is_some() {
210 responder(Err(zx::Status::ALREADY_EXISTS));
211 continue;
212 }
213 inner.lock().start_request = Some(responder);
214 ControlEvent::RequestStart { id }
215 }
216 CodecRequest::Stop { responder } => {
217 if inner.lock().stop_request.is_some() {
218 responder(Err(zx::Status::ALREADY_EXISTS));
219 continue;
220 }
221 inner.lock().stop_request = Some(responder);
222 ControlEvent::RequestStop { id }
223 }
224 };
225 let _ = event_sender.send(audio_event).await;
226 }
227 warn!("Codec device finished, dropping..!");
228}
229
230impl CodecControl {
231 pub fn new(provider: audio_device::ProviderProxy) -> Self {
232 let (events_sender, receiver) = futures::channel::mpsc::channel(1);
233 Self {
234 provider,
235 codec_task: None,
236 events_sender,
237 events_receiver: Mutex::new(Some(receiver)),
238 inner: Default::default(),
239 codec_id: None,
240 connection: None,
241 connected_peer: None,
242 }
243 }
244}
245
246#[cfg(test)]
247mod tests {
248 use super::*;
249
250 use fidl::endpoints::Proxy;
251 use fixture::fixture;
252 use futures::task::{Context, Poll};
253 use futures::FutureExt;
254
255 use crate::sco::test_utils::connection_for_codec;
256
257 async fn codec_setup_connected<F, Fut>(_test_name: &str, test: F)
258 where
259 F: FnOnce(audio::CodecProxy, CodecControl) -> Fut,
260 Fut: futures::Future<Output = ()>,
261 {
262 let (provider_proxy, mut provider_requests) =
263 fidl::endpoints::create_proxy_and_stream::<audio_device::ProviderMarker>();
264 let mut codec = CodecControl::new(provider_proxy);
265
266 codec.connect(PeerId(1), &[CodecId::MSBC]);
267
268 let Some(Ok(audio_device::ProviderRequest::AddDevice {
269 payload:
270 audio_device::ProviderAddDeviceRequest {
271 driver_client: Some(client),
272 device_name: Some(_name),
273 device_type: Some(device_type),
274 ..
275 },
276 responder,
277 })) = provider_requests.next().await
278 else {
279 panic!("Expected a request from the connect");
280 };
281
282 assert_eq!(device_type, audio_device::DeviceType::Codec);
283
284 responder.send(Ok(&Default::default())).expect("response to succeed");
285
286 let audio_device::DriverClient::Codec(codec_client) = client else {
287 panic!("Should have provided a codec client");
288 };
289
290 let codec_proxy = codec_client.into_proxy();
291
292 test(codec_proxy, codec).await
293 }
294
295 #[fixture(codec_setup_connected)]
296 #[fuchsia::test]
297 async fn publishes_on_connect(codec_client: audio::CodecProxy, codec: CodecControl) {
298 let _properties = codec_client.get_properties().await.unwrap();
299 let audio::CodecGetDaiFormatsResult::Ok(formats) =
300 codec_client.get_dai_formats().await.unwrap()
301 else {
302 panic!("Expected formats from get_dai_formats");
303 };
304
305 assert_eq!(formats.len(), 1);
306 assert_eq!(formats[0].frame_rates[0], 16000);
308 drop(codec);
309 }
310
311 #[fixture(codec_setup_connected)]
312 #[fuchsia::test]
313 async fn removed_on_disconnect(codec_client: audio::CodecProxy, mut codec: CodecControl) {
314 codec.disconnect(PeerId(1));
315 let _ = codec_client.on_closed().await;
316 }
317
318 #[fixture(codec_setup_connected)]
319 #[fuchsia::test]
320 async fn start_request_lifetime(codec_client: audio::CodecProxy, mut codec: CodecControl) {
321 let mut event_stream = codec.take_events();
322 let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::MSBC, false);
324 let start_result = codec.start(PeerId(1), connection, CodecId::MSBC);
325 let Err(Error::UnsupportedParameters { .. }) = start_result else {
326 panic!("Expected error from start before request");
327 };
328
329 let mut start_fut = codec_client.start();
331 let (waker, wake_count) = futures_test::task::new_count_waker();
332 let Poll::Pending = start_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
333 panic!("Expected start to be pending");
334 };
335
336 let Some(ControlEvent::RequestStart { id }) = event_stream.next().await else {
337 panic!("Expected start request from event stream");
338 };
339 assert_eq!(id, PeerId(1));
340
341 let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::CVSD, false);
343 let start_result = codec.start(PeerId(1), connection, CodecId::CVSD);
344 let Err(Error::UnsupportedParameters { .. }) = start_result else {
345 panic!("Expected error from start before request");
346 };
347 assert_eq!(wake_count.get(), 0);
348
349 let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::MSBC, false);
351 codec.start(PeerId(1), connection, CodecId::MSBC).expect("should start ok");
352
353 let Poll::Ready(_) = start_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
354 panic!("Expected to get response back from start");
355 };
356
357 let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::MSBC, false);
359 let start_result = codec.start(PeerId(1), connection, CodecId::MSBC);
360 let Err(Error::AlreadyStarted) = start_result else {
361 panic!("Expected error from start while started");
362 };
363 }
364
365 #[fixture(codec_setup_connected)]
366 #[fuchsia::test]
367 async fn stop_request_lifetime(codec_client: audio::CodecProxy, mut codec: CodecControl) {
368 let mut event_stream = codec.take_events();
369 let Err(Error::NotStarted) = codec.stop(PeerId(1)) else {
371 panic!("Expected to not be able tp start when stopped");
372 };
373
374 let start_fut = codec_client.start();
376
377 let Some(ControlEvent::RequestStart { .. }) = event_stream.next().await else {
378 panic!("Expected start request from event stream");
379 };
380
381 let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::MSBC, false);
383 codec.start(PeerId(1), connection, CodecId::MSBC).expect("should start ok");
384 let _ = start_fut.await.expect("start to succeed");
385
386 let Err(Error::UnsupportedParameters { .. }) = codec.stop(PeerId(1)) else {
388 panic!("expected to not be able to stop without a request");
389 };
390
391 let mut stop_fut = codec_client.stop();
393 let (waker, _wake_count) = futures_test::task::new_count_waker();
394 let Poll::Pending = stop_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
395 panic!("Expected stop to be pending");
396 };
397
398 let Some(ControlEvent::RequestStop { id }) = event_stream.next().await else {
399 panic!("Expected stop request from event stream");
400 };
401 assert_eq!(id, PeerId(1));
402
403 let _ = codec.stop(PeerId(2)).expect_err("shouldn't be able to stop a different peer");
405
406 codec.stop(PeerId(1)).expect("should be able to stop");
408
409 let Poll::Ready(_) = stop_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
410 panic!("Expected to get response back from start");
411 };
412 let Err(Error::NotStarted) = codec.stop(PeerId(1)) else {
415 panic!("Expected to not be able tp start when stopped");
416 };
417 }
418}