fuchsia_async/runtime/
task_group.rs1use futures::Future;
6
7use super::Scope;
8
9pub struct TaskGroup {
16 scope: Scope,
17}
18
19impl Default for TaskGroup {
20 fn default() -> Self {
21 Self::new()
22 }
23}
24
25impl TaskGroup {
26 pub fn new() -> Self {
31 #[cfg(target_os = "fuchsia")]
32 return Self { scope: Scope::global().new_child() };
33 #[cfg(not(target_os = "fuchsia"))]
34 return Self { scope: Scope::new() };
35 }
36
37 pub fn spawn(&mut self, future: impl Future<Output = ()> + Send + 'static) {
46 self.scope.spawn(future);
47 }
48
49 pub fn local(&mut self, future: impl Future<Output = ()> + 'static) {
56 self.scope.spawn_local(future);
57 }
58
59 pub async fn join(self) {
63 self.scope.on_no_tasks().await;
64 }
65}
66
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{SendExecutor, Task};
    use futures::channel::mpsc;
    use futures::StreamExt;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    /// Guard that announces its own destruction on the `done` channel.
    #[derive(Clone)]
    struct DoneSignaler {
        done: mpsc::UnboundedSender<()>,
    }

    impl Drop for DoneSignaler {
        fn drop(&mut self) {
            // Send a single unit message, then detach this sender from the channel.
            self.done.unbounded_send(()).unwrap();
            self.done.disconnect();
        }
    }

    /// Hands out [`DoneSignaler`] guards and lets a test wait until all of
    /// them have been dropped.
    struct WaitGroup {
        tx: mpsc::UnboundedSender<()>,
        rx: mpsc::UnboundedReceiver<()>,
    }

    impl WaitGroup {
        /// Creates a wait group around a fresh unbounded channel.
        fn new() -> Self {
            let (sender, receiver) = mpsc::unbounded();
            Self { tx: sender, rx: receiver }
        }

        /// Returns a guard holding a clone of the sender; it signals when dropped.
        fn add_one(&self) -> impl Drop {
            let done = self.tx.clone();
            DoneSignaler { done }
        }

        /// Drops the group's own sender, then drains the receiver until the
        /// stream ends.
        async fn wait(self) {
            let Self { tx, rx } = self;
            drop(tx);
            rx.collect::<()>().await;
        }
    }

    #[test]
    fn test_task_group_join_waits_for_tasks() {
        let task_count = 20;

        SendExecutor::new(task_count).run(async move {
            let mut group = TaskGroup::new();
            let counter = Arc::new(AtomicU64::new(0));

            // Every task bumps the shared counter exactly once.
            for _ in 0..task_count {
                let counter = Arc::clone(&counter);
                group.spawn(async move {
                    counter.fetch_add(1, Ordering::Relaxed);
                });
            }

            group.join().await;

            // After `join`, every increment must be visible.
            let observed = counter.load(Ordering::Relaxed);
            assert_eq!(observed, task_count as u64);
        });
    }

    #[test]
    fn test_task_group_empty_join_completes() {
        SendExecutor::new(1).run(async move {
            let group = TaskGroup::new();
            group.join().await;
        });
    }

    #[test]
    fn test_task_group_added_tasks_are_cancelled_on_drop() {
        let wait_group = WaitGroup::new();
        let task_count = 10;

        SendExecutor::new(task_count).run(async move {
            let mut group = TaskGroup::new();
            for _ in 0..task_count {
                let guard = wait_group.add_one();

                group.spawn(async move {
                    // Keep the guard inside a future that never completes, so the
                    // guard is only dropped when the future itself is dropped.
                    let _guard = guard;
                    std::future::pending::<()>().await;
                });
            }

            // `wait` only returns once every guard has been dropped, so this
            // test finishes only if dropping the group drops the spawned futures.
            drop(group);
            wait_group.wait().await;
        });
    }

    #[test]
    fn test_task_group_spawn() {
        let task_count = 3;
        SendExecutor::new(task_count).run(async move {
            let mut group = TaskGroup::new();

            // A future that is already complete.
            group.spawn(std::future::ready(()));

            // An `async` block.
            group.spawn(async move {
                std::future::ready(()).await;
            });

            // The value returned by `Task::spawn`, passed as the future.
            group.spawn(Task::spawn(std::future::ready(())));

            group.join().await;
        });
    }
}