input_testing/
input_reports_reader.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{format_err, Context as _, Error};
6use fidl_fuchsia_input_report::{
7    InputReport, InputReportsReaderRequest, InputReportsReaderRequestStream,
8};
9use futures::{StreamExt, TryStreamExt};
10use std::convert::TryFrom as _;
11
/// Implements the server side of the `fuchsia.input.report.InputReportsReader`
/// protocol. Used by `modern_backend::InputDevice`.
pub(super) struct InputReportsReader {
    /// Stream of incoming `fuchsia.input.report.InputReportsReader` requests
    /// that this reader answers.
    pub(super) request_stream: InputReportsReaderRequestStream,
    /// FIFO queue of reports to be consumed by calls to
    /// `fuchsia.input.report.InputReportsReader.ReadInputReports()`.
    pub(super) report_receiver: futures::channel::mpsc::UnboundedReceiver<InputReport>,
}
20
21impl InputReportsReader {
22    /// Returns a `Future` that resolves when
23    /// * `self.reports` is empty, or
24    /// * `self.request_stream` yields `None`, or
25    /// * an error occurs (invalid FIDL request, failure to send FIDL response).
26    ///
27    /// # Resolves to
28    /// * `Ok(())` if all reports were written successfully
29    /// * `Err` otherwise
30    ///
31    /// # Corner cases
32    /// If `self.reports` is _initially_ empty, the returned `Future` will resolve immediately.
33    ///
34    /// # Note
35    /// When the future resolves, `InputReports` may still be sitting unread in the
36    /// channel to the `fuchsia.input.report.InputReportsReader` client. (The client will
37    /// typically be an input pipeline implementation.)
38    pub(super) async fn into_future(self) -> Result<(), Error> {
39        // Group `reports` into chunks, to respect the requirements of the `InputReportsReader`
40        // protocol. Then `zip()` each chunk with a `InputReportsReader` protocol request.
41        // * If there are more chunks than requests, then some of the `InputReport`s were
42        //   not sent to the `InputReportsReader` client. In this case, this function
43        //   will report an error by checking `reports.is_done()` below.
44        // * If there are more requests than reports, no special-case handling is needed.
45        //   This is because an input pipeline implementation will normally issue
46        //   `ReadInputReports` requests indefinitely.
47        let chunk_size = usize::try_from(fidl_fuchsia_input_report::MAX_DEVICE_REPORT_COUNT)
48            .context("converting MAX_DEVICE_REPORT_COUNT to usize")?;
49        let mut reports = self.report_receiver.ready_chunks(chunk_size).fuse();
50        self.request_stream
51            .zip(reports.by_ref())
52            .map(|(request, reports)| match request {
53                Ok(request) => Ok((request, reports)),
54                Err(e) => Err(anyhow::Error::from(e).context("while reading reader request")),
55            })
56            .try_for_each(|request_and_reports| async {
57                match request_and_reports {
58                    (InputReportsReaderRequest::ReadInputReports { responder }, reports) => {
59                        responder
60                            .send(Ok(&reports))
61                            .map_err(anyhow::Error::from)
62                            .context("while sending reports")
63                    }
64                }
65            })
66            .await?;
67
68        match reports.is_done() {
69            true => Ok(()),
70            false => Err(format_err!("request_stream terminated with reports still pending")),
71        }
72    }
73}
74
75#[cfg(test)]
76mod tests {
77    use super::{InputReport, InputReportsReader};
78    use anyhow::{Context as _, Error};
79    use fidl::endpoints;
80    use fidl_fuchsia_input_report::{InputReportsReaderMarker, MAX_DEVICE_REPORT_COUNT};
81    use fuchsia_async as fasync;
82    use futures::future;
83
84    mod report_count {
85        use super::*;
86        use futures::pin_mut;
87        use futures::task::Poll;
88
89        #[fasync::run_until_stalled(test)]
90        async fn serves_single_report() -> Result<(), Error> {
91            let (proxy, request_stream) =
92                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
93            let (report_sender, report_receiver) =
94                futures::channel::mpsc::unbounded::<InputReport>();
95            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
96            report_sender
97                .unbounded_send(InputReport::default())
98                .expect("sending empty InputReport");
99            let reports_fut = proxy.read_input_reports();
100            std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
101            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
102
103            let (_, reports_result) = future::join(reader_fut, reports_fut).await;
104            let reports = reports_result
105                .expect("fidl error")
106                .map_err(zx::Status::from_raw)
107                .expect("service error");
108            assert_eq!(reports.len(), 1, "incorrect reports length");
109            Ok(())
110        }
111
112        #[fasync::run_until_stalled(test)]
113        async fn serves_max_report_count_reports() -> Result<(), Error> {
114            let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
115                .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
116            let (proxy, request_stream) =
117                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
118            let (report_sender, report_receiver) =
119                futures::channel::mpsc::unbounded::<InputReport>();
120            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
121            for _ in 0..max_reports {
122                report_sender
123                    .unbounded_send(InputReport::default())
124                    .expect("sending empty InputReport");
125            }
126            let reports_fut = proxy.read_input_reports();
127            std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
128            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
129
130            let (_, reports_result) = future::join(reader_fut, reports_fut).await;
131            let reports = reports_result
132                .expect("fidl error")
133                .map_err(zx::Status::from_raw)
134                .expect("service error");
135            assert_eq!(reports.len(), max_reports, "incorrect reports length");
136            Ok(())
137        }
138
139        #[test]
140        fn splits_overflowed_reports_to_next_read() -> Result<(), Error> {
141            let mut executor = fasync::TestExecutor::new();
142            let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
143                .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
144            let (proxy, request_stream) =
145                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
146            let (report_sender, report_receiver) =
147                futures::channel::mpsc::unbounded::<InputReport>();
148            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
149            for _ in 0..max_reports + 1 {
150                report_sender
151                    .unbounded_send(InputReport::default())
152                    .expect("sending empty InputReport");
153            }
154            pin_mut!(reader_fut);
155
156            // Note: this test deliberately serializes its FIDL requests. Concurrent requests
157            // are tested separately, in `super::fidl_interactions::preserves_query_order()`.
158            let reports_fut = proxy.read_input_reports();
159            let _ = executor.run_until_stalled(&mut reader_fut);
160            pin_mut!(reports_fut);
161            match executor.run_until_stalled(&mut reports_fut) {
162                Poll::Pending => panic!("read did not complete (1st query)"),
163                Poll::Ready(res) => {
164                    let reports = res
165                        .expect("fidl error")
166                        .map_err(zx::Status::from_raw)
167                        .expect("service error");
168                    assert_eq!(reports.len(), max_reports, "incorrect reports length (1st query)");
169                }
170            }
171
172            let reports_fut = proxy.read_input_reports();
173            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
174            let _ = executor.run_until_stalled(&mut reader_fut);
175            pin_mut!(reports_fut);
176            match executor.run_until_stalled(&mut reports_fut) {
177                Poll::Pending => panic!("read did not complete (2nd query)"),
178                Poll::Ready(res) => {
179                    let reports = res
180                        .expect("fidl error")
181                        .map_err(zx::Status::from_raw)
182                        .expect("service error");
183                    assert_eq!(reports.len(), 1, "incorrect reports length (2nd query)");
184                }
185            }
186
187            Ok(())
188        }
189    }
190
191    mod future_resolution {
192        use super::*;
193        use assert_matches::assert_matches;
194
195        #[fasync::run_until_stalled(test)]
196        async fn resolves_to_ok_when_all_reports_are_written() -> Result<(), Error> {
197            let (proxy, request_stream) =
198                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
199            let (report_sender, report_receiver) =
200                futures::channel::mpsc::unbounded::<InputReport>();
201            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
202            report_sender
203                .unbounded_send(InputReport::default())
204                .expect("sending empty InputReport");
205            let _reports_fut = proxy.read_input_reports();
206            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
207            assert_matches!(reader_fut.await, Ok(()));
208            Ok(())
209        }
210
211        #[fasync::run_until_stalled(test)]
212        async fn resolves_to_err_when_request_stream_is_terminated_before_reports_are_written(
213        ) -> Result<(), Error> {
214            let (proxy, request_stream) =
215                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
216            let (report_sender, report_receiver) =
217                futures::channel::mpsc::unbounded::<InputReport>();
218            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
219            report_sender
220                .unbounded_send(InputReport::default())
221                .expect("sending empty InputReport");
222            std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
223            assert_matches!(reader_fut.await, Err(_));
224            Ok(())
225        }
226
227        #[fasync::run_until_stalled(test)]
228        async fn resolves_to_err_if_request_stream_yields_error() -> Result<(), Error> {
229            let (client_end, request_stream) =
230                endpoints::create_request_stream::<InputReportsReaderMarker>();
231            let (report_sender, report_receiver) =
232                futures::channel::mpsc::unbounded::<InputReport>();
233            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
234            report_sender
235                .unbounded_send(InputReport::default())
236                .expect("sending empty InputReport");
237            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
238            client_end
239                .into_channel()
240                .write(b"not a valid FIDL message", /* handles */ &mut [])
241                .expect("internal error writing to channel");
242            assert_matches!(reader_fut.await, Err(_)); // while reading reader request
243            Ok(())
244        }
245
246        /* TODO(https://fxbug.dev/42075735): Re-enable this test
247        #[fasync::run_until_stalled(test)]
248        async fn resolves_to_err_if_send_fails() -> Result<(), Error> {
249            let (proxy, request_stream) =
250                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
251            let (report_sender, report_receiver) =
252                futures::channel::mpsc::unbounded::<InputReport>();
253            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
254            report_sender.unbounded_send(InputReport::default()).expect("sending empty InputReport");
255            let result_fut = proxy.read_input_reports(); // Send query.
256            std::mem::drop(result_fut); // Close handle to channel.
257            std::mem::drop(proxy); // Close other handle to channel.
258            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
259            assert_matches!(reader_fut.await, Err(_)); // while sending reports
260            Ok(())
261        }
262        */
263
264        #[fasync::run_until_stalled(test)]
265        async fn immediately_resolves_to_ok_when_reports_is_initially_empty() -> Result<(), Error> {
266            let (_proxy, request_stream) =
267                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
268            let (report_sender, report_receiver) =
269                futures::channel::mpsc::unbounded::<InputReport>();
270            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
271            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
272            assert_matches!(reader_fut.await, Ok(()));
273            Ok(())
274        }
275    }
276
277    mod fidl_interactions {
278        use super::*;
279        use assert_matches::assert_matches;
280        use futures::pin_mut;
281        use futures::task::Poll;
282
283        #[test]
284        fn closes_channel_after_reports_are_consumed() -> Result<(), Error> {
285            let mut executor = fasync::TestExecutor::new();
286            let (proxy, request_stream) =
287                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
288            let (report_sender, report_receiver) =
289                futures::channel::mpsc::unbounded::<InputReport>();
290            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
291            report_sender
292                .unbounded_send(InputReport::default())
293                .expect("sending empty InputReport");
294            let reports_fut = proxy.read_input_reports();
295            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
296
297            // Process the first query. This should close the FIDL connection.
298            let futures = future::join(reader_fut, reports_fut);
299            pin_mut!(futures);
300            std::mem::drop(executor.run_until_stalled(&mut futures));
301
302            // Try sending another query. This should fail.
303            assert_matches!(
304                executor.run_until_stalled(&mut proxy.read_input_reports()),
305                Poll::Ready(Err(fidl::Error::ClientChannelClosed { .. }))
306            );
307            Ok(())
308        }
309
310        #[fasync::run_until_stalled(test)]
311        async fn preserves_query_order() -> Result<(), Error> {
312            let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
313                .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
314            let (proxy, request_stream) =
315                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
316            let (report_sender, report_receiver) =
317                futures::channel::mpsc::unbounded::<InputReport>();
318            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
319            for _ in 0..max_reports + 1 {
320                report_sender
321                    .unbounded_send(InputReport::default())
322                    .expect("sending empty InputReport");
323            }
324            let first_reports_fut = proxy.read_input_reports();
325            let second_reports_fut = proxy.read_input_reports();
326            std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
327            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
328
329            let (_, first_reports_result, second_reports_result) =
330                futures::join!(reader_fut, first_reports_fut, second_reports_fut);
331            let first_reports = first_reports_result
332                .expect("fidl error")
333                .map_err(zx::Status::from_raw)
334                .expect("service error");
335            let second_reports = second_reports_result
336                .expect("fidl error")
337                .map_err(zx::Status::from_raw)
338                .expect("service error");
339            assert_eq!(first_reports.len(), max_reports, "incorrect reports length (1st query)");
340            assert_eq!(second_reports.len(), 1, "incorrect reports length (2nd query)");
341            Ok(())
342        }
343    }
344
345    #[fasync::run_until_stalled(test)]
346    async fn preserves_report_order() -> Result<(), Error> {
347        let (proxy, request_stream) =
348            endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
349        let (report_sender, report_receiver) = futures::channel::mpsc::unbounded::<InputReport>();
350        let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
351        report_sender
352            .unbounded_send(InputReport { event_time: Some(1), ..Default::default() })
353            .expect("sending first InputReport");
354        report_sender
355            .unbounded_send(InputReport { event_time: Some(2), ..Default::default() })
356            .expect("sending second InputReport");
357
358        let reports_fut = proxy.read_input_reports();
359        std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
360
361        assert_eq!(
362            future::join(reader_fut, reports_fut)
363                .await
364                .1
365                .expect("fidl error")
366                .map_err(zx::Status::from_raw)
367                .expect("service error")
368                .iter()
369                .map(|report| report.event_time)
370                .collect::<Vec<_>>(),
371            [Some(1), Some(2)]
372        );
373        Ok(())
374    }
375}