input_synthesis/modern_backend/
input_reports_reader.rs1#![warn(missing_docs)]
6
7use anyhow::{format_err, Context as _, Error};
8use fidl_fuchsia_input_report::{
9 InputReport, InputReportsReaderRequest, InputReportsReaderRequestStream,
10};
11use futures::{StreamExt, TryStreamExt};
12use std::convert::TryFrom as _;
13
14pub(super) struct InputReportsReader {
17 pub(super) request_stream: InputReportsReaderRequestStream,
18 pub(super) report_receiver: futures::channel::mpsc::UnboundedReceiver<InputReport>,
21}
22
23impl InputReportsReader {
24 pub(super) async fn into_future(self) -> Result<(), Error> {
41 let chunk_size = usize::try_from(fidl_fuchsia_input_report::MAX_DEVICE_REPORT_COUNT)
50 .context("converting MAX_DEVICE_REPORT_COUNT to usize")?;
51 let mut reports = self.report_receiver.ready_chunks(chunk_size).fuse();
52 self.request_stream
53 .zip(reports.by_ref())
54 .map(|(request, reports)| match request {
55 Ok(request) => Ok((request, reports)),
56 Err(e) => Err(anyhow::Error::from(e).context("while reading reader request")),
57 })
58 .try_for_each(|request_and_reports| async {
59 match request_and_reports {
60 (InputReportsReaderRequest::ReadInputReports { responder }, reports) => {
61 responder
62 .send(Ok(&reports))
63 .map_err(anyhow::Error::from)
64 .context("while sending reports")
65 }
66 }
67 })
68 .await?;
69
70 match reports.is_done() {
71 true => Ok(()),
72 false => Err(format_err!("request_stream terminated with reports still pending")),
73 }
74 }
75}
76
77#[cfg(test)]
78mod tests {
79 use super::{InputReport, InputReportsReader};
80 use anyhow::{Context as _, Error};
81 use fidl::endpoints;
82 use fidl_fuchsia_input_report::{InputReportsReaderMarker, MAX_DEVICE_REPORT_COUNT};
83 use fuchsia_async as fasync;
84 use futures::future;
85
86 mod report_count {
87 use super::*;
88 use futures::pin_mut;
89 use futures::task::Poll;
90
91 #[fasync::run_until_stalled(test)]
92 async fn serves_single_report() -> Result<(), Error> {
93 let (proxy, request_stream) =
94 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
95 let (report_sender, report_receiver) =
96 futures::channel::mpsc::unbounded::<InputReport>();
97 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
98 report_sender
99 .unbounded_send(InputReport::default())
100 .expect("sending empty InputReport");
101 let reports_fut = proxy.read_input_reports();
102 std::mem::drop(proxy); std::mem::drop(report_sender); let (_, reports_result) = future::join(reader_fut, reports_fut).await;
106 let reports = reports_result
107 .expect("fidl error")
108 .map_err(zx::Status::from_raw)
109 .expect("service error");
110 assert_eq!(reports.len(), 1, "incorrect reports length");
111 Ok(())
112 }
113
114 #[fasync::run_until_stalled(test)]
115 async fn serves_max_report_count_reports() -> Result<(), Error> {
116 let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
117 .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
118 let (proxy, request_stream) =
119 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
120 let (report_sender, report_receiver) =
121 futures::channel::mpsc::unbounded::<InputReport>();
122 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
123 for _ in 0..max_reports {
124 report_sender
125 .unbounded_send(InputReport::default())
126 .expect("sending empty InputReport");
127 }
128 let reports_fut = proxy.read_input_reports();
129 std::mem::drop(proxy); std::mem::drop(report_sender); let (_, reports_result) = future::join(reader_fut, reports_fut).await;
133 let reports = reports_result
134 .expect("fidl error")
135 .map_err(zx::Status::from_raw)
136 .expect("service error");
137 assert_eq!(reports.len(), max_reports, "incorrect reports length");
138 Ok(())
139 }
140
141 #[test]
142 fn splits_overflowed_reports_to_next_read() -> Result<(), Error> {
143 let mut executor = fasync::TestExecutor::new();
144 let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
145 .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
146 let (proxy, request_stream) =
147 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
148 let (report_sender, report_receiver) =
149 futures::channel::mpsc::unbounded::<InputReport>();
150 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
151 for _ in 0..max_reports + 1 {
152 report_sender
153 .unbounded_send(InputReport::default())
154 .expect("sending empty InputReport");
155 }
156 pin_mut!(reader_fut);
157
158 let reports_fut = proxy.read_input_reports();
161 let _ = executor.run_until_stalled(&mut reader_fut);
162 pin_mut!(reports_fut);
163 match executor.run_until_stalled(&mut reports_fut) {
164 Poll::Pending => panic!("read did not complete (1st query)"),
165 Poll::Ready(res) => {
166 let reports = res
167 .expect("fidl error")
168 .map_err(zx::Status::from_raw)
169 .expect("service error");
170 assert_eq!(reports.len(), max_reports, "incorrect reports length (1st query)");
171 }
172 }
173
174 let reports_fut = proxy.read_input_reports();
175 std::mem::drop(report_sender); let _ = executor.run_until_stalled(&mut reader_fut);
177 pin_mut!(reports_fut);
178 match executor.run_until_stalled(&mut reports_fut) {
179 Poll::Pending => panic!("read did not complete (2nd query)"),
180 Poll::Ready(res) => {
181 let reports = res
182 .expect("fidl error")
183 .map_err(zx::Status::from_raw)
184 .expect("service error");
185 assert_eq!(reports.len(), 1, "incorrect reports length (2nd query)");
186 }
187 }
188
189 Ok(())
190 }
191 }
192
193 mod future_resolution {
194 use super::*;
195 use assert_matches::assert_matches;
196
197 #[fasync::run_until_stalled(test)]
198 async fn resolves_to_ok_when_all_reports_are_written() -> Result<(), Error> {
199 let (proxy, request_stream) =
200 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
201 let (report_sender, report_receiver) =
202 futures::channel::mpsc::unbounded::<InputReport>();
203 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
204 report_sender
205 .unbounded_send(InputReport::default())
206 .expect("sending empty InputReport");
207 let _reports_fut = proxy.read_input_reports();
208 std::mem::drop(report_sender); assert_matches!(reader_fut.await, Ok(()));
210 Ok(())
211 }
212
213 #[fasync::run_until_stalled(test)]
214 async fn resolves_to_err_when_request_stream_is_terminated_before_reports_are_written(
215 ) -> Result<(), Error> {
216 let (proxy, request_stream) =
217 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
218 let (report_sender, report_receiver) =
219 futures::channel::mpsc::unbounded::<InputReport>();
220 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
221 report_sender
222 .unbounded_send(InputReport::default())
223 .expect("sending empty InputReport");
224 std::mem::drop(proxy); assert_matches!(reader_fut.await, Err(_));
226 Ok(())
227 }
228
229 #[fasync::run_until_stalled(test)]
230 async fn resolves_to_err_if_request_stream_yields_error() -> Result<(), Error> {
231 let (client_end, request_stream) =
232 endpoints::create_request_stream::<InputReportsReaderMarker>();
233 let (report_sender, report_receiver) =
234 futures::channel::mpsc::unbounded::<InputReport>();
235 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
236 report_sender
237 .unbounded_send(InputReport::default())
238 .expect("sending empty InputReport");
239 std::mem::drop(report_sender); client_end
241 .into_channel()
242 .write(b"not a valid FIDL message", &mut [])
243 .expect("internal error writing to channel");
244 assert_matches!(reader_fut.await, Err(_)); Ok(())
246 }
247
248 #[fasync::run_until_stalled(test)]
249 async fn resolves_to_ok_if_client_closes_channel_and_ignores_reply() -> Result<(), Error> {
250 let (proxy, request_stream) =
251 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
252 let (report_sender, report_receiver) =
253 futures::channel::mpsc::unbounded::<InputReport>();
254 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
255 report_sender
256 .unbounded_send(InputReport::default())
257 .expect("sending empty InputReport");
258 let result_fut = proxy.read_input_reports(); std::mem::drop(result_fut); std::mem::drop(proxy); std::mem::drop(report_sender); assert_matches!(reader_fut.await, Ok(()));
263 Ok(())
264 }
265
266 #[fasync::run_until_stalled(test)]
267 async fn immediately_resolves_to_ok_when_reports_is_initially_empty() -> Result<(), Error> {
268 let (_proxy, request_stream) =
269 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
270 let (report_sender, report_receiver) =
271 futures::channel::mpsc::unbounded::<InputReport>();
272 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
273 std::mem::drop(report_sender); assert_matches!(reader_fut.await, Ok(()));
275 Ok(())
276 }
277 }
278
279 mod fidl_interactions {
280 use super::*;
281 use assert_matches::assert_matches;
282 use futures::pin_mut;
283 use futures::task::Poll;
284
285 #[test]
286 fn closes_channel_after_reports_are_consumed() -> Result<(), Error> {
287 let mut executor = fasync::TestExecutor::new();
288 let (proxy, request_stream) =
289 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
290 let (report_sender, report_receiver) =
291 futures::channel::mpsc::unbounded::<InputReport>();
292 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
293 report_sender
294 .unbounded_send(InputReport::default())
295 .expect("sending empty InputReport");
296 let reports_fut = proxy.read_input_reports();
297 std::mem::drop(report_sender); let futures = future::join(reader_fut, reports_fut);
301 pin_mut!(futures);
302 std::mem::drop(executor.run_until_stalled(&mut futures));
303
304 assert_matches!(
306 executor.run_until_stalled(&mut proxy.read_input_reports()),
307 Poll::Ready(Err(fidl::Error::ClientChannelClosed { .. }))
308 );
309 Ok(())
310 }
311
312 #[fasync::run_until_stalled(test)]
313 async fn preserves_query_order() -> Result<(), Error> {
314 let max_reports = usize::try_from(MAX_DEVICE_REPORT_COUNT)
315 .context("internal error converting MAX_DEVICE_REPORT_COUNT to usize")?;
316 let (proxy, request_stream) =
317 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
318 let (report_sender, report_receiver) =
319 futures::channel::mpsc::unbounded::<InputReport>();
320 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
321 for _ in 0..max_reports + 1 {
322 report_sender
323 .unbounded_send(InputReport::default())
324 .expect("sending empty InputReport");
325 }
326 let first_reports_fut = proxy.read_input_reports();
327 let second_reports_fut = proxy.read_input_reports();
328 std::mem::drop(proxy); std::mem::drop(report_sender); let (_, first_reports_result, second_reports_result) =
332 futures::join!(reader_fut, first_reports_fut, second_reports_fut);
333 let first_reports = first_reports_result
334 .expect("fidl error")
335 .map_err(zx::Status::from_raw)
336 .expect("service error");
337 let second_reports = second_reports_result
338 .expect("fidl error")
339 .map_err(zx::Status::from_raw)
340 .expect("service error");
341 assert_eq!(first_reports.len(), max_reports, "incorrect reports length (1st query)");
342 assert_eq!(second_reports.len(), 1, "incorrect reports length (2nd query)");
343 Ok(())
344 }
345 }
346
347 #[fasync::run_until_stalled(test)]
348 async fn preserves_report_order() -> Result<(), Error> {
349 let (proxy, request_stream) =
350 endpoints::create_proxy_and_stream::<InputReportsReaderMarker>();
351 let (report_sender, report_receiver) = futures::channel::mpsc::unbounded::<InputReport>();
352 let reader_fut = InputReportsReader { request_stream, report_receiver }.into_future();
353 report_sender
354 .unbounded_send(InputReport { event_time: Some(1), ..Default::default() })
355 .expect("sending first InputReport");
356 report_sender
357 .unbounded_send(InputReport { event_time: Some(2), ..Default::default() })
358 .expect("sending second InputReport");
359
360 let reports_fut = proxy.read_input_reports();
361 std::mem::drop(report_sender); assert_eq!(
364 future::join(reader_fut, reports_fut)
365 .await
366 .1
367 .expect("fidl error")
368 .map_err(zx::Status::from_raw)
369 .expect("service error")
370 .iter()
371 .map(|report| report.event_time)
372 .collect::<Vec<_>>(),
373 [Some(1), Some(2)]
374 );
375 Ok(())
376 }
377}