1use crate::above_root_capabilities::AboveRootCapabilitiesForTest;
6use crate::constants::{self, HERMETIC_TESTS_COLLECTION};
7use crate::debug_data_processor::{DebugDataDirectory, DebugDataProcessor, DebugDataSender};
8use crate::error::*;
9use crate::facet::SuiteFacets;
10use crate::run_events::{RunEvent, SuiteEvents};
11use crate::scheduler::Scheduler;
12use crate::self_diagnostics::DiagnosticNode;
13use crate::{facet, running_suite, scheduler};
14use anyhow::Error;
15use fidl::endpoints::{ControlHandle, Responder};
16use fidl_fuchsia_component::RealmProxy;
17use fidl_fuchsia_component_resolution::ResolverProxy;
18use fidl_fuchsia_pkg::PackageResolverProxy;
19use ftest_manager::{
20 LaunchError, RunControllerRequest, RunControllerRequestStream, SchedulingOptions,
21 SuiteControllerRequest, SuiteControllerRequestStream,
22};
23use futures::channel::{mpsc, oneshot};
24use futures::future::Either;
25use futures::prelude::*;
26use futures::StreamExt;
27use log::{error, info, warn};
28use std::sync::Arc;
29use {
30 fidl_fuchsia_component_test as ftest, fidl_fuchsia_test_manager as ftest_manager,
31 fuchsia_async as fasync,
32};
33
34const EXECUTION_PROPERTY: &'static str = "execution";
35
36pub(crate) struct SuiteRealm {
37 pub realm_proxy: RealmProxy,
38 pub offers: Vec<ftest::Capability>,
39 pub test_collection: String,
40}
41
42pub(crate) struct Suite {
43 pub test_url: String,
44 pub options: ftest_manager::RunOptions,
45 pub controller: SuiteControllerRequestStream,
46 pub resolver: Arc<ResolverProxy>,
47 pub pkg_resolver: Arc<PackageResolverProxy>,
48 pub above_root_capabilities_for_test: Arc<AboveRootCapabilitiesForTest>,
49 pub facets: facet::ResolveStatus,
50 pub realm: Option<SuiteRealm>,
51}
52
53pub(crate) struct TestRunBuilder {
54 pub suites: Vec<Suite>,
55}
56
57impl TestRunBuilder {
58 async fn run_controller(
63 mut controller: RunControllerRequestStream,
64 run_task: futures::future::RemoteHandle<()>,
65 stop_sender: oneshot::Sender<()>,
66 event_recv: mpsc::Receiver<RunEvent>,
67 diagnostics: DiagnosticNode,
68 ) {
69 let mut run_task = Some(run_task);
70 let mut stop_sender = Some(stop_sender);
71 let (events_responder_sender, mut events_responder_recv) = mpsc::unbounded();
72 let diagnostics_ref = &diagnostics;
73
74 let serve_controller_fut = async move {
75 while let Some(request) = controller.try_next().await? {
76 match request {
77 RunControllerRequest::Stop { .. } => {
78 diagnostics_ref.set_flag("stopped");
79 if let Some(stop_sender) = stop_sender.take() {
80 let _ = stop_sender.send(());
82 }
86 }
87 RunControllerRequest::Kill { .. } => {
88 diagnostics_ref.set_flag("killed");
89 drop(run_task.take());
91 }
95 RunControllerRequest::GetEvents { responder } => {
96 events_responder_sender.unbounded_send(responder).unwrap_or_else(|e| {
97 e.into_inner().drop_without_shutdown();
100 })
101 }
102 RunControllerRequest::_UnknownMethod { ordinal, control_handle, .. } => {
103 warn!(
104 "Unknown run controller request received: {}, closing connection",
105 ordinal
106 );
107 drop(run_task.take());
109 control_handle.shutdown_with_epitaph(zx::Status::NOT_SUPPORTED);
110 break;
111 }
112 }
113 }
114 Result::<(), Error>::Ok(())
115 };
116
117 let get_events_fut = async move {
118 let mut event_chunks = event_recv.map(RunEvent::into).ready_chunks(EVENTS_THRESHOLD);
119 while let Some(responder) = events_responder_recv.next().await {
120 diagnostics_ref.set_property("events", "awaiting");
121 let next_chunk = event_chunks.next().await.unwrap_or_default();
122 diagnostics_ref.set_property("events", "idle");
123 let done = next_chunk.is_empty();
124 responder.send(next_chunk)?;
125 if done {
126 diagnostics_ref.set_flag("events_drained");
127 break;
128 }
129 }
130 events_responder_recv.close();
133 events_responder_recv
134 .for_each(|responder| async move {
135 responder.drop_without_shutdown();
136 })
137 .await;
138 Result::<(), Error>::Ok(())
139 };
140
141 match futures::future::select(serve_controller_fut.boxed(), get_events_fut.boxed()).await {
142 Either::Left((serve_result, _fut)) => {
143 if let Err(e) = serve_result {
144 warn!(diagnostics:?; "Error serving RunController: {:?}", e);
145 }
146 }
147 Either::Right((get_events_result, serve_fut)) => {
148 if let Err(e) = get_events_result {
149 warn!(diagnostics:?; "Error sending events for RunController: {:?}", e);
150 }
151 if let Err(e) = serve_fut.await {
155 warn!(diagnostics:?; "Error serving RunController: {:?}", e);
156 }
157 }
158 }
159 }
160
161 pub(crate) async fn run(
162 self,
163 controller: RunControllerRequestStream,
164 diagnostics: DiagnosticNode,
165 scheduling_options: Option<SchedulingOptions>,
166 ) {
167 let (stop_sender, mut stop_recv) = oneshot::channel::<()>();
168 let (event_sender, event_recv) = mpsc::channel::<RunEvent>(16);
169
170 let diagnostics_ref = &diagnostics;
171
172 let max_parallel_suites = match &scheduling_options {
173 Some(options) => options.max_parallel_suites,
174 None => None,
175 };
176 let max_parallel_suites_ref = &max_parallel_suites;
177 let accumulate_debug_data = scheduling_options
178 .as_ref()
179 .and_then(|options| options.accumulate_debug_data)
180 .unwrap_or(false);
181 let debug_data_directory = match accumulate_debug_data {
182 true => DebugDataDirectory::Accumulating { dir: constants::DEBUG_DATA_FOR_SCP },
183 false => DebugDataDirectory::Isolated { parent: constants::ISOLATED_TMP },
184 };
185 let (debug_data_processor, debug_data_sender) =
186 DebugDataProcessor::new(debug_data_directory);
187
188 let debug_task = fasync::Task::local(
189 debug_data_processor
190 .collect_and_serve(event_sender)
191 .unwrap_or_else(|err| warn!(err:?; "Error serving debug data")),
192 );
193
194 let suite_scheduler_fut = async move {
196 diagnostics_ref.set_property(EXECUTION_PROPERTY, "executing");
197
198 let serial_executor = scheduler::SerialScheduler {};
199
200 match max_parallel_suites_ref {
201 Some(max_parallel_suites) => {
202 let parallel_executor = scheduler::ParallelScheduler {
203 suite_runner: scheduler::RunSuiteObj {},
204 max_parallel_suites: *max_parallel_suites,
205 };
206 let get_facets_fn = |test_url, resolver| async move {
207 facet::get_suite_facets(test_url, resolver).await
208 };
209 let (serial_suites, parallel_suites) =
210 split_suites_by_hermeticity(self.suites, get_facets_fn).await;
211
212 parallel_executor
213 .execute(
214 parallel_suites,
215 diagnostics_ref.child("parallel_executor"),
216 &mut stop_recv,
217 debug_data_sender.clone(),
218 )
219 .await;
220 serial_executor
221 .execute(
222 serial_suites,
223 diagnostics_ref.child("serial_executor"),
224 &mut stop_recv,
225 debug_data_sender.clone(),
226 )
227 .await;
228 }
229 None => {
230 serial_executor
231 .execute(
232 self.suites,
233 diagnostics_ref.child("serial_executor"),
234 &mut stop_recv,
235 debug_data_sender.clone(),
236 )
237 .await;
238 }
239 }
240
241 drop(debug_data_sender); diagnostics_ref.set_property(EXECUTION_PROPERTY, "complete");
244 };
245
246 let (remote, remote_handle) = suite_scheduler_fut.remote_handle();
247
248 let ((), ()) = futures::future::join(
249 remote.then(|_| async move {
250 debug_task.await;
251 }),
252 Self::run_controller(
253 controller,
254 remote_handle,
255 stop_sender,
256 event_recv,
257 diagnostics.child("controller"),
258 ),
259 )
260 .await;
261 }
262}
263
264const EVENTS_THRESHOLD: usize = 50;
267
268impl Suite {
269 pub(crate) async fn run(self, diagnostics: DiagnosticNode, accumulate_debug_data: bool) {
270 let diagnostics_ref = &diagnostics;
271
272 let debug_data_directory = match accumulate_debug_data {
273 true => DebugDataDirectory::Accumulating { dir: constants::DEBUG_DATA_FOR_SCP },
274 false => DebugDataDirectory::Isolated { parent: constants::ISOLATED_TMP },
275 };
276 let (debug_data_processor, debug_data_sender) =
277 DebugDataProcessor::new(debug_data_directory);
278
279 let (event_sender, event_receiver) = mpsc::channel(1024);
280
281 let debug_task = fasync::Task::local(
282 debug_data_processor
283 .collect_and_serve_for_suite(event_sender.clone())
284 .unwrap_or_else(|err| warn!(err:?; "Error serving debug data")),
285 );
286
287 let suite_run_fut = async move {
289 diagnostics_ref.set_property(EXECUTION_PROPERTY, "executing");
290
291 let suite_node = diagnostics_ref.child("serial_executor").child("suite-0");
292 suite_node.set_property("url", self.test_url.clone());
293 run_single_suite_for_suite_runner(
294 self,
295 debug_data_sender,
296 suite_node,
297 event_sender,
298 event_receiver,
299 )
300 .await;
301
302 diagnostics_ref.set_property(EXECUTION_PROPERTY, "complete");
303 };
304
305 suite_run_fut
306 .then(|_| async move {
307 debug_task.await;
308 })
309 .await;
310 }
311
312 async fn run_controller(
313 mut controller: SuiteControllerRequestStream,
314 stop_sender: oneshot::Sender<()>,
315 run_suite_remote_handle: futures::future::RemoteHandle<()>,
316 event_recv: mpsc::Receiver<Result<SuiteEvents, LaunchError>>,
317 ) -> Result<(), Error> {
318 let mut task = Some(run_suite_remote_handle);
319 let mut stop_sender = Some(stop_sender);
320 let (events_responder_sender, mut events_responder_recv) = mpsc::unbounded();
321
322 let serve_controller_fut = async move {
323 while let Some(event) = controller.try_next().await? {
324 match event {
325 SuiteControllerRequest::Stop { .. } => {
326 if let Some(stop) = stop_sender.take() {
328 let _ = stop.send(());
329 }
333 }
334 SuiteControllerRequest::Kill { .. } => {
335 drop(task.take());
337 }
341 SuiteControllerRequest::WatchEvents { responder } => {
342 events_responder_sender
343 .unbounded_send(EventResponder::New(responder))
344 .unwrap_or_else(|e| {
345 e.into_inner().drop_without_shutdown();
348 })
349 }
350 SuiteControllerRequest::GetEvents { responder } => {
351 events_responder_sender
352 .unbounded_send(EventResponder::Deprecated(responder))
353 .unwrap_or_else(|e| {
354 e.into_inner().drop_without_shutdown();
357 })
358 }
359 SuiteControllerRequest::_UnknownMethod { ordinal, control_handle, .. } => {
360 warn!(
361 "Unknown suite controller request received: {}, closing connection",
362 ordinal
363 );
364 drop(task.take());
366 control_handle.shutdown_with_epitaph(zx::Status::NOT_SUPPORTED);
367 break;
368 }
369 }
370 }
371 Ok(())
372 };
373
374 let get_events_fut = async move {
375 let mut event_chunks = event_recv.ready_chunks(EVENTS_THRESHOLD);
376 while let Some(responder) = events_responder_recv.next().await {
377 let next_chunk_results: Vec<Result<_, _>> =
378 event_chunks.next().await.unwrap_or_default();
379 if responder.send(next_chunk_results)? {
380 break;
381 }
382 }
383 events_responder_recv.close();
386 events_responder_recv
387 .for_each(|responder| async move {
388 responder.drop_without_shutdown();
389 })
390 .await;
391 Result::<(), Error>::Ok(())
392 };
393
394 match futures::future::select(serve_controller_fut.boxed(), get_events_fut.boxed()).await {
395 Either::Left((serve_result, _fut)) => serve_result,
396 Either::Right((get_events_result, serve_fut)) => {
397 get_events_result?;
398 serve_fut.await
402 }
403 }
404 }
405}
406
407enum EventResponder {
408 Deprecated(ftest_manager::SuiteControllerGetEventsResponder),
409 New(ftest_manager::SuiteControllerWatchEventsResponder),
410}
411
412impl EventResponder {
413 fn drop_without_shutdown(self) {
414 match self {
415 EventResponder::Deprecated(inner) => inner.drop_without_shutdown(),
416 EventResponder::New(inner) => inner.drop_without_shutdown(),
417 }
418 }
419
420 pub fn send(self, results: Vec<Result<SuiteEvents, LaunchError>>) -> Result<bool, fidl::Error> {
421 match self {
422 EventResponder::Deprecated(inner) => {
423 let result: Result<Vec<_>, _> =
424 results.into_iter().map(|r| r.map(SuiteEvents::into)).collect();
425 let done = match &result {
426 Ok(events) => events.is_empty(),
427 Err(_) => true,
428 };
429 inner.send(result).map(|_| done)
430 }
431 EventResponder::New(inner) => {
432 let result: Result<Vec<_>, _> =
433 results.into_iter().map(|r| r.map(SuiteEvents::into)).collect();
434 let done = match &result {
435 Ok(events) => events.is_empty(),
436 Err(_) => true,
437 };
438 inner.send(result).map(|_| done)
439 }
440 }
441 }
442}
443
444async fn run_single_suite_for_suite_runner(
445 suite: Suite,
446 debug_data_sender: DebugDataSender,
447 diagnostics: DiagnosticNode,
448 mut event_sender: mpsc::Sender<Result<SuiteEvents, LaunchError>>,
449 event_recv: mpsc::Receiver<Result<SuiteEvents, LaunchError>>,
450) {
451 let (stop_sender, stop_recv) = oneshot::channel::<()>();
452
453 let Suite {
454 test_url,
455 options,
456 controller,
457 resolver,
458 pkg_resolver,
459 above_root_capabilities_for_test,
460 facets,
461 realm: suite_realm,
462 } = suite;
463
464 let run_test_fut = async {
465 diagnostics.set_property(EXECUTION_PROPERTY, "get_facets");
466
467 let facets = match facets {
468 facet::ResolveStatus::Resolved(result) => {
472 match result {
473 Ok(facets) => facets,
474
475 Err(error) => {
480 event_sender.send(Err(error.into())).await.unwrap();
481 return;
482 }
483 }
484 }
485 facet::ResolveStatus::Unresolved => {
486 match facet::get_suite_facets(test_url.clone(), resolver.clone()).await {
487 Ok(facets) => facets,
488 Err(error) => {
489 event_sender.send(Err(error.into())).await.unwrap();
490 return;
491 }
492 }
493 }
494 };
495 diagnostics.set_property(EXECUTION_PROPERTY, "launch");
496 match running_suite::RunningSuite::launch(
497 &test_url,
498 facets,
499 resolver,
500 pkg_resolver,
501 above_root_capabilities_for_test,
502 debug_data_sender,
503 &diagnostics,
504 &suite_realm,
505 use_debug_agent_for_runs(&options),
506 )
507 .await
508 {
509 Ok(mut instance) => {
510 diagnostics.set_property(EXECUTION_PROPERTY, "run_tests");
511 instance.run_tests(&test_url, options, event_sender, stop_recv).await;
512 diagnostics.set_property(EXECUTION_PROPERTY, "tests_done");
513 diagnostics.set_property(EXECUTION_PROPERTY, "tear_down");
514 if let Err(err) = instance.destroy(diagnostics.child("destroy")).await {
515 error!(
517 diagnostics:?,
518 err:?;
519 "Failed to destroy instance. Debug data may be lost."
520 );
521 }
522 }
523 Err(e) => {
524 event_sender.send(Err(e.into())).await.unwrap();
525 }
526 }
527 };
528 let (run_test_remote, run_test_handle) = run_test_fut.remote_handle();
529
530 let controller_fut =
531 Suite::run_controller(controller, stop_sender, run_test_handle, event_recv);
532 let ((), controller_ret) = futures::future::join(run_test_remote, controller_fut).await;
533
534 if let Err(e) = controller_ret {
535 warn!(diagnostics:?; "Ended test {}: {:?}", test_url, e);
536 }
537
538 diagnostics.set_property(EXECUTION_PROPERTY, "complete");
539 info!(diagnostics:?; "Test destruction complete");
540}
541
542pub(crate) async fn run_single_suite(
543 suite: Suite,
544 debug_data_sender: DebugDataSender,
545 diagnostics: DiagnosticNode,
546) {
547 let (mut sender, recv) = mpsc::channel(1024);
548 let (stop_sender, stop_recv) = oneshot::channel::<()>();
549 let mut maybe_instance = None;
550
551 let Suite {
552 test_url,
553 options,
554 controller,
555 resolver,
556 pkg_resolver,
557 above_root_capabilities_for_test,
558 facets,
559 realm: suite_realm,
560 } = suite;
561
562 let run_test_fut = async {
563 diagnostics.set_property(EXECUTION_PROPERTY, "get_facets");
564
565 let facets = match facets {
566 facet::ResolveStatus::Resolved(result) => {
570 match result {
571 Ok(facets) => facets,
572
573 Err(error) => {
578 sender.send(Err(error.into())).await.unwrap();
579 return;
580 }
581 }
582 }
583 facet::ResolveStatus::Unresolved => {
584 match facet::get_suite_facets(test_url.clone(), resolver.clone()).await {
585 Ok(facets) => facets,
586 Err(error) => {
587 sender.send(Err(error.into())).await.unwrap();
588 return;
589 }
590 }
591 }
592 };
593 diagnostics.set_property(EXECUTION_PROPERTY, "launch");
594 match running_suite::RunningSuite::launch(
595 &test_url,
596 facets,
597 resolver,
598 pkg_resolver,
599 above_root_capabilities_for_test,
600 debug_data_sender,
601 &diagnostics,
602 &suite_realm,
603 use_debug_agent_for_runs(&options),
604 )
605 .await
606 {
607 Ok(instance) => {
608 diagnostics.set_property(EXECUTION_PROPERTY, "run_tests");
609 let instance_ref = maybe_instance.insert(instance);
610 instance_ref.run_tests(&test_url, options, sender, stop_recv).await;
611 diagnostics.set_property(EXECUTION_PROPERTY, "tests_done");
612 }
613 Err(e) => {
614 sender.send(Err(e.into())).await.unwrap();
615 }
616 }
617 };
618 let (run_test_remote, run_test_handle) = run_test_fut.remote_handle();
619
620 let controller_fut = Suite::run_controller(controller, stop_sender, run_test_handle, recv);
621 let ((), controller_ret) = futures::future::join(run_test_remote, controller_fut).await;
622
623 if let Err(e) = controller_ret {
624 warn!(diagnostics:?; "Ended test {}: {:?}", test_url, e);
625 }
626
627 if let Some(instance) = maybe_instance.take() {
628 diagnostics.set_property(EXECUTION_PROPERTY, "tear_down");
629 info!(diagnostics:?; "Test suite has finished, destroying instance...");
630 if let Err(err) = instance.destroy(diagnostics.child("destroy")).await {
631 error!(diagnostics:?, err:?; "Failed to destroy instance. Debug data may be lost.");
633 }
634 }
635 diagnostics.set_property(EXECUTION_PROPERTY, "complete");
636 info!(diagnostics:?; "Test destruction complete");
637}
638
639async fn split_suites_by_hermeticity<F, Fut>(
643 suites: Vec<Suite>,
644 get_facets_fn: F,
645) -> (Vec<Suite>, Vec<Suite>)
646where
647 F: Fn(String, Arc<ResolverProxy>) -> Fut,
648 Fut: futures::future::Future<Output = Result<SuiteFacets, LaunchTestError>>,
649{
650 let mut serial_suites: Vec<Suite> = Vec::new();
651 let mut parallel_suites: Vec<Suite> = Vec::new();
652
653 for mut suite in suites {
654 if suite.realm.is_some() {
655 serial_suites.push(suite);
656 continue;
657 }
658 let test_url = suite.test_url.clone();
659 let resolver = suite.resolver.clone();
660 let facet_result = get_facets_fn(test_url, resolver).await;
661 let can_run_in_parallel = match &facet_result {
662 Ok(facets) => facets.collection == HERMETIC_TESTS_COLLECTION,
663 Err(_) => false,
664 };
665 suite.facets = facet::ResolveStatus::Resolved(facet_result);
666 if can_run_in_parallel {
667 parallel_suites.push(suite);
668 } else {
669 serial_suites.push(suite);
670 }
671 }
672 (serial_suites, parallel_suites)
673}
674
675fn use_debug_agent_for_runs(options: &ftest_manager::RunOptions) -> bool {
677 match options.no_exception_channel {
678 Some(true) => {
679 false
681 }
682 Some(false) | None => {
683 true
685 }
686 }
687}
688
689#[cfg(test)]
690mod tests {
691 use super::*;
692 use fidl::endpoints::create_proxy_and_stream;
693 use {fidl_fuchsia_component_resolution as fresolution, fuchsia_async as fasync};
694
695 fn new_run_inspect_node() -> DiagnosticNode {
696 DiagnosticNode::new("root", Arc::new(fuchsia_inspect::types::Node::default()))
697 }
698
699 #[fuchsia::test]
700 async fn run_controller_stop_test() {
701 let (sender, recv) = mpsc::channel(1024);
702 let (stop_sender, stop_recv) = oneshot::channel::<()>();
703 let (task, remote_handle) = async move {
704 stop_recv.await.unwrap();
705 drop(sender);
707 }
708 .remote_handle();
709 let _task = fasync::Task::spawn(task);
710 let (proxy, controller) = create_proxy_and_stream::<ftest_manager::RunControllerMarker>();
711 let run_controller = fasync::Task::spawn(async move {
712 TestRunBuilder::run_controller(
713 controller,
714 remote_handle,
715 stop_sender,
716 recv,
717 new_run_inspect_node(),
718 )
719 .await
720 });
721 let get_events_task = fasync::Task::spawn(proxy.get_events());
723 proxy.stop().unwrap();
724 assert_eq!(get_events_task.await.unwrap(), vec![]);
725
726 drop(proxy);
727 run_controller.await;
728 }
729
730 #[fuchsia::test]
731 async fn run_controller_abort_when_channel_closed() {
732 let (_sender, recv) = mpsc::channel(1024);
733 let (stop_sender, _stop_recv) = oneshot::channel::<()>();
734 let (task, remote_handle) = futures::future::pending().remote_handle();
736 let pending_task = fasync::Task::spawn(task);
737 let (proxy, controller) = create_proxy_and_stream::<ftest_manager::RunControllerMarker>();
738 let run_controller = fasync::Task::spawn(async move {
739 TestRunBuilder::run_controller(
740 controller,
741 remote_handle,
742 stop_sender,
743 recv,
744 new_run_inspect_node(),
745 )
746 .await
747 });
748 let get_events_task = fasync::Task::spawn(proxy.get_events());
750 drop(proxy);
751 drop(get_events_task);
752 pending_task.await;
755 run_controller.await;
756 }
757
758 #[fuchsia::test]
759 async fn suite_controller_stop_test() {
760 let (sender, recv) = mpsc::channel(1024);
761 let (stop_sender, stop_recv) = oneshot::channel::<()>();
762 let (task, remote_handle) = async move {
763 stop_recv.await.unwrap();
764 drop(sender);
766 }
767 .remote_handle();
768 let _task = fasync::Task::spawn(task);
769 let (proxy, controller) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
770 let run_controller = fasync::Task::spawn(async move {
771 Suite::run_controller(controller, stop_sender, remote_handle, recv).await
772 });
773 let get_events_task = fasync::Task::spawn(proxy.get_events());
775 proxy.stop().unwrap();
776
777 assert_eq!(get_events_task.await.unwrap(), Ok(vec![]));
778 drop(proxy);
780 run_controller.await.unwrap();
781 }
782
783 #[fuchsia::test]
784 async fn suite_controller_abort_remote_when_controller_closed() {
785 let (_sender, recv) = mpsc::channel(1024);
786 let (stop_sender, _stop_recv) = oneshot::channel::<()>();
787 let (task, remote_handle) = futures::future::pending().remote_handle();
789 let pending_task = fasync::Task::spawn(task);
790 let (proxy, controller) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
791 let run_controller = fasync::Task::spawn(async move {
792 Suite::run_controller(controller, stop_sender, remote_handle, recv).await
793 });
794 let get_events_task = fasync::Task::spawn(proxy.get_events());
796 drop(proxy);
797 drop(get_events_task);
798 pending_task.await;
801 run_controller.await.unwrap();
802 }
803
804 #[fuchsia::test]
805 async fn suite_controller_get_events() {
806 let (mut sender, recv) = mpsc::channel(1024);
807 let (stop_sender, stop_recv) = oneshot::channel::<()>();
808 let (task, remote_handle) = async {}.remote_handle();
809 let _task = fasync::Task::spawn(task);
810 let (proxy, controller) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
811 let run_controller = fasync::Task::spawn(async move {
812 Suite::run_controller(controller, stop_sender, remote_handle, recv).await
813 });
814 sender.send(Ok(SuiteEvents::case_found(1, "case1".to_string()).into())).await.unwrap();
815 sender.send(Ok(SuiteEvents::case_found(2, "case2".to_string()).into())).await.unwrap();
816
817 let events = proxy.get_events().await.unwrap().unwrap();
818 assert_eq!(events.len(), 2);
819 assert_eq!(
820 events[0].payload,
821 SuiteEvents::case_found(1, "case1".to_string()).into_suite_run_event().payload,
822 );
823 assert_eq!(
824 events[1].payload,
825 SuiteEvents::case_found(2, "case2".to_string()).into_suite_run_event().payload,
826 );
827 sender.send(Ok(SuiteEvents::case_started(2).into())).await.unwrap();
828 proxy.stop().unwrap();
829
830 sender.send(Ok(SuiteEvents::case_started(1).into())).await.unwrap();
832 sender.send(Ok(SuiteEvents::case_found(3, "case3".to_string()).into())).await.unwrap();
833
834 stop_recv.await.unwrap();
835 drop(sender);
837 let events = proxy.get_events().await.unwrap().unwrap();
838 assert_eq!(events.len(), 3);
839
840 assert_eq!(events[0].payload, SuiteEvents::case_started(2).into_suite_run_event().payload,);
841 assert_eq!(events[1].payload, SuiteEvents::case_started(1).into_suite_run_event().payload,);
842 assert_eq!(
843 events[2].payload,
844 SuiteEvents::case_found(3, "case3".to_string()).into_suite_run_event().payload,
845 );
846
847 let events = proxy.get_events().await.unwrap().unwrap();
848 assert_eq!(events, vec![]);
849 drop(proxy);
851 run_controller.await.unwrap();
852 }
853
854 async fn create_fake_suite(test_url: String) -> Suite {
855 let (_controller_proxy, controller_stream) =
856 create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
857 let (resolver_proxy, _resolver_stream) =
858 create_proxy_and_stream::<fresolution::ResolverMarker>();
859 let resolver_proxy = Arc::new(resolver_proxy);
860 let (pkg_resolver_proxy, _pkg_resolver_stream) =
861 create_proxy_and_stream::<fidl_fuchsia_pkg::PackageResolverMarker>();
862 let pkg_resolver_proxy = Arc::new(pkg_resolver_proxy);
863 let routing_info = Arc::new(AboveRootCapabilitiesForTest::new_empty_for_tests());
864 Suite {
865 realm: None,
866 test_url,
867 options: ftest_manager::RunOptions {
868 parallel: None,
869 arguments: None,
870 run_disabled_tests: Some(false),
871 timeout: None,
872 case_filters_to_run: None,
873 log_iterator: None,
874 ..Default::default()
875 },
876 controller: controller_stream,
877 resolver: resolver_proxy,
878 pkg_resolver: pkg_resolver_proxy,
879 above_root_capabilities_for_test: routing_info,
880 facets: facet::ResolveStatus::Unresolved,
881 }
882 }
883
884 #[fuchsia::test]
885 async fn split_suites_by_hermeticity_test() {
886 let hermetic_suite = create_fake_suite("hermetic_suite".to_string()).await;
887 let non_hermetic_suite = create_fake_suite("non_hermetic_suite".to_string()).await;
888 let suites = vec![hermetic_suite, non_hermetic_suite];
889
890 let get_facets_fn = |test_url, _resolver| async move {
892 if test_url == "hermetic_suite".to_string() {
893 Ok(SuiteFacets {
894 collection: HERMETIC_TESTS_COLLECTION,
895 deprecated_allowed_packages: None,
896 })
897 } else {
898 Ok(SuiteFacets {
899 collection: crate::constants::SYSTEM_TESTS_COLLECTION,
900 deprecated_allowed_packages: None,
901 })
902 }
903 };
904 let (serial_suites, parallel_suites) =
905 split_suites_by_hermeticity(suites, get_facets_fn).await;
906
907 assert_eq!(parallel_suites[0].test_url, "hermetic_suite".to_string());
908 assert_eq!(serial_suites[0].test_url, "non_hermetic_suite".to_string());
909 }
910
911 #[test]
912 fn suite_controller_hanging_get_events() {
913 let mut executor = fasync::TestExecutor::new();
914 let (mut sender, recv) = mpsc::channel(1024);
915 let (stop_sender, _stop_recv) = oneshot::channel::<()>();
916 let (task, remote_handle) = async {}.remote_handle();
917 let _task = fasync::Task::spawn(task);
918 let (proxy, controller) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
919 let _run_controller = fasync::Task::spawn(async move {
920 Suite::run_controller(controller, stop_sender, remote_handle, recv).await
921 });
922
923 let mut get_events =
925 fasync::Task::spawn(async move { proxy.get_events().await.unwrap().unwrap() });
926 assert_eq!(executor.run_until_stalled(&mut get_events), std::task::Poll::Pending);
927 executor.run_singlethreaded(async {
928 sender.send(Ok(SuiteEvents::case_found(1, "case1".to_string()).into())).await.unwrap();
929 sender.send(Ok(SuiteEvents::case_found(2, "case2".to_string()).into())).await.unwrap();
930 });
931 let events = executor.run_singlethreaded(get_events);
932 assert_eq!(events.len(), 2);
933 assert_eq!(
934 events[0].payload,
935 SuiteEvents::case_found(1, "case1".to_string()).into_suite_run_event().payload,
936 );
937 assert_eq!(
938 events[1].payload,
939 SuiteEvents::case_found(2, "case2".to_string()).into_suite_run_event().payload,
940 );
941 }
942}