1use super::super::task::JoinHandle;
6use super::atomic_future::{AbortAndDetachResult, AtomicFutureHandle};
7use super::common::{Executor, TaskHandle};
8use crate::EHandle;
9use crate::condition::{Condition, ConditionGuard, WakerEntry};
10use core::{error, fmt};
11use fuchsia_sync::Mutex;
12use futures::Stream;
13use pin_project_lite::pin_project;
14use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
15use state::{JoinResult, ScopeState, ScopeWaker, Status, WakeVec};
16use std::any::Any;
17use std::borrow::Borrow;
18use std::collections::hash_map::Entry;
19use std::collections::hash_set;
20use std::future::{Future, IntoFuture};
21use std::hash;
22use std::marker::PhantomData;
23use std::mem::{self, ManuallyDrop};
24use std::ops::{Deref, DerefMut};
25use std::pin::Pin;
26use std::sync::{Arc, Weak};
27use std::task::{Context, Poll, Waker, ready};
28
/// An owning wrapper around a [`ScopeHandle`].
///
/// Unlike a bare handle, dropping a `Scope` calls `abort_all_tasks` on the wrapped handle (see
/// the `Drop` impl in this file). Consume the scope with `join`, `close`, `cancel`, `abort` or
/// `detach` to choose a different outcome.
#[must_use = "Scopes should be explicitly awaited or cancelled"]
#[derive(Debug)]
pub struct Scope {
    // The handle this scope owns. `Scope` derefs to it.
    inner: ScopeHandle,
}
80
81impl Default for Scope {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl Scope {
88 pub fn new() -> Scope {
97 ScopeHandle::with_current(|handle| handle.new_child())
98 }
99
100 pub fn new_with_name(name: impl Into<String>) -> Scope {
109 ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
110 }
111
112 pub fn current() -> ScopeHandle {
120 ScopeHandle::with_current(|handle| handle.clone())
121 }
122
123 pub fn global() -> ScopeHandle {
140 EHandle::local().global_scope().clone()
141 }
142
143 pub fn new_child(&self) -> Scope {
145 self.inner.new_child()
146 }
147
148 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
150 self.inner.new_child_with_name(name.into())
151 }
152
153 pub fn name(&self) -> &str {
155 &self.inner.inner.name
156 }
157
158 pub fn to_handle(&self) -> ScopeHandle {
166 self.inner.clone()
167 }
168
169 pub fn as_handle(&self) -> &ScopeHandle {
176 &self.inner
177 }
178
179 pub fn join(self) -> Join {
188 Join::new(self)
189 }
190
191 pub fn close(self) -> Join {
194 self.inner.close();
195 Join::new(self)
196 }
197
198 pub fn cancel(self) -> Join {
216 self.inner.cancel_all_tasks();
217 Join::new(self)
218 }
219
220 pub fn abort(self) -> impl Future<Output = ()> {
235 self.inner.abort_all_tasks();
236 Join::new(self)
237 }
238
239 pub fn detach(self) {
245 let this = ManuallyDrop::new(self);
248 mem::drop(unsafe { std::ptr::read(&this.inner) });
251 }
252}
253
254impl Drop for Scope {
257 fn drop(&mut self) {
258 self.inner.abort_all_tasks();
267 }
268}
269
270impl IntoFuture for Scope {
271 type Output = ();
272 type IntoFuture = Join;
273 fn into_future(self) -> Self::IntoFuture {
274 self.join()
275 }
276}
277
278impl Deref for Scope {
279 type Target = ScopeHandle;
280 fn deref(&self) -> &Self::Target {
281 &self.inner
282 }
283}
284
285impl Borrow<ScopeHandle> for Scope {
286 fn borrow(&self) -> &ScopeHandle {
287 self
288 }
289}
290
pin_project! {
    /// Future returned by the join-style operations on a scope.
    ///
    /// It resolves once the scope reports that it has no tasks; at that point the scope is
    /// marked finished. `S` is whatever owns or refers to the scope (a [`Scope`] by default).
    pub struct Join<S = Scope> {
        // The scope (or handle) being joined.
        scope: S,
        // Registration slot used to park this future's waker on the scope's condition.
        #[pin]
        waker_entry: WakerEntry<ScopeState>,
    }
}
309
310impl<S: Borrow<ScopeHandle>> Join<S> {
311 fn new(scope: S) -> Self {
312 let waker_entry = scope.borrow().inner.state.waker_entry();
313 Self { scope, waker_entry }
314 }
315
316 pub fn abort(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
321 self.scope.borrow().abort_all_tasks();
322 self
323 }
324
325 pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
330 self.scope.borrow().cancel_all_tasks();
331 self
332 }
333}
334
335impl<S> Future for Join<S>
336where
337 S: Borrow<ScopeHandle>,
338{
339 type Output = ();
340 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
341 let this = self.project();
342 let mut state = Borrow::borrow(&*this.scope).lock();
343 if state.has_tasks() {
344 state.add_waker(this.waker_entry, cx.waker().clone());
345 Poll::Pending
346 } else {
347 state.mark_finished();
348 Poll::Ready(())
349 }
350 }
351}
352
/// Something that can be turned into a task on a scope.
///
/// The blanket impl in this file covers every `Send + 'static` future whose output is
/// `Send + 'static`.
pub trait Spawnable {
    /// The value the resulting task produces.
    type Output;

