fuchsia_fs/file/
async_reader.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fidl::client::QueryResponseFut;
6use flex_fuchsia_io as fio;
7use futures::io::AsyncRead;
8use std::cmp::min;
9use std::convert::TryInto as _;
10use std::future::Future as _;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use flex_client::fidl::Proxy as _;
15
/// Wraps a `fidl_fuchsia_io::FileProxy` and implements `futures::io::AsyncRead`, which allows one
/// to perform asynchronous file reads that don't block the current thread while waiting for data.
///
/// Instances are created with `AsyncReader::from_proxy`.
#[derive(Debug)]
pub struct AsyncReader {
    // Proxy on which `poll_read` issues `File.Read` requests.
    file: fio::FileProxy,
    // Tracks whether a request is outstanding or response bytes are still waiting to be copied
    // out; it is carried across `poll_read` calls.
    state: State,
}
23
/// Progress of an `AsyncReader` between calls to `poll_read`.
#[derive(Debug)]
enum State {
    /// No `File.Read` request is outstanding and no response bytes are buffered. The next
    /// `poll_read` sends a new request.
    Empty,
    /// A `File.Read` request has been sent and its response has not been consumed yet.
    Forwarding {
        /// Future that resolves to the response of the outstanding `File.Read` request.
        fut: QueryResponseFut<Result<Vec<u8>, i32>, flex_client::Dialect>,
        /// `true` when the request asked for zero bytes, i.e. it was created by a `poll_read`
        /// call whose buffer was empty.
        zero_byte_request: bool,
    },
    /// A `File.Read` response has arrived and is being handed out to the caller.
    Bytes {
        /// Payload of the response.
        bytes: Vec<u8>,
        /// Index into `bytes` of the first byte that has not been copied to a caller yet.
        offset: usize,
    },
}
36
37impl AsyncReader {
38    /// Errors if the provided `FileProxy` does not exclusively own the wrapped channel.
39    ///
40    /// Exclusive ownership avoids surprising behavior arising from the mismatch between the
41    /// semantics for `AsyncRead` and `fuchsia.io/File.Read`. On e.g. Linux, if two `AsyncRead`
42    /// objects were wrapping the same file descriptor and a call to `poll_read` on one of the
43    /// `AsyncRead` objects returned `Pending`, a client would generally not expect the offset of
44    /// the underlying file descriptor to advance. Meaning that a client could then call `poll_read`
45    /// on the other `AsyncRead` object and expect not to miss any file contents. However, with an
46    /// `AsyncRead` implementation that wraps `fuchsia.io/File.Read`, a `poll_read` call that
47    /// returns `Pending` would advance the file offset, meaning that interleaving usage of
48    /// `AsyncRead` objects that share a channel would return file contents in surprising order.
49    pub fn from_proxy(file: fio::FileProxy) -> Result<Self, AsyncReaderError> {
50        let file = match file.into_channel() {
51            Ok(channel) => fio::FileProxy::new(channel),
52            Err(file) => {
53                return Err(AsyncReaderError::NonExclusiveChannelOwnership(file));
54            }
55        };
56        Ok(Self { file, state: State::Empty })
57    }
58}
59
60impl AsyncRead for AsyncReader {
61    fn poll_read(
62        mut self: Pin<&mut Self>,
63        cx: &mut Context<'_>,
64        buf: &mut [u8],
65    ) -> Poll<std::io::Result<usize>> {
66        loop {
67            match self.state {
68                State::Empty => {
69                    let len = if let Ok(len) = buf.len().try_into() {
70                        min(len, fio::MAX_BUF)
71                    } else {
72                        fio::MAX_BUF
73                    };
74                    self.state =
75                        State::Forwarding { fut: self.file.read(len), zero_byte_request: len == 0 };
76                }
77                State::Forwarding { ref mut fut, ref zero_byte_request } => {
78                    match futures::ready!(Pin::new(fut).poll(cx)) {
79                        Ok(result) => {
80                            match result {
81                                Err(s) => {
82                                    self.state = State::Empty;
83                                    return Poll::Ready(Err(
84                                        zx_status::Status::from_raw(s).into_io_error()
85                                    ));
86                                }
87                                Ok(bytes) => {
88                                    // If the File.Read request was for zero bytes, but the current
89                                    // poll_read is not (because the File.Read request was made by an
90                                    // earlier call to poll_read with a zero length buffer) then we should
91                                    // not advance to State::Bytes because that would return Ready(Ok(0)),
92                                    // which would indicate EOF to the client.
93                                    // This handling is done here instead of short-circuiting at the
94                                    // beginning of the function so that zero-length poll_reads still
95                                    // trigger the validation performed by File.Read.
96                                    if *zero_byte_request && buf.len() != 0 {
97                                        self.state = State::Empty;
98                                    } else {
99                                        self.state = State::Bytes { bytes, offset: 0 };
100                                    }
101                                }
102                            }
103                        }
104                        Err(e) => {
105                            self.state = State::Empty;
106                            return Poll::Ready(Err(std::io::Error::other(e)));
107                        }
108                    }
109                }
110                State::Bytes { ref bytes, ref mut offset } => {
111                    let n = min(buf.len(), bytes.len() - *offset);
112                    let next_offset = *offset + n;
113                    let () = buf[..n].copy_from_slice(&bytes[*offset..next_offset]);
114                    if next_offset == bytes.len() {
115                        self.state = State::Empty;
116                    } else {
117                        *offset = next_offset;
118                    }
119                    return Poll::Ready(Ok(n));
120                }
121            }
122        }
123    }
124}
125
/// Errors returned by `AsyncReader::from_proxy`.
#[derive(Debug, thiserror::Error)]
pub enum AsyncReaderError {
    /// The supplied proxy could not be converted back into its channel. The rejected proxy is
    /// handed back inside the variant so the caller does not lose it.
    #[error("Supplied FileProxy did not have exclusive ownership of the underlying channel")]
    NonExclusiveChannelOwnership(fio::FileProxy),
}
131
132#[cfg(test)]
133mod tests {
134    use super::*;
135    use crate::file;
136    use assert_matches::assert_matches;
137    use fidl::endpoints;
138    use fuchsia_async as fasync;
139    use futures::future::poll_fn;
140    use futures::io::AsyncReadExt as _;
141    use futures::{join, StreamExt as _, TryStreamExt as _};
142    use std::convert::TryFrom as _;
143    use tempfile::TempDir;
144
145    #[fasync::run_singlethreaded(test)]
146    async fn exclusive_ownership() {
147        let (proxy, _) = endpoints::create_proxy::<fio::FileMarker>();
148        let _stream = proxy.take_event_stream();
149
150        assert_matches!(AsyncReader::from_proxy(proxy), Err(_));
151    }
152
153    async fn read_to_end_file_with_expected_contents(expected_contents: &[u8]) {
154        let dir = TempDir::new().unwrap();
155        let path =
156            dir.path().join("read_to_end_with_expected_contents").to_str().unwrap().to_owned();
157        let () = file::write_in_namespace(&path, expected_contents).await.unwrap();
158        let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();
159
160        let mut reader = AsyncReader::from_proxy(file).unwrap();
161        let mut actual_contents = vec![];
162        reader.read_to_end(&mut actual_contents).await.unwrap();
163
164        assert_eq!(actual_contents, expected_contents);
165    }
166
167    #[fasync::run_singlethreaded(test)]
168    async fn read_to_end_empty() {
169        read_to_end_file_with_expected_contents(&[]).await;
170    }
171
172    #[fasync::run_singlethreaded(test)]
173    async fn read_to_end_large() {
174        let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
175        read_to_end_file_with_expected_contents(&expected_contents[..]).await;
176    }
177
178    async fn poll_read_with_specific_buf_size(poll_read_size: u64, expected_file_read_size: u64) {
179        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
180
181        let mut reader = AsyncReader::from_proxy(proxy).unwrap();
182
183        let () = poll_fn(|cx| {
184            let mut buf = vec![0u8; poll_read_size.try_into().unwrap()];
185            assert_matches!(Pin::new(&mut reader).poll_read(cx, buf.as_mut_slice()), Poll::Pending);
186            Poll::Ready(())
187        })
188        .await;
189
190        match stream.next().await.unwrap().unwrap() {
191            fio::FileRequest::Read { count, .. } => {
192                assert_eq!(count, expected_file_read_size);
193            }
194            req => panic!("unhandled request {:?}", req),
195        }
196    }
197
198    #[fasync::run_singlethreaded(test)]
199    async fn poll_read_empty_buf() {
200        poll_read_with_specific_buf_size(0, 0).await;
201    }
202
203    #[fasync::run_singlethreaded(test)]
204    async fn poll_read_caps_buf_size() {
205        poll_read_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
206    }
207
    /// A `poll_read` that returns `Pending` must keep its `File.Read` future, so that a later
    /// `poll_read` is answered by that same request instead of sending a second one.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_pending_saves_future() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // This poll_read call will create a File.Read future and poll it. The poll of the File.Read
        // future will return Pending because nothing is handling the FileRequestStream yet. The
        // reader should save this File.Read future for handling subsequent poll_read calls.
        let () = poll_fn(|cx| {
            assert_matches!(Pin::new(&mut reader).poll_read(cx, &mut [0u8; 1]), Poll::Pending);
            Poll::Ready(())
        })
        .await;

        // Call poll_read until we get a byte out. This byte should be from the first and only
        // File.Read request.
        // The block is `async move`, so it takes ownership of `reader`.
        // NOTE(review): presumably the reader being dropped when this block completes is what
        // ends the request stream below — confirm.
        let poll_read = async move {
            let mut buf = [0u8; 1];
            assert_eq!(reader.read(&mut buf).await.unwrap(), buf.len());
            assert_eq!(&buf, &[1]);
        };

        // Serve every File.Read request with a single byte equal to the number of requests seen
        // so far, so the byte received above reveals which request produced it.
        let mut file_read_requests = 0u8;
        let handle_file_stream = async {
            while let Some(req) = stream.try_next().await.unwrap() {
                file_read_requests += 1;
                match req {
                    fio::FileRequest::Read { count, responder } => {
                        assert_eq!(count, 1);
                        responder.send(Ok(&[file_read_requests])).unwrap();
                    }
                    req => panic!("unhandled request {:?}", req),
                }
            }
        };

        let ((), ()) = join!(poll_read, handle_file_stream);
        // Exactly one request means the saved future was reused.
        assert_eq!(file_read_requests, 1);
    }
