1use anyhow::{Error, format_err};
6use bt_rfcomm::ServerChannel;
7use bt_rfcomm::profile::server_channel_from_protocol;
8use derivative::Derivative;
9use fidl_fuchsia_bluetooth as fidl_bt;
10use fidl_fuchsia_bluetooth_bredr as bredr;
11use fidl_fuchsia_bluetooth_rfcomm_test as rfcomm;
12use fuchsia_async as fasync;
13use fuchsia_bluetooth::profile::ProtocolDescriptor;
14use fuchsia_bluetooth::types::{Channel, PeerId, Uuid};
15use fuchsia_sync::Mutex;
16use futures::channel::mpsc;
17use futures::{SinkExt, StreamExt, select};
18use log::{info, warn};
19use profile_client::{ProfileClient, ProfileEvent};
20use std::cell::Cell;
21use std::collections::HashMap;
22use std::sync::Arc;
23
24const USER_DATA_BUFFER_SIZE: usize = 50;
28
29fn spp_service_definition() -> bredr::ServiceDefinition {
31 bredr::ServiceDefinition {
32 service_class_uuids: Some(vec![
33 Uuid::new16(bredr::ServiceClassProfileIdentifier::SerialPort.into_primitive()).into(),
34 ]),
35 protocol_descriptor_list: Some(vec![
36 bredr::ProtocolDescriptor {
37 protocol: Some(bredr::ProtocolIdentifier::L2Cap),
38 params: Some(vec![]),
39 ..Default::default()
40 },
41 bredr::ProtocolDescriptor {
42 protocol: Some(bredr::ProtocolIdentifier::Rfcomm),
43 params: Some(vec![]),
44 ..Default::default()
45 },
46 ]),
47 profile_descriptors: Some(vec![bredr::ProfileDescriptor {
48 profile_id: Some(bredr::ServiceClassProfileIdentifier::SerialPort),
49 major_version: Some(1),
50 minor_version: Some(2),
51 ..Default::default()
52 }]),
53 ..Default::default()
54 }
55}
56
57#[derive(Debug)]
59pub struct RfcommSession {
60 _id: PeerId,
62 active_channels: HashMap<ServerChannel, mpsc::Sender<Vec<u8>>>,
64}
65
66impl RfcommSession {
67 fn new(id: PeerId) -> Self {
68 Self { _id: id, active_channels: HashMap::new() }
69 }
70
71 fn is_active(&self, server_channel: &ServerChannel) -> bool {
72 self.active_channels.get(server_channel).is_some_and(|s| !s.is_closed())
73 }
74
75 fn close_rfcomm_channel(&mut self, server_channel: &ServerChannel) -> bool {
76 self.active_channels.remove(server_channel).is_some()
77 }
78
79 fn new_rfcomm_channel(&mut self, server_channel: ServerChannel, channel: Channel) {
80 if self.is_active(&server_channel) {
81 info!("Overwriting existing RFCOMM channel: {:?}", server_channel);
82 }
83
84 let (sender, receiver) = mpsc::channel(USER_DATA_BUFFER_SIZE);
85 fasync::Task::spawn(Self::rfcomm_channel_task(server_channel, channel, receiver)).detach();
86 let _ = self.active_channels.insert(server_channel, sender);
87 }
88
89 async fn rfcomm_channel_task(
92 server_channel: ServerChannel,
93 mut channel: Channel,
94 mut write_requests: mpsc::Receiver<Vec<u8>>,
95 ) {
96 info!("Starting processing task for RFCOMM channel: {:?}", server_channel);
97 loop {
98 select! {
99 bytes_from_peer = channel.next() => {
103 let user_data = match bytes_from_peer {
104 Some(Ok(bytes)) => bytes,
105 Some(Err(e)) => {
106 info!("Error receiving data: {:?}", e);
107 continue;
108 }
109 None => {
110 info!("Peer closed RFCOMM channel {:?}", server_channel);
112 break;
113 }
114 };
115 info!("{:?}: Received user data from peer: {:?}", server_channel, user_data);
116 }
117 bytes_to_peer = write_requests.next() => {
118 match bytes_to_peer {
119 Some(bytes) => {
120 match channel.send(bytes).await {
121 Ok(()) => info!("Sent user data over RFCOMM channel ({:?}).", server_channel),
122 Err(e) => info!("Couldn't send user data for channel ({:?}): {:?}", server_channel, e),
123 }
124 }
125 None => break, }
127 }
128 complete => break,
129 }
130 }
131 info!("RFCOMM channel ({:?}) task ended", server_channel);
132 }
133
134 fn send_user_data(
137 &mut self,
138 server_channel: ServerChannel,
139 user_data: Vec<u8>,
140 ) -> Result<(), Error> {
141 if let Some(sender) = self.active_channels.get_mut(&server_channel) {
142 sender.try_send(user_data).map_err(|e| format_err!("{:?}", e))
143 } else {
144 Err(format_err!("No registered server channel"))
145 }
146 }
147}
148
149#[derive(Derivative, Default)]
150#[derivative(Debug)]
151pub struct RfcommState {
152 #[derivative(Debug = "ignore")]
154 service: Option<fasync::Task<()>>,
155 active_sessions: HashMap<PeerId, RfcommSession>,
157}
158
159impl RfcommState {
160 fn new() -> Self {
161 Self { service: None, active_sessions: HashMap::new() }
162 }
163
164 fn get_active_session(&mut self, id: &PeerId) -> Option<&mut RfcommSession> {
165 match self.active_sessions.get_mut(id) {
166 None => {
167 info!("No active RFCOMM session with peer {}", id);
168 None
169 }
170 session => session,
171 }
172 }
173
174 fn clear_services(&mut self) {
175 if let Some(old_task) = self.service.take() {
176 info!("Clearing SPP service advertisement/search");
177 let _ = old_task.abort();
178 }
179 self.active_sessions.clear();
180 }
181
182 fn new_rfcomm_channel(&mut self, id: PeerId, server_channel: ServerChannel, channel: Channel) {
183 let _ = self
184 .active_sessions
185 .entry(id)
186 .or_insert_with(|| RfcommSession::new(id))
187 .new_rfcomm_channel(server_channel, channel);
188 }
189}
190
191#[derive(Derivative, Default)]
192#[derivative(Debug)]
193pub struct RfcommManager {
194 #[derivative(Debug = "ignore")]
195 profile: Cell<Option<bredr::ProfileProxy>>,
196 #[derivative(Debug = "ignore")]
197 rfcomm: Cell<Option<rfcomm::RfcommTestProxy>>,
198 inner: Arc<Mutex<RfcommState>>,
199}
200
201impl Clone for RfcommManager {
202 fn clone(&self) -> Self {
203 let profile = self.profile.take();
204 if let Some(p) = profile.as_ref() {
205 self.profile.set(Some(p.clone()));
206 }
207 let rfcomm = self.rfcomm.take();
208 if let Some(rf) = rfcomm.as_ref() {
209 self.rfcomm.set(Some(rf.clone()));
210 }
211 Self { profile: Cell::new(profile), rfcomm: Cell::new(rfcomm), inner: self.inner.clone() }
212 }
213}
214
215impl RfcommManager {
216 pub fn new() -> Result<Self, Error> {
217 Ok(Self::default())
218 }
219
220 pub fn from_proxy(profile: bredr::ProfileProxy, rfcomm: rfcomm::RfcommTestProxy) -> Self {
221 Self {
222 profile: Cell::new(Some(profile)),
223 rfcomm: Cell::new(Some(rfcomm)),
224 inner: Arc::new(Mutex::new(RfcommState::new())),
225 }
226 }
227
228 pub fn clear_services(&self) {
229 self.inner.lock().clear_services();
230 }
231
232 fn get_profile_proxy(&self) -> Result<bredr::ProfileProxy, Error> {
233 let proxy = match self.profile.take() {
234 Some(proxy) => proxy,
235 None => fuchsia_component::client::connect_to_protocol::<bredr::ProfileMarker>()?,
236 };
237 self.profile.set(Some(proxy.clone()));
238 Ok(proxy)
239 }
240
241 fn get_rfcomm_test_proxy(&self) -> Result<rfcomm::RfcommTestProxy, Error> {
242 let proxy = match self.rfcomm.take() {
243 Some(proxy) => proxy,
244 None => fuchsia_component::client::connect_to_protocol::<rfcomm::RfcommTestMarker>()?,
245 };
246 self.rfcomm.set(Some(proxy.clone()));
247 Ok(proxy)
248 }
249
250 pub fn advertise(&self) -> Result<(), Error> {
253 self.clear_services();
256
257 let profile_proxy = self.get_profile_proxy()?;
258 let inner_clone = self.inner.clone();
259 let mut inner = self.inner.lock();
260
261 let spp_service = vec![spp_service_definition()];
263 let mut client = ProfileClient::advertise(
264 profile_proxy,
265 spp_service,
266 fidl_bt::ChannelParameters::default(),
267 )?;
268 let _ = client.add_search(bredr::ServiceClassProfileIdentifier::SerialPort, None)?;
269 let service_task = fasync::Task::spawn(async move {
270 let result = Self::handle_profile_events(client, inner_clone).await;
271 info!("Profile event handler ended: {:?}", result);
272 });
273 inner.service = Some(service_task);
274 info!("Advertising and searching for SPP services");
275 Ok(())
276 }
277
278 async fn handle_profile_events(
280 mut client: ProfileClient,
281 state: Arc<Mutex<RfcommState>>,
282 ) -> Result<(), Error> {
283 while let Some(request) = client.next().await {
284 match request {
285 Ok(ProfileEvent::PeerConnected { id, protocol, channel, .. }) => {
286 let protocol = protocol
288 .iter()
289 .map(|p| ProtocolDescriptor::try_from(p))
290 .collect::<Result<Vec<_>, _>>()?;
291 let server_channel = server_channel_from_protocol(&protocol)
292 .ok_or_else(|| format_err!("Not RFCOMM protocol"))?;
293
294 state.lock().new_rfcomm_channel(id, server_channel, channel);
296 info!("Peer {} established RFCOMM Channel ({:?}) ", id, server_channel);
297 }
298 Ok(ProfileEvent::SearchResult { id, protocol, .. }) => {
299 let protocol = protocol
301 .expect("Protocol should exist")
302 .iter()
303 .map(|p| ProtocolDescriptor::try_from(p))
304 .collect::<Result<Vec<_>, _>>()?;
305 let server_channel = server_channel_from_protocol(&protocol)
306 .ok_or_else(|| format_err!("Not RFCOMM protocol"))?;
307 info!("Found SPP service for {} with server channel: {:?}", id, server_channel);
308 }
309 Err(e) => warn!("Error in ProfileClient results: {:?}", e),
310 }
311 }
312 Ok(())
313 }
314
315 pub fn close_session(&self, id: PeerId) -> Result<(), Error> {
317 let _ = self
319 .get_rfcomm_test_proxy()?
320 .disconnect(&id.into())
321 .map_err::<fidl::Error, _>(Into::into)?;
322
323 let mut inner = self.inner.lock();
324 if let Some(session) = inner.active_sessions.remove(&id) {
325 drop(session);
326 }
327 Ok(())
328 }
329
330 pub fn close_rfcomm_channel(
332 &self,
333 id: PeerId,
334 server_channel: ServerChannel,
335 ) -> Result<(), Error> {
336 let mut inner = self.inner.lock();
337 if let Some(session) = inner.get_active_session(&id) {
338 let _ = session.close_rfcomm_channel(&server_channel);
339 Ok(())
340 } else {
341 Err(format_err!("No RFCOMM session with peer: {:?}", id))
342 }
343 }
344
345 pub async fn outgoing_rfcomm_channel(
347 &self,
348 id: PeerId,
349 server_channel: ServerChannel,
350 ) -> Result<(), Error> {
351 let channel = self
352 .get_profile_proxy()?
353 .connect(
354 &id.into(),
355 &bredr::ConnectParameters::Rfcomm(bredr::RfcommParameters {
356 channel: Some(server_channel.into()),
357 ..Default::default()
358 }),
359 )
360 .await?
361 .map_err(|e| format_err!("{:?}", e))?;
362 let channel = Channel::try_from(channel).expect("valid channel");
363
364 self.inner.lock().new_rfcomm_channel(id, server_channel, channel);
365 Ok(())
366 }
367
368 pub fn send_rls(&self, id: PeerId, server_channel: ServerChannel) -> Result<(), Error> {
371 let rfcomm_test_proxy = self.get_rfcomm_test_proxy()?;
372 let mut inner = self.inner.lock();
373 if inner.get_active_session(&id).is_some() {
374 let status = rfcomm::Status::FramingError;
376 let _ = rfcomm_test_proxy
377 .remote_line_status(&id.into(), server_channel.into(), status)
378 .map_err::<fidl::Error, _>(Into::into)?;
379 Ok(())
380 } else {
381 Err(format_err!("No RFCOMM session with peer: {:?}", id))
382 }
383 }
384
385 pub fn send_user_data(
388 &self,
389 id: PeerId,
390 server_channel: ServerChannel,
391 data: Vec<u8>,
392 ) -> Result<(), Error> {
393 let mut inner = self.inner.lock();
394 if let Some(session) = inner.get_active_session(&id) {
395 session.send_user_data(server_channel, data)
396 } else {
397 Err(format_err!("No RFCOMM session with peer: {:?}", id))
398 }
399 }
400}
401
402#[cfg(test)]
403mod tests {
404 use super::*;
405 use assert_matches::assert_matches;
406 use async_utils::PollExt;
407 use bt_rfcomm::profile::build_rfcomm_protocol;
408 use fidl::endpoints::Proxy;
409 use fidl_fuchsia_bluetooth::ErrorCode;
410 use fidl_fuchsia_bluetooth_bredr::{ProfileMarker, ProfileRequestStream};
411 use fidl_fuchsia_bluetooth_rfcomm_test::{RfcommTestMarker, RfcommTestRequestStream};
412 use fixture::fixture;
413
414 type TestFixture = (RfcommManager, ProfileRequestStream, RfcommTestRequestStream);
415
416 async fn setup_rfcomm_mgr<F, Fut>(_name: &str, test: F)
417 where
418 F: FnOnce(TestFixture) -> Fut,
419 Fut: futures::Future<Output = ()>,
420 {
421 let (profile, profile_server) = fidl::endpoints::create_proxy_and_stream::<ProfileMarker>();
422 let (rfcomm_test, rfcomm_test_server) =
423 fidl::endpoints::create_proxy_and_stream::<RfcommTestMarker>();
424
425 let rfcomm_mgr = RfcommManager::from_proxy(profile, rfcomm_test);
426 test((rfcomm_mgr, profile_server, rfcomm_test_server)).await
427 }
428
429 async fn expect_data(remote: &mut Channel, expected_data: Vec<u8>) {
430 let read_result = remote.next().await.expect("data").expect("okay");
431 assert_eq!(read_result, expected_data);
432 }
433
434 async fn expect_advertisement_and_search(
435 profile: &mut ProfileRequestStream,
436 ) -> (
437 bredr::SearchResultsProxy,
438 (bredr::ConnectionReceiverProxy, bredr::ProfileAdvertiseResponder),
439 ) {
440 let mut search_request = None;
441 let mut advertisement = None;
442 while let Some(req) = profile.next().await {
443 match req {
444 Ok(bredr::ProfileRequest::Advertise { payload, responder, .. }) => {
445 let connect_proxy = payload.receiver.unwrap().into_proxy();
446 advertisement = Some((connect_proxy, responder));
447 }
448 Ok(bredr::ProfileRequest::Search { payload, .. }) => {
449 search_request = Some(payload.results.unwrap().into_proxy())
450 }
451 x => panic!("Expected one Advertise and Search but got: {:?}", x),
452 }
453 if search_request.is_some() && advertisement.is_some() {
454 break;
455 }
456 }
457 (search_request.expect("just set"), advertisement.expect("just set"))
458 }
459
460 #[fixture(setup_rfcomm_mgr)]
461 #[fuchsia::test]
462 async fn initiate_rfcomm_channel_to_peer_is_ok(
463 (rfcomm_mgr, mut profile_server, mut rfcomm_test_server): TestFixture,
464 ) {
465 let _profile_requests = {
467 assert_matches!(rfcomm_mgr.advertise(), Ok(_));
468 expect_advertisement_and_search(&mut profile_server).await
469 };
470
471 let remote_id = PeerId(123);
473 let random_channel_number = ServerChannel::try_from(5).unwrap();
474 let mut peer_channel = {
475 let ch_fut =
476 Box::pin(rfcomm_mgr.outgoing_rfcomm_channel(remote_id, random_channel_number));
477
478 let profile_fut = async {
479 match profile_server.next().await {
480 Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => {
481 let (left, right) = Channel::create();
482 let _ = responder
483 .send(left.try_into().map_err(|_e| ErrorCode::Failed))
484 .unwrap();
485 right
486 }
487 x => panic!("Expected connect request, got: {:?}", x),
488 }
489 };
490
491 match futures::future::join(ch_fut, profile_fut).await {
492 (Ok(_), channel) => channel,
493 x => panic!("Expected both futures to complete: {:?}", x),
494 }
495 };
496
497 let user_data = vec![0x98, 0x97, 0x96, 0x95];
499 {
500 assert_matches!(
501 rfcomm_mgr.send_user_data(remote_id, random_channel_number, user_data.clone()),
502 Ok(_)
503 );
504 expect_data(&mut peer_channel, user_data).await;
505 }
506
507 let buf = vec![0x99, 0x11, 0x44];
509 assert_matches!(peer_channel.send(buf).await, Ok(()));
510
511 assert_matches!(rfcomm_mgr.send_rls(remote_id, random_channel_number), Ok(_));
513 match rfcomm_test_server.next().await.expect("valid fidl request") {
514 Ok(rfcomm::RfcommTestRequest::RemoteLineStatus { id, channel_number, .. }) => {
515 assert_eq!(id, remote_id.into());
516 assert_eq!(channel_number, u8::from(random_channel_number));
517 }
518 x => panic!("Expected RLS request but got: {:?}", x),
519 }
520 }
521
522 #[fixture(setup_rfcomm_mgr)]
523 #[fuchsia::test]
524 async fn peer_initiating_rfcomm_channel_is_delivered(
525 (rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
526 ) {
527 let (_search_proxy, (connect_proxy, _adv_fut)) = {
529 assert_matches!(rfcomm_mgr.advertise(), Ok(_));
530 expect_advertisement_and_search(&mut profile_server).await
531 };
532
533 let remote_id = PeerId(8978);
535 let random_channel_number = ServerChannel::try_from(7).unwrap();
536 let (_peer_channel, local_channel) = Channel::create();
537 let protocol: Vec<bredr::ProtocolDescriptor> =
538 build_rfcomm_protocol(random_channel_number).iter().map(Into::into).collect();
539 assert_matches!(
540 connect_proxy.connected(
541 &remote_id.into(),
542 local_channel.try_into().unwrap(),
543 &protocol,
544 ),
545 Ok(_)
546 );
547 }
548
549 #[fixture(setup_rfcomm_mgr)]
550 #[fuchsia::test]
551 async fn disconnect_session_received_by_rfcomm_test(
552 (rfcomm_mgr, mut profile_server, mut rfcomm_test_server): TestFixture,
553 ) {
554 let _profile_requests = {
556 assert_matches!(rfcomm_mgr.advertise(), Ok(_));
557 expect_advertisement_and_search(&mut profile_server).await
558 };
559
560 let remote = PeerId(834);
563 assert_matches!(rfcomm_mgr.close_session(remote), Ok(_));
564
565 match rfcomm_test_server.next().await.expect("valid fidl request") {
566 Ok(rfcomm::RfcommTestRequest::Disconnect { id, .. }) if id == remote.into() => {}
567 x => panic!("Expected Disconnect request but got: {:?}", x),
568 }
569 }
570
571 #[fixture(setup_rfcomm_mgr)]
572 #[fuchsia::test]
573 async fn rls_update_before_established_channel_is_error(
574 (rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
575 ) {
576 let _profile_requests = {
578 assert_matches!(rfcomm_mgr.advertise(), Ok(_));
579 expect_advertisement_and_search(&mut profile_server).await
580 };
581
582 let remote = PeerId(222);
585 let random_channel_number = ServerChannel::try_from(9).unwrap();
586 assert_matches!(rfcomm_mgr.send_rls(remote, random_channel_number), Err(_));
587 }
588
589 #[fixture(setup_rfcomm_mgr)]
590 #[fuchsia::test]
591 async fn clear_services_unregisters_profile_requests(
592 (rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
593 ) {
594 let (search_proxy, (connect_proxy, _advertise_fut)) = {
596 assert_matches!(rfcomm_mgr.advertise(), Ok(_));
597 expect_advertisement_and_search(&mut profile_server).await
598 };
599 assert!(!search_proxy.is_closed());
600 assert!(!connect_proxy.is_closed());
601
602 rfcomm_mgr.clear_services();
609
610 let _profile = {
612 assert_matches!(rfcomm_mgr.advertise(), Ok(_));
613 expect_advertisement_and_search(&mut profile_server).await
614 };
615 }
616
617 #[fuchsia::test]
618 async fn rfcomm_session_task() {
619 let id = PeerId(999);
620 let mut session = RfcommSession::new(id);
621
622 let random_channel_number = ServerChannel::try_from(4).unwrap();
623 let (local, mut remote) = Channel::create();
624 session.new_rfcomm_channel(random_channel_number, local);
625
626 assert!(session.is_active(&random_channel_number));
627
628 let data = vec![0x00, 0x02, 0x04, 0x06, 0x08, 0x10];
629 let unregistered = ServerChannel::try_from(9).unwrap();
630 assert_matches!(session.send_user_data(unregistered, data.clone()), Err(_));
632 assert_matches!(session.send_user_data(random_channel_number, data.clone()), Ok(_));
634
635 expect_data(&mut remote, data).await;
637
638 let data1 = vec![0x09];
640 let data2 = vec![0x11];
641 assert_matches!(session.send_user_data(random_channel_number, data1.clone()), Ok(_));
642 assert_matches!(session.send_user_data(random_channel_number, data2.clone()), Ok(_));
643 expect_data(&mut remote, data1).await;
644 expect_data(&mut remote, data2).await;
645
646 assert!(session.close_rfcomm_channel(&random_channel_number));
648 assert_matches!(remote.closed().await, Ok(_));
649
650 assert!(!session.close_rfcomm_channel(&random_channel_number));
652 }
653
654 #[fuchsia::test]
655 async fn second_channel_overwrites_first_in_rfcomm_session() {
656 let id = PeerId(78);
657 let mut session = RfcommSession::new(id);
658
659 let random_channel_number = ServerChannel::try_from(10).unwrap();
660 let (local1, remote1) = Channel::create();
661 session.new_rfcomm_channel(random_channel_number, local1);
662 assert!(session.is_active(&random_channel_number));
663
664 let (local2, mut remote2) = Channel::create();
666 session.new_rfcomm_channel(random_channel_number, local2);
667 assert!(session.is_active(&random_channel_number));
668
669 assert_matches!(remote1.closed().await, Ok(_));
670
671 let data = vec![0x00, 0x02, 0x04, 0x06, 0x08, 0x10];
672 assert_matches!(session.send_user_data(random_channel_number, data.clone()), Ok(_));
674 expect_data(&mut remote2, data).await;
675 }
676
677 #[fuchsia::test]
678 fn closing_sender_closes_rfcomm_channel_task() {
679 let mut exec = fasync::TestExecutor::new();
680
681 let random_channel_number = ServerChannel::try_from(10).unwrap();
682 let (local, _remote) = Channel::create();
683 let (_sender, receiver) = mpsc::channel(0);
684
685 let mut channel_task =
686 Box::pin(RfcommSession::rfcomm_channel_task(random_channel_number, local, receiver));
687
688 exec.run_until_stalled(&mut channel_task).expect_pending("sender still active");
689
690 drop(_sender);
691 let _ = exec.run_until_stalled(&mut channel_task).expect("task should complete");
692 }
693
694 #[fuchsia::test]
695 fn closing_channel_closes_rfcomm_channel_task() {
696 let mut exec = fasync::TestExecutor::new();
697
698 let random_channel_number = ServerChannel::try_from(10).unwrap();
699 let (local, _remote) = Channel::create();
700 let (_sender, receiver) = mpsc::channel(0);
701
702 let mut channel_task =
703 Box::pin(RfcommSession::rfcomm_channel_task(random_channel_number, local, receiver));
704
705 exec.run_until_stalled(&mut channel_task).expect_pending("sender still active");
706
707 drop(_remote);
708 let _ = exec.run_until_stalled(&mut channel_task).expect("task should complete");
709 }
710}