test_rfcomm_client/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{format_err, Error};
6use bt_rfcomm::profile::server_channel_from_protocol;
7use bt_rfcomm::ServerChannel;
8use derivative::Derivative;
9use fuchsia_bluetooth::profile::ProtocolDescriptor;
10use fuchsia_bluetooth::types::{Channel, PeerId, Uuid};
11use fuchsia_sync::Mutex;
12use futures::channel::mpsc;
13use futures::{select, StreamExt};
14use log::{info, warn};
15use profile_client::{ProfileClient, ProfileEvent};
16use std::cell::Cell;
17use std::collections::HashMap;
18use std::sync::Arc;
19use {
20    fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
21    fidl_fuchsia_bluetooth_rfcomm_test as rfcomm, fuchsia_async as fasync,
22};
23
24/// The default buffer size for the mpsc channels used to relay user data packets to be sent to the
25/// remote peer.
26/// This value is arbitrarily chosen and should be enough to queue multiple buffers to be sent.
27const USER_DATA_BUFFER_SIZE: usize = 50;
28
29/// Valid SPP Service Definition - see SPP v1.2 Table 6.1.
30fn spp_service_definition() -> bredr::ServiceDefinition {
31    bredr::ServiceDefinition {
32        service_class_uuids: Some(vec![Uuid::new16(
33            bredr::ServiceClassProfileIdentifier::SerialPort.into_primitive(),
34        )
35        .into()]),
36        protocol_descriptor_list: Some(vec![
37            bredr::ProtocolDescriptor {
38                protocol: Some(bredr::ProtocolIdentifier::L2Cap),
39                params: Some(vec![]),
40                ..Default::default()
41            },
42            bredr::ProtocolDescriptor {
43                protocol: Some(bredr::ProtocolIdentifier::Rfcomm),
44                params: Some(vec![]),
45                ..Default::default()
46            },
47        ]),
48        profile_descriptors: Some(vec![bredr::ProfileDescriptor {
49            profile_id: Some(bredr::ServiceClassProfileIdentifier::SerialPort),
50            major_version: Some(1),
51            minor_version: Some(2),
52            ..Default::default()
53        }]),
54        ..Default::default()
55    }
56}
57
58/// Manages the set of active RFCOMM channels connected to a single remote peer.
59#[derive(Debug)]
60pub struct RfcommSession {
61    /// Unique id assigned to the remote peer.
62    _id: PeerId,
63    /// The set of active RFCOMM channels.
64    active_channels: HashMap<ServerChannel, mpsc::Sender<Vec<u8>>>,
65}
66
67impl RfcommSession {
68    fn new(id: PeerId) -> Self {
69        Self { _id: id, active_channels: HashMap::new() }
70    }
71
72    fn is_active(&self, server_channel: &ServerChannel) -> bool {
73        self.active_channels.get(server_channel).is_some_and(|s| !s.is_closed())
74    }
75
76    fn close_rfcomm_channel(&mut self, server_channel: &ServerChannel) -> bool {
77        self.active_channels.remove(server_channel).is_some()
78    }
79
80    fn new_rfcomm_channel(&mut self, server_channel: ServerChannel, channel: Channel) {
81        if self.is_active(&server_channel) {
82            info!("Overwriting existing RFCOMM channel: {:?}", server_channel);
83        }
84
85        let (sender, receiver) = mpsc::channel(USER_DATA_BUFFER_SIZE);
86        fasync::Task::spawn(Self::rfcomm_channel_task(server_channel, channel, receiver)).detach();
87        let _ = self.active_channels.insert(server_channel, sender);
88    }
89
90    /// Processes data received from the remote peer over the provided RFCOMM `channel`.
91    /// Processes data in the `write_requests` queue to be sent to the remote peer.
92    async fn rfcomm_channel_task(
93        server_channel: ServerChannel,
94        mut channel: Channel,
95        mut write_requests: mpsc::Receiver<Vec<u8>>,
96    ) {
97        info!("Starting processing task for RFCOMM channel: {:?}", server_channel);
98        loop {
99            select! {
100                // The `fuse()` call is in the loop because `channel` is both borrowed as a stream
101                // and used to send data. It is safe because once `channel` is closed, the loop will
102                // break and `channel.next()` will never be polled thereafter.
103                bytes_from_peer = channel.next() => {
104                    let user_data = match bytes_from_peer {
105                        Some(Ok(bytes)) => bytes,
106                        Some(Err(e)) => {
107                            info!("Error receiving data: {:?}", e);
108                            continue;
109                        }
110                        None => {
111                            // RFCOMM channel closed by the peer.
112                            info!("Peer closed RFCOMM channel {:?}", server_channel);
113                            break;
114                        }
115                    };
116                    info!("{:?}: Received user data from peer: {:?}", server_channel, user_data);
117                }
118                bytes_to_peer = write_requests.next() => {
119                    match bytes_to_peer {
120                        Some(bytes) => {
121                            match channel.write(&bytes) {
122                                Ok(_) => info!("Sent user data over RFCOMM channel ({:?}).", server_channel),
123                                Err(e) => info!("Couldn't send user data for channel ({:?}): {:?}", server_channel, e),
124                            }
125                        }
126                        None => break, // RFCOMM channel closed by us.
127                    }
128                }
129                complete => break,
130            }
131        }
132        info!("RFCOMM channel ({:?}) task ended", server_channel);
133    }
134
135    /// Sends the `user_data` buf to the peer that provides the service identified by the
136    /// `server_channel`. Returns the result of the send operation.
137    fn send_user_data(
138        &mut self,
139        server_channel: ServerChannel,
140        user_data: Vec<u8>,
141    ) -> Result<(), Error> {
142        if let Some(sender) = self.active_channels.get_mut(&server_channel) {
143            sender.try_send(user_data).map_err(|e| format_err!("{:?}", e))
144        } else {
145            Err(format_err!("No registered server channel"))
146        }
147    }
148}
149
150#[derive(Derivative, Default)]
151#[derivative(Debug)]
152pub struct RfcommState {
153    /// A task representing the RFCOMM service advertisement and search.
154    #[derivative(Debug = "ignore")]
155    service: Option<fasync::Task<()>>,
156    /// The set of active RFCOMM Sessions with remote peers.
157    active_sessions: HashMap<PeerId, RfcommSession>,
158}
159
160impl RfcommState {
161    fn new() -> Self {
162        Self { service: None, active_sessions: HashMap::new() }
163    }
164
165    fn get_active_session(&mut self, id: &PeerId) -> Option<&mut RfcommSession> {
166        match self.active_sessions.get_mut(id) {
167            None => {
168                info!("No active RFCOMM session with peer {}", id);
169                None
170            }
171            session => session,
172        }
173    }
174
175    fn clear_services(&mut self) {
176        if let Some(old_task) = self.service.take() {
177            info!("Clearing SPP service advertisement/search");
178            let _ = old_task.cancel();
179        }
180        self.active_sessions.clear();
181    }
182
183    fn new_rfcomm_channel(&mut self, id: PeerId, server_channel: ServerChannel, channel: Channel) {
184        let _ = self
185            .active_sessions
186            .entry(id)
187            .or_insert_with(|| RfcommSession::new(id))
188            .new_rfcomm_channel(server_channel, channel);
189    }
190}
191
192#[derive(Derivative, Default)]
193#[derivative(Debug)]
194pub struct RfcommManager {
195    #[derivative(Debug = "ignore")]
196    profile: Cell<Option<bredr::ProfileProxy>>,
197    #[derivative(Debug = "ignore")]
198    rfcomm: Cell<Option<rfcomm::RfcommTestProxy>>,
199    inner: Arc<Mutex<RfcommState>>,
200}
201
202impl Clone for RfcommManager {
203    fn clone(&self) -> Self {
204        let profile = self.profile.take();
205        if let Some(p) = profile.as_ref() {
206            self.profile.set(Some(p.clone()));
207        }
208        let rfcomm = self.rfcomm.take();
209        if let Some(rf) = rfcomm.as_ref() {
210            self.rfcomm.set(Some(rf.clone()));
211        }
212        Self { profile: Cell::new(profile), rfcomm: Cell::new(rfcomm), inner: self.inner.clone() }
213    }
214}
215
216impl RfcommManager {
217    pub fn new() -> Result<Self, Error> {
218        Ok(Self::default())
219    }
220
221    pub fn from_proxy(profile: bredr::ProfileProxy, rfcomm: rfcomm::RfcommTestProxy) -> Self {
222        Self {
223            profile: Cell::new(Some(profile)),
224            rfcomm: Cell::new(Some(rfcomm)),
225            inner: Arc::new(Mutex::new(RfcommState::new())),
226        }
227    }
228
229    pub fn clear_services(&self) {
230        self.inner.lock().clear_services();
231    }
232
233    fn get_profile_proxy(&self) -> Result<bredr::ProfileProxy, Error> {
234        let proxy = match self.profile.take() {
235            Some(proxy) => proxy,
236            None => fuchsia_component::client::connect_to_protocol::<bredr::ProfileMarker>()?,
237        };
238        self.profile.set(Some(proxy.clone()));
239        Ok(proxy)
240    }
241
242    fn get_rfcomm_test_proxy(&self) -> Result<rfcomm::RfcommTestProxy, Error> {
243        let proxy = match self.rfcomm.take() {
244            Some(proxy) => proxy,
245            None => fuchsia_component::client::connect_to_protocol::<rfcomm::RfcommTestMarker>()?,
246        };
247        self.rfcomm.set(Some(proxy.clone()));
248        Ok(proxy)
249    }
250
251    /// Advertises an SPP service and searches for other compatible SPP clients. Overwrites any
252    /// existing service advertisement & search.
253    pub fn advertise(&self) -> Result<(), Error> {
254        // Existing service must be unregistered before we can advertise again - this is to prevent
255        // clashes in `bredr.Profile` server.
256        self.clear_services();
257
258        let profile_proxy = self.get_profile_proxy()?;
259        let inner_clone = self.inner.clone();
260        let mut inner = self.inner.lock();
261
262        // Add an SPP advertisement & search.
263        let spp_service = vec![spp_service_definition()];
264        let mut client = ProfileClient::advertise(
265            profile_proxy,
266            spp_service,
267            fidl_bt::ChannelParameters::default(),
268        )?;
269        let _ = client.add_search(bredr::ServiceClassProfileIdentifier::SerialPort, None)?;
270        let service_task = fasync::Task::spawn(async move {
271            let result = Self::handle_profile_events(client, inner_clone).await;
272            info!("Profile event handler ended: {:?}", result);
273        });
274        inner.service = Some(service_task);
275        info!("Advertising and searching for SPP services");
276        Ok(())
277    }
278
279    /// Processes events from the `bredr.Profile` `client`.
280    async fn handle_profile_events(
281        mut client: ProfileClient,
282        state: Arc<Mutex<RfcommState>>,
283    ) -> Result<(), Error> {
284        while let Some(request) = client.next().await {
285            match request {
286                Ok(ProfileEvent::PeerConnected { id, protocol, channel, .. }) => {
287                    // Received an incoming connection request for our advertised service.
288                    let protocol = protocol
289                        .iter()
290                        .map(|p| ProtocolDescriptor::try_from(p))
291                        .collect::<Result<Vec<_>, _>>()?;
292                    let server_channel = server_channel_from_protocol(&protocol)
293                        .ok_or_else(|| format_err!("Not RFCOMM protocol"))?;
294
295                    // Spawn a processing task to handle read & writes over this RFCOMM channel.
296                    state.lock().new_rfcomm_channel(id, server_channel, channel);
297                    info!("Peer {} established RFCOMM Channel ({:?}) ", id, server_channel);
298                }
299                Ok(ProfileEvent::SearchResult { id, protocol, .. }) => {
300                    // Discovered a remote peer's service.
301                    let protocol = protocol
302                        .expect("Protocol should exist")
303                        .iter()
304                        .map(|p| ProtocolDescriptor::try_from(p))
305                        .collect::<Result<Vec<_>, _>>()?;
306                    let server_channel = server_channel_from_protocol(&protocol)
307                        .ok_or_else(|| format_err!("Not RFCOMM protocol"))?;
308                    info!("Found SPP service for {} with server channel: {:?}", id, server_channel);
309                }
310                Err(e) => warn!("Error in ProfileClient results: {:?}", e),
311            }
312        }
313        Ok(())
314    }
315
316    /// Terminates the RFCOMM session with the remote peer `id`.
317    pub fn close_session(&self, id: PeerId) -> Result<(), Error> {
318        // Send the disconnect request via the `RfcommTest` API and clean up local state.
319        let _ = self
320            .get_rfcomm_test_proxy()?
321            .disconnect(&id.into())
322            .map_err::<fidl::Error, _>(Into::into)?;
323
324        let mut inner = self.inner.lock();
325        if let Some(session) = inner.active_sessions.remove(&id) {
326            drop(session);
327        }
328        Ok(())
329    }
330
331    /// Closes the RFCOMM channel with the remote peer.
332    pub fn close_rfcomm_channel(
333        &self,
334        id: PeerId,
335        server_channel: ServerChannel,
336    ) -> Result<(), Error> {
337        let mut inner = self.inner.lock();
338        if let Some(session) = inner.get_active_session(&id) {
339            let _ = session.close_rfcomm_channel(&server_channel);
340            Ok(())
341        } else {
342            Err(format_err!("No RFCOMM session with peer: {:?}", id))
343        }
344    }
345
346    /// Makes an outgoing RFCOMM channel to the remote peer.
347    pub async fn outgoing_rfcomm_channel(
348        &self,
349        id: PeerId,
350        server_channel: ServerChannel,
351    ) -> Result<(), Error> {
352        let channel = self
353            .get_profile_proxy()?
354            .connect(
355                &id.into(),
356                &bredr::ConnectParameters::Rfcomm(bredr::RfcommParameters {
357                    channel: Some(server_channel.into()),
358                    ..Default::default()
359                }),
360            )
361            .await?
362            .map_err(|e| format_err!("{:?}", e))?;
363        let channel = Channel::try_from(channel).expect("valid channel");
364
365        self.inner.lock().new_rfcomm_channel(id, server_channel, channel);
366        Ok(())
367    }
368
369    /// Send a Remote Line Status update for the RFCOMM `server_channel` with peer `id`. Returns
370    /// Error if there is no such established RFCOMM channel with the peer.
371    pub fn send_rls(&self, id: PeerId, server_channel: ServerChannel) -> Result<(), Error> {
372        let rfcomm_test_proxy = self.get_rfcomm_test_proxy()?;
373        let mut inner = self.inner.lock();
374        if inner.get_active_session(&id).is_some() {
375            // Send a fixed Framing error status.
376            let status = rfcomm::Status::FramingError;
377            let _ = rfcomm_test_proxy
378                .remote_line_status(&id.into(), server_channel.into(), status)
379                .map_err::<fidl::Error, _>(Into::into)?;
380            Ok(())
381        } else {
382            Err(format_err!("No RFCOMM session with peer: {:?}", id))
383        }
384    }
385
386    /// Attempts to send user `data` to the remote peer `id`. Returns Error if there is no such
387    /// established RFCOMM channel with the peer.
388    pub fn send_user_data(
389        &self,
390        id: PeerId,
391        server_channel: ServerChannel,
392        data: Vec<u8>,
393    ) -> Result<(), Error> {
394        let mut inner = self.inner.lock();
395        if let Some(session) = inner.get_active_session(&id) {
396            session.send_user_data(server_channel, data)
397        } else {
398            Err(format_err!("No RFCOMM session with peer: {:?}", id))
399        }
400    }
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406    use assert_matches::assert_matches;
407    use async_utils::PollExt;
408    use bt_rfcomm::profile::build_rfcomm_protocol;
409    use fidl::endpoints::Proxy;
410    use fidl_fuchsia_bluetooth::ErrorCode;
411    use fidl_fuchsia_bluetooth_bredr::{ProfileMarker, ProfileRequestStream};
412    use fidl_fuchsia_bluetooth_rfcomm_test::{RfcommTestMarker, RfcommTestRequestStream};
413    use fixture::fixture;
414
415    type TestFixture = (RfcommManager, ProfileRequestStream, RfcommTestRequestStream);
416
417    async fn setup_rfcomm_mgr<F, Fut>(_name: &str, test: F)
418    where
419        F: FnOnce(TestFixture) -> Fut,
420        Fut: futures::Future<Output = ()>,
421    {
422        let (profile, profile_server) = fidl::endpoints::create_proxy_and_stream::<ProfileMarker>();
423        let (rfcomm_test, rfcomm_test_server) =
424            fidl::endpoints::create_proxy_and_stream::<RfcommTestMarker>();
425
426        let rfcomm_mgr = RfcommManager::from_proxy(profile, rfcomm_test);
427        test((rfcomm_mgr, profile_server, rfcomm_test_server)).await
428    }
429
430    async fn expect_data(remote: &mut Channel, expected_data: Vec<u8>) {
431        let read_result = remote.next().await.expect("data").expect("okay");
432        assert_eq!(read_result, expected_data);
433    }
434
435    async fn expect_advertisement_and_search(
436        profile: &mut ProfileRequestStream,
437    ) -> (
438        bredr::SearchResultsProxy,
439        (bredr::ConnectionReceiverProxy, bredr::ProfileAdvertiseResponder),
440    ) {
441        let mut search_request = None;
442        let mut advertisement = None;
443        while let Some(req) = profile.next().await {
444            match req {
445                Ok(bredr::ProfileRequest::Advertise { payload, responder, .. }) => {
446                    let connect_proxy = payload.receiver.unwrap().into_proxy();
447                    advertisement = Some((connect_proxy, responder));
448                }
449                Ok(bredr::ProfileRequest::Search { payload, .. }) => {
450                    search_request = Some(payload.results.unwrap().into_proxy())
451                }
452                x => panic!("Expected one Advertise and Search but got: {:?}", x),
453            }
454            if search_request.is_some() && advertisement.is_some() {
455                break;
456            }
457        }
458        (search_request.expect("just set"), advertisement.expect("just set"))
459    }
460
461    #[fixture(setup_rfcomm_mgr)]
462    #[fuchsia::test]
463    async fn initiate_rfcomm_channel_to_peer_is_ok(
464        (rfcomm_mgr, mut profile_server, mut rfcomm_test_server): TestFixture,
465    ) {
466        // Keep the `bredr.Profile` requests alive - one advertisement and search.
467        let _profile_requests = {
468            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
469            expect_advertisement_and_search(&mut profile_server).await
470        };
471
472        // Can establish RFCOMM channel to peer.
473        let remote_id = PeerId(123);
474        let random_channel_number = ServerChannel::try_from(5).unwrap();
475        let mut peer_channel = {
476            let ch_fut =
477                Box::pin(rfcomm_mgr.outgoing_rfcomm_channel(remote_id, random_channel_number));
478
479            let profile_fut = async {
480                match profile_server.next().await {
481                    Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => {
482                        let (left, right) = Channel::create();
483                        let _ = responder
484                            .send(left.try_into().map_err(|_e| ErrorCode::Failed))
485                            .unwrap();
486                        right
487                    }
488                    x => panic!("Expected connect request, got: {:?}", x),
489                }
490            };
491
492            match futures::future::join(ch_fut, profile_fut).await {
493                (Ok(_), channel) => channel,
494                x => panic!("Expected both futures to complete: {:?}", x),
495            }
496        };
497
498        // Sending data to the peer is ok.
499        let user_data = vec![0x98, 0x97, 0x96, 0x95];
500        {
501            assert_matches!(
502                rfcomm_mgr.send_user_data(remote_id, random_channel_number, user_data.clone()),
503                Ok(_)
504            );
505            expect_data(&mut peer_channel, user_data).await;
506        }
507
508        // Peer sends us data. It should be received gracefully and logged (nothing to test).
509        let buf = vec![0x99, 0x11, 0x44];
510        assert_eq!(peer_channel.write(&buf), Ok(3));
511
512        // Test client can request to send an RLS update - should be received by RFCOMM Test server.
513        assert_matches!(rfcomm_mgr.send_rls(remote_id, random_channel_number), Ok(_));
514        match rfcomm_test_server.next().await.expect("valid fidl request") {
515            Ok(rfcomm::RfcommTestRequest::RemoteLineStatus { id, channel_number, .. }) => {
516                assert_eq!(id, remote_id.into());
517                assert_eq!(channel_number, u8::from(random_channel_number));
518            }
519            x => panic!("Expected RLS request but got: {:?}", x),
520        }
521    }
522
523    #[fixture(setup_rfcomm_mgr)]
524    #[fuchsia::test]
525    async fn peer_initiating_rfcomm_channel_is_delivered(
526        (rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
527    ) {
528        // Keep the `bredr.Profile` requests alive - one advertisement and search.
529        let (_search_proxy, (connect_proxy, _adv_fut)) = {
530            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
531            expect_advertisement_and_search(&mut profile_server).await
532        };
533
534        // Peer connects to us.
535        let remote_id = PeerId(8978);
536        let random_channel_number = ServerChannel::try_from(7).unwrap();
537        let (_peer_channel, local_channel) = Channel::create();
538        let protocol: Vec<bredr::ProtocolDescriptor> =
539            build_rfcomm_protocol(random_channel_number).iter().map(Into::into).collect();
540        assert_matches!(
541            connect_proxy.connected(
542                &remote_id.into(),
543                local_channel.try_into().unwrap(),
544                &protocol,
545            ),
546            Ok(_)
547        );
548    }
549
550    #[fixture(setup_rfcomm_mgr)]
551    #[fuchsia::test]
552    async fn disconnect_session_received_by_rfcomm_test(
553        (rfcomm_mgr, mut profile_server, mut rfcomm_test_server): TestFixture,
554    ) {
555        // Keep the `bredr.Profile` requests alive - one advertisement and search.
556        let _profile_requests = {
557            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
558            expect_advertisement_and_search(&mut profile_server).await
559        };
560
561        // Even though there are no active RFCOMM channels established, a client can still request
562        // to disconnect the session - expect it to be received.
563        let remote = PeerId(834);
564        assert_matches!(rfcomm_mgr.close_session(remote), Ok(_));
565
566        match rfcomm_test_server.next().await.expect("valid fidl request") {
567            Ok(rfcomm::RfcommTestRequest::Disconnect { id, .. }) if id == remote.into() => {}
568            x => panic!("Expected Disconnect request but got: {:?}", x),
569        }
570    }
571
572    #[fixture(setup_rfcomm_mgr)]
573    #[fuchsia::test]
574    async fn rls_update_before_established_channel_is_error(
575        (rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
576    ) {
577        // Keep the `bredr.Profile` requests alive - one advertisement and search.
578        let _profile_requests = {
579            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
580            expect_advertisement_and_search(&mut profile_server).await
581        };
582
583        // RLS updates pertain to a specific RFCOMM channel. Expect an error if an RLS request is
584        // sent for a non existent channel.
585        let remote = PeerId(222);
586        let random_channel_number = ServerChannel::try_from(9).unwrap();
587        assert_matches!(rfcomm_mgr.send_rls(remote, random_channel_number), Err(_));
588    }
589
590    #[fixture(setup_rfcomm_mgr)]
591    #[fuchsia::test]
592    async fn clear_services_unregisters_profile_requests(
593        (rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
594    ) {
595        // Keep the `bredr.Profile` requests alive - one advertisement and search.
596        let (search_proxy, (connect_proxy, _advertise_fut)) = {
597            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
598            expect_advertisement_and_search(&mut profile_server).await
599        };
600        assert!(!search_proxy.is_closed());
601        assert!(!connect_proxy.is_closed());
602
603        // Clearing services should unregister advertisement and search (transitively closing the
604        // FIDL channels).
605        // Note: Clearing `Profile` services cancels the fasync::Task processing the `bredr.Profile`
606        // requests. Per documentation of fasync::Task, there are no guarantees about the freeing
607        // of resources held by a Task. Therefore, we cannot assume `search_proxy` and
608        // `connect_proxy` will be closed immediately (but we do expect them to be freed eventually)
609        rfcomm_mgr.clear_services();
610
611        // Can register again.
612        let _profile = {
613            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
614            expect_advertisement_and_search(&mut profile_server).await
615        };
616    }
617
618    #[fuchsia::test]
619    async fn rfcomm_session_task() {
620        let id = PeerId(999);
621        let mut session = RfcommSession::new(id);
622
623        let random_channel_number = ServerChannel::try_from(4).unwrap();
624        let (local, mut remote) = Channel::create();
625        session.new_rfcomm_channel(random_channel_number, local);
626
627        assert!(session.is_active(&random_channel_number));
628
629        let data = vec![0x00, 0x02, 0x04, 0x06, 0x08, 0x10];
630        let unregistered = ServerChannel::try_from(9).unwrap();
631        // Unregistered channel number is error.
632        assert_matches!(session.send_user_data(unregistered, data.clone()), Err(_));
633        // Sending is OK.
634        assert_matches!(session.send_user_data(random_channel_number, data.clone()), Ok(_));
635
636        // Should be received by remote.
637        expect_data(&mut remote, data).await;
638
639        // Can send multiple buffers.
640        let data1 = vec![0x09];
641        let data2 = vec![0x11];
642        assert_matches!(session.send_user_data(random_channel_number, data1.clone()), Ok(_));
643        assert_matches!(session.send_user_data(random_channel_number, data2.clone()), Ok(_));
644        expect_data(&mut remote, data1).await;
645        expect_data(&mut remote, data2).await;
646
647        // Local wants to close channel - should disconnect.
648        assert!(session.close_rfcomm_channel(&random_channel_number));
649        assert_matches!(remote.closed().await, Ok(_));
650
651        // Trying again is OK - nothing happens.
652        assert!(!session.close_rfcomm_channel(&random_channel_number));
653    }
654
655    #[fuchsia::test]
656    async fn second_channel_overwrites_first_in_rfcomm_session() {
657        let id = PeerId(78);
658        let mut session = RfcommSession::new(id);
659
660        let random_channel_number = ServerChannel::try_from(10).unwrap();
661        let (local1, remote1) = Channel::create();
662        session.new_rfcomm_channel(random_channel_number, local1);
663        assert!(session.is_active(&random_channel_number));
664
665        // Can create a new RFCOMM channel, this will overwrite the existing one.
666        let (local2, mut remote2) = Channel::create();
667        session.new_rfcomm_channel(random_channel_number, local2);
668        assert!(session.is_active(&random_channel_number));
669
670        assert_matches!(remote1.closed().await, Ok(_));
671
672        let data = vec![0x00, 0x02, 0x04, 0x06, 0x08, 0x10];
673        // Sending is OK - should be received by remote.
674        assert_matches!(session.send_user_data(random_channel_number, data.clone()), Ok(_));
675        expect_data(&mut remote2, data).await;
676    }
677
678    #[fuchsia::test]
679    fn closing_sender_closes_rfcomm_channel_task() {
680        let mut exec = fasync::TestExecutor::new();
681
682        let random_channel_number = ServerChannel::try_from(10).unwrap();
683        let (local, _remote) = Channel::create();
684        let (_sender, receiver) = mpsc::channel(0);
685
686        let mut channel_task =
687            Box::pin(RfcommSession::rfcomm_channel_task(random_channel_number, local, receiver));
688
689        exec.run_until_stalled(&mut channel_task).expect_pending("sender still active");
690
691        drop(_sender);
692        let _ = exec.run_until_stalled(&mut channel_task).expect("task should complete");
693    }
694
695    #[fuchsia::test]
696    fn closing_channel_closes_rfcomm_channel_task() {
697        let mut exec = fasync::TestExecutor::new();
698
699        let random_channel_number = ServerChannel::try_from(10).unwrap();
700        let (local, _remote) = Channel::create();
701        let (_sender, receiver) = mpsc::channel(0);
702
703        let mut channel_task =
704            Box::pin(RfcommSession::rfcomm_channel_task(random_channel_number, local, receiver));
705
706        exec.run_until_stalled(&mut channel_task).expect_pending("sender still active");
707
708        drop(_remote);
709        let _ = exec.run_until_stalled(&mut channel_task).expect("task should complete");
710    }
711}