Skip to main content

input_pipeline/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use focus_chain_provider::FocusChainProviderPublisher;
12use fuchsia_fs::directory::{WatchEvent, Watcher};
13use fuchsia_inspect::NumericProperty;
14use fuchsia_inspect::health::Reporter;
15use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
16use futures::future::LocalBoxFuture;
17use futures::lock::Mutex;
18use futures::{StreamExt, TryStreamExt};
19use itertools::Itertools;
20use metrics_registry::*;
21use std::collections::HashMap;
22use std::path::PathBuf;
23use std::rc::Rc;
24use std::sync::atomic::{AtomicU32, Ordering};
25use std::sync::{Arc, LazyLock};
26use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
27
/// Process-wide counter that hands out unique `u32` device ids.
///
/// Ids start at 10 to avoid conflicts with the default devices in Starnix,
/// which currently uses ids 0 and 1. Starnix needs those default devices to
/// deliver events from physical devices until there is an API that exposes
/// device changes to UI clients.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));

/// Returns the counter's current value as the next unique device id, then
/// advances the counter by one so the following call yields a fresh id.
fn get_next_device_id() -> u32 {
    let counter: &AtomicU32 = LazyLock::force(&NEXT_DEVICE_ID);
    counter.fetch_add(1, Ordering::SeqCst)
}
42
/// A boxed, dynamically dispatched [`input_device::InputDeviceBinding`].
type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;

/// An [`InputDeviceBindingHashMap`] maps an input device to one or more InputDeviceBindings.
/// The key is the device's unique `u32` id. The map is shared through an `Arc` and guarded by
/// an async `Mutex`.
pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
48
/// An input pipeline assembly.
///
/// Collects everything needed to build an [`InputPipeline`]: an unbounded channel through which
/// event batches are injected, the ordered list of input handlers, and optional background
/// futures. Use [`InputPipelineAssembly::new`] to create an empty assembly, then
/// [`InputPipelineAssembly::add_handler`] or [`InputPipelineAssembly::add_all_handlers`] to add
/// the input handlers to use. When done, pass the assembly to [`InputPipeline::new`].
///
/// # Implementation notes
///
/// Handlers are kept in insertion order. Each batch of events read from the channel is handed
/// to the handlers one after another. A clone of `sender` can also be given to a background
/// task (as is done for display ownership) so that it can inject events on its own.
pub struct InputPipelineAssembly {
    /// The top-level sender: send into this queue to inject an event into the input
    /// pipeline.
    sender: UnboundedSender<Vec<input_device::InputEvent>>,
    /// The receiving half of the channel paired with `sender`. The pipeline loop reads
    /// event batches from it and feeds them to the handlers.
    receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,

    /// The input handlers that comprise the input pipeline, in invocation order.
    handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,

    /// The display ownership watcher task, if one was added.
    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,

    /// The focus listener task, if one was added.
    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,

    /// The metrics logger.
    metrics_logger: metrics::MetricsLogger,
}
82
83impl InputPipelineAssembly {
84    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
85    /// to add new handlers to it.
86    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
87        let (sender, receiver) = mpsc::unbounded();
88        InputPipelineAssembly {
89            sender,
90            receiver,
91            handlers: vec![],
92            metrics_logger,
93            display_ownership_fut: None,
94            focus_listener_fut: None,
95        }
96    }
97
98    /// Adds another [input_handler::BatchInputHandler] into the [InputPipelineAssembly]. The handlers
99    /// are invoked in the order they are added. Returns `Self` for chaining.
100    pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
101        self.handlers.push(handler);
102        self
103    }
104
105    /// Adds all handlers into the assembly in the order they appear in `handlers`.
106    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
107        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
108    }
109
110    pub fn add_display_ownership(
111        mut self,
112        display_ownership_event: zx::Event,
113        input_handlers_node: &fuchsia_inspect::Node,
114    ) -> InputPipelineAssembly {
115        let h = DisplayOwnership::new(display_ownership_event, input_handlers_node);
116        let metrics_logger_clone = self.metrics_logger.clone();
117        let h_clone = h.clone();
118        let sender_clone = self.sender.clone();
119        let display_ownership_fut = Box::pin(async move {
120            h_clone.clone().set_handler_healthy();
121            h_clone.clone()
122                .handle_ownership_change(sender_clone)
123                .await
124                .map_err(|e| {
125                    metrics_logger_clone.log_error(
126                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
127                        std::format!(
128                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
129                        })
130                        .unwrap();
131            h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
132        });
133        self.display_ownership_fut = Some(display_ownership_fut);
134        self.add_handler(h)
135    }
136
137    /// Deconstructs the assembly into constituent components, used when constructing
138    /// [InputPipeline].
139    ///
140    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
141    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
142    fn into_components(
143        self,
144    ) -> (
145        UnboundedSender<Vec<input_device::InputEvent>>,
146        UnboundedReceiver<Vec<input_device::InputEvent>>,
147        Vec<Rc<dyn input_handler::BatchInputHandler>>,
148        metrics::MetricsLogger,
149        Option<LocalBoxFuture<'static, ()>>,
150        Option<LocalBoxFuture<'static, ()>>,
151    ) {
152        (
153            self.sender,
154            self.receiver,
155            self.handlers,
156            self.metrics_logger,
157            self.display_ownership_fut,
158            self.focus_listener_fut,
159        )
160    }
161
162    pub fn add_focus_listener(
163        mut self,
164        focus_chain_publisher: FocusChainProviderPublisher,
165    ) -> Self {
166        let metrics_logger_clone = self.metrics_logger.clone();
167        let focus_listener_fut = Box::pin(async move {
168            if let Ok(mut focus_listener) =
169                FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
170                    log::warn!(
171                        "could not create focus listener, focus will not be dispatched: {:?}",
172                        e
173                    )
174                })
175            {
176                // This will await indefinitely and process focus messages in a loop, unless there
177                // is a problem.
178                let _result = focus_listener
179                    .dispatch_focus_changes()
180                    .await
181                    .map(|_| {
182                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
183                    })
184                    .map_err(|e| {
185                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
186                    });
187            }
188        });
189        self.focus_listener_fut = Some(focus_listener_fut);
190        self
191    }
192}
193
/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
///
/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
///
/// # Example
///
/// Illustrative only: the handler constructors shown here are placeholders. Handlers passed to
/// `add_handler` must be `Rc<dyn input_handler::BatchInputHandler>`.
///
/// ```ignore
/// let ime_handler =
///     ImeHandler::new(scene_manager.session.clone(), scene_manager.compositor_id).await?;
/// let touch_handler = TouchHandler::new(
///     scene_manager.session.clone(),
///     scene_manager.compositor_id,
///     scene_manager.display_size
/// ).await?;
///
/// let assembly = InputPipelineAssembly::new(metrics_logger.clone())
///     .add_handler(ime_handler)
///     .add_handler(touch_handler);
/// let input_pipeline = InputPipeline::new(
///     vec![
///         input_device::InputDeviceType::Touch,
///         input_device::InputDeviceType::Keyboard,
///     ],
///     assembly,
///     inspect_node,
///     feature_flags,
///     metrics_logger,
/// )?;
/// input_pipeline.handle_input_events().await;
/// ```
pub struct InputPipeline {
    /// The entry point into the input handler pipeline. Incoming input events should
    /// be inserted into this async queue, and the input pipeline will ensure that they
    /// are propagated through all the input handlers in the appropriate sequence.
    pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,

    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
    device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,

    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
    device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,

