1use anyhow::{format_err, Error};
6use fidl_fuchsia_bluetooth::ChannelParameters;
7use fidl_fuchsia_bluetooth_bredr::{self as bredr, ProfileDescriptor, ProfileProxy};
8use fuchsia_bluetooth::detachable_map::{DetachableMap, DetachableWeak};
9use fuchsia_bluetooth::inspect::DebugExt;
10use fuchsia_bluetooth::types::{Channel, PeerId};
11use fuchsia_inspect::{self as inspect, NumericProperty, Property};
12use fuchsia_inspect_derive::{AttachError, Inspect};
13use fuchsia_sync::Mutex;
14use futures::channel::{mpsc, oneshot};
15use futures::stream::{Stream, StreamExt};
16use futures::task::{Context, Poll};
17use futures::{Future, FutureExt, TryFutureExt};
18use log::{info, warn};
19use std::collections::hash_map::Entry;
20use std::collections::{HashMap, HashSet};
21use std::pin::Pin;
22use std::sync::Arc;
23use {bt_avdtp as avdtp, fuchsia_async as fasync};
24
25use crate::codec::CodecNegotiation;
26use crate::peer::Peer;
27use crate::permits::Permits;
28use crate::stream::StreamsBuilder;
29
/// Per-peer statistics that are exported through inspect.
struct PeerStats {
    /// Identifier of the peer these statistics describe; recorded as the "id" string on attach.
    id: PeerId,
    /// Node that the descriptor, id and connection counter are recorded under.
    inspect_node: inspect::Node,
    /// Counter bumped by `record_connected`; exported as "connection_count".
    connection_count: inspect::UintProperty,
}
38
39impl PeerStats {
40 fn new(id: PeerId) -> Self {
41 Self { id, inspect_node: Default::default(), connection_count: Default::default() }
42 }
43
44 fn set_descriptor(&mut self, descriptor: &ProfileDescriptor) {
45 self.inspect_node.record_string("descriptor", descriptor.debug());
46 }
47
48 fn record_connected(&mut self) {
49 let _ = self.connection_count.add(1);
50 }
51}
52
53impl Inspect for &mut PeerStats {
54 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
55 self.inspect_node = parent.create_child(name.as_ref());
56 self.inspect_node.record_string("id", self.id.to_string());
57 self.connection_count = self.inspect_node.create_uint("connection_count", 0);
58 Ok(())
59 }
60}
61
/// Information about peers that have been reported through `insert`, whether or not they are
/// currently connected.
#[derive(Default)]
struct DiscoveredPeers {
    /// Latest profile descriptor for each peer, together with every endpoint direction that has
    /// been reported for it so far (directions accumulate across `insert` calls).
    descriptors: HashMap<PeerId, (ProfileDescriptor, HashSet<avdtp::EndpointType>)>,
    /// Inspect-backed statistics for each peer, created the first time the peer is inserted.
    stats: HashMap<PeerId, PeerStats>,
    /// Parent node for the per-peer statistics nodes.
    inspect_node: inspect::Node,
}
73
74impl DiscoveredPeers {
75 fn insert(
76 &mut self,
77 id: PeerId,
78 descriptor: ProfileDescriptor,
79 directions: HashSet<avdtp::EndpointType>,
80 ) {
81 self.stats
82 .entry(id)
83 .or_insert_with(|| {
84 let mut new_stats = PeerStats::new(id);
85 let _ = new_stats.iattach(&self.inspect_node, inspect::unique_name("peer_"));
86 new_stats
87 })
88 .set_descriptor(&descriptor);
89
90 match self.descriptors.entry(id) {
91 Entry::Occupied(mut entry) => {
92 entry.get_mut().0 = descriptor;
93 entry.get_mut().1.extend(&directions);
94 }
95 Entry::Vacant(entry) => {
96 let _ = entry.insert((descriptor, directions));
97 }
98 };
99 }
100
101 fn connected(&mut self, id: PeerId) {
102 if let Some(stats) = self.stats.get_mut(&id) {
103 stats.record_connected();
104 }
105 }
106
107 fn get(&self, id: &PeerId) -> Option<(ProfileDescriptor, Option<avdtp::EndpointType>)> {
109 self.descriptors.get(id).map(|(desc, dirs)| (desc.clone(), find_preferred_direction(dirs)))
110 }
111}
112
113impl Inspect for &mut DiscoveredPeers {
114 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
115 self.inspect_node = parent.create_child(name.as_ref());
116 Ok(())
117 }
118}
119
120fn find_preferred_direction(
123 directions: &HashSet<avdtp::EndpointType>,
124) -> Option<avdtp::EndpointType> {
125 if directions.len() == 1 {
126 directions.iter().next().cloned()
127 } else {
128 None
131 }
132}
133
134async fn connect_peer(
136 proxy: ProfileProxy,
137 id: PeerId,
138 channel_params: ChannelParameters,
139) -> Result<Channel, Error> {
140 info!(id:%; "Connecting to peer");
141 let connect_fut = proxy.connect(
142 &id.into(),
143 &bredr::ConnectParameters::L2cap(bredr::L2capParameters {
144 psm: Some(bredr::PSM_AVDTP),
145 parameters: Some(channel_params),
146 ..Default::default()
147 }),
148 );
149 let channel = match connect_fut.await {
150 Err(e) => {
151 warn!(id:%, e:?; "FIDL error on connect");
152 return Err(e.into());
153 }
154 Ok(Err(e)) => return Err(format_err!("Bluetooth connect error: {e:?}")),
155 Ok(Ok(channel)) => channel,
156 };
157
158 let channel = channel
159 .try_into()
160 .map_err(|e| format_err!("Couldn't convert FIDL to BT channel: {e:?}"))?;
161 Ok(channel)
162}
163
/// Tracks the peers that are connected, the connection attempts in flight, and what is known
/// about peers that have been discovered.
pub struct ConnectedPeers {
    /// The set of connected peers, keyed by peer id.
    connected: DetachableMap<PeerId, Peer>,
    /// Tasks for connection attempts started by `try_connect`, one per peer.
    connection_attempts: Mutex<HashMap<PeerId, fasync::Task<()>>>,
    /// Descriptors, directions and statistics for peers reported via `found`.
    discovered: Mutex<DiscoveredPeers>,
    /// Produces the streams for each new peer and the codec negotiation used to start streaming.
    streams_builder: StreamsBuilder,
    /// Permits handed to each peer that is created.
    permits: Permits,
    /// Profile proxy used to connect to peers; a clone is given to each peer.
    profile: ProfileProxy,
    /// Metrics logger; a clone is given to each peer.
    metrics: bt_metrics::MetricsLogger,
    /// Inspect node that peers, the streams builder and discovered peers attach under.
    inspect: inspect::Node,
    /// Inspect property mirroring `preferred_peer_direction`.
    inspect_peer_direction: inspect::StringProperty,
    /// Senders for the streams handed out by `connected_stream`.
    connected_peer_senders: Mutex<Vec<mpsc::Sender<DetachableWeak<PeerId, Peer>>>>,
    /// Tasks that wait out the initiator delay and then try to start streaming, one per peer.
    start_stream_tasks: Mutex<HashMap<PeerId, fasync::Task<()>>>,
    /// Direction used for negotiation when a peer has no preferred direction of its own.
    /// Starts as `Sink`.
    preferred_peer_direction: Mutex<avdtp::EndpointType>,
}
195
196impl ConnectedPeers {
197 pub fn new(
198 streams_builder: StreamsBuilder,
199 permits: Permits,
200 profile: ProfileProxy,
201 metrics: bt_metrics::MetricsLogger,
202 ) -> Self {
203 Self {
204 connected: DetachableMap::new(),
205 connection_attempts: Mutex::new(HashMap::new()),
206 discovered: Default::default(),
207 streams_builder,
208 profile,
209 permits,
210 inspect: inspect::Node::default(),
211 inspect_peer_direction: inspect::StringProperty::default(),
212 metrics,
213 connected_peer_senders: Default::default(),
214 start_stream_tasks: Default::default(),
215 preferred_peer_direction: Mutex::new(avdtp::EndpointType::Sink),
216 }
217 }
218
219 pub(crate) fn get_weak(&self, id: &PeerId) -> Option<DetachableWeak<PeerId, Peer>> {
220 self.connected.get(id)
221 }
222
223 pub(crate) fn get(&self, id: &PeerId) -> Option<Arc<Peer>> {
224 self.get_weak(id).and_then(|p| p.upgrade())
225 }
226
227 pub fn is_connected(&self, id: &PeerId) -> bool {
228 self.connected.contains_key(id)
229 }
230
231 async fn start_streaming(
236 peer: &DetachableWeak<PeerId, Peer>,
237 negotiation: CodecNegotiation,
238 ) -> Result<(), anyhow::Error> {
239 let remote_streams = {
240 let strong = peer.upgrade().ok_or_else(|| format_err!("Disconnected"))?;
241 if strong.streaming_active() {
242 return Ok(());
243 }
244 strong.collect_capabilities()
245 }
246 .await?;
247
248 let (negotiated, remote_seid) = negotiation
249 .select(&remote_streams)
250 .ok_or_else(|| format_err!("No compatible stream found"))?;
251
252 let strong = peer.upgrade().ok_or_else(|| format_err!("Disconnected"))?;
253 if strong.streaming_active() {
254 let peer_id = peer.key();
255 info!(peer_id:%; "Not starting streaming, it's already started");
256 return Ok(());
257 }
258 strong.stream_start(remote_seid, negotiated).await.map_err(Into::into)
259 }
260
261 pub fn found(
262 &self,
263 id: PeerId,
264 desc: ProfileDescriptor,
265 preferred_directions: HashSet<avdtp::EndpointType>,
266 ) {
267 self.discovered.lock().insert(id, desc.clone(), preferred_directions);
268 if let Some(peer) = self.get(&id) {
269 let _ = peer.set_descriptor(desc);
270 }
271 }
272
273 pub fn set_preferred_peer_direction(&self, direction: avdtp::EndpointType) {
274 *self.preferred_peer_direction.lock() = direction;
275 self.inspect_peer_direction.set(&format!("{direction:?}"));
276 }
277
278 pub fn preferred_peer_direction(&self) -> avdtp::EndpointType {
279 *self.preferred_peer_direction.lock()
280 }
281
282 pub fn try_connect(
283 &self,
284 id: PeerId,
285 channel_params: ChannelParameters,
286 ) -> impl Future<Output = Result<Option<Channel>, Error>> {
287 let proxy = self.profile.clone();
288 let connected = self.is_connected(&id);
289 let (sender, recv) = oneshot::channel();
290 let recv =
291 recv.map_ok_or_else(|_e| Err(format_err!("Connection task canceled")), Into::into);
292 if connected {
293 if let Err(e) = sender.send(Ok(None)) {
294 warn!(id:%, e:?; "Failed to notify already-connected");
295 }
296 return recv;
297 }
298 let mut attempts = self.connection_attempts.lock();
299 if let Some(previous_connect_task) = attempts.remove(&id) {
300 if previous_connect_task.now_or_never().is_none() {
302 warn!(id:%; "Cancelling previous connect attempt");
303 }
304 }
305 let connect_task = fasync::Task::spawn(async move {
306 if let Err(e) = sender.send(connect_peer(proxy, id, channel_params).await.map(Some)) {
307 warn!(id:%, e:?; "Failed to send channel connect result");
308 }
309 });
310 let _ = attempts.insert(id, connect_task);
311 recv
312 }
313
314 pub async fn connected(
319 &self,
320 id: PeerId,
321 channel: Channel,
322 initiator_delay: Option<zx::MonotonicDuration>,
323 ) -> Result<DetachableWeak<PeerId, Peer>, Error> {
324 if let Some(weak) = self.get_weak(&id) {
325 let peer =
326 weak.upgrade().ok_or_else(|| format_err!("Disconnected connecting transport"))?;
327 if let Err(e) = peer.receive_channel(channel) {
328 warn!(id:%, e:%; "failed to connect channel");
329 return Err(e.into());
330 }
331 return Ok(weak);
332 }
333
334 let entry = self.connected.lazy_entry(&id);
335
336 info!(id:%; "peer connected");
337 let avdtp_peer = avdtp::Peer::new(channel);
338
339 let mut peer = Peer::create(
340 id,
341 avdtp_peer,
342 self.streams_builder.peer_streams(&id, None).await?,
343 Some(self.permits.clone()),
344 self.profile.clone(),
345 self.metrics.clone(),
346 );
347
348 self.discovered.lock().connected(id);
349
350 let peer_preferred_direction = if let Some((desc, dir)) = self.discovered.lock().get(&id) {
351 let _ = peer.set_descriptor(desc);
352 dir
353 } else {
354 None
355 };
356
357 if let Err(e) = peer.iattach(&self.inspect, inspect::unique_name("peer_")) {
358 warn!(id:%, e:?; "Couldn't attach inspect");
359 }
360
361 let closed_fut = peer.closed();
362 let peer = match entry.try_insert(peer) {
363 Err(_peer) => {
364 warn!(id:%; "Peer connected while we were setting up");
365 return self.get_weak(&id).ok_or_else(|| format_err!("Peer missing"));
366 }
367 Ok(weak_peer) => weak_peer,
368 };
369
370 if let Some(delay) = initiator_delay {
371 let peer = peer.clone();
372 let peer_id = peer.key().clone();
373 let negotiation = self
376 .streams_builder
377 .negotiation(
378 &id,
379 None,
380 peer_preferred_direction.unwrap_or_else(|| self.preferred_peer_direction()),
381 )
382 .await?;
383 let start_stream_task = fuchsia_async::Task::local(async move {
384 let delay_sec = delay.into_millis() as f64 / 1000.0;
385 info!(id:% = peer.key(); "dwelling {delay_sec}s for peer initiation");
386 fasync::Timer::new(fasync::MonotonicInstant::after(delay)).await;
387
388 if let Err(e) = ConnectedPeers::start_streaming(&peer, negotiation).await {
389 info!(id:% = peer.key(), e:?; "Peer start streaming failed");
390 peer.detach();
391 }
392 });
393 if self.start_stream_tasks.lock().insert(peer_id, start_stream_task).is_some() {
394 info!(peer_id:%; "Replacing previous start stream dwell");
395 }
396 }
397
398 fasync::Task::local(async move {
400 closed_fut.await;
401 peer.detach();
402 })
403 .detach();
404
405 let peer = self.get_weak(&id).ok_or_else(|| format_err!("Peer missing"))?;
406 self.notify_connected(&peer);
407 Ok(peer)
408 }
409
410 fn notify_connected(&self, peer: &DetachableWeak<PeerId, Peer>) {
412 let mut senders = self.connected_peer_senders.lock();
413 senders.retain_mut(|sender| sender.try_send(peer.clone()).is_ok());
414 }
415
416 pub fn connected_stream(&self) -> PeerConnections {
418 let (sender, receiver) = mpsc::channel(0);
419 self.connected_peer_senders.lock().push(sender);
420 PeerConnections { stream: receiver }
421 }
422}
423
424impl Inspect for &mut ConnectedPeers {
425 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
426 self.inspect = parent.create_child(name.as_ref());
427 let peer_dir_str = format!("{:?}", self.preferred_peer_direction());
428 self.inspect_peer_direction =
429 self.inspect.create_string("preferred_peer_direction", peer_dir_str);
430 self.streams_builder.iattach(&self.inspect, "streams_builder")?;
431 self.discovered.lock().iattach(&self.inspect, "discovered")
432 }
433}
434
/// Stream of newly connected peers, handed out by `ConnectedPeers::connected_stream`.
pub struct PeerConnections {
    /// Receiving half of the channel that `ConnectedPeers` sends each connected peer on.
    stream: mpsc::Receiver<DetachableWeak<PeerId, Peer>>,
}
440
441impl Stream for PeerConnections {
442 type Item = DetachableWeak<PeerId, Peer>;
443
444 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
445 self.stream.poll_next_unpin(cx)
446 }
447}
448
449#[cfg(test)]
450mod tests {
451 use super::*;
452
453 use async_utils::PollExt;
454 use bt_avdtp::{Request, ServiceCapability};
455 use diagnostics_assertions::assert_data_tree;
456 use fidl::endpoints::create_proxy_and_stream;
457 use fidl_fuchsia_bluetooth_bredr::{
458 AudioOffloadExtProxy, ProfileMarker, ProfileRequestStream, ServiceClassProfileIdentifier,
459 };
460 use futures::future::BoxFuture;
461 use std::pin::pin;
462
463 use crate::codec::MediaCodecConfig;
464 use crate::media_task::{MediaTaskBuilder, MediaTaskError, MediaTaskRunner};
465 use crate::media_types::*;
466
467 fn run_to_stalled(exec: &mut fasync::TestExecutor) {
468 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
469 }
470
    /// Checks that `peer` is wired to `remote` by running one AVDTP discovery exchange: the peer
    /// sends a Discover request, the remote end answers with a single audio source endpoint, and
    /// the peer's discover future must then resolve successfully.
    fn exercise_avdtp(exec: &mut fasync::TestExecutor, remote: Channel, peer: &Peer) {
        let remote_avdtp = avdtp::Peer::new(remote);
        let mut remote_requests = remote_avdtp.take_request_stream();

        let avdtp = peer.avdtp();
        let discover_fut = avdtp.discover();

        let mut discover_fut = pin!(discover_fut);

        // Nothing has answered yet, so the discovery cannot be complete.
        assert!(exec.run_until_stalled(&mut discover_fut).is_pending());

        // The request must have arrived at the remote end.
        let responder = match exec.run_until_stalled(&mut remote_requests.next()) {
            Poll::Ready(Some(Ok(Request::Discover { responder }))) => responder,
            x => panic!("Expected a Ready Discovery request but got {:?}", x),
        };

        let endpoint_id = avdtp::StreamEndpointId::try_from(1).expect("endpointid creation");

        let information = avdtp::StreamInformation::new(
            endpoint_id,
            false,
            avdtp::MediaType::Audio,
            avdtp::EndpointType::Source,
        );

        responder.send(&[information]).expect("Sending response should have worked");

        // With the response delivered, the discovery completes.
        let _stream_infos = match exec.run_until_stalled(&mut discover_fut) {
            Poll::Ready(Ok(infos)) => infos,
            x => panic!("Expected a Ready response but got {:?}", x),
        };
    }
504
505 fn setup_connected_peer_test(
506 ) -> (fasync::TestExecutor, PeerId, ConnectedPeers, ProfileRequestStream) {
507 let exec = fasync::TestExecutor::new();
508 let (proxy, stream) = create_proxy_and_stream::<ProfileMarker>();
509 let id = PeerId(1);
510
511 let peers = ConnectedPeers::new(
512 StreamsBuilder::default(),
513 Permits::new(1),
514 proxy,
515 bt_metrics::MetricsLogger::default(),
516 );
517
518 (exec, id, peers, stream)
519 }
520
521 #[fuchsia::test]
522 fn connect_creates_peer() {
523 let (mut exec, id, peers, _stream) = setup_connected_peer_test();
524
525 let (remote, channel) = Channel::create();
526
527 let peer = exec
528 .run_singlethreaded(peers.connected(id, channel, None))
529 .expect("peer should connect");
530 let peer = peer.upgrade().expect("peer should be connected");
531
532 exercise_avdtp(&mut exec, remote, &peer);
533 }
534
    /// Every stream returned by `connected_stream` is told about a new peer, and dropping one
    /// stream does not prevent the remaining stream from hearing about later peers.
    #[fuchsia::test]
    fn connect_notifies_streams() {
        let (mut exec, id, peers, _stream) = setup_connected_peer_test();

        let (remote, channel) = Channel::create();

        let mut peer_stream = peers.connected_stream();
        let mut peer_stream_two = peers.connected_stream();

        let peer = exec
            .run_singlethreaded(peers.connected(id, channel, None))
            .expect("peer should connect");
        let peer = peer.upgrade().expect("peer should be connected");

        // Both streams produce the peer that just connected.
        let weak = exec.run_singlethreaded(peer_stream.next()).expect("peer stream to produce");
        assert_eq!(weak.key(), &id);
        let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
        assert_eq!(weak.key(), &id);

        exercise_avdtp(&mut exec, remote, &peer);

        // Drop the first stream; the second one must keep working.
        drop(peer_stream);

        let id2 = PeerId(2);
        let (remote2, channel2) = Channel::create();
        let peer2 = exec
            .run_singlethreaded(peers.connected(id2, channel2, None))
            .expect("peer should connect");
        let peer2 = peer2.upgrade().expect("peer two should be connected");

        let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
        assert_eq!(weak.key(), &id2);

        exercise_avdtp(&mut exec, remote2, &peer2);
    }
572
573 #[fuchsia::test]
574 fn find_preferred_direction_returns_correct_endpoints() {
575 let empty = HashSet::new();
576 assert_eq!(find_preferred_direction(&empty), None);
577
578 let sink_only = HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter());
579 assert_eq!(find_preferred_direction(&sink_only), Some(avdtp::EndpointType::Sink));
580
581 let source_only = HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter());
582 assert_eq!(find_preferred_direction(&source_only), Some(avdtp::EndpointType::Source));
583
584 let both = HashSet::from_iter(
585 vec![avdtp::EndpointType::Sink, avdtp::EndpointType::Source].into_iter(),
586 );
587 assert_eq!(find_preferred_direction(&both), None);
588 }
589
    // Local stream endpoint ids that the negotiation tests expect to see in requests.
    // NOTE(review): presumably these match the ids `StreamsBuilder` gives to the builders added
    // in `setup_negotiation_test` (AAC sink, SBC sink, SBC source, in that order) - confirm.

    /// Expected id of the local AAC sink endpoint.
    const AAC_SEID: u8 = 8;
    /// Expected id of the local SBC sink endpoint.
    const SBC_SINK_SEID: u8 = 9;
    /// Expected id of the local SBC source endpoint.
    const SBC_SOURCE_SEID: u8 = 10;
