fuchsia_fs/file/
async_read_at_ext.rs1use crate::file::AsyncReadAt;
6use futures::future::Future;
7use futures::ready;
8use std::convert::{TryFrom as _, TryInto as _};
9use std::io;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13pub trait AsyncReadAtExt: AsyncReadAt {
15 fn read_at<'a>(&'a mut self, offset: u64, buf: &'a mut [u8]) -> ReadAt<'a, Self>
18 where
19 Self: Unpin,
20 {
21 ReadAt { reader: self, offset, buf }
22 }
23
24 fn read_at_exact<'a>(&'a mut self, offset: u64, buf: &'a mut [u8]) -> ReadAtExact<'a, Self>
26 where
27 Self: Unpin,
28 {
29 ReadAtExact { reader: self, offset, buf }
30 }
31
32 fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
35 where
36 Self: Unpin,
37 {
38 ReadToEnd { reader: self, buf }
39 }
40}
41
42impl<T: AsyncReadAt + ?Sized> AsyncReadAtExt for T {}
43
44#[derive(Debug)]
46#[must_use = "futures do nothing unless you `.await` or poll them"]
47pub struct ReadAt<'a, R: ?Sized> {
48 reader: &'a mut R,
49 offset: u64,
50 buf: &'a mut [u8],
51}
52
53impl<R: ?Sized + Unpin> Unpin for ReadAt<'_, R> {}
54
55impl<R: AsyncReadAt + ?Sized + Unpin> Future for ReadAt<'_, R> {
56 type Output = io::Result<usize>;
57
58 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
59 let this = &mut *self;
60 Pin::new(&mut *this.reader).poll_read_at(cx, this.offset, this.buf)
61 }
62}
63
64#[derive(Debug)]
66#[must_use = "futures do nothing unless you `.await` or poll them"]
67pub struct ReadAtExact<'a, R: ?Sized> {
68 reader: &'a mut R,
69 offset: u64,
70 buf: &'a mut [u8],
71}
72
73impl<R: ?Sized + Unpin> Unpin for ReadAtExact<'_, R> {}
74
75impl<R: AsyncReadAt + ?Sized + Unpin> Future for ReadAtExact<'_, R> {
76 type Output = io::Result<()>;
77
78 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
79 let this = &mut *self;
80 loop {
81 let n = ready!(Pin::new(&mut *this.reader).poll_read_at(cx, this.offset, this.buf))?;
82 if n == this.buf.len() {
83 return Poll::Ready(Ok(()));
84 }
85 if n == 0 {
86 return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
87 }
88 match u64::try_from(n) {
89 Ok(n) => this.offset += n,
90 Err(e) => return Poll::Ready(Err(io::Error::other(e))),
91 };
92 this.buf = &mut std::mem::replace(&mut this.buf, &mut [])[n..];
93 }
94 }
95}
96
97#[derive(Debug)]
99#[must_use = "futures do nothing unless you `.await` or poll them"]
100pub struct ReadToEnd<'a, R: ?Sized> {
101 reader: &'a mut R,
102 buf: &'a mut Vec<u8>,
103}
104
105impl<R: ?Sized + Unpin> Unpin for ReadToEnd<'_, R> {}
106
107impl<R: AsyncReadAt + ?Sized + Unpin> Future for ReadToEnd<'_, R> {
108 type Output = io::Result<usize>;
109
110 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
111 let this = &mut *self;
112 let mut g = Guard { len: this.buf.len(), buf: this.buf };
113 loop {
114 if g.len == g.buf.len() {
115 g.buf.reserve(32);
116 let capacity = g.buf.capacity();
117 g.buf.resize(capacity, 0);
118 }
119
120 let offset = match g.len.try_into() {
121 Ok(len) => len,
122 Err(e) => return Poll::Ready(Err(io::Error::other(e))),
123 };
124 let buf = &mut g.buf[g.len..];
125 match ready!(Pin::new(&mut *this.reader).poll_read_at(cx, offset, buf)) {
126 Ok(0) => {
127 return Poll::Ready(Ok(g.len));
128 }
129 Ok(n) => {
130 g.len += n;
131 }
132 Err(e) => {
133 return Poll::Ready(Err(e));
134 }
135 }
136 }
137 }
138}
139
140struct Guard<'a> {
141 buf: &'a mut Vec<u8>,
142 len: usize,
143}
144
145impl Drop for Guard<'_> {
146 fn drop(&mut self) {
147 self.buf.truncate(self.len);
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154 use crate::file::{self, AsyncFile};
155 use fidl::endpoints;
156 use futures::future::{self};
157 use futures::StreamExt as _;
158 use tempfile::TempDir;
159 use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
160
161 async fn read_to_end_file_with_expected_contents(expected_contents: &[u8]) {
162 let dir = TempDir::new().unwrap();
163 let path =
164 dir.path().join("read_to_end_with_expected_contents").to_str().unwrap().to_owned();
165 let () = file::write_in_namespace(&path, expected_contents).await.unwrap();
166 let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();
167
168 let mut reader = AsyncFile::from_proxy(file);
169 let mut actual_contents = vec![];
170 reader.read_to_end(&mut actual_contents).await.unwrap();
171
172 assert_eq!(actual_contents, expected_contents);
173 }
174
175 #[fasync::run_singlethreaded(test)]
176 async fn read_to_end_empty() {
177 read_to_end_file_with_expected_contents(&[]).await;
178 }
179
180 #[fasync::run_singlethreaded(test)]
181 async fn read_to_end_large() {
182 let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
183 read_to_end_file_with_expected_contents(&expected_contents[..]).await;
184 }
185
186 #[fasync::run_singlethreaded(test)]
187 async fn read_at_different_offsets() {
188 let file_contents = (0..1000).map(|i| (i % 256) as u8).collect::<Vec<_>>();
189 let dir = TempDir::new().unwrap();
190 let path = dir.path().join("read_at_different_offsets").to_str().unwrap().to_owned();
191 let () = file::write_in_namespace(&path, &file_contents).await.unwrap();
192 let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();
193
194 let mut reader = AsyncFile::from_proxy(file);
195 for &(offset, length) in &[(0, 100), (100, 200), (50, 10), (500, 300)] {
196 let mut buffer = vec![0; length];
197 reader.read_at(offset as u64, &mut buffer).await.unwrap();
198
199 assert_eq!(buffer, &file_contents[offset..offset + length]);
200 }
201 }
202
203 #[fasync::run_singlethreaded(test)]
204 async fn read_at_exact() {
205 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
206
207 let mut reader = AsyncFile::from_proxy(proxy);
208
209 let contents = (0..50).collect::<Vec<_>>();
210
211 let read_at_exact = async {
212 let mut buffer = vec![0; 50];
213 reader.read_at_exact(20, &mut buffer[..]).await.unwrap();
214 assert_eq!(buffer, contents);
215 };
216 let handle_requests = async {
217 {
218 let (count, offset, responder) =
219 stream.next().await.unwrap().unwrap().into_read_at().unwrap();
220 assert_eq!(count, 50);
221 assert_eq!(offset, 20);
222 responder.send(Ok(&contents[..20])).unwrap();
223 }
224 {
225 let (count, offset, responder) =
226 stream.next().await.unwrap().unwrap().into_read_at().unwrap();
227 assert_eq!(count, 30);
228 assert_eq!(offset, 40);
229 responder.send(Ok(&contents[20..])).unwrap();
230 }
231 };
232 future::join(read_at_exact, handle_requests).await;
233 }
234}