1use fidl::client::QueryResponseFut;
6use futures::future::Future;
7use futures::io::{AsyncRead, AsyncSeek, SeekFrom};
8use futures::FutureExt;
9use std::cmp::min;
10use std::convert::TryInto as _;
11use std::io;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use {fidl_fuchsia_io as fio, zx_status};
15
16pub trait AsyncReadAt {
19 fn poll_read_at(
25 self: Pin<&mut Self>,
26 cx: &mut Context<'_>,
27 offset: u64,
28 buf: &mut [u8],
29 ) -> Poll<io::Result<usize>>;
30}
31
32pub trait AsyncGetSize {
34 fn poll_get_size(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
36}
37
38pub trait AsyncGetSizeExt: AsyncGetSize {
40 fn get_size<'a>(&'a mut self) -> GetSize<'a, Self>
42 where
43 Self: Unpin,
44 {
45 GetSize { size_getter: self }
46 }
47}
48
49impl<T: AsyncGetSize + ?Sized> AsyncGetSizeExt for T {}
50
51#[derive(Debug)]
53#[must_use = "futures do nothing unless you `.await` or poll them"]
54pub struct GetSize<'a, R: ?Sized> {
55 size_getter: &'a mut R,
56}
57
58impl<R: ?Sized + Unpin> Unpin for GetSize<'_, R> {}
59
60impl<R: AsyncGetSize + ?Sized + Unpin> Future for GetSize<'_, R> {
61 type Output = io::Result<u64>;
62
63 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
64 let this = &mut *self;
65 Pin::new(&mut *this.size_getter).poll_get_size(cx)
66 }
67}
68
69#[derive(Debug)]
76pub struct AsyncFile {
77 file: fio::FileProxy,
78 read_at_state: ReadAtState,
79 get_attributes_fut: Option<
80 QueryResponseFut<Result<(fio::MutableNodeAttributes, fio::ImmutableNodeAttributes), i32>>,
81 >,
82}
83
84#[derive(Debug)]
85enum ReadAtState {
86 Empty,
87 Forwarding {
88 fut: QueryResponseFut<Result<Vec<u8>, i32>>,
89 file_offset: u64,
90 zero_byte_request: bool,
91 },
92 Bytes {
93 bytes: Vec<u8>,
94 file_offset: u64,
95 },
96}
97
98impl AsyncFile {
99 pub fn from_proxy(file: fio::FileProxy) -> Self {
100 Self { file, read_at_state: ReadAtState::Empty, get_attributes_fut: None }
101 }
102}
103
104impl AsyncReadAt for AsyncFile {
105 fn poll_read_at(
106 mut self: Pin<&mut Self>,
107 cx: &mut Context<'_>,
108 offset: u64,
109 buf: &mut [u8],
110 ) -> Poll<std::io::Result<usize>> {
111 loop {
112 match self.read_at_state {
113 ReadAtState::Empty => {
114 let len = if let Ok(len) = buf.len().try_into() {
115 min(len, fio::MAX_BUF)
116 } else {
117 fio::MAX_BUF
118 };
119 self.read_at_state = ReadAtState::Forwarding {
120 fut: self.file.read_at(len, offset),
121 file_offset: offset,
122 zero_byte_request: len == 0,
123 };
124 }
125 ReadAtState::Forwarding { ref mut fut, file_offset, zero_byte_request } => {
126 match futures::ready!(Pin::new(fut).poll(cx)) {
127 Ok(result) => {
128 match result {
129 Err(s) => {
130 self.read_at_state = ReadAtState::Empty;
131 return Poll::Ready(Err(
132 zx_status::Status::from_raw(s).into_io_error()
133 ));
134 }
135 Ok(bytes) => {
136 if zero_byte_request && buf.len() != 0 {
145 self.read_at_state = ReadAtState::Empty;
146 } else {
147 self.read_at_state =
148 ReadAtState::Bytes { bytes, file_offset };
149 }
150 }
151 }
152 }
153 Err(e) => {
154 self.read_at_state = ReadAtState::Empty;
155 return Poll::Ready(Err(std::io::Error::other(e)));
156 }
157 }
158 }
159 ReadAtState::Bytes { ref bytes, file_offset } => {
160 if offset < file_offset {
161 self.read_at_state = ReadAtState::Empty;
162 continue;
163 }
164 let bytes_offset = match (offset - file_offset).try_into() {
165 Ok(offset) => offset,
166 Err(_) => {
167 self.read_at_state = ReadAtState::Empty;
168 continue;
169 }
170 };
171 if bytes_offset != 0 && bytes_offset >= bytes.len() {
172 self.read_at_state = ReadAtState::Empty;
173 continue;
174 }
175 let n = min(buf.len(), bytes.len() - bytes_offset);
176 let () = buf[..n].copy_from_slice(&bytes[bytes_offset..bytes_offset + n]);
177 return Poll::Ready(Ok(n));
178 }
179 }
180 }
181 }
182}
183
184impl AsyncGetSize for AsyncFile {
185 fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
186 if self.get_attributes_fut.is_none() {
187 self.get_attributes_fut =
188 Some(self.file.get_attributes(fio::NodeAttributesQuery::CONTENT_SIZE));
189 }
190 let fut = self.get_attributes_fut.as_mut().unwrap();
191 let get_attributes_fut_result = futures::ready!(fut.poll_unpin(cx));
192 self.get_attributes_fut = None;
193 match get_attributes_fut_result {
194 Ok(get_attributes_response) => match get_attributes_response {
195 Ok((_mutable_attr, immutable_attr)) => {
196 if let Some(content_size) = immutable_attr.content_size {
197 return Poll::Ready(Ok(content_size));
198 }
199 return Poll::Ready(Err(zx_status::Status::NOT_SUPPORTED.into_io_error()));
200 }
201 Err(status) => {
202 return Poll::Ready(Err(zx_status::Status::from_raw(status).into_io_error()));
203 }
204 },
205 Err(e) => {
206 return Poll::Ready(Err(std::io::Error::other(e)));
207 }
208 }
209 }
210}
211
212#[derive(Debug)]
214pub struct Adapter<T> {
215 inner: T,
216}
217
218impl<T> Adapter<T> {
219 pub fn new(inner: T) -> Adapter<T> {
220 Self { inner }
221 }
222}
223
224impl<T: AsyncRead + AsyncSeek + Unpin> AsyncReadAt for Adapter<T> {
225 fn poll_read_at(
226 mut self: Pin<&mut Self>,
227 cx: &mut Context<'_>,
228 offset: u64,
229 buf: &mut [u8],
230 ) -> Poll<std::io::Result<usize>> {
231 futures::ready!(Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::Start(offset)))?;
232 Pin::new(&mut self.inner).poll_read(cx, buf)
233 }
234}
235
236impl<T: AsyncSeek + Unpin> AsyncGetSize for Adapter<T> {
237 fn poll_get_size(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
238 Pin::new(&mut self.inner).poll_seek(cx, SeekFrom::End(0))
239 }
240}
241
242#[cfg(test)]
243mod tests {
244 use super::*;
245 use crate::file::{self, AsyncReadAtExt};
246 use assert_matches::assert_matches;
247 use fidl::endpoints;
248 use fuchsia_async as fasync;
249 use futures::future::{self, poll_fn};
250 use futures::{StreamExt as _, TryStreamExt as _};
251 use std::convert::TryFrom as _;
252 use std::io::Write;
253 use tempfile::{NamedTempFile, TempDir};
254
255 async fn poll_read_at_with_specific_buf_size(
256 poll_read_size: u64,
257 expected_file_read_size: u64,
258 ) {
259 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
260
261 let mut reader = AsyncFile::from_proxy(proxy);
262
263 let () = poll_fn(|cx| {
264 let mut buf = vec![0u8; poll_read_size.try_into().unwrap()];
265 assert_matches!(
266 Pin::new(&mut reader).poll_read_at(cx, 0, buf.as_mut_slice()),
267 Poll::Pending
268 );
269 Poll::Ready(())
270 })
271 .await;
272
273 match stream.next().await.unwrap().unwrap() {
274 fio::FileRequest::ReadAt { count, .. } => {
275 assert_eq!(count, expected_file_read_size);
276 }
277 req => panic!("unhandled request {:?}", req),
278 }
279 }
280
281 #[fasync::run_singlethreaded(test)]
282 async fn poll_read_at_empty_buf() {
283 poll_read_at_with_specific_buf_size(0, 0).await;
284 }
285
286 #[fasync::run_singlethreaded(test)]
287 async fn poll_read_at_caps_buf_size() {
288 poll_read_at_with_specific_buf_size(fio::MAX_BUF * 2, fio::MAX_BUF).await;
289 }
290
291 #[fasync::run_singlethreaded(test)]
292 async fn poll_read_at_pending_saves_future() {
293 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
294
295 let mut reader = AsyncFile::from_proxy(proxy);
296
297 let () = poll_fn(|cx| {
302 assert_matches!(
303 Pin::new(&mut reader).poll_read_at(cx, 2, &mut [0u8; 1]),
304 Poll::Pending
305 );
306 Poll::Ready(())
307 })
308 .await;
309
310 let poll_read_at = async move {
313 let mut buf = [0u8; 1];
314 assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), buf.len());
315 assert_eq!(&buf, &[1]);
316 };
317
318 let mut file_read_requests = 0u8;
319 let handle_file_stream = async {
320 while let Some(req) = stream.try_next().await.unwrap() {
321 file_read_requests += 1;
322 match req {
323 fio::FileRequest::ReadAt { count, offset, responder } => {
324 assert_eq!(count, 1);
325 assert_eq!(offset, 2);
326 responder.send(Ok(&[file_read_requests])).unwrap();
327 }
328 req => panic!("unhandled request {:?}", req),
329 }
330 }
331 };
332
333 let ((), ()) = future::join(poll_read_at, handle_file_stream).await;
334 assert_eq!(file_read_requests, 1);
335 }
336
337 #[fasync::run_singlethreaded(test)]
338 async fn poll_read_at_with_smaller_buf_after_pending() {
339 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
340
341 let mut reader = AsyncFile::from_proxy(proxy);
342
343 let () = poll_fn(|cx| {
347 assert_matches!(
348 Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 3]),
349 Poll::Pending
350 );
351 Poll::Ready(())
352 })
353 .await;
354
355 let () = async {
357 match stream.next().await.unwrap().unwrap() {
358 fio::FileRequest::ReadAt { count, offset, responder } => {
359 assert_eq!(count, 3);
360 assert_eq!(offset, 0);
361 responder.send(Ok(b"012")).unwrap();
362 }
363 req => panic!("unhandled request {:?}", req),
364 }
365 }
366 .await;
367
368 let mut buf = [0u8; 1];
372 assert_eq!(reader.read_at(0, &mut buf).await.unwrap(), buf.len());
373 assert_eq!(&buf, b"0");
374
375 let mut buf = [0u8; 1];
378 assert_eq!(reader.read_at(1, &mut buf).await.unwrap(), buf.len());
379 assert_eq!(&buf, b"1");
380
381 let mut buf = [0u8; 2];
384 assert_eq!(reader.read_at(2, &mut buf).await.unwrap(), 1);
385 assert_eq!(&buf[..1], b"2");
386
387 let mut buf = [0u8; 4];
390 let poll_read_at = reader.read_at(3, &mut buf);
391
392 let handle_second_file_request = async {
393 match stream.next().await.unwrap().unwrap() {
394 fio::FileRequest::ReadAt { count, offset, responder } => {
395 assert_eq!(count, 4);
396 assert_eq!(offset, 3);
397 responder.send(Ok(b"3456")).unwrap();
398 }
399 req => panic!("unhandled request {:?}", req),
400 }
401 };
402
403 let (read_res, ()) = future::join(poll_read_at, handle_second_file_request).await;
404 assert_eq!(read_res.unwrap(), 4);
405 assert_eq!(&buf, b"3456");
406 }
407
408 #[fasync::run_singlethreaded(test)]
409 async fn transition_to_empty_on_fidl_error() {
410 let (proxy, _) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
411
412 let mut reader = AsyncFile::from_proxy(proxy);
413
414 let () = poll_fn(|cx| {
416 assert_matches!(
417 Pin::new(&mut reader).poll_read_at(cx, 0, &mut [0u8; 1]),
418 Poll::Ready(Err(_))
419 );
420 Poll::Ready(())
421 })
422 .await;
423
424 assert_matches!(reader.read_at_state, ReadAtState::Empty);
429 }
430
431 #[fasync::run_singlethreaded(test)]
432 async fn recover_from_file_read_error() {
433 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
434
435 let mut reader = AsyncFile::from_proxy(proxy);
436
437 let mut buf = [0u8; 1];
439 let poll_read_at = reader.read_at(0, &mut buf);
440
441 let failing_file_response = async {
442 match stream.next().await.unwrap().unwrap() {
443 fio::FileRequest::ReadAt { count, offset, responder } => {
444 assert_eq!(count, 1);
445 assert_eq!(offset, 0);
446 responder.send(Err(zx_status::Status::NO_MEMORY.into_raw())).unwrap();
447 }
448 req => panic!("unhandled request {:?}", req),
449 }
450 };
451
452 let (read_res, ()) = future::join(poll_read_at, failing_file_response).await;
453 assert_matches!(read_res, Err(_));
454
455 let mut buf = [0u8; 1];
458 let poll_read_at = reader.read_at(0, &mut buf);
459
460 let succeeding_file_response = async {
461 match stream.next().await.unwrap().unwrap() {
462 fio::FileRequest::ReadAt { count, offset, responder } => {
463 assert_eq!(count, 1);
464 assert_eq!(offset, 0);
465 responder.send(Ok(b"0")).unwrap();
466 }
467 req => panic!("unhandled request {:?}", req),
468 }
469 };
470
471 let (read_res, ()) = future::join(poll_read_at, succeeding_file_response).await;
472 assert_eq!(read_res.unwrap(), 1);
473 assert_eq!(&buf, b"0");
474 }
475
476 #[fasync::run_singlethreaded(test)]
477 async fn poll_read_at_zero_then_read_nonzero() {
478 let (proxy, mut stream) = endpoints::create_proxy_and_stream::<fio::FileMarker>();
479
480 let mut reader = AsyncFile::from_proxy(proxy);
481
482 let () = poll_fn(|cx| {
484 assert_matches!(Pin::new(&mut reader).poll_read_at(cx, 0, &mut []), Poll::Pending);
485 Poll::Ready(())
486 })
487 .await;
488
489 match stream.next().await.unwrap().unwrap() {
491 fio::FileRequest::ReadAt { count, offset, responder } => {
492 assert_eq!(count, 0);
493 assert_eq!(offset, 0);
494 responder.send(Ok(&[])).unwrap();
495 }
496 req => panic!("unhandled request {:?}", req),
497 }
498
499 let mut buf = vec![0u8; 1];
501 let poll_read_at = reader.read_at(0, &mut buf);
502
503 let handle_file_request = async {
508 match stream.next().await.unwrap().unwrap() {
509 fio::FileRequest::ReadAt { count, offset, responder } => {
510 assert_eq!(count, 1);
511 assert_eq!(offset, 0);
512 responder.send(Ok(&[1])).unwrap();
513 }
514 req => panic!("unhandled request {:?}", req),
515 }
516 };
517
518 let (poll_read, ()) = future::join(poll_read_at, handle_file_request).await;
519
520 assert_eq!(poll_read.unwrap(), 1);
523 assert_eq!(&buf[..], &[1]);
524 }
525
526 #[fasync::run_singlethreaded(test)]
527 async fn different_poll_read_at_and_file_sizes() {
528 for first_poll_read_len in 0..5 {
529 for file_size in 0..5 {
530 for second_poll_offset in 0..file_size {
531 for second_poll_read_len in 0..5 {
532 let (proxy, mut stream) =
533 endpoints::create_proxy_and_stream::<fio::FileMarker>();
534
535 let mut reader = AsyncFile::from_proxy(proxy);
536
537 let () = poll_fn(|cx| {
539 let mut buf = vec![0u8; first_poll_read_len];
540 assert_matches!(
541 Pin::new(&mut reader).poll_read_at(cx, 0, &mut buf),
542 Poll::Pending
543 );
544 Poll::Ready(())
545 })
546 .await;
547
548 match stream.next().await.unwrap().unwrap() {
551 fio::FileRequest::ReadAt { count, offset, responder } => {
552 assert_eq!(count, u64::try_from(first_poll_read_len).unwrap());
553 assert_eq!(offset, 0);
554 let resp = vec![7u8; min(file_size, first_poll_read_len)];
555 responder.send(Ok(&resp)).unwrap();
556 }
557 req => panic!("unhandled request {:?}", req),
558 }
559
560 let mut buf = vec![0u8; second_poll_read_len];
565 let poll_read_at = reader.read_at(second_poll_offset as u64, &mut buf);
566
567 let second_request = first_poll_read_len == 0 && second_poll_read_len != 0
568 || second_poll_offset != 0 && second_poll_offset >= first_poll_read_len;
569 let handle_conditional_file_request = async {
570 if second_request {
571 match stream.next().await.unwrap().unwrap() {
572 fio::FileRequest::ReadAt { count, offset, responder } => {
573 assert_eq!(
574 count,
575 u64::try_from(second_poll_read_len).unwrap()
576 );
577 assert_eq!(
578 offset,
579 u64::try_from(second_poll_offset).unwrap()
580 );
581 let resp = vec![
582 7u8;
583 min(
584 file_size - second_poll_offset,
585 second_poll_read_len
586 )
587 ];
588 responder.send(Ok(&resp)).unwrap();
589 }
590 req => panic!("unhandled request {:?}", req),
591 }
592 }
593 };
594
595 let (read_res, ()) =
596 future::join(poll_read_at, handle_conditional_file_request).await;
597
598 let expected_len = if second_request {
599 min(file_size - second_poll_offset, second_poll_read_len)
600 } else {
601 min(
602 min(file_size, first_poll_read_len) - second_poll_offset,
603 second_poll_read_len,
604 )
605 };
606 let expected = vec![7u8; expected_len];
607 assert_eq!(read_res.unwrap(), expected_len);
608 assert_eq!(&buf[..expected_len], &expected[..]);
609 }
610 }
611 }
612 }
613 }
614
615 async fn get_size_file_with_contents(contents: &[u8]) {
616 let dir = TempDir::new().unwrap();
617 let path = dir.path().join("get_size_file_with_contents").to_str().unwrap().to_owned();
618 let () = file::write_in_namespace(&path, contents).await.unwrap();
619 let file = file::open_in_namespace(&path, fio::PERM_READABLE).unwrap();
620
621 let mut reader = AsyncFile::from_proxy(file);
622
623 assert_eq!(reader.get_size().await.unwrap(), contents.len() as u64);
624 }
625
626 #[fasync::run_singlethreaded(test)]
627 async fn get_size_empty() {
628 get_size_file_with_contents(&[]).await;
629 }
630
631 #[fasync::run_singlethreaded(test)]
632 async fn get_size_large() {
633 let expected_contents = vec![7u8; (fio::MAX_BUF * 3).try_into().unwrap()];
634 get_size_file_with_contents(&expected_contents[..]).await;
635 }
636
637 #[fasync::run_singlethreaded(test)]
638 async fn get_size_changing_size() {
639 let (mut file, path) = NamedTempFile::new().unwrap().into_parts();
640 let proxy = file::open_in_namespace(path.to_str().unwrap(), fio::PERM_READABLE).unwrap();
641
642 let mut reader = AsyncFile::from_proxy(proxy);
643
644 assert_eq!(reader.get_size().await.unwrap(), 0);
645 file.write_all(&[1; 3][..]).unwrap();
646 assert_eq!(reader.get_size().await.unwrap(), 3);
647 file.write_all(&[2; 5][..]).unwrap();
648 assert_eq!(reader.get_size().await.unwrap(), 8);
649 }
650
651 #[fasync::run_singlethreaded(test)]
652 async fn adapter_for_cursor() {
653 let data = (0..1000).map(|i| (i % 256) as u8).collect::<Vec<_>>();
654 let cursor = futures::io::Cursor::new(data.clone());
655 let mut adapter = Adapter::new(cursor);
656
657 assert_eq!(adapter.get_size().await.unwrap(), 1000);
658
659 let mut buffer = vec![];
660 adapter.read_to_end(&mut buffer).await.unwrap();
661 assert_eq!(buffer, data);
662
663 let mut buffer = vec![0; 100];
664 adapter.read_at_exact(333, &mut buffer).await.unwrap();
665 assert_eq!(buffer, &data[333..433]);
666 }
667}