fuchsia_async/runtime/fuchsia/
task.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::scope::ScopeHandle;
6use crate::EHandle;
7use futures::prelude::*;
8use std::future::poll_fn;
9use std::marker::PhantomData;
10use std::mem::ManuallyDrop;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14/// A handle to a future that is owned and polled by the executor.
15///
16/// Once a task is created, the executor will poll it until done, even if the task handle itself is
17/// not polled.
18///
19/// NOTE: When a JoinHandle is dropped, its future will be detached.
20///
21/// Polling (or attempting to extract the value from) a task after the executor is dropped may
22/// trigger a panic.
23#[derive(Debug)]
24// LINT.IfChange
25pub struct JoinHandle<T> {
26    scope: ScopeHandle,
27    task_id: usize,
28    phantom: PhantomData<T>,
29}
30// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
31
32impl<T> Unpin for JoinHandle<T> {}
33
34impl<T> JoinHandle<T> {
35    pub(crate) fn new(scope: ScopeHandle, task_id: usize) -> Self {
36        Self { scope, task_id, phantom: PhantomData }
37    }
38
39    /// Aborts a task and returns a future that resolves once the task is
40    /// aborted. The future can be ignored in which case the task will still be
41    /// aborted.
42    pub fn abort(mut self) -> impl Future<Output = Option<T>> {
43        // SAFETY: We spawned the task so the return type should be correct.
44        let result = unsafe { self.scope.abort_task(self.task_id) };
45        async move {
46            match result {
47                Some(output) => Some(output),
48                None => {
49                    // If we are dropped from here, we'll end up calling `abort_and_detach`.
50                    let result = std::future::poll_fn(|cx| {
51                        // SAFETY: We spawned the task so the return type should be correct.
52                        unsafe { self.scope.poll_aborted(self.task_id, cx) }
53                    })
54                    .await;
55                    self.task_id = 0;
56                    result
57                }
58            }
59        }
60    }
61}
62
63impl<T> Drop for JoinHandle<T> {
64    fn drop(&mut self) {
65        if self.task_id != 0 {
66            self.scope.detach(self.task_id);
67        }
68    }
69}
70
71impl<T: 'static> Future for JoinHandle<T> {
72    type Output = T;
73    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74        // SAFETY: We spawned the task so the return type should be correct.
75        let result = unsafe { self.scope.poll_join_result(self.task_id, cx) };
76        if result.is_ready() {
77            self.task_id = 0;
78        }
79        result
80    }
81}
82
83/// This is the same as a JoinHandle, except that the future will be aborted when the task is
84/// dropped.
85#[must_use]
86#[repr(transparent)]
87#[derive(Debug)]
88// LINT.IfChange
89pub struct Task<T>(JoinHandle<T>);
90// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
91
92impl<T> Task<T> {
93    /// Returns a `JoinHandle` which will have detach-on-drop semantics.
94    pub fn detach_on_drop(self) -> JoinHandle<T> {
95        let this = ManuallyDrop::new(self);
96        // SAFETY: We are bypassing our drop implementation.
97        unsafe { std::ptr::read(&this.0) }
98    }
99}
100
101impl Task<()> {
102    /// Detach this task so that it can run independently in the background.
103    ///
104    /// *Note*: This is usually not what you want. This API severs the control flow from the
105    /// caller. This can result in flaky tests and makes it impossible to return values
106    /// (including errors).
107    ///
108    /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
109    ///
110    /// You can also use other futures combinators such as:
111    ///
112    /// * [`futures::future::join`]
113    /// * [`futures::future::select`]
114    /// * [`futures::select`]
115    ///
116    /// or their error-aware variants
117    ///
118    /// * [`futures::future::try_join`]
119    /// * [`futures::future::try_select`]
120    ///
121    /// or their stream counterparts
122    ///
123    /// * [`futures::stream::StreamExt::for_each`]
124    /// * [`futures::stream::StreamExt::for_each_concurrent`]
125    /// * [`futures::stream::TryStreamExt::try_for_each`]
126    /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
127    ///
128    /// can meet your needs.
129    pub fn detach(mut self) {
130        self.0.scope.detach(self.0.task_id);
131        self.0.task_id = 0;
132    }
133}
134
135impl<T: Send + 'static> Task<T> {
136    /// Spawn a new task on the global scope of the current executor.
137    ///
138    /// The task may be executed on any thread(s) owned by the current executor.
139    /// See [`Task::local`] for an equivalent that ensures locality.
140    ///
141    /// The passed future will live until either (a) the future completes,
142    /// (b) the returned [`Task`] is dropped while the executor is running, or
143    /// (c) the executor is destroyed; whichever comes first.
144    ///
145    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
146    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
147    ///
148    /// # Panics
149    ///
150    /// May panic if not called in the context of an executor (e.g. within a
151    /// call to [`run`][crate::SendExecutor::run]).
152    pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
153        EHandle::local().global_scope().compute(future)
154    }
155}
156
157impl<T: 'static> Task<T> {
158    /// Spawn a new task on the global scope of the thread local executor.
159    ///
160    /// The passed future will live until either (a) the future completes,
161    /// (b) the returned [`Task`] is dropped while the executor is running, or
162    /// (c) the executor is destroyed; whichever comes first.
163    ///
164    /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
165    /// runtime panic. Use [`Task::spawn`] instead.
166    ///
167    /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
168    /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
169    ///
170    /// # Panics
171    ///
172    /// May panic if not called in the context of an executor (e.g. within a
173    /// call to [`run`][crate::SendExecutor::run]).
174    pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
175        EHandle::local().global_scope().compute_local(future)
176    }
177}
178
179impl<T: 'static> Task<T> {
180    /// Aborts a task and returns a future that resolves once the task is
181    /// aborted. The future can be ignored in which case the task will still be
182    /// aborted.
183    pub fn abort(self) -> impl Future<Output = Option<T>> {
184        self.detach_on_drop().abort()
185    }
186}
187
188impl<T: 'static> Future for Task<T> {
189    type Output = T;
190    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
191        // SAFETY: We spawned the task so the return type should be correct.
192        let result = unsafe { self.0.scope.poll_join_result(self.0.task_id, cx) };
193        if result.is_ready() {
194            self.0.task_id = 0;
195        }
196        result
197    }
198}
199
200impl<T> Drop for Task<T> {
201    fn drop(&mut self) {
202        if self.0.task_id != 0 {
203            self.0.scope.abort_and_detach(self.0.task_id);
204            self.0.task_id = 0;
205        }
206    }
207}
208
209impl<T> From<JoinHandle<T>> for Task<T> {
210    fn from(value: JoinHandle<T>) -> Self {
211        Self(value)
212    }
213}
214
215/// Offload a blocking function call onto a different thread.
216///
217/// This function can be called from an asynchronous function without blocking
218/// it, returning a future that can be `.await`ed normally. The provided
219/// function should contain at least one blocking operation, such as:
220///
221/// - A synchronous syscall that does not yet have an async counterpart.
222/// - A compute operation which risks blocking the executor for an unacceptable
223///   amount of time.
224///
225/// If neither of these conditions are satisfied, just call the function normally,
226/// as synchronous functions themselves are allowed within an async context,
227/// as long as they are not blocking.
228///
229/// If you have an async function that may block, refactor the function such that
230/// the blocking operations are offloaded onto the function passed to [`unblock`].
231///
232/// NOTE:
233///
234/// - The input function should not interact with the executor. Attempting to do so
235///   can cause runtime errors. This includes spawning, creating new executors,
236///   passing futures between the input function and the calling context, and
237///   in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
238/// - Synchronous functions cannot be cancelled and may keep running after
239///   the returned future is dropped. As a result, resources held by the function
240///   should be assumed to be held until the returned future completes.
241/// - This function assumes panic=abort semantics, so if the input function panics,
242///   the process aborts. Behavior for panic=unwind is not defined.
243// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
244// spawning new threads if this proves to be a bottleneck.
245pub fn unblock<T: 'static + Send>(
246    f: impl 'static + Send + FnOnce() -> T,
247) -> impl 'static + Send + Future<Output = T> {
248    let (tx, rx) = futures::channel::oneshot::channel();
249    std::thread::spawn(move || {
250        let _ = tx.send(f());
251    });
252    rx.map(|r| r.unwrap())
253}
254
/// Yields execution back to the runtime.
///
/// The first poll wakes the current task's waker and returns `Pending`; the next poll completes.
pub async fn yield_now() {
    let mut yielded = false;
    poll_fn(|cx| {
        // `replace` marks the yield as done and reports whether it had already happened.
        if std::mem::replace(&mut yielded, true) {
            return Poll::Ready(());
        }
        // Request another poll straight away so the task is rescheduled rather than parked.
        cx.waker().wake_by_ref();
        Poll::Pending
    })
    .await;
}
269
#[cfg(test)]
mod tests {
    use super::super::executor::{LocalExecutor, SendExecutor};
    use super::*;
    use std::sync::{Arc, Mutex};

