bt_a2dp/
media_task.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use bt_avdtp::{EndpointType, MediaStream};
6use dyn_clone::DynClone;
7use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
8use fuchsia_bluetooth::types::PeerId;
9use fuchsia_inspect::Node;
10use fuchsia_inspect_derive::AttachError;
11use futures::future::{BoxFuture, Shared};
12use futures::FutureExt;
13use std::time::Duration;
14use thiserror::Error;
15
16use crate::codec::MediaCodecConfig;
17
18#[derive(Debug, Error, Clone)]
19#[non_exhaustive]
20pub enum MediaTaskError {
21    #[error("Operation or configuration not supported")]
22    NotSupported,
23    #[error("Peer closed the media stream")]
24    PeerClosed,
25    #[error("Resources needed are already being used")]
26    ResourcesInUse,
27    #[error("Other Media Task Error: {}", _0)]
28    Other(String),
29}
30
31impl From<bt_avdtp::Error> for MediaTaskError {
32    fn from(error: bt_avdtp::Error) -> Self {
33        Self::Other(format!("AVDTP Error: {}", error))
34    }
35}
36
37/// MediaTaskRunners are configured with information about the media codec when either peer in a
38/// conversation configures a stream endpoint.  When successfully configured, they can start
39/// MediaTasks by accepting a MediaStream, which will provide or consume media on that stream until
40/// dropped or stopped.
41///
42/// A builder that will make media task runners from requested configurations.
43pub trait MediaTaskBuilder: Send + Sync + DynClone {
44    /// Configure a new stream based on the given `codec_config` parameters.
45    /// Returns a MediaTaskRunner if the configuration is supported, an
46    /// MediaTaskError::NotSupported otherwise.
47    fn configure(
48        &self,
49        peer_id: &PeerId,
50        codec_config: &MediaCodecConfig,
51    ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError>;
52
53    /// Return the direction of tasks created by this builder.
54    /// Source tasks provide local encoded audio to a peer.
55    /// Sink tasks consume encoded audio from a peer.
56    fn direction(&self) -> EndpointType;
57
58    /// Provide a set of encoded media configurations that this task can support.
59    /// This can vary based on current system capabilities, and should be checked before
60    /// communicating capabilities to each peer.
61    /// `offload` is a proxy to the offload capabilities of the controller for this peer.
62    /// Returns a future that resolves to the set of MediaCodecConfigs that this builder supports,
63    /// typically one config per MediaCodecType, or an error if building the configs failed.
64    fn supported_configs(
65        &self,
66        peer_id: &PeerId,
67        offload: Option<AudioOffloadExtProxy>,
68    ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>>;
69}
70
71dyn_clone::clone_trait_object!(MediaTaskBuilder);
72
73/// MediaTaskRunners represent an ability of the media system to start streaming media.
74/// They are configured for a specific codec by `MediaTaskBuilder::configure`
75/// Typically a MediaTaskRunner can start multiple streams without needing to be reconfigured,
76/// although possibly not simultaneously.
77pub trait MediaTaskRunner: Send {
78    /// Start a MediaTask using the MediaStream given.
79    /// If the task started, returns a MediaTask which will finish if the stream ends or an
80    /// error occurs, and can be stopped using `MediaTask::stop` or by dropping the MediaTask.
81    /// This can fail with MediaTaskError::ResourcesInUse if a MediaTask cannot be started because
82    /// one is already running.
83    fn start(
84        &mut self,
85        stream: MediaStream,
86        offload: Option<AudioOffloadExtProxy>,
87    ) -> Result<Box<dyn MediaTask>, MediaTaskError>;
88
89    /// Try to reconfigure the MediaTask to accept a new configuration.  This differs from
90    /// `MediaTaskBuilder::configure` as it attempts to preserve the same configured session.
91    /// The runner remains configured with the initial configuration on an error.
92    fn reconfigure(&mut self, _config: &MediaCodecConfig) -> Result<(), MediaTaskError> {
93        Err(MediaTaskError::NotSupported)
94    }
95
96    /// Set the delay reported from the peer for this media task.
97    /// This should configure the media source or sink to attempt to compensate.
98    /// Typically this is zero for Sink tasks, but Source tasks can receive this info from the peer.
99    /// May only be supported before start.
100    /// If an Error is returned, the delay has not been set.
101    fn set_delay(&mut self, _delay: Duration) -> Result<(), MediaTaskError> {
102        Err(MediaTaskError::NotSupported)
103    }
104
105    /// Add information from the running media task to the inspect tree
106    /// (i.e. data transferred, jitter, etc)
107    fn iattach(&mut self, _parent: &Node, _name: &str) -> Result<(), AttachError> {
108        Err("attach not implemented".into())
109    }
110}
111
112/// MediaTasks represent a media stream being actively processed (sent or received from a peer).
113/// They are are created by `MediaTaskRunner::start`.
114/// Typically a MediaTask will run a background task that is active until dropped or
115/// `MediaTask::stop` is called.
116pub trait MediaTask: Send {
117    /// Returns a Future that finishes when the running media task finshes for any reason.
118    /// Should return a future that immediately resolves if this task is finished.
119    fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>>;
120
121    /// Returns the result if this task has finished, and None otherwise
122    fn result(&mut self) -> Option<Result<(), MediaTaskError>> {
123        self.finished().now_or_never()
124    }
125
126    /// Stops the task normally, signalling to all waiters Ok(()).
127    /// Returns the result sent to MediaTask::finished futures, which may be different from Ok(()).
128    /// When this function returns, is is good practice to ensure the MediaStream that started
129    /// this task is also dropped.
130    fn stop(&mut self) -> Result<(), MediaTaskError>;
131}
132
133pub mod tests {
134    use super::*;
135
136    use futures::channel::{mpsc, oneshot};
137    use futures::stream::StreamExt;
138    use futures::{Future, TryFutureExt};
139    use std::fmt;
140    use std::sync::{Arc, Mutex};
141
142    #[derive(Clone)]
143    pub struct TestMediaTask {
144        /// The PeerId that was used to make this Task
145        pub peer_id: PeerId,
146        /// The configuration used to make this task
147        pub codec_config: MediaCodecConfig,
148        /// If still started, this holds the MediaStream.
149        pub stream: Arc<Mutex<Option<MediaStream>>>,
150        /// Sender for the shared result future. None if already sent.
151        sender: Arc<Mutex<Option<oneshot::Sender<Result<(), MediaTaskError>>>>>,
152        /// Shared result future.
153        result: Shared<BoxFuture<'static, Result<(), MediaTaskError>>>,
154        /// Delay the task was started with.
155        pub delay: Duration,
156    }
157
158    impl fmt::Debug for TestMediaTask {
159        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160            f.debug_struct("TestMediaTask")
161                .field("peer_id", &self.peer_id)
162                .field("codec_config", &self.codec_config)
163                .field("result", &self.result.clone().now_or_never())
164                .finish()
165        }
166    }
167
168    impl TestMediaTask {
169        pub fn new(
170            peer_id: PeerId,
171            codec_config: MediaCodecConfig,
172            stream: MediaStream,
173            delay: Duration,
174        ) -> Self {
175            let (sender, receiver) = oneshot::channel();
176            let result = receiver
177                .map_ok_or_else(
178                    |_err| Err(MediaTaskError::Other(format!("Nothing sent"))),
179                    |result| result,
180                )
181                .boxed()
182                .shared();
183            Self {
184                peer_id,
185                codec_config,
186                stream: Arc::new(Mutex::new(Some(stream))),
187                sender: Arc::new(Mutex::new(Some(sender))),
188                result,
189                delay,
190            }
191        }
192
193        /// Return true if the background media task is running.
194        pub fn is_started(&self) -> bool {
195            // The stream being held represents the task running.
196            self.stream.lock().expect("stream lock").is_some()
197        }
198
199        /// End the streaming task without an external stop().
200        /// Sends an optional result from the task.
201        pub fn end_prematurely(&self, task_result: Option<Result<(), MediaTaskError>>) {
202            let _removed_stream = self.stream.lock().expect("mutex").take();
203            let mut lock = self.sender.lock().expect("sender lock");
204            let sender = lock.take();
205            if let (Some(result), Some(sender)) = (task_result, sender) {
206                sender.send(result).expect("send ok");
207            }
208        }
209    }
210
211    impl MediaTask for TestMediaTask {
212        fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>> {
213            self.result.clone().boxed()
214        }
215
216        fn stop(&mut self) -> Result<(), MediaTaskError> {
217            let _ = self.stream.lock().expect("stream lock").take();
218            {
219                let mut lock = self.sender.lock().expect("sender lock");
220                if let Some(sender) = lock.take() {
221                    let _ = sender.send(Ok(()));
222                    return Ok(());
223                }
224            }
225            // Result should be available.
226            self.finished().now_or_never().unwrap()
227        }
228    }
229
230    pub struct TestMediaTaskRunner {
231        /// The peer_id this was started with.
232        pub peer_id: PeerId,
233        /// The config that this runner will start tasks for
234        pub codec_config: MediaCodecConfig,
235        /// If this is reconfigurable
236        pub reconfigurable: bool,
237        /// If this supports delay reporting
238        pub supports_set_delay: bool,
239        /// What the delay is right now
240        pub set_delay: Option<std::time::Duration>,
241        /// The Sender that will send a clone of the started tasks to the builder.
242        pub sender: mpsc::Sender<TestMediaTask>,
243    }
244
245    impl MediaTaskRunner for TestMediaTaskRunner {
246        fn start(
247            &mut self,
248            stream: MediaStream,
249            _offload: Option<AudioOffloadExtProxy>,
250        ) -> Result<Box<dyn MediaTask>, MediaTaskError> {
251            let task = TestMediaTask::new(
252                self.peer_id.clone(),
253                self.codec_config.clone(),
254                stream,
255                self.set_delay.unwrap_or(Duration::ZERO),
256            );
257            // Don't particularly care if the receiver got dropped.
258            let _ = self.sender.try_send(task.clone());
259            Ok(Box::new(task))
260        }
261
262        fn set_delay(&mut self, delay: std::time::Duration) -> Result<(), MediaTaskError> {
263            if self.supports_set_delay {
264                self.set_delay = Some(delay);
265                Ok(())
266            } else {
267                Err(MediaTaskError::NotSupported)
268            }
269        }
270
271        fn reconfigure(&mut self, config: &MediaCodecConfig) -> Result<(), MediaTaskError> {
272            if self.reconfigurable {
273                self.codec_config = config.clone();
274                Ok(())
275            } else {
276                Err(MediaTaskError::NotSupported)
277            }
278        }
279    }
280
281    /// A TestMediaTask expects to be configured once, and then started and stopped as appropriate.
282    /// It will Error if started again while started or stopped while stopped, or if it was
283    /// configured multiple times.
284    pub struct TestMediaTaskBuilder {
285        sender: Mutex<mpsc::Sender<TestMediaTask>>,
286        receiver: mpsc::Receiver<TestMediaTask>,
287        reconfigurable: bool,
288        supports_set_delay: bool,
289        configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
290        direction: EndpointType,
291    }
292
293    impl TestMediaTaskBuilder {
294        pub fn new() -> Self {
295            let (sender, receiver) = mpsc::channel(5);
296            Self {
297                sender: Mutex::new(sender),
298                receiver,
299                reconfigurable: false,
300                supports_set_delay: false,
301                configs: Ok(vec![crate::codec::MediaCodecConfig::min_sbc()]),
302                direction: EndpointType::Sink,
303            }
304        }
305
306        pub fn with_configs(
307            &mut self,
308            configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
309        ) -> &mut Self {
310            self.configs = configs;
311            self
312        }
313
314        pub fn with_direction(&mut self, direction: EndpointType) -> &mut Self {
315            self.direction = direction;
316            self
317        }
318
319        pub fn new_reconfigurable() -> Self {
320            Self { reconfigurable: true, ..Self::new() }
321        }
322
323        pub fn new_delayable() -> Self {
324            Self { supports_set_delay: true, ..Self::new() }
325        }
326
327        /// Returns a type that implements MediaTaskBuilder.  When a MediaTask is built using
328        /// configure(), it will be available from `next_task`.
329        pub fn builder(&self) -> Box<dyn MediaTaskBuilder> {
330            Box::new(TestMediaTaskBuilderBuilder {
331                sender: self.sender.lock().expect("locking").clone(),
332                reconfigurable: self.reconfigurable,
333                supports_set_delay: self.supports_set_delay,
334                configs: self.configs.clone(),
335                direction: self.direction,
336            })
337        }
338
339        /// Gets a future that will return a handle to the next TestMediaTask that gets started
340        /// from a Runner that was retrieved from this builder.
341        /// The TestMediaTask, can tell you when it's started and give you a handle to the MediaStream.
342        pub fn next_task(&mut self) -> impl Future<Output = Option<TestMediaTask>> + '_ {
343            self.receiver.next()
344        }
345
346        /// Expects that a task had been built, and retrieves that task, or panics.
347        #[track_caller]
348        pub fn expect_task(&mut self) -> TestMediaTask {
349            self.receiver
350                .try_next()
351                .expect("should have made a task")
352                .expect("shouldn't have dropped all senders")
353        }
354    }
355
356    #[derive(Clone)]
357    struct TestMediaTaskBuilderBuilder {
358        sender: mpsc::Sender<TestMediaTask>,
359        reconfigurable: bool,
360        supports_set_delay: bool,
361        configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
362        direction: EndpointType,
363    }
364
365    impl MediaTaskBuilder for TestMediaTaskBuilderBuilder {
366        fn configure(
367            &self,
368            peer_id: &PeerId,
369            codec_config: &MediaCodecConfig,
370        ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError> {
371            let runner = TestMediaTaskRunner {
372                peer_id: peer_id.clone(),
373                codec_config: codec_config.clone(),
374                sender: self.sender.clone(),
375                reconfigurable: self.reconfigurable,
376                supports_set_delay: self.supports_set_delay,
377                set_delay: None,
378            };
379            Ok::<Box<dyn MediaTaskRunner>, _>(Box::new(runner))
380        }
381
382        fn direction(&self) -> EndpointType {
383            self.direction
384        }
385
386        fn supported_configs(
387            &self,
388            _peer_id: &PeerId,
389            _offload: Option<AudioOffloadExtProxy>,
390        ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>> {
391            futures::future::ready(self.configs.clone()).boxed()
392        }
393    }
394}