fuchsia_async/runtime/fuchsia/executor/
scope.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::EHandle;
9use crate::condition::{Condition, ConditionGuard, WakerEntry};
10use core::{error, fmt};
11use fuchsia_sync::Mutex;
12use futures::Stream;
13use pin_project_lite::pin_project;
14use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
15use state::{JoinResult, ScopeState, ScopeWaker, Status, WakeVec};
16use std::any::Any;
17use std::borrow::Borrow;
18use std::collections::hash_map::Entry;
19use std::collections::hash_set;
20use std::future::{Future, IntoFuture};
21use std::hash;
22use std::marker::PhantomData;
23use std::mem::{self, ManuallyDrop};
24use std::ops::{Deref, DerefMut};
25use std::pin::Pin;
26use std::sync::{Arc, Weak};
27use std::task::{Context, Poll, Waker, ready};
28
29//
30// # Public API
31//
32
/// A scope for managing async tasks. This scope is aborted when dropped.
///
/// Scopes are how fuchsia-async implements [structured concurrency][sc]. Every
/// task is spawned on a scope, and runs until either the task completes or the
/// scope is cancelled or aborted. In addition to owning tasks, scopes may own
/// child scopes, forming a nested structure.
///
/// Scopes are usually joined or cancelled when the owning code is done with
/// them. This makes it easier to reason about when a background task might
/// still be running. Note that in multithreaded contexts it is safer to cancel
/// and await a scope explicitly than to drop it, because the destructor is not
/// synchronized with other threads that might be running a task.
///
/// [`Task::spawn`][crate::Task::spawn] and related APIs spawn on the root scope
/// of the executor. New code is encouraged to spawn directly on scopes instead,
/// passing their handles as a way of documenting when a function might spawn
/// tasks that run in the background and reasoning about their side effects.
///
/// ## Scope lifecycle
///
/// When a scope is created it is open, meaning it accepts new tasks. Scopes are
/// closed when one of the following happens:
///
/// 1. When [`close()`][Scope::close] is called.
/// 2. When the scope is aborted or dropped, the scope is closed immediately.
/// 3. When the scope is cancelled, the scope is closed when all active guards
///    are dropped.
/// 4. When the scope is joined and all tasks complete, the scope is closed
///    before the join future resolves.
///
/// When a scope is closed it no longer accepts tasks. Tasks spawned on a
/// closed scope are dropped immediately, and their [`Task`][crate::Task] or
/// [`JoinHandle`][crate::JoinHandle] futures never resolve. This applies
/// transitively to all child scopes. Closed scopes cannot currently be
/// reopened.
///
/// Scopes can also be detached, in which case they are never closed, and run
/// until the completion of all tasks.
///
/// [sc]: https://en.wikipedia.org/wiki/Structured_concurrency
#[must_use = "Scopes should be explicitly awaited or cancelled"]
#[derive(Debug)]
pub struct Scope {
    // The handle to the underlying scope. `Scope` derefs to it, and the
    // `Drop` impl uses it to abort the scope's tasks.
    // LINT.IfChange
    inner: ScopeHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
80
81impl Default for Scope {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl Scope {
88    /// Create a new scope.
89    ///
90    /// The returned scope is a child of the current scope.
91    ///
92    /// # Panics
93    ///
94    /// May panic if not called in the context of an executor (e.g. within a
95    /// call to [`run`][crate::SendExecutor::run]).
96    pub fn new() -> Scope {
97        ScopeHandle::with_current(|handle| handle.new_child())
98    }
99
100    /// Create a new scope with a name.
101    ///
102    /// The returned scope is a child of the current scope.
103    ///
104    /// # Panics
105    ///
106    /// May panic if not called in the context of an executor (e.g. within a
107    /// call to [`run`][crate::SendExecutor::run]).
108    pub fn new_with_name(name: impl Into<String>) -> Scope {
109        ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
110    }
111
112    /// Get the scope of the current task, or the global scope if there is no task
113    /// being polled.
114    ///
115    /// # Panics
116    ///
117    /// May panic if not called in the context of an executor (e.g. within a
118    /// call to [`run`][crate::SendExecutor::run]).
119    pub fn current() -> ScopeHandle {
120        ScopeHandle::with_current(|handle| handle.clone())
121    }
122
123    /// Get the global scope of the executor.
124    ///
125    /// This can be used to spawn tasks that live as long as the executor.
126    /// Usually, this means until the end of the program or test. This should
127    /// only be done for tasks where this is expected. If in doubt, spawn on a
128    /// shorter lived scope instead.
129    ///
130    /// In code that uses scopes, you are strongly encouraged to use this API
131    /// instead of the spawn APIs on [`Task`][crate::Task].
132    ///
133    /// All scopes are descendants of the global scope.
134    ///
135    /// # Panics
136    ///
137    /// May panic if not called in the context of an executor (e.g. within a
138    /// call to [`run`][crate::SendExecutor::run]).
139    pub fn global() -> ScopeHandle {
140        EHandle::local().global_scope().clone()
141    }
142
143    /// Create a child scope.
144    pub fn new_child(&self) -> Scope {
145        self.inner.new_child()
146    }
147
148    /// Create a child scope with a name.
149    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
150        self.inner.new_child_with_name(name.into())
151    }
152
153    /// Returns the name of the scope.
154    pub fn name(&self) -> &str {
155        &self.inner.inner.name
156    }
157
158    /// Create a [`ScopeHandle`] that may be used to spawn tasks on this scope.
159    ///
160    /// This is a shorthand for `scope.as_handle().clone()`.
161    ///
162    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
163    /// available. Note that you should _not_ call `scope.clone()`, even though
164    /// the compiler allows it due to the Deref impl. Call this method instead.
165    pub fn to_handle(&self) -> ScopeHandle {
166        self.inner.clone()
167    }
168
169    /// Get a reference to a [`ScopeHandle`] that may be used to spawn tasks on
170    /// this scope.
171    ///
172    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
173    /// available. If you have a `Scope` but need a `&ScopeHandle`, prefer
174    /// calling this method over the less readable `&*scope`.
175    pub fn as_handle(&self) -> &ScopeHandle {
176        &self.inner
177    }
178
179    /// Wait for all tasks in the scope and its children to complete.
180    ///
181    /// New tasks will be accepted on the scope until every task completes and
182    /// this future resolves.
183    ///
184    /// Note that you can await a scope directly because it implements
185    /// `IntoFuture`. `scope.join().await` is a more explicit form of
186    /// `scope.await`.
187    pub fn join(self) -> Join {
188        Join::new(self)
189    }
190
191    /// Stop accepting new tasks on the scope. Returns a future that waits for
192    /// every task on the scope to complete.
193    pub fn close(self) -> Join {
194        self.inner.close();
195        Join::new(self)
196    }
197
198    /// Cancel all tasks cooperatively in the scope and its children
199    /// recursively.
200    ///
201    /// `cancel` first gives a chance to all child tasks (including tasks of
202    /// child scopes) to shutdown cleanly if they're holding on to a
203    /// [`ScopeActiveGuard`]. Once no child tasks are holding on to guards, then
204    /// `cancel` behaves like [`Scope::abort`], dropping all tasks and stopping
205    /// them from running at the next yield point. A [`ScopeActiveGuard`]
206    /// provides a cooperative cancellation signal that is triggered by this
207    /// call, see its documentation for more details.
208    ///
209    /// Once the returned future resolves, no task on the scope will be polled
210    /// again.
211    ///
212    /// Cancelling a scope _does not_ immediately prevent new tasks from being
213    /// accepted. New tasks are accepted as long as there are
214    /// `ScopeActiveGuard`s for this scope.
215    pub fn cancel(self) -> Join {
216        self.inner.cancel_all_tasks();
217        Join::new(self)
218    }
219
220    /// Cancel all tasks in the scope and its children recursively.
221    ///
222    /// Once the returned future resolves, no task on the scope will be polled
223    /// again. Unlike [`Scope::cancel`], this doesn't send a cooperative
224    /// cancellation signal to tasks or child scopes.
225    ///
226    /// When a scope is aborted it immediately stops accepting tasks. Handles of
227    /// tasks spawned on the scope will pend forever.
228    ///
229    /// Dropping the `Scope` object is equivalent to calling this method and
230    /// discarding the returned future. Awaiting the future is preferred because
231    /// it eliminates the possibility of a task poll completing on another
232    /// thread after the scope object has been dropped, which can sometimes
233    /// result in surprising behavior.
234    pub fn abort(self) -> impl Future<Output = ()> {
235        self.inner.abort_all_tasks();
236        Join::new(self)
237    }
238
239    /// Detach the scope, allowing its tasks to continue running in the
240    /// background.
241    ///
242    /// Tasks of a detached scope are still subject to join and cancel
243    /// operations on parent scopes.
244    pub fn detach(self) {
245        // Use ManuallyDrop to destructure self, because Rust doesn't allow this
246        // for types which implement Drop.
247        let this = ManuallyDrop::new(self);
248        // SAFETY: this.inner is obviously valid, and we don't access `this`
249        // after moving.
250        mem::drop(unsafe { std::ptr::read(&this.inner) });
251    }
252}
253
254/// Abort the scope and all of its tasks. Prefer using the [`Scope::abort`]
255/// or [`Scope::join`] methods.
256impl Drop for Scope {
257    fn drop(&mut self) {
258        // Abort all tasks in the scope. Each task has a strong reference to the ScopeState,
259        // which will be dropped after all the tasks in the scope are dropped.
260
261        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
262        // here, but we cannot do that without either:
263        // - Sync drop support in AtomicFuture, or
264        // - The ability to reparent tasks, which requires atomic_arc or
265        //   acquiring a mutex during polling.
266        self.inner.abort_all_tasks();
267    }
268}
269
270impl IntoFuture for Scope {
271    type Output = ();
272    type IntoFuture = Join;
273    fn into_future(self) -> Self::IntoFuture {
274        self.join()
275    }
276}
277
278impl Deref for Scope {
279    type Target = ScopeHandle;
280    fn deref(&self) -> &Self::Target {
281        &self.inner
282    }
283}
284
285impl Borrow<ScopeHandle> for Scope {
286    fn borrow(&self) -> &ScopeHandle {
287        self
288    }
289}
290
pin_project! {
    /// Join operation for a [`Scope`].
    ///
    /// This is a future that resolves when all tasks on the scope are complete
    /// or have been cancelled. New tasks will be accepted on the scope until
    /// every task completes and this future resolves.
    ///
    /// When this object is dropped, the scope and all tasks in it are
    /// aborted (dropping the owned [`Scope`] aborts it).
    //
    // Note: The drop property is only true when S = Scope; it does not apply to
    // other (non-public) uses of this struct where S = ScopeHandle.
    pub struct Join<S = Scope> {
        // The scope being joined; anything that borrows as a `ScopeHandle`.
        scope: S,
        // Entry handed to the scope state in `poll` so that this future's
        // waker can be registered while tasks remain.
        #[pin]
        waker_entry: WakerEntry<ScopeState>,
    }
}
309
310impl<S: Borrow<ScopeHandle>> Join<S> {
311    fn new(scope: S) -> Self {
312        let waker_entry = scope.borrow().inner.state.waker_entry();
313        Self { scope, waker_entry }
314    }
315
316    /// Aborts the scope. The future will resolve when all tasks have finished
317    /// polling.
318    ///
319    /// See [`Scope::abort`] for more details.
320    pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
321        self.scope.borrow().abort_all_tasks();
322        self
323    }
324
325    /// Cancel the scope. The future will resolve when all tasks have finished
326    /// polling.
327    ///
328    /// See [`Scope::cancel`] for more details.
329    pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
330        self.scope.borrow().cancel_all_tasks();
331        self
332    }
333}
334
335impl<S> Future for Join<S>
336where
337    S: Borrow<ScopeHandle>,
338{
339    type Output = ();
340    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341        let this = self.project();
342        let mut state = Borrow::borrow(&*this.scope).lock();
343        if state.has_tasks() {
344            state.add_waker(this.waker_entry, cx.waker().clone());
345            Poll::Pending
346        } else {
347            state.mark_finished();
348            Poll::Ready(())
349        }
350    }
351}
352
/// Trait for things that can be spawned on to a scope.  There is a blanket implementation
/// below for futures that are `Send + 'static` and whose output is `Send + 'static`.
pub trait Spawnable {
    /// The type of value produced on completion.
    type Output;

