input_synthesis/modern_backend/
input_reports_reader.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![warn(missing_docs)]
6
7use anyhow::{format_err, Context as _, Error};
8use fidl_fuchsia_input_report::{
9    InputReport, InputReportsReaderRequest, InputReportsReaderRequestStream,
10};
11use futures::{StreamExt, TryStreamExt};
12use std::convert::TryFrom as _;
13
/// Implements the server side of the `fuchsia.input.report.InputReportsReader`
/// protocol. Used by `modern_backend::InputDevice`.
pub(super) struct InputReportsReader {
    /// Stream of incoming `fuchsia.input.report.InputReportsReader` requests.
    /// Each request read from this stream is answered by `into_future()`.
    pub(super) request_stream: InputReportsReaderRequestStream,
    /// FIFO queue of reports to be consumed by calls to
    /// `fuchsia.input.report.InputReportsReader.ReadInputReports()`.
    pub(super) report_receiver: futures::channel::mpsc::UnboundedReceiver<InputReport>,
}
22
23impl InputReportsReader {
24    /// Returns a `Future` that resolves when
25    /// * `self.reports` is empty, or
26    /// * `self.request_stream` yields `None`, or
27    /// * an error occurs (invalid FIDL request, failure to send FIDL response).
28    ///
29    /// # Resolves to
30    /// * `Ok(())` if all reports were written successfully
31    /// * `Err` otherwise
32    ///
33    /// # Corner cases
34    /// If `self.reports` is _initially_ empty, the returned `Future` will resolve immediately.
35    ///
36    /// # Note
37    /// When the future resolves, `InputReports` may still be sitting unread in the
38    /// channel to the `fuchsia.input.report.InputReportsReader` client. (The client will
39    /// typically be an input pipeline implementation.)
40    pub(super) async fn into_future(self) -> Result<(), Error> {
41        // Group `reports` into chunks, to respect the requirements of the `InputReportsReader`
42        // protocol. Then `zip()` each chunk with a `InputReportsReader` protocol request.
43        // * If there are more chunks than requests, then some of the `InputReport`s were
44        //   not sent to the `InputReportsReader` client. In this case, this function
45        //   will report an error by checking `reports.is_done()` below.
46        // * If there are more requests than reports, no special-case handling is needed.
47        //   This is because an input pipeline implementation will normally issue
48        //   `ReadInputReports` requests indefinitely.
49        let chunk_size = usize::try_from(fidl_fuchsia_input_report::MAX_DEVICE_REPORT_COUNT)
50            .context("converting MAX_DEVICE_REPORT_COUNT to usize")?;
51        let mut reports = self.report_receiver.ready_chunks(chunk_size).fuse();
52        self.request_stream
53            .zip(reports.by_ref())
54            .map(|(request, reports)| match request {
55                Ok(request) => Ok((request, reports)),
56                Err(e) => Err(anyhow::Error::from(e).context("while reading reader request")),
57            })
58            .try_for_each(|request_and_reports| async {
59                match request_and_reports {
60                    (InputReportsReaderRequest::ReadInputReports { responder }, reports) => {
61                        responder
62                            .send(Ok(&reports))
63                            .map_err(anyhow::Error::from)
64                            .context("while sending reports")
65                    }
66                }
67            })
68            .await?;
69
70        match reports.is_done() {
71            true => Ok(()),
72            false => Err(format_err!("request_stream terminated with reports still pending")),
73        }
74    }
75}
76
77#[cfg(test)]
78mod tests {
79    use super::{InputReport, InputReportsReader};
80    use anyhow::{Context as _, Error};
81    use fidl::endpoints;
82    use fidl_fuchsia_input_report::{InputReportsReaderMarker, MAX_DEVICE_REPORT_COUNT};
83    use fuchsia_async as fasync;
84    use futures::future;
85
86    mod report_count {
87        use super::*;
88        use futures::pin_mut;
89        use futures::task::Poll;
90
91        #[fasync::run_until_stalled(test)]
92        async fn serves_single_report() -> Result<(), Error> {
93            let (proxy, request_stream) =
94                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
95            let (report_sender, report_receiver) =
96                futures::channel::mpsc::unbounded::<InputReport>();
97            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
98            report_sender
99                .unbounded_send(InputReport::default())
100                .expect("sending empty InputReport");
101            let reports_fut = proxy.read_input_reports();
102            std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
103            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
104
105            let (_, reports_result) = future::join(reader_fut, reports_fut).await;
106            let reports = reports_result
107                .expect("fidl error")
108                .map_err(zx::Status::from_raw)
109                .expect("service error");
110            assert_eq!(reports.len(), 1, "incorrect reports length");
111            Ok(())
112        }
113
114        #[fasync::run_until_stalled(test)]
115        async fn serves_max_report_count_reports() -> Result<(), Error> {
116            let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
117                .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
118            let (proxy, request_stream) =
119                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
120            let (report_sender, report_receiver) =
121                futures::channel::mpsc::unbounded::<InputReport>();
122            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
123            for _ in 0..max_reports {
124                report_sender
125                    .unbounded_send(InputReport::default())
126                    .expect("sending empty InputReport");
127            }
128            let reports_fut = proxy.read_input_reports();
129            std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
130            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
131
132            let (_, reports_result) = future::join(reader_fut, reports_fut).await;
133            let reports = reports_result
134                .expect("fidl error")
135                .map_err(zx::Status::from_raw)
136                .expect("service error");
137            assert_eq!(reports.len(), max_reports, "incorrect reports length");
138            Ok(())
139        }
140
141        #[test]
142        fn splits_overflowed_reports_to_next_read() -> Result<(), Error> {
143            let mut executor = fasync::TestExecutor::new();
144            let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
145                .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
146            let (proxy, request_stream) =
147                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
148            let (report_sender, report_receiver) =
149                futures::channel::mpsc::unbounded::<InputReport>();
150            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
151            for _ in 0..max_reports + 1 {
152                report_sender
153                    .unbounded_send(InputReport::default())
154                    .expect("sending empty InputReport");
155            }
156            pin_mut!(reader_fut);
157
158            // Note: this test deliberately serializes its FIDL requests. Concurrent requests
159            // are tested separately, in `super::fidl_interactions::preserves_query_order()`.
160            let reports_fut = proxy.read_input_reports();
161            let _ = executor.run_until_stalled(&mut reader_fut);
162            pin_mut!(reports_fut);
163            match executor.run_until_stalled(&mut reports_fut) {
164                Poll::Pending => panic!("read did not complete (1st query)"),
165                Poll::Ready(res) => {
166                    let reports = res
167                        .expect("fidl error")
168                        .map_err(zx::Status::from_raw)
169                        .expect("service error");
170                    assert_eq!(reports.len(), max_reports, "incorrect reports length (1st query)");
171                }
172            }
173
174            let reports_fut = proxy.read_input_reports();
175            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
176            let _ = executor.run_until_stalled(&mut reader_fut);
177            pin_mut!(reports_fut);
178            match executor.run_until_stalled(&mut reports_fut) {
179                Poll::Pending => panic!("read did not complete (2nd query)"),
180                Poll::Ready(res) => {
181                    let reports = res
182                        .expect("fidl error")
183                        .map_err(zx::Status::from_raw)
184                        .expect("service error");
185                    assert_eq!(reports.len(), 1, "incorrect reports length (2nd query)");
186                }
187            }
188
189            Ok(())
190        }
191    }
192
193    mod future_resolution {
194        use super::*;
195        use assert_matches::assert_matches;
196
197        #[fasync::run_until_stalled(test)]
198        async fn resolves_to_ok_when_all_reports_are_written() -> Result<(), Error> {
199            let (proxy, request_stream) =
200                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
201            let (report_sender, report_receiver) =
202                futures::channel::mpsc::unbounded::<InputReport>();
203            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
204            report_sender
205                .unbounded_send(InputReport::default())
206                .expect("sending empty InputReport");
207            let _reports_fut = proxy.read_input_reports();
208            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
209            assert_matches!(reader_fut.await, Ok(()));
210            Ok(())
211        }
212
213        #[fasync::run_until_stalled(test)]
214        async fn resolves_to_err_when_request_stream_is_terminated_before_reports_are_written(
215        ) -> Result<(), Error> {
216            let (proxy, request_stream) =
217                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
218            let (report_sender, report_receiver) =
219                futures::channel::mpsc::unbounded::<InputReport>();
220            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
221            report_sender
222                .unbounded_send(InputReport::default())
223                .expect("sending empty InputReport");
224            std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
225            assert_matches!(reader_fut.await, Err(_));
226            Ok(())
227        }
228
229        #[fasync::run_until_stalled(test)]
230        async fn resolves_to_err_if_request_stream_yields_error() -> Result<(), Error> {
231            let (client_end, request_stream) =
232                endpoints::create_request_stream::<InputReportsReaderMarker>();
233            let (report_sender, report_receiver) =
234                futures::channel::mpsc::unbounded::<InputReport>();
235            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
236            report_sender
237                .unbounded_send(InputReport::default())
238                .expect("sending empty InputReport");
239            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
240            client_end
241                .into_channel()
242                .write(b"not a valid FIDL message", /* handles */ &mut [])
243                .expect("internal error writing to channel");
244            assert_matches!(reader_fut.await, Err(_)); // while reading reader request
245            Ok(())
246        }
247
248        #[fasync::run_until_stalled(test)]
249        async fn resolves_to_ok_if_client_closes_channel_and_ignores_reply() -> Result<(), Error> {
250            let (proxy, request_stream) =
251                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
252            let (report_sender, report_receiver) =
253                futures::channel::mpsc::unbounded::<InputReport>();
254            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
255            report_sender
256                .unbounded_send(InputReport::default())
257                .expect("sending empty InputReport");
258            let result_fut = proxy.read_input_reports(); // Send query.
259            std::mem::drop(result_fut); // Close handle to channel.
260            std::mem::drop(proxy); // Close other handle to channel.
261            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
262            assert_matches!(reader_fut.await, Ok(()));
263            Ok(())
264        }
265
266        #[fasync::run_until_stalled(test)]
267        async fn immediately_resolves_to_ok_when_reports_is_initially_empty() -> Result<(), Error> {
268            let (_proxy, request_stream) =
269                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
270            let (report_sender, report_receiver) =
271                futures::channel::mpsc::unbounded::<InputReport>();
272            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
273            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
274            assert_matches!(reader_fut.await, Ok(()));
275            Ok(())
276        }
277    }
278
279    mod fidl_interactions {
280        use super::*;
281        use assert_matches::assert_matches;
282        use futures::pin_mut;
283        use futures::task::Poll;
284
285        #[test]
286        fn closes_channel_after_reports_are_consumed() -> Result<(), Error> {
287            let mut executor = fasync::TestExecutor::new();
288            let (proxy, request_stream) =
289                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
290            let (report_sender, report_receiver) =
291                futures::channel::mpsc::unbounded::<InputReport>();
292            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
293            report_sender
294                .unbounded_send(InputReport::default())
295                .expect("sending empty InputReport");
296            let reports_fut = proxy.read_input_reports();
297            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
298
299            // Process the first query. This should close the FIDL connection.
300            let futures = future::join(reader_fut, reports_fut);
301            pin_mut!(futures);
302            std::mem::drop(executor.run_until_stalled(&mut futures));
303
304            // Try sending another query. This should fail.
305            assert_matches!(
306                executor.run_until_stalled(&mut proxy.read_input_reports()),
307                Poll::Ready(Err(fidl::Error::ClientChannelClosed { .. }))
308            );
309            Ok(())
310        }
311
312        #[fasync::run_until_stalled(test)]
313        async fn preserves_query_order() -> Result<(), Error> {
314            let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
315                .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
316            let (proxy, request_stream) =
317                endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
318            let (report_sender, report_receiver) =
319                futures::channel::mpsc::unbounded::<InputReport>();
320            let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
321            for _ in 0..max_reports + 1 {
322                report_sender
323                    .unbounded_send(InputReport::default())
324                    .expect("sending empty InputReport");
325            }
326            let first_reports_fut = proxy.read_input_reports();
327            let second_reports_fut = proxy.read_input_reports();
328            std::mem::drop(proxy); // Drop `proxy` to terminate `request_stream`.
329            std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
330
331            let (_, first_reports_result, second_reports_result) =
332                futures::join!(reader_fut, first_reports_fut, second_reports_fut);
333            let first_reports = first_reports_result
334                .expect("fidl error")
335                .map_err(zx::Status::from_raw)
336                .expect("service error");
337            let second_reports = second_reports_result
338                .expect("fidl error")
339                .map_err(zx::Status::from_raw)
340                .expect("service error");
341            assert_eq!(first_reports.len(), max_reports, "incorrect reports length (1st query)");
342            assert_eq!(second_reports.len(), 1, "incorrect reports length (2nd query)");
343            Ok(())
344        }
345    }
346
347    #[fasync::run_until_stalled(test)]
348    async fn preserves_report_order() -> Result<(), Error> {
349        let (proxy, request_stream) =
350            endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
351        let (report_sender, report_receiver) = futures::channel::mpsc::unbounded::<InputReport>();
352        let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
353        report_sender
354            .unbounded_send(InputReport { event_time: Some(1), ..Default::default() })
355            .expect("sending first InputReport");
356        report_sender
357            .unbounded_send(InputReport { event_time: Some(2), ..Default::default() })
358            .expect("sending second InputReport");
359
360        let reports_fut = proxy.read_input_reports();
361        std::mem::drop(report_sender); // Drop `report_sender` to terminate `report_receiver`.
362
363        assert_eq!(
364            future::join(reader_fut, reports_fut)
365                .await
366                .1
367                .expect("fidl error")
368                .map_err(zx::Status::from_raw)
369                .expect("service error")
370                .iter()
371                .map(|report| report.event_time)
372                .collect::<Vec<_>>(),
373            [Some(1), Some(2)]
374        );
375        Ok(())
376    }
377}