bt_a2dp/
connected_peers.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Error, format_err};
6use fidl_fuchsia_bluetooth::ChannelParameters;
7use fidl_fuchsia_bluetooth_bredr::{self as bredr, ProfileDescriptor, ProfileProxy};
8use fuchsia_bluetooth::detachable_map::{DetachableMap, DetachableWeak};
9use fuchsia_bluetooth::inspect::DebugExt;
10use fuchsia_bluetooth::types::{Channel, PeerId};
11use fuchsia_inspect::{self as inspect, NumericProperty, Property};
12use fuchsia_inspect_derive::{AttachError, Inspect};
13use fuchsia_sync::Mutex;
14use futures::channel::{mpsc, oneshot};
15use futures::stream::{Stream, StreamExt};
16use futures::task::{Context, Poll};
17use futures::{Future, FutureExt, TryFutureExt};
18use log::{info, warn};
19use std::collections::hash_map::Entry;
20use std::collections::{HashMap, HashSet};
21use std::pin::Pin;
22use std::sync::Arc;
23use {bt_avdtp as avdtp, fuchsia_async as fasync};
24
25use crate::codec::CodecNegotiation;
26use crate::peer::Peer;
27use crate::permits::Permits;
28use crate::stream::StreamsBuilder;
29
/// Statistics node for tracking various information about a peer that has been encountered.
/// Typically used as an inspect tree node.
struct PeerStats {
    /// Identifier of the peer these statistics describe. Recorded as the "id" string when the
    /// stats are attached to the inspect tree.
    id: PeerId,
    /// Inspect node holding this peer's properties. Default-constructed by `new` and replaced
    /// with a real child node by `iattach`.
    inspect_node: inspect::Node,
    /// The number of times that this peer has been successfully connected to since discovery.
    connection_count: inspect::UintProperty,
}
38
39impl PeerStats {
40    fn new(id: PeerId) -> Self {
41        Self { id, inspect_node: Default::default(), connection_count: Default::default() }
42    }
43
44    fn set_descriptor(&mut self, descriptor: &ProfileDescriptor) {
45        self.inspect_node.record_string("descriptor", descriptor.debug());
46    }
47
48    fn record_connected(&mut self) {
49        let _ = self.connection_count.add(1);
50    }
51}
52
53impl Inspect for &mut PeerStats {
54    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
55        self.inspect_node = parent.create_child(name.as_ref());
56        self.inspect_node.record_string("id", self.id.to_string());
57        self.connection_count = self.inspect_node.create_uint("connection_count", 0);
58        Ok(())
59    }
60}
61
/// Bookkeeping for every peer that has been discovered: the latest profile descriptor, the
/// endpoint directions it has advertised, and per-peer inspect statistics.
#[derive(Default)]
struct DiscoveredPeers {
    /// The peers that we have discovered, with their descriptors and potential preferred
    /// endpoint directions. Because the same peer can be discovered multiple times, with
    /// potentially different endpoints, we maintain a set of advertised directions.
    descriptors: HashMap<PeerId, (ProfileDescriptor, HashSet<avdtp::EndpointType>)>,
    /// Holds the child nodes which include the ids and profile descriptors for inspect.
    stats: HashMap<PeerId, PeerStats>,
    /// Inspect node, usually at "discovered" in the tree. Per-peer stats nodes are created as
    /// children of this node.
    inspect_node: inspect::Node,
}
73
74impl DiscoveredPeers {
75    fn insert(
76        &mut self,
77        id: PeerId,
78        descriptor: ProfileDescriptor,
79        directions: HashSet<avdtp::EndpointType>,
80    ) {
81        self.stats
82            .entry(id)
83            .or_insert_with(|| {
84                let mut new_stats = PeerStats::new(id);
85                let _ = new_stats.iattach(&self.inspect_node, inspect::unique_name("peer_"));
86                new_stats
87            })
88            .set_descriptor(&descriptor);
89
90        match self.descriptors.entry(id) {
91            Entry::Occupied(mut entry) => {
92                entry.get_mut().0 = descriptor;
93                entry.get_mut().1.extend(&directions);
94            }
95            Entry::Vacant(entry) => {
96                let _ = entry.insert((descriptor, directions));
97            }
98        };
99    }
100
101    fn connected(&mut self, id: PeerId) {
102        if let Some(stats) = self.stats.get_mut(&id) {
103            stats.record_connected();
104        }
105    }
106
107    /// Returns the descriptor and preferred endpoint direction associated with the peer `id`.
108    fn get(&self, id: &PeerId) -> Option<(ProfileDescriptor, Option<avdtp::EndpointType>)> {
109        self.descriptors.get(id).map(|(desc, dirs)| (desc.clone(), find_preferred_direction(dirs)))
110    }
111}
112
113impl Inspect for &mut DiscoveredPeers {
114    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
115        self.inspect_node = parent.create_child(name.as_ref());
116        Ok(())
117    }
118}
119
120/// Given a set of endpoint `directions`, returns the preferred direction or None
121/// if both Sink and Source are specified.
122fn find_preferred_direction(
123    directions: &HashSet<avdtp::EndpointType>,
124) -> Option<avdtp::EndpointType> {
125    if directions.len() == 1 {
126        directions.iter().next().cloned()
127    } else {
128        // Otherwise, either there are no A2DP services or both Sink & Source are specified
129        // in which case there is no preferred direction.
130        None
131    }
132}
133
134/// Make an outgoing connection to a peer.
135async fn connect_peer(
136    proxy: ProfileProxy,
137    id: PeerId,
138    channel_params: ChannelParameters,
139) -> Result<Channel, Error> {
140    info!(id:%; "Connecting to peer");
141    let connect_fut = proxy.connect(
142        &id.into(),
143        &bredr::ConnectParameters::L2cap(bredr::L2capParameters {
144            psm: Some(bredr::PSM_AVDTP),
145            parameters: Some(channel_params),
146            ..Default::default()
147        }),
148    );
149    let channel = match connect_fut.await {
150        Err(e) => {
151            warn!(id:%, e:?; "FIDL error on connect");
152            return Err(e.into());
153        }
154        Ok(Err(e)) => return Err(format_err!("Bluetooth connect error: {e:?}")),
155        Ok(Ok(channel)) => channel,
156    };
157
158    let channel = channel
159        .try_into()
160        .map_err(|e| format_err!("Couldn't convert FIDL to BT channel: {e:?}"))?;
161    Ok(channel)
162}
163
/// ConnectedPeers manages the set of connected peers based on discovery, new connection, and
/// peer session lifetime.
pub struct ConnectedPeers {
    /// The set of connected peers.
    connected: DetachableMap<PeerId, Peer>,
    /// Tasks for peers that we are attempting to connect to.
    /// Used to ensure only one outgoing attempt exists at once.
    connection_attempts: Mutex<HashMap<PeerId, fasync::Task<()>>>,
    /// ProfileDescriptors from discovering the peer, stored here even if the peer is disconnected.
    discovered: Mutex<DiscoveredPeers>,
    /// Streams builder, provides a set of streams and negotiation when a peer is connected.
    streams_builder: StreamsBuilder,
    /// The permits that each peer uses to validate that we can start a stream.
    permits: Permits,
    /// Profile Proxy, used to connect new transport sockets.
    profile: ProfileProxy,
    /// Cobalt logger to use and hand out to peers, if we are using one. A clone is given to
    /// every peer that is created.
    metrics: bt_metrics::MetricsLogger,
    /// The 'peers' node of the inspect tree. All connected peers own a child node of this node.
    inspect: inspect::Node,
    /// Inspect property that records the current preferred peer direction.
    inspect_peer_direction: inspect::StringProperty,
    /// Listeners for newly connected peers.
    connected_peer_senders: Mutex<Vec<mpsc::Sender<DetachableWeak<PeerId, Peer>>>>,
    /// Task handles for newly connected peer stream starts.
    // TODO(https://fxbug.dev/42146917): Completed tasks aren't garbage-collected yet.
    start_stream_tasks: Mutex<HashMap<PeerId, fasync::Task<()>>>,
    /// Preferred direction for new peers.  This is the direction we prefer the peer's endpoint to
    /// be, i.e. if we prefer Sink, locally we are Source.
    preferred_peer_direction: Mutex<avdtp::EndpointType>,
}
195
196impl ConnectedPeers {
197    pub fn new(
198        streams_builder: StreamsBuilder,
199        permits: Permits,
200        profile: ProfileProxy,
201        metrics: bt_metrics::MetricsLogger,
202    ) -> Self {
203        Self {
204            connected: DetachableMap::new(),
205            connection_attempts: Mutex::new(HashMap::new()),
206            discovered: Default::default(),
207            streams_builder,
208            profile,
209            permits,
210            inspect: inspect::Node::default(),
211            inspect_peer_direction: inspect::StringProperty::default(),
212            metrics,
213            connected_peer_senders: Default::default(),
214            start_stream_tasks: Default::default(),
215            preferred_peer_direction: Mutex::new(avdtp::EndpointType::Sink),
216        }
217    }
218
219    pub(crate) fn get_weak(&self, id: &PeerId) -> Option<DetachableWeak<PeerId, Peer>> {
220        self.connected.get(id)
221    }
222
223    pub(crate) fn get(&self, id: &PeerId) -> Option<Arc<Peer>> {
224        self.get_weak(id).and_then(|p| p.upgrade())
225    }
226
227    pub fn is_connected(&self, id: &PeerId) -> bool {
228        self.connected.contains_key(id)
229    }
230
231    /// Attempts to start streaming on `peer` by collecting the remote streaming endpoint
232    /// information, selecting a compatible peer using `negotiation` and starting the stream.
233    /// Does nothing and returns Ok(()) if the peer is already streaming or will start streaming
234    /// on it's own.
235    async fn start_streaming(
236        peer: &DetachableWeak<PeerId, Peer>,
237        negotiation: CodecNegotiation,
238    ) -> Result<(), anyhow::Error> {
239        let remote_streams = {
240            let strong = peer.upgrade().ok_or_else(|| format_err!("Disconnected"))?;
241            if strong.streaming_active() {
242                return Ok(());
243            }
244            strong.collect_capabilities()
245        }
246        .await?;
247
248        let (negotiated, remote_seid) = negotiation
249            .select(&remote_streams)
250            .ok_or_else(|| format_err!("No compatible stream found"))?;
251
252        let strong = peer.upgrade().ok_or_else(|| format_err!("Disconnected"))?;
253        if strong.streaming_active() {
254            let peer_id = peer.key();
255            info!(peer_id:%; "Not starting streaming, it's already started");
256            return Ok(());
257        }
258        strong.stream_start(remote_seid, negotiated).await.map_err(Into::into)
259    }
260
261    pub fn found(
262        &self,
263        id: PeerId,
264        desc: ProfileDescriptor,
265        preferred_directions: HashSet<avdtp::EndpointType>,
266    ) {
267        self.discovered.lock().insert(id, desc.clone(), preferred_directions);
268        if let Some(peer) = self.get(&id) {
269            let _ = peer.set_descriptor(desc);
270        }
271    }
272
273    pub fn set_preferred_peer_direction(&self, direction: avdtp::EndpointType) {
274        *self.preferred_peer_direction.lock() = direction;
275        self.inspect_peer_direction.set(&format!("{direction:?}"));
276    }
277
278    pub fn preferred_peer_direction(&self) -> avdtp::EndpointType {
279        *self.preferred_peer_direction.lock()
280    }
281
282    pub fn try_connect(
283        &self,
284        id: PeerId,
285        channel_params: ChannelParameters,
286    ) -> impl Future<Output = Result<Option<Channel>, Error>> {
287        let proxy = self.profile.clone();
288        let connected = self.is_connected(&id);
289        let (sender, recv) = oneshot::channel();
290        let recv =
291            recv.map_ok_or_else(|_e| Err(format_err!("Connection task canceled")), Into::into);
292        if connected {
293            if let Err(e) = sender.send(Ok(None)) {
294                warn!(id:%, e:?; "Failed to notify already-connected");
295            }
296            return recv;
297        }
298        let mut attempts = self.connection_attempts.lock();
299        if let Some(previous_connect_task) = attempts.remove(&id) {
300            // We are the only place that can poll the connect task, check if it finished.
301            if previous_connect_task.now_or_never().is_none() {
302                warn!(id:%; "Cancelling previous connect attempt");
303            }
304        }
305        let connect_task = fasync::Task::spawn(async move {
306            if let Err(e) = sender.send(connect_peer(proxy, id, channel_params).await.map(Some)) {
307                warn!(id:%, e:?; "Failed to send channel connect result");
308            }
309        });
310        let _ = attempts.insert(id, connect_task);
311        recv
312    }
313
314    /// Accept a channel that is connected to the peer `id`.
315    /// If `initiator_delay` is set, attempt to start a stream after the specified delay.
316    /// `initiator_delay` has no effect if the peer already has a control channel.
317    /// Returns a weak peer pointer (even if it was previously connected) if successful.
318    pub async fn connected(
319        &self,
320        id: PeerId,
321        channel: Channel,
322        initiator_delay: Option<zx::MonotonicDuration>,
323    ) -> Result<DetachableWeak<PeerId, Peer>, Error> {
324        if let Some(weak) = self.get_weak(&id) {
325            let peer =
326                weak.upgrade().ok_or_else(|| format_err!("Disconnected connecting transport"))?;
327            if let Err(e) = peer.receive_channel(channel) {
328                warn!(id:%, e:%; "failed to connect channel");
329                return Err(e.into());
330            }
331            return Ok(weak);
332        }
333
334        let entry = self.connected.lazy_entry(&id);
335
336        info!(id:%; "peer connected");
337        let audio_offload = channel.audio_offload();
338        let avdtp_peer = avdtp::Peer::new(channel);
339
340        let mut peer = Peer::create(
341            id,
342            avdtp_peer,
343            self.streams_builder.peer_streams(&id, audio_offload.clone()).await?,
344            Some(self.permits.clone()),
345            self.profile.clone(),
346            self.metrics.clone(),
347        );
348
349        self.discovered.lock().connected(id);
350
351        let peer_preferred_direction = if let Some((desc, dir)) = self.discovered.lock().get(&id) {
352            let _ = peer.set_descriptor(desc);
353            dir
354        } else {
355            None
356        };
357
358        if let Err(e) = peer.iattach(&self.inspect, inspect::unique_name("peer_")) {
359            warn!(id:%, e:?; "Couldn't attach inspect");
360        }
361
362        let closed_fut = peer.closed();
363        let peer = match entry.try_insert(peer) {
364            Err(_peer) => {
365                warn!(id:%; "Peer connected while we were setting up");
366                return self.get_weak(&id).ok_or_else(|| format_err!("Peer missing"));
367            }
368            Ok(weak_peer) => weak_peer,
369        };
370
371        if let Some(delay) = initiator_delay {
372            let peer = peer.clone();
373            let peer_id = peer.key().clone();
374
375            // Bias the codec negotiation with the peer's preferred direction that was discovered
376            // from the SDP service search.
377            let negotiation = self
378                .streams_builder
379                .negotiation(
380                    &id,
381                    audio_offload,
382                    peer_preferred_direction.unwrap_or_else(|| self.preferred_peer_direction()),
383                )
384                .await?;
385            let start_stream_task = fuchsia_async::Task::local(async move {
386                let delay_sec = delay.into_millis() as f64 / 1000.0;
387                info!(id:% = peer.key(); "dwelling {delay_sec}s for peer initiation");
388                fasync::Timer::new(fasync::MonotonicInstant::after(delay)).await;
389
390                if let Err(e) = ConnectedPeers::start_streaming(&peer, negotiation).await {
391                    info!(id:% = peer.key(), e:?; "Peer start streaming failed");
392                    peer.detach();
393                }
394            });
395            if self.start_stream_tasks.lock().insert(peer_id, start_stream_task).is_some() {
396                info!(peer_id:%; "Replacing previous start stream dwell");
397            }
398        }
399
400        // Remove the peer when we disconnect.
401        fasync::Task::local(async move {
402            closed_fut.await;
403            peer.detach();
404        })
405        .detach();
406
407        let peer = self.get_weak(&id).ok_or_else(|| format_err!("Peer missing"))?;
408        self.notify_connected(&peer);
409        Ok(peer)
410    }
411
412    /// Notify the listeners that a new peer has been connected to.
413    fn notify_connected(&self, peer: &DetachableWeak<PeerId, Peer>) {
414        let mut senders = self.connected_peer_senders.lock();
415        senders.retain_mut(|sender| sender.try_send(peer.clone()).is_ok());
416    }
417
418    /// Get a stream that produces peers that have been connected.
419    pub fn connected_stream(&self) -> PeerConnections {
420        let (sender, receiver) = mpsc::channel(0);
421        self.connected_peer_senders.lock().push(sender);
422        PeerConnections { stream: receiver }
423    }
424}
425
426impl Inspect for &mut ConnectedPeers {
427    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
428        self.inspect = parent.create_child(name.as_ref());
429        let peer_dir_str = format!("{:?}", self.preferred_peer_direction());
430        self.inspect_peer_direction =
431            self.inspect.create_string("preferred_peer_direction", peer_dir_str);
432        self.streams_builder.iattach(&self.inspect, "streams_builder")?;
433        self.discovered.lock().iattach(&self.inspect, "discovered")
434    }
435}
436
/// Provides a stream of peers that have been connected to. This stream produces an item whenever
/// an A2DP peer has been connected.  It will produce None when no more peers will be connected.
pub struct PeerConnections {
    /// Receiving end of the channel that `ConnectedPeers` sends newly connected peers on.
    stream: mpsc::Receiver<DetachableWeak<PeerId, Peer>>,
}
442
443impl Stream for PeerConnections {
444    type Item = DetachableWeak<PeerId, Peer>;
445
446    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
447        self.stream.poll_next_unpin(cx)
448    }
449}
450
451#[cfg(test)]
452mod tests {
453    use super::*;
454
455    use async_utils::PollExt;
456    use bt_avdtp::{Request, ServiceCapability};
457    use diagnostics_assertions::assert_data_tree;
458    use fidl::endpoints::create_proxy_and_stream;
459    use fidl_fuchsia_bluetooth_bredr::{
460        AudioOffloadExtProxy, ProfileMarker, ProfileRequestStream, ServiceClassProfileIdentifier,
461    };
462    use futures::future::BoxFuture;
463    use std::pin::pin;
464
465    use crate::codec::MediaCodecConfig;
466    use crate::media_task::{MediaTaskBuilder, MediaTaskError, MediaTaskRunner};
467    use crate::media_types::*;
468
469    fn run_to_stalled(exec: &mut fasync::TestExecutor) {
470        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
471    }
472
473    fn exercise_avdtp(exec: &mut fasync::TestExecutor, remote: Channel, peer: &Peer) {
474        let remote_avdtp = avdtp::Peer::new(remote);
475        let mut remote_requests = remote_avdtp.take_request_stream();
476
477        // Should be able to actually communicate via the peer.
478        let avdtp = peer.avdtp();
479        let discover_fut = avdtp.discover();
480
481        let mut discover_fut = pin!(discover_fut);
482
483        assert!(exec.run_until_stalled(&mut discover_fut).is_pending());
484
485        let responder = match exec.run_until_stalled(&mut remote_requests.next()) {
486            Poll::Ready(Some(Ok(Request::Discover { responder }))) => responder,
487            x => panic!("Expected a Ready Discovery request but got {:?}", x),
488        };
489
490        let endpoint_id = avdtp::StreamEndpointId::try_from(1).expect("endpointid creation");
491
492        let information = avdtp::StreamInformation::new(
493            endpoint_id,
494            false,
495            avdtp::MediaType::Audio,
496            avdtp::EndpointType::Source,
497        );
498
499        responder.send(&[information]).expect("Sending response should have worked");
500
501        let _stream_infos = match exec.run_until_stalled(&mut discover_fut) {
502            Poll::Ready(Ok(infos)) => infos,
503            x => panic!("Expected a Ready response but got {:?}", x),
504        };
505    }
506
507    fn setup_connected_peer_test()
508    -> (fasync::TestExecutor, PeerId, ConnectedPeers, ProfileRequestStream) {
509        let exec = fasync::TestExecutor::new();
510        let (proxy, stream) = create_proxy_and_stream::<ProfileMarker>();
511        let id = PeerId(1);
512
513        let peers = ConnectedPeers::new(
514            StreamsBuilder::default(),
515            Permits::new(1),
516            proxy,
517            bt_metrics::MetricsLogger::default(),
518        );
519
520        (exec, id, peers, stream)
521    }
522
523    #[fuchsia::test]
524    fn connect_creates_peer() {
525        let (mut exec, id, peers, _stream) = setup_connected_peer_test();
526
527        let (remote, channel) = Channel::create();
528
529        let peer = exec
530            .run_singlethreaded(peers.connected(id, channel, None))
531            .expect("peer should connect");
532        let peer = peer.upgrade().expect("peer should be connected");
533
534        exercise_avdtp(&mut exec, remote, &peer);
535    }
536
537    #[fuchsia::test]
538    fn connect_notifies_streams() {
539        let (mut exec, id, peers, _stream) = setup_connected_peer_test();
540
541        let (remote, channel) = Channel::create();
542
543        let mut peer_stream = peers.connected_stream();
544        let mut peer_stream_two = peers.connected_stream();
545
546        let peer = exec
547            .run_singlethreaded(peers.connected(id, channel, None))
548            .expect("peer should connect");
549        let peer = peer.upgrade().expect("peer should be connected");
550
551        // Peers should have been notified of the new peer
552        let weak = exec.run_singlethreaded(peer_stream.next()).expect("peer stream to produce");
553        assert_eq!(weak.key(), &id);
554        let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
555        assert_eq!(weak.key(), &id);
556
557        exercise_avdtp(&mut exec, remote, &peer);
558
559        // If you drop one stream, the other one should still produce.
560        drop(peer_stream);
561
562        let id2 = PeerId(2);
563        let (remote2, channel2) = Channel::create();
564        let peer2 = exec
565            .run_singlethreaded(peers.connected(id2, channel2, None))
566            .expect("peer should connect");
567        let peer2 = peer2.upgrade().expect("peer two should be connected");
568
569        let weak = exec.run_singlethreaded(peer_stream_two.next()).expect("peer stream to produce");
570        assert_eq!(weak.key(), &id2);
571
572        exercise_avdtp(&mut exec, remote2, &peer2);
573    }
574
575    #[fuchsia::test]
576    fn find_preferred_direction_returns_correct_endpoints() {
577        let empty = HashSet::new();
578        assert_eq!(find_preferred_direction(&empty), None);
579
580        let sink_only = HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter());
581        assert_eq!(find_preferred_direction(&sink_only), Some(avdtp::EndpointType::Sink));
582
583        let source_only = HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter());
584        assert_eq!(find_preferred_direction(&source_only), Some(avdtp::EndpointType::Source));
585
586        let both = HashSet::from_iter(
587            vec![avdtp::EndpointType::Sink, avdtp::EndpointType::Source].into_iter(),
588        );
589        assert_eq!(find_preferred_direction(&both), None);
590    }
591
    // Expected chosen ID for the AAC stream endpoint.
    // NOTE(review): the three IDs below presumably follow from the order in which the builders
    // are added in `setup_negotiation_test` — confirm against `StreamsBuilder`.
    const AAC_SEID: u8 = 8;
    // Expected chosen ID for the SBC sink stream endpoint.
    const SBC_SINK_SEID: u8 = 9;
    // Expected chosen ID for the SBC source stream endpoint.
    const SBC_SOURCE_SEID: u8 = 10;