    /// Converts to a task that can be spawned directly.
    ///
    /// `scope` is the scope the resulting task is associated with.
    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
}
362
363impl<F: Future + Send + 'static> Spawnable for F
364where
365    F::Output: Send + 'static,
366{
367    type Output = F::Output;
368
369    fn into_task(self, scope: ScopeHandle) -> TaskHandle {
370        scope.new_task(None, self)
371    }
372}
373
/// A handle to a scope, which may be used to spawn tasks.
///
/// Cloning a handle is cheap: all clones share the same underlying scope.
///
/// ## Ownership and cycles
///
/// Tasks running on a `Scope` may hold a `ScopeHandle` to that scope. This does
/// not create an ownership cycle because the task will drop the `ScopeHandle`
/// once it completes or is cancelled.
///
/// Naturally, scopes containing tasks that never complete and that are never
/// cancelled will never be freed. Holding a `ScopeHandle` does not contribute to
/// this problem.
#[derive(Clone)]
pub struct ScopeHandle {
    // Shared, reference-counted scope data (executor, state, name and
    // instrumentation data).
    // LINT.IfChange
    inner: Arc<ScopeInner>,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
391
392impl ScopeHandle {
393    /// Create a child scope.
394    pub fn new_child(&self) -> Scope {
395        self.new_child_inner(String::new())
396    }
397
398    /// Returns a reference to the instrument data.
399    pub fn instrument_data(&self) -> Option<&(dyn Any + Send + Sync)> {
400        self.inner.instrument_data.as_deref()
401    }
402
403    /// Create a child scope.
404    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
405        self.new_child_inner(name.into())
406    }
407
408    fn new_child_inner(&self, name: String) -> Scope {
409        let mut state = self.lock();
410        let child = ScopeHandle {
411            inner: Arc::new(ScopeInner {
412                executor: self.inner.executor.clone(),
413                state: Condition::new(ScopeState::new_child(
414                    self.clone(),
415                    &state,
416                    JoinResults::default().into(),
417                )),
418
419                instrument_data: self
420                    .inner
421                    .executor
422                    .instrument
423                    .as_ref()
424                    .map(|value| value.scope_created(&name, Some(self))),
425                name,
426            }),
427        };
428        let weak = child.downgrade();
429        state.insert_child(weak);
430        Scope { inner: child }
431    }
432
433    /// Spawn a new task on the scope.
434    // This does not have the must_use attribute because it's common to detach and the lifetime of
435    // the task is bound to the scope: when the scope is dropped, the task will be cancelled.
436    pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
437        let task = future.into_task(self.clone());
438        let task_id = task.id();
439        self.insert_task(task, false);
440        JoinHandle::new(self.clone(), task_id)
441    }
442
443    /// Spawn a new task on the scope of a thread local executor.
444    ///
445    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
446    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
447    pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
448        let task = self.new_local_task(None, future);
449        let id = task.id();
450        self.insert_task(task, false);
451        JoinHandle::new(self.clone(), id)
452    }
453
454    /// Like `spawn`, but for tasks that return a result.
455    ///
456    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
457    /// *cancelled*.
458    pub fn compute<T: Send + 'static>(
459        &self,
460        future: impl Spawnable<Output = T> + Send + 'static,
461    ) -> crate::Task<T> {
462        let task = future.into_task(self.clone());
463        let id = task.id();
464        self.insert_task(task, false);
465        JoinHandle::new(self.clone(), id).into()
466    }
467
468    /// Like `spawn`, but for tasks that return a result.
469    ///
470    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
471    /// *cancelled*.
472    ///
473    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
474    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
475    pub fn compute_local<T: 'static>(
476        &self,
477        future: impl Future<Output = T> + 'static,
478    ) -> crate::Task<T> {
479        let task = self.new_local_task(None, future);
480        let id = task.id();
481        self.insert_task(task, false);
482        JoinHandle::new(self.clone(), id).into()
483    }
484
485    pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
486        ScopeHandle {
487            inner: Arc::new(ScopeInner {
488                state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
489                name: "root".to_string(),
490                instrument_data: executor
491                    .instrument
492                    .as_ref()
493                    .map(|value| value.scope_created("root", None)),
494                executor,
495            }),
496        }
497    }
498
    /// Stop the scope from accepting new tasks.
    ///
    /// Note that unlike [`Scope::close`], this does not return a future that
    /// waits for all tasks to complete. Returning such a future could lead to
    /// resource leaks because it is not uncommon to access a scope from a task
    /// running on the scope itself. If such a task were to await a future
    /// returned by this method it would suspend forever waiting for itself to
    /// complete.
    pub fn close(&self) {
        // The state lock is only held for the duration of this statement.
        self.lock().close();
    }
509
510    /// Cancel all the scope's tasks.
511    ///
512    /// Note that if this is called from within a task running on the scope, the
513    /// task will not resume from the next await point.
514    pub fn cancel(self) -> Join<Self> {
515        self.cancel_all_tasks();
516        Join::new(self)
517    }
518
519    /// Aborts all the scope's tasks.
520    ///
521    /// Note that if this is called from within a task running on the scope, the
522    /// task will not resume from the next await point.
523    pub fn abort(self) -> impl Future<Output = ()> {
524        self.abort_all_tasks();
525        Join::new(self)
526    }
527
    /// Retrieves a [`ScopeActiveGuard`] for this scope.
    ///
    /// Returns `None` when no guard could be acquired. This may happen if
    /// cancellation has already started for this scope. In that case, the
    /// caller must assume any tasks from this scope may be dropped at any
    /// yield point.
    ///
    /// Creating a [`ScopeActiveGuard`] is substantially more expensive than
    /// just polling it, so callers should maintain the returned guard when
    /// success is observed from this call for best performance.
    ///
    /// See [`Scope::cancel`] for details on cooperative cancellation behavior.
    #[must_use]
    pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
        ScopeActiveGuard::new(self)
    }
543
    /// Returns true if the scope has been signaled to exit via
    /// [`Scope::cancel`] or [`Scope::abort`].
    ///
    /// This takes the scope's state lock to read the current status.
    pub fn is_cancelled(&self) -> bool {
        self.lock().status().is_cancelled()
    }
549
550    // Joining the scope could be allowed from a ScopeHandle, but the use case
551    // seems less common and more bug prone than cancelling. We don't allow this
552    // for the same reason we don't return a future from close().
553
554    /// Wait for there to be no tasks. This is racy: as soon as this returns it is possible for
555    /// another task to have been spawned on this scope.
556    pub async fn on_no_tasks(&self) {
557        self.inner
558            .state
559            .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
560            .await;
561    }
562
563    /// Wait for there to be no tasks and no guards. This is racy: as soon as this returns it is
564    /// possible for another task to have been spawned on this scope, or for there to be guards.
565    pub async fn on_no_tasks_and_guards(&self) {
566        self.inner
567            .state
568            .when(|state| {
569                if state.has_tasks() || state.guards() > 0 {
570                    Poll::Pending
571                } else {
572                    Poll::Ready(())
573                }
574            })
575            .await;
576    }
577
    /// Wake all the scope's tasks so their futures will be polled again.
    ///
    /// The work is delegated to the scope state while its lock is held.
    // NOTE(review): the name suggests that an active guard is involved for each
    // woken task; confirm against `ScopeState::wake_all_with_active_guard`.
    pub fn wake_all_with_active_guard(&self) {
        self.lock().wake_all_with_active_guard();
    }
