1use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::condition::{Condition, ConditionGuard, WakerEntry};
9use crate::EHandle;
10use fuchsia_sync::Mutex;
11use futures::Stream;
12use pin_project_lite::pin_project;
13use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
14use state::{JoinResult, ScopeState, ScopeWaker, Status, WakeVec};
15use std::any::Any;
16use std::borrow::Borrow;
17use std::collections::hash_map::Entry;
18use std::collections::hash_set;
19use std::future::{Future, IntoFuture};
20use std::marker::PhantomData;
21use std::mem::{self, ManuallyDrop};
22use std::ops::{Deref, DerefMut};
23use std::pin::Pin;
24use std::sync::{Arc, Weak};
25use std::task::{ready, Context, Poll, Waker};
26use std::{fmt, hash};
27
/// An owning wrapper around a [`ScopeHandle`].
///
/// Dropping a `Scope` aborts every task in it (see the `Drop` impl below), so
/// callers are expected to consume it explicitly with [`Scope::join`],
/// [`Scope::close`], [`Scope::cancel`], [`Scope::abort`] or [`Scope::detach`].
#[must_use = "Scopes should be explicitly awaited or cancelled"]
#[derive(Debug)]
pub struct Scope {
    // Shared handle to the underlying scope; `Scope` derefs to it.
    inner: ScopeHandle,
}
79
80impl Default for Scope {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl Scope {
87 pub fn new() -> Scope {
96 ScopeHandle::with_current(|handle| handle.new_child())
97 }
98
99 pub fn new_with_name(name: impl Into<String>) -> Scope {
108 ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
109 }
110
111 pub fn current() -> ScopeHandle {
119 ScopeHandle::with_current(|handle| handle.clone())
120 }
121
122 pub fn global() -> ScopeHandle {
139 EHandle::local().global_scope().clone()
140 }
141
142 pub fn new_child(&self) -> Scope {
144 self.inner.new_child()
145 }
146
147 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
149 self.inner.new_child_with_name(name.into())
150 }
151
152 pub fn name(&self) -> &str {
154 &self.inner.inner.name
155 }
156
157 pub fn to_handle(&self) -> ScopeHandle {
165 self.inner.clone()
166 }
167
168 pub fn as_handle(&self) -> &ScopeHandle {
175 &self.inner
176 }
177
178 pub fn join(self) -> Join {
187 Join::new(self)
188 }
189
190 pub fn close(self) -> Join {
193 self.inner.close();
194 Join::new(self)
195 }
196
197 pub fn cancel(self) -> Join {
215 self.inner.cancel_all_tasks();
216 Join::new(self)
217 }
218
219 pub fn abort(self) -> impl Future<Output = ()> {
234 self.inner.abort_all_tasks();
235 Join::new(self)
236 }
237
238 pub fn detach(self) {
244 let this = ManuallyDrop::new(self);
247 mem::drop(unsafe { std::ptr::read(&this.inner) });
250 }
251}
252
253impl Drop for Scope {
256 fn drop(&mut self) {
257 self.inner.abort_all_tasks();
266 }
267}
268
269impl IntoFuture for Scope {
270 type Output = ();
271 type IntoFuture = Join;
272 fn into_future(self) -> Self::IntoFuture {
273 self.join()
274 }
275}
276
277impl Deref for Scope {
278 type Target = ScopeHandle;
279 fn deref(&self) -> &Self::Target {
280 &self.inner
281 }
282}
283
284impl Borrow<ScopeHandle> for Scope {
285 fn borrow(&self) -> &ScopeHandle {
286 self
287 }
288}
289
pin_project! {
    /// Future returned by the join-style operations on [`Scope`] and [`ScopeHandle`].
    ///
    /// It resolves once the scope it holds reports that it has no tasks. The scope value `S`
    /// is kept inside the future, so when `S = Scope` dropping the future drops the scope.
    pub struct Join<S = Scope> {
        // The scope being joined; anything that can be borrowed as a `ScopeHandle`.
        scope: S,
        // Waker registration slot obtained from the scope's condition in `Join::new`.
        #[pin]
        waker_entry: WakerEntry<ScopeState>,
    }
}
308
309impl<S: Borrow<ScopeHandle>> Join<S> {
310 fn new(scope: S) -> Self {
311 let waker_entry = scope.borrow().inner.state.waker_entry();
312 Self { scope, waker_entry }
313 }
314
315 pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
320 self.scope.borrow().abort_all_tasks();
321 self
322 }
323
324 pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
329 self.scope.borrow().cancel_all_tasks();
330 self
331 }
332}
333
334impl<S> Future for Join<S>
335where
336 S: Borrow<ScopeHandle>,
337{
338 type Output = ();
339 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
340 let this = self.project();
341 let mut state = Borrow::borrow(&*this.scope).lock();
342 if state.has_tasks() {
343 state.add_waker(this.waker_entry, cx.waker().clone());
344 Poll::Pending
345 } else {
346 state.mark_finished();
347 Poll::Ready(())
348 }
349 }
350}
351
/// Something that can be turned into a task on a scope.
pub trait Spawnable {
    /// The value the resulting task produces.
    type Output;

