Skip to main content

input_pipeline/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{Dispatcher, Incoming, Transport, input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use fidl::endpoints;
12use fidl_fuchsia_io as fio;
13use focus_chain_provider::FocusChainProviderPublisher;
14use fuchsia_async as fasync;
15use fuchsia_component::directory::AsRefDirectory;
16use fuchsia_fs::directory::{WatchEvent, Watcher};
17use fuchsia_inspect::NumericProperty;
18use fuchsia_inspect::health::Reporter;
19use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
20use futures::future::LocalBoxFuture;
21use futures::lock::Mutex;
22use futures::{StreamExt, TryStreamExt};
23use itertools::Itertools;
24use metrics_registry::*;
25use sorted_vec_map::SortedVecMap;
26use std::path::PathBuf;
27use std::rc::Rc;
28use std::sync::atomic::{AtomicU32, Ordering};
29use std::sync::{Arc, LazyLock};
30use strum::EnumCount;
31
32/// Use a self incremental u32 unique id for device_id.
33///
34/// device id start from 10 to avoid conflict with default devices in Starnix.
35/// Currently, Starnix using 0 and 1 as default devices' id. Starnix need to
36/// use default devices to deliver events from physical devices until we have
37/// API to expose device changes to UI clients.
38static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| AtomicU32::new(10));
39
40/// Each time this function is invoked, it returns the current value of its
41/// internal counter (serving as a unique id for device_id) and then increments
42/// that counter in preparation for the next call.
43fn get_next_device_id() -> u32 {
44    NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
45}
46
47type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
48
49/// An [`InputDeviceBindingMap`] maps an input device to one or more InputDeviceBindings.
50/// It uses unique device id as key.
51pub type InputDeviceBindingMap = Arc<Mutex<SortedVecMap<u32, Vec<BoxedInputDeviceBinding>>>>;
52
53/// An input pipeline assembly.
54///
55/// Represents a partial stage of the input pipeline which accepts inputs through an asynchronous
56/// sender channel, and emits outputs through an asynchronous receiver channel.  Use [new] to
57/// create a new assembly.  Use [add_handler], or [add_all_handlers] to add the input pipeline
58/// handlers to use.  When done, [InputPipeline::new] can be used to make a new input pipeline.
59///
60/// # Implementation notes
61///
62/// Internally, when a new [InputPipelineAssembly] is created with multiple [InputHandler]s, the
63/// handlers are connected together using async queues.  This allows fully streamed processing of
64/// input events, and also allows some pipeline stages to generate events spontaneously, i.e.
65/// without an external stimulus.
66pub struct InputPipelineAssembly {
67    /// The top-level sender: send into this queue to inject an event into the input
68    /// pipeline.
69    sender: UnboundedSender<Vec<input_device::InputEvent>>,
70    /// The bottom-level receiver: any events that fall through the entire pipeline can
71    /// be read from this receiver.
72    receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
73
74    /// The input handlers that comprise the input pipeline.
75    handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
76
77    /// The display ownership watcher task.
78    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
79
80    /// The focus listener task.
81    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
82
83    /// The metrics logger.
84    metrics_logger: metrics::MetricsLogger,
85}
86
87impl InputPipelineAssembly {
88    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
89    /// to add new handlers to it.
90    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
91        let (sender, receiver) = mpsc::unbounded();
92        InputPipelineAssembly {
93            sender,
94            receiver,
95            handlers: vec![],
96            metrics_logger,
97            display_ownership_fut: None,
98            focus_listener_fut: None,
99        }
100    }
101
102    /// Adds another [input_handler::BatchInputHandler] into the [InputPipelineAssembly]. The handlers
103    /// are invoked in the order they are added. Returns `Self` for chaining.
104    pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
105        self.handlers.push(handler);
106        self
107    }
108
109    /// Adds all handlers into the assembly in the order they appear in `handlers`.
110    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
111        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
112    }
113
114    pub fn add_display_ownership(
115        mut self,
116        display_ownership_event: zx::Event,
117        input_handlers_node: &fuchsia_inspect::Node,
118    ) -> InputPipelineAssembly {
119        let h = DisplayOwnership::new(
120            display_ownership_event,
121            input_handlers_node,
122            self.metrics_logger.clone(),
123        );
124        let metrics_logger_clone = self.metrics_logger.clone();
125        let h_clone = h.clone();
126        let sender_clone = self.sender.clone();
127        let display_ownership_fut = Box::pin(async move {
128            h_clone.clone().set_handler_healthy();
129            h_clone.clone()
130                .handle_ownership_change(sender_clone)
131                .await
132                .map_err(|e| {
133                    metrics_logger_clone.log_error(
134                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
135                        std::format!(
136                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
137                        })
138                        .unwrap();
139            h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
140        });
141        self.display_ownership_fut = Some(display_ownership_fut);
142        self.add_handler(h)
143    }
144
145    /// Deconstructs the assembly into constituent components, used when constructing
146    /// [InputPipeline].
147    ///
148    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
149    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
150    fn into_components(
151        self,
152    ) -> (
153        UnboundedSender<Vec<input_device::InputEvent>>,
154        UnboundedReceiver<Vec<input_device::InputEvent>>,
155        Vec<Rc<dyn input_handler::BatchInputHandler>>,
156        metrics::MetricsLogger,
157        Option<LocalBoxFuture<'static, ()>>,
158        Option<LocalBoxFuture<'static, ()>>,
159    ) {
160        (
161            self.sender,
162            self.receiver,
163            self.handlers,
164            self.metrics_logger,
165            self.display_ownership_fut,
166            self.focus_listener_fut,
167        )
168    }
169
170    pub fn add_focus_listener(
171        mut self,
172        incoming: &Incoming,
173        focus_chain_publisher: FocusChainProviderPublisher,
174    ) -> Self {
175        let metrics_logger_clone = self.metrics_logger.clone();
176        let incoming2 = incoming.clone();
177        let focus_listener_fut = Box::pin(async move {
178            if let Ok(mut focus_listener) = FocusListener::new(
179                &incoming2,
180                focus_chain_publisher,
181                metrics_logger_clone,
182            )
183            .map_err(|e| {
184                log::warn!("could not create focus listener, focus will not be dispatched: {:?}", e)
185            }) {
186                // This will await indefinitely and process focus messages in a loop, unless there
187                // is a problem.
188                let _result = focus_listener
189                    .dispatch_focus_changes()
190                    .await
191                    .map(|_| {
192                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
193                    })
194                    .map_err(|e| {
195                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
196                    });
197            }
198        });
199        self.focus_listener_fut = Some(focus_listener_fut);
200        self
201    }
202}
203
204/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
205///
206/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
207/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
208///
209/// # Example
210/// ```
211/// let ime_handler =
212///     ImeHandler::new(scene_manager.session.clone(), scene_manager.compositor_id).await?;
213/// let touch_handler = TouchHandler::new(
214///     scene_manager.session.clone(),
215///     scene_manager.compositor_id,
216///     scene_manager.display_size
217/// ).await?;
218///
219/// let assembly = InputPipelineAssembly::new()
220///     .add_handler(Box::new(ime_handler)),
221///     .add_handler(Box::new(touch_handler)),
222/// let input_pipeline = InputPipeline::new(
223///     vec![
224///         input_device::InputDeviceType::Touch,
225///         input_device::InputDeviceType::Keyboard,
226///     ],
227///     assembly,
228/// );
229/// input_pipeline.handle_input_events().await;
230/// ```
231pub struct InputPipeline {
232    /// The entry point into the input handler pipeline. Incoming input events should
233    /// be inserted into this async queue, and the input pipeline will ensure that they
234    /// are propagated through all the input handlers in the appropriate sequence.
235    pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
236
237    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
238    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
239    device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
240
241    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
242    device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
243
244    /// The types of devices this pipeline supports.
245    input_device_types: Vec<input_device::InputDeviceType>,
246
247    /// The InputDeviceBindings bound to this pipeline.
248    input_device_bindings: InputDeviceBindingMap,
249
250    /// This node is bound to the lifetime of this InputPipeline.
251    /// Inspect data will be dumped for this pipeline as long as it exists.
252    inspect_node: fuchsia_inspect::Node,
253
254    /// The metrics logger.
255    metrics_logger: metrics::MetricsLogger,
256
257    /// The feature flags for the input pipeline.
258    pub feature_flags: input_device::InputPipelineFeatureFlags,
259}
260
261impl InputPipeline {
262    fn new_common(
263        input_device_types: Vec<input_device::InputDeviceType>,
264        assembly: InputPipelineAssembly,
265        inspect_node: fuchsia_inspect::Node,
266        feature_flags: input_device::InputPipelineFeatureFlags,
267    ) -> Self {
268        let (
269            pipeline_sender,
270            receiver,
271            handlers,
272            metrics_logger,
273            display_ownership_fut,
274            focus_listener_fut,
275        ) = assembly.into_components();
276
277        let mut handlers_count = handlers.len();
278        // TODO: b/469745447 - should use futures::select! instead of detach().
279        if let Some(fut) = display_ownership_fut {
280            // The displayer ownership handler, like all input handlers, runs on [`crate::Dispatcher`]
281            // which is driver dispatcher in dso mode. The display ownership future must run on
282            // the same dispatcher because the types do not support multithreaded access.
283            Dispatcher::spawn_local(fut).detach();
284            handlers_count += 1;
285        }
286
287        // TODO: b/469745447 - should use futures::select! instead of detach().
288        if let Some(fut) = focus_listener_fut {
289            fasync::Task::local(fut).detach();
290            handlers_count += 1;
291        }
292
293        // Add properties to inspect node
294        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
295        inspect_node.record_uint("handlers_registered", handlers_count as u64);
296        inspect_node.record_uint("handlers_healthy", handlers_count as u64);
297
298        // Initializes all handlers and starts the input pipeline loop.
299        InputPipeline::run(receiver, handlers, metrics_logger.clone());
300
301        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
302        let input_device_bindings: InputDeviceBindingMap =
303            Arc::new(Mutex::new(SortedVecMap::new()));
304        InputPipeline {
305            pipeline_sender,
306            device_event_sender,
307            device_event_receiver,
308            input_device_types,
309            input_device_bindings,
310            inspect_node,
311            metrics_logger,
312            feature_flags,
313        }
314    }
315
316    /// Creates a new [`InputPipeline`] for integration testing.
317    /// Unlike a production input pipeline, this pipeline will not monitor
318    /// `/dev/class/input-report` for devices.
319    ///
320    /// # Parameters
321    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
322    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
323    pub fn new_for_test(
324        input_device_types: Vec<input_device::InputDeviceType>,
325        assembly: InputPipelineAssembly,
326    ) -> Self {
327        let inspector = fuchsia_inspect::Inspector::default();
328        let root = inspector.root();
329        let test_node = root.create_child("input_pipeline");
330        Self::new_common(
331            input_device_types,
332            assembly,
333            test_node,
334            input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
335        )
336    }
337
338    /// Creates a new [`InputPipeline`] for production use.
339    ///
340    /// # Parameters
341    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
342    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
343    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
344    pub fn new(
345        incoming: &Incoming,
346        input_device_types: Vec<input_device::InputDeviceType>,
347        assembly: InputPipelineAssembly,
348        inspect_node: fuchsia_inspect::Node,
349        feature_flags: input_device::InputPipelineFeatureFlags,
350        metrics_logger: metrics::MetricsLogger,
351    ) -> Result<Self, Error> {
352        let input_pipeline =
353            Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
354        let input_device_types = input_pipeline.input_device_types.clone();
355        let input_event_sender = input_pipeline.device_event_sender.clone();
356        let input_device_bindings = input_pipeline.input_device_bindings.clone();
357        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
358        let feature_flags = input_pipeline.feature_flags.clone();
359        let incoming = incoming.clone();
360        // This intentionally uses the [`fuchsia_async`] task dispatcher instead of
361        // [`crate::Dispatcher`] -- the directory watcher always uses the fuchsia-async dispatcher.
362        // This is fine for performance because the actual event dispatch is still configured to
363        // run on [`crate::Dispatcher`].
364        fasync::Task::local(async move {
365            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
366            // that send InputEvents to `input_event_receiver`.
367            match async {
368                let (dir_proxy, server) = endpoints::create_proxy::<fio::DirectoryMarker>();
369                incoming.as_ref_directory().open(
370                    input_device::INPUT_REPORT_PATH,
371                    fio::PERM_READABLE,
372                    server.into()
373                )
374                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
375                let device_watcher =
376                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
377                Self::watch_for_devices(
378                    device_watcher,
379                    dir_proxy,
380                    input_device_types,
381                    input_event_sender,
382                    input_device_bindings,
383                    &devices_node,
384                    false, /* break_on_idle */
385                    feature_flags,
386                    metrics_logger.clone(),
387                )
388                .await
389                .context("failed to watch for devices")
390            }
391            .await
392            {
393                Ok(()) => {}
394                Err(err) => {
395                    // This error is usually benign in tests: it means that the setup does not
396                    // support dynamic device discovery. Almost no tests support dynamic
397                    // device discovery, and they also do not need those.
398                    metrics_logger.log_warn(
399                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
400                        std::format!(
401                            "Input pipeline is unable to watch for new input devices: {:?}",
402                            err
403                        ));
404                }
405            }
406        }).detach();
407
408        Ok(input_pipeline)
409    }
410
411    /// Gets the input device bindings.
412    pub fn input_device_bindings(&self) -> &InputDeviceBindingMap {
413        &self.input_device_bindings
414    }
415
416    /// Gets the input device sender: this is the channel that should be cloned
417    /// and used for injecting events from the drivers into the input pipeline.
418    pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
419        &self.device_event_sender
420    }
421
422    /// Gets a list of input device types supported by this input pipeline.
423    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
424        &self.input_device_types
425    }
426
427    /// Forwards all input events into the input pipeline.
428    pub async fn handle_input_events(mut self) {
429        let metrics_logger_clone = self.metrics_logger.clone();
430        while let Some(input_event) = self.device_event_receiver.next().await {
431            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
432                metrics_logger_clone.log_error(
433                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
434                    std::format!("could not forward event from driver: {:?}", e));
435            }
436        }
437
438        metrics_logger_clone.log_error(
439            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
440            "Input pipeline stopped handling input events.".to_string(),
441        );
442    }
443
444    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
445    /// if new devices match a type in `device_types`.
446    ///
447    /// # Parameters
448    /// - `device_watcher`: Watches the input report directory for new devices.
449    /// - `dir_proxy`: The directory containing InputDevice connections.
450    /// - `device_types`: The types of devices to watch for.
451    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
452    /// - `bindings`: Holds all the InputDeviceBindings
453    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
454    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
455    /// - `metrics_logger`: The metrics logger.
456    ///
457    /// # Errors
458    /// If the input report directory or a file within it cannot be read.
459    async fn watch_for_devices(
460        mut device_watcher: Watcher,
461        dir_proxy: fio::DirectoryProxy,
462        device_types: Vec<input_device::InputDeviceType>,
463        input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
464        bindings: InputDeviceBindingMap,
465        input_devices_node: &fuchsia_inspect::Node,
466        break_on_idle: bool,
467        feature_flags: input_device::InputPipelineFeatureFlags,
468        metrics_logger: metrics::MetricsLogger,
469    ) -> Result<(), Error> {
470        // Add non-static properties to inspect node.
471        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
472        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
473        while let Some(msg) = device_watcher.try_next().await? {
474            if let Ok(filename) = msg.filename.into_os_string().into_string() {
475                if filename == "." {
476                    continue;
477                }
478
479                let pathbuf = PathBuf::from(filename.clone());
480                match msg.event {
481                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
482                        log::info!("found input device {}", filename);
483                        devices_discovered.add(1);
484                        let device_proxy =
485                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
486                        add_device_bindings(
487                            &device_types,
488                            &filename,
489                            device_proxy,
490                            &input_event_sender,
491                            &bindings,
492                            get_next_device_id(),
493                            input_devices_node,
494                            Some(&devices_connected),
495                            feature_flags.clone(),
496                            metrics_logger.clone(),
497                        )
498                        .await;
499                    }
500                    WatchEvent::IDLE => {
501                        if break_on_idle {
502                            break;
503                        }
504                    }
505                    _ => (),
506                }
507            }
508        }
509        // Ensure inspect properties persist for debugging if device watch loop ends.
510        input_devices_node.record(devices_discovered);
511        input_devices_node.record(devices_connected);
512        Err(format_err!("Input pipeline stopped watching for new input devices."))
513    }
514
515    /// Handles the incoming InputDeviceRegistryRequestStream.
516    ///
517    /// This method will end when the request stream is closed. If the stream closes with an
518    /// error the error will be returned in the Result.
519    ///
520    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
521    ///
522    /// # Parameters
523    /// - `stream`: The stream of InputDeviceRegistryRequests.
524    /// - `device_types`: The types of devices to watch for.
525    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
526    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
527    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
528    /// - `metrics_logger`: The metrics logger.
529    pub async fn handle_input_device_registry_request_stream(
530        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
531        device_types: &Vec<input_device::InputDeviceType>,
532        input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
533        bindings: &InputDeviceBindingMap,
534        input_devices_node: &fuchsia_inspect::Node,
535        feature_flags: input_device::InputPipelineFeatureFlags,
536        metrics_logger: metrics::MetricsLogger,
537    ) -> Result<(), Error> {
538        while let Some(request) = stream
539            .try_next()
540            .await
541            .context("Error handling input device registry request stream")?
542        {
543            match request {
544                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
545                    device,
546                    ..
547                } => {
548                    // Add a binding if the device is a type being tracked
549                    let device = fidl_next::ClientEnd::<
550                        fidl_next_fuchsia_input_report::InputDevice,
551                        zx::Channel,
552                    >::from_untyped(device.into_channel());
553                    let device = Dispatcher::client_from_zx_channel(device);
554                    let device = device.spawn();
555                    let device_id = get_next_device_id();
556
557                    add_device_bindings(
558                        device_types,
559                        &format!("input-device-registry-{}", device_id),
560                        device,
561                        input_event_sender,
562                        bindings,
563                        device_id,
564                        input_devices_node,
565                        None,
566                        feature_flags.clone(),
567                        metrics_logger.clone(),
568                    )
569                    .await;
570                }
571                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
572                    device,
573                    responder,
574                    .. } => {
575                    // Add a binding if the device is a type being tracked
576                    let device = fidl_next::ClientEnd::<
577                        fidl_next_fuchsia_input_report::InputDevice,
578                        zx::Channel,
579                    >::from_untyped(device.into_channel());
580                    let device = Dispatcher::client_from_zx_channel(device);
581                    let device = device.spawn();
582                    let device_id = get_next_device_id();
583
584                    add_device_bindings(
585                        device_types,
586                        &format!("input-device-registry-{}", device_id),
587                        device,
588                        input_event_sender,
589                        bindings,
590                        device_id,
591                        input_devices_node,
592                        None,
593                        feature_flags.clone(),
594                        metrics_logger.clone(),
595                    )
596                    .await;
597
598                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
599                        device_id: Some(device_id),
600                        ..Default::default()
601                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
602                }
603            }
604        }
605
606        Ok(())
607    }
608
609    /// Initializes all handlers and starts the input pipeline loop in an asynchronous executor.
610    fn run(
611        mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
612        handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
613        metrics_logger: metrics::MetricsLogger,
614    ) {
615        Dispatcher::spawn_local(async move {
616            for handler in &handlers {
617                handler.clone().set_handler_healthy();
618            }
619
620            use input_device::InputEventType;
621
622            let mut handlers_by_type: [Vec<Rc<dyn input_handler::BatchInputHandler>>; InputEventType::COUNT] = Default::default();
623
624            // TODO: b/478262850 - We can use supported_input_devices to populate this list.
625            let event_types = vec![
626                InputEventType::Keyboard,
627                InputEventType::LightSensor,
628                InputEventType::ConsumerControls,
629                InputEventType::Mouse,
630                InputEventType::TouchScreen,
631                InputEventType::Touchpad,
632                #[cfg(test)]
633                InputEventType::Fake,
634            ];
635
636            for event_type in event_types {
637                let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
638                    .iter()
639                    .filter(|h| h.interest().contains(&event_type))
640                    .cloned()
641                    .collect();
642                handlers_by_type[event_type as usize] = handlers_for_type;
643            }
644
645            while let Some(events) = receiver.next().await {
646                if events.is_empty() {
647                    continue;
648                }
649
650                let mut groups_seen = 0;
651                let events = events
652                    .into_iter()
653                    .chunk_by(|e| InputEventType::from(&e.device_event));
654                let events = events.into_iter().map(|(k, v)| (k, v.collect::<Vec<_>>()));
655                for (event_type, event_group) in events {
656                    groups_seen += 1;
657                    if groups_seen == 2 {
658                        metrics_logger.log_error(
659                                InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
660                                "it is not recommended to contain multiple types of events in 1 send".to_string(),
661                            );
662                    }
663                    let mut events_in_group = event_group;
664
665                    // Get pre-computed handlers for this event type.
666                    let handlers = &handlers_by_type[event_type as usize];
667
668                    for handler in handlers {
669                        events_in_group =
670                            handler.clone().handle_input_events(events_in_group).await;
671                    }
672
673                    for event in events_in_group {
674                        if event.handled == input_device::Handled::No {
675                            log::warn!("unhandled input event: {:?}", event);
676                        }
677                        if let Some(trace_id) = event.trace_id {
678                            fuchsia_trace::flow_end!(
679                                "input",
680                                "event_in_input_pipeline",
681                                trace_id.into()
682                            );
683                        }
684                    }
685                }
686            }
687            for handler in &handlers {
688                handler.clone().set_handler_unhealthy("Pipeline loop terminated");
689            }
690            panic!("Runner task is not supposed to terminate.")
691        }).detach();
692    }
693}
694
695/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
696///
697/// # Parameters
698/// - `device_types`: The types of devices to watch for.
699/// - `device_proxy`: A proxy to the input device.
700/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
701/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
702/// - `device_id`: The device id of the associated bindings.
703/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
704///
705/// # Note
706/// This will create multiple bindings, in the case where
707/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
708///   with multiple table fields populated, and
709/// * multiple populated table fields correspond to device types present in `device_types`
710///
711/// This is used, for example, to support the Atlas touchpad. In that case, a single
712/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
713/// a `fuchsia.input.report.TouchDescriptor`.
714async fn add_device_bindings(
715    device_types: &Vec<input_device::InputDeviceType>,
716    filename: &String,
717    device_proxy: fidl_next::Client<fidl_next_fuchsia_input_report::InputDevice, Transport>,
718    input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
719    bindings: &InputDeviceBindingMap,
720    device_id: u32,
721    input_devices_node: &fuchsia_inspect::Node,
722    devices_connected: Option<&fuchsia_inspect::UintProperty>,
723    feature_flags: InputPipelineFeatureFlags,
724    metrics_logger: metrics::MetricsLogger,
725) {
726    let mut matched_device_types = vec![];
727    if let Ok(res) = device_proxy.get_descriptor().await {
728        for device_type in device_types {
729            if input_device::is_device_type(&res.descriptor, *device_type).await {
730                matched_device_types.push(device_type);
731                match devices_connected {
732                    Some(dev_connected) => {
733                        let _ = dev_connected.add(1);
734                    }
735                    None => (),
736                };
737            }
738        }
739        if matched_device_types.is_empty() {
740            log::info!(
741                "device {} did not match any supported device types: {:?}",
742                filename,
743                device_types
744            );
745            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
746            let mut health = fuchsia_inspect::health::Node::new(&device_node);
747            health.set_unhealthy("Unsupported device type.");
748            device_node.record(health);
749            input_devices_node.record(device_node);
750            return;
751        }
752    } else {
753        metrics_logger.clone().log_error(
754            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
755            std::format!("cannot bind device {} without a device descriptor", filename),
756        );
757        return;
758    }
759
760    log::info!(
761        "binding {} to device types: {}",
762        filename,
763        matched_device_types
764            .iter()
765            .fold(String::new(), |device_types_string, device_type| device_types_string
766                + &format!("{:?}, ", device_type))
767    );
768
769    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
770    for device_type in matched_device_types {
771        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
772        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
773        //
774        // There's no conflict in having multiple bindings read from the same node,
775        // since:
776        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
777        // * the device driver will copy each incoming report to each connected reader.
778        //
779        // This does mean that reports from the Atlas touchpad device get read twice
780        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
781        // is operating in mouse mode or touchpad mode.
782        //
783        // This hasn't been an issue because:
784        // * Semantically: things are fine, because each binding discards irrelevant reports.
785        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
786        // * Performance wise: things are fine, because the data rate of the touchpad is low
787        //   (125 HZ).
788        //
789        // If we add additional cases where bindings share an underlying `input-report` node,
790        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
791        let proxy = device_proxy.clone();
792        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
793        match input_device::get_device_binding(
794            *device_type,
795            proxy,
796            device_id,
797            input_event_sender.clone(),
798            device_node,
799            feature_flags.clone(),
800            metrics_logger.clone(),
801        )
802        .await
803        {
804            Ok(binding) => new_bindings.push(binding),
805            Err(e) => {
806                metrics_logger.log_error(
807                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
808                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
809                );
810            }
811        }
812    }
813
814    if !new_bindings.is_empty() {
815        let mut bindings = bindings.lock().await;
816        if let Some(v) = bindings.get_mut(&device_id) {
817            v.extend(new_bindings);
818        } else {
819            bindings.insert(device_id, new_bindings);
820        }
821    }
822}
823
824#[cfg(test)]
825mod tests {
826    use super::*;
827    use crate::input_device::{InputDeviceBinding, InputEventType};
828    use crate::utils::Position;
829    use crate::{fake_input_device_binding, mouse_binding, observe_fake_events_input_handler};
830    use async_trait::async_trait;
831    use diagnostics_assertions::AnyProperty;
832    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
833    use fuchsia_async as fasync;
834    use futures::FutureExt;
835    use pretty_assertions::assert_eq;
836    use rand::Rng;
837    use sorted_vec_map::SortedVecSet;
838    use vfs::{pseudo_directory, service as pseudo_fs_service};
839
840    const COUNTS_PER_MM: u32 = 12;
841
842    /// Returns the InputEvent sent over `sender`.
843    ///
844    /// # Parameters
845    /// - `sender`: The channel to send the InputEvent over.
846    fn send_input_event(
847        sender: UnboundedSender<Vec<input_device::InputEvent>>,
848    ) -> Vec<input_device::InputEvent> {
849        let mut rng = rand::rng();
850        let offset =
851            Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
852        let input_event = input_device::InputEvent {
853            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
854                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
855                    millimeters: Position {
856                        x: offset.x / COUNTS_PER_MM as f32,
857                        y: offset.y / COUNTS_PER_MM as f32,
858                    },
859                }),
860                None, /* wheel_delta_v */
861                None, /* wheel_delta_h */
862                mouse_binding::MousePhase::Move,
863                SortedVecSet::new(),
864                SortedVecSet::new(),
865                None, /* is_precision_scroll */
866                None, /* wake_lease */
867            )),
868            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
869                mouse_binding::MouseDeviceDescriptor {
870                    device_id: 1,
871                    absolute_x_range: None,
872                    absolute_y_range: None,
873                    wheel_v_range: None,
874                    wheel_h_range: None,
875                    buttons: None,
876                    counts_per_mm: COUNTS_PER_MM,
877                },
878            ),
879            event_time: zx::MonotonicInstant::get(),
880            handled: input_device::Handled::No,
881            trace_id: None,
882        };
883        match sender.unbounded_send(vec![input_event.clone()]) {
884            Err(_) => assert!(false),
885            _ => {}
886        }
887
888        vec![input_event]
889    }
890
891    /// Returns a MouseDescriptor on an InputDeviceRequest.
892    ///
893    /// # Parameters
894    /// - `input_device_request`: The request to handle.
895    fn handle_input_device_request(
896        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
897    ) {
898        match input_device_request {
899            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
900                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
901                    device_information: None,
902                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
903                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
904                            movement_x: None,
905                            movement_y: None,
906                            scroll_v: None,
907                            scroll_h: None,
908                            buttons: Some(vec![0]),
909                            position_x: None,
910                            position_y: None,
911                            ..Default::default()
912                        }),
913                        ..Default::default()
914                    }),
915                    sensor: None,
916                    touch: None,
917                    keyboard: None,
918                    consumer_control: None,
919                    ..Default::default()
920                });
921            }
922            _ => {}
923        }
924    }
925
926    /// Tests that an input pipeline handles events from multiple devices.
927    #[fasync::run_singlethreaded(test)]
928    async fn multiple_devices_single_handler() {
929        // Create two fake device bindings.
930        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
931        let first_device_binding =
932            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
933        let second_device_binding =
934            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
935
936        // Create a fake input handler.
937        let (handler_event_sender, mut handler_event_receiver) =
938            futures::channel::mpsc::channel(100);
939        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
940            handler_event_sender,
941        );
942
943        // Build the input pipeline.
944        let (sender, receiver, handlers, _, _, _) =
945            InputPipelineAssembly::new(metrics::MetricsLogger::default())
946                .add_handler(input_handler)
947                .into_components();
948        let inspector = fuchsia_inspect::Inspector::default();
949        let test_node = inspector.root().create_child("input_pipeline");
950        let input_pipeline = InputPipeline {
951            pipeline_sender: sender,
952            device_event_sender,
953            device_event_receiver,
954            input_device_types: vec![],
955            input_device_bindings: Arc::new(Mutex::new(SortedVecMap::new())),
956            inspect_node: test_node,
957            metrics_logger: metrics::MetricsLogger::default(),
958            feature_flags: input_device::InputPipelineFeatureFlags::default(),
959        };
960        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
961
962        // Send an input event from each device.
963        let first_device_events = send_input_event(first_device_binding.input_event_sender());
964        let second_device_events = send_input_event(second_device_binding.input_event_sender());
965
966        // Run the pipeline.
967        fasync::Task::local(async {
968            input_pipeline.handle_input_events().await;
969        })
970        .detach();
971
972        // Assert the handler receives the events.
973        let first_handled_event = handler_event_receiver.next().await;
974        assert_eq!(first_handled_event, first_device_events.into_iter().next());
975
976        let second_handled_event = handler_event_receiver.next().await;
977        assert_eq!(second_handled_event, second_device_events.into_iter().next());
978    }
979
980    /// Tests that an input pipeline handles events through multiple input handlers.
981    #[fasync::run_singlethreaded(test)]
982    async fn single_device_multiple_handlers() {
983        // Create two fake device bindings.
984        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
985        let input_device_binding =
986            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
987
988        // Create two fake input handlers.
989        let (first_handler_event_sender, mut first_handler_event_receiver) =
990            futures::channel::mpsc::channel(100);
991        let first_input_handler =
992            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
993                first_handler_event_sender,
994            );
995        let (second_handler_event_sender, mut second_handler_event_receiver) =
996            futures::channel::mpsc::channel(100);
997        let second_input_handler =
998            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
999                second_handler_event_sender,
1000            );
1001
1002        // Build the input pipeline.
1003        let (sender, receiver, handlers, _, _, _) =
1004            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1005                .add_handler(first_input_handler)
1006                .add_handler(second_input_handler)
1007                .into_components();
1008        let inspector = fuchsia_inspect::Inspector::default();
1009        let test_node = inspector.root().create_child("input_pipeline");
1010        let input_pipeline = InputPipeline {
1011            pipeline_sender: sender,
1012            device_event_sender,
1013            device_event_receiver,
1014            input_device_types: vec![],
1015            input_device_bindings: Arc::new(Mutex::new(SortedVecMap::new())),
1016            inspect_node: test_node,
1017            metrics_logger: metrics::MetricsLogger::default(),
1018            feature_flags: input_device::InputPipelineFeatureFlags::default(),
1019        };
1020        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1021
1022        // Send an input event.
1023        let input_events = send_input_event(input_device_binding.input_event_sender());
1024
1025        // Run the pipeline.
1026        fasync::Task::local(async {
1027            input_pipeline.handle_input_events().await;
1028        })
1029        .detach();
1030
1031        // Assert both handlers receive the event.
1032        let expected_event = input_events.into_iter().next();
1033        let first_handler_event = first_handler_event_receiver.next().await;
1034        assert_eq!(first_handler_event, expected_event);
1035        let second_handler_event = second_handler_event_receiver.next().await;
1036        assert_eq!(second_handler_event, expected_event);
1037    }
1038
1039    /// Tests that a single mouse device binding is created for the one input device in the
1040    /// input report directory.
1041    #[fasync::run_singlethreaded(test)]
1042    async fn watch_devices_one_match_exists() {
1043        // Create a file in a pseudo directory that represents an input device.
1044        let mut count: i8 = 0;
1045        let dir = pseudo_directory! {
1046            "file_name" => pseudo_fs_service::host(
1047                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1048                    async move {
1049                        while count < 3 {
1050                            if let Some(input_device_request) =
1051                                request_stream.try_next().await.unwrap()
1052                            {
1053                                handle_input_device_request(input_device_request);
1054                                count += 1;
1055                            }
1056                        }
1057
1058                    }.boxed()
1059                },
1060            )
1061        };
1062
1063        // Create a Watcher on the pseudo directory.
1064        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1065        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1066        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1067        // proxy to get connections to input devices.
1068        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1069
1070        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1071        let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1072        let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1073
1074        let inspector = fuchsia_inspect::Inspector::default();
1075        let test_node = inspector.root().create_child("input_pipeline");
1076        test_node.record_string(
1077            "supported_input_devices",
1078            supported_device_types.clone().iter().join(", "),
1079        );
1080        let input_devices = test_node.create_child("input_devices");
1081        // Assert that inspect tree is initialized with no devices.
1082        diagnostics_assertions::assert_data_tree!(inspector, root: {
1083            input_pipeline: {
1084                supported_input_devices: "Mouse",
1085                input_devices: {}
1086            }
1087        });
1088
1089        let _ = InputPipeline::watch_for_devices(
1090            device_watcher,
1091            dir_proxy_for_pipeline,
1092            supported_device_types,
1093            input_event_sender,
1094            bindings.clone(),
1095            &input_devices,
1096            true, /* break_on_idle */
1097            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1098            metrics::MetricsLogger::default(),
1099        )
1100        .await;
1101
1102        // Assert that one mouse device with accurate device id was found.
1103        let bindings_map = bindings.lock().await;
1104        assert_eq!(bindings_map.len(), 1);
1105        let bindings_vector = bindings_map.get(&10);
1106        assert!(bindings_vector.is_some());
1107        assert_eq!(bindings_vector.unwrap().len(), 1);
1108        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1109        assert!(boxed_mouse_binding.is_some());
1110        assert_eq!(
1111            boxed_mouse_binding.unwrap().get_device_descriptor(),
1112            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1113                device_id: 10,
1114                absolute_x_range: None,
1115                absolute_y_range: None,
1116                wheel_v_range: None,
1117                wheel_h_range: None,
1118                buttons: Some(vec![0]),
1119                counts_per_mm: mouse_binding::DEFAULT_COUNTS_PER_MM,
1120            })
1121        );
1122
1123        // Assert that inspect tree reflects new device discovered and connected.
1124        diagnostics_assertions::assert_data_tree!(inspector, root: {
1125            input_pipeline: {
1126                supported_input_devices: "Mouse",
1127                input_devices: {
1128                    devices_discovered: 1u64,
1129                    devices_connected: 1u64,
1130                    "file_name_Mouse": contains {
1131                        reports_received_count: 0u64,
1132                        reports_filtered_count: 0u64,
1133                        events_generated: 0u64,
1134                        last_received_timestamp_ns: 0u64,
1135                        last_generated_timestamp_ns: 0u64,
1136                        "fuchsia.inspect.Health": {
1137                            status: "OK",
1138                            // Timestamp value is unpredictable and not relevant in this context,
1139                            // so we only assert that the property is present.
1140                            start_timestamp_nanos: AnyProperty
1141                        },
1142                    }
1143                }
1144            }
1145        });
1146    }
1147
1148    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
1149    /// but only a mouse exists.
1150    #[fasync::run_singlethreaded(test)]
1151    async fn watch_devices_no_matches_exist() {
1152        // Create a file in a pseudo directory that represents an input device.
1153        let mut count: i8 = 0;
1154        let dir = pseudo_directory! {
1155            "file_name" => pseudo_fs_service::host(
1156                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1157                    async move {
1158                        while count < 1 {
1159                            if let Some(input_device_request) =
1160                                request_stream.try_next().await.unwrap()
1161                            {
1162                                handle_input_device_request(input_device_request);
1163                                count += 1;
1164                            }
1165                        }
1166
1167                    }.boxed()
1168                },
1169            )
1170        };
1171
1172        // Create a Watcher on the pseudo directory.
1173        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1174        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1175        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1176        // proxy to get connections to input devices.
1177        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1178
1179        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1180        let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1181        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1182
1183        let inspector = fuchsia_inspect::Inspector::default();
1184        let test_node = inspector.root().create_child("input_pipeline");
1185        test_node.record_string(
1186            "supported_input_devices",
1187            supported_device_types.clone().iter().join(", "),
1188        );
1189        let input_devices = test_node.create_child("input_devices");
1190        // Assert that inspect tree is initialized with no devices.
1191        diagnostics_assertions::assert_data_tree!(inspector, root: {
1192            input_pipeline: {
1193                supported_input_devices: "Keyboard",
1194                input_devices: {}
1195            }
1196        });
1197
1198        let _ = InputPipeline::watch_for_devices(
1199            device_watcher,
1200            dir_proxy_for_pipeline,
1201            supported_device_types,
1202            input_event_sender,
1203            bindings.clone(),
1204            &input_devices,
1205            true, /* break_on_idle */
1206            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1207            metrics::MetricsLogger::default(),
1208        )
1209        .await;
1210
1211        // Assert that no devices were found.
1212        let bindings = bindings.lock().await;
1213        assert_eq!(bindings.len(), 0);
1214
1215        // Assert that inspect tree reflects new device discovered, but not connected.
1216        diagnostics_assertions::assert_data_tree!(inspector, root: {
1217            input_pipeline: {
1218                supported_input_devices: "Keyboard",
1219                input_devices: {
1220                    devices_discovered: 1u64,
1221                    devices_connected: 0u64,
1222                    "file_name_Unsupported": {
1223                        "fuchsia.inspect.Health": {
1224                            status: "UNHEALTHY",
1225                            message: "Unsupported device type.",
1226                            // Timestamp value is unpredictable and not relevant in this context,
1227                            // so we only assert that the property is present.
1228                            start_timestamp_nanos: AnyProperty
1229                        },
1230                    }
1231                }
1232            }
1233        });
1234    }
1235
1236    /// Tests that a single keyboard device binding is created for the input device registered
1237    /// through InputDeviceRegistry.
1238    #[fasync::run_singlethreaded(test)]
1239    async fn handle_input_device_registry_request_stream() {
1240        let (input_device_registry_proxy, input_device_registry_request_stream) =
1241            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1242        let (input_device_client_end, mut input_device_request_stream) =
1243            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1244
1245        let device_types = vec![input_device::InputDeviceType::Mouse];
1246        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1247        let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1248
1249        // Handle input device requests.
1250        let mut count: i8 = 0;
1251        fasync::Task::local(async move {
1252            // Register a device.
1253            let _ = input_device_registry_proxy.register(input_device_client_end);
1254
1255            while count < 3 {
1256                if let Some(input_device_request) =
1257                    input_device_request_stream.try_next().await.unwrap()
1258                {
1259                    handle_input_device_request(input_device_request);
1260                    count += 1;
1261                }
1262            }
1263
1264            // End handle_input_device_registry_request_stream() by taking the event stream.
1265            input_device_registry_proxy.take_event_stream();
1266        })
1267        .detach();
1268
1269        let inspector = fuchsia_inspect::Inspector::default();
1270        let test_node = inspector.root().create_child("input_pipeline");
1271
1272        // Start listening for InputDeviceRegistryRequests.
1273        let bindings_clone = bindings.clone();
1274        let _ = InputPipeline::handle_input_device_registry_request_stream(
1275            input_device_registry_request_stream,
1276            &device_types,
1277            &input_event_sender,
1278            &bindings_clone,
1279            &test_node,
1280            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1281            metrics::MetricsLogger::default(),
1282        )
1283        .await;
1284
1285        // Assert that a device was registered.
1286        let bindings = bindings.lock().await;
1287        assert_eq!(bindings.len(), 1);
1288    }
1289
1290    // Tests that correct properties are added to inspect node when InputPipeline is created.
1291    #[fasync::run_singlethreaded(test)]
1292    async fn check_inspect_node_has_correct_properties() {
1293        let device_types = vec![
1294            input_device::InputDeviceType::Touch,
1295            input_device::InputDeviceType::ConsumerControls,
1296        ];
1297        let inspector = fuchsia_inspect::Inspector::default();
1298        let test_node = inspector.root().create_child("input_pipeline");
1299        // Create fake input handler for assembly
1300        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1301            futures::channel::mpsc::channel(100);
1302        let fake_input_handler =
1303            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1304                fake_handler_event_sender,
1305            );
1306        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1307            .add_handler(fake_input_handler);
1308        let _test_input_pipeline = InputPipeline::new(
1309            &Incoming::new(),
1310            device_types,
1311            assembly,
1312            test_node,
1313            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1314            metrics::MetricsLogger::default(),
1315        );
1316        diagnostics_assertions::assert_data_tree!(inspector, root: {
1317            input_pipeline: {
1318                supported_input_devices: "Touch, ConsumerControls",
1319                handlers_registered: 1u64,
1320                handlers_healthy: 1u64,
1321                input_devices: {}
1322            }
1323        });
1324    }
1325
1326    struct SpecificInterestFakeHandler {
1327        interest_types: Vec<input_device::InputEventType>,
1328        event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1329    }
1330
1331    impl SpecificInterestFakeHandler {
1332        pub fn new(
1333            interest_types: Vec<input_device::InputEventType>,
1334            event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1335        ) -> Rc<Self> {
1336            Rc::new(SpecificInterestFakeHandler {
1337                interest_types,
1338                event_sender: std::cell::RefCell::new(event_sender),
1339            })
1340        }
1341    }
1342
1343    impl Handler for SpecificInterestFakeHandler {
1344        fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1345        fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1346        fn get_name(&self) -> &'static str {
1347            "SpecificInterestFakeHandler"
1348        }
1349
1350        fn interest(&self) -> Vec<input_device::InputEventType> {
1351            self.interest_types.clone()
1352        }
1353    }
1354
1355    #[async_trait(?Send)]
1356    impl input_handler::InputHandler for SpecificInterestFakeHandler {
1357        async fn handle_input_event(
1358            self: Rc<Self>,
1359            input_event: input_device::InputEvent,
1360        ) -> Vec<input_device::InputEvent> {
1361            match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1362                Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1363                Ok(_) => {}
1364            }
1365            vec![input_event]
1366        }
1367    }
1368
1369    #[fasync::run_singlethreaded(test)]
1370    async fn run_only_sends_events_to_interested_handlers() {
1371        // Mouse Handler (Specific Interest: Mouse)
1372        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1373        let mouse_handler =
1374            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1375
1376        // Fake Handler (Specific Interest: Fake)
1377        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1378        let fake_handler =
1379            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1380
1381        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1382            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1383                .add_handler(mouse_handler)
1384                .add_handler(fake_handler)
1385                .into_components();
1386
1387        // Run the pipeline logic
1388        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1389
1390        // Create a Fake event
1391        let fake_event = input_device::InputEvent {
1392            device_event: input_device::InputDeviceEvent::Fake,
1393            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1394            event_time: zx::MonotonicInstant::get(),
1395            handled: input_device::Handled::No,
1396            trace_id: None,
1397        };
1398
1399        // Send the Fake event
1400        pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1401
1402        // Verify Fake Handler received it
1403        let received_by_fake = fake_receiver.next().await;
1404        assert_eq!(received_by_fake, Some(fake_event));
1405
1406        // Verify Mouse Handler did NOT receive it
1407        assert!(mouse_receiver.try_next().is_err());
1408    }
1409
1410    fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1411        input_device::InputEvent {
1412            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1413                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1414                    millimeters: Position { x, y },
1415                }),
1416                None,
1417                None,
1418                mouse_binding::MousePhase::Move,
1419                SortedVecSet::new(),
1420                SortedVecSet::new(),
1421                None,
1422                None,
1423            )),
1424            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1425                mouse_binding::MouseDeviceDescriptor {
1426                    device_id: 1,
1427                    absolute_x_range: None,
1428                    absolute_y_range: None,
1429                    wheel_v_range: None,
1430                    wheel_h_range: None,
1431                    buttons: None,
1432                    counts_per_mm: 1,
1433                },
1434            ),
1435            event_time: zx::MonotonicInstant::get(),
1436            handled: input_device::Handled::No,
1437            trace_id: None,
1438        }
1439    }
1440
1441    #[fasync::run_singlethreaded(test)]
1442    async fn run_mixed_event_types_dispatched_correctly() {
1443        // Mouse Handler (Specific Interest: Mouse)
1444        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1445        let mouse_handler =
1446            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1447
1448        // Fake Handler (Specific Interest: Fake)
1449        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1450        let fake_handler =
1451            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1452
1453        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1454            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1455                .add_handler(mouse_handler)
1456                .add_handler(fake_handler)
1457                .into_components();
1458
1459        // Run the pipeline logic
1460        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1461
1462        // Create events
1463        let mouse_event_1 = create_mouse_event(1.0, 1.0);
1464        let mouse_event_2 = create_mouse_event(2.0, 2.0);
1465        let mouse_event_3 = create_mouse_event(3.0, 3.0);
1466
1467        let fake_event_1 = input_device::InputEvent {
1468            device_event: input_device::InputDeviceEvent::Fake,
1469            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1470            event_time: zx::MonotonicInstant::get(),
1471            handled: input_device::Handled::No,
1472            trace_id: None,
1473        };
1474
1475        // Send mixed batch: [Mouse, Mouse, Fake, Mouse]
1476        // This should result in 3 chunks: [Mouse, Mouse], [Fake], [Mouse]
1477        let mixed_batch = vec![
1478            mouse_event_1.clone(),
1479            mouse_event_2.clone(),
1480            fake_event_1.clone(),
1481            mouse_event_3.clone(),
1482        ];
1483        pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1484
1485        // Verify Mouse Handler received M1, M2, and then M3
1486        assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1487        assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1488        assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1489
1490        // Verify Fake Handler received F1
1491        assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1492    }
1493}