    /// Shares a mutex-guarded boolean and flips it to `true` when dropped.
    #[derive(Clone)]
    struct SetsBoolTrueOnDrop {
        value: Arc<Mutex<bool>>,
    }

    impl SetsBoolTrueOnDrop {
        /// Returns a new instance whose flag starts out `false`, together with a second handle
        /// to the same flag for observing it afterwards.
        fn new() -> (Self, Arc<Mutex<bool>>) {
            let flag = Arc::new(Mutex::new(false));
            (Self { value: Arc::clone(&flag) }, flag)
        }
    }

    impl Drop for SetsBoolTrueOnDrop {
        fn drop(&mut self) {
            *self.value.lock().unwrap() = true;
        }
    }

    #[test]
    #[should_panic]
    fn spawn_from_unblock_fails() {
        // The thread created by `unblock` has no executor, so spawning from it fails.
        SendExecutor::new(2).run(async move {
            let off_thread = unblock(|| {
                #[allow(clippy::let_underscore_future)]
                let _ = Task::spawn(async {});
            });
            off_thread.await;
        });
    }

    #[test]
    fn future_destroyed_before_await_returns() {
        LocalExecutor::new().run_singlethreaded(async {
            let (dropper, flag) = SetsBoolTrueOnDrop::new();

            // Hand the dropper to the other thread. While the closure is still running the
            // dropper is alive, so the flag must still be false.
            unblock(move || {
                assert!(!*dropper.value.lock().unwrap());
            })
            .await;

            // The closure, and the dropper it owned, must be gone by the time the await
            // returns, so the flag has been set.
            assert!(*flag.lock().unwrap());
        });
    }
}
330}