1use anyhow::Error;
6use bt_avdtp::{
7 self as avdtp, ErrorCode, ServiceCapability, ServiceCategory, StreamEndpoint, StreamEndpointId,
8};
9use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
10use fuchsia_bluetooth::types::PeerId;
11use fuchsia_inspect::{self as inspect, Property};
12use fuchsia_inspect_derive::{AttachError, Inspect};
13use futures::future::BoxFuture;
14use futures::{FutureExt, TryFutureExt};
15use log::{info, warn};
16use std::collections::HashMap;
17use std::fmt;
18use std::sync::Arc;
19use std::time::Duration;
20
21use crate::codec::{CodecNegotiation, MediaCodecConfig};
22use crate::media_task::{MediaTask, MediaTaskBuilder, MediaTaskError, MediaTaskRunner};
23
/// A local stream: an AVDTP endpoint paired with the media task machinery that is built when
/// the endpoint is configured and run while it is started.
pub struct Stream {
    /// The AVDTP endpoint backing this stream.
    endpoint: StreamEndpoint,
    /// Builder used by `configure` to create a runner. Held in an `Arc` so copies produced by
    /// `as_new` share the same builder.
    media_task_builder: Arc<Box<dyn MediaTaskBuilder>>,
    /// Runner created by `configure`; reset to `None` when the media task is stopped on
    /// release/abort.
    media_task_runner: Option<Box<dyn MediaTaskRunner>>,
    /// The task returned by the runner in `start`; taken (and stopped) again on suspend or stop.
    media_task: Option<Box<dyn MediaTask>>,
    /// Peer passed to `configure`; cleared together with the runner.
    peer_id: Option<PeerId>,
    /// Inspect node of this stream; the media task runner is attached below it.
    inspect: fuchsia_inspect::Node,
}
41
42impl fmt::Debug for Stream {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 f.debug_struct("Stream")
45 .field("endpoint", &self.endpoint)
46 .field("peer_id", &self.peer_id)
47 .field("has media_task", &self.media_task.is_some())
48 .finish()
49 }
50}
51
52impl Inspect for &mut Stream {
53 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
56 self.inspect = parent.create_child(name.as_ref());
57
58 let endpoint_state_prop = self.inspect.create_string("endpoint_state", "");
59 let callback =
60 move |stream: &StreamEndpoint| endpoint_state_prop.set(&format!("{:?}", stream));
61 callback(self.endpoint());
62 self.endpoint_mut().set_update_callback(Some(Box::new(callback)));
63 Ok(())
64 }
65}
66
67impl Stream {
68 pub fn build(endpoint: StreamEndpoint, media_task_builder: Box<dyn MediaTaskBuilder>) -> Self {
69 Self {
70 endpoint,
71 media_task_builder: Arc::new(media_task_builder),
72 media_task_runner: None,
73 media_task: None,
74 peer_id: None,
75 inspect: Default::default(),
76 }
77 }
78
79 fn as_new(&self) -> Self {
80 Self {
81 endpoint: self.endpoint.as_new(),
82 media_task_builder: self.media_task_builder.clone(),
83 media_task_runner: None,
84 media_task: None,
85 peer_id: None,
86 inspect: Default::default(),
87 }
88 }
89
90 pub fn endpoint(&self) -> &StreamEndpoint {
91 &self.endpoint
92 }
93
94 pub fn endpoint_mut(&mut self) -> &mut StreamEndpoint {
95 &mut self.endpoint
96 }
97
98 fn media_codec_config(&self) -> Option<MediaCodecConfig> {
99 find_codec_capability(self.endpoint.capabilities())
100 .and_then(|x| MediaCodecConfig::try_from(x).ok())
101 }
102
103 fn config_supported(&self, config: &MediaCodecConfig) -> bool {
106 let Some(supported) = self.media_codec_config() else {
107 return false;
108 };
109 supported.supports(&config)
110 }
111
112 fn config_compatible(&self, config: &MediaCodecConfig) -> bool {
115 let Some(supported) = self.media_codec_config() else {
116 return false;
117 };
118 MediaCodecConfig::negotiate(&supported, config).is_some()
119 }
120
121 fn build_media_task(
122 &self,
123 peer_id: &PeerId,
124 config: &MediaCodecConfig,
125 ) -> Option<Box<dyn MediaTaskRunner>> {
126 match self.media_task_builder.configure(peer_id, &config) {
127 Err(e) => {
128 warn!("Failed to build media task: {e:?}");
129 None
130 }
131 Ok(mut media_task_runner) => {
132 if let Err(e) = media_task_runner.iattach(&self.inspect, "media_task") {
133 info!("Media Task inspect: {e}");
134 }
135 Some(media_task_runner)
136 }
137 }
138 }
139
140 fn supported_config_from_capability(
141 &self,
142 requested_cap: &ServiceCapability,
143 ) -> Option<MediaCodecConfig> {
144 MediaCodecConfig::try_from(requested_cap).ok().filter(|c| self.config_supported(c))
145 }
146
147 pub fn configure(
148 &mut self,
149 peer_id: &PeerId,
150 remote_id: &StreamEndpointId,
151 capabilities: Vec<ServiceCapability>,
152 ) -> Result<(), (ServiceCategory, ErrorCode)> {
153 if self.media_task.is_some() {
154 return Err((ServiceCategory::None, ErrorCode::BadState));
155 }
156 let unsupported = ErrorCode::UnsupportedConfiguration;
157 let codec_cap =
158 find_codec_capability(&capabilities).ok_or((ServiceCategory::None, unsupported))?;
159 let media_unsupported = (ServiceCategory::MediaCodec, unsupported);
160 let config = self.supported_config_from_capability(codec_cap).ok_or(media_unsupported)?;
161 self.media_task_runner =
162 Some(self.build_media_task(peer_id, &config).ok_or(media_unsupported)?);
163 self.peer_id = Some(peer_id.clone());
164 self.endpoint.configure(remote_id, capabilities)
165 }
166
167 pub fn set_delay(&mut self, delay: Duration) -> Result<(), ErrorCode> {
168 let Some(runner) = self.media_task_runner.as_mut() else {
169 return Err(ErrorCode::BadState);
170 };
171 match runner.set_delay(delay) {
172 Err(MediaTaskError::NotSupported) => Err(ErrorCode::NotSupportedCommand),
173 Err(_) => Err(ErrorCode::BadState),
174 Ok(()) => Ok(()),
175 }
176 }
177
178 pub fn reconfigure(
179 &mut self,
180 capabilities: Vec<ServiceCapability>,
181 ) -> Result<(), (ServiceCategory, ErrorCode)> {
182 let bad_state = (ServiceCategory::None, ErrorCode::BadState);
183 let _peer_id = self.peer_id.as_ref().ok_or(bad_state)?;
184 if let Some(requested_codec_cap) = find_codec_capability(&capabilities) {
185 let unsupported = (ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration);
186 let requested =
187 self.supported_config_from_capability(requested_codec_cap).ok_or(unsupported)?;
188 self.media_task_runner
189 .as_mut()
190 .ok_or(bad_state)?
191 .reconfigure(&requested)
192 .or(Err(unsupported))?;
193 }
194 self.endpoint.reconfigure(capabilities)
195 }
196
197 fn media_runner_ref(&mut self) -> Result<&mut Box<dyn MediaTaskRunner>, ErrorCode> {
198 self.media_task_runner.as_mut().ok_or(ErrorCode::BadState)
199 }
200
201 pub fn start(&mut self) -> Result<BoxFuture<'static, Result<(), Error>>, ErrorCode> {
205 if self.media_task_runner.is_none() {
206 return Err(ErrorCode::BadState);
207 };
208 let transport = self.endpoint.take_transport().ok_or(ErrorCode::BadState)?;
209 let _ = self.endpoint.start()?;
210 let offload = self.endpoint.audio_offload();
211 let mut task = match self.media_runner_ref()?.start(transport, offload) {
212 Ok(media_task) => media_task,
213 Err(_e) => {
214 let _ = self.endpoint.suspend()?;
215 return Err(ErrorCode::BadState);
216 }
217 };
218 let finished = task.finished();
219 self.media_task = Some(task);
220 Ok(finished.err_into().boxed())
221 }
222
223 pub fn suspend(&mut self) -> Result<(), ErrorCode> {
225 self.endpoint.suspend()?;
226 let _ = self.media_task.take().ok_or(ErrorCode::BadState)?.stop();
227 Ok(())
228 }
229
230 fn stop_media_task(&mut self) {
231 if let Some(mut task) = self.media_task.take() {
232 let _ = task.stop();
234 }
235 self.media_task_runner = None;
236 self.peer_id = None;
237 }
238
239 pub fn release(
241 &mut self,
242 responder: avdtp::SimpleResponder,
243 peer: &avdtp::Peer,
244 ) -> avdtp::Result<()> {
245 self.stop_media_task();
246 self.endpoint.release(responder, peer)
247 }
248
249 pub fn abort(&mut self) {
250 self.stop_media_task();
251 self.endpoint.abort()
252 }
253
254 pub async fn initiate_abort(&mut self, peer: &avdtp::Peer) {
255 self.stop_media_task();
256 self.endpoint.initiate_abort(peer).await
257 }
258}
259
260fn find_codec_capability(capabilities: &[ServiceCapability]) -> Option<&ServiceCapability> {
261 capabilities.iter().find(|cap| cap.category() == ServiceCategory::MediaCodec)
262}
263
/// Endless iterator over stream endpoint ids that cycles through `0x01..=0x3E`, starting at
/// `from`.
#[derive(Clone, Debug)]
struct SeidRangeFrom {
    /// The id the next call to `next` yields.
    from: u8,
}