596
597 fn aac_sink_codec() -> avdtp::ServiceCapability {
598 AacCodecInfo::new(
599 AacObjectType::MANDATORY_SNK,
600 AacSamplingFrequency::MANDATORY_SNK,
601 AacChannels::MANDATORY_SNK,
602 true,
603 0, )
605 .unwrap()
606 .into()
607 }
608
609 fn sbc_sink_codec() -> avdtp::ServiceCapability {
610 SbcCodecInfo::new(
611 SbcSamplingFrequency::MANDATORY_SNK,
612 SbcChannelMode::MANDATORY_SNK,
613 SbcBlockCount::MANDATORY_SNK,
614 SbcSubBands::MANDATORY_SNK,
615 SbcAllocation::MANDATORY_SNK,
616 SbcCodecInfo::BITPOOL_MIN,
617 SbcCodecInfo::BITPOOL_MAX,
618 )
619 .unwrap()
620 .into()
621 }
622
623 fn sbc_source_codec() -> avdtp::ServiceCapability {
624 SbcCodecInfo::new(
625 SbcSamplingFrequency::FREQ48000HZ,
626 SbcChannelMode::JOINT_STEREO,
627 SbcBlockCount::MANDATORY_SRC,
628 SbcSubBands::MANDATORY_SRC,
629 SbcAllocation::MANDATORY_SRC,
630 SbcCodecInfo::BITPOOL_MIN,
631 SbcCodecInfo::BITPOOL_MAX,
632 )
633 .unwrap()
634 .into()
635 }
636
    /// Media task builder for tests that supports exactly one codec capability in one direction.
    #[derive(Clone)]
    struct FakeBuilder {
        /// The only capability this builder reports and accepts.
        capability: avdtp::ServiceCapability,
        /// Endpoint direction reported by `direction()`.
        direction: avdtp::EndpointType,
    }
