1use anyhow::Error;
6use fidl::endpoints::create_endpoints;
7use fidl_fuchsia_bluetooth_avdtp_test::{
8 PeerControllerMarker, PeerControllerProxy, PeerManagerEvent, PeerManagerMarker,
9 PeerManagerProxy,
10};
11use fuchsia_async as fasync;
12use fuchsia_component::client;
13use fuchsia_sync::RwLock;
14use futures::stream::StreamExt;
15use log::*;
16use std::collections::hash_map::Entry;
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use crate::bluetooth::types::PeerFactoryMap;
21use crate::common_utils::common::macros::{fx_err_and_bail, with_line};
22
/// Mutable state of the facade, kept behind the single `inner` lock of `AvdtpFacade`.
#[derive(Debug)]
struct AvdtpFacadeInner {
    /// Proxy to the `PeerManager` protocol. Starts out as `None`, is filled in by
    /// `init_avdtp_service_proxy` and reset to `None` by `clear`.
    avdtp_service_proxy: Option<PeerManagerProxy>,

    /// Peer controller proxies keyed by `peer_id.to_string()`. The `Arc` is cloned into
    /// the event-monitoring task, which inserts an entry for every connected peer.
    peer_map: Arc<RwLock<PeerFactoryMap>>,
}
31
/// Facade over the `fuchsia.bluetooth.avdtp.test` `PeerManager` / `PeerController`
/// protocols: it keeps one `PeerManager` proxy plus one controller proxy per known peer
/// and exposes the controller operations by peer id.
#[derive(Debug)]
pub struct AvdtpFacade {
    /// Set to `true` by `init_avdtp_service_proxy`; while it is `true` that method
    /// returns immediately.
    initialized: RwLock<bool>,
    /// The `PeerManager` proxy and the peer controller map.
    inner: RwLock<AvdtpFacadeInner>,
}
37
38impl AvdtpFacade {
43 pub fn new() -> AvdtpFacade {
44 AvdtpFacade {
45 initialized: RwLock::new(false),
46 inner: RwLock::new(AvdtpFacadeInner {
47 avdtp_service_proxy: None,
48 peer_map: Arc::new(RwLock::new(HashMap::new())),
49 }),
50 }
51 }
52
53 async fn create_avdtp_service_proxy(&self) -> Result<PeerManagerProxy, Error> {
55 let tag = "AvdtpFacade::create_avdtp_service_proxy";
56 match self.inner.read().avdtp_service_proxy.clone() {
57 Some(avdtp_service_proxy) => {
58 info!(
59 tag = &with_line!(tag);
60 "Current Avdtp service proxy: {:?}", avdtp_service_proxy
61 );
62 Ok(avdtp_service_proxy)
63 }
64 None => {
65 info!(tag = &with_line!(tag); "Launching A2DP and setting new Avdtp service proxy");
66
67 let avdtp_service_proxy = client::connect_to_protocol::<PeerManagerMarker>();
68 if let Err(err) = avdtp_service_proxy {
69 fx_err_and_bail!(
70 &with_line!(tag),
71 format_err!("Failed to create Avdtp service proxy: {}", err)
72 );
73 }
74 avdtp_service_proxy
75 }
76 }
77 }
78
79 pub async fn init_avdtp_service_proxy(&self) -> Result<(), Error> {
81 if *self.initialized.read() {
82 return Ok(());
83 }
84 *self.initialized.write() = true;
85
86 let tag = "AvdtpFacade::init_avdtp_service_proxy";
87 self.inner.write().avdtp_service_proxy = Some(self.create_avdtp_service_proxy().await?);
88
89 let avdtp_svc = match &self.inner.read().avdtp_service_proxy {
90 Some(p) => p.clone(),
91 None => fx_err_and_bail!(&with_line!(tag), "No AVDTP Service proxy created"),
92 };
93
94 let avdtp_service_future =
95 AvdtpFacade::monitor_avdtp_event_stream(avdtp_svc, self.inner.write().peer_map.clone());
96
97 let fut = async move {
98 let result = avdtp_service_future.await;
99 if let Err(_err) = result {
100 error!("Failed to monitor AVDTP event stream.");
101 }
102 };
103 fasync::Task::spawn(fut).detach();
104
105 Ok(())
106 }
107
108 pub async fn get_connected_peers(&self) -> Result<Vec<u64>, Error> {
110 let tag = "AvdtpFacade::get_connected_peers";
111 let peer_ids = match &self.inner.read().avdtp_service_proxy {
112 Some(p) => {
113 let connected_peers = p.connected_peers().await?;
114 let mut peer_id_list = Vec::new();
115 for peer in connected_peers {
116 peer_id_list.push(peer.value);
117 }
118 peer_id_list
119 }
120 None => fx_err_and_bail!(&with_line!(tag), "No AVDTP Service proxy created"),
121 };
122 Ok(peer_ids)
123 }
124
125 fn get_peer_controller_by_id(&self, peer_id: u64) -> Option<PeerControllerProxy> {
130 match self.inner.read().peer_map.write().get(&peer_id.to_string()) {
131 Some(p) => Some(p.clone()),
132 None => None,
133 }
134 }
135
136 pub async fn set_configuration(&self, peer_id: u64) -> Result<(), Error> {
141 let tag = "AvdtpFacade::set_configuration";
142 if let Some(p) = self.get_peer_controller_by_id(peer_id) {
143 match p.set_configuration().await? {
144 Err(err) => {
145 let err_msg = format_err!("Error: {:?}", err);
146 fx_err_and_bail!(&with_line!(tag), err_msg)
147 }
148 Ok(()) => Ok(()),
149 }
150 } else {
151 fx_err_and_bail!(&with_line!(tag), format!("Peer id {:?} not found.", peer_id))
152 }
153 }
154
155 pub async fn get_configuration(&self, peer_id: u64) -> Result<(), Error> {
161 let tag = "AvdtpFacade::get_configuration";
162 if let Some(p) = self.get_peer_controller_by_id(peer_id) {
163 match p.get_configuration().await? {
164 Err(err) => {
165 let err_msg = format_err!("Error: {:?}", err);
166 fx_err_and_bail!(&with_line!(tag), err_msg)
167 }
168 Ok(()) => Ok(()),
169 }
170 } else {
171 fx_err_and_bail!(&with_line!(tag), format!("Peer id {:?} not found.", peer_id))
172 }
173 }
174
175 pub async fn get_capabilities(&self, peer_id: u64) -> Result<(), Error> {
180 let tag = "AvdtpFacade::get_capabilities";
181 if let Some(p) = self.get_peer_controller_by_id(peer_id) {
182 let result = p.get_capabilities().await;
183 match result {
184 Ok(capabilities) => info!("{:?}", capabilities),
185 Err(e) => fx_err_and_bail!(
186 &with_line!(tag),
187 format_err!("Error getting capabilities: {:?}", e)
188 ),
189 };
190 Ok(())
191 } else {
192 fx_err_and_bail!(&with_line!(tag), format!("Peer id {:?} not found.", peer_id))
193 }
194 }
195
196 pub async fn get_all_capabilities(&self, peer_id: u64) -> Result<(), Error> {
201 let tag = "AvdtpFacade::get_all_capabilities";
202 if let Some(p) = self.get_peer_controller_by_id(peer_id) {
203 let result = p.get_all_capabilities().await;
204 match result {
205 Ok(capabilities) => info!("{:?}", capabilities),
206 Err(e) => fx_err_and_bail!(
207 &with_line!(tag),
208 format_err!("Error getting capabilities: {:?}", e)
209 ),
210 };
211 Ok(())
212 } else {
213 fx_err_and_bail!(&with_line!(tag), format!("Peer id {:?} not found.", peer_id))
214 }
215 }
216
217 pub async fn reconfigure_stream(&self, peer_id: u64) -> Result<(), Error> {
223 let tag = "AvdtpFacade::reconfigure_stream";
224 if let Some(p) = self.get_peer_controller_by_id(peer_id) {
225 match p.reconfigure_stream().await? {
226 Err(err) => {
227 let err_msg = format_err!("Error: {:?}", err);
228 fx_err_and_bail!(&with_line!(tag), err_msg)
229 }
230 Ok(()) => Ok(()),
231 }
232 } else {
233 fx_err_and_bail!(&with_line!(tag), format!("Peer id {:?} not found.", peer_id))
234 }
235 }
236
237 pub async fn suspend_stream(&self, peer_id: u64) -> Result<(), Error> {
244 let tag = "AvdtpFacade::suspend_stream";
245 if let Some(p) = self.get_peer_controller_by_id(peer_id) {
246 match p.suspend_stream().await? {
247 Err(err) => {
248 let err_msg = format_err!("Error: {:?}", err);
249 fx_err_and_bail!(&with_line!(tag), err_msg)
250 }
251 Ok(()) => Ok(()),
252 }
253 } else {
254 fx_err_and_bail!(&with_line!(tag), format!("Peer id {:?} not found.", peer_id))
255 }
256 }
257
258 pub async fn suspend_and_reconfigure(&self, peer_id: u64) -> Result<(), Error> {
263 let tag = "AvdtpFacade::suspend_and_reconfigure";
264 if let Some(p) = self.get_peer_controller_by_id(peer_id) {
265 match p.suspend_and_reconfigure().await? {
266 Err(err) => {
267 let err_msg = format_err!("Error: {:?}", err);
268 fx_err_and_bail!(&with_line!(tag), err_msg)
269 }
270 Ok(()) => Ok(()),
271 }
272 } else {
273 fx_err_and_bail!(&with_line!(tag), format!("Peer id {:?} not found.", peer_id))
274 }
275 }
276
277 pub async fn release_stream(&self, peer_id: u64) -> Result<(), Error> {
283 let tag = "AvdtpFacade::release_stream";
284 if let Some(p) = self.get_peer_controller_by_id(peer_id) {
285 match p.release_stream().await? {
286 Err(err) => {
287 let err_msg = format_err!("Error: {:?}", err);
288 fx_err_and_bail!(&with_line!(tag), err_msg)
289 }
290 Ok(()) => Ok(()),
291 }
292 } else {
293 fx_err_and_bail!(&with_line!(tag), format!("Peer id {:?} not found.", peer_id))
294 }
295 }
296
297 pub async fn establish_stream(&self, peer_id: u64) -> Result<(), Error> {
302 let tag = "AvdtpFacade::establish_stream";
303 if let Some(p) = self.get_peer_controller_by_id(peer_id) {
304 match p.establish_stream().await? {
305 Err(err) => {
306 let err_msg = format_err!("Error: {:?}", err);
307 fx_err_and_bail!(&with_line!(tag), err_msg)
308 }
309 Ok(()) => Ok(()),
310 }
311 } else {
312 fx_err_and_bail!(&with_line!(tag), format!("Peer id {:?} not found.", peer_id))
313 }
314 }
315
316 pub async fn start_stream(&self, peer_id: u64) -> Result<(), Error> {
321 let tag = "AvdtpFacade::start_stream";
322 if let Some(p) = self.get_peer_controller_by_id(peer_id) {
323 match p.start_stream().await? {
324 Err(err) => {
325 let err_msg = format_err!("Error: {:?}", err);
326 fx_err_and_bail!(&with_line!(tag), err_msg)
327 }
328 Ok(()) => Ok(()),
329 }
330 } else {
331 fx_err_and_bail!(&with_line!(tag), format!("Peer id {:?} not found.", peer_id))
332 }
333 }
334
335 pub async fn abort_stream(&self, peer_id: u64) -> Result<(), Error> {
340 let tag = "AvdtpFacade::abort_stream";
341 if let Some(p) = self.get_peer_controller_by_id(peer_id) {
342 match p.abort_stream().await? {
343 Err(err) => {
344 let err_msg = format_err!("Error: {:?}", err);
345 fx_err_and_bail!(&with_line!(tag), err_msg)
346 }
347 Ok(()) => Ok(()),
348 }
349 } else {
350 fx_err_and_bail!(&with_line!(tag), format!("Peer id {:?} not found.", peer_id))
351 }
352 }
353
354 async fn monitor_avdtp_event_stream(
356 avdtp_svc: PeerManagerProxy,
357 peer_map: Arc<RwLock<PeerFactoryMap>>,
358 ) -> Result<(), Error> {
359 let tag = "AvdtpFacade::monitor_avdtp_event_stream";
360 let mut stream = avdtp_svc.take_event_stream();
361
362 while let Some(evt) = stream.next().await {
363 match evt {
364 Ok(e) => match e {
365 PeerManagerEvent::OnPeerConnected { peer_id } => {
366 let (client, server) = create_endpoints::<PeerControllerMarker>();
367 let peer = client.into_proxy();
368 match peer_map.write().entry(peer_id.value.to_string()) {
369 Entry::Occupied(mut entry) => {
370 entry.insert(peer);
371 info!("Overriding device in PeerFactoryMap");
372 }
373 Entry::Vacant(entry) => {
374 entry.insert(peer);
375 info!("Inserted device into PeerFactoryMap");
376 }
377 };
378 let _ = avdtp_svc.get_peer(&peer_id, server);
380 info!("Getting peer with peer_id: {}", peer_id.value);
381 }
382 },
383 Err(e) => {
384 let log_err = format_err!("Error during handling request stream: {}", e);
385 fx_err_and_bail!(&with_line!(tag), log_err)
386 }
387 }
388 }
389 Ok(())
390 }
391
392 fn clear(&self) {
394 self.inner.write().peer_map.write().clear();
395 self.inner.write().avdtp_service_proxy = None;
396 }
397
/// Clears the facade state by delegating to `clear`: the stored peer controllers are
/// removed and the `PeerManager` proxy is dropped.
pub async fn remove_service(&self) {
    self.clear()
}
402
/// Clears the facade state by delegating to `clear`.
///
/// Always returns `Ok(())`; nothing in the body can fail.
pub async fn cleanup(&self) -> Result<(), Error> {
    self.clear();
    Ok(())
}
408}