Skip to main content

test_rfcomm_client/
lib.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Error, format_err};
6use bt_rfcomm::ServerChannel;
7use bt_rfcomm::profile::server_channel_from_protocol;
8use derivative::Derivative;
9use fidl_fuchsia_bluetooth as fidl_bt;
10use fidl_fuchsia_bluetooth_bredr as bredr;
11use fidl_fuchsia_bluetooth_rfcomm_test as rfcomm;
12use fuchsia_async as fasync;
13use fuchsia_bluetooth::profile::ProtocolDescriptor;
14use fuchsia_bluetooth::types::{Channel, PeerId, Uuid};
15use fuchsia_sync::Mutex;
16use futures::channel::mpsc;
17use futures::{SinkExt, StreamExt, select};
18use log::{info, warn};
19use profile_client::{ProfileClient, ProfileEvent};
20use std::cell::Cell;
21use std::collections::HashMap;
22use std::sync::Arc;
23
/// The default buffer size for the mpsc channels used to relay user data packets to be sent to the
/// remote peer. One such channel is created per RFCOMM channel.
///
/// This value is arbitrarily chosen and should be enough to queue multiple buffers to be sent.
const USER_DATA_BUFFER_SIZE: usize = 50;
28
29/// Valid SPP Service Definition - see SPP v1.2 Table 6.1.
30fn spp_service_definition() -> bredr::ServiceDefinition {
31    bredr::ServiceDefinition {
32        service_class_uuids: Some(vec![
33            Uuid::new16(bredr::ServiceClassProfileIdentifier::SerialPort.into_primitive()).into(),
34        ]),
35        protocol_descriptor_list: Some(vec![
36            bredr::ProtocolDescriptor {
37                protocol: Some(bredr::ProtocolIdentifier::L2Cap),
38                params: Some(vec![]),
39                ..Default::default()
40            },
41            bredr::ProtocolDescriptor {
42                protocol: Some(bredr::ProtocolIdentifier::Rfcomm),
43                params: Some(vec![]),
44                ..Default::default()
45            },
46        ]),
47        profile_descriptors: Some(vec![bredr::ProfileDescriptor {
48            profile_id: Some(bredr::ServiceClassProfileIdentifier::SerialPort),
49            major_version: Some(1),
50            minor_version: Some(2),
51            ..Default::default()
52        }]),
53        ..Default::default()
54    }
55}
56
/// Manages the set of active RFCOMM channels connected to a single remote peer.
#[derive(Debug)]
pub struct RfcommSession {
    /// Unique id assigned to the remote peer. Not read by any method in this file.
    _id: PeerId,
    /// The set of active RFCOMM channels, keyed by server channel. Each value is the sending
    /// half of the queue that hands user data to that channel's processing task.
    active_channels: HashMap<ServerChannel, mpsc::Sender<Vec<u8>>>,
}
65
66impl RfcommSession {
67    fn new(id: PeerId) -> Self {
68        Self { _id: id, active_channels: HashMap::new() }
69    }
70
71    fn is_active(&self, server_channel: &ServerChannel) -> bool {
72        self.active_channels.get(server_channel).is_some_and(|s| !s.is_closed())
73    }
74
75    fn close_rfcomm_channel(&mut self, server_channel: &ServerChannel) -> bool {
76        self.active_channels.remove(server_channel).is_some()
77    }
78
79    fn new_rfcomm_channel(&mut self, server_channel: ServerChannel, channel: Channel) {
80        if self.is_active(&server_channel) {
81            info!("Overwriting existing RFCOMM channel: {:?}", server_channel);
82        }
83
84        let (sender, receiver) = mpsc::channel(USER_DATA_BUFFER_SIZE);
85        fasync::Task::spawn(Self::rfcomm_channel_task(server_channel, channel, receiver)).detach();
86        let _ = self.active_channels.insert(server_channel, sender);
87    }
88
89    /// Processes data received from the remote peer over the provided RFCOMM `channel`.
90    /// Processes data in the `write_requests` queue to be sent to the remote peer.
91    async fn rfcomm_channel_task(
92        server_channel: ServerChannel,
93        mut channel: Channel,
94        mut write_requests: mpsc::Receiver<Vec<u8>>,
95    ) {
96        info!("Starting processing task for RFCOMM channel: {:?}", server_channel);
97        loop {
98            select! {
99                // The `fuse()` call is in the loop because `channel` is both borrowed as a stream
100                // and used to send data. It is safe because once `channel` is closed, the loop will
101                // break and `channel.next()` will never be polled thereafter.
102                bytes_from_peer = channel.next() => {
103                    let user_data = match bytes_from_peer {
104                        Some(Ok(bytes)) => bytes,
105                        Some(Err(e)) => {
106                            info!("Error receiving data: {:?}", e);
107                            continue;
108                        }
109                        None => {
110                            // RFCOMM channel closed by the peer.
111                            info!("Peer closed RFCOMM channel {:?}", server_channel);
112                            break;
113                        }
114                    };
115                    info!("{:?}: Received user data from peer: {:?}", server_channel, user_data);
116                }
117                bytes_to_peer = write_requests.next() => {
118                    match bytes_to_peer {
119                        Some(bytes) => {
120                            match channel.send(bytes).await {
121                                Ok(()) => info!("Sent user data over RFCOMM channel ({:?}).", server_channel),
122                                Err(e) => info!("Couldn't send user data for channel ({:?}): {:?}", server_channel, e),
123                            }
124                        }
125                        None => break, // RFCOMM channel closed by us.
126                    }
127                }
128                complete => break,
129            }
130        }
131        info!("RFCOMM channel ({:?}) task ended", server_channel);
132    }
133
134    /// Sends the `user_data` buf to the peer that provides the service identified by the
135    /// `server_channel`. Returns the result of the send operation.
136    fn send_user_data(
137        &mut self,
138        server_channel: ServerChannel,
139        user_data: Vec<u8>,
140    ) -> Result<(), Error> {
141        if let Some(sender) = self.active_channels.get_mut(&server_channel) {
142            sender.try_send(user_data).map_err(|e| format_err!("{:?}", e))
143        } else {
144            Err(format_err!("No registered server channel"))
145        }
146    }
147}
148
/// State of the RFCOMM test client: the SPP advertisement/search task and the RFCOMM sessions
/// with remote peers.
#[derive(Derivative, Default)]
#[derivative(Debug)]
pub struct RfcommState {
    /// A task representing the RFCOMM service advertisement and search. `None` when nothing is
    /// being advertised.
    #[derivative(Debug = "ignore")]
    service: Option<fasync::Task<()>>,
    /// The set of active RFCOMM Sessions with remote peers, keyed by peer id.
    active_sessions: HashMap<PeerId, RfcommSession>,
}
158
159impl RfcommState {
160    fn new() -> Self {
161        Self { service: None, active_sessions: HashMap::new() }
162    }
163
164    fn get_active_session(&mut self, id: &PeerId) -> Option<&mut RfcommSession> {
165        match self.active_sessions.get_mut(id) {
166            None => {
167                info!("No active RFCOMM session with peer {}", id);
168                None
169            }
170            session => session,
171        }
172    }
173
174    fn clear_services(&mut self) {
175        if let Some(old_task) = self.service.take() {
176            info!("Clearing SPP service advertisement/search");
177            let _ = old_task.abort();
178        }
179        self.active_sessions.clear();
180    }
181
182    fn new_rfcomm_channel(&mut self, id: PeerId, server_channel: ServerChannel, channel: Channel) {
183        let _ = self
184            .active_sessions
185            .entry(id)
186            .or_insert_with(|| RfcommSession::new(id))
187            .new_rfcomm_channel(server_channel, channel);
188    }
189}
190
/// Handle used to drive the RFCOMM test client: advertising SPP, opening and closing RFCOMM
/// channels, and sending data or line status updates to peers.
#[derive(Derivative, Default)]
#[derivative(Debug)]
pub struct RfcommManager {
    /// Cached `bredr.Profile` proxy. When empty, `get_profile_proxy` connects to the protocol
    /// and stores the resulting proxy here.
    #[derivative(Debug = "ignore")]
    profile: Cell<Option<bredr::ProfileProxy>>,
    /// Cached `RfcommTest` proxy. When empty, `get_rfcomm_test_proxy` connects to the protocol
    /// and stores the resulting proxy here.
    #[derivative(Debug = "ignore")]
    rfcomm: Cell<Option<rfcomm::RfcommTestProxy>>,
    /// State shared with clones of this manager and with the profile event handling task.
    inner: Arc<Mutex<RfcommState>>,
}
200
201impl Clone for RfcommManager {
202    fn clone(&self) -> Self {
203        let profile = self.profile.take();
204        if let Some(p) = profile.as_ref() {
205            self.profile.set(Some(p.clone()));
206        }
207        let rfcomm = self.rfcomm.take();
208        if let Some(rf) = rfcomm.as_ref() {
209            self.rfcomm.set(Some(rf.clone()));
210        }
211        Self { profile: Cell::new(profile), rfcomm: Cell::new(rfcomm), inner: self.inner.clone() }
212    }
213}
214
215impl RfcommManager {
216    pub fn new() -> Result<Self, Error> {
217        Ok(Self::default())
218    }
219
220    pub fn from_proxy(profile: bredr::ProfileProxy, rfcomm: rfcomm::RfcommTestProxy) -> Self {
221        Self {
222            profile: Cell::new(Some(profile)),
223            rfcomm: Cell::new(Some(rfcomm)),
224            inner: Arc::new(Mutex::new(RfcommState::new())),
225        }
226    }
227
228    pub fn clear_services(&self) {
229        self.inner.lock().clear_services();
230    }
231
232    fn get_profile_proxy(&self) -> Result<bredr::ProfileProxy, Error> {
233        let proxy = match self.profile.take() {
234            Some(proxy) => proxy,
235            None => fuchsia_component::client::connect_to_protocol::<bredr::ProfileMarker>()?,
236        };
237        self.profile.set(Some(proxy.clone()));
238        Ok(proxy)
239    }
240
241    fn get_rfcomm_test_proxy(&self) -> Result<rfcomm::RfcommTestProxy, Error> {
242        let proxy = match self.rfcomm.take() {
243            Some(proxy) => proxy,
244            None => fuchsia_component::client::connect_to_protocol::<rfcomm::RfcommTestMarker>()?,
245        };
246        self.rfcomm.set(Some(proxy.clone()));
247        Ok(proxy)
248    }
249
250    /// Advertises an SPP service and searches for other compatible SPP clients. Overwrites any
251    /// existing service advertisement & search.
252    pub fn advertise(&self) -> Result<(), Error> {
253        // Existing service must be unregistered before we can advertise again - this is to prevent
254        // clashes in `bredr.Profile` server.
255        self.clear_services();
256
257        let profile_proxy = self.get_profile_proxy()?;
258        let inner_clone = self.inner.clone();
259        let mut inner = self.inner.lock();
260
261        // Add an SPP advertisement & search.
262        let spp_service = vec![spp_service_definition()];
263        let mut client = ProfileClient::advertise(
264            profile_proxy,
265            spp_service,
266            fidl_bt::ChannelParameters::default(),
267        )?;
268        let _ = client.add_search(bredr::ServiceClassProfileIdentifier::SerialPort, None)?;
269        let service_task = fasync::Task::spawn(async move {
270            let result = Self::handle_profile_events(client, inner_clone).await;
271            info!("Profile event handler ended: {:?}", result);
272        });
273        inner.service = Some(service_task);
274        info!("Advertising and searching for SPP services");
275        Ok(())
276    }
277
278    /// Processes events from the `bredr.Profile` `client`.
279    async fn handle_profile_events(
280        mut client: ProfileClient,
281        state: Arc<Mutex<RfcommState>>,
282    ) -> Result<(), Error> {
283        while let Some(request) = client.next().await {
284            match request {
285                Ok(ProfileEvent::PeerConnected { id, protocol, channel, .. }) => {
286                    // Received an incoming connection request for our advertised service.
287                    let protocol = protocol
288                        .iter()
289                        .map(|p| ProtocolDescriptor::try_from(p))
290                        .collect::<Result<Vec<_>, _>>()?;
291                    let server_channel = server_channel_from_protocol(&protocol)
292                        .ok_or_else(|| format_err!("Not RFCOMM protocol"))?;
293
294                    // Spawn a processing task to handle read & writes over this RFCOMM channel.
295                    state.lock().new_rfcomm_channel(id, server_channel, channel);
296                    info!("Peer {} established RFCOMM Channel ({:?}) ", id, server_channel);
297                }
298                Ok(ProfileEvent::SearchResult { id, protocol, .. }) => {
299                    // Discovered a remote peer's service.
300                    let protocol = protocol
301                        .expect("Protocol should exist")
302                        .iter()
303                        .map(|p| ProtocolDescriptor::try_from(p))
304                        .collect::<Result<Vec<_>, _>>()?;
305                    let server_channel = server_channel_from_protocol(&protocol)
306                        .ok_or_else(|| format_err!("Not RFCOMM protocol"))?;
307                    info!("Found SPP service for {} with server channel: {:?}", id, server_channel);
308                }
309                Err(e) => warn!("Error in ProfileClient results: {:?}", e),
310            }
311        }
312        Ok(())
313    }
314
315    /// Terminates the RFCOMM session with the remote peer `id`.
316    pub fn close_session(&self, id: PeerId) -> Result<(), Error> {
317        // Send the disconnect request via the `RfcommTest` API and clean up local state.
318        let _ = self
319            .get_rfcomm_test_proxy()?
320            .disconnect(&id.into())
321            .map_err::<fidl::Error, _>(Into::into)?;
322
323        let mut inner = self.inner.lock();
324        if let Some(session) = inner.active_sessions.remove(&id) {
325            drop(session);
326        }
327        Ok(())
328    }
329
330    /// Closes the RFCOMM channel with the remote peer.
331    pub fn close_rfcomm_channel(
332        &self,
333        id: PeerId,
334        server_channel: ServerChannel,
335    ) -> Result<(), Error> {
336        let mut inner = self.inner.lock();
337        if let Some(session) = inner.get_active_session(&id) {
338            let _ = session.close_rfcomm_channel(&server_channel);
339            Ok(())
340        } else {
341            Err(format_err!("No RFCOMM session with peer: {:?}", id))
342        }
343    }
344
345    /// Makes an outgoing RFCOMM channel to the remote peer.
346    pub async fn outgoing_rfcomm_channel(
347        &self,
348        id: PeerId,
349        server_channel: ServerChannel,
350    ) -> Result<(), Error> {
351        let channel = self
352            .get_profile_proxy()?
353            .connect(
354                &id.into(),
355                &bredr::ConnectParameters::Rfcomm(bredr::RfcommParameters {
356                    channel: Some(server_channel.into()),
357                    ..Default::default()
358                }),
359            )
360            .await?
361            .map_err(|e| format_err!("{:?}", e))?;
362        let channel = Channel::try_from(channel).expect("valid channel");
363
364        self.inner.lock().new_rfcomm_channel(id, server_channel, channel);
365        Ok(())
366    }
367
368    /// Send a Remote Line Status update for the RFCOMM `server_channel` with peer `id`. Returns
369    /// Error if there is no such established RFCOMM channel with the peer.
370    pub fn send_rls(&self, id: PeerId, server_channel: ServerChannel) -> Result<(), Error> {
371        let rfcomm_test_proxy = self.get_rfcomm_test_proxy()?;
372        let mut inner = self.inner.lock();
373        if inner.get_active_session(&id).is_some() {
374            // Send a fixed Framing error status.
375            let status = rfcomm::Status::FramingError;
376            let _ = rfcomm_test_proxy
377                .remote_line_status(&id.into(), server_channel.into(), status)
378                .map_err::<fidl::Error, _>(Into::into)?;
379            Ok(())
380        } else {
381            Err(format_err!("No RFCOMM session with peer: {:?}", id))
382        }
383    }
384
385    /// Attempts to send user `data` to the remote peer `id`. Returns Error if there is no such
386    /// established RFCOMM channel with the peer.
387    pub fn send_user_data(
388        &self,
389        id: PeerId,
390        server_channel: ServerChannel,
391        data: Vec<u8>,
392    ) -> Result<(), Error> {
393        let mut inner = self.inner.lock();
394        if let Some(session) = inner.get_active_session(&id) {
395            session.send_user_data(server_channel, data)
396        } else {
397            Err(format_err!("No RFCOMM session with peer: {:?}", id))
398        }
399    }
400}
401
402#[cfg(test)]
403mod tests {
404    use super::*;
405    use assert_matches::assert_matches;
406    use async_utils::PollExt;
407    use bt_rfcomm::profile::build_rfcomm_protocol;
408    use fidl::endpoints::Proxy;
409    use fidl_fuchsia_bluetooth::ErrorCode;
410    use fidl_fuchsia_bluetooth_bredr::{ProfileMarker, ProfileRequestStream};
411    use fidl_fuchsia_bluetooth_rfcomm_test::{RfcommTestMarker, RfcommTestRequestStream};
412    use fixture::fixture;
413
    /// The manager under test together with the server ends of the `bredr.Profile` and
    /// `RfcommTest` protocols that its proxies are connected to.
    type TestFixture = (RfcommManager, ProfileRequestStream, RfcommTestRequestStream);
415
416    async fn setup_rfcomm_mgr<F, Fut>(_name: &str, test: F)
417    where
418        F: FnOnce(TestFixture) -> Fut,
419        Fut: futures::Future<Output = ()>,
420    {
421        let (profile, profile_server) = fidl::endpoints::create_proxy_and_stream::<ProfileMarker>();
422        let (rfcomm_test, rfcomm_test_server) =
423            fidl::endpoints::create_proxy_and_stream::<RfcommTestMarker>();
424
425        let rfcomm_mgr = RfcommManager::from_proxy(profile, rfcomm_test);
426        test((rfcomm_mgr, profile_server, rfcomm_test_server)).await
427    }
428
429    async fn expect_data(remote: &mut Channel, expected_data: Vec<u8>) {
430        let read_result = remote.next().await.expect("data").expect("okay");
431        assert_eq!(read_result, expected_data);
432    }
433
434    async fn expect_advertisement_and_search(
435        profile: &mut ProfileRequestStream,
436    ) -> (
437        bredr::SearchResultsProxy,
438        (bredr::ConnectionReceiverProxy, bredr::ProfileAdvertiseResponder),
439    ) {
440        let mut search_request = None;
441        let mut advertisement = None;
442        while let Some(req) = profile.next().await {
443            match req {
444                Ok(bredr::ProfileRequest::Advertise { payload, responder, .. }) => {
445                    let connect_proxy = payload.receiver.unwrap().into_proxy();
446                    advertisement = Some((connect_proxy, responder));
447                }
448                Ok(bredr::ProfileRequest::Search { payload, .. }) => {
449                    search_request = Some(payload.results.unwrap().into_proxy())
450                }
451                x => panic!("Expected one Advertise and Search but got: {:?}", x),
452            }
453            if search_request.is_some() && advertisement.is_some() {
454                break;
455            }
456        }
457        (search_request.expect("just set"), advertisement.expect("just set"))
458    }
459
460    #[fixture(setup_rfcomm_mgr)]
461    #[fuchsia::test]
462    async fn initiate_rfcomm_channel_to_peer_is_ok(
463        (rfcomm_mgr, mut profile_server, mut rfcomm_test_server): TestFixture,
464    ) {
465        // Keep the `bredr.Profile` requests alive - one advertisement and search.
466        let _profile_requests = {
467            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
468            expect_advertisement_and_search(&mut profile_server).await
469        };
470
471        // Can establish RFCOMM channel to peer.
472        let remote_id = PeerId(123);
473        let random_channel_number = ServerChannel::try_from(5).unwrap();
474        let mut peer_channel = {
475            let ch_fut =
476                Box::pin(rfcomm_mgr.outgoing_rfcomm_channel(remote_id, random_channel_number));
477
478            let profile_fut = async {
479                match profile_server.next().await {
480                    Some(Ok(bredr::ProfileRequest::Connect { responder, .. })) => {
481                        let (left, right) = Channel::create();
482                        let _ = responder
483                            .send(left.try_into().map_err(|_e| ErrorCode::Failed))
484                            .unwrap();
485                        right
486                    }
487                    x => panic!("Expected connect request, got: {:?}", x),
488                }
489            };
490
491            match futures::future::join(ch_fut, profile_fut).await {
492                (Ok(_), channel) => channel,
493                x => panic!("Expected both futures to complete: {:?}", x),
494            }
495        };
496
497        // Sending data to the peer is ok.
498        let user_data = vec![0x98, 0x97, 0x96, 0x95];
499        {
500            assert_matches!(
501                rfcomm_mgr.send_user_data(remote_id, random_channel_number, user_data.clone()),
502                Ok(_)
503            );
504            expect_data(&mut peer_channel, user_data).await;
505        }
506
507        // Peer sends us data. It should be received gracefully and logged (nothing to test).
508        let buf = vec![0x99, 0x11, 0x44];
509        assert_matches!(peer_channel.send(buf).await, Ok(()));
510
511        // Test client can request to send an RLS update - should be received by RFCOMM Test server.
512        assert_matches!(rfcomm_mgr.send_rls(remote_id, random_channel_number), Ok(_));
513        match rfcomm_test_server.next().await.expect("valid fidl request") {
514            Ok(rfcomm::RfcommTestRequest::RemoteLineStatus { id, channel_number, .. }) => {
515                assert_eq!(id, remote_id.into());
516                assert_eq!(channel_number, u8::from(random_channel_number));
517            }
518            x => panic!("Expected RLS request but got: {:?}", x),
519        }
520    }
521
522    #[fixture(setup_rfcomm_mgr)]
523    #[fuchsia::test]
524    async fn peer_initiating_rfcomm_channel_is_delivered(
525        (rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
526    ) {
527        // Keep the `bredr.Profile` requests alive - one advertisement and search.
528        let (_search_proxy, (connect_proxy, _adv_fut)) = {
529            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
530            expect_advertisement_and_search(&mut profile_server).await
531        };
532
533        // Peer connects to us.
534        let remote_id = PeerId(8978);
535        let random_channel_number = ServerChannel::try_from(7).unwrap();
536        let (_peer_channel, local_channel) = Channel::create();
537        let protocol: Vec<bredr::ProtocolDescriptor> =
538            build_rfcomm_protocol(random_channel_number).iter().map(Into::into).collect();
539        assert_matches!(
540            connect_proxy.connected(
541                &remote_id.into(),
542                local_channel.try_into().unwrap(),
543                &protocol,
544            ),
545            Ok(_)
546        );
547    }
548
549    #[fixture(setup_rfcomm_mgr)]
550    #[fuchsia::test]
551    async fn disconnect_session_received_by_rfcomm_test(
552        (rfcomm_mgr, mut profile_server, mut rfcomm_test_server): TestFixture,
553    ) {
554        // Keep the `bredr.Profile` requests alive - one advertisement and search.
555        let _profile_requests = {
556            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
557            expect_advertisement_and_search(&mut profile_server).await
558        };
559
560        // Even though there are no active RFCOMM channels established, a client can still request
561        // to disconnect the session - expect it to be received.
562        let remote = PeerId(834);
563        assert_matches!(rfcomm_mgr.close_session(remote), Ok(_));
564
565        match rfcomm_test_server.next().await.expect("valid fidl request") {
566            Ok(rfcomm::RfcommTestRequest::Disconnect { id, .. }) if id == remote.into() => {}
567            x => panic!("Expected Disconnect request but got: {:?}", x),
568        }
569    }
570
571    #[fixture(setup_rfcomm_mgr)]
572    #[fuchsia::test]
573    async fn rls_update_before_established_channel_is_error(
574        (rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
575    ) {
576        // Keep the `bredr.Profile` requests alive - one advertisement and search.
577        let _profile_requests = {
578            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
579            expect_advertisement_and_search(&mut profile_server).await
580        };
581
582        // RLS updates pertain to a specific RFCOMM channel. Expect an error if an RLS request is
583        // sent for a non existent channel.
584        let remote = PeerId(222);
585        let random_channel_number = ServerChannel::try_from(9).unwrap();
586        assert_matches!(rfcomm_mgr.send_rls(remote, random_channel_number), Err(_));
587    }
588
589    #[fixture(setup_rfcomm_mgr)]
590    #[fuchsia::test]
591    async fn clear_services_unregisters_profile_requests(
592        (rfcomm_mgr, mut profile_server, _rfcomm_test_server): TestFixture,
593    ) {
594        // Keep the `bredr.Profile` requests alive - one advertisement and search.
595        let (search_proxy, (connect_proxy, _advertise_fut)) = {
596            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
597            expect_advertisement_and_search(&mut profile_server).await
598        };
599        assert!(!search_proxy.is_closed());
600        assert!(!connect_proxy.is_closed());
601
602        // Clearing services should unregister advertisement and search (transitively closing the
603        // FIDL channels).
604        // Note: Clearing `Profile` services cancels the fasync::Task processing the `bredr.Profile`
605        // requests. Per documentation of fasync::Task, there are no guarantees about the freeing
606        // of resources held by a Task. Therefore, we cannot assume `search_proxy` and
607        // `connect_proxy` will be closed immediately (but we do expect them to be freed eventually)
608        rfcomm_mgr.clear_services();
609
610        // Can register again.
611        let _profile = {
612            assert_matches!(rfcomm_mgr.advertise(), Ok(_));
613            expect_advertisement_and_search(&mut profile_server).await
614        };
615    }
616
617    #[fuchsia::test]
618    async fn rfcomm_session_task() {
619        let id = PeerId(999);
620        let mut session = RfcommSession::new(id);
621
622        let random_channel_number = ServerChannel::try_from(4).unwrap();
623        let (local, mut remote) = Channel::create();
624        session.new_rfcomm_channel(random_channel_number, local);
625
626        assert!(session.is_active(&random_channel_number));
627
628        let data = vec![0x00, 0x02, 0x04, 0x06, 0x08, 0x10];
629        let unregistered = ServerChannel::try_from(9).unwrap();
630        // Unregistered channel number is error.
631        assert_matches!(session.send_user_data(unregistered, data.clone()), Err(_));
632        // Sending is OK.
633        assert_matches!(session.send_user_data(random_channel_number, data.clone()), Ok(_));
634
635        // Should be received by remote.
636        expect_data(&mut remote, data).await;
637
638        // Can send multiple buffers.
639        let data1 = vec![0x09];
640        let data2 = vec![0x11];
641        assert_matches!(session.send_user_data(random_channel_number, data1.clone()), Ok(_));
642        assert_matches!(session.send_user_data(random_channel_number, data2.clone()), Ok(_));
643        expect_data(&mut remote, data1).await;
644        expect_data(&mut remote, data2).await;
645
646        // Local wants to close channel - should disconnect.
647        assert!(session.close_rfcomm_channel(&random_channel_number));
648        assert_matches!(remote.closed().await, Ok(_));
649
650        // Trying again is OK - nothing happens.
651        assert!(!session.close_rfcomm_channel(&random_channel_number));
652    }
653
654    #[fuchsia::test]
655    async fn second_channel_overwrites_first_in_rfcomm_session() {
656        let id = PeerId(78);
657        let mut session = RfcommSession::new(id);
658
659        let random_channel_number = ServerChannel::try_from(10).unwrap();
660        let (local1, remote1) = Channel::create();
661        session.new_rfcomm_channel(random_channel_number, local1);
662        assert!(session.is_active(&random_channel_number));
663
664        // Can create a new RFCOMM channel, this will overwrite the existing one.
665        let (local2, mut remote2) = Channel::create();
666        session.new_rfcomm_channel(random_channel_number, local2);
667        assert!(session.is_active(&random_channel_number));
668
669        assert_matches!(remote1.closed().await, Ok(_));
670
671        let data = vec![0x00, 0x02, 0x04, 0x06, 0x08, 0x10];
672        // Sending is OK - should be received by remote.
673        assert_matches!(session.send_user_data(random_channel_number, data.clone()), Ok(_));
674        expect_data(&mut remote2, data).await;
675    }
676
677    #[fuchsia::test]
678    fn closing_sender_closes_rfcomm_channel_task() {
679        let mut exec = fasync::TestExecutor::new();
680
681        let random_channel_number = ServerChannel::try_from(10).unwrap();
682        let (local, _remote) = Channel::create();
683        let (_sender, receiver) = mpsc::channel(0);
684
685        let mut channel_task =
686            Box::pin(RfcommSession::rfcomm_channel_task(random_channel_number, local, receiver));
687
688        exec.run_until_stalled(&mut channel_task).expect_pending("sender still active");
689
690        drop(_sender);
691        let _ = exec.run_until_stalled(&mut channel_task).expect("task should complete");
692    }
693
694    #[fuchsia::test]
695    fn closing_channel_closes_rfcomm_channel_task() {
696        let mut exec = fasync::TestExecutor::new();
697
698        let random_channel_number = ServerChannel::try_from(10).unwrap();
699        let (local, _remote) = Channel::create();
700        let (_sender, receiver) = mpsc::channel(0);
701
702        let mut channel_task =
703            Box::pin(RfcommSession::rfcomm_channel_task(random_channel_number, local, receiver));
704
705        exec.run_until_stalled(&mut channel_task).expect_pending("sender still active");
706
707        drop(_remote);
708        let _ = exec.run_until_stalled(&mut channel_task).expect("task should complete");
709    }
710}