1use fidl::endpoints::{ClientEnd, Proxy};
6use fidl_fuchsia_bluetooth as fidl_bt;
7use fidl_fuchsia_bluetooth_bredr as bredr;
8use fuchsia_async as fasync;
9use fuchsia_sync::Mutex;
10use futures::sink::Sink;
11use futures::stream::{FusedStream, Stream};
12use futures::{Future, TryFutureExt, io, ready};
13use log::{error, warn};
14use std::collections::VecDeque;
15use std::fmt;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20use crate::error::Error;
21
22#[derive(PartialEq, Debug, Clone)]
24pub enum ChannelMode {
25 Basic,
26 EnhancedRetransmissionMode,
27 LeCreditBasedFlowControl,
28 EnhancedCreditBasedFlowControl,
29}
30
31impl fmt::Display for ChannelMode {
32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33 match self {
34 ChannelMode::Basic => write!(f, "Basic"),
35 ChannelMode::EnhancedRetransmissionMode => write!(f, "ERTM"),
36 ChannelMode::LeCreditBasedFlowControl => write!(f, "LE_Credit"),
37 ChannelMode::EnhancedCreditBasedFlowControl => write!(f, "Credit"),
38 }
39 }
40}
41
42pub enum A2dpDirection {
43 Normal,
44 Source,
45 Sink,
46}
47
48impl From<A2dpDirection> for bredr::A2dpDirectionPriority {
49 fn from(pri: A2dpDirection) -> Self {
50 match pri {
51 A2dpDirection::Normal => bredr::A2dpDirectionPriority::Normal,
52 A2dpDirection::Source => bredr::A2dpDirectionPriority::Source,
53 A2dpDirection::Sink => bredr::A2dpDirectionPriority::Sink,
54 }
55 }
56}
57
58impl TryFrom<fidl_bt::ChannelMode> for ChannelMode {
59 type Error = Error;
60 fn try_from(fidl: fidl_bt::ChannelMode) -> Result<Self, Error> {
61 match fidl {
62 fidl_bt::ChannelMode::Basic => Ok(ChannelMode::Basic),
63 fidl_bt::ChannelMode::EnhancedRetransmission => {
64 Ok(ChannelMode::EnhancedRetransmissionMode)
65 }
66 fidl_bt::ChannelMode::LeCreditBasedFlowControl => {
67 Ok(ChannelMode::LeCreditBasedFlowControl)
68 }
69 fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl => {
70 Ok(ChannelMode::EnhancedCreditBasedFlowControl)
71 }
72 x => Err(Error::FailedConversion(format!("Unsupported channel mode type: {x:?}"))),
73 }
74 }
75}
76
77impl From<ChannelMode> for fidl_bt::ChannelMode {
78 fn from(x: ChannelMode) -> Self {
79 match x {
80 ChannelMode::Basic => fidl_bt::ChannelMode::Basic,
81 ChannelMode::EnhancedRetransmissionMode => fidl_bt::ChannelMode::EnhancedRetransmission,
82 ChannelMode::LeCreditBasedFlowControl => fidl_bt::ChannelMode::LeCreditBasedFlowControl,
83 ChannelMode::EnhancedCreditBasedFlowControl => {
84 fidl_bt::ChannelMode::EnhancedCreditBasedFlowControl
85 }
86 }
87 }
88}
89
90#[derive(Debug)]
95pub struct Channel {
96 socket: fasync::Socket,
97 mode: ChannelMode,
98 max_tx_size: usize,
99 flush_timeout: Arc<Mutex<Option<zx::MonotonicDuration>>>,
100 audio_direction_ext: Option<bredr::AudioDirectionExtProxy>,
101 l2cap_parameters_ext: Option<bredr::L2capParametersExtProxy>,
102 audio_offload_ext: Option<bredr::AudioOffloadExtProxy>,
103 terminated: bool,
104 send_buffer: VecDeque<Vec<u8>>,
105}
106
107impl Channel {
108 const MAX_QUEUED_PACKETS: usize = 32;
109 pub fn from_socket(socket: zx::Socket, max_tx_size: usize) -> Result<Self, zx::Status> {
112 Ok(Self::from_socket_infallible(socket, max_tx_size))
113 }
114
115 pub fn from_socket_infallible(socket: zx::Socket, max_tx_size: usize) -> Self {
117 Channel {
118 socket: fasync::Socket::from_socket(socket),
119 mode: ChannelMode::Basic,
120 max_tx_size,
121 flush_timeout: Arc::new(Mutex::new(None)),
122 audio_direction_ext: None,
123 l2cap_parameters_ext: None,
124 audio_offload_ext: None,
125 terminated: false,
126 send_buffer: VecDeque::with_capacity(Self::MAX_QUEUED_PACKETS),
127 }
128 }
129
130 pub const DEFAULT_MAX_TX: usize = 672;
133
134 pub fn create() -> (Self, Self) {
137 Self::create_with_max_tx(Self::DEFAULT_MAX_TX)
138 }
139
140 pub fn create_with_max_tx(max_tx_size: usize) -> (Self, Self) {
143 let (remote, local) = zx::Socket::create_datagram();
144 (
145 Channel::from_socket(remote, max_tx_size).unwrap(),
146 Channel::from_socket(local, max_tx_size).unwrap(),
147 )
148 }
149
150 pub fn max_tx_size(&self) -> usize {
153 self.max_tx_size
154 }
155
156 pub fn channel_mode(&self) -> &ChannelMode {
157 &self.mode
158 }
159
160 pub fn flush_timeout(&self) -> Option<zx::MonotonicDuration> {
161 self.flush_timeout.lock().clone()
162 }
163
164 pub fn set_audio_priority(
167 &self,
168 dir: A2dpDirection,
169 ) -> impl Future<Output = Result<(), Error>> + use<> {
170 let proxy = self.audio_direction_ext.clone();
171 async move {
172 match proxy {
173 None => return Err(Error::profile("audio priority not supported")),
174 Some(proxy) => proxy
175 .set_priority(dir.into())
176 .await?
177 .map_err(|e| Error::profile(format!("setting priority failed: {e:?}"))),
178 }
179 }
180 }
181
182 pub fn set_flush_timeout(
189 &self,
190 duration: Option<zx::MonotonicDuration>,
191 ) -> impl Future<Output = Result<Option<zx::MonotonicDuration>, Error>> + use<> {
192 let flush_timeout = self.flush_timeout.clone();
193 let current = self.flush_timeout.lock().clone();
194 let proxy = self.l2cap_parameters_ext.clone();
195 async move {
196 match (current, duration) {
197 (None, None) => return Ok(None),
198 (Some(old), Some(new)) if (old - new).into_millis().abs() < 2 => {
199 return Ok(current);
200 }
201 _ => {}
202 };
203 let proxy =
204 proxy.ok_or_else(|| Error::profile("l2cap parameter changing not supported"))?;
205 let parameters = fidl_bt::ChannelParameters {
206 flush_timeout: duration.clone().map(zx::MonotonicDuration::into_nanos),
207 ..Default::default()
208 };
209 let new_params = proxy.request_parameters(¶meters).await?;
210 let new_timeout = new_params.flush_timeout.map(zx::MonotonicDuration::from_nanos);
211 *(flush_timeout.lock()) = new_timeout.clone();
212 Ok(new_timeout)
213 }
214 }
215
216 pub fn audio_offload(&self) -> Option<bredr::AudioOffloadExtProxy> {
218 self.audio_offload_ext.clone()
219 }
220
221 pub fn closed<'a>(&'a self) -> impl Future<Output = Result<(), zx::Status>> + 'a {
222 let close_signals = zx::Signals::SOCKET_PEER_CLOSED;
223 let close_wait = fasync::OnSignals::new(&self.socket, close_signals);
224 close_wait.map_ok(|_o| ())
225 }
226
227 pub fn is_closed<'a>(&'a self) -> bool {
228 self.socket.is_closed()
229 }
230
231 pub fn write(&self, bytes: &[u8]) -> Result<usize, zx::Status> {
236 self.socket.as_ref().write(bytes)
237 }
238}
239
240impl TryFrom<fidl_fuchsia_bluetooth_bredr::Channel> for Channel {
241 type Error = zx::Status;
242
243 fn try_from(fidl: bredr::Channel) -> Result<Self, Self::Error> {
244 let channel = match fidl.channel_mode.unwrap_or(fidl_bt::ChannelMode::Basic).try_into() {
245 Err(e) => {
246 warn!("Unsupported channel mode type: {e:?}");
247 return Err(zx::Status::INTERNAL);
248 }
249 Ok(c) => c,
250 };
251
252 Ok(Self {
253 socket: fasync::Socket::from_socket(fidl.socket.ok_or(zx::Status::INVALID_ARGS)?),
254 mode: channel,
255 max_tx_size: fidl.max_tx_sdu_size.ok_or(zx::Status::INVALID_ARGS)? as usize,
256 flush_timeout: Arc::new(Mutex::new(
257 fidl.flush_timeout.map(zx::MonotonicDuration::from_nanos),
258 )),
259 audio_direction_ext: fidl.ext_direction.map(|e| e.into_proxy()),
260 l2cap_parameters_ext: fidl.ext_l2cap.map(|e| e.into_proxy()),
261 audio_offload_ext: fidl.ext_audio_offload.map(|c| c.into_proxy()),
262 terminated: false,
263 send_buffer: VecDeque::with_capacity(Self::MAX_QUEUED_PACKETS),
264 })
265 }
266}
267
268impl TryFrom<Channel> for bredr::Channel {
269 type Error = Error;
270
271 fn try_from(channel: Channel) -> Result<Self, Self::Error> {
272 let socket = channel.socket.into_zx_socket();
273 let ext_direction = channel
274 .audio_direction_ext
275 .map(|proxy| {
276 let chan = proxy.into_channel()?;
277 Ok(ClientEnd::new(chan.into()))
278 })
279 .transpose()
280 .map_err(|_: bredr::AudioDirectionExtProxy| {
281 Error::profile("AudioDirection proxy in use")
282 })?;
283 let ext_l2cap = channel
284 .l2cap_parameters_ext
285 .map(|proxy| {
286 let chan = proxy.into_channel()?;
287 Ok(ClientEnd::new(chan.into()))
288 })
289 .transpose()
290 .map_err(|_: bredr::L2capParametersExtProxy| {
291 Error::profile("l2cap parameters proxy in use")
292 })?;
293 let ext_audio_offload = channel
294 .audio_offload_ext
295 .map(|proxy| {
296 let chan = proxy.into_channel()?;
297 Ok(ClientEnd::new(chan.into()))
298 })
299 .transpose()
300 .map_err(|_: bredr::AudioOffloadExtProxy| {
301 Error::profile("audio offload proxy in use")
302 })?;
303 let flush_timeout = channel.flush_timeout.lock().map(zx::MonotonicDuration::into_nanos);
304 Ok(bredr::Channel {
305 socket: Some(socket),
306 channel_mode: Some(channel.mode.into()),
307 max_tx_sdu_size: Some(channel.max_tx_size as u16),
308 ext_direction,
309 flush_timeout,
310 ext_l2cap,
311 ext_audio_offload,
312 ..Default::default()
313 })
314 }
315}
316
317impl Stream for Channel {
318 type Item = Result<Vec<u8>, zx::Status>;
319
320 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
321 if self.terminated {
322 panic!("Channel polled after terminated");
323 }
324
325 let mut res = Vec::<u8>::new();
326 loop {
327 break match self.socket.poll_datagram(cx, &mut res) {
328 Poll::Ready(Ok(0)) => continue,
331 Poll::Ready(Ok(_size)) => Poll::Ready(Some(Ok(res))),
332 Poll::Ready(Err(zx::Status::PEER_CLOSED)) => {
333 self.terminated = true;
334 Poll::Ready(None)
335 }
336 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
337 Poll::Pending => Poll::Pending,
338 };
339 }
340 }
341}
342
343impl FusedStream for Channel {
344 fn is_terminated(&self) -> bool {
345 self.terminated
346 }
347}
348
349impl io::AsyncRead for Channel {
351 fn poll_read(
352 mut self: Pin<&mut Self>,
353 cx: &mut Context<'_>,
354 buf: &mut [u8],
355 ) -> Poll<Result<usize, futures::io::Error>> {
356 Pin::new(&mut self.socket).as_mut().poll_read(cx, buf)
357 }
358}
359
360impl io::AsyncWrite for Channel {
362 fn poll_write(
363 mut self: Pin<&mut Self>,
364 cx: &mut Context<'_>,
365 buf: &[u8],
366 ) -> Poll<Result<usize, io::Error>> {
367 Pin::new(&mut self.socket).as_mut().poll_write(cx, buf)
368 }
369
370 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
371 Pin::new(&mut self.socket).as_mut().poll_flush(cx)
372 }
373
374 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
375 Pin::new(&mut self.socket).as_mut().poll_close(cx)
376 }
377}
378
379impl Sink<Vec<u8>> for Channel {
380 type Error = zx::Status;
381
382 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
383 let _ = Sink::poll_flush(self.as_mut(), cx)?;
385
386 if self.send_buffer.len() >= Channel::MAX_QUEUED_PACKETS {
387 return Poll::Pending;
388 }
389 Poll::Ready(Ok(()))
390 }
391
392 fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
393 self.get_mut().send_buffer.push_back(item);
394 Ok(())
395 }
396
397 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
398 let this = self.get_mut();
399 use futures::io::AsyncWrite;
400 while let Some(item) = this.send_buffer.front() {
401 let res = Pin::new(&mut this.socket).poll_write(cx, item).map_err(zx::Status::from);
402 match res {
403 Poll::Ready(Ok(size)) => {
404 if size == item.len() {
405 let _ = this.send_buffer.pop_front();
406 } else {
407 error!(
408 "Partial write in Channel::Sink::poll_flush: wrote {} bytes of {} byte packet.",
409 size,
410 item.len()
411 );
412 let item = this.send_buffer.front_mut().unwrap();
413 *item = item.split_off(size);
414 }
415 }
416 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
417 Poll::Pending => return Poll::Pending,
418 }
419 }
420 Pin::new(&mut this.socket).poll_flush(cx).map_err(zx::Status::from)
421 }
422
423 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
424 ready!(Sink::poll_flush(self.as_mut(), cx))?;
425 let this = self.get_mut();
426 use futures::io::AsyncWrite as _;
427 Pin::new(&mut this.socket).poll_close(cx).map_err(zx::Status::from)
428 }
429}
430
431#[cfg(test)]
432mod tests {
433 use super::*;
434 use fidl::endpoints::create_request_stream;
435 use futures::{AsyncReadExt, SinkExt, StreamExt};
436 use std::pin::pin;
437
438 #[test]
439 fn test_channel_create_and_write() {
440 let mut exec = fasync::TestExecutor::new();
441 let (mut recv, mut send) = Channel::create();
442
443 let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
444 let mut send_fut = send.send(heart.to_vec());
445 assert!(exec.run_until_stalled(&mut send_fut).is_ready());
446
447 let mut recv_fut = recv.next();
448 match exec.run_until_stalled(&mut recv_fut) {
449 Poll::Ready(Some(Ok(bytes))) => {
450 assert_eq!(heart, &bytes);
451 }
452 x => panic!("Expected Some(Ok(bytes)) from the stream, got {x:?}"),
453 };
454 }
455
456 #[test]
457 fn test_channel_from_fidl() {
458 let _exec = fasync::TestExecutor::new();
459 let empty = bredr::Channel::default();
460 assert!(Channel::try_from(empty).is_err());
461
462 let (remote, _local) = zx::Socket::create_datagram();
463
464 let okay = bredr::Channel {
465 socket: Some(remote),
466 channel_mode: Some(fidl_bt::ChannelMode::Basic),
467 max_tx_sdu_size: Some(1004),
468 ..Default::default()
469 };
470
471 let chan = Channel::try_from(okay).expect("okay channel to be converted");
472
473 assert_eq!(1004, chan.max_tx_size());
474 assert_eq!(&ChannelMode::Basic, chan.channel_mode());
475 }
476
477 #[test]
478 fn test_channel_closed() {
479 let mut exec = fasync::TestExecutor::new();
480
481 let (recv, send) = Channel::create();
482
483 let closed_fut = recv.closed();
484 let mut closed_fut = pin!(closed_fut);
485
486 assert!(exec.run_until_stalled(&mut closed_fut).is_pending());
487 assert!(!recv.is_closed());
488
489 drop(send);
490
491 assert!(exec.run_until_stalled(&mut closed_fut).is_ready());
492 assert!(recv.is_closed());
493 }
494
495 #[test]
496 fn test_direction_ext() {
497 let mut exec = fasync::TestExecutor::new();
498
499 let (remote, _local) = zx::Socket::create_datagram();
500 let no_ext = bredr::Channel {
501 socket: Some(remote),
502 channel_mode: Some(fidl_bt::ChannelMode::Basic),
503 max_tx_sdu_size: Some(1004),
504 ..Default::default()
505 };
506 let channel = Channel::try_from(no_ext).unwrap();
507
508 assert!(
509 exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Normal)).is_err()
510 );
511 assert!(exec.run_singlethreaded(channel.set_audio_priority(A2dpDirection::Sink)).is_err());
512
513 let (remote, _local) = zx::Socket::create_datagram();
514 let (client_end, mut direction_request_stream) =
515 create_request_stream::<bredr::AudioDirectionExtMarker>();
516 let ext = bredr::Channel {
517 socket: Some(remote),
518 channel_mode: Some(fidl_bt::ChannelMode::Basic),
519 max_tx_sdu_size: Some(1004),
520 ext_direction: Some(client_end),
521 ..Default::default()
522 };
523
524 let channel = Channel::try_from(ext).unwrap();
525
526 let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Normal);
527 let mut audio_direction_fut = pin!(audio_direction_fut);
528
529 assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
530
531 match exec.run_until_stalled(&mut direction_request_stream.next()) {
532 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
533 priority,
534 responder,
535 }))) => {
536 assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
537 responder.send(Ok(())).expect("response to send cleanly");
538 }
539 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
540 };
541
542 match exec.run_until_stalled(&mut audio_direction_fut) {
543 Poll::Ready(Ok(())) => {}
544 _x => panic!("Expected ok result from audio direction response"),
545 };
546
547 let audio_direction_fut = channel.set_audio_priority(A2dpDirection::Sink);
548 let mut audio_direction_fut = pin!(audio_direction_fut);
549
550 assert!(exec.run_until_stalled(&mut audio_direction_fut).is_pending());
551
552 match exec.run_until_stalled(&mut direction_request_stream.next()) {
553 Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
554 priority,
555 responder,
556 }))) => {
557 assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
558 responder
559 .send(Err(fidl_fuchsia_bluetooth::ErrorCode::Failed))
560 .expect("response to send cleanly");
561 }
562 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
563 };
564
565 match exec.run_until_stalled(&mut audio_direction_fut) {
566 Poll::Ready(Err(_)) => {}
567 _x => panic!("Expected error result from audio direction response"),
568 };
569 }
570
571 #[test]
572 fn test_flush_timeout() {
573 let mut exec = fasync::TestExecutor::new();
574
575 let (remote, _local) = zx::Socket::create_datagram();
576 let no_ext = bredr::Channel {
577 socket: Some(remote),
578 channel_mode: Some(fidl_bt::ChannelMode::Basic),
579 max_tx_sdu_size: Some(1004),
580 flush_timeout: Some(50_000_000), ..Default::default()
582 };
583 let channel = Channel::try_from(no_ext).unwrap();
584
585 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
586
587 let res = exec.run_singlethreaded(
589 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(49))),
590 );
591 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
592 let res = exec.run_singlethreaded(
593 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(51))),
594 );
595 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), res.expect("shouldn't error"));
596
597 assert!(
598 exec.run_singlethreaded(
599 channel.set_flush_timeout(Some(zx::MonotonicDuration::from_millis(200)))
600 )
601 .is_err()
602 );
603 assert!(exec.run_singlethreaded(channel.set_flush_timeout(None)).is_err());
604
605 let (remote, _local) = zx::Socket::create_datagram();
606 let (client_end, mut l2cap_request_stream) =
607 create_request_stream::<bredr::L2capParametersExtMarker>();
608 let ext = bredr::Channel {
609 socket: Some(remote),
610 channel_mode: Some(fidl_bt::ChannelMode::Basic),
611 max_tx_sdu_size: Some(1004),
612 flush_timeout: None,
613 ext_l2cap: Some(client_end),
614 ..Default::default()
615 };
616
617 let channel = Channel::try_from(ext).unwrap();
618
619 {
620 let flush_timeout_fut = channel.set_flush_timeout(None);
621 let mut flush_timeout_fut = pin!(flush_timeout_fut);
622
623 match exec.run_until_stalled(&mut flush_timeout_fut) {
625 Poll::Ready(Ok(None)) => {}
626 x => panic!("Expected no flush timeout to not stall, got {:?}", x),
627 }
628 }
629
630 let req_duration = zx::MonotonicDuration::from_millis(42);
631
632 {
633 let flush_timeout_fut = channel.set_flush_timeout(Some(req_duration));
634 let mut flush_timeout_fut = pin!(flush_timeout_fut);
635
636 assert!(exec.run_until_stalled(&mut flush_timeout_fut).is_pending());
637
638 match exec.run_until_stalled(&mut l2cap_request_stream.next()) {
639 Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
640 request,
641 responder,
642 }))) => {
643 assert_eq!(Some(req_duration.into_nanos()), request.flush_timeout);
644 let params = fidl_bt::ChannelParameters {
646 flush_timeout: Some(50_000_000), ..Default::default()
648 };
649 responder.send(¶ms).expect("response to send cleanly");
650 }
651 x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
652 };
653
654 match exec.run_until_stalled(&mut flush_timeout_fut) {
655 Poll::Ready(Ok(Some(duration))) => {
656 assert_eq!(zx::MonotonicDuration::from_millis(50), duration)
657 }
658 x => panic!("Expected ready result from params response, got {:?}", x),
659 };
660 }
661
662 assert_eq!(Some(zx::MonotonicDuration::from_millis(50)), channel.flush_timeout());
664 }
665
666 #[test]
667 fn test_audio_offload() {
668 let _exec = fasync::TestExecutor::new();
669
670 let (remote, _local) = zx::Socket::create_datagram();
671 let no_ext = bredr::Channel {
672 socket: Some(remote),
673 channel_mode: Some(fidl_bt::ChannelMode::Basic),
674 max_tx_sdu_size: Some(1004),
675 ..Default::default()
676 };
677 let channel = Channel::try_from(no_ext).unwrap();
678
679 assert!(channel.audio_offload().is_none());
680
681 let (remote, _local) = zx::Socket::create_datagram();
682 let (client_end, mut _audio_offload_ext_req_stream) =
683 create_request_stream::<bredr::AudioOffloadExtMarker>();
684 let ext = bredr::Channel {
685 socket: Some(remote),
686 channel_mode: Some(fidl_bt::ChannelMode::Basic),
687 max_tx_sdu_size: Some(1004),
688 ext_audio_offload: Some(client_end),
689 ..Default::default()
690 };
691
692 let channel = Channel::try_from(ext).unwrap();
693
694 let offload_ext = channel.audio_offload();
695 assert!(offload_ext.is_some());
696 assert!(channel.audio_offload().is_some());
698 drop(offload_ext);
700 assert!(channel.audio_offload().is_some());
701 }
702
703 #[test]
705 fn channel_async_read() {
706 let mut exec = fasync::TestExecutor::new();
707 let (mut recv, send) = Channel::create();
708
709 let max_tx_size = recv.max_tx_size();
711 let mut read_buf = vec![0; max_tx_size];
712 let mut read_fut = recv.read(&mut read_buf[..]);
713
714 assert!(exec.run_until_stalled(&mut read_fut).is_pending());
715
716 let data = &[0x01, 0x02, 0x03, 0x04];
717 assert_eq!(data.len(), send.write(data).expect("should write successfully"));
718
719 let read_len = match exec.run_until_stalled(&mut read_fut) {
721 Poll::Ready(Ok(read_len)) => read_len,
722 x => panic!("Expected successful read, got {x:?}"),
723 };
724 assert_eq!(read_len, data.len());
725 assert_eq!(&data[..], &read_buf[..data.len()]);
726
727 let mut read_buf = [0; 4]; let mut read_fut = recv.read(&mut read_buf);
730
731 let oversized_data = &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
732 assert_eq!(
733 oversized_data.len(),
734 send.write(oversized_data).expect("should write successfully")
735 );
736
737 let read_len = match exec.run_until_stalled(&mut read_fut) {
739 Poll::Ready(Ok(read_len)) => read_len,
740 x => panic!("Expected successful read, got {x:?}"),
741 };
742 assert_eq!(read_len, read_buf.len());
743 assert_eq!(&oversized_data[..read_buf.len()], &read_buf[..]);
744
745 let mut leftover_buf = [0; 1];
747 let mut leftover_fut = recv.read(&mut leftover_buf);
748 assert!(exec.run_until_stalled(&mut leftover_fut).is_pending());
749 }
750
751 #[test]
752 fn channel_sink() {
753 let mut exec = fasync::TestExecutor::new();
754 let (mut recv, mut send) = Channel::create();
755
756 let data = vec![0x01, 0x02, 0x03, 0x04];
757 let mut send_fut = send.send(data.clone());
758
759 match exec.run_until_stalled(&mut send_fut) {
761 Poll::Ready(Ok(())) => {}
762 x => panic!("Expected Ready(Ok(())), got {:?}", x),
763 }
764
765 let mut recv_fut = recv.next();
766 match exec.run_until_stalled(&mut recv_fut) {
767 Poll::Ready(Some(Ok(bytes))) => assert_eq!(data, bytes),
768 x => panic!("Expected successful read, got {x:?}"),
769 }
770 }
771
772 #[test]
773 fn channel_stream() {
774 let mut exec = fasync::TestExecutor::new();
775 let (mut recv, send) = Channel::create();
776
777 let mut stream_fut = recv.next();
778
779 assert!(exec.run_until_stalled(&mut stream_fut).is_pending());
780
781 let heart: &[u8] = &[0xF0, 0x9F, 0x92, 0x96];
782 assert_eq!(heart.len(), send.write(heart).expect("should write successfully"));
783
784 match exec.run_until_stalled(&mut stream_fut) {
785 Poll::Ready(Some(Ok(bytes))) => {
786 assert_eq!(heart.to_vec(), bytes);
787 }
788 x => panic!("Expected Some(Ok(bytes)) from the stream, got {x:?}"),
789 };
790
791 drop(send);
793
794 let mut stream_fut = recv.next();
795 match exec.run_until_stalled(&mut stream_fut) {
796 Poll::Ready(None) => {}
797 x => panic!("Expected None from the stream after close, got {x:?}"),
798 }
799
800 assert!(recv.is_terminated());
802 }
803}