sensors_lib/
sensor_manager.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::client::*;
5use crate::playback::*;
6use crate::sensor_update_sender::SensorUpdateSender;
7use crate::service_watcher::*;
8use crate::utils::*;
9use anyhow::{Context as _, Error};
10use fidl::endpoints::RequestStream;
11use fidl_fuchsia_hardware_sensors::{
12    PlaybackSourceConfig, {self as driver_fidl},
13};
14use fidl_fuchsia_sensors::*;
15use fidl_fuchsia_sensors_types::*;
16use fuchsia_component::client as fclient;
17use fuchsia_component::server::ServiceFs;
18use futures::lock::Mutex;
19use futures_util::{StreamExt, TryStreamExt};
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22
23pub type SensorId = i32;
24
25#[derive(Debug, Clone)]
26pub struct SensorManager {
27    pub(crate) sensors: HashMap<SensorId, Sensor>,
28    driver_proxies: Vec<driver_fidl::DriverProxy>,
29    playback: Option<Playback>,
30    clients: HashSet<Client>,
31    pub(crate) update_sender: SensorUpdateSender,
32}
33
34#[derive(Debug, Clone)]
35pub struct Sensor {
36    pub(crate) driver: driver_fidl::DriverProxy,
37    pub(crate) info: SensorInfo,
38    // A subset of SensorManager::clients.
39    pub(crate) clients: HashSet<Client>,
40}
41
42enum IncomingRequest {
43    SensorManager(ManagerRequestStream),
44}
45
46async fn handle_sensors_request(
47    request: ManagerRequest,
48    manager: &Arc<Mutex<SensorManager>>,
49    client: &Client,
50) -> anyhow::Result<()> {
51    let mut manager = manager.lock().await;
52    match request {
53        ManagerRequest::GetSensorsList { responder } => {
54            let _ = responder.send(&manager.get_sensors_list().await);
55        }
56        ManagerRequest::ConfigureSensorRates { id, sensor_rate_config, responder } => {
57            let _ = responder.send(manager.configure_sensor_rates(id, sensor_rate_config).await);
58        }
59        ManagerRequest::Activate { id, responder } => {
60            let _ = responder.send(manager.activate(id, client.clone()).await);
61        }
62        ManagerRequest::Deactivate { id, responder } => {
63            let _ = responder.send(manager.deactivate(id, client).await);
64        }
65        ManagerRequest::ConfigurePlayback { source_config, responder } => {
66            let _ = responder.send(manager.configure_playback(source_config).await);
67        }
68        ManagerRequest::_UnknownMethod { ordinal, .. } => {
69            log::warn!("ManagerRequest::_UnknownMethod with ordinal {}", ordinal);
70        }
71    }
72
73    manager.update_sender.update_sensor_map(manager.sensors.clone()).await;
74
75    Ok(())
76}
77
78async fn handle_sensor_manager_request_stream(
79    mut stream: ManagerRequestStream,
80    manager: Arc<Mutex<SensorManager>>,
81    client: Client,
82) -> Result<(), Error> {
83    while let Some(request) =
84        stream.try_next().await.context("Error handling SensorManager events")?
85    {
86        handle_sensors_request(request, &manager, &client)
87            .await
88            .expect("Error handling sensor request");
89    }
90    Ok(())
91}
92
93impl SensorManager {
94    pub fn new(update_sender: SensorUpdateSender, playback: Option<Playback>) -> Self {
95        let sensors = HashMap::new();
96        let clients = HashSet::new();
97        let driver_proxies = Vec::new();
98
99        Self { sensors, driver_proxies, playback, clients, update_sender }
100    }
101
102    async fn get_sensors_list(&mut self) -> Vec<SensorInfo> {
103        self.populate_sensors().await;
104        let fidl_sensors: Vec<SensorInfo> =
105            self.sensors.values().into_iter().map(|x| x.info.clone()).collect();
106        if fidl_sensors.is_empty() {
107            log::warn!("Failed to get any sensors from driver. Sending empty list");
108        }
109
110        fidl_sensors
111    }
112
113    async fn activate(&mut self, id: SensorId, client: Client) -> Result<(), ActivateSensorError> {
114        if let Some(sensor) = self.sensors.get_mut(&id) {
115            let res = sensor.driver.activate_sensor(id).await;
116            if let Err(e) = res {
117                log::warn!("Error while activating sensor: {:#?}", e);
118                Err(ActivateSensorError::DriverUnavailable)
119            } else {
120                sensor.clients.insert(client);
121                Ok(())
122            }
123        } else {
124            log::warn!("Received request to activate unknown sensor id: {}", id);
125            Err(ActivateSensorError::InvalidSensorId)
126        }
127    }
128
129    async fn deactivate(
130        &mut self,
131        id: SensorId,
132        client: &Client,
133    ) -> Result<(), DeactivateSensorError> {
134        let mut response: Result<(), DeactivateSensorError> = Ok(());
135        if let Some(sensor) = self.sensors.get_mut(&id) {
136            // If this is the last subscriber for this sensor, deactivate it.
137            if sensor.clients.len() == 1 {
138                if let Err(e) = sensor.driver.deactivate_sensor(id).await {
139                    log::warn!("Error while deactivating sensor: {:#?}", e);
140                    response = Err(DeactivateSensorError::DriverUnavailable);
141                }
142            } else {
143                if !sensor.clients.is_empty() {
144                    log::info!(
145                        "Unsubscribing client from sensor {:#?}, but there are other subscribers.",
146                        id,
147                    );
148                }
149            }
150            sensor.clients.remove(&client);
151        } else {
152            log::warn!("Received request to deactivate unknown sensor id: {}", id);
153            response = Err(DeactivateSensorError::InvalidSensorId);
154        }
155
156        response
157    }
158
159    async fn configure_sensor_rates(
160        &mut self,
161        id: SensorId,
162        sensor_rate_config: SensorRateConfig,
163    ) -> Result<(), ConfigureSensorRateError> {
164        if let Some(sensor) = self.sensors.get(&id) {
165            match sensor.driver.configure_sensor_rate(id, &sensor_rate_config).await {
166                Ok(Ok(())) => Ok(()),
167                Ok(Err(driver_fidl::ConfigureSensorRateError::InvalidSensorId)) => {
168                    log::warn!(
169                        "Received ConfigureSensorRates request for unknown sensor id: {}",
170                        id
171                    );
172                    Err(ConfigureSensorRateError::InvalidSensorId)
173                }
174                Ok(Err(driver_fidl::ConfigureSensorRateError::InvalidConfig)) => {
175                    log::warn!(
176                        "Received ConfigureSensorRates request for invalid config: {:#?}",
177                        sensor_rate_config
178                    );
179                    Err(ConfigureSensorRateError::InvalidConfig)
180                }
181                Err(e) => {
182                    log::warn!("Error while configuring sensor rates: {:#?}", e);
183                    Err(ConfigureSensorRateError::DriverUnavailable)
184                }
185                Ok(Err(_)) => unreachable!(),
186            }
187        } else {
188            log::warn!("Received ConfigureSensorRates request for unknown sensor id: {}", id);
189            Err(ConfigureSensorRateError::InvalidSensorId)
190        }
191    }
192
193    async fn configure_playback(
194        &mut self,
195        source_config: PlaybackSourceConfig,
196    ) -> Result<(), ConfigurePlaybackError> {
197        let mut response: Result<(), ConfigurePlaybackError> = Ok(());
198
199        if let Some(mut playback) = self.playback.clone() {
200            let res = playback.playback_proxy.configure_playback(&source_config).await;
201
202            match res {
203                Ok(Ok(())) => {
204                    self.sensors.extend(playback.get_sensors_from_config(source_config).await);
205
206                    // Don't add the playback driver proxy if playback was previously
207                    // configured.
208                    if !self.driver_proxies.iter().any(|x| playback.is_playback_driver_proxy(x)) {
209                        self.driver_proxies.push(playback.driver_proxy.clone());
210                    }
211                    response = Ok(());
212                }
213                Err(e) => {
214                    log::warn!("Error while configuring sensor playback: {:#?}", e);
215                    response = Err(ConfigurePlaybackError::PlaybackUnavailable);
216                }
217                Ok(Err(e)) => {
218                    response = Err(from_driver_playback_error(e));
219                }
220            }
221
222            if !response.is_ok() {
223                // Remove the playback driver proxy and its sensors.
224                self.driver_proxies.retain(|x| !playback.is_playback_driver_proxy(&x));
225                self.sensors.retain(|id, _| !playback.playback_sensor_ids.contains(id));
226                playback.playback_sensor_ids.clear();
227            } else {
228                playback.configured = response.is_ok();
229                self.playback = Some(playback);
230            }
231        }
232
233        response
234    }
235
236    pub(crate) async fn add_instance(&mut self, driver_proxy: driver_fidl::DriverProxy) {
237        let event_stream = driver_proxy.take_event_stream();
238        self.driver_proxies.push(driver_proxy);
239        self.populate_sensors().await;
240        self.update_sender.update_sensor_map(self.sensors.clone()).await;
241        self.update_sender.add_event_stream(event_stream).await;
242    }
243
244    async fn populate_sensors(&mut self) {
245        let mut sensors = HashMap::new();
246        for proxy in self.driver_proxies.clone() {
247            if let Ok(driver_sensors) = proxy.get_sensors_list().await {
248                for sensor in driver_sensors {
249                    if is_sensor_valid(&sensor) {
250                        let id = sensor.sensor_id.expect("sensor_id");
251                        let mut clients: HashSet<Client> = HashSet::new();
252                        if let Some(sensor) = self.sensors.get(&id) {
253                            clients = sensor.clients.clone();
254                        }
255
256                        sensors.insert(id, Sensor { driver: proxy.clone(), info: sensor, clients });
257                    }
258                }
259            }
260        }
261
262        self.sensors = sensors;
263    }
264
265    async fn start_service_watcher(&self, manager: Arc<Mutex<SensorManager>>) -> Result<(), Error> {
266        let svc = fclient::Service::open(driver_fidl::ServiceMarker)?;
267        // Attempt to watch the service directory. If this fails, the manager will check if
268        // playback is configured and exit early if it is not.
269        svc.watch().await?;
270
271        let svc = fclient::Service::open(driver_fidl::ServiceMarker)?;
272        fuchsia_async::Task::local(async move {
273            if let Err(e) = watch_service_directory(svc, manager).await {
274                log::error!("Failed to open sensor service! Error: {:#?}", e);
275            }
276        })
277        .detach();
278
279        Ok(())
280    }
281
282    pub async fn run(&mut self) -> Result<(), Error> {
283        // Collect all the driver event streams into a set of futures that will be polled when the
284        // futures contain a sensor event.
285        if let Some(playback) = &self.playback {
286            self.add_instance(playback.driver_proxy.clone()).await;
287        }
288
289        let manager: Arc<Mutex<SensorManager>> = Arc::new(Mutex::new(self.clone()));
290
291        if let Err(_) = self.start_service_watcher(manager.clone()).await {
292            if self.playback.is_some() {
293                log::warn!("Failed to open sensor driver service directory. Starting with playback sensors only.");
294            } else {
295                return Err(anyhow::anyhow!(
296                    "Failed to open sensors service and sensor playback is not enabled on the system"
297                ));
298            }
299        }
300
301        let mut fs = ServiceFs::new_local();
302        fs.dir("svc").add_fidl_service(IncomingRequest::SensorManager);
303        fs.take_and_serve_directory_handle()?;
304        fs.for_each_concurrent(None, move |request: IncomingRequest| {
305            let manager = manager.clone();
306            async move {
307                match request {
308                    IncomingRequest::SensorManager(stream) => {
309                        let client = Client::new(stream.control_handle());
310                        manager.lock().await.clients.insert(client.clone());
311                        handle_sensor_manager_request_stream(stream, manager, client)
312                            .await
313                            .expect("Failed to serve sensor requests");
314                    }
315                }
316            }
317        })
318        .await;
319
320        Err(anyhow::anyhow!("SensorManager completed unexpectedly."))
321    }
322}
323
324#[cfg(test)]
325mod tests {
326    use super::*;
327    use fidl::endpoints::*;
328    use fidl_fuchsia_hardware_sensors::*;
329    use futures::channel::mpsc;
330
331    #[fuchsia::test]
332    async fn test_invalid_configure_playback() {
333        // Creates an invalid playback_proxy so that ConfigurePlayback gets PEER_CLOSED when trying
334        // to make a request.
335        let (playback_proxy, _) = create_proxy::<PlaybackMarker>();
336
337        let (driver_proxy, _) = create_proxy::<DriverMarker>();
338        let (sender, _receiver) = mpsc::channel(100);
339        let sender = SensorUpdateSender::new(Arc::new(Mutex::new(sender)));
340        let sm = SensorManager::new(sender, Some(Playback::new(driver_proxy, playback_proxy)));
341
342        let manager = Arc::new(Mutex::new(sm));
343        let (proxy, stream) = create_proxy_and_stream::<ManagerMarker>();
344        let client = Client::new(stream.control_handle().clone());
345        fuchsia_async::Task::spawn(async move {
346            manager.lock().await.clients.insert(client.clone());
347            handle_sensor_manager_request_stream(stream, manager, client)
348                .await
349                .expect("Failed to process request stream");
350        })
351        .detach();
352
353        let res = proxy
354            .configure_playback(&PlaybackSourceConfig::FixedValuesConfig(
355                FixedValuesPlaybackConfig {
356                    sensor_list: None,
357                    sensor_events: None,
358                    ..Default::default()
359                },
360            ))
361            .await;
362        assert_eq!(
363            res.unwrap(),
364            Err(fidl_fuchsia_sensors::ConfigurePlaybackError::PlaybackUnavailable)
365        );
366    }
367}