input_pipeline/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::autorepeater::Autorepeater;
6use crate::display_ownership::DisplayOwnership;
7use crate::focus_listener::FocusListener;
8use crate::{input_device, input_handler, metrics};
9use anyhow::{format_err, Context, Error};
10use focus_chain_provider::FocusChainProviderPublisher;
11use fuchsia_fs::directory::{WatchEvent, Watcher};
12use fuchsia_inspect::health::Reporter;
13use fuchsia_inspect::NumericProperty;
14use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
15use futures::lock::Mutex;
16use futures::{StreamExt, TryStreamExt};
17use itertools::Itertools;
18use metrics_registry::*;
19use std::collections::HashMap;
20use std::path::PathBuf;
21use std::rc::Rc;
22use std::sync::atomic::{AtomicU32, Ordering};
23use std::sync::{Arc, LazyLock};
24use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
25
/// First id handed out by [`get_next_device_id`].
///
/// Ids below this value are left free to avoid clashing with the default devices in
/// Starnix, which currently uses 0 and 1 as its default devices' ids. Starnix needs
/// those default devices to deliver events from physical devices until there is an
/// API that exposes device changes to UI clients.
const FIRST_DEVICE_ID: u32 = 10;

/// Self-incrementing `u32` counter used as the source of unique `device_id` values.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> =
    LazyLock::new(|| AtomicU32::new(FIRST_DEVICE_ID));

/// Hands out the next device id.
///
/// Every call returns the counter's current value and then bumps the counter by
/// one, so consecutive calls yield consecutive ids.
fn get_next_device_id() -> u32 {
    let counter: &AtomicU32 = &NEXT_DEVICE_ID;
    counter.fetch_add(1, Ordering::SeqCst)
}
40
/// A boxed, dynamically dispatched [`input_device::InputDeviceBinding`].
type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;

/// An [`InputDeviceBindingHashMap`] maps an input device to one or more InputDeviceBindings.
/// The key is the device's unique id. The map is wrapped in `Arc<Mutex<..>>` so that
/// clones of the handle all refer to the same set of bindings.
pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
46
/// An input pipeline assembly.
///
/// Represents a partial stage of the input pipeline which accepts inputs through an asynchronous
/// sender channel, and emits outputs through an asynchronous receiver channel.  Use [new] to
/// create a new assembly.  Use [add_handler], or [add_all_handlers] to add the input pipeline
/// handlers to use.  When done, [InputPipeline::new] can be used to make a new input pipeline.
///
/// # Implementation notes
///
/// Internally, when a new [InputPipelineAssembly] is created with multiple [InputHandler]s, the
/// handlers are connected together using unbounded async queues.  This allows fully streamed
/// processing of input events, and also allows some pipeline stages to generate events
/// spontaneously, i.e. without an external stimulus.
pub struct InputPipelineAssembly {
    /// The top-level sender: send into this queue to inject an event into the input
    /// pipeline.
    sender: UnboundedSender<input_device::InputEvent>,
    /// The bottom-level receiver: any events that fall through the entire pipeline can
    /// be read from this receiver.  See [InputPipeline::catch_unhandled] for a canned way
    /// to catch and log unhandled events.
    receiver: UnboundedReceiver<input_device::InputEvent>,
    /// The tasks accumulated by [add_handler] and the other `add_*` methods, one per
    /// call.  Use [into_components] to get the tasks.  See [InputPipeline::run] for a
    /// canned way to drive these tasks.
    tasks: Vec<fuchsia_async::Task<()>>,

