Skip to main content

input_pipeline/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_device::InputPipelineFeatureFlags;
8use crate::input_handler::Handler;
9use crate::{Dispatcher, Incoming, Transport, input_device, input_handler, metrics};
10use anyhow::{Context, Error, format_err};
11use fidl::endpoints;
12use fidl_fuchsia_io as fio;
13use focus_chain_provider::FocusChainProviderPublisher;
14use fuchsia_async as fasync;
15use fuchsia_component::directory::AsRefDirectory;
16use fuchsia_fs::directory::{WatchEvent, Watcher};
17use fuchsia_inspect::NumericProperty;
18use fuchsia_inspect::health::Reporter;
19use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
20use futures::future::LocalBoxFuture;
21use futures::lock::Mutex;
22use futures::{StreamExt, TryStreamExt};
23use itertools::Itertools;
24use metrics_registry::*;
25use std::collections::HashMap;
26use std::path::PathBuf;
27use std::rc::Rc;
28use std::sync::atomic::{AtomicU32, Ordering};
29use std::sync::{Arc, LazyLock};
30use strum::EnumCount;
31
/// First value handed out by [`get_next_device_id`].
///
/// Ids start at 10 to avoid conflicts with the default devices in Starnix, which currently
/// use ids 0 and 1. Starnix needs those default devices to deliver events from physical
/// devices until there is an API that exposes device changes to UI clients.
const FIRST_DEVICE_ID: u32 = 10;

/// Process-wide, self-incrementing counter that backs the unique `device_id` values.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> =
    LazyLock::new(|| AtomicU32::new(FIRST_DEVICE_ID));

/// Returns a fresh device id.
///
/// Every call yields the counter's current value and atomically bumps the counter by one, so
/// the following call observes the next id.
fn get_next_device_id() -> u32 {
    NEXT_DEVICE_ID.fetch_add(1, Ordering::SeqCst)
}
46
/// A boxed, dynamically dispatched [`input_device::InputDeviceBinding`] trait object.
type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;

/// An [`InputDeviceBindingHashMap`] maps an input device to one or more InputDeviceBindings.
/// The key is the device's unique `u32` device id; the whole map sits behind an async
/// [`Mutex`] inside an [`Arc`] so that it can be shared.
pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
52
/// An input pipeline assembly.
///
/// Collects everything needed to build an [`InputPipeline`]: the channel through which event
/// batches enter the pipeline, the ordered list of handlers, and optional background tasks.
/// Use [`InputPipelineAssembly::new`] to create an empty assembly, then
/// [`InputPipelineAssembly::add_handler`] or [`InputPipelineAssembly::add_all_handlers`] to
/// register handlers.  When done, [`InputPipeline::new`] can be used to make a new input
/// pipeline from the assembly.
///
/// # Implementation notes
///
/// The assembly owns both ends of a single unbounded channel.  Batches of events sent into
/// `sender` are later read from `receiver` by the pipeline loop, which passes each batch
/// through the registered handlers.  Because `sender` can be cloned, tasks such as the display
/// ownership watcher are able to inject events spontaneously, i.e. without an external
/// stimulus.
pub struct InputPipelineAssembly {
    /// The top-level sender: send into this queue to inject an event into the input
    /// pipeline.
    sender: UnboundedSender<Vec<input_device::InputEvent>>,
    /// The receiving end of the channel that `sender` feeds; the pipeline loop reads event
    /// batches from here.
    receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,

    /// The input handlers that comprise the input pipeline.
    handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,

    /// The display ownership watcher task, if one was added via `add_display_ownership`.
    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,

    /// The focus listener task, if one was added via `add_focus_listener`.
    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,

    /// The metrics logger.
    metrics_logger: metrics::MetricsLogger,
}
86
87impl InputPipelineAssembly {
88    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
89    /// to add new handlers to it.
90    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
91        let (sender, receiver) = mpsc::unbounded();
92        InputPipelineAssembly {
93            sender,
94            receiver,
95            handlers: vec![],
96            metrics_logger,
97            display_ownership_fut: None,
98            focus_listener_fut: None,
99        }
100    }
101
    /// Adds another [input_handler::BatchInputHandler] into the [InputPipelineAssembly]. The handlers
    /// are invoked in the order they are added. Returns `Self` for chaining.
    ///
    /// # Parameters
    /// - `handler`: The handler to append to the end of the handler list.
    pub fn add_handler(mut self, handler: Rc<dyn input_handler::BatchInputHandler>) -> Self {
        self.handlers.push(handler);
        self
    }
