1use crate::autorepeater::Autorepeater;
6use crate::display_ownership::DisplayOwnership;
7use crate::focus_listener::FocusListener;
8use crate::{input_device, input_handler, metrics};
9use anyhow::{format_err, Context, Error};
10use focus_chain_provider::FocusChainProviderPublisher;
11use fuchsia_fs::directory::{WatchEvent, Watcher};
12use fuchsia_inspect::health::Reporter;
13use fuchsia_inspect::NumericProperty;
14use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
15use futures::lock::Mutex;
16use futures::{StreamExt, TryStreamExt};
17use itertools::Itertools;
18use metrics_registry::*;
19use std::collections::HashMap;
20use std::path::PathBuf;
21use std::rc::Rc;
22use std::sync::atomic::{AtomicU32, Ordering};
23use std::sync::{Arc, LazyLock};
24use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
25
/// Process-wide counter backing [`get_next_device_id`]; the first ID handed out is 10.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));

/// Returns the current value of the device ID counter and advances it by one.
///
/// The read and the increment happen as one sequentially consistent atomic
/// operation.
fn get_next_device_id() -> u32 {
    let counter: &AtomicU32 = &NEXT_DEVICE_ID;
    counter.fetch_add(1, Ordering::SeqCst)
}
40
/// A heap-allocated, dynamically dispatched input device binding.
type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;

/// Shared map, guarded by an async mutex, from a `u32` key to the bindings
/// stored under it. Within this file the key is the device ID that the
/// bindings were created with.
pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
46
/// Builder state for an input pipeline: a chain of stages connected by
/// unbounded channels, together with the tasks that drive those stages.
pub struct InputPipelineAssembly {
    /// Sending end of the channel that feeds the first stage of the assembly.
    sender: UnboundedSender<input_device::InputEvent>,
    /// Receiving end of the channel that carries the output of the most
    /// recently added stage.
    receiver: UnboundedReceiver<input_device::InputEvent>,
    /// Tasks pushed by the `add_*` methods of this type.
    tasks: Vec<fuchsia_async::Task<()>>,

