input_pipeline/
input_pipeline.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::display_ownership::DisplayOwnership;
6use crate::focus_listener::FocusListener;
7use crate::input_handler::UnhandledInputHandler;
8use crate::{input_device, input_handler, metrics};
9use anyhow::{Context, Error, format_err};
10use focus_chain_provider::FocusChainProviderPublisher;
11use fuchsia_fs::directory::{WatchEvent, Watcher};
12use fuchsia_inspect::NumericProperty;
13use fuchsia_inspect::health::Reporter;
14use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
15use futures::future::LocalBoxFuture;
16use futures::lock::Mutex;
17use futures::{StreamExt, TryStreamExt};
18use itertools::Itertools;
19use metrics_registry::*;
20use std::collections::HashMap;
21use std::path::PathBuf;
22use std::rc::Rc;
23use std::sync::atomic::{AtomicU32, Ordering};
24use std::sync::{Arc, LazyLock};
25use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
26
/// Source of unique `device_id` values, handed out in increasing order.
///
/// The counter starts at 10 to avoid conflicts with the default devices in Starnix.
/// Currently, Starnix uses 0 and 1 as the ids of its default devices. Starnix needs
/// those default devices to deliver events from physical devices until there is an
/// API that exposes device changes to UI clients.
static NEXT_DEVICE_ID: LazyLock<AtomicU32> = LazyLock::new(|| {
    const FIRST_DEVICE_ID: u32 = 10;
    AtomicU32::new(FIRST_DEVICE_ID)
});

/// Returns a fresh device id.
///
/// Every call yields the counter's current value and atomically advances the
/// counter by one, so successive calls observe successive ids (until the `u32`
/// counter wraps around).
fn get_next_device_id() -> u32 {
    let counter: &AtomicU32 = &NEXT_DEVICE_ID;
    counter.fetch_add(1, Ordering::SeqCst)
}
41
/// A heap-allocated, dynamically dispatched [`input_device::InputDeviceBinding`].
///
/// Used so that bindings of different concrete types can be stored side by side in
/// the same collection (see [`InputDeviceBindingHashMap`]).
type BoxedInputDeviceBinding = Box<dyn input_device::InputDeviceBinding>;
43
/// An [`InputDeviceBindingHashMap`] maps an input device to one or more
/// `InputDeviceBinding`s.
///
/// The key is the unique device id assigned to the device; the value holds every
/// binding created for that device. The map sits behind an async
/// [`futures::lock::Mutex`] inside an [`Arc`], so handles to it can be cloned and
/// the map is locked with `.lock().await`.
pub type InputDeviceBindingHashMap = Arc<Mutex<HashMap<u32, Vec<BoxedInputDeviceBinding>>>>;
47
/// An input pipeline assembly.
///
/// Represents a partial stage of the input pipeline which accepts inputs through an
/// asynchronous sender channel, and emits outputs through an asynchronous receiver channel.
/// Use [new] to create a new assembly. Use [add_handler] or [add_all_handlers] to add the
/// input pipeline handlers to use. When done, [InputPipeline::new] can be used to make a new
/// input pipeline from the assembly.
///
/// # Implementation notes
///
/// Internally, when a new [InputPipelineAssembly] is created with multiple [InputHandler]s, the
/// handlers are connected together using async queues.  This allows fully streamed processing of
/// input events, and also allows some pipeline stages to generate events spontaneously, i.e.
/// without an external stimulus.
///
/// NOTE(review): the assembly itself only creates a single sender/receiver pair and a list of
/// handlers; whether handlers are still chained through per-handler queues should be confirmed
/// against the code that consumes the assembly.
pub struct InputPipelineAssembly {
    /// The top-level sender: send into this queue to inject an event into the input
    /// pipeline.
    sender: UnboundedSender<input_device::InputEvent>,
    /// The bottom-level receiver: the other end of the channel created together with
    /// `sender`; it is handed out when the assembly is deconstructed.
    receiver: UnboundedReceiver<input_device::InputEvent>,
    /// The input handlers that comprise the input pipeline, in the order they were added.
    handlers: Vec<Rc<dyn input_handler::InputHandler>>,

    /// The display ownership watcher task. `None` until `add_display_ownership` is called.
    display_ownership_fut: Option<LocalBoxFuture<'static, ()>>,

    /// The focus listener task. `None` until `add_focus_listener` is called.
    focus_listener_fut: Option<LocalBoxFuture<'static, ()>>,