    /// The types of devices this pipeline supports.
    input_device_types: Vec<input_device::InputDeviceType>,

    /// The InputDeviceBindings bound to this pipeline, keyed by device id.
    input_device_bindings: InputDeviceBindingHashMap,

    /// This node is bound to the lifetime of this InputPipeline.
    /// Inspect data will be dumped for this pipeline as long as it exists.
    inspect_node: fuchsia_inspect::Node,

    /// The metrics logger.
    metrics_logger: metrics::MetricsLogger,

    /// The feature flags for the input pipeline.
    pub feature_flags: input_device::InputPipelineFeatureFlags,
}
250
251impl InputPipeline {
252    fn new_common(
253        input_device_types: Vec<input_device::InputDeviceType>,
254        assembly: InputPipelineAssembly,
255        inspect_node: fuchsia_inspect::Node,
256        feature_flags: input_device::InputPipelineFeatureFlags,
257    ) -> Self {
258        let (
259            pipeline_sender,
260            receiver,
261            handlers,
262            metrics_logger,
263            display_ownership_fut,
264            focus_listener_fut,
265        ) = assembly.into_components();
266
267        let mut handlers_count = handlers.len();
268        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
269        if let Some(fut) = display_ownership_fut {
270            fasync::Task::local(fut).detach();
271            handlers_count += 1;
272        }
273
274        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
275        if let Some(fut) = focus_listener_fut {
276            fasync::Task::local(fut).detach();
277            handlers_count += 1;
278        }
279
280        // Add properties to inspect node
281        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
282        inspect_node.record_uint("handlers_registered", handlers_count as u64);
283        inspect_node.record_uint("handlers_healthy", handlers_count as u64);
284
285        // Initializes all handlers and starts the input pipeline loop.
286        InputPipeline::run(receiver, handlers);
287
288        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
289        let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
290        InputPipeline {
291            pipeline_sender,
292            device_event_sender,
293            device_event_receiver,
294            input_device_types,
295            input_device_bindings,
296            inspect_node,
297            metrics_logger,
298            feature_flags,
299        }
300    }
301
302    /// Creates a new [`InputPipeline`] for integration testing.
303    /// Unlike a production input pipeline, this pipeline will not monitor
304    /// `/dev/class/input-report` for devices.
305    ///
306    /// # Parameters
307    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
308    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
309    pub fn new_for_test(
310        input_device_types: Vec<input_device::InputDeviceType>,
311        assembly: InputPipelineAssembly,
312    ) -> Self {
313        let inspector = fuchsia_inspect::Inspector::default();
314        let root = inspector.root();
315        let test_node = root.create_child("input_pipeline");
316        Self::new_common(
317            input_device_types,
318            assembly,
319            test_node,
320            input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
321        )
322    }
323
324    /// Creates a new [`InputPipeline`] for production use.
325    ///
326    /// # Parameters
327    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
328    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
329    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
330    pub fn new(
331        input_device_types: Vec<input_device::InputDeviceType>,
332        assembly: InputPipelineAssembly,
333        inspect_node: fuchsia_inspect::Node,
334        feature_flags: input_device::InputPipelineFeatureFlags,
335        metrics_logger: metrics::MetricsLogger,
336    ) -> Result<Self, Error> {
337        let input_pipeline =
338            Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
339        let input_device_types = input_pipeline.input_device_types.clone();
340        let input_event_sender = input_pipeline.device_event_sender.clone();
341        let input_device_bindings = input_pipeline.input_device_bindings.clone();
342        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
343        let feature_flags = input_pipeline.feature_flags.clone();
344        fasync::Task::local(async move {
345            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
346            // that send InputEvents to `input_event_receiver`.
347            match async {
348                let dir_proxy = fuchsia_fs::directory::open_in_namespace(
349                    input_device::INPUT_REPORT_PATH,
350                    fuchsia_fs::PERM_READABLE,
351                )
352                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
353                let device_watcher =
354                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
355                Self::watch_for_devices(
356                    device_watcher,
357                    dir_proxy,
358                    input_device_types,
359                    input_event_sender,
360                    input_device_bindings,
361                    &devices_node,
362                    false, /* break_on_idle */
363                    feature_flags,
364                    metrics_logger.clone(),
365                )
366                .await
367                .context("failed to watch for devices")
368            }
369            .await
370            {
371                Ok(()) => {}
372                Err(err) => {
373                    // This error is usually benign in tests: it means that the setup does not
374                    // support dynamic device discovery. Almost no tests support dynamic
375                    // device discovery, and they also do not need those.
376                    metrics_logger.log_warn(
377                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
378                        std::format!(
379                            "Input pipeline is unable to watch for new input devices: {:?}",
380                            err
381                        ));
382                }
383            }
384        })
385        .detach();
386
387        Ok(input_pipeline)
388    }
389
    /// Gets the shared map of input device bindings, keyed by device id.
    pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
        &self.input_device_bindings
    }
