1use crate::cancel::{Cancelled, NamedFutureExt, OrCancel};
6use crate::outcome::{Lifecycle, Outcome, RunTestSuiteError, UnexpectedEventError};
7use crate::output::{self, ArtifactType, CaseId, SuiteReporter, Timestamp};
8use crate::stream_util::StreamUtil;
9use crate::trace::duration;
10use crate::{artifacts, diagnostics};
11use diagnostics_data::Severity;
12use fidl_fuchsia_test_manager::{
13 self as ftest_manager, CaseArtifact, CaseFinished, CaseFound, CaseStarted, CaseStopped,
14 SuiteArtifact, SuiteStopped,
15};
16use fuchsia_async as fasync;
17use futures::future::Either;
18use futures::prelude::*;
19use futures::stream::FuturesUnordered;
20use futures::StreamExt;
21use log::{error, info, warn};
22use std::collections::HashMap;
23use std::io::Write;
24use std::sync::atomic::{AtomicBool, Ordering};
25use std::sync::Arc;
26
27struct CollectedEntityState<R> {
29 reporter: R,
30 name: String,
31 lifecycle: Lifecycle,
32 artifact_tasks:
33 Vec<fasync::Task<Result<Option<diagnostics::LogCollectionOutcome>, anyhow::Error>>>,
34}
35
/// Collects the events of a running suite, forwards them to `suite_reporter`, and returns
/// the overall [`Outcome`] of the suite.
///
/// Events are read from the suite's event stream and checked against a per-entity lifecycle
/// (`Found` -> `Started` -> `Stopped` -> `Finished`). Artifacts attached to the suite or to
/// a case are drained on spawned tasks that are awaited once event collection is over.
///
/// Timing and cancellation:
/// * When the suite's `timeout` elapses, the suite is asked to stop and event collection
///   continues.
/// * When `timeout + timeout_grace` elapses, or `cancel_fut` resolves, event collection and
///   any artifact draining still in progress are abandoned. Cases that have not stopped are
///   then reported with `Outcome::Timedout` or `Outcome::Cancelled` respectively.
///
/// # Errors
///
/// Returns an error when the event stream yields an error, when an event arrives that is
/// not valid for the current lifecycle of its suite/case, when the suite status is not
/// recognized, or when a reporter call fails.
///
/// # Panics
///
/// Panics if an event is received without a payload.
pub(crate) async fn run_suite_and_collect_logs<F: Future<Output = ()> + Unpin>(
    running_suite: RunningSuite,
    suite_reporter: &SuiteReporter<'_>,
    log_display: diagnostics::LogDisplayConfiguration,
    cancel_fut: F,
) -> Result<Outcome, RunTestSuiteError> {
    duration!(c"collect_suite");

    let RunningSuite {
        mut event_stream, stopper, timeout, timeout_grace, max_severity_logs, ..
    } = running_suite;

    let log_opts =
        diagnostics::LogCollectionOptions { format: log_display, max_severity: max_severity_logs };

    // Per-case state keyed by the case identifier carried in the events.
    let mut test_cases: HashMap<u32, CollectedEntityState<_>> = HashMap::new();
    let mut suite_state = CollectedEntityState {
        reporter: suite_reporter,
        name: "".to_string(),
        lifecycle: Lifecycle::Found,
        artifact_tasks: vec![],
    };
    let mut suite_finish_timestamp = Timestamp::Unknown;
    let mut outcome = Outcome::Passed;

    // Processes events until the stream ends. `return` and `?` inside this block resolve the
    // block's future; they do not return from the enclosing function.
    let collect_results_fut = async {
        while let Some(event_result) = event_stream.next().named("next_event").await {
            match event_result {
                Err(e) => {
                    // The stream itself failed: mark the suite as errored before bailing out.
                    suite_state
                        .reporter
                        .stopped(&output::ReportedOutcome::Error, Timestamp::Unknown)?;
                    return Err(e);
                }
                Ok(event) => {
                    let timestamp = Timestamp::from_nanos(event.timestamp);
                    match event.payload.expect("event cannot be None") {
                        ftest_manager::SuiteEventPayload::CaseFound(CaseFound {
                            test_case_name,
                            identifier,
                        }) => {
                            // A case identifier may only be announced once.
                            if test_cases.contains_key(&identifier) {
                                return Err(UnexpectedEventError::InvalidCaseEvent {
                                    last_state: Lifecycle::Found,
                                    next_state: Lifecycle::Found,
                                    test_case_name,
                                    identifier,
                                }
                                .into());
                            }
                            test_cases.insert(
                                identifier,
                                CollectedEntityState {
                                    reporter: suite_reporter
                                        .new_case(&test_case_name, &CaseId(identifier))?,
                                    name: test_case_name,
                                    lifecycle: Lifecycle::Found,
                                    artifact_tasks: vec![],
                                },
                            );
                        }
                        ftest_manager::SuiteEventPayload::CaseStarted(CaseStarted {
                            identifier,
                        }) => {
                            let entry = test_cases.get_mut(&identifier).ok_or(
                                UnexpectedEventError::CaseEventButNotFound {
                                    next_state: Lifecycle::Started,
                                    identifier,
                                },
                            )?;
                            match &entry.lifecycle {
                                Lifecycle::Found => {
                                    // Case start is reported without a timestamp; the event's
                                    // timestamp is not forwarded for cases.
                                    entry.reporter.started(Timestamp::Unknown)?;
                                    entry.lifecycle = Lifecycle::Started;
                                }
                                other => {
                                    return Err(UnexpectedEventError::InvalidCaseEvent {
                                        last_state: *other,
                                        next_state: Lifecycle::Started,
                                        test_case_name: entry.name.clone(),
                                        identifier,
                                    }
                                    .into());
                                }
                            }
                        }
                        ftest_manager::SuiteEventPayload::CaseArtifact(CaseArtifact {
                            identifier,
                            artifact,
                        }) => {
                            let entry = test_cases.get_mut(&identifier).ok_or(
                                UnexpectedEventError::CaseArtifactButNotFound { identifier },
                            )?;
                            // Artifacts are accepted in every state except `Finished`, so they
                            // may still arrive after the case has stopped.
                            if matches!(entry.lifecycle, Lifecycle::Finished) {
                                return Err(UnexpectedEventError::CaseArtifactButFinished {
                                    identifier,
                                }
                                .into());
                            }
                            let artifact_fut = artifacts::drain_artifact(
                                &entry.reporter,
                                artifact,
                                log_opts.clone(),
                            )
                            .await?;
                            // Drain on a separate task so event processing is not blocked.
                            entry.artifact_tasks.push(fasync::Task::spawn(artifact_fut));
                        }
                        ftest_manager::SuiteEventPayload::CaseStopped(CaseStopped {
                            identifier,
                            status,
                        }) => {
                            let entry = test_cases.get_mut(&identifier).ok_or(
                                UnexpectedEventError::CaseEventButNotFound {
                                    next_state: Lifecycle::Stopped,
                                    identifier,
                                },
                            )?;
                            match &entry.lifecycle {
                                Lifecycle::Started => {
                                    // As with case start, no timestamp is recorded for case stop.
                                    entry.reporter.stopped(&status.into(), Timestamp::Unknown)?;
                                    entry.lifecycle = Lifecycle::Stopped;
                                }
                                other => {
                                    return Err(UnexpectedEventError::InvalidCaseEvent {
                                        last_state: *other,
                                        next_state: Lifecycle::Stopped,
                                        test_case_name: entry.name.clone(),
                                        identifier,
                                    }
                                    .into());
                                }
                            }
                        }
                        ftest_manager::SuiteEventPayload::CaseFinished(CaseFinished {
                            identifier,
                        }) => {
                            let entry = test_cases.get_mut(&identifier).ok_or(
                                UnexpectedEventError::CaseEventButNotFound {
                                    next_state: Lifecycle::Finished,
                                    identifier,
                                },
                            )?;
                            match &entry.lifecycle {
                                Lifecycle::Stopped => {
                                    // Only the lifecycle is updated here. `reporter.finished()`
                                    // is called later, after the case's artifact tasks have
                                    // been awaited.
                                    entry.lifecycle = Lifecycle::Finished;
                                }
                                other => {
                                    return Err(UnexpectedEventError::InvalidCaseEvent {
                                        last_state: *other,
                                        next_state: Lifecycle::Finished,
                                        test_case_name: entry.name.clone(),
                                        identifier,
                                    }
                                    .into());
                                }
                            }
                        }
                        ftest_manager::SuiteEventPayload::SuiteArtifact(SuiteArtifact {
                            artifact,
                        }) => {
                            let artifact_fut = artifacts::drain_artifact(
                                suite_reporter,
                                artifact,
                                log_opts.clone(),
                            )
                            .await?;
                            suite_state.artifact_tasks.push(fasync::Task::spawn(artifact_fut));
                        }
                        ftest_manager::SuiteEventPayload::SuiteStarted(_) => {
                            match &suite_state.lifecycle {
                                Lifecycle::Found => {
                                    suite_state.reporter.started(timestamp)?;
                                    suite_state.lifecycle = Lifecycle::Started;
                                }
                                other => {
                                    return Err(UnexpectedEventError::InvalidSuiteEvent {
                                        last_state: *other,
                                        next_state: Lifecycle::Started,
                                    }
                                    .into());
                                }
                            }
                        }
                        ftest_manager::SuiteEventPayload::SuiteStopped(SuiteStopped { status }) => {
                            match &suite_state.lifecycle {
                                Lifecycle::Started => {
                                    // The suite reporter is not told about the stop here; the
                                    // timestamp and outcome are saved and reported at the very
                                    // end, once the final outcome is known.
                                    suite_state.lifecycle = Lifecycle::Stopped;
                                    suite_finish_timestamp = timestamp;
                                    outcome = match status {
                                        ftest_manager::SuiteStatus::Passed => Outcome::Passed,
                                        ftest_manager::SuiteStatus::Failed => Outcome::Failed,
                                        ftest_manager::SuiteStatus::DidNotFinish => {
                                            Outcome::Inconclusive
                                        }
                                        // A suite stopped on request is reported the same way as
                                        // one that timed out.
                                        ftest_manager::SuiteStatus::TimedOut
                                        | ftest_manager::SuiteStatus::Stopped => Outcome::Timedout,
                                        ftest_manager::SuiteStatus::InternalError => {
                                            Outcome::error(
                                                UnexpectedEventError::InternalErrorSuiteStatus,
                                            )
                                        }
                                        s => {
                                            return Err(
                                                UnexpectedEventError::UnrecognizedSuiteStatus {
                                                    status: s,
                                                }
                                                .into(),
                                            );
                                        }
                                    };
                                }
                                other => {
                                    return Err(UnexpectedEventError::InvalidSuiteEvent {
                                        last_state: *other,
                                        next_state: Lifecycle::Stopped,
                                    }
                                    .into());
                                }
                            }
                        }
                        ftest_manager::SuiteEventPayloadUnknown!() => {
                            // Unknown payloads are logged and otherwise ignored.
                            warn!("Encountered unrecognized suite event");
                        }
                    }
                }
            }
        }
        // Using `event_stream` by value makes this (non-`move`) async block capture the
        // stream by move instead of by reference.
        drop(event_stream); Ok(())
    }
    .boxed_local();

    let start_time = std::time::Instant::now();
    // Two deadlines derived from `timeout`: the first requests a stop, the second (after the
    // additional grace period) gives up on the suite. Without a timeout neither ever fires.
    let (stop_timeout_future, kill_timeout_future) = match timeout {
        None => {
            (futures::future::pending::<()>().boxed(), futures::future::pending::<()>().boxed())
        }
        Some(duration) => (
            fasync::Timer::new(start_time + duration).boxed(),
            fasync::Timer::new(start_time + duration + timeout_grace).boxed(),
        ),
    };

    // Collect events; if the stop deadline is reached first, ask the suite to stop and then
    // keep collecting until the stream ends.
    let collect_or_stop_fut = async move {
        match futures::future::select(stop_timeout_future, collect_results_fut).await {
            Either::Left((_stop_done, collect_fut)) => {
                stopper.stop();
                collect_fut.await
            }
            Either::Right((result, _)) => result,
        }
    };

    // Resolves to the outcome that should be used when collection is abandoned: `Cancelled`
    // when `cancel_fut` resolves first, `Timedout` when the kill deadline is reached first.
    // It is `shared()` because it is cloned and awaited at several points below.
    let kill_fut = async move {
        match futures::future::select(cancel_fut, kill_timeout_future).await {
            Either::Left(_) => Outcome::Cancelled,
            Either::Right(_) => Outcome::Timedout,
        }
    }
    .shared();

    // `None` when collection ran to completion, `Some(outcome)` when `kill_fut` cut it short.
    let early_termination_outcome =
        match collect_or_stop_fut.boxed_local().or_cancelled(kill_fut.clone()).await {
            Ok(Ok(())) => None,
            Ok(Err(e)) => return Err(e),
            Err(Cancelled(outcome)) => Some(outcome),
        };

    info!("Awaiting case artifacts");
    // Close out every case: report a stop for cases that never stopped, wait for their
    // artifact tasks, then mark the case reporter finished.
    let mut unfinished_test_case_names = vec![];
    for (_, test_case) in test_cases.into_iter() {
        let CollectedEntityState { reporter, name, lifecycle, artifact_tasks } = test_case;
        match (lifecycle, early_termination_outcome.clone()) {
            // Collection was cut short: unstopped cases inherit the early-termination outcome.
            (Lifecycle::Started | Lifecycle::Found, Some(early)) => {
                reporter.stopped(&early.into(), Timestamp::Unknown)?;
            }
            // The stream ended normally but the case never started.
            (Lifecycle::Found, None) => {
                unfinished_test_case_names.push(name.clone());
                reporter.stopped(&Outcome::Inconclusive.into(), Timestamp::Unknown)?;
            }
            // The stream ended normally but the case never stopped.
            (Lifecycle::Started, None) => {
                unfinished_test_case_names.push(name.clone());
                reporter.stopped(&Outcome::DidNotFinish.into(), Timestamp::Unknown)?;
            }
            // A stop was already reported while processing events.
            (Lifecycle::Stopped | Lifecycle::Finished, _) => (),
        }

        let finish_artifacts_fut = FuturesUnordered::from_iter(artifact_tasks)
            .map(|result| match result {
                Err(e) => {
                    error!("Failed to collect artifact for {}: {:?}", name, e);
                }
                Ok(Some(_log_result)) => warn!("Unexpectedly got log results for a test case"),
                Ok(None) => (),
            })
            .collect::<()>();
        // Artifact draining is also bounded by `kill_fut`, so a hanging artifact cannot block
        // completion forever when a timeout or cancellation is in effect.
        if let Err(Cancelled(_)) = finish_artifacts_fut.or_cancelled(kill_fut.clone()).await {
            warn!("Stopped polling artifacts for {} due to timeout", name);
        }

        reporter.finished()?;
    }
    // Cases left unfinished after a normally ended stream turn the suite outcome into an error.
    if !unfinished_test_case_names.is_empty() {
        outcome = Outcome::error(UnexpectedEventError::CasesDidNotFinish {
            cases: unfinished_test_case_names,
        });
    }

    match (suite_state.lifecycle, early_termination_outcome) {
        (Lifecycle::Found | Lifecycle::Started, Some(early)) => {
            // Only replace a plain pass/fail; an error outcome recorded above is kept.
            if matches!(&outcome, Outcome::Passed | Outcome::Failed) {
                outcome = early;
            }
        }
        (Lifecycle::Found | Lifecycle::Started, None) => {
            outcome = Outcome::error(UnexpectedEventError::SuiteDidNotReportStop);
        }
        (Lifecycle::Stopped, _) => (),
        // Nothing in this function ever assigns `Finished` to the suite lifecycle.
        (Lifecycle::Finished, _) => unreachable!(),
    }

    // Set when any suite artifact reports restricted logs; checked after draining completes.
    let restricted_logs_present = AtomicBool::new(false);
    let finish_artifacts_fut = FuturesUnordered::from_iter(suite_state.artifact_tasks)
        .then(|result| async {
            match result {
                Err(e) => {
                    error!("Failed to collect artifact for suite: {:?}", e);
                }
                Ok(Some(log_result)) => match log_result {
                    diagnostics::LogCollectionOutcome::Error { restricted_logs } => {
                        restricted_logs_present.store(true, Ordering::Relaxed);
                        // Record the offending log lines in a dedicated artifact. Failures
                        // here are only logged; the flag above has already been set.
                        let mut log_artifact = match suite_reporter
                            .new_artifact(&ArtifactType::RestrictedLog)
                        {
                            Ok(artifact) => artifact,
                            Err(e) => {
                                warn!("Error creating artifact to report restricted logs: {:?}", e);
                                return;
                            }
                        };
                        for log in restricted_logs.iter() {
                            if let Err(e) = writeln!(log_artifact, "{}", log) {
                                warn!("Error recording restricted logs: {:?}", e);
                                return;
                            }
                        }
                    }
                    diagnostics::LogCollectionOutcome::Passed => (),
                },
                Ok(None) => (),
            }
        })
        .collect::<()>();
    if let Err(Cancelled(_)) = finish_artifacts_fut.or_cancelled(kill_fut).await {
        warn!("Stopped polling artifacts due to timeout");
    }
    // Restricted logs downgrade an otherwise passing suite to failed.
    if restricted_logs_present.into_inner() && matches!(outcome, Outcome::Passed) {
        outcome = Outcome::Failed;
    }

    suite_reporter.stopped(&outcome.clone().into(), suite_finish_timestamp)?;

    Ok(outcome)
}
418
419type SuiteEventStream = std::pin::Pin<
420 Box<dyn Stream<Item = Result<ftest_manager::SuiteEvent, RunTestSuiteError>> + Send>,
421>;
422
/// A suite that has been launched, together with the settings that govern how long its
/// events are collected. Constructed with [`RunningSuite::wait_for_start`].
pub(crate) struct RunningSuite {
    /// Flattened stream of the suite's events.
    event_stream: SuiteEventStream,
    /// Handle used to ask the suite to stop.
    stopper: RunningSuiteStopper,
    /// Passed through as the `max_severity` log collection option.
    max_severity_logs: Option<Severity>,
    /// How long the suite may run before it is asked to stop; `None` means no limit.
    timeout: Option<std::time::Duration>,
    /// Extra time allowed after `timeout` before event collection is abandoned.
    timeout_grace: std::time::Duration,
}
432
433struct RunningSuiteStopper(Arc<ftest_manager::SuiteControllerProxy>);
434
435impl RunningSuiteStopper {
436 fn stop(self) {
437 let _ = self.0.stop();
438 }
439}
440
441impl RunningSuite {
442 const DEFAULT_PIPELINED_REQUESTS: usize = 8;
446 pub(crate) async fn wait_for_start(
447 proxy: ftest_manager::SuiteControllerProxy,
448 max_severity_logs: Option<Severity>,
449 timeout: Option<std::time::Duration>,
450 timeout_grace: std::time::Duration,
451 max_pipelined: Option<usize>,
452 ) -> Self {
453 let proxy = Arc::new(proxy);
454 let proxy_clone = proxy.clone();
455 let unprocessed_event_stream = futures::stream::repeat_with(move || {
457 proxy.get_events().inspect(|events_result| match events_result {
458 Ok(Ok(ref events)) => info!("Latest suite event: {:?}", events.last()),
459 _ => (),
460 })
461 })
462 .buffered(max_pipelined.unwrap_or(Self::DEFAULT_PIPELINED_REQUESTS));
463 let terminated_event_stream =
465 unprocessed_event_stream.take_until_stop_after(|result| match &result {
466 Ok(Ok(events)) => events.is_empty(),
467 Err(_) | Ok(Err(_)) => true,
468 });
469 let mut event_stream = terminated_event_stream
471 .map(Self::convert_to_result_vec)
472 .map(futures::stream::iter)
473 .flatten()
474 .peekable();
475 std::pin::Pin::new(&mut event_stream).peek().await;
477
478 Self {
479 event_stream: event_stream.boxed(),
480 stopper: RunningSuiteStopper(proxy_clone),
481 timeout,
482 timeout_grace,
483 max_severity_logs,
484 }
485 }
486
487 fn convert_to_result_vec(
488 vec: Result<
489 Result<Vec<ftest_manager::SuiteEvent>, ftest_manager::LaunchError>,
490 fidl::Error,
491 >,
492 ) -> Vec<Result<ftest_manager::SuiteEvent, RunTestSuiteError>> {
493 match vec {
494 Ok(Ok(events)) => events.into_iter().map(Ok).collect(),
495 Ok(Err(e)) => vec![Err(e.into())],
496 Err(e) => vec![Err(e.into())],
497 }
498 }
499}
500
501#[cfg(test)]
502mod test {
503 use super::*;
504 use crate::output::{EntityId, SuiteId};
505 use assert_matches::assert_matches;
506 use fidl::endpoints::create_proxy_and_stream;
507 use maplit::hashmap;
508
509 async fn respond_to_get_events(
510 request_stream: &mut ftest_manager::SuiteControllerRequestStream,
511 events: Vec<ftest_manager::SuiteEvent>,
512 ) {
513 let request = request_stream
514 .next()
515 .await
516 .expect("did not get next request")
517 .expect("error getting next request");
518 let responder = match request {
519 ftest_manager::SuiteControllerRequest::GetEvents { responder } => responder,
520 r => panic!("Expected GetEvents request but got {:?}", r),
521 };
522
523 responder.send(Ok(events)).expect("send events");
524 }
525
526 async fn serve_all_events(
528 mut request_stream: ftest_manager::SuiteControllerRequestStream,
529 events: Vec<ftest_manager::SuiteEvent>,
530 ) {
531 const BATCH_SIZE: usize = 5;
532 let mut event_iter = events.into_iter();
533 while event_iter.len() > 0 {
534 respond_to_get_events(
535 &mut request_stream,
536 event_iter.by_ref().take(BATCH_SIZE).collect(),
537 )
538 .await;
539 }
540 respond_to_get_events(&mut request_stream, vec![]).await;
541 }
542
543 async fn serve_all_events_then_hang(
545 mut request_stream: ftest_manager::SuiteControllerRequestStream,
546 events: Vec<ftest_manager::SuiteEvent>,
547 ) {
548 const BATCH_SIZE: usize = 5;
549 let mut event_iter = events.into_iter();
550 while event_iter.len() > 0 {
551 respond_to_get_events(
552 &mut request_stream,
553 event_iter.by_ref().take(BATCH_SIZE).collect(),
554 )
555 .await;
556 }
557 let _requests = request_stream.collect::<Vec<_>>().await;
558 }
559
560 fn create_empty_event(timestamp: i64) -> ftest_manager::SuiteEvent {
564 ftest_manager::SuiteEvent { timestamp: Some(timestamp), ..Default::default() }
565 }
566
567 macro_rules! assert_empty_events_eq {
568 ($t1:expr, $t2:expr) => {
569 assert_eq!($t1.timestamp, $t2.timestamp, "Got incorrect event.")
570 };
571 }
572
573 #[fuchsia::test]
574 async fn running_suite_events_simple() {
575 let (suite_proxy, mut suite_request_stream) =
576 create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
577 let suite_server_task = fasync::Task::spawn(async move {
578 respond_to_get_events(&mut suite_request_stream, vec![create_empty_event(0)]).await;
579 respond_to_get_events(&mut suite_request_stream, vec![]).await;
580 drop(suite_request_stream);
581 });
582
583 let mut running_suite =
584 RunningSuite::wait_for_start(suite_proxy, None, None, std::time::Duration::ZERO, None)
585 .await;
586 assert_empty_events_eq!(
587 running_suite.event_stream.next().await.unwrap().unwrap(),
588 create_empty_event(0)
589 );
590 assert!(running_suite.event_stream.next().await.is_none());
591 assert!(running_suite.event_stream.next().await.is_none());
593 suite_server_task.await;
594 }
595
596 #[fuchsia::test]
597 async fn running_suite_events_multiple_events() {
598 let (suite_proxy, mut suite_request_stream) =
599 create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
600 let suite_server_task = fasync::Task::spawn(async move {
601 respond_to_get_events(
602 &mut suite_request_stream,
603 vec![create_empty_event(0), create_empty_event(1)],
604 )
605 .await;
606 respond_to_get_events(
607 &mut suite_request_stream,
608 vec![create_empty_event(2), create_empty_event(3)],
609 )
610 .await;
611 respond_to_get_events(&mut suite_request_stream, vec![]).await;
612 drop(suite_request_stream);
613 });
614
615 let mut running_suite =
616 RunningSuite::wait_for_start(suite_proxy, None, None, std::time::Duration::ZERO, None)
617 .await;
618
619 for num in 0..4 {
620 assert_empty_events_eq!(
621 running_suite.event_stream.next().await.unwrap().unwrap(),
622 create_empty_event(num)
623 );
624 }
625 assert!(running_suite.event_stream.next().await.is_none());
626 suite_server_task.await;
627 }
628
629 #[fuchsia::test]
630 async fn running_suite_events_peer_closed() {
631 let (suite_proxy, mut suite_request_stream) =
632 create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
633 let suite_server_task = fasync::Task::spawn(async move {
634 respond_to_get_events(&mut suite_request_stream, vec![create_empty_event(1)]).await;
635 drop(suite_request_stream);
636 });
637
638 let mut running_suite =
639 RunningSuite::wait_for_start(suite_proxy, None, None, std::time::Duration::ZERO, None)
640 .await;
641 assert_empty_events_eq!(
642 running_suite.event_stream.next().await.unwrap().unwrap(),
643 create_empty_event(1)
644 );
645 assert_matches!(
646 running_suite.event_stream.next().await,
647 Some(Err(RunTestSuiteError::Fidl(fidl::Error::ClientChannelClosed { .. })))
648 );
649 suite_server_task.await;
650 }
651
652 fn suite_event_from_payload(
653 timestamp: i64,
654 payload: ftest_manager::SuiteEventPayload,
655 ) -> ftest_manager::SuiteEvent {
656 let mut event = create_empty_event(timestamp);
657 event.payload = Some(payload);
658 event
659 }
660
661 fn case_found_event(timestamp: i64, identifier: u32, name: &str) -> ftest_manager::SuiteEvent {
662 suite_event_from_payload(
663 timestamp,
664 ftest_manager::SuiteEventPayload::CaseFound(ftest_manager::CaseFound {
665 test_case_name: name.into(),
666 identifier,
667 }),
668 )
669 }
670
671 fn case_started_event(timestamp: i64, identifier: u32) -> ftest_manager::SuiteEvent {
672 suite_event_from_payload(
673 timestamp,
674 ftest_manager::SuiteEventPayload::CaseStarted(ftest_manager::CaseStarted {
675 identifier,
676 }),
677 )
678 }
679
680 fn case_stopped_event(
681 timestamp: i64,
682 identifier: u32,
683 status: ftest_manager::CaseStatus,
684 ) -> ftest_manager::SuiteEvent {
685 suite_event_from_payload(
686 timestamp,
687 ftest_manager::SuiteEventPayload::CaseStopped(ftest_manager::CaseStopped {
688 identifier,
689 status,
690 }),
691 )
692 }
693
694 fn case_finished_event(timestamp: i64, identifier: u32) -> ftest_manager::SuiteEvent {
695 suite_event_from_payload(
696 timestamp,
697 ftest_manager::SuiteEventPayload::CaseFinished(ftest_manager::CaseFinished {
698 identifier,
699 }),
700 )
701 }
702
703 fn case_stdout_event(
704 timestamp: i64,
705 identifier: u32,
706 stdout: fidl::Socket,
707 ) -> ftest_manager::SuiteEvent {
708 suite_event_from_payload(
709 timestamp,
710 ftest_manager::SuiteEventPayload::CaseArtifact(ftest_manager::CaseArtifact {
711 identifier,
712 artifact: ftest_manager::Artifact::Stdout(stdout),
713 }),
714 )
715 }
716
717 fn case_stderr_event(
718 timestamp: i64,
719 identifier: u32,
720 stderr: fidl::Socket,
721 ) -> ftest_manager::SuiteEvent {
722 suite_event_from_payload(
723 timestamp,
724 ftest_manager::SuiteEventPayload::CaseArtifact(ftest_manager::CaseArtifact {
725 identifier,
726 artifact: ftest_manager::Artifact::Stderr(stderr),
727 }),
728 )
729 }
730
731 fn suite_started_event(timestamp: i64) -> ftest_manager::SuiteEvent {
732 suite_event_from_payload(
733 timestamp,
734 ftest_manager::SuiteEventPayload::SuiteStarted(ftest_manager::SuiteStarted),
735 )
736 }
737
738 fn suite_stopped_event(
739 timestamp: i64,
740 status: ftest_manager::SuiteStatus,
741 ) -> ftest_manager::SuiteEvent {
742 suite_event_from_payload(
743 timestamp,
744 ftest_manager::SuiteEventPayload::SuiteStopped(ftest_manager::SuiteStopped { status }),
745 )
746 }
747
748 #[fuchsia::test]
749 async fn collect_suite_events_simple() {
750 let all_events = vec![
751 suite_started_event(0),
752 case_found_event(100, 0, "my_test_case"),
753 case_started_event(200, 0),
754 case_stopped_event(300, 0, ftest_manager::CaseStatus::Passed),
755 case_finished_event(400, 0),
756 suite_stopped_event(500, ftest_manager::SuiteStatus::Passed),
757 ];
758
759 let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
760 let test_fut = async move {
761 let reporter = output::InMemoryReporter::new();
762 let run_reporter = output::RunReporter::new(reporter.clone());
763 let suite_reporter =
764 run_reporter.new_suite("test-url", &SuiteId(0)).expect("create new suite");
765
766 let suite =
767 RunningSuite::wait_for_start(proxy, None, None, std::time::Duration::ZERO, None)
768 .await;
769 assert_eq!(
770 run_suite_and_collect_logs(
771 suite,
772 &suite_reporter,
773 diagnostics::LogDisplayConfiguration::default(),
774 futures::future::pending()
775 )
776 .await
777 .expect("collect results"),
778 Outcome::Passed
779 );
780 suite_reporter.finished().expect("Reporter finished");
781
782 let reports = reporter.get_reports();
783 let case = reports
784 .iter()
785 .find(|report| report.id == EntityId::Case { suite: SuiteId(0), case: CaseId(0) })
786 .unwrap();
787 assert_eq!(case.report.name, "my_test_case");
788 assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
789 assert!(case.report.is_finished);
790 assert!(case.report.artifacts.is_empty());
791 assert!(case.report.directories.is_empty());
792 let suite =
793 reports.iter().find(|report| report.id == EntityId::Suite(SuiteId(0))).unwrap();
794 assert_eq!(suite.report.name, "test-url");
795 assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
796 assert!(suite.report.is_finished);
797 assert!(suite.report.artifacts.is_empty());
798 assert!(suite.report.directories.is_empty());
799 };
800
801 futures::future::join(serve_all_events(stream, all_events), test_fut).await;
802 }
803
804 #[fuchsia::test]
805 async fn collect_suite_events_with_case_artifacts() {
806 const STDOUT_CONTENT: &str = "stdout from my_test_case";
807 const STDERR_CONTENT: &str = "stderr from my_test_case";
808
809 let (stdout_write, stdout_read) = fidl::Socket::create_stream();
810 let (stderr_write, stderr_read) = fidl::Socket::create_stream();
811 let all_events = vec![
812 suite_started_event(0),
813 case_found_event(100, 0, "my_test_case"),
814 case_started_event(200, 0),
815 case_stdout_event(300, 0, stdout_read),
816 case_stderr_event(300, 0, stderr_read),
817 case_stopped_event(300, 0, ftest_manager::CaseStatus::Passed),
818 case_finished_event(400, 0),
819 suite_stopped_event(500, ftest_manager::SuiteStatus::Passed),
820 ];
821
822 let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
823 let stdio_write_fut = async move {
824 let mut async_stdout = fasync::Socket::from_socket(stdout_write);
825 async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
826 let mut async_stderr = fasync::Socket::from_socket(stderr_write);
827 async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
828 };
829 let test_fut = async move {
830 let reporter = output::InMemoryReporter::new();
831 let run_reporter = output::RunReporter::new(reporter.clone());
832 let suite_reporter =
833 run_reporter.new_suite("test-url", &SuiteId(0)).expect("create new suite");
834
835 let suite =
836 RunningSuite::wait_for_start(proxy, None, None, std::time::Duration::ZERO, None)
837 .await;
838 assert_eq!(
839 run_suite_and_collect_logs(
840 suite,
841 &suite_reporter,
842 diagnostics::LogDisplayConfiguration::default(),
843 futures::future::pending()
844 )
845 .await
846 .expect("collect results"),
847 Outcome::Passed
848 );
849 suite_reporter.finished().expect("Reporter finished");
850
851 let reports = reporter.get_reports();
852 let case = reports
853 .iter()
854 .find(|report| report.id == EntityId::Case { suite: SuiteId(0), case: CaseId(0) })
855 .unwrap();
856 assert_eq!(case.report.name, "my_test_case");
857 assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
858 assert!(case.report.is_finished);
859 assert_eq!(case.report.artifacts.len(), 2);
860 assert_eq!(
861 case.report
862 .artifacts
863 .iter()
864 .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
865 .collect::<HashMap<_, _>>(),
866 hashmap! {
867 output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
868 output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
869 }
870 );
871 assert!(case.report.directories.is_empty());
872
873 let suite =
874 reports.iter().find(|report| report.id == EntityId::Suite(SuiteId(0))).unwrap();
875 assert_eq!(suite.report.name, "test-url");
876 assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
877 assert!(suite.report.is_finished);
878 assert!(suite.report.artifacts.is_empty());
879 assert!(suite.report.directories.is_empty());
880 };
881
882 futures::future::join3(serve_all_events(stream, all_events), stdio_write_fut, test_fut)
883 .await;
884 }
885
886 #[fuchsia::test]
887 async fn collect_suite_events_case_artifacts_complete_after_suite() {
888 const STDOUT_CONTENT: &str = "stdout from my_test_case";
889 const STDERR_CONTENT: &str = "stderr from my_test_case";
890
891 let (stdout_write, stdout_read) = fidl::Socket::create_stream();
892 let (stderr_write, stderr_read) = fidl::Socket::create_stream();
893 let all_events = vec![
894 suite_started_event(0),
895 case_found_event(100, 0, "my_test_case"),
896 case_started_event(200, 0),
897 case_stdout_event(300, 0, stdout_read),
898 case_stderr_event(300, 0, stderr_read),
899 case_stopped_event(300, 0, ftest_manager::CaseStatus::Passed),
900 case_finished_event(400, 0),
901 suite_stopped_event(500, ftest_manager::SuiteStatus::Passed),
902 ];
903
904 let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
905 let serve_fut = async move {
906 serve_all_events(stream, all_events).await;
908 let mut async_stdout = fasync::Socket::from_socket(stdout_write);
909 async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
910 let mut async_stderr = fasync::Socket::from_socket(stderr_write);
911 async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
912 };
913 let test_fut = async move {
914 let reporter = output::InMemoryReporter::new();
915 let run_reporter = output::RunReporter::new(reporter.clone());
916 let suite_reporter =
917 run_reporter.new_suite("test-url", &SuiteId(0)).expect("create new suite");
918
919 let suite =
920 RunningSuite::wait_for_start(proxy, None, None, std::time::Duration::ZERO, Some(1))
921 .await;
922 assert_eq!(
923 run_suite_and_collect_logs(
924 suite,
925 &suite_reporter,
926 diagnostics::LogDisplayConfiguration::default(),
927 futures::future::pending()
928 )
929 .await
930 .expect("collect results"),
931 Outcome::Passed
932 );
933 suite_reporter.finished().expect("Reporter finished");
934
935 let reports = reporter.get_reports();
936 let case = reports
937 .iter()
938 .find(|report| report.id == EntityId::Case { suite: SuiteId(0), case: CaseId(0) })
939 .unwrap();
940 assert_eq!(case.report.name, "my_test_case");
941 assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
942 assert!(case.report.is_finished);
943 assert_eq!(case.report.artifacts.len(), 2);
944 assert_eq!(
945 case.report
946 .artifacts
947 .iter()
948 .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
949 .collect::<HashMap<_, _>>(),
950 hashmap! {
951 output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
952 output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
953 }
954 );
955 assert!(case.report.directories.is_empty());
956
957 let suite =
958 reports.iter().find(|report| report.id == EntityId::Suite(SuiteId(0))).unwrap();
959 assert_eq!(suite.report.name, "test-url");
960 assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
961 assert!(suite.report.is_finished);
962 assert!(suite.report.artifacts.is_empty());
963 assert!(suite.report.directories.is_empty());
964 };
965
966 futures::future::join(serve_fut, test_fut).await;
967 }
968
969 #[fuchsia::test]
970 async fn collect_suite_events_with_case_artifacts_sent_after_case_stopped() {
971 const STDOUT_CONTENT: &str = "stdout from my_test_case";
972 const STDERR_CONTENT: &str = "stderr from my_test_case";
973
974 let (stdout_write, stdout_read) = fidl::Socket::create_stream();
975 let (stderr_write, stderr_read) = fidl::Socket::create_stream();
976 let all_events = vec![
977 suite_started_event(0),
978 case_found_event(100, 0, "my_test_case"),
979 case_started_event(200, 0),
980 case_stopped_event(300, 0, ftest_manager::CaseStatus::Passed),
981 case_stdout_event(300, 0, stdout_read),
982 case_stderr_event(300, 0, stderr_read),
983 case_finished_event(400, 0),
984 suite_stopped_event(500, ftest_manager::SuiteStatus::Passed),
985 ];
986
987 let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
988 let stdio_write_fut = async move {
989 let mut async_stdout = fasync::Socket::from_socket(stdout_write);
990 async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
991 let mut async_stderr = fasync::Socket::from_socket(stderr_write);
992 async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
993 };
994 let test_fut = async move {
995 let reporter = output::InMemoryReporter::new();
996 let run_reporter = output::RunReporter::new(reporter.clone());
997 let suite_reporter =
998 run_reporter.new_suite("test-url", &SuiteId(0)).expect("create new suite");
999
1000 let suite =
1001 RunningSuite::wait_for_start(proxy, None, None, std::time::Duration::ZERO, None)
1002 .await;
1003 assert_eq!(
1004 run_suite_and_collect_logs(
1005 suite,
1006 &suite_reporter,
1007 diagnostics::LogDisplayConfiguration::default(),
1008 futures::future::pending()
1009 )
1010 .await
1011 .expect("collect results"),
1012 Outcome::Passed
1013 );
1014 suite_reporter.finished().expect("Reporter finished");
1015
1016 let reports = reporter.get_reports();
1017 let case = reports
1018 .iter()
1019 .find(|report| report.id == EntityId::Case { suite: SuiteId(0), case: CaseId(0) })
1020 .unwrap();
1021 assert_eq!(case.report.name, "my_test_case");
1022 assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
1023 assert!(case.report.is_finished);
1024 assert_eq!(case.report.artifacts.len(), 2);
1025 assert_eq!(
1026 case.report
1027 .artifacts
1028 .iter()
1029 .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1030 .collect::<HashMap<_, _>>(),
1031 hashmap! {
1032 output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
1033 output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
1034 }
1035 );
1036 assert!(case.report.directories.is_empty());
1037
1038 let suite =
1039 reports.iter().find(|report| report.id == EntityId::Suite(SuiteId(0))).unwrap();
1040 assert_eq!(suite.report.name, "test-url");
1041 assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
1042 assert!(suite.report.is_finished);
1043 assert!(suite.report.artifacts.is_empty());
1044 assert!(suite.report.directories.is_empty());
1045 };
1046
1047 futures::future::join3(serve_all_events(stream, all_events), stdio_write_fut, test_fut)
1048 .await;
1049 }
1050
1051 #[fuchsia::test]
1052 async fn collect_suite_events_timed_out_case_with_hanging_artifacts() {
1053 let (_stdout_write, stdout_read) = fidl::Socket::create_stream();
1055 let (_stderr_write, stderr_read) = fidl::Socket::create_stream();
1056 let all_events = vec![
1057 suite_started_event(0),
1058 case_found_event(100, 0, "my_test_case"),
1059 case_started_event(200, 0),
1060 case_stdout_event(300, 0, stdout_read),
1061 case_stderr_event(300, 0, stderr_read),
1062 ];
1063
1064 let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
1065 let test_fut = async move {
1066 let reporter = output::InMemoryReporter::new();
1067 let run_reporter = output::RunReporter::new(reporter.clone());
1068 let suite_reporter =
1069 run_reporter.new_suite("test-url", &SuiteId(0)).expect("create new suite");
1070
1071 let suite = RunningSuite::wait_for_start(
1072 proxy,
1073 None,
1074 Some(std::time::Duration::from_secs(2)),
1075 std::time::Duration::ZERO,
1076 None,
1077 )
1078 .await;
1079 assert_eq!(
1080 run_suite_and_collect_logs(
1081 suite,
1082 &suite_reporter,
1083 diagnostics::LogDisplayConfiguration::default(),
1084 futures::future::pending()
1085 )
1086 .await
1087 .expect("collect results"),
1088 Outcome::Timedout
1089 );
1090 suite_reporter.finished().expect("Reporter finished");
1091
1092 let reports = reporter.get_reports();
1093 let case = reports
1094 .iter()
1095 .find(|report| report.id == EntityId::Case { suite: SuiteId(0), case: CaseId(0) })
1096 .unwrap();
1097 assert_eq!(case.report.name, "my_test_case");
1098 assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Timedout));
1099 assert!(case.report.is_finished);
1100 assert_eq!(case.report.artifacts.len(), 2);
1101 assert_eq!(
1102 case.report
1103 .artifacts
1104 .iter()
1105 .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1106 .collect::<HashMap<_, _>>(),
1107 hashmap! {
1108 output::ArtifactType::Stdout => vec![],
1109 output::ArtifactType::Stderr => vec![]
1110 }
1111 );
1112 assert!(case.report.directories.is_empty());
1113
1114 let suite =
1115 reports.iter().find(|report| report.id == EntityId::Suite(SuiteId(0))).unwrap();
1116 assert_eq!(suite.report.name, "test-url");
1117 assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Timedout));
1118 assert!(suite.report.is_finished);
1119 assert!(suite.report.artifacts.is_empty());
1120 assert!(suite.report.directories.is_empty());
1121 };
1122
1123 futures::future::join(serve_all_events_then_hang(stream, all_events), test_fut).await;
1124 }
1125}