1use crate::cancel::{Cancelled, NamedFutureExt, OrCancel};
6use crate::outcome::{Lifecycle, Outcome, RunTestSuiteError, UnexpectedEventError};
7use crate::output::{self, ArtifactType, CaseId, SuiteReporter, Timestamp};
8use crate::stream_util::StreamUtil;
9use crate::trace::duration;
10use crate::{artifacts, diagnostics};
11use diagnostics_data::Severity;
12use fidl_fuchsia_test_manager::{
13 self as ftest_manager, LaunchError, SuiteArtifactGeneratedEventDetails,
14 SuiteStoppedEventDetails, TestCaseArtifactGeneratedEventDetails, TestCaseFinishedEventDetails,
15 TestCaseFoundEventDetails, TestCaseStartedEventDetails, TestCaseStoppedEventDetails,
16};
17use fuchsia_async as fasync;
18use futures::future::Either;
19use futures::prelude::*;
20use futures::stream::FuturesUnordered;
21use futures::StreamExt;
22use log::{error, info, warn};
23use std::collections::HashMap;
24use std::io::Write;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::sync::Arc;
27
28struct CollectedEntityState<R> {
30 reporter: R,
31 name: String,
32 lifecycle: Lifecycle,
33 artifact_tasks:
34 Vec<fasync::Task<Result<Option<diagnostics::LogCollectionOutcome>, anyhow::Error>>>,
35}
36
37pub(crate) async fn run_suite_and_collect_logs<F: Future<Output = ()> + Unpin>(
43 running_suite: RunningSuite,
44 suite_reporter: &SuiteReporter<'_>,
45 log_display: diagnostics::LogDisplayConfiguration,
46 cancel_fut: F,
47) -> Result<Outcome, RunTestSuiteError> {
48 duration!(c"collect_suite");
49
50 let RunningSuite {
51 mut event_stream,
52 stopper,
53 timeout,
54 timeout_grace,
55 max_severity_logs,
56 no_cases_equals_success,
57 ..
58 } = running_suite;
59
60 let log_opts =
61 diagnostics::LogCollectionOptions { format: log_display, max_severity: max_severity_logs };
62
63 let mut test_cases: HashMap<u32, CollectedEntityState<_>> = HashMap::new();
64 let mut suite_state = CollectedEntityState {
65 reporter: suite_reporter,
66 name: "".to_string(),
67 lifecycle: Lifecycle::Found,
68 artifact_tasks: vec![],
69 };
70 let mut suite_finish_timestamp = Timestamp::Unknown;
71 let mut outcome = Outcome::Passed;
72
73 let collect_results_fut = async {
74 while let Some(event_result) = event_stream.next().named("next_event").await {
75 match event_result {
76 Err(e) => match (e, no_cases_equals_success) {
77 (RunTestSuiteError::Launch(LaunchError::NoMatchingCases), Some(true)) => {
78 suite_state.lifecycle = Lifecycle::Stopped;
79 }
80 (e, _) => {
81 suite_state
82 .reporter
83 .stopped(&output::ReportedOutcome::Error, Timestamp::Unknown)?;
84 return Err(e);
85 }
86 },
87 Ok(event) => {
88 let timestamp = Timestamp::from_nanos(event.timestamp);
89 match event.details.expect("event cannot be None") {
90 ftest_manager::EventDetails::TestCaseFound(TestCaseFoundEventDetails {
91 test_case_name: Some(test_case_name),
92 test_case_id: Some(test_case_id),
93 ..
94 }) => {
95 if test_cases.contains_key(&test_case_id) {
96 return Err(UnexpectedEventError::InvalidCaseEvent {
97 last_state: Lifecycle::Found,
98 next_state: Lifecycle::Found,
99 test_case_name,
100 test_case_id,
101 }
102 .into());
103 }
104 test_cases.insert(
105 test_case_id,
106 CollectedEntityState {
107 reporter: suite_reporter
108 .new_case(&test_case_name, &CaseId(test_case_id))?,
109 name: test_case_name,
110 lifecycle: Lifecycle::Found,
111 artifact_tasks: vec![],
112 },
113 );
114 }
115 ftest_manager::EventDetails::TestCaseStarted(
116 TestCaseStartedEventDetails {
117 test_case_id: Some(test_case_id), ..
118 },
119 ) => {
120 let entry = test_cases.get_mut(&test_case_id).ok_or(
121 UnexpectedEventError::CaseEventButNotFound {
122 next_state: Lifecycle::Started,
123 test_case_id,
124 },
125 )?;
126 match &entry.lifecycle {
127 Lifecycle::Found => {
128 entry.reporter.started(Timestamp::Unknown)?;
131 entry.lifecycle = Lifecycle::Started;
132 }
133 other => {
134 return Err(UnexpectedEventError::InvalidCaseEvent {
135 last_state: *other,
136 next_state: Lifecycle::Started,
137 test_case_name: entry.name.clone(),
138 test_case_id,
139 }
140 .into());
141 }
142 }
143 }
144 ftest_manager::EventDetails::TestCaseArtifactGenerated(
145 TestCaseArtifactGeneratedEventDetails {
146 test_case_id: Some(test_case_id),
147 artifact: Some(artifact),
148 ..
149 },
150 ) => {
151 let entry = test_cases.get_mut(&test_case_id).ok_or(
152 UnexpectedEventError::CaseArtifactButNotFound { test_case_id },
153 )?;
154 if matches!(entry.lifecycle, Lifecycle::Finished) {
155 return Err(UnexpectedEventError::CaseArtifactButFinished {
156 test_case_id,
157 }
158 .into());
159 }
160 let artifact_fut = artifacts::drain_artifact(
161 &entry.reporter,
162 artifact,
163 log_opts.clone(),
164 )
165 .await?;
166 entry.artifact_tasks.push(fasync::Task::spawn(artifact_fut));
167 }
168 ftest_manager::EventDetails::TestCaseStopped(
169 TestCaseStoppedEventDetails {
170 test_case_id: Some(test_case_id),
171 result: Some(result),
172 ..
173 },
174 ) => {
175 let entry = test_cases.get_mut(&test_case_id).ok_or(
176 UnexpectedEventError::CaseEventButNotFound {
177 next_state: Lifecycle::Stopped,
178 test_case_id,
179 },
180 )?;
181 match &entry.lifecycle {
182 Lifecycle::Started => {
183 entry.reporter.stopped(&result.into(), Timestamp::Unknown)?;
186 entry.lifecycle = Lifecycle::Stopped;
187 }
188 other => {
189 return Err(UnexpectedEventError::InvalidCaseEvent {
190 last_state: *other,
191 next_state: Lifecycle::Stopped,
192 test_case_name: entry.name.clone(),
193 test_case_id,
194 }
195 .into());
196 }
197 }
198 }
199 ftest_manager::EventDetails::TestCaseFinished(
200 TestCaseFinishedEventDetails {
201 test_case_id: Some(test_case_id), ..
202 },
203 ) => {
204 let entry = test_cases.get_mut(&test_case_id).ok_or(
205 UnexpectedEventError::CaseEventButNotFound {
206 next_state: Lifecycle::Finished,
207 test_case_id,
208 },
209 )?;
210 match &entry.lifecycle {
211 Lifecycle::Stopped => {
212 entry.lifecycle = Lifecycle::Finished;
215 }
216 other => {
217 return Err(UnexpectedEventError::InvalidCaseEvent {
218 last_state: *other,
219 next_state: Lifecycle::Finished,
220 test_case_name: entry.name.clone(),
221 test_case_id,
222 }
223 .into());
224 }
225 }
226 }
227 ftest_manager::EventDetails::SuiteArtifactGenerated(
228 SuiteArtifactGeneratedEventDetails { artifact: Some(artifact), .. },
229 ) => {
230 let artifact_fut = artifacts::drain_artifact(
231 suite_reporter,
232 artifact,
233 log_opts.clone(),
234 )
235 .await?;
236 suite_state.artifact_tasks.push(fasync::Task::spawn(artifact_fut));
237 }
238 ftest_manager::EventDetails::SuiteStarted(_) => {
239 match &suite_state.lifecycle {
240 Lifecycle::Found => {
241 suite_state.reporter.started(timestamp)?;
242 suite_state.lifecycle = Lifecycle::Started;
243 }
244 other => {
245 return Err(UnexpectedEventError::InvalidEvent {
246 last_state: *other,
247 next_state: Lifecycle::Started,
248 }
249 .into());
250 }
251 }
252 }
253 ftest_manager::EventDetails::SuiteStopped(SuiteStoppedEventDetails {
254 result: Some(result),
255 ..
256 }) => match &suite_state.lifecycle {
257 Lifecycle::Started => {
258 suite_state.lifecycle = Lifecycle::Stopped;
259 suite_finish_timestamp = timestamp;
260 outcome = match result {
261 ftest_manager::SuiteResult::Finished => Outcome::Passed,
262 ftest_manager::SuiteResult::Failed => Outcome::Failed,
263 ftest_manager::SuiteResult::DidNotFinish => {
264 Outcome::Inconclusive
265 }
266 ftest_manager::SuiteResult::TimedOut
267 | ftest_manager::SuiteResult::Stopped => Outcome::Timedout,
268 ftest_manager::SuiteResult::InternalError => Outcome::error(
269 UnexpectedEventError::InternalErrorSuiteResult,
270 ),
271 r => {
272 return Err(
273 UnexpectedEventError::UnrecognizedSuiteResult {
274 result: r,
275 }
276 .into(),
277 );
278 }
279 };
280 }
281 other => {
282 return Err(UnexpectedEventError::InvalidEvent {
283 last_state: *other,
284 next_state: Lifecycle::Stopped,
285 }
286 .into());
287 }
288 },
289 ftest_manager::EventDetailsUnknown!() => {
290 warn!("Encountered unrecognized suite event");
291 }
292 }
293 }
294 }
295 }
296 drop(event_stream); Ok(())
298 }
299 .boxed_local();
300
301 let start_time = std::time::Instant::now();
302 let (stop_timeout_future, kill_timeout_future) = match timeout {
303 None => {
304 (futures::future::pending::<()>().boxed(), futures::future::pending::<()>().boxed())
305 }
306 Some(duration) => (
307 fasync::Timer::new(start_time + duration).boxed(),
308 fasync::Timer::new(start_time + duration + timeout_grace).boxed(),
309 ),
310 };
311
312 let collect_or_stop_fut = async move {
314 match futures::future::select(stop_timeout_future, collect_results_fut).await {
315 Either::Left((_stop_done, collect_fut)) => {
316 stopper.stop();
317 collect_fut.await
318 }
319 Either::Right((result, _)) => result,
320 }
321 };
322
323 let kill_fut = async move {
327 match futures::future::select(cancel_fut, kill_timeout_future).await {
328 Either::Left(_) => Outcome::Cancelled,
329 Either::Right(_) => Outcome::Timedout,
330 }
331 }
332 .shared();
333
334 let early_termination_outcome =
335 match collect_or_stop_fut.boxed_local().or_cancelled(kill_fut.clone()).await {
336 Ok(Ok(())) => None,
337 Ok(Err(e)) => return Err(e),
338 Err(Cancelled(outcome)) => Some(outcome),
339 };
340
341 info!("Awaiting case artifacts");
343 let mut unfinished_test_case_names = vec![];
344 for (_, test_case) in test_cases.into_iter() {
345 let CollectedEntityState { reporter, name, lifecycle, artifact_tasks } = test_case;
346 match (lifecycle, early_termination_outcome.clone()) {
347 (Lifecycle::Started | Lifecycle::Found, Some(early)) => {
348 reporter.stopped(&early.into(), Timestamp::Unknown)?;
349 }
350 (Lifecycle::Found, None) => {
351 unfinished_test_case_names.push(name.clone());
352 reporter.stopped(&Outcome::Inconclusive.into(), Timestamp::Unknown)?;
353 }
354 (Lifecycle::Started, None) => {
355 unfinished_test_case_names.push(name.clone());
356 reporter.stopped(&Outcome::DidNotFinish.into(), Timestamp::Unknown)?;
357 }
358 (Lifecycle::Stopped | Lifecycle::Finished, _) => (),
359 }
360
361 let finish_artifacts_fut = FuturesUnordered::from_iter(artifact_tasks)
362 .map(|result| match result {
363 Err(e) => {
364 error!("Failed to collect artifact for {}: {:?}", name, e);
365 }
366 Ok(Some(_log_result)) => warn!("Unexpectedly got log results for a test case"),
367 Ok(None) => (),
368 })
369 .collect::<()>();
370 if let Err(Cancelled(_)) = finish_artifacts_fut.or_cancelled(kill_fut.clone()).await {
371 warn!("Stopped polling artifacts for {} due to timeout", name);
372 }
373
374 reporter.finished()?;
375 }
376 if !unfinished_test_case_names.is_empty() {
377 outcome = Outcome::error(UnexpectedEventError::CasesDidNotFinish {
378 cases: unfinished_test_case_names,
379 });
380 }
381
382 match (suite_state.lifecycle, early_termination_outcome) {
383 (Lifecycle::Found | Lifecycle::Started, Some(early)) => {
384 if matches!(&outcome, Outcome::Passed | Outcome::Failed) {
385 outcome = early;
386 }
387 }
388 (Lifecycle::Found | Lifecycle::Started, None) => {
389 outcome = Outcome::error(UnexpectedEventError::SuiteDidNotReportStop);
390 }
391 (Lifecycle::Stopped, _) => (),
393 (Lifecycle::Finished, _) => unreachable!(),
395 }
396
397 let restricted_logs_present = AtomicBool::new(false);
398 let finish_artifacts_fut = FuturesUnordered::from_iter(suite_state.artifact_tasks)
399 .then(|result| async {
400 match result {
401 Err(e) => {
402 error!("Failed to collect artifact for suite: {:?}", e);
403 }
404 Ok(Some(log_result)) => match log_result {
405 diagnostics::LogCollectionOutcome::Error { restricted_logs } => {
406 restricted_logs_present.store(true, Ordering::Relaxed);
407 let mut log_artifact = match suite_reporter
408 .new_artifact(&ArtifactType::RestrictedLog)
409 {
410 Ok(artifact) => artifact,
411 Err(e) => {
412 warn!("Error creating artifact to report restricted logs: {:?}", e);
413 return;
414 }
415 };
416 for log in restricted_logs.iter() {
417 if let Err(e) = writeln!(log_artifact, "{}", log) {
418 warn!("Error recording restricted logs: {:?}", e);
419 return;
420 }
421 }
422 }
423 diagnostics::LogCollectionOutcome::Passed => (),
424 },
425 Ok(None) => (),
426 }
427 })
428 .collect::<()>();
429 if let Err(Cancelled(_)) = finish_artifacts_fut.or_cancelled(kill_fut).await {
430 warn!("Stopped polling artifacts due to timeout");
431 }
432 if restricted_logs_present.into_inner() && matches!(outcome, Outcome::Passed) {
433 outcome = Outcome::Failed;
434 }
435
436 suite_reporter.stopped(&outcome.clone().into(), suite_finish_timestamp)?;
437
438 Ok(outcome)
439}
440
441type EventStream =
442 std::pin::Pin<Box<dyn Stream<Item = Result<ftest_manager::Event, RunTestSuiteError>> + Send>>;
443
444pub(crate) struct RunningSuite {
447 event_stream: EventStream,
448 stopper: RunningSuiteStopper,
449 max_severity_logs: Option<Severity>,
450 timeout: Option<std::time::Duration>,
451 timeout_grace: std::time::Duration,
452 no_cases_equals_success: Option<bool>,
453}
454
455struct RunningSuiteStopper(Arc<ftest_manager::SuiteControllerProxy>);
456
457impl RunningSuiteStopper {
458 fn stop(self) {
459 let _ = self.0.stop();
460 }
461}
462
463pub(crate) struct WaitForStartArgs {
464 pub(crate) proxy: ftest_manager::SuiteControllerProxy,
465 pub(crate) max_severity_logs: Option<Severity>,
466 pub(crate) timeout: Option<std::time::Duration>,
467 pub(crate) timeout_grace: std::time::Duration,
468 pub(crate) max_pipelined: Option<usize>,
469 pub(crate) no_cases_equals_success: Option<bool>,
470}
471
472impl RunningSuite {
473 const DEFAULT_PIPELINED_REQUESTS: usize = 8;
477 pub(crate) async fn wait_for_start(args: WaitForStartArgs) -> Self {
478 let WaitForStartArgs {
479 proxy,
480 max_severity_logs,
481 timeout,
482 timeout_grace,
483 max_pipelined,
484 no_cases_equals_success,
485 } = args;
486
487 let proxy = Arc::new(proxy);
488 let proxy_clone = proxy.clone();
489 let unprocessed_event_stream = futures::stream::repeat_with(move || {
491 proxy.watch_events().inspect(|events_result| match events_result {
492 Ok(Ok(ref events)) => info!("Latest suite event: {:?}", events.last()),
493 _ => (),
494 })
495 })
496 .buffered(max_pipelined.unwrap_or(Self::DEFAULT_PIPELINED_REQUESTS));
497 let terminated_event_stream =
499 unprocessed_event_stream.take_until_stop_after(|result| match &result {
500 Ok(Ok(events)) => events.is_empty(),
501 Err(_) | Ok(Err(_)) => true,
502 });
503 let mut event_stream = terminated_event_stream
505 .map(Self::convert_to_result_vec)
506 .map(futures::stream::iter)
507 .flatten()
508 .peekable();
509 std::pin::Pin::new(&mut event_stream).peek().await;
511
512 Self {
513 event_stream: event_stream.boxed(),
514 stopper: RunningSuiteStopper(proxy_clone),
515 timeout,
516 timeout_grace,
517 max_severity_logs,
518 no_cases_equals_success,
519 }
520 }
521
522 fn convert_to_result_vec(
523 vec: Result<Result<Vec<ftest_manager::Event>, ftest_manager::LaunchError>, fidl::Error>,
524 ) -> Vec<Result<ftest_manager::Event, RunTestSuiteError>> {
525 match vec {
526 Ok(Ok(events)) => events.into_iter().map(Ok).collect(),
527 Ok(Err(e)) => vec![Err(e.into())],
528 Err(e) => vec![Err(e.into())],
529 }
530 }
531}
532
533#[cfg(test)]
534mod test {
535 use super::*;
536 use crate::output::EntityId;
537 use assert_matches::assert_matches;
538 use fidl::endpoints::create_proxy_and_stream;
539 use maplit::hashmap;
540
541 async fn respond_to_watch_events(
542 request_stream: &mut ftest_manager::SuiteControllerRequestStream,
543 events: Vec<ftest_manager::Event>,
544 ) {
545 let request = request_stream
546 .next()
547 .await
548 .expect("did not get next request")
549 .expect("error getting next request");
550 let responder = match request {
551 ftest_manager::SuiteControllerRequest::WatchEvents { responder } => responder,
552 r => panic!("Expected WatchEvents request but got {:?}", r),
553 };
554
555 responder.send(Ok(events)).expect("send events");
556 }
557
558 async fn serve_all_events(
560 mut request_stream: ftest_manager::SuiteControllerRequestStream,
561 events: Vec<ftest_manager::Event>,
562 ) {
563 const BATCH_SIZE: usize = 5;
564 let mut event_iter = events.into_iter();
565 while event_iter.len() > 0 {
566 respond_to_watch_events(
567 &mut request_stream,
568 event_iter.by_ref().take(BATCH_SIZE).collect(),
569 )
570 .await;
571 }
572 respond_to_watch_events(&mut request_stream, vec![]).await;
573 }
574
575 async fn serve_all_events_then_hang(
577 mut request_stream: ftest_manager::SuiteControllerRequestStream,
578 events: Vec<ftest_manager::Event>,
579 ) {
580 const BATCH_SIZE: usize = 5;
581 let mut event_iter = events.into_iter();
582 while event_iter.len() > 0 {
583 respond_to_watch_events(
584 &mut request_stream,
585 event_iter.by_ref().take(BATCH_SIZE).collect(),
586 )
587 .await;
588 }
589 let _requests = request_stream.collect::<Vec<_>>().await;
590 }
591
592 fn create_empty_event(timestamp: i64) -> ftest_manager::Event {
596 ftest_manager::Event { timestamp: Some(timestamp), ..Default::default() }
597 }
598
599 macro_rules! assert_empty_events_eq {
600 ($t1:expr, $t2:expr) => {
601 assert_eq!($t1.timestamp, $t2.timestamp, "Got incorrect event.")
602 };
603 }
604
605 #[fuchsia::test]
606 async fn running_events_simple() {
607 let (suite_proxy, mut suite_request_stream) =
608 create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
609 let suite_server_task = fasync::Task::spawn(async move {
610 respond_to_watch_events(&mut suite_request_stream, vec![create_empty_event(0)]).await;
611 respond_to_watch_events(&mut suite_request_stream, vec![]).await;
612 drop(suite_request_stream);
613 });
614
615 let mut running_suite = RunningSuite::wait_for_start(WaitForStartArgs {
616 proxy: suite_proxy,
617 max_severity_logs: None,
618 timeout: None,
619 timeout_grace: std::time::Duration::ZERO,
620 max_pipelined: None,
621 no_cases_equals_success: None,
622 })
623 .await;
624 assert_empty_events_eq!(
625 running_suite.event_stream.next().await.unwrap().unwrap(),
626 create_empty_event(0)
627 );
628 assert!(running_suite.event_stream.next().await.is_none());
629 assert!(running_suite.event_stream.next().await.is_none());
631 suite_server_task.await;
632 }
633
634 #[fuchsia::test]
635 async fn running_events_multiple_events() {
636 let (suite_proxy, mut suite_request_stream) =
637 create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
638 let suite_server_task = fasync::Task::spawn(async move {
639 respond_to_watch_events(
640 &mut suite_request_stream,
641 vec![create_empty_event(0), create_empty_event(1)],
642 )
643 .await;
644 respond_to_watch_events(
645 &mut suite_request_stream,
646 vec![create_empty_event(2), create_empty_event(3)],
647 )
648 .await;
649 respond_to_watch_events(&mut suite_request_stream, vec![]).await;
650 drop(suite_request_stream);
651 });
652
653 let mut running_suite = RunningSuite::wait_for_start(WaitForStartArgs {
654 proxy: suite_proxy,
655 max_severity_logs: None,
656 timeout: None,
657 timeout_grace: std::time::Duration::ZERO,
658 max_pipelined: None,
659 no_cases_equals_success: None,
660 })
661 .await;
662
663 for num in 0..4 {
664 assert_empty_events_eq!(
665 running_suite.event_stream.next().await.unwrap().unwrap(),
666 create_empty_event(num)
667 );
668 }
669 assert!(running_suite.event_stream.next().await.is_none());
670 suite_server_task.await;
671 }
672
673 #[fuchsia::test]
674 async fn running_events_peer_closed() {
675 let (suite_proxy, mut suite_request_stream) =
676 create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
677 let suite_server_task = fasync::Task::spawn(async move {
678 respond_to_watch_events(&mut suite_request_stream, vec![create_empty_event(1)]).await;
679 drop(suite_request_stream);
680 });
681
682 let mut running_suite = RunningSuite::wait_for_start(WaitForStartArgs {
683 proxy: suite_proxy,
684 max_severity_logs: None,
685 timeout: None,
686 timeout_grace: std::time::Duration::ZERO,
687 max_pipelined: None,
688 no_cases_equals_success: None,
689 })
690 .await;
691 assert_empty_events_eq!(
692 running_suite.event_stream.next().await.unwrap().unwrap(),
693 create_empty_event(1)
694 );
695 assert_matches!(
696 running_suite.event_stream.next().await,
697 Some(Err(RunTestSuiteError::Fidl(fidl::Error::ClientChannelClosed { .. })))
698 );
699 suite_server_task.await;
700 }
701
702 fn event_from_details(
703 timestamp: i64,
704 details: ftest_manager::EventDetails,
705 ) -> ftest_manager::Event {
706 let mut event = create_empty_event(timestamp);
707 event.details = Some(details);
708 event
709 }
710
711 fn case_found_event(timestamp: i64, test_case_id: u32, name: &str) -> ftest_manager::Event {
712 event_from_details(
713 timestamp,
714 ftest_manager::EventDetails::TestCaseFound(ftest_manager::TestCaseFoundEventDetails {
715 test_case_name: Some(name.into()),
716 test_case_id: Some(test_case_id),
717 ..Default::default()
718 }),
719 )
720 }
721
722 fn case_started_event(timestamp: i64, test_case_id: u32) -> ftest_manager::Event {
723 event_from_details(
724 timestamp,
725 ftest_manager::EventDetails::TestCaseStarted(
726 ftest_manager::TestCaseStartedEventDetails {
727 test_case_id: Some(test_case_id),
728 ..Default::default()
729 },
730 ),
731 )
732 }
733
734 fn case_stopped_event(
735 timestamp: i64,
736 test_case_id: u32,
737 result: ftest_manager::TestCaseResult,
738 ) -> ftest_manager::Event {
739 event_from_details(
740 timestamp,
741 ftest_manager::EventDetails::TestCaseStopped(
742 ftest_manager::TestCaseStoppedEventDetails {
743 test_case_id: Some(test_case_id),
744 result: Some(result),
745 ..Default::default()
746 },
747 ),
748 )
749 }
750
751 fn case_finished_event(timestamp: i64, test_case_id: u32) -> ftest_manager::Event {
752 event_from_details(
753 timestamp,
754 ftest_manager::EventDetails::TestCaseFinished(
755 ftest_manager::TestCaseFinishedEventDetails {
756 test_case_id: Some(test_case_id),
757 ..Default::default()
758 },
759 ),
760 )
761 }
762
763 fn case_stdout_event(
764 timestamp: i64,
765 test_case_id: u32,
766 stdout: fidl::Socket,
767 ) -> ftest_manager::Event {
768 event_from_details(
769 timestamp,
770 ftest_manager::EventDetails::TestCaseArtifactGenerated(
771 ftest_manager::TestCaseArtifactGeneratedEventDetails {
772 test_case_id: Some(test_case_id),
773 artifact: Some(ftest_manager::Artifact::Stdout(stdout)),
774 ..Default::default()
775 },
776 ),
777 )
778 }
779
780 fn case_stderr_event(
781 timestamp: i64,
782 test_case_id: u32,
783 stderr: fidl::Socket,
784 ) -> ftest_manager::Event {
785 event_from_details(
786 timestamp,
787 ftest_manager::EventDetails::TestCaseArtifactGenerated(
788 ftest_manager::TestCaseArtifactGeneratedEventDetails {
789 test_case_id: Some(test_case_id),
790 artifact: Some(ftest_manager::Artifact::Stderr(stderr)),
791 ..Default::default()
792 },
793 ),
794 )
795 }
796
797 fn suite_started_event(timestamp: i64) -> ftest_manager::Event {
798 event_from_details(
799 timestamp,
800 ftest_manager::EventDetails::SuiteStarted(ftest_manager::SuiteStartedEventDetails {
801 ..Default::default()
802 }),
803 )
804 }
805
806 fn suite_stopped_event(
807 timestamp: i64,
808 result: ftest_manager::SuiteResult,
809 ) -> ftest_manager::Event {
810 event_from_details(
811 timestamp,
812 ftest_manager::EventDetails::SuiteStopped(ftest_manager::SuiteStoppedEventDetails {
813 result: Some(result),
814 ..Default::default()
815 }),
816 )
817 }
818
819 #[fuchsia::test]
820 async fn collect_events_simple() {
821 let all_events = vec![
822 suite_started_event(0),
823 case_found_event(100, 0, "my_test_case"),
824 case_started_event(200, 0),
825 case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
826 case_finished_event(400, 0),
827 suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
828 ];
829
830 let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
831 let test_fut = async move {
832 let reporter = output::InMemoryReporter::new();
833 let run_reporter = output::RunReporter::new(reporter.clone());
834 let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
835
836 let suite = RunningSuite::wait_for_start(WaitForStartArgs {
837 proxy,
838 max_severity_logs: None,
839 timeout: None,
840 timeout_grace: std::time::Duration::ZERO,
841 max_pipelined: None,
842 no_cases_equals_success: None,
843 })
844 .await;
845 assert_eq!(
846 run_suite_and_collect_logs(
847 suite,
848 &suite_reporter,
849 diagnostics::LogDisplayConfiguration::default(),
850 futures::future::pending()
851 )
852 .await
853 .expect("collect results"),
854 Outcome::Passed
855 );
856 suite_reporter.finished().expect("Reporter finished");
857
858 let reports = reporter.get_reports();
859 let case = reports
860 .iter()
861 .find(|report| report.id == EntityId::Case { case: CaseId(0) })
862 .unwrap();
863 assert_eq!(case.report.name, "my_test_case");
864 assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
865 assert!(case.report.is_finished);
866 assert!(case.report.artifacts.is_empty());
867 assert!(case.report.directories.is_empty());
868 let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
869 assert_eq!(suite.report.name, "test-url");
870 assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
871 assert!(suite.report.is_finished);
872 assert!(suite.report.artifacts.is_empty());
873 assert!(suite.report.directories.is_empty());
874 };
875
876 futures::future::join(serve_all_events(stream, all_events), test_fut).await;
877 }
878
879 #[fuchsia::test]
880 async fn collect_events_with_case_artifacts() {
881 const STDOUT_CONTENT: &str = "stdout from my_test_case";
882 const STDERR_CONTENT: &str = "stderr from my_test_case";
883
884 let (stdout_write, stdout_read) = fidl::Socket::create_stream();
885 let (stderr_write, stderr_read) = fidl::Socket::create_stream();
886 let all_events = vec![
887 suite_started_event(0),
888 case_found_event(100, 0, "my_test_case"),
889 case_started_event(200, 0),
890 case_stdout_event(300, 0, stdout_read),
891 case_stderr_event(300, 0, stderr_read),
892 case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
893 case_finished_event(400, 0),
894 suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
895 ];
896
897 let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
898 let stdio_write_fut = async move {
899 let mut async_stdout = fasync::Socket::from_socket(stdout_write);
900 async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
901 let mut async_stderr = fasync::Socket::from_socket(stderr_write);
902 async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
903 };
904 let test_fut = async move {
905 let reporter = output::InMemoryReporter::new();
906 let run_reporter = output::RunReporter::new(reporter.clone());
907 let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
908
909 let suite = RunningSuite::wait_for_start(WaitForStartArgs {
910 proxy,
911 max_severity_logs: None,
912 timeout: None,
913 timeout_grace: std::time::Duration::ZERO,
914 max_pipelined: None,
915 no_cases_equals_success: None,
916 })
917 .await;
918 assert_eq!(
919 run_suite_and_collect_logs(
920 suite,
921 &suite_reporter,
922 diagnostics::LogDisplayConfiguration::default(),
923 futures::future::pending()
924 )
925 .await
926 .expect("collect results"),
927 Outcome::Passed
928 );
929 suite_reporter.finished().expect("Reporter finished");
930
931 let reports = reporter.get_reports();
932 let case = reports
933 .iter()
934 .find(|report| report.id == EntityId::Case { case: CaseId(0) })
935 .unwrap();
936 assert_eq!(case.report.name, "my_test_case");
937 assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
938 assert!(case.report.is_finished);
939 assert_eq!(case.report.artifacts.len(), 2);
940 assert_eq!(
941 case.report
942 .artifacts
943 .iter()
944 .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
945 .collect::<HashMap<_, _>>(),
946 hashmap! {
947 output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
948 output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
949 }
950 );
951 assert!(case.report.directories.is_empty());
952
953 let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
954 assert_eq!(suite.report.name, "test-url");
955 assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
956 assert!(suite.report.is_finished);
957 assert!(suite.report.artifacts.is_empty());
958 assert!(suite.report.directories.is_empty());
959 };
960
961 futures::future::join3(serve_all_events(stream, all_events), stdio_write_fut, test_fut)
962 .await;
963 }
964
965 #[fuchsia::test]
966 async fn collect_events_case_artifacts_complete_after_suite() {
967 const STDOUT_CONTENT: &str = "stdout from my_test_case";
968 const STDERR_CONTENT: &str = "stderr from my_test_case";
969
970 let (stdout_write, stdout_read) = fidl::Socket::create_stream();
971 let (stderr_write, stderr_read) = fidl::Socket::create_stream();
972 let all_events = vec![
973 suite_started_event(0),
974 case_found_event(100, 0, "my_test_case"),
975 case_started_event(200, 0),
976 case_stdout_event(300, 0, stdout_read),
977 case_stderr_event(300, 0, stderr_read),
978 case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
979 case_finished_event(400, 0),
980 suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
981 ];
982
983 let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
984 let serve_fut = async move {
985 serve_all_events(stream, all_events).await;
987 let mut async_stdout = fasync::Socket::from_socket(stdout_write);
988 async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
989 let mut async_stderr = fasync::Socket::from_socket(stderr_write);
990 async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
991 };
992 let test_fut = async move {
993 let reporter = output::InMemoryReporter::new();
994 let run_reporter = output::RunReporter::new(reporter.clone());
995 let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
996
997 let suite = RunningSuite::wait_for_start(WaitForStartArgs {
998 proxy,
999 max_severity_logs: None,
1000 timeout: None,
1001 timeout_grace: std::time::Duration::ZERO,
1002 max_pipelined: Some(1),
1003 no_cases_equals_success: None,
1004 })
1005 .await;
1006 assert_eq!(
1007 run_suite_and_collect_logs(
1008 suite,
1009 &suite_reporter,
1010 diagnostics::LogDisplayConfiguration::default(),
1011 futures::future::pending()
1012 )
1013 .await
1014 .expect("collect results"),
1015 Outcome::Passed
1016 );
1017 suite_reporter.finished().expect("Reporter finished");
1018
1019 let reports = reporter.get_reports();
1020 let case = reports
1021 .iter()
1022 .find(|report| report.id == EntityId::Case { case: CaseId(0) })
1023 .unwrap();
1024 assert_eq!(case.report.name, "my_test_case");
1025 assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
1026 assert!(case.report.is_finished);
1027 assert_eq!(case.report.artifacts.len(), 2);
1028 assert_eq!(
1029 case.report
1030 .artifacts
1031 .iter()
1032 .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1033 .collect::<HashMap<_, _>>(),
1034 hashmap! {
1035 output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
1036 output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
1037 }
1038 );
1039 assert!(case.report.directories.is_empty());
1040
1041 let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
1042 assert_eq!(suite.report.name, "test-url");
1043 assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
1044 assert!(suite.report.is_finished);
1045 assert!(suite.report.artifacts.is_empty());
1046 assert!(suite.report.directories.is_empty());
1047 };
1048
1049 futures::future::join(serve_fut, test_fut).await;
1050 }
1051
1052 #[fuchsia::test]
1053 async fn collect_events_with_case_artifacts_sent_after_case_stopped() {
1054 const STDOUT_CONTENT: &str = "stdout from my_test_case";
1055 const STDERR_CONTENT: &str = "stderr from my_test_case";
1056
1057 let (stdout_write, stdout_read) = fidl::Socket::create_stream();
1058 let (stderr_write, stderr_read) = fidl::Socket::create_stream();
1059 let all_events = vec![
1060 suite_started_event(0),
1061 case_found_event(100, 0, "my_test_case"),
1062 case_started_event(200, 0),
1063 case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
1064 case_stdout_event(300, 0, stdout_read),
1065 case_stderr_event(300, 0, stderr_read),
1066 case_finished_event(400, 0),
1067 suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
1068 ];
1069
1070 let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
1071 let stdio_write_fut = async move {
1072 let mut async_stdout = fasync::Socket::from_socket(stdout_write);
1073 async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
1074 let mut async_stderr = fasync::Socket::from_socket(stderr_write);
1075 async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
1076 };
1077 let test_fut = async move {
1078 let reporter = output::InMemoryReporter::new();
1079 let run_reporter = output::RunReporter::new(reporter.clone());
1080 let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
1081
1082 let suite = RunningSuite::wait_for_start(WaitForStartArgs {
1083 proxy,
1084 max_severity_logs: None,
1085 timeout: None,
1086 timeout_grace: std::time::Duration::ZERO,
1087 max_pipelined: None,
1088 no_cases_equals_success: None,
1089 })
1090 .await;
1091 assert_eq!(
1092 run_suite_and_collect_logs(
1093 suite,
1094 &suite_reporter,
1095 diagnostics::LogDisplayConfiguration::default(),
1096 futures::future::pending()
1097 )
1098 .await
1099 .expect("collect results"),
1100 Outcome::Passed
1101 );
1102 suite_reporter.finished().expect("Reporter finished");
1103
1104 let reports = reporter.get_reports();
1105 let case = reports
1106 .iter()
1107 .find(|report| report.id == EntityId::Case { case: CaseId(0) })
1108 .unwrap();
1109 assert_eq!(case.report.name, "my_test_case");
1110 assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
1111 assert!(case.report.is_finished);
1112 assert_eq!(case.report.artifacts.len(), 2);
1113 assert_eq!(
1114 case.report
1115 .artifacts
1116 .iter()
1117 .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1118 .collect::<HashMap<_, _>>(),
1119 hashmap! {
1120 output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
1121 output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
1122 }
1123 );
1124 assert!(case.report.directories.is_empty());
1125
1126 let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
1127 assert_eq!(suite.report.name, "test-url");
1128 assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
1129 assert!(suite.report.is_finished);
1130 assert!(suite.report.artifacts.is_empty());
1131 assert!(suite.report.directories.is_empty());
1132 };
1133
1134 futures::future::join3(serve_all_events(stream, all_events), stdio_write_fut, test_fut)
1135 .await;
1136 }
1137
1138 #[fuchsia::test]
1139 async fn collect_events_timed_out_case_with_hanging_artifacts() {
1140 let (_stdout_write, stdout_read) = fidl::Socket::create_stream();
1142 let (_stderr_write, stderr_read) = fidl::Socket::create_stream();
1143 let all_events = vec![
1144 suite_started_event(0),
1145 case_found_event(100, 0, "my_test_case"),
1146 case_started_event(200, 0),
1147 case_stdout_event(300, 0, stdout_read),
1148 case_stderr_event(300, 0, stderr_read),
1149 ];
1150
1151 let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
1152 let test_fut = async move {
1153 let reporter = output::InMemoryReporter::new();
1154 let run_reporter = output::RunReporter::new(reporter.clone());
1155 let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
1156
1157 let suite = RunningSuite::wait_for_start(WaitForStartArgs {
1158 proxy,
1159 max_severity_logs: None,
1160 timeout: Some(std::time::Duration::from_secs(2)),
1161 timeout_grace: std::time::Duration::ZERO,
1162 max_pipelined: None,
1163 no_cases_equals_success: None,
1164 })
1165 .await;
1166 assert_eq!(
1167 run_suite_and_collect_logs(
1168 suite,
1169 &suite_reporter,
1170 diagnostics::LogDisplayConfiguration::default(),
1171 futures::future::pending()
1172 )
1173 .await
1174 .expect("collect results"),
1175 Outcome::Timedout
1176 );
1177 suite_reporter.finished().expect("Reporter finished");
1178
1179 let reports = reporter.get_reports();
1180 let case = reports
1181 .iter()
1182 .find(|report| report.id == EntityId::Case { case: CaseId(0) })
1183 .unwrap();
1184 assert_eq!(case.report.name, "my_test_case");
1185 assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Timedout));
1186 assert!(case.report.is_finished);
1187 assert_eq!(case.report.artifacts.len(), 2);
1188 assert_eq!(
1189 case.report
1190 .artifacts
1191 .iter()
1192 .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1193 .collect::<HashMap<_, _>>(),
1194 hashmap! {
1195 output::ArtifactType::Stdout => vec![],
1196 output::ArtifactType::Stderr => vec![]
1197 }
1198 );
1199 assert!(case.report.directories.is_empty());
1200
1201 let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
1202 assert_eq!(suite.report.name, "test-url");
1203 assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Timedout));
1204 assert!(suite.report.is_finished);
1205 assert!(suite.report.artifacts.is_empty());
1206 assert!(suite.report.directories.is_empty());
1207 };
1208
1209 futures::future::join(serve_all_events_then_hang(stream, all_events), test_fut).await;
1210 }
1211}