fuchsia_fs/file/
async_reader.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::client::QueryResponseFut;
6use fidl::endpoints::Proxy as _;
7use futures::io::AsyncRead;
8use std::cmp::min;
9use std::convert::TryInto as _;
10use std::future::Future as _;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use {fidl_fuchsia_io as fio, zx_status};
14
15/// Wraps a `fidl_fuchsia_io::FileProxy` and implements `futures::io::AsyncRead`, which allows one
16/// to perform asynchronous file reads that don't block the current thread while waiting for data.
17#[derive(Debug)]
18pub struct AsyncReader {
19    file: fio::FileProxy,
20    state: State,
21}
22
23#[derive(Debug)]
24enum State {
25    Empty,
26    Forwarding { fut: QueryResponseFut<Result<Vec<u8>, i32>>, zero_byte_request: bool },
27    Bytes { bytes: Vec<u8>, offset: usize },
28}
29
30impl AsyncReader {
31    /// Errors if the provided `FileProxy` does not exclusively own the wrapped channel.
32    ///
33    /// Exclusive ownership avoids surprising behavior arising from the mismatch between the
34    /// semantics for `AsyncRead` and `fuchsia.io/File.Read`. On e.g. Linux, if two `AsyncRead`
35    /// objects were wrapping the same file descriptor and a call to `poll_read` on one of the
36    /// `AsyncRead` objects returned `Pending`, a client would generally not expect the offset of
37    /// the underlying file descriptor to advance. Meaning that a client could then call `poll_read`
38    /// on the other `AsyncRead` object and expect not to miss any file contents. However, with an
39    /// `AsyncRead` implementation that wraps `fuchsia.io/File.Read`, a `poll_read` call that
40    /// returns `Pending` would advance the file offset, meaning that interleaving usage of
41    /// `AsyncRead` objects that share a channel would return file contents in surprising order.
42    pub fn from_proxy(file: fio::FileProxy) -> Result<Self, AsyncReaderError> {
43        let file = match file.into_channel() {
44            Ok(channel) => fio::FileProxy::new(channel),
45            Err(file) => {
46                return Err(AsyncReaderError::NonExclusiveChannelOwnership(file));
47            }
48        };
49        Ok(Self { file, state: State::Empty })
50    }
51}
52
53impl AsyncRead for AsyncReader {
54    fn poll_read(
55        mut self: Pin<&mut Self>,
56        cx: &mut Context<'_>,
57        buf: &mut [u8],
58    ) -> Poll<std::io::Result<usize>> {
59        loop {
60            match self.state {
61                State::Empty => {
62                    let len = if let Ok(len) = buf.len().try_into() {
63                        min(len, fio::MAX_BUF)
64                    } else {
65                        fio::MAX_BUF
66                    };
67                    self.state =
68                        State::Forwarding { fut: self.file.read(len), zero_byte_request: len == 0 };
69                }
70                State::Forwarding { ref mut fut, ref zero_byte_request } => {
71                    match futures::ready!(Pin::new(fut).poll(cx)) {
72                        Ok(result) => {
73                            match result {
74                                Err(s) => {
75                                    self.state = State::Empty;
76                                    return Poll::Ready(Err(
77                                        zx_status::Status::from_raw(s).into_io_error()
78                                    ));
79                                }
80                                Ok(bytes) => {
81                                    // If the File.Read request was for zero bytes, but the current
82                                    // poll_read is not (because the File.Read request was made by an
83                                    // earlier call to poll_read with a zero length buffer) then we should
84                                    // not advance to State::Bytes because that would return Ready(Ok(0)),
85                                    // which would indicate EOF to the client.
86                                    // This handling is done here instead of short-circuiting at the
87                                    // beginning of the function so that zero-length poll_reads still
88                                    // trigger the validation performed by File.Read.
89                                    if *zero_byte_request && buf.len() != 0 {
90                                        self.state = State::Empty;
91                                    } else {
92                                        self.state = State::Bytes { bytes, offset: 0 };
93                                    }
94                                }
95                            }
96                        }
97                        Err(e) => {
98                            self.state = State::Empty;
99                            return Poll::Ready(Err(std::io::Error::other(e)));
100                        }
101                    }
102                }
103                State::Bytes { ref bytes, ref mut offset } => {
104                    let n = min(buf.len(), bytes.len() - *offset);
105                    let next_offset = *offset + n;
106                    let () = buf[..n].copy_from_slice(&bytes[*offset..next_offset]);
107                    if next_offset == bytes.len() {
108                        self.state = State::Empty;
109                    } else {
110                        *offset = next_offset;
111                    }
112                    return Poll::Ready(Ok(n));
113                }
114            }
115        }
116    }
117}
118
119#[derive(Debug, thiserror::Error)]
120pub enum AsyncReaderError {
121    #[error("Supplied FileProxy did not have exclusive ownership of the underlying channel")]
122    NonExclusiveChannelOwnership(fio::FileProxy),
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file;
    use assert_matches::assert_matches;
    use fidl::endpoints;
    use fuchsia_async as fasync;
    use futures::future::poll_fn;
    use futures::io::AsyncReadExt as _;
    use futures::{join, StreamExt as _, TryStreamExt as _};
    use std::convert::TryFrom as _;
    use tempfile::TempDir;

    /// Polls `reader` exactly once with `buf` and asserts that the read did not complete.
    async fn expect_pending(reader: &mut AsyncReader, buf: &mut [u8]) {
        poll_fn(|cx| {
            assert_matches!(Pin::new(&mut *reader).poll_read(cx, &mut *buf), Poll::Pending);
            Poll::Ready(())
        })
        .await
    }

    /// Takes the next request from `stream`, asserts that it is a `File.Read` asking for
    /// `expected_count` bytes, and answers it with `response`.
    async fn serve_read(
        stream: &mut fio::FileRequestStream,
        expected_count: u64,
        response: Result<&[u8], i32>,
    ) {
        match stream.next().await.unwrap().unwrap() {
            fio::FileRequest::Read { count, responder } => {
                assert_eq!(count, expected_count);
                responder.send(response).unwrap();
            }
            req => panic!("unhandled request {:?}", req),
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn exclusive_ownership() {
        let (proxy, _) = endpoints::create_proxy::<fio::FileMarker>();
        // While the event stream is alive, the proxy is not the only owner of the channel.
        let _event_stream = proxy.take_event_stream();

        let result = AsyncReader::from_proxy(proxy);
        assert_matches!(result, Err(AsyncReaderError::NonExclusiveChannelOwnership(_)));
    }

    /// Writes `expected_contents` to a temporary file, reads the file back to the end through
    /// an `AsyncReader`, and checks that the same bytes come out.
    async fn read_to_end_file_with_expected_contents(expected_contents: &[u8]) {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir
            .path()
            .join("read_to_end_with_expected_contents")
            .to_str()
            .unwrap()
            .to_owned();
        let () = file::write_in_namespace(&file_path, expected_contents).await.unwrap();
        let proxy = file::open_in_namespace(&file_path, fio::PERM_READABLE).unwrap();

        let mut reader = AsyncReader::from_proxy(proxy).unwrap();
        let mut read_back = vec![];
        reader.read_to_end(&mut read_back).await.unwrap();

        assert_eq!(read_back, expected_contents);
    }

    #[fasync::run_singlethreaded(test)]
    async fn read_to_end_empty() {
        let no_contents: &[u8] = &[];
        read_to_end_file_with_expected_contents(no_contents).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn read_to_end_large() {
        // Three times the per-request cap, so more than one File.Read request is required.
        let len: usize = (fio::MAX_BUF * 3).try_into().unwrap();
        let contents = vec![7u8; len];
        read_to_end_file_with_expected_contents(&contents).await;
    }

    /// Polls a fresh reader once with a buffer of `poll_read_size` bytes and asserts that the
    /// resulting `File.Read` request asks for `expected_file_read_size` bytes.
    async fn poll_read_with_specific_buf_size(poll_read_size: u64, expected_file_read_size: u64) {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        let mut buf = vec![0u8; poll_read_size.try_into().unwrap()];
        expect_pending(&mut reader, &mut buf).await;

        match stream.next().await.unwrap().unwrap() {
            fio::FileRequest::Read { count, .. } => assert_eq!(count, expected_file_read_size),
            req => panic!("unhandled request {:?}", req),
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn poll_read_empty_buf() {
        poll_read_with_specific_buf_size(0, 0).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn poll_read_caps_buf_size() {
        poll_read_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
    }

    #[fasync::run_singlethreaded(test)]
    async fn poll_read_pending_saves_future() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // The first poll_read creates a File.Read future and polls it. Nothing is serving the
        // FileRequestStream yet, so that poll is Pending, and the reader is expected to keep
        // the future around for later poll_read calls.
        expect_pending(&mut reader, &mut [0u8; 1]).await;

        // Keep reading until a byte comes out. That byte must be the answer to the first, and
        // only, File.Read request.
        let read_one_byte = async move {
            let mut buf = [0u8; 1];
            let bytes_read = reader.read(&mut buf).await.unwrap();
            assert_eq!(bytes_read, buf.len());
            assert_eq!(&buf, &[1]);
        };

        // Answer every File.Read request with the running count of requests seen so far.
        let mut file_read_requests = 0u8;
        let serve_all_requests = async {
            while let Some(req) = stream.try_next().await.unwrap() {
                file_read_requests += 1;
                match req {
                    fio::FileRequest::Read { count, responder } => {
                        assert_eq!(count, 1);
                        responder.send(Ok(&[file_read_requests])).unwrap();
                    }
                    req => panic!("unhandled request {:?}", req),
                }
            }
        };

        let ((), ()) = join!(read_one_byte, serve_all_requests);
        assert_eq!(file_read_requests, 1);
    }

    #[fasync::run_singlethreaded(test)]
    async fn poll_read_with_smaller_buf_after_pending() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // First poll_read, with a three byte buffer: the reader creates a File.Read future for
        // three bytes and returns Pending because nothing is serving the stream yet.
        expect_pending(&mut reader, &mut [0u8; 3]).await;

        // Answer the three byte File.Read request.
        serve_read(&mut stream, 3, Ok(b"012")).await;

        // Two one-byte reads. The first resolves the three byte File.Read future and returns
        // its first byte while the other two are saved; the second returns the first saved
        // byte, i.e. the second byte of the original response.
        for expected in [b"0", b"1"] {
            let mut buf = [0u8; 1];
            let bytes_read = reader.read(&mut buf).await.unwrap();
            assert_eq!(bytes_read, buf.len());
            assert_eq!(&buf, expected);
        }

        // Only one saved byte is left, so a two byte read must return just that byte.
        let mut buf = [0u8; 2];
        let bytes_read = reader.read(&mut buf).await.unwrap();
        assert_eq!(bytes_read, 1);
        assert_eq!(&buf[..1], b"2");

        // No saved bytes remain, so a four byte read must cause a new File.Read request.
        let mut buf = [0u8; 4];
        let (read_res, ()) = join!(reader.read(&mut buf), serve_read(&mut stream, 4, Ok(b"3456")));
        assert_eq!(read_res.unwrap(), 4);
        assert_eq!(&buf, b"3456");
    }

    #[fasync::run_singlethreaded(test)]
    async fn transition_to_empty_on_fidl_error() {
        let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // The server end was dropped, so the channel is closed and poll_read fails.
        let first_poll =
            poll_fn(|cx| Poll::Ready(Pin::new(&mut reader).poll_read(cx, &mut [0u8; 1]))).await;
        assert_matches!(first_poll, Poll::Ready(Err(_)));

        // Internal state is inspected here because the only fidl error that is easy to inject
        // is ZX_ERR_PEER_CLOSED (by closing the channel). With the channel closed, every new
        // future created by the AsyncReader fails, but the old future, if polled again, would
        // keep failing too (rather than panic) because it is Fused.
        assert_matches!(reader.state, State::Empty);
    }

    #[fasync::run_singlethreaded(test)]
    async fn recover_from_file_read_error() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // Read while the server answers the File.Read request with an error.
        let mut buf = [0u8; 1];
        let (read_res, ()) = join!(
            reader.read(&mut buf),
            serve_read(&mut stream, 1, Err(zx_status::Status::NO_MEMORY.into_raw()))
        );
        assert_matches!(read_res, Err(_));

        // The next read must create a new File.Read request rather than reuse the old future.
        let mut buf = [0u8; 1];
        let (read_res, ()) = join!(reader.read(&mut buf), serve_read(&mut stream, 1, Ok(b"0")));
        assert_eq!(read_res.unwrap(), 1);
        assert_eq!(&buf, b"0");
    }

    #[fasync::run_singlethreaded(test)]
    async fn poll_read_zero_then_read_nonzero() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // poll_read with a zero-length buffer.
        expect_pending(&mut reader, &mut []).await;

        // Answer the resulting zero-length File.Read request.
        serve_read(&mut stream, 0, Ok(&[])).await;

        // Read with a one byte buffer until Ready is returned. The first poll_read asked for
        // zero bytes but this one does not, so the AsyncReader discards the first response and
        // sends a second request, which is answered here.
        let mut buf = vec![0u8; 1];
        let (read_res, ()) = join!(reader.read(&mut buf), serve_read(&mut stream, 1, Ok(&[1])));

        // One byte is read even though the first poll_read was for zero bytes and returned
        // Pending.
        assert_eq!(read_res.unwrap(), 1);
        assert_eq!(&buf[..], &[1]);
    }

    #[fasync::run_singlethreaded(test)]
    async fn different_poll_read_and_file_sizes() {
        for first_len in 0..5 {
            for file_size in 0..5 {
                for second_len in 0..5 {
                    let (proxy, mut stream) =
                        endpoints::create_proxy_and_stream::<fio::FileMarker>();
                    let mut reader = AsyncReader::from_proxy(proxy).unwrap();

                    // The first poll_read makes the AsyncReader create a File.Read request.
                    let mut first_buf = vec![0u8; first_len];
                    expect_pending(&mut reader, &mut first_buf).await;

                    // Answer that request with at most as many bytes as were asked for.
                    let first_count = u64::try_from(first_len).unwrap();
                    let first_resp = vec![7u8; min(file_size, first_len)];
                    serve_read(&mut stream, first_count, Ok(&first_resp)).await;

                    // Read until Ready. Only when the first poll_read was for zero bytes and
                    // this read is not does the AsyncReader make another File.Read request.
                    let available = min(file_size, second_len);
                    let second_resp = vec![7u8; available];
                    let expect_second_request = first_len == 0 && second_len != 0;
                    let serve_second_if_expected = async {
                        if expect_second_request {
                            let second_count = u64::try_from(second_len).unwrap();
                            serve_read(&mut stream, second_count, Ok(&second_resp)).await;
                        }
                    };

                    let mut buf = vec![0u8; second_len];
                    let (read_res, ()) = join!(reader.read(&mut buf), serve_second_if_expected);

                    let expected_len =
                        if first_len == 0 { available } else { min(first_len, available) };
                    let expected = vec![7u8; expected_len];
                    assert_eq!(read_res.unwrap(), expected_len);
                    assert_eq!(&buf[..expected_len], &expected[..]);
                }
            }
        }
    }
}