test_manager_lib/
scheduler.rs

// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5use crate::debug_data_processor::DebugDataSender;
6use crate::self_diagnostics::DiagnosticNode;
7use crate::test_suite::{self, Suite};
8use async_trait::async_trait;
9use futures::channel::oneshot;
10use futures::stream::{self, StreamExt};
11use std::sync::atomic::{AtomicU16, Ordering};
12
13#[async_trait]
14pub(crate) trait Scheduler {
15    /// This function schedules and executes the provided collection
16    /// of test suites. This allows objects that implement the
17    /// Scheduler trait to define their own test suite scheduling
18    /// algorithm. Inputs:
19    ///     - &self
20    ///     - suites: a collection of suites to schedule and execute
21    ///     - stop_recv: Receiving end of a channel that receives messages to attempt to stop the
22    ///                  test run. Scheduler::execute should check for stop messages over
23    ///                  this channel and try to terminate the test run gracefully.
24    ///     - run_id: an id that identifies the test run.
25    ///     - debug_data_sender: used to send debug data VMOs for processing
26    async fn execute(
27        &self,
28        suites: Vec<test_suite::Suite>,
29        diagnostics: DiagnosticNode,
30        stop_recv: &mut oneshot::Receiver<()>,
31        debug_data_sender: DebugDataSender,
32    );
33}
34
35#[async_trait]
36pub(crate) trait RunSuiteFn {
37    /// This function allows us to specify what function we want the
38    /// parallel scheduler to invoke to run a single suite.
39    /// This trait was added for testing purposes, specifically to add the
40    /// ability to mock test_suite::run_single_suite in test::parallel_executor_test.
41    async fn run_suite(
42        &self,
43        suite: Suite,
44        debug_data_sender: DebugDataSender,
45        diagnostics: DiagnosticNode,
46    );
47}
48
49pub struct SerialScheduler {}
50
51#[async_trait]
52impl Scheduler for SerialScheduler {
53    async fn execute(
54        &self,
55        suites: Vec<test_suite::Suite>,
56        diagnostics: DiagnosticNode,
57        stop_recv: &mut oneshot::Receiver<()>,
58        debug_data_sender: DebugDataSender,
59    ) {
60        // run test suites serially for now
61        for (suite_idx, suite) in suites.into_iter().enumerate() {
62            // only check before running the test. We should complete the test run for
63            // running tests, if stop is called.
64            if let Ok(Some(())) = stop_recv.try_recv() {
65                break;
66            }
67            let instance_name = format!("suite-{:?}", suite_idx);
68            let suite_node = diagnostics.child(instance_name);
69            suite_node.set_property("url", suite.test_url.clone());
70            test_suite::run_single_suite(suite, debug_data_sender.clone(), suite_node).await;
71        }
72    }
73}
74
75pub(crate) struct ParallelScheduler<T: RunSuiteFn> {
76    pub suite_runner: T,
77    pub max_parallel_suites: u16,
78}
79
80pub(crate) struct RunSuiteObj {}
81
82#[async_trait]
83impl RunSuiteFn for RunSuiteObj {
84    async fn run_suite(
85        &self,
86        suite: Suite,
87        debug_data_sender: DebugDataSender,
88        diagnostics: DiagnosticNode,
89    ) {
90        test_suite::run_single_suite(suite, debug_data_sender, diagnostics).await;
91    }
92}
93
94#[async_trait]
95impl<T: RunSuiteFn + std::marker::Sync + std::marker::Send> Scheduler for ParallelScheduler<T> {
96    async fn execute(
97        &self,
98        suites: Vec<test_suite::Suite>,
99        diagnostics: DiagnosticNode,
100        _stop_recv: &mut oneshot::Receiver<()>,
101        debug_data_sender: DebugDataSender,
102    ) {
103        const MAX_PARALLEL_SUITES_DEFAULT: usize = 8;
104        let mut max_parallel_suites = self.max_parallel_suites as usize;
105
106        // This logic is necessary due to the defined behavior in the RunOptions
107        // fidl. We promise clients that if they use the WithSchedulingOptions
108        // method, and they set max_parallel_suites in SchedulingOptions to 0,
109        // the parallel scheduler implementation will choose a default
110        // max_parallel_suites value.
111        max_parallel_suites =
112            if max_parallel_suites > 0 { max_parallel_suites } else { MAX_PARALLEL_SUITES_DEFAULT };
113        let suite_idx = AtomicU16::new(0);
114        let suite_idx_ref = &suite_idx;
115        let debug_data_sender_ref = &debug_data_sender;
116        let diagnostics_ref = &diagnostics;
117        stream::iter(suites)
118            .for_each_concurrent(max_parallel_suites, |suite| async move {
119                let suite_idx_local = suite_idx_ref.fetch_add(1, Ordering::Relaxed);
120                let instance_name = format!("suite-{:?}", suite_idx_local);
121                let suite_node = diagnostics_ref.child(instance_name);
122                suite_node.set_property("url", suite.test_url.clone());
123                self.suite_runner.run_suite(suite, debug_data_sender_ref.clone(), suite_node).await;
124            })
125            .await;
126    }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use crate::debug_data_processor::{DebugDataDirectory, DebugDataProcessor};
    use crate::{facet, AboveRootCapabilitiesForTest};
    use async_trait::async_trait;
    use fidl::endpoints::create_proxy_and_stream;
    use fidl_fuchsia_test_manager::{RunOptions, SuiteControllerMarker};
    use std::sync::{Arc, Mutex};
    use {fidl_fuchsia_component_resolution as fresolution, fidl_fuchsia_pkg as fpkg};

    /// Builds a Suite for `test_url` backed by freshly created channels.
    ///
    /// The peer ends (the controller proxy and both resolver request
    /// streams) are dropped when this function returns, so the suite is not
    /// connected to anything; it is only meant to be passed to a mocked
    /// suite runner.
    async fn create_fake_suite(test_url: String) -> Suite {
        let (_controller_proxy, controller) = create_proxy_and_stream::<SuiteControllerMarker>();
        let (resolver, _resolver_stream) = create_proxy_and_stream::<fresolution::ResolverMarker>();
        let (pkg_resolver, _pkg_resolver_stream) =
            create_proxy_and_stream::<fpkg::PackageResolverMarker>();

        let options = RunOptions {
            parallel: None,
            arguments: None,
            run_disabled_tests: Some(false),
            timeout: None,
            case_filters_to_run: None,
            log_iterator: None,
            ..Default::default()
        };

        Suite {
            realm: None,
            test_url,
            options,
            controller,
            resolver: Arc::new(resolver),
            pkg_resolver: Arc::new(pkg_resolver),
            above_root_capabilities_for_test: Arc::new(
                AboveRootCapabilitiesForTest::new_empty_for_tests(),
            ),
            facets: facet::ResolveStatus::Unresolved,
        }
    }

    /// Mock suite runner: instead of running anything, it records the url of
    /// every suite it is asked to run.
    struct RunSuiteObjForTests {
        test_vec: Arc<Mutex<Vec<String>>>,
    }

    #[async_trait]
    impl RunSuiteFn for &RunSuiteObjForTests {
        async fn run_suite(
            &self,
            suite: Suite,
            _debug_data_sender: DebugDataSender,
            _diagnostics: DiagnosticNode,
        ) {
            self.test_vec.lock().expect("expected locked").push(suite.test_url);
        }
    }

    /// Every suite handed to the parallel scheduler must reach the suite runner.
    #[fuchsia::test]
    async fn parallel_executor_runs_all_tests() {
        const SUITE_URLS: [&str; 3] = ["suite_1", "suite_2", "suite_3"];

        let mut suite_vec = Vec::new();
        for url in SUITE_URLS {
            suite_vec.push(create_fake_suite(url.to_string()).await);
        }

        let suite_runner = RunSuiteObjForTests { test_vec: Arc::new(Mutex::new(vec![])) };
        let parallel_executor =
            ParallelScheduler { suite_runner: &suite_runner, max_parallel_suites: 8 };

        let inspect_root = Arc::new(fuchsia_inspect::component::inspector().root().clone_weak());
        let diagnostics = DiagnosticNode::new("root", inspect_root);

        let sender =
            DebugDataProcessor::new_for_test(DebugDataDirectory::Isolated { parent: "/tmp" })
                .sender;

        // The sending half is kept alive for the whole test but never used.
        let (_stop_sender, mut stop_recv) = oneshot::channel::<()>();

        parallel_executor.execute(suite_vec, diagnostics, &mut stop_recv, sender).await;

        // Completion order is not asserted, only that each url was recorded.
        for expected in SUITE_URLS {
            let recorded = suite_runner.test_vec.lock().expect("expected locked");
            assert!(
                recorded.contains(&expected.to_string()),
                "{} was not run by the parallel scheduler",
                expected
            );
        }
    }
}