fuchsia_fs/file/
async_read_at_ext.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::file::AsyncReadAt;
6use futures::future::Future;
7use futures::ready;
8use std::convert::{TryFrom as _, TryInto as _};
9use std::io;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13/// An extension trait which adds utility methods to AsyncReadAt.
14pub trait AsyncReadAtExt: AsyncReadAt {
15    /// Returns a future that reads at `offset`, and fill `buf`, on success the number of bytes
16    /// read is returned.
17    fn read_at<'a>(&'a mut self, offset: u64, buf: &'a mut [u8]) -> ReadAt<'a, Self>
18    where
19        Self: Unpin,
20    {
21        ReadAt { reader: self, offset, buf }
22    }
23
24    /// Returns a future that reads at `offset`, and fill `buf` exactly.
25    fn read_at_exact<'a>(&'a mut self, offset: u64, buf: &'a mut [u8]) -> ReadAtExact<'a, Self>
26    where
27        Self: Unpin,
28    {
29        ReadAtExact { reader: self, offset, buf }
30    }
31
32    /// Returns a future that appends all data to `buf`, on success the number of bytes
33    /// read is returned.
34    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
35    where
36        Self: Unpin,
37    {
38        ReadToEnd { reader: self, buf }
39    }
40}
41
42impl<T: AsyncReadAt + ?Sized> AsyncReadAtExt for T {}
43
/// Future returned by [`read_at`](AsyncReadAtExt::read_at).
///
/// It borrows the reader and the destination buffer for as long as the future is alive.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadAt<'a, R>
where
    R: ?Sized,
{
    /// Reader that the read request is forwarded to.
    reader: &'a mut R,
    /// Offset handed to the reader on every poll.
    offset: u64,
    /// Destination for the bytes that are read.
    buf: &'a mut [u8],
}
52
53impl<R: ?Sized + Unpin> Unpin for ReadAt<'_, R> {}
54
55impl<R: AsyncReadAt + ?Sized + Unpin> Future for ReadAt<'_, R> {
56    type Output = io::Result<usize>;
57
58    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
59        let this = &mut *self;
60        Pin::new(&mut *this.reader).poll_read_at(cx, this.offset, this.buf)
61    }
62}
63
/// Future returned by [`read_at_exact`](AsyncReadAtExt::read_at_exact).
///
/// It borrows the reader and the destination buffer for as long as the future is alive.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadAtExact<'a, R>
where
    R: ?Sized,
{
    /// Reader that the read requests are forwarded to.
    reader: &'a mut R,
    /// Offset of the next read; advanced after every partial read.
    offset: u64,
    /// Part of the destination buffer that still has to be filled.
    buf: &'a mut [u8],
}
72
73impl<R: ?Sized + Unpin> Unpin for ReadAtExact<'_, R> {}
74
75impl<R: AsyncReadAt + ?Sized + Unpin> Future for ReadAtExact<'_, R> {
76    type Output = io::Result<()>;
77
78    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79        let this = &mut *self;
80        loop {
81            let n = ready!(Pin::new(&mut *this.reader).poll_read_at(cx, this.offset, this.buf))?;
82            if n == this.buf.len() {
83                return Poll::Ready(Ok(()));
84            }
85            if n == 0 {
86                return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
87            }
88            match u64::try_from(n) {
89                Ok(n) => this.offset += n,
90                Err(e) => return Poll::Ready(Err(io::Error::other(e))),
91            };
92            this.buf = &mut std::mem::replace(&mut this.buf, &mut [])[n..];
93        }
94    }
95}
96
/// Future returned by [`read_to_end`](AsyncReadAtExt::read_to_end).
///
/// It borrows the reader and the destination vector for as long as the future is alive.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadToEnd<'a, R>
where
    R: ?Sized,
{
    /// Reader that the read requests are forwarded to.
    reader: &'a mut R,
    /// Vector that receives the data; its length also determines the next read offset.
    buf: &'a mut Vec<u8>,
}
104
105impl<R: ?Sized + Unpin> Unpin for ReadToEnd<'_, R> {}
106
107impl<R: AsyncReadAt + ?Sized + Unpin> Future for ReadToEnd<'_, R> {
108    type Output = io::Result<usize>;
109
110    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
111        let this = &mut *self;
112        let mut g = Guard { len: this.buf.len(), buf: this.buf };
113        loop {
114            if g.len == g.buf.len() {
115                g.buf.reserve(32);
116                let capacity = g.buf.capacity();
117                g.buf.resize(capacity, 0);
118            }
119
120            let offset = match g.len.try_into() {
121                Ok(len) => len,
122                Err(e) => return Poll::Ready(Err(io::Error::other(e))),
123            };
124            let buf = &mut g.buf[g.len..];
125            match ready!(Pin::new(&mut *this.reader).poll_read_at(cx, offset, buf)) {
126                Ok(0) => {
127                    return Poll::Ready(Ok(g.len));
128                }
129                Ok(n) => {
130                    g.len += n;
131                }
132                Err(e) => {
133                    return Poll::Ready(Err(e));
134                }
135            }
136        }
137    }
138}
139
/// Drop guard that shrinks a vector back to a recorded length.
///
/// `len` is the number of leading bytes to keep; anything beyond it is discarded when the guard
/// goes out of scope.
struct Guard<'a> {
    /// Vector that is truncated on drop.
    buf: &'a mut Vec<u8>,
    /// Length the vector is truncated to on drop.
    len: usize,
}

