fuchsia_async/runtime/fuchsia/executor/
scope.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::condition::{Condition, ConditionGuard, WakerEntry};
9use crate::EHandle;
10use fuchsia_sync::Mutex;
11use futures::Stream;
12use pin_project_lite::pin_project;
13use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
14use state::{JoinResult, ScopeState, ScopeWaker, Status, WakeVec};
15use std::any::Any;
16use std::borrow::Borrow;
17use std::collections::hash_map::Entry;
18use std::collections::hash_set;
19use std::future::{Future, IntoFuture};
20use std::marker::PhantomData;
21use std::mem::{self, ManuallyDrop};
22use std::ops::{Deref, DerefMut};
23use std::pin::Pin;
24use std::sync::{Arc, Weak};
25use std::task::{ready, Context, Poll, Waker};
26use std::{fmt, hash};
27
28//
29// # Public API
30//
31
/// A scope for managing async tasks. This scope is aborted when dropped.
///
/// Scopes are how fuchsia-async implements [structured concurrency][sc]. Every
/// task is spawned on a scope, and runs until either the task completes or the
/// scope is cancelled or aborted. In addition to owning tasks, scopes may own
/// child scopes, forming a nested structure.
///
/// Scopes are usually joined or cancelled when the owning code is done with
/// them. This makes it easier to reason about when a background task might
/// still be running. Note that in multithreaded contexts it is safer to cancel
/// and await a scope explicitly than to drop it, because the destructor is not
/// synchronized with other threads that might be running a task.
///
/// [`Task::spawn`][crate::Task::spawn] and related APIs spawn on the root scope
/// of the executor. New code is encouraged to spawn directly on scopes instead,
/// passing their handles as a way of documenting when a function might spawn
/// tasks that run in the background and reasoning about their side effects.
///
/// ## Scope lifecycle
///
/// When a scope is created it is open, meaning it accepts new tasks. Scopes are
/// closed when one of the following happens:
///
/// 1. When [`close()`][Scope::close] is called.
/// 2. When the scope is aborted or dropped, the scope is closed immediately.
/// 3. When the scope is cancelled, the scope is closed when all active guards
///    are dropped.
/// 4. When the scope is joined and all tasks complete, the scope is closed
///    before the join future resolves.
///
/// When a scope is closed it no longer accepts tasks. Tasks spawned on a
/// closed scope are dropped immediately, and their [`Task`][crate::Task] or
/// [`JoinHandle`][crate::JoinHandle] futures never resolve. This applies
/// transitively to all child scopes. Closed scopes cannot currently be
/// reopened.
///
/// Scopes can also be detached, in which case they are never closed, and run
/// until the completion of all tasks.
///
/// [sc]: https://en.wikipedia.org/wiki/Structured_concurrency
#[must_use = "Scopes should be explicitly awaited or cancelled"]
#[derive(Debug)]
pub struct Scope {
    // The handle to the underlying scope. `Scope` derefs to it, and `Drop`
    // aborts the scope through it.
    // LINT.IfChange
    inner: ScopeHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
79
80impl Default for Scope {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl Scope {
87    /// Create a new scope.
88    ///
89    /// The returned scope is a child of the current scope.
90    ///
91    /// # Panics
92    ///
93    /// May panic if not called in the context of an executor (e.g. within a
94    /// call to [`run`][crate::SendExecutor::run]).
95    pub fn new() -> Scope {
96        ScopeHandle::with_current(|handle| handle.new_child())
97    }
98
99    /// Create a new scope with a name.
100    ///
101    /// The returned scope is a child of the current scope.
102    ///
103    /// # Panics
104    ///
105    /// May panic if not called in the context of an executor (e.g. within a
106    /// call to [`run`][crate::SendExecutor::run]).
107    pub fn new_with_name(name: impl Into<String>) -> Scope {
108        ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
109    }
110
111    /// Get the scope of the current task, or the global scope if there is no task
112    /// being polled.
113    ///
114    /// # Panics
115    ///
116    /// May panic if not called in the context of an executor (e.g. within a
117    /// call to [`run`][crate::SendExecutor::run]).
118    pub fn current() -> ScopeHandle {
119        ScopeHandle::with_current(|handle| handle.clone())
120    }
121
122    /// Get the global scope of the executor.
123    ///
124    /// This can be used to spawn tasks that live as long as the executor.
125    /// Usually, this means until the end of the program or test. This should
126    /// only be done for tasks where this is expected. If in doubt, spawn on a
127    /// shorter lived scope instead.
128    ///
129    /// In code that uses scopes, you are strongly encouraged to use this API
130    /// instead of the spawn APIs on [`Task`][crate::Task].
131    ///
132    /// All scopes are descendants of the global scope.
133    ///
134    /// # Panics
135    ///
136    /// May panic if not called in the context of an executor (e.g. within a
137    /// call to [`run`][crate::SendExecutor::run]).
138    pub fn global() -> ScopeHandle {
139        EHandle::local().global_scope().clone()
140    }
141
142    /// Create a child scope.
143    pub fn new_child(&self) -> Scope {
144        self.inner.new_child()
145    }
146
147    /// Create a child scope with a name.
148    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
149        self.inner.new_child_with_name(name.into())
150    }
151
152    /// Returns the name of the scope.
153    pub fn name(&self) -> &str {
154        &self.inner.inner.name
155    }
156
157    /// Create a [`ScopeHandle`] that may be used to spawn tasks on this scope.
158    ///
159    /// This is a shorthand for `scope.as_handle().clone()`.
160    ///
161    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
162    /// available. Note that you should _not_ call `scope.clone()`, even though
163    /// the compiler allows it due to the Deref impl. Call this method instead.
164    pub fn to_handle(&self) -> ScopeHandle {
165        self.inner.clone()
166    }
167
168    /// Get a reference to a [`ScopeHandle`] that may be used to spawn tasks on
169    /// this scope.
170    ///
171    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
172    /// available. If you have a `Scope` but need a `&ScopeHandle`, prefer
173    /// calling this method over the less readable `&*scope`.
174    pub fn as_handle(&self) -> &ScopeHandle {
175        &self.inner
176    }
177
178    /// Wait for all tasks in the scope and its children to complete.
179    ///
180    /// New tasks will be accepted on the scope until every task completes and
181    /// this future resolves.
182    ///
183    /// Note that you can await a scope directly because it implements
184    /// `IntoFuture`. `scope.join().await` is a more explicit form of
185    /// `scope.await`.
186    pub fn join(self) -> Join {
187        Join::new(self)
188    }
189
190    /// Stop accepting new tasks on the scope. Returns a future that waits for
191    /// every task on the scope to complete.
192    pub fn close(self) -> Join {
193        self.inner.close();
194        Join::new(self)
195    }
196
197    /// Cancel all tasks cooperatively in the scope and its children
198    /// recursively.
199    ///
200    /// `cancel` first gives a chance to all child tasks (including tasks of
201    /// child scopes) to shutdown cleanly if they're holding on to a
202    /// [`ScopeActiveGuard`]. Once no child tasks are holding on to guards, then
203    /// `cancel` behaves like [`Scope::abort`], dropping all tasks and stopping
204    /// them from running at the next yield point. A [`ScopeActiveGuard`]
205    /// provides a cooperative cancellation signal that is triggered by this
206    /// call, see its documentation for more details.
207    ///
208    /// Once the returned future resolves, no task on the scope will be polled
209    /// again.
210    ///
211    /// Cancelling a scope _does not_ immediately prevent new tasks from being
212    /// accepted. New tasks are accepted as long as there are
213    /// `ScopeActiveGuard`s for this scope.
214    pub fn cancel(self) -> Join {
215        self.inner.cancel_all_tasks();
216        Join::new(self)
217    }
218
219    /// Cancel all tasks in the scope and its children recursively.
220    ///
221    /// Once the returned future resolves, no task on the scope will be polled
222    /// again. Unlike [`Scope::cancel`], this doesn't send a cooperative
223    /// cancellation signal to tasks or child scopes.
224    ///
225    /// When a scope is aborted it immediately stops accepting tasks. Handles of
226    /// tasks spawned on the scope will pend forever.
227    ///
228    /// Dropping the `Scope` object is equivalent to calling this method and
229    /// discarding the returned future. Awaiting the future is preferred because
230    /// it eliminates the possibility of a task poll completing on another
231    /// thread after the scope object has been dropped, which can sometimes
232    /// result in surprising behavior.
233    pub fn abort(self) -> impl Future<Output = ()> {
234        self.inner.abort_all_tasks();
235        Join::new(self)
236    }
237
238    /// Detach the scope, allowing its tasks to continue running in the
239    /// background.
240    ///
241    /// Tasks of a detached scope are still subject to join and cancel
242    /// operations on parent scopes.
243    pub fn detach(self) {
244        // Use ManuallyDrop to destructure self, because Rust doesn't allow this
245        // for types which implement Drop.
246        let this = ManuallyDrop::new(self);
247        // SAFETY: this.inner is obviously valid, and we don't access `this`
248        // after moving.
249        mem::drop(unsafe { std::ptr::read(&this.inner) });
250    }
251}
252
253/// Abort the scope and all of its tasks. Prefer using the [`Scope::abort`]
254/// or [`Scope::join`] methods.
255impl Drop for Scope {
256    fn drop(&mut self) {
257        // Abort all tasks in the scope. Each task has a strong reference to the ScopeState,
258        // which will be dropped after all the tasks in the scope are dropped.
259
260        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
261        // here, but we cannot do that without either:
262        // - Sync drop support in AtomicFuture, or
263        // - The ability to reparent tasks, which requires atomic_arc or
264        //   acquiring a mutex during polling.
265        self.inner.abort_all_tasks();
266    }
267}
268
269impl IntoFuture for Scope {
270    type Output = ();
271    type IntoFuture = Join;
272    fn into_future(self) -> Self::IntoFuture {
273        self.join()
274    }
275}
276
277impl Deref for Scope {
278    type Target = ScopeHandle;
279    fn deref(&self) -> &Self::Target {
280        &self.inner
281    }
282}
283
284impl Borrow<ScopeHandle> for Scope {
285    fn borrow(&self) -> &ScopeHandle {
286        self
287    }
288}
289
pin_project! {
    /// Join operation for a [`Scope`].
    ///
    /// This is a future that resolves when all tasks on the scope are complete
    /// or have been cancelled. New tasks will be accepted on the scope until
    /// every task completes and this future resolves.
    ///
    /// When this object is dropped, the scope and all tasks in it are
    /// aborted (dropping the owned [`Scope`] aborts it).
    //
    // Note: The drop property is only true when S = Scope; it does not apply to
    // other (non-public) uses of this struct where S = ScopeHandle.
    pub struct Join<S = Scope> {
        // The scope being joined: an owned `Scope` or a `ScopeHandle`.
        scope: S,
        // Entry used to register this future's waker with the scope state
        // while the scope still has tasks.
        #[pin]
        waker_entry: WakerEntry<ScopeState>,
    }
}
308
309impl<S: Borrow<ScopeHandle>> Join<S> {
310    fn new(scope: S) -> Self {
311        let waker_entry = scope.borrow().inner.state.waker_entry();
312        Self { scope, waker_entry }
313    }
314
315    /// Aborts the scope. The future will resolve when all tasks have finished
316    /// polling.
317    ///
318    /// See [`Scope::abort`] for more details.
319    pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
320        self.scope.borrow().abort_all_tasks();
321        self
322    }
323
324    /// Cancel the scope. The future will resolve when all tasks have finished
325    /// polling.
326    ///
327    /// See [`Scope::cancel`] for more details.
328    pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
329        self.scope.borrow().cancel_all_tasks();
330        self
331    }
332}
333
334impl<S> Future for Join<S>
335where
336    S: Borrow<ScopeHandle>,
337{
338    type Output = ();
339    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
340        let this = self.project();
341        let mut state = Borrow::borrow(&*this.scope).lock();
342        if state.has_tasks() {
343            state.add_waker(this.waker_entry, cx.waker().clone());
344            Poll::Pending
345        } else {
346            state.mark_finished();
347            Poll::Ready(())
348        }
349    }
350}
351
/// Trait for things that can be spawned on to a scope.  There is a blanket implementation
/// below for futures that are `Send + 'static` and whose output is `Send + 'static`.
pub trait Spawnable {
    /// The type of value produced on completion.
    type Output;