108
109    /// Adds all handlers into the assembly in the order they appear in `handlers`.
110    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>) -> Self {
111        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
112    }
113
114    pub fn add_display_ownership(
115        mut self,
116        display_ownership_event: zx::Event,
117        input_handlers_node: &fuchsia_inspect::Node,
118    ) -> InputPipelineAssembly {
119        let h = DisplayOwnership::new(
120            display_ownership_event,
121            input_handlers_node,
122            self.metrics_logger.clone(),
123        );
124        let metrics_logger_clone = self.metrics_logger.clone();
125        let h_clone = h.clone();
126        let sender_clone = self.sender.clone();
127        let display_ownership_fut = Box::pin(async move {
128            h_clone.clone().set_handler_healthy();
129            h_clone.clone()
130                .handle_ownership_change(sender_clone)
131                .await
132                .map_err(|e| {
133                    metrics_logger_clone.log_error(
134                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
135                        std::format!(
136                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
137                        })
138                        .unwrap();
139            h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
140        });
141        self.display_ownership_fut = Some(display_ownership_fut);
142        self.add_handler(h)
143    }
144
145    /// Deconstructs the assembly into constituent components, used when constructing
146    /// [InputPipeline].
147    ///
148    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
149    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
150    fn into_components(
151        self,
152    ) -> (
153        UnboundedSender<Vec<input_device::InputEvent>>,
154        UnboundedReceiver<Vec<input_device::InputEvent>>,
155        Vec<Rc<dyn input_handler::BatchInputHandler>>,
156        metrics::MetricsLogger,
157        Option<LocalBoxFuture<'static, ()>>,
158        Option<LocalBoxFuture<'static, ()>>,
159    ) {
160        (
161            self.sender,
162            self.receiver,
163            self.handlers,
164            self.metrics_logger,
165            self.display_ownership_fut,
166            self.focus_listener_fut,
167        )
168    }
169
170    pub fn add_focus_listener(
171        mut self,
172        incoming: &Incoming,
173        focus_chain_publisher: FocusChainProviderPublisher,
174    ) -> Self {
175        let metrics_logger_clone = self.metrics_logger.clone();
176        let incoming2 = incoming.clone();
177        let focus_listener_fut = Box::pin(async move {
178            if let Ok(mut focus_listener) = FocusListener::new(
179                &incoming2,
180                focus_chain_publisher,
181                metrics_logger_clone,
182            )
183            .map_err(|e| {
184                log::warn!("could not create focus listener, focus will not be dispatched: {:?}", e)
185            }) {
186                // This will await indefinitely and process focus messages in a loop, unless there
187                // is a problem.
188                let _result = focus_listener
189                    .dispatch_focus_changes()
190                    .await
191                    .map(|_| {
192                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
193                    })
194                    .map_err(|e| {
195                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
196                    });
197            }
198        });
199        self.focus_listener_fut = Some(focus_listener_fut);
200        self
201    }
202}
203
/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
///
/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
///
/// # Example
/// ```
/// let ime_handler =
///     ImeHandler::new(scene_manager.session.clone(), scene_manager.compositor_id).await?;
/// let touch_handler = TouchHandler::new(
///     scene_manager.session.clone(),
///     scene_manager.compositor_id,
///     scene_manager.display_size
/// ).await?;
///
/// // Handlers are passed as `Rc<dyn input_handler::BatchInputHandler>`.
/// let assembly = InputPipelineAssembly::new(metrics_logger.clone())
///     .add_handler(ime_handler)
///     .add_handler(touch_handler);
/// let input_pipeline = InputPipeline::new(
///     &incoming,
///     vec![
///         input_device::InputDeviceType::Touch,
///         input_device::InputDeviceType::Keyboard,
///     ],
///     assembly,
///     inspect_node,
///     feature_flags,
///     metrics_logger,
/// )?;
/// input_pipeline.handle_input_events().await;
/// ```
pub struct InputPipeline {
    /// The entry point into the input handler pipeline. Incoming input events should
    /// be inserted into this async queue, and the input pipeline will ensure that they
    /// are propagated through all the input handlers in the appropriate sequence.
    pipeline_sender: UnboundedSender<Vec<input_device::InputEvent>>,

    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
    device_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,

    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
    /// `handle_input_events` forwards everything read here into `pipeline_sender`.
    device_event_receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,

    /// The types of devices this pipeline supports.
    input_device_types: Vec<input_device::InputDeviceType>,

    /// The InputDeviceBindings bound to this pipeline.
    input_device_bindings: InputDeviceBindingHashMap,

    /// This node is bound to the lifetime of this InputPipeline.
    /// Inspect data will be dumped for this pipeline as long as it exists.
    inspect_node: fuchsia_inspect::Node,

    /// The metrics logger.
    metrics_logger: metrics::MetricsLogger,

    /// The feature flags for the input pipeline.
    pub feature_flags: input_device::InputPipelineFeatureFlags,
}
260
261impl InputPipeline {
262    fn new_common(
263        input_device_types: Vec<input_device::InputDeviceType>,
264        assembly: InputPipelineAssembly,
265        inspect_node: fuchsia_inspect::Node,
266        feature_flags: input_device::InputPipelineFeatureFlags,
267    ) -> Self {
268        let (
269            pipeline_sender,
270            receiver,
271            handlers,
272            metrics_logger,
273            display_ownership_fut,
274            focus_listener_fut,
275        ) = assembly.into_components();
276
277        let mut handlers_count = handlers.len();
278        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
279        if let Some(fut) = display_ownership_fut {
280            fasync::Task::local(fut).detach();
281            handlers_count += 1;
282        }
283
284        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
285        if let Some(fut) = focus_listener_fut {
286            fasync::Task::local(fut).detach();
287            handlers_count += 1;
288        }
289
290        // Add properties to inspect node
291        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
292        inspect_node.record_uint("handlers_registered", handlers_count as u64);
293        inspect_node.record_uint("handlers_healthy", handlers_count as u64);
294
295        // Initializes all handlers and starts the input pipeline loop.
296        InputPipeline::run(receiver, handlers, metrics_logger.clone());
297
298        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
299        let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
300        InputPipeline {
301            pipeline_sender,
302            device_event_sender,
303            device_event_receiver,
304            input_device_types,
305            input_device_bindings,
306            inspect_node,
307            metrics_logger,
308            feature_flags,
309        }
310    }
311
312    /// Creates a new [`InputPipeline`] for integration testing.
313    /// Unlike a production input pipeline, this pipeline will not monitor
314    /// `/dev/class/input-report` for devices.
315    ///
316    /// # Parameters
317    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
318    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
319    pub fn new_for_test(
320        input_device_types: Vec<input_device::InputDeviceType>,
321        assembly: InputPipelineAssembly,
322    ) -> Self {
323        let inspector = fuchsia_inspect::Inspector::default();
324        let root = inspector.root();
325        let test_node = root.create_child("input_pipeline");
326        Self::new_common(
327            input_device_types,
328            assembly,
329            test_node,
330            input_device::InputPipelineFeatureFlags { enable_merge_touch_events: false },
331        )
332    }
333
334    /// Creates a new [`InputPipeline`] for production use.
335    ///
336    /// # Parameters
337    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
338    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
339    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
340    pub fn new(
341        incoming: &Incoming,
342        input_device_types: Vec<input_device::InputDeviceType>,
343        assembly: InputPipelineAssembly,
344        inspect_node: fuchsia_inspect::Node,
345        feature_flags: input_device::InputPipelineFeatureFlags,
346        metrics_logger: metrics::MetricsLogger,
347    ) -> Result<Self, Error> {
348        let input_pipeline =
349            Self::new_common(input_device_types, assembly, inspect_node, feature_flags);
350        let input_device_types = input_pipeline.input_device_types.clone();
351        let input_event_sender = input_pipeline.device_event_sender.clone();
352        let input_device_bindings = input_pipeline.input_device_bindings.clone();
353        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
354        let feature_flags = input_pipeline.feature_flags.clone();
355        let incoming = incoming.clone();
356        // This intentionally uses the [`fuchsia_async`] task dispatcher instead of
357        // [`crate::Dispatcher`] -- the directory watcher always uses the fuchsia-async dispatcher.
358        // This is fine for performance because the actual event dispatch is still configured to
359        // run on [`crate::Dispatcher`].
360        fasync::Task::local(async move {
361            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
362            // that send InputEvents to `input_event_receiver`.
363            match async {
364                let (dir_proxy, server) = endpoints::create_proxy::<fio::DirectoryMarker>();
365                incoming.as_ref_directory().open(
366                    input_device::INPUT_REPORT_PATH,
367                    fio::PERM_READABLE,
368                    server.into()
369                )
370                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
371                let device_watcher =
372                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
373                Self::watch_for_devices(
374                    device_watcher,
375                    dir_proxy,
376                    input_device_types,
377                    input_event_sender,
378                    input_device_bindings,
379                    &devices_node,
380                    false, /* break_on_idle */
381                    feature_flags,
382                    metrics_logger.clone(),
383                )
384                .await
385                .context("failed to watch for devices")
386            }
387            .await
388            {
389                Ok(()) => {}
390                Err(err) => {
391                    // This error is usually benign in tests: it means that the setup does not
392                    // support dynamic device discovery. Almost no tests support dynamic
393                    // device discovery, and they also do not need those.
394                    metrics_logger.log_warn(
395                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
396                        std::format!(
397                            "Input pipeline is unable to watch for new input devices: {:?}",
398                            err
399                        ));
400                }
401            }
402        }).detach();
403
404        Ok(input_pipeline)
405    }
406
    /// Gets the input device bindings.
    ///
    /// Returns a reference to the shared map; clone it to obtain another handle to the same
    /// underlying bindings.
    pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
        &self.input_device_bindings
    }
411
    /// Gets the input device sender: this is the channel that should be cloned
    /// and used for injecting events from the drivers into the input pipeline.
    ///
    /// Events sent here are received by `handle_input_events`, which forwards them to the
    /// handlers.
    pub fn input_event_sender(&self) -> &UnboundedSender<Vec<input_device::InputEvent>> {
        &self.device_event_sender
    }
417
    /// Gets a list of input device types supported by this input pipeline.
    ///
    /// This is the list that was supplied when the pipeline was constructed.
    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
        &self.input_device_types
    }
