run_test_suite_lib/
run.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::artifacts;
6use crate::cancel::{Cancelled, NamedFutureExt, OrCancel};
7use crate::connector::RunBuilderConnector;
8use crate::diagnostics::{self, LogDisplayConfiguration};
9use crate::outcome::{Outcome, RunTestSuiteError};
10use crate::output::{self, RunReporter, SuiteId, Timestamp};
11use crate::params::{RunParams, TestParams, TimeoutBehavior};
12use crate::running_suite::{run_suite_and_collect_logs, RunningSuite};
13use crate::trace::duration;
14use diagnostics_data::LogTextDisplayOptions;
15use fidl_fuchsia_test_manager::{self as ftest_manager, RunBuilderProxy};
16use fuchsia_async as fasync;
17use futures::future::Either;
18use futures::prelude::*;
19use futures::stream::FuturesUnordered;
20use futures::StreamExt;
21use log::{error, warn};
22use std::collections::HashMap;
23use std::io::Write;
24use std::path::PathBuf;
25
26// Will invoke the WithSchedulingOptions FIDL method if a client indicates
27// that they want to use experimental parallel execution.
28async fn request_scheduling_options(
29    run_params: &RunParams,
30    builder_proxy: &RunBuilderProxy,
31) -> Result<(), RunTestSuiteError> {
32    let scheduling_options = ftest_manager::SchedulingOptions {
33        max_parallel_suites: run_params.experimental_parallel_execution,
34        accumulate_debug_data: Some(run_params.accumulate_debug_data),
35        ..Default::default()
36    };
37    builder_proxy.with_scheduling_options(&scheduling_options)?;
38    Ok(())
39}
40
41struct RunState<'a> {
42    run_params: &'a RunParams,
43    final_outcome: Option<Outcome>,
44    failed_suites: u32,
45    timeout_occurred: bool,
46    cancel_occurred: bool,
47    internal_error_occurred: bool,
48}
49
50impl<'a> RunState<'a> {
51    fn new(run_params: &'a RunParams) -> Self {
52        Self {
53            run_params,
54            final_outcome: None,
55            failed_suites: 0,
56            timeout_occurred: false,
57            cancel_occurred: false,
58            internal_error_occurred: false,
59        }
60    }
61
62    fn cancel_run(&mut self, final_outcome: Outcome) {
63        self.final_outcome = Some(final_outcome);
64        self.cancel_occurred = true;
65    }
66
67    fn record_next_outcome(&mut self, next_outcome: Outcome) {
68        if next_outcome != Outcome::Passed {
69            self.failed_suites += 1;
70        }
71        match &next_outcome {
72            Outcome::Timedout => self.timeout_occurred = true,
73            Outcome::Cancelled => self.cancel_occurred = true,
74            Outcome::Error { origin } if origin.is_internal_error() => {
75                self.internal_error_occurred = true;
76            }
77            Outcome::Passed
78            | Outcome::Failed
79            | Outcome::Inconclusive
80            | Outcome::DidNotFinish
81            | Outcome::Error { .. } => (),
82        }
83
84        self.final_outcome = match (self.final_outcome.take(), next_outcome) {
85            (None, first_outcome) => Some(first_outcome),
86            (Some(outcome), Outcome::Passed) => Some(outcome),
87            (Some(_), failing_outcome) => Some(failing_outcome),
88        };
89    }
90
91    fn should_stop_run(&mut self) -> bool {
92        let stop_due_to_timeout = self.run_params.timeout_behavior
93            == TimeoutBehavior::TerminateRemaining
94            && self.timeout_occurred;
95        let stop_due_to_failures = match self.run_params.stop_after_failures.as_ref() {
96            Some(threshold) => self.failed_suites >= threshold.get(),
97            None => false,
98        };
99        stop_due_to_timeout
100            || stop_due_to_failures
101            || self.cancel_occurred
102            || self.internal_error_occurred
103    }
104
105    fn final_outcome(self) -> Outcome {
106        self.final_outcome.unwrap_or(Outcome::Passed)
107    }
108}
109
110/// Schedule and run the tests specified in |test_params|, and collect the results.
111/// Note this currently doesn't record the result or call finished() on run_reporter,
112/// the caller should do this instead.
113async fn run_test_chunk<'a, F: 'a + Future<Output = ()> + Unpin>(
114    builder_proxy: RunBuilderProxy,
115    test_chunk: Vec<(TestParams, SuiteId)>,
116    run_state: &'a mut RunState<'_>,
117    run_params: &'a RunParams,
118    run_reporter: &'a RunReporter,
119    cancel_fut: F,
120) -> Result<(), RunTestSuiteError> {
121    let mut suite_start_futs = FuturesUnordered::new();
122    let mut suite_reporters = HashMap::new();
123    for (params, suite_id) in test_chunk {
124        let timeout = params
125            .timeout_seconds
126            .map(|seconds| std::time::Duration::from_secs(seconds.get() as u64));
127
128        // If the test spec includes minimum log severity, combine that with any selectors we
129        // got from the command line.
130        let mut combined_log_interest = run_params.min_severity_logs.clone();
131        combined_log_interest.extend(params.min_severity_logs.iter().cloned());
132
133        let run_options = fidl_fuchsia_test_manager::RunOptions {
134            parallel: params.parallel,
135            arguments: Some(params.test_args),
136            run_disabled_tests: Some(params.also_run_disabled_tests),
137            case_filters_to_run: params.test_filters,
138            break_on_failure: Some(params.break_on_failure),
139            log_iterator: Some(run_params.log_protocol.unwrap_or_else(diagnostics::get_type)),
140            log_interest: Some(combined_log_interest),
141            no_exception_channel: Some(params.no_exception_channel),
142            ..Default::default()
143        };
144        let suite = run_reporter.new_suite(&params.test_url, &suite_id)?;
145        suite.set_tags(params.tags);
146        suite_reporters.insert(suite_id, suite);
147        let (suite_controller, suite_server_end) = fidl::endpoints::create_proxy();
148        let suite_and_id_fut = RunningSuite::wait_for_start(
149            suite_controller,
150            params.max_severity_logs,
151            timeout,
152            std::time::Duration::from_secs(run_params.timeout_grace_seconds as u64),
153            None,
154        )
155        .map(move |running_suite| (running_suite, suite_id));
156        suite_start_futs.push(suite_and_id_fut);
157        if let Some(realm) = params.realm.as_ref() {
158            builder_proxy.add_suite_in_realm(
159                realm.get_realm_client()?,
160                &realm.offers(),
161                realm.collection(),
162                &params.test_url,
163                &run_options,
164                suite_server_end,
165            )?;
166        } else {
167            builder_proxy.add_suite(&params.test_url, &run_options, suite_server_end)?;
168        }
169    }
170
171    request_scheduling_options(&run_params, &builder_proxy).await?;
172    let (run_controller, run_server_end) = fidl::endpoints::create_proxy();
173    let run_controller_ref = &run_controller;
174    builder_proxy.build(run_server_end)?;
175    let cancel_fut = cancel_fut.shared();
176    let cancel_fut_clone = cancel_fut.clone();
177
178    let handle_suite_fut = async move {
179        let mut stopped_prematurely = false;
180        // for now, we assume that suites are run serially.
181        loop {
182            let suite_stop_fut = cancel_fut.clone().map(|_| Outcome::Cancelled);
183
184            let (running_suite, suite_id) = match suite_start_futs
185                .next()
186                .named("suite_start")
187                .or_cancelled(suite_stop_fut)
188                .await
189            {
190                Ok(Some((running_suite, suite_id))) => (running_suite, suite_id),
191                // normal completion.
192                Ok(None) => break,
193                Err(Cancelled(final_outcome)) => {
194                    stopped_prematurely = true;
195                    run_state.cancel_run(final_outcome);
196                    break;
197                }
198            };
199
200            let suite_reporter = suite_reporters.remove(&suite_id).unwrap();
201
202            let log_display = LogDisplayConfiguration {
203                interest: run_params.min_severity_logs.clone(),
204                text_options: LogTextDisplayOptions {
205                    show_full_moniker: run_params.show_full_moniker,
206                    ..Default::default()
207                },
208            };
209
210            let result = run_suite_and_collect_logs(
211                running_suite,
212                &suite_reporter,
213                log_display,
214                cancel_fut.clone(),
215            )
216            .await;
217            let suite_outcome = result.unwrap_or_else(|err| Outcome::error(err));
218            // We should always persist results, even if something failed.
219            suite_reporter.finished()?;
220            run_state.record_next_outcome(suite_outcome);
221            if run_state.should_stop_run() {
222                stopped_prematurely = true;
223                break;
224            }
225        }
226        if stopped_prematurely {
227            // Ignore errors here since we're stopping anyway.
228            let _ = run_controller_ref.stop();
229            // Drop remaining controllers, which is the same as calling kill on
230            // each controller.
231            suite_start_futs.clear();
232            for (_id, reporter) in suite_reporters.drain() {
233                reporter.finished()?;
234            }
235        }
236        Result::<_, RunTestSuiteError>::Ok(run_state)
237    };
238
239    let handle_run_events_fut = async move {
240        duration!(c"run_events");
241        let mut artifact_tasks = vec![];
242        loop {
243            let events = run_controller_ref.get_events().named("run_event").await?;
244            if events.len() == 0 {
245                break;
246            }
247
248            for event in events.into_iter() {
249                let ftest_manager::RunEvent { payload, .. } = event;
250                match payload {
251                    // TODO(https://fxbug.dev/42172683): Add support for RunStarted and RunStopped when test_manager sends them.
252                    Some(ftest_manager::RunEventPayload::Artifact(artifact)) => {
253                        let artifact_fut = artifacts::drain_artifact(
254                            run_reporter,
255                            artifact,
256                            diagnostics::LogCollectionOptions {
257                                max_severity: None,
258                                format: LogDisplayConfiguration {
259                                    interest: run_params.min_severity_logs.clone(),
260                                    text_options: LogTextDisplayOptions {
261                                        show_full_moniker: run_params.show_full_moniker,
262                                        ..Default::default()
263                                    },
264                                },
265                            },
266                        )
267                        .await?;
268                        artifact_tasks.push(fasync::Task::spawn(artifact_fut));
269                    }
270                    e => {
271                        warn!("Discarding run event: {:?}", e);
272                    }
273                }
274            }
275        }
276        for task in artifact_tasks {
277            match task.await {
278                Err(e) => {
279                    error!("Failed to collect artifact for run: {:?}", e);
280                }
281                Ok(Some(_log_result)) => warn!("Unexpectedly got log results for the test run"),
282                Ok(None) => (),
283            }
284        }
285        Result::<_, RunTestSuiteError>::Ok(())
286    };
287
288    // Make sure we stop polling run events on cancel. Since cancellation is expected
289    // ignore cancellation errors.
290    let cancellable_run_events_fut = handle_run_events_fut
291        .boxed_local()
292        .or_cancelled(cancel_fut_clone)
293        .map(|cancelled_result| match cancelled_result {
294            Ok(completed_result) => completed_result,
295            Err(Cancelled(_)) => Ok(()),
296        });
297
298    let result =
299        match futures::future::select(handle_suite_fut.boxed_local(), cancellable_run_events_fut)
300            .await
301        {
302            Either::Left((Ok(run_state), run_events_fut)) => match run_state.should_stop_run() {
303                // in case of early termination, don't complete polling events.
304                true => Ok(()),
305                // otherwise, complete with a timeout.
306                false => {
307                    run_events_fut.await?;
308                    Ok(())
309                }
310            },
311            Either::Left((Err(e), run_events_fut)) => {
312                run_events_fut.await?;
313                Err(e.into())
314            }
315            Either::Right((result, suite_fut)) => {
316                suite_fut.await?;
317                result?;
318                Ok(())
319            }
320        };
321    result
322}
323
324async fn run_tests<'a, F: 'a + Future<Output = ()> + Unpin>(
325    connector: impl RunBuilderConnector,
326    test_params: impl Iterator<Item = TestParams>,
327    run_params: RunParams,
328    run_reporter: &'a RunReporter,
329    cancel_fut: F,
330) -> Result<Outcome, RunTestSuiteError> {
331    let mut run_state = RunState::new(&run_params);
332    let mut test_param_iter = test_params
333        .into_iter()
334        .enumerate()
335        .map(|(id, param)| (param, SuiteId(id as u32)))
336        .peekable();
337    let cancel_fut = cancel_fut.shared();
338    loop {
339        match (test_param_iter.peek().is_some(), run_state.should_stop_run()) {
340            (false, _) => return Ok(run_state.final_outcome()),
341            (true, true) => {
342                // This indicates there are suites left, but we need to terminate early.
343                // These weren't recorded at all, so we need to drain and record they weren't
344                // started.
345                for (params, suite_id) in test_param_iter {
346                    let suite_reporter = run_reporter.new_suite(&params.test_url, &suite_id)?;
347                    suite_reporter.set_tags(params.tags);
348                    suite_reporter.finished()?;
349                }
350                return Ok(run_state.final_outcome());
351            }
352            (true, false) => {
353                let builder_proxy = connector.connect().await?;
354                let next_chunk = test_param_iter.by_ref().take(connector.batch_size()).collect();
355                run_test_chunk(
356                    builder_proxy,
357                    next_chunk,
358                    &mut run_state,
359                    &run_params,
360                    run_reporter,
361                    cancel_fut.clone(),
362                )
363                .await?;
364            }
365        }
366    }
367}
368
369/// Runs tests specified in |test_params| and reports the results to
370/// |run_reporter|.
371///
372/// Options specifying how the test run is executed are passed in via |run_params|.
373/// Options specific to how a single suite is run are passed in via the entry for
374/// the suite in |test_params|.
375/// |cancel_fut| is used to gracefully stop execution of tests. Tests are
376/// terminated and recorded when the future resolves. The caller can control when the
377/// future resolves by passing in the receiver end of a `future::channel::oneshot`
378/// channel.
379pub async fn run_tests_and_get_outcome<II, EI, F>(
380    connector: impl RunBuilderConnector,
381    test_params: II,
382    run_params: RunParams,
383    run_reporter: RunReporter,
384    cancel_fut: F,
385) -> Outcome
386where
387    F: Future<Output = ()>,
388    II: IntoIterator<Item = TestParams, IntoIter = EI>,
389    EI: Iterator<Item = TestParams> + ExactSizeIterator,
390{
391    let params_iter = test_params.into_iter();
392    match run_reporter.started(Timestamp::Unknown) {
393        Ok(()) => (),
394        Err(e) => return Outcome::error(e),
395    }
396    run_reporter.set_expected_suites(params_iter.len() as u32);
397    let test_outcome = match run_tests(
398        connector,
399        params_iter,
400        run_params,
401        &run_reporter,
402        cancel_fut.boxed_local(),
403    )
404    .await
405    {
406        Ok(s) => s,
407        Err(e) => {
408            return Outcome::error(e);
409        }
410    };
411
412    let report_result = match run_reporter.stopped(&test_outcome.clone().into(), Timestamp::Unknown)
413    {
414        Ok(()) => run_reporter.finished(),
415        Err(e) => Err(e),
416    };
417    if let Err(e) = report_result {
418        warn!("Failed to record results: {:?}", e);
419    }
420
421    test_outcome
422}
423
/// Options for the directory-backed reporter built by `create_reporter`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DirectoryReporterOptions {
    /// Root path of the directory.
    pub root_path: PathBuf,
}
428
429/// Create a reporter for use with |run_tests_and_get_outcome|.
430pub fn create_reporter<W: 'static + Write + Send + Sync>(
431    filter_ansi: bool,
432    dir: Option<DirectoryReporterOptions>,
433    writer: W,
434) -> Result<output::RunReporter, anyhow::Error> {
435    let stdout_reporter = output::ShellReporter::new(writer);
436    let dir_reporter = dir
437        .map(|dir| {
438            output::DirectoryWithStdoutReporter::new(dir.root_path, output::SchemaVersion::V1)
439        })
440        .transpose()?;
441    let reporter = match (dir_reporter, filter_ansi) {
442        (Some(dir_reporter), false) => output::RunReporter::new(output::MultiplexedReporter::new(
443            stdout_reporter,
444            dir_reporter,
445        )),
446        (Some(dir_reporter), true) => output::RunReporter::new_ansi_filtered(
447            output::MultiplexedReporter::new(stdout_reporter, dir_reporter),
448        ),
449        (None, false) => output::RunReporter::new(stdout_reporter),
450        (None, true) => output::RunReporter::new_ansi_filtered(stdout_reporter),
451    };
452    Ok(reporter)
453}
454
455#[cfg(test)]
456mod test {
457    use super::*;
458    use crate::connector::SingleRunConnector;
459    use crate::output::{EntityId, InMemoryReporter};
460    use assert_matches::assert_matches;
461    use fidl::endpoints::create_proxy_and_stream;
462    use futures::future::join;
463    use maplit::hashmap;
464    #[cfg(target_os = "fuchsia")]
465    use {
466        fidl::endpoints::Proxy as _,
467        fidl_fuchsia_io as fio,
468        futures::future::join3,
469        vfs::{file::vmo::read_only, pseudo_directory},
470        zx,
471    };
472
473    // TODO(https://fxbug.dev/42180532): add unit tests for suite artifacts too.
474
475    async fn fake_running_all_suites_and_return_run_events(
476        mut stream: ftest_manager::RunBuilderRequestStream,
477        mut suite_events: HashMap<&str, Vec<ftest_manager::SuiteEvent>>,
478        run_events: Vec<ftest_manager::RunEvent>,
479    ) {
480        let mut suite_streams = vec![];
481
482        let mut run_controller = None;
483        while let Ok(Some(req)) = stream.try_next().await {
484            match req {
485                ftest_manager::RunBuilderRequest::AddSuite { test_url, controller, .. } => {
486                    let events = suite_events
487                        .remove(test_url.as_str())
488                        .expect("Got a request for an unexpected test URL");
489                    suite_streams.push((controller.into_stream(), events));
490                }
491                ftest_manager::RunBuilderRequest::Build { controller, .. } => {
492                    run_controller = Some(controller);
493                    break;
494                }
495                ftest_manager::RunBuilderRequest::WithSchedulingOptions { options, .. } => {
496                    if let Some(_) = options.max_parallel_suites {
497                        panic!("Not expecting calls to WithSchedulingOptions where options.max_parallel_suites is Some()")
498                    }
499                }
500                ftest_manager::RunBuilderRequest::AddSuiteInRealm { .. } => {
501                    panic!("AddSuiteInRealm not supported")
502                }
503                ftest_manager::RunBuilderRequest::_UnknownMethod { ordinal, .. } => {
504                    panic!("Not expecting unknown request: {}", ordinal)
505                }
506            }
507        }
508        assert!(
509            run_controller.is_some(),
510            "Expected a RunController to be present. RunBuilder/Build() may not have been called."
511        );
512        assert!(suite_events.is_empty(), "Expected AddSuite to be called for all specified suites");
513        let mut run_stream = run_controller.expect("controller present").into_stream();
514
515        // Each suite just reports that it started and passed.
516        let mut suite_streams = suite_streams
517            .into_iter()
518            .map(|(mut stream, events)| {
519                async move {
520                    let mut maybe_events = Some(events);
521                    while let Ok(Some(req)) = stream.try_next().await {
522                        match req {
523                            ftest_manager::SuiteControllerRequest::GetEvents {
524                                responder, ..
525                            } => {
526                                let send_events = maybe_events.take().unwrap_or(vec![]);
527                                let _ = responder.send(Ok(send_events));
528                            }
529                            _ => {
530                                // ignore all other requests
531                            }
532                        }
533                    }
534                }
535            })
536            .collect::<FuturesUnordered<_>>();
537
538        let suite_drain_fut = async move { while let Some(_) = suite_streams.next().await {} };
539
540        let run_fut = async move {
541            let mut events = Some(run_events);
542            while let Ok(Some(req)) = run_stream.try_next().await {
543                match req {
544                    ftest_manager::RunControllerRequest::GetEvents { responder, .. } => {
545                        if events.is_none() {
546                            let _ = responder.send(vec![]);
547                            continue;
548                        }
549                        let events = events.take().unwrap();
550                        let _ = responder.send(events);
551                    }
552                    _ => {
553                        // ignore all other requests
554                    }
555                }
556            }
557        };
558
559        join(suite_drain_fut, run_fut).await;
560    }
561
    /// Inputs that vary between tests; `call_run_tests` supplies the fixed
    /// `RunParams` and a cancel future that never resolves.
    struct ParamsForRunTests {
        /// Client end of the fake `RunBuilder` connection.
        builder_proxy: ftest_manager::RunBuilderProxy,
        /// Suites to run, in order.
        test_params: Vec<TestParams>,
        /// Reporter that receives the run and suite results.
        run_reporter: RunReporter,
    }
567
568    fn create_empty_suite_events() -> Vec<ftest_manager::SuiteEvent> {
569        vec![
570            ftest_manager::SuiteEvent {
571                timestamp: Some(1000),
572                payload: Some(ftest_manager::SuiteEventPayload::SuiteStarted(
573                    ftest_manager::SuiteStarted,
574                )),
575                ..Default::default()
576            },
577            ftest_manager::SuiteEvent {
578                timestamp: Some(2000),
579                payload: Some(ftest_manager::SuiteEventPayload::SuiteStopped(
580                    ftest_manager::SuiteStopped { status: ftest_manager::SuiteStatus::Passed },
581                )),
582                ..Default::default()
583            },
584        ]
585    }
586
587    async fn call_run_tests(params: ParamsForRunTests) -> Outcome {
588        run_tests_and_get_outcome(
589            SingleRunConnector::new(params.builder_proxy),
590            params.test_params,
591            RunParams {
592                timeout_behavior: TimeoutBehavior::Continue,
593                timeout_grace_seconds: 0,
594                stop_after_failures: None,
595                experimental_parallel_execution: None,
596                accumulate_debug_data: false,
597                log_protocol: None,
598                min_severity_logs: vec![],
599                show_full_moniker: false,
600            },
601            params.run_reporter,
602            futures::future::pending(),
603        )
604        .await
605    }
606
607    #[fuchsia::test]
608    async fn empty_run_no_events() {
609        let (builder_proxy, _run_builder_stream) =
610            create_proxy_and_stream::<ftest_manager::RunBuilderMarker>();
611
612        let reporter = InMemoryReporter::new();
613        let run_reporter = RunReporter::new(reporter.clone());
614        let run_fut =
615            call_run_tests(ParamsForRunTests { builder_proxy, test_params: vec![], run_reporter });
616
617        // This should pass without ever calling test manager.
618        assert_eq!(run_fut.await, Outcome::Passed);
619
620        let reports = reporter.get_reports();
621        assert_eq!(1usize, reports.len());
622        assert_eq!(reports[0].id, EntityId::TestRun);
623    }
624
625    #[fuchsia::test]
626    async fn single_run_no_events() {
627        let (builder_proxy, run_builder_stream) =
628            create_proxy_and_stream::<ftest_manager::RunBuilderMarker>();
629
630        let reporter = InMemoryReporter::new();
631        let run_reporter = RunReporter::new(reporter.clone());
632        let run_fut = call_run_tests(ParamsForRunTests {
633            builder_proxy,
634            test_params: vec![TestParams {
635                test_url: "fuchsia-pkg://fuchsia.com/nothing#meta/nothing.cm".to_string(),
636                ..TestParams::default()
637            }],
638            run_reporter,
639        });
640        let fake_fut = fake_running_all_suites_and_return_run_events(
641            run_builder_stream,
642            hashmap! {
643                "fuchsia-pkg://fuchsia.com/nothing#meta/nothing.cm" => create_empty_suite_events()
644            },
645            vec![],
646        );
647
648        assert_eq!(join(run_fut, fake_fut).await.0, Outcome::Passed,);
649
650        let reports = reporter.get_reports();
651        assert_eq!(2usize, reports.len());
652        assert!(reports[0].report.artifacts.is_empty());
653        assert!(reports[0].report.directories.is_empty());
654        assert!(reports[1].report.artifacts.is_empty());
655        assert!(reports[1].report.directories.is_empty());
656    }
657
658    #[cfg(target_os = "fuchsia")]
659    #[fuchsia::test]
660    async fn single_run_custom_directory() {
661        let (builder_proxy, run_builder_stream) =
662            create_proxy_and_stream::<ftest_manager::RunBuilderMarker>();
663
664        let reporter = InMemoryReporter::new();
665        let run_reporter = RunReporter::new(reporter.clone());
666        let run_fut = call_run_tests(ParamsForRunTests {
667            builder_proxy,
668            test_params: vec![TestParams {
669                test_url: "fuchsia-pkg://fuchsia.com/nothing#meta/nothing.cm".to_string(),
670                ..TestParams::default()
671            }],
672            run_reporter,
673        });
674
675        let dir = pseudo_directory! {
676            "test_file.txt" => read_only("Hello, World!"),
677        };
678
679        let directory_proxy = vfs::directory::serve(dir, fio::PERM_READABLE | fio::PERM_WRITABLE);
680        let directory_client =
681            fidl::endpoints::ClientEnd::new(directory_proxy.into_channel().unwrap().into());
682
683        let (_pair_1, pair_2) = zx::EventPair::create();
684
685        let events = vec![ftest_manager::RunEvent {
686            payload: Some(ftest_manager::RunEventPayload::Artifact(
687                ftest_manager::Artifact::Custom(ftest_manager::CustomArtifact {
688                    directory_and_token: Some(ftest_manager::DirectoryAndToken {
689                        directory: directory_client,
690                        token: pair_2,
691                    }),
692                    ..Default::default()
693                }),
694            )),
695            ..Default::default()
696        }];
697
698        let fake_fut = fake_running_all_suites_and_return_run_events(
699            run_builder_stream,
700            hashmap! {
701                "fuchsia-pkg://fuchsia.com/nothing#meta/nothing.cm" => create_empty_suite_events()
702            },
703            events,
704        );
705
706        assert_eq!(join(run_fut, fake_fut).await.0, Outcome::Passed,);
707
708        let reports = reporter.get_reports();
709        assert_eq!(2usize, reports.len());
710        let run = reports.iter().find(|e| e.id == EntityId::TestRun).expect("find run report");
711        assert_eq!(1usize, run.report.directories.len());
712        let dir = &run.report.directories[0];
713        let files = dir.1.files.lock();
714        assert_eq!(1usize, files.len());
715        let (name, file) = &files[0];
716        assert_eq!(name.to_string_lossy(), "test_file.txt".to_string());
717        assert_eq!(file.get_contents(), b"Hello, World!");
718    }
719
720    #[fuchsia::test]
721    async fn record_output_after_internal_error() {
722        let (builder_proxy, run_builder_stream) =
723            create_proxy_and_stream::<ftest_manager::RunBuilderMarker>();
724
725        let reporter = InMemoryReporter::new();
726        let run_reporter = RunReporter::new(reporter.clone());
727        let run_fut = call_run_tests(ParamsForRunTests {
728            builder_proxy,
729            test_params: vec![
730                TestParams {
731                    test_url: "fuchsia-pkg://fuchsia.com/invalid#meta/invalid.cm".to_string(),
732                    ..TestParams::default()
733                },
734                TestParams {
735                    test_url: "fuchsia-pkg://fuchsia.com/nothing#meta/nothing.cm".to_string(),
736                    ..TestParams::default()
737                },
738            ],
739            run_reporter,
740        });
741
742        let fake_fut = fake_running_all_suites_and_return_run_events(
743            run_builder_stream,
744            hashmap! {
745                // return an internal error from the first test.
746                "fuchsia-pkg://fuchsia.com/invalid#meta/invalid.cm" => vec![
747                    ftest_manager::SuiteEvent {
748                        timestamp: Some(1000),
749                        payload: Some(
750                            ftest_manager::SuiteEventPayload::SuiteStarted(
751                                ftest_manager::SuiteStarted,
752                            ),
753                        ),
754                        ..Default::default()
755                    },
756                    ftest_manager::SuiteEvent {
757                        timestamp: Some(2000),
758                        payload: Some(ftest_manager::SuiteEventPayload::SuiteStopped(
759                            ftest_manager::SuiteStopped { status: ftest_manager::SuiteStatus::InternalError },
760                        )),
761                        ..Default::default()
762                    },
763                ],
764                "fuchsia-pkg://fuchsia.com/nothing#meta/nothing.cm" => create_empty_suite_events()
765            },
766            vec![],
767        );
768
769        assert_matches!(join(run_fut, fake_fut).await.0, Outcome::Error { .. });
770
771        let reports = reporter.get_reports();
772        assert_eq!(3usize, reports.len());
773        let invalid_suite = reports
774            .iter()
775            .find(|e| e.report.name == "fuchsia-pkg://fuchsia.com/invalid#meta/invalid.cm")
776            .expect("find run report");
777        assert_eq!(invalid_suite.report.outcome, Some(output::ReportedOutcome::Error));
778        assert!(invalid_suite.report.is_finished);
779
780        // The valid suite should not have been started, but finish should've been called so that
781        // results get persisted.
782        let not_started = reports
783            .iter()
784            .find(|e| e.report.name == "fuchsia-pkg://fuchsia.com/nothing#meta/nothing.cm")
785            .expect("find run report");
786        assert!(not_started.report.outcome.is_none());
787        assert!(not_started.report.is_finished);
788
789        // The results for the run should also be saved.
790        let run = reports.iter().find(|e| e.id == EntityId::TestRun).expect("find run report");
791        assert_eq!(run.report.outcome, Some(output::ReportedOutcome::Error));
792        assert!(run.report.is_finished);
793        assert!(run.report.started_time.is_some());
794    }
795
796    #[cfg(target_os = "fuchsia")]
797    #[fuchsia::test]
798    async fn single_run_debug_data() {
799        let (builder_proxy, run_builder_stream) =
800            create_proxy_and_stream::<ftest_manager::RunBuilderMarker>();
801
802        let reporter = InMemoryReporter::new();
803        let run_reporter = RunReporter::new(reporter.clone());
804        let run_fut = call_run_tests(ParamsForRunTests {
805            builder_proxy,
806            test_params: vec![TestParams {
807                test_url: "fuchsia-pkg://fuchsia.com/nothing#meta/nothing.cm".to_string(),
808                ..TestParams::default()
809            }],
810            run_reporter,
811        });
812
813        let (debug_client, debug_service) =
814            fidl::endpoints::create_endpoints::<ftest_manager::DebugDataIteratorMarker>();
815        let debug_data_fut = async move {
816            let (client, server) = zx::Socket::create_stream();
817            let mut compressor = zstd::bulk::Compressor::new(0).unwrap();
818            let bytes = compressor.compress(b"Not a real profile").unwrap();
819            let _ = server.write(bytes.as_slice()).unwrap();
820            let mut service = debug_service.into_stream();
821            let mut data = vec![ftest_manager::DebugData {
822                name: Some("test_file.profraw".to_string()),
823                socket: Some(client.into()),
824                ..Default::default()
825            }];
826            drop(server);
827            while let Ok(Some(request)) = service.try_next().await {
828                match request {
829                    ftest_manager::DebugDataIteratorRequest::GetNext { .. } => {
830                        panic!("Not Implemented");
831                    }
832                    ftest_manager::DebugDataIteratorRequest::GetNextCompressed {
833                        responder,
834                        ..
835                    } => {
836                        let _ = responder.send(std::mem::take(&mut data));
837                    }
838                }
839            }
840        };
841        let events = vec![ftest_manager::RunEvent {
842            payload: Some(ftest_manager::RunEventPayload::Artifact(
843                ftest_manager::Artifact::DebugData(debug_client),
844            )),
845            ..Default::default()
846        }];
847
848        let fake_fut = fake_running_all_suites_and_return_run_events(
849            run_builder_stream,
850            hashmap! {
851
852                "fuchsia-pkg://fuchsia.com/nothing#meta/nothing.cm" => create_empty_suite_events(),
853            },
854            events,
855        );
856
857        assert_eq!(join3(run_fut, debug_data_fut, fake_fut).await.0, Outcome::Passed);
858
859        let reports = reporter.get_reports();
860        assert_eq!(2usize, reports.len());
861        let run = reports.iter().find(|e| e.id == EntityId::TestRun).expect("find run report");
862        assert_eq!(1usize, run.report.directories.len());
863        let dir = &run.report.directories[0];
864        let files = dir.1.files.lock();
865        assert_eq!(1usize, files.len());
866        let (name, file) = &files[0];
867        assert_eq!(name.to_string_lossy(), "test_file.profraw".to_string());
868        assert_eq!(file.get_contents(), b"Not a real profile");
869    }
870
871    async fn fake_parallel_options_server(
872        mut stream: ftest_manager::RunBuilderRequestStream,
873    ) -> Option<ftest_manager::SchedulingOptions> {
874        let mut scheduling_options = None;
875        if let Ok(Some(req)) = stream.try_next().await {
876            match req {
877                ftest_manager::RunBuilderRequest::AddSuite { .. } => {
878                    panic!("Not expecting an AddSuite request")
879                }
880                ftest_manager::RunBuilderRequest::AddSuiteInRealm { .. } => {
881                    panic!("Not expecting an AddSuiteInRealm request")
882                }
883                ftest_manager::RunBuilderRequest::Build { .. } => {
884                    panic!("Not expecting a Build request")
885                }
886                ftest_manager::RunBuilderRequest::WithSchedulingOptions { options, .. } => {
887                    scheduling_options = Some(options);
888                }
889                ftest_manager::RunBuilderRequest::_UnknownMethod { ordinal, .. } => {
890                    panic!("Not expecting unknown request: {}", ordinal)
891                }
892            }
893        }
894        scheduling_options
895    }
896
897    #[fuchsia::test]
898    async fn request_scheduling_options_test_parallel() {
899        let max_parallel_suites: u16 = 10;
900        let expected_max_parallel_suites = Some(max_parallel_suites);
901
902        let (builder_proxy, run_builder_stream) =
903            create_proxy_and_stream::<ftest_manager::RunBuilderMarker>();
904
905        let run_params = RunParams {
906            timeout_behavior: TimeoutBehavior::Continue,
907            timeout_grace_seconds: 0,
908            stop_after_failures: None,
909            experimental_parallel_execution: Some(max_parallel_suites),
910            accumulate_debug_data: false,
911            log_protocol: None,
912            min_severity_logs: vec![],
913            show_full_moniker: false,
914        };
915
916        let request_parallel_fut = request_scheduling_options(&run_params, &builder_proxy);
917        let fake_server_fut = fake_parallel_options_server(run_builder_stream);
918
919        let returned_options = join(request_parallel_fut, fake_server_fut).await.1;
920        let max_parallel_suites_received = match returned_options {
921            Some(scheduling_options) => scheduling_options.max_parallel_suites,
922            None => panic!("Expected scheduling options."),
923        };
924        assert_eq!(max_parallel_suites_received, expected_max_parallel_suites);
925    }
926
927    #[fuchsia::test]
928    async fn request_scheduling_options_test_serial() {
929        let expected_max_parallel_suites = None;
930
931        let (builder_proxy, run_builder_stream) =
932            create_proxy_and_stream::<ftest_manager::RunBuilderMarker>();
933
934        let run_params = RunParams {
935            timeout_behavior: TimeoutBehavior::Continue,
936            timeout_grace_seconds: 0,
937            stop_after_failures: None,
938            experimental_parallel_execution: None,
939            accumulate_debug_data: false,
940            log_protocol: None,
941            min_severity_logs: vec![],
942            show_full_moniker: false,
943        };
944
945        let request_parallel_fut = request_scheduling_options(&run_params, &builder_proxy);
946        let fake_server_fut = fake_parallel_options_server(run_builder_stream);
947
948        let returned_options = join(request_parallel_fut, fake_server_fut)
949            .await
950            .1
951            .expect("Expected scheduling options.");
952        let max_parallel_suites_received = returned_options.max_parallel_suites;
953        assert_eq!(max_parallel_suites_received, expected_max_parallel_suites);
954    }
955}