    /// Converts `self` into a [`TaskHandle`] associated with `scope`.
    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
}
361
362impl<F: Future + Send + 'static> Spawnable for F
363where
364 F::Output: Send + 'static,
365{
366 type Output = F::Output;
367
368 fn into_task(self, scope: ScopeHandle) -> TaskHandle {
369 scope.new_task(None, self)
370 }
371}
372
/// A cheaply cloneable, reference-counted handle to a scope.
///
/// Unlike [`Scope`], dropping a `ScopeHandle` has no `Drop` impl of its own in this file; the
/// shared `ScopeInner` is cleaned up when the last reference goes away.
#[derive(Clone)]
pub struct ScopeHandle {
    // Shared scope data (executor, state, name).
    inner: Arc<ScopeInner>,
}
390
391impl ScopeHandle {
392 pub fn new_child(&self) -> Scope {
394 self.new_child_inner(String::new())
395 }
396
397 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
399 self.new_child_inner(name.into())
400 }
401
402 fn new_child_inner(&self, name: String) -> Scope {
403 let mut state = self.lock();
404 let child = ScopeHandle {
405 inner: Arc::new(ScopeInner {
406 executor: self.inner.executor.clone(),
407 state: Condition::new(ScopeState::new_child(
408 self.clone(),
409 &state,
410 JoinResults::default().into(),
411 )),
412 name,
413 }),
414 };
415 let weak = child.downgrade();
416 state.insert_child(weak);
417 Scope { inner: child }
418 }
419
420 pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
424 let task = future.into_task(self.clone());
425 let task_id = task.id();
426 self.insert_task(task, false);
427 JoinHandle::new(self.clone(), task_id)
428 }
429
430 pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
435 let task = self.new_local_task(None, future);
436 let id = task.id();
437 self.insert_task(task, false);
438 JoinHandle::new(self.clone(), id)
439 }
440
441 pub fn compute<T: Send + 'static>(
446 &self,
447 future: impl Spawnable<Output = T> + Send + 'static,
448 ) -> crate::Task<T> {
449 let task = future.into_task(self.clone());
450 let id = task.id();
451 self.insert_task(task, false);
452 JoinHandle::new(self.clone(), id).into()
453 }
454
455 pub fn compute_local<T: 'static>(
463 &self,
464 future: impl Future<Output = T> + 'static,
465 ) -> crate::Task<T> {
466 let task = self.new_local_task(None, future);
467 let id = task.id();
468 self.insert_task(task, false);
469 JoinHandle::new(self.clone(), id).into()
470 }
471
472 pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
473 ScopeHandle {
474 inner: Arc::new(ScopeInner {
475 executor,
476 state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
477 name: "root".to_string(),
478 }),
479 }
480 }
481
482 pub fn close(&self) {
490 self.lock().close();
491 }
492
493 pub fn cancel(self) -> Join<Self> {
498 self.cancel_all_tasks();
499 Join::new(self)
500 }
501
502 pub fn abort(self) -> impl Future<Output = ()> {
507 self.abort_all_tasks();
508 Join::new(self)
509 }
510
511 #[must_use]
523 pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
524 ScopeActiveGuard::new(self)
525 }
526
527 pub fn is_cancelled(&self) -> bool {
530 self.lock().status().is_cancelled()
531 }
532
533 pub async fn on_no_tasks(&self) {
540 self.inner
541 .state
542 .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
543 .await;
544 }
545
546 pub async fn on_no_tasks_and_guards(&self) {
549 self.inner
550 .state
551 .when(|state| {
552 if state.has_tasks() || state.guards() > 0 {
553 Poll::Pending
554 } else {
555 Poll::Ready(())
556 }
557 })
558 .await;
559 }
560
561 pub fn wake_all_with_active_guard(&self) {
563 self.lock().wake_all_with_active_guard();
564 }
565
566 pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
569 &self,
570 id: Option<usize>,
571 fut: Fut,
572 ) -> AtomicFutureHandle<'a>
573 where
574 Fut::Output: Send,
575 {
576 AtomicFutureHandle::new(
577 Some(self.clone()),
578 id.unwrap_or_else(|| self.executor().next_task_id()),
579 fut,
580 )
581 }
582
583 pub(crate) fn new_local_task<'a>(
586 &self,
587 id: Option<usize>,
588 fut: impl Future + 'a,
589 ) -> AtomicFutureHandle<'a> {
590 if !self.executor().is_local() {
592 panic!(
593 "Error: called `new_local_task` on multithreaded executor. \
594 Use `spawn` or a `LocalExecutor` instead."
595 );
596 }
597
598 unsafe {
601 AtomicFutureHandle::new_local(
602 Some(self.clone()),
603 id.unwrap_or_else(|| self.executor().next_task_id()),
604 fut,
605 )
606 }
607 }
608}
609
610impl fmt::Debug for ScopeHandle {
611 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
612 f.debug_struct("Scope").field("name", &self.inner.name).finish()
613 }
614}
615
/// A scope whose task results are delivered through a [`Stream`] rather than join handles.
///
/// Dropping the stream aborts all tasks in the scope (see the `Drop` impl below).
pub struct ScopeStream<R> {
    // The scope the tasks run on.
    inner: ScopeHandle,
    // Result buffer and pending waker, shared with the scope's `ResultsStream`.
    stream: Arc<Mutex<ResultsStreamInner<R>>>,
}
629
630impl<R: Send + 'static> ScopeStream<R> {
631 pub fn new() -> (Self, ScopeStreamHandle<R>) {
640 Self::new_inner(String::new())
641 }
642
643 pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
652 Self::new_inner(name.into())
653 }
654
655 fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
656 let this = ScopeHandle::with_current(|handle| {
657 let mut state = handle.lock();
658 let stream = Arc::default();
659 let child = ScopeHandle {
660 inner: Arc::new(ScopeInner {
661 executor: handle.executor().clone(),
662 state: Condition::new(ScopeState::new_child(
663 handle.clone(),
664 &state,
665 Box::new(ResultsStream { inner: Arc::clone(&stream) }),
666 )),
667 name,
668 }),
669 };
670 let weak = child.downgrade();
671 state.insert_child(weak);
672 ScopeStream { inner: child, stream }
673 });
674 let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
675 (this, handle)
676 }
677}
678
679impl<R> Drop for ScopeStream<R> {
680 fn drop(&mut self) {
681 self.inner.abort_all_tasks();
690 }
691}
692
impl<R: Send + 'static> Stream for ScopeStream<R> {
    type Item = R;

    /// Yields the most recently buffered result, `None` once the scope has no tasks and the
    /// buffer is empty, or `Pending` (after storing the waker) otherwise.
    ///
    /// Results are taken with `Vec::pop`, i.e. from the end of the buffer.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Fast path: only the stream lock is needed when a result is already buffered.
        let mut stream_inner = self.stream.lock();
        match stream_inner.results.pop() {
            Some(result) => Poll::Ready(Some(result)),
            None => {
                // Lock ordering: a finishing task takes the scope state lock first and the
                // stream lock second (`ScopeHandle::task_did_finish` ->
                // `ResultsStream::task_did_finish`), so release the stream lock and re-acquire
                // both in that order.
                drop(stream_inner);
                let state = self.inner.lock();
                let mut stream_inner = self.stream.lock();
                // A result may have been posted while no lock was held, so check again.
                match stream_inner.results.pop() {
                    Some(result) => Poll::Ready(Some(result)),
                    None => {
                        if state.has_tasks() {
                            // Registered while holding the state lock, so a task that finishes
                            // afterwards will observe and take this waker.
                            stream_inner.waker = Some(cx.waker().clone());
                            Poll::Pending
                        } else {
                            Poll::Ready(None)
                        }
                    }
                }
            }
        }
    }
}
721
722impl<R> Deref for ScopeStream<R> {
723 type Target = ScopeHandle;
724 fn deref(&self) -> &Self::Target {
725 &self.inner
726 }
727}
728
729impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
730 fn borrow(&self) -> &ScopeHandle {
731 self
732 }
733}
734
735impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
736 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
737 let (stream, handle) = ScopeStream::new();
738 for fut in iter {
739 handle.push(fut);
740 }
741 stream.close();
742 stream
743 }
744}
745
/// Handle used to push futures onto a [`ScopeStream`].
///
/// `R` is the result type of the stream; it is carried only as `PhantomData`.
#[derive(Clone)]
pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
748
749impl<R: Send> ScopeStreamHandle<R> {
750 pub fn push(&self, future: impl Spawnable<Output = R>) {
751 self.0.insert_task(future.into_task(self.0.clone()), true);
752 }
753}
754
/// A guard held on a scope, obtained from [`ScopeHandle::active_guard`].
///
/// While a scope has outstanding guards, a cancellation request only marks it as pending
/// cancellation; its tasks are aborted once the guard count drops back to zero (see
/// `ScopeHandle::cancel_all_tasks` and `ScopeState::on_zero_guards`).
#[derive(Debug)]
#[must_use]
pub struct ScopeActiveGuard(ScopeHandle);
767
768impl Deref for ScopeActiveGuard {
769 type Target = ScopeHandle;
770 fn deref(&self) -> &Self::Target {
771 &self.0
772 }
773}
774
775impl Drop for ScopeActiveGuard {
776 fn drop(&mut self) {
777 let Self(scope) = self;
778 scope.release_cancel_guard();
779 }
780}
781
782impl Clone for ScopeActiveGuard {
783 fn clone(&self) -> Self {
784 self.0.lock().acquire_cancel_guard(1);
785 Self(self.0.clone())
786 }
787}
788
789impl ScopeActiveGuard {
790 pub fn as_handle(&self) -> &ScopeHandle {
792 &self.0
793 }
794
795 pub fn to_handle(&self) -> ScopeHandle {
797 self.0.clone()
798 }
799
800 pub async fn on_cancel(&self) {
806 self.0
807 .inner
808 .state
809 .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
810 .await
811 }
812
813 fn new(scope: &ScopeHandle) -> Option<Self> {
814 if scope.lock().acquire_cancel_guard_if_not_finished() {
815 Some(Self(scope.clone()))
816 } else {
817 None
818 }
819 }
820}
821
/// A non-owning reference to a scope.
///
/// Hashing and equality are based on the address of the referenced `ScopeInner` (see the
/// `Hash` and `PartialEq` impls below).
#[derive(Clone)]
struct WeakScopeHandle {
    inner: Weak<ScopeInner>,
}
831
832impl WeakScopeHandle {
833 pub fn upgrade(&self) -> Option<ScopeHandle> {
835 self.inner.upgrade().map(|inner| ScopeHandle { inner })
836 }
837}
838
839impl hash::Hash for WeakScopeHandle {
840 fn hash<H: hash::Hasher>(&self, state: &mut H) {
841 Weak::as_ptr(&self.inner).hash(state);
842 }
843}
844
845impl PartialEq for WeakScopeHandle {
846 fn eq(&self, other: &Self) -> bool {
847 Weak::ptr_eq(&self.inner, &other.inner)
848 }
849}
850
// Pointer identity (see `PartialEq` above) is a full equivalence relation.
impl Eq for WeakScopeHandle {
}
855
856mod state {
859 use super::*;
860
    /// Mutable state of a scope, protected by the `Condition` in `ScopeInner`.
    pub struct ScopeState {
        /// The parent scope; `None` for a root scope.
        pub parent: Option<ScopeHandle>,
        // Weak references to the child scopes registered via `insert_child`.
        children: HashSet<WeakScopeHandle>,
        // Tasks registered directly on this scope.
        all_tasks: HashSet<TaskHandle>,
        // Incremented by `register_first_task`, decremented by `on_last_task_removed`; the scope
        // "has tasks" while this is non-zero. NOTE(review): appears to count this scope (when it
        // has tasks of its own) plus each child subtree that has tasks — confirm.
        subscopes_with_tasks: u32,
        // Whether new tasks are accepted; cleared by `close` and `mark_finished`.
        can_spawn: bool,
        // Number of outstanding cancel guards on this scope.
        guards: u32,
        // Lifecycle status of the scope.
        status: Status,
        /// Sink for finished tasks (join results or a results stream).
        pub results: Box<dyn Results>,
    }
876
    /// Per-task entry in `JoinResults`.
    pub enum JoinResult {
        /// The task has not finished yet; holds the waker of whoever is polling for its result.
        Waker(Waker),
        /// The task has finished; holds the task so its result can be taken.
        Result(TaskHandle),
    }
881
882 #[repr(u8)] #[derive(Default, Debug, Clone, Copy)]
884 pub enum Status {
885 #[default]
886 Active,
888 PendingCancellation,
891 Finished,
894 }
895
896 impl Status {
897 pub fn is_cancelled(&self) -> bool {
899 match self {
900 Self::Active => false,
901 Self::PendingCancellation | Self::Finished => true,
902 }
903 }
904 }
905
906 impl ScopeState {
907 pub fn new_root(results: Box<impl Results>) -> Self {
908 Self {
909 parent: None,
910 children: Default::default(),
911 all_tasks: Default::default(),
912 subscopes_with_tasks: 0,
913 can_spawn: true,
914 guards: 0,
915 status: Default::default(),
916 results,
917 }
918 }
919
920 pub fn new_child(
921 parent_handle: ScopeHandle,
922 parent_state: &Self,
923 results: Box<impl Results>,
924 ) -> Self {
925 let (status, can_spawn) = match parent_state.status {
926 Status::Active => (Status::Active, parent_state.can_spawn),
927 Status::Finished | Status::PendingCancellation => (Status::Finished, false),
928 };
929 Self {
930 parent: Some(parent_handle),
931 children: Default::default(),
932 all_tasks: Default::default(),
933 subscopes_with_tasks: 0,
934 can_spawn,
935 guards: 0,
936 status,
937 results,
938 }
939 }
940 }
941
    impl ScopeState {
        /// Returns the tasks registered directly on this scope.
        pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
            &self.all_tasks
        }

        /// Tries to add `task` to this scope, waking it on success.
        ///
        /// Returns `None` when the task was inserted. Returns the task back to the caller when
        /// the scope does not accept it: the scope cannot spawn, the results sink refuses
        /// non-stream tasks, or the first-task registration with the ancestors failed.
        ///
        /// # Panics
        ///
        /// Panics if an equal task is already in the set.
        pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
            if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
                return Some(task);
            }
            // Going from zero to one task has to be recorded up the scope tree.
            if self.all_tasks.is_empty() && !self.register_first_task() {
                return Some(task);
            }
            task.wake();
            assert!(self.all_tasks.insert(task));
            None
        }

        /// Returns the weak handles of the registered child scopes.
        pub fn children(&self) -> &HashSet<WeakScopeHandle> {
            &self.children
        }

        /// Registers `child` as a child scope.
        pub fn insert_child(&mut self, child: WeakScopeHandle) {
            self.children.insert(child);
        }

        /// Unregisters the child identified by `child`.
        ///
        /// # Panics
        ///
        /// Panics if the child is not found while other children are still registered. A miss
        /// on an empty set is tolerated (the set is drained by
        /// `ScopeWaker::set_closed_and_drain`).
        pub fn remove_child(&mut self, child: &PtrKey) {
            let found = self.children.remove(child);
            assert!(found || self.children.is_empty());
        }

        /// Returns the current lifecycle status.
        pub fn status(&self) -> Status {
            self.status
        }

        /// Returns the number of outstanding cancel guards.
        pub fn guards(&self) -> u32 {
            self.guards
        }

        /// Stops the scope from accepting new tasks; the status is left unchanged.
        pub fn close(&mut self) {
            self.can_spawn = false;
        }

        /// Stops the scope from accepting new tasks and marks it finished.
        pub fn mark_finished(&mut self) {
            self.can_spawn = false;
            self.status = Status::Finished;
        }

        /// Returns whether the scope is currently counted as having tasks.
        pub fn has_tasks(&self) -> bool {
            self.subscopes_with_tasks > 0
        }

        /// Calls `wake_with_active_guard` on every task and acquires one cancel guard for each
        /// call that returned `true`.
        pub fn wake_all_with_active_guard(&mut self) {
            let mut count = 0;
            for task in &self.all_tasks {
                if task.wake_with_active_guard() {
                    count += 1;
                }
            }
            self.acquire_cancel_guard(count);
        }

        /// Aborts every task on this scope and marks the scope finished.
        ///
        /// Tasks for which `abort` returns `true` are pushed onto their executor's ready queue.
        /// Tasks are not removed from `all_tasks` here.
        pub fn abort_tasks_and_mark_finished(&mut self) {
            for task in self.all_tasks() {
                if task.abort() {
                    task.scope().executor().ready_tasks.push(task.clone());
                }
            }
            self.mark_finished();
        }

        /// Moves all wakers registered on the scope's condition into `wakers` and marks the
        /// scope as pending cancellation.
        pub fn wake_wakers_and_mark_pending(
            this: &mut ConditionGuard<'_, ScopeState>,
            wakers: &mut Vec<Waker>,
        ) {
            wakers.extend(this.drain_wakers());
            this.status = Status::PendingCancellation;
        }

        /// Records that this scope (or one of its subtrees) is about to get its first task.
        ///
        /// Returns `false`, leaving the counters untouched, if this scope or an ancestor that
        /// has to be consulted cannot spawn.
        #[must_use]
        fn register_first_task(&mut self) -> bool {
            if !self.can_spawn {
                return false;
            }
            let can_spawn = match &self.parent {
                Some(parent) => {
                    // A non-zero count means the parent has already been told about this
                    // subtree; otherwise recurse, locking the parent while this scope is locked.
                    self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
                }
                None => true,
            };
            if can_spawn {
                self.subscopes_with_tasks += 1;
                debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
            };
            can_spawn
        }

        /// Undoes one `register_first_task`, propagating to the parent when the count reaches
        /// zero and collecting the wakers of every scope that became empty.
        ///
        /// `num_wakers_hint` is the number of wakers callers further down intend to push; it is
        /// only used to reserve capacity in `wakers`.
        fn on_last_task_removed(
            this: &mut ConditionGuard<'_, ScopeState>,
            num_wakers_hint: usize,
            wakers: &mut Vec<Waker>,
        ) {
            debug_assert!(this.subscopes_with_tasks > 0);
            this.subscopes_with_tasks -= 1;
            if this.subscopes_with_tasks > 0 {
                // Still has tasks somewhere: nothing to wake at this level or above.
                wakers.reserve(num_wakers_hint);
                return;
            }

            match &this.parent {
                Some(parent) => {
                    // The parent is locked while this scope's lock is still held.
                    Self::on_last_task_removed(
                        &mut parent.lock(),
                        num_wakers_hint + this.waker_count(),
                        wakers,
                    );
                }
                None => wakers.reserve(num_wakers_hint),
            };
            wakers.extend(this.drain_wakers());
        }

        /// Acquires one cancel guard unless the scope is already finished; returns whether a
        /// guard was acquired.
        pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
            match self.status {
                Status::Active | Status::PendingCancellation => {
                    self.acquire_cancel_guard(1);
                    true
                }
                Status::Finished => false,
            }
        }

        /// Adds `count` cancel guards to this scope (no-op for zero).
        ///
        /// When the scope goes from zero guards to some, a single guard is also acquired on the
        /// parent, which locks the parent.
        pub fn acquire_cancel_guard(&mut self, count: u32) {
            if count == 0 {
                return;
            }
            if self.guards == 0 {
                if let Some(parent) = self.parent.as_ref() {
                    parent.acquire_cancel_guard();
                }
            }
            self.guards += count;
        }

        /// Releases one cancel guard.
        ///
        /// When the count reaches zero, `on_zero_guards` runs (which may abort tasks and
        /// releases the parent's guard) and this scope's wakers are moved into `wake_vec`.
        /// `waker_count` is a running capacity hint accumulated across the recursion.
        ///
        /// # Panics
        ///
        /// Panics if no guard is currently held.
        pub fn release_cancel_guard(
            this: &mut ConditionGuard<'_, Self>,
            wake_vec: &mut WakeVec,
            mut waker_count: usize,
        ) {
            this.guards = this.guards.checked_sub(1).expect("released non-acquired guard");
            if this.guards == 0 {
                waker_count += this.waker_count();
                this.on_zero_guards(wake_vec, waker_count);
                wake_vec.0.extend(this.drain_wakers())
            } else {
                wake_vec.0.reserve_exact(waker_count);
            }
        }

        /// Handles the guard count reaching zero: completes a pending cancellation, then
        /// releases the guard this scope held on its parent (locking the parent).
        fn on_zero_guards(&mut self, wake_vec: &mut WakeVec, waker_count: usize) {
            match self.status {
                Status::Active => {}
                Status::PendingCancellation => {
                    self.abort_tasks_and_mark_finished();
                }
                // Nothing left to do for a scope that is already finished.
                Status::Finished => {}
            }
            if let Some(parent) = &self.parent {
                ScopeState::release_cancel_guard(&mut parent.lock(), wake_vec, waker_count);
            }
        }
    }