642
643 impl MediaTaskBuilder for FakeBuilder {
644 fn configure(
645 &self,
646 _peer_id: &PeerId,
647 codec_config: &MediaCodecConfig,
648 ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError> {
649 if self.capability.codec_type() == Some(codec_config.codec_type()) {
650 return Ok(Box::new(FakeRunner {}));
651 }
652 Err(MediaTaskError::Other(String::from("Unsupported configuring")))
653 }
654
655 fn direction(&self) -> bt_avdtp::EndpointType {
656 self.direction
657 }
658
659 fn supported_configs(
660 &self,
661 _peer_id: &PeerId,
662 _offload: Option<AudioOffloadExtProxy>,
663 ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>> {
664 futures::future::ready(Ok(vec![(&self.capability).try_into().unwrap()])).boxed()
665 }
666 }
667
668 struct FakeRunner {}
669
670 impl MediaTaskRunner for FakeRunner {
671 fn start(
672 &mut self,
673 _stream: avdtp::MediaStream,
674 _offload: Option<AudioOffloadExtProxy>,
675 ) -> Result<Box<dyn crate::media_task::MediaTask>, MediaTaskError> {
676 Err(MediaTaskError::Other(String::from("unimplemented starting")))
677 }
678 }
679
680 fn setup_negotiation_test() -> (
684 fasync::TestExecutor,
685 ConnectedPeers,
686 ProfileRequestStream,
687 ServiceCapability,
688 ServiceCapability,
689 ) {
690 let exec = fasync::TestExecutor::new_with_fake_time();
691 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000));
692 let (proxy, stream) = create_proxy_and_stream::<ProfileMarker>();
693
694 let aac_sink_codec = aac_sink_codec();
695 let sbc_sink_codec = sbc_sink_codec();
696 let aac_sink_builder = FakeBuilder {
697 capability: aac_sink_codec.clone(),
698 direction: avdtp::EndpointType::Sink,
699 };
700 let sbc_sink_builder = FakeBuilder {
701 capability: sbc_sink_codec.clone(),
702 direction: avdtp::EndpointType::Sink,
703 };
704 let sbc_source_builder =
705 FakeBuilder { capability: sbc_source_codec(), direction: avdtp::EndpointType::Source };
706
707 let mut streams_builder = StreamsBuilder::default();
708 streams_builder.add_builder(aac_sink_builder);
709 streams_builder.add_builder(sbc_sink_builder);
710 streams_builder.add_builder(sbc_source_builder);
711
712 let peers = ConnectedPeers::new(
713 streams_builder,
714 Permits::new(1),
715 proxy,
716 bt_metrics::MetricsLogger::default(),
717 );
718
719 (exec, peers, stream, sbc_sink_codec, aac_sink_codec)
720 }
721
    /// When the remote peer configures a stream before the initiator delay expires, the delayed
    /// start does not send any request to the peer.
    #[fuchsia::test]
    fn streaming_start_with_streaming_peer_is_noop() {
        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
        let id = PeerId(1);
        let (remote, channel) = Channel::create();
        let remote = avdtp::Peer::new(remote);

        let delay = zx::MonotonicDuration::from_seconds(1);

        let mut remote_requests = remote.take_request_stream();

        // Connect the peer with an initiator delay, then let background tasks settle.
        let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
        assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        // The remote peer configures the local SBC sink endpoint before the delay is up.
        let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
        let config_caps = &[ServiceCapability::MediaTransport, sbc_codec];
        let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
        let mut set_config_fut = pin!(set_config_fut);
        match exec.run_until_stalled(&mut set_config_fut) {
            Poll::Ready(Ok(())) => {}
            x => panic!("Expected set config to be ready and Ok, got {:?}", x),
        };

        // Move the fake clock just past the delay so the dwell timer fires.
        exec.set_fake_time(
            fasync::MonotonicInstant::after(delay) + zx::MonotonicDuration::from_micros(1),
        );
        let _ = exec.wake_expired_timers();

        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        // No request (discovery or otherwise) was sent to the remote peer.
        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
    }
