1use anyhow::Context as _;
6use bt_avdtp::{
7 self as avdtp, MediaCodecType, ServiceCapability, ServiceCategory, StreamEndpoint,
8 StreamEndpointId,
9};
10use fidl_fuchsia_bluetooth::ChannelParameters;
11use fidl_fuchsia_bluetooth_bredr::{
12 ConnectParameters, L2capParameters, PSM_AVDTP, ProfileDescriptor, ProfileProxy,
13};
14use fuchsia_async::{self as fasync, DurationExt};
15use fuchsia_bluetooth::inspect::DebugExt;
16use fuchsia_bluetooth::types::{Channel, PeerId};
17use fuchsia_inspect as inspect;
18use fuchsia_inspect_derive::{AttachError, Inspect};
19use fuchsia_sync::Mutex;
20use futures::channel::mpsc;
21use futures::future::{BoxFuture, Either};
22use futures::stream::FuturesUnordered;
23use futures::task::{Context, Poll, Waker};
24use futures::{Future, FutureExt, StreamExt, select};
25use log::{debug, info, trace, warn};
26use std::collections::{BTreeMap, HashMap, HashSet};
27use std::pin::Pin;
28use std::sync::{Arc, Weak};
29
30mod controller;
32pub use controller::ControllerPool;
33
34use crate::codec::MediaCodecConfig;
35use crate::permits::{Permit, Permits};
36use crate::stream::{Stream, Streams};
37
/// A remote peer reached over an AVDTP signaling connection, together with the local streams
/// offered to it.
#[derive(Inspect)]
pub struct Peer {
    /// Id of the remote peer.
    id: PeerId,
    /// Shared peer state. Inspect attachment is forwarded to it.
    #[inspect(forward)]
    inner: Arc<Mutex<PeerInner>>,
    /// Profile proxy used by `stream_start` to connect transport channels.
    profile: ProfileProxy,
    /// Profile descriptor recorded through `set_descriptor`, if any.
    descriptor: Mutex<Option<ProfileDescriptor>>,
    /// Wakers registered by `ClosedPeer` futures. The request task takes the vector (leaving
    /// `None`) when it finishes.
    closed_wakers: Arc<Mutex<Option<Vec<Waker>>>>,
    /// Logger used to record metrics.
    metrics: bt_metrics::MetricsLogger,
    /// Task scheduled by `receive_channel` that starts opened streams after `STREAM_DWELL`.
    start_stream_task: Mutex<Option<fasync::Task<avdtp::Result<()>>>>,
}
60
/// Hands out and tracks streaming permits for the local streams of a single peer.
///
/// Clones share the `open_streams` and `reserved_streams` bookkeeping, which are both behind
/// an `Arc`.
#[derive(Clone)]
struct StreamPermits {
    /// Source that permits and reservations are requested from.
    permits: Permits,
    /// Permits currently held, keyed by the local stream endpoint id they were issued for.
    open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
    /// Local stream endpoint ids that have a reservation pending.
    reserved_streams: Arc<Mutex<HashSet<StreamEndpointId>>>,
    /// Weak handle to the peer state, used when a permit is revoked to suspend the stream.
    inner: Weak<Mutex<PeerInner>>,
    /// Id of the peer, used for permit labels and log messages.
    peer_id: PeerId,
    /// Queue of reservation futures; the receiving half is returned from `new`.
    sender: mpsc::UnboundedSender<BoxFuture<'static, StreamPermit>>,
}
73
/// Handle for a permit stored in the shared `open_streams` map.
///
/// The permit itself lives in the map; dropping this handle removes the map entry for
/// `local_id` (see the `Drop` impl).
#[derive(Debug)]
struct StreamPermit {
    /// Local stream endpoint id the permit was issued for.
    local_id: StreamEndpointId,
    /// Map holding the actual permits, shared with the `StreamPermits` that issued this handle.
    open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
}
79
80impl StreamPermit {
81 fn local_id(&self) -> &StreamEndpointId {
82 &self.local_id
83 }
84
85 fn is_held(&self) -> bool {
87 self.open_streams.lock().contains_key(&self.local_id)
88 }
89}
90
91impl Drop for StreamPermit {
92 fn drop(&mut self) {
93 let _ = self.open_streams.lock().remove(&self.local_id);
94 }
95}
96
97impl StreamPermits {
98 fn new(
99 inner: Weak<Mutex<PeerInner>>,
100 peer_id: PeerId,
101 permits: Permits,
102 ) -> (Self, mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>) {
103 let (sender, reservations_receiver) = futures::channel::mpsc::unbounded();
104 (
105 Self {
106 inner,
107 permits,
108 peer_id,
109 sender,
110 open_streams: Default::default(),
111 reserved_streams: Default::default(),
112 },
113 reservations_receiver,
114 )
115 }
116
117 fn label_for(&self, local_id: &StreamEndpointId) -> String {
118 format!("{} {}", self.peer_id, local_id)
119 }
120
121 fn get(&self, local_id: StreamEndpointId) -> Option<StreamPermit> {
124 let revoke_fn = self.make_revocation_fn(&local_id);
125 let Some(permit) = self.permits.get_revokable(revoke_fn) else {
126 info!("No permits available: {:?}", self.permits);
127 return None;
128 };
129 permit.relabel(self.label_for(&local_id));
130 if let Some(_) = self.open_streams.lock().insert(local_id.clone(), permit) {
131 warn!(id:% = self.peer_id; "Started stream {local_id:?} twice, dropping previous permit");
132 }
133 Some(StreamPermit { local_id, open_streams: self.open_streams.clone() })
134 }
135
136 fn setup_reservation_for(&self, local_id: StreamEndpointId) {
139 if !self.reserved_streams.lock().insert(local_id.clone()) {
140 return;
142 }
143 let restart_stream_available_fut = {
144 let self_revoke_fn = Self::make_revocation_fn(&self, &local_id);
145 let reservation = self.permits.reserve_revokable(self_revoke_fn);
146 let open_streams = self.open_streams.clone();
147 let reserved_streams = self.reserved_streams.clone();
148 let label = self.label_for(&local_id);
149 let local_id = local_id.clone();
150 async move {
151 let permit = reservation.await;
152 permit.relabel(label);
153 if open_streams.lock().insert(local_id.clone(), permit).is_some() {
154 warn!("Reservation replaces acquired permit for {}", local_id.clone());
155 }
156 if !reserved_streams.lock().remove(&local_id) {
157 warn!(local_id:%; "Unrecorded reservation resolved");
158 }
159 StreamPermit { local_id, open_streams }
160 }
161 };
162 if let Err(e) = self.sender.unbounded_send(restart_stream_available_fut.boxed()) {
163 warn!(id:% = self.peer_id, local_id:%, e:?; "Couldn't queue reservation to finish");
164 }
165 }
166
    /// Called when the permit held for `local_id` is revoked; returns that permit.
    ///
    /// If the peer still exists, the local stream is suspended, a suspend is issued to the
    /// remote endpoint, and a reservation is queued for the stream. The permit is then removed
    /// from `open_streams` and handed back.
    ///
    /// # Panics
    ///
    /// Panics if no permit is recorded for `local_id`.
    fn revocation_fn(self, local_id: StreamEndpointId) -> Permit {
        if let Ok(peer) = PeerInner::upgrade(self.inner.clone()) {
            // Inner scope so the peer lock is released before the reservation is set up.
            {
                let mut lock = peer.lock();
                match lock.suspend_local_stream(&local_id) {
                    // NOTE(review): the future returned by `suspend` is dropped without being
                    // polled; presumably the command is sent eagerly and only the response is
                    // ignored -- confirm against bt_avdtp.
                    Ok(remote_id) => drop(lock.peer.suspend(&[remote_id])),
                    Err(e) => warn!("Couldn't stop local stream {local_id:?}: {e:?}"),
                }
            }
            self.setup_reservation_for(local_id.clone());
        }
        self.open_streams.lock().remove(&local_id).expect("permit revoked but don't have it")
    }
184
185 fn make_revocation_fn(&self, local_id: &StreamEndpointId) -> impl FnOnce() -> Permit + use<> {
186 let local_id = local_id.clone();
187 let cloned = self.clone();
188 move || cloned.revocation_fn(local_id)
189 }
190}
191
192impl Peer {
193 pub fn create(
200 id: PeerId,
201 peer: avdtp::Peer,
202 streams: Streams,
203 permits: Option<Permits>,
204 profile: ProfileProxy,
205 metrics: bt_metrics::MetricsLogger,
206 ) -> Self {
207 let inner = Arc::new(Mutex::new(PeerInner::new(peer, id, streams, metrics.clone())));
208 let reservations_receiver = if let Some(permits) = permits {
209 let (stream_permits, receiver) =
210 StreamPermits::new(Arc::downgrade(&inner), id, permits);
211 inner.lock().permits = Some(stream_permits);
212 receiver
213 } else {
214 let (_, receiver) = mpsc::unbounded();
215 receiver
216 };
217 let res = Self {
218 id,
219 inner,
220 profile,
221 descriptor: Mutex::new(None),
222 closed_wakers: Arc::new(Mutex::new(Some(Vec::new()))),
223 metrics,
224 start_stream_task: Mutex::new(None),
225 };
226 res.start_requests_task(reservations_receiver);
227 res
228 }
229
230 pub fn set_descriptor(&self, descriptor: ProfileDescriptor) -> Option<ProfileDescriptor> {
231 self.descriptor.lock().replace(descriptor)
232 }
233
    /// How long `receive_channel` waits after a stream's transport is complete before the
    /// scheduled task starts the opened streams itself.
    const STREAM_DWELL: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(500);
237
238 pub fn receive_channel(&self, channel: Channel) -> avdtp::Result<()> {
242 let mut lock = self.inner.lock();
243 if lock.receive_channel(channel)? {
244 let weak = Arc::downgrade(&self.inner);
245 let mut task_lock = self.start_stream_task.lock();
246 *task_lock = Some(fasync::Task::local(async move {
247 trace!("Dwelling to start remotely-opened stream..");
248 fasync::Timer::new(Self::STREAM_DWELL.after_now()).await;
249 PeerInner::start_opened(weak).await
250 }));
251 }
252 Ok(())
253 }
254
255 pub fn avdtp(&self) -> avdtp::Peer {
257 let lock = self.inner.lock();
258 lock.peer.clone()
259 }
260
261 pub fn remote_endpoints(&self) -> Option<Vec<avdtp::StreamEndpoint>> {
263 self.inner.lock().remote_endpoints()
264 }
265
266 pub fn collect_capabilities(
270 &self,
271 ) -> impl Future<Output = avdtp::Result<Vec<avdtp::StreamEndpoint>>> + use<> {
272 let avdtp = self.avdtp();
273 let get_all = self.descriptor.lock().clone().is_some_and(a2dp_version_check);
274 let inner = self.inner.clone();
275 let metrics = self.metrics.clone();
276 let peer_id = self.id;
277 async move {
278 if let Some(caps) = inner.lock().remote_endpoints() {
279 return Ok(caps);
280 }
281 trace!("Discovering peer streams..");
282 let infos = avdtp.discover().await?;
283 trace!("Discovered {} streams", infos.len());
284 let mut remote_streams = Vec::new();
285 for info in infos {
286 let capabilities = if get_all {
287 avdtp.get_all_capabilities(info.id()).await
288 } else {
289 avdtp.get_capabilities(info.id()).await
290 };
291 match capabilities {
292 Ok(capabilities) => {
293 trace!("Stream {:?}", info);
294 for cap in &capabilities {
295 trace!(" - {:?}", cap);
296 }
297 remote_streams.push(avdtp::StreamEndpoint::from_info(&info, capabilities));
298 }
299 Err(e) => {
300 info!(peer_id:%; "Stream {} capabilities failed: {:?}, skipping", info.id(), e);
301 }
302 };
303 }
304 inner.lock().set_remote_endpoints(&remote_streams);
305 Self::record_cobalt_metrics(metrics, &remote_streams);
306 Ok(remote_streams)
307 }
308 }
309
310 fn record_cobalt_metrics(metrics: bt_metrics::MetricsLogger, endpoints: &[StreamEndpoint]) {
311 let codec_metrics: HashSet<_> = endpoints
312 .iter()
313 .filter_map(|endpoint| {
314 endpoint.codec_type().map(|t| codectype_to_availability_metric(t) as u32)
315 })
316 .collect();
317 metrics
318 .log_occurrences(bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID, codec_metrics);
319
320 let cap_metrics: HashSet<_> = endpoints
321 .iter()
322 .flat_map(|endpoint| {
323 endpoint
324 .capabilities()
325 .iter()
326 .filter_map(|t| capability_to_metric(t))
327 .chain(std::iter::once(
328 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic,
329 ))
330 .map(|t| t as u32)
331 })
332 .collect();
333 metrics.log_occurrences(bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID, cap_metrics);
334 }
335
336 fn transport_channel_params() -> L2capParameters {
337 L2capParameters {
338 psm: Some(PSM_AVDTP),
339 parameters: Some(ChannelParameters {
340 max_rx_packet_size: Some(65535),
341 ..Default::default()
342 }),
343 ..Default::default()
344 }
345 }
346
347 pub fn stream_start(
352 &self,
353 remote_id: StreamEndpointId,
354 capabilities: Vec<ServiceCapability>,
355 ) -> impl Future<Output = avdtp::Result<()>> {
356 let peer = Arc::downgrade(&self.inner);
357 let peer_id = self.id.clone();
358 let avdtp = self.avdtp();
359 let profile = self.profile.clone();
360
361 async move {
362 let codec_params =
363 capabilities.iter().find(|x| x.is_codec()).ok_or(avdtp::Error::InvalidState)?;
364 let (local_id, local_capabilities) = {
365 let peer = PeerInner::upgrade(peer.clone())?;
366 let lock = peer.lock();
367 lock.find_compatible_local_capabilities(codec_params, &remote_id)?
368 };
369
370 let local_by_cat: HashMap<ServiceCategory, ServiceCapability> =
371 local_capabilities.into_iter().map(|i| (i.category(), i)).collect();
372
373 let shared_capabilities: BTreeMap<ServiceCategory, ServiceCapability> = capabilities
376 .into_iter()
377 .filter_map(|cap| {
378 let Some(local_cap) = local_by_cat.get(&cap.category()) else {
379 return None;
380 };
381 if cap.category() == ServiceCategory::MediaCodec {
382 let Ok(a) = MediaCodecConfig::try_from(&cap) else {
383 return None;
384 };
385 let Ok(b) = MediaCodecConfig::try_from(local_cap) else {
386 return None;
387 };
388 let Some(negotiated) = MediaCodecConfig::negotiate(&a, &b) else {
389 return None;
390 };
391 Some((cap.category(), (&negotiated).into()))
392 } else {
393 Some((cap.category(), cap))
394 }
395 })
396 .collect();
397 let shared_capabilities: Vec<_> = shared_capabilities.into_values().collect();
398
399 trace!("Starting stream {local_id} to remote {remote_id} with {shared_capabilities:?}");
400
401 avdtp.set_configuration(&remote_id, &local_id, &shared_capabilities).await?;
402 {
403 let strong = PeerInner::upgrade(peer.clone())?;
404 strong.lock().set_opening(&local_id, &remote_id, shared_capabilities)?;
405 }
406 avdtp.open(&remote_id).await?;
407
408 debug!(peer_id:%; "Connecting transport channel");
409 let channel = profile
410 .connect(
411 &peer_id.into(),
412 &ConnectParameters::L2cap(Self::transport_channel_params()),
413 )
414 .await
415 .context("FIDL error: {}")?
416 .or(Err(avdtp::Error::PeerDisconnected))?;
417 trace!(peer_id:%; "Connected transport channel, converting to local Channel");
418 let channel = match channel.try_into() {
419 Err(e) => {
420 warn!(peer_id:%, e:?; "Couldn't connect media transport: no channel");
421 return Err(avdtp::Error::PeerDisconnected);
422 }
423 Ok(c) => c,
424 };
425
426 trace!(peer_id:%; "Connected transport channel, passing to Peer..");
427
428 {
429 let strong = PeerInner::upgrade(peer.clone())?;
430 let _ = strong.lock().receive_channel(channel)?;
431 }
432 PeerInner::start_opened(peer).await
434 }
435 }
436
    /// Returns true if the peer is streaming or opening a stream, or if a scheduled task to
    /// start streaming has not finished yet.
    pub fn streaming_active(&self) -> bool {
        self.inner.lock().is_streaming() || self.will_start_streaming()
    }
441
442 #[cfg(test)]
444 fn is_streaming_now(&self) -> bool {
445 self.inner.lock().is_streaming_now()
446 }
447
448 fn will_start_streaming(&self) -> bool {
451 let mut task_lock = self.start_stream_task.lock();
452 if task_lock.is_none() {
453 return false;
454 }
455 let mut cx = Context::from_waker(&std::task::Waker::noop());
457 if let Poll::Pending = task_lock.as_mut().unwrap().poll_unpin(&mut cx) {
458 return true;
459 }
460 let _ = task_lock.take();
462 false
463 }
464
465 pub fn stream_suspend(
470 &self,
471 local_id: StreamEndpointId,
472 ) -> impl Future<Output = avdtp::Result<()>> {
473 let peer = Arc::downgrade(&self.inner);
474 PeerInner::suspend(peer, local_id)
475 }
476
    /// Spawns the detached local task that serves this peer.
    ///
    /// The task handles requests arriving from the peer, collects reservation futures from
    /// `reservations_receiver`, and starts a stream whenever one of those reservations
    /// resolves to a permit. When the loop ends, the wakers registered by `ClosedPeer`
    /// futures are woken.
    fn start_requests_task(
        &self,
        mut reservations_receiver: mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>,
    ) {
        let lock = self.inner.lock();
        let mut request_stream = lock.peer.take_request_stream();
        let id = self.id.clone();
        // The task only holds weak references, so it does not keep the peer state alive.
        let peer = Arc::downgrade(&self.inner);
        let mut stream_reservations = FuturesUnordered::new();
        let disconnect_wakers = Arc::downgrade(&self.closed_wakers);
        fuchsia_async::Task::local(async move {
            loop {
                select! {
                    request = request_stream.next() => {
                        match request {
                            // The request stream ended.
                            None => break,
                            Some(Err(e)) => info!(peer_id:% = id, e:?; "Request stream error"),
                            Some(Ok(request)) => match peer.upgrade() {
                                // The peer state is gone; stop without waking the wakers.
                                None => return,
                                Some(p) => {
                                    // The lock guard is a temporary of this statement, so it
                                    // is released before any returned future is awaited.
                                    let result_or_future = p.lock().handle_request(request);
                                    let result = match result_or_future {
                                        Either::Left(result) => result,
                                        Either::Right(future) => future.await,
                                    };
                                    if let Err(e) = result {
                                        warn!(peer_id:% = id, e:?; "Error handling request");
                                    }
                                }
                            },
                        }
                    },
                    // A new reservation was queued: track it until it resolves.
                    reservation_fut = reservations_receiver.select_next_some() => {
                        stream_reservations.push(reservation_fut)
                    },
                    // A reservation resolved to a permit: start the stream it was made for.
                    permit = stream_reservations.select_next_some() => {
                        if let Err(e) = PeerInner::start_permit(peer.clone(), permit).await {
                            warn!(peer_id:% = id, e:?; "Couldn't start stream after unpause");
                        }
                    }
                    complete => break,
                }
            }
            info!(peer_id:% = id; "disconnected");
            // Taking the vector leaves `None` behind, which `ClosedPeer` treats as closed.
            if let Some(wakers) = disconnect_wakers.upgrade() {
                for waker in wakers.lock().take().unwrap_or_else(Vec::new) {
                    waker.wake();
                }
            }
        })
        .detach();
    }
531
532 pub fn closed(&self) -> ClosedPeer {
534 ClosedPeer { inner: Arc::downgrade(&self.closed_wakers) }
535 }
536}
537
/// Future returned by `Peer::closed`. It is ready once the waker list it points to has been
/// taken or dropped.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ClosedPeer {
    /// Weak handle to the peer's waker list. `None` inside the mutex means the wakers were
    /// already taken.
    inner: Weak<Mutex<Option<Vec<Waker>>>>,
}
544
545impl Future for ClosedPeer {
546 type Output = ();
547
548 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
549 match self.inner.upgrade() {
550 None => Poll::Ready(()),
551 Some(inner) => match inner.lock().as_mut() {
552 None => Poll::Ready(()),
553 Some(wakers) => {
554 wakers.push(cx.waker().clone());
555 Poll::Pending
556 }
557 },
558 }
559 }
560}
561
562fn a2dp_version_check(profile: ProfileDescriptor) -> bool {
564 let (Some(major), Some(minor)) = (profile.major_version, profile.minor_version) else {
565 return false;
566 };
567 (major == 1 && minor >= 3) || major > 1
568}
569
/// State of a peer, shared behind a mutex between `Peer`, its tasks, and permit callbacks.
struct PeerInner {
    /// AVDTP handle used to send commands to the peer.
    peer: avdtp::Peer,
    /// Id of the peer, used in logs and when configuring streams.
    peer_id: PeerId,
    /// Local id of the stream that has been configured and is waiting for its transport
    /// channel, if any.
    opening: Option<StreamEndpointId>,
    /// Local streams offered to this peer.
    local: Streams,
    /// Permit tracker; `None` when streams start without permits.
    permits: Option<StreamPermits>,
    /// Watchers for started streams, keyed by local stream id.
    started: HashMap<StreamEndpointId, WatchedStream>,
    /// Inspect node for this peer.
    inspect: fuchsia_inspect::Node,
    /// Remote endpoints recorded by `set_remote_endpoints`, if any.
    remote_endpoints: Option<Vec<StreamEndpoint>>,
    /// Inspect node under which the remote endpoints are recorded.
    remote_inspect: fuchsia_inspect::Node,
    /// Logger used to record metrics.
    metrics: bt_metrics::MetricsLogger,
}
596
impl Inspect for &mut PeerInner {
    /// Attaches the peer under `parent` as a child node called `name`.
    ///
    /// The node records the peer id as the string property `id`, and the local streams are
    /// attached beneath it as `local_streams`.
    fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
        // Replaces the node created by default in `PeerInner::new`.
        self.inspect = parent.create_child(name.as_ref());
        self.inspect.record_string("id", self.peer_id.to_string());
        self.local.iattach(&self.inspect, "local_streams")
    }
}
606
607impl PeerInner {
608 pub fn new(
609 peer: avdtp::Peer,
610 peer_id: PeerId,
611 local: Streams,
612 metrics: bt_metrics::MetricsLogger,
613 ) -> Self {
614 Self {
615 peer,
616 peer_id,
617 opening: None,
618 local,
619 permits: None,
620 started: HashMap::new(),
621 inspect: Default::default(),
622 remote_endpoints: None,
623 remote_inspect: Default::default(),
624 metrics,
625 }
626 }
627
628 fn get_mut(&mut self, local_id: &StreamEndpointId) -> Result<&mut Stream, avdtp::ErrorCode> {
630 self.local.get_mut(&local_id).ok_or(avdtp::ErrorCode::BadAcpSeid)
631 }
632
633 fn set_remote_endpoints(&mut self, endpoints: &[StreamEndpoint]) {
634 self.remote_inspect = self.inspect.create_child("remote_endpoints");
635 for endpoint in endpoints {
636 self.remote_inspect.record_child(inspect::unique_name("remote_"), |node| {
637 node.record_string("endpoint_id", endpoint.local_id().debug());
638 node.record_string("capabilities", endpoint.capabilities().debug());
639 node.record_string("type", endpoint.endpoint_type().debug());
640 });
641 }
642 self.remote_endpoints = Some(endpoints.iter().map(StreamEndpoint::as_new).collect());
643 }
644
645 fn remote_endpoints(&self) -> Option<Vec<StreamEndpoint>> {
647 self.remote_endpoints.as_ref().map(|v| v.iter().map(StreamEndpoint::as_new).collect())
648 }
649
650 fn remote_endpoint(&self, id: &StreamEndpointId) -> Option<StreamEndpoint> {
652 self.remote_endpoints
653 .as_ref()
654 .and_then(|v| v.iter().find(|v| v.local_id() == id).map(StreamEndpoint::as_new))
655 }
656
657 fn is_streaming(&self) -> bool {
659 self.is_streaming_now() || self.opening.is_some()
660 }
661
662 fn is_streaming_now(&self) -> bool {
664 self.local.streaming().next().is_some()
665 }
666
667 fn set_opening(
668 &mut self,
669 local_id: &StreamEndpointId,
670 remote_id: &StreamEndpointId,
671 capabilities: Vec<ServiceCapability>,
672 ) -> avdtp::Result<()> {
673 if self.opening.is_some() {
674 return Err(avdtp::Error::InvalidState);
675 }
676 let peer_id = self.peer_id;
677 let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
678 stream
679 .configure(&peer_id, &remote_id, capabilities)
680 .map_err(|(cat, c)| avdtp::Error::RequestInvalidExtra(c, (&cat).into()))?;
681 stream.endpoint_mut().establish().or(Err(avdtp::Error::InvalidState))?;
682 self.opening = Some(local_id.clone());
683 Ok(())
684 }
685
686 fn upgrade(weak: Weak<Mutex<Self>>) -> avdtp::Result<Arc<Mutex<Self>>> {
687 weak.upgrade().ok_or(avdtp::Error::PeerDisconnected)
688 }
689
690 async fn start_opened(weak: Weak<Mutex<Self>>) -> avdtp::Result<()> {
692 let (avdtp, stream_pairs) = {
693 let peer = Self::upgrade(weak.clone())?;
694 let peer = peer.lock();
695 let stream_pairs: Vec<(StreamEndpointId, StreamEndpointId)> = peer
696 .local
697 .open()
698 .filter_map(|stream| {
699 let endpoint = stream.endpoint();
700 endpoint.remote_id().map(|id| (endpoint.local_id().clone(), id.clone()))
701 })
702 .collect();
703 (peer.peer.clone(), stream_pairs)
704 };
705 for (local_id, remote_id) in stream_pairs {
706 let permit_result =
707 Self::upgrade(weak.clone())?.lock().get_permit_or_reserve(&local_id);
708 if let Ok(permit) = permit_result {
709 Self::initiated_start(avdtp.clone(), weak.clone(), permit, &local_id, &remote_id)
710 .await?;
711 }
712 }
713 Ok(())
714 }
715
716 async fn start_permit(weak: Weak<Mutex<Self>>, permit: StreamPermit) -> avdtp::Result<()> {
717 let local_id = permit.local_id().clone();
718 let (avdtp, remote_id) = {
719 let peer = Self::upgrade(weak.clone())?;
720 let mut peer = peer.lock();
721 let remote_id = peer
722 .get_mut(&local_id)
723 .map_err(|e| avdtp::Error::RequestInvalid(e))?
724 .endpoint()
725 .remote_id()
726 .ok_or(avdtp::Error::InvalidState)?
727 .clone();
728 (peer.peer.clone(), remote_id)
729 };
730 Self::initiated_start(avdtp, weak, Some(permit), &local_id, &remote_id).await
731 }
732
733 async fn initiated_start(
735 avdtp: avdtp::Peer,
736 weak: Weak<Mutex<Self>>,
737 permit: Option<StreamPermit>,
738 local_id: &StreamEndpointId,
739 remote_id: &StreamEndpointId,
740 ) -> avdtp::Result<()> {
741 trace!(permit:?, local_id:?, remote_id:?; "Making outgoing start request");
742 let to_start = std::slice::from_ref(remote_id);
743 avdtp.start(to_start).await?;
744 trace!("Start response received: {permit:?}");
745 let peer = Self::upgrade(weak.clone())?;
746 let (peer_id, start_result) = {
747 let mut peer = peer.lock();
748 (peer.peer_id, peer.start_local_stream(permit, &local_id))
749 };
750 if let Err(e) = start_result {
751 warn!(peer_id:%, local_id:%, remote_id:%, e:?; "Failed to start local stream, suspending");
752 avdtp.suspend(to_start).await?;
753 }
754 Ok(())
755 }
756
757 fn suspend(
759 weak: Weak<Mutex<Self>>,
760 local_id: StreamEndpointId,
761 ) -> impl Future<Output = avdtp::Result<()>> {
762 let res = (move || {
763 let peer = Self::upgrade(weak.clone())?;
764 let mut peer = peer.lock();
765 Ok((peer.peer.clone(), peer.suspend_local_stream(&local_id)?))
766 })();
767 let (avdtp, remote_id) = match res {
768 Err(e) => return futures::future::err(e).left_future(),
769 Ok(r) => r,
770 };
771 let to_suspend = &[remote_id];
772 avdtp.suspend(to_suspend).right_future()
773 }
774
775 pub fn find_compatible_local_capabilities(
778 &self,
779 codec_params: &ServiceCapability,
780 remote_id: &StreamEndpointId,
781 ) -> avdtp::Result<(StreamEndpointId, Vec<ServiceCapability>)> {
782 let config = codec_params.try_into()?;
783 let our_direction = self.remote_endpoint(remote_id).map(|e| e.endpoint_type().opposite());
784 debug!(codec_params:?, local:? = self.local; "Looking for compatible local stream");
785 self.local
786 .compatible(config)
787 .find_map(|s| {
788 let endpoint = s.endpoint();
789 if let Some(d) = our_direction {
790 if &d != endpoint.endpoint_type() {
791 return None;
792 }
793 }
794 Some((endpoint.local_id().clone(), endpoint.capabilities().clone()))
795 })
796 .ok_or(avdtp::Error::OutOfRange)
797 }
798
799 fn get_permit_or_reserve(
803 &self,
804 local_id: &StreamEndpointId,
805 ) -> Result<Option<StreamPermit>, ()> {
806 let Some(permits) = self.permits.as_ref() else {
807 return Ok(None);
808 };
809 if let Some(permit) = permits.get(local_id.clone()) {
810 return Ok(Some(permit));
811 }
812 info!(peer_id:% = self.peer_id, local_id:%; "No permit to start stream, adding a reservation");
813 permits.setup_reservation_for(local_id.clone());
814 Err(())
815 }
816
817 fn start_local_stream(
820 &mut self,
821 permit: Option<StreamPermit>,
822 local_id: &StreamEndpointId,
823 ) -> avdtp::Result<()> {
824 let peer_id = self.peer_id;
825 let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
826 if permit.as_ref().is_some_and(|p| !p.is_held()) {
829 return Err(avdtp::Error::Other(anyhow::format_err!(
830 "streaming permit revoked during setup"
831 )));
832 }
833
834 info!(peer_id:%, stream:?; "Starting");
835 let stream_finished = stream.start().map_err(|c| avdtp::Error::RequestInvalid(c))?;
836 let watched_stream = WatchedStream::new(permit, stream_finished);
838 if self.started.insert(local_id.clone(), watched_stream).is_some() {
839 warn!(peer_id:%, local_id:%; "Stream that was already started");
840 }
841 Ok(())
842 }
843
844 fn suspend_local_stream(
847 &mut self,
848 local_id: &StreamEndpointId,
849 ) -> avdtp::Result<StreamEndpointId> {
850 let peer_id = self.peer_id;
851 let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
852 let remote_id = stream.endpoint().remote_id().ok_or(avdtp::Error::InvalidState)?.clone();
853 info!(peer_id:%; "Suspend stream local {local_id} <-> {remote_id} remote");
854 stream.suspend().map_err(|c| avdtp::Error::RequestInvalid(c))?;
855 let _ = self.started.remove(local_id);
856 Ok(remote_id)
857 }
858
859 fn receive_channel(&mut self, channel: Channel) -> avdtp::Result<bool> {
864 let stream_id = self.opening.as_ref().cloned().ok_or(avdtp::Error::InvalidState)?;
865 let stream = self.get_mut(&stream_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
866 let done = !stream.endpoint_mut().receive_channel(channel)?;
867 if done {
868 self.opening = None;
869 }
870 info!(peer_id:% = self.peer_id, stream_id:%; "Transport connected");
871 Ok(done)
872 }
873
    /// Handles one request received from the peer.
    ///
    /// Most requests are answered right away and yield `Either::Left` with the result of
    /// sending the response. A `Start` request yields `Either::Right`: a future that first
    /// suspends any streams that could not get a permit and then produces the result of the
    /// response that was already sent.
    fn handle_request(
        &mut self,
        request: avdtp::Request,
    ) -> Either<avdtp::Result<()>, impl Future<Output = avdtp::Result<()>> + use<>> {
        use avdtp::ErrorCode;
        use avdtp::Request::*;
        trace!("Handling {request:?} from peer..");
        // `break 'result <value>` is used below to leave the match early with a response.
        let immediate_result = 'result: {
            match request {
                Discover { responder } => responder.send(&self.local.information()),
                // Both capability requests are answered with the endpoint's capabilities.
                GetCapabilities { responder, stream_id }
                | GetAllCapabilities { responder, stream_id } => match self.local.get(&stream_id) {
                    None => responder.reject(ErrorCode::BadAcpSeid),
                    Some(stream) => responder.send(stream.endpoint().capabilities()),
                },
                Open { responder, stream_id } => {
                    // A stream can only be opened after it has been configured.
                    if self.opening.is_none() {
                        break 'result responder.reject(ErrorCode::BadState);
                    }
                    let Ok(stream) = self.get_mut(&stream_id) else {
                        break 'result responder.reject(ErrorCode::BadAcpSeid);
                    };
                    match stream.endpoint_mut().establish() {
                        Ok(()) => responder.send(),
                        Err(_) => responder.reject(ErrorCode::BadState),
                    }
                }
                Close { responder, stream_id } => {
                    let peer = self.peer.clone();
                    let Ok(stream) = self.get_mut(&stream_id) else {
                        break 'result responder.reject(ErrorCode::BadAcpSeid);
                    };
                    // The stream responds to the request itself as part of releasing.
                    stream.release(responder, &peer)
                }
                SetConfiguration { responder, local_stream_id, remote_stream_id, capabilities } => {
                    // Only one stream can be opening at a time.
                    if self.opening.is_some() {
                        break 'result responder.reject(ServiceCategory::None, ErrorCode::BadState);
                    }
                    let peer_id = self.peer_id;
                    let Ok(stream) = self.get_mut(&local_stream_id) else {
                        break 'result responder
                            .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
                    };
                    match stream.configure(&peer_id, &remote_stream_id, capabilities) {
                        Ok(_) => {
                            self.opening = Some(local_stream_id);
                            responder.send()
                        }
                        Err((category, code)) => responder.reject(category, code),
                    }
                }
                GetConfiguration { stream_id, responder } => {
                    let Ok(stream) = self.get_mut(&stream_id) else {
                        break 'result responder.reject(ErrorCode::BadAcpSeid);
                    };
                    let Some(vec_capabilities) = stream.endpoint().get_configuration() else {
                        break 'result responder.reject(ErrorCode::BadState);
                    };
                    responder.send(vec_capabilities.as_slice())
                }
                Reconfigure { responder, local_stream_id, capabilities } => {
                    let Ok(stream) = self.get_mut(&local_stream_id) else {
                        break 'result responder
                            .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
                    };
                    match stream.reconfigure(capabilities) {
                        Ok(_) => responder.send(),
                        Err((cat, code)) => responder.reject(cat, code),
                    }
                }
                Start { responder, stream_ids } => {
                    // Remote ids of streams accepted without a permit; they are suspended
                    // again after the response has been sent.
                    let mut immediate_suspend = Vec::new();
                    // Stops at the first stream that fails; the failure is reported to the peer.
                    let result = stream_ids.into_iter().try_for_each(|seid| {
                        let Some(stream) = self.local.get_mut(&seid) else {
                            return Err((seid, ErrorCode::BadAcpSeid));
                        };
                        let remote_id = stream.endpoint().remote_id().cloned();
                        let Some(remote_id) = remote_id else {
                            return Err((seid, ErrorCode::BadState));
                        };
                        let Ok(permit) = self.get_permit_or_reserve(&seid) else {
                            // No permit: the start is still accepted, but the local stream
                            // is not started and the remote endpoint is queued for suspend.
                            immediate_suspend.push(remote_id);
                            return Ok(());
                        };
                        match self.start_local_stream(permit, &seid) {
                            Ok(()) => Ok(()),
                            Err(avdtp::Error::RequestInvalid(code)) => Err((seid, code)),
                            Err(_) => Err((seid, ErrorCode::BadState)),
                        }
                    });
                    let response_result = match result {
                        Ok(()) => responder.send(),
                        Err((seid, code)) => responder.reject(&seid, code),
                    };
                    {
                        let peer = self.peer.clone();
                        // The only arm that returns a future instead of an immediate result.
                        return Either::Right(async move {
                            if !immediate_suspend.is_empty() {
                                peer.suspend(immediate_suspend.as_slice()).await?;
                            }
                            response_result
                        });
                    }
                }
                Suspend { responder, stream_ids } => {
                    // Streams are suspended in order; the first failure rejects the request.
                    for seid in stream_ids {
                        match self.suspend_local_stream(&seid) {
                            Ok(_remote_id) => {}
                            Err(avdtp::Error::RequestInvalid(code)) => {
                                break 'result responder.reject(&seid, code);
                            }
                            Err(_e) => break 'result responder.reject(&seid, ErrorCode::BadState),
                        }
                    }
                    responder.send()
                }
                Abort { responder, stream_id } => {
                    let Ok(stream) = self.get_mut(&stream_id) else {
                        // Unknown stream: no response is sent and the responder is dropped.
                        // NOTE(review): presumably intentional for abort requests -- confirm.
                        break 'result Ok(());
                    };
                    stream.abort();
                    // Clear `opening` only if it refers to the aborted stream.
                    self.opening = self.opening.take().filter(|local_id| local_id != &stream_id);
                    responder.send()
                }
                DelayReport { responder, delay, stream_id } => {
                    // NOTE(review): presumably `delay` is in tenths of a millisecond (the log
                    // message below formats it as `delay / 10`.`delay % 10` ms), so each unit
                    // is 100000 ns -- confirm.
                    let delay_ns = delay as u64 * 100000;
                    // The metric is logged even if the stream turns out not to exist; a value
                    // that does not fit the metric's integer type is logged as -1.
                    self.metrics.log_integer(
                        bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID,
                        delay_ns.try_into().unwrap_or(-1),
                        vec![],
                    );
                    let Some(stream) = self.local.get_mut(&stream_id) else {
                        break 'result responder.reject(avdtp::ErrorCode::BadAcpSeid);
                    };
                    let delay_str = format!("delay {}.{} ms", delay / 10, delay % 10);
                    let peer = self.peer_id;
                    match stream.set_delay(std::time::Duration::from_nanos(delay_ns)) {
                        Ok(()) => info!(peer:%, stream_id:%; "reported {delay_str}"),
                        Err(avdtp::ErrorCode::BadState) => {
                            info!(peer:%, stream_id:%; "bad state {delay_str}");
                            break 'result responder.reject(avdtp::ErrorCode::BadState);
                        }
                        // Any other failure is logged but the report is still acknowledged.
                        Err(e) => info!(peer:%, stream_id:%, e:?; "failed {delay_str}"),
                    };
                    responder.send()
                }
            }
        };
        Either::Left(immediate_result)
    }
1034}
1035
/// Keeps a started stream's permit alive until the stream finishes.
struct WatchedStream {
    /// Task that waits for the stream to finish and then drops the permit. It is held only
    /// so that it lives as long as this value.
    _permit_task: fasync::Task<()>,
}
1041
1042impl WatchedStream {
1043 fn new(
1044 permit: Option<StreamPermit>,
1045 finish_fut: BoxFuture<'static, Result<(), anyhow::Error>>,
1046 ) -> Self {
1047 let permit_task = fasync::Task::spawn(async move {
1048 let _ = finish_fut.await;
1049 drop(permit);
1050 });
1051 Self { _permit_task: permit_task }
1052 }
1053}
1054
1055fn codectype_to_availability_metric(
1056 codec_type: &MediaCodecType,
1057) -> bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec {
1058 match codec_type {
1059 &MediaCodecType::AUDIO_SBC => {
1060 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc
1061 }
1062 &MediaCodecType::AUDIO_MPEG12 => {
1063 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Mpeg12
1064 }
1065 &MediaCodecType::AUDIO_AAC => {
1066 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Aac
1067 }
1068 &MediaCodecType::AUDIO_ATRAC => {
1069 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac
1070 }
1071 &MediaCodecType::AUDIO_NON_A2DP => {
1072 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::VendorSpecific
1073 }
1074 _ => bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown,
1075 }
1076}
1077
1078fn capability_to_metric(
1079 cap: &ServiceCapability,
1080) -> Option<bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability> {
1081 match cap {
1082 ServiceCapability::DelayReporting => {
1083 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport)
1084 }
1085 ServiceCapability::Reporting => {
1086 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Reporting)
1087 }
1088 ServiceCapability::Recovery { .. } => {
1089 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Recovery)
1090 }
1091 ServiceCapability::ContentProtection { .. } => {
1092 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::ContentProtection)
1093 }
1094 ServiceCapability::HeaderCompression { .. } => {
1095 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::HeaderCompression)
1096 }
1097 ServiceCapability::Multiplexing { .. } => {
1098 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Multiplexing)
1099 }
1100 other => {
1102 trace!("untracked remote peer capability: {:?}", other);
1103 None
1104 }
1105 }
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110 use super::*;
1111
1112 use async_utils::PollExt;
1113 use bt_metrics::respond_to_metrics_req_for_test;
1114 use fidl::endpoints::create_proxy_and_stream;
1115 use fidl_fuchsia_bluetooth::ErrorCode;
1116 use fidl_fuchsia_bluetooth_bredr::{
1117 ProfileMarker, ProfileRequest, ProfileRequestStream, ServiceClassProfileIdentifier,
1118 };
1119 use fidl_fuchsia_metrics::{MetricEvent, MetricEventPayload};
1120 use futures::{SinkExt, StreamExt};
1121 use std::pin::pin;
1122
1123 use crate::media_task::tests::{TestMediaTask, TestMediaTaskBuilder};
1124 use crate::media_types::*;
1125 use crate::stream::tests::{make_sbc_endpoint, sbc_mediacodec_capability};
1126
1127 fn fake_metrics()
1128 -> (bt_metrics::MetricsLogger, fidl_fuchsia_metrics::MetricEventLoggerRequestStream) {
1129 let (c, s) = fidl::endpoints::create_proxy_and_stream::<
1130 fidl_fuchsia_metrics::MetricEventLoggerMarker,
1131 >();
1132 (bt_metrics::MetricsLogger::from_proxy(c), s)
1133 }
1134
1135 fn setup_avdtp_peer() -> (avdtp::Peer, Channel) {
1136 let (remote, signaling) = Channel::create();
1137 let peer = avdtp::Peer::new(signaling);
1138 (peer, remote)
1139 }
1140
1141 fn build_test_streams() -> Streams {
1142 let mut streams = Streams::default();
1143 let source = Stream::build(
1144 make_sbc_endpoint(1, avdtp::EndpointType::Source),
1145 TestMediaTaskBuilder::new_delayable().builder(),
1146 );
1147 streams.insert(source);
1148 let sink = Stream::build(
1149 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
1150 TestMediaTaskBuilder::new().builder(),
1151 );
1152 streams.insert(sink);
1153 streams
1154 }
1155
1156 fn build_test_streams_delayable() -> Streams {
1157 fn with_delay(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
1158 StreamEndpoint::new(
1159 seid,
1160 avdtp::MediaType::Audio,
1161 direction,
1162 vec![
1163 avdtp::ServiceCapability::MediaTransport,
1164 avdtp::ServiceCapability::DelayReporting,
1165 sbc_mediacodec_capability(),
1166 ],
1167 )
1168 .expect("endpoint creation should succeed")
1169 }
1170 let mut streams = Streams::default();
1171 let source = Stream::build(
1172 with_delay(1, avdtp::EndpointType::Source),
1173 TestMediaTaskBuilder::new_delayable().builder(),
1174 );
1175 streams.insert(source);
1176 let sink = Stream::build(
1177 with_delay(2, avdtp::EndpointType::Sink),
1178 TestMediaTaskBuilder::new().builder(),
1179 );
1180 streams.insert(sink);
1181 streams
1182 }
1183
1184 #[track_caller]
1185 pub(crate) fn recv_remote(remote: &mut Channel) -> Result<Vec<u8>, zx::Status> {
1186 let fut = remote.next();
1187 match fut.now_or_never() {
1188 Some(Some(res)) => res,
1189 Some(None) => Err(zx::Status::PEER_CLOSED),
1190 None => Err(zx::Status::SHOULD_WAIT),
1191 }
1192 }
1193
1194 fn setup_test_peer(
1197 use_cobalt: bool,
1198 streams: Streams,
1199 permits: Option<Permits>,
1200 ) -> (
1201 Channel,
1202 ProfileRequestStream,
1203 Option<fidl_fuchsia_metrics::MetricEventLoggerRequestStream>,
1204 Peer,
1205 ) {
1206 let (avdtp, remote) = setup_avdtp_peer();
1207 let (metrics_logger, cobalt_receiver) = if use_cobalt {
1208 let (l, r) = fake_metrics();
1209 (l, Some(r))
1210 } else {
1211 (bt_metrics::MetricsLogger::default(), None)
1212 };
1213 let (profile_proxy, requests) = create_proxy_and_stream::<ProfileMarker>();
1214 let peer = Peer::create(PeerId(1), avdtp, streams, permits, profile_proxy, metrics_logger);
1215
1216 (remote, requests, cobalt_receiver, peer)
1217 }
1218
1219 #[track_caller]
1220 fn expect_send(exec: &mut fasync::TestExecutor, remote: &mut Channel, data: Vec<u8>) {
1221 exec.run_until_stalled(&mut remote.send(data))
1222 .expect("poll is ready")
1223 .expect("write successful");
1224 }
1225
1226 fn expect_get_capabilities_and_respond(
1227 exec: &mut fasync::TestExecutor,
1228 remote: &mut Channel,
1229 expected_seid: u8,
1230 response_capabilities: &[u8],
1231 ) {
1232 let received = recv_remote(remote).unwrap();
1233 assert_eq!(0x00, received[0] & 0xF);
1235 assert_eq!(0x02, received[1]); assert_eq!(expected_seid << 2, received[2]);
1237
1238 let txlabel_raw = received[0] & 0xF0;
1239
1240 #[rustfmt::skip]
1242 let mut get_capabilities_rsp = vec![
1243 txlabel_raw << 4 | 0x2, 0x02 ];
1246
1247 get_capabilities_rsp.extend_from_slice(response_capabilities);
1248
1249 expect_send(exec, remote, get_capabilities_rsp);
1250 }
1251
1252 fn expect_get_all_capabilities_and_respond(
1253 exec: &mut fasync::TestExecutor,
1254 remote: &mut Channel,
1255 expected_seid: u8,
1256 response_capabilities: &[u8],
1257 ) {
1258 let received = recv_remote(remote).unwrap();
1259 assert_eq!(0x00, received[0] & 0xF);
1261 assert_eq!(0x0C, received[1]); assert_eq!(expected_seid << 2, received[2]);
1263
1264 let txlabel_raw = received[0] & 0xF0;
1265
1266 #[rustfmt::skip]
1268 let mut get_capabilities_rsp = vec![
1269 txlabel_raw << 4 | 0x2, 0x0C ];
1272
1273 get_capabilities_rsp.extend_from_slice(response_capabilities);
1274
1275 expect_send(exec, remote, get_capabilities_rsp);
1276 }
1277
1278 #[fuchsia::test]
1279 fn disconnected() {
1280 let mut exec = fasync::TestExecutor::new();
1281 let (proxy, _stream) = create_proxy_and_stream::<ProfileMarker>();
1282 let (remote, signaling) = Channel::create();
1283
1284 let id = PeerId(1);
1285
1286 let avdtp = avdtp::Peer::new(signaling);
1287 let peer = Peer::create(
1288 id,
1289 avdtp,
1290 Streams::default(),
1291 None,
1292 proxy,
1293 bt_metrics::MetricsLogger::default(),
1294 );
1295
1296 let closed_fut = peer.closed();
1297
1298 let mut closed_fut = pin!(closed_fut);
1299
1300 assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
1301
1302 drop(remote);
1304
1305 assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
1306 }
1307
1308 #[fuchsia::test]
1309 fn peer_collect_capabilities_success() {
1310 let mut exec = fasync::TestExecutor::new();
1311
1312 let (mut remote, _, cobalt_receiver, peer) =
1313 setup_test_peer(true, build_test_streams(), None);
1314
1315 let p: ProfileDescriptor = ProfileDescriptor {
1316 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1317 major_version: Some(1),
1318 minor_version: Some(2),
1319 ..Default::default()
1320 };
1321 let _ = peer.set_descriptor(p);
1322
1323 let collect_future = peer.collect_capabilities();
1324 let mut collect_future = pin!(collect_future);
1325
1326 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1327
1328 let received = recv_remote(&mut remote).unwrap();
1330 assert_eq!(0x00, received[0] & 0xF);
1332 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1335
1336 let response: &[u8] = &[
1338 txlabel_raw << 4 | 0x0 << 2 | 0x2, 0x01, 0x3E << 2 | 0x0 << 1, 0x00 << 4 | 0x1 << 3, 0x01 << 2 | 0x1 << 1, 0x00 << 4 | 0x1 << 3, ];
1345 expect_send(&mut exec, &mut remote, response.to_vec());
1346
1347 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1348
1349 #[rustfmt::skip]
1351 let capabilities_rsp = &[
1352 0x01, 0x00,
1354 0x07, 0x06, 0x00, 0x04, 0xF0, 0x9F, 0x92, 0x96
1356 ];
1357 expect_get_capabilities_and_respond(&mut exec, &mut remote, 0x3E, capabilities_rsp);
1358
1359 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1360
1361 #[rustfmt::skip]
1363 let capabilities_rsp = &[
1364 0x01, 0x00,
1366 0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1368 ];
1369 expect_get_capabilities_and_respond(&mut exec, &mut remote, 0x01, capabilities_rsp);
1370
1371 match exec.run_until_stalled(&mut collect_future) {
1372 Poll::Pending => panic!("collect capabilities should be complete"),
1373 Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1374 Poll::Ready(Ok(endpoints)) => {
1375 let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1376 let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1377 for stream in endpoints {
1378 if stream.local_id() == &first_seid {
1379 let expected_caps = vec![
1380 ServiceCapability::MediaTransport,
1381 ServiceCapability::MediaCodec {
1382 media_type: avdtp::MediaType::Audio,
1383 codec_type: avdtp::MediaCodecType::new(0x04),
1384 codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1385 },
1386 ];
1387 assert_eq!(&expected_caps, stream.capabilities());
1388 } else if stream.local_id() == &second_seid {
1389 let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1390 assert_eq!(Some(&expected_codec_type), stream.codec_type());
1391 } else {
1392 panic!("Unexpected endpoint in the streams collected");
1393 }
1394 }
1395 }
1396 }
1397
1398 let mut recv = cobalt_receiver.expect("should have receiver");
1400 let mut log_events = Vec::new();
1401 while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1402 log_events.push(respond_to_metrics_req_for_test(req));
1403 }
1404
1405 assert_eq!(3, log_events.len());
1407 assert!(log_events.contains(&MetricEvent {
1408 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1409 event_codes: vec![
1410 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1411 ],
1412 payload: MetricEventPayload::Count(1),
1413 }));
1414 assert!(log_events.contains(&MetricEvent {
1415 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1416 event_codes: vec![
1417 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac as u32
1418 ],
1419 payload: MetricEventPayload::Count(1),
1420 }));
1421 assert!(log_events.contains(&MetricEvent {
1422 metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1423 event_codes: vec![
1424 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1425 ],
1426 payload: MetricEventPayload::Count(1),
1427 }));
1428
1429 let collect_future = peer.collect_capabilities();
1431 let mut collect_future = pin!(collect_future);
1432
1433 match exec.run_until_stalled(&mut collect_future) {
1434 Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1435 x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1436 };
1437 }
1438
1439 #[fuchsia::test]
1440 fn peer_collect_all_capabilities_success() {
1441 let mut exec = fasync::TestExecutor::new();
1442
1443 let (mut remote, _, cobalt_receiver, peer) =
1444 setup_test_peer(true, build_test_streams(), None);
1445 let p: ProfileDescriptor = ProfileDescriptor {
1446 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1447 major_version: Some(1),
1448 minor_version: Some(3),
1449 ..Default::default()
1450 };
1451 let _ = peer.set_descriptor(p);
1452
1453 let collect_future = peer.collect_capabilities();
1454 let mut collect_future = pin!(collect_future);
1455
1456 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1457
1458 let received = recv_remote(&mut remote).unwrap();
1460 assert_eq!(0x00, received[0] & 0xF);
1462 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1465
1466 let response: &[u8] = &[
1468 txlabel_raw << 4 | 0x0 << 2 | 0x2, 0x01, 0x3E << 2 | 0x0 << 1, 0x00 << 4 | 0x1 << 3, 0x01 << 2 | 0x1 << 1, 0x00 << 4 | 0x1 << 3, ];
1475 expect_send(&mut exec, &mut remote, response.to_vec());
1476
1477 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1478
1479 #[rustfmt::skip]
1481 let capabilities_rsp = &[
1482 0x01, 0x00,
1484 0x07, 0x06, 0x00, 0x40, 0xF0, 0x9F, 0x92, 0x96,
1486 0x08, 0x00
1488 ];
1489 expect_get_all_capabilities_and_respond(&mut exec, &mut remote, 0x3E, capabilities_rsp);
1490
1491 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1492
1493 #[rustfmt::skip]
1495 let capabilities_rsp = &[
1496 0x01, 0x00,
1498 0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1500 ];
1501 expect_get_all_capabilities_and_respond(&mut exec, &mut remote, 0x01, capabilities_rsp);
1502
1503 match exec.run_until_stalled(&mut collect_future) {
1504 Poll::Pending => panic!("collect capabilities should be complete"),
1505 Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1506 Poll::Ready(Ok(endpoints)) => {
1507 let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1508 let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1509 for stream in endpoints {
1510 if stream.local_id() == &first_seid {
1511 let expected_caps = vec![
1512 ServiceCapability::MediaTransport,
1513 ServiceCapability::MediaCodec {
1514 media_type: avdtp::MediaType::Audio,
1515 codec_type: avdtp::MediaCodecType::new(0x40),
1516 codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1517 },
1518 ServiceCapability::DelayReporting,
1519 ];
1520 assert_eq!(&expected_caps, stream.capabilities());
1521 } else if stream.local_id() == &second_seid {
1522 let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1523 assert_eq!(Some(&expected_codec_type), stream.codec_type());
1524 } else {
1525 panic!("Unexpected endpoint in the streams collected");
1526 }
1527 }
1528 }
1529 }
1530
1531 let mut recv = cobalt_receiver.expect("should have receiver");
1533 let mut log_events = Vec::new();
1534 while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1535 log_events.push(respond_to_metrics_req_for_test(req));
1536 }
1537
1538 assert_eq!(4, log_events.len());
1540 assert!(log_events.contains(&MetricEvent {
1541 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1542 event_codes: vec![
1543 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown as u32
1544 ],
1545 payload: MetricEventPayload::Count(1),
1546 }));
1547 assert!(log_events.contains(&MetricEvent {
1548 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1549 event_codes: vec![
1550 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1551 ],
1552 payload: MetricEventPayload::Count(1),
1553 }));
1554 assert!(log_events.contains(&MetricEvent {
1555 metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1556 event_codes: vec![
1557 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1558 ],
1559 payload: MetricEventPayload::Count(1),
1560 }));
1561 assert!(log_events.contains(&MetricEvent {
1562 metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1563 event_codes: vec![
1564 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport as u32
1565 ],
1566 payload: MetricEventPayload::Count(1),
1567 }));
1568
1569 let collect_future = peer.collect_capabilities();
1571 let mut collect_future = pin!(collect_future);
1572
1573 match exec.run_until_stalled(&mut collect_future) {
1574 Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1575 x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1576 };
1577 }
1578
1579 #[fuchsia::test]
1580 fn peer_collect_capabilities_discovery_fails() {
1581 let mut exec = fasync::TestExecutor::new();
1582
1583 let (mut remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1584
1585 let collect_future = peer.collect_capabilities();
1586 let mut collect_future = pin!(collect_future);
1587
1588 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1590
1591 let received = recv_remote(&mut remote).unwrap();
1593 assert_eq!(0x00, received[0] & 0xF);
1595 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1598
1599 let response: &[u8] = &[
1601 txlabel_raw | 0x0 << 2 | 0x3, 0x01, 0x31, ];
1605 expect_send(&mut exec, &mut remote, response.to_vec());
1606
1607 match exec.run_until_stalled(&mut collect_future) {
1610 Poll::Pending => panic!("Should be ready after discovery failure"),
1611 Poll::Ready(Ok(x)) => panic!("Should be an error but returned {x:?}"),
1612 Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
1613 assert_eq!(Some(Ok(avdtp::ErrorCode::BadState)), e.error_code());
1614 }
1615 Poll::Ready(Err(e)) => panic!("Should have been a RemoteRejected was was {e:?}"),
1616 }
1617 }
1618
1619 #[fuchsia::test]
1620 fn peer_collect_capabilities_get_capability_fails() {
1621 let mut exec = fasync::TestExecutor::new();
1622
1623 let (mut remote, _, _, peer) = setup_test_peer(true, build_test_streams(), None);
1624
1625 let collect_future = peer.collect_capabilities();
1626 let mut collect_future = pin!(collect_future);
1627
1628 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1630
1631 let received = recv_remote(&mut remote).unwrap();
1633 assert_eq!(0x00, received[0] & 0xF);
1635 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1638
1639 let response: &[u8] = &[
1641 txlabel_raw << 4 | 0x0 << 2 | 0x2, 0x01, 0x3E << 2 | 0x0 << 1, 0x00 << 4 | 0x1 << 3, 0x01 << 2 | 0x1 << 1, 0x00 << 4 | 0x1 << 3, ];
1648 expect_send(&mut exec, &mut remote, response.to_vec());
1649
1650 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1651
1652 let expected_seid = 0x3E;
1654 let received = recv_remote(&mut remote).unwrap();
1655 assert_eq!(0x00, received[0] & 0xF);
1657 assert_eq!(0x02, received[1]); assert_eq!(expected_seid << 2, received[2]);
1659
1660 let txlabel_raw = received[0] & 0xF0;
1661
1662 let response: &[u8] = &[
1663 txlabel_raw | 0x0 << 2 | 0x3, 0x02, 0x12, ];
1667 expect_send(&mut exec, &mut remote, response.to_vec());
1668
1669 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1670
1671 let expected_seid = 0x01;
1673 let received = recv_remote(&mut remote).unwrap();
1674 assert_eq!(0x00, received[0] & 0xF);
1676 assert_eq!(0x02, received[1]); assert_eq!(expected_seid << 2, received[2]);
1678
1679 let txlabel_raw = received[0] & 0xF0;
1680
1681 let response: &[u8] = &[
1682 txlabel_raw | 0x0 << 2 | 0x3, 0x02, 0x12, ];
1686 expect_send(&mut exec, &mut remote, response.to_vec());
1687
1688 match exec.run_until_stalled(&mut collect_future) {
1690 Poll::Pending => panic!("Should be ready after discovery failure"),
1691 Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1692 Poll::Ready(Ok(map)) => assert_eq!(0, map.len()),
1693 }
1694 }
1695
1696 fn receive_simple_accept(exec: &mut fasync::TestExecutor, remote: &mut Channel, signal_id: u8) {
1697 let received = recv_remote(remote).expect("expected a packet");
1698 assert_eq!(0x00, received[0] & 0xF);
1700 assert_eq!(signal_id, received[1]);
1701
1702 let txlabel_raw = received[0] & 0xF0;
1703
1704 let response: &[u8] = &[
1705 txlabel_raw | 0x0 << 2 | 0x2, signal_id,
1707 ];
1708 expect_send(exec, remote, response.to_vec());
1709 }
1710
1711 #[fuchsia::test]
1712 fn peer_stream_start_success() {
1713 let mut exec = fasync::TestExecutor::new();
1714
1715 let (mut remote, mut profile_request_stream, _, peer) =
1716 setup_test_peer(false, build_test_streams(), None);
1717
1718 let remote_seid = 2_u8.try_into().unwrap();
1719
1720 let codec_params = ServiceCapability::MediaCodec {
1721 media_type: avdtp::MediaType::Audio,
1722 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1723 codec_extra: vec![0x11, 0x45, 51, 51],
1724 };
1725
1726 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1727 let mut start_future = pin!(start_future);
1728
1729 match exec.run_until_stalled(&mut start_future) {
1730 Poll::Pending => {}
1731 x => panic!("Expected pending, but got {x:?}"),
1732 };
1733
1734 receive_simple_accept(&mut exec, &mut remote, 0x03); assert!(exec.run_until_stalled(&mut start_future).is_pending());
1737
1738 receive_simple_accept(&mut exec, &mut remote, 0x06); match exec.run_until_stalled(&mut start_future) {
1741 Poll::Pending => {}
1742 Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1743 Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1744 };
1745
1746 let (_, transport) = Channel::create();
1748
1749 let request = exec.run_until_stalled(&mut profile_request_stream.next());
1750 match request {
1751 Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
1752 assert_eq!(PeerId(1), peer_id.into());
1753 assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
1754 let channel = transport.try_into().unwrap();
1755 responder.send(Ok(channel)).expect("responder sends");
1756 }
1757 x => panic!("Should have sent a open l2cap request, but got {:?}", x),
1758 };
1759
1760 match exec.run_until_stalled(&mut start_future) {
1761 Poll::Pending => {}
1762 Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1763 Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1764 };
1765
1766 receive_simple_accept(&mut exec, &mut remote, 0x07); match exec.run_until_stalled(&mut start_future) {
1771 Poll::Pending => panic!("Should be ready after start succeeds"),
1772 Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1773 Poll::Ready(Ok(())) => {
1775 assert!(peer.is_streaming_now());
1776 }
1777 }
1778 }
1779
1780 #[fuchsia::test]
1781 fn peer_stream_start_picks_correct_direction() {
1782 let mut exec = fasync::TestExecutor::new();
1783
1784 let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1785 let remote = avdtp::Peer::new(remote);
1786 let mut remote_events = remote.take_request_stream();
1787
1788 fn remote_handle_request(req: avdtp::Request) {
1790 let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1791 let res = match req {
1792 avdtp::Request::Discover { responder } => {
1793 let infos = [avdtp::StreamInformation::new(
1794 expected_stream_id,
1795 false,
1796 avdtp::MediaType::Audio,
1797 avdtp::EndpointType::Source,
1798 )];
1799 responder.send(&infos)
1800 }
1801 avdtp::Request::GetAllCapabilities { stream_id, responder }
1802 | avdtp::Request::GetCapabilities { stream_id, responder } => {
1803 assert_eq!(expected_stream_id, stream_id);
1804 let caps = vec![
1805 ServiceCapability::MediaTransport,
1806 ServiceCapability::MediaCodec {
1807 media_type: avdtp::MediaType::Audio,
1808 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1809 codec_extra: vec![0x11, 0x45, 51, 250],
1810 },
1811 ];
1812 responder.send(&caps[..])
1813 }
1814 avdtp::Request::Open { responder, stream_id } => {
1815 assert_eq!(expected_stream_id, stream_id);
1816 responder.send()
1817 }
1818 avdtp::Request::SetConfiguration {
1819 responder,
1820 local_stream_id,
1821 remote_stream_id,
1822 ..
1823 } => {
1824 assert_eq!(local_stream_id, expected_stream_id);
1825 assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1827 responder.send()
1828 }
1829 x => panic!("Unexpected request: {:?}", x),
1830 };
1831 res.expect("should be able to respond");
1832 }
1833
1834 let collect_capabilities_fut = peer.collect_capabilities();
1836 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1837
1838 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1839
1840 let request = exec.run_singlethreaded(&mut remote_events.next());
1841 remote_handle_request(request.expect("should have a discovery request").unwrap());
1842
1843 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1844 let request = exec.run_singlethreaded(&mut remote_events.next());
1845 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1846
1847 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1848
1849 let remote_seid = 4_u8.try_into().unwrap();
1851
1852 let codec_params = ServiceCapability::MediaCodec {
1853 media_type: avdtp::MediaType::Audio,
1854 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1855 codec_extra: vec![0x11, 0x45, 51, 51],
1856 };
1857 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1858 let mut start_future = pin!(start_future);
1859
1860 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1861 let request = exec.run_singlethreaded(&mut remote_events.next());
1862 remote_handle_request(request.expect("should have a set_capabilities request").unwrap());
1863
1864 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1865 let request = exec.run_singlethreaded(&mut remote_events.next());
1866 remote_handle_request(request.expect("should have an open request").unwrap());
1867 }
1868
1869 #[fuchsia::test]
1870 fn peer_stream_start_strips_unsupported_local_capabilities() {
1871 let mut exec = fasync::TestExecutor::new();
1872
1873 let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1874 let remote = avdtp::Peer::new(remote);
1875 let mut remote_events = remote.take_request_stream();
1876
1877 fn remote_handle_request(req: avdtp::Request) {
1879 let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1880 let res = match req {
1881 avdtp::Request::Discover { responder } => {
1882 let infos = [avdtp::StreamInformation::new(
1883 expected_stream_id,
1884 false,
1885 avdtp::MediaType::Audio,
1886 avdtp::EndpointType::Source,
1887 )];
1888 responder.send(&infos)
1889 }
1890 avdtp::Request::GetAllCapabilities { stream_id, responder }
1891 | avdtp::Request::GetCapabilities { stream_id, responder } => {
1892 assert_eq!(expected_stream_id, stream_id);
1893 let caps = vec![
1894 ServiceCapability::MediaTransport,
1895 ServiceCapability::DelayReporting,
1897 ServiceCapability::MediaCodec {
1898 media_type: avdtp::MediaType::Audio,
1899 codec_type: avdtp::MediaCodecType::AUDIO_AAC,
1900 codec_extra: vec![128, 0, 132, 134, 0, 0],
1901 },
1902 ];
1903 responder.send(&caps[..])
1904 }
1905 avdtp::Request::Open { responder, stream_id } => {
1906 assert_eq!(expected_stream_id, stream_id);
1907 responder.send()
1908 }
1909 avdtp::Request::SetConfiguration {
1910 responder,
1911 local_stream_id,
1912 remote_stream_id,
1913 capabilities,
1914 } => {
1915 assert_eq!(local_stream_id, expected_stream_id);
1916 assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1918 assert!(!capabilities.contains(&ServiceCapability::DelayReporting));
1921 responder.send()
1922 }
1923 x => panic!("Unexpected request: {:?}", x),
1924 };
1925 res.expect("should be able to respond");
1926 }
1927
1928 let collect_capabilities_fut = peer.collect_capabilities();
1930 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1931
1932 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1933
1934 let request = exec.run_singlethreaded(&mut remote_events.next());
1935 remote_handle_request(request.expect("should have a discovery request").unwrap());
1936
1937 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1938 let request = exec.run_singlethreaded(&mut remote_events.next());
1939 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1940
1941 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1942
1943 let remote_seid = 4_u8.try_into().unwrap();
1945
1946 let codec_params = ServiceCapability::MediaCodec {
1947 media_type: avdtp::MediaType::Audio,
1948 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1949 codec_extra: vec![0x11, 0x45, 51, 51],
1950 };
1951 let start_future =
1952 peer.stream_start(remote_seid, vec![codec_params, ServiceCapability::DelayReporting]);
1953 let mut start_future = pin!(start_future);
1954
1955 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1956 let request = exec.run_singlethreaded(&mut remote_events.next());
1957 remote_handle_request(request.expect("should have a set_configuration request").unwrap());
1958
1959 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1960 let request = exec.run_singlethreaded(&mut remote_events.next());
1961 remote_handle_request(request.expect("should have an open request").unwrap());
1962 }
1963
1964 #[fuchsia::test]
1965 fn peer_stream_start_orders_local_capabilities() {
1966 let mut exec = fasync::TestExecutor::new();
1967
1968 let (remote, _, _, peer) = setup_test_peer(false, build_test_streams_delayable(), None);
1969 let remote = avdtp::Peer::new(remote);
1970 let mut remote_events = remote.take_request_stream();
1971
1972 fn remote_handle_request(req: avdtp::Request) {
1974 let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1975 let res = match req {
1976 avdtp::Request::Discover { responder } => {
1977 let infos = [avdtp::StreamInformation::new(
1978 expected_stream_id,
1979 false,
1980 avdtp::MediaType::Audio,
1981 avdtp::EndpointType::Source,
1982 )];
1983 responder.send(&infos)
1984 }
1985 avdtp::Request::GetAllCapabilities { stream_id, responder }
1986 | avdtp::Request::GetCapabilities { stream_id, responder } => {
1987 assert_eq!(expected_stream_id, stream_id);
1988 let caps = &[
1989 ServiceCapability::MediaTransport,
1990 ServiceCapability::MediaCodec {
1991 media_type: avdtp::MediaType::Audio,
1992 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1993 codec_extra: vec![0x11, 0x45, 51, 250],
1994 },
1995 ServiceCapability::DelayReporting,
1996 ];
1997 responder.send(caps)
1998 }
1999 avdtp::Request::Open { responder, stream_id } => {
2000 assert_eq!(expected_stream_id, stream_id);
2001 responder.send()
2002 }
2003 avdtp::Request::SetConfiguration {
2004 responder,
2005 local_stream_id,
2006 remote_stream_id,
2007 capabilities,
2008 } => {
2009 assert_eq!(local_stream_id, expected_stream_id);
2010 assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
2012 let mut capabilities_ordered = capabilities.clone();
2014 capabilities_ordered.sort_by_key(ServiceCapability::category);
2015 assert_eq!(capabilities, capabilities_ordered);
2016 responder.send()
2017 }
2018 x => panic!("Unexpected request: {:?}", x),
2019 };
2020 res.expect("should be able to respond");
2021 }
2022
2023 let collect_capabilities_fut = peer.collect_capabilities();
2025 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2026
2027 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2028
2029 let request = exec.run_singlethreaded(&mut remote_events.next());
2030 remote_handle_request(request.expect("should have a discovery request").unwrap());
2031
2032 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2033 let request = exec.run_singlethreaded(&mut remote_events.next());
2034 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2035
2036 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2037
2038 let remote_seid = 4_u8.try_into().unwrap();
2040
2041 let codec_params = ServiceCapability::MediaCodec {
2042 media_type: avdtp::MediaType::Audio,
2043 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2044 codec_extra: vec![0x11, 0x45, 51, 51],
2045 };
2046 let start_future = peer.stream_start(
2047 remote_seid,
2048 vec![
2049 ServiceCapability::MediaTransport,
2050 ServiceCapability::DelayReporting,
2051 codec_params,
2052 ],
2053 );
2054 let mut start_future = pin!(start_future);
2055
2056 assert!(exec.run_until_stalled(&mut start_future).is_pending());
2057 let request = exec.run_singlethreaded(&mut remote_events.next());
2058 remote_handle_request(request.expect("should have a set_configuration request").unwrap());
2059
2060 assert!(exec.run_until_stalled(&mut start_future).is_pending());
2061 let request = exec.run_singlethreaded(&mut remote_events.next());
2062 remote_handle_request(request.expect("should have an open request").unwrap());
2063 }
2064
    /// Starts a stream with a single shared permit, seizes that permit once the media transport
    /// has been connected, and checks that `stream_start` still resolves to `Ok` while the peer
    /// never reports that it is streaming.
    #[fuchsia::test]
    fn peer_stream_start_permit_revoked() {
        let mut exec = fasync::TestExecutor::new();

        // Only one permit exists; a clone of the pool is handed to the peer under test.
        let test_permits = Permits::new(1);
        let (mut remote, mut profile_request_stream, _, peer) =
            setup_test_peer(false, build_test_streams(), Some(test_permits.clone()));

        let remote_seid = 2_u8.try_into().unwrap();

        let codec_params = ServiceCapability::MediaCodec {
            media_type: avdtp::MediaType::Audio,
            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
            codec_extra: vec![0x11, 0x45, 51, 51],
        };

        let start_future = peer.stream_start(remote_seid, vec![codec_params]);
        let mut start_future = pin!(start_future);

        let _ = exec
            .run_until_stalled(&mut start_future)
            .expect_pending("waiting for set config response");
        // NOTE(review): `receive_simple_accept` is defined elsewhere; presumably it reads the
        // request with the given signal id from `remote` and accepts it. 0x03 lines up with the
        // "set config" wait above.
        receive_simple_accept(&mut exec, &mut remote, 0x03);
        exec.run_until_stalled(&mut start_future).expect_pending("waiting for open response");
        // 0x06 lines up with the "open" wait above.
        receive_simple_accept(&mut exec, &mut remote, 0x06);
        exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
        assert!(!peer.is_streaming_now());

        // The other end of the channel is discarded right away; only `transport` is handed back
        // through the profile responder below.
        let (_, transport) = Channel::create();

        // The peer is expected to have asked the profile for an L2CAP connection with the
        // transport channel parameters.
        let request = exec.run_until_stalled(&mut profile_request_stream.next());
        match request {
            Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
                assert_eq!(PeerId(1), peer_id.into());
                assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
                let channel = transport.try_into().unwrap();
                responder.send(Ok(channel)).expect("responder sends");
            }
            x => panic!("Should have sent a open l2cap request, but got {:?}", x),
        };

        exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
        assert!(!peer.is_streaming_now());

        // Take every permit away from the pool while the start is still in progress.
        let seized_permits = test_permits.seize();
        assert_eq!(seized_permits.len(), 1);
        // 0x07 is accepted next; afterwards the start future is still pending because it has a
        // suspend to send (per the expectation message).
        receive_simple_accept(&mut exec, &mut remote, 0x07);
        exec.run_until_stalled(&mut start_future)
            .expect_pending("waiting to send outgoing suspend");
        assert!(!peer.is_streaming_now());
        // Once 0x09 is accepted the start future completes successfully.
        receive_simple_accept(&mut exec, &mut remote, 0x09);
        let () = exec
            .run_until_stalled(&mut start_future)
            .expect("start finished")
            .expect("suspended stream is ok");
        assert!(!peer.is_streaming_now());
    }