422
423    /// Forwards all input events into the input pipeline.
424    pub async fn handle_input_events(mut self) {
425        let metrics_logger_clone = self.metrics_logger.clone();
426        while let Some(input_event) = self.device_event_receiver.next().await {
427            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
428                metrics_logger_clone.log_error(
429                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
430                    std::format!("could not forward event from driver: {:?}", &e));
431            }
432        }
433
434        metrics_logger_clone.log_error(
435            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
436            "Input pipeline stopped handling input events.".to_string(),
437        );
438    }
439
440    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
441    /// if new devices match a type in `device_types`.
442    ///
443    /// # Parameters
444    /// - `device_watcher`: Watches the input report directory for new devices.
445    /// - `dir_proxy`: The directory containing InputDevice connections.
446    /// - `device_types`: The types of devices to watch for.
447    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
448    /// - `bindings`: Holds all the InputDeviceBindings
449    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
450    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
451    /// - `metrics_logger`: The metrics logger.
452    ///
453    /// # Errors
454    /// If the input report directory or a file within it cannot be read.
455    async fn watch_for_devices(
456        mut device_watcher: Watcher,
457        dir_proxy: fio::DirectoryProxy,
458        device_types: Vec<input_device::InputDeviceType>,
459        input_event_sender: UnboundedSender<Vec<input_device::InputEvent>>,
460        bindings: InputDeviceBindingHashMap,
461        input_devices_node: &fuchsia_inspect::Node,
462        break_on_idle: bool,
463        feature_flags: input_device::InputPipelineFeatureFlags,
464        metrics_logger: metrics::MetricsLogger,
465    ) -> Result<(), Error> {
466        // Add non-static properties to inspect node.
467        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
468        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
469        while let Some(msg) = device_watcher.try_next().await? {
470            if let Ok(filename) = msg.filename.into_os_string().into_string() {
471                if filename == "." {
472                    continue;
473                }
474
475                let pathbuf = PathBuf::from(filename.clone());
476                match msg.event {
477                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
478                        log::info!("found input device {}", filename);
479                        devices_discovered.add(1);
480                        let device_proxy =
481                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
482                        add_device_bindings(
483                            &device_types,
484                            &filename,
485                            device_proxy,
486                            &input_event_sender,
487                            &bindings,
488                            get_next_device_id(),
489                            input_devices_node,
490                            Some(&devices_connected),
491                            feature_flags.clone(),
492                            metrics_logger.clone(),
493                        )
494                        .await;
495                    }
496                    WatchEvent::IDLE => {
497                        if break_on_idle {
498                            break;
499                        }
500                    }
501                    _ => (),
502                }
503            }
504        }
505        // Ensure inspect properties persist for debugging if device watch loop ends.
506        input_devices_node.record(devices_discovered);
507        input_devices_node.record(devices_connected);
508        Err(format_err!("Input pipeline stopped watching for new input devices."))
509    }
510
511    /// Handles the incoming InputDeviceRegistryRequestStream.
512    ///
513    /// This method will end when the request stream is closed. If the stream closes with an
514    /// error the error will be returned in the Result.
515    ///
516    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
517    ///
518    /// # Parameters
519    /// - `stream`: The stream of InputDeviceRegistryRequests.
520    /// - `device_types`: The types of devices to watch for.
521    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
522    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
523    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
524    /// - `metrics_logger`: The metrics logger.
525    pub async fn handle_input_device_registry_request_stream(
526        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
527        device_types: &Vec<input_device::InputDeviceType>,
528        input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
529        bindings: &InputDeviceBindingHashMap,
530        input_devices_node: &fuchsia_inspect::Node,
531        feature_flags: input_device::InputPipelineFeatureFlags,
532        metrics_logger: metrics::MetricsLogger,
533    ) -> Result<(), Error> {
534        while let Some(request) = stream
535            .try_next()
536            .await
537            .context("Error handling input device registry request stream")?
538        {
539            match request {
540                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
541                    device,
542                    ..
543                } => {
544                    // Add a binding if the device is a type being tracked
545                    let device = fidl_next::ClientEnd::<
546                        fidl_next_fuchsia_input_report::InputDevice,
547                        zx::Channel,
548                    >::from_untyped(device.into_channel());
549                    let device = Dispatcher::client_from_zx_channel(device);
550                    let device = device.spawn();
551                    let device_id = get_next_device_id();
552
553                    add_device_bindings(
554                        device_types,
555                        &format!("input-device-registry-{}", device_id),
556                        device,
557                        input_event_sender,
558                        bindings,
559                        device_id,
560                        input_devices_node,
561                        None,
562                        feature_flags.clone(),
563                        metrics_logger.clone(),
564                    )
565                    .await;
566                }
567                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
568                    device,
569                    responder,
570                    .. } => {
571                    // Add a binding if the device is a type being tracked
572                    let device = fidl_next::ClientEnd::<
573                        fidl_next_fuchsia_input_report::InputDevice,
574                        zx::Channel,
575                    >::from_untyped(device.into_channel());
576                    let device = Dispatcher::client_from_zx_channel(device);
577                    let device = device.spawn();
578                    let device_id = get_next_device_id();
579
580                    add_device_bindings(
581                        device_types,
582                        &format!("input-device-registry-{}", device_id),
583                        device,
584                        input_event_sender,
585                        bindings,
586                        device_id,
587                        input_devices_node,
588                        None,
589                        feature_flags.clone(),
590                        metrics_logger.clone(),
591                    )
592                    .await;
593
594                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
595                        device_id: Some(device_id),
596                        ..Default::default()
597                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
598                }
599            }
600        }
601
602        Ok(())
603    }
604
605    /// Initializes all handlers and starts the input pipeline loop in an asynchronous executor.
606    fn run(
607        mut receiver: UnboundedReceiver<Vec<input_device::InputEvent>>,
608        handlers: Vec<Rc<dyn input_handler::BatchInputHandler>>,
609        metrics_logger: metrics::MetricsLogger,
610    ) {
611        Dispatcher::spawn_local(async move {
612            for handler in &handlers {
613                handler.clone().set_handler_healthy();
614            }
615
616            use input_device::InputEventType;
617
618            let mut handlers_by_type: [Vec<Rc<dyn input_handler::BatchInputHandler>>; InputEventType::COUNT] = Default::default();
619
620            // TODO: b/478262850 - We can use supported_input_devices to populate this list.
621            let event_types = vec![
622                InputEventType::Keyboard,
623                InputEventType::LightSensor,
624                InputEventType::ConsumerControls,
625                InputEventType::Mouse,
626                InputEventType::TouchScreen,
627                InputEventType::Touchpad,
628                #[cfg(test)]
629                InputEventType::Fake,
630            ];
631
632            for event_type in event_types {
633                let handlers_for_type: Vec<Rc<dyn input_handler::BatchInputHandler>> = handlers
634                    .iter()
635                    .filter(|h| h.interest().contains(&event_type))
636                    .cloned()
637                    .collect();
638                handlers_by_type[event_type as usize] = handlers_for_type;
639            }
640
641            while let Some(events) = receiver.next().await {
642                if events.is_empty() {
643                    continue;
644                }
645
646                let mut groups_seen = 0;
647                let events = events
648                    .into_iter()
649                    .chunk_by(|e| InputEventType::from(&e.device_event));
650                let events = events.into_iter().map(|(k, v)| (k, v.collect::<Vec<_>>()));
651                for (event_type, event_group) in events {
652                    groups_seen += 1;
653                    if groups_seen == 2 {
654                        metrics_logger.log_error(
655                                InputPipelineErrorMetricDimensionEvent::InputFrameContainsMultipleTypesOfEvents,
656                                "it is not recommended to contain multiple types of events in 1 send".to_string(),
657                            );
658                    }
659                    let mut events_in_group = event_group;
660
661                    // Get pre-computed handlers for this event type.
662                    let handlers = &handlers_by_type[event_type as usize];
663
664                    for handler in handlers {
665                        events_in_group =
666                            handler.clone().handle_input_events(events_in_group).await;
667                    }
668
669                    for event in events_in_group {
670                        if event.handled == input_device::Handled::No {
671                            log::warn!("unhandled input event: {:?}", &event);
672                        }
673                        if let Some(trace_id) = event.trace_id {
674                            fuchsia_trace::flow_end!(
675                                "input",
676                                "event_in_input_pipeline",
677                                trace_id.into()
678                            );
679                        }
680                    }
681                }
682            }
683            for handler in &handlers {
684                handler.clone().set_handler_unhealthy("Pipeline loop terminated");
685            }
686            panic!("Runner task is not supposed to terminate.")
687        }).detach();
688    }
689}
690
691/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
692///
693/// # Parameters
694/// - `device_types`: The types of devices to watch for.
695/// - `device_proxy`: A proxy to the input device.
696/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
697/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
698/// - `device_id`: The device id of the associated bindings.
699/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
700///
701/// # Note
702/// This will create multiple bindings, in the case where
703/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
704///   with multiple table fields populated, and
705/// * multiple populated table fields correspond to device types present in `device_types`
706///
707/// This is used, for example, to support the Atlas touchpad. In that case, a single
708/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
709/// a `fuchsia.input.report.TouchDescriptor`.
710async fn add_device_bindings(
711    device_types: &Vec<input_device::InputDeviceType>,
712    filename: &String,
713    device_proxy: fidl_next::Client<fidl_next_fuchsia_input_report::InputDevice, Transport>,
714    input_event_sender: &UnboundedSender<Vec<input_device::InputEvent>>,
715    bindings: &InputDeviceBindingHashMap,
716    device_id: u32,
717    input_devices_node: &fuchsia_inspect::Node,
718    devices_connected: Option<&fuchsia_inspect::UintProperty>,
719    feature_flags: InputPipelineFeatureFlags,
720    metrics_logger: metrics::MetricsLogger,
721) {
722    let mut matched_device_types = vec![];
723    if let Ok(res) = device_proxy.get_descriptor().await {
724        for device_type in device_types {
725            if input_device::is_device_type(&res.descriptor, *device_type).await {
726                matched_device_types.push(device_type);
727                match devices_connected {
728                    Some(dev_connected) => {
729                        let _ = dev_connected.add(1);
730                    }
731                    None => (),
732                };
733            }
734        }
735        if matched_device_types.is_empty() {
736            log::info!(
737                "device {} did not match any supported device types: {:?}",
738                filename,
739                device_types
740            );
741            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
742            let mut health = fuchsia_inspect::health::Node::new(&device_node);
743            health.set_unhealthy("Unsupported device type.");
744            device_node.record(health);
745            input_devices_node.record(device_node);
746            return;
747        }
748    } else {
749        metrics_logger.clone().log_error(
750            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
751            std::format!("cannot bind device {} without a device descriptor", filename),
752        );
753        return;
754    }
755
756    log::info!(
757        "binding {} to device types: {}",
758        filename,
759        matched_device_types
760            .iter()
761            .fold(String::new(), |device_types_string, device_type| device_types_string
762                + &format!("{:?}, ", device_type))
763    );
764
765    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
766    for device_type in matched_device_types {
767        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
768        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
769        //
770        // There's no conflict in having multiple bindings read from the same node,
771        // since:
772        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
773        // * the device driver will copy each incoming report to each connected reader.
774        //
775        // This does mean that reports from the Atlas touchpad device get read twice
776        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
777        // is operating in mouse mode or touchpad mode.
778        //
779        // This hasn't been an issue because:
780        // * Semantically: things are fine, because each binding discards irrelevant reports.
781        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
782        // * Performance wise: things are fine, because the data rate of the touchpad is low
783        //   (125 HZ).
784        //
785        // If we add additional cases where bindings share an underlying `input-report` node,
786        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
787        let proxy = device_proxy.clone();
788        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
789        match input_device::get_device_binding(
790            *device_type,
791            proxy,
792            device_id,
793            input_event_sender.clone(),
794            device_node,
795            feature_flags.clone(),
796            metrics_logger.clone(),
797        )
798        .await
799        {
800            Ok(binding) => new_bindings.push(binding),
801            Err(e) => {
802                metrics_logger.log_error(
803                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
804                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
805                );
806            }
807        }
808    }
809
810    if !new_bindings.is_empty() {
811        let mut bindings = bindings.lock().await;
812        bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
813    }
814}
815
816#[cfg(test)]
817mod tests {
818    use super::*;
819    use crate::input_device::{InputDeviceBinding, InputEventType};
820    use crate::utils::Position;
821    use crate::{
822        fake_input_device_binding, mouse_binding, mouse_model_database,
823        observe_fake_events_input_handler,
824    };
825    use async_trait::async_trait;
826    use diagnostics_assertions::AnyProperty;
827    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
828    use fuchsia_async as fasync;
829    use futures::FutureExt;
830    use pretty_assertions::assert_eq;
831    use rand::Rng;
832    use sorted_vec_map_rs::SortedVecSet;
833    use vfs::{pseudo_directory, service as pseudo_fs_service};
834
835    const COUNTS_PER_MM: u32 = 12;
836
837    /// Returns the InputEvent sent over `sender`.
838    ///
839    /// # Parameters
840    /// - `sender`: The channel to send the InputEvent over.
841    fn send_input_event(
842        sender: UnboundedSender<Vec<input_device::InputEvent>>,
843    ) -> Vec<input_device::InputEvent> {
844        let mut rng = rand::rng();
845        let offset =
846            Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
847        let input_event = input_device::InputEvent {
848            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
849                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
850                    millimeters: Position {
851                        x: offset.x / COUNTS_PER_MM as f32,
852                        y: offset.y / COUNTS_PER_MM as f32,
853                    },
854                }),
855                None, /* wheel_delta_v */
856                None, /* wheel_delta_h */
857                mouse_binding::MousePhase::Move,
858                SortedVecSet::new(),
859                SortedVecSet::new(),
860                None, /* is_precision_scroll */
861                None, /* wake_lease */
862            )),
863            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
864                mouse_binding::MouseDeviceDescriptor {
865                    device_id: 1,
866                    absolute_x_range: None,
867                    absolute_y_range: None,
868                    wheel_v_range: None,
869                    wheel_h_range: None,
870                    buttons: None,
871                    counts_per_mm: COUNTS_PER_MM,
872                },
873            ),
874            event_time: zx::MonotonicInstant::get(),
875            handled: input_device::Handled::No,
876            trace_id: None,
877        };
878        match sender.unbounded_send(vec![input_event.clone()]) {
879            Err(_) => assert!(false),
880            _ => {}
881        }
882
883        vec![input_event]
884    }
885
886    /// Returns a MouseDescriptor on an InputDeviceRequest.
887    ///
888    /// # Parameters
889    /// - `input_device_request`: The request to handle.
890    fn handle_input_device_request(
891        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
892    ) {
893        match input_device_request {
894            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
895                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
896                    device_information: None,
897                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
898                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
899                            movement_x: None,
900                            movement_y: None,
901                            scroll_v: None,
902                            scroll_h: None,
903                            buttons: Some(vec![0]),
904                            position_x: None,
905                            position_y: None,
906                            ..Default::default()
907                        }),
908                        ..Default::default()
909                    }),
910                    sensor: None,
911                    touch: None,
912                    keyboard: None,
913                    consumer_control: None,
914                    ..Default::default()
915                });
916            }
917            _ => {}
918        }
919    }
920
921    /// Tests that an input pipeline handles events from multiple devices.
922    #[fasync::run_singlethreaded(test)]
923    async fn multiple_devices_single_handler() {
924        // Create two fake device bindings.
925        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
926        let first_device_binding =
927            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
928        let second_device_binding =
929            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
930
931        // Create a fake input handler.
932        let (handler_event_sender, mut handler_event_receiver) =
933            futures::channel::mpsc::channel(100);
934        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
935            handler_event_sender,
936        );
937
938        // Build the input pipeline.
939        let (sender, receiver, handlers, _, _, _) =
940            InputPipelineAssembly::new(metrics::MetricsLogger::default())
941                .add_handler(input_handler)
942                .into_components();
943        let inspector = fuchsia_inspect::Inspector::default();
944        let test_node = inspector.root().create_child("input_pipeline");
945        let input_pipeline = InputPipeline {
946            pipeline_sender: sender,
947            device_event_sender,
948            device_event_receiver,
949            input_device_types: vec![],
950            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
951            inspect_node: test_node,
952            metrics_logger: metrics::MetricsLogger::default(),
953            feature_flags: input_device::InputPipelineFeatureFlags::default(),
954        };
955        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
956
957        // Send an input event from each device.
958        let first_device_events = send_input_event(first_device_binding.input_event_sender());
959        let second_device_events = send_input_event(second_device_binding.input_event_sender());
960
961        // Run the pipeline.
962        fasync::Task::local(async {
963            input_pipeline.handle_input_events().await;
964        })
965        .detach();
966
967        // Assert the handler receives the events.
968        let first_handled_event = handler_event_receiver.next().await;
969        assert_eq!(first_handled_event, first_device_events.into_iter().next());
970
971        let second_handled_event = handler_event_receiver.next().await;
972        assert_eq!(second_handled_event, second_device_events.into_iter().next());
973    }
974
975    /// Tests that an input pipeline handles events through multiple input handlers.
976    #[fasync::run_singlethreaded(test)]
977    async fn single_device_multiple_handlers() {
978        // Create two fake device bindings.
979        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
980        let input_device_binding =
981            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
982
983        // Create two fake input handlers.
984        let (first_handler_event_sender, mut first_handler_event_receiver) =
985            futures::channel::mpsc::channel(100);
986        let first_input_handler =
987            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
988                first_handler_event_sender,
989            );
990        let (second_handler_event_sender, mut second_handler_event_receiver) =
991            futures::channel::mpsc::channel(100);
992        let second_input_handler =
993            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
994                second_handler_event_sender,
995            );
996
997        // Build the input pipeline.
998        let (sender, receiver, handlers, _, _, _) =
999            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1000                .add_handler(first_input_handler)
1001                .add_handler(second_input_handler)
1002                .into_components();
1003        let inspector = fuchsia_inspect::Inspector::default();
1004        let test_node = inspector.root().create_child("input_pipeline");
1005        let input_pipeline = InputPipeline {
1006            pipeline_sender: sender,
1007            device_event_sender,
1008            device_event_receiver,
1009            input_device_types: vec![],
1010            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
1011            inspect_node: test_node,
1012            metrics_logger: metrics::MetricsLogger::default(),
1013            feature_flags: input_device::InputPipelineFeatureFlags::default(),
1014        };
1015        InputPipeline::run(receiver, handlers, metrics::MetricsLogger::default());
1016
1017        // Send an input event.
1018        let input_events = send_input_event(input_device_binding.input_event_sender());
1019
1020        // Run the pipeline.
1021        fasync::Task::local(async {
1022            input_pipeline.handle_input_events().await;
1023        })
1024        .detach();
1025
1026        // Assert both handlers receive the event.
1027        let expected_event = input_events.into_iter().next();
1028        let first_handler_event = first_handler_event_receiver.next().await;
1029        assert_eq!(first_handler_event, expected_event);
1030        let second_handler_event = second_handler_event_receiver.next().await;
1031        assert_eq!(second_handler_event, expected_event);
1032    }
1033
1034    /// Tests that a single mouse device binding is created for the one input device in the
1035    /// input report directory.
1036    #[fasync::run_singlethreaded(test)]
1037    async fn watch_devices_one_match_exists() {
1038        // Create a file in a pseudo directory that represents an input device.
1039        let mut count: i8 = 0;
1040        let dir = pseudo_directory! {
1041            "file_name" => pseudo_fs_service::host(
1042                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1043                    async move {
1044                        while count < 3 {
1045                            if let Some(input_device_request) =
1046                                request_stream.try_next().await.unwrap()
1047                            {
1048                                handle_input_device_request(input_device_request);
1049                                count += 1;
1050                            }
1051                        }
1052
1053                    }.boxed()
1054                },
1055            )
1056        };
1057
1058        // Create a Watcher on the pseudo directory.
1059        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1060        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1061        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1062        // proxy to get connections to input devices.
1063        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1064
1065        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1066        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1067        let supported_device_types = vec![input_device::InputDeviceType::Mouse];
1068
1069        let inspector = fuchsia_inspect::Inspector::default();
1070        let test_node = inspector.root().create_child("input_pipeline");
1071        test_node.record_string(
1072            "supported_input_devices",
1073            supported_device_types.clone().iter().join(", "),
1074        );
1075        let input_devices = test_node.create_child("input_devices");
1076        // Assert that inspect tree is initialized with no devices.
1077        diagnostics_assertions::assert_data_tree!(inspector, root: {
1078            input_pipeline: {
1079                supported_input_devices: "Mouse",
1080                input_devices: {}
1081            }
1082        });
1083
1084        let _ = InputPipeline::watch_for_devices(
1085            device_watcher,
1086            dir_proxy_for_pipeline,
1087            supported_device_types,
1088            input_event_sender,
1089            bindings.clone(),
1090            &input_devices,
1091            true, /* break_on_idle */
1092            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1093            metrics::MetricsLogger::default(),
1094        )
1095        .await;
1096
1097        // Assert that one mouse device with accurate device id was found.
1098        let bindings_hashmap = bindings.lock().await;
1099        assert_eq!(bindings_hashmap.len(), 1);
1100        let bindings_vector = bindings_hashmap.get(&10);
1101        assert!(bindings_vector.is_some());
1102        assert_eq!(bindings_vector.unwrap().len(), 1);
1103        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1104        assert!(boxed_mouse_binding.is_some());
1105        assert_eq!(
1106            boxed_mouse_binding.unwrap().get_device_descriptor(),
1107            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1108                device_id: 10,
1109                absolute_x_range: None,
1110                absolute_y_range: None,
1111                wheel_v_range: None,
1112                wheel_h_range: None,
1113                buttons: Some(vec![0]),
1114                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1115            })
1116        );
1117
1118        // Assert that inspect tree reflects new device discovered and connected.
1119        diagnostics_assertions::assert_data_tree!(inspector, root: {
1120            input_pipeline: {
1121                supported_input_devices: "Mouse",
1122                input_devices: {
1123                    devices_discovered: 1u64,
1124                    devices_connected: 1u64,
1125                    "file_name_Mouse": contains {
1126                        reports_received_count: 0u64,
1127                        reports_filtered_count: 0u64,
1128                        events_generated: 0u64,
1129                        last_received_timestamp_ns: 0u64,
1130                        last_generated_timestamp_ns: 0u64,
1131                        "fuchsia.inspect.Health": {
1132                            status: "OK",
1133                            // Timestamp value is unpredictable and not relevant in this context,
1134                            // so we only assert that the property is present.
1135                            start_timestamp_nanos: AnyProperty
1136                        },
1137                    }
1138                }
1139            }
1140        });
1141    }
1142
1143    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
1144    /// but only a mouse exists.
1145    #[fasync::run_singlethreaded(test)]
1146    async fn watch_devices_no_matches_exist() {
1147        // Create a file in a pseudo directory that represents an input device.
1148        let mut count: i8 = 0;
1149        let dir = pseudo_directory! {
1150            "file_name" => pseudo_fs_service::host(
1151                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1152                    async move {
1153                        while count < 1 {
1154                            if let Some(input_device_request) =
1155                                request_stream.try_next().await.unwrap()
1156                            {
1157                                handle_input_device_request(input_device_request);
1158                                count += 1;
1159                            }
1160                        }
1161
1162                    }.boxed()
1163                },
1164            )
1165        };
1166
1167        // Create a Watcher on the pseudo directory.
1168        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1169        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1170        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1171        // proxy to get connections to input devices.
1172        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1173
1174        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1175        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1176        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1177
1178        let inspector = fuchsia_inspect::Inspector::default();
1179        let test_node = inspector.root().create_child("input_pipeline");
1180        test_node.record_string(
1181            "supported_input_devices",
1182            supported_device_types.clone().iter().join(", "),
1183        );
1184        let input_devices = test_node.create_child("input_devices");
1185        // Assert that inspect tree is initialized with no devices.
1186        diagnostics_assertions::assert_data_tree!(inspector, root: {
1187            input_pipeline: {
1188                supported_input_devices: "Keyboard",
1189                input_devices: {}
1190            }
1191        });
1192
1193        let _ = InputPipeline::watch_for_devices(
1194            device_watcher,
1195            dir_proxy_for_pipeline,
1196            supported_device_types,
1197            input_event_sender,
1198            bindings.clone(),
1199            &input_devices,
1200            true, /* break_on_idle */
1201            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1202            metrics::MetricsLogger::default(),
1203        )
1204        .await;
1205
1206        // Assert that no devices were found.
1207        let bindings = bindings.lock().await;
1208        assert_eq!(bindings.len(), 0);
1209
1210        // Assert that inspect tree reflects new device discovered, but not connected.
1211        diagnostics_assertions::assert_data_tree!(inspector, root: {
1212            input_pipeline: {
1213                supported_input_devices: "Keyboard",
1214                input_devices: {
1215                    devices_discovered: 1u64,
1216                    devices_connected: 0u64,
1217                    "file_name_Unsupported": {
1218                        "fuchsia.inspect.Health": {
1219                            status: "UNHEALTHY",
1220                            message: "Unsupported device type.",
1221                            // Timestamp value is unpredictable and not relevant in this context,
1222                            // so we only assert that the property is present.
1223                            start_timestamp_nanos: AnyProperty
1224                        },
1225                    }
1226                }
1227            }
1228        });
1229    }
1230
1231    /// Tests that a single keyboard device binding is created for the input device registered
1232    /// through InputDeviceRegistry.
1233    #[fasync::run_singlethreaded(test)]
1234    async fn handle_input_device_registry_request_stream() {
1235        let (input_device_registry_proxy, input_device_registry_request_stream) =
1236            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1237        let (input_device_client_end, mut input_device_request_stream) =
1238            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1239
1240        let device_types = vec![input_device::InputDeviceType::Mouse];
1241        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1242        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1243
1244        // Handle input device requests.
1245        let mut count: i8 = 0;
1246        fasync::Task::local(async move {
1247            // Register a device.
1248            let _ = input_device_registry_proxy.register(input_device_client_end);
1249
1250            while count < 3 {
1251                if let Some(input_device_request) =
1252                    input_device_request_stream.try_next().await.unwrap()
1253                {
1254                    handle_input_device_request(input_device_request);
1255                    count += 1;
1256                }
1257            }
1258
1259            // End handle_input_device_registry_request_stream() by taking the event stream.
1260            input_device_registry_proxy.take_event_stream();
1261        })
1262        .detach();
1263
1264        let inspector = fuchsia_inspect::Inspector::default();
1265        let test_node = inspector.root().create_child("input_pipeline");
1266
1267        // Start listening for InputDeviceRegistryRequests.
1268        let bindings_clone = bindings.clone();
1269        let _ = InputPipeline::handle_input_device_registry_request_stream(
1270            input_device_registry_request_stream,
1271            &device_types,
1272            &input_event_sender,
1273            &bindings_clone,
1274            &test_node,
1275            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1276            metrics::MetricsLogger::default(),
1277        )
1278        .await;
1279
1280        // Assert that a device was registered.
1281        let bindings = bindings.lock().await;
1282        assert_eq!(bindings.len(), 1);
1283    }
1284
1285    // Tests that correct properties are added to inspect node when InputPipeline is created.
1286    #[fasync::run_singlethreaded(test)]
1287    async fn check_inspect_node_has_correct_properties() {
1288        let device_types = vec![
1289            input_device::InputDeviceType::Touch,
1290            input_device::InputDeviceType::ConsumerControls,
1291        ];
1292        let inspector = fuchsia_inspect::Inspector::default();
1293        let test_node = inspector.root().create_child("input_pipeline");
1294        // Create fake input handler for assembly
1295        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1296            futures::channel::mpsc::channel(100);
1297        let fake_input_handler =
1298            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1299                fake_handler_event_sender,
1300            );
1301        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1302            .add_handler(fake_input_handler);
1303        let _test_input_pipeline = InputPipeline::new(
1304            &Incoming::new(),
1305            device_types,
1306            assembly,
1307            test_node,
1308            InputPipelineFeatureFlags { enable_merge_touch_events: false },
1309            metrics::MetricsLogger::default(),
1310        );
1311        diagnostics_assertions::assert_data_tree!(inspector, root: {
1312            input_pipeline: {
1313                supported_input_devices: "Touch, ConsumerControls",
1314                handlers_registered: 1u64,
1315                handlers_healthy: 1u64,
1316                input_devices: {}
1317            }
1318        });
1319    }
1320
1321    struct SpecificInterestFakeHandler {
1322        interest_types: Vec<input_device::InputEventType>,
1323        event_sender: std::cell::RefCell<futures::channel::mpsc::Sender<input_device::InputEvent>>,
1324    }
1325
1326    impl SpecificInterestFakeHandler {
1327        pub fn new(
1328            interest_types: Vec<input_device::InputEventType>,
1329            event_sender: futures::channel::mpsc::Sender<input_device::InputEvent>,
1330        ) -> Rc<Self> {
1331            Rc::new(SpecificInterestFakeHandler {
1332                interest_types,
1333                event_sender: std::cell::RefCell::new(event_sender),
1334            })
1335        }
1336    }
1337
1338    impl Handler for SpecificInterestFakeHandler {
1339        fn set_handler_healthy(self: std::rc::Rc<Self>) {}
1340        fn set_handler_unhealthy(self: std::rc::Rc<Self>, _msg: &str) {}
1341        fn get_name(&self) -> &'static str {
1342            "SpecificInterestFakeHandler"
1343        }
1344
1345        fn interest(&self) -> Vec<input_device::InputEventType> {
1346            self.interest_types.clone()
1347        }
1348    }
1349
1350    #[async_trait(?Send)]
1351    impl input_handler::InputHandler for SpecificInterestFakeHandler {
1352        async fn handle_input_event(
1353            self: Rc<Self>,
1354            input_event: input_device::InputEvent,
1355        ) -> Vec<input_device::InputEvent> {
1356            match self.event_sender.borrow_mut().try_send(input_event.clone()) {
1357                Err(e) => panic!("SpecificInterestFakeHandler failed to send event: {:?}", e),
1358                Ok(_) => {}
1359            }
1360            vec![input_event]
1361        }
1362    }
1363
1364    #[fasync::run_singlethreaded(test)]
1365    async fn run_only_sends_events_to_interested_handlers() {
1366        // Mouse Handler (Specific Interest: Mouse)
1367        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(1);
1368        let mouse_handler =
1369            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1370
1371        // Fake Handler (Specific Interest: Fake)
1372        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(1);
1373        let fake_handler =
1374            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1375
1376        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1377            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1378                .add_handler(mouse_handler)
1379                .add_handler(fake_handler)
1380                .into_components();
1381
1382        // Run the pipeline logic
1383        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1384
1385        // Create a Fake event
1386        let fake_event = input_device::InputEvent {
1387            device_event: input_device::InputDeviceEvent::Fake,
1388            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1389            event_time: zx::MonotonicInstant::get(),
1390            handled: input_device::Handled::No,
1391            trace_id: None,
1392        };
1393
1394        // Send the Fake event
1395        pipeline_sender.unbounded_send(vec![fake_event.clone()]).expect("failed to send event");
1396
1397        // Verify Fake Handler received it
1398        let received_by_fake = fake_receiver.next().await;
1399        assert_eq!(received_by_fake, Some(fake_event));
1400
1401        // Verify Mouse Handler did NOT receive it
1402        assert!(mouse_receiver.try_next().is_err());
1403    }
1404
1405    fn create_mouse_event(x: f32, y: f32) -> input_device::InputEvent {
1406        input_device::InputEvent {
1407            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
1408                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
1409                    millimeters: Position { x, y },
1410                }),
1411                None,
1412                None,
1413                mouse_binding::MousePhase::Move,
1414                SortedVecSet::new(),
1415                SortedVecSet::new(),
1416                None,
1417                None,
1418            )),
1419            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
1420                mouse_binding::MouseDeviceDescriptor {
1421                    device_id: 1,
1422                    absolute_x_range: None,
1423                    absolute_y_range: None,
1424                    wheel_v_range: None,
1425                    wheel_h_range: None,
1426                    buttons: None,
1427                    counts_per_mm: 1,
1428                },
1429            ),
1430            event_time: zx::MonotonicInstant::get(),
1431            handled: input_device::Handled::No,
1432            trace_id: None,
1433        }
1434    }
1435
1436    #[fasync::run_singlethreaded(test)]
1437    async fn run_mixed_event_types_dispatched_correctly() {
1438        // Mouse Handler (Specific Interest: Mouse)
1439        let (mouse_sender, mut mouse_receiver) = futures::channel::mpsc::channel(10);
1440        let mouse_handler =
1441            SpecificInterestFakeHandler::new(vec![InputEventType::Mouse], mouse_sender);
1442
1443        // Fake Handler (Specific Interest: Fake)
1444        let (fake_sender, mut fake_receiver) = futures::channel::mpsc::channel(10);
1445        let fake_handler =
1446            SpecificInterestFakeHandler::new(vec![InputEventType::Fake], fake_sender);
1447
1448        let (pipeline_sender, pipeline_receiver, handlers, _, _, _) =
1449            InputPipelineAssembly::new(metrics::MetricsLogger::default())
1450                .add_handler(mouse_handler)
1451                .add_handler(fake_handler)
1452                .into_components();
1453
1454        // Run the pipeline logic
1455        InputPipeline::run(pipeline_receiver, handlers, metrics::MetricsLogger::default());
1456
1457        // Create events
1458        let mouse_event_1 = create_mouse_event(1.0, 1.0);
1459        let mouse_event_2 = create_mouse_event(2.0, 2.0);
1460        let mouse_event_3 = create_mouse_event(3.0, 3.0);
1461
1462        let fake_event_1 = input_device::InputEvent {
1463            device_event: input_device::InputDeviceEvent::Fake,
1464            device_descriptor: input_device::InputDeviceDescriptor::Fake,
1465            event_time: zx::MonotonicInstant::get(),
1466            handled: input_device::Handled::No,
1467            trace_id: None,
1468        };
1469
1470        // Send mixed batch: [Mouse, Mouse, Fake, Mouse]
1471        // This should result in 3 chunks: [Mouse, Mouse], [Fake], [Mouse]
1472        let mixed_batch = vec![
1473            mouse_event_1.clone(),
1474            mouse_event_2.clone(),
1475            fake_event_1.clone(),
1476            mouse_event_3.clone(),
1477        ];
1478        pipeline_sender.unbounded_send(mixed_batch).expect("failed to send events");
1479
1480        // Verify Mouse Handler received M1, M2, and then M3
1481        assert_eq!(mouse_receiver.next().await, Some(mouse_event_1));
1482        assert_eq!(mouse_receiver.next().await, Some(mouse_event_2));
1483        assert_eq!(mouse_receiver.next().await, Some(mouse_event_3));
1484
1485        // Verify Fake Handler received F1
1486        assert_eq!(fake_receiver.next().await, Some(fake_event_1));
1487    }
1488}