bt_hfp/
a2dp.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl_fuchsia_bluetooth_internal_a2dp as a2dp;
6use fuchsia_bluetooth::types::PeerId;
7use futures::{Future, FutureExt, StreamExt};
8use log::warn;
9use thiserror::Error;
10
/// Errors that [`Control::pause`] can resolve to.
#[derive(Error, Debug)]
pub enum Error {
    /// The `suspend` call on the controller returned successfully even though
    /// the first event on the suspender channel was not `OnSuspended`.
    #[error("The suspend future unexpected completed")]
    UnexpectedCompletion,
    /// A FIDL error reported by the `suspend` call. Convertible from
    /// `fidl::Error` via `From`, so `?` / `.into()` can be used.
    #[error("FIDL Error: {:?}", .0)]
    Fidl(#[from] fidl::Error),
}
18
/// A client for fuchsia.bluetooth.internal.a2dp.
///
/// The controller proxy is optional: when it is `None` (for example the
/// `Default` value, or when `connect` could not obtain a proxy), `pause`
/// resolves immediately to `Ok(None)` without contacting any service.
#[derive(Clone, Default)]
pub struct Control {
    /// Connection to the A2DP controller, if one was obtained.
    proxy: Option<a2dp::ControllerProxy>,
}
25
/// Result of a successful [`Control::pause`].
///
/// `Some(proxy)` holds the `StreamSuspender` client end for a suspend that was
/// confirmed with an `OnSuspended` event. `None` means no suspend is being
/// held: either there was no controller proxy, or the controller channel was
/// closed with `NOT_FOUND`.
// NOTE(review): dropping the `Some` token closes the suspender channel, which
// presumably lets A2DP resume streaming — confirm against the FIDL protocol docs.
pub type PauseToken = Option<a2dp::StreamSuspenderProxy>;
27
28impl Control {
29    pub fn connect() -> Self {
30        let proxy = fuchsia_component::client::connect_to_protocol::<a2dp::ControllerMarker>().ok();
31        Self { proxy }
32    }
33
34    pub fn from_proxy(proxy: a2dp::ControllerProxy) -> Self {
35        Self { proxy: Some(proxy) }
36    }
37
38    pub fn pause(
39        &self,
40        peer_id: Option<PeerId>,
41    ) -> impl Future<Output = Result<PauseToken, Error>> {
42        let proxy = match self.proxy.as_ref() {
43            None => return futures::future::ok(None).left_future(),
44            Some(proxy) => proxy,
45        };
46
47        let (suspender_proxy, server_end) = fidl::endpoints::create_proxy();
48        let id = peer_id.map(Into::into);
49        let suspend_fut = proxy.suspend(id.as_ref(), server_end);
50
51        async move {
52            match suspender_proxy.take_event_stream().next().await {
53                Some(Ok(a2dp::StreamSuspenderEvent::OnSuspended {})) => Ok(Some(suspender_proxy)),
54                x => {
55                    warn!("Failed to suspend A2DP: {:?}", x);
56                    // Check to see the result of the suspend future.  It should finish, and it
57                    // might have finished because we couldn't connect (delayed)
58                    match suspend_fut.await {
59                        Err(fidl::Error::ClientChannelClosed { status, .. })
60                            if status == zx::Status::NOT_FOUND =>
61                        {
62                            Ok(None)
63                        }
64                        Err(e) => Err(e.into()),
65                        Ok(()) => Err(Error::UnexpectedCompletion),
66                    }
67                }
68            }
69        }
70        .right_future()
71    }
72}
73
// Unit tests for `Control`. They are only compiled for test builds that also
// enable the `test_a2dp_controller` feature.
#[cfg(all(test, feature = "test_a2dp_controller"))]
mod tests {
    use super::*;

    use async_utils::PollExt;
    use fidl::endpoints::RequestStream;
    use fuchsia_async as fasync;
    use futures::task::Poll;
    use std::pin::pin;

    /// `pause` must resolve to `Ok` for both the "all peers" and the
    /// "single peer" form when using a `Control` obtained from `connect()`.
    // NOTE(review): presumably the A2DP controller protocol is not routed to
    // this test, which is what the test name refers to — confirm.
    #[fuchsia::test]
    fn when_a2dp_not_accessible() {
        let mut exec = fasync::TestExecutor::new();
        let control = Control::connect();

        let pause_fut = control.pause(None);
        let mut pause_fut = pin!(pause_fut);

        let _ = exec.run_singlethreaded(&mut pause_fut).expect("should be Ok");

        let pause_single_fut = control.pause(Some(PeerId(1)));
        let mut pause_single_fut = pin!(pause_single_fut);

        let _ = exec.run_singlethreaded(&mut pause_single_fut).expect("should be Ok");
    }

