bt_a2dp/
stream.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Error;
6use bt_avdtp::{
7    self as avdtp, ErrorCode, ServiceCapability, ServiceCategory, StreamEndpoint, StreamEndpointId,
8};
9use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
10use fuchsia_bluetooth::types::PeerId;
11use fuchsia_inspect::{self as inspect, Property};
12use fuchsia_inspect_derive::{AttachError, Inspect};
13use futures::future::BoxFuture;
14use futures::{FutureExt, TryFutureExt};
15use log::{info, warn};
16use std::collections::HashMap;
17use std::fmt;
18use std::sync::Arc;
19use std::time::Duration;
20
21use crate::codec::{CodecNegotiation, MediaCodecConfig};
22use crate::media_task::{MediaTask, MediaTaskBuilder, MediaTaskError, MediaTaskRunner};
23
24/// Manages a local StreamEndpoint and its associated media task, starting and stopping the
25/// related media task in sync with the endpoint's configured or streaming state.
26/// Note that this does not coordinate state with peer, which is done by bt_a2dp::Peer.
27pub struct Stream {
28    endpoint: StreamEndpoint,
29    /// The builder for media tasks associated with this endpoint.
30    media_task_builder: Arc<Box<dyn MediaTaskBuilder>>,
31    /// The MediaTaskRunner for this endpoint, if it is configured.
32    media_task_runner: Option<Box<dyn MediaTaskRunner>>,
33    /// The MediaTask, if it is running.
34    media_task: Option<Box<dyn MediaTask>>,
35    /// The peer associated with this endpoint, if it is configured.
36    /// Used during reconfiguration for MediaTask recreation.
37    peer_id: Option<PeerId>,
38    /// Inspect Node for this stream
39    inspect: fuchsia_inspect::Node,
40}
41
42impl fmt::Debug for Stream {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        f.debug_struct("Stream")
45            .field("endpoint", &self.endpoint)
46            .field("peer_id", &self.peer_id)
47            .field("has media_task", &self.media_task.is_some())
48            .finish()
49    }
50}
51
52impl Inspect for &mut Stream {
53    // Set up the StreamEndpoint to update the state
54    // The MediaTask node will be created when the media task is started.
55    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
56        self.inspect = parent.create_child(name.as_ref());
57
58        let endpoint_state_prop = self.inspect.create_string("endpoint_state", "");
59        let callback =
60            move |stream: &StreamEndpoint| endpoint_state_prop.set(&format!("{:?}", stream));
61        callback(self.endpoint());
62        self.endpoint_mut().set_update_callback(Some(Box::new(callback)));
63        Ok(())
64    }
65}
66
67impl Stream {
68    pub fn build(endpoint: StreamEndpoint, media_task_builder: Box<dyn MediaTaskBuilder>) -> Self {
69        Self {
70            endpoint,
71            media_task_builder: Arc::new(media_task_builder),
72            media_task_runner: None,
73            media_task: None,
74            peer_id: None,
75            inspect: Default::default(),
76        }
77    }
78
79    fn as_new(&self) -> Self {
80        Self {
81            endpoint: self.endpoint.as_new(),
82            media_task_builder: self.media_task_builder.clone(),
83            media_task_runner: None,
84            media_task: None,
85            peer_id: None,
86            inspect: Default::default(),
87        }
88    }
89
90    pub fn endpoint(&self) -> &StreamEndpoint {
91        &self.endpoint
92    }
93
94    pub fn endpoint_mut(&mut self) -> &mut StreamEndpoint {
95        &mut self.endpoint
96    }
97
98    fn media_codec_config(&self) -> Option<MediaCodecConfig> {
99        find_codec_capability(self.endpoint.capabilities())
100            .and_then(|x| MediaCodecConfig::try_from(x).ok())
101    }
102
103    /// Returns true if the config given is a supported configuration of this stream
104    /// Used when the stream is being configured to a specific configuration
105    fn config_supported(&self, config: &MediaCodecConfig) -> bool {
106        let Some(supported) = self.media_codec_config() else {
107            return false;
108        };
109        supported.supports(&config)
110    }
111
112    /// Returns true if this stream and the given config are compatible - a valid configuration
113    /// of this stream can be found within the capabilities of the given config.
114    fn config_compatible(&self, config: &MediaCodecConfig) -> bool {
115        let Some(supported) = self.media_codec_config() else {
116            return false;
117        };
118        MediaCodecConfig::negotiate(&supported, config).is_some()
119    }
120
121    fn build_media_task(
122        &self,
123        peer_id: &PeerId,
124        config: &MediaCodecConfig,
125    ) -> Option<Box<dyn MediaTaskRunner>> {
126        match self.media_task_builder.configure(peer_id, &config) {
127            Err(e) => {
128                warn!("Failed to build media task: {e:?}");
129                None
130            }
131            Ok(mut media_task_runner) => {
132                if let Err(e) = media_task_runner.iattach(&self.inspect, "media_task") {
133                    info!("Media Task inspect: {e}");
134                }
135                Some(media_task_runner)
136            }
137        }
138    }
139
140    fn supported_config_from_capability(
141        &self,
142        requested_cap: &ServiceCapability,
143    ) -> Option<MediaCodecConfig> {
144        MediaCodecConfig::try_from(requested_cap).ok().filter(|c| self.config_supported(c))
145    }
146
147    pub fn configure(
148        &mut self,
149        peer_id: &PeerId,
150        remote_id: &StreamEndpointId,
151        capabilities: Vec<ServiceCapability>,
152    ) -> Result<(), (ServiceCategory, ErrorCode)> {
153        if self.media_task.is_some() {
154            return Err((ServiceCategory::None, ErrorCode::BadState));
155        }
156        let unsupported = ErrorCode::UnsupportedConfiguration;
157        let codec_cap =
158            find_codec_capability(&capabilities).ok_or((ServiceCategory::None, unsupported))?;
159        let media_unsupported = (ServiceCategory::MediaCodec, unsupported);
160        let config = self.supported_config_from_capability(codec_cap).ok_or(media_unsupported)?;
161        self.media_task_runner =
162            Some(self.build_media_task(peer_id, &config).ok_or(media_unsupported)?);
163        self.peer_id = Some(peer_id.clone());
164        self.endpoint.configure(remote_id, capabilities)
165    }
166
167    pub fn set_delay(&mut self, delay: Duration) -> Result<(), ErrorCode> {
168        let Some(runner) = self.media_task_runner.as_mut() else {
169            return Err(ErrorCode::BadState);
170        };
171        match runner.set_delay(delay) {
172            Err(MediaTaskError::NotSupported) => Err(ErrorCode::NotSupportedCommand),
173            Err(_) => Err(ErrorCode::BadState),
174            Ok(()) => Ok(()),
175        }
176    }
177
178    pub fn reconfigure(
179        &mut self,
180        capabilities: Vec<ServiceCapability>,
181    ) -> Result<(), (ServiceCategory, ErrorCode)> {
182        let bad_state = (ServiceCategory::None, ErrorCode::BadState);
183        let _peer_id = self.peer_id.as_ref().ok_or(bad_state)?;
184        if let Some(requested_codec_cap) = find_codec_capability(&capabilities) {
185            let unsupported = (ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration);
186            let requested =
187                self.supported_config_from_capability(requested_codec_cap).ok_or(unsupported)?;
188            self.media_task_runner
189                .as_mut()
190                .ok_or(bad_state)?
191                .reconfigure(&requested)
192                .or(Err(unsupported))?;
193        }
194        self.endpoint.reconfigure(capabilities)
195    }
196
197    fn media_runner_ref(&mut self) -> Result<&mut Box<dyn MediaTaskRunner>, ErrorCode> {
198        self.media_task_runner.as_mut().ok_or(ErrorCode::BadState)
199    }
200
201    /// Attempt to start the endpoint.
202    /// If the endpoint is successfully started, the media task is started and a future that
203    /// will finish when the media task finishes is returned.
204    pub fn start(&mut self) -> Result<BoxFuture<'static, Result<(), Error>>, ErrorCode> {
205        if self.media_task_runner.is_none() {
206            return Err(ErrorCode::BadState);
207        };
208        let transport = self.endpoint.take_transport().ok_or(ErrorCode::BadState)?;
209        let _ = self.endpoint.start()?;
210        let mut task = match self.media_runner_ref()?.start(transport, None) {
211            Ok(media_task) => media_task,
212            Err(_e) => {
213                let _ = self.endpoint.suspend()?;
214                return Err(ErrorCode::BadState);
215            }
216        };
217        let finished = task.finished();
218        self.media_task = Some(task);
219        Ok(finished.err_into().boxed())
220    }
221
222    /// Suspends the media processor and endpoint.
223    pub fn suspend(&mut self) -> Result<(), ErrorCode> {
224        self.endpoint.suspend()?;
225        let _ = self.media_task.take().ok_or(ErrorCode::BadState)?.stop();
226        Ok(())
227    }
228
229    fn stop_media_task(&mut self) {
230        if let Some(mut task) = self.media_task.take() {
231            // Ignoring stop errors, best effort.
232            let _ = task.stop();
233        }
234        self.media_task_runner = None;
235        self.peer_id = None;
236    }
237
238    /// Releases the endpoint and stops the processing of audio.
239    pub fn release(
240        &mut self,
241        responder: avdtp::SimpleResponder,
242        peer: &avdtp::Peer,
243    ) -> avdtp::Result<()> {
244        self.stop_media_task();
245        self.endpoint.release(responder, peer)
246    }
247
248    pub fn abort(&mut self) {
249        self.stop_media_task();
250        self.endpoint.abort()
251    }
252
253    pub async fn initiate_abort(&mut self, peer: &avdtp::Peer) {
254        self.stop_media_task();
255        self.endpoint.initiate_abort(peer).await
256    }
257}
258
259fn find_codec_capability(capabilities: &[ServiceCapability]) -> Option<&ServiceCapability> {
260    capabilities.iter().find(|cap| cap.category() == ServiceCategory::MediaCodec)
261}
262
263/// Iterator which generates SEIDs.  Used by StreamsBuilder to get valid SEIDs.
264#[derive(Clone, Debug)]
265struct SeidRangeFrom {
266    from: u8,
267}
268
269impl Default for SeidRangeFrom {
270    fn default() -> Self {
271        Self { from: 1 }
272    }
273}
274
275impl Iterator for SeidRangeFrom {
276    type Item = u8;
277
278    fn next(&mut self) -> Option<Self::Item> {
279        let res = self.from;
280        if self.from == 0x3E {
281            self.from = 0x01;
282        } else {
283            self.from += 1;
284        }
285        Some(res)
286    }
287}
288
289/// Builds a set of streams, based on the capabilities of a set of MediaTaskBuilders that are
290/// supported and configured by the system.
291pub struct StreamsBuilder {
292    builders: Vec<Box<dyn MediaTaskBuilder>>,
293    seid_range: SeidRangeFrom,
294    node: inspect::Node,
295}
296
297impl Default for StreamsBuilder {
298    fn default() -> Self {
299        Self {
300            builders: Default::default(),
301            seid_range: SeidRangeFrom { from: Self::START_SEID },
302            node: Default::default(),
303        }
304    }
305}
306
307impl Clone for StreamsBuilder {
308    fn clone(&self) -> Self {
309        Self {
310            builders: self.builders.clone(),
311            node: Default::default(),
312            seid_range: self.seid_range.clone(),
313        }
314    }
315}
316
317impl StreamsBuilder {
318    // Randomly chosen by fair dice roll
319    // TODO(https://fxbug.dev/337321738): Do better for randomizing this maybe
320    const START_SEID: u8 = 8;
321
322    /// Add a builder to the set of builders used to generate streams.
323    pub fn add_builder(&mut self, builder: impl MediaTaskBuilder + 'static) {
324        self.builders.push(Box::new(builder));
325        self.node.record_uint("builders", self.builders.len() as u64);
326    }
327
328    pub async fn peer_streams(
329        &self,
330        peer_id: &PeerId,
331        offload: Option<AudioOffloadExtProxy>,
332    ) -> Result<Streams, MediaTaskError> {
333        let mut streams = Streams::default();
334        let mut seid_range = self.seid_range.clone();
335        for builder in &self.builders {
336            let endpoint_type = builder.direction();
337            let supported_res = builder.supported_configs(peer_id, offload.clone()).await;
338            let Ok(supported) = supported_res else {
339                info!(e:? = supported_res.err().unwrap(); "Failed to get supported configs from builder, skipping");
340                continue;
341            };
342            let codec_caps = supported.iter().map(ServiceCapability::from);
343            for codec_cap in codec_caps {
344                let capabilities = match endpoint_type {
345                    avdtp::EndpointType::Source => vec![
346                        ServiceCapability::MediaTransport,
347                        ServiceCapability::DelayReporting,
348                        codec_cap,
349                    ],
350                    avdtp::EndpointType::Sink => {
351                        vec![ServiceCapability::MediaTransport, codec_cap]
352                    }
353                };
354                let endpoint = avdtp::StreamEndpoint::new(
355                    seid_range.next().unwrap(),
356                    avdtp::MediaType::Audio,
357                    endpoint_type,
358                    capabilities,
359                )?;
360                streams.insert(Stream::build(endpoint, builder.clone()));
361            }
362        }
363        Ok(streams)
364    }
365
366    pub async fn negotiation(
367        &self,
368        peer_id: &PeerId,
369        offload: Option<AudioOffloadExtProxy>,
370        preferred_direction: avdtp::EndpointType,
371    ) -> Result<CodecNegotiation, Error> {
372        let mut caps_available = Vec::new();
373        for builder in &self.builders {
374            caps_available.extend(
375                builder
376                    .supported_configs(peer_id, offload.clone())
377                    .await?
378                    .iter()
379                    .map(ServiceCapability::from),
380            );
381        }
382        Ok(CodecNegotiation::build(caps_available, preferred_direction)?)
383    }
384}
385
386impl Inspect for &mut StreamsBuilder {
387    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
388        self.node = parent.create_child(name.as_ref());
389        self.node.record_uint("builders", self.builders.len() as u64);
390        Ok(())
391    }
392}
393
394/// A set of streams, indexed by their local endpoint ID.
395#[derive(Default)]
396pub struct Streams {
397    streams: HashMap<StreamEndpointId, Stream>,
398    inspect_node: fuchsia_inspect::Node,
399}
400
401impl fmt::Debug for Streams {
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        f.debug_struct("Streams").field("streams", &self.streams).finish()
404    }
405}
406
407impl Streams {
408    /// Makes a copy of this set of streams, but with all streams copied with their states set to
409    /// idle.
410    pub fn as_new(&self) -> Self {
411        let streams =
412            self.streams.iter().map(|(id, stream)| (id.clone(), stream.as_new())).collect();
413        Self { streams, ..Default::default() }
414    }
415
416    /// Returns true if there are no streams in the set.
417    pub fn is_empty(&self) -> bool {
418        self.streams.is_empty()
419    }
420
421    /// Inserts a stream, indexing it by the local endpoint id.
422    /// It replaces any other stream with the same endpoint id.
423    pub fn insert(&mut self, stream: Stream) {
424        let local_id = stream.endpoint().local_id().clone();
425        if self.streams.insert(local_id.clone(), stream).is_some() {
426            warn!("Replacing stream with local id {local_id}");
427        }
428    }
429
430    /// Retrieves a reference to the Stream referenced by `id`, if the stream exists,
431    pub fn get(&self, id: &StreamEndpointId) -> Option<&Stream> {
432        self.streams.get(id)
433    }
434
435    /// Retrieves a mutable reference to the Stream referenced by `id`, if the stream exists,
436    pub fn get_mut(&mut self, id: &StreamEndpointId) -> Option<&mut Stream> {
437        self.streams.get_mut(id)
438    }
439
440    /// Returns a vector of information on all the contained streams.
441    pub fn information(&self) -> Vec<avdtp::StreamInformation> {
442        self.streams.values().map(|x| x.endpoint().information()).collect()
443    }
444
445    /// Returns streams that are in the open (established but not streaming) state
446    pub fn open(&self) -> impl Iterator<Item = &Stream> {
447        self.streams.values().filter(|s| s.endpoint().state() == avdtp::StreamState::Open)
448    }
449
450    /// Returns streams that are streaming.
451    pub fn streaming(&self) -> impl Iterator<Item = &Stream> {
452        self.streams.values().filter(|s| s.endpoint().state() == avdtp::StreamState::Streaming)
453    }
454
455    /// Finds streams in the set which are compatible with `codec_config`.
456    pub fn compatible(&self, codec_config: MediaCodecConfig) -> impl Iterator<Item = &Stream> {
457        self.streams.values().filter(move |s| s.config_compatible(&codec_config))
458    }
459}
460
461impl Inspect for &mut Streams {
462    // Attach self to `parent`
463    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
464        self.inspect_node = parent.create_child(name.as_ref());
465        for stream in self.streams.values_mut() {
466            stream.iattach(&self.inspect_node, inspect::unique_name("stream_"))?;
467        }
468        Ok(())
469    }
470}
471
472#[cfg(test)]
473pub(crate) mod tests {
474    use super::*;
475
476    use fuchsia_async as fasync;
477    use fuchsia_bluetooth::types::Channel;
478    use std::pin::pin;
479    use std::task::Poll;
480
481    use crate::media_task::tests::TestMediaTaskBuilder;
482    use crate::media_types::*;
483
484    pub(crate) fn sbc_mediacodec_capability() -> avdtp::ServiceCapability {
485        let sbc_codec_info = SbcCodecInfo::new(
486            SbcSamplingFrequency::FREQ48000HZ,
487            SbcChannelMode::MONO | SbcChannelMode::JOINT_STEREO,
488            SbcBlockCount::MANDATORY_SRC,
489            SbcSubBands::MANDATORY_SRC,
490            SbcAllocation::MANDATORY_SRC,
491            SbcCodecInfo::BITPOOL_MIN,
492            SbcCodecInfo::BITPOOL_MAX,
493        )
494        .expect("SBC codec info");
495
496        ServiceCapability::MediaCodec {
497            media_type: avdtp::MediaType::Audio,
498            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
499            codec_extra: sbc_codec_info.to_bytes().to_vec(),
500        }
501    }
502
503    pub(crate) fn aac_mediacodec_capability(bitrate: u32) -> avdtp::ServiceCapability {
504        let codec_info = AacCodecInfo::new(
505            AacObjectType::MANDATORY_SRC,
506            AacSamplingFrequency::FREQ48000HZ,
507            AacChannels::TWO,
508            true,
509            bitrate,
510        )
511        .expect("should work");
512        ServiceCapability::MediaCodec {
513            media_type: avdtp::MediaType::Audio,
514            codec_type: avdtp::MediaCodecType::AUDIO_AAC,
515            codec_extra: codec_info.to_bytes().to_vec(),
516        }
517    }
518
519    pub(crate) fn make_sbc_endpoint(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
520        StreamEndpoint::new(
521            seid,
522            avdtp::MediaType::Audio,
523            direction,
524            vec![avdtp::ServiceCapability::MediaTransport, sbc_mediacodec_capability()],
525        )
526        .expect("endpoint creation should succeed")
527    }
528
529    const LOW_BITRATE: u32 = 320_000;
530    const HIGH_BITRATE: u32 = 393_216;
531
532    pub(crate) fn make_aac_endpoint(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
533        StreamEndpoint::new(
534            seid,
535            avdtp::MediaType::Audio,
536            direction,
537            vec![avdtp::ServiceCapability::MediaTransport, aac_mediacodec_capability(LOW_BITRATE)],
538        )
539        .expect("endpoint creation should succeed")
540    }
541
542    fn make_stream(seid: u8, codec_type: avdtp::MediaCodecType) -> Stream {
543        let endpoint = match codec_type {
544            avdtp::MediaCodecType::AUDIO_SBC => {
545                make_sbc_endpoint(seid, avdtp::EndpointType::Source)
546            }
547            avdtp::MediaCodecType::AUDIO_AAC => {
548                make_aac_endpoint(seid, avdtp::EndpointType::Source)
549            }
550            _ => panic!("Unsupported codec_type"),
551        };
552        Stream::build(endpoint, TestMediaTaskBuilder::new().builder())
553    }
554
555    #[fuchsia::test]
556    fn streams_basic_functionality() {
557        let mut streams = Streams::default();
558
559        streams.insert(make_stream(1, avdtp::MediaCodecType::AUDIO_SBC));
560        streams.insert(make_stream(6, avdtp::MediaCodecType::AUDIO_AAC));
561
562        let first_id = 1_u8.try_into().expect("good id");
563        let missing_id = 5_u8.try_into().expect("good id");
564
565        assert!(streams.get(&first_id).is_some());
566        assert!(streams.get(&missing_id).is_none());
567
568        assert!(streams.get_mut(&first_id).is_some());
569        assert!(streams.get_mut(&missing_id).is_none());
570
571        let expected_info = vec![
572            make_sbc_endpoint(1, avdtp::EndpointType::Source).information(),
573            make_aac_endpoint(6, avdtp::EndpointType::Source).information(),
574        ];
575
576        let infos = streams.information();
577
578        assert_eq!(expected_info.len(), infos.len());
579
580        if infos[0].id() == &first_id {
581            assert_eq!(expected_info[0], infos[0]);
582            assert_eq!(expected_info[1], infos[1]);
583        } else {
584            assert_eq!(expected_info[0], infos[1]);
585            assert_eq!(expected_info[1], infos[0]);
586        }
587    }
588
589    #[fuchsia::test]
590    fn streams_filters_compatible_codecs() {
591        let mut streams = Streams::default();
592        streams.insert(make_stream(1, avdtp::MediaCodecType::AUDIO_SBC));
593        streams.insert(make_stream(6, avdtp::MediaCodecType::AUDIO_AAC));
594
595        // Even if the other bitrate is higher, we can negotiate to the lower bitrate.
596        let config_high_bitrate_aac =
597            MediaCodecConfig::try_from(&aac_mediacodec_capability(HIGH_BITRATE)).unwrap();
598
599        let compatible: Vec<_> = streams.compatible(config_high_bitrate_aac).collect();
600        assert_eq!(compatible.len(), 1);
601        let codec_capability = compatible[0]
602            .endpoint()
603            .capabilities()
604            .into_iter()
605            .find(|x| x.category() == avdtp::ServiceCategory::MediaCodec)
606            .expect("should have a codec");
607        assert_eq!(
608            MediaCodecConfig::try_from(codec_capability).unwrap().codec_type(),
609            &avdtp::MediaCodecType::AUDIO_AAC
610        );
611    }
612
613    #[fuchsia::test]
614    fn rejects_unsupported_configurations() {
615        // Needed to make fasync::Tasks.
616        let _exec = fasync::TestExecutor::new();
617        let mut builder = TestMediaTaskBuilder::new_reconfigurable();
618        let mut stream =
619            Stream::build(make_sbc_endpoint(1, avdtp::EndpointType::Source), builder.builder());
620
621        // the default test stream only supports 48000hz
622        let unsupported_sbc_codec_info = SbcCodecInfo::new(
623            SbcSamplingFrequency::FREQ44100HZ,
624            SbcChannelMode::JOINT_STEREO,
625            SbcBlockCount::SIXTEEN,
626            SbcSubBands::EIGHT,
627            SbcAllocation::LOUDNESS,
628            53,
629            53,
630        )
631        .expect("SBC codec info");
632
633        let unsupported_caps = vec![ServiceCapability::MediaCodec {
634            media_type: avdtp::MediaType::Audio,
635            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
636            codec_extra: unsupported_sbc_codec_info.to_bytes().to_vec(),
637        }];
638
639        let peer_id = PeerId(1);
640        let stream_id = 1.try_into().expect("StreamEndpointId");
641        let res = stream.configure(&peer_id, &stream_id, unsupported_caps.clone());
642        assert!(res.is_err());
643        assert_eq!(
644            res.err(),
645            Some((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
646        );
647
648        assert_eq!(
649            stream.reconfigure(unsupported_caps.clone()),
650            Err((ServiceCategory::None, ErrorCode::BadState))
651        );
652
653        let supported_sbc_codec_info = SbcCodecInfo::new(
654            SbcSamplingFrequency::FREQ48000HZ,
655            SbcChannelMode::JOINT_STEREO,
656            SbcBlockCount::SIXTEEN,
657            SbcSubBands::EIGHT,
658            SbcAllocation::LOUDNESS,
659            53,
660            53,
661        )
662        .expect("SBC codec info");
663
664        let sbc_codec_cap = ServiceCapability::MediaCodec {
665            media_type: avdtp::MediaType::Audio,
666            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
667            codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
668        };
669
670        let supported_caps = vec![ServiceCapability::MediaTransport, sbc_codec_cap.clone()];
671
672        let res = stream.configure(&peer_id, &stream_id, supported_caps.clone());
673        assert!(res.is_ok());
674
675        // need to be in the open state for reconfigure
676        assert!(stream.endpoint_mut().establish().is_ok());
677        let (_remote, transport) = Channel::create();
678        match stream.endpoint_mut().receive_channel(transport) {
679            Ok(false) => {}
680            Ok(true) => panic!("Only should be expecting one channel"),
681            Err(e) => panic!("Expected channel to be accepted, got {:?}", e),
682        };
683
684        assert_eq!(
685            stream.reconfigure(unsupported_caps.clone()),
686            Err((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
687        );
688
689        let new_codec_caps = vec![ServiceCapability::MediaCodec {
690            media_type: avdtp::MediaType::Audio,
691            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
692            codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
693        }];
694
695        assert!(stream.reconfigure(new_codec_caps.clone()).is_ok());
696
697        // Should be able to start after reconfigure, and we used the right configuration.
698        let _ = stream.start().expect("stream should start ok");
699        let task = builder.expect_task();
700        assert_eq!(task.codec_config, MediaCodecConfig::try_from(&new_codec_caps[0]).unwrap());
701    }
702
703    #[fuchsia::test]
704    fn reconfigure_runner_fails() {
705        // Needed to make fasync::Tasks.
706        let _exec = fasync::TestExecutor::new();
707        let mut builder = TestMediaTaskBuilder::new();
708        let mut stream =
709            Stream::build(make_sbc_endpoint(1, avdtp::EndpointType::Source), builder.builder());
710
711        let supported_sbc_codec_info = SbcCodecInfo::new(
712            SbcSamplingFrequency::FREQ48000HZ,
713            SbcChannelMode::JOINT_STEREO,
714            SbcBlockCount::SIXTEEN,
715            SbcSubBands::EIGHT,
716            SbcAllocation::LOUDNESS,
717            53,
718            53,
719        )
720        .expect("SBC codec info");
721
722        let orig_codec_cap = ServiceCapability::MediaCodec {
723            media_type: avdtp::MediaType::Audio,
724            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
725            codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
726        };
727
728        let supported_caps = vec![ServiceCapability::MediaTransport, orig_codec_cap.clone()];
729
730        let res = stream.configure(&PeerId(1), &(1.try_into().unwrap()), supported_caps.clone());
731        assert!(res.is_ok());
732
733        // need to be in the open state for reconfigure
734        assert!(stream.endpoint_mut().establish().is_ok());
735        let (_remote, transport) = Channel::create();
736        match stream.endpoint_mut().receive_channel(transport) {
737            Ok(false) => {}
738            Ok(true) => panic!("Only should be expecting one channel"),
739            Err(e) => panic!("Expected channel to be accepted, got {:?}", e),
740        };
741
742        // Should be able to start after configure, and we used the right configuration.
743        let _ = stream.start().expect("stream should start ok");
744        let task = builder.expect_task();
745        assert_eq!(task.codec_config, MediaCodecConfig::try_from(&orig_codec_cap).unwrap());
746        stream.suspend().expect("stream should suspend ok");
747
748        // Try to reconfigure with a supported configuration, but the builder doesn't reconfigure.
749        let mono_sbc_codec_info = SbcCodecInfo::new(
750            SbcSamplingFrequency::FREQ48000HZ,
751            SbcChannelMode::MONO,
752            SbcBlockCount::SIXTEEN,
753            SbcSubBands::EIGHT,
754            SbcAllocation::LOUDNESS,
755            53,
756            53,
757        )
758        .expect("SBC codec info");
759
760        let new_codec_caps = vec![ServiceCapability::MediaCodec {
761            media_type: avdtp::MediaType::Audio,
762            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
763            codec_extra: mono_sbc_codec_info.to_bytes().to_vec(),
764        }];
765
766        // Media Builder fails to reconfigure (as it's failing all reconfigures)
767        assert_eq!(
768            stream.reconfigure(new_codec_caps.clone()),
769            Err((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
770        );
771
772        // Should be able to start after reconfigure, but it will use the old configuration.
773        let _ = stream.start().expect("stream should start ok");
774        let task = builder.expect_task();
775        assert_eq!(task.codec_config, MediaCodecConfig::try_from(&orig_codec_cap).unwrap());
776        stream.suspend().expect("stream should suspend ok")
777    }
778
779    #[fuchsia::test]
780    fn suspend_stops_media_task() {
781        let mut exec = fasync::TestExecutor::new();
782
783        let mut task_builder = TestMediaTaskBuilder::new();
784        let mut stream = Stream::build(
785            make_sbc_endpoint(1, avdtp::EndpointType::Source),
786            task_builder.builder(),
787        );
788        let next_task_fut = task_builder.next_task();
789        let remote_id = 1_u8.try_into().expect("good id");
790
791        let sbc_codec_cap = sbc_mediacodec_capability();
792        let expected_codec_config =
793            MediaCodecConfig::try_from(&sbc_codec_cap).expect("codec config");
794
795        assert!(stream.configure(&PeerId(1), &remote_id, vec![]).is_err());
796        assert!(stream.configure(&PeerId(1), &remote_id, vec![sbc_codec_cap]).is_ok());
797
798        stream.endpoint_mut().establish().expect("establishment should start okay");
799        let (_remote, transport) = Channel::create();
800        let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");
801
802        assert!(stream.start().is_ok());
803
804        // Task should be created here.
805        let task = {
806            let mut next_task_fut = pin!(next_task_fut);
807            match exec.run_until_stalled(&mut next_task_fut) {
808                Poll::Ready(Some(task)) => task,
809                x => panic!("Expected next task to be sent after start, got {:?}", x),
810            }
811        };
812
813        assert_eq!(task.peer_id, PeerId(1));
814        assert_eq!(task.codec_config, expected_codec_config);
815
816        assert!(task.is_started());
817        assert!(stream.suspend().is_ok());
818        assert!(!task.is_started());
819        assert!(stream.start().is_ok());
820
821        let next_task_fut = task_builder.next_task();
822        // Task should be created here.
823        let task = {
824            let mut next_task_fut = pin!(next_task_fut);
825            match exec.run_until_stalled(&mut next_task_fut) {
826                Poll::Ready(Some(task)) => task,
827                x => panic!("Expected next task to be sent after start, got {:?}", x),
828            }
829        };
830
831        assert!(task.is_started());
832    }
833
834    #[fuchsia::test]
835    fn media_task_ending_ends_future() {
836        let mut exec = fasync::TestExecutor::new();
837
838        let mut task_builder = TestMediaTaskBuilder::new();
839        let mut stream = Stream::build(
840            make_sbc_endpoint(1, avdtp::EndpointType::Source),
841            task_builder.builder(),
842        );
843        let next_task_fut = task_builder.next_task();
844        let peer_id = PeerId(1);
845        let remote_id = 1_u8.try_into().expect("good id");
846
847        let sbc_codec_cap = sbc_mediacodec_capability();
848        let expected_codec_config =
849            MediaCodecConfig::try_from(&sbc_codec_cap).expect("codec config");
850
851        assert!(stream.configure(&peer_id, &remote_id, vec![]).is_err());
852        assert!(stream.configure(&peer_id, &remote_id, vec![sbc_codec_cap]).is_ok());
853
854        stream.endpoint_mut().establish().expect("establishment should start okay");
855        let (_remote, transport) = Channel::create();
856        let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");
857
858        let stream_finish_fut = stream.start().expect("start to succeed with a future");
859        let mut stream_finish_fut = pin!(stream_finish_fut);
860
861        let task = {
862            let mut next_task_fut = pin!(next_task_fut);
863            match exec.run_until_stalled(&mut next_task_fut) {
864                Poll::Ready(Some(task)) => task,
865                x => panic!("Expected next task to be sent after start, got {:?}", x),
866            }
867        };
868
869        assert_eq!(task.peer_id, PeerId(1));
870        assert_eq!(task.codec_config, expected_codec_config);
871
872        // Does not need to be polled to be started.
873        assert!(task.is_started());
874
875        assert!(exec.run_until_stalled(&mut stream_finish_fut).is_pending());
876
877        task.end_prematurely(Some(Ok(())));
878        assert!(!task.is_started());
879
880        // The future should be finished, since the task ended.
881        match exec.run_until_stalled(&mut stream_finish_fut) {
882            Poll::Ready(Ok(())) => {}
883            x => panic!("Expected to get ready Ok from finish future, but got {x:?}"),
884        };
885
886        // Should still be able to suspend the stream.
887        assert!(stream.suspend().is_ok());
888
889        // And be able to restart it.
890        let result_fut = stream.start().expect("start to succeed with a future");
891
892        let next_task_fut = task_builder.next_task();
893        let mut next_task_fut = pin!(next_task_fut);
894        let task = match exec.run_until_stalled(&mut next_task_fut) {
895            Poll::Ready(Some(task)) => task,
896            x => panic!("Expected next task to be sent after restart, got {x:?}"),
897        };
898
899        assert!(task.is_started());
900
901        // Dropping the result future shouldn't stop the media task.
902        drop(result_fut);
903
904        assert!(task.is_started());
905    }
906
907    #[fuchsia::test]
908    fn set_delay_correct_results_transmits_to_task() {
909        let mut _exec = fasync::TestExecutor::new();
910
911        let mut task_builder = TestMediaTaskBuilder::new_delayable();
912        let mut stream = Stream::build(
913            make_sbc_endpoint(1, avdtp::EndpointType::Source),
914            task_builder.builder(),
915        );
916        let peer_id = PeerId(1);
917        let remote_id = 1_u8.try_into().expect("good id");
918
919        let sbc_codec_cap = sbc_mediacodec_capability();
920
921        let code = stream
922            .set_delay(std::time::Duration::ZERO)
923            .expect_err("before configure, can't set a delay");
924        assert_eq!(ErrorCode::BadState, code);
925
926        assert!(stream.configure(&peer_id, &remote_id, vec![]).is_err());
927        assert!(stream.configure(&peer_id, &remote_id, vec![sbc_codec_cap]).is_ok());
928
929        let delay_set = std::time::Duration::from_nanos(0xfeed);
930
931        stream.set_delay(delay_set.clone()).expect("after configure, delay is fine");
932
933        stream.endpoint_mut().establish().expect("establishment should start okay");
934        let (_remote, transport) = Channel::create();
935        let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");
936        let _stream_finish_fut = stream.start().expect("start to succeed with a future");
937
938        let media_task = task_builder.expect_task();
939        assert_eq!(delay_set, media_task.delay);
940    }
941}