1use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{Dispatcher, Incoming, Transport, input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use fidl::endpoints;
12use focus_chain_provider::FocusChainProviderPublisher;
13use fuchsia_component::directory::AsRefDirectory;
14use fuchsia_fs::directory::{WatchEvent, Watcher};
15use fuchsia_inspect::NumericProperty;
16use fuchsia_inspect::health::Reporter;
17use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
18use futures::future::LocalBoxFuture;
19use futures::lock::Mutex;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use metrics_registry::*;
23use std::collections::HashMap;
24use std::path::PathBuf;
25use std::rc::Rc;
26use std::sync::atomic::{AtomicU32, Ordering};
27use std::sync::{Arc, LazyLock};
28use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
29
/// Process-wide counter backing [`get_next_device_id`]. The first ID handed out is 10.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));

/// Returns the next unused device ID and advances the process-wide counter by one.
///
/// IDs are handed out in increasing order starting at 10; the counter is shared by every
/// caller in the process.
fn get_next_device_id() -> u32 {
    let counter: &AtomicU32 = &NEXT_DEVICE_ID;
    counter.fetch_add(1, Ordering::SeqCst)
}
44
45type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
46
47pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
50
51pub struct InputPipelineAssembly {
65 sender: UnboundedSender<Vec<input_device::InputEvent>>,
68 receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
71
72 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
74
75 display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
77
78 focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
80
81 metrics_logger: metrics::MetricsLogger,
83}
84
85impl InputPipelineAssembly {
86 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
89 let (sender, receiver) = mpsc::unbounded();
90 InputPipelineAssembly {
91 sender,
92 receiver,
93 handlers: vec![],
94 metrics_logger,
95 display_ownership_fut: None,
96 focus_listener_fut: None,
97 }
98 }
99
100 pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
103 self.handlers.push(handler);
104 self
105 }
106
107 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
109 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
110 }
111
112 pub fn add_display_ownership(
113 mut self,
114 display_ownership_event: zx::Event,
115 input_handlers_node: &fuchsia_inspect::Node,
116 ) -> InputPipelineAssembly {
117 let h = DisplayOwnership::new(
118 display_ownership_event,
119 input_handlers_node,
120 self.metrics_logger.clone(),
121 );
122 let metrics_logger_clone = self.metrics_logger.clone();
123 let h_clone = h.clone();
124 let sender_clone = self.sender.clone();
125 let display_ownership_fut = Box::pin(async move {
126 h_clone.clone().set_handler_healthy();
127 h_clone.clone()
128 .handle_ownership_change(sender_clone)
129 .await
130 .map_err(|e| {
131 metrics_logger_clone.log_error(
132 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
133 std::format!(
134 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
135 })
136 .unwrap();
137 h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
138 });
139 self.display_ownership_fut = Some(display_ownership_fut);
140 self.add_handler(h)
141 }
142
143 fn into_components(
149 self,
150 ) -> (
151 UnboundedSender<Vec<input_device::InputEvent>>,
152 UnboundedReceiver<Vec<input_device::InputEvent>>,
153 Vec<Rc<dyn input_handler::BatchInputHandler>>,
154 metrics::MetricsLogger,
155 Option<LocalBoxFuture<'static, ()>>,
156 Option<LocalBoxFuture<'static, ()>>,
157 ) {
158 (
159 self.sender,
160 self.receiver,
161 self.handlers,
162 self.metrics_logger,
163 self.display_ownership_fut,
164 self.focus_listener_fut,
165 )
166 }
167
168 pub fn add_focus_listener(
169 mut self,
170 incoming: &Incoming,
171 focus_chain_publisher: FocusChainProviderPublisher,
172 ) -> Self {
173 let metrics_logger_clone = self.metrics_logger.clone();
174 let incoming2 = incoming.clone();
175 let focus_listener_fut = Box::pin(async move {
176 if let Ok(mut focus_listener) = FocusListener::new(
177 &incoming2,
178 focus_chain_publisher,
179 metrics_logger_clone,
180 )
181 .map_err(|e| {
182 log::warn!("could not create focus listener, focus will not be dispatched: {:?}", e)
183 }) {
184 let _result = focus_listener
187 .dispatch_focus_changes()
188 .await
189 .map(|_| {
190 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
191 })
192 .map_err(|e| {
193 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
194 });
195 }
196 });
197 self.focus_listener_fut = Some(focus_listener_fut);
198 self
199 }
200}
201
202pub struct InputPipeline {
230 pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
234
235 device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
238
239 device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
241
242 input_device_types: Vec<input_device::InputDeviceType>,
244
245 input_device_bindings: InputDeviceBindingHashMap,
247
248 inspect_node: fuchsia_inspect::Node,
251
252 metrics_logger: metrics::MetricsLogger,
254
255 pub feature_flags: input_device::InputPipelineFeatureFlags,
257}
258
259impl InputPipeline {
260 fn new_common(
261 input_device_types: Vec<input_device::InputDeviceType>,
262 assembly: InputPipelineAssembly,
263 inspect_node: fuchsia_inspect::Node,
264 feature_flags: input_device::InputPipelineFeatureFlags,
265 ) -> Self {
266 let (
267 pipeline_sender,
268 receiver,
269 handlers,
270 metrics_logger,
271 display_ownership_fut,
272 focus_listener_fut,
273 ) = assembly.into_components();
274
275 let mut handlers_count = handlers.len();
276 if let Some(fut) = display_ownership_fut {
278 fasync::Task::local(fut).detach();
279 handlers_count += 1;
280 }
281
282 if let Some(fut) = focus_listener_fut {
284 fasync::Task::local(fut).detach();
285 handlers_count += 1;
286 }
287
288 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
290 inspect_node.record_uint("handlers_registered", handlers_count as u64);
291 inspect_node.record_uint("handlers_healthy", handlers_count as u64);
292
293 InputPipeline::run(receiver, handlers, metrics_logger.clone());
295
296 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
297 let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
298 InputPipeline {
299 pipeline_sender,
300 device_event_sender,
301 device_event_receiver,
302 input_device_types,
303 input_device_bindings,
304 inspect_node,
305 metrics_logger,
306 feature_flags,
307 }
308 }
309
310 pub fn new_for_test(
318 input_device_types: Vec<input_device::InputDeviceType>,
319 assembly: InputPipelineAssembly,
320 ) -> Self {
321 let inspector = fuchsia_inspect::Inspector::default();
322 let root = inspector.root();
323 let test_node = root.create_child("input_pipeline");
324 Self::new_common(
325 input_device_types,
326 assembly,
327 test_node,
328 input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
329 )
330 }
331
332 pub fn new(
339 incoming: &Incoming,
340 input_device_types: Vec<input_device::InputDeviceType>,
341 assembly: InputPipelineAssembly,
342 inspect_node: fuchsia_inspect::Node,
343 feature_flags: input_device::InputPipelineFeatureFlags,
344 metrics_logger: metrics::MetricsLogger,
345 ) -> Result<Self, Error> {
346 let input_pipeline =
347 Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
348 let input_device_types = input_pipeline.input_device_types.clone();
349 let input_event_sender = input_pipeline.device_event_sender.clone();
350 let input_device_bindings = input_pipeline.input_device_bindings.clone();
351 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
352 let feature_flags = input_pipeline.feature_flags.clone();
353 let incoming = incoming.clone();
354 fasync::Task::local(async move {
359 match async {
362 let (dir_proxy, server) = endpoints::create_proxy::<fio::DirectoryMarker>();
363 incoming.as_ref_directory().open(
364 input_device::INPUT_REPORT_PATH,
365 fio::PERM_READABLE,
366 server.into()
367 )
368 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
369 let device_watcher =
370 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
371 Self::watch_for_devices(
372 device_watcher,
373 dir_proxy,
374 input_device_types,
375 input_event_sender,
376 input_device_bindings,
377 &devices_node,
378 false, feature_flags,
380 metrics_logger.clone(),
381 )
382 .await
383 .context("failed to watch for devices")
384 }
385 .await
386 {
387 Ok(()) => {}
388 Err(err) => {
389 metrics_logger.log_warn(
393 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
394 std::format!(
395 "Input pipeline is unable to watch for new input devices: {:?}",
396 err
397 ));
398 }
399 }
400 }).detach();
401
402 Ok(input_pipeline)
403 }
404
405 pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
407 &self.input_device_bindings
408 }
409
410 pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
413 &self.device_event_sender
414 }
415
416 pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
418 &self.input_device_types
419 }
420
421 pub async fn handle_input_events(mut self) {
423 let metrics_logger_clone = self.metrics_logger.clone();
424 while let Some(input_event) = self.device_event_receiver.next().await {
425 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
426 metrics_logger_clone.log_error(
427 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
428 std::format!("could not forward event from driver: {:?}", &e));
429 }
430 }
431
432 metrics_logger_clone.log_error(
433 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
434 "Input pipeline stopped handling input events.".to_string(),
435 );
436 }
437
438 async fn watch_for_devices(
454 mut device_watcher: Watcher,
455 dir_proxy: fio::DirectoryProxy,
456 device_types: Vec<input_device::InputDeviceType>,
457 input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
458 bindings: InputDeviceBindingHashMap,
459 input_devices_node: &fuchsia_inspect::Node,
460 break_on_idle: bool,
461 feature_flags: input_device::InputPipelineFeatureFlags,
462 metrics_logger: metrics::MetricsLogger,
463 ) -> Result<(), Error> {
464 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
466 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
467 while let Some(msg) = device_watcher.try_next().await? {
468 if let Ok(filename) = msg.filename.into_os_string().into_string() {
469 if filename == "." {
470 continue;
471 }
472
473 let pathbuf = PathBuf::from(filename.clone());
474 match msg.event {
475 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
476 log::info!("found input device {}", filename);
477 devices_discovered.add(1);
478 let device_proxy =
479 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
480 add_device_bindings(
481 &device_types,
482 &filename,
483 device_proxy,
484 &input_event_sender,
485 &bindings,
486 get_next_device_id(),
487 input_devices_node,
488 Some(&devices_connected),
489 feature_flags.clone(),
490 metrics_logger.clone(),
491 )
492 .await;
493 }
494 WatchEvent::IDLE => {
495 if break_on_idle {
496 break;
497 }
498 }
499 _ => (),
500 }
501 }
502 }
503 input_devices_node.record(devices_discovered);
505 input_devices_node.record(devices_connected);
506 Err(format_err!("Input pipeline stopped watching for new input devices."))
507 }
508
509 pub async fn handle_input_device_registry_request_stream(
524 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
525 device_types: &Vec<input_device::InputDeviceType>,
526 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
527 bindings: &InputDeviceBindingHashMap,
528 input_devices_node: &fuchsia_inspect::Node,
529 feature_flags: input_device::InputPipelineFeatureFlags,
530 metrics_logger: metrics::MetricsLogger,
531 ) -> Result<(), Error> {
532 while let Some(request) = stream
533 .try_next()
534 .await
535 .context("Error handling input device registry request stream")?
536 {
537 match request {
538 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
539 device,
540 ..
541 } => {
542 let device = fidl_next::ClientEnd::<
544 fidl_next_fuchsia_input_report::InputDevice,
545 zx::Channel,
546 >::from_untyped(device.into_channel());
547 let device = Dispatcher::client_from_zx_channel(device);
548 let device = device.spawn();
549 let device_id = get_next_device_id();
550
551 add_device_bindings(
552 device_types,
553 &format!("input-device-registry-{}", device_id),
554 device,
555 input_event_sender,
556 bindings,
557 device_id,
558 input_devices_node,
559 None,
560 feature_flags.clone(),
561 metrics_logger.clone(),
562 )
563 .await;
564 }
565 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
566 device,
567 responder,
568 .. } => {
569 let device = fidl_next::ClientEnd::<
571 fidl_next_fuchsia_input_report::InputDevice,
572 zx::Channel,
573 >::from_untyped(device.into_channel());
574 let device = Dispatcher::client_from_zx_channel(device);
575 let device = device.spawn();
576 let device_id = get_next_device_id();
577
578 add_device_bindings(
579 device_types,
580 &format!("input-device-registry-{}", device_id),
581 device,
582 input_event_sender,
583 bindings,
584 device_id,
585 input_devices_node,
586 None,
587 feature_flags.clone(),
588 metrics_logger.clone(),
589 )
590 .await;
591
592 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
593 device_id: Some(device_id),
594 ..Default::default()
595 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
596 }
597 }
598 }
599
600 Ok(())
601 }
602
603 fn run(
605 mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
606 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
607 metrics_logger: metrics::MetricsLogger,
608 ) {
609 Dispatcher::spawn_local(async move {
610 for handler in &handlers {
611 handler.clone().set_handler_healthy();
612 }
613
614 use input_device::InputEventType;
615 use std::collections::HashMap;
616
617 let mut handlers_by_type: HashMap<
619 InputEventType,
620 Vec<Rc<dyn input_handler::BatchInputHandler>>,
621 > = HashMap::new();
622
623 let event_types = vec![
625 InputEventType::Keyboard,
626 InputEventType::LightSensor,
627 InputEventType::ConsumerControls,
628 InputEventType::Mouse,
629 InputEventType::TouchScreen,
630 InputEventType::Touchpad,
631 #[cfg(test)]
632 InputEventType::Fake,
633 ];
634
635 for event_type in event_types {
636 let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
637 .iter()
638 .filter(|h| h.interest().contains(&event_type))
639 .cloned()
640 .collect();
641 handlers_by_type.insert(event_type, handlers_for_type);
642 }
643
644 while let Some(events) = receiver.next().await {
645 if events.is_empty() {
646 continue;
647 }
648
649 let mut groups_seen = 0;
650 let events = events
651 .into_iter()
652 .chunk_by(|e| InputEventType::from(&e.device_event));
653 let events = events.into_iter().map(|(k, v)| (k, v.collect::<Vec<_>>()));
654 for (event_type, event_group) in events {
655 groups_seen += 1;
656 if groups_seen == 2 {
657 metrics_logger.log_error(
658 InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
659 "it is not recommended to contain multiple types of events in 1 send".to_string(),
660 );
661 }
662 let mut events_in_group = event_group;
663
664 let handlers = handlers_by_type.get(&event_type).unwrap();
666
667 for handler in handlers {
668 events_in_group =
669 handler.clone().handle_input_events(events_in_group).await;
670 }
671
672 for event in events_in_group {
673 if event.handled == input_device::Handled::No {
674 log::warn!("unhandled input event: {:?}", &event);
675 }
676 if let Some(trace_id) = event.trace_id {
677 fuchsia_trace::flow_end!(
678 "input",
679 "event_in_input_pipeline",
680 trace_id.into()
681 );
682 }
683 }
684 }
685 }
686 for handler in &handlers {
687 handler.clone().set_handler_unhealthy("Pipeline loop terminated");
688 }
689 panic!("Runner task is not supposed to terminate.")
690 }).detach();
691 }
692}
693
694async fn add_device_bindings(
714 device_types: &Vec<input_device::InputDeviceType>,
715 filename: &String,
716 device_proxy: fidl_next::Client<fidl_next_fuchsia_input_report::InputDevice, Transport>,
717 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
718 bindings: &InputDeviceBindingHashMap,
719 device_id: u32,
720 input_devices_node: &fuchsia_inspect::Node,
721 devices_connected: Option<&fuchsia_inspect::UintProperty>,
722 feature_flags: InputPipelineFeatureFlags,
723 metrics_logger: metrics::MetricsLogger,
724) {
725 let mut matched_device_types = vec![];
726 if let Ok(res) = device_proxy.get_descriptor().await {
727 for device_type in device_types {
728 if input_device::is_device_type(&res.descriptor, *device_type).await {
729 matched_device_types.push(device_type);
730 match devices_connected {
731 Some(dev_connected) => {
732 let _ = dev_connected.add(1);
733 }
734 None => (),
735 };
736 }
737 }
738 if matched_device_types.is_empty() {
739 log::info!(
740 "device {} did not match any supported device types: {:?}",
741 filename,
742 device_types
743 );
744 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
745 let mut health = fuchsia_inspect::health::Node::new(&device_node);
746 health.set_unhealthy("Unsupported device type.");
747 device_node.record(health);
748 input_devices_node.record(device_node);
749 return;
750 }
751 } else {
752 metrics_logger.clone().log_error(
753 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
754 std::format!("cannot bind device {} without a device descriptor", filename),
755 );
756 return;
757 }
758
759 log::info!(
760 "binding {} to device types: {}",
761 filename,
762 matched_device_types
763 .iter()
764 .fold(String::new(), |device_types_string, device_type| device_types_string
765 + &format!("{:?}, ", device_type))
766 );
767
768 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
769 for device_type in matched_device_types {
770 let proxy = device_proxy.clone();
791 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
792 match input_device::get_device_binding(
793 *device_type,
794 proxy,
795 device_id,
796 input_event_sender.clone(),
797 device_node,
798 feature_flags.clone(),
799 metrics_logger.clone(),
800 )
801 .await
802 {
803 Ok(binding) => new_bindings.push(binding),
804 Err(e) => {
805 metrics_logger.log_error(
806 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
807 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
808 );
809 }
810 }
811 }
812
813 if !new_bindings.is_empty() {
814 let mut bindings = bindings.lock().await;
815 bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
816 }
817}
818
819#[cfg(test)]
820mod tests {
821 use super::*;
822 use crate::input_device::{InputDeviceBinding, InputEventType};
823 use crate::utils::Position;
824 use crate::{
825 fake_input_device_binding, mouse_binding, mouse_model_database,
826 observe_fake_events_input_handler,
827 };
828 use async_trait::async_trait;
829 use diagnostics_assertions::AnyProperty;
830 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
831 use fuchsia_async as fasync;
832 use futures::FutureExt;
833 use pretty_assertions::assert_eq;
834 use rand::Rng;
835 use std::collections::HashSet;
836 use vfs::{pseudo_directory, service as pseudo_fs_service};
837
838 const COUNTS_PER_MM: u32 = 12;
839
840 fn send_input_event(
845 sender: UnboundedSender<Vec<input_device::InputEvent>>,
846 ) -> Vec<input_device::InputEvent> {
847 let mut rng = rand::rng();
848 let offset =
849 Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
850 let input_event = input_device::InputEvent {
851 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
852 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
853 millimeters: Position {
854 x: offset.x / COUNTS_PER_MM as f32,
855 y: offset.y / COUNTS_PER_MM as f32,
856 },
857 }),
858 None, None, mouse_binding::MousePhase::Move,
861 HashSet::new(),
862 HashSet::new(),
863 None, None, )),
866 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
867 mouse_binding::MouseDeviceDescriptor {
868 device_id: 1,
869 absolute_x_range: None,
870 absolute_y_range: None,
871 wheel_v_range: None,
872 wheel_h_range: None,
873 buttons: None,
874 counts_per_mm: COUNTS_PER_MM,
875 },
876 ),
877 event_time: zx::MonotonicInstant::get(),
878 handled: input_device::Handled::No,
879 trace_id: None,
880 };
881 match sender.unbounded_send(vec![input_event.clone()]) {
882 Err(_) => assert!(false),
883 _ => {}
884 }
885
886 vec![input_event]
887 }
888
889 fn handle_input_device_request(
894 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
895 ) {
896 match input_device_request {
897 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
898 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
899 device_information: None,
900 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
901 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
902 movement_x: None,
903 movement_y: None,
904 scroll_v: None,
905 scroll_h: None,
906 buttons: Some(vec![0]),
907 position_x: None,
908 position_y: None,
909 ..Default::default()
910 }),
911 ..Default::default()
912 }),
913 sensor: None,
914 touch: None,
915 keyboard: None,
916 consumer_control: None,
917 ..Default::default()
918 });
919 }
920 _ => {}
921 }
922 }
923
924 #[fasync::run_singlethreaded(test)]
926 async fn multiple_devices_single_handler() {
927 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
929 let first_device_binding =
930 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
931 let second_device_binding =
932 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
933
934 let (handler_event_sender, mut handler_event_receiver) =
936 futures::channel::mpsc::channel(100);
937 let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
938 handler_event_sender,
939 );
940
941 let (sender, receiver, handlers, _, _, _) =
943 InputPipelineAssembly::new(metrics::MetricsLogger::default())
944 .add_handler(input_handler)
945 .into_components();
946 let inspector = fuchsia_inspect::Inspector::default();
947 let test_node = inspector.root().create_child("input_pipeline");
948 let input_pipeline = InputPipeline {
949 pipeline_sender: sender,
950 device_event_sender,
951 device_event_receiver,
952 input_device_types: vec![],
953 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
954 inspect_node: test_node,
955 metrics_logger: metrics::MetricsLogger::default(),
956 feature_flags: input_device::InputPipelineFeatureFlags::default(),
957 };
958 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
959
960 let first_device_events = send_input_event(first_device_binding.input_event_sender());
962 let second_device_events = send_input_event(second_device_binding.input_event_sender());
963
964 fasync::Task::local(async {
966 input_pipeline.handle_input_events().await;
967 })
968 .detach();
969
970 let first_handled_event = handler_event_receiver.next().await;
972 assert_eq!(first_handled_event, first_device_events.into_iter().next());
973
974 let second_handled_event = handler_event_receiver.next().await;
975 assert_eq!(second_handled_event, second_device_events.into_iter().next());
976 }
977
978 #[fasync::run_singlethreaded(test)]
980 async fn single_device_multiple_handlers() {
981 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
983 let input_device_binding =
984 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
985
986 let (first_handler_event_sender, mut first_handler_event_receiver) =
988 futures::channel::mpsc::channel(100);
989 let first_input_handler =
990 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
991 first_handler_event_sender,
992 );
993 let (second_handler_event_sender, mut second_handler_event_receiver) =
994 futures::channel::mpsc::channel(100);
995 let second_input_handler =
996 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
997 second_handler_event_sender,
998 );
999
1000 let (sender, receiver, handlers, _, _, _) =
1002 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1003 .add_handler(first_input_handler)
1004 .add_handler(second_input_handler)
1005 .into_components();
1006 let inspector = fuchsia_inspect::Inspector::default();
1007 let test_node = inspector.root().create_child("input_pipeline");
1008 let input_pipeline = InputPipeline {
1009 pipeline_sender: sender,
1010 device_event_sender,
1011 device_event_receiver,
1012 input_device_types: vec![],
1013 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
1014 inspect_node: test_node,
1015 metrics_logger: metrics::MetricsLogger::default(),
1016 feature_flags: input_device::InputPipelineFeatureFlags::default(),
1017 };
1018 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1019
1020 let input_events = send_input_event(input_device_binding.input_event_sender());
1022
1023 fasync::Task::local(async {
1025 input_pipeline.handle_input_events().await;
1026 })
1027 .detach();
1028
1029 let expected_event = input_events.into_iter().next();
1031 let first_handler_event = first_handler_event_receiver.next().await;
1032 assert_eq!(first_handler_event, expected_event);
1033 let second_handler_event = second_handler_event_receiver.next().await;
1034 assert_eq!(second_handler_event, expected_event);
1035 }
1036
1037 #[fasync::run_singlethreaded(test)]
1040 async fn watch_devices_one_match_exists() {
1041 let mut count: i8 = 0;
1043 let dir = pseudo_directory! {
1044 "file_name" => pseudo_fs_service::host(
1045 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1046 async move {
1047 while count < 3 {
1048 if let Some(input_device_request) =
1049 request_stream.try_next().await.unwrap()
1050 {
1051 handle_input_device_request(input_device_request);
1052 count += 1;
1053 }
1054 }
1055
1056 }.boxed()
1057 },
1058 )
1059 };
1060
1061 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1063 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1064 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1067
1068 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1069 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1070 let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1071
1072 let inspector = fuchsia_inspect::Inspector::default();
1073 let test_node = inspector.root().create_child("input_pipeline");
1074 test_node.record_string(
1075 "supported_input_devices",
1076 supported_device_types.clone().iter().join(", "),
1077 );
1078 let input_devices = test_node.create_child("input_devices");
1079 diagnostics_assertions::assert_data_tree!(inspector, root: {
1081 input_pipeline: {
1082 supported_input_devices: "Mouse",
1083 input_devices: {}
1084 }
1085 });
1086
1087 let _ = InputPipeline::watch_for_devices(
1088 device_watcher,
1089 dir_proxy_for_pipeline,
1090 supported_device_types,
1091 input_event_sender,
1092 bindings.clone(),
1093 &input_devices,
1094 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1096 metrics::MetricsLogger::default(),
1097 )
1098 .await;
1099
1100 let bindings_hashmap = bindings.lock().await;
1102 assert_eq!(bindings_hashmap.len(), 1);
1103 let bindings_vector = bindings_hashmap.get(&10);
1104 assert!(bindings_vector.is_some());
1105 assert_eq!(bindings_vector.unwrap().len(), 1);
1106 let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1107 assert!(boxed_mouse_binding.is_some());
1108 assert_eq!(
1109 boxed_mouse_binding.unwrap().get_device_descriptor(),
1110 input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1111 device_id: 10,
1112 absolute_x_range: None,
1113 absolute_y_range: None,
1114 wheel_v_range: None,
1115 wheel_h_range: None,
1116 buttons: Some(vec![0]),
1117 counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1118 })
1119 );
1120
1121 diagnostics_assertions::assert_data_tree!(inspector, root: {
1123 input_pipeline: {
1124 supported_input_devices: "Mouse",
1125 input_devices: {
1126 devices_discovered: 1u64,
1127 devices_connected: 1u64,
1128 "file_name_Mouse": contains {
1129 reports_received_count: 0u64,
1130 reports_filtered_count: 0u64,
1131 events_generated: 0u64,
1132 last_received_timestamp_ns: 0u64,
1133 last_generated_timestamp_ns: 0u64,
1134 "fuchsia.inspect.Health": {
1135 status: "OK",
1136 start_timestamp_nanos: AnyProperty
1139 },
1140 }
1141 }
1142 }
1143 });
1144 }
1145
1146 #[fasync::run_singlethreaded(test)]
1149 async fn watch_devices_no_matches_exist() {
1150 let mut count: i8 = 0;
1152 let dir = pseudo_directory! {
1153 "file_name" => pseudo_fs_service::host(
1154 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1155 async move {
1156 while count < 1 {
1157 if let Some(input_device_request) =
1158 request_stream.try_next().await.unwrap()
1159 {
1160 handle_input_device_request(input_device_request);
1161 count += 1;
1162 }
1163 }
1164
1165 }.boxed()
1166 },
1167 )
1168 };
1169
1170 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1172 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1173 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1176
1177 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1178 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1179 let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1180
1181 let inspector = fuchsia_inspect::Inspector::default();
1182 let test_node = inspector.root().create_child("input_pipeline");
1183 test_node.record_string(
1184 "supported_input_devices",
1185 supported_device_types.clone().iter().join(", "),
1186 );
1187 let input_devices = test_node.create_child("input_devices");
1188 diagnostics_assertions::assert_data_tree!(inspector, root: {
1190 input_pipeline: {
1191 supported_input_devices: "Keyboard",
1192 input_devices: {}
1193 }
1194 });
1195
1196 let _ = InputPipeline::watch_for_devices(
1197 device_watcher,
1198 dir_proxy_for_pipeline,
1199 supported_device_types,
1200 input_event_sender,
1201 bindings.clone(),
1202 &input_devices,
1203 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1205 metrics::MetricsLogger::default(),
1206 )
1207 .await;
1208
1209 let bindings = bindings.lock().await;
1211 assert_eq!(bindings.len(), 0);
1212
1213 diagnostics_assertions::assert_data_tree!(inspector, root: {
1215 input_pipeline: {
1216 supported_input_devices: "Keyboard",
1217 input_devices: {
1218 devices_discovered: 1u64,
1219 devices_connected: 0u64,
1220 "file_name_Unsupported": {
1221 "fuchsia.inspect.Health": {
1222 status: "UNHEALTHY",
1223 message: "Unsupported device type.",
1224 start_timestamp_nanos: AnyProperty
1227 },
1228 }
1229 }
1230 }
1231 });
1232 }
1233
1234 #[fasync::run_singlethreaded(test)]
1237 async fn handle_input_device_registry_request_stream() {
1238 let (input_device_registry_proxy, input_device_registry_request_stream) =
1239 create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1240 let (input_device_client_end, mut input_device_request_stream) =
1241 create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1242
1243 let device_types = vec![input_device::InputDeviceType::Mouse];
1244 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1245 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1246
1247 let mut count: i8 = 0;
1249 fasync::Task::local(async move {
1250 let _ = input_device_registry_proxy.register(input_device_client_end);
1252
1253 while count < 3 {
1254 if let Some(input_device_request) =
1255 input_device_request_stream.try_next().await.unwrap()
1256 {
1257 handle_input_device_request(input_device_request);
1258 count += 1;
1259 }
1260 }
1261
1262 input_device_registry_proxy.take_event_stream();
1264 })
1265 .detach();
1266
1267 let inspector = fuchsia_inspect::Inspector::default();
1268 let test_node = inspector.root().create_child("input_pipeline");
1269
1270 let bindings_clone = bindings.clone();
1272 let _ = InputPipeline::handle_input_device_registry_request_stream(
1273 input_device_registry_request_stream,
1274 &device_types,
1275 &input_event_sender,
1276 &bindings_clone,
1277 &test_node,
1278 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1279 metrics::MetricsLogger::default(),
1280 )
1281 .await;
1282
1283 let bindings = bindings.lock().await;
1285 assert_eq!(bindings.len(), 1);
1286 }
1287
1288 #[fasync::run_singlethreaded(test)]
1290 async fn check_inspect_node_has_correct_properties() {
1291 let device_types = vec![
1292 input_device::InputDeviceType::Touch,
1293 input_device::InputDeviceType::ConsumerControls,
1294 ];
1295 let inspector = fuchsia_inspect::Inspector::default();
1296 let test_node = inspector.root().create_child("input_pipeline");
1297 let (fake_handler_event_sender, _fake_handler_event_receiver) =
1299 futures::channel::mpsc::channel(100);
1300 let fake_input_handler =
1301 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1302 fake_handler_event_sender,
1303 );
1304 let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1305 .add_handler(fake_input_handler);
1306 let _test_input_pipeline = InputPipeline::new(
1307 &Incoming::new(),
1308 device_types,
1309 assembly,
1310 test_node,
1311 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1312 metrics::MetricsLogger::default(),
1313 );
1314 diagnostics_assertions::assert_data_tree!(inspector, root: {
1315 input_pipeline: {
1316 supported_input_devices: "Touch, ConsumerControls",
1317 handlers_registered: 1u64,
1318 handlers_healthy: 1u64,
1319 input_devices: {}
1320 }
1321 });
1322 }
1323
/// Test-only input handler that reports a configurable list of event types from
/// `interest()` and forwards a copy of every event it handles into a channel.
struct SpecificInterestFakeHandler {
    /// Event types returned (cloned) by `Handler::interest`.
    interest_types: Vec<input_device::InputEventType>,
    /// Channel that receives a clone of each handled event; kept in a `RefCell` because
    /// sending needs mutable access while the handler is only reachable through `Rc<Self>`.
    event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
}
1328
1329 impl SpecificInterestFakeHandler {
1330 pub fn new(
1331 interest_types: Vec<input_device::InputEventType>,
1332 event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1333 ) -> Rc<Self> {
1334 Rc::new(SpecificInterestFakeHandler {
1335 interest_types,
1336 event_sender: std::cell::RefCell::new(event_sender),
1337 })
1338 }
1339 }
1340
1341 impl Handler for SpecificInterestFakeHandler {
1342 fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1343 fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1344 fn get_name(&self) -> &'static str {
1345 "SpecificInterestFakeHandler"
1346 }
1347
1348 fn interest(&self) -> Vec<input_device::InputEventType> {
1349 self.interest_types.clone()
1350 }
1351 }
1352
1353 #[async_trait(?Send)]
1354 impl input_handler::InputHandler for SpecificInterestFakeHandler {
1355 async fn handle_input_event(
1356 self: Rc<Self>,
1357 input_event: input_device::InputEvent,
1358 ) -> Vec<input_device::InputEvent> {
1359 match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1360 Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1361 Ok(_) => {}
1362 }
1363 vec![input_event]
1364 }
1365 }
1366
1367 #[fasync::run_singlethreaded(test)]
1368 async fn run_only_sends_events_to_interested_handlers() {
1369 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1371 let mouse_handler =
1372 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1373
1374 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1376 let fake_handler =
1377 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1378
1379 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1380 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1381 .add_handler(mouse_handler)
1382 .add_handler(fake_handler)
1383 .into_components();
1384
1385 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1387
1388 let fake_event = input_device::InputEvent {
1390 device_event: input_device::InputDeviceEvent::Fake,
1391 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1392 event_time: zx::MonotonicInstant::get(),
1393 handled: input_device::Handled::No,
1394 trace_id: None,
1395 };
1396
1397 pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1399
1400 let received_by_fake = fake_receiver.next().await;
1402 assert_eq!(received_by_fake, Some(fake_event));
1403
1404 assert!(mouse_receiver.try_next().is_err());
1406 }
1407
1408 fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1409 input_device::InputEvent {
1410 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1411 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1412 millimeters: Position { x, y },
1413 }),
1414 None,
1415 None,
1416 mouse_binding::MousePhase::Move,
1417 HashSet::new(),
1418 HashSet::new(),
1419 None,
1420 None,
1421 )),
1422 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1423 mouse_binding::MouseDeviceDescriptor {
1424 device_id: 1,
1425 absolute_x_range: None,
1426 absolute_y_range: None,
1427 wheel_v_range: None,
1428 wheel_h_range: None,
1429 buttons: None,
1430 counts_per_mm: 1,
1431 },
1432 ),
1433 event_time: zx::MonotonicInstant::get(),
1434 handled: input_device::Handled::No,
1435 trace_id: None,
1436 }
1437 }
1438
1439 #[fasync::run_singlethreaded(test)]
1440 async fn run_mixed_event_types_dispatched_correctly() {
1441 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1443 let mouse_handler =
1444 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1445
1446 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1448 let fake_handler =
1449 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1450
1451 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1452 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1453 .add_handler(mouse_handler)
1454 .add_handler(fake_handler)
1455 .into_components();
1456
1457 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1459
1460 let mouse_event_1 = create_mouse_event(1.0, 1.0);
1462 let mouse_event_2 = create_mouse_event(2.0, 2.0);
1463 let mouse_event_3 = create_mouse_event(3.0, 3.0);
1464
1465 let fake_event_1 = input_device::InputEvent {
1466 device_event: input_device::InputDeviceEvent::Fake,
1467 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1468 event_time: zx::MonotonicInstant::get(),
1469 handled: input_device::Handled::No,
1470 trace_id: None,
1471 };
1472
1473 let mixed_batch = vec![
1476 mouse_event_1.clone(),
1477 mouse_event_2.clone(),
1478 fake_event_1.clone(),
1479 mouse_event_3.clone(),
1480 ];
1481 pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1482
1483 assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1485 assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1486 assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1487
1488 assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1490 }
1491}