    /// Logger that is cloned into stage tasks for error reporting.
    metrics_logger: metrics::MetricsLogger,
}
76
77impl InputPipelineAssembly {
78 pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
81 let (sender, receiver) = mpsc::unbounded();
82 let tasks = vec![];
83 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
84 }
85
86 pub fn add_handler(self, handler: Rc<dyn input_handler::InputHandler>) -> Self {
90 let (sender, mut receiver, mut tasks, metrics_logger) = self.into_components();
91 let metrics_logger_clone = metrics_logger.clone();
92 let (next_sender, next_receiver) = mpsc::unbounded();
93 let handler_name = handler.get_name();
94 tasks.push(fasync::Task::local(async move {
95 handler.clone().set_handler_healthy();
96 while let Some(event) = receiver.next().await {
97 let out_events = {
101 let _async_trace = fuchsia_trace::async_enter!(
102 fuchsia_trace::Id::new(),
103 c"input",
104 c"handle_input_event",
105 "name" => handler_name
106 );
107 handler.clone().handle_input_event(event).await
108 };
109 for out_event in out_events.into_iter() {
110 if let Err(e) = next_sender.unbounded_send(out_event) {
111 metrics_logger_clone.log_error(
112 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEvent,
113 std::format!(
114 "could not forward event output from handler: {:?}: {:?}",
115 handler_name,
116 e));
117 break;
119 }
120 }
121 }
122 handler.clone().set_handler_unhealthy(std::format!("Receive loop terminated for handler: {:?}", handler_name).as_str());
123 panic!("receive loop is not supposed to terminate for handler: {:?}", handler_name);
124 }));
125 receiver = next_receiver;
126 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
127 }
128
129 pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::InputHandler>>) -> Self {
131 handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
132 }
133
134 pub fn add_display_ownership(
140 self,
141 display_ownership_event: zx::Event,
142 input_handlers_node: &fuchsia_inspect::Node,
143 ) -> InputPipelineAssembly {
144 let (sender, autorepeat_receiver, mut tasks, metrics_logger) = self.into_components();
145 let (autorepeat_sender, receiver) = mpsc::unbounded();
146 let h = DisplayOwnership::new(display_ownership_event, input_handlers_node);
147 let metrics_logger_clone = metrics_logger.clone();
148 tasks.push(fasync::Task::local(async move {
149 h.clone().set_handler_healthy();
150 h.clone().handle_input_events(autorepeat_receiver, autorepeat_sender)
151 .await
152 .map_err(|e| {
153 metrics_logger_clone.log_error(
154 InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
155 std::format!(
156 "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
157 }).unwrap();
158 h.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
159 }));
160 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
161 }
162
163 pub fn add_autorepeater(self, input_handlers_node: &fuchsia_inspect::Node) -> Self {
167 let (sender, autorepeat_receiver, mut tasks, metrics_logger) = self.into_components();
168 let (autorepeat_sender, receiver) = mpsc::unbounded();
169 let metrics_logger_clone = metrics_logger.clone();
170 let a = Autorepeater::new(autorepeat_receiver, input_handlers_node, metrics_logger.clone());
171 tasks.push(fasync::Task::local(async move {
172 a.clone().set_handler_healthy();
173 a.clone()
174 .run(autorepeat_sender)
175 .await
176 .map_err(|e| {
177 metrics_logger_clone.log_error(
178 InputPipelineErrorMetricDimensionEvent::InputPipelineAutorepeatRunningError,
179 std::format!("error while running autorepeater: {:?}", e),
180 );
181 })
182 .expect("autorepeater should never error out");
183 a.set_handler_unhealthy("Receive loop terminated for handler: Autorepeater");
184 }));
185 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
186 }
187
188 fn into_components(
194 self,
195 ) -> (
196 UnboundedSender<input_device::InputEvent>,
197 UnboundedReceiver<input_device::InputEvent>,
198 Vec<fuchsia_async::Task<()>>,
199 metrics::MetricsLogger,
200 ) {
201 (self.sender, self.receiver, self.tasks, self.metrics_logger)
202 }
203
204 pub fn add_focus_listener(self, focus_chain_publisher: FocusChainProviderPublisher) -> Self {
217 let (sender, receiver, mut tasks, metrics_logger) = self.into_components();
218 let metrics_logger_clone = metrics_logger.clone();
219 tasks.push(fasync::Task::local(async move {
220 if let Ok(mut focus_listener) =
221 FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
222 log::warn!(
223 "could not create focus listener, focus will not be dispatched: {:?}",
224 e
225 )
226 })
227 {
228 let _result = focus_listener
231 .dispatch_focus_changes()
232 .await
233 .map(|_| {
234 log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
235 })
236 .map_err(|e| {
237 panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
238 });
239 }
240 }));
241 InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
242 }
243}
244
/// A running input pipeline: receives events from device bindings and feeds
/// them into the handler chain built by an `InputPipelineAssembly`.
pub struct InputPipeline {
    /// Sender into the first stage of the assembled handler chain.
    pipeline_sender: UnboundedSender<input_device::InputEvent>,

    /// Sender handed out to device bindings; its events arrive on
    /// `device_event_receiver`.
    device_event_sender: UnboundedSender<input_device::InputEvent>,

    /// Receiver drained by `handle_input_events`, which forwards each event
    /// to `pipeline_sender`.
    device_event_receiver: UnboundedReceiver<input_device::InputEvent>,

    /// Device types that are matched against device descriptors when
    /// bindings are created.
    input_device_types: Vec<input_device::InputDeviceType>,

    /// Bindings created so far, keyed by device ID.
    input_device_bindings: InputDeviceBindingHashMap,

    /// Inspect node that the pipeline records its properties on.
    inspect_node: fuchsia_inspect::Node,

