fuchsia_async/runtime/fuchsia/executor/
atomic_future.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod spawnable_future;
6
7use crate::ScopeHandle;
8use futures::ready;
9use std::borrow::Borrow;
10use std::future::Future;
11use std::hash::{Hash, Hasher};
12use std::marker::PhantomData;
13use std::mem::ManuallyDrop;
14use std::ops::Deref;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
20
21/// A lock-free thread-safe future.
22//
23// The debugger knows the layout so that async backtraces work, so if this changes the debugger
24// might need to be changed too.
25//
26// This is `repr(C)` so that we can cast between `NonNull<Meta>` and `NonNull<AtomicFuture<F>>`.
27//
28// LINT.IfChange
29#[repr(C)]
30struct AtomicFuture<F: Future> {
31    meta: Meta,
32
33    // `future` is safe to access after successfully clearing the INACTIVE state bit and the `DONE`
34    // state bit isn't set.
35    future: FutureOrResult<F>,
36}
37// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
38
39/// A lock-free thread-safe future. The handles can be cloned.
40pub struct AtomicFutureHandle<'a>(NonNull<Meta>, PhantomData<&'a ()>);
41
42/// `AtomicFutureHandle` is safe to access from multiple threads at once.
43unsafe impl Sync for AtomicFutureHandle<'_> {}
44unsafe impl Send for AtomicFutureHandle<'_> {}
45
46impl Drop for AtomicFutureHandle<'_> {
47    fn drop(&mut self) {
48        self.meta().release();
49    }
50}
51
52impl Clone for AtomicFutureHandle<'_> {
53    fn clone(&self) -> Self {
54        self.meta().retain();
55        Self(self.0, PhantomData)
56    }
57}
58
59struct Meta {
60    vtable: &'static VTable,
61
62    // Holds the reference count and state bits (INACTIVE, READY, etc.).
63    state: AtomicUsize,
64
65    scope: Option<ScopeHandle>,
66    id: usize,
67}
68
69impl Meta {
70    // # Safety
71    //
72    // This mints a handle with the 'static lifetime, so this should only be called from
73    // `AtomicFutureHandle<'static>`.
74    unsafe fn wake(&self) {
75        if self.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
76            self.retain();
77            self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
78        }
79    }
80
81    // Returns true if a guard should be acquired.
82    //
83    // # Safety
84    //
85    // This mints a handle with the 'static lifetime, so this should only be called from
86    // `AtomicFutureHandle<'static>`.
87    unsafe fn wake_with_active_guard(&self) -> bool {
88        let old = self.state.fetch_or(READY | WITH_ACTIVE_GUARD, Relaxed);
89        if old & (INACTIVE | READY | DONE) == INACTIVE {
90            self.retain();
91            self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
92        }
93
94        // If the task is DONE, the guard won't be released, so we must let the caller know.
95        old & (DONE | WITH_ACTIVE_GUARD) == 0
96    }
97
98    fn scope(&self) -> &ScopeHandle {
99        self.scope.as_ref().unwrap()
100    }
101
102    fn retain(&self) {
103        let old = self.state.fetch_add(1, Relaxed) & REF_COUNT_MASK;
104        assert!(old != REF_COUNT_MASK);
105    }
106
107    fn release(&self) {
108        // This can be Relaxed because there is a barrier in the drop function.
109        let old = self.state.fetch_sub(1, Relaxed) & REF_COUNT_MASK;
110        if old == 1 {
111            // SAFETY: This is safe because we just released the last reference.
112            unsafe {
113                (self.vtable.drop)(self.into());
114            }
115        } else {
116            // Check for underflow.
117            assert!(old > 0);
118        }
119    }
120
121    // # Safety
122    //
123    // The caller must know that the future has completed.
124    unsafe fn drop_result(&self, ordering: Ordering) {
125        // It's possible for this to race with another thread so we only drop the result if we are
126        // successful in setting the RESULT_TAKEN bit.
127        if self.state.fetch_or(RESULT_TAKEN, ordering) & RESULT_TAKEN == 0 {
128            (self.vtable.drop_result)(self.into());
129        }
130    }
131}
132
133struct VTable {
134    /// Drops the atomic future.
135    ///
136    /// # Safety
137    ///
138    /// The caller must ensure there are no other references i.e. the reference count should be
139    /// zero.
140    // zxdb uses this method to figure out the concrete type of the future.
141    // LINT.IfChange
142    drop: unsafe fn(NonNull<Meta>),
143    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
144    /// Drops the future.
145    ///
146    /// # Safety
147    ///
148    /// The caller must ensure the future hasn't been dropped.
149    drop_future: unsafe fn(NonNull<Meta>),
150    /// Polls the future.
151    ///
152    /// # Safety
153    ///
154    /// The caller must ensure the future hasn't been dropped and has exclusive access.
155    poll: unsafe fn(NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()>,
156
157    /// Gets the result.
158    ///
159    /// # Safety
160    ///
161    /// The caller must ensure the future is finished and the result hasn't been taken or dropped.
162    get_result: unsafe fn(NonNull<Meta>) -> *const (),
163
164    /// Drops the result.
165    ///
166    /// # Safety
167    ///
168    /// The caller must ensure the future is finished and the result hasn't already been taken or
169    /// dropped.
170    drop_result: unsafe fn(NonNull<Meta>),
171}
172
173union FutureOrResult<F: Future> {
174    future: ManuallyDrop<F>,
175    result: ManuallyDrop<F::Output>,
176}
177
178impl<F: Future> AtomicFuture<F> {
179    const VTABLE: VTable = VTable {
180        drop: Self::drop,
181        drop_future: Self::drop_future,
182        poll: Self::poll,
183        get_result: Self::get_result,
184        drop_result: Self::drop_result,
185    };
186
187    unsafe fn drop(meta: NonNull<Meta>) {
188        drop(Box::from_raw(meta.cast::<Self>().as_mut()));
189    }
190
191    unsafe fn poll(meta: NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()> {
192        let future = &mut meta.cast::<Self>().as_mut().future;
193        let result = ready!(Pin::new_unchecked(&mut *future.future).poll(cx));
194        // This might panic which will leave ourselves in a bad state. We deal with this by
195        // aborting (see below).
196        ManuallyDrop::drop(&mut future.future);
197        future.result = ManuallyDrop::new(result);
198        Poll::Ready(())
199    }
200
201    unsafe fn drop_future(meta: NonNull<Meta>) {
202        ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.future);
203    }
204
205    unsafe fn get_result(meta: NonNull<Meta>) -> *const () {
206        &*meta.cast::<Self>().as_mut().future.result as *const F::Output as *const ()
207    }
208
209    unsafe fn drop_result(meta: NonNull<Meta>) {
210        ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.result);
211    }
212}
213
214/// State Bits
215//
216// Exclusive access is gained by clearing this bit.
217const INACTIVE: usize = 1 << 63;
218
219// Set to indicate the future needs to be polled again.
220const READY: usize = 1 << 62;
221
222// Terminal state: the future is dropped upon entry to this state. When in this state, other bits
223// can be set, including READY (which has no meaning).
224const DONE: usize = 1 << 61;
225
226// The task has been detached.
227const DETACHED: usize = 1 << 60;
228
229// The task has been cancelled.
230const ABORTED: usize = 1 << 59;
231
232// The task has an active guard that should be dropped when the task is next polled.
233const WITH_ACTIVE_GUARD: usize = 1 << 58;
234
235// The result has been taken.
236const RESULT_TAKEN: usize = 1 << 57;
237
238// The mask for the ref count.
239const REF_COUNT_MASK: usize = RESULT_TAKEN - 1;
240
241/// The result of a call to `try_poll`.
242/// This indicates the result of attempting to `poll` the future.
243pub enum AttemptPollResult {
244    /// The future was polled, but did not complete.
245    Pending,
246    /// The future was polled and finished by this thread.
247    /// This result is normally used to trigger garbage-collection of the future.
248    IFinished,
249    /// The future was already completed by another thread.
250    SomeoneElseFinished,
251    /// The future was polled, did not complete, but it is woken whilst it is polled so it
252    /// should be polled again.
253    Yield,
254    /// The future was aborted.
255    Aborted,
256}
257
258/// The result of calling the `abort_and_detach` function.
259#[must_use]
260pub enum AbortAndDetachResult {
261    /// The future has finished; it can be dropped.
262    Done,
263
264    /// The future needs to be added to a run queue to be aborted.
265    AddToRunQueue,
266
267    /// The future is soon to be aborted and nothing needs to be done.
268    Pending,
269}
270
271impl<'a> AtomicFutureHandle<'a> {
272    /// Create a new `AtomicFuture`.
273    pub fn new<F: Future + Send + 'a>(scope: Option<ScopeHandle>, id: usize, future: F) -> Self
274    where
275        F::Output: Send + 'a,
276    {
277        // SAFETY: This is safe because the future and output are both Send.
278        unsafe { Self::new_local(scope, id, future) }
279    }
280
281    /// Create a new `AtomicFuture` from a !Send future.
282    ///
283    /// # Safety
284    ///
285    /// The caller must uphold the Send requirements.
286    pub unsafe fn new_local<F: Future + 'a>(
287        scope: Option<ScopeHandle>,
288        id: usize,
289        future: F,
290    ) -> Self
291    where
292        F::Output: 'a,
293    {
294        Self(
295            NonNull::new_unchecked(Box::into_raw(Box::new(AtomicFuture {
296                meta: Meta {
297                    vtable: &AtomicFuture::<F>::VTABLE,
298                    // The future is inactive and we start with a single reference.
299                    state: AtomicUsize::new(1 | INACTIVE),
300                    scope,
301                    id,
302                },
303                future: FutureOrResult { future: ManuallyDrop::new(future) },
304            })))
305            .cast::<Meta>(),
306            PhantomData,
307        )
308    }
309
310    fn meta(&self) -> &Meta {
311        // SAFETY: This is safe because we hold a reference count.
312        unsafe { self.0.as_ref() }
313    }
314
315    /// Returns the future's ID.
316    pub fn id(&self) -> usize {
317        self.meta().id
318    }
319
320    /// Returns the associated scope.
321    pub fn scope(&self) -> &ScopeHandle {
322        self.meta().scope()
323    }
324
325    /// Attempt to poll the underlying future.
326    ///
327    /// `try_poll` ensures that the future is polled at least once more
328    /// unless it has already finished.
329    pub fn try_poll(&self, cx: &mut Context<'_>) -> AttemptPollResult {
330        let meta = self.meta();
331        let has_active_guard = loop {
332            // Attempt to acquire sole responsibility for polling the future (by clearing the
333            // INACTIVE bit) and also clear the READY and WITH_ACTIVE_GUARD bits at the same time.
334            // We clear both so that we can track if they are set again whilst we are polling.
335            let old = meta.state.fetch_and(!(INACTIVE | READY | WITH_ACTIVE_GUARD), Acquire);
336            assert_ne!(old & REF_COUNT_MASK, 0);
337            if old & DONE != 0 {
338                // If the DONE bit is set, the WITH_ACTIVE_GUARD bit should be ignored; it may or
339                // may not be set, but it doesn't reflect whether an active guard is held so even
340                // though we just cleared it, we shouldn't release a guard here.
341                return AttemptPollResult::SomeoneElseFinished;
342            }
343            let has_active_guard = old & WITH_ACTIVE_GUARD != 0;
344            if old & INACTIVE != 0 {
345                // We are now the (only) active worker, proceed to poll...
346                if old & ABORTED != 0 {
347                    if has_active_guard {
348                        meta.scope().release_cancel_guard();
349                    }
350                    // The future was aborted.
351                    // SAFETY: We have exclusive access.
352                    unsafe {
353                        self.drop_future_unchecked();
354                    }
355                    return AttemptPollResult::Aborted;
356                }
357                break has_active_guard;
358            }
359            // Future was already active; this shouldn't really happen because we shouldn't be
360            // polling it from multiple threads at the same time. Still, we handle it by setting
361            // the READY bit so that it gets polled again. We do this regardless of whether we
362            // cleared the READY bit above.
363            let old2 = meta.state.fetch_or(READY | (old & WITH_ACTIVE_GUARD), Relaxed);
364
365            if old2 & DONE != 0 {
366                // If `has_active_guard` is true, we are responsible for releasing a guard since it
367                // means we cleared the `WITH_ACTIVE_GUARD` bit.
368                if has_active_guard {
369                    meta.scope().release_cancel_guard();
370                }
371                return AttemptPollResult::SomeoneElseFinished;
372            }
373
374            if has_active_guard && old2 & WITH_ACTIVE_GUARD != 0 {
375                // Within the small window, something else gave this task an active guard, so we
376                // must return one of them.
377                meta.scope().release_cancel_guard();
378            }
379
380            // If the future is still active, or the future was already marked as ready, we can
381            // just return and it will get polled again.
382            if old2 & INACTIVE == 0 || old2 & READY != 0 {
383                return AttemptPollResult::Pending;
384            }
385            // The worker finished, and we marked the future as ready, so we must try again because
386            // the future won't be in a run queue.
387        };
388
389        // We cannot recover from panics.
390        let bomb = Bomb;
391
392        // SAFETY: We have exclusive access because we cleared the INACTIVE state bit.
393        let result = unsafe { (meta.vtable.poll)(meta.into(), cx) };
394
395        std::mem::forget(bomb);
396
397        if has_active_guard {
398            meta.scope().release_cancel_guard();
399        }
400
401        if let Poll::Ready(()) = result {
402            // The future will have been dropped, so we just need to set the state.
403            //
404            // This needs to be Release ordering because we need to synchronize with another thread
405            // that takes or drops the result.
406            let old = meta.state.fetch_or(DONE, Release);
407
408            if old & WITH_ACTIVE_GUARD != 0 {
409                // Whilst we were polling the task, it was given an active guard. We must return it
410                // now.
411                meta.scope().release_cancel_guard();
412            }
413
414            if old & DETACHED != 0 {
415                // If the future is detached, we should eagerly drop the result. This can be
416                // Relaxed ordering because the result was written by this thread.
417
418                // SAFETY: The future has completed.
419                unsafe {
420                    meta.drop_result(Relaxed);
421                }
422            }
423            // No one else will read `future` unless they see `INACTIVE`, which will never
424            // happen again.
425            AttemptPollResult::IFinished
426        } else if meta.state.fetch_or(INACTIVE, Release) & READY == 0 {
427            AttemptPollResult::Pending
428        } else {
429            // The future was marked ready whilst we were polling, so yield.
430            AttemptPollResult::Yield
431        }
432    }
433
434    /// Drops the future without checking its current state.
435    ///
436    /// # Panics
437    ///
438    /// This will panic if the future is already marked with `DONE`.
439    ///
440    /// # Safety
441    ///
442    /// This doesn't check the current state, so this must only be called if it is known that there
443    /// is no concurrent access. This also does *not* include any memory barriers before dropping
444    /// the future.
445    pub unsafe fn drop_future_unchecked(&self) {
446        // Set the state first in case we panic when we drop.
447        let meta = self.meta();
448        let old = meta.state.fetch_or(DONE | RESULT_TAKEN, Relaxed);
449        assert_eq!(old & DONE, 0);
450        if old & WITH_ACTIVE_GUARD != 0 {
451            meta.scope().release_cancel_guard();
452        }
453        (meta.vtable.drop_future)(meta.into());
454    }
455
456    /// Drops the future if it is not currently being polled. Returns success if the future was
457    /// dropped or was already dropped.
458    pub fn try_drop(&self) -> Result<(), ()> {
459        let old = self.meta().state.fetch_and(!INACTIVE, Acquire);
460        if old & DONE != 0 {
461            Ok(())
462        } else if old & INACTIVE != 0 {
463            // SAFETY: We have exclusive access.
464            unsafe {
465                self.drop_future_unchecked();
466            }
467            Ok(())
468        } else {
469            Err(())
470        }
471    }
472
473    /// Aborts the task. Returns true if the task needs to be added to a run queue.
474    #[must_use]
475    pub fn abort(&self) -> bool {
476        self.meta().state.fetch_or(ABORTED | READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE
477    }
478
479    /// Marks the task as detached.
480    pub fn detach(&self) {
481        let meta = self.meta();
482        let old = meta.state.fetch_or(DETACHED, Relaxed);
483
484        if old & (DONE | RESULT_TAKEN) == DONE {
485            // If the future is done, we should eagerly drop the result. This needs to be acquire
486            // ordering because another thread might have written the result.
487
488            // SAFETY: The future has completed.
489            unsafe {
490                meta.drop_result(Acquire);
491            }
492        }
493    }
494
495    /// Marks the task as aborted and detached (for when the caller isn't interested in waiting
496    /// for the cancellation to be finished). Returns true if the task should be added to a run
497    /// queue.
498    pub fn abort_and_detach(&self) -> AbortAndDetachResult {
499        let meta = self.meta();
500        let old_state = meta.state.fetch_or(ABORTED | DETACHED | READY, Relaxed);
501        if old_state & DONE != 0 {
502            // If the future is done, we should eagerly drop the result. This needs to be acquire
503            // ordering because another thread might have written the result.
504
505            // SAFETY: The future has completed.
506            unsafe {
507                meta.drop_result(Acquire);
508            }
509
510            AbortAndDetachResult::Done
511        } else if old_state & (INACTIVE | READY) == INACTIVE {
512            AbortAndDetachResult::AddToRunQueue
513        } else {
514            AbortAndDetachResult::Pending
515        }
516    }
517
518    /// Returns true if the task is detached.
519    pub fn is_detached(&self) -> bool {
520        self.meta().state.load(Relaxed) & DETACHED != 0
521    }
522
523    /// Takes the result.
524    ///
525    /// # Safety
526    ///
527    /// The caller must guarantee that `R` is the correct type.
528    pub unsafe fn take_result<R>(&self) -> Option<R> {
529        // This needs to be Acquire ordering to synchronize with the polling thread.
530        let meta = self.meta();
531        if meta.state.load(Relaxed) & (DONE | RESULT_TAKEN) == DONE
532            && meta.state.fetch_or(RESULT_TAKEN, Acquire) & RESULT_TAKEN == 0
533        {
534            Some(((meta.vtable.get_result)(meta.into()) as *const R).read())
535        } else {
536            None
537        }
538    }
539}
540
541impl AtomicFutureHandle<'static> {
542    /// Returns a waker for the future.
543    pub fn waker(&self) -> BorrowedWaker<'_> {
544        static BORROWED_WAKER_VTABLE: RawWakerVTable =
545            RawWakerVTable::new(waker_clone, waker_wake_by_ref, waker_wake_by_ref, waker_noop);
546        static WAKER_VTABLE: RawWakerVTable =
547            RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);
548
549        fn waker_clone(raw_meta: *const ()) -> RawWaker {
550            // SAFETY: We did the reverse cast below.
551            let meta = unsafe { &*(raw_meta as *const Meta) };
552            meta.retain();
553            RawWaker::new(raw_meta, &WAKER_VTABLE)
554        }
555
556        fn waker_wake(raw_meta: *const ()) {
557            // SAFETY: We did the reverse cast below.
558            let meta = unsafe { &*(raw_meta as *const Meta) };
559            if meta.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
560                // This consumes the reference count.
561                meta.scope().executor().task_is_ready(AtomicFutureHandle(
562                    // SAFETY: We know raw_meta is not null.
563                    unsafe { NonNull::new_unchecked(raw_meta as *mut Meta) },
564                    PhantomData,
565                ));
566            } else {
567                meta.release();
568            }
569        }
570
571        fn waker_wake_by_ref(meta: *const ()) {
572            // SAFETY: We did the reverse cast below.
573            let meta = unsafe { &*(meta as *const Meta) };
574            // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
575            unsafe {
576                meta.wake();
577            }
578        }
579
580        fn waker_noop(_meta: *const ()) {}
581
582        fn waker_drop(meta: *const ()) {
583            // SAFETY: We did the reverse cast below.
584            let meta = unsafe { &*(meta as *const Meta) };
585            meta.release();
586        }
587
588        BorrowedWaker(
589            // SAFETY: We meet the contract for RawWaker/RawWakerVtable.
590            unsafe {
591                Waker::from_raw(RawWaker::new(self.0.as_ptr() as *const (), &BORROWED_WAKER_VTABLE))
592            },
593            PhantomData,
594        )
595    }
596
597    /// Wakes the future.
598    pub fn wake(&self) {
599        // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
600        unsafe {
601            self.meta().wake();
602        }
603    }
604
605    /// Wakes the future with an active guard. Returns true if successful i.e. a guard needs to be
606    /// acquired.
607    ///
608    /// NOTE: `Scope::release_cancel_guard` can be called *before* this function returns because the
609    /// task can be polled on another thread. For this reason, the caller either needs to hold a
610    /// lock, or it should preemptively take the guard.
611    pub fn wake_with_active_guard(&self) -> bool {
612        // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
613        unsafe { self.meta().wake_with_active_guard() }
614    }
615}
616
617impl<F: Future> Drop for AtomicFuture<F> {
618    fn drop(&mut self) {
619        let meta = &mut self.meta;
620        // This needs to be acquire ordering so that we see writes that might have just happened
621        // in another thread when the future was polled.
622        let state = meta.state.load(Acquire);
623        if state & DONE == 0 {
624            // SAFETY: The state isn't DONE so we must drop the future.
625            unsafe {
626                (meta.vtable.drop_future)(meta.into());
627            }
628        } else if state & RESULT_TAKEN == 0 {
629            // SAFETY: The result hasn't been taken so we must drop the result.
630            unsafe {
631                (meta.vtable.drop_result)(meta.into());
632            }
633        }
634    }
635}
636
637pub struct BorrowedWaker<'a>(std::task::Waker, PhantomData<&'a ()>);
638
639impl Deref for BorrowedWaker<'_> {
640    type Target = Waker;
641
642    fn deref(&self) -> &Self::Target {
643        &self.0
644    }
645}
646
647impl Borrow<usize> for AtomicFutureHandle<'static> {
648    fn borrow(&self) -> &usize {
649        &self.meta().id
650    }
651}
652
653impl Hash for AtomicFutureHandle<'static> {
654    fn hash<H: Hasher>(&self, state: &mut H) {
655        self.meta().id.hash(state);
656    }
657}
658
659impl PartialEq for AtomicFutureHandle<'static> {
660    fn eq(&self, other: &Self) -> bool {
661        self.meta().id == other.meta().id
662    }
663}
664
665impl Eq for AtomicFutureHandle<'static> {}
666
667struct Bomb;
668impl Drop for Bomb {
669    fn drop(&mut self) {
670        std::process::abort();
671    }
672}