1use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::condition::{Condition, ConditionGuard, WakerEntry};
9use crate::EHandle;
10use fuchsia_sync::Mutex;
11use futures::Stream;
12use pin_project_lite::pin_project;
13use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
14use state::{JoinResult, ScopeState, ScopeWaker, Status, WakeVec};
15use std::any::Any;
16use std::borrow::Borrow;
17use std::collections::hash_map::Entry;
18use std::collections::hash_set;
19use std::future::{Future, IntoFuture};
20use std::marker::PhantomData;
21use std::mem::{self, ManuallyDrop};
22use std::ops::{Deref, DerefMut};
23use std::pin::Pin;
24use std::sync::{Arc, Weak};
25use std::task::{ready, Context, Poll, Waker};
26use std::{fmt, hash};
27
28#[must_use = "Scopes should be explicitly awaited or cancelled"]
73#[derive(Debug)]
74pub struct Scope {
75 inner: ScopeHandle,
77 }
79
80impl Default for Scope {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl Scope {
87 pub fn new() -> Scope {
96 ScopeHandle::with_current(|handle| handle.new_child())
97 }
98
99 pub fn new_with_name(name: impl Into<String>) -> Scope {
108 ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
109 }
110
111 pub fn current() -> ScopeHandle {
119 ScopeHandle::with_current(|handle| handle.clone())
120 }
121
122 pub fn global() -> ScopeHandle {
139 EHandle::local().global_scope().clone()
140 }
141
142 pub fn new_child(&self) -> Scope {
144 self.inner.new_child()
145 }
146
147 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
149 self.inner.new_child_with_name(name.into())
150 }
151
152 pub fn name(&self) -> &str {
154 &self.inner.inner.name
155 }
156
157 pub fn to_handle(&self) -> ScopeHandle {
165 self.inner.clone()
166 }
167
168 pub fn as_handle(&self) -> &ScopeHandle {
175 &self.inner
176 }
177
178 pub fn join(self) -> Join {
187 Join::new(self)
188 }
189
190 pub fn close(self) -> Join {
193 self.inner.close();
194 Join::new(self)
195 }
196
197 pub fn cancel(self) -> Join {
215 self.inner.cancel_all_tasks();
216 Join::new(self)
217 }
218
219 pub fn abort(self) -> impl Future<Output = ()> {
234 self.inner.abort_all_tasks();
235 Join::new(self)
236 }
237
238 pub fn detach(self) {
244 let this = ManuallyDrop::new(self);
247 mem::drop(unsafe { std::ptr::read(&this.inner) });
250 }
251}
252
253impl Drop for Scope {
256 fn drop(&mut self) {
257 self.inner.abort_all_tasks();
266 }
267}
268
269impl IntoFuture for Scope {
270 type Output = ();
271 type IntoFuture = Join;
272 fn into_future(self) -> Self::IntoFuture {
273 self.join()
274 }
275}
276
277impl Deref for Scope {
278 type Target = ScopeHandle;
279 fn deref(&self) -> &Self::Target {
280 &self.inner
281 }
282}
283
284impl Borrow<ScopeHandle> for Scope {
285 fn borrow(&self) -> &ScopeHandle {
286 self
287 }
288}
289
290pin_project! {
291 pub struct Join<S = Scope> {
303 scope: S,
304 #[pin]
305 waker_entry: WakerEntry<ScopeState>,
306 }
307}
308
309impl<S: Borrow<ScopeHandle>> Join<S> {
310 fn new(scope: S) -> Self {
311 let waker_entry = scope.borrow().inner.state.waker_entry();
312 Self { scope, waker_entry }
313 }
314
315 pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
320 self.scope.borrow().abort_all_tasks();
321 self
322 }
323
324 pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
329 self.scope.borrow().cancel_all_tasks();
330 self
331 }
332}
333
334impl<S> Future for Join<S>
335where
336 S: Borrow<ScopeHandle>,
337{
338 type Output = ();
339 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
340 let this = self.project();
341 let mut state = Borrow::borrow(&*this.scope).lock();
342 if state.has_tasks() {
343 state.add_waker(this.waker_entry, cx.waker().clone());
344 Poll::Pending
345 } else {
346 state.mark_finished();
347 Poll::Ready(())
348 }
349 }
350}
351
352pub trait Spawnable {
355 type Output;
357
358 fn into_task(self, scope: ScopeHandle) -> TaskHandle;
360}
361
362impl<F: Future + Send + 'static> Spawnable for F
363where
364 F::Output: Send + 'static,
365{
366 type Output = F::Output;
367
368 fn into_task(self, scope: ScopeHandle) -> TaskHandle {
369 scope.new_task(None, self)
370 }
371}
372
373#[derive(Clone)]
385pub struct ScopeHandle {
386 inner: Arc<ScopeInner>,
388 }
390
391impl ScopeHandle {
392 pub fn new_child(&self) -> Scope {
394 self.new_child_inner(String::new())
395 }
396
397 pub fn instrument_data(&self) -> Option<&(dyn Any + Send + Sync)> {
399 self.inner.instrument_data.as_deref()
400 }
401
402 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
404 self.new_child_inner(name.into())
405 }
406
407 fn new_child_inner(&self, name: String) -> Scope {
408 let mut state = self.lock();
409 let child = ScopeHandle {
410 inner: Arc::new(ScopeInner {
411 executor: self.inner.executor.clone(),
412 state: Condition::new(ScopeState::new_child(
413 self.clone(),
414 &state,
415 JoinResults::default().into(),
416 )),
417
418 instrument_data: self
419 .inner
420 .executor
421 .instrument
422 .as_ref()
423 .map(|value| value.scope_created(&name, Some(self))),
424 name,
425 }),
426 };
427 let weak = child.downgrade();
428 state.insert_child(weak);
429 Scope { inner: child }
430 }
431
432 pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
436 let task = future.into_task(self.clone());
437 let task_id = task.id();
438 self.insert_task(task, false);
439 JoinHandle::new(self.clone(), task_id)
440 }
441
442 pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
447 let task = self.new_local_task(None, future);
448 let id = task.id();
449 self.insert_task(task, false);
450 JoinHandle::new(self.clone(), id)
451 }
452
453 pub fn compute<T: Send + 'static>(
458 &self,
459 future: impl Spawnable<Output = T> + Send + 'static,
460 ) -> crate::Task<T> {
461 let task = future.into_task(self.clone());
462 let id = task.id();
463 self.insert_task(task, false);
464 JoinHandle::new(self.clone(), id).into()
465 }
466
467 pub fn compute_local<T: 'static>(
475 &self,
476 future: impl Future<Output = T> + 'static,
477 ) -> crate::Task<T> {
478 let task = self.new_local_task(None, future);
479 let id = task.id();
480 self.insert_task(task, false);
481 JoinHandle::new(self.clone(), id).into()
482 }
483
484 pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
485 ScopeHandle {
486 inner: Arc::new(ScopeInner {
487 state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
488 name: "root".to_string(),
489 instrument_data: executor
490 .instrument
491 .as_ref()
492 .map(|value| value.scope_created("root", None)),
493 executor,
494 }),
495 }
496 }
497
498 pub fn close(&self) {
506 self.lock().close();
507 }
508
509 pub fn cancel(self) -> Join<Self> {
514 self.cancel_all_tasks();
515 Join::new(self)
516 }
517
518 pub fn abort(self) -> impl Future<Output = ()> {
523 self.abort_all_tasks();
524 Join::new(self)
525 }
526
527 #[must_use]
539 pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
540 ScopeActiveGuard::new(self)
541 }
542
543 pub fn is_cancelled(&self) -> bool {
546 self.lock().status().is_cancelled()
547 }
548
549 pub async fn on_no_tasks(&self) {
556 self.inner
557 .state
558 .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
559 .await;
560 }
561
562 pub async fn on_no_tasks_and_guards(&self) {
565 self.inner
566 .state
567 .when(|state| {
568 if state.has_tasks() || state.guards() > 0 {
569 Poll::Pending
570 } else {
571 Poll::Ready(())
572 }
573 })
574 .await;
575 }
576
577 pub fn wake_all_with_active_guard(&self) {
579 self.lock().wake_all_with_active_guard();
580 }
581
582 pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
585 &self,
586 id: Option<usize>,
587 fut: Fut,
588 ) -> AtomicFutureHandle<'a>
589 where
590 Fut::Output: Send,
591 {
592 let id = id.unwrap_or_else(|| self.executor().next_task_id());
593 let mut task = AtomicFutureHandle::new(Some(self.clone()), id, fut);
594 if let Some(instrument) = &self.executor().instrument {
595 instrument.task_created(self, &mut task);
596 }
597 task
598 }
599
600 pub(crate) fn new_local_task<'a>(
603 &self,
604 id: Option<usize>,
605 fut: impl Future + 'a,
606 ) -> AtomicFutureHandle<'a> {
607 if !self.executor().is_local() {
609 panic!(
610 "Error: called `new_local_task` on multithreaded executor. \
611 Use `spawn` or a `LocalExecutor` instead."
612 );
613 }
614
615 let id = id.unwrap_or_else(|| self.executor().next_task_id());
616 unsafe {
619 let mut task = AtomicFutureHandle::new_local(Some(self.clone()), id, fut);
620 if let Some(instrument) = &self.executor().instrument {
621 instrument.task_created(self, &mut task);
622 }
623 task
624 }
625 }
626}
627
628impl fmt::Debug for ScopeHandle {
629 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
630 f.debug_struct("Scope").field("name", &self.inner.name).finish()
631 }
632}
633
634pub struct ScopeStream<R> {
644 inner: ScopeHandle,
645 stream: Arc<Mutex<ResultsStreamInner<R>>>,
646}
647
648impl<R: Send + 'static> ScopeStream<R> {
649 pub fn new() -> (Self, ScopeStreamHandle<R>) {
658 Self::new_inner(String::new())
659 }
660
661 pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
670 Self::new_inner(name.into())
671 }
672
673 fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
674 let this = ScopeHandle::with_current(|handle| {
675 let mut state = handle.lock();
676 let stream = Arc::default();
677 let child = ScopeHandle {
678 inner: Arc::new(ScopeInner {
679 executor: handle.executor().clone(),
680 state: Condition::new(ScopeState::new_child(
681 handle.clone(),
682 &state,
683 Box::new(ResultsStream { inner: Arc::clone(&stream) }),
684 )),
685 instrument_data: handle
686 .executor()
687 .instrument
688 .as_ref()
689 .map(|value| value.scope_created(&name, Some(handle))),
690 name,
691 }),
692 };
693 let weak = child.downgrade();
694 state.insert_child(weak);
695 ScopeStream { inner: child, stream }
696 });
697 let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
698 (this, handle)
699 }
700}
701
702impl<R> Drop for ScopeStream<R> {
703 fn drop(&mut self) {
704 self.inner.abort_all_tasks();
713 }
714}
715
716impl<R: Send + 'static> Stream for ScopeStream<R> {
717 type Item = R;
718
719 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
720 let mut stream_inner = self.stream.lock();
721 match stream_inner.results.pop() {
722 Some(result) => Poll::Ready(Some(result)),
723 None => {
724 drop(stream_inner);
727 let state = self.inner.lock();
728 let mut stream_inner = self.stream.lock();
729 match stream_inner.results.pop() {
730 Some(result) => Poll::Ready(Some(result)),
731 None => {
732 if state.has_tasks() {
733 stream_inner.waker = Some(cx.waker().clone());
734 Poll::Pending
735 } else {
736 Poll::Ready(None)
737 }
738 }
739 }
740 }
741 }
742 }
743}
744
745impl<R> Deref for ScopeStream<R> {
746 type Target = ScopeHandle;
747 fn deref(&self) -> &Self::Target {
748 &self.inner
749 }
750}
751
752impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
753 fn borrow(&self) -> &ScopeHandle {
754 self
755 }
756}
757
758impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
759 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
760 let (stream, handle) = ScopeStream::new();
761 for fut in iter {
762 handle.push(fut);
763 }
764 stream.close();
765 stream
766 }
767}
768
769#[derive(Clone)]
770pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
771
772impl<R: Send> ScopeStreamHandle<R> {
773 pub fn push(&self, future: impl Spawnable<Output = R>) {
774 self.0.insert_task(future.into_task(self.0.clone()), true);
775 }
776}
777
778#[derive(Debug)]
788#[must_use]
789pub struct ScopeActiveGuard(ScopeHandle);
790
791impl Deref for ScopeActiveGuard {
792 type Target = ScopeHandle;
793 fn deref(&self) -> &Self::Target {
794 &self.0
795 }
796}
797
798impl Drop for ScopeActiveGuard {
799 fn drop(&mut self) {
800 let Self(scope) = self;
801 scope.release_cancel_guard();
802 }
803}
804
805impl Clone for ScopeActiveGuard {
806 fn clone(&self) -> Self {
807 self.0.lock().acquire_cancel_guard(1);
808 Self(self.0.clone())
809 }
810}
811
812impl ScopeActiveGuard {
813 pub fn as_handle(&self) -> &ScopeHandle {
815 &self.0
816 }
817
818 pub fn to_handle(&self) -> ScopeHandle {
820 self.0.clone()
821 }
822
823 pub async fn on_cancel(&self) {
829 self.0
830 .inner
831 .state
832 .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
833 .await
834 }
835
836 fn new(scope: &ScopeHandle) -> Option<Self> {
837 if scope.lock().acquire_cancel_guard_if_not_finished() {
838 Some(Self(scope.clone()))
839 } else {
840 None
841 }
842 }
843}
844
845#[derive(Clone)]
851struct WeakScopeHandle {
852 inner: Weak<ScopeInner>,
853}
854
855impl WeakScopeHandle {
856 pub fn upgrade(&self) -> Option<ScopeHandle> {
858 self.inner.upgrade().map(|inner| ScopeHandle { inner })
859 }
860}
861
862impl hash::Hash for WeakScopeHandle {
863 fn hash<H: hash::Hasher>(&self, state: &mut H) {
864 Weak::as_ptr(&self.inner).hash(state);
865 }
866}
867
868impl PartialEq for WeakScopeHandle {
869 fn eq(&self, other: &Self) -> bool {
870 Weak::ptr_eq(&self.inner, &other.inner)
871 }
872}
873
874impl Eq for WeakScopeHandle {
875 }
878
879mod state {
882 use super::*;
883
884 pub struct ScopeState {
885 pub parent: Option<ScopeHandle>,
886 children: HashSet<WeakScopeHandle>,
888 all_tasks: HashSet<TaskHandle>,
889 subscopes_with_tasks: u32,
893 can_spawn: bool,
894 guards: u32,
895 status: Status,
896 pub results: Box<dyn Results>,
898 }
899
900 pub enum JoinResult {
901 Waker(Waker),
902 Result(TaskHandle),
903 }
904
905 #[repr(u8)] #[derive(Default, Debug, Clone, Copy)]
907 pub enum Status {
908 #[default]
909 Active,
911 PendingCancellation,
914 Finished,
917 }
918
919 impl Status {
920 pub fn is_cancelled(&self) -> bool {
922 match self {
923 Self::Active => false,
924 Self::PendingCancellation | Self::Finished => true,
925 }
926 }
927 }
928
929 impl ScopeState {
930 pub fn new_root(results: Box<impl Results>) -> Self {
931 Self {
932 parent: None,
933 children: Default::default(),
934 all_tasks: Default::default(),
935 subscopes_with_tasks: 0,
936 can_spawn: true,
937 guards: 0,
938 status: Default::default(),
939 results,
940 }
941 }
942
943 pub fn new_child(
944 parent_handle: ScopeHandle,
945 parent_state: &Self,
946 results: Box<impl Results>,
947 ) -> Self {
948 let (status, can_spawn) = match parent_state.status {
949 Status::Active => (Status::Active, parent_state.can_spawn),
950 Status::Finished | Status::PendingCancellation => (Status::Finished, false),
951 };
952 Self {
953 parent: Some(parent_handle),
954 children: Default::default(),
955 all_tasks: Default::default(),
956 subscopes_with_tasks: 0,
957 can_spawn,
958 guards: 0,
959 status,
960 results,
961 }
962 }
963 }
964
965 impl ScopeState {
966 pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
967 &self.all_tasks
968 }
969
970 pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
973 if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
974 return Some(task);
975 }
976 if self.all_tasks.is_empty() && !self.register_first_task() {
977 return Some(task);
978 }
979 task.wake();
980 assert!(self.all_tasks.insert(task));
981 None
982 }
983
984 pub fn children(&self) -> &HashSet<WeakScopeHandle> {
985 &self.children
986 }
987
988 pub fn insert_child(&mut self, child: WeakScopeHandle) {
989 self.children.insert(child);
990 }
991
992 pub fn remove_child(&mut self, child: &PtrKey) {
993 let found = self.children.remove(child);
994 assert!(found || self.children.is_empty());
997 }
998
999 pub fn status(&self) -> Status {
1000 self.status
1001 }
1002
1003 pub fn guards(&self) -> u32 {
1004 self.guards
1005 }
1006
1007 pub fn close(&mut self) {
1008 self.can_spawn = false;
1009 }
1010
1011 pub fn mark_finished(&mut self) {
1012 self.can_spawn = false;
1013 self.status = Status::Finished;
1014 }
1015
1016 pub fn has_tasks(&self) -> bool {
1017 self.subscopes_with_tasks > 0
1018 }
1019
1020 pub fn wake_all_with_active_guard(&mut self) {
1021 let mut count = 0;
1022 for task in &self.all_tasks {
1023 if task.wake_with_active_guard() {
1024 count += 1;
1025 }
1026 }
1027 self.acquire_cancel_guard(count);
1028 }
1029
1030 pub fn abort_tasks_and_mark_finished(&mut self) {
1031 for task in self.all_tasks() {
1032 if task.abort() {
1033 task.scope().executor().ready_tasks.push(task.clone());
1034 }
1035 }
1038 self.mark_finished();
1039 }
1040
1041 pub fn wake_wakers_and_mark_pending(
1042 this: &mut ConditionGuard<'_, ScopeState>,
1043 wakers: &mut Vec<Waker>,
1044 ) {
1045 wakers.extend(this.drain_wakers());
1046 this.status = Status::PendingCancellation;
1047 }
1048
1049 #[must_use]
1053 fn register_first_task(&mut self) -> bool {
1054 if !self.can_spawn {
1055 return false;
1056 }
1057 let can_spawn = match &self.parent {
1058 Some(parent) => {
1059 self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
1062 }
1063 None => true,
1064 };
1065 if can_spawn {
1066 self.subscopes_with_tasks += 1;
1067 debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
1068 };
1069 can_spawn
1070 }
1071
1072 fn on_last_task_removed(
1073 this: &mut ConditionGuard<'_, ScopeState>,
1074 num_wakers_hint: usize,
1075 wakers: &mut Vec<Waker>,
1076 ) {
1077 debug_assert!(this.subscopes_with_tasks > 0);
1078 this.subscopes_with_tasks -= 1;
1079 if this.subscopes_with_tasks > 0 {
1080 wakers.reserve(num_wakers_hint);
1081 return;
1082 }
1083
1084 match &this.parent {
1085 Some(parent) => {
1086 Self::on_last_task_removed(
1087 &mut parent.lock(),
1088 num_wakers_hint + this.waker_count(),
1089 wakers,
1090 );
1091 }
1092 None => wakers.reserve(num_wakers_hint),
1093 };
1094 wakers.extend(this.drain_wakers());
1095 }
1096
1097 pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
1101 match self.status {
1102 Status::Active | Status::PendingCancellation => {
1103 self.acquire_cancel_guard(1);
1104 true
1105 }
1106 Status::Finished => false,
1107 }
1108 }
1109
1110 pub fn acquire_cancel_guard(&mut self, count: u32) {
1111 if count == 0 {
1112 return;
1113 }
1114 if self.guards == 0 {
1115 if let Some(parent) = self.parent.as_ref() {
1116 parent.acquire_cancel_guard();
1117 }
1118 }
1119 self.guards += count;
1120 }
1121
1122 pub fn release_cancel_guard(
1123 this: &mut ConditionGuard<'_, Self>,
1124 wake_vec: &mut WakeVec,
1125 mut waker_count: usize,
1126 ) {
1127 this.guards = this.guards.checked_sub(1).expect("released non-acquired guard");
1128 if this.guards == 0 {
1129 waker_count += this.waker_count();
1130 this.on_zero_guards(wake_vec, waker_count);
1131 wake_vec.0.extend(this.drain_wakers())
1132 } else {
1133 wake_vec.0.reserve_exact(waker_count);
1134 }
1135 }
1136
1137 fn on_zero_guards(&mut self, wake_vec: &mut WakeVec, waker_count: usize) {
1138 match self.status {
1139 Status::Active => {}
1140 Status::PendingCancellation => {
1141 self.abort_tasks_and_mark_finished();
1142 }
1143 Status::Finished => {}
1146 }
1147 if let Some(parent) = &self.parent {
1148 ScopeState::release_cancel_guard(&mut parent.lock(), wake_vec, waker_count);
1149 }
1150 }
1151 }
1152
1153 #[derive(Default)]
1154 pub struct WakeVec(Vec<Waker>);
1155
1156 impl Drop for WakeVec {
1157 fn drop(&mut self) {
1158 for waker in self.0.drain(..) {
1159 waker.wake();
1160 }
1161 }
1162 }
1163
1164 pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1166
1167 impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1168 fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1169 Self(value, WakeVec::default())
1170 }
1171 }
1172
1173 impl ScopeWaker<'_> {
1174 pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
1175 let task = self.all_tasks.take(&id);
1176 if task.is_some() {
1177 self.on_task_removed(0);
1178 }
1179 task
1180 }
1181
1182 pub fn task_did_finish(&mut self, id: usize) {
1183 if let Some(task) = self.all_tasks.take(&id) {
1184 self.on_task_removed(1);
1185 if !task.is_detached() {
1186 let maybe_waker = self.results.task_did_finish(task);
1187 self.1 .0.extend(maybe_waker);
1188 }
1189 }
1190 }
1191
1192 pub fn set_closed_and_drain(
1193 &mut self,
1194 ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
1195 self.close();
1196 let all_tasks = std::mem::take(&mut self.all_tasks);
1197 let results = self.results.take();
1198 if !all_tasks.is_empty() {
1199 self.on_task_removed(0)
1200 }
1201 let children = self.children.drain();
1202 (all_tasks, results, children)
1203 }
1204
1205 fn on_task_removed(&mut self, num_wakers_hint: usize) {
1206 if self.all_tasks.is_empty() {
1207 ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1 .0)
1208 }
1209 }
1210
1211 pub fn wake_wakers_and_mark_pending(&mut self) {
1212 let Self(state, wakers) = self;
1213 ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
1214 }
1215 }
1216
1217 impl<'a> Deref for ScopeWaker<'a> {
1218 type Target = ConditionGuard<'a, ScopeState>;
1219
1220 fn deref(&self) -> &Self::Target {
1221 &self.0
1222 }
1223 }
1224
1225 impl DerefMut for ScopeWaker<'_> {
1226 fn deref_mut(&mut self) -> &mut Self::Target {
1227 &mut self.0
1228 }
1229 }
1230}
1231
1232struct ScopeInner {
1233 executor: Arc<Executor>,
1234 state: Condition<ScopeState>,
1235 name: String,
1236 instrument_data: Option<Box<dyn Any + Send + Sync>>,
1237}
1238
1239impl Drop for ScopeInner {
1240 fn drop(&mut self) {
1241 let key = unsafe { &*(self as *const _ as *const PtrKey) };
1246 let state = self.state.lock();
1247 if let Some(parent) = &state.parent {
1248 let mut wake_vec = WakeVec::default();
1249 let mut parent_state = parent.lock();
1250 if state.guards() != 0 {
1251 ScopeState::release_cancel_guard(&mut parent_state, &mut wake_vec, 0);
1252 }
1253 parent_state.remove_child(key);
1254 }
1255 }
1256}
1257
1258impl ScopeHandle {
1259 fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
1260 super::common::TaskHandle::with_current(|task| match task {
1261 Some(task) => f(task.scope()),
1262 None => f(EHandle::local().global_scope()),
1263 })
1264 }
1265
1266 fn lock(&self) -> ConditionGuard<'_, ScopeState> {
1267 self.inner.state.lock()
1268 }
1269
1270 fn downgrade(&self) -> WeakScopeHandle {
1271 WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
1272 }
1273
1274 #[inline(always)]
1275 pub(crate) fn executor(&self) -> &Arc<Executor> {
1276 &self.inner.executor
1277 }
1278
1279 pub(crate) fn detach(&self, task_id: usize) {
1281 let _maybe_task = {
1282 let mut state = self.lock();
1283 if let Some(task) = state.all_tasks().get(&task_id) {
1284 task.detach();
1285 }
1286 state.results.detach(task_id)
1287 };
1288 }
1289
1290 pub(crate) unsafe fn abort_task<R>(&self, task_id: usize) -> Option<R> {
1296 let mut state = self.lock();
1297 if let Some(task) = state.results.detach(task_id) {
1298 drop(state);
1299 return task.take_result();
1300 }
1301 state.all_tasks().get(&task_id).and_then(|task| {
1302 if task.abort() {
1303 self.inner.executor.ready_tasks.push(task.clone());
1304 }
1305 task.take_result()
1306 })
1307 }
1308
1309 pub(crate) fn abort_and_detach(&self, task_id: usize) {
1311 let _tasks = {
1312 let mut state = ScopeWaker::from(self.lock());
1313 let maybe_task1 = state.results.detach(task_id);
1314 let mut maybe_task2 = None;
1315 if let Some(task) = state.all_tasks().get(&task_id) {
1316 match task.abort_and_detach() {
1317 AbortAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
1318 AbortAndDetachResult::AddToRunQueue => {
1319 self.inner.executor.ready_tasks.push(task.clone());
1320 }
1321 AbortAndDetachResult::Pending => {}
1322 }
1323 }
1324 (maybe_task1, maybe_task2)
1325 };
1326 }
1327
1328 pub(crate) unsafe fn poll_join_result<R>(
1334 &self,
1335 task_id: usize,
1336 cx: &mut Context<'_>,
1337 ) -> Poll<R> {
1338 let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1339 match task.take_result() {
1340 Some(result) => Poll::Ready(result),
1341 None => {
1342 Poll::Pending
1344 }
1345 }
1346 }
1347
1348 pub(crate) unsafe fn poll_aborted<R>(
1350 &self,
1351 task_id: usize,
1352 cx: &mut Context<'_>,
1353 ) -> Poll<Option<R>> {
1354 let task = self.lock().results.poll_join_result(task_id, cx);
1355 task.map(|task| task.take_result())
1356 }
1357
1358 pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
1359 let returned_task = self.lock().insert_task(task, for_stream);
1360 returned_task.is_none()
1361 }
1362
1363 pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
1374 let mut state = ScopeWaker::from(self.lock());
1375 let task = state.take_task(task_id);
1376 if let Some(task) = task {
1377 task.drop_future_unchecked();
1378 }
1379 }
1380
1381 pub(super) fn task_did_finish(&self, id: usize) {
1382 let mut state = ScopeWaker::from(self.lock());
1383 state.task_did_finish(id);
1384 }
1385
1386 fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
1389 let mut scopes = vec![self.clone()];
1390 while let Some(scope) = scopes.pop() {
1391 let mut scope_waker = ScopeWaker::from(scope.lock());
1392 if callback(&mut scope_waker) {
1393 scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
1394 }
1395 }
1396 }
1397
1398 fn acquire_cancel_guard(&self) {
1399 self.lock().acquire_cancel_guard(1)
1400 }
1401
1402 pub(crate) fn release_cancel_guard(&self) {
1403 let mut wake_vec = WakeVec::default();
1404 ScopeState::release_cancel_guard(&mut self.lock(), &mut wake_vec, 0);
1405 }
1406
1407 fn cancel_all_tasks(&self) {
1409 self.visit_scopes_locked(|state| {
1410 match state.status() {
1411 Status::Active => {
1412 if state.guards() == 0 {
1413 state.abort_tasks_and_mark_finished();
1414 } else {
1415 state.wake_wakers_and_mark_pending();
1416 }
1417 true
1418 }
1419 Status::PendingCancellation => {
1420 true
1424 }
1425 Status::Finished => {
1426 false
1428 }
1429 }
1430 });
1431 }
1432
1433 fn abort_all_tasks(&self) {
1435 self.visit_scopes_locked(|state| match state.status() {
1436 Status::Active | Status::PendingCancellation => {
1437 state.abort_tasks_and_mark_finished();
1438 true
1439 }
1440 Status::Finished => false,
1441 });
1442 }
1443
1444 pub(super) fn drop_all_tasks(&self) {
1451 let mut scopes = vec![self.clone()];
1452 while let Some(scope) = scopes.pop() {
1453 let (tasks, join_results) = {
1454 let mut state = ScopeWaker::from(scope.lock());
1455 let (tasks, join_results, children) = state.set_closed_and_drain();
1456 scopes.extend(children.filter_map(|child| child.upgrade()));
1457 (tasks, join_results)
1458 };
1459 for task in tasks {
1461 task.try_drop().expect("Expected drop to succeed");
1462 }
1463 std::mem::drop(join_results);
1464 }
1465 }
1466}
1467
1468#[repr(transparent)]
1470struct PtrKey;
1471
1472impl Borrow<PtrKey> for WeakScopeHandle {
1473 fn borrow(&self) -> &PtrKey {
1474 unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1476 }
1477}
1478
1479impl PartialEq for PtrKey {
1480 fn eq(&self, other: &Self) -> bool {
1481 std::ptr::eq(self, other)
1482 }
1483}
1484
1485impl Eq for PtrKey {}
1486
1487impl hash::Hash for PtrKey {
1488 fn hash<H: hash::Hasher>(&self, state: &mut H) {
1489 (self as *const PtrKey).hash(state);
1490 }
1491}
1492
1493#[derive(Default)]
1494struct JoinResults(HashMap<usize, JoinResult>);
1495
1496trait Results: Send + Sync + 'static {
1497 fn can_spawn(&self) -> bool;
1499
1500 fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;
1502
1503 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;
1505
1506 fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;
1508
1509 fn take(&mut self) -> Box<dyn Any>;
1511
1512 #[cfg(test)]
1514 fn is_empty(&self) -> bool;
1515}
1516
1517impl Results for JoinResults {
1518 fn can_spawn(&self) -> bool {
1519 true
1520 }
1521
1522 fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1523 match self.0.entry(task_id) {
1524 Entry::Occupied(mut o) => match o.get_mut() {
1525 JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1526 JoinResult::Result(_) => {
1527 let JoinResult::Result(task) = o.remove() else { unreachable!() };
1528 return Poll::Ready(task);
1529 }
1530 },
1531 Entry::Vacant(v) => {
1532 v.insert(JoinResult::Waker(cx.waker().clone()));
1533 }
1534 }
1535 Poll::Pending
1536 }
1537
1538 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1539 match self.0.entry(task.id()) {
1540 Entry::Occupied(mut o) => {
1541 let JoinResult::Waker(waker) =
1542 std::mem::replace(o.get_mut(), JoinResult::Result(task))
1543 else {
1544 unreachable!()
1548 };
1549 Some(waker)
1550 }
1551 Entry::Vacant(v) => {
1552 v.insert(JoinResult::Result(task));
1553 None
1554 }
1555 }
1556 }
1557
1558 fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1559 match self.0.remove(&task_id) {
1560 Some(JoinResult::Result(task)) => Some(task),
1561 _ => None,
1562 }
1563 }
1564
1565 fn take(&mut self) -> Box<dyn Any> {
1566 Box::new(Self(std::mem::take(&mut self.0)))
1567 }
1568
1569 #[cfg(test)]
1570 fn is_empty(&self) -> bool {
1571 self.0.is_empty()
1572 }
1573}
1574
1575#[derive(Default)]
1576struct ResultsStream<R> {
1577 inner: Arc<Mutex<ResultsStreamInner<R>>>,
1578}
1579
1580struct ResultsStreamInner<R> {
1581 results: Vec<R>,
1582 waker: Option<Waker>,
1583}
1584
1585impl<R> Default for ResultsStreamInner<R> {
1586 fn default() -> Self {
1587 Self { results: Vec::new(), waker: None }
1588 }
1589}
1590
1591impl<R: Send + 'static> Results for ResultsStream<R> {
1592 fn can_spawn(&self) -> bool {
1593 false
1594 }
1595
1596 fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1597 Poll::Pending
1598 }
1599
1600 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1601 let mut inner = self.inner.lock();
1602 inner.results.extend(unsafe { task.take_result() });
1605 inner.waker.take()
1606 }
1607
1608 fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1609 None
1610 }
1611
1612 fn take(&mut self) -> Box<dyn Any> {
1613 Box::new(std::mem::take(&mut self.inner.lock().results))
1614 }
1615
1616 #[cfg(test)]
1617 fn is_empty(&self) -> bool {
1618 false
1619 }
1620}
1621
1622#[cfg(test)]
1623mod tests {
1624 use super::*;
1628 use crate::{
1629 yield_now, EHandle, LocalExecutor, SendExecutorBuilder, SpawnableFuture, Task,
1630 TestExecutor, Timer,
1631 };
1632 use fuchsia_sync::{Condvar, Mutex};
1633 use futures::channel::mpsc;
1634 use futures::{FutureExt, StreamExt};
1635 use std::future::pending;
1636 use std::pin::{pin, Pin};
1637 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1638 use std::sync::Arc;
1639 use std::task::{Context, Poll};
1640 use std::time::Duration;
1641
1642 #[derive(Default)]
1643 struct RemoteControlFuture(Mutex<RCFState>);
1644 #[derive(Default)]
1645 struct RCFState {
1646 resolved: bool,
1647 waker: Option<Waker>,
1648 }
1649
1650 impl Future for &RemoteControlFuture {
1651 type Output = ();
1652 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1653 let mut this = self.0.lock();
1654 if this.resolved {
1655 Poll::Ready(())
1656 } else {
1657 this.waker.replace(cx.waker().clone());
1658 Poll::Pending
1659 }
1660 }
1661 }
1662
1663 impl RemoteControlFuture {
1664 fn new() -> Arc<Self> {
1665 Arc::new(Default::default())
1666 }
1667
1668 fn resolve(&self) {
1669 let mut this = self.0.lock();
1670 this.resolved = true;
1671 if let Some(waker) = this.waker.take() {
1672 waker.wake();
1673 }
1674 }
1675
1676 fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> {
1677 let this = Arc::clone(self);
1678 #[allow(clippy::redundant_async_block)] async move {
1680 (&*this).await
1681 }
1682 }
1683 }
1684
1685 #[test]
1686 fn compute_works_on_root_scope() {
1687 let mut executor = TestExecutor::new();
1688 let scope = executor.global_scope();
1689 let mut task = pin!(scope.compute(async { 1 }));
1690 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1691 }
1692
1693 #[test]
1694 fn compute_works_on_new_child() {
1695 let mut executor = TestExecutor::new();
1696 let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1697 let mut task = pin!(scope.compute(async { 1 }));
1698 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1699 }
1700
1701 #[test]
1702 fn scope_drop_cancels_tasks() {
1703 let mut executor = TestExecutor::new();
1704 let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1705 let mut task = pin!(scope.compute(async { 1 }));
1706 drop(scope);
1707 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1708 }
1709
1710 #[test]
1711 fn tasks_do_not_spawn_on_cancelled_scopes() {
1712 let mut executor = TestExecutor::new();
1713 let scope =
1714 executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1715 let handle = scope.to_handle();
1716 let mut cancel = pin!(scope.cancel());
1717 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1718 let mut task = pin!(handle.compute(async { 1 }));
1719 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1720 }
1721
1722 #[test]
1723 fn tasks_do_not_spawn_on_closed_empty_scopes() {
1724 let mut executor = TestExecutor::new();
1725 let scope =
1726 executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1727 let handle = scope.to_handle();
1728 let mut close = pin!(scope.cancel());
1729 assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1730 let mut task = pin!(handle.compute(async { 1 }));
1731 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1732 }
1733
1734 #[test]
1735 fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
1736 let mut executor = TestExecutor::new();
1737 let scope = executor.global_scope().new_child();
1738 let handle = scope.to_handle();
1739 handle.spawn(pending());
1740 let mut close = pin!(scope.close());
1741 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1742 let mut task = pin!(handle.compute(async { 1 }));
1743 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1744 }
1745
1746 #[test]
1747 fn spawn_works_on_child_and_grandchild() {
1748 let mut executor = TestExecutor::new();
1749 let scope = executor.global_scope().new_child();
1750 let child = scope.new_child();
1751 let grandchild = child.new_child();
1752 let mut child_task = pin!(child.compute(async { 1 }));
1753 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1754 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Ready(1));
1755 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Ready(1));
1756 }
1757
1758 #[test]
1759 fn spawn_drop_cancels_child_and_grandchild_tasks() {
1760 let mut executor = TestExecutor::new();
1761 let scope = executor.global_scope().new_child();
1762 let child = scope.new_child();
1763 let grandchild = child.new_child();
1764 let mut child_task = pin!(child.compute(async { 1 }));
1765 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1766 drop(scope);
1767 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Pending);
1768 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Pending);
1769 }
1770
1771 #[test]
1772 fn completed_tasks_are_cleaned_up_after_cancel() {
1773 let mut executor = TestExecutor::new();
1774 let scope = executor.global_scope().new_child();
1775
1776 let task1 = scope.spawn(pending::<()>());
1777 let task2 = scope.spawn(async {});
1778 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1779 assert_eq!(scope.lock().all_tasks().len(), 1);
1780
1781 assert_eq!(task1.abort().now_or_never(), None);
1784 assert_eq!(task2.abort().now_or_never(), Some(Some(())));
1785
1786 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1787 assert_eq!(scope.lock().all_tasks().len(), 0);
1788 assert!(scope.lock().results.is_empty());
1789 }
1790
1791 #[test]
1792 fn join_emtpy_scope() {
1793 let mut executor = TestExecutor::new();
1794 let scope = executor.global_scope().new_child();
1795 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1796 }
1797
1798 #[test]
1799 fn task_handle_preserves_access_to_result_after_join_begins() {
1800 let mut executor = TestExecutor::new();
1801 let scope = executor.global_scope().new_child();
1802 let mut task = scope.compute(async { 1 });
1803 scope.spawn(async {});
1804 let task2 = scope.spawn(pending::<()>());
1805 let mut join = pin!(scope.join().fuse());
1808 let _ = executor.run_until_stalled(&mut join);
1809 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1810 drop(task2.abort());
1811 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1812 }
1813
1814 #[test]
1815 fn join_blocks_until_task_is_cancelled() {
1816 let mut executor = TestExecutor::new();
1819 let scope = executor.global_scope().new_child();
1820 let outstanding_task = scope.spawn(pending::<()>());
1821 let cancelled_task = scope.spawn(pending::<()>());
1822 assert_eq!(
1823 executor.run_until_stalled(&mut pin!(cancelled_task.abort())),
1824 Poll::Ready(None)
1825 );
1826 let mut join = pin!(scope.join());
1827 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1828 assert_eq!(
1829 executor.run_until_stalled(&mut pin!(outstanding_task.abort())),
1830 Poll::Ready(None)
1831 );
1832 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1833 }
1834
1835 #[test]
1836 fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1837 let mut executor = TestExecutor::new();
1838 let scope = executor.global_scope().new_child();
1839 scope.spawn(pending::<()>());
1841 let mut join = pin!(scope.join());
1842 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1843 let mut cancel = pin!(join.cancel());
1844 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1845 }
1846
1847 #[test]
1848 fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1849 let mut executor = TestExecutor::new();
1850 let scope = executor.global_scope().new_child();
1851 scope.spawn(pending::<()>());
1853 let mut close = pin!(scope.close());
1854 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1855 let mut cancel = pin!(close.cancel());
1856 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1857 }
1858
1859 #[test]
1860 fn join_scope_blocks_until_spawned_task_completes() {
1861 let mut executor = TestExecutor::new();
1862 let scope = executor.global_scope().new_child();
1863 let remote = RemoteControlFuture::new();
1864 let mut task = scope.spawn(remote.as_future());
1865 let mut scope_join = pin!(scope.join());
1866 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1867 remote.resolve();
1868 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1869 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1870 }
1871
1872 #[test]
1873 fn close_scope_blocks_until_spawned_task_completes() {
1874 let mut executor = TestExecutor::new();
1875 let scope = executor.global_scope().new_child();
1876 let remote = RemoteControlFuture::new();
1877 let mut task = scope.spawn(remote.as_future());
1878 let mut scope_close = pin!(scope.close());
1879 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Pending);
1880 remote.resolve();
1881 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1882 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1883 }
1884
1885 #[test]
1886 fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
1887 let mut executor = TestExecutor::new();
1888 let scope = executor.global_scope().new_child();
1889 let child = scope.new_child();
1890 let remote = RemoteControlFuture::new();
1891 child.spawn(remote.as_future());
1892 let mut scope_join = pin!(scope.join());
1893 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1894 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1895 child.detach();
1896 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1897 remote.resolve();
1898 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1899 }
1900
1901 #[test]
1902 fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
1903 let mut executor = TestExecutor::new();
1904 let scope = executor.global_scope().new_child();
1905 let remote = RemoteControlFuture::new();
1906 {
1907 let remote = remote.clone();
1908 scope.spawn(async move {
1909 let child = Scope::new_with_name("child");
1910 child.spawn(async move {
1911 Scope::current().spawn(remote.as_future());
1912 });
1913 child.detach();
1914 });
1915 }
1916 let mut scope_join = pin!(scope.join());
1917 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1918 remote.resolve();
1919 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1920 }
1921
1922 #[test]
1923 fn join_scope_blocks_when_blocked_child_is_detached() {
1924 let mut executor = TestExecutor::new();
1925 let scope = executor.global_scope().new_child();
1926 let child = scope.new_child();
1927 child.spawn(pending());
1928 let mut scope_join = pin!(scope.join());
1929 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1930 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1931 child.detach();
1932 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1933 }
1934
1935 #[test]
1936 fn join_scope_completes_when_blocked_child_is_cancelled() {
1937 let mut executor = TestExecutor::new();
1938 let scope = executor.global_scope().new_child();
1939 let child = scope.new_child();
1940 child.spawn(pending());
1941 let mut scope_join = pin!(scope.join());
1942 {
1943 let mut child_join = pin!(child.join());
1944 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1945 assert_eq!(executor.run_until_stalled(&mut child_join), Poll::Pending);
1946 }
1947 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1948 }
1949
1950 #[test]
1951 fn detached_scope_can_spawn() {
1952 let mut executor = TestExecutor::new();
1953 let scope = executor.global_scope().new_child();
1954 let handle = scope.to_handle();
1955 scope.detach();
1956 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1957 }
1958
1959 #[test]
1960 fn dropped_scope_cannot_spawn() {
1961 let mut executor = TestExecutor::new();
1962 let scope = executor.global_scope().new_child();
1963 let handle = scope.to_handle();
1964 drop(scope);
1965 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1966 }
1967
1968 #[test]
1969 fn dropped_scope_with_running_task_cannot_spawn() {
1970 let mut executor = TestExecutor::new();
1971 let scope = executor.global_scope().new_child();
1972 let handle = scope.to_handle();
1973 let _running_task = handle.spawn(pending::<()>());
1974 drop(scope);
1975 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1976 }
1977
1978 #[test]
1979 fn joined_scope_cannot_spawn() {
1980 let mut executor = TestExecutor::new();
1981 let scope = executor.global_scope().new_child();
1982 let handle = scope.to_handle();
1983 let mut scope_join = pin!(scope.join());
1984 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1985 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1986 }
1987
1988 #[test]
1989 fn joining_scope_with_running_task_can_spawn() {
1990 let mut executor = TestExecutor::new();
1991 let scope = executor.global_scope().new_child();
1992 let handle = scope.to_handle();
1993 let _running_task = handle.spawn(pending::<()>());
1994 let mut scope_join = pin!(scope.join());
1995 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1996 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1997 }
1998
1999 #[test]
2000 fn joined_scope_child_cannot_spawn() {
2001 let mut executor = TestExecutor::new();
2002 let scope = executor.global_scope().new_child();
2003 let handle = scope.to_handle();
2004 let child_before_join = scope.new_child();
2005 assert_eq!(
2006 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
2007 Poll::Ready(1)
2008 );
2009 let mut scope_join = pin!(scope.join());
2010 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
2011 let child_after_join = handle.new_child();
2012 let grandchild_after_join = child_before_join.new_child();
2013 assert_eq!(
2014 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
2015 Poll::Pending
2016 );
2017 assert_eq!(
2018 executor.run_until_stalled(&mut child_after_join.compute(async { 1 })),
2019 Poll::Pending
2020 );
2021 assert_eq!(
2022 executor.run_until_stalled(&mut grandchild_after_join.compute(async { 1 })),
2023 Poll::Pending
2024 );
2025 }
2026
2027 #[test]
2028 fn closed_scope_child_cannot_spawn() {
2029 let mut executor = TestExecutor::new();
2030 let scope = executor.global_scope().new_child();
2031 let handle = scope.to_handle();
2032 let child_before_close = scope.new_child();
2033 assert_eq!(
2034 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2035 Poll::Ready(1)
2036 );
2037 let mut scope_close = pin!(scope.close());
2038 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
2039 let child_after_close = handle.new_child();
2040 let grandchild_after_close = child_before_close.new_child();
2041 assert_eq!(
2042 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2043 Poll::Pending
2044 );
2045 assert_eq!(
2046 executor.run_until_stalled(&mut child_after_close.compute(async { 1 })),
2047 Poll::Pending
2048 );
2049 assert_eq!(
2050 executor.run_until_stalled(&mut grandchild_after_close.compute(async { 1 })),
2051 Poll::Pending
2052 );
2053 }
2054
2055 #[test]
2056 fn can_join_child_first() {
2057 let mut executor = TestExecutor::new();
2058 let scope = executor.global_scope().new_child();
2059 let child = scope.new_child();
2060 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2061 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2062 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2063 }
2064
2065 #[test]
2066 fn can_join_parent_first() {
2067 let mut executor = TestExecutor::new();
2068 let scope = executor.global_scope().new_child();
2069 let child = scope.new_child();
2070 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2071 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2072 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2073 }
2074
2075 #[test]
2076 fn task_in_parent_scope_can_join_child() {
2077 let mut executor = TestExecutor::new();
2078 let scope = executor.global_scope().new_child();
2079 let child = scope.new_child();
2080 let remote = RemoteControlFuture::new();
2081 child.spawn(remote.as_future());
2082 scope.spawn(async move { child.join().await });
2083 let mut join = pin!(scope.join());
2084 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2085 remote.resolve();
2086 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2087 }
2088
2089 #[test]
2090 fn join_completes_while_completed_task_handle_is_held() {
2091 let mut executor = TestExecutor::new();
2092 let scope = executor.global_scope().new_child();
2093 let mut task = scope.compute(async { 1 });
2094 scope.spawn(async {});
2095 let mut join = pin!(scope.join());
2096 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2097 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
2098 }
2099
2100 #[test]
2101 fn cancel_completes_while_task_holds_handle() {
2102 let mut executor = TestExecutor::new();
2103 let scope = executor.global_scope().new_child();
2104 let handle = scope.to_handle();
2105 let mut task = scope.compute(async move {
2106 loop {
2107 pending::<()>().await; handle.spawn(async {});
2109 }
2110 });
2111
2112 let mut join = pin!(scope.join());
2114 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2115
2116 let mut cancel = pin!(join.cancel());
2117 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
2118 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
2119 }
2120
2121 #[test]
2122 fn cancel_from_handle_inside_task() {
2123 let mut executor = TestExecutor::new();
2124 let scope = executor.global_scope().new_child();
2125 {
2126 scope.spawn(pending::<()>());
2128
2129 let mut no_tasks = pin!(scope.on_no_tasks());
2130 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Pending);
2131
2132 let handle = scope.to_handle();
2133 scope.spawn(async move {
2134 handle.cancel().await;
2135 panic!("cancel() should never complete");
2136 });
2137
2138 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Ready(()));
2139 }
2140 assert_eq!(scope.join().now_or_never(), Some(()));
2141 }
2142
2143 #[test]
2144 fn can_spawn_from_non_executor_thread() {
2145 let mut executor = TestExecutor::new();
2146 let scope = executor.global_scope().clone();
2147 let done = Arc::new(AtomicBool::new(false));
2148 let done_clone = done.clone();
2149 let _ = std::thread::spawn(move || {
2150 scope.spawn(async move {
2151 done_clone.store(true, Ordering::Relaxed);
2152 })
2153 })
2154 .join();
2155 let _ = executor.run_until_stalled(&mut pending::<()>());
2156 assert!(done.load(Ordering::Relaxed));
2157 }
2158
2159 #[test]
2160 fn scope_tree() {
2161 let mut executor = TestExecutor::new();
2167 let a = executor.global_scope().new_child();
2168 let b = a.new_child();
2169 let c = b.new_child();
2170 let d = b.new_child();
2171 let a_remote = RemoteControlFuture::new();
2172 let c_remote = RemoteControlFuture::new();
2173 let d_remote = RemoteControlFuture::new();
2174 a.spawn(a_remote.as_future());
2175 c.spawn(c_remote.as_future());
2176 d.spawn(d_remote.as_future());
2177 let mut a_join = pin!(a.join());
2178 let mut b_join = pin!(b.join());
2179 let mut d_join = pin!(d.join());
2180 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2181 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2182 assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
2183 d_remote.resolve();
2184 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2185 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2186 assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
2187 c_remote.resolve();
2188 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2189 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
2190 a_remote.resolve();
2191 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
2192 let mut c_join = pin!(c.join());
2193 assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
2194 }
2195
2196 #[test]
2197 fn wake_all_with_active_guard_on_send_executor() {
2198 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2199 let scope = executor.root_scope().new_child();
2200
2201 let (tx, mut rx) = mpsc::unbounded();
2202 let state = Arc::new(AtomicU64::new(0));
2204
2205 struct PollCounter(Arc<AtomicU64>, mpsc::UnboundedSender<()>);
2206
2207 impl Future for PollCounter {
2208 type Output = ();
2209 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
2210 let old = self.0.fetch_add(1, Ordering::Relaxed);
2211 if old >> 32 == (old + 1) & u32::MAX as u64 {
2212 let _ = self.1.unbounded_send(());
2213 }
2214 Poll::Pending
2215 }
2216 }
2217
2218 scope.spawn(PollCounter(state.clone(), tx.clone()));
2219 scope.spawn(PollCounter(state.clone(), tx.clone()));
2220
2221 executor.run(async move {
2222 let mut wait_for_poll_count = async |count| {
2223 let old = state.fetch_or(count << 32, Ordering::Relaxed);
2224 if old & u32::MAX as u64 != count {
2225 rx.next().await.unwrap();
2226 }
2227 state.fetch_and(u32::MAX as u64, Ordering::Relaxed);
2228 };
2229
2230 wait_for_poll_count(2).await;
2232
2233 let mut start_count = 2;
2234 for _ in 0..2 {
2235 scope.wake_all_with_active_guard();
2236
2237 wait_for_poll_count(start_count + 2).await;
2238 start_count += 2;
2239 }
2240
2241 scope.wake_all_with_active_guard();
2243 let done = scope.cancel();
2244
2245 wait_for_poll_count(start_count + 2).await;
2246
2247 done.await;
2248 });
2249 }
2250
2251 #[test]
2252 fn on_no_tasks_race() {
2253 fn sleep_random() {
2254 std::thread::sleep(std::time::Duration::from_micros(rand::random_range(0..10)));
2255 }
2256 for _ in 0..2000 {
2257 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2258 let scope = executor.root_scope().new_child();
2259 scope.spawn(async {
2260 sleep_random();
2261 });
2262 executor.run(async move {
2263 sleep_random();
2264 scope.on_no_tasks().await;
2265 });
2266 }
2267 }
2268
2269 #[test]
2270 fn test_detach() {
2271 let mut e = LocalExecutor::new();
2272 e.run_singlethreaded(async {
2273 let counter = Arc::new(AtomicU32::new(0));
2274
2275 {
2276 let counter = counter.clone();
2277 Task::spawn(async move {
2278 for _ in 0..5 {
2279 yield_now().await;
2280 counter.fetch_add(1, Ordering::Relaxed);
2281 }
2282 })
2283 .detach();
2284 }
2285
2286 while counter.load(Ordering::Relaxed) != 5 {
2287 yield_now().await;
2288 }
2289 });
2290
2291 assert!(e.ehandle.root_scope.lock().results.is_empty());
2292 }
2293
2294 #[test]
2295 fn test_cancel() {
2296 let mut e = LocalExecutor::new();
2297 e.run_singlethreaded(async {
2298 let ref_count = Arc::new(());
2299 {
2301 let ref_count = ref_count.clone();
2302 drop(Task::spawn(async move {
2303 let _ref_count = ref_count;
2304 let _: () = std::future::pending().await;
2305 }));
2306 }
2307
2308 while Arc::strong_count(&ref_count) != 1 {
2309 yield_now().await;
2310 }
2311
2312 let task = {
2314 let ref_count = ref_count.clone();
2315 Task::spawn(async move {
2316 let _ref_count = ref_count;
2317 let _: () = std::future::pending().await;
2318 })
2319 };
2320
2321 assert_eq!(task.abort().await, None);
2322 while Arc::strong_count(&ref_count) != 1 {
2323 yield_now().await;
2324 }
2325
2326 let task = {
2328 let ref_count = ref_count.clone();
2329 Task::spawn(async move {
2330 let _ref_count = ref_count;
2331 })
2332 };
2333
2334 while Arc::strong_count(&ref_count) != 1 {
2336 yield_now().await;
2337 }
2338
2339 assert_eq!(task.abort().await, Some(()));
2340 });
2341
2342 assert!(e.ehandle.root_scope.lock().results.is_empty());
2343 }
2344
2345 #[test]
2346 fn test_cancel_waits() {
2347 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2348 let running = Arc::new((Mutex::new(false), Condvar::new()));
2349 let task = {
2350 let running = running.clone();
2351 executor.root_scope().compute(async move {
2352 *running.0.lock() = true;
2353 running.1.notify_all();
2354 std::thread::sleep(std::time::Duration::from_millis(10));
2355 *running.0.lock() = false;
2356 "foo"
2357 })
2358 };
2359 executor.run(async move {
2360 {
2361 let mut guard = running.0.lock();
2362 while !*guard {
2363 running.1.wait(&mut guard);
2364 }
2365 }
2366 assert_eq!(task.abort().await, Some("foo"));
2367 assert!(!*running.0.lock());
2368 });
2369 }
2370
2371 fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2372 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2373 let running = Arc::new((Mutex::new(false), Condvar::new()));
2374 let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2375 let task = {
2376 let running = running.clone();
2377 let can_quit = can_quit.clone();
2378 executor.root_scope().compute(async move {
2379 *running.0.lock() = true;
2380 running.1.notify_all();
2381 {
2382 let mut guard = can_quit.0.lock();
2383 while !*guard {
2384 can_quit.1.wait(&mut guard);
2385 }
2386 }
2387 *running.0.lock() = false;
2388 })
2389 };
2390 executor.run(async move {
2391 {
2392 let mut guard = running.0.lock();
2393 while !*guard {
2394 running.1.wait(&mut guard);
2395 }
2396 }
2397
2398 callback(task);
2399
2400 *can_quit.0.lock() = true;
2401 can_quit.1.notify_all();
2402
2403 let ehandle = EHandle::local();
2404 let scope = ehandle.global_scope();
2405
2406 while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2408 Timer::new(std::time::Duration::from_millis(1)).await;
2409 }
2410
2411 assert!(!*running.0.lock());
2412 });
2413 }
2414
2415 #[test]
2416 fn test_dropped_cancel_cleans_up() {
2417 test_clean_up(|task| {
2418 let abort_fut = std::pin::pin!(task.abort());
2419 let waker = futures::task::noop_waker();
2420 assert!(abort_fut.poll(&mut Context::from_waker(&waker)).is_pending());
2421 });
2422 }
2423
2424 #[test]
2425 fn test_dropped_task_cleans_up() {
2426 test_clean_up(|task| {
2427 std::mem::drop(task);
2428 });
2429 }
2430
2431 #[test]
2432 fn test_detach_cleans_up() {
2433 test_clean_up(|task| {
2434 task.detach();
2435 });
2436 }
2437
2438 #[test]
2439 fn test_scope_stream() {
2440 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2441 executor.run(async move {
2442 let (stream, handle) = ScopeStream::new();
2443 handle.push(async { 1 });
2444 handle.push(async { 2 });
2445 stream.close();
2446 let results: HashSet<_> = stream.collect().await;
2447 assert_eq!(results, HashSet::from_iter([1, 2]));
2448 });
2449 }
2450
2451 #[test]
2452 fn test_scope_stream_wakes_properly() {
2453 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2454 executor.run(async move {
2455 let (stream, handle) = ScopeStream::new();
2456 handle.push(async {
2457 Timer::new(Duration::from_millis(10)).await;
2458 1
2459 });
2460 handle.push(async {
2461 Timer::new(Duration::from_millis(10)).await;
2462 2
2463 });
2464 stream.close();
2465 let results: HashSet<_> = stream.collect().await;
2466 assert_eq!(results, HashSet::from_iter([1, 2]));
2467 });
2468 }
2469
2470 #[test]
2471 fn test_scope_stream_drops_spawned_tasks() {
2472 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2473 executor.run(async move {
2474 let (stream, handle) = ScopeStream::new();
2475 handle.push(async { 1 });
2476 let _task = stream.compute(async { "foo" });
2477 stream.close();
2478 let results: HashSet<_> = stream.collect().await;
2479 assert_eq!(results, HashSet::from_iter([1]));
2480 });
2481 }
2482
2483 #[test]
2484 fn test_nested_scope_stream() {
2485 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2486 executor.run(async move {
2487 let (mut stream, handle) = ScopeStream::new();
2488 handle.clone().push(async move {
2489 handle.clone().push(async move {
2490 handle.clone().push(async move { 3 });
2491 2
2492 });
2493 1
2494 });
2495 let mut results = HashSet::default();
2496 while let Some(item) = stream.next().await {
2497 results.insert(item);
2498 if results.len() == 3 {
2499 stream.close();
2500 }
2501 }
2502 assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2503 });
2504 }
2505
2506 #[test]
2507 fn test_dropping_scope_stream_cancels_all_tasks() {
2508 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2509 executor.run(async move {
2510 let (stream, handle) = ScopeStream::new();
2511 let (tx1, mut rx) = mpsc::unbounded::<()>();
2512 let tx2 = tx1.clone();
2513 handle.push(async move {
2514 let _tx1 = tx1;
2515 let () = pending().await;
2516 });
2517 handle.push(async move {
2518 let _tx2 = tx2;
2519 let () = pending().await;
2520 });
2521 drop(stream);
2522
2523 assert_eq!(rx.next().await, None);
2525 });
2526 }
2527
2528 #[test]
2529 fn test_scope_stream_collect() {
2530 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2531 executor.run(async move {
2532 let stream: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
2533 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2534
2535 let stream: ScopeStream<_> =
2536 (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
2537 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2538 });
2539 }
2540
2541 struct DropSignal(Arc<AtomicBool>);
2542
2543 impl Drop for DropSignal {
2544 fn drop(&mut self) {
2545 self.0.store(true, Ordering::SeqCst);
2546 }
2547 }
2548
2549 struct DropChecker(Arc<AtomicBool>);
2550
2551 impl DropChecker {
2552 fn new() -> (Self, DropSignal) {
2553 let inner = Arc::new(AtomicBool::new(false));
2554 (Self(inner.clone()), DropSignal(inner))
2555 }
2556
2557 fn is_dropped(&self) -> bool {
2558 self.0.load(Ordering::SeqCst)
2559 }
2560 }
2561
2562 #[test]
2563 fn child_finished_when_parent_pending() {
2564 let mut executor = LocalExecutor::new();
2565 executor.run_singlethreaded(async {
2566 let scope = Scope::new();
2567 let _guard = scope.active_guard().expect("acquire guard");
2568 let cancel = scope.to_handle().cancel();
2569 let child = scope.new_child();
2570 let (checker, signal) = DropChecker::new();
2571 child.spawn(async move {
2572 let _signal = signal;
2573 futures::future::pending::<()>().await
2574 });
2575 assert!(checker.is_dropped());
2576 assert!(child.active_guard().is_none());
2577 cancel.await;
2578 })
2579 }
2580
2581 #[test]
2582 fn guarded_scopes_observe_closed() {
2583 let mut executor = LocalExecutor::new();
2584 executor.run_singlethreaded(async {
2585 let scope = Scope::new();
2586 let handle = scope.to_handle();
2587 let _guard = scope.active_guard().expect("acquire guard");
2588 handle.close();
2589 let (checker, signal) = DropChecker::new();
2590 handle.spawn(async move {
2591 let _signal = signal;
2592 futures::future::pending::<()>().await
2593 });
2594 assert!(checker.is_dropped());
2595 let (checker, signal) = DropChecker::new();
2596 let cancel = handle.clone().cancel();
2597 handle.spawn(async move {
2598 let _signal = signal;
2599 futures::future::pending::<()>().await
2600 });
2601 assert!(checker.is_dropped());
2602 scope.join().await;
2603 cancel.await;
2604 })
2605 }
2606
2607 #[test]
2608 fn child_guard_holds_parent_cancellation() {
2609 let mut executor = TestExecutor::new();
2610 let scope = executor.global_scope().new_child();
2611 let child = scope.new_child();
2612 let guard = child.active_guard().expect("acquire guard");
2613 scope.spawn(futures::future::pending());
2614 let mut join = pin!(scope.cancel());
2615 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2616 drop(guard);
2617 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2618 }
2619
2620 #[test]
2621 fn active_guard_on_cancel() {
2622 let mut executor = TestExecutor::new();
2623 let scope = executor.global_scope().new_child();
2624 let child1 = scope.new_child();
2625 let child2 = scope.new_child();
2626 let guard = child1.active_guard().expect("acquire guard");
2627 let guard_for_right_scope = guard.clone();
2628 let guard_for_wrong_scope = guard.clone();
2629 child1.spawn(async move { guard_for_right_scope.on_cancel().await });
2630 child2.spawn(async move {
2631 guard_for_wrong_scope.on_cancel().await;
2632 });
2633
2634 let handle = scope.to_handle();
2635 let mut join = pin!(scope.join());
2636 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2637 let cancel: Join<_> = handle.cancel();
2638 drop(cancel);
2639 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2640 }
2641
2642 #[test]
2643 fn abort_join() {
2644 let mut executor = TestExecutor::new();
2645 let scope = executor.global_scope().new_child();
2646 let child = scope.new_child();
2647 let _guard = child.active_guard().expect("acquire guard");
2648
2649 let (checker1, signal) = DropChecker::new();
2650 scope.spawn(async move {
2651 let _signal = signal;
2652 futures::future::pending::<()>().await
2653 });
2654 let (checker2, signal) = DropChecker::new();
2655 scope.spawn(async move {
2656 let _signal = signal;
2657 futures::future::pending::<()>().await
2658 });
2659
2660 let mut join = pin!(scope.cancel());
2661 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2662 assert!(!checker1.is_dropped());
2663 assert!(!checker2.is_dropped());
2664
2665 let mut join = join.abort();
2666 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2667 assert!(checker1.is_dropped());
2668 assert!(checker2.is_dropped());
2669 }
2670
2671 #[test]
2672 fn child_without_guard_aborts_immediately_on_cancel() {
2673 let mut executor = TestExecutor::new();
2674 let scope = executor.global_scope().new_child();
2675 let child = scope.new_child();
2676 let guard = scope.active_guard().expect("acquire guard");
2677
2678 let (checker_scope, signal) = DropChecker::new();
2679 scope.spawn(async move {
2680 let _signal = signal;
2681 futures::future::pending::<()>().await
2682 });
2683 let (checker_child, signal) = DropChecker::new();
2684 child.spawn(async move {
2685 let _signal = signal;
2686 futures::future::pending::<()>().await
2687 });
2688
2689 let mut join = pin!(scope.cancel());
2690 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2691 assert!(!checker_scope.is_dropped());
2692 assert!(checker_child.is_dropped());
2693
2694 drop(guard);
2695 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2696 assert!(checker_child.is_dropped());
2697 }
2698}