    /// Converts to a task that can be spawned directly.
    ///
    /// `scope` is the scope the resulting task is associated with.
    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
}
361
362impl<F: Future + Send + 'static> Spawnable for F
363where
364    F::Output: Send + 'static,
365{
366    type Output = F::Output;
367
368    fn into_task(self, scope: ScopeHandle) -> TaskHandle {
369        scope.new_task(None, self)
370    }
371}
372
/// A handle to a scope, which may be used to spawn tasks.
///
/// Cloning a handle is cheap: it only clones the `Arc` that points at the
/// shared scope data.
///
/// ## Ownership and cycles
///
/// Tasks running on a `Scope` may hold a `ScopeHandle` to that scope. This does
/// not create an ownership cycle because the task will drop the `ScopeHandle`
/// once it completes or is cancelled.
///
/// Naturally, scopes containing tasks that never complete and that are never
/// cancelled will never be freed. Holding a `ScopeHandle` does not contribute to
/// this problem.
#[derive(Clone)]
pub struct ScopeHandle {
    // Shared scope data (executor, state and name).
    // LINT.IfChange
    inner: Arc<ScopeInner>,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
390
391impl ScopeHandle {
392    /// Create a child scope.
393    pub fn new_child(&self) -> Scope {
394        self.new_child_inner(String::new())
395    }
396
397    /// Create a child scope.
398    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
399        self.new_child_inner(name.into())
400    }
401
402    fn new_child_inner(&self, name: String) -> Scope {
403        let mut state = self.lock();
404        let child = ScopeHandle {
405            inner: Arc::new(ScopeInner {
406                executor: self.inner.executor.clone(),
407                state: Condition::new(ScopeState::new_child(
408                    self.clone(),
409                    &state,
410                    JoinResults::default().into(),
411                )),
412                name,
413            }),
414        };
415        let weak = child.downgrade();
416        state.insert_child(weak);
417        Scope { inner: child }
418    }
419
420    /// Spawn a new task on the scope.
421    // This does not have the must_use attribute because it's common to detach and the lifetime of
422    // the task is bound to the scope: when the scope is dropped, the task will be cancelled.
423    pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
424        let task = future.into_task(self.clone());
425        let task_id = task.id();
426        self.insert_task(task, false);
427        JoinHandle::new(self.clone(), task_id)
428    }
429
430    /// Spawn a new task on the scope of a thread local executor.
431    ///
432    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
433    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
434    pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
435        let task = self.new_local_task(None, future);
436        let id = task.id();
437        self.insert_task(task, false);
438        JoinHandle::new(self.clone(), id)
439    }
440
441    /// Like `spawn`, but for tasks that return a result.
442    ///
443    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
444    /// *cancelled*.
445    pub fn compute<T: Send + 'static>(
446        &self,
447        future: impl Spawnable<Output = T> + Send + 'static,
448    ) -> crate::Task<T> {
449        let task = future.into_task(self.clone());
450        let id = task.id();
451        self.insert_task(task, false);
452        JoinHandle::new(self.clone(), id).into()
453    }
454
455    /// Like `spawn`, but for tasks that return a result.
456    ///
457    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
458    /// *cancelled*.
459    ///
460    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
461    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
462    pub fn compute_local<T: 'static>(
463        &self,
464        future: impl Future<Output = T> + 'static,
465    ) -> crate::Task<T> {
466        let task = self.new_local_task(None, future);
467        let id = task.id();
468        self.insert_task(task, false);
469        JoinHandle::new(self.clone(), id).into()
470    }
471
472    pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
473        ScopeHandle {
474            inner: Arc::new(ScopeInner {
475                executor,
476                state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
477                name: "root".to_string(),
478            }),
479        }
480    }
481
482    /// Stop the scope from accepting new tasks.
483    ///
484    /// Note that unlike [`Scope::close`], this does not return a future that
485    /// waits for all tasks to complete. This could lead to resource leaks
486    /// because it is not uncommon to access a TaskGroup from a task running on
487    /// the scope itself. If such a task were to await a future returned by this
488    /// method it would suspend forever waiting for itself to complete.
489    pub fn close(&self) {
490        self.lock().close();
491    }
492
493    /// Cancel all the scope's tasks.
494    ///
495    /// Note that if this is called from within a task running on the scope, the
496    /// task will not resume from the next await point.
497    pub fn cancel(self) -> Join<Self> {
498        self.cancel_all_tasks();
499        Join::new(self)
500    }
501
502    /// Aborts all the scope's tasks.
503    ///
504    /// Note that if this is called from within a task running on the scope, the
505    /// task will not resume from the next await point.
506    pub fn abort(self) -> impl Future<Output = ()> {
507        self.abort_all_tasks();
508        Join::new(self)
509    }
510
511    /// Retrieves a [`ScopeActiveGuard`] for this scope.
512    ///
513    /// Note that this may fail if cancellation has already started for this
514    /// scope. In that case, the caller must assume any tasks from this scope
515    /// may be dropped at any yield point.
516    ///
517    /// Creating a [`ScopeActiveGuard`] is substantially more expensive than
518    /// just polling it, so callers should maintain the returned guard when
519    /// success is observed from this call for best performance.
520    ///
521    /// See [`Scope::cancel`] for details on cooperative cancellation behavior.
522    #[must_use]
523    pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
524        ScopeActiveGuard::new(self)
525    }
526
527    /// Returns true if the scope has been signaled to exit via
528    /// [`Scope::cancel`] or [`Scope::abort`].
529    pub fn is_cancelled(&self) -> bool {
530        self.lock().status().is_cancelled()
531    }
532
533    // Joining the scope could be allowed from a ScopeHandle, but the use case
534    // seems less common and more bug prone than cancelling. We don't allow this
535    // for the same reason we don't return a future from close().
536
537    /// Wait for there to be no tasks. This is racy: as soon as this returns it is possible for
538    /// another task to have been spawned on this scope.
539    pub async fn on_no_tasks(&self) {
540        self.inner
541            .state
542            .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
543            .await;
544    }
545
546    /// Wait for there to be no tasks and no guards. This is racy: as soon as this returns it is
547    /// possible for another task to have been spawned on this scope, or for there to be guards.
548    pub async fn on_no_tasks_and_guards(&self) {
549        self.inner
550            .state
551            .when(|state| {
552                if state.has_tasks() || state.guards() > 0 {
553                    Poll::Pending
554                } else {
555                    Poll::Ready(())
556                }
557            })
558            .await;
559    }
560
561    /// Wake all the scope's tasks so their futures will be polled again.
562    pub fn wake_all_with_active_guard(&self) {
563        self.lock().wake_all_with_active_guard();
564    }
565
566    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
567    /// That must be done separately.
568    pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
569        &self,
570        id: Option<usize>,
571        fut: Fut,
572    ) -> AtomicFutureHandle<'a>
573    where
574        Fut::Output: Send,
575    {
576        AtomicFutureHandle::new(
577            Some(self.clone()),
578            id.unwrap_or_else(|| self.executor().next_task_id()),
579            fut,
580        )
581    }
582
583    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
584    /// That must be done separately.
585    pub(crate) fn new_local_task<'a>(
586        &self,
587        id: Option<usize>,
588        fut: impl Future + 'a,
589    ) -> AtomicFutureHandle<'a> {
590        // Check that the executor is local.
591        if !self.executor().is_local() {
592            panic!(
593                "Error: called `new_local_task` on multithreaded executor. \
594                 Use `spawn` or a `LocalExecutor` instead."
595            );
596        }
597
598        // SAFETY: We've confirmed that the futures here will never be used across multiple threads,
599        // so the Send requirements that `new_local` requires should be met.
600        unsafe {
601            AtomicFutureHandle::new_local(
602                Some(self.clone()),
603                id.unwrap_or_else(|| self.executor().next_task_id()),
604                fut,
605            )
606        }
607    }
608}
609
610impl fmt::Debug for ScopeHandle {
611    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
612        f.debug_struct("Scope").field("name", &self.inner.name).finish()
613    }
614}
615
/// Similar to a scope but all futures spawned on the scope *must* finish with the same result type.
/// That allows the scope to return a stream of results. Attempting to spawn tasks using
/// `ScopeHandle::spawn` (or similar) will result in tasks that are immediately dropped (just as if
/// the scope was closed).  Like a regular scope, the scope can be closed, at which point the stream
/// will terminate once all the tasks have finished.  This is designed to be a fairly close drop-in
/// replacement to `FuturesUnordered`, the principal difference being that the tasks run in parallel
/// rather than just concurrently.  Another difference is that the futures don't need to be the same
/// type; only the outputs do.  In all other respects, the scope operates like a regular scope i.e.
/// it can have children, you can join them, cancel them, etc.
pub struct ScopeStream<R> {
    // Handle to the underlying scope; `ScopeStream` derefs to it.
    inner: ScopeHandle,
    // Results (and the stream's waker) shared with the scope's results sink.
    stream: Arc<Mutex<ResultsStreamInner<R>>>,
}
629
630impl<R: Send + 'static> ScopeStream<R> {
631    /// Creates a new scope stream.
632    ///
633    /// The returned scope stream is a child of the current scope.
634    ///
635    /// # Panics
636    ///
637    /// May panic if not called in the context of an executor (e.g. within a
638    /// call to [`run`][crate::SendExecutor::run]).
639    pub fn new() -> (Self, ScopeStreamHandle<R>) {
640        Self::new_inner(String::new())
641    }
642
643    /// Creates a new scope stream with a name.
644    ///
645    /// The returned scope stream is a child of the current scope.
646    ///
647    /// # Panics
648    ///
649    /// May panic if not called in the context of an executor (e.g. within a
650    /// call to [`run`][crate::SendExecutor::run]).
651    pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
652        Self::new_inner(name.into())
653    }
654
655    fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
656        let this = ScopeHandle::with_current(|handle| {
657            let mut state = handle.lock();
658            let stream = Arc::default();
659            let child = ScopeHandle {
660                inner: Arc::new(ScopeInner {
661                    executor: handle.executor().clone(),
662                    state: Condition::new(ScopeState::new_child(
663                        handle.clone(),
664                        &state,
665                        Box::new(ResultsStream { inner: Arc::clone(&stream) }),
666                    )),
667                    name,
668                }),
669            };
670            let weak = child.downgrade();
671            state.insert_child(weak);
672            ScopeStream { inner: child, stream }
673        });
674        let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
675        (this, handle)
676    }
677}
678
679impl<R> Drop for ScopeStream<R> {
680    fn drop(&mut self) {
681        // Cancel all tasks in the scope. Each task has a strong reference to the ScopeState,
682        // which will be dropped after all the tasks in the scope are dropped.
683
684        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
685        // here, but we cannot do that without either:
686        // - Sync drop support in AtomicFuture, or
687        // - The ability to reparent tasks, which requires atomic_arc or
688        //   acquiring a mutex during polling.
689        self.inner.abort_all_tasks();
690    }
691}
692
693impl<R: Send + 'static> Stream for ScopeStream<R> {
694    type Item = R;
695
696    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
697        let mut stream_inner = self.stream.lock();
698        match stream_inner.results.pop() {
699            Some(result) => Poll::Ready(Some(result)),
700            None => {
701                // Lock ordering: when results are posted, the state lock is taken first, so we must
702                // do the same.
703                drop(stream_inner);
704                let state = self.inner.lock();
705                let mut stream_inner = self.stream.lock();
706                match stream_inner.results.pop() {
707                    Some(result) => Poll::Ready(Some(result)),
708                    None => {
709                        if state.has_tasks() {
710                            stream_inner.waker = Some(cx.waker().clone());
711                            Poll::Pending
712                        } else {
713                            Poll::Ready(None)
714                        }
715                    }
716                }
717            }
718        }
719    }
720}
721
722impl<R> Deref for ScopeStream<R> {
723    type Target = ScopeHandle;
724    fn deref(&self) -> &Self::Target {
725        &self.inner
726    }
727}
728
729impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
730    fn borrow(&self) -> &ScopeHandle {
731        self
732    }
733}
734
735impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
736    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
737        let (stream, handle) = ScopeStream::new();
738        for fut in iter {
739            handle.push(fut);
740        }
741        stream.close();
742        stream
743    }
744}
745
746#[derive(Clone)]
747pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
748
749impl<R: Send> ScopeStreamHandle<R> {
750    pub fn push(&self, future: impl Spawnable<Output = R>) {
751        self.0.insert_task(future.into_task(self.0.clone()), true);
752    }
753}
754
/// Holds a guard on the creating scope, holding off cancellation.
///
/// `ScopeActiveGuard` allows [`Scope`]s to perform cooperative cancellation.
/// [`ScopeActiveGuard::on_cancel`] returns a future that resolves when
/// [`Scope::cancel`] or [`ScopeHandle::cancel`] is called. That is the signal
/// sent to cooperative tasks to stop doing work and finish.
///
/// A `ScopeActiveGuard` is obtained via [`ScopeHandle::active_guard`].
/// `ScopeActiveGuard` releases the guard on the originating scope on drop.
/// Cloning a guard acquires an additional guard on the same scope.
#[derive(Debug)]
#[must_use]
pub struct ScopeActiveGuard(ScopeHandle);
767
768impl Deref for ScopeActiveGuard {
769    type Target = ScopeHandle;
770    fn deref(&self) -> &Self::Target {
771        &self.0
772    }
773}
774
775impl Drop for ScopeActiveGuard {
776    fn drop(&mut self) {
777        let Self(scope) = self;
778        scope.release_cancel_guard();
779    }
780}
781
782impl Clone for ScopeActiveGuard {
783    fn clone(&self) -> Self {
784        self.0.lock().acquire_cancel_guard(1);
785        Self(self.0.clone())
786    }
787}
788
789impl ScopeActiveGuard {
790    /// Returns a borrow of the scope handle associated with this guard.
791    pub fn as_handle(&self) -> &ScopeHandle {
792        &self.0
793    }
794
795    /// Returns a clone of the scope handle associated with this guard.
796    pub fn to_handle(&self) -> ScopeHandle {
797        self.0.clone()
798    }
799
800    /// Retrieves a future from this guard that can be polled on for
801    /// cancellation.
802    ///
803    /// The returned future resolves when the scope is cancelled. Callers should
804    /// perform teardown and drop the guard when done.
805    pub async fn on_cancel(&self) {
806        self.0
807            .inner
808            .state
809            .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
810            .await
811    }
812
813    fn new(scope: &ScopeHandle) -> Option<Self> {
814        if scope.lock().acquire_cancel_guard_if_not_finished() {
815            Some(Self(scope.clone()))
816        } else {
817            None
818        }
819    }
820}
821
822//
823// # Internal API
824//
825
/// A weak reference to a scope.
///
/// Holding one does not keep the scope alive; use
/// [`WeakScopeHandle::upgrade`] to obtain a [`ScopeHandle`] while the scope
/// still exists. Hashing and equality are based on the pointer identity of the
/// referenced scope.
#[derive(Clone)]
struct WeakScopeHandle {
    // Weak counterpart of `ScopeHandle::inner`.
    inner: Weak<ScopeInner>,
}
831
832impl WeakScopeHandle {
833    /// Upgrades to a [`ScopeHandle`] if the scope still exists.
834    pub fn upgrade(&self) -> Option<ScopeHandle> {
835        self.inner.upgrade().map(|inner| ScopeHandle { inner })
836    }
837}
838
839impl hash::Hash for WeakScopeHandle {
840    fn hash<H: hash::Hasher>(&self, state: &mut H) {
841        Weak::as_ptr(&self.inner).hash(state);
842    }
843}
844
845impl PartialEq for WeakScopeHandle {
846    fn eq(&self, other: &Self) -> bool {
847        Weak::ptr_eq(&self.inner, &other.inner)
848    }
849}
850
impl Eq for WeakScopeHandle {
    // Equality is delegated to Weak::ptr_eq (see the PartialEq impl).
    // Weak::ptr_eq should return consistent results, even when the inner value
    // has been dropped.
}
855
856// This module exists as a privacy boundary so that we can make sure any
857// operation that might cause the scope to finish also wakes its waker.
858mod state {
859    use super::*;
860
    /// The mutable bookkeeping of a single scope.
    ///
    /// Most fields are private to this module so that they can only be changed
    /// through the methods defined here.
    pub struct ScopeState {
        /// The parent scope, or `None` for a root scope.
        pub parent: Option<ScopeHandle>,
        // LINT.IfChange
        /// Weak handles to the child scopes registered via `insert_child`.
        children: HashSet<WeakScopeHandle>,
        /// The tasks inserted directly into this scope via `insert_task`.
        all_tasks: HashSet<TaskHandle>,
        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
        /// The number of children that transitively contain tasks, plus one for
        /// this scope if it directly contains tasks.
        subscopes_with_tasks: u32,
        /// Whether new tasks may still be inserted. Cleared by `close` and
        /// `mark_finished`.
        can_spawn: bool,
        /// The number of cancel guards currently held on this scope.
        guards: u32,
        /// Where the scope is in its active/cancelling/finished lifecycle.
        status: Status,
        /// Wakers/results for joining each task.
        pub results: Box<dyn Results>,
    }
876
    /// The join state recorded for a single task.
    pub enum JoinResult {
        /// The waker to notify once the task finishes.
        Waker(Waker),
        /// The finished task, kept until its result is collected or detached.
        Result(TaskHandle),
    }
881
882    #[repr(u8)] // So zxdb can read the status.
883    #[derive(Default, Debug, Clone, Copy)]
884    pub enum Status {
885        #[default]
886        /// The scope is active.
887        Active,
888        /// The scope has been signalled to cancel and is waiting for all guards
889        /// to be released.
890        PendingCancellation,
891        /// The scope is not accepting new tasks and all tasks have been
892        /// scheduled to be dropped.
893        Finished,
894    }
895
896    impl Status {
897        /// Returns whether this records a cancelled state.
898        pub fn is_cancelled(&self) -> bool {
899            match self {
900                Self::Active => false,
901                Self::PendingCancellation | Self::Finished => true,
902            }
903        }
904    }
905
906    impl ScopeState {
907        pub fn new_root(results: Box<impl Results>) -> Self {
908            Self {
909                parent: None,
910                children: Default::default(),
911                all_tasks: Default::default(),
912                subscopes_with_tasks: 0,
913                can_spawn: true,
914                guards: 0,
915                status: Default::default(),
916                results,
917            }
918        }
919
920        pub fn new_child(
921            parent_handle: ScopeHandle,
922            parent_state: &Self,
923            results: Box<impl Results>,
924        ) -> Self {
925            let (status, can_spawn) = match parent_state.status {
926                Status::Active => (Status::Active, parent_state.can_spawn),
927                Status::Finished | Status::PendingCancellation => (Status::Finished, false),
928            };
929            Self {
930                parent: Some(parent_handle),
931                children: Default::default(),
932                all_tasks: Default::default(),
933                subscopes_with_tasks: 0,
934                can_spawn,
935                guards: 0,
936                status,
937                results,
938            }
939        }
940    }
941
942    impl ScopeState {
        /// Returns the set of tasks held directly by this scope.
        pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
            &self.all_tasks
        }
946
947        /// Attempts to add a task to the scope. Returns the task if the scope cannot accept a task
948        /// (since it isn't safe to drop the task whilst the lock is held).
949        pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
950            if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
951                return Some(task);
952            }
953            if self.all_tasks.is_empty() && !self.register_first_task() {
954                return Some(task);
955            }
956            task.wake();
957            assert!(self.all_tasks.insert(task));
958            None
959        }
960
        /// Returns the weak handles of the child scopes registered on this
        /// scope.
        pub fn children(&self) -> &HashSet<WeakScopeHandle> {
            &self.children
        }
