bt_a2dp/
stream.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Error;
6use bt_avdtp::{
7    self as avdtp, ErrorCode, ServiceCapability, ServiceCategory, StreamEndpoint, StreamEndpointId,
8};
9use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
10use fuchsia_bluetooth::types::PeerId;
11use fuchsia_inspect::{self as inspect, Property};
12use fuchsia_inspect_derive::{AttachError, Inspect};
13use futures::future::BoxFuture;
14use futures::{FutureExt, TryFutureExt};
15use log::{info, warn};
16use std::collections::HashMap;
17use std::fmt;
18use std::sync::Arc;
19use std::time::Duration;
20
21use crate::codec::{CodecNegotiation, MediaCodecConfig};
22use crate::media_task::{MediaTask, MediaTaskBuilder, MediaTaskError, MediaTaskRunner};
23
24/// Manages a local StreamEndpoint and its associated media task, starting and stopping the
25/// related media task in sync with the endpoint's configured or streaming state.
26/// Note that this does not coordinate state with peer, which is done by bt_a2dp::Peer.
27pub struct Stream {
28    endpoint: StreamEndpoint,
29    /// The builder for media tasks associated with this endpoint.
30    media_task_builder: Arc<Box<dyn MediaTaskBuilder>>,
31    /// The MediaTaskRunner for this endpoint, if it is configured.
32    media_task_runner: Option<Box<dyn MediaTaskRunner>>,
33    /// The MediaTask, if it is running.
34    media_task: Option<Box<dyn MediaTask>>,
35    /// The peer associated with this endpoint, if it is configured.
36    /// Used during reconfiguration for MediaTask recreation.
37    peer_id: Option<PeerId>,
38    /// Inspect Node for this stream
39    inspect: fuchsia_inspect::Node,
40}
41
42impl fmt::Debug for Stream {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        f.debug_struct("Stream")
45            .field("endpoint", &self.endpoint)
46            .field("peer_id", &self.peer_id)
47            .field("has media_task", &self.media_task.is_some())
48            .finish()
49    }
50}
51
52impl Inspect for &mut Stream {
53    // Set up the StreamEndpoint to update the state
54    // The MediaTask node will be created when the media task is started.
55    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
56        self.inspect = parent.create_child(name.as_ref());
57
58        let endpoint_state_prop = self.inspect.create_string("endpoint_state", "");
59        let callback =
60            move |stream: &StreamEndpoint| endpoint_state_prop.set(&format!("{:?}", stream));
61        callback(self.endpoint());
62        self.endpoint_mut().set_update_callback(Some(Box::new(callback)));
63        Ok(())
64    }
65}
66
67impl Stream {
68    pub fn build(endpoint: StreamEndpoint, media_task_builder: Box<dyn MediaTaskBuilder>) -> Self {
69        Self {
70            endpoint,
71            media_task_builder: Arc::new(media_task_builder),
72            media_task_runner: None,
73            media_task: None,
74            peer_id: None,
75            inspect: Default::default(),
76        }
77    }
78
79    fn as_new(&self) -> Self {
80        Self {
81            endpoint: self.endpoint.as_new(),
82            media_task_builder: self.media_task_builder.clone(),
83            media_task_runner: None,
84            media_task: None,
85            peer_id: None,
86            inspect: Default::default(),
87        }
88    }
89
90    pub fn endpoint(&self) -> &StreamEndpoint {
91        &self.endpoint
92    }
93
94    pub fn endpoint_mut(&mut self) -> &mut StreamEndpoint {
95        &mut self.endpoint
96    }
97
98    fn media_codec_config(&self) -> Option<MediaCodecConfig> {
99        find_codec_capability(self.endpoint.capabilities())
100            .and_then(|x| MediaCodecConfig::try_from(x).ok())
101    }
102
103    /// Returns true if the config given is a supported configuration of this stream
104    /// Used when the stream is being configured to a specific configuration
105    fn config_supported(&self, config: &MediaCodecConfig) -> bool {
106        let Some(supported) = self.media_codec_config() else {
107            return false;
108        };
109        supported.supports(&config)
110    }
111
112    /// Returns true if this stream and the given config are compatible - a valid configuration
113    /// of this stream can be found within the capabilities of the given config.
114    fn config_compatible(&self, config: &MediaCodecConfig) -> bool {
115        let Some(supported) = self.media_codec_config() else {
116            return false;
117        };
118        MediaCodecConfig::negotiate(&supported, config).is_some()
119    }
120
121    fn build_media_task(
122        &self,
123        peer_id: &PeerId,
124        config: &MediaCodecConfig,
125    ) -> Option<Box<dyn MediaTaskRunner>> {
126        match self.media_task_builder.configure(peer_id, &config) {
127            Err(e) => {
128                warn!("Failed to build media task: {e:?}");
129                None
130            }
131            Ok(mut media_task_runner) => {
132                if let Err(e) = media_task_runner.iattach(&self.inspect, "media_task") {
133                    info!("Media Task inspect: {e}");
134                }
135                Some(media_task_runner)
136            }
137        }
138    }
139
140    fn supported_config_from_capability(
141        &self,
142        requested_cap: &ServiceCapability,
143    ) -> Option<MediaCodecConfig> {
144        MediaCodecConfig::try_from(requested_cap).ok().filter(|c| self.config_supported(c))
145    }
146
147    pub fn configure(
148        &mut self,
149        peer_id: &PeerId,
150        remote_id: &StreamEndpointId,
151        capabilities: Vec<ServiceCapability>,
152    ) -> Result<(), (ServiceCategory, ErrorCode)> {
153        if self.media_task.is_some() {
154            return Err((ServiceCategory::None, ErrorCode::BadState));
155        }
156        let unsupported = ErrorCode::UnsupportedConfiguration;
157        let codec_cap =
158            find_codec_capability(&capabilities).ok_or((ServiceCategory::None, unsupported))?;
159        let media_unsupported = (ServiceCategory::MediaCodec, unsupported);
160        let config = self.supported_config_from_capability(codec_cap).ok_or(media_unsupported)?;
161        self.media_task_runner =
162            Some(self.build_media_task(peer_id, &config).ok_or(media_unsupported)?);
163        self.peer_id = Some(peer_id.clone());
164        self.endpoint.configure(remote_id, capabilities)
165    }
166
167    pub fn set_delay(&mut self, delay: Duration) -> Result<(), ErrorCode> {
168        let Some(runner) = self.media_task_runner.as_mut() else {
169            return Err(ErrorCode::BadState);
170        };
171        match runner.set_delay(delay) {
172            Err(MediaTaskError::NotSupported) => Err(ErrorCode::NotSupportedCommand),
173            Err(_) => Err(ErrorCode::BadState),
174            Ok(()) => Ok(()),
175        }
176    }
177
178    pub fn reconfigure(
179        &mut self,
180        capabilities: Vec<ServiceCapability>,
181    ) -> Result<(), (ServiceCategory, ErrorCode)> {
182        let bad_state = (ServiceCategory::None, ErrorCode::BadState);
183        let _peer_id = self.peer_id.as_ref().ok_or(bad_state)?;
184        if let Some(requested_codec_cap) = find_codec_capability(&capabilities) {
185            let unsupported = (ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration);
186            let requested =
187                self.supported_config_from_capability(requested_codec_cap).ok_or(unsupported)?;
188            self.media_task_runner
189                .as_mut()
190                .ok_or(bad_state)?
191                .reconfigure(&requested)
192                .or(Err(unsupported))?;
193        }
194        self.endpoint.reconfigure(capabilities)
195    }
196
197    fn media_runner_ref(&mut self) -> Result<&mut Box<dyn MediaTaskRunner>, ErrorCode> {
198        self.media_task_runner.as_mut().ok_or(ErrorCode::BadState)
199    }
200
201    /// Attempt to start the endpoint.
202    /// If the endpoint is successfully started, the media task is started and a future that
203    /// will finish when the media task finishes is returned.
204    pub fn start(&mut self) -> Result<BoxFuture<'static, Result<(), Error>>, ErrorCode> {
205        if self.media_task_runner.is_none() {
206            return Err(ErrorCode::BadState);
207        };
208        let transport = self.endpoint.take_transport().ok_or(ErrorCode::BadState)?;
209        let _ = self.endpoint.start()?;
210        let offload = self.endpoint.audio_offload();
211        let mut task = match self.media_runner_ref()?.start(transport, offload) {
212            Ok(media_task) => media_task,
213            Err(_e) => {
214                let _ = self.endpoint.suspend()?;
215                return Err(ErrorCode::BadState);
216            }
217        };
218        let finished = task.finished();
219        self.media_task = Some(task);
220        Ok(finished.err_into().boxed())
221    }
222
223    /// Suspends the media processor and endpoint.
224    pub fn suspend(&mut self) -> Result<(), ErrorCode> {
225        self.endpoint.suspend()?;
226        let _ = self.media_task.take().ok_or(ErrorCode::BadState)?.stop();
227        Ok(())
228    }
229
230    fn stop_media_task(&mut self) {
231        if let Some(mut task) = self.media_task.take() {
232            // Ignoring stop errors, best effort.
233            let _ = task.stop();
234        }
235        self.media_task_runner = None;
236        self.peer_id = None;
237    }
238
239    /// Releases the endpoint and stops the processing of audio.
240    pub fn release(
241        &mut self,
242        responder: avdtp::SimpleResponder,
243        peer: &avdtp::Peer,
244    ) -> avdtp::Result<()> {
245        self.stop_media_task();
246        self.endpoint.release(responder, peer)
247    }
248
249    pub fn abort(&mut self) {
250        self.stop_media_task();
251        self.endpoint.abort()
252    }
253
254    pub async fn initiate_abort(&mut self, peer: &avdtp::Peer) {
255        self.stop_media_task();
256        self.endpoint.initiate_abort(peer).await
257    }
258}
259
260fn find_codec_capability(capabilities: &[ServiceCapability]) -> Option<&ServiceCapability> {
261    capabilities.iter().find(|cap| cap.category() == ServiceCategory::MediaCodec)
262}
263
/// Endless iterator of SEIDs.  Used by StreamsBuilder to get valid SEIDs.
///
/// Ids count upward from `from`; after 0x3E has been yielded the sequence restarts at 0x01.
#[derive(Clone, Debug)]
struct SeidRangeFrom {
    /// The id that the next call to `next` yields.
    from: u8,
}

