Skip to main content

input_pipeline/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{Dispatcher, Incoming, Transport, input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use fidl::endpoints;
12use focus_chain_provider::FocusChainProviderPublisher;
13use fuchsia_component::directory::AsRefDirectory;
14use fuchsia_fs::directory::{WatchEvent, Watcher};
15use fuchsia_inspect::NumericProperty;
16use fuchsia_inspect::health::Reporter;
17use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
18use futures::future::LocalBoxFuture;
19use futures::lock::Mutex;
20use futures::{StreamExt, TryStreamExt};
21use itertools::Itertools;
22use metrics_registry::*;
23use std::collections::HashMap;
24use std::path::PathBuf;
25use std::rc::Rc;
26use std::sync::atomic::{AtomicU32, Ordering};
27use std::sync::{Arc, LazyLock};
28use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
29
/// Backing counter for the self-incrementing, unique `u32` ids used as device_id.
///
/// Device ids start at 10 to avoid conflicts with the default devices in Starnix.
/// Currently, Starnix uses 0 and 1 as its default devices' ids. Starnix needs to
/// use default devices to deliver events from physical devices until we have an
/// API to expose device changes to UI clients.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
37
/// Returns the next device id.
///
/// Each time this function is invoked, it returns the current value of the
/// `NEXT_DEVICE_ID` counter (serving as a unique id for device_id) and then
/// increments that counter in preparation for the next call. The read and the
/// increment happen as a single atomic `fetch_add`.
fn get_next_device_id() -> u32 {
    NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
}
44
/// A boxed trait object implementing [`input_device::InputDeviceBinding`].
type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;

/// An [`InputDeviceBindingHashMap`] maps an input device to one or more InputDeviceBindings.
/// It uses the unique device id as its key. The map is wrapped in `Arc<Mutex<..>>`
/// (the async `futures::lock::Mutex`) so that clones of the handle share one map.
pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
50
/// An input pipeline assembly.
///
/// Represents a partial stage of the input pipeline which accepts inputs through an asynchronous
/// sender channel, and emits outputs through an asynchronous receiver channel.  Use
/// [`InputPipelineAssembly::new`] to create a new assembly.  Use
/// [`InputPipelineAssembly::add_handler`] or [`InputPipelineAssembly::add_all_handlers`] to add
/// the input pipeline handlers to use.  When done, [`InputPipeline::new`] can be used to make a
/// new input pipeline.
///
/// # Implementation notes
///
/// Internally, when a new [`InputPipelineAssembly`] is created with multiple
/// [`input_handler::BatchInputHandler`]s, the handlers are connected together using async
/// queues.  This allows fully streamed processing of input events, and also allows some
/// pipeline stages to generate events spontaneously, i.e. without an external stimulus.
pub struct InputPipelineAssembly {
    /// The top-level sender: send into this queue to inject an event into the input
    /// pipeline.
    sender: UnboundedSender<Vec<input_device::InputEvent>>,
    /// The bottom-level receiver: any events that fall through the entire pipeline can
    /// be read from this receiver.
    receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,

    /// The input handlers that comprise the input pipeline, in invocation order.
    handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,

    /// The display ownership watcher task. `None` until
    /// [`InputPipelineAssembly::add_display_ownership`] is called.
    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,

    /// The focus listener task. `None` until
    /// [`InputPipelineAssembly::add_focus_listener`] is called.
    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,

    /// The metrics logger.
    metrics_logger: metrics::MetricsLogger,
}
84
85impl InputPipelineAssembly {
86    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
87    /// to add new handlers to it.
88    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
89        let (sender, receiver) = mpsc::unbounded();
90        InputPipelineAssembly {
91            sender,
92            receiver,
93            handlers: vec![],
94            metrics_logger,
95            display_ownership_fut: None,
96            focus_listener_fut: None,
97        }
98    }
99
100    /// Adds another [input_handler::BatchInputHandler] into the [InputPipelineAssembly]. The handlers
101    /// are invoked in the order they are added. Returns `Self` for chaining.
102    pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
103        self.handlers.push(handler);
104        self
105    }
106
107    /// Adds all handlers into the assembly in the order they appear in `handlers`.
108    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
109        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
110    }
111
112    pub fn add_display_ownership(
113        mut self,
114        display_ownership_event: zx::Event,
115        input_handlers_node: &fuchsia_inspect::Node,
116    ) -> InputPipelineAssembly {
117        let h = DisplayOwnership::new(
118            display_ownership_event,
119            input_handlers_node,
120            self.metrics_logger.clone(),
121        );
122        let metrics_logger_clone = self.metrics_logger.clone();
123        let h_clone = h.clone();
124        let sender_clone = self.sender.clone();
125        let display_ownership_fut = Box::pin(async move {
126            h_clone.clone().set_handler_healthy();
127            h_clone.clone()
128                .handle_ownership_change(sender_clone)
129                .await
130                .map_err(|e| {
131                    metrics_logger_clone.log_error(
132                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
133                        std::format!(
134                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
135                        })
136                        .unwrap();
137            h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
138        });
139        self.display_ownership_fut = Some(display_ownership_fut);
140        self.add_handler(h)
141    }
142
143    /// Deconstructs the assembly into constituent components, used when constructing
144    /// [InputPipeline].
145    ///
146    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
147    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
148    fn into_components(
149        self,
150    ) -> (
151        UnboundedSender<Vec<input_device::InputEvent>>,
152        UnboundedReceiver<Vec<input_device::InputEvent>>,
153        Vec<Rc<dyn input_handler::BatchInputHandler>>,
154        metrics::MetricsLogger,
155        Option<LocalBoxFuture<'static, ()>>,
156        Option<LocalBoxFuture<'static, ()>>,
157    ) {
158        (
159            self.sender,
160            self.receiver,
161            self.handlers,
162            self.metrics_logger,
163            self.display_ownership_fut,
164            self.focus_listener_fut,
165        )
166    }
167
168    pub fn add_focus_listener(
169        mut self,
170        incoming: &Incoming,
171        focus_chain_publisher: FocusChainProviderPublisher,
172    ) -> Self {
173        let metrics_logger_clone = self.metrics_logger.clone();
174        let incoming2 = incoming.clone();
175        let focus_listener_fut = Box::pin(async move {
176            if let Ok(mut focus_listener) = FocusListener::new(
177                &incoming2,
178                focus_chain_publisher,
179                metrics_logger_clone,
180            )
181            .map_err(|e| {
182                log::warn!("could not create focus listener, focus will not be dispatched: {:?}", e)
183            }) {
184                // This will await indefinitely and process focus messages in a loop, unless there
185                // is a problem.
186                let _result = focus_listener
187                    .dispatch_focus_changes()
188                    .await
189                    .map(|_| {
190                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
191                    })
192                    .map_err(|e| {
193                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
194                    });
195            }
196        });
197        self.focus_listener_fut = Some(focus_listener_fut);
198        self
199    }
200}
201
/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
///
/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
///
/// # Example
/// ```
/// // Handler construction is illustrative only; each handler passed to `add_handler`
/// // must be an `Rc<dyn input_handler::BatchInputHandler>`.
/// let ime_handler =
///     ImeHandler::new(scene_manager.session.clone(), scene_manager.compositor_id).await?;
/// let touch_handler = TouchHandler::new(
///     scene_manager.session.clone(),
///     scene_manager.compositor_id,
///     scene_manager.display_size
/// ).await?;
///
/// let assembly = InputPipelineAssembly::new(metrics_logger.clone())
///     .add_handler(ime_handler)
///     .add_handler(touch_handler);
/// let input_pipeline = InputPipeline::new(
///     &incoming,
///     vec![
///         input_device::InputDeviceType::Touch,
///         input_device::InputDeviceType::Keyboard,
///     ],
///     assembly,
///     inspect_node,
///     feature_flags,
///     metrics_logger,
/// )?;
/// input_pipeline.handle_input_events().await;
/// ```
pub struct InputPipeline {
    /// The entry point into the input handler pipeline. Incoming input events should
    /// be inserted into this async queue, and the input pipeline will ensure that they
    /// are propagated through all the input handlers in the appropriate sequence.
    pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,

    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
    device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,

    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
    device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,

    /// The types of devices this pipeline supports.
    input_device_types: Vec<input_device::InputDeviceType>,

    /// The InputDeviceBindings bound to this pipeline.
    input_device_bindings: InputDeviceBindingHashMap,

    /// This node is bound to the lifetime of this InputPipeline.
    /// Inspect data will be dumped for this pipeline as long as it exists.
    inspect_node: fuchsia_inspect::Node,

    /// The metrics logger.
    metrics_logger: metrics::MetricsLogger,

    /// The feature flags for the input pipeline.
    pub feature_flags: input_device::InputPipelineFeatureFlags,
}
258
259impl InputPipeline {
260    fn new_common(
261        input_device_types: Vec<input_device::InputDeviceType>,
262        assembly: InputPipelineAssembly,
263        inspect_node: fuchsia_inspect::Node,
264        feature_flags: input_device::InputPipelineFeatureFlags,
265    ) -> Self {
266        let (
267            pipeline_sender,
268            receiver,
269            handlers,
270            metrics_logger,
271            display_ownership_fut,
272            focus_listener_fut,
273        ) = assembly.into_components();
274
275        let mut handlers_count = handlers.len();
276        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
277        if let Some(fut) = display_ownership_fut {
278            fasync::Task::local(fut).detach();
279            handlers_count += 1;
280        }
281
282        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
283        if let Some(fut) = focus_listener_fut {
284            fasync::Task::local(fut).detach();
285            handlers_count += 1;
286        }
287
288        // Add properties to inspect node
289        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
290        inspect_node.record_uint("handlers_registered", handlers_count as u64);
291        inspect_node.record_uint("handlers_healthy", handlers_count as u64);
292
293        // Initializes all handlers and starts the input pipeline loop.
294        InputPipeline::run(receiver, handlers, metrics_logger.clone());
295
296        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
297        let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
298        InputPipeline {
299            pipeline_sender,
300            device_event_sender,
301            device_event_receiver,
302            input_device_types,
303            input_device_bindings,
304            inspect_node,
305            metrics_logger,
306            feature_flags,
307        }
308    }
309
310    /// Creates a new [`InputPipeline`] for integration testing.
311    /// Unlike a production input pipeline, this pipeline will not monitor
312    /// `/dev/class/input-report` for devices.
313    ///
314    /// # Parameters
315    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
316    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
317    pub fn new_for_test(
318        input_device_types: Vec<input_device::InputDeviceType>,
319        assembly: InputPipelineAssembly,
320    ) -> Self {
321        let inspector = fuchsia_inspect::Inspector::default();
322        let root = inspector.root();
323        let test_node = root.create_child("input_pipeline");
324        Self::new_common(
325            input_device_types,
326            assembly,
327            test_node,
328            input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
329        )
330    }
331
332    /// Creates a new [`InputPipeline`] for production use.
333    ///
334    /// # Parameters
335    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
336    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
337    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
338    pub fn new(
339        incoming: &Incoming,
340        input_device_types: Vec<input_device::InputDeviceType>,
341        assembly: InputPipelineAssembly,
342        inspect_node: fuchsia_inspect::Node,
343        feature_flags: input_device::InputPipelineFeatureFlags,
344        metrics_logger: metrics::MetricsLogger,
345    ) -> Result<Self, Error> {
346        let input_pipeline =
347            Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
348        let input_device_types = input_pipeline.input_device_types.clone();
349        let input_event_sender = input_pipeline.device_event_sender.clone();
350        let input_device_bindings = input_pipeline.input_device_bindings.clone();
351        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
352        let feature_flags = input_pipeline.feature_flags.clone();
353        let incoming = incoming.clone();
354        // This intentionally uses the [`fuchsia_async`] task dispatcher instead of
355        // [`crate::Dispatcher`] -- the directory watcher always uses the fuchsia-async dispatcher.
356        // This is fine for performance because the actual event dispatch is still configured to
357        // run on [`crate::Dispatcher`].
358        fasync::Task::local(async move {
359            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
360            // that send InputEvents to `input_event_receiver`.
361            match async {
362                let (dir_proxy, server) = endpoints::create_proxy::<fio::DirectoryMarker>();
363                incoming.as_ref_directory().open(
364                    input_device::INPUT_REPORT_PATH,
365                    fio::PERM_READABLE,
366                    server.into()
367                )
368                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
369                let device_watcher =
370                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
371                Self::watch_for_devices(
372                    device_watcher,
373                    dir_proxy,
374                    input_device_types,
375                    input_event_sender,
376                    input_device_bindings,
377                    &devices_node,
378                    false, /* break_on_idle */
379                    feature_flags,
380                    metrics_logger.clone(),
381                )
382                .await
383                .context("failed to watch for devices")
384            }
385            .await
386            {
387                Ok(()) => {}
388                Err(err) => {
389                    // This error is usually benign in tests: it means that the setup does not
390                    // support dynamic device discovery. Almost no tests support dynamic
391                    // device discovery, and they also do not need those.
392                    metrics_logger.log_warn(
393                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
394                        std::format!(
395                            "Input pipeline is unable to watch for new input devices: {:?}",
396                            err
397                        ));
398                }
399            }
400        }).detach();
401
402        Ok(input_pipeline)
403    }
404
    /// Gets the input device bindings.
    ///
    /// Returns a reference to the shared map; it is keyed by device id.
    pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
        &self.input_device_bindings
    }
409
    /// Gets the input device sender: this is the channel that should be cloned
    /// and used for injecting events from the drivers into the input pipeline.
    ///
    /// Events sent here are read by [`InputPipeline::handle_input_events`], which
    /// forwards them into the handler pipeline.
    pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
        &self.device_event_sender
    }