598
599    fn aac_sink_codec() -> avdtp::ServiceCapability {
600        AacCodecInfo::new(
601            AacObjectType::MANDATORY_SNK,
602            AacSamplingFrequency::MANDATORY_SNK,
603            AacChannels::MANDATORY_SNK,
604            true,
605            0, // 0 = Unknown constant bitrate support (A2DP Sec. 4.5.2.4)
606        )
607        .unwrap()
608        .into()
609    }
610
611    fn sbc_sink_codec() -> avdtp::ServiceCapability {
612        SbcCodecInfo::new(
613            SbcSamplingFrequency::MANDATORY_SNK,
614            SbcChannelMode::MANDATORY_SNK,
615            SbcBlockCount::MANDATORY_SNK,
616            SbcSubBands::MANDATORY_SNK,
617            SbcAllocation::MANDATORY_SNK,
618            SbcCodecInfo::BITPOOL_MIN,
619            SbcCodecInfo::BITPOOL_MAX,
620        )
621        .unwrap()
622        .into()
623    }
624
625    fn sbc_source_codec() -> avdtp::ServiceCapability {
626        SbcCodecInfo::new(
627            SbcSamplingFrequency::FREQ48000HZ,
628            SbcChannelMode::JOINT_STEREO,
629            SbcBlockCount::MANDATORY_SRC,
630            SbcSubBands::MANDATORY_SRC,
631            SbcAllocation::MANDATORY_SRC,
632            SbcCodecInfo::BITPOOL_MIN,
633            SbcCodecInfo::BITPOOL_MAX,
634        )
635        .unwrap()
636        .into()
637    }
638
639    #[derive(Clone)]
640    struct FakeBuilder {
641        capability: avdtp::ServiceCapability,
642        direction: avdtp::EndpointType,
643    }
644
645    impl MediaTaskBuilder for FakeBuilder {
646        fn configure(
647            &self,
648            _peer_id: &PeerId,
649            codec_config: &MediaCodecConfig,
650        ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError> {
651            if self.capability.codec_type() == Some(codec_config.codec_type()) {
652                return Ok(Box::new(FakeRunner {}));
653            }
654            Err(MediaTaskError::Other(String::from("Unsupported configuring")))
655        }
656
657        fn direction(&self) -> bt_avdtp::EndpointType {
658            self.direction
659        }
660
661        fn supported_configs(
662            &self,
663            _peer_id: &PeerId,
664            _offload: Option<AudioOffloadExtProxy>,
665        ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>> {
666            futures::future::ready(Ok(vec![(&self.capability).try_into().unwrap()])).boxed()
667        }
668    }
669
670    struct FakeRunner {}
671
672    impl MediaTaskRunner for FakeRunner {
673        fn start(
674            &mut self,
675            _stream: avdtp::MediaStream,
676            _offload: Option<AudioOffloadExtProxy>,
677        ) -> Result<Box<dyn crate::media_task::MediaTask>, MediaTaskError> {
678            Err(MediaTaskError::Other(String::from("unimplemented starting")))
679        }
680    }
681
682    /// Sets up a test in which we expect to select a stream and connect to a peer.
683    /// Returns the executor, connected peers (under test), request stream for profile interaction,
684    /// and an SBC and AAC Sink service capability.
685    fn setup_negotiation_test() -> (
686        fasync::TestExecutor,
687        ConnectedPeers,
688        ProfileRequestStream,
689        ServiceCapability,
690        ServiceCapability,
691    ) {
692        let exec = fasync::TestExecutor::new_with_fake_time();
693        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(1_000_000));
694        let (proxy, stream) = create_proxy_and_stream::<ProfileMarker>();
695
696        let aac_sink_codec = aac_sink_codec();
697        let sbc_sink_codec = sbc_sink_codec();
698        let aac_sink_builder = FakeBuilder {
699            capability: aac_sink_codec.clone(),
700            direction: avdtp::EndpointType::Sink,
701        };
702        let sbc_sink_builder = FakeBuilder {
703            capability: sbc_sink_codec.clone(),
704            direction: avdtp::EndpointType::Sink,
705        };
706        let sbc_source_builder =
707            FakeBuilder { capability: sbc_source_codec(), direction: avdtp::EndpointType::Source };
708
709        let mut streams_builder = StreamsBuilder::default();
710        streams_builder.add_builder(aac_sink_builder);
711        streams_builder.add_builder(sbc_sink_builder);
712        streams_builder.add_builder(sbc_source_builder);
713
714        let peers = ConnectedPeers::new(
715            streams_builder,
716            Permits::new(1),
717            proxy,
718            bt_metrics::MetricsLogger::default(),
719        );
720
721        (exec, peers, stream, sbc_sink_codec, aac_sink_codec)
722    }
723
724    #[fuchsia::test]
725    fn streaming_start_with_streaming_peer_is_noop() {
726        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
727        let id = PeerId(1);
728        let (remote, channel) = Channel::create();
729        let remote = avdtp::Peer::new(remote);
730
731        let delay = zx::MonotonicDuration::from_seconds(1);
732
733        let mut remote_requests = remote.take_request_stream();
734
735        // This starts the task in the background waiting.
736        let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
737        assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
738        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
739
740        // Before the delay expires, the peer starts the stream.
741
742        let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
743        let config_caps = &[ServiceCapability::MediaTransport, sbc_codec];
744        let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
745        let mut set_config_fut = pin!(set_config_fut);
746        match exec.run_until_stalled(&mut set_config_fut) {
747            Poll::Ready(Ok(())) => {}
748            x => panic!("Expected set config to be ready and Ok, got {:?}", x),
749        };
750
751        // The remote peer doesn't need to actually open, Set Configuration is enough of a signal.
752        // wait for the delay to expire now.
753
754        exec.set_fake_time(
755            fasync::MonotonicInstant::after(delay) + zx::MonotonicDuration::from_micros(1),
756        );
757        let _ = exec.wake_expired_timers();
758
759        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
760
761        // Shouldn't start a discovery, since the stream is scheduled to start already.
762        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
763    }
764
765    fn sbc_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
766        let remote_sbc_seid: avdtp::StreamEndpointId = 1u8.try_into().unwrap();
767        let info = avdtp::StreamInformation::new(
768            remote_sbc_seid.clone(),
769            false,
770            avdtp::MediaType::Audio,
771            avdtp::EndpointType::Source,
772        );
773        (remote_sbc_seid, info)
774    }
775
776    fn aac_source_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
777        let remote_aac_seid: avdtp::StreamEndpointId = 2u8.try_into().unwrap();
778        let info = avdtp::StreamInformation::new(
779            remote_aac_seid.clone(),
780            false,
781            avdtp::MediaType::Audio,
782            avdtp::EndpointType::Source,
783        );
784        (remote_aac_seid, info)
785    }
786
787    fn sbc_sink_endpoint() -> (avdtp::StreamEndpointId, avdtp::StreamInformation) {
788        let remote_sbc_seid: avdtp::StreamEndpointId = 3u8.try_into().unwrap();
789        let info = avdtp::StreamInformation::new(
790            remote_sbc_seid.clone(),
791            false,
792            avdtp::MediaType::Audio,
793            avdtp::EndpointType::Sink,
794        );
795        (remote_sbc_seid, info)
796    }
797
798    /// Expects an AVDTP Discovery request on the `requests` stream. Responds to
799    /// the request with the provided `response` endpoints.
800    fn expect_peer_discovery(
801        exec: &mut fasync::TestExecutor,
802        requests: &mut avdtp::RequestStream,
803        response: Vec<avdtp::StreamInformation>,
804    ) {
805        match exec.run_until_stalled(&mut requests.next()) {
806            Poll::Ready(Some(Ok(avdtp::Request::Discover { responder }))) => {
807                responder.send(&response).expect("response succeeds");
808            }
809            x => panic!("Expected a discovery request to be sent after delay, got {:?}", x),
810        };
811    }
812
813    #[fuchsia::test]
814    fn streaming_start_configure_while_discovery() {
815        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
816        let id = PeerId(1);
817        let (remote, channel) = Channel::create();
818        let remote = avdtp::Peer::new(remote);
819
820        let delay = zx::MonotonicDuration::from_seconds(1);
821
822        let mut remote_requests = remote.take_request_stream();
823
824        // This starts the task in the background waiting.
825        let mut connected_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
826        assert!(exec.run_until_stalled(&mut connected_fut).expect("ready").is_ok());
827        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
828
829        // The delay expires, and the discovery is start!
830        exec.set_fake_time(
831            fasync::MonotonicInstant::after(delay) + zx::MonotonicDuration::from_micros(1),
832        );
833        let _ = exec.wake_expired_timers();
834        expect_peer_discovery(
835            &mut exec,
836            &mut remote_requests,
837            vec![sbc_source_endpoint().1, aac_source_endpoint().1],
838        );
839
840        // The remote peer doesn't need to actually open, Set Configuration is enough of a signal.
841        let seid: avdtp::StreamEndpointId = SBC_SINK_SEID.try_into().expect("seid to be okay");
842        let config_caps = &[ServiceCapability::MediaTransport, sbc_codec.clone()];
843        let set_config_fut = remote.set_configuration(&seid, &seid, config_caps);
844        let mut set_config_fut = pin!(set_config_fut);
845        match exec.run_until_stalled(&mut set_config_fut) {
846            Poll::Ready(Ok(())) => {}
847            x => panic!("Expected set config to be ready and Ok, got {:?}", x),
848        };
849
850        // Can finish the collection process, but not attempt to configure or start a stream.
851        loop {
852            match exec.run_until_stalled(&mut remote_requests.next()) {
853                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { responder, .. }))) => {
854                    responder
855                        .send(&[avdtp::ServiceCapability::MediaTransport, sbc_codec.clone()])
856                        .expect("respond succeeds");
857                }
858                Poll::Ready(x) => panic!("Got unexpected request: {:?}", x),
859                Poll::Pending => break,
860            }
861        }
862    }
863
864    /// Tests connection initiation selects the appropriate stream endpoint based
865    /// on a biased codec negotiation that is set from the peer's discovered services.
866    #[fuchsia::test]
867    fn connect_initiation_uses_biased_codec_negotiation_by_peer() {
868        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
869        let id = PeerId(1);
870        let (remote, channel) = Channel::create();
871
872        // System biases towards the Source direction (called when the AudioMode FIDL changes).
873        peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
874
875        // New fake peer discovered with some descriptor - the peer's SDP entry shows Sink.
876        let remote = avdtp::Peer::new(remote);
877        let desc = ProfileDescriptor {
878            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
879            major_version: Some(1),
880            minor_version: Some(2),
881            ..Default::default()
882        };
883        let preferred_direction = vec![avdtp::EndpointType::Sink];
884        let delay = zx::MonotonicDuration::from_seconds(1);
885        peers.found(id, desc, HashSet::from_iter(preferred_direction.into_iter()));
886
887        let connected_fut = peers.connected(id, channel, Some(delay));
888        let mut connected_fut = std::pin::pin!(connected_fut);
889        let _ = exec
890            .run_until_stalled(&mut connected_fut)
891            .expect("is ready")
892            .expect("connect control channel is ok");
893        // run the start task until it's stalled.
894        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
895
896        let mut remote_requests = remote.take_request_stream();
897
898        // Should wait for the specified amount of time.
899        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
900
901        exec.set_fake_time(fasync::MonotonicInstant::after(
902            delay + zx::MonotonicDuration::from_micros(1),
903        ));
904        let _ = exec.wake_expired_timers();
905
906        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
907        // Even though the peer supports both SBC Sink and Source, we expect to negotiate and start
908        // on the Sink endpoint since that is the peer's preferred one.
909        let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
910        let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
911        expect_peer_discovery(
912            &mut exec,
913            &mut remote_requests,
914            vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
915        );
916        for _twice in 1..=2 {
917            match exec.run_until_stalled(&mut remote_requests.next()) {
918                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
919                    let codec = match stream_id {
920                        id if id == peer_sbc_source_seid => sbc_codec.clone(),
921                        id if id == peer_sbc_sink_seid => sbc_codec.clone(),
922                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
923                    };
924                    responder
925                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
926                        .expect("respond succeeds");
927                }
928                x => panic!("Expected a ready get capabilities request, got {:?}", x),
929            };
930        }
931
932        match exec.run_until_stalled(&mut remote_requests.next()) {
933            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
934                local_stream_id,
935                remote_stream_id,
936                capabilities: _,
937                responder,
938            }))) => {
939                // We expect the set configuration to apply to the remote peer's Sink SEID and the
940                // local Source SEID.
941                assert_eq!(peer_sbc_sink_seid, local_stream_id);
942                let local_sbc_source_seid: avdtp::StreamEndpointId =
943                    SBC_SOURCE_SEID.try_into().unwrap();
944                assert_eq!(local_sbc_source_seid, remote_stream_id);
945                responder.send().expect("response sends");
946            }
947            x => panic!("Expected a ready set configuration request, got {:?}", x),
948        };
949    }
950
951    /// Tests connection initiation selects the appropriate stream endpoint based
952    /// on a biased codec negotiation that is set from by the system (in practice, the AudioMode
953    /// FIDL). This case typically occurs when a peer advertises both sink and source, and therefore
954    /// has no preference for the endpoint direction.
955    #[fuchsia::test]
956    fn connect_initiation_uses_biased_codec_negotiation_by_system() {
957        let (mut exec, peers, _stream, sbc_codec, _aac_codec) = setup_negotiation_test();
958        let id = PeerId(1);
959        let (remote, channel) = Channel::create();
960
961        // System biases towards the Source direction (called when the AudioMode FIDL changes).
962        peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
963
964        // New fake peer discovered with separate Sink and Source entries.
965        let remote = avdtp::Peer::new(remote);
966        let desc = ProfileDescriptor {
967            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
968            major_version: Some(1),
969            minor_version: Some(2),
970            ..Default::default()
971        };
972        peers.found(
973            id,
974            desc.clone(),
975            HashSet::from_iter(vec![avdtp::EndpointType::Source].into_iter()),
976        );
977        peers.found(id, desc, HashSet::from_iter(vec![avdtp::EndpointType::Sink].into_iter()));
978
979        let delay = zx::MonotonicDuration::from_seconds(1);
980        let connect_fut = peers.connected(id, channel, Some(delay));
981        let mut connect_fut = std::pin::pin!(connect_fut);
982        let _ = exec
983            .run_until_stalled(&mut connect_fut)
984            .expect("ready")
985            .expect("connect control channel is ok");
986        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
987
988        let mut remote_requests = remote.take_request_stream();
989        // Should wait for the specified amount of time.
990        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
991        exec.set_fake_time(fasync::MonotonicInstant::after(
992            delay + zx::MonotonicDuration::from_micros(1),
993        ));
994        let _ = exec.wake_expired_timers();
995        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
996
997        // Because the peer advertises both Sink and Source, we fall back to the system-biased
998        // direction, which is Source for the peer.
999        let (peer_sbc_source_seid, peer_sbc_source_endpoint) = sbc_source_endpoint();
1000        let (peer_sbc_sink_seid, peer_sbc_sink_endpoint) = sbc_sink_endpoint();
1001        expect_peer_discovery(
1002            &mut exec,
1003            &mut remote_requests,
1004            vec![peer_sbc_source_endpoint, peer_sbc_sink_endpoint],
1005        );
1006        for _twice in 1..=2 {
1007            match exec.run_until_stalled(&mut remote_requests.next()) {
1008                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
1009                    let codec = match stream_id {
1010                        id if id == peer_sbc_source_seid => sbc_codec.clone(),
1011                        id if id == peer_sbc_sink_seid => sbc_codec.clone(),
1012                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
1013                    };
1014                    responder
1015                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
1016                        .expect("respond succeeds");
1017                }
1018                x => panic!("Expected a ready get capabilities request, got {:?}", x),
1019            };
1020        }
1021
1022        match exec.run_until_stalled(&mut remote_requests.next()) {
1023            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
1024                local_stream_id,
1025                remote_stream_id,
1026                capabilities: _,
1027                responder,
1028            }))) => {
1029                // We expect the set configuration to apply to the remote peer's Source SEID and the
1030                // local Sink SEID.
1031                assert_eq!(peer_sbc_source_seid, local_stream_id);
1032                let local_sbc_sink_seid: avdtp::StreamEndpointId =
1033                    SBC_SINK_SEID.try_into().unwrap();
1034                assert_eq!(local_sbc_sink_seid, remote_stream_id);
1035                responder.send().expect("response sends");
1036            }
1037            x => panic!("Expected a ready set configuration request, got {:?}", x),
1038        };
1039    }
1040
1041    #[fuchsia::test]
1042    fn connect_initiation_uses_negotiation() {
1043        let (mut exec, peers, _stream, sbc_codec, aac_codec) = setup_negotiation_test();
1044        let id = PeerId(1);
1045        let (remote, channel) = Channel::create();
1046        let remote = avdtp::Peer::new(remote);
1047
1048        let delay = zx::MonotonicDuration::from_seconds(1);
1049
1050        let mut connect_fut = std::pin::pin!(peers.connected(id, channel, Some(delay)));
1051        let _ = exec
1052            .run_until_stalled(&mut connect_fut)
1053            .expect("ready")
1054            .expect("connect control channel is ok");
1055
1056        // run the start task until it's stalled.
1057        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1058
1059        let mut remote_requests = remote.take_request_stream();
1060
1061        // Should wait for the specified amount of time.
1062        assert!(exec.run_until_stalled(&mut remote_requests.next()).is_pending());
1063
1064        exec.set_fake_time(fasync::MonotonicInstant::after(
1065            delay + zx::MonotonicDuration::from_micros(1),
1066        ));
1067        let _ = exec.wake_expired_timers();
1068
1069        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1070
1071        // Should discover remote streams, negotiate, and start.
1072        let (peer_sbc_seid, peer_sbc_endpoint) = sbc_source_endpoint();
1073        let (peer_aac_seid, peer_aac_endpoint) = aac_source_endpoint();
1074        expect_peer_discovery(
1075            &mut exec,
1076            &mut remote_requests,
1077            vec![peer_sbc_endpoint, peer_aac_endpoint],
1078        );
1079        for _twice in 1..=2 {
1080            match exec.run_until_stalled(&mut remote_requests.next()) {
1081                Poll::Ready(Some(Ok(avdtp::Request::GetCapabilities { stream_id, responder }))) => {
1082                    let codec = match stream_id {
1083                        id if id == peer_sbc_seid => sbc_codec.clone(),
1084                        id if id == peer_aac_seid => aac_codec.clone(),
1085                        x => panic!("Got unexpected get_capabilities seid {:?}", x),
1086                    };
1087                    responder
1088                        .send(&[avdtp::ServiceCapability::MediaTransport, codec])
1089                        .expect("respond succeeds");
1090                }
1091                x => panic!("Expected a ready get capabilities request, got {:?}", x),
1092            };
1093        }
1094
1095        match exec.run_until_stalled(&mut remote_requests.next()) {
1096            Poll::Ready(Some(Ok(avdtp::Request::SetConfiguration {
1097                local_stream_id,
1098                remote_stream_id,
1099                capabilities: _,
1100                responder,
1101            }))) => {
1102                // Should set the aac stream, matched with local AAC seid.
1103                assert_eq!(peer_aac_seid, local_stream_id);
1104                let local_aac_seid: avdtp::StreamEndpointId = AAC_SEID.try_into().unwrap();
1105                assert_eq!(local_aac_seid, remote_stream_id);
1106                responder.send().expect("response sends");
1107            }
1108            x => panic!("Expected a ready set configuration request, got {:?}", x),
1109        };
1110    }
1111
1112    #[fuchsia::test]
1113    fn connected_peers_inspect() {
1114        let (mut exec, id, mut peers, _stream) = setup_connected_peer_test();
1115
1116        let inspect = inspect::Inspector::default();
1117        peers.iattach(inspect.root(), "peers").expect("should attach to inspect tree");
1118
1119        assert_data_tree!(@executor exec, inspect, root: {
1120            peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Sink" }});
1121
1122        peers.set_preferred_peer_direction(avdtp::EndpointType::Source);
1123
1124        assert_data_tree!(@executor exec, inspect, root: {
1125            peers: { streams_builder: contains {}, discovered: contains {}, preferred_peer_direction: "Source" }});
1126
1127        // Connect a peer, it should show up in the tree.
1128        let (_remote, channel) = Channel::create();
1129        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1130
1131        assert_data_tree!(@executor exec, inspect, root: {
1132            peers: {
1133                discovered: contains {},
1134                preferred_peer_direction: "Source",
1135                streams_builder: contains {},
1136                peer_0: { id: "0000000000000001", local_streams: contains {} }
1137            }
1138        });
1139    }
1140
1141    #[fuchsia::test]
1142    fn try_connect_cancels_previous_attempt() {
1143        let (mut exec, id, peers, mut profile_stream) = setup_connected_peer_test();
1144
1145        let mut connect_fut = peers.try_connect(id, ChannelParameters::default());
1146
1147        // Should get a request to connect, which we will stall and not respond to.
1148        let responder = match exec.run_singlethreaded(profile_stream.next()) {
1149            Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
1150            x => panic!("Expected Profile connect, got {x:?}"),
1151        };
1152
1153        // Trying to connect again should cancel the first try, and send another connect.
1154        let mut connect_again_fut = peers.try_connect(id, ChannelParameters::default());
1155        let responder_two = match exec.run_singlethreaded(profile_stream.next()) {
1156            Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => responder,
1157            x => panic!("Expected Profile connect, got {x:?}"),
1158        };
1159
1160        let first_result = exec.run_singlethreaded(&mut connect_fut);
1161        let _ = first_result.expect_err("Should have an error from first attempt");
1162
1163        // Responding on the first connect shouldn't do anything at this point.
1164        responder.send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed)).unwrap();
1165
1166        exec.run_until_stalled(&mut connect_again_fut).expect_pending("shouldn't finish");
1167
1168        let (_remote, local) = Channel::create();
1169        responder_two.send(Ok(local.try_into().unwrap())).unwrap();
1170
1171        let second_result = exec.run_singlethreaded(&mut connect_again_fut);
1172        let _ = second_result.expect("should receive the channel");
1173    }
1174
1175    #[fuchsia::test]
1176    fn connected_peers_peer_disconnect_removes_peer() {
1177        let (mut exec, id, peers, _stream) = setup_connected_peer_test();
1178
1179        let (remote, channel) = Channel::create();
1180
1181        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1182        run_to_stalled(&mut exec);
1183
1184        // Disconnect the signaling channel, peer should be gone.
1185        drop(remote);
1186
1187        run_to_stalled(&mut exec);
1188
1189        assert!(peers.get(&id).is_none());
1190    }
1191
1192    #[fuchsia::test]
1193    fn connected_peers_reconnect_works() {
1194        let (mut exec, id, peers, _stream) = setup_connected_peer_test();
1195
1196        let (remote, channel) = Channel::create();
1197        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1198        run_to_stalled(&mut exec);
1199
1200        // Disconnect the signaling channel, peer should be gone.
1201        drop(remote);
1202
1203        run_to_stalled(&mut exec);
1204
1205        assert!(peers.get(&id).is_none());
1206
1207        // Connect another peer with the same ID
1208        let (_remote, channel) = Channel::create();
1209
1210        assert!(exec.run_singlethreaded(peers.connected(id, channel, None)).is_ok());
1211        run_to_stalled(&mut exec);
1212
1213        // Should be connected.
1214        assert!(peers.get(&id).is_some());
1215    }
1216}