1use anyhow::{format_err, Context as _, Error};
6use fidl::endpoints;
7use fidl_fuchsia_bluetooth_gatt2::{
8 Characteristic, CharacteristicNotifierMarker, CharacteristicNotifierRequest,
9 CharacteristicNotifierRequestStream, ClientEventStream, ClientProxy, Handle, LongReadOptions,
10 ReadOptions, RemoteServiceEventStream, RemoteServiceProxy, ServiceHandle, ServiceInfo,
11 ShortReadOptions, WriteMode, WriteOptions,
12};
13use fidl_fuchsia_bluetooth_le::{
14 CentralMarker, CentralProxy, ConnectionEventStream, ConnectionOptions, ConnectionProxy, Filter,
15 ScanOptions, ScanResultWatcherMarker, ScanResultWatcherProxy,
16};
17use fuchsia_sync::RwLock;
18use futures::{select, StreamExt};
19use log::*;
20use std::collections::HashMap;
21use std::str::FromStr;
22use std::sync::Arc;
23use {fuchsia_async as fasync, fuchsia_component as app};
24
25use fuchsia_bluetooth::types::le::Peer;
26use fuchsia_bluetooth::types::{PeerId, Uuid};
27
28use crate::bluetooth::types::{BleScanResponse, SerializableReadByTypeResult};
29use crate::common_utils::common::macros::with_line;
30
31#[derive(Debug)]
32struct Client {
33 proxy: ClientProxy,
34 _connection: ConnectionProxy,
35 services: HashMap<u64, ServiceInfo>,
38 watch_services_task: Option<fasync::Task<()>>,
41 _events_task: fasync::Task<()>,
43}
44
45#[derive(Debug)]
46struct Central {
47 proxy: CentralProxy,
48 _event_task: fasync::Task<Result<(), Error>>,
49}
50
51#[derive(Debug)]
52struct RemoteService {
53 proxy: RemoteServiceProxy,
54 _event_task: fasync::Task<()>,
55 notifier_tasks: HashMap<u64, fasync::Task<()>>,
57 peer_id: PeerId,
58 service_id: u64,
59}
60
61#[derive(Debug)]
62pub struct InnerGattClientFacade {
63 active_remote_service: Option<RemoteService>,
64 central: Option<Central>,
65 scan_results: HashMap<PeerId, Peer>,
66 clients: HashMap<PeerId, Client>,
67 scan_task: Option<fasync::Task<()>>,
68}
69
70#[derive(Debug)]
75pub struct GattClientFacade {
76 inner: Arc<RwLock<InnerGattClientFacade>>,
77}
78
79impl GattClientFacade {
80 pub fn new() -> GattClientFacade {
81 GattClientFacade {
82 inner: Arc::new(RwLock::new(InnerGattClientFacade {
83 active_remote_service: None,
84 central: None,
85 scan_results: HashMap::new(),
86 clients: HashMap::new(),
87 scan_task: None,
88 })),
89 }
90 }
91
92 pub async fn stop_scan(&self) -> Result<(), Error> {
93 let tag = "GattClientFacade::stop_scan";
94 if self.inner.write().scan_task.take().is_some() {
95 info!(tag = &with_line!(tag); "Scan stopped");
96 } else {
97 info!(tag = &with_line!(tag); "No scan was running");
98 }
99 Ok(())
100 }
101
102 pub async fn start_scan(&self, filter: Option<Filter>) -> Result<(), Error> {
103 let tag = "GattClientFacade::start_scan";
104
105 self.inner.write().scan_results.clear();
106
107 GattClientFacade::set_central_proxy(self.inner.clone());
109
110 let central = self
111 .inner
112 .read()
113 .central
114 .as_ref()
115 .ok_or_else(|| format_err!("No central proxy created."))?
116 .proxy
117 .clone();
118
119 let options = ScanOptions {
120 filters: Some(vec![filter.unwrap_or(Filter::default())]),
121 ..Default::default()
122 };
123
124 let (watcher_proxy, watcher_server) =
125 fidl::endpoints::create_proxy::<ScanResultWatcherMarker>();
126
127 let scan_fut = central.scan(&options, watcher_server);
130 fasync::Task::spawn(async move {
131 if let Err(e) = scan_fut.await {
132 warn!(tag = &with_line!(tag); "FIDL error during scan: {:?}", e);
133 }
134 })
135 .detach();
136
137 self.inner.write().scan_task = Some(fasync::Task::spawn(
138 GattClientFacade::scan_result_watcher_task(self.inner.clone(), watcher_proxy),
139 ));
140
141 info!(tag = &with_line!(tag); "Scan started");
142 Ok(())
143 }
144
145 async fn scan_result_watcher_task(
146 inner: Arc<RwLock<InnerGattClientFacade>>,
147 watcher_proxy: ScanResultWatcherProxy,
148 ) {
149 let tag = "GattClientFacade::scan_result_watcher_task";
150 let mut event_stream = watcher_proxy.take_event_stream();
151 let mut watch_fut = watcher_proxy.watch();
152 loop {
153 select! {
154 watch_result = watch_fut => {
155 let peers = match watch_result {
156 Ok(peers) => peers,
157 Err(e) => {
158 info!(
159 tag = &with_line!(tag);
160 "FIDL error calling ScanResultWatcher::Watch(): {}", e
161 );
162 break;
163 }};
164 for fidl_peer in peers {
165 let peer: Peer = fidl_peer.try_into().unwrap();
166 debug!(tag = &with_line!(tag); "Peer discovered (id: {}, name: {:?})", peer.id, peer.name);
167 inner.write().scan_results.insert(peer.id, peer);
168 }
169 watch_fut = watcher_proxy.watch();
170 },
171 event = event_stream.next() => {
172 if let Some(Err(err)) = event {
173 info!(tag = &with_line!(tag); "ScanResultWatcher error: {:?}", err);
174 }
175 break; }
177 }
178 }
179 inner.write().scan_task = None;
180 info!(tag = &with_line!(tag); "ScanResultWatcher closed");
181 }
182
183 async fn active_remote_service_event_task(
184 inner: Arc<RwLock<InnerGattClientFacade>>,
185 event_stream: RemoteServiceEventStream,
186 ) {
187 let tag = "GattClientFacade::active_remote_service_event_task";
188 event_stream.map(|_| ()).collect::<()>().await;
190 info!(tag = &with_line!(tag); "RemoteService closed");
191 inner.write().active_remote_service = None;
192 }
193
194 pub async fn gattc_connect_to_service(
195 &self,
196 peer_id: String,
197 service_id: u64,
198 ) -> Result<(), Error> {
199 let tag = "GattClientFacade::gattc_connect_to_service";
200 let peer_id = PeerId::from_str(&peer_id)?;
201
202 if let Some(service) = self.inner.read().active_remote_service.as_ref() {
204 if service.peer_id == peer_id && service.service_id == service_id {
205 info!(
206 tag = &with_line!(tag);
207 "Aready connected to service (peer: {}, service: {})", peer_id, service_id
208 );
209 return Ok(());
210 }
211 }
212
213 self.inner.write().active_remote_service = None;
214
215 let client_proxy = self.get_client_proxy(peer_id).ok_or_else(|| {
216 error!(
217 tag = &with_line!(tag);
218 "Unable to connect to service {} (not connected to peer {})", service_id, peer_id
219 );
220 format_err!("Not connected to peer")
221 })?;
222 let (proxy, server) = endpoints::create_proxy();
223 client_proxy.connect_to_service(&ServiceHandle { value: service_id }, server)?;
224 let event_stream = proxy.take_event_stream();
225 let event_task = fasync::Task::spawn(GattClientFacade::active_remote_service_event_task(
226 self.inner.clone(),
227 event_stream,
228 ));
229 self.inner.write().active_remote_service = Some(RemoteService {
230 proxy,
231 _event_task: event_task,
232 notifier_tasks: HashMap::new(),
233 peer_id,
234 service_id,
235 });
236 Ok(())
237 }
238
239 pub async fn gattc_discover_characteristics(&self) -> Result<Vec<Characteristic>, Error> {
240 let discover_characteristics_fut = self
241 .get_remote_service_proxy()
242 .ok_or_else(|| format_err!("RemoteService proxy not available"))?
243 .discover_characteristics();
244 discover_characteristics_fut.await.map_err(|_| format_err!("Failed to send message"))
245 }
246
247 async fn gattc_write_char_internal(
248 &self,
249 id: u64,
250 offset: u16,
251 write_value: Vec<u8>,
252 mode: WriteMode,
253 ) -> Result<(), Error> {
254 let handle = Handle { value: id };
255 let options =
256 WriteOptions { offset: Some(offset), write_mode: Some(mode), ..Default::default() };
257 let write_fut = self
258 .get_remote_service_proxy()
259 .ok_or_else(|| format_err!("No active service"))?
260 .write_characteristic(&handle, &write_value, &options);
261 write_fut
262 .await
263 .map_err(|_| format_err!("Failed to send message"))?
264 .map_err(|err| format_err!("Failed to write characteristic: {:?}", err))
265 }
266
267 pub async fn gattc_write_char_by_id(&self, id: u64, write_value: Vec<u8>) -> Result<(), Error> {
268 self.gattc_write_char_internal(id, 0, write_value, WriteMode::Default).await
269 }
270
271 pub async fn gattc_write_long_char_by_id(
272 &self,
273 id: u64,
274 offset: u16,
275 write_value: Vec<u8>,
276 reliable_mode: bool,
277 ) -> Result<(), Error> {
278 self.gattc_write_char_internal(
279 id,
280 offset,
281 write_value,
282 if reliable_mode { WriteMode::Reliable } else { WriteMode::Default },
283 )
284 .await
285 }
286
287 pub async fn gattc_write_char_by_id_without_response(
288 &self,
289 id: u64,
290 write_value: Vec<u8>,
291 ) -> Result<(), Error> {
292 self.gattc_write_char_internal(id, 0, write_value, WriteMode::WithoutResponse).await
293 }
294
295 async fn gattc_read_char_internal(
296 &self,
297 id: u64,
298 options: ReadOptions,
299 ) -> Result<Vec<u8>, Error> {
300 let handle = Handle { value: id };
301 let read_fut = self
302 .get_remote_service_proxy()
303 .ok_or_else(|| format_err!("RemoteService proxy not available"))?
304 .read_characteristic(&handle, &options);
305 let read_value = read_fut
306 .await
307 .map_err(|_| format_err!("Failed to send message"))?
308 .map_err(|err| format_err!("Failed to read long characteristic: {:?}", err))?;
309 Ok(read_value.value.unwrap())
310 }
311
312 pub async fn gattc_read_char_by_id(&self, id: u64) -> Result<Vec<u8>, Error> {
313 self.gattc_read_char_internal(id, ReadOptions::ShortRead(ShortReadOptions {})).await
314 }
315
316 pub async fn gattc_read_long_char_by_id(
317 &self,
318 id: u64,
319 offset: u16,
320 max_bytes: u16,
321 ) -> Result<Vec<u8>, Error> {
322 self.gattc_read_char_internal(
323 id,
324 ReadOptions::LongRead(LongReadOptions {
325 offset: Some(offset),
326 max_bytes: Some(max_bytes),
327 ..Default::default()
328 }),
329 )
330 .await
331 }
332
333 pub async fn gattc_read_char_by_type(
334 &self,
335 raw_uuid: String,
336 ) -> Result<Vec<SerializableReadByTypeResult>, Error> {
337 let uuid = Uuid::from_str(&raw_uuid)
338 .map_err(|e| format_err!("Unable to convert to Uuid: {:?}", e))?;
339 let fidl_uuid = fidl_fuchsia_bluetooth::Uuid::from(uuid);
340 let read_fut = self
341 .get_remote_service_proxy()
342 .ok_or_else(|| format_err!("RemoteService proxy not available"))?
343 .read_by_type(&fidl_uuid);
344 let results = read_fut
345 .await
346 .map_err(|err| format_err!("FIDL error: {:?}", err))?
347 .map_err(|err| format_err!("Failed to read characteristic by type: {:?}", err))?
348 .into_iter()
349 .filter(|r| r.error.is_none())
350 .map(|r| SerializableReadByTypeResult::new(r).unwrap())
351 .collect();
352 Ok(results)
353 }
354
355 async fn gattc_read_desc_internal(
356 &self,
357 id: u64,
358 options: ReadOptions,
359 ) -> Result<Vec<u8>, Error> {
360 let handle = Handle { value: id };
361 let read_fut = self
362 .get_remote_service_proxy()
363 .ok_or_else(|| format_err!("RemoteService proxy not available"))?
364 .read_descriptor(&handle, &options);
365 let read_value = read_fut
366 .await
367 .map_err(|_| format_err!("Failed to send message"))?
368 .map_err(|err| format_err!("Failed to read descriptor: {:?}", err))?;
369 Ok(read_value.value.unwrap())
370 }
371
372 pub async fn gattc_read_desc_by_id(&self, id: u64) -> Result<Vec<u8>, Error> {
373 self.gattc_read_desc_internal(id, ReadOptions::ShortRead(ShortReadOptions {})).await
374 }
375
376 pub async fn gattc_read_long_desc_by_id(
377 &self,
378 id: u64,
379 offset: u16,
380 max_bytes: u16,
381 ) -> Result<Vec<u8>, Error> {
382 self.gattc_read_desc_internal(
383 id,
384 ReadOptions::LongRead(LongReadOptions {
385 offset: Some(offset),
386 max_bytes: Some(max_bytes),
387 ..Default::default()
388 }),
389 )
390 .await
391 }
392
393 pub async fn gattc_write_desc_by_id(&self, id: u64, write_value: Vec<u8>) -> Result<(), Error> {
394 self.gattc_write_long_desc_by_id(id, 0, write_value).await
395 }
396
397 pub async fn gattc_write_long_desc_by_id(
398 &self,
399 id: u64,
400 offset: u16,
401 write_value: Vec<u8>,
402 ) -> Result<(), Error> {
403 let handle = Handle { value: id };
404 let options = WriteOptions { offset: Some(offset), ..Default::default() };
405 let write_fut = self
406 .get_remote_service_proxy()
407 .ok_or_else(|| format_err!("RemoteService proxy not available"))?
408 .write_descriptor(&handle, &write_value, &options);
409 write_fut
410 .await
411 .map_err(|_| format_err!("Failed to send message"))?
412 .map_err(|err| format_err!("Failed to write descriptor: {:?}", err))
413 }
414
415 async fn notifier_task(
416 inner: Arc<RwLock<InnerGattClientFacade>>,
417 id: u64,
418 mut request_stream: CharacteristicNotifierRequestStream,
419 ) {
420 let tag = "GattClientFacade::notifier_task";
421 while let Ok(event) = request_stream.select_next_some().await {
422 match event {
423 CharacteristicNotifierRequest::OnNotification { value, responder } => {
424 info!(
425 tag = &with_line!(tag);
426 "Received notification (id: {}, value: {:?})",
427 id,
428 value.value.unwrap()
429 );
430 let _ = responder.send();
431 }
432 }
433 }
434 info!(tag = &with_line!(tag); "CharacteristicNotifier closed (id: {})", id);
435 inner.write().active_remote_service.as_mut().and_then(|s| s.notifier_tasks.remove(&id));
436 }
437
438 pub async fn gattc_toggle_notify_characteristic(
439 &self,
440 id: u64,
441 enable: bool,
442 ) -> Result<(), Error> {
443 let (register_fut, request_stream) = {
444 let mut inner = self.inner.write();
445 let service = inner
446 .active_remote_service
447 .as_mut()
448 .ok_or_else(|| format_err!("Not connected to a service"))?;
449
450 if !enable {
451 service.notifier_tasks.remove(&id);
452 return Ok(());
453 }
454 if service.notifier_tasks.contains_key(&id) {
455 return Ok(());
456 }
457
458 let (client_end, request_stream) =
459 fidl::endpoints::create_request_stream::<CharacteristicNotifierMarker>();
460 let register_fut =
461 service.proxy.register_characteristic_notifier(&Handle { value: id }, client_end);
462
463 (register_fut, request_stream)
464 };
465 register_fut
466 .await
467 .map_err(|e| format_err!("FIDL error: {:?}", e))?
468 .map_err(|e| format_err!("Error registering notifier: {:?}", e))?;
469
470 let notifier_task = fasync::Task::spawn(GattClientFacade::notifier_task(
471 self.inner.clone(),
472 id,
473 request_stream,
474 ));
475 self.inner
476 .write()
477 .active_remote_service
478 .as_mut()
479 .ok_or_else(|| format_err!("Not connected to a service"))?
480 .notifier_tasks
481 .insert(id, notifier_task);
482 Ok(())
483 }
484
485 async fn watch_services_and_update_map(
487 inner: &Arc<RwLock<InnerGattClientFacade>>,
488 peer_id: &PeerId,
489 ) -> Result<(), Error> {
490 let client_proxy = inner
491 .read()
492 .clients
493 .get(peer_id)
494 .ok_or_else(|| format_err!("Not connected to peer"))?
495 .proxy
496 .clone();
497 let watch_fut = client_proxy.watch_services(&[]);
498 let (updated, removed) =
499 watch_fut.await.map_err(|_| format_err!("FIDL error calling WatchServices()"))?;
500
501 let mut inner = inner.write();
504 let services = &mut inner
505 .clients
506 .get_mut(peer_id)
507 .ok_or_else(|| format_err!("Not connected to peer"))?
508 .services;
509 for handle in removed {
510 services.remove(&handle.value);
511 }
512 for svc in updated {
513 services.insert(svc.handle.unwrap().value, svc);
514 }
515 Ok(())
516 }
517
518 async fn watch_services_task(inner: Arc<RwLock<InnerGattClientFacade>>, peer_id: PeerId) -> () {
519 loop {
520 let tag = "GattClientFacade::watch_services_task";
521 if let Err(err) =
522 GattClientFacade::watch_services_and_update_map(&inner, &peer_id).await
523 {
524 warn!(tag = &with_line!(tag); "{}", err);
525 return;
526 }
527 }
528 }
529
530 pub async fn list_services(&self, id: String) -> Result<Vec<ServiceInfo>, Error> {
531 let peer_id = PeerId::from_str(&id).map_err(|_| format_err!("Invalid peer id"))?;
532
533 {
534 let inner = self.inner.read();
535 let client =
536 inner.clients.get(&peer_id).ok_or_else(|| format_err!("Not connected to peer"))?;
537 if client.watch_services_task.is_some() {
539 return Ok(client.services.iter().map(|(_, svc)| svc.clone()).collect());
540 }
541 }
542
543 GattClientFacade::watch_services_and_update_map(&self.inner, &peer_id).await?;
545 let task =
546 fasync::Task::spawn(GattClientFacade::watch_services_task(self.inner.clone(), peer_id));
547 let mut inner = self.inner.write();
548 let client =
549 inner.clients.get_mut(&peer_id).ok_or_else(|| format_err!("Not connected to peer"))?;
550 client.watch_services_task = Some(task);
551
552 Ok(client.services.iter().map(|(_, svc)| svc.clone()).collect())
553 }
554
555 pub fn get_client_proxy(&self, id: PeerId) -> Option<ClientProxy> {
556 self.inner.read().clients.get(&id).map(|c| c.proxy.clone())
557 }
558
559 async fn central_event_task(inner: Arc<RwLock<InnerGattClientFacade>>) -> Result<(), Error> {
560 let tag = "GattClientFacade::central_event_task";
561
562 let stream = inner
563 .write()
564 .central
565 .as_ref()
566 .ok_or_else(|| format_err!("Central not set"))?
567 .proxy
568 .take_event_stream();
569
570 stream.map(|_| ()).collect::<()>().await;
571
572 info!(tag = &with_line!(tag); "Central closed");
573 inner.write().central.take();
574 return Ok(());
575 }
576
577 pub fn set_central_proxy(inner: Arc<RwLock<InnerGattClientFacade>>) {
580 if inner.read().central.is_some() {
581 return;
582 }
583 let proxy = app::client::connect_to_protocol::<CentralMarker>()
584 .context("Failed to connect to BLE Central service.")
585 .unwrap();
586 let event_task = fasync::Task::spawn(GattClientFacade::central_event_task(inner.clone()));
587 inner.write().central = Some(Central { proxy, _event_task: event_task });
588 }
589
590 async fn connection_event_task(
591 inner: Arc<RwLock<InnerGattClientFacade>>,
592 mut connection_stream: ConnectionEventStream,
593 mut client_stream: ClientEventStream,
594 peer_id: PeerId,
595 ) {
596 let tag = "GattClientFacade::connection_event_task";
597 select! {
598 _ = connection_stream.next() => info!(tag = &with_line!(tag) ; "Connection to {} closed", peer_id),
599 _ = client_stream.next() => info!(tag = &with_line!(tag); "Client for {} closed", peer_id),
600 }
601 inner.write().clients.remove(&peer_id);
602 }
603
604 pub async fn connect_peripheral(&self, id: String) -> Result<(), Error> {
605 let tag = "GattClientFacade::connect_peripheral";
606 let peer_id = PeerId::from_str(&id)?;
607
608 if self.inner.read().clients.contains_key(&peer_id) {
609 info!(tag = &with_line!(tag); "Already connected to {}", peer_id);
610 return Ok(());
611 }
612
613 GattClientFacade::set_central_proxy(self.inner.clone());
614
615 let (conn_proxy, conn_server_end) = fidl::endpoints::create_proxy();
616 let options = ConnectionOptions { bondable_mode: Some(true), ..Default::default() };
617 self.inner
618 .read()
619 .central
620 .as_ref()
621 .unwrap()
622 .proxy
623 .connect(&peer_id.clone().into(), &options, conn_server_end)
624 .map_err(|_| format_err!("FIDL error when trying to connect()"))?;
625
626 let (client_proxy, client_server_end) = fidl::endpoints::create_proxy();
627 conn_proxy.request_gatt_client(client_server_end)?;
628
629 let events_task = fasync::Task::spawn(GattClientFacade::connection_event_task(
630 self.inner.clone(),
631 conn_proxy.take_event_stream(),
632 client_proxy.take_event_stream(),
633 peer_id.clone(),
634 ));
635
636 self.inner.write().clients.insert(
637 peer_id,
638 Client {
639 proxy: client_proxy,
640 _connection: conn_proxy,
641 services: HashMap::new(),
642 watch_services_task: None,
643 _events_task: events_task,
644 },
645 );
646
647 Ok(())
648 }
649
650 pub async fn disconnect_peripheral(&self, id: String) -> Result<(), Error> {
651 let peer_id = PeerId::from_str(&id)?;
652 self.inner.write().clients.remove(&peer_id);
653 Ok(())
654 }
655
656 pub fn get_central_proxy(&self) -> Option<CentralProxy> {
658 self.inner.read().central.as_ref().map(|c| c.proxy.clone())
659 }
660
661 fn get_remote_service_proxy(&self) -> Option<RemoteServiceProxy> {
662 self.inner.read().active_remote_service.as_ref().map(|s| s.proxy.clone())
663 }
664
665 pub fn get_scan_responses(&self) -> Vec<BleScanResponse> {
667 const EMPTY_DEVICE: &str = "";
668 let mut devices = Vec::new();
669 for (peer_id, peer) in &self.inner.read().scan_results {
670 let id = format!("{}", peer_id);
671 let name = peer.name.clone().unwrap_or_else(|| EMPTY_DEVICE.to_string());
672 let connectable = peer.connectable;
673 devices.push(BleScanResponse::new(id, name, connectable));
674 }
675 devices
676 }
677
678 pub fn print(&self) {
679 let tag = "GattClientFacade::print";
680 let inner = self.inner.read();
681 info!(
682 tag = &with_line!(tag);
683 "Central: {:?}, Active Service: {:?}, Scan Results: {:?}, Clients: {:?}",
684 inner.central,
685 inner.active_remote_service,
686 inner.scan_results,
687 inner.clients,
688 );
689 }
690
691 pub fn cleanup(&self) {
692 let mut inner = self.inner.write();
693 inner.active_remote_service = None;
694 inner.central = None;
695 inner.scan_results.clear();
696 inner.clients.clear();
697 inner.scan_task = None;
698 }
699}