run_test_suite_lib/
artifacts.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::artifacts;
6use crate::cancel::NamedFutureExt;
7use crate::diagnostics::{self, LogCollectionOutcome};
8use crate::outcome::{RunTestSuiteError, UnexpectedEventError};
9use crate::output::{
10    ArtifactType, DirectoryArtifactType, DynDirectoryArtifact, DynReporter, EntityReporter,
11};
12use crate::stream_util::StreamUtil;
13use anyhow::{anyhow, Context as _};
14use fidl::Peered;
15use futures::future::{join_all, BoxFuture, FutureExt, TryFutureExt};
16use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
17use futures::AsyncReadExt;
18use log::{debug, warn};
19use std::borrow::Borrow;
20use std::collections::VecDeque;
21use std::io::Write;
22use std::path::PathBuf;
23use test_diagnostics::zstd_compress::Decoder;
24use {fidl_fuchsia_io as fio, fidl_fuchsia_test_manager as ftest_manager, fuchsia_async as fasync};
25
26/// Given an |artifact| reported over fuchsia.test.manager, create the appropriate artifact in the
27/// reporter. Returns a Future, which when polled to completion, drains the results from |artifact|
28/// and saves them to the reporter.
29///
30/// This method is an async method returning a Future so that the lifetime of |reporter| is not
31/// tied to the lifetime of the Future.
32/// The returned Future resolves to LogCollectionOutcome when logs are processed.
33pub(crate) async fn drain_artifact<'a, E, T>(
34    reporter: &'a EntityReporter<E, T>,
35    artifact: ftest_manager::Artifact,
36    log_opts: diagnostics::LogCollectionOptions,
37) -> Result<
38    BoxFuture<'static, Result<Option<LogCollectionOutcome>, anyhow::Error>>,
39    RunTestSuiteError,
40>
41where
42    T: Borrow<DynReporter>,
43{
44    match artifact {
45        ftest_manager::Artifact::Stdout(socket) => {
46            let stdout = reporter.new_artifact(&ArtifactType::Stdout)?;
47            Ok(copy_socket_artifact(socket, stdout).map_ok(|_| None).named("stdout").boxed())
48        }
49        ftest_manager::Artifact::Stderr(socket) => {
50            let stderr = reporter.new_artifact(&ArtifactType::Stderr)?;
51            Ok(copy_socket_artifact(socket, stderr).map_ok(|_| None).named("stderr").boxed())
52        }
53        ftest_manager::Artifact::Log(syslog) => {
54            let syslog_artifact = reporter.new_artifact(&ArtifactType::Syslog)?;
55            Ok(diagnostics::collect_logs(
56                test_diagnostics::LogStream::from_syslog(syslog)?,
57                syslog_artifact,
58                log_opts,
59            )
60            .map_ok(Some)
61            .named("syslog")
62            .boxed())
63        }
64        ftest_manager::Artifact::Custom(ftest_manager::CustomArtifact {
65            directory_and_token,
66            component_moniker,
67            ..
68        }) => {
69            let ftest_manager::DirectoryAndToken { directory, token, .. } = directory_and_token
70                .ok_or(UnexpectedEventError::MissingRequiredField {
71                    containing_struct: "CustomArtifact",
72                    field: "directory_and_token",
73                })?;
74            let directory_artifact = reporter
75                .new_directory_artifact(&DirectoryArtifactType::Custom, component_moniker)?;
76            Ok(async move {
77                let directory = directory.into_proxy();
78                let result =
79                    artifacts::copy_custom_artifact_directory(directory, directory_artifact).await;
80                // TODO(https://fxbug.dev/42165719): Remove this signal once Overnet
81                // supports automatically signalling EVENTPAIR_CLOSED when the
82                // handle is closed.
83                let _ = token.signal_peer(fidl::Signals::empty(), fidl::Signals::USER_0);
84                result
85            }
86            .map_ok(|()| None)
87            .named("custom_artifacts")
88            .boxed())
89        }
90        ftest_manager::Artifact::DebugData(iterator) => {
91            let output_directory = reporter
92                .new_directory_artifact(&DirectoryArtifactType::Debug, None /* moniker */)?;
93            Ok(artifacts::copy_debug_data(iterator.into_proxy(), output_directory)
94                .map(|()| Ok(None))
95                .named("debug_data")
96                .boxed())
97        }
98        ftest_manager::ArtifactUnknown!() => {
99            warn!("Encountered an unknown artifact");
100            Ok(futures::future::ready(Ok(None)).boxed())
101        }
102    }
103}
104
105/// Copy an artifact reported over a socket.
106async fn copy_socket_artifact<W: Write>(
107    socket: fidl::Socket,
108    mut artifact: W,
109) -> Result<usize, anyhow::Error> {
110    let mut async_socket = fidl::AsyncSocket::from_socket(socket);
111    let mut len = 0;
112    loop {
113        let done =
114            test_diagnostics::SocketReadFut::new(&mut async_socket, |maybe_buf| match maybe_buf {
115                Some(buf) => {
116                    len += buf.len();
117                    artifact.write_all(buf)?;
118                    Ok(false)
119                }
120                None => Ok(true),
121            })
122            .await?;
123        if done {
124            artifact.flush()?;
125            return Ok(len);
126        }
127    }
128}
129
130/// Copy and decompress (zstd) the artifact reported over a socket.
131/// Returns (decompressed, compressed) sizes.
132async fn copy_socket_artifact_and_decompress<W: Write>(
133    socket: fidl::Socket,
134    mut artifact: W,
135) -> Result<(usize, usize), anyhow::Error> {
136    let mut async_socket = fidl::AsyncSocket::from_socket(socket);
137    let mut buf = vec![0u8; 1024 * 1024 * 2];
138
139    let (mut decoder, mut receiver) = Decoder::new();
140    let task: fasync::Task<Result<usize, anyhow::Error>> = fasync::Task::spawn(async move {
141        let mut len = 0;
142        loop {
143            let l = async_socket.read(&mut buf).await?;
144            match l {
145                0 => {
146                    decoder.finish().await?;
147                    break;
148                }
149                _ => {
150                    len += l;
151                    decoder.decompress(&buf[..l]).await?;
152                }
153            }
154        }
155        Ok(len)
156    });
157
158    let mut decompressed_len = 0;
159    while let Some(buf) = receiver.next().await {
160        decompressed_len += buf.len();
161        artifact.write_all(&buf)?;
162    }
163    artifact.flush()?;
164
165    let compressed_len = task.await?;
166    return Ok((decompressed_len, compressed_len));
167}
168
169/// Copy debug data reported over a debug data iterator to an output directory.
170pub async fn copy_debug_data(
171    iterator: ftest_manager::DebugDataIteratorProxy,
172    output_directory: Box<DynDirectoryArtifact>,
173) {
174    let start = std::time::Instant::now();
175    const PIPELINED_REQUESTS: usize = 4;
176    let unprocessed_data_stream =
177        futures::stream::repeat_with(move || iterator.get_next_compressed())
178            .buffered(PIPELINED_REQUESTS);
179    let terminated_event_stream =
180        unprocessed_data_stream.take_until_stop_after(|result| match &result {
181            Ok(events) => events.is_empty(),
182            _ => true,
183        });
184
185    let data_futs = terminated_event_stream
186        .map(|result| match result {
187            Ok(vals) => vals,
188            Err(e) => {
189                warn!("Request failure: {:?}", e);
190                vec![]
191            }
192        })
193        .map(futures::stream::iter)
194        .flatten()
195        .map(|debug_data| {
196            let output =
197                debug_data.name.as_ref().ok_or_else(|| anyhow!("Missing profile name")).and_then(
198                    |name| {
199                        output_directory.new_file(&PathBuf::from(name)).map_err(anyhow::Error::from)
200                    },
201                );
202            fasync::Task::spawn(async move {
203                let _ = &debug_data;
204                let mut output = output?;
205                let socket =
206                    debug_data.socket.ok_or_else(|| anyhow!("Missing profile socket handle"))?;
207                debug!("Reading run profile \"{:?}\"", debug_data.name);
208                let start = std::time::Instant::now();
209                let (decompressed_len, compressed_len) =
210                    copy_socket_artifact_and_decompress(socket, &mut output).await.map_err(
211                        |e| {
212                            warn!("Error copying artifact '{:?}': {:?}", debug_data.name, e);
213                            e
214                        },
215                    )?;
216
217                debug!(
218                    "Copied file {:?}: {}({} - compressed) bytes in {:?}",
219                    debug_data.name,
220                    decompressed_len,
221                    compressed_len,
222                    start.elapsed()
223                );
224                Ok::<(), anyhow::Error>(())
225            })
226        })
227        .collect::<Vec<_>>()
228        .await;
229    join_all(data_futs).await;
230    debug!("All profiles downloaded in {:?}", start.elapsed());
231}
232
233/// Copy a directory into a directory artifact.
234async fn copy_custom_artifact_directory(
235    directory: fio::DirectoryProxy,
236    out_dir: Box<DynDirectoryArtifact>,
237) -> Result<(), anyhow::Error> {
238    let mut paths = vec![];
239    let mut enumerate = fuchsia_fs::directory::readdir_recursive(&directory, None);
240    while let Ok(Some(file)) = enumerate.try_next().await {
241        if file.kind == fuchsia_fs::directory::DirentKind::File {
242            paths.push(file.name);
243        }
244    }
245
246    let futs = FuturesUnordered::new();
247    paths.iter().for_each(|path| {
248        let file =
249            fuchsia_fs::directory::open_file_async(&directory, path, fuchsia_fs::PERM_READABLE);
250        let output_file = out_dir.new_file(std::path::Path::new(path));
251        futs.push(async move {
252            let file = file.with_context(|| format!("with path {:?}", path))?;
253            let mut output_file = output_file?;
254
255            copy_file_to_writer(&file, &mut output_file).await.map(|_| ())
256        });
257    });
258
259    futs.for_each(|result| {
260        if let Err(e) = result {
261            warn!("Custom artifact failure: {}", e);
262        }
263        async move {}
264    })
265    .await;
266
267    Ok(())
268}
269
270async fn copy_file_to_writer<T: Write>(
271    file: &fio::FileProxy,
272    output: &mut T,
273) -> Result<usize, anyhow::Error> {
274    const READ_SIZE: u64 = fio::MAX_BUF;
275
276    let mut vector = VecDeque::new();
277    // Arbitrary number of reads to pipeline.
278    const PIPELINED_READ_COUNT: u64 = 4;
279    for _n in 0..PIPELINED_READ_COUNT {
280        vector.push_back(file.read(READ_SIZE));
281    }
282    let mut len = 0;
283    loop {
284        let buf = vector.pop_front().unwrap().await?.map_err(zx_status::Status::from_raw)?;
285        if buf.is_empty() {
286            break;
287        }
288        len += buf.len();
289        output.write_all(&buf)?;
290        vector.push_back(file.read(READ_SIZE));
291    }
292    Ok(len)
293}
294
#[cfg(test)]
mod socket_tests {
    use super::*;
    use futures::AsyncWriteExt;

