run_test_suite_lib/
running_suite.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::cancel::{Cancelled, NamedFutureExt, OrCancel};
6use crate::outcome::{Lifecycle, Outcome, RunTestSuiteError, UnexpectedEventError};
7use crate::output::{self, ArtifactType, CaseId, SuiteReporter, Timestamp};
8use crate::stream_util::StreamUtil;
9use crate::trace::duration;
10use crate::{artifacts, diagnostics};
11use diagnostics_data::Severity;
12use fidl_fuchsia_test_manager::{
13    self as ftest_manager, LaunchError, SuiteArtifactGeneratedEventDetails,
14    SuiteStoppedEventDetails, TestCaseArtifactGeneratedEventDetails, TestCaseFinishedEventDetails,
15    TestCaseFoundEventDetails, TestCaseStartedEventDetails, TestCaseStoppedEventDetails,
16};
17use fuchsia_async as fasync;
18use futures::future::Either;
19use futures::prelude::*;
20use futures::stream::FuturesUnordered;
21use futures::StreamExt;
22use log::{error, info, warn};
23use std::collections::HashMap;
24use std::io::Write;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::sync::Arc;
27
28/// Struct used by |run_suite_and_collect_logs| to track the state of test cases and suites.
29struct CollectedEntityState<R> {
30    reporter: R,
31    name: String,
32    lifecycle: Lifecycle,
33    artifact_tasks:
34        Vec<fasync::Task<Result<Option<diagnostics::LogCollectionOutcome>, anyhow::Error>>>,
35}
36
37/// Collects results and artifacts for a single suite.
38// TODO(satsukiu): There's two ways to return an error here:
39// * Err(RunTestSuiteError)
40// * Ok(Outcome::Error(RunTestSuiteError))
41// We should consider how to consolidate these.
42pub(crate) async fn run_suite_and_collect_logs<F: Future<Output = ()> + Unpin>(
43    running_suite: RunningSuite,
44    suite_reporter: &SuiteReporter<'_>,
45    log_display: diagnostics::LogDisplayConfiguration,
46    cancel_fut: F,
47) -> Result<Outcome, RunTestSuiteError> {
48    duration!(c"collect_suite");
49
50    let RunningSuite {
51        mut event_stream,
52        stopper,
53        timeout,
54        timeout_grace,
55        max_severity_logs,
56        no_cases_equals_success,
57        ..
58    } = running_suite;
59
60    let log_opts =
61        diagnostics::LogCollectionOptions { format: log_display, max_severity: max_severity_logs };
62
63    let mut test_cases: HashMap<u32, CollectedEntityState<_>> = HashMap::new();
64    let mut suite_state = CollectedEntityState {
65        reporter: suite_reporter,
66        name: "".to_string(),
67        lifecycle: Lifecycle::Found,
68        artifact_tasks: vec![],
69    };
70    let mut suite_finish_timestamp = Timestamp::Unknown;
71    let mut outcome = Outcome::Passed;
72
73    let collect_results_fut = async {
74        while let Some(event_result) = event_stream.next().named("next_event").await {
75            match event_result {
76                Err(e) => match (e, no_cases_equals_success) {
77                    (RunTestSuiteError::Launch(LaunchError::NoMatchingCases), Some(true)) => {
78                        suite_state.lifecycle = Lifecycle::Stopped;
79                    }
80                    (e, _) => {
81                        suite_state
82                            .reporter
83                            .stopped(&output::ReportedOutcome::Error, Timestamp::Unknown)?;
84                        return Err(e);
85                    }
86                },
87                Ok(event) => {
88                    let timestamp = Timestamp::from_nanos(event.timestamp);
89                    match event.details.expect("event cannot be None") {
90                        ftest_manager::EventDetails::TestCaseFound(TestCaseFoundEventDetails {
91                            test_case_name: Some(test_case_name),
92                            test_case_id: Some(test_case_id),
93                            ..
94                        }) => {
95                            if test_cases.contains_key(&test_case_id) {
96                                return Err(UnexpectedEventError::InvalidCaseEvent {
97                                    last_state: Lifecycle::Found,
98                                    next_state: Lifecycle::Found,
99                                    test_case_name,
100                                    test_case_id,
101                                }
102                                .into());
103                            }
104                            test_cases.insert(
105                                test_case_id,
106                                CollectedEntityState {
107                                    reporter: suite_reporter
108                                        .new_case(&test_case_name, &CaseId(test_case_id))?,
109                                    name: test_case_name,
110                                    lifecycle: Lifecycle::Found,
111                                    artifact_tasks: vec![],
112                                },
113                            );
114                        }
115                        ftest_manager::EventDetails::TestCaseStarted(
116                            TestCaseStartedEventDetails {
117                                test_case_id: Some(test_case_id), ..
118                            },
119                        ) => {
120                            let entry = test_cases.get_mut(&test_case_id).ok_or(
121                                UnexpectedEventError::CaseEventButNotFound {
122                                    next_state: Lifecycle::Started,
123                                    test_case_id,
124                                },
125                            )?;
126                            match &entry.lifecycle {
127                                Lifecycle::Found => {
128                                    // TODO(https://fxbug.dev/42159975): Record per-case runtime once we have an
129                                    // accurate way to measure it.
130                                    entry.reporter.started(Timestamp::Unknown)?;
131                                    entry.lifecycle = Lifecycle::Started;
132                                }
133                                other => {
134                                    return Err(UnexpectedEventError::InvalidCaseEvent {
135                                        last_state: *other,
136                                        next_state: Lifecycle::Started,
137                                        test_case_name: entry.name.clone(),
138                                        test_case_id,
139                                    }
140                                    .into());
141                                }
142                            }
143                        }
144                        ftest_manager::EventDetails::TestCaseArtifactGenerated(
145                            TestCaseArtifactGeneratedEventDetails {
146                                test_case_id: Some(test_case_id),
147                                artifact: Some(artifact),
148                                ..
149                            },
150                        ) => {
151                            let entry = test_cases.get_mut(&test_case_id).ok_or(
152                                UnexpectedEventError::CaseArtifactButNotFound { test_case_id },
153                            )?;
154                            if matches!(entry.lifecycle, Lifecycle::Finished) {
155                                return Err(UnexpectedEventError::CaseArtifactButFinished {
156                                    test_case_id,
157                                }
158                                .into());
159                            }
160                            let artifact_fut = artifacts::drain_artifact(
161                                &entry.reporter,
162                                artifact,
163                                log_opts.clone(),
164                            )
165                            .await?;
166                            entry.artifact_tasks.push(fasync::Task::spawn(artifact_fut));
167                        }
168                        ftest_manager::EventDetails::TestCaseStopped(
169                            TestCaseStoppedEventDetails {
170                                test_case_id: Some(test_case_id),
171                                result: Some(result),
172                                ..
173                            },
174                        ) => {
175                            let entry = test_cases.get_mut(&test_case_id).ok_or(
176                                UnexpectedEventError::CaseEventButNotFound {
177                                    next_state: Lifecycle::Stopped,
178                                    test_case_id,
179                                },
180                            )?;
181                            match &entry.lifecycle {
182                                Lifecycle::Started => {
183                                    // TODO(https://fxbug.dev/42159975): Record per-case runtime once we have an
184                                    // accurate way to measure it.
185                                    entry.reporter.stopped(&result.into(), Timestamp::Unknown)?;
186                                    entry.lifecycle = Lifecycle::Stopped;
187                                }
188                                other => {
189                                    return Err(UnexpectedEventError::InvalidCaseEvent {
190                                        last_state: *other,
191                                        next_state: Lifecycle::Stopped,
192                                        test_case_name: entry.name.clone(),
193                                        test_case_id,
194                                    }
195                                    .into());
196                                }
197                            }
198                        }
199                        ftest_manager::EventDetails::TestCaseFinished(
200                            TestCaseFinishedEventDetails {
201                                test_case_id: Some(test_case_id), ..
202                            },
203                        ) => {
204                            let entry = test_cases.get_mut(&test_case_id).ok_or(
205                                UnexpectedEventError::CaseEventButNotFound {
206                                    next_state: Lifecycle::Finished,
207                                    test_case_id,
208                                },
209                            )?;
210                            match &entry.lifecycle {
211                                Lifecycle::Stopped => {
212                                    // don't mark reporter finished yet, we want to finish draining
213                                    // artifacts separately.
214                                    entry.lifecycle = Lifecycle::Finished;
215                                }
216                                other => {
217                                    return Err(UnexpectedEventError::InvalidCaseEvent {
218                                        last_state: *other,
219                                        next_state: Lifecycle::Finished,
220                                        test_case_name: entry.name.clone(),
221                                        test_case_id,
222                                    }
223                                    .into());
224                                }
225                            }
226                        }
227                        ftest_manager::EventDetails::SuiteArtifactGenerated(
228                            SuiteArtifactGeneratedEventDetails { artifact: Some(artifact), .. },
229                        ) => {
230                            let artifact_fut = artifacts::drain_artifact(
231                                suite_reporter,
232                                artifact,
233                                log_opts.clone(),
234                            )
235                            .await?;
236                            suite_state.artifact_tasks.push(fasync::Task::spawn(artifact_fut));
237                        }
238                        ftest_manager::EventDetails::SuiteStarted(_) => {
239                            match &suite_state.lifecycle {
240                                Lifecycle::Found => {
241                                    suite_state.reporter.started(timestamp)?;
242                                    suite_state.lifecycle = Lifecycle::Started;
243                                }
244                                other => {
245                                    return Err(UnexpectedEventError::InvalidEvent {
246                                        last_state: *other,
247                                        next_state: Lifecycle::Started,
248                                    }
249                                    .into());
250                                }
251                            }
252                        }
253                        ftest_manager::EventDetails::SuiteStopped(SuiteStoppedEventDetails {
254                            result: Some(result),
255                            ..
256                        }) => match &suite_state.lifecycle {
257                            Lifecycle::Started => {
258                                suite_state.lifecycle = Lifecycle::Stopped;
259                                suite_finish_timestamp = timestamp;
260                                outcome = match result {
261                                    ftest_manager::SuiteResult::Finished => Outcome::Passed,
262                                    ftest_manager::SuiteResult::Failed => Outcome::Failed,
263                                    ftest_manager::SuiteResult::DidNotFinish => {
264                                        Outcome::Inconclusive
265                                    }
266                                    ftest_manager::SuiteResult::TimedOut
267                                    | ftest_manager::SuiteResult::Stopped => Outcome::Timedout,
268                                    ftest_manager::SuiteResult::InternalError => Outcome::error(
269                                        UnexpectedEventError::InternalErrorSuiteResult,
270                                    ),
271                                    r => {
272                                        return Err(
273                                            UnexpectedEventError::UnrecognizedSuiteResult {
274                                                result: r,
275                                            }
276                                            .into(),
277                                        );
278                                    }
279                                };
280                            }
281                            other => {
282                                return Err(UnexpectedEventError::InvalidEvent {
283                                    last_state: *other,
284                                    next_state: Lifecycle::Stopped,
285                                }
286                                .into());
287                            }
288                        },
289                        ftest_manager::EventDetailsUnknown!() => {
290                            warn!("Encountered unrecognized suite event");
291                        }
292                    }
293                }
294            }
295        }
296        drop(event_stream); // Explicit drop here to force ownership move.
297        Ok(())
298    }
299    .boxed_local();
300
301    let start_time = std::time::Instant::now();
302    let (stop_timeout_future, kill_timeout_future) = match timeout {
303        None => {
304            (futures::future::pending::<()>().boxed(), futures::future::pending::<()>().boxed())
305        }
306        Some(duration) => (
307            fasync::Timer::new(start_time + duration).boxed(),
308            fasync::Timer::new(start_time + duration + timeout_grace).boxed(),
309        ),
310    };
311
312    // This polls event collection and calling SuiteController::Stop on timeout simultaneously.
313    let collect_or_stop_fut = async move {
314        match futures::future::select(stop_timeout_future, collect_results_fut).await {
315            Either::Left((_stop_done, collect_fut)) => {
316                stopper.stop();
317                collect_fut.await
318            }
319            Either::Right((result, _)) => result,
320        }
321    };
322
323    // If kill timeout or cancel occur, we want to stop polling events.
324    // kill_fut resolves to the outcome to which results should be overwritten
325    // if it resolves.
326    let kill_fut = async move {
327        match futures::future::select(cancel_fut, kill_timeout_future).await {
328            Either::Left(_) => Outcome::Cancelled,
329            Either::Right(_) => Outcome::Timedout,
330        }
331    }
332    .shared();
333
334    let early_termination_outcome =
335        match collect_or_stop_fut.boxed_local().or_cancelled(kill_fut.clone()).await {
336            Ok(Ok(())) => None,
337            Ok(Err(e)) => return Err(e),
338            Err(Cancelled(outcome)) => Some(outcome),
339        };
340
341    // Finish collecting artifacts and report errors.
342    info!("Awaiting case artifacts");
343    let mut unfinished_test_case_names = vec![];
344    for (_, test_case) in test_cases.into_iter() {
345        let CollectedEntityState { reporter, name, lifecycle, artifact_tasks } = test_case;
346        match (lifecycle, early_termination_outcome.clone()) {
347            (Lifecycle::Started | Lifecycle::Found, Some(early)) => {
348                reporter.stopped(&early.into(), Timestamp::Unknown)?;
349            }
350            (Lifecycle::Found, None) => {
351                unfinished_test_case_names.push(name.clone());
352                reporter.stopped(&Outcome::Inconclusive.into(), Timestamp::Unknown)?;
353            }
354            (Lifecycle::Started, None) => {
355                unfinished_test_case_names.push(name.clone());
356                reporter.stopped(&Outcome::DidNotFinish.into(), Timestamp::Unknown)?;
357            }
358            (Lifecycle::Stopped | Lifecycle::Finished, _) => (),
359        }
360
361        let finish_artifacts_fut = FuturesUnordered::from_iter(artifact_tasks)
362            .map(|result| match result {
363                Err(e) => {
364                    error!("Failed to collect artifact for {}: {:?}", name, e);
365                }
366                Ok(Some(_log_result)) => warn!("Unexpectedly got log results for a test case"),
367                Ok(None) => (),
368            })
369            .collect::<()>();
370        if let Err(Cancelled(_)) = finish_artifacts_fut.or_cancelled(kill_fut.clone()).await {
371            warn!("Stopped polling artifacts for {} due to timeout", name);
372        }
373
374        reporter.finished()?;
375    }
376    if !unfinished_test_case_names.is_empty() {
377        outcome = Outcome::error(UnexpectedEventError::CasesDidNotFinish {
378            cases: unfinished_test_case_names,
379        });
380    }
381
382    match (suite_state.lifecycle, early_termination_outcome) {
383        (Lifecycle::Found | Lifecycle::Started, Some(early)) => {
384            if matches!(&outcome, Outcome::Passed | Outcome::Failed) {
385                outcome = early;
386            }
387        }
388        (Lifecycle::Found | Lifecycle::Started, None) => {
389            outcome = Outcome::error(UnexpectedEventError::SuiteDidNotReportStop);
390        }
391        // If the suite successfully reported a result, don't alter it.
392        (Lifecycle::Stopped, _) => (),
393        // Finished doesn't happen since there's no SuiteFinished event.
394        (Lifecycle::Finished, _) => unreachable!(),
395    }
396
397    let restricted_logs_present = AtomicBool::new(false);
398    let finish_artifacts_fut = FuturesUnordered::from_iter(suite_state.artifact_tasks)
399        .then(|result| async {
400            match result {
401                Err(e) => {
402                    error!("Failed to collect artifact for suite: {:?}", e);
403                }
404                Ok(Some(log_result)) => match log_result {
405                    diagnostics::LogCollectionOutcome::Error { restricted_logs } => {
406                        restricted_logs_present.store(true, Ordering::Relaxed);
407                        let mut log_artifact = match suite_reporter
408                            .new_artifact(&ArtifactType::RestrictedLog)
409                        {
410                            Ok(artifact) => artifact,
411                            Err(e) => {
412                                warn!("Error creating artifact to report restricted logs: {:?}", e);
413                                return;
414                            }
415                        };
416                        for log in restricted_logs.iter() {
417                            if let Err(e) = writeln!(log_artifact, "{}", log) {
418                                warn!("Error recording restricted logs: {:?}", e);
419                                return;
420                            }
421                        }
422                    }
423                    diagnostics::LogCollectionOutcome::Passed => (),
424                },
425                Ok(None) => (),
426            }
427        })
428        .collect::<()>();
429    if let Err(Cancelled(_)) = finish_artifacts_fut.or_cancelled(kill_fut).await {
430        warn!("Stopped polling artifacts due to timeout");
431    }
432    if restricted_logs_present.into_inner() && matches!(outcome, Outcome::Passed) {
433        outcome = Outcome::Failed;
434    }
435
436    suite_reporter.stopped(&outcome.clone().into(), suite_finish_timestamp)?;
437
438    Ok(outcome)
439}
440
441type EventStream =
442    std::pin::Pin<Box<dyn Stream<Item = Result<ftest_manager::Event, RunTestSuiteError>> + Send>>;
443
444/// A test suite that is known to have started execution. A suite is considered started once
445/// any event is produced for the suite.
446pub(crate) struct RunningSuite {
447    event_stream: EventStream,
448    stopper: RunningSuiteStopper,
449    max_severity_logs: Option<Severity>,
450    timeout: Option<std::time::Duration>,
451    timeout_grace: std::time::Duration,
452    no_cases_equals_success: Option<bool>,
453}
454
455struct RunningSuiteStopper(Arc<ftest_manager::SuiteControllerProxy>);
456
457impl RunningSuiteStopper {
458    fn stop(self) {
459        let _ = self.0.stop();
460    }
461}
462
463pub(crate) struct WaitForStartArgs {
464    pub(crate) proxy: ftest_manager::SuiteControllerProxy,
465    pub(crate) max_severity_logs: Option<Severity>,
466    pub(crate) timeout: Option<std::time::Duration>,
467    pub(crate) timeout_grace: std::time::Duration,
468    pub(crate) max_pipelined: Option<usize>,
469    pub(crate) no_cases_equals_success: Option<bool>,
470}
471
472impl RunningSuite {
473    /// Number of concurrently active WatchEvents requests. Chosen by testing powers of 2 when
474    /// running a set of tests using ffx test against an emulator, and taking the value at
475    /// which improvement stops.
476    const DEFAULT_PIPELINED_REQUESTS: usize = 8;
477    pub(crate) async fn wait_for_start(args: WaitForStartArgs) -> Self {
478        let WaitForStartArgs {
479            proxy,
480            max_severity_logs,
481            timeout,
482            timeout_grace,
483            max_pipelined,
484            no_cases_equals_success,
485        } = args;
486
487        let proxy = Arc::new(proxy);
488        let proxy_clone = proxy.clone();
489        // Stream of fidl responses, with multiple concurrently active requests.
490        let unprocessed_event_stream = futures::stream::repeat_with(move || {
491            proxy.watch_events().inspect(|events_result| match events_result {
492                Ok(Ok(ref events)) => info!("Latest suite event: {:?}", events.last()),
493                _ => (),
494            })
495        })
496        .buffered(max_pipelined.unwrap_or(Self::DEFAULT_PIPELINED_REQUESTS));
497        // Terminate the stream after we get an error or empty list of events.
498        let terminated_event_stream =
499            unprocessed_event_stream.take_until_stop_after(|result| match &result {
500                Ok(Ok(events)) => events.is_empty(),
501                Err(_) | Ok(Err(_)) => true,
502            });
503        // Flatten the stream of vecs into a stream of single events.
504        let mut event_stream = terminated_event_stream
505            .map(Self::convert_to_result_vec)
506            .map(futures::stream::iter)
507            .flatten()
508            .peekable();
509        // Wait for the first event to be ready, which signals the suite has started.
510        std::pin::Pin::new(&mut event_stream).peek().await;
511
512        Self {
513            event_stream: event_stream.boxed(),
514            stopper: RunningSuiteStopper(proxy_clone),
515            timeout,
516            timeout_grace,
517            max_severity_logs,
518            no_cases_equals_success,
519        }
520    }
521
522    fn convert_to_result_vec(
523        vec: Result<Result<Vec<ftest_manager::Event>, ftest_manager::LaunchError>, fidl::Error>,
524    ) -> Vec<Result<ftest_manager::Event, RunTestSuiteError>> {
525        match vec {
526            Ok(Ok(events)) => events.into_iter().map(Ok).collect(),
527            Ok(Err(e)) => vec![Err(e.into())],
528            Err(e) => vec![Err(e.into())],
529        }
530    }
531}
532
533#[cfg(test)]
534mod test {
535    use super::*;
536    use crate::output::EntityId;
537    use assert_matches::assert_matches;
538    use fidl::endpoints::create_proxy_and_stream;
539    use maplit::hashmap;
540
541    async fn respond_to_watch_events(
542        request_stream: &mut ftest_manager::SuiteControllerRequestStream,
543        events: Vec<ftest_manager::Event>,
544    ) {
545        let request = request_stream
546            .next()
547            .await
548            .expect("did not get next request")
549            .expect("error getting next request");
550        let responder = match request {
551            ftest_manager::SuiteControllerRequest::WatchEvents { responder } => responder,
552            r => panic!("Expected WatchEvents request but got {:?}", r),
553        };
554
555        responder.send(Ok(events)).expect("send events");
556    }
557
558    /// Serves all events to completion.
559    async fn serve_all_events(
560        mut request_stream: ftest_manager::SuiteControllerRequestStream,
561        events: Vec<ftest_manager::Event>,
562    ) {
563        const BATCH_SIZE: usize = 5;
564        let mut event_iter = events.into_iter();
565        while event_iter.len() > 0 {
566            respond_to_watch_events(
567                &mut request_stream,
568                event_iter.by_ref().take(BATCH_SIZE).collect(),
569            )
570            .await;
571        }
572        respond_to_watch_events(&mut request_stream, vec![]).await;
573    }
574
575    /// Serves all events to completion, then wait for the channel to close.
576    async fn serve_all_events_then_hang(
577        mut request_stream: ftest_manager::SuiteControllerRequestStream,
578        events: Vec<ftest_manager::Event>,
579    ) {
580        const BATCH_SIZE: usize = 5;
581        let mut event_iter = events.into_iter();
582        while event_iter.len() > 0 {
583            respond_to_watch_events(
584                &mut request_stream,
585                event_iter.by_ref().take(BATCH_SIZE).collect(),
586            )
587            .await;
588        }
589        let _requests = request_stream.collect::<Vec<_>>().await;
590    }
591
592    /// Creates an Event which is unpopulated, except for timestamp.
593    /// This isn't representative of an actual event from test framework, but is sufficient
594    /// to assert events are routed correctly.
595    fn create_empty_event(timestamp: i64) -> ftest_manager::Event {
596        ftest_manager::Event { timestamp: Some(timestamp), ..Default::default() }
597    }
598
599    macro_rules! assert_empty_events_eq {
600        ($t1:expr, $t2:expr) => {
601            assert_eq!($t1.timestamp, $t2.timestamp, "Got incorrect event.")
602        };
603    }
604
605    #[fuchsia::test]
606    async fn running_events_simple() {
607        let (suite_proxy, mut suite_request_stream) =
608            create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
609        let suite_server_task = fasync::Task::spawn(async move {
610            respond_to_watch_events(&mut suite_request_stream, vec![create_empty_event(0)]).await;
611            respond_to_watch_events(&mut suite_request_stream, vec![]).await;
612            drop(suite_request_stream);
613        });
614
615        let mut running_suite = RunningSuite::wait_for_start(WaitForStartArgs {
616            proxy: suite_proxy,
617            max_severity_logs: None,
618            timeout: None,
619            timeout_grace: std::time::Duration::ZERO,
620            max_pipelined: None,
621            no_cases_equals_success: None,
622        })
623        .await;
624        assert_empty_events_eq!(
625            running_suite.event_stream.next().await.unwrap().unwrap(),
626            create_empty_event(0)
627        );
628        assert!(running_suite.event_stream.next().await.is_none());
629        // polling again should still give none.
630        assert!(running_suite.event_stream.next().await.is_none());
631        suite_server_task.await;
632    }
633
634    #[fuchsia::test]
635    async fn running_events_multiple_events() {
636        let (suite_proxy, mut suite_request_stream) =
637            create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
638        let suite_server_task = fasync::Task::spawn(async move {
639            respond_to_watch_events(
640                &mut suite_request_stream,
641                vec![create_empty_event(0), create_empty_event(1)],
642            )
643            .await;
644            respond_to_watch_events(
645                &mut suite_request_stream,
646                vec![create_empty_event(2), create_empty_event(3)],
647            )
648            .await;
649            respond_to_watch_events(&mut suite_request_stream, vec![]).await;
650            drop(suite_request_stream);
651        });
652
653        let mut running_suite = RunningSuite::wait_for_start(WaitForStartArgs {
654            proxy: suite_proxy,
655            max_severity_logs: None,
656            timeout: None,
657            timeout_grace: std::time::Duration::ZERO,
658            max_pipelined: None,
659            no_cases_equals_success: None,
660        })
661        .await;
662
663        for num in 0..4 {
664            assert_empty_events_eq!(
665                running_suite.event_stream.next().await.unwrap().unwrap(),
666                create_empty_event(num)
667            );
668        }
669        assert!(running_suite.event_stream.next().await.is_none());
670        suite_server_task.await;
671    }
672
673    #[fuchsia::test]
674    async fn running_events_peer_closed() {
675        let (suite_proxy, mut suite_request_stream) =
676            create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
677        let suite_server_task = fasync::Task::spawn(async move {
678            respond_to_watch_events(&mut suite_request_stream, vec![create_empty_event(1)]).await;
679            drop(suite_request_stream);
680        });
681
682        let mut running_suite = RunningSuite::wait_for_start(WaitForStartArgs {
683            proxy: suite_proxy,
684            max_severity_logs: None,
685            timeout: None,
686            timeout_grace: std::time::Duration::ZERO,
687            max_pipelined: None,
688            no_cases_equals_success: None,
689        })
690        .await;
691        assert_empty_events_eq!(
692            running_suite.event_stream.next().await.unwrap().unwrap(),
693            create_empty_event(1)
694        );
695        assert_matches!(
696            running_suite.event_stream.next().await,
697            Some(Err(RunTestSuiteError::Fidl(fidl::Error::ClientChannelClosed { .. })))
698        );
699        suite_server_task.await;
700    }
701
702    fn event_from_details(
703        timestamp: i64,
704        details: ftest_manager::EventDetails,
705    ) -> ftest_manager::Event {
706        let mut event = create_empty_event(timestamp);
707        event.details = Some(details);
708        event
709    }
710
711    fn case_found_event(timestamp: i64, test_case_id: u32, name: &str) -> ftest_manager::Event {
712        event_from_details(
713            timestamp,
714            ftest_manager::EventDetails::TestCaseFound(ftest_manager::TestCaseFoundEventDetails {
715                test_case_name: Some(name.into()),
716                test_case_id: Some(test_case_id),
717                ..Default::default()
718            }),
719        )
720    }
721
722    fn case_started_event(timestamp: i64, test_case_id: u32) -> ftest_manager::Event {
723        event_from_details(
724            timestamp,
725            ftest_manager::EventDetails::TestCaseStarted(
726                ftest_manager::TestCaseStartedEventDetails {
727                    test_case_id: Some(test_case_id),
728                    ..Default::default()
729                },
730            ),
731        )
732    }
733
734    fn case_stopped_event(
735        timestamp: i64,
736        test_case_id: u32,
737        result: ftest_manager::TestCaseResult,
738    ) -> ftest_manager::Event {
739        event_from_details(
740            timestamp,
741            ftest_manager::EventDetails::TestCaseStopped(
742                ftest_manager::TestCaseStoppedEventDetails {
743                    test_case_id: Some(test_case_id),
744                    result: Some(result),
745                    ..Default::default()
746                },
747            ),
748        )
749    }
750
751    fn case_finished_event(timestamp: i64, test_case_id: u32) -> ftest_manager::Event {
752        event_from_details(
753            timestamp,
754            ftest_manager::EventDetails::TestCaseFinished(
755                ftest_manager::TestCaseFinishedEventDetails {
756                    test_case_id: Some(test_case_id),
757                    ..Default::default()
758                },
759            ),
760        )
761    }
762
763    fn case_stdout_event(
764        timestamp: i64,
765        test_case_id: u32,
766        stdout: fidl::Socket,
767    ) -> ftest_manager::Event {
768        event_from_details(
769            timestamp,
770            ftest_manager::EventDetails::TestCaseArtifactGenerated(
771                ftest_manager::TestCaseArtifactGeneratedEventDetails {
772                    test_case_id: Some(test_case_id),
773                    artifact: Some(ftest_manager::Artifact::Stdout(stdout)),
774                    ..Default::default()
775                },
776            ),
777        )
778    }
779
780    fn case_stderr_event(
781        timestamp: i64,
782        test_case_id: u32,
783        stderr: fidl::Socket,
784    ) -> ftest_manager::Event {
785        event_from_details(
786            timestamp,
787            ftest_manager::EventDetails::TestCaseArtifactGenerated(
788                ftest_manager::TestCaseArtifactGeneratedEventDetails {
789                    test_case_id: Some(test_case_id),
790                    artifact: Some(ftest_manager::Artifact::Stderr(stderr)),
791                    ..Default::default()
792                },
793            ),
794        )
795    }
796
797    fn suite_started_event(timestamp: i64) -> ftest_manager::Event {
798        event_from_details(
799            timestamp,
800            ftest_manager::EventDetails::SuiteStarted(ftest_manager::SuiteStartedEventDetails {
801                ..Default::default()
802            }),
803        )
804    }
805
806    fn suite_stopped_event(
807        timestamp: i64,
808        result: ftest_manager::SuiteResult,
809    ) -> ftest_manager::Event {
810        event_from_details(
811            timestamp,
812            ftest_manager::EventDetails::SuiteStopped(ftest_manager::SuiteStoppedEventDetails {
813                result: Some(result),
814                ..Default::default()
815            }),
816        )
817    }
818
819    #[fuchsia::test]
820    async fn collect_events_simple() {
821        let all_events = vec![
822            suite_started_event(0),
823            case_found_event(100, 0, "my_test_case"),
824            case_started_event(200, 0),
825            case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
826            case_finished_event(400, 0),
827            suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
828        ];
829
830        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
831        let test_fut = async move {
832            let reporter = output::InMemoryReporter::new();
833            let run_reporter = output::RunReporter::new(reporter.clone());
834            let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
835
836            let suite = RunningSuite::wait_for_start(WaitForStartArgs {
837                proxy,
838                max_severity_logs: None,
839                timeout: None,
840                timeout_grace: std::time::Duration::ZERO,
841                max_pipelined: None,
842                no_cases_equals_success: None,
843            })
844            .await;
845            assert_eq!(
846                run_suite_and_collect_logs(
847                    suite,
848                    &suite_reporter,
849                    diagnostics::LogDisplayConfiguration::default(),
850                    futures::future::pending()
851                )
852                .await
853                .expect("collect results"),
854                Outcome::Passed
855            );
856            suite_reporter.finished().expect("Reporter finished");
857
858            let reports = reporter.get_reports();
859            let case = reports
860                .iter()
861                .find(|report| report.id == EntityId::Case { case: CaseId(0) })
862                .unwrap();
863            assert_eq!(case.report.name, "my_test_case");
864            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
865            assert!(case.report.is_finished);
866            assert!(case.report.artifacts.is_empty());
867            assert!(case.report.directories.is_empty());
868            let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
869            assert_eq!(suite.report.name, "test-url");
870            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
871            assert!(suite.report.is_finished);
872            assert!(suite.report.artifacts.is_empty());
873            assert!(suite.report.directories.is_empty());
874        };
875
876        futures::future::join(serve_all_events(stream, all_events), test_fut).await;
877    }
878
879    #[fuchsia::test]
880    async fn collect_events_with_case_artifacts() {
881        const STDOUT_CONTENT: &str = "stdout from my_test_case";
882        const STDERR_CONTENT: &str = "stderr from my_test_case";
883
884        let (stdout_write, stdout_read) = fidl::Socket::create_stream();
885        let (stderr_write, stderr_read) = fidl::Socket::create_stream();
886        let all_events = vec![
887            suite_started_event(0),
888            case_found_event(100, 0, "my_test_case"),
889            case_started_event(200, 0),
890            case_stdout_event(300, 0, stdout_read),
891            case_stderr_event(300, 0, stderr_read),
892            case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
893            case_finished_event(400, 0),
894            suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
895        ];
896
897        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
898        let stdio_write_fut = async move {
899            let mut async_stdout = fasync::Socket::from_socket(stdout_write);
900            async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
901            let mut async_stderr = fasync::Socket::from_socket(stderr_write);
902            async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
903        };
904        let test_fut = async move {
905            let reporter = output::InMemoryReporter::new();
906            let run_reporter = output::RunReporter::new(reporter.clone());
907            let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
908
909            let suite = RunningSuite::wait_for_start(WaitForStartArgs {
910                proxy,
911                max_severity_logs: None,
912                timeout: None,
913                timeout_grace: std::time::Duration::ZERO,
914                max_pipelined: None,
915                no_cases_equals_success: None,
916            })
917            .await;
918            assert_eq!(
919                run_suite_and_collect_logs(
920                    suite,
921                    &suite_reporter,
922                    diagnostics::LogDisplayConfiguration::default(),
923                    futures::future::pending()
924                )
925                .await
926                .expect("collect results"),
927                Outcome::Passed
928            );
929            suite_reporter.finished().expect("Reporter finished");
930
931            let reports = reporter.get_reports();
932            let case = reports
933                .iter()
934                .find(|report| report.id == EntityId::Case { case: CaseId(0) })
935                .unwrap();
936            assert_eq!(case.report.name, "my_test_case");
937            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
938            assert!(case.report.is_finished);
939            assert_eq!(case.report.artifacts.len(), 2);
940            assert_eq!(
941                case.report
942                    .artifacts
943                    .iter()
944                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
945                    .collect::<HashMap<_, _>>(),
946                hashmap! {
947                    output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
948                    output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
949                }
950            );
951            assert!(case.report.directories.is_empty());
952
953            let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
954            assert_eq!(suite.report.name, "test-url");
955            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
956            assert!(suite.report.is_finished);
957            assert!(suite.report.artifacts.is_empty());
958            assert!(suite.report.directories.is_empty());
959        };
960
961        futures::future::join3(serve_all_events(stream, all_events), stdio_write_fut, test_fut)
962            .await;
963    }
964
965    #[fuchsia::test]
966    async fn collect_events_case_artifacts_complete_after_suite() {
967        const STDOUT_CONTENT: &str = "stdout from my_test_case";
968        const STDERR_CONTENT: &str = "stderr from my_test_case";
969
970        let (stdout_write, stdout_read) = fidl::Socket::create_stream();
971        let (stderr_write, stderr_read) = fidl::Socket::create_stream();
972        let all_events = vec![
973            suite_started_event(0),
974            case_found_event(100, 0, "my_test_case"),
975            case_started_event(200, 0),
976            case_stdout_event(300, 0, stdout_read),
977            case_stderr_event(300, 0, stderr_read),
978            case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
979            case_finished_event(400, 0),
980            suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
981        ];
982
983        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
984        let serve_fut = async move {
985            // server side will send all events, then write to (and close) sockets.
986            serve_all_events(stream, all_events).await;
987            let mut async_stdout = fasync::Socket::from_socket(stdout_write);
988            async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
989            let mut async_stderr = fasync::Socket::from_socket(stderr_write);
990            async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
991        };
992        let test_fut = async move {
993            let reporter = output::InMemoryReporter::new();
994            let run_reporter = output::RunReporter::new(reporter.clone());
995            let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
996
997            let suite = RunningSuite::wait_for_start(WaitForStartArgs {
998                proxy,
999                max_severity_logs: None,
1000                timeout: None,
1001                timeout_grace: std::time::Duration::ZERO,
1002                max_pipelined: Some(1),
1003                no_cases_equals_success: None,
1004            })
1005            .await;
1006            assert_eq!(
1007                run_suite_and_collect_logs(
1008                    suite,
1009                    &suite_reporter,
1010                    diagnostics::LogDisplayConfiguration::default(),
1011                    futures::future::pending()
1012                )
1013                .await
1014                .expect("collect results"),
1015                Outcome::Passed
1016            );
1017            suite_reporter.finished().expect("Reporter finished");
1018
1019            let reports = reporter.get_reports();
1020            let case = reports
1021                .iter()
1022                .find(|report| report.id == EntityId::Case { case: CaseId(0) })
1023                .unwrap();
1024            assert_eq!(case.report.name, "my_test_case");
1025            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
1026            assert!(case.report.is_finished);
1027            assert_eq!(case.report.artifacts.len(), 2);
1028            assert_eq!(
1029                case.report
1030                    .artifacts
1031                    .iter()
1032                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1033                    .collect::<HashMap<_, _>>(),
1034                hashmap! {
1035                    output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
1036                    output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
1037                }
1038            );
1039            assert!(case.report.directories.is_empty());
1040
1041            let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
1042            assert_eq!(suite.report.name, "test-url");
1043            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
1044            assert!(suite.report.is_finished);
1045            assert!(suite.report.artifacts.is_empty());
1046            assert!(suite.report.directories.is_empty());
1047        };
1048
1049        futures::future::join(serve_fut, test_fut).await;
1050    }
1051
1052    #[fuchsia::test]
1053    async fn collect_events_with_case_artifacts_sent_after_case_stopped() {
1054        const STDOUT_CONTENT: &str = "stdout from my_test_case";
1055        const STDERR_CONTENT: &str = "stderr from my_test_case";
1056
1057        let (stdout_write, stdout_read) = fidl::Socket::create_stream();
1058        let (stderr_write, stderr_read) = fidl::Socket::create_stream();
1059        let all_events = vec![
1060            suite_started_event(0),
1061            case_found_event(100, 0, "my_test_case"),
1062            case_started_event(200, 0),
1063            case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
1064            case_stdout_event(300, 0, stdout_read),
1065            case_stderr_event(300, 0, stderr_read),
1066            case_finished_event(400, 0),
1067            suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
1068        ];
1069
1070        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
1071        let stdio_write_fut = async move {
1072            let mut async_stdout = fasync::Socket::from_socket(stdout_write);
1073            async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
1074            let mut async_stderr = fasync::Socket::from_socket(stderr_write);
1075            async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
1076        };
1077        let test_fut = async move {
1078            let reporter = output::InMemoryReporter::new();
1079            let run_reporter = output::RunReporter::new(reporter.clone());
1080            let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
1081
1082            let suite = RunningSuite::wait_for_start(WaitForStartArgs {
1083                proxy,
1084                max_severity_logs: None,
1085                timeout: None,
1086                timeout_grace: std::time::Duration::ZERO,
1087                max_pipelined: None,
1088                no_cases_equals_success: None,
1089            })
1090            .await;
1091            assert_eq!(
1092                run_suite_and_collect_logs(
1093                    suite,
1094                    &suite_reporter,
1095                    diagnostics::LogDisplayConfiguration::default(),
1096                    futures::future::pending()
1097                )
1098                .await
1099                .expect("collect results"),
1100                Outcome::Passed
1101            );
1102            suite_reporter.finished().expect("Reporter finished");
1103
1104            let reports = reporter.get_reports();
1105            let case = reports
1106                .iter()
1107                .find(|report| report.id == EntityId::Case { case: CaseId(0) })
1108                .unwrap();
1109            assert_eq!(case.report.name, "my_test_case");
1110            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
1111            assert!(case.report.is_finished);
1112            assert_eq!(case.report.artifacts.len(), 2);
1113            assert_eq!(
1114                case.report
1115                    .artifacts
1116                    .iter()
1117                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1118                    .collect::<HashMap<_, _>>(),
1119                hashmap! {
1120                    output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
1121                    output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
1122                }
1123            );
1124            assert!(case.report.directories.is_empty());
1125
1126            let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
1127            assert_eq!(suite.report.name, "test-url");
1128            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
1129            assert!(suite.report.is_finished);
1130            assert!(suite.report.artifacts.is_empty());
1131            assert!(suite.report.directories.is_empty());
1132        };
1133
1134        futures::future::join3(serve_all_events(stream, all_events), stdio_write_fut, test_fut)
1135            .await;
1136    }
1137
1138    #[fuchsia::test]
1139    async fn collect_events_timed_out_case_with_hanging_artifacts() {
1140        // create sockets and leave the server end open to simulate a hang.
1141        let (_stdout_write, stdout_read) = fidl::Socket::create_stream();
1142        let (_stderr_write, stderr_read) = fidl::Socket::create_stream();
1143        let all_events = vec![
1144            suite_started_event(0),
1145            case_found_event(100, 0, "my_test_case"),
1146            case_started_event(200, 0),
1147            case_stdout_event(300, 0, stdout_read),
1148            case_stderr_event(300, 0, stderr_read),
1149        ];
1150
1151        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
1152        let test_fut = async move {
1153            let reporter = output::InMemoryReporter::new();
1154            let run_reporter = output::RunReporter::new(reporter.clone());
1155            let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
1156
1157            let suite = RunningSuite::wait_for_start(WaitForStartArgs {
1158                proxy,
1159                max_severity_logs: None,
1160                timeout: Some(std::time::Duration::from_secs(2)),
1161                timeout_grace: std::time::Duration::ZERO,
1162                max_pipelined: None,
1163                no_cases_equals_success: None,
1164            })
1165            .await;
1166            assert_eq!(
1167                run_suite_and_collect_logs(
1168                    suite,
1169                    &suite_reporter,
1170                    diagnostics::LogDisplayConfiguration::default(),
1171                    futures::future::pending()
1172                )
1173                .await
1174                .expect("collect results"),
1175                Outcome::Timedout
1176            );
1177            suite_reporter.finished().expect("Reporter finished");
1178
1179            let reports = reporter.get_reports();
1180            let case = reports
1181                .iter()
1182                .find(|report| report.id == EntityId::Case { case: CaseId(0) })
1183                .unwrap();
1184            assert_eq!(case.report.name, "my_test_case");
1185            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Timedout));
1186            assert!(case.report.is_finished);
1187            assert_eq!(case.report.artifacts.len(), 2);
1188            assert_eq!(
1189                case.report
1190                    .artifacts
1191                    .iter()
1192                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1193                    .collect::<HashMap<_, _>>(),
1194                hashmap! {
1195                    output::ArtifactType::Stdout => vec![],
1196                    output::ArtifactType::Stderr => vec![]
1197                }
1198            );
1199            assert!(case.report.directories.is_empty());
1200
1201            let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
1202            assert_eq!(suite.report.name, "test-url");
1203            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Timedout));
1204            assert!(suite.report.is_finished);
1205            assert!(suite.report.artifacts.is_empty());
1206            assert!(suite.report.directories.is_empty());
1207        };
1208
1209        futures::future::join(serve_all_events_then_hang(stream, all_events), test_fut).await;
1210    }
1211}