    /// Converts `self` into a task handle associated with `scope`.
    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
}
362
363impl<F: Future + Send + 'static> Spawnable for F
364where
365 F::Output: Send + 'static,
366{
367 type Output = F::Output;
368
369 fn into_task(self, scope: ScopeHandle) -> TaskHandle {
370 scope.new_task(None, self)
371 }
372}
373
/// A cheaply clonable, reference-counted handle to a scope.
///
/// Cloning bumps the `Arc`; it does not create a new scope. Dropping a handle does not abort
/// the scope's tasks (that behavior belongs to [`Scope`]).
#[derive(Clone)]
pub struct ScopeHandle {
    // Shared scope data; parents refer to children through `Weak` references to it.
    inner: Arc<ScopeInner>,
}
391
392impl ScopeHandle {
393 pub fn new_child(&self) -> Scope {
395 self.new_child_inner(String::new())
396 }
397
398 pub fn instrument_data(&self) -> Option<&(dyn Any + Send + Sync)> {
400 self.inner.instrument_data.as_deref()
401 }
402
403 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
405 self.new_child_inner(name.into())
406 }
407
408 fn new_child_inner(&self, name: String) -> Scope {
409 let mut state = self.lock();
410 let child = ScopeHandle {
411 inner: Arc::new(ScopeInner {
412 executor: self.inner.executor.clone(),
413 state: Condition::new(ScopeState::new_child(
414 self.clone(),
415 &state,
416 JoinResults::default().into(),
417 )),
418
419 instrument_data: self
420 .inner
421 .executor
422 .instrument
423 .as_ref()
424 .map(|value| value.scope_created(&name, Some(self))),
425 name,
426 }),
427 };
428 let weak = child.downgrade();
429 state.insert_child(weak);
430 Scope { inner: child }
431 }
432
433 pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
437 let task = future.into_task(self.clone());
438 let task_id = task.id();
439 self.insert_task(task, false);
440 JoinHandle::new(self.clone(), task_id)
441 }
442
443 pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
448 let task = self.new_local_task(None, future);
449 let id = task.id();
450 self.insert_task(task, false);
451 JoinHandle::new(self.clone(), id)
452 }
453
454 pub fn compute<T: Send + 'static>(
459 &self,
460 future: impl Spawnable<Output = T> + Send + 'static,
461 ) -> crate::Task<T> {
462 let task = future.into_task(self.clone());
463 let id = task.id();
464 self.insert_task(task, false);
465 JoinHandle::new(self.clone(), id).into()
466 }
467
468 pub fn compute_local<T: 'static>(
476 &self,
477 future: impl Future<Output = T> + 'static,
478 ) -> crate::Task<T> {
479 let task = self.new_local_task(None, future);
480 let id = task.id();
481 self.insert_task(task, false);
482 JoinHandle::new(self.clone(), id).into()
483 }
484
485 pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
486 ScopeHandle {
487 inner: Arc::new(ScopeInner {
488 state: Condition::new(ScopeState::new_root(JoinResults::default().into())),
489 name: "root".to_string(),
490 instrument_data: executor
491 .instrument
492 .as_ref()
493 .map(|value| value.scope_created("root", None)),
494 executor,
495 }),
496 }
497 }
498
499 pub fn close(&self) {
507 self.lock().close();
508 }
509
510 pub fn cancel(self) -> Join<Self> {
515 self.cancel_all_tasks();
516 Join::new(self)
517 }
518
519 pub fn abort(self) -> impl Future<Output = ()> {
524 self.abort_all_tasks();
525 Join::new(self)
526 }
527
528 #[must_use]
540 pub fn active_guard(&self) -> Option<ScopeActiveGuard> {
541 ScopeActiveGuard::new(self)
542 }
543
544 pub fn is_cancelled(&self) -> bool {
547 self.lock().status().is_cancelled()
548 }
549
550 pub async fn on_no_tasks(&self) {
557 self.inner
558 .state
559 .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
560 .await;
561 }
562
563 pub async fn on_no_tasks_and_guards(&self) {
566 self.inner
567 .state
568 .when(|state| {
569 if state.has_tasks() || state.guards() > 0 {
570 Poll::Pending
571 } else {
572 Poll::Ready(())
573 }
574 })
575 .await;
576 }
577
578 pub fn wake_all_with_active_guard(&self) {
580 self.lock().wake_all_with_active_guard();
581 }
582
583 pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
586 &self,
587 id: Option<usize>,
588 fut: Fut,
589 ) -> AtomicFutureHandle<'a>
590 where
591 Fut::Output: Send,
592 {
593 let id = id.unwrap_or_else(|| self.executor().next_task_id());
594 let mut task = AtomicFutureHandle::new(Some(self.clone()), id, fut);
595 if let Some(instrument) = &self.executor().instrument {
596 instrument.task_created(self, &mut task);
597 }
598 task
599 }
600
601 pub(crate) fn new_local_task<'a>(
604 &self,
605 id: Option<usize>,
606 fut: impl Future + 'a,
607 ) -> AtomicFutureHandle<'a> {
608 if !self.executor().is_local() {
610 panic!(
611 "Error: called `new_local_task` on multithreaded executor. \
612 Use `spawn` or a `LocalExecutor` instead."
613 );
614 }
615 assert_eq!(
616 self.executor().first_thread_id.get(),
617 Some(&std::thread::current().id()),
618 "Error: called `new_local_task` on a different thread to the executor",
619 );
620
621 let id = id.unwrap_or_else(|| self.executor().next_task_id());
622 unsafe {
625 let mut task = AtomicFutureHandle::new_local(Some(self.clone()), id, fut);
626 if let Some(instrument) = &self.executor().instrument {
627 instrument.task_created(self, &mut task);
628 }
629 task
630 }
631 }
632}
633
634impl fmt::Debug for ScopeHandle {
635 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
636 f.debug_struct("Scope").field("name", &self.inner.name).finish()
637 }
638}
639
/// A scope whose task results are delivered through a [`Stream`].
///
/// Tasks are added through the paired [`ScopeStreamHandle`]. Dropping the stream aborts every
/// task in the scope.
pub struct ScopeStream<R> {
    // The underlying scope; `ScopeStream` derefs to it.
    inner: ScopeHandle,
    // Result buffer and parked waker, shared with the scope's `ResultsStream`.
    stream: Arc<Mutex<ResultsStreamInner<R>>>,
}
653
654impl<R: Send + 'static> ScopeStream<R> {
655 pub fn new() -> (Self, ScopeStreamHandle<R>) {
664 Self::new_inner(String::new())
665 }
666
667 pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
676 Self::new_inner(name.into())
677 }
678
679 fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
680 let this = ScopeHandle::with_current(|handle| {
681 let mut state = handle.lock();
682 let stream = Arc::default();
683 let child = ScopeHandle {
684 inner: Arc::new(ScopeInner {
685 executor: handle.executor().clone(),
686 state: Condition::new(ScopeState::new_child(
687 handle.clone(),
688 &state,
689 Box::new(ResultsStream { inner: Arc::clone(&stream) }),
690 )),
691 instrument_data: handle
692 .executor()
693 .instrument
694 .as_ref()
695 .map(|value| value.scope_created(&name, Some(handle))),
696 name,
697 }),
698 };
699 let weak = child.downgrade();
700 state.insert_child(weak);
701 ScopeStream { inner: child, stream }
702 });
703 let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
704 (this, handle)
705 }
706}
707
708impl<R> Drop for ScopeStream<R> {
709 fn drop(&mut self) {
710 self.inner.abort_all_tasks();
719 }
720}
721
impl<R: Send + 'static> Stream for ScopeStream<R> {
    type Item = R;

    /// Yields a buffered result if there is one, ends the stream (`None`) when the scope has
    /// no tasks left, and otherwise parks the caller's waker and returns `Pending`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Fast path: only the result buffer's lock is needed.
        let mut stream_inner = self.stream.lock();
        match stream_inner.results.pop() {
            Some(result) => Poll::Ready(Some(result)),
            None => {
                // Lock ordering: a finishing task posts its result while the scope lock is
                // already held (the scope lock first, then the stream lock), so the stream
                // lock must be released before the scope lock is taken here.
                drop(stream_inner);
                let state = self.inner.lock();
                let mut stream_inner = self.stream.lock();
                // Re-check: a result may have been posted while no lock was held.
                match stream_inner.results.pop() {
                    Some(result) => Poll::Ready(Some(result)),
                    None => {
                        if state.has_tasks() {
                            // Registered while both locks are held, so a task finishing
                            // afterwards will find this waker.
                            stream_inner.waker = Some(cx.waker().clone());
                            Poll::Pending
                        } else {
                            Poll::Ready(None)
                        }
                    }
                }
            }
        }
    }
}
750
751impl<R> Deref for ScopeStream<R> {
752 type Target = ScopeHandle;
753 fn deref(&self) -> &Self::Target {
754 &self.inner
755 }
756}
757
758impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
759 fn borrow(&self) -> &ScopeHandle {
760 self
761 }
762}
763
764impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
765 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
766 let (stream, handle) = ScopeStream::new();
767 for fut in iter {
768 handle.push(fut);
769 }
770 stream.close();
771 stream
772 }
773}
774
/// Handle used to push tasks onto a [`ScopeStream`].
///
/// The `PhantomData<R>` only carries the result type; the handle holds a `ScopeHandle` to the
/// stream's scope.
#[derive(Clone)]
pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
777
778impl<R: Send> ScopeStreamHandle<R> {
779 pub fn push(&self, future: impl Spawnable<Output = R>) {
780 self.0.insert_task(future.into_task(self.0.clone()), true);
781 }
782}
783
/// A guard that counts towards a scope's `guards` total for as long as it is alive.
///
/// While a scope has guards, a cancellation request only moves it to the pending-cancellation
/// state; its tasks are aborted once the last guard is released. Cloning acquires another
/// guard and dropping releases one.
#[derive(Debug)]
#[must_use]
pub struct ScopeActiveGuard(ScopeHandle);
796
797impl Deref for ScopeActiveGuard {
798 type Target = ScopeHandle;
799 fn deref(&self) -> &Self::Target {
800 &self.0
801 }
802}
803
804impl Drop for ScopeActiveGuard {
805 fn drop(&mut self) {
806 let Self(scope) = self;
807 scope.release_cancel_guard();
808 }
809}
810
811impl Clone for ScopeActiveGuard {
812 fn clone(&self) -> Self {
813 self.0.lock().acquire_cancel_guard(1);
814 Self(self.0.clone())
815 }
816}
817
818impl ScopeActiveGuard {
819 pub fn as_handle(&self) -> &ScopeHandle {
821 &self.0
822 }
823
824 pub fn to_handle(&self) -> ScopeHandle {
826 self.0.clone()
827 }
828
829 pub async fn on_cancel(&self) {
835 self.0
836 .inner
837 .state
838 .when(|s| if s.status().is_cancelled() { Poll::Ready(()) } else { Poll::Pending })
839 .await
840 }
841
842 fn new(scope: &ScopeHandle) -> Option<Self> {
843 if scope.lock().acquire_cancel_guard_if_not_finished() {
844 Some(Self(scope.clone()))
845 } else {
846 None
847 }
848 }
849}
850
/// Error returned when a task being joined did not run to completion.
///
/// The private field keeps the type from being constructed outside this module.
pub struct JoinError {
    _phantom: PhantomData<()>,
}

impl fmt::Debug for JoinError {
    /// Writes the bare type name; there is no payload to show.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("JoinError")
    }
}

impl fmt::Display for JoinError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("JoinError: a task failed to execute to completion")
    }
}

