1use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{Dispatcher, Incoming, Transport, input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use fidl::endpoints;
12use fidl_fuchsia_io as fio;
13use focus_chain_provider::FocusChainProviderPublisher;
14use fuchsia_async as fasync;
15use fuchsia_component::directory::AsRefDirectory;
16use fuchsia_fs::directory::{WatchEvent, Watcher};
17use fuchsia_inspect::NumericProperty;
18use fuchsia_inspect::health::Reporter;
19use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
20use futures::future::LocalBoxFuture;
21use futures::lock::Mutex;
22use futures::{StreamExt, TryStreamExt};
23use itertools::Itertools;
24use metrics_registry::*;
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::rc::Rc;
28use std::sync::atomic::{AtomicU32, Ordering};
29use std::sync::{Arc, LazyLock};
30use strum::EnumCount;
31
32static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
39
40fn get_next_device_id() -> u32 {
44 NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
45}
46
47type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
48
49pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
52
53pub struct InputPipelineAssembly {
67 sender: UnboundedSender<Vec<input_device::InputEvent>>,
70 receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
73
74 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
76
77 display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
79
80 focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
82
83 metrics_logger: metrics::MetricsLogger,
85}
86
87impl InputPipelineAssembly {
88 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
91 let (sender, receiver) = mpsc::unbounded();
92 InputPipelineAssembly {
93 sender,
94 receiver,
95 handlers: vec![],
96 metrics_logger,
97 display_ownership_fut: None,
98 focus_listener_fut: None,
99 }
100 }
101
102 pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
105 self.handlers.push(handler);
106 self
107 }
108
109 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
111 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
112 }
113
114 pub fn add_display_ownership(
115 mut self,
116 display_ownership_event: zx::Event,
117 input_handlers_node: &fuchsia_inspect::Node,
118 ) -> InputPipelineAssembly {
119 let h = DisplayOwnership::new(
120 display_ownership_event,
121 input_handlers_node,
122 self.metrics_logger.clone(),
123 );
124 let metrics_logger_clone = self.metrics_logger.clone();
125 let h_clone = h.clone();
126 let sender_clone = self.sender.clone();
127 let display_ownership_fut = Box::pin(async move {
128 h_clone.clone().set_handler_healthy();
129 h_clone.clone()
130 .handle_ownership_change(sender_clone)
131 .await
132 .map_err(|e| {
133 metrics_logger_clone.log_error(
134 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
135 std::format!(
136 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
137 })
138 .unwrap();
139 h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
140 });
141 self.display_ownership_fut = Some(display_ownership_fut);
142 self.add_handler(h)
143 }
144
145 fn into_components(
151 self,
152 ) -> (
153 UnboundedSender<Vec<input_device::InputEvent>>,
154 UnboundedReceiver<Vec<input_device::InputEvent>>,
155 Vec<Rc<dyn input_handler::BatchInputHandler>>,
156 metrics::MetricsLogger,
157 Option<LocalBoxFuture<'static, ()>>,
158 Option<LocalBoxFuture<'static, ()>>,
159 ) {
160 (
161 self.sender,
162 self.receiver,
163 self.handlers,
164 self.metrics_logger,
165 self.display_ownership_fut,
166 self.focus_listener_fut,
167 )
168 }
169
170 pub fn add_focus_listener(
171 mut self,
172 incoming: &Incoming,
173 focus_chain_publisher: FocusChainProviderPublisher,
174 ) -> Self {
175 let metrics_logger_clone = self.metrics_logger.clone();
176 let incoming2 = incoming.clone();
177 let focus_listener_fut = Box::pin(async move {
178 if let Ok(mut focus_listener) = FocusListener::new(
179 &incoming2,
180 focus_chain_publisher,
181 metrics_logger_clone,
182 )
183 .map_err(|e| {
184 log::warn!("could not create focus listener, focus will not be dispatched: {:?}", e)
185 }) {
186 let _result = focus_listener
189 .dispatch_focus_changes()
190 .await
191 .map(|_| {
192 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
193 })
194 .map_err(|e| {
195 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
196 });
197 }
198 });
199 self.focus_listener_fut = Some(focus_listener_fut);
200 self
201 }
202}
203
204pub struct InputPipeline {
232 pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
236
237 device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
240
241 device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
243
244 input_device_types: Vec<input_device::InputDeviceType>,
246
247 input_device_bindings: InputDeviceBindingHashMap,
249
250 inspect_node: fuchsia_inspect::Node,
253
254 metrics_logger: metrics::MetricsLogger,
256
257 pub feature_flags: input_device::InputPipelineFeatureFlags,
259}
260
261impl InputPipeline {
262 fn new_common(
263 input_device_types: Vec<input_device::InputDeviceType>,
264 assembly: InputPipelineAssembly,
265 inspect_node: fuchsia_inspect::Node,
266 feature_flags: input_device::InputPipelineFeatureFlags,
267 ) -> Self {
268 let (
269 pipeline_sender,
270 receiver,
271 handlers,
272 metrics_logger,
273 display_ownership_fut,
274 focus_listener_fut,
275 ) = assembly.into_components();
276
277 let mut handlers_count = handlers.len();
278 if let Some(fut) = display_ownership_fut {
280 fasync::Task::local(fut).detach();
281 handlers_count += 1;
282 }
283
284 if let Some(fut) = focus_listener_fut {
286 fasync::Task::local(fut).detach();
287 handlers_count += 1;
288 }
289
290 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
292 inspect_node.record_uint("handlers_registered", handlers_count as u64);
293 inspect_node.record_uint("handlers_healthy", handlers_count as u64);
294
295 InputPipeline::run(receiver, handlers, metrics_logger.clone());
297
298 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
299 let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
300 InputPipeline {
301 pipeline_sender,
302 device_event_sender,
303 device_event_receiver,
304 input_device_types,
305 input_device_bindings,
306 inspect_node,
307 metrics_logger,
308 feature_flags,
309 }
310 }
311
312 pub fn new_for_test(
320 input_device_types: Vec<input_device::InputDeviceType>,
321 assembly: InputPipelineAssembly,
322 ) -> Self {
323 let inspector = fuchsia_inspect::Inspector::default();
324 let root = inspector.root();
325 let test_node = root.create_child("input_pipeline");
326 Self::new_common(
327 input_device_types,
328 assembly,
329 test_node,
330 input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
331 )
332 }
333
334 pub fn new(
341 incoming: &Incoming,
342 input_device_types: Vec<input_device::InputDeviceType>,
343 assembly: InputPipelineAssembly,
344 inspect_node: fuchsia_inspect::Node,
345 feature_flags: input_device::InputPipelineFeatureFlags,
346 metrics_logger: metrics::MetricsLogger,
347 ) -> Result<Self, Error> {
348 let input_pipeline =
349 Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
350 let input_device_types = input_pipeline.input_device_types.clone();
351 let input_event_sender = input_pipeline.device_event_sender.clone();
352 let input_device_bindings = input_pipeline.input_device_bindings.clone();
353 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
354 let feature_flags = input_pipeline.feature_flags.clone();
355 let incoming = incoming.clone();
356 fasync::Task::local(async move {
361 match async {
364 let (dir_proxy, server) = endpoints::create_proxy::<fio::DirectoryMarker>();
365 incoming.as_ref_directory().open(
366 input_device::INPUT_REPORT_PATH,
367 fio::PERM_READABLE,
368 server.into()
369 )
370 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
371 let device_watcher =
372 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
373 Self::watch_for_devices(
374 device_watcher,
375 dir_proxy,
376 input_device_types,
377 input_event_sender,
378 input_device_bindings,
379 &devices_node,
380 false, feature_flags,
382 metrics_logger.clone(),
383 )
384 .await
385 .context("failed to watch for devices")
386 }
387 .await
388 {
389 Ok(()) => {}
390 Err(err) => {
391 metrics_logger.log_warn(
395 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
396 std::format!(
397 "Input pipeline is unable to watch for new input devices: {:?}",
398 err
399 ));
400 }
401 }
402 }).detach();
403
404 Ok(input_pipeline)
405 }
406
407 pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
409 &self.input_device_bindings
410 }
411
412 pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
415 &self.device_event_sender
416 }
417
418 pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
420 &self.input_device_types
421 }
422
423 pub async fn handle_input_events(mut self) {
425 let metrics_logger_clone = self.metrics_logger.clone();
426 while let Some(input_event) = self.device_event_receiver.next().await {
427 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
428 metrics_logger_clone.log_error(
429 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
430 std::format!("could not forward event from driver: {:?}", &e));
431 }
432 }
433
434 metrics_logger_clone.log_error(
435 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
436 "Input pipeline stopped handling input events.".to_string(),
437 );
438 }
439
440 async fn watch_for_devices(
456 mut device_watcher: Watcher,
457 dir_proxy: fio::DirectoryProxy,
458 device_types: Vec<input_device::InputDeviceType>,
459 input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
460 bindings: InputDeviceBindingHashMap,
461 input_devices_node: &fuchsia_inspect::Node,
462 break_on_idle: bool,
463 feature_flags: input_device::InputPipelineFeatureFlags,
464 metrics_logger: metrics::MetricsLogger,
465 ) -> Result<(), Error> {
466 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
468 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
469 while let Some(msg) = device_watcher.try_next().await? {
470 if let Ok(filename) = msg.filename.into_os_string().into_string() {
471 if filename == "." {
472 continue;
473 }
474
475 let pathbuf = PathBuf::from(filename.clone());
476 match msg.event {
477 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
478 log::info!("found input device {}", filename);
479 devices_discovered.add(1);
480 let device_proxy =
481 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
482 add_device_bindings(
483 &device_types,
484 &filename,
485 device_proxy,
486 &input_event_sender,
487 &bindings,
488 get_next_device_id(),
489 input_devices_node,
490 Some(&devices_connected),
491 feature_flags.clone(),
492 metrics_logger.clone(),
493 )
494 .await;
495 }
496 WatchEvent::IDLE => {
497 if break_on_idle {
498 break;
499 }
500 }
501 _ => (),
502 }
503 }
504 }
505 input_devices_node.record(devices_discovered);
507 input_devices_node.record(devices_connected);
508 Err(format_err!("Input pipeline stopped watching for new input devices."))
509 }
510
511 pub async fn handle_input_device_registry_request_stream(
526 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
527 device_types: &Vec<input_device::InputDeviceType>,
528 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
529 bindings: &InputDeviceBindingHashMap,
530 input_devices_node: &fuchsia_inspect::Node,
531 feature_flags: input_device::InputPipelineFeatureFlags,
532 metrics_logger: metrics::MetricsLogger,
533 ) -> Result<(), Error> {
534 while let Some(request) = stream
535 .try_next()
536 .await
537 .context("Error handling input device registry request stream")?
538 {
539 match request {
540 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
541 device,
542 ..
543 } => {
544 let device = fidl_next::ClientEnd::<
546 fidl_next_fuchsia_input_report::InputDevice,
547 zx::Channel,
548 >::from_untyped(device.into_channel());
549 let device = Dispatcher::client_from_zx_channel(device);
550 let device = device.spawn();
551 let device_id = get_next_device_id();
552
553 add_device_bindings(
554 device_types,
555 &format!("input-device-registry-{}", device_id),
556 device,
557 input_event_sender,
558 bindings,
559 device_id,
560 input_devices_node,
561 None,
562 feature_flags.clone(),
563 metrics_logger.clone(),
564 )
565 .await;
566 }
567 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
568 device,
569 responder,
570 .. } => {
571 let device = fidl_next::ClientEnd::<
573 fidl_next_fuchsia_input_report::InputDevice,
574 zx::Channel,
575 >::from_untyped(device.into_channel());
576 let device = Dispatcher::client_from_zx_channel(device);
577 let device = device.spawn();
578 let device_id = get_next_device_id();
579
580 add_device_bindings(
581 device_types,
582 &format!("input-device-registry-{}", device_id),
583 device,
584 input_event_sender,
585 bindings,
586 device_id,
587 input_devices_node,
588 None,
589 feature_flags.clone(),
590 metrics_logger.clone(),
591 )
592 .await;
593
594 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
595 device_id: Some(device_id),
596 ..Default::default()
597 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
598 }
599 }
600 }
601
602 Ok(())
603 }
604
605 fn run(
607 mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
608 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
609 metrics_logger: metrics::MetricsLogger,
610 ) {
611 Dispatcher::spawn_local(async move {
612 for handler in &handlers {
613 handler.clone().set_handler_healthy();
614 }
615
616 use input_device::InputEventType;
617
618 let mut handlers_by_type: [Vec<Rc<dyn input_handler::BatchInputHandler>>; InputEventType::COUNT] = Default::default();
619
620 let event_types = vec![
622 InputEventType::Keyboard,
623 InputEventType::LightSensor,
624 InputEventType::ConsumerControls,
625 InputEventType::Mouse,
626 InputEventType::TouchScreen,
627 InputEventType::Touchpad,
628 #[cfg(test)]
629 InputEventType::Fake,
630 ];
631
632 for event_type in event_types {
633 let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
634 .iter()
635 .filter(|h| h.interest().contains(&event_type))
636 .cloned()
637 .collect();
638 handlers_by_type[event_type as usize] = handlers_for_type;
639 }
640
641 while let Some(events) = receiver.next().await {
642 if events.is_empty() {
643 continue;
644 }
645
646 let mut groups_seen = 0;
647 let events = events
648 .into_iter()
649 .chunk_by(|e| InputEventType::from(&e.device_event));
650 let events = events.into_iter().map(|(k, v)| (k, v.collect::<Vec<_>>()));
651 for (event_type, event_group) in events {
652 groups_seen += 1;
653 if groups_seen == 2 {
654 metrics_logger.log_error(
655 InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
656 "it is not recommended to contain multiple types of events in 1 send".to_string(),
657 );
658 }
659 let mut events_in_group = event_group;
660
661 let handlers = &handlers_by_type[event_type as usize];
663
664 for handler in handlers {
665 events_in_group =
666 handler.clone().handle_input_events(events_in_group).await;
667 }
668
669 for event in events_in_group {
670 if event.handled == input_device::Handled::No {
671 log::warn!("unhandled input event: {:?}", &event);
672 }
673 if let Some(trace_id) = event.trace_id {
674 fuchsia_trace::flow_end!(
675 "input",
676 "event_in_input_pipeline",
677 trace_id.into()
678 );
679 }
680 }
681 }
682 }
683 for handler in &handlers {
684 handler.clone().set_handler_unhealthy("Pipeline loop terminated");
685 }
686 panic!("Runner task is not supposed to terminate.")
687 }).detach();
688 }
689}
690
691async fn add_device_bindings(
711 device_types: &Vec<input_device::InputDeviceType>,
712 filename: &String,
713 device_proxy: fidl_next::Client<fidl_next_fuchsia_input_report::InputDevice, Transport>,
714 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
715 bindings: &InputDeviceBindingHashMap,
716 device_id: u32,
717 input_devices_node: &fuchsia_inspect::Node,
718 devices_connected: Option<&fuchsia_inspect::UintProperty>,
719 feature_flags: InputPipelineFeatureFlags,
720 metrics_logger: metrics::MetricsLogger,
721) {
722 let mut matched_device_types = vec![];
723 if let Ok(res) = device_proxy.get_descriptor().await {
724 for device_type in device_types {
725 if input_device::is_device_type(&res.descriptor, *device_type).await {
726 matched_device_types.push(device_type);
727 match devices_connected {
728 Some(dev_connected) => {
729 let _ = dev_connected.add(1);
730 }
731 None => (),
732 };
733 }
734 }
735 if matched_device_types.is_empty() {
736 log::info!(
737 "device {} did not match any supported device types: {:?}",
738 filename,
739 device_types
740 );
741 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
742 let mut health = fuchsia_inspect::health::Node::new(&device_node);
743 health.set_unhealthy("Unsupported device type.");
744 device_node.record(health);
745 input_devices_node.record(device_node);
746 return;
747 }
748 } else {
749 metrics_logger.clone().log_error(
750 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
751 std::format!("cannot bind device {} without a device descriptor", filename),
752 );
753 return;
754 }
755
756 log::info!(
757 "binding {} to device types: {}",
758 filename,
759 matched_device_types
760 .iter()
761 .fold(String::new(), |device_types_string, device_type| device_types_string
762 + &format!("{:?}, ", device_type))
763 );
764
765 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
766 for device_type in matched_device_types {
767 let proxy = device_proxy.clone();
788 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
789 match input_device::get_device_binding(
790 *device_type,
791 proxy,
792 device_id,
793 input_event_sender.clone(),
794 device_node,
795 feature_flags.clone(),
796 metrics_logger.clone(),
797 )
798 .await
799 {
800 Ok(binding) => new_bindings.push(binding),
801 Err(e) => {
802 metrics_logger.log_error(
803 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
804 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
805 );
806 }
807 }
808 }
809
810 if !new_bindings.is_empty() {
811 let mut bindings = bindings.lock().await;
812 bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
813 }
814}
815
816#[cfg(test)]
817mod tests {
818 use super::*;
819 use crate::input_device::{InputDeviceBinding, InputEventType};
820 use crate::utils::Position;
821 use crate::{
822 fake_input_device_binding, mouse_binding, mouse_model_database,
823 observe_fake_events_input_handler,
824 };
825 use async_trait::async_trait;
826 use diagnostics_assertions::AnyProperty;
827 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
828 use fuchsia_async as fasync;
829 use futures::FutureExt;
830 use pretty_assertions::assert_eq;
831 use rand::Rng;
832 use sorted_vec_map_rs::SortedVecSet;
833 use vfs::{pseudo_directory, service as pseudo_fs_service};
834
835 const COUNTS_PER_MM: u32 = 12;
836
837 fn send_input_event(
842 sender: UnboundedSender<Vec<input_device::InputEvent>>,
843 ) -> Vec<input_device::InputEvent> {
844 let mut rng = rand::rng();
845 let offset =
846 Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
847 let input_event = input_device::InputEvent {
848 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
849 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
850 millimeters: Position {
851 x: offset.x / COUNTS_PER_MM as f32,
852 y: offset.y / COUNTS_PER_MM as f32,
853 },
854 }),
855 None, None, mouse_binding::MousePhase::Move,
858 SortedVecSet::new(),
859 SortedVecSet::new(),
860 None, None, )),
863 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
864 mouse_binding::MouseDeviceDescriptor {
865 device_id: 1,
866 absolute_x_range: None,
867 absolute_y_range: None,
868 wheel_v_range: None,
869 wheel_h_range: None,
870 buttons: None,
871 counts_per_mm: COUNTS_PER_MM,
872 },
873 ),
874 event_time: zx::MonotonicInstant::get(),
875 handled: input_device::Handled::No,
876 trace_id: None,
877 };
878 match sender.unbounded_send(vec![input_event.clone()]) {
879 Err(_) => assert!(false),
880 _ => {}
881 }
882
883 vec![input_event]
884 }
885
886 fn handle_input_device_request(
891 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
892 ) {
893 match input_device_request {
894 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
895 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
896 device_information: None,
897 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
898 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
899 movement_x: None,
900 movement_y: None,
901 scroll_v: None,
902 scroll_h: None,
903 buttons: Some(vec![0]),
904 position_x: None,
905 position_y: None,
906 ..Default::default()
907 }),
908 ..Default::default()
909 }),
910 sensor: None,
911 touch: None,
912 keyboard: None,
913 consumer_control: None,
914 ..Default::default()
915 });
916 }
917 _ => {}
918 }
919 }
920
921 #[fasync::run_singlethreaded(test)]
923 async fn multiple_devices_single_handler() {
924 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
926 let first_device_binding =
927 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
928 let second_device_binding =
929 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
930
931 let (handler_event_sender, mut handler_event_receiver) =
933 futures::channel::mpsc::channel(100);
934 let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
935 handler_event_sender,
936 );
937
938 let (sender, receiver, handlers, _, _, _) =
940 InputPipelineAssembly::new(metrics::MetricsLogger::default())
941 .add_handler(input_handler)
942 .into_components();
943 let inspector = fuchsia_inspect::Inspector::default();
944 let test_node = inspector.root().create_child("input_pipeline");
945 let input_pipeline = InputPipeline {
946 pipeline_sender: sender,
947 device_event_sender,
948 device_event_receiver,
949 input_device_types: vec![],
950 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
951 inspect_node: test_node,
952 metrics_logger: metrics::MetricsLogger::default(),
953 feature_flags: input_device::InputPipelineFeatureFlags::default(),
954 };
955 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
956
957 let first_device_events = send_input_event(first_device_binding.input_event_sender());
959 let second_device_events = send_input_event(second_device_binding.input_event_sender());
960
961 fasync::Task::local(async {
963 input_pipeline.handle_input_events().await;
964 })
965 .detach();
966
967 let first_handled_event = handler_event_receiver.next().await;
969 assert_eq!(first_handled_event, first_device_events.into_iter().next());
970
971 let second_handled_event = handler_event_receiver.next().await;
972 assert_eq!(second_handled_event, second_device_events.into_iter().next());
973 }
974
975 #[fasync::run_singlethreaded(test)]
977 async fn single_device_multiple_handlers() {
978 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
980 let input_device_binding =
981 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
982
983 let (first_handler_event_sender, mut first_handler_event_receiver) =
985 futures::channel::mpsc::channel(100);
986 let first_input_handler =
987 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
988 first_handler_event_sender,
989 );
990 let (second_handler_event_sender, mut second_handler_event_receiver) =
991 futures::channel::mpsc::channel(100);
992 let second_input_handler =
993 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
994 second_handler_event_sender,
995 );
996
997 let (sender, receiver, handlers, _, _, _) =
999 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1000 .add_handler(first_input_handler)
1001 .add_handler(second_input_handler)
1002 .into_components();
1003 let inspector = fuchsia_inspect::Inspector::default();
1004 let test_node = inspector.root().create_child("input_pipeline");
1005 let input_pipeline = InputPipeline {
1006 pipeline_sender: sender,
1007 device_event_sender,
1008 device_event_receiver,
1009 input_device_types: vec![],
1010 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
1011 inspect_node: test_node,
1012 metrics_logger: metrics::MetricsLogger::default(),
1013 feature_flags: input_device::InputPipelineFeatureFlags::default(),
1014 };
1015 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1016
1017 let input_events = send_input_event(input_device_binding.input_event_sender());
1019
1020 fasync::Task::local(async {
1022 input_pipeline.handle_input_events().await;
1023 })
1024 .detach();
1025
1026 let expected_event = input_events.into_iter().next();
1028 let first_handler_event = first_handler_event_receiver.next().await;
1029 assert_eq!(first_handler_event, expected_event);
1030 let second_handler_event = second_handler_event_receiver.next().await;
1031 assert_eq!(second_handler_event, expected_event);
1032 }
1033
1034 #[fasync::run_singlethreaded(test)]
1037 async fn watch_devices_one_match_exists() {
1038 let mut count: i8 = 0;
1040 let dir = pseudo_directory! {
1041 "file_name" => pseudo_fs_service::host(
1042 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1043 async move {
1044 while count < 3 {
1045 if let Some(input_device_request) =
1046 request_stream.try_next().await.unwrap()
1047 {
1048 handle_input_device_request(input_device_request);
1049 count += 1;
1050 }
1051 }
1052
1053 }.boxed()
1054 },
1055 )
1056 };
1057
1058 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1060 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1061 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1064
1065 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1066 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1067 let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1068
1069 let inspector = fuchsia_inspect::Inspector::default();
1070 let test_node = inspector.root().create_child("input_pipeline");
1071 test_node.record_string(
1072 "supported_input_devices",
1073 supported_device_types.clone().iter().join(", "),
1074 );
1075 let input_devices = test_node.create_child("input_devices");
1076 diagnostics_assertions::assert_data_tree!(inspector, root: {
1078 input_pipeline: {
1079 supported_input_devices: "Mouse",
1080 input_devices: {}
1081 }
1082 });
1083
1084 let _ = InputPipeline::watch_for_devices(
1085 device_watcher,
1086 dir_proxy_for_pipeline,
1087 supported_device_types,
1088 input_event_sender,
1089 bindings.clone(),
1090 &input_devices,
1091 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1093 metrics::MetricsLogger::default(),
1094 )
1095 .await;
1096
1097 let bindings_hashmap = bindings.lock().await;
1099 assert_eq!(bindings_hashmap.len(), 1);
1100 let bindings_vector = bindings_hashmap.get(&10);
1101 assert!(bindings_vector.is_some());
1102 assert_eq!(bindings_vector.unwrap().len(), 1);
1103 let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1104 assert!(boxed_mouse_binding.is_some());
1105 assert_eq!(
1106 boxed_mouse_binding.unwrap().get_device_descriptor(),
1107 input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1108 device_id: 10,
1109 absolute_x_range: None,
1110 absolute_y_range: None,
1111 wheel_v_range: None,
1112 wheel_h_range: None,
1113 buttons: Some(vec![0]),
1114 counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1115 })
1116 );
1117
1118 diagnostics_assertions::assert_data_tree!(inspector, root: {
1120 input_pipeline: {
1121 supported_input_devices: "Mouse",
1122 input_devices: {
1123 devices_discovered: 1u64,
1124 devices_connected: 1u64,
1125 "file_name_Mouse": contains {
1126 reports_received_count: 0u64,
1127 reports_filtered_count: 0u64,
1128 events_generated: 0u64,
1129 last_received_timestamp_ns: 0u64,
1130 last_generated_timestamp_ns: 0u64,
1131 "fuchsia.inspect.Health": {
1132 status: "OK",
1133 start_timestamp_nanos: AnyProperty
1136 },
1137 }
1138 }
1139 }
1140 });
1141 }
1142
1143 #[fasync::run_singlethreaded(test)]
1146 async fn watch_devices_no_matches_exist() {
1147 let mut count: i8 = 0;
1149 let dir = pseudo_directory! {
1150 "file_name" => pseudo_fs_service::host(
1151 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1152 async move {
1153 while count < 1 {
1154 if let Some(input_device_request) =
1155 request_stream.try_next().await.unwrap()
1156 {
1157 handle_input_device_request(input_device_request);
1158 count += 1;
1159 }
1160 }
1161
1162 }.boxed()
1163 },
1164 )
1165 };
1166
1167 let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1169 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1170 let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1173
1174 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1175 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1176 let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1177
1178 let inspector = fuchsia_inspect::Inspector::default();
1179 let test_node = inspector.root().create_child("input_pipeline");
1180 test_node.record_string(
1181 "supported_input_devices",
1182 supported_device_types.clone().iter().join(", "),
1183 );
1184 let input_devices = test_node.create_child("input_devices");
1185 diagnostics_assertions::assert_data_tree!(inspector, root: {
1187 input_pipeline: {
1188 supported_input_devices: "Keyboard",
1189 input_devices: {}
1190 }
1191 });
1192
1193 let _ = InputPipeline::watch_for_devices(
1194 device_watcher,
1195 dir_proxy_for_pipeline,
1196 supported_device_types,
1197 input_event_sender,
1198 bindings.clone(),
1199 &input_devices,
1200 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1202 metrics::MetricsLogger::default(),
1203 )
1204 .await;
1205
1206 let bindings = bindings.lock().await;
1208 assert_eq!(bindings.len(), 0);
1209
1210 diagnostics_assertions::assert_data_tree!(inspector, root: {
1212 input_pipeline: {
1213 supported_input_devices: "Keyboard",
1214 input_devices: {
1215 devices_discovered: 1u64,
1216 devices_connected: 0u64,
1217 "file_name_Unsupported": {
1218 "fuchsia.inspect.Health": {
1219 status: "UNHEALTHY",
1220 message: "Unsupported device type.",
1221 start_timestamp_nanos: AnyProperty
1224 },
1225 }
1226 }
1227 }
1228 });
1229 }
1230
1231 #[fasync::run_singlethreaded(test)]
1234 async fn handle_input_device_registry_request_stream() {
1235 let (input_device_registry_proxy, input_device_registry_request_stream) =
1236 create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1237 let (input_device_client_end, mut input_device_request_stream) =
1238 create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1239
1240 let device_types = vec![input_device::InputDeviceType::Mouse];
1241 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1242 let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1243
1244 let mut count: i8 = 0;
1246 fasync::Task::local(async move {
1247 let _ = input_device_registry_proxy.register(input_device_client_end);
1249
1250 while count < 3 {
1251 if let Some(input_device_request) =
1252 input_device_request_stream.try_next().await.unwrap()
1253 {
1254 handle_input_device_request(input_device_request);
1255 count += 1;
1256 }
1257 }
1258
1259 input_device_registry_proxy.take_event_stream();
1261 })
1262 .detach();
1263
1264 let inspector = fuchsia_inspect::Inspector::default();
1265 let test_node = inspector.root().create_child("input_pipeline");
1266
1267 let bindings_clone = bindings.clone();
1269 let _ = InputPipeline::handle_input_device_registry_request_stream(
1270 input_device_registry_request_stream,
1271 &device_types,
1272 &input_event_sender,
1273 &bindings_clone,
1274 &test_node,
1275 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1276 metrics::MetricsLogger::default(),
1277 )
1278 .await;
1279
1280 let bindings = bindings.lock().await;
1282 assert_eq!(bindings.len(), 1);
1283 }
1284
1285 #[fasync::run_singlethreaded(test)]
1287 async fn check_inspect_node_has_correct_properties() {
1288 let device_types = vec![
1289 input_device::InputDeviceType::Touch,
1290 input_device::InputDeviceType::ConsumerControls,
1291 ];
1292 let inspector = fuchsia_inspect::Inspector::default();
1293 let test_node = inspector.root().create_child("input_pipeline");
1294 let (fake_handler_event_sender, _fake_handler_event_receiver) =
1296 futures::channel::mpsc::channel(100);
1297 let fake_input_handler =
1298 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1299 fake_handler_event_sender,
1300 );
1301 let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1302 .add_handler(fake_input_handler);
1303 let _test_input_pipeline = InputPipeline::new(
1304 &Incoming::new(),
1305 device_types,
1306 assembly,
1307 test_node,
1308 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1309 metrics::MetricsLogger::default(),
1310 );
1311 diagnostics_assertions::assert_data_tree!(inspector, root: {
1312 input_pipeline: {
1313 supported_input_devices: "Touch, ConsumerControls",
1314 handlers_registered: 1u64,
1315 handlers_healthy: 1u64,
1316 input_devices: {}
1317 }
1318 });
1319 }
1320
1321 struct SpecificInterestFakeHandler {
1322 interest_types: Vec<input_device::InputEventType>,
1323 event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1324 }
1325
1326 impl SpecificInterestFakeHandler {
1327 pub fn new(
1328 interest_types: Vec<input_device::InputEventType>,
1329 event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1330 ) -> Rc<Self> {
1331 Rc::new(SpecificInterestFakeHandler {
1332 interest_types,
1333 event_sender: std::cell::RefCell::new(event_sender),
1334 })
1335 }
1336 }
1337
1338 impl Handler for SpecificInterestFakeHandler {
1339 fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1340 fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1341 fn get_name(&self) -> &'static str {
1342 "SpecificInterestFakeHandler"
1343 }
1344
1345 fn interest(&self) -> Vec<input_device::InputEventType> {
1346 self.interest_types.clone()
1347 }
1348 }
1349
1350 #[async_trait(?Send)]
1351 impl input_handler::InputHandler for SpecificInterestFakeHandler {
1352 async fn handle_input_event(
1353 self: Rc<Self>,
1354 input_event: input_device::InputEvent,
1355 ) -> Vec<input_device::InputEvent> {
1356 match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1357 Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1358 Ok(_) => {}
1359 }
1360 vec![input_event]
1361 }
1362 }
1363
1364 #[fasync::run_singlethreaded(test)]
1365 async fn run_only_sends_events_to_interested_handlers() {
1366 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1368 let mouse_handler =
1369 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1370
1371 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1373 let fake_handler =
1374 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1375
1376 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1377 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1378 .add_handler(mouse_handler)
1379 .add_handler(fake_handler)
1380 .into_components();
1381
1382 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1384
1385 let fake_event = input_device::InputEvent {
1387 device_event: input_device::InputDeviceEvent::Fake,
1388 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1389 event_time: zx::MonotonicInstant::get(),
1390 handled: input_device::Handled::No,
1391 trace_id: None,
1392 };
1393
1394 pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1396
1397 let received_by_fake = fake_receiver.next().await;
1399 assert_eq!(received_by_fake, Some(fake_event));
1400
1401 assert!(mouse_receiver.try_next().is_err());
1403 }
1404
1405 fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1406 input_device::InputEvent {
1407 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1408 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1409 millimeters: Position { x, y },
1410 }),
1411 None,
1412 None,
1413 mouse_binding::MousePhase::Move,
1414 SortedVecSet::new(),
1415 SortedVecSet::new(),
1416 None,
1417 None,
1418 )),
1419 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1420 mouse_binding::MouseDeviceDescriptor {
1421 device_id: 1,
1422 absolute_x_range: None,
1423 absolute_y_range: None,
1424 wheel_v_range: None,
1425 wheel_h_range: None,
1426 buttons: None,
1427 counts_per_mm: 1,
1428 },
1429 ),
1430 event_time: zx::MonotonicInstant::get(),
1431 handled: input_device::Handled::No,
1432 trace_id: None,
1433 }
1434 }
1435
1436 #[fasync::run_singlethreaded(test)]
1437 async fn run_mixed_event_types_dispatched_correctly() {
1438 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1440 let mouse_handler =
1441 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1442
1443 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1445 let fake_handler =
1446 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1447
1448 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1449 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1450 .add_handler(mouse_handler)
1451 .add_handler(fake_handler)
1452 .into_components();
1453
1454 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1456
1457 let mouse_event_1 = create_mouse_event(1.0, 1.0);
1459 let mouse_event_2 = create_mouse_event(2.0, 2.0);
1460 let mouse_event_3 = create_mouse_event(3.0, 3.0);
1461
1462 let fake_event_1 = input_device::InputEvent {
1463 device_event: input_device::InputDeviceEvent::Fake,
1464 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1465 event_time: zx::MonotonicInstant::get(),
1466 handled: input_device::Handled::No,
1467 trace_id: None,
1468 };
1469
1470 let mixed_batch = vec![
1473 mouse_event_1.clone(),
1474 mouse_event_2.clone(),
1475 fake_event_1.clone(),
1476 mouse_event_3.clone(),
1477 ];
1478 pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1479
1480 assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1482 assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1483 assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1484
1485 assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1487 }
1488}