1use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use focus_chain_provider::FocusChainProviderPublisher;
12use fuchsia_fs::directory::{WatchEvent, Watcher};
13use fuchsia_inspect::NumericProperty;
14use fuchsia_inspect::health::Reporter;
15use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
16use futures::future::LocalBoxFuture;
17use futures::lock::Mutex;
18use futures::{StreamExt, TryStreamExt};
19use itertools::Itertools;
20use metrics_registry::*;
21use std::collections::HashMap;
22use std::path::PathBuf;
23use std::rc::Rc;
24use std::sync::atomic::{AtomicU32, Ordering};
25use std::sync::{Arc, LazyLock};
26use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
27
28static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
35
36fn get_next_device_id() -> u32 {
40 NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
41}
42
43type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
44
45pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
48
49pub struct InputPipelineAssembly {
63 sender: UnboundedSender<Vec<input_device::InputEvent>>,
66 receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
69
70 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
72
73 display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
75
76 focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
78
79 metrics_logger: metrics::MetricsLogger,
81}
82
83impl InputPipelineAssembly {
84 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
87 let (sender, receiver) = mpsc::unbounded();
88 InputPipelineAssembly {
89 sender,
90 receiver,
91 handlers: vec![],
92 metrics_logger,
93 display_ownership_fut: None,
94 focus_listener_fut: None,
95 }
96 }
97
98 pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
101 self.handlers.push(handler);
102 self
103 }
104
105 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
107 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
108 }
109
110 pub fn add_display_ownership(
111 mut self,
112 display_ownership_event: zx::Event,
113 input_handlers_node: &fuchsia_inspect::Node,
114 ) -> InputPipelineAssembly {
115 let h = DisplayOwnership::new(display_ownership_event, input_handlers_node);
116 let metrics_logger_clone = self.metrics_logger.clone();
117 let h_clone = h.clone();
118 let sender_clone = self.sender.clone();
119 let display_ownership_fut = Box::pin(async move {
120 h_clone.clone().set_handler_healthy();
121 h_clone.clone()
122 .handle_ownership_change(sender_clone)
123 .await
124 .map_err(|e| {
125 metrics_logger_clone.log_error(
126 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
127 std::format!(
128 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
129 })
130 .unwrap();
131 h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
132 });
133 self.display_ownership_fut = Some(display_ownership_fut);
134 self.add_handler(h)
135 }
136
137 fn into_components(
143 self,
144 ) -> (
145 UnboundedSender<Vec<input_device::InputEvent>>,
146 UnboundedReceiver<Vec<input_device::InputEvent>>,
147 Vec<Rc<dyn input_handler::BatchInputHandler>>,
148 metrics::MetricsLogger,
149 Option<LocalBoxFuture<'static, ()>>,
150 Option<LocalBoxFuture<'static, ()>>,
151 ) {
152 (
153 self.sender,
154 self.receiver,
155 self.handlers,
156 self.metrics_logger,
157 self.display_ownership_fut,
158 self.focus_listener_fut,
159 )
160 }
161
162 pub fn add_focus_listener(
163 mut self,
164 focus_chain_publisher: FocusChainProviderPublisher,
165 ) -> Self {
166 let metrics_logger_clone = self.metrics_logger.clone();
167 let focus_listener_fut = Box::pin(async move {
168 if let Ok(mut focus_listener) =
169 FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
170 log::warn!(
171 "could not create focus listener, focus will not be dispatched: {:?}",
172 e
173 )
174 })
175 {
176 let _result = focus_listener
179 .dispatch_focus_changes()
180 .await
181 .map(|_| {
182 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
183 })
184 .map_err(|e| {
185 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
186 });
187 }
188 });
189 self.focus_listener_fut = Some(focus_listener_fut);
190 self
191 }
192}
193
194pub struct InputPipeline {
222 pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
226
227 device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
230
231 device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
233
234 input_device_types: Vec<input_device::InputDeviceType>,
236
237 input_device_bindings: InputDeviceBindingHashMap,
239
240 inspect_node: fuchsia_inspect::Node,
243
244 metrics_logger: metrics::MetricsLogger,
246
247 pub feature_flags: input_device::InputPipelineFeatureFlags,
249}
250
251impl InputPipeline {
252 fn new_common(
253 input_device_types: Vec<input_device::InputDeviceType>,
254 assembly: InputPipelineAssembly,
255 inspect_node: fuchsia_inspect::Node,
256 feature_flags: input_device::InputPipelineFeatureFlags,
257 ) -> Self {
258 let (
259 pipeline_sender,
260 receiver,
261 handlers,
262 metrics_logger,
263 display_ownership_fut,
264 focus_listener_fut,
265 ) = assembly.into_components();
266
267 let mut handlers_count = handlers.len();
268 if let Some(fut) = display_ownership_fut {
270 fasync::Task::local(fut).detach();
271 handlers_count += 1;
272 }
273
274 if let Some(fut) = focus_listener_fut {
276 fasync::Task::local(fut).detach();
277 handlers_count += 1;
278 }
279
280 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
282 inspect_node.record_uint("handlers_registered", handlers_count as u64);
283 inspect_node.record_uint("handlers_healthy", handlers_count as u64);
284
285 InputPipeline::run(receiver, handlers);
287
288 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
289 let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
290 InputPipeline {
291 pipeline_sender,
292 device_event_sender,
293 device_event_receiver,
294 input_device_types,
295 input_device_bindings,
296 inspect_node,
297 metrics_logger,
298 feature_flags,
299 }
300 }
301
302 pub fn new_for_test(
310 input_device_types: Vec<input_device::InputDeviceType>,
311 assembly: InputPipelineAssembly,
312 ) -> Self {
313 let inspector = fuchsia_inspect::Inspector::default();
314 let root = inspector.root();
315 let test_node = root.create_child("input_pipeline");
316 Self::new_common(
317 input_device_types,
318 assembly,
319 test_node,
320 input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
321 )
322 }
323
324 pub fn new(
331 input_device_types: Vec<input_device::InputDeviceType>,
332 assembly: InputPipelineAssembly,
333 inspect_node: fuchsia_inspect::Node,
334 feature_flags: input_device::InputPipelineFeatureFlags,
335 metrics_logger: metrics::MetricsLogger,
336 ) -> Result<Self, Error> {
337 let input_pipeline =
338 Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
339 let input_device_types = input_pipeline.input_device_types.clone();
340 let input_event_sender = input_pipeline.device_event_sender.clone();
341 let input_device_bindings = input_pipeline.input_device_bindings.clone();
342 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
343 let feature_flags = input_pipeline.feature_flags.clone();
344 fasync::Task::local(async move {
345 match async {
348 let dir_proxy = fuchsia_fs::directory::open_in_namespace(
349 input_device::INPUT_REPORT_PATH,
350 fuchsia_fs::PERM_READABLE,
351 )
352 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
353 let device_watcher =
354 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
355 Self::watch_for_devices(
356 device_watcher,
357 dir_proxy,
358 input_device_types,
359 input_event_sender,
360 input_device_bindings,
361 &devices_node,
362 false, feature_flags,
364 metrics_logger.clone(),
365 )
366 .await
367 .context("failed to watch for devices")
368 }
369 .await
370 {
371 Ok(()) => {}
372 Err(err) => {
373 metrics_logger.log_warn(
377 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
378 std::format!(
379 "Input pipeline is unable to watch for new input devices: {:?}",
380 err
381 ));
382 }
383 }
384 })
385 .detach();
386
387 Ok(input_pipeline)
388 }
389
390 pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
392 &self.input_device_bindings
393 }
394
395 pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
398 &self.device_event_sender
399 }
400
401 pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
403 &self.input_device_types
404 }
405
406 pub async fn handle_input_events(mut self) {
408 let metrics_logger_clone = self.metrics_logger.clone();
409 while let Some(input_event) = self.device_event_receiver.next().await {
410 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
411 metrics_logger_clone.log_error(
412 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
413 std::format!("could not forward event from driver: {:?}", &e));
414 }
415 }
416
417 metrics_logger_clone.log_error(
418 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
419 "Input pipeline stopped handling input events.".to_string(),
420 );
421 }
422
423 async fn watch_for_devices(
439 mut device_watcher: Watcher,
440 dir_proxy: fio::DirectoryProxy,
441 device_types: Vec<input_device::InputDeviceType>,
442 input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
443 bindings: InputDeviceBindingHashMap,
444 input_devices_node: &fuchsia_inspect::Node,
445 break_on_idle: bool,
446 feature_flags: input_device::InputPipelineFeatureFlags,
447 metrics_logger: metrics::MetricsLogger,
448 ) -> Result<(), Error> {
449 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
451 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
452 while let Some(msg) = device_watcher.try_next().await? {
453 if let Ok(filename) = msg.filename.into_os_string().into_string() {
454 if filename == "." {
455 continue;
456 }
457
458 let pathbuf = PathBuf::from(filename.clone());
459 match msg.event {
460 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
461 log::info!("found input device {}", filename);
462 devices_discovered.add(1);
463 let device_proxy =
464 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
465 add_device_bindings(
466 &device_types,
467 &filename,
468 device_proxy,
469 &input_event_sender,
470 &bindings,
471 get_next_device_id(),
472 input_devices_node,
473 Some(&devices_connected),
474 feature_flags.clone(),
475 metrics_logger.clone(),
476 )
477 .await;
478 }
479 WatchEvent::IDLE => {
480 if break_on_idle {
481 break;
482 }
483 }
484 _ => (),
485 }
486 }
487 }
488 input_devices_node.record(devices_discovered);
490 input_devices_node.record(devices_connected);
491 Err(format_err!("Input pipeline stopped watching for new input devices."))
492 }
493
494 pub async fn handle_input_device_registry_request_stream(
509 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
510 device_types: &Vec<input_device::InputDeviceType>,
511 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
512 bindings: &InputDeviceBindingHashMap,
513 input_devices_node: &fuchsia_inspect::Node,
514 feature_flags: input_device::InputPipelineFeatureFlags,
515 metrics_logger: metrics::MetricsLogger,
516 ) -> Result<(), Error> {
517 while let Some(request) = stream
518 .try_next()
519 .await
520 .context("Error handling input device registry request stream")?
521 {
522 match request {
523 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
524 device,
525 ..
526 } => {
527 let device_proxy = device.into_proxy();
529
530 let device_id = get_next_device_id();
531
532 add_device_bindings(
533 device_types,
534 &format!("input-device-registry-{}", device_id),
535 device_proxy,
536 input_event_sender,
537 bindings,
538 device_id,
539 input_devices_node,
540 None,
541 feature_flags.clone(),
542 metrics_logger.clone(),
543 )
544 .await;
545 }
546 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
547 device,
548 responder,
549 .. } => {
550 let device_proxy = device.into_proxy();
552
553 let device_id = get_next_device_id();
554
555 add_device_bindings(
556 device_types,
557 &format!("input-device-registry-{}", device_id),
558 device_proxy,
559 input_event_sender,
560 bindings,
561 device_id,
562 input_devices_node,
563 None,
564 feature_flags.clone(),
565 metrics_logger.clone(),
566 )
567 .await;
568
569 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
570 device_id: Some(device_id),
571 ..Default::default()
572 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
573 }
574 }
575 }
576
577 Ok(())
578 }
579
580 fn run(
582 mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
583 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
584 ) {
585 fasync::Task::local(async move {
586 for handler in &handlers {
587 handler.clone().set_handler_healthy();
588 }
589
590 use input_device::InputEventType;
591 use std::collections::HashMap;
592
593 let mut handlers_by_type: HashMap<
595 InputEventType,
596 Vec<Rc<dyn input_handler::BatchInputHandler>>,
597 > = HashMap::new();
598
599 let event_types = vec![
601 InputEventType::Keyboard,
602 InputEventType::LightSensor,
603 InputEventType::ConsumerControls,
604 InputEventType::Mouse,
605 InputEventType::TouchScreen,
606 InputEventType::Touchpad,
607 #[cfg(test)]
608 InputEventType::Fake,
609 ];
610
611 for event_type in event_types {
612 let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
613 .iter()
614 .filter(|h| h.interest().contains(&event_type))
615 .cloned()
616 .collect();
617 handlers_by_type.insert(event_type, handlers_for_type);
618 }
619
620 while let Some(events) = receiver.next().await {
621 if events.is_empty() {
622 continue;
623 }
624
625 let mut groups_seen = 0;
626 for (event_type, event_group) in events
627 .into_iter()
628 .chunk_by(|e| InputEventType::from(&e.device_event))
629 .into_iter()
630 {
631 groups_seen += 1;
632 if groups_seen == 2 {
633 log::warn!(
634 "it is not recommanded to contains multiple type of event in 1 send"
635 );
636 }
637 let mut events_in_group: Vec<_> = event_group.collect();
638
639 let handlers = handlers_by_type.get(&event_type).unwrap();
641
642 for handler in handlers {
643 events_in_group =
644 handler.clone().handle_input_events(events_in_group).await;
645 }
646
647 for event in events_in_group {
648 if event.handled == input_device::Handled::No {
649 log::warn!("unhandled input event: {:?}", &event);
650 }
651 if let Some(trace_id) = event.trace_id {
652 fuchsia_trace::flow_end!(
653 "input",
654 "event_in_input_pipeline",
655 trace_id.into()
656 );
657 }
658 }
659 }
660 }
661 for handler in &handlers {
662 handler.clone().set_handler_unhealthy("Pipeline loop terminated");
663 }
664 panic!("Runner task is not supposed to terminate.")
665 })
666 .detach();
667 }
668}
669
670async fn add_device_bindings(
690 device_types: &Vec<input_device::InputDeviceType>,
691 filename: &String,
692 device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
693 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
694 bindings: &InputDeviceBindingHashMap,
695 device_id: u32,
696 input_devices_node: &fuchsia_inspect::Node,
697 devices_connected: Option<&fuchsia_inspect::UintProperty>,
698 feature_flags: InputPipelineFeatureFlags,
699 metrics_logger: metrics::MetricsLogger,
700) {
701 let mut matched_device_types = vec![];
702 if let Ok(descriptor) = device_proxy.get_descriptor().await {
703 for device_type in device_types {
704 if input_device::is_device_type(&descriptor, *device_type).await {
705 matched_device_types.push(device_type);
706 match devices_connected {
707 Some(dev_connected) => {
708 let _ = dev_connected.add(1);
709 }
710 None => (),
711 };
712 }
713 }
714 if matched_device_types.is_empty() {
715 log::info!(
716 "device {} did not match any supported device types: {:?}",
717 filename,
718 device_types
719 );
720 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
721 let mut health = fuchsia_inspect::health::Node::new(&device_node);
722 health.set_unhealthy("Unsupported device type.");
723 device_node.record(health);
724 input_devices_node.record(device_node);
725 return;
726 }
727 } else {
728 metrics_logger.clone().log_error(
729 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
730 std::format!("cannot bind device {} without a device descriptor", filename),
731 );
732 return;
733 }
734
735 log::info!(
736 "binding {} to device types: {}",
737 filename,
738 matched_device_types
739 .iter()
740 .fold(String::new(), |device_types_string, device_type| device_types_string
741 + &format!("{:?}, ", device_type))
742 );
743
744 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
745 for device_type in matched_device_types {
746 let proxy = device_proxy.clone();
767 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
768 match input_device::get_device_binding(
769 *device_type,
770 proxy,
771 device_id,
772 input_event_sender.clone(),
773 device_node,
774 feature_flags.clone(),
775 metrics_logger.clone(),
776 )
777 .await
778 {
779 Ok(binding) => new_bindings.push(binding),
780 Err(e) => {
781 metrics_logger.log_error(
782 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
783 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
784 );
785 }
786 }
787 }
788
789 if !new_bindings.is_empty() {
790 let mut bindings = bindings.lock().await;
791 bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
792 }
793}
794
795#[cfg(test)]
796mod tests {
797 use super::*;
798 use crate::input_device::{InputDeviceBinding, InputEventType};
799 use crate::utils::Position;
800 use crate::{
801 fake_input_device_binding, mouse_binding, mouse_model_database,
802 observe_fake_events_input_handler,
803 };
804 use async_trait::async_trait;
805 use diagnostics_assertions::AnyProperty;
806 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
807 use fuchsia_async as fasync;
808 use futures::FutureExt;
809 use pretty_assertions::assert_eq;
810 use rand::Rng;
811 use std::collections::HashSet;
812 use vfs::{pseudo_directory, service as pseudo_fs_service};
813
814 const COUNTS_PER_MM: u32 = 12;
815
816 fn send_input_event(
821 sender: UnboundedSender<Vec<input_device::InputEvent>>,
822 ) -> Vec<input_device::InputEvent> {
823 let mut rng = rand::rng();
824 let offset =
825 Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
826 let input_event = input_device::InputEvent {
827 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
828 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
829 millimeters: Position {
830 x: offset.x / COUNTS_PER_MM as f32,
831 y: offset.y / COUNTS_PER_MM as f32,
832 },
833 }),
834 None, None, mouse_binding::MousePhase::Move,
837 HashSet::new(),
838 HashSet::new(),
839 None, None, )),
842 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
843 mouse_binding::MouseDeviceDescriptor {
844 device_id: 1,
845 absolute_x_range: None,
846 absolute_y_range: None,
847 wheel_v_range: None,
848 wheel_h_range: None,
849 buttons: None,
850 counts_per_mm: COUNTS_PER_MM,
851 },
852 ),
853 event_time: zx::MonotonicInstant::get(),
854 handled: input_device::Handled::No,
855 trace_id: None,
856 };
857 match sender.unbounded_send(vec![input_event.clone()]) {
858 Err(_) => assert!(false),
859 _ => {}
860 }
861
862 vec![input_event]
863 }
864
865 fn handle_input_device_request(
870 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
871 ) {
872 match input_device_request {
873 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
874 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
875 device_information: None,
876 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
877 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
878 movement_x: None,
879 movement_y: None,
880 scroll_v: None,
881 scroll_h: None,
882 buttons: Some(vec![0]),
883 position_x: None,
884 position_y: None,
885 ..Default::default()
886 }),
887 ..Default::default()
888 }),
889 sensor: None,
890 touch: None,
891 keyboard: None,
892 consumer_control: None,
893 ..Default::default()
894 });
895 }
896 _ => {}
897 }
898 }
899
900 #[fasync::run_singlethreaded(test)]
902 async fn multiple_devices_single_handler() {
903 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
905 let first_device_binding =
906 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
907 let second_device_binding =
908 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
909
910 let (handler_event_sender, mut handler_event_receiver) =
912 futures::channel::mpsc::channel(100);
913 let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
914 handler_event_sender,
915 );
916
917 let (sender, receiver, handlers, _, _, _) =
919 InputPipelineAssembly::new(metrics::MetricsLogger::default())
920 .add_handler(input_handler)
921 .into_components();
922 let inspector = fuchsia_inspect::Inspector::default();
923 let test_node = inspector.root().create_child("input_pipeline");
924 let input_pipeline = InputPipeline {
925 pipeline_sender: sender,
926 device_event_sender,
927 device_event_receiver,
928 input_device_types: vec![],
929 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
930 inspect_node: test_node,
931 metrics_logger: metrics::MetricsLogger::default(),
932 feature_flags: input_device::InputPipelineFeatureFlags::default(),
933 };
934 InputPipeline::run(receiver, handlers);
935
936 let first_device_events = send_input_event(first_device_binding.input_event_sender());
938 let second_device_events = send_input_event(second_device_binding.input_event_sender());
939
940 fasync::Task::local(async {
942 input_pipeline.handle_input_events().await;
943 })
944 .detach();
945
946 let first_handled_event = handler_event_receiver.next().await;
948 assert_eq!(first_handled_event, first_device_events.into_iter().next());
949
950 let second_handled_event = handler_event_receiver.next().await;
951 assert_eq!(second_handled_event, second_device_events.into_iter().next());
952 }
953
954 #[fasync::run_singlethreaded(test)]
956 async fn single_device_multiple_handlers() {
957 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
959 let input_device_binding =
960 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
961
962 let (first_handler_event_sender, mut first_handler_event_receiver) =
964 futures::channel::mpsc::channel(100);
965 let first_input_handler =
966 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
967 first_handler_event_sender,
968 );
969 let (second_handler_event_sender, mut second_handler_event_receiver) =
970 futures::channel::mpsc::channel(100);
971 let second_input_handler =
972 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
973 second_handler_event_sender,
974 );
975
976 let (sender, receiver, handlers, _, _, _) =
978 InputPipelineAssembly::new(metrics::MetricsLogger::default())
979 .add_handler(first_input_handler)
980 .add_handler(second_input_handler)
981 .into_components();
982 let inspector = fuchsia_inspect::Inspector::default();
983 let test_node = inspector.root().create_child("input_pipeline");
984 let input_pipeline = InputPipeline {
985 pipeline_sender: sender,
986 device_event_sender,
987 device_event_receiver,
988 input_device_types: vec![],
989 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
990 inspect_node: test_node,
991 metrics_logger: metrics::MetricsLogger::default(),
992 feature_flags: input_device::InputPipelineFeatureFlags::default(),
993 };
994 InputPipeline::run(receiver, handlers);
995
996 let input_events = send_input_event(input_device_binding.input_event_sender());
998
999 fasync::Task::local(async {
1001 input_pipeline.handle_input_events().await;
1002 })
1003 .detach();
1004
1005 let expected_event = input_events.into_iter().next();
1007 let first_handler_event = first_handler_event_receiver.next().await;
1008 assert_eq!(first_handler_event, expected_event);
1009 let second_handler_event = second_handler_event_receiver.next().await;
1010 assert_eq!(second_handler_event, expected_event);
1011 }
1012
1013 #[fasync::run_singlethreaded(test)]
1016 async fn watch_devices_one_match_exists() {
1017 let mut count: i8 = 0;
1019 let dir = pseudo_directory! {
1020 "file_name" => pseudo_fs_service::host(
1021 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1022 async move {
1023 while count < 3 {
1024 if let Some(input_device_request) =
1025 request_stream.try_next().await.unwrap()
1026 {
1027 handle_input_device_request(input_device_request);
1028 count += 1;
1029 }
1030 }
1031
1032 }.boxed()
1033 },
1034 )
1035 };
1036
1037 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1039 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1040 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1043
1044 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1045 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1046 let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1047
1048 let inspector = fuchsia_inspect::Inspector::default();
1049 let test_node = inspector.root().create_child("input_pipeline");
1050 test_node.record_string(
1051 "supported_input_devices",
1052 supported_device_types.clone().iter().join(", "),
1053 );
1054 let input_devices = test_node.create_child("input_devices");
1055 diagnostics_assertions::assert_data_tree!(inspector, root: {
1057 input_pipeline: {
1058 supported_input_devices: "Mouse",
1059 input_devices: {}
1060 }
1061 });
1062
1063 let _ = InputPipeline::watch_for_devices(
1064 device_watcher,
1065 dir_proxy_for_pipeline,
1066 supported_device_types,
1067 input_event_sender,
1068 bindings.clone(),
1069 &input_devices,
1070 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1072 metrics::MetricsLogger::default(),
1073 )
1074 .await;
1075
1076 let bindings_hashmap = bindings.lock().await;
1078 assert_eq!(bindings_hashmap.len(), 1);
1079 let bindings_vector = bindings_hashmap.get(&10);
1080 assert!(bindings_vector.is_some());
1081 assert_eq!(bindings_vector.unwrap().len(), 1);
1082 let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1083 assert!(boxed_mouse_binding.is_some());
1084 assert_eq!(
1085 boxed_mouse_binding.unwrap().get_device_descriptor(),
1086 input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1087 device_id: 10,
1088 absolute_x_range: None,
1089 absolute_y_range: None,
1090 wheel_v_range: None,
1091 wheel_h_range: None,
1092 buttons: Some(vec![0]),
1093 counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1094 })
1095 );
1096
1097 diagnostics_assertions::assert_data_tree!(inspector, root: {
1099 input_pipeline: {
1100 supported_input_devices: "Mouse",
1101 input_devices: {
1102 devices_discovered: 1u64,
1103 devices_connected: 1u64,
1104 "file_name_Mouse": contains {
1105 reports_received_count: 0u64,
1106 reports_filtered_count: 0u64,
1107 events_generated: 0u64,
1108 last_received_timestamp_ns: 0u64,
1109 last_generated_timestamp_ns: 0u64,
1110 "fuchsia.inspect.Health": {
1111 status: "OK",
1112 start_timestamp_nanos: AnyProperty
1115 },
1116 }
1117 }
1118 }
1119 });
1120 }
1121
1122 #[fasync::run_singlethreaded(test)]
1125 async fn watch_devices_no_matches_exist() {
1126 let mut count: i8 = 0;
1128 let dir = pseudo_directory! {
1129 "file_name" => pseudo_fs_service::host(
1130 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1131 async move {
1132 while count < 1 {
1133 if let Some(input_device_request) =
1134 request_stream.try_next().await.unwrap()
1135 {
1136 handle_input_device_request(input_device_request);
1137 count += 1;
1138 }
1139 }
1140
1141 }.boxed()
1142 },
1143 )
1144 };
1145
1146 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1148 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1149 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1152
1153 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1154 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1155 let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1156
1157 let inspector = fuchsia_inspect::Inspector::default();
1158 let test_node = inspector.root().create_child("input_pipeline");
1159 test_node.record_string(
1160 "supported_input_devices",
1161 supported_device_types.clone().iter().join(", "),
1162 );
1163 let input_devices = test_node.create_child("input_devices");
1164 diagnostics_assertions::assert_data_tree!(inspector, root: {
1166 input_pipeline: {
1167 supported_input_devices: "Keyboard",
1168 input_devices: {}
1169 }
1170 });
1171
1172 let _ = InputPipeline::watch_for_devices(
1173 device_watcher,
1174 dir_proxy_for_pipeline,
1175 supported_device_types,
1176 input_event_sender,
1177 bindings.clone(),
1178 &input_devices,
1179 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1181 metrics::MetricsLogger::default(),
1182 )
1183 .await;
1184
1185 let bindings = bindings.lock().await;
1187 assert_eq!(bindings.len(), 0);
1188
1189 diagnostics_assertions::assert_data_tree!(inspector, root: {
1191 input_pipeline: {
1192 supported_input_devices: "Keyboard",
1193 input_devices: {
1194 devices_discovered: 1u64,
1195 devices_connected: 0u64,
1196 "file_name_Unsupported": {
1197 "fuchsia.inspect.Health": {
1198 status: "UNHEALTHY",
1199 message: "Unsupported device type.",
1200 start_timestamp_nanos: AnyProperty
1203 },
1204 }
1205 }
1206 }
1207 });
1208 }
1209
1210 #[fasync::run_singlethreaded(test)]
1213 async fn handle_input_device_registry_request_stream() {
1214 let (input_device_registry_proxy, input_device_registry_request_stream) =
1215 create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1216 let (input_device_client_end, mut input_device_request_stream) =
1217 create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1218
1219 let device_types = vec![input_device::InputDeviceType::Mouse];
1220 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1221 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1222
1223 let mut count: i8 = 0;
1225 fasync::Task::local(async move {
1226 let _ = input_device_registry_proxy.register(input_device_client_end);
1228
1229 while count < 3 {
1230 if let Some(input_device_request) =
1231 input_device_request_stream.try_next().await.unwrap()
1232 {
1233 handle_input_device_request(input_device_request);
1234 count += 1;
1235 }
1236 }
1237
1238 input_device_registry_proxy.take_event_stream();
1240 })
1241 .detach();
1242
1243 let inspector = fuchsia_inspect::Inspector::default();
1244 let test_node = inspector.root().create_child("input_pipeline");
1245
1246 let bindings_clone = bindings.clone();
1248 let _ = InputPipeline::handle_input_device_registry_request_stream(
1249 input_device_registry_request_stream,
1250 &device_types,
1251 &input_event_sender,
1252 &bindings_clone,
1253 &test_node,
1254 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1255 metrics::MetricsLogger::default(),
1256 )
1257 .await;
1258
1259 let bindings = bindings.lock().await;
1261 assert_eq!(bindings.len(), 1);
1262 }
1263
1264 #[fasync::run_singlethreaded(test)]
1266 async fn check_inspect_node_has_correct_properties() {
1267 let device_types = vec![
1268 input_device::InputDeviceType::Touch,
1269 input_device::InputDeviceType::ConsumerControls,
1270 ];
1271 let inspector = fuchsia_inspect::Inspector::default();
1272 let test_node = inspector.root().create_child("input_pipeline");
1273 let (fake_handler_event_sender, _fake_handler_event_receiver) =
1275 futures::channel::mpsc::channel(100);
1276 let fake_input_handler =
1277 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1278 fake_handler_event_sender,
1279 );
1280 let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1281 .add_handler(fake_input_handler);
1282 let _test_input_pipeline = InputPipeline::new(
1283 device_types,
1284 assembly,
1285 test_node,
1286 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1287 metrics::MetricsLogger::default(),
1288 );
1289 diagnostics_assertions::assert_data_tree!(inspector, root: {
1290 input_pipeline: {
1291 supported_input_devices: "Touch, ConsumerControls",
1292 handlers_registered: 1u64,
1293 handlers_healthy: 1u64,
1294 input_devices: {}
1295 }
1296 });
1297 }
1298
1299 struct SpecificInterestFakeHandler {
1300 interest_types: Vec<input_device::InputEventType>,
1301 event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1302 }
1303
1304 impl SpecificInterestFakeHandler {
1305 pub fn new(
1306 interest_types: Vec<input_device::InputEventType>,
1307 event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1308 ) -> Rc<Self> {
1309 Rc::new(SpecificInterestFakeHandler {
1310 interest_types,
1311 event_sender: std::cell::RefCell::new(event_sender),
1312 })
1313 }
1314 }
1315
1316 impl Handler for SpecificInterestFakeHandler {
1317 fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1318 fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1319 fn get_name(&self) -> &'static str {
1320 "SpecificInterestFakeHandler"
1321 }
1322
1323 fn interest(&self) -> Vec<input_device::InputEventType> {
1324 self.interest_types.clone()
1325 }
1326 }
1327
1328 #[async_trait(?Send)]
1329 impl input_handler::InputHandler for SpecificInterestFakeHandler {
1330 async fn handle_input_event(
1331 self: Rc<Self>,
1332 input_event: input_device::InputEvent,
1333 ) -> Vec<input_device::InputEvent> {
1334 match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1335 Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1336 Ok(_) => {}
1337 }
1338 vec![input_event]
1339 }
1340 }
1341
1342 #[fasync::run_singlethreaded(test)]
1343 async fn run_only_sends_events_to_interested_handlers() {
1344 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1346 let mouse_handler =
1347 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1348
1349 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1351 let fake_handler =
1352 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1353
1354 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1355 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1356 .add_handler(mouse_handler)
1357 .add_handler(fake_handler)
1358 .into_components();
1359
1360 InputPipeline::run(pipeline_receiver, handlers);
1362
1363 let fake_event = input_device::InputEvent {
1365 device_event: input_device::InputDeviceEvent::Fake,
1366 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1367 event_time: zx::MonotonicInstant::get(),
1368 handled: input_device::Handled::No,
1369 trace_id: None,
1370 };
1371
1372 pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1374
1375 let received_by_fake = fake_receiver.next().await;
1377 assert_eq!(received_by_fake, Some(fake_event));
1378
1379 assert!(mouse_receiver.try_next().is_err());
1381 }
1382
1383 fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1384 input_device::InputEvent {
1385 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1386 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1387 millimeters: Position { x, y },
1388 }),
1389 None,
1390 None,
1391 mouse_binding::MousePhase::Move,
1392 HashSet::new(),
1393 HashSet::new(),
1394 None,
1395 None,
1396 )),
1397 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1398 mouse_binding::MouseDeviceDescriptor {
1399 device_id: 1,
1400 absolute_x_range: None,
1401 absolute_y_range: None,
1402 wheel_v_range: None,
1403 wheel_h_range: None,
1404 buttons: None,
1405 counts_per_mm: 1,
1406 },
1407 ),
1408 event_time: zx::MonotonicInstant::get(),
1409 handled: input_device::Handled::No,
1410 trace_id: None,
1411 }
1412 }
1413
1414 #[fasync::run_singlethreaded(test)]
1415 async fn run_mixed_event_types_dispatched_correctly() {
1416 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1418 let mouse_handler =
1419 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1420
1421 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1423 let fake_handler =
1424 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1425
1426 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1427 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1428 .add_handler(mouse_handler)
1429 .add_handler(fake_handler)
1430 .into_components();
1431
1432 InputPipeline::run(pipeline_receiver, handlers);
1434
1435 let mouse_event_1 = create_mouse_event(1.0, 1.0);
1437 let mouse_event_2 = create_mouse_event(2.0, 2.0);
1438 let mouse_event_3 = create_mouse_event(3.0, 3.0);
1439
1440 let fake_event_1 = input_device::InputEvent {
1441 device_event: input_device::InputDeviceEvent::Fake,
1442 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1443 event_time: zx::MonotonicInstant::get(),
1444 handled: input_device::Handled::No,
1445 trace_id: None,
1446 };
1447
1448 let mixed_batch = vec![
1451 mouse_event_1.clone(),
1452 mouse_event_2.clone(),
1453 fake_event_1.clone(),
1454 mouse_event_3.clone(),
1455 ];
1456 pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1457
1458 assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1460 assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1461 assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1462
1463 assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1465 }
1466}