1129
1130 #[derive(Default)]
1131 pub struct WakeVec(Vec<Waker>);
1132
1133 impl Drop for WakeVec {
1134 fn drop(&mut self) {
1135 for waker in self.0.drain(..) {
1136 waker.wake();
1137 }
1138 }
1139 }
1140
    /// A locked scope state paired with the wakers collected while it is held.
    ///
    /// Fields drop in declaration order, so the lock guard is released before the `WakeVec`
    /// wakes its wakers.
    pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1143
1144 impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1145 fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1146 Self(value, WakeVec::default())
1147 }
1148 }
1149
    impl ScopeWaker<'_> {
        /// Removes and returns the task with `id`, updating the has-tasks bookkeeping if it
        /// was the last one.
        pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
            let task = self.all_tasks.take(&id);
            if task.is_some() {
                self.on_task_removed(0);
            }
            task
        }

        /// Removes the finished task with `id` and, unless it was detached, hands it to the
        /// results sink, queueing any waker the sink returns.
        pub fn task_did_finish(&mut self, id: usize) {
            if let Some(task) = self.all_tasks.take(&id) {
                // Hint of 1: at most one extra waker (from the results sink) is pushed below.
                self.on_task_removed(1);
                if !task.is_detached() {
                    let maybe_waker = self.results.task_did_finish(task);
                    self.1 .0.extend(maybe_waker);
                }
            }
        }

        /// Closes the scope and takes its tasks and results, returning them together with a
        /// draining iterator over the children.
        ///
        /// The returned values are meant to be dropped by the caller (see
        /// `ScopeHandle::drop_all_tasks`).
        pub fn set_closed_and_drain(
            &mut self,
        ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
            self.close();
            let all_tasks = std::mem::take(&mut self.all_tasks);
            let results = self.results.take();
            // `self.all_tasks` is now empty, so this runs the last-task bookkeeping once if
            // there were any tasks.
            if !all_tasks.is_empty() {
                self.on_task_removed(0)
            }
            let children = self.children.drain();
            (all_tasks, results, children)
        }

        /// Runs the last-task bookkeeping if no tasks remain on this scope.
        fn on_task_removed(&mut self, num_wakers_hint: usize) {
            if self.all_tasks.is_empty() {
                ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1 .0)
            }
        }

        /// Queues the scope's registered wakers and marks the scope as pending cancellation.
        pub fn wake_wakers_and_mark_pending(&mut self) {
            let Self(state, wakers) = self;
            ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
        }
    }
