bt_a2dp/peer/
controller.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::format_err;
6use bt_avdtp::{Error, MediaCodecType, MediaType, ServiceCapability, StreamEndpointId};
7use fidl::endpoints::RequestStream;
8use fidl_fuchsia_bluetooth_avdtp_test::{
9    PeerControllerMarker, PeerControllerRequest, PeerControllerRequestStream, PeerError,
10    PeerManagerControlHandle, PeerManagerRequest, PeerManagerRequestStream,
11};
12use fuchsia_async as fasync;
13use fuchsia_bluetooth::detachable_map::DetachableWeak;
14use fuchsia_bluetooth::types::PeerId;
15use fuchsia_sync::Mutex;
16use futures::{TryFutureExt, TryStreamExt};
17use log::{error, info};
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use crate::peer::Peer;
22
23/// ID for the SBC stream endpoint. This is the same SEID that is chosen
24/// in A2DP Sink and Source.
25const SBC_SEID: u8 = 6;
26
27/// A structure that handles requests from peer controller, specific to a peer.
28struct Controller {}
29
30impl Controller {
31    /// Handle one request from the PeerController and respond with the results from sending the
32    /// command(s) to the peer
33    async fn handle_controller_request(
34        a2dp: Arc<Peer>,
35        request: PeerControllerRequest,
36        endpoint_id: &StreamEndpointId,
37    ) -> Result<(), fidl::Error> {
38        match request {
39            PeerControllerRequest::SetConfiguration { responder } => {
40                let generic_capabilities = vec![ServiceCapability::MediaTransport];
41                match a2dp
42                    .avdtp()
43                    .set_configuration(endpoint_id, endpoint_id, &generic_capabilities)
44                    .await
45                {
46                    Ok(resp) => {
47                        info!("SetConfiguration successful: {:?}", resp);
48                        responder.send(Ok(()))?;
49                    }
50                    Err(e) => {
51                        error!("SetConfiguration for {} failed: {:?}", endpoint_id, e);
52                        responder.send(Err(PeerError::ProtocolError))?;
53                    }
54                }
55            }
56            PeerControllerRequest::GetConfiguration { responder } => {
57                match a2dp.avdtp().get_configuration(endpoint_id).await {
58                    Ok(service_capabilities) => {
59                        info!(
60                            "Service capabilities from GetConfiguration: {:?}",
61                            service_capabilities
62                        );
63                        responder.send(Ok(()))?;
64                    }
65                    Err(e) => {
66                        error!("GetConfiguration for {} failed: {:?}", endpoint_id, e);
67                        responder.send(Err(PeerError::ProtocolError))?;
68                    }
69                }
70            }
71            PeerControllerRequest::GetCapabilities { responder } => {
72                match a2dp.avdtp().get_capabilities(endpoint_id).await {
73                    Ok(service_capabilities) => {
74                        info!(
75                            "Service capabilities from GetCapabilities {:?}",
76                            service_capabilities
77                        );
78                        responder.send(Ok(()))?;
79                    }
80                    Err(e) => {
81                        error!("GetCapabilities for {} failed: {:?}", endpoint_id, e);
82                        responder.send(Err(PeerError::ProtocolError))?;
83                    }
84                }
85            }
86            PeerControllerRequest::GetAllCapabilities { responder } => {
87                match a2dp.avdtp().get_all_capabilities(endpoint_id).await {
88                    Ok(service_capabilities) => {
89                        info!(
90                            "Service capabilities from GetAllCapabilities: {:?}",
91                            service_capabilities
92                        );
93                        responder.send(Ok(()))?;
94                    }
95                    Err(e) => {
96                        error!("GetAllCapabilities for {} failed: {:?}", endpoint_id, e);
97                        responder.send(Err(PeerError::ProtocolError))?;
98                    }
99                }
100            }
101            PeerControllerRequest::SuspendStream { responder } => {
102                let local_id: StreamEndpointId = SBC_SEID.try_into().expect("should work");
103                match a2dp.stream_suspend(local_id).await {
104                    Ok(_) => {
105                        info!("SuspendStream was successful");
106                        responder.send(Ok(()))?;
107                    }
108                    Err(e) => {
109                        error!("SuspendStream for {} failed: {:?}", endpoint_id, e);
110                        responder.send(Err(PeerError::ProtocolError))?;
111                    }
112                }
113            }
114            PeerControllerRequest::ReconfigureStream { responder } => {
115                // Only one frequency, channel mode, block length, subband,
116                // and allocation for reconfigure (A2DP 4.3.2)
117                let generic_capabilities = [ServiceCapability::MediaCodec {
118                    media_type: MediaType::Audio,
119                    codec_type: MediaCodecType::AUDIO_SBC,
120                    codec_extra: vec![0x11, 0x15, 2, 250],
121                }];
122                match a2dp.avdtp().reconfigure(endpoint_id, &generic_capabilities[..]).await {
123                    Ok(resp) => {
124                        info!("ReconfigureStream was successful {:?}", resp);
125                        responder.send(Ok(()))?;
126                    }
127                    Err(e) => {
128                        error!("ReconfigureStream for {} failed: {:?}", endpoint_id, e);
129                        match e {
130                            Error::RemoteRejected(e) if e.service_category().is_some() => {}
131                            _ => responder.send(Err(PeerError::ProtocolError))?,
132                        }
133                    }
134                }
135            }
136            PeerControllerRequest::ReleaseStream { responder } => {
137                match a2dp.avdtp().close(endpoint_id).await {
138                    Ok(resp) => {
139                        info!("ReleaseStream was successful: {:?}", resp);
140                        responder.send(Ok(()))?;
141                    }
142                    Err(e) => {
143                        error!("ReleaseStream for {} failed: {:?}", endpoint_id, e);
144                        responder.send(Err(PeerError::ProtocolError))?;
145                    }
146                }
147            }
148            PeerControllerRequest::StartStream { responder } => {
149                match a2dp.avdtp().start(&[endpoint_id.clone()]).await {
150                    Ok(resp) => {
151                        info!("StartStream was successful: {:?}", resp);
152                        responder.send(Ok(()))?;
153                    }
154                    Err(e) => {
155                        error!("StartStream for {} failed: {:?}", endpoint_id, e);
156                        responder.send(Err(PeerError::ProtocolError))?;
157                    }
158                }
159            }
160            PeerControllerRequest::AbortStream { responder } => {
161                match a2dp.avdtp().abort(endpoint_id).await {
162                    Ok(resp) => {
163                        info!("AbortStream was successful: {:?}", resp);
164                        responder.send(Ok(()))?;
165                    }
166                    Err(e) => {
167                        info!("AbortStream for {} failed: {:?}", endpoint_id, e);
168                        responder.send(Err(PeerError::ProtocolError))?;
169                    }
170                }
171            }
172            PeerControllerRequest::EstablishStream { responder } => {
173                match a2dp.avdtp().open(endpoint_id).await {
174                    Ok(resp) => {
175                        info!("EstablishStream was successful: {:?}", resp);
176                        responder.send(Ok(()))?;
177                    }
178                    Err(e) => {
179                        info!("EstablishStream for {} failed: {:?}", endpoint_id, e);
180                        responder.send(Err(PeerError::ProtocolError))?;
181                    }
182                }
183            }
184            PeerControllerRequest::SuspendAndReconfigure { responder } => {
185                let local_id: StreamEndpointId = SBC_SEID.try_into().expect("should work");
186                match a2dp.stream_suspend(local_id).await {
187                    Ok(_) => {
188                        info!("SuspendStream was successful");
189                        // Only one frequency, channel mode, block length, subband,
190                        // and allocation for reconfigure (A2DP 4.3.2)
191                        let generic_capabilities = [ServiceCapability::MediaCodec {
192                            media_type: MediaType::Audio,
193                            codec_type: MediaCodecType::AUDIO_SBC,
194                            codec_extra: vec![0x11, 0x15, 2, 250],
195                        }];
196                        match a2dp.avdtp().reconfigure(endpoint_id, &generic_capabilities[..]).await
197                        {
198                            Ok(resp) => {
199                                info!("ReconfigureStream was successful {:?}", resp);
200                                responder.send(Ok(()))?;
201                            }
202                            Err(e) => {
203                                info!("ReconfigureStream for {} failed: {:?}", endpoint_id, e);
204                                responder.send(Err(PeerError::ProtocolError))?;
205                            }
206                        }
207                    }
208                    Err(e) => {
209                        info!("SuspendStream for {} failed: {:?}", endpoint_id, e);
210                        responder.send(Err(PeerError::ProtocolError))?;
211                    }
212                }
213            }
214        }
215        Ok(())
216    }
217
218    /// Process all the requests from a peer controller.
219    /// Finishes when an error occurs on the stream or the stream is closed (the controller
220    /// disconnects).  Errors that happen within each specific request are logged but do not close
221    /// the stream.
222    async fn process_requests(
223        mut stream: PeerControllerRequestStream,
224        peer: DetachableWeak<PeerId, Peer>,
225        peer_id: PeerId,
226    ) -> Result<(), anyhow::Error> {
227        while let Some(req) = stream.try_next().await? {
228            let peer = { peer.upgrade().ok_or_else(|| format_err!("Peer disconnected"))? };
229            // Find the first discovered stream by the A2DP peer.
230            let infos = match peer.collect_capabilities().await {
231                Ok(endpoints) => endpoints,
232                Err(e) => {
233                    info!("Error collecting capabilities from peer: {:?}", e);
234                    continue;
235                }
236            };
237            let remote_id = match infos.first() {
238                Some(stream_info) => stream_info.local_id().clone(),
239                None => {
240                    info!("Can't execute {:?} - no streams exist on the peer.", req);
241                    continue;
242                }
243            };
244
245            let fut = Self::handle_controller_request(peer.clone(), req, &remote_id);
246            if let Err(e) = fut.await {
247                error!("{} error handling request: {:?}", peer_id, e);
248            }
249        }
250
251        info!("Controller finished for id: {}", peer_id);
252        Ok(())
253    }
254}
255
256/// Control implementation to handle FIDL requests.
257/// State is stored in the remotes object.
258async fn start_control_service(
259    mut stream: PeerManagerRequestStream,
260    controller_pool: Arc<Mutex<ControllerPoolInner>>,
261) -> Result<(), anyhow::Error> {
262    while let Some(req) = stream.try_next().await? {
263        match req {
264            PeerManagerRequest::GetPeer { peer_id, handle, .. } => {
265                let handle_to_client: fidl::endpoints::ServerEnd<PeerControllerMarker> = handle;
266                let peer_id: PeerId = peer_id.into();
267                let peer = match controller_pool.lock().get_peer(&peer_id) {
268                    None => {
269                        info!("GetPeer: request for nonexistent peer {}, closing.", peer_id);
270                        // Dropping the handle will close the connection.
271                        continue;
272                    }
273                    Some(peer) => peer.clone(),
274                };
275                info!("GetPeer: Creating peer controller for peer with id {}.", peer_id);
276                let client_stream = handle_to_client.into_stream();
277                fasync::Task::local(async move {
278                    Controller::process_requests(client_stream, peer, peer_id)
279                        .await
280                        .unwrap_or_else(|e| error!("Requests failed: {:?}", e))
281                })
282                .detach();
283            }
284            PeerManagerRequest::ConnectedPeers { responder } => {
285                let connected_peers: Vec<fidl_fuchsia_bluetooth::PeerId> =
286                    controller_pool.lock().connected_peers().into_iter().map(Into::into).collect();
287                responder.send(&connected_peers)?;
288                info!("ConnectedPeers request. Peers: {:?}", connected_peers);
289            }
290        }
291    }
292    Ok(())
293}
294
295/// A pool of peers which can be potentially controlled by a connected PeerManager protocol.
296/// Each peer will persist in `peers` until both the peer is disconnected and a command fails.
297struct ControllerPoolInner {
298    peers: HashMap<PeerId, DetachableWeak<PeerId, Peer>>,
299    control_handle: Option<PeerManagerControlHandle>,
300}
301
302impl ControllerPoolInner {
303    pub fn new() -> Self {
304        Self { control_handle: None, peers: HashMap::new() }
305    }
306
307    #[cfg(test)]
308    fn control_handle(&self) -> Option<PeerManagerControlHandle> {
309        self.control_handle.clone()
310    }
311
312    /// Returns a list of the currently connected peers.
313    fn connected_peers(&self) -> Vec<PeerId> {
314        self.peers.keys().cloned().collect()
315    }
316
317    /// Returns a DetachableWeak reference to the A2DP peer, should it exist.
318    fn get_peer(&self, id: &PeerId) -> Option<&DetachableWeak<PeerId, Peer>> {
319        self.peers.get(id)
320    }
321
322    /// Sets the control_handle. Returns true if set, false otherwise.
323    fn set_control_handle(&mut self, control_handle: PeerManagerControlHandle) -> bool {
324        if self.control_handle.is_none() {
325            self.control_handle = Some(control_handle);
326            return true;
327        }
328        false
329    }
330
331    /// Inserts a weak reference to the A2DP peer and notifies the control handle of
332    /// the connection.
333    fn peer_connected(&mut self, peer: DetachableWeak<PeerId, Peer>) {
334        let peer_id = peer.key().clone();
335        drop(self.peers.insert(peer_id, peer));
336        if let Some(handle) = self.control_handle.as_ref() {
337            if let Err(e) = handle.send_on_peer_connected(&peer_id.into()) {
338                info!("Peer connected callback failed: {:?}", e);
339            }
340        }
341    }
342}
343
344pub struct ControllerPool {
345    inner: Arc<Mutex<ControllerPoolInner>>,
346}
347
348impl ControllerPool {
349    pub fn new() -> Self {
350        Self { inner: Arc::new(Mutex::new(ControllerPoolInner::new())) }
351    }
352
353    /// Returns the control_handle associated with this controller, if set.
354    #[cfg(test)]
355    fn control_handle(&self) -> Option<PeerManagerControlHandle> {
356        self.inner.lock().control_handle()
357    }
358
359    /// Returns a cloned copy of of the A2DP peer, if it exists.
360    #[cfg(test)]
361    fn get_peer(&self, id: &PeerId) -> Option<DetachableWeak<PeerId, Peer>> {
362        self.inner.lock().get_peer(id).cloned()
363    }
364
365    /// Called when a client connects to the PeerManager service. Stores the control_handle
366    /// associated with the request stream and spawns a task to handle incoming requests.
367    ///
368    /// There can only be one active client at a time. As such, any client connections thereafter
369    /// will be dropped.
370    pub fn connected(&self, stream: PeerManagerRequestStream) {
371        if self.inner.lock().set_control_handle(stream.control_handle()) {
372            // Spawns the control service task if the control handle hasn't been set.
373            let inner = self.inner.clone();
374            fasync::Task::local(
375                start_control_service(stream, inner)
376                    .unwrap_or_else(|e| error!("Error handling requests {:?}", e)),
377            )
378            .detach()
379        }
380    }
381
382    /// Stores the weak reference to the A2DP peer and notifies the control handle of the connection.
383    /// This should be called once for every connected remote peer.
384    pub fn peer_connected(&self, peer: DetachableWeak<PeerId, Peer>) {
385        self.inner.lock().peer_connected(peer);
386    }
387}
388
389#[cfg(test)]
390mod tests {
391    use super::*;
392    use bt_avdtp::{EndpointType, ErrorCode, Peer as AvdtpPeer, Request, StreamInformation};
393    use fidl::endpoints::{create_endpoints, create_proxy_and_stream};
394    use fidl_fuchsia_bluetooth_avdtp_test::*;
395    use fidl_fuchsia_bluetooth_bredr::ProfileMarker;
396    use fuchsia_async as fasync;
397    use fuchsia_bluetooth::detachable_map::DetachableMap;
398    use fuchsia_bluetooth::types::Channel;
399    use futures::StreamExt;
400
401    use crate::media_task::tests::TestMediaTaskBuilder;
402    use crate::stream::tests::make_sbc_endpoint;
403    use crate::stream::{Stream, Streams};
404
405    /// Reads from the AVDTP request stream, sending back acknowledgements, unless otherwise noted.
406    async fn listen_for_avdtp_requests(remote: Channel) {
407        let remote_avdtp = AvdtpPeer::new(remote);
408        let mut remote_requests = remote_avdtp.take_request_stream();
409        while let Some(request) = remote_requests.next().await {
410            match request {
411                Ok(Request::Discover { responder }) => {
412                    let endpoint_id = StreamEndpointId::try_from(1).expect("endpoint id creation");
413                    let information = StreamInformation::new(
414                        endpoint_id,
415                        false,
416                        MediaType::Audio,
417                        EndpointType::Source,
418                    );
419                    responder.send(&[information]).expect("Sending response should have worked");
420                }
421                Ok(Request::GetCapabilities { responder, .. })
422                | Ok(Request::GetAllCapabilities { responder, .. })
423                | Ok(Request::GetConfiguration { responder, .. }) => {
424                    responder.send(&[]).expect("Sending response should have worked");
425                }
426                Ok(Request::SetConfiguration { responder, .. })
427                | Ok(Request::Reconfigure { responder, .. }) => {
428                    responder.send().expect("Sending response should have worked");
429                }
430                Ok(Request::Suspend { responder, .. }) | Ok(Request::Start { responder, .. }) => {
431                    responder.send().expect("Sending response should have worked");
432                }
433                Ok(Request::Open { responder, .. })
434                | Ok(Request::Close { responder, .. })
435                | Ok(Request::Abort { responder, .. }) => {
436                    // Purposefully make this fail, to ensure fail condition branch works.
437                    responder
438                        .reject(ErrorCode::UnsupportedConfiguration)
439                        .expect("Sending response should have worked");
440                }
441                _ => {
442                    panic!("Got an unhandled AVDTP request");
443                }
444            }
445        }
446    }
447
448    #[fuchsia_async::run_singlethreaded(test)]
449    /// Tests when a client connects to the PeerManager, a listening task is spawned,
450    /// and requests can be served.
451    /// Note: This test does not test the correctness of the underlying AVDTP commands. The AVDTP
452    /// commands should be well tested in `bluetooth/lib/avdtp`.
453    async fn test_client_connected_to_peer_manager() {
454        // Create the ControllerPool. This stores all active peers and handles listening
455        // to PeerManager and PeerController requests.
456        let (pm_proxy, pm_stream) = create_proxy_and_stream::<PeerManagerMarker>();
457        let controller_pool = ControllerPool::new();
458        let mut peer_map = DetachableMap::new();
459
460        // Send a `connected` signal to simulate client connection.
461        controller_pool.connected(pm_stream);
462
463        // Create a fake peer, and simulate connection by sending the `peer_connected` signal.
464        let fake_peer_id = PeerId(12345);
465        let (profile_proxy, _requests) = create_proxy_and_stream::<ProfileMarker>();
466        let (remote, signaling) = Channel::create();
467        let avdtp_peer = AvdtpPeer::new(signaling);
468        let mut streams = Streams::default();
469        let test_builder = TestMediaTaskBuilder::new();
470        streams.insert(Stream::build(
471            make_sbc_endpoint(1, EndpointType::Source),
472            test_builder.builder(),
473        ));
474        let peer = Peer::create(
475            fake_peer_id,
476            avdtp_peer,
477            streams,
478            None,
479            profile_proxy,
480            bt_metrics::MetricsLogger::default(),
481        );
482        let _ = peer_map.insert(fake_peer_id, peer);
483        let weak_peer = peer_map.get(&fake_peer_id).expect("just inserted");
484
485        controller_pool.peer_connected(weak_peer);
486        assert!(controller_pool.control_handle().is_some());
487        assert!(controller_pool.get_peer(&fake_peer_id).is_some());
488
489        // Client connects to controller by sending `get_peer`.
490        let (client, server) = create_endpoints::<PeerControllerMarker>();
491        let client_proxy = client.into_proxy();
492        let res = pm_proxy.get_peer(&fake_peer_id.into(), server);
493        assert_eq!(Ok(()), res.map_err(|e| e.to_string()));
494
495        // Spawn task that acts as remote end and replies with simple responses.
496        fasync::Task::spawn(listen_for_avdtp_requests(remote)).detach();
497
498        // Send client commands over the proxy, and make sure responses are expected.
499        let res = client_proxy.set_configuration().await;
500        assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
501
502        let res = client_proxy.get_configuration().await;
503        assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
504
505        let res = client_proxy.get_capabilities().await;
506        assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
507
508        let res = client_proxy.get_all_capabilities().await;
509        assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
510
511        // Suspend command should work, but suspending the stream will fail
512        // since there is no active stream.
513        let res = client_proxy.suspend_stream().await;
514        assert_eq!(Ok(Err(PeerError::ProtocolError)), res.map_err(|e| e.to_string()));
515
516        let res = client_proxy.reconfigure_stream().await;
517        assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
518
519        // Suspend command should work, but suspending the stream will fail
520        // since there is no active stream.
521        let res = client_proxy.suspend_and_reconfigure().await;
522        assert_eq!(Ok(Err(PeerError::ProtocolError)), res.map_err(|e| e.to_string()));
523
524        let res = client_proxy.start_stream().await;
525        assert_eq!(Ok(Ok(())), res.map_err(|e| e.to_string()));
526
527        // FIDL method should work, but the underlying AVDTP method call should return an error.
528        // See `listen_for_avdtp_requests`.
529        let res = client_proxy.release_stream().await.expect("Command should succeed");
530        assert_eq!(Err("ProtocolError".to_string()), res.map_err(|e| format!("{:?}", e)));
531
532        // FIDL method should work, but the underlying AVDTP method call should return an error.
533        // See `listen_for_avdtp_requests`.
534        let res = client_proxy.establish_stream().await.expect("Command should succeed");
535        assert_eq!(Err("ProtocolError".to_string()), res.map_err(|e| format!("{:?}", e)));
536
537        // FIDL method should work, but the underlying AVDTP method call should return an error.
538        // See `listen_for_avdtp_requests`.
539        let res = client_proxy.abort_stream().await.expect("Command should succeed");
540        assert_eq!(Err("ProtocolError".to_string()), res.map_err(|e| format!("{:?}", e)));
541    }
542}