cm_util/task_group.rs
1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_async as fasync;
6use futures::Future;
7use std::fmt;
8use std::sync::{self, Arc, Weak};
9
10/// A simple wrapper for `TaskGroup` that stores the `TaskGroup` in an `Arc` so it can be passed
11/// between threads.
12#[derive(Clone)]
13pub struct TaskGroup {
14 task_group: Arc<sync::Mutex<Option<fasync::TaskGroup>>>,
15}
16
17impl fmt::Debug for TaskGroup {
18 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
19 f.debug_struct("TaskGroup").finish()
20 }
21}
22
23impl TaskGroup {
24 pub fn new() -> Self {
25 Self { task_group: Arc::new(sync::Mutex::new(Some(fasync::TaskGroup::new()))) }
26 }
27
28 /// Creates a new WeakTaskGroup from this group.
29 pub fn as_weak(&self) -> WeakTaskGroup {
30 WeakTaskGroup { task_group: Arc::downgrade(&self.task_group) }
31 }
32
33 /// Spawns a new task in this TaskGroup.
34 ///
35 /// If `join` has been called on a clone of this TaskGroup, `spawn` will drop the task instead.
36 ///
37 /// # Panics
38 ///
39 /// `spawn` may panic if not called in the context of an executor (e.g.
40 /// within a call to `run` or `run_singlethreaded`).
41 pub fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
42 let mut task_group = self.task_group.lock().unwrap();
43 if let Some(task_group) = task_group.as_mut() {
44 task_group.spawn(future);
45 }
46 }
47
48 /// Waits for all Tasks in this TaskGroup to finish. Prevents future tasks from being spawned
49 /// if there's another task that holds a clone of this TaskGroup.
50 pub async fn join(self) {
51 let task_group = {
52 let mut task_group_lock = self.task_group.lock().unwrap();
53 task_group_lock.take()
54 };
55 if let Some(task_group) = task_group {
56 task_group.join().await;
57 }
58 }
59}
60
61/// Holds a weak reference to the internal `TaskGroup`, and can spawn futures on it as long as the
62/// reference is still valid. If a task group is to hold a future that wants to spawn other tasks
63/// on the same group, this future should hold a WeakTaskGroup so that there is no reference cycle
64/// between the task group and tasks on the task group.
65#[derive(Debug, Clone)]
66pub struct WeakTaskGroup {
67 task_group: Weak<sync::Mutex<Option<fasync::TaskGroup>>>,
68}
69
70impl WeakTaskGroup {
71 /// Adds a task to the group this WeakTaskGroup was created from. The task is dropped if there
72 /// are no more strong references to the original task group.
73 pub fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
74 if let Some(task_group) = self.task_group.upgrade() {
75 let temp_task_group = TaskGroup { task_group };
76 temp_task_group.spawn(future);
77 }
78 }
79}