1use crate::debug_data_server;
6use crate::run_events::{RunEvent, SuiteEvents};
7use anyhow::Error;
8use fidl::endpoints::create_endpoints;
9use fidl_fuchsia_test_manager::LaunchError;
10use fuchsia_component::client::connect_to_protocol;
11use fuchsia_component::server::ServiceFs;
12use fuchsia_component_test::LocalComponentHandles;
13use fuchsia_fs::directory::open_channel_in_namespace;
14use fuchsia_fs::{PERM_READABLE, PERM_WRITABLE};
15use futures::channel::mpsc;
16use futures::future::FutureExt;
17use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
18use futures::{pin_mut, select_biased, SinkExt};
19use log::info;
20use {
21 fidl_fuchsia_debugdata as fdebug, fidl_fuchsia_io as fio,
22 fidl_fuchsia_test_debug as ftest_debug, fuchsia_async as fasync,
23};
24
25pub(crate) struct DebugDataProcessor {
28 directory: DebugDataDirectory,
29 receiver: mpsc::Receiver<ftest_debug::DebugVmo>,
30 proxy_init_fn: Box<dyn FnOnce() -> Result<ftest_debug::DebugDataProcessorProxy, Error>>,
31}
32
33#[derive(Clone)]
35pub(crate) struct DebugDataSender {
36 sender: mpsc::Sender<ftest_debug::DebugVmo>,
37}
38
39#[derive(Debug)]
41pub enum DebugDataDirectory {
42 Isolated { parent: &'static str },
45 Accumulating { dir: &'static str },
48}
49
50impl DebugDataProcessor {
51 const MAX_SENT_VMOS: usize = 10;
52 pub fn new(directory: DebugDataDirectory) -> (Self, DebugDataSender) {
55 let (sender, receiver) = futures::channel::mpsc::channel(Self::MAX_SENT_VMOS);
56 (
57 Self {
58 directory,
59 receiver,
60 proxy_init_fn: Box::new(|| {
61 connect_to_protocol::<ftest_debug::DebugDataProcessorMarker>()
62 .map_err(Error::from)
63 }),
64 },
65 DebugDataSender { sender },
66 )
67 }
68
69 #[cfg(test)]
73 pub(crate) fn new_for_test(directory: DebugDataDirectory) -> DebugDataForTestResult {
74 let (sender, receiver) = futures::channel::mpsc::channel(Self::MAX_SENT_VMOS);
75 let (proxy, stream) =
76 fidl::endpoints::create_proxy_and_stream::<ftest_debug::DebugDataProcessorMarker>();
77 let maybe_proxy = std::sync::Mutex::new(Some(proxy));
78 DebugDataForTestResult {
79 processor: Self {
80 directory,
81 receiver,
82 proxy_init_fn: Box::new(move || Ok(maybe_proxy.lock().unwrap().take().unwrap())),
83 },
84 sender: DebugDataSender { sender },
85 stream: stream,
86 }
87 }
88
89 pub async fn collect_and_serve(
92 self,
93 run_event_sender: mpsc::Sender<RunEvent>,
94 ) -> Result<(), Error> {
95 let Self { directory, receiver, proxy_init_fn } = self;
96
97 let peekable_reciever = receiver.ready_chunks(Self::MAX_SENT_VMOS).peekable();
99 pin_mut!(peekable_reciever);
100 if peekable_reciever.as_mut().peek().await.is_none() {
101 return Ok(());
102 }
103
104 enum MaybeOwnedDirectory {
105 Owned(tempfile::TempDir),
106 Unowned(&'static str),
107 }
108 let debug_directory = match directory {
109 DebugDataDirectory::Isolated { parent } => {
110 MaybeOwnedDirectory::Owned(tempfile::TempDir::new_in(parent)?)
111 }
112 DebugDataDirectory::Accumulating { dir } => MaybeOwnedDirectory::Unowned(dir),
113 };
114 let debug_directory_path = match &debug_directory {
115 MaybeOwnedDirectory::Owned(tmp) => tmp.path().to_string_lossy(),
116 MaybeOwnedDirectory::Unowned(dir) => std::borrow::Cow::Borrowed(*dir),
117 };
118
119 let (directory_proxy, server_end) = create_endpoints::<fio::DirectoryMarker>();
120 open_channel_in_namespace(
121 &debug_directory_path,
122 PERM_READABLE | PERM_WRITABLE,
123 server_end,
124 )?;
125
126 let proxy = proxy_init_fn()?;
127 proxy.set_directory(directory_proxy)?;
128 while let Some(chunk) = peekable_reciever.next().await {
129 proxy.add_debug_vmos(chunk).await?;
130 }
131 proxy.finish().await?;
132
133 debug_data_server::serve_directory(&debug_directory_path, run_event_sender).await?;
134
135 if let MaybeOwnedDirectory::Owned(tmp) = debug_directory {
136 tmp.close()?;
137 }
138 Ok(())
139 }
140
141 pub async fn collect_and_serve_for_suite(
144 self,
145 suite_event_sender: mpsc::Sender<Result<SuiteEvents, LaunchError>>,
146 ) -> Result<(), Error> {
147 let Self { directory, receiver, proxy_init_fn } = self;
148
149 let peekable_reciever = receiver.ready_chunks(Self::MAX_SENT_VMOS).peekable();
151 pin_mut!(peekable_reciever);
152 if peekable_reciever.as_mut().peek().await.is_none() {
153 return Ok(());
154 }
155
156 enum MaybeOwnedDirectory {
157 Owned(tempfile::TempDir),
158 Unowned(&'static str),
159 }
160 let debug_directory = match directory {
161 DebugDataDirectory::Isolated { parent } => {
162 MaybeOwnedDirectory::Owned(tempfile::TempDir::new_in(parent)?)
163 }
164 DebugDataDirectory::Accumulating { dir } => MaybeOwnedDirectory::Unowned(dir),
165 };
166 let debug_directory_path = match &debug_directory {
167 MaybeOwnedDirectory::Owned(tmp) => tmp.path().to_string_lossy(),
168 MaybeOwnedDirectory::Unowned(dir) => std::borrow::Cow::Borrowed(*dir),
169 };
170
171 let (directory_proxy, server_end) = create_endpoints::<fio::DirectoryMarker>();
172 open_channel_in_namespace(
173 &debug_directory_path,
174 PERM_READABLE | PERM_WRITABLE,
175 server_end,
176 )?;
177
178 let proxy = proxy_init_fn()?;
179 proxy.set_directory(directory_proxy)?;
180 while let Some(chunk) = peekable_reciever.next().await {
181 proxy.add_debug_vmos(chunk).await?;
182 }
183 proxy.finish().await?;
184
185 debug_data_server::serve_directory_for_suite(&debug_directory_path, suite_event_sender)
186 .await?;
187
188 if let MaybeOwnedDirectory::Owned(tmp) = debug_directory {
189 tmp.close()?;
190 }
191 Ok(())
192 }
193}
194
195#[cfg(test)]
196pub(crate) struct DebugDataForTestResult {
197 pub processor: DebugDataProcessor,
198 pub sender: DebugDataSender,
199 pub stream: ftest_debug::DebugDataProcessorRequestStream,
200}
201
202pub(crate) async fn serve_debug_data_publisher(
209 handles: LocalComponentHandles,
210 test_url: String,
211 debug_data_sender: DebugDataSender,
212 started_event: async_utils::event::Event,
213) -> Result<(), Error> {
214 let mut fs = ServiceFs::new();
215 let stop_recv = handles.register_stop_notifier().await;
218 started_event.signal();
219
220 fs.dir("svc").add_fidl_service(|stream: fdebug::PublisherRequestStream| stream);
221 fs.serve_connection(handles.outgoing_dir)?;
222
223 let mut drain_tasks = FuturesUnordered::new();
224 drain_tasks.push(fasync::Task::spawn(async move {
225 let _ = stop_recv.await;
226 Ok(())
227 }));
228
229 let mut got_requests = false;
230
231 loop {
232 select_biased! {
233 maybe_stream = fs.next().fuse() => match maybe_stream {
234 None => {
235 if !got_requests {
236 info!("Got no debug data requests for {}", test_url);
237 }
238 return drain_tasks.try_collect::<()>().await;
239 },
240 Some(stream) => {
241 got_requests = true;
242 let sender_clone = debug_data_sender.clone();
243 let url_clone = test_url.clone();
244 drain_tasks.push(fasync::Task::spawn(async move {
245 serve_publisher(stream, &url_clone, sender_clone).await?;
246 Ok(())
247 }));
248 }
249 },
250 maybe_result = drain_tasks.next() => match maybe_result {
254 Some(result) => {
255 result?;
256 },
257 None => {
258 if !got_requests {
259 info!("Got no debug data requests for {}", test_url);
260 }
261 return Ok(());
262 }
263 },
264 };
265 }
266}
267
268async fn serve_publisher(
269 stream: fdebug::PublisherRequestStream,
270 test_url: &str,
271 debug_data_sender: DebugDataSender,
272) -> Result<(), Error> {
273 stream
274 .map(Ok)
275 .try_for_each_concurrent(None, |req| {
276 let test_url = test_url.to_string();
277 let mut sender_clone = debug_data_sender.clone();
278 async move {
279 let fdebug::PublisherRequest::Publish { data_sink, data, vmo_token, .. } = req?;
280 fasync::OnSignals::new(&vmo_token, zx::Signals::EVENTPAIR_PEER_CLOSED).await?;
284 let _ = sender_clone
285 .sender
286 .send(ftest_debug::DebugVmo { test_url, data_sink, vmo: data })
287 .await;
288 Ok(())
289 }
290 })
291 .await
292}
293
294#[cfg(test)]
295mod test {
296 use super::*;
297 use crate::run_events::{RunEventPayload, SuiteEventPayload};
298 use crate::utilities::stream_fn;
299 use fidl::endpoints::create_proxy_and_stream;
300 use fuchsia_component_test::{
301 Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
302 };
303 use fuchsia_fs::Flags;
304 use futures::TryFutureExt;
305 use maplit::hashset;
306 use std::collections::HashSet;
307 use std::task::Poll;
308 use test_manager_test_lib::collect_string_from_socket_helper;
309
310 const VMO_SIZE: u64 = 4096;
311
312 async fn run_test_processor(
319 mut stream: ftest_debug::DebugDataProcessorRequestStream,
320 mut debug_vmo_recevied_sender: mpsc::Sender<()>,
321 ) {
322 let req = stream.try_next().await.expect("get first request").unwrap();
323 let dir = match req {
324 ftest_debug::DebugDataProcessorRequest::SetDirectory { directory, .. } => {
325 directory.into_proxy()
326 }
327 other => panic!("First request should be SetDirectory but got {:?}", other),
328 };
329
330 let mut collected_vmos = vec![];
331 let mut finish_responder = None;
332 while let Some(req) = stream.try_next().await.expect("get request") {
333 match req {
334 ftest_debug::DebugDataProcessorRequest::SetDirectory { .. } => {
335 panic!("Set directory called twice")
336 }
337 ftest_debug::DebugDataProcessorRequest::AddDebugVmos {
338 mut vmos,
339 responder,
340 ..
341 } => {
342 let num_vmos = vmos.len();
343 collected_vmos.append(&mut vmos);
344 let _ = responder.send();
345 for _ in 0..num_vmos {
346 let _ = debug_vmo_recevied_sender.send(()).await;
347 }
348 }
349 ftest_debug::DebugDataProcessorRequest::Finish { responder, .. } => {
350 finish_responder = Some(responder);
351 break;
352 }
353 }
354 }
355
356 for ftest_debug::DebugVmo { data_sink, test_url, .. } in collected_vmos {
357 let file = fuchsia_fs::directory::open_file_async(
358 &dir,
359 &data_sink,
360 Flags::FLAG_MAYBE_CREATE | PERM_WRITABLE,
361 )
362 .expect("open file");
363 fuchsia_fs::file::write(&file, &test_url).await.expect("write file");
364 }
365 finish_responder.unwrap().send().unwrap();
366 }
367
368 async fn construct_test_realm(
369 sender: DebugDataSender,
370 test_url: &'static str,
371 ) -> Result<RealmInstance, Error> {
372 let builder = RealmBuilder::new().await?;
373
374 let processor = builder
375 .add_local_child(
376 "processor",
377 move |handles| {
378 Box::pin(serve_debug_data_publisher(
379 handles,
380 test_url.to_string(),
381 sender.clone(),
382 async_utils::event::Event::new(),
383 ))
384 },
385 ChildOptions::new().eager(),
386 )
387 .await?;
388 builder
389 .add_route(
390 Route::new()
391 .capability(Capability::protocol::<fdebug::PublisherMarker>())
392 .from(&processor)
393 .to(Ref::parent()),
394 )
395 .await?;
396
397 let instance = builder.build().await?;
398 Ok(instance)
399 }
400
401 fn isolated_dir() -> DebugDataDirectory {
402 DebugDataDirectory::Isolated { parent: "/tmp" }
403 }
404
405 #[fuchsia::test]
406 async fn serve_no_requests() {
407 const TEST_URL: &str = "test-url";
408 let DebugDataForTestResult { processor, sender, stream } =
409 DebugDataProcessor::new_for_test(isolated_dir());
410 let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
411 test_realm.destroy().await.expect("destroy test realm");
412
413 let (event_sender, event_recv) = mpsc::channel(1);
414 processor.collect_and_serve(event_sender).await.unwrap();
415
416 assert!(stream.collect::<Vec<_>>().await.is_empty());
417 assert!(event_recv.collect::<Vec<_>>().await.is_empty());
418 }
419
420 #[fuchsia::test]
421 async fn serve_for_suite_no_requests() {
422 const TEST_URL: &str = "test-url";
423 let DebugDataForTestResult { processor, sender, stream } =
424 DebugDataProcessor::new_for_test(isolated_dir());
425 let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
426 test_realm.destroy().await.expect("destroy test realm");
427
428 let (event_sender, event_recv) = mpsc::channel(1);
429 processor.collect_and_serve_for_suite(event_sender).await.unwrap();
430
431 assert!(stream.collect::<Vec<_>>().await.is_empty());
432 assert!(event_recv.collect::<Vec<_>>().await.is_empty());
433 }
434
435 #[fuchsia::test]
436 async fn serve_single_client() {
437 const TEST_URL: &str = "test-url";
438 let DebugDataForTestResult { processor, sender, stream } =
439 DebugDataProcessor::new_for_test(isolated_dir());
440 let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
441
442 let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
443 let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
445 let test_fut = async move {
447 let proxy: fdebug::PublisherProxy =
448 test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
449 let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
450 let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
451 proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
452 drop(vmo_token_1);
453 let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
454 let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
455 proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
456 drop(vmo_token_2);
457 drop(proxy);
458
459 vmo_request_received_recv.take(1).collect::<()>().await;
460 test_realm.destroy().await.expect("destroy test realm");
461 };
462
463 let (event_sender, event_recv) = mpsc::channel(10);
464 let processor_fut = processor
467 .collect_and_serve(event_sender)
468 .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
469 let assertion_fut = async move {
471 let mut events: Vec<_> = event_recv.collect().await;
472 assert_eq!(events.len(), 1);
473 let RunEventPayload::DebugData(iterator) = events.pop().unwrap().into_payload();
474 let iterator_proxy = iterator.into_proxy();
475 let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
476 .and_then(|debug_data| async move {
477 Ok((
478 debug_data.name.unwrap(),
479 collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
480 .await
481 .expect("Cannot read socket"),
482 ))
483 })
484 .try_collect()
485 .await
486 .expect("file collection");
487 let expected = hashset! {
488 ("data-sink-1".to_string(), TEST_URL.to_string()),
489 ("data-sink-2".to_string(), TEST_URL.to_string()),
490 };
491 assert_eq!(files, expected);
492 };
493
494 futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
495 }
496
497 #[fuchsia::test]
498 async fn serve_for_suite_single_client() {
499 const TEST_URL: &str = "test-url";
500 let DebugDataForTestResult { processor, sender, stream } =
501 DebugDataProcessor::new_for_test(isolated_dir());
502 let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
503
504 let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
505 let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
507 let test_fut = async move {
509 let proxy: fdebug::PublisherProxy =
510 test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
511 let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
512 let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
513 proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
514 drop(vmo_token_1);
515 let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
516 let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
517 proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
518 drop(vmo_token_2);
519 drop(proxy);
520
521 vmo_request_received_recv.take(1).collect::<()>().await;
522 test_realm.destroy().await.expect("destroy test realm");
523 };
524
525 let (event_sender, event_recv) = mpsc::channel(10);
526 let processor_fut = processor
529 .collect_and_serve_for_suite(event_sender)
530 .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
531 let assertion_fut = async move {
533 let mut events: Vec<_> = event_recv.collect().await;
534 assert_eq!(events.len(), 1);
535 if let SuiteEventPayload::DebugData(iterator) =
536 events.pop().unwrap().unwrap().into_payload()
537 {
538 let iterator_proxy = iterator.into_proxy();
539 let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
540 .and_then(|debug_data| async move {
541 Ok((
542 debug_data.name.unwrap(),
543 collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
544 .await
545 .expect("Cannot read socket"),
546 ))
547 })
548 .try_collect()
549 .await
550 .expect("file collection");
551 let expected = hashset! {
552 ("data-sink-1".to_string(), TEST_URL.to_string()),
553 ("data-sink-2".to_string(), TEST_URL.to_string()),
554 };
555 assert_eq!(files, expected);
556 } else {
557 assert!(false); }
559 };
560
561 futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
562 }
563
564 #[fuchsia::test]
565 async fn serve_multiple_client() {
566 const TEST_URL: &str = "test-url";
567 let DebugDataForTestResult { processor, sender, stream } =
568 DebugDataProcessor::new_for_test(isolated_dir());
569 let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
570
571 let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
572 let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
574 let test_fut = async move {
576 let proxy_1: fdebug::PublisherProxy =
577 test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
578 let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
579 let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
580 proxy_1.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
581 drop(vmo_token_1);
582 let proxy_2: fdebug::PublisherProxy =
583 test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
584 let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
585 let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
586 proxy_2.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
587 drop(vmo_token_2);
588 drop(proxy_1);
589 drop(proxy_2);
590
591 vmo_request_received_recv.take(2).collect::<()>().await;
592 test_realm.destroy().await.expect("destroy test realm");
593 };
594
595 let (event_sender, event_recv) = mpsc::channel(10);
596 let processor_fut = processor
599 .collect_and_serve(event_sender)
600 .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
601 let assertion_fut = async move {
603 let mut events: Vec<_> = event_recv.collect().await;
604 assert_eq!(events.len(), 1);
605 let RunEventPayload::DebugData(iterator) = events.pop().unwrap().into_payload();
606 let iterator_proxy = iterator.into_proxy();
607 let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
608 .and_then(|debug_data| async move {
609 Ok((
610 debug_data.name.unwrap(),
611 collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
612 .await
613 .expect("read socket"),
614 ))
615 })
616 .try_collect()
617 .await
618 .expect("file collection");
619 let expected = hashset! {
620 ("data-sink-1".to_string(), TEST_URL.to_string()),
621 ("data-sink-2".to_string(), TEST_URL.to_string()),
622 };
623 assert_eq!(files, expected);
624 };
625
626 futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
627 }
628
629 #[fuchsia::test]
630 async fn serve_for_suite_multiple_client() {
631 const TEST_URL: &str = "test-url";
632 let DebugDataForTestResult { processor, sender, stream } =
633 DebugDataProcessor::new_for_test(isolated_dir());
634 let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
635
636 let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
637 let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
639 let test_fut = async move {
641 let proxy_1: fdebug::PublisherProxy =
642 test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
643 let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
644 let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
645 proxy_1.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
646 drop(vmo_token_1);
647 let proxy_2: fdebug::PublisherProxy =
648 test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
649 let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
650 let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
651 proxy_2.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
652 drop(vmo_token_2);
653 drop(proxy_1);
654 drop(proxy_2);
655
656 vmo_request_received_recv.take(2).collect::<()>().await;
657 test_realm.destroy().await.expect("destroy test realm");
658 };
659
660 let (event_sender, event_recv) = mpsc::channel(10);
661 let processor_fut = processor
664 .collect_and_serve_for_suite(event_sender)
665 .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
666 let assertion_fut = async move {
668 let mut events: Vec<_> = event_recv.collect().await;
669 assert_eq!(events.len(), 1);
670 if let SuiteEventPayload::DebugData(iterator) =
671 events.pop().unwrap().unwrap().into_payload()
672 {
673 let iterator_proxy = iterator.into_proxy();
674 let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
675 .and_then(|debug_data| async move {
676 Ok((
677 debug_data.name.unwrap(),
678 collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
679 .await
680 .expect("read socket"),
681 ))
682 })
683 .try_collect()
684 .await
685 .expect("file collection");
686 let expected = hashset! {
687 ("data-sink-1".to_string(), TEST_URL.to_string()),
688 ("data-sink-2".to_string(), TEST_URL.to_string()),
689 };
690 assert_eq!(files, expected);
691 } else {
692 assert!(false); }
694 };
695
696 futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
697 }
698
699 #[fuchsia::test]
700 fn single_publisher_connection_send_vmo_when_ready() {
701 const TEST_URL: &str = "test-url";
702 let mut executor = fasync::TestExecutor::new();
703
704 let (vmo_send, vmo_recv) = mpsc::channel(5);
705 let mut vmo_chunk_stream = vmo_recv.ready_chunks(5).boxed();
706 let (publisher_proxy, publisher_stream) =
707 create_proxy_and_stream::<fdebug::PublisherMarker>();
708 let mut serve_fut =
709 serve_publisher(publisher_stream, TEST_URL, DebugDataSender { sender: vmo_send })
710 .boxed();
711
712 let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
713 let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
714 let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
715 let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
716
717 publisher_proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
718 publisher_proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
719 drop(vmo_token_1);
720
721 assert!(executor.run_until_stalled(&mut serve_fut).is_pending());
725 let mut ready_vmos = match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed())
726 {
727 Poll::Pending => panic!("vmos should be ready"),
728 Poll::Ready(Some(vmos)) => vmos,
729 Poll::Ready(None) => panic!("stream closed prematurely"),
730 };
731 assert_eq!(ready_vmos.len(), 1);
732 let ready_vmo = ready_vmos.pop().unwrap();
733 assert_eq!(ready_vmo.test_url.as_str(), TEST_URL);
734 assert_eq!(ready_vmo.data_sink.as_str(), "data-sink-1");
735
736 drop(vmo_token_2);
738 drop(publisher_proxy);
739 match executor.run_until_stalled(&mut serve_fut) {
740 futures::task::Poll::Ready(Ok(())) => (),
741 other => panic!("Expected poll to be ready but was {:?}", other),
742 }
743
744 let mut ready_vmos = match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed())
745 {
746 Poll::Pending => panic!("vmos should be ready"),
747 Poll::Ready(Some(vmos)) => vmos,
748 Poll::Ready(None) => panic!("stream closed prematurely"),
749 };
750 assert_eq!(ready_vmos.len(), 1);
751 let ready_vmo = ready_vmos.pop().unwrap();
752 assert_eq!(ready_vmo.test_url.as_str(), TEST_URL);
753 assert_eq!(ready_vmo.data_sink.as_str(), "data-sink-2");
754
755 match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed()) {
756 Poll::Pending => panic!("vmos should be ready"),
757 Poll::Ready(None) => (),
758 Poll::Ready(Some(vmos)) => panic!("Expected stream to terminate but got {:?}", vmos),
759 };
760 }
761}