582
583    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
584    /// That must be done separately.
585    pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
586        &self,
587        id: Option<usize>,
588        fut: Fut,
589    ) -> AtomicFutureHandle<'a>
590    where
591        Fut::Output: Send,
592    {
593        let id = id.unwrap_or_else(|| self.executor().next_task_id());
594        let mut task = AtomicFutureHandle::new(Some(self.clone()), id, fut);
595        if let Some(instrument) = &self.executor().instrument {
596            instrument.task_created(self, &mut task);
597        }
598        task
599    }
600
601    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
602    /// That must be done separately.
603    pub(crate) fn new_local_task<'a>(
604        &self,
605        id: Option<usize>,
606        fut: impl Future + 'a,
607    ) -> AtomicFutureHandle<'a> {
608        // Check that the executor is local and that this is the executor thread.
609        if !self.executor().is_local() {
610            panic!(
611                "Error: called `new_local_task` on multithreaded executor. \
612                 Use `spawn` or a `LocalExecutor` instead."
613            );
614        }
615        assert_eq!(
616            self.executor().first_thread_id.get(),
617            Some(&std::thread::current().id()),
618            "Error: called `new_local_task` on a different thread to the executor",
619        );
620
621        let id = id.unwrap_or_else(|| self.executor().next_task_id());
622        // SAFETY: We've confirmed that the futures here will never be used across multiple threads,
623        // so the Send requirements that `new_local` requires should be met.
624        unsafe {
625            let mut task = AtomicFutureHandle::new_local(Some(self.clone()), id, fut);
626            if let Some(instrument) = &self.executor().instrument {
627                instrument.task_created(self, &mut task);
628            }
629            task
630        }
631    }
632}
633
634impl fmt::Debug for ScopeHandle {
635    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
636        f.debug_struct("Scope").field("name", &self.inner.name).finish()
637    }
638}
639
/// Similar to a scope but all futures spawned on the scope *must* finish with the same result type.
/// That allows the scope to return a stream of results. Attempting to spawn tasks using
/// `ScopeHandle::spawn` (or similar) will result in tasks that are immediately dropped (just as if
/// the scope was closed).  Like a regular scope, the scope can be closed, at which point the stream
/// will terminate once all the tasks have finished.  This is designed to be a fairly close drop-in
/// replacement to `FuturesUnordered`, the principal difference being that the tasks run in parallel
/// rather than just concurrently.  Another difference is that the futures don't need to be the same
/// type; only the outputs do.  In all other respects, the scope operates like a regular scope i.e.
/// it can have children, you can join them, cancel them, etc.
pub struct ScopeStream<R> {
    // The underlying scope; `ScopeStream` derefs to it.
    inner: ScopeHandle,
    // Results buffer and waker, shared with the `ResultsStream` that is installed in the
    // scope's state when the stream is created.
    stream: Arc<Mutex<ResultsStreamInner<R>>>,
}
653
654impl<R: Send + 'static> ScopeStream<R> {
655    /// Creates a new scope stream.
656    ///
657    /// The returned scope stream is a child of the current scope.
658    ///
659    /// # Panics
660    ///
661    /// May panic if not called in the context of an executor (e.g. within a
662    /// call to [`run`][crate::SendExecutor::run]).
663    pub fn new() -> (Self, ScopeStreamHandle<R>) {
664        Self::new_inner(String::new())
665    }
666
667    /// Creates a new scope stream with a name.
668    ///
669    /// The returned scope stream is a child of the current scope.
670    ///
671    /// # Panics
672    ///
673    /// May panic if not called in the context of an executor (e.g. within a
674    /// call to [`run`][crate::SendExecutor::run]).
675    pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
676        Self::new_inner(name.into())
677    }
678
679    fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
680        let this = ScopeHandle::with_current(|handle| {
681            let mut state = handle.lock();
682            let stream = Arc::default();
683            let child = ScopeHandle {
684                inner: Arc::new(ScopeInner {
685                    executor: handle.executor().clone(),
686                    state: Condition::new(ScopeState::new_child(
687                        handle.clone(),
688                        &state,
689                        Box::new(ResultsStream { inner: Arc::clone(&stream) }),
690                    )),
691                    instrument_data: handle
692                        .executor()
693                        .instrument
694                        .as_ref()
695                        .map(|value| value.scope_created(&name, Some(handle))),
696                    name,
697                }),
698            };
699            let weak = child.downgrade();
700            state.insert_child(weak);
701            ScopeStream { inner: child, stream }
702        });
703        let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
704        (this, handle)
705    }
706}
707
708impl<R> Drop for ScopeStream<R> {
709    fn drop(&mut self) {
710        // Cancel all tasks in the scope. Each task has a strong reference to the ScopeState,
711        // which will be dropped after all the tasks in the scope are dropped.
712
713        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
714        // here, but we cannot do that without either:
715        // - Sync drop support in AtomicFuture, or
716        // - The ability to reparent tasks, which requires atomic_arc or
717        //   acquiring a mutex during polling.
718        self.inner.abort_all_tasks();
719    }
720}
721
722impl<R: Send + 'static> Stream for ScopeStream<R> {
723    type Item = R;
724
725    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
726        let mut stream_inner = self.stream.lock();
727        match stream_inner.results.pop() {
728            Some(result) => Poll::Ready(Some(result)),
729            None => {
730                // Lock ordering: when results are posted, the state lock is taken first, so we must
731                // do the same.
732                drop(stream_inner);
733                let state = self.inner.lock();
734                let mut stream_inner = self.stream.lock();
735                match stream_inner.results.pop() {
736                    Some(result) => Poll::Ready(Some(result)),
737                    None => {
738                        if state.has_tasks() {
739                            stream_inner.waker = Some(cx.waker().clone());
740                            Poll::Pending
741                        } else {
742                            Poll::Ready(None)
743                        }
744                    }
745                }
746            }
747        }
748    }
749}
750
751impl<R> Deref for ScopeStream<R> {
752    type Target = ScopeHandle;
753    fn deref(&self) -> &Self::Target {
754        &self.inner
755    }
756}
757
758impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
759    fn borrow(&self) -> &ScopeHandle {
760        self
761    }
762}
763
764impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
765    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
766        let (stream, handle) = ScopeStream::new();
767        for fut in iter {
768            handle.push(fut);
769        }
770        stream.close();
771        stream
772    }
773}
774
775#[derive(Clone)]
776pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
777
778impl<R: Send> ScopeStreamHandle<R> {
779    pub fn push(&self, future: impl Spawnable<Output = R>) {
780        self.0.insert_task(future.into_task(self.0.clone()), true);
781    }
782}
783
/// Holds a guard on the creating scope, holding off cancellation.
///
/// `ScopeActiveGuard` allows [`Scope`]s to perform cooperative cancellation.
/// [`ScopeActiveGuard::on_cancel`] returns a future that resolves when
/// [`Scope::cancel`] and [`ScopeHandle::cancel`] are called. That is the signal
/// sent to cooperative tasks to stop doing work and finish.
///
/// A `ScopeActiveGuard` is obtained via [`ScopeHandle::active_guard`].
/// `ScopeActiveGuard` releases the guard on the originating scope on drop.
/// Cloning a guard acquires an additional guard on the same scope.
#[derive(Debug)]
#[must_use]
pub struct ScopeActiveGuard(ScopeHandle);
796
797impl Deref for ScopeActiveGuard {
798    type Target = ScopeHandle;
799    fn deref(&self) -> &Self::Target {
800        &self.0
801    }
802}
803
804impl Drop for ScopeActiveGuard {
805    fn drop(&mut self) {
806        let Self(scope) = self;
807        scope.release_cancel_guard();
808    }
809}
810
811impl Clone for ScopeActiveGuard {
812    fn clone(&self) -> Self {
813        self.0.lock().acquire_cancel_guard(1);
814        Self(self.0.clone())
815    }
816}
817
818impl ScopeActiveGuard {
819    /// Returns a borrow of the scope handle associated with this guard.
820    pub fn as_handle(&self) -> &ScopeHandle {
821        &self.0
822    }
823
824    /// Returns a clone of the scope handle associated with this guard.
825    pub fn to_handle(&self) -> ScopeHandle {
826        self.0.clone()
827    }
828
829    /// Retrieves a future from this guard that can be polled on for
830    /// cancellation.
831    ///
832    /// The returned future resolves when the scope is cancelled. Callers should
833    /// perform teardown and drop the guard when done.
834    pub async fn on_cancel(&self) {
835        self.0
836            .inner
837            .state
838            .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
839            .await
840    }
841
842    fn new(scope: &ScopeHandle) -> Option<Self> {
843        if scope.lock().acquire_cancel_guard_if_not_finished() {
844            Some(Self(scope.clone()))
845        } else {
846            None
847        }
848    }
849}
850
/// Error reported when a task did not run to completion.
///
/// The private marker field prevents construction outside of this module.
pub struct JoinError {
    _phantom: PhantomData<()>,
}

impl fmt::Debug for JoinError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Same output as a field-less debug tuple: just the type name.
        f.write_str("JoinError")
    }
}

impl fmt::Display for JoinError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("JoinError: a task failed to execute to completion")
    }
}

impl error::Error for JoinError {}
869
870//
871// # Internal API
872//
873
/// A weak reference to a scope.
///
/// Wraps a `Weak` pointer to the scope's `ScopeInner`, so holding one does not
/// keep the scope alive. Use `upgrade` to get a `ScopeHandle` back while the
/// scope still exists.
#[derive(Clone)]
struct WeakScopeHandle {
    inner: Weak<ScopeInner>,
}
879
880impl WeakScopeHandle {
881    /// Upgrades to a [`ScopeHandle`] if the scope still exists.
882    pub fn upgrade(&self) -> Option<ScopeHandle> {
883        self.inner.upgrade().map(|inner| ScopeHandle { inner })
884    }
885}
886
887impl hash::Hash for WeakScopeHandle {
888    fn hash<H: hash::Hasher>(&self, state: &mut H) {
889        Weak::as_ptr(&self.inner).hash(state);
890    }
891}
892
893impl PartialEq for WeakScopeHandle {
894    fn eq(&self, other: &Self) -> bool {
895        Weak::ptr_eq(&self.inner, &other.inner)
896    }
897}
898
// Equality of weak handles is pointer identity (see the `PartialEq` impl),
// which is a full equivalence relation, so `Eq` can be implemented.
impl Eq for WeakScopeHandle {
    // Weak::ptr_eq should return consistent results, even when the inner value
    // has been dropped.
}
903
904// This module exists as a privacy boundary so that we can make sure any
905// operation that might cause the scope to finish also wakes its waker.
906mod state {
907    use super::*;
908
    /// The mutable state of a scope.
    ///
    /// Every field except `parent` and `results` is private to this module, so
    /// those fields can only be changed through the methods defined here.
    pub struct ScopeState {
        /// Handle to the parent scope; `None` for a root scope.
        pub parent: Option<ScopeHandle>,
        // LINT.IfChange
        children: HashSet<WeakScopeHandle>,
        all_tasks: HashSet<TaskHandle>,
        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
        /// The number of children that transitively contain tasks, plus one for
        /// this scope if it directly contains tasks.
        subscopes_with_tasks: u32,
        /// Whether new tasks may currently be inserted into this scope.
        can_spawn: bool,
        /// Number of cancel guards currently held on this scope.
        guards: u32,
        /// Current lifecycle status of the scope.
        status: Status,
        /// Wakers/results for joining each task.
        pub results: Box<dyn Results>,
    }
924
    /// Join bookkeeping stored per task ID: either the waker of a pending
    /// joiner or the task that has finished.
    pub enum JoinResult {
        /// Waker recorded when the join result was polled before the task
        /// finished.
        Waker(Waker),
        /// The finished task, stored until it is taken out again by a poll or
        /// a detach.
        Result(TaskHandle),
    }
929
930    #[repr(u8)] // So zxdb can read the status.
931    #[derive(Default, Debug, Clone, Copy)]
932    pub enum Status {
933        #[default]
934        /// The scope is active.
935        Active,
936        /// The scope has been signalled to cancel and is waiting for all guards
937        /// to be released.
938        PendingCancellation,
939        /// The scope is not accepting new tasks and all tasks have been
940        /// scheduled to be dropped.
941        Finished,
942    }
943
944    impl Status {
945        /// Returns whether this records a cancelled state.
946        pub fn is_cancelled(&self) -> bool {
947            match self {
948                Self::Active => false,
949                Self::PendingCancellation | Self::Finished => true,
950            }
951        }
952    }
953
954    impl ScopeState {
955        pub fn new_root(results: Box<impl Results>) -> Self {
956            Self {
957                parent: None,
958                children: Default::default(),
959                all_tasks: Default::default(),
960                subscopes_with_tasks: 0,
961                can_spawn: true,
962                guards: 0,
963                status: Default::default(),
964                results,
965            }
966        }
967
968        pub fn new_child(
969            parent_handle: ScopeHandle,
970            parent_state: &Self,
971            results: Box<impl Results>,
972        ) -> Self {
973            let (status, can_spawn) = match parent_state.status {
974                Status::Active => (Status::Active, parent_state.can_spawn),
975                Status::Finished | Status::PendingCancellation => (Status::Finished, false),
976            };
977            Self {
978                parent: Some(parent_handle),
979                children: Default::default(),
980                all_tasks: Default::default(),
981                subscopes_with_tasks: 0,
982                can_spawn,
983                guards: 0,
984                status,
985                results,
986            }
987        }
988    }
989
990    impl ScopeState {
        /// Returns the set of tasks registered directly on this scope.
        pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
            &self.all_tasks
        }
994
995        /// Attempts to add a task to the scope. Returns the task if the scope cannot accept a task
996        /// (since it isn't safe to drop the task whilst the lock is held).
997        pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
998            if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
999                return Some(task);
1000            }
1001            if self.all_tasks.is_empty() && !self.register_first_task() {
1002                return Some(task);
1003            }
1004            task.wake();
1005            assert!(self.all_tasks.insert(task));
1006            None
1007        }
1008
        /// Returns the weak handles of the child scopes registered on this
        /// scope.
        pub fn children(&self) -> &HashSet<WeakScopeHandle> {
            &self.children
        }
1012
        /// Registers `child` in this scope's set of children. Inserting a
        /// handle that is already present leaves the set unchanged.
        pub fn insert_child(&mut self, child: WeakScopeHandle) {
            self.children.insert(child);
        }
1016
        /// Removes the child identified by the pointer key `child` from this
        /// scope's set of children.
        ///
        /// # Panics
        ///
        /// Panics if the child is not found while other children are still
        /// registered.
        pub fn remove_child(&mut self, child: &PtrKey) {
            let found = self.children.remove(child);
            // This should always succeed unless the scope is being dropped
            // (in which case children will be empty).
            assert!(found || self.children.is_empty());
        }
1023
        /// Returns a copy of the scope's current status.
        pub fn status(&self) -> Status {
            self.status
        }
1027
        /// Returns the number of cancel guards currently held on this scope.
        pub fn guards(&self) -> u32 {
            self.guards
        }
1031
        /// Stops the scope from accepting new tasks. The status and the tasks
        /// already present are left untouched.
        pub fn close(&mut self) {
            self.can_spawn = false;
        }
1035
1036        pub fn mark_finished(&mut self) {
1037            self.can_spawn = false;
1038            self.status = Status::Finished;
1039        }
1040
1041        pub fn has_tasks(&self) -> bool {
1042            self.subscopes_with_tasks > 0
1043        }
1044
1045        pub fn wake_all_with_active_guard(&mut self) {
1046            let mut count = 0;
1047            for task in &self.all_tasks {
1048                if task.wake_with_active_guard() {
1049                    count += 1;
1050                }
1051            }
1052            self.acquire_cancel_guard(count);
1053        }
1054
1055        pub fn abort_tasks_and_mark_finished(&mut self) {
1056            for task in self.all_tasks() {
1057                if task.abort() {
1058                    task.scope().executor().ready_tasks.push(task.clone());
1059                }
1060                // Don't bother dropping tasks that are finished; the entire
1061                // scope is going to be dropped soon anyway.
1062            }
1063            self.mark_finished();
1064        }
1065
        /// Moves the wakers drained from the guard (`drain_wakers`) into
        /// `wakers` and switches the scope to `Status::PendingCancellation`.
        ///
        /// The wakers are only collected here; waking them is left to the
        /// caller.
        pub fn wake_wakers_and_mark_pending(
            this: &mut ConditionGuard<'_, ScopeState>,
            wakers: &mut Vec<Waker>,
        ) {
            wakers.extend(this.drain_wakers());
            this.status = Status::PendingCancellation;
        }