762
763 fn sbc_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
764 let remote_sbc_seid: avdtp::StreamEndpointId = 1u8.try_into().unwrap();
765 let info = avdtp::StreamInformation::new(
766 remote_sbc_seid.clone(),
767 false,
768 avdtp::MediaType::Audio,
769 avdtp::EndpointType::Source,
770 );
771 (remote_sbc_seid, info)
772 }
773
774 fn aac_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
775 let remote_aac_seid: avdtp::StreamEndpointId = 2u8.try_into().unwrap();
776 let info = avdtp::StreamInformation::new(
777 remote_aac_seid.clone(),
778 false,
779 avdtp::MediaType::Audio,
780 avdtp::EndpointType::Source,
781 );
782 (remote_aac_seid, info)
783 }
784
785 fn sbc_sink_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
786 let remote_sbc_seid: avdtp::StreamEndpointId = 3u8.try_into().unwrap();
787 let info = avdtp::StreamInformation::new(
788 remote_sbc_seid.clone(),
789 false,
790 avdtp::MediaType::Audio,
791 avdtp::EndpointType::Sink,
792 );
793 (remote_sbc_seid, info)
794 }
795
796 fn expect_peer_discovery(
799 exec: &mut fasync::TestExecutor,
800 requests: &mut avdtp::RequestStream,
801 response: Vec<avdtp::StreamInformation>,
802 ) {
803 match exec.run_until_stalled(&mut requests.next()) {
804 Poll::Ready(Some(Ok(avdtp::Request::Discover { responder }))) => {
805 responder.send(&response).expect("response succeeds");
806 }
807 x => panic!("Expected a discovery request to be sent after delay, got {:?}", x),
808 };
809 }
810
    /// The remote peer configures a stream after the delayed start has begun discovery. The
    /// local side may still collect capabilities, but must not send any other kind of request.
    #[fuchsia::test]
    fn streaming_start_configure_while_discovery() {
        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
        let id = PeerId(1);
        let (remote, channel) = Channel::create();
        let remote = avdtp::Peer::new(remote);

        let delay = zx::MonotonicDuration::from_seconds(1);

        let mut remote_requests = remote.take_request_stream();

        let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
        assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        // Let the initiator delay expire; the local side starts by discovering endpoints.
        exec.set_fake_time(
            fasync::MonotonicInstant::after(delay) + zx::MonotonicDuration::from_micros(1),
        );
        let _ = exec.wake_expired_timers();
        expect_peer_discovery(
            &mut exec,
            &mut remote_requests,
            vec![sbc_source_endpoint().1, aac_source_endpoint().1],
        );

        // Meanwhile the remote peer configures the local SBC sink endpoint itself.
        let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
        let config_caps = &[ServiceCapability::MediaTransport, sbc_codec.clone()];
        let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
        let mut set_config_fut = pin!(set_config_fut);
        match exec.run_until_stalled(&mut set_config_fut) {
            Poll::Ready(Ok(())) => {}
            x => panic!("Expected set config to be ready and Ok, got {:?}", x),
        };

        // Answer capability requests until the local side goes quiet; any other request is a
        // failure.
        loop {
            match exec.run_until_stalled(&mut remote_requests.next()) {
                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { responder, .. }))) => {
                    responder
                        .send(&[avdtp::ServiceCapability::MediaTransport, sbc_codec.clone()])
                        .expect("respond succeeds");
                }
                Poll::Ready(x) => panic!("Got unexpected request: {:?}", x),
                Poll::Pending => break,
            }
        }
    }
