1use anyhow::{Error, format_err};
6use fidl_fuchsia_bluetooth::ChannelParameters;
7use fidl_fuchsia_bluetooth_bredr::{self as bredr, ProfileDescriptor, ProfileProxy};
8use fuchsia_bluetooth::detachable_map::{DetachableMap, DetachableWeak};
9use fuchsia_bluetooth::inspect::DebugExt;
10use fuchsia_bluetooth::types::{Channel, PeerId};
11use fuchsia_inspect::{self as inspect, NumericProperty, Property};
12use fuchsia_inspect_derive::{AttachError, Inspect};
13use fuchsia_sync::Mutex;
14use futures::channel::{mpsc, oneshot};
15use futures::stream::{Stream, StreamExt};
16use futures::task::{Context, Poll};
17use futures::{Future, FutureExt, TryFutureExt};
18use log::{info, warn};
19use std::collections::hash_map::Entry;
20use std::collections::{HashMap, HashSet};
21use std::pin::Pin;
22use std::sync::Arc;
23use {bt_avdtp as avdtp, fuchsia_async as fasync};
24
25use crate::codec::CodecNegotiation;
26use crate::peer::Peer;
27use crate::permits::Permits;
28use crate::stream::StreamsBuilder;
29
30struct PeerStats {
33 id: PeerId,
34 inspect_node: inspect::Node,
35 connection_count: inspect::UintProperty,
37}
38
39impl PeerStats {
40 fn new(id: PeerId) -> Self {
41 Self { id, inspect_node: Default::default(), connection_count: Default::default() }
42 }
43
44 fn set_descriptor(&mut self, descriptor: &ProfileDescriptor) {
45 self.inspect_node.record_string("descriptor", descriptor.debug());
46 }
47
48 fn record_connected(&mut self) {
49 let _ = self.connection_count.add(1);
50 }
51}
52
53impl Inspect for &mut PeerStats {
54 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
55 self.inspect_node = parent.create_child(name.as_ref());
56 self.inspect_node.record_string("id", self.id.to_string());
57 self.connection_count = self.inspect_node.create_uint("connection_count", 0);
58 Ok(())
59 }
60}
61
62#[derive(Default)]
63struct DiscoveredPeers {
64 descriptors: HashMap<PeerId, (ProfileDescriptor, HashSet<avdtp::EndpointType>)>,
68 stats: HashMap<PeerId, PeerStats>,
70 inspect_node: inspect::Node,
72}
73
74impl DiscoveredPeers {
75 fn insert(
76 &mut self,
77 id: PeerId,
78 descriptor: ProfileDescriptor,
79 directions: HashSet<avdtp::EndpointType>,
80 ) {
81 self.stats
82 .entry(id)
83 .or_insert_with(|| {
84 let mut new_stats = PeerStats::new(id);
85 let _ = new_stats.iattach(&self.inspect_node, inspect::unique_name("peer_"));
86 new_stats
87 })
88 .set_descriptor(&descriptor);
89
90 match self.descriptors.entry(id) {
91 Entry::Occupied(mut entry) => {
92 entry.get_mut().0 = descriptor;
93 entry.get_mut().1.extend(&directions);
94 }
95 Entry::Vacant(entry) => {
96 let _ = entry.insert((descriptor, directions));
97 }
98 };
99 }
100
101 fn connected(&mut self, id: PeerId) {
102 if let Some(stats) = self.stats.get_mut(&id) {
103 stats.record_connected();
104 }
105 }
106
107 fn get(&self, id: &PeerId) -> Option<(ProfileDescriptor, Option<avdtp::EndpointType>)> {
109 self.descriptors.get(id).map(|(desc, dirs)| (desc.clone(), find_preferred_direction(dirs)))
110 }
111}
112
113impl Inspect for &mut DiscoveredPeers {
114 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
115 self.inspect_node = parent.create_child(name.as_ref());
116 Ok(())
117 }
118}
119
120fn find_preferred_direction(
123 directions: &HashSet<avdtp::EndpointType>,
124) -> Option<avdtp::EndpointType> {
125 if directions.len() == 1 {
126 directions.iter().next().cloned()
127 } else {
128 None
131 }
132}
133
134async fn connect_peer(
136 proxy: ProfileProxy,
137 id: PeerId,
138 channel_params: ChannelParameters,
139) -> Result<Channel, Error> {
140 info!(id:%; "Connecting to peer");
141 let connect_fut = proxy.connect(
142 &id.into(),
143 &bredr::ConnectParameters::L2cap(bredr::L2capParameters {
144 psm: Some(bredr::PSM_AVDTP),
145 parameters: Some(channel_params),
146 ..Default::default()
147 }),
148 );
149 let channel = match connect_fut.await {
150 Err(e) => {
151 warn!(id:%, e:?; "FIDL error on connect");
152 return Err(e.into());
153 }
154 Ok(Err(e)) => return Err(format_err!("Bluetooth connect error: {e:?}")),
155 Ok(Ok(channel)) => channel,
156 };
157
158 let channel = channel
159 .try_into()
160 .map_err(|e| format_err!("Couldn't convert FIDL to BT channel: {e:?}"))?;
161 Ok(channel)
162}
163
164pub struct ConnectedPeers {
167 connected: DetachableMap<PeerId, Peer>,
169 connection_attempts: Mutex<HashMap<PeerId, fasync::Task<()>>>,
172 discovered: Mutex<DiscoveredPeers>,
174 streams_builder: StreamsBuilder,
176 permits: Permits,
178 profile: ProfileProxy,
180 metrics: bt_metrics::MetricsLogger,
182 inspect: inspect::Node,
184 inspect_peer_direction: inspect::StringProperty,
186 connected_peer_senders: Mutex<Vec<mpsc::Sender<DetachableWeak<PeerId, Peer>>>>,
188 start_stream_tasks: Mutex<HashMap<PeerId, fasync::Task<()>>>,
191 preferred_peer_direction: Mutex<avdtp::EndpointType>,
194}
195
196impl ConnectedPeers {
197 pub fn new(
198 streams_builder: StreamsBuilder,
199 permits: Permits,
200 profile: ProfileProxy,
201 metrics: bt_metrics::MetricsLogger,
202 ) -> Self {
203 Self {
204 connected: DetachableMap::new(),
205 connection_attempts: Mutex::new(HashMap::new()),
206 discovered: Default::default(),
207 streams_builder,
208 profile,
209 permits,
210 inspect: inspect::Node::default(),
211 inspect_peer_direction: inspect::StringProperty::default(),
212 metrics,
213 connected_peer_senders: Default::default(),
214 start_stream_tasks: Default::default(),
215 preferred_peer_direction: Mutex::new(avdtp::EndpointType::Sink),
216 }
217 }
218
219 pub(crate) fn get_weak(&self, id: &PeerId) -> Option<DetachableWeak<PeerId, Peer>> {
220 self.connected.get(id)
221 }
222
223 pub(crate) fn get(&self, id: &PeerId) -> Option<Arc<Peer>> {
224 self.get_weak(id).and_then(|p| p.upgrade())
225 }
226
227 pub fn is_connected(&self, id: &PeerId) -> bool {
228 self.connected.contains_key(id)
229 }
230
231 async fn start_streaming(
236 peer: &DetachableWeak<PeerId, Peer>,
237 negotiation: CodecNegotiation,
238 ) -> Result<(), anyhow::Error> {
239 let remote_streams = {
240 let strong = peer.upgrade().ok_or_else(|| format_err!("Disconnected"))?;
241 if strong.streaming_active() {
242 return Ok(());
243 }
244 strong.collect_capabilities()
245 }
246 .await?;
247
248 let (negotiated, remote_seid) = negotiation
249 .select(&remote_streams)
250 .ok_or_else(|| format_err!("No compatible stream found"))?;
251
252 let strong = peer.upgrade().ok_or_else(|| format_err!("Disconnected"))?;
253 if strong.streaming_active() {
254 let peer_id = peer.key();
255 info!(peer_id:%; "Not starting streaming, it's already started");
256 return Ok(());
257 }
258 strong.stream_start(remote_seid, negotiated).await.map_err(Into::into)
259 }
260
261 pub fn found(
262 &self,
263 id: PeerId,
264 desc: ProfileDescriptor,
265 preferred_directions: HashSet<avdtp::EndpointType>,
266 ) {
267 self.discovered.lock().insert(id, desc.clone(), preferred_directions);
268 if let Some(peer) = self.get(&id) {
269 let _ = peer.set_descriptor(desc);
270 }
271 }
272
273 pub fn set_preferred_peer_direction(&self, direction: avdtp::EndpointType) {
274 *self.preferred_peer_direction.lock() = direction;
275 self.inspect_peer_direction.set(&format!("{direction:?}"));
276 }
277
278 pub fn preferred_peer_direction(&self) -> avdtp::EndpointType {
279 *self.preferred_peer_direction.lock()
280 }
281
282 pub fn try_connect(
283 &self,
284 id: PeerId,
285 channel_params: ChannelParameters,
286 ) -> impl Future<Output = Result<Option<Channel>, Error>> {
287 let proxy = self.profile.clone();
288 let connected = self.is_connected(&id);
289 let (sender, recv) = oneshot::channel();
290 let recv =
291 recv.map_ok_or_else(|_e| Err(format_err!("Connection task canceled")), Into::into);
292 if connected {
293 if let Err(e) = sender.send(Ok(None)) {
294 warn!(id:%, e:?; "Failed to notify already-connected");
295 }
296 return recv;
297 }
298 let mut attempts = self.connection_attempts.lock();
299 if let Some(previous_connect_task) = attempts.remove(&id) {
300 if previous_connect_task.now_or_never().is_none() {
302 warn!(id:%; "Cancelling previous connect attempt");
303 }
304 }
305 let connect_task = fasync::Task::spawn(async move {
306 if let Err(e) = sender.send(connect_peer(proxy, id, channel_params).await.map(Some)) {
307 warn!(id:%, e:?; "Failed to send channel connect result");
308 }
309 });
310 let _ = attempts.insert(id, connect_task);
311 recv
312 }
313
314 pub async fn connected(
319 &self,
320 id: PeerId,
321 channel: Channel,
322 initiator_delay: Option<zx::MonotonicDuration>,
323 ) -> Result<DetachableWeak<PeerId, Peer>, Error> {
324 if let Some(weak) = self.get_weak(&id) {
325 let peer =
326 weak.upgrade().ok_or_else(|| format_err!("Disconnected connecting transport"))?;
327 if let Err(e) = peer.receive_channel(channel) {
328 warn!(id:%, e:%; "failed to connect channel");
329 return Err(e.into());
330 }
331 return Ok(weak);
332 }
333
334 let entry = self.connected.lazy_entry(&id);
335
336 info!(id:%; "peer connected");
337 let audio_offload = channel.audio_offload();
338 let avdtp_peer = avdtp::Peer::new(channel);
339
340 let mut peer = Peer::create(
341 id,
342 avdtp_peer,
343 self.streams_builder.peer_streams(&id, audio_offload.clone()).await?,
344 Some(self.permits.clone()),
345 self.profile.clone(),
346 self.metrics.clone(),
347 );
348
349 self.discovered.lock().connected(id);
350
351 let peer_preferred_direction = if let Some((desc, dir)) = self.discovered.lock().get(&id) {
352 let _ = peer.set_descriptor(desc);
353 dir
354 } else {
355 None
356 };
357
358 if let Err(e) = peer.iattach(&self.inspect, inspect::unique_name("peer_")) {
359 warn!(id:%, e:?; "Couldn't attach inspect");
360 }
361
362 let closed_fut = peer.closed();
363 let peer = match entry.try_insert(peer) {
364 Err(_peer) => {
365 warn!(id:%; "Peer connected while we were setting up");
366 return self.get_weak(&id).ok_or_else(|| format_err!("Peer missing"));
367 }
368 Ok(weak_peer) => weak_peer,
369 };
370
371 if let Some(delay) = initiator_delay {
372 let peer = peer.clone();
373 let peer_id = peer.key().clone();
374
375 let negotiation = self
378 .streams_builder
379 .negotiation(
380 &id,
381 audio_offload,
382 peer_preferred_direction.unwrap_or_else(|| self.preferred_peer_direction()),
383 )
384 .await?;
385 let start_stream_task = fuchsia_async::Task::local(async move {
386 let delay_sec = delay.into_millis() as f64 / 1000.0;
387 info!(id:% = peer.key(); "dwelling {delay_sec}s for peer initiation");
388 fasync::Timer::new(fasync::MonotonicInstant::after(delay)).await;
389
390 if let Err(e) = ConnectedPeers::start_streaming(&peer, negotiation).await {
391 info!(id:% = peer.key(), e:?; "Peer start streaming failed");
392 peer.detach();
393 }
394 });
395 if self.start_stream_tasks.lock().insert(peer_id, start_stream_task).is_some() {
396 info!(peer_id:%; "Replacing previous start stream dwell");
397 }
398 }
399
400 fasync::Task::local(async move {
402 closed_fut.await;
403 peer.detach();
404 })
405 .detach();
406
407 let peer = self.get_weak(&id).ok_or_else(|| format_err!("Peer missing"))?;
408 self.notify_connected(&peer);
409 Ok(peer)
410 }
411
412 fn notify_connected(&self, peer: &DetachableWeak<PeerId, Peer>) {
414 let mut senders = self.connected_peer_senders.lock();
415 senders.retain_mut(|sender| sender.try_send(peer.clone()).is_ok());
416 }
417
418 pub fn connected_stream(&self) -> PeerConnections {
420 let (sender, receiver) = mpsc::channel(0);
421 self.connected_peer_senders.lock().push(sender);
422 PeerConnections { stream: receiver }
423 }
424}
425
426impl Inspect for &mut ConnectedPeers {
427 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
428 self.inspect = parent.create_child(name.as_ref());
429 let peer_dir_str = format!("{:?}", self.preferred_peer_direction());
430 self.inspect_peer_direction =
431 self.inspect.create_string("preferred_peer_direction", peer_dir_str);
432 self.streams_builder.iattach(&self.inspect, "streams_builder")?;
433 self.discovered.lock().iattach(&self.inspect, "discovered")
434 }
435}
436
437pub struct PeerConnections {
440 stream: mpsc::Receiver<DetachableWeak<PeerId, Peer>>,
441}
442
443impl Stream for PeerConnections {
444 type Item = DetachableWeak<PeerId, Peer>;
445
446 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
447 self.stream.poll_next_unpin(cx)
448 }
449}
450
451#[cfg(test)]
452mod tests {
453 use super::*;
454
455 use async_utils::PollExt;
456 use bt_avdtp::{Request, ServiceCapability};
457 use diagnostics_assertions::assert_data_tree;
458 use fidl::endpoints::create_proxy_and_stream;
459 use fidl_fuchsia_bluetooth_bredr::{
460 AudioOffloadExtProxy, ProfileMarker, ProfileRequestStream, ServiceClassProfileIdentifier,
461 };
462 use futures::future::BoxFuture;
463 use std::pin::pin;
464
465 use crate::codec::MediaCodecConfig;
466 use crate::media_task::{MediaTaskBuilder, MediaTaskError, MediaTaskRunner};
467 use crate::media_types::*;
468
469 fn run_to_stalled(exec: &mut fasync::TestExecutor) {
470 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
471 }
472
473 fn exercise_avdtp(exec: &mut fasync::TestExecutor, remote: Channel, peer: &Peer) {
474 let remote_avdtp = avdtp::Peer::new(remote);
475 let mut remote_requests = remote_avdtp.take_request_stream();
476
477 let avdtp = peer.avdtp();
479 let discover_fut = avdtp.discover();
480
481 let mut discover_fut = pin!(discover_fut);
482
483 assert!(exec.run_until_stalled(&mut discover_fut).is_pending());
484
485 let responder = match exec.run_until_stalled(&mut remote_requests.next()) {
486 Poll::Ready(Some(Ok(Request::Discover { responder }))) => responder,
487 x => panic!("Expected a Ready Discovery request but got {:?}", x),
488 };
489
490 let endpoint_id = avdtp::StreamEndpointId::try_from(1).expect("endpointid creation");
491
492 let information = avdtp::StreamInformation::new(
493 endpoint_id,
494 false,
495 avdtp::MediaType::Audio,
496 avdtp::EndpointType::Source,
497 );
498
499 responder.send(&[information]).expect("Sending response should have worked");
500
501 let _stream_infos = match exec.run_until_stalled(&mut discover_fut) {
502 Poll::Ready(Ok(infos)) => infos,
503 x => panic!("Expected a Ready response but got {:?}", x),
504 };
505 }
506
507 fn setup_connected_peer_test()
508 -> (fasync::TestExecutor, PeerId, ConnectedPeers, ProfileRequestStream) {
509 let exec = fasync::TestExecutor::new();
510 let (proxy, stream) = create_proxy_and_stream::<ProfileMarker>();
511 let id = PeerId(1);
512
513 let peers = ConnectedPeers::new(
514 StreamsBuilder::default(),
515 Permits::new(1),
516 proxy,
517 bt_metrics::MetricsLogger::default(),
518 );
519
520 (exec, id, peers, stream)
521 }
522
523 #[fuchsia::test]
524 fn connect_creates_peer() {
525 let (mut exec, id, peers, _stream) = setup_connected_peer_test();
526
527 let (remote, channel) = Channel::create();
528
529 let peer = exec
530 .run_singlethreaded(peers.connected(id, channel, None))
531 .expect("peer should connect");
532 let peer = peer.upgrade().expect("peer should be connected");
533
534 exercise_avdtp(&mut exec, remote, &peer);
535 }
536
537 #[fuchsia::test]
538 fn connect_notifies_streams() {
539 let (mut exec, id, peers, _stream) = setup_connected_peer_test();
540
541 let (remote, channel) = Channel::create();
542
543 let mut peer_stream = peers.connected_stream();
544 let mut peer_stream_two = peers.connected_stream();
545
546 let peer = exec
547 .run_singlethreaded(peers.connected(id, channel, None))
548 .expect("peer should connect");
549 let peer = peer.upgrade().expect("peer should be connected");
550
551 let weak = exec.run_singlethreaded(peer_stream.next()).expect("peer stream to produce");
553 assert_eq!(weak.key(), &id);
554 let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
555 assert_eq!(weak.key(), &id);
556
557 exercise_avdtp(&mut exec, remote, &peer);
558
559 drop(peer_stream);
561
562 let id2 = PeerId(2);
563 let (remote2, channel2) = Channel::create();
564 let peer2 = exec
565 .run_singlethreaded(peers.connected(id2, channel2, None))
566 .expect("peer should connect");
567 let peer2 = peer2.upgrade().expect("peer two should be connected");
568
569 let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
570 assert_eq!(weak.key(), &id2);
571
572 exercise_avdtp(&mut exec, remote2, &peer2);
573 }
574
575 #[fuchsia::test]
576 fn find_preferred_direction_returns_correct_endpoints() {
577 let empty = HashSet::new();
578 assert_eq!(find_preferred_direction(&empty), None);
579
580 let sink_only = HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter());
581 assert_eq!(find_preferred_direction(&sink_only), Some(avdtp::EndpointType::Sink));
582
583 let source_only = HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter());
584 assert_eq!(find_preferred_direction(&source_only), Some(avdtp::EndpointType::Source));
585
586 let both = HashSet::from_iter(
587 vec![avdtp::EndpointType::Sink, avdtp::EndpointType::Source].into_iter(),
588 );
589 assert_eq!(find_preferred_direction(&both), None);
590 }
591
592 const AAC_SEID: u8 = 8;
594 const SBC_SINK_SEID: u8 = 9;
596 const SBC_SOURCE_SEID: u8 = 10;
598
599 fn aac_sink_codec() -> avdtp::ServiceCapability {
600 AacCodecInfo::new(
601 AacObjectType::MANDATORY_SNK,
602 AacSamplingFrequency::MANDATORY_SNK,
603 AacChannels::MANDATORY_SNK,
604 true,
605 0, )
607 .unwrap()
608 .into()
609 }
610
611 fn sbc_sink_codec() -> avdtp::ServiceCapability {
612 SbcCodecInfo::new(
613 SbcSamplingFrequency::MANDATORY_SNK,
614 SbcChannelMode::MANDATORY_SNK,
615 SbcBlockCount::MANDATORY_SNK,
616 SbcSubBands::MANDATORY_SNK,
617 SbcAllocation::MANDATORY_SNK,
618 SbcCodecInfo::BITPOOL_MIN,
619 SbcCodecInfo::BITPOOL_MAX,
620 )
621 .unwrap()
622 .into()
623 }
624
625 fn sbc_source_codec() -> avdtp::ServiceCapability {
626 SbcCodecInfo::new(
627 SbcSamplingFrequency::FREQ48000HZ,
628 SbcChannelMode::JOINT_STEREO,
629 SbcBlockCount::MANDATORY_SRC,
630 SbcSubBands::MANDATORY_SRC,
631 SbcAllocation::MANDATORY_SRC,
632 SbcCodecInfo::BITPOOL_MIN,
633 SbcCodecInfo::BITPOOL_MAX,
634 )
635 .unwrap()
636 .into()
637 }
638
639 #[derive(Clone)]
640 struct FakeBuilder {
641 capability: avdtp::ServiceCapability,
642 direction: avdtp::EndpointType,
643 }
644
645 impl MediaTaskBuilder for FakeBuilder {
646 fn configure(
647 &self,
648 _peer_id: &PeerId,
649 codec_config: &MediaCodecConfig,
650 ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError> {
651 if self.capability.codec_type() == Some(codec_config.codec_type()) {
652 return Ok(Box::new(FakeRunner {}));
653 }
654 Err(MediaTaskError::Other(String::from("Unsupported configuring")))
655 }
656
657 fn direction(&self) -> bt_avdtp::EndpointType {
658 self.direction
659 }
660
661 fn supported_configs(
662 &self,
663 _peer_id: &PeerId,
664 _offload: Option<AudioOffloadExtProxy>,
665 ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>> {
666 futures::future::ready(Ok(vec![(&self.capability).try_into().unwrap()])).boxed()
667 }
668 }
669
670 struct FakeRunner {}
671
672 impl MediaTaskRunner for FakeRunner {
673 fn start(
674 &mut self,
675 _stream: avdtp::MediaStream,
676 _offload: Option<AudioOffloadExtProxy>,
677 ) -> Result<Box<dyn crate::media_task::MediaTask>, MediaTaskError> {
678 Err(MediaTaskError::Other(String::from("unimplemented starting")))
679 }
680 }
681
682 fn setup_negotiation_test() -> (
686 fasync::TestExecutor,
687 ConnectedPeers,
688 ProfileRequestStream,
689 ServiceCapability,
690 ServiceCapability,
691 ) {
692 let exec = fasync::TestExecutor::new_with_fake_time();
693 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000));
694 let (proxy, stream) = create_proxy_and_stream::<ProfileMarker>();
695
696 let aac_sink_codec = aac_sink_codec();
697 let sbc_sink_codec = sbc_sink_codec();
698 let aac_sink_builder = FakeBuilder {
699 capability: aac_sink_codec.clone(),
700 direction: avdtp::EndpointType::Sink,
701 };
702 let sbc_sink_builder = FakeBuilder {
703 capability: sbc_sink_codec.clone(),
704 direction: avdtp::EndpointType::Sink,
705 };
706 let sbc_source_builder =
707 FakeBuilder { capability: sbc_source_codec(), direction: avdtp::EndpointType::Source };
708
709 let mut streams_builder = StreamsBuilder::default();
710 streams_builder.add_builder(aac_sink_builder);
711 streams_builder.add_builder(sbc_sink_builder);
712 streams_builder.add_builder(sbc_source_builder);
713
714 let peers = ConnectedPeers::new(
715 streams_builder,
716 Permits::new(1),
717 proxy,
718 bt_metrics::MetricsLogger::default(),
719 );
720
721 (exec, peers, stream, sbc_sink_codec, aac_sink_codec)
722 }
723
724 #[fuchsia::test]
725 fn streaming_start_with_streaming_peer_is_noop() {
726 let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
727 let id = PeerId(1);
728 let (remote, channel) = Channel::create();
729 let remote = avdtp::Peer::new(remote);
730
731 let delay = zx::MonotonicDuration::from_seconds(1);
732
733 let mut remote_requests = remote.take_request_stream();
734
735 let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
737 assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
738 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
739
740 let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
743 let config_caps = &[ServiceCapability::MediaTransport, sbc_codec];
744 let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
745 let mut set_config_fut = pin!(set_config_fut);
746 match exec.run_until_stalled(&mut set_config_fut) {
747 Poll::Ready(Ok(())) => {}
748 x => panic!("Expected set config to be ready and Ok, got {:?}", x),
749 };
750
751 exec.set_fake_time(
755 fasync::MonotonicInstant::after(delay) + zx::MonotonicDuration::from_micros(1),
756 );
757 let _ = exec.wake_expired_timers();
758
759 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
760
761 assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
763 }
764
765 fn sbc_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
766 let remote_sbc_seid: avdtp::StreamEndpointId = 1u8.try_into().unwrap();
767 let info = avdtp::StreamInformation::new(
768 remote_sbc_seid.clone(),
769 false,
770 avdtp::MediaType::Audio,
771 avdtp::EndpointType::Source,
772 );
773 (remote_sbc_seid, info)
774 }
775
776 fn aac_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
777 let remote_aac_seid: avdtp::StreamEndpointId = 2u8.try_into().unwrap();
778 let info = avdtp::StreamInformation::new(
779 remote_aac_seid.clone(),
780 false,
781 avdtp::MediaType::Audio,
782 avdtp::EndpointType::Source,
783 );
784 (remote_aac_seid, info)
785 }
786
787 fn sbc_sink_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
788 let remote_sbc_seid: avdtp::StreamEndpointId = 3u8.try_into().unwrap();
789 let info = avdtp::StreamInformation::new(
790 remote_sbc_seid.clone(),
791 false,
792 avdtp::MediaType::Audio,
793 avdtp::EndpointType::Sink,
794 );
795 (remote_sbc_seid, info)
796 }
797
798 fn expect_peer_discovery(
801 exec: &mut fasync::TestExecutor,
802 requests: &mut avdtp::RequestStream,
803 response: Vec<avdtp::StreamInformation>,
804 ) {
805 match exec.run_until_stalled(&mut requests.next()) {
806 Poll::Ready(Some(Ok(avdtp::Request::Discover { responder }))) => {
807 responder.send(&response).expect("response succeeds");
808 }
809 x => panic!("Expected a discovery request to be sent after delay, got {:?}", x),
810 };
811 }
812
813 #[fuchsia::test]
814 fn streaming_start_configure_while_discovery() {
815 let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
816 let id = PeerId(1);
817 let (remote, channel) = Channel::create();
818 let remote = avdtp::Peer::new(remote);
819
820 let delay = zx::MonotonicDuration::from_seconds(1);
821
822 let mut remote_requests = remote.take_request_stream();
823
824 let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
826 assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
827 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
828
829 exec.set_fake_time(
831 fasync::MonotonicInstant::after(delay) + zx::MonotonicDuration::from_micros(1),
832 );
833 let _ = exec.wake_expired_timers();
834 expect_peer_discovery(
835 &mut exec,
836 &mut remote_requests,
837 vec![sbc_source_endpoint().1, aac_source_endpoint().1],
838 );
839
840 let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
842 let config_caps = &[ServiceCapability::MediaTransport, sbc_codec.clone()];
843 let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
844 let mut set_config_fut = pin!(set_config_fut);
845 match exec.run_until_stalled(&mut set_config_fut) {
846 Poll::Ready(Ok(())) => {}
847 x => panic!("Expected set config to be ready and Ok, got {:?}", x),
848 };
849
850 loop {
852 match exec.run_until_stalled(&mut remote_requests.next()) {
853 Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { responder, .. }))) => {
854 responder
855 .send(&[avdtp::ServiceCapability::MediaTransport, sbc_codec.clone()])
856 .expect("respond succeeds");
857 }
858 Poll::Ready(x) => panic!("Got unexpected request: {:?}", x),
859 Poll::Pending => break,
860 }
861 }
862 }
863
864 #[fuchsia::test]
867 fn connect_initiation_uses_biased_codec_negotiation_by_peer() {
868 let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
869 let id = PeerId(1);
870 let (remote, channel) = Channel::create();
871
872 peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
874
875 let remote = avdtp::Peer::new(remote);
877 let desc = ProfileDescriptor {
878 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
879 major_version: Some(1),
880 minor_version: Some(2),
881 ..Default::default()
882 };
883 let preferred_direction = vec![avdtp::EndpointType::Sink];
884 let delay = zx::MonotonicDuration::from_seconds(1);
885 peers.found(id, desc, HashSet::from_iter(preferred_direction.into_iter()));
886
887 let connected_fut = peers.connected(id, channel, Some(delay));
888 let mut connected_fut = std::pin::pin!(connected_fut);
889 let _ = exec
890 .run_until_stalled(&mut connected_fut)
891 .expect("is ready")
892 .expect("connect control channel is ok");
893 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
895
896 let mut remote_requests = remote.take_request_stream();
897
898 assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
900
901 exec.set_fake_time(fasync::MonotonicInstant::after(
902 delay + zx::MonotonicDuration::from_micros(1),
903 ));
904 let _ = exec.wake_expired_timers();
905
906 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
907 let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
910 let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
911 expect_peer_discovery(
912 &mut exec,
913 &mut remote_requests,
914 vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
915 );
916 for _twice in 1..=2 {
917 match exec.run_until_stalled(&mut remote_requests.next()) {
918 Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
919 let codec = match stream_id {
920 id if id == peer_sbc_source_seid => sbc_codec.clone(),
921 id if id == peer_sbc_sink_seid => sbc_codec.clone(),
922 x => panic!("Got unexpected get_capabilities seid {:?}", x),
923 };
924 responder
925 .send(&[avdtp::ServiceCapability::MediaTransport, codec])
926 .expect("respond succeeds");
927 }
928 x => panic!("Expected a ready get capabilities request, got {:?}", x),
929 };
930 }
931
932 match exec.run_until_stalled(&mut remote_requests.next()) {
933 Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
934 local_stream_id,
935 remote_stream_id,
936 capabilities: _,
937 responder,
938 }))) => {
939 assert_eq!(peer_sbc_sink_seid, local_stream_id);
942 let local_sbc_source_seid: avdtp::StreamEndpointId =
943 SBC_SOURCE_SEID.try_into().unwrap();
944 assert_eq!(local_sbc_source_seid, remote_stream_id);
945 responder.send().expect("response sends");
946 }
947 x => panic!("Expected a ready set configuration request, got {:?}", x),
948 };
949 }
950
951 #[fuchsia::test]
956 fn connect_initiation_uses_biased_codec_negotiation_by_system() {
957 let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
958 let id = PeerId(1);
959 let (remote, channel) = Channel::create();
960
961 peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
963
964 let remote = avdtp::Peer::new(remote);
966 let desc = ProfileDescriptor {
967 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
968 major_version: Some(1),
969 minor_version: Some(2),
970 ..Default::default()
971 };
972 peers.found(
973 id,
974 desc.clone(),
975 HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter()),
976 );
977 peers.found(id, desc, HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter()));
978
979 let delay = zx::MonotonicDuration::from_seconds(1);
980 let connect_fut = peers.connected(id, channel, Some(delay));
981 let mut connect_fut = std::pin::pin!(connect_fut);
982 let _ = exec
983 .run_until_stalled(&mut connect_fut)
984 .expect("ready")
985 .expect("connect control channel is ok");
986 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
987
988 let mut remote_requests = remote.take_request_stream();
989 assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
991 exec.set_fake_time(fasync::MonotonicInstant::after(
992 delay + zx::MonotonicDuration::from_micros(1),
993 ));
994 let _ = exec.wake_expired_timers();
995 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
996
997 let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
1000 let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
1001 expect_peer_discovery(
1002 &mut exec,
1003 &mut remote_requests,
1004 vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
1005 );
1006 for _twice in 1..=2 {
1007 match exec.run_until_stalled(&mut remote_requests.next()) {
1008 Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
1009 let codec = match stream_id {
1010 id if id == peer_sbc_source_seid => sbc_codec.clone(),
1011 id if id == peer_sbc_sink_seid => sbc_codec.clone(),
1012 x => panic!("Got unexpected get_capabilities seid {:?}", x),
1013 };
1014 responder
1015 .send(&[avdtp::ServiceCapability::MediaTransport, codec])
1016 .expect("respond succeeds");
1017 }
1018 x => panic!("Expected a ready get capabilities request, got {:?}", x),
1019 };
1020 }
1021
1022 match exec.run_until_stalled(&mut remote_requests.next()) {
1023 Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
1024 local_stream_id,
1025 remote_stream_id,
1026 capabilities: _,
1027 responder,
1028 }))) => {
1029 assert_eq!(peer_sbc_source_seid, local_stream_id);
1032 let local_sbc_sink_seid: avdtp::StreamEndpointId =
1033 SBC_SINK_SEID.try_into().unwrap();
1034 assert_eq!(local_sbc_sink_seid, remote_stream_id);
1035 responder.send().expect("response sends");
1036 }
1037 x => panic!("Expected a ready set configuration request, got {:?}", x),
1038 };
1039 }
1040
1041 #[fuchsia::test]
1042 fn connect_initiation_uses_negotiation() {
1043 let (mut exec, peers, _stream, sbc_codec, aac_codec) = setup_negotiation_test();
1044 let id = PeerId(1);
1045 let (remote, channel) = Channel::create();
1046 let remote = avdtp::Peer::new(remote);
1047
1048 let delay = zx::MonotonicDuration::from_seconds(1);
1049
1050 let mut connect_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
1051 let _ = exec
1052 .run_until_stalled(&mut connect_fut)
1053 .expect("ready")
1054 .expect("connect control channel is ok");
1055
1056 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1058
1059 let mut remote_requests = remote.take_request_stream();
1060
1061 assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
1063
1064 exec.set_fake_time(fasync::MonotonicInstant::after(
1065 delay + zx::MonotonicDuration::from_micros(1),
1066 ));
1067 let _ = exec.wake_expired_timers();
1068
1069 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1070
1071 let (peer_sbc_seid, peer_sbc_endpoint) = sbc_source_endpoint();
1073 let (peer_aac_seid, peer_aac_endpoint) = aac_source_endpoint();
1074 expect_peer_discovery(
1075 &mut exec,
1076 &mut remote_requests,
1077 vec![peer_sbc_endpoint, peer_aac_endpoint],
1078 );
1079 for _twice in 1..=2 {
1080 match exec.run_until_stalled(&mut remote_requests.next()) {
1081 Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
1082 let codec = match stream_id {
1083 id if id == peer_sbc_seid => sbc_codec.clone(),
1084 id if id == peer_aac_seid => aac_codec.clone(),
1085 x => panic!("Got unexpected get_capabilities seid {:?}", x),
1086 };
1087 responder
1088 .send(&[avdtp::ServiceCapability::MediaTransport, codec])
1089 .expect("respond succeeds");
1090 }
1091 x => panic!("Expected a ready get capabilities request, got {:?}", x),
1092 };
1093 }
1094
1095 match exec.run_until_stalled(&mut remote_requests.next()) {
1096 Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
1097 local_stream_id,
1098 remote_stream_id,
1099 capabilities: _,
1100 responder,
1101 }))) => {
1102 assert_eq!(peer_aac_seid, local_stream_id);
1104 let local_aac_seid: avdtp::StreamEndpointId = AAC_SEID.try_into().unwrap();
1105 assert_eq!(local_aac_seid, remote_stream_id);
1106 responder.send().expect("response sends");
1107 }
1108 x => panic!("Expected a ready set configuration request, got {:?}", x),
1109 };
1110 }
1111
1112 #[fuchsia::test]
1113 fn connected_peers_inspect() {
1114 let (mut exec, id, mut peers, _stream) = setup_connected_peer_test();
1115
1116 let inspect = inspect::Inspector::default();
1117 peers.iattach(inspect.root(), "peers").expect("should attach to inspect tree");
1118
1119 assert_data_tree!(@executor exec, inspect, root: {
1120 peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Sink" }});
1121
1122 peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
1123
1124 assert_data_tree!(@executor exec, inspect, root: {
1125 peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Source" }});
1126
1127 let (_remote, channel) = Channel::create();
1129 assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1130
1131 assert_data_tree!(@executor exec, inspect, root: {
1132 peers: {
1133 discovered: contains {},
1134 preferred_peer_direction: "Source",
1135 streams_builder: contains {},
1136 peer_0: { id: "0000000000000001", local_streams: contains {} }
1137 }
1138 });
1139 }
1140
1141 #[fuchsia::test]
1142 fn try_connect_cancels_previous_attempt() {
1143 let (mut exec, id, peers, mut profile_stream) = setup_connected_peer_test();
1144
1145 let mut connect_fut = peers.try_connect(id, ChannelParameters::default());
1146
1147 let responder = match exec.run_singlethreaded(profile_stream.next()) {
1149 Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
1150 x => panic!("Expected Profile connect, got {x:?}"),
1151 };
1152
1153 let mut connect_again_fut = peers.try_connect(id, ChannelParameters::default());
1155 let responder_two = match exec.run_singlethreaded(profile_stream.next()) {
1156 Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
1157 x => panic!("Expected Profile connect, got {x:?}"),
1158 };
1159
1160 let first_result = exec.run_singlethreaded(&mut connect_fut);
1161 let _ = first_result.expect_err("Should have an error from first attempt");
1162
1163 responder.send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed)).unwrap();
1165
1166 exec.run_until_stalled(&mut connect_again_fut).expect_pending("shouldn't finish");
1167
1168 let (_remote, local) = Channel::create();
1169 responder_two.send(Ok(local.try_into().unwrap())).unwrap();
1170
1171 let second_result = exec.run_singlethreaded(&mut connect_again_fut);
1172 let _ = second_result.expect("should receive the channel");
1173 }
1174
1175 #[fuchsia::test]
1176 fn connected_peers_peer_disconnect_removes_peer() {
1177 let (mut exec, id, peers, _stream) = setup_connected_peer_test();
1178
1179 let (remote, channel) = Channel::create();
1180
1181 assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1182 run_to_stalled(&mut exec);
1183
1184 drop(remote);
1186
1187 run_to_stalled(&mut exec);
1188
1189 assert!(peers.get(&id).is_none());
1190 }
1191
1192 #[fuchsia::test]
1193 fn connected_peers_reconnect_works() {
1194 let (mut exec, id, peers, _stream) = setup_connected_peer_test();
1195
1196 let (remote, channel) = Channel::create();
1197 assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1198 run_to_stalled(&mut exec);
1199
1200 drop(remote);
1202
1203 run_to_stalled(&mut exec);
1204
1205 assert!(peers.get(&id).is_none());
1206
1207 let (_remote, channel) = Channel::create();
1209
1210 assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1211 run_to_stalled(&mut exec);
1212
1213 assert!(peers.get(&id).is_some());
1215 }
1216}