sensors_lib/
sensor_update_sender.rs1use crate::client::Client;
5use crate::sensor_manager::*;
6use fidl::endpoints::ControlHandle;
7use fidl_fuchsia_hardware_sensors::{DriverEvent, DriverEventStream};
8use futures::channel::mpsc;
9use futures::lock::Mutex;
10use futures::select;
11use futures::stream::{FuturesUnordered, StreamFuture};
12use futures_util::StreamExt;
13use std::collections::HashMap;
14use std::sync::Arc;
15
16#[derive(Debug, Clone)]
17pub struct SensorUpdateSender {
18 sender: Arc<Mutex<mpsc::Sender<SensorUpdate>>>,
19}
20
21pub enum SensorUpdate {
22 SensorMap(HashMap<SensorId, Sensor>),
23 EventStream(DriverEventStream),
24}
25
26fn handle_sensor_event(
32 sensors: &mut HashMap<SensorId, Sensor>,
33 sensor_event: Option<Result<DriverEvent, fidl::Error>>,
34) -> bool {
35 let mut should_send_more_events = true;
36 match sensor_event {
37 Some(Ok(DriverEvent::OnSensorEvent { event })) => {
38 if let Some(sensor) = sensors.get_mut(&event.sensor_id) {
39 let mut clients_to_remove: Vec<Client> = Vec::new();
40 for client in &sensor.clients {
41 if !client.control_handle.is_closed() {
42 if let Err(e) = client.control_handle.send_on_sensor_event(&event) {
43 log::warn!("Failed to send sensor event: {:#?}", e);
44 }
45 } else {
46 log::error!("Client was PEER_CLOSED! Removing from clients list");
47 clients_to_remove.push(client.clone());
48 }
49 }
50 for client in clients_to_remove {
51 sensor.clients.remove(&client);
52 }
53 }
54 }
55 Some(Ok(DriverEvent::_UnknownEvent { ordinal, .. })) => {
56 log::warn!("SensorManager received an UnknownEvent with ordinal: {:#?}", ordinal);
57 }
58 Some(Err(e)) => {
59 log::error!("Received an error from sensor driver: {:#?}", e);
60 should_send_more_events = false;
61 }
62 None => {
63 log::error!("Got None from driver");
64 should_send_more_events = false;
65 }
66 }
67
68 should_send_more_events
69}
70
71pub async fn handle_sensor_event_streams(mut update_receiver: mpsc::Receiver<SensorUpdate>) {
74 let mut event_streams: FuturesUnordered<StreamFuture<DriverEventStream>> =
75 FuturesUnordered::new();
76 let mut sensors: HashMap<SensorId, Sensor> = HashMap::new();
77 loop {
78 select! {
79 sensor_event = event_streams.next() => {
80 if let Some((event, stream)) = sensor_event {
81 if handle_sensor_event(&mut sensors, event) {
82 event_streams.push(stream.into_future());
85 }
86 }
87 },
88 sensor_update = update_receiver.next() => {
89 match sensor_update {
90 Some(SensorUpdate::SensorMap(updated_sensors)) => {
91 sensors = updated_sensors;
92 },
93 Some(SensorUpdate::EventStream(stream)) => {
94 event_streams.push(stream.into_future());
95 }
96 None => {
97 log::error!("Channel has hung up! Will no longer receive sensor updates.");
98 }
99 }
100 },
101 }
102 }
103}
104
105impl SensorUpdateSender {
106 pub fn new(sender: Arc<Mutex<mpsc::Sender<SensorUpdate>>>) -> Self {
107 SensorUpdateSender { sender }
108 }
109
110 async fn send_update(&self, update: SensorUpdate) {
111 if let Err(e) = self.sender.lock().await.try_send(update) {
112 log::warn!("Failed to send sensor update! {:#?}", e);
113 }
114 }
115
116 pub(crate) async fn update_sensor_map(&self, sensors: HashMap<SensorId, Sensor>) {
117 self.send_update(SensorUpdate::SensorMap(sensors)).await
118 }
119
120 pub(crate) async fn add_event_stream(&self, event_stream: DriverEventStream) {
121 self.send_update(SensorUpdate::EventStream(event_stream)).await
122 }
123}