1use super::super::task::JoinHandle;
6use super::atomic_future::{AtomicFutureHandle, CancelAndDetachResult};
7use super::common::{Executor, TaskHandle};
8use crate::condition::{Condition, ConditionGuard, WakerEntry};
9use crate::EHandle;
10use fuchsia_sync::Mutex;
11use futures::Stream;
12use pin_project_lite::pin_project;
13use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
14use state::{JoinResult, ScopeState, ScopeWaker, Status};
15use std::any::Any;
16use std::borrow::Borrow;
17use std::collections::hash_map::Entry;
18use std::collections::hash_set;
19use std::future::{Future, IntoFuture};
20use std::marker::PhantomData;
21use std::mem::{self, ManuallyDrop};
22use std::ops::{Deref, DerefMut};
23use std::pin::Pin;
24use std::sync::{Arc, Weak};
25use std::task::{ready, Context, Poll, Waker};
26use std::{fmt, hash};
27
/// Owning handle to a scope of tasks.
///
/// Dropping a `Scope` cancels the tasks in it (see the `Drop` impl in this file); use `join`,
/// `close`, `cancel` or `detach` to dispose of it deliberately.
#[must_use = "Scopes should be explicitly awaited or cancelled"]
#[derive(Debug)]
pub struct Scope {
    // Shared handle onto the scope; `Scope` layers drop-time cancellation on top of it.
    inner: ScopeHandle,
}
77
78impl Scope {
79 pub fn new() -> Scope {
88 ScopeHandle::with_current(|handle| handle.new_child())
89 }
90
91 pub fn new_with_name(name: impl Into<String>) -> Scope {
100 ScopeHandle::with_current(|handle| handle.new_child_with_name(name.into()))
101 }
102
103 pub fn current() -> ScopeHandle {
111 ScopeHandle::with_current(|handle| handle.clone())
112 }
113
114 pub fn global() -> ScopeHandle {
131 EHandle::local().global_scope().clone()
132 }
133
134 pub fn new_child(&self) -> Scope {
136 self.inner.new_child()
137 }
138
139 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
141 self.inner.new_child_with_name(name.into())
142 }
143
144 pub fn name(&self) -> &str {
146 &self.inner.inner.name
147 }
148
149 pub fn to_handle(&self) -> ScopeHandle {
157 self.inner.clone()
158 }
159
160 pub fn as_handle(&self) -> &ScopeHandle {
167 &self.inner
168 }
169
170 pub fn join(self) -> Join {
179 Join::new(self)
180 }
181
182 pub fn close(self) -> Join {
185 self.inner.close();
186 Join::new(self)
187 }
188
189 pub fn cancel(self) -> impl Future<Output = ()> {
203 self.inner.cancel_all_tasks();
204 Join::new(self)
205 }
206
207 pub fn detach(self) {
213 let this = ManuallyDrop::new(self);
216 mem::drop(unsafe { std::ptr::read(&this.inner) });
219 }
220}
221
222impl Drop for Scope {
225 fn drop(&mut self) {
226 self.inner.cancel_all_tasks();
235 }
236}
237
238impl IntoFuture for Scope {
239 type Output = ();
240 type IntoFuture = Join;
241 fn into_future(self) -> Self::IntoFuture {
242 self.join()
243 }
244}
245
246impl Deref for Scope {
247 type Target = ScopeHandle;
248 fn deref(&self) -> &Self::Target {
249 &self.inner
250 }
251}
252
253impl Borrow<ScopeHandle> for Scope {
254 fn borrow(&self) -> &ScopeHandle {
255 &*self
256 }
257}
258
pin_project! {
    /// Future returned when joining a scope.
    ///
    /// Polling it returns `Pending` (after registering the caller's waker) while the scope
    /// reports tasks, and `Ready(())` once it does not. The joined scope `S` is kept alive
    /// inside this value.
    pub struct Join<S = Scope> {
        scope: S,
        #[pin]
        waker_entry: WakerEntry<ScopeState>,
    }
}
277
278impl<S> Join<S> {
279 fn new(scope: S) -> Self {
280 Self { scope, waker_entry: WakerEntry::new() }
281 }
282}
283
284impl Join {
285 pub fn cancel(self: Pin<&mut Self>) -> impl Future<Output = ()> + '_ {
290 self.scope.inner.cancel_all_tasks();
291 self
292 }
293}
294
295impl<S> Future for Join<S>
296where
297 S: Borrow<ScopeHandle>,
298{
299 type Output = ();
300 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
301 let this = self.project();
302 let mut state = Borrow::borrow(&*this.scope).lock();
303 if state.has_tasks() {
304 state.add_waker(this.waker_entry, cx.waker().clone());
305 Poll::Pending
306 } else {
307 state.mark_finished();
308 Poll::Ready(())
309 }
310 }
311}
312
/// Something that can be converted into a task belonging to a scope.
pub trait Spawnable {
    /// The value the resulting task produces.
    type Output;

