fuchsia_async/runtime/
instrument.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Pluggable instrumentation for the async executor.
6
7use crate::ScopeHandle;
8pub use crate::runtime::fuchsia::executor::atomic_future::AtomicFutureHandle;
9use std::any::Any;
10
11/// A trait for instrumenting futures.
12///
13/// This trait provides a way to receive callbacks for various events that occur
14/// for a future, such as completion, and polling.
15pub trait Hooks {
16    /// Called when the task has completed.
17    fn task_completed(&mut self);
18
19    /// Called when the task is about to be polled.
20    fn task_poll_start(&mut self);
21
22    /// Called when the task has finished being polled.
23    fn task_poll_end(&mut self);
24}
25
26/// A trait for instrumenting the async executor.
27///
28/// This trait provides a way to receive callbacks for various events that occur
29/// within the executor, such as task creation, completion, and polling.
30pub trait TaskInstrument: Send + Sync + 'static {
31    /// Called when a new task is created.
32    /// Typically, implementers will want to call `task.add_hooks()` here
33    /// to add hooks to the task.
34    fn task_created<'a>(&self, parent_scope: &ScopeHandle, task: &mut AtomicFutureHandle<'a>);
35
36    /// Called when scope is created
37    ///
38    /// # Arguments
39    ///
40    /// * `scope_name`: An optional name for the scope.
41    /// * `parent_scope`: A reference to the parent scope, or None for the root.
42    ///
43    /// # Returns
44    ///
45    /// A boxed `Any` trait object representing the created scope
46    /// which contains data that can later be retrieved from the
47    /// scope using instrument_data() on the scope.
48    fn scope_created(
49        &self,
50        scope_name: &str,
51        parent_scope: Option<&ScopeHandle>,
52    ) -> Box<dyn Any + Send + Sync>;
53}
54
55#[cfg(test)]
56mod tests {
57    use super::*;
58    use crate::{Scope, ScopeHandle, SendExecutorBuilder, yield_now};
59    use std::any::Any;
60    use std::sync::atomic::{AtomicUsize, Ordering};
61    use std::sync::{Arc, Mutex};
62
63    // Instrumentation to track scope associations
64    struct TrackedTask {
65        poll_count: AtomicUsize,
66        poll_end_count: AtomicUsize,
67        completed: AtomicUsize,
68    }
69
70    struct TrackedScope {
71        name: String,
72        tasks: Mutex<Vec<Arc<TrackedTask>>>,
73        scopes: Mutex<Vec<Arc<TrackedScope>>>,
74    }
75
76    struct ScopeTrackingInstrument {
77        scopes: Mutex<Vec<Arc<TrackedScope>>>,
78    }
79
80    impl ScopeTrackingInstrument {
81        fn new() -> Self {
82            Self { scopes: Mutex::new(Vec::new()) }
83        }
84
85        fn get_scopes(&self) -> Vec<Arc<TrackedScope>> {
86            self.scopes.lock().unwrap().clone()
87        }
88    }
89
90    struct TrackedHooks {
91        task: Arc<TrackedTask>,
92    }
93
94    impl Hooks for TrackedHooks {
95        fn task_completed(&mut self) {
96            // Relaxed ordering is fine because we hold a mut reference,
97            // so nothing else can mutate this (though because we're
98            // technically sharing this state with the test main,
99            // the borrow checker can't reason about this,
100            // making either unsafe or atomics necessary.)
101            self.task.completed.fetch_add(1, Ordering::Relaxed);
102        }
103
104        fn task_poll_end(&mut self) {
105            assert_eq!(
106                self.task.poll_count.load(Ordering::Relaxed) - 1,
107                self.task.poll_end_count.load(Ordering::Relaxed)
108            );
109            self.task.poll_end_count.fetch_add(1, Ordering::Relaxed);
110        }
111
112        fn task_poll_start(&mut self) {
113            self.task.poll_count.fetch_add(1, Ordering::Relaxed);
114        }
115    }
116
117    impl TaskInstrument for ScopeTrackingInstrument {
118        fn task_created<'a>(
119            &self,
120            parent_scope: &ScopeHandle,
121            handle: &mut AtomicFutureHandle<'a>,
122        ) {
123            // Extract scope name from the parent scope
124            let parent = parent_scope
125                .instrument_data()
126                .unwrap()
127                .downcast_ref::<Arc<TrackedScope>>()
128                .unwrap()
129                .clone();
130            let task = Arc::new(TrackedTask {
131                poll_count: AtomicUsize::new(0),
132                completed: AtomicUsize::new(0),
133                poll_end_count: AtomicUsize::new(0),
134            });
135
136            // Add task to scope
137            let mut tasks = parent.tasks.lock().unwrap();
138            tasks.push(task.clone());
139
140            handle.add_hooks(TrackedHooks { task });
141        }
142
143        fn scope_created(
144            &self,
145            scope_name: &str,
146            parent_scope: Option<&ScopeHandle>,
147        ) -> Box<dyn Any + Send + Sync> {
148            let tracked_scope = Arc::new(TrackedScope {
149                name: scope_name.to_string(),
150                tasks: Default::default(),
151                scopes: Default::default(),
152            });
153            // Extract parent scope
154            if let Some(parent_handle) = parent_scope {
155                if let Some(parent_scope) = parent_handle
156                    .instrument_data()
157                    .and_then(|data| data.downcast_ref::<Arc<TrackedScope>>())
158                {
159                    parent_scope.scopes.lock().unwrap().push(tracked_scope.clone());
160                }
161            }
162
163            self.scopes.lock().unwrap().push(tracked_scope.clone());
164
165            Box::new(tracked_scope)
166        }
167    }
168
169    #[test]
170    fn test_global_spawn_with_scope() {
171        let instrumentation = Arc::new(ScopeTrackingInstrument::new());
172
173        let mut executor = SendExecutorBuilder::new()
174            .num_threads(4)
175            .instrument(Some(instrumentation.clone()))
176            .build();
177        executor.run(async move {
178            let root_scope = Scope::new_with_name("test_root");
179
180            // Create a hierarchy of scopes
181            let level2_scope = root_scope.new_child_with_name("level2");
182            let level3_scope = level2_scope.new_child_with_name("level3".to_string());
183
184            // Spawn tasks in different scopes
185            root_scope.spawn(async {});
186
187            level2_scope.spawn(async {
188                yield_now().await; // Multiple polls
189            });
190
191            level3_scope.spawn(async {});
192
193            level2_scope.spawn(async {});
194
195            level3_scope.spawn(async {});
196
197            // Wait for all tasks to complete
198            root_scope.await;
199        });
200
201        // Verify the hierarchy
202        let scopes = instrumentation.get_scopes();
203        assert_eq!(scopes.len(), 4);
204
205        // The Fuchsia executor creates its own scope called "root",
206        // which is the true root scope here. All other scopes are
207        // children under that one.
208        let root_scope = &scopes[0];
209        assert_eq!(root_scope.name, "root".to_string());
210        assert_eq!(root_scope.tasks.lock().unwrap().len(), 1);
211        assert_eq!(root_scope.scopes.lock().unwrap().len(), 1);
212
213        let test_root_scope = &root_scope.scopes.lock().unwrap()[0];
214        assert_eq!(test_root_scope.name, "test_root".to_string());
215        assert_eq!(test_root_scope.tasks.lock().unwrap().len(), 1);
216        assert_eq!(test_root_scope.scopes.lock().unwrap().len(), 1);
217
218        let level2_scope = &test_root_scope.scopes.lock().unwrap()[0];
219        assert_eq!(level2_scope.name, "level2".to_string());
220        assert_eq!(level2_scope.tasks.lock().unwrap().len(), 2);
221        assert_eq!(level2_scope.scopes.lock().unwrap().len(), 1);
222
223        let level3_scope = &level2_scope.scopes.lock().unwrap()[0];
224        assert_eq!(level3_scope.name, "level3".to_string());
225        assert_eq!(level3_scope.tasks.lock().unwrap().len(), 2);
226        assert_eq!(level3_scope.scopes.lock().unwrap().len(), 0);
227
228        // Assert poll counts
229        let root_tasks = root_scope.tasks.lock().unwrap();
230        // We can't assert the number of polls for the root task,
231        // as that is nondeterministic on a multithreaded executor.
232        assert_eq!(root_tasks[0].completed.load(Ordering::Relaxed), 1);
233
234        let test_root_tasks = test_root_scope.tasks.lock().unwrap();
235        assert_eq!(test_root_tasks[0].poll_count.load(Ordering::Relaxed), 1);
236        assert_eq!(test_root_tasks[0].completed.load(Ordering::Relaxed), 1);
237
238        let level2_tasks = level2_scope.tasks.lock().unwrap();
239        assert_eq!(level2_tasks[0].poll_count.load(Ordering::Relaxed), 2);
240        assert_eq!(level2_tasks[0].completed.load(Ordering::Relaxed), 1);
241        assert_eq!(level2_tasks[1].poll_count.load(Ordering::Relaxed), 1);
242        assert_eq!(level2_tasks[1].completed.load(Ordering::Relaxed), 1);
243
244        let level3_tasks = level3_scope.tasks.lock().unwrap();
245        assert_eq!(level3_tasks[0].poll_count.load(Ordering::Relaxed), 1);
246        assert_eq!(level3_tasks[0].completed.load(Ordering::Relaxed), 1);
247        assert_eq!(level3_tasks[1].poll_count.load(Ordering::Relaxed), 1);
248        assert_eq!(level3_tasks[1].completed.load(Ordering::Relaxed), 1);
249    }
250}