    /// The metrics logger.
    metrics_logger: metrics::MetricsLogger,
}
80
81impl InputPipelineAssembly {
82    /// Create a new but empty [InputPipelineAssembly]. Use [add_handler] or similar
83    /// to add new handlers to it.
84    pub fn new(metrics_logger: metrics::MetricsLogger) -> Self {
85        let (sender, receiver) = mpsc::unbounded();
86        InputPipelineAssembly {
87            sender,
88            receiver,
89            handlers: vec![],
90            metrics_logger,
91            display_ownership_fut: None,
92            focus_listener_fut: None,
93        }
94    }
95
96    /// Adds another [input_handler::InputHandler] into the [InputPipelineAssembly]. The handlers
97    /// are invoked in the order they are added. Returns `Self` for chaining.
98    pub fn add_handler(mut self, handler: Rc<dyn input_handler::InputHandler>) -> Self {
99        self.handlers.push(handler);
100        self
101    }
102
103    /// Adds all handlers into the assembly in the order they appear in `handlers`.
104    pub fn add_all_handlers(self, handlers: Vec<Rc<dyn input_handler::InputHandler>>) -> Self {
105        handlers.into_iter().fold(self, |assembly, handler| assembly.add_handler(handler))
106    }
107
108    pub fn add_display_ownership(
109        mut self,
110        display_ownership_event: zx::Event,
111        input_handlers_node: &fuchsia_inspect::Node,
112    ) -> InputPipelineAssembly {
113        let h = DisplayOwnership::new(display_ownership_event, input_handlers_node);
114        let metrics_logger_clone = self.metrics_logger.clone();
115        let h_clone = h.clone();
116        let sender_clone = self.sender.clone();
117        let display_ownership_fut = Box::pin(async move {
118            h_clone.clone().set_handler_healthy();
119            h_clone.clone()
120                .handle_ownership_change(sender_clone)
121                .await
122                .map_err(|e| {
123                    metrics_logger_clone.log_error(
124                        InputPipelineErrorMetricDimensionEvent::InputPipelineDisplayOwnershipIsNotSupposedToTerminate,
125                        std::format!(
126                            "display ownership is not supposed to terminate - this is likely a problem: {:?}", e));
127                        })
128                        .unwrap();
129            h_clone.set_handler_unhealthy("Receive loop terminated for handler: DisplayOwnership");
130        });
131        self.display_ownership_fut = Some(display_ownership_fut);
132        self.add_handler(h)
133    }
134
135    /// Deconstructs the assembly into constituent components, used when constructing
136    /// [InputPipeline].
137    ///
138    /// You should call [catch_unhandled] on the returned [async_channel::Receiver], and
139    /// [run] on the returned [fuchsia_async::Tasks] (or supply own equivalents).
140    fn into_components(
141        self,
142    ) -> (
143        UnboundedSender<input_device::InputEvent>,
144        UnboundedReceiver<input_device::InputEvent>,
145        Vec<Rc<dyn input_handler::InputHandler>>,
146        metrics::MetricsLogger,
147        Option<LocalBoxFuture<'static, ()>>,
148        Option<LocalBoxFuture<'static, ()>>,
149    ) {
150        (
151            self.sender,
152            self.receiver,
153            self.handlers,
154            self.metrics_logger,
155            self.display_ownership_fut,
156            self.focus_listener_fut,
157        )
158    }
159
160    pub fn add_focus_listener(
161        mut self,
162        focus_chain_publisher: FocusChainProviderPublisher,
163    ) -> Self {
164        let metrics_logger_clone = self.metrics_logger.clone();
165        let focus_listener_fut = Box::pin(async move {
166            if let Ok(mut focus_listener) =
167                FocusListener::new(focus_chain_publisher, metrics_logger_clone).map_err(|e| {
168                    log::warn!(
169                        "could not create focus listener, focus will not be dispatched: {:?}",
170                        e
171                    )
172                })
173            {
174                // This will await indefinitely and process focus messages in a loop, unless there
175                // is a problem.
176                let _result = focus_listener
177                    .dispatch_focus_changes()
178                    .await
179                    .map(|_| {
180                        log::warn!("dispatch focus loop ended, focus will no longer be dispatched")
181                    })
182                    .map_err(|e| {
183                        panic!("could not dispatch focus changes, this is a fatal error: {:?}", e)
184                    });
185            }
186        });
187        self.focus_listener_fut = Some(focus_listener_fut);
188        self
189    }
190}
191
/// An [`InputPipeline`] manages input devices and propagates input events through input handlers.
///
/// On creation, clients declare what types of input devices an [`InputPipeline`] manages. The
/// [`InputPipeline`] will continuously detect new input devices of supported type(s).
///
/// # Example
/// ```
/// // `ime_handler` and `touch_handler` are `Rc<dyn input_handler::InputHandler>` values,
/// // and `metrics_logger` / `inspect_node` are provided by the caller.
/// let assembly = InputPipelineAssembly::new(metrics_logger.clone())
///     .add_handler(ime_handler)
///     .add_handler(touch_handler);
/// let input_pipeline = InputPipeline::new(
///     vec![
///         input_device::InputDeviceType::Touch,
///         input_device::InputDeviceType::Keyboard,
///     ],
///     assembly,
///     inspect_node,
///     metrics_logger,
/// )?;
/// input_pipeline.handle_input_events().await;
/// ```
pub struct InputPipeline {
    /// The entry point into the input handler pipeline. Incoming input events should
    /// be inserted into this async queue, and the input pipeline will ensure that they
    /// are propagated through all the input handlers in the appropriate sequence.
    pipeline_sender: UnboundedSender<input_device::InputEvent>,

    /// A clone of this sender is given to every InputDeviceBinding that this pipeline owns.
    /// Each InputDeviceBinding will send InputEvents to the pipeline through this channel.
    device_event_sender: UnboundedSender<input_device::InputEvent>,

    /// Receives InputEvents from all InputDeviceBindings that this pipeline owns.
    /// Events read from here are forwarded into `pipeline_sender`.
    device_event_receiver: UnboundedReceiver<input_device::InputEvent>,

    /// The types of devices this pipeline supports.
    input_device_types: Vec<input_device::InputDeviceType>,

    /// The InputDeviceBindings bound to this pipeline, keyed by device id.
    input_device_bindings: InputDeviceBindingHashMap,

    /// This node is bound to the lifetime of this InputPipeline.
    /// Inspect data will be dumped for this pipeline as long as it exists.
    inspect_node: fuchsia_inspect::Node,