964
        /// Registers `child` as a child of this scope.
        pub fn insert_child(&mut self, child: WeakScopeHandle) {
            self.children.insert(child);
        }
968
        /// Removes the child identified by `child` from this scope.
        ///
        /// # Panics
        ///
        /// Panics if the child was not registered while other children still
        /// are.
        pub fn remove_child(&mut self, child: &PtrKey) {
            let found = self.children.remove(child);
            // This should always succeed unless the scope is being dropped
            // (in which case children will be empty).
            assert!(found || self.children.is_empty());
        }
975
        /// Returns a copy of the scope's current status.
        pub fn status(&self) -> Status {
            self.status
        }
979
        /// Returns the number of cancel guards currently held on this scope.
        pub fn guards(&self) -> u32 {
            self.guards
        }
983
        /// Stops the scope from accepting new tasks. The status is left
        /// unchanged.
        pub fn close(&mut self) {
            self.can_spawn = false;
        }
987
988        pub fn mark_finished(&mut self) {
989            self.can_spawn = false;
990            self.status = Status::Finished;
991        }
992
993        pub fn has_tasks(&self) -> bool {
994            self.subscopes_with_tasks > 0
995        }
996
997        pub fn wake_all_with_active_guard(&mut self) {
998            let mut count = 0;
999            for task in &self.all_tasks {
1000                if task.wake_with_active_guard() {
1001                    count += 1;
1002                }
1003            }
1004            self.acquire_cancel_guard(count);
1005        }
1006
1007        pub fn abort_tasks_and_mark_finished(&mut self) {
1008            for task in self.all_tasks() {
1009                if task.abort() {
1010                    task.scope().executor().ready_tasks.push(task.clone());
1011                }
1012                // Don't bother dropping tasks that are finished; the entire
1013                // scope is going to be dropped soon anyway.
1014            }
1015            self.mark_finished();
1016        }
1017
1018        pub fn wake_wakers_and_mark_pending(
1019            this: &mut ConditionGuard<'_, ScopeState>,
1020            wakers: &mut Vec<Waker>,
1021        ) {
1022            wakers.extend(this.drain_wakers());
1023            this.status = Status::PendingCancellation;
1024        }
1025
1026        /// Registers our first task with the parent scope.
1027        ///
1028        /// Returns false if the scope is not allowed to accept a task.
1029        #[must_use]
1030        fn register_first_task(&mut self) -> bool {
1031            if !self.can_spawn {
1032                return false;
1033            }
1034            let can_spawn = match &self.parent {
1035                Some(parent) => {
1036                    // If our parent already knows we have tasks, we can always
1037                    // spawn. Otherwise, we have to recurse.
1038                    self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
1039                }
1040                None => true,
1041            };
1042            if can_spawn {
1043                self.subscopes_with_tasks += 1;
1044                debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
1045            };
1046            can_spawn
1047        }
1048
1049        fn on_last_task_removed(
1050            this: &mut ConditionGuard<'_, ScopeState>,
1051            num_wakers_hint: usize,
1052            wakers: &mut Vec<Waker>,
1053        ) {
1054            debug_assert!(this.subscopes_with_tasks > 0);
1055            this.subscopes_with_tasks -= 1;
1056            if this.subscopes_with_tasks > 0 {
1057                wakers.reserve(num_wakers_hint);
1058                return;
1059            }
1060
1061            match &this.parent {
1062                Some(parent) => {
1063                    Self::on_last_task_removed(
1064                        &mut parent.lock(),
1065                        num_wakers_hint + this.waker_count(),
1066                        wakers,
1067                    );
1068                }
1069                None => wakers.reserve(num_wakers_hint),
1070            };
1071            wakers.extend(this.drain_wakers());
1072        }
1073
1074        /// Acquires a cancel guard IFF we're not in the finished state.
1075        ///
1076        /// Returns `true` if a guard was acquired.
1077        pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
1078            match self.status {
1079                Status::Active | Status::PendingCancellation => {
1080                    self.acquire_cancel_guard(1);
1081                    true
1082                }
1083                Status::Finished => false,
1084            }
1085        }
1086
1087        pub fn acquire_cancel_guard(&mut self, count: u32) {
1088            if count == 0 {
1089                return;
1090            }
1091            if self.guards == 0 {
1092                if let Some(parent) = self.parent.as_ref() {
1093                    parent.acquire_cancel_guard();
1094                }
1095            }
1096            self.guards += count;
1097        }
1098
1099        pub fn release_cancel_guard(
1100            this: &mut ConditionGuard<'_, Self>,
1101            wake_vec: &mut WakeVec,
1102            mut waker_count: usize,
1103        ) {
1104            this.guards = this.guards.checked_sub(1).expect("released non-acquired guard");
1105            if this.guards == 0 {
1106                waker_count += this.waker_count();
1107                this.on_zero_guards(wake_vec, waker_count);
1108                wake_vec.0.extend(this.drain_wakers())
1109            } else {
1110                wake_vec.0.reserve_exact(waker_count);
1111            }
1112        }
1113
1114        fn on_zero_guards(&mut self, wake_vec: &mut WakeVec, waker_count: usize) {
1115            match self.status {
1116                Status::Active => {}
1117                Status::PendingCancellation => {
1118                    self.abort_tasks_and_mark_finished();
1119                }
1120                // Acquiring and releasing guards post finished state is a
1121                // no-op.
1122                Status::Finished => {}
1123            }
1124            if let Some(parent) = &self.parent {
1125                ScopeState::release_cancel_guard(&mut parent.lock(), wake_vec, waker_count);
1126            }
1127        }
1128    }
1129
1130    #[derive(Default)]
1131    pub struct WakeVec(Vec<Waker>);
1132
1133    impl Drop for WakeVec {
1134        fn drop(&mut self) {
1135            for waker in self.0.drain(..) {
1136                waker.wake();
1137            }
1138        }
1139    }
1140
    /// A locked scope state paired with the wakers collected while operating
    /// on it.
    ///
    /// Struct fields are dropped in declaration order, so the lock guard is
    /// released before the `WakeVec` wakes the collected wakers.
    // WakeVec *must* come after the guard because we want the guard to be dropped first.
    pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1143
