1use anyhow::Error;
6use bt_avdtp::{
7 self as avdtp, ErrorCode, ServiceCapability, ServiceCategory, StreamEndpoint, StreamEndpointId,
8};
9use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
10use fuchsia_bluetooth::types::PeerId;
11use fuchsia_inspect::{self as inspect, Property};
12use fuchsia_inspect_derive::{AttachError, Inspect};
13use futures::future::BoxFuture;
14use futures::{FutureExt, TryFutureExt};
15use log::{info, warn};
16use std::collections::HashMap;
17use std::fmt;
18use std::sync::Arc;
19use std::time::Duration;
20
21use crate::codec::{CodecNegotiation, MediaCodecConfig};
22use crate::media_task::{MediaTask, MediaTaskBuilder, MediaTaskError, MediaTaskRunner};
23
24pub struct Stream {
28 endpoint: StreamEndpoint,
29 media_task_builder: Arc<Box<dyn MediaTaskBuilder>>,
31 media_task_runner: Option<Box<dyn MediaTaskRunner>>,
33 media_task: Option<Box<dyn MediaTask>>,
35 peer_id: Option<PeerId>,
38 inspect: fuchsia_inspect::Node,
40}
41
42impl fmt::Debug for Stream {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 f.debug_struct("Stream")
45 .field("endpoint", &self.endpoint)
46 .field("peer_id", &self.peer_id)
47 .field("has media_task", &self.media_task.is_some())
48 .finish()
49 }
50}
51
52impl Inspect for &mut Stream {
53 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
56 self.inspect = parent.create_child(name.as_ref());
57
58 let endpoint_state_prop = self.inspect.create_string("endpoint_state", "");
59 let callback =
60 move |stream: &StreamEndpoint| endpoint_state_prop.set(&format!("{:?}", stream));
61 callback(self.endpoint());
62 self.endpoint_mut().set_update_callback(Some(Box::new(callback)));
63 Ok(())
64 }
65}
66
67impl Stream {
68 pub fn build(endpoint: StreamEndpoint, media_task_builder: Box<dyn MediaTaskBuilder>) -> Self {
69 Self {
70 endpoint,
71 media_task_builder: Arc::new(media_task_builder),
72 media_task_runner: None,
73 media_task: None,
74 peer_id: None,
75 inspect: Default::default(),
76 }
77 }
78
79 fn as_new(&self) -> Self {
80 Self {
81 endpoint: self.endpoint.as_new(),
82 media_task_builder: self.media_task_builder.clone(),
83 media_task_runner: None,
84 media_task: None,
85 peer_id: None,
86 inspect: Default::default(),
87 }
88 }
89
90 pub fn endpoint(&self) -> &StreamEndpoint {
91 &self.endpoint
92 }
93
94 pub fn endpoint_mut(&mut self) -> &mut StreamEndpoint {
95 &mut self.endpoint
96 }
97
98 fn media_codec_config(&self) -> Option<MediaCodecConfig> {
99 find_codec_capability(self.endpoint.capabilities())
100 .and_then(|x| MediaCodecConfig::try_from(x).ok())
101 }
102
103 fn config_supported(&self, config: &MediaCodecConfig) -> bool {
106 let Some(supported) = self.media_codec_config() else {
107 return false;
108 };
109 supported.supports(&config)
110 }
111
112 fn config_compatible(&self, config: &MediaCodecConfig) -> bool {
115 let Some(supported) = self.media_codec_config() else {
116 return false;
117 };
118 MediaCodecConfig::negotiate(&supported, config).is_some()
119 }
120
121 fn build_media_task(
122 &self,
123 peer_id: &PeerId,
124 config: &MediaCodecConfig,
125 ) -> Option<Box<dyn MediaTaskRunner>> {
126 match self.media_task_builder.configure(peer_id, &config) {
127 Err(e) => {
128 warn!("Failed to build media task: {e:?}");
129 None
130 }
131 Ok(mut media_task_runner) => {
132 if let Err(e) = media_task_runner.iattach(&self.inspect, "media_task") {
133 info!("Media Task inspect: {e}");
134 }
135 Some(media_task_runner)
136 }
137 }
138 }
139
140 fn supported_config_from_capability(
141 &self,
142 requested_cap: &ServiceCapability,
143 ) -> Option<MediaCodecConfig> {
144 MediaCodecConfig::try_from(requested_cap).ok().filter(|c| self.config_supported(c))
145 }
146
147 pub fn configure(
148 &mut self,
149 peer_id: &PeerId,
150 remote_id: &StreamEndpointId,
151 capabilities: Vec<ServiceCapability>,
152 ) -> Result<(), (ServiceCategory, ErrorCode)> {
153 if self.media_task.is_some() {
154 return Err((ServiceCategory::None, ErrorCode::BadState));
155 }
156 let unsupported = ErrorCode::UnsupportedConfiguration;
157 let codec_cap =
158 find_codec_capability(&capabilities).ok_or((ServiceCategory::None, unsupported))?;
159 let media_unsupported = (ServiceCategory::MediaCodec, unsupported);
160 let config = self.supported_config_from_capability(codec_cap).ok_or(media_unsupported)?;
161 self.media_task_runner =
162 Some(self.build_media_task(peer_id, &config).ok_or(media_unsupported)?);
163 self.peer_id = Some(peer_id.clone());
164 self.endpoint.configure(remote_id, capabilities)
165 }
166
167 pub fn set_delay(&mut self, delay: Duration) -> Result<(), ErrorCode> {
168 let Some(runner) = self.media_task_runner.as_mut() else {
169 return Err(ErrorCode::BadState);
170 };
171 match runner.set_delay(delay) {
172 Err(MediaTaskError::NotSupported) => Err(ErrorCode::NotSupportedCommand),
173 Err(_) => Err(ErrorCode::BadState),
174 Ok(()) => Ok(()),
175 }
176 }
177
178 pub fn reconfigure(
179 &mut self,
180 capabilities: Vec<ServiceCapability>,
181 ) -> Result<(), (ServiceCategory, ErrorCode)> {
182 let bad_state = (ServiceCategory::None, ErrorCode::BadState);
183 let _peer_id = self.peer_id.as_ref().ok_or(bad_state)?;
184 if let Some(requested_codec_cap) = find_codec_capability(&capabilities) {
185 let unsupported = (ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration);
186 let requested =
187 self.supported_config_from_capability(requested_codec_cap).ok_or(unsupported)?;
188 self.media_task_runner
189 .as_mut()
190 .ok_or(bad_state)?
191 .reconfigure(&requested)
192 .or(Err(unsupported))?;
193 }
194 self.endpoint.reconfigure(capabilities)
195 }
196
197 fn media_runner_ref(&mut self) -> Result<&mut Box<dyn MediaTaskRunner>, ErrorCode> {
198 self.media_task_runner.as_mut().ok_or(ErrorCode::BadState)
199 }
200
201 pub fn start(&mut self) -> Result<BoxFuture<'static, Result<(), Error>>, ErrorCode> {
205 if self.media_task_runner.is_none() {
206 return Err(ErrorCode::BadState);
207 };
208 let transport = self.endpoint.take_transport().ok_or(ErrorCode::BadState)?;
209 let _ = self.endpoint.start()?;
210 let mut task = match self.media_runner_ref()?.start(transport, None) {
211 Ok(media_task) => media_task,
212 Err(_e) => {
213 let _ = self.endpoint.suspend()?;
214 return Err(ErrorCode::BadState);
215 }
216 };
217 let finished = task.finished();
218 self.media_task = Some(task);
219 Ok(finished.err_into().boxed())
220 }
221
222 pub fn suspend(&mut self) -> Result<(), ErrorCode> {
224 self.endpoint.suspend()?;
225 let _ = self.media_task.take().ok_or(ErrorCode::BadState)?.stop();
226 Ok(())
227 }
228
229 fn stop_media_task(&mut self) {
230 if let Some(mut task) = self.media_task.take() {
231 let _ = task.stop();
233 }
234 self.media_task_runner = None;
235 self.peer_id = None;
236 }
237
238 pub fn release(
240 &mut self,
241 responder: avdtp::SimpleResponder,
242 peer: &avdtp::Peer,
243 ) -> avdtp::Result<()> {
244 self.stop_media_task();
245 self.endpoint.release(responder, peer)
246 }
247
248 pub fn abort(&mut self) {
249 self.stop_media_task();
250 self.endpoint.abort()
251 }
252
253 pub async fn initiate_abort(&mut self, peer: &avdtp::Peer) {
254 self.stop_media_task();
255 self.endpoint.initiate_abort(peer).await
256 }
257}
258
259fn find_codec_capability(capabilities: &[ServiceCapability]) -> Option<&ServiceCapability> {
260 capabilities.iter().find(|cap| cap.category() == ServiceCategory::MediaCodec)
261}
262
/// Endless iterator over stream endpoint ids: counts up from `from` and wraps
/// from `0x3E` back to `0x01`.
#[derive(Clone, Debug)]
struct SeidRangeFrom {
    /// The id that the next call to `next` will yield.
    from: u8,
}