861
    /// A direction reported for the peer itself takes precedence over the system-wide preferred
    /// direction: the system prefers Source, the peer was found with only Sink, and the
    /// configuration that gets requested pairs the peer's sink endpoint with the local SBC source.
    #[fuchsia::test]
    fn connect_initiation_uses_biased_codec_negotiation_by_peer() {
        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
        let id = PeerId(1);
        let (remote, channel) = Channel::create();

        // System-wide preference, expected to be overridden by the peer's own direction.
        peers.set_preferred_peer_direction(avdtp::EndpointType::Source);

        let remote = avdtp::Peer::new(remote);
        let desc = ProfileDescriptor {
            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
            major_version: Some(1),
            minor_version: Some(2),
            ..Default::default()
        };
        // The peer is discovered with a single direction, which becomes its preferred direction.
        let preferred_direction = vec![avdtp::EndpointType::Sink];
        let delay = zx::MonotonicDuration::from_seconds(1);
        peers.found(id, desc, HashSet::from_iter(preferred_direction.into_iter()));

        let connected_fut = peers.connected(id, channel, Some(delay));
        let mut connected_fut = std::pin::pin!(connected_fut);
        let _ = exec
            .run_until_stalled(&mut connected_fut)
            .expect("is ready")
            .expect("connect control channel is ok");
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        let mut remote_requests = remote.take_request_stream();

        // Nothing is sent before the delay has elapsed.
        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());

        exec.set_fake_time(fasync::MonotonicInstant::after(
            delay + zx::MonotonicDuration::from_micros(1),
        ));
        let _ = exec.wake_expired_timers();

        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
        let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
        let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
        expect_peer_discovery(
            &mut exec,
            &mut remote_requests,
            vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
        );
        // One capabilities request is expected for each discovered endpoint.
        for _twice in 1..=2 {
            match exec.run_until_stalled(&mut remote_requests.next()) {
                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
                    let codec = match stream_id {
                        id if id == peer_sbc_source_seid => sbc_codec.clone(),
                        id if id == peer_sbc_sink_seid => sbc_codec.clone(),
                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
                    };
                    responder
                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
                        .expect("respond succeeds");
                }
                x => panic!("Expected a ready get capabilities request, got {:?}", x),
            };
        }

        match exec.run_until_stalled(&mut remote_requests.next()) {
            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
                local_stream_id,
                remote_stream_id,
                capabilities: _,
                responder,
            }))) => {
                // The request is seen from the remote peer's side, so `local_stream_id` is the
                // remote peer's endpoint and `remote_stream_id` is the endpoint under test.
                assert_eq!(peer_sbc_sink_seid, local_stream_id);
                let local_sbc_source_seid: avdtp::StreamEndpointId =
                    SBC_SOURCE_SEID.try_into().unwrap();
                assert_eq!(local_sbc_source_seid, remote_stream_id);
                responder.send().expect("response sends");
            }
            x => panic!("Expected a ready set configuration request, got {:?}", x),
        };
    }