    /// Polls `requests` once and expects a ready `Suspend` request whose peer
    /// id equals `expected_peer`.
    ///
    /// Returns the responder for the request together with the request stream
    /// of the `StreamSuspender` server end that came with it. Panics if
    /// anything else is observed.
    fn expect_suspend_request(
        exec: &mut fasync::TestExecutor,
        requests: &mut a2dp::ControllerRequestStream,
        expected_peer: Option<PeerId>,
    ) -> (a2dp::ControllerSuspendResponder, a2dp::StreamSuspenderRequestStream) {
        match exec.run_until_stalled(&mut requests.next()) {
            Poll::Ready(Some(Ok(a2dp::ControllerRequest::Suspend {
                responder,
                token,
                peer_id,
            }))) => {
                assert_eq!(peer_id, expected_peer.map(Into::into).map(Box::new));
                (responder, token.into_stream())
            }
            x => panic!("Expected a ready controller suspend, got {:?}", x),
        }
    }

    /// Polls the suspender request stream once and expects it to have
    /// terminated (`Ready(None)`); panics otherwise.
    fn expect_suspender_close(
        exec: &mut fasync::TestExecutor,
        requests: &mut a2dp::StreamSuspenderRequestStream,
    ) {
        match exec.run_until_stalled(&mut requests.next()) {
            Poll::Ready(None) => {}
            x => panic!("Expected suspender to be closed, but it wasn't: {:?}", x),
        }
    }

    /// Two overlapping pauses can be held at once, and dropping each token
    /// closes only its own suspender channel.
    #[fuchsia::test]
    fn suspend_and_release() {
        let mut exec = fasync::TestExecutor::new();
        let (proxy, mut control_requests) =
            fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
        let control = Control::from_proxy(proxy);

        let pause_fut = control.pause(Some(PeerId(1)));
        let mut pause_fut = pin!(pause_fut);

        // The suspend request is already visible on the server side before the
        // pause future has been polled.
        let (responder_one, mut stream1) =
            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));

        // No `OnSuspended` event yet, so the pause cannot complete.
        exec.run_until_stalled(&mut pause_fut).expect_pending("shouldn't be done");

        let _ = stream1.control_handle().send_on_suspended().expect("send on suspended event");

        // The event completes the pause and yields the first token.
        let token = exec.run_until_stalled(&mut pause_fut).expect("done now").expect("token ok");

        // Should be able to have overlapping pauses.
        let pause_fut = control.pause(None);
        let mut pause_fut = pin!(pause_fut);
        let (responder_two, mut stream2) =
            expect_suspend_request(&mut exec, &mut control_requests, None);
        stream2.control_handle().send_on_suspended().expect("should send on suspended event");
        let token2 = exec.run_until_stalled(&mut pause_fut).expect("done now").expect("token ok");

        // Releasing the first token closes the first suspender channel.
        drop(token);

        expect_suspender_close(&mut exec, &mut stream1);
        let _ = responder_one.send().unwrap();

        // Releasing the second token closes the second suspender channel.
        drop(token2);

        expect_suspender_close(&mut exec, &mut stream2);
        let _ = responder_two.send().unwrap();
    }

    /// If the server drops the responder, the controller stream and the
    /// suspender stream without ever confirming, `pause` resolves to an error.
    #[fuchsia::test]
    fn suspend_fails() {
        let mut exec = fasync::TestExecutor::new();
        let (proxy, mut control_requests) =
            fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
        let control = Control::from_proxy(proxy);

        let pause_fut = control.pause(Some(PeerId(1)));
        let mut pause_fut = pin!(pause_fut);

        let (responder, stream1) =
            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));

        // Tear down every server-side handle without responding.
        drop(responder);
        drop(control_requests);
        drop(stream1);

        let _ = exec.run_singlethreaded(&mut pause_fut).expect_err("pause error");
    }

    /// If the suspender channel is closed before `OnSuspended` is sent and the
    /// suspend call is then answered successfully, `pause` resolves to an error.
    #[fuchsia::test]
    fn proxy_is_closed_before_suspend_event() {
        let mut exec = fasync::TestExecutor::new();
        let (proxy, mut control_requests) =
            fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
        let control = Control::from_proxy(proxy);

        let pause_fut = control.pause(Some(PeerId(1)));
        let mut pause_fut = pin!(pause_fut);

        let (responder, stream1) =
            expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));

        exec.run_until_stalled(&mut pause_fut).expect_pending("shouldn't be done");

        // Close the suspender channel first, then let the suspend call finish.
        drop(stream1);
        let _ = responder.send().expect("should send response okay");

        let _ = exec.run_singlethreaded(&mut pause_fut).expect_err("pause error");
    }
}