run_test_suite_lib/
running_suite.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::cancel::{Cancelled, NamedFutureExt, OrCancel};
6use crate::outcome::{Lifecycle, Outcome, RunTestSuiteError, UnexpectedEventError};
7use crate::output::{self, ArtifactType, CaseId, SuiteReporter, Timestamp};
8use crate::stream_util::StreamUtil;
9use crate::trace::duration;
10use crate::{artifacts, diagnostics};
11use diagnostics_data::Severity;
12use fidl_fuchsia_test_manager::{
13    self as ftest_manager, CaseArtifact, CaseFinished, CaseFound, CaseStarted, CaseStopped,
14    SuiteArtifact, SuiteStopped,
15};
16use fuchsia_async as fasync;
17use futures::future::Either;
18use futures::prelude::*;
19use futures::stream::FuturesUnordered;
20use futures::StreamExt;
21use log::{error, info, warn};
22use std::collections::HashMap;
23use std::io::Write;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26
27/// Struct used by |run_suite_and_collect_logs| to track the state of test cases and suites.
28struct CollectedEntityState<R> {
29    reporter: R,
30    name: String,
31    lifecycle: Lifecycle,
32    artifact_tasks:
33        Vec<fasync::Task<Result<Option<diagnostics::LogCollectionOutcome>, anyhow::Error>>>,
34}
35
/// Collects results and artifacts for a single suite.
///
/// Consumes the suite's event stream, forwarding lifecycle changes for the suite and for each
/// of its test cases to `suite_reporter` (and to the case reporters created from it), and
/// spawning a task to drain every artifact that is announced. Once event collection ends, the
/// spawned artifact tasks are awaited and the final outcome is reported through
/// `suite_reporter.stopped`.
///
/// # Arguments
///
/// * `running_suite` - supplies the event stream, the stopper, the optional timeout, the grace
///   period added on top of it, and the `max_severity_logs` setting for log collection.
/// * `suite_reporter` - receives suite-level results; case reporters are created from it.
/// * `log_display` - passed to log collection as its `format` option.
/// * `cancel_fut` - if this resolves while events or artifacts are still being polled,
///   polling stops and `Outcome::Cancelled` is used as the early-termination outcome.
///
/// # Errors
///
/// Returns `Err` when the event stream yields an error, when a case or suite event arrives in
/// a lifecycle state that does not allow it, when the suite reports an unrecognized status, or
/// when a reporter call or `drain_artifact` fails. Some failures are instead returned as
/// `Ok(Outcome::error(..))`: an internal-error suite status, test cases that never stopped,
/// and a suite that never reported a stop.
///
/// # Panics
///
/// Panics if an event arrives without a payload.
// TODO(satsukiu): There's two ways to return an error here:
// * Err(RunTestSuiteError)
// * Ok(Outcome::Error(RunTestSuiteError))
// We should consider how to consolidate these.
pub(crate) async fn run_suite_and_collect_logs<F: Future<Output = ()> + Unpin>(
    running_suite: RunningSuite,
    suite_reporter: &SuiteReporter<'_>,
    log_display: diagnostics::LogDisplayConfiguration,
    cancel_fut: F,
) -> Result<Outcome, RunTestSuiteError> {
    duration!(c"collect_suite");

    let RunningSuite {
        mut event_stream, stopper, timeout, timeout_grace, max_severity_logs, ..
    } = running_suite;

    let log_opts =
        diagnostics::LogCollectionOptions { format: log_display, max_severity: max_severity_logs };

    // Per-case state, keyed by the identifier carried in the case events.
    let mut test_cases: HashMap<u32, CollectedEntityState<_>> = HashMap::new();
    // The suite itself is tracked with the same state struct; its name is left empty.
    let mut suite_state = CollectedEntityState {
        reporter: suite_reporter,
        name: "".to_string(),
        lifecycle: Lifecycle::Found,
        artifact_tasks: vec![],
    };
    let mut suite_finish_timestamp = Timestamp::Unknown;
    // Starts as Passed; overwritten by SuiteStopped and by the checks after collection.
    let mut outcome = Outcome::Passed;

    // Processes events until the stream ends or an error is hit. The block borrows the state
    // declared above, so that state can still be inspected after this future is dropped.
    let collect_results_fut = async {
        while let Some(event_result) = event_stream.next().named("next_event").await {
            match event_result {
                Err(e) => {
                    // Mark the suite as errored before propagating the stream error.
                    suite_state
                        .reporter
                        .stopped(&output::ReportedOutcome::Error, Timestamp::Unknown)?;
                    return Err(e);
                }
                Ok(event) => {
                    let timestamp = Timestamp::from_nanos(event.timestamp);
                    match event.payload.expect("event cannot be None") {
                        ftest_manager::SuiteEventPayload::CaseFound(CaseFound {
                            test_case_name,
                            identifier,
                        }) => {
                            // A second CaseFound for a known identifier is rejected.
                            if test_cases.contains_key(&identifier) {
                                return Err(UnexpectedEventError::InvalidCaseEvent {
                                    last_state: Lifecycle::Found,
                                    next_state: Lifecycle::Found,
                                    test_case_name,
                                    identifier,
                                }
                                .into());
                            }
                            test_cases.insert(
                                identifier,
                                CollectedEntityState {
                                    reporter: suite_reporter
                                        .new_case(&test_case_name, &CaseId(identifier))?,
                                    name: test_case_name,
                                    lifecycle: Lifecycle::Found,
                                    artifact_tasks: vec![],
                                },
                            );
                        }
                        ftest_manager::SuiteEventPayload::CaseStarted(CaseStarted {
                            identifier,
                        }) => {
                            let entry = test_cases.get_mut(&identifier).ok_or(
                                UnexpectedEventError::CaseEventButNotFound {
                                    next_state: Lifecycle::Started,
                                    identifier,
                                },
                            )?;
                            // Only the Found -> Started transition is accepted.
                            match &entry.lifecycle {
                                Lifecycle::Found => {
                                    // TODO(https://fxbug.dev/42159975): Record per-case runtime once we have an
                                    // accurate way to measure it.
                                    entry.reporter.started(Timestamp::Unknown)?;
                                    entry.lifecycle = Lifecycle::Started;
                                }
                                other => {
                                    return Err(UnexpectedEventError::InvalidCaseEvent {
                                        last_state: *other,
                                        next_state: Lifecycle::Started,
                                        test_case_name: entry.name.clone(),
                                        identifier,
                                    }
                                    .into());
                                }
                            }
                        }
                        ftest_manager::SuiteEventPayload::CaseArtifact(CaseArtifact {
                            identifier,
                            artifact,
                        }) => {
                            let entry = test_cases.get_mut(&identifier).ok_or(
                                UnexpectedEventError::CaseArtifactButNotFound { identifier },
                            )?;
                            // Artifacts are accepted in every state except Finished.
                            if matches!(entry.lifecycle, Lifecycle::Finished) {
                                return Err(UnexpectedEventError::CaseArtifactButFinished {
                                    identifier,
                                }
                                .into());
                            }
                            // drain_artifact resolves to a second future; that one is spawned
                            // as a task and only awaited after event collection has ended.
                            let artifact_fut = artifacts::drain_artifact(
                                &entry.reporter,
                                artifact,
                                log_opts.clone(),
                            )
                            .await?;
                            entry.artifact_tasks.push(fasync::Task::spawn(artifact_fut));
                        }
                        ftest_manager::SuiteEventPayload::CaseStopped(CaseStopped {
                            identifier,
                            status,
                        }) => {
                            let entry = test_cases.get_mut(&identifier).ok_or(
                                UnexpectedEventError::CaseEventButNotFound {
                                    next_state: Lifecycle::Stopped,
                                    identifier,
                                },
                            )?;
                            // Only the Started -> Stopped transition is accepted.
                            match &entry.lifecycle {
                                Lifecycle::Started => {
                                    // TODO(https://fxbug.dev/42159975): Record per-case runtime once we have an
                                    // accurate way to measure it.
                                    entry.reporter.stopped(&status.into(), Timestamp::Unknown)?;
                                    entry.lifecycle = Lifecycle::Stopped;
                                }
                                other => {
                                    return Err(UnexpectedEventError::InvalidCaseEvent {
                                        last_state: *other,
                                        next_state: Lifecycle::Stopped,
                                        test_case_name: entry.name.clone(),
                                        identifier,
                                    }
                                    .into());
                                }
                            }
                        }
                        ftest_manager::SuiteEventPayload::CaseFinished(CaseFinished {
                            identifier,
                        }) => {
                            let entry = test_cases.get_mut(&identifier).ok_or(
                                UnexpectedEventError::CaseEventButNotFound {
                                    next_state: Lifecycle::Finished,
                                    identifier,
                                },
                            )?;
                            // Only the Stopped -> Finished transition is accepted.
                            match &entry.lifecycle {
                                Lifecycle::Stopped => {
                                    // don't mark reporter finished yet, we want to finish draining
                                    // artifacts separately.
                                    entry.lifecycle = Lifecycle::Finished;
                                }
                                other => {
                                    return Err(UnexpectedEventError::InvalidCaseEvent {
                                        last_state: *other,
                                        next_state: Lifecycle::Finished,
                                        test_case_name: entry.name.clone(),
                                        identifier,
                                    }
                                    .into());
                                }
                            }
                        }
                        ftest_manager::SuiteEventPayload::SuiteArtifact(SuiteArtifact {
                            artifact,
                        }) => {
                            // Suite artifacts are accepted regardless of the suite's lifecycle.
                            let artifact_fut = artifacts::drain_artifact(
                                suite_reporter,
                                artifact,
                                log_opts.clone(),
                            )
                            .await?;
                            suite_state.artifact_tasks.push(fasync::Task::spawn(artifact_fut));
                        }
                        ftest_manager::SuiteEventPayload::SuiteStarted(_) => {
                            match &suite_state.lifecycle {
                                Lifecycle::Found => {
                                    suite_state.reporter.started(timestamp)?;
                                    suite_state.lifecycle = Lifecycle::Started;
                                }
                                other => {
                                    return Err(UnexpectedEventError::InvalidSuiteEvent {
                                        last_state: *other,
                                        next_state: Lifecycle::Started,
                                    }
                                    .into());
                                }
                            }
                        }
                        ftest_manager::SuiteEventPayload::SuiteStopped(SuiteStopped { status }) => {
                            match &suite_state.lifecycle {
                                Lifecycle::Started => {
                                    // The reporter is not told about the stop here; the outcome
                                    // and timestamp are recorded and reported at the very end.
                                    suite_state.lifecycle = Lifecycle::Stopped;
                                    suite_finish_timestamp = timestamp;
                                    outcome = match status {
                                        ftest_manager::SuiteStatus::Passed => Outcome::Passed,
                                        ftest_manager::SuiteStatus::Failed => Outcome::Failed,
                                        ftest_manager::SuiteStatus::DidNotFinish => {
                                            Outcome::Inconclusive
                                        }
                                        // Stopped is reported as a timeout; within this function
                                        // a stop is only requested after the stop timeout fires.
                                        ftest_manager::SuiteStatus::TimedOut
                                        | ftest_manager::SuiteStatus::Stopped => Outcome::Timedout,
                                        ftest_manager::SuiteStatus::InternalError => {
                                            Outcome::error(
                                                UnexpectedEventError::InternalErrorSuiteStatus,
                                            )
                                        }
                                        s => {
                                            return Err(
                                                UnexpectedEventError::UnrecognizedSuiteStatus {
                                                    status: s,
                                                }
                                                .into(),
                                            );
                                        }
                                    };
                                }
                                other => {
                                    return Err(UnexpectedEventError::InvalidSuiteEvent {
                                        last_state: *other,
                                        next_state: Lifecycle::Stopped,
                                    }
                                    .into());
                                }
                            }
                        }
                        ftest_manager::SuiteEventPayloadUnknown!() => {
                            // Unknown payloads are logged and otherwise ignored.
                            warn!("Encountered unrecognized suite event");
                        }
                    }
                }
            }
        }
        drop(event_stream); // Explicit drop here to force ownership move.
        Ok(())
    }
    .boxed_local();

    // Without a timeout both timers are futures that never resolve. Otherwise the stop timer
    // fires after `timeout` and the kill timer after `timeout + timeout_grace`, both measured
    // from this point.
    let start_time = std::time::Instant::now();
    let (stop_timeout_future, kill_timeout_future) = match timeout {
        None => {
            (futures::future::pending::<()>().boxed(), futures::future::pending::<()>().boxed())
        }
        Some(duration) => (
            fasync::Timer::new(start_time + duration).boxed(),
            fasync::Timer::new(start_time + duration + timeout_grace).boxed(),
        ),
    };

    // Polls event collection and the stop timeout together. If the timeout fires first, a stop
    // is requested through `stopper` and event collection then continues to completion.
    let collect_or_stop_fut = async move {
        match futures::future::select(stop_timeout_future, collect_results_fut).await {
            Either::Left((_stop_done, collect_fut)) => {
                stopper.stop();
                collect_fut.await
            }
            Either::Right((result, _)) => result,
        }
    };

    // If kill timeout or cancel occur, we want to stop polling events.
    // kill_fut resolves to the outcome to which results should be overwritten
    // if it resolves.
    // It is shared because it is also used below to bound the artifact-draining phases.
    let kill_fut = async move {
        match futures::future::select(cancel_fut, kill_timeout_future).await {
            Either::Left(_) => Outcome::Cancelled,
            Either::Right(_) => Outcome::Timedout,
        }
    }
    .shared();

    // `None` means event collection ran to completion; `Some` carries the outcome produced by
    // kill_fut when it resolved first. Errors from event collection are returned immediately.
    let early_termination_outcome =
        match collect_or_stop_fut.boxed_local().or_cancelled(kill_fut.clone()).await {
            Ok(Ok(())) => None,
            Ok(Err(e)) => return Err(e),
            Err(Cancelled(outcome)) => Some(outcome),
        };

    // Finish collecting artifacts and report errors.
    info!("Awaiting case artifacts");
    let mut unfinished_test_case_names = vec![];
    for (_, test_case) in test_cases.into_iter() {
        let CollectedEntityState { reporter, name, lifecycle, artifact_tasks } = test_case;
        // Cases that never reported a stop get one reported on their behalf: the early
        // termination outcome if there is one, otherwise an outcome based on how far they got.
        match (lifecycle, early_termination_outcome.clone()) {
            (Lifecycle::Started | Lifecycle::Found, Some(early)) => {
                reporter.stopped(&early.into(), Timestamp::Unknown)?;
            }
            (Lifecycle::Found, None) => {
                unfinished_test_case_names.push(name.clone());
                reporter.stopped(&Outcome::Inconclusive.into(), Timestamp::Unknown)?;
            }
            (Lifecycle::Started, None) => {
                unfinished_test_case_names.push(name.clone());
                reporter.stopped(&Outcome::DidNotFinish.into(), Timestamp::Unknown)?;
            }
            (Lifecycle::Stopped | Lifecycle::Finished, _) => (),
        }

        // Wait for this case's artifact tasks, logging failures instead of returning them.
        let finish_artifacts_fut = FuturesUnordered::from_iter(artifact_tasks)
            .map(|result| match result {
                Err(e) => {
                    error!("Failed to collect artifact for {}: {:?}", name, e);
                }
                Ok(Some(_log_result)) => warn!("Unexpectedly got log results for a test case"),
                Ok(None) => (),
            })
            .collect::<()>();
        if let Err(Cancelled(_)) = finish_artifacts_fut.or_cancelled(kill_fut.clone()).await {
            warn!("Stopped polling artifacts for {} due to timeout", name);
        }

        reporter.finished()?;
    }
    // With no early termination, any case that never stopped turns the outcome into an error.
    if !unfinished_test_case_names.is_empty() {
        outcome = Outcome::error(UnexpectedEventError::CasesDidNotFinish {
            cases: unfinished_test_case_names,
        });
    }

    match (suite_state.lifecycle, early_termination_outcome) {
        (Lifecycle::Found | Lifecycle::Started, Some(early)) => {
            // Only a plain Passed/Failed outcome is replaced by the early-termination outcome.
            if matches!(&outcome, Outcome::Passed | Outcome::Failed) {
                outcome = early;
            }
        }
        (Lifecycle::Found | Lifecycle::Started, None) => {
            outcome = Outcome::error(UnexpectedEventError::SuiteDidNotReportStop);
        }
        // If the suite successfully reported a result, don't alter it.
        (Lifecycle::Stopped, _) => (),
        // Finished doesn't happen since there's no SuiteFinished event.
        (Lifecycle::Finished, _) => unreachable!(),
    }

    // Set when any suite log collection task reports LogCollectionOutcome::Error.
    let restricted_logs_present = AtomicBool::new(false);
    let finish_artifacts_fut = FuturesUnordered::from_iter(suite_state.artifact_tasks)
        .then(|result| async {
            match result {
                Err(e) => {
                    error!("Failed to collect artifact for suite: {:?}", e);
                }
                Ok(Some(log_result)) => match log_result {
                    diagnostics::LogCollectionOutcome::Error { restricted_logs } => {
                        restricted_logs_present.store(true, Ordering::Relaxed);
                        // Record the offending logs in a dedicated artifact. Failures here
                        // are only logged; the flag above has already been set.
                        let mut log_artifact = match suite_reporter
                            .new_artifact(&ArtifactType::RestrictedLog)
                        {
                            Ok(artifact) => artifact,
                            Err(e) => {
                                warn!("Error creating artifact to report restricted logs: {:?}", e);
                                return;
                            }
                        };
                        for log in restricted_logs.iter() {
                            if let Err(e) = writeln!(log_artifact, "{}", log) {
                                warn!("Error recording restricted logs: {:?}", e);
                                return;
                            }
                        }
                    }
                    diagnostics::LogCollectionOutcome::Passed => (),
                },
                Ok(None) => (),
            }
        })
        .collect::<()>();
    if let Err(Cancelled(_)) = finish_artifacts_fut.or_cancelled(kill_fut).await {
        warn!("Stopped polling artifacts due to timeout");
    }
    // Restricted logs downgrade an otherwise passing suite to Failed.
    if restricted_logs_present.into_inner() && matches!(outcome, Outcome::Passed) {
        outcome = Outcome::Failed;
    }

    suite_reporter.stopped(&outcome.clone().into(), suite_finish_timestamp)?;

    Ok(outcome)
}
418
419type SuiteEventStream = std::pin::Pin<
420    Box<dyn Stream<Item = Result<ftest_manager::SuiteEvent, RunTestSuiteError>> + Send>,
421>;
422
/// A test suite that is known to have started execution. A suite is considered started once
/// any event is produced for the suite.
pub(crate) struct RunningSuite {
    /// Stream of events (or errors) retrieved from the suite controller.
    event_stream: SuiteEventStream,
    /// Handle used to ask the suite controller to stop the suite.
    stopper: RunningSuiteStopper,
    /// Passed to log collection as its `max_severity` option.
    max_severity_logs: Option<Severity>,
    /// Time allowed before the suite is asked to stop; `None` means no timeout.
    timeout: Option<std::time::Duration>,
    /// Extra time allowed after `timeout` before polling of the suite is abandoned.
    timeout_grace: std::time::Duration,
}
432
433struct RunningSuiteStopper(Arc<ftest_manager::SuiteControllerProxy>);
434
435impl RunningSuiteStopper {
436    fn stop(self) {
437        let _ = self.0.stop();
438    }
439}
440
441impl RunningSuite {
442    /// Number of concurrently active GetEvents requests. Chosen by testing powers of 2 when
443    /// running a set of tests using ffx test against an emulator, and taking the value at
444    /// which improvement stops.
445    const DEFAULT_PIPELINED_REQUESTS: usize = 8;
446    pub(crate) async fn wait_for_start(
447        proxy: ftest_manager::SuiteControllerProxy,
448        max_severity_logs: Option<Severity>,
449        timeout: Option<std::time::Duration>,
450        timeout_grace: std::time::Duration,
451        max_pipelined: Option<usize>,
452    ) -> Self {
453        let proxy = Arc::new(proxy);
454        let proxy_clone = proxy.clone();
455        // Stream of fidl responses, with multiple concurrently active requests.
456        let unprocessed_event_stream = futures::stream::repeat_with(move || {
457            proxy.get_events().inspect(|events_result| match events_result {
458                Ok(Ok(ref events)) => info!("Latest suite event: {:?}", events.last()),
459                _ => (),
460            })
461        })
462        .buffered(max_pipelined.unwrap_or(Self::DEFAULT_PIPELINED_REQUESTS));
463        // Terminate the stream after we get an error or empty list of events.
464        let terminated_event_stream =
465            unprocessed_event_stream.take_until_stop_after(|result| match &result {
466                Ok(Ok(events)) => events.is_empty(),
467                Err(_) | Ok(Err(_)) => true,
468            });
469        // Flatten the stream of vecs into a stream of single events.
470        let mut event_stream = terminated_event_stream
471            .map(Self::convert_to_result_vec)
472            .map(futures::stream::iter)
473            .flatten()
474            .peekable();
475        // Wait for the first event to be ready, which signals the suite has started.
476        std::pin::Pin::new(&mut event_stream).peek().await;
477
478        Self {
479            event_stream: event_stream.boxed(),
480            stopper: RunningSuiteStopper(proxy_clone),
481            timeout,
482            timeout_grace,
483            max_severity_logs,
484        }
485    }
486
487    fn convert_to_result_vec(
488        vec: Result<
489            Result<Vec<ftest_manager::SuiteEvent>, ftest_manager::LaunchError>,
490            fidl::Error,
491        >,
492    ) -> Vec<Result<ftest_manager::SuiteEvent, RunTestSuiteError>> {
493        match vec {
494            Ok(Ok(events)) => events.into_iter().map(Ok).collect(),
495            Ok(Err(e)) => vec![Err(e.into())],
496            Err(e) => vec![Err(e.into())],
497        }
498    }
499}
500
501#[cfg(test)]
502mod test {
503    use super::*;
504    use crate::output::{EntityId, SuiteId};
505    use assert_matches::assert_matches;
506    use fidl::endpoints::create_proxy_and_stream;
507    use maplit::hashmap;
508
509    async fn respond_to_get_events(
510        request_stream: &mut ftest_manager::SuiteControllerRequestStream,
511        events: Vec<ftest_manager::SuiteEvent>,
512    ) {
513        let request = request_stream
514            .next()
515            .await
516            .expect("did not get next request")
517            .expect("error getting next request");
518        let responder = match request {
519            ftest_manager::SuiteControllerRequest::GetEvents { responder } => responder,
520            r => panic!("Expected GetEvents request but got {:?}", r),
521        };
522
523        responder.send(Ok(events)).expect("send events");
524    }
525
526    /// Serves all events to completion.
527    async fn serve_all_events(
528        mut request_stream: ftest_manager::SuiteControllerRequestStream,
529        events: Vec<ftest_manager::SuiteEvent>,
530    ) {
531        const BATCH_SIZE: usize = 5;
532        let mut event_iter = events.into_iter();
533        while event_iter.len() > 0 {
534            respond_to_get_events(
535                &mut request_stream,
536                event_iter.by_ref().take(BATCH_SIZE).collect(),
537            )
538            .await;
539        }
540        respond_to_get_events(&mut request_stream, vec![]).await;
541    }
542
543    /// Serves all events to completion, then wait for the channel to close.
544    async fn serve_all_events_then_hang(
545        mut request_stream: ftest_manager::SuiteControllerRequestStream,
546        events: Vec<ftest_manager::SuiteEvent>,
547    ) {
548        const BATCH_SIZE: usize = 5;
549        let mut event_iter = events.into_iter();
550        while event_iter.len() > 0 {
551            respond_to_get_events(
552                &mut request_stream,
553                event_iter.by_ref().take(BATCH_SIZE).collect(),
554            )
555            .await;
556        }
557        let _requests = request_stream.collect::<Vec<_>>().await;
558    }
559
560    /// Creates a SuiteEvent which is unpopulated, except for timestamp.
561    /// This isn't representative of an actual event from test framework, but is sufficient
562    /// to assert events are routed correctly.
563    fn create_empty_event(timestamp: i64) -> ftest_manager::SuiteEvent {
564        ftest_manager::SuiteEvent { timestamp: Some(timestamp), ..Default::default() }
565    }
566
567    macro_rules! assert_empty_events_eq {
568        ($t1:expr, $t2:expr) => {
569            assert_eq!($t1.timestamp, $t2.timestamp, "Got incorrect event.")
570        };
571    }
572
573    #[fuchsia::test]
574    async fn running_suite_events_simple() {
575        let (suite_proxy, mut suite_request_stream) =
576            create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
577        let suite_server_task = fasync::Task::spawn(async move {
578            respond_to_get_events(&mut suite_request_stream, vec![create_empty_event(0)]).await;
579            respond_to_get_events(&mut suite_request_stream, vec![]).await;
580            drop(suite_request_stream);
581        });
582
583        let mut running_suite =
584            RunningSuite::wait_for_start(suite_proxy, None, None, std::time::Duration::ZERO, None)
585                .await;
586        assert_empty_events_eq!(
587            running_suite.event_stream.next().await.unwrap().unwrap(),
588            create_empty_event(0)
589        );
590        assert!(running_suite.event_stream.next().await.is_none());
591        // polling again should still give none.
592        assert!(running_suite.event_stream.next().await.is_none());
593        suite_server_task.await;
594    }
595
596    #[fuchsia::test]
597    async fn running_suite_events_multiple_events() {
598        let (suite_proxy, mut suite_request_stream) =
599            create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
600        let suite_server_task = fasync::Task::spawn(async move {
601            respond_to_get_events(
602                &mut suite_request_stream,
603                vec![create_empty_event(0), create_empty_event(1)],
604            )
605            .await;
606            respond_to_get_events(
607                &mut suite_request_stream,
608                vec![create_empty_event(2), create_empty_event(3)],
609            )
610            .await;
611            respond_to_get_events(&mut suite_request_stream, vec![]).await;
612            drop(suite_request_stream);
613        });
614
615        let mut running_suite =
616            RunningSuite::wait_for_start(suite_proxy, None, None, std::time::Duration::ZERO, None)
617                .await;
618
619        for num in 0..4 {
620            assert_empty_events_eq!(
621                running_suite.event_stream.next().await.unwrap().unwrap(),
622                create_empty_event(num)
623            );
624        }
625        assert!(running_suite.event_stream.next().await.is_none());
626        suite_server_task.await;
627    }
628
629    #[fuchsia::test]
630    async fn running_suite_events_peer_closed() {
631        let (suite_proxy, mut suite_request_stream) =
632            create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
633        let suite_server_task = fasync::Task::spawn(async move {
634            respond_to_get_events(&mut suite_request_stream, vec![create_empty_event(1)]).await;
635            drop(suite_request_stream);
636        });
637
638        let mut running_suite =
639            RunningSuite::wait_for_start(suite_proxy, None, None, std::time::Duration::ZERO, None)
640                .await;
641        assert_empty_events_eq!(
642            running_suite.event_stream.next().await.unwrap().unwrap(),
643            create_empty_event(1)
644        );
645        assert_matches!(
646            running_suite.event_stream.next().await,
647            Some(Err(RunTestSuiteError::Fidl(fidl::Error::ClientChannelClosed { .. })))
648        );
649        suite_server_task.await;
650    }
651
652    fn suite_event_from_payload(
653        timestamp: i64,
654        payload: ftest_manager::SuiteEventPayload,
655    ) -> ftest_manager::SuiteEvent {
656        let mut event = create_empty_event(timestamp);
657        event.payload = Some(payload);
658        event
659    }
660
661    fn case_found_event(timestamp: i64, identifier: u32, name: &str) -> ftest_manager::SuiteEvent {
662        suite_event_from_payload(
663            timestamp,
664            ftest_manager::SuiteEventPayload::CaseFound(ftest_manager::CaseFound {
665                test_case_name: name.into(),
666                identifier,
667            }),
668        )
669    }
670
671    fn case_started_event(timestamp: i64, identifier: u32) -> ftest_manager::SuiteEvent {
672        suite_event_from_payload(
673            timestamp,
674            ftest_manager::SuiteEventPayload::CaseStarted(ftest_manager::CaseStarted {
675                identifier,
676            }),
677        )
678    }
679
680    fn case_stopped_event(
681        timestamp: i64,
682        identifier: u32,
683        status: ftest_manager::CaseStatus,
684    ) -> ftest_manager::SuiteEvent {
685        suite_event_from_payload(
686            timestamp,
687            ftest_manager::SuiteEventPayload::CaseStopped(ftest_manager::CaseStopped {
688                identifier,
689                status,
690            }),
691        )
692    }
693
694    fn case_finished_event(timestamp: i64, identifier: u32) -> ftest_manager::SuiteEvent {
695        suite_event_from_payload(
696            timestamp,
697            ftest_manager::SuiteEventPayload::CaseFinished(ftest_manager::CaseFinished {
698                identifier,
699            }),
700        )
701    }
702
703    fn case_stdout_event(
704        timestamp: i64,
705        identifier: u32,
706        stdout: fidl::Socket,
707    ) -> ftest_manager::SuiteEvent {
708        suite_event_from_payload(
709            timestamp,
710            ftest_manager::SuiteEventPayload::CaseArtifact(ftest_manager::CaseArtifact {
711                identifier,
712                artifact: ftest_manager::Artifact::Stdout(stdout),
713            }),
714        )
715    }
716
717    fn case_stderr_event(
718        timestamp: i64,
719        identifier: u32,
720        stderr: fidl::Socket,
721    ) -> ftest_manager::SuiteEvent {
722        suite_event_from_payload(
723            timestamp,
724            ftest_manager::SuiteEventPayload::CaseArtifact(ftest_manager::CaseArtifact {
725                identifier,
726                artifact: ftest_manager::Artifact::Stderr(stderr),
727            }),
728        )
729    }
730
731    fn suite_started_event(timestamp: i64) -> ftest_manager::SuiteEvent {
732        suite_event_from_payload(
733            timestamp,
734            ftest_manager::SuiteEventPayload::SuiteStarted(ftest_manager::SuiteStarted),
735        )
736    }
737
738    fn suite_stopped_event(
739        timestamp: i64,
740        status: ftest_manager::SuiteStatus,
741    ) -> ftest_manager::SuiteEvent {
742        suite_event_from_payload(
743            timestamp,
744            ftest_manager::SuiteEventPayload::SuiteStopped(ftest_manager::SuiteStopped { status }),
745        )
746    }
747
748    #[fuchsia::test]
749    async fn collect_suite_events_simple() {
750        let all_events = vec![
751            suite_started_event(0),
752            case_found_event(100, 0, "my_test_case"),
753            case_started_event(200, 0),
754            case_stopped_event(300, 0, ftest_manager::CaseStatus::Passed),
755            case_finished_event(400, 0),
756            suite_stopped_event(500, ftest_manager::SuiteStatus::Passed),
757        ];
758
759        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
760        let test_fut = async move {
761            let reporter = output::InMemoryReporter::new();
762            let run_reporter = output::RunReporter::new(reporter.clone());
763            let suite_reporter =
764                run_reporter.new_suite("test-url", &SuiteId(0)).expect("create new suite");
765
766            let suite =
767                RunningSuite::wait_for_start(proxy, None, None, std::time::Duration::ZERO, None)
768                    .await;
769            assert_eq!(
770                run_suite_and_collect_logs(
771                    suite,
772                    &suite_reporter,
773                    diagnostics::LogDisplayConfiguration::default(),
774                    futures::future::pending()
775                )
776                .await
777                .expect("collect results"),
778                Outcome::Passed
779            );
780            suite_reporter.finished().expect("Reporter finished");
781
782            let reports = reporter.get_reports();
783            let case = reports
784                .iter()
785                .find(|report| report.id == EntityId::Case { suite: SuiteId(0), case: CaseId(0) })
786                .unwrap();
787            assert_eq!(case.report.name, "my_test_case");
788            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
789            assert!(case.report.is_finished);
790            assert!(case.report.artifacts.is_empty());
791            assert!(case.report.directories.is_empty());
792            let suite =
793                reports.iter().find(|report| report.id == EntityId::Suite(SuiteId(0))).unwrap();
794            assert_eq!(suite.report.name, "test-url");
795            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
796            assert!(suite.report.is_finished);
797            assert!(suite.report.artifacts.is_empty());
798            assert!(suite.report.directories.is_empty());
799        };
800
801        futures::future::join(serve_all_events(stream, all_events), test_fut).await;
802    }
803
804    #[fuchsia::test]
805    async fn collect_suite_events_with_case_artifacts() {
806        const STDOUT_CONTENT: &str = "stdout from my_test_case";
807        const STDERR_CONTENT: &str = "stderr from my_test_case";
808
809        let (stdout_write, stdout_read) = fidl::Socket::create_stream();
810        let (stderr_write, stderr_read) = fidl::Socket::create_stream();
811        let all_events = vec![
812            suite_started_event(0),
813            case_found_event(100, 0, "my_test_case"),
814            case_started_event(200, 0),
815            case_stdout_event(300, 0, stdout_read),
816            case_stderr_event(300, 0, stderr_read),
817            case_stopped_event(300, 0, ftest_manager::CaseStatus::Passed),
818            case_finished_event(400, 0),
819            suite_stopped_event(500, ftest_manager::SuiteStatus::Passed),
820        ];
821
822        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
823        let stdio_write_fut = async move {
824            let mut async_stdout = fasync::Socket::from_socket(stdout_write);
825            async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
826            let mut async_stderr = fasync::Socket::from_socket(stderr_write);
827            async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
828        };
829        let test_fut = async move {
830            let reporter = output::InMemoryReporter::new();
831            let run_reporter = output::RunReporter::new(reporter.clone());
832            let suite_reporter =
833                run_reporter.new_suite("test-url", &SuiteId(0)).expect("create new suite");
834
835            let suite =
836                RunningSuite::wait_for_start(proxy, None, None, std::time::Duration::ZERO, None)
837                    .await;
838            assert_eq!(
839                run_suite_and_collect_logs(
840                    suite,
841                    &suite_reporter,
842                    diagnostics::LogDisplayConfiguration::default(),
843                    futures::future::pending()
844                )
845                .await
846                .expect("collect results"),
847                Outcome::Passed
848            );
849            suite_reporter.finished().expect("Reporter finished");
850
851            let reports = reporter.get_reports();
852            let case = reports
853                .iter()
854                .find(|report| report.id == EntityId::Case { suite: SuiteId(0), case: CaseId(0) })
855                .unwrap();
856            assert_eq!(case.report.name, "my_test_case");
857            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
858            assert!(case.report.is_finished);
859            assert_eq!(case.report.artifacts.len(), 2);
860            assert_eq!(
861                case.report
862                    .artifacts
863                    .iter()
864                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
865                    .collect::<HashMap<_, _>>(),
866                hashmap! {
867                    output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
868                    output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
869                }
870            );
871            assert!(case.report.directories.is_empty());
872
873            let suite =
874                reports.iter().find(|report| report.id == EntityId::Suite(SuiteId(0))).unwrap();
875            assert_eq!(suite.report.name, "test-url");
876            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
877            assert!(suite.report.is_finished);
878            assert!(suite.report.artifacts.is_empty());
879            assert!(suite.report.directories.is_empty());
880        };
881
882        futures::future::join3(serve_all_events(stream, all_events), stdio_write_fut, test_fut)
883            .await;
884    }
885
886    #[fuchsia::test]
887    async fn collect_suite_events_case_artifacts_complete_after_suite() {
888        const STDOUT_CONTENT: &str = "stdout from my_test_case";
889        const STDERR_CONTENT: &str = "stderr from my_test_case";
890
891        let (stdout_write, stdout_read) = fidl::Socket::create_stream();
892        let (stderr_write, stderr_read) = fidl::Socket::create_stream();
893        let all_events = vec![
894            suite_started_event(0),
895            case_found_event(100, 0, "my_test_case"),
896            case_started_event(200, 0),
897            case_stdout_event(300, 0, stdout_read),
898            case_stderr_event(300, 0, stderr_read),
899            case_stopped_event(300, 0, ftest_manager::CaseStatus::Passed),
900            case_finished_event(400, 0),
901            suite_stopped_event(500, ftest_manager::SuiteStatus::Passed),
902        ];
903
904        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
905        let serve_fut = async move {
906            // server side will send all events, then write to (and close) sockets.
907            serve_all_events(stream, all_events).await;
908            let mut async_stdout = fasync::Socket::from_socket(stdout_write);
909            async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
910            let mut async_stderr = fasync::Socket::from_socket(stderr_write);
911            async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
912        };
913        let test_fut = async move {
914            let reporter = output::InMemoryReporter::new();
915            let run_reporter = output::RunReporter::new(reporter.clone());
916            let suite_reporter =
917                run_reporter.new_suite("test-url", &SuiteId(0)).expect("create new suite");
918
919            let suite =
920                RunningSuite::wait_for_start(proxy, None, None, std::time::Duration::ZERO, Some(1))
921                    .await;
922            assert_eq!(
923                run_suite_and_collect_logs(
924                    suite,
925                    &suite_reporter,
926                    diagnostics::LogDisplayConfiguration::default(),
927                    futures::future::pending()
928                )
929                .await
930                .expect("collect results"),
931                Outcome::Passed
932            );
933            suite_reporter.finished().expect("Reporter finished");
934
935            let reports = reporter.get_reports();
936            let case = reports
937                .iter()
938                .find(|report| report.id == EntityId::Case { suite: SuiteId(0), case: CaseId(0) })
939                .unwrap();
940            assert_eq!(case.report.name, "my_test_case");
941            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
942            assert!(case.report.is_finished);
943            assert_eq!(case.report.artifacts.len(), 2);
944            assert_eq!(
945                case.report
946                    .artifacts
947                    .iter()
948                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
949                    .collect::<HashMap<_, _>>(),
950                hashmap! {
951                    output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
952                    output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
953                }
954            );
955            assert!(case.report.directories.is_empty());
956
957            let suite =
958                reports.iter().find(|report| report.id == EntityId::Suite(SuiteId(0))).unwrap();
959            assert_eq!(suite.report.name, "test-url");
960            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
961            assert!(suite.report.is_finished);
962            assert!(suite.report.artifacts.is_empty());
963            assert!(suite.report.directories.is_empty());
964        };
965
966        futures::future::join(serve_fut, test_fut).await;
967    }
968
969    #[fuchsia::test]
970    async fn collect_suite_events_with_case_artifacts_sent_after_case_stopped() {
971        const STDOUT_CONTENT: &str = "stdout from my_test_case";
972        const STDERR_CONTENT: &str = "stderr from my_test_case";
973
974        let (stdout_write, stdout_read) = fidl::Socket::create_stream();
975        let (stderr_write, stderr_read) = fidl::Socket::create_stream();
976        let all_events = vec![
977            suite_started_event(0),
978            case_found_event(100, 0, "my_test_case"),
979            case_started_event(200, 0),
980            case_stopped_event(300, 0, ftest_manager::CaseStatus::Passed),
981            case_stdout_event(300, 0, stdout_read),
982            case_stderr_event(300, 0, stderr_read),
983            case_finished_event(400, 0),
984            suite_stopped_event(500, ftest_manager::SuiteStatus::Passed),
985        ];
986
987        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
988        let stdio_write_fut = async move {
989            let mut async_stdout = fasync::Socket::from_socket(stdout_write);
990            async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
991            let mut async_stderr = fasync::Socket::from_socket(stderr_write);
992            async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
993        };
994        let test_fut = async move {
995            let reporter = output::InMemoryReporter::new();
996            let run_reporter = output::RunReporter::new(reporter.clone());
997            let suite_reporter =
998                run_reporter.new_suite("test-url", &SuiteId(0)).expect("create new suite");
999
1000            let suite =
1001                RunningSuite::wait_for_start(proxy, None, None, std::time::Duration::ZERO, None)
1002                    .await;
1003            assert_eq!(
1004                run_suite_and_collect_logs(
1005                    suite,
1006                    &suite_reporter,
1007                    diagnostics::LogDisplayConfiguration::default(),
1008                    futures::future::pending()
1009                )
1010                .await
1011                .expect("collect results"),
1012                Outcome::Passed
1013            );
1014            suite_reporter.finished().expect("Reporter finished");
1015
1016            let reports = reporter.get_reports();
1017            let case = reports
1018                .iter()
1019                .find(|report| report.id == EntityId::Case { suite: SuiteId(0), case: CaseId(0) })
1020                .unwrap();
1021            assert_eq!(case.report.name, "my_test_case");
1022            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
1023            assert!(case.report.is_finished);
1024            assert_eq!(case.report.artifacts.len(), 2);
1025            assert_eq!(
1026                case.report
1027                    .artifacts
1028                    .iter()
1029                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1030                    .collect::<HashMap<_, _>>(),
1031                hashmap! {
1032                    output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
1033                    output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
1034                }
1035            );
1036            assert!(case.report.directories.is_empty());
1037
1038            let suite =
1039                reports.iter().find(|report| report.id == EntityId::Suite(SuiteId(0))).unwrap();
1040            assert_eq!(suite.report.name, "test-url");
1041            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
1042            assert!(suite.report.is_finished);
1043            assert!(suite.report.artifacts.is_empty());
1044            assert!(suite.report.directories.is_empty());
1045        };
1046
1047        futures::future::join3(serve_all_events(stream, all_events), stdio_write_fut, test_fut)
1048            .await;
1049    }
1050
1051    #[fuchsia::test]
1052    async fn collect_suite_events_timed_out_case_with_hanging_artifacts() {
1053        // create sockets and leave the server end open to simulate a hang.
1054        let (_stdout_write, stdout_read) = fidl::Socket::create_stream();
1055        let (_stderr_write, stderr_read) = fidl::Socket::create_stream();
1056        let all_events = vec![
1057            suite_started_event(0),
1058            case_found_event(100, 0, "my_test_case"),
1059            case_started_event(200, 0),
1060            case_stdout_event(300, 0, stdout_read),
1061            case_stderr_event(300, 0, stderr_read),
1062        ];
1063
1064        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
1065        let test_fut = async move {
1066            let reporter = output::InMemoryReporter::new();
1067            let run_reporter = output::RunReporter::new(reporter.clone());
1068            let suite_reporter =
1069                run_reporter.new_suite("test-url", &SuiteId(0)).expect("create new suite");
1070
1071            let suite = RunningSuite::wait_for_start(
1072                proxy,
1073                None,
1074                Some(std::time::Duration::from_secs(2)),
1075                std::time::Duration::ZERO,
1076                None,
1077            )
1078            .await;
1079            assert_eq!(
1080                run_suite_and_collect_logs(
1081                    suite,
1082                    &suite_reporter,
1083                    diagnostics::LogDisplayConfiguration::default(),
1084                    futures::future::pending()
1085                )
1086                .await
1087                .expect("collect results"),
1088                Outcome::Timedout
1089            );
1090            suite_reporter.finished().expect("Reporter finished");
1091
1092            let reports = reporter.get_reports();
1093            let case = reports
1094                .iter()
1095                .find(|report| report.id == EntityId::Case { suite: SuiteId(0), case: CaseId(0) })
1096                .unwrap();
1097            assert_eq!(case.report.name, "my_test_case");
1098            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Timedout));
1099            assert!(case.report.is_finished);
1100            assert_eq!(case.report.artifacts.len(), 2);
1101            assert_eq!(
1102                case.report
1103                    .artifacts
1104                    .iter()
1105                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1106                    .collect::<HashMap<_, _>>(),
1107                hashmap! {
1108                    output::ArtifactType::Stdout => vec![],
1109                    output::ArtifactType::Stderr => vec![]
1110                }
1111            );
1112            assert!(case.report.directories.is_empty());
1113
1114            let suite =
1115                reports.iter().find(|report| report.id == EntityId::Suite(SuiteId(0))).unwrap();
1116            assert_eq!(suite.report.name, "test-url");
1117            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Timedout));
1118            assert!(suite.report.is_finished);
1119            assert!(suite.report.artifacts.is_empty());
1120            assert!(suite.report.directories.is_empty());
1121        };
1122
1123        futures::future::join(serve_all_events_then_hang(stream, all_events), test_fut).await;
1124    }
1125}