948
    /// When the peer has been found with both directions it has no single preferred direction,
    /// so the system-wide preference (Source) decides: the configuration that gets requested
    /// pairs the peer's source endpoint with the local SBC sink.
    #[fuchsia::test]
    fn connect_initiation_uses_biased_codec_negotiation_by_system() {
        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
        let id = PeerId(1);
        let (remote, channel) = Channel::create();

        peers.set_preferred_peer_direction(avdtp::EndpointType::Source);

        let remote = avdtp::Peer::new(remote);
        let desc = ProfileDescriptor {
            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
            major_version: Some(1),
            minor_version: Some(2),
            ..Default::default()
        };
        // Reporting the peer twice accumulates both directions for it.
        peers.found(
            id,
            desc.clone(),
            HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter()),
        );
        peers.found(id, desc, HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter()));

        let delay = zx::MonotonicDuration::from_seconds(1);
        let connect_fut = peers.connected(id, channel, Some(delay));
        let mut connect_fut = std::pin::pin!(connect_fut);
        let _ = exec
            .run_until_stalled(&mut connect_fut)
            .expect("ready")
            .expect("connect control channel is ok");
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        let mut remote_requests = remote.take_request_stream();
        // Nothing is sent before the delay has elapsed.
        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
        exec.set_fake_time(fasync::MonotonicInstant::after(
            delay + zx::MonotonicDuration::from_micros(1),
        ));
        let _ = exec.wake_expired_timers();
        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
        let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
        expect_peer_discovery(
            &mut exec,
            &mut remote_requests,
            vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
        );
        // One capabilities request is expected for each discovered endpoint.
        for _twice in 1..=2 {
            match exec.run_until_stalled(&mut remote_requests.next()) {
                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
                    let codec = match stream_id {
                        id if id == peer_sbc_source_seid => sbc_codec.clone(),
                        id if id == peer_sbc_sink_seid => sbc_codec.clone(),
                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
                    };
                    responder
                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
                        .expect("respond succeeds");
                }
                x => panic!("Expected a ready get capabilities request, got {:?}", x),
            };
        }

        match exec.run_until_stalled(&mut remote_requests.next()) {
            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
                local_stream_id,
                remote_stream_id,
                capabilities: _,
                responder,
            }))) => {
                // The request is seen from the remote peer's side, so `local_stream_id` is the
                // remote peer's endpoint and `remote_stream_id` is the endpoint under test.
                assert_eq!(peer_sbc_source_seid, local_stream_id);
                let local_sbc_sink_seid: avdtp::StreamEndpointId =
                    SBC_SINK_SEID.try_into().unwrap();
                assert_eq!(local_sbc_sink_seid, remote_stream_id);
                responder.send().expect("response sends");
            }
            x => panic!("Expected a ready set configuration request, got {:?}", x),
        };
    }
