1use crate::client::*;
5use crate::playback::*;
6use crate::sensor_update_sender::SensorUpdateSender;
7use crate::service_watcher::*;
8use crate::utils::*;
9use anyhow::{Context as _, Error};
10use fidl::endpoints::RequestStream;
11use fidl_fuchsia_hardware_sensors::{
12 PlaybackSourceConfig, {self as driver_fidl},
13};
14use fidl_fuchsia_sensors::*;
15use fidl_fuchsia_sensors_types::*;
16use fuchsia_component::client as fclient;
17use fuchsia_component::server::ServiceFs;
18use futures::lock::Mutex;
19use futures_util::{StreamExt, TryStreamExt};
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22
23pub type SensorId = i32;
24
25#[derive(Debug, Clone)]
26pub struct SensorManager {
27 pub(crate) sensors: HashMap<SensorId, Sensor>,
28 driver_proxies: Vec<driver_fidl::DriverProxy>,
29 playback: Option<Playback>,
30 clients: HashSet<Client>,
31 pub(crate) update_sender: SensorUpdateSender,
32}
33
34#[derive(Debug, Clone)]
35pub struct Sensor {
36 pub(crate) driver: driver_fidl::DriverProxy,
37 pub(crate) info: SensorInfo,
38 pub(crate) clients: HashSet<Client>,
40}
41
42enum IncomingRequest {
43 SensorManager(ManagerRequestStream),
44}
45
46async fn handle_sensors_request(
47 request: ManagerRequest,
48 manager: &Arc<Mutex<SensorManager>>,
49 client: &Client,
50) -> anyhow::Result<()> {
51 let mut manager = manager.lock().await;
52 match request {
53 ManagerRequest::GetSensorsList { responder } => {
54 let _ = responder.send(&manager.get_sensors_list().await);
55 }
56 ManagerRequest::ConfigureSensorRates { id, sensor_rate_config, responder } => {
57 let _ = responder.send(manager.configure_sensor_rates(id, sensor_rate_config).await);
58 }
59 ManagerRequest::Activate { id, responder } => {
60 let _ = responder.send(manager.activate(id, client.clone()).await);
61 }
62 ManagerRequest::Deactivate { id, responder } => {
63 let _ = responder.send(manager.deactivate(id, client).await);
64 }
65 ManagerRequest::ConfigurePlayback { source_config, responder } => {
66 let _ = responder.send(manager.configure_playback(source_config).await);
67 }
68 ManagerRequest::_UnknownMethod { ordinal, .. } => {
69 log::warn!("ManagerRequest::_UnknownMethod with ordinal {}", ordinal);
70 }
71 }
72
73 manager.update_sender.update_sensor_map(manager.sensors.clone()).await;
74
75 Ok(())
76}
77
78async fn handle_sensor_manager_request_stream(
79 mut stream: ManagerRequestStream,
80 manager: Arc<Mutex<SensorManager>>,
81 client: Client,
82) -> Result<(), Error> {
83 while let Some(request) =
84 stream.try_next().await.context("Error handling SensorManager events")?
85 {
86 handle_sensors_request(request, &manager, &client)
87 .await
88 .expect("Error handling sensor request");
89 }
90 Ok(())
91}
92
93impl SensorManager {
94 pub fn new(update_sender: SensorUpdateSender, playback: Option<Playback>) -> Self {
95 let sensors = HashMap::new();
96 let clients = HashSet::new();
97 let driver_proxies = Vec::new();
98
99 Self { sensors, driver_proxies, playback, clients, update_sender }
100 }
101
102 async fn get_sensors_list(&mut self) -> Vec<SensorInfo> {
103 self.populate_sensors().await;
104 let fidl_sensors: Vec<SensorInfo> =
105 self.sensors.values().into_iter().map(|x| x.info.clone()).collect();
106 if fidl_sensors.is_empty() {
107 log::warn!("Failed to get any sensors from driver. Sending empty list");
108 }
109
110 fidl_sensors
111 }
112
113 async fn activate(&mut self, id: SensorId, client: Client) -> Result<(), ActivateSensorError> {
114 if let Some(sensor) = self.sensors.get_mut(&id) {
115 let res = sensor.driver.activate_sensor(id).await;
116 if let Err(e) = res {
117 log::warn!("Error while activating sensor: {:#?}", e);
118 Err(ActivateSensorError::DriverUnavailable)
119 } else {
120 sensor.clients.insert(client);
121 Ok(())
122 }
123 } else {
124 log::warn!("Received request to activate unknown sensor id: {}", id);
125 Err(ActivateSensorError::InvalidSensorId)
126 }
127 }
128
129 async fn deactivate(
130 &mut self,
131 id: SensorId,
132 client: &Client,
133 ) -> Result<(), DeactivateSensorError> {
134 let mut response: Result<(), DeactivateSensorError> = Ok(());
135 if let Some(sensor) = self.sensors.get_mut(&id) {
136 if sensor.clients.len() == 1 {
138 if let Err(e) = sensor.driver.deactivate_sensor(id).await {
139 log::warn!("Error while deactivating sensor: {:#?}", e);
140 response = Err(DeactivateSensorError::DriverUnavailable);
141 }
142 } else {
143 if !sensor.clients.is_empty() {
144 log::info!(
145 "Unsubscribing client from sensor {:#?}, but there are other subscribers.",
146 id,
147 );
148 }
149 }
150 sensor.clients.remove(&client);
151 } else {
152 log::warn!("Received request to deactivate unknown sensor id: {}", id);
153 response = Err(DeactivateSensorError::InvalidSensorId);
154 }
155
156 response
157 }
158
159 async fn configure_sensor_rates(
160 &mut self,
161 id: SensorId,
162 sensor_rate_config: SensorRateConfig,
163 ) -> Result<(), ConfigureSensorRateError> {
164 if let Some(sensor) = self.sensors.get(&id) {
165 match sensor.driver.configure_sensor_rate(id, &sensor_rate_config).await {
166 Ok(Ok(())) => Ok(()),
167 Ok(Err(driver_fidl::ConfigureSensorRateError::InvalidSensorId)) => {
168 log::warn!(
169 "Received ConfigureSensorRates request for unknown sensor id: {}",
170 id
171 );
172 Err(ConfigureSensorRateError::InvalidSensorId)
173 }
174 Ok(Err(driver_fidl::ConfigureSensorRateError::InvalidConfig)) => {
175 log::warn!(
176 "Received ConfigureSensorRates request for invalid config: {:#?}",
177 sensor_rate_config
178 );
179 Err(ConfigureSensorRateError::InvalidConfig)
180 }
181 Err(e) => {
182 log::warn!("Error while configuring sensor rates: {:#?}", e);
183 Err(ConfigureSensorRateError::DriverUnavailable)
184 }
185 Ok(Err(_)) => unreachable!(),
186 }
187 } else {
188 log::warn!("Received ConfigureSensorRates request for unknown sensor id: {}", id);
189 Err(ConfigureSensorRateError::InvalidSensorId)
190 }
191 }
192
193 async fn configure_playback(
194 &mut self,
195 source_config: PlaybackSourceConfig,
196 ) -> Result<(), ConfigurePlaybackError> {
197 let mut response: Result<(), ConfigurePlaybackError> = Ok(());
198
199 if let Some(mut playback) = self.playback.clone() {
200 let res = playback.playback_proxy.configure_playback(&source_config).await;
201
202 match res {
203 Ok(Ok(())) => {
204 self.sensors.extend(playback.get_sensors_from_config(source_config).await);
205
206 if !self.driver_proxies.iter().any(|x| playback.is_playback_driver_proxy(x)) {
209 self.driver_proxies.push(playback.driver_proxy.clone());
210 }
211 response = Ok(());
212 }
213 Err(e) => {
214 log::warn!("Error while configuring sensor playback: {:#?}", e);
215 response = Err(ConfigurePlaybackError::PlaybackUnavailable);
216 }
217 Ok(Err(e)) => {
218 response = Err(from_driver_playback_error(e));
219 }
220 }
221
222 if !response.is_ok() {
223 self.driver_proxies.retain(|x| !playback.is_playback_driver_proxy(&x));
225 self.sensors.retain(|id, _| !playback.playback_sensor_ids.contains(id));
226 playback.playback_sensor_ids.clear();
227 } else {
228 playback.configured = response.is_ok();
229 self.playback = Some(playback);
230 }
231 }
232
233 response
234 }
235
236 pub(crate) async fn add_instance(&mut self, driver_proxy: driver_fidl::DriverProxy) {
237 let event_stream = driver_proxy.take_event_stream();
238 self.driver_proxies.push(driver_proxy);
239 self.populate_sensors().await;
240 self.update_sender.update_sensor_map(self.sensors.clone()).await;
241 self.update_sender.add_event_stream(event_stream).await;
242 }
243
244 async fn populate_sensors(&mut self) {
245 let mut sensors = HashMap::new();
246 for proxy in self.driver_proxies.clone() {
247 if let Ok(driver_sensors) = proxy.get_sensors_list().await {
248 for sensor in driver_sensors {
249 if is_sensor_valid(&sensor) {
250 let id = sensor.sensor_id.expect("sensor_id");
251 let mut clients: HashSet<Client> = HashSet::new();
252 if let Some(sensor) = self.sensors.get(&id) {
253 clients = sensor.clients.clone();
254 }
255
256 sensors.insert(id, Sensor { driver: proxy.clone(), info: sensor, clients });
257 }
258 }
259 }
260 }
261
262 self.sensors = sensors;
263 }
264
265 async fn start_service_watcher(&self, manager: Arc<Mutex<SensorManager>>) -> Result<(), Error> {
266 let svc = fclient::Service::open(driver_fidl::ServiceMarker)?;
267 svc.watch().await?;
270
271 let svc = fclient::Service::open(driver_fidl::ServiceMarker)?;
272 fuchsia_async::Task::local(async move {
273 if let Err(e) = watch_service_directory(svc, manager).await {
274 log::error!("Failed to open sensor service! Error: {:#?}", e);
275 }
276 })
277 .detach();
278
279 Ok(())
280 }
281
282 pub async fn run(&mut self) -> Result<(), Error> {
283 if let Some(playback) = &self.playback {
286 self.add_instance(playback.driver_proxy.clone()).await;
287 }
288
289 let manager: Arc<Mutex<SensorManager>> = Arc::new(Mutex::new(self.clone()));
290
291 if let Err(_) = self.start_service_watcher(manager.clone()).await {
292 if self.playback.is_some() {
293 log::warn!("Failed to open sensor driver service directory. Starting with playback sensors only.");
294 } else {
295 return Err(anyhow::anyhow!(
296 "Failed to open sensors service and sensor playback is not enabled on the system"
297 ));
298 }
299 }
300
301 let mut fs = ServiceFs::new_local();
302 fs.dir("svc").add_fidl_service(IncomingRequest::SensorManager);
303 fs.take_and_serve_directory_handle()?;
304 fs.for_each_concurrent(None, move |request: IncomingRequest| {
305 let manager = manager.clone();
306 async move {
307 match request {
308 IncomingRequest::SensorManager(stream) => {
309 let client = Client::new(stream.control_handle());
310 manager.lock().await.clients.insert(client.clone());
311 handle_sensor_manager_request_stream(stream, manager, client)
312 .await
313 .expect("Failed to serve sensor requests");
314 }
315 }
316 }
317 })
318 .await;
319
320 Err(anyhow::anyhow!("SensorManager completed unexpectedly."))
321 }
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327 use fidl::endpoints::*;
328 use fidl_fuchsia_hardware_sensors::*;
329 use futures::channel::mpsc;
330
331 #[fuchsia::test]
332 async fn test_invalid_configure_playback() {
333 let (playback_proxy, _) = create_proxy::<PlaybackMarker>();
336
337 let (driver_proxy, _) = create_proxy::<DriverMarker>();
338 let (sender, _receiver) = mpsc::channel(100);
339 let sender = SensorUpdateSender::new(Arc::new(Mutex::new(sender)));
340 let sm = SensorManager::new(sender, Some(Playback::new(driver_proxy, playback_proxy)));
341
342 let manager = Arc::new(Mutex::new(sm));
343 let (proxy, stream) = create_proxy_and_stream::<ManagerMarker>();
344 let client = Client::new(stream.control_handle().clone());
345 fuchsia_async::Task::spawn(async move {
346 manager.lock().await.clients.insert(client.clone());
347 handle_sensor_manager_request_stream(stream, manager, client)
348 .await
349 .expect("Failed to process request stream");
350 })
351 .detach();
352
353 let res = proxy
354 .configure_playback(&PlaybackSourceConfig::FixedValuesConfig(
355 FixedValuesPlaybackConfig {
356 sensor_list: None,
357 sensor_events: None,
358 ..Default::default()
359 },
360 ))
361 .await;
362 assert_eq!(
363 res.unwrap(),
364 Err(fidl_fuchsia_sensors::ConfigurePlaybackError::PlaybackUnavailable)
365 );
366 }
367}