1use anyhow::Context as _;
6use bt_avdtp::{
7 self as avdtp, MediaCodecType, ServiceCapability, ServiceCategory, StreamEndpoint,
8 StreamEndpointId,
9};
10use fidl_fuchsia_bluetooth::ChannelParameters;
11use fidl_fuchsia_bluetooth_bredr::{
12 ConnectParameters, L2capParameters, ProfileDescriptor, ProfileProxy, PSM_AVDTP,
13};
14use fuchsia_async::{self as fasync, DurationExt};
15use fuchsia_bluetooth::inspect::DebugExt;
16use fuchsia_bluetooth::types::{Channel, PeerId};
17use fuchsia_inspect as inspect;
18use fuchsia_inspect_derive::{AttachError, Inspect};
19use fuchsia_sync::Mutex;
20use futures::channel::mpsc;
21use futures::future::{BoxFuture, Either};
22use futures::stream::FuturesUnordered;
23use futures::task::{Context, Poll, Waker};
24use futures::{select, Future, FutureExt, StreamExt};
25use log::{debug, info, trace, warn};
26use std::collections::{BTreeMap, HashMap, HashSet};
27use std::pin::Pin;
28use std::sync::{Arc, Weak};
29
30mod controller;
32pub use controller::ControllerPool;
33
34use crate::codec::MediaCodecConfig;
35use crate::permits::{Permit, Permits};
36use crate::stream::{Stream, Streams};
37
38#[derive(Inspect)]
41pub struct Peer {
42 id: PeerId,
44 #[inspect(forward)]
46 inner: Arc<Mutex<PeerInner>>,
47 profile: ProfileProxy,
49 descriptor: Mutex<Option<ProfileDescriptor>>,
51 closed_wakers: Arc<Mutex<Option<Vec<Waker>>>>,
55 metrics: bt_metrics::MetricsLogger,
57 start_stream_task: Mutex<Option<fasync::Task<avdtp::Result<()>>>>,
59}
60
61#[derive(Clone)]
65struct StreamPermits {
66 permits: Permits,
67 open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
68 reserved_streams: Arc<Mutex<HashSet<StreamEndpointId>>>,
69 inner: Weak<Mutex<PeerInner>>,
70 peer_id: PeerId,
71 sender: mpsc::UnboundedSender<BoxFuture<'static, StreamPermit>>,
72}
73
74#[derive(Debug)]
75struct StreamPermit {
76 local_id: StreamEndpointId,
77 open_streams: Arc<Mutex<HashMap<StreamEndpointId, Permit>>>,
78}
79
80impl StreamPermit {
81 fn local_id(&self) -> &StreamEndpointId {
82 &self.local_id
83 }
84
85 fn is_held(&self) -> bool {
87 self.open_streams.lock().contains_key(&self.local_id)
88 }
89}
90
91impl Drop for StreamPermit {
92 fn drop(&mut self) {
93 let _ = self.open_streams.lock().remove(&self.local_id);
94 }
95}
96
97impl StreamPermits {
98 fn new(
99 inner: Weak<Mutex<PeerInner>>,
100 peer_id: PeerId,
101 permits: Permits,
102 ) -> (Self, mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>) {
103 let (sender, reservations_receiver) = futures::channel::mpsc::unbounded();
104 (
105 Self {
106 inner,
107 permits,
108 peer_id,
109 sender,
110 open_streams: Default::default(),
111 reserved_streams: Default::default(),
112 },
113 reservations_receiver,
114 )
115 }
116
117 fn label_for(&self, local_id: &StreamEndpointId) -> String {
118 format!("{} {}", self.peer_id, local_id)
119 }
120
121 fn get(&self, local_id: StreamEndpointId) -> Option<StreamPermit> {
124 let revoke_fn = self.make_revocation_fn(&local_id);
125 let Some(permit) = self.permits.get_revokable(revoke_fn) else {
126 info!("No permits available: {:?}", self.permits);
127 return None;
128 };
129 permit.relabel(self.label_for(&local_id));
130 if let Some(_) = self.open_streams.lock().insert(local_id.clone(), permit) {
131 warn!(id:% = self.peer_id; "Started stream {local_id:?} twice, dropping previous permit");
132 }
133 Some(StreamPermit { local_id, open_streams: self.open_streams.clone() })
134 }
135
136 fn setup_reservation_for(&self, local_id: StreamEndpointId) {
139 if !self.reserved_streams.lock().insert(local_id.clone()) {
140 return;
142 }
143 let restart_stream_available_fut = {
144 let self_revoke_fn = Self::make_revocation_fn(&self, &local_id);
145 let reservation = self.permits.reserve_revokable(self_revoke_fn);
146 let open_streams = self.open_streams.clone();
147 let reserved_streams = self.reserved_streams.clone();
148 let label = self.label_for(&local_id);
149 let local_id = local_id.clone();
150 async move {
151 let permit = reservation.await;
152 permit.relabel(label);
153 if open_streams.lock().insert(local_id.clone(), permit).is_some() {
154 warn!("Reservation replaces acquired permit for {}", local_id.clone());
155 }
156 if !reserved_streams.lock().remove(&local_id) {
157 warn!(local_id:%; "Unrecorded reservation resolved");
158 }
159 StreamPermit { local_id, open_streams }
160 }
161 };
162 if let Err(e) = self.sender.unbounded_send(restart_stream_available_fut.boxed()) {
163 warn!(id:% = self.peer_id, local_id:%, e:?; "Couldn't queue reservation to finish");
164 }
165 }
166
167 fn revocation_fn(self, local_id: StreamEndpointId) -> Permit {
172 if let Ok(peer) = PeerInner::upgrade(self.inner.clone()) {
173 {
174 let mut lock = peer.lock();
175 match lock.suspend_local_stream(&local_id) {
176 Ok(remote_id) => drop(lock.peer.suspend(&[remote_id])),
177 Err(e) => warn!("Couldn't stop local stream {local_id:?}: {e:?}"),
178 }
179 }
180 self.setup_reservation_for(local_id.clone());
181 }
182 self.open_streams.lock().remove(&local_id).expect("permit revoked but don't have it")
183 }
184
185 fn make_revocation_fn(&self, local_id: &StreamEndpointId) -> impl FnOnce() -> Permit {
186 let local_id = local_id.clone();
187 let cloned = self.clone();
188 move || cloned.revocation_fn(local_id)
189 }
190}
191
192impl Peer {
193 pub fn create(
200 id: PeerId,
201 peer: avdtp::Peer,
202 streams: Streams,
203 permits: Option<Permits>,
204 profile: ProfileProxy,
205 metrics: bt_metrics::MetricsLogger,
206 ) -> Self {
207 let inner = Arc::new(Mutex::new(PeerInner::new(peer, id, streams, metrics.clone())));
208 let reservations_receiver = if let Some(permits) = permits {
209 let (stream_permits, receiver) =
210 StreamPermits::new(Arc::downgrade(&inner), id, permits);
211 inner.lock().permits = Some(stream_permits);
212 receiver
213 } else {
214 let (_, receiver) = mpsc::unbounded();
215 receiver
216 };
217 let res = Self {
218 id,
219 inner,
220 profile,
221 descriptor: Mutex::new(None),
222 closed_wakers: Arc::new(Mutex::new(Some(Vec::new()))),
223 metrics,
224 start_stream_task: Mutex::new(None),
225 };
226 res.start_requests_task(reservations_receiver);
227 res
228 }
229
230 pub fn set_descriptor(&self, descriptor: ProfileDescriptor) -> Option<ProfileDescriptor> {
231 self.descriptor.lock().replace(descriptor)
232 }
233
234 const STREAM_DWELL: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(500);
237
238 pub fn receive_channel(&self, channel: Channel) -> avdtp::Result<()> {
242 let mut lock = self.inner.lock();
243 if lock.receive_channel(channel)? {
244 let weak = Arc::downgrade(&self.inner);
245 let mut task_lock = self.start_stream_task.lock();
246 *task_lock = Some(fasync::Task::local(async move {
247 trace!("Dwelling to start remotely-opened stream..");
248 fasync::Timer::new(Self::STREAM_DWELL.after_now()).await;
249 PeerInner::start_opened(weak).await
250 }));
251 }
252 Ok(())
253 }
254
255 pub fn avdtp(&self) -> avdtp::Peer {
257 let lock = self.inner.lock();
258 lock.peer.clone()
259 }
260
261 pub fn remote_endpoints(&self) -> Option<Vec<avdtp::StreamEndpoint>> {
263 self.inner.lock().remote_endpoints()
264 }
265
266 pub fn collect_capabilities(
270 &self,
271 ) -> impl Future<Output = avdtp::Result<Vec<avdtp::StreamEndpoint>>> {
272 let avdtp = self.avdtp();
273 let get_all = self.descriptor.lock().clone().is_some_and(a2dp_version_check);
274 let inner = self.inner.clone();
275 let metrics = self.metrics.clone();
276 let peer_id = self.id;
277 async move {
278 if let Some(caps) = inner.lock().remote_endpoints() {
279 return Ok(caps);
280 }
281 trace!("Discovering peer streams..");
282 let infos = avdtp.discover().await?;
283 trace!("Discovered {} streams", infos.len());
284 let mut remote_streams = Vec::new();
285 for info in infos {
286 let capabilities = if get_all {
287 avdtp.get_all_capabilities(info.id()).await
288 } else {
289 avdtp.get_capabilities(info.id()).await
290 };
291 match capabilities {
292 Ok(capabilities) => {
293 trace!("Stream {:?}", info);
294 for cap in &capabilities {
295 trace!(" - {:?}", cap);
296 }
297 remote_streams.push(avdtp::StreamEndpoint::from_info(&info, capabilities));
298 }
299 Err(e) => {
300 info!(peer_id:%; "Stream {} capabilities failed: {:?}, skipping", info.id(), e);
301 }
302 };
303 }
304 inner.lock().set_remote_endpoints(&remote_streams);
305 Self::record_cobalt_metrics(metrics, &remote_streams);
306 Ok(remote_streams)
307 }
308 }
309
310 fn record_cobalt_metrics(metrics: bt_metrics::MetricsLogger, endpoints: &[StreamEndpoint]) {
311 let codec_metrics: HashSet<_> = endpoints
312 .iter()
313 .filter_map(|endpoint| {
314 endpoint.codec_type().map(|t| codectype_to_availability_metric(t) as u32)
315 })
316 .collect();
317 metrics
318 .log_occurrences(bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID, codec_metrics);
319
320 let cap_metrics: HashSet<_> = endpoints
321 .iter()
322 .flat_map(|endpoint| {
323 endpoint
324 .capabilities()
325 .iter()
326 .filter_map(|t| capability_to_metric(t))
327 .chain(std::iter::once(
328 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic,
329 ))
330 .map(|t| t as u32)
331 })
332 .collect();
333 metrics.log_occurrences(bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID, cap_metrics);
334 }
335
336 fn transport_channel_params() -> L2capParameters {
337 L2capParameters {
338 psm: Some(PSM_AVDTP),
339 parameters: Some(ChannelParameters {
340 max_rx_packet_size: Some(65535),
341 ..Default::default()
342 }),
343 ..Default::default()
344 }
345 }
346
347 pub fn stream_start(
352 &self,
353 remote_id: StreamEndpointId,
354 capabilities: Vec<ServiceCapability>,
355 ) -> impl Future<Output = avdtp::Result<()>> {
356 let peer = Arc::downgrade(&self.inner);
357 let peer_id = self.id.clone();
358 let avdtp = self.avdtp();
359 let profile = self.profile.clone();
360
361 async move {
362 let codec_params =
363 capabilities.iter().find(|x| x.is_codec()).ok_or(avdtp::Error::InvalidState)?;
364 let (local_id, local_capabilities) = {
365 let peer = PeerInner::upgrade(peer.clone())?;
366 let lock = peer.lock();
367 lock.find_compatible_local_capabilities(codec_params, &remote_id)?
368 };
369
370 let local_by_cat: HashMap<ServiceCategory, ServiceCapability> =
371 local_capabilities.into_iter().map(|i| (i.category(), i)).collect();
372
373 let shared_capabilities: BTreeMap<ServiceCategory, ServiceCapability> = capabilities
376 .into_iter()
377 .filter_map(|cap| {
378 let Some(local_cap) = local_by_cat.get(&cap.category()) else {
379 return None;
380 };
381 if cap.category() == ServiceCategory::MediaCodec {
382 let Ok(a) = MediaCodecConfig::try_from(&cap) else {
383 return None;
384 };
385 let Ok(b) = MediaCodecConfig::try_from(local_cap) else {
386 return None;
387 };
388 let Some(negotiated) = MediaCodecConfig::negotiate(&a, &b) else {
389 return None;
390 };
391 Some((cap.category(), (&negotiated).into()))
392 } else {
393 Some((cap.category(), cap))
394 }
395 })
396 .collect();
397 let shared_capabilities: Vec<_> = shared_capabilities.into_values().collect();
398
399 trace!("Starting stream {local_id} to remote {remote_id} with {shared_capabilities:?}");
400
401 avdtp.set_configuration(&remote_id, &local_id, &shared_capabilities).await?;
402 {
403 let strong = PeerInner::upgrade(peer.clone())?;
404 strong.lock().set_opening(&local_id, &remote_id, shared_capabilities)?;
405 }
406 avdtp.open(&remote_id).await?;
407
408 debug!(peer_id:%; "Connecting transport channel");
409 let channel = profile
410 .connect(
411 &peer_id.into(),
412 &ConnectParameters::L2cap(Self::transport_channel_params()),
413 )
414 .await
415 .context("FIDL error: {}")?
416 .or(Err(avdtp::Error::PeerDisconnected))?;
417 trace!(peer_id:%; "Connected transport channel, converting to local Channel");
418 let channel = match channel.try_into() {
419 Err(e) => {
420 warn!(peer_id:%, e:?; "Couldn't connect media transport: no channel");
421 return Err(avdtp::Error::PeerDisconnected);
422 }
423 Ok(c) => c,
424 };
425
426 trace!(peer_id:%; "Connected transport channel, passing to Peer..");
427
428 {
429 let strong = PeerInner::upgrade(peer.clone())?;
430 let _ = strong.lock().receive_channel(channel)?;
431 }
432 PeerInner::start_opened(peer).await
434 }
435 }
436
437 pub fn streaming_active(&self) -> bool {
439 self.inner.lock().is_streaming() || self.will_start_streaming()
440 }
441
442 #[cfg(test)]
444 fn is_streaming_now(&self) -> bool {
445 self.inner.lock().is_streaming_now()
446 }
447
448 fn will_start_streaming(&self) -> bool {
451 let mut task_lock = self.start_stream_task.lock();
452 if task_lock.is_none() {
453 return false;
454 }
455 let mut cx = Context::from_waker(futures::task::noop_waker_ref());
457 if let Poll::Pending = task_lock.as_mut().unwrap().poll_unpin(&mut cx) {
458 return true;
459 }
460 let _ = task_lock.take();
462 false
463 }
464
465 pub fn stream_suspend(
470 &self,
471 local_id: StreamEndpointId,
472 ) -> impl Future<Output = avdtp::Result<()>> {
473 let peer = Arc::downgrade(&self.inner);
474 PeerInner::suspend(peer, local_id)
475 }
476
477 fn start_requests_task(
480 &self,
481 mut reservations_receiver: mpsc::UnboundedReceiver<BoxFuture<'static, StreamPermit>>,
482 ) {
483 let lock = self.inner.lock();
484 let mut request_stream = lock.peer.take_request_stream().fuse();
485 let id = self.id.clone();
486 let peer = Arc::downgrade(&self.inner);
487 let mut stream_reservations = FuturesUnordered::new();
488 let disconnect_wakers = Arc::downgrade(&self.closed_wakers);
489 fuchsia_async::Task::local(async move {
490 loop {
491 select! {
492 request = request_stream.next() => {
493 match request {
494 None => break,
495 Some(Err(e)) => info!(peer_id:% = id, e:?; "Request stream error"),
496 Some(Ok(request)) => match peer.upgrade() {
497 None => return,
498 Some(p) => {
499 let result_or_future = p.lock().handle_request(request);
500 let result = match result_or_future {
501 Either::Left(result) => result,
502 Either::Right(future) => future.await,
503 };
504 if let Err(e) = result {
505 warn!(peer_id:% = id, e:?; "Error handling request");
506 }
507 }
508 },
509 }
510 },
511 reservation_fut = reservations_receiver.select_next_some() => {
512 stream_reservations.push(reservation_fut)
513 },
514 permit = stream_reservations.select_next_some() => {
515 if let Err(e) = PeerInner::start_permit(peer.clone(), permit).await {
516 warn!(peer_id:% = id, e:?; "Couldn't start stream after unpause");
517 }
518 }
519 complete => break,
520 }
521 }
522 info!(peer_id:% = id; "disconnected");
523 if let Some(wakers) = disconnect_wakers.upgrade() {
524 for waker in wakers.lock().take().unwrap_or_else(Vec::new) {
525 waker.wake();
526 }
527 }
528 })
529 .detach();
530 }
531
532 pub fn closed(&self) -> ClosedPeer {
534 ClosedPeer { inner: Arc::downgrade(&self.closed_wakers) }
535 }
536}
537
538#[must_use = "futures do nothing unless you `.await` or poll them"]
541pub struct ClosedPeer {
542 inner: Weak<Mutex<Option<Vec<Waker>>>>,
543}
544
545impl Future for ClosedPeer {
546 type Output = ();
547
548 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
549 match self.inner.upgrade() {
550 None => Poll::Ready(()),
551 Some(inner) => match inner.lock().as_mut() {
552 None => Poll::Ready(()),
553 Some(wakers) => {
554 wakers.push(cx.waker().clone());
555 Poll::Pending
556 }
557 },
558 }
559 }
560}
561
562fn a2dp_version_check(profile: ProfileDescriptor) -> bool {
564 let (Some(major), Some(minor)) = (profile.major_version, profile.minor_version) else {
565 return false;
566 };
567 (major == 1 && minor >= 3) || major > 1
568}
569
570struct PeerInner {
574 peer: avdtp::Peer,
576 peer_id: PeerId,
578 opening: Option<StreamEndpointId>,
581 local: Streams,
583 permits: Option<StreamPermits>,
585 started: HashMap<StreamEndpointId, WatchedStream>,
587 inspect: fuchsia_inspect::Node,
589 remote_endpoints: Option<Vec<StreamEndpoint>>,
591 remote_inspect: fuchsia_inspect::Node,
593 metrics: bt_metrics::MetricsLogger,
595}
596
597impl Inspect for &mut PeerInner {
598 fn iattach(self, parent: &inspect::Node, name: impl AsRef<str>) -> Result<(), AttachError> {
601 self.inspect = parent.create_child(name.as_ref());
602 self.inspect.record_string("id", self.peer_id.to_string());
603 self.local.iattach(&self.inspect, "local_streams")
604 }
605}
606
607impl PeerInner {
608 pub fn new(
609 peer: avdtp::Peer,
610 peer_id: PeerId,
611 local: Streams,
612 metrics: bt_metrics::MetricsLogger,
613 ) -> Self {
614 Self {
615 peer,
616 peer_id,
617 opening: None,
618 local,
619 permits: None,
620 started: HashMap::new(),
621 inspect: Default::default(),
622 remote_endpoints: None,
623 remote_inspect: Default::default(),
624 metrics,
625 }
626 }
627
628 fn get_mut(&mut self, local_id: &StreamEndpointId) -> Result<&mut Stream, avdtp::ErrorCode> {
630 self.local.get_mut(&local_id).ok_or(avdtp::ErrorCode::BadAcpSeid)
631 }
632
633 fn set_remote_endpoints(&mut self, endpoints: &[StreamEndpoint]) {
634 self.remote_inspect = self.inspect.create_child("remote_endpoints");
635 for endpoint in endpoints {
636 self.remote_inspect.record_child(inspect::unique_name("remote_"), |node| {
637 node.record_string("endpoint_id", endpoint.local_id().debug());
638 node.record_string("capabilities", endpoint.capabilities().debug());
639 node.record_string("type", endpoint.endpoint_type().debug());
640 });
641 }
642 self.remote_endpoints = Some(endpoints.iter().map(StreamEndpoint::as_new).collect());
643 }
644
645 fn remote_endpoints(&self) -> Option<Vec<StreamEndpoint>> {
647 self.remote_endpoints.as_ref().map(|v| v.iter().map(StreamEndpoint::as_new).collect())
648 }
649
650 fn remote_endpoint(&self, id: &StreamEndpointId) -> Option<StreamEndpoint> {
652 self.remote_endpoints
653 .as_ref()
654 .and_then(|v| v.iter().find(|v| v.local_id() == id).map(StreamEndpoint::as_new))
655 }
656
657 fn is_streaming(&self) -> bool {
659 self.is_streaming_now() || self.opening.is_some()
660 }
661
662 fn is_streaming_now(&self) -> bool {
664 self.local.streaming().next().is_some()
665 }
666
667 fn set_opening(
668 &mut self,
669 local_id: &StreamEndpointId,
670 remote_id: &StreamEndpointId,
671 capabilities: Vec<ServiceCapability>,
672 ) -> avdtp::Result<()> {
673 if self.opening.is_some() {
674 return Err(avdtp::Error::InvalidState);
675 }
676 let peer_id = self.peer_id;
677 let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
678 stream
679 .configure(&peer_id, &remote_id, capabilities)
680 .map_err(|(cat, c)| avdtp::Error::RequestInvalidExtra(c, (&cat).into()))?;
681 stream.endpoint_mut().establish().or(Err(avdtp::Error::InvalidState))?;
682 self.opening = Some(local_id.clone());
683 Ok(())
684 }
685
686 fn upgrade(weak: Weak<Mutex<Self>>) -> avdtp::Result<Arc<Mutex<Self>>> {
687 weak.upgrade().ok_or(avdtp::Error::PeerDisconnected)
688 }
689
690 async fn start_opened(weak: Weak<Mutex<Self>>) -> avdtp::Result<()> {
692 let (avdtp, stream_pairs) = {
693 let peer = Self::upgrade(weak.clone())?;
694 let peer = peer.lock();
695 let stream_pairs: Vec<(StreamEndpointId, StreamEndpointId)> = peer
696 .local
697 .open()
698 .filter_map(|stream| {
699 let endpoint = stream.endpoint();
700 endpoint.remote_id().map(|id| (endpoint.local_id().clone(), id.clone()))
701 })
702 .collect();
703 (peer.peer.clone(), stream_pairs)
704 };
705 for (local_id, remote_id) in stream_pairs {
706 let permit_result =
707 Self::upgrade(weak.clone())?.lock().get_permit_or_reserve(&local_id);
708 if let Ok(permit) = permit_result {
709 Self::initiated_start(avdtp.clone(), weak.clone(), permit, &local_id, &remote_id)
710 .await?;
711 }
712 }
713 Ok(())
714 }
715
716 async fn start_permit(weak: Weak<Mutex<Self>>, permit: StreamPermit) -> avdtp::Result<()> {
717 let local_id = permit.local_id().clone();
718 let (avdtp, remote_id) = {
719 let peer = Self::upgrade(weak.clone())?;
720 let mut peer = peer.lock();
721 let remote_id = peer
722 .get_mut(&local_id)
723 .map_err(|e| avdtp::Error::RequestInvalid(e))?
724 .endpoint()
725 .remote_id()
726 .ok_or(avdtp::Error::InvalidState)?
727 .clone();
728 (peer.peer.clone(), remote_id)
729 };
730 Self::initiated_start(avdtp, weak, Some(permit), &local_id, &remote_id).await
731 }
732
733 async fn initiated_start(
735 avdtp: avdtp::Peer,
736 weak: Weak<Mutex<Self>>,
737 permit: Option<StreamPermit>,
738 local_id: &StreamEndpointId,
739 remote_id: &StreamEndpointId,
740 ) -> avdtp::Result<()> {
741 trace!(permit:?, local_id:?, remote_id:?; "Making outgoing start request");
742 let to_start = &[remote_id.clone()];
743 avdtp.start(to_start).await?;
744 trace!("Start response received: {permit:?}");
745 let peer = Self::upgrade(weak.clone())?;
746 let (peer_id, start_result) = {
747 let mut peer = peer.lock();
748 (peer.peer_id, peer.start_local_stream(permit, &local_id))
749 };
750 if let Err(e) = start_result {
751 warn!(peer_id:%, local_id:%, remote_id:%, e:?; "Failed to start local stream, suspending");
752 avdtp.suspend(to_start).await?;
753 }
754 Ok(())
755 }
756
757 fn suspend(
759 weak: Weak<Mutex<Self>>,
760 local_id: StreamEndpointId,
761 ) -> impl Future<Output = avdtp::Result<()>> {
762 let res = (move || {
763 let peer = Self::upgrade(weak.clone())?;
764 let mut peer = peer.lock();
765 Ok((peer.peer.clone(), peer.suspend_local_stream(&local_id)?))
766 })();
767 let (avdtp, remote_id) = match res {
768 Err(e) => return futures::future::err(e).left_future(),
769 Ok(r) => r,
770 };
771 let to_suspend = &[remote_id];
772 avdtp.suspend(to_suspend).right_future()
773 }
774
775 pub fn find_compatible_local_capabilities(
778 &self,
779 codec_params: &ServiceCapability,
780 remote_id: &StreamEndpointId,
781 ) -> avdtp::Result<(StreamEndpointId, Vec<ServiceCapability>)> {
782 let config = codec_params.try_into()?;
783 let our_direction = self.remote_endpoint(remote_id).map(|e| e.endpoint_type().opposite());
784 debug!(codec_params:?, local:? = self.local; "Looking for compatible local stream");
785 self.local
786 .compatible(config)
787 .find_map(|s| {
788 let endpoint = s.endpoint();
789 if let Some(d) = our_direction {
790 if &d != endpoint.endpoint_type() {
791 return None;
792 }
793 }
794 Some((endpoint.local_id().clone(), endpoint.capabilities().clone()))
795 })
796 .ok_or(avdtp::Error::OutOfRange)
797 }
798
799 fn get_permit_or_reserve(
803 &self,
804 local_id: &StreamEndpointId,
805 ) -> Result<Option<StreamPermit>, ()> {
806 let Some(permits) = self.permits.as_ref() else {
807 return Ok(None);
808 };
809 if let Some(permit) = permits.get(local_id.clone()) {
810 return Ok(Some(permit));
811 }
812 info!(peer_id:% = self.peer_id, local_id:%; "No permit to start stream, adding a reservation");
813 permits.setup_reservation_for(local_id.clone());
814 Err(())
815 }
816
817 fn start_local_stream(
820 &mut self,
821 permit: Option<StreamPermit>,
822 local_id: &StreamEndpointId,
823 ) -> avdtp::Result<()> {
824 let peer_id = self.peer_id;
825 let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
826 if permit.as_ref().is_some_and(|p| !p.is_held()) {
829 return Err(avdtp::Error::Other(anyhow::format_err!(
830 "streaming permit revoked during setup"
831 )));
832 }
833
834 info!(peer_id:%, stream:?; "Starting");
835 let stream_finished = stream.start().map_err(|c| avdtp::Error::RequestInvalid(c))?;
836 let watched_stream = WatchedStream::new(permit, stream_finished);
838 if self.started.insert(local_id.clone(), watched_stream).is_some() {
839 warn!(peer_id:%, local_id:%; "Stream that was already started");
840 }
841 Ok(())
842 }
843
844 fn suspend_local_stream(
847 &mut self,
848 local_id: &StreamEndpointId,
849 ) -> avdtp::Result<StreamEndpointId> {
850 let peer_id = self.peer_id;
851 let stream = self.get_mut(&local_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
852 let remote_id = stream.endpoint().remote_id().ok_or(avdtp::Error::InvalidState)?.clone();
853 info!(peer_id:%; "Suspend stream local {local_id} <-> {remote_id} remote");
854 stream.suspend().map_err(|c| avdtp::Error::RequestInvalid(c))?;
855 let _ = self.started.remove(local_id);
856 Ok(remote_id)
857 }
858
859 fn receive_channel(&mut self, channel: Channel) -> avdtp::Result<bool> {
864 let stream_id = self.opening.as_ref().cloned().ok_or(avdtp::Error::InvalidState)?;
865 let stream = self.get_mut(&stream_id).map_err(|e| avdtp::Error::RequestInvalid(e))?;
866 let done = !stream.endpoint_mut().receive_channel(channel)?;
867 if done {
868 self.opening = None;
869 }
870 info!(peer_id:% = self.peer_id, stream_id:%; "Transport connected");
871 Ok(done)
872 }
873
874 fn handle_request(
876 &mut self,
877 request: avdtp::Request,
878 ) -> Either<avdtp::Result<()>, impl Future<Output = avdtp::Result<()>>> {
879 use avdtp::ErrorCode;
880 use avdtp::Request::*;
881 trace!("Handling {request:?} from peer..");
882 let immediate_result = 'result: {
883 match request {
884 Discover { responder } => responder.send(&self.local.information()),
885 GetCapabilities { responder, stream_id }
886 | GetAllCapabilities { responder, stream_id } => match self.local.get(&stream_id) {
887 None => responder.reject(ErrorCode::BadAcpSeid),
888 Some(stream) => responder.send(stream.endpoint().capabilities()),
889 },
890 Open { responder, stream_id } => {
891 if self.opening.is_none() {
892 break 'result responder.reject(ErrorCode::BadState);
893 }
894 let Ok(stream) = self.get_mut(&stream_id) else {
895 break 'result responder.reject(ErrorCode::BadAcpSeid);
896 };
897 match stream.endpoint_mut().establish() {
898 Ok(()) => responder.send(),
899 Err(_) => responder.reject(ErrorCode::BadState),
900 }
901 }
902 Close { responder, stream_id } => {
903 let peer = self.peer.clone();
904 let Ok(stream) = self.get_mut(&stream_id) else {
905 break 'result responder.reject(ErrorCode::BadAcpSeid);
906 };
907 stream.release(responder, &peer)
908 }
909 SetConfiguration { responder, local_stream_id, remote_stream_id, capabilities } => {
910 if self.opening.is_some() {
911 break 'result responder.reject(ServiceCategory::None, ErrorCode::BadState);
912 }
913 let peer_id = self.peer_id;
914 let Ok(stream) = self.get_mut(&local_stream_id) else {
915 break 'result responder
916 .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
917 };
918 match stream.configure(&peer_id, &remote_stream_id, capabilities) {
919 Ok(_) => {
920 self.opening = Some(local_stream_id);
921 responder.send()
922 }
923 Err((category, code)) => responder.reject(category, code),
924 }
925 }
926 GetConfiguration { stream_id, responder } => {
927 let Ok(stream) = self.get_mut(&stream_id) else {
928 break 'result responder.reject(ErrorCode::BadAcpSeid);
929 };
930 let Some(vec_capabilities) = stream.endpoint().get_configuration() else {
931 break 'result responder.reject(ErrorCode::BadState);
932 };
933 responder.send(vec_capabilities.as_slice())
934 }
935 Reconfigure { responder, local_stream_id, capabilities } => {
936 let Ok(stream) = self.get_mut(&local_stream_id) else {
937 break 'result responder
938 .reject(ServiceCategory::None, ErrorCode::BadAcpSeid);
939 };
940 match stream.reconfigure(capabilities) {
941 Ok(_) => responder.send(),
942 Err((cat, code)) => responder.reject(cat, code),
943 }
944 }
945 Start { responder, stream_ids } => {
946 let mut immediate_suspend = Vec::new();
947 let result = stream_ids.into_iter().try_for_each(|seid| {
949 let Some(stream) = self.local.get_mut(&seid) else {
950 return Err((seid, ErrorCode::BadAcpSeid));
951 };
952 let remote_id = stream.endpoint().remote_id().cloned();
953 let Some(remote_id) = remote_id else {
954 return Err((seid, ErrorCode::BadState));
955 };
956 let Ok(permit) = self.get_permit_or_reserve(&seid) else {
957 immediate_suspend.push(remote_id);
961 return Ok(());
962 };
963 match self.start_local_stream(permit, &seid) {
964 Ok(()) => Ok(()),
965 Err(avdtp::Error::RequestInvalid(code)) => Err((seid, code)),
966 Err(_) => Err((seid, ErrorCode::BadState)),
967 }
968 });
969 let response_result = match result {
970 Ok(()) => responder.send(),
971 Err((seid, code)) => responder.reject(&seid, code),
972 };
973 {
974 let peer = self.peer.clone();
975 return Either::Right(async move {
976 if !immediate_suspend.is_empty() {
977 peer.suspend(immediate_suspend.as_slice()).await?;
978 }
979 response_result
980 });
981 }
982 }
983 Suspend { responder, stream_ids } => {
984 for seid in stream_ids {
985 match self.suspend_local_stream(&seid) {
986 Ok(_remote_id) => {}
987 Err(avdtp::Error::RequestInvalid(code)) => {
988 break 'result responder.reject(&seid, code)
989 }
990 Err(_e) => break 'result responder.reject(&seid, ErrorCode::BadState),
991 }
992 }
993 responder.send()
994 }
995 Abort { responder, stream_id } => {
996 let Ok(stream) = self.get_mut(&stream_id) else {
997 break 'result Ok(());
999 };
1000 stream.abort();
1001 self.opening = self.opening.take().filter(|local_id| local_id != &stream_id);
1002 responder.send()
1003 }
1004 DelayReport { responder, delay, stream_id } => {
1005 let delay_ns = delay as u64 * 100000;
1007 self.metrics.log_integer(
1009 bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID,
1010 delay_ns.try_into().unwrap_or(-1),
1011 vec![],
1012 );
1013 let Some(stream) = self.local.get_mut(&stream_id) else {
1015 break 'result responder.reject(avdtp::ErrorCode::BadAcpSeid);
1016 };
1017 let delay_str = format!("delay {}.{} ms", delay / 10, delay % 10);
1018 let peer = self.peer_id;
1019 match stream.set_delay(std::time::Duration::from_nanos(delay_ns)) {
1020 Ok(()) => info!(peer:%, stream_id:%; "reported {delay_str}"),
1021 Err(avdtp::ErrorCode::BadState) => {
1022 info!(peer:%, stream_id:%; "bad state {delay_str}");
1023 break 'result responder.reject(avdtp::ErrorCode::BadState);
1024 }
1025 Err(e) => info!(peer:%, stream_id:%, e:?; "failed {delay_str}"),
1026 };
1027 responder.send()
1029 }
1030 }
1031 };
1032 Either::Left(immediate_result)
1033 }
1034}
1035
1036struct WatchedStream {
1039 _permit_task: fasync::Task<()>,
1040}
1041
1042impl WatchedStream {
1043 fn new(
1044 permit: Option<StreamPermit>,
1045 finish_fut: BoxFuture<'static, Result<(), anyhow::Error>>,
1046 ) -> Self {
1047 let permit_task = fasync::Task::spawn(async move {
1048 let _ = finish_fut.await;
1049 drop(permit);
1050 });
1051 Self { _permit_task: permit_task }
1052 }
1053}
1054
1055fn codectype_to_availability_metric(
1056 codec_type: &MediaCodecType,
1057) -> bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec {
1058 match codec_type {
1059 &MediaCodecType::AUDIO_SBC => {
1060 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc
1061 }
1062 &MediaCodecType::AUDIO_MPEG12 => {
1063 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Mpeg12
1064 }
1065 &MediaCodecType::AUDIO_AAC => {
1066 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Aac
1067 }
1068 &MediaCodecType::AUDIO_ATRAC => {
1069 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac
1070 }
1071 &MediaCodecType::AUDIO_NON_A2DP => {
1072 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::VendorSpecific
1073 }
1074 _ => bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown,
1075 }
1076}
1077
1078fn capability_to_metric(
1079 cap: &ServiceCapability,
1080) -> Option<bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability> {
1081 match cap {
1082 ServiceCapability::DelayReporting => {
1083 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport)
1084 }
1085 ServiceCapability::Reporting => {
1086 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Reporting)
1087 }
1088 ServiceCapability::Recovery { .. } => {
1089 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Recovery)
1090 }
1091 ServiceCapability::ContentProtection { .. } => {
1092 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::ContentProtection)
1093 }
1094 ServiceCapability::HeaderCompression { .. } => {
1095 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::HeaderCompression)
1096 }
1097 ServiceCapability::Multiplexing { .. } => {
1098 Some(bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Multiplexing)
1099 }
1100 other => {
1102 trace!("untracked remote peer capability: {:?}", other);
1103 None
1104 }
1105 }
1106}
1107
1108#[cfg(test)]
1109mod tests {
1110 use super::*;
1111
1112 use async_utils::PollExt;
1113 use bt_metrics::respond_to_metrics_req_for_test;
1114 use fidl::endpoints::create_proxy_and_stream;
1115 use fidl_fuchsia_bluetooth::ErrorCode;
1116 use fidl_fuchsia_bluetooth_bredr::{
1117 ProfileMarker, ProfileRequest, ProfileRequestStream, ServiceClassProfileIdentifier,
1118 };
1119 use fidl_fuchsia_metrics::{MetricEvent, MetricEventPayload};
1120 use futures::future::Either;
1121 use std::pin::pin;
1122
1123 use crate::media_task::tests::{TestMediaTask, TestMediaTaskBuilder};
1124 use crate::media_types::*;
1125 use crate::stream::tests::{make_sbc_endpoint, sbc_mediacodec_capability};
1126
1127 fn fake_metrics(
1128 ) -> (bt_metrics::MetricsLogger, fidl_fuchsia_metrics::MetricEventLoggerRequestStream) {
1129 let (c, s) = fidl::endpoints::create_proxy_and_stream::<
1130 fidl_fuchsia_metrics::MetricEventLoggerMarker,
1131 >();
1132 (bt_metrics::MetricsLogger::from_proxy(c), s)
1133 }
1134
1135 fn setup_avdtp_peer() -> (avdtp::Peer, Channel) {
1136 let (remote, signaling) = Channel::create();
1137 let peer = avdtp::Peer::new(signaling);
1138 (peer, remote)
1139 }
1140
1141 fn build_test_streams() -> Streams {
1142 let mut streams = Streams::default();
1143 let source = Stream::build(
1144 make_sbc_endpoint(1, avdtp::EndpointType::Source),
1145 TestMediaTaskBuilder::new_delayable().builder(),
1146 );
1147 streams.insert(source);
1148 let sink = Stream::build(
1149 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
1150 TestMediaTaskBuilder::new().builder(),
1151 );
1152 streams.insert(sink);
1153 streams
1154 }
1155
1156 fn build_test_streams_delayable() -> Streams {
1157 fn with_delay(seid: u8, direction: avdtp::EndpointType) -> StreamEndpoint {
1158 StreamEndpoint::new(
1159 seid,
1160 avdtp::MediaType::Audio,
1161 direction,
1162 vec![
1163 avdtp::ServiceCapability::MediaTransport,
1164 avdtp::ServiceCapability::DelayReporting,
1165 sbc_mediacodec_capability(),
1166 ],
1167 )
1168 .expect("endpoint creation should succeed")
1169 }
1170 let mut streams = Streams::default();
1171 let source = Stream::build(
1172 with_delay(1, avdtp::EndpointType::Source),
1173 TestMediaTaskBuilder::new_delayable().builder(),
1174 );
1175 streams.insert(source);
1176 let sink = Stream::build(
1177 with_delay(2, avdtp::EndpointType::Sink),
1178 TestMediaTaskBuilder::new().builder(),
1179 );
1180 streams.insert(sink);
1181 streams
1182 }
1183
1184 pub(crate) fn recv_remote(remote: &Channel) -> Result<Vec<u8>, zx::Status> {
1185 remote.read_packet()
1186 }
1187
1188 fn setup_test_peer(
1191 use_cobalt: bool,
1192 streams: Streams,
1193 permits: Option<Permits>,
1194 ) -> (
1195 Channel,
1196 ProfileRequestStream,
1197 Option<fidl_fuchsia_metrics::MetricEventLoggerRequestStream>,
1198 Peer,
1199 ) {
1200 let (avdtp, remote) = setup_avdtp_peer();
1201 let (metrics_logger, cobalt_receiver) = if use_cobalt {
1202 let (l, r) = fake_metrics();
1203 (l, Some(r))
1204 } else {
1205 (bt_metrics::MetricsLogger::default(), None)
1206 };
1207 let (profile_proxy, requests) = create_proxy_and_stream::<ProfileMarker>();
1208 let peer = Peer::create(PeerId(1), avdtp, streams, permits, profile_proxy, metrics_logger);
1209
1210 (remote, requests, cobalt_receiver, peer)
1211 }
1212
1213 fn expect_get_capabilities_and_respond(
1214 remote: &Channel,
1215 expected_seid: u8,
1216 response_capabilities: &[u8],
1217 ) {
1218 let received = recv_remote(&remote).unwrap();
1219 assert_eq!(0x00, received[0] & 0xF);
1221 assert_eq!(0x02, received[1]); assert_eq!(expected_seid << 2, received[2]);
1223
1224 let txlabel_raw = received[0] & 0xF0;
1225
1226 #[rustfmt::skip]
1228 let mut get_capabilities_rsp = vec![
1229 txlabel_raw << 4 | 0x2, 0x02 ];
1232
1233 get_capabilities_rsp.extend_from_slice(response_capabilities);
1234
1235 assert!(remote.write(&get_capabilities_rsp).is_ok());
1236 }
1237
1238 fn expect_get_all_capabilities_and_respond(
1239 remote: &Channel,
1240 expected_seid: u8,
1241 response_capabilities: &[u8],
1242 ) {
1243 let received = recv_remote(&remote).unwrap();
1244 assert_eq!(0x00, received[0] & 0xF);
1246 assert_eq!(0x0C, received[1]); assert_eq!(expected_seid << 2, received[2]);
1248
1249 let txlabel_raw = received[0] & 0xF0;
1250
1251 #[rustfmt::skip]
1253 let mut get_capabilities_rsp = vec![
1254 txlabel_raw << 4 | 0x2, 0x0C ];
1257
1258 get_capabilities_rsp.extend_from_slice(response_capabilities);
1259
1260 assert!(remote.write(&get_capabilities_rsp).is_ok());
1261 }
1262
1263 #[fuchsia::test]
1264 fn disconnected() {
1265 let mut exec = fasync::TestExecutor::new();
1266 let (proxy, _stream) = create_proxy_and_stream::<ProfileMarker>();
1267 let (remote, signaling) = Channel::create();
1268
1269 let id = PeerId(1);
1270
1271 let avdtp = avdtp::Peer::new(signaling);
1272 let peer = Peer::create(
1273 id,
1274 avdtp,
1275 Streams::default(),
1276 None,
1277 proxy,
1278 bt_metrics::MetricsLogger::default(),
1279 );
1280
1281 let closed_fut = peer.closed();
1282
1283 let mut closed_fut = pin!(closed_fut);
1284
1285 assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
1286
1287 drop(remote);
1289
1290 assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
1291 }
1292
1293 #[fuchsia::test]
1294 fn peer_collect_capabilities_success() {
1295 let mut exec = fasync::TestExecutor::new();
1296
1297 let (remote, _, cobalt_receiver, peer) = setup_test_peer(true, build_test_streams(), None);
1298
1299 let p: ProfileDescriptor = ProfileDescriptor {
1300 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1301 major_version: Some(1),
1302 minor_version: Some(2),
1303 ..Default::default()
1304 };
1305 let _ = peer.set_descriptor(p);
1306
1307 let collect_future = peer.collect_capabilities();
1308 let mut collect_future = pin!(collect_future);
1309
1310 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1311
1312 let received = recv_remote(&remote).unwrap();
1314 assert_eq!(0x00, received[0] & 0xF);
1316 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1319
1320 let response: &[u8] = &[
1322 txlabel_raw << 4 | 0x0 << 2 | 0x2, 0x01, 0x3E << 2 | 0x0 << 1, 0x00 << 4 | 0x1 << 3, 0x01 << 2 | 0x1 << 1, 0x00 << 4 | 0x1 << 3, ];
1329 assert!(remote.write(response).is_ok());
1330
1331 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1332
1333 #[rustfmt::skip]
1335 let capabilities_rsp = &[
1336 0x01, 0x00,
1338 0x07, 0x06, 0x00, 0x04, 0xF0, 0x9F, 0x92, 0x96
1340 ];
1341 expect_get_capabilities_and_respond(&remote, 0x3E, capabilities_rsp);
1342
1343 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1344
1345 #[rustfmt::skip]
1347 let capabilities_rsp = &[
1348 0x01, 0x00,
1350 0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1352 ];
1353 expect_get_capabilities_and_respond(&remote, 0x01, capabilities_rsp);
1354
1355 match exec.run_until_stalled(&mut collect_future) {
1356 Poll::Pending => panic!("collect capabilities should be complete"),
1357 Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1358 Poll::Ready(Ok(endpoints)) => {
1359 let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1360 let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1361 for stream in endpoints {
1362 if stream.local_id() == &first_seid {
1363 let expected_caps = vec![
1364 ServiceCapability::MediaTransport,
1365 ServiceCapability::MediaCodec {
1366 media_type: avdtp::MediaType::Audio,
1367 codec_type: avdtp::MediaCodecType::new(0x04),
1368 codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1369 },
1370 ];
1371 assert_eq!(&expected_caps, stream.capabilities());
1372 } else if stream.local_id() == &second_seid {
1373 let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1374 assert_eq!(Some(&expected_codec_type), stream.codec_type());
1375 } else {
1376 panic!("Unexpected endpoint in the streams collected");
1377 }
1378 }
1379 }
1380 }
1381
1382 let mut recv = cobalt_receiver.expect("should have receiver");
1384 let mut log_events = Vec::new();
1385 while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1386 log_events.push(respond_to_metrics_req_for_test(req));
1387 }
1388
1389 assert_eq!(3, log_events.len());
1391 assert!(log_events.contains(&MetricEvent {
1392 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1393 event_codes: vec![
1394 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1395 ],
1396 payload: MetricEventPayload::Count(1),
1397 }));
1398 assert!(log_events.contains(&MetricEvent {
1399 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1400 event_codes: vec![
1401 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Atrac as u32
1402 ],
1403 payload: MetricEventPayload::Count(1),
1404 }));
1405 assert!(log_events.contains(&MetricEvent {
1406 metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1407 event_codes: vec![
1408 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1409 ],
1410 payload: MetricEventPayload::Count(1),
1411 }));
1412
1413 let collect_future = peer.collect_capabilities();
1415 let mut collect_future = pin!(collect_future);
1416
1417 match exec.run_until_stalled(&mut collect_future) {
1418 Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1419 x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1420 };
1421 }
1422
1423 #[fuchsia::test]
1424 fn peer_collect_all_capabilities_success() {
1425 let mut exec = fasync::TestExecutor::new();
1426
1427 let (remote, _, cobalt_receiver, peer) = setup_test_peer(true, build_test_streams(), None);
1428 let p: ProfileDescriptor = ProfileDescriptor {
1429 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
1430 major_version: Some(1),
1431 minor_version: Some(3),
1432 ..Default::default()
1433 };
1434 let _ = peer.set_descriptor(p);
1435
1436 let collect_future = peer.collect_capabilities();
1437 let mut collect_future = pin!(collect_future);
1438
1439 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1440
1441 let received = recv_remote(&remote).unwrap();
1443 assert_eq!(0x00, received[0] & 0xF);
1445 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1448
1449 let response: &[u8] = &[
1451 txlabel_raw << 4 | 0x0 << 2 | 0x2, 0x01, 0x3E << 2 | 0x0 << 1, 0x00 << 4 | 0x1 << 3, 0x01 << 2 | 0x1 << 1, 0x00 << 4 | 0x1 << 3, ];
1458 assert!(remote.write(response).is_ok());
1459
1460 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1461
1462 #[rustfmt::skip]
1464 let capabilities_rsp = &[
1465 0x01, 0x00,
1467 0x07, 0x06, 0x00, 0x40, 0xF0, 0x9F, 0x92, 0x96,
1469 0x08, 0x00
1471 ];
1472 expect_get_all_capabilities_and_respond(&remote, 0x3E, capabilities_rsp);
1473
1474 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1475
1476 #[rustfmt::skip]
1478 let capabilities_rsp = &[
1479 0x01, 0x00,
1481 0x07, 0x04, 0x00, 0x00, 0xC0, 0xDE
1483 ];
1484 expect_get_all_capabilities_and_respond(&remote, 0x01, capabilities_rsp);
1485
1486 match exec.run_until_stalled(&mut collect_future) {
1487 Poll::Pending => panic!("collect capabilities should be complete"),
1488 Poll::Ready(Err(e)) => panic!("collect capabilities should have succeeded: {}", e),
1489 Poll::Ready(Ok(endpoints)) => {
1490 let first_seid: StreamEndpointId = 0x3E_u8.try_into().unwrap();
1491 let second_seid: StreamEndpointId = 0x01_u8.try_into().unwrap();
1492 for stream in endpoints {
1493 if stream.local_id() == &first_seid {
1494 let expected_caps = vec![
1495 ServiceCapability::MediaTransport,
1496 ServiceCapability::MediaCodec {
1497 media_type: avdtp::MediaType::Audio,
1498 codec_type: avdtp::MediaCodecType::new(0x40),
1499 codec_extra: vec![0xF0, 0x9F, 0x92, 0x96],
1500 },
1501 ServiceCapability::DelayReporting,
1502 ];
1503 assert_eq!(&expected_caps, stream.capabilities());
1504 } else if stream.local_id() == &second_seid {
1505 let expected_codec_type = avdtp::MediaCodecType::new(0x00);
1506 assert_eq!(Some(&expected_codec_type), stream.codec_type());
1507 } else {
1508 panic!("Unexpected endpoint in the streams collected");
1509 }
1510 }
1511 }
1512 }
1513
1514 let mut recv = cobalt_receiver.expect("should have receiver");
1516 let mut log_events = Vec::new();
1517 while let Poll::Ready(Some(Ok(req))) = exec.run_until_stalled(&mut recv.next()) {
1518 log_events.push(respond_to_metrics_req_for_test(req));
1519 }
1520
1521 assert_eq!(4, log_events.len());
1523 assert!(log_events.contains(&MetricEvent {
1524 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1525 event_codes: vec![
1526 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Unknown as u32
1527 ],
1528 payload: MetricEventPayload::Count(1),
1529 }));
1530 assert!(log_events.contains(&MetricEvent {
1531 metric_id: bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID,
1532 event_codes: vec![
1533 bt_metrics::A2dpCodecAvailabilityMigratedMetricDimensionCodec::Sbc as u32
1534 ],
1535 payload: MetricEventPayload::Count(1),
1536 }));
1537 assert!(log_events.contains(&MetricEvent {
1538 metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1539 event_codes: vec![
1540 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::Basic as u32
1541 ],
1542 payload: MetricEventPayload::Count(1),
1543 }));
1544 assert!(log_events.contains(&MetricEvent {
1545 metric_id: bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID,
1546 event_codes: vec![
1547 bt_metrics::A2dpRemotePeerCapabilitiesMetricDimensionCapability::DelayReport as u32
1548 ],
1549 payload: MetricEventPayload::Count(1),
1550 }));
1551
1552 let collect_future = peer.collect_capabilities();
1554 let mut collect_future = pin!(collect_future);
1555
1556 match exec.run_until_stalled(&mut collect_future) {
1557 Poll::Ready(Ok(endpoints)) => assert_eq!(2, endpoints.len()),
1558 x => panic!("Expected get remote capabilities to be done, got {:?}", x),
1559 };
1560 }
1561
1562 #[fuchsia::test]
1563 fn peer_collect_capabilities_discovery_fails() {
1564 let mut exec = fasync::TestExecutor::new();
1565
1566 let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1567
1568 let collect_future = peer.collect_capabilities();
1569 let mut collect_future = pin!(collect_future);
1570
1571 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1573
1574 let received = recv_remote(&remote).unwrap();
1576 assert_eq!(0x00, received[0] & 0xF);
1578 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1581
1582 let response: &[u8] = &[
1584 txlabel_raw | 0x0 << 2 | 0x3, 0x01, 0x31, ];
1588 assert!(remote.write(response).is_ok());
1589
1590 match exec.run_until_stalled(&mut collect_future) {
1593 Poll::Pending => panic!("Should be ready after discovery failure"),
1594 Poll::Ready(Ok(x)) => panic!("Should be an error but returned {x:?}"),
1595 Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
1596 assert_eq!(Some(Ok(avdtp::ErrorCode::BadState)), e.error_code());
1597 }
1598 Poll::Ready(Err(e)) => panic!("Should have been a RemoteRejected was was {e:?}"),
1599 }
1600 }
1601
1602 #[fuchsia::test]
1603 fn peer_collect_capabilities_get_capability_fails() {
1604 let mut exec = fasync::TestExecutor::new();
1605
1606 let (remote, _, _, peer) = setup_test_peer(true, build_test_streams(), None);
1607
1608 let collect_future = peer.collect_capabilities();
1609 let mut collect_future = pin!(collect_future);
1610
1611 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1613
1614 let received = recv_remote(&remote).unwrap();
1616 assert_eq!(0x00, received[0] & 0xF);
1618 assert_eq!(0x01, received[1]); let txlabel_raw = received[0] & 0xF0;
1621
1622 let response: &[u8] = &[
1624 txlabel_raw << 4 | 0x0 << 2 | 0x2, 0x01, 0x3E << 2 | 0x0 << 1, 0x00 << 4 | 0x1 << 3, 0x01 << 2 | 0x1 << 1, 0x00 << 4 | 0x1 << 3, ];
1631 assert!(remote.write(response).is_ok());
1632
1633 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1634
1635 let expected_seid = 0x3E;
1637 let received = recv_remote(&remote).unwrap();
1638 assert_eq!(0x00, received[0] & 0xF);
1640 assert_eq!(0x02, received[1]); assert_eq!(expected_seid << 2, received[2]);
1642
1643 let txlabel_raw = received[0] & 0xF0;
1644
1645 let response: &[u8] = &[
1646 txlabel_raw | 0x0 << 2 | 0x3, 0x02, 0x12, ];
1650 assert!(remote.write(response).is_ok());
1651
1652 assert!(exec.run_until_stalled(&mut collect_future).is_pending());
1653
1654 let expected_seid = 0x01;
1656 let received = recv_remote(&remote).unwrap();
1657 assert_eq!(0x00, received[0] & 0xF);
1659 assert_eq!(0x02, received[1]); assert_eq!(expected_seid << 2, received[2]);
1661
1662 let txlabel_raw = received[0] & 0xF0;
1663
1664 let response: &[u8] = &[
1665 txlabel_raw | 0x0 << 2 | 0x3, 0x02, 0x12, ];
1669 assert!(remote.write(response).is_ok());
1670
1671 match exec.run_until_stalled(&mut collect_future) {
1673 Poll::Pending => panic!("Should be ready after discovery failure"),
1674 Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1675 Poll::Ready(Ok(map)) => assert_eq!(0, map.len()),
1676 }
1677 }
1678
1679 fn receive_simple_accept(remote: &Channel, signal_id: u8) {
1680 let received = recv_remote(&remote).expect("expected a packet");
1681 assert_eq!(0x00, received[0] & 0xF);
1683 assert_eq!(signal_id, received[1]);
1684
1685 let txlabel_raw = received[0] & 0xF0;
1686
1687 let response: &[u8] = &[
1688 txlabel_raw | 0x0 << 2 | 0x2, signal_id,
1690 ];
1691 assert!(remote.write(response).is_ok());
1692 }
1693
1694 #[fuchsia::test]
1695 fn peer_stream_start_success() {
1696 let mut exec = fasync::TestExecutor::new();
1697
1698 let (remote, mut profile_request_stream, _, peer) =
1699 setup_test_peer(false, build_test_streams(), None);
1700
1701 let remote_seid = 2_u8.try_into().unwrap();
1702
1703 let codec_params = ServiceCapability::MediaCodec {
1704 media_type: avdtp::MediaType::Audio,
1705 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1706 codec_extra: vec![0x11, 0x45, 51, 51],
1707 };
1708
1709 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1710 let mut start_future = pin!(start_future);
1711
1712 match exec.run_until_stalled(&mut start_future) {
1713 Poll::Pending => {}
1714 x => panic!("Expected pending, but got {x:?}"),
1715 };
1716
1717 receive_simple_accept(&remote, 0x03); assert!(exec.run_until_stalled(&mut start_future).is_pending());
1720
1721 receive_simple_accept(&remote, 0x06); match exec.run_until_stalled(&mut start_future) {
1724 Poll::Pending => {}
1725 Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1726 Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1727 };
1728
1729 let (_, transport) = Channel::create();
1731
1732 let request = exec.run_until_stalled(&mut profile_request_stream.next());
1733 match request {
1734 Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
1735 assert_eq!(PeerId(1), peer_id.into());
1736 assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
1737 let channel = transport.try_into().unwrap();
1738 responder.send(Ok(channel)).expect("responder sends");
1739 }
1740 x => panic!("Should have sent a open l2cap request, but got {:?}", x),
1741 };
1742
1743 match exec.run_until_stalled(&mut start_future) {
1744 Poll::Pending => {}
1745 Poll::Ready(Err(e)) => panic!("Expected to be pending but error: {:?}", e),
1746 Poll::Ready(Ok(_)) => panic!("Expected to be pending but finished!"),
1747 };
1748
1749 receive_simple_accept(&remote, 0x07); match exec.run_until_stalled(&mut start_future) {
1754 Poll::Pending => panic!("Should be ready after start succeeds"),
1755 Poll::Ready(Err(e)) => panic!("Shouldn't be an error but returned {:?}", e),
1756 Poll::Ready(Ok(())) => {
1758 assert!(peer.is_streaming_now());
1759 }
1760 }
1761 }
1762
1763 #[fuchsia::test]
1764 fn peer_stream_start_picks_correct_direction() {
1765 let mut exec = fasync::TestExecutor::new();
1766
1767 let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1768 let remote = avdtp::Peer::new(remote);
1769 let mut remote_events = remote.take_request_stream();
1770
1771 fn remote_handle_request(req: avdtp::Request) {
1773 let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1774 let res = match req {
1775 avdtp::Request::Discover { responder } => {
1776 let infos = [avdtp::StreamInformation::new(
1777 expected_stream_id,
1778 false,
1779 avdtp::MediaType::Audio,
1780 avdtp::EndpointType::Source,
1781 )];
1782 responder.send(&infos)
1783 }
1784 avdtp::Request::GetAllCapabilities { stream_id, responder }
1785 | avdtp::Request::GetCapabilities { stream_id, responder } => {
1786 assert_eq!(expected_stream_id, stream_id);
1787 let caps = vec![
1788 ServiceCapability::MediaTransport,
1789 ServiceCapability::MediaCodec {
1790 media_type: avdtp::MediaType::Audio,
1791 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1792 codec_extra: vec![0x11, 0x45, 51, 250],
1793 },
1794 ];
1795 responder.send(&caps[..])
1796 }
1797 avdtp::Request::Open { responder, stream_id } => {
1798 assert_eq!(expected_stream_id, stream_id);
1799 responder.send()
1800 }
1801 avdtp::Request::SetConfiguration {
1802 responder,
1803 local_stream_id,
1804 remote_stream_id,
1805 ..
1806 } => {
1807 assert_eq!(local_stream_id, expected_stream_id);
1808 assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1810 responder.send()
1811 }
1812 x => panic!("Unexpected request: {:?}", x),
1813 };
1814 res.expect("should be able to respond");
1815 }
1816
1817 let collect_capabilities_fut = peer.collect_capabilities();
1819 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1820
1821 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1822
1823 let request = exec.run_singlethreaded(&mut remote_events.next());
1824 remote_handle_request(request.expect("should have a discovery request").unwrap());
1825
1826 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1827 let request = exec.run_singlethreaded(&mut remote_events.next());
1828 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1829
1830 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1831
1832 let remote_seid = 4_u8.try_into().unwrap();
1834
1835 let codec_params = ServiceCapability::MediaCodec {
1836 media_type: avdtp::MediaType::Audio,
1837 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1838 codec_extra: vec![0x11, 0x45, 51, 51],
1839 };
1840 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
1841 let mut start_future = pin!(start_future);
1842
1843 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1844 let request = exec.run_singlethreaded(&mut remote_events.next());
1845 remote_handle_request(request.expect("should have a set_capabilities request").unwrap());
1846
1847 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1848 let request = exec.run_singlethreaded(&mut remote_events.next());
1849 remote_handle_request(request.expect("should have an open request").unwrap());
1850 }
1851
1852 #[fuchsia::test]
1853 fn peer_stream_start_strips_unsupported_local_capabilities() {
1854 let mut exec = fasync::TestExecutor::new();
1855
1856 let (remote, _, _, peer) = setup_test_peer(false, build_test_streams(), None);
1857 let remote = avdtp::Peer::new(remote);
1858 let mut remote_events = remote.take_request_stream();
1859
1860 fn remote_handle_request(req: avdtp::Request) {
1862 let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1863 let res = match req {
1864 avdtp::Request::Discover { responder } => {
1865 let infos = [avdtp::StreamInformation::new(
1866 expected_stream_id,
1867 false,
1868 avdtp::MediaType::Audio,
1869 avdtp::EndpointType::Source,
1870 )];
1871 responder.send(&infos)
1872 }
1873 avdtp::Request::GetAllCapabilities { stream_id, responder }
1874 | avdtp::Request::GetCapabilities { stream_id, responder } => {
1875 assert_eq!(expected_stream_id, stream_id);
1876 let caps = vec![
1877 ServiceCapability::MediaTransport,
1878 ServiceCapability::DelayReporting,
1880 ServiceCapability::MediaCodec {
1881 media_type: avdtp::MediaType::Audio,
1882 codec_type: avdtp::MediaCodecType::AUDIO_AAC,
1883 codec_extra: vec![128, 0, 132, 134, 0, 0],
1884 },
1885 ];
1886 responder.send(&caps[..])
1887 }
1888 avdtp::Request::Open { responder, stream_id } => {
1889 assert_eq!(expected_stream_id, stream_id);
1890 responder.send()
1891 }
1892 avdtp::Request::SetConfiguration {
1893 responder,
1894 local_stream_id,
1895 remote_stream_id,
1896 capabilities,
1897 } => {
1898 assert_eq!(local_stream_id, expected_stream_id);
1899 assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1901 assert!(!capabilities.contains(&ServiceCapability::DelayReporting));
1904 responder.send()
1905 }
1906 x => panic!("Unexpected request: {:?}", x),
1907 };
1908 res.expect("should be able to respond");
1909 }
1910
1911 let collect_capabilities_fut = peer.collect_capabilities();
1913 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
1914
1915 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1916
1917 let request = exec.run_singlethreaded(&mut remote_events.next());
1918 remote_handle_request(request.expect("should have a discovery request").unwrap());
1919
1920 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
1921 let request = exec.run_singlethreaded(&mut remote_events.next());
1922 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
1923
1924 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
1925
1926 let remote_seid = 4_u8.try_into().unwrap();
1928
1929 let codec_params = ServiceCapability::MediaCodec {
1930 media_type: avdtp::MediaType::Audio,
1931 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1932 codec_extra: vec![0x11, 0x45, 51, 51],
1933 };
1934 let start_future =
1935 peer.stream_start(remote_seid, vec![codec_params, ServiceCapability::DelayReporting]);
1936 let mut start_future = pin!(start_future);
1937
1938 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1939 let request = exec.run_singlethreaded(&mut remote_events.next());
1940 remote_handle_request(request.expect("should have a set_configuration request").unwrap());
1941
1942 assert!(exec.run_until_stalled(&mut start_future).is_pending());
1943 let request = exec.run_singlethreaded(&mut remote_events.next());
1944 remote_handle_request(request.expect("should have an open request").unwrap());
1945 }
1946
1947 #[fuchsia::test]
1948 fn peer_stream_start_orders_local_capabilities() {
1949 let mut exec = fasync::TestExecutor::new();
1950
1951 let (remote, _, _, peer) = setup_test_peer(false, build_test_streams_delayable(), None);
1952 let remote = avdtp::Peer::new(remote);
1953 let mut remote_events = remote.take_request_stream();
1954
1955 fn remote_handle_request(req: avdtp::Request) {
1957 let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
1958 let res = match req {
1959 avdtp::Request::Discover { responder } => {
1960 let infos = [avdtp::StreamInformation::new(
1961 expected_stream_id,
1962 false,
1963 avdtp::MediaType::Audio,
1964 avdtp::EndpointType::Source,
1965 )];
1966 responder.send(&infos)
1967 }
1968 avdtp::Request::GetAllCapabilities { stream_id, responder }
1969 | avdtp::Request::GetCapabilities { stream_id, responder } => {
1970 assert_eq!(expected_stream_id, stream_id);
1971 let caps = &[
1972 ServiceCapability::MediaTransport,
1973 ServiceCapability::MediaCodec {
1974 media_type: avdtp::MediaType::Audio,
1975 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
1976 codec_extra: vec![0x11, 0x45, 51, 250],
1977 },
1978 ServiceCapability::DelayReporting,
1979 ];
1980 responder.send(caps)
1981 }
1982 avdtp::Request::Open { responder, stream_id } => {
1983 assert_eq!(expected_stream_id, stream_id);
1984 responder.send()
1985 }
1986 avdtp::Request::SetConfiguration {
1987 responder,
1988 local_stream_id,
1989 remote_stream_id,
1990 capabilities,
1991 } => {
1992 assert_eq!(local_stream_id, expected_stream_id);
1993 assert_eq!(remote_stream_id, 2_u8.try_into().unwrap());
1995 let mut capabilities_ordered = capabilities.clone();
1997 capabilities_ordered.sort_by_key(ServiceCapability::category);
1998 assert_eq!(capabilities, capabilities_ordered);
1999 responder.send()
2000 }
2001 x => panic!("Unexpected request: {:?}", x),
2002 };
2003 res.expect("should be able to respond");
2004 }
2005
2006 let collect_capabilities_fut = peer.collect_capabilities();
2008 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2009
2010 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2011
2012 let request = exec.run_singlethreaded(&mut remote_events.next());
2013 remote_handle_request(request.expect("should have a discovery request").unwrap());
2014
2015 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2016 let request = exec.run_singlethreaded(&mut remote_events.next());
2017 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2018
2019 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2020
2021 let remote_seid = 4_u8.try_into().unwrap();
2023
2024 let codec_params = ServiceCapability::MediaCodec {
2025 media_type: avdtp::MediaType::Audio,
2026 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2027 codec_extra: vec![0x11, 0x45, 51, 51],
2028 };
2029 let start_future = peer.stream_start(
2030 remote_seid,
2031 vec![
2032 ServiceCapability::MediaTransport,
2033 ServiceCapability::DelayReporting,
2034 codec_params,
2035 ],
2036 );
2037 let mut start_future = pin!(start_future);
2038
2039 assert!(exec.run_until_stalled(&mut start_future).is_pending());
2040 let request = exec.run_singlethreaded(&mut remote_events.next());
2041 remote_handle_request(request.expect("should have a set_configuration request").unwrap());
2042
2043 assert!(exec.run_until_stalled(&mut start_future).is_pending());
2044 let request = exec.run_singlethreaded(&mut remote_events.next());
2045 remote_handle_request(request.expect("should have an open request").unwrap());
2046 }
2047
2048 #[fuchsia::test]
2051 fn peer_stream_start_permit_revoked() {
2052 let mut exec = fasync::TestExecutor::new();
2053
2054 let test_permits = Permits::new(1);
2055 let (remote, mut profile_request_stream, _, peer) =
2056 setup_test_peer(false, build_test_streams(), Some(test_permits.clone()));
2057
2058 let remote_seid = 2_u8.try_into().unwrap();
2059
2060 let codec_params = ServiceCapability::MediaCodec {
2061 media_type: avdtp::MediaType::Audio,
2062 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2063 codec_extra: vec![0x11, 0x45, 51, 51],
2064 };
2065
2066 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2067 let mut start_future = pin!(start_future);
2068
2069 let _ = exec
2070 .run_until_stalled(&mut start_future)
2071 .expect_pending("waiting for set config response");
2072 receive_simple_accept(&remote, 0x03); exec.run_until_stalled(&mut start_future).expect_pending("waiting for open response");
2074 receive_simple_accept(&remote, 0x06); exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
2076 assert!(!peer.is_streaming_now());
2077
2078 let (_, transport) = Channel::create();
2080
2081 let request = exec.run_until_stalled(&mut profile_request_stream.next());
2082 match request {
2083 Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, connection, responder }))) => {
2084 assert_eq!(PeerId(1), peer_id.into());
2085 assert_eq!(connection, ConnectParameters::L2cap(Peer::transport_channel_params()));
2086 let channel = transport.try_into().unwrap();
2087 responder.send(Ok(channel)).expect("responder sends");
2088 }
2089 x => panic!("Should have sent a open l2cap request, but got {:?}", x),
2090 };
2091
2092 exec.run_until_stalled(&mut start_future).expect_pending("waiting for media transport");
2093 assert!(!peer.is_streaming_now());
2094
2095 let seized_permits = test_permits.seize();
2097 assert_eq!(seized_permits.len(), 1);
2098 receive_simple_accept(&remote, 0x07); exec.run_until_stalled(&mut start_future)
2103 .expect_pending("waiting to send outgoing suspend");
2104 assert!(!peer.is_streaming_now());
2105 receive_simple_accept(&remote, 0x09); let () = exec
2111 .run_until_stalled(&mut start_future)
2112 .expect("start finished")
2113 .expect("suspended stream is ok");
2114 assert!(!peer.is_streaming_now());
2115 }
2116
2117 #[fuchsia::test]
2118 fn peer_stream_start_fails_wrong_direction() {
2119 let mut exec = fasync::TestExecutor::new();
2120
2121 let mut streams = Streams::default();
2123 let source = Stream::build(
2124 make_sbc_endpoint(1, avdtp::EndpointType::Source),
2125 TestMediaTaskBuilder::new().builder(),
2126 );
2127 streams.insert(source);
2128
2129 let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2130 let remote = avdtp::Peer::new(remote);
2131 let mut remote_events = remote.take_request_stream();
2132
2133 fn remote_handle_request(req: avdtp::Request) {
2135 let expected_stream_id: StreamEndpointId = 2_u8.try_into().unwrap();
2136 let res = match req {
2137 avdtp::Request::Discover { responder } => {
2138 let infos = [avdtp::StreamInformation::new(
2139 expected_stream_id,
2140 false,
2141 avdtp::MediaType::Audio,
2142 avdtp::EndpointType::Source,
2143 )];
2144 responder.send(&infos)
2145 }
2146 avdtp::Request::GetAllCapabilities { stream_id, responder }
2147 | avdtp::Request::GetCapabilities { stream_id, responder } => {
2148 assert_eq!(expected_stream_id, stream_id);
2149 let caps = vec![
2150 ServiceCapability::MediaTransport,
2151 ServiceCapability::MediaCodec {
2152 media_type: avdtp::MediaType::Audio,
2153 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2154 codec_extra: vec![0x11, 0x45, 51, 250],
2155 },
2156 ];
2157 responder.send(&caps[..])
2158 }
2159 avdtp::Request::Open { responder, .. } => responder.send(),
2160 avdtp::Request::SetConfiguration { responder, .. } => responder.send(),
2161 x => panic!("Unexpected request: {:?}", x),
2162 };
2163 res.expect("should be able to respond");
2164 }
2165
2166 let collect_capabilities_fut = peer.collect_capabilities();
2168 let mut collect_capabilities_fut = pin!(collect_capabilities_fut);
2169
2170 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2171
2172 let request = exec.run_singlethreaded(&mut remote_events.next());
2173 remote_handle_request(request.expect("should have a discovery request").unwrap());
2174
2175 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_pending());
2176 let request = exec.run_singlethreaded(&mut remote_events.next());
2177 remote_handle_request(request.expect("should have a get_capabilities request").unwrap());
2178
2179 assert!(exec.run_until_stalled(&mut collect_capabilities_fut).is_ready());
2180
2181 let remote_seid = 2_u8.try_into().unwrap();
2183
2184 let codec_params = ServiceCapability::MediaCodec {
2185 media_type: avdtp::MediaType::Audio,
2186 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2187 codec_extra: vec![0x11, 0x45, 51, 51],
2188 };
2189 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2190 let mut start_future = pin!(start_future);
2191
2192 match exec.run_until_stalled(&mut start_future) {
2193 Poll::Ready(Err(avdtp::Error::OutOfRange)) => {}
2194 x => panic!("Expected a ready OutOfRange error but got {:?}", x),
2195 };
2196 }
2197
2198 #[fuchsia::test]
2199 fn peer_stream_start_fails_to_connect() {
2200 let mut exec = fasync::TestExecutor::new();
2201
2202 let (remote, mut profile_request_stream, _, peer) =
2203 setup_test_peer(false, build_test_streams(), None);
2204
2205 let remote_seid = 2_u8.try_into().unwrap();
2206
2207 let codec_params = ServiceCapability::MediaCodec {
2208 media_type: avdtp::MediaType::Audio,
2209 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2210 codec_extra: vec![0x11, 0x45, 51, 51],
2211 };
2212
2213 let start_future = peer.stream_start(remote_seid, vec![codec_params]);
2214 let mut start_future = pin!(start_future);
2215
2216 match exec.run_until_stalled(&mut start_future) {
2217 Poll::Pending => {}
2218 x => panic!("was expecting pending but got {x:?}"),
2219 };
2220
2221 receive_simple_accept(&remote, 0x03); assert!(exec.run_until_stalled(&mut start_future).is_pending());
2224
2225 receive_simple_accept(&remote, 0x06); match exec.run_until_stalled(&mut start_future) {
2228 Poll::Pending => {}
2229 Poll::Ready(x) => panic!("Expected to be pending but {x:?}"),
2230 };
2231
2232 let request = exec.run_until_stalled(&mut profile_request_stream.next());
2234 match request {
2235 Poll::Ready(Some(Ok(ProfileRequest::Connect { peer_id, responder, .. }))) => {
2236 assert_eq!(PeerId(1), peer_id.into());
2237 responder.send(Err(ErrorCode::Failed)).expect("responder sends");
2238 }
2239 x => panic!("Should have sent a open l2cap request, but got {:?}", x),
2240 };
2241
2242 match exec.run_until_stalled(&mut start_future) {
2245 Poll::Pending => panic!("Should be ready after start fails"),
2246 Poll::Ready(Ok(_stream)) => panic!("Shouldn't have succeeded stream here"),
2247 Poll::Ready(Err(_)) => {}
2248 }
2249 }
2250
2251 #[fuchsia::test]
2253 async fn peer_delay_report() {
2254 let (remote, _profile_requests, cobalt_recv, peer) =
2255 setup_test_peer(true, build_test_streams(), None);
2256 let remote_peer = avdtp::Peer::new(remote);
2257 let mut remote_events = remote_peer.take_request_stream();
2258
2259 async fn remote_handle_request(req: avdtp::Request, peer: &avdtp::Peer) {
2261 let expected_stream_id: StreamEndpointId = 4_u8.try_into().unwrap();
2262 let expected_peer_stream_id: StreamEndpointId = 1_u8.try_into().unwrap();
2264 use avdtp::Request::*;
2265 match req {
2266 Discover { responder } => {
2267 let infos = [avdtp::StreamInformation::new(
2268 expected_stream_id,
2269 false,
2270 avdtp::MediaType::Audio,
2271 avdtp::EndpointType::Sink,
2272 )];
2273 responder.send(&infos).expect("response should succeed");
2274 }
2275 GetAllCapabilities { stream_id, responder }
2276 | GetCapabilities { stream_id, responder } => {
2277 assert_eq!(expected_stream_id, stream_id);
2278 let caps = vec![
2279 ServiceCapability::MediaTransport,
2280 ServiceCapability::MediaCodec {
2281 media_type: avdtp::MediaType::Audio,
2282 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2283 codec_extra: vec![0x11, 0x45, 51, 250],
2284 },
2285 ];
2286 responder.send(&caps[..]).expect("response should succeed");
2287 assert!(peer.delay_report(&expected_peer_stream_id, 0xc0de).await.is_err());
2290 }
2291 Open { responder, stream_id } => {
2292 assert!(peer.delay_report(&expected_stream_id, 0xc0de).await.is_err());
2294 peer.delay_report(&expected_peer_stream_id, 0xc0de)
2296 .await
2297 .expect("should get acked correctly");
2298 assert_eq!(expected_stream_id, stream_id);
2299 responder.send().expect("response should succeed");
2300 }
2301 SetConfiguration { responder, local_stream_id, remote_stream_id, .. } => {
2302 assert_eq!(local_stream_id, expected_stream_id);
2303 assert_eq!(remote_stream_id, expected_peer_stream_id);
2304 responder.send().expect("should send back response without issue");
2305 }
2306 x => panic!("Unexpected request: {:?}", x),
2307 };
2308 }
2309
2310 let collect_fut = pin!(peer.collect_capabilities());
2311
2312 let Either::Left((request, collect_fut)) =
2314 futures::future::select(remote_events.next(), collect_fut).await
2315 else {
2316 panic!("Collect future shouldn't finish first");
2317 };
2318 let collect_fut = pin!(collect_fut);
2319 remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
2320 let Either::Left((request, collect_fut)) =
2321 futures::future::select(remote_events.next(), collect_fut).await
2322 else {
2323 panic!("Collect future shouldn't finish first");
2324 };
2325 remote_handle_request(request.expect("a request").unwrap(), &remote_peer).await;
2326
2327 assert_eq!(1, collect_fut.await.expect("should get the remote endpoints back").len());
2329
2330 let remote_seid = 4_u8.try_into().unwrap();
2332
2333 let codec_params = ServiceCapability::MediaCodec {
2334 media_type: avdtp::MediaType::Audio,
2335 codec_type: avdtp::MediaCodecType::AUDIO_SBC,
2336 codec_extra: vec![0x11, 0x45, 51, 51],
2337 };
2338
2339 let _start_task = fasync::Task::spawn(async move {
2342 let _ = peer.stream_start(remote_seid, vec![codec_params]).await;
2343 panic!("stream start task finished");
2344 });
2345
2346 let request = remote_events.next().await.expect("should have set_config").unwrap();
2347 remote_handle_request(request, &remote_peer).await;
2348
2349 let request = remote_events.next().await.expect("should have open").unwrap();
2350 remote_handle_request(request, &remote_peer).await;
2351
2352 let mut cobalt = cobalt_recv.expect("should have receiver");
2353
2354 let mut got_ids = HashMap::new();
2355 let delay_metric_id = bt_metrics::AVDTP_DELAY_REPORT_IN_NANOSECONDS_METRIC_ID;
2356 while got_ids.len() < 3 || *got_ids.get(&delay_metric_id).unwrap_or(&0) < 3 {
2357 let report = respond_to_metrics_req_for_test(cobalt.next().await.unwrap().unwrap());
2358 let _ = got_ids.entry(report.metric_id).and_modify(|x| *x += 1).or_insert(1);
2359 if report.metric_id == delay_metric_id {
2361 assert_eq!(MetricEventPayload::IntegerValue(0xc0de * 100000), report.payload);
2362 }
2363 }
2364 assert!(got_ids.contains_key(&bt_metrics::A2DP_CODEC_AVAILABILITY_MIGRATED_METRIC_ID));
2365 assert!(got_ids.contains_key(&bt_metrics::A2DP_REMOTE_PEER_CAPABILITIES_METRIC_ID));
2366 assert!(got_ids.contains_key(&delay_metric_id));
2367 assert_eq!(got_ids.get(&delay_metric_id).cloned(), Some(3));
2370 }
2371
2372 fn sbc_capabilities() -> Vec<ServiceCapability> {
2373 let sbc_codec_info = SbcCodecInfo::new(
2374 SbcSamplingFrequency::FREQ48000HZ,
2375 SbcChannelMode::JOINT_STEREO,
2376 SbcBlockCount::SIXTEEN,
2377 SbcSubBands::EIGHT,
2378 SbcAllocation::LOUDNESS,
2379 53,
2380 53,
2381 )
2382 .expect("sbc codec info");
2383
2384 vec![avdtp::ServiceCapability::MediaTransport, sbc_codec_info.into()]
2385 }
2386
2387 #[fuchsia::test]
2389 fn peer_as_acceptor() {
2390 let mut exec = fasync::TestExecutor::new();
2391
2392 let mut streams = Streams::default();
2393 let mut test_builder = TestMediaTaskBuilder::new();
2394 streams.insert(Stream::build(
2395 make_sbc_endpoint(1, avdtp::EndpointType::Source),
2396 test_builder.builder(),
2397 ));
2398
2399 let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2400 let remote_peer = avdtp::Peer::new(remote);
2401
2402 let discover_fut = remote_peer.discover();
2403 let mut discover_fut = pin!(discover_fut);
2404
2405 let expected = vec![make_sbc_endpoint(1, avdtp::EndpointType::Source).information()];
2406 match exec.run_until_stalled(&mut discover_fut) {
2407 Poll::Ready(Ok(res)) => assert_eq!(res, expected),
2408 x => panic!("Expected discovery to complete and got {:?}", x),
2409 };
2410
2411 let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2412 let unknown_endpoint_id = 2_u8.try_into().expect("should be able to get sbc endpointid");
2413
2414 let get_caps_fut = remote_peer.get_capabilities(&sbc_endpoint_id);
2415 let mut get_caps_fut = pin!(get_caps_fut);
2416
2417 match exec.run_until_stalled(&mut get_caps_fut) {
2418 Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2420 x => panic!("Get capabilities should be ready but got {:?}", x),
2421 };
2422
2423 let get_caps_fut = remote_peer.get_capabilities(&unknown_endpoint_id);
2424 let mut get_caps_fut = pin!(get_caps_fut);
2425
2426 match exec.run_until_stalled(&mut get_caps_fut) {
2427 Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2428 assert_eq!(Some(Ok(avdtp::ErrorCode::BadAcpSeid)), e.error_code())
2429 }
2430 x => panic!("Get capabilities should be a ready error but got {:?}", x),
2431 };
2432
2433 let get_caps_fut = remote_peer.get_all_capabilities(&sbc_endpoint_id);
2434 let mut get_caps_fut = pin!(get_caps_fut);
2435
2436 match exec.run_until_stalled(&mut get_caps_fut) {
2437 Poll::Ready(Ok(caps)) => assert_eq!(2, caps.len()),
2439 x => panic!("Get capabilities should be ready but got {:?}", x),
2440 };
2441
2442 let sbc_caps = sbc_capabilities();
2443 let set_config_fut =
2444 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2445 let mut set_config_fut = pin!(set_config_fut);
2446
2447 match exec.run_until_stalled(&mut set_config_fut) {
2448 Poll::Ready(Ok(())) => {}
2449 x => panic!("Set capabilities should be ready but got {:?}", x),
2450 };
2451
2452 let open_fut = remote_peer.open(&sbc_endpoint_id);
2453 let mut open_fut = pin!(open_fut);
2454 match exec.run_until_stalled(&mut open_fut) {
2455 Poll::Ready(Ok(())) => {}
2456 x => panic!("Open should be ready but got {:?}", x),
2457 };
2458
2459 let (_remote_transport, transport) = Channel::create();
2461
2462 assert_eq!(Some(()), peer.receive_channel(transport).ok());
2463
2464 let stream_ids = vec![sbc_endpoint_id.clone()];
2465 let start_fut = remote_peer.start(&stream_ids);
2466 let mut start_fut = pin!(start_fut);
2467 match exec.run_until_stalled(&mut start_fut) {
2468 Poll::Ready(Ok(())) => {}
2469 x => panic!("Start should be ready but got {:?}", x),
2470 };
2471
2472 let media_task = test_builder.expect_task();
2474 assert!(media_task.is_started());
2475
2476 let suspend_fut = remote_peer.suspend(&stream_ids);
2477 let mut suspend_fut = pin!(suspend_fut);
2478 match exec.run_until_stalled(&mut suspend_fut) {
2479 Poll::Ready(Ok(())) => {}
2480 x => panic!("Start should be ready but got {:?}", x),
2481 };
2482
2483 assert!(!media_task.is_started());
2485 }
2486
2487 #[fuchsia::test]
2488 fn peer_set_config_reject_first() {
2489 let mut exec = fasync::TestExecutor::new();
2490
2491 let mut streams = Streams::default();
2492 let test_builder = TestMediaTaskBuilder::new();
2493 streams.insert(Stream::build(
2494 make_sbc_endpoint(1, avdtp::EndpointType::Source),
2495 test_builder.builder(),
2496 ));
2497
2498 let (remote, _requests, _, _peer) = setup_test_peer(false, streams, None);
2499 let remote_peer = avdtp::Peer::new(remote);
2500
2501 let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2502
2503 let wrong_freq_sbc = &[SbcCodecInfo::new(
2504 SbcSamplingFrequency::FREQ44100HZ, SbcChannelMode::JOINT_STEREO,
2506 SbcBlockCount::SIXTEEN,
2507 SbcSubBands::EIGHT,
2508 SbcAllocation::LOUDNESS,
2509 53,
2510 53,
2511 )
2512 .expect("sbc codec info")
2513 .into()];
2514
2515 let set_config_fut =
2516 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, wrong_freq_sbc);
2517 let mut set_config_fut = pin!(set_config_fut);
2518
2519 match exec.run_until_stalled(&mut set_config_fut) {
2520 Poll::Ready(Err(avdtp::Error::RemoteRejected(e))) => {
2521 assert!(e.service_category().is_some())
2522 }
2523 x => panic!("Set capabilities should have been rejected but got {:?}", x),
2524 };
2525
2526 let sbc_caps = sbc_capabilities();
2527 let set_config_fut =
2528 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2529 let mut set_config_fut = pin!(set_config_fut);
2530
2531 match exec.run_until_stalled(&mut set_config_fut) {
2532 Poll::Ready(Ok(())) => {}
2533 x => panic!("Set capabilities should be ready but got {:?}", x),
2534 };
2535 }
2536
2537 #[fuchsia::test]
2538 fn peer_starts_waiting_streams() {
2539 let mut exec = fasync::TestExecutor::new_with_fake_time();
2540 exec.set_fake_time(fasync::MonotonicInstant::from_nanos(5_000_000_000));
2541
2542 let mut streams = Streams::default();
2543 let mut test_builder = TestMediaTaskBuilder::new();
2544 streams.insert(Stream::build(
2545 make_sbc_endpoint(1, avdtp::EndpointType::Source),
2546 test_builder.builder(),
2547 ));
2548
2549 let (remote, _requests, _, peer) = setup_test_peer(false, streams, None);
2550 let remote_peer = avdtp::Peer::new(remote);
2551
2552 let sbc_endpoint_id = 1_u8.try_into().expect("should be able to get sbc endpointid");
2553
2554 let sbc_caps = sbc_capabilities();
2555 let set_config_fut =
2556 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2557 let mut set_config_fut = pin!(set_config_fut);
2558
2559 match exec.run_until_stalled(&mut set_config_fut) {
2560 Poll::Ready(Ok(())) => {}
2561 x => panic!("Set capabilities should be ready but got {:?}", x),
2562 };
2563
2564 let open_fut = remote_peer.open(&sbc_endpoint_id);
2565 let mut open_fut = pin!(open_fut);
2566 match exec.run_until_stalled(&mut open_fut) {
2567 Poll::Ready(Ok(())) => {}
2568 x => panic!("Open should be ready but got {:?}", x),
2569 };
2570
2571 let (_remote_transport, transport) = Channel::create();
2573 assert_eq!(Some(()), peer.receive_channel(transport).ok());
2574
2575 let mut remote_requests = remote_peer.take_request_stream();
2577 let next_remote_request_fut = remote_requests.next();
2578 let mut next_remote_request_fut = pin!(next_remote_request_fut);
2579
2580 assert!(exec.run_until_stalled(&mut next_remote_request_fut).is_pending());
2582
2583 exec.set_fake_time(zx::MonotonicDuration::from_seconds(3).after_now());
2585 let _ = exec.wake_expired_timers();
2586
2587 let stream_ids = match exec.run_until_stalled(&mut next_remote_request_fut) {
2588 Poll::Ready(Some(Ok(avdtp::Request::Start { responder, stream_ids }))) => {
2589 responder.send().unwrap();
2590 stream_ids
2591 }
2592 x => panic!("Expected to receive a start request for the stream, got {:?}", x),
2593 };
2594
2595 let media_task =
2597 exec.run_until_stalled(&mut test_builder.next_task()).expect("ready").unwrap();
2598 assert!(media_task.is_started());
2599
2600 let suspend_fut = remote_peer.suspend(&stream_ids);
2602 let mut suspend_fut = pin!(suspend_fut);
2603 match exec.run_until_stalled(&mut suspend_fut) {
2604 Poll::Ready(Ok(())) => {}
2605 x => panic!("Suspend should be ready but got {:?}", x),
2606 };
2607
2608 assert!(!media_task.is_started());
2610 }
2611
2612 #[fuchsia::test]
2613 fn needs_permit_to_start_streams() {
2614 let mut exec = fasync::TestExecutor::new();
2615
2616 let mut streams = Streams::default();
2617 let mut test_builder = TestMediaTaskBuilder::new();
2618 streams.insert(Stream::build(
2619 make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2620 test_builder.builder(),
2621 ));
2622 streams.insert(Stream::build(
2623 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2624 test_builder.builder(),
2625 ));
2626 let mut next_task_fut = test_builder.next_task();
2627
2628 let permits = Permits::new(1);
2629 let taken_permit = permits.get().expect("permit taken");
2630 let (remote, _profile_request_stream, _, peer) =
2631 setup_test_peer(false, streams, Some(permits.clone()));
2632 let remote_peer = avdtp::Peer::new(remote);
2633
2634 let sbc_endpoint_id = 1_u8.try_into().unwrap();
2635
2636 let sbc_caps = sbc_capabilities();
2637 let mut set_config_fut =
2638 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
2639
2640 match exec.run_until_stalled(&mut set_config_fut) {
2641 Poll::Ready(Ok(())) => {}
2642 x => panic!("Set capabilities should be ready but got {:?}", x),
2643 };
2644
2645 let mut open_fut = remote_peer.open(&sbc_endpoint_id);
2646 match exec.run_until_stalled(&mut open_fut) {
2647 Poll::Ready(Ok(())) => {}
2648 x => panic!("Open should be ready but got {:?}", x),
2649 };
2650
2651 let (_remote_transport, transport) = Channel::create();
2653 assert_eq!(Some(()), peer.receive_channel(transport).ok());
2654
2655 let sbc_endpoint_two = 2_u8.try_into().unwrap();
2657
2658 let mut set_config_fut =
2659 remote_peer.set_configuration(&sbc_endpoint_two, &sbc_endpoint_two, &sbc_caps);
2660
2661 match exec.run_until_stalled(&mut set_config_fut) {
2662 Poll::Ready(Ok(())) => {}
2663 x => panic!("Set capabilities should be ready but got {:?}", x),
2664 };
2665
2666 let mut open_fut = remote_peer.open(&sbc_endpoint_two);
2667 match exec.run_until_stalled(&mut open_fut) {
2668 Poll::Ready(Ok(())) => {}
2669 x => panic!("Open should be ready but got {:?}", x),
2670 };
2671
2672 let (_remote_transport_two, transport_two) = Channel::create();
2674 assert_eq!(Some(()), peer.receive_channel(transport_two).ok());
2675
2676 let unknown_endpoint_id: StreamEndpointId = 9_u8.try_into().unwrap();
2679 let stream_ids = [sbc_endpoint_id.clone(), unknown_endpoint_id.clone()];
2680 let mut start_fut = remote_peer.start(&stream_ids);
2681 match exec.run_until_stalled(&mut start_fut) {
2682 Poll::Ready(Err(avdtp::Error::RemoteRejected(rejection))) => {
2683 assert_eq!(avdtp::ErrorCode::BadAcpSeid, rejection.error_code().unwrap().unwrap());
2684 assert_eq!(unknown_endpoint_id, rejection.stream_id().unwrap());
2685 }
2686 x => panic!("Start should be ready but got {:?}", x),
2687 };
2688
2689 let mut remote_requests = remote_peer.take_request_stream();
2691
2692 let suspended_stream_ids = match exec.run_singlethreaded(&mut remote_requests.next()) {
2693 Some(Ok(avdtp::Request::Suspend { responder, stream_ids })) => {
2694 responder.send().unwrap();
2695 stream_ids
2696 }
2697 x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2698 };
2699
2700 assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2701 assert_eq!(1, suspended_stream_ids.len());
2702
2703 match exec.run_until_stalled(&mut next_task_fut) {
2705 Poll::Pending => {}
2706 x => panic!("Local task should not have been created at this point: {:?}", x),
2707 };
2708
2709 let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
2712 match exec.run_until_stalled(&mut start_fut) {
2713 Poll::Ready(Ok(())) => {}
2714 x => panic!("Start should be ready but got {:?}", x),
2715 }
2716
2717 let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2718 Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2719 responder.send().unwrap();
2720 stream_ids
2721 }
2722 x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2723 };
2724 assert!(suspended_stream_ids.contains(&sbc_endpoint_id));
2725
2726 drop(taken_permit);
2728
2729 match exec.run_singlethreaded(&mut remote_requests.next()) {
2730 Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2731 assert_eq!(stream_ids, &[sbc_endpoint_id.clone()]);
2732 responder.send().unwrap();
2733 }
2734 x => panic!("Expected start on permit available but got {x:?}"),
2735 };
2736
2737 let media_task = match exec.run_until_stalled(&mut next_task_fut) {
2739 Poll::Ready(Some(task)) => task,
2740 x => panic!("Local task should be created at this point: {:?}", x),
2741 };
2742
2743 assert!(media_task.is_started());
2744
2745 let mut start_fut = remote_peer.start(&[sbc_endpoint_two.clone()]);
2747 match exec.run_until_stalled(&mut start_fut) {
2748 Poll::Ready(Ok(())) => {}
2749 x => panic!("Start should be ready but got {:?}", x),
2750 }
2751
2752 let suspended_stream_ids = match exec.run_until_stalled(&mut remote_requests.next()) {
2753 Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2754 responder.send().unwrap();
2755 stream_ids
2756 }
2757 x => panic!("Expected to receive a suspend request for the stream, got {:?}", x),
2758 };
2759
2760 assert!(suspended_stream_ids.contains(&sbc_endpoint_two));
2761 assert_eq!(1, suspended_stream_ids.len());
2762
2763 let mut suspend_fut = remote_peer.suspend(&[sbc_endpoint_id.clone()]);
2765 match exec.run_until_stalled(&mut suspend_fut) {
2766 Poll::Ready(Ok(())) => {}
2767 x => panic!("Start should be ready but got {:?}", x),
2768 }
2769
2770 match exec.run_singlethreaded(&mut remote_requests.next()) {
2771 Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2772 assert_eq!(stream_ids, &[sbc_endpoint_two]);
2773 responder.send().unwrap();
2774 }
2775 x => panic!("Expected start on permit available but got {x:?}"),
2776 };
2777 }
2778
2779 fn start_sbc_stream(
2780 exec: &mut fasync::TestExecutor,
2781 media_test_builder: &mut TestMediaTaskBuilder,
2782 peer: &Peer,
2783 remote_peer: &avdtp::Peer,
2784 local_id: &StreamEndpointId,
2785 remote_id: &StreamEndpointId,
2786 ) -> TestMediaTask {
2787 let sbc_caps = sbc_capabilities();
2788 let set_config_fut = remote_peer.set_configuration(&local_id, &remote_id, &sbc_caps);
2789 let mut set_config_fut = pin!(set_config_fut);
2790
2791 match exec.run_until_stalled(&mut set_config_fut) {
2792 Poll::Ready(Ok(())) => {}
2793 x => panic!("Set capabilities should be ready but got {:?}", x),
2794 };
2795
2796 let open_fut = remote_peer.open(&local_id);
2797 let mut open_fut = pin!(open_fut);
2798 match exec.run_until_stalled(&mut open_fut) {
2799 Poll::Ready(Ok(())) => {}
2800 x => panic!("Open should be ready but got {:?}", x),
2801 };
2802
2803 let (_remote_transport, transport) = Channel::create();
2805 assert_eq!(Some(()), peer.receive_channel(transport).ok());
2806
2807 let stream_ids = [local_id.clone()];
2809 let start_fut = remote_peer.start(&stream_ids);
2810 let mut start_fut = pin!(start_fut);
2811 match exec.run_until_stalled(&mut start_fut) {
2812 Poll::Ready(Ok(())) => {}
2813 x => panic!("Start should be ready but got {:?}", x),
2814 };
2815
2816 let media_task = media_test_builder.expect_task();
2818 assert!(media_task.is_started());
2819
2820 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
2821 media_task
2822 }
2823
2824 #[fuchsia::test]
2825 fn permits_can_be_revoked_and_reinstated_all() {
2826 let mut exec = fasync::TestExecutor::new();
2827
2828 let mut streams = Streams::default();
2829 let mut test_builder = TestMediaTaskBuilder::new();
2830 streams.insert(Stream::build(
2831 make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2832 test_builder.builder(),
2833 ));
2834 let sbc_endpoint_id = 1_u8.try_into().unwrap();
2835 let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();
2836
2837 streams.insert(Stream::build(
2838 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2839 test_builder.builder(),
2840 ));
2841 let sbc2_endpoint_id = 2_u8.try_into().unwrap();
2842 let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();
2843
2844 let permits = Permits::new(2);
2845
2846 let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
2847 let remote_peer = avdtp::Peer::new(remote);
2848
2849 let one_media_task = start_sbc_stream(
2850 &mut exec,
2851 &mut test_builder,
2852 &peer,
2853 &remote_peer,
2854 &sbc_endpoint_id,
2855 &remote_sbc_endpoint_id,
2856 );
2857 let two_media_task = start_sbc_stream(
2858 &mut exec,
2859 &mut test_builder,
2860 &peer,
2861 &remote_peer,
2862 &sbc2_endpoint_id,
2863 &remote_sbc2_endpoint_id,
2864 );
2865
2866 let taken_permits = permits.seize();
2868
2869 let remote_endpoints: HashSet<_> =
2870 [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();
2871
2872 let mut remote_requests = remote_peer.take_request_stream();
2874 let mut expected_suspends = remote_endpoints.clone();
2875 while !expected_suspends.is_empty() {
2876 match exec.run_until_stalled(&mut remote_requests.next()) {
2877 Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2878 for stream_id in stream_ids {
2879 assert!(expected_suspends.remove(&stream_id));
2880 }
2881 responder.send().expect("send response okay");
2882 }
2883 x => panic!("Expected suspension and got {:?}", x),
2884 }
2885 }
2886
2887 assert!(!one_media_task.is_started());
2889 assert!(!two_media_task.is_started());
2890
2891 drop(taken_permits);
2893
2894 let mut expected_starts = remote_endpoints.clone();
2895 while !expected_starts.is_empty() {
2896 match exec.run_singlethreaded(&mut remote_requests.next()) {
2897 Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2898 for stream_id in stream_ids {
2899 assert!(expected_starts.remove(&stream_id));
2900 }
2901 responder.send().expect("send response okay");
2902 }
2903 x => panic!("Expected start and got {:?}", x),
2904 }
2905 }
2906 let one_media_task = test_builder.expect_task();
2909 assert!(one_media_task.is_started());
2910 let two_media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
2911 Poll::Ready(Some(task)) => task,
2912 x => panic!("Expected another ready task but {x:?}"),
2913 };
2914 assert!(two_media_task.is_started());
2915 }
2916
2917 #[fuchsia::test]
2918 fn permits_can_be_revoked_one_at_a_time() {
2919 let mut exec = fasync::TestExecutor::new();
2920
2921 let mut streams = Streams::default();
2922 let mut test_builder = TestMediaTaskBuilder::new();
2923 streams.insert(Stream::build(
2924 make_sbc_endpoint(1, avdtp::EndpointType::Sink),
2925 test_builder.builder(),
2926 ));
2927 let sbc_endpoint_id = 1_u8.try_into().unwrap();
2928 let remote_sbc_endpoint_id = 7_u8.try_into().unwrap();
2929
2930 streams.insert(Stream::build(
2931 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
2932 test_builder.builder(),
2933 ));
2934 let sbc2_endpoint_id = 2_u8.try_into().unwrap();
2935 let remote_sbc2_endpoint_id = 6_u8.try_into().unwrap();
2936
2937 let permits = Permits::new(2);
2938
2939 let (remote, _requests, _, peer) = setup_test_peer(false, streams, Some(permits.clone()));
2940 let remote_peer = avdtp::Peer::new(remote);
2941
2942 let one_media_task = start_sbc_stream(
2943 &mut exec,
2944 &mut test_builder,
2945 &peer,
2946 &remote_peer,
2947 &sbc_endpoint_id,
2948 &remote_sbc_endpoint_id,
2949 );
2950 let two_media_task = start_sbc_stream(
2951 &mut exec,
2952 &mut test_builder,
2953 &peer,
2954 &remote_peer,
2955 &sbc2_endpoint_id,
2956 &remote_sbc2_endpoint_id,
2957 );
2958
2959 let taken_permit = permits.take();
2961
2962 let remote_endpoints: HashSet<_> =
2963 [&remote_sbc_endpoint_id, &remote_sbc2_endpoint_id].iter().cloned().collect();
2964
2965 let mut remote_requests = remote_peer.take_request_stream();
2967 let suspended_id = match exec.run_until_stalled(&mut remote_requests.next()) {
2968 Poll::Ready(Some(Ok(avdtp::Request::Suspend { responder, stream_ids }))) => {
2969 assert!(stream_ids.len() == 1);
2970 assert!(remote_endpoints.contains(&stream_ids[0]));
2971 responder.send().expect("send response okay");
2972 stream_ids[0].clone()
2973 }
2974 x => panic!("Expected suspension and got {:?}", x),
2975 };
2976
2977 if suspended_id == remote_sbc_endpoint_id {
2979 assert!(!one_media_task.is_started());
2980 assert!(two_media_task.is_started());
2981 } else {
2982 assert!(one_media_task.is_started());
2983 assert!(!two_media_task.is_started());
2984 }
2985
2986 drop(taken_permit);
2988
2989 match exec.run_singlethreaded(&mut remote_requests.next()) {
2990 Some(Ok(avdtp::Request::Start { responder, stream_ids })) => {
2991 assert_eq!(stream_ids, &[suspended_id]);
2992 responder.send().expect("send response okay");
2993 }
2994 x => panic!("Expected start and got {:?}", x),
2995 }
2996 let media_task = match exec.run_until_stalled(&mut test_builder.next_task()) {
2998 Poll::Ready(Some(task)) => task,
2999 x => panic!("Expected media task to start: {x:?}"),
3000 };
3001 assert!(media_task.is_started());
3002 }
3003
3004 #[fuchsia::test]
3007 fn permit_suspend_start_while_suspending() {
3008 let mut exec = fasync::TestExecutor::new();
3009
3010 let mut streams = Streams::default();
3011 let mut test_builder = TestMediaTaskBuilder::new();
3012 streams.insert(Stream::build(
3013 make_sbc_endpoint(1, avdtp::EndpointType::Sink),
3014 test_builder.builder(),
3015 ));
3016 streams.insert(Stream::build(
3017 make_sbc_endpoint(2, avdtp::EndpointType::Sink),
3018 test_builder.builder(),
3019 ));
3020 let mut next_task_fut = test_builder.next_task();
3021
3022 let permits = Permits::new(1);
3023 let (remote, _profile_request_stream, _, peer) =
3024 setup_test_peer(false, streams, Some(permits.clone()));
3025
3026 let remote_peer = avdtp::Peer::new(remote);
3027 let mut remote_requests = remote_peer.take_request_stream();
3028
3029 let sbc_endpoint_id = 1_u8.try_into().unwrap();
3030
3031 let sbc_caps = sbc_capabilities();
3032 let mut set_config_fut =
3033 remote_peer.set_configuration(&sbc_endpoint_id, &sbc_endpoint_id, &sbc_caps);
3034
3035 match exec.run_until_stalled(&mut set_config_fut) {
3036 Poll::Ready(Ok(())) => {}
3037 x => panic!("Set capabilities should be ready but got {:?}", x),
3038 };
3039
3040 let mut open_fut = remote_peer.open(&sbc_endpoint_id);
3041 match exec.run_until_stalled(&mut open_fut) {
3042 Poll::Ready(Ok(())) => {}
3043 x => panic!("Open should be ready but got {:?}", x),
3044 };
3045
3046 let (_remote_transport, transport) = Channel::create();
3048 assert_eq!(Some(()), peer.receive_channel(transport).ok());
3049
3050 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
3052 let Some(_deadline) = exec.wake_next_timer() else {
3053 panic!("Expected a timer to be waiting to run");
3054 };
3055
3056 let start_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
3058 Some(Ok(avdtp::Request::Start { stream_ids, responder })) => {
3059 assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
3060 responder
3061 }
3062 x => panic!("Expected a Start request, got {x:?}"),
3063 };
3064
3065 assert!(permits.get().is_none());
3066
3067 let mut start_fut = remote_peer.start(&[sbc_endpoint_id.clone()]);
3069
3070 match exec.run_singlethreaded(&mut start_fut) {
3073 Ok(()) => {}
3074 x => panic!("Expected OK response from start future but got {x:?}"),
3075 }
3076
3077 let suspend_responder = match exec.run_singlethreaded(&mut remote_requests.next()) {
3078 Some(Ok(avdtp::Request::Suspend { stream_ids, responder })) => {
3079 assert_eq!(stream_ids, vec![sbc_endpoint_id.clone()]);
3080 responder
3081 }
3082 x => panic!("Expected a suspend got {x:?}"),
3083 };
3084
3085 start_responder.send().unwrap();
3087
3088 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
3089
3090 suspend_responder.send().unwrap();
3092
3093 let media_task = match exec.run_until_stalled(&mut next_task_fut) {
3095 Poll::Ready(Some(task)) => task,
3096 x => panic!("Local task should be created at this point: {:?}", x),
3097 };
3098
3099 assert!(media_task.is_started());
3100 }
3101
3102 #[fuchsia::test]
3105 fn version_check() {
3106 let p1: ProfileDescriptor = ProfileDescriptor {
3107 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3108 major_version: Some(1),
3109 minor_version: Some(3),
3110 ..Default::default()
3111 };
3112 assert_eq!(true, a2dp_version_check(p1));
3113
3114 let p1: ProfileDescriptor = ProfileDescriptor {
3115 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3116 major_version: Some(2),
3117 minor_version: Some(10),
3118 ..Default::default()
3119 };
3120 assert_eq!(true, a2dp_version_check(p1));
3121
3122 let p1: ProfileDescriptor = ProfileDescriptor {
3123 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3124 major_version: Some(1),
3125 minor_version: Some(0),
3126 ..Default::default()
3127 };
3128 assert_eq!(false, a2dp_version_check(p1));
3129
3130 let p1: ProfileDescriptor = ProfileDescriptor {
3131 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3132 major_version: None,
3133 minor_version: Some(9),
3134 ..Default::default()
3135 };
3136 assert_eq!(false, a2dp_version_check(p1));
3137
3138 let p1: ProfileDescriptor = ProfileDescriptor {
3139 profile_id: Some(ServiceClassProfileIdentifier::AdvancedAudioDistribution),
3140 major_version: Some(2),
3141 minor_version: Some(2),
3142 ..Default::default()
3143 };
3144 assert_eq!(true, a2dp_version_check(p1));
3145 }
3146}