1038
    /// After the initiator delay, the local side discovers the peer's endpoints, collects their
    /// capabilities and configures a stream. With both an SBC and an AAC source offered by the
    /// peer, the AAC endpoints are the ones that get configured.
    #[fuchsia::test]
    fn connect_initiation_uses_negotiation() {
        let (mut exec, peers, _stream, sbc_codec, aac_codec) = setup_negotiation_test();
        let id = PeerId(1);
        let (remote, channel) = Channel::create();
        let remote = avdtp::Peer::new(remote);

        let delay = zx::MonotonicDuration::from_seconds(1);

        let mut connect_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
        let _ = exec
            .run_until_stalled(&mut connect_fut)
            .expect("ready")
            .expect("connect control channel is ok");

        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        let mut remote_requests = remote.take_request_stream();

        // Nothing is sent before the delay has elapsed.
        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());

        exec.set_fake_time(fasync::MonotonicInstant::after(
            delay + zx::MonotonicDuration::from_micros(1),
        ));
        let _ = exec.wake_expired_timers();

        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

        let (peer_sbc_seid, peer_sbc_endpoint) = sbc_source_endpoint();
        let (peer_aac_seid, peer_aac_endpoint) = aac_source_endpoint();
        expect_peer_discovery(
            &mut exec,
            &mut remote_requests,
            vec![peer_sbc_endpoint, peer_aac_endpoint],
        );
        // One capabilities request is expected for each discovered endpoint; answer each with
        // the codec that belongs to that endpoint.
        for _twice in 1..=2 {
            match exec.run_until_stalled(&mut remote_requests.next()) {
                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
                    let codec = match stream_id {
                        id if id == peer_sbc_seid => sbc_codec.clone(),
                        id if id == peer_aac_seid => aac_codec.clone(),
                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
                    };
                    responder
                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
                        .expect("respond succeeds");
                }
                x => panic!("Expected a ready get capabilities request, got {:?}", x),
            };
        }

        match exec.run_until_stalled(&mut remote_requests.next()) {
            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
                local_stream_id,
                remote_stream_id,
                capabilities: _,
                responder,
            }))) => {
                // The request is seen from the remote peer's side, so `local_stream_id` is the
                // remote peer's endpoint and `remote_stream_id` is the endpoint under test.
                assert_eq!(peer_aac_seid, local_stream_id);
                let local_aac_seid: avdtp::StreamEndpointId = AAC_SEID.try_into().unwrap();
                assert_eq!(local_aac_seid, remote_stream_id);
                responder.send().expect("response sends");
            }
            x => panic!("Expected a ready set configuration request, got {:?}", x),
        };
    }
