futures_util/io/
buf_reader.rs1use super::DEFAULT_BUF_SIZE;
2use futures_core::future::Future;
3use futures_core::ready;
4use futures_core::task::{Context, Poll};
5use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSliceMut, SeekFrom};
6use pin_project_lite::pin_project;
7use std::io::{self, Read};
8use std::pin::Pin;
9use std::{cmp, fmt};
10
11pin_project! {
12 pub struct BufReader<R> {
32 #[pin]
33 inner: R,
34 buffer: Box<[u8]>,
35 pos: usize,
36 cap: usize,
37 }
38}
39
40impl<R: AsyncRead> BufReader<R> {
41 pub fn new(inner: R) -> Self {
44 Self::with_capacity(DEFAULT_BUF_SIZE, inner)
45 }
46
47 pub fn with_capacity(capacity: usize, inner: R) -> Self {
49 unsafe {
50 let mut buffer = Vec::with_capacity(capacity);
51 buffer.set_len(capacity);
52 super::initialize(&inner, &mut buffer);
53 Self { inner, buffer: buffer.into_boxed_slice(), pos: 0, cap: 0 }
54 }
55 }
56
57 delegate_access_inner!(inner, R, ());
58
59 pub fn buffer(&self) -> &[u8] {
63 &self.buffer[self.pos..self.cap]
64 }
65
66 #[inline]
68 fn discard_buffer(self: Pin<&mut Self>) {
69 let this = self.project();
70 *this.pos = 0;
71 *this.cap = 0;
72 }
73}
74
75impl<R: AsyncRead + AsyncSeek> BufReader<R> {
76 pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> {
81 SeeKRelative { inner: self, offset, first: true }
82 }
83
84 pub fn poll_seek_relative(
89 self: Pin<&mut Self>,
90 cx: &mut Context<'_>,
91 offset: i64,
92 ) -> Poll<io::Result<()>> {
93 let pos = self.pos as u64;
94 if offset < 0 {
95 if let Some(new_pos) = pos.checked_sub((-offset) as u64) {
96 *self.project().pos = new_pos as usize;
97 return Poll::Ready(Ok(()));
98 }
99 } else if let Some(new_pos) = pos.checked_add(offset as u64) {
100 if new_pos <= self.cap as u64 {
101 *self.project().pos = new_pos as usize;
102 return Poll::Ready(Ok(()));
103 }
104 }
105 self.poll_seek(cx, SeekFrom::Current(offset)).map(|res| res.map(|_| ()))
106 }
107}
108
109impl<R: AsyncRead> AsyncRead for BufReader<R> {
110 fn poll_read(
111 mut self: Pin<&mut Self>,
112 cx: &mut Context<'_>,
113 buf: &mut [u8],
114 ) -> Poll<io::Result<usize>> {
115 if self.pos == self.cap && buf.len() >= self.buffer.len() {
119 let res = ready!(self.as_mut().project().inner.poll_read(cx, buf));
120 self.discard_buffer();
121 return Poll::Ready(res);
122 }
123 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
124 let nread = rem.read(buf)?;
125 self.consume(nread);
126 Poll::Ready(Ok(nread))
127 }
128
129 fn poll_read_vectored(
130 mut self: Pin<&mut Self>,
131 cx: &mut Context<'_>,
132 bufs: &mut [IoSliceMut<'_>],
133 ) -> Poll<io::Result<usize>> {
134 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
135 if self.pos == self.cap && total_len >= self.buffer.len() {
136 let res = ready!(self.as_mut().project().inner.poll_read_vectored(cx, bufs));
137 self.discard_buffer();
138 return Poll::Ready(res);
139 }
140 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
141 let nread = rem.read_vectored(bufs)?;
142 self.consume(nread);
143 Poll::Ready(Ok(nread))
144 }
145}
146
147impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
148 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
149 let this = self.project();
150
151 if *this.pos >= *this.cap {
156 debug_assert!(*this.pos == *this.cap);
157 *this.cap = ready!(this.inner.poll_read(cx, this.buffer))?;
158 *this.pos = 0;
159 }
160 Poll::Ready(Ok(&this.buffer[*this.pos..*this.cap]))
161 }
162
163 fn consume(self: Pin<&mut Self>, amt: usize) {
164 *self.project().pos = cmp::min(self.pos + amt, self.cap);
165 }
166}
167
168impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
169 delegate_async_write!(inner);
170}
171
172impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
173 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
174 f.debug_struct("BufReader")
175 .field("reader", &self.inner)
176 .field("buffer", &format_args!("{}/{}", self.cap - self.pos, self.buffer.len()))
177 .finish()
178 }
179}
180
181impl<R: AsyncRead + AsyncSeek> AsyncSeek for BufReader<R> {
182 fn poll_seek(
205 mut self: Pin<&mut Self>,
206 cx: &mut Context<'_>,
207 pos: SeekFrom,
208 ) -> Poll<io::Result<u64>> {
209 let result: u64;
210 if let SeekFrom::Current(n) = pos {
211 let remainder = (self.cap - self.pos) as i64;
212 if let Some(offset) = n.checked_sub(remainder) {
218 result =
219 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(offset)))?;
220 } else {
221 ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(-remainder)))?;
223 self.as_mut().discard_buffer();
224 result = ready!(self.as_mut().project().inner.poll_seek(cx, SeekFrom::Current(n)))?;
225 }
226 } else {
227 result = ready!(self.as_mut().project().inner.poll_seek(cx, pos))?;
229 }
230 self.discard_buffer();
231 Poll::Ready(Ok(result))
232 }
233}
234
235#[derive(Debug)]
237#[must_use = "futures do nothing unless polled"]
238pub struct SeeKRelative<'a, R> {
239 inner: Pin<&'a mut BufReader<R>>,
240 offset: i64,
241 first: bool,
242}
243
244impl<R> Future for SeeKRelative<'_, R>
245where
246 R: AsyncRead + AsyncSeek,
247{
248 type Output = io::Result<()>;
249
250 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
251 let offset = self.offset;
252 if self.first {
253 self.first = false;
254 self.inner.as_mut().poll_seek_relative(cx, offset)
255 } else {
256 self.inner
257 .as_mut()
258 .as_mut()
259 .poll_seek(cx, SeekFrom::Current(offset))
260 .map(|res| res.map(|_| ()))
261 }
262 }
263}