cm_util/
task_group.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_async as fasync;
6use futures::Future;
7use std::fmt;
8use std::sync::{self, Arc, Weak};
9
10/// A simple wrapper for `TaskGroup` that stores the `TaskGroup` in an `Arc` so it can be passed
11/// between threads.
12#[derive(Clone)]
13pub struct TaskGroup {
14    task_group: Arc<sync::Mutex<Option<fasync::TaskGroup>>>,
15}
16
17impl fmt::Debug for TaskGroup {
18    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
19        f.debug_struct("TaskGroup").finish()
20    }
21}
22
23impl TaskGroup {
24    pub fn new() -> Self {
25        Self { task_group: Arc::new(sync::Mutex::new(Some(fasync::TaskGroup::new()))) }
26    }
27
28    /// Creates a new WeakTaskGroup from this group.
29    pub fn as_weak(&self) -> WeakTaskGroup {
30        WeakTaskGroup { task_group: Arc::downgrade(&self.task_group) }
31    }
32
33    /// Spawns a new task in this TaskGroup.
34    ///
35    /// If `join` has been called on a clone of this TaskGroup, `spawn` will drop the task instead.
36    ///
37    /// # Panics
38    ///
39    /// `spawn` may panic if not called in the context of an executor (e.g.
40    /// within a call to `run` or `run_singlethreaded`).
41    pub fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
42        let mut task_group = self.task_group.lock().unwrap();
43        if let Some(task_group) = task_group.as_mut() {
44            task_group.spawn(future);
45        }
46    }
47
48    /// Waits for all Tasks in this TaskGroup to finish. Prevents future tasks from being spawned
49    /// if there's another task that holds a clone of this TaskGroup.
50    pub async fn join(self) {
51        let task_group = {
52            let mut task_group_lock = self.task_group.lock().unwrap();
53            task_group_lock.take()
54        };
55        if let Some(task_group) = task_group {
56            task_group.join().await;
57        }
58    }
59}
60
61/// Holds a weak reference to the internal `TaskGroup`, and can spawn futures on it as long as the
62/// reference is still valid. If a task group is to hold a future that wants to spawn other tasks
63/// on the same group, this future should hold a WeakTaskGroup so that there is no reference cycle
64/// between the task group and tasks on the task group.
65#[derive(Debug, Clone)]
66pub struct WeakTaskGroup {
67    task_group: Weak<sync::Mutex<Option<fasync::TaskGroup>>>,
68}
69
70impl WeakTaskGroup {
71    /// Adds a task to the group this WeakTaskGroup was created from. The task is dropped if there
72    /// are no more strong references to the original task group.
73    pub fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
74        if let Some(task_group) = self.task_group.upgrade() {
75            let temp_task_group = TaskGroup { task_group };
76            temp_task_group.spawn(future);
77        }
78    }
79}