// fuchsia_async/runtime/fuchsia/executor/atomic_future.rs

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5pub mod hooks;
6pub mod spawnable_future;
7
8use crate::ScopeHandle;
9use futures::ready;
10use std::borrow::Borrow;
11use std::future::Future;
12use std::hash::{Hash, Hasher};
13use std::marker::PhantomData;
14use std::mem::ManuallyDrop;
15use std::ops::Deref;
16use std::pin::Pin;
17use std::ptr::NonNull;
18use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
21
22/// A lock-free thread-safe future.
23//
24// The debugger knows the layout so that async backtraces work, so if this changes the debugger
25// might need to be changed too.
26//
27// This is `repr(C)` so that we can cast between `NonNull<Meta>` and `NonNull<AtomicFuture<F>>`.
28//
29// LINT.IfChange
30#[repr(C)]
31struct AtomicFuture<F: Future> {
32    meta: Meta,
33
34    // `future` is safe to access after successfully clearing the INACTIVE state bit and the `DONE`
35    // state bit isn't set.
36    future: FutureOrResult<F>,
37}
38// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
39
40/// A lock-free thread-safe future. The handles can be cloned.
41pub struct AtomicFutureHandle<'a>(NonNull<Meta>, PhantomData<&'a ()>);
42
43/// `AtomicFutureHandle` is safe to access from multiple threads at once.
44unsafe impl Sync for AtomicFutureHandle<'_> {}
45unsafe impl Send for AtomicFutureHandle<'_> {}
46
47impl Drop for AtomicFutureHandle<'_> {
48    fn drop(&mut self) {
49        self.meta().release();
50    }
51}
52
53impl Clone for AtomicFutureHandle<'_> {
54    fn clone(&self) -> Self {
55        self.meta().retain();
56        Self(self.0, PhantomData)
57    }
58}
59
60struct Meta {
61    vtable: &'static VTable,
62
63    // Holds the reference count and state bits (INACTIVE, READY, etc.).
64    state: AtomicUsize,
65
66    scope: Option<ScopeHandle>,
67    id: usize,
68}
69
70impl Meta {
71    // # Safety
72    //
73    // This mints a handle with the 'static lifetime, so this should only be called from
74    // `AtomicFutureHandle<'static>`.
75    unsafe fn wake(&self) {
76        if self.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
77            self.retain();
78            self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
79        }
80    }
81
82    // Returns true if a guard should be acquired.
83    //
84    // # Safety
85    //
86    // This mints a handle with the 'static lifetime, so this should only be called from
87    // `AtomicFutureHandle<'static>`.
88    unsafe fn wake_with_active_guard(&self) -> bool {
89        let old = self.state.fetch_or(READY | WITH_ACTIVE_GUARD, Relaxed);
90        if old & (INACTIVE | READY | DONE) == INACTIVE {
91            self.retain();
92            self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
93        }
94
95        // If the task is DONE, the guard won't be released, so we must let the caller know.
96        old & (DONE | WITH_ACTIVE_GUARD) == 0
97    }
98
99    fn scope(&self) -> &ScopeHandle {
100        self.scope.as_ref().unwrap()
101    }
102
103    fn retain(&self) {
104        let old = self.state.fetch_add(1, Relaxed) & REF_COUNT_MASK;
105        assert!(old != REF_COUNT_MASK);
106    }
107
108    fn release(&self) {
109        // This can be Relaxed because there is a barrier in the drop function.
110        let old = self.state.fetch_sub(1, Relaxed) & REF_COUNT_MASK;
111        if old == 1 {
112            // SAFETY: This is safe because we just released the last reference.
113            unsafe {
114                (self.vtable.drop)(self.into());
115            }
116        } else {
117            // Check for underflow.
118            assert!(old > 0);
119        }
120    }
121
122    // # Safety
123    //
124    // The caller must know that the future has completed.
125    unsafe fn drop_result(&self, ordering: Ordering) {
126        // It's possible for this to race with another thread so we only drop the result if we are
127        // successful in setting the RESULT_TAKEN bit.
128        if self.state.fetch_or(RESULT_TAKEN, ordering) & RESULT_TAKEN == 0 {
129            unsafe { (self.vtable.drop_result)(self.into()) };
130        }
131    }
132}
133
134struct VTable {
135    /// Drops the atomic future.
136    ///
137    /// # Safety
138    ///
139    /// The caller must ensure there are no other references i.e. the reference count should be
140    /// zero.
141    // zxdb uses this method to figure out the concrete type of the future.
142    // LINT.IfChange
143    drop: unsafe fn(NonNull<Meta>),
144    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
145    /// Drops the future.
146    ///
147    /// # Safety
148    ///
149    /// The caller must ensure the future hasn't been dropped.
150    drop_future: unsafe fn(NonNull<Meta>),
151    /// Polls the future.
152    ///
153    /// # Safety
154    ///
155    /// The caller must ensure the future hasn't been dropped and has exclusive access.
156    poll: unsafe fn(NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()>,
157
158    /// Gets the result.
159    ///
160    /// # Safety
161    ///
162    /// The caller must ensure the future is finished and the result hasn't been taken or dropped.
163    get_result: unsafe fn(NonNull<Meta>) -> *const (),
164
165    /// Drops the result.
166    ///
167    /// # Safety
168    ///
169    /// The caller must ensure the future is finished and the result hasn't already been taken or
170    /// dropped.
171    drop_result: unsafe fn(NonNull<Meta>),
172}
173
/// Storage shared by the future and, once it has completed, its output.
///
/// Nothing in the union records which field is live; that is tracked by the state bits in
/// `Meta::state`. Both fields are `ManuallyDrop`, so whichever one is live has to be dropped
/// explicitly.
union FutureOrResult<F: Future> {
    future: ManuallyDrop<F>,
    result: ManuallyDrop<F::Output>,
}
178
179impl<F: Future> AtomicFuture<F> {
180    const VTABLE: VTable = VTable {
181        drop: Self::drop,
182        drop_future: Self::drop_future,
183        poll: Self::poll,
184        get_result: Self::get_result,
185        drop_result: Self::drop_result,
186    };
187
188    unsafe fn drop(meta: NonNull<Meta>) {
189        drop(unsafe { Box::from_raw(meta.cast::<Self>().as_mut()) });
190    }
191
192    unsafe fn poll(meta: NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()> {
193        let future = &mut unsafe { meta.cast::<Self>().as_mut() }.future;
194        let result = ready!(unsafe { Pin::new_unchecked(&mut *future.future) }.poll(cx));
195        // This might panic which will leave ourselves in a bad state. We deal with this by
196        // aborting (see below).
197        unsafe { ManuallyDrop::drop(&mut future.future) };
198        future.result = ManuallyDrop::new(result);
199        Poll::Ready(())
200    }
201
202    unsafe fn drop_future(meta: NonNull<Meta>) {
203        unsafe { ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.future) };
204    }
205
206    unsafe fn get_result(meta: NonNull<Meta>) -> *const () {
207        unsafe { &*meta.cast::<Self>().as_mut().future.result as *const F::Output as *const () }
208    }
209
210    unsafe fn drop_result(meta: NonNull<Meta>) {
211        unsafe { ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.result) };
212    }
213}
214
// State bits
//
// The flags below occupy the top bits of `Meta::state`; every bit below `RESULT_TAKEN` belongs to
// the reference count (see `REF_COUNT_MASK`).