1073
1074        /// Registers our first task with the parent scope.
1075        ///
1076        /// Returns false if the scope is not allowed to accept a task.
1077        #[must_use]
1078        fn register_first_task(&mut self) -> bool {
1079            if !self.can_spawn {
1080                return false;
1081            }
1082            let can_spawn = match &self.parent {
1083                Some(parent) => {
1084                    // If our parent already knows we have tasks, we can always
1085                    // spawn. Otherwise, we have to recurse.
1086                    self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
1087                }
1088                None => true,
1089            };
1090            if can_spawn {
1091                self.subscopes_with_tasks += 1;
1092                debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
1093            };
1094            can_spawn
1095        }
1096
        /// Records that one of the task holders counted in
        /// `subscopes_with_tasks` (this scope's own tasks or a child subtree)
        /// has run out of tasks.
        ///
        /// Decrements `subscopes_with_tasks`. If the count reaches zero, the
        /// change is propagated to the parent (recursively) and the wakers
        /// drained from this scope's guard are appended to `wakers`.
        ///
        /// `num_wakers_hint` is the number of wakers the caller expects to add;
        /// each scope on the way up adds its own waker count so that `wakers`
        /// is reserved once, by the last scope reached.
        fn on_last_task_removed(
            this: &mut ConditionGuard<'_, ScopeState>,
            num_wakers_hint: usize,
            wakers: &mut Vec<Waker>,
        ) {
            debug_assert!(this.subscopes_with_tasks > 0);
            this.subscopes_with_tasks -= 1;
            if this.subscopes_with_tasks > 0 {
                // Other task holders remain: only make room for the wakers
                // collected further down and stop propagating.
                wakers.reserve(num_wakers_hint);
                return;
            }

            match &this.parent {
                Some(parent) => {
                    // The parent lock is a temporary and is held only for the
                    // duration of this call.
                    Self::on_last_task_removed(
                        &mut parent.lock(),
                        num_wakers_hint + this.waker_count(),
                        wakers,
                    );
                }
                None => wakers.reserve(num_wakers_hint),
            };
            // Ancestors append their wakers first (inside the recursion above),
            // then this scope appends its own.
            wakers.extend(this.drain_wakers());
        }
1121
1122        /// Acquires a cancel guard IFF we're not in the finished state.
1123        ///
1124        /// Returns `true` if a guard was acquired.
1125        pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
1126            match self.status {
1127                Status::Active | Status::PendingCancellation => {
1128                    self.acquire_cancel_guard(1);
1129                    true
1130                }
1131                Status::Finished => false,
1132            }
1133        }
1134
1135        pub fn acquire_cancel_guard(&mut self, count: u32) {
1136            if count == 0 {
1137                return;
1138            }
1139            if self.guards == 0
1140                && let Some(parent) = self.parent.as_ref()
1141            {
1142                parent.acquire_cancel_guard();
1143            }
1144            self.guards += count;
1145        }
1146
        /// Releases one cancel guard held on the scope behind `this`.
        ///
        /// When the last guard is released, `on_zero_guards` runs (which also
        /// releases the guard held on the parent, if any) and the wakers
        /// drained from this scope's guard are appended to `wake_vec`.
        ///
        /// `waker_count` is the number of wakers the caller expects to add; it
        /// is accumulated while walking up the parents and used to reserve
        /// space in `wake_vec` at the scope where the walk stops with guards
        /// still held.
        ///
        /// # Panics
        ///
        /// Panics if no guard is currently held.
        pub fn release_cancel_guard(
            this: &mut ConditionGuard<'_, Self>,
            wake_vec: &mut WakeVec,
            mut waker_count: usize,
        ) {
            this.guards = this.guards.checked_sub(1).expect("released non-acquired guard");
            if this.guards == 0 {
                waker_count += this.waker_count();
                // Handle the ancestors first; their wakers end up ahead of
                // this scope's wakers in `wake_vec`.
                this.on_zero_guards(wake_vec, waker_count);
                wake_vec.0.extend(this.drain_wakers())
            } else {
                wake_vec.0.reserve_exact(waker_count);
            }
        }
1161
1162        fn on_zero_guards(&mut self, wake_vec: &mut WakeVec, waker_count: usize) {
1163            match self.status {
1164                Status::Active => {}
1165                Status::PendingCancellation => {
1166                    self.abort_tasks_and_mark_finished();
1167                }
1168                // Acquiring and releasing guards post finished state is a
1169                // no-op.
1170                Status::Finished => {}
1171            }
1172            if let Some(parent) = &self.parent {
1173                ScopeState::release_cancel_guard(&mut parent.lock(), wake_vec, waker_count);
1174            }
1175        }
1176    }
1177
1178    #[derive(Default)]
1179    pub struct WakeVec(Vec<Waker>);
1180
1181    impl Drop for WakeVec {
1182        fn drop(&mut self) {
1183            for waker in self.0.drain(..) {
1184                waker.wake();
1185            }
1186        }
1187    }
1188
    /// A scope state guard paired with the wakers collected while it is held.
    ///
    /// Fields are dropped in declaration order, so the guard is dropped before
    /// the `WakeVec` wakes the wakers it collected.
    // WakeVec *must* come after the guard because we want the guard to be dropped first.
    pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1191
1192    impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1193        fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1194            Self(value, WakeVec::default())
1195        }
1196    }
1197
1198    impl ScopeWaker<'_> {
1199        pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
1200            let task = self.all_tasks.take(&id);
1201            if task.is_some() {
1202                self.on_task_removed(0);
1203            }
1204            task
1205        }
1206
1207        pub fn task_did_finish(&mut self, id: usize) {
1208            if let Some(task) = self.all_tasks.take(&id) {
1209                self.on_task_removed(1);
1210                if !task.is_detached() {
1211                    let maybe_waker = self.results.task_did_finish(task);
1212                    self.1.0.extend(maybe_waker);
1213                }
1214            }
1215        }
1216
        /// Closes the scope to new tasks and empties it.
        ///
        /// Returns the tasks that were registered, the stored results (as
        /// returned by `Results::take`) and a draining iterator over the
        /// children. Nothing is dropped here; the caller decides when the
        /// returned values are dropped.
        pub fn set_closed_and_drain(
            &mut self,
        ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
            self.close();
            let all_tasks = std::mem::take(&mut self.all_tasks);
            let results = self.results.take();
            // `self.all_tasks` is empty at this point, so if any task was taken
            // out this runs the last-task-removed bookkeeping.
            if !all_tasks.is_empty() {
                self.on_task_removed(0)
            }
            let children = self.children.drain();
            (all_tasks, results, children)
        }
1229
1230        fn on_task_removed(&mut self, num_wakers_hint: usize) {
1231            if self.all_tasks.is_empty() {
1232                ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1.0)
1233            }
1234        }
1235
1236        pub fn wake_wakers_and_mark_pending(&mut self) {
1237            let Self(state, wakers) = self;
1238            ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
1239        }
1240    }
1241
1242    impl<'a> Deref for ScopeWaker<'a> {
1243        type Target = ConditionGuard<'a, ScopeState>;
1244
1245        fn deref(&self) -> &Self::Target {
1246            &self.0
1247        }
1248    }
1249
1250    impl DerefMut for ScopeWaker<'_> {
1251        fn deref_mut(&mut self) -> &mut Self::Target {
1252            &mut self.0
1253        }
1254    }
1255}
1256
/// The shared data behind a `ScopeHandle` (handles hold it in an `Arc`).
struct ScopeInner {
    /// The executor associated with this scope.
    executor: Arc<Executor>,
    /// The scope's state, accessed through `Condition::lock`.
    state: Condition<ScopeState>,
    /// Name of the scope.
    name: String,
    /// Opaque data, presumably attached by instrumentation; not used in this
    /// part of the file.
    instrument_data: Option<Box<dyn Any + Send + Sync>>,
}
1263
impl Drop for ScopeInner {
    /// Unregisters this scope from its parent, if it has one, and gives back
    /// the cancel guard this scope still holds on the parent.
    fn drop(&mut self) {
        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
        // This also complies with the correctness requirements of
        // HashSet::remove because the implementations of Hash and Eq match
        // between PtrKey and WeakScopeHandle.
        let key = unsafe { &*(self as *const _ as *const PtrKey) };
        let state = self.state.lock();
        if let Some(parent) = &state.parent {
            // Declared before `parent_state` so that it is dropped, and the
            // collected wakers woken, only after the parent guard is dropped.
            let mut wake_vec = WakeVec::default();
            let mut parent_state = parent.lock();
            // While this scope holds guards it also holds one guard on its
            // parent (see `ScopeState::acquire_cancel_guard`); release it.
            if state.guards() != 0 {
                ScopeState::release_cancel_guard(&mut parent_state, &mut wake_vec, 0);
            }
            parent_state.remove_child(key);
        }
    }
}
1282
1283impl ScopeHandle {
1284    fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
1285        super::common::TaskHandle::with_current(|task| match task {
1286            Some(task) => f(task.scope()),
1287            None => f(EHandle::local().global_scope()),
1288        })
1289    }
1290
    /// Locks this scope's state and returns the guard.
    fn lock(&self) -> ConditionGuard<'_, ScopeState> {
        self.inner.state.lock()
    }
1294
1295    fn downgrade(&self) -> WeakScopeHandle {
1296        WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
1297    }
1298
    /// Returns the executor stored in this scope.
    #[inline(always)]
    pub(crate) fn executor(&self) -> &Arc<Executor> {
        &self.inner.executor
    }
1303
    /// Marks the task as detached.
    ///
    /// Flags the task as detached if it is still in this scope's task set, and
    /// removes any join entry stored for `task_id`.
    pub(crate) fn detach(&self, task_id: usize) {
        // Whatever `results.detach` hands back is bound outside the inner
        // block, so it is dropped only after the scope lock (`state`) has been
        // released.
        let _maybe_task = {
            let mut state = self.lock();
            if let Some(task) = state.all_tasks().get(&task_id) {
                task.detach();
            }
            state.results.detach(task_id)
        };
    }
1314
    /// Aborts the task.
    ///
    /// If a finished task is already stored in the results for `task_id`, it
    /// is removed and its result is returned. Otherwise, if the task is still
    /// in this scope, it is aborted (and pushed onto the executor's ready queue
    /// when `abort` returns `true`) and whatever result it holds is returned.
    /// Returns `None` when neither lookup finds the task or no result is
    /// available.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `R` is the correct type.
    pub(crate) unsafe fn abort_task<R>(&self, task_id: usize) -> Option<R> {
        let mut state = self.lock();
        if let Some(task) = state.results.detach(task_id) {
            // Release the scope lock before taking the result out of the task.
            drop(state);
            return unsafe { task.take_result() };
        }
        // Here the scope lock stays held until the function returns.
        state.all_tasks().get(&task_id).and_then(|task| {
            if task.abort() {
                self.inner.executor.ready_tasks.push(task.clone());
            }
            unsafe { task.take_result() }
        })
    }
1333
    /// Aborts and detaches the task.
    ///
    /// Removes any join entry stored for `task_id`, then calls
    /// `abort_and_detach` on the task if it is still in this scope and acts on
    /// the outcome: the task is removed from the scope (`Done`), pushed onto
    /// the executor's ready queue (`AddToRunQueue`), or left as is (`Pending`).
    pub(crate) fn abort_and_detach(&self, task_id: usize) {
        // Task handles taken out while the lock is held are returned from the
        // inner block, so they are dropped only after `state` has been dropped.
        let _tasks = {
            let mut state = ScopeWaker::from(self.lock());
            let maybe_task1 = state.results.detach(task_id);
            let mut maybe_task2 = None;
            if let Some(task) = state.all_tasks().get(&task_id) {
                match task.abort_and_detach() {
                    AbortAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
                    AbortAndDetachResult::AddToRunQueue => {
                        self.inner.executor.ready_tasks.push(task.clone());
                    }
                    AbortAndDetachResult::Pending => {}
                }
            }
            (maybe_task1, maybe_task2)
        };
    }