1109
1110 #[fuchsia::test]
1111 fn connected_peers_inspect() {
1112 let (mut exec, id, mut peers, _stream) = setup_connected_peer_test();
1113
1114 let inspect = inspect::Inspector::default();
1115 peers.iattach(inspect.root(), "peers").expect("should attach to inspect tree");
1116
1117 assert_data_tree!(inspect, root: {
1118 peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Sink" }});
1119
1120 peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
1121
1122 assert_data_tree!(inspect, root: {
1123 peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Source" }});
1124
1125 let (_remote, channel) = Channel::create();
1127 assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1128
1129 assert_data_tree!(inspect, root: {
1130 peers: {
1131 discovered: contains {},
1132 preferred_peer_direction: "Source",
1133 streams_builder: contains {},
1134 peer_0: { id: "0000000000000001", local_streams: contains {} }
1135 }
1136 });
1137 }
1138
1139 #[fuchsia::test]
1140 fn try_connect_cancels_previous_attempt() {
1141 let (mut exec, id, peers, mut profile_stream) = setup_connected_peer_test();
1142
1143 let mut connect_fut = peers.try_connect(id, ChannelParameters::default());
1144
1145 let responder = match exec.run_singlethreaded(profile_stream.next()) {
1147 Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
1148 x => panic!("Expected Profile connect, got {x:?}"),
1149 };
1150
1151 let mut connect_again_fut = peers.try_connect(id, ChannelParameters::default());
1153 let responder_two = match exec.run_singlethreaded(profile_stream.next()) {
1154 Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
1155 x => panic!("Expected Profile connect, got {x:?}"),
1156 };
1157
1158 let first_result = exec.run_singlethreaded(&mut connect_fut);
1159 let _ = first_result.expect_err("Should have an error from first attempt");
1160
1161 responder.send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed)).unwrap();
1163
1164 exec.run_until_stalled(&mut connect_again_fut).expect_pending("shouldn't finish");
1165
1166 let (_remote, local) = Channel::create();
1167 responder_two.send(Ok(local.try_into().unwrap())).unwrap();
1168
1169 let second_result = exec.run_singlethreaded(&mut connect_again_fut);
1170 let _ = second_result.expect("should receive the channel");
1171 }
1172
1173 #[fuchsia::test]
1174 fn connected_peers_peer_disconnect_removes_peer() {
1175 let (mut exec, id, peers, _stream) = setup_connected_peer_test();
1176
1177 let (remote, channel) = Channel::create();
1178
1179 assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1180 run_to_stalled(&mut exec);
1181
1182 drop(remote);
1184
1185 run_to_stalled(&mut exec);
1186
1187 assert!(peers.get(&id).is_none());
1188 }
1189
1190 #[fuchsia::test]
1191 fn connected_peers_reconnect_works() {
1192 let (mut exec, id, peers, _stream) = setup_connected_peer_test();
1193
1194 let (remote, channel) = Channel::create();
1195 assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1196 run_to_stalled(&mut exec);
1197
1198 drop(remote);
1200
1201 run_to_stalled(&mut exec);
1202
1203 assert!(peers.get(&id).is_none());
1204
1205 let (_remote, channel) = Channel::create();
1207
1208 assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1209 run_to_stalled(&mut exec);
1210
1211 assert!(peers.get(&id).is_some());
1213 }
1214}