1144    impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1145        fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1146            Self(value, WakeVec::default())
1147        }
1148    }
1149
1150    impl ScopeWaker<'_> {
1151        pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
1152            let task = self.all_tasks.take(&id);
1153            if task.is_some() {
1154                self.on_task_removed(0);
1155            }
1156            task
1157        }
1158
1159        pub fn task_did_finish(&mut self, id: usize) {
1160            if let Some(task) = self.all_tasks.take(&id) {
1161                self.on_task_removed(1);
1162                if !task.is_detached() {
1163                    let maybe_waker = self.results.task_did_finish(task);
1164                    self.1 .0.extend(maybe_waker);
1165                }
1166            }
1167        }
1168
1169        pub fn set_closed_and_drain(
1170            &mut self,
1171        ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
1172            self.close();
1173            let all_tasks = std::mem::take(&mut self.all_tasks);
1174            let results = self.results.take();
1175            if !all_tasks.is_empty() {
1176                self.on_task_removed(0)
1177            }
1178            let children = self.children.drain();
1179            (all_tasks, results, children)
1180        }
1181
1182        fn on_task_removed(&mut self, num_wakers_hint: usize) {
1183            if self.all_tasks.is_empty() {
1184                ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1 .0)
1185            }
1186        }
1187
1188        pub fn wake_wakers_and_mark_pending(&mut self) {
1189            let Self(state, wakers) = self;
1190            ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
1191        }
1192    }
1193
1194    impl<'a> Deref for ScopeWaker<'a> {
1195        type Target = ConditionGuard<'a, ScopeState>;
1196
1197        fn deref(&self) -> &Self::Target {
1198            &self.0
1199        }
1200    }
1201
1202    impl DerefMut for ScopeWaker<'_> {
1203        fn deref_mut(&mut self) -> &mut Self::Target {
1204            &mut self.0
1205        }
1206    }
1207}
1208
/// The shared data behind scope handles.
struct ScopeInner {
    // The executor associated with the scope (see `ScopeHandle::executor`).
    executor: Arc<Executor>,
    // The scope's state; accessed by locking (see `ScopeHandle::lock`).
    state: Condition<ScopeState>,
    // The name given to the scope.
    name: String,
}
1214
1215impl Drop for ScopeInner {
1216    fn drop(&mut self) {
1217        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
1218        // This also complies with the correctness requirements of
1219        // HashSet::remove because the implementations of Hash and Eq match
1220        // between PtrKey and WeakScopeHandle.
1221        let key = unsafe { &*(self as *const _ as *const PtrKey) };
1222        let state = self.state.lock();
1223        if let Some(parent) = &state.parent {
1224            let mut wake_vec = WakeVec::default();
1225            let mut parent_state = parent.lock();
1226            if state.guards() != 0 {
1227                ScopeState::release_cancel_guard(&mut parent_state, &mut wake_vec, 0);
1228            }
1229            parent_state.remove_child(key);
1230        }
1231    }
1232}
1233
1234impl ScopeHandle {
1235    fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
1236        super::common::TaskHandle::with_current(|task| match task {
1237            Some(task) => f(task.scope()),
1238            None => f(EHandle::local().global_scope()),
1239        })
1240    }
1241
    /// Locks this scope's state and returns the guard.
    fn lock(&self) -> ConditionGuard<'_, ScopeState> {
        self.inner.state.lock()
    }
1245
1246    fn downgrade(&self) -> WeakScopeHandle {
1247        WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
1248    }
1249
    /// Returns the executor associated with this scope.
    #[inline(always)]
    pub(crate) fn executor(&self) -> &Arc<Executor> {
        &self.inner.executor
    }
1254
1255    /// Marks the task as detached.
1256    pub(crate) fn detach(&self, task_id: usize) {
1257        let _maybe_task = {
1258            let mut state = self.lock();
1259            if let Some(task) = state.all_tasks().get(&task_id) {
1260                task.detach();
1261            }
1262            state.results.detach(task_id)
1263        };
1264    }
1265
1266    /// Aborts the task.
1267    ///
1268    /// # Safety
1269    ///
1270    /// The caller must guarantee that `R` is the correct type.
1271    pub(crate) unsafe fn abort_task<R>(&self, task_id: usize) -> Option<R> {
1272        let mut state = self.lock();
1273        if let Some(task) = state.results.detach(task_id) {
1274            drop(state);
1275            return task.take_result();
1276        }
1277        state.all_tasks().get(&task_id).and_then(|task| {
1278            if task.abort() {
1279                self.inner.executor.ready_tasks.push(task.clone());
1280            }
1281            task.take_result()
1282        })
1283    }
1284
1285    /// Aborts and detaches the task.
1286    pub(crate) fn abort_and_detach(&self, task_id: usize) {
1287        let _tasks = {
1288            let mut state = ScopeWaker::from(self.lock());
1289            let maybe_task1 = state.results.detach(task_id);
1290            let mut maybe_task2 = None;
1291            if let Some(task) = state.all_tasks().get(&task_id) {
1292                match task.abort_and_detach() {
1293                    AbortAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
1294                    AbortAndDetachResult::AddToRunQueue => {
1295                        self.inner.executor.ready_tasks.push(task.clone());
1296                    }
1297                    AbortAndDetachResult::Pending => {}
1298                }
1299            }
1300            (maybe_task1, maybe_task2)
1301        };
1302    }
1303
1304    /// Polls for a join result for the given task ID.
1305    ///
1306    /// # Safety
1307    ///
1308    /// The caller must guarantee that `R` is the correct type.
1309    pub(crate) unsafe fn poll_join_result<R>(
1310        &self,
1311        task_id: usize,
1312        cx: &mut Context<'_>,
1313    ) -> Poll<R> {
1314        let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1315        match task.take_result() {
1316            Some(result) => Poll::Ready(result),
1317            None => {
1318                // The task has been aborted so all we can do is forever return pending.
1319                Poll::Pending
1320            }
1321        }
1322    }
1323
1324    /// Polls for the task to be aborted.
1325    pub(crate) unsafe fn poll_aborted<R>(
1326        &self,
1327        task_id: usize,
1328        cx: &mut Context<'_>,
1329    ) -> Poll<Option<R>> {
1330        let task = self.lock().results.poll_join_result(task_id, cx);
1331        task.map(|task| task.take_result())
1332    }
1333
1334    pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
1335        let returned_task = self.lock().insert_task(task, for_stream);
1336        returned_task.is_none()
1337    }
1338
1339    /// Drops the specified task.
1340    ///
1341    /// The main task by the single-threaded executor might not be 'static, so we use this to drop
1342    /// the task and make sure we meet lifetime guarantees.  Note that removing the task from our
1343    /// task list isn't sufficient; we must make sure the future running in the task is dropped.
1344    ///
1345    /// # Safety
1346    ///
1347    /// This is unsafe because of the call to `drop_future_unchecked` which requires that no
1348    /// thread is currently polling the task.
1349    pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
1350        let mut state = ScopeWaker::from(self.lock());
1351        let task = state.take_task(task_id);
1352        if let Some(task) = task {
1353            task.drop_future_unchecked();
1354        }
1355    }
1356
1357    pub(super) fn task_did_finish(&self, id: usize) {
1358        let mut state = ScopeWaker::from(self.lock());
1359        state.task_did_finish(id);
1360    }
1361
1362    /// Visits scopes by state. If the callback returns `true`, children will
1363    /// be visited.
1364    fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
1365        let mut scopes = vec![self.clone()];
1366        while let Some(scope) = scopes.pop() {
1367            let mut scope_waker = ScopeWaker::from(scope.lock());
1368            if callback(&mut scope_waker) {
1369                scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
1370            }
1371        }
1372    }
1373
    /// Locks the scope and acquires a single cancel guard on it.
    fn acquire_cancel_guard(&self) {
        self.lock().acquire_cancel_guard(1)
    }