    /// Verifies that `copy_socket_artifact` reproduces exactly the bytes written to the peer
    /// socket, for an empty payload, a short payload and a 4096-byte payload.
    #[fuchsia::test]
    async fn copy_socket() {
        let payloads = vec![vec![], b"0123456789abcde".to_vec(), vec![0u8; 4096]];

        for payload in payloads.iter() {
            let (reader_end, writer_end) = fidl::Socket::create_stream();
            let mut copied = vec![];
            // The writing half owns its socket end, so the end is dropped (and the stream
            // closed) as soon as the payload has been written.
            let writer = async move {
                let mut async_socket = fidl::AsyncSocket::from_socket(writer_end);
                async_socket.write_all(payload.as_slice()).await.expect("write bytes");
            };
            let copier = copy_socket_artifact(reader_end, &mut copied);

            let ((), copy_result) = futures::future::join(writer, copier).await;
            copy_result.expect("copy contents");
            assert_eq!(copied.as_slice(), payload.as_slice());
        }
    }
}
320
// These tests use vfs, which is only available on Fuchsia.
//
// They exercise `copy_custom_artifact_directory` against fake directories served with vfs, and
// `copy_debug_data` against a fake DebugDataIterator, checking the files that end up in an
// `InMemoryDirectoryWriter`.
#[cfg(target_os = "fuchsia")]
#[cfg(test)]
mod file_tests {
    use super::*;
    use crate::output::InMemoryDirectoryWriter;
    use futures::prelude::*;
    use maplit::hashmap;
    use std::collections::HashMap;
    use std::sync::Arc;
    use vfs::directory::helper::DirectlyMutable;
    use vfs::directory::immutable::Simple;
    use vfs::file::vmo::read_only;
    use vfs::pseudo_directory;
    use {fidl_fuchsia_io as fio, fuchsia_async as fasync};