415
    /// Gets a list of input device types supported by this input pipeline.
    ///
    /// These are the types that were supplied when the pipeline was constructed.
    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
        &self.input_device_types
    }
420
421    /// Forwards all input events into the input pipeline.
422    pub async fn handle_input_events(mut self) {
423        let metrics_logger_clone = self.metrics_logger.clone();
424        while let Some(input_event) = self.device_event_receiver.next().await {
425            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
426                metrics_logger_clone.log_error(
427                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
428                    std::format!("could not forward event from driver: {:?}", &e));
429            }
430        }
431
432        metrics_logger_clone.log_error(
433            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
434            "Input pipeline stopped handling input events.".to_string(),
435        );
436    }
437
438    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
439    /// if new devices match a type in `device_types`.
440    ///
441    /// # Parameters
442    /// - `device_watcher`: Watches the input report directory for new devices.
443    /// - `dir_proxy`: The directory containing InputDevice connections.
444    /// - `device_types`: The types of devices to watch for.
445    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
446    /// - `bindings`: Holds all the InputDeviceBindings
447    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
448    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
449    /// - `metrics_logger`: The metrics logger.
450    ///
451    /// # Errors
452    /// If the input report directory or a file within it cannot be read.
453    async fn watch_for_devices(
454        mut device_watcher: Watcher,
455        dir_proxy: fio::DirectoryProxy,
456        device_types: Vec<input_device::InputDeviceType>,
457        input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
458        bindings: InputDeviceBindingHashMap,
459        input_devices_node: &fuchsia_inspect::Node,
460        break_on_idle: bool,
461        feature_flags: input_device::InputPipelineFeatureFlags,
462        metrics_logger: metrics::MetricsLogger,
463    ) -> Result<(), Error> {
464        // Add non-static properties to inspect node.
465        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
466        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
467        while let Some(msg) = device_watcher.try_next().await? {
468            if let Ok(filename) = msg.filename.into_os_string().into_string() {
469                if filename == "." {
470                    continue;
471                }
472
473                let pathbuf = PathBuf::from(filename.clone());
474                match msg.event {
475                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
476                        log::info!("found input device {}", filename);
477                        devices_discovered.add(1);
478                        let device_proxy =
479                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
480                        add_device_bindings(
481                            &device_types,
482                            &filename,
483                            device_proxy,
484                            &input_event_sender,
485                            &bindings,
486                            get_next_device_id(),
487                            input_devices_node,
488                            Some(&devices_connected),
489                            feature_flags.clone(),
490                            metrics_logger.clone(),
491                        )
492                        .await;
493                    }
494                    WatchEvent::IDLE => {
495                        if break_on_idle {
496                            break;
497                        }
498                    }
499                    _ => (),
500                }
501            }
502        }
503        // Ensure inspect properties persist for debugging if device watch loop ends.
504        input_devices_node.record(devices_discovered);
505        input_devices_node.record(devices_connected);
506        Err(format_err!("Input pipeline stopped watching for new input devices."))
507    }
508
509    /// Handles the incoming InputDeviceRegistryRequestStream.
510    ///
511    /// This method will end when the request stream is closed. If the stream closes with an
512    /// error the error will be returned in the Result.
513    ///
514    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
515    ///
516    /// # Parameters
517    /// - `stream`: The stream of InputDeviceRegistryRequests.
518    /// - `device_types`: The types of devices to watch for.
519    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
520    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
521    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
522    /// - `metrics_logger`: The metrics logger.
523    pub async fn handle_input_device_registry_request_stream(
524        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
525        device_types: &Vec<input_device::InputDeviceType>,
526        input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
527        bindings: &InputDeviceBindingHashMap,
528        input_devices_node: &fuchsia_inspect::Node,
529        feature_flags: input_device::InputPipelineFeatureFlags,
530        metrics_logger: metrics::MetricsLogger,
531    ) -> Result<(), Error> {
532        while let Some(request) = stream
533            .try_next()
534            .await
535            .context("Error handling input device registry request stream")?
536        {
537            match request {
538                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
539                    device,
540                    ..
541                } => {
542                    // Add a binding if the device is a type being tracked
543                    let device = fidl_next::ClientEnd::<
544                        fidl_next_fuchsia_input_report::InputDevice,
545                        zx::Channel,
546                    >::from_untyped(device.into_channel());
547                    let device = Dispatcher::client_from_zx_channel(device);
548                    let device = device.spawn();
549                    let device_id = get_next_device_id();
550
551                    add_device_bindings(
552                        device_types,
553                        &format!("input-device-registry-{}", device_id),
554                        device,
555                        input_event_sender,
556                        bindings,
557                        device_id,
558                        input_devices_node,
559                        None,
560                        feature_flags.clone(),
561                        metrics_logger.clone(),
562                    )
563                    .await;
564                }
565                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
566                    device,
567                    responder,
568                    .. } => {
569                    // Add a binding if the device is a type being tracked
570                    let device = fidl_next::ClientEnd::<
571                        fidl_next_fuchsia_input_report::InputDevice,
572                        zx::Channel,
573                    >::from_untyped(device.into_channel());
574                    let device = Dispatcher::client_from_zx_channel(device);
575                    let device = device.spawn();
576                    let device_id = get_next_device_id();
577
578                    add_device_bindings(
579                        device_types,
580                        &format!("input-device-registry-{}", device_id),
581                        device,
582                        input_event_sender,
583                        bindings,
584                        device_id,
585                        input_devices_node,
586                        None,
587                        feature_flags.clone(),
588                        metrics_logger.clone(),
589                    )
590                    .await;
591
592                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
593                        device_id: Some(device_id),
594                        ..Default::default()
595                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
596                }
597            }
598        }
599
600        Ok(())
601    }
602
    /// Initializes all handlers and starts the input pipeline loop in an asynchronous executor.
    ///
    /// The loop is spawned with `Dispatcher::spawn_local` and detached. It reads batches of
    /// events from `receiver`, splits each batch into runs of events of the same
    /// `InputEventType`, and passes every run through the handlers interested in that type,
    /// in the order the handlers appear in `handlers`.
    ///
    /// # Parameters
    /// - `receiver`: The receiving end of the pipeline's event queue.
    /// - `handlers`: The handlers to run events through.
    /// - `metrics_logger`: Used to report batches that mix several event types.
    ///
    /// # Panics
    /// The spawned task panics if `receiver` ever stops yielding batches, and if an event
    /// has a type that is missing from the pre-computed `event_types` list below.
    fn run(
        mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
        handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
        metrics_logger: metrics::MetricsLogger,
    ) {
        Dispatcher::spawn_local(async move {
            // Mark every handler healthy before any event is processed.
            for handler in &handlers {
                handler.clone().set_handler_healthy();
            }

            use input_device::InputEventType;
            use std::collections::HashMap;

            // Pre-compute handler lists for each event type.
            let mut handlers_by_type: HashMap<
                InputEventType,
                Vec<Rc<dyn input_handler::BatchInputHandler>>,
            > = HashMap::new();

            // TODO: b/478262850 - We can use supported_input_devices to populate this list.
            let event_types = vec![
                InputEventType::Keyboard,
                InputEventType::LightSensor,
                InputEventType::ConsumerControls,
                InputEventType::Mouse,
                InputEventType::TouchScreen,
                InputEventType::Touchpad,
                #[cfg(test)]
                InputEventType::Fake,
            ];

            // A handler is listed for a type when its `interest()` contains that type;
            // the relative order of `handlers` is kept within each list.
            for event_type in event_types {
                let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
                    .iter()
                    .filter(|h| h.interest().contains(&event_type))
                    .cloned()
                    .collect();
                handlers_by_type.insert(event_type, handlers_for_type);
            }

            while let Some(events) = receiver.next().await {
                if events.is_empty() {
                    continue;
                }

                // Counts the same-type runs found in this batch, so that a batch mixing
                // several event types is reported exactly once (when the second run is seen).
                let mut groups_seen = 0;
                // `chunk_by` groups consecutive events that share an event type.
                let events = events
                    .into_iter()
                    .chunk_by(|e| InputEventType::from(&e.device_event));
                let events = events.into_iter().map(|(k, v)| (k, v.collect::<Vec<_>>()));
                for (event_type, event_group) in events {
                    groups_seen += 1;
                    if groups_seen == 2 {
                        metrics_logger.log_error(
                                InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
                                "it is not recommended to contain multiple types of events in 1 send".to_string(),
                            );
                    }
                    let mut events_in_group = event_group;

                    // Get pre-computed handlers for this event type.
                    // The `unwrap` panics for a type that was not listed in `event_types`.
                    let handlers = handlers_by_type.get(&event_type).unwrap();

                    // Each handler receives the output of the previous one.
                    for handler in handlers {
                        events_in_group =
                            handler.clone().handle_input_events(events_in_group).await;
                    }

                    // Whatever falls out of the last handler is inspected here: unhandled
                    // events are logged, and the trace flow of traced events is ended.
                    for event in events_in_group {
                        if event.handled == input_device::Handled::No {
                            log::warn!("unhandled input event: {:?}", &event);
                        }
                        if let Some(trace_id) = event.trace_id {
                            fuchsia_trace::flow_end!(
                                "input",
                                "event_in_input_pipeline",
                                trace_id.into()
                            );
                        }
                    }
                }
            }
            // Only reached once `receiver` yields `None`.
            for handler in &handlers {
                handler.clone().set_handler_unhealthy("Pipeline loop terminated");
            }
            panic!("Runner task is not supposed to terminate.")
        }).detach();
    }
