bt_hfp/audio/
codec.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::anyhow;
6use fuchsia_audio_device::codec;
7use fuchsia_audio_device::codec::CodecRequest;
8use fuchsia_bluetooth::types::{PeerId, peer_audio_stream_id};
9use fuchsia_sync::Mutex;
10use futures::stream::BoxStream;
11use futures::{SinkExt, StreamExt};
12use log::{info, warn};
13use std::sync::Arc;
14use {
15    fidl_fuchsia_audio_device as audio_device, fidl_fuchsia_hardware_audio as audio,
16    fuchsia_async as fasync,
17};
18
19use super::{Control, ControlEvent, Error, HF_INPUT_UUID};
20use crate::codec_id::CodecId;
21use crate::sco;
22
23struct CodecControlInner {
24    start_request:
25        Option<Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>>,
26    stop_request:
27        Option<Box<dyn FnOnce(std::result::Result<zx::MonotonicInstant, zx::Status>) + Send>>,
28    /// The earliest time that we will relay a start request after a stop.  Used to delay until the
29    /// SCO is disconnected when encountering a start immediately after a stop.
30    earliest_start_time: fasync::MonotonicInstant,
31}
32
33impl Default for CodecControlInner {
34    fn default() -> Self {
35        Self {
36            start_request: None,
37            stop_request: None,
38            earliest_start_time: fasync::MonotonicInstant::INFINITE_PAST,
39        }
40    }
41}
42
43impl CodecControlInner {
44    fn clear(&mut self) {
45        *self = Self::default();
46    }
47
48    fn set_start_delay(&mut self) {
49        self.earliest_start_time = fasync::MonotonicInstant::after(DELAY_AFTER_STOP);
50    }
51}
52
53const DELAY_AFTER_STOP: fasync::MonotonicDuration = fasync::MonotonicDuration::from_seconds(1);
54
55// Control that is connected to a Codec device registered with an
56/// AudioDeviceRegistry component.  The AudioDeviceRegistry can request that we
57/// start and/or stop the audio in-band which will send the request on to the
58/// HFP task to initiate Audio Connection Setup when no call is in progress.
59pub struct CodecControl {
60    provider: audio_device::ProviderProxy,
61    codec_task: Option<fasync::Task<()>>,
62    events_sender: futures::channel::mpsc::Sender<ControlEvent>,
63    events_receiver: Mutex<Option<futures::channel::mpsc::Receiver<ControlEvent>>>,
64    codec_id: Option<CodecId>,
65    connection: Option<sco::Connection>,
66    connected_peer: Option<PeerId>,
67    inner: Arc<Mutex<CodecControlInner>>,
68}
69
70impl Control for CodecControl {
71    fn start(
72        &mut self,
73        id: PeerId,
74        connection: sco::Connection,
75        codec: crate::codec_id::CodecId,
76    ) -> Result<(), Error> {
77        if self.connection.is_some() {
78            return Err(Error::AlreadyStarted);
79        }
80        if Some(codec) != self.codec_id {
81            return Err(Error::UnsupportedParameters {
82                source: anyhow!("CodecId must match connected CodecId"),
83            });
84        }
85        if Some(id) != self.connected_peer {
86            return Err(Error::UnsupportedParameters {
87                source: anyhow!("Can't start a non-connected peer"),
88            });
89        };
90        let Some(start_request) = self.inner.lock().start_request.take() else {
91            return Err(Error::UnsupportedParameters {
92                source: anyhow!("Can only start in response to request"),
93            });
94        };
95        self.connection = Some(connection);
96        start_request(Ok(fuchsia_async::MonotonicInstant::now().into()));
97        Ok(())
98    }
99
100    fn stop(&mut self, id: PeerId) -> Result<(), Error> {
101        if self.connection.is_none() {
102            return Err(Error::NotStarted);
103        }
104        if Some(id) != self.connected_peer {
105            return Err(Error::UnsupportedParameters {
106                source: anyhow!("Can't stop a non-connected peer"),
107            });
108        }
109        let Some(stop_request) = self.inner.lock().stop_request.take() else {
110            return Err(Error::UnsupportedParameters {
111                source: anyhow!("Codec can only stop in response to request"),
112            });
113        };
114        self.connection = None;
115        self.inner.lock().set_start_delay();
116        stop_request(Ok(fuchsia_async::MonotonicInstant::now().into()));
117        Ok(())
118    }
119
120    fn connect(&mut self, id: PeerId, supported_codecs: &[CodecId]) {
121        let supported_formats: audio::DaiSupportedFormats;
122        if supported_codecs.contains(&CodecId::MSBC) {
123            self.codec_id = Some(CodecId::MSBC);
124            supported_formats = CodecId::MSBC.try_into().unwrap();
125        } else {
126            self.codec_id = Some(CodecId::CVSD);
127            supported_formats = CodecId::CVSD.try_into().unwrap();
128        };
129        let audio_dev_id = peer_audio_stream_id(id, HF_INPUT_UUID);
130        let (codec, client) = codec::SoftCodec::create(
131            Some(&audio_dev_id),
132            "Fuchsia",
133            super::DEVICE_NAME,
134            codec::CodecDirection::Duplex,
135            supported_formats.clone(),
136            true,
137        );
138        self.codec_task = Some(fasync::Task::local(codec_task(
139            id,
140            self.provider.clone(),
141            codec,
142            supported_formats,
143            client,
144            self.events_sender.clone(),
145            self.inner.clone(),
146        )));
147        self.connected_peer = Some(id);
148    }
149
150    fn disconnect(&mut self, id: PeerId) {
151        info!("Codec peer {id} disconnected, cleaning up");
152        self.codec_task = None;
153        self.connected_peer = None;
154        self.connection = None;
155        self.codec_id = None;
156        self.inner.lock().clear();
157    }
158
159    fn take_events(&self) -> BoxStream<'static, ControlEvent> {
160        self.events_receiver.lock().take().unwrap().boxed()
161    }
162
163    fn failed_request(&self, request: ControlEvent, _error: Error) {
164        match request {
165            ControlEvent::RequestStart { id: _ } => {
166                let Some(start_request) = self.inner.lock().start_request.take() else {
167                    return;
168                };
169                start_request(Err(zx::Status::INTERNAL));
170            }
171            ControlEvent::RequestStop { id: _ } => {
172                let Some(stop_request) = self.inner.lock().start_request.take() else {
173                    return;
174                };
175                stop_request(Err(zx::Status::INTERNAL));
176            }
177            _ => unreachable!(),
178        }
179    }
180}
181
182async fn codec_task(
183    id: PeerId,
184    provider: audio_device::ProviderProxy,
185    mut codec: codec::SoftCodec,
186    supported_formats: audio::DaiSupportedFormats,
187    client: fidl::endpoints::ClientEnd<audio::CodecMarker>,
188    mut event_sender: futures::channel::mpsc::Sender<ControlEvent>,
189    inner: Arc<Mutex<CodecControlInner>>,
190) {
191    let result = provider
192        .add_device(audio_device::ProviderAddDeviceRequest {
193            device_name: Some(super::DEVICE_NAME.into()),
194            device_type: Some(audio_device::DeviceType::Codec),
195            driver_client: Some(audio_device::DriverClient::Codec(client)),
196            ..Default::default()
197        })
198        .await;
199    match result {
200        Err(e) => {
201            warn!("FIDL Error adding device: {e:?}");
202            return;
203        }
204        Ok(Err(e)) => {
205            warn!("Failed to add device: {e:?}");
206            return;
207        }
208        Ok(Ok(_)) => {}
209    };
210    info!("Added Codec device!");
211    while let Some(event) = codec.next().await {
212        let Ok(event) = event else {
213            let _ = event_sender
214                .send(ControlEvent::Stopped {
215                    id,
216                    error: Some(Error::audio_core(event.err().unwrap().into())),
217                })
218                .await;
219            return;
220        };
221        info!("Codec request: {event:?}");
222        let audio_event = match event {
223            CodecRequest::SetFormat { format, responder } => {
224                if supported_formats.number_of_channels.contains(&format.number_of_channels)
225                    && supported_formats.frame_formats.contains(&format.frame_format)
226                    && supported_formats.sample_formats.contains(&format.sample_format)
227                    && supported_formats.frame_rates.contains(&format.frame_rate)
228                    && supported_formats.bits_per_slot.contains(&format.bits_per_slot)
229                    && supported_formats.bits_per_sample.contains(&format.bits_per_sample)
230                {
231                    responder(Ok(()));
232                } else {
233                    responder(Err(zx::Status::NOT_SUPPORTED));
234                }
235                continue;
236            }
237            CodecRequest::Start { responder } => {
238                if inner.lock().start_request.is_some() {
239                    responder(Err(zx::Status::ALREADY_EXISTS));
240                    continue;
241                }
242                inner.lock().start_request = Some(responder);
243                let deadline = inner.lock().earliest_start_time.clone();
244                let now = fasync::MonotonicInstant::now();
245                if deadline > now {
246                    info!(
247                        "Delaying start request {}ms due to too-fast stop->start cycle",
248                        (deadline - now).into_millis()
249                    );
250                }
251                fasync::Timer::new(deadline).await;
252                ControlEvent::RequestStart { id }
253            }
254            CodecRequest::Stop { responder } => {
255                if inner.lock().stop_request.is_some() {
256                    responder(Err(zx::Status::ALREADY_EXISTS));
257                    continue;
258                }
259                inner.lock().stop_request = Some(responder);
260                ControlEvent::RequestStop { id }
261            }
262        };
263        let _ = event_sender.send(audio_event).await;
264    }
265    warn!("Codec device finished, dropping..!");
266}
267
268impl CodecControl {
269    pub fn new(provider: audio_device::ProviderProxy) -> Self {
270        let (events_sender, receiver) = futures::channel::mpsc::channel(1);
271        Self {
272            provider,
273            codec_task: None,
274            events_sender,
275            events_receiver: Mutex::new(Some(receiver)),
276            inner: Default::default(),
277            codec_id: None,
278            connection: None,
279            connected_peer: None,
280        }
281    }
282}
283
284#[cfg(test)]
285mod tests {
286    use super::*;
287
288    use fidl::endpoints::Proxy;
289    use fidl_fuchsia_audio_device::ProviderRequestStream;
290    use fixture::fixture;
291    use futures::FutureExt;
292    use futures::task::{Context, Poll};
293
294    use crate::sco::test_utils::connection_for_codec;
295
296    async fn connect_peer_to_codec(
297        codec: &mut CodecControl,
298        provider_requests: &mut ProviderRequestStream,
299        codecs: &[CodecId],
300    ) -> audio::CodecProxy {
301        codec.connect(PeerId(1), codecs);
302
303        let Some(Ok(audio_device::ProviderRequest::AddDevice {
304            payload:
305                audio_device::ProviderAddDeviceRequest {
306                    driver_client: Some(client),
307                    device_name: Some(_name),
308                    device_type: Some(device_type),
309                    ..
310                },
311            responder,
312        })) = provider_requests.next().await
313        else {
314            panic!("Expected a request from the connect");
315        };
316
317        assert_eq!(device_type, audio_device::DeviceType::Codec);
318
319        responder.send(Ok(&Default::default())).expect("response to succeed");
320
321        let audio_device::DriverClient::Codec(codec_client) = client else {
322            panic!("Should have provided a codec client");
323        };
324        codec_client.into_proxy()
325    }
326
327    async fn codec_setup_connected<F, Fut>(_test_name: &str, test: F)
328    where
329        F: FnOnce(audio::CodecProxy, CodecControl, ProviderRequestStream) -> Fut,
330        Fut: futures::Future<Output = ()>,
331    {
332        let (provider_proxy, mut provider_requests) =
333            fidl::endpoints::create_proxy_and_stream::<audio_device::ProviderMarker>();
334        let mut codec = CodecControl::new(provider_proxy);
335
336        let codec_proxy =
337            connect_peer_to_codec(&mut codec, &mut provider_requests, &[CodecId::MSBC]).await;
338
339        test(codec_proxy, codec, provider_requests).await
340    }
341
342    #[fixture(codec_setup_connected)]
343    #[fuchsia::test]
344    async fn publishes_on_connect(
345        codec_client: audio::CodecProxy,
346        codec: CodecControl,
347        _provider_requests: ProviderRequestStream,
348    ) {
349        let _properties = codec_client.get_properties().await.unwrap();
350        let audio::CodecGetDaiFormatsResult::Ok(formats) =
351            codec_client.get_dai_formats().await.unwrap()
352        else {
353            panic!("Expected formats from get_dai_formats");
354        };
355
356        assert_eq!(formats.len(), 1);
357        // MSBC has a frame-rate of 16khz
358        assert_eq!(formats[0].frame_rates[0], 16000);
359        drop(codec);
360    }
361
362    #[fixture(codec_setup_connected)]
363    #[fuchsia::test]
364    async fn removed_on_disconnect(
365        codec_client: audio::CodecProxy,
366        mut codec: CodecControl,
367        _provider_requests: ProviderRequestStream,
368    ) {
369        codec.disconnect(PeerId(1));
370        let _ = codec_client.on_closed().await;
371    }
372
373    #[fixture(codec_setup_connected)]
374    #[fuchsia::test]
375    async fn start_request_lifetime_test(
376        codec_client: audio::CodecProxy,
377        codec: CodecControl,
378        provider_requests: ProviderRequestStream,
379    ) {
380        start_request_lifetime(codec_client, codec, CodecId::MSBC, provider_requests).await
381    }
382
383    async fn start_request_lifetime(
384        codec_client: audio::CodecProxy,
385        mut codec: CodecControl,
386        good_codec: CodecId,
387        _provider_requests: ProviderRequestStream,
388    ) {
389        let mut event_stream = codec.take_events();
390        // start without a request should fail
391        let (connection, _stream) = connection_for_codec(PeerId(1), good_codec, false);
392        let start_result = codec.start(PeerId(1), connection, good_codec);
393        let Err(Error::UnsupportedParameters { .. }) = start_result else {
394            panic!("Expected error from start before request");
395        };
396
397        // request comes in
398        let mut start_fut = codec_client.start();
399        let (waker, wake_count) = futures_test::task::new_count_waker();
400        let Poll::Pending = start_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
401            panic!("Expected start to be pending");
402        };
403
404        let Some(ControlEvent::RequestStart { id }) = event_stream.next().await else {
405            panic!("Expected start request from event stream");
406        };
407        assert_eq!(id, PeerId(1));
408
409        let bad_codec: CodecId = 100u8.into();
410
411        // starting with a wrong codec fails, and doesn't complete the future.
412        let (connection, _stream) = connection_for_codec(PeerId(1), bad_codec, false);
413        let start_result = codec.start(PeerId(1), connection, bad_codec);
414        let Err(Error::UnsupportedParameters { .. }) = start_result else {
415            panic!("Expected error from start before request");
416        };
417        assert_eq!(wake_count.get(), 0);
418
419        // starting after works and then completes the request.
420        let (connection, _stream) = connection_for_codec(PeerId(1), good_codec, false);
421        codec.start(PeerId(1), connection, good_codec).expect("should start ok");
422
423        let Poll::Ready(_) = start_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
424            panic!("Expected to get response back from start");
425        };
426
427        // Starting after started is no good either.
428        let (connection, _stream) = connection_for_codec(PeerId(1), good_codec, false);
429        let start_result = codec.start(PeerId(1), connection, good_codec);
430        let Err(Error::AlreadyStarted) = start_result else {
431            panic!("Expected error from start while started");
432        };
433    }
434
435    #[fixture(codec_setup_connected)]
436    #[fuchsia::test]
437    async fn stop_request_lifetime(
438        codec_client: audio::CodecProxy,
439        mut codec: CodecControl,
440        _provider_requests: ProviderRequestStream,
441    ) {
442        let mut event_stream = codec.take_events();
443        // can't stop before we are started
444        let Err(Error::NotStarted) = codec.stop(PeerId(1)) else {
445            panic!("Expected to not be able tp start when stopped");
446        };
447
448        // request comes in
449        let start_fut = codec_client.start();
450
451        let Some(ControlEvent::RequestStart { .. }) = event_stream.next().await else {
452            panic!("Expected start request from event stream");
453        };
454
455        // starting after works and then completes the request.
456        let (connection, _stream) = connection_for_codec(PeerId(1), CodecId::MSBC, false);
457        codec.start(PeerId(1), connection, CodecId::MSBC).expect("should start ok");
458        let _ = start_fut.await.expect("start to succeed");
459
460        // can't stop without a request
461        let Err(Error::UnsupportedParameters { .. }) = codec.stop(PeerId(1)) else {
462            panic!("expected to not be able to stop without a request");
463        };
464
465        // request to stop comes in
466        let mut stop_fut = codec_client.stop();
467        let (waker, _wake_count) = futures_test::task::new_count_waker();
468        let Poll::Pending = stop_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
469            panic!("Expected stop to be pending");
470        };
471
472        let Some(ControlEvent::RequestStop { id }) = event_stream.next().await else {
473            panic!("Expected stop request from event stream");
474        };
475        assert_eq!(id, PeerId(1));
476
477        // Can't stop a peer that's not started.
478        let _ = codec.stop(PeerId(2)).expect_err("shouldn't be able to stop a different peer");
479
480        // can stop the one requested
481        let stop_request_time = fasync::MonotonicInstant::now();
482        codec.stop(PeerId(1)).expect("should be able to stop");
483
484        let Poll::Ready(_) = stop_fut.poll_unpin(&mut Context::from_waker(&waker)) else {
485            panic!("Expected to get response back from start");
486        };
487        // back to being able to not stop it again
488
489        let Err(Error::NotStarted) = codec.stop(PeerId(1)) else {
490            panic!("Expected to not be able tp start when stopped");
491        };
492
493        // A start coming in immediately doesn't get relayed right away.
494        let _start_fut = codec_client.start();
495
496        assert!(event_stream.next().now_or_never().is_none());
497
498        let Some(ControlEvent::RequestStart { .. }) = event_stream.next().await else {
499            panic!("Expected start request from event stream");
500        };
501
502        let fulfilled_time = fasync::MonotonicInstant::now();
503
504        let actual_delay = fulfilled_time - stop_request_time;
505        assert!(
506            actual_delay > DELAY_AFTER_STOP,
507            "expected delay before relaying request to start, only {}ms",
508            actual_delay.into_millis()
509        );
510    }
511
512    #[fixture(codec_setup_connected)]
513    #[fuchsia::test]
514    async fn disconnect_and_reconnect(
515        mut codec_client: audio::CodecProxy,
516        mut codec: CodecControl,
517        mut provider_requests: ProviderRequestStream,
518    ) {
519        codec.disconnect(PeerId(1));
520
521        let _ = codec_client.on_closed().await;
522
523        // Reconnect, but with a different codec.  It should still succeed.
524        codec_client =
525            connect_peer_to_codec(&mut codec, &mut provider_requests, &[CodecId::CVSD]).await;
526        // Should be able to do the whole start request lifetime again now
527        start_request_lifetime(codec_client, codec, CodecId::CVSD, provider_requests).await
528    }
529}