394
    /// Gets the input device sender: this is the channel that should be cloned
    /// and used for injecting events from the drivers into the input pipeline.
    /// Events sent here are forwarded by [`InputPipeline::handle_input_events`].
    pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
        &self.device_event_sender
    }
400
    /// Gets the list of input device types supported by this input pipeline, as supplied
    /// at construction.
    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
        &self.input_device_types
    }
405
406    /// Forwards all input events into the input pipeline.
407    pub async fn handle_input_events(mut self) {
408        let metrics_logger_clone = self.metrics_logger.clone();
409        while let Some(input_event) = self.device_event_receiver.next().await {
410            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
411                metrics_logger_clone.log_error(
412                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
413                    std::format!("could not forward event from driver: {:?}", &e));
414            }
415        }
416
417        metrics_logger_clone.log_error(
418            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
419            "Input pipeline stopped handling input events.".to_string(),
420        );
421    }
422
423    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
424    /// if new devices match a type in `device_types`.
425    ///
426    /// # Parameters
427    /// - `device_watcher`: Watches the input report directory for new devices.
428    /// - `dir_proxy`: The directory containing InputDevice connections.
429    /// - `device_types`: The types of devices to watch for.
430    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
431    /// - `bindings`: Holds all the InputDeviceBindings
432    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
433    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
434    /// - `metrics_logger`: The metrics logger.
435    ///
436    /// # Errors
437    /// If the input report directory or a file within it cannot be read.
438    async fn watch_for_devices(
439        mut device_watcher: Watcher,
440        dir_proxy: fio::DirectoryProxy,
441        device_types: Vec<input_device::InputDeviceType>,
442        input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
443        bindings: InputDeviceBindingHashMap,
444        input_devices_node: &fuchsia_inspect::Node,
445        break_on_idle: bool,
446        feature_flags: input_device::InputPipelineFeatureFlags,
447        metrics_logger: metrics::MetricsLogger,
448    ) -> Result<(), Error> {
449        // Add non-static properties to inspect node.
450        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
451        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
452        while let Some(msg) = device_watcher.try_next().await? {
453            if let Ok(filename) = msg.filename.into_os_string().into_string() {
454                if filename == "." {
455                    continue;
456                }
457
458                let pathbuf = PathBuf::from(filename.clone());
459                match msg.event {
460                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
461                        log::info!("found input device {}", filename);
462                        devices_discovered.add(1);
463                        let device_proxy =
464                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
465                        add_device_bindings(
466                            &device_types,
467                            &filename,
468                            device_proxy,
469                            &input_event_sender,
470                            &bindings,
471                            get_next_device_id(),
472                            input_devices_node,
473                            Some(&devices_connected),
474                            feature_flags.clone(),
475                            metrics_logger.clone(),
476                        )
477                        .await;
478                    }
479                    WatchEvent::IDLE => {
480                        if break_on_idle {
481                            break;
482                        }
483                    }
484                    _ => (),
485                }
486            }
487        }
488        // Ensure inspect properties persist for debugging if device watch loop ends.
489        input_devices_node.record(devices_discovered);
490        input_devices_node.record(devices_connected);
491        Err(format_err!("Input pipeline stopped watching for new input devices."))
492    }
493
494    /// Handles the incoming InputDeviceRegistryRequestStream.
495    ///
496    /// This method will end when the request stream is closed. If the stream closes with an
497    /// error the error will be returned in the Result.
498    ///
499    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
500    ///
501    /// # Parameters
502    /// - `stream`: The stream of InputDeviceRegistryRequests.
503    /// - `device_types`: The types of devices to watch for.
504    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
505    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
506    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
507    /// - `metrics_logger`: The metrics logger.
508    pub async fn handle_input_device_registry_request_stream(
509        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
510        device_types: &Vec<input_device::InputDeviceType>,
511        input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
512        bindings: &InputDeviceBindingHashMap,
513        input_devices_node: &fuchsia_inspect::Node,
514        feature_flags: input_device::InputPipelineFeatureFlags,
515        metrics_logger: metrics::MetricsLogger,
516    ) -> Result<(), Error> {
517        while let Some(request) = stream
518            .try_next()
519            .await
520            .context("Error handling input device registry request stream")?
521        {
522            match request {
523                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
524                    device,
525                    ..
526                } => {
527                    // Add a binding if the device is a type being tracked
528                    let device_proxy = device.into_proxy();
529
530                    let device_id = get_next_device_id();
531
532                    add_device_bindings(
533                        device_types,
534                        &format!("input-device-registry-{}", device_id),
535                        device_proxy,
536                        input_event_sender,
537                        bindings,
538                        device_id,
539                        input_devices_node,
540                        None,
541                        feature_flags.clone(),
542                        metrics_logger.clone(),
543                    )
544                    .await;
545                }
546                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
547                    device,
548                    responder,
549                    .. } => {
550                    // Add a binding if the device is a type being tracked
551                    let device_proxy = device.into_proxy();
552
553                    let device_id = get_next_device_id();
554
555                    add_device_bindings(
556                        device_types,
557                        &format!("input-device-registry-{}", device_id),
558                        device_proxy,
559                        input_event_sender,
560                        bindings,
561                        device_id,
562                        input_devices_node,
563                        None,
564                        feature_flags.clone(),
565                        metrics_logger.clone(),
566                    )
567                    .await;
568
569                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
570                        device_id: Some(device_id),
571                        ..Default::default()
572                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
573                }
574            }
575        }
576
577        Ok(())
578    }
579
580    /// Initializes all handlers and starts the input pipeline loop in an asynchronous executor.
581    fn run(
582        mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
583        handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
584    ) {
585        fasync::Task::local(async move {
586            for handler in &handlers {
587                handler.clone().set_handler_healthy();
588            }
589
590            use input_device::InputEventType;
591            use std::collections::HashMap;
592
593            // Pre-compute handler lists for each event type.
594            let mut handlers_by_type: HashMap<
595                InputEventType,
596                Vec<Rc<dyn input_handler::BatchInputHandler>>,
597            > = HashMap::new();
598
599            // TODO: b/478262850 - We can use supported_input_devices to populate this list.
600            let event_types = vec![
601                InputEventType::Keyboard,
602                InputEventType::LightSensor,
603                InputEventType::ConsumerControls,
604                InputEventType::Mouse,
605                InputEventType::TouchScreen,
606                InputEventType::Touchpad,
607                #[cfg(test)]
608                InputEventType::Fake,
609            ];
610
611            for event_type in event_types {
612                let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
613                    .iter()
614                    .filter(|h| h.interest().contains(&event_type))
615                    .cloned()
616                    .collect();
617                handlers_by_type.insert(event_type, handlers_for_type);
618            }
619
620            while let Some(events) = receiver.next().await {
621                if events.is_empty() {
622                    continue;
623                }
624
625                let mut groups_seen = 0;
626                for (event_type, event_group) in events
627                    .into_iter()
628                    .chunk_by(|e| InputEventType::from(&e.device_event))
629                    .into_iter()
630                {
631                    groups_seen += 1;
632                    if groups_seen == 2 {
633                        log::warn!(
634                            "it is not recommanded to contains multiple type of event in 1 send"
635                        );
636                    }
637                    let mut events_in_group: Vec<_> = event_group.collect();
638
639                    // Get pre-computed handlers for this event type.
640                    let handlers = handlers_by_type.get(&event_type).unwrap();
641
642                    for handler in handlers {
643                        events_in_group =
644                            handler.clone().handle_input_events(events_in_group).await;
645                    }
646
647                    for event in events_in_group {
648                        if event.handled == input_device::Handled::No {
649                            log::warn!("unhandled input event: {:?}", &event);
650                        }
651                        if let Some(trace_id) = event.trace_id {
652                            fuchsia_trace::flow_end!(
653                                "input",
654                                "event_in_input_pipeline",
655                                trace_id.into()
656                            );
657                        }
658                    }
659                }
660            }
661            for handler in &handlers {
662                handler.clone().set_handler_unhealthy("Pipeline loop terminated");
663            }
664            panic!("Runner task is not supposed to terminate.")
665        })
666        .detach();
667    }
668}
669
670/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
671///
672/// # Parameters
673/// - `device_types`: The types of devices to watch for.
674/// - `device_proxy`: A proxy to the input device.
675/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
676/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
677/// - `device_id`: The device id of the associated bindings.
678/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
679///
680/// # Note
681/// This will create multiple bindings, in the case where
682/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
683///   with multiple table fields populated, and
684/// * multiple populated table fields correspond to device types present in `device_types`
685///
686/// This is used, for example, to support the Atlas touchpad. In that case, a single
687/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
688/// a `fuchsia.input.report.TouchDescriptor`.
689async fn add_device_bindings(
690    device_types: &Vec<input_device::InputDeviceType>,
691    filename: &String,
692    device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
693    input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
694    bindings: &InputDeviceBindingHashMap,
695    device_id: u32,
696    input_devices_node: &fuchsia_inspect::Node,
697    devices_connected: Option<&fuchsia_inspect::UintProperty>,
698    feature_flags: InputPipelineFeatureFlags,
699    metrics_logger: metrics::MetricsLogger,
700) {
701    let mut matched_device_types = vec![];
702    if let Ok(descriptor) = device_proxy.get_descriptor().await {
703        for device_type in device_types {
704            if input_device::is_device_type(&descriptor, *device_type).await {
705                matched_device_types.push(device_type);
706                match devices_connected {
707                    Some(dev_connected) => {
708                        let _ = dev_connected.add(1);
709                    }
710                    None => (),
711                };
712            }
713        }
714        if matched_device_types.is_empty() {
715            log::info!(
716                "device {} did not match any supported device types: {:?}",
717                filename,
718                device_types
719            );
720            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
721            let mut health = fuchsia_inspect::health::Node::new(&device_node);
722            health.set_unhealthy("Unsupported device type.");
723            device_node.record(health);
724            input_devices_node.record(device_node);
725            return;
726        }
727    } else {
728        metrics_logger.clone().log_error(
729            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
730            std::format!("cannot bind device {} without a device descriptor", filename),
731        );
732        return;
733    }
734
735    log::info!(
736        "binding {} to device types: {}",
737        filename,
738        matched_device_types
739            .iter()
740            .fold(String::new(), |device_types_string, device_type| device_types_string
741                + &format!("{:?}, ", device_type))
742    );
743
744    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
745    for device_type in matched_device_types {
746        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
747        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
748        //
749        // There's no conflict in having multiple bindings read from the same node,
750        // since:
751        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
752        // * the device driver will copy each incoming report to each connected reader.
753        //
754        // This does mean that reports from the Atlas touchpad device get read twice
755        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
756        // is operating in mouse mode or touchpad mode.
757        //
758        // This hasn't been an issue because:
759        // * Semantically: things are fine, because each binding discards irrelevant reports.
760        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
761        // * Performance wise: things are fine, because the data rate of the touchpad is low
762        //   (125 HZ).
763        //
764        // If we add additional cases where bindings share an underlying `input-report` node,
765        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
766        let proxy = device_proxy.clone();
767        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
768        match input_device::get_device_binding(
769            *device_type,
770            proxy,
771            device_id,
772            input_event_sender.clone(),
773            device_node,
774            feature_flags.clone(),
775            metrics_logger.clone(),
776        )
777        .await
778        {
779            Ok(binding) => new_bindings.push(binding),
780            Err(e) => {
781                metrics_logger.log_error(
782                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
783                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
784                );
785            }
786        }
787    }
788
789    if !new_bindings.is_empty() {
790        let mut bindings = bindings.lock().await;
791        bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
792    }
793}
794
795#[cfg(test)]
796mod tests {
797    use super::*;
798    use crate::input_device::{InputDeviceBinding, InputEventType};
799    use crate::utils::Position;
800    use crate::{
801        fake_input_device_binding, mouse_binding, mouse_model_database,
802        observe_fake_events_input_handler,
803    };
804    use async_trait::async_trait;
805    use diagnostics_assertions::AnyProperty;
806    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
807    use fuchsia_async as fasync;
808    use futures::FutureExt;
809    use pretty_assertions::assert_eq;
810    use rand::Rng;
811    use std::collections::HashSet;
812    use vfs::{pseudo_directory, service as pseudo_fs_service};
813
    /// Counts-per-millimeter value used by `send_input_event`: it is reported in the fake
    /// mouse descriptor and used to convert the random offset into millimeters.
    const COUNTS_PER_MM: u32 = 12;
815
816    /// Returns the InputEvent sent over `sender`.
817    ///
818    /// # Parameters
819    /// - `sender`: The channel to send the InputEvent over.
820    fn send_input_event(
821        sender: UnboundedSender<Vec<input_device::InputEvent>>,
822    ) -> Vec<input_device::InputEvent> {
823        let mut rng = rand::rng();
824        let offset =
825            Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
826        let input_event = input_device::InputEvent {
827            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
828                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
829                    millimeters: Position {
830                        x: offset.x / COUNTS_PER_MM as f32,
831                        y: offset.y / COUNTS_PER_MM as f32,
832                    },
833                }),
834                None, /* wheel_delta_v */
835                None, /* wheel_delta_h */
836                mouse_binding::MousePhase::Move,
837                HashSet::new(),
838                HashSet::new(),
839                None, /* is_precision_scroll */
840                None, /* wake_lease */
841            )),
842            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
843                mouse_binding::MouseDeviceDescriptor {
844                    device_id: 1,
845                    absolute_x_range: None,
846                    absolute_y_range: None,
847                    wheel_v_range: None,
848                    wheel_h_range: None,
849                    buttons: None,
850                    counts_per_mm: COUNTS_PER_MM,
851                },
852            ),
853            event_time: zx::MonotonicInstant::get(),
854            handled: input_device::Handled::No,
855            trace_id: None,
856        };
857        match sender.unbounded_send(vec![input_event.clone()]) {
858            Err(_) => assert!(false),
859            _ => {}
860        }
861
862        vec![input_event]
863    }
864
865    /// Returns a MouseDescriptor on an InputDeviceRequest.
866    ///
867    /// # Parameters
868    /// - `input_device_request`: The request to handle.
869    fn handle_input_device_request(
870        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
871    ) {
872        match input_device_request {
873            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
874                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
875                    device_information: None,
876                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
877                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
878                            movement_x: None,
879                            movement_y: None,
880                            scroll_v: None,
881                            scroll_h: None,
882                            buttons: Some(vec![0]),
883                            position_x: None,
884                            position_y: None,
885                            ..Default::default()
886                        }),
887                        ..Default::default()
888                    }),
889                    sensor: None,
890                    touch: None,
891                    keyboard: None,
892                    consumer_control: None,
893                    ..Default::default()
894                });
895            }
896            _ => {}
897        }
898    }
899
900    /// Tests that an input pipeline handles events from multiple devices.
901    #[fasync::run_singlethreaded(test)]
902    async fn multiple_devices_single_handler() {
903        // Create two fake device bindings.
904        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
905        let first_device_binding =
906            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
907        let second_device_binding =
908            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
909
910        // Create a fake input handler.
911        let (handler_event_sender, mut handler_event_receiver) =
912            futures::channel::mpsc::channel(100);
913        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
914            handler_event_sender,
915        );
916
917        // Build the input pipeline.
918        let (sender, receiver, handlers, _, _, _) =
919            InputPipelineAssembly::new(metrics::MetricsLogger::default())
920                .add_handler(input_handler)
921                .into_components();
922        let inspector = fuchsia_inspect::Inspector::default();
923        let test_node = inspector.root().create_child("input_pipeline");
924        let input_pipeline = InputPipeline {
925            pipeline_sender: sender,
926            device_event_sender,
927            device_event_receiver,
928            input_device_types: vec![],
929            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
930            inspect_node: test_node,
931            metrics_logger: metrics::MetricsLogger::default(),
932            feature_flags: input_device::InputPipelineFeatureFlags::default(),
933        };
934        InputPipeline::run(receiver, handlers);
935
936        // Send an input event from each device.
937        let first_device_events = send_input_event(first_device_binding.input_event_sender());
938        let second_device_events = send_input_event(second_device_binding.input_event_sender());
939
940        // Run the pipeline.
941        fasync::Task::local(async {
942            input_pipeline.handle_input_events().await;
943        })
944        .detach();
945
946        // Assert the handler receives the events.
947        let first_handled_event = handler_event_receiver.next().await;
948        assert_eq!(first_handled_event, first_device_events.into_iter().next());
949
950        let second_handled_event = handler_event_receiver.next().await;
951        assert_eq!(second_handled_event, second_device_events.into_iter().next());
952    }
953
954    /// Tests that an input pipeline handles events through multiple input handlers.
955    #[fasync::run_singlethreaded(test)]
956    async fn single_device_multiple_handlers() {
957        // Create two fake device bindings.
958        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
959        let input_device_binding =
960            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
961
962        // Create two fake input handlers.
963        let (first_handler_event_sender, mut first_handler_event_receiver) =
964            futures::channel::mpsc::channel(100);
965        let first_input_handler =
966            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
967                first_handler_event_sender,
968            );
969        let (second_handler_event_sender, mut second_handler_event_receiver) =
970            futures::channel::mpsc::channel(100);
971        let second_input_handler =
972            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
973                second_handler_event_sender,
974            );
975
976        // Build the input pipeline.
977        let (sender, receiver, handlers, _, _, _) =
978            InputPipelineAssembly::new(metrics::MetricsLogger::default())
979                .add_handler(first_input_handler)
980                .add_handler(second_input_handler)
981                .into_components();
982        let inspector = fuchsia_inspect::Inspector::default();
983        let test_node = inspector.root().create_child("input_pipeline");
984        let input_pipeline = InputPipeline {
985            pipeline_sender: sender,
986            device_event_sender,
987            device_event_receiver,
988            input_device_types: vec![],
989            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
990            inspect_node: test_node,
991            metrics_logger: metrics::MetricsLogger::default(),
992            feature_flags: input_device::InputPipelineFeatureFlags::default(),
993        };
994        InputPipeline::run(receiver, handlers);
995
996        // Send an input event.
997        let input_events = send_input_event(input_device_binding.input_event_sender());
998
999        // Run the pipeline.
1000        fasync::Task::local(async {
1001            input_pipeline.handle_input_events().await;
1002        })
1003        .detach();
1004
1005        // Assert both handlers receive the event.
1006        let expected_event = input_events.into_iter().next();
1007        let first_handler_event = first_handler_event_receiver.next().await;
1008        assert_eq!(first_handler_event, expected_event);
1009        let second_handler_event = second_handler_event_receiver.next().await;
1010        assert_eq!(second_handler_event, expected_event);
1011    }
1012
1013    /// Tests that a single mouse device binding is created for the one input device in the
1014    /// input report directory.
1015    #[fasync::run_singlethreaded(test)]
1016    async fn watch_devices_one_match_exists() {
1017        // Create a file in a pseudo directory that represents an input device.
1018        let mut count: i8 = 0;
1019        let dir = pseudo_directory! {
1020            "file_name" => pseudo_fs_service::host(
1021                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1022                    async move {
1023                        while count < 3 {
1024                            if let Some(input_device_request) =
1025                                request_stream.try_next().await.unwrap()
1026                            {
1027                                handle_input_device_request(input_device_request);
1028                                count += 1;
1029                            }
1030                        }
1031
1032                    }.boxed()
1033                },
1034            )
1035        };
1036
1037        // Create a Watcher on the pseudo directory.
1038        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1039        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1040        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1041        // proxy to get connections to input devices.
1042        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1043
1044        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1045        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1046        let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1047
1048        let inspector = fuchsia_inspect::Inspector::default();
1049        let test_node = inspector.root().create_child("input_pipeline");
1050        test_node.record_string(
1051            "supported_input_devices",
1052            supported_device_types.clone().iter().join(", "),
1053        );
1054        let input_devices = test_node.create_child("input_devices");
1055        // Assert that inspect tree is initialized with no devices.
1056        diagnostics_assertions::assert_data_tree!(inspector, root: {
1057            input_pipeline: {
1058                supported_input_devices: "Mouse",
1059                input_devices: {}
1060            }
1061        });
1062
1063        let _ = InputPipeline::watch_for_devices(
1064            device_watcher,
1065            dir_proxy_for_pipeline,
1066            supported_device_types,
1067            input_event_sender,
1068            bindings.clone(),
1069            &input_devices,
1070            true, /* break_on_idle */
1071            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1072            metrics::MetricsLogger::default(),
1073        )
1074        .await;
1075
1076        // Assert that one mouse device with accurate device id was found.
1077        let bindings_hashmap = bindings.lock().await;
1078        assert_eq!(bindings_hashmap.len(), 1);
1079        let bindings_vector = bindings_hashmap.get(&10);
1080        assert!(bindings_vector.is_some());
1081        assert_eq!(bindings_vector.unwrap().len(), 1);
1082        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1083        assert!(boxed_mouse_binding.is_some());
1084        assert_eq!(
1085            boxed_mouse_binding.unwrap().get_device_descriptor(),
1086            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1087                device_id: 10,
1088                absolute_x_range: None,
1089                absolute_y_range: None,
1090                wheel_v_range: None,
1091                wheel_h_range: None,
1092                buttons: Some(vec![0]),
1093                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1094            })
1095        );
1096
1097        // Assert that inspect tree reflects new device discovered and connected.
1098        diagnostics_assertions::assert_data_tree!(inspector, root: {
1099            input_pipeline: {
1100                supported_input_devices: "Mouse",
1101                input_devices: {
1102                    devices_discovered: 1u64,
1103                    devices_connected: 1u64,
1104                    "file_name_Mouse": contains {
1105                        reports_received_count: 0u64,
1106                        reports_filtered_count: 0u64,
1107                        events_generated: 0u64,
1108                        last_received_timestamp_ns: 0u64,
1109                        last_generated_timestamp_ns: 0u64,
1110                        "fuchsia.inspect.Health": {
1111                            status: "OK",
1112                            // Timestamp value is unpredictable and not relevant in this context,
1113                            // so we only assert that the property is present.
1114                            start_timestamp_nanos: AnyProperty
1115                        },
1116                    }
1117                }
1118            }
1119        });
1120    }
1121
1122    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
1123    /// but only a mouse exists.
1124    #[fasync::run_singlethreaded(test)]
1125    async fn watch_devices_no_matches_exist() {
1126        // Create a file in a pseudo directory that represents an input device.
1127        let mut count: i8 = 0;
1128        let dir = pseudo_directory! {
1129            "file_name" => pseudo_fs_service::host(
1130                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1131                    async move {
1132                        while count < 1 {
1133                            if let Some(input_device_request) =
1134                                request_stream.try_next().await.unwrap()
1135                            {
1136                                handle_input_device_request(input_device_request);
1137                                count += 1;
1138                            }
1139                        }
1140
1141                    }.boxed()
1142                },
1143            )
1144        };
1145
1146        // Create a Watcher on the pseudo directory.
1147        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1148        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1149        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1150        // proxy to get connections to input devices.
1151        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1152
1153        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1154        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1155        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1156
1157        let inspector = fuchsia_inspect::Inspector::default();
1158        let test_node = inspector.root().create_child("input_pipeline");
1159        test_node.record_string(
1160            "supported_input_devices",
1161            supported_device_types.clone().iter().join(", "),
1162        );
1163        let input_devices = test_node.create_child("input_devices");
1164        // Assert that inspect tree is initialized with no devices.
1165        diagnostics_assertions::assert_data_tree!(inspector, root: {
1166            input_pipeline: {
1167                supported_input_devices: "Keyboard",
1168                input_devices: {}
1169            }
1170        });
1171
1172        let _ = InputPipeline::watch_for_devices(
1173            device_watcher,
1174            dir_proxy_for_pipeline,
1175            supported_device_types,
1176            input_event_sender,
1177            bindings.clone(),
1178            &input_devices,
1179            true, /* break_on_idle */
1180            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1181            metrics::MetricsLogger::default(),
1182        )
1183        .await;
1184
1185        // Assert that no devices were found.
1186        let bindings = bindings.lock().await;
1187        assert_eq!(bindings.len(), 0);
1188
1189        // Assert that inspect tree reflects new device discovered, but not connected.
1190        diagnostics_assertions::assert_data_tree!(inspector, root: {
1191            input_pipeline: {
1192                supported_input_devices: "Keyboard",
1193                input_devices: {
1194                    devices_discovered: 1u64,
1195                    devices_connected: 0u64,
1196                    "file_name_Unsupported": {
1197                        "fuchsia.inspect.Health": {
1198                            status: "UNHEALTHY",
1199                            message: "Unsupported device type.",
1200                            // Timestamp value is unpredictable and not relevant in this context,
1201                            // so we only assert that the property is present.
1202                            start_timestamp_nanos: AnyProperty
1203                        },
1204                    }
1205                }
1206            }
1207        });
1208    }
1209
1210    /// Tests that a single keyboard device binding is created for the input device registered
1211    /// through InputDeviceRegistry.
1212    #[fasync::run_singlethreaded(test)]
1213    async fn handle_input_device_registry_request_stream() {
1214        let (input_device_registry_proxy, input_device_registry_request_stream) =
1215            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1216        let (input_device_client_end, mut input_device_request_stream) =
1217            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1218
1219        let device_types = vec![input_device::InputDeviceType::Mouse];
1220        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1221        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1222
1223        // Handle input device requests.
1224        let mut count: i8 = 0;
1225        fasync::Task::local(async move {
1226            // Register a device.
1227            let _ = input_device_registry_proxy.register(input_device_client_end);
1228
1229            while count < 3 {
1230                if let Some(input_device_request) =
1231                    input_device_request_stream.try_next().await.unwrap()
1232                {
1233                    handle_input_device_request(input_device_request);
1234                    count += 1;
1235                }
1236            }
1237
1238            // End handle_input_device_registry_request_stream() by taking the event stream.
1239            input_device_registry_proxy.take_event_stream();
1240        })
1241        .detach();
1242
1243        let inspector = fuchsia_inspect::Inspector::default();
1244        let test_node = inspector.root().create_child("input_pipeline");
1245
1246        // Start listening for InputDeviceRegistryRequests.
1247        let bindings_clone = bindings.clone();
1248        let _ = InputPipeline::handle_input_device_registry_request_stream(
1249            input_device_registry_request_stream,
1250            &device_types,
1251            &input_event_sender,
1252            &bindings_clone,
1253            &test_node,
1254            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1255            metrics::MetricsLogger::default(),
1256        )
1257        .await;
1258
1259        // Assert that a device was registered.
1260        let bindings = bindings.lock().await;
1261        assert_eq!(bindings.len(), 1);
1262    }
1263
1264    // Tests that correct properties are added to inspect node when InputPipeline is created.
1265    #[fasync::run_singlethreaded(test)]
1266    async fn check_inspect_node_has_correct_properties() {
1267        let device_types = vec![
1268            input_device::InputDeviceType::Touch,
1269            input_device::InputDeviceType::ConsumerControls,
1270        ];
1271        let inspector = fuchsia_inspect::Inspector::default();
1272        let test_node = inspector.root().create_child("input_pipeline");
1273        // Create fake input handler for assembly
1274        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1275            futures::channel::mpsc::channel(100);
1276        let fake_input_handler =
1277            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1278                fake_handler_event_sender,
1279            );
1280        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1281            .add_handler(fake_input_handler);
1282        let _test_input_pipeline = InputPipeline::new(
1283            device_types,
1284            assembly,
1285            test_node,
1286            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1287            metrics::MetricsLogger::default(),
1288        );
1289        diagnostics_assertions::assert_data_tree!(inspector, root: {
1290            input_pipeline: {
1291                supported_input_devices: "Touch, ConsumerControls",
1292                handlers_registered: 1u64,
1293                handlers_healthy: 1u64,
1294                input_devices: {}
1295            }
1296        });
1297    }
1298
1299    struct SpecificInterestFakeHandler {
1300        interest_types: Vec<input_device::InputEventType>,
1301        event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1302    }
1303
1304    impl SpecificInterestFakeHandler {
1305        pub fn new(
1306            interest_types: Vec<input_device::InputEventType>,
1307            event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1308        ) -> Rc<Self> {
1309            Rc::new(SpecificInterestFakeHandler {
1310                interest_types,
1311                event_sender: std::cell::RefCell::new(event_sender),
1312            })
1313        }
1314    }
1315
1316    impl Handler for SpecificInterestFakeHandler {
1317        fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1318        fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1319        fn get_name(&self) -> &'static str {
1320            "SpecificInterestFakeHandler"
1321        }
1322
1323        fn interest(&self) -> Vec<input_device::InputEventType> {
1324            self.interest_types.clone()
1325        }
1326    }
1327
1328    #[async_trait(?Send)]
1329    impl input_handler::InputHandler for SpecificInterestFakeHandler {
1330        async fn handle_input_event(
1331            self: Rc<Self>,
1332            input_event: input_device::InputEvent,
1333        ) -> Vec<input_device::InputEvent> {
1334            match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1335                Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1336                Ok(_) => {}
1337            }
1338            vec![input_event]
1339        }
1340    }
1341
1342    #[fasync::run_singlethreaded(test)]
1343    async fn run_only_sends_events_to_interested_handlers() {
1344        // Mouse Handler (Specific Interest: Mouse)
1345        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1346        let mouse_handler =
1347            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1348
1349        // Fake Handler (Specific Interest: Fake)
1350        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1351        let fake_handler =
1352            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1353
1354        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1355            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1356                .add_handler(mouse_handler)
1357                .add_handler(fake_handler)
1358                .into_components();
1359
1360        // Run the pipeline logic
1361        InputPipeline::run(pipeline_receiver, handlers);
1362
1363        // Create a Fake event
1364        let fake_event = input_device::InputEvent {
1365            device_event: input_device::InputDeviceEvent::Fake,
1366            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1367            event_time: zx::MonotonicInstant::get(),
1368            handled: input_device::Handled::No,
1369            trace_id: None,
1370        };
1371
1372        // Send the Fake event
1373        pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1374
1375        // Verify Fake Handler received it
1376        let received_by_fake = fake_receiver.next().await;
1377        assert_eq!(received_by_fake, Some(fake_event));
1378
1379        // Verify Mouse Handler did NOT receive it
1380        assert!(mouse_receiver.try_next().is_err());
1381    }
1382
1383    fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1384        input_device::InputEvent {
1385            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1386                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1387                    millimeters: Position { x, y },
1388                }),
1389                None,
1390                None,
1391                mouse_binding::MousePhase::Move,
1392                HashSet::new(),
1393                HashSet::new(),
1394                None,
1395                None,
1396            )),
1397            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1398                mouse_binding::MouseDeviceDescriptor {
1399                    device_id: 1,
1400                    absolute_x_range: None,
1401                    absolute_y_range: None,
1402                    wheel_v_range: None,
1403                    wheel_h_range: None,
1404                    buttons: None,
1405                    counts_per_mm: 1,
1406                },
1407            ),
1408            event_time: zx::MonotonicInstant::get(),
1409            handled: input_device::Handled::No,
1410            trace_id: None,
1411        }
1412    }
1413
1414    #[fasync::run_singlethreaded(test)]
1415    async fn run_mixed_event_types_dispatched_correctly() {
1416        // Mouse Handler (Specific Interest: Mouse)
1417        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1418        let mouse_handler =
1419            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1420
1421        // Fake Handler (Specific Interest: Fake)
1422        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1423        let fake_handler =
1424            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1425
1426        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1427            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1428                .add_handler(mouse_handler)
1429                .add_handler(fake_handler)
1430                .into_components();
1431
1432        // Run the pipeline logic
1433        InputPipeline::run(pipeline_receiver, handlers);
1434
1435        // Create events
1436        let mouse_event_1 = create_mouse_event(1.0, 1.0);
1437        let mouse_event_2 = create_mouse_event(2.0, 2.0);
1438        let mouse_event_3 = create_mouse_event(3.0, 3.0);
1439
1440        let fake_event_1 = input_device::InputEvent {
1441            device_event: input_device::InputDeviceEvent::Fake,
1442            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1443            event_time: zx::MonotonicInstant::get(),
1444            handled: input_device::Handled::No,
1445            trace_id: None,
1446        };
1447
1448        // Send mixed batch: [Mouse, Mouse, Fake, Mouse]
1449        // This should result in 3 chunks: [Mouse, Mouse], [Fake], [Mouse]
1450        let mixed_batch = vec![
1451            mouse_event_1.clone(),
1452            mouse_event_2.clone(),
1453            fake_event_1.clone(),
1454            mouse_event_3.clone(),
1455        ];
1456        pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1457
1458        // Verify Mouse Handler received M1, M2, and then M3
1459        assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1460        assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1461        assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1462
1463        // Verify Fake Handler received F1
1464        assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1465    }
1466}