692}
693
694/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
695///
696/// # Parameters
697/// - `device_types`: The types of devices to watch for.
698/// - `device_proxy`: A proxy to the input device.
699/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
700/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
701/// - `device_id`: The device id of the associated bindings.
702/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
703///
704/// # Note
705/// This will create multiple bindings, in the case where
706/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
707///   with multiple table fields populated, and
708/// * multiple populated table fields correspond to device types present in `device_types`
709///
710/// This is used, for example, to support the Atlas touchpad. In that case, a single
711/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
712/// a `fuchsia.input.report.TouchDescriptor`.
713async fn add_device_bindings(
714    device_types: &Vec<input_device::InputDeviceType>,
715    filename: &String,
716    device_proxy: fidl_next::Client<fidl_next_fuchsia_input_report::InputDevice, Transport>,
717    input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
718    bindings: &InputDeviceBindingHashMap,
719    device_id: u32,
720    input_devices_node: &fuchsia_inspect::Node,
721    devices_connected: Option<&fuchsia_inspect::UintProperty>,
722    feature_flags: InputPipelineFeatureFlags,
723    metrics_logger: metrics::MetricsLogger,
724) {
725    let mut matched_device_types = vec![];
726    if let Ok(res) = device_proxy.get_descriptor().await {
727        for device_type in device_types {
728            if input_device::is_device_type(&res.descriptor, *device_type).await {
729                matched_device_types.push(device_type);
730                match devices_connected {
731                    Some(dev_connected) => {
732                        let _ = dev_connected.add(1);
733                    }
734                    None => (),
735                };
736            }
737        }
738        if matched_device_types.is_empty() {
739            log::info!(
740                "device {} did not match any supported device types: {:?}",
741                filename,
742                device_types
743            );
744            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
745            let mut health = fuchsia_inspect::health::Node::new(&device_node);
746            health.set_unhealthy("Unsupported device type.");
747            device_node.record(health);
748            input_devices_node.record(device_node);
749            return;
750        }
751    } else {
752        metrics_logger.clone().log_error(
753            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
754            std::format!("cannot bind device {} without a device descriptor", filename),
755        );
756        return;
757    }
758
759    log::info!(
760        "binding {} to device types: {}",
761        filename,
762        matched_device_types
763            .iter()
764            .fold(String::new(), |device_types_string, device_type| device_types_string
765                + &format!("{:?}, ", device_type))
766    );
767
768    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
769    for device_type in matched_device_types {
770        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
771        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
772        //
773        // There's no conflict in having multiple bindings read from the same node,
774        // since:
775        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
776        // * the device driver will copy each incoming report to each connected reader.
777        //
778        // This does mean that reports from the Atlas touchpad device get read twice
779        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
780        // is operating in mouse mode or touchpad mode.
781        //
782        // This hasn't been an issue because:
783        // * Semantically: things are fine, because each binding discards irrelevant reports.
784        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
785        // * Performance wise: things are fine, because the data rate of the touchpad is low
786        //   (125 HZ).
787        //
788        // If we add additional cases where bindings share an underlying `input-report` node,
789        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
790        let proxy = device_proxy.clone();
791        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
792        match input_device::get_device_binding(
793            *device_type,
794            proxy,
795            device_id,
796            input_event_sender.clone(),
797            device_node,
798            feature_flags.clone(),
799            metrics_logger.clone(),
800        )
801        .await
802        {
803            Ok(binding) => new_bindings.push(binding),
804            Err(e) => {
805                metrics_logger.log_error(
806                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
807                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
808                );
809            }
810        }
811    }
812
813    if !new_bindings.is_empty() {
814        let mut bindings = bindings.lock().await;
815        bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
816    }
817}
818
819#[cfg(test)]
820mod tests {
821    use super::*;
822    use crate::input_device::{InputDeviceBinding, InputEventType};
823    use crate::utils::Position;
824    use crate::{
825        fake_input_device_binding, mouse_binding, mouse_model_database,
826        observe_fake_events_input_handler,
827    };
828    use async_trait::async_trait;
829    use diagnostics_assertions::AnyProperty;
830    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
831    use fuchsia_async as fasync;
832    use futures::FutureExt;
833    use pretty_assertions::assert_eq;
834    use rand::Rng;
835    use std::collections::HashSet;
836    use vfs::{pseudo_directory, service as pseudo_fs_service};
837
838    const COUNTS_PER_MM: u32 = 12;
839
840    /// Returns the InputEvent sent over `sender`.
841    ///
842    /// # Parameters
843    /// - `sender`: The channel to send the InputEvent over.
844    fn send_input_event(
845        sender: UnboundedSender<Vec<input_device::InputEvent>>,
846    ) -> Vec<input_device::InputEvent> {
847        let mut rng = rand::rng();
848        let offset =
849            Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
850        let input_event = input_device::InputEvent {
851            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
852                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
853                    millimeters: Position {
854                        x: offset.x / COUNTS_PER_MM as f32,
855                        y: offset.y / COUNTS_PER_MM as f32,
856                    },
857                }),
858                None, /* wheel_delta_v */
859                None, /* wheel_delta_h */
860                mouse_binding::MousePhase::Move,
861                HashSet::new(),
862                HashSet::new(),
863                None, /* is_precision_scroll */
864                None, /* wake_lease */
865            )),
866            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
867                mouse_binding::MouseDeviceDescriptor {
868                    device_id: 1,
869                    absolute_x_range: None,
870                    absolute_y_range: None,
871                    wheel_v_range: None,
872                    wheel_h_range: None,
873                    buttons: None,
874                    counts_per_mm: COUNTS_PER_MM,
875                },
876            ),
877            event_time: zx::MonotonicInstant::get(),
878            handled: input_device::Handled::No,
879            trace_id: None,
880        };
881        match sender.unbounded_send(vec![input_event.clone()]) {
882            Err(_) => assert!(false),
883            _ => {}
884        }
885
886        vec![input_event]
887    }
888
889    /// Returns a MouseDescriptor on an InputDeviceRequest.
890    ///
891    /// # Parameters
892    /// - `input_device_request`: The request to handle.
893    fn handle_input_device_request(
894        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
895    ) {
896        match input_device_request {
897            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
898                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
899                    device_information: None,
900                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
901                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
902                            movement_x: None,
903                            movement_y: None,
904                            scroll_v: None,
905                            scroll_h: None,
906                            buttons: Some(vec![0]),
907                            position_x: None,
908                            position_y: None,
909                            ..Default::default()
910                        }),
911                        ..Default::default()
912                    }),
913                    sensor: None,
914                    touch: None,
915                    keyboard: None,
916                    consumer_control: None,
917                    ..Default::default()
918                });
919            }
920            _ => {}
921        }
922    }
923
924    /// Tests that an input pipeline handles events from multiple devices.
925    #[fasync::run_singlethreaded(test)]
926    async fn multiple_devices_single_handler() {
927        // Create two fake device bindings.
928        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
929        let first_device_binding =
930            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
931        let second_device_binding =
932            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
933
934        // Create a fake input handler.
935        let (handler_event_sender, mut handler_event_receiver) =
936            futures::channel::mpsc::channel(100);
937        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
938            handler_event_sender,
939        );
940
941        // Build the input pipeline.
942        let (sender, receiver, handlers, _, _, _) =
943            InputPipelineAssembly::new(metrics::MetricsLogger::default())
944                .add_handler(input_handler)
945                .into_components();
946        let inspector = fuchsia_inspect::Inspector::default();
947        let test_node = inspector.root().create_child("input_pipeline");
948        let input_pipeline = InputPipeline {
949            pipeline_sender: sender,
950            device_event_sender,
951            device_event_receiver,
952            input_device_types: vec![],
953            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
954            inspect_node: test_node,
955            metrics_logger: metrics::MetricsLogger::default(),
956            feature_flags: input_device::InputPipelineFeatureFlags::default(),
957        };
958        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
959
960        // Send an input event from each device.
961        let first_device_events = send_input_event(first_device_binding.input_event_sender());
962        let second_device_events = send_input_event(second_device_binding.input_event_sender());
963
964        // Run the pipeline.
965        fasync::Task::local(async {
966            input_pipeline.handle_input_events().await;
967        })
968        .detach();
969
970        // Assert the handler receives the events.
971        let first_handled_event = handler_event_receiver.next().await;
972        assert_eq!(first_handled_event, first_device_events.into_iter().next());
973
974        let second_handled_event = handler_event_receiver.next().await;
975        assert_eq!(second_handled_event, second_device_events.into_iter().next());
976    }
977
978    /// Tests that an input pipeline handles events through multiple input handlers.
979    #[fasync::run_singlethreaded(test)]
980    async fn single_device_multiple_handlers() {
981        // Create two fake device bindings.
982        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
983        let input_device_binding =
984            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
985
986        // Create two fake input handlers.
987        let (first_handler_event_sender, mut first_handler_event_receiver) =
988            futures::channel::mpsc::channel(100);
989        let first_input_handler =
990            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
991                first_handler_event_sender,
992            );
993        let (second_handler_event_sender, mut second_handler_event_receiver) =
994            futures::channel::mpsc::channel(100);
995        let second_input_handler =
996            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
997                second_handler_event_sender,
998            );
999
1000        // Build the input pipeline.
1001        let (sender, receiver, handlers, _, _, _) =
1002            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1003                .add_handler(first_input_handler)
1004                .add_handler(second_input_handler)
1005                .into_components();
1006        let inspector = fuchsia_inspect::Inspector::default();
1007        let test_node = inspector.root().create_child("input_pipeline");
1008        let input_pipeline = InputPipeline {
1009            pipeline_sender: sender,
1010            device_event_sender,
1011            device_event_receiver,
1012            input_device_types: vec![],
1013            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
1014            inspect_node: test_node,
1015            metrics_logger: metrics::MetricsLogger::default(),
1016            feature_flags: input_device::InputPipelineFeatureFlags::default(),
1017        };
1018        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1019
1020        // Send an input event.
1021        let input_events = send_input_event(input_device_binding.input_event_sender());
1022
1023        // Run the pipeline.
1024        fasync::Task::local(async {
1025            input_pipeline.handle_input_events().await;
1026        })
1027        .detach();
1028
1029        // Assert both handlers receive the event.
1030        let expected_event = input_events.into_iter().next();
1031        let first_handler_event = first_handler_event_receiver.next().await;
1032        assert_eq!(first_handler_event, expected_event);
1033        let second_handler_event = second_handler_event_receiver.next().await;
1034        assert_eq!(second_handler_event, expected_event);
1035    }
1036
    /// Tests that a single mouse device binding is created for the one input device in the
    /// input report directory.
    ///
    /// The test also checks the inspect tree before and after device discovery: it starts
    /// with an empty `input_devices` node and ends with one discovered, connected and
    /// healthy `file_name_Mouse` device.
    #[fasync::run_singlethreaded(test)]
    async fn watch_devices_one_match_exists() {
        // Create a file in a pseudo directory that represents an input device.
        let mut count: i8 = 0;
        let dir = pseudo_directory! {
            "file_name" => pseudo_fs_service::host(
                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
                    async move {
                        // Answer three requests on the stream, then stop serving it.
                        // A `None` from the stream does not count as a handled request.
                        while count < 3 {
                            if let Some(input_device_request) =
                                request_stream.try_next().await.unwrap()
                            {
                                handle_input_device_request(input_device_request);
                                count += 1;
                            }
                        }

                    }.boxed()
                },
            )
        };

        // Create a Watcher on the pseudo directory.
        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
        // proxy to get connections to input devices.
        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);

        // The receiving end is kept alive but never read; this test only inspects bindings.
        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
        let supported_device_types = vec![input_device::InputDeviceType::Mouse];

        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");
        test_node.record_string(
            "supported_input_devices",
            supported_device_types.clone().iter().join(", "),
        );
        let input_devices = test_node.create_child("input_devices");
        // Assert that inspect tree is initialized with no devices.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Mouse",
                input_devices: {}
            }
        });

        // The result of watching is intentionally discarded; the assertions below check
        // its effects on `bindings` and on the inspect tree.
        let _ = InputPipeline::watch_for_devices(
            device_watcher,
            dir_proxy_for_pipeline,
            supported_device_types,
            input_event_sender,
            bindings.clone(),
            &input_devices,
            true, /* break_on_idle */
            InputPipelineFeatureFlags { enable_merge_touch_events: false },
            metrics::MetricsLogger::default(),
        )
        .await;

        // Assert that one mouse device with accurate device id was found.
        // `NEXT_DEVICE_ID` starts at 10, so this test expects the device to be bound
        // under id 10.
        let bindings_hashmap = bindings.lock().await;
        assert_eq!(bindings_hashmap.len(), 1);
        let bindings_vector = bindings_hashmap.get(&10);
        assert!(bindings_vector.is_some());
        assert_eq!(bindings_vector.unwrap().len(), 1);
        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
        assert!(boxed_mouse_binding.is_some());
        // `buttons: Some(vec![0])` mirrors the descriptor served by
        // `handle_input_device_request`.
        assert_eq!(
            boxed_mouse_binding.unwrap().get_device_descriptor(),
            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
                device_id: 10,
                absolute_x_range: None,
                absolute_y_range: None,
                wheel_v_range: None,
                wheel_h_range: None,
                buttons: Some(vec![0]),
                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
            })
        );

        // Assert that inspect tree reflects new device discovered and connected.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Mouse",
                input_devices: {
                    devices_discovered: 1u64,
                    devices_connected: 1u64,
                    "file_name_Mouse": contains {
                        reports_received_count: 0u64,
                        reports_filtered_count: 0u64,
                        events_generated: 0u64,
                        last_received_timestamp_ns: 0u64,
                        last_generated_timestamp_ns: 0u64,
                        "fuchsia.inspect.Health": {
                            status: "OK",
                            // Timestamp value is unpredictable and not relevant in this context,
                            // so we only assert that the property is present.
                            start_timestamp_nanos: AnyProperty
                        },
                    }
                }
            }
        });
    }