// Exclusive access is gained by clearing this bit.
const INACTIVE: usize = 1 << 63;

// Set to indicate the future needs to be polled again.
const READY: usize = 1 << 62;

// Terminal state: the future is dropped upon entry to this state. When in this state, other bits
// can be set, including READY (which has no meaning).
const DONE: usize = 1 << 61;

// The task has been detached.
const DETACHED: usize = 1 << 60;

// The task has been cancelled.
const ABORTED: usize = 1 << 59;

// The task has an active guard that should be dropped when the task is next polled.
const WITH_ACTIVE_GUARD: usize = 1 << 58;

// The result has been taken.
const RESULT_TAKEN: usize = 1 << 57;

// The mask for the ref count.
const REF_COUNT_MASK: usize = RESULT_TAKEN - 1;
241
/// The result of a call to `try_poll`.
/// This indicates the result of attempting to `poll` the future.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AttemptPollResult {
    /// The future was polled, but did not complete.
    ///
    /// `try_poll` also returns this without polling when the future is already being polled
    /// elsewhere or was already marked as ready, since it will be polled again in either case.
    Pending,
    /// The future was polled and finished by this thread.
    /// This result is normally used to trigger garbage-collection of the future.
    IFinished,
    /// The future was already completed by another thread.
    ///
    /// More precisely, the future was already in its terminal (`DONE`) state, which is also
    /// entered when the future is dropped without completing.
    SomeoneElseFinished,
    /// The future was polled, did not complete, but it is woken whilst it is polled so it
    /// should be polled again.
    Yield,
    /// The future was aborted.
    ///
    /// The future has been dropped without being polled.
    Aborted,
}
258
/// The result of calling the `abort_and_detach` function.
#[must_use]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AbortAndDetachResult {
    /// The future has finished; it can be dropped.
    Done,

    /// The future needs to be added to a run queue to be aborted.
    ///
    /// Returned when the future was neither being polled nor already marked as ready.
    AddToRunQueue,

    /// The future is soon to be aborted and nothing needs to be done.
    ///
    /// Returned when the future was being polled or was already marked as ready.
    Pending,
}
271
272impl<'a> AtomicFutureHandle<'a> {
273    /// Create a new `AtomicFuture`.
274    pub(crate) fn new<F: Future + Send + 'a>(
275        scope: Option<ScopeHandle>,
276        id: usize,
277        future: F,
278    ) -> Self
279    where
280        F::Output: Send + 'a,
281    {
282        // SAFETY: This is safe because the future and output are both Send.
283        unsafe { Self::new_local(scope, id, future) }
284    }
285
286    /// Create a new `AtomicFuture` from a !Send future.
287    ///
288    /// # Safety
289    ///
290    /// The caller must uphold the Send requirements.
291    pub(crate) unsafe fn new_local<F: Future + 'a>(
292        scope: Option<ScopeHandle>,
293        id: usize,
294        future: F,
295    ) -> Self
296    where
297        F::Output: 'a,
298    {
299        Self(
300            unsafe {
301                NonNull::new_unchecked(Box::into_raw(Box::new(AtomicFuture {
302                    meta: Meta {
303                        vtable: &AtomicFuture::<F>::VTABLE,
304                        // The future is inactive and we start with a single reference.
305                        state: AtomicUsize::new(1 | INACTIVE),
306                        scope,
307                        id,
308                    },
309                    future: FutureOrResult { future: ManuallyDrop::new(future) },
310                })))
311            }
312            .cast::<Meta>(),
313            PhantomData,
314        )
315    }
316
317    fn meta(&self) -> &Meta {
318        // SAFETY: This is safe because we hold a reference count.
319        unsafe { self.0.as_ref() }
320    }
321
322    /// Returns the future's ID.
323    pub fn id(&self) -> usize {
324        self.meta().id
325    }
326
327    /// Returns the associated scope.
328    pub fn scope(&self) -> &ScopeHandle {
329        self.meta().scope()
330    }
331
332    /// Attempt to poll the underlying future.
333    ///
334    /// `try_poll` ensures that the future is polled at least once more
335    /// unless it has already finished.
336    pub(crate) fn try_poll(&self, cx: &mut Context<'_>) -> AttemptPollResult {
337        let meta = self.meta();
338        let has_active_guard = loop {
339            // Attempt to acquire sole responsibility for polling the future (by clearing the
340            // INACTIVE bit) and also clear the READY and WITH_ACTIVE_GUARD bits at the same time.
341            // We clear both so that we can track if they are set again whilst we are polling.
342            let old = meta.state.fetch_and(!(INACTIVE | READY | WITH_ACTIVE_GUARD), Acquire);
343            assert_ne!(old & REF_COUNT_MASK, 0);
344            if old & DONE != 0 {
345                // If the DONE bit is set, the WITH_ACTIVE_GUARD bit should be ignored; it may or
346                // may not be set, but it doesn't reflect whether an active guard is held so even
347                // though we just cleared it, we shouldn't release a guard here.
348                return AttemptPollResult::SomeoneElseFinished;
349            }
350            let has_active_guard = old & WITH_ACTIVE_GUARD != 0;
351            if old & INACTIVE != 0 {
352                // We are now the (only) active worker, proceed to poll...
353                if old & ABORTED != 0 {
354                    if has_active_guard {
355                        meta.scope().release_cancel_guard();
356                    }
357                    // The future was aborted.
358                    // SAFETY: We have exclusive access.
359                    unsafe {
360                        self.drop_future_unchecked();
361                    }
362                    return AttemptPollResult::Aborted;
363                }
364                break has_active_guard;
365            }
366            // Future was already active; this shouldn't really happen because we shouldn't be
367            // polling it from multiple threads at the same time. Still, we handle it by setting
368            // the READY bit so that it gets polled again. We do this regardless of whether we
369            // cleared the READY bit above.
370            let old2 = meta.state.fetch_or(READY | (old & WITH_ACTIVE_GUARD), Relaxed);
371
372            if old2 & DONE != 0 {
373                // If `has_active_guard` is true, we are responsible for releasing a guard since it
374                // means we cleared the `WITH_ACTIVE_GUARD` bit.
375                if has_active_guard {
376                    meta.scope().release_cancel_guard();
377                }
378                return AttemptPollResult::SomeoneElseFinished;
379            }
380
381            if has_active_guard && old2 & WITH_ACTIVE_GUARD != 0 {
382                // Within the small window, something else gave this task an active guard, so we
383                // must return one of them.
384                meta.scope().release_cancel_guard();
385            }
386
387            // If the future is still active, or the future was already marked as ready, we can
388            // just return and it will get polled again.
389            if old2 & INACTIVE == 0 || old2 & READY != 0 {
390                return AttemptPollResult::Pending;
391            }
392            // The worker finished, and we marked the future as ready, so we must try again because
393            // the future won't be in a run queue.
394        };
395
396        // We cannot recover from panics.
397        let bomb = Bomb;
398
399        // SAFETY: We have exclusive access because we cleared the INACTIVE state bit.
400        let result = unsafe { (meta.vtable.poll)(meta.into(), cx) };
401
402        std::mem::forget(bomb);
403
404        if has_active_guard {
405            meta.scope().release_cancel_guard();
406        }
407
408        if let Poll::Ready(()) = result {
409            // The future will have been dropped, so we just need to set the state.
410            //
411            // This needs to be Release ordering because we need to synchronize with another thread
412            // that takes or drops the result.
413            let old = meta.state.fetch_or(DONE, Release);
414
415            if old & WITH_ACTIVE_GUARD != 0 {
416                // Whilst we were polling the task, it was given an active guard. We must return it
417                // now.
418                meta.scope().release_cancel_guard();
419            }
420
421            if old & DETACHED != 0 {
422                // If the future is detached, we should eagerly drop the result. This can be
423                // Relaxed ordering because the result was written by this thread.
424
425                // SAFETY: The future has completed.
426                unsafe {
427                    meta.drop_result(Relaxed);
428                }
429            }
430            // No one else will read `future` unless they see `INACTIVE`, which will never
431            // happen again.
432            AttemptPollResult::IFinished
433        } else if meta.state.fetch_or(INACTIVE, Release) & READY == 0 {
434            AttemptPollResult::Pending
435        } else {
436            // The future was marked ready whilst we were polling, so yield.
437            AttemptPollResult::Yield
438        }
439    }
440
441    /// Drops the future without checking its current state.
442    ///
443    /// # Panics
444    ///
445    /// This will panic if the future is already marked with `DONE`.
446    ///
447    /// # Safety
448    ///
449    /// This doesn't check the current state, so this must only be called if it is known that there
450    /// is no concurrent access. This also does *not* include any memory barriers before dropping
451    /// the future.
452    pub(crate) unsafe fn drop_future_unchecked(&self) {
453        // Set the state first in case we panic when we drop.
454        let meta = self.meta();
455        let old = meta.state.fetch_or(DONE | RESULT_TAKEN, Relaxed);
456        assert_eq!(old & DONE, 0);
457        if old & WITH_ACTIVE_GUARD != 0 {
458            meta.scope().release_cancel_guard();
459        }
460        unsafe { (meta.vtable.drop_future)(meta.into()) };
461    }
462
463    /// Drops the future if it is not currently being polled. Returns success if the future was
464    /// dropped or was already dropped.
465    pub(crate) fn try_drop(&self) -> Result<(), ()> {
466        let old = self.meta().state.fetch_and(!INACTIVE, Acquire);
467        if old & DONE != 0 {
468            Ok(())
469        } else if old & INACTIVE != 0 {
470            // SAFETY: We have exclusive access.
471            unsafe {
472                self.drop_future_unchecked();
473            }
474            Ok(())
475        } else {
476            Err(())
477        }
478    }
479
480    /// Aborts the task. Returns true if the task needs to be added to a run queue.
481    #[must_use]
482    pub(crate) fn abort(&self) -> bool {
483        self.meta().state.fetch_or(ABORTED | READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE
484    }
485
486    /// Marks the task as detached.
487    pub(crate) fn detach(&self) {
488        let meta = self.meta();
489        let old = meta.state.fetch_or(DETACHED, Relaxed);
490
491        if old & (DONE | RESULT_TAKEN) == DONE {
492            // If the future is done, we should eagerly drop the result. This needs to be acquire
493            // ordering because another thread might have written the result.
494
495            // SAFETY: The future has completed.
496            unsafe {
497                meta.drop_result(Acquire);
498            }
499        }
500    }
501
502    /// Marks the task as aborted and detached (for when the caller isn't interested in waiting
503    /// for the cancellation to be finished). Returns true if the task should be added to a run
504    /// queue.
505    pub(crate) fn abort_and_detach(&self) -> AbortAndDetachResult {
506        let meta = self.meta();
507        let old_state = meta.state.fetch_or(ABORTED | DETACHED | READY, Relaxed);
508        if old_state & DONE != 0 {
509            // If the future is done, we should eagerly drop the result. This needs to be acquire
510            // ordering because another thread might have written the result.
511
512            // SAFETY: The future has completed.
513            unsafe {
514                meta.drop_result(Acquire);
515            }
516
517            AbortAndDetachResult::Done
518        } else if old_state & (INACTIVE | READY) == INACTIVE {
519            AbortAndDetachResult::AddToRunQueue
520        } else {
521            AbortAndDetachResult::Pending
522        }
523    }
524
525    /// Returns true if the task is detached.
526    pub(crate) fn is_detached(&self) -> bool {
527        self.meta().state.load(Relaxed) & DETACHED != 0
528    }
529
530    /// Takes the result.
531    ///
532    /// # Safety
533    ///
534    /// The caller must guarantee that `R` is the correct type.
535    pub(crate) unsafe fn take_result<R>(&self) -> Option<R> {
536        // This needs to be Acquire ordering to synchronize with the polling thread.
537        let meta = self.meta();
538        if meta.state.load(Relaxed) & (DONE | RESULT_TAKEN) == DONE
539            && meta.state.fetch_or(RESULT_TAKEN, Acquire) & RESULT_TAKEN == 0
540        {
541            Some(unsafe { ((meta.vtable.get_result)(meta.into()) as *const R).read() })
542        } else {
543            None
544        }
545    }
546}
547
548impl AtomicFutureHandle<'static> {
549    /// Returns a waker for the future.
550    pub(crate) fn waker(&self) -> BorrowedWaker<'_> {
551        static BORROWED_WAKER_VTABLE: RawWakerVTable =
552            RawWakerVTable::new(waker_clone, waker_wake_by_ref, waker_wake_by_ref, waker_noop);
553        static WAKER_VTABLE: RawWakerVTable =
554            RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);
555
556        fn waker_clone(raw_meta: *const ()) -> RawWaker {
557            // SAFETY: We did the reverse cast below.
558            let meta = unsafe { &*(raw_meta as *const Meta) };
559            meta.retain();
560            RawWaker::new(raw_meta, &WAKER_VTABLE)
561        }
562
563        fn waker_wake(raw_meta: *const ()) {
564            // SAFETY: We did the reverse cast below.
565            let meta = unsafe { &*(raw_meta as *const Meta) };
566            if meta.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
567                // This consumes the reference count.
568                meta.scope().executor().task_is_ready(AtomicFutureHandle(
569                    // SAFETY: We know raw_meta is not null.
570                    unsafe { NonNull::new_unchecked(raw_meta as *mut Meta) },
571                    PhantomData,
572                ));
573            } else {
574                meta.release();
575            }
576        }
577
578        fn waker_wake_by_ref(meta: *const ()) {
579            // SAFETY: We did the reverse cast below.
580            let meta = unsafe { &*(meta as *const Meta) };
581            // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
582            unsafe {
583                meta.wake();
584            }
585        }
586
587        fn waker_noop(_meta: *const ()) {}
588
589        fn waker_drop(meta: *const ()) {
590            // SAFETY: We did the reverse cast below.
591            let meta = unsafe { &*(meta as *const Meta) };
592            meta.release();
593        }
594
595        BorrowedWaker(
596            // SAFETY: We meet the contract for RawWaker/RawWakerVtable.
597            unsafe {
598                Waker::from_raw(RawWaker::new(self.0.as_ptr() as *const (), &BORROWED_WAKER_VTABLE))
599            },
600            PhantomData,
601        )
602    }
603
604    /// Wakes the future.
605    pub(crate) fn wake(&self) {
606        // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
607        unsafe {
608            self.meta().wake();
609        }
610    }
611
612    /// Wakes the future with an active guard. Returns true if successful i.e. a guard needs to be
613    /// acquired.
614    ///
615    /// NOTE: `Scope::release_cancel_guard` can be called *before* this function returns because the
616    /// task can be polled on another thread. For this reason, the caller either needs to hold a
617    /// lock, or it should preemptively take the guard.
618    pub(crate) fn wake_with_active_guard(&self) -> bool {
619        // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
620        unsafe { self.meta().wake_with_active_guard() }
621    }
622}
623
624impl<F: Future> Drop for AtomicFuture<F> {
625    fn drop(&mut self) {
626        let meta = &mut self.meta;
627        // This needs to be acquire ordering so that we see writes that might have just happened
628        // in another thread when the future was polled.
629        let state = meta.state.load(Acquire);
630        if state & DONE == 0 {
631            // SAFETY: The state isn't DONE so we must drop the future.
632            unsafe {
633                (meta.vtable.drop_future)(meta.into());
634            }
635        } else if state & RESULT_TAKEN == 0 {
636            // SAFETY: The result hasn't been taken so we must drop the result.
637            unsafe {
638                (meta.vtable.drop_result)(meta.into());
639            }
640        }
641    }
642}
643
/// A [`Waker`] handed out by `AtomicFutureHandle::waker`.
///
/// It is only reachable by reference (through `Deref`), and the lifetime ties it to the handle it
/// was created from.
pub struct BorrowedWaker<'a>(Waker, PhantomData<&'a ()>);

impl Deref for BorrowedWaker<'_> {
    type Target = Waker;

    /// Gives access to the wrapped waker.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
653
654impl Borrow<usize> for AtomicFutureHandle<'static> {
655    fn borrow(&self) -> &usize {
656        &self.meta().id
657    }
658}
659
660impl Hash for AtomicFutureHandle<'static> {
661    fn hash<H: Hasher>(&self, state: &mut H) {
662        self.meta().id.hash(state);
663    }
664}
665
666impl PartialEq for AtomicFutureHandle<'static> {
667    fn eq(&self, other: &Self) -> bool {
668        self.meta().id == other.meta().id
669    }
670}
671
672impl Eq for AtomicFutureHandle<'static> {}
673
/// Aborts the process when dropped.
///
/// `try_poll` creates one before polling the future and defuses it with `std::mem::forget` once
/// the poll has returned, so the destructor only runs if the poll unwinds.
struct Bomb;
impl Drop for Bomb {
    fn drop(&mut self) {
        std::process::abort();
    }
}