test_manager_lib/
debug_data_processor.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::debug_data_server;
6use crate::run_events::{RunEvent, SuiteEvents};
7use anyhow::Error;
8use fidl::endpoints::create_endpoints;
9use fidl_fuchsia_test_manager::LaunchError;
10use fuchsia_component::client::connect_to_protocol;
11use fuchsia_component::server::ServiceFs;
12use fuchsia_component_test::LocalComponentHandles;
13use fuchsia_fs::directory::open_channel_in_namespace;
14use fuchsia_fs::{PERM_READABLE, PERM_WRITABLE};
15use futures::channel::mpsc;
16use futures::future::FutureExt;
17use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
18use futures::{pin_mut, select_biased, SinkExt};
19use log::info;
20use {
21    fidl_fuchsia_debugdata as fdebug, fidl_fuchsia_io as fio,
22    fidl_fuchsia_test_debug as ftest_debug, fuchsia_async as fasync,
23};
24
/// Processor that collects debug data and serves the iterator sending data back to a test
/// executor.
///
/// A processor is always created together with the |DebugDataSender| that feeds it; VMOs
/// pushed through that sender are drained by |collect_and_serve| or
/// |collect_and_serve_for_suite|.
pub(crate) struct DebugDataProcessor {
    /// Where the collected debug data is stored while it is processed and served.
    directory: DebugDataDirectory,
    /// Receiving half of the channel fed by the paired |DebugDataSender|.
    receiver: mpsc::Receiver<ftest_debug::DebugVmo>,
    /// Deferred constructor for the |fuchsia.test.debug.DebugDataProcessor| proxy. It is only
    /// invoked after at least one VMO has been received.
    proxy_init_fn: Box<dyn FnOnce() -> Result<ftest_debug::DebugDataProcessorProxy, Error>>,
}
32
/// Sender used to pass VMOs back to |DebugDataProcessor|.
///
/// The sender is cloneable so that each publisher connection can own a handle to the channel.
#[derive(Clone)]
pub(crate) struct DebugDataSender {
    /// Sending half of the bounded channel whose receiver lives in the |DebugDataProcessor|.
    sender: mpsc::Sender<ftest_debug::DebugVmo>,
}
38
/// Directory used to store collected debug data.
#[derive(Debug)]
pub enum DebugDataDirectory {
    /// An isolated directory is owned purely by the |DebugDataProcessor| it is given to, and will
    /// be torn down when the |DebugDataProcessor| is terminated. The directory is created as a
    /// temporary directory underneath |parent|.
    Isolated { parent: &'static str },
    /// An accumulating directory may be shared between multiple |DebugDataProcessor|s. |dir| is
    /// used as-is and its contents will not be torn down.
    Accumulating { dir: &'static str },
}
49
50impl DebugDataProcessor {
51    const MAX_SENT_VMOS: usize = 10;
52    /// Create a new |DebugDataProcessor| for processing VMOs, and |DebugDataSender| for passing
53    /// it VMOs.
54    pub fn new(directory: DebugDataDirectory) -> (Self, DebugDataSender) {
55        let (sender, receiver) = futures::channel::mpsc::channel(Self::MAX_SENT_VMOS);
56        (
57            Self {
58                directory,
59                receiver,
60                proxy_init_fn: Box::new(|| {
61                    connect_to_protocol::<ftest_debug::DebugDataProcessorMarker>()
62                        .map_err(Error::from)
63                }),
64            },
65            DebugDataSender { sender },
66        )
67    }
68
69    /// Create a new |DebugDataProcessor| for processing VMOs, |DebugDataSender| for passing
70    /// it VMOs, and the |fuchsia.test.debug.DebugDataProcessor| stream to which the processor
71    /// will connect.
72    #[cfg(test)]
73    pub(crate) fn new_for_test(directory: DebugDataDirectory) -> DebugDataForTestResult {
74        let (sender, receiver) = futures::channel::mpsc::channel(Self::MAX_SENT_VMOS);
75        let (proxy, stream) =
76            fidl::endpoints::create_proxy_and_stream::<ftest_debug::DebugDataProcessorMarker>();
77        let maybe_proxy = std::sync::Mutex::new(Some(proxy));
78        DebugDataForTestResult {
79            processor: Self {
80                directory,
81                receiver,
82                proxy_init_fn: Box::new(move || Ok(maybe_proxy.lock().unwrap().take().unwrap())),
83            },
84            sender: DebugDataSender { sender },
85            stream: stream,
86        }
87    }
88
89    /// Collect debug data produced by the corresponding |DebugDataSender|, and serve the resulting
90    /// data. In case debug data is produced, sends the event over |run_event_sender|.
91    pub async fn collect_and_serve(
92        self,
93        run_event_sender: mpsc::Sender<RunEvent>,
94    ) -> Result<(), Error> {
95        let Self { directory, receiver, proxy_init_fn } = self;
96
97        // Avoid setting up resources in the common case where no debug data is produced.
98        let peekable_reciever = receiver.ready_chunks(Self::MAX_SENT_VMOS).peekable();
99        pin_mut!(peekable_reciever);
100        if peekable_reciever.as_mut().peek().await.is_none() {
101            return Ok(());
102        }
103
104        enum MaybeOwnedDirectory {
105            Owned(tempfile::TempDir),
106            Unowned(&'static str),
107        }
108        let debug_directory = match directory {
109            DebugDataDirectory::Isolated { parent } => {
110                MaybeOwnedDirectory::Owned(tempfile::TempDir::new_in(parent)?)
111            }
112            DebugDataDirectory::Accumulating { dir } => MaybeOwnedDirectory::Unowned(dir),
113        };
114        let debug_directory_path = match &debug_directory {
115            MaybeOwnedDirectory::Owned(tmp) => tmp.path().to_string_lossy(),
116            MaybeOwnedDirectory::Unowned(dir) => std::borrow::Cow::Borrowed(*dir),
117        };
118
119        let (directory_proxy, server_end) = create_endpoints::<fio::DirectoryMarker>();
120        open_channel_in_namespace(
121            &debug_directory_path,
122            PERM_READABLE | PERM_WRITABLE,
123            server_end,
124        )?;
125
126        let proxy = proxy_init_fn()?;
127        proxy.set_directory(directory_proxy)?;
128        while let Some(chunk) = peekable_reciever.next().await {
129            proxy.add_debug_vmos(chunk).await?;
130        }
131        proxy.finish().await?;
132
133        debug_data_server::serve_directory(&debug_directory_path, run_event_sender).await?;
134
135        if let MaybeOwnedDirectory::Owned(tmp) = debug_directory {
136            tmp.close()?;
137        }
138        Ok(())
139    }
140
141    /// Collect debug data produced by the corresponding |DebugDataSender|, and serve the resulting
142    /// data. In case debug data is produced, sends the event over |suite_event_sender|.
143    pub async fn collect_and_serve_for_suite(
144        self,
145        suite_event_sender: mpsc::Sender<Result<SuiteEvents, LaunchError>>,
146    ) -> Result<(), Error> {
147        let Self { directory, receiver, proxy_init_fn } = self;
148
149        // Avoid setting up resources in the common case where no debug data is produced.
150        let peekable_reciever = receiver.ready_chunks(Self::MAX_SENT_VMOS).peekable();
151        pin_mut!(peekable_reciever);
152        if peekable_reciever.as_mut().peek().await.is_none() {
153            return Ok(());
154        }
155
156        enum MaybeOwnedDirectory {
157            Owned(tempfile::TempDir),
158            Unowned(&'static str),
159        }
160        let debug_directory = match directory {
161            DebugDataDirectory::Isolated { parent } => {
162                MaybeOwnedDirectory::Owned(tempfile::TempDir::new_in(parent)?)
163            }
164            DebugDataDirectory::Accumulating { dir } => MaybeOwnedDirectory::Unowned(dir),
165        };
166        let debug_directory_path = match &debug_directory {
167            MaybeOwnedDirectory::Owned(tmp) => tmp.path().to_string_lossy(),
168            MaybeOwnedDirectory::Unowned(dir) => std::borrow::Cow::Borrowed(*dir),
169        };
170
171        let (directory_proxy, server_end) = create_endpoints::<fio::DirectoryMarker>();
172        open_channel_in_namespace(
173            &debug_directory_path,
174            PERM_READABLE | PERM_WRITABLE,
175            server_end,
176        )?;
177
178        let proxy = proxy_init_fn()?;
179        proxy.set_directory(directory_proxy)?;
180        while let Some(chunk) = peekable_reciever.next().await {
181            proxy.add_debug_vmos(chunk).await?;
182        }
183        proxy.finish().await?;
184
185        debug_data_server::serve_directory_for_suite(&debug_directory_path, suite_event_sender)
186            .await?;
187
188        if let MaybeOwnedDirectory::Owned(tmp) = debug_directory {
189            tmp.close()?;
190        }
191        Ok(())
192    }
193}
194
#[cfg(test)]
/// Everything produced by |DebugDataProcessor::new_for_test|.
pub(crate) struct DebugDataForTestResult {
    /// Processor whose proxy is connected to |stream|.
    pub processor: DebugDataProcessor,
    /// Sender paired with |processor|.
    pub sender: DebugDataSender,
    /// Server side of the |fuchsia.test.debug.DebugDataProcessor| connection that |processor|
    /// will use.
    pub stream: ftest_debug::DebugDataProcessorRequestStream,
}
201
202/// Serve |fuchsia.debugdata.Publisher| as a RealmBuilder mock. Collected VMOs are sent over
203/// |debug_data_sender| for processing. |started_event| is signalled once the mock is ready
204/// to serve requests.
205// TODO(https://fxbug.dev/42056523): |started_event| is added as part of a synchronization mechanism to
206// work around cases when a component is destroyed before starting, even though there is a
207// request. Remove when no longer needed.
208pub(crate) async fn serve_debug_data_publisher(
209    handles: LocalComponentHandles,
210    test_url: String,
211    debug_data_sender: DebugDataSender,
212    started_event: async_utils::event::Event,
213) -> Result<(), Error> {
214    let mut fs = ServiceFs::new();
215    // Register a notifier so that this mock isn't immediately killed - it needs to drain
216    // debug data.
217    let stop_recv = handles.register_stop_notifier().await;
218    started_event.signal();
219
220    fs.dir("svc").add_fidl_service(|stream: fdebug::PublisherRequestStream| stream);
221    fs.serve_connection(handles.outgoing_dir)?;
222
223    let mut drain_tasks = FuturesUnordered::new();
224    drain_tasks.push(fasync::Task::spawn(async move {
225        let _ = stop_recv.await;
226        Ok(())
227    }));
228
229    let mut got_requests = false;
230
231    loop {
232        select_biased! {
233            maybe_stream = fs.next().fuse() => match maybe_stream {
234                None => {
235                    if !got_requests {
236                        info!("Got no debug data requests for {}", test_url);
237                    }
238                    return drain_tasks.try_collect::<()>().await;
239                },
240                Some(stream) => {
241                    got_requests = true;
242                    let sender_clone = debug_data_sender.clone();
243                    let url_clone = test_url.clone();
244                    drain_tasks.push(fasync::Task::spawn(async move {
245                        serve_publisher(stream, &url_clone, sender_clone).await?;
246                        Ok(())
247                    }));
248                }
249            },
250            // Poll for completion of both stop_recv and any futures serving the publisher
251            // together. This allows us to accept any new serve requests even if stop is
252            // called, so long as at least one other request is still being served.
253            maybe_result = drain_tasks.next() => match maybe_result {
254                Some(result) => {
255                    result?;
256                },
257                None => {
258                    if !got_requests {
259                        info!("Got no debug data requests for {}", test_url);
260                    }
261                    return Ok(());
262                }
263            },
264        };
265    }
266}
267
268async fn serve_publisher(
269    stream: fdebug::PublisherRequestStream,
270    test_url: &str,
271    debug_data_sender: DebugDataSender,
272) -> Result<(), Error> {
273    stream
274        .map(Ok)
275        .try_for_each_concurrent(None, |req| {
276            let test_url = test_url.to_string();
277            let mut sender_clone = debug_data_sender.clone();
278            async move {
279                let fdebug::PublisherRequest::Publish { data_sink, data, vmo_token, .. } = req?;
280                // Wait for the token handle to close before sending the VMO for processing.
281                // This allows the client to continue modifying the VMO after it has sent it.
282                // See |fuchsia.debugdata.Publisher| protocol for details.
283                fasync::OnSignals::new(&vmo_token, zx::Signals::EVENTPAIR_PEER_CLOSED).await?;
284                let _ = sender_clone
285                    .sender
286                    .send(ftest_debug::DebugVmo { test_url, data_sink, vmo: data })
287                    .await;
288                Ok(())
289            }
290        })
291        .await
292}
293
294#[cfg(test)]
295mod test {
296    use super::*;
297    use crate::run_events::{RunEventPayload, SuiteEventPayload};
298    use crate::utilities::stream_fn;
299    use fidl::endpoints::create_proxy_and_stream;
300    use fuchsia_component_test::{
301        Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
302    };
303    use fuchsia_fs::Flags;
304    use futures::TryFutureExt;
305    use maplit::hashset;
306    use std::collections::HashSet;
307    use std::task::Poll;
308    use test_manager_test_lib::collect_string_from_socket_helper;
309
    /// Size passed to `zx::Vmo::create` for every VMO these tests publish.
    const VMO_SIZE: u64 = 4096;
311
312    /// Runs a fake test processor implementation that, for each VMO received, creates
313    /// a new file called "data_sink" and writes the test_url inside it.
314    /// |debug_vmo_recevied_sender| is a synchronization hack that sends one message for
315    /// each vmo received. It is a workaround for the
316    /// Started/Destroyed/CapabilityRequested events being delivered out of order.
317    /// See https://fxbug.dev/42156498.
318    async fn run_test_processor(
319        mut stream: ftest_debug::DebugDataProcessorRequestStream,
320        mut debug_vmo_recevied_sender: mpsc::Sender<()>,
321    ) {
322        let req = stream.try_next().await.expect("get first request").unwrap();
323        let dir = match req {
324            ftest_debug::DebugDataProcessorRequest::SetDirectory { directory, .. } => {
325                directory.into_proxy()
326            }
327            other => panic!("First request should be SetDirectory but got {:?}", other),
328        };
329
330        let mut collected_vmos = vec![];
331        let mut finish_responder = None;
332        while let Some(req) = stream.try_next().await.expect("get request") {
333            match req {
334                ftest_debug::DebugDataProcessorRequest::SetDirectory { .. } => {
335                    panic!("Set directory called twice")
336                }
337                ftest_debug::DebugDataProcessorRequest::AddDebugVmos {
338                    mut vmos,
339                    responder,
340                    ..
341                } => {
342                    let num_vmos = vmos.len();
343                    collected_vmos.append(&mut vmos);
344                    let _ = responder.send();
345                    for _ in 0..num_vmos {
346                        let _ = debug_vmo_recevied_sender.send(()).await;
347                    }
348                }
349                ftest_debug::DebugDataProcessorRequest::Finish { responder, .. } => {
350                    finish_responder = Some(responder);
351                    break;
352                }
353            }
354        }
355
356        for ftest_debug::DebugVmo { data_sink, test_url, .. } in collected_vmos {
357            let file = fuchsia_fs::directory::open_file_async(
358                &dir,
359                &data_sink,
360                Flags::FLAG_MAYBE_CREATE | PERM_WRITABLE,
361            )
362            .expect("open file");
363            fuchsia_fs::file::write(&file, &test_url).await.expect("write file");
364        }
365        finish_responder.unwrap().send().unwrap();
366    }
367
368    async fn construct_test_realm(
369        sender: DebugDataSender,
370        test_url: &'static str,
371    ) -> Result<RealmInstance, Error> {
372        let builder = RealmBuilder::new().await?;
373
374        let processor = builder
375            .add_local_child(
376                "processor",
377                move |handles| {
378                    Box::pin(serve_debug_data_publisher(
379                        handles,
380                        test_url.to_string(),
381                        sender.clone(),
382                        async_utils::event::Event::new(),
383                    ))
384                },
385                ChildOptions::new().eager(),
386            )
387            .await?;
388        builder
389            .add_route(
390                Route::new()
391                    .capability(Capability::protocol::<fdebug::PublisherMarker>())
392                    .from(&processor)
393                    .to(Ref::parent()),
394            )
395            .await?;
396
397        let instance = builder.build().await?;
398        Ok(instance)
399    }
400
401    fn isolated_dir() -> DebugDataDirectory {
402        DebugDataDirectory::Isolated { parent: "/tmp" }
403    }
404
405    #[fuchsia::test]
406    async fn serve_no_requests() {
407        const TEST_URL: &str = "test-url";
408        let DebugDataForTestResult { processor, sender, stream } =
409            DebugDataProcessor::new_for_test(isolated_dir());
410        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
411        test_realm.destroy().await.expect("destroy test realm");
412
413        let (event_sender, event_recv) = mpsc::channel(1);
414        processor.collect_and_serve(event_sender).await.unwrap();
415
416        assert!(stream.collect::<Vec<_>>().await.is_empty());
417        assert!(event_recv.collect::<Vec<_>>().await.is_empty());
418    }
419
420    #[fuchsia::test]
421    async fn serve_for_suite_no_requests() {
422        const TEST_URL: &str = "test-url";
423        let DebugDataForTestResult { processor, sender, stream } =
424            DebugDataProcessor::new_for_test(isolated_dir());
425        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
426        test_realm.destroy().await.expect("destroy test realm");
427
428        let (event_sender, event_recv) = mpsc::channel(1);
429        processor.collect_and_serve_for_suite(event_sender).await.unwrap();
430
431        assert!(stream.collect::<Vec<_>>().await.is_empty());
432        assert!(event_recv.collect::<Vec<_>>().await.is_empty());
433    }
434
435    #[fuchsia::test]
436    async fn serve_single_client() {
437        const TEST_URL: &str = "test-url";
438        let DebugDataForTestResult { processor, sender, stream } =
439            DebugDataProcessor::new_for_test(isolated_dir());
440        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
441
442        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
443        // Future running fuchsia.test.debug.DebugDataProcessor.
444        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
445        // Future running the 'test' (client of fuchsia.debugdata.Publisher)
446        let test_fut = async move {
447            let proxy = test_realm
448                .root
449                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
450                .expect("connect to publisher");
451            let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
452            let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
453            proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
454            drop(vmo_token_1);
455            let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
456            let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
457            proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
458            drop(vmo_token_2);
459            drop(proxy);
460
461            vmo_request_received_recv.take(1).collect::<()>().await;
462            test_realm.destroy().await.expect("destroy test realm");
463        };
464
465        let (event_sender, event_recv) = mpsc::channel(10);
466        // Future that collects VMOs from the test realm and forwards
467        // them to fuchsia.debugdata.Publisher
468        let processor_fut = processor
469            .collect_and_serve(event_sender)
470            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
471        // Future that collects produced debug artifact and asserts on contents.
472        let assertion_fut = async move {
473            let mut events: Vec<_> = event_recv.collect().await;
474            assert_eq!(events.len(), 1);
475            let RunEventPayload::DebugData(iterator) = events.pop().unwrap().into_payload();
476            let iterator_proxy = iterator.into_proxy();
477            let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
478                .and_then(|debug_data| async move {
479                    Ok((
480                        debug_data.name.unwrap(),
481                        collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
482                            .await
483                            .expect("Cannot read socket"),
484                    ))
485                })
486                .try_collect()
487                .await
488                .expect("file collection");
489            let expected = hashset! {
490                ("data-sink-1".to_string(), TEST_URL.to_string()),
491                ("data-sink-2".to_string(), TEST_URL.to_string()),
492            };
493            assert_eq!(files, expected);
494        };
495
496        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
497    }
498
499    #[fuchsia::test]
500    async fn serve_for_suite_single_client() {
501        const TEST_URL: &str = "test-url";
502        let DebugDataForTestResult { processor, sender, stream } =
503            DebugDataProcessor::new_for_test(isolated_dir());
504        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
505
506        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
507        // Future running fuchsia.test.debug.DebugDataProcessor.
508        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
509        // Future running the 'test' (client of fuchsia.debugdata.Publisher)
510        let test_fut = async move {
511            let proxy = test_realm
512                .root
513                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
514                .expect("connect to publisher");
515            let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
516            let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
517            proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
518            drop(vmo_token_1);
519            let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
520            let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
521            proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
522            drop(vmo_token_2);
523            drop(proxy);
524
525            vmo_request_received_recv.take(1).collect::<()>().await;
526            test_realm.destroy().await.expect("destroy test realm");
527        };
528
529        let (event_sender, event_recv) = mpsc::channel(10);
530        // Future that collects VMOs from the test realm and forwards
531        // them to fuchsia.debugdata.Publisher
532        let processor_fut = processor
533            .collect_and_serve_for_suite(event_sender)
534            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
535        // Future that collects produced debug artifact and asserts on contents.
536        let assertion_fut = async move {
537            let mut events: Vec<_> = event_recv.collect().await;
538            assert_eq!(events.len(), 1);
539            if let SuiteEventPayload::DebugData(iterator) =
540                events.pop().unwrap().unwrap().into_payload()
541            {
542                let iterator_proxy = iterator.into_proxy();
543                let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
544                    .and_then(|debug_data| async move {
545                        Ok((
546                            debug_data.name.unwrap(),
547                            collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
548                                .await
549                                .expect("Cannot read socket"),
550                        ))
551                    })
552                    .try_collect()
553                    .await
554                    .expect("file collection");
555                let expected = hashset! {
556                    ("data-sink-1".to_string(), TEST_URL.to_string()),
557                    ("data-sink-2".to_string(), TEST_URL.to_string()),
558                };
559                assert_eq!(files, expected);
560            } else {
561                assert!(false); // Event payload was not DebugData
562            }
563        };
564
565        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
566    }
567
568    #[fuchsia::test]
569    async fn serve_multiple_client() {
570        const TEST_URL: &str = "test-url";
571        let DebugDataForTestResult { processor, sender, stream } =
572            DebugDataProcessor::new_for_test(isolated_dir());
573        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
574
575        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
576        // Future running fuchsia.test.debug.DebugDataProcessor.
577        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
578        // Future running the 'test' (client of fuchsia.debugdata.Publisher)
579        let test_fut = async move {
580            let proxy_1 = test_realm
581                .root
582                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
583                .expect("connect to publisher");
584            let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
585            let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
586            proxy_1.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
587            drop(vmo_token_1);
588            let proxy_2 = test_realm
589                .root
590                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
591                .expect("connect to publisher");
592            let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
593            let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
594            proxy_2.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
595            drop(vmo_token_2);
596            drop(proxy_1);
597            drop(proxy_2);
598
599            vmo_request_received_recv.take(2).collect::<()>().await;
600            test_realm.destroy().await.expect("destroy test realm");
601        };
602
603        let (event_sender, event_recv) = mpsc::channel(10);
604        // Future that collects VMOs from the test realm and forwards
605        // them to fuchsia.debugdata.Publisher
606        let processor_fut = processor
607            .collect_and_serve(event_sender)
608            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
609        // Future that collects produced debug artifact and asserts on contents.
610        let assertion_fut = async move {
611            let mut events: Vec<_> = event_recv.collect().await;
612            assert_eq!(events.len(), 1);
613            let RunEventPayload::DebugData(iterator) = events.pop().unwrap().into_payload();
614            let iterator_proxy = iterator.into_proxy();
615            let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
616                .and_then(|debug_data| async move {
617                    Ok((
618                        debug_data.name.unwrap(),
619                        collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
620                            .await
621                            .expect("read socket"),
622                    ))
623                })
624                .try_collect()
625                .await
626                .expect("file collection");
627            let expected = hashset! {
628                ("data-sink-1".to_string(), TEST_URL.to_string()),
629                ("data-sink-2".to_string(), TEST_URL.to_string()),
630            };
631            assert_eq!(files, expected);
632        };
633
634        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
635    }
636
637    #[fuchsia::test]
638    async fn serve_for_suite_multiple_client() {
639        const TEST_URL: &str = "test-url";
640        let DebugDataForTestResult { processor, sender, stream } =
641            DebugDataProcessor::new_for_test(isolated_dir());
642        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
643
644        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
645        // Future running fuchsia.test.debug.DebugDataProcessor.
646        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
647        // Future running the 'test' (client of fuchsia.debugdata.Publisher)
648        let test_fut = async move {
649            let proxy_1 = test_realm
650                .root
651                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
652                .expect("connect to publisher");
653            let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
654            let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
655            proxy_1.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
656            drop(vmo_token_1);
657            let proxy_2 = test_realm
658                .root
659                .connect_to_protocol_at_exposed_dir::<fdebug::PublisherMarker>()
660                .expect("connect to publisher");
661            let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
662            let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
663            proxy_2.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
664            drop(vmo_token_2);
665            drop(proxy_1);
666            drop(proxy_2);
667
668            vmo_request_received_recv.take(2).collect::<()>().await;
669            test_realm.destroy().await.expect("destroy test realm");
670        };
671
672        let (event_sender, event_recv) = mpsc::channel(10);
673        // Future that collects VMOs from the test realm and forwards
674        // them to fuchsia.debugdata.Publisher
675        let processor_fut = processor
676            .collect_and_serve_for_suite(event_sender)
677            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
678        // Future that collects produced debug artifact and asserts on contents.
679        let assertion_fut = async move {
680            let mut events: Vec<_> = event_recv.collect().await;
681            assert_eq!(events.len(), 1);
682            if let SuiteEventPayload::DebugData(iterator) =
683                events.pop().unwrap().unwrap().into_payload()
684            {
685                let iterator_proxy = iterator.into_proxy();
686                let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
687                    .and_then(|debug_data| async move {
688                        Ok((
689                            debug_data.name.unwrap(),
690                            collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
691                                .await
692                                .expect("read socket"),
693                        ))
694                    })
695                    .try_collect()
696                    .await
697                    .expect("file collection");
698                let expected = hashset! {
699                    ("data-sink-1".to_string(), TEST_URL.to_string()),
700                    ("data-sink-2".to_string(), TEST_URL.to_string()),
701                };
702                assert_eq!(files, expected);
703            } else {
704                assert!(false); // Event payload was not DebugData
705            }
706        };
707
708        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
709    }
710
711    #[fuchsia::test]
712    fn single_publisher_connection_send_vmo_when_ready() {
713        const TEST_URL: &str = "test-url";
714        let mut executor = fasync::TestExecutor::new();
715
716        let (vmo_send, vmo_recv) = mpsc::channel(5);
717        let mut vmo_chunk_stream = vmo_recv.ready_chunks(5).boxed();
718        let (publisher_proxy, publisher_stream) =
719            create_proxy_and_stream::<fdebug::PublisherMarker>();
720        let mut serve_fut =
721            serve_publisher(publisher_stream, TEST_URL, DebugDataSender { sender: vmo_send })
722                .boxed();
723
724        let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
725        let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
726        let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
727        let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
728
729        publisher_proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
730        publisher_proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
731        drop(vmo_token_1);
732
733        // After this point vmo 1 should be ready for processing and passed on to processor, but
734        // vmo 2 should not.
735
736        assert!(executor.run_until_stalled(&mut serve_fut).is_pending());
737        let mut ready_vmos = match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed())
738        {
739            Poll::Pending => panic!("vmos should be ready"),
740            Poll::Ready(Some(vmos)) => vmos,
741            Poll::Ready(None) => panic!("stream closed prematurely"),
742        };
743        assert_eq!(ready_vmos.len(), 1);
744        let ready_vmo = ready_vmos.pop().unwrap();
745        assert_eq!(ready_vmo.test_url.as_str(), TEST_URL);
746        assert_eq!(ready_vmo.data_sink.as_str(), "data-sink-1");
747
748        // After dropping vmo token 2 it should be passed to the processor.
749        drop(vmo_token_2);
750        drop(publisher_proxy);
751        match executor.run_until_stalled(&mut serve_fut) {
752            futures::task::Poll::Ready(Ok(())) => (),
753            other => panic!("Expected poll to be ready but was {:?}", other),
754        }
755
756        let mut ready_vmos = match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed())
757        {
758            Poll::Pending => panic!("vmos should be ready"),
759            Poll::Ready(Some(vmos)) => vmos,
760            Poll::Ready(None) => panic!("stream closed prematurely"),
761        };
762        assert_eq!(ready_vmos.len(), 1);
763        let ready_vmo = ready_vmos.pop().unwrap();
764        assert_eq!(ready_vmo.test_url.as_str(), TEST_URL);
765        assert_eq!(ready_vmo.data_sink.as_str(), "data-sink-2");
766
767        match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed()) {
768            Poll::Pending => panic!("vmos should be ready"),
769            Poll::Ready(None) => (),
770            Poll::Ready(Some(vmos)) => panic!("Expected stream to terminate but got {:?}", vmos),
771        };
772    }
773}