1use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::EHandle;
9use crate::condition::{Condition, ConditionGuard, WakerEntry};
10use fuchsia_sync::Mutex;
11use futures::Stream;
12use pin_project_lite::pin_project;
13use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
14use state::{JoinResult, ScopeState, ScopeWaker, Status, WakeVec};
15use std::any::Any;
16use std::borrow::Borrow;
17use std::collections::hash_map::Entry;
18use std::collections::hash_set;
19use std::future::{Future, IntoFuture};
20use std::marker::PhantomData;
21use std::mem::{self, ManuallyDrop};
22use std::ops::{Deref, DerefMut};
23use std::pin::Pin;
24use std::sync::{Arc, Weak};
25use std::task::{Context, Poll, Waker, ready};
26use std::{fmt, hash};
27
28#[must_use = "Scopes should be explicitly awaited or cancelled"]
73#[derive(Debug)]
74pub struct Scope {
75 inner: ScopeHandle,
77 }
79
80impl Default for Scope {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl Scope {
87 pub fn new() -> Scope {
96 ScopeHandle::with_current(|handle| handle.new_child())
97 }
98
99 pub fn new_with_name(name: impl Into<String>) -> Scope {
108 ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
109 }
110
111 pub fn current() -> ScopeHandle {
119 ScopeHandle::with_current(|handle| handle.clone())
120 }
121
122 pub fn global() -> ScopeHandle {
139 EHandle::local().global_scope().clone()
140 }
141
142 pub fn new_child(&self) -> Scope {
144 self.inner.new_child()
145 }
146
147 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
149 self.inner.new_child_with_name(name.into())
150 }
151
152 pub fn name(&self) -> &str {
154 &self.inner.inner.name
155 }
156
157 pub fn to_handle(&self) -> ScopeHandle {
165 self.inner.clone()
166 }
167
168 pub fn as_handle(&self) -> &ScopeHandle {
175 &self.inner
176 }
177
178 pub fn join(self) -> Join {
187 Join::new(self)
188 }
189
190 pub fn close(self) -> Join {
193 self.inner.close();
194 Join::new(self)
195 }
196
197 pub fn cancel(self) -> Join {
215 self.inner.cancel_all_tasks();
216 Join::new(self)
217 }
218
219 pub fn abort(self) -> impl Future<Output = ()> {
234 self.inner.abort_all_tasks();
235 Join::new(self)
236 }
237
238 pub fn detach(self) {
244 let this = ManuallyDrop::new(self);
247 mem::drop(unsafe { std::ptr::read(&this.inner) });
250 }
251}
252
253impl Drop for Scope {
256 fn drop(&mut self) {
257 self.inner.abort_all_tasks();
266 }
267}
268
269impl IntoFuture for Scope {
270 type Output = ();
271 type IntoFuture = Join;
272 fn into_future(self) -> Self::IntoFuture {
273 self.join()
274 }
275}
276
277impl Deref for Scope {
278 type Target = ScopeHandle;
279 fn deref(&self) -> &Self::Target {
280 &self.inner
281 }
282}
283
284impl Borrow<ScopeHandle> for Scope {
285 fn borrow(&self) -> &ScopeHandle {
286 self
287 }
288}
289
290pin_project! {
291 pub struct Join<S = Scope> {
303 scope: S,
304 #[pin]
305 waker_entry: WakerEntry<ScopeState>,
306 }
307}
308
309impl<S: Borrow<ScopeHandle>> Join<S> {
310 fn new(scope: S) -> Self {
311 let waker_entry = scope.borrow().inner.state.waker_entry();
312 Self { scope, waker_entry }
313 }
314
315 pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
320 self.scope.borrow().abort_all_tasks();
321 self
322 }
323
324 pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
329 self.scope.borrow().cancel_all_tasks();
330 self
331 }
332}
333
334impl<S> Future for Join<S>
335where
336 S: Borrow<ScopeHandle>,
337{
338 type Output = ();
339 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
340 let this = self.project();
341 let mut state = Borrow::borrow(&*this.scope).lock();
342 if state.has_tasks() {
343 state.add_waker(this.waker_entry, cx.waker().clone());
344 Poll::Pending
345 } else {
346 state.mark_finished();
347 Poll::Ready(())
348 }
349 }
350}
351
352pub trait Spawnable {
355 type Output;
357
358 fn into_task(self, scope: ScopeHandle) -> TaskHandle;
360}
361
362impl<F: Future + Send + 'static> Spawnable for F
363where
364 F::Output: Send + 'static,
365{
366 type Output = F::Output;
367
368 fn into_task(self, scope: ScopeHandle) -> TaskHandle {
369 scope.new_task(None, self)
370 }
371}
372
373#[derive(Clone)]
385pub struct ScopeHandle {
386 inner: Arc<ScopeInner>,
388 }
390
391impl ScopeHandle {
392 pub fn new_child(&self) -> Scope {
394 self.new_child_inner(String::new())
395 }
396
397 pub fn instrument_data(&self) -> Option<&(dyn Any + Send + Sync)> {
399 self.inner.instrument_data.as_deref()
400 }
401
402 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
404 self.new_child_inner(name.into())
405 }
406
407 fn new_child_inner(&self, name: String) -> Scope {
408 let mut state = self.lock();
409 let child = ScopeHandle {
410 inner: Arc::new(ScopeInner {
411 executor: self.inner.executor.clone(),
412 state: Condition::new(ScopeState::new_child(
413 self.clone(),
414 &state,
415 JoinResults::default().into(),
416 )),
417
418 instrument_data: self
419 .inner
420 .executor
421 .instrument
422 .as_ref()
423 .map(|value| value.scope_created(&name, Some(self))),
424 name,
425 }),
426 };
427 let weak = child.downgrade();
428 state.insert_child(weak);
429 Scope { inner: child }
430 }
431
432 pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
436 let task = future.into_task(self.clone());
437 let task_id = task.id();
438 self.insert_task(task, false);
439 JoinHandle::new(self.clone(), task_id)
440 }
441
442 pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
447 let task = self.new_local_task(None, future);
448 let id = task.id();
449 self.insert_task(task, false);
450 JoinHandle::new(self.clone(), id)
451 }
452
453 pub fn compute<T: Send + 'static>(
458 &self,
459 future: impl Spawnable<Output = T> + Send + 'static,
460 ) -> crate::Task<T> {
461 let task = future.into_task(self.clone());
462 let id = task.id();
463 self.insert_task(task, false);
464 JoinHandle::new(self.clone(), id).into()
465 }
466
467 pub fn compute_local<T: 'static>(
475 &self,
476 future: impl Future<Output = T> + 'static,
477 ) -> crate::Task<T> {
478 let task = self.new_local_task(None, future);
479 let id = task.id();
480 self.insert_task(task, false);
481 JoinHandle::new(self.clone(), id).into()
482 }
483
484 pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
485 ScopeHandle {
486 inner: Arc::new(ScopeInner {
487 state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
488 name: "root".to_string(),
489 instrument_data: executor
490 .instrument
491 .as_ref()
492 .map(|value| value.scope_created("root", None)),
493 executor,
494 }),
495 }
496 }
497
498 pub fn close(&self) {
506 self.lock().close();
507 }
508
509 pub fn cancel(self) -> Join<Self> {
514 self.cancel_all_tasks();
515 Join::new(self)
516 }
517
518 pub fn abort(self) -> impl Future<Output = ()> {
523 self.abort_all_tasks();
524 Join::new(self)
525 }
526
527 #[must_use]
539 pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
540 ScopeActiveGuard::new(self)
541 }
542
543 pub fn is_cancelled(&self) -> bool {
546 self.lock().status().is_cancelled()
547 }
548
549 pub async fn on_no_tasks(&self) {
556 self.inner
557 .state
558 .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
559 .await;
560 }
561
562 pub async fn on_no_tasks_and_guards(&self) {
565 self.inner
566 .state
567 .when(|state| {
568 if state.has_tasks() || state.guards() > 0 {
569 Poll::Pending
570 } else {
571 Poll::Ready(())
572 }
573 })
574 .await;
575 }
576
577 pub fn wake_all_with_active_guard(&self) {
579 self.lock().wake_all_with_active_guard();
580 }
581
582 pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
585 &self,
586 id: Option<usize>,
587 fut: Fut,
588 ) -> AtomicFutureHandle<'a>
589 where
590 Fut::Output: Send,
591 {
592 let id = id.unwrap_or_else(|| self.executor().next_task_id());
593 let mut task = AtomicFutureHandle::new(Some(self.clone()), id, fut);
594 if let Some(instrument) = &self.executor().instrument {
595 instrument.task_created(self, &mut task);
596 }
597 task
598 }
599
600 pub(crate) fn new_local_task<'a>(
603 &self,
604 id: Option<usize>,
605 fut: impl Future + 'a,
606 ) -> AtomicFutureHandle<'a> {
607 if !self.executor().is_local() {
609 panic!(
610 "Error: called `new_local_task` on multithreaded executor. \
611 Use `spawn` or a `LocalExecutor` instead."
612 );
613 }
614 assert_eq!(
615 self.executor().first_thread_id.get(),
616 Some(&std::thread::current().id()),
617 "Error: called `new_local_task` on a different thread to the executor",
618 );
619
620 let id = id.unwrap_or_else(|| self.executor().next_task_id());
621 unsafe {
624 let mut task = AtomicFutureHandle::new_local(Some(self.clone()), id, fut);
625 if let Some(instrument) = &self.executor().instrument {
626 instrument.task_created(self, &mut task);
627 }
628 task
629 }
630 }
631}
632
633impl fmt::Debug for ScopeHandle {
634 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
635 f.debug_struct("Scope").field("name", &self.inner.name).finish()
636 }
637}
638
639pub struct ScopeStream<R> {
649 inner: ScopeHandle,
650 stream: Arc<Mutex<ResultsStreamInner<R>>>,
651}
652
653impl<R: Send + 'static> ScopeStream<R> {
654 pub fn new() -> (Self, ScopeStreamHandle<R>) {
663 Self::new_inner(String::new())
664 }
665
666 pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
675 Self::new_inner(name.into())
676 }
677
678 fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
679 let this = ScopeHandle::with_current(|handle| {
680 let mut state = handle.lock();
681 let stream = Arc::default();
682 let child = ScopeHandle {
683 inner: Arc::new(ScopeInner {
684 executor: handle.executor().clone(),
685 state: Condition::new(ScopeState::new_child(
686 handle.clone(),
687 &state,
688 Box::new(ResultsStream { inner: Arc::clone(&stream) }),
689 )),
690 instrument_data: handle
691 .executor()
692 .instrument
693 .as_ref()
694 .map(|value| value.scope_created(&name, Some(handle))),
695 name,
696 }),
697 };
698 let weak = child.downgrade();
699 state.insert_child(weak);
700 ScopeStream { inner: child, stream }
701 });
702 let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
703 (this, handle)
704 }
705}
706
707impl<R> Drop for ScopeStream<R> {
708 fn drop(&mut self) {
709 self.inner.abort_all_tasks();
718 }
719}
720
721impl<R: Send + 'static> Stream for ScopeStream<R> {
722 type Item = R;
723
724 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
725 let mut stream_inner = self.stream.lock();
726 match stream_inner.results.pop() {
727 Some(result) => Poll::Ready(Some(result)),
728 None => {
729 drop(stream_inner);
732 let state = self.inner.lock();
733 let mut stream_inner = self.stream.lock();
734 match stream_inner.results.pop() {
735 Some(result) => Poll::Ready(Some(result)),
736 None => {
737 if state.has_tasks() {
738 stream_inner.waker = Some(cx.waker().clone());
739 Poll::Pending
740 } else {
741 Poll::Ready(None)
742 }
743 }
744 }
745 }
746 }
747 }
748}
749
750impl<R> Deref for ScopeStream<R> {
751 type Target = ScopeHandle;
752 fn deref(&self) -> &Self::Target {
753 &self.inner
754 }
755}
756
757impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
758 fn borrow(&self) -> &ScopeHandle {
759 self
760 }
761}
762
763impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
764 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
765 let (stream, handle) = ScopeStream::new();
766 for fut in iter {
767 handle.push(fut);
768 }
769 stream.close();
770 stream
771 }
772}
773
774#[derive(Clone)]
775pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
776
777impl<R: Send> ScopeStreamHandle<R> {
778 pub fn push(&self, future: impl Spawnable<Output = R>) {
779 self.0.insert_task(future.into_task(self.0.clone()), true);
780 }
781}
782
783#[derive(Debug)]
793#[must_use]
794pub struct ScopeActiveGuard(ScopeHandle);
795
796impl Deref for ScopeActiveGuard {
797 type Target = ScopeHandle;
798 fn deref(&self) -> &Self::Target {
799 &self.0
800 }
801}
802
803impl Drop for ScopeActiveGuard {
804 fn drop(&mut self) {
805 let Self(scope) = self;
806 scope.release_cancel_guard();
807 }
808}
809
810impl Clone for ScopeActiveGuard {
811 fn clone(&self) -> Self {
812 self.0.lock().acquire_cancel_guard(1);
813 Self(self.0.clone())
814 }
815}
816
817impl ScopeActiveGuard {
818 pub fn as_handle(&self) -> &ScopeHandle {
820 &self.0
821 }
822
823 pub fn to_handle(&self) -> ScopeHandle {
825 self.0.clone()
826 }
827
828 pub async fn on_cancel(&self) {
834 self.0
835 .inner
836 .state
837 .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
838 .await
839 }
840
841 fn new(scope: &ScopeHandle) -> Option<Self> {
842 if scope.lock().acquire_cancel_guard_if_not_finished() {
843 Some(Self(scope.clone()))
844 } else {
845 None
846 }
847 }
848}
849
850#[derive(Clone)]
856struct WeakScopeHandle {
857 inner: Weak<ScopeInner>,
858}
859
860impl WeakScopeHandle {
861 pub fn upgrade(&self) -> Option<ScopeHandle> {
863 self.inner.upgrade().map(|inner| ScopeHandle { inner })
864 }
865}
866
867impl hash::Hash for WeakScopeHandle {
868 fn hash<H: hash::Hasher>(&self, state: &mut H) {
869 Weak::as_ptr(&self.inner).hash(state);
870 }
871}
872
873impl PartialEq for WeakScopeHandle {
874 fn eq(&self, other: &Self) -> bool {
875 Weak::ptr_eq(&self.inner, &other.inner)
876 }
877}
878
879impl Eq for WeakScopeHandle {
880 }
883
884mod state {
887 use super::*;
888
889 pub struct ScopeState {
890 pub parent: Option<ScopeHandle>,
891 children: HashSet<WeakScopeHandle>,
893 all_tasks: HashSet<TaskHandle>,
894 subscopes_with_tasks: u32,
898 can_spawn: bool,
899 guards: u32,
900 status: Status,
901 pub results: Box<dyn Results>,
903 }
904
905 pub enum JoinResult {
906 Waker(Waker),
907 Result(TaskHandle),
908 }
909
910 #[repr(u8)] #[derive(Default, Debug, Clone, Copy)]
912 pub enum Status {
913 #[default]
914 Active,
916 PendingCancellation,
919 Finished,
922 }
923
924 impl Status {
925 pub fn is_cancelled(&self) -> bool {
927 match self {
928 Self::Active => false,
929 Self::PendingCancellation | Self::Finished => true,
930 }
931 }
932 }
933
934 impl ScopeState {
935 pub fn new_root(results: Box<impl Results>) -> Self {
936 Self {
937 parent: None,
938 children: Default::default(),
939 all_tasks: Default::default(),
940 subscopes_with_tasks: 0,
941 can_spawn: true,
942 guards: 0,
943 status: Default::default(),
944 results,
945 }
946 }
947
948 pub fn new_child(
949 parent_handle: ScopeHandle,
950 parent_state: &Self,
951 results: Box<impl Results>,
952 ) -> Self {
953 let (status, can_spawn) = match parent_state.status {
954 Status::Active => (Status::Active, parent_state.can_spawn),
955 Status::Finished | Status::PendingCancellation => (Status::Finished, false),
956 };
957 Self {
958 parent: Some(parent_handle),
959 children: Default::default(),
960 all_tasks: Default::default(),
961 subscopes_with_tasks: 0,
962 can_spawn,
963 guards: 0,
964 status,
965 results,
966 }
967 }
968 }
969
970 impl ScopeState {
971 pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
972 &self.all_tasks
973 }
974
975 pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
978 if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
979 return Some(task);
980 }
981 if self.all_tasks.is_empty() && !self.register_first_task() {
982 return Some(task);
983 }
984 task.wake();
985 assert!(self.all_tasks.insert(task));
986 None
987 }
988
989 pub fn children(&self) -> &HashSet<WeakScopeHandle> {
990 &self.children
991 }
992
993 pub fn insert_child(&mut self, child: WeakScopeHandle) {
994 self.children.insert(child);
995 }
996
997 pub fn remove_child(&mut self, child: &PtrKey) {
998 let found = self.children.remove(child);
999 assert!(found || self.children.is_empty());
1002 }
1003
1004 pub fn status(&self) -> Status {
1005 self.status
1006 }
1007
1008 pub fn guards(&self) -> u32 {
1009 self.guards
1010 }
1011
1012 pub fn close(&mut self) {
1013 self.can_spawn = false;
1014 }
1015
1016 pub fn mark_finished(&mut self) {
1017 self.can_spawn = false;
1018 self.status = Status::Finished;
1019 }
1020
1021 pub fn has_tasks(&self) -> bool {
1022 self.subscopes_with_tasks > 0
1023 }
1024
1025 pub fn wake_all_with_active_guard(&mut self) {
1026 let mut count = 0;
1027 for task in &self.all_tasks {
1028 if task.wake_with_active_guard() {
1029 count += 1;
1030 }
1031 }
1032 self.acquire_cancel_guard(count);
1033 }
1034
1035 pub fn abort_tasks_and_mark_finished(&mut self) {
1036 for task in self.all_tasks() {
1037 if task.abort() {
1038 task.scope().executor().ready_tasks.push(task.clone());
1039 }
1040 }
1043 self.mark_finished();
1044 }
1045
1046 pub fn wake_wakers_and_mark_pending(
1047 this: &mut ConditionGuard<'_, ScopeState>,
1048 wakers: &mut Vec<Waker>,
1049 ) {
1050 wakers.extend(this.drain_wakers());
1051 this.status = Status::PendingCancellation;
1052 }
1053
1054 #[must_use]
1058 fn register_first_task(&mut self) -> bool {
1059 if !self.can_spawn {
1060 return false;
1061 }
1062 let can_spawn = match &self.parent {
1063 Some(parent) => {
1064 self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
1067 }
1068 None => true,
1069 };
1070 if can_spawn {
1071 self.subscopes_with_tasks += 1;
1072 debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
1073 };
1074 can_spawn
1075 }
1076
1077 fn on_last_task_removed(
1078 this: &mut ConditionGuard<'_, ScopeState>,
1079 num_wakers_hint: usize,
1080 wakers: &mut Vec<Waker>,
1081 ) {
1082 debug_assert!(this.subscopes_with_tasks > 0);
1083 this.subscopes_with_tasks -= 1;
1084 if this.subscopes_with_tasks > 0 {
1085 wakers.reserve(num_wakers_hint);
1086 return;
1087 }
1088
1089 match &this.parent {
1090 Some(parent) => {
1091 Self::on_last_task_removed(
1092 &mut parent.lock(),
1093 num_wakers_hint + this.waker_count(),
1094 wakers,
1095 );
1096 }
1097 None => wakers.reserve(num_wakers_hint),
1098 };
1099 wakers.extend(this.drain_wakers());
1100 }
1101
1102 pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
1106 match self.status {
1107 Status::Active | Status::PendingCancellation => {
1108 self.acquire_cancel_guard(1);
1109 true
1110 }
1111 Status::Finished => false,
1112 }
1113 }
1114
1115 pub fn acquire_cancel_guard(&mut self, count: u32) {
1116 if count == 0 {
1117 return;
1118 }
1119 if self.guards == 0 {
1120 if let Some(parent) = self.parent.as_ref() {
1121 parent.acquire_cancel_guard();
1122 }
1123 }
1124 self.guards += count;
1125 }
1126
1127 pub fn release_cancel_guard(
1128 this: &mut ConditionGuard<'_, Self>,
1129 wake_vec: &mut WakeVec,
1130 mut waker_count: usize,
1131 ) {
1132 this.guards = this.guards.checked_sub(1).expect("released non-acquired guard");
1133 if this.guards == 0 {
1134 waker_count += this.waker_count();
1135 this.on_zero_guards(wake_vec, waker_count);
1136 wake_vec.0.extend(this.drain_wakers())
1137 } else {
1138 wake_vec.0.reserve_exact(waker_count);
1139 }
1140 }
1141
1142 fn on_zero_guards(&mut self, wake_vec: &mut WakeVec, waker_count: usize) {
1143 match self.status {
1144 Status::Active => {}
1145 Status::PendingCancellation => {
1146 self.abort_tasks_and_mark_finished();
1147 }
1148 Status::Finished => {}
1151 }
1152 if let Some(parent) = &self.parent {
1153 ScopeState::release_cancel_guard(&mut parent.lock(), wake_vec, waker_count);
1154 }
1155 }
1156 }
1157
1158 #[derive(Default)]
1159 pub struct WakeVec(Vec<Waker>);
1160
1161 impl Drop for WakeVec {
1162 fn drop(&mut self) {
1163 for waker in self.0.drain(..) {
1164 waker.wake();
1165 }
1166 }
1167 }
1168
1169 pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1171
1172 impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1173 fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1174 Self(value, WakeVec::default())
1175 }
1176 }
1177
1178 impl ScopeWaker<'_> {
1179 pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
1180 let task = self.all_tasks.take(&id);
1181 if task.is_some() {
1182 self.on_task_removed(0);
1183 }
1184 task
1185 }
1186
1187 pub fn task_did_finish(&mut self, id: usize) {
1188 if let Some(task) = self.all_tasks.take(&id) {
1189 self.on_task_removed(1);
1190 if !task.is_detached() {
1191 let maybe_waker = self.results.task_did_finish(task);
1192 self.1.0.extend(maybe_waker);
1193 }
1194 }
1195 }
1196
1197 pub fn set_closed_and_drain(
1198 &mut self,
1199 ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
1200 self.close();
1201 let all_tasks = std::mem::take(&mut self.all_tasks);
1202 let results = self.results.take();
1203 if !all_tasks.is_empty() {
1204 self.on_task_removed(0)
1205 }
1206 let children = self.children.drain();
1207 (all_tasks, results, children)
1208 }
1209
1210 fn on_task_removed(&mut self, num_wakers_hint: usize) {
1211 if self.all_tasks.is_empty() {
1212 ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1.0)
1213 }
1214 }
1215
1216 pub fn wake_wakers_and_mark_pending(&mut self) {
1217 let Self(state, wakers) = self;
1218 ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
1219 }
1220 }
1221
1222 impl<'a> Deref for ScopeWaker<'a> {
1223 type Target = ConditionGuard<'a, ScopeState>;
1224
1225 fn deref(&self) -> &Self::Target {
1226 &self.0
1227 }
1228 }
1229
1230 impl DerefMut for ScopeWaker<'_> {
1231 fn deref_mut(&mut self) -> &mut Self::Target {
1232 &mut self.0
1233 }
1234 }
1235}
1236
1237struct ScopeInner {
1238 executor: Arc<Executor>,
1239 state: Condition<ScopeState>,
1240 name: String,
1241 instrument_data: Option<Box<dyn Any + Send + Sync>>,
1242}
1243
1244impl Drop for ScopeInner {
1245 fn drop(&mut self) {
1246 let key = unsafe { &*(self as *const _ as *const PtrKey) };
1251 let state = self.state.lock();
1252 if let Some(parent) = &state.parent {
1253 let mut wake_vec = WakeVec::default();
1254 let mut parent_state = parent.lock();
1255 if state.guards() != 0 {
1256 ScopeState::release_cancel_guard(&mut parent_state, &mut wake_vec, 0);
1257 }
1258 parent_state.remove_child(key);
1259 }
1260 }
1261}
1262
1263impl ScopeHandle {
1264 fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
1265 super::common::TaskHandle::with_current(|task| match task {
1266 Some(task) => f(task.scope()),
1267 None => f(EHandle::local().global_scope()),
1268 })
1269 }
1270
1271 fn lock(&self) -> ConditionGuard<'_, ScopeState> {
1272 self.inner.state.lock()
1273 }
1274
1275 fn downgrade(&self) -> WeakScopeHandle {
1276 WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
1277 }
1278
1279 #[inline(always)]
1280 pub(crate) fn executor(&self) -> &Arc<Executor> {
1281 &self.inner.executor
1282 }
1283
1284 pub(crate) fn detach(&self, task_id: usize) {
1286 let _maybe_task = {
1287 let mut state = self.lock();
1288 if let Some(task) = state.all_tasks().get(&task_id) {
1289 task.detach();
1290 }
1291 state.results.detach(task_id)
1292 };
1293 }
1294
1295 pub(crate) unsafe fn abort_task<R>(&self, task_id: usize) -> Option<R> {
1301 let mut state = self.lock();
1302 if let Some(task) = state.results.detach(task_id) {
1303 drop(state);
1304 return task.take_result();
1305 }
1306 state.all_tasks().get(&task_id).and_then(|task| {
1307 if task.abort() {
1308 self.inner.executor.ready_tasks.push(task.clone());
1309 }
1310 task.take_result()
1311 })
1312 }
1313
1314 pub(crate) fn abort_and_detach(&self, task_id: usize) {
1316 let _tasks = {
1317 let mut state = ScopeWaker::from(self.lock());
1318 let maybe_task1 = state.results.detach(task_id);
1319 let mut maybe_task2 = None;
1320 if let Some(task) = state.all_tasks().get(&task_id) {
1321 match task.abort_and_detach() {
1322 AbortAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
1323 AbortAndDetachResult::AddToRunQueue => {
1324 self.inner.executor.ready_tasks.push(task.clone());
1325 }
1326 AbortAndDetachResult::Pending => {}
1327 }
1328 }
1329 (maybe_task1, maybe_task2)
1330 };
1331 }
1332
1333 pub(crate) unsafe fn poll_join_result<R>(
1339 &self,
1340 task_id: usize,
1341 cx: &mut Context<'_>,
1342 ) -> Poll<R> {
1343 let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1344 match task.take_result() {
1345 Some(result) => Poll::Ready(result),
1346 None => {
1347 Poll::Pending
1349 }
1350 }
1351 }
1352
1353 pub(crate) unsafe fn poll_aborted<R>(
1355 &self,
1356 task_id: usize,
1357 cx: &mut Context<'_>,
1358 ) -> Poll<Option<R>> {
1359 let task = self.lock().results.poll_join_result(task_id, cx);
1360 task.map(|task| task.take_result())
1361 }
1362
1363 pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
1364 let returned_task = self.lock().insert_task(task, for_stream);
1365 returned_task.is_none()
1366 }
1367
1368 pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
1379 let mut state = ScopeWaker::from(self.lock());
1380 let task = state.take_task(task_id);
1381 if let Some(task) = task {
1382 task.drop_future_unchecked();
1383 }
1384 }
1385
1386 pub(super) fn task_did_finish(&self, id: usize) {
1387 let mut state = ScopeWaker::from(self.lock());
1388 state.task_did_finish(id);
1389 }
1390
1391 fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
1394 let mut scopes = vec![self.clone()];
1395 while let Some(scope) = scopes.pop() {
1396 let mut scope_waker = ScopeWaker::from(scope.lock());
1397 if callback(&mut scope_waker) {
1398 scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
1399 }
1400 }
1401 }
1402
1403 fn acquire_cancel_guard(&self) {
1404 self.lock().acquire_cancel_guard(1)
1405 }
1406
1407 pub(crate) fn release_cancel_guard(&self) {
1408 let mut wake_vec = WakeVec::default();
1409 ScopeState::release_cancel_guard(&mut self.lock(), &mut wake_vec, 0);
1410 }
1411
1412 fn cancel_all_tasks(&self) {
1414 self.visit_scopes_locked(|state| {
1415 match state.status() {
1416 Status::Active => {
1417 if state.guards() == 0 {
1418 state.abort_tasks_and_mark_finished();
1419 } else {
1420 state.wake_wakers_and_mark_pending();
1421 }
1422 true
1423 }
1424 Status::PendingCancellation => {
1425 true
1429 }
1430 Status::Finished => {
1431 false
1433 }
1434 }
1435 });
1436 }
1437
1438 fn abort_all_tasks(&self) {
1440 self.visit_scopes_locked(|state| match state.status() {
1441 Status::Active | Status::PendingCancellation => {
1442 state.abort_tasks_and_mark_finished();
1443 true
1444 }
1445 Status::Finished => false,
1446 });
1447 }
1448
1449 pub(super) fn drop_all_tasks(&self) {
1456 let mut scopes = vec![self.clone()];
1457 while let Some(scope) = scopes.pop() {
1458 let (tasks, join_results) = {
1459 let mut state = ScopeWaker::from(scope.lock());
1460 let (tasks, join_results, children) = state.set_closed_and_drain();
1461 scopes.extend(children.filter_map(|child| child.upgrade()));
1462 (tasks, join_results)
1463 };
1464 for task in tasks {
1466 task.try_drop().expect("Expected drop to succeed");
1467 }
1468 std::mem::drop(join_results);
1469 }
1470 }
1471}
1472
1473#[repr(transparent)]
1475struct PtrKey;
1476
1477impl Borrow<PtrKey> for WeakScopeHandle {
1478 fn borrow(&self) -> &PtrKey {
1479 unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1481 }
1482}
1483
1484impl PartialEq for PtrKey {
1485 fn eq(&self, other: &Self) -> bool {
1486 std::ptr::eq(self, other)
1487 }
1488}
1489
1490impl Eq for PtrKey {}
1491
1492impl hash::Hash for PtrKey {
1493 fn hash<H: hash::Hasher>(&self, state: &mut H) {
1494 (self as *const PtrKey).hash(state);
1495 }
1496}
1497
1498#[derive(Default)]
1499struct JoinResults(HashMap<usize, JoinResult>);
1500
1501trait Results: Send + Sync + 'static {
1502 fn can_spawn(&self) -> bool;
1504
1505 fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;
1507
1508 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;
1510
1511 fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;
1513
1514 fn take(&mut self) -> Box<dyn Any>;
1516
1517 #[cfg(test)]
1519 fn is_empty(&self) -> bool;
1520}
1521
1522impl Results for JoinResults {
1523 fn can_spawn(&self) -> bool {
1524 true
1525 }
1526
1527 fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1528 match self.0.entry(task_id) {
1529 Entry::Occupied(mut o) => match o.get_mut() {
1530 JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1531 JoinResult::Result(_) => {
1532 let JoinResult::Result(task) = o.remove() else { unreachable!() };
1533 return Poll::Ready(task);
1534 }
1535 },
1536 Entry::Vacant(v) => {
1537 v.insert(JoinResult::Waker(cx.waker().clone()));
1538 }
1539 }
1540 Poll::Pending
1541 }
1542
1543 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1544 match self.0.entry(task.id()) {
1545 Entry::Occupied(mut o) => {
1546 let JoinResult::Waker(waker) =
1547 std::mem::replace(o.get_mut(), JoinResult::Result(task))
1548 else {
1549 unreachable!()
1553 };
1554 Some(waker)
1555 }
1556 Entry::Vacant(v) => {
1557 v.insert(JoinResult::Result(task));
1558 None
1559 }
1560 }
1561 }
1562
1563 fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1564 match self.0.remove(&task_id) {
1565 Some(JoinResult::Result(task)) => Some(task),
1566 _ => None,
1567 }
1568 }
1569
1570 fn take(&mut self) -> Box<dyn Any> {
1571 Box::new(Self(std::mem::take(&mut self.0)))
1572 }
1573
1574 #[cfg(test)]
1575 fn is_empty(&self) -> bool {
1576 self.0.is_empty()
1577 }
1578}
1579
1580#[derive(Default)]
1581struct ResultsStream<R> {
1582 inner: Arc<Mutex<ResultsStreamInner<R>>>,
1583}
1584
/// The buffer shared between a `ScopeStream` and its `ResultsStream` sink.
struct ResultsStreamInner<R> {
    // Results of finished tasks that the stream has not yielded yet.
    results: Vec<R>,
    // Waker of the stream's consumer, set when it found nothing to yield.
    waker: Option<Waker>,
}

