1use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::condition::{Condition, ConditionGuard, WakerEntry};
9use crate::EHandle;
10use fuchsia_sync::Mutex;
11use futures::Stream;
12use pin_project_lite::pin_project;
13use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
14use state::{JoinResult, ScopeState, ScopeWaker, Status};
15use std::any::Any;
16use std::borrow::Borrow;
17use std::collections::hash_map::Entry;
18use std::collections::hash_set;
19use std::future::{Future, IntoFuture};
20use std::marker::PhantomData;
21use std::mem::{self, ManuallyDrop};
22use std::ops::{Deref, DerefMut};
23use std::pin::Pin;
24use std::sync::{Arc, Weak};
25use std::task::{ready, Context, Poll, Waker};
26use std::{fmt, hash};
27
28#[must_use = "Scopes should be explicitly awaited or cancelled"]
73#[derive(Debug)]
74pub struct Scope {
75 inner: ScopeHandle,
77 }
79
80impl Default for Scope {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl Scope {
87 pub fn new() -> Scope {
96 ScopeHandle::with_current(|handle| handle.new_child())
97 }
98
99 pub fn new_with_name(name: impl Into<String>) -> Scope {
108 ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
109 }
110
111 pub fn current() -> ScopeHandle {
119 ScopeHandle::with_current(|handle| handle.clone())
120 }
121
122 pub fn global() -> ScopeHandle {
139 EHandle::local().global_scope().clone()
140 }
141
142 pub fn new_child(&self) -> Scope {
144 self.inner.new_child()
145 }
146
147 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
149 self.inner.new_child_with_name(name.into())
150 }
151
152 pub fn name(&self) -> &str {
154 &self.inner.inner.name
155 }
156
157 pub fn to_handle(&self) -> ScopeHandle {
165 self.inner.clone()
166 }
167
168 pub fn as_handle(&self) -> &ScopeHandle {
175 &self.inner
176 }
177
178 pub fn join(self) -> Join {
187 Join::new(self)
188 }
189
190 pub fn close(self) -> Join {
193 self.inner.close();
194 Join::new(self)
195 }
196
197 pub fn cancel(self) -> Join {
215 self.inner.cancel_all_tasks();
216 Join::new(self)
217 }
218
219 pub fn abort(self) -> impl Future<Output = ()> {
234 self.inner.abort_all_tasks();
235 Join::new(self)
236 }
237
238 pub fn detach(self) {
244 let this = ManuallyDrop::new(self);
247 mem::drop(unsafe { std::ptr::read(&this.inner) });
250 }
251}
252
253impl Drop for Scope {
256 fn drop(&mut self) {
257 self.inner.abort_all_tasks();
266 }
267}
268
269impl IntoFuture for Scope {
270 type Output = ();
271 type IntoFuture = Join;
272 fn into_future(self) -> Self::IntoFuture {
273 self.join()
274 }
275}
276
277impl Deref for Scope {
278 type Target = ScopeHandle;
279 fn deref(&self) -> &Self::Target {
280 &self.inner
281 }
282}
283
284impl Borrow<ScopeHandle> for Scope {
285 fn borrow(&self) -> &ScopeHandle {
286 self
287 }
288}
289
290pin_project! {
291 pub struct Join<S = Scope> {
303 scope: S,
304 #[pin]
305 waker_entry: WakerEntry<ScopeState>,
306 }
307}
308
309impl<S> Join<S> {
310 fn new(scope: S) -> Self {
311 Self { scope, waker_entry: WakerEntry::new() }
312 }
313}
314
315impl<S: Borrow<ScopeHandle>> Join<S> {
316 pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
321 self.scope.borrow().abort_all_tasks();
322 self
323 }
324
325 pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
330 self.scope.borrow().cancel_all_tasks();
331 self
332 }
333}
334
335impl<S> Future for Join<S>
336where
337 S: Borrow<ScopeHandle>,
338{
339 type Output = ();
340 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341 let this = self.project();
342 let mut state = Borrow::borrow(&*this.scope).lock();
343 if state.has_tasks() {
344 state.add_waker(this.waker_entry, cx.waker().clone());
345 Poll::Pending
346 } else {
347 state.mark_finished();
348 Poll::Ready(())
349 }
350 }
351}
352
353pub trait Spawnable {
356 type Output;
358
359 fn into_task(self, scope: ScopeHandle) -> TaskHandle;
361}
362
363impl<F: Future + Send + 'static> Spawnable for F
364where
365 F::Output: Send + 'static,
366{
367 type Output = F::Output;
368
369 fn into_task(self, scope: ScopeHandle) -> TaskHandle {
370 scope.new_task(None, self)
371 }
372}
373
374#[derive(Clone)]
386pub struct ScopeHandle {
387 inner: Arc<ScopeInner>,
389 }
391
392impl ScopeHandle {
393 pub fn new_child(&self) -> Scope {
395 self.new_child_inner(String::new())
396 }
397
398 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
400 self.new_child_inner(name.into())
401 }
402
403 fn new_child_inner(&self, name: String) -> Scope {
404 let mut state = self.lock();
405 let child = ScopeHandle {
406 inner: Arc::new(ScopeInner {
407 executor: self.inner.executor.clone(),
408 state: Condition::new(ScopeState::new_child(
409 self.clone(),
410 &state,
411 JoinResults::default().into(),
412 )),
413 name,
414 }),
415 };
416 let weak = child.downgrade();
417 state.insert_child(weak);
418 Scope { inner: child }
419 }
420
421 pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
425 let task = future.into_task(self.clone());
426 let task_id = task.id();
427 self.insert_task(task, false);
428 JoinHandle::new(self.clone(), task_id)
429 }
430
431 pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
436 let task = self.new_local_task(None, future);
437 let id = task.id();
438 self.insert_task(task, false);
439 JoinHandle::new(self.clone(), id)
440 }
441
442 pub fn compute<T: Send + 'static>(
447 &self,
448 future: impl Spawnable<Output = T> + Send + 'static,
449 ) -> crate::Task<T> {
450 let task = future.into_task(self.clone());
451 let id = task.id();
452 self.insert_task(task, false);
453 JoinHandle::new(self.clone(), id).into()
454 }
455
456 pub fn compute_local<T: 'static>(
464 &self,
465 future: impl Future<Output = T> + 'static,
466 ) -> crate::Task<T> {
467 let task = self.new_local_task(None, future);
468 let id = task.id();
469 self.insert_task(task, false);
470 JoinHandle::new(self.clone(), id).into()
471 }
472
473 pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
474 ScopeHandle {
475 inner: Arc::new(ScopeInner {
476 executor,
477 state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
478 name: "root".to_string(),
479 }),
480 }
481 }
482
483 pub fn close(&self) {
491 self.lock().close();
492 }
493
494 pub fn cancel(self) -> Join<Self> {
499 self.cancel_all_tasks();
500 Join::new(self)
501 }
502
503 pub fn abort(self) -> impl Future<Output = ()> {
508 self.abort_all_tasks();
509 Join::new(self)
510 }
511
512 #[must_use]
524 pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
525 ScopeActiveGuard::new(self)
526 }
527
528 pub fn is_cancelled(&self) -> bool {
531 self.lock().status().is_cancelled()
532 }
533
534 pub async fn on_no_tasks(&self) {
541 self.inner
542 .state
543 .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
544 .await;
545 }
546
547 pub fn wake_all(&self) {
549 self.lock().wake_all();
550 }
551
552 pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
555 &self,
556 id: Option<usize>,
557 fut: Fut,
558 ) -> AtomicFutureHandle<'a>
559 where
560 Fut::Output: Send,
561 {
562 AtomicFutureHandle::new(
563 Some(self.clone()),
564 id.unwrap_or_else(|| self.executor().next_task_id()),
565 fut,
566 )
567 }
568
569 pub(crate) fn new_local_task<'a>(
572 &self,
573 id: Option<usize>,
574 fut: impl Future + 'a,
575 ) -> AtomicFutureHandle<'a> {
576 if !self.executor().is_local() {
578 panic!(
579 "Error: called `new_local_task` on multithreaded executor. \
580 Use `spawn` or a `LocalExecutor` instead."
581 );
582 }
583
584 unsafe {
587 AtomicFutureHandle::new_local(
588 Some(self.clone()),
589 id.unwrap_or_else(|| self.executor().next_task_id()),
590 fut,
591 )
592 }
593 }
594}
595
596impl fmt::Debug for ScopeHandle {
597 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
598 f.debug_struct("Scope").field("name", &self.inner.name).finish()
599 }
600}
601
602pub struct ScopeStream<R> {
612 inner: ScopeHandle,
613 stream: Arc<Mutex<ResultsStreamInner<R>>>,
614}
615
616impl<R: Send + 'static> ScopeStream<R> {
617 pub fn new() -> (Self, ScopeStreamHandle<R>) {
626 Self::new_inner(String::new())
627 }
628
629 pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
638 Self::new_inner(name.into())
639 }
640
641 fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
642 let this = ScopeHandle::with_current(|handle| {
643 let mut state = handle.lock();
644 let stream = Arc::default();
645 let child = ScopeHandle {
646 inner: Arc::new(ScopeInner {
647 executor: handle.executor().clone(),
648 state: Condition::new(ScopeState::new_child(
649 handle.clone(),
650 &state,
651 Box::new(ResultsStream { inner: Arc::clone(&stream) }),
652 )),
653 name,
654 }),
655 };
656 let weak = child.downgrade();
657 state.insert_child(weak);
658 ScopeStream { inner: child, stream }
659 });
660 let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
661 (this, handle)
662 }
663}
664
665impl<R> Drop for ScopeStream<R> {
666 fn drop(&mut self) {
667 self.inner.abort_all_tasks();
676 }
677}
678
679impl<R: Send + 'static> Stream for ScopeStream<R> {
680 type Item = R;
681
682 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
683 let mut stream_inner = self.stream.lock();
684 match stream_inner.results.pop() {
685 Some(result) => Poll::Ready(Some(result)),
686 None => {
687 drop(stream_inner);
690 let state = self.inner.lock();
691 let mut stream_inner = self.stream.lock();
692 match stream_inner.results.pop() {
693 Some(result) => Poll::Ready(Some(result)),
694 None => {
695 if state.has_tasks() {
696 stream_inner.waker = Some(cx.waker().clone());
697 Poll::Pending
698 } else {
699 Poll::Ready(None)
700 }
701 }
702 }
703 }
704 }
705 }
706}
707
708impl<R> Deref for ScopeStream<R> {
709 type Target = ScopeHandle;
710 fn deref(&self) -> &Self::Target {
711 &self.inner
712 }
713}
714
715impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
716 fn borrow(&self) -> &ScopeHandle {
717 self
718 }
719}
720
721impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
722 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
723 let (stream, handle) = ScopeStream::new();
724 for fut in iter {
725 handle.push(fut);
726 }
727 stream.close();
728 stream
729 }
730}
731
732#[derive(Clone)]
733pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
734
735impl<R: Send> ScopeStreamHandle<R> {
736 pub fn push(&self, future: impl Spawnable<Output = R>) {
737 self.0.insert_task(future.into_task(self.0.clone()), true);
738 }
739}
740
741#[derive(Debug)]
751#[must_use]
752pub struct ScopeActiveGuard(ScopeHandle);
753
754impl Deref for ScopeActiveGuard {
755 type Target = ScopeHandle;
756 fn deref(&self) -> &Self::Target {
757 &self.0
758 }
759}
760
761impl Drop for ScopeActiveGuard {
762 fn drop(&mut self) {
763 let Self(scope) = self;
764 scope.release_cancel_guard();
765 }
766}
767
768impl Clone for ScopeActiveGuard {
769 fn clone(&self) -> Self {
770 self.0.lock().acquire_cancel_guard();
771 Self(self.0.clone())
772 }
773}
774
775impl ScopeActiveGuard {
776 pub fn as_handle(&self) -> &ScopeHandle {
778 &self.0
779 }
780
781 pub fn to_handle(&self) -> ScopeHandle {
783 self.0.clone()
784 }
785
786 pub async fn on_cancel(&self) {
792 self.0
793 .inner
794 .state
795 .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
796 .await
797 }
798
799 fn new(scope: &ScopeHandle) -> Option<Self> {
800 if scope.lock().acquire_cancel_guard_if_not_finished() {
801 Some(Self(scope.clone()))
802 } else {
803 None
804 }
805 }
806}
807
808#[derive(Clone)]
814struct WeakScopeHandle {
815 inner: Weak<ScopeInner>,
816}
817
818impl WeakScopeHandle {
819 pub fn upgrade(&self) -> Option<ScopeHandle> {
821 self.inner.upgrade().map(|inner| ScopeHandle { inner })
822 }
823}
824
825impl hash::Hash for WeakScopeHandle {
826 fn hash<H: hash::Hasher>(&self, state: &mut H) {
827 Weak::as_ptr(&self.inner).hash(state);
828 }
829}
830
831impl PartialEq for WeakScopeHandle {
832 fn eq(&self, other: &Self) -> bool {
833 Weak::ptr_eq(&self.inner, &other.inner)
834 }
835}
836
837impl Eq for WeakScopeHandle {
838 }
841
842mod state {
845 use super::*;
846
847 pub struct ScopeState {
848 pub parent: Option<ScopeHandle>,
849 children: HashSet<WeakScopeHandle>,
851 all_tasks: HashSet<TaskHandle>,
852 subscopes_with_tasks: u32,
856 can_spawn: bool,
857 guards: u32,
858 status: Status,
859 pub results: Box<dyn Results>,
861 }
862
863 pub enum JoinResult {
864 Waker(Waker),
865 Result(TaskHandle),
866 }
867
868 #[repr(u8)] #[derive(Default, Debug, Clone, Copy)]
870 pub enum Status {
871 #[default]
872 Active,
874 PendingCancellation,
877 Finished,
880 }
881
882 impl Status {
883 pub fn is_cancelled(&self) -> bool {
885 match self {
886 Self::Active => false,
887 Self::PendingCancellation | Self::Finished => true,
888 }
889 }
890 }
891
892 impl ScopeState {
893 pub fn new_root(results: Box<impl Results>) -> Self {
894 Self {
895 parent: None,
896 children: Default::default(),
897 all_tasks: Default::default(),
898 subscopes_with_tasks: 0,
899 can_spawn: true,
900 guards: 0,
901 status: Default::default(),
902 results,
903 }
904 }
905
906 pub fn new_child(
907 parent_handle: ScopeHandle,
908 parent_state: &Self,
909 results: Box<impl Results>,
910 ) -> Self {
911 let (status, can_spawn) = match parent_state.status {
912 Status::Active => (Status::Active, parent_state.can_spawn),
913 Status::Finished | Status::PendingCancellation => (Status::Finished, false),
914 };
915 Self {
916 parent: Some(parent_handle),
917 children: Default::default(),
918 all_tasks: Default::default(),
919 subscopes_with_tasks: 0,
920 can_spawn,
921 guards: 0,
922 status,
923 results,
924 }
925 }
926 }
927
928 impl ScopeState {
929 pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
930 &self.all_tasks
931 }
932
933 pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
936 if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
937 return Some(task);
938 }
939 if self.all_tasks.is_empty() && !self.register_first_task() {
940 return Some(task);
941 }
942 task.wake();
943 assert!(self.all_tasks.insert(task));
944 None
945 }
946
947 pub fn children(&self) -> &HashSet<WeakScopeHandle> {
948 &self.children
949 }
950
951 pub fn insert_child(&mut self, child: WeakScopeHandle) {
952 self.children.insert(child);
953 }
954
955 pub fn remove_child(&mut self, child: &PtrKey) {
956 let found = self.children.remove(child);
957 assert!(found || self.children.is_empty());
960 }
961
962 pub fn status(&self) -> Status {
963 self.status
964 }
965
966 pub fn guards(&self) -> u32 {
967 self.guards
968 }
969
970 pub fn close(&mut self) {
971 self.can_spawn = false;
972 }
973
974 pub fn mark_finished(&mut self) {
975 self.can_spawn = false;
976 self.status = Status::Finished;
977 }
978
979 pub fn has_tasks(&self) -> bool {
980 self.subscopes_with_tasks > 0
981 }
982
983 pub fn wake_all(&self) {
984 for task in &self.all_tasks {
985 task.wake();
986 }
987 }
988
989 pub fn abort_tasks_and_mark_finished(&mut self) {
990 for task in self.all_tasks() {
991 if task.abort() {
992 task.scope().executor().ready_tasks.push(task.clone());
993 }
994 }
997 self.mark_finished();
998 }
999
1000 pub fn wake_wakers_and_mark_pending(
1001 this: &mut ConditionGuard<'_, ScopeState>,
1002 wakers: &mut Vec<Waker>,
1003 ) {
1004 wakers.extend(this.drain_wakers());
1005 this.status = Status::PendingCancellation;
1006 }
1007
1008 #[must_use]
1012 fn register_first_task(&mut self) -> bool {
1013 if !self.can_spawn {
1014 return false;
1015 }
1016 let can_spawn = match &self.parent {
1017 Some(parent) => {
1018 self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
1021 }
1022 None => true,
1023 };
1024 if can_spawn {
1025 self.subscopes_with_tasks += 1;
1026 debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
1027 };
1028 can_spawn
1029 }
1030
1031 fn on_last_task_removed(
1032 this: &mut ConditionGuard<'_, ScopeState>,
1033 num_wakers_hint: usize,
1034 wakers: &mut Vec<Waker>,
1035 ) {
1036 debug_assert!(this.subscopes_with_tasks > 0);
1037 this.subscopes_with_tasks -= 1;
1038 if this.subscopes_with_tasks > 0 {
1039 wakers.reserve(num_wakers_hint);
1040 return;
1041 }
1042
1043 match &this.parent {
1044 Some(parent) => {
1045 Self::on_last_task_removed(
1046 &mut parent.lock(),
1047 num_wakers_hint + this.waker_count(),
1048 wakers,
1049 );
1050 }
1051 None => wakers.reserve(num_wakers_hint),
1052 };
1053 wakers.extend(this.drain_wakers());
1054 }
1055
1056 pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
1060 match self.status {
1061 Status::Active | Status::PendingCancellation => {
1062 self.acquire_cancel_guard();
1063 true
1064 }
1065 Status::Finished => false,
1066 }
1067 }
1068
1069 pub fn acquire_cancel_guard(&mut self) {
1070 if self.guards == 0 {
1071 if let Some(parent) = self.parent.as_ref() {
1072 parent.acquire_cancel_guard();
1073 }
1074 }
1075 self.guards += 1;
1076 }
1077
1078 pub fn release_cancel_guard(&mut self) {
1079 self.guards = self.guards.checked_sub(1).expect("released non-acquired guard");
1080 if self.guards == 0 {
1081 self.on_zero_guards();
1082 }
1083 }
1084
1085 fn on_zero_guards(&mut self) {
1086 match self.status {
1087 Status::Active => {}
1088 Status::PendingCancellation => {
1089 self.abort_tasks_and_mark_finished();
1090 }
1091 Status::Finished => {}
1094 }
1095 if let Some(parent) = &self.parent {
1096 parent.release_cancel_guard();
1097 }
1098 }
1099 }
1100
1101 #[derive(Default)]
1102 struct WakeVec(Vec<Waker>);
1103
1104 impl Drop for WakeVec {
1105 fn drop(&mut self) {
1106 for waker in self.0.drain(..) {
1107 waker.wake();
1108 }
1109 }
1110 }
1111
1112 pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1114
1115 impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1116 fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1117 Self(value, WakeVec::default())
1118 }
1119 }
1120
1121 impl ScopeWaker<'_> {
1122 pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
1123 let task = self.all_tasks.take(&id);
1124 if task.is_some() {
1125 self.on_task_removed(0);
1126 }
1127 task
1128 }
1129
1130 pub fn task_did_finish(&mut self, id: usize) {
1131 if let Some(task) = self.all_tasks.take(&id) {
1132 self.on_task_removed(1);
1133 if !task.is_detached() {
1134 let maybe_waker = self.results.task_did_finish(task);
1135 self.1 .0.extend(maybe_waker);
1136 }
1137 }
1138 }
1139
1140 pub fn set_closed_and_drain(
1141 &mut self,
1142 ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
1143 self.close();
1144 let all_tasks = std::mem::take(&mut self.all_tasks);
1145 let results = self.results.take();
1146 if !all_tasks.is_empty() {
1147 self.on_task_removed(0)
1148 }
1149 let children = self.children.drain();
1150 (all_tasks, results, children)
1151 }
1152
1153 fn on_task_removed(&mut self, num_wakers_hint: usize) {
1154 if self.all_tasks.is_empty() {
1155 ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1 .0)
1156 }
1157 }
1158
1159 pub fn wake_wakers_and_mark_pending(&mut self) {
1160 let Self(state, wakers) = self;
1161 ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
1162 }
1163 }
1164
1165 impl<'a> Deref for ScopeWaker<'a> {
1166 type Target = ConditionGuard<'a, ScopeState>;
1167
1168 fn deref(&self) -> &Self::Target {
1169 &self.0
1170 }
1171 }
1172
1173 impl DerefMut for ScopeWaker<'_> {
1174 fn deref_mut(&mut self) -> &mut Self::Target {
1175 &mut self.0
1176 }
1177 }
1178}
1179
1180struct ScopeInner {
1181 executor: Arc<Executor>,
1182 state: Condition<ScopeState>,
1183 name: String,
1184}
1185
1186impl Drop for ScopeInner {
1187 fn drop(&mut self) {
1188 let key = unsafe { &*(self as *const _ as *const PtrKey) };
1193 let state = self.state.lock();
1194 if let Some(parent) = &state.parent {
1195 let mut parent_state = parent.lock();
1196 if state.guards() != 0 {
1197 parent_state.release_cancel_guard();
1198 }
1199 parent_state.remove_child(key);
1200 }
1201 }
1202}
1203
1204impl ScopeHandle {
1205 fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
1206 super::common::TaskHandle::with_current(|task| match task {
1207 Some(task) => f(task.scope()),
1208 None => f(EHandle::local().global_scope()),
1209 })
1210 }
1211
1212 fn lock(&self) -> ConditionGuard<'_, ScopeState> {
1213 self.inner.state.lock()
1214 }
1215
1216 fn downgrade(&self) -> WeakScopeHandle {
1217 WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
1218 }
1219
1220 #[inline(always)]
1221 pub(crate) fn executor(&self) -> &Arc<Executor> {
1222 &self.inner.executor
1223 }
1224
1225 pub(crate) fn detach(&self, task_id: usize) {
1227 let _maybe_task = {
1228 let mut state = self.lock();
1229 if let Some(task) = state.all_tasks().get(&task_id) {
1230 task.detach();
1231 }
1232 state.results.detach(task_id)
1233 };
1234 }
1235
1236 pub(crate) unsafe fn abort_task<R>(&self, task_id: usize) -> Option<R> {
1242 let mut state = self.lock();
1243 if let Some(task) = state.results.detach(task_id) {
1244 drop(state);
1245 return task.take_result();
1246 }
1247 state.all_tasks().get(&task_id).and_then(|task| {
1248 if task.abort() {
1249 self.inner.executor.ready_tasks.push(task.clone());
1250 }
1251 task.take_result()
1252 })
1253 }
1254
1255 pub(crate) fn abort_and_detach(&self, task_id: usize) {
1257 let _tasks = {
1258 let mut state = ScopeWaker::from(self.lock());
1259 let maybe_task1 = state.results.detach(task_id);
1260 let mut maybe_task2 = None;
1261 if let Some(task) = state.all_tasks().get(&task_id) {
1262 match task.abort_and_detach() {
1263 AbortAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
1264 AbortAndDetachResult::AddToRunQueue => {
1265 self.inner.executor.ready_tasks.push(task.clone());
1266 }
1267 AbortAndDetachResult::Pending => {}
1268 }
1269 }
1270 (maybe_task1, maybe_task2)
1271 };
1272 }
1273
1274 pub(crate) unsafe fn poll_join_result<R>(
1280 &self,
1281 task_id: usize,
1282 cx: &mut Context<'_>,
1283 ) -> Poll<R> {
1284 let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1285 match task.take_result() {
1286 Some(result) => Poll::Ready(result),
1287 None => {
1288 Poll::Pending
1290 }
1291 }
1292 }
1293
1294 pub(crate) unsafe fn poll_aborted<R>(
1296 &self,
1297 task_id: usize,
1298 cx: &mut Context<'_>,
1299 ) -> Poll<Option<R>> {
1300 let task = self.lock().results.poll_join_result(task_id, cx);
1301 task.map(|task| task.take_result())
1302 }
1303
1304 pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
1305 let returned_task = self.lock().insert_task(task, for_stream);
1306 returned_task.is_none()
1307 }
1308
1309 pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
1320 let mut state = ScopeWaker::from(self.lock());
1321 let task = state.take_task(task_id);
1322 if let Some(task) = task {
1323 task.drop_future_unchecked();
1324 }
1325 }
1326
1327 pub(super) fn task_did_finish(&self, id: usize) {
1328 let mut state = ScopeWaker::from(self.lock());
1329 state.task_did_finish(id);
1330 }
1331
1332 fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
1335 let mut scopes = vec![self.clone()];
1336 while let Some(scope) = scopes.pop() {
1337 let mut scope_waker = ScopeWaker::from(scope.lock());
1338 if callback(&mut scope_waker) {
1339 scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
1340 }
1341 }
1342 }
1343
1344 fn acquire_cancel_guard(&self) {
1345 self.lock().acquire_cancel_guard()
1346 }
1347
1348 fn release_cancel_guard(&self) {
1349 self.lock().release_cancel_guard()
1350 }
1351
1352 fn cancel_all_tasks(&self) {
1354 self.visit_scopes_locked(|state| {
1355 match state.status() {
1356 Status::Active => {
1357 if state.guards() == 0 {
1358 state.abort_tasks_and_mark_finished();
1359 } else {
1360 state.wake_wakers_and_mark_pending();
1361 }
1362 true
1363 }
1364 Status::PendingCancellation => {
1365 true
1369 }
1370 Status::Finished => {
1371 false
1373 }
1374 }
1375 });
1376 }
1377
1378 fn abort_all_tasks(&self) {
1380 self.visit_scopes_locked(|state| match state.status() {
1381 Status::Active | Status::PendingCancellation => {
1382 state.abort_tasks_and_mark_finished();
1383 true
1384 }
1385 Status::Finished => false,
1386 });
1387 }
1388
1389 pub(super) fn drop_all_tasks(&self) {
1396 let mut scopes = vec![self.clone()];
1397 while let Some(scope) = scopes.pop() {
1398 let (tasks, join_results) = {
1399 let mut state = ScopeWaker::from(scope.lock());
1400 let (tasks, join_results, children) = state.set_closed_and_drain();
1401 scopes.extend(children.filter_map(|child| child.upgrade()));
1402 (tasks, join_results)
1403 };
1404 for task in tasks {
1406 task.try_drop().expect("Expected drop to succeed");
1407 }
1408 std::mem::drop(join_results);
1409 }
1410 }
1411}
1412
1413#[repr(transparent)]
1415struct PtrKey;
1416
1417impl Borrow<PtrKey> for WeakScopeHandle {
1418 fn borrow(&self) -> &PtrKey {
1419 unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1421 }
1422}
1423
1424impl PartialEq for PtrKey {
1425 fn eq(&self, other: &Self) -> bool {
1426 std::ptr::eq(self, other)
1427 }
1428}
1429
1430impl Eq for PtrKey {}
1431
1432impl hash::Hash for PtrKey {
1433 fn hash<H: hash::Hasher>(&self, state: &mut H) {
1434 (self as *const PtrKey).hash(state);
1435 }
1436}
1437
1438#[derive(Default)]
1439struct JoinResults(HashMap<usize, JoinResult>);
1440
1441trait Results: Send + Sync + 'static {
1442 fn can_spawn(&self) -> bool;
1444
1445 fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;
1447
1448 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;
1450
1451 fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;
1453
1454 fn take(&mut self) -> Box<dyn Any>;
1456
1457 #[cfg(test)]
1459 fn is_empty(&self) -> bool;
1460}
1461
1462impl Results for JoinResults {
1463 fn can_spawn(&self) -> bool {
1464 true
1465 }
1466
1467 fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1468 match self.0.entry(task_id) {
1469 Entry::Occupied(mut o) => match o.get_mut() {
1470 JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1471 JoinResult::Result(_) => {
1472 let JoinResult::Result(task) = o.remove() else { unreachable!() };
1473 return Poll::Ready(task);
1474 }
1475 },
1476 Entry::Vacant(v) => {
1477 v.insert(JoinResult::Waker(cx.waker().clone()));
1478 }
1479 }
1480 Poll::Pending
1481 }
1482
1483 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1484 match self.0.entry(task.id()) {
1485 Entry::Occupied(mut o) => {
1486 let JoinResult::Waker(waker) =
1487 std::mem::replace(o.get_mut(), JoinResult::Result(task))
1488 else {
1489 unreachable!()
1493 };
1494 Some(waker)
1495 }
1496 Entry::Vacant(v) => {
1497 v.insert(JoinResult::Result(task));
1498 None
1499 }
1500 }
1501 }
1502
1503 fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1504 match self.0.remove(&task_id) {
1505 Some(JoinResult::Result(task)) => Some(task),
1506 _ => None,
1507 }
1508 }
1509
1510 fn take(&mut self) -> Box<dyn Any> {
1511 Box::new(Self(std::mem::take(&mut self.0)))
1512 }
1513
1514 #[cfg(test)]
1515 fn is_empty(&self) -> bool {
1516 self.0.is_empty()
1517 }
1518}
1519
1520#[derive(Default)]
1521struct ResultsStream<R> {
1522 inner: Arc<Mutex<ResultsStreamInner<R>>>,
1523}
1524
1525struct ResultsStreamInner<R> {
1526 results: Vec<R>,
1527 waker: Option<Waker>,
1528}
1529
1530impl<R> Default for ResultsStreamInner<R> {
1531 fn default() -> Self {
1532 Self { results: Vec::new(), waker: None }
1533 }
1534}
1535
1536impl<R: Send + 'static> Results for ResultsStream<R> {
1537 fn can_spawn(&self) -> bool {
1538 false
1539 }
1540
1541 fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1542 Poll::Pending
1543 }
1544
1545 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1546 let mut inner = self.inner.lock();
1547 inner.results.extend(unsafe { task.take_result() });
1550 inner.waker.take()
1551 }
1552
1553 fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1554 None
1555 }
1556
1557 fn take(&mut self) -> Box<dyn Any> {
1558 Box::new(std::mem::take(&mut self.inner.lock().results))
1559 }
1560
1561 #[cfg(test)]
1562 fn is_empty(&self) -> bool {
1563 false
1564 }
1565}
1566
1567#[cfg(test)]
1568mod tests {
1569 use super::*;
1570 use crate::{EHandle, LocalExecutor, SendExecutor, SpawnableFuture, Task, TestExecutor, Timer};
1571 use assert_matches::assert_matches;
1572 use fuchsia_sync::{Condvar, Mutex};
1573 use futures::channel::mpsc;
1574 use futures::future::join_all;
1575 use futures::{FutureExt, StreamExt};
1576 use std::future::{pending, poll_fn};
1577 use std::pin::{pin, Pin};
1578 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1579 use std::sync::Arc;
1580 use std::task::{Context, Poll};
1581 use std::time::Duration;
1582
1583 #[derive(Default)]
1584 struct RemoteControlFuture(Mutex<RCFState>);
1585 #[derive(Default)]
1586 struct RCFState {
1587 resolved: bool,
1588 waker: Option<Waker>,
1589 }
1590
1591 impl Future for &RemoteControlFuture {
1592 type Output = ();
1593 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1594 let mut this = self.0.lock();
1595 if this.resolved {
1596 Poll::Ready(())
1597 } else {
1598 this.waker.replace(cx.waker().clone());
1599 Poll::Pending
1600 }
1601 }
1602 }
1603
1604 impl RemoteControlFuture {
1605 fn new() -> Arc<Self> {
1606 Arc::new(Default::default())
1607 }
1608
1609 fn resolve(&self) {
1610 let mut this = self.0.lock();
1611 this.resolved = true;
1612 if let Some(waker) = this.waker.take() {
1613 waker.wake();
1614 }
1615 }
1616
1617 fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> {
1618 let this = Arc::clone(self);
1619 #[allow(clippy::redundant_async_block)] async move {
1621 (&*this).await
1622 }
1623 }
1624 }
1625
1626 #[test]
1627 fn compute_works_on_root_scope() {
1628 let mut executor = TestExecutor::new();
1629 let scope = executor.global_scope();
1630 let mut task = pin!(scope.compute(async { 1 }));
1631 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1632 }
1633
1634 #[test]
1635 fn compute_works_on_new_child() {
1636 let mut executor = TestExecutor::new();
1637 let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1638 let mut task = pin!(scope.compute(async { 1 }));
1639 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1640 }
1641
1642 #[test]
1643 fn scope_drop_cancels_tasks() {
1644 let mut executor = TestExecutor::new();
1645 let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1646 let mut task = pin!(scope.compute(async { 1 }));
1647 drop(scope);
1648 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1649 }
1650
1651 #[test]
1652 fn tasks_do_not_spawn_on_cancelled_scopes() {
1653 let mut executor = TestExecutor::new();
1654 let scope =
1655 executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1656 let handle = scope.to_handle();
1657 let mut cancel = pin!(scope.cancel());
1658 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1659 let mut task = pin!(handle.compute(async { 1 }));
1660 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1661 }
1662
1663 #[test]
1664 fn tasks_do_not_spawn_on_closed_empty_scopes() {
1665 let mut executor = TestExecutor::new();
1666 let scope =
1667 executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1668 let handle = scope.to_handle();
1669 let mut close = pin!(scope.cancel());
1670 assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1671 let mut task = pin!(handle.compute(async { 1 }));
1672 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1673 }
1674
1675 #[test]
1676 fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
1677 let mut executor = TestExecutor::new();
1678 let scope = executor.global_scope().new_child();
1679 let handle = scope.to_handle();
1680 handle.spawn(pending());
1681 let mut close = pin!(scope.close());
1682 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1683 let mut task = pin!(handle.compute(async { 1 }));
1684 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1685 }
1686
1687 #[test]
1688 fn spawn_works_on_child_and_grandchild() {
1689 let mut executor = TestExecutor::new();
1690 let scope = executor.global_scope().new_child();
1691 let child = scope.new_child();
1692 let grandchild = child.new_child();
1693 let mut child_task = pin!(child.compute(async { 1 }));
1694 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1695 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Ready(1));
1696 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Ready(1));
1697 }
1698
1699 #[test]
1700 fn spawn_drop_cancels_child_and_grandchild_tasks() {
1701 let mut executor = TestExecutor::new();
1702 let scope = executor.global_scope().new_child();
1703 let child = scope.new_child();
1704 let grandchild = child.new_child();
1705 let mut child_task = pin!(child.compute(async { 1 }));
1706 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1707 drop(scope);
1708 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Pending);
1709 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Pending);
1710 }
1711
1712 #[test]
1713 fn completed_tasks_are_cleaned_up_after_cancel() {
1714 let mut executor = TestExecutor::new();
1715 let scope = executor.global_scope().new_child();
1716
1717 let task1 = scope.spawn(pending::<()>());
1718 let task2 = scope.spawn(async {});
1719 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1720 assert_eq!(scope.lock().all_tasks().len(), 1);
1721
1722 assert_eq!(task1.abort().now_or_never(), None);
1725 assert_eq!(task2.abort().now_or_never(), Some(Some(())));
1726
1727 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1728 assert_eq!(scope.lock().all_tasks().len(), 0);
1729 assert!(scope.lock().results.is_empty());
1730 }
1731
1732 #[test]
1733 fn join_emtpy_scope() {
1734 let mut executor = TestExecutor::new();
1735 let scope = executor.global_scope().new_child();
1736 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1737 }
1738
1739 #[test]
1740 fn task_handle_preserves_access_to_result_after_join_begins() {
1741 let mut executor = TestExecutor::new();
1742 let scope = executor.global_scope().new_child();
1743 let mut task = scope.compute(async { 1 });
1744 scope.spawn(async {});
1745 let task2 = scope.spawn(pending::<()>());
1746 let mut join = pin!(scope.join().fuse());
1749 let _ = executor.run_until_stalled(&mut join);
1750 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1751 drop(task2.abort());
1752 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1753 }
1754
1755 #[test]
1756 fn join_blocks_until_task_is_cancelled() {
1757 let mut executor = TestExecutor::new();
1760 let scope = executor.global_scope().new_child();
1761 let outstanding_task = scope.spawn(pending::<()>());
1762 let cancelled_task = scope.spawn(pending::<()>());
1763 assert_eq!(
1764 executor.run_until_stalled(&mut pin!(cancelled_task.abort())),
1765 Poll::Ready(None)
1766 );
1767 let mut join = pin!(scope.join());
1768 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1769 assert_eq!(
1770 executor.run_until_stalled(&mut pin!(outstanding_task.abort())),
1771 Poll::Ready(None)
1772 );
1773 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1774 }
1775
1776 #[test]
1777 fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1778 let mut executor = TestExecutor::new();
1779 let scope = executor.global_scope().new_child();
1780 scope.spawn(pending::<()>());
1782 let mut join = pin!(scope.join());
1783 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1784 let mut cancel = pin!(join.cancel());
1785 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1786 }
1787
1788 #[test]
1789 fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1790 let mut executor = TestExecutor::new();
1791 let scope = executor.global_scope().new_child();
1792 scope.spawn(pending::<()>());
1794 let mut close = pin!(scope.close());
1795 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1796 let mut cancel = pin!(close.cancel());
1797 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1798 }
1799
1800 #[test]
1801 fn join_scope_blocks_until_spawned_task_completes() {
1802 let mut executor = TestExecutor::new();
1803 let scope = executor.global_scope().new_child();
1804 let remote = RemoteControlFuture::new();
1805 let mut task = scope.spawn(remote.as_future());
1806 let mut scope_join = pin!(scope.join());
1807 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1808 remote.resolve();
1809 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1810 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1811 }
1812
1813 #[test]
1814 fn close_scope_blocks_until_spawned_task_completes() {
1815 let mut executor = TestExecutor::new();
1816 let scope = executor.global_scope().new_child();
1817 let remote = RemoteControlFuture::new();
1818 let mut task = scope.spawn(remote.as_future());
1819 let mut scope_close = pin!(scope.close());
1820 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Pending);
1821 remote.resolve();
1822 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1823 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1824 }
1825
1826 #[test]
1827 fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
1828 let mut executor = TestExecutor::new();
1829 let scope = executor.global_scope().new_child();
1830 let child = scope.new_child();
1831 let remote = RemoteControlFuture::new();
1832 child.spawn(remote.as_future());
1833 let mut scope_join = pin!(scope.join());
1834 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1835 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1836 child.detach();
1837 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1838 remote.resolve();
1839 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1840 }
1841
1842 #[test]
1843 fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
1844 let mut executor = TestExecutor::new();
1845 let scope = executor.global_scope().new_child();
1846 let remote = RemoteControlFuture::new();
1847 {
1848 let remote = remote.clone();
1849 scope.spawn(async move {
1850 let child = Scope::new_with_name("child");
1851 child.spawn(async move {
1852 Scope::current().spawn(remote.as_future());
1853 });
1854 child.detach();
1855 });
1856 }
1857 let mut scope_join = pin!(scope.join());
1858 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1859 remote.resolve();
1860 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1861 }
1862
1863 #[test]
1864 fn join_scope_blocks_when_blocked_child_is_detached() {
1865 let mut executor = TestExecutor::new();
1866 let scope = executor.global_scope().new_child();
1867 let child = scope.new_child();
1868 child.spawn(pending());
1869 let mut scope_join = pin!(scope.join());
1870 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1871 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1872 child.detach();
1873 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1874 }
1875
1876 #[test]
1877 fn join_scope_completes_when_blocked_child_is_cancelled() {
1878 let mut executor = TestExecutor::new();
1879 let scope = executor.global_scope().new_child();
1880 let child = scope.new_child();
1881 child.spawn(pending());
1882 let mut scope_join = pin!(scope.join());
1883 {
1884 let mut child_join = pin!(child.join());
1885 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1886 assert_eq!(executor.run_until_stalled(&mut child_join), Poll::Pending);
1887 }
1888 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1889 }
1890
1891 #[test]
1892 fn detached_scope_can_spawn() {
1893 let mut executor = TestExecutor::new();
1894 let scope = executor.global_scope().new_child();
1895 let handle = scope.to_handle();
1896 scope.detach();
1897 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1898 }
1899
1900 #[test]
1901 fn dropped_scope_cannot_spawn() {
1902 let mut executor = TestExecutor::new();
1903 let scope = executor.global_scope().new_child();
1904 let handle = scope.to_handle();
1905 drop(scope);
1906 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1907 }
1908
1909 #[test]
1910 fn dropped_scope_with_running_task_cannot_spawn() {
1911 let mut executor = TestExecutor::new();
1912 let scope = executor.global_scope().new_child();
1913 let handle = scope.to_handle();
1914 let _running_task = handle.spawn(pending::<()>());
1915 drop(scope);
1916 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1917 }
1918
1919 #[test]
1920 fn joined_scope_cannot_spawn() {
1921 let mut executor = TestExecutor::new();
1922 let scope = executor.global_scope().new_child();
1923 let handle = scope.to_handle();
1924 let mut scope_join = pin!(scope.join());
1925 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1926 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1927 }
1928
1929 #[test]
1930 fn joining_scope_with_running_task_can_spawn() {
1931 let mut executor = TestExecutor::new();
1932 let scope = executor.global_scope().new_child();
1933 let handle = scope.to_handle();
1934 let _running_task = handle.spawn(pending::<()>());
1935 let mut scope_join = pin!(scope.join());
1936 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1937 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1938 }
1939
1940 #[test]
1941 fn joined_scope_child_cannot_spawn() {
1942 let mut executor = TestExecutor::new();
1943 let scope = executor.global_scope().new_child();
1944 let handle = scope.to_handle();
1945 let child_before_join = scope.new_child();
1946 assert_eq!(
1947 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1948 Poll::Ready(1)
1949 );
1950 let mut scope_join = pin!(scope.join());
1951 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1952 let child_after_join = handle.new_child();
1953 let grandchild_after_join = child_before_join.new_child();
1954 assert_eq!(
1955 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1956 Poll::Pending
1957 );
1958 assert_eq!(
1959 executor.run_until_stalled(&mut child_after_join.compute(async { 1 })),
1960 Poll::Pending
1961 );
1962 assert_eq!(
1963 executor.run_until_stalled(&mut grandchild_after_join.compute(async { 1 })),
1964 Poll::Pending
1965 );
1966 }
1967
1968 #[test]
1969 fn closed_scope_child_cannot_spawn() {
1970 let mut executor = TestExecutor::new();
1971 let scope = executor.global_scope().new_child();
1972 let handle = scope.to_handle();
1973 let child_before_close = scope.new_child();
1974 assert_eq!(
1975 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
1976 Poll::Ready(1)
1977 );
1978 let mut scope_close = pin!(scope.close());
1979 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1980 let child_after_close = handle.new_child();
1981 let grandchild_after_close = child_before_close.new_child();
1982 assert_eq!(
1983 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
1984 Poll::Pending
1985 );
1986 assert_eq!(
1987 executor.run_until_stalled(&mut child_after_close.compute(async { 1 })),
1988 Poll::Pending
1989 );
1990 assert_eq!(
1991 executor.run_until_stalled(&mut grandchild_after_close.compute(async { 1 })),
1992 Poll::Pending
1993 );
1994 }
1995
1996 #[test]
1997 fn can_join_child_first() {
1998 let mut executor = TestExecutor::new();
1999 let scope = executor.global_scope().new_child();
2000 let child = scope.new_child();
2001 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2002 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2003 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2004 }
2005
2006 #[test]
2007 fn can_join_parent_first() {
2008 let mut executor = TestExecutor::new();
2009 let scope = executor.global_scope().new_child();
2010 let child = scope.new_child();
2011 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2012 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2013 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2014 }
2015
2016 #[test]
2017 fn task_in_parent_scope_can_join_child() {
2018 let mut executor = TestExecutor::new();
2019 let scope = executor.global_scope().new_child();
2020 let child = scope.new_child();
2021 let remote = RemoteControlFuture::new();
2022 child.spawn(remote.as_future());
2023 scope.spawn(async move { child.join().await });
2024 let mut join = pin!(scope.join());
2025 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2026 remote.resolve();
2027 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2028 }
2029
2030 #[test]
2031 fn join_completes_while_completed_task_handle_is_held() {
2032 let mut executor = TestExecutor::new();
2033 let scope = executor.global_scope().new_child();
2034 let mut task = scope.compute(async { 1 });
2035 scope.spawn(async {});
2036 let mut join = pin!(scope.join());
2037 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2038 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
2039 }
2040
2041 #[test]
2042 fn cancel_completes_while_task_holds_handle() {
2043 let mut executor = TestExecutor::new();
2044 let scope = executor.global_scope().new_child();
2045 let handle = scope.to_handle();
2046 let mut task = scope.compute(async move {
2047 loop {
2048 pending::<()>().await; handle.spawn(async {});
2050 }
2051 });
2052
2053 let mut join = pin!(scope.join());
2055 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2056
2057 let mut cancel = pin!(join.cancel());
2058 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
2059 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
2060 }
2061
2062 #[test]
2063 fn cancel_from_handle_inside_task() {
2064 let mut executor = TestExecutor::new();
2065 let scope = executor.global_scope().new_child();
2066 {
2067 scope.spawn(pending::<()>());
2069
2070 let mut no_tasks = pin!(scope.on_no_tasks());
2071 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Pending);
2072
2073 let handle = scope.to_handle();
2074 scope.spawn(async move {
2075 handle.cancel().await;
2076 panic!("cancel() should never complete");
2077 });
2078
2079 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Ready(()));
2080 }
2081 assert_eq!(scope.join().now_or_never(), Some(()));
2082 }
2083
2084 #[test]
2085 fn can_spawn_from_non_executor_thread() {
2086 let mut executor = TestExecutor::new();
2087 let scope = executor.global_scope().clone();
2088 let done = Arc::new(AtomicBool::new(false));
2089 let done_clone = done.clone();
2090 let _ = std::thread::spawn(move || {
2091 scope.spawn(async move {
2092 done_clone.store(true, Ordering::Relaxed);
2093 })
2094 })
2095 .join();
2096 let _ = executor.run_until_stalled(&mut pending::<()>());
2097 assert!(done.load(Ordering::Relaxed));
2098 }
2099
2100 #[test]
2101 fn scope_tree() {
2102 let mut executor = TestExecutor::new();
2108 let a = executor.global_scope().new_child();
2109 let b = a.new_child();
2110 let c = b.new_child();
2111 let d = b.new_child();
2112 let a_remote = RemoteControlFuture::new();
2113 let c_remote = RemoteControlFuture::new();
2114 let d_remote = RemoteControlFuture::new();
2115 a.spawn(a_remote.as_future());
2116 c.spawn(c_remote.as_future());
2117 d.spawn(d_remote.as_future());
2118 let mut a_join = pin!(a.join());
2119 let mut b_join = pin!(b.join());
2120 let mut d_join = pin!(d.join());
2121 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2122 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2123 assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
2124 d_remote.resolve();
2125 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2126 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2127 assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
2128 c_remote.resolve();
2129 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2130 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
2131 a_remote.resolve();
2132 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
2133 let mut c_join = pin!(c.join());
2134 assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
2135 }
2136
2137 #[test]
2138 fn on_no_tasks() {
2139 let mut executor = TestExecutor::new();
2140 let scope = executor.global_scope().new_child();
2141 let _task1 = scope.spawn(std::future::ready(()));
2142 let task2 = scope.spawn(pending::<()>());
2143
2144 let mut on_no_tasks = pin!(scope.on_no_tasks());
2145
2146 assert!(executor.run_until_stalled(&mut on_no_tasks).is_pending());
2147
2148 drop(task2.abort());
2149
2150 let on_no_tasks2 = pin!(scope.on_no_tasks());
2151 let on_no_tasks3 = pin!(scope.on_no_tasks());
2152
2153 assert_matches!(
2154 executor.run_until_stalled(&mut join_all([on_no_tasks, on_no_tasks2, on_no_tasks3])),
2155 Poll::Ready(_)
2156 );
2157 }
2158
2159 #[test]
2160 fn wake_all() {
2161 let mut executor = TestExecutor::new();
2162 let scope = executor.global_scope().new_child();
2163
2164 let poll_count = Arc::new(AtomicU64::new(0));
2165
2166 struct PollCounter(Arc<AtomicU64>);
2167
2168 impl Future for PollCounter {
2169 type Output = ();
2170 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
2171 self.0.fetch_add(1, Ordering::Relaxed);
2172 Poll::Pending
2173 }
2174 }
2175
2176 scope.spawn(PollCounter(poll_count.clone()));
2177 scope.spawn(PollCounter(poll_count.clone()));
2178
2179 let _ = executor.run_until_stalled(&mut pending::<()>());
2180
2181 let mut start_count = poll_count.load(Ordering::Relaxed);
2182
2183 for _ in 0..2 {
2184 scope.wake_all();
2185 let _ = executor.run_until_stalled(&mut pending::<()>());
2186 assert_eq!(poll_count.load(Ordering::Relaxed), start_count + 2);
2187 start_count += 2;
2188 }
2189 }
2190
2191 #[test]
2192 fn on_no_tasks_race() {
2193 fn sleep_random() {
2194 use rand::Rng;
2195 std::thread::sleep(std::time::Duration::from_micros(
2196 rand::thread_rng().gen_range(0..10),
2197 ));
2198 }
2199 for _ in 0..2000 {
2200 let mut executor = SendExecutor::new(2);
2201 let scope = executor.root_scope().new_child();
2202 scope.spawn(async {
2203 sleep_random();
2204 });
2205 executor.run(async move {
2206 sleep_random();
2207 scope.on_no_tasks().await;
2208 });
2209 }
2210 }
2211
2212 async fn yield_to_executor() {
2213 let mut done = false;
2214 poll_fn(|cx| {
2215 if done {
2216 Poll::Ready(())
2217 } else {
2218 done = true;
2219 cx.waker().wake_by_ref();
2220 Poll::Pending
2221 }
2222 })
2223 .await;
2224 }
2225
2226 #[test]
2227 fn test_detach() {
2228 let mut e = LocalExecutor::new();
2229 e.run_singlethreaded(async {
2230 let counter = Arc::new(AtomicU32::new(0));
2231
2232 {
2233 let counter = counter.clone();
2234 Task::spawn(async move {
2235 for _ in 0..5 {
2236 yield_to_executor().await;
2237 counter.fetch_add(1, Ordering::Relaxed);
2238 }
2239 })
2240 .detach();
2241 }
2242
2243 while counter.load(Ordering::Relaxed) != 5 {
2244 yield_to_executor().await;
2245 }
2246 });
2247
2248 assert!(e.ehandle.root_scope.lock().results.is_empty());
2249 }
2250
2251 #[test]
2252 fn test_cancel() {
2253 let mut e = LocalExecutor::new();
2254 e.run_singlethreaded(async {
2255 let ref_count = Arc::new(());
2256 {
2258 let ref_count = ref_count.clone();
2259 drop(Task::spawn(async move {
2260 let _ref_count = ref_count;
2261 let _: () = std::future::pending().await;
2262 }));
2263 }
2264
2265 while Arc::strong_count(&ref_count) != 1 {
2266 yield_to_executor().await;
2267 }
2268
2269 let task = {
2271 let ref_count = ref_count.clone();
2272 Task::spawn(async move {
2273 let _ref_count = ref_count;
2274 let _: () = std::future::pending().await;
2275 })
2276 };
2277
2278 assert_eq!(task.abort().await, None);
2279 while Arc::strong_count(&ref_count) != 1 {
2280 yield_to_executor().await;
2281 }
2282
2283 let task = {
2285 let ref_count = ref_count.clone();
2286 Task::spawn(async move {
2287 let _ref_count = ref_count;
2288 })
2289 };
2290
2291 while Arc::strong_count(&ref_count) != 1 {
2293 yield_to_executor().await;
2294 }
2295
2296 assert_eq!(task.abort().await, Some(()));
2297 });
2298
2299 assert!(e.ehandle.root_scope.lock().results.is_empty());
2300 }
2301
2302 #[test]
2303 fn test_cancel_waits() {
2304 let mut executor = SendExecutor::new(2);
2305 let running = Arc::new((Mutex::new(false), Condvar::new()));
2306 let task = {
2307 let running = running.clone();
2308 executor.root_scope().compute(async move {
2309 *running.0.lock() = true;
2310 running.1.notify_all();
2311 std::thread::sleep(std::time::Duration::from_millis(10));
2312 *running.0.lock() = false;
2313 "foo"
2314 })
2315 };
2316 executor.run(async move {
2317 {
2318 let mut guard = running.0.lock();
2319 while !*guard {
2320 running.1.wait(&mut guard);
2321 }
2322 }
2323 assert_eq!(task.abort().await, Some("foo"));
2324 assert!(!*running.0.lock());
2325 });
2326 }
2327
2328 fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2329 let mut executor = SendExecutor::new(2);
2330 let running = Arc::new((Mutex::new(false), Condvar::new()));
2331 let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2332 let task = {
2333 let running = running.clone();
2334 let can_quit = can_quit.clone();
2335 executor.root_scope().compute(async move {
2336 *running.0.lock() = true;
2337 running.1.notify_all();
2338 {
2339 let mut guard = can_quit.0.lock();
2340 while !*guard {
2341 can_quit.1.wait(&mut guard);
2342 }
2343 }
2344 *running.0.lock() = false;
2345 })
2346 };
2347 executor.run(async move {
2348 {
2349 let mut guard = running.0.lock();
2350 while !*guard {
2351 running.1.wait(&mut guard);
2352 }
2353 }
2354
2355 callback(task);
2356
2357 *can_quit.0.lock() = true;
2358 can_quit.1.notify_all();
2359
2360 let ehandle = EHandle::local();
2361 let scope = ehandle.global_scope();
2362
2363 while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2365 Timer::new(std::time::Duration::from_millis(1)).await;
2366 }
2367
2368 assert!(!*running.0.lock());
2369 });
2370 }
2371
2372 #[test]
2373 fn test_dropped_cancel_cleans_up() {
2374 test_clean_up(|task| {
2375 let abort_fut = std::pin::pin!(task.abort());
2376 let waker = futures::task::noop_waker();
2377 assert!(abort_fut.poll(&mut Context::from_waker(&waker)).is_pending());
2378 });
2379 }
2380
2381 #[test]
2382 fn test_dropped_task_cleans_up() {
2383 test_clean_up(|task| {
2384 std::mem::drop(task);
2385 });
2386 }
2387
2388 #[test]
2389 fn test_detach_cleans_up() {
2390 test_clean_up(|task| {
2391 task.detach();
2392 });
2393 }
2394
2395 #[test]
2396 fn test_scope_stream() {
2397 let mut executor = SendExecutor::new(2);
2398 executor.run(async move {
2399 let (stream, handle) = ScopeStream::new();
2400 handle.push(async { 1 });
2401 handle.push(async { 2 });
2402 stream.close();
2403 let results: HashSet<_> = stream.collect().await;
2404 assert_eq!(results, HashSet::from_iter([1, 2]));
2405 });
2406 }
2407
2408 #[test]
2409 fn test_scope_stream_wakes_properly() {
2410 let mut executor = SendExecutor::new(2);
2411 executor.run(async move {
2412 let (stream, handle) = ScopeStream::new();
2413 handle.push(async {
2414 Timer::new(Duration::from_millis(10)).await;
2415 1
2416 });
2417 handle.push(async {
2418 Timer::new(Duration::from_millis(10)).await;
2419 2
2420 });
2421 stream.close();
2422 let results: HashSet<_> = stream.collect().await;
2423 assert_eq!(results, HashSet::from_iter([1, 2]));
2424 });
2425 }
2426
2427 #[test]
2428 fn test_scope_stream_drops_spawned_tasks() {
2429 let mut executor = SendExecutor::new(2);
2430 executor.run(async move {
2431 let (stream, handle) = ScopeStream::new();
2432 handle.push(async { 1 });
2433 let _task = stream.compute(async { "foo" });
2434 stream.close();
2435 let results: HashSet<_> = stream.collect().await;
2436 assert_eq!(results, HashSet::from_iter([1]));
2437 });
2438 }
2439
2440 #[test]
2441 fn test_nested_scope_stream() {
2442 let mut executor = SendExecutor::new(2);
2443 executor.run(async move {
2444 let (mut stream, handle) = ScopeStream::new();
2445 handle.clone().push(async move {
2446 handle.clone().push(async move {
2447 handle.clone().push(async move { 3 });
2448 2
2449 });
2450 1
2451 });
2452 let mut results = HashSet::default();
2453 while let Some(item) = stream.next().await {
2454 results.insert(item);
2455 if results.len() == 3 {
2456 stream.close();
2457 }
2458 }
2459 assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2460 });
2461 }
2462
2463 #[test]
2464 fn test_dropping_scope_stream_cancels_all_tasks() {
2465 let mut executor = SendExecutor::new(2);
2466 executor.run(async move {
2467 let (stream, handle) = ScopeStream::new();
2468 let (tx1, mut rx) = mpsc::unbounded::<()>();
2469 let tx2 = tx1.clone();
2470 handle.push(async move {
2471 let _tx1 = tx1;
2472 let () = pending().await;
2473 });
2474 handle.push(async move {
2475 let _tx2 = tx2;
2476 let () = pending().await;
2477 });
2478 drop(stream);
2479
2480 assert_eq!(rx.next().await, None);
2482 });
2483 }
2484
2485 #[test]
2486 fn test_scope_stream_collect() {
2487 let mut executor = SendExecutor::new(2);
2488 executor.run(async move {
2489 let stream: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
2490 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2491
2492 let stream: ScopeStream<_> =
2493 (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
2494 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2495 });
2496 }
2497
2498 struct DropSignal(Arc<AtomicBool>);
2499
2500 impl Drop for DropSignal {
2501 fn drop(&mut self) {
2502 self.0.store(true, Ordering::SeqCst);
2503 }
2504 }
2505
2506 struct DropChecker(Arc<AtomicBool>);
2507
2508 impl DropChecker {
2509 fn new() -> (Self, DropSignal) {
2510 let inner = Arc::new(AtomicBool::new(false));
2511 (Self(inner.clone()), DropSignal(inner))
2512 }
2513
2514 fn is_dropped(&self) -> bool {
2515 self.0.load(Ordering::SeqCst)
2516 }
2517 }
2518
2519 #[test]
2520 fn child_finished_when_parent_pending() {
2521 let mut executor = LocalExecutor::new();
2522 executor.run_singlethreaded(async {
2523 let scope = Scope::new();
2524 let _guard = scope.active_guard().expect("acquire guard");
2525 let cancel = scope.to_handle().cancel();
2526 let child = scope.new_child();
2527 let (checker, signal) = DropChecker::new();
2528 child.spawn(async move {
2529 let _signal = signal;
2530 futures::future::pending::<()>().await
2531 });
2532 assert!(checker.is_dropped());
2533 assert!(child.active_guard().is_none());
2534 cancel.await;
2535 })
2536 }
2537
2538 #[test]
2539 fn guarded_scopes_observe_closed() {
2540 let mut executor = LocalExecutor::new();
2541 executor.run_singlethreaded(async {
2542 let scope = Scope::new();
2543 let handle = scope.to_handle();
2544 let _guard = scope.active_guard().expect("acquire guard");
2545 handle.close();
2546 let (checker, signal) = DropChecker::new();
2547 handle.spawn(async move {
2548 let _signal = signal;
2549 futures::future::pending::<()>().await
2550 });
2551 assert!(checker.is_dropped());
2552 let (checker, signal) = DropChecker::new();
2553 let cancel = handle.clone().cancel();
2554 handle.spawn(async move {
2555 let _signal = signal;
2556 futures::future::pending::<()>().await
2557 });
2558 assert!(checker.is_dropped());
2559 scope.join().await;
2560 cancel.await;
2561 })
2562 }
2563
2564 #[test]
2565 fn active_guard_holds_cancellation() {
2566 let mut executor = TestExecutor::new();
2567 let scope = executor.global_scope().new_child();
2568 let guard = scope.active_guard().expect("acquire guard");
2569 scope.spawn(futures::future::pending());
2570 let mut join = pin!(scope.cancel());
2571 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2572 drop(guard);
2573 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2574 }
2575
2576 #[test]
2577 fn child_guard_holds_parent_cancellation() {
2578 let mut executor = TestExecutor::new();
2579 let scope = executor.global_scope().new_child();
2580 let child = scope.new_child();
2581 let guard = child.active_guard().expect("acquire guard");
2582 scope.spawn(futures::future::pending());
2583 let mut join = pin!(scope.cancel());
2584 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2585 drop(guard);
2586 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2587 }
2588
2589 #[test]
2590 fn active_guard_on_cancel() {
2591 let mut executor = TestExecutor::new();
2592 let scope = executor.global_scope().new_child();
2593 let child1 = scope.new_child();
2594 let child2 = scope.new_child();
2595 let guard = child1.active_guard().expect("acquire guard");
2596 let guard_for_right_scope = guard.clone();
2597 let guard_for_wrong_scope = guard.clone();
2598 child1.spawn(async move { guard_for_right_scope.on_cancel().await });
2599 child2.spawn(async move {
2600 guard_for_wrong_scope.on_cancel().await;
2601 });
2602
2603 let handle = scope.to_handle();
2604 let mut join = pin!(scope.join());
2605 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2606 let cancel: Join<_> = handle.cancel();
2607 drop(cancel);
2608 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2609 }
2610
2611 #[test]
2612 fn abort_join() {
2613 let mut executor = TestExecutor::new();
2614 let scope = executor.global_scope().new_child();
2615 let child = scope.new_child();
2616 let _guard = child.active_guard().expect("acquire guard");
2617
2618 let (checker1, signal) = DropChecker::new();
2619 scope.spawn(async move {
2620 let _signal = signal;
2621 futures::future::pending::<()>().await
2622 });
2623 let (checker2, signal) = DropChecker::new();
2624 scope.spawn(async move {
2625 let _signal = signal;
2626 futures::future::pending::<()>().await
2627 });
2628
2629 let mut join = pin!(scope.cancel());
2630 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2631 assert!(!checker1.is_dropped());
2632 assert!(!checker2.is_dropped());
2633
2634 let mut join = join.abort();
2635 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2636 assert!(checker1.is_dropped());
2637 assert!(checker2.is_dropped());
2638 }
2639
2640 #[test]
2641 fn child_without_guard_aborts_immediately_on_cancel() {
2642 let mut executor = TestExecutor::new();
2643 let scope = executor.global_scope().new_child();
2644 let child = scope.new_child();
2645 let guard = scope.active_guard().expect("acquire guard");
2646
2647 let (checker_scope, signal) = DropChecker::new();
2648 scope.spawn(async move {
2649 let _signal = signal;
2650 futures::future::pending::<()>().await
2651 });
2652 let (checker_child, signal) = DropChecker::new();
2653 child.spawn(async move {
2654 let _signal = signal;
2655 futures::future::pending::<()>().await
2656 });
2657
2658 let mut join = pin!(scope.cancel());
2659 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2660 assert!(!checker_scope.is_dropped());
2661 assert!(checker_child.is_dropped());
2662
2663 drop(guard);
2664 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2665 assert!(checker_child.is_dropped());
2666 }
2667}