1use anyhow::{format_err, Context as _, Error};
6use fidl_fuchsia_input_report::{
7 InputReport, InputReportsReaderRequest, InputReportsReaderRequestStream,
8};
9use futures::{StreamExt, TryStreamExt};
10use std::convert::TryFrom as _;
11
12pub(super) struct InputReportsReader {
15 pub(super) request_stream: InputReportsReaderRequestStream,
16 pub(super) report_receiver: futures::channel::mpsc::UnboundedReceiver<InputReport>,
19}
20
21impl InputReportsReader {
22 pub(super) async fn into_future(self) -> Result<(), Error> {
39 let chunk_size = usize::try_from(fidl_fuchsia_input_report::MAX_DEVICE_REPORT_COUNT)
48 .context("converting MAX_DEVICE_REPORT_COUNT to usize")?;
49 let mut reports = self.report_receiver.ready_chunks(chunk_size).fuse();
50 self.request_stream
51 .zip(reports.by_ref())
52 .map(|(request, reports)| match request {
53 Ok(request) => Ok((request, reports)),
54 Err(e) => Err(anyhow::Error::from(e).context("while reading reader request")),
55 })
56 .try_for_each(|request_and_reports| async {
57 match request_and_reports {
58 (InputReportsReaderRequest::ReadInputReports { responder }, reports) => {
59 responder
60 .send(Ok(&reports))
61 .map_err(anyhow::Error::from)
62 .context("while sending reports")
63 }
64 }
65 })
66 .await?;
67
68 match reports.is_done() {
69 true => Ok(()),
70 false => Err(format_err!("request_stream terminated with reports still pending")),
71 }
72 }
73}
74
75#[cfg(test)]
76mod tests {
77 use super::{InputReport, InputReportsReader};
78 use anyhow::{Context as _, Error};
79 use fidl::endpoints;
80 use fidl_fuchsia_input_report::{InputReportsReaderMarker, MAX_DEVICE_REPORT_COUNT};
81 use fuchsia_async as fasync;
82 use futures::future;
83
84 mod report_count {
85 use super::*;
86 use futures::pin_mut;
87 use futures::task::Poll;
88
89 #[fasync::run_until_stalled(test)]
90 async fn serves_single_report() -> Result<(), Error> {
91 let (proxy, request_stream) =
92 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
93 let (report_sender, report_receiver) =
94 futures::channel::mpsc::unbounded::<InputReport>();
95 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
96 report_sender
97 .unbounded_send(InputReport::default())
98 .expect("sending empty InputReport");
99 let reports_fut = proxy.read_input_reports();
100 std::mem::drop(proxy); std::mem::drop(report_sender); let (_, reports_result) = future::join(reader_fut, reports_fut).await;
104 let reports = reports_result
105 .expect("fidl error")
106 .map_err(zx::Status::from_raw)
107 .expect("service error");
108 assert_eq!(reports.len(), 1, "incorrect reports length");
109 Ok(())
110 }
111
112 #[fasync::run_until_stalled(test)]
113 async fn serves_max_report_count_reports() -> Result<(), Error> {
114 let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
115 .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
116 let (proxy, request_stream) =
117 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
118 let (report_sender, report_receiver) =
119 futures::channel::mpsc::unbounded::<InputReport>();
120 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
121 for _ in 0..max_reports {
122 report_sender
123 .unbounded_send(InputReport::default())
124 .expect("sending empty InputReport");
125 }
126 let reports_fut = proxy.read_input_reports();
127 std::mem::drop(proxy); std::mem::drop(report_sender); let (_, reports_result) = future::join(reader_fut, reports_fut).await;
131 let reports = reports_result
132 .expect("fidl error")
133 .map_err(zx::Status::from_raw)
134 .expect("service error");
135 assert_eq!(reports.len(), max_reports, "incorrect reports length");
136 Ok(())
137 }
138
139 #[test]
140 fn splits_overflowed_reports_to_next_read() -> Result<(), Error> {
141 let mut executor = fasync::TestExecutor::new();
142 let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
143 .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
144 let (proxy, request_stream) =
145 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
146 let (report_sender, report_receiver) =
147 futures::channel::mpsc::unbounded::<InputReport>();
148 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
149 for _ in 0..max_reports + 1 {
150 report_sender
151 .unbounded_send(InputReport::default())
152 .expect("sending empty InputReport");
153 }
154 pin_mut!(reader_fut);
155
156 let reports_fut = proxy.read_input_reports();
159 let _ = executor.run_until_stalled(&mut reader_fut);
160 pin_mut!(reports_fut);
161 match executor.run_until_stalled(&mut reports_fut) {
162 Poll::Pending => panic!("read did not complete (1st query)"),
163 Poll::Ready(res) => {
164 let reports = res
165 .expect("fidl error")
166 .map_err(zx::Status::from_raw)
167 .expect("service error");
168 assert_eq!(reports.len(), max_reports, "incorrect reports length (1st query)");
169 }
170 }
171
172 let reports_fut = proxy.read_input_reports();
173 std::mem::drop(report_sender); let _ = executor.run_until_stalled(&mut reader_fut);
175 pin_mut!(reports_fut);
176 match executor.run_until_stalled(&mut reports_fut) {
177 Poll::Pending => panic!("read did not complete (2nd query)"),
178 Poll::Ready(res) => {
179 let reports = res
180 .expect("fidl error")
181 .map_err(zx::Status::from_raw)
182 .expect("service error");
183 assert_eq!(reports.len(), 1, "incorrect reports length (2nd query)");
184 }
185 }
186
187 Ok(())
188 }
189 }
190
191 mod future_resolution {
192 use super::*;
193 use assert_matches::assert_matches;
194
195 #[fasync::run_until_stalled(test)]
196 async fn resolves_to_ok_when_all_reports_are_written() -> Result<(), Error> {
197 let (proxy, request_stream) =
198 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
199 let (report_sender, report_receiver) =
200 futures::channel::mpsc::unbounded::<InputReport>();
201 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
202 report_sender
203 .unbounded_send(InputReport::default())
204 .expect("sending empty InputReport");
205 let _reports_fut = proxy.read_input_reports();
206 std::mem::drop(report_sender); assert_matches!(reader_fut.await, Ok(()));
208 Ok(())
209 }
210
211 #[fasync::run_until_stalled(test)]
212 async fn resolves_to_err_when_request_stream_is_terminated_before_reports_are_written(
213 ) -> Result<(), Error> {
214 let (proxy, request_stream) =
215 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
216 let (report_sender, report_receiver) =
217 futures::channel::mpsc::unbounded::<InputReport>();
218 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
219 report_sender
220 .unbounded_send(InputReport::default())
221 .expect("sending empty InputReport");
222 std::mem::drop(proxy); assert_matches!(reader_fut.await, Err(_));
224 Ok(())
225 }
226
227 #[fasync::run_until_stalled(test)]
228 async fn resolves_to_err_if_request_stream_yields_error() -> Result<(), Error> {
229 let (client_end, request_stream) =
230 endpoints::create_request_stream::<InputReportsReaderMarker>();
231 let (report_sender, report_receiver) =
232 futures::channel::mpsc::unbounded::<InputReport>();
233 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
234 report_sender
235 .unbounded_send(InputReport::default())
236 .expect("sending empty InputReport");
237 std::mem::drop(report_sender); client_end
239 .into_channel()
240 .write(b"not a valid FIDL message", &mut [])
241 .expect("internal error writing to channel");
242 assert_matches!(reader_fut.await, Err(_)); Ok(())
244 }
245
246 #[fasync::run_until_stalled(test)]
265 async fn immediately_resolves_to_ok_when_reports_is_initially_empty() -> Result<(), Error> {
266 let (_proxy, request_stream) =
267 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
268 let (report_sender, report_receiver) =
269 futures::channel::mpsc::unbounded::<InputReport>();
270 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
271 std::mem::drop(report_sender); assert_matches!(reader_fut.await, Ok(()));
273 Ok(())
274 }
275 }
276
277 mod fidl_interactions {
278 use super::*;
279 use assert_matches::assert_matches;
280 use futures::pin_mut;
281 use futures::task::Poll;
282
283 #[test]
284 fn closes_channel_after_reports_are_consumed() -> Result<(), Error> {
285 let mut executor = fasync::TestExecutor::new();
286 let (proxy, request_stream) =
287 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
288 let (report_sender, report_receiver) =
289 futures::channel::mpsc::unbounded::<InputReport>();
290 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
291 report_sender
292 .unbounded_send(InputReport::default())
293 .expect("sending empty InputReport");
294 let reports_fut = proxy.read_input_reports();
295 std::mem::drop(report_sender); let futures = future::join(reader_fut, reports_fut);
299 pin_mut!(futures);
300 std::mem::drop(executor.run_until_stalled(&mut futures));
301
302 assert_matches!(
304 executor.run_until_stalled(&mut proxy.read_input_reports()),
305 Poll::Ready(Err(fidl::Error::ClientChannelClosed { .. }))
306 );
307 Ok(())
308 }
309
310 #[fasync::run_until_stalled(test)]
311 async fn preserves_query_order() -> Result<(), Error> {
312 let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
313 .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
314 let (proxy, request_stream) =
315 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
316 let (report_sender, report_receiver) =
317 futures::channel::mpsc::unbounded::<InputReport>();
318 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
319 for _ in 0..max_reports + 1 {
320 report_sender
321 .unbounded_send(InputReport::default())
322 .expect("sending empty InputReport");
323 }
324 let first_reports_fut = proxy.read_input_reports();
325 let second_reports_fut = proxy.read_input_reports();
326 std::mem::drop(proxy); std::mem::drop(report_sender); let (_, first_reports_result, second_reports_result) =
330 futures::join!(reader_fut, first_reports_fut, second_reports_fut);
331 let first_reports = first_reports_result
332 .expect("fidl error")
333 .map_err(zx::Status::from_raw)
334 .expect("service error");
335 let second_reports = second_reports_result
336 .expect("fidl error")
337 .map_err(zx::Status::from_raw)
338 .expect("service error");
339 assert_eq!(first_reports.len(), max_reports, "incorrect reports length (1st query)");
340 assert_eq!(second_reports.len(), 1, "incorrect reports length (2nd query)");
341 Ok(())
342 }
343 }
344
345 #[fasync::run_until_stalled(test)]
346 async fn preserves_report_order() -> Result<(), Error> {
347 let (proxy, request_stream) =
348 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
349 let (report_sender, report_receiver) = futures::channel::mpsc::unbounded::<InputReport>();
350 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
351 report_sender
352 .unbounded_send(InputReport { event_time: Some(1), ..Default::default() })
353 .expect("sending first InputReport");
354 report_sender
355 .unbounded_send(InputReport { event_time: Some(2), ..Default::default() })
356 .expect("sending second InputReport");
357
358 let reports_fut = proxy.read_input_reports();
359 std::mem::drop(report_sender); assert_eq!(
362 future::join(reader_fut, reports_fut)
363 .await
364 .1
365 .expect("fidl error")
366 .map_err(zx::Status::from_raw)
367 .expect("service error")
368 .iter()
369 .map(|report| report.event_time)
370 .collect::<Vec<_>>(),
371 [Some(1), Some(2)]
372 );
373 Ok(())
374 }
375}