    /// Consumes `self` and builds the task handle, associating it with `scope`.
    fn into_task(self, scope: ScopeHandle) -> TaskHandle;
}
322
323impl<F: Future + Send + 'static> Spawnable for F
324where
325 F::Output: Send + 'static,
326{
327 type Output = F::Output;
328
329 fn into_task(self, scope: ScopeHandle) -> TaskHandle {
330 scope.new_task(None, self)
331 }
332}
333
/// Reference-counted handle to a scope.
///
/// Cloning only bumps the `Arc` count; unlike `Scope`, this type has no `Drop` impl, so
/// dropping a handle does not cancel anything.
#[derive(Clone)]
pub struct ScopeHandle {
    // Shared scope data (executor, state, name).
    inner: Arc<ScopeInner>,
}
351
352impl ScopeHandle {
353 pub fn new_child(&self) -> Scope {
355 self.new_child_inner(String::new())
356 }
357
358 pub fn new_child_with_name(&self, name: impl Into<String>) -> Scope {
360 self.new_child_inner(name.into())
361 }
362
363 fn new_child_inner(&self, name: String) -> Scope {
364 let mut state = self.lock();
365 let child = ScopeHandle {
366 inner: Arc::new(ScopeInner {
367 executor: self.inner.executor.clone(),
368 state: Condition::new(ScopeState::new(
369 Some(self.clone()),
370 state.status(),
371 JoinResults::default().into(),
372 )),
373 name,
374 }),
375 };
376 let weak = child.downgrade();
377 state.insert_child(weak);
378 Scope { inner: child }
379 }
380
381 pub fn spawn(&self, future: impl Spawnable<Output = ()>) -> JoinHandle<()> {
385 let task = future.into_task(self.clone());
386 let task_id = task.id();
387 self.insert_task(task, false);
388 JoinHandle::new(self.clone(), task_id)
389 }
390
391 pub fn spawn_local(&self, future: impl Future<Output = ()> + 'static) -> JoinHandle<()> {
396 let task = self.new_local_task(None, future);
397 let id = task.id();
398 self.insert_task(task, false);
399 JoinHandle::new(self.clone(), id)
400 }
401
402 pub fn compute<T: Send + 'static>(
407 &self,
408 future: impl Spawnable<Output = T> + Send + 'static,
409 ) -> crate::Task<T> {
410 let task = future.into_task(self.clone());
411 let id = task.id();
412 self.insert_task(task, false);
413 JoinHandle::new(self.clone(), id).into()
414 }
415
416 pub fn compute_local<T: 'static>(
424 &self,
425 future: impl Future<Output = T> + 'static,
426 ) -> crate::Task<T> {
427 let task = self.new_local_task(None, future);
428 let id = task.id();
429 self.insert_task(task, false);
430 JoinHandle::new(self.clone(), id).into()
431 }
432
433 pub(super) fn root(executor: Arc<Executor>) -> ScopeHandle {
434 ScopeHandle {
435 inner: Arc::new(ScopeInner {
436 executor,
437 state: Condition::new(ScopeState::new(
438 None,
439 Status::default(),
440 JoinResults::default().into(),
441 )),
442 name: "root".to_string(),
443 }),
444 }
445 }
446
447 pub fn close(&self) {
455 self.lock().close();
456 }
457
458 pub fn cancel(self) -> impl Future<Output = ()> {
463 self.cancel_all_tasks();
464 Join::new(self)
465 }
466
467 pub async fn on_no_tasks(&self) {
474 self.inner
475 .state
476 .when(|state| if state.has_tasks() { Poll::Pending } else { Poll::Ready(()) })
477 .await;
478 }
479
480 pub fn wake_all(&self) {
482 self.lock().wake_all();
483 }
484
485 pub(crate) fn new_task<'a, Fut: Future + Send + 'a>(
488 &self,
489 id: Option<usize>,
490 fut: Fut,
491 ) -> AtomicFutureHandle<'a>
492 where
493 Fut::Output: Send,
494 {
495 AtomicFutureHandle::new(
496 Some(self.clone()),
497 id.unwrap_or_else(|| self.executor().next_task_id()),
498 fut,
499 )
500 }
501
502 pub(crate) fn new_local_task<'a>(
505 &self,
506 id: Option<usize>,
507 fut: impl Future + 'a,
508 ) -> AtomicFutureHandle<'a> {
509 if !self.executor().is_local() {
511 panic!(
512 "Error: called `new_local_task` on multithreaded executor. \
513 Use `spawn` or a `LocalExecutor` instead."
514 );
515 }
516
517 unsafe {
520 AtomicFutureHandle::new_local(
521 Some(self.clone()),
522 id.unwrap_or_else(|| self.executor().next_task_id()),
523 fut,
524 )
525 }
526 }
527}
528
529impl fmt::Debug for ScopeHandle {
530 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531 f.debug_struct("Scope").field("name", &self.inner.name).finish()
532 }
533}
534
/// A scope whose task results are delivered through a `Stream` of `R`.
///
/// Dropping it cancels the tasks in the scope (see the `Drop` impl in this file).
pub struct ScopeStream<R> {
    // The scope the tasks run in.
    inner: ScopeHandle,
    // Buffer of finished results plus the stream's waker, shared with the scope's
    // `ResultsStream`.
    stream: Arc<Mutex<ResultsStreamInner<R>>>,
}
548
549impl<R: Send + 'static> ScopeStream<R> {
550 pub fn new() -> (Self, ScopeStreamHandle<R>) {
559 Self::new_inner(String::new())
560 }
561
562 pub fn new_with_name(name: impl Into<String>) -> (Self, ScopeStreamHandle<R>) {
571 Self::new_inner(name.into())
572 }
573
574 fn new_inner(name: String) -> (Self, ScopeStreamHandle<R>) {
575 let this = ScopeHandle::with_current(|handle| {
576 let mut state = handle.lock();
577 let stream = Arc::default();
578 let child = ScopeHandle {
579 inner: Arc::new(ScopeInner {
580 executor: handle.executor().clone(),
581 state: Condition::new(ScopeState::new(
582 Some(handle.clone()),
583 state.status(),
584 Box::new(ResultsStream { inner: Arc::clone(&stream) }),
585 )),
586 name,
587 }),
588 };
589 let weak = child.downgrade();
590 state.insert_child(weak);
591 ScopeStream { inner: child, stream }
592 });
593 let handle = ScopeStreamHandle(this.inner.clone(), PhantomData);
594 (this, handle)
595 }
596}
597
598impl<R> Drop for ScopeStream<R> {
599 fn drop(&mut self) {
600 self.inner.cancel_all_tasks();
609 }
610}
611
612impl<R: Send + 'static> Stream for ScopeStream<R> {
613 type Item = R;
614
615 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
616 let mut stream_inner = self.stream.lock();
617 match stream_inner.results.pop() {
618 Some(result) => Poll::Ready(Some(result)),
619 None => {
620 drop(stream_inner);
623 let state = self.inner.lock();
624 let mut stream_inner = self.stream.lock();
625 match stream_inner.results.pop() {
626 Some(result) => Poll::Ready(Some(result)),
627 None => {
628 if state.has_tasks() {
629 stream_inner.waker = Some(cx.waker().clone());
630 Poll::Pending
631 } else {
632 Poll::Ready(None)
633 }
634 }
635 }
636 }
637 }
638 }
639}
640
641impl<R> Deref for ScopeStream<R> {
642 type Target = ScopeHandle;
643 fn deref(&self) -> &Self::Target {
644 &self.inner
645 }
646}
647
648impl<R> Borrow<ScopeHandle> for ScopeStream<R> {
649 fn borrow(&self) -> &ScopeHandle {
650 &*self
651 }
652}
653
654impl<F: Spawnable<Output = R>, R: Send + 'static> FromIterator<F> for ScopeStream<R> {
655 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
656 let (stream, handle) = ScopeStream::new();
657 for fut in iter {
658 handle.push(fut);
659 }
660 stream.close();
661 stream
662 }
663}
664
665#[derive(Clone)]
666pub struct ScopeStreamHandle<R>(ScopeHandle, PhantomData<R>);
667
668impl<R: Send> ScopeStreamHandle<R> {
669 pub fn push(&self, future: impl Spawnable<Output = R>) {
670 self.0.insert_task(future.into_task(self.0.clone()), true);
671 }
672}
673
/// Non-owning counterpart of `ScopeHandle`.
///
/// Hashing and equality (implemented in this file) are based on the address of the referenced
/// `ScopeInner`, not on its contents.
#[derive(Clone)]
struct WeakScopeHandle {
    inner: Weak<ScopeInner>,
}
683
684impl WeakScopeHandle {
685 pub fn upgrade(&self) -> Option<ScopeHandle> {
687 self.inner.upgrade().map(|inner| ScopeHandle { inner })
688 }
689}
690
691impl hash::Hash for WeakScopeHandle {
692 fn hash<H: hash::Hasher>(&self, state: &mut H) {
693 Weak::as_ptr(&self.inner).hash(state);
694 }
695}
696
697impl PartialEq for WeakScopeHandle {
698 fn eq(&self, other: &Self) -> bool {
699 Weak::ptr_eq(&self.inner, &other.inner)
700 }
701}
702
703impl Eq for WeakScopeHandle {
704 }
707
708mod state {
711 use super::*;
712
/// Mutable state of a scope, kept behind the scope's `Condition`.
pub struct ScopeState {
    /// The scope this one was created under; `None` for the root scope.
    pub parent: Option<ScopeHandle>,
    // Child scopes, referenced weakly.
    children: HashSet<WeakScopeHandle>,
    // Tasks held directly by this scope.
    all_tasks: HashSet<TaskHandle>,
    // Non-zero exactly when `has_tasks()` is true. Incremented by `register_first_task` (for
    // this scope's own first task and for a child registering its first task) and decremented
    // by `on_last_task_removed`.
    subscopes_with_tasks: u32,
    // Whether the scope is open, closed, or finished.
    status: Status,
    /// Sink for the results of finished tasks.
    pub results: Box<dyn Results>,
}
726
/// Per-task entry kept by `JoinResults`.
pub enum JoinResult {
    /// The task has not finished yet; this waker is returned for waking when it does.
    Waker(Waker),
    /// The task finished; the handle is held until it is polled for or detached.
    Result(TaskHandle),
}
731
/// Lifecycle status of a scope.
#[repr(u8)]
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    /// New tasks are accepted.
    #[default]
    Open,
    /// New tasks are rejected; existing tasks may still be running.
    Closed,
    /// New tasks are rejected and the scope was observed to have no tasks, or was cancelled.
    Finished,
}

impl Status {
    /// Returns true only for `Open`: the one status in which tasks may be inserted.
    pub fn can_spawn(&self) -> bool {
        match self {
            Status::Open => true,
            Status::Closed | Status::Finished => false,
        }
    }