    /// The metrics logger.  A clone is handed to the tasks that report errors.
    metrics_logger: metrics::MetricsLogger,
}
76
77impl InputPipelineAssembly {
78    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
79    /// to add new handlers to it.
80    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
81        let (sender, receiver) = mpsc::unbounded();
82        let tasks = vec![];
83        InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
84    }
85
86    /// Adds another [input_handler::InputHandler] into the [InputPipelineAssembly]. The handlers
87    /// are invoked in the order they are added, and successive handlers are glued together using
88    /// unbounded queues.  Returns `Self` for chaining.
89    pub fn add_handler(self, handler: Rc<dyn input_handler::InputHandler>) -> Self {
90        let (sender, mut receiver, mut tasks, metrics_logger) = self.into_components();
91        let metrics_logger_clone = metrics_logger.clone();
92        let (next_sender, next_receiver) = mpsc::unbounded();
93        let handler_name = handler.get_name();
94        tasks.push(fasync::Task::local(async move {
95            handler.clone().set_handler_healthy();
96            while let Some(event) = receiver.next().await {
97                // Note: the `handler_name` _should not_ be used as ABI (e.g. referenced from
98                // data processing scripts), as `handler_name` is not guaranteed to be consistent
99                // between releases.
100                let out_events = {
101                    let _async_trace = fuchsia_trace::async_enter!(
102                        fuchsia_trace::Id::new(),
103                        c"input",
104                        c"handle_input_event",
105                        "name" => handler_name
106                    );
107                    handler.clone().handle_input_event(event).await
108                };
109                for out_event in out_events.into_iter() {
110                    if let Err(e) = next_sender.unbounded_send(out_event) {
111                        metrics_logger_clone.log_error(
112                            InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEvent,
113                            std::format!(
114                                "could not forward event output from handler: {:?}: {:?}",
115                                handler_name,
116                                e));
117                        // This is not a recoverable error, break here.
118                        break;
119                    }
120                }
121            }
122            handler.clone().set_handler_unhealthy(std::format!("Receive loop terminated for handler: {:?}", handler_name).as_str());
123            panic!("receive loop is not supposed to terminate for handler: {:?}", handler_name);
124        }));
125        receiver = next_receiver;
126        InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
127    }
128
129    /// Adds all handlers into the assembly in the order they appear in `handlers`.
130    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::InputHandler>>) -> Self {
131        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
132    }
133
134    /// Adds the [DisplayOwnership] to the input pipeline.  The `display_ownership_event` is
135    /// assumed to be the Scenic event used to report changes in display ownership, obtained
136    /// by `fuchsia.ui.scenic/Scenic.GetDisplayOwnershipEvent`. This code has no way to check
137    /// whether that invariant is upheld, so this is something that the user will need to
138    /// ensure.
139    pub fn add_display_ownership(
140        self,
141        display_ownership_event: zx::Event,
142        input_handlers_node: &fuchsia_inspect::Node,
143    ) -> InputPipelineAssembly {
144        let (sender, autorepeat_receiver, mut tasks, metrics_logger) = self.into_components();
145        let (autorepeat_sender, receiver) = mpsc::unbounded();
146        let h = DisplayOwnership::new(display_ownership_event, input_handlers_node);
147        let metrics_logger_clone = metrics_logger.clone();
148        tasks.push(fasync::Task::local(async move {
149            h.clone().set_handler_healthy();
150            h.clone().handle_input_events(autorepeat_receiver, autorepeat_sender)
151                .await
152                .map_err(|e| {
153                    metrics_logger_clone.log_error(
154                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
155                        std::format!(
156                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
157                }).unwrap();
158            h.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
159        }));
160        InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
161    }
162
163    /// Adds the autorepeater into the input pipeline assembly.  The autorepeater
164    /// is installed after any handlers that have been already added to the
165    /// assembly.
166    pub fn add_autorepeater(self, input_handlers_node: &fuchsia_inspect::Node) -> Self {
167        let (sender, autorepeat_receiver, mut tasks, metrics_logger) = self.into_components();
168        let (autorepeat_sender, receiver) = mpsc::unbounded();
169        let metrics_logger_clone = metrics_logger.clone();
170        let a = Autorepeater::new(autorepeat_receiver, input_handlers_node, metrics_logger.clone());
171        tasks.push(fasync::Task::local(async move {
172            a.clone().set_handler_healthy();
173            a.clone()
174                .run(autorepeat_sender)
175                .await
176                .map_err(|e| {
177                    metrics_logger_clone.log_error(
178                        InputPipelineErrorMetricDimensionEvent::InputPipelineAutorepeatRunningError,
179                        std::format!("error while running autorepeater: {:?}", e),
180                    );
181                })
182                .expect("autorepeater should never error out");
183            a.set_handler_unhealthy("Receive loop terminated for handler: Autorepeater");
184        }));
185        InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
186    }
187
188    /// Deconstructs the assembly into constituent components, used when constructing
189    /// [InputPipeline].
190    ///
191    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
192    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
193    fn into_components(
194        self,
195    ) -> (
196        UnboundedSender<input_device::InputEvent>,
197        UnboundedReceiver<input_device::InputEvent>,
198        Vec<fuchsia_async::Task<()>>,
199        metrics::MetricsLogger,
200    ) {
201        (self.sender, self.receiver, self.tasks, self.metrics_logger)
202    }
203
204    /// Adds a focus listener task into the input pipeline assembly.  The focus
205    /// listener forwards focus chain changes to
206    /// `fuchsia.ui.keyboard.focus.Controller` and watchers of
207    /// `fuchsia.ui.focus.FocusChainProvider`.  It is required for the correct
208    /// operation of the implementors of those protocols, e.g. `text_manager`.
209    ///
210    /// # Arguments:
211    /// * `focus_chain_publisher`: to forward to other downstream watchers.
212    ///
213    /// # Requires:
214    /// * `fuchsia.ui.views.FocusChainListenerRegistry`: to register for updates.
215    /// * `fuchsia.ui.keyboard.focus.Controller`: to forward to text_manager.
216    pub fn add_focus_listener(self, focus_chain_publisher: FocusChainProviderPublisher) -> Self {
217        let (sender, receiver, mut tasks, metrics_logger) = self.into_components();
218        let metrics_logger_clone = metrics_logger.clone();
219        tasks.push(fasync::Task::local(async move {
220            if let Ok(mut focus_listener) =
221                FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
222                    log::warn!(
223                        "could not create focus listener, focus will not be dispatched: {:?}",
224                        e
225                    )
226                })
227            {
228                // This will await indefinitely and process focus messages in a loop, unless there
229                // is a problem.
230                let _result = focus_listener
231                    .dispatch_focus_changes()
232                    .await
233                    .map(|_| {
234                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
235                    })
236                    .map_err(|e| {
237                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
238                    });
239            }
240        }));
241        InputPipelineAssembly { sender, receiver, tasks, metrics_logger }
242    }
243}
244
/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
///
/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
///
/// # Example
/// ```
/// let ime_handler =
///     ImeHandler::new(scene_manager.session.clone(), scene_manager.compositor_id).await?;
/// let touch_handler = TouchHandler::new(
///     scene_manager.session.clone(),
///     scene_manager.compositor_id,
///     scene_manager.display_size
/// ).await?;
///
/// // Each handler passed to `add_handler` must be an `Rc<dyn input_handler::InputHandler>`.
/// let assembly = InputPipelineAssembly::new(metrics_logger.clone())
///     .add_handler(ime_handler)
///     .add_handler(touch_handler);
/// let input_pipeline = InputPipeline::new(
///     vec![
///         input_device::InputDeviceType::Touch,
///         input_device::InputDeviceType::Keyboard,
///     ],
///     assembly,
///     inspect_node,
///     metrics_logger,
/// )?;
/// input_pipeline.handle_input_events().await;
/// ```
pub struct InputPipeline {
    /// The entry point into the input handler pipeline. Incoming input events should
    /// be inserted into this async queue, and the input pipeline will ensure that they
    /// are propagated through all the input handlers in the appropriate sequence.
    pipeline_sender: UnboundedSender<input_device::InputEvent>,

    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
    device_event_sender: UnboundedSender<input_device::InputEvent>,

    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
    /// `handle_input_events` drains this receiver into `pipeline_sender`.
    device_event_receiver: UnboundedReceiver<input_device::InputEvent>,