2133
2134 #[fuchsia::test]
2135 fn peer_stream_start_fails_wrong_direction() {
2136 let mut exec = fasync::TestExecutor::new();
2137
2138 let mut streams = Streams::default();
2140 let source = Stream::build(
2141 make_sbc_endpoint(1, avdtp::EndpointType::Source),
2142 TestMediaTaskBuilder::new().builder(),
2143 );
2144 streams.insert(source);
2145
2146 let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2147 let remote = avdtp::Peer::new(remote);
2148 let mut remote_events = remote.take_request_stream();
2149
2150 fn remote_handle_request(req: avdtp::Request) {
2152 let expected_stream_id: StreamEndpointId = 2_u8.try_into().unwrap();
2153 let res = match req {
2154 avdtp::Request::Discover { responder } => {
2155 let infos = [avdtp::StreamInformation::new(
2156 expected_stream_id,
2157 false,
2158 avdtp::MediaType::Audio,
2159 avdtp::EndpointType::Source,
2160 )];
2161 responder.send(&infos)
2162 }
2163 avdtp::Request::GetAllCapabilities { stream_id, responder }
2164 | avdtp::Request::GetCapabilities { stream_id, responder } => {
2165 assert_eq!(expected_stream_id, stream_id);
2166 let caps = vec![
2167 ServiceCapability::MediaTransport,
2168 ServiceCapability::MediaCodec {
2169 media_type: avdtp::MediaType::Audio,
2170 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2171 codec_extra: vec![0x11, 0x45, 51, 250],
2172 },
2173 ];
2174 responder.send(&caps[..])
2175 }
2176 avdtp::Request::Open { responder, .. } => responder.send(),
2177 avdtp::Request::SetConfiguration { responder, .. } => responder.send(),
2178 x => panic!("Unexpected request: {:?}", x),
2179 };
2180 res.expect("should be able to respond");
2181 }
2182
2183 let collect_capabilities_fut = peer.collect_capabilities();
2185 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2186
2187 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2188
2189 let request = exec.run_singlethreaded(&mut remote_events.next());
2190 remote_handle_request(request.expect("should have a discovery request").unwrap());
2191
2192 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2193 let request = exec.run_singlethreaded(&mut remote_events.next());
2194 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2195
2196 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2197
2198 let remote_seid = 2_u8.try_into().unwrap();
2200
2201 let codec_params = ServiceCapability::MediaCodec {
2202 media_type: avdtp::MediaType::Audio,
2203 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2204 codec_extra: vec![0x11, 0x45, 51, 51],
2205 };
2206 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2207 let mut start_future = pin!(start_future);
2208
2209 match exec.run_until_stalled(&mut start_future) {
2210 Poll::Ready(Err(avdtp::Error::OutOfRange)) => {}
2211 x => panic!("Expected a ready OutOfRange error but got {:?}", x),
2212 };
2213 }
2214
2215 #[fuchsia::test]
2216 fn peer_stream_start_fails_to_connect() {
2217 let mut exec = fasync::TestExecutor::new();
2218
2219 let (mut remote, mut profile_request_stream, _, peer) =
2220 setup_test_peer(false, build_test_streams(), None);
2221
2222 let remote_seid = 2_u8.try_into().unwrap();
2223
2224 let codec_params = ServiceCapability::MediaCodec {
2225 media_type: avdtp::MediaType::Audio,
2226 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2227 codec_extra: vec![0x11, 0x45, 51, 51],
2228 };
2229
2230 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2231 let mut start_future = pin!(start_future);
2232
2233 match exec.run_until_stalled(&mut start_future) {
2234 Poll::Pending => {}
2235 x => panic!("was expecting pending but got {x:?}"),
2236 };
2237
2238 receive_simple_accept(&mut exec, &mut remote, 0x03); assert!(exec.run_until_stalled(&mut start_future).is_pending());
2241
2242 receive_simple_accept(&mut exec, &mut remote, 0x06); match exec.run_until_stalled(&mut start_future) {
2245 Poll::Pending => {}
2246 Poll::Ready(x) => panic!("Expected to be pending but {x:?}"),
2247 };
2248
2249 let request = exec.run_until_stalled(&mut profile_request_stream.next());
2251 match request {
2252 Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, responder, .. }))) => {
2253 assert_eq!(PeerId(1), peer_id.into());
2254 responder.send(Err(ErrorCode::Failed)).expect("responder sends");
2255 }
2256 x => panic!("Should have sent a open l2cap request, but got {:?}", x),
2257 };
2258
2259 match exec.run_until_stalled(&mut start_future) {
2262 Poll::Pending => panic!("Should be ready after start fails"),
2263 Poll::Ready(Ok(_stream)) => panic!("Shouldn't have succeeded stream here"),
2264 Poll::Ready(Err(_)) => {}
2265 }
2266 }
2267
    /// Exercises delay reports sent by the remote peer at different points of stream setup and
    /// checks that the accepted reports are logged as metrics.
    #[fuchsia::test]
    async fn peer_delay_report() {
        let (remote, _profile_requests, cobalt_recv, peer) =
            setup_test_peer(true, build_test_streams(), None);
        let remote_peer = avdtp::Peer::new(remote);
        let mut remote_events = remote_peer.take_request_stream();

        // Remote-side handler: answers one request from the peer under test and, along the way,
        // sends delay reports back through `peer` (the remote's own `avdtp::Peer`).
        async fn remote_handle_request(req: avdtp::Request, peer: &avdtp::Peer) {
            // Id of the endpoint the remote advertises.
            let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
            // Id the peer under test uses for its own endpoint in SetConfiguration.
            let expected_peer_stream_id: StreamEndpointId = 1_u8.try_into().unwrap();
            use avdtp::Request::*;
            match req {
                Discover { responder } => {
                    let infos = [avdtp::StreamInformation::new(
                        expected_stream_id,
                        false,
                        avdtp::MediaType::Audio,
                        avdtp::EndpointType::Sink,
                    )];
                    responder.send(&infos).expect("response should succeed");
                }
                GetAllCapabilities { stream_id, responder }
                | GetCapabilities { stream_id, responder } => {
                    assert_eq!(expected_stream_id, stream_id);
                    let caps = vec![
                        ServiceCapability::MediaTransport,
                        ServiceCapability::MediaCodec {
                            media_type: avdtp::MediaType::Audio,
                            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
                            codec_extra: vec![0x11, 0x45, 51, 250],
                        },
                    ];
                    responder.send(&caps[..]).expect("response should succeed");
                    // At this point (before any SetConfiguration) a delay report is refused.
                    assert!(peer.delay_report(&expected_peer_stream_id, 0xc0de).await.is_err());
                }
                Open { responder, stream_id } => {
                    // A report addressed with the remote's own endpoint id is refused...
                    assert!(peer.delay_report(&expected_stream_id, 0xc0de).await.is_err());
                    // ...while one addressed to the peer's endpoint id is acknowledged.
                    peer.delay_report(&expected_peer_stream_id, 0xc0de)
                        .await
                        .expect("should get acked correctly");
                    assert_eq!(expected_stream_id, stream_id);
                    responder.send().expect("response should succeed");
                }
                SetConfiguration { responder, local_stream_id, remote_stream_id, .. } => {
                    assert_eq!(local_stream_id, expected_stream_id);
                    assert_eq!(remote_stream_id, expected_peer_stream_id);
                    responder.send().expect("should send back response without issue");
                }
                x => panic!("Unexpected request: {:?}", x),
            };
        }

        let collect_fut = pin!(peer.collect_capabilities());

        // Drive capability collection: each round, a remote request must arrive before the
        // collect future completes, and the still-pending collect future is carried forward.
        let Either::Left((request, collect_fut)) =
            futures::future::select(remote_events.next(), collect_fut).await
        else {
            panic!("Collect future shouldn't finish first");
        };
        let collect_fut = pin!(collect_fut);
        remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
        let Either::Left((request, collect_fut)) =
            futures::future::select(remote_events.next(), collect_fut).await
        else {
            panic!("Collect future shouldn't finish first");
        };
        remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;

        assert_eq!(1, collect_fut.await.expect("should get the remote endpoints back").len());

        let remote_seid = 4_u8.try_into().unwrap();

        let codec_params = ServiceCapability::MediaCodec {
            media_type: avdtp::MediaType::Audio,
            codec_type: avdtp::MediaCodecType::AUDIO_SBC,
            codec_extra: vec![0x11, 0x45, 51, 51],
        };

        // The start is run in a background task and is never expected to complete within this
        // test; if it does, the task panics.
        let _start_task = fasync::Task::spawn(async move {
            let _ = peer.stream_start(remote_seid, vec![codec_params]).await;
            panic!("stream start task finished");
        });

        let request = remote_events.next().await.expect("should have set_config").unwrap();
        remote_handle_request(request, &remote_peer).await;

        let request = remote_events.next().await.expect("should have open").unwrap();
        remote_handle_request(request, &remote_peer).await;

        let mut cobalt = cobalt_recv.expect("should have receiver");

        // Tally metric events by id until three distinct metric ids have been seen and the
        // delay-report metric has been seen three times.
        let mut got_ids = HashMap::new();
        let delay_metric_id = bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID;
        while got_ids.len() < 3 || *got_ids.get(&delay_metric_id).unwrap_or(&0) < 3 {
            let report = respond_to_metrics_req_for_test(cobalt.next().await.unwrap().unwrap());
            let _ = got_ids.entry(report.metric_id).and_modify(|x| *x += 1).or_insert(1);
            if report.metric_id == delay_metric_id {
                // The logged value is the reported delay (0xc0de) scaled by 100000; presumably
                // this converts the report's unit into nanoseconds, as the metric name suggests.
                assert_eq!(MetricEventPayload::IntegerValue(0xc0de * 100000), report.payload);
            }
        }
        assert!(got_ids.contains_key(&bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID));
        assert!(got_ids.contains_key(&bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID));
        assert!(got_ids.contains_key(&delay_metric_id));
        assert_eq!(got_ids.get(&delay_metric_id).cloned(), Some(3));
    }