248
    /// Bytes from one `File.Read` response are handed out across several smaller `poll_read`
    /// calls, and a new request is only made once the buffered bytes are used up.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_with_smaller_buf_after_pending() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // Call poll_read with a buf of length 3. This is the first poll_read call, so the reader
        // will create a File.Read future for 3 bytes. poll_read will return Pending because nothing
        // is handling the FileRequestStream yet.
        let () = poll_fn(|cx| {
            assert_matches!(Pin::new(&mut reader).poll_read(cx, &mut [0u8; 3]), Poll::Pending);
            Poll::Ready(())
        })
        .await;

        // Respond to the three byte File.Read request.
        let () = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::Read { count, responder } => {
                    assert_eq!(count, 3);
                    responder.send(Ok(b"012")).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        }
        .await;

        // Call poll_read with a buf of length 1. This should resolve the previously created 3 byte
        // File.Read future and return the first byte from it while saving the remaining two bytes.
        let mut buf = [0u8; 1];
        assert_eq!(reader.read(&mut buf).await.unwrap(), buf.len());
        assert_eq!(&buf, b"0");

        // Call poll_read with a buf of len 1. This should return the first saved byte, which should
        // be the second byte from the original File.Read request.
        let mut buf = [0u8; 1];
        assert_eq!(reader.read(&mut buf).await.unwrap(), buf.len());
        assert_eq!(&buf, b"1");

        // Call poll_read with a buf of len 2. There should only be one remaining saved byte from
        // the original File.Read request, so poll_read should only return one byte.
        let mut buf = [0u8; 2];
        assert_eq!(reader.read(&mut buf).await.unwrap(), 1);
        assert_eq!(&buf[..1], b"2");

        // There should be no saved bytes remaining, so a poll_read of four bytes should cause a new
        // File.Read request.
        let mut buf = [0u8; 4];
        let poll_read = reader.read(&mut buf);

        // Serve that second request; it must ask for the full four bytes.
        let handle_second_file_request = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::Read { count, responder } => {
                    assert_eq!(count, 4);
                    responder.send(Ok(b"3456")).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = join!(poll_read, handle_second_file_request);
        assert_eq!(read_res.unwrap(), 4);
        assert_eq!(&buf, b"3456");
    }
