1use fidl::client::QueryResponseFut;
6use fidl::endpoints::Proxy as _;
7use futures::io::AsyncRead;
8use std::cmp::min;
9use std::convert::TryInto as _;
10use std::future::Future as _;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use {fidl_fuchsia_io as fio, zx_status};
14
15#[derive(Debug)]
18pub struct AsyncReader {
19 file: fio::FileProxy,
20 state: State,
21}
22
23#[derive(Debug)]
24enum State {
25 Empty,
26 Forwarding { fut: QueryResponseFut<Result<Vec<u8>, i32>>, zero_byte_request: bool },
27 Bytes { bytes: Vec<u8>, offset: usize },
28}
29
30impl AsyncReader {
31 pub fn from_proxy(file: fio::FileProxy) -> Result<Self, AsyncReaderError> {
43 let file = match file.into_channel() {
44 Ok(channel) => fio::FileProxy::new(channel),
45 Err(file) => {
46 return Err(AsyncReaderError::NonExclusiveChannelOwnership(file));
47 }
48 };
49 Ok(Self { file, state: State::Empty })
50 }
51}
52
53impl AsyncRead for AsyncReader {
54 fn poll_read(
55 mut self: Pin<&mut Self>,
56 cx: &mut Context<'_>,
57 buf: &mut [u8],
58 ) -> Poll<std::io::Result<usize>> {
59 loop {
60 match self.state {
61 State::Empty => {
62 let len = if let Ok(len) = buf.len().try_into() {
63 min(len, fio::MAX_BUF)
64 } else {
65 fio::MAX_BUF
66 };
67 self.state =
68 State::Forwarding { fut: self.file.read(len), zero_byte_request: len == 0 };
69 }
70 State::Forwarding { ref mut fut, ref zero_byte_request } => {
71 match futures::ready!(Pin::new(fut).poll(cx)) {
72 Ok(result) => {
73 match result {
74 Err(s) => {
75 self.state = State::Empty;
76 return Poll::Ready(Err(
77 zx_status::Status::from_raw(s).into_io_error()
78 ));
79 }
80 Ok(bytes) => {
81 if *zero_byte_request && buf.len() != 0 {
90 self.state = State::Empty;
91 } else {
92 self.state = State::Bytes { bytes, offset: 0 };
93 }
94 }
95 }
96 }
97 Err(e) => {
98 self.state = State::Empty;
99 return Poll::Ready(Err(std::io::Error::other(e)));
100 }
101 }
102 }
103 State::Bytes { ref bytes, ref mut offset } => {
104 let n = min(buf.len(), bytes.len() - *offset);
105 let next_offset = *offset + n;
106 let () = buf[..n].copy_from_slice(&bytes[*offset..next_offset]);
107 if next_offset == bytes.len() {
108 self.state = State::Empty;
109 } else {
110 *offset = next_offset;
111 }
112 return Poll::Ready(Ok(n));
113 }
114 }
115 }
116 }
117}
118
119#[derive(Debug, thiserror::Error)]
120pub enum AsyncReaderError {
121 #[error("Supplied FileProxy did not have exclusive ownership of the underlying channel")]
122 NonExclusiveChannelOwnership(fio::FileProxy),
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file;
    use assert_matches::assert_matches;
    use fidl::endpoints;
    use fuchsia_async as fasync;
    use futures::future::poll_fn;
    use futures::io::AsyncReadExt as _;
    use futures::{join, StreamExt as _, TryStreamExt as _};
    use std::convert::TryFrom as _;
    use tempfile::TempDir;

    /// Polls `reader` exactly once with a zeroed scratch buffer of `buf_len` bytes and asserts
    /// that the poll reports `Poll::Pending`.
    async fn poll_read_once_expecting_pending(reader: &mut AsyncReader, buf_len: usize) {
        poll_fn(|cx| {
            let mut scratch = vec![0u8; buf_len];
            assert_matches!(Pin::new(&mut *reader).poll_read(cx, &mut scratch), Poll::Pending);
            Poll::Ready(())
        })
        .await;
    }

    /// `from_proxy` must reject a proxy whose event stream has been taken.
    #[fasync::run_singlethreaded(test)]
    async fn exclusive_ownership() {
        let (proxy, _) = endpoints::create_proxy::<fio::FileMarker>();
        // Kept alive until the end of the test so it coexists with the `from_proxy` call.
        let _stream = proxy.take_event_stream();

        assert_matches!(AsyncReader::from_proxy(proxy), Err(_));
    }

    /// Writes `expected_contents` to a file in a temporary directory, reads the file back
    /// through an `AsyncReader` and checks that the bytes round-trip unchanged.
    async fn read_to_end_file_with_expected_contents(expected_contents: &[u8]) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("read_to_end_with_expected_contents");
        let path = path.to_str().unwrap().to_owned();
        file::write_in_namespace(&path, expected_contents).await.unwrap();
        let proxy = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();

        let mut reader = AsyncReader::from_proxy(proxy).unwrap();
        let mut actual_contents = Vec::new();
        reader.read_to_end(&mut actual_contents).await.unwrap();

        assert_eq!(actual_contents, expected_contents);
    }

    /// An empty file reads back as an empty byte vector.
    #[fasync::run_singlethreaded(test)]
    async fn read_to_end_empty() {
        read_to_end_file_with_expected_contents(&[]).await;
    }

    /// A file three times the size of `fio::MAX_BUF` needs several requests to read in full.
    #[fasync::run_singlethreaded(test)]
    async fn read_to_end_large() {
        let len: usize = (fio::MAX_BUF * 3).try_into().unwrap();
        let expected_contents = vec![7u8; len];
        read_to_end_file_with_expected_contents(&expected_contents).await;
    }

    /// Polls a fresh reader once with a `poll_read_size`-byte buffer and checks that the
    /// resulting `Read` request asks for `expected_file_read_size` bytes.
    async fn poll_read_with_specific_buf_size(poll_read_size: u64, expected_file_read_size: u64) {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        poll_read_once_expecting_pending(&mut reader, poll_read_size.try_into().unwrap()).await;

        match stream.next().await.unwrap().unwrap() {
            fio::FileRequest::Read { count, .. } => {
                assert_eq!(count, expected_file_read_size);
            }
            req => panic!("unhandled request {:?}", req),
        }
    }

    /// A zero-length buffer still produces a (zero-byte) request.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_empty_buf() {
        poll_read_with_specific_buf_size(0, 0).await;
    }

    /// A buffer larger than `fio::MAX_BUF` produces a request capped at `fio::MAX_BUF`.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_caps_buf_size() {
        poll_read_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
    }

    /// A poll that returns `Pending` must keep its request, so the next poll reuses it instead
    /// of sending a second one.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_pending_saves_future() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // Nothing is serving `stream` yet, so this poll sends a request and stays pending.
        poll_read_once_expecting_pending(&mut reader, 1).await;

        // `move` makes the reader drop together with this future; presumably that closes the
        // channel and lets the request loop below run to completion.
        let poll_read = async move {
            let mut buf = [0u8; 1];
            let n = reader.read(&mut buf).await.unwrap();
            assert_eq!(n, buf.len());
            assert_eq!(&buf, &[1]);
        };

        // Each response carries the running request count, so the byte received above also
        // shows which request was answered.
        let mut file_read_requests = 0u8;
        let handle_file_stream = async {
            while let Some(req) = stream.try_next().await.unwrap() {
                file_read_requests += 1;
                match req {
                    fio::FileRequest::Read { count, responder } => {
                        assert_eq!(count, 1);
                        responder.send(Ok(&[file_read_requests])).unwrap();
                    }
                    req => panic!("unhandled request {:?}", req),
                }
            }
        };

        let ((), ()) = join!(poll_read, handle_file_stream);
        assert_eq!(file_read_requests, 1);
    }

    /// Bytes of a response that do not fit into a smaller follow-up buffer must be retained
    /// and handed out by later reads before a new request is sent.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_with_smaller_buf_after_pending() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // Sends a three-byte request and leaves the reader waiting for the response.
        poll_read_once_expecting_pending(&mut reader, 3).await;

        match stream.next().await.unwrap().unwrap() {
            fio::FileRequest::Read { count, responder } => {
                assert_eq!(count, 3);
                responder.send(Ok(b"012")).unwrap();
            }
            req => panic!("unhandled request {:?}", req),
        }

        // One-byte buffer: receives the first byte of the three-byte response.
        let mut buf = [0u8; 1];
        let n = reader.read(&mut buf).await.unwrap();
        assert_eq!(n, buf.len());
        assert_eq!(&buf, b"0");

        // Served from the retained bytes; no request handler is running at this point.
        let mut buf = [0u8; 1];
        let n = reader.read(&mut buf).await.unwrap();
        assert_eq!(n, buf.len());
        assert_eq!(&buf, b"1");

        // Only one retained byte is left, so a two-byte buffer is filled partially.
        let mut buf = [0u8; 2];
        let n = reader.read(&mut buf).await.unwrap();
        assert_eq!(n, 1);
        assert_eq!(&buf[..1], b"2");

        // The retained bytes are used up, so this read has to send a second request.
        let mut buf = [0u8; 4];
        let poll_read = reader.read(&mut buf);

        let handle_second_file_request = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::Read { count, responder } => {
                    assert_eq!(count, 4);
                    responder.send(Ok(b"3456")).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = join!(poll_read, handle_second_file_request);
        assert_eq!(read_res.unwrap(), 4);
        assert_eq!(&buf, b"3456");
    }

    /// A FIDL-level failure is reported to the caller and resets the reader to `State::Empty`.
    #[fasync::run_singlethreaded(test)]
    async fn transition_to_empty_on_fidl_error() {
        // The `_` pattern drops the request stream immediately, so the request cannot succeed.
        let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        poll_fn(|cx| {
            assert_matches!(
                Pin::new(&mut reader).poll_read(cx, &mut [0u8; 1]),
                Poll::Ready(Err(_))
            );
            Poll::Ready(())
        })
        .await;

        assert_matches!(reader.state, State::Empty);
    }

    /// After the file answers with an error status, the next read sends a new request and
    /// can succeed.
    #[fasync::run_singlethreaded(test)]
    async fn recover_from_file_read_error() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // First read: the file replies with an error status.
        let mut buf = [0u8; 1];
        let poll_read = reader.read(&mut buf);

        let failing_file_response = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::Read { count, responder } => {
                    assert_eq!(count, 1);
                    responder.send(Err(zx_status::Status::NO_MEMORY.into_raw())).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = join!(poll_read, failing_file_response);
        assert_matches!(read_res, Err(_));

        // Second read: a fresh request is sent and answered successfully.
        let mut buf = [0u8; 1];
        let poll_read = reader.read(&mut buf);

        let succeeding_file_response = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::Read { count, responder } => {
                    assert_eq!(count, 1);
                    responder.send(Ok(b"0")).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = join!(poll_read, succeeding_file_response);
        assert_eq!(read_res.unwrap(), 1);
        assert_eq!(&buf, b"0");
    }

    /// The empty answer to a zero-byte request must not be reported as a zero-length read to
    /// a later caller that supplies a non-empty buffer.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_zero_then_read_nonzero() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // Polling with an empty buffer sends a zero-byte request.
        poll_read_once_expecting_pending(&mut reader, 0).await;

        match stream.next().await.unwrap().unwrap() {
            fio::FileRequest::Read { count, responder } => {
                assert_eq!(count, 0);
                responder.send(Ok(&[])).unwrap();
            }
            req => panic!("unhandled request {:?}", req),
        }

        // This read uses a one-byte buffer while the zero-byte response is waiting.
        let mut buf = vec![0u8; 1];
        let poll_read = reader.read(&mut buf);

        // The reader is expected to discard the empty response and send a one-byte request.
        let handle_file_request = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::Read { count, responder } => {
                    assert_eq!(count, 1);
                    responder.send(Ok(&[1])).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (poll_read, ()) = join!(poll_read, handle_file_request);

        assert_eq!(poll_read.unwrap(), 1);
        assert_eq!(&buf[..], &[1]);
    }

    /// Exhaustively combines small first-poll buffer sizes, file sizes and second-read buffer
    /// sizes and checks how many bytes the second read delivers.
    #[fasync::run_singlethreaded(test)]
    async fn different_poll_read_and_file_sizes() {
        for first_poll_read_len in 0..5 {
            for file_size in 0..5 {
                for second_poll_read_len in 0..5 {
                    let (proxy, mut stream) =
                        endpoints::create_proxy_and_stream::<fio::FileMarker>();
                    let mut reader = AsyncReader::from_proxy(proxy).unwrap();

                    // The first poll only sends the request; its response is served below.
                    poll_read_once_expecting_pending(&mut reader, first_poll_read_len).await;

                    match stream.next().await.unwrap().unwrap() {
                        fio::FileRequest::Read { count, responder } => {
                            assert_eq!(count, u64::try_from(first_poll_read_len).unwrap());
                            let resp = vec![7u8; min(file_size, first_poll_read_len)];
                            responder.send(Ok(&resp)).unwrap();
                        }
                        req => panic!("unhandled request {:?}", req),
                    }

                    let mut buf = vec![0u8; second_poll_read_len];
                    let poll_read = reader.read(&mut buf);

                    // A second request is expected only when the first one was for zero bytes
                    // and the second read supplies a non-empty buffer.
                    let handle_conditional_file_request = async {
                        if first_poll_read_len == 0 && second_poll_read_len != 0 {
                            match stream.next().await.unwrap().unwrap() {
                                fio::FileRequest::Read { count, responder } => {
                                    assert_eq!(count, u64::try_from(second_poll_read_len).unwrap());
                                    let resp = vec![7u8; min(file_size, second_poll_read_len)];
                                    responder.send(Ok(&resp)).unwrap();
                                }
                                req => panic!("unhandled request {:?}", req),
                            }
                        }
                    };

                    let (read_res, ()) = join!(poll_read, handle_conditional_file_request);

                    let expected_len = if first_poll_read_len == 0 {
                        min(file_size, second_poll_read_len)
                    } else {
                        min(first_poll_read_len, min(file_size, second_poll_read_len))
                    };
                    let expected = vec![7u8; expected_len];
                    assert_eq!(read_res.unwrap(), expected_len);
                    assert_eq!(&buf[..expected_len], &expected[..]);
                }
            }
        }
    }
}