1352
1353    /// Polls for a join result for the given task ID.
1354    ///
1355    /// # Safety
1356    ///
1357    /// The caller must guarantee that `R` is the correct type.
1358    pub(crate) unsafe fn poll_join_result<R>(
1359        &self,
1360        task_id: usize,
1361        cx: &mut Context<'_>,
1362    ) -> Poll<R> {
1363        let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1364        match unsafe { task.take_result() } {
1365            Some(result) => Poll::Ready(result),
1366            None => {
1367                // The task has been aborted so all we can do is forever return pending.
1368                Poll::Pending
1369            }
1370        }
1371    }
1372
1373    /// Polls for a join result for the given task ID, or a `JoinError` if the
1374    /// task was canceled.
1375    ///
1376    /// # Safety
1377    ///
1378    /// The caller must guarantee that `R` is the correct type.
1379    pub(crate) unsafe fn try_poll_join_result<R>(
1380        &self,
1381        task_id: usize,
1382        cx: &mut Context<'_>,
1383    ) -> Poll<Result<R, JoinError>> {
1384        let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1385        match unsafe { task.take_result() } {
1386            Some(result) => Poll::Ready(Ok(result)),
1387            None => {
1388                if task.is_aborted() {
1389                    Poll::Ready(Err(JoinError { _phantom: PhantomData }))
1390                } else {
1391                    Poll::Pending
1392                }
1393            }
1394        }
1395    }
1396
1397    /// Polls for the task to be aborted.
1398    pub(crate) unsafe fn poll_aborted<R>(
1399        &self,
1400        task_id: usize,
1401        cx: &mut Context<'_>,
1402    ) -> Poll<Option<R>> {
1403        let task = self.lock().results.poll_join_result(task_id, cx);
1404        task.map(|task| unsafe { task.take_result() })
1405    }
1406
    /// Inserts `task` into this scope; returns `true` if the scope accepted
    /// it.
    pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
        // The lock guard is a temporary of this statement. Binding the result
        // means a rejected task is dropped after the lock has been released.
        let returned_task = self.lock().insert_task(task, for_stream);
        returned_task.is_none()
    }
1411
    /// Drops the specified task.
    ///
    /// The main task by the single-threaded executor might not be 'static, so we use this to drop
    /// the task and make sure we meet lifetime guarantees.  Note that removing the task from our
    /// task list isn't sufficient; we must make sure the future running in the task is dropped.
    ///
    /// Does nothing if the scope does not hold a task with this ID.
    ///
    /// # Safety
    ///
    /// This is unsafe because of the call to `drop_future_unchecked` which requires that no
    /// thread is currently polling the task.
    pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
        let mut state = ScopeWaker::from(self.lock());
        // Take the task out of the scope first, then drop the future it runs.
        let task = state.take_task(task_id);
        if let Some(task) = task {
            unsafe { task.drop_future_unchecked() };
        }
    }
1429
1430    pub(super) fn task_did_finish(&self, id: usize) {
1431        let mut state = ScopeWaker::from(self.lock());
1432        state.task_did_finish(id);
1433    }
1434
1435    /// Visits scopes by state. If the callback returns `true`, children will
1436    /// be visited.
1437    fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
1438        let mut scopes = vec![self.clone()];
1439        while let Some(scope) = scopes.pop() {
1440            let mut scope_waker = ScopeWaker::from(scope.lock());
1441            if callback(&mut scope_waker) {
1442                scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
1443            }
1444        }
1445    }
1446
    /// Locks the scope and acquires one cancel guard on it.
    fn acquire_cancel_guard(&self) {
        self.lock().acquire_cancel_guard(1)
    }
1450
    /// Releases one cancel guard on this scope.
    ///
    /// # Panics
    ///
    /// Panics if no guard is currently held on the scope.
    pub(crate) fn release_cancel_guard(&self) {
        // Declared before the lock is taken: the lock guard is a temporary of
        // the next statement, so the collected wakers are woken (when
        // `wake_vec` is dropped) after that guard has been dropped.
        let mut wake_vec = WakeVec::default();
        ScopeState::release_cancel_guard(&mut self.lock(), &mut wake_vec, 0);
    }
1455
1456    /// Cancels tasks in this scope and all child scopes.
1457    fn cancel_all_tasks(&self) {
1458        self.visit_scopes_locked(|state| {
1459            match state.status() {
1460                Status::Active => {
1461                    if state.guards() == 0 {
1462                        state.abort_tasks_and_mark_finished();
1463                    } else {
1464                        state.wake_wakers_and_mark_pending();
1465                    }
1466                    true
1467                }
1468                Status::PendingCancellation => {
1469                    // If we're already pending cancellation, don't wake all
1470                    // tasks. A single wake should be enough here. More
1471                    // wakes on further calls probably hides bugs.
1472                    true
1473                }
1474                Status::Finished => {
1475                    // Already finished.
1476                    false
1477                }
1478            }
1479        });
1480    }
1481
1482    /// Aborts tasks in this scope and all child scopes.
1483    fn abort_all_tasks(&self) {
1484        self.visit_scopes_locked(|state| match state.status() {
1485            Status::Active | Status::PendingCancellation => {
1486                state.abort_tasks_and_mark_finished();
1487                true
1488            }
1489            Status::Finished => false,
1490        });
1491    }
1492
    /// Drops tasks in this scope and all child scopes.
    ///
    /// Each visited scope is closed to new tasks and emptied of its tasks,
    /// stored results and children.
    ///
    /// # Panics
    ///
    /// Panics if any task is being accessed by another thread. Only call this
    /// method when the executor is shutting down and there are no other pollers.
    pub(super) fn drop_all_tasks(&self) {
        let mut scopes = vec![self.clone()];
        while let Some(scope) = scopes.pop() {
            let (tasks, join_results) = {
                let mut state = ScopeWaker::from(scope.lock());
                let (tasks, join_results, children) = state.set_closed_and_drain();
                // Queue the children that still exist for the same treatment.
                scopes.extend(children.filter_map(|child| child.upgrade()));
                (tasks, join_results)
            };
            // Call task destructors once the scope lock is released so we don't risk a deadlock.
            for task in tasks {
                task.try_drop().expect("Expected drop to succeed");
            }
            // The stored results are dropped after the tasks.
            std::mem::drop(join_results);
        }
    }
1515}
1516
/// Optimizes removal from parent scope.
///
/// A zero-sized key whose identity is its own address. `WeakScopeHandle`
/// implements `Borrow<PtrKey>`, so a set of weak handles can be searched by
/// the address of a `ScopeInner` without creating a `Weak` first (as done in
/// `Drop for ScopeInner`).
#[repr(transparent)]
struct PtrKey;
1520
/// Views a weak handle as a `PtrKey` located at the address the weak pointer
/// refers to, so that hashing and comparing the key agrees with hashing and
/// comparing the handle itself.
impl Borrow<PtrKey> for WeakScopeHandle {
    fn borrow(&self) -> &PtrKey {
        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
        unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
    }
}
1527
1528impl PartialEq for PtrKey {
1529    fn eq(&self, other: &Self) -> bool {
1530        std::ptr::eq(self, other)
1531    }
1532}
1533
// Address equality is reflexive, symmetric and transitive, so `Eq` holds.
impl Eq for PtrKey {}
1535
1536impl hash::Hash for PtrKey {
1537    fn hash<H: hash::Hasher>(&self, state: &mut H) {
1538        (self as *const PtrKey).hash(state);
1539    }
1540}
1541
/// `Results` implementation that keeps one join entry per task ID: either the
/// waker of a pending joiner or the finished task.
#[derive(Default)]
struct JoinResults(HashMap<usize, JoinResult>);
1544
/// Storage for the outcome of finished tasks of a scope.
///
/// The scope state holds one boxed implementation and calls into it while the
/// scope is locked.
trait Results: Send + Sync + 'static {
    /// Returns true if we allow spawning futures with arbitrary outputs on the scope.
    fn can_spawn(&self) -> bool;

    /// Polls for the specified task having finished.
    ///
    /// Returns the finished task once it is available.
    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;

    /// Called when a task finishes.
    ///
    /// May return a waker that the caller should wake.
    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;

    /// Called to drop any results for a particular task.
    ///
    /// Returns the stored finished task, if any, so that the caller decides
    /// when it is dropped.
    fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;

    /// Takes *all* the stored results.
    fn take(&mut self) -> Box<dyn Any>;

    /// Used only for testing.  Returns true if there are any results registered.
    #[cfg(test)]
    fn is_empty(&self) -> bool;
}
1565
1566impl Results for JoinResults {
1567    fn can_spawn(&self) -> bool {
1568        true
1569    }
1570
1571    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1572        match self.0.entry(task_id) {
1573            Entry::Occupied(mut o) => match o.get_mut() {
1574                JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1575                JoinResult::Result(_) => {
1576                    let JoinResult::Result(task) = o.remove() else { unreachable!() };
1577                    return Poll::Ready(task);
1578                }
1579            },
1580            Entry::Vacant(v) => {
1581                v.insert(JoinResult::Waker(cx.waker().clone()));
1582            }
1583        }
1584        Poll::Pending
1585    }
1586
1587    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1588        match self.0.entry(task.id()) {
1589            Entry::Occupied(mut o) => {
1590                let JoinResult::Waker(waker) =
1591                    std::mem::replace(o.get_mut(), JoinResult::Result(task))
1592                else {
1593                    // It can't be JoinResult::Result because this function is the only
1594                    // function that sets that, and `task_did_finish` won't get called
1595                    // twice.
1596                    unreachable!()
1597                };
1598                Some(waker)
1599            }
1600            Entry::Vacant(v) => {
1601                v.insert(JoinResult::Result(task));
1602                None
1603            }
1604        }
1605    }
1606
1607    fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1608        match self.0.remove(&task_id) {
1609            Some(JoinResult::Result(task)) => Some(task),
1610            _ => None,
1611        }
1612    }
1613
1614    fn take(&mut self) -> Box<dyn Any> {
1615        Box::new(Self(std::mem::take(&mut self.0)))
1616    }
1617
1618    #[cfg(test)]
1619    fn is_empty(&self) -> bool {
1620        self.0.is_empty()
1621    }
1622}
1623
/// `Results` storage for stream scopes: outputs of finished tasks are queued
/// in a list shared behind a mutex.
struct ResultsStream<R> {
    inner: Arc<Mutex<ResultsStreamInner<R>>>,
}

// Written by hand rather than derived: `#[derive(Default)]` would demand
// `R: Default`, although an empty stream never needs to create an `R`.
impl<R> Default for ResultsStream<R> {
    fn default() -> Self {
        Self { inner: Arc::default() }
    }
}

/// The shared part of a `ResultsStream`.
struct ResultsStreamInner<R> {
    /// Outputs of finished tasks that have not been taken out yet.
    results: Vec<R>,
    /// Waker handed back to the scope when a task finishes.
    waker: Option<Waker>,
}

