fuchsia_async/runtime/fuchsia/executor/
scope.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::EHandle;
9use crate::condition::{Condition, ConditionGuard, WakerEntry};
10use fuchsia_sync::Mutex;
11use futures::Stream;
12use pin_project_lite::pin_project;
13use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
14use state::{JoinResult, ScopeState, ScopeWaker, Status, WakeVec};
15use std::any::Any;
16use std::borrow::Borrow;
17use std::collections::hash_map::Entry;
18use std::collections::hash_set;
19use std::future::{Future, IntoFuture};
20use std::marker::PhantomData;
21use std::mem::{self, ManuallyDrop};
22use std::ops::{Deref, DerefMut};
23use std::pin::Pin;
24use std::sync::{Arc, Weak};
25use std::task::{Context, Poll, Waker, ready};
26use std::{fmt, hash};
27
28//
29// # Public API
30//
31
32/// A scope for managing async tasks. This scope is aborted when dropped.
33///
34/// Scopes are how fuchsia-async implements [structured concurrency][sc]. Every
35/// task is spawned on a scope, and runs until either the task completes or the
36/// scope is cancelled or aborted. In addition to owning tasks, scopes may own
37/// child scopes, forming a nested structure.
38///
39/// Scopes are usually joined or cancelled when the owning code is done with
40/// them. This makes it easier to reason about when a background task might
41/// still be running. Note that in multithreaded contexts it is safer to cancel
42/// and await a scope explicitly than to drop it, because the destructor is not
43/// synchronized with other threads that might be running a task.
44///
45/// [`Task::spawn`][crate::Task::spawn] and related APIs spawn on the root scope
46/// of the executor. New code is encouraged to spawn directly on scopes instead,
47/// passing their handles as a way of documenting when a function might spawn
48/// tasks that run in the background and reasoning about their side effects.
49///
50/// ## Scope lifecycle
51///
52/// When a scope is created it is open, meaning it accepts new tasks. Scopes are
53/// closed when one of the following happens:
54///
55/// 1. When [`close()`][Scope::close] is called.
56/// 2. When the scope is aborted or dropped, the scope is closed immediately.
57/// 3. When the scope is cancelled, the scope is closed when all active guards
58///    are dropped.
59/// 4. When the scope is joined and all tasks complete, the scope is closed
60///    before the join future resolves.
61///
62/// When a scope is closed it no longer accepts tasks. Tasks spawned on the
63/// scope are dropped immediately, and their [`Task`][crate::Task] or
64/// [`JoinHandle`][crate::JoinHandle] futures never resolve. This applies
65/// transitively to all child scopes. Closed scopes cannot currently be
66/// reopened.
67///
68/// Scopes can also be detached, in which case they are never closed, and run
69/// until the completion of all tasks.
70///
71/// [sc]: https://en.wikipedia.org/wiki/Structured_concurrency
72#[must_use = "Scopes should be explicitly awaited or cancelled"]
73#[derive(Debug)]
74pub struct Scope {
75    // LINT.IfChange
76    inner: ScopeHandle,
77    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
78}
79
80impl Default for Scope {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl Scope {
87    /// Create a new scope.
88    ///
89    /// The returned scope is a child of the current scope.
90    ///
91    /// # Panics
92    ///
93    /// May panic if not called in the context of an executor (e.g. within a
94    /// call to [`run`][crate::SendExecutor::run]).
95    pub fn new() -> Scope {
96        ScopeHandle::with_current(|handle| handle.new_child())
97    }
98
99    /// Create a new scope with a name.
100    ///
101    /// The returned scope is a child of the current scope.
102    ///
103    /// # Panics
104    ///
105    /// May panic if not called in the context of an executor (e.g. within a
106    /// call to [`run`][crate::SendExecutor::run]).
107    pub fn new_with_name(name: impl Into<String>) -> Scope {
108        ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
109    }
110
111    /// Get the scope of the current task, or the global scope if there is no task
112    /// being polled.
113    ///
114    /// # Panics
115    ///
116    /// May panic if not called in the context of an executor (e.g. within a
117    /// call to [`run`][crate::SendExecutor::run]).
118    pub fn current() -> ScopeHandle {
119        ScopeHandle::with_current(|handle| handle.clone())
120    }
121
122    /// Get the global scope of the executor.
123    ///
124    /// This can be used to spawn tasks that live as long as the executor.
125    /// Usually, this means until the end of the program or test. This should
126    /// only be done for tasks where this is expected. If in doubt, spawn on a
127    /// shorter lived scope instead.
128    ///
129    /// In code that uses scopes, you are strongly encouraged to use this API
130    /// instead of the spawn APIs on [`Task`][crate::Task].
131    ///
132    /// All scopes are descendants of the global scope.
133    ///
134    /// # Panics
135    ///
136    /// May panic if not called in the context of an executor (e.g. within a
137    /// call to [`run`][crate::SendExecutor::run]).
138    pub fn global() -> ScopeHandle {
139        EHandle::local().global_scope().clone()
140    }
141
142    /// Create a child scope.
143    pub fn new_child(&self) -> Scope {
144        self.inner.new_child()
145    }
146
147    /// Create a child scope with a name.
148    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
149        self.inner.new_child_with_name(name.into())
150    }
151
152    /// Returns the name of the scope.
153    pub fn name(&self) -> &str {
154        &self.inner.inner.name
155    }
156
157    /// Create a [`ScopeHandle`] that may be used to spawn tasks on this scope.
158    ///
159    /// This is a shorthand for `scope.as_handle().clone()`.
160    ///
161    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
162    /// available. Note that you should _not_ call `scope.clone()`, even though
163    /// the compiler allows it due to the Deref impl. Call this method instead.
164    pub fn to_handle(&self) -> ScopeHandle {
165        self.inner.clone()
166    }
167
168    /// Get a reference to a [`ScopeHandle`] that may be used to spawn tasks on
169    /// this scope.
170    ///
171    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
172    /// available. If you have a `Scope` but need a `&ScopeHandle`, prefer
173    /// calling this method over the less readable `&*scope`.
174    pub fn as_handle(&self) -> &ScopeHandle {
175        &self.inner
176    }
177
178    /// Wait for all tasks in the scope and its children to complete.
179    ///
180    /// New tasks will be accepted on the scope until every task completes and
181    /// this future resolves.
182    ///
183    /// Note that you can await a scope directly because it implements
184    /// `IntoFuture`. `scope.join().await` is a more explicit form of
185    /// `scope.await`.
186    pub fn join(self) -> Join {
187        Join::new(self)
188    }
189
190    /// Stop accepting new tasks on the scope. Returns a future that waits for
191    /// every task on the scope to complete.
192    pub fn close(self) -> Join {
193        self.inner.close();
194        Join::new(self)
195    }
196
197    /// Cancel all tasks cooperatively in the scope and its children
198    /// recursively.
199    ///
200    /// `cancel` first gives a chance to all child tasks (including tasks of
201    /// child scopes) to shutdown cleanly if they're holding on to a
202    /// [`ScopeActiveGuard`]. Once no child tasks are holding on to guards, then
203    /// `cancel` behaves like [`Scope::abort`], dropping all tasks and stopping
204    /// them from running at the next yield point. A [`ScopeActiveGuard`]
205    /// provides a cooperative cancellation signal that is triggered by this
206    /// call, see its documentation for more details.
207    ///
208    /// Once the returned future resolves, no task on the scope will be polled
209    /// again.
210    ///
211    /// Cancelling a scope _does not_ immediately prevent new tasks from being
212    /// accepted. New tasks are accepted as long as there are
213    /// `ScopeActiveGuard`s for this scope.
214    pub fn cancel(self) -> Join {
215        self.inner.cancel_all_tasks();
216        Join::new(self)
217    }
218
219    /// Cancel all tasks in the scope and its children recursively.
220    ///
221    /// Once the returned future resolves, no task on the scope will be polled
222    /// again. Unlike [`Scope::cancel`], this doesn't send a cooperative
223    /// cancellation signal to tasks or child scopes.
224    ///
225    /// When a scope is aborted it immediately stops accepting tasks. Handles of
226    /// tasks spawned on the scope will pend forever.
227    ///
228    /// Dropping the `Scope` object is equivalent to calling this method and
229    /// discarding the returned future. Awaiting the future is preferred because
230    /// it eliminates the possibility of a task poll completing on another
231    /// thread after the scope object has been dropped, which can sometimes
232    /// result in surprising behavior.
233    pub fn abort(self) -> impl Future<Output = ()> {
234        self.inner.abort_all_tasks();
235        Join::new(self)
236    }
237
238    /// Detach the scope, allowing its tasks to continue running in the
239    /// background.
240    ///
241    /// Tasks of a detached scope are still subject to join and cancel
242    /// operations on parent scopes.
243    pub fn detach(self) {
244        // Use ManuallyDrop to destructure self, because Rust doesn't allow this
245        // for types which implement Drop.
246        let this = ManuallyDrop::new(self);
247        // SAFETY: this.inner is obviously valid, and we don't access `this`
248        // after moving.
249        mem::drop(unsafe { std::ptr::read(&this.inner) });
250    }
251}
252
253/// Abort the scope and all of its tasks. Prefer using the [`Scope::abort`]
254/// or [`Scope::join`] methods.
255impl Drop for Scope {
256    fn drop(&mut self) {
257        // Abort all tasks in the scope. Each task has a strong reference to the ScopeState,
258        // which will be dropped after all the tasks in the scope are dropped.
259
260        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
261        // here, but we cannot do that without either:
262        // - Sync drop support in AtomicFuture, or
263        // - The ability to reparent tasks, which requires atomic_arc or
264        //   acquiring a mutex during polling.
265        self.inner.abort_all_tasks();
266    }
267}
268
269impl IntoFuture for Scope {
270    type Output = ();
271    type IntoFuture = Join;
272    fn into_future(self) -> Self::IntoFuture {
273        self.join()
274    }
275}
276
277impl Deref for Scope {
278    type Target = ScopeHandle;
279    fn deref(&self) -> &Self::Target {
280        &self.inner
281    }
282}
283
284impl Borrow<ScopeHandle> for Scope {
285    fn borrow(&self) -> &ScopeHandle {
286        self
287    }
288}
289
290pin_project! {
291    /// Join operation for a [`Scope`].
292    ///
293    /// This is a future that resolves when all tasks on the scope are complete
294    /// or have been cancelled. New tasks will be accepted on the scope until
295    /// every task completes and this future resolves.
296    ///
297    /// When this object is dropped, the scope and all tasks in it are
298    /// cancelled.
299    //
300    // Note: The drop property is only true when S = Scope; it does not apply to
301    // other (non-public) uses of this struct where S = ScopeHandle.
302    pub struct Join<S = Scope> {
303        scope: S,
304        #[pin]
305        waker_entry: WakerEntry<ScopeState>,
306    }
307}
308
309impl<S: Borrow<ScopeHandle>> Join<S> {
310    fn new(scope: S) -> Self {
311        let waker_entry = scope.borrow().inner.state.waker_entry();
312        Self { scope, waker_entry }
313    }
314
315    /// Aborts the scope. The future will resolve when all tasks have finished
316    /// polling.
317    ///
318    /// See [`Scope::abort`] for more details.
319    pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
320        self.scope.borrow().abort_all_tasks();
321        self
322    }
323
324    /// Cancel the scope. The future will resolve when all tasks have finished
325    /// polling.
326    ///
327    /// See [`Scope::cancel`] for more details.
328    pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
329        self.scope.borrow().cancel_all_tasks();
330        self
331    }
332}
333
334impl<S> Future for Join<S>
335where
336    S: Borrow<ScopeHandle>,
337{
338    type Output = ();
339    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
340        let this = self.project();
341        let mut state = Borrow::borrow(&*this.scope).lock();
342        if state.has_tasks() {
343            state.add_waker(this.waker_entry, cx.waker().clone());
344            Poll::Pending
345        } else {
346            state.mark_finished();
347            Poll::Ready(())
348        }
349    }
350}
351
352/// Trait for things that can be spawned on to a scope.  There is a blanket implementation
353/// below for futures.
354pub trait Spawnable {
355    /// The type of value produced on completion.
356    type Output;
357
358    /// Converts to a task that can be spawned directly.
359    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
360}
361
362impl<F: Future + Send + 'static> Spawnable for F
363where
364    F::Output: Send + 'static,
365{
366    type Output = F::Output;
367
368    fn into_task(self, scope: ScopeHandle) -> TaskHandle {
369        scope.new_task(None, self)
370    }
371}
372
373/// A handle to a scope, which may be used to spawn tasks.
374///
375/// ## Ownership and cycles
376///
377/// Tasks running on a `Scope` may hold a `ScopeHandle` to that scope. This does
378/// not create an ownership cycle because the task will drop the `ScopeHandle`
379/// once it completes or is cancelled.
380///
381/// Naturally, scopes containing tasks that never complete and that are never
382/// cancelled will never be freed. Holding a `ScopeHandle` does not contribute to
383/// this problem.
384#[derive(Clone)]
385pub struct ScopeHandle {
386    // LINT.IfChange
387    inner: Arc<ScopeInner>,
388    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
389}
390
391impl ScopeHandle {
392    /// Create a child scope.
393    pub fn new_child(&self) -> Scope {
394        self.new_child_inner(String::new())
395    }
396
397    /// Returns a reference to the instrument data.
398    pub fn instrument_data(&self) -> Option<&(dyn Any + Send + Sync)> {
399        self.inner.instrument_data.as_deref()
400    }
401
402    /// Create a child scope.
403    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
404        self.new_child_inner(name.into())
405    }
406
407    fn new_child_inner(&self, name: String) -> Scope {
408        let mut state = self.lock();
409        let child = ScopeHandle {
410            inner: Arc::new(ScopeInner {
411                executor: self.inner.executor.clone(),
412                state: Condition::new(ScopeState::new_child(
413                    self.clone(),
414                    &state,
415                    JoinResults::default().into(),
416                )),
417
418                instrument_data: self
419                    .inner
420                    .executor
421                    .instrument
422                    .as_ref()
423                    .map(|value| value.scope_created(&name, Some(self))),
424                name,
425            }),
426        };
427        let weak = child.downgrade();
428        state.insert_child(weak);
429        Scope { inner: child }
430    }
431
432    /// Spawn a new task on the scope.
433    // This does not have the must_use attribute because it's common to detach and the lifetime of
434    // the task is bound to the scope: when the scope is dropped, the task will be cancelled.
435    pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
436        let task = future.into_task(self.clone());
437        let task_id = task.id();
438        self.insert_task(task, false);
439        JoinHandle::new(self.clone(), task_id)
440    }
441
442    /// Spawn a new task on the scope of a thread local executor.
443    ///
444    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
445    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
446    pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
447        let task = self.new_local_task(None, future);
448        let id = task.id();
449        self.insert_task(task, false);
450        JoinHandle::new(self.clone(), id)
451    }
452
453    /// Like `spawn`, but for tasks that return a result.
454    ///
455    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
456    /// *cancelled*.
457    pub fn compute<T: Send + 'static>(
458        &self,
459        future: impl Spawnable<Output = T> + Send + 'static,
460    ) -> crate::Task<T> {
461        let task = future.into_task(self.clone());
462        let id = task.id();
463        self.insert_task(task, false);
464        JoinHandle::new(self.clone(), id).into()
465    }
466
467    /// Like `spawn`, but for tasks that return a result.
468    ///
469    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
470    /// *cancelled*.
471    ///
472    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
473    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
474    pub fn compute_local<T: 'static>(
475        &self,
476        future: impl Future<Output = T> + 'static,
477    ) -> crate::Task<T> {
478        let task = self.new_local_task(None, future);
479        let id = task.id();
480        self.insert_task(task, false);
481        JoinHandle::new(self.clone(), id).into()
482    }
483
484    pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
485        ScopeHandle {
486            inner: Arc::new(ScopeInner {
487                state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
488                name: "root".to_string(),
489                instrument_data: executor
490                    .instrument
491                    .as_ref()
492                    .map(|value| value.scope_created("root", None)),
493                executor,
494            }),
495        }
496    }
497
498    /// Stop the scope from accepting new tasks.
499    ///
500    /// Note that unlike [`Scope::close`], this does not return a future that
501    /// waits for all tasks to complete. This could lead to resource leaks
502    /// because it is not uncommon to access a TaskGroup from a task running on
503    /// the scope itself. If such a task were to await a future returned by this
504    /// method it would suspend forever waiting for itself to complete.
505    pub fn close(&self) {
506        self.lock().close();
507    }
508
509    /// Cancel all the scope's tasks.
510    ///
511    /// Note that if this is called from within a task running on the scope, the
512    /// task will not resume from the next await point.
513    pub fn cancel(self) -> Join<Self> {
514        self.cancel_all_tasks();
515        Join::new(self)
516    }
517
518    /// Aborts all the scope's tasks.
519    ///
520    /// Note that if this is called from within a task running on the scope, the
521    /// task will not resume from the next await point.
522    pub fn abort(self) -> impl Future<Output = ()> {
523        self.abort_all_tasks();
524        Join::new(self)
525    }
526
527    /// Retrieves a [`ScopeActiveGuard`] for this scope.
528    ///
529    /// Note that this may fail if cancellation has already started for this
530    /// scope. In that case, the caller must assume any tasks from this scope
531    /// may be dropped at any yield point.
532    ///
533    /// Creating a [`ScopeActiveGuard`] is substantially more expensive than
534    /// just polling it, so callers should maintain the returned guard when
535    /// success is observed from this call for best performance.
536    ///
537    /// See [`Scope::cancel`] for details on cooperative cancellation behavior.
538    #[must_use]
539    pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
540        ScopeActiveGuard::new(self)
541    }
542
543    /// Returns true if the scope has been signaled to exit via
544    /// [`Scope::cancel`] or [`Scope::abort`].
545    pub fn is_cancelled(&self) -> bool {
546        self.lock().status().is_cancelled()
547    }
548
549    // Joining the scope could be allowed from a ScopeHandle, but the use case
550    // seems less common and more bug prone than cancelling. We don't allow this
551    // for the same reason we don't return a future from close().
552
553    /// Wait for there to be no tasks. This is racy: as soon as this returns it is possible for
554    /// another task to have been spawned on this scope.
555    pub async fn on_no_tasks(&self) {
556        self.inner
557            .state
558            .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
559            .await;
560    }
561
562    /// Wait for there to be no tasks and no guards. This is racy: as soon as this returns it is
563    /// possible for another task to have been spawned on this scope, or for there to be guards.
564    pub async fn on_no_tasks_and_guards(&self) {
565        self.inner
566            .state
567            .when(|state| {
568                if state.has_tasks() || state.guards() > 0 {
569                    Poll::Pending
570                } else {
571                    Poll::Ready(())
572                }
573            })
574            .await;
575    }
576
577    /// Wake all the scope's tasks so their futures will be polled again.
578    pub fn wake_all_with_active_guard(&self) {
579        self.lock().wake_all_with_active_guard();
580    }
581
582    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
583    /// That must be done separately.
584    pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
585        &self,
586        id: Option<usize>,
587        fut: Fut,
588    ) -> AtomicFutureHandle<'a>
589    where
590        Fut::Output: Send,
591    {
592        let id = id.unwrap_or_else(|| self.executor().next_task_id());
593        let mut task = AtomicFutureHandle::new(Some(self.clone()), id, fut);
594        if let Some(instrument) = &self.executor().instrument {
595            instrument.task_created(self, &mut task);
596        }
597        task
598    }
599
600    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
601    /// That must be done separately.
602    pub(crate) fn new_local_task<'a>(
603        &self,
604        id: Option<usize>,
605        fut: impl Future + 'a,
606    ) -> AtomicFutureHandle<'a> {
607        // Check that the executor is local and that this is the executor thread.
608        if !self.executor().is_local() {
609            panic!(
610                "Error: called `new_local_task` on multithreaded executor. \
611                 Use `spawn` or a `LocalExecutor` instead."
612            );
613        }
614        assert_eq!(
615            self.executor().first_thread_id.get(),
616            Some(&std::thread::current().id()),
617            "Error: called `new_local_task` on a different thread to the executor",
618        );
619
620        let id = id.unwrap_or_else(|| self.executor().next_task_id());
621        // SAFETY: We've confirmed that the futures here will never be used across multiple threads,
622        // so the Send requirements that `new_local` requires should be met.
623        unsafe {
624            let mut task = AtomicFutureHandle::new_local(Some(self.clone()), id, fut);
625            if let Some(instrument) = &self.executor().instrument {
626                instrument.task_created(self, &mut task);
627            }
628            task
629        }
630    }
631}
632
633impl fmt::Debug for ScopeHandle {
634    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
635        f.debug_struct("Scope").field("name", &self.inner.name).finish()
636    }
637}
638
639/// Similar to a scope but all futures spawned on the scope *must* finish with the same result type.
640/// That allows the scope to return a stream of results. Attempting to spawn tasks using
641/// `ScopeHandle::spawn` (or similar) will result in tasks that are immediately dropped (just as if
642/// the scope was closed).  Like a regular scope, the scope can be closed, at which point the stream
643/// will terminate once all the tasks have finished.  This is designed to be a fairly close drop-in
644/// replacement to `FuturesUnordered`, the principle difference being that the tasks run in parallel
645/// rather than just concurrently.  Another difference is that the futures don't need to be the same
646/// type; only the outputs do.  In all other respects, the scope operates like a regular scope i.e.
647/// it can have children, you can join them, cancel them, etc.
648pub struct ScopeStream<R> {
649    inner: ScopeHandle,
650    stream: Arc<Mutex<ResultsStreamInner<R>>>,
651}
652
653impl<R: Send + 'static> ScopeStream<R> {
654    /// Creates a new scope stream.
655    ///
656    /// The returned scope stream is a child of the current scope.
657    ///
658    /// # Panics
659    ///
660    /// May panic if not called in the context of an executor (e.g. within a
661    /// call to [`run`][crate::SendExecutor::run]).
662    pub fn new() -> (Self, ScopeStreamHandle<R>) {
663        Self::new_inner(String::new())
664    }
665
666    /// Creates a new scope stream with a name.
667    ///
668    /// The returned scope stream is a child of the current scope.
669    ///
670    /// # Panics
671    ///
672    /// May panic if not called in the context of an executor (e.g. within a
673    /// call to [`run`][crate::SendExecutor::run]).
674    pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
675        Self::new_inner(name.into())
676    }
677
678    fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
679        let this = ScopeHandle::with_current(|handle| {
680            let mut state = handle.lock();
681            let stream = Arc::default();
682            let child = ScopeHandle {
683                inner: Arc::new(ScopeInner {
684                    executor: handle.executor().clone(),
685                    state: Condition::new(ScopeState::new_child(
686                        handle.clone(),
687                        &state,
688                        Box::new(ResultsStream { inner: Arc::clone(&stream) }),
689                    )),
690                    instrument_data: handle
691                        .executor()
692                        .instrument
693                        .as_ref()
694                        .map(|value| value.scope_created(&name, Some(handle))),
695                    name,
696                }),
697            };
698            let weak = child.downgrade();
699            state.insert_child(weak);
700            ScopeStream { inner: child, stream }
701        });
702        let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
703        (this, handle)
704    }
705}
706
707impl<R> Drop for ScopeStream<R> {
708    fn drop(&mut self) {
709        // Cancel all tasks in the scope. Each task has a strong reference to the ScopeState,
710        // which will be dropped after all the tasks in the scope are dropped.
711
712        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
713        // here, but we cannot do that without either:
714        // - Sync drop support in AtomicFuture, or
715        // - The ability to reparent tasks, which requires atomic_arc or
716        //   acquiring a mutex during polling.
717        self.inner.abort_all_tasks();
718    }
719}
720
721impl<R: Send + 'static> Stream for ScopeStream<R> {
722    type Item = R;
723
724    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
725        let mut stream_inner = self.stream.lock();
726        match stream_inner.results.pop() {
727            Some(result) => Poll::Ready(Some(result)),
728            None => {
729                // Lock ordering: when results are posted, the state lock is taken first, so we must
730                // do the same.
731                drop(stream_inner);
732                let state = self.inner.lock();
733                let mut stream_inner = self.stream.lock();
734                match stream_inner.results.pop() {
735                    Some(result) => Poll::Ready(Some(result)),
736                    None => {
737                        if state.has_tasks() {
738                            stream_inner.waker = Some(cx.waker().clone());
739                            Poll::Pending
740                        } else {
741                            Poll::Ready(None)
742                        }
743                    }
744                }
745            }
746        }
747    }
748}
749
750impl<R> Deref for ScopeStream<R> {
751    type Target = ScopeHandle;
752    fn deref(&self) -> &Self::Target {
753        &self.inner
754    }
755}
756
757impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
758    fn borrow(&self) -> &ScopeHandle {
759        self
760    }
761}
762
763impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
764    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
765        let (stream, handle) = ScopeStream::new();
766        for fut in iter {
767            handle.push(fut);
768        }
769        stream.close();
770        stream
771    }
772}
773
774#[derive(Clone)]
775pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
776
777impl<R: Send> ScopeStreamHandle<R> {
778    pub fn push(&self, future: impl Spawnable<Output = R>) {
779        self.0.insert_task(future.into_task(self.0.clone()), true);
780    }
781}
782
783/// Holds a guard on the creating scope, holding off cancelation.
784///
785/// `ScopeActiveGuard` allows [`Scope`]s to perform cooperative cancellation.
786/// [`ScopeActiveGuard::on_cancel`] returns a future that resolves when
787/// [`Scope::cancel`] and [`ScopeHandle::cancel`] are called. That is the signal
788/// sent to cooperative tasks to stop doing work and finish.
789///
790/// A `ScopeActiveGuard` is obtained via [`ScopeHandle::active_guard`].
791/// `ScopeActiveGuard` releases the guard on the originating scope on drop.
792#[derive(Debug)]
793#[must_use]
794pub struct ScopeActiveGuard(ScopeHandle);
795
796impl Deref for ScopeActiveGuard {
797    type Target = ScopeHandle;
798    fn deref(&self) -> &Self::Target {
799        &self.0
800    }
801}
802
803impl Drop for ScopeActiveGuard {
804    fn drop(&mut self) {
805        let Self(scope) = self;
806        scope.release_cancel_guard();
807    }
808}
809
810impl Clone for ScopeActiveGuard {
811    fn clone(&self) -> Self {
812        self.0.lock().acquire_cancel_guard(1);
813        Self(self.0.clone())
814    }
815}
816
817impl ScopeActiveGuard {
818    /// Returns a borrow of the scope handle associated with this guard.
819    pub fn as_handle(&self) -> &ScopeHandle {
820        &self.0
821    }
822
823    /// Returns a clone of the scope handle associated with this guard.
824    pub fn to_handle(&self) -> ScopeHandle {
825        self.0.clone()
826    }
827
828    /// Retrieves a future from this guard that can be polled on for
829    /// cancellation.
830    ///
831    /// The returned future resolves when the scope is cancelled. Callers should
832    /// perform teardown and drop the guard when done.
833    pub async fn on_cancel(&self) {
834        self.0
835            .inner
836            .state
837            .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
838            .await
839    }
840
841    fn new(scope: &ScopeHandle) -> Option<Self> {
842        if scope.lock().acquire_cancel_guard_if_not_finished() {
843            Some(Self(scope.clone()))
844        } else {
845            None
846        }
847    }
848}
849
850//
851// # Internal API
852//
853
854/// A weak reference to a scope.
855#[derive(Clone)]
856struct WeakScopeHandle {
857    inner: Weak<ScopeInner>,
858}
859
860impl WeakScopeHandle {
861    /// Upgrades to a [`ScopeHandle`] if the scope still exists.
862    pub fn upgrade(&self) -> Option<ScopeHandle> {
863        self.inner.upgrade().map(|inner| ScopeHandle { inner })
864    }
865}
866
867impl hash::Hash for WeakScopeHandle {
868    fn hash<H: hash::Hasher>(&self, state: &mut H) {
869        Weak::as_ptr(&self.inner).hash(state);
870    }
871}
872
873impl PartialEq for WeakScopeHandle {
874    fn eq(&self, other: &Self) -> bool {
875        Weak::ptr_eq(&self.inner, &other.inner)
876    }
877}
878
879impl Eq for WeakScopeHandle {
880    // Weak::ptr_eq should return consistent results, even when the inner value
881    // has been dropped.
882}
883
884// This module exists as a privacy boundary so that we can make sure any
885// operation that might cause the scope to finish also wakes its waker.
886mod state {
887    use super::*;
888
889    pub struct ScopeState {
890        pub parent: Option<ScopeHandle>,
891        // LINT.IfChange
892        children: HashSet<WeakScopeHandle>,
893        all_tasks: HashSet<TaskHandle>,
894        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
895        /// The number of children that transitively contain tasks, plus one for
896        /// this scope if it directly contains tasks.
897        subscopes_with_tasks: u32,
898        can_spawn: bool,
899        guards: u32,
900        status: Status,
901        /// Wakers/results for joining each task.
902        pub results: Box<dyn Results>,
903    }
904
905    pub enum JoinResult {
906        Waker(Waker),
907        Result(TaskHandle),
908    }
909
910    #[repr(u8)] // So zxdb can read the status.
911    #[derive(Default, Debug, Clone, Copy)]
912    pub enum Status {
913        #[default]
914        /// The scope is active.
915        Active,
916        /// The scope has been signalled to cancel and is waiting for all guards
917        /// to be released.
918        PendingCancellation,
919        /// The scope is not accepting new tasks and all tasks have been
920        /// scheduled to be dropped.
921        Finished,
922    }
923
924    impl Status {
925        /// Returns whether this records a cancelled state.
926        pub fn is_cancelled(&self) -> bool {
927            match self {
928                Self::Active => false,
929                Self::PendingCancellation | Self::Finished => true,
930            }
931        }
932    }
933
934    impl ScopeState {
935        pub fn new_root(results: Box<impl Results>) -> Self {
936            Self {
937                parent: None,
938                children: Default::default(),
939                all_tasks: Default::default(),
940                subscopes_with_tasks: 0,
941                can_spawn: true,
942                guards: 0,
943                status: Default::default(),
944                results,
945            }
946        }
947
948        pub fn new_child(
949            parent_handle: ScopeHandle,
950            parent_state: &Self,
951            results: Box<impl Results>,
952        ) -> Self {
953            let (status, can_spawn) = match parent_state.status {
954                Status::Active => (Status::Active, parent_state.can_spawn),
955                Status::Finished | Status::PendingCancellation => (Status::Finished, false),
956            };
957            Self {
958                parent: Some(parent_handle),
959                children: Default::default(),
960                all_tasks: Default::default(),
961                subscopes_with_tasks: 0,
962                can_spawn,
963                guards: 0,
964                status,
965                results,
966            }
967        }
968    }
969
970    impl ScopeState {
971        pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
972            &self.all_tasks
973        }
974
975        /// Attempts to add a task to the scope. Returns the task if the scope cannot accept a task
976        /// (since it isn't safe to drop the task whilst the lock is held).
977        pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
978            if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
979                return Some(task);
980            }
981            if self.all_tasks.is_empty() && !self.register_first_task() {
982                return Some(task);
983            }
984            task.wake();
985            assert!(self.all_tasks.insert(task));
986            None
987        }
988
989        pub fn children(&self) -> &HashSet<WeakScopeHandle> {
990            &self.children
991        }
992
993        pub fn insert_child(&mut self, child: WeakScopeHandle) {
994            self.children.insert(child);
995        }
996
997        pub fn remove_child(&mut self, child: &PtrKey) {
998            let found = self.children.remove(child);
999            // This should always succeed unless the scope is being dropped
1000            // (in which case children will be empty).
1001            assert!(found || self.children.is_empty());
1002        }
1003
1004        pub fn status(&self) -> Status {
1005            self.status
1006        }
1007
1008        pub fn guards(&self) -> u32 {
1009            self.guards
1010        }
1011
1012        pub fn close(&mut self) {
1013            self.can_spawn = false;
1014        }
1015
1016        pub fn mark_finished(&mut self) {
1017            self.can_spawn = false;
1018            self.status = Status::Finished;
1019        }
1020
1021        pub fn has_tasks(&self) -> bool {
1022            self.subscopes_with_tasks > 0
1023        }
1024
1025        pub fn wake_all_with_active_guard(&mut self) {
1026            let mut count = 0;
1027            for task in &self.all_tasks {
1028                if task.wake_with_active_guard() {
1029                    count += 1;
1030                }
1031            }
1032            self.acquire_cancel_guard(count);
1033        }
1034
1035        pub fn abort_tasks_and_mark_finished(&mut self) {
1036            for task in self.all_tasks() {
1037                if task.abort() {
1038                    task.scope().executor().ready_tasks.push(task.clone());
1039                }
1040                // Don't bother dropping tasks that are finished; the entire
1041                // scope is going to be dropped soon anyway.
1042            }
1043            self.mark_finished();
1044        }
1045
1046        pub fn wake_wakers_and_mark_pending(
1047            this: &mut ConditionGuard<'_, ScopeState>,
1048            wakers: &mut Vec<Waker>,
1049        ) {
1050            wakers.extend(this.drain_wakers());
1051            this.status = Status::PendingCancellation;
1052        }
1053
1054        /// Registers our first task with the parent scope.
1055        ///
1056        /// Returns false if the scope is not allowed to accept a task.
1057        #[must_use]
1058        fn register_first_task(&mut self) -> bool {
1059            if !self.can_spawn {
1060                return false;
1061            }
1062            let can_spawn = match &self.parent {
1063                Some(parent) => {
1064                    // If our parent already knows we have tasks, we can always
1065                    // spawn. Otherwise, we have to recurse.
1066                    self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
1067                }
1068                None => true,
1069            };
1070            if can_spawn {
1071                self.subscopes_with_tasks += 1;
1072                debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
1073            };
1074            can_spawn
1075        }
1076
1077        fn on_last_task_removed(
1078            this: &mut ConditionGuard<'_, ScopeState>,
1079            num_wakers_hint: usize,
1080            wakers: &mut Vec<Waker>,
1081        ) {
1082            debug_assert!(this.subscopes_with_tasks > 0);
1083            this.subscopes_with_tasks -= 1;
1084            if this.subscopes_with_tasks > 0 {
1085                wakers.reserve(num_wakers_hint);
1086                return;
1087            }
1088
1089            match &this.parent {
1090                Some(parent) => {
1091                    Self::on_last_task_removed(
1092                        &mut parent.lock(),
1093                        num_wakers_hint + this.waker_count(),
1094                        wakers,
1095                    );
1096                }
1097                None => wakers.reserve(num_wakers_hint),
1098            };
1099            wakers.extend(this.drain_wakers());
1100        }
1101
1102        /// Acquires a cancel guard IFF we're not in the finished state.
1103        ///
1104        /// Returns `true` if a guard was acquired.
1105        pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
1106            match self.status {
1107                Status::Active | Status::PendingCancellation => {
1108                    self.acquire_cancel_guard(1);
1109                    true
1110                }
1111                Status::Finished => false,
1112            }
1113        }
1114
1115        pub fn acquire_cancel_guard(&mut self, count: u32) {
1116            if count == 0 {
1117                return;
1118            }
1119            if self.guards == 0 {
1120                if let Some(parent) = self.parent.as_ref() {
1121                    parent.acquire_cancel_guard();
1122                }
1123            }
1124            self.guards += count;
1125        }
1126
1127        pub fn release_cancel_guard(
1128            this: &mut ConditionGuard<'_, Self>,
1129            wake_vec: &mut WakeVec,
1130            mut waker_count: usize,
1131        ) {
1132            this.guards = this.guards.checked_sub(1).expect("released non-acquired guard");
1133            if this.guards == 0 {
1134                waker_count += this.waker_count();
1135                this.on_zero_guards(wake_vec, waker_count);
1136                wake_vec.0.extend(this.drain_wakers())
1137            } else {
1138                wake_vec.0.reserve_exact(waker_count);
1139            }
1140        }
1141
1142        fn on_zero_guards(&mut self, wake_vec: &mut WakeVec, waker_count: usize) {
1143            match self.status {
1144                Status::Active => {}
1145                Status::PendingCancellation => {
1146                    self.abort_tasks_and_mark_finished();
1147                }
1148                // Acquiring and releasing guards post finished state is a
1149                // no-op.
1150                Status::Finished => {}
1151            }
1152            if let Some(parent) = &self.parent {
1153                ScopeState::release_cancel_guard(&mut parent.lock(), wake_vec, waker_count);
1154            }
1155        }
1156    }
1157
1158    #[derive(Default)]
1159    pub struct WakeVec(Vec<Waker>);
1160
1161    impl Drop for WakeVec {
1162        fn drop(&mut self) {
1163            for waker in self.0.drain(..) {
1164                waker.wake();
1165            }
1166        }
1167    }
1168
1169    // WakeVec *must* come after the guard because we want the guard to be dropped first.
1170    pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1171
1172    impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1173        fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1174            Self(value, WakeVec::default())
1175        }
1176    }
1177
1178    impl ScopeWaker<'_> {
1179        pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
1180            let task = self.all_tasks.take(&id);
1181            if task.is_some() {
1182                self.on_task_removed(0);
1183            }
1184            task
1185        }
1186
1187        pub fn task_did_finish(&mut self, id: usize) {
1188            if let Some(task) = self.all_tasks.take(&id) {
1189                self.on_task_removed(1);
1190                if !task.is_detached() {
1191                    let maybe_waker = self.results.task_did_finish(task);
1192                    self.1.0.extend(maybe_waker);
1193                }
1194            }
1195        }
1196
1197        pub fn set_closed_and_drain(
1198            &mut self,
1199        ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
1200            self.close();
1201            let all_tasks = std::mem::take(&mut self.all_tasks);
1202            let results = self.results.take();
1203            if !all_tasks.is_empty() {
1204                self.on_task_removed(0)
1205            }
1206            let children = self.children.drain();
1207            (all_tasks, results, children)
1208        }
1209
1210        fn on_task_removed(&mut self, num_wakers_hint: usize) {
1211            if self.all_tasks.is_empty() {
1212                ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1.0)
1213            }
1214        }
1215
1216        pub fn wake_wakers_and_mark_pending(&mut self) {
1217            let Self(state, wakers) = self;
1218            ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
1219        }
1220    }
1221
1222    impl<'a> Deref for ScopeWaker<'a> {
1223        type Target = ConditionGuard<'a, ScopeState>;
1224
1225        fn deref(&self) -> &Self::Target {
1226            &self.0
1227        }
1228    }
1229
1230    impl DerefMut for ScopeWaker<'_> {
1231        fn deref_mut(&mut self) -> &mut Self::Target {
1232            &mut self.0
1233        }
1234    }
1235}
1236
1237struct ScopeInner {
1238    executor: Arc<Executor>,
1239    state: Condition<ScopeState>,
1240    name: String,
1241    instrument_data: Option<Box<dyn Any + Send + Sync>>,
1242}
1243
1244impl Drop for ScopeInner {
1245    fn drop(&mut self) {
1246        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
1247        // This also complies with the correctness requirements of
1248        // HashSet::remove because the implementations of Hash and Eq match
1249        // between PtrKey and WeakScopeHandle.
1250        let key = unsafe { &*(self as *const _ as *const PtrKey) };
1251        let state = self.state.lock();
1252        if let Some(parent) = &state.parent {
1253            let mut wake_vec = WakeVec::default();
1254            let mut parent_state = parent.lock();
1255            if state.guards() != 0 {
1256                ScopeState::release_cancel_guard(&mut parent_state, &mut wake_vec, 0);
1257            }
1258            parent_state.remove_child(key);
1259        }
1260    }
1261}
1262
1263impl ScopeHandle {
1264    fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
1265        super::common::TaskHandle::with_current(|task| match task {
1266            Some(task) => f(task.scope()),
1267            None => f(EHandle::local().global_scope()),
1268        })
1269    }
1270
1271    fn lock(&self) -> ConditionGuard<'_, ScopeState> {
1272        self.inner.state.lock()
1273    }
1274
1275    fn downgrade(&self) -> WeakScopeHandle {
1276        WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
1277    }
1278
1279    #[inline(always)]
1280    pub(crate) fn executor(&self) -> &Arc<Executor> {
1281        &self.inner.executor
1282    }
1283
1284    /// Marks the task as detached.
1285    pub(crate) fn detach(&self, task_id: usize) {
1286        let _maybe_task = {
1287            let mut state = self.lock();
1288            if let Some(task) = state.all_tasks().get(&task_id) {
1289                task.detach();
1290            }
1291            state.results.detach(task_id)
1292        };
1293    }
1294
1295    /// Aborts the task.
1296    ///
1297    /// # Safety
1298    ///
1299    /// The caller must guarantee that `R` is the correct type.
1300    pub(crate) unsafe fn abort_task<R>(&self, task_id: usize) -> Option<R> {
1301        let mut state = self.lock();
1302        if let Some(task) = state.results.detach(task_id) {
1303            drop(state);
1304            return task.take_result();
1305        }
1306        state.all_tasks().get(&task_id).and_then(|task| {
1307            if task.abort() {
1308                self.inner.executor.ready_tasks.push(task.clone());
1309            }
1310            task.take_result()
1311        })
1312    }
1313
1314    /// Aborts and detaches the task.
1315    pub(crate) fn abort_and_detach(&self, task_id: usize) {
1316        let _tasks = {
1317            let mut state = ScopeWaker::from(self.lock());
1318            let maybe_task1 = state.results.detach(task_id);
1319            let mut maybe_task2 = None;
1320            if let Some(task) = state.all_tasks().get(&task_id) {
1321                match task.abort_and_detach() {
1322                    AbortAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
1323                    AbortAndDetachResult::AddToRunQueue => {
1324                        self.inner.executor.ready_tasks.push(task.clone());
1325                    }
1326                    AbortAndDetachResult::Pending => {}
1327                }
1328            }
1329            (maybe_task1, maybe_task2)
1330        };
1331    }
1332
1333    /// Polls for a join result for the given task ID.
1334    ///
1335    /// # Safety
1336    ///
1337    /// The caller must guarantee that `R` is the correct type.
1338    pub(crate) unsafe fn poll_join_result<R>(
1339        &self,
1340        task_id: usize,
1341        cx: &mut Context<'_>,
1342    ) -> Poll<R> {
1343        let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1344        match task.take_result() {
1345            Some(result) => Poll::Ready(result),
1346            None => {
1347                // The task has been aborted so all we can do is forever return pending.
1348                Poll::Pending
1349            }
1350        }
1351    }
1352
1353    /// Polls for the task to be aborted.
1354    pub(crate) unsafe fn poll_aborted<R>(
1355        &self,
1356        task_id: usize,
1357        cx: &mut Context<'_>,
1358    ) -> Poll<Option<R>> {
1359        let task = self.lock().results.poll_join_result(task_id, cx);
1360        task.map(|task| task.take_result())
1361    }
1362
1363    pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
1364        let returned_task = self.lock().insert_task(task, for_stream);
1365        returned_task.is_none()
1366    }
1367
1368    /// Drops the specified task.
1369    ///
1370    /// The main task by the single-threaded executor might not be 'static, so we use this to drop
1371    /// the task and make sure we meet lifetime guarantees.  Note that removing the task from our
1372    /// task list isn't sufficient; we must make sure the future running in the task is dropped.
1373    ///
1374    /// # Safety
1375    ///
1376    /// This is unsafe because of the call to `drop_future_unchecked` which requires that no
1377    /// thread is currently polling the task.
1378    pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
1379        let mut state = ScopeWaker::from(self.lock());
1380        let task = state.take_task(task_id);
1381        if let Some(task) = task {
1382            task.drop_future_unchecked();
1383        }
1384    }
1385
1386    pub(super) fn task_did_finish(&self, id: usize) {
1387        let mut state = ScopeWaker::from(self.lock());
1388        state.task_did_finish(id);
1389    }
1390
1391    /// Visits scopes by state. If the callback returns `true`, children will
1392    /// be visited.
1393    fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
1394        let mut scopes = vec![self.clone()];
1395        while let Some(scope) = scopes.pop() {
1396            let mut scope_waker = ScopeWaker::from(scope.lock());
1397            if callback(&mut scope_waker) {
1398                scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
1399            }
1400        }
1401    }
1402
1403    fn acquire_cancel_guard(&self) {
1404        self.lock().acquire_cancel_guard(1)
1405    }
1406
1407    pub(crate) fn release_cancel_guard(&self) {
1408        let mut wake_vec = WakeVec::default();
1409        ScopeState::release_cancel_guard(&mut self.lock(), &mut wake_vec, 0);
1410    }
1411
1412    /// Cancels tasks in this scope and all child scopes.
1413    fn cancel_all_tasks(&self) {
1414        self.visit_scopes_locked(|state| {
1415            match state.status() {
1416                Status::Active => {
1417                    if state.guards() == 0 {
1418                        state.abort_tasks_and_mark_finished();
1419                    } else {
1420                        state.wake_wakers_and_mark_pending();
1421                    }
1422                    true
1423                }
1424                Status::PendingCancellation => {
1425                    // If we're already pending cancellation, don't wake all
1426                    // tasks. A single wake should be enough here. More
1427                    // wakes on further calls probably hides bugs.
1428                    true
1429                }
1430                Status::Finished => {
1431                    // Already finished.
1432                    false
1433                }
1434            }
1435        });
1436    }
1437
1438    /// Aborts tasks in this scope and all child scopes.
1439    fn abort_all_tasks(&self) {
1440        self.visit_scopes_locked(|state| match state.status() {
1441            Status::Active | Status::PendingCancellation => {
1442                state.abort_tasks_and_mark_finished();
1443                true
1444            }
1445            Status::Finished => false,
1446        });
1447    }
1448
1449    /// Drops tasks in this scope and all child scopes.
1450    ///
1451    /// # Panics
1452    ///
1453    /// Panics if any task is being accessed by another thread. Only call this
1454    /// method when the executor is shutting down and there are no other pollers.
1455    pub(super) fn drop_all_tasks(&self) {
1456        let mut scopes = vec![self.clone()];
1457        while let Some(scope) = scopes.pop() {
1458            let (tasks, join_results) = {
1459                let mut state = ScopeWaker::from(scope.lock());
1460                let (tasks, join_results, children) = state.set_closed_and_drain();
1461                scopes.extend(children.filter_map(|child| child.upgrade()));
1462                (tasks, join_results)
1463            };
1464            // Call task destructors once the scope lock is released so we don't risk a deadlock.
1465            for task in tasks {
1466                task.try_drop().expect("Expected drop to succeed");
1467            }
1468            std::mem::drop(join_results);
1469        }
1470    }
1471}
1472
1473/// Optimizes removal from parent scope.
1474#[repr(transparent)]
1475struct PtrKey;
1476
1477impl Borrow<PtrKey> for WeakScopeHandle {
1478    fn borrow(&self) -> &PtrKey {
1479        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
1480        unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1481    }
1482}
1483
1484impl PartialEq for PtrKey {
1485    fn eq(&self, other: &Self) -> bool {
1486        std::ptr::eq(self, other)
1487    }
1488}
1489
1490impl Eq for PtrKey {}
1491
1492impl hash::Hash for PtrKey {
1493    fn hash<H: hash::Hasher>(&self, state: &mut H) {
1494        (self as *const PtrKey).hash(state);
1495    }
1496}
1497
1498#[derive(Default)]
1499struct JoinResults(HashMap<usize, JoinResult>);
1500
1501trait Results: Send + Sync + 'static {
1502    /// Returns true if we allow spawning futures with arbitrary outputs on the scope.
1503    fn can_spawn(&self) -> bool;
1504
1505    /// Polls for the specified task having finished.
1506    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;
1507
1508    /// Called when a task finishes.
1509    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;
1510
1511    /// Called to drop any results for a particular task.
1512    fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;
1513
1514    /// Takes *all* the stored results.
1515    fn take(&mut self) -> Box<dyn Any>;
1516
1517    /// Used only for testing.  Returns true if there are any results registered.
1518    #[cfg(test)]
1519    fn is_empty(&self) -> bool;
1520}
1521
1522impl Results for JoinResults {
1523    fn can_spawn(&self) -> bool {
1524        true
1525    }
1526
1527    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1528        match self.0.entry(task_id) {
1529            Entry::Occupied(mut o) => match o.get_mut() {
1530                JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1531                JoinResult::Result(_) => {
1532                    let JoinResult::Result(task) = o.remove() else { unreachable!() };
1533                    return Poll::Ready(task);
1534                }
1535            },
1536            Entry::Vacant(v) => {
1537                v.insert(JoinResult::Waker(cx.waker().clone()));
1538            }
1539        }
1540        Poll::Pending
1541    }
1542
1543    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1544        match self.0.entry(task.id()) {
1545            Entry::Occupied(mut o) => {
1546                let JoinResult::Waker(waker) =
1547                    std::mem::replace(o.get_mut(), JoinResult::Result(task))
1548                else {
1549                    // It can't be JoinResult::Result because this function is the only
1550                    // function that sets that, and `task_did_finish` won't get called
1551                    // twice.
1552                    unreachable!()
1553                };
1554                Some(waker)
1555            }
1556            Entry::Vacant(v) => {
1557                v.insert(JoinResult::Result(task));
1558                None
1559            }
1560        }
1561    }
1562
1563    fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1564        match self.0.remove(&task_id) {
1565            Some(JoinResult::Result(task)) => Some(task),
1566            _ => None,
1567        }
1568    }
1569
1570    fn take(&mut self) -> Box<dyn Any> {
1571        Box::new(Self(std::mem::take(&mut self.0)))
1572    }
1573
1574    #[cfg(test)]
1575    fn is_empty(&self) -> bool {
1576        self.0.is_empty()
1577    }
1578}
1579
1580#[derive(Default)]
1581struct ResultsStream<R> {
1582    inner: Arc<Mutex<ResultsStreamInner<R>>>,
1583}
1584
1585struct ResultsStreamInner<R> {
1586    results: Vec<R>,
1587    waker: Option<Waker>,
1588}
1589
1590impl<R> Default for ResultsStreamInner<R> {
1591    fn default() -> Self {
1592        Self { results: Vec::new(), waker: None }
1593    }
1594}
1595
1596impl<R: Send + 'static> Results for ResultsStream<R> {
1597    fn can_spawn(&self) -> bool {
1598        false
1599    }
1600
1601    fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1602        Poll::Pending
1603    }
1604
1605    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1606        let mut inner = self.inner.lock();
1607        // SAFETY: R is guaranteed to be the same return type as all futures finishing on this
1608        // scope.
1609        inner.results.extend(unsafe { task.take_result() });
1610        inner.waker.take()
1611    }
1612
1613    fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1614        None
1615    }
1616
1617    fn take(&mut self) -> Box<dyn Any> {
1618        Box::new(std::mem::take(&mut self.inner.lock().results))
1619    }
1620
1621    #[cfg(test)]
1622    fn is_empty(&self) -> bool {
1623        false
1624    }
1625}
1626
1627#[cfg(test)]
1628mod tests {
1629    // NOTE: Tests that work on both the fuchsia and portable runtimes should be placed in
1630    // runtime/scope.rs.
1631
1632    use super::*;
1633    use crate::{
1634        EHandle, LocalExecutor, SendExecutorBuilder, SpawnableFuture, Task, TestExecutor, Timer,
1635        yield_now,
1636    };
1637    use fuchsia_sync::{Condvar, Mutex};
1638    use futures::channel::mpsc;
1639    use futures::{FutureExt, StreamExt};
1640    use std::future::pending;
1641    use std::pin::{Pin, pin};
1642    use std::sync::Arc;
1643    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1644    use std::task::{Context, Poll};
1645    use std::time::Duration;
1646
1647    #[derive(Default)]
1648    struct RemoteControlFuture(Mutex<RCFState>);
1649    #[derive(Default)]
1650    struct RCFState {
1651        resolved: bool,
1652        waker: Option<Waker>,
1653    }
1654
1655    impl Future for &RemoteControlFuture {
1656        type Output = ();
1657        fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1658            let mut this = self.0.lock();
1659            if this.resolved {
1660                Poll::Ready(())
1661            } else {
1662                this.waker.replace(cx.waker().clone());
1663                Poll::Pending
1664            }
1665        }
1666    }
1667
1668    impl RemoteControlFuture {
1669        fn new() -> Arc<Self> {
1670            Arc::new(Default::default())
1671        }
1672
1673        fn resolve(&self) {
1674            let mut this = self.0.lock();
1675            this.resolved = true;
1676            if let Some(waker) = this.waker.take() {
1677                waker.wake();
1678            }
1679        }
1680
1681        fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> {
1682            let this = Arc::clone(self);
1683            #[allow(clippy::redundant_async_block)] // Allow returning `&*this` out of this fn.
1684            async move {
1685                (&*this).await
1686            }
1687        }
1688    }
1689
1690    #[test]
1691    fn compute_works_on_root_scope() {
1692        let mut executor = TestExecutor::new();
1693        let scope = executor.global_scope();
1694        let mut task = pin!(scope.compute(async { 1 }));
1695        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1696    }
1697
1698    #[test]
1699    fn compute_works_on_new_child() {
1700        let mut executor = TestExecutor::new();
1701        let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1702        let mut task = pin!(scope.compute(async { 1 }));
1703        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1704    }
1705
1706    #[test]
1707    fn scope_drop_cancels_tasks() {
1708        let mut executor = TestExecutor::new();
1709        let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1710        let mut task = pin!(scope.compute(async { 1 }));
1711        drop(scope);
1712        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1713    }
1714
1715    #[test]
1716    fn tasks_do_not_spawn_on_cancelled_scopes() {
1717        let mut executor = TestExecutor::new();
1718        let scope =
1719            executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1720        let handle = scope.to_handle();
1721        let mut cancel = pin!(scope.cancel());
1722        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1723        let mut task = pin!(handle.compute(async { 1 }));
1724        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1725    }
1726
1727    #[test]
1728    fn tasks_do_not_spawn_on_closed_empty_scopes() {
1729        let mut executor = TestExecutor::new();
1730        let scope =
1731            executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1732        let handle = scope.to_handle();
1733        let mut close = pin!(scope.cancel());
1734        assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1735        let mut task = pin!(handle.compute(async { 1 }));
1736        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1737    }
1738
1739    #[test]
1740    fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
1741        let mut executor = TestExecutor::new();
1742        let scope = executor.global_scope().new_child();
1743        let handle = scope.to_handle();
1744        handle.spawn(pending());
1745        let mut close = pin!(scope.close());
1746        assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1747        let mut task = pin!(handle.compute(async { 1 }));
1748        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1749    }
1750
1751    #[test]
1752    fn spawn_works_on_child_and_grandchild() {
1753        let mut executor = TestExecutor::new();
1754        let scope = executor.global_scope().new_child();
1755        let child = scope.new_child();
1756        let grandchild = child.new_child();
1757        let mut child_task = pin!(child.compute(async { 1 }));
1758        let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1759        assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Ready(1));
1760        assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Ready(1));
1761    }
1762
1763    #[test]
1764    fn spawn_drop_cancels_child_and_grandchild_tasks() {
1765        let mut executor = TestExecutor::new();
1766        let scope = executor.global_scope().new_child();
1767        let child = scope.new_child();
1768        let grandchild = child.new_child();
1769        let mut child_task = pin!(child.compute(async { 1 }));
1770        let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1771        drop(scope);
1772        assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Pending);
1773        assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Pending);
1774    }
1775
1776    #[test]
1777    fn completed_tasks_are_cleaned_up_after_cancel() {
1778        let mut executor = TestExecutor::new();
1779        let scope = executor.global_scope().new_child();
1780
1781        let task1 = scope.spawn(pending::<()>());
1782        let task2 = scope.spawn(async {});
1783        assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1784        assert_eq!(scope.lock().all_tasks().len(), 1);
1785
1786        // Running the executor after cancelling the task isn't currently
1787        // necessary, but we might decide to do async cleanup in the future.
1788        assert_eq!(task1.abort().now_or_never(), None);
1789        assert_eq!(task2.abort().now_or_never(), Some(Some(())));
1790
1791        assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1792        assert_eq!(scope.lock().all_tasks().len(), 0);
1793        assert!(scope.lock().results.is_empty());
1794    }
1795
1796    #[test]
1797    fn join_emtpy_scope() {
1798        let mut executor = TestExecutor::new();
1799        let scope = executor.global_scope().new_child();
1800        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1801    }
1802
1803    #[test]
1804    fn task_handle_preserves_access_to_result_after_join_begins() {
1805        let mut executor = TestExecutor::new();
1806        let scope = executor.global_scope().new_child();
1807        let mut task = scope.compute(async { 1 });
1808        scope.spawn(async {});
1809        let task2 = scope.spawn(pending::<()>());
1810        // Fuse to stay agnostic as to whether the join completes before or
1811        // after awaiting the task handle.
1812        let mut join = pin!(scope.join().fuse());
1813        let _ = executor.run_until_stalled(&mut join);
1814        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1815        drop(task2.abort());
1816        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1817    }
1818
1819    #[test]
1820    fn join_blocks_until_task_is_cancelled() {
1821        // Scope with one outstanding task handle and one cancelled task.
1822        // The scope is not complete until the outstanding task handle is cancelled.
1823        let mut executor = TestExecutor::new();
1824        let scope = executor.global_scope().new_child();
1825        let outstanding_task = scope.spawn(pending::<()>());
1826        let cancelled_task = scope.spawn(pending::<()>());
1827        assert_eq!(
1828            executor.run_until_stalled(&mut pin!(cancelled_task.abort())),
1829            Poll::Ready(None)
1830        );
1831        let mut join = pin!(scope.join());
1832        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1833        assert_eq!(
1834            executor.run_until_stalled(&mut pin!(outstanding_task.abort())),
1835            Poll::Ready(None)
1836        );
1837        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1838    }
1839
1840    #[test]
1841    fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1842        let mut executor = TestExecutor::new();
1843        let scope = executor.global_scope().new_child();
1844        // The default is to detach.
1845        scope.spawn(pending::<()>());
1846        let mut join = pin!(scope.join());
1847        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1848        let mut cancel = pin!(join.cancel());
1849        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1850    }
1851
1852    #[test]
1853    fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1854        let mut executor = TestExecutor::new();
1855        let scope = executor.global_scope().new_child();
1856        // The default is to detach.
1857        scope.spawn(pending::<()>());
1858        let mut close = pin!(scope.close());
1859        assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1860        let mut cancel = pin!(close.cancel());
1861        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1862    }
1863
1864    #[test]
1865    fn join_scope_blocks_until_spawned_task_completes() {
1866        let mut executor = TestExecutor::new();
1867        let scope = executor.global_scope().new_child();
1868        let remote = RemoteControlFuture::new();
1869        let mut task = scope.spawn(remote.as_future());
1870        let mut scope_join = pin!(scope.join());
1871        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1872        remote.resolve();
1873        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1874        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1875    }
1876
1877    #[test]
1878    fn close_scope_blocks_until_spawned_task_completes() {
1879        let mut executor = TestExecutor::new();
1880        let scope = executor.global_scope().new_child();
1881        let remote = RemoteControlFuture::new();
1882        let mut task = scope.spawn(remote.as_future());
1883        let mut scope_close = pin!(scope.close());
1884        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Pending);
1885        remote.resolve();
1886        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1887        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1888    }
1889
1890    #[test]
1891    fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
1892        let mut executor = TestExecutor::new();
1893        let scope = executor.global_scope().new_child();
1894        let child = scope.new_child();
1895        let remote = RemoteControlFuture::new();
1896        child.spawn(remote.as_future());
1897        let mut scope_join = pin!(scope.join());
1898        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1899        assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1900        child.detach();
1901        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1902        remote.resolve();
1903        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1904    }
1905
1906    #[test]
1907    fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
1908        let mut executor = TestExecutor::new();
1909        let scope = executor.global_scope().new_child();
1910        let remote = RemoteControlFuture::new();
1911        {
1912            let remote = remote.clone();
1913            scope.spawn(async move {
1914                let child = Scope::new_with_name("child");
1915                child.spawn(async move {
1916                    Scope::current().spawn(remote.as_future());
1917                });
1918                child.detach();
1919            });
1920        }
1921        let mut scope_join = pin!(scope.join());
1922        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1923        remote.resolve();
1924        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1925    }
1926
1927    #[test]
1928    fn join_scope_blocks_when_blocked_child_is_detached() {
1929        let mut executor = TestExecutor::new();
1930        let scope = executor.global_scope().new_child();
1931        let child = scope.new_child();
1932        child.spawn(pending());
1933        let mut scope_join = pin!(scope.join());
1934        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1935        assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1936        child.detach();
1937        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1938    }
1939
1940    #[test]
1941    fn join_scope_completes_when_blocked_child_is_cancelled() {
1942        let mut executor = TestExecutor::new();
1943        let scope = executor.global_scope().new_child();
1944        let child = scope.new_child();
1945        child.spawn(pending());
1946        let mut scope_join = pin!(scope.join());
1947        {
1948            let mut child_join = pin!(child.join());
1949            assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1950            assert_eq!(executor.run_until_stalled(&mut child_join), Poll::Pending);
1951        }
1952        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1953    }
1954
1955    #[test]
1956    fn detached_scope_can_spawn() {
1957        let mut executor = TestExecutor::new();
1958        let scope = executor.global_scope().new_child();
1959        let handle = scope.to_handle();
1960        scope.detach();
1961        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1962    }
1963
1964    #[test]
1965    fn dropped_scope_cannot_spawn() {
1966        let mut executor = TestExecutor::new();
1967        let scope = executor.global_scope().new_child();
1968        let handle = scope.to_handle();
1969        drop(scope);
1970        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1971    }
1972
1973    #[test]
1974    fn dropped_scope_with_running_task_cannot_spawn() {
1975        let mut executor = TestExecutor::new();
1976        let scope = executor.global_scope().new_child();
1977        let handle = scope.to_handle();
1978        let _running_task = handle.spawn(pending::<()>());
1979        drop(scope);
1980        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1981    }
1982
1983    #[test]
1984    fn joined_scope_cannot_spawn() {
1985        let mut executor = TestExecutor::new();
1986        let scope = executor.global_scope().new_child();
1987        let handle = scope.to_handle();
1988        let mut scope_join = pin!(scope.join());
1989        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1990        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1991    }
1992
1993    #[test]
1994    fn joining_scope_with_running_task_can_spawn() {
1995        let mut executor = TestExecutor::new();
1996        let scope = executor.global_scope().new_child();
1997        let handle = scope.to_handle();
1998        let _running_task = handle.spawn(pending::<()>());
1999        let mut scope_join = pin!(scope.join());
2000        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
2001        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
2002    }
2003
2004    #[test]
2005    fn joined_scope_child_cannot_spawn() {
2006        let mut executor = TestExecutor::new();
2007        let scope = executor.global_scope().new_child();
2008        let handle = scope.to_handle();
2009        let child_before_join = scope.new_child();
2010        assert_eq!(
2011            executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
2012            Poll::Ready(1)
2013        );
2014        let mut scope_join = pin!(scope.join());
2015        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
2016        let child_after_join = handle.new_child();
2017        let grandchild_after_join = child_before_join.new_child();
2018        assert_eq!(
2019            executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
2020            Poll::Pending
2021        );
2022        assert_eq!(
2023            executor.run_until_stalled(&mut child_after_join.compute(async { 1 })),
2024            Poll::Pending
2025        );
2026        assert_eq!(
2027            executor.run_until_stalled(&mut grandchild_after_join.compute(async { 1 })),
2028            Poll::Pending
2029        );
2030    }
2031
2032    #[test]
2033    fn closed_scope_child_cannot_spawn() {
2034        let mut executor = TestExecutor::new();
2035        let scope = executor.global_scope().new_child();
2036        let handle = scope.to_handle();
2037        let child_before_close = scope.new_child();
2038        assert_eq!(
2039            executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2040            Poll::Ready(1)
2041        );
2042        let mut scope_close = pin!(scope.close());
2043        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
2044        let child_after_close = handle.new_child();
2045        let grandchild_after_close = child_before_close.new_child();
2046        assert_eq!(
2047            executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2048            Poll::Pending
2049        );
2050        assert_eq!(
2051            executor.run_until_stalled(&mut child_after_close.compute(async { 1 })),
2052            Poll::Pending
2053        );
2054        assert_eq!(
2055            executor.run_until_stalled(&mut grandchild_after_close.compute(async { 1 })),
2056            Poll::Pending
2057        );
2058    }
2059
2060    #[test]
2061    fn can_join_child_first() {
2062        let mut executor = TestExecutor::new();
2063        let scope = executor.global_scope().new_child();
2064        let child = scope.new_child();
2065        assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2066        assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2067        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2068    }
2069
2070    #[test]
2071    fn can_join_parent_first() {
2072        let mut executor = TestExecutor::new();
2073        let scope = executor.global_scope().new_child();
2074        let child = scope.new_child();
2075        assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2076        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2077        assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2078    }
2079
2080    #[test]
2081    fn task_in_parent_scope_can_join_child() {
2082        let mut executor = TestExecutor::new();
2083        let scope = executor.global_scope().new_child();
2084        let child = scope.new_child();
2085        let remote = RemoteControlFuture::new();
2086        child.spawn(remote.as_future());
2087        scope.spawn(async move { child.join().await });
2088        let mut join = pin!(scope.join());
2089        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2090        remote.resolve();
2091        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2092    }
2093
2094    #[test]
2095    fn join_completes_while_completed_task_handle_is_held() {
2096        let mut executor = TestExecutor::new();
2097        let scope = executor.global_scope().new_child();
2098        let mut task = scope.compute(async { 1 });
2099        scope.spawn(async {});
2100        let mut join = pin!(scope.join());
2101        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2102        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
2103    }
2104
2105    #[test]
2106    fn cancel_completes_while_task_holds_handle() {
2107        let mut executor = TestExecutor::new();
2108        let scope = executor.global_scope().new_child();
2109        let handle = scope.to_handle();
2110        let mut task = scope.compute(async move {
2111            loop {
2112                pending::<()>().await; // never returns
2113                handle.spawn(async {});
2114            }
2115        });
2116
2117        // Join should not complete because the task never does.
2118        let mut join = pin!(scope.join());
2119        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2120
2121        let mut cancel = pin!(join.cancel());
2122        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
2123        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
2124    }
2125
2126    #[test]
2127    fn cancel_from_handle_inside_task() {
2128        let mut executor = TestExecutor::new();
2129        let scope = executor.global_scope().new_child();
2130        {
2131            // Spawn a task that never finishes until the scope is cancelled.
2132            scope.spawn(pending::<()>());
2133
2134            let mut no_tasks = pin!(scope.on_no_tasks());
2135            assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Pending);
2136
2137            let handle = scope.to_handle();
2138            scope.spawn(async move {
2139                handle.cancel().await;
2140                panic!("cancel() should never complete");
2141            });
2142
2143            assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Ready(()));
2144        }
2145        assert_eq!(scope.join().now_or_never(), Some(()));
2146    }
2147
2148    #[test]
2149    fn can_spawn_from_non_executor_thread() {
2150        let mut executor = TestExecutor::new();
2151        let scope = executor.global_scope().clone();
2152        let done = Arc::new(AtomicBool::new(false));
2153        let done_clone = done.clone();
2154        let _ = std::thread::spawn(move || {
2155            scope.spawn(async move {
2156                done_clone.store(true, Ordering::Relaxed);
2157            })
2158        })
2159        .join();
2160        let _ = executor.run_until_stalled(&mut pending::<()>());
2161        assert!(done.load(Ordering::Relaxed));
2162    }
2163
2164    #[test]
2165    fn scope_tree() {
2166        // A
2167        //  \
2168        //   B
2169        //  / \
2170        // C   D
2171        let mut executor = TestExecutor::new();
2172        let a = executor.global_scope().new_child();
2173        let b = a.new_child();
2174        let c = b.new_child();
2175        let d = b.new_child();
2176        let a_remote = RemoteControlFuture::new();
2177        let c_remote = RemoteControlFuture::new();
2178        let d_remote = RemoteControlFuture::new();
2179        a.spawn(a_remote.as_future());
2180        c.spawn(c_remote.as_future());
2181        d.spawn(d_remote.as_future());
2182        let mut a_join = pin!(a.join());
2183        let mut b_join = pin!(b.join());
2184        let mut d_join = pin!(d.join());
2185        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2186        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2187        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
2188        d_remote.resolve();
2189        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2190        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2191        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
2192        c_remote.resolve();
2193        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2194        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
2195        a_remote.resolve();
2196        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
2197        let mut c_join = pin!(c.join());
2198        assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
2199    }
2200
2201    #[test]
2202    fn wake_all_with_active_guard_on_send_executor() {
2203        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2204        let scope = executor.root_scope().new_child();
2205
2206        let (tx, mut rx) = mpsc::unbounded();
2207        // Bottom 32 bits are the poll count. Top 32 bits are when to signal.
2208        let state = Arc::new(AtomicU64::new(0));
2209
2210        struct PollCounter(Arc<AtomicU64>, mpsc::UnboundedSender<()>);
2211
2212        impl Future for PollCounter {
2213            type Output = ();
2214            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
2215                let old = self.0.fetch_add(1, Ordering::Relaxed);
2216                if old >> 32 == (old + 1) & u32::MAX as u64 {
2217                    let _ = self.1.unbounded_send(());
2218                }
2219                Poll::Pending
2220            }
2221        }
2222
2223        scope.spawn(PollCounter(state.clone(), tx.clone()));
2224        scope.spawn(PollCounter(state.clone(), tx.clone()));
2225
2226        executor.run(async move {
2227            let mut wait_for_poll_count = async |count| {
2228                let old = state.fetch_or(count << 32, Ordering::Relaxed);
2229                if old & u32::MAX as u64 != count {
2230                    rx.next().await.unwrap();
2231                }
2232                state.fetch_and(u32::MAX as u64, Ordering::Relaxed);
2233            };
2234
2235            // We must assume the executor will only poll the two tasks once each.
2236            wait_for_poll_count(2).await;
2237
2238            let mut start_count = 2;
2239            for _ in 0..2 {
2240                scope.wake_all_with_active_guard();
2241
2242                wait_for_poll_count(start_count + 2).await;
2243                start_count += 2;
2244            }
2245
2246            // Wake, then cancel the scope and verify the tasks still get polled.
2247            scope.wake_all_with_active_guard();
2248            let done = scope.cancel();
2249
2250            wait_for_poll_count(start_count + 2).await;
2251
2252            done.await;
2253        });
2254    }
2255
2256    #[test]
2257    fn on_no_tasks_race() {
2258        fn sleep_random() {
2259            std::thread::sleep(std::time::Duration::from_micros(rand::random_range(0..10)));
2260        }
2261        for _ in 0..2000 {
2262            let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2263            let scope = executor.root_scope().new_child();
2264            scope.spawn(async {
2265                sleep_random();
2266            });
2267            executor.run(async move {
2268                sleep_random();
2269                scope.on_no_tasks().await;
2270            });
2271        }
2272    }
2273
2274    #[test]
2275    fn test_detach() {
2276        let mut e = LocalExecutor::default();
2277        e.run_singlethreaded(async {
2278            let counter = Arc::new(AtomicU32::new(0));
2279
2280            {
2281                let counter = counter.clone();
2282                Task::spawn(async move {
2283                    for _ in 0..5 {
2284                        yield_now().await;
2285                        counter.fetch_add(1, Ordering::Relaxed);
2286                    }
2287                })
2288                .detach();
2289            }
2290
2291            while counter.load(Ordering::Relaxed) != 5 {
2292                yield_now().await;
2293            }
2294        });
2295
2296        assert!(e.ehandle.root_scope.lock().results.is_empty());
2297    }
2298
2299    #[test]
2300    fn test_cancel() {
2301        let mut e = LocalExecutor::default();
2302        e.run_singlethreaded(async {
2303            let ref_count = Arc::new(());
2304            // First, just drop the task.
2305            {
2306                let ref_count = ref_count.clone();
2307                drop(Task::spawn(async move {
2308                    let _ref_count = ref_count;
2309                    let _: () = std::future::pending().await;
2310                }));
2311            }
2312
2313            while Arc::strong_count(&ref_count) != 1 {
2314                yield_now().await;
2315            }
2316
2317            // Now try explicitly cancelling.
2318            let task = {
2319                let ref_count = ref_count.clone();
2320                Task::spawn(async move {
2321                    let _ref_count = ref_count;
2322                    let _: () = std::future::pending().await;
2323                })
2324            };
2325
2326            assert_eq!(task.abort().await, None);
2327            while Arc::strong_count(&ref_count) != 1 {
2328                yield_now().await;
2329            }
2330
2331            // Now cancel a task that has already finished.
2332            let task = {
2333                let ref_count = ref_count.clone();
2334                Task::spawn(async move {
2335                    let _ref_count = ref_count;
2336                })
2337            };
2338
2339            // Wait for it to finish.
2340            while Arc::strong_count(&ref_count) != 1 {
2341                yield_now().await;
2342            }
2343
2344            assert_eq!(task.abort().await, Some(()));
2345        });
2346
2347        assert!(e.ehandle.root_scope.lock().results.is_empty());
2348    }
2349
2350    #[test]
2351    fn test_cancel_waits() {
2352        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2353        let state = Arc::new((Mutex::new(0), Condvar::new()));
2354        let task = {
2355            let state = state.clone();
2356            executor.root_scope().compute(async move {
2357                *state.0.lock() = 1;
2358                state.1.notify_all();
2359                // Wait till the other task has noticed we changed state to 1.
2360                state.1.wait_while(&mut state.0.lock(), |state| *state == 1);
2361                std::thread::sleep(std::time::Duration::from_millis(10));
2362                *state.0.lock() = 3;
2363                "foo"
2364            })
2365        };
2366        executor.run(async move {
2367            state.1.wait_while(&mut state.0.lock(), |state| {
2368                if *state == 1 {
2369                    // Tell the other task we've noticed state 1.
2370                    *state = 2;
2371                    false
2372                } else {
2373                    true
2374                }
2375            });
2376            state.1.notify_all();
2377            assert_eq!(task.abort().await, Some("foo"));
2378            // The other task should have finished and set state to 3.
2379            assert_eq!(*state.0.lock(), 3);
2380        });
2381    }
2382
2383    fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2384        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2385        let running = Arc::new((Mutex::new(false), Condvar::new()));
2386        let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2387        let task = {
2388            let running = running.clone();
2389            let can_quit = can_quit.clone();
2390            executor.root_scope().compute(async move {
2391                *running.0.lock() = true;
2392                running.1.notify_all();
2393                {
2394                    let mut guard = can_quit.0.lock();
2395                    while !*guard {
2396                        can_quit.1.wait(&mut guard);
2397                    }
2398                }
2399                *running.0.lock() = false;
2400            })
2401        };
2402        executor.run(async move {
2403            {
2404                let mut guard = running.0.lock();
2405                while !*guard {
2406                    running.1.wait(&mut guard);
2407                }
2408            }
2409
2410            callback(task);
2411
2412            *can_quit.0.lock() = true;
2413            can_quit.1.notify_all();
2414
2415            let ehandle = EHandle::local();
2416            let scope = ehandle.global_scope();
2417
2418            // The only way of testing for this is to poll.
2419            while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2420                Timer::new(std::time::Duration::from_millis(1)).await;
2421            }
2422
2423            assert!(!*running.0.lock());
2424        });
2425    }
2426
2427    #[test]
2428    fn test_dropped_cancel_cleans_up() {
2429        test_clean_up(|task| {
2430            let abort_fut = std::pin::pin!(task.abort());
2431            let waker = futures::task::noop_waker();
2432            assert!(abort_fut.poll(&mut Context::from_waker(&waker)).is_pending());
2433        });
2434    }
2435
2436    #[test]
2437    fn test_dropped_task_cleans_up() {
2438        test_clean_up(|task| {
2439            std::mem::drop(task);
2440        });
2441    }
2442
2443    #[test]
2444    fn test_detach_cleans_up() {
2445        test_clean_up(|task| {
2446            task.detach();
2447        });
2448    }
2449
2450    #[test]
2451    fn test_scope_stream() {
2452        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2453        executor.run(async move {
2454            let (stream, handle) = ScopeStream::new();
2455            handle.push(async { 1 });
2456            handle.push(async { 2 });
2457            stream.close();
2458            let results: HashSet<_> = stream.collect().await;
2459            assert_eq!(results, HashSet::from_iter([1, 2]));
2460        });
2461    }
2462
2463    #[test]
2464    fn test_scope_stream_wakes_properly() {
2465        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2466        executor.run(async move {
2467            let (stream, handle) = ScopeStream::new();
2468            handle.push(async {
2469                Timer::new(Duration::from_millis(10)).await;
2470                1
2471            });
2472            handle.push(async {
2473                Timer::new(Duration::from_millis(10)).await;
2474                2
2475            });
2476            stream.close();
2477            let results: HashSet<_> = stream.collect().await;
2478            assert_eq!(results, HashSet::from_iter([1, 2]));
2479        });
2480    }
2481
2482    #[test]
2483    fn test_scope_stream_drops_spawned_tasks() {
2484        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2485        executor.run(async move {
2486            let (stream, handle) = ScopeStream::new();
2487            handle.push(async { 1 });
2488            let _task = stream.compute(async { "foo" });
2489            stream.close();
2490            let results: HashSet<_> = stream.collect().await;
2491            assert_eq!(results, HashSet::from_iter([1]));
2492        });
2493    }
2494
2495    #[test]
2496    fn test_nested_scope_stream() {
2497        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2498        executor.run(async move {
2499            let (mut stream, handle) = ScopeStream::new();
2500            handle.clone().push(async move {
2501                handle.clone().push(async move {
2502                    handle.clone().push(async move { 3 });
2503                    2
2504                });
2505                1
2506            });
2507            let mut results = HashSet::default();
2508            while let Some(item) = stream.next().await {
2509                results.insert(item);
2510                if results.len() == 3 {
2511                    stream.close();
2512                }
2513            }
2514            assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2515        });
2516    }
2517
2518    #[test]
2519    fn test_dropping_scope_stream_cancels_all_tasks() {
2520        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2521        executor.run(async move {
2522            let (stream, handle) = ScopeStream::new();
2523            let (tx1, mut rx) = mpsc::unbounded::<()>();
2524            let tx2 = tx1.clone();
2525            handle.push(async move {
2526                let _tx1 = tx1;
2527                let () = pending().await;
2528            });
2529            handle.push(async move {
2530                let _tx2 = tx2;
2531                let () = pending().await;
2532            });
2533            drop(stream);
2534
2535            // This will wait forever if the tasks aren't cancelled.
2536            assert_eq!(rx.next().await, None);
2537        });
2538    }
2539
2540    #[test]
2541    fn test_scope_stream_collect() {
2542        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2543        executor.run(async move {
2544            let stream: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
2545            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2546
2547            let stream: ScopeStream<_> =
2548                (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
2549            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2550        });
2551    }
2552
2553    struct DropSignal(Arc<AtomicBool>);
2554
2555    impl Drop for DropSignal {
2556        fn drop(&mut self) {
2557            self.0.store(true, Ordering::SeqCst);
2558        }
2559    }
2560
2561    struct DropChecker(Arc<AtomicBool>);
2562
2563    impl DropChecker {
2564        fn new() -> (Self, DropSignal) {
2565            let inner = Arc::new(AtomicBool::new(false));
2566            (Self(inner.clone()), DropSignal(inner))
2567        }
2568
2569        fn is_dropped(&self) -> bool {
2570            self.0.load(Ordering::SeqCst)
2571        }
2572    }
2573
2574    #[test]
2575    fn child_finished_when_parent_pending() {
2576        let mut executor = LocalExecutor::default();
2577        executor.run_singlethreaded(async {
2578            let scope = Scope::new();
2579            let _guard = scope.active_guard().expect("acquire guard");
2580            let cancel = scope.to_handle().cancel();
2581            let child = scope.new_child();
2582            let (checker, signal) = DropChecker::new();
2583            child.spawn(async move {
2584                let _signal = signal;
2585                futures::future::pending::<()>().await
2586            });
2587            assert!(checker.is_dropped());
2588            assert!(child.active_guard().is_none());
2589            cancel.await;
2590        })
2591    }
2592
2593    #[test]
2594    fn guarded_scopes_observe_closed() {
2595        let mut executor = LocalExecutor::default();
2596        executor.run_singlethreaded(async {
2597            let scope = Scope::new();
2598            let handle = scope.to_handle();
2599            let _guard = scope.active_guard().expect("acquire guard");
2600            handle.close();
2601            let (checker, signal) = DropChecker::new();
2602            handle.spawn(async move {
2603                let _signal = signal;
2604                futures::future::pending::<()>().await
2605            });
2606            assert!(checker.is_dropped());
2607            let (checker, signal) = DropChecker::new();
2608            let cancel = handle.clone().cancel();
2609            handle.spawn(async move {
2610                let _signal = signal;
2611                futures::future::pending::<()>().await
2612            });
2613            assert!(checker.is_dropped());
2614            scope.join().await;
2615            cancel.await;
2616        })
2617    }
2618
2619    #[test]
2620    fn child_guard_holds_parent_cancellation() {
2621        let mut executor = TestExecutor::new();
2622        let scope = executor.global_scope().new_child();
2623        let child = scope.new_child();
2624        let guard = child.active_guard().expect("acquire guard");
2625        scope.spawn(futures::future::pending());
2626        let mut join = pin!(scope.cancel());
2627        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2628        drop(guard);
2629        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2630    }
2631
2632    #[test]
2633    fn active_guard_on_cancel() {
2634        let mut executor = TestExecutor::new();
2635        let scope = executor.global_scope().new_child();
2636        let child1 = scope.new_child();
2637        let child2 = scope.new_child();
2638        let guard = child1.active_guard().expect("acquire guard");
2639        let guard_for_right_scope = guard.clone();
2640        let guard_for_wrong_scope = guard.clone();
2641        child1.spawn(async move { guard_for_right_scope.on_cancel().await });
2642        child2.spawn(async move {
2643            guard_for_wrong_scope.on_cancel().await;
2644        });
2645
2646        let handle = scope.to_handle();
2647        let mut join = pin!(scope.join());
2648        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2649        let cancel: Join<_> = handle.cancel();
2650        drop(cancel);
2651        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2652    }
2653
2654    #[test]
2655    fn abort_join() {
2656        let mut executor = TestExecutor::new();
2657        let scope = executor.global_scope().new_child();
2658        let child = scope.new_child();
2659        let _guard = child.active_guard().expect("acquire guard");
2660
2661        let (checker1, signal) = DropChecker::new();
2662        scope.spawn(async move {
2663            let _signal = signal;
2664            futures::future::pending::<()>().await
2665        });
2666        let (checker2, signal) = DropChecker::new();
2667        scope.spawn(async move {
2668            let _signal = signal;
2669            futures::future::pending::<()>().await
2670        });
2671
2672        let mut join = pin!(scope.cancel());
2673        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2674        assert!(!checker1.is_dropped());
2675        assert!(!checker2.is_dropped());
2676
2677        let mut join = join.abort();
2678        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2679        assert!(checker1.is_dropped());
2680        assert!(checker2.is_dropped());
2681    }
2682
2683    #[test]
2684    fn child_without_guard_aborts_immediately_on_cancel() {
2685        let mut executor = TestExecutor::new();
2686        let scope = executor.global_scope().new_child();
2687        let child = scope.new_child();
2688        let guard = scope.active_guard().expect("acquire guard");
2689
2690        let (checker_scope, signal) = DropChecker::new();
2691        scope.spawn(async move {
2692            let _signal = signal;
2693            futures::future::pending::<()>().await
2694        });
2695        let (checker_child, signal) = DropChecker::new();
2696        child.spawn(async move {
2697            let _signal = signal;
2698            futures::future::pending::<()>().await
2699        });
2700
2701        let mut join = pin!(scope.cancel());
2702        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2703        assert!(!checker_scope.is_dropped());
2704        assert!(checker_child.is_dropped());
2705
2706        drop(guard);
2707        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2708        assert!(checker_child.is_dropped());
2709    }
2710}