bt_a2dp/
connected_peers.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{format_err, Error};
6use fidl_fuchsia_bluetooth::ChannelParameters;
7use fidl_fuchsia_bluetooth_bredr::{self as bredr, ProfileDescriptor, ProfileProxy};
8use fuchsia_bluetooth::detachable_map::{DetachableMap, DetachableWeak};
9use fuchsia_bluetooth::inspect::DebugExt;
10use fuchsia_bluetooth::types::{Channel, PeerId};
11use fuchsia_inspect::{self as inspect, NumericProperty, Property};
12use fuchsia_inspect_derive::{AttachError, Inspect};
13use fuchsia_sync::Mutex;
14use futures::channel::{mpsc, oneshot};
15use futures::stream::{Stream, StreamExt};
16use futures::task::{Context, Poll};
17use futures::{Future, FutureExt, TryFutureExt};
18use log::{info, warn};
19use std::collections::hash_map::Entry;
20use std::collections::{HashMap, HashSet};
21use std::pin::Pin;
22use std::sync::Arc;
23use {bt_avdtp as avdtp, fuchsia_async as fasync};
24
25use crate::codec::CodecNegotiation;
26use crate::peer::Peer;
27use crate::permits::Permits;
28use crate::stream::StreamsBuilder;
29
/// Statistics node for tracking various information about a peer that has been encountered.
/// Typically used as an inspect tree node.
///
/// The inspect handles start out as defaults (see `PeerStats::new`) and are replaced with real
/// ones when the stats are attached to a parent node through `iattach`.
struct PeerStats {
    /// Identifier of the peer these statistics describe.
    id: PeerId,
    /// Inspect node that holds the values recorded for this peer.
    inspect_node: inspect::Node,
    /// The number of times that this peer has been successfully connected to since discovery.
    connection_count: inspect::UintProperty,
}
38
39impl PeerStats {
40    fn new(id: PeerId) -> Self {
41        Self { id, inspect_node: Default::default(), connection_count: Default::default() }
42    }
43
44    fn set_descriptor(&mut self, descriptor: &ProfileDescriptor) {
45        self.inspect_node.record_string("descriptor", descriptor.debug());
46    }
47
48    fn record_connected(&mut self) {
49        let _ = self.connection_count.add(1);
50    }
51}
52
53impl Inspect for &mut PeerStats {
54    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
55        self.inspect_node = parent.create_child(name.as_ref());
56        self.inspect_node.record_string("id", self.id.to_string());
57        self.connection_count = self.inspect_node.create_uint("connection_count", 0);
58        Ok(())
59    }
60}
61
/// Bookkeeping for every peer that has been discovered: the latest descriptor and the
/// advertised endpoint directions, plus per-peer statistics exposed through inspect.
#[derive(Default)]
struct DiscoveredPeers {
    /// The peers that we have discovered, with their descriptors and potential preferred
    /// endpoint directions. Because the same peer can be discovered multiple times, with
    /// potentially different endpoints, we maintain a set of advertised directions.
    descriptors: HashMap<PeerId, (ProfileDescriptor, HashSet<avdtp::EndpointType>)>,
    /// Holds the child nodes which include the ids and profile descriptors for inspect.
    stats: HashMap<PeerId, PeerStats>,
    /// Inspect node, usually at "discovered" in the tree.
    inspect_node: inspect::Node,
}
73
74impl DiscoveredPeers {
75    fn insert(
76        &mut self,
77        id: PeerId,
78        descriptor: ProfileDescriptor,
79        directions: HashSet<avdtp::EndpointType>,
80    ) {
81        self.stats
82            .entry(id)
83            .or_insert_with(|| {
84                let mut new_stats = PeerStats::new(id);
85                let _ = new_stats.iattach(&self.inspect_node, inspect::unique_name("peer_"));
86                new_stats
87            })
88            .set_descriptor(&descriptor);
89
90        match self.descriptors.entry(id) {
91            Entry::Occupied(mut entry) => {
92                entry.get_mut().0 = descriptor;
93                entry.get_mut().1.extend(&directions);
94            }
95            Entry::Vacant(entry) => {
96                let _ = entry.insert((descriptor, directions));
97            }
98        };
99    }
100
101    fn connected(&mut self, id: PeerId) {
102        if let Some(stats) = self.stats.get_mut(&id) {
103            stats.record_connected();
104        }
105    }
106
107    /// Returns the descriptor and preferred endpoint direction associated with the peer `id`.
108    fn get(&self, id: &PeerId) -> Option<(ProfileDescriptor, Option<avdtp::EndpointType>)> {
109        self.descriptors.get(id).map(|(desc, dirs)| (desc.clone(), find_preferred_direction(dirs)))
110    }
111}
112
113impl Inspect for &mut DiscoveredPeers {
114    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
115        self.inspect_node = parent.create_child(name.as_ref());
116        Ok(())
117    }
118}
119
120/// Given a set of endpoint `directions`, returns the preferred direction or None
121/// if both Sink and Source are specified.
122fn find_preferred_direction(
123    directions: &HashSet<avdtp::EndpointType>,
124) -> Option<avdtp::EndpointType> {
125    if directions.len() == 1 {
126        directions.iter().next().cloned()
127    } else {
128        // Otherwise, either there are no A2DP services or both Sink & Source are specified
129        // in which case there is no preferred direction.
130        None
131    }
132}
133
134/// Make an outgoing connection to a peer.
135async fn connect_peer(
136    proxy: ProfileProxy,
137    id: PeerId,
138    channel_params: ChannelParameters,
139) -> Result<Channel, Error> {
140    info!(id:%; "Connecting to peer");
141    let connect_fut = proxy.connect(
142        &id.into(),
143        &bredr::ConnectParameters::L2cap(bredr::L2capParameters {
144            psm: Some(bredr::PSM_AVDTP),
145            parameters: Some(channel_params),
146            ..Default::default()
147        }),
148    );
149    let channel = match connect_fut.await {
150        Err(e) => {
151            warn!(id:%, e:?; "FIDL error on connect");
152            return Err(e.into());
153        }
154        Ok(Err(e)) => return Err(format_err!("Bluetooth connect error: {e:?}")),
155        Ok(Ok(channel)) => channel,
156    };
157
158    let channel = channel
159        .try_into()
160        .map_err(|e| format_err!("Couldn't convert FIDL to BT channel: {e:?}"))?;
161    Ok(channel)
162}
163
/// ConnectedPeers manages the set of connected peers based on discovery, new connection, and
/// peer session lifetime.
///
/// Mutable state is kept behind `Mutex`es so that the methods can operate on `&self`.
pub struct ConnectedPeers {
    /// The set of connected peers.
    connected: DetachableMap<PeerId, Peer>,
    /// Tasks for peers that we are attempting to connect to.
    /// Used to ensure only one outgoing attempt exists at once.
    connection_attempts: Mutex<HashMap<PeerId, fasync::Task<()>>>,
    /// ProfileDescriptors from discovering the peer, stored here even if the peer is disconnected
    discovered: Mutex<DiscoveredPeers>,
    /// Streams builder, provides a set of streams and negotiation when a peer is connected
    streams_builder: StreamsBuilder,
    /// The permits that each peer uses to validate that we can start a stream.
    permits: Permits,
    /// Profile Proxy, used to connect new transport sockets.
    profile: ProfileProxy,
    /// Cobalt logger to use and hand out to peers, if we are using one.
    metrics: bt_metrics::MetricsLogger,
    /// The 'peers' node of the inspect tree. All connected peers own a child node of this node.
    inspect: inspect::Node,
    /// Inspect property that records the current preferred peer direction.
    inspect_peer_direction: inspect::StringProperty,
    /// Listeners for new connected peers
    connected_peer_senders: Mutex<Vec<mpsc::Sender<DetachableWeak<PeerId, Peer>>>>,
    /// Task handles for newly connected peer stream starts.
    // TODO(https://fxbug.dev/42146917): Completed tasks aren't garbage-collected yet.
    start_stream_tasks: Mutex<HashMap<PeerId, fasync::Task<()>>>,
    /// Preferred direction for new peers.  This is the direction we prefer the peer's endpoint to
    /// be, i.e. if we prefer Sink, locally we are Source.
    preferred_peer_direction: Mutex<avdtp::EndpointType>,
}
195
196impl ConnectedPeers {
197    pub fn new(
198        streams_builder: StreamsBuilder,
199        permits: Permits,
200        profile: ProfileProxy,
201        metrics: bt_metrics::MetricsLogger,
202    ) -> Self {
203        Self {
204            connected: DetachableMap::new(),
205            connection_attempts: Mutex::new(HashMap::new()),
206            discovered: Default::default(),
207            streams_builder,
208            profile,
209            permits,
210            inspect: inspect::Node::default(),
211            inspect_peer_direction: inspect::StringProperty::default(),
212            metrics,
213            connected_peer_senders: Default::default(),
214            start_stream_tasks: Default::default(),
215            preferred_peer_direction: Mutex::new(avdtp::EndpointType::Sink),
216        }
217    }
218
    /// Returns a weak handle to the connected peer `id`, or `None` if the connected set has no
    /// entry for it.
    pub(crate) fn get_weak(&self, id: &PeerId) -> Option<DetachableWeak<PeerId, Peer>> {
        self.connected.get(id)
    }
222
223    pub(crate) fn get(&self, id: &PeerId) -> Option<Arc<Peer>> {
224        self.get_weak(id).and_then(|p| p.upgrade())
225    }
226
    /// Returns true if the connected set currently holds an entry for the peer `id`.
    pub fn is_connected(&self, id: &PeerId) -> bool {
        self.connected.contains_key(id)
    }
230
231    /// Attempts to start streaming on `peer` by collecting the remote streaming endpoint
232    /// information, selecting a compatible peer using `negotiation` and starting the stream.
233    /// Does nothing and returns Ok(()) if the peer is already streaming or will start streaming
234    /// on it's own.
235    async fn start_streaming(
236        peer: &DetachableWeak<PeerId, Peer>,
237        negotiation: CodecNegotiation,
238    ) -> Result<(), anyhow::Error> {
239        let remote_streams = {
240            let strong = peer.upgrade().ok_or_else(|| format_err!("Disconnected"))?;
241            if strong.streaming_active() {
242                return Ok(());
243            }
244            strong.collect_capabilities()
245        }
246        .await?;
247
248        let (negotiated, remote_seid) = negotiation
249            .select(&remote_streams)
250            .ok_or_else(|| format_err!("No compatible stream found"))?;
251
252        let strong = peer.upgrade().ok_or_else(|| format_err!("Disconnected"))?;
253        if strong.streaming_active() {
254            let peer_id = peer.key();
255            info!(peer_id:%; "Not starting streaming, it's already started");
256            return Ok(());
257        }
258        strong.stream_start(remote_seid, negotiated).await.map_err(Into::into)
259    }
260
261    pub fn found(
262        &self,
263        id: PeerId,
264        desc: ProfileDescriptor,
265        preferred_directions: HashSet<avdtp::EndpointType>,
266    ) {
267        self.discovered.lock().insert(id, desc.clone(), preferred_directions);
268        if let Some(peer) = self.get(&id) {
269            let _ = peer.set_descriptor(desc);
270        }
271    }
272
273    pub fn set_preferred_peer_direction(&self, direction: avdtp::EndpointType) {
274        *self.preferred_peer_direction.lock() = direction;
275        self.inspect_peer_direction.set(&format!("{direction:?}"));
276    }
277
    /// Returns the direction currently preferred for the endpoints of new peers.
    pub fn preferred_peer_direction(&self) -> avdtp::EndpointType {
        *self.preferred_peer_direction.lock()
    }
281
282    pub fn try_connect(
283        &self,
284        id: PeerId,
285        channel_params: ChannelParameters,
286    ) -> impl Future<Output = Result<Option<Channel>, Error>> {
287        let proxy = self.profile.clone();
288        let connected = self.is_connected(&id);
289        let (sender, recv) = oneshot::channel();
290        let recv =
291            recv.map_ok_or_else(|_e| Err(format_err!("Connection task canceled")), Into::into);
292        if connected {
293            if let Err(e) = sender.send(Ok(None)) {
294                warn!(id:%, e:?; "Failed to notify already-connected");
295            }
296            return recv;
297        }
298        let mut attempts = self.connection_attempts.lock();
299        if let Some(previous_connect_task) = attempts.remove(&id) {
300            // We are the only place that can poll the connect task, check if it finished.
301            if previous_connect_task.now_or_never().is_none() {
302                warn!(id:%; "Cancelling previous connect attempt");
303            }
304        }
305        let connect_task = fasync::Task::spawn(async move {
306            if let Err(e) = sender.send(connect_peer(proxy, id, channel_params).await.map(Some)) {
307                warn!(id:%, e:?; "Failed to send channel connect result");
308            }
309        });
310        let _ = attempts.insert(id, connect_task);
311        recv
312    }
313
314    /// Accept a channel that is connected to the peer `id`.
315    /// If `initiator_delay` is set, attempt to start a stream after the specified delay.
316    /// `initiator_delay` has no effect if the peer already has a control channel.
317    /// Returns a weak peer pointer (even if it was previously connected) if successful.
318    pub async fn connected(
319        &self,
320        id: PeerId,
321        channel: Channel,
322        initiator_delay: Option<zx::MonotonicDuration>,
323    ) -> Result<DetachableWeak<PeerId, Peer>, Error> {
324        if let Some(weak) = self.get_weak(&id) {
325            let peer =
326                weak.upgrade().ok_or_else(|| format_err!("Disconnected connecting transport"))?;
327            if let Err(e) = peer.receive_channel(channel) {
328                warn!(id:%, e:%; "failed to connect channel");
329                return Err(e.into());
330            }
331            return Ok(weak);
332        }
333
334        let entry = self.connected.lazy_entry(&id);
335
336        info!(id:%; "peer connected");
337        let avdtp_peer = avdtp::Peer::new(channel);
338
339        let mut peer = Peer::create(
340            id,
341            avdtp_peer,
342            self.streams_builder.peer_streams(&id, None).await?,
343            Some(self.permits.clone()),
344            self.profile.clone(),
345            self.metrics.clone(),
346        );
347
348        self.discovered.lock().connected(id);
349
350        let peer_preferred_direction = if let Some((desc, dir)) = self.discovered.lock().get(&id) {
351            let _ = peer.set_descriptor(desc);
352            dir
353        } else {
354            None
355        };
356
357        if let Err(e) = peer.iattach(&self.inspect, inspect::unique_name("peer_")) {
358            warn!(id:%, e:?; "Couldn't attach inspect");
359        }
360
361        let closed_fut = peer.closed();
362        let peer = match entry.try_insert(peer) {
363            Err(_peer) => {
364                warn!(id:%; "Peer connected while we were setting up");
365                return self.get_weak(&id).ok_or_else(|| format_err!("Peer missing"));
366            }
367            Ok(weak_peer) => weak_peer,
368        };
369
370        if let Some(delay) = initiator_delay {
371            let peer = peer.clone();
372            let peer_id = peer.key().clone();
373            // Bias the codec negotiation with the peer's preferred direction that was discovered
374            // from the SDP service search.
375            let negotiation = self
376                .streams_builder
377                .negotiation(
378                    &id,
379                    None,
380                    peer_preferred_direction.unwrap_or_else(|| self.preferred_peer_direction()),
381                )
382                .await?;
383            let start_stream_task = fuchsia_async::Task::local(async move {
384                let delay_sec = delay.into_millis() as f64 / 1000.0;
385                info!(id:% = peer.key(); "dwelling {delay_sec}s for peer initiation");
386                fasync::Timer::new(fasync::MonotonicInstant::after(delay)).await;
387
388                if let Err(e) = ConnectedPeers::start_streaming(&peer, negotiation).await {
389                    info!(id:% = peer.key(), e:?; "Peer start streaming failed");
390                    peer.detach();
391                }
392            });
393            if self.start_stream_tasks.lock().insert(peer_id, start_stream_task).is_some() {
394                info!(peer_id:%; "Replacing previous start stream dwell");
395            }
396        }
397
398        // Remove the peer when we disconnect.
399        fasync::Task::local(async move {
400            closed_fut.await;
401            peer.detach();
402        })
403        .detach();
404
405        let peer = self.get_weak(&id).ok_or_else(|| format_err!("Peer missing"))?;
406        self.notify_connected(&peer);
407        Ok(peer)
408    }
409
410    /// Notify the listeners that a new peer has been connected to.
411    fn notify_connected(&self, peer: &DetachableWeak<PeerId, Peer>) {
412        let mut senders = self.connected_peer_senders.lock();
413        senders.retain_mut(|sender| sender.try_send(peer.clone()).is_ok());
414    }
415
416    /// Get a stream that produces peers that have been connected.
417    pub fn connected_stream(&self) -> PeerConnections {
418        let (sender, receiver) = mpsc::channel(0);
419        self.connected_peer_senders.lock().push(sender);
420        PeerConnections { stream: receiver }
421    }
422}
423
424impl Inspect for &mut ConnectedPeers {
425    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
426        self.inspect = parent.create_child(name.as_ref());
427        let peer_dir_str = format!("{:?}", self.preferred_peer_direction());
428        self.inspect_peer_direction =
429            self.inspect.create_string("preferred_peer_direction", peer_dir_str);
430        self.streams_builder.iattach(&self.inspect, "streams_builder")?;
431        self.discovered.lock().iattach(&self.inspect, "discovered")
432    }
433}
434
/// Provides a stream of peers that have been connected to. This stream produces an item whenever
/// an A2DP peer has been connected.  It will produce None when no more peers will be connected.
///
/// Obtained from `ConnectedPeers::connected_stream`.
pub struct PeerConnections {
    /// Receiving half of the channel that connected peers are delivered on.
    stream: mpsc::Receiver<DetachableWeak<PeerId, Peer>>,
}
440
441impl Stream for PeerConnections {
442    type Item = DetachableWeak<PeerId, Peer>;
443
444    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
445        self.stream.poll_next_unpin(cx)
446    }
447}
448
449#[cfg(test)]
450mod tests {
451    use super::*;
452
453    use async_utils::PollExt;
454    use bt_avdtp::{Request, ServiceCapability};
455    use diagnostics_assertions::assert_data_tree;
456    use fidl::endpoints::create_proxy_and_stream;
457    use fidl_fuchsia_bluetooth_bredr::{
458        AudioOffloadExtProxy, ProfileMarker, ProfileRequestStream, ServiceClassProfileIdentifier,
459    };
460    use futures::future::BoxFuture;
461    use std::pin::pin;
462
463    use crate::codec::MediaCodecConfig;
464    use crate::media_task::{MediaTaskBuilder, MediaTaskError, MediaTaskRunner};
465    use crate::media_types::*;
466
467    fn run_to_stalled(exec: &mut fasync::TestExecutor) {
468        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
469    }
470
471    fn exercise_avdtp(exec: &mut fasync::TestExecutor, remote: Channel, peer: &Peer) {
472        let remote_avdtp = avdtp::Peer::new(remote);
473        let mut remote_requests = remote_avdtp.take_request_stream();
474
475        // Should be able to actually communicate via the peer.
476        let avdtp = peer.avdtp();
477        let discover_fut = avdtp.discover();
478
479        let mut discover_fut = pin!(discover_fut);
480
481        assert!(exec.run_until_stalled(&mut discover_fut).is_pending());
482
483        let responder = match exec.run_until_stalled(&mut remote_requests.next()) {
484            Poll::Ready(Some(Ok(Request::Discover { responder }))) => responder,
485            x => panic!("Expected a Ready Discovery request but got {:?}", x),
486        };
487
488        let endpoint_id = avdtp::StreamEndpointId::try_from(1).expect("endpointid creation");
489
490        let information = avdtp::StreamInformation::new(
491            endpoint_id,
492            false,
493            avdtp::MediaType::Audio,
494            avdtp::EndpointType::Source,
495        );
496
497        responder.send(&[information]).expect("Sending response should have worked");
498
499        let _stream_infos = match exec.run_until_stalled(&mut discover_fut) {
500            Poll::Ready(Ok(infos)) => infos,
501            x => panic!("Expected a Ready response but got {:?}", x),
502        };
503    }
504
505    fn setup_connected_peer_test(
506    ) -> (fasync::TestExecutor, PeerId, ConnectedPeers, ProfileRequestStream) {
507        let exec = fasync::TestExecutor::new();
508        let (proxy, stream) = create_proxy_and_stream::<ProfileMarker>();
509        let id = PeerId(1);
510
511        let peers = ConnectedPeers::new(
512            StreamsBuilder::default(),
513            Permits::new(1),
514            proxy,
515            bt_metrics::MetricsLogger::default(),
516        );
517
518        (exec, id, peers, stream)
519    }
520
521    #[fuchsia::test]
522    fn connect_creates_peer() {
523        let (mut exec, id, peers, _stream) = setup_connected_peer_test();
524
525        let (remote, channel) = Channel::create();
526
527        let peer = exec
528            .run_singlethreaded(peers.connected(id, channel, None))
529            .expect("peer should connect");
530        let peer = peer.upgrade().expect("peer should be connected");
531
532        exercise_avdtp(&mut exec, remote, &peer);
533    }
534
535    #[fuchsia::test]
536    fn connect_notifies_streams() {
537        let (mut exec, id, peers, _stream) = setup_connected_peer_test();
538
539        let (remote, channel) = Channel::create();
540
541        let mut peer_stream = peers.connected_stream();
542        let mut peer_stream_two = peers.connected_stream();
543
544        let peer = exec
545            .run_singlethreaded(peers.connected(id, channel, None))
546            .expect("peer should connect");
547        let peer = peer.upgrade().expect("peer should be connected");
548
549        // Peers should have been notified of the new peer
550        let weak = exec.run_singlethreaded(peer_stream.next()).expect("peer stream to produce");
551        assert_eq!(weak.key(), &id);
552        let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
553        assert_eq!(weak.key(), &id);
554
555        exercise_avdtp(&mut exec, remote, &peer);
556
557        // If you drop one stream, the other one should still produce.
558        drop(peer_stream);
559
560        let id2 = PeerId(2);
561        let (remote2, channel2) = Channel::create();
562        let peer2 = exec
563            .run_singlethreaded(peers.connected(id2, channel2, None))
564            .expect("peer should connect");
565        let peer2 = peer2.upgrade().expect("peer two should be connected");
566
567        let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
568        assert_eq!(weak.key(), &id2);
569
570        exercise_avdtp(&mut exec, remote2, &peer2);
571    }
572
573    #[fuchsia::test]
574    fn find_preferred_direction_returns_correct_endpoints() {
575        let empty = HashSet::new();
576        assert_eq!(find_preferred_direction(&empty), None);
577
578        let sink_only = HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter());
579        assert_eq!(find_preferred_direction(&sink_only), Some(avdtp::EndpointType::Sink));
580
581        let source_only = HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter());
582        assert_eq!(find_preferred_direction(&source_only), Some(avdtp::EndpointType::Source));
583
584        let both = HashSet::from_iter(
585            vec![avdtp::EndpointType::Sink, avdtp::EndpointType::Source].into_iter(),
586        );
587        assert_eq!(find_preferred_direction(&both), None);
588    }
589
    // Stream endpoint ids that the negotiation tests expect for the local streams.
    // NOTE(review): presumably these follow from the order in which `setup_negotiation_test`
    // adds its builders -- confirm against the streams builder.

    // Expected chosen ID for the AAC stream endpoint.
    const AAC_SEID: u8 = 8;
    // Expected chosen ID for the SBC sink stream endpoint.
    const SBC_SINK_SEID: u8 = 9;
    // Expected chosen ID for the SBC source stream endpoint.
    const SBC_SOURCE_SEID: u8 = 10;