impl SeidRangeFrom {
    /// Id the range wraps back to.
    const FIRST: u8 = 0x01;
    /// Last id yielded before wrapping.
    const LAST: u8 = 0x3E;
}

impl Default for SeidRangeFrom {
    fn default() -> Self {
        Self { from: Self::FIRST }
    }
}

impl Iterator for SeidRangeFrom {
    type Item = u8;

    /// Yields the current id and advances; never returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.from;
        self.from = match current {
            Self::LAST => Self::FIRST,
            other => other + 1,
        };
        Some(current)
    }
}
288
289pub struct StreamsBuilder {
292 builders: Vec<Box<dyn MediaTaskBuilder>>,
293 seid_range: SeidRangeFrom,
294 node: inspect::Node,
295}
296
297impl Default for StreamsBuilder {
298 fn default() -> Self {
299 Self {
300 builders: Default::default(),
301 seid_range: SeidRangeFrom { from: Self::START_SEID },
302 node: Default::default(),
303 }
304 }
305}
306
307impl Clone for StreamsBuilder {
308 fn clone(&self) -> Self {
309 Self {
310 builders: self.builders.clone(),
311 node: Default::default(),
312 seid_range: self.seid_range.clone(),
313 }
314 }
315}
316
317impl StreamsBuilder {
318 const START_SEID: u8 = 8;
321
322 pub fn add_builder(&mut self, builder: impl MediaTaskBuilder + 'static) {
324 self.builders.push(Box::new(builder));
325 self.node.record_uint("builders", self.builders.len() as u64);
326 }
327
328 pub async fn peer_streams(
329 &self,
330 peer_id: &PeerId,
331 offload: Option<AudioOffloadExtProxy>,
332 ) -> Result<Streams, MediaTaskError> {
333 let mut streams = Streams::default();
334 let mut seid_range = self.seid_range.clone();
335 for builder in &self.builders {
336 let endpoint_type = builder.direction();
337 let supported_res = builder.supported_configs(peer_id, offload.clone()).await;
338 let Ok(supported) = supported_res else {
339 info!(e:? = supported_res.err().unwrap(); "Failed to get supported configs from builder, skipping");
340 continue;
341 };
342 let codec_caps = supported.iter().map(ServiceCapability::from);
343 for codec_cap in codec_caps {
344 let capabilities = match endpoint_type {
345 avdtp::EndpointType::Source => vec![
346 ServiceCapability::MediaTransport,
347 ServiceCapability::DelayReporting,
348 codec_cap,
349 ],
350 avdtp::EndpointType::Sink => {
351 vec![ServiceCapability::MediaTransport, codec_cap]
352 }
353 };
354 let endpoint = avdtp::StreamEndpoint::new(
355 seid_range.next().unwrap(),
356 avdtp::MediaType::Audio,
357 endpoint_type,
358 capabilities,
359 )?;
360 streams.insert(Stream::build(endpoint, builder.clone()));
361 }
362 }
363 Ok(streams)
364 }
365
366 pub async fn negotiation(
367 &self,
368 peer_id: &PeerId,
369 offload: Option<AudioOffloadExtProxy>,
370 preferred_direction: avdtp::EndpointType,
371 ) -> Result<CodecNegotiation, Error> {
372 let mut caps_available = Vec::new();
373 for builder in &self.builders {
374 caps_available.extend(
375 builder
376 .supported_configs(peer_id, offload.clone())
377 .await?
378 .iter()
379 .map(ServiceCapability::from),
380 );
381 }
382 Ok(CodecNegotiation::build(caps_available, preferred_direction)?)
383 }
384}
385
386impl Inspect for &mut StreamsBuilder {
387 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
388 self.node = parent.create_child(name.as_ref());
389 self.node.record_uint("builders", self.builders.len() as u64);
390 Ok(())
391 }
392}
393
394#[derive(Default)]
396pub struct Streams {
397 streams: HashMap<StreamEndpointId, Stream>,
398 inspect_node: fuchsia_inspect::Node,
399}
400
401impl fmt::Debug for Streams {
402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403 f.debug_struct("Streams").field("streams", &self.streams).finish()
404 }
405}
406
407impl Streams {
408 pub fn as_new(&self) -> Self {
411 let streams =
412 self.streams.iter().map(|(id, stream)| (id.clone(), stream.as_new())).collect();
413 Self { streams, ..Default::default() }
414 }
415
416 pub fn is_empty(&self) -> bool {
418 self.streams.is_empty()
419 }
420
421 pub fn insert(&mut self, stream: Stream) {
424 let local_id = stream.endpoint().local_id().clone();
425 if self.streams.insert(local_id.clone(), stream).is_some() {
426 warn!("Replacing stream with local id {local_id}");
427 }
428 }
429
430 pub fn get(&self, id: &StreamEndpointId) -> Option<&Stream> {
432 self.streams.get(id)
433 }
434
435 pub fn get_mut(&mut self, id: &StreamEndpointId) -> Option<&mut Stream> {
437 self.streams.get_mut(id)
438 }
439
440 pub fn information(&self) -> Vec<avdtp::StreamInformation> {
442 self.streams.values().map(|x| x.endpoint().information()).collect()
443 }
444
445 pub fn open(&self) -> impl Iterator<Item = &Stream> {
447 self.streams.values().filter(|s| s.endpoint().state() == avdtp::StreamState::Open)
448 }
449
450 pub fn streaming(&self) -> impl Iterator<Item = &Stream> {
452 self.streams.values().filter(|s| s.endpoint().state() == avdtp::StreamState::Streaming)
453 }
454
455 pub fn compatible(&self, codec_config: MediaCodecConfig) -> impl Iterator<Item = &Stream> {
457 self.streams.values().filter(move |s| s.config_compatible(&codec_config))
458 }
459}
460
461impl Inspect for &mut Streams {
462 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
464 self.inspect_node = parent.create_child(name.as_ref());
465 for stream in self.streams.values_mut() {
466 stream.iattach(&self.inspect_node, inspect::unique_name("stream_"))?;
467 }
468 Ok(())
469 }
470}
471
472#[cfg(test)]
473pub(crate) mod tests {
474 use super::*;
475
476 use fuchsia_async as fasync;
477 use fuchsia_bluetooth::types::Channel;
478 use std::pin::pin;
479 use std::task::Poll;
480
481 use crate::media_task::tests::TestMediaTaskBuilder;
482 use crate::media_types::*;
483
484 pub(crate) fn sbc_mediacodec_capability() -> avdtp::ServiceCapability {
485 let sbc_codec_info = SbcCodecInfo::new(
486 SbcSamplingFrequency::FREQ48000HZ,
487 SbcChannelMode::MONO | SbcChannelMode::JOINT_STEREO,
488 SbcBlockCount::MANDATORY_SRC,
489 SbcSubBands::MANDATORY_SRC,
490 SbcAllocation::MANDATORY_SRC,
491 SbcCodecInfo::BITPOOL_MIN,
492 SbcCodecInfo::BITPOOL_MAX,
493 )
494 .expect("SBC codec info");
495
496 ServiceCapability::MediaCodec {
497 media_type: avdtp::MediaType::Audio,
498 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
499 codec_extra: sbc_codec_info.to_bytes().to_vec(),
500 }
501 }
502
503 pub(crate) fn aac_mediacodec_capability(bitrate: u32) -> avdtp::ServiceCapability {
504 let codec_info = AacCodecInfo::new(
505 AacObjectType::MANDATORY_SRC,
506 AacSamplingFrequency::FREQ48000HZ,
507 AacChannels::TWO,
508 true,
509 bitrate,
510 )
511 .expect("should work");
512 ServiceCapability::MediaCodec {
513 media_type: avdtp::MediaType::Audio,
514 codec_type: avdtp::MediaCodecType::AUDIO_AAC,
515 codec_extra: codec_info.to_bytes().to_vec(),
516 }
517 }
518
519 pub(crate) fn make_sbc_endpoint(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
520 StreamEndpoint::new(
521 seid,
522 avdtp::MediaType::Audio,
523 direction,
524 vec![avdtp::ServiceCapability::MediaTransport, sbc_mediacodec_capability()],
525 )
526 .expect("endpoint creation should succeed")
527 }
528
529 const LOW_BITRATE: u32 = 320_000;
530 const HIGH_BITRATE: u32 = 393_216;
531
532 pub(crate) fn make_aac_endpoint(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
533 StreamEndpoint::new(
534 seid,
535 avdtp::MediaType::Audio,
536 direction,
537 vec![avdtp::ServiceCapability::MediaTransport, aac_mediacodec_capability(LOW_BITRATE)],
538 )
539 .expect("endpoint creation should succeed")
540 }
541
542 fn make_stream(seid: u8, codec_type: avdtp::MediaCodecType) -> Stream {
543 let endpoint = match codec_type {
544 avdtp::MediaCodecType::AUDIO_SBC => {
545 make_sbc_endpoint(seid, avdtp::EndpointType::Source)
546 }
547 avdtp::MediaCodecType::AUDIO_AAC => {
548 make_aac_endpoint(seid, avdtp::EndpointType::Source)
549 }
550 _ => panic!("Unsupported codec_type"),
551 };
552 Stream::build(endpoint, TestMediaTaskBuilder::new().builder())
553 }
554
555 #[fuchsia::test]
556 fn streams_basic_functionality() {
557 let mut streams = Streams::default();
558
559 streams.insert(make_stream(1, avdtp::MediaCodecType::AUDIO_SBC));
560 streams.insert(make_stream(6, avdtp::MediaCodecType::AUDIO_AAC));
561
562 let first_id = 1_u8.try_into().expect("good id");
563 let missing_id = 5_u8.try_into().expect("good id");
564
565 assert!(streams.get(&first_id).is_some());
566 assert!(streams.get(&missing_id).is_none());
567
568 assert!(streams.get_mut(&first_id).is_some());
569 assert!(streams.get_mut(&missing_id).is_none());
570
571 let expected_info = vec![
572 make_sbc_endpoint(1, avdtp::EndpointType::Source).information(),
573 make_aac_endpoint(6, avdtp::EndpointType::Source).information(),
574 ];
575
576 let infos = streams.information();
577
578 assert_eq!(expected_info.len(), infos.len());
579
580 if infos[0].id() == &first_id {
581 assert_eq!(expected_info[0], infos[0]);
582 assert_eq!(expected_info[1], infos[1]);
583 } else {
584 assert_eq!(expected_info[0], infos[1]);
585 assert_eq!(expected_info[1], infos[0]);
586 }
587 }
588
589 #[fuchsia::test]
590 fn streams_filters_compatible_codecs() {
591 let mut streams = Streams::default();
592 streams.insert(make_stream(1, avdtp::MediaCodecType::AUDIO_SBC));
593 streams.insert(make_stream(6, avdtp::MediaCodecType::AUDIO_AAC));
594
595 let config_high_bitrate_aac =
597 MediaCodecConfig::try_from(&aac_mediacodec_capability(HIGH_BITRATE)).unwrap();
598
599 let compatible: Vec<_> = streams.compatible(config_high_bitrate_aac).collect();
600 assert_eq!(compatible.len(), 1);
601 let codec_capability = compatible[0]
602 .endpoint()
603 .capabilities()
604 .into_iter()
605 .find(|x| x.category() == avdtp::ServiceCategory::MediaCodec)
606 .expect("should have a codec");
607 assert_eq!(
608 MediaCodecConfig::try_from(codec_capability).unwrap().codec_type(),
609 &avdtp::MediaCodecType::AUDIO_AAC
610 );
611 }
612
613 #[fuchsia::test]
614 fn rejects_unsupported_configurations() {
615 let _exec = fasync::TestExecutor::new();
617 let mut builder = TestMediaTaskBuilder::new_reconfigurable();
618 let mut stream =
619 Stream::build(make_sbc_endpoint(1, avdtp::EndpointType::Source), builder.builder());
620
621 let unsupported_sbc_codec_info = SbcCodecInfo::new(
623 SbcSamplingFrequency::FREQ44100HZ,
624 SbcChannelMode::JOINT_STEREO,
625 SbcBlockCount::SIXTEEN,
626 SbcSubBands::EIGHT,
627 SbcAllocation::LOUDNESS,
628 53,
629 53,
630 )
631 .expect("SBC codec info");
632
633 let unsupported_caps = vec![ServiceCapability::MediaCodec {
634 media_type: avdtp::MediaType::Audio,
635 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
636 codec_extra: unsupported_sbc_codec_info.to_bytes().to_vec(),
637 }];
638
639 let peer_id = PeerId(1);
640 let stream_id = 1.try_into().expect("StreamEndpointId");
641 let res = stream.configure(&peer_id, &stream_id, unsupported_caps.clone());
642 assert!(res.is_err());
643 assert_eq!(
644 res.err(),
645 Some((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
646 );
647
648 assert_eq!(
649 stream.reconfigure(unsupported_caps.clone()),
650 Err((ServiceCategory::None, ErrorCode::BadState))
651 );
652
653 let supported_sbc_codec_info = SbcCodecInfo::new(
654 SbcSamplingFrequency::FREQ48000HZ,
655 SbcChannelMode::JOINT_STEREO,
656 SbcBlockCount::SIXTEEN,
657 SbcSubBands::EIGHT,
658 SbcAllocation::LOUDNESS,
659 53,
660 53,
661 )
662 .expect("SBC codec info");
663
664 let sbc_codec_cap = ServiceCapability::MediaCodec {
665 media_type: avdtp::MediaType::Audio,
666 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
667 codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
668 };
669
670 let supported_caps = vec![ServiceCapability::MediaTransport, sbc_codec_cap.clone()];
671
672 let res = stream.configure(&peer_id, &stream_id, supported_caps.clone());
673 assert!(res.is_ok());
674
675 assert!(stream.endpoint_mut().establish().is_ok());
677 let (_remote, transport) = Channel::create();
678 match stream.endpoint_mut().receive_channel(transport) {
679 Ok(false) => {}
680 Ok(true) => panic!("Only should be expecting one channel"),
681 Err(e) => panic!("Expected channel to be accepted, got {:?}", e),
682 };
683
684 assert_eq!(
685 stream.reconfigure(unsupported_caps.clone()),
686 Err((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
687 );
688
689 let new_codec_caps = vec![ServiceCapability::MediaCodec {
690 media_type: avdtp::MediaType::Audio,
691 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
692 codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
693 }];
694
695 assert!(stream.reconfigure(new_codec_caps.clone()).is_ok());
696
697 let _ = stream.start().expect("stream should start ok");
699 let task = builder.expect_task();
700 assert_eq!(task.codec_config, MediaCodecConfig::try_from(&new_codec_caps[0]).unwrap());
701 }
702
703 #[fuchsia::test]
704 fn reconfigure_runner_fails() {
705 let _exec = fasync::TestExecutor::new();
707 let mut builder = TestMediaTaskBuilder::new();
708 let mut stream =
709 Stream::build(make_sbc_endpoint(1, avdtp::EndpointType::Source), builder.builder());
710
711 let supported_sbc_codec_info = SbcCodecInfo::new(
712 SbcSamplingFrequency::FREQ48000HZ,
713 SbcChannelMode::JOINT_STEREO,
714 SbcBlockCount::SIXTEEN,
715 SbcSubBands::EIGHT,
716 SbcAllocation::LOUDNESS,
717 53,
718 53,
719 )
720 .expect("SBC codec info");
721
722 let orig_codec_cap = ServiceCapability::MediaCodec {
723 media_type: avdtp::MediaType::Audio,
724 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
725 codec_extra: supported_sbc_codec_info.to_bytes().to_vec(),
726 };
727
728 let supported_caps = vec![ServiceCapability::MediaTransport, orig_codec_cap.clone()];
729
730 let res = stream.configure(&PeerId(1), &(1.try_into().unwrap()), supported_caps.clone());
731 assert!(res.is_ok());
732
733 assert!(stream.endpoint_mut().establish().is_ok());
735 let (_remote, transport) = Channel::create();
736 match stream.endpoint_mut().receive_channel(transport) {
737 Ok(false) => {}
738 Ok(true) => panic!("Only should be expecting one channel"),
739 Err(e) => panic!("Expected channel to be accepted, got {:?}", e),
740 };
741
742 let _ = stream.start().expect("stream should start ok");
744 let task = builder.expect_task();
745 assert_eq!(task.codec_config, MediaCodecConfig::try_from(&orig_codec_cap).unwrap());
746 stream.suspend().expect("stream should suspend ok");
747
748 let mono_sbc_codec_info = SbcCodecInfo::new(
750 SbcSamplingFrequency::FREQ48000HZ,
751 SbcChannelMode::MONO,
752 SbcBlockCount::SIXTEEN,
753 SbcSubBands::EIGHT,
754 SbcAllocation::LOUDNESS,
755 53,
756 53,
757 )
758 .expect("SBC codec info");
759
760 let new_codec_caps = vec![ServiceCapability::MediaCodec {
761 media_type: avdtp::MediaType::Audio,
762 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
763 codec_extra: mono_sbc_codec_info.to_bytes().to_vec(),
764 }];
765
766 assert_eq!(
768 stream.reconfigure(new_codec_caps.clone()),
769 Err((ServiceCategory::MediaCodec, ErrorCode::UnsupportedConfiguration))
770 );
771
772 let _ = stream.start().expect("stream should start ok");
774 let task = builder.expect_task();
775 assert_eq!(task.codec_config, MediaCodecConfig::try_from(&orig_codec_cap).unwrap());
776 stream.suspend().expect("stream should suspend ok")
777 }
778
779 #[fuchsia::test]
780 fn suspend_stops_media_task() {
781 let mut exec = fasync::TestExecutor::new();
782
783 let mut task_builder = TestMediaTaskBuilder::new();
784 let mut stream = Stream::build(
785 make_sbc_endpoint(1, avdtp::EndpointType::Source),
786 task_builder.builder(),
787 );
788 let next_task_fut = task_builder.next_task();
789 let remote_id = 1_u8.try_into().expect("good id");
790
791 let sbc_codec_cap = sbc_mediacodec_capability();
792 let expected_codec_config =
793 MediaCodecConfig::try_from(&sbc_codec_cap).expect("codec config");
794
795 assert!(stream.configure(&PeerId(1), &remote_id, vec![]).is_err());
796 assert!(stream.configure(&PeerId(1), &remote_id, vec![sbc_codec_cap]).is_ok());
797
798 stream.endpoint_mut().establish().expect("establishment should start okay");
799 let (_remote, transport) = Channel::create();
800 let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");
801
802 assert!(stream.start().is_ok());
803
804 let task = {
806 let mut next_task_fut = pin!(next_task_fut);
807 match exec.run_until_stalled(&mut next_task_fut) {
808 Poll::Ready(Some(task)) => task,
809 x => panic!("Expected next task to be sent after start, got {:?}", x),
810 }
811 };
812
813 assert_eq!(task.peer_id, PeerId(1));
814 assert_eq!(task.codec_config, expected_codec_config);
815
816 assert!(task.is_started());
817 assert!(stream.suspend().is_ok());
818 assert!(!task.is_started());
819 assert!(stream.start().is_ok());
820
821 let next_task_fut = task_builder.next_task();
822 let task = {
824 let mut next_task_fut = pin!(next_task_fut);
825 match exec.run_until_stalled(&mut next_task_fut) {
826 Poll::Ready(Some(task)) => task,
827 x => panic!("Expected next task to be sent after start, got {:?}", x),
828 }
829 };
830
831 assert!(task.is_started());
832 }
833
834 #[fuchsia::test]
835 fn media_task_ending_ends_future() {
836 let mut exec = fasync::TestExecutor::new();
837
838 let mut task_builder = TestMediaTaskBuilder::new();
839 let mut stream = Stream::build(
840 make_sbc_endpoint(1, avdtp::EndpointType::Source),
841 task_builder.builder(),
842 );
843 let next_task_fut = task_builder.next_task();
844 let peer_id = PeerId(1);
845 let remote_id = 1_u8.try_into().expect("good id");
846
847 let sbc_codec_cap = sbc_mediacodec_capability();
848 let expected_codec_config =
849 MediaCodecConfig::try_from(&sbc_codec_cap).expect("codec config");
850
851 assert!(stream.configure(&peer_id, &remote_id, vec![]).is_err());
852 assert!(stream.configure(&peer_id, &remote_id, vec![sbc_codec_cap]).is_ok());
853
854 stream.endpoint_mut().establish().expect("establishment should start okay");
855 let (_remote, transport) = Channel::create();
856 let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");
857
858 let stream_finish_fut = stream.start().expect("start to succeed with a future");
859 let mut stream_finish_fut = pin!(stream_finish_fut);
860
861 let task = {
862 let mut next_task_fut = pin!(next_task_fut);
863 match exec.run_until_stalled(&mut next_task_fut) {
864 Poll::Ready(Some(task)) => task,
865 x => panic!("Expected next task to be sent after start, got {:?}", x),
866 }
867 };
868
869 assert_eq!(task.peer_id, PeerId(1));
870 assert_eq!(task.codec_config, expected_codec_config);
871
872 assert!(task.is_started());
874
875 assert!(exec.run_until_stalled(&mut stream_finish_fut).is_pending());
876
877 task.end_prematurely(Some(Ok(())));
878 assert!(!task.is_started());
879
880 match exec.run_until_stalled(&mut stream_finish_fut) {
882 Poll::Ready(Ok(())) => {}
883 x => panic!("Expected to get ready Ok from finish future, but got {x:?}"),
884 };
885
886 assert!(stream.suspend().is_ok());
888
889 let result_fut = stream.start().expect("start to succeed with a future");
891
892 let next_task_fut = task_builder.next_task();
893 let mut next_task_fut = pin!(next_task_fut);
894 let task = match exec.run_until_stalled(&mut next_task_fut) {
895 Poll::Ready(Some(task)) => task,
896 x => panic!("Expected next task to be sent after restart, got {x:?}"),
897 };
898
899 assert!(task.is_started());
900
901 drop(result_fut);
903
904 assert!(task.is_started());
905 }
906
907 #[fuchsia::test]
908 fn set_delay_correct_results_transmits_to_task() {
909 let mut _exec = fasync::TestExecutor::new();
910
911 let mut task_builder = TestMediaTaskBuilder::new_delayable();
912 let mut stream = Stream::build(
913 make_sbc_endpoint(1, avdtp::EndpointType::Source),
914 task_builder.builder(),
915 );
916 let peer_id = PeerId(1);
917 let remote_id = 1_u8.try_into().expect("good id");
918
919 let sbc_codec_cap = sbc_mediacodec_capability();
920
921 let code = stream
922 .set_delay(std::time::Duration::ZERO)
923 .expect_err("before configure, can't set a delay");
924 assert_eq!(ErrorCode::BadState, code);
925
926 assert!(stream.configure(&peer_id, &remote_id, vec![]).is_err());
927 assert!(stream.configure(&peer_id, &remote_id, vec![sbc_codec_cap]).is_ok());
928
929 let delay_set = std::time::Duration::from_nanos(0xfeed);
930
931 stream.set_delay(delay_set.clone()).expect("after configure, delay is fine");
932
933 stream.endpoint_mut().establish().expect("establishment should start okay");
934 let (_remote, transport) = Channel::create();
935 let _ = stream.endpoint_mut().receive_channel(transport).expect("ready for a channel");
936 let _stream_finish_fut = stream.start().expect("start to succeed with a future");
937
938 let media_task = task_builder.expect_task();
939 assert_eq!(delay_set, media_task.delay);
940 }
941}