1use anyhow::{bail, Context as _, Error};
8use fidl_fuchsia_test_manager::{
9 self as ftest_manager, SuiteControllerProxy, SuiteEvent as FidlSuiteEvent,
10 SuiteEventPayload as FidlSuiteEventPayload, SuiteEventPayloadUnknown,
11};
12use futures::channel::mpsc;
13use futures::prelude::*;
14use linked_hash_map::LinkedHashMap;
15use log::*;
16use moniker::ExtendedMoniker;
17use std::collections::HashMap;
18use std::sync::Arc;
19use test_diagnostics::zstd_compress::Decoder;
20use test_diagnostics::{collect_and_send_string_output, collect_string_from_socket, LogStream};
21use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
22
23pub fn default_run_option() -> ftest_manager::RunOptions {
24 ftest_manager::RunOptions {
25 parallel: None,
26 arguments: None,
27 run_disabled_tests: Some(false),
28 timeout: None,
29 case_filters_to_run: None,
30 log_iterator: None,
31 ..Default::default()
32 }
33}
34
35pub fn default_run_suite_options() -> ftest_manager::RunSuiteOptions {
36 ftest_manager::RunSuiteOptions { run_disabled_tests: Some(false), ..Default::default() }
37}
38
39#[derive(Debug, Eq, PartialEq)]
40pub struct AttributedLog {
41 pub log: String,
42 pub moniker: ExtendedMoniker,
43}
44
45pub async fn collect_suite_events(
46 suite_instance: SuiteRunInstance,
47) -> Result<(Vec<RunEvent>, Vec<AttributedLog>), Error> {
48 let (sender, mut recv) = mpsc::channel(1);
49 let execution_task =
50 fasync::Task::spawn(async move { suite_instance.collect_events(sender).await });
51 let mut events = vec![];
52 let mut log_tasks = vec![];
53 while let Some(event) = recv.next().await {
54 match event.payload {
55 SuiteEventPayload::RunEvent(RunEvent::CaseStdout { name, mut stdout_message }) => {
56 if stdout_message.ends_with("\n") {
57 stdout_message.truncate(stdout_message.len() - 1)
58 }
59 let logs = stdout_message.split("\n");
60 for log in logs {
61 if log.contains("Note: Randomizing tests' orders with a seed of") {
64 continue;
65 }
66 events.push(RunEvent::case_stdout(name.clone(), log.to_string()));
67 }
68 }
69 SuiteEventPayload::RunEvent(RunEvent::CaseStderr { name, mut stderr_message }) => {
70 if stderr_message.ends_with("\n") {
71 stderr_message.truncate(stderr_message.len() - 1)
72 }
73 let logs = stderr_message.split("\n");
74 for log in logs {
75 events.push(RunEvent::case_stderr(name.clone(), log.to_string()));
76 }
77 }
78 SuiteEventPayload::RunEvent(e) => events.push(e),
79 SuiteEventPayload::SuiteLog { log_stream } => {
80 let t = fasync::Task::spawn(log_stream.collect::<Vec<_>>());
81 log_tasks.push(t);
82 }
83 SuiteEventPayload::TestCaseLog { .. } => {
84 panic!("not supported yet!")
85 }
86 SuiteEventPayload::DebugData { .. } => {
87 panic!("not supported yet!")
88 }
89 }
90 }
91 execution_task.await.context("test execution failed")?;
92
93 let mut collected_logs = vec![];
94 for t in log_tasks {
95 let logs = t.await;
96 for log_result in logs {
97 let log = log_result?;
98 collected_logs
99 .push(AttributedLog { log: log.msg().unwrap().to_string(), moniker: log.moniker });
100 }
101 }
102
103 Ok((events, collected_logs))
104}
105
106pub async fn collect_suite_events_with_watch(
107 suite_instance: SuiteRunInstance,
108 filter_debug_data: bool,
109 compressed_debug_data: bool,
110) -> Result<(Vec<RunEvent>, Vec<AttributedLog>), Error> {
111 let (sender, mut recv) = mpsc::channel(1);
112 let execution_task = fasync::Task::spawn(async move {
113 suite_instance
114 .collect_events_with_watch(sender, filter_debug_data, compressed_debug_data)
115 .await
116 });
117 let mut events = vec![];
118 let mut log_tasks = vec![];
119 while let Some(event) = recv.next().await {
120 match event.payload {
121 SuiteEventPayload::RunEvent(RunEvent::CaseStdout { name, mut stdout_message }) => {
122 if stdout_message.ends_with("\n") {
123 stdout_message.truncate(stdout_message.len() - 1)
124 }
125 let logs = stdout_message.split("\n");
126 for log in logs {
127 if log.contains("Note: Randomizing tests' orders with a seed of") {
130 continue;
131 }
132 events.push(RunEvent::case_stdout(name.clone(), log.to_string()));
133 }
134 }
135 SuiteEventPayload::RunEvent(RunEvent::CaseStderr { name, mut stderr_message }) => {
136 if stderr_message.ends_with("\n") {
137 stderr_message.truncate(stderr_message.len() - 1)
138 }
139 let logs = stderr_message.split("\n");
140 for log in logs {
141 events.push(RunEvent::case_stderr(name.clone(), log.to_string()));
142 }
143 }
144 SuiteEventPayload::RunEvent(e) => events.push(e),
145 SuiteEventPayload::SuiteLog { log_stream } => {
146 let t = fasync::Task::spawn(log_stream.collect::<Vec<_>>());
147 log_tasks.push(t);
148 }
149 SuiteEventPayload::TestCaseLog { .. } => {
150 panic!("not supported yet!")
151 }
152 SuiteEventPayload::DebugData { filename, socket } => {
153 events.push(RunEvent::DebugData { filename, socket })
154 }
155 }
156 }
157 execution_task.await.context("test execution failed")?;
158
159 let mut collected_logs = vec![];
160 for t in log_tasks {
161 let logs = t.await;
162 for log_result in logs {
163 let log = log_result?;
164 collected_logs
165 .push(AttributedLog { log: log.msg().unwrap().to_string(), moniker: log.moniker });
166 }
167 }
168
169 Ok((events, collected_logs))
170}
171
172pub async fn collect_string_from_socket_helper(
174 socket: fidl::Socket,
175 compressed_debug_data: bool,
176) -> Result<String, anyhow::Error> {
177 if !compressed_debug_data {
178 return collect_string_from_socket(socket).await;
179 }
180 let mut async_socket = fidl::AsyncSocket::from_socket(socket);
181 let mut buf = vec![0u8; 1024 * 32];
182
183 let (mut decoder, mut receiver) = Decoder::new();
184 let task: fasync::Task<Result<(), anyhow::Error>> = fasync::Task::spawn(async move {
185 loop {
186 let l = async_socket.read(&mut buf).await?;
187 match l {
188 0 => {
189 decoder.finish().await?;
190 break;
191 }
192 _ => {
193 decoder.decompress(&buf[..l]).await?;
194 }
195 }
196 }
197 Ok(())
198 });
199
200 let mut decompressed_data = Vec::new();
201 while let Some(chunk) = receiver.next().await {
202 decompressed_data.extend_from_slice(&chunk);
203 }
204 task.await?;
205 Ok(String::from_utf8_lossy(decompressed_data.as_slice()).into())
206}
207pub struct SuiteRunner {
209 proxy: ftest_manager::SuiteRunnerProxy,
210}
211
212impl SuiteRunner {
213 pub fn new(proxy: ftest_manager::SuiteRunnerProxy) -> Self {
215 Self { proxy }
216 }
217
218 pub fn take_proxy(self) -> ftest_manager::SuiteRunnerProxy {
219 self.proxy
220 }
221
222 pub fn start_suite_run(
224 &self,
225 test_url: &str,
226 options: ftest_manager::RunSuiteOptions,
227 ) -> Result<SuiteRunInstance, Error> {
228 let (controller_proxy, controller) = fidl::endpoints::create_proxy();
229 self.proxy.run(test_url, options, controller).context("Error starting tests")?;
230
231 return Ok(SuiteRunInstance { controller_proxy: controller_proxy.into() });
232 }
233}
234
235pub struct TestBuilder {
237 proxy: ftest_manager::RunBuilderProxy,
238 filter_debug_data: bool,
239}
240
241impl TestBuilder {
242 pub fn new(proxy: ftest_manager::RunBuilderProxy) -> Self {
244 Self { proxy, filter_debug_data: false }
245 }
246
247 pub fn filter_debug_data(self) -> Self {
252 let Self { proxy, .. } = self;
253 Self { proxy, filter_debug_data: true }
254 }
255
256 pub fn take_proxy(self) -> ftest_manager::RunBuilderProxy {
257 self.proxy
258 }
259
260 pub fn set_scheduling_options(&self, accumulate_debug_data: bool) -> Result<(), Error> {
261 self.proxy
262 .with_scheduling_options(&ftest_manager::SchedulingOptions {
263 accumulate_debug_data: Some(accumulate_debug_data),
264 ..Default::default()
265 })
266 .map_err(Error::from)
267 }
268
269 pub async fn add_suite(
271 &self,
272 test_url: &str,
273 run_options: ftest_manager::RunOptions,
274 ) -> Result<SuiteRunInstance, Error> {
275 let (controller_proxy, controller) = fidl::endpoints::create_proxy();
276 self.proxy.add_suite(test_url, &run_options, controller)?;
277 Ok(SuiteRunInstance { controller_proxy: controller_proxy.into() })
278 }
279
280 pub async fn add_suite_in_realm(
282 &self,
283 realm: fidl::endpoints::ClientEnd<fidl_fuchsia_component::RealmMarker>,
284 offers: &[fidl_fuchsia_component_decl::Offer],
285 test_collection: &str,
286 test_url: &str,
287 run_options: ftest_manager::RunOptions,
288 ) -> Result<SuiteRunInstance, Error> {
289 let (controller_proxy, controller) = fidl::endpoints::create_proxy();
290 self.proxy.add_suite_in_realm(
291 realm,
292 offers,
293 test_collection,
294 test_url,
295 &run_options,
296 controller,
297 )?;
298 Ok(SuiteRunInstance { controller_proxy: controller_proxy.into() })
299 }
300
301 pub async fn run(self) -> Result<Vec<TestRunEvent>, Error> {
305 self.run_with_option(false).await
306 }
307
308 pub async fn run_with_option(
312 self,
313 get_compressed_debug_data: bool,
314 ) -> Result<Vec<TestRunEvent>, Error> {
315 let (controller_proxy, controller) = fidl::endpoints::create_proxy();
316 self.proxy.build(controller).context("Error starting tests")?;
317 let mut events = vec![];
319 loop {
320 let fidl_events = controller_proxy.get_events().await.context("Get run events")?;
321 if fidl_events.is_empty() {
322 break;
323 }
324 for fidl_event in fidl_events {
325 match fidl_event.payload.expect("Details cannot be empty") {
326 ftest_manager::RunEventPayload::Artifact(
327 ftest_manager::Artifact::DebugData(iterator),
328 ) => {
329 if !self.filter_debug_data {
330 let proxy = iterator.into_proxy();
331 loop {
332 let data = match get_compressed_debug_data {
333 true => proxy.get_next_compressed().await?,
334 false => proxy.get_next().await?,
335 };
336 if data.is_empty() {
337 break;
338 }
339 for data_file in data {
340 let socket = data_file.socket.expect("File cannot be empty");
341 events.push(TestRunEvent::debug_data(
342 fidl_event.timestamp,
343 data_file.name.expect("Name cannot be empty"),
344 socket,
345 ));
346 }
347 }
348 }
349 }
350 other => bail!("Expected only debug data run events but got {:?}", other),
351 }
352 }
353 }
354 Ok(events)
355 }
356}
357
358#[derive(Debug)]
359pub struct TestRunEvent {
360 pub timestamp: Option<i64>,
361 pub payload: TestRunEventPayload,
362}
363
364impl TestRunEvent {
365 pub fn debug_data<S: Into<String>>(
366 timestamp: Option<i64>,
367 filename: S,
368 socket: fidl::Socket,
369 ) -> Self {
370 Self {
371 timestamp,
372 payload: TestRunEventPayload::DebugData { filename: filename.into(), socket },
373 }
374 }
375}
376
377#[derive(Debug)]
378pub enum TestRunEventPayload {
379 DebugData { filename: String, socket: fidl::Socket },
380}
381
382pub struct SuiteEvent {
384 pub timestamp: Option<i64>,
385 pub payload: SuiteEventPayload,
386}
387
388impl SuiteEvent {
389 pub fn debug_data<S: Into<String>>(
391 timestamp: Option<i64>,
392 filename: S,
393 socket: fidl::Socket,
394 ) -> Self {
395 Self {
396 timestamp,
397 payload: SuiteEventPayload::DebugData { filename: filename.into(), socket },
398 }
399 }
400
401 pub fn case_found(timestamp: Option<i64>, name: String) -> Self {
402 SuiteEvent { timestamp, payload: SuiteEventPayload::RunEvent(RunEvent::case_found(name)) }
403 }
404
405 pub fn case_started(timestamp: Option<i64>, name: String) -> Self {
406 SuiteEvent { timestamp, payload: SuiteEventPayload::RunEvent(RunEvent::case_started(name)) }
407 }
408
409 pub fn case_stdout<N, L>(timestamp: Option<i64>, name: N, stdout_message: L) -> Self
410 where
411 N: Into<String>,
412 L: Into<String>,
413 {
414 SuiteEvent {
415 timestamp,
416 payload: SuiteEventPayload::RunEvent(RunEvent::case_stdout(
417 name.into(),
418 stdout_message.into(),
419 )),
420 }
421 }
422
423 pub fn case_stderr<N, L>(timestamp: Option<i64>, name: N, stderr_message: L) -> Self
424 where
425 N: Into<String>,
426 L: Into<String>,
427 {
428 SuiteEvent {
429 timestamp,
430 payload: SuiteEventPayload::RunEvent(RunEvent::case_stderr(
431 name.into(),
432 stderr_message.into(),
433 )),
434 }
435 }
436
437 pub fn case_stopped(
438 timestamp: Option<i64>,
439 name: String,
440 status: ftest_manager::CaseStatus,
441 ) -> Self {
442 SuiteEvent {
443 timestamp,
444 payload: SuiteEventPayload::RunEvent(RunEvent::case_stopped(name, status)),
445 }
446 }
447
448 pub fn case_finished(timestamp: Option<i64>, name: String) -> Self {
449 SuiteEvent {
450 timestamp,
451 payload: SuiteEventPayload::RunEvent(RunEvent::case_finished(name)),
452 }
453 }
454
455 pub fn suite_stopped(timestamp: Option<i64>, status: ftest_manager::SuiteStatus) -> Self {
456 SuiteEvent {
457 timestamp,
458 payload: SuiteEventPayload::RunEvent(RunEvent::suite_stopped(status)),
459 }
460 }
461
462 pub fn suite_custom(
463 timestamp: Option<i64>,
464 component: String,
465 filename: String,
466 contents: String,
467 ) -> Self {
468 SuiteEvent {
469 timestamp,
470 payload: SuiteEventPayload::RunEvent(RunEvent::suite_custom(
471 component, filename, contents,
472 )),
473 }
474 }
475
476 pub fn suite_log(timestamp: Option<i64>, log_stream: LogStream) -> Self {
477 SuiteEvent { timestamp, payload: SuiteEventPayload::SuiteLog { log_stream } }
478 }
479
480 pub fn test_case_log(timestamp: Option<i64>, name: String, log_stream: LogStream) -> Self {
481 SuiteEvent { timestamp, payload: SuiteEventPayload::TestCaseLog { name, log_stream } }
482 }
483}
484
485pub enum SuiteEventPayload {
486 SuiteLog {
488 log_stream: LogStream,
489 },
490
491 TestCaseLog {
493 name: String,
494 log_stream: LogStream,
495 },
496
497 RunEvent(RunEvent),
499
500 DebugData {
502 filename: String,
503 socket: fidl::Socket,
504 },
505}
506
507#[derive(PartialEq, Debug, Eq, Hash, Ord, PartialOrd)]
508pub enum RunEvent {
509 CaseFound { name: String },
510 CaseStarted { name: String },
511 CaseStdout { name: String, stdout_message: String },
512 CaseStderr { name: String, stderr_message: String },
513 CaseStopped { name: String, status: ftest_manager::CaseStatus },
514 CaseFinished { name: String },
515 SuiteStarted,
516 SuiteCustom { component: String, filename: String, contents: String },
517 SuiteStopped { status: ftest_manager::SuiteStatus },
518 DebugData { filename: String, socket: fidl::Socket },
519}
520
521impl RunEvent {
522 pub fn case_found<S>(name: S) -> Self
523 where
524 S: Into<String>,
525 {
526 Self::CaseFound { name: name.into() }
527 }
528
529 pub fn case_started<S>(name: S) -> Self
530 where
531 S: Into<String>,
532 {
533 Self::CaseStarted { name: name.into() }
534 }
535
536 pub fn case_stdout<S, L>(name: S, stdout_message: L) -> Self
537 where
538 S: Into<String>,
539 L: Into<String>,
540 {
541 Self::CaseStdout { name: name.into(), stdout_message: stdout_message.into() }
542 }
543
544 pub fn case_stderr<S, L>(name: S, stderr_message: L) -> Self
545 where
546 S: Into<String>,
547 L: Into<String>,
548 {
549 Self::CaseStderr { name: name.into(), stderr_message: stderr_message.into() }
550 }
551
552 pub fn case_stopped<S>(name: S, status: ftest_manager::CaseStatus) -> Self
553 where
554 S: Into<String>,
555 {
556 Self::CaseStopped { name: name.into(), status }
557 }
558
559 pub fn case_finished<S>(name: S) -> Self
560 where
561 S: Into<String>,
562 {
563 Self::CaseFinished { name: name.into() }
564 }
565
566 pub fn suite_started() -> Self {
567 Self::SuiteStarted
568 }
569
570 pub fn suite_custom<T, U, V>(component: T, filename: U, contents: V) -> Self
571 where
572 T: Into<String>,
573 U: Into<String>,
574 V: Into<String>,
575 {
576 Self::SuiteCustom {
577 component: component.into(),
578 filename: filename.into(),
579 contents: contents.into(),
580 }
581 }
582
583 pub fn suite_stopped(status: ftest_manager::SuiteStatus) -> Self {
584 Self::SuiteStopped { status }
585 }
586
587 pub fn debug_data<S>(filename: S, socket: fidl::Socket) -> Self
588 where
589 S: Into<String>,
590 {
591 Self::DebugData { filename: filename.into(), socket }
592 }
593
594 pub fn test_case_name(&self) -> Option<&String> {
596 match self {
597 RunEvent::CaseFound { name }
598 | RunEvent::CaseStarted { name }
599 | RunEvent::CaseStdout { name, .. }
600 | RunEvent::CaseStderr { name, .. }
601 | RunEvent::CaseStopped { name, .. }
602 | RunEvent::CaseFinished { name } => Some(name),
603 RunEvent::SuiteStarted
604 | RunEvent::SuiteStopped { .. }
605 | RunEvent::SuiteCustom { .. }
606 | RunEvent::DebugData { .. } => None,
607 }
608 }
609
610 pub fn owned_test_case_name(&self) -> Option<String> {
612 self.test_case_name().map(String::from)
613 }
614}
615
616#[derive(Default, Debug, Eq, PartialEq)]
619pub struct GroupedRunEvents {
620 pub non_artifact_events: Vec<RunEvent>,
622 pub stdout_events: Vec<RunEvent>,
624 pub stderr_events: Vec<RunEvent>,
626}
627
628pub trait GroupRunEventByTestCase: Iterator<Item = RunEvent> + Sized {
630 fn group_by_test_case_ordered(self) -> LinkedHashMap<Option<String>, GroupedRunEvents> {
645 let mut map = LinkedHashMap::new();
646 for run_event in self {
647 match run_event {
648 RunEvent::CaseStderr { .. } => map
649 .entry(run_event.owned_test_case_name())
650 .or_insert(GroupedRunEvents::default())
651 .stderr_events
652 .push(run_event),
653
654 RunEvent::CaseStdout { .. } => map
655 .entry(run_event.owned_test_case_name())
656 .or_insert(GroupedRunEvents::default())
657 .stdout_events
658 .push(run_event),
659
660 _ => map
661 .entry(run_event.owned_test_case_name())
662 .or_insert(GroupedRunEvents::default())
663 .non_artifact_events
664 .push(run_event),
665 }
666 }
667 map
668 }
669
670 fn group_by_test_case_unordered(self) -> HashMap<Option<String>, GroupedRunEvents> {
673 let mut map = HashMap::new();
674 for run_event in self {
675 match run_event {
676 RunEvent::CaseStderr { .. } => map
677 .entry(run_event.owned_test_case_name())
678 .or_insert(GroupedRunEvents::default())
679 .stderr_events
680 .push(run_event),
681
682 RunEvent::CaseStdout { .. } => map
683 .entry(run_event.owned_test_case_name())
684 .or_insert(GroupedRunEvents::default())
685 .stdout_events
686 .push(run_event),
687
688 _ => map
689 .entry(run_event.owned_test_case_name())
690 .or_insert(GroupedRunEvents::default())
691 .non_artifact_events
692 .push(run_event),
693 }
694 }
695 map
696 }
697
698 fn group(self) -> GroupedRunEvents {
700 let mut events = GroupedRunEvents::default();
701 for run_event in self {
702 match run_event {
703 RunEvent::CaseStderr { .. } => events.stderr_events.push(run_event),
704
705 RunEvent::CaseStdout { .. } => events.stdout_events.push(run_event),
706
707 _ => events.non_artifact_events.push(run_event),
708 }
709 }
710 events
711 }
712}
713
714impl<T> GroupRunEventByTestCase for T where T: Iterator<Item = RunEvent> + Sized {}
715
716#[derive(Default)]
717struct FidlSuiteEventProcessor {
718 case_map: HashMap<u32, String>,
719 std_output_map: HashMap<u32, Vec<fasync::Task<Result<(), Error>>>>,
720}
721
722impl FidlSuiteEventProcessor {
723 fn new() -> Self {
724 FidlSuiteEventProcessor::default()
725 }
726
727 fn get_test_case_name(&self, identifier: u32) -> String {
728 self.case_map
729 .get(&identifier)
730 .unwrap_or_else(|| panic!("invalid test case identifier: {:?}", identifier))
731 .clone()
732 }
733
734 async fn process(
735 &mut self,
736 event: FidlSuiteEvent,
737 mut sender: mpsc::Sender<SuiteEvent>,
738 ) -> Result<(), Error> {
739 let timestamp = event.timestamp;
740 let e = match event.payload.expect("Details cannot be null, please file bug.") {
741 FidlSuiteEventPayload::CaseFound(cf) => {
742 self.case_map.insert(cf.identifier, cf.test_case_name.clone());
743 SuiteEvent::case_found(timestamp, cf.test_case_name).into()
744 }
745 FidlSuiteEventPayload::CaseStarted(cs) => {
746 let test_case_name = self.get_test_case_name(cs.identifier);
747 SuiteEvent::case_started(timestamp, test_case_name).into()
748 }
749 FidlSuiteEventPayload::CaseStopped(cs) => {
750 let test_case_name = self.get_test_case_name(cs.identifier);
751 if let Some(outputs) = self.std_output_map.remove(&cs.identifier) {
752 for s in outputs {
753 s.await.context(format!(
754 "error collecting stdout/stderr of {}",
755 test_case_name
756 ))?;
757 }
758 }
759 SuiteEvent::case_stopped(timestamp, test_case_name, cs.status).into()
760 }
761 FidlSuiteEventPayload::CaseFinished(cf) => {
762 let test_case_name = self.get_test_case_name(cf.identifier);
763 SuiteEvent::case_finished(timestamp, test_case_name).into()
764 }
765 FidlSuiteEventPayload::CaseArtifact(ca) => {
766 let name = self.get_test_case_name(ca.identifier);
767 match ca.artifact {
768 ftest_manager::Artifact::Stdout(stdout) => {
769 let (s, mut r) = mpsc::channel(1024);
770 let stdout_task =
771 fasync::Task::spawn(collect_and_send_string_output(stdout, s));
772 let mut sender_clone = sender.clone();
773 let send_stdout_task = fasync::Task::spawn(async move {
774 while let Some(msg) = r.next().await {
775 sender_clone
776 .send(SuiteEvent::case_stdout(None, &name, msg))
777 .await
778 .context(format!("cannot send logs for {}", name))?;
779 }
780 Ok(())
781 });
782 match self.std_output_map.get_mut(&ca.identifier) {
783 Some(v) => {
784 v.push(stdout_task);
785 v.push(send_stdout_task);
786 }
787 None => {
788 self.std_output_map
789 .insert(ca.identifier, vec![stdout_task, send_stdout_task]);
790 }
791 }
792 None
793 }
794 ftest_manager::Artifact::Stderr(stderr) => {
795 let (s, mut r) = mpsc::channel(1024);
796 let stderr_task =
797 fasync::Task::spawn(collect_and_send_string_output(stderr, s));
798 let mut sender_clone = sender.clone();
799 let send_stderr_task = fasync::Task::spawn(async move {
800 while let Some(msg) = r.next().await {
801 sender_clone
802 .send(SuiteEvent::case_stderr(None, &name, msg))
803 .await
804 .context(format!("cannot send logs for {}", name))?;
805 }
806 Ok(())
807 });
808 match self.std_output_map.get_mut(&ca.identifier) {
809 Some(v) => {
810 v.push(stderr_task);
811 v.push(send_stderr_task);
812 }
813 None => {
814 self.std_output_map
815 .insert(ca.identifier, vec![stderr_task, send_stderr_task]);
816 }
817 }
818 None
819 }
820 ftest_manager::Artifact::Log(log) => match LogStream::from_syslog(log) {
821 Ok(log_stream) => {
822 SuiteEvent::test_case_log(timestamp, name, log_stream).into()
823 }
824 Err(e) => {
825 warn!("Cannot collect logs for test suite: {:?}", e);
826 None
827 }
828 },
829 _ => {
830 panic!("not supported")
831 }
832 }
833 }
834 FidlSuiteEventPayload::SuiteArtifact(sa) => match sa.artifact {
835 ftest_manager::Artifact::Stdout(_) => {
836 panic!("not supported")
837 }
838 ftest_manager::Artifact::Stderr(_) => {
839 panic!("not supported")
840 }
841 ftest_manager::Artifact::Log(log) => match LogStream::from_syslog(log) {
842 Ok(log_stream) => SuiteEvent::suite_log(timestamp, log_stream).into(),
843 Err(e) => {
844 warn!("Cannot collect logs for test suite: {:?}", e);
845 None
846 }
847 },
848 ftest_manager::Artifact::Custom(custom_artifact) => {
849 let ftest_manager::DirectoryAndToken { directory, token } =
850 custom_artifact.directory_and_token.unwrap();
851 let component_moniker = custom_artifact.component_moniker.unwrap();
852 let mut sender_clone = sender.clone();
853 fasync::Task::spawn(async move {
854 let directory = directory.into_proxy();
855 let entries: Vec<_> =
856 fuchsia_fs::directory::readdir_recursive(&directory, None)
857 .try_collect()
858 .await
859 .expect("read custom artifact directory");
860 for entry in entries.into_iter() {
861 let file = fuchsia_fs::directory::open_file_async(
862 &directory,
863 &entry.name,
864 fio::PERM_READABLE,
865 )
866 .unwrap();
867 let contents = fuchsia_fs::file::read_to_string(&file).await.unwrap();
868 sender_clone
869 .send(SuiteEvent::suite_custom(
870 timestamp,
871 component_moniker.clone(),
872 entry.name,
873 contents,
874 ))
875 .await
876 .unwrap();
877 }
878 drop(token);
881 })
882 .detach();
883 None
884 }
885 _ => {
886 panic!("not supported")
887 }
888 },
889 FidlSuiteEventPayload::SuiteStarted(_started) => SuiteEvent {
890 timestamp,
891 payload: SuiteEventPayload::RunEvent(RunEvent::SuiteStarted),
892 }
893 .into(),
894 FidlSuiteEventPayload::SuiteStopped(stopped) => SuiteEvent {
895 timestamp,
896 payload: SuiteEventPayload::RunEvent(RunEvent::SuiteStopped {
897 status: stopped.status,
898 }),
899 }
900 .into(),
901 SuiteEventPayloadUnknown!() => panic!("Unrecognized SuiteEvent"),
902 };
903 if let Some(item) = e {
904 sender.send(item).await.context("Cannot send event")?;
905 }
906 Ok(())
907 }
908
909 async fn process_event(
910 &mut self,
911 event: ftest_manager::Event,
912 mut sender: mpsc::Sender<SuiteEvent>,
913 filter_debug_data: bool,
914 compressed_debug_data: bool,
915 ) -> Result<(), Error> {
916 let timestamp = event.timestamp;
917 let e = match event.details.expect("Details cannot be null, please file bug.") {
918 ftest_manager::EventDetails::TestCaseFound(cf) => {
919 let test_case_name =
920 cf.test_case_name.expect("test_case_name must be specified, please file bug.");
921 self.case_map.insert(
922 cf.test_case_id.expect("test_case_id must be specified, please file bug."),
923 test_case_name.clone(),
924 );
925 SuiteEvent::case_found(timestamp, test_case_name).into()
926 }
927 ftest_manager::EventDetails::TestCaseStarted(cs) => {
928 let test_case_name = self.get_test_case_name(
929 cs.test_case_id.expect("test_case_id must be specified, please file bug."),
930 );
931 SuiteEvent::case_started(timestamp, test_case_name).into()
932 }
933 ftest_manager::EventDetails::TestCaseStopped(cs) => {
934 let test_case_name = self.get_test_case_name(
935 cs.test_case_id.expect("test_case_id must be specified, please file bug."),
936 );
937 if let Some(outputs) = self.std_output_map.remove(
938 &cs.test_case_id.expect("test_case_id must be specified, please file bug."),
939 ) {
940 for s in outputs {
941 s.await.context(format!(
942 "error collecting stdout/stderr of {}",
943 test_case_name
944 ))?;
945 }
946 }
947 SuiteEvent::case_stopped(
948 timestamp,
949 test_case_name,
950 to_case_status(cs.result.expect("result must be specified, please file bug.")),
951 )
952 .into()
953 }
954 ftest_manager::EventDetails::TestCaseFinished(cf) => {
955 let test_case_name = self.get_test_case_name(
956 cf.test_case_id.expect("test_case_id must be specified, please file bug."),
957 );
958 SuiteEvent::case_finished(timestamp, test_case_name).into()
959 }
960 ftest_manager::EventDetails::TestCaseArtifactGenerated(ca) => {
961 let name = self.get_test_case_name(
962 ca.test_case_id.expect("test_case_id must be specified, please file bug."),
963 );
964 match ca.artifact.expect("artifact must be specified, please file bug.") {
965 ftest_manager::Artifact::Stdout(stdout) => {
966 let (s, mut r) = mpsc::channel(1024);
967 let stdout_task =
968 fasync::Task::spawn(collect_and_send_string_output(stdout, s));
969 let mut sender_clone = sender.clone();
970 let send_stdout_task = fasync::Task::spawn(async move {
971 while let Some(msg) = r.next().await {
972 sender_clone
973 .send(SuiteEvent::case_stdout(None, &name, msg))
974 .await
975 .context(format!("cannot send logs for {}", name))?;
976 }
977 Ok(())
978 });
979 match self.std_output_map.get_mut(
980 &ca.test_case_id
981 .expect("test_case_id must be specified, please file bug."),
982 ) {
983 Some(v) => {
984 v.push(stdout_task);
985 v.push(send_stdout_task);
986 }
987 None => {
988 self.std_output_map.insert(
989 ca.test_case_id
990 .expect("test_case_id must be specified, please file bug."),
991 vec![stdout_task, send_stdout_task],
992 );
993 }
994 }
995 None
996 }
997 ftest_manager::Artifact::Stderr(stderr) => {
998 let (s, mut r) = mpsc::channel(1024);
999 let stderr_task =
1000 fasync::Task::spawn(collect_and_send_string_output(stderr, s));
1001 let mut sender_clone = sender.clone();
1002 let send_stderr_task = fasync::Task::spawn(async move {
1003 while let Some(msg) = r.next().await {
1004 sender_clone
1005 .send(SuiteEvent::case_stderr(None, &name, msg))
1006 .await
1007 .context(format!("cannot send logs for {}", name))?;
1008 }
1009 Ok(())
1010 });
1011 match self.std_output_map.get_mut(
1012 &ca.test_case_id
1013 .expect("test_case_id must be specified, please file bug."),
1014 ) {
1015 Some(v) => {
1016 v.push(stderr_task);
1017 v.push(send_stderr_task);
1018 }
1019 None => {
1020 self.std_output_map.insert(
1021 ca.test_case_id
1022 .expect("test_case_id must be specified, please file bug."),
1023 vec![stderr_task, send_stderr_task],
1024 );
1025 }
1026 }
1027 None
1028 }
1029 ftest_manager::Artifact::Log(log) => match LogStream::from_syslog(log) {
1030 Ok(log_stream) => {
1031 SuiteEvent::test_case_log(timestamp, name, log_stream).into()
1032 }
1033 Err(e) => {
1034 warn!("Cannot collect logs for test suite: {:?}", e);
1035 None
1036 }
1037 },
1038 _ => {
1039 panic!("not supported")
1040 }
1041 }
1042 }
1043 ftest_manager::EventDetails::SuiteArtifactGenerated(sa) => {
1044 match sa.artifact.expect("artifact must be specified, please file bug.") {
1045 ftest_manager::Artifact::Stdout(_) => {
1046 panic!("not supported")
1047 }
1048 ftest_manager::Artifact::Stderr(_) => {
1049 panic!("not supported")
1050 }
1051 ftest_manager::Artifact::Log(log) => match LogStream::from_syslog(log) {
1052 Ok(log_stream) => SuiteEvent::suite_log(timestamp, log_stream).into(),
1053 Err(e) => {
1054 warn!("Cannot collect logs for test suite: {:?}", e);
1055 None
1056 }
1057 },
1058 ftest_manager::Artifact::Custom(custom_artifact) => {
1059 let ftest_manager::DirectoryAndToken { directory, token } =
1060 custom_artifact.directory_and_token.unwrap();
1061 let component_moniker = custom_artifact.component_moniker.unwrap();
1062 let mut sender_clone = sender.clone();
1063 fasync::Task::spawn(async move {
1064 let directory = directory.into_proxy();
1065 let entries: Vec<_> =
1066 fuchsia_fs::directory::readdir_recursive(&directory, None)
1067 .try_collect()
1068 .await
1069 .expect("read custom artifact directory");
1070 for entry in entries.into_iter() {
1071 let file = fuchsia_fs::directory::open_file_async(
1072 &directory,
1073 &entry.name,
1074 fio::PERM_READABLE,
1075 )
1076 .unwrap();
1077 let contents =
1078 fuchsia_fs::file::read_to_string(&file).await.unwrap();
1079 sender_clone
1080 .send(SuiteEvent::suite_custom(
1081 timestamp,
1082 component_moniker.clone(),
1083 entry.name,
1084 contents,
1085 ))
1086 .await
1087 .unwrap();
1088 }
1089 drop(token);
1092 })
1093 .detach();
1094 None
1095 }
1096 ftest_manager::Artifact::DebugData(iterator) => {
1097 if !filter_debug_data {
1098 let mut sender_clone = sender.clone();
1099 let proxy = iterator.into_proxy();
1100 fasync::Task::spawn(async move {
1101 loop {
1102 let data = match compressed_debug_data {
1103 true => proxy.get_next_compressed().await.unwrap(),
1104 false => proxy.get_next().await.unwrap(),
1105 };
1106 if data.is_empty() {
1107 break;
1108 }
1109 for data_file in data {
1110 let socket =
1111 data_file.socket.expect("File cannot be empty");
1112 sender_clone
1113 .send(SuiteEvent::debug_data(
1114 timestamp,
1115 data_file.name.expect("Name cannot be empty"),
1116 socket,
1117 ))
1118 .await
1119 .unwrap();
1120 }
1121 }
1122 })
1123 .detach();
1124 }
1125 None
1126 }
1127 _ => {
1128 panic!("not supported")
1129 }
1130 }
1131 }
1132 ftest_manager::EventDetails::SuiteStarted(_started) => SuiteEvent {
1133 timestamp,
1134 payload: SuiteEventPayload::RunEvent(RunEvent::SuiteStarted),
1135 }
1136 .into(),
1137 ftest_manager::EventDetails::SuiteStopped(stopped) => SuiteEvent {
1138 timestamp,
1139 payload: SuiteEventPayload::RunEvent(RunEvent::SuiteStopped {
1140 status: to_suite_status(
1141 stopped.result.expect("result must be specified, please file bug."),
1142 ),
1143 }),
1144 }
1145 .into(),
1146 SuiteEventPayloadUnknown!() => panic!("Unrecognized SuiteEvent"),
1147 };
1148 if let Some(item) = e {
1149 sender.send(item).await.context("Cannot send event")?;
1150 }
1151 Ok(())
1152 }
1153}
1154
1155#[derive(Debug, thiserror::Error, Eq, PartialEq, Copy, Clone)]
1156pub enum SuiteLaunchError {
1157 #[error("Cannot enumerate tests")]
1158 CaseEnumeration,
1159
1160 #[error("Cannot resolve test url")]
1161 InstanceCannotResolve,
1162
1163 #[error("Invalid arguments passed")]
1164 InvalidArgs,
1165
1166 #[error("Cannot connect to test suite")]
1167 FailedToConnectToTestSuite,
1168
1169 #[error("resource unavailable")]
1170 ResourceUnavailable,
1171
1172 #[error("Some internal error occurred. Please file bug")]
1173 InternalError,
1174
1175 #[error("No test cases matched the provided filters")]
1176 NoMatchingCases,
1177}
1178
1179impl From<ftest_manager::LaunchError> for SuiteLaunchError {
1180 fn from(error: ftest_manager::LaunchError) -> Self {
1181 match error {
1182 ftest_manager::LaunchError::ResourceUnavailable => {
1183 SuiteLaunchError::ResourceUnavailable
1184 }
1185 ftest_manager::LaunchError::InstanceCannotResolve => {
1186 SuiteLaunchError::InstanceCannotResolve
1187 }
1188 ftest_manager::LaunchError::InvalidArgs => SuiteLaunchError::InvalidArgs,
1189 ftest_manager::LaunchError::FailedToConnectToTestSuite => {
1190 SuiteLaunchError::FailedToConnectToTestSuite
1191 }
1192 ftest_manager::LaunchError::CaseEnumeration => SuiteLaunchError::CaseEnumeration,
1193 ftest_manager::LaunchError::InternalError => SuiteLaunchError::InternalError,
1194 ftest_manager::LaunchError::NoMatchingCases => SuiteLaunchError::NoMatchingCases,
1195 ftest_manager::LaunchErrorUnknown!() => panic!("Encountered unknown launch error"),
1196 }
1197 }
1198}
1199
1200pub struct SuiteRunInstance {
1202 controller_proxy: Arc<SuiteControllerProxy>,
1203}
1204
1205impl SuiteRunInstance {
1206 pub fn controller(&self) -> Arc<SuiteControllerProxy> {
1207 self.controller_proxy.clone()
1208 }
1209
1210 pub async fn collect_events(&self, sender: mpsc::Sender<SuiteEvent>) -> Result<(), Error> {
1211 let controller_proxy = self.controller_proxy.clone();
1212 let mut processor = FidlSuiteEventProcessor::new();
1213 loop {
1214 match controller_proxy.get_events().await? {
1215 Err(e) => return Err(SuiteLaunchError::from(e).into()),
1216 Ok(events) => {
1217 if events.len() == 0 {
1218 break;
1219 }
1220 for event in events {
1221 if let Err(e) = processor.process(event, sender.clone()).await {
1222 warn!("error running test suite: {:?}", e);
1223 let _ = controller_proxy.kill();
1224 return Ok(());
1225 }
1226 }
1227 }
1228 }
1229 }
1230 Ok(())
1231 }
1232
1233 pub async fn collect_events_with_watch(
1234 &self,
1235 sender: mpsc::Sender<SuiteEvent>,
1236 filter_debug_data: bool,
1237 compressed_debug_data: bool,
1238 ) -> Result<(), Error> {
1239 let controller_proxy = self.controller_proxy.clone();
1240 let mut processor = FidlSuiteEventProcessor::new();
1241 loop {
1242 match controller_proxy.watch_events().await? {
1243 Err(e) => return Err(SuiteLaunchError::from(e).into()),
1244 Ok(events) => {
1245 if events.len() == 0 {
1246 break;
1247 }
1248 for event in events {
1249 if let Err(e) = processor
1250 .process_event(
1251 event,
1252 sender.clone(),
1253 filter_debug_data,
1254 compressed_debug_data,
1255 )
1256 .await
1257 {
1258 warn!("error running test suite: {:?}", e);
1259 let _ = controller_proxy.kill();
1260 return Ok(());
1261 }
1262 }
1263 }
1264 }
1265 }
1266 Ok(())
1267 }
1268}
1269
1270fn to_case_status(outcome: ftest_manager::TestCaseResult) -> ftest_manager::CaseStatus {
1271 match outcome {
1272 ftest_manager::TestCaseResult::Passed => ftest_manager::CaseStatus::Passed,
1273 ftest_manager::TestCaseResult::Failed => ftest_manager::CaseStatus::Failed,
1274 ftest_manager::TestCaseResult::TimedOut => ftest_manager::CaseStatus::TimedOut,
1275 ftest_manager::TestCaseResult::Skipped => ftest_manager::CaseStatus::Skipped,
1276 ftest_manager::TestCaseResult::Error => ftest_manager::CaseStatus::Error,
1277 _ => ftest_manager::CaseStatus::Error,
1278 }
1279}
1280
1281fn to_suite_status(outcome: ftest_manager::SuiteResult) -> ftest_manager::SuiteStatus {
1282 match outcome {
1283 ftest_manager::SuiteResult::Finished => ftest_manager::SuiteStatus::Passed,
1284 ftest_manager::SuiteResult::Failed => ftest_manager::SuiteStatus::Failed,
1285 ftest_manager::SuiteResult::DidNotFinish => ftest_manager::SuiteStatus::DidNotFinish,
1286 ftest_manager::SuiteResult::TimedOut => ftest_manager::SuiteStatus::TimedOut,
1287 ftest_manager::SuiteResult::Stopped => ftest_manager::SuiteStatus::Stopped,
1288 ftest_manager::SuiteResult::InternalError => ftest_manager::SuiteStatus::InternalError,
1289 _ => ftest_manager::SuiteStatus::InternalError,
1290 }
1291}