// Written by hand so that no `R: Default` bound is required.
impl<R> Default for ResultsStreamInner<R> {
    /// An empty buffer with no waker registered.
    fn default() -> Self {
        ResultsStreamInner { waker: None, results: Vec::new() }
    }
}
1595
1596impl<R: Send + 'static> Results for ResultsStream<R> {
1597 fn can_spawn(&self) -> bool {
1598 false
1599 }
1600
1601 fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1602 Poll::Pending
1603 }
1604
1605 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1606 let mut inner = self.inner.lock();
1607 inner.results.extend(unsafe { task.take_result() });
1610 inner.waker.take()
1611 }
1612
1613 fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1614 None
1615 }
1616
1617 fn take(&mut self) -> Box<dyn Any> {
1618 Box::new(std::mem::take(&mut self.inner.lock().results))
1619 }
1620
1621 #[cfg(test)]
1622 fn is_empty(&self) -> bool {
1623 false
1624 }
1625}
1626
1627#[cfg(test)]
1628mod tests {
1629 use super::*;
1633 use crate::{
1634 EHandle, LocalExecutor, SendExecutorBuilder, SpawnableFuture, Task, TestExecutor, Timer,
1635 yield_now,
1636 };
1637 use fuchsia_sync::{Condvar, Mutex};
1638 use futures::channel::mpsc;
1639 use futures::{FutureExt, StreamExt};
1640 use std::future::pending;
1641 use std::pin::{Pin, pin};
1642 use std::sync::Arc;
1643 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1644 use std::task::{Context, Poll};
1645 use std::time::Duration;
1646
1647 #[derive(Default)]
1648 struct RemoteControlFuture(Mutex<RCFState>);
1649 #[derive(Default)]
1650 struct RCFState {
1651 resolved: bool,
1652 waker: Option<Waker>,
1653 }
1654
1655 impl Future for &RemoteControlFuture {
1656 type Output = ();
1657 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1658 let mut this = self.0.lock();
1659 if this.resolved {
1660 Poll::Ready(())
1661 } else {
1662 this.waker.replace(cx.waker().clone());
1663 Poll::Pending
1664 }
1665 }
1666 }
1667
1668 impl RemoteControlFuture {
1669 fn new() -> Arc<Self> {
1670 Arc::new(Default::default())
1671 }
1672
1673 fn resolve(&self) {
1674 let mut this = self.0.lock();
1675 this.resolved = true;
1676 if let Some(waker) = this.waker.take() {
1677 waker.wake();
1678 }
1679 }
1680
1681 fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> {
1682 let this = Arc::clone(self);
1683 #[allow(clippy::redundant_async_block)] async move {
1685 (&*this).await
1686 }
1687 }
1688 }
1689
1690 #[test]
1691 fn compute_works_on_root_scope() {
1692 let mut executor = TestExecutor::new();
1693 let scope = executor.global_scope();
1694 let mut task = pin!(scope.compute(async { 1 }));
1695 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1696 }
1697
1698 #[test]
1699 fn compute_works_on_new_child() {
1700 let mut executor = TestExecutor::new();
1701 let scope = executor.global_scope().new_child_with_name("compute_works_on_new_child");
1702 let mut task = pin!(scope.compute(async { 1 }));
1703 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1704 }
1705
1706 #[test]
1707 fn scope_drop_cancels_tasks() {
1708 let mut executor = TestExecutor::new();
1709 let scope = executor.global_scope().new_child_with_name("scope_drop_cancels_tasks");
1710 let mut task = pin!(scope.compute(async { 1 }));
1711 drop(scope);
1712 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1713 }
1714
1715 #[test]
1716 fn tasks_do_not_spawn_on_cancelled_scopes() {
1717 let mut executor = TestExecutor::new();
1718 let scope =
1719 executor.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
1720 let handle = scope.to_handle();
1721 let mut cancel = pin!(scope.cancel());
1722 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1723 let mut task = pin!(handle.compute(async { 1 }));
1724 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1725 }
1726
1727 #[test]
1728 fn tasks_do_not_spawn_on_closed_empty_scopes() {
1729 let mut executor = TestExecutor::new();
1730 let scope =
1731 executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
1732 let handle = scope.to_handle();
1733 let mut close = pin!(scope.cancel());
1734 assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
1735 let mut task = pin!(handle.compute(async { 1 }));
1736 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1737 }
1738
1739 #[test]
1740 fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
1741 let mut executor = TestExecutor::new();
1742 let scope = executor.global_scope().new_child();
1743 let handle = scope.to_handle();
1744 handle.spawn(pending());
1745 let mut close = pin!(scope.close());
1746 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1747 let mut task = pin!(handle.compute(async { 1 }));
1748 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
1749 }
1750
1751 #[test]
1752 fn spawn_works_on_child_and_grandchild() {
1753 let mut executor = TestExecutor::new();
1754 let scope = executor.global_scope().new_child();
1755 let child = scope.new_child();
1756 let grandchild = child.new_child();
1757 let mut child_task = pin!(child.compute(async { 1 }));
1758 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1759 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Ready(1));
1760 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Ready(1));
1761 }
1762
1763 #[test]
1764 fn spawn_drop_cancels_child_and_grandchild_tasks() {
1765 let mut executor = TestExecutor::new();
1766 let scope = executor.global_scope().new_child();
1767 let child = scope.new_child();
1768 let grandchild = child.new_child();
1769 let mut child_task = pin!(child.compute(async { 1 }));
1770 let mut grandchild_task = pin!(grandchild.compute(async { 1 }));
1771 drop(scope);
1772 assert_eq!(executor.run_until_stalled(&mut child_task), Poll::Pending);
1773 assert_eq!(executor.run_until_stalled(&mut grandchild_task), Poll::Pending);
1774 }
1775
1776 #[test]
1777 fn completed_tasks_are_cleaned_up_after_cancel() {
1778 let mut executor = TestExecutor::new();
1779 let scope = executor.global_scope().new_child();
1780
1781 let task1 = scope.spawn(pending::<()>());
1782 let task2 = scope.spawn(async {});
1783 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1784 assert_eq!(scope.lock().all_tasks().len(), 1);
1785
1786 assert_eq!(task1.abort().now_or_never(), None);
1789 assert_eq!(task2.abort().now_or_never(), Some(Some(())));
1790
1791 assert_eq!(executor.run_until_stalled(&mut pending::<()>()), Poll::Pending);
1792 assert_eq!(scope.lock().all_tasks().len(), 0);
1793 assert!(scope.lock().results.is_empty());
1794 }
1795
1796 #[test]
1797 fn join_emtpy_scope() {
1798 let mut executor = TestExecutor::new();
1799 let scope = executor.global_scope().new_child();
1800 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
1801 }
1802
1803 #[test]
1804 fn task_handle_preserves_access_to_result_after_join_begins() {
1805 let mut executor = TestExecutor::new();
1806 let scope = executor.global_scope().new_child();
1807 let mut task = scope.compute(async { 1 });
1808 scope.spawn(async {});
1809 let task2 = scope.spawn(pending::<()>());
1810 let mut join = pin!(scope.join().fuse());
1813 let _ = executor.run_until_stalled(&mut join);
1814 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
1815 drop(task2.abort());
1816 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1817 }
1818
1819 #[test]
1820 fn join_blocks_until_task_is_cancelled() {
1821 let mut executor = TestExecutor::new();
1824 let scope = executor.global_scope().new_child();
1825 let outstanding_task = scope.spawn(pending::<()>());
1826 let cancelled_task = scope.spawn(pending::<()>());
1827 assert_eq!(
1828 executor.run_until_stalled(&mut pin!(cancelled_task.abort())),
1829 Poll::Ready(None)
1830 );
1831 let mut join = pin!(scope.join());
1832 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1833 assert_eq!(
1834 executor.run_until_stalled(&mut pin!(outstanding_task.abort())),
1835 Poll::Ready(None)
1836 );
1837 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
1838 }
1839
1840 #[test]
1841 fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1842 let mut executor = TestExecutor::new();
1843 let scope = executor.global_scope().new_child();
1844 scope.spawn(pending::<()>());
1846 let mut join = pin!(scope.join());
1847 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
1848 let mut cancel = pin!(join.cancel());
1849 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1850 }
1851
1852 #[test]
1853 fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
1854 let mut executor = TestExecutor::new();
1855 let scope = executor.global_scope().new_child();
1856 scope.spawn(pending::<()>());
1858 let mut close = pin!(scope.close());
1859 assert_eq!(executor.run_until_stalled(&mut close), Poll::Pending);
1860 let mut cancel = pin!(close.cancel());
1861 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
1862 }
1863
1864 #[test]
1865 fn join_scope_blocks_until_spawned_task_completes() {
1866 let mut executor = TestExecutor::new();
1867 let scope = executor.global_scope().new_child();
1868 let remote = RemoteControlFuture::new();
1869 let mut task = scope.spawn(remote.as_future());
1870 let mut scope_join = pin!(scope.join());
1871 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1872 remote.resolve();
1873 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1874 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1875 }
1876
1877 #[test]
1878 fn close_scope_blocks_until_spawned_task_completes() {
1879 let mut executor = TestExecutor::new();
1880 let scope = executor.global_scope().new_child();
1881 let remote = RemoteControlFuture::new();
1882 let mut task = scope.spawn(remote.as_future());
1883 let mut scope_close = pin!(scope.close());
1884 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Pending);
1885 remote.resolve();
1886 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
1887 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(()));
1888 }
1889
1890 #[test]
1891 fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
1892 let mut executor = TestExecutor::new();
1893 let scope = executor.global_scope().new_child();
1894 let child = scope.new_child();
1895 let remote = RemoteControlFuture::new();
1896 child.spawn(remote.as_future());
1897 let mut scope_join = pin!(scope.join());
1898 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1899 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1900 child.detach();
1901 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1902 remote.resolve();
1903 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1904 }
1905
1906 #[test]
1907 fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
1908 let mut executor = TestExecutor::new();
1909 let scope = executor.global_scope().new_child();
1910 let remote = RemoteControlFuture::new();
1911 {
1912 let remote = remote.clone();
1913 scope.spawn(async move {
1914 let child = Scope::new_with_name("child");
1915 child.spawn(async move {
1916 Scope::current().spawn(remote.as_future());
1917 });
1918 child.detach();
1919 });
1920 }
1921 let mut scope_join = pin!(scope.join());
1922 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1923 remote.resolve();
1924 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1925 }
1926
1927 #[test]
1928 fn join_scope_blocks_when_blocked_child_is_detached() {
1929 let mut executor = TestExecutor::new();
1930 let scope = executor.global_scope().new_child();
1931 let child = scope.new_child();
1932 child.spawn(pending());
1933 let mut scope_join = pin!(scope.join());
1934 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1935 assert_eq!(executor.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
1936 child.detach();
1937 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1938 }
1939
1940 #[test]
1941 fn join_scope_completes_when_blocked_child_is_cancelled() {
1942 let mut executor = TestExecutor::new();
1943 let scope = executor.global_scope().new_child();
1944 let child = scope.new_child();
1945 child.spawn(pending());
1946 let mut scope_join = pin!(scope.join());
1947 {
1948 let mut child_join = pin!(child.join());
1949 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
1950 assert_eq!(executor.run_until_stalled(&mut child_join), Poll::Pending);
1951 }
1952 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1953 }
1954
1955 #[test]
1956 fn detached_scope_can_spawn() {
1957 let mut executor = TestExecutor::new();
1958 let scope = executor.global_scope().new_child();
1959 let handle = scope.to_handle();
1960 scope.detach();
1961 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
1962 }
1963
1964 #[test]
1965 fn dropped_scope_cannot_spawn() {
1966 let mut executor = TestExecutor::new();
1967 let scope = executor.global_scope().new_child();
1968 let handle = scope.to_handle();
1969 drop(scope);
1970 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1971 }
1972
1973 #[test]
1974 fn dropped_scope_with_running_task_cannot_spawn() {
1975 let mut executor = TestExecutor::new();
1976 let scope = executor.global_scope().new_child();
1977 let handle = scope.to_handle();
1978 let _running_task = handle.spawn(pending::<()>());
1979 drop(scope);
1980 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1981 }
1982
1983 #[test]
1984 fn joined_scope_cannot_spawn() {
1985 let mut executor = TestExecutor::new();
1986 let scope = executor.global_scope().new_child();
1987 let handle = scope.to_handle();
1988 let mut scope_join = pin!(scope.join());
1989 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
1990 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Pending);
1991 }
1992
1993 #[test]
1994 fn joining_scope_with_running_task_can_spawn() {
1995 let mut executor = TestExecutor::new();
1996 let scope = executor.global_scope().new_child();
1997 let handle = scope.to_handle();
1998 let _running_task = handle.spawn(pending::<()>());
1999 let mut scope_join = pin!(scope.join());
2000 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Pending);
2001 assert_eq!(executor.run_until_stalled(&mut handle.compute(async { 1 })), Poll::Ready(1));
2002 }
2003
2004 #[test]
2005 fn joined_scope_child_cannot_spawn() {
2006 let mut executor = TestExecutor::new();
2007 let scope = executor.global_scope().new_child();
2008 let handle = scope.to_handle();
2009 let child_before_join = scope.new_child();
2010 assert_eq!(
2011 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
2012 Poll::Ready(1)
2013 );
2014 let mut scope_join = pin!(scope.join());
2015 assert_eq!(executor.run_until_stalled(&mut scope_join), Poll::Ready(()));
2016 let child_after_join = handle.new_child();
2017 let grandchild_after_join = child_before_join.new_child();
2018 assert_eq!(
2019 executor.run_until_stalled(&mut child_before_join.compute(async { 1 })),
2020 Poll::Pending
2021 );
2022 assert_eq!(
2023 executor.run_until_stalled(&mut child_after_join.compute(async { 1 })),
2024 Poll::Pending
2025 );
2026 assert_eq!(
2027 executor.run_until_stalled(&mut grandchild_after_join.compute(async { 1 })),
2028 Poll::Pending
2029 );
2030 }
2031
2032 #[test]
2033 fn closed_scope_child_cannot_spawn() {
2034 let mut executor = TestExecutor::new();
2035 let scope = executor.global_scope().new_child();
2036 let handle = scope.to_handle();
2037 let child_before_close = scope.new_child();
2038 assert_eq!(
2039 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2040 Poll::Ready(1)
2041 );
2042 let mut scope_close = pin!(scope.close());
2043 assert_eq!(executor.run_until_stalled(&mut scope_close), Poll::Ready(()));
2044 let child_after_close = handle.new_child();
2045 let grandchild_after_close = child_before_close.new_child();
2046 assert_eq!(
2047 executor.run_until_stalled(&mut child_before_close.compute(async { 1 })),
2048 Poll::Pending
2049 );
2050 assert_eq!(
2051 executor.run_until_stalled(&mut child_after_close.compute(async { 1 })),
2052 Poll::Pending
2053 );
2054 assert_eq!(
2055 executor.run_until_stalled(&mut grandchild_after_close.compute(async { 1 })),
2056 Poll::Pending
2057 );
2058 }
2059
2060 #[test]
2061 fn can_join_child_first() {
2062 let mut executor = TestExecutor::new();
2063 let scope = executor.global_scope().new_child();
2064 let child = scope.new_child();
2065 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2066 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2067 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2068 }
2069
2070 #[test]
2071 fn can_join_parent_first() {
2072 let mut executor = TestExecutor::new();
2073 let scope = executor.global_scope().new_child();
2074 let child = scope.new_child();
2075 assert_eq!(executor.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
2076 assert_eq!(executor.run_until_stalled(&mut pin!(scope.join())), Poll::Ready(()));
2077 assert_eq!(executor.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
2078 }
2079
2080 #[test]
2081 fn task_in_parent_scope_can_join_child() {
2082 let mut executor = TestExecutor::new();
2083 let scope = executor.global_scope().new_child();
2084 let child = scope.new_child();
2085 let remote = RemoteControlFuture::new();
2086 child.spawn(remote.as_future());
2087 scope.spawn(async move { child.join().await });
2088 let mut join = pin!(scope.join());
2089 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2090 remote.resolve();
2091 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2092 }
2093
2094 #[test]
2095 fn join_completes_while_completed_task_handle_is_held() {
2096 let mut executor = TestExecutor::new();
2097 let scope = executor.global_scope().new_child();
2098 let mut task = scope.compute(async { 1 });
2099 scope.spawn(async {});
2100 let mut join = pin!(scope.join());
2101 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2102 assert_eq!(executor.run_until_stalled(&mut task), Poll::Ready(1));
2103 }
2104
2105 #[test]
2106 fn cancel_completes_while_task_holds_handle() {
2107 let mut executor = TestExecutor::new();
2108 let scope = executor.global_scope().new_child();
2109 let handle = scope.to_handle();
2110 let mut task = scope.compute(async move {
2111 loop {
2112 pending::<()>().await; handle.spawn(async {});
2114 }
2115 });
2116
2117 let mut join = pin!(scope.join());
2119 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2120
2121 let mut cancel = pin!(join.cancel());
2122 assert_eq!(executor.run_until_stalled(&mut cancel), Poll::Ready(()));
2123 assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
2124 }
2125
2126 #[test]
2127 fn cancel_from_handle_inside_task() {
2128 let mut executor = TestExecutor::new();
2129 let scope = executor.global_scope().new_child();
2130 {
2131 scope.spawn(pending::<()>());
2133
2134 let mut no_tasks = pin!(scope.on_no_tasks());
2135 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Pending);
2136
2137 let handle = scope.to_handle();
2138 scope.spawn(async move {
2139 handle.cancel().await;
2140 panic!("cancel() should never complete");
2141 });
2142
2143 assert_eq!(executor.run_until_stalled(&mut no_tasks), Poll::Ready(()));
2144 }
2145 assert_eq!(scope.join().now_or_never(), Some(()));
2146 }
2147
2148 #[test]
2149 fn can_spawn_from_non_executor_thread() {
2150 let mut executor = TestExecutor::new();
2151 let scope = executor.global_scope().clone();
2152 let done = Arc::new(AtomicBool::new(false));
2153 let done_clone = done.clone();
2154 let _ = std::thread::spawn(move || {
2155 scope.spawn(async move {
2156 done_clone.store(true, Ordering::Relaxed);
2157 })
2158 })
2159 .join();
2160 let _ = executor.run_until_stalled(&mut pending::<()>());
2161 assert!(done.load(Ordering::Relaxed));
2162 }
2163
2164 #[test]
2165 fn scope_tree() {
2166 let mut executor = TestExecutor::new();
2172 let a = executor.global_scope().new_child();
2173 let b = a.new_child();
2174 let c = b.new_child();
2175 let d = b.new_child();
2176 let a_remote = RemoteControlFuture::new();
2177 let c_remote = RemoteControlFuture::new();
2178 let d_remote = RemoteControlFuture::new();
2179 a.spawn(a_remote.as_future());
2180 c.spawn(c_remote.as_future());
2181 d.spawn(d_remote.as_future());
2182 let mut a_join = pin!(a.join());
2183 let mut b_join = pin!(b.join());
2184 let mut d_join = pin!(d.join());
2185 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2186 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2187 assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
2188 d_remote.resolve();
2189 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2190 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
2191 assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
2192 c_remote.resolve();
2193 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
2194 assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
2195 a_remote.resolve();
2196 assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
2197 let mut c_join = pin!(c.join());
2198 assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
2199 }
2200
    /// On a two-thread send executor, checks that `wake_all_with_active_guard` causes both
    /// spawned tasks to be polled again, including when `cancel` is called right after it.
    ///
    /// `state` packs two counters into one `AtomicU64`: the low 32 bits count polls of the
    /// `PollCounter` tasks, and the high 32 bits hold the poll count currently being waited
    /// for (zero when nobody is waiting).
    #[test]
    fn wake_all_with_active_guard_on_send_executor() {
        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
        let scope = executor.root_scope().new_child();

        let (tx, mut rx) = mpsc::unbounded();
        let state = Arc::new(AtomicU64::new(0));

        // Never-completing future that bumps the shared poll count on every poll and sends a
        // notification when the new count equals the awaited target.
        struct PollCounter(Arc<AtomicU64>, mpsc::UnboundedSender<()>);

        impl Future for PollCounter {
            type Output = ();
            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                let old = self.0.fetch_add(1, Ordering::Relaxed);
                // High half = awaited target; low half of `old + 1` = poll count after this
                // poll. A send failure is ignored.
                if old >> 32 == (old + 1) & u32::MAX as u64 {
                    let _ = self.1.unbounded_send(());
                }
                Poll::Pending
            }
        }

        scope.spawn(PollCounter(state.clone(), tx.clone()));
        scope.spawn(PollCounter(state.clone(), tx.clone()));

        executor.run(async move {
            // Publishes `count` as the target in the high bits; if the poll count has not
            // reached it yet, waits for one notification; finally clears the target bits.
            let mut wait_for_poll_count = async |count| {
                let old = state.fetch_or(count << 32, Ordering::Relaxed);
                if old & u32::MAX as u64 != count {
                    rx.next().await.unwrap();
                }
                state.fetch_and(u32::MAX as u64, Ordering::Relaxed);
            };

            // Each of the two tasks gets its initial poll.
            wait_for_poll_count(2).await;

            let mut start_count = 2;
            for _ in 0..2 {
                scope.wake_all_with_active_guard();

                // Both tasks must be polled once more after each wake-all.
                wait_for_poll_count(start_count + 2).await;
                start_count += 2;
            }

            // Wake everything and immediately request cancellation; both tasks are still
            // expected to be polled once more before the cancel future is awaited.
            // NOTE(review): presumably the guard taken by the wake-all defers the
            // cancellation until those polls have happened — confirm against the scope
            // implementation.
            scope.wake_all_with_active_guard();
            let done = scope.cancel();

            wait_for_poll_count(start_count + 2).await;

            done.await;
        });
    }