impl Drop for Guard<'_> {
    /// Truncates the vector to `len`; this has no effect when the vector is already shorter.
    fn drop(&mut self) {
        let keep = self.len;
        self.buf.truncate(keep);
    }
}
150
// Tests for the `AsyncReadAtExt` futures, run against temporary files and a hand-driven
// `fuchsia.io/File` request stream.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::{self, AsyncFile};
    use fidl::endpoints;
    use futures::future::{self};
    use futures::StreamExt as _;
    use tempfile::TempDir;
    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};

    /// Writes `expected_contents` to a fresh temporary file, reads it back with `read_to_end`
    /// into an empty vector and asserts that the contents round-trip.
    async fn read_to_end_file_with_expected_contents(expected_contents: &[u8]) {
        let dir = TempDir::new().unwrap();
        let path =
            dir.path().join("read_to_end_with_expected_contents").to_str().unwrap().to_owned();
        let () = file::write_in_namespace(&path, expected_contents).await.unwrap();
        let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();

        let mut reader = AsyncFile::from_proxy(file);
        let mut actual_contents = vec![];
        reader.read_to_end(&mut actual_contents).await.unwrap();

        assert_eq!(actual_contents, expected_contents);
    }

    /// `read_to_end` on an empty file leaves the vector empty.
    #[fasync::run_singlethreaded(test)]
    async fn read_to_end_empty() {
        read_to_end_file_with_expected_contents(&[]).await;
    }

    /// `read_to_end` on a file three times `fio::MAX_BUF` long.
    #[fasync::run_singlethreaded(test)]
    async fn read_to_end_large() {
        // Presumably larger than a single read can return, so several reads are needed —
        // TODO confirm against the `fio::MAX_BUF` definition.
        let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
        read_to_end_file_with_expected_contents(&expected_contents[..]).await;
    }

    /// `read_at` returns the bytes stored at the requested offset, whatever order the offsets
    /// are visited in.
    #[fasync::run_singlethreaded(test)]
    async fn read_at_different_offsets() {
        // 1000 bytes following the repeating pattern 0, 1, ..., 255.
        let file_contents = (0..1000).map(|i| (i % 256) as u8).collect::<Vec<_>>();
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("read_at_different_offsets").to_str().unwrap().to_owned();
        let () = file::write_in_namespace(&path, &file_contents).await.unwrap();
        let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();

        let mut reader = AsyncFile::from_proxy(file);
        // (offset, length) pairs; the third pair moves backwards in the file.
        for &(offset, length) in &[(0, 100), (100, 200), (50, 10), (500, 300)] {
            let mut buffer = vec![0; length];
            reader.read_at(offset as u64, &mut buffer).await.unwrap();

            assert_eq!(buffer, &file_contents[offset..offset + length]);
        }
    }

    /// `read_at_exact` issues a follow-up request for the missing bytes after a short read.
    #[fasync::run_singlethreaded(test)]
    async fn read_at_exact() {
        // The test plays the server side of the file protocol itself.
        let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();

        let mut reader = AsyncFile::from_proxy(proxy);

        let contents = (0..50).collect::<Vec<_>>();

        let read_at_exact = async {
            let mut buffer = vec![0; 50];
            reader.read_at_exact(20, &mut buffer[..]).await.unwrap();
            assert_eq!(buffer, contents);
        };
        let handle_requests = async {
            {
                // First request: all 50 bytes at offset 20; answer with only the first 20.
                let (count, offset, responder) =
                    stream.next().await.unwrap().unwrap().into_read_at().unwrap();
                assert_eq!(count, 50);
                assert_eq!(offset, 20);
                responder.send(Ok(&contents[..20])).unwrap();
            }
            {
                // Second request: the remaining 30 bytes, with the offset advanced by the 20
                // bytes already delivered.
                let (count, offset, responder) =
                    stream.next().await.unwrap().unwrap().into_read_at().unwrap();
                assert_eq!(count, 30);
                assert_eq!(offset, 40);
                responder.send(Ok(&contents[20..])).unwrap();
            }
        };
        // Drive the client and the fake server concurrently on the single-threaded executor.
        future::join(read_at_exact, handle_requests).await;
    }
}
234}