test_manager_lib/
scheduler.rs1use crate::debug_data_processor::DebugDataSender;
6use crate::self_diagnostics::DiagnosticNode;
7use crate::test_suite::{self, Suite};
8use async_trait::async_trait;
9use futures::channel::oneshot;
10use futures::stream::{self, StreamExt};
11use std::sync::atomic::{AtomicU16, Ordering};
12
13#[async_trait]
14pub(crate) trait Scheduler {
15 async fn execute(
27 &self,
28 suites: Vec<test_suite::Suite>,
29 diagnostics: DiagnosticNode,
30 stop_recv: &mut oneshot::Receiver<()>,
31 debug_data_sender: DebugDataSender,
32 );
33}
34
35#[async_trait]
36pub(crate) trait RunSuiteFn {
37 async fn run_suite(
42 &self,
43 suite: Suite,
44 debug_data_sender: DebugDataSender,
45 diagnostics: DiagnosticNode,
46 );
47}
48
49pub struct SerialScheduler {}
50
51#[async_trait]
52impl Scheduler for SerialScheduler {
53 async fn execute(
54 &self,
55 suites: Vec<test_suite::Suite>,
56 diagnostics: DiagnosticNode,
57 stop_recv: &mut oneshot::Receiver<()>,
58 debug_data_sender: DebugDataSender,
59 ) {
60 for (suite_idx, suite) in suites.into_iter().enumerate() {
62 if let Ok(Some(())) = stop_recv.try_recv() {
65 break;
66 }
67 let instance_name = format!("suite-{:?}", suite_idx);
68 let suite_node = diagnostics.child(instance_name);
69 suite_node.set_property("url", suite.test_url.clone());
70 test_suite::run_single_suite(suite, debug_data_sender.clone(), suite_node).await;
71 }
72 }
73}
74
75pub(crate) struct ParallelScheduler<T: RunSuiteFn> {
76 pub suite_runner: T,
77 pub max_parallel_suites: u16,
78}
79
80pub(crate) struct RunSuiteObj {}
81
82#[async_trait]
83impl RunSuiteFn for RunSuiteObj {
84 async fn run_suite(
85 &self,
86 suite: Suite,
87 debug_data_sender: DebugDataSender,
88 diagnostics: DiagnosticNode,
89 ) {
90 test_suite::run_single_suite(suite, debug_data_sender, diagnostics).await;
91 }
92}
93
94#[async_trait]
95impl<T: RunSuiteFn + std::marker::Sync + std::marker::Send> Scheduler for ParallelScheduler<T> {
96 async fn execute(
97 &self,
98 suites: Vec<test_suite::Suite>,
99 diagnostics: DiagnosticNode,
100 _stop_recv: &mut oneshot::Receiver<()>,
101 debug_data_sender: DebugDataSender,
102 ) {
103 const MAX_PARALLEL_SUITES_DEFAULT: usize = 8;
104 let mut max_parallel_suites = self.max_parallel_suites as usize;
105
106 max_parallel_suites =
112 if max_parallel_suites > 0 { max_parallel_suites } else { MAX_PARALLEL_SUITES_DEFAULT };
113 let suite_idx = AtomicU16::new(0);
114 let suite_idx_ref = &suite_idx;
115 let debug_data_sender_ref = &debug_data_sender;
116 let diagnostics_ref = &diagnostics;
117 stream::iter(suites)
118 .for_each_concurrent(max_parallel_suites, |suite| async move {
119 let suite_idx_local = suite_idx_ref.fetch_add(1, Ordering::Relaxed);
120 let instance_name = format!("suite-{:?}", suite_idx_local);
121 let suite_node = diagnostics_ref.child(instance_name);
122 suite_node.set_property("url", suite.test_url.clone());
123 self.suite_runner.run_suite(suite, debug_data_sender_ref.clone(), suite_node).await;
124 })
125 .await;
126 }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use crate::debug_data_processor::{DebugDataDirectory, DebugDataProcessor};
    use crate::{facet, AboveRootCapabilitiesForTest};
    use async_trait::async_trait;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_test_manager::{RunOptions, SuiteControllerMarker};
    use std::sync::{Arc, Mutex};
    use {fidl_fuchsia_component_resolution as fresolution, fidl_fuchsia_pkg as fpkg};

    /// Builds a `Suite` for `test_url` that is only meant to be handed to a fake runner.
    ///
    /// The controller proxy and both resolver request streams are bound to locals that go out
    /// of scope when this function returns, so nothing serves the other end of those channels.
    async fn create_fake_suite(test_url: String) -> Suite {
        let (_controller_proxy, controller_stream) =
            create_proxy_and_stream::<SuiteControllerMarker>();
        let (resolver_proxy, _resolver_stream) =
            create_proxy_and_stream::<fresolution::ResolverMarker>();
        let (pkg_resolver_proxy, _pkg_resolver_stream) =
            create_proxy_and_stream::<fpkg::PackageResolverMarker>();
        let resolver_proxy = Arc::new(resolver_proxy);
        let pkg_resolver_proxy = Arc::new(pkg_resolver_proxy);
        let routing_info = Arc::new(AboveRootCapabilitiesForTest::new_empty_for_tests());
        Suite {
            realm: None,
            test_url,
            options: RunOptions {
                parallel: None,
                arguments: None,
                run_disabled_tests: Some(false),
                timeout: None,
                case_filters_to_run: None,
                log_iterator: None,
                ..Default::default()
            },
            controller: controller_stream,
            resolver: resolver_proxy,
            pkg_resolver: pkg_resolver_proxy,
            above_root_capabilities_for_test: routing_info,
            facets: facet::ResolveStatus::Unresolved,
        }
    }

    /// Fake runner that records the URL of every suite it is asked to run.
    struct RunSuiteObjForTests {
        /// URLs of the suites passed to `run_suite`, in the order the calls were made.
        test_vec: Arc<Mutex<Vec<String>>>,
    }

    // Implemented for a reference so the test can keep ownership of the fake and inspect
    // `test_vec` after the scheduler has finished.
    #[async_trait]
    impl RunSuiteFn for &RunSuiteObjForTests {
        /// Appends the suite's URL to `test_vec`; the suite itself is never executed.
        async fn run_suite(
            &self,
            suite: Suite,
            _debug_data_sender: DebugDataSender,
            _diagnostics: DiagnosticNode,
        ) {
            // Lock through the existing `Arc`; cloning it first would only bump the refcount.
            self.test_vec.lock().expect("expected locked").push(suite.test_url);
        }
    }

    /// `ParallelScheduler::execute` must hand every suite to the runner exactly once.
    #[fuchsia::test]
    async fn parallel_executor_runs_all_tests() {
        const SUITE_URLS: [&str; 3] = ["suite_1", "suite_2", "suite_3"];

        let mut suite_vec = Vec::with_capacity(SUITE_URLS.len());
        for url in SUITE_URLS.iter() {
            suite_vec.push(create_fake_suite(url.to_string()).await);
        }

        let suite_runner = RunSuiteObjForTests { test_vec: Arc::new(Mutex::new(vec![])) };
        let parallel_executor =
            ParallelScheduler { suite_runner: &suite_runner, max_parallel_suites: 8 };

        let diagnostics = DiagnosticNode::new(
            "root",
            Arc::new(fuchsia_inspect::component::inspector().root().clone_weak()),
        );

        let sender =
            DebugDataProcessor::new_for_test(DebugDataDirectory::Isolated { parent: "/tmp" })
                .sender;

        // The sender half is kept alive for the whole test so the channel is not cancelled.
        let (_stop_sender, mut stop_recv) = oneshot::channel::<()>();

        parallel_executor.execute(suite_vec, diagnostics, &mut stop_recv, sender).await;

        // Take the lock once and check the complete record: the right number of runs, and
        // every expected URL present. Together these rule out missing and duplicated runs.
        let recorded = suite_runner.test_vec.lock().expect("expected locked");
        assert_eq!(
            recorded.len(),
            SUITE_URLS.len(),
            "every suite should run exactly once, got {:?}",
            &*recorded
        );
        for url in SUITE_URLS.iter() {
            assert!(recorded.contains(&url.to_string()), "{} was not run", url);
        }
    }
}