Skip to main content

input_pipeline/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{Dispatcher, Incoming, Transport, input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use fidl::endpoints;
12use fidl_fuchsia_io as fio;
13use focus_chain_provider::FocusChainProviderPublisher;
14use fuchsia_async as fasync;
15use fuchsia_component::directory::AsRefDirectory;
16use fuchsia_fs::directory::{WatchEvent, Watcher};
17use fuchsia_inspect::NumericProperty;
18use fuchsia_inspect::health::Reporter;
19use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
20use futures::future::LocalBoxFuture;
21use futures::lock::Mutex;
22use futures::{StreamExt, TryStreamExt};
23use itertools::Itertools;
24use metrics_registry::*;
25use sorted_vec_map::SortedVecMap;
26use std::path::PathBuf;
27use std::rc::Rc;
28use std::sync::atomic::{AtomicU32, Ordering};
29use std::sync::{Arc, LazyLock};
30use strum::EnumCount;
31
/// Process-wide, self-incrementing counter that supplies unique `device_id` values.
///
/// The counter starts at 10 so that generated ids cannot collide with the default devices in
/// Starnix, which currently uses ids 0 and 1. Starnix has to deliver events from physical
/// devices through those default devices until an API exists that exposes device changes to
/// UI clients.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| {
    let first_device_id = 10;
    AtomicU32::new(first_device_id)
});