impl error::Error for JoinError {}
869
/// A weak reference to a scope, used by a parent to track its children.
///
/// Hashing and equality are by pointer identity (see the `Hash` and `PartialEq` impls).
#[derive(Clone)]
struct WeakScopeHandle {
    inner: Weak<ScopeInner>,
}
879
880impl WeakScopeHandle {
881 pub fn upgrade(&self) -> Option<ScopeHandle> {
883 self.inner.upgrade().map(|inner| ScopeHandle { inner })
884 }
885}
886
887impl hash::Hash for WeakScopeHandle {
888 fn hash<H: hash::Hasher>(&self, state: &mut H) {
889 Weak::as_ptr(&self.inner).hash(state);
890 }
891}
892
893impl PartialEq for WeakScopeHandle {
894 fn eq(&self, other: &Self) -> bool {
895 Weak::ptr_eq(&self.inner, &other.inner)
896 }
897}
898
899impl Eq for WeakScopeHandle {
900 }
903
904mod state {
907 use super::*;
908
    /// The mutable state of a scope, protected by the scope's `Condition`.
    pub struct ScopeState {
        /// The parent scope, or `None` for a root scope.
        pub parent: Option<ScopeHandle>,
        // Weak references to the child scopes created from this scope.
        children: HashSet<WeakScopeHandle>,
        // Tasks currently owned by this scope (children's tasks are not included).
        all_tasks: HashSet<TaskHandle>,
        // Count of "subscopes" that currently have tasks: this scope itself (if it has tasks)
        // plus each child that has registered its first task with us. Non-zero means the
        // scope "has tasks".
        subscopes_with_tasks: u32,
        // Whether new tasks may still be inserted. Cleared by `close` and `mark_finished`.
        can_spawn: bool,
        // Number of outstanding cancel guards. While non-zero, a cancel request only marks
        // the scope as pending cancellation.
        guards: u32,
        // Where the scope is in its lifecycle.
        status: Status,
        /// Storage for the results of finished tasks; the concrete type decides how results
        /// are delivered (join handles or a stream).
        pub results: Box<dyn Results>,
    }
924
    /// Per-task slot kept by `JoinResults`.
    pub enum JoinResult {
        /// The task has not finished yet; holds the waker of whoever last polled for it.
        Waker(Waker),
        /// The task has finished; holds the task so that its result can be taken.
        Result(TaskHandle),
    }
929
930 #[repr(u8)] #[derive(Default, Debug, Clone, Copy)]
932 pub enum Status {
933 #[default]
934 Active,
936 PendingCancellation,
939 Finished,
942 }
943
944 impl Status {
945 pub fn is_cancelled(&self) -> bool {
947 match self {
948 Self::Active => false,
949 Self::PendingCancellation | Self::Finished => true,
950 }
951 }
952 }
953
954 impl ScopeState {
955 pub fn new_root(results: Box<impl Results>) -> Self {
956 Self {
957 parent: None,
958 children: Default::default(),
959 all_tasks: Default::default(),
960 subscopes_with_tasks: 0,
961 can_spawn: true,
962 guards: 0,
963 status: Default::default(),
964 results,
965 }
966 }
967
968 pub fn new_child(
969 parent_handle: ScopeHandle,
970 parent_state: &Self,
971 results: Box<impl Results>,
972 ) -> Self {
973 let (status, can_spawn) = match parent_state.status {
974 Status::Active => (Status::Active, parent_state.can_spawn),
975 Status::Finished | Status::PendingCancellation => (Status::Finished, false),
976 };
977 Self {
978 parent: Some(parent_handle),
979 children: Default::default(),
980 all_tasks: Default::default(),
981 subscopes_with_tasks: 0,
982 can_spawn,
983 guards: 0,
984 status,
985 results,
986 }
987 }
988 }
989
    impl ScopeState {
        /// Returns the tasks currently owned by this scope.
        pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
            &self.all_tasks
        }

        /// Tries to add `task` to this scope and, on success, wakes it.
        ///
        /// Returns `None` when the task was inserted. Returns the task back as `Some(task)`
        /// when it was refused: the scope cannot spawn, the results store does not accept
        /// ordinary (non-stream) tasks, or registering the first task with the ancestors
        /// failed.
        ///
        /// # Panics
        ///
        /// Panics if an equal task is already present in the set.
        pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
            if !self.can_spawn || (!for_stream && !self.results.can_spawn()) {
                return Some(task);
            }
            // The first task of a scope has to be announced to the ancestors, which may
            // refuse it.
            if self.all_tasks.is_empty() && !self.register_first_task() {
                return Some(task);
            }
            task.wake();
            assert!(self.all_tasks.insert(task));
            None
        }

        /// Returns the weak references to this scope's children.
        pub fn children(&self) -> &HashSet<WeakScopeHandle> {
            &self.children
        }

        /// Registers `child` as a child of this scope.
        pub fn insert_child(&mut self, child: WeakScopeHandle) {
            self.children.insert(child);
        }

        /// Removes the child identified by `child`.
        ///
        /// # Panics
        ///
        /// Panics if the child is not found while the set is non-empty. An empty set is
        /// tolerated (it is emptied by `set_closed_and_drain`).
        pub fn remove_child(&mut self, child: &PtrKey) {
            let found = self.children.remove(child);
            assert!(found || self.children.is_empty());
        }

        /// Returns the current lifecycle status.
        pub fn status(&self) -> Status {
            self.status
        }

        /// Returns the number of outstanding guards.
        pub fn guards(&self) -> u32 {
            self.guards
        }

        /// Prevents any further tasks from being inserted.
        pub fn close(&mut self) {
            self.can_spawn = false;
        }

        /// Closes the scope and moves it to `Status::Finished`.
        pub fn mark_finished(&mut self) {
            self.can_spawn = false;
            self.status = Status::Finished;
        }

        /// Returns `true` while this scope or any registered descendant has tasks.
        pub fn has_tasks(&self) -> bool {
            self.subscopes_with_tasks > 0
        }

        /// Calls `wake_with_active_guard` on every task and acquires one guard on this scope
        /// for each call that returned `true`.
        pub fn wake_all_with_active_guard(&mut self) {
            let mut count = 0;
            for task in &self.all_tasks {
                if task.wake_with_active_guard() {
                    count += 1;
                }
            }
            // All guards are acquired in one step after the loop.
            self.acquire_cancel_guard(count);
        }

        /// Aborts every task of this scope and marks the scope finished.
        ///
        /// Tasks stay in `all_tasks`. A task whose `abort` returns `true` is pushed onto its
        /// executor's ready queue.
        pub fn abort_tasks_and_mark_finished(&mut self) {
            for task in self.all_tasks() {
                if task.abort() {
                    task.scope().executor().ready_tasks.push(task.clone());
                }
            }
            self.mark_finished();
        }

        /// Moves the scope to `Status::PendingCancellation` and collects its parked wakers
        /// into `wakers`. The caller is responsible for waking them.
        pub fn wake_wakers_and_mark_pending(
            this: &mut ConditionGuard<'_, ScopeState>,
            wakers: &mut Vec<Waker>,
        ) {
            wakers.extend(this.drain_wakers());
            this.status = Status::PendingCancellation;
        }

        /// Records that this scope (or one more of its children) now has tasks.
        ///
        /// Returns `false`, without changing the count, if this scope or an ancestor that had
        /// to be consulted cannot spawn.
        #[must_use]
        fn register_first_task(&mut self) -> bool {
            if !self.can_spawn {
                return false;
            }
            let can_spawn = match &self.parent {
                Some(parent) => {
                    // A non-zero count means the parent has already been told about us, so
                    // only the 0 -> 1 transition recurses (locking the parent).
                    self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
                }
                None => true,
            };
            if can_spawn {
                self.subscopes_with_tasks += 1;
                // At most one count for this scope plus one per child.
                debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
            };
            can_spawn
        }

        /// Records that this scope (or one of its children) no longer has tasks.
        ///
        /// When the count reaches zero the change is propagated to the parent and this
        /// scope's parked wakers are moved into `wakers`. `num_wakers_hint` is the number of
        /// wakers the callers expect to add, and is used to reserve capacity in `wakers` once.
        fn on_last_task_removed(
            this: &mut ConditionGuard<'_, ScopeState>,
            num_wakers_hint: usize,
            wakers: &mut Vec<Waker>,
        ) {
            debug_assert!(this.subscopes_with_tasks > 0);
            this.subscopes_with_tasks -= 1;
            if this.subscopes_with_tasks > 0 {
                wakers.reserve(num_wakers_hint);
                return;
            }

            match &this.parent {
                Some(parent) => {
                    // Recurse first (locking the parent) so that the reservation happens at
                    // the top of the chain with the accumulated hint.
                    Self::on_last_task_removed(
                        &mut parent.lock(),
                        num_wakers_hint + this.waker_count(),
                        wakers,
                    );
                }
                None => wakers.reserve(num_wakers_hint),
            };
            wakers.extend(this.drain_wakers());
        }

        /// Acquires one guard unless the scope is already finished. Returns whether a guard
        /// was acquired.
        pub fn acquire_cancel_guard_if_not_finished(&mut self) -> bool {
            match self.status {
                Status::Active | Status::PendingCancellation => {
                    self.acquire_cancel_guard(1);
                    true
                }
                Status::Finished => false,
            }
        }

        /// Adds `count` guards to this scope.
        ///
        /// On the 0 -> non-zero transition a single guard is also acquired on the parent
        /// (which locks the parent), so a parent holds one guard per guarded child.
        pub fn acquire_cancel_guard(&mut self, count: u32) {
            if count == 0 {
                return;
            }
            if self.guards == 0
                && let Some(parent) = self.parent.as_ref()
            {
                parent.acquire_cancel_guard();
            }
            self.guards += count;
        }

        /// Releases one guard.
        ///
        /// When the last guard goes away, `on_zero_guards` runs (which may abort the tasks and
        /// releases the guard held on the parent) and this scope's parked wakers are moved
        /// into `wake_vec`. `waker_count` is the running number of wakers collected further
        /// down the chain, used only to size the reservation in `wake_vec`.
        ///
        /// # Panics
        ///
        /// Panics if no guard is currently held.
        pub fn release_cancel_guard(
            this: &mut ConditionGuard<'_, Self>,
            wake_vec: &mut WakeVec,
            mut waker_count: usize,
        ) {
            this.guards = this.guards.checked_sub(1).expect("released non-acquired guard");
            if this.guards == 0 {
                waker_count += this.waker_count();
                this.on_zero_guards(wake_vec, waker_count);
                wake_vec.0.extend(this.drain_wakers())
            } else {
                wake_vec.0.reserve_exact(waker_count);
            }
        }

        /// Runs when the guard count drops to zero.
        ///
        /// A scope pending cancellation has its tasks aborted and is marked finished; active
        /// and finished scopes need nothing. In every case the single guard this scope held on
        /// its parent is released (which locks the parent).
        fn on_zero_guards(&mut self, wake_vec: &mut WakeVec, waker_count: usize) {
            match self.status {
                Status::Active => {}
                Status::PendingCancellation => {
                    self.abort_tasks_and_mark_finished();
                }
                Status::Finished => {}
            }
            if let Some(parent) = &self.parent {
                ScopeState::release_cancel_guard(&mut parent.lock(), wake_vec, waker_count);
            }
        }
    }
1177
1178 #[derive(Default)]
1179 pub struct WakeVec(Vec<Waker>);
1180
1181 impl Drop for WakeVec {
1182 fn drop(&mut self) {
1183 for waker in self.0.drain(..) {
1184 waker.wake();
1185 }
1186 }
1187 }
1188
    /// A locked scope state paired with the wakers collected while it is locked.
    ///
    /// Fields are dropped in declaration order, so the lock guard is released before the
    /// `WakeVec` is dropped and wakes its wakers.
    pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
1191
1192 impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
1193 fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
1194 Self(value, WakeVec::default())
1195 }
1196 }
1197
    impl ScopeWaker<'_> {
        /// Removes and returns the task with the given `id`, if the scope owns it.
        ///
        /// If that was the scope's last task, the "no tasks" bookkeeping runs and any wakers
        /// it produces are queued on this `ScopeWaker`.
        pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
            let task = self.all_tasks.take(&id);
            if task.is_some() {
                self.on_task_removed(0);
            }
            task
        }

        /// Handles the completion of the task with the given `id`.
        ///
        /// The task is removed from the scope. Unless it was detached, it is handed to the
        /// results store, and the waker the store returns (if any) is queued.
        pub fn task_did_finish(&mut self, id: usize) {
            if let Some(task) = self.all_tasks.take(&id) {
                // Hint of 1: at most one more waker (from the results store) is added below.
                self.on_task_removed(1);
                if !task.is_detached() {
                    let maybe_waker = self.results.task_did_finish(task);
                    self.1.0.extend(maybe_waker);
                }
            }
        }

        /// Closes the scope and takes everything out of it.
        ///
        /// Returns the scope's tasks, its stored results (type-erased) and a draining iterator
        /// over its children, so that the caller can dispose of them as it sees fit.
        pub fn set_closed_and_drain(
            &mut self,
        ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
            self.close();
            let all_tasks = std::mem::take(&mut self.all_tasks);
            let results = self.results.take();
            // Only scopes that actually had tasks were counted in `subscopes_with_tasks`.
            if !all_tasks.is_empty() {
                self.on_task_removed(0)
            }
            let children = self.children.drain();
            (all_tasks, results, children)
        }

        /// Runs the "last task removed" bookkeeping if the scope has no tasks left.
        fn on_task_removed(&mut self, num_wakers_hint: usize) {
            if self.all_tasks.is_empty() {
                ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1.0)
            }
        }

        /// Marks the scope as pending cancellation and queues its parked wakers.
        pub fn wake_wakers_and_mark_pending(&mut self) {
            let Self(state, wakers) = self;
            ScopeState::wake_wakers_and_mark_pending(state, &mut wakers.0)
        }
    }
