test_manager_lib/
debug_data_processor.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::debug_data_server;
6use crate::run_events::{RunEvent, SuiteEvents};
7use anyhow::Error;
8use fidl::endpoints::create_endpoints;
9use fidl_fuchsia_test_manager::LaunchError;
10use fuchsia_component::client::connect_to_protocol;
11use fuchsia_component::server::ServiceFs;
12use fuchsia_component_test::LocalComponentHandles;
13use fuchsia_fs::directory::open_channel_in_namespace;
14use fuchsia_fs::{PERM_READABLE, PERM_WRITABLE};
15use futures::channel::mpsc;
16use futures::future::FutureExt;
17use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
18use futures::{pin_mut, select_biased, SinkExt};
19use log::info;
20use {
21    fidl_fuchsia_debugdata as fdebug, fidl_fuchsia_io as fio,
22    fidl_fuchsia_test_debug as ftest_debug, fuchsia_async as fasync,
23};
24
25/// Processor that collects debug data and serves the iterator sending data back to a test
26/// executor.
27pub(crate) struct DebugDataProcessor {
28    directory: DebugDataDirectory,
29    receiver: mpsc::Receiver<ftest_debug::DebugVmo>,
30    proxy_init_fn: Box<dyn FnOnce() -> Result<ftest_debug::DebugDataProcessorProxy, Error>>,
31}
32
33/// Sender used to pass VMOs back to |DebugDataProcessor|.
34#[derive(Clone)]
35pub(crate) struct DebugDataSender {
36    sender: mpsc::Sender<ftest_debug::DebugVmo>,
37}
38
/// Directory used to store collected debug data.
///
/// Both variants only carry a `&'static str`, so values are trivially copyable and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DebugDataDirectory {
    /// An isolated directory is owned purely by the |DebugDataProcessor| it is given to, and will
    /// be torn down when the |DebugDataProcessor| is terminated. The directory itself is created
    /// as a temporary directory underneath |parent|.
    Isolated { parent: &'static str },
    /// An accumulated directory may be shared between multiple |DebugDataProcessor|s. Contents
    /// will not be torn down.
    Accumulating { dir: &'static str },
}
49
50impl DebugDataProcessor {
51    const MAX_SENT_VMOS: usize = 10;
52    /// Create a new |DebugDataProcessor| for processing VMOs, and |DebugDataSender| for passing
53    /// it VMOs.
54    pub fn new(directory: DebugDataDirectory) -> (Self, DebugDataSender) {
55        let (sender, receiver) = futures::channel::mpsc::channel(Self::MAX_SENT_VMOS);
56        (
57            Self {
58                directory,
59                receiver,
60                proxy_init_fn: Box::new(|| {
61                    connect_to_protocol::<ftest_debug::DebugDataProcessorMarker>()
62                        .map_err(Error::from)
63                }),
64            },
65            DebugDataSender { sender },
66        )
67    }
68
69    /// Create a new |DebugDataProcessor| for processing VMOs, |DebugDataSender| for passing
70    /// it VMOs, and the |fuchsia.test.debug.DebugDataProcessor| stream to which the processor
71    /// will connect.
72    #[cfg(test)]
73    pub(crate) fn new_for_test(directory: DebugDataDirectory) -> DebugDataForTestResult {
74        let (sender, receiver) = futures::channel::mpsc::channel(Self::MAX_SENT_VMOS);
75        let (proxy, stream) =
76            fidl::endpoints::create_proxy_and_stream::<ftest_debug::DebugDataProcessorMarker>();
77        let maybe_proxy = std::sync::Mutex::new(Some(proxy));
78        DebugDataForTestResult {
79            processor: Self {
80                directory,
81                receiver,
82                proxy_init_fn: Box::new(move || Ok(maybe_proxy.lock().unwrap().take().unwrap())),
83            },
84            sender: DebugDataSender { sender },
85            stream: stream,
86        }
87    }
88
89    /// Collect debug data produced by the corresponding |DebugDataSender|, and serve the resulting
90    /// data. In case debug data is produced, sends the event over |run_event_sender|.
91    pub async fn collect_and_serve(
92        self,
93        run_event_sender: mpsc::Sender<RunEvent>,
94    ) -> Result<(), Error> {
95        let Self { directory, receiver, proxy_init_fn } = self;
96
97        // Avoid setting up resources in the common case where no debug data is produced.
98        let peekable_reciever = receiver.ready_chunks(Self::MAX_SENT_VMOS).peekable();
99        pin_mut!(peekable_reciever);
100        if peekable_reciever.as_mut().peek().await.is_none() {
101            return Ok(());
102        }
103
104        enum MaybeOwnedDirectory {
105            Owned(tempfile::TempDir),
106            Unowned(&'static str),
107        }
108        let debug_directory = match directory {
109            DebugDataDirectory::Isolated { parent } => {
110                MaybeOwnedDirectory::Owned(tempfile::TempDir::new_in(parent)?)
111            }
112            DebugDataDirectory::Accumulating { dir } => MaybeOwnedDirectory::Unowned(dir),
113        };
114        let debug_directory_path = match &debug_directory {
115            MaybeOwnedDirectory::Owned(tmp) => tmp.path().to_string_lossy(),
116            MaybeOwnedDirectory::Unowned(dir) => std::borrow::Cow::Borrowed(*dir),
117        };
118
119        let (directory_proxy, server_end) = create_endpoints::<fio::DirectoryMarker>();
120        open_channel_in_namespace(
121            &debug_directory_path,
122            PERM_READABLE | PERM_WRITABLE,
123            server_end,
124        )?;
125
126        let proxy = proxy_init_fn()?;
127        proxy.set_directory(directory_proxy)?;
128        while let Some(chunk) = peekable_reciever.next().await {
129            proxy.add_debug_vmos(chunk).await?;
130        }
131        proxy.finish().await?;
132
133        debug_data_server::serve_directory(&debug_directory_path, run_event_sender).await?;
134
135        if let MaybeOwnedDirectory::Owned(tmp) = debug_directory {
136            tmp.close()?;
137        }
138        Ok(())
139    }
140
141    /// Collect debug data produced by the corresponding |DebugDataSender|, and serve the resulting
142    /// data. In case debug data is produced, sends the event over |suite_event_sender|.
143    pub async fn collect_and_serve_for_suite(
144        self,
145        suite_event_sender: mpsc::Sender<Result<SuiteEvents, LaunchError>>,
146    ) -> Result<(), Error> {
147        let Self { directory, receiver, proxy_init_fn } = self;
148
149        // Avoid setting up resources in the common case where no debug data is produced.
150        let peekable_reciever = receiver.ready_chunks(Self::MAX_SENT_VMOS).peekable();
151        pin_mut!(peekable_reciever);
152        if peekable_reciever.as_mut().peek().await.is_none() {
153            return Ok(());
154        }
155
156        enum MaybeOwnedDirectory {
157            Owned(tempfile::TempDir),
158            Unowned(&'static str),
159        }
160        let debug_directory = match directory {
161            DebugDataDirectory::Isolated { parent } => {
162                MaybeOwnedDirectory::Owned(tempfile::TempDir::new_in(parent)?)
163            }
164            DebugDataDirectory::Accumulating { dir } => MaybeOwnedDirectory::Unowned(dir),
165        };
166        let debug_directory_path = match &debug_directory {
167            MaybeOwnedDirectory::Owned(tmp) => tmp.path().to_string_lossy(),
168            MaybeOwnedDirectory::Unowned(dir) => std::borrow::Cow::Borrowed(*dir),
169        };
170
171        let (directory_proxy, server_end) = create_endpoints::<fio::DirectoryMarker>();
172        open_channel_in_namespace(
173            &debug_directory_path,
174            PERM_READABLE | PERM_WRITABLE,
175            server_end,
176        )?;
177
178        let proxy = proxy_init_fn()?;
179        proxy.set_directory(directory_proxy)?;
180        while let Some(chunk) = peekable_reciever.next().await {
181            proxy.add_debug_vmos(chunk).await?;
182        }
183        proxy.finish().await?;
184
185        debug_data_server::serve_directory_for_suite(&debug_directory_path, suite_event_sender)
186            .await?;
187
188        if let MaybeOwnedDirectory::Owned(tmp) = debug_directory {
189            tmp.close()?;
190        }
191        Ok(())
192    }
193}
194
/// Bundle returned by |DebugDataProcessor::new_for_test|.
#[cfg(test)]
pub(crate) struct DebugDataForTestResult {
    /// Processor under test.
    pub processor: DebugDataProcessor,
    /// Sender paired with |processor|.
    pub sender: DebugDataSender,
    /// Server side of the |fuchsia.test.debug.DebugDataProcessor| connection that |processor|
    /// will use.
    pub stream: ftest_debug::DebugDataProcessorRequestStream,
}
201
202/// Serve |fuchsia.debugdata.Publisher| as a RealmBuilder mock. Collected VMOs are sent over
203/// |debug_data_sender| for processing. |started_event| is signalled once the mock is ready
204/// to serve requests.
205// TODO(https://fxbug.dev/42056523): |started_event| is added as part of a synchronization mechanism to
206// work around cases when a component is destroyed before starting, even though there is a
207// request. Remove when no longer needed.
208pub(crate) async fn serve_debug_data_publisher(
209    handles: LocalComponentHandles,
210    test_url: String,
211    debug_data_sender: DebugDataSender,
212    started_event: async_utils::event::Event,
213) -> Result<(), Error> {
214    let mut fs = ServiceFs::new();
215    // Register a notifier so that this mock isn't immediately killed - it needs to drain
216    // debug data.
217    let stop_recv = handles.register_stop_notifier().await;
218    started_event.signal();
219
220    fs.dir("svc").add_fidl_service(|stream: fdebug::PublisherRequestStream| stream);
221    fs.serve_connection(handles.outgoing_dir)?;
222
223    let mut drain_tasks = FuturesUnordered::new();
224    drain_tasks.push(fasync::Task::spawn(async move {
225        let _ = stop_recv.await;
226        Ok(())
227    }));
228
229    let mut got_requests = false;
230
231    loop {
232        select_biased! {
233            maybe_stream = fs.next().fuse() => match maybe_stream {
234                None => {
235                    if !got_requests {
236                        info!("Got no debug data requests for {}", test_url);
237                    }
238                    return drain_tasks.try_collect::<()>().await;
239                },
240                Some(stream) => {
241                    got_requests = true;
242                    let sender_clone = debug_data_sender.clone();
243                    let url_clone = test_url.clone();
244                    drain_tasks.push(fasync::Task::spawn(async move {
245                        serve_publisher(stream, &url_clone, sender_clone).await?;
246                        Ok(())
247                    }));
248                }
249            },
250            // Poll for completion of both stop_recv and any futures serving the publisher
251            // together. This allows us to accept any new serve requests even if stop is
252            // called, so long as at least one other request is still being served.
253            maybe_result = drain_tasks.next() => match maybe_result {
254                Some(result) => {
255                    result?;
256                },
257                None => {
258                    if !got_requests {
259                        info!("Got no debug data requests for {}", test_url);
260                    }
261                    return Ok(());
262                }
263            },
264        };
265    }
266}
267
268async fn serve_publisher(
269    stream: fdebug::PublisherRequestStream,
270    test_url: &str,
271    debug_data_sender: DebugDataSender,
272) -> Result<(), Error> {
273    stream
274        .map(Ok)
275        .try_for_each_concurrent(None, |req| {
276            let test_url = test_url.to_string();
277            let mut sender_clone = debug_data_sender.clone();
278            async move {
279                let fdebug::PublisherRequest::Publish { data_sink, data, vmo_token, .. } = req?;
280                // Wait for the token handle to close before sending the VMO for processing.
281                // This allows the client to continue modifying the VMO after it has sent it.
282                // See |fuchsia.debugdata.Publisher| protocol for details.
283                fasync::OnSignals::new(&vmo_token, zx::Signals::EVENTPAIR_PEER_CLOSED).await?;
284                let _ = sender_clone
285                    .sender
286                    .send(ftest_debug::DebugVmo { test_url, data_sink, vmo: data })
287                    .await;
288                Ok(())
289            }
290        })
291        .await
292}
293
294#[cfg(test)]
295mod test {
296    use super::*;
297    use crate::run_events::{RunEventPayload, SuiteEventPayload};
298    use crate::utilities::stream_fn;
299    use fidl::endpoints::create_proxy_and_stream;
300    use fuchsia_component_test::{
301        Capability, ChildOptions, RealmBuilder, RealmInstance, Ref, Route,
302    };
303    use fuchsia_fs::Flags;
304    use futures::TryFutureExt;
305    use maplit::hashset;
306    use std::collections::HashSet;
307    use std::task::Poll;
308    use test_manager_test_lib::collect_string_from_socket_helper;
309
310    const VMO_SIZE: u64 = 4096;
311
312    /// Runs a fake test processor implementation that, for each VMO received, creates
313    /// a new file called "data_sink" and writes the test_url inside it.
314    /// |debug_vmo_recevied_sender| is a synchronization hack that sends one message for
315    /// each vmo received. It is a workaround for the
316    /// Started/Destroyed/CapabilityRequested events being delivered out of order.
317    /// See https://fxbug.dev/42156498.
318    async fn run_test_processor(
319        mut stream: ftest_debug::DebugDataProcessorRequestStream,
320        mut debug_vmo_recevied_sender: mpsc::Sender<()>,
321    ) {
322        let req = stream.try_next().await.expect("get first request").unwrap();
323        let dir = match req {
324            ftest_debug::DebugDataProcessorRequest::SetDirectory { directory, .. } => {
325                directory.into_proxy()
326            }
327            other => panic!("First request should be SetDirectory but got {:?}", other),
328        };
329
330        let mut collected_vmos = vec![];
331        let mut finish_responder = None;
332        while let Some(req) = stream.try_next().await.expect("get request") {
333            match req {
334                ftest_debug::DebugDataProcessorRequest::SetDirectory { .. } => {
335                    panic!("Set directory called twice")
336                }
337                ftest_debug::DebugDataProcessorRequest::AddDebugVmos {
338                    mut vmos,
339                    responder,
340                    ..
341                } => {
342                    let num_vmos = vmos.len();
343                    collected_vmos.append(&mut vmos);
344                    let _ = responder.send();
345                    for _ in 0..num_vmos {
346                        let _ = debug_vmo_recevied_sender.send(()).await;
347                    }
348                }
349                ftest_debug::DebugDataProcessorRequest::Finish { responder, .. } => {
350                    finish_responder = Some(responder);
351                    break;
352                }
353            }
354        }
355
356        for ftest_debug::DebugVmo { data_sink, test_url, .. } in collected_vmos {
357            let file = fuchsia_fs::directory::open_file_async(
358                &dir,
359                &data_sink,
360                Flags::FLAG_MAYBE_CREATE | PERM_WRITABLE,
361            )
362            .expect("open file");
363            fuchsia_fs::file::write(&file, &test_url).await.expect("write file");
364        }
365        finish_responder.unwrap().send().unwrap();
366    }
367
368    async fn construct_test_realm(
369        sender: DebugDataSender,
370        test_url: &'static str,
371    ) -> Result<RealmInstance, Error> {
372        let builder = RealmBuilder::new().await?;
373
374        let processor = builder
375            .add_local_child(
376                "processor",
377                move |handles| {
378                    Box::pin(serve_debug_data_publisher(
379                        handles,
380                        test_url.to_string(),
381                        sender.clone(),
382                        async_utils::event::Event::new(),
383                    ))
384                },
385                ChildOptions::new().eager(),
386            )
387            .await?;
388        builder
389            .add_route(
390                Route::new()
391                    .capability(Capability::protocol::<fdebug::PublisherMarker>())
392                    .from(&processor)
393                    .to(Ref::parent()),
394            )
395            .await?;
396
397        let instance = builder.build().await?;
398        Ok(instance)
399    }
400
401    fn isolated_dir() -> DebugDataDirectory {
402        DebugDataDirectory::Isolated { parent: "/tmp" }
403    }
404
405    #[fuchsia::test]
406    async fn serve_no_requests() {
407        const TEST_URL: &str = "test-url";
408        let DebugDataForTestResult { processor, sender, stream } =
409            DebugDataProcessor::new_for_test(isolated_dir());
410        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
411        test_realm.destroy().await.expect("destroy test realm");
412
413        let (event_sender, event_recv) = mpsc::channel(1);
414        processor.collect_and_serve(event_sender).await.unwrap();
415
416        assert!(stream.collect::<Vec<_>>().await.is_empty());
417        assert!(event_recv.collect::<Vec<_>>().await.is_empty());
418    }
419
420    #[fuchsia::test]
421    async fn serve_for_suite_no_requests() {
422        const TEST_URL: &str = "test-url";
423        let DebugDataForTestResult { processor, sender, stream } =
424            DebugDataProcessor::new_for_test(isolated_dir());
425        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
426        test_realm.destroy().await.expect("destroy test realm");
427
428        let (event_sender, event_recv) = mpsc::channel(1);
429        processor.collect_and_serve_for_suite(event_sender).await.unwrap();
430
431        assert!(stream.collect::<Vec<_>>().await.is_empty());
432        assert!(event_recv.collect::<Vec<_>>().await.is_empty());
433    }
434
435    #[fuchsia::test]
436    async fn serve_single_client() {
437        const TEST_URL: &str = "test-url";
438        let DebugDataForTestResult { processor, sender, stream } =
439            DebugDataProcessor::new_for_test(isolated_dir());
440        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
441
442        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
443        // Future running fuchsia.test.debug.DebugDataProcessor.
444        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
445        // Future running the 'test' (client of fuchsia.debugdata.Publisher)
446        let test_fut = async move {
447            let proxy: fdebug::PublisherProxy =
448                test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
449            let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
450            let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
451            proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
452            drop(vmo_token_1);
453            let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
454            let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
455            proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
456            drop(vmo_token_2);
457            drop(proxy);
458
459            vmo_request_received_recv.take(1).collect::<()>().await;
460            test_realm.destroy().await.expect("destroy test realm");
461        };
462
463        let (event_sender, event_recv) = mpsc::channel(10);
464        // Future that collects VMOs from the test realm and forwards
465        // them to fuchsia.debugdata.Publisher
466        let processor_fut = processor
467            .collect_and_serve(event_sender)
468            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
469        // Future that collects produced debug artifact and asserts on contents.
470        let assertion_fut = async move {
471            let mut events: Vec<_> = event_recv.collect().await;
472            assert_eq!(events.len(), 1);
473            let RunEventPayload::DebugData(iterator) = events.pop().unwrap().into_payload();
474            let iterator_proxy = iterator.into_proxy();
475            let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
476                .and_then(|debug_data| async move {
477                    Ok((
478                        debug_data.name.unwrap(),
479                        collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
480                            .await
481                            .expect("Cannot read socket"),
482                    ))
483                })
484                .try_collect()
485                .await
486                .expect("file collection");
487            let expected = hashset! {
488                ("data-sink-1".to_string(), TEST_URL.to_string()),
489                ("data-sink-2".to_string(), TEST_URL.to_string()),
490            };
491            assert_eq!(files, expected);
492        };
493
494        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
495    }
496
497    #[fuchsia::test]
498    async fn serve_for_suite_single_client() {
499        const TEST_URL: &str = "test-url";
500        let DebugDataForTestResult { processor, sender, stream } =
501            DebugDataProcessor::new_for_test(isolated_dir());
502        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
503
504        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
505        // Future running fuchsia.test.debug.DebugDataProcessor.
506        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
507        // Future running the 'test' (client of fuchsia.debugdata.Publisher)
508        let test_fut = async move {
509            let proxy: fdebug::PublisherProxy =
510                test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
511            let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
512            let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
513            proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
514            drop(vmo_token_1);
515            let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
516            let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
517            proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
518            drop(vmo_token_2);
519            drop(proxy);
520
521            vmo_request_received_recv.take(1).collect::<()>().await;
522            test_realm.destroy().await.expect("destroy test realm");
523        };
524
525        let (event_sender, event_recv) = mpsc::channel(10);
526        // Future that collects VMOs from the test realm and forwards
527        // them to fuchsia.debugdata.Publisher
528        let processor_fut = processor
529            .collect_and_serve_for_suite(event_sender)
530            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
531        // Future that collects produced debug artifact and asserts on contents.
532        let assertion_fut = async move {
533            let mut events: Vec<_> = event_recv.collect().await;
534            assert_eq!(events.len(), 1);
535            if let SuiteEventPayload::DebugData(iterator) =
536                events.pop().unwrap().unwrap().into_payload()
537            {
538                let iterator_proxy = iterator.into_proxy();
539                let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
540                    .and_then(|debug_data| async move {
541                        Ok((
542                            debug_data.name.unwrap(),
543                            collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
544                                .await
545                                .expect("Cannot read socket"),
546                        ))
547                    })
548                    .try_collect()
549                    .await
550                    .expect("file collection");
551                let expected = hashset! {
552                    ("data-sink-1".to_string(), TEST_URL.to_string()),
553                    ("data-sink-2".to_string(), TEST_URL.to_string()),
554                };
555                assert_eq!(files, expected);
556            } else {
557                assert!(false); // Event payload was not DebugData
558            }
559        };
560
561        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
562    }
563
564    #[fuchsia::test]
565    async fn serve_multiple_client() {
566        const TEST_URL: &str = "test-url";
567        let DebugDataForTestResult { processor, sender, stream } =
568            DebugDataProcessor::new_for_test(isolated_dir());
569        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
570
571        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
572        // Future running fuchsia.test.debug.DebugDataProcessor.
573        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
574        // Future running the 'test' (client of fuchsia.debugdata.Publisher)
575        let test_fut = async move {
576            let proxy_1: fdebug::PublisherProxy =
577                test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
578            let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
579            let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
580            proxy_1.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
581            drop(vmo_token_1);
582            let proxy_2: fdebug::PublisherProxy =
583                test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
584            let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
585            let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
586            proxy_2.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
587            drop(vmo_token_2);
588            drop(proxy_1);
589            drop(proxy_2);
590
591            vmo_request_received_recv.take(2).collect::<()>().await;
592            test_realm.destroy().await.expect("destroy test realm");
593        };
594
595        let (event_sender, event_recv) = mpsc::channel(10);
596        // Future that collects VMOs from the test realm and forwards
597        // them to fuchsia.debugdata.Publisher
598        let processor_fut = processor
599            .collect_and_serve(event_sender)
600            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
601        // Future that collects produced debug artifact and asserts on contents.
602        let assertion_fut = async move {
603            let mut events: Vec<_> = event_recv.collect().await;
604            assert_eq!(events.len(), 1);
605            let RunEventPayload::DebugData(iterator) = events.pop().unwrap().into_payload();
606            let iterator_proxy = iterator.into_proxy();
607            let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
608                .and_then(|debug_data| async move {
609                    Ok((
610                        debug_data.name.unwrap(),
611                        collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
612                            .await
613                            .expect("read socket"),
614                    ))
615                })
616                .try_collect()
617                .await
618                .expect("file collection");
619            let expected = hashset! {
620                ("data-sink-1".to_string(), TEST_URL.to_string()),
621                ("data-sink-2".to_string(), TEST_URL.to_string()),
622            };
623            assert_eq!(files, expected);
624        };
625
626        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
627    }
628
629    #[fuchsia::test]
630    async fn serve_for_suite_multiple_client() {
631        const TEST_URL: &str = "test-url";
632        let DebugDataForTestResult { processor, sender, stream } =
633            DebugDataProcessor::new_for_test(isolated_dir());
634        let test_realm = construct_test_realm(sender, TEST_URL).await.expect("build test realm");
635
636        let (vmo_request_received_send, vmo_request_received_recv) = mpsc::channel(2);
637        // Future running fuchsia.test.debug.DebugDataProcessor.
638        let processor_server_fut = run_test_processor(stream, vmo_request_received_send);
639        // Future running the 'test' (client of fuchsia.debugdata.Publisher)
640        let test_fut = async move {
641            let proxy_1: fdebug::PublisherProxy =
642                test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
643            let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
644            let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
645            proxy_1.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
646            drop(vmo_token_1);
647            let proxy_2: fdebug::PublisherProxy =
648                test_realm.root.connect_to_protocol_at_exposed_dir().expect("connect to publisher");
649            let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
650            let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
651            proxy_2.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
652            drop(vmo_token_2);
653            drop(proxy_1);
654            drop(proxy_2);
655
656            vmo_request_received_recv.take(2).collect::<()>().await;
657            test_realm.destroy().await.expect("destroy test realm");
658        };
659
660        let (event_sender, event_recv) = mpsc::channel(10);
661        // Future that collects VMOs from the test realm and forwards
662        // them to fuchsia.debugdata.Publisher
663        let processor_fut = processor
664            .collect_and_serve_for_suite(event_sender)
665            .unwrap_or_else(|e| panic!("processor failed: {:?}", e));
666        // Future that collects produced debug artifact and asserts on contents.
667        let assertion_fut = async move {
668            let mut events: Vec<_> = event_recv.collect().await;
669            assert_eq!(events.len(), 1);
670            if let SuiteEventPayload::DebugData(iterator) =
671                events.pop().unwrap().unwrap().into_payload()
672            {
673                let iterator_proxy = iterator.into_proxy();
674                let files: HashSet<_> = stream_fn(move || iterator_proxy.get_next_compressed())
675                    .and_then(|debug_data| async move {
676                        Ok((
677                            debug_data.name.unwrap(),
678                            collect_string_from_socket_helper(debug_data.socket.unwrap(), true)
679                                .await
680                                .expect("read socket"),
681                        ))
682                    })
683                    .try_collect()
684                    .await
685                    .expect("file collection");
686                let expected = hashset! {
687                    ("data-sink-1".to_string(), TEST_URL.to_string()),
688                    ("data-sink-2".to_string(), TEST_URL.to_string()),
689                };
690                assert_eq!(files, expected);
691            } else {
692                assert!(false); // Event payload was not DebugData
693            }
694        };
695
696        futures::future::join4(processor_server_fut, test_fut, processor_fut, assertion_fut).await;
697    }
698
699    #[fuchsia::test]
700    fn single_publisher_connection_send_vmo_when_ready() {
701        const TEST_URL: &str = "test-url";
702        let mut executor = fasync::TestExecutor::new();
703
704        let (vmo_send, vmo_recv) = mpsc::channel(5);
705        let mut vmo_chunk_stream = vmo_recv.ready_chunks(5).boxed();
706        let (publisher_proxy, publisher_stream) =
707            create_proxy_and_stream::<fdebug::PublisherMarker>();
708        let mut serve_fut =
709            serve_publisher(publisher_stream, TEST_URL, DebugDataSender { sender: vmo_send })
710                .boxed();
711
712        let vmo_1 = zx::Vmo::create(VMO_SIZE).unwrap();
713        let (vmo_token_1, vmo_token_server_1) = zx::EventPair::create();
714        let vmo_2 = zx::Vmo::create(VMO_SIZE).unwrap();
715        let (vmo_token_2, vmo_token_server_2) = zx::EventPair::create();
716
717        publisher_proxy.publish("data-sink-1", vmo_1, vmo_token_server_1).expect("publish vmo");
718        publisher_proxy.publish("data-sink-2", vmo_2, vmo_token_server_2).expect("publish vmo");
719        drop(vmo_token_1);
720
721        // After this point vmo 1 should be ready for processing and passed on to processor, but
722        // vmo 2 should not.
723
724        assert!(executor.run_until_stalled(&mut serve_fut).is_pending());
725        let mut ready_vmos = match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed())
726        {
727            Poll::Pending => panic!("vmos should be ready"),
728            Poll::Ready(Some(vmos)) => vmos,
729            Poll::Ready(None) => panic!("stream closed prematurely"),
730        };
731        assert_eq!(ready_vmos.len(), 1);
732        let ready_vmo = ready_vmos.pop().unwrap();
733        assert_eq!(ready_vmo.test_url.as_str(), TEST_URL);
734        assert_eq!(ready_vmo.data_sink.as_str(), "data-sink-1");
735
736        // After dropping vmo token 2 it should be passed to the processor.
737        drop(vmo_token_2);
738        drop(publisher_proxy);
739        match executor.run_until_stalled(&mut serve_fut) {
740            futures::task::Poll::Ready(Ok(())) => (),
741            other => panic!("Expected poll to be ready but was {:?}", other),
742        }
743
744        let mut ready_vmos = match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed())
745        {
746            Poll::Pending => panic!("vmos should be ready"),
747            Poll::Ready(Some(vmos)) => vmos,
748            Poll::Ready(None) => panic!("stream closed prematurely"),
749        };
750        assert_eq!(ready_vmos.len(), 1);
751        let ready_vmo = ready_vmos.pop().unwrap();
752        assert_eq!(ready_vmo.test_url.as_str(), TEST_URL);
753        assert_eq!(ready_vmo.data_sink.as_str(), "data-sink-2");
754
755        match executor.run_until_stalled(&mut vmo_chunk_stream.next().boxed()) {
756            Poll::Pending => panic!("vmos should be ready"),
757            Poll::Ready(None) => (),
758            Poll::Ready(Some(vmos)) => panic!("Expected stream to terminate but got {:?}", vmos),
759        };
760    }
761}