2255
2256 #[test]
2257 fn on_no_tasks_race() {
2258 fn sleep_random() {
2259 std::thread::sleep(std::time::Duration::from_micros(rand::random_range(0..10)));
2260 }
2261 for _ in 0..2000 {
2262 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2263 let scope = executor.root_scope().new_child();
2264 scope.spawn(async {
2265 sleep_random();
2266 });
2267 executor.run(async move {
2268 sleep_random();
2269 scope.on_no_tasks().await;
2270 });
2271 }
2272 }
2273
2274 #[test]
2275 fn test_detach() {
2276 let mut e = LocalExecutor::default();
2277 e.run_singlethreaded(async {
2278 let counter = Arc::new(AtomicU32::new(0));
2279
2280 {
2281 let counter = counter.clone();
2282 Task::spawn(async move {
2283 for _ in 0..5 {
2284 yield_now().await;
2285 counter.fetch_add(1, Ordering::Relaxed);
2286 }
2287 })
2288 .detach();
2289 }
2290
2291 while counter.load(Ordering::Relaxed) != 5 {
2292 yield_now().await;
2293 }
2294 });
2295
2296 assert!(e.ehandle.root_scope.lock().results.is_empty());
2297 }
2298
2299 #[test]
2300 fn test_cancel() {
2301 let mut e = LocalExecutor::default();
2302 e.run_singlethreaded(async {
2303 let ref_count = Arc::new(());
2304 {
2306 let ref_count = ref_count.clone();
2307 drop(Task::spawn(async move {
2308 let _ref_count = ref_count;
2309 let _: () = std::future::pending().await;
2310 }));
2311 }
2312
2313 while Arc::strong_count(&ref_count) != 1 {
2314 yield_now().await;
2315 }
2316
2317 let task = {
2319 let ref_count = ref_count.clone();
2320 Task::spawn(async move {
2321 let _ref_count = ref_count;
2322 let _: () = std::future::pending().await;
2323 })
2324 };
2325
2326 assert_eq!(task.abort().await, None);
2327 while Arc::strong_count(&ref_count) != 1 {
2328 yield_now().await;
2329 }
2330
2331 let task = {
2333 let ref_count = ref_count.clone();
2334 Task::spawn(async move {
2335 let _ref_count = ref_count;
2336 })
2337 };
2338
2339 while Arc::strong_count(&ref_count) != 1 {
2341 yield_now().await;
2342 }
2343
2344 assert_eq!(task.abort().await, Some(()));
2345 });
2346
2347 assert!(e.ehandle.root_scope.lock().results.is_empty());
2348 }
2349
    /// Checks that `abort().await` on a task that is in the middle of a poll waits for that
    /// poll to finish and returns the task's output.
    ///
    /// The shared integer moves through 0 -> 1 (task is running) -> 2 (main future saw it)
    /// -> 3 (task is about to return), synchronized with a mutex and condvar.
    #[test]
    fn test_cancel_waits() {
        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
        let state = Arc::new((Mutex::new(0), Condvar::new()));
        let task = {
            let state = state.clone();
            executor.root_scope().compute(async move {
                // Announce that the task is running, then block this thread until the main
                // future moves the state away from 1.
                *state.0.lock() = 1;
                state.1.notify_all();
                state.1.wait_while(&mut state.0.lock(), |state| *state == 1);
                // Stay inside the poll a little longer before publishing the final state.
                std::thread::sleep(std::time::Duration::from_millis(10));
                *state.0.lock() = 3;
                "foo"
            })
        };
        executor.run(async move {
            // Wait until the task reports state 1, then flip it to 2 and stop waiting.
            state.1.wait_while(&mut state.0.lock(), |state| {
                if *state == 1 {
                    *state = 2;
                    false
                } else {
                    true
                }
            });
            state.1.notify_all();
            // The abort resolves with the task's output, and the task has already written
            // its final state by then.
            assert_eq!(task.abort().await, Some("foo"));
            assert_eq!(*state.0.lock(), 3);
        });
    }
