bt_a2dp/peer/
mod.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::Context as _;
6use bt_avdtp::{
7    self as avdtp, MediaCodecType, ServiceCapability, ServiceCategory, StreamEndpoint,
8    StreamEndpointId,
9};
10use fidl_fuchsia_bluetooth::ChannelParameters;
11use fidl_fuchsia_bluetooth_bredr::{
12    ConnectParameters, L2capParameters, ProfileDescriptor, ProfileProxy, PSM_AVDTP,
13};
14use fuchsia_async::{self as fasync, DurationExt};
15use fuchsia_bluetooth::inspect::DebugExt;
16use fuchsia_bluetooth::types::{Channel, PeerId};
17use fuchsia_inspect as inspect;
18use fuchsia_inspect_derive::{AttachError, Inspect};
19use fuchsia_sync::Mutex;
20use futures::channel::mpsc;
21use futures::future::{BoxFuture, Either};
22use futures::stream::FuturesUnordered;
23use futures::task::{Context, Poll, Waker};
24use futures::{select, Future, FutureExt, StreamExt};
25use log::{debug, info, trace, warn};
26use std::collections::{BTreeMap, HashMap, HashSet};
27use std::pin::Pin;
28use std::sync::{Arc, Weak};
29
30/// For sending out-of-band commands over the A2DP peer.
31mod controller;
32pub use controller::ControllerPool;
33
34use crate::codec::MediaCodecConfig;
35use crate::permits::{Permit, Permits};
36use crate::stream::{Stream, Streams};
37
38/// A Peer represents an A2DP peer which may be connected to this device.
39/// Only one A2DP peer should exist for each Bluetooth peer.
40#[derive(Inspect)]
41pub struct Peer {
42    /// The id of the peer we are connected to.
43    id: PeerId,
44    /// Inner keeps track of the peer and the streams.
45    #[inspect(forward)]
46    inner: Arc<Mutex<PeerInner>>,
47    /// Profile Proxy to connect new transport channels
48    profile: ProfileProxy,
49    /// The profile descriptor for this peer, if it has been discovered.
50    descriptor: Mutex<Option<ProfileDescriptor>>,
51    /// Wakers that are to be woken when the peer disconnects.  If None, the peers have been woken
52    /// and this peer is disconnected.  Shared weakly with ClosedPeer future objects that complete
53    /// when the peer disconnects.
54    closed_wakers: Arc<Mutex<Option<Vec<Waker>>>>,
55    /// Used to report peer metrics to Cobalt.
56    metrics: bt_metrics::MetricsLogger,
57    /// A task waiting to start a stream if it hasn't been started yet.
58    start_stream_task: Mutex<Option<fasync::Task<avdtp::Result<()>>>>,
59}
60
61/// StreamPermits handles reserving and retrieving permits for streaming audio.
62/// Reservations are automatically retrieved for streams that are revoked, and when the
63/// reservation is completed, the permit is stored and a StreamPermit is sent so it can be started.
64#[derive(Clone)]
65struct StreamPermits {
66    permits: Permits,
67    open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
68    reserved_streams: Arc<Mutex<HashSet<StreamEndpointId>>>,
69    inner: Weak<Mutex<PeerInner>>,
70    peer_id: PeerId,
71    sender: mpsc::UnboundedSender<BoxFuture<'static, StreamPermit>>,
72}
73
74#[derive(Debug)]
75struct StreamPermit {
76    local_id: StreamEndpointId,
77    open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
78}
79
80impl StreamPermit {
81    fn local_id(&self) -> &StreamEndpointId {
82        &self.local_id
83    }
84
85    /// Returns true if a Permit is held for this stream endpoint.
86    fn is_held(&self) -> bool {
87        self.open_streams.lock().contains_key(&self.local_id)
88    }
89}
90
91impl Drop for StreamPermit {
92    fn drop(&mut self) {
93        let _ = self.open_streams.lock().remove(&self.local_id);
94    }
95}
96
97impl StreamPermits {
98    fn new(
99        inner: Weak<Mutex<PeerInner>>,
100        peer_id: PeerId,
101        permits: Permits,
102    ) -> (Self, mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>) {
103        let (sender, reservations_receiver) = futures::channel::mpsc::unbounded();
104        (
105            Self {
106                inner,
107                permits,
108                peer_id,
109                sender,
110                open_streams: Default::default(),
111                reserved_streams: Default::default(),
112            },
113            reservations_receiver,
114        )
115    }
116
117    fn label_for(&self, local_id: &StreamEndpointId) -> String {
118        format!("{} {}", self.peer_id, local_id)
119    }
120
121    /// Get a permit to stream on the stream with id `local_id`.
122    /// Returns Some() if there is a permit available.
123    fn get(&self, local_id: StreamEndpointId) -> Option<StreamPermit> {
124        let revoke_fn = self.make_revocation_fn(&local_id);
125        let Some(permit) = self.permits.get_revokable(revoke_fn) else {
126            info!("No permits available: {:?}", self.permits);
127            return None;
128        };
129        permit.relabel(self.label_for(&local_id));
130        if let Some(_) = self.open_streams.lock().insert(local_id.clone(), permit) {
131            warn!(id:% = self.peer_id; "Started stream {local_id:?} twice, dropping previous permit");
132        }
133        Some(StreamPermit { local_id, open_streams: self.open_streams.clone() })
134    }
135
136    /// Get a reservation that will resolve to a StreamPermit to start a stream with the id
137    /// `local_id`
138    fn setup_reservation_for(&self, local_id: StreamEndpointId) {
139        if !self.reserved_streams.lock().insert(local_id.clone()) {
140            // Already reserved.
141            return;
142        }
143        let restart_stream_available_fut = {
144            let self_revoke_fn = Self::make_revocation_fn(&self, &local_id);
145            let reservation = self.permits.reserve_revokable(self_revoke_fn);
146            let open_streams = self.open_streams.clone();
147            let reserved_streams = self.reserved_streams.clone();
148            let label = self.label_for(&local_id);
149            let local_id = local_id.clone();
150            async move {
151                let permit = reservation.await;
152                permit.relabel(label);
153                if open_streams.lock().insert(local_id.clone(), permit).is_some() {
154                    warn!("Reservation replaces acquired permit for {}", local_id.clone());
155                }
156                if !reserved_streams.lock().remove(&local_id) {
157                    warn!(local_id:%; "Unrecorded reservation resolved");
158                }
159                StreamPermit { local_id, open_streams }
160            }
161        };
162        if let Err(e) = self.sender.unbounded_send(restart_stream_available_fut.boxed()) {
163            warn!(id:% = self.peer_id, local_id:%, e:?; "Couldn't queue reservation to finish");
164        }
165    }
166
167    /// Revokes a permit that was previously delivered, suspending the local stream and signaling
168    /// the peer.
169    /// The permit must have been previously received through StreamPermits::get or a have been
170    /// restarted after being revoked, otherwise will panic.
171    fn revocation_fn(self, local_id: StreamEndpointId) -> Permit {
172        if let Ok(peer) = PeerInner::upgrade(self.inner.clone()) {
173            {
174                let mut lock = peer.lock();
175                match lock.suspend_local_stream(&local_id) {
176                    Ok(remote_id) => drop(lock.peer.suspend(&[remote_id])),
177                    Err(e) => warn!("Couldn't stop local stream {local_id:?}: {e:?}"),
178                }
179            }
180            self.setup_reservation_for(local_id.clone());
181        }
182        self.open_streams.lock().remove(&local_id).expect("permit revoked but don't have it")
183    }
184
185    fn make_revocation_fn(&self, local_id: &StreamEndpointId) -> impl FnOnce() -> Permit {
186        let local_id = local_id.clone();
187        let cloned = self.clone();
188        move || cloned.revocation_fn(local_id)
189    }
190}
191
192impl Peer {
193    /// Make a new Peer which is connected to the peer `id` using the AVDTP `peer`.
194    /// The `streams` are the local endpoints available to the peer.
195    /// `profile` will be used to initiate connections for Media Transport.
196    /// The `permits`, if provided, will acquire a permit before starting streams on this peer.
197    /// If `metrics` is included, metrics for codec availability will be reported.
198    /// This also starts a task on the executor to handle incoming events from the peer.
199    pub fn create(
200        id: PeerId,
201        peer: avdtp::Peer,
202        streams: Streams,
203        permits: Option<Permits>,
204        profile: ProfileProxy,
205        metrics: bt_metrics::MetricsLogger,
206    ) -> Self {
207        let inner = Arc::new(Mutex::new(PeerInner::new(peer, id, streams, metrics.clone())));
208        let reservations_receiver = if let Some(permits) = permits {
209            let (stream_permits, receiver) =
210                StreamPermits::new(Arc::downgrade(&inner), id, permits);
211            inner.lock().permits = Some(stream_permits);
212            receiver
213        } else {
214            let (_, receiver) = mpsc::unbounded();
215            receiver
216        };
217        let res = Self {
218            id,
219            inner,
220            profile,
221            descriptor: Mutex::new(None),
222            closed_wakers: Arc::new(Mutex::new(Some(Vec::new()))),
223            metrics,
224            start_stream_task: Mutex::new(None),
225        };
226        res.start_requests_task(reservations_receiver);
227        res
228    }
229
230    pub fn set_descriptor(&self, descriptor: ProfileDescriptor) -> Option<ProfileDescriptor> {
231        self.descriptor.lock().replace(descriptor)
232    }
233
234    /// How long to wait after a non-local establishment of a stream to start the stream.
235    /// Chosen to produce reasonably quick startup while allowing for peer start.
236    const STREAM_DWELL: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(500);
237
238    /// Receive a channel from the peer that was initiated remotely.
239    /// This function should be called whenever the peer associated with this opens an L2CAP channel.
240    /// If this completes opening a stream, streams that are suspended will be scheduled to start.
241    pub fn receive_channel(&self, channel: Channel) -> avdtp::Result<()> {
242        let mut lock = self.inner.lock();
243        if lock.receive_channel(channel)? {
244            let weak = Arc::downgrade(&self.inner);
245            let mut task_lock = self.start_stream_task.lock();
246            *task_lock = Some(fasync::Task::local(async move {
247                trace!("Dwelling to start remotely-opened stream..");
248                fasync::Timer::new(Self::STREAM_DWELL.after_now()).await;
249                PeerInner::start_opened(weak).await
250            }));
251        }
252        Ok(())
253    }
254
255    /// Return a handle to the AVDTP peer, to use as initiator of commands.
256    pub fn avdtp(&self) -> avdtp::Peer {
257        let lock = self.inner.lock();
258        lock.peer.clone()
259    }
260
261    /// Returns the stream endpoints discovered by this peer.
262    pub fn remote_endpoints(&self) -> Option<Vec<avdtp::StreamEndpoint>> {
263        self.inner.lock().remote_endpoints()
264    }
265
266    /// Perform Discovery and Collect Capabilities to enumerate the endpoints and capabilities of
267    /// the connected peer.
268    /// Returns a future which performs the work and resolves to a vector of peer stream endpoints.
269    pub fn collect_capabilities(
270        &self,
271    ) -> impl Future<Output = avdtp::Result<Vec<avdtp::StreamEndpoint>>> {
272        let avdtp = self.avdtp();
273        let get_all = self.descriptor.lock().clone().is_some_and(a2dp_version_check);
274        let inner = self.inner.clone();
275        let metrics = self.metrics.clone();
276        let peer_id = self.id;
277        async move {
278            if let Some(caps) = inner.lock().remote_endpoints() {
279                return Ok(caps);
280            }
281            trace!("Discovering peer streams..");
282            let infos = avdtp.discover().await?;
283            trace!("Discovered {} streams", infos.len());
284            let mut remote_streams = Vec::new();
285            for info in infos {
286                let capabilities = if get_all {
287                    avdtp.get_all_capabilities(info.id()).await
288                } else {
289                    avdtp.get_capabilities(info.id()).await
290                };
291                match capabilities {
292                    Ok(capabilities) => {
293                        trace!("Stream {:?}", info);
294                        for cap in &capabilities {
295                            trace!("  - {:?}", cap);
296                        }
297                        remote_streams.push(avdtp::StreamEndpoint::from_info(&info, capabilities));
298                    }
299                    Err(e) => {
300                        info!(peer_id:%; "Stream {} capabilities failed: {:?}, skipping", info.id(), e);
301                    }
302                };
303            }
304            inner.lock().set_remote_endpoints(&remote_streams);
305            Self::record_cobalt_metrics(metrics, &remote_streams);
306            Ok(remote_streams)
307        }
308    }
309
310    fn record_cobalt_metrics(metrics: bt_metrics::MetricsLogger, endpoints: &[StreamEndpoint]) {
311        let codec_metrics: HashSet<_> = endpoints
312            .iter()
313            .filter_map(|endpoint| {
314                endpoint.codec_type().map(|t| codectype_to_availability_metric(t) as u32)
315            })
316            .collect();
317        metrics
318            .log_occurrences(bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID, codec_metrics);
319
320        let cap_metrics: HashSet<_> = endpoints
321            .iter()
322            .flat_map(|endpoint| {
323                endpoint
324                    .capabilities()
325                    .iter()
326                    .filter_map(|t| capability_to_metric(t))
327                    .chain(std::iter::once(
328                        bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic,
329                    ))
330                    .map(|t| t as u32)
331            })
332            .collect();
333        metrics.log_occurrences(bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID, cap_metrics);
334    }
335
336    fn transport_channel_params() -> L2capParameters {
337        L2capParameters {
338            psm: Some(PSM_AVDTP),
339            parameters: Some(ChannelParameters {
340                max_rx_packet_size: Some(65535),
341                ..Default::default()
342            }),
343            ..Default::default()
344        }
345    }
346
347    /// Open and start a media transport stream, connecting a compatible local stream to the remote
348    /// stream `remote_id`, configuring it with the `capabilities` provided.
349    /// Returns a future which should be awaited on.
350    /// The future returns Ok(()) if successfully started, and an appropriate error otherwise.
351    pub fn stream_start(
352        &self,
353        remote_id: StreamEndpointId,
354        capabilities: Vec<ServiceCapability>,
355    ) -> impl Future<Output = avdtp::Result<()>> {
356        let peer = Arc::downgrade(&self.inner);
357        let peer_id = self.id.clone();
358        let avdtp = self.avdtp();
359        let profile = self.profile.clone();
360
361        async move {
362            let codec_params =
363                capabilities.iter().find(|x| x.is_codec()).ok_or(avdtp::Error::InvalidState)?;
364            let (local_id, local_capabilities) = {
365                let peer = PeerInner::upgrade(peer.clone())?;
366                let lock = peer.lock();
367                lock.find_compatible_local_capabilities(codec_params, &remote_id)?
368            };
369
370            let local_by_cat: HashMap<ServiceCategory, ServiceCapability> =
371                local_capabilities.into_iter().map(|i| (i.category(), i)).collect();
372
373            // Filter things out if they don't have a match in the local capabilities.
374            // Order them by the ServiceCategory ordinal - some noncompliant devices care about it.
375            let shared_capabilities: BTreeMap<ServiceCategory, ServiceCapability> = capabilities
376                .into_iter()
377                .filter_map(|cap| {
378                    let Some(local_cap) = local_by_cat.get(&cap.category()) else {
379                        return None;
380                    };
381                    if cap.category() == ServiceCategory::MediaCodec {
382                        let Ok(a) = MediaCodecConfig::try_from(&cap) else {
383                            return None;
384                        };
385                        let Ok(b) = MediaCodecConfig::try_from(local_cap) else {
386                            return None;
387                        };
388                        let Some(negotiated) = MediaCodecConfig::negotiate(&a, &b) else {
389                            return None;
390                        };
391                        Some((cap.category(), (&negotiated).into()))
392                    } else {
393                        Some((cap.category(), cap))
394                    }
395                })
396                .collect();
397            let shared_capabilities: Vec<_> = shared_capabilities.into_values().collect();
398
399            trace!("Starting stream {local_id} to remote {remote_id} with {shared_capabilities:?}");
400
401            avdtp.set_configuration(&remote_id, &local_id, &shared_capabilities).await?;
402            {
403                let strong = PeerInner::upgrade(peer.clone())?;
404                strong.lock().set_opening(&local_id, &remote_id, shared_capabilities)?;
405            }
406            avdtp.open(&remote_id).await?;
407
408            debug!(peer_id:%; "Connecting transport channel");
409            let channel = profile
410                .connect(
411                    &peer_id.into(),
412                    &ConnectParameters::L2cap(Self::transport_channel_params()),
413                )
414                .await
415                .context("FIDL error: {}")?
416                .or(Err(avdtp::Error::PeerDisconnected))?;
417            trace!(peer_id:%; "Connected transport channel, converting to local Channel");
418            let channel = match channel.try_into() {
419                Err(e) => {
420                    warn!(peer_id:%, e:?; "Couldn't connect media transport: no channel");
421                    return Err(avdtp::Error::PeerDisconnected);
422                }
423                Ok(c) => c,
424            };
425
426            trace!(peer_id:%; "Connected transport channel, passing to Peer..");
427
428            {
429                let strong = PeerInner::upgrade(peer.clone())?;
430                let _ = strong.lock().receive_channel(channel)?;
431            }
432            // Start streams immediately if the channel is locally initiated.
433            PeerInner::start_opened(peer).await
434        }
435    }
436
437    /// Query whether any streams are currently started or scheduled to start.
438    pub fn streaming_active(&self) -> bool {
439        self.inner.lock().is_streaming() || self.will_start_streaming()
440    }
441
442    /// Returns true if there are any streams that are currently started.
443    #[cfg(test)]
444    fn is_streaming_now(&self) -> bool {
445        self.inner.lock().is_streaming_now()
446    }
447
448    /// Polls the task scheduled to start streaming, returning true if the task is still scheduled
449    /// to start streaming.
450    fn will_start_streaming(&self) -> bool {
451        let mut task_lock = self.start_stream_task.lock();
452        if task_lock.is_none() {
453            return false;
454        }
455        // This is the only thing that can poll the start task, so it is okay to ignore the wakeup.
456        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
457        if let Poll::Pending = task_lock.as_mut().unwrap().poll_unpin(&mut cx) {
458            return true;
459        }
460        // Reset the task to None so that we don't try to re-poll it.
461        let _ = task_lock.take();
462        false
463    }
464
465    /// Suspend a media transport stream `local_id`.
466    /// It's possible that the stream is not active - a suspend will be attempted, but an
467    /// error from the command will be returned.
468    /// Returns the result of the suspend command.
469    pub fn stream_suspend(
470        &self,
471        local_id: StreamEndpointId,
472    ) -> impl Future<Output = avdtp::Result<()>> {
473        let peer = Arc::downgrade(&self.inner);
474        PeerInner::suspend(peer, local_id)
475    }
476
477    /// Start an asynchronous task to handle any requests from the AVDTP peer.
478    /// This task completes when the remote end closes the signaling connection.
479    fn start_requests_task(
480        &self,
481        mut reservations_receiver: mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>,
482    ) {
483        let lock = self.inner.lock();
484        let mut request_stream = lock.peer.take_request_stream().fuse();
485        let id = self.id.clone();
486        let peer = Arc::downgrade(&self.inner);
487        let mut stream_reservations = FuturesUnordered::new();
488        let disconnect_wakers = Arc::downgrade(&self.closed_wakers);
489        fuchsia_async::Task::local(async move {
490            loop {
491                select! {
492                    request = request_stream.next() => {
493                        match request {
494                            None => break,
495                            Some(Err(e)) => info!(peer_id:% = id, e:?; "Request stream error"),
496                            Some(Ok(request)) => match peer.upgrade() {
497                                None => return,
498                                Some(p) => {
499                                    let result_or_future = p.lock().handle_request(request);
500                                    let result = match result_or_future {
501                                        Either::Left(result) => result,
502                                        Either::Right(future) => future.await,
503                                    };
504                                    if let Err(e) = result {
505                                        warn!(peer_id:% = id, e:?; "Error handling request");
506                                    }
507                                }
508                            },
509                        }
510                    },
511                    reservation_fut = reservations_receiver.select_next_some() => {
512                        stream_reservations.push(reservation_fut)
513                    },
514                    permit = stream_reservations.select_next_some() => {
515                        if let Err(e) = PeerInner::start_permit(peer.clone(), permit).await {
516                            warn!(peer_id:% = id, e:?; "Couldn't start stream after unpause");
517                        }
518                    }
519                    complete => break,
520                }
521            }
522            info!(peer_id:% = id; "disconnected");
523            if let Some(wakers) = disconnect_wakers.upgrade() {
524                for waker in wakers.lock().take().unwrap_or_else(Vec::new) {
525                    waker.wake();
526                }
527            }
528        })
529        .detach();
530    }
531
532    /// Returns a future that will complete when the peer disconnects.
533    pub fn closed(&self) -> ClosedPeer {
534        ClosedPeer { inner: Arc::downgrade(&self.closed_wakers) }
535    }
536}
537
538/// Future which completes when the A2DP peer has closed the control conection.
539/// See `Peer::closed`
540#[must_use = "futures do nothing unless you `.await` or poll them"]
541pub struct ClosedPeer {
542    inner: Weak<Mutex<Option<Vec<Waker>>>>,
543}
544
545impl Future for ClosedPeer {
546    type Output = ();
547
548    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
549        match self.inner.upgrade() {
550            None => Poll::Ready(()),
551            Some(inner) => match inner.lock().as_mut() {
552                None => Poll::Ready(()),
553                Some(wakers) => {
554                    wakers.push(cx.waker().clone());
555                    Poll::Pending
556                }
557            },
558        }
559    }
560}
561
562/// Determines if Peer profile version is newer (>= 1.3) or older (< 1.3)
563fn a2dp_version_check(profile: ProfileDescriptor) -> bool {
564    let (Some(major), Some(minor)) = (profile.major_version, profile.minor_version) else {
565        return false;
566    };
567    (major == 1 && minor >= 3) || major > 1
568}
569
570/// Peer handles the communication with the AVDTP layer, and provides responses as appropriate
571/// based on the current state of local streams available.
572/// Each peer has its own set of local stream endpoints, and tracks a set of remote peer endpoints.
573struct PeerInner {
574    /// AVDTP peer communicating to this.
575    peer: avdtp::Peer,
576    /// The PeerId that this peer is representing
577    peer_id: PeerId,
578    /// Some(local_id) if an endpoint has been configured but hasn't finished opening.
579    /// Per AVDTP Sec 6.11 only up to one stream can be in this state.
580    opening: Option<StreamEndpointId>,
581    /// The local stream endpoint collection
582    local: Streams,
583    /// The permits that are available for this peer.
584    permits: Option<StreamPermits>,
585    /// Tasks watching for the end of a started stream. Key is the local stream id.
586    started: HashMap<StreamEndpointId, WatchedStream>,
587    /// The inspect node for this peer
588    inspect: fuchsia_inspect::Node,
589    /// The set of discovered remote endpoints. None until set.
590    remote_endpoints: Option<Vec<StreamEndpoint>>,
591    /// The inspect node representing the remote endpoints.
592    remote_inspect: fuchsia_inspect::Node,
593    /// Cobalt logger used to report peer metrics.
594    metrics: bt_metrics::MetricsLogger,
595}
596
597impl Inspect for &mut PeerInner {
598    // Set up the StreamEndpoint to update the state
599    // The MediaTask node will be created when the media task is started.
600    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
601        self.inspect = parent.create_child(name.as_ref());
602        self.inspect.record_string("id", self.peer_id.to_string());
603        self.local.iattach(&self.inspect, "local_streams")
604    }
605}
606
607impl PeerInner {
608    pub fn new(
609        peer: avdtp::Peer,
610        peer_id: PeerId,
611        local: Streams,
612        metrics: bt_metrics::MetricsLogger,
613    ) -> Self {
614        Self {
615            peer,
616            peer_id,
617            opening: None,
618            local,
619            permits: None,
620            started: HashMap::new(),
621            inspect: Default::default(),
622            remote_endpoints: None,
623            remote_inspect: Default::default(),
624            metrics,
625        }
626    }
627
628    /// Returns an endpoint from the local set or a BadAcpSeid error if it doesn't exist.
629    fn get_mut(&mut self, local_id: &StreamEndpointId) -> Result<&mut Stream, avdtp::ErrorCode> {
630        self.local.get_mut(&local_id).ok_or(avdtp::ErrorCode::BadAcpSeid)
631    }
632
633    fn set_remote_endpoints(&mut self, endpoints: &[StreamEndpoint]) {
634        self.remote_inspect = self.inspect.create_child("remote_endpoints");
635        for endpoint in endpoints {
636            self.remote_inspect.record_child(inspect::unique_name("remote_"), |node| {
637                node.record_string("endpoint_id", endpoint.local_id().debug());
638                node.record_string("capabilities", endpoint.capabilities().debug());
639                node.record_string("type", endpoint.endpoint_type().debug());
640            });
641        }
642        self.remote_endpoints = Some(endpoints.iter().map(StreamEndpoint::as_new).collect());
643    }
644
645    /// If the remote endpoints have been set, returns a copy of the endpoints.
646    fn remote_endpoints(&self) -> Option<Vec<StreamEndpoint>> {
647        self.remote_endpoints.as_ref().map(|v| v.iter().map(StreamEndpoint::as_new).collect())
648    }
649
650    /// If the remote endpoint with endpoint `id` exists, return a copy of the endpoint.
651    fn remote_endpoint(&self, id: &StreamEndpointId) -> Option<StreamEndpoint> {
652        self.remote_endpoints
653            .as_ref()
654            .and_then(|v| v.iter().find(|v| v.local_id() == id).map(StreamEndpoint::as_new))
655    }
656
657    /// Returns true if there is at least one stream that has started or is starting for this peer.
658    fn is_streaming(&self) -> bool {
659        self.is_streaming_now() || self.opening.is_some()
660    }
661
662    /// Returns true if there is at least one stream in the started state for this peer.
663    fn is_streaming_now(&self) -> bool {
664        self.local.streaming().next().is_some()
665    }
666
667    fn set_opening(
668        &mut self,
669        local_id: &StreamEndpointId,
670        remote_id: &StreamEndpointId,
671        capabilities: Vec<ServiceCapability>,
672    ) -> avdtp::Result<()> {
673        if self.opening.is_some() {
674            return Err(avdtp::Error::InvalidState);
675        }
676        let peer_id = self.peer_id;
677        let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
678        stream
679            .configure(&peer_id, &remote_id, capabilities)
680            .map_err(|(cat, c)| avdtp::Error::RequestInvalidExtra(c, (&cat).into()))?;
681        stream.endpoint_mut().establish().or(Err(avdtp::Error::InvalidState))?;
682        self.opening = Some(local_id.clone());
683        Ok(())
684    }
685
686    fn upgrade(weak: Weak<Mutex<Self>>) -> avdtp::Result<Arc<Mutex<Self>>> {
687        weak.upgrade().ok_or(avdtp::Error::PeerDisconnected)
688    }
689
690    /// Start the stream that is opening, completing the opened procedure.
691    async fn start_opened(weak: Weak<Mutex<Self>>) -> avdtp::Result<()> {
692        let (avdtp, stream_pairs) = {
693            let peer = Self::upgrade(weak.clone())?;
694            let peer = peer.lock();
695            let stream_pairs: Vec<(StreamEndpointId, StreamEndpointId)> = peer
696                .local
697                .open()
698                .filter_map(|stream| {
699                    let endpoint = stream.endpoint();
700                    endpoint.remote_id().map(|id| (endpoint.local_id().clone(), id.clone()))
701                })
702                .collect();
703            (peer.peer.clone(), stream_pairs)
704        };
705        for (local_id, remote_id) in stream_pairs {
706            let permit_result =
707                Self::upgrade(weak.clone())?.lock().get_permit_or_reserve(&local_id);
708            if let Ok(permit) = permit_result {
709                Self::initiated_start(avdtp.clone(), weak.clone(), permit, &local_id, &remote_id)
710                    .await?;
711            }
712        }
713        Ok(())
714    }
715
716    async fn start_permit(weak: Weak<Mutex<Self>>, permit: StreamPermit) -> avdtp::Result<()> {
717        let local_id = permit.local_id().clone();
718        let (avdtp, remote_id) = {
719            let peer = Self::upgrade(weak.clone())?;
720            let mut peer = peer.lock();
721            let remote_id = peer
722                .get_mut(&local_id)
723                .map_err(|e| avdtp::Error::RequestInvalid(e))?
724                .endpoint()
725                .remote_id()
726                .ok_or(avdtp::Error::InvalidState)?
727                .clone();
728            (peer.peer.clone(), remote_id)
729        };
730        Self::initiated_start(avdtp, weak, Some(permit), &local_id, &remote_id).await
731    }
732
733    /// Start a stream for a local reason.  Requires a Permit to start streaming for the local stream.
734    async fn initiated_start(
735        avdtp: avdtp::Peer,
736        weak: Weak<Mutex<Self>>,
737        permit: Option<StreamPermit>,
738        local_id: &StreamEndpointId,
739        remote_id: &StreamEndpointId,
740    ) -> avdtp::Result<()> {
741        trace!(permit:?, local_id:?, remote_id:?; "Making outgoing start request");
742        let to_start = &[remote_id.clone()];
743        avdtp.start(to_start).await?;
744        trace!("Start response received: {permit:?}");
745        let peer = Self::upgrade(weak.clone())?;
746        let (peer_id, start_result) = {
747            let mut peer = peer.lock();
748            (peer.peer_id, peer.start_local_stream(permit, &local_id))
749        };
750        if let Err(e) = start_result {
751            warn!(peer_id:%, local_id:%, remote_id:%, e:?; "Failed to start local stream, suspending");
752            avdtp.suspend(to_start).await?;
753        }
754        Ok(())
755    }
756
757    /// Suspend a stream locally, returning a future to get the result from the peer.
758    fn suspend(
759        weak: Weak<Mutex<Self>>,
760        local_id: StreamEndpointId,
761    ) -> impl Future<Output = avdtp::Result<()>> {
762        let res = (move || {
763            let peer = Self::upgrade(weak.clone())?;
764            let mut peer = peer.lock();
765            Ok((peer.peer.clone(), peer.suspend_local_stream(&local_id)?))
766        })();
767        let (avdtp, remote_id) = match res {
768            Err(e) => return futures::future::err(e).left_future(),
769            Ok(r) => r,
770        };
771        let to_suspend = &[remote_id];
772        avdtp.suspend(to_suspend).right_future()
773    }
774
775    /// Finds a stream in the local stream set which is compatible with the remote_id given the codec config.
776    /// Returns the local stream ID and capabilities if found, or OutOfRange if one could not be found.
777    pub fn find_compatible_local_capabilities(
778        &self,
779        codec_params: &ServiceCapability,
780        remote_id: &StreamEndpointId,
781    ) -> avdtp::Result<(StreamEndpointId, Vec<ServiceCapability>)> {
782        let config = codec_params.try_into()?;
783        let our_direction = self.remote_endpoint(remote_id).map(|e| e.endpoint_type().opposite());
784        debug!(codec_params:?, local:? = self.local; "Looking for compatible local stream");
785        self.local
786            .compatible(config)
787            .find_map(|s| {
788                let endpoint = s.endpoint();
789                if let Some(d) = our_direction {
790                    if &d != endpoint.endpoint_type() {
791                        return None;
792                    }
793                }
794                Some((endpoint.local_id().clone(), endpoint.capabilities().clone()))
795            })
796            .ok_or(avdtp::Error::OutOfRange)
797    }
798
799    /// Attempts to acquire a permit for streaming, if the permits are set.
800    /// Returns Ok if is is okay to stream, and Err if the permit was not available and a
801    /// reservation was made.
802    fn get_permit_or_reserve(
803        &self,
804        local_id: &StreamEndpointId,
805    ) -> Result<Option<StreamPermit>, ()> {
806        let Some(permits) = self.permits.as_ref() else {
807            return Ok(None);
808        };
809        if let Some(permit) = permits.get(local_id.clone()) {
810            return Ok(Some(permit));
811        }
812        info!(peer_id:% = self.peer_id, local_id:%; "No permit to start stream, adding a reservation");
813        permits.setup_reservation_for(local_id.clone());
814        Err(())
815    }
816
817    /// Starts the stream which is in the local Streams with `local_id`.
818    /// Requires a permit to stream.
819    fn start_local_stream(
820        &mut self,
821        permit: Option<StreamPermit>,
822        local_id: &StreamEndpointId,
823    ) -> avdtp::Result<()> {
824        let peer_id = self.peer_id;
825        let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
826        // The streaming permit can be revoked while stream setup is in progress. If so, return
827        // without starting the local stream.
828        if permit.as_ref().is_some_and(|p| !p.is_held()) {
829            return Err(avdtp::Error::Other(anyhow::format_err!(
830                "streaming permit revoked during setup"
831            )));
832        }
833
834        info!(peer_id:%, stream:?; "Starting");
835        let stream_finished = stream.start().map_err(|c| avdtp::Error::RequestInvalid(c))?;
836        // TODO(https://fxbug.dev/42147239): if streaming stops unexpectedly, send a suspend to match to peer
837        let watched_stream = WatchedStream::new(permit, stream_finished);
838        if self.started.insert(local_id.clone(), watched_stream).is_some() {
839            warn!(peer_id:%, local_id:%; "Stream that was already started");
840        }
841        Ok(())
842    }
843
844    /// Suspend a stream on the local side. Returns the remote StreamEndpointId if the stream was suspended,
845    /// or a RequestInvalid error eith the error code otherwise.
846    fn suspend_local_stream(
847        &mut self,
848        local_id: &StreamEndpointId,
849    ) -> avdtp::Result<StreamEndpointId> {
850        let peer_id = self.peer_id;
851        let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
852        let remote_id = stream.endpoint().remote_id().ok_or(avdtp::Error::InvalidState)?.clone();
853        info!(peer_id:%; "Suspend stream local {local_id} <-> {remote_id} remote");
854        stream.suspend().map_err(|c| avdtp::Error::RequestInvalid(c))?;
855        let _ = self.started.remove(local_id);
856        Ok(remote_id)
857    }
858
859    /// Provide a new established L2CAP channel to this remote peer.
860    /// This function should be called whenever the remote associated with this peer opens an
861    /// L2CAP channel after the first.
862    /// Returns true if this channel completed the opening sequence.
863    fn receive_channel(&mut self, channel: Channel) -> avdtp::Result<bool> {
864        let stream_id = self.opening.as_ref().cloned().ok_or(avdtp::Error::InvalidState)?;
865        let stream = self.get_mut(&stream_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
866        let done = !stream.endpoint_mut().receive_channel(channel)?;
867        if done {
868            self.opening = None;
869        }
870        info!(peer_id:% = self.peer_id, stream_id:%; "Transport connected");
871        Ok(done)
872    }
873
874    /// Handle a single request event from the avdtp peer.
875    fn handle_request(
876        &mut self,
877        request: avdtp::Request,
878    ) -> Either<avdtp::Result<()>, impl Future<Output = avdtp::Result<()>>> {
879        use avdtp::ErrorCode;
880        use avdtp::Request::*;
881        trace!("Handling {request:?} from peer..");
882        let immediate_result = 'result: {
883            match request {
884                Discover { responder } => responder.send(&self.local.information()),
885                GetCapabilities { responder, stream_id }
886                | GetAllCapabilities { responder, stream_id } => match self.local.get(&stream_id) {
887                    None => responder.reject(ErrorCode::BadAcpSeid),
888                    Some(stream) => responder.send(stream.endpoint().capabilities()),
889                },
890                Open { responder, stream_id } => {
891                    if self.opening.is_none() {
892                        break 'result responder.reject(ErrorCode::BadState);
893                    }
894                    let Ok(stream) = self.get_mut(&stream_id) else {
895                        break 'result responder.reject(ErrorCode::BadAcpSeid);
896                    };
897                    match stream.endpoint_mut().establish() {
898                        Ok(()) => responder.send(),
899                        Err(_) => responder.reject(ErrorCode::BadState),
900                    }
901                }
902                Close { responder, stream_id } => {
903                    let peer = self.peer.clone();
904                    let Ok(stream) = self.get_mut(&stream_id) else {
905                        break 'result responder.reject(ErrorCode::BadAcpSeid);
906                    };
907                    stream.release(responder, &peer)
908                }
909                SetConfiguration { responder, local_stream_id, remote_stream_id, capabilities } => {
910                    if self.opening.is_some() {
911                        break 'result responder.reject(ServiceCategory::None, ErrorCode::BadState);
912                    }
913                    let peer_id = self.peer_id;
914                    let Ok(stream) = self.get_mut(&local_stream_id) else {
915                        break 'result responder
916                            .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
917                    };
918                    match stream.configure(&peer_id, &remote_stream_id, capabilities) {
919                        Ok(_) => {
920                            self.opening = Some(local_stream_id);
921                            responder.send()
922                        }
923                        Err((category, code)) => responder.reject(category, code),
924                    }
925                }
926                GetConfiguration { stream_id, responder } => {
927                    let Ok(stream) = self.get_mut(&stream_id) else {
928                        break 'result responder.reject(ErrorCode::BadAcpSeid);
929                    };
930                    let Some(vec_capabilities) = stream.endpoint().get_configuration() else {
931                        break 'result responder.reject(ErrorCode::BadState);
932                    };
933                    responder.send(vec_capabilities.as_slice())
934                }
935                Reconfigure { responder, local_stream_id, capabilities } => {
936                    let Ok(stream) = self.get_mut(&local_stream_id) else {
937                        break 'result responder
938                            .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
939                    };
940                    match stream.reconfigure(capabilities) {
941                        Ok(_) => responder.send(),
942                        Err((cat, code)) => responder.reject(cat, code),
943                    }
944                }
945                Start { responder, stream_ids } => {
946                    let mut immediate_suspend = Vec::new();
947                    // Fail on the first failed endpoint, as per the AVDTP spec 8.13 Note 5
948                    let result = stream_ids.into_iter().try_for_each(|seid| {
949                        let Some(stream) = self.local.get_mut(&seid) else {
950                            return Err((seid, ErrorCode::BadAcpSeid));
951                        };
952                        let remote_id = stream.endpoint().remote_id().cloned();
953                        let Some(remote_id) = remote_id else {
954                            return Err((seid, ErrorCode::BadState));
955                        };
956                        let Ok(permit) = self.get_permit_or_reserve(&seid) else {
957                            // Happens when we cannot start because of permits.
958                            // Accept this one, then queue up for suspend.
959                            // We are already reserved for a permit.
960                            immediate_suspend.push(remote_id);
961                            return Ok(());
962                        };
963                        match self.start_local_stream(permit, &seid) {
964                            Ok(()) => Ok(()),
965                            Err(avdtp::Error::RequestInvalid(code)) => Err((seid, code)),
966                            Err(_) => Err((seid, ErrorCode::BadState)),
967                        }
968                    });
969                    let response_result = match result {
970                        Ok(()) => responder.send(),
971                        Err((seid, code)) => responder.reject(&seid, code),
972                    };
973                    {
974                        let peer = self.peer.clone();
975                        return Either::Right(async move {
976                            if !immediate_suspend.is_empty() {
977                                peer.suspend(immediate_suspend.as_slice()).await?;
978                            }
979                            response_result
980                        });
981                    }
982                }
983                Suspend { responder, stream_ids } => {
984                    for seid in stream_ids {
985                        match self.suspend_local_stream(&seid) {
986                            Ok(_remote_id) => {}
987                            Err(avdtp::Error::RequestInvalid(code)) => {
988                                break 'result responder.reject(&seid, code)
989                            }
990                            Err(_e) => break 'result responder.reject(&seid, ErrorCode::BadState),
991                        }
992                    }
993                    responder.send()
994                }
995                Abort { responder, stream_id } => {
996                    let Ok(stream) = self.get_mut(&stream_id) else {
997                        // No response is sent on an invalid ID for an Abort
998                        break 'result Ok(());
999                    };
1000                    stream.abort();
1001                    self.opening = self.opening.take().filter(|local_id| local_id != &stream_id);
1002                    responder.send()
1003                }
1004                DelayReport { responder, delay, stream_id } => {
1005                    // Delay is in 1/10 ms
1006                    let delay_ns = delay as u64 * 100000;
1007                    // Record delay to cobalt.
1008                    self.metrics.log_integer(
1009                        bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID,
1010                        delay_ns.try_into().unwrap_or(-1),
1011                        vec![],
1012                    );
1013                    // Report should only come after a stream is configured
1014                    let Some(stream) = self.local.get_mut(&stream_id) else {
1015                        break 'result responder.reject(avdtp::ErrorCode::BadAcpSeid);
1016                    };
1017                    let delay_str = format!("delay {}.{} ms", delay / 10, delay % 10);
1018                    let peer = self.peer_id;
1019                    match stream.set_delay(std::time::Duration::from_nanos(delay_ns)) {
1020                        Ok(()) => info!(peer:%, stream_id:%; "reported {delay_str}"),
1021                        Err(avdtp::ErrorCode::BadState) => {
1022                            info!(peer:%, stream_id:%; "bad state {delay_str}");
1023                            break 'result responder.reject(avdtp::ErrorCode::BadState);
1024                        }
1025                        Err(e) => info!(peer:%, stream_id:%, e:?; "failed {delay_str}"),
1026                    };
1027                    // Can't really respond with an Error
1028                    responder.send()
1029                }
1030            }
1031        };
1032        Either::Left(immediate_result)
1033    }
1034}
1035
1036/// A WatchedStream holds a task tracking a started stream and ensures actions are performed when
1037/// the stream media task finishes.
1038struct WatchedStream {
1039    _permit_task: fasync::Task<()>,
1040}
1041
1042impl WatchedStream {
1043    fn new(
1044        permit: Option<StreamPermit>,
1045        finish_fut: BoxFuture<'static, Result<(), anyhow::Error>>,
1046    ) -> Self {
1047        let permit_task = fasync::Task::spawn(async move {
1048            let _ = finish_fut.await;
1049            drop(permit);
1050        });
1051        Self { _permit_task: permit_task }
1052    }
1053}
1054
1055fn codectype_to_availability_metric(
1056    codec_type: &MediaCodecType,
1057) -> bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec {
1058    match codec_type {
1059        &MediaCodecType::AUDIO_SBC => {
1060            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc
1061        }
1062        &MediaCodecType::AUDIO_MPEG12 => {
1063            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Mpeg12
1064        }
1065        &MediaCodecType::AUDIO_AAC => {
1066            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Aac
1067        }
1068        &MediaCodecType::AUDIO_ATRAC => {
1069            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac
1070        }
1071        &MediaCodecType::AUDIO_NON_A2DP => {
1072            bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::VendorSpecific
1073        }
1074        _ => bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown,
1075    }
1076}
1077
1078fn capability_to_metric(
1079    cap: &ServiceCapability,
1080) -> Option<bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability> {
1081    match cap {
1082        ServiceCapability::DelayReporting => {
1083            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport)
1084        }
1085        ServiceCapability::Reporting => {
1086            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Reporting)
1087        }
1088        ServiceCapability::Recovery { .. } => {
1089            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Recovery)
1090        }
1091        ServiceCapability::ContentProtection { .. } => {
1092            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::ContentProtection)
1093        }
1094        ServiceCapability::HeaderCompression { .. } => {
1095            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::HeaderCompression)
1096        }
1097        ServiceCapability::Multiplexing { .. } => {
1098            Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Multiplexing)
1099        }
1100        // We ignore capabilities that we don't care to track.
1101        other => {
1102            trace!("untracked remote peer capability: {:?}", other);
1103            None
1104        }
1105    }
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110    use super::*;
1111
1112    use async_utils::PollExt;
1113    use bt_metrics::respond_to_metrics_req_for_test;
1114    use fidl::endpoints::create_proxy_and_stream;
1115    use fidl_fuchsia_bluetooth::ErrorCode;
1116    use fidl_fuchsia_bluetooth_bredr::{
1117        ProfileMarker, ProfileRequest, ProfileRequestStream, ServiceClassProfileIdentifier,
1118    };
1119    use fidl_fuchsia_metrics::{MetricEvent, MetricEventPayload};
1120    use futures::future::Either;
1121    use std::pin::pin;
1122
1123    use crate::media_task::tests::{TestMediaTask, TestMediaTaskBuilder};
1124    use crate::media_types::*;
1125    use crate::stream::tests::{make_sbc_endpoint, sbc_mediacodec_capability};
1126
1127    fn fake_metrics(
1128    ) -> (bt_metrics::MetricsLogger, fidl_fuchsia_metrics::MetricEventLoggerRequestStream) {
1129        let (c, s) = fidl::endpoints::create_proxy_and_stream::<
1130            fidl_fuchsia_metrics::MetricEventLoggerMarker,
1131        >();
1132        (bt_metrics::MetricsLogger::from_proxy(c), s)
1133    }
1134
1135    fn setup_avdtp_peer() -> (avdtp::Peer, Channel) {
1136        let (remote, signaling) = Channel::create();
1137        let peer = avdtp::Peer::new(signaling);
1138        (peer, remote)
1139    }
1140
1141    fn build_test_streams() -> Streams {
1142        let mut streams = Streams::default();
1143        let source = Stream::build(
1144            make_sbc_endpoint(1, avdtp::EndpointType::Source),
1145            TestMediaTaskBuilder::new_delayable().builder(),
1146        );
1147        streams.insert(source);
1148        let sink = Stream::build(
1149            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
1150            TestMediaTaskBuilder::new().builder(),
1151        );
1152        streams.insert(sink);
1153        streams
1154    }
1155
1156    fn build_test_streams_delayable() -> Streams {
1157        fn with_delay(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
1158            StreamEndpoint::new(
1159                seid,
1160                avdtp::MediaType::Audio,
1161                direction,
1162                vec![
1163                    avdtp::ServiceCapability::MediaTransport,
1164                    avdtp::ServiceCapability::DelayReporting,
1165                    sbc_mediacodec_capability(),
1166                ],
1167            )
1168            .expect("endpoint creation should succeed")
1169        }
1170        let mut streams = Streams::default();
1171        let source = Stream::build(
1172            with_delay(1, avdtp::EndpointType::Source),
1173            TestMediaTaskBuilder::new_delayable().builder(),
1174        );
1175        streams.insert(source);
1176        let sink = Stream::build(
1177            with_delay(2, avdtp::EndpointType::Sink),
1178            TestMediaTaskBuilder::new().builder(),
1179        );
1180        streams.insert(sink);
1181        streams
1182    }
1183
1184    pub(crate) fn recv_remote(remote: &Channel) -> Result<Vec<u8>, zx::Status> {
1185        remote.read_packet()
1186    }
1187
1188    /// Creates a Peer object, returning a channel connected ot the remote end, a
1189    /// ProfileRequestStream connected to the profile_proxy, and the Peer object.
1190    fn setup_test_peer(
1191        use_cobalt: bool,
1192        streams: Streams,
1193        permits: Option<Permits>,
1194    ) -> (
1195        Channel,
1196        ProfileRequestStream,
1197        Option<fidl_fuchsia_metrics::MetricEventLoggerRequestStream>,
1198        Peer,
1199    ) {
1200        let (avdtp, remote) = setup_avdtp_peer();
1201        let (metrics_logger, cobalt_receiver) = if use_cobalt {
1202            let (l, r) = fake_metrics();
1203            (l, Some(r))
1204        } else {
1205            (bt_metrics::MetricsLogger::default(), None)
1206        };
1207        let (profile_proxy, requests) = create_proxy_and_stream::<ProfileMarker>();
1208        let peer = Peer::create(PeerId(1), avdtp, streams, permits, profile_proxy, metrics_logger);
1209
1210        (remote, requests, cobalt_receiver, peer)
1211    }
1212
1213    fn expect_get_capabilities_and_respond(
1214        remote: &Channel,
1215        expected_seid: u8,
1216        response_capabilities: &[u8],
1217    ) {
1218        let received = recv_remote(&remote).unwrap();
1219        // Last half of header must be Single (0b00) and Command (0b00)
1220        assert_eq!(0x00, received[0] & 0xF);
1221        assert_eq!(0x02, received[1]); // 0x02 = Get Capabilities
1222        assert_eq!(expected_seid << 2, received[2]);
1223
1224        let txlabel_raw = received[0] & 0xF0;
1225
1226        // Expect a get capabilities and respond.
1227        #[rustfmt::skip]
1228        let mut get_capabilities_rsp = vec![
1229            txlabel_raw << 4 | 0x2, // TxLabel (same) + ResponseAccept (0x02)
1230            0x02 // GetCapabilities
1231        ];
1232
1233        get_capabilities_rsp.extend_from_slice(response_capabilities);
1234
1235        assert!(remote.write(&get_capabilities_rsp).is_ok());
1236    }
1237
1238    fn expect_get_all_capabilities_and_respond(
1239        remote: &Channel,
1240        expected_seid: u8,
1241        response_capabilities: &[u8],
1242    ) {
1243        let received = recv_remote(&remote).unwrap();
1244        // Last half of header must be Single (0b00) and Command (0b00)
1245        assert_eq!(0x00, received[0] & 0xF);
1246        assert_eq!(0x0C, received[1]); // 0x0C = Get All Capabilities
1247        assert_eq!(expected_seid << 2, received[2]);
1248
1249        let txlabel_raw = received[0] & 0xF0;
1250
1251        // Expect a get capabilities and respond.
1252        #[rustfmt::skip]
1253        let mut get_capabilities_rsp = vec![
1254            txlabel_raw << 4 | 0x2, // TxLabel (same) + ResponseAccept (0x02)
1255            0x0C // GetAllCapabilities
1256        ];
1257
1258        get_capabilities_rsp.extend_from_slice(response_capabilities);
1259
1260        assert!(remote.write(&get_capabilities_rsp).is_ok());
1261    }
1262
1263    #[fuchsia::test]
1264    fn disconnected() {
1265        let mut exec = fasync::TestExecutor::new();
1266        let (proxy, _stream) = create_proxy_and_stream::<ProfileMarker>();
1267        let (remote, signaling) = Channel::create();
1268
1269        let id = PeerId(1);
1270
1271        let avdtp = avdtp::Peer::new(signaling);
1272        let peer = Peer::create(
1273            id,
1274            avdtp,
1275            Streams::default(),
1276            None,
1277            proxy,
1278            bt_metrics::MetricsLogger::default(),
1279        );
1280
1281        let closed_fut = peer.closed();
1282
1283        let mut closed_fut = pin!(closed_fut);
1284
1285        assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
1286
1287        // Close the remote channel
1288        drop(remote);
1289
1290        assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
1291    }
1292
1293    #[fuchsia::test]
1294    fn peer_collect_capabilities_success() {
1295        let mut exec = fasync::TestExecutor::new();
1296
1297        let (remote, _, cobalt_receiver, peer) = setup_test_peer(true, build_test_streams(), None);
1298
1299        let p: ProfileDescriptor = ProfileDescriptor {
1300            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1301            major_version: Some(1),
1302            minor_version: Some(2),
1303            ..Default::default()
1304        };
1305        let _ = peer.set_descriptor(p);
1306
1307        let collect_future = peer.collect_capabilities();
1308        let mut collect_future = pin!(collect_future);
1309
1310        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1311
1312        // Expect a discover command.
1313        let received = recv_remote(&remote).unwrap();
1314        // Last half of header must be Single (0b00) and Command (0b00)
1315        assert_eq!(0x00, received[0] & 0xF);
1316        assert_eq!(0x01, received[1]); // 0x01 = Discover
1317
1318        let txlabel_raw = received[0] & 0xF0;
1319
1320        // Respond with a set of streams.
1321        let response: &[u8] = &[
1322            txlabel_raw << 4 | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1323            0x01,                              // Discover
1324            0x3E << 2 | 0x0 << 1,              // SEID (3E), Not In Use (0b0)
1325            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1326            0x01 << 2 | 0x1 << 1,              // SEID (1), In Use (0b1)
1327            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1328        ];
1329        assert!(remote.write(response).is_ok());
1330
1331        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1332
1333        // Expect a get capabilities and respond.
1334        #[rustfmt::skip]
1335        let capabilities_rsp = &[
1336            // MediaTransport (Length of Service Capability = 0)
1337            0x01, 0x00,
1338            // Media Codec (LOSC = 2 + 4), Media Type Audio (0x00), Codec type (0x04), Codec specific 0xF09F9296
1339            0x07, 0x06, 0x00, 0x04, 0xF0, 0x9F, 0x92, 0x96
1340        ];
1341        expect_get_capabilities_and_respond(&remote, 0x3E, capabilities_rsp);
1342
1343        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1344
1345        // Expect a get capabilities and respond.
1346        #[rustfmt::skip]
1347        let capabilities_rsp = &[
1348            // MediaTransport (Length of Service Capability = 0)
1349            0x01, 0x00,
1350            // Media Codec (LOSC = 2 + 2), Media Type Audio (0x00), Codec type (0x00), Codec specific 0xC0DE
1351            0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1352        ];
1353        expect_get_capabilities_and_respond(&remote, 0x01, capabilities_rsp);
1354
1355        match exec.run_until_stalled(&mut collect_future) {
1356            Poll::Pending => panic!("collect capabilities should be complete"),
1357            Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1358            Poll::Ready(Ok(endpoints)) => {
1359                let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1360                let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1361                for stream in endpoints {
1362                    if stream.local_id() == &first_seid {
1363                        let expected_caps = vec![
1364                            ServiceCapability::MediaTransport,
1365                            ServiceCapability::MediaCodec {
1366                                media_type: avdtp::MediaType::Audio,
1367                                codec_type: avdtp::MediaCodecType::new(0x04),
1368                                codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1369                            },
1370                        ];
1371                        assert_eq!(&expected_caps, stream.capabilities());
1372                    } else if stream.local_id() == &second_seid {
1373                        let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1374                        assert_eq!(Some(&expected_codec_type), stream.codec_type());
1375                    } else {
1376                        panic!("Unexpected endpoint in the streams collected");
1377                    }
1378                }
1379            }
1380        }
1381
1382        // Collect reported cobalt logs.
1383        let mut recv = cobalt_receiver.expect("should have receiver");
1384        let mut log_events = Vec::new();
1385        while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1386            log_events.push(respond_to_metrics_req_for_test(req));
1387        }
1388
1389        // Should have sent two metric events for codec and one for capability.
1390        assert_eq!(3, log_events.len());
1391        assert!(log_events.contains(&MetricEvent {
1392            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1393            event_codes: vec![
1394                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1395            ],
1396            payload: MetricEventPayload::Count(1),
1397        }));
1398        assert!(log_events.contains(&MetricEvent {
1399            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1400            event_codes: vec![
1401                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac as u32
1402            ],
1403            payload: MetricEventPayload::Count(1),
1404        }));
1405        assert!(log_events.contains(&MetricEvent {
1406            metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1407            event_codes: vec![
1408                bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1409            ],
1410            payload: MetricEventPayload::Count(1),
1411        }));
1412
1413        // The second time, we don't expect to ask the peer again.
1414        let collect_future = peer.collect_capabilities();
1415        let mut collect_future = pin!(collect_future);
1416
1417        match exec.run_until_stalled(&mut collect_future) {
1418            Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1419            x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1420        };
1421    }
1422
1423    #[fuchsia::test]
1424    fn peer_collect_all_capabilities_success() {
1425        let mut exec = fasync::TestExecutor::new();
1426
1427        let (remote, _, cobalt_receiver, peer) = setup_test_peer(true, build_test_streams(), None);
1428        let p: ProfileDescriptor = ProfileDescriptor {
1429            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1430            major_version: Some(1),
1431            minor_version: Some(3),
1432            ..Default::default()
1433        };
1434        let _ = peer.set_descriptor(p);
1435
1436        let collect_future = peer.collect_capabilities();
1437        let mut collect_future = pin!(collect_future);
1438
1439        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1440
1441        // Expect a discover command.
1442        let received = recv_remote(&remote).unwrap();
1443        // Last half of header must be Single (0b00) and Command (0b00)
1444        assert_eq!(0x00, received[0] & 0xF);
1445        assert_eq!(0x01, received[1]); // 0x01 = Discover
1446
1447        let txlabel_raw = received[0] & 0xF0;
1448
1449        // Respond with a set of streams.
1450        let response: &[u8] = &[
1451            txlabel_raw << 4 | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1452            0x01,                              // Discover
1453            0x3E << 2 | 0x0 << 1,              // SEID (3E), Not In Use (0b0)
1454            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1455            0x01 << 2 | 0x1 << 1,              // SEID (1), In Use (0b1)
1456            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1457        ];
1458        assert!(remote.write(response).is_ok());
1459
1460        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1461
1462        // Expect a get all capabilities and respond.
1463        #[rustfmt::skip]
1464        let capabilities_rsp = &[
1465            // MediaTransport (Length of Service Capability = 0)
1466            0x01, 0x00,
1467            // Media Codec (LOSC = 2 + 4), Media Type Audio (0x00), Codec type (0x40), Codec specific 0xF09F9296
1468            0x07, 0x06, 0x00, 0x40, 0xF0, 0x9F, 0x92, 0x96,
1469            // Delay Reporting (LOSC = 0)
1470            0x08, 0x00
1471        ];
1472        expect_get_all_capabilities_and_respond(&remote, 0x3E, capabilities_rsp);
1473
1474        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1475
1476        // Expect a get all capabilities and respond.
1477        #[rustfmt::skip]
1478        let capabilities_rsp = &[
1479            // MediaTransport (Length of Service Capability = 0)
1480            0x01, 0x00,
1481            // Media Codec (LOSC = 2 + 2), Media Type Audio (0x00), Codec type (0x00), Codec specific 0xC0DE
1482            0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1483        ];
1484        expect_get_all_capabilities_and_respond(&remote, 0x01, capabilities_rsp);
1485
1486        match exec.run_until_stalled(&mut collect_future) {
1487            Poll::Pending => panic!("collect capabilities should be complete"),
1488            Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1489            Poll::Ready(Ok(endpoints)) => {
1490                let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1491                let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1492                for stream in endpoints {
1493                    if stream.local_id() == &first_seid {
1494                        let expected_caps = vec![
1495                            ServiceCapability::MediaTransport,
1496                            ServiceCapability::MediaCodec {
1497                                media_type: avdtp::MediaType::Audio,
1498                                codec_type: avdtp::MediaCodecType::new(0x40),
1499                                codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1500                            },
1501                            ServiceCapability::DelayReporting,
1502                        ];
1503                        assert_eq!(&expected_caps, stream.capabilities());
1504                    } else if stream.local_id() == &second_seid {
1505                        let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1506                        assert_eq!(Some(&expected_codec_type), stream.codec_type());
1507                    } else {
1508                        panic!("Unexpected endpoint in the streams collected");
1509                    }
1510                }
1511            }
1512        }
1513
1514        // Collect reported cobalt logs.
1515        let mut recv = cobalt_receiver.expect("should have receiver");
1516        let mut log_events = Vec::new();
1517        while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1518            log_events.push(respond_to_metrics_req_for_test(req));
1519        }
1520
1521        // Should have sent two metric events for codec and two for capability.
1522        assert_eq!(4, log_events.len());
1523        assert!(log_events.contains(&MetricEvent {
1524            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1525            event_codes: vec![
1526                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown as u32
1527            ],
1528            payload: MetricEventPayload::Count(1),
1529        }));
1530        assert!(log_events.contains(&MetricEvent {
1531            metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1532            event_codes: vec![
1533                bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1534            ],
1535            payload: MetricEventPayload::Count(1),
1536        }));
1537        assert!(log_events.contains(&MetricEvent {
1538            metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1539            event_codes: vec![
1540                bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1541            ],
1542            payload: MetricEventPayload::Count(1),
1543        }));
1544        assert!(log_events.contains(&MetricEvent {
1545            metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1546            event_codes: vec![
1547                bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport as u32
1548            ],
1549            payload: MetricEventPayload::Count(1),
1550        }));
1551
1552        // The second time, we don't expect to ask the peer again.
1553        let collect_future = peer.collect_capabilities();
1554        let mut collect_future = pin!(collect_future);
1555
1556        match exec.run_until_stalled(&mut collect_future) {
1557            Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1558            x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1559        };
1560    }
1561
1562    #[fuchsia::test]
1563    fn peer_collect_capabilities_discovery_fails() {
1564        let mut exec = fasync::TestExecutor::new();
1565
1566        let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1567
1568        let collect_future = peer.collect_capabilities();
1569        let mut collect_future = pin!(collect_future);
1570
1571        // Shouldn't finish yet.
1572        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1573
1574        // Expect a discover command.
1575        let received = recv_remote(&remote).unwrap();
1576        // Last half of header must be Single (0b00) and Command (0b00)
1577        assert_eq!(0x00, received[0] & 0xF);
1578        assert_eq!(0x01, received[1]); // 0x01 = Discover
1579
1580        let txlabel_raw = received[0] & 0xF0;
1581
1582        // Respond with an eror.
1583        let response: &[u8] = &[
1584            txlabel_raw | 0x0 << 2 | 0x3, // txlabel (same), Single (0b00), Response Reject (0b11)
1585            0x01,                         // Discover
1586            0x31,                         // BAD_STATE
1587        ];
1588        assert!(remote.write(response).is_ok());
1589
1590        // Should be done with an error.
1591        // Should finish!
1592        match exec.run_until_stalled(&mut collect_future) {
1593            Poll::Pending => panic!("Should be ready after discovery failure"),
1594            Poll::Ready(Ok(x)) => panic!("Should be an error but returned {x:?}"),
1595            Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
1596                assert_eq!(Some(Ok(avdtp::ErrorCode::BadState)), e.error_code());
1597            }
1598            Poll::Ready(Err(e)) => panic!("Should have been a RemoteRejected was was {e:?}"),
1599        }
1600    }
1601
1602    #[fuchsia::test]
1603    fn peer_collect_capabilities_get_capability_fails() {
1604        let mut exec = fasync::TestExecutor::new();
1605
1606        let (remote, _, _, peer) = setup_test_peer(true, build_test_streams(), None);
1607
1608        let collect_future = peer.collect_capabilities();
1609        let mut collect_future = pin!(collect_future);
1610
1611        // Shouldn't finish yet.
1612        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1613
1614        // Expect a discover command.
1615        let received = recv_remote(&remote).unwrap();
1616        // Last half of header must be Single (0b00) and Command (0b00)
1617        assert_eq!(0x00, received[0] & 0xF);
1618        assert_eq!(0x01, received[1]); // 0x01 = Discover
1619
1620        let txlabel_raw = received[0] & 0xF0;
1621
1622        // Respond with a set of streams.
1623        let response: &[u8] = &[
1624            txlabel_raw << 4 | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1625            0x01,                              // Discover
1626            0x3E << 2 | 0x0 << 1,              // SEID (3E), Not In Use (0b0)
1627            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1628            0x01 << 2 | 0x1 << 1,              // SEID (1), In Use (0b1)
1629            0x00 << 4 | 0x1 << 3,              // Audio (0x00), Sink (0x01)
1630        ];
1631        assert!(remote.write(response).is_ok());
1632
1633        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1634
1635        // Expect a get capabilities request
1636        let expected_seid = 0x3E;
1637        let received = recv_remote(&remote).unwrap();
1638        // Last half of header must be Single (0b00) and Command (0b00)
1639        assert_eq!(0x00, received[0] & 0xF);
1640        assert_eq!(0x02, received[1]); // 0x02 = Get Capabilities
1641        assert_eq!(expected_seid << 2, received[2]);
1642
1643        let txlabel_raw = received[0] & 0xF0;
1644
1645        let response: &[u8] = &[
1646            txlabel_raw | 0x0 << 2 | 0x3, // txlabel (same), Single (0b00), Response Reject (0b11)
1647            0x02,                         // Get Capabilities
1648            0x12,                         // BAD_ACP_SEID
1649        ];
1650        assert!(remote.write(response).is_ok());
1651
1652        assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1653
1654        // Expect a get capabilities request (skipped the last one)
1655        let expected_seid = 0x01;
1656        let received = recv_remote(&remote).unwrap();
1657        // Last half of header must be Single (0b00) and Command (0b00)
1658        assert_eq!(0x00, received[0] & 0xF);
1659        assert_eq!(0x02, received[1]); // 0x02 = Get Capabilities
1660        assert_eq!(expected_seid << 2, received[2]);
1661
1662        let txlabel_raw = received[0] & 0xF0;
1663
1664        let response: &[u8] = &[
1665            txlabel_raw | 0x0 << 2 | 0x3, // txlabel (same), Single (0b00), Response Reject (0b11)
1666            0x02,                         // Get Capabilities
1667            0x12,                         // BAD_ACP_SEID
1668        ];
1669        assert!(remote.write(response).is_ok());
1670
1671        // Should be done without an error, but with no streams.
1672        match exec.run_until_stalled(&mut collect_future) {
1673            Poll::Pending => panic!("Should be ready after discovery failure"),
1674            Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1675            Poll::Ready(Ok(map)) => assert_eq!(0, map.len()),
1676        }
1677    }
1678
1679    fn receive_simple_accept(remote: &Channel, signal_id: u8) {
1680        let received = recv_remote(&remote).expect("expected a packet");
1681        // Last half of header must be Single (0b00) and Command (0b00)
1682        assert_eq!(0x00, received[0] & 0xF);
1683        assert_eq!(signal_id, received[1]);
1684
1685        let txlabel_raw = received[0] & 0xF0;
1686
1687        let response: &[u8] = &[
1688            txlabel_raw | 0x0 << 2 | 0x2, // txlabel (same), Single (0b00), Response Accept (0b10)
1689            signal_id,
1690        ];
1691        assert!(remote.write(response).is_ok());
1692    }
1693
1694    #[fuchsia::test]
1695    fn peer_stream_start_success() {
1696        let mut exec = fasync::TestExecutor::new();
1697
1698        let (remote, mut profile_request_stream, _, peer) =
1699            setup_test_peer(false, build_test_streams(), None);
1700
1701        let remote_seid = 2_u8.try_into().unwrap();
1702
1703        let codec_params = ServiceCapability::MediaCodec {
1704            media_type: avdtp::MediaType::Audio,
1705            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1706            codec_extra: vec![0x11, 0x45, 51, 51],
1707        };
1708
1709        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1710        let mut start_future = pin!(start_future);
1711
1712        match exec.run_until_stalled(&mut start_future) {
1713            Poll::Pending => {}
1714            x => panic!("Expected pending, but got {x:?}"),
1715        };
1716
1717        receive_simple_accept(&remote, 0x03); // Set Configuration
1718
1719        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1720
1721        receive_simple_accept(&remote, 0x06); // Open
1722
1723        match exec.run_until_stalled(&mut start_future) {
1724            Poll::Pending => {}
1725            Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1726            Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1727        };
1728
1729        // Should connect the media channel after open.
1730        let (_, transport) = Channel::create();
1731
1732        let request = exec.run_until_stalled(&mut profile_request_stream.next());
1733        match request {
1734            Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
1735                assert_eq!(PeerId(1), peer_id.into());
1736                assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
1737                let channel = transport.try_into().unwrap();
1738                responder.send(Ok(channel)).expect("responder sends");
1739            }
1740            x => panic!("Should have sent a open l2cap request, but got {:?}", x),
1741        };
1742
1743        match exec.run_until_stalled(&mut start_future) {
1744            Poll::Pending => {}
1745            Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1746            Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1747        };
1748
1749        receive_simple_accept(&remote, 0x07); // Start
1750
1751        // Should return the media stream (which should be connected)
1752        // Should be done without an error, but with no streams.
1753        match exec.run_until_stalled(&mut start_future) {
1754            Poll::Pending => panic!("Should be ready after start succeeds"),
1755            Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1756            // TODO: confirm the stream is usable
1757            Poll::Ready(Ok(())) => {
1758                assert!(peer.is_streaming_now());
1759            }
1760        }
1761    }
1762
1763    #[fuchsia::test]
1764    fn peer_stream_start_picks_correct_direction() {
1765        let mut exec = fasync::TestExecutor::new();
1766
1767        let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1768        let remote = avdtp::Peer::new(remote);
1769        let mut remote_events = remote.take_request_stream();
1770
1771        // Respond as if we have a single SBC Source Stream
1772        fn remote_handle_request(req: avdtp::Request) {
1773            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1774            let res = match req {
1775                avdtp::Request::Discover { responder } => {
1776                    let infos = [avdtp::StreamInformation::new(
1777                        expected_stream_id,
1778                        false,
1779                        avdtp::MediaType::Audio,
1780                        avdtp::EndpointType::Source,
1781                    )];
1782                    responder.send(&infos)
1783                }
1784                avdtp::Request::GetAllCapabilities { stream_id, responder }
1785                | avdtp::Request::GetCapabilities { stream_id, responder } => {
1786                    assert_eq!(expected_stream_id, stream_id);
1787                    let caps = vec![
1788                        ServiceCapability::MediaTransport,
1789                        ServiceCapability::MediaCodec {
1790                            media_type: avdtp::MediaType::Audio,
1791                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1792                            codec_extra: vec![0x11, 0x45, 51, 250],
1793                        },
1794                    ];
1795                    responder.send(&caps[..])
1796                }
1797                avdtp::Request::Open { responder, stream_id } => {
1798                    assert_eq!(expected_stream_id, stream_id);
1799                    responder.send()
1800                }
1801                avdtp::Request::SetConfiguration {
1802                    responder,
1803                    local_stream_id,
1804                    remote_stream_id,
1805                    ..
1806                } => {
1807                    assert_eq!(local_stream_id, expected_stream_id);
1808                    // This is the "sink" local stream id.
1809                    assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1810                    responder.send()
1811                }
1812                x => panic!("Unexpected request: {:?}", x),
1813            };
1814            res.expect("should be able to respond");
1815        }
1816
1817        // Need to discover the remote streams first, or the stream start will not work.
1818        let collect_capabilities_fut = peer.collect_capabilities();
1819        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1820
1821        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1822
1823        let request = exec.run_singlethreaded(&mut remote_events.next());
1824        remote_handle_request(request.expect("should have a discovery request").unwrap());
1825
1826        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1827        let request = exec.run_singlethreaded(&mut remote_events.next());
1828        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1829
1830        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1831
1832        // Try to start the stream.  It should continue to configure and connect.
1833        let remote_seid = 4_u8.try_into().unwrap();
1834
1835        let codec_params = ServiceCapability::MediaCodec {
1836            media_type: avdtp::MediaType::Audio,
1837            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1838            codec_extra: vec![0x11, 0x45, 51, 51],
1839        };
1840        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1841        let mut start_future = pin!(start_future);
1842
1843        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1844        let request = exec.run_singlethreaded(&mut remote_events.next());
1845        remote_handle_request(request.expect("should have a set_capabilities request").unwrap());
1846
1847        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1848        let request = exec.run_singlethreaded(&mut remote_events.next());
1849        remote_handle_request(request.expect("should have an open request").unwrap());
1850    }
1851
1852    #[fuchsia::test]
1853    fn peer_stream_start_strips_unsupported_local_capabilities() {
1854        let mut exec = fasync::TestExecutor::new();
1855
1856        let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1857        let remote = avdtp::Peer::new(remote);
1858        let mut remote_events = remote.take_request_stream();
1859
1860        // Respond as if we have a single SBC Source Stream
1861        fn remote_handle_request(req: avdtp::Request) {
1862            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1863            let res = match req {
1864                avdtp::Request::Discover { responder } => {
1865                    let infos = [avdtp::StreamInformation::new(
1866                        expected_stream_id,
1867                        false,
1868                        avdtp::MediaType::Audio,
1869                        avdtp::EndpointType::Source,
1870                    )];
1871                    responder.send(&infos)
1872                }
1873                avdtp::Request::GetAllCapabilities { stream_id, responder }
1874                | avdtp::Request::GetCapabilities { stream_id, responder } => {
1875                    assert_eq!(expected_stream_id, stream_id);
1876                    let caps = vec![
1877                        ServiceCapability::MediaTransport,
1878                        // We don't have a local delay-reporting, so this shouldn't be requested.
1879                        ServiceCapability::DelayReporting,
1880                        ServiceCapability::MediaCodec {
1881                            media_type: avdtp::MediaType::Audio,
1882                            codec_type: avdtp::MediaCodecType::AUDIO_AAC,
1883                            codec_extra: vec![128, 0, 132, 134, 0, 0],
1884                        },
1885                    ];
1886                    responder.send(&caps[..])
1887                }
1888                avdtp::Request::Open { responder, stream_id } => {
1889                    assert_eq!(expected_stream_id, stream_id);
1890                    responder.send()
1891                }
1892                avdtp::Request::SetConfiguration {
1893                    responder,
1894                    local_stream_id,
1895                    remote_stream_id,
1896                    capabilities,
1897                } => {
1898                    assert_eq!(local_stream_id, expected_stream_id);
1899                    // This is the "sink" local stream id.
1900                    assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1901                    // Make sure we didn't request a DelayReport since the local Sink doesn't
1902                    // support it.
1903                    assert!(!capabilities.contains(&ServiceCapability::DelayReporting));
1904                    responder.send()
1905                }
1906                x => panic!("Unexpected request: {:?}", x),
1907            };
1908            res.expect("should be able to respond");
1909        }
1910
1911        // Need to discover the remote streams first, or the stream start will not work.
1912        let collect_capabilities_fut = peer.collect_capabilities();
1913        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1914
1915        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1916
1917        let request = exec.run_singlethreaded(&mut remote_events.next());
1918        remote_handle_request(request.expect("should have a discovery request").unwrap());
1919
1920        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1921        let request = exec.run_singlethreaded(&mut remote_events.next());
1922        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1923
1924        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1925
1926        // Try to start the stream.  It should continue to configure and connect.
1927        let remote_seid = 4_u8.try_into().unwrap();
1928
1929        let codec_params = ServiceCapability::MediaCodec {
1930            media_type: avdtp::MediaType::Audio,
1931            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1932            codec_extra: vec![0x11, 0x45, 51, 51],
1933        };
1934        let start_future =
1935            peer.stream_start(remote_seid, vec![codec_params, ServiceCapability::DelayReporting]);
1936        let mut start_future = pin!(start_future);
1937
1938        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1939        let request = exec.run_singlethreaded(&mut remote_events.next());
1940        remote_handle_request(request.expect("should have a set_configuration request").unwrap());
1941
1942        assert!(exec.run_until_stalled(&mut start_future).is_pending());
1943        let request = exec.run_singlethreaded(&mut remote_events.next());
1944        remote_handle_request(request.expect("should have an open request").unwrap());
1945    }
1946
1947    #[fuchsia::test]
1948    fn peer_stream_start_orders_local_capabilities() {
1949        let mut exec = fasync::TestExecutor::new();
1950
1951        let (remote, _, _, peer) = setup_test_peer(false, build_test_streams_delayable(), None);
1952        let remote = avdtp::Peer::new(remote);
1953        let mut remote_events = remote.take_request_stream();
1954
1955        // Respond as if we have a single SBC Source Stream
1956        fn remote_handle_request(req: avdtp::Request) {
1957            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1958            let res = match req {
1959                avdtp::Request::Discover { responder } => {
1960                    let infos = [avdtp::StreamInformation::new(
1961                        expected_stream_id,
1962                        false,
1963                        avdtp::MediaType::Audio,
1964                        avdtp::EndpointType::Source,
1965                    )];
1966                    responder.send(&infos)
1967                }
1968                avdtp::Request::GetAllCapabilities { stream_id, responder }
1969                | avdtp::Request::GetCapabilities { stream_id, responder } => {
1970                    assert_eq!(expected_stream_id, stream_id);
1971                    let caps = &[
1972                        ServiceCapability::MediaTransport,
1973                        ServiceCapability::MediaCodec {
1974                            media_type: avdtp::MediaType::Audio,
1975                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1976                            codec_extra: vec![0x11, 0x45, 51, 250],
1977                        },
1978                        ServiceCapability::DelayReporting,
1979                    ];
1980                    responder.send(caps)
1981                }
1982                avdtp::Request::Open { responder, stream_id } => {
1983                    assert_eq!(expected_stream_id, stream_id);
1984                    responder.send()
1985                }
1986                avdtp::Request::SetConfiguration {
1987                    responder,
1988                    local_stream_id,
1989                    remote_stream_id,
1990                    capabilities,
1991                } => {
1992                    assert_eq!(local_stream_id, expected_stream_id);
1993                    // This is the "sink" local stream id.
1994                    assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1995                    // The capabilities should be in order.
1996                    let mut capabilities_ordered = capabilities.clone();
1997                    capabilities_ordered.sort_by_key(ServiceCapability::category);
1998                    assert_eq!(capabilities, capabilities_ordered);
1999                    responder.send()
2000                }
2001                x => panic!("Unexpected request: {:?}", x),
2002            };
2003            res.expect("should be able to respond");
2004        }
2005
2006        // Need to discover the remote streams first, or the stream start will not work.
2007        let collect_capabilities_fut = peer.collect_capabilities();
2008        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2009
2010        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2011
2012        let request = exec.run_singlethreaded(&mut remote_events.next());
2013        remote_handle_request(request.expect("should have a discovery request").unwrap());
2014
2015        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2016        let request = exec.run_singlethreaded(&mut remote_events.next());
2017        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2018
2019        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2020
2021        // Try to start the stream.  It should continue to configure and connect.
2022        let remote_seid = 4_u8.try_into().unwrap();
2023
2024        let codec_params = ServiceCapability::MediaCodec {
2025            media_type: avdtp::MediaType::Audio,
2026            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2027            codec_extra: vec![0x11, 0x45, 51, 51],
2028        };
2029        let start_future = peer.stream_start(
2030            remote_seid,
2031            vec![
2032                ServiceCapability::MediaTransport,
2033                ServiceCapability::DelayReporting,
2034                codec_params,
2035            ],
2036        );
2037        let mut start_future = pin!(start_future);
2038
2039        assert!(exec.run_until_stalled(&mut start_future).is_pending());
2040        let request = exec.run_singlethreaded(&mut remote_events.next());
2041        remote_handle_request(request.expect("should have a set_configuration request").unwrap());
2042
2043        assert!(exec.run_until_stalled(&mut start_future).is_pending());
2044        let request = exec.run_singlethreaded(&mut remote_events.next());
2045        remote_handle_request(request.expect("should have an open request").unwrap());
2046    }
2047
2048    /// Tests that A2DP streaming does not start if the streaming permit is revoked during streaming
2049    /// setup.
2050    #[fuchsia::test]
2051    fn peer_stream_start_permit_revoked() {
2052        let mut exec = fasync::TestExecutor::new();
2053
2054        let test_permits = Permits::new(1);
2055        let (remote, mut profile_request_stream, _, peer) =
2056            setup_test_peer(false, build_test_streams(), Some(test_permits.clone()));
2057
2058        let remote_seid = 2_u8.try_into().unwrap();
2059
2060        let codec_params = ServiceCapability::MediaCodec {
2061            media_type: avdtp::MediaType::Audio,
2062            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2063            codec_extra: vec![0x11, 0x45, 51, 51],
2064        };
2065
2066        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2067        let mut start_future = pin!(start_future);
2068
2069        let _ = exec
2070            .run_until_stalled(&mut start_future)
2071            .expect_pending("waiting for set config response");
2072        receive_simple_accept(&remote, 0x03); // Set Configuration
2073        exec.run_until_stalled(&mut start_future).expect_pending("waiting for open response");
2074        receive_simple_accept(&remote, 0x06); // Open
2075        exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
2076        assert!(!peer.is_streaming_now());
2077
2078        // Should connect the media channel after open.
2079        let (_, transport) = Channel::create();
2080
2081        let request = exec.run_until_stalled(&mut profile_request_stream.next());
2082        match request {
2083            Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
2084                assert_eq!(PeerId(1), peer_id.into());
2085                assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
2086                let channel = transport.try_into().unwrap();
2087                responder.send(Ok(channel)).expect("responder sends");
2088            }
2089            x => panic!("Should have sent a open l2cap request, but got {:?}", x),
2090        };
2091
2092        exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
2093        assert!(!peer.is_streaming_now());
2094
2095        // Before peer responds to start, the permit gets taken.
2096        let seized_permits = test_permits.seize();
2097        assert_eq!(seized_permits.len(), 1);
2098        receive_simple_accept(&remote, 0x07); // Start
2099
2100        // Streaming should not locally begin because there is no available permit. The Start
2101        // response is handled gracefully.
2102        exec.run_until_stalled(&mut start_future)
2103            .expect_pending("waiting to send outgoing suspend");
2104        assert!(!peer.is_streaming_now());
2105        // We should issue an outgoing suspend request to synchronize state with the remote peer.
2106        receive_simple_accept(&remote, 0x09); // Suspend
2107
2108        // The start future should resolve without Error, and A2DP should not have started
2109        // streaming.
2110        let () = exec
2111            .run_until_stalled(&mut start_future)
2112            .expect("start finished")
2113            .expect("suspended stream is ok");
2114        assert!(!peer.is_streaming_now());
2115    }
2116
2117    #[fuchsia::test]
2118    fn peer_stream_start_fails_wrong_direction() {
2119        let mut exec = fasync::TestExecutor::new();
2120
2121        // Setup peers with only one Source Stream.
2122        let mut streams = Streams::default();
2123        let source = Stream::build(
2124            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2125            TestMediaTaskBuilder::new().builder(),
2126        );
2127        streams.insert(source);
2128
2129        let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2130        let remote = avdtp::Peer::new(remote);
2131        let mut remote_events = remote.take_request_stream();
2132
2133        // Respond as if we have a single SBC Source Stream
2134        fn remote_handle_request(req: avdtp::Request) {
2135            let expected_stream_id: StreamEndpointId = 2_u8.try_into().unwrap();
2136            let res = match req {
2137                avdtp::Request::Discover { responder } => {
2138                    let infos = [avdtp::StreamInformation::new(
2139                        expected_stream_id,
2140                        false,
2141                        avdtp::MediaType::Audio,
2142                        avdtp::EndpointType::Source,
2143                    )];
2144                    responder.send(&infos)
2145                }
2146                avdtp::Request::GetAllCapabilities { stream_id, responder }
2147                | avdtp::Request::GetCapabilities { stream_id, responder } => {
2148                    assert_eq!(expected_stream_id, stream_id);
2149                    let caps = vec![
2150                        ServiceCapability::MediaTransport,
2151                        ServiceCapability::MediaCodec {
2152                            media_type: avdtp::MediaType::Audio,
2153                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2154                            codec_extra: vec![0x11, 0x45, 51, 250],
2155                        },
2156                    ];
2157                    responder.send(&caps[..])
2158                }
2159                avdtp::Request::Open { responder, .. } => responder.send(),
2160                avdtp::Request::SetConfiguration { responder, .. } => responder.send(),
2161                x => panic!("Unexpected request: {:?}", x),
2162            };
2163            res.expect("should be able to respond");
2164        }
2165
2166        // Need to discover the remote streams first, or the stream start will always work.
2167        let collect_capabilities_fut = peer.collect_capabilities();
2168        let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2169
2170        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2171
2172        let request = exec.run_singlethreaded(&mut remote_events.next());
2173        remote_handle_request(request.expect("should have a discovery request").unwrap());
2174
2175        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2176        let request = exec.run_singlethreaded(&mut remote_events.next());
2177        remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2178
2179        assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2180
2181        // Try to start the stream.  It should fail with OutOfRange because we can't connect a Source to a Source.
2182        let remote_seid = 2_u8.try_into().unwrap();
2183
2184        let codec_params = ServiceCapability::MediaCodec {
2185            media_type: avdtp::MediaType::Audio,
2186            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2187            codec_extra: vec![0x11, 0x45, 51, 51],
2188        };
2189        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2190        let mut start_future = pin!(start_future);
2191
2192        match exec.run_until_stalled(&mut start_future) {
2193            Poll::Ready(Err(avdtp::Error::OutOfRange)) => {}
2194            x => panic!("Expected a ready OutOfRange error but got {:?}", x),
2195        };
2196    }
2197
2198    #[fuchsia::test]
2199    fn peer_stream_start_fails_to_connect() {
2200        let mut exec = fasync::TestExecutor::new();
2201
2202        let (remote, mut profile_request_stream, _, peer) =
2203            setup_test_peer(false, build_test_streams(), None);
2204
2205        let remote_seid = 2_u8.try_into().unwrap();
2206
2207        let codec_params = ServiceCapability::MediaCodec {
2208            media_type: avdtp::MediaType::Audio,
2209            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2210            codec_extra: vec![0x11, 0x45, 51, 51],
2211        };
2212
2213        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2214        let mut start_future = pin!(start_future);
2215
2216        match exec.run_until_stalled(&mut start_future) {
2217            Poll::Pending => {}
2218            x => panic!("was expecting pending but got {x:?}"),
2219        };
2220
2221        receive_simple_accept(&remote, 0x03); // Set Configuration
2222
2223        assert!(exec.run_until_stalled(&mut start_future).is_pending());
2224
2225        receive_simple_accept(&remote, 0x06); // Open
2226
2227        match exec.run_until_stalled(&mut start_future) {
2228            Poll::Pending => {}
2229            Poll::Ready(x) => panic!("Expected to be pending but {x:?}"),
2230        };
2231
2232        // Should connect the media channel after open.
2233        let request = exec.run_until_stalled(&mut profile_request_stream.next());
2234        match request {
2235            Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, responder, .. }))) => {
2236                assert_eq!(PeerId(1), peer_id.into());
2237                responder.send(Err(ErrorCode::Failed)).expect("responder sends");
2238            }
2239            x => panic!("Should have sent a open l2cap request, but got {:?}", x),
2240        };
2241
2242        // Should return an error.
2243        // Should be done without an error, but with no streams.
2244        match exec.run_until_stalled(&mut start_future) {
2245            Poll::Pending => panic!("Should be ready after start fails"),
2246            Poll::Ready(Ok(_stream)) => panic!("Shouldn't have succeeded stream here"),
2247            Poll::Ready(Err(_)) => {}
2248        }
2249    }
2250
2251    /// Test that the delay reports get acknowledged and they are sent to cobalt.
2252    #[fuchsia::test]
2253    async fn peer_delay_report() {
2254        let (remote, _profile_requests, cobalt_recv, peer) =
2255            setup_test_peer(true, build_test_streams(), None);
2256        let remote_peer = avdtp::Peer::new(remote);
2257        let mut remote_events = remote_peer.take_request_stream();
2258
2259        // Respond as if we have a single SBC Sink Stream
2260        async fn remote_handle_request(req: avdtp::Request, peer: &avdtp::Peer) {
2261            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
2262            // "peer" in this case is the test code Peer stream
2263            let expected_peer_stream_id: StreamEndpointId = 1_u8.try_into().unwrap();
2264            use avdtp::Request::*;
2265            match req {
2266                Discover { responder } => {
2267                    let infos = [avdtp::StreamInformation::new(
2268                        expected_stream_id,
2269                        false,
2270                        avdtp::MediaType::Audio,
2271                        avdtp::EndpointType::Sink,
2272                    )];
2273                    responder.send(&infos).expect("response should succeed");
2274                }
2275                GetAllCapabilities { stream_id, responder }
2276                | GetCapabilities { stream_id, responder } => {
2277                    assert_eq!(expected_stream_id, stream_id);
2278                    let caps = vec![
2279                        ServiceCapability::MediaTransport,
2280                        ServiceCapability::MediaCodec {
2281                            media_type: avdtp::MediaType::Audio,
2282                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2283                            codec_extra: vec![0x11, 0x45, 51, 250],
2284                        },
2285                    ];
2286                    responder.send(&caps[..]).expect("response should succeed");
2287                    // Sending a delayreport before the stream is configured is not allowed, it's a
2288                    // bad state.
2289                    assert!(peer.delay_report(&expected_peer_stream_id, 0xc0de).await.is_err());
2290                }
2291                Open { responder, stream_id } => {
2292                    // Configuration has happened but open not succeeded yet, send delay reports.
2293                    assert!(peer.delay_report(&expected_stream_id, 0xc0de).await.is_err());
2294                    // Send a delay report to the peer.
2295                    peer.delay_report(&expected_peer_stream_id, 0xc0de)
2296                        .await
2297                        .expect("should get acked correctly");
2298                    assert_eq!(expected_stream_id, stream_id);
2299                    responder.send().expect("response should succeed");
2300                }
2301                SetConfiguration { responder, local_stream_id, remote_stream_id, .. } => {
2302                    assert_eq!(local_stream_id, expected_stream_id);
2303                    assert_eq!(remote_stream_id, expected_peer_stream_id);
2304                    responder.send().expect("should send back response without issue");
2305                }
2306                x => panic!("Unexpected request: {:?}", x),
2307            };
2308        }
2309
2310        let collect_fut = pin!(peer.collect_capabilities());
2311
2312        // Discover then a GetCapabilities.
2313        let Either::Left((request, collect_fut)) =
2314            futures::future::select(remote_events.next(), collect_fut).await
2315        else {
2316            panic!("Collect future shouldn't finish first");
2317        };
2318        let collect_fut = pin!(collect_fut);
2319        remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
2320        let Either::Left((request, collect_fut)) =
2321            futures::future::select(remote_events.next(), collect_fut).await
2322        else {
2323            panic!("Collect future shouldn't finish first");
2324        };
2325        remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
2326
2327        // Collect future should be able to finish now.
2328        assert_eq!(1, collect_fut.await.expect("should get the remote endpoints back").len());
2329
2330        // Try to start the stream.  It should go through the normal motions,
2331        let remote_seid = 4_u8.try_into().unwrap();
2332
2333        let codec_params = ServiceCapability::MediaCodec {
2334            media_type: avdtp::MediaType::Audio,
2335            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2336            codec_extra: vec![0x11, 0x45, 51, 51],
2337        };
2338
2339        // We don't expect this task to finish before being dropped, since we never respond to the
2340        // request to open the transport channel.
2341        let _start_task = fasync::Task::spawn(async move {
2342            let _ = peer.stream_start(remote_seid, vec![codec_params]).await;
2343            panic!("stream start task finished");
2344        });
2345
2346        let request = remote_events.next().await.expect("should have set_config").unwrap();
2347        remote_handle_request(request, &remote_peer).await;
2348
2349        let request = remote_events.next().await.expect("should have open").unwrap();
2350        remote_handle_request(request, &remote_peer).await;
2351
2352        let mut cobalt = cobalt_recv.expect("should have receiver");
2353
2354        let mut got_ids = HashMap::new();
2355        let delay_metric_id = bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID;
2356        while got_ids.len() < 3 || *got_ids.get(&delay_metric_id).unwrap_or(&0) < 3 {
2357            let report = respond_to_metrics_req_for_test(cobalt.next().await.unwrap().unwrap());
2358            let _ = got_ids.entry(report.metric_id).and_modify(|x| *x += 1).or_insert(1);
2359            // All the delay reports should report the same value correctly.
2360            if report.metric_id == delay_metric_id {
2361                assert_eq!(MetricEventPayload::IntegerValue(0xc0de * 100000), report.payload);
2362            }
2363        }
2364        assert!(got_ids.contains_key(&bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID));
2365        assert!(got_ids.contains_key(&bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID));
2366        assert!(got_ids.contains_key(&delay_metric_id));
2367        // There should have been three reports.
2368        // We report the delay amount even if it fails to work.
2369        assert_eq!(got_ids.get(&delay_metric_id).cloned(), Some(3));
2370    }
2371
2372    fn sbc_capabilities() -> Vec<ServiceCapability> {
2373        let sbc_codec_info = SbcCodecInfo::new(
2374            SbcSamplingFrequency::FREQ48000HZ,
2375            SbcChannelMode::JOINT_STEREO,
2376            SbcBlockCount::SIXTEEN,
2377            SbcSubBands::EIGHT,
2378            SbcAllocation::LOUDNESS,
2379            /* min_bpv= */ 53,
2380            /* max_bpv= */ 53,
2381        )
2382        .expect("sbc codec info");
2383
2384        vec![avdtp::ServiceCapability::MediaTransport, sbc_codec_info.into()]
2385    }
2386
2387    /// Test that the remote end can configure and start a stream.
2388    #[fuchsia::test]
2389    fn peer_as_acceptor() {
2390        let mut exec = fasync::TestExecutor::new();
2391
2392        let mut streams = Streams::default();
2393        let mut test_builder = TestMediaTaskBuilder::new();
2394        streams.insert(Stream::build(
2395            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2396            test_builder.builder(),
2397        ));
2398
2399        let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2400        let remote_peer = avdtp::Peer::new(remote);
2401
2402        let discover_fut = remote_peer.discover();
2403        let mut discover_fut = pin!(discover_fut);
2404
2405        let expected = vec![make_sbc_endpoint(1, avdtp::EndpointType::Source).information()];
2406        match exec.run_until_stalled(&mut discover_fut) {
2407            Poll::Ready(Ok(res)) => assert_eq!(res, expected),
2408            x => panic!("Expected discovery to complete and got {:?}", x),
2409        };
2410
2411        let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2412        let unknown_endpoint_id = 2_u8.try_into().expect("should be able to get sbc endpointid");
2413
2414        let get_caps_fut = remote_peer.get_capabilities(&sbc_endpoint_id);
2415        let mut get_caps_fut = pin!(get_caps_fut);
2416
2417        match exec.run_until_stalled(&mut get_caps_fut) {
2418            // There are two caps (mediatransport, mediacodec) in the sbc endpoint.
2419            Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2420            x => panic!("Get capabilities should be ready but got {:?}", x),
2421        };
2422
2423        let get_caps_fut = remote_peer.get_capabilities(&unknown_endpoint_id);
2424        let mut get_caps_fut = pin!(get_caps_fut);
2425
2426        match exec.run_until_stalled(&mut get_caps_fut) {
2427            Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2428                assert_eq!(Some(Ok(avdtp::ErrorCode::BadAcpSeid)), e.error_code())
2429            }
2430            x => panic!("Get capabilities should be a ready error but got {:?}", x),
2431        };
2432
2433        let get_caps_fut = remote_peer.get_all_capabilities(&sbc_endpoint_id);
2434        let mut get_caps_fut = pin!(get_caps_fut);
2435
2436        match exec.run_until_stalled(&mut get_caps_fut) {
2437            // There are two caps (mediatransport, mediacodec) in the sbc endpoint.
2438            Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2439            x => panic!("Get capabilities should be ready but got {:?}", x),
2440        };
2441
2442        let sbc_caps = sbc_capabilities();
2443        let set_config_fut =
2444            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2445        let mut set_config_fut = pin!(set_config_fut);
2446
2447        match exec.run_until_stalled(&mut set_config_fut) {
2448            Poll::Ready(Ok(())) => {}
2449            x => panic!("Set capabilities should be ready but got {:?}", x),
2450        };
2451
2452        let open_fut = remote_peer.open(&sbc_endpoint_id);
2453        let mut open_fut = pin!(open_fut);
2454        match exec.run_until_stalled(&mut open_fut) {
2455            Poll::Ready(Ok(())) => {}
2456            x => panic!("Open should be ready but got {:?}", x),
2457        };
2458
2459        // Establish a media transport stream
2460        let (_remote_transport, transport) = Channel::create();
2461
2462        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2463
2464        let stream_ids = vec![sbc_endpoint_id.clone()];
2465        let start_fut = remote_peer.start(&stream_ids);
2466        let mut start_fut = pin!(start_fut);
2467        match exec.run_until_stalled(&mut start_fut) {
2468            Poll::Ready(Ok(())) => {}
2469            x => panic!("Start should be ready but got {:?}", x),
2470        };
2471
2472        // The task should be created locally and started.
2473        let media_task = test_builder.expect_task();
2474        assert!(media_task.is_started());
2475
2476        let suspend_fut = remote_peer.suspend(&stream_ids);
2477        let mut suspend_fut = pin!(suspend_fut);
2478        match exec.run_until_stalled(&mut suspend_fut) {
2479            Poll::Ready(Ok(())) => {}
2480            x => panic!("Start should be ready but got {:?}", x),
2481        };
2482
2483        // Should have stopped the media task on suspend.
2484        assert!(!media_task.is_started());
2485    }
2486
2487    #[fuchsia::test]
2488    fn peer_set_config_reject_first() {
2489        let mut exec = fasync::TestExecutor::new();
2490
2491        let mut streams = Streams::default();
2492        let test_builder = TestMediaTaskBuilder::new();
2493        streams.insert(Stream::build(
2494            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2495            test_builder.builder(),
2496        ));
2497
2498        let (remote, _requests, _, _peer) = setup_test_peer(false, streams, None);
2499        let remote_peer = avdtp::Peer::new(remote);
2500
2501        let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2502
2503        let wrong_freq_sbc = &[SbcCodecInfo::new(
2504            SbcSamplingFrequency::FREQ44100HZ, // 44.1 is not supported by the caps from above.
2505            SbcChannelMode::JOINT_STEREO,
2506            SbcBlockCount::SIXTEEN,
2507            SbcSubBands::EIGHT,
2508            SbcAllocation::LOUDNESS,
2509            /* min_bpv= */ 53,
2510            /* max_bpv= */ 53,
2511        )
2512        .expect("sbc codec info")
2513        .into()];
2514
2515        let set_config_fut =
2516            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, wrong_freq_sbc);
2517        let mut set_config_fut = pin!(set_config_fut);
2518
2519        match exec.run_until_stalled(&mut set_config_fut) {
2520            Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2521                assert!(e.service_category().is_some())
2522            }
2523            x => panic!("Set capabilities should have been rejected but got {:?}", x),
2524        };
2525
2526        let sbc_caps = sbc_capabilities();
2527        let set_config_fut =
2528            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2529        let mut set_config_fut = pin!(set_config_fut);
2530
2531        match exec.run_until_stalled(&mut set_config_fut) {
2532            Poll::Ready(Ok(())) => {}
2533            x => panic!("Set capabilities should be ready but got {:?}", x),
2534        };
2535    }
2536
2537    #[fuchsia::test]
2538    fn peer_starts_waiting_streams() {
2539        let mut exec = fasync::TestExecutor::new_with_fake_time();
2540        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(5_000_000_000));
2541
2542        let mut streams = Streams::default();
2543        let mut test_builder = TestMediaTaskBuilder::new();
2544        streams.insert(Stream::build(
2545            make_sbc_endpoint(1, avdtp::EndpointType::Source),
2546            test_builder.builder(),
2547        ));
2548
2549        let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2550        let remote_peer = avdtp::Peer::new(remote);
2551
2552        let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2553
2554        let sbc_caps = sbc_capabilities();
2555        let set_config_fut =
2556            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2557        let mut set_config_fut = pin!(set_config_fut);
2558
2559        match exec.run_until_stalled(&mut set_config_fut) {
2560            Poll::Ready(Ok(())) => {}
2561            x => panic!("Set capabilities should be ready but got {:?}", x),
2562        };
2563
2564        let open_fut = remote_peer.open(&sbc_endpoint_id);
2565        let mut open_fut = pin!(open_fut);
2566        match exec.run_until_stalled(&mut open_fut) {
2567            Poll::Ready(Ok(())) => {}
2568            x => panic!("Open should be ready but got {:?}", x),
2569        };
2570
2571        // Establish a media transport stream
2572        let (_remote_transport, transport) = Channel::create();
2573        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2574
2575        // The remote end should get a start request after the timeout.
2576        let mut remote_requests = remote_peer.take_request_stream();
2577        let next_remote_request_fut = remote_requests.next();
2578        let mut next_remote_request_fut = pin!(next_remote_request_fut);
2579
2580        // Nothing should happen immediately.
2581        assert!(exec.run_until_stalled(&mut next_remote_request_fut).is_pending());
2582
2583        // After the timeout has passed..
2584        exec.set_fake_time(zx::MonotonicDuration::from_seconds(3).after_now());
2585        let _ = exec.wake_expired_timers();
2586
2587        let stream_ids = match exec.run_until_stalled(&mut next_remote_request_fut) {
2588            Poll::Ready(Some(Ok(avdtp::Request::Start { responder, stream_ids }))) => {
2589                responder.send().unwrap();
2590                stream_ids
2591            }
2592            x => panic!("Expected to receive a start request for the stream, got {:?}", x),
2593        };
2594
2595        // We should start the media task, so the task should be created locally
2596        let media_task =
2597            exec.run_until_stalled(&mut test_builder.next_task()).expect("ready").unwrap();
2598        assert!(media_task.is_started());
2599
2600        // Remote peer should still be able to suspend the stream.
2601        let suspend_fut = remote_peer.suspend(&stream_ids);
2602        let mut suspend_fut = pin!(suspend_fut);
2603        match exec.run_until_stalled(&mut suspend_fut) {
2604            Poll::Ready(Ok(())) => {}
2605            x => panic!("Suspend should be ready but got {:?}", x),
2606        };
2607
2608        // Should have stopped the media task on suspend.
2609        assert!(!media_task.is_started());
2610    }
2611
2612    #[fuchsia::test]
2613    fn needs_permit_to_start_streams() {
2614        let mut exec = fasync::TestExecutor::new();
2615
2616        let mut streams = Streams::default();
2617        let mut test_builder = TestMediaTaskBuilder::new();
2618        streams.insert(Stream::build(
2619            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2620            test_builder.builder(),
2621        ));
2622        streams.insert(Stream::build(
2623            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2624            test_builder.builder(),
2625        ));
2626        let mut next_task_fut = test_builder.next_task();
2627
2628        let permits = Permits::new(1);
2629        let taken_permit = permits.get().expect("permit taken");
2630        let (remote, _profile_request_stream, _, peer) =
2631            setup_test_peer(false, streams, Some(permits.clone()));
2632        let remote_peer = avdtp::Peer::new(remote);
2633
2634        let sbc_endpoint_id = 1_u8.try_into().unwrap();
2635
2636        let sbc_caps = sbc_capabilities();
2637        let mut set_config_fut =
2638            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2639
2640        match exec.run_until_stalled(&mut set_config_fut) {
2641            Poll::Ready(Ok(())) => {}
2642            x => panic!("Set capabilities should be ready but got {:?}", x),
2643        };
2644
2645        let mut open_fut = remote_peer.open(&sbc_endpoint_id);
2646        match exec.run_until_stalled(&mut open_fut) {
2647            Poll::Ready(Ok(())) => {}
2648            x => panic!("Open should be ready but got {:?}", x),
2649        };
2650
2651        // Establish a media transport stream
2652        let (_remote_transport, transport) = Channel::create();
2653        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2654
2655        // Do the same, but for the OTHER stream.
2656        let sbc_endpoint_two = 2_u8.try_into().unwrap();
2657
2658        let mut set_config_fut =
2659            remote_peer.set_configuration(&sbc_endpoint_two, &sbc_endpoint_two, &sbc_caps);
2660
2661        match exec.run_until_stalled(&mut set_config_fut) {
2662            Poll::Ready(Ok(())) => {}
2663            x => panic!("Set capabilities should be ready but got {:?}", x),
2664        };
2665
2666        let mut open_fut = remote_peer.open(&sbc_endpoint_two);
2667        match exec.run_until_stalled(&mut open_fut) {
2668            Poll::Ready(Ok(())) => {}
2669            x => panic!("Open should be ready but got {:?}", x),
2670        };
2671
2672        // Establish a media transport stream
2673        let (_remote_transport_two, transport_two) = Channel::create();
2674        assert_eq!(Some(()), peer.receive_channel(transport_two).ok());
2675
2676        // Remote peer should still be able to try to start the stream, and we will say yes, but
2677        // that last seid looks wonky.
2678        let unknown_endpoint_id: StreamEndpointId = 9_u8.try_into().unwrap();
2679        let stream_ids = [sbc_endpoint_id.clone(), unknown_endpoint_id.clone()];
2680        let mut start_fut = remote_peer.start(&stream_ids);
2681        match exec.run_until_stalled(&mut start_fut) {
2682            Poll::Ready(Err(avdtp::Error::RemoteRejected(rejection))) => {
2683                assert_eq!(avdtp::ErrorCode::BadAcpSeid, rejection.error_code().unwrap().unwrap());
2684                assert_eq!(unknown_endpoint_id, rejection.stream_id().unwrap());
2685            }
2686            x => panic!("Start should be ready but got {:?}", x),
2687        };
2688
2689        // We can't get a permit (none are available) so we suspend the one we didn't error on.
2690        let mut remote_requests = remote_peer.take_request_stream();
2691
2692        let suspended_stream_ids = match exec.run_singlethreaded(&mut remote_requests.next()) {
2693            Some(Ok(avdtp::Request::Suspend { responder, stream_ids })) => {
2694                responder.send().unwrap();
2695                stream_ids
2696            }
2697            x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2698        };
2699
2700        assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2701        assert_eq!(1, suspended_stream_ids.len());
2702
2703        // And we should have not tried to start a task.
2704        match exec.run_until_stalled(&mut next_task_fut) {
2705            Poll::Pending => {}
2706            x => panic!("Local task should not have been created at this point: {:?}", x),
2707        };
2708
2709        // No matter how many times they ask to start, we will still suspend (but not queue another
2710        // reservation for the same id)
2711        let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
2712        match exec.run_until_stalled(&mut start_fut) {
2713            Poll::Ready(Ok(())) => {}
2714            x => panic!("Start should be ready but got {:?}", x),
2715        }
2716
2717        let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2718            Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2719                responder.send().unwrap();
2720                stream_ids
2721            }
2722            x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2723        };
2724        assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2725
2726        // After a permit is available, should try to start the first endpoint that failed.
2727        drop(taken_permit);
2728
2729        match exec.run_singlethreaded(&mut remote_requests.next()) {
2730            Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2731                assert_eq!(stream_ids, &[sbc_endpoint_id.clone()]);
2732                responder.send().unwrap();
2733            }
2734            x => panic!("Expected start on permit available but got {x:?}"),
2735        };
2736
2737        // And we should start a task.
2738        let media_task = match exec.run_until_stalled(&mut next_task_fut) {
2739            Poll::Ready(Some(task)) => task,
2740            x => panic!("Local task should be created at this point: {:?}", x),
2741        };
2742
2743        assert!(media_task.is_started());
2744
2745        // If the remote asks to start another one, we still suspend it immediately.
2746        let mut start_fut = remote_peer.start(&[sbc_endpoint_two.clone()]);
2747        match exec.run_until_stalled(&mut start_fut) {
2748            Poll::Ready(Ok(())) => {}
2749            x => panic!("Start should be ready but got {:?}", x),
2750        }
2751
2752        let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2753            Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2754                responder.send().unwrap();
2755                stream_ids
2756            }
2757            x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2758        };
2759
2760        assert!(suspended_stream_ids.contains(&sbc_endpoint_two));
2761        assert_eq!(1, suspended_stream_ids.len());
2762
2763        // Once the first one is done, the second can start.
2764        let mut suspend_fut = remote_peer.suspend(&[sbc_endpoint_id.clone()]);
2765        match exec.run_until_stalled(&mut suspend_fut) {
2766            Poll::Ready(Ok(())) => {}
2767            x => panic!("Start should be ready but got {:?}", x),
2768        }
2769
2770        match exec.run_singlethreaded(&mut remote_requests.next()) {
2771            Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2772                assert_eq!(stream_ids, &[sbc_endpoint_two]);
2773                responder.send().unwrap();
2774            }
2775            x => panic!("Expected start on permit available but got {x:?}"),
2776        };
2777    }
2778
2779    fn start_sbc_stream(
2780        exec: &mut fasync::TestExecutor,
2781        media_test_builder: &mut TestMediaTaskBuilder,
2782        peer: &Peer,
2783        remote_peer: &avdtp::Peer,
2784        local_id: &StreamEndpointId,
2785        remote_id: &StreamEndpointId,
2786    ) -> TestMediaTask {
2787        let sbc_caps = sbc_capabilities();
2788        let set_config_fut = remote_peer.set_configuration(&local_id, &remote_id, &sbc_caps);
2789        let mut set_config_fut = pin!(set_config_fut);
2790
2791        match exec.run_until_stalled(&mut set_config_fut) {
2792            Poll::Ready(Ok(())) => {}
2793            x => panic!("Set capabilities should be ready but got {:?}", x),
2794        };
2795
2796        let open_fut = remote_peer.open(&local_id);
2797        let mut open_fut = pin!(open_fut);
2798        match exec.run_until_stalled(&mut open_fut) {
2799            Poll::Ready(Ok(())) => {}
2800            x => panic!("Open should be ready but got {:?}", x),
2801        };
2802
2803        // Establish a media transport stream
2804        let (_remote_transport, transport) = Channel::create();
2805        assert_eq!(Some(()), peer.receive_channel(transport).ok());
2806
2807        // Remote peer should still be able to try to start the stream, and we will say yes.
2808        let stream_ids = [local_id.clone()];
2809        let start_fut = remote_peer.start(&stream_ids);
2810        let mut start_fut = pin!(start_fut);
2811        match exec.run_until_stalled(&mut start_fut) {
2812            Poll::Ready(Ok(())) => {}
2813            x => panic!("Start should be ready but got {:?}", x),
2814        };
2815
2816        // And we should start a media task.
2817        let media_task = media_test_builder.expect_task();
2818        assert!(media_task.is_started());
2819
2820        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
2821        media_task
2822    }
2823
2824    #[fuchsia::test]
2825    fn permits_can_be_revoked_and_reinstated_all() {
2826        let mut exec = fasync::TestExecutor::new();
2827
2828        let mut streams = Streams::default();
2829        let mut test_builder = TestMediaTaskBuilder::new();
2830        streams.insert(Stream::build(
2831            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2832            test_builder.builder(),
2833        ));
2834        let sbc_endpoint_id = 1_u8.try_into().unwrap();
2835        let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();
2836
2837        streams.insert(Stream::build(
2838            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2839            test_builder.builder(),
2840        ));
2841        let sbc2_endpoint_id = 2_u8.try_into().unwrap();
2842        let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();
2843
2844        let permits = Permits::new(2);
2845
2846        let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
2847        let remote_peer = avdtp::Peer::new(remote);
2848
2849        let one_media_task = start_sbc_stream(
2850            &mut exec,
2851            &mut test_builder,
2852            &peer,
2853            &remote_peer,
2854            &sbc_endpoint_id,
2855            &remote_sbc_endpoint_id,
2856        );
2857        let two_media_task = start_sbc_stream(
2858            &mut exec,
2859            &mut test_builder,
2860            &peer,
2861            &remote_peer,
2862            &sbc2_endpoint_id,
2863            &remote_sbc2_endpoint_id,
2864        );
2865
2866        // Someone comes along and revokes our permits.
2867        let taken_permits = permits.seize();
2868
2869        let remote_endpoints: HashSet<_> =
2870            [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();
2871
2872        // We should send a suspend to the other end, for both of them.
2873        let mut remote_requests = remote_peer.take_request_stream();
2874        let mut expected_suspends = remote_endpoints.clone();
2875        while !expected_suspends.is_empty() {
2876            match exec.run_until_stalled(&mut remote_requests.next()) {
2877                Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2878                    for stream_id in stream_ids {
2879                        assert!(expected_suspends.remove(&stream_id));
2880                    }
2881                    responder.send().expect("send response okay");
2882                }
2883                x => panic!("Expected suspension and got {:?}", x),
2884            }
2885        }
2886
2887        // And the media tasks should be stopped.
2888        assert!(!one_media_task.is_started());
2889        assert!(!two_media_task.is_started());
2890
2891        // After the permits are available again, we send a start, and start the media stream.
2892        drop(taken_permits);
2893
2894        let mut expected_starts = remote_endpoints.clone();
2895        while !expected_starts.is_empty() {
2896            match exec.run_singlethreaded(&mut remote_requests.next()) {
2897                Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2898                    for stream_id in stream_ids {
2899                        assert!(expected_starts.remove(&stream_id));
2900                    }
2901                    responder.send().expect("send response okay");
2902                }
2903                x => panic!("Expected start and got {:?}", x),
2904            }
2905        }
2906        // And we should start two media tasks.
2907
2908        let one_media_task = test_builder.expect_task();
2909        assert!(one_media_task.is_started());
2910        let two_media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
2911            Poll::Ready(Some(task)) => task,
2912            x => panic!("Expected another ready task but {x:?}"),
2913        };
2914        assert!(two_media_task.is_started());
2915    }
2916
2917    #[fuchsia::test]
2918    fn permits_can_be_revoked_one_at_a_time() {
2919        let mut exec = fasync::TestExecutor::new();
2920
2921        let mut streams = Streams::default();
2922        let mut test_builder = TestMediaTaskBuilder::new();
2923        streams.insert(Stream::build(
2924            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2925            test_builder.builder(),
2926        ));
2927        let sbc_endpoint_id = 1_u8.try_into().unwrap();
2928        let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();
2929
2930        streams.insert(Stream::build(
2931            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2932            test_builder.builder(),
2933        ));
2934        let sbc2_endpoint_id = 2_u8.try_into().unwrap();
2935        let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();
2936
2937        let permits = Permits::new(2);
2938
2939        let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
2940        let remote_peer = avdtp::Peer::new(remote);
2941
2942        let one_media_task = start_sbc_stream(
2943            &mut exec,
2944            &mut test_builder,
2945            &peer,
2946            &remote_peer,
2947            &sbc_endpoint_id,
2948            &remote_sbc_endpoint_id,
2949        );
2950        let two_media_task = start_sbc_stream(
2951            &mut exec,
2952            &mut test_builder,
2953            &peer,
2954            &remote_peer,
2955            &sbc2_endpoint_id,
2956            &remote_sbc2_endpoint_id,
2957        );
2958
2959        // Someone comes along and revokes one of our permits.
2960        let taken_permit = permits.take();
2961
2962        let remote_endpoints: HashSet<_> =
2963            [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();
2964
2965        // We should send a suspend to the other end, for both of them.
2966        let mut remote_requests = remote_peer.take_request_stream();
2967        let suspended_id = match exec.run_until_stalled(&mut remote_requests.next()) {
2968            Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2969                assert!(stream_ids.len() == 1);
2970                assert!(remote_endpoints.contains(&stream_ids[0]));
2971                responder.send().expect("send response okay");
2972                stream_ids[0].clone()
2973            }
2974            x => panic!("Expected suspension and got {:?}", x),
2975        };
2976
2977        // And the correct one of the media tasks should be stopped.
2978        if suspended_id == remote_sbc_endpoint_id {
2979            assert!(!one_media_task.is_started());
2980            assert!(two_media_task.is_started());
2981        } else {
2982            assert!(one_media_task.is_started());
2983            assert!(!two_media_task.is_started());
2984        }
2985
2986        // After the permits are available again, we send a start, and start the media stream.
2987        drop(taken_permit);
2988
2989        match exec.run_singlethreaded(&mut remote_requests.next()) {
2990            Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2991                assert_eq!(stream_ids, &[suspended_id]);
2992                responder.send().expect("send response okay");
2993            }
2994            x => panic!("Expected start and got {:?}", x),
2995        }
2996        // And we should start another media task.
2997        let media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
2998            Poll::Ready(Some(task)) => task,
2999            x => panic!("Expected media task to start: {x:?}"),
3000        };
3001        assert!(media_task.is_started());
3002    }
3003
3004    // Scenario: when we are waiting for a suspend response from the peer after a permit was not
3005    // available, we try to start the peer (because a dwell has expired)
3006    #[fuchsia::test]
3007    fn permit_suspend_start_while_suspending() {
3008        let mut exec = fasync::TestExecutor::new();
3009
3010        let mut streams = Streams::default();
3011        let mut test_builder = TestMediaTaskBuilder::new();
3012        streams.insert(Stream::build(
3013            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
3014            test_builder.builder(),
3015        ));
3016        streams.insert(Stream::build(
3017            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
3018            test_builder.builder(),
3019        ));
3020        let mut next_task_fut = test_builder.next_task();
3021
3022        let permits = Permits::new(1);
3023        let (remote, _profile_request_stream, _, peer) =
3024            setup_test_peer(false, streams, Some(permits.clone()));
3025
3026        let remote_peer = avdtp::Peer::new(remote);
3027        let mut remote_requests = remote_peer.take_request_stream();
3028
3029        let sbc_endpoint_id = 1_u8.try_into().unwrap();
3030
3031        let sbc_caps = sbc_capabilities();
3032        let mut set_config_fut =
3033            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
3034
3035        match exec.run_until_stalled(&mut set_config_fut) {
3036            Poll::Ready(Ok(())) => {}
3037            x => panic!("Set capabilities should be ready but got {:?}", x),
3038        };
3039
3040        let mut open_fut = remote_peer.open(&sbc_endpoint_id);
3041        match exec.run_until_stalled(&mut open_fut) {
3042            Poll::Ready(Ok(())) => {}
3043            x => panic!("Open should be ready but got {:?}", x),
3044        };
3045
3046        // Establish a media transport stream
3047        let (_remote_transport, transport) = Channel::create();
3048        assert_eq!(Some(()), peer.receive_channel(transport).ok());
3049
3050        // At this point, we are dwelling, waiting for the peer to start the stream. Skip the timer.
3051        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
3052        let Some(_deadline) = exec.wake_next_timer() else {
3053            panic!("Expected a timer to be waiting to run");
3054        };
3055
3056        // We will try to start it ourselves, which will take the only permit and send a start.
3057        let start_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
3058            Some(Ok(avdtp::Request::Start { stream_ids, responder })) => {
3059                assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
3060                responder
3061            }
3062            x => panic!("Expected a Start request, got {x:?}"),
3063        };
3064
3065        assert!(permits.get().is_none());
3066
3067        // The peer doesn't notice. Instead try to start it from the peer side (bad timing)
3068        let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
3069
3070        // We get an OK, and then immediately a suspend request because there are no
3071        // permits available.
3072        match exec.run_singlethreaded(&mut start_fut) {
3073            Ok(()) => {}
3074            x => panic!("Expected OK response from start future but got {x:?}"),
3075        }
3076
3077        let suspend_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
3078            Some(Ok(avdtp::Request::Suspend { stream_ids, responder })) => {
3079                assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
3080                responder
3081            }
3082            x => panic!("Expected a suspend got {x:?}"),
3083        };
3084
3085        // At this point, the peer notices the start request and responds.
3086        start_responder.send().unwrap();
3087
3088        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
3089
3090        // Okay I guess..
3091        suspend_responder.send().unwrap();
3092
3093        // And we should start a task.
3094        let media_task = match exec.run_until_stalled(&mut next_task_fut) {
3095            Poll::Ready(Some(task)) => task,
3096            x => panic!("Local task should be created at this point: {:?}", x),
3097        };
3098
3099        assert!(media_task.is_started());
3100    }
3101
3102    /// Test that the version check method correctly differentiates between newer
3103    /// and older A2DP versions.
3104    #[fuchsia::test]
3105    fn version_check() {
3106        let p1: ProfileDescriptor = ProfileDescriptor {
3107            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3108            major_version: Some(1),
3109            minor_version: Some(3),
3110            ..Default::default()
3111        };
3112        assert_eq!(true, a2dp_version_check(p1));
3113
3114        let p1: ProfileDescriptor = ProfileDescriptor {
3115            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3116            major_version: Some(2),
3117            minor_version: Some(10),
3118            ..Default::default()
3119        };
3120        assert_eq!(true, a2dp_version_check(p1));
3121
3122        let p1: ProfileDescriptor = ProfileDescriptor {
3123            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3124            major_version: Some(1),
3125            minor_version: Some(0),
3126            ..Default::default()
3127        };
3128        assert_eq!(false, a2dp_version_check(p1));
3129
3130        let p1: ProfileDescriptor = ProfileDescriptor {
3131            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3132            major_version: None,
3133            minor_version: Some(9),
3134            ..Default::default()
3135        };
3136        assert_eq!(false, a2dp_version_check(p1));
3137
3138        let p1: ProfileDescriptor = ProfileDescriptor {
3139            profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3140            major_version: Some(2),
3141            minor_version: Some(2),
3142            ..Default::default()
3143        };
3144        assert_eq!(true, a2dp_version_check(p1));
3145    }
3146}