    /// Returns false only for `Finished`; used to skip scopes that need no cancellation.
    pub fn might_have_running_tasks(&self) -> bool {
        match self {
            Status::Open | Status::Closed => true,
            Status::Finished => false,
        }
    }
}
761
762 impl ScopeState {
763 pub fn new(
764 parent: Option<ScopeHandle>,
765 status: Status,
766 results: Box<impl Results>,
767 ) -> Self {
768 Self {
769 parent,
770 children: Default::default(),
771 all_tasks: Default::default(),
772 subscopes_with_tasks: 0,
773 status,
774 results,
775 }
776 }
777 }
778
779 impl ScopeState {
780 pub fn all_tasks(&self) -> &HashSet<TaskHandle> {
781 &self.all_tasks
782 }
783
784 pub fn insert_task(&mut self, task: TaskHandle, for_stream: bool) -> Option<TaskHandle> {
787 if !self.status.can_spawn() || (!for_stream && !self.results.can_spawn()) {
788 return Some(task);
789 }
790 if self.all_tasks.is_empty() && !self.register_first_task() {
791 return Some(task);
792 }
793 task.wake();
794 assert!(self.all_tasks.insert(task));
795 None
796 }
797
798 pub fn children(&self) -> &HashSet<WeakScopeHandle> {
799 &self.children
800 }
801
802 pub fn insert_child(&mut self, child: WeakScopeHandle) {
803 self.children.insert(child);
804 }
805
806 pub fn remove_child(&mut self, child: &PtrKey) {
807 let found = self.children.remove(child);
808 assert!(found || self.children.is_empty());
811 }
812
813 pub fn status(&self) -> Status {
814 self.status
815 }
816
817 pub fn close(&mut self) {
818 self.status = Status::Closed;
819 }
820
821 pub fn mark_finished(&mut self) {
822 self.status = Status::Finished;
823 }
824
825 pub fn has_tasks(&self) -> bool {
826 self.subscopes_with_tasks > 0
827 }
828
829 pub fn wake_all(&self) {
830 for task in &self.all_tasks {
831 task.wake();
832 }
833 }
834
835 #[must_use]
839 fn register_first_task(&mut self) -> bool {
840 if !self.status.can_spawn() {
841 return false;
842 }
843 let can_spawn = match &self.parent {
844 Some(parent) => {
845 self.subscopes_with_tasks > 0 || parent.lock().register_first_task()
848 }
849 None => true,
850 };
851 if can_spawn {
852 self.subscopes_with_tasks += 1;
853 debug_assert!(self.subscopes_with_tasks as usize <= self.children.len() + 1);
854 };
855 can_spawn
856 }
857
858 fn on_last_task_removed(
859 this: &mut ConditionGuard<'_, ScopeState>,
860 num_wakers_hint: usize,
861 wakers: &mut Vec<Waker>,
862 ) {
863 debug_assert!(this.subscopes_with_tasks > 0);
864 this.subscopes_with_tasks -= 1;
865 if this.subscopes_with_tasks > 0 {
866 wakers.reserve(num_wakers_hint);
867 return;
868 }
869
870 match &this.parent {
871 Some(parent) => {
872 Self::on_last_task_removed(
873 &mut parent.lock(),
874 num_wakers_hint + this.waker_count(),
875 wakers,
876 );
877 }
878 None => wakers.reserve(num_wakers_hint),
879 };
880 wakers.extend(this.drain_wakers());
881 }
882 }
883
/// A list of wakers that are all woken when the list is dropped.
#[derive(Default)]
struct WakeVec(Vec<Waker>);

impl Drop for WakeVec {
    /// Wakes the collected wakers in insertion order.
    fn drop(&mut self) {
        self.0.drain(..).for_each(Waker::wake);
    }
}
894
/// A scope lock guard paired with a list of wakers to fire.
///
/// Fields drop in declaration order, so the guard (field 0) is dropped before the `WakeVec`
/// (field 1) wakes its wakers.
pub struct ScopeWaker<'a>(ConditionGuard<'a, ScopeState>, WakeVec);
897
898 impl<'a> From<ConditionGuard<'a, ScopeState>> for ScopeWaker<'a> {
899 fn from(value: ConditionGuard<'a, ScopeState>) -> Self {
900 Self(value, WakeVec::default())
901 }
902 }
903
904 impl ScopeWaker<'_> {
905 pub fn take_task(&mut self, id: usize) -> Option<TaskHandle> {
906 let task = self.all_tasks.take(&id);
907 if task.is_some() {
908 self.on_task_removed(0);
909 }
910 task
911 }
912
913 pub fn task_did_finish(&mut self, id: usize) {
914 if let Some(task) = self.all_tasks.take(&id) {
915 self.on_task_removed(1);
916 if !task.is_detached() {
917 let maybe_waker = self.results.task_did_finish(task);
918 self.1 .0.extend(maybe_waker);
919 }
920 }
921 }
922
923 pub fn set_closed_and_drain(
924 &mut self,
925 ) -> (HashSet<TaskHandle>, Box<dyn Any>, hash_set::Drain<'_, WeakScopeHandle>) {
926 self.close();
927 let all_tasks = std::mem::take(&mut self.all_tasks);
928 let results = self.results.take();
929 if !all_tasks.is_empty() {
930 self.on_task_removed(0)
931 }
932 let children = self.children.drain();
933 (all_tasks, results, children)
934 }
935
936 fn on_task_removed(&mut self, num_wakers_hint: usize) {
937 if self.all_tasks.is_empty() {
938 ScopeState::on_last_task_removed(&mut self.0, num_wakers_hint, &mut self.1 .0)
939 }
940 }
941 }
942
943 impl<'a> Deref for ScopeWaker<'a> {
944 type Target = ConditionGuard<'a, ScopeState>;
945
946 fn deref(&self) -> &Self::Target {
947 &self.0
948 }
949 }
950
951 impl DerefMut for ScopeWaker<'_> {
952 fn deref_mut(&mut self) -> &mut Self::Target {
953 &mut self.0
954 }
955 }
956}
957
/// Data shared by every handle to one scope.
struct ScopeInner {
    // Executor the scope's tasks are scheduled on.
    executor: Arc<Executor>,
    // Lock-protected scope state; also used to wait for state changes (see `on_no_tasks`).
    state: Condition<ScopeState>,
    // Name given at creation; empty for unnamed scopes, "root" for the root scope.
    name: String,
}
963
964impl Drop for ScopeInner {
965 fn drop(&mut self) {
966 let key = unsafe { &*(self as *const _ as *const PtrKey) };
971 if let Some(parent) = &self.state.lock().parent {
972 let mut parent_state = parent.lock();
973 parent_state.remove_child(key);
974 }
975 }
976}
977
978impl ScopeHandle {
979 fn with_current<R>(f: impl FnOnce(&ScopeHandle) -> R) -> R {
980 super::common::TaskHandle::with_current(|task| match task {
981 Some(task) => f(task.scope()),
982 None => f(EHandle::local().global_scope()),
983 })
984 }
985
986 fn lock(&self) -> ConditionGuard<'_, ScopeState> {
987 self.inner.state.lock()
988 }
989
990 fn downgrade(&self) -> WeakScopeHandle {
991 WeakScopeHandle { inner: Arc::downgrade(&self.inner) }
992 }
993
994 #[inline(always)]
995 pub(crate) fn executor(&self) -> &Arc<Executor> {
996 &self.inner.executor
997 }
998
999 pub(crate) fn detach(&self, task_id: usize) {
1001 let _maybe_task = {
1002 let mut state = self.lock();
1003 if let Some(task) = state.all_tasks().get(&task_id) {
1004 task.detach();
1005 }
1006 state.results.detach(task_id)
1007 };
1008 }
1009
1010 pub(crate) unsafe fn cancel_task<R>(&self, task_id: usize) -> Option<R> {
1016 let mut state = self.lock();
1017 if let Some(task) = state.results.detach(task_id) {
1018 drop(state);
1019 return task.take_result();
1020 }
1021 state.all_tasks().get(&task_id).and_then(|task| {
1022 if task.cancel() {
1023 self.inner.executor.ready_tasks.push(task.clone());
1024 }
1025 task.take_result()
1026 })
1027 }
1028
1029 pub(crate) fn cancel_and_detach(&self, task_id: usize) {
1031 let _tasks = {
1032 let mut state = ScopeWaker::from(self.lock());
1033 let maybe_task1 = state.results.detach(task_id);
1034 let mut maybe_task2 = None;
1035 if let Some(task) = state.all_tasks().get(&task_id) {
1036 match task.cancel_and_detach() {
1037 CancelAndDetachResult::Done => maybe_task2 = state.take_task(task_id),
1038 CancelAndDetachResult::AddToRunQueue => {
1039 self.inner.executor.ready_tasks.push(task.clone());
1040 }
1041 CancelAndDetachResult::Pending => {}
1042 }
1043 }
1044 (maybe_task1, maybe_task2)
1045 };
1046 }
1047
1048 pub(crate) unsafe fn poll_join_result<R>(
1054 &self,
1055 task_id: usize,
1056 cx: &mut Context<'_>,
1057 ) -> Poll<R> {
1058 let task = ready!(self.lock().results.poll_join_result(task_id, cx));
1059 match task.take_result() {
1060 Some(result) => Poll::Ready(result),
1061 None => {
1062 Poll::Pending
1064 }
1065 }
1066 }
1067
1068 pub(crate) unsafe fn poll_cancelled<R>(
1070 &self,
1071 task_id: usize,
1072 cx: &mut Context<'_>,
1073 ) -> Poll<Option<R>> {
1074 let task = self.lock().results.poll_join_result(task_id, cx);
1075 task.map(|task| task.take_result())
1076 }
1077
1078 pub(super) fn insert_task(&self, task: TaskHandle, for_stream: bool) -> bool {
1079 let returned_task = self.lock().insert_task(task, for_stream);
1080 returned_task.is_none()
1081 }
1082
1083 pub(super) unsafe fn drop_task_unchecked(&self, task_id: usize) {
1094 let mut state = ScopeWaker::from(self.lock());
1095 let task = state.take_task(task_id);
1096 if let Some(task) = task {
1097 task.drop_future_unchecked();
1098 }
1099 }
1100
1101 pub(super) fn task_did_finish(&self, id: usize) {
1102 let mut state = ScopeWaker::from(self.lock());
1103 state.task_did_finish(id);
1104 }
1105
1106 fn cancel_all_tasks(&self) {
1108 let mut scopes = vec![self.clone()];
1109 while let Some(scope) = scopes.pop() {
1110 let mut state = scope.lock();
1111 if !state.status().might_have_running_tasks() {
1112 continue;
1114 }
1115 for task in state.all_tasks() {
1116 if task.cancel() {
1117 task.scope().executor().ready_tasks.push(task.clone());
1118 }
1119 }
1122 scopes.extend(state.children().iter().filter_map(|child| child.upgrade()));
1124 state.mark_finished();
1125 }
1126 }
1127
1128 pub(super) fn drop_all_tasks(&self) {
1135 let mut scopes = vec![self.clone()];
1136 while let Some(scope) = scopes.pop() {
1137 let (tasks, join_results) = {
1138 let mut state = ScopeWaker::from(scope.lock());
1139 let (tasks, join_results, children) = state.set_closed_and_drain();
1140 scopes.extend(children.filter_map(|child| child.upgrade()));
1141 (tasks, join_results)
1142 };
1143 for task in tasks {
1145 task.try_drop().expect("Expected drop to succeed");
1146 }
1147 std::mem::drop(join_results);
1148 }
1149 }
1150}
1151
/// Zero-sized lookup key whose identity is its own address.
///
/// Used to find a `WeakScopeHandle` in a hash set from a bare `ScopeInner` address (see the
/// `Borrow<PtrKey>` impl and `Drop for ScopeInner` in this file).
#[repr(transparent)]
struct PtrKey;
1155
1156impl Borrow<PtrKey> for WeakScopeHandle {
1157 fn borrow(&self) -> &PtrKey {
1158 unsafe { &*(self.inner.as_ptr() as *const PtrKey) }
1160 }
1161}
1162
1163impl PartialEq for PtrKey {
1164 fn eq(&self, other: &Self) -> bool {
1165 self as *const _ == other as *const _
1166 }
1167}
1168
1169impl Eq for PtrKey {}
1170
1171impl hash::Hash for PtrKey {
1172 fn hash<H: hash::Hasher>(&self, state: &mut H) {
1173 (self as *const PtrKey).hash(state);
1174 }
1175}
1176
/// Results sink for ordinary scopes: one `JoinResult` entry per task id.
#[derive(Default)]
struct JoinResults(HashMap<usize, JoinResult>);
1179
/// Where a scope puts the results of finished tasks.
trait Results: Send + Sync + 'static {
    /// Whether tasks inserted with `for_stream == false` are accepted by this sink.
    fn can_spawn(&self) -> bool;

    /// Polls for the finished task with the given id.
    fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle>;

    /// Records a finished task; may return a waker that the caller should wake.
    fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker>;

    /// Forgets the given task id, returning the finished task if one was stored.
    fn detach(&mut self, task_id: usize) -> Option<TaskHandle>;

    /// Moves the stored results out, type-erased.
    fn take(&mut self) -> Box<dyn Any>;

    /// Test-only: whether the sink currently holds no entries.
    #[cfg(test)]
    fn is_empty(&self) -> bool;
}
1200
1201impl Results for JoinResults {
1202 fn can_spawn(&self) -> bool {
1203 true
1204 }
1205
1206 fn poll_join_result(&mut self, task_id: usize, cx: &mut Context<'_>) -> Poll<TaskHandle> {
1207 match self.0.entry(task_id) {
1208 Entry::Occupied(mut o) => match o.get_mut() {
1209 JoinResult::Waker(waker) => *waker = cx.waker().clone(),
1210 JoinResult::Result(_) => {
1211 let JoinResult::Result(task) = o.remove() else { unreachable!() };
1212 return Poll::Ready(task);
1213 }
1214 },
1215 Entry::Vacant(v) => {
1216 v.insert(JoinResult::Waker(cx.waker().clone()));
1217 }
1218 }
1219 Poll::Pending
1220 }
1221
1222 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1223 match self.0.entry(task.id()) {
1224 Entry::Occupied(mut o) => {
1225 let JoinResult::Waker(waker) =
1226 std::mem::replace(o.get_mut(), JoinResult::Result(task))
1227 else {
1228 unreachable!()
1232 };
1233 Some(waker)
1234 }
1235 Entry::Vacant(v) => {
1236 v.insert(JoinResult::Result(task));
1237 None
1238 }
1239 }
1240 }
1241
1242 fn detach(&mut self, task_id: usize) -> Option<TaskHandle> {
1243 match self.0.remove(&task_id) {
1244 Some(JoinResult::Result(task)) => Some(task),
1245 _ => None,
1246 }
1247 }
1248
1249 fn take(&mut self) -> Box<dyn Any> {
1250 Box::new(Self(std::mem::take(&mut self.0)))
1251 }
1252
1253 #[cfg(test)]
1254 fn is_empty(&self) -> bool {
1255 self.0.is_empty()
1256 }
1257}
1258
/// Results sink for `ScopeStream`: finished results go into a buffer shared with the stream.
struct ResultsStream<R> {
    inner: Arc<Mutex<ResultsStreamInner<R>>>,
}

