fuchsia_async/runtime/fuchsia/task.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::scope::ScopeHandle;
6use crate::EHandle;
7use futures::prelude::*;
8use std::future::poll_fn;
9use std::marker::PhantomData;
10use std::mem::ManuallyDrop;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14/// A handle to a future that is owned and polled by the executor.
15///
16/// Once a task is created, the executor will poll it until done, even if the task handle itself is
17/// not polled.
18///
19/// NOTE: When a JoinHandle is dropped, its future will be detached.
20///
21/// Polling (or attempting to extract the value from) a task after the executor is dropped may
22/// trigger a panic.
23#[derive(Debug)]
24// LINT.IfChange
25pub struct JoinHandle<T> {
26 scope: ScopeHandle,
27 task_id: usize,
28 phantom: PhantomData<T>,
29}
30// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
31
32impl<T> Unpin for JoinHandle<T> {}
33
34impl<T> JoinHandle<T> {
35 pub(crate) fn new(scope: ScopeHandle, task_id: usize) -> Self {
36 Self { scope, task_id, phantom: PhantomData }
37 }
38
39 /// Aborts a task and returns a future that resolves once the task is
40 /// aborted. The future can be ignored in which case the task will still be
41 /// aborted.
42 pub fn abort(mut self) -> impl Future<Output = Option<T>> {
43 // SAFETY: We spawned the task so the return type should be correct.
44 let result = unsafe { self.scope.abort_task(self.task_id) };
45 async move {
46 match result {
47 Some(output) => Some(output),
48 None => {
49 // If we are dropped from here, we'll end up calling `abort_and_detach`.
50 let result = std::future::poll_fn(|cx| {
51 // SAFETY: We spawned the task so the return type should be correct.
52 unsafe { self.scope.poll_aborted(self.task_id, cx) }
53 })
54 .await;
55 self.task_id = 0;
56 result
57 }
58 }
59 }
60 }
61}
62
63impl<T> Drop for JoinHandle<T> {
64 fn drop(&mut self) {
65 if self.task_id != 0 {
66 self.scope.detach(self.task_id);
67 }
68 }
69}
70
71impl<T: 'static> Future for JoinHandle<T> {
72 type Output = T;
73 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74 // SAFETY: We spawned the task so the return type should be correct.
75 let result = unsafe { self.scope.poll_join_result(self.task_id, cx) };
76 if result.is_ready() {
77 self.task_id = 0;
78 }
79 result
80 }
81}
82
83/// This is the same as a JoinHandle, except that the future will be aborted when the task is
84/// dropped.
85#[must_use]
86#[repr(transparent)]
87#[derive(Debug)]
88// LINT.IfChange
89pub struct Task<T>(JoinHandle<T>);
90// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
91
92impl<T> Task<T> {
93 /// Returns a `JoinHandle` which will have detach-on-drop semantics.
94 pub fn detach_on_drop(self) -> JoinHandle<T> {
95 let this = ManuallyDrop::new(self);
96 // SAFETY: We are bypassing our drop implementation.
97 unsafe { std::ptr::read(&this.0) }
98 }
99}
100
101impl Task<()> {
102 /// Detach this task so that it can run independently in the background.
103 ///
104 /// *Note*: This is usually not what you want. This API severs the control flow from the
105 /// caller. This can result in flaky tests and makes it impossible to return values
106 /// (including errors).
107 ///
108 /// If your goal is to run multiple tasks concurrently, use [`Scope`][crate::Scope].
109 ///
110 /// You can also use other futures combinators such as:
111 ///
112 /// * [`futures::future::join`]
113 /// * [`futures::future::select`]
114 /// * [`futures::select`]
115 ///
116 /// or their error-aware variants
117 ///
118 /// * [`futures::future::try_join`]
119 /// * [`futures::future::try_select`]
120 ///
121 /// or their stream counterparts
122 ///
123 /// * [`futures::stream::StreamExt::for_each`]
124 /// * [`futures::stream::StreamExt::for_each_concurrent`]
125 /// * [`futures::stream::TryStreamExt::try_for_each`]
126 /// * [`futures::stream::TryStreamExt::try_for_each_concurrent`]
127 ///
128 /// can meet your needs.
129 pub fn detach(mut self) {
130 self.0.scope.detach(self.0.task_id);
131 self.0.task_id = 0;
132 }
133}
134
135impl<T: Send + 'static> Task<T> {
136 /// Spawn a new task on the global scope of the current executor.
137 ///
138 /// The task may be executed on any thread(s) owned by the current executor.
139 /// See [`Task::local`] for an equivalent that ensures locality.
140 ///
141 /// The passed future will live until either (a) the future completes,
142 /// (b) the returned [`Task`] is dropped while the executor is running, or
143 /// (c) the executor is destroyed; whichever comes first.
144 ///
145 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
146 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
147 ///
148 /// # Panics
149 ///
150 /// May panic if not called in the context of an executor (e.g. within a
151 /// call to [`run`][crate::SendExecutor::run]).
152 pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
153 EHandle::local().global_scope().compute(future)
154 }
155}
156
157impl<T: 'static> Task<T> {
158 /// Spawn a new task on the global scope of the thread local executor.
159 ///
160 /// The passed future will live until either (a) the future completes,
161 /// (b) the returned [`Task`] is dropped while the executor is running, or
162 /// (c) the executor is destroyed; whichever comes first.
163 ///
164 /// NOTE: This is not supported with a [`SendExecutor`] and will cause a
165 /// runtime panic. Use [`Task::spawn`] instead.
166 ///
167 /// Code that uses scopes is encouraged to spawn on a shorter lived scope or
168 /// explicitly call [`Scope::global()`][crate::Scope::global] for spawning.
169 ///
170 /// # Panics
171 ///
172 /// May panic if not called in the context of an executor (e.g. within a
173 /// call to [`run`][crate::SendExecutor::run]).
174 pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
175 EHandle::local().global_scope().compute_local(future)
176 }
177}
178
179impl<T: 'static> Task<T> {
180 /// Aborts a task and returns a future that resolves once the task is
181 /// aborted. The future can be ignored in which case the task will still be
182 /// aborted.
183 pub fn abort(self) -> impl Future<Output = Option<T>> {
184 self.detach_on_drop().abort()
185 }
186}
187
188impl<T: 'static> Future for Task<T> {
189 type Output = T;
190 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
191 // SAFETY: We spawned the task so the return type should be correct.
192 let result = unsafe { self.0.scope.poll_join_result(self.0.task_id, cx) };
193 if result.is_ready() {
194 self.0.task_id = 0;
195 }
196 result
197 }
198}
199
200impl<T> Drop for Task<T> {
201 fn drop(&mut self) {
202 if self.0.task_id != 0 {
203 self.0.scope.abort_and_detach(self.0.task_id);
204 self.0.task_id = 0;
205 }
206 }
207}
208
209impl<T> From<JoinHandle<T>> for Task<T> {
210 fn from(value: JoinHandle<T>) -> Self {
211 Self(value)
212 }
213}
214
215/// Offload a blocking function call onto a different thread.
216///
217/// This function can be called from an asynchronous function without blocking
218/// it, returning a future that can be `.await`ed normally. The provided
219/// function should contain at least one blocking operation, such as:
220///
221/// - A synchronous syscall that does not yet have an async counterpart.
222/// - A compute operation which risks blocking the executor for an unacceptable
223/// amount of time.
224///
225/// If neither of these conditions are satisfied, just call the function normally,
226/// as synchronous functions themselves are allowed within an async context,
227/// as long as they are not blocking.
228///
229/// If you have an async function that may block, refactor the function such that
230/// the blocking operations are offloaded onto the function passed to [`unblock`].
231///
232/// NOTE:
233///
234/// - The input function should not interact with the executor. Attempting to do so
235/// can cause runtime errors. This includes spawning, creating new executors,
236/// passing futures between the input function and the calling context, and
237/// in some cases constructing async-aware types (such as IO-, IPC- and timer objects).
238/// - Synchronous functions cannot be cancelled and may keep running after
239/// the returned future is dropped. As a result, resources held by the function
240/// should be assumed to be held until the returned future completes.
241/// - This function assumes panic=abort semantics, so if the input function panics,
242/// the process aborts. Behavior for panic=unwind is not defined.
243// TODO(https://fxbug.dev/42158447): Consider using a backing thread pool to alleviate the cost of
244// spawning new threads if this proves to be a bottleneck.
245pub fn unblock<T: 'static + Send>(
246 f: impl 'static + Send + FnOnce() -> T,
247) -> impl 'static + Send + Future<Output = T> {
248 let (tx, rx) = futures::channel::oneshot::channel();
249 std::thread::spawn(move || {
250 let _ = tx.send(f());
251 });
252 rx.map(|r| r.unwrap())
253}
254
255/// Yields execution back to the runtime.
256pub async fn yield_now() {
257 let mut done = false;
258 poll_fn(|cx| {
259 if done {
260 Poll::Ready(())
261 } else {
262 done = true;
263 cx.waker().wake_by_ref();
264 Poll::Pending
265 }
266 })
267 .await;
268}
269
270#[cfg(test)]
271mod tests {
272 use super::super::executor::{LocalExecutor, SendExecutor};
273 use super::*;
274 use std::sync::{Arc, Mutex};
275
276 /// This struct holds a thread-safe mutable boolean and
277 /// sets its value to true when dropped.
278 #[derive(Clone)]
279 struct SetsBoolTrueOnDrop {
280 value: Arc<Mutex<bool>>,
281 }
282
283 impl SetsBoolTrueOnDrop {
284 fn new() -> (Self, Arc<Mutex<bool>>) {
285 let value = Arc::new(Mutex::new(false));
286 let sets_bool_true_on_drop = Self { value: value.clone() };
287 (sets_bool_true_on_drop, value)
288 }
289 }
290
291 impl Drop for SetsBoolTrueOnDrop {
292 fn drop(&mut self) {
293 let mut lock = self.value.lock().unwrap();
294 *lock = true;
295 }
296 }
297
298 #[test]
299 #[should_panic]
300 fn spawn_from_unblock_fails() {
301 // no executor in the off-thread, so spawning fails
302 SendExecutor::new(2).run(async move {
303 unblock(|| {
304 #[allow(clippy::let_underscore_future)]
305 let _ = Task::spawn(async {});
306 })
307 .await;
308 });
309 }
310
311 #[test]
312 fn future_destroyed_before_await_returns() {
313 LocalExecutor::new().run_singlethreaded(async {
314 let (sets_bool_true_on_drop, value) = SetsBoolTrueOnDrop::new();
315
316 // Move the switch into a different thread.
317 // Once we return from this await, that switch should have been dropped.
318 unblock(move || {
319 let lock = sets_bool_true_on_drop.value.lock().unwrap();
320 assert!(!*lock);
321 })
322 .await;
323
324 // Switch moved into the future should have been dropped at this point.
325 // The value of the boolean should now be true.
326 let lock = value.lock().unwrap();
327 assert!(*lock);
328 });
329 }
330}