fuchsia_async/runtime/
scope.rs1pub use super::implementation::scope::{Scope, ScopeActiveGuard, ScopeHandle};
6
7#[cfg(target_os = "fuchsia")]
8pub use super::implementation::scope::{Join, ScopeStream, Spawnable};
9
10#[cfg(test)]
11mod tests {
12 use crate::{yield_now, JoinHandle, Scope, TestExecutor};
13 use assert_matches::assert_matches;
14 use futures::future::join_all;
15 use futures::stream::FuturesUnordered;
16 use futures::{join, FutureExt, StreamExt};
17 use std::future::{pending, poll_fn, Future};
18 use std::pin::{pin, Pin};
19 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20 use std::sync::Arc;
21 use std::task::{Context, Poll};
22
23 async fn poll_until_stalled<T>(future: impl Future<Output = T> + Unpin) -> Poll<T> {
25 #[cfg(target_os = "fuchsia")]
26 {
27 TestExecutor::poll_until_stalled(future).await
28 }
29
30 #[cfg(not(target_os = "fuchsia"))]
31 {
32 let mut future = future;
33 for _ in 0..10 {
34 if let Poll::Ready(result) = futures::poll!(&mut future) {
35 return Poll::Ready(result);
36 }
37 crate::yield_now().await;
38 }
39 Poll::Pending
40 }
41 }
42
43 fn new_scope() -> Scope {
44 #[cfg(target_os = "fuchsia")]
45 {
46 crate::EHandle::local().global_scope().new_child()
47 }
48
49 #[cfg(not(target_os = "fuchsia"))]
50 {
51 Scope::new_with_name("test")
52 }
53 }
54
55 fn abort_task(task: JoinHandle<()>) {
56 #[cfg(target_os = "fuchsia")]
57 {
58 drop(task.abort());
59 }
60
61 #[cfg(not(target_os = "fuchsia"))]
62 {
63 task.abort();
64 }
65 }
66
67 fn run_with_test_executor(fut: impl Future) {
68 #[cfg(target_os = "fuchsia")]
69 assert!(TestExecutor::new().run_until_stalled(&mut pin!(fut)).is_ready());
70
71 #[cfg(not(target_os = "fuchsia"))]
72 TestExecutor::new().run_singlethreaded(fut);
73 }
74
/// Checks that `on_no_tasks` stays pending while a task is outstanding and resolves for every
/// waiter once the last task is aborted, even though an active guard is still held.
#[test]
fn on_no_tasks() {
    run_with_test_executor(async {
        let scope = new_scope();
        let _finished_task = scope.spawn(std::future::ready(()));
        let stuck_task = scope.spawn(pending::<()>());

        // Held until the end of the test: the final assertion below passes with this guard
        // still alive.
        let _guard = scope.active_guard().unwrap();

        let mut first_waiter = pin!(scope.on_no_tasks());

        // One task never finishes, so the waiter cannot resolve yet.
        let first_poll = poll_until_stalled(&mut first_waiter).await;
        assert!(first_poll.is_pending());

        abort_task(stuck_task);

        // Waiters created after the abort must resolve alongside the earlier one.
        let second_waiter = pin!(scope.on_no_tasks());
        let third_waiter = pin!(scope.on_no_tasks());

        assert_matches!(
            poll_until_stalled(&mut join_all([first_waiter, second_waiter, third_waiter])).await,
            Poll::Ready(_)
        );
    });
}
100
/// Checks that `on_no_tasks_and_guards` stays pending while a task and an active guard are
/// outstanding, and resolves once both are gone, for each ordering listed in `Pass`.
#[test]
fn on_no_tasks_and_guards() {
    /// The sequence in which the outstanding task and guard are released.
    #[derive(Debug)]
    enum Pass {
        DropTaskFirst,
        DropGuardFirst,
        WakeTaskWithGuardAndThenDrop,
        WakeTaskWithGuardAndThenQuit,
    }

    for pass in [
        Pass::DropTaskFirst,
        Pass::DropGuardFirst,
        Pass::WakeTaskWithGuardAndThenDrop,
        Pass::WakeTaskWithGuardAndThenQuit,
    ] {
        run_with_test_executor(async {
            let scope = new_scope();
            let _finished_task = scope.spawn(std::future::ready(()));

            // A task that stays pending until `should_quit` is set.
            let should_quit = Arc::new(AtomicBool::new(false));
            let quit_flag = should_quit.clone();
            let gated_task = scope.spawn(poll_fn(move |_| {
                if !quit_flag.load(Ordering::Relaxed) {
                    return Poll::Pending;
                }
                Poll::Ready(())
            }));
            let guard = scope.active_guard().unwrap();

            let first_waiter = pin!(scope.on_no_tasks_and_guards());

            // The first waiter is polled through a `FuturesUnordered`.
            let mut waiters = FuturesUnordered::new();
            waiters.push(first_waiter);

            // Both a task and a guard are outstanding, so nothing can resolve yet.
            let initial_poll = poll_until_stalled(&mut waiters.next()).await;
            assert!(initial_poll.is_pending());

            match pass {
                Pass::DropTaskFirst => {
                    abort_task(gated_task);

                    // The guard is still held, so the waiter must remain pending.
                    let still_waiting = poll_until_stalled(&mut waiters.next()).await;
                    assert!(still_waiting.is_pending());

                    drop(guard);
                }
                Pass::DropGuardFirst => {
                    drop(guard);

                    // The task is still running, so the waiter must remain pending.
                    let still_waiting = poll_until_stalled(&mut waiters.next()).await;
                    assert!(still_waiting.is_pending());

                    abort_task(gated_task);
                }
                Pass::WakeTaskWithGuardAndThenDrop => {
                    drop(guard);

                    // Wake the task (presumably with a guard attached to the wake, going by
                    // the method name), then abort it without polling in between.
                    scope.wake_all_with_active_guard();
                    abort_task(gated_task);
                }
                Pass::WakeTaskWithGuardAndThenQuit => {
                    drop(guard);

                    // Let the task finish on its own the next time it is polled.
                    should_quit.store(true, Ordering::Relaxed);
                    scope.wake_all_with_active_guard();
                }
            }

            // Waiters registered after the release must resolve together with the first one.
            let second_waiter = pin!(scope.on_no_tasks_and_guards());
            let third_waiter = pin!(scope.on_no_tasks_and_guards());

            let mut all_resolved = pin!(async {
                join!(waiters.next(), second_waiter, third_waiter);
            });

            assert_matches!(
                poll_until_stalled(&mut all_resolved).await,
                Poll::Ready(_),
                "pass={pass:?}",
            );
        });
    }
}
196
/// Checks that `wake_all_with_active_guard` gets every task in the scope polled once more,
/// including when the scope is cancelled immediately after the wake.
#[test]
fn wake_all_with_active_guard() {
    run_with_test_executor(async {
        /// A future that never completes and counts how often it has been polled.
        struct PollCounter(Arc<AtomicU64>);

        impl Future for PollCounter {
            type Output = ();
            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
                self.0.fetch_add(1, Ordering::Relaxed);
                Poll::Pending
            }
        }

        let scope = new_scope();
        let polls = Arc::new(AtomicU64::new(0));

        scope.spawn(PollCounter(polls.clone()));
        scope.spawn(PollCounter(polls.clone()));

        // Let both tasks receive their initial poll before taking the baseline.
        let _ = poll_until_stalled(&mut pending::<()>()).await;

        let mut expected = polls.load(Ordering::Relaxed);

        // Each wake should result in exactly one additional poll per task.
        for _round in 0..2 {
            scope.wake_all_with_active_guard();
            let _ = poll_until_stalled(&mut pending::<()>()).await;
            expected += 2;
            assert_eq!(polls.load(Ordering::Relaxed), expected);
        }

        // Wake and then cancel straight away: both tasks are still polled once more, and the
        // cancellation completes afterwards.
        scope.wake_all_with_active_guard();
        let mut done = pin!(scope.cancel());
        let _ = poll_until_stalled(&mut pending::<()>()).await;
        assert_eq!(polls.load(Ordering::Relaxed), expected + 2);
        assert_eq!(poll_until_stalled(&mut done).await, Poll::Ready(()));
    });
}
236
/// Checks that cancelling a scope does not complete while an active guard is held, and
/// completes once the guard is dropped.
#[test]
fn active_guard_holds_cancellation() {
    run_with_test_executor(async {
        let scope = new_scope();
        let active_guard = scope.active_guard().expect("acquire guard");
        scope.spawn(futures::future::pending());

        let mut cancelled = pin!(scope.cancel());
        yield_now().await;

        // The guard is still alive, so the cancellation must not have finished.
        let early_result = cancelled.as_mut().now_or_never();
        assert!(early_result.is_none());

        drop(active_guard);
        cancelled.await;
    });
}
250
/// Checks that detaching a scope leaves its task alive: the spawned future still owns its
/// clone of the shared `Arc` afterwards.
#[test]
fn detach() {
    run_with_test_executor(async {
        let scope = Scope::new_with_name("detach");
        let active_guard = scope.active_guard().expect("acquire guard");
        let shared = Arc::new(());
        let task_copy = Arc::clone(&shared);

        // The task owns both the `Arc` clone and the guard, and never finishes.
        scope.spawn(async move {
            let _shared = task_copy;
            let _guard = active_guard;
            let () = futures::future::pending().await;
        });

        scope.detach();
        yield_now().await;

        // One reference here, one still held by the task.
        assert_eq!(Arc::strong_count(&shared), 2);
    });
}
268
/// Checks that awaiting `abort` on a scope drops its task, releasing the task's clone of the
/// shared `Arc` even though the task holds an active guard.
#[test]
fn abort() {
    run_with_test_executor(async {
        let scope = Scope::new_with_name("abort");
        let active_guard = scope.active_guard().expect("acquire guard");
        let shared = Arc::new(());
        let task_copy = Arc::clone(&shared);

        // The task owns both the `Arc` clone and the guard, and never finishes.
        scope.spawn(async move {
            let _shared = task_copy;
            let _guard = active_guard;
            let () = futures::future::pending().await;
        });

        scope.clone().abort().await;

        // Only the reference held by this test remains.
        assert_eq!(Arc::strong_count(&shared), 1);
    });
}
285}