fuchsia_async/runtime/fuchsia/executor/atomic_future.rs
// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

pub mod hooks;
pub mod spawnable_future;

use crate::ScopeHandle;
use futures::ready;
use std::borrow::Borrow;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::ops::Deref;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// A lock-free thread-safe future.
//
// The debugger knows the layout so that async backtraces work, so if this changes the debugger
// might need to be changed too.
//
// This is `repr(C)` so that we can cast between `NonNull<Meta>` and `NonNull<AtomicFuture<F>>`.
//
// LINT.IfChange
#[repr(C)]
struct AtomicFuture<F: Future> {
    meta: Meta,

    // `future` is safe to access after successfully clearing the INACTIVE state bit, provided the
    // `DONE` state bit isn't set.
    future: FutureOrResult<F>,
}
// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)

/// A handle to a lock-free thread-safe future. Handles can be cloned.
pub struct AtomicFutureHandle<'a>(NonNull<Meta>, PhantomData<&'a ()>);

/// `AtomicFutureHandle` is safe to access from multiple threads at once.
unsafe impl Sync for AtomicFutureHandle<'_> {}
unsafe impl Send for AtomicFutureHandle<'_> {}

impl Drop for AtomicFutureHandle<'_> {
    fn drop(&mut self) {
        self.meta().release();
    }
}

impl Clone for AtomicFutureHandle<'_> {
    fn clone(&self) -> Self {
        self.meta().retain();
        Self(self.0, PhantomData)
    }
}

struct Meta {
    vtable: &'static VTable,

    // Holds the reference count and state bits (INACTIVE, READY, etc.).
    state: AtomicUsize,

    scope: Option<ScopeHandle>,
    id: usize,
}

impl Meta {
    // # Safety
    //
    // This mints a handle with the 'static lifetime, so this should only be called from
    // `AtomicFutureHandle<'static>`.
    unsafe fn wake(&self) {
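        // Only the caller that transitions the task from "inactive, not READY, not DONE" to READY
        // schedules it here; the reference taken by `retain` is transferred to the run queue via
        // the handle passed to `task_is_ready`.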
        if self.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
            self.retain();
            self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
        }
    }

    // Returns true if a guard should be acquired.
    //
    // # Safety
    //
    // This mints a handle with the 'static lifetime, so this should only be called from
    // `AtomicFutureHandle<'static>`.
    unsafe fn wake_with_active_guard(&self) -> bool {
        let old = self.state.fetch_or(READY | WITH_ACTIVE_GUARD, Relaxed);
        if old & (INACTIVE | READY | DONE) == INACTIVE {
            self.retain();
            self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
        }

        // If the task is DONE, the guard won't be released, so we must let the caller know.
        old & (DONE | WITH_ACTIVE_GUARD) == 0
    }

    fn scope(&self) -> &ScopeHandle {
        self.scope.as_ref().unwrap()
    }

    fn retain(&self) {
        let old = self.state.fetch_add(1, Relaxed) & REF_COUNT_MASK;
        assert!(old != REF_COUNT_MASK);
    }

    fn release(&self) {
        // This can be Relaxed because there is a barrier in the drop function.
        let old = self.state.fetch_sub(1, Relaxed) & REF_COUNT_MASK;
        if old == 1 {
            // SAFETY: This is safe because we just released the last reference.
            unsafe {
                (self.vtable.drop)(self.into());
            }
        } else {
            // Check for underflow.
            assert!(old > 0);
        }
    }

    // # Safety
    //
    // The caller must know that the future has completed.
    unsafe fn drop_result(&self, ordering: Ordering) {
        // It's possible for this to race with another thread so we only drop the result if we are
        // successful in setting the RESULT_TAKEN bit.
        if self.state.fetch_or(RESULT_TAKEN, ordering) & RESULT_TAKEN == 0 {
            (self.vtable.drop_result)(self.into());
        }
    }
}

struct VTable {
    /// Drops the atomic future.
    ///
    /// # Safety
    ///
    /// The caller must ensure there are no other references, i.e. the reference count should be
    /// zero.
    // zxdb uses this method to figure out the concrete type of the future.
    // LINT.IfChange
    drop: unsafe fn(NonNull<Meta>),
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
    /// Drops the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future hasn't been dropped.
    drop_future: unsafe fn(NonNull<Meta>),
    /// Polls the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future hasn't been dropped and has exclusive access.
    poll: unsafe fn(NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()>,

    /// Gets the result.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future is finished and the result hasn't been taken or dropped.
    get_result: unsafe fn(NonNull<Meta>) -> *const (),

    /// Drops the result.
    ///
    /// # Safety
    ///
    /// The caller must ensure the future is finished and the result hasn't already been taken or
    /// dropped.
    drop_result: unsafe fn(NonNull<Meta>),
}

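// Which variant is live follows the state bits: while DONE is clear this holds `future`; when the
// future completes, `poll` replaces it with `result`, which stays live until RESULT_TAKEN is set.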
union FutureOrResult<F: Future> {
    future: ManuallyDrop<F>,
    result: ManuallyDrop<F::Output>,
}

impl<F: Future> AtomicFuture<F> {
    const VTABLE: VTable = VTable {
        drop: Self::drop,
        drop_future: Self::drop_future,
        poll: Self::poll,
        get_result: Self::get_result,
        drop_result: Self::drop_result,
    };

    unsafe fn drop(meta: NonNull<Meta>) {
        drop(Box::from_raw(meta.cast::<Self>().as_mut()));
    }

    unsafe fn poll(meta: NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()> {
        let future = &mut meta.cast::<Self>().as_mut().future;
        let result = ready!(Pin::new_unchecked(&mut *future.future).poll(cx));
        // This might panic, which would leave us in a bad state. We deal with this by
        // aborting (see below).
        ManuallyDrop::drop(&mut future.future);
        future.result = ManuallyDrop::new(result);
        Poll::Ready(())
    }

    unsafe fn drop_future(meta: NonNull<Meta>) {
        ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.future);
    }

    unsafe fn get_result(meta: NonNull<Meta>) -> *const () {
        &*meta.cast::<Self>().as_mut().future.result as *const F::Output as *const ()
    }

    unsafe fn drop_result(meta: NonNull<Meta>) {
        ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.result);
    }
}

/// State Bits
//
// Exclusive access is gained by clearing this bit.
const INACTIVE: usize = 1 << 63;

// Set to indicate the future needs to be polled again.
const READY: usize = 1 << 62;

// Terminal state: the future is dropped upon entry to this state. When in this state, other bits
// can be set, including READY (which has no meaning).
const DONE: usize = 1 << 61;

// The task has been detached.
const DETACHED: usize = 1 << 60;

// The task has been aborted.
const ABORTED: usize = 1 << 59;

// The task has an active guard that should be dropped when the task is next polled.
const WITH_ACTIVE_GUARD: usize = 1 << 58;

// The result has been taken.
const RESULT_TAKEN: usize = 1 << 57;

// The mask for the ref count.
const REF_COUNT_MASK: usize = RESULT_TAKEN - 1;

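// Putting the constants above together, the packed `state` word looks like this (the flags occupy
// bits 57..=63, so they can never collide with the reference count in bits 0..=56):
//
//   bit 63: INACTIVE          bit 60: DETACHED             bit 57: RESULT_TAKEN
//   bit 62: READY             bit 59: ABORTED              bits 0..=56: reference count
//   bit 61: DONE              bit 58: WITH_ACTIVE_GUARD
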
/// The result of a call to `try_poll`.
/// This indicates the result of attempting to `poll` the future.
pub enum AttemptPollResult {
    /// The future was polled, but did not complete.
    Pending,
    /// The future was polled and finished by this thread.
    /// This result is normally used to trigger garbage-collection of the future.
    IFinished,
    /// The future was already completed by another thread.
    SomeoneElseFinished,
    /// The future was polled and did not complete, but it was woken whilst being polled, so it
    /// should be polled again.
    Yield,
    /// The future was aborted.
    Aborted,
}

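// A minimal sketch (not the real executor loop; `requeue` is a stand-in, not a real API) of how a
// caller might react to each variant:
//
//     match handle.try_poll(&mut cx) {
//         // Woken whilst it was being polled, so it must be polled again.
//         AttemptPollResult::Yield => requeue(handle),
//         // This thread finished the future; the result (if any) can now be taken via
//         // `take_result`.
//         AttemptPollResult::IFinished => {}
//         // Nothing more for this thread to do.
//         AttemptPollResult::Pending
//         | AttemptPollResult::SomeoneElseFinished
//         | AttemptPollResult::Aborted => {}
//     }
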
/// The result of calling the `abort_and_detach` function.
#[must_use]
pub enum AbortAndDetachResult {
    /// The future has finished; it can be dropped.
    Done,

    /// The future needs to be added to a run queue to be aborted.
    AddToRunQueue,

    /// The future is soon to be aborted and nothing needs to be done.
    Pending,
}

impl<'a> AtomicFutureHandle<'a> {
    /// Create a new `AtomicFuture`.
    pub(crate) fn new<F: Future + Send + 'a>(
        scope: Option<ScopeHandle>,
        id: usize,
        future: F,
    ) -> Self
    where
        F::Output: Send + 'a,
    {
        // SAFETY: This is safe because the future and output are both Send.
        unsafe { Self::new_local(scope, id, future) }
    }

    /// Create a new `AtomicFuture` from a !Send future.
    ///
    /// # Safety
    ///
    /// The caller must uphold the Send requirements.
    pub(crate) unsafe fn new_local<F: Future + 'a>(
        scope: Option<ScopeHandle>,
        id: usize,
        future: F,
    ) -> Self
    where
        F::Output: 'a,
    {
        Self(
            NonNull::new_unchecked(Box::into_raw(Box::new(AtomicFuture {
                meta: Meta {
                    vtable: &AtomicFuture::<F>::VTABLE,
                    // The future is inactive and we start with a single reference.
                    state: AtomicUsize::new(1 | INACTIVE),
                    scope,
                    id,
                },
                future: FutureOrResult { future: ManuallyDrop::new(future) },
            })))
            .cast::<Meta>(),
            PhantomData,
        )
    }

    fn meta(&self) -> &Meta {
        // SAFETY: This is safe because we hold a reference count.
        unsafe { self.0.as_ref() }
    }

    /// Returns the future's ID.
    pub fn id(&self) -> usize {
        self.meta().id
    }

    /// Returns the associated scope.
    pub fn scope(&self) -> &ScopeHandle {
        self.meta().scope()
    }

    /// Attempt to poll the underlying future.
    ///
    /// `try_poll` ensures that the future is polled at least once more
    /// unless it has already finished.
    pub(crate) fn try_poll(&self, cx: &mut Context<'_>) -> AttemptPollResult {
        let meta = self.meta();
        let has_active_guard = loop {
            // Attempt to acquire sole responsibility for polling the future (by clearing the
            // INACTIVE bit) and also clear the READY and WITH_ACTIVE_GUARD bits at the same time.
            // We clear both so that we can track if they are set again whilst we are polling.
            let old = meta.state.fetch_and(!(INACTIVE | READY | WITH_ACTIVE_GUARD), Acquire);
            assert_ne!(old & REF_COUNT_MASK, 0);
            if old & DONE != 0 {
                // If the DONE bit is set, the WITH_ACTIVE_GUARD bit should be ignored; it may or
                // may not be set, but it doesn't reflect whether an active guard is held so even
                // though we just cleared it, we shouldn't release a guard here.
                return AttemptPollResult::SomeoneElseFinished;
            }
            let has_active_guard = old & WITH_ACTIVE_GUARD != 0;
            if old & INACTIVE != 0 {
                // We are now the (only) active worker, proceed to poll...
                if old & ABORTED != 0 {
                    if has_active_guard {
                        meta.scope().release_cancel_guard();
                    }
                    // The future was aborted.
                    // SAFETY: We have exclusive access.
                    unsafe {
                        self.drop_future_unchecked();
                    }
                    return AttemptPollResult::Aborted;
                }
                break has_active_guard;
            }
            // Future was already active; this shouldn't really happen because we shouldn't be
            // polling it from multiple threads at the same time. Still, we handle it by setting
            // the READY bit so that it gets polled again. We do this regardless of whether we
            // cleared the READY bit above.
            let old2 = meta.state.fetch_or(READY | (old & WITH_ACTIVE_GUARD), Relaxed);

            if old2 & DONE != 0 {
                // If `has_active_guard` is true, we are responsible for releasing a guard since it
                // means we cleared the `WITH_ACTIVE_GUARD` bit.
                if has_active_guard {
                    meta.scope().release_cancel_guard();
                }
                return AttemptPollResult::SomeoneElseFinished;
            }

            if has_active_guard && old2 & WITH_ACTIVE_GUARD != 0 {
                // Within the small window, something else gave this task an active guard, so we
                // must return one of them.
                meta.scope().release_cancel_guard();
            }

            // If the future is still active, or the future was already marked as ready, we can
            // just return and it will get polled again.
            if old2 & INACTIVE == 0 || old2 & READY != 0 {
                return AttemptPollResult::Pending;
            }
            // The worker finished, and we marked the future as ready, so we must try again because
            // the future won't be in a run queue.
        };

        // We cannot recover from panics.
        let bomb = Bomb;

        // SAFETY: We have exclusive access because we cleared the INACTIVE state bit.
        let result = unsafe { (meta.vtable.poll)(meta.into(), cx) };

        std::mem::forget(bomb);

        if has_active_guard {
            meta.scope().release_cancel_guard();
        }

        if let Poll::Ready(()) = result {
            // The future will have been dropped, so we just need to set the state.
            //
            // This needs to be Release ordering because we need to synchronize with another thread
            // that takes or drops the result.
            let old = meta.state.fetch_or(DONE, Release);

            if old & WITH_ACTIVE_GUARD != 0 {
                // Whilst we were polling the task, it was given an active guard. We must return it
                // now.
                meta.scope().release_cancel_guard();
            }

            if old & DETACHED != 0 {
                // If the future is detached, we should eagerly drop the result. This can be
                // Relaxed ordering because the result was written by this thread.

                // SAFETY: The future has completed.
                unsafe {
                    meta.drop_result(Relaxed);
                }
            }
            // No one else will read `future` unless they see `INACTIVE`, which will never
            // happen again.
            AttemptPollResult::IFinished
        } else if meta.state.fetch_or(INACTIVE, Release) & READY == 0 {
            AttemptPollResult::Pending
        } else {
            // The future was marked ready whilst we were polling, so yield.
            AttemptPollResult::Yield
        }
    }

    /// Drops the future without checking its current state.
    ///
    /// # Panics
    ///
    /// This will panic if the future is already marked with `DONE`.
    ///
    /// # Safety
    ///
    /// This doesn't check the current state, so this must only be called if it is known that there
    /// is no concurrent access. This also does *not* include any memory barriers before dropping
    /// the future.
    pub(crate) unsafe fn drop_future_unchecked(&self) {
        // Set the state first in case we panic when we drop.
        let meta = self.meta();
        let old = meta.state.fetch_or(DONE | RESULT_TAKEN, Relaxed);
        assert_eq!(old & DONE, 0);
        if old & WITH_ACTIVE_GUARD != 0 {
            meta.scope().release_cancel_guard();
        }
        (meta.vtable.drop_future)(meta.into());
    }

    /// Drops the future if it is not currently being polled. Returns `Ok` if the future was
    /// dropped by this call or had already been dropped.
    pub(crate) fn try_drop(&self) -> Result<(), ()> {
        let old = self.meta().state.fetch_and(!INACTIVE, Acquire);
        if old & DONE != 0 {
            Ok(())
        } else if old & INACTIVE != 0 {
            // SAFETY: We have exclusive access.
            unsafe {
                self.drop_future_unchecked();
            }
            Ok(())
        } else {
            Err(())
        }
    }

    /// Aborts the task. Returns true if the task needs to be added to a run queue.
    #[must_use]
    pub(crate) fn abort(&self) -> bool {
        self.meta().state.fetch_or(ABORTED | READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE
    }

    /// Marks the task as detached.
    pub(crate) fn detach(&self) {
        let meta = self.meta();
        let old = meta.state.fetch_or(DETACHED, Relaxed);

        if old & (DONE | RESULT_TAKEN) == DONE {
            // If the future is done, we should eagerly drop the result. This needs to be acquire
            // ordering because another thread might have written the result.

            // SAFETY: The future has completed.
            unsafe {
                meta.drop_result(Acquire);
            }
        }
    }

    /// Marks the task as aborted and detached (for when the caller isn't interested in waiting
    /// for the cancellation to be finished). Returns whether the future has finished, needs to be
    /// added to a run queue, or is about to be aborted anyway.
    pub(crate) fn abort_and_detach(&self) -> AbortAndDetachResult {
        let meta = self.meta();
        let old_state = meta.state.fetch_or(ABORTED | DETACHED | READY, Relaxed);
        if old_state & DONE != 0 {
            // If the future is done, we should eagerly drop the result. This needs to be acquire
            // ordering because another thread might have written the result.

            // SAFETY: The future has completed.
            unsafe {
                meta.drop_result(Acquire);
            }

            AbortAndDetachResult::Done
        } else if old_state & (INACTIVE | READY) == INACTIVE {
            AbortAndDetachResult::AddToRunQueue
        } else {
            AbortAndDetachResult::Pending
        }
    }

    /// Returns true if the task is detached.
    pub(crate) fn is_detached(&self) -> bool {
        self.meta().state.load(Relaxed) & DETACHED != 0
    }

    /// Takes the result.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `R` is the correct type.
    pub(crate) unsafe fn take_result<R>(&self) -> Option<R> {
        // This needs to be Acquire ordering to synchronize with the polling thread.
        let meta = self.meta();
        if meta.state.load(Relaxed) & (DONE | RESULT_TAKEN) == DONE
            && meta.state.fetch_or(RESULT_TAKEN, Acquire) & RESULT_TAKEN == 0
        {
            Some(((meta.vtable.get_result)(meta.into()) as *const R).read())
        } else {
            None
        }
    }
}

impl AtomicFutureHandle<'static> {
    /// Returns a waker for the future.
    pub(crate) fn waker(&self) -> BorrowedWaker<'_> {
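        // Two vtables are in play: the borrowed one backs the waker returned below, which borrows
        // this handle's reference (waking by ref takes no reference and dropping it is a no-op),
        // while cloning it yields a waker on the owned vtable, which holds its own reference
        // (released by `waker_drop`, or consumed by `waker_wake` when it enqueues the task).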
        static BORROWED_WAKER_VTABLE: RawWakerVTable =
            RawWakerVTable::new(waker_clone, waker_wake_by_ref, waker_wake_by_ref, waker_noop);
        static WAKER_VTABLE: RawWakerVTable =
            RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);

        fn waker_clone(raw_meta: *const ()) -> RawWaker {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(raw_meta as *const Meta) };
            meta.retain();
            RawWaker::new(raw_meta, &WAKER_VTABLE)
        }

        fn waker_wake(raw_meta: *const ()) {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(raw_meta as *const Meta) };
            if meta.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
                // This consumes the reference count.
                meta.scope().executor().task_is_ready(AtomicFutureHandle(
                    // SAFETY: We know raw_meta is not null.
                    unsafe { NonNull::new_unchecked(raw_meta as *mut Meta) },
                    PhantomData,
                ));
            } else {
                meta.release();
            }
        }

        fn waker_wake_by_ref(meta: *const ()) {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(meta as *const Meta) };
            // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
            unsafe {
                meta.wake();
            }
        }

        fn waker_noop(_meta: *const ()) {}

        fn waker_drop(meta: *const ()) {
            // SAFETY: We did the reverse cast below.
            let meta = unsafe { &*(meta as *const Meta) };
            meta.release();
        }

        BorrowedWaker(
            // SAFETY: We meet the contract for RawWaker/RawWakerVtable.
            unsafe {
                Waker::from_raw(RawWaker::new(self.0.as_ptr() as *const (), &BORROWED_WAKER_VTABLE))
            },
            PhantomData,
        )
    }

    /// Wakes the future.
    pub(crate) fn wake(&self) {
        // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
        unsafe {
            self.meta().wake();
        }
    }

    /// Wakes the future with an active guard. Returns true if successful, i.e. a guard needs to be
    /// acquired.
    ///
    /// NOTE: `Scope::release_cancel_guard` can be called *before* this function returns because the
    /// task can be polled on another thread. For this reason, the caller either needs to hold a
    /// lock, or it should preemptively take the guard.
    pub(crate) fn wake_with_active_guard(&self) -> bool {
        // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
        unsafe { self.meta().wake_with_active_guard() }
    }
}

impl<F: Future> Drop for AtomicFuture<F> {
    fn drop(&mut self) {
        let meta = &mut self.meta;
        // This needs to be acquire ordering so that we see writes that might have just happened
        // in another thread when the future was polled.
        let state = meta.state.load(Acquire);
        if state & DONE == 0 {
            // SAFETY: The state isn't DONE so we must drop the future.
            unsafe {
                (meta.vtable.drop_future)(meta.into());
            }
        } else if state & RESULT_TAKEN == 0 {
            // SAFETY: The result hasn't been taken so we must drop the result.
            unsafe {
                (meta.vtable.drop_result)(meta.into());
            }
        }
    }
}

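// A waker whose lifetime is tied to the `AtomicFutureHandle` it was borrowed from. It holds no
// reference count of its own (see the borrowed vtable in `waker` above); the lifetime parameter is
// what stops it from outliving the handle.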
pub struct BorrowedWaker<'a>(std::task::Waker, PhantomData<&'a ()>);

impl Deref for BorrowedWaker<'_> {
    type Target = Waker;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl Borrow<usize> for AtomicFutureHandle<'static> {
    fn borrow(&self) -> &usize {
        &self.meta().id
    }
}

impl Hash for AtomicFutureHandle<'static> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.meta().id.hash(state);
    }
}

impl PartialEq for AtomicFutureHandle<'static> {
    fn eq(&self, other: &Self) -> bool {
        self.meta().id == other.meta().id
    }
}

impl Eq for AtomicFutureHandle<'static> {}

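// Aborts the process if dropped. `try_poll` arms one of these before polling and defuses it with
// `mem::forget` afterwards, so a panic that unwinds out of the poll (which includes dropping the
// completed future and storing its result) becomes an abort instead of leaving the state
// inconsistent.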
struct Bomb;
impl Drop for Bomb {
    fn drop(&mut self) {
        std::process::abort();
    }
}