impl SeidRangeFrom {
    /// Smallest id produced, and the value the iterator wraps around to.
    const FIRST: u8 = 0x01;
    /// Largest id produced.
    const LAST: u8 = 0x3E;
}

impl Default for SeidRangeFrom {
    /// Starts at the first id of the range.
    fn default() -> Self {
        Self { from: Self::FIRST }
    }
}

impl Iterator for SeidRangeFrom {
    type Item = u8;

    /// Yields the current id and advances, wrapping from `LAST` back to `FIRST`. Never returns
    /// `None`.
    fn next(&mut self) -> Option<Self::Item> {
        // A cursor outside the range (for example a hand-built `from: 0`) is normalized instead
        // of being yielded as-is, which also rules out overflowing the `u8` on increment.
        let current = if (Self::FIRST..=Self::LAST).contains(&self.from) {
            self.from
        } else {
            Self::FIRST
        };
        self.from = if current == Self::LAST { Self::FIRST } else { current + 1 };
        Some(current)
    }
}
289
/// Collects media task builders and turns them into a `Streams` set (or a `CodecNegotiation`)
/// for a peer.
pub struct StreamsBuilder {
    /// The registered media task builders, in the order they were added.
    builders: Vec<Box<dyn MediaTaskBuilder>>,
    /// Source of endpoint ids; cloned for every `peer_streams` call so each call starts at the
    /// same id.
    seid_range: SeidRangeFrom,
    /// Inspect node on which the number of builders is recorded.
    node: inspect::Node,
}
297
298impl Default for StreamsBuilder {
299 fn default() -> Self {
300 Self {
301 builders: Default::default(),
302 seid_range: SeidRangeFrom { from: Self::START_SEID },
303 node: Default::default(),
304 }
305 }
306}
307
308impl Clone for StreamsBuilder {
309 fn clone(&self) -> Self {
310 Self {
311 builders: self.builders.clone(),
312 node: Default::default(),
313 seid_range: self.seid_range.clone(),
314 }
315 }
316}
317
318impl StreamsBuilder {
319 const START_SEID: u8 = 8;
322
323 pub fn add_builder(&mut self, builder: impl MediaTaskBuilder + 'static) {
325 self.builders.push(Box::new(builder));
326 self.node.record_uint("builders", self.builders.len() as u64);
327 }
328
329 pub async fn peer_streams(
330 &self,
331 peer_id: &PeerId,
332 offload: Option<AudioOffloadExtProxy>,
333 ) -> Result<Streams, MediaTaskError> {
334 let mut streams = Streams::default();
335 let mut seid_range = self.seid_range.clone();
336 for builder in &self.builders {
337 let endpoint_type = builder.direction();
338 let supported_res = builder.supported_configs(peer_id, offload.clone()).await;
339 let Ok(supported) = supported_res else {
340 info!(e:? = supported_res.err().unwrap(); "Failed to get supported configs from builder, skipping");
341 continue;
342 };
343 let codec_caps = supported.iter().map(ServiceCapability::from);
344 for codec_cap in codec_caps {
345 let capabilities = match endpoint_type {
346 avdtp::EndpointType::Source => vec![
347 ServiceCapability::MediaTransport,
348 ServiceCapability::DelayReporting,
349 codec_cap,
350 ],
351 avdtp::EndpointType::Sink => {
352 vec![ServiceCapability::MediaTransport, codec_cap]
353 }
354 };
355 let endpoint = avdtp::StreamEndpoint::new(
356 seid_range.next().unwrap(),
357 avdtp::MediaType::Audio,
358 endpoint_type,
359 capabilities,
360 )?;
361 streams.insert(Stream::build(endpoint, builder.clone()));
362 }
363 }
364 Ok(streams)
365 }
366
367 pub async fn negotiation(
368 &self,
369 peer_id: &PeerId,
370 offload: Option<AudioOffloadExtProxy>,
371 preferred_direction: avdtp::EndpointType,
372 ) -> Result<CodecNegotiation, Error> {
373 let mut caps_available = Vec::new();
374 for builder in &self.builders {
375 caps_available.extend(
376 builder
377 .supported_configs(peer_id, offload.clone())
378 .await?
379 .iter()
380 .map(ServiceCapability::from),
381 );
382 }
383 Ok(CodecNegotiation::build(caps_available, preferred_direction)?)
384 }
385}
386
387impl Inspect for &mut StreamsBuilder {
388 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
389 self.node = parent.create_child(name.as_ref());
390 self.node.record_uint("builders", self.builders.len() as u64);
391 Ok(())
392 }
393}
394
/// A set of streams, keyed by the local id of each stream's endpoint.
#[derive(Default)]
pub struct Streams {
    /// The streams, indexed by the local endpoint id they were inserted with.
    streams: HashMap<StreamEndpointId, Stream>,
    /// Parent inspect node under which each stream is attached.
    inspect_node: fuchsia_inspect::Node,
}
401
402impl fmt::Debug for Streams {
403 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
404 f.debug_struct("Streams").field("streams", &self.streams).finish()
405 }
406}
407
408impl Streams {
409 pub fn as_new(&self) -> Self {
412 let streams =
413 self.streams.iter().map(|(id, stream)| (id.clone(), stream.as_new())).collect();
414 Self { streams, ..Default::default() }
415 }
416
417 pub fn is_empty(&self) -> bool {
419 self.streams.is_empty()
420 }
421
422 pub fn insert(&mut self, stream: Stream) {
425 let local_id = stream.endpoint().local_id().clone();
426 if self.streams.insert(local_id.clone(), stream).is_some() {
427 warn!("Replacing stream with local id {local_id}");
428 }
429 }
430
431 pub fn get(&self, id: &StreamEndpointId) -> Option<&Stream> {
433 self.streams.get(id)
434 }
435
436 pub fn get_mut(&mut self, id: &StreamEndpointId) -> Option<&mut Stream> {
438 self.streams.get_mut(id)
439 }
440
441 pub fn information(&self) -> Vec<avdtp::StreamInformation> {
443 self.streams.values().map(|x| x.endpoint().information()).collect()
444 }
445
446 pub fn open(&self) -> impl Iterator<Item = &Stream> {
448 self.streams.values().filter(|s| s.endpoint().state() == avdtp::StreamState::Open)
449 }
450
451 pub fn streaming(&self) -> impl Iterator<Item = &Stream> {
453 self.streams.values().filter(|s| s.endpoint().state() == avdtp::StreamState::Streaming)
454 }
455
456 pub fn compatible(&self, codec_config: MediaCodecConfig) -> impl Iterator<Item = &Stream> {
458 self.streams.values().filter(move |s| s.config_compatible(&codec_config))
459 }
460}
461
462impl Inspect for &mut Streams {
463 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
465 self.inspect_node = parent.create_child(name.as_ref());
466 for stream in self.streams.values_mut() {
467 stream.iattach(&self.inspect_node, inspect::unique_name("stream_"))?;
468 }
469 Ok(())
470 }
471}
472
473#[cfg(test)]
474pub(crate) mod tests {
475 use super::*;
476
477 use fuchsia_async as fasync;
478 use fuchsia_bluetooth::types::Channel;
479 use std::pin::pin;
480 use std::task::Poll;
481
482 use crate::media_task::tests::TestMediaTaskBuilder;
483 use crate::media_types::*;
484
485 pub(crate) fn sbc_mediacodec_capability() -> avdtp::ServiceCapability {
486 let sbc_codec_info = SbcCodecInfo::new(
487 SbcSamplingFrequency::FREQ48000HZ,
488 SbcChannelMode::MONO | SbcChannelMode::JOINT_STEREO,
489 SbcBlockCount::MANDATORY_SRC,
490 SbcSubBands::MANDATORY_SRC,
491 SbcAllocation::MANDATORY_SRC,
492 SbcCodecInfo::BITPOOL_MIN,
493 SbcCodecInfo::BITPOOL_MAX,
494 )
495 .expect("SBC codec info");
496
497 ServiceCapability::MediaCodec {
498 media_type: avdtp::MediaType::Audio,
499 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
500 codec_extra: sbc_codec_info.to_bytes().to_vec(),
501 }
502 }
503
504 pub(crate) fn aac_mediacodec_capability(bitrate: u32) -> avdtp::ServiceCapability {
505 let codec_info = AacCodecInfo::new(
506 AacObjectType::MANDATORY_SRC,
507 AacSamplingFrequency::FREQ48000HZ,
508 AacChannels::TWO,
509 true,
510 bitrate,
511 )
512 .expect("should work");
513 ServiceCapability::MediaCodec {
514 media_type: avdtp::MediaType::Audio,
515 codec_type: avdtp::MediaCodecType::AUDIO_AAC,
516 codec_extra: codec_info.to_bytes().to_vec(),
517 }
518 }
519
520 pub(crate) fn make_sbc_endpoint(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
521 StreamEndpoint::new(
522 seid,
523 avdtp::MediaType::Audio,
524 direction,
525 vec![avdtp::ServiceCapability::MediaTransport, sbc_mediacodec_capability()],
526 )
527 .expect("endpoint creation should succeed")
528 }
529
530 const LOW_BITRATE: u32 = 320_000;
531 const HIGH_BITRATE: u32 = 393_216;
532
533 pub(crate) fn make_aac_endpoint(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
534 StreamEndpoint::new(
535 seid,
536 avdtp::MediaType::Audio,
537 direction,
538 vec![avdtp::ServiceCapability::MediaTransport, aac_mediacodec_capability(LOW_BITRATE)],
539 )
540 .expect("endpoint creation should succeed")
541 }
542
543 fn make_stream(seid: u8, codec_type: avdtp::MediaCodecType) -> Stream {
544 let endpoint = match codec_type {
545 avdtp::MediaCodecType::AUDIO_SBC => {
546 make_sbc_endpoint(seid, avdtp::EndpointType::Source)
547 }
548 avdtp::MediaCodecType::AUDIO_AAC => {
549 make_aac_endpoint(seid, avdtp::EndpointType::Source)
550 }
551 _ => panic!("Unsupported codec_type"),
552 };
553 Stream::build(endpoint, TestMediaTaskBuilder::new().builder())
554 }
555
556 #[fuchsia::test]
557 fn streams_basic_functionality() {
558 let mut streams = Streams::default();
559
560 streams.insert(make_stream(1, avdtp::MediaCodecType::AUDIO_SBC));
561 streams.insert(make_stream(6, avdtp::MediaCodecType::AUDIO_AAC));
562
563 let first_id = 1_u8.try_into().expect("good id");
564 let missing_id = 5_u8.try_into().expect("good id");
565
566 assert!(streams.get(&first_id).is_some());
567 assert!(streams.get(&missing_id).is_none());
568
569 assert!(streams.get_mut(&first_id).is_some());
570 assert!(streams.get_mut(&missing_id).is_none());
571
572 let expected_info = vec![
573 make_sbc_endpoint(1, avdtp::EndpointType::Source).information(),
574 make_aac_endpoint(6, avdtp::EndpointType::Source).information(),
575 ];
576
577 let infos = streams.information();
578
579 assert_eq!(expected_info.len(), infos.len());
580
581 if infos[0].id() == &first_id {
582 assert_eq!(expected_info[0], infos[0]);
583 assert_eq!(expected_info[1], infos[1]);
584 } else {
585 assert_eq!(expected_info[0], infos[1]);
586 assert_eq!(expected_info[1], infos[0]);
587 }
588 }
589
590 #[fuchsia::test]
591 fn streams_filters_compatible_codecs() {
592 let mut streams = Streams::default();
593 streams.insert(make_stream(1, avdtp::MediaCodecType::AUDIO_SBC));
594 streams.insert(make_stream(6, avdtp::MediaCodecType::AUDIO_AAC));
595
596 let config_high_bitrate_aac =
598 MediaCodecConfig::try_from(&aac_mediacodec_capability(HIGH_BITRATE)).unwrap();
599
600 let compatible: Vec<_> = streams.compatible(config_high_bitrate_aac).collect();
601 assert_eq!(compatible.len(), 1);
602 let codec_capability = compatible[0]
603 .endpoint()
604 .capabilities()
605 .into_iter()
606 .find(|x| x.category() == avdtp::ServiceCategory::MediaCodec)
607 .expect("should have a codec");
608 assert_eq!(
609 MediaCodecConfig::try_from(codec_capability).unwrap().codec_type(),
610 &avdtp::MediaCodecType::AUDIO_AAC
611 );
612 }
613
    /// `configure` and `reconfigure` reject codec settings the endpoint does not support, and a
    /// supported reconfiguration is the one the started media task ends up with.
    #[fuchsia::test]
    fn rejects_unsupported_configurations() {
        let _exec = fasync::TestExecutor::new();
        // Presumably yields runners that accept `reconfigure` (going by the constructor name).
        let mut builder = TestMediaTaskBuilder::new_reconfigurable();
        let mut stream =
            Stream::build(make_sbc_endpoint(1, avdtp::EndpointType::Source), builder.builder());

        // 44.1kHz: the endpoint built by `make_sbc_endpoint` only offers 48kHz.
        let unsupported_sbc_codec_info = SbcCodecInfo::new(
            SbcSamplingFrequency::FREQ44100HZ,
            SbcChannelMode::JOINT_STEREO,
            SbcBlockCount::SIXTEEN,
            SbcSubBands::EIGHT,
            SbcAllocation::LOUDNESS,
            53,
            53,
        )
        .expect("SBC codec info");

        let unsupported_caps = vec![ServiceCapability::MediaCodec {
            media_type: avdtp::MediaType::Audio,
            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
            codec_extra: unsupported_sbc_codec_info.to_bytes().to_vec(),
        }];

        let peer_id = PeerId(1);
        let stream_id = 1.try_into().expect("StreamEndpointId");
        let res = stream.configure(&peer_id, &stream_id, unsupported_caps.clone());
        assert!(res.is_err());
        assert_eq!(
            res.err(),
            Some((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
        );

        // The stream was never configured, so reconfiguring reports a bad state.
        assert_eq!(
            stream.reconfigure(unsupported_caps.clone()),
            Err((ServiceCategory::None, ErrorCode::BadState))
        );

        // Same settings at 48kHz, which the endpoint offers.
        let supported_sbc_codec_info = SbcCodecInfo::new(
            SbcSamplingFrequency::FREQ48000HZ,
            SbcChannelMode::JOINT_STEREO,
            SbcBlockCount::SIXTEEN,
            SbcSubBands::EIGHT,
            SbcAllocation::LOUDNESS,
            53,
            53,
        )
        .expect("SBC codec info");

        let sbc_codec_cap = ServiceCapability::MediaCodec {
            media_type: avdtp::MediaType::Audio,
            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
            codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
        };

        let supported_caps = vec![ServiceCapability::MediaTransport, sbc_codec_cap.clone()];

        let res = stream.configure(&peer_id, &stream_id, supported_caps.clone());
        assert!(res.is_ok());

        // Establish the endpoint and hand it its transport channel.
        assert!(stream.endpoint_mut().establish().is_ok());
        let (_remote, transport) = Channel::create();
        match stream.endpoint_mut().receive_channel(transport) {
            Ok(false) => {}
            Ok(true) => panic!("Only should be expecting one channel"),
            Err(e) => panic!("Expected channel to be accepted, got {:?}", e),
        };

        // Now that the stream is configured, the unsupported codec is rejected as such.
        assert_eq!(
            stream.reconfigure(unsupported_caps.clone()),
            Err((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
        );

        let new_codec_caps = vec![ServiceCapability::MediaCodec {
            media_type: avdtp::MediaType::Audio,
            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
            codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
        }];

        assert!(stream.reconfigure(new_codec_caps.clone()).is_ok());

        let _ = stream.start().expect("stream should start ok");
        // The started task carries the reconfigured codec settings.
        let task = builder.expect_task();
        assert_eq!(task.codec_config, MediaCodecConfig::try_from(&new_codec_caps[0]).unwrap());
    }
703
    /// When reconfiguration is refused, the stream keeps working with its original codec
    /// configuration.
    #[fuchsia::test]
    fn reconfigure_runner_fails() {
        let _exec = fasync::TestExecutor::new();
        // NOTE(review): the runner from `new()` is expected to refuse `reconfigure` (see the
        // test name) — confirm against `TestMediaTaskBuilder`.
        let mut builder = TestMediaTaskBuilder::new();
        let mut stream =
            Stream::build(make_sbc_endpoint(1, avdtp::EndpointType::Source), builder.builder());

        let supported_sbc_codec_info = SbcCodecInfo::new(
            SbcSamplingFrequency::FREQ48000HZ,
            SbcChannelMode::JOINT_STEREO,
            SbcBlockCount::SIXTEEN,
            SbcSubBands::EIGHT,
            SbcAllocation::LOUDNESS,
            53,
            53,
        )
        .expect("SBC codec info");

        let orig_codec_cap = ServiceCapability::MediaCodec {
            media_type: avdtp::MediaType::Audio,
            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
            codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
        };

        let supported_caps = vec![ServiceCapability::MediaTransport, orig_codec_cap.clone()];

        let res = stream.configure(&PeerId(1), &(1.try_into().unwrap()), supported_caps.clone());
        assert!(res.is_ok());

        // Establish the endpoint and hand it its transport channel.
        assert!(stream.endpoint_mut().establish().is_ok());
        let (_remote, transport) = Channel::create();
        match stream.endpoint_mut().receive_channel(transport) {
            Ok(false) => {}
            Ok(true) => panic!("Only should be expecting one channel"),
            Err(e) => panic!("Expected channel to be accepted, got {:?}", e),
        };

        let _ = stream.start().expect("stream should start ok");
        // The first task runs with the originally configured codec.
        let task = builder.expect_task();
        assert_eq!(task.codec_config, MediaCodecConfig::try_from(&orig_codec_cap).unwrap());
        stream.suspend().expect("stream should suspend ok");

        // Mono is offered by the endpoint, so the codec itself passes the support check.
        let mono_sbc_codec_info = SbcCodecInfo::new(
            SbcSamplingFrequency::FREQ48000HZ,
            SbcChannelMode::MONO,
            SbcBlockCount::SIXTEEN,
            SbcSubBands::EIGHT,
            SbcAllocation::LOUDNESS,
            53,
            53,
        )
        .expect("SBC codec info");

        let new_codec_caps = vec![ServiceCapability::MediaCodec {
            media_type: avdtp::MediaType::Audio,
            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
            codec_extra: mono_sbc_codec_info.to_bytes().to_vec(),
        }];

        // The reconfiguration is reported as an unsupported media codec configuration.
        assert_eq!(
            stream.reconfigure(new_codec_caps.clone()),
            Err((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
        );

        let _ = stream.start().expect("stream should start ok");
        // After the failed reconfigure, a restart still uses the original codec configuration.
        let task = builder.expect_task();
        assert_eq!(task.codec_config, MediaCodecConfig::try_from(&orig_codec_cap).unwrap());
        stream.suspend().expect("stream should suspend ok")
    }
779
    /// Suspending a started stream stops its media task, and starting again produces a new
    /// started task.
    #[fuchsia::test]
    fn suspend_stops_media_task() {
        let mut exec = fasync::TestExecutor::new();

        let mut task_builder = TestMediaTaskBuilder::new();
        let mut stream = Stream::build(
            make_sbc_endpoint(1, avdtp::EndpointType::Source),
            task_builder.builder(),
        );
        let next_task_fut = task_builder.next_task();
        let remote_id = 1_u8.try_into().expect("good id");

        let sbc_codec_cap = sbc_mediacodec_capability();
        let expected_codec_config =
            MediaCodecConfig::try_from(&sbc_codec_cap).expect("codec config");

        // Without a media codec capability the configuration is rejected.
        assert!(stream.configure(&PeerId(1), &remote_id, vec![]).is_err());
        assert!(stream.configure(&PeerId(1), &remote_id, vec![sbc_codec_cap]).is_ok());

        stream.endpoint_mut().establish().expect("establishment should start okay");
        let (_remote, transport) = Channel::create();
        let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");

        assert!(stream.start().is_ok());

        let task = {
            // The task must already be available once `start` has returned.
            let mut next_task_fut = pin!(next_task_fut);
            match exec.run_until_stalled(&mut next_task_fut) {
                Poll::Ready(Some(task)) => task,
                x => panic!("Expected next task to be sent after start, got {:?}", x),
            }
        };

        assert_eq!(task.peer_id, PeerId(1));
        assert_eq!(task.codec_config, expected_codec_config);

        assert!(task.is_started());
        assert!(stream.suspend().is_ok());
        assert!(!task.is_started());
        assert!(stream.start().is_ok());

        let next_task_fut = task_builder.next_task();
        let task = {
            // Restarting hands out another task through the builder.
            let mut next_task_fut = pin!(next_task_fut);
            match exec.run_until_stalled(&mut next_task_fut) {
                Poll::Ready(Some(task)) => task,
                x => panic!("Expected next task to be sent after start, got {:?}", x),
            }
        };

        assert!(task.is_started());
    }
834
    /// The future returned by `start` completes when the media task ends on its own, and
    /// dropping that future does not stop a running task.
    #[fuchsia::test]
    fn media_task_ending_ends_future() {
        let mut exec = fasync::TestExecutor::new();

        let mut task_builder = TestMediaTaskBuilder::new();
        let mut stream = Stream::build(
            make_sbc_endpoint(1, avdtp::EndpointType::Source),
            task_builder.builder(),
        );
        let next_task_fut = task_builder.next_task();
        let peer_id = PeerId(1);
        let remote_id = 1_u8.try_into().expect("good id");

        let sbc_codec_cap = sbc_mediacodec_capability();
        let expected_codec_config =
            MediaCodecConfig::try_from(&sbc_codec_cap).expect("codec config");

        // Without a media codec capability the configuration is rejected.
        assert!(stream.configure(&peer_id, &remote_id, vec![]).is_err());
        assert!(stream.configure(&peer_id, &remote_id, vec![sbc_codec_cap]).is_ok());

        stream.endpoint_mut().establish().expect("establishment should start okay");
        let (_remote, transport) = Channel::create();
        let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");

        let stream_finish_fut = stream.start().expect("start to succeed with a future");
        let mut stream_finish_fut = pin!(stream_finish_fut);

        let task = {
            let mut next_task_fut = pin!(next_task_fut);
            match exec.run_until_stalled(&mut next_task_fut) {
                Poll::Ready(Some(task)) => task,
                x => panic!("Expected next task to be sent after start, got {:?}", x),
            }
        };

        assert_eq!(task.peer_id, PeerId(1));
        assert_eq!(task.codec_config, expected_codec_config);

        assert!(task.is_started());

        // While the task is running, the finish future stays pending.
        assert!(exec.run_until_stalled(&mut stream_finish_fut).is_pending());

        task.end_prematurely(Some(Ok(())));
        assert!(!task.is_started());

        match exec.run_until_stalled(&mut stream_finish_fut) {
            // The `Ok(())` given to `end_prematurely` is expected to come out of the future.
            Poll::Ready(Ok(())) => {}
            x => panic!("Expected to get ready Ok from finish future, but got {x:?}"),
        };

        // Suspending still succeeds after the task has ended by itself.
        assert!(stream.suspend().is_ok());

        let result_fut = stream.start().expect("start to succeed with a future");

        let next_task_fut = task_builder.next_task();
        let mut next_task_fut = pin!(next_task_fut);
        let task = match exec.run_until_stalled(&mut next_task_fut) {
            Poll::Ready(Some(task)) => task,
            x => panic!("Expected next task to be sent after restart, got {x:?}"),
        };

        assert!(task.is_started());

        drop(result_fut);

        // Dropping the finish future must leave the task running.
        assert!(task.is_started());
    }
907
    /// `set_delay` fails before the stream is configured, succeeds afterwards, and the delay
    /// that was set is visible on the started media task.
    #[fuchsia::test]
    fn set_delay_correct_results_transmits_to_task() {
        let mut _exec = fasync::TestExecutor::new();

        // Presumably yields runners that accept `set_delay` (going by the constructor name).
        let mut task_builder = TestMediaTaskBuilder::new_delayable();
        let mut stream = Stream::build(
            make_sbc_endpoint(1, avdtp::EndpointType::Source),
            task_builder.builder(),
        );
        let peer_id = PeerId(1);
        let remote_id = 1_u8.try_into().expect("good id");

        let sbc_codec_cap = sbc_mediacodec_capability();

        // No runner exists before `configure`, so the call reports a bad state.
        let code = stream
            .set_delay(std::time::Duration::ZERO)
            .expect_err("before configure, can't set a delay");
        assert_eq!(ErrorCode::BadState, code);

        // Without a media codec capability the configuration is rejected.
        assert!(stream.configure(&peer_id, &remote_id, vec![]).is_err());
        assert!(stream.configure(&peer_id, &remote_id, vec![sbc_codec_cap]).is_ok());

        let delay_set = std::time::Duration::from_nanos(0xfeed);

        stream.set_delay(delay_set.clone()).expect("after configure, delay is fine");

        stream.endpoint_mut().establish().expect("establishment should start okay");
        let (_remote, transport) = Channel::create();
        let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");
        let _stream_finish_fut = stream.start().expect("start to succeed with a future");

        // The delay set before starting is the one the started task reports.
        let media_task = task_builder.expect_task();
        assert_eq!(delay_set, media_task.delay);
    }
942}