bt_avdtp/
stream_endpoint.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_async::{DurationExt, Task, TimeoutExt};
6use fuchsia_bluetooth::types::{A2dpDirection, Channel};
7use fuchsia_sync::Mutex;
8use futures::stream::{FusedStream, Stream};
9use futures::{io, FutureExt};
10use log::warn;
11use std::fmt;
12use std::pin::Pin;
13use std::sync::{Arc, RwLock, Weak};
14use std::task::{Context, Poll};
15use zx::{MonotonicDuration, Status};
16
17use crate::types::{
18    EndpointType, Error, ErrorCode, MediaCodecType, MediaType, Result as AvdtpResult,
19    ServiceCapability, ServiceCategory, StreamEndpointId, StreamInformation,
20};
21use crate::{Peer, SimpleResponder};
22
23pub type StreamEndpointUpdateCallback = Box<dyn Fn(&StreamEndpoint) -> () + Sync + Send>;
24
25/// The state of a StreamEndpoint.
26#[derive(PartialEq, Debug, Default, Clone, Copy)]
27pub enum StreamState {
28    #[default]
29    Idle,
30    Configured,
31    // An Open command has been accepted, but streams have not been established yet.
32    Opening,
33    Open,
34    Streaming,
35    Closing,
36    Aborting,
37}
38
39/// An AVDTP StreamEndpoint. StreamEndpoints represent a particular capability of the application
40/// to be a source of sink of media. Included here to aid negotiating the stream connection.
41/// See Section 5.3 of the AVDTP 1.3 Specification for more information about the Stream Endpoint
42/// Architecture.
43pub struct StreamEndpoint {
44    /// Local stream endpoint id.  This should be unique per AVDTP Peer.
45    id: StreamEndpointId,
46    /// The type of endpoint this is (TSEP), Source or Sink.
47    endpoint_type: EndpointType,
48    /// The media type this stream represents.
49    media_type: MediaType,
50    /// Current state the stream is in. See Section 6.5 for an overview.
51    state: Arc<Mutex<StreamState>>,
52    /// The media transport channel
53    /// This should be Some(channel) when state is Open or Streaming.
54    transport: Option<Arc<RwLock<Channel>>>,
55    /// True when the MediaStream is held.
56    /// Prevents multiple threads from owning the media stream.
57    stream_held: Arc<Mutex<bool>>,
58    /// The capabilities of this endpoint.
59    capabilities: Vec<ServiceCapability>,
60    /// The remote stream endpoint id.  None if the stream has never been configured.
61    remote_id: Option<StreamEndpointId>,
62    /// The current configuration of this endpoint.  Empty if the stream has never been configured.
63    configuration: Vec<ServiceCapability>,
64    /// Callback that is run whenever the endpoint is updated
65    update_callback: Option<StreamEndpointUpdateCallback>,
66    /// In-progress task. This is only used for the Release procedure which places the state in Closing
67    /// and must wait for the peer to close transport channels.
68    in_progress: Option<Task<()>>,
69}
70
71impl fmt::Debug for StreamEndpoint {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        f.debug_struct("StreamEndpoint")
74            .field("id", &self.id.0)
75            .field("endpoint_type", &self.endpoint_type)
76            .field("media_type", &self.media_type)
77            .field("state", &self.state)
78            .field("capabilities", &self.capabilities)
79            .field("remote_id", &self.remote_id.as_ref().map(|id| id.to_string()))
80            .field("configuration", &self.configuration)
81            .finish()
82    }
83}
84
85impl StreamEndpoint {
86    /// Make a new StreamEndpoint.
87    /// |id| must be in the valid range for a StreamEndpointId (0x01 - 0x3E).
88    /// StreamEndpoints start in the Idle state.
89    pub fn new(
90        id: u8,
91        media_type: MediaType,
92        endpoint_type: EndpointType,
93        capabilities: Vec<ServiceCapability>,
94    ) -> AvdtpResult<StreamEndpoint> {
95        let seid = StreamEndpointId::try_from(id)?;
96        Ok(StreamEndpoint {
97            id: seid,
98            capabilities,
99            media_type,
100            endpoint_type,
101            state: Default::default(),
102            transport: None,
103            stream_held: Arc::new(Mutex::new(false)),
104            remote_id: None,
105            configuration: vec![],
106            update_callback: None,
107            in_progress: None,
108        })
109    }
110
111    pub fn as_new(&self) -> Self {
112        StreamEndpoint::new(
113            self.id.0,
114            self.media_type.clone(),
115            self.endpoint_type.clone(),
116            self.capabilities.clone(),
117        )
118        .expect("as_new")
119    }
120
121    /// Set the state to the given value and run the `update_callback` afterwards
122    fn set_state(&mut self, state: StreamState) {
123        *self.state.lock() = state;
124        self.update_callback();
125    }
126
127    /// Pass update callback to StreamEndpoint that will be called anytime `StreamEndpoint` is
128    /// modified.
129    pub fn set_update_callback(&mut self, callback: Option<StreamEndpointUpdateCallback>) {
130        self.update_callback = callback;
131    }
132
133    fn update_callback(&self) {
134        if let Some(cb) = self.update_callback.as_ref() {
135            cb(self);
136        }
137    }
138
139    /// Build a new StreamEndpoint from a StreamInformation and associated Capabilities.
140    /// This makes it easy to build from AVDTP Discover and GetCapabilities procedures.
141    /// StreamEndpooints start in the Idle state.
142    pub fn from_info(
143        info: &StreamInformation,
144        capabilities: Vec<ServiceCapability>,
145    ) -> StreamEndpoint {
146        StreamEndpoint {
147            id: info.id().clone(),
148            capabilities,
149            media_type: info.media_type().clone(),
150            endpoint_type: info.endpoint_type().clone(),
151            state: Default::default(),
152            transport: None,
153            stream_held: Arc::new(Mutex::new(false)),
154            remote_id: None,
155            configuration: vec![],
156            update_callback: None,
157            in_progress: None,
158        }
159    }
160
161    /// Checks that the state is in the set of states.
162    /// If not, returns Err(ErrorCode::BadState).
163    fn state_is(&self, state: StreamState) -> Result<(), ErrorCode> {
164        (*self.state.lock() == state).then_some(()).ok_or(ErrorCode::BadState)
165    }
166
167    /// Attempt to Configure this stream using the capabilities given.
168    /// If the stream is not in an Idle state, fails with Err(InvalidState).
169    /// Used for the Stream Configuration procedure, see Section 6.9
170    pub fn configure(
171        &mut self,
172        remote_id: &StreamEndpointId,
173        capabilities: Vec<ServiceCapability>,
174    ) -> Result<(), (ServiceCategory, ErrorCode)> {
175        self.state_is(StreamState::Idle).map_err(|e| (ServiceCategory::None, e))?;
176        self.remote_id = Some(remote_id.clone());
177        for cap in &capabilities {
178            if !self
179                .capabilities
180                .iter()
181                .any(|y| std::mem::discriminant(cap) == std::mem::discriminant(y))
182            {
183                return Err((cap.category(), ErrorCode::UnsupportedConfiguration));
184            }
185        }
186        self.configuration = capabilities;
187        self.set_state(StreamState::Configured);
188        Ok(())
189    }
190
191    /// Attempt to reconfigure this stream with the capabilities given.  If any capability is not
192    /// valid to set, fails with the first such category and InvalidCapabilities If the stream is
193    /// not in the Open state, fails with Err((None, BadState)) Used for the Stream Reconfiguration
194    /// procedure, see Section 6.15.
195    pub fn reconfigure(
196        &mut self,
197        mut capabilities: Vec<ServiceCapability>,
198    ) -> Result<(), (ServiceCategory, ErrorCode)> {
199        self.state_is(StreamState::Open).map_err(|e| (ServiceCategory::None, e))?;
200        // Only application capabilities are allowed to be reconfigured. See Section 8.11.1
201        if let Some(cap) = capabilities.iter().find(|x| !x.is_application()) {
202            return Err((cap.category(), ErrorCode::InvalidCapabilities));
203        }
204        // Should only replace the capabilities that have been configured. See Section 8.11.2
205        let to_replace: std::vec::Vec<_> =
206            capabilities.iter().map(|x| std::mem::discriminant(x)).collect();
207        self.configuration.retain(|x| {
208            let disc = std::mem::discriminant(x);
209            !to_replace.contains(&disc)
210        });
211        self.configuration.append(&mut capabilities);
212        self.update_callback();
213        Ok(())
214    }
215
216    /// Get the current configuration of this stream.
217    /// If the stream is not configured, returns None.
218    /// Used for the Steam Get Configuration Procedure, see Section 6.10
219    pub fn get_configuration(&self) -> Option<&Vec<ServiceCapability>> {
220        if self.configuration.is_empty() {
221            return None;
222        }
223        Some(&self.configuration)
224    }
225
226    // 100 milliseconds chosen based on end of range testing, to allow for recovery after normal
227    // packet delivery continues.
228    const SRC_FLUSH_TIMEOUT: MonotonicDuration = MonotonicDuration::from_millis(100);
229
230    /// When a L2CAP channel is received after an Open command is accepted, it should be
231    /// delivered via receive_channel.
232    /// Returns true if this Endpoint expects more channels to be established before
233    /// streaming is started.
234    /// Returns Err(InvalidState) if this Endpoint is not expecting a channel to be established,
235    /// closing |c|.
236    pub fn receive_channel(&mut self, c: Channel) -> AvdtpResult<bool> {
237        if self.state_is(StreamState::Opening).is_err() || self.transport.is_some() {
238            return Err(Error::InvalidState);
239        }
240        self.transport = Some(Arc::new(RwLock::new(c)));
241        self.try_flush_timeout(Self::SRC_FLUSH_TIMEOUT);
242        self.stream_held = Arc::new(Mutex::new(false));
243        // TODO(jamuraa, https://fxbug.dev/42051664, https://fxbug.dev/42051776): Reporting and Recovery channels
244        self.set_state(StreamState::Open);
245        Ok(false)
246    }
247
248    /// Begin opening this stream.  The stream must be in a Configured state.
249    /// See Stream Establishment, Section 6.11
250    pub fn establish(&mut self) -> Result<(), ErrorCode> {
251        if self.state_is(StreamState::Configured).is_err() || self.transport.is_some() {
252            return Err(ErrorCode::BadState);
253        }
254        self.set_state(StreamState::Opening);
255        Ok(())
256    }
257
258    /// Attempts to set audio direction priority of the MediaTransport channel based on
259    /// whether the stream is a source or sink endpoint if `active` is true.  If `active` is
260    /// false, set the priority to Normal instead.  Does nothing on failure.
261    pub fn try_priority(&self, active: bool) {
262        let priority = match (active, &self.endpoint_type) {
263            (false, _) => A2dpDirection::Normal,
264            (true, EndpointType::Source) => A2dpDirection::Source,
265            (true, EndpointType::Sink) => A2dpDirection::Sink,
266        };
267        let fut = match self.transport.as_ref().unwrap().try_read() {
268            Err(_) => return,
269            Ok(channel) => channel.set_audio_priority(priority).map(|_| ()),
270        };
271        // TODO(https://fxbug.dev/331621666): We should avoid detaching this.
272        Task::spawn(fut).detach();
273    }
274
275    /// Attempts to set the flush timeout for the MediaTransport channel, for source endpoints.
276    pub fn try_flush_timeout(&self, timeout: MonotonicDuration) {
277        if self.endpoint_type != EndpointType::Source {
278            return;
279        }
280        let fut = match self.transport.as_ref().unwrap().try_write() {
281            Err(_) => return,
282            Ok(channel) => channel.set_flush_timeout(Some(timeout)).map(|_| ()),
283        };
284        // TODO(https://fxbug.dev/331621666): We should avoid detaching this.
285        Task::spawn(fut).detach();
286    }
287
288    /// Close this stream.  This procedure will wait until media channels are closed before
289    /// transitioning to Idle.  If the channels are not closed in 3 seconds, we initiate an abort
290    /// procedure with the remote |peer| to force a transition to Idle.
291    pub fn release(&mut self, responder: SimpleResponder, peer: &Peer) -> AvdtpResult<()> {
292        {
293            let lock = self.state.lock();
294            if *lock != StreamState::Open && *lock != StreamState::Streaming {
295                return responder.reject(ErrorCode::BadState);
296            }
297        }
298        self.set_state(StreamState::Closing);
299        responder.send()?;
300        let release_wait_fut = {
301            // Take our transport and remote id - after this procedure it will be closed.
302            // These must be Some(_) because we are in Open / Streaming state.
303            let seid = self.remote_id.take().unwrap();
304            let transport = self.transport.take().unwrap();
305            let peer = peer.clone();
306            let state = self.state.clone();
307            async move {
308                let Ok(transport) = transport.try_read() else {
309                    warn!("unable to lock transport channel, dropping and assuming closed");
310                    *state.lock() = StreamState::Idle;
311                    return;
312                };
313                let closed_fut = transport
314                    .closed()
315                    .on_timeout(MonotonicDuration::from_seconds(3).after_now(), || {
316                        Err(Status::TIMED_OUT)
317                    });
318                if let Err(Status::TIMED_OUT) = closed_fut.await {
319                    let _ = peer.abort(&seid).await;
320                    *state.lock() = StreamState::Aborting;
321                    // As the initiator of the Abort, we close our channel.
322                    drop(transport);
323                }
324                *state.lock() = StreamState::Idle;
325            }
326        };
327        self.in_progress = Some(Task::local(release_wait_fut));
328        // Closing will return this endpoint to the Idle state, one way or another with no
329        // configuration
330        self.configuration.clear();
331        self.update_callback();
332        Ok(())
333    }
334
335    /// Returns the current state of this endpoint.
336    pub fn state(&self) -> StreamState {
337        *self.state.lock()
338    }
339
340    /// Start this stream.  This can be done only from the Open State.
341    /// Used for the Stream Start procedure, See Section 6.12
342    pub fn start(&mut self) -> Result<(), ErrorCode> {
343        self.state_is(StreamState::Open)?;
344        self.try_priority(true);
345        self.set_state(StreamState::Streaming);
346        Ok(())
347    }
348
349    /// Suspend this stream.  This can be done only from the Streaming state.
350    /// Used for the Stream Suspend procedure, See Section 6.14
351    pub fn suspend(&mut self) -> Result<(), ErrorCode> {
352        self.state_is(StreamState::Streaming)?;
353        self.set_state(StreamState::Open);
354        self.try_priority(false);
355        Ok(())
356    }
357
358    /// Abort this stream.  This can be done from any state, and will always return the state
359    /// to Idle.  We are initiating this procedure so will wait for a response and all our
360    /// channels will be closed.
361    pub async fn initiate_abort<'a>(&'a mut self, peer: &'a Peer) {
362        if let Some(seid) = self.remote_id.take() {
363            let _ = peer.abort(&seid).await;
364            self.set_state(StreamState::Aborting);
365        }
366        self.abort()
367    }
368
369    /// Abort this stream.  This can be done from any state, and will always return the state
370    /// to Idle.  We are receiving this abort from the peer, and all our channels will close.
371    pub fn abort(&mut self) {
372        self.set_state(StreamState::Aborting);
373        self.configuration.clear();
374        self.remote_id = None;
375        self.transport = None;
376        self.set_state(StreamState::Idle);
377    }
378
379    /// Capabilities of this StreamEndpoint.
380    /// Provides support for the Get Capabilities and Get All Capabilities signaling procedures.
381    /// See Sections 6.7 and 6.8
382    pub fn capabilities(&self) -> &Vec<ServiceCapability> {
383        &self.capabilities
384    }
385
386    /// Returns the CodecType of this StreamEndpoint.
387    /// Returns None if there is no MediaCodec capability in the endpoint.
388    /// Note: a MediaCodec capability is required by all endpoints by the spec.
389    pub fn codec_type(&self) -> Option<&MediaCodecType> {
390        self.capabilities.iter().find_map(|cap| match cap {
391            ServiceCapability::MediaCodec { codec_type, .. } => Some(codec_type),
392            _ => None,
393        })
394    }
395
396    /// Returns the local StreamEndpointId for this endpoint.
397    pub fn local_id(&self) -> &StreamEndpointId {
398        &self.id
399    }
400
401    /// Returns the remote StreamEndpointId for this endpoint, if it's configured.
402    pub fn remote_id(&self) -> Option<&StreamEndpointId> {
403        self.remote_id.as_ref()
404    }
405
406    /// Returns the EndpointType of this endpoint
407    pub fn endpoint_type(&self) -> &EndpointType {
408        &self.endpoint_type
409    }
410
411    /// Make a StreamInformation which represents the current state of this stream.
412    pub fn information(&self) -> StreamInformation {
413        let in_use = self.state_is(StreamState::Idle).is_err();
414        StreamInformation::new(
415            self.id.clone(),
416            in_use,
417            self.media_type.clone(),
418            self.endpoint_type.clone(),
419        )
420    }
421
422    /// Take the media transport channel, which transmits (or receives) any media for this
423    /// StreamEndpoint.  Returns None if the channel is held already, or if the channel has not
424    /// been opened.
425    pub fn take_transport(&mut self) -> Option<MediaStream> {
426        let mut stream_held = self.stream_held.lock();
427        if *stream_held || self.transport.is_none() {
428            return None;
429        }
430
431        *stream_held = true;
432
433        Some(MediaStream::new(
434            self.stream_held.clone(),
435            Arc::downgrade(self.transport.as_ref().unwrap()),
436        ))
437    }
438}
439
440/// Represents a media transport stream.
441/// If a sink, produces the bytes that have been delivered from the peer.
442/// If a source, can send bytes using `send`
443pub struct MediaStream {
444    in_use: Arc<Mutex<bool>>,
445    channel: Weak<RwLock<Channel>>,
446    terminated: bool,
447}
448
449impl MediaStream {
450    pub fn new(in_use: Arc<Mutex<bool>>, channel: Weak<RwLock<Channel>>) -> Self {
451        Self { in_use, channel, terminated: false }
452    }
453
454    fn try_upgrade(&self) -> Result<Arc<RwLock<Channel>>, io::Error> {
455        self.channel
456            .upgrade()
457            .ok_or_else(|| io::Error::new(io::ErrorKind::ConnectionAborted, "lost connection"))
458    }
459
460    pub fn max_tx_size(&self) -> Result<usize, io::Error> {
461        match self.try_upgrade()?.try_read() {
462            Err(_e) => return Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")),
463            Ok(lock) => Ok(lock.max_tx_size()),
464        }
465    }
466}
467
468impl Drop for MediaStream {
469    fn drop(&mut self) {
470        let mut l = self.in_use.lock();
471        *l = false;
472    }
473}
474
475impl Stream for MediaStream {
476    type Item = AvdtpResult<Vec<u8>>;
477
478    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
479        let Ok(arc_chan) = self.try_upgrade() else {
480            self.terminated = true;
481            return Poll::Ready(None);
482        };
483        let Ok(lock) = arc_chan.try_write() else {
484            self.terminated = true;
485            return Poll::Ready(None);
486        };
487        let mut pin_chan = Pin::new(lock);
488        match pin_chan.as_mut().poll_next(cx) {
489            Poll::Ready(Some(Ok(res))) => Poll::Ready(Some(Ok(res))),
490            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Error::PeerRead(e)))),
491            Poll::Ready(None) => {
492                self.terminated = true;
493                Poll::Ready(None)
494            }
495            Poll::Pending => Poll::Pending,
496        }
497    }
498}
499
500impl FusedStream for MediaStream {
501    fn is_terminated(&self) -> bool {
502        self.terminated
503    }
504}
505
506impl io::AsyncWrite for MediaStream {
507    fn poll_write(
508        self: Pin<&mut Self>,
509        cx: &mut Context<'_>,
510        buf: &[u8],
511    ) -> Poll<Result<usize, io::Error>> {
512        let arc_chan = match self.try_upgrade() {
513            Err(e) => return Poll::Ready(Err(e)),
514            Ok(c) => c,
515        };
516        let lock = match arc_chan.try_write() {
517            Err(_) => {
518                return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
519            }
520            Ok(lock) => lock,
521        };
522        let mut pin_chan = Pin::new(lock);
523        pin_chan.as_mut().poll_write(cx, buf)
524    }
525
526    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
527        let arc_chan = match self.try_upgrade() {
528            Err(e) => return Poll::Ready(Err(e)),
529            Ok(c) => c,
530        };
531        let lock = match arc_chan.try_write() {
532            Err(_) => {
533                return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
534            }
535            Ok(lock) => lock,
536        };
537        let mut pin_chan = Pin::new(lock);
538        pin_chan.as_mut().poll_flush(cx)
539    }
540
541    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
542        let arc_chan = match self.try_upgrade() {
543            Err(e) => return Poll::Ready(Err(e)),
544            Ok(c) => c,
545        };
546        let lock = match arc_chan.try_write() {
547            Err(_) => {
548                return Poll::Ready(Err(io::Error::new(io::ErrorKind::WouldBlock, "couldn't lock")))
549            }
550            Ok(lock) => lock,
551        };
552        let mut pin_chan = Pin::new(lock);
553        pin_chan.as_mut().poll_close(cx)
554    }
555}
556
557#[cfg(test)]
558mod tests {
559    use super::*;
560    use crate::tests::{expect_remote_recv, setup_peer};
561    use crate::Request;
562
563    use assert_matches::assert_matches;
564    use fidl::endpoints::create_request_stream;
565    use futures::io::AsyncWriteExt;
566    use futures::stream::StreamExt;
567    use {
568        fidl_fuchsia_bluetooth as fidl_bt, fidl_fuchsia_bluetooth_bredr as bredr,
569        fuchsia_async as fasync,
570    };
571
572    const REMOTE_ID_VAL: u8 = 1;
573    const REMOTE_ID: StreamEndpointId = StreamEndpointId(REMOTE_ID_VAL);
574
575    #[test]
576    fn make() {
577        let s = StreamEndpoint::new(
578            REMOTE_ID_VAL,
579            MediaType::Audio,
580            EndpointType::Sink,
581            vec![ServiceCapability::MediaTransport],
582        );
583        assert!(s.is_ok());
584        let s = s.unwrap();
585        assert_eq!(&StreamEndpointId(1), s.local_id());
586
587        let info = s.information();
588        assert!(!info.in_use());
589
590        let no = StreamEndpoint::new(
591            0,
592            MediaType::Audio,
593            EndpointType::Sink,
594            vec![ServiceCapability::MediaTransport],
595        );
596        assert!(no.is_err());
597    }
598
599    fn establish_stream(s: &mut StreamEndpoint) -> Channel {
600        assert_matches!(s.establish(), Ok(()));
601        let (chan, remote) = Channel::create();
602        assert_matches!(s.receive_channel(chan), Ok(false));
603        remote
604    }
605
606    #[test]
607    fn from_info() {
608        let seid = StreamEndpointId::try_from(5).unwrap();
609        let info =
610            StreamInformation::new(seid.clone(), false, MediaType::Audio, EndpointType::Sink);
611        let capabilities = vec![ServiceCapability::MediaTransport];
612
613        let endpoint = StreamEndpoint::from_info(&info, capabilities);
614
615        assert_eq!(&seid, endpoint.local_id());
616        assert_eq!(&false, endpoint.information().in_use());
617        assert_eq!(1, endpoint.capabilities().len());
618    }
619
620    #[test]
621    fn codec_type() {
622        let s = StreamEndpoint::new(
623            REMOTE_ID_VAL,
624            MediaType::Audio,
625            EndpointType::Sink,
626            vec![
627                ServiceCapability::MediaTransport,
628                ServiceCapability::MediaCodec {
629                    media_type: MediaType::Audio,
630                    codec_type: MediaCodecType::new(0x40),
631                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], // Meaningless test data.
632                },
633            ],
634        )
635        .unwrap();
636
637        assert_eq!(Some(&MediaCodecType::new(0x40)), s.codec_type());
638
639        let s = StreamEndpoint::new(
640            REMOTE_ID_VAL,
641            MediaType::Audio,
642            EndpointType::Sink,
643            vec![ServiceCapability::MediaTransport],
644        )
645        .unwrap();
646
647        assert_eq!(None, s.codec_type());
648    }
649
650    fn test_endpoint(r#type: EndpointType) -> StreamEndpoint {
651        StreamEndpoint::new(
652            REMOTE_ID_VAL,
653            MediaType::Audio,
654            r#type,
655            vec![
656                ServiceCapability::MediaTransport,
657                ServiceCapability::MediaCodec {
658                    media_type: MediaType::Audio,
659                    codec_type: MediaCodecType::new(0x40),
660                    codec_extra: vec![0xDE, 0xAD, 0xBE, 0xEF], // Meaningless test data.
661                },
662            ],
663        )
664        .unwrap()
665    }
666
667    #[test]
668    fn stream_configure_reconfigure() {
669        let _exec = fasync::TestExecutor::new();
670        let mut s = test_endpoint(EndpointType::Sink);
671
672        // Can't configure items that aren't in range.
673        assert_matches!(
674            s.configure(&REMOTE_ID, vec![ServiceCapability::Reporting]),
675            Err((ServiceCategory::Reporting, ErrorCode::UnsupportedConfiguration))
676        );
677
678        assert_matches!(
679            s.configure(
680                &REMOTE_ID,
681                vec![
682                    ServiceCapability::MediaTransport,
683                    ServiceCapability::MediaCodec {
684                        media_type: MediaType::Audio,
685                        codec_type: MediaCodecType::new(0x40),
686                        // Change the codec_extra which is typical, ex. SBC (A2DP Spec 4.3.2.6)
687                        codec_extra: vec![0x0C, 0x0D, 0x02, 0x51],
688                    }
689                ]
690            ),
691            Ok(())
692        );
693
694        // Note: we allow endpoints to be configured (and reconfigured) again when they
695        // are only configured, even though this is probably not allowed per the spec.
696
697        // Can't configure while open
698        let _channel = establish_stream(&mut s);
699
700        assert_matches!(
701            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
702            Err((_, ErrorCode::BadState))
703        );
704
705        let reconfiguration = vec![ServiceCapability::MediaCodec {
706            media_type: MediaType::Audio,
707            codec_type: MediaCodecType::new(0x40),
708            // Reconfigure to yet another different codec_extra value.
709            codec_extra: vec![0x0C, 0x0D, 0x0E, 0x0F],
710        }];
711
712        // The new configuration should match the previous one, but with the reconfigured
713        // capabilities updated.
714        let new_configuration = vec![ServiceCapability::MediaTransport, reconfiguration[0].clone()];
715
716        // Reconfiguring while open is fine though.
717        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));
718
719        assert_eq!(Some(&new_configuration), s.get_configuration());
720
721        // Can't reconfigure non-application types
722        assert_matches!(
723            s.reconfigure(vec![ServiceCapability::MediaTransport]),
724            Err((ServiceCategory::MediaTransport, ErrorCode::InvalidCapabilities))
725        );
726
727        // Can't configure or reconfigure while streaming
728        assert_matches!(s.start(), Ok(()));
729
730        assert_matches!(
731            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
732            Err((_, ErrorCode::BadState))
733        );
734
735        assert_matches!(s.reconfigure(reconfiguration.clone()), Err((_, ErrorCode::BadState)));
736
737        assert_matches!(s.suspend(), Ok(()));
738
739        // Reconfigure should be fine again in open state.
740        assert_matches!(s.reconfigure(reconfiguration.clone()), Ok(()));
741
742        // Configure is still not allowed.
743        assert_matches!(
744            s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]),
745            Err((_, ErrorCode::BadState))
746        );
747    }
748
749    #[test]
750    fn stream_establishment() {
751        let _exec = fasync::TestExecutor::new();
752        let mut s = test_endpoint(EndpointType::Sink);
753
754        let (remote, transport) = Channel::create();
755
756        // Can't establish before configuring
757        assert_matches!(s.establish(), Err(ErrorCode::BadState));
758
759        // Trying to receive a channel in the wrong state closes the channel
760        assert_matches!(s.receive_channel(transport), Err(Error::InvalidState));
761
762        let buf: &mut [u8] = &mut [0; 1];
763
764        assert_matches!(remote.read(buf), Err(zx::Status::PEER_CLOSED));
765
766        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
767
768        assert_matches!(s.establish(), Ok(()));
769
770        // And we should be able to give a channel now.
771        let (_remote, transport) = Channel::create();
772        assert_matches!(s.receive_channel(transport), Ok(false));
773    }
774
775    fn setup_peer_for_release(exec: &mut fasync::TestExecutor) -> (Peer, Channel, SimpleResponder) {
776        let (peer, signaling) = setup_peer();
777        // Send a close from the other side to produce an event we can respond to.
778        let _ = signaling.write(&[0x40, 0x08, 0x04]).expect("signaling write");
779        let mut req_stream = peer.take_request_stream();
780        let mut req_fut = req_stream.next();
781        let complete = exec.run_until_stalled(&mut req_fut);
782        let responder = match complete {
783            Poll::Ready(Some(Ok(Request::Close { responder, .. }))) => responder,
784            _ => panic!("Expected a close request"),
785        };
786        (peer, signaling, responder)
787    }
788
789    #[test]
790    fn stream_release_without_abort() {
791        let mut exec = fasync::TestExecutor::new();
792        let mut s = test_endpoint(EndpointType::Sink);
793
794        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
795
796        let remote_transport = establish_stream(&mut s);
797
798        let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
799
800        // We expect release to succeed in this state.
801        s.release(responder, &peer).unwrap();
802        // Expect a "yes" response.
803        expect_remote_recv(&[0x42, 0x08], &signaling);
804
805        // Close the transport channel by dropping it.
806        drop(remote_transport);
807
808        // After the transport is closed we should transition to Idle.
809        let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
810        assert_eq!(s.state(), StreamState::Idle);
811    }
812
813    #[test]
814    fn test_mediastream() {
815        let mut exec = fasync::TestExecutor::new();
816        let mut s = test_endpoint(EndpointType::Sink);
817
818        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
819
820        // Before the stream is opened, we shouldn't be able to take the transport.
821        assert!(s.take_transport().is_none());
822
823        let remote_transport = establish_stream(&mut s);
824
825        // Should be able to get the transport from the stream now.
826        let temp_stream = s.take_transport();
827        assert!(temp_stream.is_some());
828
829        // But only once
830        assert!(s.take_transport().is_none());
831
832        // Until you drop the stream
833        drop(temp_stream);
834
835        let media_stream = s.take_transport();
836        assert!(media_stream.is_some());
837        let mut media_stream = media_stream.unwrap();
838
839        // Max TX size is taken from the underlying channel.
840        assert_matches!(media_stream.max_tx_size(), Ok(Channel::DEFAULT_MAX_TX));
841
842        // Writing to the media stream should send it through the transport channel.
843        let hearts = &[0xF0, 0x9F, 0x92, 0x96, 0xF0, 0x9F, 0x92, 0x96];
844        let mut write_fut = media_stream.write(hearts);
845
846        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Ok(8)));
847
848        expect_remote_recv(hearts, &remote_transport);
849
850        // Closing the media stream should close the channel.
851        let mut close_fut = media_stream.close();
852        assert_matches!(exec.run_until_stalled(&mut close_fut), Poll::Ready(Ok(())));
853        // Note: there's no effect on the other end of the channel when a close occurs,
854        // until the channel is dropped.
855
856        drop(s);
857
858        // Reading from the remote end should fail.
859        let mut result = vec![0];
860        assert_matches!(remote_transport.read(&mut result[..]), Err(zx::Status::PEER_CLOSED));
861
862        // After the stream is gone, any write should return an Err
863        let mut write_fut = media_stream.write(&[0xDE, 0xAD]);
864        assert_matches!(exec.run_until_stalled(&mut write_fut), Poll::Ready(Err(_)));
865
866        // After the stream is gone, the stream should be fused done.
867        let mut next_fut = media_stream.next();
868        assert_matches!(exec.run_until_stalled(&mut next_fut), Poll::Ready(None));
869
870        assert!(media_stream.is_terminated(), "should be terminated");
871
872        // And the Max TX should be an error.
873        assert_matches!(media_stream.max_tx_size(), Err(_));
874    }
875
876    #[test]
877    fn stream_release_with_abort() {
878        let mut exec = fasync::TestExecutor::new();
879        let mut s = test_endpoint(EndpointType::Sink);
880
881        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
882        let remote_transport = establish_stream(&mut s);
883        let (peer, mut signaling, responder) = setup_peer_for_release(&mut exec);
884
885        // We expect release to succeed in this state, then start the task to wait for the close.
886        s.release(responder, &peer).unwrap();
887        // Expect a "yes" response.
888        expect_remote_recv(&[0x42, 0x08], &signaling);
889
890        // Should get an abort
891        let next = std::pin::pin!(signaling.next());
892        let received =
893            exec.run_singlethreaded(next).expect("channel not closed").expect("successful read");
894        assert_eq!(0x0A, received[1]);
895        let txlabel = received[0] & 0xF0;
896        // Send a response
897        assert!(signaling.write(&[txlabel | 0x02, 0x0A]).is_ok());
898
899        let _ = exec.run_singlethreaded(&mut remote_transport.closed());
900
901        // We will then end up in Idle.
902        while s.state() != StreamState::Idle {
903            let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
904        }
905    }
906
907    #[test]
908    fn start_and_suspend() {
909        let mut exec = fasync::TestExecutor::new();
910        let mut s = test_endpoint(EndpointType::Sink);
911
912        // Can't start or suspend until configured and open.
913        assert_matches!(s.start(), Err(ErrorCode::BadState));
914        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
915
916        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
917
918        assert_matches!(s.start(), Err(ErrorCode::BadState));
919        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
920
921        assert_matches!(s.establish(), Ok(()));
922
923        assert_matches!(s.start(), Err(ErrorCode::BadState));
924        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
925
926        let (remote, local) = zx::Socket::create_datagram();
927        let (client_end, mut direction_request_stream) =
928            create_request_stream::<bredr::AudioDirectionExtMarker>();
929        let ext = bredr::Channel {
930            socket: Some(local),
931            channel_mode: Some(fidl_bt::ChannelMode::Basic),
932            max_tx_sdu_size: Some(1004),
933            ext_direction: Some(client_end),
934            ..Default::default()
935        };
936        let transport = Channel::try_from(ext).unwrap();
937        assert_matches!(s.receive_channel(transport), Ok(false));
938
939        // Should be able to start but not suspend now.
940        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
941        assert_matches!(s.start(), Ok(()));
942
943        match exec.run_until_stalled(&mut direction_request_stream.next()) {
944            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
945                priority,
946                responder,
947            }))) => {
948                assert_eq!(bredr::A2dpDirectionPriority::Sink, priority);
949                responder.send(Ok(())).expect("response to send cleanly");
950            }
951            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
952        };
953
954        // Are started, so we should be able to suspend but not start again here.
955        assert_matches!(s.start(), Err(ErrorCode::BadState));
956        assert_matches!(s.suspend(), Ok(()));
957
958        match exec.run_until_stalled(&mut direction_request_stream.next()) {
959            Poll::Ready(Some(Ok(bredr::AudioDirectionExtRequest::SetPriority {
960                priority,
961                responder,
962            }))) => {
963                assert_eq!(bredr::A2dpDirectionPriority::Normal, priority);
964                responder.send(Ok(())).expect("response to send cleanly");
965            }
966            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
967        };
968
969        // Now we're suspended, so we can start it again.
970        assert_matches!(s.start(), Ok(()));
971        assert_matches!(s.suspend(), Ok(()));
972
973        // After we close, we are back at idle and can't start / stop
974        let (peer, signaling, responder) = setup_peer_for_release(&mut exec);
975
976        {
977            s.release(responder, &peer).unwrap();
978            // Expect a "yes" response.
979            expect_remote_recv(&[0x42, 0x08], &signaling);
980            // Close the transport channel by dropping it.
981            drop(remote);
982            while s.state() != StreamState::Idle {
983                let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
984            }
985        }
986
987        // Shouldn't be able to start or suspend again.
988        assert_matches!(s.start(), Err(ErrorCode::BadState));
989        assert_matches!(s.suspend(), Err(ErrorCode::BadState));
990    }
991
992    fn receive_l2cap_params_channel(
993        s: &mut StreamEndpoint,
994    ) -> (zx::Socket, bredr::L2capParametersExtRequestStream) {
995        assert_matches!(s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport]), Ok(()));
996        assert_matches!(s.establish(), Ok(()));
997
998        let (remote, local) = zx::Socket::create_datagram();
999        let (client_end, l2cap_params_requests) =
1000            create_request_stream::<bredr::L2capParametersExtMarker>();
1001        let ext = bredr::Channel {
1002            socket: Some(local),
1003            channel_mode: Some(fidl_bt::ChannelMode::Basic),
1004            max_tx_sdu_size: Some(1004),
1005            ext_l2cap: Some(client_end),
1006            ..Default::default()
1007        };
1008        let transport = Channel::try_from(ext).unwrap();
1009        assert_matches!(s.receive_channel(transport), Ok(false));
1010        (remote, l2cap_params_requests)
1011    }
1012
1013    #[test]
1014    fn sets_flush_timeout_for_source_transports() {
1015        let mut exec = fasync::TestExecutor::new();
1016        let mut s = test_endpoint(EndpointType::Source);
1017        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
1018
1019        // Should request to set the flush timeout.
1020        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
1021            Poll::Ready(Some(Ok(bredr::L2capParametersExtRequest::RequestParameters {
1022                request,
1023                responder,
1024            }))) => {
1025                assert_eq!(
1026                    Some(StreamEndpoint::SRC_FLUSH_TIMEOUT.into_nanos()),
1027                    request.flush_timeout
1028                );
1029                responder.send(&request).expect("response to send cleanly");
1030            }
1031            x => panic!("Expected a item to be ready on the request stream, got {:?}", x),
1032        };
1033    }
1034
1035    #[test]
1036    fn no_flush_timeout_for_sink_transports() {
1037        let mut exec = fasync::TestExecutor::new();
1038        let mut s = test_endpoint(EndpointType::Sink);
1039        let (_remote, mut l2cap_params_requests) = receive_l2cap_params_channel(&mut s);
1040
1041        // Should NOT request to set the flush timeout.
1042        match exec.run_until_stalled(&mut l2cap_params_requests.next()) {
1043            Poll::Pending => {}
1044            x => panic!("Expected no request to set flush timeout, got {:?}", x),
1045        };
1046    }
1047
1048    #[test]
1049    fn get_configuration() {
1050        let mut s = test_endpoint(EndpointType::Sink);
1051
1052        // Can't get configuration if we aren't configured.
1053        assert!(s.get_configuration().is_none());
1054
1055        let config = vec![
1056            ServiceCapability::MediaTransport,
1057            ServiceCapability::MediaCodec {
1058                media_type: MediaType::Audio,
1059                codec_type: MediaCodecType::new(0),
1060                // Change the codec_extra which is typical, ex. SBC (A2DP Spec 4.3.2.6)
1061                codec_extra: vec![0x60, 0x0D, 0x02, 0x55],
1062            },
1063        ];
1064
1065        assert_matches!(s.configure(&REMOTE_ID, config.clone()), Ok(()));
1066
1067        match s.get_configuration() {
1068            Some(c) => assert_eq!(&config, c),
1069            x => panic!("Expected Ok from get_configuration but got {:?}", x),
1070        };
1071
1072        // Abort this stream, putting it back to the idle state.
1073        s.abort();
1074
1075        assert!(s.get_configuration().is_none());
1076    }
1077
1078    use std::sync::atomic::{AtomicUsize, Ordering};
1079
1080    /// Create a callback that tracks how many times it has been called
1081    fn call_count_callback() -> (Option<StreamEndpointUpdateCallback>, Arc<AtomicUsize>) {
1082        let call_count = Arc::new(AtomicUsize::new(0));
1083        let call_count_reader = call_count.clone();
1084        let count_cb: StreamEndpointUpdateCallback = Box::new(move |_stream: &StreamEndpoint| {
1085            let _ = call_count.fetch_add(1, Ordering::SeqCst);
1086        });
1087        (Some(count_cb), call_count_reader)
1088    }
1089
1090    /// Test that the update callback is run at least once for all methods that mutate the state of
1091    /// the StreamEndpoint. This is done through an atomic counter in the callback that increments
1092    /// when the callback is run.
1093    ///
1094    /// Note that the _results_ of calling these mutating methods on the state of StreamEndpoint are
1095    /// not validated here. They are validated in other tests.
1096    #[test]
1097    fn update_callback() {
1098        // Need an executor to make a socket
1099        let _exec = fasync::TestExecutor::new();
1100        let mut s = test_endpoint(EndpointType::Sink);
1101        let (cb, call_count) = call_count_callback();
1102        s.set_update_callback(cb);
1103
1104        s.configure(&REMOTE_ID, vec![ServiceCapability::MediaTransport])
1105            .expect("Configure to succeed in test");
1106        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1107        call_count.store(0, Ordering::SeqCst); // clear call count
1108
1109        s.establish().expect("Establish to succeed in test");
1110        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1111        call_count.store(0, Ordering::SeqCst); // clear call count
1112
1113        let (_, transport) = Channel::create();
1114        assert_eq!(
1115            s.receive_channel(transport).expect("Receive channel to succeed in test"),
1116            false
1117        );
1118        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1119        call_count.store(0, Ordering::SeqCst); // clear call count
1120
1121        s.start().expect("Start to succeed in test");
1122        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1123        call_count.store(0, Ordering::SeqCst); // clear call count
1124
1125        s.suspend().expect("Suspend to succeed in test");
1126        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1127        call_count.store(0, Ordering::SeqCst); // clear call count
1128
1129        s.reconfigure(vec![]).expect("Reconfigure to succeed in test");
1130        assert!(call_count.load(Ordering::SeqCst) > 0, "Update callback called at least once");
1131        call_count.store(0, Ordering::SeqCst); // clear call count
1132
1133        // Abort this stream, putting it back to the idle state.
1134        s.abort();
1135    }
1136}