Skip to main content

fuchsia_bluetooth/types/
channel.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::endpoints::{ClientEnd, Proxy};
6use fidl_fuchsia_bluetooth as fidl_bt;
7use fidl_fuchsia_bluetooth_bredr as bredr;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::sink::Sink;
11use futures::stream::{FusedStream, Stream};
12use futures::{Future, TryFutureExt, io, ready};
13use log::{error, warn};
14use std::collections::VecDeque;
15use std::fmt;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20use crate::error::Error;
21
22/// The Channel mode in use for a L2CAP channel.
23#[derive(PartialEq, Debug, Clone)]
24pub enum ChannelMode {
25    Basic,
26    EnhancedRetransmissionMode,
27    LeCreditBasedFlowControl,
28    EnhancedCreditBasedFlowControl,
29}
30
31impl fmt::Display for ChannelMode {
32    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33        match self {
34            ChannelMode::Basic => write!(f, "Basic"),
35            ChannelMode::EnhancedRetransmissionMode => write!(f, "ERTM"),
36            ChannelMode::LeCreditBasedFlowControl => write!(f, "LE_Credit"),
37            ChannelMode::EnhancedCreditBasedFlowControl => write!(f, "Credit"),
38        }
39    }
40}
41
42pub enum A2dpDirection {
43    Normal,
44    Source,
45    Sink,
46}
47
48impl From<A2dpDirection> for bredr::A2dpDirectionPriority {
49    fn from(pri: A2dpDirection) -> Self {
50        match pri {
51            A2dpDirection::Normal => bredr::A2dpDirectionPriority::Normal,
52            A2dpDirection::Source => bredr::A2dpDirectionPriority::Source,
53            A2dpDirection::Sink => bredr::A2dpDirectionPriority::Sink,
54        }
55    }
56}
57
58impl TryFrom<fidl_bt::ChannelMode> for ChannelMode {
59    type Error = Error;
60    fn try_from(fidl: fidl_bt::ChannelMode) -> Result<Self, Error> {
61        match fidl {
62            fidl_bt::ChannelMode::Basic => Ok(ChannelMode::Basic),
63            fidl_bt::ChannelMode::EnhancedRetransmission => {
64                Ok(ChannelMode::EnhancedRetransmissionMode)
65            }
66            fidl_bt::ChannelMode::LeCreditBasedFlowControl => {
67                Ok(ChannelMode::LeCreditBasedFlowControl)
68            }
69            fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl => {
70                Ok(ChannelMode::EnhancedCreditBasedFlowControl)
71            }
72            x => Err(Error::FailedConversion(format!("Unsupported channel mode type: {x:?}"))),
73        }
74    }
75}
76
77impl From<ChannelMode> for fidl_bt::ChannelMode {
78    fn from(x: ChannelMode) -> Self {
79        match x {
80            ChannelMode::Basic => fidl_bt::ChannelMode::Basic,
81            ChannelMode::EnhancedRetransmissionMode => fidl_bt::ChannelMode::EnhancedRetransmission,
82            ChannelMode::LeCreditBasedFlowControl => fidl_bt::ChannelMode::LeCreditBasedFlowControl,
83            ChannelMode::EnhancedCreditBasedFlowControl => {
84                fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl
85            }
86        }
87    }
88}
89
90/// A data channel to a remote Peer. Channels are the primary data transfer mechanism for
91/// Bluetooth profiles and protocols.
92/// Channel currently implements Deref<Target = Socket> to easily access the underlying
93/// socket, and also implements AsyncWrite using a forwarding implementation.
94#[derive(Debug)]
95pub struct Channel {
96    socket: fasync::Socket,
97    mode: ChannelMode,
98    max_tx_size: usize,
99    flush_timeout: Arc<Mutex<Option<zx::MonotonicDuration>>>,
100    audio_direction_ext: Option<bredr::AudioDirectionExtProxy>,
101    l2cap_parameters_ext: Option<bredr::L2capParametersExtProxy>,
102    audio_offload_ext: Option<bredr::AudioOffloadExtProxy>,
103    terminated: bool,
104    send_buffer: VecDeque<Vec<u8>>,
105}
106
107impl Channel {
108    const MAX_QUEUED_PACKETS: usize = 32;
109    /// Attempt to make a Channel from a zircon socket and a Maximum TX size received out of band.
110    /// Returns Err(status) if there is an error.
111    pub fn from_socket(socket: zx::Socket, max_tx_size: usize) -> Result<Self, zx::Status> {
112        Ok(Self::from_socket_infallible(socket, max_tx_size))
113    }
114
115    /// Make a Channel from a zircon socket and a Maximum TX size received out of band.
116    pub fn from_socket_infallible(socket: zx::Socket, max_tx_size: usize) -> Self {
117        Channel {
118            socket: fasync::Socket::from_socket(socket),
119            mode: ChannelMode::Basic,
120            max_tx_size,
121            flush_timeout: Arc::new(Mutex::new(None)),
122            audio_direction_ext: None,
123            l2cap_parameters_ext: None,
124            audio_offload_ext: None,
125            terminated: false,
126            send_buffer: VecDeque::with_capacity(Self::MAX_QUEUED_PACKETS),
127        }
128    }
129
130    /// The default max tx size is the default MTU size for L2CAP minus the channel header content.
131    /// See the Bluetooth Core Specification, Vol 3, Part A, Sec 5.1
132    pub const DEFAULT_MAX_TX: usize = 672;
133
134    /// Makes a pair of channels which are connected to each other, used commonly for testing.
135    /// The max_tx_size is set to `Channel::DEFAULT_MAX_TX`.
136    pub fn create() -> (Self, Self) {
137        Self::create_with_max_tx(Self::DEFAULT_MAX_TX)
138    }
139
140    /// Make a pair of channels which are connected to each other, used commonly for testing.
141    /// The maximum transmittable unit is taken from `max_tx_size`.
142    pub fn create_with_max_tx(max_tx_size: usize) -> (Self, Self) {
143        let (remote, local) = zx::Socket::create_datagram();
144        (
145            Channel::from_socket(remote, max_tx_size).unwrap(),
146            Channel::from_socket(local, max_tx_size).unwrap(),
147        )
148    }
149
150    /// The maximum transmittable size of a packet, in bytes.
151    /// Trying to send packets larger than this may cause the channel to be closed.
152    pub fn max_tx_size(&self) -> usize {
153        self.max_tx_size
154    }
155
156    pub fn channel_mode(&self) -> &ChannelMode {
157        &self.mode
158    }
159
160    pub fn flush_timeout(&self) -> Option<zx::MonotonicDuration> {
161        self.flush_timeout.lock().clone()
162    }
163
164    /// Returns a future which will set the audio priority of the channel.
165    /// The future will return Err if setting the priority is not supported.
166    pub fn set_audio_priority(
167        &self,
168        dir: A2dpDirection,
169    ) -> impl Future<Output = Result<(), Error>> + use<> {
170        let proxy = self.audio_direction_ext.clone();
171        async move {
172            match proxy {
173                None => return Err(Error::profile("audio priority not supported")),
174                Some(proxy) => proxy
175                    .set_priority(dir.into())
176                    .await?
177                    .map_err(|e| Error::profile(format!("setting priority failed: {e:?}"))),
178            }
179        }
180    }
181
182    /// Attempt to set the flush timeout for this channel.
183    /// If the timeout is not already set within 1ms of `duration`, we attempt to set it using the
184    /// L2cap parameter extension.
185    /// `duration` can be infinite to set packets flushable without a timeout.
186    /// Returns a future that when polled will set the flush timeout and return the new timeout,
187    /// or return an error setting the parameter is not supported.
188    pub fn set_flush_timeout(
189        &self,
190        duration: Option<zx::MonotonicDuration>,
191    ) -> impl Future<Output = Result<Option<zx::MonotonicDuration>, Error>> + use<> {
192        let flush_timeout = self.flush_timeout.clone();
193        let current = self.flush_timeout.lock().clone();
194        let proxy = self.l2cap_parameters_ext.clone();
195        async move {
196            match (current, duration) {
197                (None, None) => return Ok(None),
198                (Some(old), Some(new)) if (old - new).into_millis().abs() < 2 => {
199                    return Ok(current);
200                }
201                _ => {}
202            };
203            let proxy =
204                proxy.ok_or_else(|| Error::profile("l2cap parameter changing not supported"))?;
205            let parameters = fidl_bt::ChannelParameters {
206                flush_timeout: duration.clone().map(zx::MonotonicDuration::into_nanos),
207                ..Default::default()
208            };
209            let new_params = proxy.request_parameters(&parameters).await?;
210            let new_timeout = new_params.flush_timeout.map(zx::MonotonicDuration::from_nanos);
211            *(flush_timeout.lock()) = new_timeout.clone();
212            Ok(new_timeout)
213        }
214    }
215
216    /// Get a copy of the Audio Offload Proxy for this channel, if it exists
217    pub fn audio_offload(&self) -> Option<bredr::AudioOffloadExtProxy> {
218        self.audio_offload_ext.clone()
219    }
220
221    pub fn closed<'a>(&'a self) -> impl Future<Output = Result<(), zx::Status>> + 'a {
222        let close_signals = zx::Signals::SOCKET_PEER_CLOSED;
223        let close_wait = fasync::OnSignals::new(&self.socket, close_signals);
224        close_wait.map_ok(|_o| ())
225    }
226
227    pub fn is_closed<'a>(&'a self) -> bool {
228        self.socket.is_closed()
229    }
230
231    /// Write to the channel.  This will return zx::Status::SHOULD_WAIT if the
232    /// the channel is too full.
233    /// Prefer using the channel via Sink for asynchronous operations.
234    // TODO(b/499061686): remove to prefer async write.
235    pub fn write(&self, bytes: &[u8]) -> Result<usize, zx::Status> {
236        self.socket.as_ref().write(bytes)
237    }
238}
239
240impl TryFrom<fidl_fuchsia_bluetooth_bredr::Channel> for Channel {
241    type Error = zx::Status;
242
243    fn try_from(fidl: bredr::Channel) -> Result<Self, Self::Error> {
244        let channel = match fidl.channel_mode.unwrap_or(fidl_bt::ChannelMode::Basic).try_into() {
245            Err(e) => {
246                warn!("Unsupported channel mode type: {e:?}");
247                return Err(zx::Status::INTERNAL);
248            }
249            Ok(c) => c,
250        };
251
252        Ok(Self {
253            socket: fasync::Socket::from_socket(fidl.socket.ok_or(zx::Status::INVALID_ARGS)?),
254            mode: channel,
255            max_tx_size: fidl.max_tx_sdu_size.ok_or(zx::Status::INVALID_ARGS)? as usize,
256            flush_timeout: Arc::new(Mutex::new(
257                fidl.flush_timeout.map(zx::MonotonicDuration::from_nanos),
258            )),
259            audio_direction_ext: fidl.ext_direction.map(|e| e.into_proxy()),
260            l2cap_parameters_ext: fidl.ext_l2cap.map(|e| e.into_proxy()),
261            audio_offload_ext: fidl.ext_audio_offload.map(|c| c.into_proxy()),
262            terminated: false,
263            send_buffer: VecDeque::with_capacity(Self::MAX_QUEUED_PACKETS),
264        })
265    }
266}
267
268impl TryFrom<Channel> for bredr::Channel {
269    type Error = Error;
270
271    fn try_from(channel: Channel) -> Result<Self, Self::Error> {
272        let socket = channel.socket.into_zx_socket();
273        let ext_direction = channel
274            .audio_direction_ext
275            .map(|proxy| {
276                let chan = proxy.into_channel()?;
277                Ok(ClientEnd::new(chan.into()))
278            })
279            .transpose()
280            .map_err(|_: bredr::AudioDirectionExtProxy| {
281                Error::profile("AudioDirection proxy in use")
282            })?;
283        let ext_l2cap = channel
284            .l2cap_parameters_ext
285            .map(|proxy| {
286                let chan = proxy.into_channel()?;
287                Ok(ClientEnd::new(chan.into()))
288            })
289            .transpose()
290            .map_err(|_: bredr::L2capParametersExtProxy| {
291                Error::profile("l2cap parameters proxy in use")
292            })?;
293        let ext_audio_offload = channel
294            .audio_offload_ext
295            .map(|proxy| {
296                let chan = proxy.into_channel()?;
297                Ok(ClientEnd::new(chan.into()))
298            })
299            .transpose()
300            .map_err(|_: bredr::AudioOffloadExtProxy| {
301                Error::profile("audio offload proxy in use")
302            })?;
303        let flush_timeout = channel.flush_timeout.lock().map(zx::MonotonicDuration::into_nanos);
304        Ok(bredr::Channel {
305            socket: Some(socket),
306            channel_mode: Some(channel.mode.into()),
307            max_tx_sdu_size: Some(channel.max_tx_size as u16),
308            ext_direction,
309            flush_timeout,
310            ext_l2cap,
311            ext_audio_offload,
312            ..Default::default()
313        })
314    }
315}
316
317impl Stream for Channel {
318    type Item = Result<Vec<u8>, zx::Status>;
319
320    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
321        if self.terminated {
322            panic!("Channel polled after terminated");
323        }
324
325        let mut res = Vec::<u8>::new();
326        loop {
327            break match self.socket.poll_datagram(cx, &mut res) {
328                // TODO(https://fxbug.dev/42072274): Sometimes sockets return spirious 0 byte packets when polled.
329                // Try again.
330                Poll::Ready(Ok(0)) => continue,
331                Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
332                Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
333                    self.terminated = true;
334                    Poll::Ready(None)
335                }
336                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
337                Poll::Pending => Poll::Pending,
338            };
339        }
340    }
341}
342
343impl FusedStream for Channel {
344    fn is_terminated(&self) -> bool {
345        self.terminated
346    }
347}
348
349// TODO(b/4144101870): remove once starnix side is migrated to use Stream instead of AsyncRead.
350impl io::AsyncRead for Channel {
351    fn poll_read(
352        mut self: Pin<&mut Self>,
353        cx: &mut Context<'_>,
354        buf: &mut [u8],
355    ) -> Poll<Result<usize, futures::io::Error>> {
356        Pin::new(&mut self.socket).as_mut().poll_read(cx, buf)
357    }
358}
359
360// TODO(b/4144101870): remove once starnix side is migrated to use Sink instead of AsyncWrite.
361impl io::AsyncWrite for Channel {
362    fn poll_write(
363        mut self: Pin<&mut Self>,
364        cx: &mut Context<'_>,
365        buf: &[u8],
366    ) -> Poll<Result<usize, io::Error>> {
367        Pin::new(&mut self.socket).as_mut().poll_write(cx, buf)
368    }
369
370    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
371        Pin::new(&mut self.socket).as_mut().poll_flush(cx)
372    }
373
374    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
375        Pin::new(&mut self.socket).as_mut().poll_close(cx)
376    }
377}
378
379impl Sink<Vec<u8>> for Channel {
380    type Error = zx::Status;
381
382    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
383        // Try to flush to make progress, but ignore pending results.
384        let _ = Sink::poll_flush(self.as_mut(), cx)?;
385
386        if self.send_buffer.len() >= Channel::MAX_QUEUED_PACKETS {
387            return Poll::Pending;
388        }
389        Poll::Ready(Ok(()))
390    }
391
392    fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
393        self.get_mut().send_buffer.push_back(item);
394        Ok(())
395    }
396
397    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
398        let this = self.get_mut();
399        use futures::io::AsyncWrite;
400        while let Some(item) = this.send_buffer.front() {
401            let res = Pin::new(&mut this.socket).poll_write(cx, item).map_err(zx::Status::from);
402            match res {
403                Poll::Ready(Ok(size)) => {
404                    if size == item.len() {
405                        let _ = this.send_buffer.pop_front();
406                    } else {
407                        error!(
408                            "Partial write in Channel::Sink::poll_flush: wrote {} bytes of {} byte packet.",
409                            size,
410                            item.len()
411                        );
412                        let item = this.send_buffer.front_mut().unwrap();
413                        *item = item.split_off(size);
414                    }
415                }
416                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
417                Poll::Pending => return Poll::Pending,
418            }
419        }
420        Pin::new(&mut this.socket).poll_flush(cx).map_err(zx::Status::from)
421    }
422
423    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
424        ready!(Sink::poll_flush(self.as_mut(), cx))?;
425        let this = self.get_mut();
426        use futures::io::AsyncWrite as _;
427        Pin::new(&mut this.socket).poll_close(cx).map_err(zx::Status::from)
428    }
429}
430
431#[cfg(test)]
432mod tests {
433    use super::*;
434    use fidl::endpoints::create_request_stream;
435    use futures::{AsyncReadExt, SinkExt, StreamExt};
436    use std::pin::pin;
437
438    #[test]
439    fn test_channel_create_and_write() {
440        let mut exec = fasync::TestExecutor::new();
441        let (mut recv, mut send) = Channel::create();
442
443        let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
444        let mut send_fut = send.send(heart.to_vec());
445        assert!(exec.run_until_stalled(&mut send_fut).is_ready());
446
447        let mut recv_fut = recv.next();
448        match exec.run_until_stalled(&mut recv_fut) {
449            Poll::Ready(Some(Ok(bytes))) => {
450                assert_eq!(heart, &bytes);
451            }
452            x => panic!("Expected Some(Ok(bytes)) from the stream, got {x:?}"),
453        };
454    }
455
456    #[test]
457    fn test_channel_from_fidl() {
458        let _exec = fasync::TestExecutor::new();
459        let empty = bredr::Channel::default();
460        assert!(Channel::try_from(empty).is_err());
461
462        let (remote, _local) = zx::Socket::create_datagram();
463
464        let okay = bredr::Channel {
465            socket: Some(remote),
466            channel_mode: Some(fidl_bt::ChannelMode::Basic),
467            max_tx_sdu_size: Some(1004),
468            ..Default::default()
469        };
470
471        let chan = Channel::try_from(okay).expect("okay channel to be converted");
472
473        assert_eq!(1004, chan.max_tx_size());
474        assert_eq!(&ChannelMode::Basic, chan.channel_mode());
475    }
476
477    #[test]
478    fn test_channel_closed() {
479        let mut exec = fasync::TestExecutor::new();
480
481        let (recv, send) = Channel::create();
482
483        let closed_fut = recv.closed();
484        let mut closed_fut = pin!(closed_fut);
485
486        assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
487        assert!(!recv.is_closed());
488
489        drop(send);
490
491        assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
492        assert!(recv.is_closed());
493    }
494
495    #[test]
496    fn test_direction_ext() {
497        let mut exec = fasync::TestExecutor::new();
498
499        let (remote, _local) = zx::Socket::create_datagram();
500        let no_ext = bredr::Channel {
501            socket: Some(remote),
502            channel_mode: Some(fidl_bt::ChannelMode::Basic),
503            max_tx_sdu_size: Some(1004),
504            ..Default::default()
505        };
506        let channel = Channel::try_from(no_ext).unwrap();
507
508        assert!(
509            exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Normal)).is_err()
510        );
511        assert!(exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Sink)).is_err());
512
513        let (remote, _local) = zx::Socket::create_datagram();
514        let (client_end, mut direction_request_stream) =
515            create_request_stream::<bredr::AudioDirectionExtMarker>();
516        let ext = bredr::Channel {
517            socket: Some(remote),
518            channel_mode: Some(fidl_bt::ChannelMode::Basic),
519            max_tx_sdu_size: Some(1004),
520            ext_direction: Some(client_end),
521            ..Default::default()
522        };
523
524        let channel = Channel::try_from(ext).unwrap();
525
526        let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Normal);
527        let mut audio_direction_fut = pin!(audio_direction_fut);
528
529        assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
530
531        match exec.run_until_stalled(&mut direction_request_stream.next()) {
532            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
533                priority,
534                responder,
535            }))) => {
536                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
537                responder.send(Ok(())).expect("response to send cleanly");
538            }
539            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
540        };
541
542        match exec.run_until_stalled(&mut audio_direction_fut) {
543            Poll::Ready(Ok(())) => {}
544            _x => panic!("Expected ok result from audio direction response"),
545        };
546
547        let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Sink);
548        let mut audio_direction_fut = pin!(audio_direction_fut);
549
550        assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
551
552        match exec.run_until_stalled(&mut direction_request_stream.next()) {
553            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
554                priority,
555                responder,
556            }))) => {
557                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
558                responder
559                    .send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed))
560                    .expect("response to send cleanly");
561            }
562            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
563        };
564
565        match exec.run_until_stalled(&mut audio_direction_fut) {
566            Poll::Ready(Err(_)) => {}
567            _x => panic!("Expected error result from audio direction response"),
568        };
569    }
570
571    #[test]
572    fn test_flush_timeout() {
573        let mut exec = fasync::TestExecutor::new();
574
575        let (remote, _local) = zx::Socket::create_datagram();
576        let no_ext = bredr::Channel {
577            socket: Some(remote),
578            channel_mode: Some(fidl_bt::ChannelMode::Basic),
579            max_tx_sdu_size: Some(1004),
580            flush_timeout: Some(50_000_000), // 50 milliseconds
581            ..Default::default()
582        };
583        let channel = Channel::try_from(no_ext).unwrap();
584
585        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
586
587        // Within 2 milliseconds, doesn't change.
588        let res = exec.run_singlethreaded(
589            channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(49))),
590        );
591        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
592        let res = exec.run_singlethreaded(
593            channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(51))),
594        );
595        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
596
597        assert!(
598            exec.run_singlethreaded(
599                channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(200)))
600            )
601            .is_err()
602        );
603        assert!(exec.run_singlethreaded(channel.set_flush_timeout(None)).is_err());
604
605        let (remote, _local) = zx::Socket::create_datagram();
606        let (client_end, mut l2cap_request_stream) =
607            create_request_stream::<bredr::L2capParametersExtMarker>();
608        let ext = bredr::Channel {
609            socket: Some(remote),
610            channel_mode: Some(fidl_bt::ChannelMode::Basic),
611            max_tx_sdu_size: Some(1004),
612            flush_timeout: None,
613            ext_l2cap: Some(client_end),
614            ..Default::default()
615        };
616
617        let channel = Channel::try_from(ext).unwrap();
618
619        {
620            let flush_timeout_fut = channel.set_flush_timeout(None);
621            let mut flush_timeout_fut = pin!(flush_timeout_fut);
622
623            // Requesting no change returns right away with no change.
624            match exec.run_until_stalled(&mut flush_timeout_fut) {
625                Poll::Ready(Ok(None)) => {}
626                x => panic!("Expected no flush timeout to not stall, got {:?}", x),
627            }
628        }
629
630        let req_duration = zx::MonotonicDuration::from_millis(42);
631
632        {
633            let flush_timeout_fut = channel.set_flush_timeout(Some(req_duration));
634            let mut flush_timeout_fut = pin!(flush_timeout_fut);
635
636            assert!(exec.run_until_stalled(&mut flush_timeout_fut).is_pending());
637
638            match exec.run_until_stalled(&mut l2cap_request_stream.next()) {
639                Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
640                    request,
641                    responder,
642                }))) => {
643                    assert_eq!(Some(req_duration.into_nanos()), request.flush_timeout);
644                    // Send a different response
645                    let params = fidl_bt::ChannelParameters {
646                        flush_timeout: Some(50_000_000), // 50ms
647                        ..Default::default()
648                    };
649                    responder.send(&params).expect("response to send cleanly");
650                }
651                x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
652            };
653
654            match exec.run_until_stalled(&mut flush_timeout_fut) {
655                Poll::Ready(Ok(Some(duration))) => {
656                    assert_eq!(zx::MonotonicDuration::from_millis(50), duration)
657                }
658                x => panic!("Expected ready result from params response, got {:?}", x),
659            };
660        }
661
662        // Channel should have recorded the new flush timeout.
663        assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
664    }
665
666    #[test]
667    fn test_audio_offload() {
668        let _exec = fasync::TestExecutor::new();
669
670        let (remote, _local) = zx::Socket::create_datagram();
671        let no_ext = bredr::Channel {
672            socket: Some(remote),
673            channel_mode: Some(fidl_bt::ChannelMode::Basic),
674            max_tx_sdu_size: Some(1004),
675            ..Default::default()
676        };
677        let channel = Channel::try_from(no_ext).unwrap();
678
679        assert!(channel.audio_offload().is_none());
680
681        let (remote, _local) = zx::Socket::create_datagram();
682        let (client_end, mut _audio_offload_ext_req_stream) =
683            create_request_stream::<bredr::AudioOffloadExtMarker>();
684        let ext = bredr::Channel {
685            socket: Some(remote),
686            channel_mode: Some(fidl_bt::ChannelMode::Basic),
687            max_tx_sdu_size: Some(1004),
688            ext_audio_offload: Some(client_end),
689            ..Default::default()
690        };
691
692        let channel = Channel::try_from(ext).unwrap();
693
694        let offload_ext = channel.audio_offload();
695        assert!(offload_ext.is_some());
696        // We can get the audio offload multiple times without dropping
697        assert!(channel.audio_offload().is_some());
698        // And with dropping
699        drop(offload_ext);
700        assert!(channel.audio_offload().is_some());
701    }
702
703    // TODO(b/4144101870): remove once starnix side is migrated to use Stream instead of AsyncRead.
704    #[test]
705    fn channel_async_read() {
706        let mut exec = fasync::TestExecutor::new();
707        let (mut recv, send) = Channel::create();
708
709        // Test `read` with a datagram smaller than the read buffer.
710        let max_tx_size = recv.max_tx_size();
711        let mut read_buf = vec![0; max_tx_size];
712        let mut read_fut = recv.read(&mut read_buf[..]);
713
714        assert!(exec.run_until_stalled(&mut read_fut).is_pending());
715
716        let data = &[0x01, 0x02, 0x03, 0x04];
717        assert_eq!(data.len(), send.write(data).expect("should write successfully"));
718
719        // The read should complete, with the length of the datagram.
720        let read_len = match exec.run_until_stalled(&mut read_fut) {
721            Poll::Ready(Ok(read_len)) => read_len,
722            x => panic!("Expected successful read, got {x:?}"),
723        };
724        assert_eq!(read_len, data.len());
725        assert_eq!(&data[..], &read_buf[..data.len()]);
726
727        // Test `read` with a datagram that is larger than the read buffer.
728        let mut read_buf = [0; 4]; // buffer too small
729        let mut read_fut = recv.read(&mut read_buf);
730
731        let oversized_data = &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
732        assert_eq!(
733            oversized_data.len(),
734            send.write(oversized_data).expect("should write successfully")
735        );
736
737        // The read should complete, filling the buffer.
738        let read_len = match exec.run_until_stalled(&mut read_fut) {
739            Poll::Ready(Ok(read_len)) => read_len,
740            x => panic!("Expected successful read, got {x:?}"),
741        };
742        assert_eq!(read_len, read_buf.len());
743        assert_eq!(&oversized_data[..read_buf.len()], &read_buf[..]);
744
745        // The rest of the datagram should be discarded. A subsequent read should be pending.
746        let mut leftover_buf = [0; 1];
747        let mut leftover_fut = recv.read(&mut leftover_buf);
748        assert!(exec.run_until_stalled(&mut leftover_fut).is_pending());
749    }
750
751    #[test]
752    fn channel_sink() {
753        let mut exec = fasync::TestExecutor::new();
754        let (mut recv, mut send) = Channel::create();
755
756        let data = vec![0x01, 0x02, 0x03, 0x04];
757        let mut send_fut = send.send(data.clone());
758
759        // The send should complete immediately as the socket has space.
760        match exec.run_until_stalled(&mut send_fut) {
761            Poll::Ready(Ok(())) => {}
762            x => panic!("Expected Ready(Ok(())), got {:?}", x),
763        }
764
765        let mut recv_fut = recv.next();
766        match exec.run_until_stalled(&mut recv_fut) {
767            Poll::Ready(Some(Ok(bytes))) => assert_eq!(data, bytes),
768            x => panic!("Expected successful read, got {x:?}"),
769        }
770    }
771
772    #[test]
773    fn channel_stream() {
774        let mut exec = fasync::TestExecutor::new();
775        let (mut recv, send) = Channel::create();
776
777        let mut stream_fut = recv.next();
778
779        assert!(exec.run_until_stalled(&mut stream_fut).is_pending());
780
781        let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
782        assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));
783
784        match exec.run_until_stalled(&mut stream_fut) {
785            Poll::Ready(Some(Ok(bytes))) => {
786                assert_eq!(heart.to_vec(), bytes);
787            }
788            x => panic!("Expected Some(Ok(bytes)) from the stream, got {x:?}"),
789        };
790
791        // After the sender is dropped, the stream should terminate.
792        drop(send);
793
794        let mut stream_fut = recv.next();
795        match exec.run_until_stalled(&mut stream_fut) {
796            Poll::Ready(None) => {}
797            x => panic!("Expected None from the stream after close, got {x:?}"),
798        }
799
800        // It should continue to report terminated.
801        assert!(recv.is_terminated());
802    }
803}