// Manual impl so that no `R: Default` bound is required.
impl<R> Default for ResultsStreamInner<R> {
    fn default() -> Self {
        Self { results: Vec::new(), waker: None }
    }
}
1639
1640impl<R: Send + 'static> Results for ResultsStream<R> {
1641    fn can_spawn(&self) -> bool {
1642        false
1643    }
1644
1645    fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1646        Poll::Pending
1647    }
1648
1649    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1650        let mut inner = self.inner.lock();
1651        // SAFETY: R is guaranteed to be the same return type as all futures finishing on this
1652        // scope.
1653        inner.results.extend(unsafe { task.take_result() });
1654        inner.waker.take()
1655    }
1656
1657    fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1658        None
1659    }
1660
1661    fn take(&mut self) -> Box<dyn Any> {
1662        Box::new(std::mem::take(&mut self.inner.lock().results))
1663    }
1664
1665    #[cfg(test)]
1666    fn is_empty(&self) -> bool {
1667        false
1668    }
1669}
1670
1671#[cfg(test)]
1672mod tests {
1673    // NOTE: Tests that work on both the fuchsia and portable runtimes should be placed in
1674    // runtime/scope.rs.
1675
1676    use super::super::super::task::CancelableJoinHandle;
1677    use super::*;
1678    use crate::{
1679        EHandle, LocalExecutor, SendExecutorBuilder, SpawnableFuture, Task, TestExecutor, Timer,
1680        yield_now,
1681    };
1682    use fuchsia_sync::{Condvar, Mutex};
1683    use futures::channel::mpsc;
1684    use futures::{FutureExt, StreamExt};
1685    use std::future::pending;
1686    use std::pin::{Pin, pin};
1687    use std::sync::Arc;
1688    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1689    use std::task::{Context, Poll};
1690    use std::time::Duration;
1691
    /// Test helper: a future that stays pending until `resolve` is called on
    /// it.
    #[derive(Default)]
    struct RemoteControlFuture(Mutex<RCFState>);
    /// Mutex-protected state of a `RemoteControlFuture`.
    #[derive(Default)]
    struct RCFState {
        /// Set once `resolve` has been called.
        resolved: bool,
        /// Waker stored by the most recent poll that returned pending.
        waker: Option<Waker>,
    }
1699
1700    impl Future for &RemoteControlFuture {
1701        type Output = ();
1702        fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1703            let mut this = self.0.lock();
1704            if this.resolved {
1705                Poll::Ready(())
1706            } else {
1707                this.waker.replace(cx.waker().clone());
1708                Poll::Pending
1709            }
1710        }
1711    }
1712
1713    impl RemoteControlFuture {
1714        fn new() -> Arc<Self> {
1715            Arc::new(Default::default())
1716        }
1717
1718        fn resolve(&self) {
1719            let mut this = self.0.lock();
1720            this.resolved = true;
1721            if let Some(waker) = this.waker.take() {
1722                waker.wake();
1723            }
1724        }
1725
1726        fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> + use<> {
1727            let this = Arc::clone(self);
1728            #[allow(clippy::redundant_async_block)] // Allow returning `&*this` out of this fn.
1729            async move {
1730                (&*this).await
1731            }
1732        }
1733    }
1734
1735    #[test]
1736    fn compute_works_on_root_scope() {
1737        let mut executor = TestExecutor::new();
1738        let scope = executor.global_scope();
1739        let mut task = pin!(scope.compute(async { 1 }));
1740        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1741    }
1742
1743    #[test]
1744    fn compute_works_on_new_child() {
1745        let mut executor = TestExecutor::new();
1746        let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1747        let mut task = pin!(scope.compute(async { 1 }));
1748        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1749    }
1750
1751    #[test]
1752    fn scope_drop_cancels_tasks() {
1753        let mut executor = TestExecutor::new();
1754        let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1755        let mut task = pin!(scope.compute(async { 1 }));
1756        drop(scope);
1757        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1758    }
1759
1760    #[test]
1761    fn tasks_do_not_spawn_on_cancelled_scopes() {
1762        let mut executor = TestExecutor::new();
1763        let scope =
1764            executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1765        let handle = scope.to_handle();
1766        let mut cancel = pin!(scope.cancel());
1767        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1768        let mut task = pin!(handle.compute(async { 1 }));
1769        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1770    }
1771
1772    #[test]
1773    fn tasks_do_not_spawn_on_closed_empty_scopes() {
1774        let mut executor = TestExecutor::new();
1775        let scope =
1776            executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1777        let handle = scope.to_handle();
1778        let mut close = pin!(scope.cancel());
1779        assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1780        let mut task = pin!(handle.compute(async { 1 }));
1781        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1782    }
1783
1784    #[test]
1785    fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
1786        let mut executor = TestExecutor::new();
1787        let scope = executor.global_scope().new_child();
1788        let handle = scope.to_handle();
1789        handle.spawn(pending());
1790        let mut close = pin!(scope.close());
1791        assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1792        let mut task = pin!(handle.compute(async { 1 }));
1793        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1794    }
1795
1796    #[test]
1797    fn spawn_works_on_child_and_grandchild() {
1798        let mut executor = TestExecutor::new();
1799        let scope = executor.global_scope().new_child();
1800        let child = scope.new_child();
1801        let grandchild = child.new_child();
1802        let mut child_task = pin!(child.compute(async { 1 }));
1803        let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1804        assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Ready(1));
1805        assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Ready(1));
1806    }
1807
1808    #[test]
1809    fn spawn_drop_cancels_child_and_grandchild_tasks() {
1810        let mut executor = TestExecutor::new();
1811        let scope = executor.global_scope().new_child();
1812        let child = scope.new_child();
1813        let grandchild = child.new_child();
1814        let mut child_task = pin!(child.compute(async { 1 }));
1815        let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1816        drop(scope);
1817        assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Pending);
1818        assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Pending);
1819    }
1820
1821    #[test]
1822    fn completed_tasks_are_cleaned_up_after_cancel() {
1823        let mut executor = TestExecutor::new();
1824        let scope = executor.global_scope().new_child();
1825
1826        let task1 = scope.spawn(pending::<()>());
1827        let task2 = scope.spawn(async {});
1828        assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1829        assert_eq!(scope.lock().all_tasks().len(), 1);
1830
1831        // Running the executor after cancelling the task isn't currently
1832        // necessary, but we might decide to do async cleanup in the future.
1833        assert_eq!(task1.abort().now_or_never(), None);
1834        assert_eq!(task2.abort().now_or_never(), Some(Some(())));
1835
1836        assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1837        assert_eq!(scope.lock().all_tasks().len(), 0);
1838        assert!(scope.lock().results.is_empty());
1839    }
1840
1841    #[test]
1842    fn join_emtpy_scope() {
1843        let mut executor = TestExecutor::new();
1844        let scope = executor.global_scope().new_child();
1845        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1846    }
1847
1848    #[test]
1849    fn task_handle_preserves_access_to_result_after_join_begins() {
1850        let mut executor = TestExecutor::new();
1851        let scope = executor.global_scope().new_child();
1852        let mut task = scope.compute(async { 1 });
1853        scope.spawn(async {});
1854        let task2 = scope.spawn(pending::<()>());
1855        // Fuse to stay agnostic as to whether the join completes before or
1856        // after awaiting the task handle.
1857        let mut join = pin!(scope.join().fuse());
1858        let _ = executor.run_until_stalled(&mut join);
1859        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1860        drop(task2.abort());
1861        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1862    }
1863
1864    #[test]
1865    fn join_blocks_until_task_is_cancelled() {
1866        // Scope with one outstanding task handle and one cancelled task.
1867        // The scope is not complete until the outstanding task handle is cancelled.
1868        let mut executor = TestExecutor::new();
1869        let scope = executor.global_scope().new_child();
1870        let outstanding_task = scope.spawn(pending::<()>());
1871        let cancelled_task = scope.spawn(pending::<()>());
1872        assert_eq!(
1873            executor.run_until_stalled(&mut pin!(cancelled_task.abort())),
1874            Poll::Ready(None)
1875        );
1876        let mut join = pin!(scope.join());
1877        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1878        assert_eq!(
1879            executor.run_until_stalled(&mut pin!(outstanding_task.abort())),
1880            Poll::Ready(None)
1881        );
1882        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1883    }
1884
1885    #[test]
1886    fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1887        let mut executor = TestExecutor::new();
1888        let scope = executor.global_scope().new_child();
1889        // The default is to detach.
1890        scope.spawn(pending::<()>());
1891        let mut join = pin!(scope.join());
1892        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1893        let mut cancel = pin!(join.cancel());
1894        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1895    }
1896
1897    #[test]
1898    fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1899        let mut executor = TestExecutor::new();
1900        let scope = executor.global_scope().new_child();
1901        // The default is to detach.
1902        scope.spawn(pending::<()>());
1903        let mut close = pin!(scope.close());
1904        assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1905        let mut cancel = pin!(close.cancel());
1906        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1907    }
1908
1909    #[test]
1910    fn join_scope_blocks_until_spawned_task_completes() {
1911        let mut executor = TestExecutor::new();
1912        let scope = executor.global_scope().new_child();
1913        let remote = RemoteControlFuture::new();
1914        let mut task = scope.spawn(remote.as_future());
1915        let mut scope_join = pin!(scope.join());
1916        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1917        remote.resolve();
1918        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1919        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1920    }
1921
1922    #[test]
1923    fn close_scope_blocks_until_spawned_task_completes() {
1924        let mut executor = TestExecutor::new();
1925        let scope = executor.global_scope().new_child();
1926        let remote = RemoteControlFuture::new();
1927        let mut task = scope.spawn(remote.as_future());
1928        let mut scope_close = pin!(scope.close());
1929        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Pending);
1930        remote.resolve();
1931        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1932        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1933    }
1934
1935    #[test]
1936    fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
1937        let mut executor = TestExecutor::new();
1938        let scope = executor.global_scope().new_child();
1939        let child = scope.new_child();
1940        let remote = RemoteControlFuture::new();
1941        child.spawn(remote.as_future());
1942        let mut scope_join = pin!(scope.join());
1943        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1944        assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1945        child.detach();
1946        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1947        remote.resolve();
1948        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1949    }
1950
1951    #[test]
1952    fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
1953        let mut executor = TestExecutor::new();
1954        let scope = executor.global_scope().new_child();
1955        let remote = RemoteControlFuture::new();
1956        {
1957            let remote = remote.clone();
1958            scope.spawn(async move {
1959                let child = Scope::new_with_name("child");
1960                child.spawn(async move {
1961                    Scope::current().spawn(remote.as_future());
1962                });
1963                child.detach();
1964            });
1965        }
1966        let mut scope_join = pin!(scope.join());
1967        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1968        remote.resolve();
1969        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1970    }
1971
1972    #[test]
1973    fn join_scope_blocks_when_blocked_child_is_detached() {
1974        let mut executor = TestExecutor::new();
1975        let scope = executor.global_scope().new_child();
1976        let child = scope.new_child();
1977        child.spawn(pending());
1978        let mut scope_join = pin!(scope.join());
1979        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1980        assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1981        child.detach();
1982        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1983    }
1984
1985    #[test]
1986    fn join_scope_completes_when_blocked_child_is_cancelled() {
1987        let mut executor = TestExecutor::new();
1988        let scope = executor.global_scope().new_child();
1989        let child = scope.new_child();
1990        child.spawn(pending());
1991        let mut scope_join = pin!(scope.join());
1992        {
1993            let mut child_join = pin!(child.join());
1994            assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1995            assert_eq!(executor.run_until_stalled(&mut child_join), Poll::Pending);
1996        }
1997        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1998    }
1999
2000    #[test]
2001    fn detached_scope_can_spawn() {
2002        let mut executor = TestExecutor::new();
2003        let scope = executor.global_scope().new_child();
2004        let handle = scope.to_handle();
2005        scope.detach();
2006        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
2007    }
2008
2009    #[test]
2010    fn dropped_scope_cannot_spawn() {
2011        let mut executor = TestExecutor::new();
2012        let scope = executor.global_scope().new_child();
2013        let handle = scope.to_handle();
2014        drop(scope);
2015        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
2016    }
2017
2018    #[test]
2019    fn dropped_scope_with_running_task_cannot_spawn() {
2020        let mut executor = TestExecutor::new();
2021        let scope = executor.global_scope().new_child();
2022        let handle = scope.to_handle();
2023        let _running_task = handle.spawn(pending::<()>());
2024        drop(scope);
2025        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
2026    }
2027
2028    #[test]
2029    fn joined_scope_cannot_spawn() {
2030        let mut executor = TestExecutor::new();
2031        let scope = executor.global_scope().new_child();
2032        let handle = scope.to_handle();
2033        let mut scope_join = pin!(scope.join());
2034        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
2035        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
2036    }
2037
2038    #[test]
2039    fn joining_scope_with_running_task_can_spawn() {
2040        let mut executor = TestExecutor::new();
2041        let scope = executor.global_scope().new_child();
2042        let handle = scope.to_handle();
2043        let _running_task = handle.spawn(pending::<()>());
2044        let mut scope_join = pin!(scope.join());
2045        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
2046        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
2047    }
2048
2049    #[test]
2050    fn joined_scope_child_cannot_spawn() {
2051        let mut executor = TestExecutor::new();
2052        let scope = executor.global_scope().new_child();
2053        let handle = scope.to_handle();
2054        let child_before_join = scope.new_child();
2055        assert_eq!(
2056            executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
2057            Poll::Ready(1)
2058        );
2059        let mut scope_join = pin!(scope.join());
2060        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
2061        let child_after_join = handle.new_child();
2062        let grandchild_after_join = child_before_join.new_child();
2063        assert_eq!(
2064            executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
2065            Poll::Pending
2066        );
2067        assert_eq!(
2068            executor.run_until_stalled(&mut child_after_join.compute(async { 1 })),
2069            Poll::Pending
2070        );
2071        assert_eq!(
2072            executor.run_until_stalled(&mut grandchild_after_join.compute(async { 1 })),
2073            Poll::Pending
2074        );
2075    }
2076
2077    #[test]
2078    fn closed_scope_child_cannot_spawn() {
2079        let mut executor = TestExecutor::new();
2080        let scope = executor.global_scope().new_child();
2081        let handle = scope.to_handle();
2082        let child_before_close = scope.new_child();
2083        assert_eq!(
2084            executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2085            Poll::Ready(1)
2086        );
2087        let mut scope_close = pin!(scope.close());
2088        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
2089        let child_after_close = handle.new_child();
2090        let grandchild_after_close = child_before_close.new_child();
2091        assert_eq!(
2092            executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2093            Poll::Pending
2094        );
2095        assert_eq!(
2096            executor.run_until_stalled(&mut child_after_close.compute(async { 1 })),
2097            Poll::Pending
2098        );
2099        assert_eq!(
2100            executor.run_until_stalled(&mut grandchild_after_close.compute(async { 1 })),
2101            Poll::Pending
2102        );
2103    }
2104
2105    #[test]
2106    fn can_join_child_first() {
2107        let mut executor = TestExecutor::new();
2108        let scope = executor.global_scope().new_child();
2109        let child = scope.new_child();
2110        assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2111        assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2112        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2113    }
2114
2115    #[test]
2116    fn can_join_parent_first() {
2117        let mut executor = TestExecutor::new();
2118        let scope = executor.global_scope().new_child();
2119        let child = scope.new_child();
2120        assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2121        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2122        assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2123    }
2124
2125    #[test]
2126    fn task_in_parent_scope_can_join_child() {
2127        let mut executor = TestExecutor::new();
2128        let scope = executor.global_scope().new_child();
2129        let child = scope.new_child();
2130        let remote = RemoteControlFuture::new();
2131        child.spawn(remote.as_future());
2132        scope.spawn(async move { child.join().await });
2133        let mut join = pin!(scope.join());
2134        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2135        remote.resolve();
2136        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2137    }
2138
2139    #[test]
2140    fn join_completes_while_completed_task_handle_is_held() {
2141        let mut executor = TestExecutor::new();
2142        let scope = executor.global_scope().new_child();
2143        let mut task = scope.compute(async { 1 });
2144        scope.spawn(async {});
2145        let mut join = pin!(scope.join());
2146        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2147        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
2148    }
2149
2150    #[test]
2151    fn cancel_completes_while_task_holds_handle() {
2152        let mut executor = TestExecutor::new();
2153        let scope = executor.global_scope().new_child();
2154        let handle = scope.to_handle();
2155        let mut task = scope.compute(async move {
2156            loop {
2157                pending::<()>().await; // never returns
2158                handle.spawn(async {});
2159            }
2160        });
2161
2162        // Join should not complete because the task never does.
2163        let mut join = pin!(scope.join());
2164        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2165
2166        let mut cancel = pin!(join.cancel());
2167        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
2168        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
2169    }
2170
2171    #[test]
2172    fn cancel_from_handle_inside_task() {
2173        let mut executor = TestExecutor::new();
2174        let scope = executor.global_scope().new_child();
2175        {
2176            // Spawn a task that never finishes until the scope is cancelled.
2177            scope.spawn(pending::<()>());
2178
2179            let mut no_tasks = pin!(scope.on_no_tasks());
2180            assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Pending);
2181
2182            let handle = scope.to_handle();
2183            scope.spawn(async move {
2184                handle.cancel().await;
2185                panic!("cancel() should never complete");
2186            });
2187
2188            assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Ready(()));
2189        }
2190        assert_eq!(scope.join().now_or_never(), Some(()));
2191    }
2192
2193    #[test]
2194    fn can_spawn_from_non_executor_thread() {
2195        let mut executor = TestExecutor::new();
2196        let scope = executor.global_scope().clone();
2197        let done = Arc::new(AtomicBool::new(false));
2198        let done_clone = done.clone();
2199        let _ = std::thread::spawn(move || {
2200            scope.spawn(async move {
2201                done_clone.store(true, Ordering::Relaxed);
2202            })
2203        })
2204        .join();
2205        let _ = executor.run_until_stalled(&mut pending::<()>());
2206        assert!(done.load(Ordering::Relaxed));
2207    }
2208
    // Checks join ordering across a small scope tree: a scope's join only completes once its
    // own tasks and those of all its descendants have finished.
    #[test]
    fn scope_tree() {
        // A
        //  \
        //   B
        //  / \
        // C   D
        let mut executor = TestExecutor::new();
        let a = executor.global_scope().new_child();
        let b = a.new_child();
        let c = b.new_child();
        let d = b.new_child();
        let a_remote = RemoteControlFuture::new();
        let c_remote = RemoteControlFuture::new();
        let d_remote = RemoteControlFuture::new();
        // A, C and D each own one externally controlled task; B has no task of its own.
        a.spawn(a_remote.as_future());
        c.spawn(c_remote.as_future());
        d.spawn(d_remote.as_future());
        let mut a_join = pin!(a.join());
        let mut b_join = pin!(b.join());
        let mut d_join = pin!(d.join());
        // Nothing has been resolved yet, so every join is pending.
        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
        // Finishing D's task completes only D's join; B still waits on C, A on its own task.
        d_remote.resolve();
        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
        // Finishing C's task lets B's join complete; A is still blocked on its own task.
        c_remote.resolve();
        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
        a_remote.resolve();
        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
        // C was never joined explicitly; joining it now completes immediately.
        let mut c_join = pin!(c.join());
        assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
    }