1193
1194 impl<'a> Deref for ScopeWaker<'a> {
1195 type Target = ConditionGuard<'a, ScopeState>;
1196
1197 fn deref(&self) -> &Self::Target {
1198 &self.0
1199 }
1200 }
1201
1202 impl DerefMut for ScopeWaker<'_> {
1203 fn deref_mut(&mut self) -> &mut Self::Target {
1204 &mut self.0
1205 }
1206 }
1207}
1208
/// Data shared by every handle to a scope.
struct ScopeInner {
    // The executor the scope's tasks are scheduled on.
    executor: Arc<Executor>,
    // Lock-protected scope state together with its registered wakers.
    state: Condition<ScopeState>,
    // Name of the scope; empty for unnamed scopes, `"root"` for the root scope.
    name: String,
}
1214
impl Drop for ScopeInner {
    /// Detaches the scope from its parent: releases the guard held on the parent on this
    /// scope's behalf (if any) and removes the scope from the parent's child set.
    fn drop(&mut self) {
        // SAFETY: `PtrKey` is a zero-sized unit struct, so the reference does not cover any
        // memory. Its `Hash`/`Eq` impls use only the address, which is the same address
        // `WeakScopeHandle` hashes and compares. NOTE(review): this relies on `self` being the
        // value stored inside the `Arc` that the weak handles point at — confirm.
        let key = unsafe { &*(self as *const _ as *const PtrKey) };
        let state = self.state.lock();
        if let Some(parent) = &state.parent {
            // Declared before `parent_state` so that, at the end of this block, the parent's
            // lock is released before the collected wakers are woken.
            let mut wake_vec = WakeVec::default();
            let mut parent_state = parent.lock();
            // A scope with outstanding guards holds one guard on its parent
            // (see `ScopeState::acquire_cancel_guard`).
            if state.guards() != 0 {
                ScopeState::release_cancel_guard(&mut parent_state, &mut wake_vec, 0);
            }
            parent_state.remove_child(key);
        }
    }
}
1233
impl ScopeHandle {
    /// Calls `f` with the current scope: the scope of the currently running task if there is
    /// one, otherwise the global scope of the local executor.
    fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
        super::common::TaskHandle::with_current(|task| match task {
            Some(task) => f(task.scope()),
            None => f(EHandle::local().global_scope()),
        })
    }

    /// Locks the scope state.
    fn lock(&self) -> ConditionGuard<'_, ScopeState> {
        self.inner.state.lock()
    }

    /// Creates a weak handle to this scope.
    fn downgrade(&self) -> WeakScopeHandle {
        WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
    }

    /// Returns the executor this scope belongs to.
    #[inline(always)]
    pub(crate) fn executor(&self) -> &Arc<Executor> {
        &self.inner.executor
    }

    /// Marks the task as detached and discards any result already stored for it.
    pub(crate) fn detach(&self, task_id: usize) {
        // The block returns the detached result (if any) so that it is dropped only after the
        // scope lock has been released.
        let _maybe_task = {
            let mut state = self.lock();
            if let Some(task) = state.all_tasks().get(&task_id) {
                task.detach();
            }
            state.results.detach(task_id)
        };
    }

    /// Aborts the task and returns its result if it already has one.
    ///
    /// # Safety
    ///
    /// NOTE(review): `R` is presumably required to be the output type of the task identified
    /// by `task_id`, since the result is read back through `take_result` — confirm against
    /// `take_result`'s contract.
    pub(crate) unsafe fn abort_task<R>(&self, task_id: usize) -> Option<R> {
        let mut state = self.lock();
        // The task already finished: take its stored result, without holding the lock.
        if let Some(task) = state.results.detach(task_id) {
            drop(state);
            return task.take_result();
        }
        state.all_tasks().get(&task_id).and_then(|task| {
            if task.abort() {
                self.inner.executor.ready_tasks.push(task.clone());
            }
            task.take_result()
        })
    }

    /// Aborts the task and detaches it, discarding any result.
    pub(crate) fn abort_and_detach(&self, task_id: usize) {
        // The block returns the removed tasks so that they are dropped only after the scope
        // lock has been released.
        let _tasks = {
            let mut state = ScopeWaker::from(self.lock());
            let maybe_task1 = state.results.detach(task_id);
            let mut maybe_task2 = None;
            if let Some(task) = state.all_tasks().get(&task_id) {
                match task.abort_and_detach() {
                    // Nothing more will run: remove the task from the scope right away.
                    AbortAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
                    AbortAndDetachResult::AddToRunQueue => {
                        self.inner.executor.ready_tasks.push(task.clone());
                    }
                    AbortAndDetachResult::Pending => {}
                }
            }
            (maybe_task1, maybe_task2)
        };
    }

    /// Polls for the result of the task, registering `cx`'s waker if it is not ready yet.
    ///
    /// # Safety
    ///
    /// NOTE(review): `R` is presumably required to be the output type of the task identified
    /// by `task_id` — confirm against `take_result`'s contract.
    pub(crate) unsafe fn poll_join_result<R>(
        &self,
        task_id: usize,
        cx: &mut Context<'_>,
    ) -> Poll<R> {
        let task = ready!(self.lock().results.poll_join_result(task_id, cx));
        match task.take_result() {
            Some(result) => Poll::Ready(result),
            None => {
                // The task finished without a result to hand out, so this stays pending. Note
                // that no waker is registered on this path.
                Poll::Pending
            }
        }
    }

    /// Like `poll_join_result`, but resolves with `None` when the finished task has no result.
    ///
    /// # Safety
    ///
    /// NOTE(review): same requirement on `R` as `poll_join_result` — confirm.
    pub(crate) unsafe fn poll_aborted<R>(
        &self,
        task_id: usize,
        cx: &mut Context<'_>,
    ) -> Poll<Option<R>> {
        let task = self.lock().results.poll_join_result(task_id, cx);
        task.map(|task| task.take_result())
    }

    /// Tries to add `task` to the scope; returns whether it was accepted.
    pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
        // Bound to a local so that a rejected task is dropped after the lock is released.
        let returned_task = self.lock().insert_task(task, for_stream);
        returned_task.is_none()
    }

    /// Removes the task from the scope and drops its future via `drop_future_unchecked`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the caller must uphold whatever `drop_future_unchecked` requires; that
    /// contract is not visible here — confirm.
    pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
        let mut state = ScopeWaker::from(self.lock());
        let task = state.take_task(task_id);
        if let Some(task) = task {
            task.drop_future_unchecked();
        }
    }

    /// Records that the task with `id` has finished.
    pub(super) fn task_did_finish(&self, id: usize) {
        let mut state = ScopeWaker::from(self.lock());
        state.task_did_finish(id);
    }

    /// Visits this scope and its descendants, calling `callback` with each scope locked.
    ///
    /// Children of a scope are visited only if `callback` returned `true` for it. Only one
    /// scope is locked at a time; any wakers collected for a scope are woken when its
    /// `ScopeWaker` is dropped at the end of the iteration.
    fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
        let mut scopes = vec![self.clone()];
        while let Some(scope) = scopes.pop() {
            let mut scope_waker = ScopeWaker::from(scope.lock());
            if callback(&mut scope_waker) {
                // Children that have already been dropped fail to upgrade and are skipped.
                scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
            }
        }
    }

    /// Acquires one cancel guard on this scope.
    fn acquire_cancel_guard(&self) {
        self.lock().acquire_cancel_guard(1)
    }

    /// Releases one cancel guard on this scope, waking any collected wakers afterwards.
    pub(crate) fn release_cancel_guard(&self) {
        let mut wake_vec = WakeVec::default();
        ScopeState::release_cancel_guard(&mut self.lock(), &mut wake_vec, 0);
    }

    /// Requests cancellation of this scope and its descendants.
    fn cancel_all_tasks(&self) {
        self.visit_scopes_locked(|state| {
            match state.status() {
                Status::Active => {
                    if state.guards() == 0 {
                        // Nothing is holding the scope open: abort right away.
                        state.abort_tasks_and_mark_finished();
                    } else {
                        // Guards are outstanding: defer the abort until the last guard is
                        // released, and wake waiters so they can observe the cancellation.
                        state.wake_wakers_and_mark_pending();
                    }
                    true
                }
                Status::PendingCancellation => {
                    // Already pending: do not wake again, but still visit the children.
                    true
                }
                Status::Finished => {
                    // Already finished: skip this subtree.
                    false
                }
            }
        });
    }

    /// Aborts all tasks of this scope and its descendants, regardless of guards.
    fn abort_all_tasks(&self) {
        self.visit_scopes_locked(|state| match state.status() {
            Status::Active | Status::PendingCancellation => {
                state.abort_tasks_and_mark_finished();
                true
            }
            Status::Finished => false,
        });
    }

    /// Closes this scope and its descendants and drops all of their tasks and results.
    ///
    /// # Panics
    ///
    /// Panics if `try_drop` fails for any task.
    pub(super) fn drop_all_tasks(&self) {
        let mut scopes = vec![self.clone()];
        while let Some(scope) = scopes.pop() {
            let (tasks, join_results) = {
                let mut state = ScopeWaker::from(scope.lock());
                let (tasks, join_results, children) = state.set_closed_and_drain();
                scopes.extend(children.filter_map(|child| child.upgrade()));
                (tasks, join_results)
            };
            // The scope lock has been released here, so the destructors below run without it.
            for task in tasks {
                task.try_drop().expect("Expected drop to succeed");
            }
            std::mem::drop(join_results);
        }
    }
}
1443
/// Zero-sized lookup key whose identity is its own address.
///
/// A `&PtrKey` is created by casting a pointer to a `ScopeInner`, which allows looking up a
/// `WeakScopeHandle` in a hash set by address alone.
#[repr(transparent)]
struct PtrKey;
1447
impl Borrow<PtrKey> for WeakScopeHandle {
    /// Views the weak handle as the address-based key used for set lookups.
    fn borrow(&self) -> &PtrKey {
        // SAFETY: `PtrKey` is a zero-sized unit struct, so the reference does not cover any
        // memory and only its address is ever used (by `PartialEq` and `Hash`).
        unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
    }
}
1454
1455impl PartialEq for PtrKey {
1456 fn eq(&self, other: &Self) -> bool {
1457 std::ptr::eq(self, other)
1458 }
1459}
1460
1461impl Eq for PtrKey {}
1462
1463impl hash::Hash for PtrKey {
1464 fn hash<H: hash::Hasher>(&self, state: &mut H) {
1465 (self as *const PtrKey).hash(state);
1466 }
1467}
1468
/// Results sink for ordinary scopes: one `JoinResult` entry per task id.
#[derive(Default)]
struct JoinResults(HashMap<usize, JoinResult>);
1471
/// Where a scope stores the outcome of finished tasks.
trait Results: Send + Sync + 'static {
    /// Returns whether ordinary (non-stream) spawning is allowed on a scope using this sink.
    fn can_spawn(&self) -> bool;

    /// Polls for the finished task with `task_id`, registering `cx`'s waker if it is not
    /// available yet.
    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;

    /// Records a finished task; returns a waker that should be woken as a consequence, if any.
    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;

    /// Forgets about `task_id`, returning the finished task if one was stored.
    fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;

    /// Takes the stored results out of the sink, type-erased so they can be dropped elsewhere.
    fn take(&mut self) -> Box<dyn Any>;

    /// Returns whether the sink holds no entries (test-only).
    #[cfg(test)]
    fn is_empty(&self) -> bool;
}
1492
1493impl Results for JoinResults {
1494 fn can_spawn(&self) -> bool {
1495 true
1496 }
1497
1498 fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1499 match self.0.entry(task_id) {
1500 Entry::Occupied(mut o) => match o.get_mut() {
1501 JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1502 JoinResult::Result(_) => {
1503 let JoinResult::Result(task) = o.remove() else { unreachable!() };
1504 return Poll::Ready(task);
1505 }
1506 },
1507 Entry::Vacant(v) => {
1508 v.insert(JoinResult::Waker(cx.waker().clone()));
1509 }
1510 }
1511 Poll::Pending
1512 }
1513
1514 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1515 match self.0.entry(task.id()) {
1516 Entry::Occupied(mut o) => {
1517 let JoinResult::Waker(waker) =
1518 std::mem::replace(o.get_mut(), JoinResult::Result(task))
1519 else {
1520 unreachable!()
1524 };
1525 Some(waker)
1526 }
1527 Entry::Vacant(v) => {
1528 v.insert(JoinResult::Result(task));
1529 None
1530 }
1531 }
1532 }
1533
1534 fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1535 match self.0.remove(&task_id) {
1536 Some(JoinResult::Result(task)) => Some(task),
1537 _ => None,
1538 }
1539 }
1540
1541 fn take(&mut self) -> Box<dyn Any> {
1542 Box::new(Self(std::mem::take(&mut self.0)))
1543 }
1544
1545 #[cfg(test)]
1546 fn is_empty(&self) -> bool {
1547 self.0.is_empty()
1548 }
1549}
1550
/// Results sink for [`ScopeStream`]: finished results go into a buffer shared with the stream.
#[derive(Default)]
struct ResultsStream<R> {
    // Shared with `ScopeStream::stream`.
    inner: Arc<Mutex<ResultsStreamInner<R>>>,
}
1555
1556struct ResultsStreamInner<R> {
1557 results: Vec<R>,
1558 waker: Option<Waker>,
1559}
1560
1561impl<R> Default for ResultsStreamInner<R> {
1562 fn default() -> Self {
1563 Self { results: Vec::new(), waker: None }
1564 }
1565}
1566
impl<R: Send + 'static> Results for ResultsStream<R> {
    /// Ordinary spawning is refused; tasks must be pushed with `for_stream` set (see
    /// `ScopeState::insert_task`).
    fn can_spawn(&self) -> bool {
        false
    }

    /// Stream results are never delivered through join handles, so this is always pending.
    /// Note that the waker is not registered.
    fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
        Poll::Pending
    }

    /// Appends the task's result (if it has one) to the shared buffer and returns the stream
    /// consumer's waker, if one is registered.
    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
        let mut inner = self.inner.lock();
        // SAFETY: NOTE(review): presumably sound because every task on this scope is pushed
        // through `ScopeStreamHandle<R>::push`, which only accepts `Spawnable<Output = R>` —
        // confirm against `take_result`'s contract.
        inner.results.extend(unsafe { task.take_result() });
        inner.waker.take()
    }

    /// Nothing is stored per task id, so there is never anything to detach.
    fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
        None
    }

    /// Takes the buffered results out of the shared buffer.
    fn take(&mut self) -> Box<dyn Any> {
        Box::new(std::mem::take(&mut self.inner.lock().results))
    }

    /// Always reports non-empty (test-only).
    #[cfg(test)]
    fn is_empty(&self) -> bool {
        false
    }
}
1597
1598#[cfg(test)]
1599mod tests {
1600 use super::*;
1604 use crate::{
1605 yield_now, EHandle, LocalExecutor, SendExecutor, SpawnableFuture, Task, TestExecutor, Timer,
1606 };
1607 use fuchsia_sync::{Condvar, Mutex};
1608 use futures::channel::mpsc;
1609 use futures::{FutureExt, StreamExt};
1610 use std::future::pending;
1611 use std::pin::{pin, Pin};
1612 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1613 use std::sync::Arc;
1614 use std::task::{Context, Poll};
1615 use std::time::Duration;
1616
    /// Test helper: a future that stays pending until `resolve` is called on it.
    #[derive(Default)]
    struct RemoteControlFuture(Mutex<RCFState>);
    /// State behind a `RemoteControlFuture`.
    #[derive(Default)]
    struct RCFState {
        // Set by `resolve`; once set, polling returns `Ready`.
        resolved: bool,
        // Waker from the most recent pending poll.
        waker: Option<Waker>,
    }
1624
1625 impl Future for &RemoteControlFuture {
1626 type Output = ();
1627 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1628 let mut this = self.0.lock();
1629 if this.resolved {
1630 Poll::Ready(())
1631 } else {
1632 this.waker.replace(cx.waker().clone());
1633 Poll::Pending
1634 }
1635 }
1636 }
1637
1638 impl RemoteControlFuture {
1639 fn new() -> Arc<Self> {
1640 Arc::new(Default::default())
1641 }
1642
1643 fn resolve(&self) {
1644 let mut this = self.0.lock();
1645 this.resolved = true;
1646 if let Some(waker) = this.waker.take() {
1647 waker.wake();
1648 }
1649 }
1650
1651 fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> {
1652 let this = Arc::clone(self);
1653 #[allow(clippy::redundant_async_block)] async move {
1655 (&*this).await
1656 }
1657 }
1658 }
1659
1660 #[test]
1661 fn compute_works_on_root_scope() {
1662 let mut executor = TestExecutor::new();
1663 let scope = executor.global_scope();
1664 let mut task = pin!(scope.compute(async { 1 }));
1665 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1666 }
1667
1668 #[test]
1669 fn compute_works_on_new_child() {
1670 let mut executor = TestExecutor::new();
1671 let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1672 let mut task = pin!(scope.compute(async { 1 }));
1673 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1674 }
1675
1676 #[test]
1677 fn scope_drop_cancels_tasks() {
1678 let mut executor = TestExecutor::new();
1679 let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1680 let mut task = pin!(scope.compute(async { 1 }));
1681 drop(scope);
1682 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1683 }
1684
1685 #[test]
1686 fn tasks_do_not_spawn_on_cancelled_scopes() {
1687 let mut executor = TestExecutor::new();
1688 let scope =
1689 executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1690 let handle = scope.to_handle();
1691 let mut cancel = pin!(scope.cancel());
1692 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1693 let mut task = pin!(handle.compute(async { 1 }));
1694 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1695 }
1696
1697 #[test]
1698 fn tasks_do_not_spawn_on_closed_empty_scopes() {
1699 let mut executor = TestExecutor::new();
1700 let scope =
1701 executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1702 let handle = scope.to_handle();
1703 let mut close = pin!(scope.cancel());
1704 assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1705 let mut task = pin!(handle.compute(async { 1 }));
1706 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1707 }
1708
1709 #[test]
1710 fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
1711 let mut executor = TestExecutor::new();
1712 let scope = executor.global_scope().new_child();
1713 let handle = scope.to_handle();
1714 handle.spawn(pending());
1715 let mut close = pin!(scope.close());
1716 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1717 let mut task = pin!(handle.compute(async { 1 }));
1718 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1719 }
1720
1721 #[test]
1722 fn spawn_works_on_child_and_grandchild() {
1723 let mut executor = TestExecutor::new();
1724 let scope = executor.global_scope().new_child();
1725 let child = scope.new_child();
1726 let grandchild = child.new_child();
1727 let mut child_task = pin!(child.compute(async { 1 }));
1728 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1729 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Ready(1));
1730 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Ready(1));
1731 }
1732
1733 #[test]
1734 fn spawn_drop_cancels_child_and_grandchild_tasks() {
1735 let mut executor = TestExecutor::new();
1736 let scope = executor.global_scope().new_child();
1737 let child = scope.new_child();
1738 let grandchild = child.new_child();
1739 let mut child_task = pin!(child.compute(async { 1 }));
1740 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1741 drop(scope);
1742 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Pending);
1743 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Pending);
1744 }
1745
1746 #[test]
1747 fn completed_tasks_are_cleaned_up_after_cancel() {
1748 let mut executor = TestExecutor::new();
1749 let scope = executor.global_scope().new_child();
1750
1751 let task1 = scope.spawn(pending::<()>());
1752 let task2 = scope.spawn(async {});
1753 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1754 assert_eq!(scope.lock().all_tasks().len(), 1);
1755
1756 assert_eq!(task1.abort().now_or_never(), None);
1759 assert_eq!(task2.abort().now_or_never(), Some(Some(())));
1760
1761 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1762 assert_eq!(scope.lock().all_tasks().len(), 0);
1763 assert!(scope.lock().results.is_empty());
1764 }
1765
1766 #[test]
1767 fn join_emtpy_scope() {
1768 let mut executor = TestExecutor::new();
1769 let scope = executor.global_scope().new_child();
1770 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1771 }
1772
1773 #[test]
1774 fn task_handle_preserves_access_to_result_after_join_begins() {
1775 let mut executor = TestExecutor::new();
1776 let scope = executor.global_scope().new_child();
1777 let mut task = scope.compute(async { 1 });
1778 scope.spawn(async {});
1779 let task2 = scope.spawn(pending::<()>());
1780 let mut join = pin!(scope.join().fuse());
1783 let _ = executor.run_until_stalled(&mut join);
1784 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1785 drop(task2.abort());
1786 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1787 }
1788
1789 #[test]
1790 fn join_blocks_until_task_is_cancelled() {
1791 let mut executor = TestExecutor::new();
1794 let scope = executor.global_scope().new_child();
1795 let outstanding_task = scope.spawn(pending::<()>());
1796 let cancelled_task = scope.spawn(pending::<()>());
1797 assert_eq!(
1798 executor.run_until_stalled(&mut pin!(cancelled_task.abort())),
1799 Poll::Ready(None)
1800 );
1801 let mut join = pin!(scope.join());
1802 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1803 assert_eq!(
1804 executor.run_until_stalled(&mut pin!(outstanding_task.abort())),
1805 Poll::Ready(None)
1806 );
1807 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1808 }
1809
1810 #[test]
1811 fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1812 let mut executor = TestExecutor::new();
1813 let scope = executor.global_scope().new_child();
1814 scope.spawn(pending::<()>());
1816 let mut join = pin!(scope.join());
1817 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1818 let mut cancel = pin!(join.cancel());
1819 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1820 }
1821
1822 #[test]
1823 fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1824 let mut executor = TestExecutor::new();
1825 let scope = executor.global_scope().new_child();
1826 scope.spawn(pending::<()>());
1828 let mut close = pin!(scope.close());
1829 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1830 let mut cancel = pin!(close.cancel());
1831 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1832 }
1833
1834 #[test]
1835 fn join_scope_blocks_until_spawned_task_completes() {
1836 let mut executor = TestExecutor::new();
1837 let scope = executor.global_scope().new_child();
1838 let remote = RemoteControlFuture::new();
1839 let mut task = scope.spawn(remote.as_future());
1840 let mut scope_join = pin!(scope.join());
1841 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1842 remote.resolve();
1843 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1844 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1845 }
1846
1847 #[test]
1848 fn close_scope_blocks_until_spawned_task_completes() {
1849 let mut executor = TestExecutor::new();
1850 let scope = executor.global_scope().new_child();
1851 let remote = RemoteControlFuture::new();
1852 let mut task = scope.spawn(remote.as_future());
1853 let mut scope_close = pin!(scope.close());
1854 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Pending);
1855 remote.resolve();
1856 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1857 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1858 }
1859
1860 #[test]
1861 fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
1862 let mut executor = TestExecutor::new();
1863 let scope = executor.global_scope().new_child();
1864 let child = scope.new_child();
1865 let remote = RemoteControlFuture::new();
1866 child.spawn(remote.as_future());
1867 let mut scope_join = pin!(scope.join());
1868 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1869 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1870 child.detach();
1871 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1872 remote.resolve();
1873 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1874 }
1875
1876 #[test]
1877 fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
1878 let mut executor = TestExecutor::new();
1879 let scope = executor.global_scope().new_child();
1880 let remote = RemoteControlFuture::new();
1881 {
1882 let remote = remote.clone();
1883 scope.spawn(async move {
1884 let child = Scope::new_with_name("child");
1885 child.spawn(async move {
1886 Scope::current().spawn(remote.as_future());
1887 });
1888 child.detach();
1889 });
1890 }
1891 let mut scope_join = pin!(scope.join());
1892 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1893 remote.resolve();
1894 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1895 }
1896
1897 #[test]
1898 fn join_scope_blocks_when_blocked_child_is_detached() {
1899 let mut executor = TestExecutor::new();
1900 let scope = executor.global_scope().new_child();
1901 let child = scope.new_child();
1902 child.spawn(pending());
1903 let mut scope_join = pin!(scope.join());
1904 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1905 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1906 child.detach();
1907 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1908 }
1909
1910 #[test]
1911 fn join_scope_completes_when_blocked_child_is_cancelled() {
1912 let mut executor = TestExecutor::new();
1913 let scope = executor.global_scope().new_child();
1914 let child = scope.new_child();
1915 child.spawn(pending());
1916 let mut scope_join = pin!(scope.join());
1917 {
1918 let mut child_join = pin!(child.join());
1919 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1920 assert_eq!(executor.run_until_stalled(&mut child_join), Poll::Pending);
1921 }
1922 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1923 }
1924
1925 #[test]
1926 fn detached_scope_can_spawn() {
1927 let mut executor = TestExecutor::new();
1928 let scope = executor.global_scope().new_child();
1929 let handle = scope.to_handle();
1930 scope.detach();
1931 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1932 }
1933
1934 #[test]
1935 fn dropped_scope_cannot_spawn() {
1936 let mut executor = TestExecutor::new();
1937 let scope = executor.global_scope().new_child();
1938 let handle = scope.to_handle();
1939 drop(scope);
1940 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1941 }
1942
1943 #[test]
1944 fn dropped_scope_with_running_task_cannot_spawn() {
1945 let mut executor = TestExecutor::new();
1946 let scope = executor.global_scope().new_child();
1947 let handle = scope.to_handle();
1948 let _running_task = handle.spawn(pending::<()>());
1949 drop(scope);
1950 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1951 }
1952
1953 #[test]
1954 fn joined_scope_cannot_spawn() {
1955 let mut executor = TestExecutor::new();
1956 let scope = executor.global_scope().new_child();
1957 let handle = scope.to_handle();
1958 let mut scope_join = pin!(scope.join());
1959 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1960 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1961 }
1962
1963 #[test]
1964 fn joining_scope_with_running_task_can_spawn() {
1965 let mut executor = TestExecutor::new();
1966 let scope = executor.global_scope().new_child();
1967 let handle = scope.to_handle();
1968 let _running_task = handle.spawn(pending::<()>());
1969 let mut scope_join = pin!(scope.join());
1970 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1971 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1972 }
1973
1974 #[test]
1975 fn joined_scope_child_cannot_spawn() {
1976 let mut executor = TestExecutor::new();
1977 let scope = executor.global_scope().new_child();
1978 let handle = scope.to_handle();
1979 let child_before_join = scope.new_child();
1980 assert_eq!(
1981 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1982 Poll::Ready(1)
1983 );
1984 let mut scope_join = pin!(scope.join());
1985 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1986 let child_after_join = handle.new_child();
1987 let grandchild_after_join = child_before_join.new_child();
1988 assert_eq!(
1989 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
1990 Poll::Pending
1991 );
1992 assert_eq!(
1993 executor.run_until_stalled(&mut child_after_join.compute(async { 1 })),
1994 Poll::Pending
1995 );
1996 assert_eq!(
1997 executor.run_until_stalled(&mut grandchild_after_join.compute(async { 1 })),
1998 Poll::Pending
1999 );
2000 }
2001
2002 #[test]
2003 fn closed_scope_child_cannot_spawn() {
2004 let mut executor = TestExecutor::new();
2005 let scope = executor.global_scope().new_child();
2006 let handle = scope.to_handle();
2007 let child_before_close = scope.new_child();
2008 assert_eq!(
2009 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2010 Poll::Ready(1)
2011 );
2012 let mut scope_close = pin!(scope.close());
2013 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
2014 let child_after_close = handle.new_child();
2015 let grandchild_after_close = child_before_close.new_child();
2016 assert_eq!(
2017 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2018 Poll::Pending
2019 );
2020 assert_eq!(
2021 executor.run_until_stalled(&mut child_after_close.compute(async { 1 })),
2022 Poll::Pending
2023 );
2024 assert_eq!(
2025 executor.run_until_stalled(&mut grandchild_after_close.compute(async { 1 })),
2026 Poll::Pending
2027 );
2028 }
2029
2030 #[test]
2031 fn can_join_child_first() {
2032 let mut executor = TestExecutor::new();
2033 let scope = executor.global_scope().new_child();
2034 let child = scope.new_child();
2035 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2036 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2037 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2038 }
2039
2040 #[test]
2041 fn can_join_parent_first() {
2042 let mut executor = TestExecutor::new();
2043 let scope = executor.global_scope().new_child();
2044 let child = scope.new_child();
2045 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2046 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2047 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2048 }
2049
2050 #[test]
2051 fn task_in_parent_scope_can_join_child() {
2052 let mut executor = TestExecutor::new();
2053 let scope = executor.global_scope().new_child();
2054 let child = scope.new_child();
2055 let remote = RemoteControlFuture::new();
2056 child.spawn(remote.as_future());
2057 scope.spawn(async move { child.join().await });
2058 let mut join = pin!(scope.join());
2059 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2060 remote.resolve();
2061 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2062 }
2063
2064 #[test]
2065 fn join_completes_while_completed_task_handle_is_held() {
2066 let mut executor = TestExecutor::new();
2067 let scope = executor.global_scope().new_child();
2068 let mut task = scope.compute(async { 1 });
2069 scope.spawn(async {});
2070 let mut join = pin!(scope.join());
2071 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2072 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
2073 }
2074
2075 #[test]
2076 fn cancel_completes_while_task_holds_handle() {
2077 let mut executor = TestExecutor::new();
2078 let scope = executor.global_scope().new_child();
2079 let handle = scope.to_handle();
2080 let mut task = scope.compute(async move {
2081 loop {
2082 pending::<()>().await; handle.spawn(async {});
2084 }
2085 });
2086
2087 let mut join = pin!(scope.join());
2089 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2090
2091 let mut cancel = pin!(join.cancel());
2092 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
2093 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
2094 }
2095
2096 #[test]
2097 fn cancel_from_handle_inside_task() {
2098 let mut executor = TestExecutor::new();
2099 let scope = executor.global_scope().new_child();
2100 {
2101 scope.spawn(pending::<()>());
2103
2104 let mut no_tasks = pin!(scope.on_no_tasks());
2105 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Pending);
2106
2107 let handle = scope.to_handle();
2108 scope.spawn(async move {
2109 handle.cancel().await;
2110 panic!("cancel() should never complete");
2111 });
2112
2113 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Ready(()));
2114 }
2115 assert_eq!(scope.join().now_or_never(), Some(()));
2116 }
2117
2118 #[test]
2119 fn can_spawn_from_non_executor_thread() {
2120 let mut executor = TestExecutor::new();
2121 let scope = executor.global_scope().clone();
2122 let done = Arc::new(AtomicBool::new(false));
2123 let done_clone = done.clone();
2124 let _ = std::thread::spawn(move || {
2125 scope.spawn(async move {
2126 done_clone.store(true, Ordering::Relaxed);
2127 })
2128 })
2129 .join();
2130 let _ = executor.run_until_stalled(&mut pending::<()>());
2131 assert!(done.load(Ordering::Relaxed));
2132 }
2133
2134 #[test]
2135 fn scope_tree() {
2136 let mut executor = TestExecutor::new();
2142 let a = executor.global_scope().new_child();
2143 let b = a.new_child();
2144 let c = b.new_child();
2145 let d = b.new_child();
2146 let a_remote = RemoteControlFuture::new();
2147 let c_remote = RemoteControlFuture::new();
2148 let d_remote = RemoteControlFuture::new();
2149 a.spawn(a_remote.as_future());
2150 c.spawn(c_remote.as_future());
2151 d.spawn(d_remote.as_future());
2152 let mut a_join = pin!(a.join());
2153 let mut b_join = pin!(b.join());
2154 let mut d_join = pin!(d.join());
2155 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2156 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2157 assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
2158 d_remote.resolve();
2159 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2160 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2161 assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
2162 c_remote.resolve();
2163 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2164 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
2165 a_remote.resolve();
2166 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
2167 let mut c_join = pin!(c.join());
2168 assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
2169 }
2170
2171 #[test]
2172 fn wake_all_with_active_guard_on_send_executor() {
2173 let mut executor = SendExecutor::new(2);
2174 let scope = executor.root_scope().new_child();
2175
2176 let (tx, mut rx) = mpsc::unbounded();
2177 let state = Arc::new(AtomicU64::new(0));
2179
2180 struct PollCounter(Arc<AtomicU64>, mpsc::UnboundedSender<()>);
2181
2182 impl Future for PollCounter {
2183 type Output = ();
2184 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
2185 let old = self.0.fetch_add(1, Ordering::Relaxed);
2186 if old >> 32 == (old + 1) & u32::MAX as u64 {
2187 let _ = self.1.unbounded_send(());
2188 }
2189 Poll::Pending
2190 }
2191 }
2192
2193 scope.spawn(PollCounter(state.clone(), tx.clone()));
2194 scope.spawn(PollCounter(state.clone(), tx.clone()));
2195
2196 executor.run(async move {
2197 let mut wait_for_poll_count = async |count| {
2198 let old = state.fetch_or(count << 32, Ordering::Relaxed);
2199 if old & u32::MAX as u64 != count {
2200 rx.next().await.unwrap();
2201 }
2202 state.fetch_and(u32::MAX as u64, Ordering::Relaxed);
2203 };
2204
2205 wait_for_poll_count(2).await;
2207
2208 let mut start_count = 2;
2209 for _ in 0..2 {
2210 scope.wake_all_with_active_guard();
2211
2212 wait_for_poll_count(start_count + 2).await;
2213 start_count += 2;
2214 }
2215
2216 scope.wake_all_with_active_guard();
2218 let done = scope.cancel();
2219
2220 wait_for_poll_count(start_count + 2).await;
2221
2222 done.await;
2223 });
2224 }
2225
2226 #[test]
2227 fn on_no_tasks_race() {
2228 fn sleep_random() {
2229 use rand::Rng;
2230 std::thread::sleep(std::time::Duration::from_micros(
2231 rand::thread_rng().gen_range(0..10),
2232 ));
2233 }
2234 for _ in 0..2000 {
2235 let mut executor = SendExecutor::new(2);
2236 let scope = executor.root_scope().new_child();
2237 scope.spawn(async {
2238 sleep_random();
2239 });
2240 executor.run(async move {
2241 sleep_random();
2242 scope.on_no_tasks().await;
2243 });
2244 }
2245 }
2246
2247 #[test]
2248 fn test_detach() {
2249 let mut e = LocalExecutor::new();
2250 e.run_singlethreaded(async {
2251 let counter = Arc::new(AtomicU32::new(0));
2252
2253 {
2254 let counter = counter.clone();
2255 Task::spawn(async move {
2256 for _ in 0..5 {
2257 yield_now().await;
2258 counter.fetch_add(1, Ordering::Relaxed);
2259 }
2260 })
2261 .detach();
2262 }
2263
2264 while counter.load(Ordering::Relaxed) != 5 {
2265 yield_now().await;
2266 }
2267 });
2268
2269 assert!(e.ehandle.root_scope.lock().results.is_empty());
2270 }
2271
2272 #[test]
2273 fn test_cancel() {
2274 let mut e = LocalExecutor::new();
2275 e.run_singlethreaded(async {
2276 let ref_count = Arc::new(());
2277 {
2279 let ref_count = ref_count.clone();
2280 drop(Task::spawn(async move {
2281 let _ref_count = ref_count;
2282 let _: () = std::future::pending().await;
2283 }));
2284 }
2285
2286 while Arc::strong_count(&ref_count) != 1 {
2287 yield_now().await;
2288 }
2289
2290 let task = {
2292 let ref_count = ref_count.clone();
2293 Task::spawn(async move {
2294 let _ref_count = ref_count;
2295 let _: () = std::future::pending().await;
2296 })
2297 };
2298
2299 assert_eq!(task.abort().await, None);
2300 while Arc::strong_count(&ref_count) != 1 {
2301 yield_now().await;
2302 }
2303
2304 let task = {
2306 let ref_count = ref_count.clone();
2307 Task::spawn(async move {
2308 let _ref_count = ref_count;
2309 })
2310 };
2311
2312 while Arc::strong_count(&ref_count) != 1 {
2314 yield_now().await;
2315 }
2316
2317 assert_eq!(task.abort().await, Some(()));
2318 });
2319
2320 assert!(e.ehandle.root_scope.lock().results.is_empty());
2321 }
2322
2323 #[test]
2324 fn test_cancel_waits() {
2325 let mut executor = SendExecutor::new(2);
2326 let running = Arc::new((Mutex::new(false), Condvar::new()));
2327 let task = {
2328 let running = running.clone();
2329 executor.root_scope().compute(async move {
2330 *running.0.lock() = true;
2331 running.1.notify_all();
2332 std::thread::sleep(std::time::Duration::from_millis(10));
2333 *running.0.lock() = false;
2334 "foo"
2335 })
2336 };
2337 executor.run(async move {
2338 {
2339 let mut guard = running.0.lock();
2340 while !*guard {
2341 running.1.wait(&mut guard);
2342 }
2343 }
2344 assert_eq!(task.abort().await, Some("foo"));
2345 assert!(!*running.0.lock());
2346 });
2347 }
2348
2349 fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2350 let mut executor = SendExecutor::new(2);
2351 let running = Arc::new((Mutex::new(false), Condvar::new()));
2352 let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2353 let task = {
2354 let running = running.clone();
2355 let can_quit = can_quit.clone();
2356 executor.root_scope().compute(async move {
2357 *running.0.lock() = true;
2358 running.1.notify_all();
2359 {
2360 let mut guard = can_quit.0.lock();
2361 while !*guard {
2362 can_quit.1.wait(&mut guard);
2363 }
2364 }
2365 *running.0.lock() = false;
2366 })
2367 };
2368 executor.run(async move {
2369 {
2370 let mut guard = running.0.lock();
2371 while !*guard {
2372 running.1.wait(&mut guard);
2373 }
2374 }
2375
2376 callback(task);
2377
2378 *can_quit.0.lock() = true;
2379 can_quit.1.notify_all();
2380
2381 let ehandle = EHandle::local();
2382 let scope = ehandle.global_scope();
2383
2384 while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2386 Timer::new(std::time::Duration::from_millis(1)).await;
2387 }
2388
2389 assert!(!*running.0.lock());
2390 });
2391 }
2392
2393 #[test]
2394 fn test_dropped_cancel_cleans_up() {
2395 test_clean_up(|task| {
2396 let abort_fut = std::pin::pin!(task.abort());
2397 let waker = futures::task::noop_waker();
2398 assert!(abort_fut.poll(&mut Context::from_waker(&waker)).is_pending());
2399 });
2400 }
2401
2402 #[test]
2403 fn test_dropped_task_cleans_up() {
2404 test_clean_up(|task| {
2405 std::mem::drop(task);
2406 });
2407 }
2408
2409 #[test]
2410 fn test_detach_cleans_up() {
2411 test_clean_up(|task| {
2412 task.detach();
2413 });
2414 }
2415
2416 #[test]
2417 fn test_scope_stream() {
2418 let mut executor = SendExecutor::new(2);
2419 executor.run(async move {
2420 let (stream, handle) = ScopeStream::new();
2421 handle.push(async { 1 });
2422 handle.push(async { 2 });
2423 stream.close();
2424 let results: HashSet<_> = stream.collect().await;
2425 assert_eq!(results, HashSet::from_iter([1, 2]));
2426 });
2427 }
2428
2429 #[test]
2430 fn test_scope_stream_wakes_properly() {
2431 let mut executor = SendExecutor::new(2);
2432 executor.run(async move {
2433 let (stream, handle) = ScopeStream::new();
2434 handle.push(async {
2435 Timer::new(Duration::from_millis(10)).await;
2436 1
2437 });
2438 handle.push(async {
2439 Timer::new(Duration::from_millis(10)).await;
2440 2
2441 });
2442 stream.close();
2443 let results: HashSet<_> = stream.collect().await;
2444 assert_eq!(results, HashSet::from_iter([1, 2]));
2445 });
2446 }
2447
2448 #[test]
2449 fn test_scope_stream_drops_spawned_tasks() {
2450 let mut executor = SendExecutor::new(2);
2451 executor.run(async move {
2452 let (stream, handle) = ScopeStream::new();
2453 handle.push(async { 1 });
2454 let _task = stream.compute(async { "foo" });
2455 stream.close();
2456 let results: HashSet<_> = stream.collect().await;
2457 assert_eq!(results, HashSet::from_iter([1]));
2458 });
2459 }
2460
2461 #[test]
2462 fn test_nested_scope_stream() {
2463 let mut executor = SendExecutor::new(2);
2464 executor.run(async move {
2465 let (mut stream, handle) = ScopeStream::new();
2466 handle.clone().push(async move {
2467 handle.clone().push(async move {
2468 handle.clone().push(async move { 3 });
2469 2
2470 });
2471 1
2472 });
2473 let mut results = HashSet::default();
2474 while let Some(item) = stream.next().await {
2475 results.insert(item);
2476 if results.len() == 3 {
2477 stream.close();
2478 }
2479 }
2480 assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2481 });
2482 }
2483
2484 #[test]
2485 fn test_dropping_scope_stream_cancels_all_tasks() {
2486 let mut executor = SendExecutor::new(2);
2487 executor.run(async move {
2488 let (stream, handle) = ScopeStream::new();
2489 let (tx1, mut rx) = mpsc::unbounded::<()>();
2490 let tx2 = tx1.clone();
2491 handle.push(async move {
2492 let _tx1 = tx1;
2493 let () = pending().await;
2494 });
2495 handle.push(async move {
2496 let _tx2 = tx2;
2497 let () = pending().await;
2498 });
2499 drop(stream);
2500
2501 assert_eq!(rx.next().await, None);
2503 });
2504 }
2505
2506 #[test]
2507 fn test_scope_stream_collect() {
2508 let mut executor = SendExecutor::new(2);
2509 executor.run(async move {
2510 let stream: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
2511 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2512
2513 let stream: ScopeStream<_> =
2514 (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
2515 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2516 });
2517 }
2518
2519 struct DropSignal(Arc<AtomicBool>);
2520
2521 impl Drop for DropSignal {
2522 fn drop(&mut self) {
2523 self.0.store(true, Ordering::SeqCst);
2524 }
2525 }
2526
2527 struct DropChecker(Arc<AtomicBool>);
2528
2529 impl DropChecker {
2530 fn new() -> (Self, DropSignal) {
2531 let inner = Arc::new(AtomicBool::new(false));
2532 (Self(inner.clone()), DropSignal(inner))
2533 }
2534
2535 fn is_dropped(&self) -> bool {
2536 self.0.load(Ordering::SeqCst)
2537 }
2538 }
2539
2540 #[test]
2541 fn child_finished_when_parent_pending() {
2542 let mut executor = LocalExecutor::new();
2543 executor.run_singlethreaded(async {
2544 let scope = Scope::new();
2545 let _guard = scope.active_guard().expect("acquire guard");
2546 let cancel = scope.to_handle().cancel();
2547 let child = scope.new_child();
2548 let (checker, signal) = DropChecker::new();
2549 child.spawn(async move {
2550 let _signal = signal;
2551 futures::future::pending::<()>().await
2552 });
2553 assert!(checker.is_dropped());
2554 assert!(child.active_guard().is_none());
2555 cancel.await;
2556 })
2557 }
2558
2559 #[test]
2560 fn guarded_scopes_observe_closed() {
2561 let mut executor = LocalExecutor::new();
2562 executor.run_singlethreaded(async {
2563 let scope = Scope::new();
2564 let handle = scope.to_handle();
2565 let _guard = scope.active_guard().expect("acquire guard");
2566 handle.close();
2567 let (checker, signal) = DropChecker::new();
2568 handle.spawn(async move {
2569 let _signal = signal;
2570 futures::future::pending::<()>().await
2571 });
2572 assert!(checker.is_dropped());
2573 let (checker, signal) = DropChecker::new();
2574 let cancel = handle.clone().cancel();
2575 handle.spawn(async move {
2576 let _signal = signal;
2577 futures::future::pending::<()>().await
2578 });
2579 assert!(checker.is_dropped());
2580 scope.join().await;
2581 cancel.await;
2582 })
2583 }
2584
2585 #[test]
2586 fn child_guard_holds_parent_cancellation() {
2587 let mut executor = TestExecutor::new();
2588 let scope = executor.global_scope().new_child();
2589 let child = scope.new_child();
2590 let guard = child.active_guard().expect("acquire guard");
2591 scope.spawn(futures::future::pending());
2592 let mut join = pin!(scope.cancel());
2593 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2594 drop(guard);
2595 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2596 }
2597
2598 #[test]
2599 fn active_guard_on_cancel() {
2600 let mut executor = TestExecutor::new();
2601 let scope = executor.global_scope().new_child();
2602 let child1 = scope.new_child();
2603 let child2 = scope.new_child();
2604 let guard = child1.active_guard().expect("acquire guard");
2605 let guard_for_right_scope = guard.clone();
2606 let guard_for_wrong_scope = guard.clone();
2607 child1.spawn(async move { guard_for_right_scope.on_cancel().await });
2608 child2.spawn(async move {
2609 guard_for_wrong_scope.on_cancel().await;
2610 });
2611
2612 let handle = scope.to_handle();
2613 let mut join = pin!(scope.join());
2614 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2615 let cancel: Join<_> = handle.cancel();
2616 drop(cancel);
2617 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2618 }
2619
2620 #[test]
2621 fn abort_join() {
2622 let mut executor = TestExecutor::new();
2623 let scope = executor.global_scope().new_child();
2624 let child = scope.new_child();
2625 let _guard = child.active_guard().expect("acquire guard");
2626
2627 let (checker1, signal) = DropChecker::new();
2628 scope.spawn(async move {
2629 let _signal = signal;
2630 futures::future::pending::<()>().await
2631 });
2632 let (checker2, signal) = DropChecker::new();
2633 scope.spawn(async move {
2634 let _signal = signal;
2635 futures::future::pending::<()>().await
2636 });
2637
2638 let mut join = pin!(scope.cancel());
2639 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2640 assert!(!checker1.is_dropped());
2641 assert!(!checker2.is_dropped());
2642
2643 let mut join = join.abort();
2644 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2645 assert!(checker1.is_dropped());
2646 assert!(checker2.is_dropped());
2647 }
2648
2649 #[test]
2650 fn child_without_guard_aborts_immediately_on_cancel() {
2651 let mut executor = TestExecutor::new();
2652 let scope = executor.global_scope().new_child();
2653 let child = scope.new_child();
2654 let guard = scope.active_guard().expect("acquire guard");
2655
2656 let (checker_scope, signal) = DropChecker::new();
2657 scope.spawn(async move {
2658 let _signal = signal;
2659 futures::future::pending::<()>().await
2660 });
2661 let (checker_child, signal) = DropChecker::new();
2662 child.spawn(async move {
2663 let _signal = signal;
2664 futures::future::pending::<()>().await
2665 });
2666
2667 let mut join = pin!(scope.cancel());
2668 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2669 assert!(!checker_scope.is_dropped());
2670 assert!(checker_child.is_dropped());
2671
2672 drop(guard);
2673 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2674 assert!(checker_child.is_dropped());
2675 }
2676}