596
597    fn aac_sink_codec() -> avdtp::ServiceCapability {
598        AacCodecInfo::new(
599            AacObjectType::MANDATORY_SNK,
600            AacSamplingFrequency::MANDATORY_SNK,
601            AacChannels::MANDATORY_SNK,
602            true,
603            0, // 0 = Unknown constant bitrate support (A2DP Sec. 4.5.2.4)
604        )
605        .unwrap()
606        .into()
607    }
608
609    fn sbc_sink_codec() -> avdtp::ServiceCapability {
610        SbcCodecInfo::new(
611            SbcSamplingFrequency::MANDATORY_SNK,
612            SbcChannelMode::MANDATORY_SNK,
613            SbcBlockCount::MANDATORY_SNK,
614            SbcSubBands::MANDATORY_SNK,
615            SbcAllocation::MANDATORY_SNK,
616            SbcCodecInfo::BITPOOL_MIN,
617            SbcCodecInfo::BITPOOL_MAX,
618        )
619        .unwrap()
620        .into()
621    }
622
623    fn sbc_source_codec() -> avdtp::ServiceCapability {
624        SbcCodecInfo::new(
625            SbcSamplingFrequency::FREQ48000HZ,
626            SbcChannelMode::JOINT_STEREO,
627            SbcBlockCount::MANDATORY_SRC,
628            SbcSubBands::MANDATORY_SRC,
629            SbcAllocation::MANDATORY_SRC,
630            SbcCodecInfo::BITPOOL_MIN,
631            SbcCodecInfo::BITPOOL_MAX,
632        )
633        .unwrap()
634        .into()
635    }
636
    /// Test `MediaTaskBuilder` that supports exactly one codec capability in one direction.
    #[derive(Clone)]
    struct FakeBuilder {
        /// The only capability this builder reports and accepts.
        capability: avdtp::ServiceCapability,
        /// The endpoint direction reported by `direction()`.
        direction: avdtp::EndpointType,
    }