/// Hands out the next unique device id.
///
/// The value returned is the counter's current value; the counter is then advanced by one so
/// that the following call yields a different id.
fn get_next_device_id() -> u32 {
    let counter: &AtomicU32 = &NEXT_DEVICE_ID;
    counter.fetch_add(1, Ordering::SeqCst)
}
46
47type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
48
49/// An [`InputDeviceBindingMap`] maps an input device to one or more InputDeviceBindings.
50/// It uses unique device id as key.
51pub type InputDeviceBindingMap = Arc<Mutex<SortedVecMap<u32, Vec<BoxedInputDeviceBinding>>>>;
52
53/// An input pipeline assembly.
54///
55/// Represents a partial stage of the input pipeline which accepts inputs through an asynchronous
56/// sender channel, and emits outputs through an asynchronous receiver channel.  Use [new] to
57/// create a new assembly.  Use [add_handler], or [add_all_handlers] to add the input pipeline
58/// handlers to use.  When done, [InputPipeline::new] can be used to make a new input pipeline.
59///
60/// # Implementation notes
61///
62/// Internally, when a new [InputPipelineAssembly] is created with multiple [InputHandler]s, the
63/// handlers are connected together using async queues.  This allows fully streamed processing of
64/// input events, and also allows some pipeline stages to generate events spontaneously, i.e.
65/// without an external stimulus.
66pub struct InputPipelineAssembly {
67    /// The top-level sender: send into this queue to inject an event into the input
68    /// pipeline.
69    sender: UnboundedSender<Vec<input_device::InputEvent>>,
70    /// The bottom-level receiver: any events that fall through the entire pipeline can
71    /// be read from this receiver.
72    receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
73
74    /// The input handlers that comprise the input pipeline.
75    handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
76
77    /// The display ownership watcher task.
78    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,
79
80    /// The focus listener task.
81    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,
82
83    /// The metrics logger.
84    metrics_logger: metrics::MetricsLogger,
85}
86
87impl InputPipelineAssembly {
88    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
89    /// to add new handlers to it.
90    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
91        let (sender, receiver) = mpsc::unbounded();
92        InputPipelineAssembly {
93            sender,
94            receiver,
95            handlers: vec![],
96            metrics_logger,
97            display_ownership_fut: None,
98            focus_listener_fut: None,
99        }
100    }
101
102    /// Adds another [input_handler::BatchInputHandler] into the [InputPipelineAssembly]. The handlers
103    /// are invoked in the order they are added. Returns `Self` for chaining.
104    pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
105        self.handlers.push(handler);
106        self
107    }
108
109    /// Adds all handlers into the assembly in the order they appear in `handlers`.
110    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
111        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
112    }
113
114    pub fn add_display_ownership(
115        mut self,
116        display_ownership_event: zx::Event,
117        input_handlers_node: &fuchsia_inspect::Node,
118    ) -> InputPipelineAssembly {
119        let h = DisplayOwnership::new(
120            display_ownership_event,
121            input_handlers_node,
122            self.metrics_logger.clone(),
123        );
124        let metrics_logger_clone = self.metrics_logger.clone();
125        let h_clone = h.clone();
126        let sender_clone = self.sender.clone();
127        let display_ownership_fut = Box::pin(async move {
128            h_clone.clone().set_handler_healthy();
129            h_clone.clone()
130                .handle_ownership_change(sender_clone)
131                .await
132                .map_err(|e| {
133                    metrics_logger_clone.log_error(
134                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
135                        std::format!(
136                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
137                        })
138                        .unwrap();
139            h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
140        });
141        self.display_ownership_fut = Some(display_ownership_fut);
142        self.add_handler(h)
143    }
144
145    /// Deconstructs the assembly into constituent components, used when constructing
146    /// [InputPipeline].
147    ///
148    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
149    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
150    fn into_components(
151        self,
152    ) -> (
153        UnboundedSender<Vec<input_device::InputEvent>>,
154        UnboundedReceiver<Vec<input_device::InputEvent>>,
155        Vec<Rc<dyn input_handler::BatchInputHandler>>,
156        metrics::MetricsLogger,
157        Option<LocalBoxFuture<'static, ()>>,
158        Option<LocalBoxFuture<'static, ()>>,
159    ) {
160        (
161            self.sender,
162            self.receiver,
163            self.handlers,
164            self.metrics_logger,
165            self.display_ownership_fut,
166            self.focus_listener_fut,
167        )
168    }
169
170    pub fn add_focus_listener(
171        mut self,
172        incoming: &Incoming,
173        focus_chain_publisher: FocusChainProviderPublisher,
174    ) -> Self {
175        let metrics_logger_clone = self.metrics_logger.clone();
176        let incoming2 = incoming.clone();
177        let focus_listener_fut = Box::pin(async move {
178            if let Ok(mut focus_listener) = FocusListener::new(
179                &incoming2,
180                focus_chain_publisher,
181                metrics_logger_clone,
182            )
183            .map_err(|e| {
184                log::warn!("could not create focus listener, focus will not be dispatched: {:?}", e)
185            }) {
186                // This will await indefinitely and process focus messages in a loop, unless there
187                // is a problem.
188                let _result = focus_listener
189                    .dispatch_focus_changes()
190                    .await
191                    .map(|_| {
192                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
193                    })
194                    .map_err(|e| {
195                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
196                    });
197            }
198        });
199        self.focus_listener_fut = Some(focus_listener_fut);
200        self
201    }
202}
203
204/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
205///
206/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
207/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
208///
209/// # Example
210/// ```
211/// let ime_handler =
212///     ImeHandler::new(scene_manager.session.clone(), scene_manager.compositor_id).await?;
213/// let touch_handler = TouchHandler::new(
214///     scene_manager.session.clone(),
215///     scene_manager.compositor_id,
216///     scene_manager.display_size
217/// ).await?;
218///
219/// let assembly = InputPipelineAssembly::new()
220///     .add_handler(Box::new(ime_handler)),
221///     .add_handler(Box::new(touch_handler)),
222/// let input_pipeline = InputPipeline::new(
223///     vec![
224///         input_device::InputDeviceType::Touch,
225///         input_device::InputDeviceType::Keyboard,
226///     ],
227///     assembly,
228/// );
229/// input_pipeline.handle_input_events().await;
230/// ```
231pub struct InputPipeline {
232    /// The entry point into the input handler pipeline. Incoming input events should
233    /// be inserted into this async queue, and the input pipeline will ensure that they
234    /// are propagated through all the input handlers in the appropriate sequence.
235    pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,
236
237    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
238    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
239    device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
240
241    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
242    device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
243
244    /// The types of devices this pipeline supports.
245    input_device_types: Vec<input_device::InputDeviceType>,
246
247    /// The InputDeviceBindings bound to this pipeline.
248    input_device_bindings: InputDeviceBindingMap,
249
250    /// This node is bound to the lifetime of this InputPipeline.
251    /// Inspect data will be dumped for this pipeline as long as it exists.
252    inspect_node: fuchsia_inspect::Node,
253
254    /// The metrics logger.
255    metrics_logger: metrics::MetricsLogger,
256
257    /// The feature flags for the input pipeline.
258    pub feature_flags: input_device::InputPipelineFeatureFlags,
259}
260
261impl InputPipeline {
262    fn new_common(
263        input_device_types: Vec<input_device::InputDeviceType>,
264        assembly: InputPipelineAssembly,
265        inspect_node: fuchsia_inspect::Node,
266        feature_flags: input_device::InputPipelineFeatureFlags,
267    ) -> Self {
268        let (
269            pipeline_sender,
270            receiver,
271            handlers,
272            metrics_logger,
273            display_ownership_fut,
274            focus_listener_fut,
275        ) = assembly.into_components();
276
277        let mut handlers_count = handlers.len();
278        // TODO: b/469745447 - should use futures::select! instead of detach().
279        if let Some(fut) = display_ownership_fut {
280            // The displayer ownership handler, like all input handlers, runs on [`crate::Dispatcher`]
281            // which is driver dispatcher in dso mode. The display ownership future must run on
282            // the same dispatcher because the types do not support multithreaded access.
283            Dispatcher::spawn_local(fut).detach();
284            handlers_count += 1;
285        }
286
287        // TODO: b/469745447 - should use futures::select! instead of detach().
288        if let Some(fut) = focus_listener_fut {
289            fasync::Task::local(fut).detach();
290            handlers_count += 1;
291        }
292
293        // Add properties to inspect node
294        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
295        inspect_node.record_uint("handlers_registered", handlers_count as u64);
296        inspect_node.record_uint("handlers_healthy", handlers_count as u64);
297
298        // Initializes all handlers and starts the input pipeline loop.
299        InputPipeline::run(receiver, handlers, metrics_logger.clone());
300
301        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
302        let input_device_bindings: InputDeviceBindingMap =
303            Arc::new(Mutex::new(SortedVecMap::new()));
304        InputPipeline {
305            pipeline_sender,
306            device_event_sender,
307            device_event_receiver,
308            input_device_types,
309            input_device_bindings,
310            inspect_node,
311            metrics_logger,
312            feature_flags,
313        }
314    }
315
316    /// Creates a new [`InputPipeline`] for integration testing.
317    /// Unlike a production input pipeline, this pipeline will not monitor
318    /// `/dev/class/input-report` for devices.
319    ///
320    /// # Parameters
321    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
322    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
323    pub fn new_for_test(
324        input_device_types: Vec<input_device::InputDeviceType>,
325        assembly: InputPipelineAssembly,
326    ) -> Self {
327        let inspector = fuchsia_inspect::Inspector::default();
328        let root = inspector.root();
329        let test_node = root.create_child("input_pipeline");
330        Self::new_common(
331            input_device_types,
332            assembly,
333            test_node,
334            input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
335        )
336    }
337
338    /// Creates a new [`InputPipeline`] for production use.
339    ///
340    /// # Parameters
341    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
342    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
343    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
344    pub fn new(
345        incoming: &Incoming,
346        input_device_types: Vec<input_device::InputDeviceType>,
347        assembly: InputPipelineAssembly,
348        inspect_node: fuchsia_inspect::Node,
349        feature_flags: input_device::InputPipelineFeatureFlags,
350        metrics_logger: metrics::MetricsLogger,
351    ) -> Result<Self, Error> {
352        let input_pipeline =
353            Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
354        let input_device_types = input_pipeline.input_device_types.clone();
355        let input_event_sender = input_pipeline.device_event_sender.clone();
356        let input_device_bindings = input_pipeline.input_device_bindings.clone();
357        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
358        let feature_flags = input_pipeline.feature_flags.clone();
359        let incoming = incoming.clone();
360        // This intentionally uses the [`fuchsia_async`] task dispatcher instead of
361        // [`crate::Dispatcher`] -- the directory watcher always uses the fuchsia-async dispatcher.
362        // This is fine for performance because the actual event dispatch is still configured to
363        // run on [`crate::Dispatcher`].
364        fasync::Task::local(async move {
365            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
366            // that send InputEvents to `input_event_receiver`.
367            match async {
368                let (dir_proxy, server) = endpoints::create_proxy::<fio::DirectoryMarker>();
369                incoming.as_ref_directory().open(
370                    input_device::INPUT_REPORT_PATH,
371                    fio::PERM_READABLE,
372                    server.into()
373                )
374                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
375                let device_watcher =
376                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
377                Self::watch_for_devices(
378                    device_watcher,
379                    dir_proxy,
380                    input_device_types,
381                    input_event_sender,
382                    input_device_bindings,
383                    &devices_node,
384                    false, /* break_on_idle */
385                    feature_flags,
386                    metrics_logger.clone(),
387                )
388                .await
389                .context("failed to watch for devices")
390            }
391            .await
392            {
393                Ok(()) => {}
394                Err(err) => {
395                    // This error is usually benign in tests: it means that the setup does not
396                    // support dynamic device discovery. Almost no tests support dynamic
397                    // device discovery, and they also do not need those.
398                    metrics_logger.log_warn(
399                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
400                        std::format!(
401                            "Input pipeline is unable to watch for new input devices: {:?}",
402                            err
403                        ));
404                }
405            }
406        }).detach();
407
408        Ok(input_pipeline)
409    }
410
411    /// Gets the input device bindings.
412    pub fn input_device_bindings(&self) -> &InputDeviceBindingMap {
413        &self.input_device_bindings
414    }
415
416    /// Gets the input device sender: this is the channel that should be cloned
417    /// and used for injecting events from the drivers into the input pipeline.
418    pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
419        &self.device_event_sender
420    }
421
422    /// Gets a list of input device types supported by this input pipeline.
423    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
424        &self.input_device_types
425    }
426
427    /// Forwards all input events into the input pipeline.
428    pub async fn handle_input_events(mut self) {
429        let metrics_logger_clone = self.metrics_logger.clone();
430        while let Some(input_event) = self.device_event_receiver.next().await {
431            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
432                metrics_logger_clone.log_error(
433                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
434                    std::format!("could not forward event from driver: {:?}", e));
435            }
436        }
437
438        metrics_logger_clone.log_error(
439            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
440            "Input pipeline stopped handling input events.".to_string(),
441        );
442    }
443
444    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
445    /// if new devices match a type in `device_types`.
446    ///
447    /// # Parameters
448    /// - `device_watcher`: Watches the input report directory for new devices.
449    /// - `dir_proxy`: The directory containing InputDevice connections.
450    /// - `device_types`: The types of devices to watch for.
451    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
452    /// - `bindings`: Holds all the InputDeviceBindings
453    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
454    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
455    /// - `metrics_logger`: The metrics logger.
456    ///
457    /// # Errors
458    /// If the input report directory or a file within it cannot be read.
459    async fn watch_for_devices(
460        mut device_watcher: Watcher,
461        dir_proxy: fio::DirectoryProxy,
462        device_types: Vec<input_device::InputDeviceType>,
463        input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
464        bindings: InputDeviceBindingMap,
465        input_devices_node: &fuchsia_inspect::Node,
466        break_on_idle: bool,
467        feature_flags: input_device::InputPipelineFeatureFlags,
468        metrics_logger: metrics::MetricsLogger,
469    ) -> Result<(), Error> {
470        // Add non-static properties to inspect node.
471        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
472        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
473        while let Some(msg) = device_watcher.try_next().await? {
474            if let Ok(filename) = msg.filename.into_os_string().into_string() {
475                if filename == "." {
476                    continue;
477                }
478
479                let pathbuf = PathBuf::from(filename.clone());
480                match msg.event {
481                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
482                        log::info!("found input device {}", filename);
483                        devices_discovered.add(1);
484                        let device_proxy =
485                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
486                        add_device_bindings(
487                            &device_types,
488                            &filename,
489                            device_proxy,
490                            &input_event_sender,
491                            &bindings,
492                            get_next_device_id(),
493                            input_devices_node,
494                            Some(&devices_connected),
495                            feature_flags.clone(),
496                            metrics_logger.clone(),
497                        )
498                        .await;
499                    }
500                    WatchEvent::IDLE => {
501                        if break_on_idle {
502                            break;
503                        }
504                    }
505                    _ => (),
506                }
507            }
508        }
509        // Ensure inspect properties persist for debugging if device watch loop ends.
510        input_devices_node.record(devices_discovered);
511        input_devices_node.record(devices_connected);
512        Err(format_err!("Input pipeline stopped watching for new input devices."))
513    }
514
515    /// Handles the incoming InputDeviceRegistryRequestStream.
516    ///
517    /// This method will end when the request stream is closed. If the stream closes with an
518    /// error the error will be returned in the Result.
519    ///
520    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
521    ///
522    /// # Parameters
523    /// - `stream`: The stream of InputDeviceRegistryRequests.
524    /// - `device_types`: The types of devices to watch for.
525    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
526    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
527    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
528    /// - `metrics_logger`: The metrics logger.
529    pub async fn handle_input_device_registry_request_stream(
530        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
531        device_types: &Vec<input_device::InputDeviceType>,
532        input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
533        bindings: &InputDeviceBindingMap,
534        input_devices_node: &fuchsia_inspect::Node,
535        feature_flags: input_device::InputPipelineFeatureFlags,
536        metrics_logger: metrics::MetricsLogger,
537    ) -> Result<(), Error> {
538        while let Some(request) = stream
539            .try_next()
540            .await
541            .context("Error handling input device registry request stream")?
542        {
543            match request {
544                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
545                    device,
546                    ..
547                } => {
548                    // Add a binding if the device is a type being tracked
549                    let device = fidl_next::ClientEnd::<
550                        fidl_next_fuchsia_input_report::InputDevice,
551                        zx::Channel,
552                    >::from_untyped(device.into_channel());
553                    let device = Dispatcher::client_from_zx_channel(device);
554                    let device = device.spawn();
555                    let device_id = get_next_device_id();
556
557                    add_device_bindings(
558                        device_types,
559                        &format!("input-device-registry-{}", device_id),
560                        device,
561                        input_event_sender,
562                        bindings,
563                        device_id,
564                        input_devices_node,
565                        None,
566                        feature_flags.clone(),
567                        metrics_logger.clone(),
568                    )
569                    .await;
570                }
571                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
572                    device,
573                    responder,
574                    .. } => {
575                    // Add a binding if the device is a type being tracked
576                    let device = fidl_next::ClientEnd::<
577                        fidl_next_fuchsia_input_report::InputDevice,
578                        zx::Channel,
579                    >::from_untyped(device.into_channel());
580                    let device = Dispatcher::client_from_zx_channel(device);
581                    let device = device.spawn();
582                    let device_id = get_next_device_id();
583
584                    add_device_bindings(
585                        device_types,
586                        &format!("input-device-registry-{}", device_id),
587                        device,
588                        input_event_sender,
589                        bindings,
590                        device_id,
591                        input_devices_node,
592                        None,
593                        feature_flags.clone(),
594                        metrics_logger.clone(),
595                    )
596                    .await;
597
598                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
599                        device_id: Some(device_id),
600                        ..Default::default()
601                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
602                }
603            }
604        }
605
606        Ok(())
607    }
608
609    /// Initializes all handlers and starts the input pipeline loop in an asynchronous executor.
610    fn run(
611        mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
612        handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
613        metrics_logger: metrics::MetricsLogger,
614    ) {
615        Dispatcher::spawn_local(async move {
616            for handler in &handlers {
617                handler.clone().set_handler_healthy();
618            }
619
620            use input_device::InputEventType;
621
622            let mut handlers_by_type: [Vec<Rc<dyn input_handler::BatchInputHandler>>; InputEventType::COUNT] = Default::default();
623
624            // TODO: b/478262850 - We can use supported_input_devices to populate this list.
625            let event_types = vec![
626                InputEventType::Keyboard,
627                InputEventType::LightSensor,
628                InputEventType::ConsumerControls,
629                InputEventType::Mouse,
630                InputEventType::TouchScreen,
631                InputEventType::Touchpad,
632                #[cfg(test)]
633                InputEventType::Fake,
634            ];
635
636            for event_type in event_types {
637                let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
638                    .iter()
639                    .filter(|h| h.interest().contains(&event_type))
640                    .cloned()
641                    .collect();
642                handlers_by_type[event_type as usize] = handlers_for_type;
643            }
644
645            while let Some(events) = receiver.next().await {
646                if events.is_empty() {
647                    continue;
648                }
649
650                let mut groups_seen = 0;
651                let events = events
652                    .into_iter()
653                    .chunk_by(|e| InputEventType::from(&e.device_event));
654                let events = events.into_iter().map(|(k, v)| (k, v.collect::<Vec<_>>()));
655                for (event_type, event_group) in events {
656                    groups_seen += 1;
657                    if groups_seen == 2 {
658                        metrics_logger.log_error(
659                                InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
660                                "it is not recommended to contain multiple types of events in 1 send".to_string(),
661                            );
662                    }
663                    let mut events_in_group = event_group;
664
665                    // Get pre-computed handlers for this event type.
666                    let handlers = &handlers_by_type[event_type as usize];
667
668                    for handler in handlers {
669                        events_in_group =
670                            handler.clone().handle_input_events(events_in_group).await;
671                    }
672
673                    for event in events_in_group {
674                        if event.handled == input_device::Handled::No {
675                            log::warn!("unhandled input event: {:?}", event);
676                        }
677                        if let Some(trace_id) = event.trace_id {
678                            fuchsia_trace::flow_end!(
679                                "input",
680                                "event_in_input_pipeline",
681                                trace_id.into()
682                            );
683                        }
684                    }
685                }
686            }
687            for handler in &handlers {
688                handler.clone().set_handler_unhealthy("Pipeline loop terminated");
689            }
690            panic!("Runner task is not supposed to terminate.")
691        }).detach();
692    }
693}
694
695/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
696///
697/// # Parameters
698/// - `device_types`: The types of devices to watch for.
699/// - `device_proxy`: A proxy to the input device.
700/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
701/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
702/// - `device_id`: The device id of the associated bindings.
703/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
704///
705/// # Note
706/// This will create multiple bindings, in the case where
707/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
708///   with multiple table fields populated, and
709/// * multiple populated table fields correspond to device types present in `device_types`
710///
711/// This is used, for example, to support the Atlas touchpad. In that case, a single
712/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
713/// a `fuchsia.input.report.TouchDescriptor`.
714async fn add_device_bindings(
715    device_types: &Vec<input_device::InputDeviceType>,
716    filename: &String,
717    device_proxy: fidl_next::Client<fidl_next_fuchsia_input_report::InputDevice, Transport>,
718    input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
719    bindings: &InputDeviceBindingMap,
720    device_id: u32,
721    input_devices_node: &fuchsia_inspect::Node,
722    devices_connected: Option<&fuchsia_inspect::UintProperty>,
723    feature_flags: InputPipelineFeatureFlags,
724    metrics_logger: metrics::MetricsLogger,
725) {
726    let mut matched_device_types = vec![];
727    if let Ok(res) = device_proxy.get_descriptor().await {
728        for device_type in device_types {
729            if input_device::is_device_type(&res.descriptor, *device_type).await {
730                matched_device_types.push(device_type);
731                match devices_connected {
732                    Some(dev_connected) => {
733                        let _ = dev_connected.add(1);
734                    }
735                    None => (),
736                };
737            }
738        }
739        if matched_device_types.is_empty() {
740            log::info!(
741                "device {} did not match any supported device types: {:?}",
742                filename,
743                device_types
744            );
745            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
746            let mut health = fuchsia_inspect::health::Node::new(&device_node);
747            health.set_unhealthy("Unsupported device type.");
748            device_node.record(health);
749            input_devices_node.record(device_node);
750            return;
751        }
752    } else {
753        metrics_logger.clone().log_error(
754            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
755            std::format!("cannot bind device {} without a device descriptor", filename),
756        );
757        return;
758    }
759
760    log::info!(
761        "binding {} to device types: {}",
762        filename,
763        matched_device_types
764            .iter()
765            .fold(String::new(), |device_types_string, device_type| device_types_string
766                + &format!("{:?}, ", device_type))
767    );
768
769    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
770    for device_type in matched_device_types {
771        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
772        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
773        //
774        // There's no conflict in having multiple bindings read from the same node,
775        // since:
776        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
777        // * the device driver will copy each incoming report to each connected reader.
778        //
779        // This does mean that reports from the Atlas touchpad device get read twice
780        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
781        // is operating in mouse mode or touchpad mode.
782        //
783        // This hasn't been an issue because:
784        // * Semantically: things are fine, because each binding discards irrelevant reports.
785        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
786        // * Performance wise: things are fine, because the data rate of the touchpad is low
787        //   (125 HZ).
788        //
789        // If we add additional cases where bindings share an underlying `input-report` node,
790        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
791        let proxy = device_proxy.clone();
792        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
793        match input_device::get_device_binding(
794            *device_type,
795            proxy,
796            device_id,
797            input_event_sender.clone(),
798            device_node,
799            feature_flags.clone(),
800            metrics_logger.clone(),
801        )
802        .await
803        {
804            Ok(binding) => new_bindings.push(binding),
805            Err(e) => {
806                metrics_logger.log_error(
807                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
808                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
809                );
810            }
811        }
812    }
813
814    if !new_bindings.is_empty() {
815        let mut bindings = bindings.lock().await;
816        if let Some(v) = bindings.get_mut(&device_id) {
817            v.extend(new_bindings);
818        } else {
819            bindings.insert(device_id, new_bindings);
820        }
821    }
822}
823
824#[cfg(test)]
825mod tests {
826    use super::*;
827    use crate::input_device::{InputDeviceBinding, InputEventType};
828    use crate::utils::Position;
829    use crate::{fake_input_device_binding, mouse_binding, observe_fake_events_input_handler};
830    use async_trait::async_trait;
831    use diagnostics_assertions::AnyProperty;
832    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
833    use fuchsia_async as fasync;
834    use futures::FutureExt;
835    use pretty_assertions::assert_eq;
836    use rand::Rng;
837    use sorted_vec_map::SortedVecSet;
838    use vfs::{pseudo_directory, service as pseudo_fs_service};
839
840    /// Returns the InputEvent sent over `sender`.
841    ///
842    /// # Parameters
843    /// - `sender`: The channel to send the InputEvent over.
844    fn send_input_event(
845        sender: UnboundedSender<Vec<input_device::InputEvent>>,
846    ) -> Vec<input_device::InputEvent> {
847        let mut rng = rand::rng();
848        let offset =
849            Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
850        let input_event = input_device::InputEvent {
851            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
852                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
853                    counts: Position { x: offset.x, y: offset.y },
854                }),
855                None, /* wheel_delta_v */
856                None, /* wheel_delta_h */
857                mouse_binding::MousePhase::Move,
858                SortedVecSet::new(),
859                SortedVecSet::new(),
860                None, /* is_precision_scroll */
861                None, /* wake_lease */
862            )),
863            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
864                mouse_binding::MouseDeviceDescriptor {
865                    device_id: 1,
866                    absolute_x_range: None,
867                    absolute_y_range: None,
868                    wheel_v_range: None,
869                    wheel_h_range: None,
870                    buttons: None,
871                },
872            ),
873            event_time: zx::MonotonicInstant::get(),
874            handled: input_device::Handled::No,
875            trace_id: None,
876        };
877        match sender.unbounded_send(vec![input_event.clone()]) {
878            Err(_) => assert!(false),
879            _ => {}
880        }
881
882        vec![input_event]
883    }
884
885    /// Returns a MouseDescriptor on an InputDeviceRequest.
886    ///
887    /// # Parameters
888    /// - `input_device_request`: The request to handle.
889    fn handle_input_device_request(
890        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
891    ) {
892        match input_device_request {
893            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
894                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
895                    device_information: None,
896                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
897                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
898                            movement_x: None,
899                            movement_y: None,
900                            scroll_v: None,
901                            scroll_h: None,
902                            buttons: Some(vec![0]),
903                            position_x: None,
904                            position_y: None,
905                            ..Default::default()
906                        }),
907                        ..Default::default()
908                    }),
909                    sensor: None,
910                    touch: None,
911                    keyboard: None,
912                    consumer_control: None,
913                    ..Default::default()
914                });
915            }
916            _ => {}
917        }
918    }
919
920    /// Tests that an input pipeline handles events from multiple devices.
921    #[fasync::run_singlethreaded(test)]
922    async fn multiple_devices_single_handler() {
923        // Create two fake device bindings.
924        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
925        let first_device_binding =
926            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
927        let second_device_binding =
928            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
929
930        // Create a fake input handler.
931        let (handler_event_sender, mut handler_event_receiver) =
932            futures::channel::mpsc::channel(100);
933        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
934            handler_event_sender,
935        );
936
937        // Build the input pipeline.
938        let (sender, receiver, handlers, _, _, _) =
939            InputPipelineAssembly::new(metrics::MetricsLogger::default())
940                .add_handler(input_handler)
941                .into_components();
942        let inspector = fuchsia_inspect::Inspector::default();
943        let test_node = inspector.root().create_child("input_pipeline");
944        let input_pipeline = InputPipeline {
945            pipeline_sender: sender,
946            device_event_sender,
947            device_event_receiver,
948            input_device_types: vec![],
949            input_device_bindings: Arc::new(Mutex::new(SortedVecMap::new())),
950            inspect_node: test_node,
951            metrics_logger: metrics::MetricsLogger::default(),
952            feature_flags: input_device::InputPipelineFeatureFlags::default(),
953        };
954        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
955
956        // Send an input event from each device.
957        let first_device_events = send_input_event(first_device_binding.input_event_sender());
958        let second_device_events = send_input_event(second_device_binding.input_event_sender());
959
960        // Run the pipeline.
961        fasync::Task::local(async {
962            input_pipeline.handle_input_events().await;
963        })
964        .detach();
965
966        // Assert the handler receives the events.
967        let first_handled_event = handler_event_receiver.next().await;
968        assert_eq!(first_handled_event, first_device_events.into_iter().next());
969
970        let second_handled_event = handler_event_receiver.next().await;
971        assert_eq!(second_handled_event, second_device_events.into_iter().next());
972    }
973
974    /// Tests that an input pipeline handles events through multiple input handlers.
975    #[fasync::run_singlethreaded(test)]
976    async fn single_device_multiple_handlers() {
977        // Create two fake device bindings.
978        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
979        let input_device_binding =
980            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
981
982        // Create two fake input handlers.
983        let (first_handler_event_sender, mut first_handler_event_receiver) =
984            futures::channel::mpsc::channel(100);
985        let first_input_handler =
986            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
987                first_handler_event_sender,
988            );
989        let (second_handler_event_sender, mut second_handler_event_receiver) =
990            futures::channel::mpsc::channel(100);
991        let second_input_handler =
992            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
993                second_handler_event_sender,
994            );
995
996        // Build the input pipeline.
997        let (sender, receiver, handlers, _, _, _) =
998            InputPipelineAssembly::new(metrics::MetricsLogger::default())
999                .add_handler(first_input_handler)
1000                .add_handler(second_input_handler)
1001                .into_components();
1002        let inspector = fuchsia_inspect::Inspector::default();
1003        let test_node = inspector.root().create_child("input_pipeline");
1004        let input_pipeline = InputPipeline {
1005            pipeline_sender: sender,
1006            device_event_sender,
1007            device_event_receiver,
1008            input_device_types: vec![],
1009            input_device_bindings: Arc::new(Mutex::new(SortedVecMap::new())),
1010            inspect_node: test_node,
1011            metrics_logger: metrics::MetricsLogger::default(),
1012            feature_flags: input_device::InputPipelineFeatureFlags::default(),
1013        };
1014        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1015
1016        // Send an input event.
1017        let input_events = send_input_event(input_device_binding.input_event_sender());
1018
1019        // Run the pipeline.
1020        fasync::Task::local(async {
1021            input_pipeline.handle_input_events().await;
1022        })
1023        .detach();
1024
1025        // Assert both handlers receive the event.
1026        let expected_event = input_events.into_iter().next();
1027        let first_handler_event = first_handler_event_receiver.next().await;
1028        assert_eq!(first_handler_event, expected_event);
1029        let second_handler_event = second_handler_event_receiver.next().await;
1030        assert_eq!(second_handler_event, expected_event);
1031    }
1032
1033    /// Tests that a single mouse device binding is created for the one input device in the
1034    /// input report directory.
1035    #[fasync::run_singlethreaded(test)]
1036    async fn watch_devices_one_match_exists() {
1037        // Create a file in a pseudo directory that represents an input device.
1038        let mut count: i8 = 0;
1039        let dir = pseudo_directory! {
1040            "file_name" => pseudo_fs_service::host(
1041                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1042                    async move {
1043                        while count < 3 {
1044                            if let Some(input_device_request) =
1045                                request_stream.try_next().await.unwrap()
1046                            {
1047                                handle_input_device_request(input_device_request);
1048                                count += 1;
1049                            }
1050                        }
1051
1052                    }.boxed()
1053                },
1054            )
1055        };
1056
1057        // Create a Watcher on the pseudo directory.
1058        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1059        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1060        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1061        // proxy to get connections to input devices.
1062        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1063
1064        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1065        let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1066        let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1067
1068        let inspector = fuchsia_inspect::Inspector::default();
1069        let test_node = inspector.root().create_child("input_pipeline");
1070        test_node.record_string(
1071            "supported_input_devices",
1072            supported_device_types.clone().iter().join(", "),
1073        );
1074        let input_devices = test_node.create_child("input_devices");
1075        // Assert that inspect tree is initialized with no devices.
1076        diagnostics_assertions::assert_data_tree!(inspector, root: {
1077            input_pipeline: {
1078                supported_input_devices: "Mouse",
1079                input_devices: {}
1080            }
1081        });
1082
1083        let _ = InputPipeline::watch_for_devices(
1084            device_watcher,
1085            dir_proxy_for_pipeline,
1086            supported_device_types,
1087            input_event_sender,
1088            bindings.clone(),
1089            &input_devices,
1090            true, /* break_on_idle */
1091            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1092            metrics::MetricsLogger::default(),
1093        )
1094        .await;
1095
1096        // Assert that one mouse device with accurate device id was found.
1097        let bindings_map = bindings.lock().await;
1098        assert_eq!(bindings_map.len(), 1);
1099        let bindings_vector = bindings_map.get(&10);
1100        assert!(bindings_vector.is_some());
1101        assert_eq!(bindings_vector.unwrap().len(), 1);
1102        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1103        assert!(boxed_mouse_binding.is_some());
1104        assert_eq!(
1105            boxed_mouse_binding.unwrap().get_device_descriptor(),
1106            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1107                device_id: 10,
1108                absolute_x_range: None,
1109                absolute_y_range: None,
1110                wheel_v_range: None,
1111                wheel_h_range: None,
1112                buttons: Some(vec![0]),
1113            })
1114        );
1115
1116        // Assert that inspect tree reflects new device discovered and connected.
1117        diagnostics_assertions::assert_data_tree!(inspector, root: {
1118            input_pipeline: {
1119                supported_input_devices: "Mouse",
1120                input_devices: {
1121                    devices_discovered: 1u64,
1122                    devices_connected: 1u64,
1123                    "file_name_Mouse": contains {
1124                        reports_received_count: 0u64,
1125                        reports_filtered_count: 0u64,
1126                        events_generated: 0u64,
1127                        last_received_timestamp_ns: 0u64,
1128                        last_generated_timestamp_ns: 0u64,
1129                        "fuchsia.inspect.Health": {
1130                            status: "OK",
1131                            // Timestamp value is unpredictable and not relevant in this context,
1132                            // so we only assert that the property is present.
1133                            start_timestamp_nanos: AnyProperty
1134                        },
1135                    }
1136                }
1137            }
1138        });
1139    }
1140
1141    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
1142    /// but only a mouse exists.
1143    #[fasync::run_singlethreaded(test)]
1144    async fn watch_devices_no_matches_exist() {
1145        // Create a file in a pseudo directory that represents an input device.
1146        let mut count: i8 = 0;
1147        let dir = pseudo_directory! {
1148            "file_name" => pseudo_fs_service::host(
1149                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1150                    async move {
1151                        while count < 1 {
1152                            if let Some(input_device_request) =
1153                                request_stream.try_next().await.unwrap()
1154                            {
1155                                handle_input_device_request(input_device_request);
1156                                count += 1;
1157                            }
1158                        }
1159
1160                    }.boxed()
1161                },
1162            )
1163        };
1164
1165        // Create a Watcher on the pseudo directory.
1166        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1167        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1168        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1169        // proxy to get connections to input devices.
1170        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1171
1172        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1173        let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1174        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1175
1176        let inspector = fuchsia_inspect::Inspector::default();
1177        let test_node = inspector.root().create_child("input_pipeline");
1178        test_node.record_string(
1179            "supported_input_devices",
1180            supported_device_types.clone().iter().join(", "),
1181        );
1182        let input_devices = test_node.create_child("input_devices");
1183        // Assert that inspect tree is initialized with no devices.
1184        diagnostics_assertions::assert_data_tree!(inspector, root: {
1185            input_pipeline: {
1186                supported_input_devices: "Keyboard",
1187                input_devices: {}
1188            }
1189        });
1190
1191        let _ = InputPipeline::watch_for_devices(
1192            device_watcher,
1193            dir_proxy_for_pipeline,
1194            supported_device_types,
1195            input_event_sender,
1196            bindings.clone(),
1197            &input_devices,
1198            true, /* break_on_idle */
1199            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1200            metrics::MetricsLogger::default(),
1201        )
1202        .await;
1203
1204        // Assert that no devices were found.
1205        let bindings = bindings.lock().await;
1206        assert_eq!(bindings.len(), 0);
1207
1208        // Assert that inspect tree reflects new device discovered, but not connected.
1209        diagnostics_assertions::assert_data_tree!(inspector, root: {
1210            input_pipeline: {
1211                supported_input_devices: "Keyboard",
1212                input_devices: {
1213                    devices_discovered: 1u64,
1214                    devices_connected: 0u64,
1215                    "file_name_Unsupported": {
1216                        "fuchsia.inspect.Health": {
1217                            status: "UNHEALTHY",
1218                            message: "Unsupported device type.",
1219                            // Timestamp value is unpredictable and not relevant in this context,
1220                            // so we only assert that the property is present.
1221                            start_timestamp_nanos: AnyProperty
1222                        },
1223                    }
1224                }
1225            }
1226        });
1227    }
1228
1229    /// Tests that a single keyboard device binding is created for the input device registered
1230    /// through InputDeviceRegistry.
1231    #[fasync::run_singlethreaded(test)]
1232    async fn handle_input_device_registry_request_stream() {
1233        let (input_device_registry_proxy, input_device_registry_request_stream) =
1234            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1235        let (input_device_client_end, mut input_device_request_stream) =
1236            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1237
1238        let device_types = vec![input_device::InputDeviceType::Mouse];
1239        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1240        let bindings: InputDeviceBindingMap = Arc::new(Mutex::new(SortedVecMap::new()));
1241
1242        // Handle input device requests.
1243        let mut count: i8 = 0;
1244        fasync::Task::local(async move {
1245            // Register a device.
1246            let _ = input_device_registry_proxy.register(input_device_client_end);
1247
1248            while count < 3 {
1249                if let Some(input_device_request) =
1250                    input_device_request_stream.try_next().await.unwrap()
1251                {
1252                    handle_input_device_request(input_device_request);
1253                    count += 1;
1254                }
1255            }
1256
1257            // End handle_input_device_registry_request_stream() by taking the event stream.
1258            input_device_registry_proxy.take_event_stream();
1259        })
1260        .detach();
1261
1262        let inspector = fuchsia_inspect::Inspector::default();
1263        let test_node = inspector.root().create_child("input_pipeline");
1264
1265        // Start listening for InputDeviceRegistryRequests.
1266        let bindings_clone = bindings.clone();
1267        let _ = InputPipeline::handle_input_device_registry_request_stream(
1268            input_device_registry_request_stream,
1269            &device_types,
1270            &input_event_sender,
1271            &bindings_clone,
1272            &test_node,
1273            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1274            metrics::MetricsLogger::default(),
1275        )
1276        .await;
1277
1278        // Assert that a device was registered.
1279        let bindings = bindings.lock().await;
1280        assert_eq!(bindings.len(), 1);
1281    }
1282
1283    // Tests that correct properties are added to inspect node when InputPipeline is created.
1284    #[fasync::run_singlethreaded(test)]
1285    async fn check_inspect_node_has_correct_properties() {
1286        let device_types = vec![
1287            input_device::InputDeviceType::Touch,
1288            input_device::InputDeviceType::ConsumerControls,
1289        ];
1290        let inspector = fuchsia_inspect::Inspector::default();
1291        let test_node = inspector.root().create_child("input_pipeline");
1292        // Create fake input handler for assembly
1293        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1294            futures::channel::mpsc::channel(100);
1295        let fake_input_handler =
1296            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1297                fake_handler_event_sender,
1298            );
1299        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1300            .add_handler(fake_input_handler);
1301        let _test_input_pipeline = InputPipeline::new(
1302            &Incoming::new(),
1303            device_types,
1304            assembly,
1305            test_node,
1306            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1307            metrics::MetricsLogger::default(),
1308        );
1309        diagnostics_assertions::assert_data_tree!(inspector, root: {
1310            input_pipeline: {
1311                supported_input_devices: "Touch, ConsumerControls",
1312                handlers_registered: 1u64,
1313                handlers_healthy: 1u64,
1314                input_devices: {}
1315            }
1316        });
1317    }
1318
1319    struct SpecificInterestFakeHandler {
1320        interest_types: Vec<input_device::InputEventType>,
1321        event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1322    }
1323
1324    impl SpecificInterestFakeHandler {
1325        pub fn new(
1326            interest_types: Vec<input_device::InputEventType>,
1327            event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1328        ) -> Rc<Self> {
1329            Rc::new(SpecificInterestFakeHandler {
1330                interest_types,
1331                event_sender: std::cell::RefCell::new(event_sender),
1332            })
1333        }
1334    }
1335
1336    impl Handler for SpecificInterestFakeHandler {
1337        fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1338        fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1339        fn get_name(&self) -> &'static str {
1340            "SpecificInterestFakeHandler"
1341        }
1342
1343        fn interest(&self) -> Vec<input_device::InputEventType> {
1344            self.interest_types.clone()
1345        }
1346    }
1347
1348    #[async_trait(?Send)]
1349    impl input_handler::InputHandler for SpecificInterestFakeHandler {
1350        async fn handle_input_event(
1351            self: Rc<Self>,
1352            input_event: input_device::InputEvent,
1353        ) -> Vec<input_device::InputEvent> {
1354            match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1355                Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1356                Ok(_) => {}
1357            }
1358            vec![input_event]
1359        }
1360    }
1361
1362    #[fasync::run_singlethreaded(test)]
1363    async fn run_only_sends_events_to_interested_handlers() {
1364        // Mouse Handler (Specific Interest: Mouse)
1365        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1366        let mouse_handler =
1367            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1368
1369        // Fake Handler (Specific Interest: Fake)
1370        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1371        let fake_handler =
1372            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1373
1374        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1375            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1376                .add_handler(mouse_handler)
1377                .add_handler(fake_handler)
1378                .into_components();
1379
1380        // Run the pipeline logic
1381        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1382
1383        // Create a Fake event
1384        let fake_event = input_device::InputEvent {
1385            device_event: input_device::InputDeviceEvent::Fake,
1386            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1387            event_time: zx::MonotonicInstant::get(),
1388            handled: input_device::Handled::No,
1389            trace_id: None,
1390        };
1391
1392        // Send the Fake event
1393        pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1394
1395        // Verify Fake Handler received it
1396        let received_by_fake = fake_receiver.next().await;
1397        assert_eq!(received_by_fake, Some(fake_event));
1398
1399        // Verify Mouse Handler did NOT receive it
1400        assert!(mouse_receiver.try_next().is_err());
1401    }
1402
1403    fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1404        input_device::InputEvent {
1405            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1406                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1407                    counts: Position { x, y },
1408                }),
1409                None,
1410                None,
1411                mouse_binding::MousePhase::Move,
1412                SortedVecSet::new(),
1413                SortedVecSet::new(),
1414                None,
1415                None,
1416            )),
1417            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1418                mouse_binding::MouseDeviceDescriptor {
1419                    device_id: 1,
1420                    absolute_x_range: None,
1421                    absolute_y_range: None,
1422                    wheel_v_range: None,
1423                    wheel_h_range: None,
1424                    buttons: None,
1425                },
1426            ),
1427            event_time: zx::MonotonicInstant::get(),
1428            handled: input_device::Handled::No,
1429            trace_id: None,
1430        }
1431    }
1432
1433    #[fasync::run_singlethreaded(test)]
1434    async fn run_mixed_event_types_dispatched_correctly() {
1435        // Mouse Handler (Specific Interest: Mouse)
1436        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1437        let mouse_handler =
1438            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1439
1440        // Fake Handler (Specific Interest: Fake)
1441        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1442        let fake_handler =
1443            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1444
1445        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1446            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1447                .add_handler(mouse_handler)
1448                .add_handler(fake_handler)
1449                .into_components();
1450
1451        // Run the pipeline logic
1452        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1453
1454        // Create events
1455        let mouse_event_1 = create_mouse_event(1.0, 1.0);
1456        let mouse_event_2 = create_mouse_event(2.0, 2.0);
1457        let mouse_event_3 = create_mouse_event(3.0, 3.0);
1458
1459        let fake_event_1 = input_device::InputEvent {
1460            device_event: input_device::InputDeviceEvent::Fake,
1461            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1462            event_time: zx::MonotonicInstant::get(),
1463            handled: input_device::Handled::No,
1464            trace_id: None,
1465        };
1466
1467        // Send mixed batch: [Mouse, Mouse, Fake, Mouse]
1468        // This should result in 3 chunks: [Mouse, Mouse], [Fake], [Mouse]
1469        let mixed_batch = vec![
1470            mouse_event_1.clone(),
1471            mouse_event_2.clone(),
1472            fake_event_1.clone(),
1473            mouse_event_3.clone(),
1474        ];
1475        pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1476
1477        // Verify Mouse Handler received M1, M2, and then M3
1478        assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1479        assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1480        assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1481
1482        // Verify Fake Handler received F1
1483        assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1484    }
1485}