    /// Logger used to report pipeline errors.
    metrics_logger: metrics::MetricsLogger,
}
298
299impl InputPipeline {
300 fn new_common(
303 input_device_types: Vec<input_device::InputDeviceType>,
304 assembly: InputPipelineAssembly,
305 inspect_node: fuchsia_inspect::Node,
306 ) -> Self {
307 let (pipeline_sender, receiver, tasks, metrics_logger) = assembly.into_components();
308
309 inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
311 inspect_node.record_uint("handlers_registered", tasks.len() as u64);
312 inspect_node.record_uint("handlers_healthy", tasks.len() as u64);
313
314 InputPipeline::catch_unhandled(receiver);
317
318 InputPipeline::run(tasks);
320
321 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
322 let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
323 InputPipeline {
324 pipeline_sender,
325 device_event_sender,
326 device_event_receiver,
327 input_device_types,
328 input_device_bindings,
329 inspect_node,
330 metrics_logger,
331 }
332 }
333
334 pub fn new_for_test(
342 input_device_types: Vec<input_device::InputDeviceType>,
343 assembly: InputPipelineAssembly,
344 ) -> Self {
345 let inspector = fuchsia_inspect::Inspector::default();
346 let root = inspector.root();
347 let test_node = root.create_child("input_pipeline");
348 Self::new_common(input_device_types, assembly, test_node)
349 }
350
351 pub fn new(
358 input_device_types: Vec<input_device::InputDeviceType>,
359 assembly: InputPipelineAssembly,
360 inspect_node: fuchsia_inspect::Node,
361 metrics_logger: metrics::MetricsLogger,
362 ) -> Result<Self, Error> {
363 let input_pipeline = Self::new_common(input_device_types, assembly, inspect_node);
364 let input_device_types = input_pipeline.input_device_types.clone();
365 let input_event_sender = input_pipeline.device_event_sender.clone();
366 let input_device_bindings = input_pipeline.input_device_bindings.clone();
367 let devices_node = input_pipeline.inspect_node.create_child("input_devices");
368 fasync::Task::local(async move {
369 match async {
372 let dir_proxy = fuchsia_fs::directory::open_in_namespace(
373 input_device::INPUT_REPORT_PATH,
374 fuchsia_fs::PERM_READABLE,
375 )
376 .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
377 let device_watcher =
378 Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
379 Self::watch_for_devices(
380 device_watcher,
381 dir_proxy,
382 input_device_types,
383 input_event_sender,
384 input_device_bindings,
385 &devices_node,
386 false, metrics_logger.clone(),
388 )
389 .await
390 .context("failed to watch for devices")
391 }
392 .await
393 {
394 Ok(()) => {}
395 Err(err) => {
396 metrics_logger.log_warn(
400 InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
401 std::format!(
402 "Input pipeline is unable to watch for new input devices: {:?}",
403 err
404 ));
405 }
406 }
407 })
408 .detach();
409
410 Ok(input_pipeline)
411 }
412
/// Returns a reference to the shared map of device bindings held by this pipeline.
pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
    &self.input_device_bindings
}
417
/// Returns the sender whose events are received by `handle_input_events`
/// (the `device_event_sender` field).
pub fn input_event_sender(&self) -> &UnboundedSender<input_device::InputEvent> {
    &self.device_event_sender
}
423
/// Returns the device types this pipeline was constructed with.
pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
    &self.input_device_types
}
428
429 pub async fn handle_input_events(mut self) {
431 let metrics_logger_clone = self.metrics_logger.clone();
432 while let Some(input_event) = self.device_event_receiver.next().await {
433 if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
434 metrics_logger_clone.log_error(
435 InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
436 std::format!("could not forward event from driver: {:?}", &e));
437 }
438 }
439
440 metrics_logger_clone.log_error(
441 InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
442 "Input pipeline stopped handling input events.".to_string(),
443 );
444 }
445
446 async fn watch_for_devices(
462 mut device_watcher: Watcher,
463 dir_proxy: fio::DirectoryProxy,
464 device_types: Vec<input_device::InputDeviceType>,
465 input_event_sender: UnboundedSender<input_device::InputEvent>,
466 bindings: InputDeviceBindingHashMap,
467 input_devices_node: &fuchsia_inspect::Node,
468 break_on_idle: bool,
469 metrics_logger: metrics::MetricsLogger,
470 ) -> Result<(), Error> {
471 let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
473 let devices_connected = input_devices_node.create_uint("devices_connected", 0);
474 while let Some(msg) = device_watcher.try_next().await? {
475 if let Ok(filename) = msg.filename.into_os_string().into_string() {
476 if filename == "." {
477 continue;
478 }
479
480 let pathbuf = PathBuf::from(filename.clone());
481 match msg.event {
482 WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
483 log::info!("found input device {}", filename);
484 devices_discovered.add(1);
485 let device_proxy =
486 input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
487 add_device_bindings(
488 &device_types,
489 &filename,
490 device_proxy,
491 &input_event_sender,
492 &bindings,
493 get_next_device_id(),
494 input_devices_node,
495 Some(&devices_connected),
496 metrics_logger.clone(),
497 )
498 .await;
499 }
500 WatchEvent::IDLE => {
501 if break_on_idle {
502 break;
503 }
504 }
505 _ => (),
506 }
507 }
508 }
509 input_devices_node.record(devices_discovered);
511 input_devices_node.record(devices_connected);
512 Err(format_err!("Input pipeline stopped watching for new input devices."))
513 }
514
515 pub async fn handle_input_device_registry_request_stream(
530 mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
531 device_types: &Vec<input_device::InputDeviceType>,
532 input_event_sender: &UnboundedSender<input_device::InputEvent>,
533 bindings: &InputDeviceBindingHashMap,
534 input_devices_node: &fuchsia_inspect::Node,
535 metrics_logger: metrics::MetricsLogger,
536 ) -> Result<(), Error> {
537 while let Some(request) = stream
538 .try_next()
539 .await
540 .context("Error handling input device registry request stream")?
541 {
542 match request {
543 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
544 device,
545 ..
546 } => {
547 let device_proxy = device.into_proxy();
549
550 let device_id = get_next_device_id();
551
552 add_device_bindings(
553 device_types,
554 &format!("input-device-registry-{}", device_id),
555 device_proxy,
556 input_event_sender,
557 bindings,
558 device_id,
559 input_devices_node,
560 None,
561 metrics_logger.clone(),
562 )
563 .await;
564 }
565 fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
566 device,
567 responder,
568 .. } => {
569 let device_proxy = device.into_proxy();
571
572 let device_id = get_next_device_id();
573
574 add_device_bindings(
575 device_types,
576 &format!("input-device-registry-{}", device_id),
577 device_proxy,
578 input_event_sender,
579 bindings,
580 device_id,
581 input_devices_node,
582 None,
583 metrics_logger.clone(),
584 )
585 .await;
586
587 responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
588 device_id: Some(device_id),
589 ..Default::default()
590 }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
591 }
592 }
593 }
594
595 Ok(())
596 }
597
598 fn run(tasks: Vec<fuchsia_async::Task<()>>) {
600 fasync::Task::local(async move {
601 futures::future::join_all(tasks).await;
602 panic!("Runner task is not supposed to terminate.")
603 })
604 .detach();
605 }
606
607 fn catch_unhandled(mut receiver: UnboundedReceiver<input_device::InputEvent>) {
610 fasync::Task::local(async move {
611 while let Some(event) = receiver.next().await {
612 if event.handled == input_device::Handled::No {
613 log::warn!("unhandled input event: {:?}", &event);
614 }
615 }
616 panic!("unhandled event catcher is not supposed to terminate.");
617 })
618 .detach();
619 }
620}
621
622async fn add_device_bindings(
642 device_types: &Vec<input_device::InputDeviceType>,
643 filename: &String,
644 device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
645 input_event_sender: &UnboundedSender<input_device::InputEvent>,
646 bindings: &InputDeviceBindingHashMap,
647 device_id: u32,
648 input_devices_node: &fuchsia_inspect::Node,
649 devices_connected: Option<&fuchsia_inspect::UintProperty>,
650 metrics_logger: metrics::MetricsLogger,
651) {
652 let mut matched_device_types = vec![];
653 if let Ok(descriptor) = device_proxy.get_descriptor().await {
654 for device_type in device_types {
655 if input_device::is_device_type(&descriptor, *device_type).await {
656 matched_device_types.push(device_type);
657 match devices_connected {
658 Some(dev_connected) => {
659 let _ = dev_connected.add(1);
660 }
661 None => (),
662 };
663 }
664 }
665 if matched_device_types.is_empty() {
666 log::info!(
667 "device {} did not match any supported device types: {:?}",
668 filename,
669 device_types
670 );
671 let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
672 let mut health = fuchsia_inspect::health::Node::new(&device_node);
673 health.set_unhealthy("Unsupported device type.");
674 device_node.record(health);
675 input_devices_node.record(device_node);
676 return;
677 }
678 } else {
679 metrics_logger.clone().log_error(
680 InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
681 std::format!("cannot bind device {} without a device descriptor", filename),
682 );
683 return;
684 }
685
686 log::info!(
687 "binding {} to device types: {}",
688 filename,
689 matched_device_types
690 .iter()
691 .fold(String::new(), |device_types_string, device_type| device_types_string
692 + &format!("{:?}, ", device_type))
693 );
694
695 let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
696 for device_type in matched_device_types {
697 let proxy = device_proxy.clone();
718 let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
719 match input_device::get_device_binding(
720 *device_type,
721 proxy,
722 device_id,
723 input_event_sender.clone(),
724 device_node,
725 metrics_logger.clone(),
726 )
727 .await
728 {
729 Ok(binding) => new_bindings.push(binding),
730 Err(e) => {
731 metrics_logger.log_error(
732 InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
733 std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
734 );
735 }
736 }
737 }
738
739 if !new_bindings.is_empty() {
740 let mut bindings = bindings.lock().await;
741 bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
742 }
743}
744
745#[cfg(test)]
746mod tests {
747 use super::*;
748 use crate::input_device::InputDeviceBinding;
749 use crate::utils::Position;
750 use crate::{
751 fake_input_device_binding, mouse_binding, mouse_model_database,
752 observe_fake_events_input_handler,
753 };
754 use diagnostics_assertions::AnyProperty;
755 use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
756 use fuchsia_async as fasync;
757 use futures::FutureExt;
758 use pretty_assertions::assert_eq;
759 use rand::Rng;
760 use std::collections::HashSet;
761 use vfs::{pseudo_directory, service as pseudo_fs_service};
762
/// Counts-per-millimeter value used for the fake mouse descriptor, and to
/// scale the random offset, in `send_input_event`.
const COUNTS_PER_MM: u32 = 12;
764
765 fn send_input_event(
770 sender: UnboundedSender<input_device::InputEvent>,
771 ) -> input_device::InputEvent {
772 let mut rng = rand::thread_rng();
773 let offset = Position { x: rng.gen_range(0..10) as f32, y: rng.gen_range(0..10) as f32 };
774 let input_event = input_device::InputEvent {
775 device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
776 mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
777 millimeters: Position {
778 x: offset.x / COUNTS_PER_MM as f32,
779 y: offset.y / COUNTS_PER_MM as f32,
780 },
781 }),
782 None, None, mouse_binding::MousePhase::Move,
785 HashSet::new(),
786 HashSet::new(),
787 None, )),
789 device_descriptor: input_device::InputDeviceDescriptor::Mouse(
790 mouse_binding::MouseDeviceDescriptor {
791 device_id: 1,
792 absolute_x_range: None,
793 absolute_y_range: None,
794 wheel_v_range: None,
795 wheel_h_range: None,
796 buttons: None,
797 counts_per_mm: COUNTS_PER_MM,
798 },
799 ),
800 event_time: zx::MonotonicInstant::get(),
801 handled: input_device::Handled::No,
802 trace_id: None,
803 };
804 match sender.unbounded_send(input_event.clone()) {
805 Err(_) => assert!(false),
806 _ => {}
807 }
808
809 input_event
810 }
811
812 fn handle_input_device_request(
817 input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
818 ) {
819 match input_device_request {
820 fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
821 let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
822 device_information: None,
823 mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
824 input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
825 movement_x: None,
826 movement_y: None,
827 scroll_v: None,
828 scroll_h: None,
829 buttons: Some(vec![0]),
830 position_x: None,
831 position_y: None,
832 ..Default::default()
833 }),
834 ..Default::default()
835 }),
836 sensor: None,
837 touch: None,
838 keyboard: None,
839 consumer_control: None,
840 ..Default::default()
841 });
842 }
843 _ => {}
844 }
845 }
846
847 #[fasync::run_singlethreaded(test)]
849 async fn multiple_devices_single_handler() {
850 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
852 let first_device_binding =
853 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
854 let second_device_binding =
855 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
856
857 let (handler_event_sender, mut handler_event_receiver) =
859 futures::channel::mpsc::channel(100);
860 let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
861 handler_event_sender,
862 );
863
864 let (sender, receiver, tasks, _) =
866 InputPipelineAssembly::new(metrics::MetricsLogger::default())
867 .add_handler(input_handler)
868 .into_components();
869 let inspector = fuchsia_inspect::Inspector::default();
870 let test_node = inspector.root().create_child("input_pipeline");
871 let input_pipeline = InputPipeline {
872 pipeline_sender: sender,
873 device_event_sender,
874 device_event_receiver,
875 input_device_types: vec![],
876 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
877 inspect_node: test_node,
878 metrics_logger: metrics::MetricsLogger::default(),
879 };
880 InputPipeline::catch_unhandled(receiver);
881 InputPipeline::run(tasks);
882
883 let first_device_event = send_input_event(first_device_binding.input_event_sender());
885 let second_device_event = send_input_event(second_device_binding.input_event_sender());
886
887 fasync::Task::local(async {
889 input_pipeline.handle_input_events().await;
890 })
891 .detach();
892
893 let first_handled_event = handler_event_receiver.next().await;
895 assert_eq!(first_handled_event, Some(first_device_event));
896
897 let second_handled_event = handler_event_receiver.next().await;
898 assert_eq!(second_handled_event, Some(second_device_event));
899 }
900
901 #[fasync::run_singlethreaded(test)]
903 async fn single_device_multiple_handlers() {
904 let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
906 let input_device_binding =
907 fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
908
909 let (first_handler_event_sender, mut first_handler_event_receiver) =
911 futures::channel::mpsc::channel(100);
912 let first_input_handler =
913 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
914 first_handler_event_sender,
915 );
916 let (second_handler_event_sender, mut second_handler_event_receiver) =
917 futures::channel::mpsc::channel(100);
918 let second_input_handler =
919 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
920 second_handler_event_sender,
921 );
922
923 let (sender, receiver, tasks, _) =
925 InputPipelineAssembly::new(metrics::MetricsLogger::default())
926 .add_handler(first_input_handler)
927 .add_handler(second_input_handler)
928 .into_components();
929 let inspector = fuchsia_inspect::Inspector::default();
930 let test_node = inspector.root().create_child("input_pipeline");
931 let input_pipeline = InputPipeline {
932 pipeline_sender: sender,
933 device_event_sender,
934 device_event_receiver,
935 input_device_types: vec![],
936 input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
937 inspect_node: test_node,
938 metrics_logger: metrics::MetricsLogger::default(),
939 };
940 InputPipeline::catch_unhandled(receiver);
941 InputPipeline::run(tasks);
942
943 let input_event = send_input_event(input_device_binding.input_event_sender());
945
946 fasync::Task::local(async {
948 input_pipeline.handle_input_events().await;
949 })
950 .detach();
951
952 let first_handler_event = first_handler_event_receiver.next().await;
954 assert_eq!(first_handler_event, Some(input_event.clone()));
955 let second_handler_event = second_handler_event_receiver.next().await;
956 assert_eq!(second_handler_event, Some(input_event));
957 }
958
/// `watch_for_devices` must create exactly one mouse binding for a directory
/// entry whose descriptor matches the supported `Mouse` type, and must
/// publish the matching inspect properties.
#[fasync::run_singlethreaded(test)]
async fn watch_devices_one_match_exists() {
    // Number of requests the fake device has served so far.
    let mut count: i8 = 0;
    // Pseudo directory with a single entry, "file_name", that hosts a fake
    // InputDevice service answering up to three requests.
    let dir = pseudo_directory! {
        "file_name" => pseudo_fs_service::host(
            move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
                async move {
                    while count < 3 {
                        if let Some(input_device_request) =
                            request_stream.try_next().await.unwrap()
                        {
                            handle_input_device_request(input_device_request);
                            count += 1;
                        }
                    }

                }.boxed()
            },
        )
    };

    // The same directory is served twice: one proxy backs the watcher, the
    // other is handed to the pipeline for opening devices.
    let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
    let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
    let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);

    let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
    let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
    let supported_device_types = vec![input_device::InputDeviceType::Mouse];

    let inspector = fuchsia_inspect::Inspector::default();
    let test_node = inspector.root().create_child("input_pipeline");
    test_node.record_string(
        "supported_input_devices",
        supported_device_types.clone().iter().join(", "),
    );
    let input_devices = test_node.create_child("input_devices");
    // Before watching, the devices node is empty.
    diagnostics_assertions::assert_data_tree!(inspector, root: {
        input_pipeline: {
            supported_input_devices: "Mouse",
            input_devices: {}
        }
    });

    // The returned error (always produced when the watch loop ends) is ignored.
    let _ = InputPipeline::watch_for_devices(
        device_watcher,
        dir_proxy_for_pipeline,
        supported_device_types,
        input_event_sender,
        bindings.clone(),
        &input_devices,
        true, // break_on_idle: leave the watch loop on the IDLE event
        metrics::MetricsLogger::default(),
    )
    .await;

    // One device with one mouse binding is expected under ID 10, the first
    // value returned by `get_next_device_id`. This assumes no other device
    // ID was allocated earlier in this process.
    let bindings_hashmap = bindings.lock().await;
    assert_eq!(bindings_hashmap.len(), 1);
    let bindings_vector = bindings_hashmap.get(&10);
    assert!(bindings_vector.is_some());
    assert_eq!(bindings_vector.unwrap().len(), 1);
    let boxed_mouse_binding = bindings_vector.unwrap().get(0);
    assert!(boxed_mouse_binding.is_some());
    assert_eq!(
        boxed_mouse_binding.unwrap().get_device_descriptor(),
        input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
            device_id: 10,
            absolute_x_range: None,
            absolute_y_range: None,
            wheel_v_range: None,
            wheel_h_range: None,
            buttons: Some(vec![0]),
            counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
        })
    );

    // After watching, the counters and the per-device node are present.
    diagnostics_assertions::assert_data_tree!(inspector, root: {
        input_pipeline: {
            supported_input_devices: "Mouse",
            input_devices: {
                devices_discovered: 1u64,
                devices_connected: 1u64,
                "file_name_Mouse": contains {
                    reports_received_count: 0u64,
                    reports_filtered_count: 0u64,
                    events_generated: 0u64,
                    last_received_timestamp_ns: 0u64,
                    last_generated_timestamp_ns: 0u64,
                    "fuchsia.inspect.Health": {
                        status: "OK",
                        start_timestamp_nanos: AnyProperty
                    },
                }
            }
        }
    });
}
1066
/// `watch_for_devices` must create no bindings when the only device in the
/// directory (a mouse) does not match the supported `Keyboard` type, and
/// must record the device as unsupported in inspect.
#[fasync::run_singlethreaded(test)]
async fn watch_devices_no_matches_exist() {
    // Number of requests the fake device has served so far.
    let mut count: i8 = 0;
    // Pseudo directory with a single entry, "file_name", that hosts a fake
    // InputDevice service answering one request.
    let dir = pseudo_directory! {
        "file_name" => pseudo_fs_service::host(
            move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
                async move {
                    while count < 1 {
                        if let Some(input_device_request) =
                            request_stream.try_next().await.unwrap()
                        {
                            handle_input_device_request(input_device_request);
                            count += 1;
                        }
                    }

                }.boxed()
            },
        )
    };

    // The same directory is served twice: one proxy backs the watcher, the
    // other is handed to the pipeline for opening devices.
    let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
    let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
    let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);

    let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
    let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
    let supported_device_types = vec![input_device::InputDeviceType::Keyboard];

    let inspector = fuchsia_inspect::Inspector::default();
    let test_node = inspector.root().create_child("input_pipeline");
    test_node.record_string(
        "supported_input_devices",
        supported_device_types.clone().iter().join(", "),
    );
    let input_devices = test_node.create_child("input_devices");
    // Before watching, the devices node is empty.
    diagnostics_assertions::assert_data_tree!(inspector, root: {
        input_pipeline: {
            supported_input_devices: "Keyboard",
            input_devices: {}
        }
    });

    // The returned error (always produced when the watch loop ends) is ignored.
    let _ = InputPipeline::watch_for_devices(
        device_watcher,
        dir_proxy_for_pipeline,
        supported_device_types,
        input_event_sender,
        bindings.clone(),
        &input_devices,
        true, // break_on_idle: leave the watch loop on the IDLE event
        metrics::MetricsLogger::default(),
    )
    .await;

    // Nothing may have been bound.
    let bindings = bindings.lock().await;
    assert_eq!(bindings.len(), 0);

    // The device was discovered but not connected, and is flagged as unsupported.
    diagnostics_assertions::assert_data_tree!(inspector, root: {
        input_pipeline: {
            supported_input_devices: "Keyboard",
            input_devices: {
                devices_discovered: 1u64,
                devices_connected: 0u64,
                "file_name_Unsupported": {
                    "fuchsia.inspect.Health": {
                        status: "UNHEALTHY",
                        message: "Unsupported device type.",
                        start_timestamp_nanos: AnyProperty
                    },
                }
            }
        }
    });
}
1153
/// Registering one fake mouse device through the `InputDeviceRegistry`
/// protocol must leave exactly one entry in the bindings map.
#[fasync::run_singlethreaded(test)]
async fn handle_input_device_registry_request_stream() {
    let (input_device_registry_proxy, input_device_registry_request_stream) =
        create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
    let (input_device_client_end, mut input_device_request_stream) =
        create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();

    let device_types = vec![input_device::InputDeviceType::Mouse];
    let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
    let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));

    // Number of device requests the fake device has served so far.
    let mut count: i8 = 0;
    // Client side: register the fake device, then act as that device and
    // answer up to three requests.
    fasync::Task::local(async move {
        let _ = input_device_registry_proxy.register(input_device_client_end);

        while count < 3 {
            if let Some(input_device_request) =
                input_device_request_stream.try_next().await.unwrap()
            {
                handle_input_device_request(input_device_request);
                count += 1;
            }
        }

        // NOTE(review): presumably this keeps the proxy alive until the
        // requests above are served; the proxy is dropped when this task
        // ends, which should let the server loop below finish - confirm.
        input_device_registry_proxy.take_event_stream();
    })
    .detach();

    let inspector = fuchsia_inspect::Inspector::default();
    let test_node = inspector.root().create_child("input_pipeline");

    // Server side: serve the registry stream until it ends; the result is ignored.
    let bindings_clone = bindings.clone();
    let _ = InputPipeline::handle_input_device_registry_request_stream(
        input_device_registry_request_stream,
        &device_types,
        &input_event_sender,
        &bindings_clone,
        &test_node,
        metrics::MetricsLogger::default(),
    )
    .await;

    // Exactly one device must have been bound.
    let bindings = bindings.lock().await;
    assert_eq!(bindings.len(), 1);
}
1206
1207 #[fasync::run_singlethreaded(test)]
1209 async fn check_inspect_node_has_correct_properties() {
1210 let device_types = vec![
1211 input_device::InputDeviceType::Touch,
1212 input_device::InputDeviceType::ConsumerControls,
1213 ];
1214 let inspector = fuchsia_inspect::Inspector::default();
1215 let test_node = inspector.root().create_child("input_pipeline");
1216 let (fake_handler_event_sender, _fake_handler_event_receiver) =
1218 futures::channel::mpsc::channel(100);
1219 let fake_input_handler =
1220 observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1221 fake_handler_event_sender,
1222 );
1223 let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1224 .add_handler(fake_input_handler);
1225 let _test_input_pipeline = InputPipeline::new(
1226 device_types,
1227 assembly,
1228 test_node,
1229 metrics::MetricsLogger::default(),
1230 );
1231 diagnostics_assertions::assert_data_tree!(inspector, root: {
1232 input_pipeline: {
1233 supported_input_devices: "Touch, ConsumerControls",
1234 handlers_registered: 1u64,
1235 handlers_healthy: 1u64,
1236 input_devices: {}
1237 }
1238 });
1239 }
1240}