futures_util/
lib.rs

1//! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
2//! and the `AsyncRead` and `AsyncWrite` traits.
3
4#![cfg_attr(feature = "write-all-vectored", feature(io_slice_advance))]
5#![cfg_attr(not(feature = "std"), no_std)]
6#![warn(
7    missing_debug_implementations,
8    missing_docs,
9    rust_2018_idioms,
10    single_use_lifetimes,
11    unreachable_pub
12)]
13#![doc(test(
14    no_crate_inject,
15    attr(
16        deny(warnings, rust_2018_idioms, single_use_lifetimes),
17        allow(dead_code, unused_assignments, unused_variables)
18    )
19))]
20#![cfg_attr(docsrs, feature(doc_cfg))]
21
22#[cfg(all(feature = "bilock", not(feature = "unstable")))]
23compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
24
25#[cfg(feature = "alloc")]
26extern crate alloc;
27
28// Macro re-exports
29pub use futures_core::ready;
30pub use pin_utils::pin_mut;
31
32#[cfg(feature = "async-await")]
33#[macro_use]
34mod async_await;
35#[cfg(feature = "async-await")]
36#[doc(hidden)]
37pub use self::async_await::*;
38
39// Not public API.
40#[cfg(feature = "async-await")]
41#[doc(hidden)]
42pub mod __private {
43    pub use crate::*;
44    pub use core::{
45        option::Option::{self, None, Some},
46        pin::Pin,
47        result::Result::{Err, Ok},
48    };
49
50    pub mod async_await {
51        pub use crate::async_await::*;
52    }
53}
54
55#[cfg(feature = "sink")]
56macro_rules! delegate_sink {
57    ($field:ident, $item:ty) => {
58        fn poll_ready(
59            self: core::pin::Pin<&mut Self>,
60            cx: &mut core::task::Context<'_>,
61        ) -> core::task::Poll<Result<(), Self::Error>> {
62            self.project().$field.poll_ready(cx)
63        }
64
65        fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> {
66            self.project().$field.start_send(item)
67        }
68
69        fn poll_flush(
70            self: core::pin::Pin<&mut Self>,
71            cx: &mut core::task::Context<'_>,
72        ) -> core::task::Poll<Result<(), Self::Error>> {
73            self.project().$field.poll_flush(cx)
74        }
75
76        fn poll_close(
77            self: core::pin::Pin<&mut Self>,
78            cx: &mut core::task::Context<'_>,
79        ) -> core::task::Poll<Result<(), Self::Error>> {
80            self.project().$field.poll_close(cx)
81        }
82    };
83}
84
85macro_rules! delegate_future {
86    ($field:ident) => {
87        fn poll(
88            self: core::pin::Pin<&mut Self>,
89            cx: &mut core::task::Context<'_>,
90        ) -> core::task::Poll<Self::Output> {
91            self.project().$field.poll(cx)
92        }
93    };
94}
95
96macro_rules! delegate_stream {
97    ($field:ident) => {
98        fn poll_next(
99            self: core::pin::Pin<&mut Self>,
100            cx: &mut core::task::Context<'_>,
101        ) -> core::task::Poll<Option<Self::Item>> {
102            self.project().$field.poll_next(cx)
103        }
104        fn size_hint(&self) -> (usize, Option<usize>) {
105            self.$field.size_hint()
106        }
107    };
108}
109
110#[cfg(feature = "io")]
111#[cfg(feature = "std")]
112macro_rules! delegate_async_write {
113    ($field:ident) => {
114        fn poll_write(
115            self: core::pin::Pin<&mut Self>,
116            cx: &mut core::task::Context<'_>,
117            buf: &[u8],
118        ) -> core::task::Poll<std::io::Result<usize>> {
119            self.project().$field.poll_write(cx, buf)
120        }
121        fn poll_write_vectored(
122            self: core::pin::Pin<&mut Self>,
123            cx: &mut core::task::Context<'_>,
124            bufs: &[std::io::IoSlice<'_>],
125        ) -> core::task::Poll<std::io::Result<usize>> {
126            self.project().$field.poll_write_vectored(cx, bufs)
127        }
128        fn poll_flush(
129            self: core::pin::Pin<&mut Self>,
130            cx: &mut core::task::Context<'_>,
131        ) -> core::task::Poll<std::io::Result<()>> {
132            self.project().$field.poll_flush(cx)
133        }
134        fn poll_close(
135            self: core::pin::Pin<&mut Self>,
136            cx: &mut core::task::Context<'_>,
137        ) -> core::task::Poll<std::io::Result<()>> {
138            self.project().$field.poll_close(cx)
139        }
140    };
141}
142
143#[cfg(feature = "io")]
144#[cfg(feature = "std")]
145macro_rules! delegate_async_read {
146    ($field:ident) => {
147        fn poll_read(
148            self: core::pin::Pin<&mut Self>,
149            cx: &mut core::task::Context<'_>,
150            buf: &mut [u8],
151        ) -> core::task::Poll<std::io::Result<usize>> {
152            self.project().$field.poll_read(cx, buf)
153        }
154
155        fn poll_read_vectored(
156            self: core::pin::Pin<&mut Self>,
157            cx: &mut core::task::Context<'_>,
158            bufs: &mut [std::io::IoSliceMut<'_>],
159        ) -> core::task::Poll<std::io::Result<usize>> {
160            self.project().$field.poll_read_vectored(cx, bufs)
161        }
162    };
163}
164
165#[cfg(feature = "io")]
166#[cfg(feature = "std")]
167macro_rules! delegate_async_buf_read {
168    ($field:ident) => {
169        fn poll_fill_buf(
170            self: core::pin::Pin<&mut Self>,
171            cx: &mut core::task::Context<'_>,
172        ) -> core::task::Poll<std::io::Result<&[u8]>> {
173            self.project().$field.poll_fill_buf(cx)
174        }
175
176        fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
177            self.project().$field.consume(amt)
178        }
179    };
180}
181
182macro_rules! delegate_access_inner {
183    ($field:ident, $inner:ty, ($($ind:tt)*)) => {
184        /// Acquires a reference to the underlying sink or stream that this combinator is
185        /// pulling from.
186        pub fn get_ref(&self) -> &$inner {
187            (&self.$field) $($ind get_ref())*
188        }
189
190        /// Acquires a mutable reference to the underlying sink or stream that this
191        /// combinator is pulling from.
192        ///
193        /// Note that care must be taken to avoid tampering with the state of the
194        /// sink or stream which may otherwise confuse this combinator.
195        pub fn get_mut(&mut self) -> &mut $inner {
196            (&mut self.$field) $($ind get_mut())*
197        }
198
199        /// Acquires a pinned mutable reference to the underlying sink or stream that this
200        /// combinator is pulling from.
201        ///
202        /// Note that care must be taken to avoid tampering with the state of the
203        /// sink or stream which may otherwise confuse this combinator.
204        pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> {
205            self.project().$field $($ind get_pin_mut())*
206        }
207
208        /// Consumes this combinator, returning the underlying sink or stream.
209        ///
210        /// Note that this may discard intermediate state of this combinator, so
211        /// care should be taken to avoid losing resources when this is called.
212        pub fn into_inner(self) -> $inner {
213            self.$field $($ind into_inner())*
214        }
215    }
216}
217
218macro_rules! delegate_all {
219    (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
220        impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* {
221            type Output = <$t as futures_core::future::Future>::Output;
222
223            delegate_future!(inner);
224        }
225    };
226    (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
227        impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* {
228            fn is_terminated(&self) -> bool {
229                self.inner.is_terminated()
230            }
231        }
232    };
233    (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
234        impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* {
235            type Item = <$t as futures_core::stream::Stream>::Item;
236
237            delegate_stream!(inner);
238        }
239    };
240    (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
241        impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* {
242            fn is_terminated(&self) -> bool {
243                self.inner.is_terminated()
244            }
245        }
246    };
247    (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
248        #[cfg(feature = "sink")]
249        impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* {
250            type Error = <$t as futures_sink::Sink<_Item>>::Error;
251
252            delegate_sink!(inner, _Item);
253        }
254    };
255    (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
256        impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* {
257            fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
258                core::fmt::Debug::fmt(&self.inner, f)
259            }
260        }
261    };
262    (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
263        impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
264            delegate_access_inner!(inner, $inner, ($($ind)*));
265        }
266    };
267    (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
268        impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
269            pub(crate) fn new($($param: $paramt),*) -> Self {
270                Self { inner: $cons }
271            }
272        }
273    };
274    ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
275        pin_project_lite::pin_project! {
276            #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
277            $(#[$attr])*
278            pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t }
279        }
280
281        impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* {
282            $($($item)*)*
283        }
284
285        delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
286    };
287    ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
288        delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*);
289
290        delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
291    };
292}
293
294pub mod future;
295#[doc(no_inline)]
296pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt};
297
298pub mod stream;
299#[doc(no_inline)]
300pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt};
301
302#[cfg(feature = "sink")]
303#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
304pub mod sink;
305#[cfg(feature = "sink")]
306#[doc(no_inline)]
307pub use crate::sink::{Sink, SinkExt};
308
309pub mod task;
310
311pub mod never;
312
313#[cfg(feature = "compat")]
314#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
315pub mod compat;
316
317#[cfg(feature = "io")]
318#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
319#[cfg(feature = "std")]
320pub mod io;
321#[cfg(feature = "io")]
322#[cfg(feature = "std")]
323#[doc(no_inline)]
324pub use crate::io::{
325    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
326    AsyncWriteExt,
327};
328
329#[cfg(feature = "alloc")]
330pub mod lock;
331
332#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
333#[cfg(feature = "alloc")]
334mod abortable;
335
336mod fns;
337mod unfold_state;