1use fidl_fuchsia_bluetooth_internal_a2dp as a2dp;
6use fuchsia_bluetooth::types::PeerId;
7use futures::{Future, FutureExt, StreamExt};
8use log::warn;
9use thiserror::Error;
10
11#[derive(Error, Debug)]
12pub enum Error {
13 #[error("The suspend future unexpected completed")]
14 UnexpectedCompletion,
15 #[error("FIDL Error: {:?}", .0)]
16 Fidl(#[from] fidl::Error),
17}
18
19#[derive(Clone)]
22pub struct Control {
23 proxy: Option<a2dp::ControllerProxy>,
24}
25
26pub type PauseToken = Option<a2dp::StreamSuspenderProxy>;
27
28impl Control {
29 pub fn connect() -> Self {
30 let proxy = fuchsia_component::client::connect_to_protocol::<a2dp::ControllerMarker>().ok();
31 Self { proxy }
32 }
33
34 #[cfg(all(test, feature = "test_a2dp_controller"))]
35 fn from_proxy(proxy: a2dp::ControllerProxy) -> Self {
36 Self { proxy: Some(proxy) }
37 }
38
39 pub fn pause(
40 &self,
41 peer_id: Option<PeerId>,
42 ) -> impl Future<Output = Result<PauseToken, Error>> {
43 let proxy = match self.proxy.as_ref() {
44 None => return futures::future::ok(None).left_future(),
45 Some(proxy) => proxy,
46 };
47
48 let (suspender_proxy, server_end) = fidl::endpoints::create_proxy();
49 let id = peer_id.map(Into::into);
50 let suspend_fut = proxy.suspend(id.as_ref(), server_end);
51
52 async move {
53 match suspender_proxy.take_event_stream().next().await {
54 Some(Ok(a2dp::StreamSuspenderEvent::OnSuspended {})) => Ok(Some(suspender_proxy)),
55 x => {
56 warn!("Failed to suspend A2DP: {:?}", x);
57 match suspend_fut.await {
60 Err(fidl::Error::ClientChannelClosed { status, .. })
61 if status == zx::Status::NOT_FOUND =>
62 {
63 Ok(None)
64 }
65 Err(e) => Err(e.into()),
66 Ok(()) => Err(Error::UnexpectedCompletion),
67 }
68 }
69 }
70 }
71 .right_future()
72 }
73}
74
75#[cfg(all(test, feature = "test_a2dp_controller"))]
76mod tests {
77 use super::*;
78
79 use async_utils::PollExt;
80 use fidl::endpoints::RequestStream;
81 use fuchsia_async as fasync;
82 use futures::task::Poll;
83 use std::pin::pin;
84
85 #[fuchsia::test]
86 fn when_a2dp_not_accessible() {
87 let mut exec = fasync::TestExecutor::new();
88 let control = Control::connect();
89
90 let pause_fut = control.pause(None);
91 let mut pause_fut = pin!(pause_fut);
92
93 let _ = exec.run_singlethreaded(&mut pause_fut).expect("should be Ok");
94
95 let pause_single_fut = control.pause(Some(PeerId(1)));
96 let mut pause_single_fut = pin!(pause_single_fut);
97
98 let _ = exec.run_singlethreaded(&mut pause_single_fut).expect("should be Ok");
99 }
100
101 fn expect_suspend_request(
102 exec: &mut fasync::TestExecutor,
103 requests: &mut a2dp::ControllerRequestStream,
104 expected_peer: Option<PeerId>,
105 ) -> (a2dp::ControllerSuspendResponder, a2dp::StreamSuspenderRequestStream) {
106 match exec.run_until_stalled(&mut requests.next()) {
107 Poll::Ready(Some(Ok(a2dp::ControllerRequest::Suspend {
108 responder,
109 token,
110 peer_id,
111 }))) => {
112 assert_eq!(peer_id, expected_peer.map(Into::into).map(Box::new));
113 (responder, token.into_stream())
114 }
115 x => panic!("Expected a ready controller suspend, got {:?}", x),
116 }
117 }
118
119 fn expect_suspender_close(
120 exec: &mut fasync::TestExecutor,
121 requests: &mut a2dp::StreamSuspenderRequestStream,
122 ) {
123 match exec.run_until_stalled(&mut requests.next()) {
124 Poll::Ready(None) => {}
125 x => panic!("Expected suspender to be closed, but it wasn't: {:?}", x),
126 }
127 }
128
129 #[fuchsia::test]
130 fn suspend_and_release() {
131 let mut exec = fasync::TestExecutor::new();
132 let (proxy, mut control_requests) =
133 fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
134 let control = Control::from_proxy(proxy);
135
136 let pause_fut = control.pause(Some(PeerId(1)));
137 let mut pause_fut = pin!(pause_fut);
138
139 let (responder_one, mut stream1) =
140 expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));
141
142 exec.run_until_stalled(&mut pause_fut).expect_pending("shouldn't be done");
143
144 let _ = stream1.control_handle().send_on_suspended().expect("send on suspended event");
145
146 let token = exec.run_until_stalled(&mut pause_fut).expect("done now").expect("token ok");
147
148 let pause_fut = control.pause(None);
150 let mut pause_fut = pin!(pause_fut);
151 let (responder_two, mut stream2) =
152 expect_suspend_request(&mut exec, &mut control_requests, None);
153 stream2.control_handle().send_on_suspended().expect("should send on suspended event");
154 let token2 = exec.run_until_stalled(&mut pause_fut).expect("done now").expect("token ok");
155
156 drop(token);
157
158 expect_suspender_close(&mut exec, &mut stream1);
159 let _ = responder_one.send().unwrap();
160
161 drop(token2);
162
163 expect_suspender_close(&mut exec, &mut stream2);
164 let _ = responder_two.send().unwrap();
165 }
166
167 #[fuchsia::test]
168 fn suspend_fails() {
169 let mut exec = fasync::TestExecutor::new();
170 let (proxy, mut control_requests) =
171 fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
172 let control = Control::from_proxy(proxy);
173
174 let pause_fut = control.pause(Some(PeerId(1)));
175 let mut pause_fut = pin!(pause_fut);
176
177 let (responder, stream1) =
178 expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));
179
180 drop(responder);
181 drop(control_requests);
182 drop(stream1);
183
184 let _ = exec.run_singlethreaded(&mut pause_fut).expect_err("pause error");
185 }
186
187 #[fuchsia::test]
188 fn proxy_is_closed_before_suspend_event() {
189 let mut exec = fasync::TestExecutor::new();
190 let (proxy, mut control_requests) =
191 fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
192 let control = Control::from_proxy(proxy);
193
194 let pause_fut = control.pause(Some(PeerId(1)));
195 let mut pause_fut = pin!(pause_fut);
196
197 let (responder, stream1) =
198 expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));
199
200 exec.run_until_stalled(&mut pause_fut).expect_pending("shouldn't be done");
201
202 drop(stream1);
203 let _ = responder.send().expect("should send response okay");
204
205 let _ = exec.run_singlethreaded(&mut pause_fut).expect_err("pause error");
206 }
207}