fuchsia_async/runtime/fuchsia/executor/
scope.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::task::JoinHandle;
6use super::atomic_future::{AtomicFutureHandle, CancelAndDetachResult};
7use super::common::{Executor, TaskHandle};
8use crate::condition::{Condition, ConditionGuard, WakerEntry};
9use crate::EHandle;
10use fuchsia_sync::Mutex;
11use futures::Stream;
12use pin_project_lite::pin_project;
13use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
14use state::{JoinResult, ScopeState, ScopeWaker, Status};
15use std::any::Any;
16use std::borrow::Borrow;
17use std::collections::hash_map::Entry;
18use std::collections::hash_set;
19use std::future::{Future, IntoFuture};
20use std::marker::PhantomData;
21use std::mem::{self, ManuallyDrop};
22use std::ops::{Deref, DerefMut};
23use std::pin::Pin;
24use std::sync::{Arc, Weak};
25use std::task::{ready, Context, Poll, Waker};
26use std::{fmt, hash};
27
28//
29// # Public API
30//
31
32/// A scope for managing async tasks. This scope is cancelled when dropped.
33///
34/// Scopes are how fuchsia-async implements [structured concurrency][sc]. Every
35/// task is spawned on a scope, and runs until either the task completes or the
36/// scope is cancelled. In addition to owning tasks, scopes may own child
37/// scopes, forming a nested structure.
38///
39/// Scopes are usually joined or cancelled when the owning code is done with
40/// them. This makes it easier to reason about when a background task might
41/// still be running. Note that in multithreaded contexts it is safer to cancel
42/// and await a scope explicitly than to drop it, because the destructor is not
43/// synchronized with other threads that might be running a task.
44///
45/// [`Task::spawn`][crate::Task::spawn] and related APIs spawn on the root scope
46/// of the executor. New code is encouraged to spawn directly on scopes instead,
47/// passing their handles as a way of documenting when a function might spawn
48/// tasks that run in the background and reasoning about their side effects.
49///
50/// ## Scope lifecycle
51///
52/// When a scope is created it is open, meaning it accepts new tasks. Scopes are
53/// closed when one of the following happens:
54///
55/// 1. When [`close()`][Scope::close] is called.
56/// 2. When the scope is cancelled or dropped, the scope is closed immediately.
57/// 3. When the scope is joined and all tasks complete, the scope is closed
58///    before the join future resolves.
59///
60/// When a scope is closed it no longer accepts tasks. Tasks spawned on the
61/// scope are dropped immediately, and their [`Task`][crate::Task] or
62/// [`JoinHandle`][crate::JoinHandle] futures never resolve. This applies
63/// transitively to all child scopes. Closed scopes cannot currently be
64/// reopened.
65///
66/// Scopes can also be detached, in which case they are never closed, and run
67/// until the completion of all tasks.
68///
69/// [sc]: https://en.wikipedia.org/wiki/Structured_concurrency
70#[must_use = "Scopes should be explicitly awaited or cancelled"]
71#[derive(Debug)]
72pub struct Scope {
73    // LINT.IfChange
74    inner: ScopeHandle,
75    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
76}
77
78impl Scope {
79    /// Create a new scope.
80    ///
81    /// The returned scope is a child of the current scope.
82    ///
83    /// # Panics
84    ///
85    /// May panic if not called in the context of an executor (e.g. within a
86    /// call to [`run`][crate::SendExecutor::run]).
87    pub fn new() -> Scope {
88        ScopeHandle::with_current(|handle| handle.new_child())
89    }
90
91    /// Create a new scope with a name.
92    ///
93    /// The returned scope is a child of the current scope.
94    ///
95    /// # Panics
96    ///
97    /// May panic if not called in the context of an executor (e.g. within a
98    /// call to [`run`][crate::SendExecutor::run]).
99    pub fn new_with_name(name: impl Into<String>) -> Scope {
100        ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
101    }
102
103    /// Get the scope of the current task, or the global scope if there is no task
104    /// being polled.
105    ///
106    /// # Panics
107    ///
108    /// May panic if not called in the context of an executor (e.g. within a
109    /// call to [`run`][crate::SendExecutor::run]).
110    pub fn current() -> ScopeHandle {
111        ScopeHandle::with_current(|handle| handle.clone())
112    }
113
114    /// Get the global scope of the executor.
115    ///
116    /// This can be used to spawn tasks that live as long as the executor.
117    /// Usually, this means until the end of the program or test. This should
118    /// only be done for tasks where this is expected. If in doubt, spawn on a
119    /// shorter lived scope instead.
120    ///
121    /// In code that uses scopes, you are strongly encouraged to use this API
122    /// instead of the spawn APIs on [`Task`][crate::Task].
123    ///
124    /// All scopes are descendants of the global scope.
125    ///
126    /// # Panics
127    ///
128    /// May panic if not called in the context of an executor (e.g. within a
129    /// call to [`run`][crate::SendExecutor::run]).
130    pub fn global() -> ScopeHandle {
131        EHandle::local().global_scope().clone()
132    }
133
134    /// Create a child scope.
135    pub fn new_child(&self) -> Scope {
136        self.inner.new_child()
137    }
138
139    /// Create a child scope with a name.
140    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
141        self.inner.new_child_with_name(name.into())
142    }
143
144    /// Returns the name of the scope.
145    pub fn name(&self) -> &str {
146        &self.inner.inner.name
147    }
148
149    /// Create a [`ScopeHandle`] that may be used to spawn tasks on this scope.
150    ///
151    /// This is a shorthand for `scope.as_handle().clone()`.
152    ///
153    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
154    /// available. Note that you should _not_ call `scope.clone()`, even though
155    /// the compiler allows it due to the Deref impl. Call this method instead.
156    pub fn to_handle(&self) -> ScopeHandle {
157        self.inner.clone()
158    }
159
160    /// Get a reference to a [`ScopeHandle`] that may be used to spawn tasks on
161    /// this scope.
162    ///
163    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
164    /// available. If you have a `Scope` but need a `&ScopeHandle`, prefer
165    /// calling this method over the less readable `&*scope`.
166    pub fn as_handle(&self) -> &ScopeHandle {
167        &self.inner
168    }
169
170    /// Wait for all tasks in the scope and its children to complete.
171    ///
172    /// New tasks will be accepted on the scope until every task completes and
173    /// this future resolves.
174    ///
175    /// Note that you can await a scope directly because it implements
176    /// `IntoFuture`. `scope.join().await` is a more explicit form of
177    /// `scope.await`.
178    pub fn join(self) -> Join {
179        Join::new(self)
180    }
181
182    /// Stop accepting new tasks on the scope. Returns a future that waits for
183    /// every task on the scope to complete.
184    pub fn close(self) -> Join {
185        self.inner.close();
186        Join::new(self)
187    }
188
189    /// Cancel all tasks in the scope and its children recursively.
190    ///
191    /// Once the returned future resolves, no task on the scope will be polled
192    /// again.
193    ///
194    /// When a scope is cancelled it immediately stops accepting tasks. Handles
195    /// of tasks spawned on the scope will pend forever.
196    ///
197    /// Dropping the `Scope` object is equivalent to calling this method and
198    /// discarding the returned future. Awaiting the future is preferred because
199    /// it eliminates the possibility of a task poll completing on another
200    /// thread after the scope object has been dropped, which can sometimes
201    /// result in surprising behavior.
202    pub fn cancel(self) -> impl Future<Output = ()> {
203        self.inner.cancel_all_tasks();
204        Join::new(self)
205    }
206
207    /// Detach the scope, allowing its tasks to continue running in the
208    /// background.
209    ///
210    /// Tasks of a detached scope are still subject to join and cancel
211    /// operations on parent scopes.
212    pub fn detach(self) {
213        // Use ManuallyDrop to destructure self, because Rust doesn't allow this
214        // for types which implement Drop.
215        let this = ManuallyDrop::new(self);
216        // SAFETY: this.inner is obviously valid, and we don't access `this`
217        // after moving.
218        mem::drop(unsafe { std::ptr::read(&this.inner) });
219    }
220}
221
222/// Cancel the scope and all of its tasks. Prefer using the [`Scope::cancel`]
223/// or [`Scope::join`] methods.
224impl Drop for Scope {
225    fn drop(&mut self) {
226        // Cancel all tasks in the scope. Each task has a strong reference to the ScopeState,
227        // which will be dropped after all the tasks in the scope are dropped.
228
229        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
230        // here, but we cannot do that without either:
231        // - Sync drop support in AtomicFuture, or
232        // - The ability to reparent tasks, which requires atomic_arc or
233        //   acquiring a mutex during polling.
234        self.inner.cancel_all_tasks();
235    }
236}
237
238impl IntoFuture for Scope {
239    type Output = ();
240    type IntoFuture = Join;
241    fn into_future(self) -> Self::IntoFuture {
242        self.join()
243    }
244}
245
246impl Deref for Scope {
247    type Target = ScopeHandle;
248    fn deref(&self) -> &Self::Target {
249        &self.inner
250    }
251}
252
253impl Borrow<ScopeHandle> for Scope {
254    fn borrow(&self) -> &ScopeHandle {
255        &*self
256    }
257}
258
259pin_project! {
260    /// Join operation for a [`Scope`].
261    ///
262    /// This is a future that resolves when all tasks on the scope are complete
263    /// or have been cancelled. New tasks will be accepted on the scope until
264    /// every task completes and this future resolves.
265    ///
266    /// When this object is dropped, the scope and all tasks in it are
267    /// cancelled.
268    //
269    // Note: The drop property is only true when S = Scope; it does not apply to
270    // other (non-public) uses of this struct where S = ScopeHandle.
271    pub struct Join<S = Scope> {
272        scope: S,
273        #[pin]
274        waker_entry: WakerEntry<ScopeState>,
275    }
276}
277
278impl<S> Join<S> {
279    fn new(scope: S) -> Self {
280        Self { scope, waker_entry: WakerEntry::new() }
281    }
282}
283
284impl Join {
285    /// Cancel the scope. The future will resolve when all tasks have finished
286    /// polling.
287    ///
288    /// See [`Scope::cancel`] for more details.
289    pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
290        self.scope.inner.cancel_all_tasks();
291        self
292    }
293}
294
295impl<S> Future for Join<S>
296where
297    S: Borrow<ScopeHandle>,
298{
299    type Output = ();
300    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
301        let this = self.project();
302        let mut state = Borrow::borrow(&*this.scope).lock();
303        if state.has_tasks() {
304            state.add_waker(this.waker_entry, cx.waker().clone());
305            Poll::Pending
306        } else {
307            state.mark_finished();
308            Poll::Ready(())
309        }
310    }
311}
312
313/// Trait for things that can be spawned on to a scope.  There is a blanket implementation
314/// below for futures.
315pub trait Spawnable {
316    /// The type of value produced on completion.
317    type Output;
318
319    /// Converts to a task that can be spawned directly.
320    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
321}
322
323impl<F: Future + Send + 'static> Spawnable for F
324where
325    F::Output: Send + 'static,
326{
327    type Output = F::Output;
328
329    fn into_task(self, scope: ScopeHandle) -> TaskHandle {
330        scope.new_task(None, self)
331    }
332}
333
334/// A handle to a scope, which may be used to spawn tasks.
335///
336/// ## Ownership and cycles
337///
338/// Tasks running on a `Scope` may hold a `ScopeHandle` to that scope. This does
339/// not create an ownership cycle because the task will drop the `ScopeHandle`
340/// once it completes or is cancelled.
341///
342/// Naturally, scopes containing tasks that never complete and that are never
343/// cancelled will never be freed. Holding a `ScopeHandle` does not contribute to
344/// this problem.
345#[derive(Clone)]
346pub struct ScopeHandle {
347    // LINT.IfChange
348    inner: Arc<ScopeInner>,
349    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
350}
351
352impl ScopeHandle {
353    /// Create a child scope.
354    pub fn new_child(&self) -> Scope {
355        self.new_child_inner(String::new())
356    }
357
358    /// Create a child scope.
359    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
360        self.new_child_inner(name.into())
361    }
362
363    fn new_child_inner(&self, name: String) -> Scope {
364        let mut state = self.lock();
365        let child = ScopeHandle {
366            inner: Arc::new(ScopeInner {
367                executor: self.inner.executor.clone(),
368                state: Condition::new(ScopeState::new(
369                    Some(self.clone()),
370                    state.status(),
371                    JoinResults::default().into(),
372                )),
373                name,
374            }),
375        };
376        let weak = child.downgrade();
377        state.insert_child(weak);
378        Scope { inner: child }
379    }
380
381    /// Spawn a new task on the scope.
382    // This does not have the must_use attribute because it's common to detach and the lifetime of
383    // the task is bound to the scope: when the scope is dropped, the task will be cancelled.
384    pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
385        let task = future.into_task(self.clone());
386        let task_id = task.id();
387        self.insert_task(task, false);
388        JoinHandle::new(self.clone(), task_id)
389    }
390
391    /// Spawn a new task on the scope of a thread local executor.
392    ///
393    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
394    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
395    pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
396        let task = self.new_local_task(None, future);
397        let id = task.id();
398        self.insert_task(task, false);
399        JoinHandle::new(self.clone(), id)
400    }
401
402    /// Like `spawn`, but for tasks that return a result.
403    ///
404    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
405    /// *cancelled*.
406    pub fn compute<T: Send + 'static>(
407        &self,
408        future: impl Spawnable<Output = T> + Send + 'static,
409    ) -> crate::Task<T> {
410        let task = future.into_task(self.clone());
411        let id = task.id();
412        self.insert_task(task, false);
413        JoinHandle::new(self.clone(), id).into()
414    }
415
416    /// Like `spawn`, but for tasks that return a result.
417    ///
418    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
419    /// *cancelled*.
420    ///
421    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
422    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
423    pub fn compute_local<T: 'static>(
424        &self,
425        future: impl Future<Output = T> + 'static,
426    ) -> crate::Task<T> {
427        let task = self.new_local_task(None, future);
428        let id = task.id();
429        self.insert_task(task, false);
430        JoinHandle::new(self.clone(), id).into()
431    }
432
433    pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
434        ScopeHandle {
435            inner: Arc::new(ScopeInner {
436                executor,
437                state: Condition::new(ScopeState::new(
438                    None,
439                    Status::default(),
440                    JoinResults::default().into(),
441                )),
442                name: "root".to_string(),
443            }),
444        }
445    }
446
447    /// Stop the scope from accepting new tasks.
448    ///
449    /// Note that unlike [`Scope::close`], this does not return a future that
450    /// waits for all tasks to complete. This could lead to resource leaks
451    /// because it is not uncommon to access a TaskGroup from a task running on
452    /// the scope itself. If such a task were to await a future returned by this
453    /// method it would suspend forever waiting for itself to complete.
454    pub fn close(&self) {
455        self.lock().close();
456    }
457
458    /// Cancel all the scope's tasks.
459    ///
460    /// Note that if this is called from within a task running on the scope, the
461    /// task will not resume from the next await point.
462    pub fn cancel(self) -> impl Future<Output = ()> {
463        self.cancel_all_tasks();
464        Join::new(self)
465    }
466
467    // Joining the scope could be allowed from a ScopeHandle, but the use case
468    // seems less common and more bug prone than cancelling. We don't allow this
469    // for the same reason we don't return a future from close().
470
471    /// Wait for there to be no tasks. This is racy: as soon as this returns it is possible for
472    /// another task to have been spawned on this scope.
473    pub async fn on_no_tasks(&self) {
474        self.inner
475            .state
476            .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
477            .await;
478    }
479
480    /// Wake all the scope's tasks so their futures will be polled again.
481    pub fn wake_all(&self) {
482        self.lock().wake_all();
483    }
484
485    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
486    /// That must be done separately.
487    pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
488        &self,
489        id: Option<usize>,
490        fut: Fut,
491    ) -> AtomicFutureHandle<'a>
492    where
493        Fut::Output: Send,
494    {
495        AtomicFutureHandle::new(
496            Some(self.clone()),
497            id.unwrap_or_else(|| self.executor().next_task_id()),
498            fut,
499        )
500    }
501
502    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
503    /// That must be done separately.
504    pub(crate) fn new_local_task<'a>(
505        &self,
506        id: Option<usize>,
507        fut: impl Future + 'a,
508    ) -> AtomicFutureHandle<'a> {
509        // Check that the executor is local.
510        if !self.executor().is_local() {
511            panic!(
512                "Error: called `new_local_task` on multithreaded executor. \
513                 Use `spawn` or a `LocalExecutor` instead."
514            );
515        }
516
517        // SAFETY: We've confirmed that the futures here will never be used across multiple threads,
518        // so the Send requirements that `new_local` requires should be met.
519        unsafe {
520            AtomicFutureHandle::new_local(
521                Some(self.clone()),
522                id.unwrap_or_else(|| self.executor().next_task_id()),
523                fut,
524            )
525        }
526    }
527}
528
529impl fmt::Debug for ScopeHandle {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        f.debug_struct("Scope").field("name", &self.inner.name).finish()
532    }
533}
534
535/// Similar to a scope but all futures spawned on the scope *must* finish with the same result type.
536/// That allows the scope to return a stream of results. Attempting to spawn tasks using
537/// `ScopeHandle::spawn` (or similar) will result in tasks that are immediately dropped (just as if
538/// the scope was closed).  Like a regular scope, the scope can be closed, at which point the stream
539/// will terminate once all the tasks have finished.  This is designed to be a fairly close drop-in
540/// replacement to `FuturesUnordered`, the principle difference being that the tasks run in parallel
541/// rather than just concurrently.  Another difference is that the futures don't need to be the same
542/// type; only the outputs do.  In all other respects, the scope operates like a regular scope i.e.
543/// it can have children, you can join them, cancel them, etc.
544pub struct ScopeStream<R> {
545    inner: ScopeHandle,
546    stream: Arc<Mutex<ResultsStreamInner<R>>>,
547}
548
549impl<R: Send + 'static> ScopeStream<R> {
550    /// Creates a new scope stream.
551    ///
552    /// The returned scope stream is a child of the current scope.
553    ///
554    /// # Panics
555    ///
556    /// May panic if not called in the context of an executor (e.g. within a
557    /// call to [`run`][crate::SendExecutor::run]).
558    pub fn new() -> (Self, ScopeStreamHandle<R>) {
559        Self::new_inner(String::new())
560    }
561
562    /// Creates a new scope stream with a name.
563    ///
564    /// The returned scope stream is a child of the current scope.
565    ///
566    /// # Panics
567    ///
568    /// May panic if not called in the context of an executor (e.g. within a
569    /// call to [`run`][crate::SendExecutor::run]).
570    pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
571        Self::new_inner(name.into())
572    }
573
574    fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
575        let this = ScopeHandle::with_current(|handle| {
576            let mut state = handle.lock();
577            let stream = Arc::default();
578            let child = ScopeHandle {
579                inner: Arc::new(ScopeInner {
580                    executor: handle.executor().clone(),
581                    state: Condition::new(ScopeState::new(
582                        Some(handle.clone()),
583                        state.status(),
584                        Box::new(ResultsStream { inner: Arc::clone(&stream) }),
585                    )),
586                    name,
587                }),
588            };
589            let weak = child.downgrade();
590            state.insert_child(weak);
591            ScopeStream { inner: child, stream }
592        });
593        let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
594        (this, handle)
595    }
596}
597
598impl<R> Drop for ScopeStream<R> {
599    fn drop(&mut self) {
600        // Cancel all tasks in the scope. Each task has a strong reference to the ScopeState,
601        // which will be dropped after all the tasks in the scope are dropped.
602
603        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
604        // here, but we cannot do that without either:
605        // - Sync drop support in AtomicFuture, or
606        // - The ability to reparent tasks, which requires atomic_arc or
607        //   acquiring a mutex during polling.
608        self.inner.cancel_all_tasks();
609    }
610}
611
612impl<R: Send + 'static> Stream for ScopeStream<R> {
613    type Item = R;
614
615    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
616        let mut stream_inner = self.stream.lock();
617        match stream_inner.results.pop() {
618            Some(result) => Poll::Ready(Some(result)),
619            None => {
620                // Lock ordering: when results are posted, the state lock is taken first, so we must
621                // do the same.
622                drop(stream_inner);
623                let state = self.inner.lock();
624                let mut stream_inner = self.stream.lock();
625                match stream_inner.results.pop() {
626                    Some(result) => Poll::Ready(Some(result)),
627                    None => {
628                        if state.has_tasks() {
629                            stream_inner.waker = Some(cx.waker().clone());
630                            Poll::Pending
631                        } else {
632                            Poll::Ready(None)
633                        }
634                    }
635                }
636            }
637        }
638    }
639}
640
641impl<R> Deref for ScopeStream<R> {
642    type Target = ScopeHandle;
643    fn deref(&self) -> &Self::Target {
644        &self.inner
645    }
646}
647
648impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
649    fn borrow(&self) -> &ScopeHandle {
650        &*self
651    }
652}
653
654impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
655    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
656        let (stream, handle) = ScopeStream::new();
657        for fut in iter {
658            handle.push(fut);
659        }
660        stream.close();
661        stream
662    }
663}
664
665#[derive(Clone)]
666pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
667
668impl<R: Send> ScopeStreamHandle<R> {
669    pub fn push(&self, future: impl Spawnable<Output = R>) {
670        self.0.insert_task(future.into_task(self.0.clone()), true);
671    }
672}
673
674//
675// # Internal API
676//
677
678/// A weak reference to a scope.
679#[derive(Clone)]
680struct WeakScopeHandle {
681    inner: Weak<ScopeInner>,
682}
683
684impl WeakScopeHandle {
685    /// Upgrades to a [`ScopeHandle`] if the scope still exists.
686    pub fn upgrade(&self) -> Option<ScopeHandle> {
687        self.inner.upgrade().map(|inner| ScopeHandle { inner })
688    }
689}
690
691impl hash::Hash for WeakScopeHandle {
692    fn hash<H: hash::Hasher>(&self, state: &mut H) {
693        Weak::as_ptr(&self.inner).hash(state);
694    }
695}
696
697impl PartialEq for WeakScopeHandle {
698    fn eq(&self, other: &Self) -> bool {
699        Weak::ptr_eq(&self.inner, &other.inner)
700    }
701}
702
703impl Eq for WeakScopeHandle {
704    // Weak::ptr_eq should return consistent results, even when the inner value
705    // has been dropped.
706}
707
708// This module exists as a privacy boundary so that we can make sure any
709// operation that might cause the scope to finish also wakes its waker.
710mod state {
711    use super::*;
712
713    pub struct ScopeState {
714        pub parent: Option<ScopeHandle>,
715        // LINT.IfChange
716        children: HashSet<WeakScopeHandle>,
717        all_tasks: HashSet<TaskHandle>,
718        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
719        /// The number of children that transitively contain tasks, plus one for
720        /// this scope if it directly contains tasks.
721        subscopes_with_tasks: u32,
722        status: Status,
723        /// Wakers/results for joining each task.
724        pub results: Box<dyn Results>,
725    }
726
727    pub enum JoinResult {
728        Waker(Waker),
729        Result(TaskHandle),
730    }
731
732    #[repr(u8)] // So zxdb can read the status.
733    #[derive(Default, Debug, Clone, Copy)]
734    pub enum Status {
735        #[default]
736        /// The scope is accepting new tasks.
737        Open,
738        /// The scope is no longer accepting new tasks.
739        Closed,
740        /// The scope is not accepting new tasks and all tasks have completed.
741        ///
742        /// This is purely an optimization; it is not guaranteed to be set.
743        Finished,
744    }
745
746    impl Status {
747        pub fn can_spawn(&self) -> bool {
748            match self {
749                Status::Open => true,
750                Status::Closed | Status::Finished => false,
751            }
752        }
753
754        pub fn might_have_running_tasks(&self) -> bool {
755            match self {
756                Status::Open | Status::Closed => true,
757                Status::Finished => false,
758            }
759        }
760    }
761
762    impl ScopeState {
763        pub fn new(
764            parent: Option<ScopeHandle>,
765            status: Status,
766            results: Box<impl Results>,
767        ) -> Self {
768            Self {
769                parent,
770                children: Default::default(),
771                all_tasks: Default::default(),
772                subscopes_with_tasks: 0,
773                status,
774                results,
775            }
776        }
777    }
778
779    impl ScopeState {
780        pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
781            &self.all_tasks
782        }
783
784        /// Attempts to add a task to the scope. Returns the task if the scope cannot accept a task
785        /// (since it isn't safe to drop the task whilst the lock is held).
786        pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
787            if !self.status.can_spawn() || (!for_stream && !self.results.can_spawn()) {
788                return Some(task);
789            }
790            if self.all_tasks.is_empty() && !self.register_first_task() {
791                return Some(task);
792            }
793            task.wake();
794            assert!(self.all_tasks.insert(task));
795            None
796        }
797
798        pub fn children(&self) -> &HashSet<WeakScopeHandle> {
799            &self.children
800        }
801
802        pub fn insert_child(&mut self, child: WeakScopeHandle) {
803            self.children.insert(child);
804        }
805
806        pub fn remove_child(&mut self, child: &PtrKey) {
807            let found = self.children.remove(child);
808            // This should always succeed unless the scope is being dropped
809            // (in which case children will be empty).
810            assert!(found || self.children.is_empty());
811        }
812
813        pub fn status(&self) -> Status {
814            self.status
815        }
816
817        pub fn close(&mut self) {
818            self.status = Status::Closed;
819        }
820
821        pub fn mark_finished(&mut self) {
822            self.status = Status::Finished;
823        }
824
825        pub fn has_tasks(&self) -> bool {
826            self.subscopes_with_tasks > 0
827        }
828
829        pub fn wake_all(&self) {
830            for task in &self.all_tasks {
831                task.wake();
832            }
833        }
834
835        /// Registers our first task with the parent scope.
836        ///
837        /// Returns false if the scope is not allowed to accept a task.
838        #[must_use]
839        fn register_first_task(&mut self) -> bool {
840            if !self.status.can_spawn() {
841                return false;
842            }
843            let can_spawn = match &self.parent {
844                Some(parent) => {
845                    // If our parent already knows we have tasks, we can always
846                    // spawn. Otherwise, we have to recurse.
847                    self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
848                }
849                None => true,
850            };
851            if can_spawn {
852                self.subscopes_with_tasks += 1;
853                debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
854            };
855            can_spawn
856        }
857
858        fn on_last_task_removed(
859            this: &mut ConditionGuard<'_, ScopeState>,
860            num_wakers_hint: usize,
861            wakers: &mut Vec<Waker>,
862        ) {
863            debug_assert!(this.subscopes_with_tasks > 0);
864            this.subscopes_with_tasks -= 1;
865            if this.subscopes_with_tasks > 0 {
866                wakers.reserve(num_wakers_hint);
867                return;
868            }
869
870            match &this.parent {
871                Some(parent) => {
872                    Self::on_last_task_removed(
873                        &mut parent.lock(),
874                        num_wakers_hint + this.waker_count(),
875                        wakers,
876                    );
877                }
878                None => wakers.reserve(num_wakers_hint),
879            };
880            wakers.extend(this.drain_wakers());
881        }
882    }
883
884    #[derive(Default)]
885    struct WakeVec(Vec<Waker>);
886
887    impl Drop for WakeVec {
888        fn drop(&mut self) {
889            for waker in self.0.drain(..) {
890                waker.wake();
891            }
892        }
893    }
894
895    // WakeVec *must* come after the guard because we want the guard to be dropped first.
896    pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
897
898    impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
899        fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
900            Self(value, WakeVec::default())
901        }
902    }
903
904    impl ScopeWaker<'_> {
905        pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
906            let task = self.all_tasks.take(&id);
907            if task.is_some() {
908                self.on_task_removed(0);
909            }
910            task
911        }
912
913        pub fn task_did_finish(&mut self, id: usize) {
914            if let Some(task) = self.all_tasks.take(&id) {
915                self.on_task_removed(1);
916                if !task.is_detached() {
917                    let maybe_waker = self.results.task_did_finish(task);
918                    self.1 .0.extend(maybe_waker);
919                }
920            }
921        }
922
923        pub fn set_closed_and_drain(
924            &mut self,
925        ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
926            self.close();
927            let all_tasks = std::mem::take(&mut self.all_tasks);
928            let results = self.results.take();
929            if !all_tasks.is_empty() {
930                self.on_task_removed(0)
931            }
932            let children = self.children.drain();
933            (all_tasks, results, children)
934        }
935
936        fn on_task_removed(&mut self, num_wakers_hint: usize) {
937            if self.all_tasks.is_empty() {
938                ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1 .0)
939            }
940        }
941    }
942
943    impl<'a> Deref for ScopeWaker<'a> {
944        type Target = ConditionGuard<'a, ScopeState>;
945
946        fn deref(&self) -> &Self::Target {
947            &self.0
948        }
949    }
950
951    impl DerefMut for ScopeWaker<'_> {
952        fn deref_mut(&mut self) -> &mut Self::Target {
953            &mut self.0
954        }
955    }
956}
957
958struct ScopeInner {
959    executor: Arc<Executor>,
960    state: Condition<ScopeState>,
961    name: String,
962}
963
964impl Drop for ScopeInner {
965    fn drop(&mut self) {
966        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
967        // This also complies with the correctness requirements of
968        // HashSet::remove because the implementations of Hash and Eq match
969        // between PtrKey and WeakScopeHandle.
970        let key = unsafe { &*(self as *const _ as *const PtrKey) };
971        if let Some(parent) = &self.state.lock().parent {
972            let mut parent_state = parent.lock();
973            parent_state.remove_child(key);
974        }
975    }
976}
977
978impl ScopeHandle {
979    fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
980        super::common::TaskHandle::with_current(|task| match task {
981            Some(task) => f(task.scope()),
982            None => f(EHandle::local().global_scope()),
983        })
984    }
985
986    fn lock(&self) -> ConditionGuard<'_, ScopeState> {
987        self.inner.state.lock()
988    }
989
990    fn downgrade(&self) -> WeakScopeHandle {
991        WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
992    }
993
994    #[inline(always)]
995    pub(crate) fn executor(&self) -> &Arc<Executor> {
996        &self.inner.executor
997    }
998
999    /// Marks the task as detached.
1000    pub(crate) fn detach(&self, task_id: usize) {
1001        let _maybe_task = {
1002            let mut state = self.lock();
1003            if let Some(task) = state.all_tasks().get(&task_id) {
1004                task.detach();
1005            }
1006            state.results.detach(task_id)
1007        };
1008    }
1009
1010    /// Cancels the task.
1011    ///
1012    /// # Safety
1013    ///
1014    /// The caller must guarantee that `R` is the correct type.
1015    pub(crate) unsafe fn cancel_task<R>(&self, task_id: usize) -> Option<R> {
1016        let mut state = self.lock();
1017        if let Some(task) = state.results.detach(task_id) {
1018            drop(state);
1019            return task.take_result();
1020        }
1021        state.all_tasks().get(&task_id).and_then(|task| {
1022            if task.cancel() {
1023                self.inner.executor.ready_tasks.push(task.clone());
1024            }
1025            task.take_result()
1026        })
1027    }
1028
1029    /// Cancels and detaches the task.
1030    pub(crate) fn cancel_and_detach(&self, task_id: usize) {
1031        let _tasks = {
1032            let mut state = ScopeWaker::from(self.lock());
1033            let maybe_task1 = state.results.detach(task_id);
1034            let mut maybe_task2 = None;
1035            if let Some(task) = state.all_tasks().get(&task_id) {
1036                match task.cancel_and_detach() {
1037                    CancelAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
1038                    CancelAndDetachResult::AddToRunQueue => {
1039                        self.inner.executor.ready_tasks.push(task.clone());
1040                    }
1041                    CancelAndDetachResult::Pending => {}
1042                }
1043            }
1044            (maybe_task1, maybe_task2)
1045        };
1046    }
1047
1048    /// Polls for a join result for the given task ID.
1049    ///
1050    /// # Safety
1051    ///
1052    /// The caller must guarantee that `R` is the correct type.
1053    pub(crate) unsafe fn poll_join_result<R>(
1054        &self,
1055        task_id: usize,
1056        cx: &mut Context<'_>,
1057    ) -> Poll<R> {
1058        let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1059        match task.take_result() {
1060            Some(result) => Poll::Ready(result),
1061            None => {
1062                // The task has been cancelled so all we can do is forever return pending.
1063                Poll::Pending
1064            }
1065        }
1066    }
1067
1068    /// Polls for the task to be cancelled.
1069    pub(crate) unsafe fn poll_cancelled<R>(
1070        &self,
1071        task_id: usize,
1072        cx: &mut Context<'_>,
1073    ) -> Poll<Option<R>> {
1074        let task = self.lock().results.poll_join_result(task_id, cx);
1075        task.map(|task| task.take_result())
1076    }
1077
1078    pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
1079        let returned_task = self.lock().insert_task(task, for_stream);
1080        returned_task.is_none()
1081    }
1082
1083    /// Drops the specified task.
1084    ///
1085    /// The main task by the single-threaded executor might not be 'static, so we use this to drop
1086    /// the task and make sure we meet lifetime guarantees.  Note that removing the task from our
1087    /// task list isn't sufficient; we must make sure the future running in the task is dropped.
1088    ///
1089    /// # Safety
1090    ///
1091    /// This is unsafe because of the call to `drop_future_unchecked` which requires that no
1092    /// thread is currently polling the task.
1093    pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
1094        let mut state = ScopeWaker::from(self.lock());
1095        let task = state.take_task(task_id);
1096        if let Some(task) = task {
1097            task.drop_future_unchecked();
1098        }
1099    }
1100
1101    pub(super) fn task_did_finish(&self, id: usize) {
1102        let mut state = ScopeWaker::from(self.lock());
1103        state.task_did_finish(id);
1104    }
1105
1106    /// Cancels tasks in this scope and all child scopes.
1107    fn cancel_all_tasks(&self) {
1108        let mut scopes = vec![self.clone()];
1109        while let Some(scope) = scopes.pop() {
1110            let mut state = scope.lock();
1111            if !state.status().might_have_running_tasks() {
1112                // Already cancelled or closed.
1113                continue;
1114            }
1115            for task in state.all_tasks() {
1116                if task.cancel() {
1117                    task.scope().executor().ready_tasks.push(task.clone());
1118                }
1119                // Don't bother dropping tasks that are finished; the entire
1120                // scope is going to be dropped soon anyway.
1121            }
1122            // Copy children to a vec so we don't hold the lock for too long.
1123            scopes.extend(state.children().iter().filter_map(|child| child.upgrade()));
1124            state.mark_finished();
1125        }
1126    }
1127
1128    /// Drops tasks in this scope and all child scopes.
1129    ///
1130    /// # Panics
1131    ///
1132    /// Panics if any task is being accessed by another thread. Only call this
1133    /// method when the executor is shutting down and there are no other pollers.
1134    pub(super) fn drop_all_tasks(&self) {
1135        let mut scopes = vec![self.clone()];
1136        while let Some(scope) = scopes.pop() {
1137            let (tasks, join_results) = {
1138                let mut state = ScopeWaker::from(scope.lock());
1139                let (tasks, join_results, children) = state.set_closed_and_drain();
1140                scopes.extend(children.filter_map(|child| child.upgrade()));
1141                (tasks, join_results)
1142            };
1143            // Call task destructors once the scope lock is released so we don't risk a deadlock.
1144            for task in tasks {
1145                task.try_drop().expect("Expected drop to succeed");
1146            }
1147            std::mem::drop(join_results);
1148        }
1149    }
1150}
1151
1152/// Optimizes removal from parent scope.
1153#[repr(transparent)]
1154struct PtrKey;
1155
1156impl Borrow<PtrKey> for WeakScopeHandle {
1157    fn borrow(&self) -> &PtrKey {
1158        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
1159        unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1160    }
1161}
1162
1163impl PartialEq for PtrKey {
1164    fn eq(&self, other: &Self) -> bool {
1165        self as *const _ == other as *const _
1166    }
1167}
1168
1169impl Eq for PtrKey {}
1170
1171impl hash::Hash for PtrKey {
1172    fn hash<H: hash::Hasher>(&self, state: &mut H) {
1173        (self as *const PtrKey).hash(state);
1174    }
1175}
1176
1177#[derive(Default)]
1178struct JoinResults(HashMap<usize, JoinResult>);
1179
1180trait Results: Send + Sync + 'static {
1181    /// Returns true if we allow spawning futures with arbitrary outputs on the scope.
1182    fn can_spawn(&self) -> bool;
1183
1184    /// Polls for the specified task having finished.
1185    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;
1186
1187    /// Called when a task finishes.
1188    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;
1189
1190    /// Called to drop any results for a particular task.
1191    fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;
1192
1193    /// Takes *all* the stored results.
1194    fn take(&mut self) -> Box<dyn Any>;
1195
1196    /// Used only for testing.  Returns true if there are any results registered.
1197    #[cfg(test)]
1198    fn is_empty(&self) -> bool;
1199}
1200
1201impl Results for JoinResults {
1202    fn can_spawn(&self) -> bool {
1203        true
1204    }
1205
1206    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1207        match self.0.entry(task_id) {
1208            Entry::Occupied(mut o) => match o.get_mut() {
1209                JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1210                JoinResult::Result(_) => {
1211                    let JoinResult::Result(task) = o.remove() else { unreachable!() };
1212                    return Poll::Ready(task);
1213                }
1214            },
1215            Entry::Vacant(v) => {
1216                v.insert(JoinResult::Waker(cx.waker().clone()));
1217            }
1218        }
1219        Poll::Pending
1220    }
1221
1222    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1223        match self.0.entry(task.id()) {
1224            Entry::Occupied(mut o) => {
1225                let JoinResult::Waker(waker) =
1226                    std::mem::replace(o.get_mut(), JoinResult::Result(task))
1227                else {
1228                    // It can't be JoinResult::Result because this function is the only
1229                    // function that sets that, and `task_did_finish` won't get called
1230                    // twice.
1231                    unreachable!()
1232                };
1233                Some(waker)
1234            }
1235            Entry::Vacant(v) => {
1236                v.insert(JoinResult::Result(task));
1237                None
1238            }
1239        }
1240    }
1241
1242    fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1243        match self.0.remove(&task_id) {
1244            Some(JoinResult::Result(task)) => Some(task),
1245            _ => None,
1246        }
1247    }
1248
1249    fn take(&mut self) -> Box<dyn Any> {
1250        Box::new(Self(std::mem::take(&mut self.0)))
1251    }
1252
1253    #[cfg(test)]
1254    fn is_empty(&self) -> bool {
1255        self.0.is_empty()
1256    }
1257}
1258
1259#[derive(Default)]
1260struct ResultsStream<R> {
1261    inner: Arc<Mutex<ResultsStreamInner<R>>>,
1262}
1263
1264struct ResultsStreamInner<R> {
1265    results: Vec<R>,
1266    waker: Option<Waker>,
1267}
1268
1269impl<R> Default for ResultsStreamInner<R> {
1270    fn default() -> Self {
1271        Self { results: Vec::new(), waker: None }
1272    }
1273}
1274
1275impl<R: Send + 'static> Results for ResultsStream<R> {
1276    fn can_spawn(&self) -> bool {
1277        false
1278    }
1279
1280    fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1281        Poll::Pending
1282    }
1283
1284    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1285        let mut inner = self.inner.lock();
1286        // SAFETY: R is guaranteed to be the same return type as all futures finishing on this
1287        // scope.
1288        inner.results.extend(unsafe { task.take_result() });
1289        inner.waker.take()
1290    }
1291
1292    fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1293        None
1294    }
1295
1296    fn take(&mut self) -> Box<dyn Any> {
1297        Box::new(std::mem::take(&mut self.inner.lock().results))
1298    }
1299
1300    #[cfg(test)]
1301    fn is_empty(&self) -> bool {
1302        false
1303    }
1304}
1305
1306#[cfg(test)]
1307mod tests {
1308    use super::*;
1309    use crate::{EHandle, LocalExecutor, SendExecutor, SpawnableFuture, Task, TestExecutor, Timer};
1310    use assert_matches::assert_matches;
1311    use fuchsia_sync::{Condvar, Mutex};
1312    use futures::channel::mpsc;
1313    use futures::future::join_all;
1314    use futures::{FutureExt, StreamExt};
1315    use std::future::{pending, poll_fn};
1316    use std::pin::{pin, Pin};
1317    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1318    use std::sync::Arc;
1319    use std::task::{Context, Poll};
1320    use std::time::Duration;
1321
1322    #[derive(Default)]
1323    struct RemoteControlFuture(Mutex<RCFState>);
1324    #[derive(Default)]
1325    struct RCFState {
1326        resolved: bool,
1327        waker: Option<Waker>,
1328    }
1329
1330    impl Future for &RemoteControlFuture {
1331        type Output = ();
1332        fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1333            let mut this = self.0.lock();
1334            if this.resolved {
1335                Poll::Ready(())
1336            } else {
1337                this.waker.replace(cx.waker().clone());
1338                Poll::Pending
1339            }
1340        }
1341    }
1342
1343    impl RemoteControlFuture {
1344        fn new() -> Arc<Self> {
1345            Arc::new(Default::default())
1346        }
1347
1348        fn resolve(&self) {
1349            let mut this = self.0.lock();
1350            this.resolved = true;
1351            if let Some(waker) = this.waker.take() {
1352                waker.wake();
1353            }
1354        }
1355
1356        fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> {
1357            let this = Arc::clone(self);
1358            async move { (&*this).await }
1359        }
1360    }
1361
1362    #[test]
1363    fn compute_works_on_root_scope() {
1364        let mut executor = TestExecutor::new();
1365        let scope = executor.global_scope();
1366        let mut task = pin!(scope.compute(async { 1 }));
1367        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1368    }
1369
1370    #[test]
1371    fn compute_works_on_new_child() {
1372        let mut executor = TestExecutor::new();
1373        let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1374        let mut task = pin!(scope.compute(async { 1 }));
1375        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1376    }
1377
1378    #[test]
1379    fn scope_drop_cancels_tasks() {
1380        let mut executor = TestExecutor::new();
1381        let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1382        let mut task = pin!(scope.compute(async { 1 }));
1383        drop(scope);
1384        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1385    }
1386
1387    #[test]
1388    fn tasks_do_not_spawn_on_cancelled_scopes() {
1389        let mut executor = TestExecutor::new();
1390        let scope =
1391            executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1392        let handle = scope.to_handle();
1393        let mut cancel = pin!(scope.cancel());
1394        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1395        let mut task = pin!(handle.compute(async { 1 }));
1396        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1397    }
1398
1399    #[test]
1400    fn tasks_do_not_spawn_on_closed_empty_scopes() {
1401        let mut executor = TestExecutor::new();
1402        let scope =
1403            executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1404        let handle = scope.to_handle();
1405        let mut close = pin!(scope.cancel());
1406        assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1407        let mut task = pin!(handle.compute(async { 1 }));
1408        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1409    }
1410
1411    #[test]
1412    fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
1413        let mut executor = TestExecutor::new();
1414        let scope = executor.global_scope().new_child();
1415        let handle = scope.to_handle();
1416        handle.spawn(pending());
1417        let mut close = pin!(scope.close());
1418        assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1419        let mut task = pin!(handle.compute(async { 1 }));
1420        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1421    }
1422
1423    #[test]
1424    fn spawn_works_on_child_and_grandchild() {
1425        let mut executor = TestExecutor::new();
1426        let scope = executor.global_scope().new_child();
1427        let child = scope.new_child();
1428        let grandchild = child.new_child();
1429        let mut child_task = pin!(child.compute(async { 1 }));
1430        let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1431        assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Ready(1));
1432        assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Ready(1));
1433    }
1434
1435    #[test]
1436    fn spawn_drop_cancels_child_and_grandchild_tasks() {
1437        let mut executor = TestExecutor::new();
1438        let scope = executor.global_scope().new_child();
1439        let child = scope.new_child();
1440        let grandchild = child.new_child();
1441        let mut child_task = pin!(child.compute(async { 1 }));
1442        let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1443        drop(scope);
1444        assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Pending);
1445        assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Pending);
1446    }
1447
1448    #[test]
1449    fn completed_tasks_are_cleaned_up_after_cancel() {
1450        let mut executor = TestExecutor::new();
1451        let scope = executor.global_scope().new_child();
1452
1453        let task1 = scope.spawn(pending::<()>());
1454        let task2 = scope.spawn(async {});
1455        assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1456        assert_eq!(scope.lock().all_tasks().len(), 1);
1457
1458        // Running the executor after cancelling the task isn't currently
1459        // necessary, but we might decide to do async cleanup in the future.
1460        assert_eq!(task1.cancel().now_or_never(), None);
1461        assert_eq!(task2.cancel().now_or_never(), Some(Some(())));
1462
1463        assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1464        assert_eq!(scope.lock().all_tasks().len(), 0);
1465        assert!(scope.lock().results.is_empty());
1466    }
1467
1468    #[test]
1469    fn join_emtpy_scope() {
1470        let mut executor = TestExecutor::new();
1471        let scope = executor.global_scope().new_child();
1472        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1473    }
1474
1475    #[test]
1476    fn task_handle_preserves_access_to_result_after_join_begins() {
1477        let mut executor = TestExecutor::new();
1478        let scope = executor.global_scope().new_child();
1479        let mut task = scope.compute(async { 1 });
1480        scope.spawn(async {});
1481        let task2 = scope.spawn(pending::<()>());
1482        // Fuse to stay agnostic as to whether the join completes before or
1483        // after awaiting the task handle.
1484        let mut join = pin!(scope.join().fuse());
1485        let _ = executor.run_until_stalled(&mut join);
1486        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1487        let _ = task2.cancel();
1488        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1489    }
1490
1491    #[test]
1492    fn join_blocks_until_task_is_cancelled() {
1493        // Scope with one outstanding task handle and one cancelled task.
1494        // The scope is not complete until the outstanding task handle is cancelled.
1495        let mut executor = TestExecutor::new();
1496        let scope = executor.global_scope().new_child();
1497        let outstanding_task = scope.spawn(pending::<()>());
1498        let cancelled_task = scope.spawn(pending::<()>());
1499        assert_eq!(
1500            executor.run_until_stalled(&mut pin!(cancelled_task.cancel())),
1501            Poll::Ready(None)
1502        );
1503        let mut join = pin!(scope.join());
1504        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1505        assert_eq!(
1506            executor.run_until_stalled(&mut pin!(outstanding_task.cancel())),
1507            Poll::Ready(None)
1508        );
1509        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1510    }
1511
1512    #[test]
1513    fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1514        let mut executor = TestExecutor::new();
1515        let scope = executor.global_scope().new_child();
1516        // The default is to detach.
1517        scope.spawn(pending::<()>());
1518        let mut join = pin!(scope.join());
1519        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1520        let mut cancel = pin!(join.cancel());
1521        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1522    }
1523
1524    #[test]
1525    fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1526        let mut executor = TestExecutor::new();
1527        let scope = executor.global_scope().new_child();
1528        // The default is to detach.
1529        scope.spawn(pending::<()>());
1530        let mut close = pin!(scope.close());
1531        assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1532        let mut cancel = pin!(close.cancel());
1533        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1534    }
1535
1536    #[test]
1537    fn join_scope_blocks_until_spawned_task_completes() {
1538        let mut executor = TestExecutor::new();
1539        let scope = executor.global_scope().new_child();
1540        let remote = RemoteControlFuture::new();
1541        let mut task = scope.spawn(remote.as_future());
1542        let mut scope_join = pin!(scope.join());
1543        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1544        remote.resolve();
1545        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1546        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1547    }
1548
1549    #[test]
1550    fn close_scope_blocks_until_spawned_task_completes() {
1551        let mut executor = TestExecutor::new();
1552        let scope = executor.global_scope().new_child();
1553        let remote = RemoteControlFuture::new();
1554        let mut task = scope.spawn(remote.as_future());
1555        let mut scope_close = pin!(scope.close());
1556        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Pending);
1557        remote.resolve();
1558        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1559        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1560    }
1561
1562    #[test]
1563    fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
1564        let mut executor = TestExecutor::new();
1565        let scope = executor.global_scope().new_child();
1566        let child = scope.new_child();
1567        let remote = RemoteControlFuture::new();
1568        child.spawn(remote.as_future());
1569        let mut scope_join = pin!(scope.join());
1570        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1571        assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1572        child.detach();
1573        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1574        remote.resolve();
1575        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1576    }
1577
1578    #[test]
1579    fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
1580        let mut executor = TestExecutor::new();
1581        let scope = executor.global_scope().new_child();
1582        let remote = RemoteControlFuture::new();
1583        {
1584            let remote = remote.clone();
1585            scope.spawn(async move {
1586                let child = Scope::new_with_name("child");
1587                child.spawn(async move {
1588                    Scope::current().spawn(remote.as_future());
1589                });
1590                child.detach();
1591            });
1592        }
1593        let mut scope_join = pin!(scope.join());
1594        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1595        remote.resolve();
1596        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1597    }
1598
1599    #[test]
1600    fn join_scope_blocks_when_blocked_child_is_detached() {
1601        let mut executor = TestExecutor::new();
1602        let scope = executor.global_scope().new_child();
1603        let child = scope.new_child();
1604        child.spawn(pending());
1605        let mut scope_join = pin!(scope.join());
1606        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1607        assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1608        child.detach();
1609        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1610    }
1611
1612    #[test]
1613    fn join_scope_completes_when_blocked_child_is_cancelled() {
1614        let mut executor = TestExecutor::new();
1615        let scope = executor.global_scope().new_child();
1616        let child = scope.new_child();
1617        child.spawn(pending());
1618        let mut scope_join = pin!(scope.join());
1619        {
1620            let mut child_join = pin!(child.join());
1621            assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1622            assert_eq!(executor.run_until_stalled(&mut child_join), Poll::Pending);
1623        }
1624        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1625    }
1626
1627    #[test]
1628    fn detached_scope_can_spawn() {
1629        let mut executor = TestExecutor::new();
1630        let scope = executor.global_scope().new_child();
1631        let handle = scope.to_handle();
1632        scope.detach();
1633        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1634    }
1635
1636    #[test]
1637    fn dropped_scope_cannot_spawn() {
1638        let mut executor = TestExecutor::new();
1639        let scope = executor.global_scope().new_child();
1640        let handle = scope.to_handle();
1641        drop(scope);
1642        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1643    }
1644
1645    #[test]
1646    fn dropped_scope_with_running_task_cannot_spawn() {
1647        let mut executor = TestExecutor::new();
1648        let scope = executor.global_scope().new_child();
1649        let handle = scope.to_handle();
1650        let _running_task = handle.spawn(pending::<()>());
1651        drop(scope);
1652        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1653    }
1654
1655    #[test]
1656    fn joined_scope_cannot_spawn() {
1657        let mut executor = TestExecutor::new();
1658        let scope = executor.global_scope().new_child();
1659        let handle = scope.to_handle();
1660        let mut scope_join = pin!(scope.join());
1661        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1662        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1663    }
1664
1665    #[test]
1666    fn joining_scope_with_running_task_can_spawn() {
1667        let mut executor = TestExecutor::new();
1668        let scope = executor.global_scope().new_child();
1669        let handle = scope.to_handle();
1670        let _running_task = handle.spawn(pending::<()>());
1671        let mut scope_join = pin!(scope.join());
1672        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1673        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1674    }
1675
1676    #[test]
1677    fn joined_scope_child_cannot_spawn() {
1678        let mut executor = TestExecutor::new();
1679        let scope = executor.global_scope().new_child();
1680        let handle = scope.to_handle();
1681        let child_before_join = scope.new_child();
1682        assert_eq!(
1683            executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1684            Poll::Ready(1)
1685        );
1686        let mut scope_join = pin!(scope.join());
1687        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1688        let child_after_join = handle.new_child();
1689        let grandchild_after_join = child_before_join.new_child();
1690        assert_eq!(
1691            executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1692            Poll::Pending
1693        );
1694        assert_eq!(
1695            executor.run_until_stalled(&mut child_after_join.compute(async { 1 })),
1696            Poll::Pending
1697        );
1698        assert_eq!(
1699            executor.run_until_stalled(&mut grandchild_after_join.compute(async { 1 })),
1700            Poll::Pending
1701        );
1702    }
1703
1704    #[test]
1705    fn closed_scope_child_cannot_spawn() {
1706        let mut executor = TestExecutor::new();
1707        let scope = executor.global_scope().new_child();
1708        let handle = scope.to_handle();
1709        let child_before_close = scope.new_child();
1710        assert_eq!(
1711            executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
1712            Poll::Ready(1)
1713        );
1714        let mut scope_close = pin!(scope.close());
1715        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1716        let child_after_close = handle.new_child();
1717        let grandchild_after_close = child_before_close.new_child();
1718        assert_eq!(
1719            executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
1720            Poll::Pending
1721        );
1722        assert_eq!(
1723            executor.run_until_stalled(&mut child_after_close.compute(async { 1 })),
1724            Poll::Pending
1725        );
1726        assert_eq!(
1727            executor.run_until_stalled(&mut grandchild_after_close.compute(async { 1 })),
1728            Poll::Pending
1729        );
1730    }
1731
1732    #[test]
1733    fn can_join_child_first() {
1734        let mut executor = TestExecutor::new();
1735        let scope = executor.global_scope().new_child();
1736        let child = scope.new_child();
1737        assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
1738        assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
1739        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1740    }
1741
1742    #[test]
1743    fn can_join_parent_first() {
1744        let mut executor = TestExecutor::new();
1745        let scope = executor.global_scope().new_child();
1746        let child = scope.new_child();
1747        assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
1748        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1749        assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
1750    }
1751
1752    #[test]
1753    fn task_in_parent_scope_can_join_child() {
1754        let mut executor = TestExecutor::new();
1755        let scope = executor.global_scope().new_child();
1756        let child = scope.new_child();
1757        let remote = RemoteControlFuture::new();
1758        child.spawn(remote.as_future());
1759        scope.spawn(async move { child.join().await });
1760        let mut join = pin!(scope.join());
1761        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1762        remote.resolve();
1763        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1764    }
1765
1766    #[test]
1767    fn join_completes_while_completed_task_handle_is_held() {
1768        let mut executor = TestExecutor::new();
1769        let scope = executor.global_scope().new_child();
1770        let mut task = scope.compute(async { 1 });
1771        scope.spawn(async {});
1772        let mut join = pin!(scope.join());
1773        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1774        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1775    }
1776
1777    #[test]
1778    fn cancel_completes_while_task_holds_handle() {
1779        let mut executor = TestExecutor::new();
1780        let scope = executor.global_scope().new_child();
1781        let handle = scope.to_handle();
1782        let mut task = scope.compute(async move {
1783            loop {
1784                pending::<()>().await; // never returns
1785                handle.spawn(async {});
1786            }
1787        });
1788
1789        // Join should not complete because the task never does.
1790        let mut join = pin!(scope.join());
1791        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1792
1793        let mut cancel = pin!(join.cancel());
1794        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1795        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1796    }
1797
1798    #[test]
1799    fn cancel_from_handle_inside_task() {
1800        let mut executor = TestExecutor::new();
1801        let scope = executor.global_scope().new_child();
1802        {
1803            // Spawn a task that never finishes until the scope is cancelled.
1804            scope.spawn(pending::<()>());
1805
1806            let mut no_tasks = pin!(scope.on_no_tasks());
1807            assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Pending);
1808
1809            let handle = scope.to_handle();
1810            scope.spawn(async move {
1811                handle.cancel().await;
1812                panic!("cancel() should never complete");
1813            });
1814
1815            assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Ready(()));
1816        }
1817        assert_eq!(scope.join().now_or_never(), Some(()));
1818    }
1819
1820    #[test]
1821    fn can_spawn_from_non_executor_thread() {
1822        let mut executor = TestExecutor::new();
1823        let scope = executor.global_scope().clone();
1824        let done = Arc::new(AtomicBool::new(false));
1825        let done_clone = done.clone();
1826        let _ = std::thread::spawn(move || {
1827            scope.spawn(async move {
1828                done_clone.store(true, Ordering::Relaxed);
1829            })
1830        })
1831        .join();
1832        let _ = executor.run_until_stalled(&mut pending::<()>());
1833        assert!(done.load(Ordering::Relaxed));
1834    }
1835
1836    #[test]
1837    fn scope_tree() {
1838        // A
1839        //  \
1840        //   B
1841        //  / \
1842        // C   D
1843        let mut executor = TestExecutor::new();
1844        let a = executor.global_scope().new_child();
1845        let b = a.new_child();
1846        let c = b.new_child();
1847        let d = b.new_child();
1848        let a_remote = RemoteControlFuture::new();
1849        let c_remote = RemoteControlFuture::new();
1850        let d_remote = RemoteControlFuture::new();
1851        a.spawn(a_remote.as_future());
1852        c.spawn(c_remote.as_future());
1853        d.spawn(d_remote.as_future());
1854        let mut a_join = pin!(a.join());
1855        let mut b_join = pin!(b.join());
1856        let mut d_join = pin!(d.join());
1857        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
1858        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
1859        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
1860        d_remote.resolve();
1861        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
1862        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
1863        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
1864        c_remote.resolve();
1865        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
1866        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
1867        a_remote.resolve();
1868        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
1869        let mut c_join = pin!(c.join());
1870        assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
1871    }
1872
1873    #[test]
1874    fn on_no_tasks() {
1875        let mut executor = TestExecutor::new();
1876        let scope = executor.global_scope().new_child();
1877        let _task1 = scope.spawn(std::future::ready(()));
1878        let task2 = scope.spawn(pending::<()>());
1879
1880        let mut on_no_tasks = pin!(scope.on_no_tasks());
1881
1882        assert!(executor.run_until_stalled(&mut on_no_tasks).is_pending());
1883
1884        let _ = task2.cancel();
1885
1886        let on_no_tasks2 = pin!(scope.on_no_tasks());
1887        let on_no_tasks3 = pin!(scope.on_no_tasks());
1888
1889        assert_matches!(
1890            executor.run_until_stalled(&mut join_all([on_no_tasks, on_no_tasks2, on_no_tasks3])),
1891            Poll::Ready(_)
1892        );
1893    }
1894
1895    #[test]
1896    fn wake_all() {
1897        let mut executor = TestExecutor::new();
1898        let scope = executor.global_scope().new_child();
1899
1900        let poll_count = Arc::new(AtomicU64::new(0));
1901
1902        struct PollCounter(Arc<AtomicU64>);
1903
1904        impl Future for PollCounter {
1905            type Output = ();
1906            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
1907                self.0.fetch_add(1, Ordering::Relaxed);
1908                Poll::Pending
1909            }
1910        }
1911
1912        scope.spawn(PollCounter(poll_count.clone()));
1913        scope.spawn(PollCounter(poll_count.clone()));
1914
1915        let _ = executor.run_until_stalled(&mut pending::<()>());
1916
1917        let mut start_count = poll_count.load(Ordering::Relaxed);
1918
1919        for _ in 0..2 {
1920            scope.wake_all();
1921            let _ = executor.run_until_stalled(&mut pending::<()>());
1922            assert_eq!(poll_count.load(Ordering::Relaxed), start_count + 2);
1923            start_count += 2;
1924        }
1925    }
1926
1927    #[test]
1928    fn on_no_tasks_race() {
1929        fn sleep_random() {
1930            use rand::Rng;
1931            std::thread::sleep(std::time::Duration::from_micros(
1932                rand::thread_rng().gen_range(0..10),
1933            ));
1934        }
1935        for _ in 0..2000 {
1936            let mut executor = SendExecutor::new(2);
1937            let scope = executor.root_scope().new_child();
1938            scope.spawn(async {
1939                sleep_random();
1940            });
1941            executor.run(async move {
1942                sleep_random();
1943                scope.on_no_tasks().await;
1944            });
1945        }
1946    }
1947
1948    async fn yield_to_executor() {
1949        let mut done = false;
1950        poll_fn(|cx| {
1951            if done {
1952                Poll::Ready(())
1953            } else {
1954                done = true;
1955                cx.waker().wake_by_ref();
1956                Poll::Pending
1957            }
1958        })
1959        .await;
1960    }
1961
1962    #[test]
1963    fn test_detach() {
1964        let mut e = LocalExecutor::new();
1965        e.run_singlethreaded(async {
1966            let counter = Arc::new(AtomicU32::new(0));
1967
1968            {
1969                let counter = counter.clone();
1970                Task::spawn(async move {
1971                    for _ in 0..5 {
1972                        yield_to_executor().await;
1973                        counter.fetch_add(1, Ordering::Relaxed);
1974                    }
1975                })
1976                .detach();
1977            }
1978
1979            while counter.load(Ordering::Relaxed) != 5 {
1980                yield_to_executor().await;
1981            }
1982        });
1983
1984        assert!(e.ehandle.root_scope.lock().results.is_empty());
1985    }
1986
1987    #[test]
1988    fn test_cancel() {
1989        let mut e = LocalExecutor::new();
1990        e.run_singlethreaded(async {
1991            let ref_count = Arc::new(());
1992            // First, just drop the task.
1993            {
1994                let ref_count = ref_count.clone();
1995                let _ = Task::spawn(async move {
1996                    let _ref_count = ref_count;
1997                    let _: () = std::future::pending().await;
1998                });
1999            }
2000
2001            while Arc::strong_count(&ref_count) != 1 {
2002                yield_to_executor().await;
2003            }
2004
2005            // Now try explicitly cancelling.
2006            let task = {
2007                let ref_count = ref_count.clone();
2008                Task::spawn(async move {
2009                    let _ref_count = ref_count;
2010                    let _: () = std::future::pending().await;
2011                })
2012            };
2013
2014            assert_eq!(task.cancel().await, None);
2015            while Arc::strong_count(&ref_count) != 1 {
2016                yield_to_executor().await;
2017            }
2018
2019            // Now cancel a task that has already finished.
2020            let task = {
2021                let ref_count = ref_count.clone();
2022                Task::spawn(async move {
2023                    let _ref_count = ref_count;
2024                })
2025            };
2026
2027            // Wait for it to finish.
2028            while Arc::strong_count(&ref_count) != 1 {
2029                yield_to_executor().await;
2030            }
2031
2032            assert_eq!(task.cancel().await, Some(()));
2033        });
2034
2035        assert!(e.ehandle.root_scope.lock().results.is_empty());
2036    }
2037
2038    #[test]
2039    fn test_cancel_waits() {
2040        let mut executor = SendExecutor::new(2);
2041        let running = Arc::new((Mutex::new(false), Condvar::new()));
2042        let task = {
2043            let running = running.clone();
2044            executor.root_scope().compute(async move {
2045                *running.0.lock() = true;
2046                running.1.notify_all();
2047                std::thread::sleep(std::time::Duration::from_millis(10));
2048                *running.0.lock() = false;
2049                "foo"
2050            })
2051        };
2052        executor.run(async move {
2053            {
2054                let mut guard = running.0.lock();
2055                while !*guard {
2056                    running.1.wait(&mut guard);
2057                }
2058            }
2059            assert_eq!(task.cancel().await, Some("foo"));
2060            assert!(!*running.0.lock());
2061        });
2062    }
2063
2064    fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2065        let mut executor = SendExecutor::new(2);
2066        let running = Arc::new((Mutex::new(false), Condvar::new()));
2067        let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2068        let task = {
2069            let running = running.clone();
2070            let can_quit = can_quit.clone();
2071            executor.root_scope().compute(async move {
2072                *running.0.lock() = true;
2073                running.1.notify_all();
2074                {
2075                    let mut guard = can_quit.0.lock();
2076                    while !*guard {
2077                        can_quit.1.wait(&mut guard);
2078                    }
2079                }
2080                *running.0.lock() = false;
2081            })
2082        };
2083        executor.run(async move {
2084            {
2085                let mut guard = running.0.lock();
2086                while !*guard {
2087                    running.1.wait(&mut guard);
2088                }
2089            }
2090
2091            callback(task);
2092
2093            *can_quit.0.lock() = true;
2094            can_quit.1.notify_all();
2095
2096            let ehandle = EHandle::local();
2097            let scope = ehandle.global_scope();
2098
2099            // The only way of testing for this is to poll.
2100            while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2101                Timer::new(std::time::Duration::from_millis(1)).await;
2102            }
2103
2104            assert!(!*running.0.lock());
2105        });
2106    }
2107
2108    #[test]
2109    fn test_dropped_cancel_cleans_up() {
2110        test_clean_up(|task| {
2111            let cancel_fut = std::pin::pin!(task.cancel());
2112            let waker = futures::task::noop_waker();
2113            assert!(cancel_fut.poll(&mut Context::from_waker(&waker)).is_pending());
2114        });
2115    }
2116
2117    #[test]
2118    fn test_dropped_task_cleans_up() {
2119        test_clean_up(|task| {
2120            std::mem::drop(task);
2121        });
2122    }
2123
2124    #[test]
2125    fn test_detach_cleans_up() {
2126        test_clean_up(|task| {
2127            task.detach();
2128        });
2129    }
2130
2131    #[test]
2132    fn test_scope_stream() {
2133        let mut executor = SendExecutor::new(2);
2134        executor.run(async move {
2135            let (stream, handle) = ScopeStream::new();
2136            handle.push(async { 1 });
2137            handle.push(async { 2 });
2138            stream.close();
2139            let results: HashSet<_> = stream.collect().await;
2140            assert_eq!(results, HashSet::from_iter([1, 2]));
2141        });
2142    }
2143
2144    #[test]
2145    fn test_scope_stream_wakes_properly() {
2146        let mut executor = SendExecutor::new(2);
2147        executor.run(async move {
2148            let (stream, handle) = ScopeStream::new();
2149            handle.push(async {
2150                Timer::new(Duration::from_millis(10)).await;
2151                1
2152            });
2153            handle.push(async {
2154                Timer::new(Duration::from_millis(10)).await;
2155                2
2156            });
2157            stream.close();
2158            let results: HashSet<_> = stream.collect().await;
2159            assert_eq!(results, HashSet::from_iter([1, 2]));
2160        });
2161    }
2162
2163    #[test]
2164    fn test_scope_stream_drops_spawned_tasks() {
2165        let mut executor = SendExecutor::new(2);
2166        executor.run(async move {
2167            let (stream, handle) = ScopeStream::new();
2168            handle.push(async { 1 });
2169            let _task = stream.compute(async { "foo" });
2170            stream.close();
2171            let results: HashSet<_> = stream.collect().await;
2172            assert_eq!(results, HashSet::from_iter([1]));
2173        });
2174    }
2175
2176    #[test]
2177    fn test_nested_scope_stream() {
2178        let mut executor = SendExecutor::new(2);
2179        executor.run(async move {
2180            let (mut stream, handle) = ScopeStream::new();
2181            handle.clone().push(async move {
2182                handle.clone().push(async move {
2183                    handle.clone().push(async move { 3 });
2184                    2
2185                });
2186                1
2187            });
2188            let mut results = HashSet::default();
2189            while let Some(item) = stream.next().await {
2190                results.insert(item);
2191                if results.len() == 3 {
2192                    stream.close();
2193                }
2194            }
2195            assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2196        });
2197    }
2198
2199    #[test]
2200    fn test_dropping_scope_stream_cancels_all_tasks() {
2201        let mut executor = SendExecutor::new(2);
2202        executor.run(async move {
2203            let (stream, handle) = ScopeStream::new();
2204            let (tx1, mut rx) = mpsc::unbounded::<()>();
2205            let tx2 = tx1.clone();
2206            handle.push(async move {
2207                let _tx1 = tx1;
2208                let () = pending().await;
2209            });
2210            handle.push(async move {
2211                let _tx2 = tx2;
2212                let () = pending().await;
2213            });
2214            drop(stream);
2215
2216            // This will wait forever if the tasks aren't cancelled.
2217            assert_eq!(rx.next().await, None);
2218        });
2219    }
2220
2221    #[test]
2222    fn test_scope_stream_collect() {
2223        let mut executor = SendExecutor::new(2);
2224        executor.run(async move {
2225            let stream: ScopeStream<_> = (0..10).into_iter().map(|i| async move { i }).collect();
2226            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2227
2228            let stream: ScopeStream<_> =
2229                (0..10).into_iter().map(|i| SpawnableFuture::new(async move { i })).collect();
2230            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2231        });
2232    }
2233}