// Implemented by hand, like the impl for `ResultsStreamInner` below: `#[derive(Default)]`
// would require `R: Default` although no `R` value is ever constructed.
impl<R> Default for ResultsStream<R> {
    fn default() -> Self {
        Self { inner: Arc::default() }
    }
}

/// Buffer shared between a `ScopeStream` and its `ResultsStream`.
struct ResultsStreamInner<R> {
    // Results of finished tasks that the stream has not yielded yet.
    results: Vec<R>,
    // Waker of the stream consumer, set when the stream returned `Pending`.
    waker: Option<Waker>,
}

impl<R> Default for ResultsStreamInner<R> {
    /// Starts with no buffered results and no waker; places no bound on `R`.
    fn default() -> Self {
        Self { results: Vec::new(), waker: None }
    }
}
1274
1275impl<R: Send + 'static> Results for ResultsStream<R> {
1276 fn can_spawn(&self) -> bool {
1277 false
1278 }
1279
1280 fn poll_join_result(&mut self, _task_id: usize, _cx: &mut Context<'_>) -> Poll<TaskHandle> {
1281 Poll::Pending
1282 }
1283
1284 fn task_did_finish(&mut self, task: TaskHandle) -> Option<Waker> {
1285 let mut inner = self.inner.lock();
1286 inner.results.extend(unsafe { task.take_result() });
1289 inner.waker.take()
1290 }
1291
1292 fn detach(&mut self, _task_id: usize) -> Option<TaskHandle> {
1293 None
1294 }
1295
1296 fn take(&mut self) -> Box<dyn Any> {
1297 Box::new(std::mem::take(&mut self.inner.lock().results))
1298 }
1299
1300 #[cfg(test)]
1301 fn is_empty(&self) -> bool {
1302 false
1303 }
1304}
1305
1306#[cfg(test)]
1307mod tests {
1308 use super::*;
1309 use crate::{EHandle, LocalExecutor, SendExecutor, SpawnableFuture, Task, TestExecutor, Timer};
1310 use assert_matches::assert_matches;
1311 use fuchsia_sync::{Condvar, Mutex};
1312 use futures::channel::mpsc;
1313 use futures::future::join_all;
1314 use futures::{FutureExt, StreamExt};
1315 use std::future::{pending, poll_fn};
1316 use std::pin::{pin, Pin};
1317 use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1318 use std::sync::Arc;
1319 use std::task::{Context, Poll};
1320 use std::time::Duration;
1321
/// Test helper: a future that stays pending until `resolve` is called on it.
#[derive(Default)]
struct RemoteControlFuture(Mutex<RCFState>);
/// State behind a `RemoteControlFuture`.
#[derive(Default)]
struct RCFState {
    // Set by `resolve`; once true, polling returns `Ready`.
    resolved: bool,
    // Waker from the most recent pending poll.
    waker: Option<Waker>,
}
1329
1330 impl Future for &RemoteControlFuture {
1331 type Output = ();
1332 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1333 let mut this = self.0.lock();
1334 if this.resolved {
1335 Poll::Ready(())
1336 } else {
1337 this.waker.replace(cx.waker().clone());
1338 Poll::Pending
1339 }
1340 }
1341 }
1342
1343 impl RemoteControlFuture {
1344 fn new() -> Arc<Self> {
1345 Arc::new(Default::default())
1346 }
1347
1348 fn resolve(&self) {
1349 let mut this = self.0.lock();
1350 this.resolved = true;
1351 if let Some(waker) = this.waker.take() {
1352 waker.wake();
1353 }
1354 }
1355
1356 fn as_future(self: &Arc<Self>) -> impl Future<Output = ()> {
1357 let this = Arc::clone(self);
1358 async move { (&*this).await }
1359 }
1360 }
1361
// A task computed on the executor's global scope runs to completion.
#[test]
fn compute_works_on_root_scope() {
    let mut exec = TestExecutor::new();
    let root = exec.global_scope();
    let mut fut = pin!(root.compute(async { 1 }));
    let polled = exec.run_until_stalled(&mut fut);
    assert_eq!(polled, Poll::Ready(1));
}
1369
// A task computed on a freshly created child scope runs to completion.
#[test]
fn compute_works_on_new_child() {
    let mut exec = TestExecutor::new();
    let child = exec.global_scope().new_child_with_name("compute_works_on_new_child");
    let mut fut = pin!(child.compute(async { 1 }));
    let polled = exec.run_until_stalled(&mut fut);
    assert_eq!(polled, Poll::Ready(1));
}
1377
// Dropping the owning `Scope` cancels its tasks, so the task never yields a value.
#[test]
fn scope_drop_cancels_tasks() {
    let mut exec = TestExecutor::new();
    let owner = exec.global_scope().new_child_with_name("scope_drop_cancels_tasks");
    let mut fut = pin!(owner.compute(async { 1 }));
    drop(owner);
    let polled = exec.run_until_stalled(&mut fut);
    assert_eq!(polled, Poll::Pending);
}
1386
// Once a scope has been cancelled, tasks spawned through a leftover handle never run.
#[test]
fn tasks_do_not_spawn_on_cancelled_scopes() {
    let mut exec = TestExecutor::new();
    let owner =
        exec.global_scope().new_child_with_name("tasks_do_not_spawn_on_cancelled_scopes");
    let leftover = owner.to_handle();
    let mut cancelled = pin!(owner.cancel());
    let cancel_poll = exec.run_until_stalled(&mut cancelled);
    assert_eq!(cancel_poll, Poll::Ready(()));
    let mut fut = pin!(leftover.compute(async { 1 }));
    let task_poll = exec.run_until_stalled(&mut fut);
    assert_eq!(task_poll, Poll::Pending);
}
1398
// Closing an empty scope completes immediately, and afterwards tasks spawned through a
// leftover handle never run.
//
// This test must go through `close()`: the cancel path is already covered by
// `tasks_do_not_spawn_on_cancelled_scopes`.
#[test]
fn tasks_do_not_spawn_on_closed_empty_scopes() {
    let mut executor = TestExecutor::new();
    let scope =
        executor.global_scope().new_child_with_name("tasks_do_not_spawn_closed_empty_scopes");
    let handle = scope.to_handle();
    let mut close = pin!(scope.close());
    // The scope has no tasks, so the join future returned by `close` is ready at once.
    assert_eq!(executor.run_until_stalled(&mut close), Poll::Ready(()));
    let mut task = pin!(handle.compute(async { 1 }));
    assert_eq!(executor.run_until_stalled(&mut task), Poll::Pending);
}
1410
// A closed scope that still has a task keeps `close` pending and rejects new tasks.
#[test]
fn tasks_do_not_spawn_on_closed_nonempty_scopes() {
    let mut exec = TestExecutor::new();
    let owner = exec.global_scope().new_child();
    let leftover = owner.to_handle();
    leftover.spawn(pending());
    let mut closing = pin!(owner.close());
    let close_poll = exec.run_until_stalled(&mut closing);
    assert_eq!(close_poll, Poll::Pending);
    let mut fut = pin!(leftover.compute(async { 1 }));
    let task_poll = exec.run_until_stalled(&mut fut);
    assert_eq!(task_poll, Poll::Pending);
}
1422
// Tasks on nested scopes (child and grandchild) both run to completion.
#[test]
fn spawn_works_on_child_and_grandchild() {
    let mut exec = TestExecutor::new();
    let top = exec.global_scope().new_child();
    let middle = top.new_child();
    let bottom = middle.new_child();
    let mut middle_task = pin!(middle.compute(async { 1 }));
    let mut bottom_task = pin!(bottom.compute(async { 1 }));
    let middle_poll = exec.run_until_stalled(&mut middle_task);
    assert_eq!(middle_poll, Poll::Ready(1));
    let bottom_poll = exec.run_until_stalled(&mut bottom_task);
    assert_eq!(bottom_poll, Poll::Ready(1));
}
1434
// Dropping the top scope cancels tasks on its child and grandchild scopes as well.
#[test]
fn spawn_drop_cancels_child_and_grandchild_tasks() {
    let mut exec = TestExecutor::new();
    let top = exec.global_scope().new_child();
    let middle = top.new_child();
    let bottom = middle.new_child();
    let mut middle_task = pin!(middle.compute(async { 1 }));
    let mut bottom_task = pin!(bottom.compute(async { 1 }));
    drop(top);
    let middle_poll = exec.run_until_stalled(&mut middle_task);
    assert_eq!(middle_poll, Poll::Pending);
    let bottom_poll = exec.run_until_stalled(&mut bottom_task);
    assert_eq!(bottom_poll, Poll::Pending);
}
1447
// After cancelling one pending and one already finished task, the scope holds neither tasks
// nor stored results.
#[test]
fn completed_tasks_are_cleaned_up_after_cancel() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();

    let never_done = scope.spawn(pending::<()>());
    let already_done = scope.spawn(async {});
    let first_run = exec.run_until_stalled(&mut pending::<()>());
    assert_eq!(first_run, Poll::Pending);
    // Only the never-finishing task is still in the scope.
    assert_eq!(scope.lock().all_tasks().len(), 1);

    // Cancelling a task that never ran to completion does not resolve immediately...
    assert_eq!(never_done.cancel().now_or_never(), None);
    // ...while a finished one yields its result straight away.
    assert_eq!(already_done.cancel().now_or_never(), Some(Some(())));

    let second_run = exec.run_until_stalled(&mut pending::<()>());
    assert_eq!(second_run, Poll::Pending);
    assert_eq!(scope.lock().all_tasks().len(), 0);
    assert!(scope.lock().results.is_empty());
}
1467
// Joining a scope that never had a task completes immediately.
#[test]
fn join_emtpy_scope() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let polled = exec.run_until_stalled(&mut pin!(scope.join()));
    assert_eq!(polled, Poll::Ready(()));
}
1474
// A task's result stays retrievable through its handle after the scope's join was first polled.
#[test]
fn task_handle_preserves_access_to_result_after_join_begins() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let mut computed = scope.compute(async { 1 });
    scope.spawn(async {});
    let blocker = scope.spawn(pending::<()>());
    let mut joined = pin!(scope.join().fuse());
    // First poll of the join; its outcome is not what this test is about.
    let _ = exec.run_until_stalled(&mut joined);
    let computed_poll = exec.run_until_stalled(&mut computed);
    assert_eq!(computed_poll, Poll::Ready(1));
    let _ = blocker.cancel();
    let join_poll = exec.run_until_stalled(&mut joined);
    assert_eq!(join_poll, Poll::Ready(()));
}
1490
// Join stays pending while any task remains and completes once the last one is cancelled.
#[test]
fn join_blocks_until_task_is_cancelled() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    let survivor = scope.spawn(pending::<()>());
    let first_victim = scope.spawn(pending::<()>());
    let first_cancel = exec.run_until_stalled(&mut pin!(first_victim.cancel()));
    assert_eq!(first_cancel, Poll::Ready(None));
    let mut joined = pin!(scope.join());
    // One task is still outstanding.
    let blocked = exec.run_until_stalled(&mut joined);
    assert_eq!(blocked, Poll::Pending);
    let second_cancel = exec.run_until_stalled(&mut pin!(survivor.cancel()));
    assert_eq!(second_cancel, Poll::Ready(None));
    let finished = exec.run_until_stalled(&mut joined);
    assert_eq!(finished, Poll::Ready(()));
}
1511
// A never-finishing task blocks `join`, but cancelling through the join future completes.
#[test]
fn join_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    scope.spawn(pending::<()>());
    let mut joined = pin!(scope.join());
    let blocked = exec.run_until_stalled(&mut joined);
    assert_eq!(blocked, Poll::Pending);
    let mut cancelled = pin!(joined.cancel());
    let finished = exec.run_until_stalled(&mut cancelled);
    assert_eq!(finished, Poll::Ready(()));
}
1523
// A never-finishing task blocks `close`, but cancelling through the returned future completes.
#[test]
fn close_blocks_but_cancel_succeeds_if_detached_task_never_completes() {
    let mut exec = TestExecutor::new();
    let scope = exec.global_scope().new_child();
    scope.spawn(pending::<()>());
    let mut closing = pin!(scope.close());
    let blocked = exec.run_until_stalled(&mut closing);
    assert_eq!(blocked, Poll::Pending);
    let mut cancelled = pin!(closing.cancel());
    let finished = exec.run_until_stalled(&mut cancelled);
    assert_eq!(finished, Poll::Ready(()));
}
1535
// Joining a scope is pending until the remotely controlled task is resolved;
// afterwards both the join and the task handle report completion.
#[test]
fn join_scope_blocks_until_spawned_task_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();
    let mut spawned = parent.spawn(trigger.as_future());
    let mut joined = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut spawned), Poll::Ready(()));
}
1548
// Closing a scope is pending until the remotely controlled task is resolved;
// afterwards both the close future and the task handle report completion.
#[test]
fn close_scope_blocks_until_spawned_task_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();
    let mut spawned = parent.spawn(trigger.as_future());
    let mut closed = pin!(parent.close());
    assert_eq!(exec.run_until_stalled(&mut closed), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut closed), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut spawned), Poll::Ready(()));
}
1561
// The parent's join must keep waiting for a task living in a child scope,
// both before and after that child is detached, until the task is resolved.
#[test]
fn join_scope_blocks_until_detached_task_of_detached_child_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let nested = parent.new_child();
    let trigger = RemoteControlFuture::new();
    nested.spawn(trigger.as_future());
    let mut joined = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut pin!(nested.on_no_tasks())), Poll::Pending);
    nested.detach();
    // Detaching the child must not let the parent's join finish early.
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Ready(()));
}
1577
// A task spawns a named child scope, which in turn spawns the remotely
// controlled future via `Scope::current()` before the child is detached.
// The outer join must stay pending until that future is resolved.
#[test]
fn join_scope_blocks_until_task_spawned_from_nested_detached_scope_completes() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let trigger = RemoteControlFuture::new();
    // A clone is moved into the task; the original is kept to resolve it later.
    let trigger_for_task = trigger.clone();
    parent.spawn(async move {
        let nested = Scope::new_with_name("child");
        nested.spawn(async move {
            Scope::current().spawn(trigger_for_task.as_future());
        });
        nested.detach();
    });
    let mut joined = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Ready(()));
}
1598
// With a never-finishing task in a child scope, the parent's join is pending
// and remains pending after the child is detached.
#[test]
fn join_scope_blocks_when_blocked_child_is_detached() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let nested = parent.new_child();
    nested.spawn(pending());
    let mut joined = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut pin!(nested.on_no_tasks())), Poll::Pending);
    nested.detach();
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Pending);
}
1611
// Both joins are pending while the child's task never finishes. Once the
// child's join future goes out of scope, the parent's join is expected to
// resolve.
#[test]
fn join_scope_completes_when_blocked_child_is_cancelled() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let nested = parent.new_child();
    nested.spawn(pending());
    let mut parent_join = pin!(parent.join());
    {
        // The child's join future only lives for the duration of this block.
        let mut nested_join = pin!(nested.join());
        assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Pending);
        assert_eq!(exec.run_until_stalled(&mut nested_join), Poll::Pending);
    }
    assert_eq!(exec.run_until_stalled(&mut parent_join), Poll::Ready(()));
}
1626
// After `detach()`, a previously obtained handle can still run a computation
// to completion.
#[test]
fn detached_scope_can_spawn() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let leftover = parent.to_handle();
    parent.detach();
    assert_eq!(exec.run_until_stalled(&mut leftover.compute(async { 1 })), Poll::Ready(1));
}
1635
// After the scope is dropped, a computation started through a leftover handle
// never produces a result (it stays pending).
#[test]
fn dropped_scope_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let leftover = parent.to_handle();
    drop(parent);
    assert_eq!(exec.run_until_stalled(&mut leftover.compute(async { 1 })), Poll::Pending);
}
1644
// Same as the plain dropped-scope case, but with a task already spawned (and
// its handle kept alive) before the scope is dropped.
#[test]
fn dropped_scope_with_running_task_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let leftover = parent.to_handle();
    let _still_held = leftover.spawn(pending::<()>());
    drop(parent);
    assert_eq!(exec.run_until_stalled(&mut leftover.compute(async { 1 })), Poll::Pending);
}
1654
// Once an empty scope's join has resolved, a computation started through a
// leftover handle stays pending.
#[test]
fn joined_scope_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let leftover = parent.to_handle();
    let mut joined = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut leftover.compute(async { 1 })), Poll::Pending);
}
1664
// While a join is still pending because of a running task, new computations
// started through a handle are still run to completion.
#[test]
fn joining_scope_with_running_task_can_spawn() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let leftover = parent.to_handle();
    let _still_held = leftover.spawn(pending::<()>());
    let mut joined = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut leftover.compute(async { 1 })), Poll::Ready(1));
}
1675
// After the parent's join has resolved, computations stay pending in a child
// created before the join, a child created after it, and a grandchild
// created after it.
#[test]
fn joined_scope_child_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let parent_handle = parent.to_handle();
    let early_child = parent.new_child();
    // Before the join, the child runs computations normally.
    assert_eq!(exec.run_until_stalled(&mut early_child.compute(async { 1 })), Poll::Ready(1));
    let mut joined = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Ready(()));
    let late_child = parent_handle.new_child();
    let late_grandchild = early_child.new_child();
    assert_eq!(exec.run_until_stalled(&mut early_child.compute(async { 1 })), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut late_child.compute(async { 1 })), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut late_grandchild.compute(async { 1 })), Poll::Pending);
}
1703
// After the parent's close has resolved, computations stay pending in a child
// created before the close, a child created after it, and a grandchild
// created after it.
#[test]
fn closed_scope_child_cannot_spawn() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let parent_handle = parent.to_handle();
    let early_child = parent.new_child();
    // Before the close, the child runs computations normally.
    assert_eq!(exec.run_until_stalled(&mut early_child.compute(async { 1 })), Poll::Ready(1));
    let mut closed = pin!(parent.close());
    assert_eq!(exec.run_until_stalled(&mut closed), Poll::Ready(()));
    let late_child = parent_handle.new_child();
    let late_grandchild = early_child.new_child();
    assert_eq!(exec.run_until_stalled(&mut early_child.compute(async { 1 })), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut late_child.compute(async { 1 })), Poll::Pending);
    assert_eq!(exec.run_until_stalled(&mut late_grandchild.compute(async { 1 })), Poll::Pending);
}
1731
// Joining the child before the parent: both joins resolve immediately once
// the child's only computation has finished.
#[test]
fn can_join_child_first() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let nested = parent.new_child();
    assert_eq!(exec.run_until_stalled(&mut nested.compute(async { 1 })), Poll::Ready(1));
    assert_eq!(exec.run_until_stalled(&mut pin!(nested.join())), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut pin!(parent.join())), Poll::Ready(()));
}
1741
// Joining the parent before the child: both joins resolve immediately once
// the child's only computation has finished.
#[test]
fn can_join_parent_first() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let nested = parent.new_child();
    assert_eq!(exec.run_until_stalled(&mut nested.compute(async { 1 })), Poll::Ready(1));
    assert_eq!(exec.run_until_stalled(&mut pin!(parent.join())), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut pin!(nested.join())), Poll::Ready(()));
}
1751
// A task running in the parent scope awaits the child's join. The parent's
// own join stays pending until the child's task is resolved.
#[test]
fn task_in_parent_scope_can_join_child() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let nested = parent.new_child();
    let trigger = RemoteControlFuture::new();
    nested.spawn(trigger.as_future());
    // Ownership of the child scope moves into the joining task.
    parent.spawn(async move { nested.join().await });
    let mut joined = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Pending);
    trigger.resolve();
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Ready(()));
}
1765
// Holding on to a task handle must not stop the scope's join from resolving,
// and the held handle must still deliver the task's result afterwards.
#[test]
fn join_completes_while_completed_task_handle_is_held() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let mut computed = parent.compute(async { 1 });
    parent.spawn(async {});
    let mut joined = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut computed), Poll::Ready(1));
}
1776
// A task that owns a handle to its own scope (and would spawn into it if it
// ever resumed) keeps the join pending. Cancelling the join still completes,
// and the task's own handle never yields a result.
#[test]
fn cancel_completes_while_task_holds_handle() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let held = parent.to_handle();
    let mut spawner = parent.compute(async move {
        loop {
            pending::<()>().await;
            held.spawn(async {});
        }
    });

    let mut joined = pin!(parent.join());
    assert_eq!(exec.run_until_stalled(&mut joined), Poll::Pending);

    let mut cancelled = pin!(joined.cancel());
    assert_eq!(exec.run_until_stalled(&mut cancelled), Poll::Ready(()));
    assert_eq!(exec.run_until_stalled(&mut spawner), Poll::Pending);
}
1797
// A task cancels its own scope through a handle. The `cancel()` future
// awaited inside the task must never resolve (the task panics if it does),
// yet the scope ends up with no tasks and its join completes immediately.
#[test]
fn cancel_from_handle_inside_task() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    {
        parent.spawn(pending::<()>());

        let mut emptied = pin!(parent.on_no_tasks());
        assert_eq!(exec.run_until_stalled(&mut emptied), Poll::Pending);

        let self_handle = parent.to_handle();
        parent.spawn(async move {
            self_handle.cancel().await;
            panic!("cancel() should never complete");
        });

        assert_eq!(exec.run_until_stalled(&mut emptied), Poll::Ready(()));
    }
    assert_eq!(parent.join().now_or_never(), Some(()));
}
1819
// Spawns a task onto the global scope from a separate OS thread, then runs
// the executor and checks that the task actually executed.
#[test]
fn can_spawn_from_non_executor_thread() {
    let mut exec = TestExecutor::new();
    let global = exec.global_scope().clone();
    let flag = Arc::new(AtomicBool::new(false));
    let flag_for_task = flag.clone();
    // The thread's return value (the spawn result) is discarded after joining.
    let _ = std::thread::spawn(move || {
        global.spawn(async move {
            flag_for_task.store(true, Ordering::Relaxed);
        })
    })
    .join();
    // Drive the executor until nothing is runnable so the spawned task gets polled.
    let _ = exec.run_until_stalled(&mut pending::<()>());
    assert!(flag.load(Ordering::Relaxed));
}
1835
// Exercises joins across a small scope tree:
//
//     a
//     └── b
//         ├── c
//         └── d
//
// `a`, `c` and `d` each hold one remotely controlled task. The test resolves
// them in the order d, c, a and checks after each step which joins are ready.
#[test]
fn scope_tree() {
    let mut executor = TestExecutor::new();
    let a = executor.global_scope().new_child();
    let b = a.new_child();
    let c = b.new_child();
    let d = b.new_child();
    let a_remote = RemoteControlFuture::new();
    let c_remote = RemoteControlFuture::new();
    let d_remote = RemoteControlFuture::new();
    a.spawn(a_remote.as_future());
    c.spawn(c_remote.as_future());
    d.spawn(d_remote.as_future());
    let mut a_join = pin!(a.join());
    let mut b_join = pin!(b.join());
    let mut d_join = pin!(d.join());
    // Nothing resolved yet: every join is pending.
    assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
    assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
    assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Pending);
    // Resolving d's task completes only d's join.
    d_remote.resolve();
    assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
    assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Pending);
    assert_eq!(executor.run_until_stalled(&mut d_join), Poll::Ready(()));
    // Resolving c's task lets b's join complete; a still has its own task.
    c_remote.resolve();
    assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Pending);
    assert_eq!(executor.run_until_stalled(&mut b_join), Poll::Ready(()));
    // Resolving a's task lets the root of the tree complete.
    a_remote.resolve();
    assert_eq!(executor.run_until_stalled(&mut a_join), Poll::Ready(()));
    // c was never joined directly above; joining it now is immediately ready.
    let mut c_join = pin!(c.join());
    assert_eq!(executor.run_until_stalled(&mut c_join), Poll::Ready(()));
}
1872
// `on_no_tasks()` is pending while a task is outstanding. After that task is
// cancelled, the original waiter and two newly created waiters all complete.
#[test]
fn on_no_tasks() {
    let mut exec = TestExecutor::new();
    let parent = exec.global_scope().new_child();
    let _finished = parent.spawn(std::future::ready(()));
    let blocker = parent.spawn(pending::<()>());

    let mut first_waiter = pin!(parent.on_no_tasks());

    assert!(exec.run_until_stalled(&mut first_waiter).is_pending());

    let _ = blocker.cancel();

    let second_waiter = pin!(parent.on_no_tasks());
    let third_waiter = pin!(parent.on_no_tasks());

    assert_matches!(
        exec.run_until_stalled(&mut join_all([first_waiter, second_waiter, third_waiter])),
        Poll::Ready(_)
    );
}
1894
// Checks that `wake_all()` causes every task in the scope to be polled again:
// two always-pending tasks share a poll counter, and each `wake_all()` round
// must raise that counter by exactly two.
#[test]
fn wake_all() {
    let mut executor = TestExecutor::new();
    let scope = executor.global_scope().new_child();

    let poll_count = Arc::new(AtomicU64::new(0));

    // Future that counts how often it is polled and never completes. It does
    // not register the waker, so it is only re-polled when woken externally.
    struct PollCounter(Arc<AtomicU64>);

    impl Future for PollCounter {
        type Output = ();
        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            self.0.fetch_add(1, Ordering::Relaxed);
            Poll::Pending
        }
    }

    scope.spawn(PollCounter(poll_count.clone()));
    scope.spawn(PollCounter(poll_count.clone()));

    // Let the executor run the initial polls before taking the baseline.
    let _ = executor.run_until_stalled(&mut pending::<()>());

    let mut start_count = poll_count.load(Ordering::Relaxed);

    // Repeat to confirm that waking works more than once.
    for _ in 0..2 {
        scope.wake_all();
        let _ = executor.run_until_stalled(&mut pending::<()>());
        assert_eq!(poll_count.load(Ordering::Relaxed), start_count + 2);
        start_count += 2;
    }
}
1926
// Stress test for `on_no_tasks()` on a two-thread executor: a short task in a
// child scope runs concurrently with the main future awaiting
// `on_no_tasks()`. Random sleeps vary the interleaving across 2000
// iterations; each iteration must terminate.
#[test]
fn on_no_tasks_race() {
    // Blocks the current thread for a random duration of 0 to 9 microseconds.
    fn sleep_random() {
        use rand::Rng;
        std::thread::sleep(std::time::Duration::from_micros(
            rand::thread_rng().gen_range(0..10),
        ));
    }
    for _ in 0..2000 {
        let mut executor = SendExecutor::new(2);
        let scope = executor.root_scope().new_child();
        scope.spawn(async {
            sleep_random();
        });
        executor.run(async move {
            sleep_random();
            scope.on_no_tasks().await;
        });
    }
}
1947
// Yields control back to the executor exactly once: the first poll wakes the
// current task and returns `Pending`, every later poll returns `Ready`.
async fn yield_to_executor() {
    let mut yielded = false;
    poll_fn(|cx| {
        if !yielded {
            yielded = true;
            // Request an immediate re-poll so the task is rescheduled rather than parked.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Poll::Ready(())
    })
    .await;
}
1961
// Spawns a task, detaches it, and checks that it still runs to completion:
// the detached task bumps a shared counter five times, yielding between
// increments. Afterwards the root scope must hold no leftover results.
#[test]
fn test_detach() {
    let mut e = LocalExecutor::new();
    e.run_singlethreaded(async {
        let counter = Arc::new(AtomicU32::new(0));

        {
            let counter = counter.clone();
            Task::spawn(async move {
                for _ in 0..5 {
                    yield_to_executor().await;
                    counter.fetch_add(1, Ordering::Relaxed);
                }
            })
            .detach();
        }

        // Keep yielding so the detached task can make progress.
        while counter.load(Ordering::Relaxed) != 5 {
            yield_to_executor().await;
        }
    });

    assert!(e.ehandle.root_scope.lock().results.is_empty());
}
1986
// Covers three ways a task can end, using the strong count of a shared `Arc`
// to observe when the task's future (which owns a clone) has been dropped:
//   1. the task handle is dropped immediately,
//   2. a never-finishing task is cancelled explicitly (`cancel()` yields `None`),
//   3. a task that has already finished is cancelled (`cancel()` yields `Some(())`).
// Afterwards the root scope must hold no leftover results.
#[test]
fn test_cancel() {
    let mut e = LocalExecutor::new();
    e.run_singlethreaded(async {
        let ref_count = Arc::new(());
        {
            // Case 1: `let _ =` drops the returned task handle right away.
            let ref_count = ref_count.clone();
            let _ = Task::spawn(async move {
                let _ref_count = ref_count;
                let _: () = std::future::pending().await;
            });
        }

        // Wait until the task's clone of the Arc has been released.
        while Arc::strong_count(&ref_count) != 1 {
            yield_to_executor().await;
        }

        let task = {
            // Case 2: a task that never finishes, cancelled explicitly below.
            let ref_count = ref_count.clone();
            Task::spawn(async move {
                let _ref_count = ref_count;
                let _: () = std::future::pending().await;
            })
        };

        assert_eq!(task.cancel().await, None);
        while Arc::strong_count(&ref_count) != 1 {
            yield_to_executor().await;
        }

        let task = {
            // Case 3: a task that completes as soon as it is polled.
            let ref_count = ref_count.clone();
            Task::spawn(async move {
                let _ref_count = ref_count;
            })
        };

        // Wait for the task to run and release its clone before cancelling.
        while Arc::strong_count(&ref_count) != 1 {
            yield_to_executor().await;
        }

        assert_eq!(task.cancel().await, Some(()));
    });

    assert!(e.ehandle.root_scope.lock().results.is_empty());
}
2037
// Cancels a task while it is in the middle of running on another executor
// thread. The task sets `running`, sleeps 10ms, clears it and returns "foo".
// The test expects `cancel()` to yield `Some("foo")` and `running` to be
// false once the cancel has resolved.
#[test]
fn test_cancel_waits() {
    let mut executor = SendExecutor::new(2);
    // (flag, condvar) pair used by the task to announce that it has started.
    let running = Arc::new((Mutex::new(false), Condvar::new()));
    let task = {
        let running = running.clone();
        executor.root_scope().compute(async move {
            *running.0.lock() = true;
            running.1.notify_all();
            std::thread::sleep(std::time::Duration::from_millis(10));
            *running.0.lock() = false;
            "foo"
        })
    };
    executor.run(async move {
        {
            // Block until the task has reported that it is running.
            let mut guard = running.0.lock();
            while !*guard {
                running.1.wait(&mut guard);
            }
        }
        assert_eq!(task.cancel().await, Some("foo"));
        assert!(!*running.0.lock());
    });
}
2063
// Shared driver for the clean-up tests below.
//
// Starts a task on a two-thread executor that announces it is running and
// then blocks on a condvar until released. Once the task is running, the task
// handle is passed to `callback` (which drops, detaches, or cancels it), the
// task is released, and the driver polls the global scope until at most one
// task remains and no results are stored. Finally it checks that the task
// ran to its end (`running` was reset to false).
fn test_clean_up(callback: impl FnOnce(Task<()>) + Send + 'static) {
    let mut executor = SendExecutor::new(2);
    // (flag, condvar): set by the task once it has started executing.
    let running = Arc::new((Mutex::new(false), Condvar::new()));
    // (flag, condvar): set by the driver to let the task finish.
    let can_quit = Arc::new((Mutex::new(false), Condvar::new()));
    let task = {
        let running = running.clone();
        let can_quit = can_quit.clone();
        executor.root_scope().compute(async move {
            *running.0.lock() = true;
            running.1.notify_all();
            {
                let mut guard = can_quit.0.lock();
                while !*guard {
                    can_quit.1.wait(&mut guard);
                }
            }
            *running.0.lock() = false;
        })
    };
    executor.run(async move {
        {
            // Block until the task has reported that it is running.
            let mut guard = running.0.lock();
            while !*guard {
                running.1.wait(&mut guard);
            }
        }

        callback(task);

        *can_quit.0.lock() = true;
        can_quit.1.notify_all();

        let ehandle = EHandle::local();
        let scope = ehandle.global_scope();

        // NOTE(review): the single remaining task is presumably this main future — confirm.
        while scope.lock().all_tasks().len() > 1 || !scope.lock().results.is_empty() {
            Timer::new(std::time::Duration::from_millis(1)).await;
        }

        assert!(!*running.0.lock());
    });
}
2107
// Clean-up scenario: the cancel future is polled once (and must be pending
// because the task is still running) and is then dropped.
#[test]
fn test_dropped_cancel_cleans_up() {
    test_clean_up(|running_task| {
        let pending_cancel = std::pin::pin!(running_task.cancel());
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        assert!(pending_cancel.poll(&mut cx).is_pending());
    });
}
2116
// Clean-up scenario: the task handle is simply dropped while the task runs.
#[test]
fn test_dropped_task_cleans_up() {
    test_clean_up(|running_task| {
        drop(running_task);
    });
}
2123
// Clean-up scenario: the task is detached while it is running.
#[test]
fn test_detach_cleans_up() {
    test_clean_up(|running_task| {
        running_task.detach();
    });
}
2130
// Pushes two immediately-ready futures into a `ScopeStream`, closes it, and
// checks that collecting the stream yields exactly their outputs.
#[test]
fn test_scope_stream() {
    let mut exec = SendExecutor::new(2);
    exec.run(async move {
        let (outputs, pusher) = ScopeStream::new();
        pusher.push(async { 1 });
        pusher.push(async { 2 });
        outputs.close();
        let collected: HashSet<_> = outputs.collect().await;
        assert_eq!(collected, HashSet::from_iter([1, 2]));
    });
}
2143
// Like `test_scope_stream`, but each pushed future first waits on a 10ms
// timer, so the collecting side has to be woken when results arrive later.
#[test]
fn test_scope_stream_wakes_properly() {
    let mut exec = SendExecutor::new(2);
    exec.run(async move {
        let (outputs, pusher) = ScopeStream::new();
        pusher.push(async {
            Timer::new(Duration::from_millis(10)).await;
            1
        });
        pusher.push(async {
            Timer::new(Duration::from_millis(10)).await;
            2
        });
        outputs.close();
        let collected: HashSet<_> = outputs.collect().await;
        assert_eq!(collected, HashSet::from_iter([1, 2]));
    });
}
2162
// A task started with `compute` on the stream (rather than pushed through the
// handle) must not contribute an item: only the pushed value is collected.
#[test]
fn test_scope_stream_drops_spawned_tasks() {
    let mut exec = SendExecutor::new(2);
    exec.run(async move {
        let (outputs, pusher) = ScopeStream::new();
        pusher.push(async { 1 });
        let _side_task = outputs.compute(async { "foo" });
        outputs.close();
        let collected: HashSet<_> = outputs.collect().await;
        assert_eq!(collected, HashSet::from_iter([1]));
    });
}
2175
// Futures pushed into the stream push further futures through clones of the
// same handle, three levels deep. The consumer closes the stream after seeing
// three items and expects exactly {1, 2, 3}.
#[test]
fn test_nested_scope_stream() {
    let mut exec = SendExecutor::new(2);
    exec.run(async move {
        let (mut outputs, pusher) = ScopeStream::new();
        pusher.clone().push(async move {
            pusher.clone().push(async move {
                pusher.clone().push(async move { 3 });
                2
            });
            1
        });
        let mut seen = HashSet::default();
        while let Some(value) = outputs.next().await {
            seen.insert(value);
            // Close once everything expected has arrived so the loop can end.
            if seen.len() == 3 {
                outputs.close();
            }
        }
        assert_eq!(seen, HashSet::from_iter([1, 2, 3]));
    });
}
2198
// Two never-finishing tasks each own a sender of the same channel. After the
// stream is dropped, the receiver is expected to report end-of-stream, which
// requires both task-held senders to have been dropped.
#[test]
fn test_dropping_scope_stream_cancels_all_tasks() {
    let mut exec = SendExecutor::new(2);
    exec.run(async move {
        let (outputs, pusher) = ScopeStream::new();
        let (first_tx, mut rx) = mpsc::unbounded::<()>();
        let second_tx = first_tx.clone();
        pusher.push(async move {
            let _held = first_tx;
            let () = pending().await;
        });
        pusher.push(async move {
            let _held = second_tx;
            let () = pending().await;
        });
        drop(outputs);

        assert_eq!(rx.next().await, None);
    });
}
2220
// Builds a `ScopeStream` via `collect()` from an iterator of futures, once
// with plain async blocks and once with `SpawnableFuture`s, and checks that
// the stream yields every value in 0..10.
#[test]
fn test_scope_stream_collect() {
    let mut executor = SendExecutor::new(2);
    executor.run(async move {
        // A range is already an iterator, so it is mapped directly without `.into_iter()`.
        let stream: ScopeStream<_> = (0..10).map(|i| async move { i }).collect();
        assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));

        let stream: ScopeStream<_> =
            (0..10).map(|i| SpawnableFuture::new(async move { i })).collect();
        assert_eq!(stream.collect::<HashSet<u32>>().await, HashSet::from_iter(0..10));
    });
}
2233}