642
643    impl MediaTaskBuilder for FakeBuilder {
644        fn configure(
645            &self,
646            _peer_id: &PeerId,
647            codec_config: &MediaCodecConfig,
648        ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError> {
649            if self.capability.codec_type() == Some(codec_config.codec_type()) {
650                return Ok(Box::new(FakeRunner {}));
651            }
652            Err(MediaTaskError::Other(String::from("Unsupported configuring")))
653        }
654
655        fn direction(&self) -> bt_avdtp::EndpointType {
656            self.direction
657        }
658
659        fn supported_configs(
660            &self,
661            _peer_id: &PeerId,
662            _offload: Option<AudioOffloadExtProxy>,
663        ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>> {
664            futures::future::ready(Ok(vec![(&self.capability).try_into().unwrap()])).boxed()
665        }
666    }
667
    /// Test `MediaTaskRunner` handed out by `FakeBuilder::configure`; starting it always fails.
    struct FakeRunner {}
669
670    impl MediaTaskRunner for FakeRunner {
671        fn start(
672            &mut self,
673            _stream: avdtp::MediaStream,
674            _offload: Option<AudioOffloadExtProxy>,
675        ) -> Result<Box<dyn crate::media_task::MediaTask>, MediaTaskError> {
676            Err(MediaTaskError::Other(String::from("unimplemented starting")))
677        }
678    }
679
680    /// Sets up a test in which we expect to select a stream and connect to a peer.
681    /// Returns the executor, connected peers (under test), request stream for profile interaction,
682    /// and an SBC and AAC Sink service capability.
683    fn setup_negotiation_test() -> (
684        fasync::TestExecutor,
685        ConnectedPeers,
686        ProfileRequestStream,
687        ServiceCapability,
688        ServiceCapability,
689    ) {
690        let exec = fasync::TestExecutor::new_with_fake_time();
691        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000));
692        let (proxy, stream) = create_proxy_and_stream::<ProfileMarker>();
693
694        let aac_sink_codec = aac_sink_codec();
695        let sbc_sink_codec = sbc_sink_codec();
696        let aac_sink_builder = FakeBuilder {
697            capability: aac_sink_codec.clone(),
698            direction: avdtp::EndpointType::Sink,
699        };
700        let sbc_sink_builder = FakeBuilder {
701            capability: sbc_sink_codec.clone(),
702            direction: avdtp::EndpointType::Sink,
703        };
704        let sbc_source_builder =
705            FakeBuilder { capability: sbc_source_codec(), direction: avdtp::EndpointType::Source };
706
707        let mut streams_builder = StreamsBuilder::default();
708        streams_builder.add_builder(aac_sink_builder);
709        streams_builder.add_builder(sbc_sink_builder);
710        streams_builder.add_builder(sbc_source_builder);
711
712        let peers = ConnectedPeers::new(
713            streams_builder,
714            Permits::new(1),
715            proxy,
716            bt_metrics::MetricsLogger::default(),
717        );
718
719        (exec, peers, stream, sbc_sink_codec, aac_sink_codec)
720    }
721
722    #[fuchsia::test]
723    fn streaming_start_with_streaming_peer_is_noop() {
724        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
725        let id = PeerId(1);
726        let (remote, channel) = Channel::create();
727        let remote = avdtp::Peer::new(remote);
728
729        let delay = zx::MonotonicDuration::from_seconds(1);
730
731        let mut remote_requests = remote.take_request_stream();
732
733        // This starts the task in the background waiting.
734        let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
735        assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
736        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
737
738        // Before the delay expires, the peer starts the stream.
739
740        let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
741        let config_caps = &[ServiceCapability::MediaTransport, sbc_codec];
742        let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
743        let mut set_config_fut = pin!(set_config_fut);
744        match exec.run_until_stalled(&mut set_config_fut) {
745            Poll::Ready(Ok(())) => {}
746            x => panic!("Expected set config to be ready and Ok, got {:?}", x),
747        };
748
749        // The remote peer doesn't need to actually open, Set Configuration is enough of a signal.
750        // wait for the delay to expire now.
751
752        exec.set_fake_time(
753            fasync::MonotonicInstant::after(delay) + zx::MonotonicDuration::from_micros(1),
754        );
755        let _ = exec.wake_expired_timers();
756
757        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
758
759        // Shouldn't start a discovery, since the stream is scheduled to start already.
760        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
761    }
762
763    fn sbc_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
764        let remote_sbc_seid: avdtp::StreamEndpointId = 1u8.try_into().unwrap();
765        let info = avdtp::StreamInformation::new(
766            remote_sbc_seid.clone(),
767            false,
768            avdtp::MediaType::Audio,
769            avdtp::EndpointType::Source,
770        );
771        (remote_sbc_seid, info)
772    }
773
774    fn aac_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
775        let remote_aac_seid: avdtp::StreamEndpointId = 2u8.try_into().unwrap();
776        let info = avdtp::StreamInformation::new(
777            remote_aac_seid.clone(),
778            false,
779            avdtp::MediaType::Audio,
780            avdtp::EndpointType::Source,
781        );
782        (remote_aac_seid, info)
783    }
784
785    fn sbc_sink_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
786        let remote_sbc_seid: avdtp::StreamEndpointId = 3u8.try_into().unwrap();
787        let info = avdtp::StreamInformation::new(
788            remote_sbc_seid.clone(),
789            false,
790            avdtp::MediaType::Audio,
791            avdtp::EndpointType::Sink,
792        );
793        (remote_sbc_seid, info)
794    }
795
796    /// Expects an AVDTP Discovery request on the `requests` stream. Responds to
797    /// the request with the provided `response` endpoints.
798    fn expect_peer_discovery(
799        exec: &mut fasync::TestExecutor,
800        requests: &mut avdtp::RequestStream,
801        response: Vec<avdtp::StreamInformation>,
802    ) {
803        match exec.run_until_stalled(&mut requests.next()) {
804            Poll::Ready(Some(Ok(avdtp::Request::Discover { responder }))) => {
805                responder.send(&response).expect("response succeeds");
806            }
807            x => panic!("Expected a discovery request to be sent after delay, got {:?}", x),
808        };
809    }
810
811    #[fuchsia::test]
812    fn streaming_start_configure_while_discovery() {
813        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
814        let id = PeerId(1);
815        let (remote, channel) = Channel::create();
816        let remote = avdtp::Peer::new(remote);
817
818        let delay = zx::MonotonicDuration::from_seconds(1);
819
820        let mut remote_requests = remote.take_request_stream();
821
822        // This starts the task in the background waiting.
823        let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
824        assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
825        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
826
827        // The delay expires, and the discovery is start!
828        exec.set_fake_time(
829            fasync::MonotonicInstant::after(delay) + zx::MonotonicDuration::from_micros(1),
830        );
831        let _ = exec.wake_expired_timers();
832        expect_peer_discovery(
833            &mut exec,
834            &mut remote_requests,
835            vec![sbc_source_endpoint().1, aac_source_endpoint().1],
836        );
837
838        // The remote peer doesn't need to actually open, Set Configuration is enough of a signal.
839        let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
840        let config_caps = &[ServiceCapability::MediaTransport, sbc_codec.clone()];
841        let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
842        let mut set_config_fut = pin!(set_config_fut);
843        match exec.run_until_stalled(&mut set_config_fut) {
844            Poll::Ready(Ok(())) => {}
845            x => panic!("Expected set config to be ready and Ok, got {:?}", x),
846        };
847
848        // Can finish the collection process, but not attempt to configure or start a stream.
849        loop {
850            match exec.run_until_stalled(&mut remote_requests.next()) {
851                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { responder, .. }))) => {
852                    responder
853                        .send(&[avdtp::ServiceCapability::MediaTransport, sbc_codec.clone()])
854                        .expect("respond succeeds");
855                }
856                Poll::Ready(x) => panic!("Got unexpected request: {:?}", x),
857                Poll::Pending => break,
858            }
859        }
860    }
861
862    /// Tests connection initiation selects the appropriate stream endpoint based
863    /// on a biased codec negotiation that is set from the peer's discovered services.
864    #[fuchsia::test]
865    fn connect_initiation_uses_biased_codec_negotiation_by_peer() {
866        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
867        let id = PeerId(1);
868        let (remote, channel) = Channel::create();
869
870        // System biases towards the Source direction (called when the AudioMode FIDL changes).
871        peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
872
873        // New fake peer discovered with some descriptor - the peer's SDP entry shows Sink.
874        let remote = avdtp::Peer::new(remote);
875        let desc = ProfileDescriptor {
876            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
877            major_version: Some(1),
878            minor_version: Some(2),
879            ..Default::default()
880        };
881        let preferred_direction = vec![avdtp::EndpointType::Sink];
882        let delay = zx::MonotonicDuration::from_seconds(1);
883        peers.found(id, desc, HashSet::from_iter(preferred_direction.into_iter()));
884
885        let connected_fut = peers.connected(id, channel, Some(delay));
886        let mut connected_fut = std::pin::pin!(connected_fut);
887        let _ = exec
888            .run_until_stalled(&mut connected_fut)
889            .expect("is ready")
890            .expect("connect control channel is ok");
891        // run the start task until it's stalled.
892        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
893
894        let mut remote_requests = remote.take_request_stream();
895
896        // Should wait for the specified amount of time.
897        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
898
899        exec.set_fake_time(fasync::MonotonicInstant::after(
900            delay + zx::MonotonicDuration::from_micros(1),
901        ));
902        let _ = exec.wake_expired_timers();
903
904        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
905        // Even though the peer supports both SBC Sink and Source, we expect to negotiate and start
906        // on the Sink endpoint since that is the peer's preferred one.
907        let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
908        let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
909        expect_peer_discovery(
910            &mut exec,
911            &mut remote_requests,
912            vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
913        );
914        for _twice in 1..=2 {
915            match exec.run_until_stalled(&mut remote_requests.next()) {
916                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
917                    let codec = match stream_id {
918                        id if id == peer_sbc_source_seid => sbc_codec.clone(),
919                        id if id == peer_sbc_sink_seid => sbc_codec.clone(),
920                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
921                    };
922                    responder
923                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
924                        .expect("respond succeeds");
925                }
926                x => panic!("Expected a ready get capabilities request, got {:?}", x),
927            };
928        }
929
930        match exec.run_until_stalled(&mut remote_requests.next()) {
931            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
932                local_stream_id,
933                remote_stream_id,
934                capabilities: _,
935                responder,
936            }))) => {
937                // We expect the set configuration to apply to the remote peer's Sink SEID and the
938                // local Source SEID.
939                assert_eq!(peer_sbc_sink_seid, local_stream_id);
940                let local_sbc_source_seid: avdtp::StreamEndpointId =
941                    SBC_SOURCE_SEID.try_into().unwrap();
942                assert_eq!(local_sbc_source_seid, remote_stream_id);
943                responder.send().expect("response sends");
944            }
945            x => panic!("Expected a ready set configuration request, got {:?}", x),
946        };
947    }
948
949    /// Tests connection initiation selects the appropriate stream endpoint based
950    /// on a biased codec negotiation that is set from by the system (in practice, the AudioMode
951    /// FIDL). This case typically occurs when a peer advertises both sink and source, and therefore
952    /// has no preference for the endpoint direction.
953    #[fuchsia::test]
954    fn connect_initiation_uses_biased_codec_negotiation_by_system() {
955        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
956        let id = PeerId(1);
957        let (remote, channel) = Channel::create();
958
959        // System biases towards the Source direction (called when the AudioMode FIDL changes).
960        peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
961
962        // New fake peer discovered with separate Sink and Source entries.
963        let remote = avdtp::Peer::new(remote);
964        let desc = ProfileDescriptor {
965            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
966            major_version: Some(1),
967            minor_version: Some(2),
968            ..Default::default()
969        };
970        peers.found(
971            id,
972            desc.clone(),
973            HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter()),
974        );
975        peers.found(id, desc, HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter()));
976
977        let delay = zx::MonotonicDuration::from_seconds(1);
978        let connect_fut = peers.connected(id, channel, Some(delay));
979        let mut connect_fut = std::pin::pin!(connect_fut);
980        let _ = exec
981            .run_until_stalled(&mut connect_fut)
982            .expect("ready")
983            .expect("connect control channel is ok");
984        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
985
986        let mut remote_requests = remote.take_request_stream();
987        // Should wait for the specified amount of time.
988        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
989        exec.set_fake_time(fasync::MonotonicInstant::after(
990            delay + zx::MonotonicDuration::from_micros(1),
991        ));
992        let _ = exec.wake_expired_timers();
993        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
994
995        // Because the peer advertises both Sink and Source, we fall back to the system-biased
996        // direction, which is Source for the peer.
997        let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
998        let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
999        expect_peer_discovery(
1000            &mut exec,
1001            &mut remote_requests,
1002            vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
1003        );
1004        for _twice in 1..=2 {
1005            match exec.run_until_stalled(&mut remote_requests.next()) {
1006                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
1007                    let codec = match stream_id {
1008                        id if id == peer_sbc_source_seid => sbc_codec.clone(),
1009                        id if id == peer_sbc_sink_seid => sbc_codec.clone(),
1010                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
1011                    };
1012                    responder
1013                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
1014                        .expect("respond succeeds");
1015                }
1016                x => panic!("Expected a ready get capabilities request, got {:?}", x),
1017            };
1018        }
1019
1020        match exec.run_until_stalled(&mut remote_requests.next()) {
1021            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
1022                local_stream_id,
1023                remote_stream_id,
1024                capabilities: _,
1025                responder,
1026            }))) => {
1027                // We expect the set configuration to apply to the remote peer's Source SEID and the
1028                // local Sink SEID.
1029                assert_eq!(peer_sbc_source_seid, local_stream_id);
1030                let local_sbc_sink_seid: avdtp::StreamEndpointId =
1031                    SBC_SINK_SEID.try_into().unwrap();
1032                assert_eq!(local_sbc_sink_seid, remote_stream_id);
1033                responder.send().expect("response sends");
1034            }
1035            x => panic!("Expected a ready set configuration request, got {:?}", x),
1036        };
1037    }
1038
1039    #[fuchsia::test]
1040    fn connect_initiation_uses_negotiation() {
1041        let (mut exec, peers, _stream, sbc_codec, aac_codec) = setup_negotiation_test();
1042        let id = PeerId(1);
1043        let (remote, channel) = Channel::create();
1044        let remote = avdtp::Peer::new(remote);
1045
1046        let delay = zx::MonotonicDuration::from_seconds(1);
1047
1048        let mut connect_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
1049        let _ = exec
1050            .run_until_stalled(&mut connect_fut)
1051            .expect("ready")
1052            .expect("connect control channel is ok");
1053
1054        // run the start task until it's stalled.
1055        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1056
1057        let mut remote_requests = remote.take_request_stream();
1058
1059        // Should wait for the specified amount of time.
1060        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
1061
1062        exec.set_fake_time(fasync::MonotonicInstant::after(
1063            delay + zx::MonotonicDuration::from_micros(1),
1064        ));
1065        let _ = exec.wake_expired_timers();
1066
1067        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1068
1069        // Should discover remote streams, negotiate, and start.
1070        let (peer_sbc_seid, peer_sbc_endpoint) = sbc_source_endpoint();
1071        let (peer_aac_seid, peer_aac_endpoint) = aac_source_endpoint();
1072        expect_peer_discovery(
1073            &mut exec,
1074            &mut remote_requests,
1075            vec![peer_sbc_endpoint, peer_aac_endpoint],
1076        );
1077        for _twice in 1..=2 {
1078            match exec.run_until_stalled(&mut remote_requests.next()) {
1079                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
1080                    let codec = match stream_id {
1081                        id if id == peer_sbc_seid => sbc_codec.clone(),
1082                        id if id == peer_aac_seid => aac_codec.clone(),
1083                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
1084                    };
1085                    responder
1086                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
1087                        .expect("respond succeeds");
1088                }
1089                x => panic!("Expected a ready get capabilities request, got {:?}", x),
1090            };
1091        }
1092
1093        match exec.run_until_stalled(&mut remote_requests.next()) {
1094            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
1095                local_stream_id,
1096                remote_stream_id,
1097                capabilities: _,
1098                responder,
1099            }))) => {
1100                // Should set the aac stream, matched with local AAC seid.
1101                assert_eq!(peer_aac_seid, local_stream_id);
1102                let local_aac_seid: avdtp::StreamEndpointId = AAC_SEID.try_into().unwrap();
1103                assert_eq!(local_aac_seid, remote_stream_id);
1104                responder.send().expect("response sends");
1105            }
1106            x => panic!("Expected a ready set configuration request, got {:?}", x),
1107        };
1108    }
1109
1110    #[fuchsia::test]
1111    fn connected_peers_inspect() {
1112        let (mut exec, id, mut peers, _stream) = setup_connected_peer_test();
1113
1114        let inspect = inspect::Inspector::default();
1115        peers.iattach(inspect.root(), "peers").expect("should attach to inspect tree");
1116
1117        assert_data_tree!(inspect, root: {
1118            peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Sink" }});
1119
1120        peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
1121
1122        assert_data_tree!(inspect, root: {
1123            peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Source" }});
1124
1125        // Connect a peer, it should show up in the tree.
1126        let (_remote, channel) = Channel::create();
1127        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1128
1129        assert_data_tree!(inspect, root: {
1130            peers: {
1131                discovered: contains {},
1132                preferred_peer_direction: "Source",
1133                streams_builder: contains {},
1134                peer_0: { id: "0000000000000001", local_streams: contains {} }
1135            }
1136        });
1137    }
1138
1139    #[fuchsia::test]
1140    fn try_connect_cancels_previous_attempt() {
1141        let (mut exec, id, peers, mut profile_stream) = setup_connected_peer_test();
1142
1143        let mut connect_fut = peers.try_connect(id, ChannelParameters::default());
1144
1145        // Should get a request to connect, which we will stall and not respond to.
1146        let responder = match exec.run_singlethreaded(profile_stream.next()) {
1147            Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
1148            x => panic!("Expected Profile connect, got {x:?}"),
1149        };
1150
1151        // Trying to connect again should cancel the first try, and send another connect.
1152        let mut connect_again_fut = peers.try_connect(id, ChannelParameters::default());
1153        let responder_two = match exec.run_singlethreaded(profile_stream.next()) {
1154            Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
1155            x => panic!("Expected Profile connect, got {x:?}"),
1156        };
1157
1158        let first_result = exec.run_singlethreaded(&mut connect_fut);
1159        let _ = first_result.expect_err("Should have an error from first attempt");
1160
1161        // Responding on the first connect shouldn't do anything at this point.
1162        responder.send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed)).unwrap();
1163
1164        exec.run_until_stalled(&mut connect_again_fut).expect_pending("shouldn't finish");
1165
1166        let (_remote, local) = Channel::create();
1167        responder_two.send(Ok(local.try_into().unwrap())).unwrap();
1168
1169        let second_result = exec.run_singlethreaded(&mut connect_again_fut);
1170        let _ = second_result.expect("should receive the channel");
1171    }
1172
1173    #[fuchsia::test]
1174    fn connected_peers_peer_disconnect_removes_peer() {
1175        let (mut exec, id, peers, _stream) = setup_connected_peer_test();
1176
1177        let (remote, channel) = Channel::create();
1178
1179        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1180        run_to_stalled(&mut exec);
1181
1182        // Disconnect the signaling channel, peer should be gone.
1183        drop(remote);
1184
1185        run_to_stalled(&mut exec);
1186
1187        assert!(peers.get(&id).is_none());
1188    }
1189
1190    #[fuchsia::test]
1191    fn connected_peers_reconnect_works() {
1192        let (mut exec, id, peers, _stream) = setup_connected_peer_test();
1193
1194        let (remote, channel) = Channel::create();
1195        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1196        run_to_stalled(&mut exec);
1197
1198        // Disconnect the signaling channel, peer should be gone.
1199        drop(remote);
1200
1201        run_to_stalled(&mut exec);
1202
1203        assert!(peers.get(&id).is_none());
1204
1205        // Connect another peer with the same ID
1206        let (_remote, channel) = Channel::create();
1207
1208        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1209        run_to_stalled(&mut exec);
1210
1211        // Should be connected.
1212        assert!(peers.get(&id).is_some());
1213    }
1214}