impl Default for SeidRangeFrom {
    /// Starts the sequence at 1.
    fn default() -> Self {
        SeidRangeFrom { from: 1 }
    }
}

impl Iterator for SeidRangeFrom {
    type Item = u8;

    /// Yields the current id and advances, wrapping from 0x3E back to 0x01.
    /// Never returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.from;
        self.from = match current {
            0x3E => 0x01,
            id => id + 1,
        };
        Some(current)
    }
}
289
/// Builds a set of streams, based on the capabilities of a set of MediaTaskBuilders that are
/// supported and configured by the system.
pub struct StreamsBuilder {
    /// Media task builders; each contributes one stream per codec configuration it reports.
    builders: Vec<Box<dyn MediaTaskBuilder>>,
    /// Source of stream endpoint ids; a fresh clone is used for every set of peer streams.
    seid_range: SeidRangeFrom,
    /// Inspect node on which the number of builders is recorded.
    node: inspect::Node,
}
297
298impl Default for StreamsBuilder {
299    fn default() -> Self {
300        Self {
301            builders: Default::default(),
302            seid_range: SeidRangeFrom { from: Self::START_SEID },
303            node: Default::default(),
304        }
305    }
306}
307
308impl Clone for StreamsBuilder {
309    fn clone(&self) -> Self {
310        Self {
311            builders: self.builders.clone(),
312            node: Default::default(),
313            seid_range: self.seid_range.clone(),
314        }
315    }
316}
317
318impl StreamsBuilder {
319    // Randomly chosen by fair dice roll
320    // TODO(https://fxbug.dev/337321738): Do better for randomizing this maybe
321    const START_SEID: u8 = 8;
322
323    /// Add a builder to the set of builders used to generate streams.
324    pub fn add_builder(&mut self, builder: impl MediaTaskBuilder + 'static) {
325        self.builders.push(Box::new(builder));
326        self.node.record_uint("builders", self.builders.len() as u64);
327    }
328
329    pub async fn peer_streams(
330        &self,
331        peer_id: &PeerId,
332        offload: Option<AudioOffloadExtProxy>,
333    ) -> Result<Streams, MediaTaskError> {
334        let mut streams = Streams::default();
335        let mut seid_range = self.seid_range.clone();
336        for builder in &self.builders {
337            let endpoint_type = builder.direction();
338            let supported_res = builder.supported_configs(peer_id, offload.clone()).await;
339            let Ok(supported) = supported_res else {
340                info!(e:? = supported_res.err().unwrap(); "Failed to get supported configs from builder, skipping");
341                continue;
342            };
343            let codec_caps = supported.iter().map(ServiceCapability::from);
344            for codec_cap in codec_caps {
345                let capabilities = match endpoint_type {
346                    avdtp::EndpointType::Source => vec![
347                        ServiceCapability::MediaTransport,
348                        ServiceCapability::DelayReporting,
349                        codec_cap,
350                    ],
351                    avdtp::EndpointType::Sink => {
352                        vec![ServiceCapability::MediaTransport, codec_cap]
353                    }
354                };
355                let endpoint = avdtp::StreamEndpoint::new(
356                    seid_range.next().unwrap(),
357                    avdtp::MediaType::Audio,
358                    endpoint_type,
359                    capabilities,
360                )?;
361                streams.insert(Stream::build(endpoint, builder.clone()));
362            }
363        }
364        Ok(streams)
365    }
366
367    pub async fn negotiation(
368        &self,
369        peer_id: &PeerId,
370        offload: Option<AudioOffloadExtProxy>,
371        preferred_direction: avdtp::EndpointType,
372    ) -> Result<CodecNegotiation, Error> {
373        let mut caps_available = Vec::new();
374        for builder in &self.builders {
375            caps_available.extend(
376                builder
377                    .supported_configs(peer_id, offload.clone())
378                    .await?
379                    .iter()
380                    .map(ServiceCapability::from),
381            );
382        }
383        Ok(CodecNegotiation::build(caps_available, preferred_direction)?)
384    }
385}
386
387impl Inspect for &mut StreamsBuilder {
388    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
389        self.node = parent.create_child(name.as_ref());
390        self.node.record_uint("builders", self.builders.len() as u64);
391        Ok(())
392    }
393}
394
395/// A set of streams, indexed by their local endpoint ID.
396#[derive(Default)]
397pub struct Streams {
398    streams: HashMap<StreamEndpointId, Stream>,
399    inspect_node: fuchsia_inspect::Node,
400}
401
402impl fmt::Debug for Streams {
403    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404        f.debug_struct("Streams").field("streams", &self.streams).finish()
405    }
406}
407
408impl Streams {
409    /// Makes a copy of this set of streams, but with all streams copied with their states set to
410    /// idle.
411    pub fn as_new(&self) -> Self {
412        let streams =
413            self.streams.iter().map(|(id, stream)| (id.clone(), stream.as_new())).collect();
414        Self { streams, ..Default::default() }
415    }
416
417    /// Returns true if there are no streams in the set.
418    pub fn is_empty(&self) -> bool {
419        self.streams.is_empty()
420    }
421
422    /// Inserts a stream, indexing it by the local endpoint id.
423    /// It replaces any other stream with the same endpoint id.
424    pub fn insert(&mut self, stream: Stream) {
425        let local_id = stream.endpoint().local_id().clone();
426        if self.streams.insert(local_id.clone(), stream).is_some() {
427            warn!("Replacing stream with local id {local_id}");
428        }
429    }
430
431    /// Retrieves a reference to the Stream referenced by `id`, if the stream exists,
432    pub fn get(&self, id: &StreamEndpointId) -> Option<&Stream> {
433        self.streams.get(id)
434    }
435
436    /// Retrieves a mutable reference to the Stream referenced by `id`, if the stream exists,
437    pub fn get_mut(&mut self, id: &StreamEndpointId) -> Option<&mut Stream> {
438        self.streams.get_mut(id)
439    }
440
441    /// Returns a vector of information on all the contained streams.
442    pub fn information(&self) -> Vec<avdtp::StreamInformation> {
443        self.streams.values().map(|x| x.endpoint().information()).collect()
444    }
445
446    /// Returns streams that are in the open (established but not streaming) state
447    pub fn open(&self) -> impl Iterator<Item = &Stream> {
448        self.streams.values().filter(|s| s.endpoint().state() == avdtp::StreamState::Open)
449    }
450
451    /// Returns streams that are streaming.
452    pub fn streaming(&self) -> impl Iterator<Item = &Stream> {
453        self.streams.values().filter(|s| s.endpoint().state() == avdtp::StreamState::Streaming)
454    }
455
456    /// Finds streams in the set which are compatible with `codec_config`.
457    pub fn compatible(&self, codec_config: MediaCodecConfig) -> impl Iterator<Item = &Stream> {
458        self.streams.values().filter(move |s| s.config_compatible(&codec_config))
459    }
460}
461
462impl Inspect for &mut Streams {
463    // Attach self to `parent`
464    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
465        self.inspect_node = parent.create_child(name.as_ref());
466        for stream in self.streams.values_mut() {
467            stream.iattach(&self.inspect_node, inspect::unique_name("stream_"))?;
468        }
469        Ok(())
470    }
471}
472
473#[cfg(test)]
474pub(crate) mod tests {
475    use super::*;
476
477    use fuchsia_async as fasync;
478    use fuchsia_bluetooth::types::Channel;
479    use std::pin::pin;
480    use std::task::Poll;
481
482    use crate::media_task::tests::TestMediaTaskBuilder;
483    use crate::media_types::*;
484
485    pub(crate) fn sbc_mediacodec_capability() -> avdtp::ServiceCapability {
486        let sbc_codec_info = SbcCodecInfo::new(
487            SbcSamplingFrequency::FREQ48000HZ,
488            SbcChannelMode::MONO | SbcChannelMode::JOINT_STEREO,
489            SbcBlockCount::MANDATORY_SRC,
490            SbcSubBands::MANDATORY_SRC,
491            SbcAllocation::MANDATORY_SRC,
492            SbcCodecInfo::BITPOOL_MIN,
493            SbcCodecInfo::BITPOOL_MAX,
494        )
495        .expect("SBC codec info");
496
497        ServiceCapability::MediaCodec {
498            media_type: avdtp::MediaType::Audio,
499            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
500            codec_extra: sbc_codec_info.to_bytes().to_vec(),
501        }
502    }
503
504    pub(crate) fn aac_mediacodec_capability(bitrate: u32) -> avdtp::ServiceCapability {
505        let codec_info = AacCodecInfo::new(
506            AacObjectType::MANDATORY_SRC,
507            AacSamplingFrequency::FREQ48000HZ,
508            AacChannels::TWO,
509            true,
510            bitrate,
511        )
512        .expect("should work");
513        ServiceCapability::MediaCodec {
514            media_type: avdtp::MediaType::Audio,
515            codec_type: avdtp::MediaCodecType::AUDIO_AAC,
516            codec_extra: codec_info.to_bytes().to_vec(),
517        }
518    }
519
520    pub(crate) fn make_sbc_endpoint(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
521        StreamEndpoint::new(
522            seid,
523            avdtp::MediaType::Audio,
524            direction,
525            vec![avdtp::ServiceCapability::MediaTransport, sbc_mediacodec_capability()],
526        )
527        .expect("endpoint creation should succeed")
528    }
529
530    const LOW_BITRATE: u32 = 320_000;
531    const HIGH_BITRATE: u32 = 393_216;
532
533    pub(crate) fn make_aac_endpoint(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
534        StreamEndpoint::new(
535            seid,
536            avdtp::MediaType::Audio,
537            direction,
538            vec![avdtp::ServiceCapability::MediaTransport, aac_mediacodec_capability(LOW_BITRATE)],
539        )
540        .expect("endpoint creation should succeed")
541    }
542
543    fn make_stream(seid: u8, codec_type: avdtp::MediaCodecType) -> Stream {
544        let endpoint = match codec_type {
545            avdtp::MediaCodecType::AUDIO_SBC => {
546                make_sbc_endpoint(seid, avdtp::EndpointType::Source)
547            }
548            avdtp::MediaCodecType::AUDIO_AAC => {
549                make_aac_endpoint(seid, avdtp::EndpointType::Source)
550            }
551            _ => panic!("Unsupported codec_type"),
552        };
553        Stream::build(endpoint, TestMediaTaskBuilder::new().builder())
554    }
555
556    #[fuchsia::test]
557    fn streams_basic_functionality() {
558        let mut streams = Streams::default();
559
560        streams.insert(make_stream(1, avdtp::MediaCodecType::AUDIO_SBC));
561        streams.insert(make_stream(6, avdtp::MediaCodecType::AUDIO_AAC));
562
563        let first_id = 1_u8.try_into().expect("good id");
564        let missing_id = 5_u8.try_into().expect("good id");
565
566        assert!(streams.get(&first_id).is_some());
567        assert!(streams.get(&missing_id).is_none());
568
569        assert!(streams.get_mut(&first_id).is_some());
570        assert!(streams.get_mut(&missing_id).is_none());
571
572        let expected_info = vec![
573            make_sbc_endpoint(1, avdtp::EndpointType::Source).information(),
574            make_aac_endpoint(6, avdtp::EndpointType::Source).information(),
575        ];
576
577        let infos = streams.information();
578
579        assert_eq!(expected_info.len(), infos.len());
580
581        if infos[0].id() == &first_id {
582            assert_eq!(expected_info[0], infos[0]);
583            assert_eq!(expected_info[1], infos[1]);
584        } else {
585            assert_eq!(expected_info[0], infos[1]);
586            assert_eq!(expected_info[1], infos[0]);
587        }
588    }
589
590    #[fuchsia::test]
591    fn streams_filters_compatible_codecs() {
592        let mut streams = Streams::default();
593        streams.insert(make_stream(1, avdtp::MediaCodecType::AUDIO_SBC));
594        streams.insert(make_stream(6, avdtp::MediaCodecType::AUDIO_AAC));
595
596        // Even if the other bitrate is higher, we can negotiate to the lower bitrate.
597        let config_high_bitrate_aac =
598            MediaCodecConfig::try_from(&aac_mediacodec_capability(HIGH_BITRATE)).unwrap();
599
600        let compatible: Vec<_> = streams.compatible(config_high_bitrate_aac).collect();
601        assert_eq!(compatible.len(), 1);
602        let codec_capability = compatible[0]
603            .endpoint()
604            .capabilities()
605            .into_iter()
606            .find(|x| x.category() == avdtp::ServiceCategory::MediaCodec)
607            .expect("should have a codec");
608        assert_eq!(
609            MediaCodecConfig::try_from(codec_capability).unwrap().codec_type(),
610            &avdtp::MediaCodecType::AUDIO_AAC
611        );
612    }
613
614    #[fuchsia::test]
615    fn rejects_unsupported_configurations() {
616        // Needed to make fasync::Tasks.
617        let _exec = fasync::TestExecutor::new();
618        let mut builder = TestMediaTaskBuilder::new_reconfigurable();
619        let mut stream =
620            Stream::build(make_sbc_endpoint(1, avdtp::EndpointType::Source), builder.builder());
621
622        // the default test stream only supports 48000hz
623        let unsupported_sbc_codec_info = SbcCodecInfo::new(
624            SbcSamplingFrequency::FREQ44100HZ,
625            SbcChannelMode::JOINT_STEREO,
626            SbcBlockCount::SIXTEEN,
627            SbcSubBands::EIGHT,
628            SbcAllocation::LOUDNESS,
629            53,
630            53,
631        )
632        .expect("SBC codec info");
633
634        let unsupported_caps = vec![ServiceCapability::MediaCodec {
635            media_type: avdtp::MediaType::Audio,
636            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
637            codec_extra: unsupported_sbc_codec_info.to_bytes().to_vec(),
638        }];
639
640        let peer_id = PeerId(1);
641        let stream_id = 1.try_into().expect("StreamEndpointId");
642        let res = stream.configure(&peer_id, &stream_id, unsupported_caps.clone());
643        assert!(res.is_err());
644        assert_eq!(
645            res.err(),
646            Some((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
647        );
648
649        assert_eq!(
650            stream.reconfigure(unsupported_caps.clone()),
651            Err((ServiceCategory::None, ErrorCode::BadState))
652        );
653
654        let supported_sbc_codec_info = SbcCodecInfo::new(
655            SbcSamplingFrequency::FREQ48000HZ,
656            SbcChannelMode::JOINT_STEREO,
657            SbcBlockCount::SIXTEEN,
658            SbcSubBands::EIGHT,
659            SbcAllocation::LOUDNESS,
660            53,
661            53,
662        )
663        .expect("SBC codec info");
664
665        let sbc_codec_cap = ServiceCapability::MediaCodec {
666            media_type: avdtp::MediaType::Audio,
667            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
668            codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
669        };
670
671        let supported_caps = vec![ServiceCapability::MediaTransport, sbc_codec_cap.clone()];
672
673        let res = stream.configure(&peer_id, &stream_id, supported_caps.clone());
674        assert!(res.is_ok());
675
676        // need to be in the open state for reconfigure
677        assert!(stream.endpoint_mut().establish().is_ok());
678        let (_remote, transport) = Channel::create();
679        match stream.endpoint_mut().receive_channel(transport) {
680            Ok(false) => {}
681            Ok(true) => panic!("Only should be expecting one channel"),
682            Err(e) => panic!("Expected channel to be accepted, got {:?}", e),
683        };
684
685        assert_eq!(
686            stream.reconfigure(unsupported_caps.clone()),
687            Err((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
688        );
689
690        let new_codec_caps = vec![ServiceCapability::MediaCodec {
691            media_type: avdtp::MediaType::Audio,
692            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
693            codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
694        }];
695
696        assert!(stream.reconfigure(new_codec_caps.clone()).is_ok());
697
698        // Should be able to start after reconfigure, and we used the right configuration.
699        let _ = stream.start().expect("stream should start ok");
700        let task = builder.expect_task();
701        assert_eq!(task.codec_config, MediaCodecConfig::try_from(&new_codec_caps[0]).unwrap());
702    }
703
704    #[fuchsia::test]
705    fn reconfigure_runner_fails() {
706        // Needed to make fasync::Tasks.
707        let _exec = fasync::TestExecutor::new();
708        let mut builder = TestMediaTaskBuilder::new();
709        let mut stream =
710            Stream::build(make_sbc_endpoint(1, avdtp::EndpointType::Source), builder.builder());
711
712        let supported_sbc_codec_info = SbcCodecInfo::new(
713            SbcSamplingFrequency::FREQ48000HZ,
714            SbcChannelMode::JOINT_STEREO,
715            SbcBlockCount::SIXTEEN,
716            SbcSubBands::EIGHT,
717            SbcAllocation::LOUDNESS,
718            53,
719            53,
720        )
721        .expect("SBC codec info");
722
723        let orig_codec_cap = ServiceCapability::MediaCodec {
724            media_type: avdtp::MediaType::Audio,
725            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
726            codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
727        };
728
729        let supported_caps = vec![ServiceCapability::MediaTransport, orig_codec_cap.clone()];
730
731        let res = stream.configure(&PeerId(1), &(1.try_into().unwrap()), supported_caps.clone());
732        assert!(res.is_ok());
733
734        // need to be in the open state for reconfigure
735        assert!(stream.endpoint_mut().establish().is_ok());
736        let (_remote, transport) = Channel::create();
737        match stream.endpoint_mut().receive_channel(transport) {
738            Ok(false) => {}
739            Ok(true) => panic!("Only should be expecting one channel"),
740            Err(e) => panic!("Expected channel to be accepted, got {:?}", e),
741        };
742
743        // Should be able to start after configure, and we used the right configuration.
744        let _ = stream.start().expect("stream should start ok");
745        let task = builder.expect_task();
746        assert_eq!(task.codec_config, MediaCodecConfig::try_from(&orig_codec_cap).unwrap());
747        stream.suspend().expect("stream should suspend ok");
748
749        // Try to reconfigure with a supported configuration, but the builder doesn't reconfigure.
750        let mono_sbc_codec_info = SbcCodecInfo::new(
751            SbcSamplingFrequency::FREQ48000HZ,
752            SbcChannelMode::MONO,
753            SbcBlockCount::SIXTEEN,
754            SbcSubBands::EIGHT,
755            SbcAllocation::LOUDNESS,
756            53,
757            53,
758        )
759        .expect("SBC codec info");
760
761        let new_codec_caps = vec![ServiceCapability::MediaCodec {
762            media_type: avdtp::MediaType::Audio,
763            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
764            codec_extra: mono_sbc_codec_info.to_bytes().to_vec(),
765        }];
766
767        // Media Builder fails to reconfigure (as it's failing all reconfigures)
768        assert_eq!(
769            stream.reconfigure(new_codec_caps.clone()),
770            Err((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
771        );
772
773        // Should be able to start after reconfigure, but it will use the old configuration.
774        let _ = stream.start().expect("stream should start ok");
775        let task = builder.expect_task();
776        assert_eq!(task.codec_config, MediaCodecConfig::try_from(&orig_codec_cap).unwrap());
777        stream.suspend().expect("stream should suspend ok")
778    }
779
780    #[fuchsia::test]
781    fn suspend_stops_media_task() {
782        let mut exec = fasync::TestExecutor::new();
783
784        let mut task_builder = TestMediaTaskBuilder::new();
785        let mut stream = Stream::build(
786            make_sbc_endpoint(1, avdtp::EndpointType::Source),
787            task_builder.builder(),
788        );
789        let next_task_fut = task_builder.next_task();
790        let remote_id = 1_u8.try_into().expect("good id");
791
792        let sbc_codec_cap = sbc_mediacodec_capability();
793        let expected_codec_config =
794            MediaCodecConfig::try_from(&sbc_codec_cap).expect("codec config");
795
796        assert!(stream.configure(&PeerId(1), &remote_id, vec![]).is_err());
797        assert!(stream.configure(&PeerId(1), &remote_id, vec![sbc_codec_cap]).is_ok());
798
799        stream.endpoint_mut().establish().expect("establishment should start okay");
800        let (_remote, transport) = Channel::create();
801        let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");
802
803        assert!(stream.start().is_ok());
804
805        // Task should be created here.
806        let task = {
807            let mut next_task_fut = pin!(next_task_fut);
808            match exec.run_until_stalled(&mut next_task_fut) {
809                Poll::Ready(Some(task)) => task,
810                x => panic!("Expected next task to be sent after start, got {:?}", x),
811            }
812        };
813
814        assert_eq!(task.peer_id, PeerId(1));
815        assert_eq!(task.codec_config, expected_codec_config);
816
817        assert!(task.is_started());
818        assert!(stream.suspend().is_ok());
819        assert!(!task.is_started());
820        assert!(stream.start().is_ok());
821
822        let next_task_fut = task_builder.next_task();
823        // Task should be created here.
824        let task = {
825            let mut next_task_fut = pin!(next_task_fut);
826            match exec.run_until_stalled(&mut next_task_fut) {
827                Poll::Ready(Some(task)) => task,
828                x => panic!("Expected next task to be sent after start, got {:?}", x),
829            }
830        };
831
832        assert!(task.is_started());
833    }
834
835    #[fuchsia::test]
836    fn media_task_ending_ends_future() {
837        let mut exec = fasync::TestExecutor::new();
838
839        let mut task_builder = TestMediaTaskBuilder::new();
840        let mut stream = Stream::build(
841            make_sbc_endpoint(1, avdtp::EndpointType::Source),
842            task_builder.builder(),
843        );
844        let next_task_fut = task_builder.next_task();
845        let peer_id = PeerId(1);
846        let remote_id = 1_u8.try_into().expect("good id");
847
848        let sbc_codec_cap = sbc_mediacodec_capability();
849        let expected_codec_config =
850            MediaCodecConfig::try_from(&sbc_codec_cap).expect("codec config");
851
852        assert!(stream.configure(&peer_id, &remote_id, vec![]).is_err());
853        assert!(stream.configure(&peer_id, &remote_id, vec![sbc_codec_cap]).is_ok());
854
855        stream.endpoint_mut().establish().expect("establishment should start okay");
856        let (_remote, transport) = Channel::create();
857        let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");
858
859        let stream_finish_fut = stream.start().expect("start to succeed with a future");
860        let mut stream_finish_fut = pin!(stream_finish_fut);
861
862        let task = {
863            let mut next_task_fut = pin!(next_task_fut);
864            match exec.run_until_stalled(&mut next_task_fut) {
865                Poll::Ready(Some(task)) => task,
866                x => panic!("Expected next task to be sent after start, got {:?}", x),
867            }
868        };
869
870        assert_eq!(task.peer_id, PeerId(1));
871        assert_eq!(task.codec_config, expected_codec_config);
872
873        // Does not need to be polled to be started.
874        assert!(task.is_started());
875
876        assert!(exec.run_until_stalled(&mut stream_finish_fut).is_pending());
877
878        task.end_prematurely(Some(Ok(())));
879        assert!(!task.is_started());
880
881        // The future should be finished, since the task ended.
882        match exec.run_until_stalled(&mut stream_finish_fut) {
883            Poll::Ready(Ok(())) => {}
884            x => panic!("Expected to get ready Ok from finish future, but got {x:?}"),
885        };
886
887        // Should still be able to suspend the stream.
888        assert!(stream.suspend().is_ok());
889
890        // And be able to restart it.
891        let result_fut = stream.start().expect("start to succeed with a future");
892
893        let next_task_fut = task_builder.next_task();
894        let mut next_task_fut = pin!(next_task_fut);
895        let task = match exec.run_until_stalled(&mut next_task_fut) {
896            Poll::Ready(Some(task)) => task,
897            x => panic!("Expected next task to be sent after restart, got {x:?}"),
898        };
899
900        assert!(task.is_started());
901
902        // Dropping the result future shouldn't stop the media task.
903        drop(result_fut);
904
905        assert!(task.is_started());
906    }
907
908    #[fuchsia::test]
909    fn set_delay_correct_results_transmits_to_task() {
910        let mut _exec = fasync::TestExecutor::new();
911
912        let mut task_builder = TestMediaTaskBuilder::new_delayable();
913        let mut stream = Stream::build(
914            make_sbc_endpoint(1, avdtp::EndpointType::Source),
915            task_builder.builder(),
916        );
917        let peer_id = PeerId(1);
918        let remote_id = 1_u8.try_into().expect("good id");
919
920        let sbc_codec_cap = sbc_mediacodec_capability();
921
922        let code = stream
923            .set_delay(std::time::Duration::ZERO)
924            .expect_err("before configure, can't set a delay");
925        assert_eq!(ErrorCode::BadState, code);
926
927        assert!(stream.configure(&peer_id, &remote_id, vec![]).is_err());
928        assert!(stream.configure(&peer_id, &remote_id, vec![sbc_codec_cap]).is_ok());
929
930        let delay_set = std::time::Duration::from_nanos(0xfeed);
931
932        stream.set_delay(delay_set.clone()).expect("after configure, delay is fine");
933
934        stream.endpoint_mut().establish().expect("establishment should start okay");
935        let (_remote, transport) = Channel::create();
936        let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");
937        let _stream_finish_fut = stream.start().expect("start to succeed with a future");
938
939        let media_task = task_builder.expect_task();
940        assert_eq!(delay_set, media_task.delay);
941    }
942}