1use fidl::client::QueryResponseFut;
6use flex_fuchsia_io as fio;
7use futures::io::AsyncRead;
8use std::cmp::min;
9use std::convert::TryInto as _;
10use std::future::Future as _;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use flex_client::fidl::Proxy as _;
15
16#[derive(Debug)]
19pub struct AsyncReader {
20 file: fio::FileProxy,
21 state: State,
22}
23
24#[derive(Debug)]
25enum State {
26 Empty,
27 Forwarding {
28 fut: QueryResponseFut<Result<Vec<u8>, i32>, flex_client::Dialect>,
29 zero_byte_request: bool,
30 },
31 Bytes {
32 bytes: Vec<u8>,
33 offset: usize,
34 },
35}
36
37impl AsyncReader {
38 pub fn from_proxy(file: fio::FileProxy) -> Result<Self, AsyncReaderError> {
50 let file = match file.into_channel() {
51 Ok(channel) => fio::FileProxy::new(channel),
52 Err(file) => {
53 return Err(AsyncReaderError::NonExclusiveChannelOwnership(file));
54 }
55 };
56 Ok(Self { file, state: State::Empty })
57 }
58}
59
60impl AsyncRead for AsyncReader {
61 fn poll_read(
62 mut self: Pin<&mut Self>,
63 cx: &mut Context<'_>,
64 buf: &mut [u8],
65 ) -> Poll<std::io::Result<usize>> {
66 loop {
67 match self.state {
68 State::Empty => {
69 let len = if let Ok(len) = buf.len().try_into() {
70 min(len, fio::MAX_BUF)
71 } else {
72 fio::MAX_BUF
73 };
74 self.state =
75 State::Forwarding { fut: self.file.read(len), zero_byte_request: len == 0 };
76 }
77 State::Forwarding { ref mut fut, ref zero_byte_request } => {
78 match futures::ready!(Pin::new(fut).poll(cx)) {
79 Ok(result) => {
80 match result {
81 Err(s) => {
82 self.state = State::Empty;
83 return Poll::Ready(Err(
84 zx_status::Status::from_raw(s).into_io_error()
85 ));
86 }
87 Ok(bytes) => {
88 if *zero_byte_request && buf.len() != 0 {
97 self.state = State::Empty;
98 } else {
99 self.state = State::Bytes { bytes, offset: 0 };
100 }
101 }
102 }
103 }
104 Err(e) => {
105 self.state = State::Empty;
106 return Poll::Ready(Err(std::io::Error::other(e)));
107 }
108 }
109 }
110 State::Bytes { ref bytes, ref mut offset } => {
111 let n = min(buf.len(), bytes.len() - *offset);
112 let next_offset = *offset + n;
113 let () = buf[..n].copy_from_slice(&bytes[*offset..next_offset]);
114 if next_offset == bytes.len() {
115 self.state = State::Empty;
116 } else {
117 *offset = next_offset;
118 }
119 return Poll::Ready(Ok(n));
120 }
121 }
122 }
123 }
124}
125
126#[derive(Debug, thiserror::Error)]
127pub enum AsyncReaderError {
128 #[error("Supplied FileProxy did not have exclusive ownership of the underlying channel")]
129 NonExclusiveChannelOwnership(fio::FileProxy),
130}
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135 use crate::file;
136 use assert_matches::assert_matches;
137 use fidl::endpoints;
138 use fuchsia_async as fasync;
139 use futures::future::poll_fn;
140 use futures::io::AsyncReadExt as _;
141 use futures::{join, StreamExt as _, TryStreamExt as _};
142 use std::convert::TryFrom as _;
143 use tempfile::TempDir;
144
145 #[fasync::run_singlethreaded(test)]
146 async fn exclusive_ownership() {
147 let (proxy, _) = endpoints::create_proxy::<fio::FileMarker>();
148 let _stream = proxy.take_event_stream();
149
150 assert_matches!(AsyncReader::from_proxy(proxy), Err(_));
151 }
152
153 async fn read_to_end_file_with_expected_contents(expected_contents: &[u8]) {
154 let dir = TempDir::new().unwrap();
155 let path =
156 dir.path().join("read_to_end_with_expected_contents").to_str().unwrap().to_owned();
157 let () = file::write_in_namespace(&path, expected_contents).await.unwrap();
158 let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();
159
160 let mut reader = AsyncReader::from_proxy(file).unwrap();
161 let mut actual_contents = vec![];
162 reader.read_to_end(&mut actual_contents).await.unwrap();
163
164 assert_eq!(actual_contents, expected_contents);
165 }
166
167 #[fasync::run_singlethreaded(test)]
168 async fn read_to_end_empty() {
169 read_to_end_file_with_expected_contents(&[]).await;
170 }
171
172 #[fasync::run_singlethreaded(test)]
173 async fn read_to_end_large() {
174 let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
175 read_to_end_file_with_expected_contents(&expected_contents[..]).await;
176 }
177
178 async fn poll_read_with_specific_buf_size(poll_read_size: u64, expected_file_read_size: u64) {
179 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
180
181 let mut reader = AsyncReader::from_proxy(proxy).unwrap();
182
183 let () = poll_fn(|cx| {
184 let mut buf = vec![0u8; poll_read_size.try_into().unwrap()];
185 assert_matches!(Pin::new(&mut reader).poll_read(cx, buf.as_mut_slice()), Poll::Pending);
186 Poll::Ready(())
187 })
188 .await;
189
190 match stream.next().await.unwrap().unwrap() {
191 fio::FileRequest::Read { count, .. } => {
192 assert_eq!(count, expected_file_read_size);
193 }
194 req => panic!("unhandled request {:?}", req),
195 }
196 }
197
198 #[fasync::run_singlethreaded(test)]
199 async fn poll_read_empty_buf() {
200 poll_read_with_specific_buf_size(0, 0).await;
201 }
202
203 #[fasync::run_singlethreaded(test)]
204 async fn poll_read_caps_buf_size() {
205 poll_read_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
206 }
207
208 #[fasync::run_singlethreaded(test)]
209 async fn poll_read_pending_saves_future() {
210 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
211
212 let mut reader = AsyncReader::from_proxy(proxy).unwrap();
213
214 let () = poll_fn(|cx| {
218 assert_matches!(Pin::new(&mut reader).poll_read(cx, &mut [0u8; 1]), Poll::Pending);
219 Poll::Ready(())
220 })
221 .await;
222
223 let poll_read = async move {
226 let mut buf = [0u8; 1];
227 assert_eq!(reader.read(&mut buf).await.unwrap(), buf.len());
228 assert_eq!(&buf, &[1]);
229 };
230
231 let mut file_read_requests = 0u8;
232 let handle_file_stream = async {
233 while let Some(req) = stream.try_next().await.unwrap() {
234 file_read_requests += 1;
235 match req {
236 fio::FileRequest::Read { count, responder } => {
237 assert_eq!(count, 1);
238 responder.send(Ok(&[file_read_requests])).unwrap();
239 }
240 req => panic!("unhandled request {:?}", req),
241 }
242 }
243 };
244
245 let ((), ()) = join!(poll_read, handle_file_stream);
246 assert_eq!(file_read_requests, 1);
247 }
248
249 #[fasync::run_singlethreaded(test)]
250 async fn poll_read_with_smaller_buf_after_pending() {
251 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
252
253 let mut reader = AsyncReader::from_proxy(proxy).unwrap();
254
255 let () = poll_fn(|cx| {
259 assert_matches!(Pin::new(&mut reader).poll_read(cx, &mut [0u8; 3]), Poll::Pending);
260 Poll::Ready(())
261 })
262 .await;
263
264 let () = async {
266 match stream.next().await.unwrap().unwrap() {
267 fio::FileRequest::Read { count, responder } => {
268 assert_eq!(count, 3);
269 responder.send(Ok(b"012")).unwrap();
270 }
271 req => panic!("unhandled request {:?}", req),
272 }
273 }
274 .await;
275
276 let mut buf = [0u8; 1];
279 assert_eq!(reader.read(&mut buf).await.unwrap(), buf.len());
280 assert_eq!(&buf, b"0");
281
282 let mut buf = [0u8; 1];
285 assert_eq!(reader.read(&mut buf).await.unwrap(), buf.len());
286 assert_eq!(&buf, b"1");
287
288 let mut buf = [0u8; 2];
291 assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
292 assert_eq!(&buf[..1], b"2");
293
294 let mut buf = [0u8; 4];
297 let poll_read = reader.read(&mut buf);
298
299 let handle_second_file_request = async {
300 match stream.next().await.unwrap().unwrap() {
301 fio::FileRequest::Read { count, responder } => {
302 assert_eq!(count, 4);
303 responder.send(Ok(b"3456")).unwrap();
304 }
305 req => panic!("unhandled request {:?}", req),
306 }
307 };
308
309 let (read_res, ()) = join!(poll_read, handle_second_file_request);
310 assert_eq!(read_res.unwrap(), 4);
311 assert_eq!(&buf, b"3456");
312 }
313
314 #[fasync::run_singlethreaded(test)]
315 async fn transition_to_empty_on_fidl_error() {
316 let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
317
318 let mut reader = AsyncReader::from_proxy(proxy).unwrap();
319
320 let () = poll_fn(|cx| {
322 assert_matches!(
323 Pin::new(&mut reader).poll_read(cx, &mut [0u8; 1]),
324 Poll::Ready(Err(_))
325 );
326 Poll::Ready(())
327 })
328 .await;
329
330 assert_matches!(reader.state, State::Empty);
335 }
336
337 #[fasync::run_singlethreaded(test)]
338 async fn recover_from_file_read_error() {
339 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
340
341 let mut reader = AsyncReader::from_proxy(proxy).unwrap();
342
343 let mut buf = [0u8; 1];
345 let poll_read = reader.read(&mut buf);
346
347 let failing_file_response = async {
348 match stream.next().await.unwrap().unwrap() {
349 fio::FileRequest::Read { count, responder } => {
350 assert_eq!(count, 1);
351 responder.send(Err(zx_status::Status::NO_MEMORY.into_raw())).unwrap();
352 }
353 req => panic!("unhandled request {:?}", req),
354 }
355 };
356
357 let (read_res, ()) = join!(poll_read, failing_file_response);
358 assert_matches!(read_res, Err(_));
359
360 let mut buf = [0u8; 1];
363 let poll_read = reader.read(&mut buf);
364
365 let succeeding_file_response = async {
366 match stream.next().await.unwrap().unwrap() {
367 fio::FileRequest::Read { count, responder } => {
368 assert_eq!(count, 1);
369 responder.send(Ok(b"0")).unwrap();
370 }
371 req => panic!("unhandled request {:?}", req),
372 }
373 };
374
375 let (read_res, ()) = join!(poll_read, succeeding_file_response);
376 assert_eq!(read_res.unwrap(), 1);
377 assert_eq!(&buf, b"0");
378 }
379
380 #[fasync::run_singlethreaded(test)]
381 async fn poll_read_zero_then_read_nonzero() {
382 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
383
384 let mut reader = AsyncReader::from_proxy(proxy).unwrap();
385
386 let () = poll_fn(|cx| {
388 assert_matches!(Pin::new(&mut reader).poll_read(cx, &mut []), Poll::Pending);
389 Poll::Ready(())
390 })
391 .await;
392
393 match stream.next().await.unwrap().unwrap() {
395 fio::FileRequest::Read { count, responder } => {
396 assert_eq!(count, 0);
397 responder.send(Ok(&[])).unwrap();
398 }
399 req => panic!("unhandled request {:?}", req),
400 }
401
402 let mut buf = vec![0u8; 1];
404 let poll_read = reader.read(&mut buf);
405
406 let handle_file_request = async {
411 match stream.next().await.unwrap().unwrap() {
412 fio::FileRequest::Read { count, responder } => {
413 assert_eq!(count, 1);
414 responder.send(Ok(&[1])).unwrap();
415 }
416 req => panic!("unhandled request {:?}", req),
417 }
418 };
419
420 let (poll_read, ()) = join!(poll_read, handle_file_request);
421
422 assert_eq!(poll_read.unwrap(), 1);
425 assert_eq!(&buf[..], &[1]);
426 }
427
428 #[fasync::run_singlethreaded(test)]
429 async fn different_poll_read_and_file_sizes() {
430 for first_poll_read_len in 0..5 {
431 for file_size in 0..5 {
432 for second_poll_read_len in 0..5 {
433 let (proxy, mut stream) =
434 endpoints::create_proxy_and_stream::<fio::FileMarker>();
435
436 let mut reader = AsyncReader::from_proxy(proxy).unwrap();
437
438 let () = poll_fn(|cx| {
440 let mut buf = vec![0u8; first_poll_read_len];
441 assert_matches!(
442 Pin::new(&mut reader).poll_read(cx, &mut buf),
443 Poll::Pending
444 );
445 Poll::Ready(())
446 })
447 .await;
448
449 match stream.next().await.unwrap().unwrap() {
452 fio::FileRequest::Read { count, responder } => {
453 assert_eq!(count, u64::try_from(first_poll_read_len).unwrap());
454 let resp = vec![7u8; min(file_size, first_poll_read_len)];
455 responder.send(Ok(&resp)).unwrap();
456 }
457 req => panic!("unhandled request {:?}", req),
458 }
459
460 let mut buf = vec![0u8; second_poll_read_len];
464 let poll_read = reader.read(&mut buf);
465
466 let handle_conditional_file_request = async {
467 if first_poll_read_len == 0 && second_poll_read_len != 0 {
468 match stream.next().await.unwrap().unwrap() {
469 fio::FileRequest::Read { count, responder } => {
470 assert_eq!(count, u64::try_from(second_poll_read_len).unwrap());
471 let resp = vec![7u8; min(file_size, second_poll_read_len)];
472 responder.send(Ok(&resp)).unwrap();
473 }
474 req => panic!("unhandled request {:?}", req),
475 }
476 }
477 };
478
479 let (read_res, ()) = join!(poll_read, handle_conditional_file_request);
480
481 let expected_len = if first_poll_read_len == 0 {
482 min(file_size, second_poll_read_len)
483 } else {
484 min(first_poll_read_len, min(file_size, second_poll_read_len))
485 };
486 let expected = vec![7u8; expected_len];
487 assert_eq!(read_res.unwrap(), expected_len);
488 assert_eq!(&buf[..expected_len], &expected[..]);
489 }
490 }
491 }
492 }
493}