1241
1242 impl<'a> Deref for ScopeWaker<'a> {
1243 type Target = ConditionGuard<'a, ScopeState>;
1244
1245 fn deref(&self) -> &Self::Target {
1246 &self.0
1247 }
1248 }
1249
1250 impl DerefMut for ScopeWaker<'_> {
1251 fn deref_mut(&mut self) -> &mut Self::Target {
1252 &mut self.0
1253 }
1254 }
1255}
1256
/// The shared, reference-counted data behind a [`ScopeHandle`].
struct ScopeInner {
    // The executor that runs this scope's tasks.
    executor: Arc<Executor>,
    // The scope's mutable state together with the wakers parked on it.
    state: Condition<ScopeState>,
    // Name given at creation ("root" for the root scope, empty for unnamed scopes).
    name: String,
    // Value returned by the executor's instrumentation hook when the scope was created.
    instrument_data: Option<Box<dyn Any + Send + Sync>>,
}
1263
/// When the last handle goes away, detach the scope from its parent: give back the guard it
/// may hold on the parent and remove it from the parent's set of children.
impl Drop for ScopeInner {
    fn drop(&mut self) {
        // SAFETY: `PtrKey` is a zero-sized type, so this reference never reads any memory; it
        // is used purely for its address. The address is that of this `ScopeInner`, which is
        // what `Borrow<PtrKey> for WeakScopeHandle` yields for the parent's entry for this
        // scope, and `PtrKey` hashes/compares by address just like `WeakScopeHandle` does.
        let key = unsafe { &*(self as *const _ as *const PtrKey) };
        let state = self.state.lock();
        if let Some(parent) = &state.parent {
            // Declared before `parent_state` so that the parent's lock is released before
            // the collected wakers are woken.
            let mut wake_vec = WakeVec::default();
            let mut parent_state = parent.lock();
            // A scope with outstanding guards holds exactly one guard on its parent.
            if state.guards() != 0 {
                ScopeState::release_cancel_guard(&mut parent_state, &mut wake_vec, 0);
            }
            parent_state.remove_child(key);
        }
    }
}
1282
impl ScopeHandle {
    /// Calls `f` with the current scope: the scope of the currently running task if there is
    /// one, otherwise the global scope of the local executor.
    fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
        super::common::TaskHandle::with_current(|task| match task {
            Some(task) => f(task.scope()),
            None => f(EHandle::local().global_scope()),
        })
    }

    /// Locks the scope's state.
    fn lock(&self) -> ConditionGuard<'_, ScopeState> {
        self.inner.state.lock()
    }

    /// Creates a weak reference to this scope.
    fn downgrade(&self) -> WeakScopeHandle {
        WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
    }

    /// Returns the executor this scope belongs to.
    #[inline(always)]
    pub(crate) fn executor(&self) -> &Arc<Executor> {
        &self.inner.executor
    }

    /// Detaches the task with the given id: marks it detached if it is still running and
    /// discards any entry for it in the results store.
    pub(crate) fn detach(&self, task_id: usize) {
        // The block scopes the lock: a task returned by the results store is dropped only
        // after the lock has been released.
        let _maybe_task = {
            let mut state = self.lock();
            if let Some(task) = state.all_tasks().get(&task_id) {
                task.detach();
            }
            state.results.detach(task_id)
        };
    }

    /// Aborts the task with the given id and returns its result if it had already produced
    /// one.
    ///
    /// # Safety
    ///
    /// The result is extracted with `take_result::<R>()`.
    /// NOTE(review): `R` presumably has to be the task's actual output type — confirm against
    /// `take_result`'s contract.
    pub(crate) unsafe fn abort_task<R>(&self, task_id: usize) -> Option<R> {
        let mut state = self.lock();
        // The task has already finished: take its result outside the lock.
        if let Some(task) = state.results.detach(task_id) {
            drop(state);
            return unsafe { task.take_result() };
        }
        state.all_tasks().get(&task_id).and_then(|task| {
            // `abort` returning `true` means the task has to be queued to run.
            if task.abort() {
                self.inner.executor.ready_tasks.push(task.clone());
            }
            unsafe { task.take_result() }
        })
    }

    /// Aborts the task with the given id and detaches it, discarding any result.
    pub(crate) fn abort_and_detach(&self, task_id: usize) {
        // The block scopes the lock: the returned tasks are dropped after the `ScopeWaker`
        // (and with it the lock) has gone away.
        let _tasks = {
            let mut state = ScopeWaker::from(self.lock());
            let maybe_task1 = state.results.detach(task_id);
            let mut maybe_task2 = None;
            if let Some(task) = state.all_tasks().get(&task_id) {
                match task.abort_and_detach() {
                    // Nothing more to run: the task can be removed from the scope right away.
                    AbortAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
                    AbortAndDetachResult::AddToRunQueue => {
                        self.inner.executor.ready_tasks.push(task.clone());
                    }
                    AbortAndDetachResult::Pending => {}
                }
            }
            (maybe_task1, maybe_task2)
        };
    }

    /// Polls for the result of the task with the given id.
    ///
    /// If the task finished without leaving a result, this returns `Pending` and will never
    /// resolve.
    ///
    /// # Safety
    ///
    /// The result is extracted with `take_result::<R>()`.
    /// NOTE(review): `R` presumably has to be the task's actual output type — confirm.
    pub(crate) unsafe fn poll_join_result<R>(
        &self,
        task_id: usize,
        cx: &mut Context<'_>,
    ) -> Poll<R> {
        let task = ready!(self.lock().results.poll_join_result(task_id, cx));
        match unsafe { task.take_result() } {
            Some(result) => Poll::Ready(result),
            None => {
                // No result is available and the entry has been consumed above, so nothing
                // will wake this poller again.
                Poll::Pending
            }
        }
    }

    /// Like `poll_join_result`, but reports an aborted task as `Err(JoinError)` instead of
    /// staying pending.
    ///
    /// # Safety
    ///
    /// Same requirement as `poll_join_result`.
    pub(crate) unsafe fn try_poll_join_result<R>(
        &self,
        task_id: usize,
        cx: &mut Context<'_>,
    ) -> Poll<Result<R, JoinError>> {
        let task = ready!(self.lock().results.poll_join_result(task_id, cx));
        match unsafe { task.take_result() } {
            Some(result) => Poll::Ready(Ok(result)),
            None => {
                if task.is_aborted() {
                    Poll::Ready(Err(JoinError { _phantom: PhantomData }))
                } else {
                    Poll::Pending
                }
            }
        }
    }

    /// Polls for the task with the given id to finish and yields whatever result it left,
    /// which is `None` when no result is available.
    ///
    /// # Safety
    ///
    /// Same requirement as `poll_join_result`.
    pub(crate) unsafe fn poll_aborted<R>(
        &self,
        task_id: usize,
        cx: &mut Context<'_>,
    ) -> Poll<Option<R>> {
        let task = self.lock().results.poll_join_result(task_id, cx);
        task.map(|task| unsafe { task.take_result() })
    }

    /// Inserts `task` into the scope. Returns `true` if it was accepted; a refused task is
    /// dropped after the lock has been released.
    pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
        let returned_task = self.lock().insert_task(task, for_stream);
        returned_task.is_none()
    }

    /// Removes the task with the given id from the scope and drops its future without the
    /// checks a regular drop performs.
    ///
    /// # Safety
    ///
    /// NOTE(review): the caller must uphold whatever `drop_future_unchecked` requires; that
    /// contract is not visible here.
    pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
        let mut state = ScopeWaker::from(self.lock());
        let task = state.take_task(task_id);
        if let Some(task) = task {
            unsafe { task.drop_future_unchecked() };
        }
    }

    /// Notifies the scope that the task with the given id has finished.
    pub(super) fn task_did_finish(&self, id: usize) {
        let mut state = ScopeWaker::from(self.lock());
        state.task_did_finish(id);
    }

    /// Visits this scope and its descendants, one scope at a time, using an explicit stack.
    ///
    /// `callback` runs with the visited scope locked and returns whether that scope's
    /// children should be visited as well. Only one scope lock is taken by this function at a
    /// time; wakers queued by the callback are woken once that scope's lock is released.
    fn visit_scopes_locked(&self, callback: impl Fn(&mut ScopeWaker<'_>) -> bool) {
        let mut scopes = vec![self.clone()];
        while let Some(scope) = scopes.pop() {
            let mut scope_waker = ScopeWaker::from(scope.lock());
            if callback(&mut scope_waker) {
                // Children that are already gone fail to upgrade and are skipped.
                scopes.extend(scope_waker.children().iter().filter_map(|child| child.upgrade()));
            }
        }
    }

    /// Acquires one guard on this scope.
    fn acquire_cancel_guard(&self) {
        self.lock().acquire_cancel_guard(1)
    }

    /// Releases one guard on this scope.
    pub(crate) fn release_cancel_guard(&self) {
        // The lock guard is a temporary that is dropped at the end of the statement below,
        // so the collected wakers are woken (when `wake_vec` drops) without the lock held.
        let mut wake_vec = WakeVec::default();
        ScopeState::release_cancel_guard(&mut self.lock(), &mut wake_vec, 0);
    }

    /// Requests cancellation of this scope and all of its descendants.
    ///
    /// Active scopes without guards are aborted immediately; active scopes with guards are
    /// marked pending cancellation and their parked wakers are woken. Finished scopes are
    /// left alone and their children are not visited.
    fn cancel_all_tasks(&self) {
        self.visit_scopes_locked(|state| {
            match state.status() {
                Status::Active => {
                    if state.guards() == 0 {
                        state.abort_tasks_and_mark_finished();
                    } else {
                        state.wake_wakers_and_mark_pending();
                    }
                    true
                }
                Status::PendingCancellation => {
                    // Nothing to do for this scope, but its children are still visited.
                    true
                }
                Status::Finished => {
                    false
                }
            }
        });
    }

    /// Aborts the tasks of this scope and all of its descendants, regardless of guards.
    /// Finished scopes are left alone and their children are not visited.
    fn abort_all_tasks(&self) {
        self.visit_scopes_locked(|state| match state.status() {
            Status::Active | Status::PendingCancellation => {
                state.abort_tasks_and_mark_finished();
                true
            }
            Status::Finished => false,
        });
    }

    /// Closes this scope and all of its descendants and drops their tasks and stored results.
    ///
    /// # Panics
    ///
    /// Panics if dropping any task fails.
    pub(super) fn drop_all_tasks(&self) {
        let mut scopes = vec![self.clone()];
        while let Some(scope) = scopes.pop() {
            // Empty the scope while it is locked; the tasks and results themselves are
            // dropped below, once the lock has been released.
            let (tasks, join_results) = {
                let mut state = ScopeWaker::from(scope.lock());
                let (tasks, join_results, children) = state.set_closed_and_drain();
                scopes.extend(children.filter_map(|child| child.upgrade()));
                (tasks, join_results)
            };
            for task in tasks {
                task.try_drop().expect("Expected drop to succeed");
            }
            std::mem::drop(join_results);
        }
    }
}
1516
/// Zero-sized key used to look up a `WeakScopeHandle` in a hash set by address only.
///
/// A `&PtrKey` is never read through; only its address matters (see the `Hash` and
/// `PartialEq` impls).
#[repr(transparent)]
struct PtrKey;
1520
/// Lets a `HashSet<WeakScopeHandle>` be queried with a `&PtrKey` carrying the same address.
impl Borrow<PtrKey> for WeakScopeHandle {
    fn borrow(&self) -> &PtrKey {
        // SAFETY: `PtrKey` is zero-sized, so the resulting reference never reads any memory;
        // it only carries the address held by the `Weak`.
        unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
    }
}
1527
1528impl PartialEq for PtrKey {
1529 fn eq(&self, other: &Self) -> bool {
1530 std::ptr::eq(self, other)
1531 }
1532}
1533
1534impl Eq for PtrKey {}
1535
1536impl hash::Hash for PtrKey {
1537 fn hash<H: hash::Hasher>(&self, state: &mut H) {
1538 (self as *const PtrKey).hash(state);
1539 }
1540}
1541
/// Results store for ordinary scopes: one [`JoinResult`] slot per task id, holding either the
/// waker of whoever is waiting for the task or the finished task itself.
#[derive(Default)]
struct JoinResults(HashMap<usize, JoinResult>);
1544
/// Storage for the results of a scope's finished tasks.
trait Results: Send + Sync + 'static {
    /// Whether ordinary (non-stream) tasks may be spawned on a scope using this store.
    fn can_spawn(&self) -> bool;

    /// Polls for the task with the given id to have finished, returning the finished task.
    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;

    /// Records that `task` has finished. Returns a waker that should be woken, if any.
    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;

    /// Forgets the task with the given id, returning it if it had already finished.
    fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;

    /// Takes the stored results out of the store, type-erased.
    fn take(&mut self) -> Box<dyn Any>;

    /// Used only for testing. Returns true if there are no results registered.
    #[cfg(test)]
    fn is_empty(&self) -> bool;
}
1565
1566impl Results for JoinResults {
1567 fn can_spawn(&self) -> bool {
1568 true
1569 }
1570
1571 fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1572 match self.0.entry(task_id) {
1573 Entry::Occupied(mut o) => match o.get_mut() {
1574 JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1575 JoinResult::Result(_) => {
1576 let JoinResult::Result(task) = o.remove() else { unreachable!() };
1577 return Poll::Ready(task);
1578 }
1579 },
1580 Entry::Vacant(v) => {
1581 v.insert(JoinResult::Waker(cx.waker().clone()));
1582 }
1583 }
1584 Poll::Pending
1585 }
1586
1587 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1588 match self.0.entry(task.id()) {
1589 Entry::Occupied(mut o) => {
1590 let JoinResult::Waker(waker) =
1591 std::mem::replace(o.get_mut(), JoinResult::Result(task))
1592 else {
1593 unreachable!()
1597 };
1598 Some(waker)
1599 }
1600 Entry::Vacant(v) => {
1601 v.insert(JoinResult::Result(task));
1602 None
1603 }
1604 }
1605 }
1606
1607 fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1608 match self.0.remove(&task_id) {
1609 Some(JoinResult::Result(task)) => Some(task),
1610 _ => None,
1611 }
1612 }
1613
1614 fn take(&mut self) -> Box<dyn Any> {
1615 Box::new(Self(std::mem::take(&mut self.0)))
1616 }
1617
1618 #[cfg(test)]
1619 fn is_empty(&self) -> bool {
1620 self.0.is_empty()
1621 }
1622}
1623
/// Results store for stream scopes: finished results are appended to a buffer that is shared
/// with the owning `ScopeStream`.
#[derive(Default)]
struct ResultsStream<R> {
    // Shared with `ScopeStream::stream`.
    inner: Arc<Mutex<ResultsStreamInner<R>>>,
}
1628
1629struct ResultsStreamInner<R> {
1630 results: Vec<R>,
1631 waker: Option<Waker>,
1632}
1633
1634impl<R> Default for ResultsStreamInner<R> {
1635 fn default() -> Self {
1636 Self { results: Vec::new(), waker: None }
1637 }
1638}
1639
1640impl<R: Send + 'static> Results for ResultsStream<R> {
1641 fn can_spawn(&self) -> bool {
1642 false
1643 }
1644
1645 fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1646 Poll::Pending
1647 }
1648
1649 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1650 let mut inner = self.inner.lock();
1651 inner.results.extend(unsafe { task.take_result() });
1654 inner.waker.take()
1655 }
1656
1657 fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1658 None
1659 }
1660
1661 fn take(&mut self) -> Box<dyn Any> {
1662 Box::new(std::mem::take(&mut self.inner.lock().results))
1663 }
1664
1665 #[cfg(test)]
1666 fn is_empty(&self) -> bool {
1667 false
1668 }
1669}
1670
1671#[cfg(test)]
1672mod tests {
1673 use super::super::super::task::CancelableJoinHandle;
1677 use super::*;
1678 use crate::{
1679 EHandle, LocalExecutor, SendExecutorBuilder, SpawnableFuture, Task, TestExecutor, Timer,
1680 yield_now,
1681 };
1682 use fuchsia_sync::{Condvar, Mutex};
1683 use futures::channel::mpsc;
1684 use futures::{FutureExt, StreamExt};
1685 use std::future::pending;
1686 use std::pin::{Pin, pin};
1687 use std::sync::Arc;
1688 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1689 use std::task::{Context, Poll};
1690 use std::time::Duration;
1691
1692 #[derive(Default)]
1693 struct RemoteControlFuture(Mutex<RCFState>);
1694 #[derive(Default)]
1695 struct RCFState {
1696 resolved: bool,
1697 waker: Option<Waker>,
1698 }
1699
1700 impl Future for &RemoteControlFuture {
1701 type Output = ();
1702 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1703 let mut this = self.0.lock();
1704 if this.resolved {
1705 Poll::Ready(())
1706 } else {
1707 this.waker.replace(cx.waker().clone());
1708 Poll::Pending
1709 }
1710 }
1711 }
1712
1713 impl RemoteControlFuture {
1714 fn new() -> Arc<Self> {
1715 Arc::new(Default::default())
1716 }
1717
1718 fn resolve(&self) {
1719 let mut this = self.0.lock();
1720 this.resolved = true;
1721 if let Some(waker) = this.waker.take() {
1722 waker.wake();
1723 }
1724 }
1725
1726 fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> + use<> {
1727 let this = Arc::clone(self);
1728 #[allow(clippy::redundant_async_block)] async move {
1730 (&*this).await
1731 }
1732 }
1733 }
1734
/// A value computed on the executor's global scope is delivered.
#[test]
fn compute_works_on_root_scope() {
    let mut exec = TestExecutor::new();
    let root = exec.global_scope();
    let mut computed = pin!(root.compute(async { 1 }));
    let outcome = exec.run_until_stalled(&mut computed);
    assert_eq!(outcome, Poll::Ready(1));
}
1742
/// A value computed on a freshly created, named child scope is delivered.
#[test]
fn compute_works_on_new_child() {
    let mut exec = TestExecutor::new();
    let child = exec.global_scope().new_child_with_name("compute_works_on_new_child");
    let mut computed = pin!(child.compute(async { 1 }));
    let outcome = exec.run_until_stalled(&mut computed);
    assert_eq!(outcome, Poll::Ready(1));
}
1750
/// After the owning scope is dropped, a task spawned on it never yields a value.
#[test]
fn scope_drop_cancels_tasks() {
    let mut exec = TestExecutor::new();
    let child = exec.global_scope().new_child_with_name("scope_drop_cancels_tasks");
    let mut computed = pin!(child.compute(async { 1 }));
    drop(child);
    let outcome = exec.run_until_stalled(&mut computed);
    assert_eq!(outcome, Poll::Pending);
}
1759
/// Once a scope has been cancelled, work submitted through a surviving handle
/// never produces a value.
#[test]
fn tasks_do_not_spawn_on_cancelled_scopes() {
    let mut exec = TestExecutor::new();
    let owned =
        exec.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
    let survivor = owned.to_handle();
    let mut cancelling = pin!(owned.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelling), Poll::Ready(()));
    let mut computed = pin!(survivor.compute(async { 1 }));
    assert_eq!(exec.run_until_stalled(&mut computed), Poll::Pending);
}
1771
/// Once an empty scope has been closed, work submitted through a surviving
/// handle never produces a value.
///
/// This uses `close()` (not `cancel()`), so the test covers closing rather than
/// repeating `tasks_do_not_spawn_on_cancelled_scopes`.
#[test]
fn tasks_do_not_spawn_on_closed_empty_scopes() {
    let mut executor = TestExecutor::new();
    let scope =
        executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
    let handle = scope.to_handle();
    // The scope has no tasks, so closing it completes immediately.
    let mut close = pin!(scope.close());
    assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
    let mut task = pin!(handle.compute(async { 1 }));
    assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
}
1783
/// While a close is still waiting on a running task, newly submitted work never
/// produces a value.
#[test]
fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
    let mut exec = TestExecutor::new();
    let owned = exec.global_scope().new_child();
    let survivor = owned.to_handle();
    survivor.spawn(pending());
    let mut closing = pin!(owned.close());
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Pending);
    let mut computed = pin!(survivor.compute(async { 1 }));
    assert_eq!(exec.run_until_stalled(&mut computed), Poll::Pending);
}
1795
/// Work submitted to a child scope and to a grandchild scope both complete.
#[test]
fn spawn_works_on_child_and_grandchild() {
    let mut exec = TestExecutor::new();
    let top = exec.global_scope().new_child();
    let middle = top.new_child();
    let bottom = middle.new_child();
    let mut middle_task = pin!(middle.compute(async { 1 }));
    let mut bottom_task = pin!(bottom.compute(async { 1 }));
    assert_eq!(exec.run_until_stalled(&mut middle_task), Poll::Ready(1));
    assert_eq!(exec.run_until_stalled(&mut bottom_task), Poll::Ready(1));
}
1807
/// Dropping the top scope keeps tasks in its child and grandchild from ever
/// yielding a value.
#[test]
fn spawn_drop_cancels_child_and_grandchild_tasks() {
    let mut exec = TestExecutor::new();
    let top = exec.global_scope().new_child();
    let middle = top.new_child();
    let bottom = middle.new_child();
    let mut middle_task = pin!(middle.compute(async { 1 }));
    let mut bottom_task = pin!(bottom.compute(async { 1 }));
    drop(top);
    assert_eq!(exec.run_until_stalled(&mut middle_task), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut bottom_task), Poll::Pending);
}
1820
/// After aborting both a still-pending task and an already finished one, the
/// scope tracks no tasks and holds no leftover results.
#[test]
fn completed_tasks_are_cleaned_up_after_cancel() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();

    let never_finishes = parent.spawn(pending::<()>());
    let finishes_at_once = parent.spawn(async {});
    assert_eq!(exec.run_until_stalled(&mut pending::<()>()), Poll::Pending);
    // Only the pending task is still tracked after the first run.
    assert_eq!(parent.lock().all_tasks().len(), 1);

    // Aborting the pending task cannot finish without running the executor;
    // aborting the completed one yields its result right away.
    assert_eq!(never_finishes.abort().now_or_never(), None);
    assert_eq!(finishes_at_once.abort().now_or_never(), Some(Some(())));

    assert_eq!(exec.run_until_stalled(&mut pending::<()>()), Poll::Pending);
    assert_eq!(parent.lock().all_tasks().len(), 0);
    assert!(parent.lock().results.is_empty());
}
1840
/// Joining a scope that has no tasks completes immediately.
#[test]
fn join_emtpy_scope() {
    let mut exec = TestExecutor::new();
    let empty = exec.global_scope().new_child();
    assert_eq!(exec.run_until_stalled(&mut pin!(empty.join())), Poll::Ready(()));
}
1847
/// A task handle can still read its result after the scope's join has started
/// being polled.
#[test]
fn task_handle_preserves_access_to_result_after_join_begins() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let mut computed = parent.compute(async { 1 });
    parent.spawn(async {});
    let blocker = parent.spawn(pending::<()>());
    let mut joining = pin!(parent.join().fuse());
    // Poll the join once; its outcome at this point is deliberately ignored.
    let _ = exec.run_until_stalled(&mut joining);
    assert_eq!(exec.run_until_stalled(&mut computed), Poll::Ready(1));
    drop(blocker.abort());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
}
1863
/// A join stays pending while one task is outstanding and completes once that
/// task has been aborted.
#[test]
fn join_blocks_until_task_is_cancelled() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let still_running = parent.spawn(pending::<()>());
    let aborted_first = parent.spawn(pending::<()>());
    assert_eq!(
        exec.run_until_stalled(&mut pin!(aborted_first.abort())),
        Poll::Ready(None)
    );
    let mut joining = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    assert_eq!(
        exec.run_until_stalled(&mut pin!(still_running.abort())),
        Poll::Ready(None)
    );
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
}
1884
/// With a never-ending detached task, the join stays pending but cancelling
/// through the join completes.
#[test]
fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    parent.spawn(pending::<()>());
    let mut joining = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    let mut cancelling = pin!(joining.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelling), Poll::Ready(()));
}
1896
/// With a never-ending detached task, the close stays pending but cancelling
/// through the close completes.
#[test]
fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    parent.spawn(pending::<()>());
    let mut closing = pin!(parent.close());
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Pending);
    let mut cancelling = pin!(closing.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelling), Poll::Ready(()));
}
1908
/// The join is pending until the spawned task is resolved; afterwards both the
/// join and the task handle are ready.
#[test]
fn join_scope_blocks_until_spawned_task_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let gate = RemoteControlFuture::new();
    let mut gated_task = parent.spawn(gate.as_future());
    let mut joining = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    gate.resolve();
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut gated_task), Poll::Ready(()));
}
1921
/// The close is pending until the spawned task is resolved; afterwards both the
/// close and the task handle are ready.
#[test]
fn close_scope_blocks_until_spawned_task_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let gate = RemoteControlFuture::new();
    let mut gated_task = parent.spawn(gate.as_future());
    let mut closing = pin!(parent.close());
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Pending);
    gate.resolve();
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut gated_task), Poll::Ready(()));
}
1934
/// Detaching a child with a running task does not release the parent's join;
/// the join completes only after that task is resolved.
#[test]
fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    let gate = RemoteControlFuture::new();
    child.spawn(gate.as_future());
    let mut parent_join = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
    child.detach();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    gate.resolve();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
}
1950
/// A task spawned from inside a detached scope that was itself created inside a
/// task still holds up the outer join until it is resolved.
#[test]
fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
    let mut exec = TestExecutor::new();
    let outer = exec.global_scope().new_child();
    let gate = RemoteControlFuture::new();
    {
        let gate = gate.clone();
        outer.spawn(async move {
            let nested = Scope::new_with_name("child");
            nested.spawn(async move {
                Scope::current().spawn(gate.as_future());
            });
            nested.detach();
        });
    }
    let mut outer_join = pin!(outer.join());
    assert_eq!(exec.run_until_stalled(&mut outer_join), Poll::Pending);
    gate.resolve();
    assert_eq!(exec.run_until_stalled(&mut outer_join), Poll::Ready(()));
}
1971
/// Detaching a child whose task never finishes leaves the parent's join pending.
#[test]
fn join_scope_blocks_when_blocked_child_is_detached() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    child.spawn(pending());
    let mut parent_join = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut pin!(child.on_no_tasks())), Poll::Pending);
    child.detach();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
}
1984
/// The parent's join completes once the blocked child's join future goes away.
#[test]
fn join_scope_completes_when_blocked_child_is_cancelled() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    child.spawn(pending());
    let mut parent_join = pin!(parent.join());
    {
        let mut child_join = pin!(child.join());
        assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
        assert_eq!(exec.run_until_stalled(&mut child_join), Poll::Pending);
        // Leaving this block drops the child's join future.
    }
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
}
1999
/// A handle to a detached scope can still run work to completion.
#[test]
fn detached_scope_can_spawn() {
    let mut exec = TestExecutor::new();
    let owned = exec.global_scope().new_child();
    let survivor = owned.to_handle();
    owned.detach();
    assert_eq!(exec.run_until_stalled(&mut survivor.compute(async { 1 })), Poll::Ready(1));
}
2008
/// Work submitted through a handle after its scope was dropped never yields a value.
#[test]
fn dropped_scope_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let owned = exec.global_scope().new_child();
    let survivor = owned.to_handle();
    drop(owned);
    assert_eq!(exec.run_until_stalled(&mut survivor.compute(async { 1 })), Poll::Pending);
}
2017
/// Same as `dropped_scope_cannot_spawn`, but the scope had a running task when
/// it was dropped.
#[test]
fn dropped_scope_with_running_task_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let owned = exec.global_scope().new_child();
    let survivor = owned.to_handle();
    let _still_running = survivor.spawn(pending::<()>());
    drop(owned);
    assert_eq!(exec.run_until_stalled(&mut survivor.compute(async { 1 })), Poll::Pending);
}
2027
/// After a join has completed, work submitted through a handle never yields a value.
#[test]
fn joined_scope_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let owned = exec.global_scope().new_child();
    let survivor = owned.to_handle();
    let mut joining = pin!(owned.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut survivor.compute(async { 1 })), Poll::Pending);
}
2037
/// While a join is still pending on a running task, new work is accepted and completes.
#[test]
fn joining_scope_with_running_task_can_spawn() {
    let mut exec = TestExecutor::new();
    let owned = exec.global_scope().new_child();
    let survivor = owned.to_handle();
    let _still_running = survivor.spawn(pending::<()>());
    let mut joining = pin!(owned.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut survivor.compute(async { 1 })), Poll::Ready(1));
}
2048
/// Once the parent has been joined, neither a child created before the join, a
/// child created after it, nor a grandchild created after it yields a value.
#[test]
fn joined_scope_child_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let owned = exec.global_scope().new_child();
    let survivor = owned.to_handle();
    let early_child = owned.new_child();
    // Before the join the child works normally.
    assert_eq!(
        exec.run_until_stalled(&mut early_child.compute(async { 1 })),
        Poll::Ready(1)
    );
    let mut joining = pin!(owned.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
    let late_child = survivor.new_child();
    let late_grandchild = early_child.new_child();
    assert_eq!(
        exec.run_until_stalled(&mut early_child.compute(async { 1 })),
        Poll::Pending
    );
    assert_eq!(
        exec.run_until_stalled(&mut late_child.compute(async { 1 })),
        Poll::Pending
    );
    assert_eq!(
        exec.run_until_stalled(&mut late_grandchild.compute(async { 1 })),
        Poll::Pending
    );
}
2076
/// Once the parent has been closed, neither a child created before the close, a
/// child created after it, nor a grandchild created after it yields a value.
#[test]
fn closed_scope_child_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let owned = exec.global_scope().new_child();
    let survivor = owned.to_handle();
    let early_child = owned.new_child();
    // Before the close the child works normally.
    assert_eq!(
        exec.run_until_stalled(&mut early_child.compute(async { 1 })),
        Poll::Ready(1)
    );
    let mut closing = pin!(owned.close());
    assert_eq!(exec.run_until_stalled(&mut closing), Poll::Ready(()));
    let late_child = survivor.new_child();
    let late_grandchild = early_child.new_child();
    assert_eq!(
        exec.run_until_stalled(&mut early_child.compute(async { 1 })),
        Poll::Pending
    );
    assert_eq!(
        exec.run_until_stalled(&mut late_child.compute(async { 1 })),
        Poll::Pending
    );
    assert_eq!(
        exec.run_until_stalled(&mut late_grandchild.compute(async { 1 })),
        Poll::Pending
    );
}
2104
/// Joining the child and then the parent both complete.
#[test]
fn can_join_child_first() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    assert_eq!(exec.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
    assert_eq!(exec.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut pin!(parent.join())), Poll::Ready(()));
}
2114
/// Joining the parent and then the child both complete.
#[test]
fn can_join_parent_first() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    assert_eq!(exec.run_until_stalled(&mut child.compute(async { 1 })), Poll::Ready(1));
    assert_eq!(exec.run_until_stalled(&mut pin!(parent.join())), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut pin!(child.join())), Poll::Ready(()));
}
2124
/// A task running in the parent may await the child's join; the parent's own
/// join completes after the child's task is resolved.
#[test]
fn task_in_parent_scope_can_join_child() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let child = parent.new_child();
    let gate = RemoteControlFuture::new();
    child.spawn(gate.as_future());
    parent.spawn(async move { child.join().await });
    let mut parent_join = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
    gate.resolve();
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
}
2138
/// Holding a handle to a task does not keep the join from completing, and the
/// handle still yields the task's result afterwards.
#[test]
fn join_completes_while_completed_task_handle_is_held() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let mut computed = parent.compute(async { 1 });
    parent.spawn(async {});
    let mut joining = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut computed), Poll::Ready(1));
}
2149
/// Cancelling completes even though a task inside the scope owns a handle to it.
#[test]
fn cancel_completes_while_task_holds_handle() {
    let mut exec = TestExecutor::new();
    let owned = exec.global_scope().new_child();
    let captured = owned.to_handle();
    let mut looping_task = owned.compute(async move {
        loop {
            pending::<()>().await;
            captured.spawn(async {});
        }
    });

    let mut joining = pin!(owned.join());
    assert_eq!(exec.run_until_stalled(&mut joining), Poll::Pending);

    let mut cancelling = pin!(joining.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelling), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut looping_task), Poll::Pending);
}
2170
/// A task that cancels its own scope through a handle empties the scope, and
/// the code after its `cancel().await` is never reached.
#[test]
fn cancel_from_handle_inside_task() {
    let mut exec = TestExecutor::new();
    let owned = exec.global_scope().new_child();
    {
        owned.spawn(pending::<()>());

        let mut emptied = pin!(owned.on_no_tasks());
        assert_eq!(exec.run_until_stalled(&mut emptied), Poll::Pending);

        let captured = owned.to_handle();
        owned.spawn(async move {
            captured.cancel().await;
            panic!("cancel() should never complete");
        });

        assert_eq!(exec.run_until_stalled(&mut emptied), Poll::Ready(()));
    }
    assert_eq!(owned.join().now_or_never(), Some(()));
}
2192
/// A task spawned from a plain `std::thread` runs when the executor is next driven.
#[test]
fn can_spawn_from_non_executor_thread() {
    let mut exec = TestExecutor::new();
    let root = exec.global_scope().clone();
    let flag = Arc::new(AtomicBool::new(false));
    let flag_in_task = flag.clone();
    let _ = std::thread::spawn(move || {
        root.spawn(async move {
            flag_in_task.store(true, Ordering::Relaxed);
        })
    })
    .join();
    let _ = exec.run_until_stalled(&mut pending::<()>());
    assert!(flag.load(Ordering::Relaxed));
}
2208
/// Exercises joins across a small tree: `a` is the parent of `b`, and `b` is the
/// parent of both `c` and `d`. Tasks run in `a`, `c` and `d`.
#[test]
fn scope_tree() {
    let mut exec = TestExecutor::new();
    let a = exec.global_scope().new_child();
    let b = a.new_child();
    let c = b.new_child();
    let d = b.new_child();
    let gate_a = RemoteControlFuture::new();
    let gate_c = RemoteControlFuture::new();
    let gate_d = RemoteControlFuture::new();
    a.spawn(gate_a.as_future());
    c.spawn(gate_c.as_future());
    d.spawn(gate_d.as_future());
    let mut join_a = pin!(a.join());
    let mut join_b = pin!(b.join());
    let mut join_d = pin!(d.join());
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_d), Poll::Pending);
    // Finishing d's task releases only d.
    gate_d.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_d), Poll::Ready(()));
    // Finishing c's task releases b, but a still has its own task.
    gate_c.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut join_b), Poll::Ready(()));
    gate_a.resolve();
    assert_eq!(exec.run_until_stalled(&mut join_a), Poll::Ready(()));
    let mut join_c = pin!(c.join());
    assert_eq!(exec.run_until_stalled(&mut join_c), Poll::Ready(()));
}
2245
/// Checks that `wake_all_with_active_guard` causes both tasks to be polled
/// again on a two-thread executor, including right before a cancel.
///
/// A single `AtomicU64` carries two values: the low 32 bits count polls and the
/// high 32 bits hold the poll count the driver is waiting for.
#[test]
fn wake_all_with_active_guard_on_send_executor() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    let scope = exec.root_scope().new_child();

    let (notify_tx, mut notify_rx) = mpsc::unbounded();
    let counter = Arc::new(AtomicU64::new(0));

    /// Future that never completes; each poll bumps the shared counter and
    /// signals the channel when the awaited count is reached.
    struct PollCounter(Arc<AtomicU64>, mpsc::UnboundedSender<()>);

    impl Future for PollCounter {
        type Output = ();
        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            let before = self.0.fetch_add(1, Ordering::Relaxed);
            let awaited = before >> 32;
            let polls_now = (before + 1) & (u32::MAX as u64);
            if awaited == polls_now {
                let _ = self.1.unbounded_send(());
            }
            Poll::Pending
        }
    }

    scope.spawn(PollCounter(counter.clone(), notify_tx.clone()));
    scope.spawn(PollCounter(counter.clone(), notify_tx.clone()));

    exec.run(async move {
        // Publishes the awaited count in the high bits, waits for the signal
        // unless the count was already reached, then clears the high bits.
        let mut wait_for_poll_count = async |count| {
            let before = counter.fetch_or(count << 32, Ordering::Relaxed);
            if (before & (u32::MAX as u64)) != count {
                notify_rx.next().await.unwrap();
            }
            counter.fetch_and(u32::MAX as u64, Ordering::Relaxed);
        };

        // Both tasks get their initial poll.
        wait_for_poll_count(2).await;

        let mut polls_so_far = 2;
        for _ in 0..2 {
            scope.wake_all_with_active_guard();

            wait_for_poll_count(polls_so_far + 2).await;
            polls_so_far += 2;
        }

        scope.wake_all_with_active_guard();
        let cancelled = scope.cancel();

        wait_for_poll_count(polls_so_far + 2).await;

        cancelled.await;
    });
}
2300
/// Repeatedly races a short task against `on_no_tasks()` on a two-thread
/// executor, with random sub-10µs sleeps on both sides.
#[test]
fn on_no_tasks_race() {
    fn jitter() {
        let micros = rand::random_range(0..10);
        std::thread::sleep(std::time::Duration::from_micros(micros));
    }
    for _ in 0..2000 {
        let mut exec = SendExecutorBuilder::new().num_threads(2).build();
        let child = exec.root_scope().new_child();
        child.spawn(async {
            jitter();
        });
        exec.run(async move {
            jitter();
            child.on_no_tasks().await;
        });
    }
}
2318
/// A detached task keeps running to completion and leaves no stored result in
/// the root scope.
#[test]
fn test_detach() {
    let mut exec = LocalExecutor::default();
    exec.run_singlethreaded(async {
        let ticks = Arc::new(AtomicU32::new(0));

        {
            let ticks = ticks.clone();
            Task::spawn(async move {
                for _ in 0..5 {
                    yield_now().await;
                    ticks.fetch_add(1, Ordering::Relaxed);
                }
            })
            .detach();
        }

        // Spin until the detached task has made all five increments.
        while ticks.load(Ordering::Relaxed) != 5 {
            yield_now().await;
        }
    });

    assert!(exec.ehandle.root_scope.lock().results.is_empty());
}
2343
/// Covers three ways a task ends: dropped `Task`, aborted pending task, and
/// aborted finished task. An `Arc` strong count shows when each task's future
/// has been destroyed.
#[test]
fn test_cancel() {
    let mut exec = LocalExecutor::default();
    exec.run_singlethreaded(async {
        let token = Arc::new(());
        {
            // Dropping the `Task` itself.
            let held = token.clone();
            drop(Task::spawn(async move {
                let _held = held;
                let _: () = std::future::pending().await;
            }));
        }

        while Arc::strong_count(&token) != 1 {
            yield_now().await;
        }

        let pending_task = {
            let held = token.clone();
            Task::spawn(async move {
                let _held = held;
                let _: () = std::future::pending().await;
            })
        };

        // Aborting a task that never finished yields no result.
        assert_eq!(pending_task.abort().await, None);
        while Arc::strong_count(&token) != 1 {
            yield_now().await;
        }

        let finished_task = {
            let held = token.clone();
            Task::spawn(async move {
                let _held = held;
            })
        };

        // Wait for the task to run to completion before aborting it.
        while Arc::strong_count(&token) != 1 {
            yield_now().await;
        }

        assert_eq!(finished_task.abort().await, Some(()));
    });

    assert!(exec.ehandle.root_scope.lock().results.is_empty());
}
2394
/// Aborting a task that is in the middle of running waits for it and returns
/// its result.
///
/// The shared integer steps through phases: 1 = task is running, 2 = driver saw
/// it, 3 = task finished its work.
#[test]
fn test_cancel_waits() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    let shared = Arc::new((Mutex::new(0), Condvar::new()));
    let task = {
        let shared = shared.clone();
        exec.root_scope().compute(async move {
            *shared.0.lock() = 1;
            shared.1.notify_all();
            // Block this executor thread until the driver moves past phase 1.
            shared.1.wait_while(&mut shared.0.lock(), |phase| *phase == 1);
            std::thread::sleep(std::time::Duration::from_millis(10));
            *shared.0.lock() = 3;
            "foo"
        })
    };
    exec.run(async move {
        shared.1.wait_while(&mut shared.0.lock(), |phase| {
            if *phase != 1 {
                return true;
            }
            *phase = 2;
            false
        });
        shared.1.notify_all();
        assert_eq!(task.abort().await, Some("foo"));
        assert_eq!(*shared.0.lock(), 3);
    });
}
2427
2428 fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
2429 let mut executor = SendExecutorBuilder::new().num_threads(2).build();
2430 let running = Arc::new((Mutex::new(false), Condvar::new()));
2431 let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
2432 let task = {
2433 let running = running.clone();
2434 let can_quit = can_quit.clone();
2435 executor.root_scope().compute(async move {
2436 *running.0.lock() = true;
2437 running.1.notify_all();
2438 {
2439 let mut guard = can_quit.0.lock();
2440 while !*guard {
2441 can_quit.1.wait(&mut guard);
2442 }
2443 }
2444 *running.0.lock() = false;
2445 })
2446 };
2447 executor.run(async move {
2448 {
2449 let mut guard = running.0.lock();
2450 while !*guard {
2451 running.1.wait(&mut guard);
2452 }
2453 }
2454
2455 callback(task);
2456
2457 *can_quit.0.lock() = true;
2458 can_quit.1.notify_all();
2459
2460 let ehandle = EHandle::local();
2461 let scope = ehandle.global_scope();
2462
2463 while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
2465 Timer::new(std::time::Duration::from_millis(1)).await;
2466 }
2467
2468 assert!(!*running.0.lock());
2469 });
2470 }
2471
/// Clean-up still happens when an abort future is polled once and then dropped.
#[test]
fn test_dropped_cancel_cleans_up() {
    test_clean_up(|task| {
        let aborting = std::pin::pin!(task.abort());
        let noop = futures::task::noop_waker();
        let mut cx = Context::from_waker(&noop);
        assert!(aborting.poll(&mut cx).is_pending());
    });
}
2480
/// Clean-up still happens when the `Task` is simply dropped.
#[test]
fn test_dropped_task_cleans_up() {
    test_clean_up(|running| {
        drop(running);
    });
}
2487
/// Clean-up still happens when the `Task` is detached.
#[test]
fn test_detach_cleans_up() {
    test_clean_up(|running| {
        running.detach();
    });
}
2494
/// Two pushed futures both show up in the collected stream after `close()`.
#[test]
fn test_scope_stream() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        let (stream, pusher) = ScopeStream::new();
        pusher.push(async { 1 });
        pusher.push(async { 2 });
        stream.close();
        let collected: HashSet<_> = stream.collect().await;
        assert_eq!(collected, HashSet::from_iter([1, 2]));
    });
}
2507
/// Results that only become available after a timer still reach the collector.
#[test]
fn test_scope_stream_wakes_properly() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        let (stream, pusher) = ScopeStream::new();
        pusher.push(async {
            Timer::new(Duration::from_millis(10)).await;
            1
        });
        pusher.push(async {
            Timer::new(Duration::from_millis(10)).await;
            2
        });
        stream.close();
        let collected: HashSet<_> = stream.collect().await;
        assert_eq!(collected, HashSet::from_iter([1, 2]));
    });
}
2526
/// Only pushed futures contribute items: the output of a task started with
/// `compute` does not appear in the collected stream.
#[test]
fn test_scope_stream_drops_spawned_tasks() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        let (stream, pusher) = ScopeStream::new();
        pusher.push(async { 1 });
        let _side_task = stream.compute(async { "foo" });
        stream.close();
        let collected: HashSet<_> = stream.collect().await;
        assert_eq!(collected, HashSet::from_iter([1]));
    });
}
2539
/// Futures pushed from inside other pushed futures all deliver their items.
#[test]
fn test_nested_scope_stream() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        let (mut stream, handle) = ScopeStream::new();
        handle.clone().push(async move {
            handle.clone().push(async move {
                handle.clone().push(async move { 3 });
                2
            });
            1
        });
        let mut seen = HashSet::default();
        while let Some(value) = stream.next().await {
            seen.insert(value);
            // Close once every expected item has arrived so the loop can end.
            if seen.len() == 3 {
                stream.close();
            }
        }
        assert_eq!(seen, HashSet::from_iter([1, 2, 3]));
    });
}
2562
/// Dropping the stream destroys both pending futures: once every sender they
/// own is gone, the receiver observes the end of the channel.
#[test]
fn test_dropping_scope_stream_cancels_all_tasks() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        let (stream, pusher) = ScopeStream::new();
        let (first_tx, mut rx) = mpsc::unbounded::<()>();
        let second_tx = first_tx.clone();
        pusher.push(async move {
            let _first_tx = first_tx;
            let () = pending().await;
        });
        pusher.push(async move {
            let _second_tx = second_tx;
            let () = pending().await;
        });
        drop(stream);

        assert_eq!(rx.next().await, None);
    });
}
2584
/// A `ScopeStream` can be built with `collect()` from plain futures as well as
/// from `SpawnableFuture`s.
#[test]
fn test_scope_stream_collect() {
    let mut exec = SendExecutorBuilder::new().num_threads(2).build();
    exec.run(async move {
        let from_futures: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
        assert_eq!(from_futures.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));

        let from_spawnables: ScopeStream<_> =
            (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
        assert_eq!(from_spawnables.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
    });
}
2597
/// Sets the shared flag when dropped.
struct DropSignal(Arc<AtomicBool>);

impl Drop for DropSignal {
    fn drop(&mut self) {
        let Self(flag) = self;
        flag.store(true, Ordering::SeqCst);
    }
}

/// Observes whether the paired [`DropSignal`] has been dropped.
struct DropChecker(Arc<AtomicBool>);

impl DropChecker {
    /// Creates a checker and the signal it watches; both share one flag that
    /// starts out `false`.
    fn new() -> (Self, DropSignal) {
        let flag = Arc::new(AtomicBool::new(false));
        let signal = DropSignal(Arc::clone(&flag));
        (Self(flag), signal)
    }

    /// Returns `true` once the paired signal has been dropped.
    fn is_dropped(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}
2618
/// While the parent's cancel is held open by an active guard, a child created
/// afterwards drops spawned futures immediately and cannot hand out a guard.
#[test]
fn child_finished_when_parent_pending() {
    let mut exec = LocalExecutor::default();
    exec.run_singlethreaded(async {
        let parent = Scope::new();
        let _guard = parent.active_guard().expect("acquire guard");
        let cancelling = parent.to_handle().cancel();
        let child = parent.new_child();
        let (checker, signal) = DropChecker::new();
        child.spawn(async move {
            let _signal = signal;
            futures::future::pending::<()>().await
        });
        assert!(checker.is_dropped());
        assert!(child.active_guard().is_none());
        cancelling.await;
    });
}
2637
/// With an active guard held, futures spawned after `close()` and after
/// `cancel()` are both dropped right away.
#[test]
fn guarded_scopes_observe_closed() {
    let mut exec = LocalExecutor::default();
    exec.run_singlethreaded(async {
        let owned = Scope::new();
        let handle = owned.to_handle();
        let _guard = owned.active_guard().expect("acquire guard");
        handle.close();
        let (after_close, signal) = DropChecker::new();
        handle.spawn(async move {
            let _signal = signal;
            futures::future::pending::<()>().await
        });
        assert!(after_close.is_dropped());
        let (after_cancel, signal) = DropChecker::new();
        let cancelling = handle.clone().cancel();
        handle.spawn(async move {
            let _signal = signal;
            futures::future::pending::<()>().await
        });
        assert!(after_cancel.is_dropped());
        owned.join().await;
        cancelling.await;
    });
}
2663
/// A guard taken on a child keeps the parent's `cancel()` future pending;
/// the future resolves once that guard is dropped.
#[test]
fn child_guard_holds_parent_cancellation() {
    let mut executor = TestExecutor::new();
    let parent = executor.global_scope().new_child();
    let child = parent.new_child();
    let child_guard = child.active_guard().expect("acquire guard");
    parent.spawn(futures::future::pending());

    let mut cancel_fut = pin!(parent.cancel());
    let while_guarded = executor.run_until_stalled(&mut cancel_fut);
    assert_eq!(while_guarded, Poll::Pending);

    drop(child_guard);
    let after_release = executor.run_until_stalled(&mut cancel_fut);
    assert_eq!(after_release, Poll::Ready(()));
}
2676
/// Tasks waiting on `on_cancel()` of a guard (spawned on the guard's own
/// scope and on a sibling scope) keep the parent's join pending until
/// cancellation is requested on the parent; afterwards the join resolves.
#[test]
fn active_guard_on_cancel() {
    let mut executor = TestExecutor::new();
    let parent = executor.global_scope().new_child();
    let guarded_child = parent.new_child();
    let sibling_child = parent.new_child();

    // `guard` itself stays alive until the end of the test; only the clones
    // move into the spawned tasks.
    let guard = guarded_child.active_guard().expect("acquire guard");
    let same_scope_guard = guard.clone();
    let other_scope_guard = guard.clone();
    guarded_child.spawn(async move { same_scope_guard.on_cancel().await });
    sibling_child.spawn(async move {
        other_scope_guard.on_cancel().await;
    });

    let parent_handle = parent.to_handle();
    let mut join_fut = pin!(parent.join());
    assert_eq!(executor.run_until_stalled(&mut join_fut), Poll::Pending);

    // Request cancellation and immediately discard the returned future.
    let discarded: Join<_> = parent_handle.cancel();
    drop(discarded);
    assert_eq!(executor.run_until_stalled(&mut join_fut), Poll::Ready(()));
}
2698
/// While a child guard is held, `cancel()` stays pending and the scope's
/// tasks survive; calling `abort()` on that pending future drops the tasks
/// and lets it complete.
#[test]
fn abort_join() {
    let mut executor = TestExecutor::new();
    let parent = executor.global_scope().new_child();
    let child = parent.new_child();
    let _child_guard = child.active_guard().expect("acquire guard");

    let (first_checker, first_signal) = DropChecker::new();
    parent.spawn(async move {
        let _held = first_signal;
        futures::future::pending::<()>().await
    });
    let (second_checker, second_signal) = DropChecker::new();
    parent.spawn(async move {
        let _held = second_signal;
        futures::future::pending::<()>().await
    });

    // Cancellation alone is held off by the guard.
    let mut cancel_fut = pin!(parent.cancel());
    assert_eq!(executor.run_until_stalled(&mut cancel_fut), Poll::Pending);
    assert!(!first_checker.is_dropped());
    assert!(!second_checker.is_dropped());

    // Aborting goes through even though the guard is still alive.
    let mut abort_fut = cancel_fut.abort();
    assert_eq!(executor.run_until_stalled(&mut abort_fut), Poll::Ready(()));
    assert!(first_checker.is_dropped());
    assert!(second_checker.is_dropped());
}
2727
/// When only the parent holds a guard, cancelling the parent drops the
/// child's task right away while the parent's own task survives; the
/// cancel future resolves after the guard is released.
#[test]
fn child_without_guard_aborts_immediately_on_cancel() {
    let mut executor = TestExecutor::new();
    let parent = executor.global_scope().new_child();
    let child = parent.new_child();
    let parent_guard = parent.active_guard().expect("acquire guard");

    let (parent_task_checker, parent_signal) = DropChecker::new();
    parent.spawn(async move {
        let _held = parent_signal;
        futures::future::pending::<()>().await
    });
    let (child_task_checker, child_signal) = DropChecker::new();
    child.spawn(async move {
        let _held = child_signal;
        futures::future::pending::<()>().await
    });

    let mut cancel_fut = pin!(parent.cancel());
    assert_eq!(executor.run_until_stalled(&mut cancel_fut), Poll::Pending);
    // Only the unguarded child's task is gone at this point.
    assert!(!parent_task_checker.is_dropped());
    assert!(child_task_checker.is_dropped());

    drop(parent_guard);
    assert_eq!(executor.run_until_stalled(&mut cancel_fut), Poll::Ready(()));
    assert!(child_task_checker.is_dropped());
}
2755
/// Awaiting a plain task handle after its scope has been dropped leaves the
/// awaiting future pending.
#[test]
fn await_canceled_task_pends_forever() {
    let mut executor = TestExecutor::new();
    let scope = executor.global_scope().new_child();

    let join_handle = scope.spawn(pending::<()>());
    let mut awaiting = pin!(async move {
        // Dropping the scope happens inside the future, before the await.
        drop(scope);
        join_handle.await;
    });

    let outcome = executor.run_until_stalled(&mut awaiting);
    assert_eq!(outcome, Poll::Pending,);
}
2768
/// Awaiting a `CancelableJoinHandle` after its scope has been dropped
/// completes instead of pending; the awaited result is discarded here.
#[test]
fn await_canceled_abortable_task_finishes_with_error() {
    let mut executor = TestExecutor::new();
    let scope = executor.global_scope().new_child();

    let cancelable = CancelableJoinHandle::from(scope.spawn(pending::<()>()));
    let mut awaiting = pin!(async move {
        // Dropping the scope happens inside the future, before the await.
        drop(scope);
        let _ = cancelable.await;
    });

    let outcome = executor.run_until_stalled(&mut awaiting);
    assert_eq!(outcome, Poll::Ready(()),);
}
2781}