1use bt_avdtp::{EndpointType, MediaStream};
6use dyn_clone::DynClone;
7use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
8use fuchsia_bluetooth::types::PeerId;
9use fuchsia_inspect::Node;
10use fuchsia_inspect_derive::AttachError;
11use futures::future::{BoxFuture, Shared};
12use futures::FutureExt;
13use std::time::Duration;
14use thiserror::Error;
15
16use crate::codec::MediaCodecConfig;
17
/// Errors reported by the media task traits in this file ([`MediaTaskBuilder`],
/// [`MediaTaskRunner`] and [`MediaTask`]).
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a wildcard arm
/// when matching on it.
#[derive(Debug, Error, Clone)]
#[non_exhaustive]
pub enum MediaTaskError {
    /// The requested operation or configuration is not supported.
    #[error("Operation or configuration not supported")]
    NotSupported,
    /// The peer closed the media stream.
    #[error("Peer closed the media stream")]
    PeerClosed,
    /// Resources required by the task are already in use.
    #[error("Resources needed are already being used")]
    ResourcesInUse,
    /// Any other failure, described by the contained message.
    /// `bt_avdtp::Error` values are converted into this variant.
    #[error("Other Media Task Error: {}", _0)]
    Other(String),
}
30
31impl From<bt_avdtp::Error> for MediaTaskError {
32 fn from(error: bt_avdtp::Error) -> Self {
33 Self::Other(format!("AVDTP Error: {}", error))
34 }
35}
36
/// Factory for [`MediaTaskRunner`]s.
///
/// Implementors must be `Send + Sync` and cloneable through `dyn_clone`, which is what allows
/// a `Box<dyn MediaTaskBuilder>` to be cloned (see the `clone_trait_object!` invocation after
/// the trait).
pub trait MediaTaskBuilder: Send + Sync + DynClone {
    /// Creates a runner for `peer_id` using the codec parameters in `codec_config`.
    ///
    /// # Errors
    ///
    /// Returns a [`MediaTaskError`] when no runner can be produced.
    /// NOTE(review): presumably `MediaTaskError::NotSupported` for an unsupported
    /// configuration — confirm against the implementations.
    fn configure(
        &self,
        peer_id: &PeerId,
        codec_config: &MediaCodecConfig,
    ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError>;

    /// Returns the [`EndpointType`] (direction) associated with this builder.
    fn direction(&self) -> EndpointType;

    /// Returns a future resolving to the codec configurations available for `peer_id`, or to
    /// a [`MediaTaskError`] if they could not be determined.
    ///
    /// `offload` is an optional `AudioOffloadExtProxy`.
    /// NOTE(review): presumably used to query offload capabilities for this peer — confirm.
    ///
    /// The returned future is `'static`, so it does not borrow from `self` or `peer_id`.
    fn supported_configs(
        &self,
        peer_id: &PeerId,
        offload: Option<AudioOffloadExtProxy>,
    ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>>;
}

// Implements `Clone` for `Box<dyn MediaTaskBuilder>`.
dyn_clone::clone_trait_object!(MediaTaskBuilder);
72
/// Starts [`MediaTask`]s on media streams.
///
/// Runners are produced by [`MediaTaskBuilder::configure`]. Only `start` is required; every
/// other method has a default implementation that reports the feature as unavailable.
pub trait MediaTaskRunner: Send {
    /// Starts a media task that takes ownership of `stream`.
    ///
    /// `offload` is an optional `AudioOffloadExtProxy`.
    /// NOTE(review): presumably lets the task use audio offload when available — confirm.
    ///
    /// # Errors
    ///
    /// Returns a [`MediaTaskError`] if the task could not be started.
    fn start(
        &mut self,
        stream: MediaStream,
        offload: Option<AudioOffloadExtProxy>,
    ) -> Result<Box<dyn MediaTask>, MediaTaskError>;

    /// Requests that the runner switch to a new codec configuration.
    ///
    /// The default implementation always returns [`MediaTaskError::NotSupported`].
    fn reconfigure(&mut self, _config: &MediaCodecConfig) -> Result<(), MediaTaskError> {
        Err(MediaTaskError::NotSupported)
    }

    /// Sets a delay on the runner.
    /// NOTE(review): whether this is a reported or an applied delay is not visible here — confirm.
    ///
    /// The default implementation always returns [`MediaTaskError::NotSupported`].
    fn set_delay(&mut self, _delay: Duration) -> Result<(), MediaTaskError> {
        Err(MediaTaskError::NotSupported)
    }

    /// Attaches the runner to the inspect `Node` `_parent` under `_name`.
    ///
    /// The default implementation always fails with an `AttachError` built from the message
    /// "attach not implemented".
    fn iattach(&mut self, _parent: &Node, _name: &str) -> Result<(), AttachError> {
        Err("attach not implemented".into())
    }
}
111
/// A media task, as returned by [`MediaTaskRunner::start`].
pub trait MediaTask: Send {
    /// Returns a `'static` future that resolves to the task's final result.
    fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>>;

    /// Returns the task's result if it is available right now, without waiting.
    ///
    /// The default implementation polls the future from `finished()` exactly once and
    /// returns `None` if it is not ready.
    fn result(&mut self) -> Option<Result<(), MediaTaskError>> {
        self.finished().now_or_never()
    }

    /// Stops the task.
    /// NOTE(review): the exact meaning of the returned result (e.g. for an already finished
    /// task) is defined by the implementor — confirm against implementations.
    fn stop(&mut self) -> Result<(), MediaTaskError>;
}
132
133pub mod tests {
134 use super::*;
135
136 use futures::channel::{mpsc, oneshot};
137 use futures::stream::StreamExt;
138 use futures::{Future, TryFutureExt};
139 use std::fmt;
140 use std::sync::{Arc, Mutex};
141
/// Test double for a [`MediaTask`].
///
/// Clones share the same stream slot, result sender and result future, so one clone can be
/// handed out as the running task while another is used to observe or end it.
#[derive(Clone)]
pub struct TestMediaTask {
    /// Peer this task was created for.
    pub peer_id: PeerId,
    /// Codec configuration this task was created with.
    pub codec_config: MediaCodecConfig,
    /// The media stream; `Some` from construction until `stop()` or `end_prematurely()`
    /// takes it out.
    pub stream: Arc<Mutex<Option<MediaStream>>>,
    /// Sending half of the result channel; taken (leaving `None`) by the first `stop()` or
    /// `end_prematurely()` call.
    sender: Arc<Mutex<Option<oneshot::Sender<Result<(), MediaTaskError>>>>>,
    /// Shared future yielding whatever was sent on `sender`, or a
    /// `MediaTaskError::Other("Nothing sent")` error if the sender was dropped without sending.
    result: Shared<BoxFuture<'static, Result<(), MediaTaskError>>>,
    /// Delay value supplied when the task was constructed.
    pub delay: Duration,
}
157
158 impl fmt::Debug for TestMediaTask {
159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160 f.debug_struct("TestMediaTask")
161 .field("peer_id", &self.peer_id)
162 .field("codec_config", &self.codec_config)
163 .field("result", &self.result.clone().now_or_never())
164 .finish()
165 }
166 }
167
168 impl TestMediaTask {
169 pub fn new(
170 peer_id: PeerId,
171 codec_config: MediaCodecConfig,
172 stream: MediaStream,
173 delay: Duration,
174 ) -> Self {
175 let (sender, receiver) = oneshot::channel();
176 let result = receiver
177 .map_ok_or_else(
178 |_err| Err(MediaTaskError::Other(format!("Nothing sent"))),
179 |result| result,
180 )
181 .boxed()
182 .shared();
183 Self {
184 peer_id,
185 codec_config,
186 stream: Arc::new(Mutex::new(Some(stream))),
187 sender: Arc::new(Mutex::new(Some(sender))),
188 result,
189 delay,
190 }
191 }
192
193 pub fn is_started(&self) -> bool {
195 self.stream.lock().expect("stream lock").is_some()
197 }
198
199 pub fn end_prematurely(&self, task_result: Option<Result<(), MediaTaskError>>) {
202 let _removed_stream = self.stream.lock().expect("mutex").take();
203 let mut lock = self.sender.lock().expect("sender lock");
204 let sender = lock.take();
205 if let (Some(result), Some(sender)) = (task_result, sender) {
206 sender.send(result).expect("send ok");
207 }
208 }
209 }
210
211 impl MediaTask for TestMediaTask {
212 fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>> {
213 self.result.clone().boxed()
214 }
215
216 fn stop(&mut self) -> Result<(), MediaTaskError> {
217 let _ = self.stream.lock().expect("stream lock").take();
218 {
219 let mut lock = self.sender.lock().expect("sender lock");
220 if let Some(sender) = lock.take() {
221 let _ = sender.send(Ok(()));
222 return Ok(());
223 }
224 }
225 self.finished().now_or_never().unwrap()
227 }
228 }
229
/// Test double for a [`MediaTaskRunner`]; every task it starts is a [`TestMediaTask`].
pub struct TestMediaTaskRunner {
    /// Peer passed to every task this runner starts.
    pub peer_id: PeerId,
    /// Codec configuration passed to every task; replaced by a successful `reconfigure`.
    pub codec_config: MediaCodecConfig,
    /// Whether `reconfigure` succeeds (`true`) or returns `NotSupported` (`false`).
    pub reconfigurable: bool,
    /// Whether `set_delay` succeeds (`true`) or returns `NotSupported` (`false`).
    pub supports_set_delay: bool,
    /// Delay recorded by the last successful `set_delay`; `None` until one is set.
    /// Started tasks receive this value, or a zero duration when it is `None`.
    pub set_delay: Option<std::time::Duration>,
    /// Channel on which a clone of every started task is sent.
    pub sender: mpsc::Sender<TestMediaTask>,
}
244
245 impl MediaTaskRunner for TestMediaTaskRunner {
246 fn start(
247 &mut self,
248 stream: MediaStream,
249 _offload: Option<AudioOffloadExtProxy>,
250 ) -> Result<Box<dyn MediaTask>, MediaTaskError> {
251 let task = TestMediaTask::new(
252 self.peer_id.clone(),
253 self.codec_config.clone(),
254 stream,
255 self.set_delay.unwrap_or(Duration::ZERO),
256 );
257 let _ = self.sender.try_send(task.clone());
259 Ok(Box::new(task))
260 }
261
262 fn set_delay(&mut self, delay: std::time::Duration) -> Result<(), MediaTaskError> {
263 if self.supports_set_delay {
264 self.set_delay = Some(delay);
265 Ok(())
266 } else {
267 Err(MediaTaskError::NotSupported)
268 }
269 }
270
271 fn reconfigure(&mut self, config: &MediaCodecConfig) -> Result<(), MediaTaskError> {
272 if self.reconfigurable {
273 self.codec_config = config.clone();
274 Ok(())
275 } else {
276 Err(MediaTaskError::NotSupported)
277 }
278 }
279 }
280
/// Test harness that hands out [`MediaTaskBuilder`]s and receives a clone of every
/// [`TestMediaTask`] started through them.
pub struct TestMediaTaskBuilder {
    /// Sending half of the task channel; cloned into every builder created by `builder()`.
    /// NOTE(review): presumably wrapped in a `Mutex` so it can be cloned through `&self`
    /// while keeping this type `Sync` — confirm.
    sender: Mutex<mpsc::Sender<TestMediaTask>>,
    /// Receiving half of the task channel, read by `next_task()` and `expect_task()`.
    receiver: mpsc::Receiver<TestMediaTask>,
    /// Copied into created builders; controls whether their runners accept `reconfigure`.
    reconfigurable: bool,
    /// Copied into created builders; controls whether their runners accept `set_delay`.
    supports_set_delay: bool,
    /// Result that created builders return from `supported_configs`.
    configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
    /// Direction that created builders report from `direction()`.
    direction: EndpointType,
}
292
293 impl TestMediaTaskBuilder {
294 pub fn new() -> Self {
295 let (sender, receiver) = mpsc::channel(5);
296 Self {
297 sender: Mutex::new(sender),
298 receiver,
299 reconfigurable: false,
300 supports_set_delay: false,
301 configs: Ok(vec![crate::codec::MediaCodecConfig::min_sbc()]),
302 direction: EndpointType::Sink,
303 }
304 }
305
306 pub fn with_configs(
307 &mut self,
308 configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
309 ) -> &mut Self {
310 self.configs = configs;
311 self
312 }
313
314 pub fn with_direction(&mut self, direction: EndpointType) -> &mut Self {
315 self.direction = direction;
316 self
317 }
318
319 pub fn new_reconfigurable() -> Self {
320 Self { reconfigurable: true, ..Self::new() }
321 }
322
323 pub fn new_delayable() -> Self {
324 Self { supports_set_delay: true, ..Self::new() }
325 }
326
327 pub fn builder(&self) -> Box<dyn MediaTaskBuilder> {
330 Box::new(TestMediaTaskBuilderBuilder {
331 sender: self.sender.lock().expect("locking").clone(),
332 reconfigurable: self.reconfigurable,
333 supports_set_delay: self.supports_set_delay,
334 configs: self.configs.clone(),
335 direction: self.direction,
336 })
337 }
338
339 pub fn next_task(&mut self) -> impl Future<Output = Option<TestMediaTask>> + '_ {
343 self.receiver.next()
344 }
345
346 #[track_caller]
348 pub fn expect_task(&mut self) -> TestMediaTask {
349 self.receiver
350 .try_next()
351 .expect("should have made a task")
352 .expect("shouldn't have dropped all senders")
353 }
354 }
355
/// The [`MediaTaskBuilder`] implementation handed out by `TestMediaTaskBuilder::builder()`.
///
/// Holds a snapshot of the harness settings taken when it was created.
#[derive(Clone)]
struct TestMediaTaskBuilderBuilder {
    /// Channel sender cloned into each runner this builder configures.
    sender: mpsc::Sender<TestMediaTask>,
    /// Whether configured runners accept `reconfigure`.
    reconfigurable: bool,
    /// Whether configured runners accept `set_delay`.
    supports_set_delay: bool,
    /// Result returned from `supported_configs`.
    configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
    /// Direction returned from `direction()`.
    direction: EndpointType,
}
364
365 impl MediaTaskBuilder for TestMediaTaskBuilderBuilder {
366 fn configure(
367 &self,
368 peer_id: &PeerId,
369 codec_config: &MediaCodecConfig,
370 ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError> {
371 let runner = TestMediaTaskRunner {
372 peer_id: peer_id.clone(),
373 codec_config: codec_config.clone(),
374 sender: self.sender.clone(),
375 reconfigurable: self.reconfigurable,
376 supports_set_delay: self.supports_set_delay,
377 set_delay: None,
378 };
379 Ok::<Box<dyn MediaTaskRunner>, _>(Box::new(runner))
380 }
381
382 fn direction(&self) -> EndpointType {
383 self.direction
384 }
385
386 fn supported_configs(
387 &self,
388 _peer_id: &PeerId,
389 _offload: Option<AudioOffloadExtProxy>,
390 ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>> {
391 futures::future::ready(self.configs.clone()).boxed()
392 }
393 }
394}