1use crate::debug_data_server;
6use crate::run_events::{RunEvent, SuiteEvents};
7use anyhow::Error;
8use fidl::endpoints::create_endpoints;
9use fidl_fuchsia_test_manager::LaunchError;
10use fuchsia_component::client::connect_to_protocol;
11use fuchsia_component::server::ServiceFs;
12use fuchsia_component_test::LocalComponentHandles;
13use fuchsia_fs::directory::open_channel_in_namespace;
14use fuchsia_fs::{PERM_READABLE, PERM_WRITABLE};
15use futures::channel::mpsc;
16use futures::future::FutureExt;
17use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
18use futures::{pin_mut, select_biased, SinkExt};
19use log::info;
20use {
21 fidl_fuchsia_debugdata as fdebug, fidl_fuchsia_io as fio,
22 fidl_fuchsia_test_debug as ftest_debug, fuchsia_async as fasync,
23};
24
25pub(crate) struct DebugDataProcessor {
28 directory: DebugDataDirectory,
29 receiver: mpsc::Receiver<ftest_debug::DebugVmo>,
30 proxy_init_fn: Box<dyn FnOnce() -> Result<ftest_debug::DebugDataProcessorProxy, Error>>,
31}
32
33#[derive(Clone)]
35pub(crate) struct DebugDataSender {
36 sender: mpsc::Sender<ftest_debug::DebugVmo>,
37}
38
/// Selects the directory a [`DebugDataProcessor`] writes collected debug data into.
#[derive(Debug)]
pub enum DebugDataDirectory {
    /// A fresh temporary directory is created under `parent` for this processor and is closed
    /// (removed) once the processor has finished serving its contents.
    Isolated { parent: &'static str },
    /// `dir` is used as-is and is left in place after serving, so its contents can build up
    /// across processors.
    Accumulating { dir: &'static str },
}
49
50impl DebugDataProcessor {
51 const MAX_SENT_VMOS: usize = 10;
52 pub fn new(directory: DebugDataDirectory) -> (Self, DebugDataSender) {
55 let (sender, receiver) = futures::channel::mpsc::channel(Self::MAX_SENT_VMOS);
56 (
57 Self {
58 directory,
59 receiver,
60 proxy_init_fn: Box::new(|| {
61 connect_to_protocol::<ftest_debug::DebugDataProcessorMarker>()
62 .map_err(Error::from)
63 }),
64 },
65 DebugDataSender { sender },
66 )
67 }
68
69 #[cfg(test)]
73 pub(crate) fn new_for_test(directory: DebugDataDirectory) -> DebugDataForTestResult {
74 let (sender, receiver) = futures::channel::mpsc::channel(Self::MAX_SENT_VMOS);
75 let (proxy, stream) =
76 fidl::endpoints::create_proxy_and_stream::<ftest_debug::DebugDataProcessorMarker>();
77 let maybe_proxy = std::sync::Mutex::new(Some(proxy));
78 DebugDataForTestResult {
79 processor: Self {
80 directory,
81 receiver,
82 proxy_init_fn: Box::new(move || Ok(maybe_proxy.lock().unwrap().take().unwrap())),
83 },
84 sender: DebugDataSender { sender },
85 stream: stream,
86 }
87 }
88
89 pub async fn collect_and_serve(
92 self,
93 run_event_sender: mpsc::Sender<RunEvent>,
94 ) -> Result<(), Error> {
95 let Self { directory, receiver, proxy_init_fn } = self;
96
97 let peekable_reciever = receiver.ready_chunks(Self::MAX_SENT_VMOS).peekable();
99 pin_mut!(peekable_reciever);
100 if peekable_reciever.as_mut().peek().await.is_none() {
101 return Ok(());
102 }
103
104 enum MaybeOwnedDirectory {
105 Owned(tempfile::TempDir),
106 Unowned(&'static str),
107 }
108 let debug_directory = match directory {
109 DebugDataDirectory::Isolated { parent } => {
110 MaybeOwnedDirectory::Owned(tempfile::TempDir::new_in(parent)?)
111 }
112 DebugDataDirectory::Accumulating { dir } => MaybeOwnedDirectory::Unowned(dir),
113 };
114 let debug_directory_path = match &debug_directory {
115 MaybeOwnedDirectory::Owned(tmp) => tmp.path().to_string_lossy(),
116 MaybeOwnedDirectory::Unowned(dir) => std::borrow::Cow::Borrowed(*dir),
117 };
118
119 let (directory_proxy, server_end) = create_endpoints::<fio::DirectoryMarker>();
120 open_channel_in_namespace(
121 &debug_directory_path,
122 PERM_READABLE | PERM_WRITABLE,
123 server_end,
124 )?;
125
126 let proxy = proxy_init_fn()?;
127 proxy.set_directory(directory_proxy)?;
128 while let Some(chunk) = peekable_reciever.next().await {
129 proxy.add_debug_vmos(chunk).await?;
130 }
131 proxy.finish().await?;
132
133 debug_data_server::serve_directory(&debug_directory_path, run_event_sender).await?;
134
135 if let MaybeOwnedDirectory::Owned(tmp) = debug_directory {
136 tmp.close()?;
137 }
138 Ok(())
139 }
140
141 pub async fn collect_and_serve_for_suite(
144 self,
145 suite_event_sender: mpsc::Sender<Result<SuiteEvents, LaunchError>>,
146 ) -> Result<(), Error> {
147 let Self { directory, receiver, proxy_init_fn } = self;
148
149 let peekable_reciever = receiver.ready_chunks(Self::MAX_SENT_VMOS).peekable();
151 pin_mut!(peekable_reciever);
152 if peekable_reciever.as_mut().peek().await.is_none() {
153 return Ok(());
154 }
155
156 enum MaybeOwnedDirectory {
157 Owned(tempfile::TempDir),
158 Unowned(&'static str),
159 }
160 let debug_directory = match directory {
161 DebugDataDirectory::Isolated { parent } => {
162 MaybeOwnedDirectory::Owned(tempfile::TempDir::new_in(parent)?)
163 }
164 DebugDataDirectory::Accumulating { dir } => MaybeOwnedDirectory::Unowned(dir),
165 };
166 let debug_directory_path = match &debug_directory {
167 MaybeOwnedDirectory::Owned(tmp) => tmp.path().to_string_lossy(),
168 MaybeOwnedDirectory::Unowned(dir) => std::borrow::Cow::Borrowed(*dir),
169 };
170
171 let (directory_proxy, server_end) = create_endpoints::<fio::DirectoryMarker>();
172 open_channel_in_namespace(
173 &debug_directory_path,
174 PERM_READABLE | PERM_WRITABLE,
175 server_end,
176 )?;
177
178 let proxy = proxy_init_fn()?;
179 proxy.set_directory(directory_proxy)?;
180 while let Some(chunk) = peekable_reciever.next().await {
181 proxy.add_debug_vmos(chunk).await?;
182 }
183 proxy.finish().await?;
184
185 debug_data_server::serve_directory_for_suite(&debug_directory_path, suite_event_sender)
186 .await?;
187
188 if let MaybeOwnedDirectory::Owned(tmp) = debug_directory {
189 tmp.close()?;
190 }
191 Ok(())
192 }
193}
194
/// Everything produced by `DebugDataProcessor::new_for_test`.
#[cfg(test)]
pub(crate) struct DebugDataForTestResult {
    /// Processor under test.
    pub processor: DebugDataProcessor,
    /// Sender feeding VMOs to `processor`.
    pub sender: DebugDataSender,
    /// Server side of the processor proxy, on which the test observes `processor`'s requests.
    pub stream: ftest_debug::DebugDataProcessorRequestStream,
}
201
202pub(crate) async fn serve_debug_data_publisher(
209 handles: LocalComponentHandles,
210 test_url: String,
211 debug_data_sender: DebugDataSender,
212 started_event: async_utils::event::Event,
213) -> Result<(), Error> {
214 let mut fs = ServiceFs::new();
215 let stop_recv = handles.register_stop_notifier().await;
218 started_event.signal();
219
220 fs.dir("svc").add_fidl_service(|stream: fdebug::PublisherRequestStream| stream);
221 fs.serve_connection(handles.outgoing_dir)?;
222
223 let mut drain_tasks = FuturesUnordered::new();
224 drain_tasks.push(fasync::Task::spawn(async move {
225 let _ = stop_recv.await;
226 Ok(())
227 }));
228
229 let mut got_requests = false;
230
231 loop {
232 select_biased! {
233 maybe_stream = fs.next().fuse() => match maybe_stream {
234 None => {
235 if !got_requests {
236 info!("Got no debug data requests for {}", test_url);
237 }
238 return drain_tasks.try_collect::<()>().await;
239 },
240 Some(stream) => {
241 got_requests = true;
242 let sender_clone = debug_data_sender.clone();
243 let url_clone = test_url.clone();
244 drain_tasks.push(fasync::Task::spawn(async move {
245 serve_publisher(stream, &url_clone, sender_clone).await?;
246 Ok(())
247 }));
248 }
249 },
250 maybe_result = drain_tasks.next() => match maybe_result {
254 Some(result) => {
255 result?;
256 },
257 None => {
258 if !got_requests {
259 info!("Got no debug data requests for {}", test_url);
260 }
261 return Ok(());
262 }
263 },
264 };
265 }
266}
267
268async fn serve_publisher(
269 stream: fdebug::PublisherRequestStream,
270 test_url: &str,
271 debug_data_sender: DebugDataSender,
272) -> Result<(), Error> {
273 stream
274 .map(Ok)
275 .try_for_each_concurrent(None, |req| {
276 let test_url = test_url.to_string();
277 let mut sender_clone = debug_data_sender.clone();
278 async move {
279 let fdebug::PublisherRequest::Publish { data_sink, data, vmo_token, .. } = req?;
280 fasync::OnSignals::new(&vmo_token, zx::Signals::EVENTPAIR_PEER_CLOSED).await?;
284 let _ = sender_clone
285 .sender
286 .send(ftest_debug::DebugVmo { test_url, data_sink, vmo: data })
287 .await;
288 Ok(())
289 }
290 })
291 .await
292}
293
#[cfg(test)]
mod test {
    use super::*;
    use crate::run_events::{RunEventPayload, SuiteEventPayload};
    use crate::utilities::stream_fn;
    use fidl::endpoints::create_proxy_and_stream;
    use fuchsia_component_test::{
        Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
    };
    use fuchsia_fs::Flags;
    use futures::TryFutureExt;
    use maplit::hashset;
    use std::collections::HashSet;
    use std::task::Poll;
    use test_manager_test_lib::collect_string_from_socket_helper;

    /// Size of every VMO published by the tests.
    const VMO_SIZE: u64 = 4096;
    /// Test URL attached to every VMO in these tests.
    const TEST_URL: &str = "test-url";

    /// Fake `DebugDataProcessor` server.
    ///
    /// Expects `SetDirectory` as the first request, then gathers VMOs until `Finish`. One unit
    /// is sent on `debug_vmo_received_sender` per VMO received. Before replying to `Finish`, a
    /// file named after each VMO's data sink, containing its test URL, is written into the
    /// directory.
    async fn run_test_processor(
        mut stream: ftest_debug::DebugDataProcessorRequestStream,
        mut debug_vmo_received_sender: mpsc::Sender<()>,
    ) {
        let req = stream.try_next().await.expect("get first request").unwrap();
        let dir = match req {
            ftest_debug::DebugDataProcessorRequest::SetDirectory { directory, .. } => {
                directory.into_proxy()
            }
            other => panic!("First request should be SetDirectory but got {:?}", other),
        };

        let mut collected_vmos = vec![];
        let mut finish_responder = None;
        while let Some(req) = stream.try_next().await.expect("get request") {
            match req {
                ftest_debug::DebugDataProcessorRequest::SetDirectory { .. } => {
                    panic!("Set directory called twice")
                }
                ftest_debug::DebugDataProcessorRequest::AddDebugVmos {
                    mut vmos,
                    responder,
                    ..
                } => {
                    let num_vmos = vmos.len();
                    collected_vmos.append(&mut vmos);
                    let _ = responder.send();
                    for _ in 0..num_vmos {
                        let _ = debug_vmo_received_sender.send(()).await;
                    }
                }
                ftest_debug::DebugDataProcessorRequest::Finish { responder, .. } => {
                    finish_responder = Some(responder);
                    break;
                }
            }
        }

        for ftest_debug::DebugVmo { data_sink, test_url, .. } in collected_vmos {
            let file = fuchsia_fs::directory::open_file_async(
                &dir,
                &data_sink,
                Flags::FLAG_MAYBE_CREATE | PERM_WRITABLE,
            )
            .expect("open file");
            fuchsia_fs::file::write(&file, &test_url).await.expect("write file");
        }
        finish_responder.unwrap().send().unwrap();
    }

    /// Builds a realm whose only child is a local component running
    /// `serve_debug_data_publisher`, with the publisher protocol routed to the parent.
    async fn construct_test_realm(
        sender: DebugDataSender,
        test_url: &'static str,
    ) -> Result<RealmInstance, Error> {
        let builder = RealmBuilder::new().await?;

        let processor = builder
            .add_local_child(
                "processor",
                move |handles| {
                    Box::pin(serve_debug_data_publisher(
                        handles,
                        test_url.to_string(),
                        sender.clone(),
                        async_utils::event::Event::new(),
                    ))
                },
                ChildOptions::new().eager(),
            )
            .await?;
        builder
            .add_route(
                Route::new()
                    .capability(Capability::protocol::<fdebug::PublisherMarker>())
                    .from(&processor)
                    .to(Ref::parent()),
            )
            .await?;

        let instance = builder.build().await?;
        Ok(instance)
    }

    /// Directory configuration shared by the tests: an isolated directory under `/tmp`.
    fn isolated_dir() -> DebugDataDirectory {
        DebugDataDirectory::Isolated { parent: "/tmp" }
    }

    /// Publishes a fresh VMO under `data_sink` and immediately drops the client end of its
    /// token, which marks the VMO as ready for processing.
    fn publish_vmo(publisher: &fdebug::PublisherProxy, data_sink: &str) {
        let vmo = zx::Vmo::create(VMO_SIZE).unwrap();
        let (vmo_token, vmo_token_server) = zx::EventPair::create();
        publisher.publish(data_sink, vmo, vmo_token_server).expect("publish vmo");
        drop(vmo_token);
    }

    /// The `(file name, contents)` pairs expected after publishing `data-sink-1` and
    /// `data-sink-2` through the fake processor.
    fn expected_files() -> HashSet<(String, String)> {
        hashset! {
            ("data-sink-1".to_string(), TEST_URL.to_string()),
            ("data-sink-2".to_string(), TEST_URL.to_string()),
        }
    }

    #[fuchsia::test]
    async fn serve_no_requests() {
        let DebugDataForTestResult { processor, sender, stream } =
            DebugDataProcessor::new_for_test(isolated_dir());
        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
        test_realm.destroy().await.expect("destroy test realm");

        let (event_sender, event_recv) = mpsc::channel(1);
        processor.collect_and_serve(event_sender).await.unwrap();

        assert!(stream.collect::<Vec<_>>().await.is_empty());
        assert!(event_recv.collect::<Vec<_>>().await.is_empty());
    }

    #[fuchsia::test]
    async fn serve_for_suite_no_requests() {
        let DebugDataForTestResult { processor, sender, stream } =
            DebugDataProcessor::new_for_test(isolated_dir());
        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
        test_realm.destroy().await.expect("destroy test realm");

        let (event_sender, event_recv) = mpsc::channel(1);
        processor.collect_and_serve_for_suite(event_sender).await.unwrap();

        assert!(stream.collect::<Vec<_>>().await.is_empty());
        assert!(event_recv.collect::<Vec<_>>().await.is_empty());
    }

    #[fuchsia::test]
    async fn serve_single_client() {
        let DebugDataForTestResult { processor, sender, stream } =
            DebugDataProcessor::new_for_test(isolated_dir());
        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");

        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
        let test_fut = async move {
            let proxy = test_realm
                .root
                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
                .expect("connect to publisher");
            publish_vmo(&proxy, "data-sink-1");
            publish_vmo(&proxy, "data-sink-2");
            drop(proxy);

            vmo_request_received_recv.take(1).collect::<()>().await;
            test_realm.destroy().await.expect("destroy test realm");
        };

        let (event_sender, event_recv) = mpsc::channel(10);
        let processor_fut = processor
            .collect_and_serve(event_sender)
            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
        let assertion_fut = async move {
            let mut events: Vec<_> = event_recv.collect().await;
            assert_eq!(events.len(), 1);
            let RunEventPayload::DebugData(iterator) = events.pop().unwrap().into_payload();
            let iterator_proxy = iterator.into_proxy();
            let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
                .and_then(|debug_data| async move {
                    Ok((
                        debug_data.name.unwrap(),
                        collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
                            .await
                            .expect("Cannot read socket"),
                    ))
                })
                .try_collect()
                .await
                .expect("file collection");
            assert_eq!(files, expected_files());
        };

        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
    }

    #[fuchsia::test]
    async fn serve_for_suite_single_client() {
        let DebugDataForTestResult { processor, sender, stream } =
            DebugDataProcessor::new_for_test(isolated_dir());
        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");

        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
        let test_fut = async move {
            let proxy = test_realm
                .root
                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
                .expect("connect to publisher");
            publish_vmo(&proxy, "data-sink-1");
            publish_vmo(&proxy, "data-sink-2");
            drop(proxy);

            vmo_request_received_recv.take(1).collect::<()>().await;
            test_realm.destroy().await.expect("destroy test realm");
        };

        let (event_sender, event_recv) = mpsc::channel(10);
        let processor_fut = processor
            .collect_and_serve_for_suite(event_sender)
            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
        let assertion_fut = async move {
            let mut events: Vec<_> = event_recv.collect().await;
            assert_eq!(events.len(), 1);
            let iterator = match events.pop().unwrap().unwrap().into_payload() {
                SuiteEventPayload::DebugData(iterator) => iterator,
                _ => panic!("expected the suite event to carry a DebugData payload"),
            };
            let iterator_proxy = iterator.into_proxy();
            let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
                .and_then(|debug_data| async move {
                    Ok((
                        debug_data.name.unwrap(),
                        collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
                            .await
                            .expect("Cannot read socket"),
                    ))
                })
                .try_collect()
                .await
                .expect("file collection");
            assert_eq!(files, expected_files());
        };

        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
    }

    #[fuchsia::test]
    async fn serve_multiple_client() {
        let DebugDataForTestResult { processor, sender, stream } =
            DebugDataProcessor::new_for_test(isolated_dir());
        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");

        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
        let test_fut = async move {
            let proxy_1 = test_realm
                .root
                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
                .expect("connect to publisher");
            publish_vmo(&proxy_1, "data-sink-1");
            let proxy_2 = test_realm
                .root
                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
                .expect("connect to publisher");
            publish_vmo(&proxy_2, "data-sink-2");
            drop(proxy_1);
            drop(proxy_2);

            vmo_request_received_recv.take(2).collect::<()>().await;
            test_realm.destroy().await.expect("destroy test realm");
        };

        let (event_sender, event_recv) = mpsc::channel(10);
        let processor_fut = processor
            .collect_and_serve(event_sender)
            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
        let assertion_fut = async move {
            let mut events: Vec<_> = event_recv.collect().await;
            assert_eq!(events.len(), 1);
            let RunEventPayload::DebugData(iterator) = events.pop().unwrap().into_payload();
            let iterator_proxy = iterator.into_proxy();
            let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
                .and_then(|debug_data| async move {
                    Ok((
                        debug_data.name.unwrap(),
                        collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
                            .await
                            .expect("read socket"),
                    ))
                })
                .try_collect()
                .await
                .expect("file collection");
            assert_eq!(files, expected_files());
        };

        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
    }

    #[fuchsia::test]
    async fn serve_for_suite_multiple_client() {
        let DebugDataForTestResult { processor, sender, stream } =
            DebugDataProcessor::new_for_test(isolated_dir());
        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");

        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
        let test_fut = async move {
            let proxy_1 = test_realm
                .root
                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
                .expect("connect to publisher");
            publish_vmo(&proxy_1, "data-sink-1");
            let proxy_2 = test_realm
                .root
                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
                .expect("connect to publisher");
            publish_vmo(&proxy_2, "data-sink-2");
            drop(proxy_1);
            drop(proxy_2);

            vmo_request_received_recv.take(2).collect::<()>().await;
            test_realm.destroy().await.expect("destroy test realm");
        };

        let (event_sender, event_recv) = mpsc::channel(10);
        let processor_fut = processor
            .collect_and_serve_for_suite(event_sender)
            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
        let assertion_fut = async move {
            let mut events: Vec<_> = event_recv.collect().await;
            assert_eq!(events.len(), 1);
            let iterator = match events.pop().unwrap().unwrap().into_payload() {
                SuiteEventPayload::DebugData(iterator) => iterator,
                _ => panic!("expected the suite event to carry a DebugData payload"),
            };
            let iterator_proxy = iterator.into_proxy();
            let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
                .and_then(|debug_data| async move {
                    Ok((
                        debug_data.name.unwrap(),
                        collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
                            .await
                            .expect("read socket"),
                    ))
                })
                .try_collect()
                .await
                .expect("file collection");
            assert_eq!(files, expected_files());
        };

        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
    }

    /// Checks that a VMO is forwarded only once its token has been closed, and that closing
    /// the connection lets `serve_publisher` finish and the VMO channel terminate.
    #[fuchsia::test]
    fn single_publisher_connection_send_vmo_when_ready() {
        let mut executor = fasync::TestExecutor::new();

        let (vmo_send, vmo_recv) = mpsc::channel(5);
        let mut vmo_chunk_stream = vmo_recv.ready_chunks(5).boxed();
        let (publisher_proxy, publisher_stream) =
            create_proxy_and_stream::<fdebug::PublisherMarker>();
        let mut serve_fut =
            serve_publisher(publisher_stream, TEST_URL, DebugDataSender { sender: vmo_send })
                .boxed();

        let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
        let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
        let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
        let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();

        publisher_proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
        publisher_proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
        drop(vmo_token_1);

        // Only the first token is closed, so the server is still waiting on the second VMO.
        assert!(executor.run_until_stalled(&mut serve_fut).is_pending());
        let mut ready_vmos = match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed())
        {
            Poll::Pending => panic!("vmos should be ready"),
            Poll::Ready(Some(vmos)) => vmos,
            Poll::Ready(None) => panic!("stream closed prematurely"),
        };
        assert_eq!(ready_vmos.len(), 1);
        let ready_vmo = ready_vmos.pop().unwrap();
        assert_eq!(ready_vmo.test_url.as_str(), TEST_URL);
        assert_eq!(ready_vmo.data_sink.as_str(), "data-sink-1");

        // Closing the second token and the connection lets the server run to completion.
        drop(vmo_token_2);
        drop(publisher_proxy);
        match executor.run_until_stalled(&mut serve_fut) {
            Poll::Ready(Ok(())) => (),
            other => panic!("Expected poll to be ready but was {:?}", other),
        }

        let mut ready_vmos = match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed())
        {
            Poll::Pending => panic!("vmos should be ready"),
            Poll::Ready(Some(vmos)) => vmos,
            Poll::Ready(None) => panic!("stream closed prematurely"),
        };
        assert_eq!(ready_vmos.len(), 1);
        let ready_vmo = ready_vmos.pop().unwrap();
        assert_eq!(ready_vmo.test_url.as_str(), TEST_URL);
        assert_eq!(ready_vmo.data_sink.as_str(), "data-sink-2");

        match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed()) {
            Poll::Pending => panic!("vmos should be ready"),
            Poll::Ready(None) => (),
            Poll::Ready(Some(vmos)) => panic!("Expected stream to terminate but got {:?}", vmos),
        };
    }
}