313
314    #[fasync::run_singlethreaded(test)]
315    async fn transition_to_empty_on_fidl_error() {
316        let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
317
318        let mut reader = AsyncReader::from_proxy(proxy).unwrap();
319
320        // poll_read will fail because the channel is closed because the server end was dropped.
321        let () = poll_fn(|cx| {
322            assert_matches!(
323                Pin::new(&mut reader).poll_read(cx, &mut [0u8; 1]),
324                Poll::Ready(Err(_))
325            );
326            Poll::Ready(())
327        })
328        .await;
329
330        // This test is accessing internal state because the only fidl error that is easy to inject
331        // is ZX_ERR_PEER_CLOSED (by closing the channel). Once the channel is closed, all new
332        // futures created by the AsyncReader will fail, but, if poll'ed, the old future would also
333        // continue to fail (not panic) because it is Fused.
334        assert_matches!(reader.state, State::Empty);
335    }
336
    /// After `File.Read` answers with an error status, the next read sends a new request and can
    /// succeed.
    #[fasync::run_singlethreaded(test)]
    async fn recover_from_file_read_error() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // Call poll_read until failure.
        let mut buf = [0u8; 1];
        let poll_read = reader.read(&mut buf);

        // Answer the first request with an error status.
        let failing_file_response = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::Read { count, responder } => {
                    assert_eq!(count, 1);
                    responder.send(Err(zx_status::Status::NO_MEMORY.into_raw())).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = join!(poll_read, failing_file_response);
        assert_matches!(read_res, Err(_));

        // Calling poll_read again should create a new File.Read request instead of reusing the
        // old future.
        let mut buf = [0u8; 1];
        let poll_read = reader.read(&mut buf);

        // Answer the second request with one byte of data.
        let succeeding_file_response = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::Read { count, responder } => {
                    assert_eq!(count, 1);
                    responder.send(Ok(b"0")).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (read_res, ()) = join!(poll_read, succeeding_file_response);
        assert_eq!(read_res.unwrap(), 1);
        assert_eq!(&buf, b"0");
    }
379
    /// A zero-length `poll_read` that returned `Pending` must not make a following non-empty
    /// read report zero bytes: the empty response is dropped and a second request is made.
    #[fasync::run_singlethreaded(test)]
    async fn poll_read_zero_then_read_nonzero() {
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncReader::from_proxy(proxy).unwrap();

        // Call poll_read with a zero-length buffer.
        let () = poll_fn(|cx| {
            assert_matches!(Pin::new(&mut reader).poll_read(cx, &mut []), Poll::Pending);
            Poll::Ready(())
        })
        .await;

        // Handle the zero-length File.Read request.
        match stream.next().await.unwrap().unwrap() {
            fio::FileRequest::Read { count, responder } => {
                assert_eq!(count, 0);
                responder.send(Ok(&[])).unwrap();
            }
            req => panic!("unhandled request {:?}", req),
        }

        // Call poll_read with a length 1 buffer until Ready is returned.
        let mut buf = vec![0u8; 1];
        let poll_read = reader.read(&mut buf);

        // The AsyncReader will discard the File.Read response from the first poll_read, and create
        // another request, this handles that second request. The AsyncReader discards the first
        // response because the first poll_read was for zero bytes, but the current poll_read is
        // not.
        let handle_file_request = async {
            match stream.next().await.unwrap().unwrap() {
                fio::FileRequest::Read { count, responder } => {
                    assert_eq!(count, 1);
                    responder.send(Ok(&[1])).unwrap();
                }
                req => panic!("unhandled request {:?}", req),
            }
        };

        let (poll_read, ()) = join!(poll_read, handle_file_request);

        // poll_read should read 1 byte, even though the first poll_read request was for zero bytes
        // and returned Pending.
        assert_eq!(poll_read.unwrap(), 1);
        assert_eq!(&buf[..], &[1]);
    }
427
    /// Exhaustively checks every combination of first `poll_read` buffer length, simulated file
    /// size, and second read buffer length in `0..5`, asserting how many bytes the second read
    /// returns.
    #[fasync::run_singlethreaded(test)]
    async fn different_poll_read_and_file_sizes() {
        for first_poll_read_len in 0..5 {
            for file_size in 0..5 {
                for second_poll_read_len in 0..5 {
                    let (proxy, mut stream) =
                        endpoints::create_proxy_and_stream::<fio::FileMarker>();

                    let mut reader = AsyncReader::from_proxy(proxy).unwrap();

                    // poll_read causes the AsyncReader to create a File.Read request.
                    let () = poll_fn(|cx| {
                        let mut buf = vec![0u8; first_poll_read_len];
                        assert_matches!(
                            Pin::new(&mut reader).poll_read(cx, &mut buf),
                            Poll::Pending
                        );
                        Poll::Ready(())
                    })
                    .await;

                    // Respond to the File.Read request with at most as many bytes as the poll_read
                    // requested.
                    match stream.next().await.unwrap().unwrap() {
                        fio::FileRequest::Read { count, responder } => {
                            assert_eq!(count, u64::try_from(first_poll_read_len).unwrap());
                            let resp = vec![7u8; min(file_size, first_poll_read_len)];
                            responder.send(Ok(&resp)).unwrap();
                        }
                        req => panic!("unhandled request {:?}", req),
                    }

                    // Call poll_read until it returns Ready. If the first poll_read was for zero
                    // bytes and this poll_read is not, the AsyncReader will make another File.Read
                    // request.
                    let mut buf = vec![0u8; second_poll_read_len];
                    let poll_read = reader.read(&mut buf);

                    // Only wait for a second request in the one case where the reader is expected
                    // to send it; in every other case this future completes immediately.
                    let handle_conditional_file_request = async {
                        if first_poll_read_len == 0 && second_poll_read_len != 0 {
                            match stream.next().await.unwrap().unwrap() {
                                fio::FileRequest::Read { count, responder } => {
                                    assert_eq!(count, u64::try_from(second_poll_read_len).unwrap());
                                    let resp = vec![7u8; min(file_size, second_poll_read_len)];
                                    responder.send(Ok(&resp)).unwrap();
                                }
                                req => panic!("unhandled request {:?}", req),
                            }
                        }
                    };

                    let (read_res, ()) = join!(poll_read, handle_conditional_file_request);

                    // When the first poll_read was for zero bytes, the data comes from the second
                    // request (or there is none at all), so only the file size and the second
                    // buffer limit the result. Otherwise the data comes from the first response,
                    // which is additionally limited by the first buffer's length.
                    let expected_len = if first_poll_read_len == 0 {
                        min(file_size, second_poll_read_len)
                    } else {
                        min(first_poll_read_len, min(file_size, second_poll_read_len))
                    };
                    let expected = vec![7u8; expected_len];
                    assert_eq!(read_res.unwrap(), expected_len);
                    assert_eq!(&buf[..expected_len], &expected[..]);
                }
            }
        }
    }
493}