fuchsia_async/runtime/fuchsia/executor/atomic_future.rs
1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod spawnable_future;
6
7use crate::ScopeHandle;
8use futures::ready;
9use std::borrow::Borrow;
10use std::future::Future;
11use std::hash::{Hash, Hasher};
12use std::marker::PhantomData;
13use std::mem::ManuallyDrop;
14use std::ops::Deref;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
20
21/// A lock-free thread-safe future.
22
23// The debugger knows the layout so that async backtraces work, so if this changes the debugger
24// might need to be changed too.
25//
26// This is `repr(C)` so that we can cast between `NonNull<Meta>` and `NonNull<AtomicFuture<F>>`.
27
28// LINT.IfChange
29#[repr(C)]
30struct AtomicFuture<F: Future> {
31 meta: Meta,
32
33 // `future` is safe to access after successfully clearing the INACTIVE state bit and the `DONE`
34 // state bit isn't set.
35 future: FutureOrResult<F>,
36}
37// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
38
39/// A lock-free thread-safe future. The handles can be cloned.
40pub struct AtomicFutureHandle<'a>(NonNull<Meta>, PhantomData<&'a ()>);
41
42/// `AtomicFutureHandle` is safe to access from multiple threads at once.
43unsafe impl Sync for AtomicFutureHandle<'_> {}
44unsafe impl Send for AtomicFutureHandle<'_> {}
45
46impl Drop for AtomicFutureHandle<'_> {
47 fn drop(&mut self) {
48 self.meta().release();
49 }
50}
51
52impl Clone for AtomicFutureHandle<'_> {
53 fn clone(&self) -> Self {
54 self.meta().retain();
55 Self(self.0, PhantomData)
56 }
57}
58
59struct Meta {
60 vtable: &'static VTable,
61
62 // Holds the reference count and state bits (INACTIVE, READY, etc.).
63 state: AtomicUsize,
64
65 scope: Option<ScopeHandle>,
66 id: usize,
67}
68
69impl Meta {
70 // # Safety
71 //
72 // This mints a handle with the 'static lifetime, so this should only be called from
73 // `AtomicFutureHandle<'static>`.
74 unsafe fn wake(&self) {
75 if self.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
76 self.retain();
77 self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
78 }
79 }
80
81 fn scope(&self) -> &ScopeHandle {
82 self.scope.as_ref().unwrap()
83 }
84
85 fn retain(&self) {
86 let old = self.state.fetch_add(1, Relaxed) & REF_COUNT_MASK;
87 assert!(old != REF_COUNT_MASK);
88 }
89
90 fn release(&self) {
91 // This can be Relaxed because there is a barrier in the drop function.
92 let old = self.state.fetch_sub(1, Relaxed) & REF_COUNT_MASK;
93 if old == 1 {
94 // SAFETY: This is safe because we just released the last reference.
95 unsafe {
96 (self.vtable.drop)(self.into());
97 }
98 } else {
99 // Check for underflow.
100 assert!(old > 0);
101 }
102 }
103
104 // # Safety
105 //
106 // The caller must know that the future has completed.
107 unsafe fn drop_result(&self, ordering: Ordering) {
108 // It's possible for this to race with another thread so we only drop the result if we are
109 // successful in setting the RESULT_TAKEN bit.
110 if self.state.fetch_or(RESULT_TAKEN, ordering) & RESULT_TAKEN == 0 {
111 (self.vtable.drop_result)(self.into());
112 }
113 }
114}
115
116struct VTable {
117 /// Drops the atomic future.
118 ///
119 /// # Safety
120 ///
121 /// The caller must ensure there are no other references i.e. the reference count should be
122 /// zero.
123 // zxdb uses this method to figure out the concrete type of the future.
124 // LINT.IfChange
125 drop: unsafe fn(NonNull<Meta>),
126 // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
127 /// Drops the future.
128 ///
129 /// # Safety
130 ///
131 /// The caller must ensure the future hasn't been dropped.
132 drop_future: unsafe fn(NonNull<Meta>),
133 /// Polls the future.
134 ///
135 /// # Safety
136 ///
137 /// The caller must ensure the future hasn't been dropped and has exclusive access.
138 poll: unsafe fn(NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()>,
139
140 /// Gets the result.
141 ///
142 /// # Safety
143 ///
144 /// The caller must ensure the future is finished and the result hasn't been taken or dropped.
145 get_result: unsafe fn(NonNull<Meta>) -> *const (),
146
147 /// Drops the result.
148 ///
149 /// # Safety
150 ///
151 /// The caller must ensure the future is finished and the result hasn't already been taken or
152 /// dropped.
153 drop_result: unsafe fn(NonNull<Meta>),
154}
155
/// Storage shared by the future and its output; only one of them is live at any time.
union FutureOrResult<F: Future> {
    // The future, for as long as it has not been dropped.
    future: ManuallyDrop<F>,
    // The output of the future.
    result: ManuallyDrop<F::Output>,
}
160
161impl<F: Future> AtomicFuture<F> {
162 const VTABLE: VTable = VTable {
163 drop: Self::drop,
164 drop_future: Self::drop_future,
165 poll: Self::poll,
166 get_result: Self::get_result,
167 drop_result: Self::drop_result,
168 };
169
170 unsafe fn drop(meta: NonNull<Meta>) {
171 drop(Box::from_raw(meta.cast::<Self>().as_mut()));
172 }
173
174 unsafe fn poll(meta: NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()> {
175 let future = &mut meta.cast::<Self>().as_mut().future;
176 let result = ready!(Pin::new_unchecked(&mut *future.future).poll(cx));
177 // This might panic which will leave ourselves in a bad state. We deal with this by
178 // aborting (see below).
179 ManuallyDrop::drop(&mut future.future);
180 future.result = ManuallyDrop::new(result);
181 Poll::Ready(())
182 }
183
184 unsafe fn drop_future(meta: NonNull<Meta>) {
185 ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.future);
186 }
187
188 unsafe fn get_result(meta: NonNull<Meta>) -> *const () {
189 &*meta.cast::<Self>().as_mut().future.result as *const F::Output as *const ()
190 }
191
192 unsafe fn drop_result(meta: NonNull<Meta>) {
193 ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.result);
194 }
195}
196
// State bits.
//
// These live in the same word as the reference count (`Meta::state`): the flags occupy the top
// bits and the reference count occupies everything below `RESULT_TAKEN`.

