Skip to main content

bt_a2dp/peer/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Context as _;
6use bt_avdtp::{
7    self as avdtp, MediaCodecType, ServiceCapability, ServiceCategory, StreamEndpoint,
8    StreamEndpointId,
9};
10use fidl_fuchsia_bluetooth::ChannelParameters;
11use fidl_fuchsia_bluetooth_bredr::{
12    ConnectParameters, L2capParameters, PSM_AVDTP, ProfileDescriptor, ProfileProxy,
13};
14use fuchsia_async::{self as fasync, DurationExt};
15use fuchsia_bluetooth::inspect::DebugExt;
16use fuchsia_bluetooth::types::{Channel, PeerId};
17use fuchsia_inspect as inspect;
18use fuchsia_inspect_derive::{AttachError, Inspect};
19use fuchsia_sync::Mutex;
20use futures::channel::mpsc;
21use futures::future::{BoxFuture, Either};
22use futures::stream::FuturesUnordered;
23use futures::task::{Context, Poll, Waker};
24use futures::{Future, FutureExt, StreamExt, select};
25use log::{debug, info, trace, warn};
26use std::collections::{BTreeMap, HashMap, HashSet};
27use std::pin::Pin;
28use std::sync::{Arc, Weak};
29
30/// For sending out-of-band commands over the A2DP peer.
31mod controller;
32pub use controller::ControllerPool;
33
34use crate::codec::MediaCodecConfig;
35use crate::permits::{Permit, Permits};
36use crate::stream::{Stream, Streams};
37
38/// A Peer represents an A2DP peer which may be connected to this device.
39/// Only one A2DP peer should exist for each Bluetooth peer.
40#[derive(Inspect)]
41pub struct Peer {
42    /// The id of the peer we are connected to.
43    id: PeerId,
44    /// Inner keeps track of the peer and the streams.
45    #[inspect(forward)]
46    inner: Arc<Mutex<PeerInner>>,
47    /// Profile Proxy to connect new transport channels
48    profile: ProfileProxy,
49    /// The profile descriptor for this peer, if it has been discovered.
50    descriptor: Mutex<Option<ProfileDescriptor>>,
51    /// Wakers that are to be woken when the peer disconnects.  If None, the peers have been woken
52    /// and this peer is disconnected.  Shared weakly with ClosedPeer future objects that complete
53    /// when the peer disconnects.
54    closed_wakers: Arc<Mutex<Option<Vec<Waker>>>>,
55    /// Used to report peer metrics to Cobalt.
56    metrics: bt_metrics::MetricsLogger,
57    /// A task waiting to start a stream if it hasn't been started yet.
58    start_stream_task: Mutex<Option<fasync::Task<avdtp::Result<()>>>>,
59}
60
61/// StreamPermits handles reserving and retrieving permits for streaming audio.
62/// Reservations are automatically retrieved for streams that are revoked, and when the
63/// reservation is completed, the permit is stored and a StreamPermit is sent so it can be started.
64#[derive(Clone)]
65struct StreamPermits {
66    permits: Permits,
67    open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
68    reserved_streams: Arc<Mutex<HashSet<StreamEndpointId>>>,
69    inner: Weak<Mutex<PeerInner>>,
70    peer_id: PeerId,
71    sender: mpsc::UnboundedSender<BoxFuture<'static, StreamPermit>>,
72}
73
74#[derive(Debug)]
75struct StreamPermit {
76    local_id: StreamEndpointId,
77    open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
78}
79
80impl StreamPermit {
81    fn local_id(&self) -> &StreamEndpointId {
82        &self.local_id
83    }
84
85    /// Returns true if a Permit is held for this stream endpoint.
86    fn is_held(&self) -> bool {
87        self.open_streams.lock().contains_key(&self.local_id)
88    }
89}
90
91impl Drop for StreamPermit {
92    fn drop(&mut self) {
93        let _ = self.open_streams.lock().remove(&self.local_id);
94    }
95}
96
97impl StreamPermits {
98    fn new(
99        inner: Weak<Mutex<PeerInner>>,
100        peer_id: PeerId,
101        permits: Permits,
102    ) -> (Self, mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>) {
103        let (sender, reservations_receiver) = futures::channel::mpsc::unbounded();
104        (
105            Self {
106                inner,
107                permits,
108                peer_id,
109                sender,
110                open_streams: Default::default(),
111                reserved_streams: Default::default(),
112            },
113            reservations_receiver,
114        )
115    }
116
117    fn label_for(&self, local_id: &StreamEndpointId) -> String {
118        format!("{} {}", self.peer_id, local_id)
119    }
120
121    /// Get a permit to stream on the stream with id `local_id`.
122    /// Returns Some() if there is a permit available.
123    fn get(&self, local_id: StreamEndpointId) -> Option<StreamPermit> {
124        let revoke_fn = self.make_revocation_fn(&local_id);
125        let Some(permit) = self.permits.get_revokable(revoke_fn) else {
126            info!("No permits available: {:?}", self.permits);
127            return None;
128        };
129        permit.relabel(self.label_for(&local_id));
130        if let Some(_) = self.open_streams.lock().insert(local_id.clone(), permit) {
131            warn!(id:% = self.peer_id; "Started stream {local_id:?} twice, dropping previous permit");
132        }
133        Some(StreamPermit { local_id, open_streams: self.open_streams.clone() })
134    }
135
136    /// Get a reservation that will resolve to a StreamPermit to start a stream with the id
137    /// `local_id`
138    fn setup_reservation_for(&self, local_id: StreamEndpointId) {
139        if !self.reserved_streams.lock().insert(local_id.clone()) {
140            // Already reserved.
141            return;
142        }
143        let restart_stream_available_fut = {
144            let self_revoke_fn = Self::make_revocation_fn(&self, &local_id);
145            let reservation = self.permits.reserve_revokable(self_revoke_fn);
146            let open_streams = self.open_streams.clone();
147            let reserved_streams = self.reserved_streams.clone();
148            let label = self.label_for(&local_id);
149            let local_id = local_id.clone();
150            async move {
151                let permit = reservation.await;
152                permit.relabel(label);
153                if open_streams.lock().insert(local_id.clone(), permit).is_some() {
154                    warn!("Reservation replaces acquired permit for {}", local_id.clone());
155                }
156                if !reserved_streams.lock().remove(&local_id) {
157                    warn!(local_id:%; "Unrecorded reservation resolved");
158                }
159                StreamPermit { local_id, open_streams }
160            }
161        };
162        if let Err(e) = self.sender.unbounded_send(restart_stream_available_fut.boxed()) {
163            warn!(id:% = self.peer_id, local_id:%, e:?; "Couldn't queue reservation to finish");
164        }
165    }
166
167    /// Revokes a permit that was previously delivered, suspending the local stream and signaling
168    /// the peer.
169    /// The permit must have been previously received through StreamPermits::get or a have been
170    /// restarted after being revoked, otherwise will panic.
171    fn revocation_fn(self, local_id: StreamEndpointId) -> Permit {
172        if let Ok(peer) = PeerInner::upgrade(self.inner.clone()) {
173            {
174                let mut lock = peer.lock();
175                match lock.suspend_local_stream(&local_id) {
176                    Ok(remote_id) => drop(lock.peer.suspend(&[remote_id])),
177                    Err(e) => warn!("Couldn't stop local stream {local_id:?}: {e:?}"),
178                }
179            }
180            self.setup_reservation_for(local_id.clone());
181        }
182        self.open_streams.lock().remove(&local_id).expect("permit revoked but don't have it")
183    }
184
185    fn make_revocation_fn(&self, local_id: &StreamEndpointId) -> impl FnOnce() -> Permit + use<> {
186        let local_id = local_id.clone();
187        let cloned = self.clone();
188        move || cloned.revocation_fn(local_id)
189    }
190}
191
192impl Peer {
193    /// Make a new Peer which is connected to the peer `id` using the AVDTP `peer`.
194    /// The `streams` are the local endpoints available to the peer.
195    /// `profile` will be used to initiate connections for Media Transport.
196    /// The `permits`, if provided, will acquire a permit before starting streams on this peer.
197    /// If `metrics` is included, metrics for codec availability will be reported.
198    /// This also starts a task on the executor to handle incoming events from the peer.
199    pub fn create(
200        id: PeerId,
201        peer: avdtp::Peer,
202        streams: Streams,
203        permits: Option<Permits>,
204        profile: ProfileProxy,
205        metrics: bt_metrics::MetricsLogger,
206    ) -> Self {
207        let inner = Arc::new(Mutex::new(PeerInner::new(peer, id, streams, metrics.clone())));
208        let reservations_receiver = if let Some(permits) = permits {
209            let (stream_permits, receiver) =
210                StreamPermits::new(Arc::downgrade(&inner), id, permits);
211            inner.lock().permits = Some(stream_permits);
212            receiver
213        } else {
214            let (_, receiver) = mpsc::unbounded();
215            receiver
216        };
217        let res = Self {
218            id,
219            inner,
220            profile,
221            descriptor: Mutex::new(None),
222            closed_wakers: Arc::new(Mutex::new(Some(Vec::new()))),
223            metrics,
224            start_stream_task: Mutex::new(None),
225        };
226        res.start_requests_task(reservations_receiver);
227        res
228    }
229
230    pub fn set_descriptor(&self, descriptor: ProfileDescriptor) -> Option<ProfileDescriptor> {
231        self.descriptor.lock().replace(descriptor)
232    }
233
234    /// How long to wait after a non-local establishment of a stream to start the stream.
235    /// Chosen to produce reasonably quick startup while allowing for peer start.
236    const STREAM_DWELL: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(500);
237
238    /// Receive a channel from the peer that was initiated remotely.
239    /// This function should be called whenever the peer associated with this opens an L2CAP channel.
240    /// If this completes opening a stream, streams that are suspended will be scheduled to start.
241    pub fn receive_channel(&self, channel: Channel) -> avdtp::Result<()> {
242        let mut lock = self.inner.lock();
243        if lock.receive_channel(channel)? {
244            let weak = Arc::downgrade(&self.inner);
245            let mut task_lock = self.start_stream_task.lock();
246            *task_lock = Some(fasync::Task::local(async move {
247                trace!("Dwelling to start remotely-opened stream..");
248                fasync::Timer::new(Self::STREAM_DWELL.after_now()).await;
249                PeerInner::start_opened(weak).await
250            }));
251        }
252        Ok(())
253    }
254
255    /// Return a handle to the AVDTP peer, to use as initiator of commands.
256    pub fn avdtp(&self) -> avdtp::Peer {
257        let lock = self.inner.lock();
258        lock.peer.clone()
259    }
260
261    /// Returns the stream endpoints discovered by this peer.
262    pub fn remote_endpoints(&self) -> Option<Vec<avdtp::StreamEndpoint>> {
263        self.inner.lock().remote_endpoints()
264    }
265
266    /// Perform Discovery and Collect Capabilities to enumerate the endpoints and capabilities of
267    /// the connected peer.
268    /// Returns a future which performs the work and resolves to a vector of peer stream endpoints.
269    pub fn collect_capabilities(
270        &self,
271    ) -> impl Future<Output = avdtp::Result<Vec<avdtp::StreamEndpoint>>> + use<> {
272        let avdtp = self.avdtp();
273        let get_all = self.descriptor.lock().clone().is_some_and(a2dp_version_check);
274        let inner = self.inner.clone();
275        let metrics = self.metrics.clone();
276        let peer_id = self.id;
277        async move {
278            if let Some(caps) = inner.lock().remote_endpoints() {
279                return Ok(caps);
280            }
281            trace!("Discovering peer streams..");
282            let infos = avdtp.discover().await?;
283            trace!("Discovered {} streams", infos.len());
284            let mut remote_streams = Vec::new();
285            for info in infos {
286                let capabilities = if get_all {
287                    avdtp.get_all_capabilities(info.id()).await
288                } else {
289                    avdtp.get_capabilities(info.id()).await
290                };
291                match capabilities {
292                    Ok(capabilities) => {
293                        trace!("Stream {:?}", info);
294                        for cap in &capabilities {
295                            trace!("  - {:?}", cap);
296                        }
297                        remote_streams.push(avdtp::StreamEndpoint::from_info(&info, capabilities));
298                    }
299                    Err(e) => {
300                        info!(peer_id:%; "Stream {} capabilities failed: {:?}, skipping", info.id(), e);
301                    }
302                };
303            }
304            inner.lock().set_remote_endpoints(&remote_streams);
305            Self::record_cobalt_metrics(metrics, &remote_streams);
306            Ok(remote_streams)
307        }
308    }
309
310    fn record_cobalt_metrics(metrics: bt_metrics::MetricsLogger, endpoints: &[StreamEndpoint]) {
311        let codec_metrics: HashSet<_> = endpoints
312            .iter()
313            .filter_map(|endpoint| {
314                endpoint.codec_type().map(|t| codectype_to_availability_metric(t) as u32)
315            })
316            .collect();
317        metrics
318            .log_occurrences(bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID, codec_metrics);
319
320        let cap_metrics: HashSet<_> = endpoints
321            .iter()
322            .flat_map(|endpoint| {
323                endpoint
324                    .capabilities()
325                    .iter()
326                    .filter_map(|t| capability_to_metric(t))
327                    .chain(std::iter::once(
328                        bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic,
329                    ))
330                    .map(|t| t as u32)
331            })
332            .collect();
333        metrics.log_occurrences(bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID, cap_metrics);
334    }
335
336    fn transport_channel_params() -> L2capParameters {
337        L2capParameters {
338            psm: Some(PSM_AVDTP),
339            parameters: Some(ChannelParameters {
340                max_rx_packet_size: Some(65535),
341                ..Default::default()
342            }),
343            ..Default::default()
344        }
345    }
346
347    /// Open and start a media transport stream, connecting a compatible local stream to the remote
348    /// stream `remote_id`, configuring it with the `capabilities` provided.
349    /// Returns a future which should be awaited on.
350    /// The future returns Ok(()) if successfully started, and an appropriate error otherwise.
351    pub fn stream_start(
352        &self,
353        remote_id: StreamEndpointId,
354        capabilities: Vec<ServiceCapability>,
355    ) -> impl Future<Output = avdtp::Result<()>> {
356        let peer = Arc::downgrade(&self.inner);
357        let peer_id = self.id.clone();
358        let avdtp = self.avdtp();
359        let profile = self.profile.clone();
360
361        async move {
362            let codec_params =
363                capabilities.iter().find(|x| x.is_codec()).ok_or(avdtp::Error::InvalidState)?;
364            let (local_id, local_capabilities) = {
365                let peer = PeerInner::upgrade(peer.clone())?;
366                let lock = peer.lock();
367                lock.find_compatible_local_capabilities(codec_params, &remote_id)?
368            };
369
370            let local_by_cat: HashMap<ServiceCategory, ServiceCapability> =
371                local_capabilities.into_iter().map(|i| (i.category(), i)).collect();
372
373            // Filter things out if they don't have a match in the local capabilities.
374            // Order them by the ServiceCategory ordinal - some noncompliant devices care about it.
375            let shared_capabilities: BTreeMap<ServiceCategory, ServiceCapability> = capabilities
376                .into_iter()
377                .filter_map(|cap| {
378                    let Some(local_cap) = local_by_cat.get(&cap.category()) else {
379                        return None;
380                    };
381                    if cap.category() == ServiceCategory::MediaCodec {
382                        let Ok(a) = MediaCodecConfig::try_from(&cap) else {
383                            return None;
384                        };
385                        let Ok(b) = MediaCodecConfig::try_from(local_cap) else {
386                            return None;
387                        };
388                        let Some(negotiated) = MediaCodecConfig::negotiate(&a, &b) else {
389                            return None;
390                        };
391                        Some((cap.category(), (&negotiated).into()))
392                    } else {
393                        Some((cap.category(), cap))
394                    }
395                })
396                .collect();
397            let shared_capabilities: Vec<_> = shared_capabilities.into_values().collect();
398
399            trace!("Starting stream {local_id} to remote {remote_id} with {shared_capabilities:?}");
400
401            avdtp.set_configuration(&remote_id, &local_id, &shared_capabilities).await?;
402            {
403                let strong = PeerInner::upgrade(peer.clone())?;
404                strong.lock().set_opening(&local_id, &remote_id, shared_capabilities)?;
405            }
406            avdtp.open(&remote_id).await?;
407
408            debug!(peer_id:%; "Connecting transport channel");
409            let channel = profile
410                .connect(
411                    &peer_id.into(),
412                    &ConnectParameters::L2cap(Self::transport_channel_params()),
413                )
414                .await
415                .context("FIDL error: {}")?
416                .or(Err(avdtp::Error::PeerDisconnected))?;
417            trace!(peer_id:%; "Connected transport channel, converting to local Channel");
418            let channel = match channel.try_into() {
419                Err(e) => {
420                    warn!(peer_id:%, e:?; "Couldn't connect media transport: no channel");
421                    return Err(avdtp::Error::PeerDisconnected);
422                }
423                Ok(c) => c,
424            };
425
426            trace!(peer_id:%; "Connected transport channel, passing to Peer..");
427
428            {
429                let strong = PeerInner::upgrade(peer.clone())?;
430                let _ = strong.lock().receive_channel(channel)?;
431            }
432            // Start streams immediately if the channel is locally initiated.
433            PeerInner::start_opened(peer).await
434        }
435    }
436
437    /// Query whether any streams are currently started or scheduled to start.
438    pub fn streaming_active(&self) -> bool {
439        self.inner.lock().is_streaming() || self.will_start_streaming()
440    }
441
442    /// Returns true if there are any streams that are currently started.
443    #[cfg(test)]
444    fn is_streaming_now(&self) -> bool {
445        self.inner.lock().is_streaming_now()
446    }
447
448    /// Polls the task scheduled to start streaming, returning true if the task is still scheduled
449    /// to start streaming.
450    fn will_start_streaming(&self) -> bool {
451        let mut task_lock = self.start_stream_task.lock();
452        if task_lock.is_none() {
453            return false;
454        }
455        // This is the only thing that can poll the start task, so it is okay to ignore the wakeup.
456        let mut cx = Context::from_waker(&std::task::Waker::noop());
457        if let Poll::Pending = task_lock.as_mut().unwrap().poll_unpin(&mut cx) {
458            return true;
459        }
460        // Reset the task to None so that we don't try to re-poll it.
461        let _ = task_lock.take();
462        false
463    }
464
465    /// Suspend a media transport stream `local_id`.
466    /// It's possible that the stream is not active - a suspend will be attempted, but an
467    /// error from the command will be returned.
468    /// Returns the result of the suspend command.
469    pub fn stream_suspend(
470        &self,
471        local_id: StreamEndpointId,
472    ) -> impl Future<Output = avdtp::Result<()>> {
473        let peer = Arc::downgrade(&self.inner);
474        PeerInner::suspend(peer, local_id)
475    }
476
477    /// Start an asynchronous task to handle any requests from the AVDTP peer.
478    /// This task completes when the remote end closes the signaling connection.
479    fn start_requests_task(
480        &self,
481        mut reservations_receiver: mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>,
482    ) {
483        let lock = self.inner.lock();
484        let mut request_stream = lock.peer.take_request_stream();
485        let id = self.id.clone();
486        let peer = Arc::downgrade(&self.inner);
487        let mut stream_reservations = FuturesUnordered::new();
488        let disconnect_wakers = Arc::downgrade(&self.closed_wakers);
489        fuchsia_async::Task::local(async move {
490            loop {
491                select! {
492                    request = request_stream.next() => {
493                        match request {
494                            None => break,
495                            Some(Err(e)) => info!(peer_id:% = id, e:?; "Request stream error"),
496                            Some(Ok(request)) => match peer.upgrade() {
497                                None => return,
498                                Some(p) => {
499                                    let result_or_future = p.lock().handle_request(request);
500                                    let result = match result_or_future {
501                                        Either::Left(result) => result,
502                                        Either::Right(future) => future.await,
503                                    };
504                                    if let Err(e) = result {
505                                        warn!(peer_id:% = id, e:?; "Error handling request");
506                                    }
507                                }
508                            },
509                        }
510                    },
511                    reservation_fut = reservations_receiver.select_next_some() => {
512                        stream_reservations.push(reservation_fut)
513                    },
514                    permit = stream_reservations.select_next_some() => {
515                        if let Err(e) = PeerInner::start_permit(peer.clone(), permit).await {
516                            warn!(peer_id:% = id, e:?; "Couldn't start stream after unpause");
517                        }
518                    }
519                    complete => break,
520                }
521            }
522            info!(peer_id:% = id; "disconnected");
523            if let Some(wakers) = disconnect_wakers.upgrade() {
524                for waker in wakers.lock().take().unwrap_or_else(Vec::new) {
525                    waker.wake();
526                }
527            }
528        })
529        .detach();
530    }
531
532    /// Returns a future that will complete when the peer disconnects.
533    pub fn closed(&self) -> ClosedPeer {
534        ClosedPeer { inner: Arc::downgrade(&self.closed_wakers) }
535    }
536}
537
538/// Future which completes when the A2DP peer has closed the control connection.
539/// See `Peer::closed`
540#[must_use = "futures do nothing unless you `.await` or poll them"]
541pub struct ClosedPeer {
542    inner: Weak<Mutex<Option<Vec<Waker>>>>,
543}
544
545impl Future for ClosedPeer {
546    type Output = ();
547
548    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
549        match self.inner.upgrade() {
550            None => Poll::Ready(()),
551            Some(inner) => match inner.lock().as_mut() {
552                None => Poll::Ready(()),
553                Some(wakers) => {
554                    wakers.push(cx.waker().clone());
555                    Poll::Pending
556                }
557            },
558        }
559    }
560}
561
562/// Determines if Peer profile version is newer (>= 1.3) or older (< 1.3)
563fn a2dp_version_check(profile: ProfileDescriptor) -> bool {
564    let (Some(major), Some(minor)) = (profile.major_version, profile.minor_version) else {
565        return false;
566    };
567    (major == 1 && minor >= 3) || major > 1
568}
569
570/// Peer handles the communication with the AVDTP layer, and provides responses as appropriate
571/// based on the current state of local streams available.
572/// Each peer has its own set of local stream endpoints, and tracks a set of remote peer endpoints.
573struct PeerInner {
574    /// AVDTP peer communicating to this.
575    peer: avdtp::Peer,
576    /// The PeerId that this peer is representing
577    peer_id: PeerId,
578    /// Some(local_id) if an endpoint has been configured but hasn't finished opening.
579    /// Per AVDTP Sec 6.11 only up to one stream can be in this state.
580    opening: Option<StreamEndpointId>,
581    /// The local stream endpoint collection
582    local: Streams,
583    /// The permits that are available for this peer.
584    permits: Option<StreamPermits>,
585    /// Tasks watching for the end of a started stream. Key is the local stream id.
586    started: HashMap<StreamEndpointId, WatchedStream>,
587    /// The inspect node for this peer
588    inspect: fuchsia_inspect::Node,
589    /// The set of discovered remote endpoints. None until set.
590    remote_endpoints: Option<Vec<StreamEndpoint>>,
591    /// The inspect node representing the remote endpoints.
592    remote_inspect: fuchsia_inspect::Node,
593    /// Cobalt logger used to report peer metrics.
594    metrics: bt_metrics::MetricsLogger,
595}
596
597impl Inspect for &mut PeerInner {
598    // Set up the StreamEndpoint to update the state
599    // The MediaTask node will be created when the media task is started.
600    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
601        self.inspect = parent.create_child(name.as_ref());
602        self.inspect.record_string("id", self.peer_id.to_string());
603        self.local.iattach(&self.inspect, "local_streams")
604    }
605}
606
607impl PeerInner {
608    pub fn new(
609        peer: avdtp::Peer,
610        peer_id: PeerId,
611        local: Streams,
612        metrics: bt_metrics::MetricsLogger,
613    ) -> Self {
614        Self {
615            peer,
616            peer_id,
617            opening: None,
618            local,
619            permits: None,
620            started: HashMap::new(),
621            inspect: Default::default(),
622            remote_endpoints: None,
623            remote_inspect: Default::default(),
624            metrics,
625        }
626    }
627
628    /// Returns an endpoint from the local set or a BadAcpSeid error if it doesn't exist.
629    fn get_mut(&mut self, local_id: &StreamEndpointId) -> Result<&mut Stream, avdtp::ErrorCode> {
630        self.local.get_mut(&local_id).ok_or(avdtp::ErrorCode::BadAcpSeid)
631    }
632
633    fn set_remote_endpoints(&mut self, endpoints: &[StreamEndpoint]) {
634        self.remote_inspect = self.inspect.create_child("remote_endpoints");
635        for endpoint in endpoints {
636            self.remote_inspect.record_child(inspect::unique_name("remote_"), |node| {
637                node.record_string("endpoint_id", endpoint.local_id().debug());
638                node.record_string("capabilities", endpoint.capabilities().debug());
639                node.record_string("type", endpoint.endpoint_type().debug());
640            });
641        }
642        self.remote_endpoints = Some(endpoints.iter().map(StreamEndpoint::as_new).collect());
643    }
644
645    /// If the remote endpoints have been set, returns a copy of the endpoints.
646    fn remote_endpoints(&self) -> Option<Vec<StreamEndpoint>> {
647        self.remote_endpoints.as_ref().map(|v| v.iter().map(StreamEndpoint::as_new).collect())
648    }
649
650    /// If the remote endpoint with endpoint `id` exists, return a copy of the endpoint.
651    fn remote_endpoint(&self, id: &StreamEndpointId) -> Option<StreamEndpoint> {
652        self.remote_endpoints
653            .as_ref()
654            .and_then(|v| v.iter().find(|v| v.local_id() == id).map(StreamEndpoint::as_new))
655    }
656
657    /// Returns true if there is at least one stream that has started or is starting for this peer.
658    fn is_streaming(&self) -> bool {
659        self.is_streaming_now() || self.opening.is_some()
660    }
661
662    /// Returns true if there is at least one stream in the started state for this peer.
663    fn is_streaming_now(&self) -> bool {
664        self.local.streaming().next().is_some()
665    }
666
667    fn set_opening(
668        &mut self,
669        local_id: &StreamEndpointId,
670        remote_id: &StreamEndpointId,
671        capabilities: Vec<ServiceCapability>,
672    ) -> avdtp::Result<()> {
673        if self.opening.is_some() {
674            return Err(avdtp::Error::InvalidState);
675        }
676        let peer_id = self.peer_id;
677        let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
678        stream
679            .configure(&peer_id, &remote_id, capabilities)
680            .map_err(|(cat, c)| avdtp::Error::RequestInvalidExtra(c, (&cat).into()))?;
681        stream.endpoint_mut().establish().or(Err(avdtp::Error::InvalidState))?;
682        self.opening = Some(local_id.clone());
683        Ok(())
684    }
685
686    fn upgrade(weak: Weak<Mutex<Self>>) -> avdtp::Result<Arc<Mutex<Self>>> {
687        weak.upgrade().ok_or(avdtp::Error::PeerDisconnected)
688    }
689
690    /// Start the stream that is opening, completing the opened procedure.
691    async fn start_opened(weak: Weak<Mutex<Self>>) -> avdtp::Result<()> {
692        let (avdtp, stream_pairs) = {
693            let peer = Self::upgrade(weak.clone())?;
694            let peer = peer.lock();
695            let stream_pairs: Vec<(StreamEndpointId, StreamEndpointId)> = peer
696                .local
697                .open()
698                .filter_map(|stream| {
699                    let endpoint = stream.endpoint();
700                    endpoint.remote_id().map(|id| (endpoint.local_id().clone(), id.clone()))
701                })
702                .collect();
703            (peer.peer.clone(), stream_pairs)
704        };
705        for (local_id, remote_id) in stream_pairs {
706            let permit_result =
707                Self::upgrade(weak.clone())?.lock().get_permit_or_reserve(&local_id);
708            if let Ok(permit) = permit_result {
709                Self::initiated_start(avdtp.clone(), weak.clone(), permit, &local_id, &remote_id)
710                    .await?;
711            }
712        }
713        Ok(())
714    }
715
716    async fn start_permit(weak: Weak<Mutex<Self>>, permit: StreamPermit) -> avdtp::Result<()> {
717        let local_id = permit.local_id().clone();
718        let (avdtp, remote_id) = {
719            let peer = Self::upgrade(weak.clone())?;
720            let mut peer = peer.lock();
721            let remote_id = peer
722                .get_mut(&local_id)
723                .map_err(|e| avdtp::Error::RequestInvalid(e))?
724                .endpoint()
725                .remote_id()
726                .ok_or(avdtp::Error::InvalidState)?
727                .clone();
728            (peer.peer.clone(), remote_id)
729        };
730        Self::initiated_start(avdtp, weak, Some(permit), &local_id, &remote_id).await
731    }
732
733    /// Start a stream for a local reason.  Requires a Permit to start streaming for the local stream.
734    async fn initiated_start(
735        avdtp: avdtp::Peer,
736        weak: Weak<Mutex<Self>>,
737        permit: Option<StreamPermit>,
738        local_id: &StreamEndpointId,
739        remote_id: &StreamEndpointId,
740    ) -> avdtp::Result<()> {
741        trace!(permit:?, local_id:?, remote_id:?; "Making outgoing start request");
742        let to_start = std::slice::from_ref(remote_id);
743        avdtp.start(to_start).await?;
744        trace!("Start response received: {permit:?}");
745        let peer = Self::upgrade(weak.clone())?;
746        let (peer_id, start_result) = {
747            let mut peer = peer.lock();
748            (peer.peer_id, peer.start_local_stream(permit, &local_id))
749        };
750        if let Err(e) = start_result {
751            warn!(peer_id:%, local_id:%, remote_id:%, e:?; "Failed to start local stream, suspending");
752            avdtp.suspend(to_start).await?;
753        }
754        Ok(())
755    }
756
757    /// Suspend a stream locally, returning a future to get the result from the peer.
758    fn suspend(
759        weak: Weak<Mutex<Self>>,
760        local_id: StreamEndpointId,
761    ) -> impl Future<Output = avdtp::Result<()>> {
762        let res = (move || {
763            let peer = Self::upgrade(weak.clone())?;
764            let mut peer = peer.lock();
765            Ok((peer.peer.clone(), peer.suspend_local_stream(&local_id)?))
766        })();
767        let (avdtp, remote_id) = match res {
768            Err(e) => return futures::future::err(e).left_future(),
769            Ok(r) => r,
770        };
771        let to_suspend = &[remote_id];
772        avdtp.suspend(to_suspend).right_future()
773    }
774
775    /// Finds a stream in the local stream set which is compatible with the remote_id given the codec config.
776    /// Returns the local stream ID and capabilities if found, or OutOfRange if one could not be found.
777    pub fn find_compatible_local_capabilities(
778        &self,
779        codec_params: &ServiceCapability,
780        remote_id: &StreamEndpointId,
781    ) -> avdtp::Result<(StreamEndpointId, Vec<ServiceCapability>)> {
782        let config = codec_params.try_into()?;
783        let our_direction = self.remote_endpoint(remote_id).map(|e| e.endpoint_type().opposite());
784        debug!(codec_params:?, local:? = self.local; "Looking for compatible local stream");
785        self.local
786            .compatible(config)
787            .find_map(|s| {
788                let endpoint = s.endpoint();
789                if let Some(d) = our_direction {
790                    if &d != endpoint.endpoint_type() {
791                        return None;
792                    }
793                }
794                Some((endpoint.local_id().clone(), endpoint.capabilities().clone()))
795            })
796            .ok_or(avdtp::Error::OutOfRange)
797    }
798
799    /// Attempts to acquire a permit for streaming, if the permits are set.
800    /// Returns Ok if is is okay to stream, and Err if the permit was not available and a
801    /// reservation was made.
802    fn get_permit_or_reserve(
803        &self,
804        local_id: &StreamEndpointId,
805    ) -> Result<Option<StreamPermit>, ()> {
806        let Some(permits) = self.permits.as_ref() else {
807            return Ok(None);
808        };
809        if let Some(permit) = permits.get(local_id.clone()) {
810            return Ok(Some(permit));
811        }
812        info!(peer_id:% = self.peer_id, local_id:%; "No permit to start stream, adding a reservation");
813        permits.setup_reservation_for(local_id.clone());
814        Err(())
815    }
816
817    /// Starts the stream which is in the local Streams with `local_id`.
818    /// Requires a permit to stream.
819    fn start_local_stream(
820        &mut self,
821        permit: Option<StreamPermit>,
822        local_id: &StreamEndpointId,
823    ) -> avdtp::Result<()> {
824        let peer_id = self.peer_id;
825        let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
826        // The streaming permit can be revoked while stream setup is in progress. If so, return
827        // without starting the local stream.
828        if permit.as_ref().is_some_and(|p| !p.is_held()) {
829            return Err(avdtp::Error::Other(anyhow::format_err!(
830                "streaming permit revoked during setup"
831            )));
832        }
833
834        info!(peer_id:%, stream:?; "Starting");
835        let stream_finished = stream.start().map_err(|c| avdtp::Error::RequestInvalid(c))?;
836        // TODO(https://fxbug.dev/42147239): if streaming stops unexpectedly, send a suspend to match to peer
837        let watched_stream = WatchedStream::new(permit, stream_finished);
838        if self.started.insert(local_id.clone(), watched_stream).is_some() {
839            warn!(peer_id:%, local_id:%; "Stream that was already started");
840        }
841        Ok(())
842    }
843
844    /// Suspend a stream on the local side. Returns the remote StreamEndpointId if the stream was suspended,
845    /// or a RequestInvalid error with the error code otherwise.
846    fn suspend_local_stream(
847        &mut self,
848        local_id: &StreamEndpointId,
849    ) -> avdtp::Result<StreamEndpointId> {
850        let peer_id = self.peer_id;
851        let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
852        let remote_id = stream.endpoint().remote_id().ok_or(avdtp::Error::InvalidState)?.clone();
853        info!(peer_id:%; "Suspend stream local {local_id} <-> {remote_id} remote");
854        stream.suspend().map_err(|c| avdtp::Error::RequestInvalid(c))?;
855        let _ = self.started.remove(local_id);
856        Ok(remote_id)
857    }
858
859    /// Provide a new established L2CAP channel to this remote peer.
860    /// This function should be called whenever the remote associated with this peer opens an
861    /// L2CAP channel after the first.
862    /// Returns true if this channel completed the opening sequence.
863    fn receive_channel(&mut self, channel: Channel) -> avdtp::Result<bool> {
864        let stream_id = self.opening.as_ref().cloned().ok_or(avdtp::Error::InvalidState)?;
865        let stream = self.get_mut(&stream_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
866        let done = !stream.endpoint_mut().receive_channel(channel)?;
867        if done {
868            self.opening = None;
869        }
870        info!(peer_id:% = self.peer_id, stream_id:%; "Transport connected");
871        Ok(done)
872    }
873
874    /// Handle a single request event from the avdtp peer.
875    fn handle_request(
876        &mut self,
877        request: avdtp::Request,
878    ) -> Either<avdtp::Result<()>, impl Future<Output = avdtp::Result<()>> + use<>> {
879        use avdtp::ErrorCode;
880        use avdtp::Request::*;
881        trace!("Handling {request:?} from peer..");
882        let immediate_result = 'result: {
883            match request {
884                Discover { responder } => responder.send(&self.local.information()),
885                GetCapabilities { responder, stream_id }
886                | GetAllCapabilities { responder, stream_id } => match self.local.get(&stream_id) {
887                    None => responder.reject(ErrorCode::BadAcpSeid),
888                    Some(stream) => responder.send(stream.endpoint().capabilities()),
889                },
890                Open { responder, stream_id } => {
891                    if self.opening.is_none() {
892                        break 'result responder.reject(ErrorCode::BadState);
893                    }
894                    let Ok(stream) = self.get_mut(&stream_id) else {
895                        break 'result responder.reject(ErrorCode::BadAcpSeid);
896                    };
897                    match stream.endpoint_mut().establish() {
898                        Ok(()) => responder.send(),
899                        Err(_) => responder.reject(ErrorCode::BadState),
900                    }
901                }
902                Close { responder, stream_id } => {
903                    let peer = self.peer.clone();
904                    let Ok(stream) = self.get_mut(&stream_id) else {
905                        break 'result responder.reject(ErrorCode::BadAcpSeid);
906                    };
907                    stream.release(responder, &peer)
908                }
909                SetConfiguration { responder, local_stream_id, remote_stream_id, capabilities } => {
910                    if self.opening.is_some() {
911                        break 'result responder.reject(ServiceCategory::None, ErrorCode::BadState);
912                    }
913                    let peer_id = self.peer_id;
914                    let Ok(stream) = self.get_mut(&local_stream_id) else {
915                        break 'result responder
916                            .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
917                    };
918                    match stream.configure(&peer_id, &remote_stream_id, capabilities) {
919                        Ok(_) => {
920                            self.opening = Some(local_stream_id);
921                            responder.send()
922                        }
923                        Err((category, code)) => responder.reject(category, code),
924                    }
925                }
926                GetConfiguration { stream_id, responder } => {
927                    let Ok(stream) = self.get_mut(&stream_id) else {
928                        break 'result responder.reject(ErrorCode::BadAcpSeid);
929                    };
930                    let Some(vec_capabilities) = stream.endpoint().get_configuration() else {
931                        break 'result responder.reject(ErrorCode::BadState);
932                    };
933                    responder.send(vec_capabilities.as_slice())
934                }
935                Reconfigure { responder, local_stream_id, capabilities } => {
936                    let Ok(stream) = self.get_mut(&local_stream_id) else {
937                        break 'result responder
938                            .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
939                    };
940                    match stream.reconfigure(capabilities) {
941                        Ok(_) => responder.send(),
942                        Err((cat, code)) => responder.reject(cat, code),
943                    }
944                }
945                Start { responder, stream_ids } => {
946                    let mut immediate_suspend = Vec::new();
947                    // Fail on the first failed endpoint, as per the AVDTP spec 8.13 Note 5
948                    let result = stream_ids.into_iter().try_for_each(|seid| {
949                        let Some(stream) = self.local.get_mut(&seid) else {
950                            return Err((seid, ErrorCode::BadAcpSeid));
951                        };
952                        let remote_id = stream.endpoint().remote_id().cloned();
953                        let Some(remote_id) = remote_id else {
954                            return Err((seid, ErrorCode::BadState));
955                        };
956                        let Ok(permit) = self.get_permit_or_reserve(&seid) else {
957                            // Happens when we cannot start because of permits.
958                            // Accept this one, then queue up for suspend.
959                            // We are already reserved for a permit.
960                            immediate_suspend.push(remote_id);
961                            return Ok(());
962                        };
963                        match self.start_local_stream(permit, &seid) {
964                            Ok(()) => Ok(()),
965                            Err(avdtp::Error::RequestInvalid(code)) => Err((seid, code)),
966                            Err(_) => Err((seid, ErrorCode::BadState)),
967                        }
968                    });
969                    let response_result = match result {
970                        Ok(()) => responder.send(),
971                        Err((seid, code)) => responder.reject(&seid, code),
972                    };
973                    {
974                        let peer = self.peer.clone();
975                        return Either::Right(async move {
976                            if !immediate_suspend.is_empty() {
977                                peer.suspend(immediate_suspend.as_slice()).await?;
978                            }
979                            response_result
980                        });
981                    }
982                }
983                Suspend { responder, stream_ids } => {
984                    for seid in stream_ids {
985                        match self.suspend_local_stream(&seid) {
986                            Ok(_remote_id) => {}
987                            Err(avdtp::Error::RequestInvalid(code)) => {
988                                break 'result responder.reject(&seid, code);
989                            }
990                            Err(_e) => break 'result responder.reject(&seid, ErrorCode::BadState),
991                        }
992                    }
993                    responder.send()
994                }
995                Abort { responder, stream_id } => {
996                    let Ok(stream) = self.get_mut(&stream_id) else {
997                        // No response is sent on an invalid ID for an Abort
998                        break 'result Ok(());
999                    };
1000                    stream.abort();
1001                    self.opening = self.opening.take().filter(|local_id| local_id != &stream_id);
1002                    responder.send()
1003                }
1004                DelayReport { responder, delay, stream_id } => {
1005                    // Delay is in 1/10 ms
1006                    let delay_ns = delay as u64 * 100000;
1007                    // Record delay to cobalt.
1008                    self.metrics.log_integer(
1009                        bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID,
1010                        delay_ns.try_into().unwrap_or(-1),
1011                        vec![],
1012                    );
1013                    // Report should only come after a stream is configured
1014                    let Some(stream) = self.local.get_mut(&stream_id) else {
1015                        break 'result responder.reject(avdtp::ErrorCode::BadAcpSeid);
1016                    };
1017                    let delay_str = format!("delay {}.{} ms", delay / 10, delay % 10);
1018                    let peer = self.peer_id;
1019                    match stream.set_delay(std::time::Duration::from_nanos(delay_ns)) {
1020                        Ok(()) => info!(peer:%, stream_id:%; "reported {delay_str}"),
1021                        Err(avdtp::ErrorCode::BadState) => {
1022                            info!(peer:%, stream_id:%; "bad state {delay_str}");
1023                            break 'result responder.reject(avdtp::ErrorCode::BadState);
1024                        }
1025                        Err(e) => info!(peer:%, stream_id:%, e:?; "failed {delay_str}"),
1026                    };
1027                    // Can't really respond with an Error
1028                    responder.send()
1029                }
1030            }
1031        };
1032        Either::Left(immediate_result)
1033    }
1034}
1035
1036/// A WatchedStream holds a task tracking a started stream and ensures actions are performed when
1037/// the stream media task finishes.
1038struct WatchedStream {
1039    _permit_task: fasync::Task<()>,
1040}
1041
1042impl WatchedStream {
1043    fn new(
1044        permit: Option<StreamPermit>,
1045        finish_fut: BoxFuture<'static, Result<(), anyhow::Error>>,
1046    ) -> Self {
1047        let permit_task = fasync::Task::spawn(async move {
1048            let _ = finish_fut.await;
1049            drop(permit);
1050        });
1051        Self { _permit_task: permit_task }
1052    }
1053}
1054
1055fn codectype_to_availability_metric(
1056    codec_type: &MediaCodecType,
1057) -> bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec {
1058    match codec_type {
1059        &MediaCodecType::AUDIO_SBC => {
1060            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc
1061        }
1062        &MediaCodecType::AUDIO_MPEG12 => {
1063            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Mpeg12
1064        }
1065        &MediaCodecType::AUDIO_AAC => {
1066            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Aac
1067        }
1068        &MediaCodecType::AUDIO_ATRAC => {
1069            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac
1070        }
1071        &MediaCodecType::AUDIO_NON_A2DP => {
1072            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::VendorSpecific
1073        }
1074        _ => bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown,
1075    }
1076}
1077
1078fn capability_to_metric(
1079    cap: &ServiceCapability,
1080) -> Option<bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability> {
1081    match cap {
1082        ServiceCapability::DelayReporting => {
1083            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport)
1084        }
1085        ServiceCapability::Reporting => {
1086            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Reporting)
1087        }
1088        ServiceCapability::Recovery { .. } => {
1089            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Recovery)
1090        }
1091        ServiceCapability::ContentProtection { .. } => {
1092            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::ContentProtection)
1093        }
1094        ServiceCapability::HeaderCompression { .. } => {
1095            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::HeaderCompression)
1096        }
1097        ServiceCapability::Multiplexing { .. } => {
1098            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Multiplexing)
1099        }
1100        // We ignore capabilities that we don't care to track.
1101        other => {
1102            trace!("untracked remote peer capability: {:?}", other);
1103            None
1104        }
1105    }
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110    use super::*;
1111
1112    use async_utils::PollExt;
1113    use bt_metrics::respond_to_metrics_req_for_test;
1114    use fidl::endpoints::create_proxy_and_stream;
1115    use fidl_fuchsia_bluetooth::ErrorCode;
1116    use fidl_fuchsia_bluetooth_bredr::{
1117        ProfileMarker, ProfileRequest, ProfileRequestStream, ServiceClassProfileIdentifier,
1118    };
1119    use fidl_fuchsia_metrics::{MetricEvent, MetricEventPayload};
1120    use futures::{SinkExt, StreamExt};
1121    use std::pin::pin;
1122
1123    use crate::media_task::tests::{TestMediaTask, TestMediaTaskBuilder};
1124    use crate::media_types::*;
1125    use crate::stream::tests::{make_sbc_endpoint, sbc_mediacodec_capability};
1126
1127    fn fake_metrics()
1128    -> (bt_metrics::MetricsLogger, fidl_fuchsia_metrics::MetricEventLoggerRequestStream) {
1129        let (c, s) = fidl::endpoints::create_proxy_and_stream::<
1130            fidl_fuchsia_metrics::MetricEventLoggerMarker,
1131        >();
1132        (bt_metrics::MetricsLogger::from_proxy(c), s)
1133    }
1134
1135    fn setup_avdtp_peer() -> (avdtp::Peer, Channel) {
1136        let (remote, signaling) = Channel::create();
1137        let peer = avdtp::Peer::new(signaling);
1138        (peer, remote)
1139    }
1140
1141    fn build_test_streams() -> Streams {
1142        let mut streams = Streams::default();
1143        let source = Stream::build(
1144            make_sbc_endpoint(1, avdtp::EndpointType::Source),
1145            TestMediaTaskBuilder::new_delayable().builder(),
1146        );
1147        streams.insert(source);
1148        let sink = Stream::build(
1149            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
1150            TestMediaTaskBuilder::new().builder(),
1151        );
1152        streams.insert(sink);
1153        streams
1154    }
1155
1156    fn build_test_streams_delayable() -> Streams {
1157        fn with_delay(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
1158            StreamEndpoint::new(
1159                seid,
1160                avdtp::MediaType::Audio,
1161                direction,
1162                vec![
1163                    avdtp::ServiceCapability::MediaTransport,
1164                    avdtp::ServiceCapability::DelayReporting,
1165                    sbc_mediacodec_capability(),
1166                ],
1167            )
1168            .expect("endpoint creation should succeed")
1169        }
1170        let mut streams = Streams::default();
1171        let source = Stream::build(
1172            with_delay(1, avdtp::EndpointType::Source),
1173            TestMediaTaskBuilder::new_delayable().builder(),
1174        );
1175        streams.insert(source);
1176        let sink = Stream::build(
1177            with_delay(2, avdtp::EndpointType::Sink),
1178            TestMediaTaskBuilder::new().builder(),
1179        );
1180        streams.insert(sink);
1181        streams
1182    }
1183
1184    #[track_caller]
1185    pub(crate) fn recv_remote(remote: &mut Channel) -> Result<Vec<u8>, zx::Status> {
1186        let fut = remote.next();
1187        match fut.now_or_never() {
1188            Some(Some(res)) => res,
1189            Some(None) => Err(zx::Status::PEER_CLOSED),
1190            None => Err(zx::Status::SHOULD_WAIT),
1191        }
1192    }
1193
1194    /// Creates a Peer object, returning a channel connected ot the remote end, a
1195    /// ProfileRequestStream connected to the profile_proxy, and the Peer object.
1196    fn setup_test_peer(
1197        use_cobalt: bool,
1198        streams: Streams,
1199        permits: Option<Permits>,
1200    ) -> (
1201        Channel,
1202        ProfileRequestStream,
1203        Option<fidl_fuchsia_metrics::MetricEventLoggerRequestStream>,
1204        Peer,
1205    ) {
1206        let (avdtp, remote) = setup_avdtp_peer();
1207        let (metrics_logger, cobalt_receiver) = if use_cobalt {
1208            let (l, r) = fake_metrics();
1209            (l, Some(r))
1210        } else {
1211            (bt_metrics::MetricsLogger::default(), None)
1212        };
1213        let (profile_proxy, requests) = create_proxy_and_stream::<ProfileMarker>();
1214        let peer = Peer::create(PeerId(1), avdtp, streams, permits, profile_proxy, metrics_logger);
1215
1216        (remote, requests, cobalt_receiver, peer)
1217    }
1218
1219    #[track_caller]
1220    fn expect_send(exec: &mut fasync::TestExecutor, remote: &mut Channel, data: Vec<u8>) {
1221        exec.run_until_stalled(&mut remote.send(data))
1222            .expect("poll is ready")
1223            .expect("write successful");
1224    }
1225
1226    fn expect_get_capabilities_and_respond(
1227        exec: &mut fasync::TestExecutor,
1228        remote: &mut Channel,
1229        expected_seid: u8,
1230        response_capabilities: &[u8],
1231    ) {
1232        let received = recv_remote(remote).unwrap();
1233        // Last half of header must be Single (0b00) and Command (0b00)
1234        assert_eq!(0x00, received[0] & 0xF);
1235        assert_eq!(0x02, received[1]); // 0x02 = Get Capabilities
1236        assert_eq!(expected_seid << 2, received[2]);
1237
1238        let txlabel_raw = received[0] & 0xF0;
1239
1240        // Expect a get capabilities and respond.
1241        #[rustfmt::skip]
1242        let mut get_capabilities_rsp = vec![
1243            txlabel_raw << 4 | 0x2, // TxLabel (same) + ResponseAccept (0x02)
1244            0x02 // GetCapabilities
1245        ];
1246
1247        get_capabilities_rsp.extend_from_slice(response_capabilities);
1248
1249        expect_send(exec, remote, get_capabilities_rsp);
1250    }
1251
1252    fn expect_get_all_capabilities_and_respond(
1253        exec: &mut fasync::TestExecutor,
1254        remote: &mut Channel,
1255        expected_seid: u8,
1256        response_capabilities: &[u8],
1257    ) {
1258        let received = recv_remote(remote).unwrap();
1259        // Last half of header must be Single (0b00) and Command (0b00)
1260        assert_eq!(0x00, received[0] & 0xF);
1261        assert_eq!(0x0C, received[1]); // 0x0C = Get All Capabilities
1262        assert_eq!(expected_seid << 2, received[2]);
1263
1264        let txlabel_raw = received[0] & 0xF0;
1265
1266        // Expect a get capabilities and respond.
1267        #[rustfmt::skip]
1268        let mut get_capabilities_rsp = vec![
1269            txlabel_raw << 4 | 0x2, // TxLabel (same) + ResponseAccept (0x02)
1270            0x0C // GetAllCapabilities
1271        ];
1272
1273        get_capabilities_rsp.extend_from_slice(response_capabilities);
1274
1275        expect_send(exec, remote, get_capabilities_rsp);
1276    }
1277
1278    #[fuchsia::test]
1279    fn disconnected() {
1280        let mut exec = fasync::TestExecutor::new();
1281        let (proxy, _stream) = create_proxy_and_stream::<ProfileMarker>();
1282        let (remote, signaling) = Channel::create();
1283
1284        let id = PeerId(1);
1285
1286        let avdtp = avdtp::Peer::new(signaling);
1287        let peer = Peer::create(
1288            id,
1289            avdtp,
1290            Streams::default(),
1291            None,
1292            proxy,
1293            bt_metrics::MetricsLogger::default(),
1294        );
1295
1296        let closed_fut = peer.closed();
1297
1298        let mut closed_fut = pin!(closed_fut);
1299
1300        assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
1301
1302        // Close the remote channel
1303        drop(remote);
1304
1305        assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
1306    }
1307
1308    #[fuchsia::test]
1309    fn peer_collect_capabilities_success() {
1310        let mut exec = fasync::TestExecutor::new();
1311
1312        let (mut remote, _, cobalt_receiver, peer) =
1313            setup_test_peer(true, build_test_streams(), None);
1314
1315        let p: ProfileDescriptor = ProfileDescriptor {
1316            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1317            major_version: Some(1),
1318            minor_version: Some(2),
1319            ..Default::default()
1320        };
1321        let _ = peer.set_descriptor(p);
1322
1323        let collect_future = peer.collect_capabilities();
1324        let mut collect_future = pin!(collect_future);
1325
1326        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1327
1328        // Expect a discover command.
1329        let received = recv_remote(&mut remote).unwrap();
1330        // Last half of header must be Single (0b00) and Command (0b00)
1331        assert_eq!(0x00, received[0] & 0xF);
1332        assert_eq!(0x01, received[1]); // 0x01 = Discover
1333
1334        let txlabel_raw = received[0] & 0xF0;
1335
1336        // Respond with a set of streams.
1337        let response: &[u8] = &[
1338            txlabel_raw << 4 | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1339            0x01,                              // Discover
1340            0x3E << 2 | 0x0 << 1,              // SEID (3E), Not In Use (0b0)
1341            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1342            0x01 << 2 | 0x1 << 1,              // SEID (1), In Use (0b1)
1343            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1344        ];
1345        expect_send(&mut exec, &mut remote, response.to_vec());
1346
1347        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1348
1349        // Expect a get capabilities and respond.
1350        #[rustfmt::skip]
1351        let capabilities_rsp = &[
1352            // MediaTransport (Length of Service Capability = 0)
1353            0x01, 0x00,
1354            // Media Codec (LOSC = 2 + 4), Media Type Audio (0x00), Codec type (0x04), Codec specific 0xF09F9296
1355            0x07, 0x06, 0x00, 0x04, 0xF0, 0x9F, 0x92, 0x96
1356        ];
1357        expect_get_capabilities_and_respond(&mut exec, &mut remote, 0x3E, capabilities_rsp);
1358
1359        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1360
1361        // Expect a get capabilities and respond.
1362        #[rustfmt::skip]
1363        let capabilities_rsp = &[
1364            // MediaTransport (Length of Service Capability = 0)
1365            0x01, 0x00,
1366            // Media Codec (LOSC = 2 + 2), Media Type Audio (0x00), Codec type (0x00), Codec specific 0xC0DE
1367            0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1368        ];
1369        expect_get_capabilities_and_respond(&mut exec, &mut remote, 0x01, capabilities_rsp);
1370
1371        match exec.run_until_stalled(&mut collect_future) {
1372            Poll::Pending => panic!("collect capabilities should be complete"),
1373            Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1374            Poll::Ready(Ok(endpoints)) => {
1375                let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1376                let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1377                for stream in endpoints {
1378                    if stream.local_id() == &first_seid {
1379                        let expected_caps = vec![
1380                            ServiceCapability::MediaTransport,
1381                            ServiceCapability::MediaCodec {
1382                                media_type: avdtp::MediaType::Audio,
1383                                codec_type: avdtp::MediaCodecType::new(0x04),
1384                                codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1385                            },
1386                        ];
1387                        assert_eq!(&expected_caps, stream.capabilities());
1388                    } else if stream.local_id() == &second_seid {
1389                        let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1390                        assert_eq!(Some(&expected_codec_type), stream.codec_type());
1391                    } else {
1392                        panic!("Unexpected endpoint in the streams collected");
1393                    }
1394                }
1395            }
1396        }
1397
1398        // Collect reported cobalt logs.
1399        let mut recv = cobalt_receiver.expect("should have receiver");
1400        let mut log_events = Vec::new();
1401        while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1402            log_events.push(respond_to_metrics_req_for_test(req));
1403        }
1404
1405        // Should have sent two metric events for codec and one for capability.
1406        assert_eq!(3, log_events.len());
1407        assert!(log_events.contains(&MetricEvent {
1408            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1409            event_codes: vec![
1410                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1411            ],
1412            payload: MetricEventPayload::Count(1),
1413        }));
1414        assert!(log_events.contains(&MetricEvent {
1415            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1416            event_codes: vec![
1417                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac as u32
1418            ],
1419            payload: MetricEventPayload::Count(1),
1420        }));
1421        assert!(log_events.contains(&MetricEvent {
1422            metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1423            event_codes: vec![
1424                bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1425            ],
1426            payload: MetricEventPayload::Count(1),
1427        }));
1428
1429        // The second time, we don't expect to ask the peer again.
1430        let collect_future = peer.collect_capabilities();
1431        let mut collect_future = pin!(collect_future);
1432
1433        match exec.run_until_stalled(&mut collect_future) {
1434            Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1435            x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1436        };
1437    }
1438
1439    #[fuchsia::test]
1440    fn peer_collect_all_capabilities_success() {
1441        let mut exec = fasync::TestExecutor::new();
1442
1443        let (mut remote, _, cobalt_receiver, peer) =
1444            setup_test_peer(true, build_test_streams(), None);
1445        let p: ProfileDescriptor = ProfileDescriptor {
1446            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1447            major_version: Some(1),
1448            minor_version: Some(3),
1449            ..Default::default()
1450        };
1451        let _ = peer.set_descriptor(p);
1452
1453        let collect_future = peer.collect_capabilities();
1454        let mut collect_future = pin!(collect_future);
1455
1456        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1457
1458        // Expect a discover command.
1459        let received = recv_remote(&mut remote).unwrap();
1460        // Last half of header must be Single (0b00) and Command (0b00)
1461        assert_eq!(0x00, received[0] & 0xF);
1462        assert_eq!(0x01, received[1]); // 0x01 = Discover
1463
1464        let txlabel_raw = received[0] & 0xF0;
1465
1466        // Respond with a set of streams.
1467        let response: &[u8] = &[
1468            txlabel_raw << 4 | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1469            0x01,                              // Discover
1470            0x3E << 2 | 0x0 << 1,              // SEID (3E), Not In Use (0b0)
1471            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1472            0x01 << 2 | 0x1 << 1,              // SEID (1), In Use (0b1)
1473            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1474        ];
1475        expect_send(&mut exec, &mut remote, response.to_vec());
1476
1477        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1478
1479        // Expect a get all capabilities and respond.
1480        #[rustfmt::skip]
1481        let capabilities_rsp = &[
1482            // MediaTransport (Length of Service Capability = 0)
1483            0x01, 0x00,
1484            // Media Codec (LOSC = 2 + 4), Media Type Audio (0x00), Codec type (0x40), Codec specific 0xF09F9296
1485            0x07, 0x06, 0x00, 0x40, 0xF0, 0x9F, 0x92, 0x96,
1486            // Delay Reporting (LOSC = 0)
1487            0x08, 0x00
1488        ];
1489        expect_get_all_capabilities_and_respond(&mut exec, &mut remote, 0x3E, capabilities_rsp);
1490
1491        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1492
1493        // Expect a get all capabilities and respond.
1494        #[rustfmt::skip]
1495        let capabilities_rsp = &[
1496            // MediaTransport (Length of Service Capability = 0)
1497            0x01, 0x00,
1498            // Media Codec (LOSC = 2 + 2), Media Type Audio (0x00), Codec type (0x00), Codec specific 0xC0DE
1499            0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1500        ];
1501        expect_get_all_capabilities_and_respond(&mut exec, &mut remote, 0x01, capabilities_rsp);
1502
1503        match exec.run_until_stalled(&mut collect_future) {
1504            Poll::Pending => panic!("collect capabilities should be complete"),
1505            Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1506            Poll::Ready(Ok(endpoints)) => {
1507                let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1508                let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1509                for stream in endpoints {
1510                    if stream.local_id() == &first_seid {
1511                        let expected_caps = vec![
1512                            ServiceCapability::MediaTransport,
1513                            ServiceCapability::MediaCodec {
1514                                media_type: avdtp::MediaType::Audio,
1515                                codec_type: avdtp::MediaCodecType::new(0x40),
1516                                codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1517                            },
1518                            ServiceCapability::DelayReporting,
1519                        ];
1520                        assert_eq!(&expected_caps, stream.capabilities());
1521                    } else if stream.local_id() == &second_seid {
1522                        let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1523                        assert_eq!(Some(&expected_codec_type), stream.codec_type());
1524                    } else {
1525                        panic!("Unexpected endpoint in the streams collected");
1526                    }
1527                }
1528            }
1529        }
1530
1531        // Collect reported cobalt logs.
1532        let mut recv = cobalt_receiver.expect("should have receiver");
1533        let mut log_events = Vec::new();
1534        while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1535            log_events.push(respond_to_metrics_req_for_test(req));
1536        }
1537
1538        // Should have sent two metric events for codec and two for capability.
1539        assert_eq!(4, log_events.len());
1540        assert!(log_events.contains(&MetricEvent {
1541            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1542            event_codes: vec![
1543                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown as u32
1544            ],
1545            payload: MetricEventPayload::Count(1),
1546        }));
1547        assert!(log_events.contains(&MetricEvent {
1548            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1549            event_codes: vec![
1550                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1551            ],
1552            payload: MetricEventPayload::Count(1),
1553        }));
1554        assert!(log_events.contains(&MetricEvent {
1555            metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1556            event_codes: vec![
1557                bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1558            ],
1559            payload: MetricEventPayload::Count(1),
1560        }));
1561        assert!(log_events.contains(&MetricEvent {
1562            metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1563            event_codes: vec![
1564                bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport as u32
1565            ],
1566            payload: MetricEventPayload::Count(1),
1567        }));
1568
1569        // The second time, we don't expect to ask the peer again.
1570        let collect_future = peer.collect_capabilities();
1571        let mut collect_future = pin!(collect_future);
1572
1573        match exec.run_until_stalled(&mut collect_future) {
1574            Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1575            x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1576        };
1577    }
1578
1579    #[fuchsia::test]
1580    fn peer_collect_capabilities_discovery_fails() {
1581        let mut exec = fasync::TestExecutor::new();
1582
1583        let (mut remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1584
1585        let collect_future = peer.collect_capabilities();
1586        let mut collect_future = pin!(collect_future);
1587
1588        // Shouldn't finish yet.
1589        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1590
1591        // Expect a discover command.
1592        let received = recv_remote(&mut remote).unwrap();
1593        // Last half of header must be Single (0b00) and Command (0b00)
1594        assert_eq!(0x00, received[0] & 0xF);
1595        assert_eq!(0x01, received[1]); // 0x01 = Discover
1596
1597        let txlabel_raw = received[0] & 0xF0;
1598
1599        // Respond with an error.
1600        let response: &[u8] = &[
1601            txlabel_raw | 0x0 << 2 | 0x3, // txlabel (same), Single (0b00), Response Reject (0b11)
1602            0x01,                         // Discover
1603            0x31,                         // BAD_STATE
1604        ];
1605        expect_send(&mut exec, &mut remote, response.to_vec());
1606
1607        // Should be done with an error.
1608        // Should finish!
1609        match exec.run_until_stalled(&mut collect_future) {
1610            Poll::Pending => panic!("Should be ready after discovery failure"),
1611            Poll::Ready(Ok(x)) => panic!("Should be an error but returned {x:?}"),
1612            Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
1613                assert_eq!(Some(Ok(avdtp::ErrorCode::BadState)), e.error_code());
1614            }
1615            Poll::Ready(Err(e)) => panic!("Should have been a RemoteRejected was was {e:?}"),
1616        }
1617    }
1618
1619    #[fuchsia::test]
1620    fn peer_collect_capabilities_get_capability_fails() {
1621        let mut exec = fasync::TestExecutor::new();
1622
1623        let (mut remote, _, _, peer) = setup_test_peer(true, build_test_streams(), None);
1624
1625        let collect_future = peer.collect_capabilities();
1626        let mut collect_future = pin!(collect_future);
1627
1628        // Shouldn't finish yet.
1629        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1630
1631        // Expect a discover command.
1632        let received = recv_remote(&mut remote).unwrap();
1633        // Last half of header must be Single (0b00) and Command (0b00)
1634        assert_eq!(0x00, received[0] & 0xF);
1635        assert_eq!(0x01, received[1]); // 0x01 = Discover
1636
1637        let txlabel_raw = received[0] & 0xF0;
1638
1639        // Respond with a set of streams.
1640        let response: &[u8] = &[
1641            txlabel_raw << 4 | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1642            0x01,                              // Discover
1643            0x3E << 2 | 0x0 << 1,              // SEID (3E), Not In Use (0b0)
1644            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1645            0x01 << 2 | 0x1 << 1,              // SEID (1), In Use (0b1)
1646            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1647        ];
1648        expect_send(&mut exec, &mut remote, response.to_vec());
1649
1650        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1651
1652        // Expect a get capabilities request
1653        let expected_seid = 0x3E;
1654        let received = recv_remote(&mut remote).unwrap();
1655        // Last half of header must be Single (0b00) and Command (0b00)
1656        assert_eq!(0x00, received[0] & 0xF);
1657        assert_eq!(0x02, received[1]); // 0x02 = Get Capabilities
1658        assert_eq!(expected_seid << 2, received[2]);
1659
1660        let txlabel_raw = received[0] & 0xF0;
1661
1662        let response: &[u8] = &[
1663            txlabel_raw | 0x0 << 2 | 0x3, // txlabel (same), Single (0b00), Response Reject (0b11)
1664            0x02,                         // Get Capabilities
1665            0x12,                         // BAD_ACP_SEID
1666        ];
1667        expect_send(&mut exec, &mut remote, response.to_vec());
1668
1669        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1670
1671        // Expect a get capabilities request (skipped the last one)
1672        let expected_seid = 0x01;
1673        let received = recv_remote(&mut remote).unwrap();
1674        // Last half of header must be Single (0b00) and Command (0b00)
1675        assert_eq!(0x00, received[0] & 0xF);
1676        assert_eq!(0x02, received[1]); // 0x02 = Get Capabilities
1677        assert_eq!(expected_seid << 2, received[2]);
1678
1679        let txlabel_raw = received[0] & 0xF0;
1680
1681        let response: &[u8] = &[
1682            txlabel_raw | 0x0 << 2 | 0x3, // txlabel (same), Single (0b00), Response Reject (0b11)
1683            0x02,                         // Get Capabilities
1684            0x12,                         // BAD_ACP_SEID
1685        ];
1686        expect_send(&mut exec, &mut remote, response.to_vec());
1687
1688        // Should be done without an error, but with no streams.
1689        match exec.run_until_stalled(&mut collect_future) {
1690            Poll::Pending => panic!("Should be ready after discovery failure"),
1691            Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1692            Poll::Ready(Ok(map)) => assert_eq!(0, map.len()),
1693        }
1694    }
1695
1696    fn receive_simple_accept(exec: &mut fasync::TestExecutor, remote: &mut Channel, signal_id: u8) {
1697        let received = recv_remote(remote).expect("expected a packet");
1698        // Last half of header must be Single (0b00) and Command (0b00)
1699        assert_eq!(0x00, received[0] & 0xF);
1700        assert_eq!(signal_id, received[1]);
1701
1702        let txlabel_raw = received[0] & 0xF0;
1703
1704        let response: &[u8] = &[
1705            txlabel_raw | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1706            signal_id,
1707        ];
1708        expect_send(exec, remote, response.to_vec());
1709    }
1710
1711    #[fuchsia::test]
1712    fn peer_stream_start_success() {
1713        let mut exec = fasync::TestExecutor::new();
1714
1715        let (mut remote, mut profile_request_stream, _, peer) =
1716            setup_test_peer(false, build_test_streams(), None);
1717
1718        let remote_seid = 2_u8.try_into().unwrap();
1719
1720        let codec_params = ServiceCapability::MediaCodec {
1721            media_type: avdtp::MediaType::Audio,
1722            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1723            codec_extra: vec![0x11, 0x45, 51, 51],
1724        };
1725
1726        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1727        let mut start_future = pin!(start_future);
1728
1729        match exec.run_until_stalled(&mut start_future) {
1730            Poll::Pending => {}
1731            x => panic!("Expected pending, but got {x:?}"),
1732        };
1733
1734        receive_simple_accept(&mut exec, &mut remote, 0x03); // Set Configuration
1735
1736        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1737
1738        receive_simple_accept(&mut exec, &mut remote, 0x06); // Open
1739
1740        match exec.run_until_stalled(&mut start_future) {
1741            Poll::Pending => {}
1742            Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1743            Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1744        };
1745
1746        // Should connect the media channel after open.
1747        let (_, transport) = Channel::create();
1748
1749        let request = exec.run_until_stalled(&mut profile_request_stream.next());
1750        match request {
1751            Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
1752                assert_eq!(PeerId(1), peer_id.into());
1753                assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
1754                let channel = transport.try_into().unwrap();
1755                responder.send(Ok(channel)).expect("responder sends");
1756            }
1757            x => panic!("Should have sent a open l2cap request, but got {:?}", x),
1758        };
1759
1760        match exec.run_until_stalled(&mut start_future) {
1761            Poll::Pending => {}
1762            Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1763            Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1764        };
1765
1766        receive_simple_accept(&mut exec, &mut remote, 0x07); // Start
1767
1768        // Should return the media stream (which should be connected)
1769        // Should be done without an error, but with no streams.
1770        match exec.run_until_stalled(&mut start_future) {
1771            Poll::Pending => panic!("Should be ready after start succeeds"),
1772            Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1773            // TODO: confirm the stream is usable
1774            Poll::Ready(Ok(())) => {
1775                assert!(peer.is_streaming_now());
1776            }
1777        }
1778    }
1779
1780    #[fuchsia::test]
1781    fn peer_stream_start_picks_correct_direction() {
1782        let mut exec = fasync::TestExecutor::new();
1783
1784        let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1785        let remote = avdtp::Peer::new(remote);
1786        let mut remote_events = remote.take_request_stream();
1787
1788        // Respond as if we have a single SBC Source Stream
1789        fn remote_handle_request(req: avdtp::Request) {
1790            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1791            let res = match req {
1792                avdtp::Request::Discover { responder } => {
1793                    let infos = [avdtp::StreamInformation::new(
1794                        expected_stream_id,
1795                        false,
1796                        avdtp::MediaType::Audio,
1797                        avdtp::EndpointType::Source,
1798                    )];
1799                    responder.send(&infos)
1800                }
1801                avdtp::Request::GetAllCapabilities { stream_id, responder }
1802                | avdtp::Request::GetCapabilities { stream_id, responder } => {
1803                    assert_eq!(expected_stream_id, stream_id);
1804                    let caps = vec![
1805                        ServiceCapability::MediaTransport,
1806                        ServiceCapability::MediaCodec {
1807                            media_type: avdtp::MediaType::Audio,
1808                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1809                            codec_extra: vec![0x11, 0x45, 51, 250],
1810                        },
1811                    ];
1812                    responder.send(&caps[..])
1813                }
1814                avdtp::Request::Open { responder, stream_id } => {
1815                    assert_eq!(expected_stream_id, stream_id);
1816                    responder.send()
1817                }
1818                avdtp::Request::SetConfiguration {
1819                    responder,
1820                    local_stream_id,
1821                    remote_stream_id,
1822                    ..
1823                } => {
1824                    assert_eq!(local_stream_id, expected_stream_id);
1825                    // This is the "sink" local stream id.
1826                    assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1827                    responder.send()
1828                }
1829                x => panic!("Unexpected request: {:?}", x),
1830            };
1831            res.expect("should be able to respond");
1832        }
1833
1834        // Need to discover the remote streams first, or the stream start will not work.
1835        let collect_capabilities_fut = peer.collect_capabilities();
1836        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1837
1838        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1839
1840        let request = exec.run_singlethreaded(&mut remote_events.next());
1841        remote_handle_request(request.expect("should have a discovery request").unwrap());
1842
1843        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1844        let request = exec.run_singlethreaded(&mut remote_events.next());
1845        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1846
1847        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1848
1849        // Try to start the stream.  It should continue to configure and connect.
1850        let remote_seid = 4_u8.try_into().unwrap();
1851
1852        let codec_params = ServiceCapability::MediaCodec {
1853            media_type: avdtp::MediaType::Audio,
1854            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1855            codec_extra: vec![0x11, 0x45, 51, 51],
1856        };
1857        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1858        let mut start_future = pin!(start_future);
1859
1860        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1861        let request = exec.run_singlethreaded(&mut remote_events.next());
1862        remote_handle_request(request.expect("should have a set_capabilities request").unwrap());
1863
1864        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1865        let request = exec.run_singlethreaded(&mut remote_events.next());
1866        remote_handle_request(request.expect("should have an open request").unwrap());
1867    }
1868
1869    #[fuchsia::test]
1870    fn peer_stream_start_strips_unsupported_local_capabilities() {
1871        let mut exec = fasync::TestExecutor::new();
1872
1873        let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1874        let remote = avdtp::Peer::new(remote);
1875        let mut remote_events = remote.take_request_stream();
1876
1877        // Respond as if we have a single SBC Source Stream
1878        fn remote_handle_request(req: avdtp::Request) {
1879            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1880            let res = match req {
1881                avdtp::Request::Discover { responder } => {
1882                    let infos = [avdtp::StreamInformation::new(
1883                        expected_stream_id,
1884                        false,
1885                        avdtp::MediaType::Audio,
1886                        avdtp::EndpointType::Source,
1887                    )];
1888                    responder.send(&infos)
1889                }
1890                avdtp::Request::GetAllCapabilities { stream_id, responder }
1891                | avdtp::Request::GetCapabilities { stream_id, responder } => {
1892                    assert_eq!(expected_stream_id, stream_id);
1893                    let caps = vec![
1894                        ServiceCapability::MediaTransport,
1895                        // We don't have a local delay-reporting, so this shouldn't be requested.
1896                        ServiceCapability::DelayReporting,
1897                        ServiceCapability::MediaCodec {
1898                            media_type: avdtp::MediaType::Audio,
1899                            codec_type: avdtp::MediaCodecType::AUDIO_AAC,
1900                            codec_extra: vec![128, 0, 132, 134, 0, 0],
1901                        },
1902                    ];
1903                    responder.send(&caps[..])
1904                }
1905                avdtp::Request::Open { responder, stream_id } => {
1906                    assert_eq!(expected_stream_id, stream_id);
1907                    responder.send()
1908                }
1909                avdtp::Request::SetConfiguration {
1910                    responder,
1911                    local_stream_id,
1912                    remote_stream_id,
1913                    capabilities,
1914                } => {
1915                    assert_eq!(local_stream_id, expected_stream_id);
1916                    // This is the "sink" local stream id.
1917                    assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1918                    // Make sure we didn't request a DelayReport since the local Sink doesn't
1919                    // support it.
1920                    assert!(!capabilities.contains(&ServiceCapability::DelayReporting));
1921                    responder.send()
1922                }
1923                x => panic!("Unexpected request: {:?}", x),
1924            };
1925            res.expect("should be able to respond");
1926        }
1927
1928        // Need to discover the remote streams first, or the stream start will not work.
1929        let collect_capabilities_fut = peer.collect_capabilities();
1930        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1931
1932        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1933
1934        let request = exec.run_singlethreaded(&mut remote_events.next());
1935        remote_handle_request(request.expect("should have a discovery request").unwrap());
1936
1937        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1938        let request = exec.run_singlethreaded(&mut remote_events.next());
1939        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1940
1941        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1942
1943        // Try to start the stream.  It should continue to configure and connect.
1944        let remote_seid = 4_u8.try_into().unwrap();
1945
1946        let codec_params = ServiceCapability::MediaCodec {
1947            media_type: avdtp::MediaType::Audio,
1948            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1949            codec_extra: vec![0x11, 0x45, 51, 51],
1950        };
1951        let start_future =
1952            peer.stream_start(remote_seid, vec![codec_params, ServiceCapability::DelayReporting]);
1953        let mut start_future = pin!(start_future);
1954
1955        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1956        let request = exec.run_singlethreaded(&mut remote_events.next());
1957        remote_handle_request(request.expect("should have a set_configuration request").unwrap());
1958
1959        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1960        let request = exec.run_singlethreaded(&mut remote_events.next());
1961        remote_handle_request(request.expect("should have an open request").unwrap());
1962    }
1963
1964    #[fuchsia::test]
1965    fn peer_stream_start_orders_local_capabilities() {
1966        let mut exec = fasync::TestExecutor::new();
1967
1968        let (remote, _, _, peer) = setup_test_peer(false, build_test_streams_delayable(), None);
1969        let remote = avdtp::Peer::new(remote);
1970        let mut remote_events = remote.take_request_stream();
1971
1972        // Respond as if we have a single SBC Source Stream
1973        fn remote_handle_request(req: avdtp::Request) {
1974            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1975            let res = match req {
1976                avdtp::Request::Discover { responder } => {
1977                    let infos = [avdtp::StreamInformation::new(
1978                        expected_stream_id,
1979                        false,
1980                        avdtp::MediaType::Audio,
1981                        avdtp::EndpointType::Source,
1982                    )];
1983                    responder.send(&infos)
1984                }
1985                avdtp::Request::GetAllCapabilities { stream_id, responder }
1986                | avdtp::Request::GetCapabilities { stream_id, responder } => {
1987                    assert_eq!(expected_stream_id, stream_id);
1988                    let caps = &[
1989                        ServiceCapability::MediaTransport,
1990                        ServiceCapability::MediaCodec {
1991                            media_type: avdtp::MediaType::Audio,
1992                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1993                            codec_extra: vec![0x11, 0x45, 51, 250],
1994                        },
1995                        ServiceCapability::DelayReporting,
1996                    ];
1997                    responder.send(caps)
1998                }
1999                avdtp::Request::Open { responder, stream_id } => {
2000                    assert_eq!(expected_stream_id, stream_id);
2001                    responder.send()
2002                }
2003                avdtp::Request::SetConfiguration {
2004                    responder,
2005                    local_stream_id,
2006                    remote_stream_id,
2007                    capabilities,
2008                } => {
2009                    assert_eq!(local_stream_id, expected_stream_id);
2010                    // This is the "sink" local stream id.
2011                    assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
2012                    // The capabilities should be in order.
2013                    let mut capabilities_ordered = capabilities.clone();
2014                    capabilities_ordered.sort_by_key(ServiceCapability::category);
2015                    assert_eq!(capabilities, capabilities_ordered);
2016                    responder.send()
2017                }
2018                x => panic!("Unexpected request: {:?}", x),
2019            };
2020            res.expect("should be able to respond");
2021        }
2022
2023        // Need to discover the remote streams first, or the stream start will not work.
2024        let collect_capabilities_fut = peer.collect_capabilities();
2025        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2026
2027        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2028
2029        let request = exec.run_singlethreaded(&mut remote_events.next());
2030        remote_handle_request(request.expect("should have a discovery request").unwrap());
2031
2032        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2033        let request = exec.run_singlethreaded(&mut remote_events.next());
2034        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2035
2036        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2037
2038        // Try to start the stream.  It should continue to configure and connect.
2039        let remote_seid = 4_u8.try_into().unwrap();
2040
2041        let codec_params = ServiceCapability::MediaCodec {
2042            media_type: avdtp::MediaType::Audio,
2043            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2044            codec_extra: vec![0x11, 0x45, 51, 51],
2045        };
2046        let start_future = peer.stream_start(
2047            remote_seid,
2048            vec![
2049                ServiceCapability::MediaTransport,
2050                ServiceCapability::DelayReporting,
2051                codec_params,
2052            ],
2053        );
2054        let mut start_future = pin!(start_future);
2055
2056        assert!(exec.run_until_stalled(&mut start_future).is_pending());
2057        let request = exec.run_singlethreaded(&mut remote_events.next());
2058        remote_handle_request(request.expect("should have a set_configuration request").unwrap());
2059
2060        assert!(exec.run_until_stalled(&mut start_future).is_pending());
2061        let request = exec.run_singlethreaded(&mut remote_events.next());
2062        remote_handle_request(request.expect("should have an open request").unwrap());
2063    }
2064
2065    /// Tests that A2DP streaming does not start if the streaming permit is revoked during streaming
2066    /// setup.
2067    #[fuchsia::test]
2068    fn peer_stream_start_permit_revoked() {
2069        let mut exec = fasync::TestExecutor::new();
2070
2071        let test_permits = Permits::new(1);
2072        let (mut remote, mut profile_request_stream, _, peer) =
2073            setup_test_peer(false, build_test_streams(), Some(test_permits.clone()));
2074
2075        let remote_seid = 2_u8.try_into().unwrap();
2076
2077        let codec_params = ServiceCapability::MediaCodec {
2078            media_type: avdtp::MediaType::Audio,
2079            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2080            codec_extra: vec![0x11, 0x45, 51, 51],
2081        };
2082
2083        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2084        let mut start_future = pin!(start_future);
2085
2086        let _ = exec
2087            .run_until_stalled(&mut start_future)
2088            .expect_pending("waiting for set config response");
2089        receive_simple_accept(&mut exec, &mut remote, 0x03); // Set Configuration
2090        exec.run_until_stalled(&mut start_future).expect_pending("waiting for open response");
2091        receive_simple_accept(&mut exec, &mut remote, 0x06); // Open
2092        exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
2093        assert!(!peer.is_streaming_now());
2094
2095        // Should connect the media channel after open.
2096        let (_, transport) = Channel::create();
2097
2098        let request = exec.run_until_stalled(&mut profile_request_stream.next());
2099        match request {
2100            Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
2101                assert_eq!(PeerId(1), peer_id.into());
2102                assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
2103                let channel = transport.try_into().unwrap();
2104                responder.send(Ok(channel)).expect("responder sends");
2105            }
2106            x => panic!("Should have sent a open l2cap request, but got {:?}", x),
2107        };
2108
2109        exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
2110        assert!(!peer.is_streaming_now());
2111
2112        // Before peer responds to start, the permit gets taken.
2113        let seized_permits = test_permits.seize();
2114        assert_eq!(seized_permits.len(), 1);
2115        receive_simple_accept(&mut exec, &mut remote, 0x07); // Start
2116
2117        // Streaming should not locally begin because there is no available permit. The Start
2118        // response is handled gracefully.
2119        exec.run_until_stalled(&mut start_future)
2120            .expect_pending("waiting to send outgoing suspend");
2121        assert!(!peer.is_streaming_now());
2122        // We should issue an outgoing suspend request to synchronize state with the remote peer.
2123        receive_simple_accept(&mut exec, &mut remote, 0x09); // Suspend
2124
2125        // The start future should resolve without Error, and A2DP should not have started
2126        // streaming.
2127        let () = exec
2128            .run_until_stalled(&mut start_future)
2129            .expect("start finished")
2130            .expect("suspended stream is ok");
2131        assert!(!peer.is_streaming_now());
2132    }
2133
2134    #[fuchsia::test]
2135    fn peer_stream_start_fails_wrong_direction() {
2136        let mut exec = fasync::TestExecutor::new();
2137
2138        // Setup peers with only one Source Stream.
2139        let mut streams = Streams::default();
2140        let source = Stream::build(
2141            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2142            TestMediaTaskBuilder::new().builder(),
2143        );
2144        streams.insert(source);
2145
2146        let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2147        let remote = avdtp::Peer::new(remote);
2148        let mut remote_events = remote.take_request_stream();
2149
2150        // Respond as if we have a single SBC Source Stream
2151        fn remote_handle_request(req: avdtp::Request) {
2152            let expected_stream_id: StreamEndpointId = 2_u8.try_into().unwrap();
2153            let res = match req {
2154                avdtp::Request::Discover { responder } => {
2155                    let infos = [avdtp::StreamInformation::new(
2156                        expected_stream_id,
2157                        false,
2158                        avdtp::MediaType::Audio,
2159                        avdtp::EndpointType::Source,
2160                    )];
2161                    responder.send(&infos)
2162                }
2163                avdtp::Request::GetAllCapabilities { stream_id, responder }
2164                | avdtp::Request::GetCapabilities { stream_id, responder } => {
2165                    assert_eq!(expected_stream_id, stream_id);
2166                    let caps = vec![
2167                        ServiceCapability::MediaTransport,
2168                        ServiceCapability::MediaCodec {
2169                            media_type: avdtp::MediaType::Audio,
2170                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2171                            codec_extra: vec![0x11, 0x45, 51, 250],
2172                        },
2173                    ];
2174                    responder.send(&caps[..])
2175                }
2176                avdtp::Request::Open { responder, .. } => responder.send(),
2177                avdtp::Request::SetConfiguration { responder, .. } => responder.send(),
2178                x => panic!("Unexpected request: {:?}", x),
2179            };
2180            res.expect("should be able to respond");
2181        }
2182
2183        // Need to discover the remote streams first, or the stream start will always work.
2184        let collect_capabilities_fut = peer.collect_capabilities();
2185        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2186
2187        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2188
2189        let request = exec.run_singlethreaded(&mut remote_events.next());
2190        remote_handle_request(request.expect("should have a discovery request").unwrap());
2191
2192        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2193        let request = exec.run_singlethreaded(&mut remote_events.next());
2194        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2195
2196        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2197
2198        // Try to start the stream.  It should fail with OutOfRange because we can't connect a Source to a Source.
2199        let remote_seid = 2_u8.try_into().unwrap();
2200
2201        let codec_params = ServiceCapability::MediaCodec {
2202            media_type: avdtp::MediaType::Audio,
2203            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2204            codec_extra: vec![0x11, 0x45, 51, 51],
2205        };
2206        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2207        let mut start_future = pin!(start_future);
2208
2209        match exec.run_until_stalled(&mut start_future) {
2210            Poll::Ready(Err(avdtp::Error::OutOfRange)) => {}
2211            x => panic!("Expected a ready OutOfRange error but got {:?}", x),
2212        };
2213    }
2214
2215    #[fuchsia::test]
2216    fn peer_stream_start_fails_to_connect() {
2217        let mut exec = fasync::TestExecutor::new();
2218
2219        let (mut remote, mut profile_request_stream, _, peer) =
2220            setup_test_peer(false, build_test_streams(), None);
2221
2222        let remote_seid = 2_u8.try_into().unwrap();
2223
2224        let codec_params = ServiceCapability::MediaCodec {
2225            media_type: avdtp::MediaType::Audio,
2226            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2227            codec_extra: vec![0x11, 0x45, 51, 51],
2228        };
2229
2230        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2231        let mut start_future = pin!(start_future);
2232
2233        match exec.run_until_stalled(&mut start_future) {
2234            Poll::Pending => {}
2235            x => panic!("was expecting pending but got {x:?}"),
2236        };
2237
2238        receive_simple_accept(&mut exec, &mut remote, 0x03); // Set Configuration
2239
2240        assert!(exec.run_until_stalled(&mut start_future).is_pending());
2241
2242        receive_simple_accept(&mut exec, &mut remote, 0x06); // Open
2243
2244        match exec.run_until_stalled(&mut start_future) {
2245            Poll::Pending => {}
2246            Poll::Ready(x) => panic!("Expected to be pending but {x:?}"),
2247        };
2248
2249        // Should connect the media channel after open.
2250        let request = exec.run_until_stalled(&mut profile_request_stream.next());
2251        match request {
2252            Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, responder, .. }))) => {
2253                assert_eq!(PeerId(1), peer_id.into());
2254                responder.send(Err(ErrorCode::Failed)).expect("responder sends");
2255            }
2256            x => panic!("Should have sent a open l2cap request, but got {:?}", x),
2257        };
2258
2259        // Should return an error.
2260        // Should be done without an error, but with no streams.
2261        match exec.run_until_stalled(&mut start_future) {
2262            Poll::Pending => panic!("Should be ready after start fails"),
2263            Poll::Ready(Ok(_stream)) => panic!("Shouldn't have succeeded stream here"),
2264            Poll::Ready(Err(_)) => {}
2265        }
2266    }
2267
2268    /// Test that the delay reports get acknowledged and they are sent to cobalt.
2269    #[fuchsia::test]
2270    async fn peer_delay_report() {
2271        let (remote, _profile_requests, cobalt_recv, peer) =
2272            setup_test_peer(true, build_test_streams(), None);
2273        let remote_peer = avdtp::Peer::new(remote);
2274        let mut remote_events = remote_peer.take_request_stream();
2275
2276        // Respond as if we have a single SBC Sink Stream
2277        async fn remote_handle_request(req: avdtp::Request, peer: &avdtp::Peer) {
2278            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
2279            // "peer" in this case is the test code Peer stream
2280            let expected_peer_stream_id: StreamEndpointId = 1_u8.try_into().unwrap();
2281            use avdtp::Request::*;
2282            match req {
2283                Discover { responder } => {
2284                    let infos = [avdtp::StreamInformation::new(
2285                        expected_stream_id,
2286                        false,
2287                        avdtp::MediaType::Audio,
2288                        avdtp::EndpointType::Sink,
2289                    )];
2290                    responder.send(&infos).expect("response should succeed");
2291                }
2292                GetAllCapabilities { stream_id, responder }
2293                | GetCapabilities { stream_id, responder } => {
2294                    assert_eq!(expected_stream_id, stream_id);
2295                    let caps = vec![
2296                        ServiceCapability::MediaTransport,
2297                        ServiceCapability::MediaCodec {
2298                            media_type: avdtp::MediaType::Audio,
2299                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2300                            codec_extra: vec![0x11, 0x45, 51, 250],
2301                        },
2302                    ];
2303                    responder.send(&caps[..]).expect("response should succeed");
2304                    // Sending a delayreport before the stream is configured is not allowed, it's a
2305                    // bad state.
2306                    assert!(peer.delay_report(&expected_peer_stream_id, 0xc0de).await.is_err());
2307                }
2308                Open { responder, stream_id } => {
2309                    // Configuration has happened but open not succeeded yet, send delay reports.
2310                    assert!(peer.delay_report(&expected_stream_id, 0xc0de).await.is_err());
2311                    // Send a delay report to the peer.
2312                    peer.delay_report(&expected_peer_stream_id, 0xc0de)
2313                        .await
2314                        .expect("should get acked correctly");
2315                    assert_eq!(expected_stream_id, stream_id);
2316                    responder.send().expect("response should succeed");
2317                }
2318                SetConfiguration { responder, local_stream_id, remote_stream_id, .. } => {
2319                    assert_eq!(local_stream_id, expected_stream_id);
2320                    assert_eq!(remote_stream_id, expected_peer_stream_id);
2321                    responder.send().expect("should send back response without issue");
2322                }
2323                x => panic!("Unexpected request: {:?}", x),
2324            };
2325        }
2326
2327        let collect_fut = pin!(peer.collect_capabilities());
2328
2329        // Discover then a GetCapabilities.
2330        let Either::Left((request, collect_fut)) =
2331            futures::future::select(remote_events.next(), collect_fut).await
2332        else {
2333            panic!("Collect future shouldn't finish first");
2334        };
2335        let collect_fut = pin!(collect_fut);
2336        remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
2337        let Either::Left((request, collect_fut)) =
2338            futures::future::select(remote_events.next(), collect_fut).await
2339        else {
2340            panic!("Collect future shouldn't finish first");
2341        };
2342        remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
2343
2344        // Collect future should be able to finish now.
2345        assert_eq!(1, collect_fut.await.expect("should get the remote endpoints back").len());
2346
2347        // Try to start the stream.  It should go through the normal motions,
2348        let remote_seid = 4_u8.try_into().unwrap();
2349
2350        let codec_params = ServiceCapability::MediaCodec {
2351            media_type: avdtp::MediaType::Audio,
2352            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2353            codec_extra: vec![0x11, 0x45, 51, 51],
2354        };
2355
2356        // We don't expect this task to finish before being dropped, since we never respond to the
2357        // request to open the transport channel.
2358        let _start_task = fasync::Task::spawn(async move {
2359            let _ = peer.stream_start(remote_seid, vec![codec_params]).await;
2360            panic!("stream start task finished");
2361        });
2362
2363        let request = remote_events.next().await.expect("should have set_config").unwrap();
2364        remote_handle_request(request, &remote_peer).await;
2365
2366        let request = remote_events.next().await.expect("should have open").unwrap();
2367        remote_handle_request(request, &remote_peer).await;
2368
2369        let mut cobalt = cobalt_recv.expect("should have receiver");
2370
2371        let mut got_ids = HashMap::new();
2372        let delay_metric_id = bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID;
2373        while got_ids.len() < 3 || *got_ids.get(&delay_metric_id).unwrap_or(&0) < 3 {
2374            let report = respond_to_metrics_req_for_test(cobalt.next().await.unwrap().unwrap());
2375            let _ = got_ids.entry(report.metric_id).and_modify(|x| *x += 1).or_insert(1);
2376            // All the delay reports should report the same value correctly.
2377            if report.metric_id == delay_metric_id {
2378                assert_eq!(MetricEventPayload::IntegerValue(0xc0de * 100000), report.payload);
2379            }
2380        }
2381        assert!(got_ids.contains_key(&bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID));
2382        assert!(got_ids.contains_key(&bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID));
2383        assert!(got_ids.contains_key(&delay_metric_id));
2384        // There should have been three reports.
2385        // We report the delay amount even if it fails to work.
2386        assert_eq!(got_ids.get(&delay_metric_id).cloned(), Some(3));
2387    }
2388
2389    fn sbc_capabilities() -> Vec<ServiceCapability> {
2390        let sbc_codec_info = SbcCodecInfo::new(
2391            SbcSamplingFrequency::FREQ48000HZ,
2392            SbcChannelMode::JOINT_STEREO,
2393            SbcBlockCount::SIXTEEN,
2394            SbcSubBands::EIGHT,
2395            SbcAllocation::LOUDNESS,
2396            /* min_bpv= */ 53,
2397            /* max_bpv= */ 53,
2398        )
2399        .expect("sbc codec info");
2400
2401        vec![avdtp::ServiceCapability::MediaTransport, sbc_codec_info.into()]
2402    }
2403
2404    /// Test that the remote end can configure and start a stream.
2405    #[fuchsia::test]
2406    fn peer_as_acceptor() {
2407        let mut exec = fasync::TestExecutor::new();
2408
2409        let mut streams = Streams::default();
2410        let mut test_builder = TestMediaTaskBuilder::new();
2411        streams.insert(Stream::build(
2412            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2413            test_builder.builder(),
2414        ));
2415
2416        let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2417        let remote_peer = avdtp::Peer::new(remote);
2418
2419        let discover_fut = remote_peer.discover();
2420        let mut discover_fut = pin!(discover_fut);
2421
2422        let expected = vec![make_sbc_endpoint(1, avdtp::EndpointType::Source).information()];
2423        match exec.run_until_stalled(&mut discover_fut) {
2424            Poll::Ready(Ok(res)) => assert_eq!(res, expected),
2425            x => panic!("Expected discovery to complete and got {:?}", x),
2426        };
2427
2428        let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2429        let unknown_endpoint_id = 2_u8.try_into().expect("should be able to get sbc endpointid");
2430
2431        let get_caps_fut = remote_peer.get_capabilities(&sbc_endpoint_id);
2432        let mut get_caps_fut = pin!(get_caps_fut);
2433
2434        match exec.run_until_stalled(&mut get_caps_fut) {
2435            // There are two caps (mediatransport, mediacodec) in the sbc endpoint.
2436            Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2437            x => panic!("Get capabilities should be ready but got {:?}", x),
2438        };
2439
2440        let get_caps_fut = remote_peer.get_capabilities(&unknown_endpoint_id);
2441        let mut get_caps_fut = pin!(get_caps_fut);
2442
2443        match exec.run_until_stalled(&mut get_caps_fut) {
2444            Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2445                assert_eq!(Some(Ok(avdtp::ErrorCode::BadAcpSeid)), e.error_code())
2446            }
2447            x => panic!("Get capabilities should be a ready error but got {:?}", x),
2448        };
2449
2450        let get_caps_fut = remote_peer.get_all_capabilities(&sbc_endpoint_id);
2451        let mut get_caps_fut = pin!(get_caps_fut);
2452
2453        match exec.run_until_stalled(&mut get_caps_fut) {
2454            // There are two caps (mediatransport, mediacodec) in the sbc endpoint.
2455            Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2456            x => panic!("Get capabilities should be ready but got {:?}", x),
2457        };
2458
2459        let sbc_caps = sbc_capabilities();
2460        let set_config_fut =
2461            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2462        let mut set_config_fut = pin!(set_config_fut);
2463
2464        match exec.run_until_stalled(&mut set_config_fut) {
2465            Poll::Ready(Ok(())) => {}
2466            x => panic!("Set capabilities should be ready but got {:?}", x),
2467        };
2468
2469        let open_fut = remote_peer.open(&sbc_endpoint_id);
2470        let mut open_fut = pin!(open_fut);
2471        match exec.run_until_stalled(&mut open_fut) {
2472            Poll::Ready(Ok(())) => {}
2473            x => panic!("Open should be ready but got {:?}", x),
2474        };
2475
2476        // Establish a media transport stream
2477        let (_remote_transport, transport) = Channel::create();
2478
2479        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2480
2481        let stream_ids = vec![sbc_endpoint_id.clone()];
2482        let start_fut = remote_peer.start(&stream_ids);
2483        let mut start_fut = pin!(start_fut);
2484        match exec.run_until_stalled(&mut start_fut) {
2485            Poll::Ready(Ok(())) => {}
2486            x => panic!("Start should be ready but got {:?}", x),
2487        };
2488
2489        // The task should be created locally and started.
2490        let media_task = test_builder.expect_task();
2491        assert!(media_task.is_started());
2492
2493        let suspend_fut = remote_peer.suspend(&stream_ids);
2494        let mut suspend_fut = pin!(suspend_fut);
2495        match exec.run_until_stalled(&mut suspend_fut) {
2496            Poll::Ready(Ok(())) => {}
2497            x => panic!("Start should be ready but got {:?}", x),
2498        };
2499
2500        // Should have stopped the media task on suspend.
2501        assert!(!media_task.is_started());
2502    }
2503
2504    #[fuchsia::test]
2505    fn peer_set_config_reject_first() {
2506        let mut exec = fasync::TestExecutor::new();
2507
2508        let mut streams = Streams::default();
2509        let test_builder = TestMediaTaskBuilder::new();
2510        streams.insert(Stream::build(
2511            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2512            test_builder.builder(),
2513        ));
2514
2515        let (remote, _requests, _, _peer) = setup_test_peer(false, streams, None);
2516        let remote_peer = avdtp::Peer::new(remote);
2517
2518        let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2519
2520        let wrong_freq_sbc = &[SbcCodecInfo::new(
2521            SbcSamplingFrequency::FREQ44100HZ, // 44.1 is not supported by the caps from above.
2522            SbcChannelMode::JOINT_STEREO,
2523            SbcBlockCount::SIXTEEN,
2524            SbcSubBands::EIGHT,
2525            SbcAllocation::LOUDNESS,
2526            /* min_bpv= */ 53,
2527            /* max_bpv= */ 53,
2528        )
2529        .expect("sbc codec info")
2530        .into()];
2531
2532        let set_config_fut =
2533            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, wrong_freq_sbc);
2534        let mut set_config_fut = pin!(set_config_fut);
2535
2536        match exec.run_until_stalled(&mut set_config_fut) {
2537            Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2538                assert!(e.service_category().is_some())
2539            }
2540            x => panic!("Set capabilities should have been rejected but got {:?}", x),
2541        };
2542
2543        let sbc_caps = sbc_capabilities();
2544        let set_config_fut =
2545            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2546        let mut set_config_fut = pin!(set_config_fut);
2547
2548        match exec.run_until_stalled(&mut set_config_fut) {
2549            Poll::Ready(Ok(())) => {}
2550            x => panic!("Set capabilities should be ready but got {:?}", x),
2551        };
2552    }
2553
2554    #[fuchsia::test]
2555    fn peer_starts_waiting_streams() {
2556        let mut exec = fasync::TestExecutor::new_with_fake_time();
2557        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(5_000_000_000));
2558
2559        let mut streams = Streams::default();
2560        let mut test_builder = TestMediaTaskBuilder::new();
2561        streams.insert(Stream::build(
2562            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2563            test_builder.builder(),
2564        ));
2565
2566        let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2567        let remote_peer = avdtp::Peer::new(remote);
2568
2569        let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2570
2571        let sbc_caps = sbc_capabilities();
2572        let set_config_fut =
2573            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2574        let mut set_config_fut = pin!(set_config_fut);
2575
2576        match exec.run_until_stalled(&mut set_config_fut) {
2577            Poll::Ready(Ok(())) => {}
2578            x => panic!("Set capabilities should be ready but got {:?}", x),
2579        };
2580
2581        let open_fut = remote_peer.open(&sbc_endpoint_id);
2582        let mut open_fut = pin!(open_fut);
2583        match exec.run_until_stalled(&mut open_fut) {
2584            Poll::Ready(Ok(())) => {}
2585            x => panic!("Open should be ready but got {:?}", x),
2586        };
2587
2588        // Establish a media transport stream
2589        let (_remote_transport, transport) = Channel::create();
2590        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2591
2592        // The remote end should get a start request after the timeout.
2593        let mut remote_requests = remote_peer.take_request_stream();
2594        let next_remote_request_fut = remote_requests.next();
2595        let mut next_remote_request_fut = pin!(next_remote_request_fut);
2596
2597        // Nothing should happen immediately.
2598        assert!(exec.run_until_stalled(&mut next_remote_request_fut).is_pending());
2599
2600        // After the timeout has passed..
2601        exec.set_fake_time(zx::MonotonicDuration::from_seconds(3).after_now());
2602        let _ = exec.wake_expired_timers();
2603
2604        let stream_ids = match exec.run_until_stalled(&mut next_remote_request_fut) {
2605            Poll::Ready(Some(Ok(avdtp::Request::Start { responder, stream_ids }))) => {
2606                responder.send().unwrap();
2607                stream_ids
2608            }
2609            x => panic!("Expected to receive a start request for the stream, got {:?}", x),
2610        };
2611
2612        // We should start the media task, so the task should be created locally
2613        let media_task =
2614            exec.run_until_stalled(&mut test_builder.next_task()).expect("ready").unwrap();
2615        assert!(media_task.is_started());
2616
2617        // Remote peer should still be able to suspend the stream.
2618        let suspend_fut = remote_peer.suspend(&stream_ids);
2619        let mut suspend_fut = pin!(suspend_fut);
2620        match exec.run_until_stalled(&mut suspend_fut) {
2621            Poll::Ready(Ok(())) => {}
2622            x => panic!("Suspend should be ready but got {:?}", x),
2623        };
2624
2625        // Should have stopped the media task on suspend.
2626        assert!(!media_task.is_started());
2627    }
2628
2629    #[fuchsia::test]
2630    fn needs_permit_to_start_streams() {
2631        let mut exec = fasync::TestExecutor::new();
2632
2633        let mut streams = Streams::default();
2634        let mut test_builder = TestMediaTaskBuilder::new();
2635        streams.insert(Stream::build(
2636            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2637            test_builder.builder(),
2638        ));
2639        streams.insert(Stream::build(
2640            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2641            test_builder.builder(),
2642        ));
2643        let mut next_task_fut = test_builder.next_task();
2644
2645        let permits = Permits::new(1);
2646        let taken_permit = permits.get().expect("permit taken");
2647        let (remote, _profile_request_stream, _, peer) =
2648            setup_test_peer(false, streams, Some(permits.clone()));
2649        let remote_peer = avdtp::Peer::new(remote);
2650
2651        let sbc_endpoint_id = 1_u8.try_into().unwrap();
2652
2653        let sbc_caps = sbc_capabilities();
2654        let mut set_config_fut =
2655            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2656
2657        match exec.run_until_stalled(&mut set_config_fut) {
2658            Poll::Ready(Ok(())) => {}
2659            x => panic!("Set capabilities should be ready but got {:?}", x),
2660        };
2661
2662        let mut open_fut = remote_peer.open(&sbc_endpoint_id);
2663        match exec.run_until_stalled(&mut open_fut) {
2664            Poll::Ready(Ok(())) => {}
2665            x => panic!("Open should be ready but got {:?}", x),
2666        };
2667
2668        // Establish a media transport stream
2669        let (_remote_transport, transport) = Channel::create();
2670        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2671
2672        // Do the same, but for the OTHER stream.
2673        let sbc_endpoint_two = 2_u8.try_into().unwrap();
2674
2675        let mut set_config_fut =
2676            remote_peer.set_configuration(&sbc_endpoint_two, &sbc_endpoint_two, &sbc_caps);
2677
2678        match exec.run_until_stalled(&mut set_config_fut) {
2679            Poll::Ready(Ok(())) => {}
2680            x => panic!("Set capabilities should be ready but got {:?}", x),
2681        };
2682
2683        let mut open_fut = remote_peer.open(&sbc_endpoint_two);
2684        match exec.run_until_stalled(&mut open_fut) {
2685            Poll::Ready(Ok(())) => {}
2686            x => panic!("Open should be ready but got {:?}", x),
2687        };
2688
2689        // Establish a media transport stream
2690        let (_remote_transport_two, transport_two) = Channel::create();
2691        assert_eq!(Some(()), peer.receive_channel(transport_two).ok());
2692
2693        // Remote peer should still be able to try to start the stream, and we will say yes, but
2694        // that last seid looks wonky.
2695        let unknown_endpoint_id: StreamEndpointId = 9_u8.try_into().unwrap();
2696        let stream_ids = [sbc_endpoint_id.clone(), unknown_endpoint_id.clone()];
2697        let mut start_fut = remote_peer.start(&stream_ids);
2698        match exec.run_until_stalled(&mut start_fut) {
2699            Poll::Ready(Err(avdtp::Error::RemoteRejected(rejection))) => {
2700                assert_eq!(avdtp::ErrorCode::BadAcpSeid, rejection.error_code().unwrap().unwrap());
2701                assert_eq!(unknown_endpoint_id, rejection.stream_id().unwrap());
2702            }
2703            x => panic!("Start should be ready but got {:?}", x),
2704        };
2705
2706        // We can't get a permit (none are available) so we suspend the one we didn't error on.
2707        let mut remote_requests = remote_peer.take_request_stream();
2708
2709        let suspended_stream_ids = match exec.run_singlethreaded(&mut remote_requests.next()) {
2710            Some(Ok(avdtp::Request::Suspend { responder, stream_ids })) => {
2711                responder.send().unwrap();
2712                stream_ids
2713            }
2714            x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2715        };
2716
2717        assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2718        assert_eq!(1, suspended_stream_ids.len());
2719
2720        // And we should have not tried to start a task.
2721        match exec.run_until_stalled(&mut next_task_fut) {
2722            Poll::Pending => {}
2723            x => panic!("Local task should not have been created at this point: {:?}", x),
2724        };
2725
2726        // No matter how many times they ask to start, we will still suspend (but not queue another
2727        // reservation for the same id)
2728        let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
2729        match exec.run_until_stalled(&mut start_fut) {
2730            Poll::Ready(Ok(())) => {}
2731            x => panic!("Start should be ready but got {:?}", x),
2732        }
2733
2734        let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2735            Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2736                responder.send().unwrap();
2737                stream_ids
2738            }
2739            x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2740        };
2741        assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2742
2743        // After a permit is available, should try to start the first endpoint that failed.
2744        drop(taken_permit);
2745
2746        match exec.run_singlethreaded(&mut remote_requests.next()) {
2747            Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2748                assert_eq!(stream_ids, &[sbc_endpoint_id.clone()]);
2749                responder.send().unwrap();
2750            }
2751            x => panic!("Expected start on permit available but got {x:?}"),
2752        };
2753
2754        // And we should start a task.
2755        let media_task = match exec.run_until_stalled(&mut next_task_fut) {
2756            Poll::Ready(Some(task)) => task,
2757            x => panic!("Local task should be created at this point: {:?}", x),
2758        };
2759
2760        assert!(media_task.is_started());
2761
2762        // If the remote asks to start another one, we still suspend it immediately.
2763        let mut start_fut = remote_peer.start(&[sbc_endpoint_two.clone()]);
2764        match exec.run_until_stalled(&mut start_fut) {
2765            Poll::Ready(Ok(())) => {}
2766            x => panic!("Start should be ready but got {:?}", x),
2767        }
2768
2769        let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2770            Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2771                responder.send().unwrap();
2772                stream_ids
2773            }
2774            x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2775        };
2776
2777        assert!(suspended_stream_ids.contains(&sbc_endpoint_two));
2778        assert_eq!(1, suspended_stream_ids.len());
2779
2780        // Once the first one is done, the second can start.
2781        let mut suspend_fut = remote_peer.suspend(&[sbc_endpoint_id.clone()]);
2782        match exec.run_until_stalled(&mut suspend_fut) {
2783            Poll::Ready(Ok(())) => {}
2784            x => panic!("Start should be ready but got {:?}", x),
2785        }
2786
2787        match exec.run_singlethreaded(&mut remote_requests.next()) {
2788            Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2789                assert_eq!(stream_ids, &[sbc_endpoint_two]);
2790                responder.send().unwrap();
2791            }
2792            x => panic!("Expected start on permit available but got {x:?}"),
2793        };
2794    }
2795
2796    fn start_sbc_stream(
2797        exec: &mut fasync::TestExecutor,
2798        media_test_builder: &mut TestMediaTaskBuilder,
2799        peer: &Peer,
2800        remote_peer: &avdtp::Peer,
2801        local_id: &StreamEndpointId,
2802        remote_id: &StreamEndpointId,
2803    ) -> TestMediaTask {
2804        let sbc_caps = sbc_capabilities();
2805        let set_config_fut = remote_peer.set_configuration(&local_id, &remote_id, &sbc_caps);
2806        let mut set_config_fut = pin!(set_config_fut);
2807
2808        match exec.run_until_stalled(&mut set_config_fut) {
2809            Poll::Ready(Ok(())) => {}
2810            x => panic!("Set capabilities should be ready but got {:?}", x),
2811        };
2812
2813        let open_fut = remote_peer.open(&local_id);
2814        let mut open_fut = pin!(open_fut);
2815        match exec.run_until_stalled(&mut open_fut) {
2816            Poll::Ready(Ok(())) => {}
2817            x => panic!("Open should be ready but got {:?}", x),
2818        };
2819
2820        // Establish a media transport stream
2821        let (_remote_transport, transport) = Channel::create();
2822        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2823
2824        // Remote peer should still be able to try to start the stream, and we will say yes.
2825        let stream_ids = [local_id.clone()];
2826        let start_fut = remote_peer.start(&stream_ids);
2827        let mut start_fut = pin!(start_fut);
2828        match exec.run_until_stalled(&mut start_fut) {
2829            Poll::Ready(Ok(())) => {}
2830            x => panic!("Start should be ready but got {:?}", x),
2831        };
2832
2833        // And we should start a media task.
2834        let media_task = media_test_builder.expect_task();
2835        assert!(media_task.is_started());
2836
2837        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
2838        media_task
2839    }
2840
2841    #[fuchsia::test]
2842    fn permits_can_be_revoked_and_reinstated_all() {
2843        let mut exec = fasync::TestExecutor::new();
2844
2845        let mut streams = Streams::default();
2846        let mut test_builder = TestMediaTaskBuilder::new();
2847        streams.insert(Stream::build(
2848            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2849            test_builder.builder(),
2850        ));
2851        let sbc_endpoint_id = 1_u8.try_into().unwrap();
2852        let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();
2853
2854        streams.insert(Stream::build(
2855            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2856            test_builder.builder(),
2857        ));
2858        let sbc2_endpoint_id = 2_u8.try_into().unwrap();
2859        let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();
2860
2861        let permits = Permits::new(2);
2862
2863        let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
2864        let remote_peer = avdtp::Peer::new(remote);
2865
2866        let one_media_task = start_sbc_stream(
2867            &mut exec,
2868            &mut test_builder,
2869            &peer,
2870            &remote_peer,
2871            &sbc_endpoint_id,
2872            &remote_sbc_endpoint_id,
2873        );
2874        let two_media_task = start_sbc_stream(
2875            &mut exec,
2876            &mut test_builder,
2877            &peer,
2878            &remote_peer,
2879            &sbc2_endpoint_id,
2880            &remote_sbc2_endpoint_id,
2881        );
2882
2883        // Someone comes along and revokes our permits.
2884        let taken_permits = permits.seize();
2885
2886        let remote_endpoints: HashSet<_> =
2887            [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();
2888
2889        // We should send a suspend to the other end, for both of them.
2890        let mut remote_requests = remote_peer.take_request_stream();
2891        let mut expected_suspends = remote_endpoints.clone();
2892        while !expected_suspends.is_empty() {
2893            match exec.run_until_stalled(&mut remote_requests.next()) {
2894                Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2895                    for stream_id in stream_ids {
2896                        assert!(expected_suspends.remove(&stream_id));
2897                    }
2898                    responder.send().expect("send response okay");
2899                }
2900                x => panic!("Expected suspension and got {:?}", x),
2901            }
2902        }
2903
2904        // And the media tasks should be stopped.
2905        assert!(!one_media_task.is_started());
2906        assert!(!two_media_task.is_started());
2907
2908        // After the permits are available again, we send a start, and start the media stream.
2909        drop(taken_permits);
2910
2911        let mut expected_starts = remote_endpoints.clone();
2912        while !expected_starts.is_empty() {
2913            match exec.run_singlethreaded(&mut remote_requests.next()) {
2914                Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2915                    for stream_id in stream_ids {
2916                        assert!(expected_starts.remove(&stream_id));
2917                    }
2918                    responder.send().expect("send response okay");
2919                }
2920                x => panic!("Expected start and got {:?}", x),
2921            }
2922        }
2923        // And we should start two media tasks.
2924
2925        let one_media_task = test_builder.expect_task();
2926        assert!(one_media_task.is_started());
2927        let two_media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
2928            Poll::Ready(Some(task)) => task,
2929            x => panic!("Expected another ready task but {x:?}"),
2930        };
2931        assert!(two_media_task.is_started());
2932    }
2933
2934    #[fuchsia::test]
2935    fn permits_can_be_revoked_one_at_a_time() {
2936        let mut exec = fasync::TestExecutor::new();
2937
2938        let mut streams = Streams::default();
2939        let mut test_builder = TestMediaTaskBuilder::new();
2940        streams.insert(Stream::build(
2941            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2942            test_builder.builder(),
2943        ));
2944        let sbc_endpoint_id = 1_u8.try_into().unwrap();
2945        let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();
2946
2947        streams.insert(Stream::build(
2948            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2949            test_builder.builder(),
2950        ));
2951        let sbc2_endpoint_id = 2_u8.try_into().unwrap();
2952        let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();
2953
2954        let permits = Permits::new(2);
2955
2956        let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
2957        let remote_peer = avdtp::Peer::new(remote);
2958
2959        let one_media_task = start_sbc_stream(
2960            &mut exec,
2961            &mut test_builder,
2962            &peer,
2963            &remote_peer,
2964            &sbc_endpoint_id,
2965            &remote_sbc_endpoint_id,
2966        );
2967        let two_media_task = start_sbc_stream(
2968            &mut exec,
2969            &mut test_builder,
2970            &peer,
2971            &remote_peer,
2972            &sbc2_endpoint_id,
2973            &remote_sbc2_endpoint_id,
2974        );
2975
2976        // Someone comes along and revokes one of our permits.
2977        let taken_permit = permits.take();
2978
2979        let remote_endpoints: HashSet<_> =
2980            [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();
2981
2982        // We should send a suspend to the other end, for both of them.
2983        let mut remote_requests = remote_peer.take_request_stream();
2984        let suspended_id = match exec.run_until_stalled(&mut remote_requests.next()) {
2985            Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2986                assert!(stream_ids.len() == 1);
2987                assert!(remote_endpoints.contains(&stream_ids[0]));
2988                responder.send().expect("send response okay");
2989                stream_ids[0].clone()
2990            }
2991            x => panic!("Expected suspension and got {:?}", x),
2992        };
2993
2994        // And the correct one of the media tasks should be stopped.
2995        if suspended_id == remote_sbc_endpoint_id {
2996            assert!(!one_media_task.is_started());
2997            assert!(two_media_task.is_started());
2998        } else {
2999            assert!(one_media_task.is_started());
3000            assert!(!two_media_task.is_started());
3001        }
3002
3003        // After the permits are available again, we send a start, and start the media stream.
3004        drop(taken_permit);
3005
3006        match exec.run_singlethreaded(&mut remote_requests.next()) {
3007            Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
3008                assert_eq!(stream_ids, &[suspended_id]);
3009                responder.send().expect("send response okay");
3010            }
3011            x => panic!("Expected start and got {:?}", x),
3012        }
3013        // And we should start another media task.
3014        let media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
3015            Poll::Ready(Some(task)) => task,
3016            x => panic!("Expected media task to start: {x:?}"),
3017        };
3018        assert!(media_task.is_started());
3019    }
3020
3021    // Scenario: when we are waiting for a suspend response from the peer after a permit was not
3022    // available, we try to start the peer (because a dwell has expired)
3023    #[fuchsia::test]
3024    fn permit_suspend_start_while_suspending() {
3025        let mut exec = fasync::TestExecutor::new();
3026
3027        let mut streams = Streams::default();
3028        let mut test_builder = TestMediaTaskBuilder::new();
3029        streams.insert(Stream::build(
3030            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
3031            test_builder.builder(),
3032        ));
3033        streams.insert(Stream::build(
3034            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
3035            test_builder.builder(),
3036        ));
3037        let mut next_task_fut = test_builder.next_task();
3038
3039        let permits = Permits::new(1);
3040        let (remote, _profile_request_stream, _, peer) =
3041            setup_test_peer(false, streams, Some(permits.clone()));
3042
3043        let remote_peer = avdtp::Peer::new(remote);
3044        let mut remote_requests = remote_peer.take_request_stream();
3045
3046        let sbc_endpoint_id = 1_u8.try_into().unwrap();
3047
3048        let sbc_caps = sbc_capabilities();
3049        let mut set_config_fut =
3050            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
3051
3052        match exec.run_until_stalled(&mut set_config_fut) {
3053            Poll::Ready(Ok(())) => {}
3054            x => panic!("Set capabilities should be ready but got {:?}", x),
3055        };
3056
3057        let mut open_fut = remote_peer.open(&sbc_endpoint_id);
3058        match exec.run_until_stalled(&mut open_fut) {
3059            Poll::Ready(Ok(())) => {}
3060            x => panic!("Open should be ready but got {:?}", x),
3061        };
3062
3063        // Establish a media transport stream
3064        let (_remote_transport, transport) = Channel::create();
3065        assert_eq!(Some(()), peer.receive_channel(transport).ok());
3066
3067        // At this point, we are dwelling, waiting for the peer to start the stream. Skip the timer.
3068        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
3069        let Some(_deadline) = exec.wake_next_timer() else {
3070            panic!("Expected a timer to be waiting to run");
3071        };
3072
3073        // We will try to start it ourselves, which will take the only permit and send a start.
3074        let start_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
3075            Some(Ok(avdtp::Request::Start { stream_ids, responder })) => {
3076                assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
3077                responder
3078            }
3079            x => panic!("Expected a Start request, got {x:?}"),
3080        };
3081
3082        assert!(permits.get().is_none());
3083
3084        // The peer doesn't notice. Instead try to start it from the peer side (bad timing)
3085        let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
3086
3087        // We get an OK, and then immediately a suspend request because there are no
3088        // permits available.
3089        match exec.run_singlethreaded(&mut start_fut) {
3090            Ok(()) => {}
3091            x => panic!("Expected OK response from start future but got {x:?}"),
3092        }
3093
3094        let suspend_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
3095            Some(Ok(avdtp::Request::Suspend { stream_ids, responder })) => {
3096                assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
3097                responder
3098            }
3099            x => panic!("Expected a suspend got {x:?}"),
3100        };
3101
3102        // At this point, the peer notices the start request and responds.
3103        start_responder.send().unwrap();
3104
3105        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
3106
3107        // Okay I guess..
3108        suspend_responder.send().unwrap();
3109
3110        // And we should start a task.
3111        let media_task = match exec.run_until_stalled(&mut next_task_fut) {
3112            Poll::Ready(Some(task)) => task,
3113            x => panic!("Local task should be created at this point: {:?}", x),
3114        };
3115
3116        assert!(media_task.is_started());
3117    }
3118
3119    /// Test that the version check method correctly differentiates between newer
3120    /// and older A2DP versions.
3121    #[fuchsia::test]
3122    fn version_check() {
3123        let p1: ProfileDescriptor = ProfileDescriptor {
3124            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3125            major_version: Some(1),
3126            minor_version: Some(3),
3127            ..Default::default()
3128        };
3129        assert_eq!(true, a2dp_version_check(p1));
3130
3131        let p1: ProfileDescriptor = ProfileDescriptor {
3132            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3133            major_version: Some(2),
3134            minor_version: Some(10),
3135            ..Default::default()
3136        };
3137        assert_eq!(true, a2dp_version_check(p1));
3138
3139        let p1: ProfileDescriptor = ProfileDescriptor {
3140            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3141            major_version: Some(1),
3142            minor_version: Some(0),
3143            ..Default::default()
3144        };
3145        assert_eq!(false, a2dp_version_check(p1));
3146
3147        let p1: ProfileDescriptor = ProfileDescriptor {
3148            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3149            major_version: None,
3150            minor_version: Some(9),
3151            ..Default::default()
3152        };
3153        assert_eq!(false, a2dp_version_check(p1));
3154
3155        let p1: ProfileDescriptor = ProfileDescriptor {
3156            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3157            major_version: Some(2),
3158            minor_version: Some(2),
3159            ..Default::default()
3160        };
3161        assert_eq!(true, a2dp_version_check(p1));
3162    }
3163}