2245
    // Verifies on a two-thread executor that `wake_all_with_active_guard` causes every task in
    // the scope to be polled again, including when the scope is cancelled right after the wake.
    #[test]
    fn wake_all_with_active_guard_on_send_executor() {
        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
        let scope = executor.root_scope().new_child();

        let (tx, mut rx) = mpsc::unbounded();
        // Bottom 32 bits are the poll count. Top 32 bits are when to signal.
        let state = Arc::new(AtomicU64::new(0));

        // A future that never completes; each poll bumps the shared counter and, when the new
        // count equals the target stored in the top 32 bits, sends a notification.
        struct PollCounter(Arc<AtomicU64>, mpsc::UnboundedSender<()>);

        impl Future for PollCounter {
            type Output = ();
            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                let old = self.0.fetch_add(1, Ordering::Relaxed);
                // `old >> 32` is the requested target; `(old + 1) & u32::MAX` is the count
                // after this poll.
                if old >> 32 == (old + 1) & u32::MAX as u64 {
                    let _ = self.1.unbounded_send(());
                }
                Poll::Pending
            }
        }

        scope.spawn(PollCounter(state.clone(), tx.clone()));
        scope.spawn(PollCounter(state.clone(), tx.clone()));

        executor.run(async move {
            // Publishes `count` as the target in the top bits; if the poll count has not
            // already reached it, waits for the notification. Clears the target afterwards.
            let mut wait_for_poll_count = async |count| {
                let old = state.fetch_or(count << 32, Ordering::Relaxed);
                if old & u32::MAX as u64 != count {
                    rx.next().await.unwrap();
                }
                state.fetch_and(u32::MAX as u64, Ordering::Relaxed);
            };

            // We must assume the executor will only poll the two tasks once each.
            wait_for_poll_count(2).await;

            let mut start_count = 2;
            for _ in 0..2 {
                scope.wake_all_with_active_guard();

                // Each wake should result in exactly one more poll per task (two in total).
                wait_for_poll_count(start_count + 2).await;
                start_count += 2;
            }

            // Wake, then cancel the scope and verify the tasks still get polled.
            scope.wake_all_with_active_guard();
            let done = scope.cancel();

            wait_for_poll_count(start_count + 2).await;

            done.await;
        });
    }
2300
2301    #[test]
2302    fn on_no_tasks_race() {
2303        fn sleep_random() {
2304            std::thread::sleep(std::time::Duration::from_micros(rand::random_range(0..10)));
2305        }
2306        for _ in 0..2000 {
2307            let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2308            let scope = executor.root_scope().new_child();
2309            scope.spawn(async {
2310                sleep_random();
2311            });
2312            executor.run(async move {
2313                sleep_random();
2314                scope.on_no_tasks().await;
2315            });
2316        }
2317    }
2318
2319    #[test]
2320    fn test_detach() {
2321        let mut e = LocalExecutor::default();
2322        e.run_singlethreaded(async {
2323            let counter = Arc::new(AtomicU32::new(0));
2324
2325            {
2326                let counter = counter.clone();
2327                Task::spawn(async move {
2328                    for _ in 0..5 {
2329                        yield_now().await;
2330                        counter.fetch_add(1, Ordering::Relaxed);
2331                    }
2332                })
2333                .detach();
2334            }
2335
2336            while counter.load(Ordering::Relaxed) != 5 {
2337                yield_now().await;
2338            }
2339        });
2340
2341        assert!(e.ehandle.root_scope.lock().results.is_empty());
2342    }
2343
2344    #[test]
2345    fn test_cancel() {
2346        let mut e = LocalExecutor::default();
2347        e.run_singlethreaded(async {
2348            let ref_count = Arc::new(());
2349            // First, just drop the task.
2350            {
2351                let ref_count = ref_count.clone();
2352                drop(Task::spawn(async move {
2353                    let _ref_count = ref_count;
2354                    let _: () = std::future::pending().await;
2355                }));
2356            }
2357
2358            while Arc::strong_count(&ref_count) != 1 {
2359                yield_now().await;
2360            }
2361
2362            // Now try explicitly cancelling.
2363            let task = {
2364                let ref_count = ref_count.clone();
2365                Task::spawn(async move {
2366                    let _ref_count = ref_count;
2367                    let _: () = std::future::pending().await;
2368                })
2369            };
2370
2371            assert_eq!(task.abort().await, None);
2372            while Arc::strong_count(&ref_count) != 1 {
2373                yield_now().await;
2374            }
2375
2376            // Now cancel a task that has already finished.
2377            let task = {
2378                let ref_count = ref_count.clone();
2379                Task::spawn(async move {
2380                    let _ref_count = ref_count;
2381                })
2382            };
2383
2384            // Wait for it to finish.
2385            while Arc::strong_count(&ref_count) != 1 {
2386                yield_now().await;
2387            }
2388
2389            assert_eq!(task.abort().await, Some(()));
2390        });
2391
2392        assert!(e.ehandle.root_scope.lock().results.is_empty());
2393    }
2394
    // Verifies that awaiting `abort()` on a task that is currently running waits for that poll
    // to finish and yields the task's result. The shared integer walks through the states
    // 0 (initial) -> 1 (task running) -> 2 (main noticed) -> 3 (task about to return).
    #[test]
    fn test_cancel_waits() {
        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
        let state = Arc::new((Mutex::new(0), Condvar::new()));
        let task = {
            let state = state.clone();
            executor.root_scope().compute(async move {
                *state.0.lock() = 1;
                state.1.notify_all();
                // Wait till the other task has noticed we changed state to 1.
                state.1.wait_while(&mut state.0.lock(), |state| *state == 1);
                // Give the main future time to call `abort()` while this poll is in progress.
                std::thread::sleep(std::time::Duration::from_millis(10));
                *state.0.lock() = 3;
                "foo"
            })
        };
        executor.run(async move {
            // Block until the task reports state 1, then acknowledge by moving to state 2.
            state.1.wait_while(&mut state.0.lock(), |state| {
                if *state == 1 {
                    // Tell the other task we've noticed state 1.
                    *state = 2;
                    false
                } else {
                    true
                }
            });
            state.1.notify_all();
            assert_eq!(task.abort().await, Some("foo"));
            // The other task should have finished and set state to 3.
            assert_eq!(*state.0.lock(), 3);
        });
    }