// Set while nobody is polling the future. Exclusive access is gained by clearing this bit.
const INACTIVE: usize = 1 << 63;

// Set to indicate that the future needs to be polled again.
const READY: usize = 1 << 62;

// Terminal state; the future is dropped upon entry to this state. Other bits can still be set
// while in this state, including READY (which then has no meaning).
const DONE: usize = 1 << 61;

// Set once the task has been detached.
const DETACHED: usize = 1 << 60;

// Set once the task has been cancelled.
const CANCELLED: usize = 1 << 59;

// Set once the result has been taken.
const RESULT_TAKEN: usize = 1 << 58;

// Mask that selects the reference count, i.e. every bit below the lowest state bit.
const REF_COUNT_MASK: usize = RESULT_TAKEN - 1;
220
/// The outcome of a call to `try_poll`, i.e. of an attempt to `poll` the future.
pub enum AttemptPollResult {
    /// The future was polled but has not completed yet.
    Pending,
    /// The future was polled and this thread is the one that finished it.
    /// This result is normally used to trigger garbage-collection of the future.
    IFinished,
    /// Another thread had already completed the future.
    SomeoneElseFinished,
    /// The future was polled and did not complete, but it was woken whilst it was being polled,
    /// so it should be polled again.
    Yield,
    /// The future was cancelled.
    Cancelled,
}
237
/// The outcome of calling the `cancel_and_detach` function, telling the caller what (if
/// anything) is left to do.
#[must_use]
pub enum CancelAndDetachResult {
    /// The future has already finished, so it can be dropped.
    Done,

    /// The future has to be added to a run queue so that it can be cancelled.
    AddToRunQueue,