    /// The types of devices this pipeline supports.
    input_device_types: Vec<input_device::InputDeviceType>,

    /// The InputDeviceBindings bound to this pipeline, keyed by device id.
    input_device_bindings: InputDeviceBindingHashMap,

    /// This node is bound to the lifetime of this InputPipeline.
    /// Inspect data will be dumped for this pipeline as long as it exists.
    inspect_node: fuchsia_inspect::Node,

    /// The metrics logger, taken from the assembly the pipeline was built from.
    metrics_logger: metrics::MetricsLogger,
}
298
299impl InputPipeline {
300    /// Does the work that is common to building an input pipeline, across
301    /// the integration-test and production configurations.
302    fn new_common(
303        input_device_types: Vec<input_device::InputDeviceType>,
304        assembly: InputPipelineAssembly,
305        inspect_node: fuchsia_inspect::Node,
306    ) -> Self {
307        let (pipeline_sender, receiver, tasks, metrics_logger) = assembly.into_components();
308
309        // Add properties to inspect node
310        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
311        inspect_node.record_uint("handlers_registered", tasks.len() as u64);
312        inspect_node.record_uint("handlers_healthy", tasks.len() as u64);
313
314        // Add a stage that catches events which drop all the way down through the pipeline
315        // and logs them.
316        InputPipeline::catch_unhandled(receiver);
317
318        // The tasks in the assembly are all unstarted.  Run them now.
319        InputPipeline::run(tasks);
320
321        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
322        let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
323        InputPipeline {
324            pipeline_sender,
325            device_event_sender,
326            device_event_receiver,
327            input_device_types,
328            input_device_bindings,
329            inspect_node,
330            metrics_logger,
331        }
332    }
333
334    /// Creates a new [`InputPipeline`] for integration testing.
335    /// Unlike a production input pipeline, this pipeline will not monitor
336    /// `/dev/class/input-report` for devices.
337    ///
338    /// # Parameters
339    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
340    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
341    pub fn new_for_test(
342        input_device_types: Vec<input_device::InputDeviceType>,
343        assembly: InputPipelineAssembly,
344    ) -> Self {
345        let inspector = fuchsia_inspect::Inspector::default();
346        let root = inspector.root();
347        let test_node = root.create_child("input_pipeline");
348        Self::new_common(input_device_types, assembly, test_node)
349    }
350
351    /// Creates a new [`InputPipeline`] for production use.
352    ///
353    /// # Parameters
354    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
355    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
356    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
357    pub fn new(
358        input_device_types: Vec<input_device::InputDeviceType>,
359        assembly: InputPipelineAssembly,
360        inspect_node: fuchsia_inspect::Node,
361        metrics_logger: metrics::MetricsLogger,
362    ) -> Result<Self, Error> {
363        let input_pipeline = Self::new_common(input_device_types, assembly, inspect_node);
364        let input_device_types = input_pipeline.input_device_types.clone();
365        let input_event_sender = input_pipeline.device_event_sender.clone();
366        let input_device_bindings = input_pipeline.input_device_bindings.clone();
367        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
368        fasync::Task::local(async move {
369            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
370            // that send InputEvents to `input_event_receiver`.
371            match async {
372                let dir_proxy = fuchsia_fs::directory::open_in_namespace(
373                    input_device::INPUT_REPORT_PATH,
374                    fuchsia_fs::PERM_READABLE,
375                )
376                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
377                let device_watcher =
378                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
379                Self::watch_for_devices(
380                    device_watcher,
381                    dir_proxy,
382                    input_device_types,
383                    input_event_sender,
384                    input_device_bindings,
385                    &devices_node,
386                    false, /* break_on_idle */
387                    metrics_logger.clone(),
388                )
389                .await
390                .context("failed to watch for devices")
391            }
392            .await
393            {
394                Ok(()) => {}
395                Err(err) => {
396                    // This error is usually benign in tests: it means that the setup does not
397                    // support dynamic device discovery. Almost no tests support dynamic
398                    // device discovery, and they also do not need those.
399                    metrics_logger.log_warn(
400                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
401                        std::format!(
402                            "Input pipeline is unable to watch for new input devices: {:?}",
403                            err
404                        ));
405                }
406            }
407        })
408        .detach();
409
410        Ok(input_pipeline)
411    }
412
413    /// Gets the input device bindings.
414    pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
415        &self.input_device_bindings
416    }
417
418    /// Gets the input device sender: this is the channel that should be cloned
419    /// and used for injecting events from the drivers into the input pipeline.
420    pub fn input_event_sender(&self) -> &UnboundedSender<input_device::InputEvent> {
421        &self.device_event_sender
422    }
423
424    /// Gets a list of input device types supported by this input pipeline.
425    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
426        &self.input_device_types
427    }
428
429    /// Forwards all input events into the input pipeline.
430    pub async fn handle_input_events(mut self) {
431        let metrics_logger_clone = self.metrics_logger.clone();
432        while let Some(input_event) = self.device_event_receiver.next().await {
433            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
434                metrics_logger_clone.log_error(
435                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
436                    std::format!("could not forward event from driver: {:?}", &e));
437            }
438        }
439
440        metrics_logger_clone.log_error(
441            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
442            "Input pipeline stopped handling input events.".to_string(),
443        );
444    }
445
446    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
447    /// if new devices match a type in `device_types`.
448    ///
449    /// # Parameters
450    /// - `device_watcher`: Watches the input report directory for new devices.
451    /// - `dir_proxy`: The directory containing InputDevice connections.
452    /// - `device_types`: The types of devices to watch for.
453    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
454    /// - `bindings`: Holds all the InputDeviceBindings
455    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
456    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
457    /// - `metrics_logger`: The metrics logger.
458    ///
459    /// # Errors
460    /// If the input report directory or a file within it cannot be read.
461    async fn watch_for_devices(
462        mut device_watcher: Watcher,
463        dir_proxy: fio::DirectoryProxy,
464        device_types: Vec<input_device::InputDeviceType>,
465        input_event_sender: UnboundedSender<input_device::InputEvent>,
466        bindings: InputDeviceBindingHashMap,
467        input_devices_node: &fuchsia_inspect::Node,
468        break_on_idle: bool,
469        metrics_logger: metrics::MetricsLogger,
470    ) -> Result<(), Error> {
471        // Add non-static properties to inspect node.
472        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
473        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
474        while let Some(msg) = device_watcher.try_next().await? {
475            if let Ok(filename) = msg.filename.into_os_string().into_string() {
476                if filename == "." {
477                    continue;
478                }
479
480                let pathbuf = PathBuf::from(filename.clone());
481                match msg.event {
482                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
483                        log::info!("found input device {}", filename);
484                        devices_discovered.add(1);
485                        let device_proxy =
486                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
487                        add_device_bindings(
488                            &device_types,
489                            &filename,
490                            device_proxy,
491                            &input_event_sender,
492                            &bindings,
493                            get_next_device_id(),
494                            input_devices_node,
495                            Some(&devices_connected),
496                            metrics_logger.clone(),
497                        )
498                        .await;
499                    }
500                    WatchEvent::IDLE => {
501                        if break_on_idle {
502                            break;
503                        }
504                    }
505                    _ => (),
506                }
507            }
508        }
509        // Ensure inspect properties persist for debugging if device watch loop ends.
510        input_devices_node.record(devices_discovered);
511        input_devices_node.record(devices_connected);
512        Err(format_err!("Input pipeline stopped watching for new input devices."))
513    }
514
515    /// Handles the incoming InputDeviceRegistryRequestStream.
516    ///
517    /// This method will end when the request stream is closed. If the stream closes with an
518    /// error the error will be returned in the Result.
519    ///
520    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
521    ///
522    /// # Parameters
523    /// - `stream`: The stream of InputDeviceRegistryRequests.
524    /// - `device_types`: The types of devices to watch for.
525    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
526    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
527    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
528    /// - `metrics_logger`: The metrics logger.
529    pub async fn handle_input_device_registry_request_stream(
530        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
531        device_types: &Vec<input_device::InputDeviceType>,
532        input_event_sender: &UnboundedSender<input_device::InputEvent>,
533        bindings: &InputDeviceBindingHashMap,
534        input_devices_node: &fuchsia_inspect::Node,
535        metrics_logger: metrics::MetricsLogger,
536    ) -> Result<(), Error> {
537        while let Some(request) = stream
538            .try_next()
539            .await
540            .context("Error handling input device registry request stream")?
541        {
542            match request {
543                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
544                    device,
545                    ..
546                } => {
547                    // Add a binding if the device is a type being tracked
548                    let device_proxy = device.into_proxy();
549
550                    let device_id = get_next_device_id();
551
552                    add_device_bindings(
553                        device_types,
554                        &format!("input-device-registry-{}", device_id),
555                        device_proxy,
556                        input_event_sender,
557                        bindings,
558                        device_id,
559                        input_devices_node,
560                        None,
561                        metrics_logger.clone(),
562                    )
563                    .await;
564                }
565                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
566                    device,
567                    responder,
568                    .. } => {
569                    // Add a binding if the device is a type being tracked
570                    let device_proxy = device.into_proxy();
571
572                    let device_id = get_next_device_id();
573
574                    add_device_bindings(
575                        device_types,
576                        &format!("input-device-registry-{}", device_id),
577                        device_proxy,
578                        input_event_sender,
579                        bindings,
580                        device_id,
581                        input_devices_node,
582                        None,
583                        metrics_logger.clone(),
584                    )
585                    .await;
586
587                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
588                        device_id: Some(device_id),
589                        ..Default::default()
590                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
591                }
592            }
593        }
594
595        Ok(())
596    }
597
598    /// Starts all tasks in an asynchronous executor.
599    fn run(tasks: Vec<fuchsia_async::Task<()>>) {
600        fasync::Task::local(async move {
601            futures::future::join_all(tasks).await;
602            panic!("Runner task is not supposed to terminate.")
603        })
604        .detach();
605    }
606
607    /// Installs a handler that will print a warning for each event that is received
608    /// unhandled from this receiver.
609    fn catch_unhandled(mut receiver: UnboundedReceiver<input_device::InputEvent>) {
610        fasync::Task::local(async move {
611            while let Some(event) = receiver.next().await {
612                if event.handled == input_device::Handled::No {
613                    log::warn!("unhandled input event: {:?}", &event);
614                }
615            }
616            panic!("unhandled event catcher is not supposed to terminate.");
617        })
618        .detach();
619    }
620}
621
622/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
623///
624/// # Parameters
625/// - `device_types`: The types of devices to watch for.
626/// - `device_proxy`: A proxy to the input device.
627/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
628/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
629/// - `device_id`: The device id of the associated bindings.
630/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
631///
632/// # Note
633/// This will create multiple bindings, in the case where
634/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
635///   with multiple table fields populated, and
636/// * multiple populated table fields correspond to device types present in `device_types`
637///
638/// This is used, for example, to support the Atlas touchpad. In that case, a single
639/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
640/// a `fuchsia.input.report.TouchDescriptor`.
641async fn add_device_bindings(
642    device_types: &Vec<input_device::InputDeviceType>,
643    filename: &String,
644    device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
645    input_event_sender: &UnboundedSender<input_device::InputEvent>,
646    bindings: &InputDeviceBindingHashMap,
647    device_id: u32,
648    input_devices_node: &fuchsia_inspect::Node,
649    devices_connected: Option<&fuchsia_inspect::UintProperty>,
650    metrics_logger: metrics::MetricsLogger,
651) {
652    let mut matched_device_types = vec![];
653    if let Ok(descriptor) = device_proxy.get_descriptor().await {
654        for device_type in device_types {
655            if input_device::is_device_type(&descriptor, *device_type).await {
656                matched_device_types.push(device_type);
657                match devices_connected {
658                    Some(dev_connected) => {
659                        let _ = dev_connected.add(1);
660                    }
661                    None => (),
662                };
663            }
664        }
665        if matched_device_types.is_empty() {
666            log::info!(
667                "device {} did not match any supported device types: {:?}",
668                filename,
669                device_types
670            );
671            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
672            let mut health = fuchsia_inspect::health::Node::new(&device_node);
673            health.set_unhealthy("Unsupported device type.");
674            device_node.record(health);
675            input_devices_node.record(device_node);
676            return;
677        }
678    } else {
679        metrics_logger.clone().log_error(
680            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
681            std::format!("cannot bind device {} without a device descriptor", filename),
682        );
683        return;
684    }
685
686    log::info!(
687        "binding {} to device types: {}",
688        filename,
689        matched_device_types
690            .iter()
691            .fold(String::new(), |device_types_string, device_type| device_types_string
692                + &format!("{:?}, ", device_type))
693    );
694
695    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
696    for device_type in matched_device_types {
697        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
698        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
699        //
700        // There's no conflict in having multiple bindings read from the same node,
701        // since:
702        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
703        // * the device driver will copy each incoming report to each connected reader.
704        //
705        // This does mean that reports from the Atlas touchpad device get read twice
706        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
707        // is operating in mouse mode or touchpad mode.
708        //
709        // This hasn't been an issue because:
710        // * Semantically: things are fine, because each binding discards irrelevant reports.
711        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
712        // * Performance wise: things are fine, because the data rate of the touchpad is low
713        //   (125 HZ).
714        //
715        // If we add additional cases where bindings share an underlying `input-report` node,
716        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
717        let proxy = device_proxy.clone();
718        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
719        match input_device::get_device_binding(
720            *device_type,
721            proxy,
722            device_id,
723            input_event_sender.clone(),
724            device_node,
725            metrics_logger.clone(),
726        )
727        .await
728        {
729            Ok(binding) => new_bindings.push(binding),
730            Err(e) => {
731                metrics_logger.log_error(
732                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
733                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
734                );
735            }
736        }
737    }
738
739    if !new_bindings.is_empty() {
740        let mut bindings = bindings.lock().await;
741        bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
742    }
743}
744
745#[cfg(test)]
746mod tests {
747    use super::*;
748    use crate::input_device::InputDeviceBinding;
749    use crate::utils::Position;
750    use crate::{
751        fake_input_device_binding, mouse_binding, mouse_model_database,
752        observe_fake_events_input_handler,
753    };
754    use diagnostics_assertions::AnyProperty;
755    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
756    use fuchsia_async as fasync;
757    use futures::FutureExt;
758    use pretty_assertions::assert_eq;
759    use rand::Rng;
760    use std::collections::HashSet;
761    use vfs::{pseudo_directory, service as pseudo_fs_service};
762
763    const COUNTS_PER_MM: u32 = 12;
764
765    /// Returns the InputEvent sent over `sender`.
766    ///
767    /// # Parameters
768    /// - `sender`: The channel to send the InputEvent over.
769    fn send_input_event(
770        sender: UnboundedSender<input_device::InputEvent>,
771    ) -> input_device::InputEvent {
772        let mut rng = rand::thread_rng();
773        let offset = Position { x: rng.gen_range(0..10) as f32, y: rng.gen_range(0..10) as f32 };
774        let input_event = input_device::InputEvent {
775            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
776                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
777                    millimeters: Position {
778                        x: offset.x / COUNTS_PER_MM as f32,
779                        y: offset.y / COUNTS_PER_MM as f32,
780                    },
781                }),
782                None, /* wheel_delta_v */
783                None, /* wheel_delta_h */
784                mouse_binding::MousePhase::Move,
785                HashSet::new(),
786                HashSet::new(),
787                None, /* is_precision_scroll */
788            )),
789            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
790                mouse_binding::MouseDeviceDescriptor {
791                    device_id: 1,
792                    absolute_x_range: None,
793                    absolute_y_range: None,
794                    wheel_v_range: None,
795                    wheel_h_range: None,
796                    buttons: None,
797                    counts_per_mm: COUNTS_PER_MM,
798                },
799            ),
800            event_time: zx::MonotonicInstant::get(),
801            handled: input_device::Handled::No,
802            trace_id: None,
803        };
804        match sender.unbounded_send(input_event.clone()) {
805            Err(_) => assert!(false),
806            _ => {}
807        }
808
809        input_event
810    }
811
812    /// Returns a MouseDescriptor on an InputDeviceRequest.
813    ///
814    /// # Parameters
815    /// - `input_device_request`: The request to handle.
816    fn handle_input_device_request(
817        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
818    ) {
819        match input_device_request {
820            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
821                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
822                    device_information: None,
823                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
824                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
825                            movement_x: None,
826                            movement_y: None,
827                            scroll_v: None,
828                            scroll_h: None,
829                            buttons: Some(vec![0]),
830                            position_x: None,
831                            position_y: None,
832                            ..Default::default()
833                        }),
834                        ..Default::default()
835                    }),
836                    sensor: None,
837                    touch: None,
838                    keyboard: None,
839                    consumer_control: None,
840                    ..Default::default()
841                });
842            }
843            _ => {}
844        }
845    }
846
847    /// Tests that an input pipeline handles events from multiple devices.
848    #[fasync::run_singlethreaded(test)]
849    async fn multiple_devices_single_handler() {
850        // Create two fake device bindings.
851        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
852        let first_device_binding =
853            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
854        let second_device_binding =
855            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
856
857        // Create a fake input handler.
858        let (handler_event_sender, mut handler_event_receiver) =
859            futures::channel::mpsc::channel(100);
860        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
861            handler_event_sender,
862        );
863
864        // Build the input pipeline.
865        let (sender, receiver, tasks, _) =
866            InputPipelineAssembly::new(metrics::MetricsLogger::default())
867                .add_handler(input_handler)
868                .into_components();
869        let inspector = fuchsia_inspect::Inspector::default();
870        let test_node = inspector.root().create_child("input_pipeline");
871        let input_pipeline = InputPipeline {
872            pipeline_sender: sender,
873            device_event_sender,
874            device_event_receiver,
875            input_device_types: vec![],
876            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
877            inspect_node: test_node,
878            metrics_logger: metrics::MetricsLogger::default(),
879        };
880        InputPipeline::catch_unhandled(receiver);
881        InputPipeline::run(tasks);
882
883        // Send an input event from each device.
884        let first_device_event = send_input_event(first_device_binding.input_event_sender());
885        let second_device_event = send_input_event(second_device_binding.input_event_sender());
886
887        // Run the pipeline.
888        fasync::Task::local(async {
889            input_pipeline.handle_input_events().await;
890        })
891        .detach();
892
893        // Assert the handler receives the events.
894        let first_handled_event = handler_event_receiver.next().await;
895        assert_eq!(first_handled_event, Some(first_device_event));
896
897        let second_handled_event = handler_event_receiver.next().await;
898        assert_eq!(second_handled_event, Some(second_device_event));
899    }
900
901    /// Tests that an input pipeline handles events through multiple input handlers.
902    #[fasync::run_singlethreaded(test)]
903    async fn single_device_multiple_handlers() {
904        // Create two fake device bindings.
905        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
906        let input_device_binding =
907            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
908
909        // Create two fake input handlers.
910        let (first_handler_event_sender, mut first_handler_event_receiver) =
911            futures::channel::mpsc::channel(100);
912        let first_input_handler =
913            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
914                first_handler_event_sender,
915            );
916        let (second_handler_event_sender, mut second_handler_event_receiver) =
917            futures::channel::mpsc::channel(100);
918        let second_input_handler =
919            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
920                second_handler_event_sender,
921            );
922
923        // Build the input pipeline.
924        let (sender, receiver, tasks, _) =
925            InputPipelineAssembly::new(metrics::MetricsLogger::default())
926                .add_handler(first_input_handler)
927                .add_handler(second_input_handler)
928                .into_components();
929        let inspector = fuchsia_inspect::Inspector::default();
930        let test_node = inspector.root().create_child("input_pipeline");
931        let input_pipeline = InputPipeline {
932            pipeline_sender: sender,
933            device_event_sender,
934            device_event_receiver,
935            input_device_types: vec![],
936            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
937            inspect_node: test_node,
938            metrics_logger: metrics::MetricsLogger::default(),
939        };
940        InputPipeline::catch_unhandled(receiver);
941        InputPipeline::run(tasks);
942
943        // Send an input event.
944        let input_event = send_input_event(input_device_binding.input_event_sender());
945
946        // Run the pipeline.
947        fasync::Task::local(async {
948            input_pipeline.handle_input_events().await;
949        })
950        .detach();
951
952        // Assert both handlers receive the event.
953        let first_handler_event = first_handler_event_receiver.next().await;
954        assert_eq!(first_handler_event, Some(input_event.clone()));
955        let second_handler_event = second_handler_event_receiver.next().await;
956        assert_eq!(second_handler_event, Some(input_event));
957    }
958
959    /// Tests that a single mouse device binding is created for the one input device in the
960    /// input report directory.
961    #[fasync::run_singlethreaded(test)]
962    async fn watch_devices_one_match_exists() {
963        // Create a file in a pseudo directory that represents an input device.
964        let mut count: i8 = 0;
965        let dir = pseudo_directory! {
966            "file_name" => pseudo_fs_service::host(
967                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
968                    async move {
969                        while count < 3 {
970                            if let Some(input_device_request) =
971                                request_stream.try_next().await.unwrap()
972                            {
973                                handle_input_device_request(input_device_request);
974                                count += 1;
975                            }
976                        }
977
978                    }.boxed()
979                },
980            )
981        };
982
983        // Create a Watcher on the pseudo directory.
984        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
985        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
986        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
987        // proxy to get connections to input devices.
988        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
989
990        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
991        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
992        let supported_device_types = vec![input_device::InputDeviceType::Mouse];
993
994        let inspector = fuchsia_inspect::Inspector::default();
995        let test_node = inspector.root().create_child("input_pipeline");
996        test_node.record_string(
997            "supported_input_devices",
998            supported_device_types.clone().iter().join(", "),
999        );
1000        let input_devices = test_node.create_child("input_devices");
1001        // Assert that inspect tree is initialized with no devices.
1002        diagnostics_assertions::assert_data_tree!(inspector, root: {
1003            input_pipeline: {
1004                supported_input_devices: "Mouse",
1005                input_devices: {}
1006            }
1007        });
1008
1009        let _ = InputPipeline::watch_for_devices(
1010            device_watcher,
1011            dir_proxy_for_pipeline,
1012            supported_device_types,
1013            input_event_sender,
1014            bindings.clone(),
1015            &input_devices,
1016            true, /* break_on_idle */
1017            metrics::MetricsLogger::default(),
1018        )
1019        .await;
1020
1021        // Assert that one mouse device with accurate device id was found.
1022        let bindings_hashmap = bindings.lock().await;
1023        assert_eq!(bindings_hashmap.len(), 1);
1024        let bindings_vector = bindings_hashmap.get(&10);
1025        assert!(bindings_vector.is_some());
1026        assert_eq!(bindings_vector.unwrap().len(), 1);
1027        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1028        assert!(boxed_mouse_binding.is_some());
1029        assert_eq!(
1030            boxed_mouse_binding.unwrap().get_device_descriptor(),
1031            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1032                device_id: 10,
1033                absolute_x_range: None,
1034                absolute_y_range: None,
1035                wheel_v_range: None,
1036                wheel_h_range: None,
1037                buttons: Some(vec![0]),
1038                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1039            })
1040        );
1041
1042        // Assert that inspect tree reflects new device discovered and connected.
1043        diagnostics_assertions::assert_data_tree!(inspector, root: {
1044            input_pipeline: {
1045                supported_input_devices: "Mouse",
1046                input_devices: {
1047                    devices_discovered: 1u64,
1048                    devices_connected: 1u64,
1049                    "file_name_Mouse": contains {
1050                        reports_received_count: 0u64,
1051                        reports_filtered_count: 0u64,
1052                        events_generated: 0u64,
1053                        last_received_timestamp_ns: 0u64,
1054                        last_generated_timestamp_ns: 0u64,
1055                        "fuchsia.inspect.Health": {
1056                            status: "OK",
1057                            // Timestamp value is unpredictable and not relevant in this context,
1058                            // so we only assert that the property is present.
1059                            start_timestamp_nanos: AnyProperty
1060                        },
1061                    }
1062                }
1063            }
1064        });
1065    }
1066
1067    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
1068    /// but only a mouse exists.
1069    #[fasync::run_singlethreaded(test)]
1070    async fn watch_devices_no_matches_exist() {
1071        // Create a file in a pseudo directory that represents an input device.
1072        let mut count: i8 = 0;
1073        let dir = pseudo_directory! {
1074            "file_name" => pseudo_fs_service::host(
1075                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1076                    async move {
1077                        while count < 1 {
1078                            if let Some(input_device_request) =
1079                                request_stream.try_next().await.unwrap()
1080                            {
1081                                handle_input_device_request(input_device_request);
1082                                count += 1;
1083                            }
1084                        }
1085
1086                    }.boxed()
1087                },
1088            )
1089        };
1090
1091        // Create a Watcher on the pseudo directory.
1092        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1093        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1094        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1095        // proxy to get connections to input devices.
1096        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1097
1098        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1099        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1100        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1101
1102        let inspector = fuchsia_inspect::Inspector::default();
1103        let test_node = inspector.root().create_child("input_pipeline");
1104        test_node.record_string(
1105            "supported_input_devices",
1106            supported_device_types.clone().iter().join(", "),
1107        );
1108        let input_devices = test_node.create_child("input_devices");
1109        // Assert that inspect tree is initialized with no devices.
1110        diagnostics_assertions::assert_data_tree!(inspector, root: {
1111            input_pipeline: {
1112                supported_input_devices: "Keyboard",
1113                input_devices: {}
1114            }
1115        });
1116
1117        let _ = InputPipeline::watch_for_devices(
1118            device_watcher,
1119            dir_proxy_for_pipeline,
1120            supported_device_types,
1121            input_event_sender,
1122            bindings.clone(),
1123            &input_devices,
1124            true, /* break_on_idle */
1125            metrics::MetricsLogger::default(),
1126        )
1127        .await;
1128
1129        // Assert that no devices were found.
1130        let bindings = bindings.lock().await;
1131        assert_eq!(bindings.len(), 0);
1132
1133        // Assert that inspect tree reflects new device discovered, but not connected.
1134        diagnostics_assertions::assert_data_tree!(inspector, root: {
1135            input_pipeline: {
1136                supported_input_devices: "Keyboard",
1137                input_devices: {
1138                    devices_discovered: 1u64,
1139                    devices_connected: 0u64,
1140                    "file_name_Unsupported": {
1141                        "fuchsia.inspect.Health": {
1142                            status: "UNHEALTHY",
1143                            message: "Unsupported device type.",
1144                            // Timestamp value is unpredictable and not relevant in this context,
1145                            // so we only assert that the property is present.
1146                            start_timestamp_nanos: AnyProperty
1147                        },
1148                    }
1149                }
1150            }
1151        });
1152    }
1153
1154    /// Tests that a single keyboard device binding is created for the input device registered
1155    /// through InputDeviceRegistry.
1156    #[fasync::run_singlethreaded(test)]
1157    async fn handle_input_device_registry_request_stream() {
1158        let (input_device_registry_proxy, input_device_registry_request_stream) =
1159            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1160        let (input_device_client_end, mut input_device_request_stream) =
1161            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1162
1163        let device_types = vec![input_device::InputDeviceType::Mouse];
1164        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1165        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1166
1167        // Handle input device requests.
1168        let mut count: i8 = 0;
1169        fasync::Task::local(async move {
1170            // Register a device.
1171            let _ = input_device_registry_proxy.register(input_device_client_end);
1172
1173            while count < 3 {
1174                if let Some(input_device_request) =
1175                    input_device_request_stream.try_next().await.unwrap()
1176                {
1177                    handle_input_device_request(input_device_request);
1178                    count += 1;
1179                }
1180            }
1181
1182            // End handle_input_device_registry_request_stream() by taking the event stream.
1183            input_device_registry_proxy.take_event_stream();
1184        })
1185        .detach();
1186
1187        let inspector = fuchsia_inspect::Inspector::default();
1188        let test_node = inspector.root().create_child("input_pipeline");
1189
1190        // Start listening for InputDeviceRegistryRequests.
1191        let bindings_clone = bindings.clone();
1192        let _ = InputPipeline::handle_input_device_registry_request_stream(
1193            input_device_registry_request_stream,
1194            &device_types,
1195            &input_event_sender,
1196            &bindings_clone,
1197            &test_node,
1198            metrics::MetricsLogger::default(),
1199        )
1200        .await;
1201
1202        // Assert that a device was registered.
1203        let bindings = bindings.lock().await;
1204        assert_eq!(bindings.len(), 1);
1205    }
1206
1207    // Tests that correct properties are added to inspect node when InputPipeline is created.
1208    #[fasync::run_singlethreaded(test)]
1209    async fn check_inspect_node_has_correct_properties() {
1210        let device_types = vec![
1211            input_device::InputDeviceType::Touch,
1212            input_device::InputDeviceType::ConsumerControls,
1213        ];
1214        let inspector = fuchsia_inspect::Inspector::default();
1215        let test_node = inspector.root().create_child("input_pipeline");
1216        // Create fake input handler for assembly
1217        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1218            futures::channel::mpsc::channel(100);
1219        let fake_input_handler =
1220            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1221                fake_handler_event_sender,
1222            );
1223        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1224            .add_handler(fake_input_handler);
1225        let _test_input_pipeline = InputPipeline::new(
1226            device_types,
1227            assembly,
1228            test_node,
1229            metrics::MetricsLogger::default(),
1230        );
1231        diagnostics_assertions::assert_data_tree!(inspector, root: {
1232            input_pipeline: {
1233                supported_input_devices: "Touch, ConsumerControls",
1234                handlers_registered: 1u64,
1235                handlers_healthy: 1u64,
1236                input_devices: {}
1237            }
1238        });
1239    }
1240}