fuchsia_async/runtime/fuchsia/executor/
atomic_future.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod spawnable_future;
6
7use crate::ScopeHandle;
8use futures::ready;
9use std::borrow::Borrow;
10use std::future::Future;
11use std::hash::{Hash, Hasher};
12use std::marker::PhantomData;
13use std::mem::ManuallyDrop;
14use std::ops::Deref;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
20
21/// A lock-free thread-safe future.
22
23// The debugger knows the layout so that async backtraces work, so if this changes the debugger
24// might need to be changed too.
25//
26// This is `repr(C)` so that we can cast between `NonNull<Meta>` and `NonNull<AtomicFuture<F>>`.
27
28// LINT.IfChange
29#[repr(C)]
30struct AtomicFuture<F: Future> {
31    meta: Meta,
32
33    // `future` is safe to access after successfully clearing the INACTIVE state bit and the `DONE`
34    // state bit isn't set.
35    future: FutureOrResult<F>,
36}
37// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
38
39/// A lock-free thread-safe future.  The handles can be cloned.
40pub struct AtomicFutureHandle<'a>(NonNull<Meta>, PhantomData<&'a ()>);
41
42/// `AtomicFutureHandle` is safe to access from multiple threads at once.
43unsafe impl Sync for AtomicFutureHandle<'_> {}
44unsafe impl Send for AtomicFutureHandle<'_> {}
45
46impl Drop for AtomicFutureHandle<'_> {
47    fn drop(&mut self) {
48        self.meta().release();
49    }
50}
51
52impl Clone for AtomicFutureHandle<'_> {
53    fn clone(&self) -> Self {
54        self.meta().retain();
55        Self(self.0, PhantomData)
56    }
57}
58
59struct Meta {
60    vtable: &'static VTable,
61
62    // Holds the reference count and state bits (INACTIVE, READY, etc.).
63    state: AtomicUsize,
64
65    scope: Option<ScopeHandle>,
66    id: usize,
67}
68
69impl Meta {
70    // # Safety
71    //
72    // This mints a handle with the 'static lifetime, so this should only be called from
73    // `AtomicFutureHandle<'static>`.
74    unsafe fn wake(&self) {
75        if self.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
76            self.retain();
77            self.scope().executor().task_is_ready(AtomicFutureHandle(self.into(), PhantomData));
78        }
79    }
80
81    fn scope(&self) -> &ScopeHandle {
82        self.scope.as_ref().unwrap()
83    }
84
85    fn retain(&self) {
86        let old = self.state.fetch_add(1, Relaxed) & REF_COUNT_MASK;
87        assert!(old != REF_COUNT_MASK);
88    }
89
90    fn release(&self) {
91        // This can be Relaxed because there is a barrier in the drop function.
92        let old = self.state.fetch_sub(1, Relaxed) & REF_COUNT_MASK;
93        if old == 1 {
94            // SAFETY: This is safe because we just released the last reference.
95            unsafe {
96                (self.vtable.drop)(self.into());
97            }
98        } else {
99            // Check for underflow.
100            assert!(old > 0);
101        }
102    }
103
104    // # Safety
105    //
106    // The caller must know that the future has completed.
107    unsafe fn drop_result(&self, ordering: Ordering) {
108        // It's possible for this to race with another thread so we only drop the result if we are
109        // successful in setting the RESULT_TAKEN bit.
110        if self.state.fetch_or(RESULT_TAKEN, ordering) & RESULT_TAKEN == 0 {
111            (self.vtable.drop_result)(self.into());
112        }
113    }
114}
115
116struct VTable {
117    /// Drops the atomic future.
118    ///
119    /// # Safety
120    ///
121    /// The caller must ensure there are no other references i.e. the reference count should be
122    /// zero.
123    // zxdb uses this method to figure out the concrete type of the future.
124    // LINT.IfChange
125    drop: unsafe fn(NonNull<Meta>),
126    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
127    /// Drops the future.
128    ///
129    /// # Safety
130    ///
131    /// The caller must ensure the future hasn't been dropped.
132    drop_future: unsafe fn(NonNull<Meta>),
133    /// Polls the future.
134    ///
135    /// # Safety
136    ///
137    /// The caller must ensure the future hasn't been dropped and has exclusive access.
138    poll: unsafe fn(NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()>,
139
140    /// Gets the result.
141    ///
142    /// # Safety
143    ///
144    /// The caller must ensure the future is finished and the result hasn't been taken or dropped.
145    get_result: unsafe fn(NonNull<Meta>) -> *const (),
146
147    /// Drops the result.
148    ///
149    /// # Safety
150    ///
151    /// The caller must ensure the future is finished and the result hasn't already been taken or
152    /// dropped.
153    drop_result: unsafe fn(NonNull<Meta>),
154}
155
156union FutureOrResult<F: Future> {
157    future: ManuallyDrop<F>,
158    result: ManuallyDrop<F::Output>,
159}
160
161impl<F: Future> AtomicFuture<F> {
162    const VTABLE: VTable = VTable {
163        drop: Self::drop,
164        drop_future: Self::drop_future,
165        poll: Self::poll,
166        get_result: Self::get_result,
167        drop_result: Self::drop_result,
168    };
169
170    unsafe fn drop(meta: NonNull<Meta>) {
171        drop(Box::from_raw(meta.cast::<Self>().as_mut()));
172    }
173
174    unsafe fn poll(meta: NonNull<Meta>, cx: &mut Context<'_>) -> Poll<()> {
175        let future = &mut meta.cast::<Self>().as_mut().future;
176        let result = ready!(Pin::new_unchecked(&mut *future.future).poll(cx));
177        // This might panic which will leave ourselves in a bad state.  We deal with this by
178        // aborting (see below).
179        ManuallyDrop::drop(&mut future.future);
180        future.result = ManuallyDrop::new(result);
181        Poll::Ready(())
182    }
183
184    unsafe fn drop_future(meta: NonNull<Meta>) {
185        ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.future);
186    }
187
188    unsafe fn get_result(meta: NonNull<Meta>) -> *const () {
189        &*meta.cast::<Self>().as_mut().future.result as *const F::Output as *const ()
190    }
191
192    unsafe fn drop_result(meta: NonNull<Meta>) {
193        ManuallyDrop::drop(&mut meta.cast::<Self>().as_mut().future.result);
194    }
195}
196
197/// State Bits
198
199// Exclusive access is gained by clearing this bit.
200const INACTIVE: usize = 1 << 63;
201
202// Set to indicate the future needs to be polled again.
203const READY: usize = 1 << 62;
204
205// Terminal state: the future is dropped upon entry to this state.  When in this state, other bits
206// can be set, including READY (which has no meaning).
207const DONE: usize = 1 << 61;
208
209// The task has been detached.
210const DETACHED: usize = 1 << 60;
211
212// The task has been cancelled.
213const ABORTED: usize = 1 << 59;
214
215// The result has been taken.
216const RESULT_TAKEN: usize = 1 << 58;
217
218// The mask for the ref count.
219const REF_COUNT_MASK: usize = RESULT_TAKEN - 1;
220
221/// The result of a call to `try_poll`.
222/// This indicates the result of attempting to `poll` the future.
223pub enum AttemptPollResult {
224    /// The future was polled, but did not complete.
225    Pending,
226    /// The future was polled and finished by this thread.
227    /// This result is normally used to trigger garbage-collection of the future.
228    IFinished,
229    /// The future was already completed by another thread.
230    SomeoneElseFinished,
231    /// The future was polled, did not complete, but it is woken whilst it is polled so it
232    /// should be polled again.
233    Yield,
234    /// The future was aborted.
235    Aborted,
236}
237
238/// The result of calling the `abort_and_detach` function.
239#[must_use]
240pub enum AbortAndDetachResult {
241    /// The future has finished; it can be dropped.
242    Done,
243
244    /// The future needs to be added to a run queue to be aborted.
245    AddToRunQueue,
246
247    /// The future is soon to be aborted and nothing needs to be done.
248    Pending,
249}
250
251impl<'a> AtomicFutureHandle<'a> {
252    /// Create a new `AtomicFuture`.
253    pub fn new<F: Future + Send + 'a>(scope: Option<ScopeHandle>, id: usize, future: F) -> Self
254    where
255        F::Output: Send + 'a,
256    {
257        // SAFETY: This is safe because the future and output are both Send.
258        unsafe { Self::new_local(scope, id, future) }
259    }
260
261    /// Create a new `AtomicFuture` from a !Send future.
262    ///
263    /// # Safety
264    ///
265    /// The caller must uphold the Send requirements.
266    pub unsafe fn new_local<F: Future + 'a>(
267        scope: Option<ScopeHandle>,
268        id: usize,
269        future: F,
270    ) -> Self
271    where
272        F::Output: 'a,
273    {
274        Self(
275            NonNull::new_unchecked(Box::into_raw(Box::new(AtomicFuture {
276                meta: Meta {
277                    vtable: &AtomicFuture::<F>::VTABLE,
278                    // The future is inactive and we start with a single reference.
279                    state: AtomicUsize::new(1 | INACTIVE),
280                    scope,
281                    id,
282                },
283                future: FutureOrResult { future: ManuallyDrop::new(future) },
284            })))
285            .cast::<Meta>(),
286            PhantomData,
287        )
288    }
289
290    fn meta(&self) -> &Meta {
291        // SAFETY: This is safe because we hold a reference count.
292        unsafe { self.0.as_ref() }
293    }
294
295    /// Returns the future's ID.
296    pub fn id(&self) -> usize {
297        self.meta().id
298    }
299
300    /// Returns the associated scope.
301    pub fn scope(&self) -> &ScopeHandle {
302        &self.meta().scope()
303    }
304
305    /// Attempt to poll the underlying future.
306    ///
307    /// `try_poll` ensures that the future is polled at least once more
308    /// unless it has already finished.
309    pub fn try_poll(&self, cx: &mut Context<'_>) -> AttemptPollResult {
310        let meta = self.meta();
311        loop {
312            // Attempt to acquire sole responsibility for polling the future (by clearing the
313            // INACTIVE bit) and also clear the READY bit at the same time so that we track if it
314            // becomes READY again whilst we are polling.
315            let old = meta.state.fetch_and(!(INACTIVE | READY), Acquire);
316            assert_ne!(old & REF_COUNT_MASK, 0);
317            if old & DONE != 0 {
318                // Someone else completed this future already
319                return AttemptPollResult::SomeoneElseFinished;
320            }
321            if old & INACTIVE != 0 {
322                // We are now the (only) active worker, proceed to poll...
323                if old & ABORTED != 0 {
324                    // The future was aborted.
325                    // SAFETY: We have exclusive access.
326                    unsafe {
327                        self.drop_future_unchecked();
328                    }
329                    return AttemptPollResult::Aborted;
330                }
331                break;
332            }
333            // Future was already active; this shouldn't really happen because we shouldn't be
334            // polling it from multiple threads at the same time.  Still, we handle it by setting
335            // the READY bit so that it gets polled again.  We do this regardless of whether we
336            // cleared the READY bit above.
337            let old = meta.state.fetch_or(READY, Relaxed);
338            // If the future is still active, or the future was already marked as ready, we can
339            // just return and it will get polled again.
340            if old & INACTIVE == 0 || old & READY != 0 {
341                return AttemptPollResult::Pending;
342            }
343            // The worker finished, and we marked the future as ready, so we must try again because
344            // the future won't be in a run queue.
345        }
346
347        // We cannot recover from panics.
348        let bomb = Bomb;
349
350        // SAFETY: We have exclusive access because we cleared the INACTIVE state bit.
351        let result = unsafe { (meta.vtable.poll)(meta.into(), cx) };
352
353        std::mem::forget(bomb);
354
355        if let Poll::Ready(()) = result {
356            // The future will have been dropped, so we just need to set the state.
357            //
358            // This needs to be Release ordering because we need to synchronize with another thread
359            // that takes or drops the result.
360            let old = meta.state.fetch_or(DONE, Release);
361
362            if old & DETACHED != 0 {
363                // If the future is detached, we should eagerly drop the result.  This can be Relaxed
364                // ordering because the result was written by this thread.
365
366                // SAFETY: The future has completed.
367                unsafe {
368                    meta.drop_result(Relaxed);
369                }
370            }
371            // No one else will read `future` unless they see `INACTIVE`, which will never
372            // happen again.
373            AttemptPollResult::IFinished
374        } else if meta.state.fetch_or(INACTIVE, Release) & READY == 0 {
375            AttemptPollResult::Pending
376        } else {
377            // The future was marked ready whilst we were polling, so yield.
378            AttemptPollResult::Yield
379        }
380    }
381
382    /// Drops the future without checking its current state.
383    ///
384    /// # Safety
385    ///
386    /// This doesn't check the current state, so this must only be called if it is known that there
387    /// is no concurrent access.  This also does *not* include any memory barriers before dropping
388    /// the future.
389    pub unsafe fn drop_future_unchecked(&self) {
390        // Set the state first in case we panic when we drop.
391        let meta = self.meta();
392        assert!(meta.state.fetch_or(DONE | RESULT_TAKEN, Relaxed) & DONE == 0);
393        (meta.vtable.drop_future)(meta.into());
394    }
395
396    /// Drops the future if it is not currently being polled. Returns success if the future was
397    /// dropped or was already dropped.
398    pub fn try_drop(&self) -> Result<(), ()> {
399        let old = self.meta().state.fetch_and(!INACTIVE, Acquire);
400        if old & DONE != 0 {
401            Ok(())
402        } else if old & INACTIVE != 0 {
403            // SAFETY: We have exclusive access.
404            unsafe {
405                self.drop_future_unchecked();
406            }
407            Ok(())
408        } else {
409            Err(())
410        }
411    }
412
413    /// Aborts the task.  Returns true if the task needs to be added to a run queue.
414    #[must_use]
415    pub fn abort(&self) -> bool {
416        self.meta().state.fetch_or(ABORTED | READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE
417    }
418
419    /// Marks the task as detached.
420    pub fn detach(&self) {
421        let meta = self.meta();
422        let old = meta.state.fetch_or(DETACHED, Relaxed);
423
424        if old & (DONE | RESULT_TAKEN) == DONE {
425            // If the future is done, we should eagerly drop the result.  This needs to be acquire
426            // ordering because another thread might have written the result.
427
428            // SAFETY: The future has completed.
429            unsafe {
430                meta.drop_result(Acquire);
431            }
432        }
433    }
434
435    /// Marks the task as aborted and detached (for when the caller isn't interested in waiting
436    /// for the cancellation to be finished).  Returns true if the task should be added to a run
437    /// queue.
438    pub fn abort_and_detach(&self) -> AbortAndDetachResult {
439        let meta = self.meta();
440        let old_state = meta.state.fetch_or(ABORTED | DETACHED | READY, Relaxed);
441        if old_state & DONE != 0 {
442            // If the future is done, we should eagerly drop the result.  This needs to be acquire
443            // ordering because another thread might have written the result.
444
445            // SAFETY: The future has completed.
446            unsafe {
447                meta.drop_result(Acquire);
448            }
449
450            AbortAndDetachResult::Done
451        } else if old_state & (INACTIVE | READY) == INACTIVE {
452            AbortAndDetachResult::AddToRunQueue
453        } else {
454            AbortAndDetachResult::Pending
455        }
456    }
457
458    /// Returns true if the task is detached.
459    pub fn is_detached(&self) -> bool {
460        self.meta().state.load(Relaxed) & DETACHED != 0
461    }
462
463    /// Takes the result.
464    ///
465    /// # Safety
466    ///
467    /// The caller must guarantee that `R` is the correct type.
468    pub unsafe fn take_result<R>(&self) -> Option<R> {
469        // This needs to be Acquire ordering to synchronize with the polling thread.
470        let meta = self.meta();
471        if meta.state.load(Relaxed) & (DONE | RESULT_TAKEN) == DONE
472            && meta.state.fetch_or(RESULT_TAKEN, Acquire) & RESULT_TAKEN == 0
473        {
474            Some(((meta.vtable.get_result)(meta.into()) as *const R).read())
475        } else {
476            None
477        }
478    }
479}
480
481impl AtomicFutureHandle<'static> {
482    /// Returns a waker for the future.
483    pub fn waker(&self) -> BorrowedWaker<'_> {
484        static BORROWED_WAKER_VTABLE: RawWakerVTable =
485            RawWakerVTable::new(waker_clone, waker_wake_by_ref, waker_wake_by_ref, waker_noop);
486        static WAKER_VTABLE: RawWakerVTable =
487            RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);
488
489        fn waker_clone(raw_meta: *const ()) -> RawWaker {
490            // SAFETY: We did the reverse cast below.
491            let meta = unsafe { &*(raw_meta as *const Meta) };
492            meta.retain();
493            RawWaker::new(raw_meta, &WAKER_VTABLE)
494        }
495
496        fn waker_wake(raw_meta: *const ()) {
497            // SAFETY: We did the reverse cast below.
498            let meta = unsafe { &*(raw_meta as *const Meta) };
499            if meta.state.fetch_or(READY, Relaxed) & (INACTIVE | READY | DONE) == INACTIVE {
500                // This consumes the reference count.
501                meta.scope().executor().task_is_ready(AtomicFutureHandle(
502                    // SAFETY: We know raw_meta is not null.
503                    unsafe { NonNull::new_unchecked(raw_meta as *mut Meta) },
504                    PhantomData,
505                ));
506            } else {
507                meta.release();
508            }
509        }
510
511        fn waker_wake_by_ref(meta: *const ()) {
512            // SAFETY: We did the reverse cast below.
513            let meta = unsafe { &*(meta as *const Meta) };
514            // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
515            unsafe {
516                meta.wake();
517            }
518        }
519
520        fn waker_noop(_meta: *const ()) {}
521
522        fn waker_drop(meta: *const ()) {
523            // SAFETY: We did the reverse cast below.
524            let meta = unsafe { &*(meta as *const Meta) };
525            meta.release();
526        }
527
528        BorrowedWaker(
529            // SAFETY: We meet the contract for RawWaker/RawWakerVtable.
530            unsafe {
531                Waker::from_raw(RawWaker::new(self.0.as_ptr() as *const (), &BORROWED_WAKER_VTABLE))
532            },
533            PhantomData,
534        )
535    }
536
537    /// Wakes the future.
538    pub fn wake(&self) {
539        // SAFETY: The lifetime on `AtomicFutureHandle` is 'static.
540        unsafe {
541            self.meta().wake();
542        }
543    }
544}
545
546impl<F: Future> Drop for AtomicFuture<F> {
547    fn drop(&mut self) {
548        let meta = &mut self.meta;
549        // This needs to be acquire ordering so that we see writes that might have just happened
550        // in another thread when the future was polled.
551        let state = meta.state.load(Acquire);
552        if state & DONE == 0 {
553            // SAFETY: The state isn't DONE so we must drop the future.
554            unsafe {
555                (meta.vtable.drop_future)(meta.into());
556            }
557        } else if state & RESULT_TAKEN == 0 {
558            // SAFETY: The result hasn't been taken so we must drop the result.
559            unsafe {
560                (meta.vtable.drop_result)(meta.into());
561            }
562        }
563    }
564}
565
566pub struct BorrowedWaker<'a>(std::task::Waker, PhantomData<&'a ()>);
567
568impl Deref for BorrowedWaker<'_> {
569    type Target = Waker;
570
571    fn deref(&self) -> &Self::Target {
572        &self.0
573    }
574}
575
576impl Borrow<usize> for AtomicFutureHandle<'static> {
577    fn borrow(&self) -> &usize {
578        &self.meta().id
579    }
580}
581
582impl Hash for AtomicFutureHandle<'static> {
583    fn hash<H: Hasher>(&self, state: &mut H) {
584        self.meta().id.hash(state);
585    }
586}
587
588impl PartialEq for AtomicFutureHandle<'static> {
589    fn eq(&self, other: &Self) -> bool {
590        self.meta().id == other.meta().id
591    }
592}
593
594impl Eq for AtomicFutureHandle<'static> {}
595
596struct Bomb;
597impl Drop for Bomb {
598    fn drop(&mut self) {
599        std::process::abort();
600    }
601}