    /// The future will be cancelled soon; nothing needs to be done.
    Pending,
}
250
251impl<'a> AtomicFutureHandle<'a> {
252 /// Create a new `AtomicFuture`.
253 pub fn new<F: Future + Send + 'a>(scope: Option<ScopeHandle>, id: usize, future: F) -> Self
254 where
255 F::Output: Send + 'a,
256 {
257 // SAFETY: This is safe because the future and output are both Send.
258 unsafe { Self::new_local(scope, id, future) }
259 }
260
261 /// Create a new `AtomicFuture` from a !Send future.
262 ///
263 /// # Safety
264 ///
265 /// The caller must uphold the Send requirements.
266 pub unsafe fn new_local<F: Future + 'a>(
267 scope: Option<ScopeHandle>,
268 id: usize,
269 future: F,
270 ) -> Self
271 where
272 F::Output: 'a,
273 {
274 Self(
275 NonNull::new_unchecked(Box::into_raw(Box::new(AtomicFuture {
276 meta: Meta {
277 vtable: &AtomicFuture::<F>::VTABLE,
278 // The future is inactive and we start with a single reference.
279 state: AtomicUsize::new(1 | INACTIVE),
280 scope,
281 id,
282 },
283 future: FutureOrResult { future: ManuallyDrop::new(future) },
284 })))
285 .cast::<Meta>(),
286 PhantomData,
287 )
288 }
289
290 fn meta(&self) -> &Meta {
291 // SAFETY: This is safe because we hold a reference count.
292 unsafe { self.0.as_ref() }
293 }
294
295 /// Returns the future's ID.
296 pub fn id(&self) -> usize {
297 self.meta().id
298 }
299
300 /// Returns the associated scope.
301 pub fn scope(&self) -> &ScopeHandle {
302 &self.meta().scope()
303 }
304
305 /// Attempt to poll the underlying future.
306 ///
307 /// `try_poll` ensures that the future is polled at least once more
308 /// unless it has already finished.
309 pub fn try_poll(&self, cx: &mut Context<'_>) -> AttemptPollResult {
310 let meta = self.meta();
311 loop {
312 // Attempt to acquire sole responsibility for polling the future (by clearing the
313 // INACTIVE bit) and also clear the READY bit at the same time so that we track if it
314 // becomes READY again whilst we are polling.
315 let old = meta.state.fetch_and(!(INACTIVE | READY), Acquire);
316 assert_ne!(old & REF_COUNT_MASK, 0);
317 if old & DONE != 0 {
318 // Someone else completed this future already
319 return AttemptPollResult::SomeoneElseFinished;
320 }
321 if old & INACTIVE != 0 {
322 // We are now the (only) active worker, proceed to poll...
323 if old & CANCELLED != 0 {
324 // The future was cancelled.
325 // SAFETY: We have exclusive access.
326 unsafe {
327 self.drop_future_unchecked();
328 }
329 return AttemptPollResult::Cancelled;
330 }
331 break;
332 }
333 // Future was already active; this shouldn't really happen because we shouldn't be
334 // polling it from multiple threads at the same time. Still, we handle it by setting
335 // the READY bit so that it gets polled again. We do this regardless of whether we
336 // cleared the READY bit above.
337 let old = meta.state.fetch_or(READY, Relaxed);
338 // If the future is still active, or the future was already marked as ready, we can
339 // just return and it will get polled again.
340 if old & INACTIVE == 0 || old & READY != 0 {
341 return AttemptPollResult::Pending;
342 }
343 // The worker finished, and we marked the future as ready, so we must try again because
344 // the future won't be in a run queue.
345 }
346
347 // We cannot recover from panics.
348 let bomb = Bomb;
349
350 // SAFETY: We have exclusive access because we cleared the INACTIVE state bit.
351 let result = unsafe { (meta.vtable.poll)(meta.into(), cx) };
352
353 std::mem::forget(bomb);
354
355 if let Poll::Ready(()) = result {
356 // The future will have been dropped, so we just need to set the state.
357 //
358 // This needs to be Release ordering because we need to synchronize with another thread
359 // that takes or drops the result.
360 let old = meta.state.fetch_or(DONE, Release);
361
362 if old & DETACHED != 0 {
363 // If the future is detached, we should eagerly drop the result. This can be Relaxed
364 // ordering because the result was written by this thread.
365
366 // SAFETY: The future has completed.
367 unsafe {
368 meta.drop_result(Relaxed);
369 }
370 }
371 // No one else will read `future` unless they see `INACTIVE`, which will never
372 // happen again.
373 AttemptPollResult::IFinished
374 } else if meta.state.fetch_or(INACTIVE, Release) & READY == 0 {
375 AttemptPollResult::Pending
376 } else {
377 // The future was marked ready whilst we were polling, so yield.
378 AttemptPollResult::Yield
379 }
380 }
381
382 /// Drops the future without checking its current state.
383 ///
384 /// # Safety
385 ///
386 /// This doesn't check the current state, so this must only be called if it is known that there
387 /// is no concurrent access. This also does *not* include any memory barriers before dropping
388 /// the future.
389 pub unsafe fn drop_future_unchecked(&self) {
390 // Set the state first in case we panic when we drop.
391 let meta = self.meta();
392 assert!(meta.state.fetch_or(DONE | RESULT_TAKEN, Relaxed) & DONE == 0);
393 (meta.vtable.drop_future)(meta.into());
394 }
395
396 /// Drops the future if it is not currently being polled. Returns success if the future was
397 /// dropped or was already dropped.
398 pub fn try_drop(&self) -> Result<(), ()> {
399 let old = self.meta().state.fetch_and(!INACTIVE, Acquire);
400 if old & DONE != 0 {
401 Ok(())
402 } else if old & INACTIVE != 0 {
403 // SAFETY: We have exclusive access.
404 unsafe {
405 self.drop_future_unchecked();
406 }
407 Ok(())
408 } else {
409 Err(())
410 }
411 }
412
413 /// Cancels the task. Returns true if the task needs to be added to a run queue.
414 #[must_use]
415 pub fn cancel(&self) -> bool {
416 self.meta().state.fetch_or(CANCELLED | READY, Relaxed) & (INACTIVE | READY | DONE)
417 == INACTIVE
418 }
419
420 /// Marks the task as detached.
421 pub fn detach(&self) {
422 let meta = self.meta();
423 let old = meta.state.fetch_or(DETACHED, Relaxed);
424
425 if old & (DONE | RESULT_TAKEN) == DONE {
426 // If the future is done, we should eagerly drop the result. This needs to be acquire
427 // ordering because another thread might have written the result.
428
429 // SAFETY: The future has completed.
430 unsafe {
431 meta.drop_result(Acquire);
432 }
433 }
434 }
435
436 /// Marks the task as cancelled and detached (for when the caller isn't interested in waiting
437 /// for the cancellation to be finished). Returns true if the task should be added to a run
438 /// queue.
439 pub fn cancel_and_detach(&self) -> CancelAndDetachResult {
440 let meta = self.meta();
441 let old_state = meta.state.fetch_or(CANCELLED | DETACHED | READY, Relaxed);
442 if old_state & DONE != 0 {
443 // If the future is done, we should eagerly drop the result. This needs to be acquire
444 // ordering because another thread might have written the result.
445
446 // SAFETY: The future has completed.
447 unsafe {
448 meta.drop_result(Acquire);
449 }
450
451 CancelAndDetachResult::Done
452 } else if old_state & (INACTIVE | READY) == INACTIVE {
453 CancelAndDetachResult::AddToRunQueue
454 } else {
455 CancelAndDetachResult::Pending
456 }
457 }
458
459 /// Returns true if the task is detached.
460 pub fn is_detached(&self) -> bool {
461 self.meta().state.load(Relaxed) & DETACHED != 0
462 }
463
464 /// Takes the result.
465 ///
466 /// # Safety
467 ///
468 /// The caller must guarantee that `R` is the correct type.
469 pub unsafe fn take_result<R>(&self) -> Option<R> {
470 // This needs to be Acquire ordering to synchronize with the polling thread.
471 let meta = self.meta();
472 if meta.state.load(Relaxed) & (DONE | RESULT_TAKEN) == DONE
473 && meta.state.fetch_or(RESULT_TAKEN, Acquire) & RESULT_TAKEN == 0
474 {
475 Some(((meta.vtable.get_result)(meta.into()) as *const R).read())
476 } else {
477 None
478 }
479 }
480}
481
482impl AtomicFutureHandle<'static> {
483 /// Returns a waker for the future.
484 pub fn waker(&self) -> BorrowedWaker<'_> {
485 static BORROWED_WAKER_VTABLE: RawWakerVTable =
486 RawWakerVTable::new(waker_clone, waker_wake_by_ref, waker_wake_by_ref, waker_noop);
487 static WAKER_VTABLE: RawWakerVTable =
488 RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);
489
490 fn waker_clone(raw_meta: *const ()) -> RawWaker {
491 // SAFETY: We did the reverse cast below.
492 let meta = unsafe { &*(raw_meta as *const Meta) };
493 meta.retain();
494 RawWaker::new(raw_meta, &WAKER_VTABLE)
495 }
496
497 fn waker_wake(raw_meta: *const ()) {
498 // SAFETY: We did the reverse cast below.
499 let meta = unsafe { &*(raw_meta as *const Meta) };
500 if meta.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
501 // This consumes the reference count.
502 meta.scope().executor().task_is_ready(AtomicFutureHandle(
503 // SAFETY: We know raw_meta is not null.
504 unsafe { NonNull::new_unchecked(raw_meta as *mut Meta) },
505 PhantomData,
506 ));
507 } else {
508 meta.release();
509 }
510 }
511
512 fn waker_wake_by_ref(meta: *const ()) {
513 // SAFETY: We did the reverse cast below.
514 let meta = unsafe { &*(meta as *const Meta) };
515 // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
516 unsafe {
517 meta.wake();
518 }
519 }
520
521 fn waker_noop(_meta: *const ()) {}
522
523 fn waker_drop(meta: *const ()) {
524 // SAFETY: We did the reverse cast below.
525 let meta = unsafe { &*(meta as *const Meta) };
526 meta.release();
527 }
528
529 BorrowedWaker(
530 // SAFETY: We meet the contract for RawWaker/RawWakerVtable.
531 unsafe {
532 Waker::from_raw(RawWaker::new(self.0.as_ptr() as *const (), &BORROWED_WAKER_VTABLE))
533 },
534 PhantomData,
535 )
536 }
537
538 /// Wakes the future.
539 pub fn wake(&self) {
540 // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
541 unsafe {
542 self.meta().wake();
543 }
544 }
545}
546
547impl<F: Future> Drop for AtomicFuture<F> {
548 fn drop(&mut self) {
549 let meta = &mut self.meta;
550 // This needs to be acquire ordering so that we see writes that might have just happened
551 // in another thread when the future was polled.
552 let state = meta.state.load(Acquire);
553 if state & DONE == 0 {
554 // SAFETY: The state isn't DONE so we must drop the future.
555 unsafe {
556 (meta.vtable.drop_future)(meta.into());
557 }
558 } else if state & RESULT_TAKEN == 0 {
559 // SAFETY: The result hasn't been taken so we must drop the result.
560 unsafe {
561 (meta.vtable.drop_result)(meta.into());
562 }
563 }
564 }
565}
566
/// A [`Waker`] whose use is limited to the lifetime `'a`. It dereferences to the wrapped
/// [`Waker`].
pub struct BorrowedWaker<'a>(Waker, PhantomData<&'a ()>);

impl Deref for BorrowedWaker<'_> {
    type Target = Waker;

    fn deref(&self) -> &Waker {
        let Self(waker, _) = self;
        waker
    }
}
576
577impl Borrow<usize> for AtomicFutureHandle<'static> {
578 fn borrow(&self) -> &usize {
579 &self.meta().id
580 }
581}
582
583impl Hash for AtomicFutureHandle<'static> {
584 fn hash<H: Hasher>(&self, state: &mut H) {
585 self.meta().id.hash(state);
586 }
587}
588
589impl PartialEq for AtomicFutureHandle<'static> {
590 fn eq(&self, other: &Self) -> bool {
591 self.meta().id == other.meta().id
592 }
593}
594
595impl Eq for AtomicFutureHandle<'static> {}
596
/// Aborts the process when dropped.
///
/// To disarm it, pass it to `std::mem::forget` so that its destructor never runs.
struct Bomb;

impl Drop for Bomb {
    fn drop(&mut self) {
        std::process::abort()
    }
}