1145
    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
    /// but only a mouse exists.
    ///
    /// The test also checks that the rejected device is still counted as discovered and is
    /// recorded in inspect as an unhealthy `file_name_Unsupported` node.
    #[fasync::run_singlethreaded(test)]
    async fn watch_devices_no_matches_exist() {
        // Create a file in a pseudo directory that represents an input device.
        let mut count: i8 = 0;
        let dir = pseudo_directory! {
            "file_name" => pseudo_fs_service::host(
                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
                    async move {
                        // Answer a single request on the stream, then stop serving it.
                        // A `None` from the stream does not count as a handled request.
                        while count < 1 {
                            if let Some(input_device_request) =
                                request_stream.try_next().await.unwrap()
                            {
                                handle_input_device_request(input_device_request);
                                count += 1;
                            }
                        }

                    }.boxed()
                },
            )
        };

        // Create a Watcher on the pseudo directory.
        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
        // proxy to get connections to input devices.
        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);

        // The receiving end is kept alive but never read; this test only inspects bindings.
        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
        // Only keyboards are supported here, while the fake device describes itself as a mouse.
        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];

        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");
        test_node.record_string(
            "supported_input_devices",
            supported_device_types.clone().iter().join(", "),
        );
        let input_devices = test_node.create_child("input_devices");
        // Assert that inspect tree is initialized with no devices.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Keyboard",
                input_devices: {}
            }
        });

        // The result of watching is intentionally discarded; the assertions below check
        // its effects on `bindings` and on the inspect tree.
        let _ = InputPipeline::watch_for_devices(
            device_watcher,
            dir_proxy_for_pipeline,
            supported_device_types,
            input_event_sender,
            bindings.clone(),
            &input_devices,
            true, /* break_on_idle */
            InputPipelineFeatureFlags { enable_merge_touch_events: false },
            metrics::MetricsLogger::default(),
        )
        .await;

        // Assert that no devices were found.
        let bindings = bindings.lock().await;
        assert_eq!(bindings.len(), 0);

        // Assert that inspect tree reflects new device discovered, but not connected.
        diagnostics_assertions::assert_data_tree!(inspector, root: {
            input_pipeline: {
                supported_input_devices: "Keyboard",
                input_devices: {
                    devices_discovered: 1u64,
                    devices_connected: 0u64,
                    "file_name_Unsupported": {
                        "fuchsia.inspect.Health": {
                            status: "UNHEALTHY",
                            message: "Unsupported device type.",
                            // Timestamp value is unpredictable and not relevant in this context,
                            // so we only assert that the property is present.
                            start_timestamp_nanos: AnyProperty
                        },
                    }
                }
            }
        });
    }
