1use super::AudioInfoLoader;
6use super::audio_fidl_handler::{Publisher, Publisher2};
7use super::types::AudioError;
8use crate::audio::types::{
9 AUDIO_STREAM_TYPE_COUNT, AudioInfo, AudioStream, AudioStreamType, SetAudioStream,
10};
11use crate::audio::{ModifiedCounters, StreamVolumeControl, create_default_modified_counters};
12use crate::{trace, trace_guard};
13use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
14
15use futures::StreamExt;
16use futures::channel::oneshot::Sender;
17use settings_common::inspect::event::{ExternalEventPublisher, SettingValuePublisher};
18use settings_common::service_context::ServiceContext;
19use settings_storage::device_storage::{DeviceStorage, DeviceStorageCompatible};
20use settings_storage::storage_factory::{DefaultLoader, StorageAccess, StorageFactory};
21use std::collections::HashMap;
22use std::rc::Rc;
23use {fuchsia_async as fasync, fuchsia_trace as ftrace};
24
25pub enum Request {
26 Get(ftrace::Id, Sender<AudioInfo>),
27 Listen(UnboundedSender<AudioInfo>),
28 Set(Vec<SetAudioStream>, ftrace::Id, Sender<Result<(), AudioError>>),
29}
30
31struct Restart;
32
33impl StorageAccess for AudioController {
34 type Storage = DeviceStorage;
35 type Data = AudioInfo;
36 const STORAGE_KEY: &'static str = AudioInfo::KEY;
37}
38
39pub(crate) struct AudioController {
40 service_context: Rc<ServiceContext>,
41 store: Rc<DeviceStorage>,
42 audio_service_connected: bool,
43 stream_volume_controls: HashMap<AudioStreamType, StreamVolumeControl>,
44 modified_counters: ModifiedCounters,
45 audio_info_loader: AudioInfoLoader,
46 publisher: Option<Publisher>,
47 publisher2: Option<Publisher2>,
48 listeners: Vec<UnboundedSender<AudioInfo>>,
49 setting_value_publisher: SettingValuePublisher<AudioInfo>,
50 external_publisher: ExternalEventPublisher,
51 restart_tx: UnboundedSender<Restart>,
52 restart_rx: Option<UnboundedReceiver<Restart>>,
53}
54
55impl AudioController {
56 pub(crate) async fn new<F>(
57 service_context: Rc<ServiceContext>,
58 audio_info_loader: AudioInfoLoader,
59 storage_factory: Rc<F>,
60 setting_value_publisher: SettingValuePublisher<AudioInfo>,
61 external_publisher: ExternalEventPublisher,
62 ) -> AudioController
63 where
64 F: StorageFactory<Storage = DeviceStorage>,
65 {
66 let store = storage_factory.get_store().await;
67 let (restart_tx, restart_rx) = mpsc::unbounded();
68 Self {
69 service_context,
70 store,
71 stream_volume_controls: HashMap::new(),
72 audio_service_connected: false,
73 modified_counters: create_default_modified_counters(),
74 audio_info_loader,
75 publisher: None,
76 publisher2: None,
77 listeners: vec![],
78 setting_value_publisher,
79 external_publisher,
80 restart_tx,
81 restart_rx: Some(restart_rx),
82 }
83 }
84
85 pub(crate) async fn restore(&mut self) -> AudioInfo {
88 let id = ftrace::Id::new();
89 trace!(id, c"restore");
90 self.restore_volume_state(id, true).await
91 }
92
93 pub(crate) async fn restore_volume_state(
96 &mut self,
97 id: ftrace::Id,
98 push_to_audio_core: bool,
99 ) -> AudioInfo {
100 let audio_info = self.store.get::<AudioInfo>().await;
101
102 trace!(id, c"update volume streams from info");
103 let new_streams = audio_info.streams.iter();
104 let _guard = trace_guard!(id, c"check and bind");
105 if let Err(e) = self.update_streams(push_to_audio_core, new_streams, id).await {
106 log::error!("Failed to update streams: {e:?}");
107 }
108 audio_info
109 }
110
111 pub(crate) async fn get_info(&self) -> AudioInfo {
112 let mut info = self.store.get::<AudioInfo>().await;
113 info.modified_counters = Some(self.modified_counters.clone());
114 info
115 }
116
117 pub(crate) fn register_publishers(&mut self, publisher: Publisher, publisher2: Publisher2) {
118 self.publisher = Some(publisher);
119 self.publisher2 = Some(publisher2);
120 }
121
122 fn register_listener(&mut self, tx: UnboundedSender<AudioInfo>) {
123 self.listeners.push(tx);
124 }
125
126 fn publish(&self, new_info: AudioInfo) {
127 let _ = self.setting_value_publisher.publish(&new_info);
128 for listener in &self.listeners {
130 let _ = listener.unbounded_send(new_info.clone());
131 }
132 if let Some(publisher) = self.publisher.as_ref() {
134 publisher.update(|info| {
135 let info = info.as_mut().unwrap();
137 let mut old_streams = info.streams.iter();
138 let new_streams = new_info.streams.iter();
139 for new_stream in new_streams {
140 let old_stream = old_streams
141 .find(|stream| stream.stream_type == new_stream.stream_type)
142 .expect("stream type should be found in existing streams");
143 if (old_stream != new_stream) && new_stream.stream_type.is_legacy() {
145 *info = new_info.clone();
146 return true;
147 }
148 }
149 false
150 });
151 }
152 if let Some(publisher2) = self.publisher2.as_ref() {
153 publisher2.update(|info| {
154 let info = info.as_mut().unwrap();
156 let mut old_streams = info.streams.iter();
157 let new_streams = new_info.streams.iter();
158 for new_stream in new_streams {
159 let old_stream = old_streams
160 .find(|stream| stream.stream_type == new_stream.stream_type)
161 .expect("stream type should be found in existing streams");
162 if old_stream != new_stream {
164 *info = new_info.clone();
165 return true;
166 }
167 }
168 false
169 });
170 }
171 }
172
173 async fn set_volume(
174 &mut self,
175 volume: Vec<SetAudioStream>,
176 id: ftrace::Id,
177 ) -> Result<AudioInfo, AudioError> {
178 let guard = trace_guard!(id, c"set volume updating counters");
179 for stream in &volume {
181 let counter = self.modified_counters.entry(stream.stream_type).or_insert(0);
184 *counter = counter.wrapping_add(1);
185 }
186 drop(guard);
187
188 self.update_volume_streams_from_new_streams(volume, true, id).await
189 }
190
191 async fn get_streams_array_from_map(
192 &self,
193 stream_map: &HashMap<AudioStreamType, StreamVolumeControl>,
194 ) -> [AudioStream; AUDIO_STREAM_TYPE_COUNT] {
195 let mut streams: [AudioStream; AUDIO_STREAM_TYPE_COUNT] =
196 self.audio_info_loader.default_value().streams;
197 for stream in &mut streams {
198 if let Some(volume_control) = stream_map.get(&stream.stream_type) {
199 *stream = volume_control.stored_stream;
200 }
201 }
202
203 streams
204 }
205
206 async fn update_streams(
207 &mut self,
208 push_to_audio_core: bool,
209 new_streams: impl Iterator<Item = &AudioStream>,
210 id: ftrace::Id,
211 ) -> Result<(), AudioError> {
212 if push_to_audio_core {
213 let guard = trace_guard!(id, c"push to core");
214 self.check_and_bind_volume_controls(
215 id,
216 self.audio_info_loader.default_value().streams.iter(),
217 )
218 .await?;
219 drop(guard);
220
221 trace!(id, c"setting core");
222 for stream in new_streams {
223 if let Some(volume_control) =
224 self.stream_volume_controls.get_mut(&stream.stream_type)
225 {
226 let _ = volume_control.set_volume(id, *stream).await?;
227 }
228 }
229 } else {
230 trace!(id, c"without push to core");
231 self.check_and_bind_volume_controls(id, new_streams).await?;
232 }
233
234 Ok(())
235 }
236
237 async fn update_volume_streams_from_new_streams(
238 &mut self,
239 streams: Vec<SetAudioStream>,
240 push_to_audio_core: bool,
241 id: ftrace::Id,
242 ) -> Result<AudioInfo, AudioError> {
243 let mut new_vec = vec![];
244 trace!(id, c"update volume streams from new streams");
245 let calculating_guard = trace_guard!(id, c"check and bind");
246 trace!(id, c"reading setting");
247 let mut stored_value = self.store.get::<AudioInfo>().await;
248 for set_stream in streams.iter() {
249 let stored_stream = stored_value
250 .streams
251 .iter()
252 .find(|stream| stream.stream_type == set_stream.stream_type)
253 .ok_or_else(|| AudioError::InvalidArgument("stream", format!("{set_stream:?}")))?;
254 new_vec.push(AudioStream {
255 stream_type: stored_stream.stream_type,
256 source: set_stream.source,
257 user_volume_level: set_stream
258 .user_volume_level
259 .unwrap_or(stored_stream.user_volume_level),
260 user_volume_muted: set_stream
261 .user_volume_muted
262 .unwrap_or(stored_stream.user_volume_muted),
263 });
264 }
265 let new_streams = new_vec.iter();
266
267 self.update_streams(push_to_audio_core, new_streams, id).await?;
268 drop(calculating_guard);
269
270 let guard = trace_guard!(id, c"updating streams and counters");
271 stored_value.streams = self.get_streams_array_from_map(&self.stream_volume_controls).await;
272 stored_value.modified_counters = Some(self.modified_counters.clone());
273 drop(guard);
274
275 let guard = trace_guard!(id, c"writing setting");
276 let write_result = self.store.write(&stored_value).await;
277 drop(guard);
278 write_result.map(|_| stored_value).map_err(AudioError::WriteFailure)
280 }
281
282 async fn check_and_bind_volume_controls(
284 &mut self,
285 id: ftrace::Id,
286 streams: impl Iterator<Item = &AudioStream>,
287 ) -> Result<(), AudioError> {
288 trace!(id, c"check and bind fn");
289 if self.audio_service_connected {
290 return Ok(());
291 }
292
293 let guard = trace_guard!(id, c"connecting to service");
294 let service_result = self
295 .service_context
296 .connect_with_publisher::<fidl_fuchsia_media::AudioCoreMarker, _>(
297 self.external_publisher.clone(),
298 )
299 .await;
300
301 let audio_service = service_result.map_err(|e| {
302 AudioError::ExternalFailure(
303 "fuchsia.media.audio",
304 "connect for audio_core".into(),
305 format!("{e:?}"),
306 )
307 })?;
308
309 drop(guard);
313 let mut stream_tuples = Vec::new();
314 for stream in streams {
315 trace!(id, c"create stream volume control");
316 let restart_tx = self.restart_tx.clone();
317
318 stream_tuples.push((
320 stream.stream_type,
321 StreamVolumeControl::create(
322 id,
323 audio_service.clone(),
324 *stream,
325 Some(Rc::new(move || {
326 if let Err(e) = restart_tx.unbounded_send(Restart) {
327 log::error!("Failed to send restart signal: {e:?}");
328 }
329 })),
330 #[cfg(test)]
331 None,
332 )
333 .await?,
334 ));
335 }
336
337 stream_tuples.into_iter().for_each(|(stream_type, stream_volume_control)| {
338 let _ = self.stream_volume_controls.insert(stream_type, stream_volume_control);
340 });
341 self.audio_service_connected = true;
342
343 Ok(())
344 }
345
346 pub(crate) async fn handle(
347 mut self,
348 mut request_rx: UnboundedReceiver<Request>,
349 ) -> fasync::Task<()> {
350 let mut restart_rx: UnboundedReceiver<Restart> = self.restart_rx.take().unwrap();
351 fasync::Task::local(async move {
352 let mut next_request = request_rx.next();
353 let mut next_restart = restart_rx.next();
354 loop {
355 futures::select! {
356 request = next_request => {
357 if let Some(request) = request {
358 self.handle_request(request).await;
359 next_request = request_rx.next();
360 }
361 }
362 restart = next_restart => {
363 if let Some(_) = restart {
364 self.handle_restart().await;
365 next_restart = restart_rx.next();
366 }
367 }
368 }
369 }
370 })
371 }
372
373 async fn handle_request(&mut self, request: Request) {
374 match request {
375 Request::Get(id, tx) => {
376 trace!(id, c"controller get");
377 let res = self.get_info().await;
378 let _ = tx.send(res);
379 }
380 Request::Listen(tx) => {
381 self.register_listener(tx);
382 }
383 Request::Set(streams, id, tx) => {
384 trace!(id, c"controller set");
385 for audio_stream in &streams {
387 if !audio_stream.has_valid_volume_level() {
388 let _ = tx.send(Err(AudioError::InvalidArgument(
389 "stream",
390 format!("{audio_stream:?}"),
391 )));
392 return;
393 }
394 }
395 let res = self.set_volume(streams, id).await.map(|mut info| {
396 info.modified_counters = Some(self.modified_counters.clone());
397 self.publish(info)
398 });
399 let _ = tx.send(res);
400 }
401 }
402 }
403
404 async fn handle_restart(&mut self) {
405 let id = ftrace::Id::new();
406 trace!(id, c"restart");
407 self.audio_service_connected = false;
408 self.stream_volume_controls.clear();
409 let _ = self.restore_volume_state(id, false).await;
410 }
411}