sensors_lib/
sensor_update_sender.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::client::Client;
5use crate::sensor_manager::*;
6use fidl::endpoints::ControlHandle;
7use fidl_fuchsia_hardware_sensors::{DriverEvent, DriverEventStream};
8use futures::channel::mpsc;
9use futures::lock::Mutex;
10use futures::select;
11use futures::stream::{FuturesUnordered, StreamFuture};
12use futures_util::StreamExt;
13use std::collections::HashMap;
14use std::sync::Arc;
15
16#[derive(Debug, Clone)]
17pub struct SensorUpdateSender {
18    sender: Arc<Mutex<mpsc::Sender<SensorUpdate>>>,
19}
20
21pub enum SensorUpdate {
22    SensorMap(HashMap<SensorId, Sensor>),
23    EventStream(DriverEventStream),
24}
25
26// Sends all clients of a sensor an event if event contains a DriverEvent. If the channel to the
27// client has closed, it will remove that client from the list of subscribers for that particular
28// sensor.
29//
30// Returns whether or not to continue polling the event stream that generated this sensor event.
31fn handle_sensor_event(
32    sensors: &mut HashMap<SensorId, Sensor>,
33    sensor_event: Option<Result<DriverEvent, fidl::Error>>,
34) -> bool {
35    let mut should_send_more_events = true;
36    match sensor_event {
37        Some(Ok(DriverEvent::OnSensorEvent { event })) => {
38            if let Some(sensor) = sensors.get_mut(&event.sensor_id) {
39                let mut clients_to_remove: Vec<Client> = Vec::new();
40                for client in &sensor.clients {
41                    if !client.control_handle.is_closed() {
42                        if let Err(e) = client.control_handle.send_on_sensor_event(&event) {
43                            log::warn!("Failed to send sensor event: {:#?}", e);
44                        }
45                    } else {
46                        log::error!("Client was PEER_CLOSED! Removing from clients list");
47                        clients_to_remove.push(client.clone());
48                    }
49                }
50                for client in clients_to_remove {
51                    sensor.clients.remove(&client);
52                }
53            }
54        }
55        Some(Ok(DriverEvent::_UnknownEvent { ordinal, .. })) => {
56            log::warn!("SensorManager received an UnknownEvent with ordinal: {:#?}", ordinal);
57        }
58        Some(Err(e)) => {
59            log::error!("Received an error from sensor driver: {:#?}", e);
60            should_send_more_events = false;
61        }
62        None => {
63            log::error!("Got None from driver");
64            should_send_more_events = false;
65        }
66    }
67
68    should_send_more_events
69}
70
71// Handles the stream of sensor events from all drivers. Receives updates about sensor information
72// and new sensors from a channel to the SensorManager.
73pub async fn handle_sensor_event_streams(mut update_receiver: mpsc::Receiver<SensorUpdate>) {
74    let mut event_streams: FuturesUnordered<StreamFuture<DriverEventStream>> =
75        FuturesUnordered::new();
76    let mut sensors: HashMap<SensorId, Sensor> = HashMap::new();
77    loop {
78        select! {
79            sensor_event = event_streams.next() => {
80                if let Some((event, stream)) = sensor_event {
81                    if handle_sensor_event(&mut sensors, event) {
82                        // Once the future has resolved, the rest of the events need to be
83                        // placed back onto the list of futures.
84                        event_streams.push(stream.into_future());
85                    }
86                }
87            },
88            sensor_update = update_receiver.next() => {
89                match sensor_update {
90                    Some(SensorUpdate::SensorMap(updated_sensors)) => {
91                        sensors = updated_sensors;
92                    },
93                    Some(SensorUpdate::EventStream(stream)) => {
94                        event_streams.push(stream.into_future());
95                    }
96                    None => {
97                        log::error!("Channel has hung up! Will no longer receive sensor updates.");
98                    }
99                }
100            },
101        }
102    }
103}
104
105impl SensorUpdateSender {
106    pub fn new(sender: Arc<Mutex<mpsc::Sender<SensorUpdate>>>) -> Self {
107        SensorUpdateSender { sender }
108    }
109
110    async fn send_update(&self, update: SensorUpdate) {
111        if let Err(e) = self.sender.lock().await.try_send(update) {
112            log::warn!("Failed to send sensor update! {:#?}", e);
113        }
114    }
115
116    pub(crate) async fn update_sensor_map(&self, sensors: HashMap<SensorId, Sensor>) {
117        self.send_update(SensorUpdate::SensorMap(sensors)).await
118    }
119
120    pub(crate) async fn add_event_stream(&self, event_stream: DriverEventStream) {
121        self.send_update(SensorUpdate::EventStream(event_stream)).await
122    }
123}