1use anyhow::format_err;
6use bt_avdtp::{Error, MediaCodecType, MediaType, ServiceCapability, StreamEndpointId};
7use fidl::endpoints::RequestStream;
8use fidl_fuchsia_bluetooth_avdtp_test::{
9 PeerControllerMarker, PeerControllerRequest, PeerControllerRequestStream, PeerError,
10 PeerManagerControlHandle, PeerManagerRequest, PeerManagerRequestStream,
11};
12use fuchsia_async as fasync;
13use fuchsia_bluetooth::detachable_map::DetachableWeak;
14use fuchsia_bluetooth::types::PeerId;
15use fuchsia_sync::Mutex;
16use futures::{TryFutureExt, TryStreamExt};
17use log::{error, info};
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::peer::Peer;
22
/// Raw stream endpoint id that the request handler converts into a `StreamEndpointId` and passes
/// to `Peer::stream_suspend` for the SuspendStream and SuspendAndReconfigure requests.
///
/// NOTE(review): presumably this is the id under which the local SBC endpoint is registered
/// elsewhere in the crate — confirm the two values stay in sync.
const SBC_SEID: u8 = 6;
26
/// Stateless holder for the associated functions that serve one `PeerController` request stream.
struct Controller {}
29
30impl Controller {
31 async fn handle_controller_request(
34 a2dp: Arc<Peer>,
35 request: PeerControllerRequest,
36 endpoint_id: &StreamEndpointId,
37 ) -> Result<(), fidl::Error> {
38 match request {
39 PeerControllerRequest::SetConfiguration { responder } => {
40 let generic_capabilities = vec![ServiceCapability::MediaTransport];
41 match a2dp
42 .avdtp()
43 .set_configuration(endpoint_id, endpoint_id, &generic_capabilities)
44 .await
45 {
46 Ok(resp) => {
47 info!("SetConfiguration successful: {:?}", resp);
48 responder.send(Ok(()))?;
49 }
50 Err(e) => {
51 error!("SetConfiguration for {} failed: {:?}", endpoint_id, e);
52 responder.send(Err(PeerError::ProtocolError))?;
53 }
54 }
55 }
56 PeerControllerRequest::GetConfiguration { responder } => {
57 match a2dp.avdtp().get_configuration(endpoint_id).await {
58 Ok(service_capabilities) => {
59 info!(
60 "Service capabilities from GetConfiguration: {:?}",
61 service_capabilities
62 );
63 responder.send(Ok(()))?;
64 }
65 Err(e) => {
66 error!("GetConfiguration for {} failed: {:?}", endpoint_id, e);
67 responder.send(Err(PeerError::ProtocolError))?;
68 }
69 }
70 }
71 PeerControllerRequest::GetCapabilities { responder } => {
72 match a2dp.avdtp().get_capabilities(endpoint_id).await {
73 Ok(service_capabilities) => {
74 info!(
75 "Service capabilities from GetCapabilities {:?}",
76 service_capabilities
77 );
78 responder.send(Ok(()))?;
79 }
80 Err(e) => {
81 error!("GetCapabilities for {} failed: {:?}", endpoint_id, e);
82 responder.send(Err(PeerError::ProtocolError))?;
83 }
84 }
85 }
86 PeerControllerRequest::GetAllCapabilities { responder } => {
87 match a2dp.avdtp().get_all_capabilities(endpoint_id).await {
88 Ok(service_capabilities) => {
89 info!(
90 "Service capabilities from GetAllCapabilities: {:?}",
91 service_capabilities
92 );
93 responder.send(Ok(()))?;
94 }
95 Err(e) => {
96 error!("GetAllCapabilities for {} failed: {:?}", endpoint_id, e);
97 responder.send(Err(PeerError::ProtocolError))?;
98 }
99 }
100 }
101 PeerControllerRequest::SuspendStream { responder } => {
102 let local_id: StreamEndpointId = SBC_SEID.try_into().expect("should work");
103 match a2dp.stream_suspend(local_id).await {
104 Ok(_) => {
105 info!("SuspendStream was successful");
106 responder.send(Ok(()))?;
107 }
108 Err(e) => {
109 error!("SuspendStream for {} failed: {:?}", endpoint_id, e);
110 responder.send(Err(PeerError::ProtocolError))?;
111 }
112 }
113 }
114 PeerControllerRequest::ReconfigureStream { responder } => {
115 let generic_capabilities = [ServiceCapability::MediaCodec {
118 media_type: MediaType::Audio,
119 codec_type: MediaCodecType::AUDIO_SBC,
120 codec_extra: vec![0x11, 0x15, 2, 250],
121 }];
122 match a2dp.avdtp().reconfigure(endpoint_id, &generic_capabilities[..]).await {
123 Ok(resp) => {
124 info!("ReconfigureStream was successful {:?}", resp);
125 responder.send(Ok(()))?;
126 }
127 Err(e) => {
128 error!("ReconfigureStream for {} failed: {:?}", endpoint_id, e);
129 match e {
130 Error::RemoteRejected(e) if e.service_category().is_some() => {}
131 _ => responder.send(Err(PeerError::ProtocolError))?,
132 }
133 }
134 }
135 }
136 PeerControllerRequest::ReleaseStream { responder } => {
137 match a2dp.avdtp().close(endpoint_id).await {
138 Ok(resp) => {
139 info!("ReleaseStream was successful: {:?}", resp);
140 responder.send(Ok(()))?;
141 }
142 Err(e) => {
143 error!("ReleaseStream for {} failed: {:?}", endpoint_id, e);
144 responder.send(Err(PeerError::ProtocolError))?;
145 }
146 }
147 }
148 PeerControllerRequest::StartStream { responder } => {
149 match a2dp.avdtp().start(&[endpoint_id.clone()]).await {
150 Ok(resp) => {
151 info!("StartStream was successful: {:?}", resp);
152 responder.send(Ok(()))?;
153 }
154 Err(e) => {
155 error!("StartStream for {} failed: {:?}", endpoint_id, e);
156 responder.send(Err(PeerError::ProtocolError))?;
157 }
158 }
159 }
160 PeerControllerRequest::AbortStream { responder } => {
161 match a2dp.avdtp().abort(endpoint_id).await {
162 Ok(resp) => {
163 info!("AbortStream was successful: {:?}", resp);
164 responder.send(Ok(()))?;
165 }
166 Err(e) => {
167 info!("AbortStream for {} failed: {:?}", endpoint_id, e);
168 responder.send(Err(PeerError::ProtocolError))?;
169 }
170 }
171 }
172 PeerControllerRequest::EstablishStream { responder } => {
173 match a2dp.avdtp().open(endpoint_id).await {
174 Ok(resp) => {
175 info!("EstablishStream was successful: {:?}", resp);
176 responder.send(Ok(()))?;
177 }
178 Err(e) => {
179 info!("EstablishStream for {} failed: {:?}", endpoint_id, e);
180 responder.send(Err(PeerError::ProtocolError))?;
181 }
182 }
183 }
184 PeerControllerRequest::SuspendAndReconfigure { responder } => {
185 let local_id: StreamEndpointId = SBC_SEID.try_into().expect("should work");
186 match a2dp.stream_suspend(local_id).await {
187 Ok(_) => {
188 info!("SuspendStream was successful");
189 let generic_capabilities = [ServiceCapability::MediaCodec {
192 media_type: MediaType::Audio,
193 codec_type: MediaCodecType::AUDIO_SBC,
194 codec_extra: vec![0x11, 0x15, 2, 250],
195 }];
196 match a2dp.avdtp().reconfigure(endpoint_id, &generic_capabilities[..]).await
197 {
198 Ok(resp) => {
199 info!("ReconfigureStream was successful {:?}", resp);
200 responder.send(Ok(()))?;
201 }
202 Err(e) => {
203 info!("ReconfigureStream for {} failed: {:?}", endpoint_id, e);
204 responder.send(Err(PeerError::ProtocolError))?;
205 }
206 }
207 }
208 Err(e) => {
209 info!("SuspendStream for {} failed: {:?}", endpoint_id, e);
210 responder.send(Err(PeerError::ProtocolError))?;
211 }
212 }
213 }
214 }
215 Ok(())
216 }
217
218 async fn process_requests(
223 mut stream: PeerControllerRequestStream,
224 peer: DetachableWeak<PeerId, Peer>,
225 peer_id: PeerId,
226 ) -> Result<(), anyhow::Error> {
227 while let Some(req) = stream.try_next().await? {
228 let peer = { peer.upgrade().ok_or_else(|| format_err!("Peer disconnected"))? };
229 let infos = match peer.collect_capabilities().await {
231 Ok(endpoints) => endpoints,
232 Err(e) => {
233 info!("Error collecting capabilities from peer: {:?}", e);
234 continue;
235 }
236 };
237 let remote_id = match infos.first() {
238 Some(stream_info) => stream_info.local_id().clone(),
239 None => {
240 info!("Can't execute {:?} - no streams exist on the peer.", req);
241 continue;
242 }
243 };
244
245 let fut = Self::handle_controller_request(peer.clone(), req, &remote_id);
246 if let Err(e) = fut.await {
247 error!("{} error handling request: {:?}", peer_id, e);
248 }
249 }
250
251 info!("Controller finished for id: {}", peer_id);
252 Ok(())
253 }
254}
255
256async fn start_control_service(
259 mut stream: PeerManagerRequestStream,
260 controller_pool: Arc<Mutex<ControllerPoolInner>>,
261) -> Result<(), anyhow::Error> {
262 while let Some(req) = stream.try_next().await? {
263 match req {
264 PeerManagerRequest::GetPeer { peer_id, handle, .. } => {
265 let handle_to_client: fidl::endpoints::ServerEnd<PeerControllerMarker> = handle;
266 let peer_id: PeerId = peer_id.into();
267 let peer = match controller_pool.lock().get_peer(&peer_id) {
268 None => {
269 info!("GetPeer: request for nonexistent peer {}, closing.", peer_id);
270 continue;
272 }
273 Some(peer) => peer.clone(),
274 };
275 info!("GetPeer: Creating peer controller for peer with id {}.", peer_id);
276 let client_stream = handle_to_client.into_stream();
277 fasync::Task::local(async move {
278 Controller::process_requests(client_stream, peer, peer_id)
279 .await
280 .unwrap_or_else(|e| error!("Requests failed: {:?}", e))
281 })
282 .detach();
283 }
284 PeerManagerRequest::ConnectedPeers { responder } => {
285 let connected_peers: Vec<fidl_fuchsia_bluetooth::PeerId> =
286 controller_pool.lock().connected_peers().into_iter().map(Into::into).collect();
287 responder.send(&connected_peers)?;
288 info!("ConnectedPeers request. Peers: {:?}", connected_peers);
289 }
290 }
291 }
292 Ok(())
293}
294
/// State behind the `ControllerPool` lock: the peers that have been reported as connected and
/// the control handle of the `PeerManager` client, if one has been registered.
struct ControllerPoolInner {
    /// Weak handles to the peers passed to `peer_connected`, keyed by peer id.
    peers: HashMap<PeerId, DetachableWeak<PeerId, Peer>>,
    /// Control handle used to send `OnPeerConnected` events; `None` until a client is registered.
    control_handle: Option<PeerManagerControlHandle>,
}
301
302impl ControllerPoolInner {
303 pub fn new() -> Self {
304 Self { control_handle: None, peers: HashMap::new() }
305 }
306
307 #[cfg(test)]
308 fn control_handle(&self) -> Option<PeerManagerControlHandle> {
309 self.control_handle.clone()
310 }
311
312 fn connected_peers(&self) -> Vec<PeerId> {
314 self.peers.keys().cloned().collect()
315 }
316
317 fn get_peer(&self, id: &PeerId) -> Option<&DetachableWeak<PeerId, Peer>> {
319 self.peers.get(id)
320 }
321
322 fn set_control_handle(&mut self, control_handle: PeerManagerControlHandle) -> bool {
324 if self.control_handle.is_none() {
325 self.control_handle = Some(control_handle);
326 return true;
327 }
328 false
329 }
330
331 fn peer_connected(&mut self, peer: DetachableWeak<PeerId, Peer>) {
334 let peer_id = peer.key().clone();
335 drop(self.peers.insert(peer_id, peer));
336 if let Some(handle) = self.control_handle.as_ref() {
337 if let Err(e) = handle.send_on_peer_connected(&peer_id.into()) {
338 info!("Peer connected callback failed: {:?}", e);
339 }
340 }
341 }
342}
343
/// Shared registry of connected peers, exposed to a `PeerManager` client.
///
/// All state lives in a `ControllerPoolInner` behind an `Arc<Mutex<_>>`, so the task spawned by
/// `connected` can share it with this handle.
pub struct ControllerPool {
    /// Lock-protected state shared with the request-serving task.
    inner: Arc<Mutex<ControllerPoolInner>>,
}
347
348impl ControllerPool {
349 pub fn new() -> Self {
350 Self { inner: Arc::new(Mutex::new(ControllerPoolInner::new())) }
351 }
352
353 #[cfg(test)]
355 fn control_handle(&self) -> Option<PeerManagerControlHandle> {
356 self.inner.lock().control_handle()
357 }
358
359 #[cfg(test)]
361 fn get_peer(&self, id: &PeerId) -> Option<DetachableWeak<PeerId, Peer>> {
362 self.inner.lock().get_peer(id).cloned()
363 }
364
365 pub fn connected(&self, stream: PeerManagerRequestStream) {
371 if self.inner.lock().set_control_handle(stream.control_handle()) {
372 let inner = self.inner.clone();
374 fasync::Task::local(
375 start_control_service(stream, inner)
376 .unwrap_or_else(|e| error!("Error handling requests {:?}", e)),
377 )
378 .detach()
379 }
380 }
381
382 pub fn peer_connected(&self, peer: DetachableWeak<PeerId, Peer>) {
385 self.inner.lock().peer_connected(peer);
386 }
387}
388
389#[cfg(test)]
390mod tests {
391 use super::*;
392 use bt_avdtp::{EndpointType, ErrorCode, Peer as AvdtpPeer, Request, StreamInformation};
393 use fidl::endpoints::{create_endpoints, create_proxy_and_stream};
394 use fidl_fuchsia_bluetooth_avdtp_test::*;
395 use fidl_fuchsia_bluetooth_bredr::ProfileMarker;
396 use fuchsia_async as fasync;
397 use fuchsia_bluetooth::detachable_map::DetachableMap;
398 use fuchsia_bluetooth::types::Channel;
399 use futures::StreamExt;
400
401 use crate::media_task::tests::TestMediaTaskBuilder;
402 use crate::stream::tests::make_sbc_endpoint;
403 use crate::stream::{Stream, Streams};
404
405 async fn listen_for_avdtp_requests(remote: Channel) {
407 let remote_avdtp = AvdtpPeer::new(remote);
408 let mut remote_requests = remote_avdtp.take_request_stream();
409 while let Some(request) = remote_requests.next().await {
410 match request {
411 Ok(Request::Discover { responder }) => {
412 let endpoint_id = StreamEndpointId::try_from(1).expect("endpoint id creation");
413 let information = StreamInformation::new(
414 endpoint_id,
415 false,
416 MediaType::Audio,
417 EndpointType::Source,
418 );
419 responder.send(&[information]).expect("Sending response should have worked");
420 }
421 Ok(Request::GetCapabilities { responder, .. })
422 | Ok(Request::GetAllCapabilities { responder, .. })
423 | Ok(Request::GetConfiguration { responder, .. }) => {
424 responder.send(&[]).expect("Sending response should have worked");
425 }
426 Ok(Request::SetConfiguration { responder, .. })
427 | Ok(Request::Reconfigure { responder, .. }) => {
428 responder.send().expect("Sending response should have worked");
429 }
430 Ok(Request::Suspend { responder, .. }) | Ok(Request::Start { responder, .. }) => {
431 responder.send().expect("Sending response should have worked");
432 }
433 Ok(Request::Open { responder, .. })
434 | Ok(Request::Close { responder, .. })
435 | Ok(Request::Abort { responder, .. }) => {
436 responder
438 .reject(ErrorCode::UnsupportedConfiguration)
439 .expect("Sending response should have worked");
440 }
441 _ => {
442 panic!("Got an unhandled AVDTP request");
443 }
444 }
445 }
446 }
447
/// End-to-end check of the control service: registers a `PeerManager` client with the pool,
/// reports one connected peer, obtains a `PeerController` for it, and issues every controller
/// command against a fake remote served by `listen_for_avdtp_requests`.
#[fuchsia_async::run_singlethreaded(test)]
async fn test_client_connected_to_peer_manager() {
    // Client and server ends of the PeerManager protocol.
    let (pm_proxy, pm_stream) = create_proxy_and_stream::<PeerManagerMarker>();
    let controller_pool = ControllerPool::new();
    let mut peer_map = DetachableMap::new();

    // Registers the client's control handle and starts serving `pm_stream`.
    controller_pool.connected(pm_stream);

    let fake_peer_id = PeerId(12345);
    let (profile_proxy, _requests) = create_proxy_and_stream::<ProfileMarker>();
    // `signaling` backs the local AVDTP peer; `remote` is handed to the fake remote below.
    let (remote, signaling) = Channel::create();
    let avdtp_peer = AvdtpPeer::new(signaling);
    let mut streams = Streams::default();
    let test_builder = TestMediaTaskBuilder::new();
    // The only local stream is an SBC source endpoint built with id 1.
    streams.insert(Stream::build(
        make_sbc_endpoint(1, EndpointType::Source),
        test_builder.builder(),
    ));
    let peer = Peer::create(
        fake_peer_id,
        avdtp_peer,
        streams,
        None,
        profile_proxy,
        bt_metrics::MetricsLogger::default(),
    );
    let _ = peer_map.insert(fake_peer_id, peer);
    let weak_peer = peer_map.get(&fake_peer_id).expect("just inserted");

    // After reporting the peer, the pool holds both the control handle and the peer.
    controller_pool.peer_connected(weak_peer);
    assert!(controller_pool.control_handle().is_some());
    assert!(controller_pool.get_peer(&fake_peer_id).is_some());

    // Ask the PeerManager for a controller bound to the fake peer.
    let (client, server) = create_endpoints::<PeerControllerMarker>();
    let client_proxy = client.into_proxy();
    let res = pm_proxy.get_peer(&fake_peer_id.into(), server);
    assert_eq!(Ok(()), res.map_err(|e| e.to_string()));

    // Start the fake remote that answers the AVDTP commands issued below.
    fasync::Task::spawn(listen_for_avdtp_requests(remote)).detach();

    // The fake remote accepts SetConfiguration.
    let res = client_proxy.set_configuration().await;
    assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));

    // The three capability/configuration queries are answered with an empty list.
    let res = client_proxy.get_configuration().await;
    assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));

    let res = client_proxy.get_capabilities().await;
    assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));

    let res = client_proxy.get_all_capabilities().await;
    assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));

    // NOTE(review): the handler suspends the endpoint derived from SBC_SEID (6) while the only
    // local stream here has id 1 — presumably that mismatch is why a ProtocolError is expected;
    // confirm.
    let res = client_proxy.suspend_stream().await;
    assert_eq!(Ok(Err(PeerError::ProtocolError)), res.map_err(|e| e.to_string()));

    // The fake remote accepts Reconfigure.
    let res = client_proxy.reconfigure_stream().await;
    assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));

    // The suspend step runs first and fails as above, so the combined command reports an error.
    let res = client_proxy.suspend_and_reconfigure().await;
    assert_eq!(Ok(Err(PeerError::ProtocolError)), res.map_err(|e| e.to_string()));

    // The fake remote accepts Start.
    let res = client_proxy.start_stream().await;
    assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));

    // The fake remote rejects Close, Open and Abort, so each FIDL call completes but carries a
    // ProtocolError.
    let res = client_proxy.release_stream().await.expect("Command should succeed");
    assert_eq!(Err("ProtocolError".to_string()), res.map_err(|e| format!("{:?}", e)));

    let res = client_proxy.establish_stream().await.expect("Command should succeed");
    assert_eq!(Err("ProtocolError".to_string()), res.map_err(|e| format!("{:?}", e)));

    let res = client_proxy.abort_stream().await.expect("Command should succeed");
    assert_eq!(Err("ProtocolError".to_string()), res.map_err(|e| format!("{:?}", e)));
}
542}