2388
2389 fn sbc_capabilities() -> Vec<ServiceCapability> {
2390 let sbc_codec_info = SbcCodecInfo::new(
2391 SbcSamplingFrequency::FREQ48000HZ,
2392 SbcChannelMode::JOINT_STEREO,
2393 SbcBlockCount::SIXTEEN,
2394 SbcSubBands::EIGHT,
2395 SbcAllocation::LOUDNESS,
2396 53,
2397 53,
2398 )
2399 .expect("sbc codec info");
2400
2401 vec![avdtp::ServiceCapability::MediaTransport, sbc_codec_info.into()]
2402 }
2403
2404 #[fuchsia::test]
2406 fn peer_as_acceptor() {
2407 let mut exec = fasync::TestExecutor::new();
2408
2409 let mut streams = Streams::default();
2410 let mut test_builder = TestMediaTaskBuilder::new();
2411 streams.insert(Stream::build(
2412 make_sbc_endpoint(1, avdtp::EndpointType::Source),
2413 test_builder.builder(),
2414 ));
2415
2416 let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2417 let remote_peer = avdtp::Peer::new(remote);
2418
2419 let discover_fut = remote_peer.discover();
2420 let mut discover_fut = pin!(discover_fut);
2421
2422 let expected = vec![make_sbc_endpoint(1, avdtp::EndpointType::Source).information()];
2423 match exec.run_until_stalled(&mut discover_fut) {
2424 Poll::Ready(Ok(res)) => assert_eq!(res, expected),
2425 x => panic!("Expected discovery to complete and got {:?}", x),
2426 };
2427
2428 let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2429 let unknown_endpoint_id = 2_u8.try_into().expect("should be able to get sbc endpointid");
2430
2431 let get_caps_fut = remote_peer.get_capabilities(&sbc_endpoint_id);
2432 let mut get_caps_fut = pin!(get_caps_fut);
2433
2434 match exec.run_until_stalled(&mut get_caps_fut) {
2435 Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2437 x => panic!("Get capabilities should be ready but got {:?}", x),
2438 };
2439
2440 let get_caps_fut = remote_peer.get_capabilities(&unknown_endpoint_id);
2441 let mut get_caps_fut = pin!(get_caps_fut);
2442
2443 match exec.run_until_stalled(&mut get_caps_fut) {
2444 Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2445 assert_eq!(Some(Ok(avdtp::ErrorCode::BadAcpSeid)), e.error_code())
2446 }
2447 x => panic!("Get capabilities should be a ready error but got {:?}", x),
2448 };
2449
2450 let get_caps_fut = remote_peer.get_all_capabilities(&sbc_endpoint_id);
2451 let mut get_caps_fut = pin!(get_caps_fut);
2452
2453 match exec.run_until_stalled(&mut get_caps_fut) {
2454 Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2456 x => panic!("Get capabilities should be ready but got {:?}", x),
2457 };
2458
2459 let sbc_caps = sbc_capabilities();
2460 let set_config_fut =
2461 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2462 let mut set_config_fut = pin!(set_config_fut);
2463
2464 match exec.run_until_stalled(&mut set_config_fut) {
2465 Poll::Ready(Ok(())) => {}
2466 x => panic!("Set capabilities should be ready but got {:?}", x),
2467 };
2468
2469 let open_fut = remote_peer.open(&sbc_endpoint_id);
2470 let mut open_fut = pin!(open_fut);
2471 match exec.run_until_stalled(&mut open_fut) {
2472 Poll::Ready(Ok(())) => {}
2473 x => panic!("Open should be ready but got {:?}", x),
2474 };
2475
2476 let (_remote_transport, transport) = Channel::create();
2478
2479 assert_eq!(Some(()), peer.receive_channel(transport).ok());
2480
2481 let stream_ids = vec![sbc_endpoint_id.clone()];
2482 let start_fut = remote_peer.start(&stream_ids);
2483 let mut start_fut = pin!(start_fut);
2484 match exec.run_until_stalled(&mut start_fut) {
2485 Poll::Ready(Ok(())) => {}
2486 x => panic!("Start should be ready but got {:?}", x),
2487 };
2488
2489 let media_task = test_builder.expect_task();
2491 assert!(media_task.is_started());
2492
2493 let suspend_fut = remote_peer.suspend(&stream_ids);
2494 let mut suspend_fut = pin!(suspend_fut);
2495 match exec.run_until_stalled(&mut suspend_fut) {
2496 Poll::Ready(Ok(())) => {}
2497 x => panic!("Start should be ready but got {:?}", x),
2498 };
2499
2500 assert!(!media_task.is_started());
2502 }
2503
2504 #[fuchsia::test]
2505 fn peer_set_config_reject_first() {
2506 let mut exec = fasync::TestExecutor::new();
2507
2508 let mut streams = Streams::default();
2509 let test_builder = TestMediaTaskBuilder::new();
2510 streams.insert(Stream::build(
2511 make_sbc_endpoint(1, avdtp::EndpointType::Source),
2512 test_builder.builder(),
2513 ));
2514
2515 let (remote, _requests, _, _peer) = setup_test_peer(false, streams, None);
2516 let remote_peer = avdtp::Peer::new(remote);
2517
2518 let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2519
2520 let wrong_freq_sbc = &[SbcCodecInfo::new(
2521 SbcSamplingFrequency::FREQ44100HZ, SbcChannelMode::JOINT_STEREO,
2523 SbcBlockCount::SIXTEEN,
2524 SbcSubBands::EIGHT,
2525 SbcAllocation::LOUDNESS,
2526 53,
2527 53,
2528 )
2529 .expect("sbc codec info")
2530 .into()];
2531
2532 let set_config_fut =
2533 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, wrong_freq_sbc);
2534 let mut set_config_fut = pin!(set_config_fut);
2535
2536 match exec.run_until_stalled(&mut set_config_fut) {
2537 Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2538 assert!(e.service_category().is_some())
2539 }
2540 x => panic!("Set capabilities should have been rejected but got {:?}", x),
2541 };
2542
2543 let sbc_caps = sbc_capabilities();
2544 let set_config_fut =
2545 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2546 let mut set_config_fut = pin!(set_config_fut);
2547
2548 match exec.run_until_stalled(&mut set_config_fut) {
2549 Poll::Ready(Ok(())) => {}
2550 x => panic!("Set capabilities should be ready but got {:?}", x),
2551 };
2552 }
2553
    /// Verifies that, after the remote configures and opens a stream and then stays idle, the
    /// peer under test sends the Start request itself once fake time has advanced by three
    /// seconds.
    #[fuchsia::test]
    fn peer_starts_waiting_streams() {
        // Fake time lets the test trigger timers deterministically.
        let mut exec = fasync::TestExecutor::new_with_fake_time();
        exec.set_fake_time(fasync::MonotonicInstant::from_nanos(5_000_000_000));

        let mut streams = Streams::default();
        let mut test_builder = TestMediaTaskBuilder::new();
        streams.insert(Stream::build(
            make_sbc_endpoint(1, avdtp::EndpointType::Source),
            test_builder.builder(),
        ));

        let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
        let remote_peer = avdtp::Peer::new(remote);

        let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");

        let sbc_caps = sbc_capabilities();
        let set_config_fut =
            remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
        let mut set_config_fut = pin!(set_config_fut);

        match exec.run_until_stalled(&mut set_config_fut) {
            Poll::Ready(Ok(())) => {}
            x => panic!("Set capabilities should be ready but got {:?}", x),
        };

        let open_fut = remote_peer.open(&sbc_endpoint_id);
        let mut open_fut = pin!(open_fut);
        match exec.run_until_stalled(&mut open_fut) {
            Poll::Ready(Ok(())) => {}
            x => panic!("Open should be ready but got {:?}", x),
        };

        // Provide the transport channel; the remote end is kept alive until the test ends.
        let (_remote_transport, transport) = Channel::create();
        assert_eq!(Some(()), peer.receive_channel(transport).ok());

        let mut remote_requests = remote_peer.take_request_stream();
        let next_remote_request_fut = remote_requests.next();
        let mut next_remote_request_fut = pin!(next_remote_request_fut);

        // Before any time passes, the peer has not sent the remote anything.
        assert!(exec.run_until_stalled(&mut next_remote_request_fut).is_pending());

        // Jump three seconds ahead and fire whatever timers have expired by then.
        exec.set_fake_time(zx::MonotonicDuration::from_seconds(3).after_now());
        let _ = exec.wake_expired_timers();

        // The peer should now have asked the remote to start the stream; accept it.
        let stream_ids = match exec.run_until_stalled(&mut next_remote_request_fut) {
            Poll::Ready(Some(Ok(avdtp::Request::Start { responder, stream_ids }))) => {
                responder.send().unwrap();
                stream_ids
            }
            x => panic!("Expected to receive a start request for the stream, got {:?}", x),
        };

        // A media task must already be available and running.
        let media_task =
            exec.run_until_stalled(&mut test_builder.next_task()).expect("ready").unwrap();
        assert!(media_task.is_started());

        // A remote-initiated suspend of the same stream ids stops the media task.
        let suspend_fut = remote_peer.suspend(&stream_ids);
        let mut suspend_fut = pin!(suspend_fut);
        match exec.run_until_stalled(&mut suspend_fut) {
            Poll::Ready(Ok(())) => {}
            x => panic!("Suspend should be ready but got {:?}", x),
        };

        assert!(!media_task.is_started());
    }