1233
    /// Tests that a device binding is created for the input device registered through
    /// InputDeviceRegistry.
    ///
    /// The registered fake device describes itself as a mouse (see
    /// `handle_input_device_request`), and `Mouse` is the only supported device type here.
    #[fasync::run_singlethreaded(test)]
    async fn handle_input_device_registry_request_stream() {
        let (input_device_registry_proxy, input_device_registry_request_stream) =
            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
        let (input_device_client_end, mut input_device_request_stream) =
            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();

        let device_types = vec![input_device::InputDeviceType::Mouse];
        // The receiving end is kept alive but never read; this test only inspects bindings.
        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));

        // Handle input device requests.
        let mut count: i8 = 0;
        fasync::Task::local(async move {
            // Register a device.
            let _ = input_device_registry_proxy.register(input_device_client_end);

            // Answer three requests from the registered device's stream.
            // A `None` from the stream does not count as a handled request.
            while count < 3 {
                if let Some(input_device_request) =
                    input_device_request_stream.try_next().await.unwrap()
                {
                    handle_input_device_request(input_device_request);
                    count += 1;
                }
            }

            // End handle_input_device_registry_request_stream() by taking the event stream.
            input_device_registry_proxy.take_event_stream();
        })
        .detach();

        let inspector = fuchsia_inspect::Inspector::default();
        let test_node = inspector.root().create_child("input_pipeline");

        // Start listening for InputDeviceRegistryRequests.
        let bindings_clone = bindings.clone();
        let _ = InputPipeline::handle_input_device_registry_request_stream(
            input_device_registry_request_stream,
            &device_types,
            &input_event_sender,
            &bindings_clone,
            &test_node,
            InputPipelineFeatureFlags { enable_merge_touch_events: false },
            metrics::MetricsLogger::default(),
        )
        .await;

        // Assert that a device was registered.
        let bindings = bindings.lock().await;
        assert_eq!(bindings.len(), 1);
    }
