fuchsia_async/runtime/fuchsia/executor/
scope.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::condition::{Condition, ConditionGuard, WakerEntry};
9use crate::EHandle;
10use fuchsia_sync::Mutex;
11use futures::Stream;
12use pin_project_lite::pin_project;
13use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
14use state::{JoinResult, ScopeState, ScopeWaker, Status};
15use std::any::Any;
16use std::borrow::Borrow;
17use std::collections::hash_map::Entry;
18use std::collections::hash_set;
19use std::future::{Future, IntoFuture};
20use std::marker::PhantomData;
21use std::mem::{self, ManuallyDrop};
22use std::ops::{Deref, DerefMut};
23use std::pin::Pin;
24use std::sync::{Arc, Weak};
25use std::task::{ready, Context, Poll, Waker};
26use std::{fmt, hash};
27
28//
29// # Public API
30//
31
32/// A scope for managing async tasks. This scope is aborted when dropped.
33///
34/// Scopes are how fuchsia-async implements [structured concurrency][sc]. Every
35/// task is spawned on a scope, and runs until either the task completes or the
36/// scope is cancelled or aborted. In addition to owning tasks, scopes may own
37/// child scopes, forming a nested structure.
38///
39/// Scopes are usually joined or cancelled when the owning code is done with
40/// them. This makes it easier to reason about when a background task might
41/// still be running. Note that in multithreaded contexts it is safer to cancel
42/// and await a scope explicitly than to drop it, because the destructor is not
43/// synchronized with other threads that might be running a task.
44///
45/// [`Task::spawn`][crate::Task::spawn] and related APIs spawn on the root scope
46/// of the executor. New code is encouraged to spawn directly on scopes instead,
47/// passing their handles as a way of documenting when a function might spawn
48/// tasks that run in the background and reasoning about their side effects.
49///
50/// ## Scope lifecycle
51///
52/// When a scope is created it is open, meaning it accepts new tasks. Scopes are
53/// closed when one of the following happens:
54///
55/// 1. When [`close()`][Scope::close] is called.
56/// 2. When the scope is aborted or dropped, the scope is closed immediately.
57/// 3. When the scope is cancelled, the scope is closed when all active guards
58///    are dropped.
59/// 4. When the scope is joined and all tasks complete, the scope is closed
60///    before the join future resolves.
61///
62/// When a scope is closed it no longer accepts tasks. Tasks spawned on the
63/// scope are dropped immediately, and their [`Task`][crate::Task] or
64/// [`JoinHandle`][crate::JoinHandle] futures never resolve. This applies
65/// transitively to all child scopes. Closed scopes cannot currently be
66/// reopened.
67///
68/// Scopes can also be detached, in which case they are never closed, and run
69/// until the completion of all tasks.
70///
71/// [sc]: https://en.wikipedia.org/wiki/Structured_concurrency
72#[must_use = "Scopes should be explicitly awaited or cancelled"]
73#[derive(Debug)]
74pub struct Scope {
75    // LINT.IfChange
76    inner: ScopeHandle,
77    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
78}
79
80impl Default for Scope {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl Scope {
87    /// Create a new scope.
88    ///
89    /// The returned scope is a child of the current scope.
90    ///
91    /// # Panics
92    ///
93    /// May panic if not called in the context of an executor (e.g. within a
94    /// call to [`run`][crate::SendExecutor::run]).
95    pub fn new() -> Scope {
96        ScopeHandle::with_current(|handle| handle.new_child())
97    }
98
99    /// Create a new scope with a name.
100    ///
101    /// The returned scope is a child of the current scope.
102    ///
103    /// # Panics
104    ///
105    /// May panic if not called in the context of an executor (e.g. within a
106    /// call to [`run`][crate::SendExecutor::run]).
107    pub fn new_with_name(name: impl Into<String>) -> Scope {
108        ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
109    }
110
111    /// Get the scope of the current task, or the global scope if there is no task
112    /// being polled.
113    ///
114    /// # Panics
115    ///
116    /// May panic if not called in the context of an executor (e.g. within a
117    /// call to [`run`][crate::SendExecutor::run]).
118    pub fn current() -> ScopeHandle {
119        ScopeHandle::with_current(|handle| handle.clone())
120    }
121
122    /// Get the global scope of the executor.
123    ///
124    /// This can be used to spawn tasks that live as long as the executor.
125    /// Usually, this means until the end of the program or test. This should
126    /// only be done for tasks where this is expected. If in doubt, spawn on a
127    /// shorter lived scope instead.
128    ///
129    /// In code that uses scopes, you are strongly encouraged to use this API
130    /// instead of the spawn APIs on [`Task`][crate::Task].
131    ///
132    /// All scopes are descendants of the global scope.
133    ///
134    /// # Panics
135    ///
136    /// May panic if not called in the context of an executor (e.g. within a
137    /// call to [`run`][crate::SendExecutor::run]).
138    pub fn global() -> ScopeHandle {
139        EHandle::local().global_scope().clone()
140    }
141
142    /// Create a child scope.
143    pub fn new_child(&self) -> Scope {
144        self.inner.new_child()
145    }
146
147    /// Create a child scope with a name.
148    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
149        self.inner.new_child_with_name(name.into())
150    }
151
152    /// Returns the name of the scope.
153    pub fn name(&self) -> &str {
154        &self.inner.inner.name
155    }
156
157    /// Create a [`ScopeHandle`] that may be used to spawn tasks on this scope.
158    ///
159    /// This is a shorthand for `scope.as_handle().clone()`.
160    ///
161    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
162    /// available. Note that you should _not_ call `scope.clone()`, even though
163    /// the compiler allows it due to the Deref impl. Call this method instead.
164    pub fn to_handle(&self) -> ScopeHandle {
165        self.inner.clone()
166    }
167
168    /// Get a reference to a [`ScopeHandle`] that may be used to spawn tasks on
169    /// this scope.
170    ///
171    /// Scope holds a `ScopeHandle` and implements Deref to make its methods
172    /// available. If you have a `Scope` but need a `&ScopeHandle`, prefer
173    /// calling this method over the less readable `&*scope`.
174    pub fn as_handle(&self) -> &ScopeHandle {
175        &self.inner
176    }
177
178    /// Wait for all tasks in the scope and its children to complete.
179    ///
180    /// New tasks will be accepted on the scope until every task completes and
181    /// this future resolves.
182    ///
183    /// Note that you can await a scope directly because it implements
184    /// `IntoFuture`. `scope.join().await` is a more explicit form of
185    /// `scope.await`.
186    pub fn join(self) -> Join {
187        Join::new(self)
188    }
189
190    /// Stop accepting new tasks on the scope. Returns a future that waits for
191    /// every task on the scope to complete.
192    pub fn close(self) -> Join {
193        self.inner.close();
194        Join::new(self)
195    }
196
197    /// Cancel all tasks cooperatively in the scope and its children
198    /// recursively.
199    ///
200    /// `cancel` first gives a chance to all child tasks (including tasks of
201    /// child scopes) to shutdown cleanly if they're holding on to a
202    /// [`ScopeActiveGuard`]. Once no child tasks are holding on to guards, then
203    /// `cancel` behaves like [`Scope::abort`], dropping all tasks and stopping
204    /// them from running at the next yield point. A [`ScopeActiveGuard`]
205    /// provides a cooperative cancellation signal that is triggered by this
206    /// call, see its documentation for more details.
207    ///
208    /// Once the returned future resolves, no task on the scope will be polled
209    /// again.
210    ///
211    /// Cancelling a scope _does not_ immediately prevent new tasks from being
212    /// accepted. New tasks are accepted as long as there are
213    /// `ScopeActiveGuard`s for this scope.
214    pub fn cancel(self) -> Join {
215        self.inner.cancel_all_tasks();
216        Join::new(self)
217    }
218
219    /// Cancel all tasks in the scope and its children recursively.
220    ///
221    /// Once the returned future resolves, no task on the scope will be polled
222    /// again. Unlike [`Scope::cancel`], this doesn't send a cooperative
223    /// cancellation signal to tasks or child scopes.
224    ///
225    /// When a scope is aborted it immediately stops accepting tasks. Handles of
226    /// tasks spawned on the scope will pend forever.
227    ///
228    /// Dropping the `Scope` object is equivalent to calling this method and
229    /// discarding the returned future. Awaiting the future is preferred because
230    /// it eliminates the possibility of a task poll completing on another
231    /// thread after the scope object has been dropped, which can sometimes
232    /// result in surprising behavior.
233    pub fn abort(self) -> impl Future<Output = ()> {
234        self.inner.abort_all_tasks();
235        Join::new(self)
236    }
237
238    /// Detach the scope, allowing its tasks to continue running in the
239    /// background.
240    ///
241    /// Tasks of a detached scope are still subject to join and cancel
242    /// operations on parent scopes.
243    pub fn detach(self) {
244        // Use ManuallyDrop to destructure self, because Rust doesn't allow this
245        // for types which implement Drop.
246        let this = ManuallyDrop::new(self);
247        // SAFETY: this.inner is obviously valid, and we don't access `this`
248        // after moving.
249        mem::drop(unsafe { std::ptr::read(&this.inner) });
250    }
251}
252
253/// Abort the scope and all of its tasks. Prefer using the [`Scope::abort`]
254/// or [`Scope::join`] methods.
255impl Drop for Scope {
256    fn drop(&mut self) {
257        // Abort all tasks in the scope. Each task has a strong reference to the ScopeState,
258        // which will be dropped after all the tasks in the scope are dropped.
259
260        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
261        // here, but we cannot do that without either:
262        // - Sync drop support in AtomicFuture, or
263        // - The ability to reparent tasks, which requires atomic_arc or
264        //   acquiring a mutex during polling.
265        self.inner.abort_all_tasks();
266    }
267}
268
269impl IntoFuture for Scope {
270    type Output = ();
271    type IntoFuture = Join;
272    fn into_future(self) -> Self::IntoFuture {
273        self.join()
274    }
275}
276
277impl Deref for Scope {
278    type Target = ScopeHandle;
279    fn deref(&self) -> &Self::Target {
280        &self.inner
281    }
282}
283
284impl Borrow<ScopeHandle> for Scope {
285    fn borrow(&self) -> &ScopeHandle {
286        self
287    }
288}
289
290pin_project! {
291    /// Join operation for a [`Scope`].
292    ///
293    /// This is a future that resolves when all tasks on the scope are complete
294    /// or have been cancelled. New tasks will be accepted on the scope until
295    /// every task completes and this future resolves.
296    ///
297    /// When this object is dropped, the scope and all tasks in it are
298    /// cancelled.
299    //
300    // Note: The drop property is only true when S = Scope; it does not apply to
301    // other (non-public) uses of this struct where S = ScopeHandle.
302    pub struct Join<S = Scope> {
303        scope: S,
304        #[pin]
305        waker_entry: WakerEntry<ScopeState>,
306    }
307}
308
309impl<S> Join<S> {
310    fn new(scope: S) -> Self {
311        Self { scope, waker_entry: WakerEntry::new() }
312    }
313}
314
315impl<S: Borrow<ScopeHandle>> Join<S> {
316    /// Aborts the scope. The future will resolve when all tasks have finished
317    /// polling.
318    ///
319    /// See [`Scope::abort`] for more details.
320    pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
321        self.scope.borrow().abort_all_tasks();
322        self
323    }
324
325    /// Cancel the scope. The future will resolve when all tasks have finished
326    /// polling.
327    ///
328    /// See [`Scope::cancel`] for more details.
329    pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
330        self.scope.borrow().cancel_all_tasks();
331        self
332    }
333}
334
335impl<S> Future for Join<S>
336where
337    S: Borrow<ScopeHandle>,
338{
339    type Output = ();
340    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341        let this = self.project();
342        let mut state = Borrow::borrow(&*this.scope).lock();
343        if state.has_tasks() {
344            state.add_waker(this.waker_entry, cx.waker().clone());
345            Poll::Pending
346        } else {
347            state.mark_finished();
348            Poll::Ready(())
349        }
350    }
351}
352
353/// Trait for things that can be spawned on to a scope.  There is a blanket implementation
354/// below for futures.
355pub trait Spawnable {
356    /// The type of value produced on completion.
357    type Output;
358
359    /// Converts to a task that can be spawned directly.
360    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
361}
362
363impl<F: Future + Send + 'static> Spawnable for F
364where
365    F::Output: Send + 'static,
366{
367    type Output = F::Output;
368
369    fn into_task(self, scope: ScopeHandle) -> TaskHandle {
370        scope.new_task(None, self)
371    }
372}
373
374/// A handle to a scope, which may be used to spawn tasks.
375///
376/// ## Ownership and cycles
377///
378/// Tasks running on a `Scope` may hold a `ScopeHandle` to that scope. This does
379/// not create an ownership cycle because the task will drop the `ScopeHandle`
380/// once it completes or is cancelled.
381///
382/// Naturally, scopes containing tasks that never complete and that are never
383/// cancelled will never be freed. Holding a `ScopeHandle` does not contribute to
384/// this problem.
385#[derive(Clone)]
386pub struct ScopeHandle {
387    // LINT.IfChange
388    inner: Arc<ScopeInner>,
389    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
390}
391
392impl ScopeHandle {
393    /// Create a child scope.
394    pub fn new_child(&self) -> Scope {
395        self.new_child_inner(String::new())
396    }
397
398    /// Create a child scope.
399    pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
400        self.new_child_inner(name.into())
401    }
402
403    fn new_child_inner(&self, name: String) -> Scope {
404        let mut state = self.lock();
405        let child = ScopeHandle {
406            inner: Arc::new(ScopeInner {
407                executor: self.inner.executor.clone(),
408                state: Condition::new(ScopeState::new_child(
409                    self.clone(),
410                    &state,
411                    JoinResults::default().into(),
412                )),
413                name,
414            }),
415        };
416        let weak = child.downgrade();
417        state.insert_child(weak);
418        Scope { inner: child }
419    }
420
421    /// Spawn a new task on the scope.
422    // This does not have the must_use attribute because it's common to detach and the lifetime of
423    // the task is bound to the scope: when the scope is dropped, the task will be cancelled.
424    pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
425        let task = future.into_task(self.clone());
426        let task_id = task.id();
427        self.insert_task(task, false);
428        JoinHandle::new(self.clone(), task_id)
429    }
430
431    /// Spawn a new task on the scope of a thread local executor.
432    ///
433    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
434    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
435    pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
436        let task = self.new_local_task(None, future);
437        let id = task.id();
438        self.insert_task(task, false);
439        JoinHandle::new(self.clone(), id)
440    }
441
442    /// Like `spawn`, but for tasks that return a result.
443    ///
444    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
445    /// *cancelled*.
446    pub fn compute<T: Send + 'static>(
447        &self,
448        future: impl Spawnable<Output = T> + Send + 'static,
449    ) -> crate::Task<T> {
450        let task = future.into_task(self.clone());
451        let id = task.id();
452        self.insert_task(task, false);
453        JoinHandle::new(self.clone(), id).into()
454    }
455
456    /// Like `spawn`, but for tasks that return a result.
457    ///
458    /// NOTE: Unlike `spawn`, when tasks are dropped, the future will be
459    /// *cancelled*.
460    ///
461    /// NOTE: This is not supported with a [`SendExecutor`][crate::SendExecutor]
462    /// and will cause a runtime panic. Use [`ScopeHandle::spawn`] instead.
463    pub fn compute_local<T: 'static>(
464        &self,
465        future: impl Future<Output = T> + 'static,
466    ) -> crate::Task<T> {
467        let task = self.new_local_task(None, future);
468        let id = task.id();
469        self.insert_task(task, false);
470        JoinHandle::new(self.clone(), id).into()
471    }
472
473    pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
474        ScopeHandle {
475            inner: Arc::new(ScopeInner {
476                executor,
477                state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
478                name: "root".to_string(),
479            }),
480        }
481    }
482
483    /// Stop the scope from accepting new tasks.
484    ///
485    /// Note that unlike [`Scope::close`], this does not return a future that
486    /// waits for all tasks to complete. This could lead to resource leaks
487    /// because it is not uncommon to access a TaskGroup from a task running on
488    /// the scope itself. If such a task were to await a future returned by this
489    /// method it would suspend forever waiting for itself to complete.
490    pub fn close(&self) {
491        self.lock().close();
492    }
493
494    /// Cancel all the scope's tasks.
495    ///
496    /// Note that if this is called from within a task running on the scope, the
497    /// task will not resume from the next await point.
498    pub fn cancel(self) -> Join<Self> {
499        self.cancel_all_tasks();
500        Join::new(self)
501    }
502
503    /// Aborts all the scope's tasks.
504    ///
505    /// Note that if this is called from within a task running on the scope, the
506    /// task will not resume from the next await point.
507    pub fn abort(self) -> impl Future<Output = ()> {
508        self.abort_all_tasks();
509        Join::new(self)
510    }
511
512    /// Retrieves a [`ScopeActiveGuard`] for this scope.
513    ///
514    /// Note that this may fail if cancellation has already started for this
515    /// scope. In that case, the caller must assume any tasks from this scope
516    /// may be dropped at any yield point.
517    ///
518    /// Creating a [`ScopeActiveGuard`] is substantially more expensive than
519    /// just polling it, so callers should maintain the returned guard when
520    /// success is observed from this call for best performance.
521    ///
522    /// See [`Scope::cancel`] for details on cooperative cancellation behavior.
523    #[must_use]
524    pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
525        ScopeActiveGuard::new(self)
526    }
527
528    /// Returns true if the scope has been signaled to exit via
529    /// [`Scope::cancel`] or [`Scope::abort`].
530    pub fn is_cancelled(&self) -> bool {
531        self.lock().status().is_cancelled()
532    }
533
534    // Joining the scope could be allowed from a ScopeHandle, but the use case
535    // seems less common and more bug prone than cancelling. We don't allow this
536    // for the same reason we don't return a future from close().
537
538    /// Wait for there to be no tasks. This is racy: as soon as this returns it is possible for
539    /// another task to have been spawned on this scope.
540    pub async fn on_no_tasks(&self) {
541        self.inner
542            .state
543            .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
544            .await;
545    }
546
547    /// Wake all the scope's tasks so their futures will be polled again.
548    pub fn wake_all(&self) {
549        self.lock().wake_all();
550    }
551
552    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
553    /// That must be done separately.
554    pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
555        &self,
556        id: Option<usize>,
557        fut: Fut,
558    ) -> AtomicFutureHandle<'a>
559    where
560        Fut::Output: Send,
561    {
562        AtomicFutureHandle::new(
563            Some(self.clone()),
564            id.unwrap_or_else(|| self.executor().next_task_id()),
565            fut,
566        )
567    }
568
569    /// Creates a new task associated with this scope.  This does not spawn it on the executor.
570    /// That must be done separately.
571    pub(crate) fn new_local_task<'a>(
572        &self,
573        id: Option<usize>,
574        fut: impl Future + 'a,
575    ) -> AtomicFutureHandle<'a> {
576        // Check that the executor is local.
577        if !self.executor().is_local() {
578            panic!(
579                "Error: called `new_local_task` on multithreaded executor. \
580                 Use `spawn` or a `LocalExecutor` instead."
581            );
582        }
583
584        // SAFETY: We've confirmed that the futures here will never be used across multiple threads,
585        // so the Send requirements that `new_local` requires should be met.
586        unsafe {
587            AtomicFutureHandle::new_local(
588                Some(self.clone()),
589                id.unwrap_or_else(|| self.executor().next_task_id()),
590                fut,
591            )
592        }
593    }
594}
595
596impl fmt::Debug for ScopeHandle {
597    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
598        f.debug_struct("Scope").field("name", &self.inner.name).finish()
599    }
600}
601
602/// Similar to a scope but all futures spawned on the scope *must* finish with the same result type.
603/// That allows the scope to return a stream of results. Attempting to spawn tasks using
604/// `ScopeHandle::spawn` (or similar) will result in tasks that are immediately dropped (just as if
605/// the scope was closed).  Like a regular scope, the scope can be closed, at which point the stream
606/// will terminate once all the tasks have finished.  This is designed to be a fairly close drop-in
607/// replacement to `FuturesUnordered`, the principle difference being that the tasks run in parallel
608/// rather than just concurrently.  Another difference is that the futures don't need to be the same
609/// type; only the outputs do.  In all other respects, the scope operates like a regular scope i.e.
610/// it can have children, you can join them, cancel them, etc.
611pub struct ScopeStream<R> {
612    inner: ScopeHandle,
613    stream: Arc<Mutex<ResultsStreamInner<R>>>,
614}
615
616impl<R: Send + 'static> ScopeStream<R> {
617    /// Creates a new scope stream.
618    ///
619    /// The returned scope stream is a child of the current scope.
620    ///
621    /// # Panics
622    ///
623    /// May panic if not called in the context of an executor (e.g. within a
624    /// call to [`run`][crate::SendExecutor::run]).
625    pub fn new() -> (Self, ScopeStreamHandle<R>) {
626        Self::new_inner(String::new())
627    }
628
629    /// Creates a new scope stream with a name.
630    ///
631    /// The returned scope stream is a child of the current scope.
632    ///
633    /// # Panics
634    ///
635    /// May panic if not called in the context of an executor (e.g. within a
636    /// call to [`run`][crate::SendExecutor::run]).
637    pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
638        Self::new_inner(name.into())
639    }
640
641    fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
642        let this = ScopeHandle::with_current(|handle| {
643            let mut state = handle.lock();
644            let stream = Arc::default();
645            let child = ScopeHandle {
646                inner: Arc::new(ScopeInner {
647                    executor: handle.executor().clone(),
648                    state: Condition::new(ScopeState::new_child(
649                        handle.clone(),
650                        &state,
651                        Box::new(ResultsStream { inner: Arc::clone(&stream) }),
652                    )),
653                    name,
654                }),
655            };
656            let weak = child.downgrade();
657            state.insert_child(weak);
658            ScopeStream { inner: child, stream }
659        });
660        let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
661        (this, handle)
662    }
663}
664
665impl<R> Drop for ScopeStream<R> {
666    fn drop(&mut self) {
667        // Cancel all tasks in the scope. Each task has a strong reference to the ScopeState,
668        // which will be dropped after all the tasks in the scope are dropped.
669
670        // TODO(https://fxbug.dev/340638625): Ideally we would drop all tasks
671        // here, but we cannot do that without either:
672        // - Sync drop support in AtomicFuture, or
673        // - The ability to reparent tasks, which requires atomic_arc or
674        //   acquiring a mutex during polling.
675        self.inner.abort_all_tasks();
676    }
677}
678
679impl<R: Send + 'static> Stream for ScopeStream<R> {
680    type Item = R;
681
682    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
683        let mut stream_inner = self.stream.lock();
684        match stream_inner.results.pop() {
685            Some(result) => Poll::Ready(Some(result)),
686            None => {
687                // Lock ordering: when results are posted, the state lock is taken first, so we must
688                // do the same.
689                drop(stream_inner);
690                let state = self.inner.lock();
691                let mut stream_inner = self.stream.lock();
692                match stream_inner.results.pop() {
693                    Some(result) => Poll::Ready(Some(result)),
694                    None => {
695                        if state.has_tasks() {
696                            stream_inner.waker = Some(cx.waker().clone());
697                            Poll::Pending
698                        } else {
699                            Poll::Ready(None)
700                        }
701                    }
702                }
703            }
704        }
705    }
706}
707
708impl<R> Deref for ScopeStream<R> {
709    type Target = ScopeHandle;
710    fn deref(&self) -> &Self::Target {
711        &self.inner
712    }
713}
714
715impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
716    fn borrow(&self) -> &ScopeHandle {
717        self
718    }
719}
720
721impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
722    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
723        let (stream, handle) = ScopeStream::new();
724        for fut in iter {
725            handle.push(fut);
726        }
727        stream.close();
728        stream
729    }
730}
731
732#[derive(Clone)]
733pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
734
735impl<R: Send> ScopeStreamHandle<R> {
736    pub fn push(&self, future: impl Spawnable<Output = R>) {
737        self.0.insert_task(future.into_task(self.0.clone()), true);
738    }
739}
740
741/// Holds a guard on the creating scope, holding off cancelation.
742///
743/// `ScopeActiveGuard` allows [`Scope`]s to perform cooperative cancellation.
744/// [`ScopeActiveGuard::on_cancel`] returns a future that resolves when
745/// [`Scope::cancel`] and [`ScopeHandle::cancel`] are called. That is the signal
746/// sent to cooperative tasks to stop doing work and finish.
747///
748/// A `ScopeActiveGuard` is obtained via [`ScopeHandle::active_guard`].
749/// `ScopeActiveGuard` releases the guard on the originating scope on drop.
750#[derive(Debug)]
751#[must_use]
752pub struct ScopeActiveGuard(ScopeHandle);
753
754impl Deref for ScopeActiveGuard {
755    type Target = ScopeHandle;
756    fn deref(&self) -> &Self::Target {
757        &self.0
758    }
759}
760
761impl Drop for ScopeActiveGuard {
762    fn drop(&mut self) {
763        let Self(scope) = self;
764        scope.release_cancel_guard();
765    }
766}
767
768impl Clone for ScopeActiveGuard {
769    fn clone(&self) -> Self {
770        self.0.lock().acquire_cancel_guard();
771        Self(self.0.clone())
772    }
773}
774
775impl ScopeActiveGuard {
776    /// Returns a borrow of the scope handle associated with this guard.
777    pub fn as_handle(&self) -> &ScopeHandle {
778        &self.0
779    }
780
781    /// Returns a clone of the scope handle associated with this guard.
782    pub fn to_handle(&self) -> ScopeHandle {
783        self.0.clone()
784    }
785
786    /// Retrieves a future from this guard that can be polled on for
787    /// cancellation.
788    ///
789    /// The returned future resolves when the scope is cancelled. Callers should
790    /// perform teardown and drop the guard when done.
791    pub async fn on_cancel(&self) {
792        self.0
793            .inner
794            .state
795            .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
796            .await
797    }
798
799    fn new(scope: &ScopeHandle) -> Option<Self> {
800        if scope.lock().acquire_cancel_guard_if_not_finished() {
801            Some(Self(scope.clone()))
802        } else {
803            None
804        }
805    }
806}
807
808//
809// # Internal API
810//
811
812/// A weak reference to a scope.
813#[derive(Clone)]
814struct WeakScopeHandle {
815    inner: Weak<ScopeInner>,
816}
817
818impl WeakScopeHandle {
819    /// Upgrades to a [`ScopeHandle`] if the scope still exists.
820    pub fn upgrade(&self) -> Option<ScopeHandle> {
821        self.inner.upgrade().map(|inner| ScopeHandle { inner })
822    }
823}
824
825impl hash::Hash for WeakScopeHandle {
826    fn hash<H: hash::Hasher>(&self, state: &mut H) {
827        Weak::as_ptr(&self.inner).hash(state);
828    }
829}
830
831impl PartialEq for WeakScopeHandle {
832    fn eq(&self, other: &Self) -> bool {
833        Weak::ptr_eq(&self.inner, &other.inner)
834    }
835}
836
837impl Eq for WeakScopeHandle {
838    // Weak::ptr_eq should return consistent results, even when the inner value
839    // has been dropped.
840}
841
842// This module exists as a privacy boundary so that we can make sure any
843// operation that might cause the scope to finish also wakes its waker.
844mod state {
845    use super::*;
846
847    pub struct ScopeState {
848        pub parent: Option<ScopeHandle>,
849        // LINT.IfChange
850        children: HashSet<WeakScopeHandle>,
851        all_tasks: HashSet<TaskHandle>,
852        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
853        /// The number of children that transitively contain tasks, plus one for
854        /// this scope if it directly contains tasks.
855        subscopes_with_tasks: u32,
856        can_spawn: bool,
857        guards: u32,
858        status: Status,
859        /// Wakers/results for joining each task.
860        pub results: Box<dyn Results>,
861    }
862
863    pub enum JoinResult {
864        Waker(Waker),
865        Result(TaskHandle),
866    }
867
868    #[repr(u8)] // So zxdb can read the status.
869    #[derive(Default, Debug, Clone, Copy)]
870    pub enum Status {
871        #[default]
872        /// The scope is active.
873        Active,
874        /// The scope has been signalled to cancel and is waiting for all guards
875        /// to be released.
876        PendingCancellation,
877        /// The scope is not accepting new tasks and all tasks have been
878        /// scheduled to be dropped.
879        Finished,
880    }
881
882    impl Status {
883        /// Returns whether this records a cancelled state.
884        pub fn is_cancelled(&self) -> bool {
885            match self {
886                Self::Active => false,
887                Self::PendingCancellation | Self::Finished => true,
888            }
889        }
890    }
891
892    impl ScopeState {
893        pub fn new_root(results: Box<impl Results>) -> Self {
894            Self {
895                parent: None,
896                children: Default::default(),
897                all_tasks: Default::default(),
898                subscopes_with_tasks: 0,
899                can_spawn: true,
900                guards: 0,
901                status: Default::default(),
902                results,
903            }
904        }
905
906        pub fn new_child(
907            parent_handle: ScopeHandle,
908            parent_state: &Self,
909            results: Box<impl Results>,
910        ) -> Self {
911            let (status, can_spawn) = match parent_state.status {
912                Status::Active => (Status::Active, parent_state.can_spawn),
913                Status::Finished | Status::PendingCancellation => (Status::Finished, false),
914            };
915            Self {
916                parent: Some(parent_handle),
917                children: Default::default(),
918                all_tasks: Default::default(),
919                subscopes_with_tasks: 0,
920                can_spawn,
921                guards: 0,
922                status,
923                results,
924            }
925        }
926    }
927
928    impl ScopeState {
929        pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
930            &self.all_tasks
931        }
932
933        /// Attempts to add a task to the scope. Returns the task if the scope cannot accept a task
934        /// (since it isn't safe to drop the task whilst the lock is held).
935        pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
936            if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
937                return Some(task);
938            }
939            if self.all_tasks.is_empty() && !self.register_first_task() {
940                return Some(task);
941            }
942            task.wake();
943            assert!(self.all_tasks.insert(task));
944            None
945        }
946
947        pub fn children(&self) -> &HashSet<WeakScopeHandle> {
948            &self.children
949        }
950
951        pub fn insert_child(&mut self, child: WeakScopeHandle) {
952            self.children.insert(child);
953        }
954
955        pub fn remove_child(&mut self, child: &PtrKey) {
956            let found = self.children.remove(child);
957            // This should always succeed unless the scope is being dropped
958            // (in which case children will be empty).
959            assert!(found || self.children.is_empty());
960        }
961
962        pub fn status(&self) -> Status {
963            self.status
964        }
965
966        pub fn guards(&self) -> u32 {
967            self.guards
968        }
969
970        pub fn close(&mut self) {
971            self.can_spawn = false;
972        }
973
974        pub fn mark_finished(&mut self) {
975            self.can_spawn = false;
976            self.status = Status::Finished;
977        }
978
979        pub fn has_tasks(&self) -> bool {
980            self.subscopes_with_tasks > 0
981        }
982
983        pub fn wake_all(&self) {
984            for task in &self.all_tasks {
985                task.wake();
986            }
987        }
988
989        pub fn abort_tasks_and_mark_finished(&mut self) {
990            for task in self.all_tasks() {
991                if task.abort() {
992                    task.scope().executor().ready_tasks.push(task.clone());
993                }
994                // Don't bother dropping tasks that are finished; the entire
995                // scope is going to be dropped soon anyway.
996            }
997            self.mark_finished();
998        }
999
1000        pub fn wake_wakers_and_mark_pending(
1001            this: &mut ConditionGuard<'_, ScopeState>,
1002            wakers: &mut Vec<Waker>,
1003        ) {
1004            wakers.extend(this.drain_wakers());
1005            this.status = Status::PendingCancellation;
1006        }
1007
1008        /// Registers our first task with the parent scope.
1009        ///
1010        /// Returns false if the scope is not allowed to accept a task.
1011        #[must_use]
1012        fn register_first_task(&mut self) -> bool {
1013            if !self.can_spawn {
1014                return false;
1015            }
1016            let can_spawn = match &self.parent {
1017                Some(parent) => {
1018                    // If our parent already knows we have tasks, we can always
1019                    // spawn. Otherwise, we have to recurse.
1020                    self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
1021                }
1022                None => true,
1023            };
1024            if can_spawn {
1025                self.subscopes_with_tasks += 1;
1026                debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
1027            };
1028            can_spawn
1029        }
1030
1031        fn on_last_task_removed(
1032            this: &mut ConditionGuard<'_, ScopeState>,
1033            num_wakers_hint: usize,
1034            wakers: &mut Vec<Waker>,
1035        ) {
1036            debug_assert!(this.subscopes_with_tasks > 0);
1037            this.subscopes_with_tasks -= 1;
1038            if this.subscopes_with_tasks > 0 {
1039                wakers.reserve(num_wakers_hint);
1040                return;
1041            }
1042
1043            match &this.parent {
1044                Some(parent) => {
1045                    Self::on_last_task_removed(
1046                        &mut parent.lock(),
1047                        num_wakers_hint + this.waker_count(),
1048                        wakers,
1049                    );
1050                }
1051                None => wakers.reserve(num_wakers_hint),
1052            };
1053            wakers.extend(this.drain_wakers());
1054        }
1055
1056        /// Acquires a cancel guard IFF we're not in the finished state.
1057        ///
1058        /// Returns `true` if a guard was acquired.
1059        pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
1060            match self.status {
1061                Status::Active | Status::PendingCancellation => {
1062                    self.acquire_cancel_guard();
1063                    true
1064                }
1065                Status::Finished => false,
1066            }
1067        }
1068
1069        pub fn acquire_cancel_guard(&mut self) {
1070            if self.guards == 0 {
1071                if let Some(parent) = self.parent.as_ref() {
1072                    parent.acquire_cancel_guard();
1073                }
1074            }
1075            self.guards += 1;
1076        }
1077
1078        pub fn release_cancel_guard(&mut self) {
1079            self.guards = self.guards.checked_sub(1).expect("released non-acquired guard");
1080            if self.guards == 0 {
1081                self.on_zero_guards();
1082            }
1083        }
1084
1085        fn on_zero_guards(&mut self) {
1086            match self.status {
1087                Status::Active => {}
1088                Status::PendingCancellation => {
1089                    self.abort_tasks_and_mark_finished();
1090                }
1091                // Acquiring and releasing guards post finished state is a
1092                // no-op.
1093                Status::Finished => {}
1094            }
1095            if let Some(parent) = &self.parent {
1096                parent.release_cancel_guard();
1097            }
1098        }
1099    }
1100
1101    #[derive(Default)]
1102    struct WakeVec(Vec<Waker>);
1103
1104    impl Drop for WakeVec {
1105        fn drop(&mut self) {
1106            for waker in self.0.drain(..) {
1107                waker.wake();
1108            }
1109        }
1110    }
1111
1112    // WakeVec *must* come after the guard because we want the guard to be dropped first.
1113    pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1114
1115    impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1116        fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1117            Self(value, WakeVec::default())
1118        }
1119    }
1120
1121    impl ScopeWaker<'_> {
1122        pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
1123            let task = self.all_tasks.take(&id);
1124            if task.is_some() {
1125                self.on_task_removed(0);
1126            }
1127            task
1128        }
1129
1130        pub fn task_did_finish(&mut self, id: usize) {
1131            if let Some(task) = self.all_tasks.take(&id) {
1132                self.on_task_removed(1);
1133                if !task.is_detached() {
1134                    let maybe_waker = self.results.task_did_finish(task);
1135                    self.1 .0.extend(maybe_waker);
1136                }
1137            }
1138        }
1139
1140        pub fn set_closed_and_drain(
1141            &mut self,
1142        ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
1143            self.close();
1144            let all_tasks = std::mem::take(&mut self.all_tasks);
1145            let results = self.results.take();
1146            if !all_tasks.is_empty() {
1147                self.on_task_removed(0)
1148            }
1149            let children = self.children.drain();
1150            (all_tasks, results, children)
1151        }
1152
1153        fn on_task_removed(&mut self, num_wakers_hint: usize) {
1154            if self.all_tasks.is_empty() {
1155                ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1 .0)
1156            }
1157        }
1158
1159        pub fn wake_wakers_and_mark_pending(&mut self) {
1160            let Self(state, wakers) = self;
1161            ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
1162        }
1163    }
1164
1165    impl<'a> Deref for ScopeWaker<'a> {
1166        type Target = ConditionGuard<'a, ScopeState>;
1167
1168        fn deref(&self) -> &Self::Target {
1169            &self.0
1170        }
1171    }
1172
1173    impl DerefMut for ScopeWaker<'_> {
1174        fn deref_mut(&mut self) -> &mut Self::Target {
1175            &mut self.0
1176        }
1177    }
1178}
1179
1180struct ScopeInner {
1181    executor: Arc<Executor>,
1182    state: Condition<ScopeState>,
1183    name: String,
1184}
1185
1186impl Drop for ScopeInner {
1187    fn drop(&mut self) {
1188        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
1189        // This also complies with the correctness requirements of
1190        // HashSet::remove because the implementations of Hash and Eq match
1191        // between PtrKey and WeakScopeHandle.
1192        let key = unsafe { &*(self as *const _ as *const PtrKey) };
1193        let state = self.state.lock();
1194        if let Some(parent) = &state.parent {
1195            let mut parent_state = parent.lock();
1196            if state.guards() != 0 {
1197                parent_state.release_cancel_guard();
1198            }
1199            parent_state.remove_child(key);
1200        }
1201    }
1202}
1203
1204impl ScopeHandle {
1205    fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
1206        super::common::TaskHandle::with_current(|task| match task {
1207            Some(task) => f(task.scope()),
1208            None => f(EHandle::local().global_scope()),
1209        })
1210    }
1211
1212    fn lock(&self) -> ConditionGuard<'_, ScopeState> {
1213        self.inner.state.lock()
1214    }
1215
1216    fn downgrade(&self) -> WeakScopeHandle {
1217        WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
1218    }
1219
1220    #[inline(always)]
1221    pub(crate) fn executor(&self) -> &Arc<Executor> {
1222        &self.inner.executor
1223    }
1224
1225    /// Marks the task as detached.
1226    pub(crate) fn detach(&self, task_id: usize) {
1227        let _maybe_task = {
1228            let mut state = self.lock();
1229            if let Some(task) = state.all_tasks().get(&task_id) {
1230                task.detach();
1231            }
1232            state.results.detach(task_id)
1233        };
1234    }
1235
1236    /// Aborts the task.
1237    ///
1238    /// # Safety
1239    ///
1240    /// The caller must guarantee that `R` is the correct type.
1241    pub(crate) unsafe fn abort_task<R>(&self, task_id: usize) -> Option<R> {
1242        let mut state = self.lock();
1243        if let Some(task) = state.results.detach(task_id) {
1244            drop(state);
1245            return task.take_result();
1246        }
1247        state.all_tasks().get(&task_id).and_then(|task| {
1248            if task.abort() {
1249                self.inner.executor.ready_tasks.push(task.clone());
1250            }
1251            task.take_result()
1252        })
1253    }
1254
1255    /// Aborts and detaches the task.
1256    pub(crate) fn abort_and_detach(&self, task_id: usize) {
1257        let _tasks = {
1258            let mut state = ScopeWaker::from(self.lock());
1259            let maybe_task1 = state.results.detach(task_id);
1260            let mut maybe_task2 = None;
1261            if let Some(task) = state.all_tasks().get(&task_id) {
1262                match task.abort_and_detach() {
1263                    AbortAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
1264                    AbortAndDetachResult::AddToRunQueue => {
1265                        self.inner.executor.ready_tasks.push(task.clone());
1266                    }
1267                    AbortAndDetachResult::Pending => {}
1268                }
1269            }
1270            (maybe_task1, maybe_task2)
1271        };
1272    }
1273
1274    /// Polls for a join result for the given task ID.
1275    ///
1276    /// # Safety
1277    ///
1278    /// The caller must guarantee that `R` is the correct type.
1279    pub(crate) unsafe fn poll_join_result<R>(
1280        &self,
1281        task_id: usize,
1282        cx: &mut Context<'_>,
1283    ) -> Poll<R> {
1284        let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1285        match task.take_result() {
1286            Some(result) => Poll::Ready(result),
1287            None => {
1288                // The task has been aborted so all we can do is forever return pending.
1289                Poll::Pending
1290            }
1291        }
1292    }
1293
1294    /// Polls for the task to be aborted.
1295    pub(crate) unsafe fn poll_aborted<R>(
1296        &self,
1297        task_id: usize,
1298        cx: &mut Context<'_>,
1299    ) -> Poll<Option<R>> {
1300        let task = self.lock().results.poll_join_result(task_id, cx);
1301        task.map(|task| task.take_result())
1302    }
1303
1304    pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
1305        let returned_task = self.lock().insert_task(task, for_stream);
1306        returned_task.is_none()
1307    }
1308
1309    /// Drops the specified task.
1310    ///
1311    /// The main task by the single-threaded executor might not be 'static, so we use this to drop
1312    /// the task and make sure we meet lifetime guarantees.  Note that removing the task from our
1313    /// task list isn't sufficient; we must make sure the future running in the task is dropped.
1314    ///
1315    /// # Safety
1316    ///
1317    /// This is unsafe because of the call to `drop_future_unchecked` which requires that no
1318    /// thread is currently polling the task.
1319    pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
1320        let mut state = ScopeWaker::from(self.lock());
1321        let task = state.take_task(task_id);
1322        if let Some(task) = task {
1323            task.drop_future_unchecked();
1324        }
1325    }
1326
1327    pub(super) fn task_did_finish(&self, id: usize) {
1328        let mut state = ScopeWaker::from(self.lock());
1329        state.task_did_finish(id);
1330    }
1331
1332    /// Visits scopes by state. If the callback returns `true`, children will
1333    /// be visited.
1334    fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
1335        let mut scopes = vec![self.clone()];
1336        while let Some(scope) = scopes.pop() {
1337            let mut scope_waker = ScopeWaker::from(scope.lock());
1338            if callback(&mut scope_waker) {
1339                scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
1340            }
1341        }
1342    }
1343
1344    fn acquire_cancel_guard(&self) {
1345        self.lock().acquire_cancel_guard()
1346    }
1347
1348    fn release_cancel_guard(&self) {
1349        self.lock().release_cancel_guard()
1350    }
1351
1352    /// Cancels tasks in this scope and all child scopes.
1353    fn cancel_all_tasks(&self) {
1354        self.visit_scopes_locked(|state| {
1355            match state.status() {
1356                Status::Active => {
1357                    if state.guards() == 0 {
1358                        state.abort_tasks_and_mark_finished();
1359                    } else {
1360                        state.wake_wakers_and_mark_pending();
1361                    }
1362                    true
1363                }
1364                Status::PendingCancellation => {
1365                    // If we're already pending cancellation, don't wake all
1366                    // tasks. A single wake should be enough here. More
1367                    // wakes on further calls probably hides bugs.
1368                    true
1369                }
1370                Status::Finished => {
1371                    // Already finished.
1372                    false
1373                }
1374            }
1375        });
1376    }
1377
1378    /// Aborts tasks in this scope and all child scopes.
1379    fn abort_all_tasks(&self) {
1380        self.visit_scopes_locked(|state| match state.status() {
1381            Status::Active | Status::PendingCancellation => {
1382                state.abort_tasks_and_mark_finished();
1383                true
1384            }
1385            Status::Finished => false,
1386        });
1387    }
1388
1389    /// Drops tasks in this scope and all child scopes.
1390    ///
1391    /// # Panics
1392    ///
1393    /// Panics if any task is being accessed by another thread. Only call this
1394    /// method when the executor is shutting down and there are no other pollers.
1395    pub(super) fn drop_all_tasks(&self) {
1396        let mut scopes = vec![self.clone()];
1397        while let Some(scope) = scopes.pop() {
1398            let (tasks, join_results) = {
1399                let mut state = ScopeWaker::from(scope.lock());
1400                let (tasks, join_results, children) = state.set_closed_and_drain();
1401                scopes.extend(children.filter_map(|child| child.upgrade()));
1402                (tasks, join_results)
1403            };
1404            // Call task destructors once the scope lock is released so we don't risk a deadlock.
1405            for task in tasks {
1406                task.try_drop().expect("Expected drop to succeed");
1407            }
1408            std::mem::drop(join_results);
1409        }
1410    }
1411}
1412
1413/// Optimizes removal from parent scope.
1414#[repr(transparent)]
1415struct PtrKey;
1416
1417impl Borrow<PtrKey> for WeakScopeHandle {
1418    fn borrow(&self) -> &PtrKey {
1419        // SAFETY: PtrKey is a ZST so we aren't creating a reference to invalid memory.
1420        unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1421    }
1422}
1423
1424impl PartialEq for PtrKey {
1425    fn eq(&self, other: &Self) -> bool {
1426        std::ptr::eq(self, other)
1427    }
1428}
1429
1430impl Eq for PtrKey {}
1431
1432impl hash::Hash for PtrKey {
1433    fn hash<H: hash::Hasher>(&self, state: &mut H) {
1434        (self as *const PtrKey).hash(state);
1435    }
1436}
1437
1438#[derive(Default)]
1439struct JoinResults(HashMap<usize, JoinResult>);
1440
1441trait Results: Send + Sync + 'static {
1442    /// Returns true if we allow spawning futures with arbitrary outputs on the scope.
1443    fn can_spawn(&self) -> bool;
1444
1445    /// Polls for the specified task having finished.
1446    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;
1447
1448    /// Called when a task finishes.
1449    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;
1450
1451    /// Called to drop any results for a particular task.
1452    fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;
1453
1454    /// Takes *all* the stored results.
1455    fn take(&mut self) -> Box<dyn Any>;
1456
1457    /// Used only for testing.  Returns true if there are any results registered.
1458    #[cfg(test)]
1459    fn is_empty(&self) -> bool;
1460}
1461
1462impl Results for JoinResults {
1463    fn can_spawn(&self) -> bool {
1464        true
1465    }
1466
1467    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1468        match self.0.entry(task_id) {
1469            Entry::Occupied(mut o) => match o.get_mut() {
1470                JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1471                JoinResult::Result(_) => {
1472                    let JoinResult::Result(task) = o.remove() else { unreachable!() };
1473                    return Poll::Ready(task);
1474                }
1475            },
1476            Entry::Vacant(v) => {
1477                v.insert(JoinResult::Waker(cx.waker().clone()));
1478            }
1479        }
1480        Poll::Pending
1481    }
1482
1483    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1484        match self.0.entry(task.id()) {
1485            Entry::Occupied(mut o) => {
1486                let JoinResult::Waker(waker) =
1487                    std::mem::replace(o.get_mut(), JoinResult::Result(task))
1488                else {
1489                    // It can't be JoinResult::Result because this function is the only
1490                    // function that sets that, and `task_did_finish` won't get called
1491                    // twice.
1492                    unreachable!()
1493                };
1494                Some(waker)
1495            }
1496            Entry::Vacant(v) => {
1497                v.insert(JoinResult::Result(task));
1498                None
1499            }
1500        }
1501    }
1502
1503    fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1504        match self.0.remove(&task_id) {
1505            Some(JoinResult::Result(task)) => Some(task),
1506            _ => None,
1507        }
1508    }
1509
1510    fn take(&mut self) -> Box<dyn Any> {
1511        Box::new(Self(std::mem::take(&mut self.0)))
1512    }
1513
1514    #[cfg(test)]
1515    fn is_empty(&self) -> bool {
1516        self.0.is_empty()
1517    }
1518}
1519
1520#[derive(Default)]
1521struct ResultsStream<R> {
1522    inner: Arc<Mutex<ResultsStreamInner<R>>>,
1523}
1524
1525struct ResultsStreamInner<R> {
1526    results: Vec<R>,
1527    waker: Option<Waker>,
1528}
1529
1530impl<R> Default for ResultsStreamInner<R> {
1531    fn default() -> Self {
1532        Self { results: Vec::new(), waker: None }
1533    }
1534}
1535
1536impl<R: Send + 'static> Results for ResultsStream<R> {
1537    fn can_spawn(&self) -> bool {
1538        false
1539    }
1540
1541    fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1542        Poll::Pending
1543    }
1544
1545    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1546        let mut inner = self.inner.lock();
1547        // SAFETY: R is guaranteed to be the same return type as all futures finishing on this
1548        // scope.
1549        inner.results.extend(unsafe { task.take_result() });
1550        inner.waker.take()
1551    }
1552
1553    fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1554        None
1555    }
1556
1557    fn take(&mut self) -> Box<dyn Any> {
1558        Box::new(std::mem::take(&mut self.inner.lock().results))
1559    }
1560
1561    #[cfg(test)]
1562    fn is_empty(&self) -> bool {
1563        false
1564    }
1565}
1566
1567#[cfg(test)]
1568mod tests {
1569    use super::*;
1570    use crate::{EHandle, LocalExecutor, SendExecutor, SpawnableFuture, Task, TestExecutor, Timer};
1571    use assert_matches::assert_matches;
1572    use fuchsia_sync::{Condvar, Mutex};
1573    use futures::channel::mpsc;
1574    use futures::future::join_all;
1575    use futures::{FutureExt, StreamExt};
1576    use std::future::{pending, poll_fn};
1577    use std::pin::{pin, Pin};
1578    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1579    use std::sync::Arc;
1580    use std::task::{Context, Poll};
1581    use std::time::Duration;
1582
1583    #[derive(Default)]
1584    struct RemoteControlFuture(Mutex<RCFState>);
1585    #[derive(Default)]
1586    struct RCFState {
1587        resolved: bool,
1588        waker: Option<Waker>,
1589    }
1590
1591    impl Future for &RemoteControlFuture {
1592        type Output = ();
1593        fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1594            let mut this = self.0.lock();
1595            if this.resolved {
1596                Poll::Ready(())
1597            } else {
1598                this.waker.replace(cx.waker().clone());
1599                Poll::Pending
1600            }
1601        }
1602    }
1603
1604    impl RemoteControlFuture {
1605        fn new() -> Arc<Self> {
1606            Arc::new(Default::default())
1607        }
1608
1609        fn resolve(&self) {
1610            let mut this = self.0.lock();
1611            this.resolved = true;
1612            if let Some(waker) = this.waker.take() {
1613                waker.wake();
1614            }
1615        }
1616
1617        fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> {
1618            let this = Arc::clone(self);
1619            #[allow(clippy::redundant_async_block)] // Allow returning `&*this` out of this fn.
1620            async move {
1621                (&*this).await
1622            }
1623        }
1624    }
1625
1626    #[test]
1627    fn compute_works_on_root_scope() {
1628        let mut executor = TestExecutor::new();
1629        let scope = executor.global_scope();
1630        let mut task = pin!(scope.compute(async { 1 }));
1631        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1632    }
1633
1634    #[test]
1635    fn compute_works_on_new_child() {
1636        let mut executor = TestExecutor::new();
1637        let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1638        let mut task = pin!(scope.compute(async { 1 }));
1639        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1640    }
1641
1642    #[test]
1643    fn scope_drop_cancels_tasks() {
1644        let mut executor = TestExecutor::new();
1645        let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1646        let mut task = pin!(scope.compute(async { 1 }));
1647        drop(scope);
1648        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1649    }
1650
1651    #[test]
1652    fn tasks_do_not_spawn_on_cancelled_scopes() {
1653        let mut executor = TestExecutor::new();
1654        let scope =
1655            executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1656        let handle = scope.to_handle();
1657        let mut cancel = pin!(scope.cancel());
1658        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1659        let mut task = pin!(handle.compute(async { 1 }));
1660        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1661    }
1662
1663    #[test]
1664    fn tasks_do_not_spawn_on_closed_empty_scopes() {
1665        let mut executor = TestExecutor::new();
1666        let scope =
1667            executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1668        let handle = scope.to_handle();
1669        let mut close = pin!(scope.cancel());
1670        assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1671        let mut task = pin!(handle.compute(async { 1 }));
1672        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1673    }
1674
1675    #[test]
1676    fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
1677        let mut executor = TestExecutor::new();
1678        let scope = executor.global_scope().new_child();
1679        let handle = scope.to_handle();
1680        handle.spawn(pending());
1681        let mut close = pin!(scope.close());
1682        assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1683        let mut task = pin!(handle.compute(async { 1 }));
1684        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1685    }
1686
1687    #[test]
1688    fn spawn_works_on_child_and_grandchild() {
1689        let mut executor = TestExecutor::new();
1690        let scope = executor.global_scope().new_child();
1691        let child = scope.new_child();
1692        let grandchild = child.new_child();
1693        let mut child_task = pin!(child.compute(async { 1 }));
1694        let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1695        assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Ready(1));
1696        assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Ready(1));
1697    }
1698
1699    #[test]
1700    fn spawn_drop_cancels_child_and_grandchild_tasks() {
1701        let mut executor = TestExecutor::new();
1702        let scope = executor.global_scope().new_child();
1703        let child = scope.new_child();
1704        let grandchild = child.new_child();
1705        let mut child_task = pin!(child.compute(async { 1 }));
1706        let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1707        drop(scope);
1708        assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Pending);
1709        assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Pending);
1710    }
1711
1712    #[test]
1713    fn completed_tasks_are_cleaned_up_after_cancel() {
1714        let mut executor = TestExecutor::new();
1715        let scope = executor.global_scope().new_child();
1716
1717        let task1 = scope.spawn(pending::<()>());
1718        let task2 = scope.spawn(async {});
1719        assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1720        assert_eq!(scope.lock().all_tasks().len(), 1);
1721
1722        // Running the executor after cancelling the task isn't currently
1723        // necessary, but we might decide to do async cleanup in the future.
1724        assert_eq!(task1.abort().now_or_never(), None);
1725        assert_eq!(task2.abort().now_or_never(), Some(Some(())));
1726
1727        assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1728        assert_eq!(scope.lock().all_tasks().len(), 0);
1729        assert!(scope.lock().results.is_empty());
1730    }
1731
1732    #[test]
1733    fn join_emtpy_scope() {
1734        let mut executor = TestExecutor::new();
1735        let scope = executor.global_scope().new_child();
1736        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1737    }
1738
1739    #[test]
1740    fn task_handle_preserves_access_to_result_after_join_begins() {
1741        let mut executor = TestExecutor::new();
1742        let scope = executor.global_scope().new_child();
1743        let mut task = scope.compute(async { 1 });
1744        scope.spawn(async {});
1745        let task2 = scope.spawn(pending::<()>());
1746        // Fuse to stay agnostic as to whether the join completes before or
1747        // after awaiting the task handle.
1748        let mut join = pin!(scope.join().fuse());
1749        let _ = executor.run_until_stalled(&mut join);
1750        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1751        drop(task2.abort());
1752        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1753    }
1754
1755    #[test]
1756    fn join_blocks_until_task_is_cancelled() {
1757        // Scope with one outstanding task handle and one cancelled task.
1758        // The scope is not complete until the outstanding task handle is cancelled.
1759        let mut executor = TestExecutor::new();
1760        let scope = executor.global_scope().new_child();
1761        let outstanding_task = scope.spawn(pending::<()>());
1762        let cancelled_task = scope.spawn(pending::<()>());
1763        assert_eq!(
1764            executor.run_until_stalled(&mut pin!(cancelled_task.abort())),
1765            Poll::Ready(None)
1766        );
1767        let mut join = pin!(scope.join());
1768        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1769        assert_eq!(
1770            executor.run_until_stalled(&mut pin!(outstanding_task.abort())),
1771            Poll::Ready(None)
1772        );
1773        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1774    }
1775
1776    #[test]
1777    fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1778        let mut executor = TestExecutor::new();
1779        let scope = executor.global_scope().new_child();
1780        // The default is to detach.
1781        scope.spawn(pending::<()>());
1782        let mut join = pin!(scope.join());
1783        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1784        let mut cancel = pin!(join.cancel());
1785        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1786    }
1787
1788    #[test]
1789    fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1790        let mut executor = TestExecutor::new();
1791        let scope = executor.global_scope().new_child();
1792        // The default is to detach.
1793        scope.spawn(pending::<()>());
1794        let mut close = pin!(scope.close());
1795        assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1796        let mut cancel = pin!(close.cancel());
1797        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1798    }
1799
1800    #[test]
1801    fn join_scope_blocks_until_spawned_task_completes() {
1802        let mut executor = TestExecutor::new();
1803        let scope = executor.global_scope().new_child();
1804        let remote = RemoteControlFuture::new();
1805        let mut task = scope.spawn(remote.as_future());
1806        let mut scope_join = pin!(scope.join());
1807        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1808        remote.resolve();
1809        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1810        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1811    }
1812
1813    #[test]
1814    fn close_scope_blocks_until_spawned_task_completes() {
1815        let mut executor = TestExecutor::new();
1816        let scope = executor.global_scope().new_child();
1817        let remote = RemoteControlFuture::new();
1818        let mut task = scope.spawn(remote.as_future());
1819        let mut scope_close = pin!(scope.close());
1820        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Pending);
1821        remote.resolve();
1822        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1823        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1824    }
1825
1826    #[test]
1827    fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
1828        let mut executor = TestExecutor::new();
1829        let scope = executor.global_scope().new_child();
1830        let child = scope.new_child();
1831        let remote = RemoteControlFuture::new();
1832        child.spawn(remote.as_future());
1833        let mut scope_join = pin!(scope.join());
1834        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1835        assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1836        child.detach();
1837        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1838        remote.resolve();
1839        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1840    }
1841
1842    #[test]
1843    fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
1844        let mut executor = TestExecutor::new();
1845        let scope = executor.global_scope().new_child();
1846        let remote = RemoteControlFuture::new();
1847        {
1848            let remote = remote.clone();
1849            scope.spawn(async move {
1850                let child = Scope::new_with_name("child");
1851                child.spawn(async move {
1852                    Scope::current().spawn(remote.as_future());
1853                });
1854                child.detach();
1855            });
1856        }
1857        let mut scope_join = pin!(scope.join());
1858        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1859        remote.resolve();
1860        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1861    }
1862
1863    #[test]
1864    fn join_scope_blocks_when_blocked_child_is_detached() {
1865        let mut executor = TestExecutor::new();
1866        let scope = executor.global_scope().new_child();
1867        let child = scope.new_child();
1868        child.spawn(pending());
1869        let mut scope_join = pin!(scope.join());
1870        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1871        assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1872        child.detach();
1873        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1874    }
1875
1876    #[test]
1877    fn join_scope_completes_when_blocked_child_is_cancelled() {
1878        let mut executor = TestExecutor::new();
1879        let scope = executor.global_scope().new_child();
1880        let child = scope.new_child();
1881        child.spawn(pending());
1882        let mut scope_join = pin!(scope.join());
1883        {
1884            let mut child_join = pin!(child.join());
1885            assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1886            assert_eq!(executor.run_until_stalled(&mut child_join), Poll::Pending);
1887        }
1888        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1889    }
1890
1891    #[test]
1892    fn detached_scope_can_spawn() {
1893        let mut executor = TestExecutor::new();
1894        let scope = executor.global_scope().new_child();
1895        let handle = scope.to_handle();
1896        scope.detach();
1897        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1898    }
1899
1900    #[test]
1901    fn dropped_scope_cannot_spawn() {
1902        let mut executor = TestExecutor::new();
1903        let scope = executor.global_scope().new_child();
1904        let handle = scope.to_handle();
1905        drop(scope);
1906        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1907    }
1908
1909    #[test]
1910    fn dropped_scope_with_running_task_cannot_spawn() {
1911        let mut executor = TestExecutor::new();
1912        let scope = executor.global_scope().new_child();
1913        let handle = scope.to_handle();
1914        let _running_task = handle.spawn(pending::<()>());
1915        drop(scope);
1916        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1917    }
1918
1919    #[test]
1920    fn joined_scope_cannot_spawn() {
1921        let mut executor = TestExecutor::new();
1922        let scope = executor.global_scope().new_child();
1923        let handle = scope.to_handle();
1924        let mut scope_join = pin!(scope.join());
1925        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1926        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1927    }
1928
1929    #[test]
1930    fn joining_scope_with_running_task_can_spawn() {
1931        let mut executor = TestExecutor::new();
1932        let scope = executor.global_scope().new_child();
1933        let handle = scope.to_handle();
1934        let _running_task = handle.spawn(pending::<()>());
1935        let mut scope_join = pin!(scope.join());
1936        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1937        assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1938    }
1939
1940    #[test]
1941    fn joined_scope_child_cannot_spawn() {
1942        let mut executor = TestExecutor::new();
1943        let scope = executor.global_scope().new_child();
1944        let handle = scope.to_handle();
1945        let child_before_join = scope.new_child();
1946        assert_eq!(
1947            executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1948            Poll::Ready(1)
1949        );
1950        let mut scope_join = pin!(scope.join());
1951        assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1952        let child_after_join = handle.new_child();
1953        let grandchild_after_join = child_before_join.new_child();
1954        assert_eq!(
1955            executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1956            Poll::Pending
1957        );
1958        assert_eq!(
1959            executor.run_until_stalled(&mut child_after_join.compute(async { 1 })),
1960            Poll::Pending
1961        );
1962        assert_eq!(
1963            executor.run_until_stalled(&mut grandchild_after_join.compute(async { 1 })),
1964            Poll::Pending
1965        );
1966    }
1967
1968    #[test]
1969    fn closed_scope_child_cannot_spawn() {
1970        let mut executor = TestExecutor::new();
1971        let scope = executor.global_scope().new_child();
1972        let handle = scope.to_handle();
1973        let child_before_close = scope.new_child();
1974        assert_eq!(
1975            executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
1976            Poll::Ready(1)
1977        );
1978        let mut scope_close = pin!(scope.close());
1979        assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1980        let child_after_close = handle.new_child();
1981        let grandchild_after_close = child_before_close.new_child();
1982        assert_eq!(
1983            executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
1984            Poll::Pending
1985        );
1986        assert_eq!(
1987            executor.run_until_stalled(&mut child_after_close.compute(async { 1 })),
1988            Poll::Pending
1989        );
1990        assert_eq!(
1991            executor.run_until_stalled(&mut grandchild_after_close.compute(async { 1 })),
1992            Poll::Pending
1993        );
1994    }
1995
1996    #[test]
1997    fn can_join_child_first() {
1998        let mut executor = TestExecutor::new();
1999        let scope = executor.global_scope().new_child();
2000        let child = scope.new_child();
2001        assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2002        assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2003        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2004    }
2005
2006    #[test]
2007    fn can_join_parent_first() {
2008        let mut executor = TestExecutor::new();
2009        let scope = executor.global_scope().new_child();
2010        let child = scope.new_child();
2011        assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2012        assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2013        assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2014    }
2015
2016    #[test]
2017    fn task_in_parent_scope_can_join_child() {
2018        let mut executor = TestExecutor::new();
2019        let scope = executor.global_scope().new_child();
2020        let child = scope.new_child();
2021        let remote = RemoteControlFuture::new();
2022        child.spawn(remote.as_future());
2023        scope.spawn(async move { child.join().await });
2024        let mut join = pin!(scope.join());
2025        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2026        remote.resolve();
2027        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2028    }
2029
2030    #[test]
2031    fn join_completes_while_completed_task_handle_is_held() {
2032        let mut executor = TestExecutor::new();
2033        let scope = executor.global_scope().new_child();
2034        let mut task = scope.compute(async { 1 });
2035        scope.spawn(async {});
2036        let mut join = pin!(scope.join());
2037        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2038        assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
2039    }
2040
2041    #[test]
2042    fn cancel_completes_while_task_holds_handle() {
2043        let mut executor = TestExecutor::new();
2044        let scope = executor.global_scope().new_child();
2045        let handle = scope.to_handle();
2046        let mut task = scope.compute(async move {
2047            loop {
2048                pending::<()>().await; // never returns
2049                handle.spawn(async {});
2050            }
2051        });
2052
2053        // Join should not complete because the task never does.
2054        let mut join = pin!(scope.join());
2055        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2056
2057        let mut cancel = pin!(join.cancel());
2058        assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
2059        assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
2060    }
2061
2062    #[test]
2063    fn cancel_from_handle_inside_task() {
2064        let mut executor = TestExecutor::new();
2065        let scope = executor.global_scope().new_child();
2066        {
2067            // Spawn a task that never finishes until the scope is cancelled.
2068            scope.spawn(pending::<()>());
2069
2070            let mut no_tasks = pin!(scope.on_no_tasks());
2071            assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Pending);
2072
2073            let handle = scope.to_handle();
2074            scope.spawn(async move {
2075                handle.cancel().await;
2076                panic!("cancel() should never complete");
2077            });
2078
2079            assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Ready(()));
2080        }
2081        assert_eq!(scope.join().now_or_never(), Some(()));
2082    }
2083
2084    #[test]
2085    fn can_spawn_from_non_executor_thread() {
2086        let mut executor = TestExecutor::new();
2087        let scope = executor.global_scope().clone();
2088        let done = Arc::new(AtomicBool::new(false));
2089        let done_clone = done.clone();
2090        let _ = std::thread::spawn(move || {
2091            scope.spawn(async move {
2092                done_clone.store(true, Ordering::Relaxed);
2093            })
2094        })
2095        .join();
2096        let _ = executor.run_until_stalled(&mut pending::<()>());
2097        assert!(done.load(Ordering::Relaxed));
2098    }
2099
2100    #[test]
2101    fn scope_tree() {
2102        // A
2103        //  \
2104        //   B
2105        //  / \
2106        // C   D
2107        let mut executor = TestExecutor::new();
2108        let a = executor.global_scope().new_child();
2109        let b = a.new_child();
2110        let c = b.new_child();
2111        let d = b.new_child();
2112        let a_remote = RemoteControlFuture::new();
2113        let c_remote = RemoteControlFuture::new();
2114        let d_remote = RemoteControlFuture::new();
2115        a.spawn(a_remote.as_future());
2116        c.spawn(c_remote.as_future());
2117        d.spawn(d_remote.as_future());
2118        let mut a_join = pin!(a.join());
2119        let mut b_join = pin!(b.join());
2120        let mut d_join = pin!(d.join());
2121        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2122        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2123        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
2124        d_remote.resolve();
2125        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2126        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2127        assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
2128        c_remote.resolve();
2129        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2130        assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
2131        a_remote.resolve();
2132        assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
2133        let mut c_join = pin!(c.join());
2134        assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
2135    }
2136
2137    #[test]
2138    fn on_no_tasks() {
2139        let mut executor = TestExecutor::new();
2140        let scope = executor.global_scope().new_child();
2141        let _task1 = scope.spawn(std::future::ready(()));
2142        let task2 = scope.spawn(pending::<()>());
2143
2144        let mut on_no_tasks = pin!(scope.on_no_tasks());
2145
2146        assert!(executor.run_until_stalled(&mut on_no_tasks).is_pending());
2147
2148        drop(task2.abort());
2149
2150        let on_no_tasks2 = pin!(scope.on_no_tasks());
2151        let on_no_tasks3 = pin!(scope.on_no_tasks());
2152
2153        assert_matches!(
2154            executor.run_until_stalled(&mut join_all([on_no_tasks, on_no_tasks2, on_no_tasks3])),
2155            Poll::Ready(_)
2156        );
2157    }
2158
2159    #[test]
2160    fn wake_all() {
2161        let mut executor = TestExecutor::new();
2162        let scope = executor.global_scope().new_child();
2163
2164        let poll_count = Arc::new(AtomicU64::new(0));
2165
2166        struct PollCounter(Arc<AtomicU64>);
2167
2168        impl Future for PollCounter {
2169            type Output = ();
2170            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
2171                self.0.fetch_add(1, Ordering::Relaxed);
2172                Poll::Pending
2173            }
2174        }
2175
2176        scope.spawn(PollCounter(poll_count.clone()));
2177        scope.spawn(PollCounter(poll_count.clone()));
2178
2179        let _ = executor.run_until_stalled(&mut pending::<()>());
2180
2181        let mut start_count = poll_count.load(Ordering::Relaxed);
2182
2183        for _ in 0..2 {
2184            scope.wake_all();
2185            let _ = executor.run_until_stalled(&mut pending::<()>());
2186            assert_eq!(poll_count.load(Ordering::Relaxed), start_count + 2);
2187            start_count += 2;
2188        }
2189    }
2190
2191    #[test]
2192    fn on_no_tasks_race() {
2193        fn sleep_random() {
2194            use rand::Rng;
2195            std::thread::sleep(std::time::Duration::from_micros(
2196                rand::thread_rng().gen_range(0..10),
2197            ));
2198        }
2199        for _ in 0..2000 {
2200            let mut executor = SendExecutor::new(2);
2201            let scope = executor.root_scope().new_child();
2202            scope.spawn(async {
2203                sleep_random();
2204            });
2205            executor.run(async move {
2206                sleep_random();
2207                scope.on_no_tasks().await;
2208            });
2209        }
2210    }
2211
2212    async fn yield_to_executor() {
2213        let mut done = false;
2214        poll_fn(|cx| {
2215            if done {
2216                Poll::Ready(())
2217            } else {
2218                done = true;
2219                cx.waker().wake_by_ref();
2220                Poll::Pending
2221            }
2222        })
2223        .await;
2224    }
2225
2226    #[test]
2227    fn test_detach() {
2228        let mut e = LocalExecutor::new();
2229        e.run_singlethreaded(async {
2230            let counter = Arc::new(AtomicU32::new(0));
2231
2232            {
2233                let counter = counter.clone();
2234                Task::spawn(async move {
2235                    for _ in 0..5 {
2236                        yield_to_executor().await;
2237                        counter.fetch_add(1, Ordering::Relaxed);
2238                    }
2239                })
2240                .detach();
2241            }
2242
2243            while counter.load(Ordering::Relaxed) != 5 {
2244                yield_to_executor().await;
2245            }
2246        });
2247
2248        assert!(e.ehandle.root_scope.lock().results.is_empty());
2249    }
2250
2251    #[test]
2252    fn test_cancel() {
2253        let mut e = LocalExecutor::new();
2254        e.run_singlethreaded(async {
2255            let ref_count = Arc::new(());
2256            // First, just drop the task.
2257            {
2258                let ref_count = ref_count.clone();
2259                drop(Task::spawn(async move {
2260                    let _ref_count = ref_count;
2261                    let _: () = std::future::pending().await;
2262                }));
2263            }
2264
2265            while Arc::strong_count(&ref_count) != 1 {
2266                yield_to_executor().await;
2267            }
2268
2269            // Now try explicitly cancelling.
2270            let task = {
2271                let ref_count = ref_count.clone();
2272                Task::spawn(async move {
2273                    let _ref_count = ref_count;
2274                    let _: () = std::future::pending().await;
2275                })
2276            };
2277
2278            assert_eq!(task.abort().await, None);
2279            while Arc::strong_count(&ref_count) != 1 {
2280                yield_to_executor().await;
2281            }
2282
2283            // Now cancel a task that has already finished.
2284            let task = {
2285                let ref_count = ref_count.clone();
2286                Task::spawn(async move {
2287                    let _ref_count = ref_count;
2288                })
2289            };
2290
2291            // Wait for it to finish.
2292            while Arc::strong_count(&ref_count) != 1 {
2293                yield_to_executor().await;
2294            }
2295
2296            assert_eq!(task.abort().await, Some(()));
2297        });
2298
2299        assert!(e.ehandle.root_scope.lock().results.is_empty());
2300    }
2301
2302    #[test]
2303    fn test_cancel_waits() {
2304        let mut executor = SendExecutor::new(2);
2305        let running = Arc::new((Mutex::new(false), Condvar::new()));
2306        let task = {
2307            let running = running.clone();
2308            executor.root_scope().compute(async move {
2309                *running.0.lock() = true;
2310                running.1.notify_all();
2311                std::thread::sleep(std::time::Duration::from_millis(10));
2312                *running.0.lock() = false;
2313                "foo"
2314            })
2315        };
2316        executor.run(async move {
2317            {
2318                let mut guard = running.0.lock();
2319                while !*guard {
2320                    running.1.wait(&mut guard);
2321                }
2322            }
2323            assert_eq!(task.abort().await, Some("foo"));
2324            assert!(!*running.0.lock());
2325        });
2326    }
2327
2328    fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2329        let mut executor = SendExecutor::new(2);
2330        let running = Arc::new((Mutex::new(false), Condvar::new()));
2331        let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2332        let task = {
2333            let running = running.clone();
2334            let can_quit = can_quit.clone();
2335            executor.root_scope().compute(async move {
2336                *running.0.lock() = true;
2337                running.1.notify_all();
2338                {
2339                    let mut guard = can_quit.0.lock();
2340                    while !*guard {
2341                        can_quit.1.wait(&mut guard);
2342                    }
2343                }
2344                *running.0.lock() = false;
2345            })
2346        };
2347        executor.run(async move {
2348            {
2349                let mut guard = running.0.lock();
2350                while !*guard {
2351                    running.1.wait(&mut guard);
2352                }
2353            }
2354
2355            callback(task);
2356
2357            *can_quit.0.lock() = true;
2358            can_quit.1.notify_all();
2359
2360            let ehandle = EHandle::local();
2361            let scope = ehandle.global_scope();
2362
2363            // The only way of testing for this is to poll.
2364            while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2365                Timer::new(std::time::Duration::from_millis(1)).await;
2366            }
2367
2368            assert!(!*running.0.lock());
2369        });
2370    }
2371
2372    #[test]
2373    fn test_dropped_cancel_cleans_up() {
2374        test_clean_up(|task| {
2375            let abort_fut = std::pin::pin!(task.abort());
2376            let waker = futures::task::noop_waker();
2377            assert!(abort_fut.poll(&mut Context::from_waker(&waker)).is_pending());
2378        });
2379    }
2380
2381    #[test]
2382    fn test_dropped_task_cleans_up() {
2383        test_clean_up(|task| {
2384            std::mem::drop(task);
2385        });
2386    }
2387
2388    #[test]
2389    fn test_detach_cleans_up() {
2390        test_clean_up(|task| {
2391            task.detach();
2392        });
2393    }
2394
2395    #[test]
2396    fn test_scope_stream() {
2397        let mut executor = SendExecutor::new(2);
2398        executor.run(async move {
2399            let (stream, handle) = ScopeStream::new();
2400            handle.push(async { 1 });
2401            handle.push(async { 2 });
2402            stream.close();
2403            let results: HashSet<_> = stream.collect().await;
2404            assert_eq!(results, HashSet::from_iter([1, 2]));
2405        });
2406    }
2407
2408    #[test]
2409    fn test_scope_stream_wakes_properly() {
2410        let mut executor = SendExecutor::new(2);
2411        executor.run(async move {
2412            let (stream, handle) = ScopeStream::new();
2413            handle.push(async {
2414                Timer::new(Duration::from_millis(10)).await;
2415                1
2416            });
2417            handle.push(async {
2418                Timer::new(Duration::from_millis(10)).await;
2419                2
2420            });
2421            stream.close();
2422            let results: HashSet<_> = stream.collect().await;
2423            assert_eq!(results, HashSet::from_iter([1, 2]));
2424        });
2425    }
2426
2427    #[test]
2428    fn test_scope_stream_drops_spawned_tasks() {
2429        let mut executor = SendExecutor::new(2);
2430        executor.run(async move {
2431            let (stream, handle) = ScopeStream::new();
2432            handle.push(async { 1 });
2433            let _task = stream.compute(async { "foo" });
2434            stream.close();
2435            let results: HashSet<_> = stream.collect().await;
2436            assert_eq!(results, HashSet::from_iter([1]));
2437        });
2438    }
2439
2440    #[test]
2441    fn test_nested_scope_stream() {
2442        let mut executor = SendExecutor::new(2);
2443        executor.run(async move {
2444            let (mut stream, handle) = ScopeStream::new();
2445            handle.clone().push(async move {
2446                handle.clone().push(async move {
2447                    handle.clone().push(async move { 3 });
2448                    2
2449                });
2450                1
2451            });
2452            let mut results = HashSet::default();
2453            while let Some(item) = stream.next().await {
2454                results.insert(item);
2455                if results.len() == 3 {
2456                    stream.close();
2457                }
2458            }
2459            assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2460        });
2461    }
2462
2463    #[test]
2464    fn test_dropping_scope_stream_cancels_all_tasks() {
2465        let mut executor = SendExecutor::new(2);
2466        executor.run(async move {
2467            let (stream, handle) = ScopeStream::new();
2468            let (tx1, mut rx) = mpsc::unbounded::<()>();
2469            let tx2 = tx1.clone();
2470            handle.push(async move {
2471                let _tx1 = tx1;
2472                let () = pending().await;
2473            });
2474            handle.push(async move {
2475                let _tx2 = tx2;
2476                let () = pending().await;
2477            });
2478            drop(stream);
2479
2480            // This will wait forever if the tasks aren't cancelled.
2481            assert_eq!(rx.next().await, None);
2482        });
2483    }
2484
2485    #[test]
2486    fn test_scope_stream_collect() {
2487        let mut executor = SendExecutor::new(2);
2488        executor.run(async move {
2489            let stream: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
2490            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2491
2492            let stream: ScopeStream<_> =
2493                (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
2494            assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2495        });
2496    }
2497
2498    struct DropSignal(Arc<AtomicBool>);
2499
2500    impl Drop for DropSignal {
2501        fn drop(&mut self) {
2502            self.0.store(true, Ordering::SeqCst);
2503        }
2504    }
2505
2506    struct DropChecker(Arc<AtomicBool>);
2507
2508    impl DropChecker {
2509        fn new() -> (Self, DropSignal) {
2510            let inner = Arc::new(AtomicBool::new(false));
2511            (Self(inner.clone()), DropSignal(inner))
2512        }
2513
2514        fn is_dropped(&self) -> bool {
2515            self.0.load(Ordering::SeqCst)
2516        }
2517    }
2518
2519    #[test]
2520    fn child_finished_when_parent_pending() {
2521        let mut executor = LocalExecutor::new();
2522        executor.run_singlethreaded(async {
2523            let scope = Scope::new();
2524            let _guard = scope.active_guard().expect("acquire guard");
2525            let cancel = scope.to_handle().cancel();
2526            let child = scope.new_child();
2527            let (checker, signal) = DropChecker::new();
2528            child.spawn(async move {
2529                let _signal = signal;
2530                futures::future::pending::<()>().await
2531            });
2532            assert!(checker.is_dropped());
2533            assert!(child.active_guard().is_none());
2534            cancel.await;
2535        })
2536    }
2537
2538    #[test]
2539    fn guarded_scopes_observe_closed() {
2540        let mut executor = LocalExecutor::new();
2541        executor.run_singlethreaded(async {
2542            let scope = Scope::new();
2543            let handle = scope.to_handle();
2544            let _guard = scope.active_guard().expect("acquire guard");
2545            handle.close();
2546            let (checker, signal) = DropChecker::new();
2547            handle.spawn(async move {
2548                let _signal = signal;
2549                futures::future::pending::<()>().await
2550            });
2551            assert!(checker.is_dropped());
2552            let (checker, signal) = DropChecker::new();
2553            let cancel = handle.clone().cancel();
2554            handle.spawn(async move {
2555                let _signal = signal;
2556                futures::future::pending::<()>().await
2557            });
2558            assert!(checker.is_dropped());
2559            scope.join().await;
2560            cancel.await;
2561        })
2562    }
2563
2564    #[test]
2565    fn active_guard_holds_cancellation() {
2566        let mut executor = TestExecutor::new();
2567        let scope = executor.global_scope().new_child();
2568        let guard = scope.active_guard().expect("acquire guard");
2569        scope.spawn(futures::future::pending());
2570        let mut join = pin!(scope.cancel());
2571        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2572        drop(guard);
2573        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2574    }
2575
2576    #[test]
2577    fn child_guard_holds_parent_cancellation() {
2578        let mut executor = TestExecutor::new();
2579        let scope = executor.global_scope().new_child();
2580        let child = scope.new_child();
2581        let guard = child.active_guard().expect("acquire guard");
2582        scope.spawn(futures::future::pending());
2583        let mut join = pin!(scope.cancel());
2584        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2585        drop(guard);
2586        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2587    }
2588
2589    #[test]
2590    fn active_guard_on_cancel() {
2591        let mut executor = TestExecutor::new();
2592        let scope = executor.global_scope().new_child();
2593        let child1 = scope.new_child();
2594        let child2 = scope.new_child();
2595        let guard = child1.active_guard().expect("acquire guard");
2596        let guard_for_right_scope = guard.clone();
2597        let guard_for_wrong_scope = guard.clone();
2598        child1.spawn(async move { guard_for_right_scope.on_cancel().await });
2599        child2.spawn(async move {
2600            guard_for_wrong_scope.on_cancel().await;
2601        });
2602
2603        let handle = scope.to_handle();
2604        let mut join = pin!(scope.join());
2605        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2606        let cancel: Join<_> = handle.cancel();
2607        drop(cancel);
2608        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2609    }
2610
2611    #[test]
2612    fn abort_join() {
2613        let mut executor = TestExecutor::new();
2614        let scope = executor.global_scope().new_child();
2615        let child = scope.new_child();
2616        let _guard = child.active_guard().expect("acquire guard");
2617
2618        let (checker1, signal) = DropChecker::new();
2619        scope.spawn(async move {
2620            let _signal = signal;
2621            futures::future::pending::<()>().await
2622        });
2623        let (checker2, signal) = DropChecker::new();
2624        scope.spawn(async move {
2625            let _signal = signal;
2626            futures::future::pending::<()>().await
2627        });
2628
2629        let mut join = pin!(scope.cancel());
2630        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2631        assert!(!checker1.is_dropped());
2632        assert!(!checker2.is_dropped());
2633
2634        let mut join = join.abort();
2635        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2636        assert!(checker1.is_dropped());
2637        assert!(checker2.is_dropped());
2638    }
2639
2640    #[test]
2641    fn child_without_guard_aborts_immediately_on_cancel() {
2642        let mut executor = TestExecutor::new();
2643        let scope = executor.global_scope().new_child();
2644        let child = scope.new_child();
2645        let guard = scope.active_guard().expect("acquire guard");
2646
2647        let (checker_scope, signal) = DropChecker::new();
2648        scope.spawn(async move {
2649            let _signal = signal;
2650            futures::future::pending::<()>().await
2651        });
2652        let (checker_child, signal) = DropChecker::new();
2653        child.spawn(async move {
2654            let _signal = signal;
2655            futures::future::pending::<()>().await
2656        });
2657
2658        let mut join = pin!(scope.cancel());
2659        assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2660        assert!(!checker_scope.is_dropped());
2661        assert!(checker_child.is_dropped());
2662
2663        drop(guard);
2664        assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2665        assert!(checker_child.is_dropped());
2666    }
2667}