2628
2629 #[fuchsia::test]
2630 fn needs_permit_to_start_streams() {
2631 let mut exec = fasync::TestExecutor::new();
2632
2633 let mut streams = Streams::default();
2634 let mut test_builder = TestMediaTaskBuilder::new();
2635 streams.insert(Stream::build(
2636 make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2637 test_builder.builder(),
2638 ));
2639 streams.insert(Stream::build(
2640 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2641 test_builder.builder(),
2642 ));
2643 let mut next_task_fut = test_builder.next_task();
2644
2645 let permits = Permits::new(1);
2646 let taken_permit = permits.get().expect("permit taken");
2647 let (remote, _profile_request_stream, _, peer) =
2648 setup_test_peer(false, streams, Some(permits.clone()));
2649 let remote_peer = avdtp::Peer::new(remote);
2650
2651 let sbc_endpoint_id = 1_u8.try_into().unwrap();
2652
2653 let sbc_caps = sbc_capabilities();
2654 let mut set_config_fut =
2655 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2656
2657 match exec.run_until_stalled(&mut set_config_fut) {
2658 Poll::Ready(Ok(())) => {}
2659 x => panic!("Set capabilities should be ready but got {:?}", x),
2660 };
2661
2662 let mut open_fut = remote_peer.open(&sbc_endpoint_id);
2663 match exec.run_until_stalled(&mut open_fut) {
2664 Poll::Ready(Ok(())) => {}
2665 x => panic!("Open should be ready but got {:?}", x),
2666 };
2667
2668 let (_remote_transport, transport) = Channel::create();
2670 assert_eq!(Some(()), peer.receive_channel(transport).ok());
2671
2672 let sbc_endpoint_two = 2_u8.try_into().unwrap();
2674
2675 let mut set_config_fut =
2676 remote_peer.set_configuration(&sbc_endpoint_two, &sbc_endpoint_two, &sbc_caps);
2677
2678 match exec.run_until_stalled(&mut set_config_fut) {
2679 Poll::Ready(Ok(())) => {}
2680 x => panic!("Set capabilities should be ready but got {:?}", x),
2681 };
2682
2683 let mut open_fut = remote_peer.open(&sbc_endpoint_two);
2684 match exec.run_until_stalled(&mut open_fut) {
2685 Poll::Ready(Ok(())) => {}
2686 x => panic!("Open should be ready but got {:?}", x),
2687 };
2688
2689 let (_remote_transport_two, transport_two) = Channel::create();
2691 assert_eq!(Some(()), peer.receive_channel(transport_two).ok());
2692
2693 let unknown_endpoint_id: StreamEndpointId = 9_u8.try_into().unwrap();
2696 let stream_ids = [sbc_endpoint_id.clone(), unknown_endpoint_id.clone()];
2697 let mut start_fut = remote_peer.start(&stream_ids);
2698 match exec.run_until_stalled(&mut start_fut) {
2699 Poll::Ready(Err(avdtp::Error::RemoteRejected(rejection))) => {
2700 assert_eq!(avdtp::ErrorCode::BadAcpSeid, rejection.error_code().unwrap().unwrap());
2701 assert_eq!(unknown_endpoint_id, rejection.stream_id().unwrap());
2702 }
2703 x => panic!("Start should be ready but got {:?}", x),
2704 };
2705
2706 let mut remote_requests = remote_peer.take_request_stream();
2708
2709 let suspended_stream_ids = match exec.run_singlethreaded(&mut remote_requests.next()) {
2710 Some(Ok(avdtp::Request::Suspend { responder, stream_ids })) => {
2711 responder.send().unwrap();
2712 stream_ids
2713 }
2714 x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2715 };
2716
2717 assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2718 assert_eq!(1, suspended_stream_ids.len());
2719
2720 match exec.run_until_stalled(&mut next_task_fut) {
2722 Poll::Pending => {}
2723 x => panic!("Local task should not have been created at this point: {:?}", x),
2724 };
2725
2726 let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
2729 match exec.run_until_stalled(&mut start_fut) {
2730 Poll::Ready(Ok(())) => {}
2731 x => panic!("Start should be ready but got {:?}", x),
2732 }
2733
2734 let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2735 Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2736 responder.send().unwrap();
2737 stream_ids
2738 }
2739 x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2740 };
2741 assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2742
2743 drop(taken_permit);
2745
2746 match exec.run_singlethreaded(&mut remote_requests.next()) {
2747 Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2748 assert_eq!(stream_ids, &[sbc_endpoint_id.clone()]);
2749 responder.send().unwrap();
2750 }
2751 x => panic!("Expected start on permit available but got {x:?}"),
2752 };
2753
2754 let media_task = match exec.run_until_stalled(&mut next_task_fut) {
2756 Poll::Ready(Some(task)) => task,
2757 x => panic!("Local task should be created at this point: {:?}", x),
2758 };
2759
2760 assert!(media_task.is_started());
2761
2762 let mut start_fut = remote_peer.start(&[sbc_endpoint_two.clone()]);
2764 match exec.run_until_stalled(&mut start_fut) {
2765 Poll::Ready(Ok(())) => {}
2766 x => panic!("Start should be ready but got {:?}", x),
2767 }
2768
2769 let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2770 Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2771 responder.send().unwrap();
2772 stream_ids
2773 }
2774 x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2775 };
2776
2777 assert!(suspended_stream_ids.contains(&sbc_endpoint_two));
2778 assert_eq!(1, suspended_stream_ids.len());
2779
2780 let mut suspend_fut = remote_peer.suspend(&[sbc_endpoint_id.clone()]);
2782 match exec.run_until_stalled(&mut suspend_fut) {
2783 Poll::Ready(Ok(())) => {}
2784 x => panic!("Start should be ready but got {:?}", x),
2785 }
2786
2787 match exec.run_singlethreaded(&mut remote_requests.next()) {
2788 Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2789 assert_eq!(stream_ids, &[sbc_endpoint_two]);
2790 responder.send().unwrap();
2791 }
2792 x => panic!("Expected start on permit available but got {x:?}"),
2793 };
2794 }
2795
2796 fn start_sbc_stream(
2797 exec: &mut fasync::TestExecutor,
2798 media_test_builder: &mut TestMediaTaskBuilder,
2799 peer: &Peer,
2800 remote_peer: &avdtp::Peer,
2801 local_id: &StreamEndpointId,
2802 remote_id: &StreamEndpointId,
2803 ) -> TestMediaTask {
2804 let sbc_caps = sbc_capabilities();
2805 let set_config_fut = remote_peer.set_configuration(&local_id, &remote_id, &sbc_caps);
2806 let mut set_config_fut = pin!(set_config_fut);
2807
2808 match exec.run_until_stalled(&mut set_config_fut) {
2809 Poll::Ready(Ok(())) => {}
2810 x => panic!("Set capabilities should be ready but got {:?}", x),
2811 };
2812
2813 let open_fut = remote_peer.open(&local_id);
2814 let mut open_fut = pin!(open_fut);
2815 match exec.run_until_stalled(&mut open_fut) {
2816 Poll::Ready(Ok(())) => {}
2817 x => panic!("Open should be ready but got {:?}", x),
2818 };
2819
2820 let (_remote_transport, transport) = Channel::create();
2822 assert_eq!(Some(()), peer.receive_channel(transport).ok());
2823
2824 let stream_ids = [local_id.clone()];
2826 let start_fut = remote_peer.start(&stream_ids);
2827 let mut start_fut = pin!(start_fut);
2828 match exec.run_until_stalled(&mut start_fut) {
2829 Poll::Ready(Ok(())) => {}
2830 x => panic!("Start should be ready but got {:?}", x),
2831 };
2832
2833 let media_task = media_test_builder.expect_task();
2835 assert!(media_task.is_started());
2836
2837 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
2838 media_task
2839 }
2840
    /// Starts two streams with two permits, seizes all permits and expects both streams to be
    /// suspended, then releases the permits and expects both streams to be started again.
    #[fuchsia::test]
    fn permits_can_be_revoked_and_reinstated_all() {
        let mut exec = fasync::TestExecutor::new();

        let mut streams = Streams::default();
        let mut test_builder = TestMediaTaskBuilder::new();
        streams.insert(Stream::build(
            make_sbc_endpoint(1, avdtp::EndpointType::Sink),
            test_builder.builder(),
        ));
        let sbc_endpoint_id = 1_u8.try_into().unwrap();
        let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();

        streams.insert(Stream::build(
            make_sbc_endpoint(2, avdtp::EndpointType::Sink),
            test_builder.builder(),
        ));
        let sbc2_endpoint_id = 2_u8.try_into().unwrap();
        let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();

        // One permit per stream, so both can run at the same time.
        let permits = Permits::new(2);

        let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
        let remote_peer = avdtp::Peer::new(remote);

        let one_media_task = start_sbc_stream(
            &mut exec,
            &mut test_builder,
            &peer,
            &remote_peer,
            &sbc_endpoint_id,
            &remote_sbc_endpoint_id,
        );
        let two_media_task = start_sbc_stream(
            &mut exec,
            &mut test_builder,
            &peer,
            &remote_peer,
            &sbc2_endpoint_id,
            &remote_sbc2_endpoint_id,
        );

        // Revoke every permit at once; they stay revoked until `taken_permits` is dropped.
        let taken_permits = permits.seize();

        let remote_endpoints: HashSet<_> =
            [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();

        // Suspend requests must already be queued for both remote endpoint ids (in one or
        // several requests); `remove` returning false would mean an unexpected or repeated id.
        let mut remote_requests = remote_peer.take_request_stream();
        let mut expected_suspends = remote_endpoints.clone();
        while !expected_suspends.is_empty() {
            match exec.run_until_stalled(&mut remote_requests.next()) {
                Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
                    for stream_id in stream_ids {
                        assert!(expected_suspends.remove(&stream_id));
                    }
                    responder.send().expect("send response okay");
                }
                x => panic!("Expected suspension and got {:?}", x),
            }
        }

        // Both media tasks have been stopped.
        assert!(!one_media_task.is_started());
        assert!(!two_media_task.is_started());

        // Give the permits back.
        drop(taken_permits);

        // Both streams are expected to be started again, each remote endpoint id exactly once.
        let mut expected_starts = remote_endpoints.clone();
        while !expected_starts.is_empty() {
            match exec.run_singlethreaded(&mut remote_requests.next()) {
                Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
                    for stream_id in stream_ids {
                        assert!(expected_starts.remove(&stream_id));
                    }
                    responder.send().expect("send response okay");
                }
                x => panic!("Expected start and got {:?}", x),
            }
        }
        // Two fresh media tasks are created by the restart, and both are running.
        let one_media_task = test_builder.expect_task();
        assert!(one_media_task.is_started());
        let two_media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
            Poll::Ready(Some(task)) => task,
            x => panic!("Expected another ready task but {x:?}"),
        };
        assert!(two_media_task.is_started());
    }
