1use fidl_fuchsia_bluetooth_internal_a2dp as a2dp;
6use fuchsia_bluetooth::types::PeerId;
7use futures::{Future, FutureExt, StreamExt};
8use log::warn;
9use thiserror::Error;
10
11#[derive(Error, Debug)]
12pub enum Error {
13 #[error("The suspend future unexpected completed")]
14 UnexpectedCompletion,
15 #[error("FIDL Error: {:?}", .0)]
16 Fidl(#[from] fidl::Error),
17}
18
19#[derive(Clone, Default)]
22pub struct Control {
23 proxy: Option<a2dp::ControllerProxy>,
24}
25
26pub type PauseToken = Option<a2dp::StreamSuspenderProxy>;
27
28impl Control {
29 pub fn connect() -> Self {
30 let proxy = fuchsia_component::client::connect_to_protocol::<a2dp::ControllerMarker>().ok();
31 Self { proxy }
32 }
33
34 pub fn from_proxy(proxy: a2dp::ControllerProxy) -> Self {
35 Self { proxy: Some(proxy) }
36 }
37
38 pub fn pause(
39 &self,
40 peer_id: Option<PeerId>,
41 ) -> impl Future<Output = Result<PauseToken, Error>> {
42 let proxy = match self.proxy.as_ref() {
43 None => return futures::future::ok(None).left_future(),
44 Some(proxy) => proxy,
45 };
46
47 let (suspender_proxy, server_end) = fidl::endpoints::create_proxy();
48 let id = peer_id.map(Into::into);
49 let suspend_fut = proxy.suspend(id.as_ref(), server_end);
50
51 async move {
52 match suspender_proxy.take_event_stream().next().await {
53 Some(Ok(a2dp::StreamSuspenderEvent::OnSuspended {})) => Ok(Some(suspender_proxy)),
54 x => {
55 warn!("Failed to suspend A2DP: {:?}", x);
56 match suspend_fut.await {
59 Err(fidl::Error::ClientChannelClosed { status, .. })
60 if status == zx::Status::NOT_FOUND =>
61 {
62 Ok(None)
63 }
64 Err(e) => Err(e.into()),
65 Ok(()) => Err(Error::UnexpectedCompletion),
66 }
67 }
68 }
69 }
70 .right_future()
71 }
72}
73
74#[cfg(all(test, feature = "test_a2dp_controller"))]
75mod tests {
76 use super::*;
77
78 use async_utils::PollExt;
79 use fidl::endpoints::RequestStream;
80 use fuchsia_async as fasync;
81 use futures::task::Poll;
82 use std::pin::pin;
83
84 #[fuchsia::test]
85 fn when_a2dp_not_accessible() {
86 let mut exec = fasync::TestExecutor::new();
87 let control = Control::connect();
88
89 let pause_fut = control.pause(None);
90 let mut pause_fut = pin!(pause_fut);
91
92 let _ = exec.run_singlethreaded(&mut pause_fut).expect("should be Ok");
93
94 let pause_single_fut = control.pause(Some(PeerId(1)));
95 let mut pause_single_fut = pin!(pause_single_fut);
96
97 let _ = exec.run_singlethreaded(&mut pause_single_fut).expect("should be Ok");
98 }
99
100 fn expect_suspend_request(
101 exec: &mut fasync::TestExecutor,
102 requests: &mut a2dp::ControllerRequestStream,
103 expected_peer: Option<PeerId>,
104 ) -> (a2dp::ControllerSuspendResponder, a2dp::StreamSuspenderRequestStream) {
105 match exec.run_until_stalled(&mut requests.next()) {
106 Poll::Ready(Some(Ok(a2dp::ControllerRequest::Suspend {
107 responder,
108 token,
109 peer_id,
110 }))) => {
111 assert_eq!(peer_id, expected_peer.map(Into::into).map(Box::new));
112 (responder, token.into_stream())
113 }
114 x => panic!("Expected a ready controller suspend, got {:?}", x),
115 }
116 }
117
118 fn expect_suspender_close(
119 exec: &mut fasync::TestExecutor,
120 requests: &mut a2dp::StreamSuspenderRequestStream,
121 ) {
122 match exec.run_until_stalled(&mut requests.next()) {
123 Poll::Ready(None) => {}
124 x => panic!("Expected suspender to be closed, but it wasn't: {:?}", x),
125 }
126 }
127
128 #[fuchsia::test]
129 fn suspend_and_release() {
130 let mut exec = fasync::TestExecutor::new();
131 let (proxy, mut control_requests) =
132 fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
133 let control = Control::from_proxy(proxy);
134
135 let pause_fut = control.pause(Some(PeerId(1)));
136 let mut pause_fut = pin!(pause_fut);
137
138 let (responder_one, mut stream1) =
139 expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));
140
141 exec.run_until_stalled(&mut pause_fut).expect_pending("shouldn't be done");
142
143 let _ = stream1.control_handle().send_on_suspended().expect("send on suspended event");
144
145 let token = exec.run_until_stalled(&mut pause_fut).expect("done now").expect("token ok");
146
147 let pause_fut = control.pause(None);
149 let mut pause_fut = pin!(pause_fut);
150 let (responder_two, mut stream2) =
151 expect_suspend_request(&mut exec, &mut control_requests, None);
152 stream2.control_handle().send_on_suspended().expect("should send on suspended event");
153 let token2 = exec.run_until_stalled(&mut pause_fut).expect("done now").expect("token ok");
154
155 drop(token);
156
157 expect_suspender_close(&mut exec, &mut stream1);
158 let _ = responder_one.send().unwrap();
159
160 drop(token2);
161
162 expect_suspender_close(&mut exec, &mut stream2);
163 let _ = responder_two.send().unwrap();
164 }
165
166 #[fuchsia::test]
167 fn suspend_fails() {
168 let mut exec = fasync::TestExecutor::new();
169 let (proxy, mut control_requests) =
170 fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
171 let control = Control::from_proxy(proxy);
172
173 let pause_fut = control.pause(Some(PeerId(1)));
174 let mut pause_fut = pin!(pause_fut);
175
176 let (responder, stream1) =
177 expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));
178
179 drop(responder);
180 drop(control_requests);
181 drop(stream1);
182
183 let _ = exec.run_singlethreaded(&mut pause_fut).expect_err("pause error");
184 }
185
186 #[fuchsia::test]
187 fn proxy_is_closed_before_suspend_event() {
188 let mut exec = fasync::TestExecutor::new();
189 let (proxy, mut control_requests) =
190 fidl::endpoints::create_proxy_and_stream::<a2dp::ControllerMarker>();
191 let control = Control::from_proxy(proxy);
192
193 let pause_fut = control.pause(Some(PeerId(1)));
194 let mut pause_fut = pin!(pause_fut);
195
196 let (responder, stream1) =
197 expect_suspend_request(&mut exec, &mut control_requests, Some(PeerId(1)));
198
199 exec.run_until_stalled(&mut pause_fut).expect_pending("shouldn't be done");
200
201 drop(stream1);
202 let _ = responder.send().expect("should send response okay");
203
204 let _ = exec.run_singlethreaded(&mut pause_fut).expect_err("pause error");
205 }
206}