1377
1378    pub(crate) fn release_cancel_guard(&self) {
1379        let mut wake_vec = WakeVec::default();
1380        ScopeState::release_cancel_guard(&mut self.lock(), &mut wake_vec, 0);
1381    }
1382
1383    /// Cancels tasks in this scope and all child scopes.
1384    fn cancel_all_tasks(&self) {
1385        self.visit_scopes_locked(|state| {
1386            match state.status() {
1387                Status::Active => {
1388                    if state.guards() == 0 {
1389                        state.abort_tasks_and_mark_finished();
1390                    } else {
1391                        state.wake_wakers_and_mark_pending();
1392                    }
1393                    true
1394                }
1395                Status::PendingCancellation => {
1396                    // If we're already pending cancellation, don't wake all
1397                    // tasks. A single wake should be enough here. More
1398                    // wakes on further calls probably hides bugs.
1399                    true
1400                }
1401                Status::Finished => {
1402                    // Already finished.
1403                    false
1404                }
1405            }
1406        });
1407    }
1408
1409    /// Aborts tasks in this scope and all child scopes.
1410    fn abort_all_tasks(&self) {
1411        self.visit_scopes_locked(|state| match state.status() {
1412            Status::Active | Status::PendingCancellation => {
1413                state.abort_tasks_and_mark_finished();
1414                true
1415            }
1416            Status::Finished => false,
1417        });
1418    }
1419
1420    /// Drops tasks in this scope and all child scopes.
1421    ///
1422    /// # Panics
1423    ///
1424    /// Panics if any task is being accessed by another thread. Only call this
1425    /// method when the executor is shutting down and there are no other pollers.
1426    pub(super) fn drop_all_tasks(&self) {
1427        let mut scopes = vec![self.clone()];
1428        while let Some(scope) = scopes.pop() {
1429            let (tasks, join_results) = {
1430                let mut state = ScopeWaker::from(scope.lock());
1431                let (tasks, join_results, children) = state.set_closed_and_drain();
1432                scopes.extend(children.filter_map(|child| child.upgrade()));
1433                (tasks, join_results)
1434            };
1435            // Call task destructors once the scope lock is released so we don't risk a deadlock.
1436            for task in tasks {
1437                task.try_drop().expect("Expected drop to succeed");
1438            }
1439            std::mem::drop(join_results);
1440        }
1441    }
1442}
1443
/// Optimizes removal from parent scope.
///
/// A zero-sized key that is compared and hashed by its own address. This lets
/// a `HashSet<WeakScopeHandle>` be queried with just a pointer (see the
/// `Borrow<PtrKey>` impl for `WeakScopeHandle`).
#[repr(transparent)]
struct PtrKey;
1447
1448impl Borrow<PtrKey> for WeakScopeHandle {
1449    fn borrow(&self) -> &PtrKey {
1450        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
1451        unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1452    }
1453}
1454
1455impl PartialEq for PtrKey {
1456    fn eq(&self, other: &Self) -> bool {
1457        std::ptr::eq(self, other)
1458    }
1459}
1460
// Address comparison (see the PartialEq impl) is a full equivalence relation.
impl Eq for PtrKey {}
1462
1463impl hash::Hash for PtrKey {
1464    fn hash<H: hash::Hasher>(&self, state: &mut H) {
1465        (self as *const PtrKey).hash(state);
1466    }
1467}
1468
/// Join state for individual tasks, keyed by task ID.
///
/// Each entry holds either the waker of whoever is waiting for the task or the
/// finished task itself (see [`JoinResult`]).
#[derive(Default)]
struct JoinResults(HashMap<usize, JoinResult>);
1471
/// Storage for the outcomes of tasks that finish on a scope.
///
/// A scope owns one boxed implementation of this trait and calls into it when
/// tasks are polled for joining, finish, or are detached.
trait Results: Send + Sync + 'static {
    /// Returns true if we allow spawning futures with arbitrary outputs on the scope.
    fn can_spawn(&self) -> bool;

    /// Polls for the specified task having finished.
    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;

    /// Called when a task finishes.
    ///
    /// May return a waker for the caller to wake.
    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;

    /// Called to drop any results for a particular task.
    ///
    /// May hand back a stored task so that the caller can dispose of it.
    fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;

    /// Takes *all* the stored results.
    fn take(&mut self) -> Box<dyn Any>;

    /// Used only for testing.  Returns true if there are any results registered.
    #[cfg(test)]
    fn is_empty(&self) -> bool;
}
1492
1493impl Results for JoinResults {
1494    fn can_spawn(&self) -> bool {
1495        true
1496    }
1497
1498    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1499        match self.0.entry(task_id) {
1500            Entry::Occupied(mut o) => match o.get_mut() {
1501                JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1502                JoinResult::Result(_) => {
1503                    let JoinResult::Result(task) = o.remove() else { unreachable!() };
1504                    return Poll::Ready(task);
1505                }
1506            },
1507            Entry::Vacant(v) => {
1508                v.insert(JoinResult::Waker(cx.waker().clone()));
1509            }
1510        }
1511        Poll::Pending
1512    }
1513
1514    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1515        match self.0.entry(task.id()) {
1516            Entry::Occupied(mut o) => {
1517                let JoinResult::Waker(waker) =
1518                    std::mem::replace(o.get_mut(), JoinResult::Result(task))
1519                else {
1520                    // It can't be JoinResult::Result because this function is the only
1521                    // function that sets that, and `task_did_finish` won't get called
1522                    // twice.
1523                    unreachable!()
1524                };
1525                Some(waker)
1526            }
1527            Entry::Vacant(v) => {
1528                v.insert(JoinResult::Result(task));
1529                None
1530            }
1531        }
1532    }
1533
1534    fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1535        match self.0.remove(&task_id) {
1536            Some(JoinResult::Result(task)) => Some(task),
1537            _ => None,
1538        }
1539    }
1540
1541    fn take(&mut self) -> Box<dyn Any> {
1542        Box::new(Self(std::mem::take(&mut self.0)))
1543    }
1544
1545    #[cfg(test)]
1546    fn is_empty(&self) -> bool {
1547        self.0.is_empty()
1548    }
1549}
1550
1551#[derive(Default)]
1552struct ResultsStream<R> {
1553    inner: Arc<Mutex<ResultsStreamInner<R>>>,
1554}
1555
1556struct ResultsStreamInner<R> {
1557    results: Vec<R>,
1558    waker: Option<Waker>,
1559}
1560
1561impl<R> Default for ResultsStreamInner<R> {
1562    fn default() -> Self {
1563        Self { results: Vec::new(), waker: None }
1564    }
1565}
1566
1567impl<R: Send + 'static> Results for ResultsStream<R> {
1568    fn can_spawn(&self) -> bool {
1569        false
1570    }
1571
1572    fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1573        Poll::Pending
1574    }
1575
1576    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1577        let mut inner = self.inner.lock();
1578        // SAFETY: R is guaranteed to be the same return type as all futures finishing on this
1579        // scope.
1580        inner.results.extend(unsafe { task.take_result() });
1581        inner.waker.take()
1582    }
1583
1584    fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1585        None
1586    }
1587
1588    fn take(&mut self) -> Box<dyn Any> {
1589        Box::new(std::mem::take(&mut self.inner.lock().results))
1590    }
1591
1592    #[cfg(test)]
1593    fn is_empty(&self) -> bool {
1594        false
1595    }
1596}
1597
1598#[cfg(test)]
1599mod tests {
1600    // NOTE: Tests that work on both the fuchsia and portable runtimes should be placed in
1601    // runtime/scope.rs.
1602
1603    use super::*;
1604    use crate::{
1605        yield_now, EHandle, LocalExecutor, SendExecutor, SpawnableFuture, Task, TestExecutor, Timer,
1606    };
1607    use fuchsia_sync::{Condvar, Mutex};
1608    use futures::channel::mpsc;
1609    use futures::{FutureExt, StreamExt};
1610    use std::future::pending;
1611    use std::pin::{pin, Pin};
1612    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1613    use std::sync::Arc;
1614    use std::task::{Context, Poll};
1615    use std::time::Duration;
1616
    /// A test future that stays pending until `resolve` is called on it.
    #[derive(Default)]
    struct RemoteControlFuture(Mutex<RCFState>);
    /// The state behind a `RemoteControlFuture`.
    #[derive(Default)]
    struct RCFState {
        // Set by `resolve`; once true, polling returns ready.
        resolved: bool,
        // The waker stored by the most recent pending poll, if any.
        waker: Option<Waker>,
    }
1624
1625    impl Future for &RemoteControlFuture {
1626        type Output = ();
1627        fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1628            let mut this = self.0.lock();
1629            if this.resolved {
1630                Poll::Ready(())
1631            } else {
1632                this.waker.replace(cx.waker().clone());
1633                Poll::Pending
1634            }
1635        }
1636    }
1637
1638    impl RemoteControlFuture {
1639        fn new() -> Arc<Self> {
1640            Arc::new(Default::default())
1641        }
1642
1643        fn resolve(&self) {
1644            let mut this = self.0.lock();
1645            this.resolved = true;
1646            if let Some(waker) = this.waker.take() {
1647                waker.wake();
1648            }
1649        }
1650
1651        fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> {
1652            let this = Arc::clone(self);
1653            #[allow(clippy::redundant_async_block)] // Allow returning `&*this` out of this fn.
1654            async move {
1655                (&*this).await
1656            }
1657        }
1658    }
1659
1660    #[test]
1661    fn compute_works_on_root_scope() {
1662        let mut executor = TestExecutor::new();
1663        let scope = executor.global_scope();
1664        let mut task = pin!(scope.compute(async { 1 }));
1665        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1666    }
1667
1668    #[test]
1669    fn compute_works_on_new_child() {
1670        let mut executor = TestExecutor::new();
1671        let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1672        let mut task = pin!(scope.compute(async { 1 }));
1673        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1674    }
1675
1676    #[test]
1677    fn scope_drop_cancels_tasks() {
1678        let mut executor = TestExecutor::new();
1679        let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1680        let mut task = pin!(scope.compute(async { 1 }));
1681        drop(scope);
1682        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1683    }
1684
1685    #[test]
1686    fn tasks_do_not_spawn_on_cancelled_scopes() {
1687        let mut executor = TestExecutor::new();
1688        let scope =
1689            executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1690        let handle = scope.to_handle();
1691        let mut cancel = pin!(scope.cancel());
1692        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1693        let mut task = pin!(handle.compute(async { 1 }));
1694        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1695    }
1696
1697    #[test]
1698    fn tasks_do_not_spawn_on_closed_empty_scopes() {
1699        let mut executor = TestExecutor::new();
1700        let scope =
1701            executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1702        let handle = scope.to_handle();
1703        let mut close = pin!(scope.cancel());
1704        assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1705        let mut task = pin!(handle.compute(async { 1 }));
1706        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1707    }
1708
/// While a close is waiting on an unfinished task, new computations on the
/// scope's handle do not run.
#[test]
fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
    let mut exec = TestExecutor::new();
    let closing = exec.global_scope().new_child();
    let survivor = closing.to_handle();
    // A task that never finishes keeps the close from resolving.
    survivor.spawn(pending());
    let mut closure_fut = pin!(closing.close());
    assert_eq!(exec.run_until_stalled(&mut closure_fut), Poll::Pending);
    let mut late_task = pin!(survivor.compute(async { 1 }));
    assert_eq!(exec.run_until_stalled(&mut late_task), Poll::Pending);
}
1720
/// Tasks computed on a child scope and on a grandchild scope both run to
/// completion.
#[test]
fn spawn_works_on_child_and_grandchild() {
    let mut exec = TestExecutor::new();
    let root = exec.global_scope().new_child();
    let middle = root.new_child();
    let leaf = middle.new_child();
    let mut middle_task = pin!(middle.compute(async { 1 }));
    let mut leaf_task = pin!(leaf.compute(async { 1 }));
    assert_eq!(exec.run_until_stalled(&mut middle_task), Poll::Ready(1));
    assert_eq!(exec.run_until_stalled(&mut leaf_task), Poll::Ready(1));
}
1732
/// Dropping the top scope keeps tasks on its child and grandchild from ever
/// completing.
#[test]
fn spawn_drop_cancels_child_and_grandchild_tasks() {
    let mut exec = TestExecutor::new();
    let root = exec.global_scope().new_child();
    let middle = root.new_child();
    let leaf = middle.new_child();
    let mut middle_task = pin!(middle.compute(async { 1 }));
    let mut leaf_task = pin!(leaf.compute(async { 1 }));
    // Only the ancestor is dropped; the descendants are still held here.
    drop(root);
    assert_eq!(exec.run_until_stalled(&mut middle_task), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut leaf_task), Poll::Pending);
}
1745
/// Aborting tasks (one unfinished, one already finished) leaves no task
/// entries and no stored results behind in the scope.
#[test]
fn completed_tasks_are_cleaned_up_after_cancel() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();

    let never_done = scope.spawn(pending::<()>());
    let already_done = scope.spawn(async {});
    assert_eq!(exec.run_until_stalled(&mut pending::<()>()), Poll::Pending);
    // Only the task that never finishes is still tracked at this point.
    assert_eq!(scope.lock().all_tasks().len(), 1);

    // Strictly speaking the executor need not run again after the aborts
    // below, but cleanup may become asynchronous one day, so it is run anyway.
    assert_eq!(never_done.abort().now_or_never(), None);
    assert_eq!(already_done.abort().now_or_never(), Some(Some(())));

    assert_eq!(exec.run_until_stalled(&mut pending::<()>()), Poll::Pending);
    assert_eq!(scope.lock().all_tasks().len(), 0);
    assert!(scope.lock().results.is_empty());
}
1765
/// Joining a scope that has no tasks resolves immediately.
#[test]
fn join_emtpy_scope() {
    let mut exec = TestExecutor::new();
    let empty = exec.global_scope().new_child();
    assert_eq!(exec.run_until_stalled(&mut pin!(empty.join())), Poll::Ready(()));
}
1772
/// A task handle still yields its result after a join of the owning scope has
/// started.
#[test]
fn task_handle_preserves_access_to_result_after_join_begins() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let mut computed = scope.compute(async { 1 });
    scope.spawn(async {});
    let blocker = scope.spawn(pending::<()>());
    // The join is fused so the test does not care whether it finishes before
    // or after the task handle is awaited.
    let mut joining = pin!(scope.join().fuse());
    let _ = exec.run_until_stalled(&mut joining);
    assert_eq!(exec.run_until_stalled(&mut computed), Poll::Ready(1));
    // Get rid of the never-ending task so the join can finish.
    drop(blocker.abort());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
}
1788
/// A join stays pending while any task is outstanding and resolves once the
/// last one has been aborted.
#[test]
fn join_blocks_until_task_is_cancelled() {
    // The scope holds two never-ending tasks. One is aborted up front; the
    // join cannot finish until the other one is aborted as well.
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let still_running = scope.spawn(pending::<()>());
    let aborted_early = scope.spawn(pending::<()>());
    assert_eq!(exec.run_until_stalled(&mut pin!(aborted_early.abort())), Poll::Ready(None));
    let mut joining = pin!(scope.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut pin!(still_running.abort())), Poll::Ready(None));
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
}
1809
/// With a never-ending detached task, join stays pending but converting the
/// join into a cancel resolves.
#[test]
fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    // Ignoring the returned handle detaches the task (that is the default).
    scope.spawn(pending::<()>());
    let mut joining = pin!(scope.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    let mut cancelling = pin!(joining.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelling), Poll::Ready(()));
}
1821
/// With a never-ending detached task, close stays pending but converting the
/// close into a cancel resolves.
#[test]
fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    // Ignoring the returned handle detaches the task (that is the default).
    scope.spawn(pending::<()>());
    let mut closing = pin!(scope.close());
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Pending);
    let mut cancelling = pin!(closing.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelling), Poll::Ready(()));
}
1833
/// Join is pending until the spawned task's future is resolved remotely; the
/// task handle is then ready too.
#[test]
fn join_scope_blocks_until_spawned_task_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();
    let mut spawned = scope.spawn(trigger.as_future());
    let mut joining = pin!(scope.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    // Let the task finish.
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut spawned), Poll::Ready(()));
}
1846
/// Close is pending until the spawned task's future is resolved remotely; the
/// task handle is then ready too.
#[test]
fn close_scope_blocks_until_spawned_task_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();
    let mut spawned = scope.spawn(trigger.as_future());
    let mut closing = pin!(scope.close());
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Pending);
    // Let the task finish.
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut spawned), Poll::Ready(()));
}
1859
/// Detaching a child scope does not release the parent's join: it still waits
/// for the child's task to finish.
#[test]
fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let kid = parent.new_child();
    let trigger = RemoteControlFuture::new();
    kid.spawn(trigger.as_future());
    let mut parent_join = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut pin!(kid.on_no_tasks())), Poll::Pending);
    kid.detach();
    // Still blocked: the detached child's task has not finished yet.
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
}
1875
/// A join waits for a task that was spawned from inside a detached scope which
/// itself was created by a task of the joined scope.
#[test]
fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
    let mut exec = TestExecutor::new();
    let outer = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();
    {
        let trigger_for_task = trigger.clone();
        outer.spawn(async move {
            let nested = Scope::new_with_name("child");
            nested.spawn(async move {
                // Spawn onto whichever scope this task is currently running in.
                Scope::current().spawn(trigger_for_task.as_future());
            });
            nested.detach();
        });
    }
    let mut outer_join = pin!(outer.join());
    assert_eq!(exec.run_until_stalled(&mut outer_join), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut outer_join), Poll::Ready(()));
}
1896
/// Detaching a child whose task never finishes leaves the parent's join
/// pending.
#[test]
fn join_scope_blocks_when_blocked_child_is_detached() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let kid = parent.new_child();
    kid.spawn(pending());
    let mut parent_join = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut pin!(kid.on_no_tasks())), Poll::Pending);
    kid.detach();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
}
1909
/// Once the child's pending join future has been dropped, the parent's join
/// resolves even though the child's task never finished.
#[test]
fn join_scope_completes_when_blocked_child_is_cancelled() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let kid = parent.new_child();
    kid.spawn(pending());
    let mut parent_join = pin!(parent.join());
    {
        // The child's join future lives only for this block; the final
        // assertion below runs after it has been dropped.
        let mut kid_join = pin!(kid.join());
        assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
        assert_eq!(exec.run_until_stalled(&mut kid_join), Poll::Pending);
    }
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
}
1924
/// A handle to a detached scope can still run tasks to completion.
#[test]
fn detached_scope_can_spawn() {
    let mut exec = TestExecutor::new();
    let detached = exec.global_scope().new_child();
    let survivor = detached.to_handle();
    detached.detach();
    assert_eq!(exec.run_until_stalled(&mut survivor.compute(async { 1 })), Poll::Ready(1));
}
1933
/// A handle to a dropped scope cannot run new tasks.
#[test]
fn dropped_scope_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let dropped = exec.global_scope().new_child();
    let survivor = dropped.to_handle();
    drop(dropped);
    assert_eq!(exec.run_until_stalled(&mut survivor.compute(async { 1 })), Poll::Pending);
}
1942
/// A handle to a dropped scope cannot run new tasks, even when a task handle
/// from before the drop is still being held.
#[test]
fn dropped_scope_with_running_task_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let dropped = exec.global_scope().new_child();
    let survivor = dropped.to_handle();
    // Bound to a named variable so the task handle stays alive until the end.
    let _earlier_task = survivor.spawn(pending::<()>());
    drop(dropped);
    assert_eq!(exec.run_until_stalled(&mut survivor.compute(async { 1 })), Poll::Pending);
}
1952
/// After a join has completed, a handle to the scope cannot run new tasks.
#[test]
fn joined_scope_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let joined = exec.global_scope().new_child();
    let survivor = joined.to_handle();
    let mut joining = pin!(joined.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut survivor.compute(async { 1 })), Poll::Pending);
}
1962
/// While a join is still pending on an unfinished task, a handle to the scope
/// can run new tasks to completion.
#[test]
fn joining_scope_with_running_task_can_spawn() {
    let mut exec = TestExecutor::new();
    let joined = exec.global_scope().new_child();
    let survivor = joined.to_handle();
    // Bound to a named variable so the task handle stays alive until the end.
    let _earlier_task = survivor.spawn(pending::<()>());
    let mut joining = pin!(joined.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut survivor.compute(async { 1 })), Poll::Ready(1));
}
1973
/// Once a scope has been joined, neither its existing child, nor a child
/// created afterwards, nor a new grandchild can run tasks.
#[test]
fn joined_scope_child_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let joined = exec.global_scope().new_child();
    let survivor = joined.to_handle();
    let early_child = joined.new_child();
    // Before the join the child works normally.
    assert_eq!(exec.run_until_stalled(&mut early_child.compute(async { 1 })), Poll::Ready(1));
    let mut joining = pin!(joined.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
    let late_child = survivor.new_child();
    let late_grandchild = early_child.new_child();
    assert_eq!(exec.run_until_stalled(&mut early_child.compute(async { 1 })), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut late_child.compute(async { 1 })), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut late_grandchild.compute(async { 1 })), Poll::Pending);
}
2001
/// Once a scope has been closed, neither its existing child, nor a child
/// created afterwards, nor a new grandchild can run tasks.
#[test]
fn closed_scope_child_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let closed = exec.global_scope().new_child();
    let survivor = closed.to_handle();
    let early_child = closed.new_child();
    // Before the close the child works normally.
    assert_eq!(exec.run_until_stalled(&mut early_child.compute(async { 1 })), Poll::Ready(1));
    let mut closing = pin!(closed.close());
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Ready(()));
    let late_child = survivor.new_child();
    let late_grandchild = early_child.new_child();
    assert_eq!(exec.run_until_stalled(&mut early_child.compute(async { 1 })), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut late_child.compute(async { 1 })), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut late_grandchild.compute(async { 1 })), Poll::Pending);
}
2029
/// Joining the child and then the parent both resolve.
#[test]
fn can_join_child_first() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let kid = parent.new_child();
    assert_eq!(exec.run_until_stalled(&mut kid.compute(async { 1 })), Poll::Ready(1));
    assert_eq!(exec.run_until_stalled(&mut pin!(kid.join())), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut pin!(parent.join())), Poll::Ready(()));
}
2039
/// Joining the parent and then the child both resolve.
#[test]
fn can_join_parent_first() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let kid = parent.new_child();
    assert_eq!(exec.run_until_stalled(&mut kid.compute(async { 1 })), Poll::Ready(1));
    assert_eq!(exec.run_until_stalled(&mut pin!(parent.join())), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut pin!(kid.join())), Poll::Ready(()));
}
2049
/// A task running on the parent scope may itself await the join of a child
/// scope; the parent's join resolves once the child's task finishes.
#[test]
fn task_in_parent_scope_can_join_child() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let kid = parent.new_child();
    let trigger = RemoteControlFuture::new();
    kid.spawn(trigger.as_future());
    // Ownership of the child moves into a task of the parent.
    parent.spawn(async move { kid.join().await });
    let mut parent_join = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
}
2063
/// Holding on to a task handle does not block the join; the handle still
/// yields its result afterwards.
#[test]
fn join_completes_while_completed_task_handle_is_held() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let mut held = scope.compute(async { 1 });
    scope.spawn(async {});
    let mut joining = pin!(scope.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut held), Poll::Ready(1));
}
2074
/// Cancel resolves even though the scope's only task owns a handle to the
/// scope and never finishes.
#[test]
fn cancel_completes_while_task_holds_handle() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let owned_by_task = scope.to_handle();
    let mut endless = scope.compute(async move {
        loop {
            pending::<()>().await; // this await never resolves
            owned_by_task.spawn(async {});
        }
    });

    // The task never finishes, so joining cannot finish either.
    let mut joining = pin!(scope.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);

    let mut cancelling = pin!(joining.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelling), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut endless), Poll::Pending);
}
2095
/// A task may cancel its own scope through a handle: the scope ends up with no
/// tasks, and the cancelling task never gets past the `cancel().await`.
#[test]
fn cancel_from_handle_inside_task() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    {
        // This task can only go away by the scope being cancelled.
        scope.spawn(pending::<()>());

        let mut drained = pin!(scope.on_no_tasks());
        assert_eq!(exec.run_until_stalled(&mut drained), Poll::Pending);

        let self_handle = scope.to_handle();
        scope.spawn(async move {
            self_handle.cancel().await;
            panic!("cancel() should never complete");
        });

        assert_eq!(exec.run_until_stalled(&mut drained), Poll::Ready(()));
    }
    assert_eq!(scope.join().now_or_never(), Some(()));
}
2117
/// A task spawned onto the global scope from a plain OS thread is run by the
/// executor.
#[test]
fn can_spawn_from_non_executor_thread() {
    let mut exec = TestExecutor::new();
    let global = exec.global_scope().clone();
    let ran = Arc::new(AtomicBool::new(false));
    let ran_in_task = ran.clone();
    // Spawn from another thread and wait for that thread to exit.
    let _ = std::thread::spawn(move || {
        global.spawn(async move {
            ran_in_task.store(true, Ordering::Relaxed);
        })
    })
    .join();
    // Drive the executor so the spawned task gets polled.
    let _ = exec.run_until_stalled(&mut pending::<()>());
    assert!(ran.load(Ordering::Relaxed));
}
2133
/// Joins in a small scope tree resolve bottom-up: each join completes only
/// when the tasks of that scope and of everything beneath it have finished.
#[test]
fn scope_tree() {
    // Layout of the scopes under test:
    //
    // A
    //  \
    //   B
    //  / \
    // C   D
    let mut exec = TestExecutor::new();
    let a = exec.global_scope().new_child();
    let b = a.new_child();
    let c = b.new_child();
    let d = b.new_child();
    let finish_a = RemoteControlFuture::new();
    let finish_c = RemoteControlFuture::new();
    let finish_d = RemoteControlFuture::new();
    a.spawn(finish_a.as_future());
    c.spawn(finish_c.as_future());
    d.spawn(finish_d.as_future());
    let mut join_a = pin!(a.join());
    let mut join_b = pin!(b.join());
    let mut join_d = pin!(d.join());
    // Nothing has finished yet.
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_d), Poll::Pending);
    // D's task finishes: only D's join resolves.
    finish_d.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_d), Poll::Ready(()));
    // C's task finishes: B has nothing left beneath it.
    finish_c.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Ready(()));
    // A's own task finishes: the whole tree is done.
    finish_a.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Ready(()));
    let mut join_c = pin!(c.join());
    assert_eq!(exec.run_until_stalled(&mut join_c), Poll::Ready(()));
}
2170
/// On a two-thread `SendExecutor`, each call to `wake_all_with_active_guard`
/// leads to both always-pending tasks being polled again, including when the
/// scope is cancelled right after the wake.
#[test]
fn wake_all_with_active_guard_on_send_executor() {
    let mut executor = SendExecutor::new(2);
    let scope = executor.root_scope().new_child();

    let (tx, mut rx) = mpsc::unbounded();
    // Packed word: the low 32 bits count polls, the high 32 bits hold the poll
    // count at which a notification should be sent.
    let state = Arc::new(AtomicU64::new(0));

    struct PollCounter(Arc<AtomicU64>, mpsc::UnboundedSender<()>);

    impl Future for PollCounter {
        type Output = ();
        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            let previous = self.0.fetch_add(1, Ordering::Relaxed);
            let signal_at = previous >> 32;
            let polls_now = (previous + 1) & (u32::MAX as u64);
            if signal_at == polls_now {
                let _ = self.1.unbounded_send(());
            }
            Poll::Pending
        }
    }

    scope.spawn(PollCounter(state.clone(), tx.clone()));
    scope.spawn(PollCounter(state.clone(), tx.clone()));

    executor.run(async move {
        let mut wait_for_poll_count = async |count| {
            // Publish the target; if it has not been reached yet, wait for the
            // notification from whichever poll reaches it.
            let previous = state.fetch_or(count << 32, Ordering::Relaxed);
            if (previous & (u32::MAX as u64)) != count {
                rx.next().await.unwrap();
            }
            // Clear the target again, keeping only the poll count.
            state.fetch_and(u32::MAX as u64, Ordering::Relaxed);
        };

        // The only safe assumption is that each of the two tasks gets polled
        // exactly once initially.
        wait_for_poll_count(2).await;

        let mut polls_expected = 2;
        for _ in 0..2 {
            scope.wake_all_with_active_guard();

            wait_for_poll_count(polls_expected + 2).await;
            polls_expected += 2;
        }

        // A wake followed immediately by a cancel must still result in both
        // tasks being polled.
        scope.wake_all_with_active_guard();
        let cancelled = scope.cancel();

        wait_for_poll_count(polls_expected + 2).await;

        cancelled.await;
    });
}
2225
/// Stress test: repeatedly awaits `on_no_tasks` while a task finishes on a
/// multithreaded executor, with random short sleeps to vary the interleaving.
#[test]
fn on_no_tasks_race() {
    /// Blocks the current thread for a random 0..10 microseconds.
    fn sleep_random() {
        use rand::Rng;
        let micros = rand::thread_rng().gen_range(0..10);
        std::thread::sleep(std::time::Duration::from_micros(micros));
    }
    for _ in 0..2000 {
        let mut exec = SendExecutor::new(2);
        let raced = exec.root_scope().new_child();
        raced.spawn(async {
            sleep_random();
        });
        exec.run(async move {
            sleep_random();
            raced.on_no_tasks().await;
        });
    }
}
2246
/// A detached task keeps running to completion and leaves no stored result in
/// the root scope.
#[test]
fn test_detach() {
    let mut exec = LocalExecutor::new();
    exec.run_singlethreaded(async {
        let ticks = Arc::new(AtomicU32::new(0));

        {
            let ticks_in_task = ticks.clone();
            Task::spawn(async move {
                for _ in 0..5 {
                    yield_now().await;
                    ticks_in_task.fetch_add(1, Ordering::Relaxed);
                }
            })
            .detach();
        }

        // Keep yielding until the detached task has done all five increments.
        while ticks.load(Ordering::Relaxed) != 5 {
            yield_now().await;
        }
    });

    assert!(exec.ehandle.root_scope.lock().results.is_empty());
}
2271
/// Covers three ways a task can end early: dropping its handle, aborting it
/// while it is pending, and aborting it after it has already finished.
#[test]
fn test_cancel() {
    let mut exec = LocalExecutor::new();
    exec.run_singlethreaded(async {
        // Each task owns a clone of this Arc, so a strong count of 1 means the
        // task's future has been destroyed.
        let liveness = Arc::new(());

        // Case 1: simply drop the task handle.
        {
            let token = liveness.clone();
            drop(Task::spawn(async move {
                let _token = token;
                let _: () = std::future::pending().await;
            }));
        }

        while Arc::strong_count(&liveness) != 1 {
            yield_now().await;
        }

        // Case 2: abort a task that would otherwise never finish.
        let pending_task = {
            let token = liveness.clone();
            Task::spawn(async move {
                let _token = token;
                let _: () = std::future::pending().await;
            })
        };

        assert_eq!(pending_task.abort().await, None);
        while Arc::strong_count(&liveness) != 1 {
            yield_now().await;
        }

        // Case 3: abort a task that has already run to completion.
        let finished_task = {
            let token = liveness.clone();
            Task::spawn(async move {
                let _token = token;
            })
        };

        // Give it time to complete first.
        while Arc::strong_count(&liveness) != 1 {
            yield_now().await;
        }

        assert_eq!(finished_task.abort().await, Some(()));
    });

    assert!(exec.ehandle.root_scope.lock().results.is_empty());
}
2322
/// Aborting a task that is in the middle of a poll waits for that poll to end;
/// here the poll completes the task, so its output is returned.
#[test]
fn test_cancel_waits() {
    let mut exec = SendExecutor::new(2);
    // (is the task currently inside its poll?, condvar signalled on entry)
    let in_poll = Arc::new((Mutex::new(false), Condvar::new()));
    let busy_task = {
        let in_poll = in_poll.clone();
        exec.root_scope().compute(async move {
            *in_poll.0.lock() = true;
            in_poll.1.notify_all();
            // Deliberately block inside the poll so the abort overlaps it.
            std::thread::sleep(std::time::Duration::from_millis(10));
            *in_poll.0.lock() = false;
            "foo"
        })
    };
    exec.run(async move {
        {
            // Block until the task has announced that it is running.
            let mut entered = in_poll.0.lock();
            while !*entered {
                in_poll.1.wait(&mut entered);
            }
        }
        assert_eq!(busy_task.abort().await, Some("foo"));
        assert!(!*in_poll.0.lock());
    });
}
2348
2349    fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2350        let mut executor = SendExecutor::new(2);
2351        let running = Arc::new((Mutex::new(false), Condvar::new()));
2352        let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2353        let task = {
2354            let running = running.clone();
2355            let can_quit = can_quit.clone();
2356            executor.root_scope().compute(async move {
2357                *running.0.lock() = true;
2358                running.1.notify_all();
2359                {
2360                    let mut guard = can_quit.0.lock();
2361                    while !*guard {
2362                        can_quit.1.wait(&mut guard);
2363                    }
2364                }
2365                *running.0.lock() = false;
2366            })
2367        };
2368        executor.run(async move {
2369            {
2370                let mut guard = running.0.lock();
2371                while !*guard {
2372                    running.1.wait(&mut guard);
2373                }
2374            }
2375
2376            callback(task);
2377
2378            *can_quit.0.lock() = true;
2379            can_quit.1.notify_all();
2380
2381            let ehandle = EHandle::local();
2382            let scope = ehandle.global_scope();
2383
2384            // The only way of testing for this is to poll.
2385            while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2386                Timer::new(std::time::Duration::from_millis(1)).await;
2387            }
2388
2389            assert!(!*running.0.lock());
2390        });
2391    }
2392
/// Clean-up still happens when an abort future is polled once (while pending)
/// and then dropped.
#[test]
fn test_dropped_cancel_cleans_up() {
    test_clean_up(|task| {
        let aborting = std::pin::pin!(task.abort());
        let noop = futures::task::noop_waker();
        let mut cx = Context::from_waker(&noop);
        // The task is blocked in its poll, so the abort cannot be done yet.
        assert!(aborting.poll(&mut cx).is_pending());
    });
}
2401
/// Clean-up still happens when the task handle is dropped.
#[test]
fn test_dropped_task_cleans_up() {
    test_clean_up(|dropped| {
        drop(dropped);
    });
}
2408
/// Clean-up still happens when the task is detached.
#[test]
fn test_detach_cleans_up() {
    test_clean_up(|detached| {
        detached.detach();
    });
}
2415
/// A closed `ScopeStream` yields the output of every pushed future and then
/// ends.
#[test]
fn test_scope_stream() {
    let mut exec = SendExecutor::new(2);
    exec.run(async move {
        let (outputs, pusher) = ScopeStream::new();
        pusher.push(async { 1 });
        pusher.push(async { 2 });
        // Closing first lets `collect` terminate once both outputs are in.
        outputs.close();
        let seen: HashSet<_> = outputs.collect().await;
        assert_eq!(seen, HashSet::from_iter([1, 2]));
    });
}
2428
/// The stream still delivers every output when the pushed futures only finish
/// after a timer has fired.
#[test]
fn test_scope_stream_wakes_properly() {
    let mut exec = SendExecutor::new(2);
    exec.run(async move {
        let (outputs, pusher) = ScopeStream::new();
        pusher.push(async {
            Timer::new(Duration::from_millis(10)).await;
            1
        });
        pusher.push(async {
            Timer::new(Duration::from_millis(10)).await;
            2
        });
        outputs.close();
        let seen: HashSet<_> = outputs.collect().await;
        assert_eq!(seen, HashSet::from_iter([1, 2]));
    });
}
2447
/// Only pushed futures contribute to the stream's items; a task started with
/// `compute` on the stream does not show up in the collected output.
#[test]
fn test_scope_stream_drops_spawned_tasks() {
    let mut exec = SendExecutor::new(2);
    exec.run(async move {
        let (outputs, pusher) = ScopeStream::new();
        pusher.push(async { 1 });
        // Bound to a named variable so the task handle stays alive.
        let _side_task = outputs.compute(async { "foo" });
        outputs.close();
        let seen: HashSet<_> = outputs.collect().await;
        assert_eq!(seen, HashSet::from_iter([1]));
    });
}
2460
/// Futures running on the stream may push further futures; all three nested
/// outputs are delivered.
#[test]
fn test_nested_scope_stream() {
    let mut exec = SendExecutor::new(2);
    exec.run(async move {
        let (mut outputs, pusher) = ScopeStream::new();
        pusher.clone().push(async move {
            pusher.clone().push(async move {
                pusher.clone().push(async move { 3 });
                2
            });
            1
        });
        let mut seen = HashSet::default();
        while let Some(value) = outputs.next().await {
            seen.insert(value);
            // Close once everything expected has arrived so the loop can end.
            if seen.len() == 3 {
                outputs.close();
            }
        }
        assert_eq!(seen, HashSet::from_iter([1, 2, 3]));
    });
}
2483
2484    #[test]
2485    fn test_dropping_scope_stream_cancels_all_tasks() {
2486        let mut executor = SendExecutor::new(2);
2487        executor.run(async move {
2488            let (stream, handle) = ScopeStream::new();
2489            let (tx1, mut rx) = mpsc::unbounded::<()>();
2490            let tx2 = tx1.clone();
2491            handle.push(async move {
2492                let _tx1 = tx1;
2493                let () = pending().await;
2494            });
2495            handle.push(async move {
2496                let _tx2 = tx2;
2497                let () = pending().await;
2498            });
2499            drop(stream);
2500
2501            // This will wait forever if the tasks aren't cancelled.
2502            assert_eq!(rx.next().await, None);
2503        });
2504    }
2505
2506    #[test]
2507    fn test_scope_stream_collect() {
2508        let mut executor = SendExecutor::new(2);
2509        executor.run(async move {
2510            let stream: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
2511            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2512
2513            let stream: ScopeStream<_> =
2514                (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
2515            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2516        });
2517    }
2518
2519    struct DropSignal(Arc<AtomicBool>);
2520
2521    impl Drop for DropSignal {
2522        fn drop(&mut self) {
2523            self.0.store(true, Ordering::SeqCst);
2524        }
2525    }
2526
2527    struct DropChecker(Arc<AtomicBool>);
2528
2529    impl DropChecker {
2530        fn new() -> (Self, DropSignal) {
2531            let inner = Arc::new(AtomicBool::new(false));
2532            (Self(inner.clone()), DropSignal(inner))
2533        }
2534
2535        fn is_dropped(&self) -> bool {
2536            self.0.load(Ordering::SeqCst)
2537        }
2538    }
2539
2540    #[test]
2541    fn child_finished_when_parent_pending() {
2542        let mut executor = LocalExecutor::new();
2543        executor.run_singlethreaded(async {
2544            let scope = Scope::new();
2545            let _guard = scope.active_guard().expect("acquire guard");
2546            let cancel = scope.to_handle().cancel();
2547            let child = scope.new_child();
2548            let (checker, signal) = DropChecker::new();
2549            child.spawn(async move {
2550                let _signal = signal;
2551                futures::future::pending::<()>().await
2552            });
2553            assert!(checker.is_dropped());
2554            assert!(child.active_guard().is_none());
2555            cancel.await;
2556        })
2557    }
2558
2559    #[test]
2560    fn guarded_scopes_observe_closed() {
2561        let mut executor = LocalExecutor::new();
2562        executor.run_singlethreaded(async {
2563            let scope = Scope::new();
2564            let handle = scope.to_handle();
2565            let _guard = scope.active_guard().expect("acquire guard");
2566            handle.close();
2567            let (checker, signal) = DropChecker::new();
2568            handle.spawn(async move {
2569                let _signal = signal;
2570                futures::future::pending::<()>().await
2571            });
2572            assert!(checker.is_dropped());
2573            let (checker, signal) = DropChecker::new();
2574            let cancel = handle.clone().cancel();
2575            handle.spawn(async move {
2576                let _signal = signal;
2577                futures::future::pending::<()>().await
2578            });
2579            assert!(checker.is_dropped());
2580            scope.join().await;
2581            cancel.await;
2582        })
2583    }
2584
2585    #[test]
2586    fn child_guard_holds_parent_cancellation() {
2587        let mut executor = TestExecutor::new();
2588        let scope = executor.global_scope().new_child();
2589        let child = scope.new_child();
2590        let guard = child.active_guard().expect("acquire guard");
2591        scope.spawn(futures::future::pending());
2592        let mut join = pin!(scope.cancel());
2593        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2594        drop(guard);
2595        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2596    }
2597
2598    #[test]
2599    fn active_guard_on_cancel() {
2600        let mut executor = TestExecutor::new();
2601        let scope = executor.global_scope().new_child();
2602        let child1 = scope.new_child();
2603        let child2 = scope.new_child();
2604        let guard = child1.active_guard().expect("acquire guard");
2605        let guard_for_right_scope = guard.clone();
2606        let guard_for_wrong_scope = guard.clone();
2607        child1.spawn(async move { guard_for_right_scope.on_cancel().await });
2608        child2.spawn(async move {
2609            guard_for_wrong_scope.on_cancel().await;
2610        });
2611
2612        let handle = scope.to_handle();
2613        let mut join = pin!(scope.join());
2614        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2615        let cancel: Join<_> = handle.cancel();
2616        drop(cancel);
2617        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2618    }
2619
2620    #[test]
2621    fn abort_join() {
2622        let mut executor = TestExecutor::new();
2623        let scope = executor.global_scope().new_child();
2624        let child = scope.new_child();
2625        let _guard = child.active_guard().expect("acquire guard");
2626
2627        let (checker1, signal) = DropChecker::new();
2628        scope.spawn(async move {
2629            let _signal = signal;
2630            futures::future::pending::<()>().await
2631        });
2632        let (checker2, signal) = DropChecker::new();
2633        scope.spawn(async move {
2634            let _signal = signal;
2635            futures::future::pending::<()>().await
2636        });
2637
2638        let mut join = pin!(scope.cancel());
2639        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2640        assert!(!checker1.is_dropped());
2641        assert!(!checker2.is_dropped());
2642
2643        let mut join = join.abort();
2644        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2645        assert!(checker1.is_dropped());
2646        assert!(checker2.is_dropped());
2647    }
2648
2649    #[test]
2650    fn child_without_guard_aborts_immediately_on_cancel() {
2651        let mut executor = TestExecutor::new();
2652        let scope = executor.global_scope().new_child();
2653        let child = scope.new_child();
2654        let guard = scope.active_guard().expect("acquire guard");
2655
2656        let (checker_scope, signal) = DropChecker::new();
2657        scope.spawn(async move {
2658            let _signal = signal;
2659            futures::future::pending::<()>().await
2660        });
2661        let (checker_child, signal) = DropChecker::new();
2662        child.spawn(async move {
2663            let _signal = signal;
2664            futures::future::pending::<()>().await
2665        });
2666
2667        let mut join = pin!(scope.cancel());
2668        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2669        assert!(!checker_scope.is_dropped());
2670        assert!(checker_child.is_dropped());
2671
2672        drop(guard);
2673        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2674        assert!(checker_child.is_dropped());
2675    }
2676}