    /// Writes all of |content| to |socket|. The socket is owned by this function, so it is
    /// dropped when the function returns.
    async fn serve_content_over_socket(content: Vec<u8>, socket: zx::Socket) {
        let mut socket = fidl::AsyncSocket::from_socket(socket);
        socket.write_all(content.as_slice()).await.expect("Cannot serve content over socket");
    }

    /// Serves |expected_files| through a fake DebugDataIterator while running `copy_debug_data`
    /// against it, writing the results into |directory_writer|.
    ///
    /// Each file's content is zstd-compressed and served over its own socket by a detached task.
    async fn serve_and_copy_debug_data(
        expected_files: &HashMap<PathBuf, Vec<u8>>,
        directory_writer: InMemoryDirectoryWriter,
    ) {
        let mut served_files = vec![];
        expected_files.iter().for_each(|(path, content)| {
            let mut compressor = zstd::bulk::Compressor::new(0).unwrap();
            let bytes = compressor.compress(&content).unwrap();
            let (client, server) = zx::Socket::create_stream();
            fasync::Task::spawn(serve_content_over_socket(bytes, server)).detach();
            served_files.push(ftest_manager::DebugData {
                name: Some(path.display().to_string()),
                socket: Some(client.into()),
                ..Default::default()
            });
        });

        let (iterator_proxy, mut iterator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ftest_manager::DebugDataIteratorMarker>();
        // Fake iterator server: answers requests until the request stream ends or errors.
        let serve_fut = async move {
            let mut files_iter = served_files.into_iter();
            while let Ok(Some(request)) = iterator_stream.try_next().await {
                let responder = match request {
                    // Only the compressed variant is expected to be called.
                    ftest_manager::DebugDataIteratorRequest::GetNext { .. } => {
                        panic!("Not Implemented");
                    }
                    ftest_manager::DebugDataIteratorRequest::GetNextCompressed { responder } => {
                        responder
                    }
                };
                // Hand out at most 3 entries per response; once the files are exhausted the
                // response is empty.
                let resp: Vec<_> = files_iter.by_ref().take(3).collect();
                let _ = responder.send(resp);
            }
        };
        futures::future::join(
            serve_fut,
            copy_debug_data(iterator_proxy, Box::new(directory_writer)),
        )
        .await;
    }

    /// Shared test cases. Each entry is (case name, fake directory holding the files, expected
    /// map from relative path to file contents).
    fn test_cases() -> Vec<(&'static str, Arc<Simple>, HashMap<PathBuf, Vec<u8>>)> {
        vec![
            ("empty", pseudo_directory! {}, hashmap! {}),
            (
                "single file",
                pseudo_directory! {
                    "test_file.txt" => read_only("Hello, World!"),
                },
                hashmap! {
                    "test_file.txt".to_string().into() => b"Hello, World!".to_vec()
                },
            ),
            (
                "subdir",
                pseudo_directory! {
                    "sub" => pseudo_directory! {
                        "nested.txt" => read_only("Nested file!"),
                    }
                },
                hashmap! {
                    "sub/nested.txt".to_string().into() => b"Nested file!".to_vec()
                },
            ),
            (
                "empty file",
                pseudo_directory! {
                    "empty.txt" => read_only(""),
                },
                hashmap! {
                    "empty.txt".to_string().into() => b"".to_vec()
                },
            ),
            // Twice fio::MAX_BUF bytes, i.e. larger than a single READ_SIZE read in
            // copy_file_to_writer.
            (
                "big file",
                pseudo_directory! {
                    "big.txt" => read_only(vec![b's'; (fio::MAX_BUF as usize)*2]),
                },
                hashmap! {
                    "big.txt".to_string().into() => vec![b's'; (fio::MAX_BUF as usize) *2 as usize]
                },
            ),
            (
                "100 files",
                {
                    let dir = pseudo_directory! {};
                    for i in 0..100 {
                        dir.add_entry(
                            format!("{:?}.txt", i),
                            read_only(format!("contents for {:?}", i)),
                        )
                        .expect("add file");
                    }
                    dir
                },
                (0..100)
                    .map(|i| {
                        (
                            format!("{:?}.txt", i).into(),
                            format!("contents for {:?}", i).into_bytes(),
                        )
                    })
                    .collect(),
            ),
        ]
    }

    /// For every test case, serves the fake directory and checks that
    /// `copy_custom_artifact_directory` produces exactly the expected files.
    #[fuchsia::test]
    async fn test_copy_dir() {
        for (name, fake_dir, expected_files) in test_cases() {
            let directory =
                vfs::directory::serve(fake_dir, fio::PERM_READABLE | fio::PERM_WRITABLE);
            let artifact = InMemoryDirectoryWriter::default();
            copy_custom_artifact_directory(directory, Box::new(artifact.clone()))
                .await
                .expect("reading custom directory");
            let actual_files: HashMap<_, _> = artifact
                .files
                .lock()
                .iter()
                .map(|(path, artifact)| (path.clone(), artifact.get_contents()))
                .collect();
            assert_eq!(expected_files, actual_files, "{}", name);
        }
    }

    /// For every test case, serves the expected files as compressed debug data and checks that
    /// `copy_debug_data` produces exactly the expected files. The fake directory of the test
    /// case is not used here.
    #[fuchsia::test]
    async fn test_copy_debug_data() {
        for (name, _fake_dir, expected_files) in test_cases() {
            let artifact = InMemoryDirectoryWriter::default();
            serve_and_copy_debug_data(&expected_files, artifact.clone()).await;
            let actual_files: HashMap<_, _> = artifact
                .files
                .lock()
                .iter()
                .map(|(path, artifact)| (path.clone(), artifact.get_contents()))
                .collect();
            assert_eq!(expected_files, actual_files, "{}", name);
        }
    }
}