fuchsia_async/runtime/
scope.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub use super::implementation::scope::{Scope, ScopeActiveGuard, ScopeHandle};
6
7#[cfg(target_os = "fuchsia")]
8pub use super::implementation::scope::{Join, ScopeStream, Spawnable};
9
10#[cfg(test)]
11mod tests {
12    use crate::{yield_now, JoinHandle, Scope, TestExecutor};
13    use assert_matches::assert_matches;
14    use futures::future::join_all;
15    use futures::stream::FuturesUnordered;
16    use futures::{join, FutureExt, StreamExt};
17    use std::future::{pending, poll_fn, Future};
18    use std::pin::{pin, Pin};
19    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
20    use std::sync::Arc;
21    use std::task::{Context, Poll};
22
23    // Tokio doesn't have the equivalent of poll_until_stalled. This has a crude workaround.
24    async fn poll_until_stalled<T>(future: impl Future<Output = T> + Unpin) -> Poll<T> {
25        #[cfg(target_os = "fuchsia")]
26        {
27            TestExecutor::poll_until_stalled(future).await
28        }
29
30        #[cfg(not(target_os = "fuchsia"))]
31        {
32            let mut future = future;
33            for _ in 0..10 {
34                if let Poll::Ready(result) = futures::poll!(&mut future) {
35                    return Poll::Ready(result);
36                }
37                crate::yield_now().await;
38            }
39            Poll::Pending
40        }
41    }
42
43    fn new_scope() -> Scope {
44        #[cfg(target_os = "fuchsia")]
45        {
46            crate::EHandle::local().global_scope().new_child()
47        }
48
49        #[cfg(not(target_os = "fuchsia"))]
50        {
51            Scope::new_with_name("test")
52        }
53    }
54
55    fn abort_task(task: JoinHandle<()>) {
56        #[cfg(target_os = "fuchsia")]
57        {
58            drop(task.abort());
59        }
60
61        #[cfg(not(target_os = "fuchsia"))]
62        {
63            task.abort();
64        }
65    }
66
67    fn run_with_test_executor(fut: impl Future) {
68        #[cfg(target_os = "fuchsia")]
69        assert!(TestExecutor::new().run_until_stalled(&mut pin!(fut)).is_ready());
70
71        #[cfg(not(target_os = "fuchsia"))]
72        TestExecutor::new().run_singlethreaded(fut);
73    }
74
75    #[test]
76    fn on_no_tasks() {
77        run_with_test_executor(async {
78            let scope = new_scope();
79            let _task1 = scope.spawn(std::future::ready(()));
80            let task2 = scope.spawn(pending::<()>());
81
82            // A guard shouldn't stop on_no_tasks from working.
83            let _guard = scope.active_guard().unwrap();
84
85            let mut on_no_tasks = pin!(scope.on_no_tasks());
86
87            assert!(poll_until_stalled(&mut on_no_tasks).await.is_pending());
88
89            abort_task(task2);
90
91            let on_no_tasks2 = pin!(scope.on_no_tasks());
92            let on_no_tasks3 = pin!(scope.on_no_tasks());
93
94            assert_matches!(
95                poll_until_stalled(&mut join_all([on_no_tasks, on_no_tasks2, on_no_tasks3])).await,
96                Poll::Ready(_)
97            );
98        });
99    }
100
101    #[test]
102    fn on_no_tasks_and_guards() {
103        #[derive(Debug)]
104        enum Pass {
105            DropTaskFirst,
106            DropGuardFirst,
107            WakeTaskWithGuardAndThenDrop,
108            WakeTaskWithGuardAndThenQuit,
109        }
110
111        for pass in [
112            Pass::DropTaskFirst,
113            Pass::DropGuardFirst,
114            Pass::WakeTaskWithGuardAndThenDrop,
115            Pass::WakeTaskWithGuardAndThenQuit,
116        ] {
117            run_with_test_executor(async {
118                let scope = new_scope();
119                let _task1 = scope.spawn(std::future::ready(()));
120                let should_quit = Arc::new(AtomicBool::new(false));
121                let should_quit_clone = should_quit.clone();
122                let task2 = scope.spawn(poll_fn(move |_| {
123                    if should_quit_clone.load(Ordering::Relaxed) {
124                        Poll::Ready(())
125                    } else {
126                        Poll::Pending
127                    }
128                }));
129                let guard = scope.active_guard().unwrap();
130
131                let on_no_tasks = pin!(scope.on_no_tasks_and_guards());
132
133                // Use FuturesUnordered so that it uses its own waker.
134                let mut futures_unordered = FuturesUnordered::new();
135                futures_unordered.push(on_no_tasks);
136
137                // Pending because there's task2 and a guard.
138                assert!(poll_until_stalled(&mut futures_unordered.next()).await.is_pending());
139
140                match pass {
141                    Pass::DropTaskFirst => {
142                        // Drop the task first.
143                        abort_task(task2);
144
145                        // Still pending because there's a guard.
146                        assert!(poll_until_stalled(&mut futures_unordered.next())
147                            .await
148                            .is_pending());
149
150                        drop(guard);
151                    }
152                    Pass::DropGuardFirst => {
153                        // Drop the guard first.
154                        drop(guard);
155
156                        // Still pending because there's a guard.
157                        assert!(poll_until_stalled(&mut futures_unordered.next())
158                            .await
159                            .is_pending());
160
161                        abort_task(task2);
162                    }
163                    Pass::WakeTaskWithGuardAndThenDrop => {
164                        // Drop the guard first.
165                        drop(guard);
166
167                        // This time wake the second task with an active guard, but then immediately
168                        // drop the task.
169                        scope.wake_all_with_active_guard();
170                        abort_task(task2);
171                    }
172                    Pass::WakeTaskWithGuardAndThenQuit => {
173                        // Drop the guard first.
174                        drop(guard);
175
176                        // Wake the task with an active guard, and make it quit.
177                        should_quit.store(true, Ordering::Relaxed);
178                        scope.wake_all_with_active_guard();
179                    }
180                }
181
182                let on_no_tasks2 = pin!(scope.on_no_tasks_and_guards());
183                let on_no_tasks3 = pin!(scope.on_no_tasks_and_guards());
184
185                assert_matches!(
186                    poll_until_stalled(&mut pin!(async {
187                        join!(futures_unordered.next(), on_no_tasks2, on_no_tasks3);
188                    }))
189                    .await,
190                    Poll::Ready(_),
191                    "pass={pass:?}",
192                );
193            });
194        }
195    }
196
197    #[test]
198    fn wake_all_with_active_guard() {
199        run_with_test_executor(async {
200            let scope = new_scope();
201
202            let poll_count = Arc::new(AtomicU64::new(0));
203
204            struct PollCounter(Arc<AtomicU64>);
205
206            impl Future for PollCounter {
207                type Output = ();
208                fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
209                    self.0.fetch_add(1, Ordering::Relaxed);
210                    Poll::Pending
211                }
212            }
213
214            scope.spawn(PollCounter(poll_count.clone()));
215            scope.spawn(PollCounter(poll_count.clone()));
216
217            let _ = poll_until_stalled(&mut pending::<()>()).await;
218
219            let mut start_count = poll_count.load(Ordering::Relaxed);
220
221            for _ in 0..2 {
222                scope.wake_all_with_active_guard();
223                let _ = poll_until_stalled(&mut pending::<()>()).await;
224                assert_eq!(poll_count.load(Ordering::Relaxed), start_count + 2);
225                start_count += 2;
226            }
227
228            // Wake, then cancel the scope and verify the tasks still get polled.
229            scope.wake_all_with_active_guard();
230            let mut done = pin!(scope.cancel());
231            let _ = poll_until_stalled(&mut pending::<()>()).await;
232            assert_eq!(poll_count.load(Ordering::Relaxed), start_count + 2);
233            assert_eq!(poll_until_stalled(&mut done).await, Poll::Ready(()));
234        });
235    }
236
237    #[test]
238    fn active_guard_holds_cancellation() {
239        run_with_test_executor(async {
240            let scope = new_scope();
241            let guard = scope.active_guard().expect("acquire guard");
242            scope.spawn(futures::future::pending());
243            let mut join = pin!(scope.cancel());
244            yield_now().await;
245            assert!(join.as_mut().now_or_never().is_none());
246            drop(guard);
247            join.await;
248        });
249    }
250
251    #[test]
252    fn detach() {
253        run_with_test_executor(async {
254            let scope = Scope::new_with_name("detach");
255            let guard = scope.active_guard().expect("acquire guard");
256            let shared = Arc::new(());
257            let shared_copy = shared.clone();
258            scope.spawn(async move {
259                let _shared = shared_copy;
260                let _guard = guard;
261                let () = futures::future::pending().await;
262            });
263            scope.detach();
264            yield_now().await;
265            assert_eq!(Arc::strong_count(&shared), 2);
266        });
267    }
268
269    #[test]
270    fn abort() {
271        run_with_test_executor(async {
272            let scope = Scope::new_with_name("abort");
273            let guard = scope.active_guard().expect("acquire guard");
274            let shared = Arc::new(());
275            let shared_copy = shared.clone();
276            scope.spawn(async move {
277                let _shared = shared_copy;
278                let _guard = guard;
279                let () = futures::future::pending().await;
280            });
281            scope.clone().abort().await;
282            assert_eq!(Arc::strong_count(&shared), 1);
283        });
284    }
285}