test_manager_lib/
test_suite.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::above_root_capabilities::AboveRootCapabilitiesForTest;
6use crate::constants::{self, HERMETIC_TESTS_COLLECTION};
7use crate::debug_data_processor::{DebugDataDirectory, DebugDataProcessor, DebugDataSender};
8use crate::error::*;
9use crate::facet::SuiteFacets;
10use crate::run_events::{RunEvent, SuiteEvents};
11use crate::scheduler::Scheduler;
12use crate::self_diagnostics::DiagnosticNode;
13use crate::{facet, running_suite, scheduler};
14use anyhow::Error;
15use fidl::endpoints::{ControlHandle, Responder};
16use fidl_fuchsia_component::RealmProxy;
17use fidl_fuchsia_component_resolution::ResolverProxy;
18use fidl_fuchsia_pkg::PackageResolverProxy;
19use ftest_manager::{
20    LaunchError, RunControllerRequest, RunControllerRequestStream, SchedulingOptions,
21    SuiteControllerRequest, SuiteControllerRequestStream,
22};
23use futures::channel::{mpsc, oneshot};
24use futures::future::Either;
25use futures::prelude::*;
26use futures::StreamExt;
27use log::{error, info, warn};
28use std::sync::Arc;
29use {
30    fidl_fuchsia_component_test as ftest, fidl_fuchsia_test_manager as ftest_manager,
31    fuchsia_async as fasync,
32};
33
34const EXECUTION_PROPERTY: &'static str = "execution";
35
36pub(crate) struct SuiteRealm {
37    pub realm_proxy: RealmProxy,
38    pub offers: Vec<ftest::Capability>,
39    pub test_collection: String,
40}
41
42pub(crate) struct Suite {
43    pub test_url: String,
44    pub options: ftest_manager::RunOptions,
45    pub controller: SuiteControllerRequestStream,
46    pub resolver: Arc<ResolverProxy>,
47    pub pkg_resolver: Arc<PackageResolverProxy>,
48    pub above_root_capabilities_for_test: Arc<AboveRootCapabilitiesForTest>,
49    pub facets: facet::ResolveStatus,
50    pub realm: Option<SuiteRealm>,
51}
52
53pub(crate) struct TestRunBuilder {
54    pub suites: Vec<Suite>,
55}
56
57impl TestRunBuilder {
58    /// Serve a RunControllerRequestStream. Returns Err if the client stops the test
59    /// prematurely or there is an error serving he stream.
60    /// Be careful, |run_task| is dropped once the other end of |controller| is dropped
61    /// or |kill| on the controller is called.
62    async fn run_controller(
63        mut controller: RunControllerRequestStream,
64        run_task: futures::future::RemoteHandle<()>,
65        stop_sender: oneshot::Sender<()>,
66        event_recv: mpsc::Receiver<RunEvent>,
67        diagnostics: DiagnosticNode,
68    ) {
69        let mut run_task = Some(run_task);
70        let mut stop_sender = Some(stop_sender);
71        let (events_responder_sender, mut events_responder_recv) = mpsc::unbounded();
72        let diagnostics_ref = &diagnostics;
73
74        let serve_controller_fut = async move {
75            while let Some(request) = controller.try_next().await? {
76                match request {
77                    RunControllerRequest::Stop { .. } => {
78                        diagnostics_ref.set_flag("stopped");
79                        if let Some(stop_sender) = stop_sender.take() {
80                            // no need to check error.
81                            let _ = stop_sender.send(());
82                            // after this all `senders` go away and subsequent GetEvent call will
83                            // return rest of events and eventually a empty array and will close the
84                            // connection after that.
85                        }
86                    }
87                    RunControllerRequest::Kill { .. } => {
88                        diagnostics_ref.set_flag("killed");
89                        // dropping the remote handle cancels it.
90                        drop(run_task.take());
91                        // after this all `senders` go away and subsequent GetEvent call will
92                        // return rest of events and eventually a empty array and will close the
93                        // connection after that.
94                    }
95                    RunControllerRequest::GetEvents { responder } => {
96                        events_responder_sender.unbounded_send(responder).unwrap_or_else(|e| {
97                            // If the handler is already done, drop responder without closing the
98                            // channel.
99                            e.into_inner().drop_without_shutdown();
100                        })
101                    }
102                    RunControllerRequest::_UnknownMethod { ordinal, control_handle, .. } => {
103                        warn!(
104                            "Unknown run controller request received: {}, closing connection",
105                            ordinal
106                        );
107                        // dropping the remote handle cancels it.
108                        drop(run_task.take());
109                        control_handle.shutdown_with_epitaph(zx::Status::NOT_SUPPORTED);
110                        break;
111                    }
112                }
113            }
114            Result::<(), Error>::Ok(())
115        };
116
117        let get_events_fut = async move {
118            let mut event_chunks = event_recv.map(RunEvent::into).ready_chunks(EVENTS_THRESHOLD);
119            while let Some(responder) = events_responder_recv.next().await {
120                diagnostics_ref.set_property("events", "awaiting");
121                let next_chunk = event_chunks.next().await.unwrap_or_default();
122                diagnostics_ref.set_property("events", "idle");
123                let done = next_chunk.is_empty();
124                responder.send(next_chunk)?;
125                if done {
126                    diagnostics_ref.set_flag("events_drained");
127                    break;
128                }
129            }
130            // Drain remaining responders without dropping them so that the channel doesn't
131            // close.
132            events_responder_recv.close();
133            events_responder_recv
134                .for_each(|responder| async move {
135                    responder.drop_without_shutdown();
136                })
137                .await;
138            Result::<(), Error>::Ok(())
139        };
140
141        match futures::future::select(serve_controller_fut.boxed(), get_events_fut.boxed()).await {
142            Either::Left((serve_result, _fut)) => {
143                if let Err(e) = serve_result {
144                    warn!(diagnostics:?; "Error serving RunController: {:?}", e);
145                }
146            }
147            Either::Right((get_events_result, serve_fut)) => {
148                if let Err(e) = get_events_result {
149                    warn!(diagnostics:?; "Error sending events for RunController: {:?}", e);
150                }
151                // Wait for the client to close the channel.
152                // TODO(https://fxbug.dev/42169156) once https://fxbug.dev/42169061 is fixed, this is no longer
153                // necessary.
154                if let Err(e) = serve_fut.await {
155                    warn!(diagnostics:?; "Error serving RunController: {:?}", e);
156                }
157            }
158        }
159    }
160
161    pub(crate) async fn run(
162        self,
163        controller: RunControllerRequestStream,
164        diagnostics: DiagnosticNode,
165        scheduling_options: Option<SchedulingOptions>,
166    ) {
167        let (stop_sender, mut stop_recv) = oneshot::channel::<()>();
168        let (event_sender, event_recv) = mpsc::channel::<RunEvent>(16);
169
170        let diagnostics_ref = &diagnostics;
171
172        let max_parallel_suites = match &scheduling_options {
173            Some(options) => options.max_parallel_suites,
174            None => None,
175        };
176        let max_parallel_suites_ref = &max_parallel_suites;
177        let accumulate_debug_data = scheduling_options
178            .as_ref()
179            .and_then(|options| options.accumulate_debug_data)
180            .unwrap_or(false);
181        let debug_data_directory = match accumulate_debug_data {
182            true => DebugDataDirectory::Accumulating { dir: constants::DEBUG_DATA_FOR_SCP },
183            false => DebugDataDirectory::Isolated { parent: constants::ISOLATED_TMP },
184        };
185        let (debug_data_processor, debug_data_sender) =
186            DebugDataProcessor::new(debug_data_directory);
187
188        let debug_task = fasync::Task::local(
189            debug_data_processor
190                .collect_and_serve(event_sender)
191                .unwrap_or_else(|err| warn!(err:?; "Error serving debug data")),
192        );
193
194        // This future returns the task which needs to be completed before completion.
195        let suite_scheduler_fut = async move {
196            diagnostics_ref.set_property(EXECUTION_PROPERTY, "executing");
197
198            let serial_executor = scheduler::SerialScheduler {};
199
200            match max_parallel_suites_ref {
201                Some(max_parallel_suites) => {
202                    let parallel_executor = scheduler::ParallelScheduler {
203                        suite_runner: scheduler::RunSuiteObj {},
204                        max_parallel_suites: *max_parallel_suites,
205                    };
206                    let get_facets_fn = |test_url, resolver| async move {
207                        facet::get_suite_facets(test_url, resolver).await
208                    };
209                    let (serial_suites, parallel_suites) =
210                        split_suites_by_hermeticity(self.suites, get_facets_fn).await;
211
212                    parallel_executor
213                        .execute(
214                            parallel_suites,
215                            diagnostics_ref.child("parallel_executor"),
216                            &mut stop_recv,
217                            debug_data_sender.clone(),
218                        )
219                        .await;
220                    serial_executor
221                        .execute(
222                            serial_suites,
223                            diagnostics_ref.child("serial_executor"),
224                            &mut stop_recv,
225                            debug_data_sender.clone(),
226                        )
227                        .await;
228                }
229                None => {
230                    serial_executor
231                        .execute(
232                            self.suites,
233                            diagnostics_ref.child("serial_executor"),
234                            &mut stop_recv,
235                            debug_data_sender.clone(),
236                        )
237                        .await;
238                }
239            }
240
241            drop(debug_data_sender); // needed for debug_data_processor to complete.
242
243            diagnostics_ref.set_property(EXECUTION_PROPERTY, "complete");
244        };
245
246        let (remote, remote_handle) = suite_scheduler_fut.remote_handle();
247
248        let ((), ()) = futures::future::join(
249            remote.then(|_| async move {
250                debug_task.await;
251            }),
252            Self::run_controller(
253                controller,
254                remote_handle,
255                stop_sender,
256                event_recv,
257                diagnostics.child("controller"),
258            ),
259        )
260        .await;
261    }
262}
263
264// max events to send so that we don't cross fidl limits.
265// TODO(https://fxbug.dev/42051179): Use tape measure to calculate limit.
266const EVENTS_THRESHOLD: usize = 50;
267
268impl Suite {
269    pub(crate) async fn run(self, diagnostics: DiagnosticNode, accumulate_debug_data: bool) {
270        let diagnostics_ref = &diagnostics;
271
272        let debug_data_directory = match accumulate_debug_data {
273            true => DebugDataDirectory::Accumulating { dir: constants::DEBUG_DATA_FOR_SCP },
274            false => DebugDataDirectory::Isolated { parent: constants::ISOLATED_TMP },
275        };
276        let (debug_data_processor, debug_data_sender) =
277            DebugDataProcessor::new(debug_data_directory);
278
279        let (event_sender, event_receiver) = mpsc::channel(1024);
280
281        let debug_task = fasync::Task::local(
282            debug_data_processor
283                .collect_and_serve_for_suite(event_sender.clone())
284                .unwrap_or_else(|err| warn!(err:?; "Error serving debug data")),
285        );
286
287        // This future returns the task which needs to be completed before completion.
288        let suite_run_fut = async move {
289            diagnostics_ref.set_property(EXECUTION_PROPERTY, "executing");
290
291            let suite_node = diagnostics_ref.child("serial_executor").child("suite-0");
292            suite_node.set_property("url", self.test_url.clone());
293            run_single_suite_for_suite_runner(
294                self,
295                debug_data_sender,
296                suite_node,
297                event_sender,
298                event_receiver,
299            )
300            .await;
301
302            diagnostics_ref.set_property(EXECUTION_PROPERTY, "complete");
303        };
304
305        suite_run_fut
306            .then(|_| async move {
307                debug_task.await;
308            })
309            .await;
310    }
311
312    async fn run_controller(
313        mut controller: SuiteControllerRequestStream,
314        stop_sender: oneshot::Sender<()>,
315        run_suite_remote_handle: futures::future::RemoteHandle<()>,
316        event_recv: mpsc::Receiver<Result<SuiteEvents, LaunchError>>,
317    ) -> Result<(), Error> {
318        let mut task = Some(run_suite_remote_handle);
319        let mut stop_sender = Some(stop_sender);
320        let (events_responder_sender, mut events_responder_recv) = mpsc::unbounded();
321
322        let serve_controller_fut = async move {
323            while let Some(event) = controller.try_next().await? {
324                match event {
325                    SuiteControllerRequest::Stop { .. } => {
326                        // no need to handle error as task might already have finished.
327                        if let Some(stop) = stop_sender.take() {
328                            let _ = stop.send(());
329                            // after this all `senders` go away and subsequent GetEvent call will
330                            // return rest of event. Eventually an empty array and will close the
331                            // connection after that.
332                        }
333                    }
334                    SuiteControllerRequest::Kill { .. } => {
335                        // Dropping the remote handle for the suite execution task cancels it.
336                        drop(task.take());
337                        // after this all `senders` go away and subsequent GetEvent call will
338                        // return rest of event. Eventually an empty array and will close the
339                        // connection after that.
340                    }
341                    SuiteControllerRequest::WatchEvents { responder } => {
342                        events_responder_sender
343                            .unbounded_send(EventResponder::New(responder))
344                            .unwrap_or_else(|e| {
345                                // If the handler is already done, drop responder without closing the
346                                // channel.
347                                e.into_inner().drop_without_shutdown();
348                            })
349                    }
350                    SuiteControllerRequest::GetEvents { responder } => {
351                        events_responder_sender
352                            .unbounded_send(EventResponder::Deprecated(responder))
353                            .unwrap_or_else(|e| {
354                                // If the handler is already done, drop responder without closing the
355                                // channel.
356                                e.into_inner().drop_without_shutdown();
357                            })
358                    }
359                    SuiteControllerRequest::_UnknownMethod { ordinal, control_handle, .. } => {
360                        warn!(
361                            "Unknown suite controller request received: {}, closing connection",
362                            ordinal
363                        );
364                        // Dropping the remote handle for the suite execution task cancels it.
365                        drop(task.take());
366                        control_handle.shutdown_with_epitaph(zx::Status::NOT_SUPPORTED);
367                        break;
368                    }
369                }
370            }
371            Ok(())
372        };
373
374        let get_events_fut = async move {
375            let mut event_chunks = event_recv.ready_chunks(EVENTS_THRESHOLD);
376            while let Some(responder) = events_responder_recv.next().await {
377                let next_chunk_results: Vec<Result<_, _>> =
378                    event_chunks.next().await.unwrap_or_default();
379                if responder.send(next_chunk_results)? {
380                    break;
381                }
382            }
383            // Drain remaining responders without dropping them so that the channel doesn't
384            // close.
385            events_responder_recv.close();
386            events_responder_recv
387                .for_each(|responder| async move {
388                    responder.drop_without_shutdown();
389                })
390                .await;
391            Result::<(), Error>::Ok(())
392        };
393
394        match futures::future::select(serve_controller_fut.boxed(), get_events_fut.boxed()).await {
395            Either::Left((serve_result, _fut)) => serve_result,
396            Either::Right((get_events_result, serve_fut)) => {
397                get_events_result?;
398                // Wait for the client to close the channel.
399                // TODO(https://fxbug.dev/42169156) once https://fxbug.dev/42169061 is fixed, this is no longer
400                // necessary.
401                serve_fut.await
402            }
403        }
404    }
405}
406
407enum EventResponder {
408    Deprecated(ftest_manager::SuiteControllerGetEventsResponder),
409    New(ftest_manager::SuiteControllerWatchEventsResponder),
410}
411
412impl EventResponder {
413    fn drop_without_shutdown(self) {
414        match self {
415            EventResponder::Deprecated(inner) => inner.drop_without_shutdown(),
416            EventResponder::New(inner) => inner.drop_without_shutdown(),
417        }
418    }
419
420    pub fn send(self, results: Vec<Result<SuiteEvents, LaunchError>>) -> Result<bool, fidl::Error> {
421        match self {
422            EventResponder::Deprecated(inner) => {
423                let result: Result<Vec<_>, _> =
424                    results.into_iter().map(|r| r.map(SuiteEvents::into)).collect();
425                let done = match &result {
426                    Ok(events) => events.is_empty(),
427                    Err(_) => true,
428                };
429                inner.send(result).map(|_| done)
430            }
431            EventResponder::New(inner) => {
432                let result: Result<Vec<_>, _> =
433                    results.into_iter().map(|r| r.map(SuiteEvents::into)).collect();
434                let done = match &result {
435                    Ok(events) => events.is_empty(),
436                    Err(_) => true,
437                };
438                inner.send(result).map(|_| done)
439            }
440        }
441    }
442}
443
444async fn run_single_suite_for_suite_runner(
445    suite: Suite,
446    debug_data_sender: DebugDataSender,
447    diagnostics: DiagnosticNode,
448    mut event_sender: mpsc::Sender<Result<SuiteEvents, LaunchError>>,
449    event_recv: mpsc::Receiver<Result<SuiteEvents, LaunchError>>,
450) {
451    let (stop_sender, stop_recv) = oneshot::channel::<()>();
452
453    let Suite {
454        test_url,
455        options,
456        controller,
457        resolver,
458        pkg_resolver,
459        above_root_capabilities_for_test,
460        facets,
461        realm: suite_realm,
462    } = suite;
463
464    let run_test_fut = async {
465        diagnostics.set_property(EXECUTION_PROPERTY, "get_facets");
466
467        let facets = match facets {
468            // Currently, all suites are passed in with unresolved facets by the
469            // SerialScheduler. ParallelScheduler will pass in Resolved facets
470            // once it is implemented.
471            facet::ResolveStatus::Resolved(result) => {
472                match result {
473                    Ok(facets) => facets,
474
475                    // This error is reported here instead of when the error was
476                    // first encountered because here is where it has access to
477                    // the SuiteController protocol server (Suite::run_controller)
478                    // which can report the error back to the test_manager client
479                    Err(error) => {
480                        event_sender.send(Err(error.into())).await.unwrap();
481                        return;
482                    }
483                }
484            }
485            facet::ResolveStatus::Unresolved => {
486                match facet::get_suite_facets(test_url.clone(), resolver.clone()).await {
487                    Ok(facets) => facets,
488                    Err(error) => {
489                        event_sender.send(Err(error.into())).await.unwrap();
490                        return;
491                    }
492                }
493            }
494        };
495        diagnostics.set_property(EXECUTION_PROPERTY, "launch");
496        match running_suite::RunningSuite::launch(
497            &test_url,
498            facets,
499            resolver,
500            pkg_resolver,
501            above_root_capabilities_for_test,
502            debug_data_sender,
503            &diagnostics,
504            &suite_realm,
505            use_debug_agent_for_runs(&options),
506        )
507        .await
508        {
509            Ok(mut instance) => {
510                diagnostics.set_property(EXECUTION_PROPERTY, "run_tests");
511                instance.run_tests(&test_url, options, event_sender, stop_recv).await;
512                diagnostics.set_property(EXECUTION_PROPERTY, "tests_done");
513                diagnostics.set_property(EXECUTION_PROPERTY, "tear_down");
514                if let Err(err) = instance.destroy(diagnostics.child("destroy")).await {
515                    // Failure to destroy an instance could mean that some component events fail to send.
516                    error!(
517                        diagnostics:?,
518                        err:?;
519                        "Failed to destroy instance. Debug data may be lost."
520                    );
521                }
522            }
523            Err(e) => {
524                event_sender.send(Err(e.into())).await.unwrap();
525            }
526        }
527    };
528    let (run_test_remote, run_test_handle) = run_test_fut.remote_handle();
529
530    let controller_fut =
531        Suite::run_controller(controller, stop_sender, run_test_handle, event_recv);
532    let ((), controller_ret) = futures::future::join(run_test_remote, controller_fut).await;
533
534    if let Err(e) = controller_ret {
535        warn!(diagnostics:?; "Ended test {}: {:?}", test_url, e);
536    }
537
538    diagnostics.set_property(EXECUTION_PROPERTY, "complete");
539    info!(diagnostics:?; "Test destruction complete");
540}
541
542pub(crate) async fn run_single_suite(
543    suite: Suite,
544    debug_data_sender: DebugDataSender,
545    diagnostics: DiagnosticNode,
546) {
547    let (mut sender, recv) = mpsc::channel(1024);
548    let (stop_sender, stop_recv) = oneshot::channel::<()>();
549    let mut maybe_instance = None;
550
551    let Suite {
552        test_url,
553        options,
554        controller,
555        resolver,
556        pkg_resolver,
557        above_root_capabilities_for_test,
558        facets,
559        realm: suite_realm,
560    } = suite;
561
562    let run_test_fut = async {
563        diagnostics.set_property(EXECUTION_PROPERTY, "get_facets");
564
565        let facets = match facets {
566            // Currently, all suites are passed in with unresolved facets by the
567            // SerialScheduler. ParallelScheduler will pass in Resolved facets
568            // once it is implemented.
569            facet::ResolveStatus::Resolved(result) => {
570                match result {
571                    Ok(facets) => facets,
572
573                    // This error is reported here instead of when the error was
574                    // first encountered because here is where it has access to
575                    // the SuiteController protocol server (Suite::run_controller)
576                    // which can report the error back to the test_manager client
577                    Err(error) => {
578                        sender.send(Err(error.into())).await.unwrap();
579                        return;
580                    }
581                }
582            }
583            facet::ResolveStatus::Unresolved => {
584                match facet::get_suite_facets(test_url.clone(), resolver.clone()).await {
585                    Ok(facets) => facets,
586                    Err(error) => {
587                        sender.send(Err(error.into())).await.unwrap();
588                        return;
589                    }
590                }
591            }
592        };
593        diagnostics.set_property(EXECUTION_PROPERTY, "launch");
594        match running_suite::RunningSuite::launch(
595            &test_url,
596            facets,
597            resolver,
598            pkg_resolver,
599            above_root_capabilities_for_test,
600            debug_data_sender,
601            &diagnostics,
602            &suite_realm,
603            use_debug_agent_for_runs(&options),
604        )
605        .await
606        {
607            Ok(instance) => {
608                diagnostics.set_property(EXECUTION_PROPERTY, "run_tests");
609                let instance_ref = maybe_instance.insert(instance);
610                instance_ref.run_tests(&test_url, options, sender, stop_recv).await;
611                diagnostics.set_property(EXECUTION_PROPERTY, "tests_done");
612            }
613            Err(e) => {
614                sender.send(Err(e.into())).await.unwrap();
615            }
616        }
617    };
618    let (run_test_remote, run_test_handle) = run_test_fut.remote_handle();
619
620    let controller_fut = Suite::run_controller(controller, stop_sender, run_test_handle, recv);
621    let ((), controller_ret) = futures::future::join(run_test_remote, controller_fut).await;
622
623    if let Err(e) = controller_ret {
624        warn!(diagnostics:?; "Ended test {}: {:?}", test_url, e);
625    }
626
627    if let Some(instance) = maybe_instance.take() {
628        diagnostics.set_property(EXECUTION_PROPERTY, "tear_down");
629        info!(diagnostics:?; "Test suite has finished, destroying instance...");
630        if let Err(err) = instance.destroy(diagnostics.child("destroy")).await {
631            // Failure to destroy an instance could mean that some component events fail to send.
632            error!(diagnostics:?, err:?; "Failed to destroy instance. Debug data may be lost.");
633        }
634    }
635    diagnostics.set_property(EXECUTION_PROPERTY, "complete");
636    info!(diagnostics:?; "Test destruction complete");
637}
638
639// Separate suite into a hermetic and a non-hermetic collection
640// Note: F takes String and Arc<ResolverProxy> to circumvent the
641// borrow checker.
642async fn split_suites_by_hermeticity<F, Fut>(
643    suites: Vec<Suite>,
644    get_facets_fn: F,
645) -> (Vec<Suite>, Vec<Suite>)
646where
647    F: Fn(String, Arc<ResolverProxy>) -> Fut,
648    Fut: futures::future::Future<Output = Result<SuiteFacets, LaunchTestError>>,
649{
650    let mut serial_suites: Vec<Suite> = Vec::new();
651    let mut parallel_suites: Vec<Suite> = Vec::new();
652
653    for mut suite in suites {
654        if suite.realm.is_some() {
655            serial_suites.push(suite);
656            continue;
657        }
658        let test_url = suite.test_url.clone();
659        let resolver = suite.resolver.clone();
660        let facet_result = get_facets_fn(test_url, resolver).await;
661        let can_run_in_parallel = match &facet_result {
662            Ok(facets) => facets.collection == HERMETIC_TESTS_COLLECTION,
663            Err(_) => false,
664        };
665        suite.facets = facet::ResolveStatus::Resolved(facet_result);
666        if can_run_in_parallel {
667            parallel_suites.push(suite);
668        } else {
669            serial_suites.push(suite);
670        }
671    }
672    (serial_suites, parallel_suites)
673}
674
675// Determine whether debug_agent should be used for test runs.
676fn use_debug_agent_for_runs(options: &ftest_manager::RunOptions) -> bool {
677    match options.no_exception_channel {
678        Some(true) => {
679            // Do not use debug_agent when the option is set to true.
680            false
681        }
682        Some(false) | None => {
683            // Use debug_agent when the option is set to false or not specified.
684            true
685        }
686    }
687}
688
689#[cfg(test)]
690mod tests {
691    use super::*;
692    use fidl::endpoints::create_proxy_and_stream;
693    use {fidl_fuchsia_component_resolution as fresolution, fuchsia_async as fasync};
694
695    fn new_run_inspect_node() -> DiagnosticNode {
696        DiagnosticNode::new("root", Arc::new(fuchsia_inspect::types::Node::default()))
697    }
698
699    #[fuchsia::test]
700    async fn run_controller_stop_test() {
701        let (sender, recv) = mpsc::channel(1024);
702        let (stop_sender, stop_recv) = oneshot::channel::<()>();
703        let (task, remote_handle) = async move {
704            stop_recv.await.unwrap();
705            // drop event sender so that fake test can end.
706            drop(sender);
707        }
708        .remote_handle();
709        let _task = fasync::Task::spawn(task);
710        let (proxy, controller) = create_proxy_and_stream::<ftest_manager::RunControllerMarker>();
711        let run_controller = fasync::Task::spawn(async move {
712            TestRunBuilder::run_controller(
713                controller,
714                remote_handle,
715                stop_sender,
716                recv,
717                new_run_inspect_node(),
718            )
719            .await
720        });
721        // sending a get event first should not prevent stop from cancelling the suite.
722        let get_events_task = fasync::Task::spawn(proxy.get_events());
723        proxy.stop().unwrap();
724        assert_eq!(get_events_task.await.unwrap(), vec![]);
725
726        drop(proxy);
727        run_controller.await;
728    }
729
730    #[fuchsia::test]
731    async fn run_controller_abort_when_channel_closed() {
732        let (_sender, recv) = mpsc::channel(1024);
733        let (stop_sender, _stop_recv) = oneshot::channel::<()>();
734        // Create a future that normally never resolves.
735        let (task, remote_handle) = futures::future::pending().remote_handle();
736        let pending_task = fasync::Task::spawn(task);
737        let (proxy, controller) = create_proxy_and_stream::<ftest_manager::RunControllerMarker>();
738        let run_controller = fasync::Task::spawn(async move {
739            TestRunBuilder::run_controller(
740                controller,
741                remote_handle,
742                stop_sender,
743                recv,
744                new_run_inspect_node(),
745            )
746            .await
747        });
748        // sending a get event first should not prevent killing the controller.
749        let get_events_task = fasync::Task::spawn(proxy.get_events());
750        drop(proxy);
751        drop(get_events_task);
752        // After controller is dropped, both the controller future and the task it was
753        // controlling should terminate.
754        pending_task.await;
755        run_controller.await;
756    }
757
758    #[fuchsia::test]
759    async fn suite_controller_stop_test() {
760        let (sender, recv) = mpsc::channel(1024);
761        let (stop_sender, stop_recv) = oneshot::channel::<()>();
762        let (task, remote_handle) = async move {
763            stop_recv.await.unwrap();
764            // drop event sender so that fake test can end.
765            drop(sender);
766        }
767        .remote_handle();
768        let _task = fasync::Task::spawn(task);
769        let (proxy, controller) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
770        let run_controller = fasync::Task::spawn(async move {
771            Suite::run_controller(controller, stop_sender, remote_handle, recv).await
772        });
773        // sending a get event first should not prevent stop from cancelling the suite.
774        let get_events_task = fasync::Task::spawn(proxy.get_events());
775        proxy.stop().unwrap();
776
777        assert_eq!(get_events_task.await.unwrap(), Ok(vec![]));
778        // run controller should end after channel is closed.
779        drop(proxy);
780        run_controller.await.unwrap();
781    }
782
783    #[fuchsia::test]
784    async fn suite_controller_abort_remote_when_controller_closed() {
785        let (_sender, recv) = mpsc::channel(1024);
786        let (stop_sender, _stop_recv) = oneshot::channel::<()>();
787        // Create a future that normally never resolves.
788        let (task, remote_handle) = futures::future::pending().remote_handle();
789        let pending_task = fasync::Task::spawn(task);
790        let (proxy, controller) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
791        let run_controller = fasync::Task::spawn(async move {
792            Suite::run_controller(controller, stop_sender, remote_handle, recv).await
793        });
794        // sending a get event first should not prevent killing the controller.
795        let get_events_task = fasync::Task::spawn(proxy.get_events());
796        drop(proxy);
797        drop(get_events_task);
798        // After controller is dropped, both the controller future and the task it was
799        // controlling should terminate.
800        pending_task.await;
801        run_controller.await.unwrap();
802    }
803
804    #[fuchsia::test]
805    async fn suite_controller_get_events() {
806        let (mut sender, recv) = mpsc::channel(1024);
807        let (stop_sender, stop_recv) = oneshot::channel::<()>();
808        let (task, remote_handle) = async {}.remote_handle();
809        let _task = fasync::Task::spawn(task);
810        let (proxy, controller) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
811        let run_controller = fasync::Task::spawn(async move {
812            Suite::run_controller(controller, stop_sender, remote_handle, recv).await
813        });
814        sender.send(Ok(SuiteEvents::case_found(1, "case1".to_string()).into())).await.unwrap();
815        sender.send(Ok(SuiteEvents::case_found(2, "case2".to_string()).into())).await.unwrap();
816
817        let events = proxy.get_events().await.unwrap().unwrap();
818        assert_eq!(events.len(), 2);
819        assert_eq!(
820            events[0].payload,
821            SuiteEvents::case_found(1, "case1".to_string()).into_suite_run_event().payload,
822        );
823        assert_eq!(
824            events[1].payload,
825            SuiteEvents::case_found(2, "case2".to_string()).into_suite_run_event().payload,
826        );
827        sender.send(Ok(SuiteEvents::case_started(2).into())).await.unwrap();
828        proxy.stop().unwrap();
829
830        // test that controller collects event after stop is called.
831        sender.send(Ok(SuiteEvents::case_started(1).into())).await.unwrap();
832        sender.send(Ok(SuiteEvents::case_found(3, "case3".to_string()).into())).await.unwrap();
833
834        stop_recv.await.unwrap();
835        // drop event sender so that fake test can end.
836        drop(sender);
837        let events = proxy.get_events().await.unwrap().unwrap();
838        assert_eq!(events.len(), 3);
839
840        assert_eq!(events[0].payload, SuiteEvents::case_started(2).into_suite_run_event().payload,);
841        assert_eq!(events[1].payload, SuiteEvents::case_started(1).into_suite_run_event().payload,);
842        assert_eq!(
843            events[2].payload,
844            SuiteEvents::case_found(3, "case3".to_string()).into_suite_run_event().payload,
845        );
846
847        let events = proxy.get_events().await.unwrap().unwrap();
848        assert_eq!(events, vec![]);
849        // run controller should end after channel is closed.
850        drop(proxy);
851        run_controller.await.unwrap();
852    }
853
854    async fn create_fake_suite(test_url: String) -> Suite {
855        let (_controller_proxy, controller_stream) =
856            create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
857        let (resolver_proxy, _resolver_stream) =
858            create_proxy_and_stream::<fresolution::ResolverMarker>();
859        let resolver_proxy = Arc::new(resolver_proxy);
860        let (pkg_resolver_proxy, _pkg_resolver_stream) =
861            create_proxy_and_stream::<fidl_fuchsia_pkg::PackageResolverMarker>();
862        let pkg_resolver_proxy = Arc::new(pkg_resolver_proxy);
863        let routing_info = Arc::new(AboveRootCapabilitiesForTest::new_empty_for_tests());
864        Suite {
865            realm: None,
866            test_url,
867            options: ftest_manager::RunOptions {
868                parallel: None,
869                arguments: None,
870                run_disabled_tests: Some(false),
871                timeout: None,
872                case_filters_to_run: None,
873                log_iterator: None,
874                ..Default::default()
875            },
876            controller: controller_stream,
877            resolver: resolver_proxy,
878            pkg_resolver: pkg_resolver_proxy,
879            above_root_capabilities_for_test: routing_info,
880            facets: facet::ResolveStatus::Unresolved,
881        }
882    }
883
884    #[fuchsia::test]
885    async fn split_suites_by_hermeticity_test() {
886        let hermetic_suite = create_fake_suite("hermetic_suite".to_string()).await;
887        let non_hermetic_suite = create_fake_suite("non_hermetic_suite".to_string()).await;
888        let suites = vec![hermetic_suite, non_hermetic_suite];
889
890        // call split_suites_by_hermeticity
891        let get_facets_fn = |test_url, _resolver| async move {
892            if test_url == "hermetic_suite".to_string() {
893                Ok(SuiteFacets {
894                    collection: HERMETIC_TESTS_COLLECTION,
895                    deprecated_allowed_packages: None,
896                })
897            } else {
898                Ok(SuiteFacets {
899                    collection: crate::constants::SYSTEM_TESTS_COLLECTION,
900                    deprecated_allowed_packages: None,
901                })
902            }
903        };
904        let (serial_suites, parallel_suites) =
905            split_suites_by_hermeticity(suites, get_facets_fn).await;
906
907        assert_eq!(parallel_suites[0].test_url, "hermetic_suite".to_string());
908        assert_eq!(serial_suites[0].test_url, "non_hermetic_suite".to_string());
909    }
910
911    #[test]
912    fn suite_controller_hanging_get_events() {
913        let mut executor = fasync::TestExecutor::new();
914        let (mut sender, recv) = mpsc::channel(1024);
915        let (stop_sender, _stop_recv) = oneshot::channel::<()>();
916        let (task, remote_handle) = async {}.remote_handle();
917        let _task = fasync::Task::spawn(task);
918        let (proxy, controller) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
919        let _run_controller = fasync::Task::spawn(async move {
920            Suite::run_controller(controller, stop_sender, remote_handle, recv).await
921        });
922
923        // send get event call which would hang as there are no events.
924        let mut get_events =
925            fasync::Task::spawn(async move { proxy.get_events().await.unwrap().unwrap() });
926        assert_eq!(executor.run_until_stalled(&mut get_events), std::task::Poll::Pending);
927        executor.run_singlethreaded(async {
928            sender.send(Ok(SuiteEvents::case_found(1, "case1".to_string()).into())).await.unwrap();
929            sender.send(Ok(SuiteEvents::case_found(2, "case2".to_string()).into())).await.unwrap();
930        });
931        let events = executor.run_singlethreaded(get_events);
932        assert_eq!(events.len(), 2);
933        assert_eq!(
934            events[0].payload,
935            SuiteEvents::case_found(1, "case1".to_string()).into_suite_run_event().payload,
936        );
937        assert_eq!(
938            events[1].payload,
939            SuiteEvents::case_found(2, "case2".to_string()).into_suite_run_event().payload,
940        );
941    }
942}