2382
2383 fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2384 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2385 let running = Arc::new((Mutex::new(false), Condvar::new()));
2386 let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2387 let task = {
2388 let running = running.clone();
2389 let can_quit = can_quit.clone();
2390 executor.root_scope().compute(async move {
2391 *running.0.lock() = true;
2392 running.1.notify_all();
2393 {
2394 let mut guard = can_quit.0.lock();
2395 while !*guard {
2396 can_quit.1.wait(&mut guard);
2397 }
2398 }
2399 *running.0.lock() = false;
2400 })
2401 };
2402 executor.run(async move {
2403 {
2404 let mut guard = running.0.lock();
2405 while !*guard {
2406 running.1.wait(&mut guard);
2407 }
2408 }
2409
2410 callback(task);
2411
2412 *can_quit.0.lock() = true;
2413 can_quit.1.notify_all();
2414
2415 let ehandle = EHandle::local();
2416 let scope = ehandle.global_scope();
2417
2418 while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2420 Timer::new(std::time::Duration::from_millis(1)).await;
2421 }
2422
2423 assert!(!*running.0.lock());
2424 });
2425 }
2426
2427 #[test]
2428 fn test_dropped_cancel_cleans_up() {
2429 test_clean_up(|task| {
2430 let abort_fut = std::pin::pin!(task.abort());
2431 let waker = futures::task::noop_waker();
2432 assert!(abort_fut.poll(&mut Context::from_waker(&waker)).is_pending());
2433 });
2434 }
2435
2436 #[test]
2437 fn test_dropped_task_cleans_up() {
2438 test_clean_up(|task| {
2439 std::mem::drop(task);
2440 });
2441 }
2442
2443 #[test]
2444 fn test_detach_cleans_up() {
2445 test_clean_up(|task| {
2446 task.detach();
2447 });
2448 }
2449
2450 #[test]
2451 fn test_scope_stream() {
2452 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2453 executor.run(async move {
2454 let (stream, handle) = ScopeStream::new();
2455 handle.push(async { 1 });
2456 handle.push(async { 2 });
2457 stream.close();
2458 let results: HashSet<_> = stream.collect().await;
2459 assert_eq!(results, HashSet::from_iter([1, 2]));
2460 });
2461 }
2462
2463 #[test]
2464 fn test_scope_stream_wakes_properly() {
2465 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2466 executor.run(async move {
2467 let (stream, handle) = ScopeStream::new();
2468 handle.push(async {
2469 Timer::new(Duration::from_millis(10)).await;
2470 1
2471 });
2472 handle.push(async {
2473 Timer::new(Duration::from_millis(10)).await;
2474 2
2475 });
2476 stream.close();
2477 let results: HashSet<_> = stream.collect().await;
2478 assert_eq!(results, HashSet::from_iter([1, 2]));
2479 });
2480 }
2481
2482 #[test]
2483 fn test_scope_stream_drops_spawned_tasks() {
2484 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2485 executor.run(async move {
2486 let (stream, handle) = ScopeStream::new();
2487 handle.push(async { 1 });
2488 let _task = stream.compute(async { "foo" });
2489 stream.close();
2490 let results: HashSet<_> = stream.collect().await;
2491 assert_eq!(results, HashSet::from_iter([1]));
2492 });
2493 }
2494
2495 #[test]
2496 fn test_nested_scope_stream() {
2497 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2498 executor.run(async move {
2499 let (mut stream, handle) = ScopeStream::new();
2500 handle.clone().push(async move {
2501 handle.clone().push(async move {
2502 handle.clone().push(async move { 3 });
2503 2
2504 });
2505 1
2506 });
2507 let mut results = HashSet::default();
2508 while let Some(item) = stream.next().await {
2509 results.insert(item);
2510 if results.len() == 3 {
2511 stream.close();
2512 }
2513 }
2514 assert_eq!(results, HashSet::from_iter([1, 2, 3]));
2515 });
2516 }
2517
2518 #[test]
2519 fn test_dropping_scope_stream_cancels_all_tasks() {
2520 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2521 executor.run(async move {
2522 let (stream, handle) = ScopeStream::new();
2523 let (tx1, mut rx) = mpsc::unbounded::<()>();
2524 let tx2 = tx1.clone();
2525 handle.push(async move {
2526 let _tx1 = tx1;
2527 let () = pending().await;
2528 });
2529 handle.push(async move {
2530 let _tx2 = tx2;
2531 let () = pending().await;
2532 });
2533 drop(stream);
2534
2535 assert_eq!(rx.next().await, None);
2537 });
2538 }
2539
2540 #[test]
2541 fn test_scope_stream_collect() {
2542 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2543 executor.run(async move {
2544 let stream: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
2545 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2546
2547 let stream: ScopeStream<_> =
2548 (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
2549 assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
2550 });
2551 }
2552
2553 struct DropSignal(Arc<AtomicBool>);
2554
2555 impl Drop for DropSignal {
2556 fn drop(&mut self) {
2557 self.0.store(true, Ordering::SeqCst);
2558 }
2559 }
2560
2561 struct DropChecker(Arc<AtomicBool>);
2562
2563 impl DropChecker {
2564 fn new() -> (Self, DropSignal) {
2565 let inner = Arc::new(AtomicBool::new(false));
2566 (Self(inner.clone()), DropSignal(inner))
2567 }
2568
2569 fn is_dropped(&self) -> bool {
2570 self.0.load(Ordering::SeqCst)
2571 }
2572 }
2573
2574 #[test]
2575 fn child_finished_when_parent_pending() {
2576 let mut executor = LocalExecutor::default();
2577 executor.run_singlethreaded(async {
2578 let scope = Scope::new();
2579 let _guard = scope.active_guard().expect("acquire guard");
2580 let cancel = scope.to_handle().cancel();
2581 let child = scope.new_child();
2582 let (checker, signal) = DropChecker::new();
2583 child.spawn(async move {
2584 let _signal = signal;
2585 futures::future::pending::<()>().await
2586 });
2587 assert!(checker.is_dropped());
2588 assert!(child.active_guard().is_none());
2589 cancel.await;
2590 })
2591 }
2592
2593 #[test]
2594 fn guarded_scopes_observe_closed() {
2595 let mut executor = LocalExecutor::default();
2596 executor.run_singlethreaded(async {
2597 let scope = Scope::new();
2598 let handle = scope.to_handle();
2599 let _guard = scope.active_guard().expect("acquire guard");
2600 handle.close();
2601 let (checker, signal) = DropChecker::new();
2602 handle.spawn(async move {
2603 let _signal = signal;
2604 futures::future::pending::<()>().await
2605 });
2606 assert!(checker.is_dropped());
2607 let (checker, signal) = DropChecker::new();
2608 let cancel = handle.clone().cancel();
2609 handle.spawn(async move {
2610 let _signal = signal;
2611 futures::future::pending::<()>().await
2612 });
2613 assert!(checker.is_dropped());
2614 scope.join().await;
2615 cancel.await;
2616 })
2617 }
2618
2619 #[test]
2620 fn child_guard_holds_parent_cancellation() {
2621 let mut executor = TestExecutor::new();
2622 let scope = executor.global_scope().new_child();
2623 let child = scope.new_child();
2624 let guard = child.active_guard().expect("acquire guard");
2625 scope.spawn(futures::future::pending());
2626 let mut join = pin!(scope.cancel());
2627 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2628 drop(guard);
2629 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2630 }
2631
2632 #[test]
2633 fn active_guard_on_cancel() {
2634 let mut executor = TestExecutor::new();
2635 let scope = executor.global_scope().new_child();
2636 let child1 = scope.new_child();
2637 let child2 = scope.new_child();
2638 let guard = child1.active_guard().expect("acquire guard");
2639 let guard_for_right_scope = guard.clone();
2640 let guard_for_wrong_scope = guard.clone();
2641 child1.spawn(async move { guard_for_right_scope.on_cancel().await });
2642 child2.spawn(async move {
2643 guard_for_wrong_scope.on_cancel().await;
2644 });
2645
2646 let handle = scope.to_handle();
2647 let mut join = pin!(scope.join());
2648 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2649 let cancel: Join<_> = handle.cancel();
2650 drop(cancel);
2651 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2652 }
2653
2654 #[test]
2655 fn abort_join() {
2656 let mut executor = TestExecutor::new();
2657 let scope = executor.global_scope().new_child();
2658 let child = scope.new_child();
2659 let _guard = child.active_guard().expect("acquire guard");
2660
2661 let (checker1, signal) = DropChecker::new();
2662 scope.spawn(async move {
2663 let _signal = signal;
2664 futures::future::pending::<()>().await
2665 });
2666 let (checker2, signal) = DropChecker::new();
2667 scope.spawn(async move {
2668 let _signal = signal;
2669 futures::future::pending::<()>().await
2670 });
2671
2672 let mut join = pin!(scope.cancel());
2673 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2674 assert!(!checker1.is_dropped());
2675 assert!(!checker2.is_dropped());
2676
2677 let mut join = join.abort();
2678 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2679 assert!(checker1.is_dropped());
2680 assert!(checker2.is_dropped());
2681 }
2682
2683 #[test]
2684 fn child_without_guard_aborts_immediately_on_cancel() {
2685 let mut executor = TestExecutor::new();
2686 let scope = executor.global_scope().new_child();
2687 let child = scope.new_child();
2688 let guard = scope.active_guard().expect("acquire guard");
2689
2690 let (checker_scope, signal) = DropChecker::new();
2691 scope.spawn(async move {
2692 let _signal = signal;
2693 futures::future::pending::<()>().await
2694 });
2695 let (checker_child, signal) = DropChecker::new();
2696 child.spawn(async move {
2697 let _signal = signal;
2698 futures::future::pending::<()>().await
2699 });
2700
2701 let mut join = pin!(scope.cancel());
2702 assert_eq!(executor.run_until_stalled(&mut join), Poll::Pending);
2703 assert!(!checker_scope.is_dropped());
2704 assert!(checker_child.is_dropped());
2705
2706 drop(guard);
2707 assert_eq!(executor.run_until_stalled(&mut join), Poll::Ready(()));
2708 assert!(checker_child.is_dropped());
2709 }
2710}