2933
2934 #[fuchsia::test]
2935 fn permits_can_be_revoked_one_at_a_time() {
2936 let mut exec = fasync::TestExecutor::new();
2937
2938 let mut streams = Streams::default();
2939 let mut test_builder = TestMediaTaskBuilder::new();
2940 streams.insert(Stream::build(
2941 make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2942 test_builder.builder(),
2943 ));
2944 let sbc_endpoint_id = 1_u8.try_into().unwrap();
2945 let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();
2946
2947 streams.insert(Stream::build(
2948 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2949 test_builder.builder(),
2950 ));
2951 let sbc2_endpoint_id = 2_u8.try_into().unwrap();
2952 let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();
2953
2954 let permits = Permits::new(2);
2955
2956 let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
2957 let remote_peer = avdtp::Peer::new(remote);
2958
2959 let one_media_task = start_sbc_stream(
2960 &mut exec,
2961 &mut test_builder,
2962 &peer,
2963 &remote_peer,
2964 &sbc_endpoint_id,
2965 &remote_sbc_endpoint_id,
2966 );
2967 let two_media_task = start_sbc_stream(
2968 &mut exec,
2969 &mut test_builder,
2970 &peer,
2971 &remote_peer,
2972 &sbc2_endpoint_id,
2973 &remote_sbc2_endpoint_id,
2974 );
2975
2976 let taken_permit = permits.take();
2978
2979 let remote_endpoints: HashSet<_> =
2980 [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();
2981
2982 let mut remote_requests = remote_peer.take_request_stream();
2984 let suspended_id = match exec.run_until_stalled(&mut remote_requests.next()) {
2985 Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2986 assert!(stream_ids.len() == 1);
2987 assert!(remote_endpoints.contains(&stream_ids[0]));
2988 responder.send().expect("send response okay");
2989 stream_ids[0].clone()
2990 }
2991 x => panic!("Expected suspension and got {:?}", x),
2992 };
2993
2994 if suspended_id == remote_sbc_endpoint_id {
2996 assert!(!one_media_task.is_started());
2997 assert!(two_media_task.is_started());
2998 } else {
2999 assert!(one_media_task.is_started());
3000 assert!(!two_media_task.is_started());
3001 }
3002
3003 drop(taken_permit);
3005
3006 match exec.run_singlethreaded(&mut remote_requests.next()) {
3007 Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
3008 assert_eq!(stream_ids, &[suspended_id]);
3009 responder.send().expect("send response okay");
3010 }
3011 x => panic!("Expected start and got {:?}", x),
3012 }
3013 let media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
3015 Poll::Ready(Some(task)) => task,
3016 x => panic!("Expected media task to start: {x:?}"),
3017 };
3018 assert!(media_task.is_started());
3019 }
3020
#[fuchsia::test]
/// Checks the interleaving where the remote peer issues a Start for a stream while a Start
/// request that the remote received for that same stream is still unanswered, with only a
/// single permit available.
///
/// The test holds the responders for the Start and Suspend requests seen by the remote and
/// answers them in that order, then expects a media task to exist and be started.
fn permit_suspend_start_while_suspending() {
    let mut exec = fasync::TestExecutor::new();

    // Two local SBC sink endpoints (ids 1 and 2), both producing tasks through `test_builder`.
    let mut streams = Streams::default();
    let mut test_builder = TestMediaTaskBuilder::new();
    streams.insert(Stream::build(
        make_sbc_endpoint(1, avdtp::EndpointType::Sink),
        test_builder.builder(),
    ));
    streams.insert(Stream::build(
        make_sbc_endpoint(2, avdtp::EndpointType::Sink),
        test_builder.builder(),
    ));
    // Taken up front; it is only polled at the very end of the test.
    let mut next_task_fut = test_builder.next_task();

    // Only one permit exists for the whole test.
    let permits = Permits::new(1);
    let (remote, _profile_request_stream, _, peer) =
        setup_test_peer(false, streams, Some(permits.clone()));

    // `remote_peer` plays the other side of the AVDTP signaling channel; `remote_requests`
    // yields the requests that arrive at it.
    let remote_peer = avdtp::Peer::new(remote);
    let mut remote_requests = remote_peer.take_request_stream();

    let sbc_endpoint_id = 1_u8.try_into().unwrap();

    // The remote configures the stream, passing the same id for both endpoint arguments.
    let sbc_caps = sbc_capabilities();
    let mut set_config_fut =
        remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);

    match exec.run_until_stalled(&mut set_config_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Set capabilities should be ready but got {:?}", x),
    };

    // The remote opens the configured stream; this must also complete without stalling.
    let mut open_fut = remote_peer.open(&sbc_endpoint_id);
    match exec.run_until_stalled(&mut open_fut) {
        Poll::Ready(Ok(())) => {}
        x => panic!("Open should be ready but got {:?}", x),
    };

    // Hand a fresh channel to the peer; `receive_channel` must succeed.
    // NOTE(review): presumably this is the media transport for the opened stream - confirm.
    let (_remote_transport, transport) = Channel::create();
    assert_eq!(Some(()), peer.receive_channel(transport).ok());

    // Polling a never-ready future lets everything else on the executor run until it stalls.
    let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
    // A timer must be pending at this point; fire it now instead of waiting for it.
    // NOTE(review): presumably this is the peer's delay before it starts the stream itself -
    // confirm against the peer implementation.
    let Some(_deadline) = exec.wake_next_timer() else {
        panic!("Expected a timer to be waiting to run");
    };

    // After the timer fires, the remote sees a Start request for the stream. Keep the
    // responder so the reply can be delayed until later in the test.
    let start_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
        Some(Ok(avdtp::Request::Start { stream_ids, responder })) => {
            assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
            responder
        }
        x => panic!("Expected a Start request, got {x:?}"),
    };

    // The only permit is already held, so none can be obtained here.
    assert!(permits.get().is_none());

    // While that Start is still unanswered, the remote starts the same stream itself.
    let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);

    // The remote's Start must be answered with success.
    match exec.run_singlethreaded(&mut start_fut) {
        Ok(()) => {}
        x => panic!("Expected OK response from start future but got {x:?}"),
    }

    // Next the remote sees a Suspend request for the same stream; hold that responder too.
    let suspend_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
        Some(Ok(avdtp::Request::Suspend { stream_ids, responder })) => {
            assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
            responder
        }
        x => panic!("Expected a suspend got {x:?}"),
    };

    // Answer the delayed Start first.
    start_responder.send().unwrap();

    // Let the executor process the Start response before the Suspend is answered.
    let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());

    // Then answer the Suspend.
    suspend_responder.send().unwrap();

    // A media task must have been created by now.
    let media_task = match exec.run_until_stalled(&mut next_task_fut) {
        Poll::Ready(Some(task)) => task,
        x => panic!("Local task should be created at this point: {:?}", x),
    };

    // And the task must be in the started state once both responses have been sent.
    assert!(media_task.is_started());
}
3118
3119 #[fuchsia::test]
3122 fn version_check() {
3123 let p1: ProfileDescriptor = ProfileDescriptor {
3124 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3125 major_version: Some(1),
3126 minor_version: Some(3),
3127 ..Default::default()
3128 };
3129 assert_eq!(true, a2dp_version_check(p1));
3130
3131 let p1: ProfileDescriptor = ProfileDescriptor {
3132 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3133 major_version: Some(2),
3134 minor_version: Some(10),
3135 ..Default::default()
3136 };
3137 assert_eq!(true, a2dp_version_check(p1));
3138
3139 let p1: ProfileDescriptor = ProfileDescriptor {
3140 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3141 major_version: Some(1),
3142 minor_version: Some(0),
3143 ..Default::default()
3144 };
3145 assert_eq!(false, a2dp_version_check(p1));
3146
3147 let p1: ProfileDescriptor = ProfileDescriptor {
3148 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3149 major_version: None,
3150 minor_version: Some(9),
3151 ..Default::default()
3152 };
3153 assert_eq!(false, a2dp_version_check(p1));
3154
3155 let p1: ProfileDescriptor = ProfileDescriptor {
3156 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3157 major_version: Some(2),
3158 minor_version: Some(2),
3159 ..Default::default()
3160 };
3161 assert_eq!(true, a2dp_version_check(p1));
3162 }
3163}