1287
1288    // Tests that correct properties are added to inspect node when InputPipeline is created.
1289    #[fasync::run_singlethreaded(test)]
1290    async fn check_inspect_node_has_correct_properties() {
1291        let device_types = vec![
1292            input_device::InputDeviceType::Touch,
1293            input_device::InputDeviceType::ConsumerControls,
1294        ];
1295        let inspector = fuchsia_inspect::Inspector::default();
1296        let test_node = inspector.root().create_child("input_pipeline");
1297        // Create fake input handler for assembly
1298        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1299            futures::channel::mpsc::channel(100);
1300        let fake_input_handler =
1301            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1302                fake_handler_event_sender,
1303            );
1304        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1305            .add_handler(fake_input_handler);
1306        let _test_input_pipeline = InputPipeline::new(
1307            &Incoming::new(),
1308            device_types,
1309            assembly,
1310            test_node,
1311            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1312            metrics::MetricsLogger::default(),
1313        );
1314        diagnostics_assertions::assert_data_tree!(inspector, root: {
1315            input_pipeline: {
1316                supported_input_devices: "Touch, ConsumerControls",
1317                handlers_registered: 1u64,
1318                handlers_healthy: 1u64,
1319                input_devices: {}
1320            }
1321        });
1322    }
1323
    /// Test-only handler that advertises interest in a fixed list of event types and
    /// forwards a copy of every event it handles over a channel.
    struct SpecificInterestFakeHandler {
        /// Event types reported by `Handler::interest`.
        interest_types: Vec<input_device::InputEventType>,
        /// Channel that receives a clone of each handled event. Wrapped in a `RefCell`
        /// because sending needs mutable access while the handler is shared via `Rc`.
        event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
    }
