bt_hfp/
a2dp.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_bluetooth_internal_a2dp as a2dp;
6use fuchsia_bluetooth::types::PeerId;
7use futures::{Future, FutureExt, StreamExt};
8use log::warn;
9use thiserror::Error;
10
11#[derive(Error, Debug)]
12pub enum Error {
13    #[error("The suspend future unexpected completed")]
14    UnexpectedCompletion,
15    #[error("FIDL Error: {:?}", .0)]
16    Fidl(#[from] fidl::Error),
17}
18
19/// A client for fuchsia.bluetooth.internal.a2dp.
20
21#[derive(Clone)]
22pub struct Control {
23    proxy: Option<a2dp::ControllerProxy>,
24}
25
26pub type PauseToken = Option<a2dp::StreamSuspenderProxy>;
27
28impl Control {
29    pub fn connect() -> Self {
30        let proxy = fuchsia_component::client::connect_to_protocol::<a2dp::ControllerMarker>().ok();
31        Self { proxy }
32    }
33
34    #[cfg(all(test, feature = "test_a2dp_controller"))]
35    fn from_proxy(proxy: a2dp::ControllerProxy) -> Self {
36        Self { proxy: Some(proxy) }
37    }
38
39    pub fn pause(
40        &self,
41        peer_id: Option<PeerId>,
42    ) -> impl Future<Output = Result<PauseToken, Error>> {
43        let proxy = match self.proxy.as_ref() {
44            None => return futures::future::ok(None).left_future(),
45            Some(proxy) => proxy,
46        };
47
48        let (suspender_proxy, server_end) = fidl::endpoints::create_proxy();
49        let id = peer_id.map(Into::into);
50        let suspend_fut = proxy.suspend(id.as_ref(), server_end);
51
52        async move {
53            match suspender_proxy.take_event_stream().next().await {
54                Some(Ok(a2dp::StreamSuspenderEvent::OnSuspended {})) => Ok(Some(suspender_proxy)),
55                x => {
56                    warn!("Failed to suspend A2DP: {:?}", x);
57                    // Check to see the result of the suspend future.  It should finish, and it
58                    // might have finished because we couldn't connect (delayed)
59                    match suspend_fut.await {
60                        Err(fidl::Error::ClientChannelClosed { status, .. })
61                            if status == zx::Status::NOT_FOUND =>
62                        {
63                            Ok(None)
64                        }
65                        Err(e) => Err(e.into()),
66                        Ok(()) => Err(Error::UnexpectedCompletion),
67                    }
68                }
69            }
70        }
71        .right_future()
72    }
73}
74
75#[cfg(all(test, feature = "test_a2dp_controller"))]
76mod tests {
77    use super::*;
78
79    use async_utils::PollExt;
80    use fidl::endpoints::RequestStream;
81    use fuchsia_async as fasync;
82    use futures::task::Poll;
83    use std::pin::pin;
84
85    #[fuchsia::test]
86    fn when_a2dp_not_accessible() {
87        let mut exec = fasync::TestExecutor::new();
88        let control = Control::connect();
89
90        let pause_fut = control.pause(None);
91        let mut pause_fut = pin!(pause_fut);
92
93        let _ = exec.run_singlethreaded(&mut pause_fut).expect("should be Ok");
94
95        let pause_single_fut = control.pause(Some(PeerId(1)));
96        let mut pause_single_fut = pin!(pause_single_fut);
97
98        let _ = exec.run_singlethreaded(&mut pause_single_fut).expect("should be Ok");
99    }
100
101    fn expect_suspend_request(
102        exec: &mut fasync::TestExecutor,
103        requests: &mut a2dp::ControllerRequestStream,
104        expected_peer: Option<PeerId>,
105    ) -> (a2dp::ControllerSuspendResponder, a2dp::StreamSuspenderRequestStream) {
106        match exec.run_until_stalled(&mut requests.next()) {
107            Poll::Ready(Some(Ok(a2dp::ControllerRequest::Suspend {
108                responder,
109                token,
110                peer_id,
111            }))) => {
112                assert_eq!(peer_id, expected_peer.map(Into::into).map(Box::new));
113                (responder, token.into_stream())
114            }
115            x => panic!("Expected a ready controller suspend, got {:?}", x),
116        }
117    }
118
119    fn expect_suspender_close(
120        exec: &mut fasync::TestExecutor,
121        requests: &mut a2dp::StreamSuspenderRequestStream,
122    ) {
123        match exec.run_until_stalled(&mut requests.next()) {
124            Poll::Ready(None) => {}
125            x => panic!("Expected suspender to be closed, but it wasn't: {:?}", x),
126        }
127    }
128
129    #[fuchsia::test]
130    fn suspend_and_release() {
131        let mut exec = fasync::TestExecutor::new();
132        let (proxy, mut control_requests) =
133            fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
134        let control = Control::from_proxy(proxy);
135
136        let pause_fut = control.pause(Some(PeerId(1)));
137        let mut pause_fut = pin!(pause_fut);
138
139        let (responder_one, mut stream1) =
140            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));
141
142        exec.run_until_stalled(&mut pause_fut).expect_pending("shouldn't be done");
143
144        let _ = stream1.control_handle().send_on_suspended().expect("send on suspended event");
145
146        let token = exec.run_until_stalled(&mut pause_fut).expect("done now").expect("token ok");
147
148        // Should be able to have overlapping pauses.
149        let pause_fut = control.pause(None);
150        let mut pause_fut = pin!(pause_fut);
151        let (responder_two, mut stream2) =
152            expect_suspend_request(&mut exec, &mut control_requests, None);
153        stream2.control_handle().send_on_suspended().expect("should send on suspended event");
154        let token2 = exec.run_until_stalled(&mut pause_fut).expect("done now").expect("token ok");
155
156        drop(token);
157
158        expect_suspender_close(&mut exec, &mut stream1);
159        let _ = responder_one.send().unwrap();
160
161        drop(token2);
162
163        expect_suspender_close(&mut exec, &mut stream2);
164        let _ = responder_two.send().unwrap();
165    }
166
167    #[fuchsia::test]
168    fn suspend_fails() {
169        let mut exec = fasync::TestExecutor::new();
170        let (proxy, mut control_requests) =
171            fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
172        let control = Control::from_proxy(proxy);
173
174        let pause_fut = control.pause(Some(PeerId(1)));
175        let mut pause_fut = pin!(pause_fut);
176
177        let (responder, stream1) =
178            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));
179
180        drop(responder);
181        drop(control_requests);
182        drop(stream1);
183
184        let _ = exec.run_singlethreaded(&mut pause_fut).expect_err("pause error");
185    }
186
187    #[fuchsia::test]
188    fn proxy_is_closed_before_suspend_event() {
189        let mut exec = fasync::TestExecutor::new();
190        let (proxy, mut control_requests) =
191            fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
192        let control = Control::from_proxy(proxy);
193
194        let pause_fut = control.pause(Some(PeerId(1)));
195        let mut pause_fut = pin!(pause_fut);
196
197        let (responder, stream1) =
198            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));
199
200        exec.run_until_stalled(&mut pause_fut).expect_pending("shouldn't be done");
201
202        drop(stream1);
203        let _ = responder.send().expect("should send response okay");
204
205        let _ = exec.run_singlethreaded(&mut pause_fut).expect_err("pause error");
206    }
207}