2427
    // Shared driver for the clean-up tests below (not itself a `#[test]`). It starts a task
    // that blocks until told to quit, hands the task to `callback` while it is running, then
    // releases it and polls until the global scope no longer tracks the task or any result.
    fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
        // `running`: set to true while the task body executes. `can_quit`: gate the task
        // blocks on before finishing.
        let running = Arc::new((Mutex::new(false), Condvar::new()));
        let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
        let task = {
            let running = running.clone();
            let can_quit = can_quit.clone();
            executor.root_scope().compute(async move {
                *running.0.lock() = true;
                running.1.notify_all();
                {
                    // Block this executor thread until the main future opens the gate.
                    let mut guard = can_quit.0.lock();
                    while !*guard {
                        can_quit.1.wait(&mut guard);
                    }
                }
                *running.0.lock() = false;
            })
        };
        executor.run(async move {
            {
                // Wait until the task has definitely started running.
                let mut guard = running.0.lock();
                while !*guard {
                    running.1.wait(&mut guard);
                }
            }

            // The callback decides what happens to the handle (abort, drop, detach, ...).
            callback(task);

            *can_quit.0.lock() = true;
            can_quit.1.notify_all();

            let ehandle = EHandle::local();
            let scope = ehandle.global_scope();

            // The only way of testing for this is to poll.
            while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
                Timer::new(std::time::Duration::from_millis(1)).await;
            }

            // By the time the scope is clean, the task body must have run to its end.
            assert!(!*running.0.lock());
        });
    }
2471
2472    #[test]
2473    fn test_dropped_cancel_cleans_up() {
2474        test_clean_up(|task| {
2475            let abort_fut = std::pin::pin!(task.abort());
2476            let waker = futures::task::noop_waker();
2477            assert!(abort_fut.poll(&mut Context::from_waker(&waker)).is_pending());
2478        });
2479    }
2480
2481    #[test]
2482    fn test_dropped_task_cleans_up() {
2483        test_clean_up(|task| {
2484            std::mem::drop(task);
2485        });
2486    }
2487
2488    #[test]
2489    fn test_detach_cleans_up() {
2490        test_clean_up(|task| {
2491            task.detach();
2492        });
2493    }
2494
2495    #[test]
2496    fn test_scope_stream() {
2497        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2498        executor.run(async move {
2499            let (stream, handle) = ScopeStream::new();
2500            handle.push(async { 1 });
2501            handle.push(async { 2 });
2502            stream.close();
2503            let results: HashSet<_> = stream.collect().await;
2504            assert_eq!(results, HashSet::from_iter([1, 2]));
2505        });
2506    }
2507
2508    #[test]
2509    fn test_scope_stream_wakes_properly() {
2510        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2511        executor.run(async move {
2512            let (stream, handle) = ScopeStream::new();
2513            handle.push(async {
2514                Timer::new(Duration::from_millis(10)).await;
2515                1
2516            });
2517            handle.push(async {
2518                Timer::new(Duration::from_millis(10)).await;
2519                2
2520            });
2521            stream.close();
2522            let results: HashSet<_> = stream.collect().await;
2523            assert_eq!(results, HashSet::from_iter([1, 2]));
2524        });
2525    }
2526
2527    #[test]
2528    fn test_scope_stream_drops_spawned_tasks() {
2529        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2530        executor.run(async move {
2531            let (stream, handle) = ScopeStream::new();
2532            handle.push(async { 1 });
2533            let _task = stream.compute(async { "foo" });
2534            stream.close();
2535            let results: HashSet<_> = stream.collect().await;
2536            assert_eq!(results, HashSet::from_iter([1]));
2537        });
2538    }
2539
2540    #[test]
2541    fn test_nested_scope_stream() {
2542        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2543        executor.run(async move {
2544            let (mut stream, handle) = ScopeStream::new();
2545            handle.clone().push(async move {
2546                handle.clone().push(async move {
2547                    handle.clone().push(async move { 3 });
2548                    2
2549                });
2550                1
2551            });
2552            let mut results = HashSet::default();
2553            while let Some(item) = stream.next().await {
2554                results.insert(item);
2555                if results.len() == 3 {
2556                    stream.close();
2557                }
2558            }
2559            assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2560        });
2561    }
2562
2563    #[test]
2564    fn test_dropping_scope_stream_cancels_all_tasks() {
2565        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2566        executor.run(async move {
2567            let (stream, handle) = ScopeStream::new();
2568            let (tx1, mut rx) = mpsc::unbounded::<()>();
2569            let tx2 = tx1.clone();
2570            handle.push(async move {
2571                let _tx1 = tx1;
2572                let () = pending().await;
2573            });
2574            handle.push(async move {
2575                let _tx2 = tx2;
2576                let () = pending().await;
2577            });
2578            drop(stream);
2579
2580            // This will wait forever if the tasks aren't cancelled.
2581            assert_eq!(rx.next().await, None);
2582        });
2583    }
2584
2585    #[test]
2586    fn test_scope_stream_collect() {
2587        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2588        executor.run(async move {
2589            let stream: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
2590            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2591
2592            let stream: ScopeStream<_> =
2593                (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
2594            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2595        });
2596    }
2597
2598    struct DropSignal(Arc<AtomicBool>);
2599
2600    impl Drop for DropSignal {
2601        fn drop(&mut self) {
2602            self.0.store(true, Ordering::SeqCst);
2603        }
2604    }
2605
2606    struct DropChecker(Arc<AtomicBool>);
2607
2608    impl DropChecker {
2609        fn new() -> (Self, DropSignal) {
2610            let inner = Arc::new(AtomicBool::new(false));
2611            (Self(inner.clone()), DropSignal(inner))
2612        }
2613
2614        fn is_dropped(&self) -> bool {
2615            self.0.load(Ordering::SeqCst)
2616        }
2617    }
2618
2619    #[test]
2620    fn child_finished_when_parent_pending() {
2621        let mut executor = LocalExecutor::default();
2622        executor.run_singlethreaded(async {
2623            let scope = Scope::new();
2624            let _guard = scope.active_guard().expect("acquire guard");
2625            let cancel = scope.to_handle().cancel();
2626            let child = scope.new_child();
2627            let (checker, signal) = DropChecker::new();
2628            child.spawn(async move {
2629                let _signal = signal;
2630                futures::future::pending::<()>().await
2631            });
2632            assert!(checker.is_dropped());
2633            assert!(child.active_guard().is_none());
2634            cancel.await;
2635        })
2636    }
2637
2638    #[test]
2639    fn guarded_scopes_observe_closed() {
2640        let mut executor = LocalExecutor::default();
2641        executor.run_singlethreaded(async {
2642            let scope = Scope::new();
2643            let handle = scope.to_handle();
2644            let _guard = scope.active_guard().expect("acquire guard");
2645            handle.close();
2646            let (checker, signal) = DropChecker::new();
2647            handle.spawn(async move {
2648                let _signal = signal;
2649                futures::future::pending::<()>().await
2650            });
2651            assert!(checker.is_dropped());
2652            let (checker, signal) = DropChecker::new();
2653            let cancel = handle.clone().cancel();
2654            handle.spawn(async move {
2655                let _signal = signal;
2656                futures::future::pending::<()>().await
2657            });
2658            assert!(checker.is_dropped());
2659            scope.join().await;
2660            cancel.await;
2661        })
2662    }
2663
2664    #[test]
2665    fn child_guard_holds_parent_cancellation() {
2666        let mut executor = TestExecutor::new();
2667        let scope = executor.global_scope().new_child();
2668        let child = scope.new_child();
2669        let guard = child.active_guard().expect("acquire guard");
2670        scope.spawn(futures::future::pending());
2671        let mut join = pin!(scope.cancel());
2672        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2673        drop(guard);
2674        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2675    }
2676
2677    #[test]
2678    fn active_guard_on_cancel() {
2679        let mut executor = TestExecutor::new();
2680        let scope = executor.global_scope().new_child();
2681        let child1 = scope.new_child();
2682        let child2 = scope.new_child();
2683        let guard = child1.active_guard().expect("acquire guard");
2684        let guard_for_right_scope = guard.clone();
2685        let guard_for_wrong_scope = guard.clone();
2686        child1.spawn(async move { guard_for_right_scope.on_cancel().await });
2687        child2.spawn(async move {
2688            guard_for_wrong_scope.on_cancel().await;
2689        });
2690
2691        let handle = scope.to_handle();
2692        let mut join = pin!(scope.join());
2693        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2694        let cancel: Join<_> = handle.cancel();
2695        drop(cancel);
2696        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2697    }
2698
2699    #[test]
2700    fn abort_join() {
2701        let mut executor = TestExecutor::new();
2702        let scope = executor.global_scope().new_child();
2703        let child = scope.new_child();
2704        let _guard = child.active_guard().expect("acquire guard");
2705
2706        let (checker1, signal) = DropChecker::new();
2707        scope.spawn(async move {
2708            let _signal = signal;
2709            futures::future::pending::<()>().await
2710        });
2711        let (checker2, signal) = DropChecker::new();
2712        scope.spawn(async move {
2713            let _signal = signal;
2714            futures::future::pending::<()>().await
2715        });
2716
2717        let mut join = pin!(scope.cancel());
2718        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2719        assert!(!checker1.is_dropped());
2720        assert!(!checker2.is_dropped());
2721
2722        let mut join = join.abort();
2723        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2724        assert!(checker1.is_dropped());
2725        assert!(checker2.is_dropped());
2726    }
2727
2728    #[test]
2729    fn child_without_guard_aborts_immediately_on_cancel() {
2730        let mut executor = TestExecutor::new();
2731        let scope = executor.global_scope().new_child();
2732        let child = scope.new_child();
2733        let guard = scope.active_guard().expect("acquire guard");
2734
2735        let (checker_scope, signal) = DropChecker::new();
2736        scope.spawn(async move {
2737            let _signal = signal;
2738            futures::future::pending::<()>().await
2739        });
2740        let (checker_child, signal) = DropChecker::new();
2741        child.spawn(async move {
2742            let _signal = signal;
2743            futures::future::pending::<()>().await
2744        });
2745
2746        let mut join = pin!(scope.cancel());
2747        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2748        assert!(!checker_scope.is_dropped());
2749        assert!(checker_child.is_dropped());
2750
2751        drop(guard);
2752        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2753        assert!(checker_child.is_dropped());
2754    }
2755
2756    #[test]
2757    fn await_canceled_task_pends_forever() {
2758        let mut executor = TestExecutor::new();
2759        let scope = executor.global_scope().new_child();
2760
2761        let task = scope.spawn(pending::<()>());
2762        let mut main_future = pin!(async move {
2763            drop(scope);
2764            task.await;
2765        });
2766        assert_eq!(executor.run_until_stalled(&mut main_future), Poll::Pending,);
2767    }
2768
2769    #[test]
2770    fn await_canceled_abortable_task_finishes_with_error() {
2771        let mut executor = TestExecutor::new();
2772        let scope = executor.global_scope().new_child();
2773
2774        let task = CancelableJoinHandle::from(scope.spawn(pending::<()>()));
2775        let mut main_future = pin!(async move {
2776            drop(scope);
2777            let _ = task.await;
2778        });
2779        assert_eq!(executor.run_until_stalled(&mut main_future), Poll::Ready(()),);
2780    }
2781}