1328
1329    impl SpecificInterestFakeHandler {
1330        pub fn new(
1331            interest_types: Vec<input_device::InputEventType>,
1332            event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1333        ) -> Rc<Self> {
1334            Rc::new(SpecificInterestFakeHandler {
1335                interest_types,
1336                event_sender: std::cell::RefCell::new(event_sender),
1337            })
1338        }
1339    }
1340
1341    impl Handler for SpecificInterestFakeHandler {
1342        fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1343        fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1344        fn get_name(&self) -> &'static str {
1345            "SpecificInterestFakeHandler"
1346        }
1347
1348        fn interest(&self) -> Vec<input_device::InputEventType> {
1349            self.interest_types.clone()
1350        }
1351    }
1352
1353    #[async_trait(?Send)]
1354    impl input_handler::InputHandler for SpecificInterestFakeHandler {
1355        async fn handle_input_event(
1356            self: Rc<Self>,
1357            input_event: input_device::InputEvent,
1358        ) -> Vec<input_device::InputEvent> {
1359            match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1360                Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1361                Ok(_) => {}
1362            }
1363            vec![input_event]
1364        }
1365    }
1366
1367    #[fasync::run_singlethreaded(test)]
1368    async fn run_only_sends_events_to_interested_handlers() {
1369        // Mouse Handler (Specific Interest: Mouse)
1370        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1371        let mouse_handler =
1372            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1373
1374        // Fake Handler (Specific Interest: Fake)
1375        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1376        let fake_handler =
1377            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1378
1379        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1380            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1381                .add_handler(mouse_handler)
1382                .add_handler(fake_handler)
1383                .into_components();
1384
1385        // Run the pipeline logic
1386        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1387
1388        // Create a Fake event
1389        let fake_event = input_device::InputEvent {
1390            device_event: input_device::InputDeviceEvent::Fake,
1391            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1392            event_time: zx::MonotonicInstant::get(),
1393            handled: input_device::Handled::No,
1394            trace_id: None,
1395        };
1396
1397        // Send the Fake event
1398        pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1399
1400        // Verify Fake Handler received it
1401        let received_by_fake = fake_receiver.next().await;
1402        assert_eq!(received_by_fake, Some(fake_event));
1403
1404        // Verify Mouse Handler did NOT receive it
1405        assert!(mouse_receiver.try_next().is_err());
1406    }
1407
1408    fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1409        input_device::InputEvent {
1410            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1411                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1412                    millimeters: Position { x, y },
1413                }),
1414                None,
1415                None,
1416                mouse_binding::MousePhase::Move,
1417                HashSet::new(),
1418                HashSet::new(),
1419                None,
1420                None,
1421            )),
1422            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1423                mouse_binding::MouseDeviceDescriptor {
1424                    device_id: 1,
1425                    absolute_x_range: None,
1426                    absolute_y_range: None,
1427                    wheel_v_range: None,
1428                    wheel_h_range: None,
1429                    buttons: None,
1430                    counts_per_mm: 1,
1431                },
1432            ),
1433            event_time: zx::MonotonicInstant::get(),
1434            handled: input_device::Handled::No,
1435            trace_id: None,
1436        }
1437    }
1438
1439    #[fasync::run_singlethreaded(test)]
1440    async fn run_mixed_event_types_dispatched_correctly() {
1441        // Mouse Handler (Specific Interest: Mouse)
1442        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1443        let mouse_handler =
1444            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1445
1446        // Fake Handler (Specific Interest: Fake)
1447        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1448        let fake_handler =
1449            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1450
1451        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1452            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1453                .add_handler(mouse_handler)
1454                .add_handler(fake_handler)
1455                .into_components();
1456
1457        // Run the pipeline logic
1458        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1459
1460        // Create events
1461        let mouse_event_1 = create_mouse_event(1.0, 1.0);
1462        let mouse_event_2 = create_mouse_event(2.0, 2.0);
1463        let mouse_event_3 = create_mouse_event(3.0, 3.0);
1464
1465        let fake_event_1 = input_device::InputEvent {
1466            device_event: input_device::InputDeviceEvent::Fake,
1467            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1468            event_time: zx::MonotonicInstant::get(),
1469            handled: input_device::Handled::No,
1470            trace_id: None,
1471        };
1472
1473        // Send mixed batch: [Mouse, Mouse, Fake, Mouse]
1474        // This should result in 3 chunks: [Mouse, Mouse], [Fake], [Mouse]
1475        let mixed_batch = vec![
1476            mouse_event_1.clone(),
1477            mouse_event_2.clone(),
1478            fake_event_1.clone(),
1479            mouse_event_3.clone(),
1480        ];
1481        pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1482
1483        // Verify Mouse Handler received M1, M2, and then M3
1484        assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1485        assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1486        assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1487
1488        // Verify Fake Handler received F1
1489        assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1490    }
1491}