1use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{Dispatcher, Incoming, Transport, input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use fidl::endpoints;
12use fidl_fuchsia_io as fio;
13use focus_chain_provider::FocusChainProviderPublisher;
14use fuchsia_async as fasync;
15use fuchsia_component::directory::AsRefDirectory;
16use fuchsia_fs::directory::{WatchEvent, Watcher};
17use fuchsia_inspect::NumericProperty;
18use fuchsia_inspect::health::Reporter;
19use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
20use futures::future::LocalBoxFuture;
21use futures::lock::Mutex;
22use futures::{StreamExt, TryStreamExt};
23use itertools::Itertools;
24use metrics_registry::*;
25use sorted_vec_map::SortedVecMap;
26use std::path::PathBuf;
27use std::rc::Rc;
28use std::sync::atomic::{AtomicU32, Ordering};
29use std::sync::{Arc, LazyLock};
30use strum::EnumCount;
31
32static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
39
40fn get_next_device_id() -> u32 {
44 NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
45}
46
47type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
48
49pub type InputDeviceBindingMap = Arc<Mutex<SortedVecMap<u32, Vec<BoxedInputDeviceBinding>>>>;
52
53pub struct InputPipelineAssembly {
67 sender: UnboundedSender<Vec<input_device::InputEvent>>,
70 receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
73
74 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
76
77 display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
79
80 focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
82
83 metrics_logger: metrics::MetricsLogger,
85}
86
87impl InputPipelineAssembly {
88 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
91 let (sender, receiver) = mpsc::unbounded();
92 InputPipelineAssembly {
93 sender,
94 receiver,
95 handlers: vec![],
96 metrics_logger,
97 display_ownership_fut: None,
98 focus_listener_fut: None,
99 }
100 }
101
102 pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
105 self.handlers.push(handler);
106 self
107 }
108
109 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
111 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
112 }
113
114 pub fn add_display_ownership(
115 mut self,
116 display_ownership_event: zx::Event,
117 input_handlers_node: &fuchsia_inspect::Node,
118 ) -> InputPipelineAssembly {
119 let h = DisplayOwnership::new(
120 display_ownership_event,
121 input_handlers_node,
122 self.metrics_logger.clone(),
123 );
124 let metrics_logger_clone = self.metrics_logger.clone();
125 let h_clone = h.clone();
126 let sender_clone = self.sender.clone();
127 let display_ownership_fut = Box::pin(async move {
128 h_clone.clone().set_handler_healthy();
129 h_clone.clone()
130 .handle_ownership_change(sender_clone)
131 .await
132 .map_err(|e| {
133 metrics_logger_clone.log_error(
134 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
135 std::format!(
136 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
137 })
138 .unwrap();
139 h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
140 });
141 self.display_ownership_fut = Some(display_ownership_fut);
142 self.add_handler(h)
143 }
144
145 fn into_components(
151 self,
152 ) -> (
153 UnboundedSender<Vec<input_device::InputEvent>>,
154 UnboundedReceiver<Vec<input_device::InputEvent>>,
155 Vec<Rc<dyn input_handler::BatchInputHandler>>,
156 metrics::MetricsLogger,
157 Option<LocalBoxFuture<'static, ()>>,
158 Option<LocalBoxFuture<'static, ()>>,
159 ) {
160 (
161 self.sender,
162 self.receiver,
163 self.handlers,
164 self.metrics_logger,
165 self.display_ownership_fut,
166 self.focus_listener_fut,
167 )
168 }
169
170 pub fn add_focus_listener(
171 mut self,
172 incoming: &Incoming,
173 focus_chain_publisher: FocusChainProviderPublisher,
174 ) -> Self {
175 let metrics_logger_clone = self.metrics_logger.clone();
176 let incoming2 = incoming.clone();
177 let focus_listener_fut = Box::pin(async move {
178 if let Ok(mut focus_listener) = FocusListener::new(
179 &incoming2,
180 focus_chain_publisher,
181 metrics_logger_clone,
182 )
183 .map_err(|e| {
184 log::warn!("could not create focus listener, focus will not be dispatched: {:?}", e)
185 }) {
186 let _result = focus_listener
189 .dispatch_focus_changes()
190 .await
191 .map(|_| {
192 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
193 })
194 .map_err(|e| {
195 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
196 });
197 }
198 });
199 self.focus_listener_fut = Some(focus_listener_fut);
200 self
201 }
202}
203
204pub struct InputPipeline {
232 pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
236
237 device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
240
241 device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
243
244 input_device_types: Vec<input_device::InputDeviceType>,
246
247 input_device_bindings: InputDeviceBindingMap,
249
250 inspect_node: fuchsia_inspect::Node,
253
254 metrics_logger: metrics::MetricsLogger,
256
257 pub feature_flags: input_device::InputPipelineFeatureFlags,
259}
260
261impl InputPipeline {
262 fn new_common(
263 input_device_types: Vec<input_device::InputDeviceType>,
264 assembly: InputPipelineAssembly,
265 inspect_node: fuchsia_inspect::Node,
266 feature_flags: input_device::InputPipelineFeatureFlags,
267 ) -> Self {
268 let (
269 pipeline_sender,
270 receiver,
271 handlers,
272 metrics_logger,
273 display_ownership_fut,
274 focus_listener_fut,
275 ) = assembly.into_components();
276
277 let mut handlers_count = handlers.len();
278 if let Some(fut) = display_ownership_fut {
280 Dispatcher::spawn_local(fut).detach();
284 handlers_count += 1;
285 }
286
287 if let Some(fut) = focus_listener_fut {
289 fasync::Task::local(fut).detach();
290 handlers_count += 1;
291 }
292
293 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
295 inspect_node.record_uint("handlers_registered", handlers_count as u64);
296 inspect_node.record_uint("handlers_healthy", handlers_count as u64);
297
298 InputPipeline::run(receiver, handlers, metrics_logger.clone());
300
301 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
302 let input_device_bindings: InputDeviceBindingMap =
303 Arc::new(Mutex::new(SortedVecMap::new()));
304 InputPipeline {
305 pipeline_sender,
306 device_event_sender,
307 device_event_receiver,
308 input_device_types,
309 input_device_bindings,
310 inspect_node,
311 metrics_logger,
312 feature_flags,
313 }
314 }
315
316 pub fn new_for_test(
324 input_device_types: Vec<input_device::InputDeviceType>,
325 assembly: InputPipelineAssembly,
326 ) -> Self {
327 let inspector = fuchsia_inspect::Inspector::default();
328 let root = inspector.root();
329 let test_node = root.create_child("input_pipeline");
330 Self::new_common(
331 input_device_types,
332 assembly,
333 test_node,
334 input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
335 )
336 }
337
338 pub fn new(
345 incoming: &Incoming,
346 input_device_types: Vec<input_device::InputDeviceType>,
347 assembly: InputPipelineAssembly,
348 inspect_node: fuchsia_inspect::Node,
349 feature_flags: input_device::InputPipelineFeatureFlags,
350 metrics_logger: metrics::MetricsLogger,
351 ) -> Result<Self, Error> {
352 let input_pipeline =
353 Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
354 let input_device_types = input_pipeline.input_device_types.clone();
355 let input_event_sender = input_pipeline.device_event_sender.clone();
356 let input_device_bindings = input_pipeline.input_device_bindings.clone();
357 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
358 let feature_flags = input_pipeline.feature_flags.clone();
359 let incoming = incoming.clone();
360 fasync::Task::local(async move {
365 match async {
368 let (dir_proxy, server) = endpoints::create_proxy::<fio::DirectoryMarker>();
369 incoming.as_ref_directory().open(
370 input_device::INPUT_REPORT_PATH,
371 fio::PERM_READABLE,
372 server.into()
373 )
374 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
375 let device_watcher =
376 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
377 Self::watch_for_devices(
378 device_watcher,
379 dir_proxy,
380 input_device_types,
381 input_event_sender,
382 input_device_bindings,
383 &devices_node,
384 false, feature_flags,
386 metrics_logger.clone(),
387 )
388 .await
389 .context("failed to watch for devices")
390 }
391 .await
392 {
393 Ok(()) => {}
394 Err(err) => {
395 metrics_logger.log_warn(
399 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
400 std::format!(
401 "Input pipeline is unable to watch for new input devices: {:?}",
402 err
403 ));
404 }
405 }
406 }).detach();
407
408 Ok(input_pipeline)
409 }
410
411 pub fn input_device_bindings(&self) -> &InputDeviceBindingMap {
413 &self.input_device_bindings
414 }
415
416 pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
419 &self.device_event_sender
420 }
421
422 pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
424 &self.input_device_types
425 }
426
427 pub async fn handle_input_events(mut self) {
429 let metrics_logger_clone = self.metrics_logger.clone();
430 while let Some(input_event) = self.device_event_receiver.next().await {
431 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
432 metrics_logger_clone.log_error(
433 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
434 std::format!("could not forward event from driver: {:?}", e));
435 }
436 }
437
438 metrics_logger_clone.log_error(
439 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
440 "Input pipeline stopped handling input events.".to_string(),
441 );
442 }
443
444 async fn watch_for_devices(
460 mut device_watcher: Watcher,
461 dir_proxy: fio::DirectoryProxy,
462 device_types: Vec<input_device::InputDeviceType>,
463 input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
464 bindings: InputDeviceBindingMap,
465 input_devices_node: &fuchsia_inspect::Node,
466 break_on_idle: bool,
467 feature_flags: input_device::InputPipelineFeatureFlags,
468 metrics_logger: metrics::MetricsLogger,
469 ) -> Result<(), Error> {
470 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
472 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
473 while let Some(msg) = device_watcher.try_next().await? {
474 if let Ok(filename) = msg.filename.into_os_string().into_string() {
475 if filename == "." {
476 continue;
477 }
478
479 let pathbuf = PathBuf::from(filename.clone());
480 match msg.event {
481 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
482 log::info!("found input device {}", filename);
483 devices_discovered.add(1);
484 let device_proxy =
485 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
486 add_device_bindings(
487 &device_types,
488 &filename,
489 device_proxy,
490 &input_event_sender,
491 &bindings,
492 get_next_device_id(),
493 input_devices_node,
494 Some(&devices_connected),
495 feature_flags.clone(),
496 metrics_logger.clone(),
497 )
498 .await;
499 }
500 WatchEvent::IDLE => {
501 if break_on_idle {
502 break;
503 }
504 }
505 _ => (),
506 }
507 }
508 }
509 input_devices_node.record(devices_discovered);
511 input_devices_node.record(devices_connected);
512 Err(format_err!("Input pipeline stopped watching for new input devices."))
513 }
514
515 pub async fn handle_input_device_registry_request_stream(
530 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
531 device_types: &Vec<input_device::InputDeviceType>,
532 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
533 bindings: &InputDeviceBindingMap,
534 input_devices_node: &fuchsia_inspect::Node,
535 feature_flags: input_device::InputPipelineFeatureFlags,
536 metrics_logger: metrics::MetricsLogger,
537 ) -> Result<(), Error> {
538 while let Some(request) = stream
539 .try_next()
540 .await
541 .context("Error handling input device registry request stream")?
542 {
543 match request {
544 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
545 device,
546 ..
547 } => {
548 let device = fidl_next::ClientEnd::<
550 fidl_next_fuchsia_input_report::InputDevice,
551 zx::Channel,
552 >::from_untyped(device.into_channel());
553 let device = Dispatcher::client_from_zx_channel(device);
554 let device = device.spawn();
555 let device_id = get_next_device_id();
556
557 add_device_bindings(
558 device_types,
559 &format!("input-device-registry-{}", device_id),
560 device,
561 input_event_sender,
562 bindings,
563 device_id,
564 input_devices_node,
565 None,
566 feature_flags.clone(),
567 metrics_logger.clone(),
568 )
569 .await;
570 }
571 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
572 device,
573 responder,
574 .. } => {
575 let device = fidl_next::ClientEnd::<
577 fidl_next_fuchsia_input_report::InputDevice,
578 zx::Channel,
579 >::from_untyped(device.into_channel());
580 let device = Dispatcher::client_from_zx_channel(device);
581 let device = device.spawn();
582 let device_id = get_next_device_id();
583
584 add_device_bindings(
585 device_types,
586 &format!("input-device-registry-{}", device_id),
587 device,
588 input_event_sender,
589 bindings,
590 device_id,
591 input_devices_node,
592 None,
593 feature_flags.clone(),
594 metrics_logger.clone(),
595 )
596 .await;
597
598 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
599 device_id: Some(device_id),
600 ..Default::default()
601 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
602 }
603 }
604 }
605
606 Ok(())
607 }
608
609 fn run(
611 mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
612 handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
613 metrics_logger: metrics::MetricsLogger,
614 ) {
615 Dispatcher::spawn_local(async move {
616 for handler in &handlers {
617 handler.clone().set_handler_healthy();
618 }
619
620 use input_device::InputEventType;
621
622 let mut handlers_by_type: [Vec<Rc<dyn input_handler::BatchInputHandler>>; InputEventType::COUNT] = Default::default();
623
624 let event_types = vec![
626 InputEventType::Keyboard,
627 InputEventType::LightSensor,
628 InputEventType::ConsumerControls,
629 InputEventType::Mouse,
630 InputEventType::TouchScreen,
631 InputEventType::Touchpad,
632 #[cfg(test)]
633 InputEventType::Fake,
634 ];
635
636 for event_type in event_types {
637 let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
638 .iter()
639 .filter(|h| h.interest().contains(&event_type))
640 .cloned()
641 .collect();
642 handlers_by_type[event_type as usize] = handlers_for_type;
643 }
644
645 while let Some(events) = receiver.next().await {
646 if events.is_empty() {
647 continue;
648 }
649
650 let mut groups_seen = 0;
651 let events = events
652 .into_iter()
653 .chunk_by(|e| InputEventType::from(&e.device_event));
654 let events = events.into_iter().map(|(k, v)| (k, v.collect::<Vec<_>>()));
655 for (event_type, event_group) in events {
656 groups_seen += 1;
657 if groups_seen == 2 {
658 metrics_logger.log_error(
659 InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
660 "it is not recommended to contain multiple types of events in 1 send".to_string(),
661 );
662 }
663 let mut events_in_group = event_group;
664
665 let handlers = &handlers_by_type[event_type as usize];
667
668 for handler in handlers {
669 events_in_group =
670 handler.clone().handle_input_events(events_in_group).await;
671 }
672
673 for event in events_in_group {
674 if event.handled == input_device::Handled::No {
675 log::warn!("unhandled input event: {:?}", event);
676 }
677 if let Some(trace_id) = event.trace_id {
678 fuchsia_trace::flow_end!(
679 "input",
680 "event_in_input_pipeline",
681 trace_id.into()
682 );
683 }
684 }
685 }
686 }
687 for handler in &handlers {
688 handler.clone().set_handler_unhealthy("Pipeline loop terminated");
689 }
690 panic!("Runner task is not supposed to terminate.")
691 }).detach();
692 }
693}
694
695async fn add_device_bindings(
715 device_types: &Vec<input_device::InputDeviceType>,
716 filename: &String,
717 device_proxy: fidl_next::Client<fidl_next_fuchsia_input_report::InputDevice, Transport>,
718 input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
719 bindings: &InputDeviceBindingMap,
720 device_id: u32,
721 input_devices_node: &fuchsia_inspect::Node,
722 devices_connected: Option<&fuchsia_inspect::UintProperty>,
723 feature_flags: InputPipelineFeatureFlags,
724 metrics_logger: metrics::MetricsLogger,
725) {
726 let mut matched_device_types = vec![];
727 if let Ok(res) = device_proxy.get_descriptor().await {
728 for device_type in device_types {
729 if input_device::is_device_type(&res.descriptor, *device_type).await {
730 matched_device_types.push(device_type);
731 match devices_connected {
732 Some(dev_connected) => {
733 let _ = dev_connected.add(1);
734 }
735 None => (),
736 };
737 }
738 }
739 if matched_device_types.is_empty() {
740 log::info!(
741 "device {} did not match any supported device types: {:?}",
742 filename,
743 device_types
744 );
745 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
746 let mut health = fuchsia_inspect::health::Node::new(&device_node);
747 health.set_unhealthy("Unsupported device type.");
748 device_node.record(health);
749 input_devices_node.record(device_node);
750 return;
751 }
752 } else {
753 metrics_logger.clone().log_error(
754 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
755 std::format!("cannot bind device {} without a device descriptor", filename),
756 );
757 return;
758 }
759
760 log::info!(
761 "binding {} to device types: {}",
762 filename,
763 matched_device_types
764 .iter()
765 .fold(String::new(), |device_types_string, device_type| device_types_string
766 + &format!("{:?}, ", device_type))
767 );
768
769 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
770 for device_type in matched_device_types {
771 let proxy = device_proxy.clone();
792 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
793 match input_device::get_device_binding(
794 *device_type,
795 proxy,
796 device_id,
797 input_event_sender.clone(),
798 device_node,
799 feature_flags.clone(),
800 metrics_logger.clone(),
801 )
802 .await
803 {
804 Ok(binding) => new_bindings.push(binding),
805 Err(e) => {
806 metrics_logger.log_error(
807 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
808 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
809 );
810 }
811 }
812 }
813
814 if !new_bindings.is_empty() {
815 let mut bindings = bindings.lock().await;
816 if let Some(v) = bindings.get_mut(&device_id) {
817 v.extend(new_bindings);
818 } else {
819 bindings.insert(device_id, new_bindings);
820 }
821 }
822}
823
824#[cfg(test)]
825mod tests {
826 use super::*;
827 use crate::input_device::{InputDeviceBinding, InputEventType};
828 use crate::utils::Position;
829 use crate::{fake_input_device_binding, mouse_binding, observe_fake_events_input_handler};
830 use async_trait::async_trait;
831 use diagnostics_assertions::AnyProperty;
832 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
833 use fuchsia_async as fasync;
834 use futures::FutureExt;
835 use pretty_assertions::assert_eq;
836 use rand::Rng;
837 use sorted_vec_map::SortedVecSet;
838 use vfs::{pseudo_directory, service as pseudo_fs_service};
839
840 fn send_input_event(
845 sender: UnboundedSender<Vec<input_device::InputEvent>>,
846 ) -> Vec<input_device::InputEvent> {
847 let mut rng = rand::rng();
848 let offset =
849 Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
850 let input_event = input_device::InputEvent {
851 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
852 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
853 counts: Position { x: offset.x, y: offset.y },
854 }),
855 None, None, mouse_binding::MousePhase::Move,
858 SortedVecSet::new(),
859 SortedVecSet::new(),
860 None, None, )),
863 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
864 mouse_binding::MouseDeviceDescriptor {
865 device_id: 1,
866 absolute_x_range: None,
867 absolute_y_range: None,
868 wheel_v_range: None,
869 wheel_h_range: None,
870 buttons: None,
871 },
872 ),
873 event_time: zx::MonotonicInstant::get(),
874 handled: input_device::Handled::No,
875 trace_id: None,
876 };
877 match sender.unbounded_send(vec![input_event.clone()]) {
878 Err(_) => assert!(false),
879 _ => {}
880 }
881
882 vec![input_event]
883 }
884
885 fn handle_input_device_request(
890 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
891 ) {
892 match input_device_request {
893 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
894 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
895 device_information: None,
896 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
897 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
898 movement_x: None,
899 movement_y: None,
900 scroll_v: None,
901 scroll_h: None,
902 buttons: Some(vec![0]),
903 position_x: None,
904 position_y: None,
905 ..Default::default()
906 }),
907 ..Default::default()
908 }),
909 sensor: None,
910 touch: None,
911 keyboard: None,
912 consumer_control: None,
913 ..Default::default()
914 });
915 }
916 _ => {}
917 }
918 }
919
920 #[fasync::run_singlethreaded(test)]
922 async fn multiple_devices_single_handler() {
923 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
925 let first_device_binding =
926 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
927 let second_device_binding =
928 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
929
930 let (handler_event_sender, mut handler_event_receiver) =
932 futures::channel::mpsc::channel(100);
933 let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
934 handler_event_sender,
935 );
936
937 let (sender, receiver, handlers, _, _, _) =
939 InputPipelineAssembly::new(metrics::MetricsLogger::default())
940 .add_handler(input_handler)
941 .into_components();
942 let inspector = fuchsia_inspect::Inspector::default();
943 let test_node = inspector.root().create_child("input_pipeline");
944 let input_pipeline = InputPipeline {
945 pipeline_sender: sender,
946 device_event_sender,
947 device_event_receiver,
948 input_device_types: vec![],
949 input_device_bindings: Arc::new(Mutex::new(SortedVecMap::new())),
950 inspect_node: test_node,
951 metrics_logger: metrics::MetricsLogger::default(),
952 feature_flags: input_device::InputPipelineFeatureFlags::default(),
953 };
954 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
955
956 let first_device_events = send_input_event(first_device_binding.input_event_sender());
958 let second_device_events = send_input_event(second_device_binding.input_event_sender());
959
960 fasync::Task::local(async {
962 input_pipeline.handle_input_events().await;
963 })
964 .detach();
965
966 let first_handled_event = handler_event_receiver.next().await;
968 assert_eq!(first_handled_event, first_device_events.into_iter().next());
969
970 let second_handled_event = handler_event_receiver.next().await;
971 assert_eq!(second_handled_event, second_device_events.into_iter().next());
972 }
973
974 #[fasync::run_singlethreaded(test)]
976 async fn single_device_multiple_handlers() {
977 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
979 let input_device_binding =
980 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
981
982 let (first_handler_event_sender, mut first_handler_event_receiver) =
984 futures::channel::mpsc::channel(100);
985 let first_input_handler =
986 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
987 first_handler_event_sender,
988 );
989 let (second_handler_event_sender, mut second_handler_event_receiver) =
990 futures::channel::mpsc::channel(100);
991 let second_input_handler =
992 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
993 second_handler_event_sender,
994 );
995
996 let (sender, receiver, handlers, _, _, _) =
998 InputPipelineAssembly::new(metrics::MetricsLogger::default())
999 .add_handler(first_input_handler)
1000 .add_handler(second_input_handler)
1001 .into_components();
1002 let inspector = fuchsia_inspect::Inspector::default();
1003 let test_node = inspector.root().create_child("input_pipeline");
1004 let input_pipeline = InputPipeline {
1005 pipeline_sender: sender,
1006 device_event_sender,
1007 device_event_receiver,
1008 input_device_types: vec![],
1009 input_device_bindings: Arc::new(Mutex::new(SortedVecMap::new())),
1010 inspect_node: test_node,
1011 metrics_logger: metrics::MetricsLogger::default(),
1012 feature_flags: input_device::InputPipelineFeatureFlags::default(),
1013 };
1014 InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1015
1016 let input_events = send_input_event(input_device_binding.input_event_sender());
1018
1019 fasync::Task::local(async {
1021 input_pipeline.handle_input_events().await;
1022 })
1023 .detach();
1024
1025 let expected_event = input_events.into_iter().next();
1027 let first_handler_event = first_handler_event_receiver.next().await;
1028 assert_eq!(first_handler_event, expected_event);
1029 let second_handler_event = second_handler_event_receiver.next().await;
1030 assert_eq!(second_handler_event, expected_event);
1031 }
1032
1033 #[fasync::run_singlethreaded(test)]
1036 async fn watch_devices_one_match_exists() {
1037 let mut count: i8 = 0;
1039 let dir = pseudo_directory! {
1040 "file_name" => pseudo_fs_service::host(
1041 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1042 async move {
1043 while count < 3 {
1044 if let Some(input_device_request) =
1045 request_stream.try_next().await.unwrap()
1046 {
1047 handle_input_device_request(input_device_request);
1048 count += 1;
1049 }
1050 }
1051
1052 }.boxed()
1053 },
1054 )
1055 };
1056
1057 let dir_proxy_for_watcher = vfs::directory::serve_read_only(
1059 dir.clone(),
1060 vfs::execution_scope::ExecutionScope::new(),
1061 );
1062 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1063 let dir_proxy_for_pipeline =
1066 vfs::directory::serve_read_only(dir, vfs::execution_scope::ExecutionScope::new());
1067
1068 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1069 let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1070 let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1071
1072 let inspector = fuchsia_inspect::Inspector::default();
1073 let test_node = inspector.root().create_child("input_pipeline");
1074 test_node.record_string(
1075 "supported_input_devices",
1076 supported_device_types.clone().iter().join(", "),
1077 );
1078 let input_devices = test_node.create_child("input_devices");
1079 diagnostics_assertions::assert_data_tree!(inspector, root: {
1081 input_pipeline: {
1082 supported_input_devices: "Mouse",
1083 input_devices: {}
1084 }
1085 });
1086
1087 let _ = InputPipeline::watch_for_devices(
1088 device_watcher,
1089 dir_proxy_for_pipeline,
1090 supported_device_types,
1091 input_event_sender,
1092 bindings.clone(),
1093 &input_devices,
1094 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1096 metrics::MetricsLogger::default(),
1097 )
1098 .await;
1099
1100 let bindings_map = bindings.lock().await;
1102 assert_eq!(bindings_map.len(), 1);
1103 let bindings_vector = bindings_map.get(&10);
1104 assert!(bindings_vector.is_some());
1105 assert_eq!(bindings_vector.unwrap().len(), 1);
1106 let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1107 assert!(boxed_mouse_binding.is_some());
1108 assert_eq!(
1109 boxed_mouse_binding.unwrap().get_device_descriptor(),
1110 input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1111 device_id: 10,
1112 absolute_x_range: None,
1113 absolute_y_range: None,
1114 wheel_v_range: None,
1115 wheel_h_range: None,
1116 buttons: Some(vec![0]),
1117 })
1118 );
1119
1120 diagnostics_assertions::assert_data_tree!(inspector, root: {
1122 input_pipeline: {
1123 supported_input_devices: "Mouse",
1124 input_devices: {
1125 devices_discovered: 1u64,
1126 devices_connected: 1u64,
1127 "file_name_Mouse": contains {
1128 reports_received_count: 0u64,
1129 reports_filtered_count: 0u64,
1130 events_generated: 0u64,
1131 last_received_timestamp_ns: 0u64,
1132 last_generated_timestamp_ns: 0u64,
1133 "fuchsia.inspect.Health": {
1134 status: "OK",
1135 start_timestamp_nanos: AnyProperty
1138 },
1139 }
1140 }
1141 }
1142 });
1143 }
1144
1145 #[fasync::run_singlethreaded(test)]
1148 async fn watch_devices_no_matches_exist() {
1149 let mut count: i8 = 0;
1151 let dir = pseudo_directory! {
1152 "file_name" => pseudo_fs_service::host(
1153 move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1154 async move {
1155 while count < 1 {
1156 if let Some(input_device_request) =
1157 request_stream.try_next().await.unwrap()
1158 {
1159 handle_input_device_request(input_device_request);
1160 count += 1;
1161 }
1162 }
1163
1164 }.boxed()
1165 },
1166 )
1167 };
1168
1169 let dir_proxy_for_watcher = vfs::directory::serve_read_only(
1171 dir.clone(),
1172 vfs::execution_scope::ExecutionScope::new(),
1173 );
1174 let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1175 let dir_proxy_for_pipeline =
1178 vfs::directory::serve_read_only(dir, vfs::execution_scope::ExecutionScope::new());
1179
1180 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1181 let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1182 let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1183
1184 let inspector = fuchsia_inspect::Inspector::default();
1185 let test_node = inspector.root().create_child("input_pipeline");
1186 test_node.record_string(
1187 "supported_input_devices",
1188 supported_device_types.clone().iter().join(", "),
1189 );
1190 let input_devices = test_node.create_child("input_devices");
1191 diagnostics_assertions::assert_data_tree!(inspector, root: {
1193 input_pipeline: {
1194 supported_input_devices: "Keyboard",
1195 input_devices: {}
1196 }
1197 });
1198
1199 let _ = InputPipeline::watch_for_devices(
1200 device_watcher,
1201 dir_proxy_for_pipeline,
1202 supported_device_types,
1203 input_event_sender,
1204 bindings.clone(),
1205 &input_devices,
1206 true, InputPipelineFeatureFlags { enable_merge_touch_events: false },
1208 metrics::MetricsLogger::default(),
1209 )
1210 .await;
1211
1212 let bindings = bindings.lock().await;
1214 assert_eq!(bindings.len(), 0);
1215
1216 diagnostics_assertions::assert_data_tree!(inspector, root: {
1218 input_pipeline: {
1219 supported_input_devices: "Keyboard",
1220 input_devices: {
1221 devices_discovered: 1u64,
1222 devices_connected: 0u64,
1223 "file_name_Unsupported": {
1224 "fuchsia.inspect.Health": {
1225 status: "UNHEALTHY",
1226 message: "Unsupported device type.",
1227 start_timestamp_nanos: AnyProperty
1230 },
1231 }
1232 }
1233 }
1234 });
1235 }
1236
1237 #[fasync::run_singlethreaded(test)]
1240 async fn handle_input_device_registry_request_stream() {
1241 let (input_device_registry_proxy, input_device_registry_request_stream) =
1242 create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1243 let (input_device_client_end, mut input_device_request_stream) =
1244 create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1245
1246 let device_types = vec![input_device::InputDeviceType::Mouse];
1247 let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1248 let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1249
1250 let mut count: i8 = 0;
1252 fasync::Task::local(async move {
1253 let _ = input_device_registry_proxy.register(input_device_client_end);
1255
1256 while count < 3 {
1257 if let Some(input_device_request) =
1258 input_device_request_stream.try_next().await.unwrap()
1259 {
1260 handle_input_device_request(input_device_request);
1261 count += 1;
1262 }
1263 }
1264
1265 input_device_registry_proxy.take_event_stream();
1267 })
1268 .detach();
1269
1270 let inspector = fuchsia_inspect::Inspector::default();
1271 let test_node = inspector.root().create_child("input_pipeline");
1272
1273 let bindings_clone = bindings.clone();
1275 let _ = InputPipeline::handle_input_device_registry_request_stream(
1276 input_device_registry_request_stream,
1277 &device_types,
1278 &input_event_sender,
1279 &bindings_clone,
1280 &test_node,
1281 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1282 metrics::MetricsLogger::default(),
1283 )
1284 .await;
1285
1286 let bindings = bindings.lock().await;
1288 assert_eq!(bindings.len(), 1);
1289 }
1290
1291 #[fasync::run_singlethreaded(test)]
1293 async fn check_inspect_node_has_correct_properties() {
1294 let device_types = vec![
1295 input_device::InputDeviceType::Touch,
1296 input_device::InputDeviceType::ConsumerControls,
1297 ];
1298 let inspector = fuchsia_inspect::Inspector::default();
1299 let test_node = inspector.root().create_child("input_pipeline");
1300 let (fake_handler_event_sender, _fake_handler_event_receiver) =
1302 futures::channel::mpsc::channel(100);
1303 let fake_input_handler =
1304 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1305 fake_handler_event_sender,
1306 );
1307 let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1308 .add_handler(fake_input_handler);
1309 let _test_input_pipeline = InputPipeline::new(
1310 &Incoming::new(),
1311 device_types,
1312 assembly,
1313 test_node,
1314 InputPipelineFeatureFlags { enable_merge_touch_events: false },
1315 metrics::MetricsLogger::default(),
1316 );
1317 diagnostics_assertions::assert_data_tree!(inspector, root: {
1318 input_pipeline: {
1319 supported_input_devices: "Touch, ConsumerControls",
1320 handlers_registered: 1u64,
1321 handlers_healthy: 1u64,
1322 input_devices: {}
1323 }
1324 });
1325 }
1326
1327 struct SpecificInterestFakeHandler {
1328 interest_types: Vec<input_device::InputEventType>,
1329 event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1330 }
1331
1332 impl SpecificInterestFakeHandler {
1333 pub fn new(
1334 interest_types: Vec<input_device::InputEventType>,
1335 event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1336 ) -> Rc<Self> {
1337 Rc::new(SpecificInterestFakeHandler {
1338 interest_types,
1339 event_sender: std::cell::RefCell::new(event_sender),
1340 })
1341 }
1342 }
1343
1344 impl Handler for SpecificInterestFakeHandler {
1345 fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1346 fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1347 fn get_name(&self) -> &'static str {
1348 "SpecificInterestFakeHandler"
1349 }
1350
1351 fn interest(&self) -> Vec<input_device::InputEventType> {
1352 self.interest_types.clone()
1353 }
1354 }
1355
1356 #[async_trait(?Send)]
1357 impl input_handler::InputHandler for SpecificInterestFakeHandler {
1358 async fn handle_input_event(
1359 self: Rc<Self>,
1360 input_event: input_device::InputEvent,
1361 ) -> Vec<input_device::InputEvent> {
1362 match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1363 Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1364 Ok(_) => {}
1365 }
1366 vec![input_event]
1367 }
1368 }
1369
1370 #[fasync::run_singlethreaded(test)]
1371 async fn run_only_sends_events_to_interested_handlers() {
1372 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1374 let mouse_handler =
1375 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1376
1377 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1379 let fake_handler =
1380 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1381
1382 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1383 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1384 .add_handler(mouse_handler)
1385 .add_handler(fake_handler)
1386 .into_components();
1387
1388 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1390
1391 let fake_event = input_device::InputEvent {
1393 device_event: input_device::InputDeviceEvent::Fake,
1394 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1395 event_time: zx::MonotonicInstant::get(),
1396 handled: input_device::Handled::No,
1397 trace_id: None,
1398 };
1399
1400 pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1402
1403 let received_by_fake = fake_receiver.next().await;
1405 assert_eq!(received_by_fake, Some(fake_event));
1406
1407 assert!(mouse_receiver.try_next().is_err());
1409 }
1410
1411 fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1412 input_device::InputEvent {
1413 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1414 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1415 counts: Position { x, y },
1416 }),
1417 None,
1418 None,
1419 mouse_binding::MousePhase::Move,
1420 SortedVecSet::new(),
1421 SortedVecSet::new(),
1422 None,
1423 None,
1424 )),
1425 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1426 mouse_binding::MouseDeviceDescriptor {
1427 device_id: 1,
1428 absolute_x_range: None,
1429 absolute_y_range: None,
1430 wheel_v_range: None,
1431 wheel_h_range: None,
1432 buttons: None,
1433 },
1434 ),
1435 event_time: zx::MonotonicInstant::get(),
1436 handled: input_device::Handled::No,
1437 trace_id: None,
1438 }
1439 }
1440
1441 #[fasync::run_singlethreaded(test)]
1442 async fn run_mixed_event_types_dispatched_correctly() {
1443 let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1445 let mouse_handler =
1446 SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1447
1448 let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1450 let fake_handler =
1451 SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1452
1453 let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1454 InputPipelineAssembly::new(metrics::MetricsLogger::default())
1455 .add_handler(mouse_handler)
1456 .add_handler(fake_handler)
1457 .into_components();
1458
1459 InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1461
1462 let mouse_event_1 = create_mouse_event(1.0, 1.0);
1464 let mouse_event_2 = create_mouse_event(2.0, 2.0);
1465 let mouse_event_3 = create_mouse_event(3.0, 3.0);
1466
1467 let fake_event_1 = input_device::InputEvent {
1468 device_event: input_device::InputDeviceEvent::Fake,
1469 device_descriptor: input_device::InputDeviceDescriptor::Fake,
1470 event_time: zx::MonotonicInstant::get(),
1471 handled: input_device::Handled::No,
1472 trace_id: None,
1473 };
1474
1475 let mixed_batch = vec![
1478 mouse_event_1.clone(),
1479 mouse_event_2.clone(),
1480 fake_event_1.clone(),
1481 mouse_event_3.clone(),
1482 ];
1483 pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1484
1485 assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1487 assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1488 assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1489
1490 assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1492 }
1493}