    /// The metrics logger.
    metrics_logger: metrics::MetricsLogger,
}
245
246impl InputPipeline {
247    fn new_common(
248        input_device_types: Vec<input_device::InputDeviceType>,
249        assembly: InputPipelineAssembly,
250        inspect_node: fuchsia_inspect::Node,
251    ) -> Self {
252        let (
253            pipeline_sender,
254            receiver,
255            handlers,
256            metrics_logger,
257            display_ownership_fut,
258            focus_listener_fut,
259        ) = assembly.into_components();
260
261        let mut handlers_count = handlers.len();
262        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
263        if let Some(fut) = display_ownership_fut {
264            fasync::Task::local(fut).detach();
265            handlers_count += 1;
266        }
267
268        // TODO: b/469745447 - should use futures::select! instead of Task::local().detach().
269        if let Some(fut) = focus_listener_fut {
270            fasync::Task::local(fut).detach();
271            handlers_count += 1;
272        }
273
274        // Add properties to inspect node
275        inspect_node.record_string("supported_input_devices", input_device_types.iter().join(", "));
276        inspect_node.record_uint("handlers_registered", handlers_count as u64);
277        inspect_node.record_uint("handlers_healthy", handlers_count as u64);
278
279        // Initializes all handlers and starts the input pipeline loop.
280        InputPipeline::run(receiver, handlers);
281
282        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
283        let input_device_bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
284        InputPipeline {
285            pipeline_sender,
286            device_event_sender,
287            device_event_receiver,
288            input_device_types,
289            input_device_bindings,
290            inspect_node,
291            metrics_logger,
292        }
293    }
294
295    /// Creates a new [`InputPipeline`] for integration testing.
296    /// Unlike a production input pipeline, this pipeline will not monitor
297    /// `/dev/class/input-report` for devices.
298    ///
299    /// # Parameters
300    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
301    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
302    pub fn new_for_test(
303        input_device_types: Vec<input_device::InputDeviceType>,
304        assembly: InputPipelineAssembly,
305    ) -> Self {
306        let inspector = fuchsia_inspect::Inspector::default();
307        let root = inspector.root();
308        let test_node = root.create_child("input_pipeline");
309        Self::new_common(input_device_types, assembly, test_node)
310    }
311
312    /// Creates a new [`InputPipeline`] for production use.
313    ///
314    /// # Parameters
315    /// - `input_device_types`: The types of devices the new [`InputPipeline`] will support.
316    /// - `assembly`: The input handlers that the [`InputPipeline`] sends InputEvents to.
317    /// - `inspect_node`: The root node for InputPipeline's Inspect tree
318    pub fn new(
319        input_device_types: Vec<input_device::InputDeviceType>,
320        assembly: InputPipelineAssembly,
321        inspect_node: fuchsia_inspect::Node,
322        metrics_logger: metrics::MetricsLogger,
323    ) -> Result<Self, Error> {
324        let input_pipeline = Self::new_common(input_device_types, assembly, inspect_node);
325        let input_device_types = input_pipeline.input_device_types.clone();
326        let input_event_sender = input_pipeline.device_event_sender.clone();
327        let input_device_bindings = input_pipeline.input_device_bindings.clone();
328        let devices_node = input_pipeline.inspect_node.create_child("input_devices");
329        fasync::Task::local(async move {
330            // Watches the input device directory for new input devices. Creates new InputDeviceBindings
331            // that send InputEvents to `input_event_receiver`.
332            match async {
333                let dir_proxy = fuchsia_fs::directory::open_in_namespace(
334                    input_device::INPUT_REPORT_PATH,
335                    fuchsia_fs::PERM_READABLE,
336                )
337                .with_context(|| format!("failed to open {}", input_device::INPUT_REPORT_PATH))?;
338                let device_watcher =
339                    Watcher::new(&dir_proxy).await.context("failed to create watcher")?;
340                Self::watch_for_devices(
341                    device_watcher,
342                    dir_proxy,
343                    input_device_types,
344                    input_event_sender,
345                    input_device_bindings,
346                    &devices_node,
347                    false, /* break_on_idle */
348                    metrics_logger.clone(),
349                )
350                .await
351                .context("failed to watch for devices")
352            }
353            .await
354            {
355                Ok(()) => {}
356                Err(err) => {
357                    // This error is usually benign in tests: it means that the setup does not
358                    // support dynamic device discovery. Almost no tests support dynamic
359                    // device discovery, and they also do not need those.
360                    metrics_logger.log_warn(
361                        InputPipelineErrorMetricDimensionEvent::InputPipelineUnableToWatchForNewInputDevices,
362                        std::format!(
363                            "Input pipeline is unable to watch for new input devices: {:?}",
364                            err
365                        ));
366                }
367            }
368        })
369        .detach();
370
371        Ok(input_pipeline)
372    }
373
    /// Gets the input device bindings.
    ///
    /// Returns a reference to the shared map of device id to bindings; the map can
    /// be cloned (it is an `Arc`) and must be locked before it is read or modified.
    pub fn input_device_bindings(&self) -> &InputDeviceBindingHashMap {
        &self.input_device_bindings
    }
378
    /// Gets the input device sender: this is the channel that should be cloned
    /// and used for injecting events from the drivers into the input pipeline.
    ///
    /// Events sent here are read by `handle_input_events` and forwarded to the
    /// input handlers.
    pub fn input_event_sender(&self) -> &UnboundedSender<input_device::InputEvent> {
        &self.device_event_sender
    }
384
    /// Gets a list of input device types supported by this input pipeline.
    ///
    /// These are the device types that were supplied when the pipeline was created.
    pub fn input_device_types(&self) -> &Vec<input_device::InputDeviceType> {
        &self.input_device_types
    }
389
390    /// Forwards all input events into the input pipeline.
391    pub async fn handle_input_events(mut self) {
392        let metrics_logger_clone = self.metrics_logger.clone();
393        while let Some(input_event) = self.device_event_receiver.next().await {
394            if let Err(e) = self.pipeline_sender.unbounded_send(input_event) {
395                metrics_logger_clone.log_error(
396                    InputPipelineErrorMetricDimensionEvent::InputPipelineCouldNotForwardEventFromDriver,
397                    std::format!("could not forward event from driver: {:?}", &e));
398            }
399        }
400
401        metrics_logger_clone.log_error(
402            InputPipelineErrorMetricDimensionEvent::InputPipelineStopHandlingEvents,
403            "Input pipeline stopped handling input events.".to_string(),
404        );
405    }
406
407    /// Watches the input report directory for new input devices. Creates InputDeviceBindings
408    /// if new devices match a type in `device_types`.
409    ///
410    /// # Parameters
411    /// - `device_watcher`: Watches the input report directory for new devices.
412    /// - `dir_proxy`: The directory containing InputDevice connections.
413    /// - `device_types`: The types of devices to watch for.
414    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
415    /// - `bindings`: Holds all the InputDeviceBindings
416    /// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
417    /// - `break_on_idle`: If true, stops watching for devices once all existing devices are handled.
418    /// - `metrics_logger`: The metrics logger.
419    ///
420    /// # Errors
421    /// If the input report directory or a file within it cannot be read.
422    async fn watch_for_devices(
423        mut device_watcher: Watcher,
424        dir_proxy: fio::DirectoryProxy,
425        device_types: Vec<input_device::InputDeviceType>,
426        input_event_sender: UnboundedSender<input_device::InputEvent>,
427        bindings: InputDeviceBindingHashMap,
428        input_devices_node: &fuchsia_inspect::Node,
429        break_on_idle: bool,
430        metrics_logger: metrics::MetricsLogger,
431    ) -> Result<(), Error> {
432        // Add non-static properties to inspect node.
433        let devices_discovered = input_devices_node.create_uint("devices_discovered", 0);
434        let devices_connected = input_devices_node.create_uint("devices_connected", 0);
435        while let Some(msg) = device_watcher.try_next().await? {
436            if let Ok(filename) = msg.filename.into_os_string().into_string() {
437                if filename == "." {
438                    continue;
439                }
440
441                let pathbuf = PathBuf::from(filename.clone());
442                match msg.event {
443                    WatchEvent::EXISTING | WatchEvent::ADD_FILE => {
444                        log::info!("found input device {}", filename);
445                        devices_discovered.add(1);
446                        let device_proxy =
447                            input_device::get_device_from_dir_entry_path(&dir_proxy, &pathbuf)?;
448                        add_device_bindings(
449                            &device_types,
450                            &filename,
451                            device_proxy,
452                            &input_event_sender,
453                            &bindings,
454                            get_next_device_id(),
455                            input_devices_node,
456                            Some(&devices_connected),
457                            metrics_logger.clone(),
458                        )
459                        .await;
460                    }
461                    WatchEvent::IDLE => {
462                        if break_on_idle {
463                            break;
464                        }
465                    }
466                    _ => (),
467                }
468            }
469        }
470        // Ensure inspect properties persist for debugging if device watch loop ends.
471        input_devices_node.record(devices_discovered);
472        input_devices_node.record(devices_connected);
473        Err(format_err!("Input pipeline stopped watching for new input devices."))
474    }
475
476    /// Handles the incoming InputDeviceRegistryRequestStream.
477    ///
478    /// This method will end when the request stream is closed. If the stream closes with an
479    /// error the error will be returned in the Result.
480    ///
481    /// **NOTE**: Only one stream is handled at a time. https://fxbug.dev/42061078
482    ///
483    /// # Parameters
484    /// - `stream`: The stream of InputDeviceRegistryRequests.
485    /// - `device_types`: The types of devices to watch for.
486    /// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
487    /// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
488    /// - `input_devices_node`: The parent node for all injected devices' inspect nodes.
489    /// - `metrics_logger`: The metrics logger.
490    pub async fn handle_input_device_registry_request_stream(
491        mut stream: fidl_fuchsia_input_injection::InputDeviceRegistryRequestStream,
492        device_types: &Vec<input_device::InputDeviceType>,
493        input_event_sender: &UnboundedSender<input_device::InputEvent>,
494        bindings: &InputDeviceBindingHashMap,
495        input_devices_node: &fuchsia_inspect::Node,
496        metrics_logger: metrics::MetricsLogger,
497    ) -> Result<(), Error> {
498        while let Some(request) = stream
499            .try_next()
500            .await
501            .context("Error handling input device registry request stream")?
502        {
503            match request {
504                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::Register {
505                    device,
506                    ..
507                } => {
508                    // Add a binding if the device is a type being tracked
509                    let device_proxy = device.into_proxy();
510
511                    let device_id = get_next_device_id();
512
513                    add_device_bindings(
514                        device_types,
515                        &format!("input-device-registry-{}", device_id),
516                        device_proxy,
517                        input_event_sender,
518                        bindings,
519                        device_id,
520                        input_devices_node,
521                        None,
522                        metrics_logger.clone(),
523                    )
524                    .await;
525                }
526                fidl_fuchsia_input_injection::InputDeviceRegistryRequest::RegisterAndGetDeviceInfo {
527                    device,
528                    responder,
529                    .. } => {
530                    // Add a binding if the device is a type being tracked
531                    let device_proxy = device.into_proxy();
532
533                    let device_id = get_next_device_id();
534
535                    add_device_bindings(
536                        device_types,
537                        &format!("input-device-registry-{}", device_id),
538                        device_proxy,
539                        input_event_sender,
540                        bindings,
541                        device_id,
542                        input_devices_node,
543                        None,
544                        metrics_logger.clone(),
545                    )
546                    .await;
547
548                    responder.send(fidl_fuchsia_input_injection::InputDeviceRegistryRegisterAndGetDeviceInfoResponse{
549                        device_id: Some(device_id),
550                        ..Default::default()
551                    }).expect("Failed to respond to RegisterAndGetDeviceInfo request");
552                }
553            }
554        }
555
556        Ok(())
557    }
558
559    /// Initializes all handlers and starts the input pipeline loop in an asynchronous executor.
560    fn run(
561        mut receiver: UnboundedReceiver<input_device::InputEvent>,
562        handlers: Vec<Rc<dyn input_handler::InputHandler>>,
563    ) {
564        fasync::Task::local(async move {
565            for handler in &handlers {
566                handler.clone().set_handler_healthy();
567            }
568
569            while let Some(event) = receiver.next().await {
570                let mut events = vec![event];
571                for handler in &handlers {
572                    let mut next_events = vec![];
573                    let handler_name = handler.get_name();
574                    for e in events {
575                        let out_events = {
576                            let _async_trace = fuchsia_trace::async_enter!(
577                                fuchsia_trace::Id::random(),
578                                "input",
579                                "handle_input_event",
580                                "name" => handler_name
581                            );
582                            handler.clone().handle_input_event(e).await
583                        };
584                        next_events.extend(out_events);
585                    }
586                    events = next_events;
587                }
588
589                for event in events {
590                    if event.handled == input_device::Handled::No {
591                        log::warn!("unhandled input event: {:?}", &event);
592                    }
593                }
594            }
595            for handler in &handlers {
596                handler.clone().set_handler_unhealthy("Pipeline loop terminated");
597            }
598            panic!("Runner task is not supposed to terminate.")
599        })
600        .detach();
601    }
602}
603
604/// Adds `InputDeviceBinding`s to `bindings` for all `device_types` exposed by `device_proxy`.
605///
606/// # Parameters
607/// - `device_types`: The types of devices to watch for.
608/// - `device_proxy`: A proxy to the input device.
609/// - `input_event_sender`: The channel new InputDeviceBindings will send InputEvents to.
610/// - `bindings`: Holds all the InputDeviceBindings associated with the InputPipeline.
611/// - `device_id`: The device id of the associated bindings.
612/// - `input_devices_node`: The parent node for all device bindings' inspect nodes.
613///
614/// # Note
615/// This will create multiple bindings, in the case where
616/// * `device_proxy().get_descriptor()` returns a `fidl_fuchsia_input_report::DeviceDescriptor`
617///   with multiple table fields populated, and
618/// * multiple populated table fields correspond to device types present in `device_types`
619///
620/// This is used, for example, to support the Atlas touchpad. In that case, a single
621/// node in `/dev/class/input-report` provides both a `fuchsia.input.report.MouseDescriptor` and
622/// a `fuchsia.input.report.TouchDescriptor`.
623async fn add_device_bindings(
624    device_types: &Vec<input_device::InputDeviceType>,
625    filename: &String,
626    device_proxy: fidl_fuchsia_input_report::InputDeviceProxy,
627    input_event_sender: &UnboundedSender<input_device::InputEvent>,
628    bindings: &InputDeviceBindingHashMap,
629    device_id: u32,
630    input_devices_node: &fuchsia_inspect::Node,
631    devices_connected: Option<&fuchsia_inspect::UintProperty>,
632    metrics_logger: metrics::MetricsLogger,
633) {
634    let mut matched_device_types = vec![];
635    if let Ok(descriptor) = device_proxy.get_descriptor().await {
636        for device_type in device_types {
637            if input_device::is_device_type(&descriptor, *device_type).await {
638                matched_device_types.push(device_type);
639                match devices_connected {
640                    Some(dev_connected) => {
641                        let _ = dev_connected.add(1);
642                    }
643                    None => (),
644                };
645            }
646        }
647        if matched_device_types.is_empty() {
648            log::info!(
649                "device {} did not match any supported device types: {:?}",
650                filename,
651                device_types
652            );
653            let device_node = input_devices_node.create_child(format!("{}_Unsupported", filename));
654            let mut health = fuchsia_inspect::health::Node::new(&device_node);
655            health.set_unhealthy("Unsupported device type.");
656            device_node.record(health);
657            input_devices_node.record(device_node);
658            return;
659        }
660    } else {
661        metrics_logger.clone().log_error(
662            InputPipelineErrorMetricDimensionEvent::InputPipelineNoDeviceDescriptor,
663            std::format!("cannot bind device {} without a device descriptor", filename),
664        );
665        return;
666    }
667
668    log::info!(
669        "binding {} to device types: {}",
670        filename,
671        matched_device_types
672            .iter()
673            .fold(String::new(), |device_types_string, device_type| device_types_string
674                + &format!("{:?}, ", device_type))
675    );
676
677    let mut new_bindings: Vec<BoxedInputDeviceBinding> = vec![];
678    for device_type in matched_device_types {
679        // Clone `device_proxy`, so that multiple bindings (e.g. a `MouseBinding` and a
680        // `TouchBinding`) can read data from the same `/dev/class/input-report` node.
681        //
682        // There's no conflict in having multiple bindings read from the same node,
683        // since:
684        // * each binding will create its own `fuchsia.input.report.InputReportsReader`, and
685        // * the device driver will copy each incoming report to each connected reader.
686        //
687        // This does mean that reports from the Atlas touchpad device get read twice
688        // (by a `MouseBinding` and a `TouchBinding`), regardless of whether the device
689        // is operating in mouse mode or touchpad mode.
690        //
691        // This hasn't been an issue because:
692        // * Semantically: things are fine, because each binding discards irrelevant reports.
693        //   (E.g. `MouseBinding` discards anything that isn't a `MouseInputReport`), and
694        // * Performance wise: things are fine, because the data rate of the touchpad is low
695        //   (125 HZ).
696        //
697        // If we add additional cases where bindings share an underlying `input-report` node,
698        // we might consider adding a multiplexing binding, to avoid reading duplicate reports.
699        let proxy = device_proxy.clone();
700        let device_node = input_devices_node.create_child(format!("{}_{}", filename, device_type));
701        match input_device::get_device_binding(
702            *device_type,
703            proxy,
704            device_id,
705            input_event_sender.clone(),
706            device_node,
707            metrics_logger.clone(),
708        )
709        .await
710        {
711            Ok(binding) => new_bindings.push(binding),
712            Err(e) => {
713                metrics_logger.log_error(
714                    InputPipelineErrorMetricDimensionEvent::InputPipelineFailedToBind,
715                    std::format!("failed to bind {} as {:?}: {}", filename, device_type, e),
716                );
717            }
718        }
719    }
720
721    if !new_bindings.is_empty() {
722        let mut bindings = bindings.lock().await;
723        bindings.entry(device_id).or_insert(Vec::new()).extend(new_bindings);
724    }
725}
726
727#[cfg(test)]
728mod tests {
729    use super::*;
730    use crate::input_device::InputDeviceBinding;
731    use crate::utils::Position;
732    use crate::{
733        fake_input_device_binding, mouse_binding, mouse_model_database,
734        observe_fake_events_input_handler,
735    };
736    use diagnostics_assertions::AnyProperty;
737    use fidl::endpoints::{create_proxy_and_stream, create_request_stream};
738    use fuchsia_async as fasync;
739    use futures::FutureExt;
740    use pretty_assertions::assert_eq;
741    use rand::Rng;
742    use std::collections::HashSet;
743    use vfs::{pseudo_directory, service as pseudo_fs_service};
744
    /// Mouse resolution, in counts per millimeter, used by `send_input_event` both to scale the
    /// fake event's relative motion and as the `counts_per_mm` of its device descriptor.
    const COUNTS_PER_MM: u32 = 12;
746
747    /// Returns the InputEvent sent over `sender`.
748    ///
749    /// # Parameters
750    /// - `sender`: The channel to send the InputEvent over.
751    fn send_input_event(
752        sender: UnboundedSender<input_device::InputEvent>,
753    ) -> input_device::InputEvent {
754        let mut rng = rand::rng();
755        let offset =
756            Position { x: rng.random_range(0..10) as f32, y: rng.random_range(0..10) as f32 };
757        let input_event = input_device::InputEvent {
758            device_event: input_device::InputDeviceEvent::Mouse(mouse_binding::MouseEvent::new(
759                mouse_binding::MouseLocation::Relative(mouse_binding::RelativeLocation {
760                    millimeters: Position {
761                        x: offset.x / COUNTS_PER_MM as f32,
762                        y: offset.y / COUNTS_PER_MM as f32,
763                    },
764                }),
765                None, /* wheel_delta_v */
766                None, /* wheel_delta_h */
767                mouse_binding::MousePhase::Move,
768                HashSet::new(),
769                HashSet::new(),
770                None, /* is_precision_scroll */
771                None, /* wake_lease */
772            )),
773            device_descriptor: input_device::InputDeviceDescriptor::Mouse(
774                mouse_binding::MouseDeviceDescriptor {
775                    device_id: 1,
776                    absolute_x_range: None,
777                    absolute_y_range: None,
778                    wheel_v_range: None,
779                    wheel_h_range: None,
780                    buttons: None,
781                    counts_per_mm: COUNTS_PER_MM,
782                },
783            ),
784            event_time: zx::MonotonicInstant::get(),
785            handled: input_device::Handled::No,
786            trace_id: None,
787        };
788        match sender.unbounded_send(input_event.clone()) {
789            Err(_) => assert!(false),
790            _ => {}
791        }
792
793        input_event
794    }
795
796    /// Returns a MouseDescriptor on an InputDeviceRequest.
797    ///
798    /// # Parameters
799    /// - `input_device_request`: The request to handle.
800    fn handle_input_device_request(
801        input_device_request: fidl_fuchsia_input_report::InputDeviceRequest,
802    ) {
803        match input_device_request {
804            fidl_fuchsia_input_report::InputDeviceRequest::GetDescriptor { responder } => {
805                let _ = responder.send(&fidl_fuchsia_input_report::DeviceDescriptor {
806                    device_information: None,
807                    mouse: Some(fidl_fuchsia_input_report::MouseDescriptor {
808                        input: Some(fidl_fuchsia_input_report::MouseInputDescriptor {
809                            movement_x: None,
810                            movement_y: None,
811                            scroll_v: None,
812                            scroll_h: None,
813                            buttons: Some(vec![0]),
814                            position_x: None,
815                            position_y: None,
816                            ..Default::default()
817                        }),
818                        ..Default::default()
819                    }),
820                    sensor: None,
821                    touch: None,
822                    keyboard: None,
823                    consumer_control: None,
824                    ..Default::default()
825                });
826            }
827            _ => {}
828        }
829    }
830
831    /// Tests that an input pipeline handles events from multiple devices.
832    #[fasync::run_singlethreaded(test)]
833    async fn multiple_devices_single_handler() {
834        // Create two fake device bindings.
835        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
836        let first_device_binding =
837            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
838        let second_device_binding =
839            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
840
841        // Create a fake input handler.
842        let (handler_event_sender, mut handler_event_receiver) =
843            futures::channel::mpsc::channel(100);
844        let input_handler = observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
845            handler_event_sender,
846        );
847
848        // Build the input pipeline.
849        let (sender, receiver, handlers, _, _, _) =
850            InputPipelineAssembly::new(metrics::MetricsLogger::default())
851                .add_handler(input_handler)
852                .into_components();
853        let inspector = fuchsia_inspect::Inspector::default();
854        let test_node = inspector.root().create_child("input_pipeline");
855        let input_pipeline = InputPipeline {
856            pipeline_sender: sender,
857            device_event_sender,
858            device_event_receiver,
859            input_device_types: vec![],
860            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
861            inspect_node: test_node,
862            metrics_logger: metrics::MetricsLogger::default(),
863        };
864        InputPipeline::run(receiver, handlers);
865
866        // Send an input event from each device.
867        let first_device_event = send_input_event(first_device_binding.input_event_sender());
868        let second_device_event = send_input_event(second_device_binding.input_event_sender());
869
870        // Run the pipeline.
871        fasync::Task::local(async {
872            input_pipeline.handle_input_events().await;
873        })
874        .detach();
875
876        // Assert the handler receives the events.
877        let first_handled_event = handler_event_receiver.next().await;
878        assert_eq!(first_handled_event, Some(first_device_event));
879
880        let second_handled_event = handler_event_receiver.next().await;
881        assert_eq!(second_handled_event, Some(second_device_event));
882    }
883
884    /// Tests that an input pipeline handles events through multiple input handlers.
885    #[fasync::run_singlethreaded(test)]
886    async fn single_device_multiple_handlers() {
887        // Create two fake device bindings.
888        let (device_event_sender, device_event_receiver) = futures::channel::mpsc::unbounded();
889        let input_device_binding =
890            fake_input_device_binding::FakeInputDeviceBinding::new(device_event_sender.clone());
891
892        // Create two fake input handlers.
893        let (first_handler_event_sender, mut first_handler_event_receiver) =
894            futures::channel::mpsc::channel(100);
895        let first_input_handler =
896            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
897                first_handler_event_sender,
898            );
899        let (second_handler_event_sender, mut second_handler_event_receiver) =
900            futures::channel::mpsc::channel(100);
901        let second_input_handler =
902            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
903                second_handler_event_sender,
904            );
905
906        // Build the input pipeline.
907        let (sender, receiver, handlers, _, _, _) =
908            InputPipelineAssembly::new(metrics::MetricsLogger::default())
909                .add_handler(first_input_handler)
910                .add_handler(second_input_handler)
911                .into_components();
912        let inspector = fuchsia_inspect::Inspector::default();
913        let test_node = inspector.root().create_child("input_pipeline");
914        let input_pipeline = InputPipeline {
915            pipeline_sender: sender,
916            device_event_sender,
917            device_event_receiver,
918            input_device_types: vec![],
919            input_device_bindings: Arc::new(Mutex::new(HashMap::new())),
920            inspect_node: test_node,
921            metrics_logger: metrics::MetricsLogger::default(),
922        };
923        InputPipeline::run(receiver, handlers);
924
925        // Send an input event.
926        let input_event = send_input_event(input_device_binding.input_event_sender());
927
928        // Run the pipeline.
929        fasync::Task::local(async {
930            input_pipeline.handle_input_events().await;
931        })
932        .detach();
933
934        // Assert both handlers receive the event.
935        let first_handler_event = first_handler_event_receiver.next().await;
936        assert_eq!(first_handler_event, Some(input_event.clone()));
937        let second_handler_event = second_handler_event_receiver.next().await;
938        assert_eq!(second_handler_event, Some(input_event));
939    }
940
941    /// Tests that a single mouse device binding is created for the one input device in the
942    /// input report directory.
943    #[fasync::run_singlethreaded(test)]
944    async fn watch_devices_one_match_exists() {
945        // Create a file in a pseudo directory that represents an input device.
946        let mut count: i8 = 0;
947        let dir = pseudo_directory! {
948            "file_name" => pseudo_fs_service::host(
949                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
950                    async move {
951                        while count < 3 {
952                            if let Some(input_device_request) =
953                                request_stream.try_next().await.unwrap()
954                            {
955                                handle_input_device_request(input_device_request);
956                                count += 1;
957                            }
958                        }
959
960                    }.boxed()
961                },
962            )
963        };
964
965        // Create a Watcher on the pseudo directory.
966        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
967        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
968        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
969        // proxy to get connections to input devices.
970        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
971
972        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
973        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
974        let supported_device_types = vec![input_device::InputDeviceType::Mouse];
975
976        let inspector = fuchsia_inspect::Inspector::default();
977        let test_node = inspector.root().create_child("input_pipeline");
978        test_node.record_string(
979            "supported_input_devices",
980            supported_device_types.clone().iter().join(", "),
981        );
982        let input_devices = test_node.create_child("input_devices");
983        // Assert that inspect tree is initialized with no devices.
984        diagnostics_assertions::assert_data_tree!(inspector, root: {
985            input_pipeline: {
986                supported_input_devices: "Mouse",
987                input_devices: {}
988            }
989        });
990
991        let _ = InputPipeline::watch_for_devices(
992            device_watcher,
993            dir_proxy_for_pipeline,
994            supported_device_types,
995            input_event_sender,
996            bindings.clone(),
997            &input_devices,
998            true, /* break_on_idle */
999            metrics::MetricsLogger::default(),
1000        )
1001        .await;
1002
1003        // Assert that one mouse device with accurate device id was found.
1004        let bindings_hashmap = bindings.lock().await;
1005        assert_eq!(bindings_hashmap.len(), 1);
1006        let bindings_vector = bindings_hashmap.get(&10);
1007        assert!(bindings_vector.is_some());
1008        assert_eq!(bindings_vector.unwrap().len(), 1);
1009        let boxed_mouse_binding = bindings_vector.unwrap().get(0);
1010        assert!(boxed_mouse_binding.is_some());
1011        assert_eq!(
1012            boxed_mouse_binding.unwrap().get_device_descriptor(),
1013            input_device::InputDeviceDescriptor::Mouse(mouse_binding::MouseDeviceDescriptor {
1014                device_id: 10,
1015                absolute_x_range: None,
1016                absolute_y_range: None,
1017                wheel_v_range: None,
1018                wheel_h_range: None,
1019                buttons: Some(vec![0]),
1020                counts_per_mm: mouse_model_database::db::DEFAULT_COUNTS_PER_MM,
1021            })
1022        );
1023
1024        // Assert that inspect tree reflects new device discovered and connected.
1025        diagnostics_assertions::assert_data_tree!(inspector, root: {
1026            input_pipeline: {
1027                supported_input_devices: "Mouse",
1028                input_devices: {
1029                    devices_discovered: 1u64,
1030                    devices_connected: 1u64,
1031                    "file_name_Mouse": contains {
1032                        reports_received_count: 0u64,
1033                        reports_filtered_count: 0u64,
1034                        events_generated: 0u64,
1035                        last_received_timestamp_ns: 0u64,
1036                        last_generated_timestamp_ns: 0u64,
1037                        "fuchsia.inspect.Health": {
1038                            status: "OK",
1039                            // Timestamp value is unpredictable and not relevant in this context,
1040                            // so we only assert that the property is present.
1041                            start_timestamp_nanos: AnyProperty
1042                        },
1043                    }
1044                }
1045            }
1046        });
1047    }
1048
1049    /// Tests that no device bindings are created because the input pipeline looks for keyboard devices
1050    /// but only a mouse exists.
1051    #[fasync::run_singlethreaded(test)]
1052    async fn watch_devices_no_matches_exist() {
1053        // Create a file in a pseudo directory that represents an input device.
1054        let mut count: i8 = 0;
1055        let dir = pseudo_directory! {
1056            "file_name" => pseudo_fs_service::host(
1057                move |mut request_stream: fidl_fuchsia_input_report::InputDeviceRequestStream| {
1058                    async move {
1059                        while count < 1 {
1060                            if let Some(input_device_request) =
1061                                request_stream.try_next().await.unwrap()
1062                            {
1063                                handle_input_device_request(input_device_request);
1064                                count += 1;
1065                            }
1066                        }
1067
1068                    }.boxed()
1069                },
1070            )
1071        };
1072
1073        // Create a Watcher on the pseudo directory.
1074        let dir_proxy_for_watcher = vfs::directory::serve_read_only(dir.clone());
1075        let device_watcher = Watcher::new(&dir_proxy_for_watcher).await.unwrap();
1076        // Get a proxy to the pseudo directory for the input pipeline. The input pipeline uses this
1077        // proxy to get connections to input devices.
1078        let dir_proxy_for_pipeline = vfs::directory::serve_read_only(dir);
1079
1080        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1081        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1082        let supported_device_types = vec![input_device::InputDeviceType::Keyboard];
1083
1084        let inspector = fuchsia_inspect::Inspector::default();
1085        let test_node = inspector.root().create_child("input_pipeline");
1086        test_node.record_string(
1087            "supported_input_devices",
1088            supported_device_types.clone().iter().join(", "),
1089        );
1090        let input_devices = test_node.create_child("input_devices");
1091        // Assert that inspect tree is initialized with no devices.
1092        diagnostics_assertions::assert_data_tree!(inspector, root: {
1093            input_pipeline: {
1094                supported_input_devices: "Keyboard",
1095                input_devices: {}
1096            }
1097        });
1098
1099        let _ = InputPipeline::watch_for_devices(
1100            device_watcher,
1101            dir_proxy_for_pipeline,
1102            supported_device_types,
1103            input_event_sender,
1104            bindings.clone(),
1105            &input_devices,
1106            true, /* break_on_idle */
1107            metrics::MetricsLogger::default(),
1108        )
1109        .await;
1110
1111        // Assert that no devices were found.
1112        let bindings = bindings.lock().await;
1113        assert_eq!(bindings.len(), 0);
1114
1115        // Assert that inspect tree reflects new device discovered, but not connected.
1116        diagnostics_assertions::assert_data_tree!(inspector, root: {
1117            input_pipeline: {
1118                supported_input_devices: "Keyboard",
1119                input_devices: {
1120                    devices_discovered: 1u64,
1121                    devices_connected: 0u64,
1122                    "file_name_Unsupported": {
1123                        "fuchsia.inspect.Health": {
1124                            status: "UNHEALTHY",
1125                            message: "Unsupported device type.",
1126                            // Timestamp value is unpredictable and not relevant in this context,
1127                            // so we only assert that the property is present.
1128                            start_timestamp_nanos: AnyProperty
1129                        },
1130                    }
1131                }
1132            }
1133        });
1134    }
1135
1136    /// Tests that a single keyboard device binding is created for the input device registered
1137    /// through InputDeviceRegistry.
1138    #[fasync::run_singlethreaded(test)]
1139    async fn handle_input_device_registry_request_stream() {
1140        let (input_device_registry_proxy, input_device_registry_request_stream) =
1141            create_proxy_and_stream::<fidl_fuchsia_input_injection::InputDeviceRegistryMarker>();
1142        let (input_device_client_end, mut input_device_request_stream) =
1143            create_request_stream::<fidl_fuchsia_input_report::InputDeviceMarker>();
1144
1145        let device_types = vec![input_device::InputDeviceType::Mouse];
1146        let (input_event_sender, _input_event_receiver) = futures::channel::mpsc::unbounded();
1147        let bindings: InputDeviceBindingHashMap = Arc::new(Mutex::new(HashMap::new()));
1148
1149        // Handle input device requests.
1150        let mut count: i8 = 0;
1151        fasync::Task::local(async move {
1152            // Register a device.
1153            let _ = input_device_registry_proxy.register(input_device_client_end);
1154
1155            while count < 3 {
1156                if let Some(input_device_request) =
1157                    input_device_request_stream.try_next().await.unwrap()
1158                {
1159                    handle_input_device_request(input_device_request);
1160                    count += 1;
1161                }
1162            }
1163
1164            // End handle_input_device_registry_request_stream() by taking the event stream.
1165            input_device_registry_proxy.take_event_stream();
1166        })
1167        .detach();
1168
1169        let inspector = fuchsia_inspect::Inspector::default();
1170        let test_node = inspector.root().create_child("input_pipeline");
1171
1172        // Start listening for InputDeviceRegistryRequests.
1173        let bindings_clone = bindings.clone();
1174        let _ = InputPipeline::handle_input_device_registry_request_stream(
1175            input_device_registry_request_stream,
1176            &device_types,
1177            &input_event_sender,
1178            &bindings_clone,
1179            &test_node,
1180            metrics::MetricsLogger::default(),
1181        )
1182        .await;
1183
1184        // Assert that a device was registered.
1185        let bindings = bindings.lock().await;
1186        assert_eq!(bindings.len(), 1);
1187    }
1188
1189    // Tests that correct properties are added to inspect node when InputPipeline is created.
1190    #[fasync::run_singlethreaded(test)]
1191    async fn check_inspect_node_has_correct_properties() {
1192        let device_types = vec![
1193            input_device::InputDeviceType::Touch,
1194            input_device::InputDeviceType::ConsumerControls,
1195        ];
1196        let inspector = fuchsia_inspect::Inspector::default();
1197        let test_node = inspector.root().create_child("input_pipeline");
1198        // Create fake input handler for assembly
1199        let (fake_handler_event_sender, _fake_handler_event_receiver) =
1200            futures::channel::mpsc::channel(100);
1201        let fake_input_handler =
1202            observe_fake_events_input_handler::ObserveFakeEventsInputHandler::new(
1203                fake_handler_event_sender,
1204            );
1205        let assembly = InputPipelineAssembly::new(metrics::MetricsLogger::default())
1206            .add_handler(fake_input_handler);
1207        let _test_input_pipeline = InputPipeline::new(
1208            device_types,
1209            assembly,
1210            test_node,
1211            metrics::MetricsLogger::default(),
1212        );
1213        diagnostics_assertions::assert_data_tree!(inspector, root: {
1214            input_pipeline: {
1215                supported_input_devices: "Touch, ConsumerControls",
1216                handlers_registered: 1u64,
1217                handlers_healthy: 1u64,
1218                input_devices: {}
1219            }
1220        });
1221    }
1222}