settings/audio/
audio_controller.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::AudioInfoLoader;
6use super::audio_fidl_handler::{Publisher, Publisher2};
7use super::types::AudioError;
8use crate::audio::types::{
9    AUDIO_STREAM_TYPE_COUNT, AudioInfo, AudioStream, AudioStreamType, SetAudioStream,
10};
11use crate::audio::{ModifiedCounters, StreamVolumeControl, create_default_modified_counters};
12use crate::{trace, trace_guard};
13use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
14
15use futures::StreamExt;
16use futures::channel::oneshot::Sender;
17use settings_common::inspect::event::{ExternalEventPublisher, SettingValuePublisher};
18use settings_common::service_context::ServiceContext;
19use settings_storage::device_storage::{DeviceStorage, DeviceStorageCompatible};
20use settings_storage::storage_factory::{DefaultLoader, StorageAccess, StorageFactory};
21use std::collections::HashMap;
22use std::rc::Rc;
23use {fuchsia_async as fasync, fuchsia_trace as ftrace};
24
25pub enum Request {
26    Get(ftrace::Id, Sender<AudioInfo>),
27    Listen(UnboundedSender<AudioInfo>),
28    Set(Vec<SetAudioStream>, ftrace::Id, Sender<Result<(), AudioError>>),
29}
30
31struct Restart;
32
33impl StorageAccess for AudioController {
34    type Storage = DeviceStorage;
35    type Data = AudioInfo;
36    const STORAGE_KEY: &'static str = AudioInfo::KEY;
37}
38
39pub(crate) struct AudioController {
40    service_context: Rc<ServiceContext>,
41    store: Rc<DeviceStorage>,
42    audio_service_connected: bool,
43    stream_volume_controls: HashMap<AudioStreamType, StreamVolumeControl>,
44    modified_counters: ModifiedCounters,
45    audio_info_loader: AudioInfoLoader,
46    publisher: Option<Publisher>,
47    publisher2: Option<Publisher2>,
48    listeners: Vec<UnboundedSender<AudioInfo>>,
49    setting_value_publisher: SettingValuePublisher<AudioInfo>,
50    external_publisher: ExternalEventPublisher,
51    restart_tx: UnboundedSender<Restart>,
52    restart_rx: Option<UnboundedReceiver<Restart>>,
53}
54
55impl AudioController {
56    pub(crate) async fn new<F>(
57        service_context: Rc<ServiceContext>,
58        audio_info_loader: AudioInfoLoader,
59        storage_factory: Rc<F>,
60        setting_value_publisher: SettingValuePublisher<AudioInfo>,
61        external_publisher: ExternalEventPublisher,
62    ) -> AudioController
63    where
64        F: StorageFactory<Storage = DeviceStorage>,
65    {
66        let store = storage_factory.get_store().await;
67        let (restart_tx, restart_rx) = mpsc::unbounded();
68        Self {
69            service_context,
70            store,
71            stream_volume_controls: HashMap::new(),
72            audio_service_connected: false,
73            modified_counters: create_default_modified_counters(),
74            audio_info_loader,
75            publisher: None,
76            publisher2: None,
77            listeners: vec![],
78            setting_value_publisher,
79            external_publisher,
80            restart_tx,
81            restart_rx: Some(restart_rx),
82        }
83    }
84
85    /// Restores the necessary dependencies' state on boot. Extracts the audio state from
86    /// persistent storage and restores it on the local state.
87    pub(crate) async fn restore(&mut self) -> AudioInfo {
88        let id = ftrace::Id::new();
89        trace!(id, c"restore");
90        self.restore_volume_state(id, true).await
91    }
92
93    /// Restores the necessary dependencies' state on boot. Extracts the audio state from
94    /// persistent storage and restores it on the local state.
95    pub(crate) async fn restore_volume_state(
96        &mut self,
97        id: ftrace::Id,
98        push_to_audio_core: bool,
99    ) -> AudioInfo {
100        let audio_info = self.store.get::<AudioInfo>().await;
101
102        trace!(id, c"update volume streams from info");
103        let new_streams = audio_info.streams.iter();
104        let _guard = trace_guard!(id, c"check and bind");
105        if let Err(e) = self.update_streams(push_to_audio_core, new_streams, id).await {
106            log::error!("Failed to update streams: {e:?}");
107        }
108        audio_info
109    }
110
111    pub(crate) async fn get_info(&self) -> AudioInfo {
112        let mut info = self.store.get::<AudioInfo>().await;
113        info.modified_counters = Some(self.modified_counters.clone());
114        info
115    }
116
117    pub(crate) fn register_publishers(&mut self, publisher: Publisher, publisher2: Publisher2) {
118        self.publisher = Some(publisher);
119        self.publisher2 = Some(publisher2);
120    }
121
122    fn register_listener(&mut self, tx: UnboundedSender<AudioInfo>) {
123        self.listeners.push(tx);
124    }
125
126    fn publish(&self, new_info: AudioInfo) {
127        let _ = self.setting_value_publisher.publish(&new_info);
128        // Listeners always get updated.
129        for listener in &self.listeners {
130            let _ = listener.unbounded_send(new_info.clone());
131        }
132        // Watch subscribers only receive updates to streams.
133        if let Some(publisher) = self.publisher.as_ref() {
134            publisher.update(|info| {
135                // Unwrap ok because info is always initialized.
136                let info = info.as_mut().unwrap();
137                let mut old_streams = info.streams.iter();
138                let new_streams = new_info.streams.iter();
139                for new_stream in new_streams {
140                    let old_stream = old_streams
141                        .find(|stream| stream.stream_type == new_stream.stream_type)
142                        .expect("stream type should be found in existing streams");
143                    // Watch() notifies upon changes to "legacy" stream types.
144                    if (old_stream != new_stream) && new_stream.stream_type.is_legacy() {
145                        *info = new_info.clone();
146                        return true;
147                    }
148                }
149                false
150            });
151        }
152        if let Some(publisher2) = self.publisher2.as_ref() {
153            publisher2.update(|info| {
154                // Unwrap ok because info is always initialized.
155                let info = info.as_mut().unwrap();
156                let mut old_streams = info.streams.iter();
157                let new_streams = new_info.streams.iter();
158                for new_stream in new_streams {
159                    let old_stream = old_streams
160                        .find(|stream| stream.stream_type == new_stream.stream_type)
161                        .expect("stream type should be found in existing streams");
162                    // Watch2() notifies upon changes to any stream type.
163                    if old_stream != new_stream {
164                        *info = new_info.clone();
165                        return true;
166                    }
167                }
168                false
169            });
170        }
171    }
172
173    async fn set_volume(
174        &mut self,
175        volume: Vec<SetAudioStream>,
176        id: ftrace::Id,
177    ) -> Result<AudioInfo, AudioError> {
178        let guard = trace_guard!(id, c"set volume updating counters");
179        // Update counters for changed streams.
180        for stream in &volume {
181            // We don't care what the value of the counter is, just that it is different from the
182            // previous value. We use wrapping_add to avoid eventual overflow of the counter.
183            let counter = self.modified_counters.entry(stream.stream_type).or_insert(0);
184            *counter = counter.wrapping_add(1);
185        }
186        drop(guard);
187
188        self.update_volume_streams_from_new_streams(volume, true, id).await
189    }
190
191    async fn get_streams_array_from_map(
192        &self,
193        stream_map: &HashMap<AudioStreamType, StreamVolumeControl>,
194    ) -> [AudioStream; AUDIO_STREAM_TYPE_COUNT] {
195        let mut streams: [AudioStream; AUDIO_STREAM_TYPE_COUNT] =
196            self.audio_info_loader.default_value().streams;
197        for stream in &mut streams {
198            if let Some(volume_control) = stream_map.get(&stream.stream_type) {
199                *stream = volume_control.stored_stream;
200            }
201        }
202
203        streams
204    }
205
206    async fn update_streams(
207        &mut self,
208        push_to_audio_core: bool,
209        new_streams: impl Iterator<Item = &AudioStream>,
210        id: ftrace::Id,
211    ) -> Result<(), AudioError> {
212        if push_to_audio_core {
213            let guard = trace_guard!(id, c"push to core");
214            self.check_and_bind_volume_controls(
215                id,
216                self.audio_info_loader.default_value().streams.iter(),
217            )
218            .await?;
219            drop(guard);
220
221            trace!(id, c"setting core");
222            for stream in new_streams {
223                if let Some(volume_control) =
224                    self.stream_volume_controls.get_mut(&stream.stream_type)
225                {
226                    let _ = volume_control.set_volume(id, *stream).await?;
227                }
228            }
229        } else {
230            trace!(id, c"without push to core");
231            self.check_and_bind_volume_controls(id, new_streams).await?;
232        }
233
234        Ok(())
235    }
236
237    async fn update_volume_streams_from_new_streams(
238        &mut self,
239        streams: Vec<SetAudioStream>,
240        push_to_audio_core: bool,
241        id: ftrace::Id,
242    ) -> Result<AudioInfo, AudioError> {
243        let mut new_vec = vec![];
244        trace!(id, c"update volume streams from new streams");
245        let calculating_guard = trace_guard!(id, c"check and bind");
246        trace!(id, c"reading setting");
247        let mut stored_value = self.store.get::<AudioInfo>().await;
248        for set_stream in streams.iter() {
249            let stored_stream = stored_value
250                .streams
251                .iter()
252                .find(|stream| stream.stream_type == set_stream.stream_type)
253                .ok_or_else(|| AudioError::InvalidArgument("stream", format!("{set_stream:?}")))?;
254            new_vec.push(AudioStream {
255                stream_type: stored_stream.stream_type,
256                source: set_stream.source,
257                user_volume_level: set_stream
258                    .user_volume_level
259                    .unwrap_or(stored_stream.user_volume_level),
260                user_volume_muted: set_stream
261                    .user_volume_muted
262                    .unwrap_or(stored_stream.user_volume_muted),
263            });
264        }
265        let new_streams = new_vec.iter();
266
267        self.update_streams(push_to_audio_core, new_streams, id).await?;
268        drop(calculating_guard);
269
270        let guard = trace_guard!(id, c"updating streams and counters");
271        stored_value.streams = self.get_streams_array_from_map(&self.stream_volume_controls).await;
272        stored_value.modified_counters = Some(self.modified_counters.clone());
273        drop(guard);
274
275        let guard = trace_guard!(id, c"writing setting");
276        let write_result = self.store.write(&stored_value).await;
277        drop(guard);
278        // Always return the stored value
279        write_result.map(|_| stored_value).map_err(AudioError::WriteFailure)
280    }
281
282    /// Populates the local state with the given `streams` and binds it to the audio core service.
283    async fn check_and_bind_volume_controls(
284        &mut self,
285        id: ftrace::Id,
286        streams: impl Iterator<Item = &AudioStream>,
287    ) -> Result<(), AudioError> {
288        trace!(id, c"check and bind fn");
289        if self.audio_service_connected {
290            return Ok(());
291        }
292
293        let guard = trace_guard!(id, c"connecting to service");
294        let service_result = self
295            .service_context
296            .connect_with_publisher::<fidl_fuchsia_media::AudioCoreMarker, _>(
297                self.external_publisher.clone(),
298            )
299            .await;
300
301        let audio_service = service_result.map_err(|e| {
302            AudioError::ExternalFailure(
303                "fuchsia.media.audio",
304                "connect for audio_core".into(),
305                format!("{e:?}"),
306            )
307        })?;
308
309        // The stream_volume_controls are generated in two steps instead of
310        // one so that if one of the bindings fails during the first loop,
311        // none of the streams are modified.
312        drop(guard);
313        let mut stream_tuples = Vec::new();
314        for stream in streams {
315            trace!(id, c"create stream volume control");
316            let restart_tx = self.restart_tx.clone();
317
318            // Generate a tuple with stream type and StreamVolumeControl.
319            stream_tuples.push((
320                stream.stream_type,
321                StreamVolumeControl::create(
322                    id,
323                    audio_service.clone(),
324                    *stream,
325                    Some(Rc::new(move || {
326                        if let Err(e) = restart_tx.unbounded_send(Restart) {
327                            log::error!("Failed to send restart signal: {e:?}");
328                        }
329                    })),
330                    #[cfg(test)]
331                    None,
332                )
333                .await?,
334            ));
335        }
336
337        stream_tuples.into_iter().for_each(|(stream_type, stream_volume_control)| {
338            // Ignore the previous value, if any.
339            let _ = self.stream_volume_controls.insert(stream_type, stream_volume_control);
340        });
341        self.audio_service_connected = true;
342
343        Ok(())
344    }
345
346    pub(crate) async fn handle(
347        mut self,
348        mut request_rx: UnboundedReceiver<Request>,
349    ) -> fasync::Task<()> {
350        let mut restart_rx: UnboundedReceiver<Restart> = self.restart_rx.take().unwrap();
351        fasync::Task::local(async move {
352            let mut next_request = request_rx.next();
353            let mut next_restart = restart_rx.next();
354            loop {
355                futures::select! {
356                    request = next_request => {
357                        if let Some(request) = request {
358                            self.handle_request(request).await;
359                            next_request = request_rx.next();
360                        }
361                    }
362                    restart = next_restart => {
363                        if let Some(_) = restart {
364                            self.handle_restart().await;
365                            next_restart = restart_rx.next();
366                        }
367                    }
368                }
369            }
370        })
371    }
372
373    async fn handle_request(&mut self, request: Request) {
374        match request {
375            Request::Get(id, tx) => {
376                trace!(id, c"controller get");
377                let res = self.get_info().await;
378                let _ = tx.send(res);
379            }
380            Request::Listen(tx) => {
381                self.register_listener(tx);
382            }
383            Request::Set(streams, id, tx) => {
384                trace!(id, c"controller set");
385                // Validate volume contains valid volume level numbers.
386                for audio_stream in &streams {
387                    if !audio_stream.has_valid_volume_level() {
388                        let _ = tx.send(Err(AudioError::InvalidArgument(
389                            "stream",
390                            format!("{audio_stream:?}"),
391                        )));
392                        return;
393                    }
394                }
395                let res = self.set_volume(streams, id).await.map(|mut info| {
396                    info.modified_counters = Some(self.modified_counters.clone());
397                    self.publish(info)
398                });
399                let _ = tx.send(res);
400            }
401        }
402    }
403
404    async fn handle_restart(&mut self) {
405        let id = ftrace::Id::new();
406        trace!(id, c"restart");
407        self.audio_service_connected = false;
408        self.stream_volume_controls.clear();
409        let _ = self.restore_volume_state(id, false).await;
410    }
411}