1use crate::artifacts;
6use crate::cancel::NamedFutureExt;
7use crate::diagnostics::{self, LogCollectionOutcome};
8use crate::outcome::{RunTestSuiteError, UnexpectedEventError};
9use crate::output::{
10 ArtifactType, DirectoryArtifactType, DynDirectoryArtifact, DynReporter, EntityReporter,
11};
12use crate::stream_util::StreamUtil;
13use anyhow::{anyhow, Context as _};
14use fidl::Peered;
15use futures::future::{join_all, BoxFuture, FutureExt, TryFutureExt};
16use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
17use futures::AsyncReadExt;
18use log::{debug, warn};
19use std::borrow::Borrow;
20use std::collections::VecDeque;
21use std::io::Write;
22use std::path::PathBuf;
23use test_diagnostics::zstd_compress::Decoder;
24use {fidl_fuchsia_io as fio, fidl_fuchsia_test_manager as ftest_manager, fuchsia_async as fasync};
25
26pub(crate) async fn drain_artifact<'a, E, T>(
34 reporter: &'a EntityReporter<E, T>,
35 artifact: ftest_manager::Artifact,
36 log_opts: diagnostics::LogCollectionOptions,
37) -> Result<
38 BoxFuture<'static, Result<Option<LogCollectionOutcome>, anyhow::Error>>,
39 RunTestSuiteError,
40>
41where
42 T: Borrow<DynReporter>,
43{
44 match artifact {
45 ftest_manager::Artifact::Stdout(socket) => {
46 let stdout = reporter.new_artifact(&ArtifactType::Stdout)?;
47 Ok(copy_socket_artifact(socket, stdout).map_ok(|_| None).named("stdout").boxed())
48 }
49 ftest_manager::Artifact::Stderr(socket) => {
50 let stderr = reporter.new_artifact(&ArtifactType::Stderr)?;
51 Ok(copy_socket_artifact(socket, stderr).map_ok(|_| None).named("stderr").boxed())
52 }
53 ftest_manager::Artifact::Log(syslog) => {
54 let syslog_artifact = reporter.new_artifact(&ArtifactType::Syslog)?;
55 Ok(diagnostics::collect_logs(
56 test_diagnostics::LogStream::from_syslog(syslog)?,
57 syslog_artifact,
58 log_opts,
59 )
60 .map_ok(Some)
61 .named("syslog")
62 .boxed())
63 }
64 ftest_manager::Artifact::Custom(ftest_manager::CustomArtifact {
65 directory_and_token,
66 component_moniker,
67 ..
68 }) => {
69 let ftest_manager::DirectoryAndToken { directory, token, .. } = directory_and_token
70 .ok_or(UnexpectedEventError::MissingRequiredField {
71 containing_struct: "CustomArtifact",
72 field: "directory_and_token",
73 })?;
74 let directory_artifact = reporter
75 .new_directory_artifact(&DirectoryArtifactType::Custom, component_moniker)?;
76 Ok(async move {
77 let directory = directory.into_proxy();
78 let result =
79 artifacts::copy_custom_artifact_directory(directory, directory_artifact).await;
80 let _ = token.signal_peer(fidl::Signals::empty(), fidl::Signals::USER_0);
84 result
85 }
86 .map_ok(|()| None)
87 .named("custom_artifacts")
88 .boxed())
89 }
90 ftest_manager::Artifact::DebugData(iterator) => {
91 let output_directory = reporter
92 .new_directory_artifact(&DirectoryArtifactType::Debug, None )?;
93 Ok(artifacts::copy_debug_data(iterator.into_proxy(), output_directory)
94 .map(|()| Ok(None))
95 .named("debug_data")
96 .boxed())
97 }
98 ftest_manager::ArtifactUnknown!() => {
99 warn!("Encountered an unknown artifact");
100 Ok(futures::future::ready(Ok(None)).boxed())
101 }
102 }
103}
104
105async fn copy_socket_artifact<W: Write>(
107 socket: fidl::Socket,
108 mut artifact: W,
109) -> Result<usize, anyhow::Error> {
110 let mut async_socket = fidl::AsyncSocket::from_socket(socket);
111 let mut len = 0;
112 loop {
113 let done =
114 test_diagnostics::SocketReadFut::new(&mut async_socket, |maybe_buf| match maybe_buf {
115 Some(buf) => {
116 len += buf.len();
117 artifact.write_all(buf)?;
118 Ok(false)
119 }
120 None => Ok(true),
121 })
122 .await?;
123 if done {
124 artifact.flush()?;
125 return Ok(len);
126 }
127 }
128}
129
130async fn copy_socket_artifact_and_decompress<W: Write>(
133 socket: fidl::Socket,
134 mut artifact: W,
135) -> Result<(usize, usize), anyhow::Error> {
136 let mut async_socket = fidl::AsyncSocket::from_socket(socket);
137 let mut buf = vec![0u8; 1024 * 1024 * 2];
138
139 let (mut decoder, mut receiver) = Decoder::new();
140 let task: fasync::Task<Result<usize, anyhow::Error>> = fasync::Task::spawn(async move {
141 let mut len = 0;
142 loop {
143 let l = async_socket.read(&mut buf).await?;
144 match l {
145 0 => {
146 decoder.finish().await?;
147 break;
148 }
149 _ => {
150 len += l;
151 decoder.decompress(&buf[..l]).await?;
152 }
153 }
154 }
155 Ok(len)
156 });
157
158 let mut decompressed_len = 0;
159 while let Some(buf) = receiver.next().await {
160 decompressed_len += buf.len();
161 artifact.write_all(&buf)?;
162 }
163 artifact.flush()?;
164
165 let compressed_len = task.await?;
166 return Ok((decompressed_len, compressed_len));
167}
168
169pub async fn copy_debug_data(
171 iterator: ftest_manager::DebugDataIteratorProxy,
172 output_directory: Box<DynDirectoryArtifact>,
173) {
174 let start = std::time::Instant::now();
175 const PIPELINED_REQUESTS: usize = 4;
176 let unprocessed_data_stream =
177 futures::stream::repeat_with(move || iterator.get_next_compressed())
178 .buffered(PIPELINED_REQUESTS);
179 let terminated_event_stream =
180 unprocessed_data_stream.take_until_stop_after(|result| match &result {
181 Ok(events) => events.is_empty(),
182 _ => true,
183 });
184
185 let data_futs = terminated_event_stream
186 .map(|result| match result {
187 Ok(vals) => vals,
188 Err(e) => {
189 warn!("Request failure: {:?}", e);
190 vec![]
191 }
192 })
193 .map(futures::stream::iter)
194 .flatten()
195 .map(|debug_data| {
196 let output =
197 debug_data.name.as_ref().ok_or_else(|| anyhow!("Missing profile name")).and_then(
198 |name| {
199 output_directory.new_file(&PathBuf::from(name)).map_err(anyhow::Error::from)
200 },
201 );
202 fasync::Task::spawn(async move {
203 let _ = &debug_data;
204 let mut output = output?;
205 let socket =
206 debug_data.socket.ok_or_else(|| anyhow!("Missing profile socket handle"))?;
207 debug!("Reading run profile \"{:?}\"", debug_data.name);
208 let start = std::time::Instant::now();
209 let (decompressed_len, compressed_len) =
210 copy_socket_artifact_and_decompress(socket, &mut output).await.map_err(
211 |e| {
212 warn!("Error copying artifact '{:?}': {:?}", debug_data.name, e);
213 e
214 },
215 )?;
216
217 debug!(
218 "Copied file {:?}: {}({} - compressed) bytes in {:?}",
219 debug_data.name,
220 decompressed_len,
221 compressed_len,
222 start.elapsed()
223 );
224 Ok::<(), anyhow::Error>(())
225 })
226 })
227 .collect::<Vec<_>>()
228 .await;
229 join_all(data_futs).await;
230 debug!("All profiles downloaded in {:?}", start.elapsed());
231}
232
233async fn copy_custom_artifact_directory(
235 directory: fio::DirectoryProxy,
236 out_dir: Box<DynDirectoryArtifact>,
237) -> Result<(), anyhow::Error> {
238 let mut paths = vec![];
239 let mut enumerate = fuchsia_fs::directory::readdir_recursive(&directory, None);
240 while let Ok(Some(file)) = enumerate.try_next().await {
241 if file.kind == fuchsia_fs::directory::DirentKind::File {
242 paths.push(file.name);
243 }
244 }
245
246 let futs = FuturesUnordered::new();
247 paths.iter().for_each(|path| {
248 let file =
249 fuchsia_fs::directory::open_file_async(&directory, path, fuchsia_fs::PERM_READABLE);
250 let output_file = out_dir.new_file(std::path::Path::new(path));
251 futs.push(async move {
252 let file = file.with_context(|| format!("with path {:?}", path))?;
253 let mut output_file = output_file?;
254
255 copy_file_to_writer(&file, &mut output_file).await.map(|_| ())
256 });
257 });
258
259 futs.for_each(|result| {
260 if let Err(e) = result {
261 warn!("Custom artifact failure: {}", e);
262 }
263 async move {}
264 })
265 .await;
266
267 Ok(())
268}
269
270async fn copy_file_to_writer<T: Write>(
271 file: &fio::FileProxy,
272 output: &mut T,
273) -> Result<usize, anyhow::Error> {
274 const READ_SIZE: u64 = fio::MAX_BUF;
275
276 let mut vector = VecDeque::new();
277 const PIPELINED_READ_COUNT: u64 = 4;
279 for _n in 0..PIPELINED_READ_COUNT {
280 vector.push_back(file.read(READ_SIZE));
281 }
282 let mut len = 0;
283 loop {
284 let buf = vector.pop_front().unwrap().await?.map_err(zx_status::Status::from_raw)?;
285 if buf.is_empty() {
286 break;
287 }
288 len += buf.len();
289 output.write_all(&buf)?;
290 vector.push_back(file.read(READ_SIZE));
291 }
292 Ok(len)
293}
294
295#[cfg(test)]
296mod socket_tests {
297 use super::*;
298 use futures::AsyncWriteExt;
299
300 #[fuchsia::test]
301 async fn copy_socket() {
302 let cases = vec![vec![], b"0123456789abcde".to_vec(), vec![0u8; 4096]];
303
304 for case in cases.iter() {
305 let (client_socket, server_socket) = fidl::Socket::create_stream();
306 let mut output = vec![];
307 let write_fut = async move {
308 let mut async_socket = fidl::AsyncSocket::from_socket(server_socket);
309 async_socket.write_all(case.as_slice()).await.expect("write bytes");
310 };
311
312 let ((), res) =
313 futures::future::join(write_fut, copy_socket_artifact(client_socket, &mut output))
314 .await;
315 res.expect("copy contents");
316 assert_eq!(output.as_slice(), case.as_slice());
317 }
318 }
319}
320
321#[cfg(target_os = "fuchsia")]
323#[cfg(test)]
324mod file_tests {
325 use super::*;
326 use crate::output::InMemoryDirectoryWriter;
327 use futures::prelude::*;
328 use maplit::hashmap;
329 use std::collections::HashMap;
330 use std::sync::Arc;
331 use vfs::directory::helper::DirectlyMutable;
332 use vfs::directory::immutable::Simple;
333 use vfs::file::vmo::read_only;
334 use vfs::pseudo_directory;
335 use {fidl_fuchsia_io as fio, fuchsia_async as fasync};
336
337 async fn serve_content_over_socket(content: Vec<u8>, socket: zx::Socket) {
338 let mut socket = fidl::AsyncSocket::from_socket(socket);
339 socket.write_all(content.as_slice()).await.expect("Cannot serve content over socket");
340 }
341
342 async fn serve_and_copy_debug_data(
343 expected_files: &HashMap<PathBuf, Vec<u8>>,
344 directory_writer: InMemoryDirectoryWriter,
345 ) {
346 let mut served_files = vec![];
347 expected_files.iter().for_each(|(path, content)| {
348 let mut compressor = zstd::bulk::Compressor::new(0).unwrap();
349 let bytes = compressor.compress(&content).unwrap();
350 let (client, server) = zx::Socket::create_stream();
351 fasync::Task::spawn(serve_content_over_socket(bytes, server)).detach();
352 served_files.push(ftest_manager::DebugData {
353 name: Some(path.display().to_string()),
354 socket: Some(client.into()),
355 ..Default::default()
356 });
357 });
358
359 let (iterator_proxy, mut iterator_stream) =
360 fidl::endpoints::create_proxy_and_stream::<ftest_manager::DebugDataIteratorMarker>();
361 let serve_fut = async move {
362 let mut files_iter = served_files.into_iter();
363 while let Ok(Some(request)) = iterator_stream.try_next().await {
364 let responder = match request {
365 ftest_manager::DebugDataIteratorRequest::GetNext { .. } => {
366 panic!("Not Implemented");
367 }
368 ftest_manager::DebugDataIteratorRequest::GetNextCompressed { responder } => {
369 responder
370 }
371 };
372 let resp: Vec<_> = files_iter.by_ref().take(3).collect();
373 let _ = responder.send(resp);
374 }
375 };
376 futures::future::join(
377 serve_fut,
378 copy_debug_data(iterator_proxy, Box::new(directory_writer)),
379 )
380 .await;
381 }
382
383 fn test_cases() -> Vec<(&'static str, Arc<Simple>, HashMap<PathBuf, Vec<u8>>)> {
384 vec![
385 ("empty", pseudo_directory! {}, hashmap! {}),
386 (
387 "single file",
388 pseudo_directory! {
389 "test_file.txt" => read_only("Hello, World!"),
390 },
391 hashmap! {
392 "test_file.txt".to_string().into() => b"Hello, World!".to_vec()
393 },
394 ),
395 (
396 "subdir",
397 pseudo_directory! {
398 "sub" => pseudo_directory! {
399 "nested.txt" => read_only("Nested file!"),
400 }
401 },
402 hashmap! {
403 "sub/nested.txt".to_string().into() => b"Nested file!".to_vec()
404 },
405 ),
406 (
407 "empty file",
408 pseudo_directory! {
409 "empty.txt" => read_only(""),
410 },
411 hashmap! {
412 "empty.txt".to_string().into() => b"".to_vec()
413 },
414 ),
415 (
416 "big file",
417 pseudo_directory! {
418 "big.txt" => read_only(vec![b's'; (fio::MAX_BUF as usize)*2]),
419 },
420 hashmap! {
421 "big.txt".to_string().into() => vec![b's'; (fio::MAX_BUF as usize) *2 as usize]
422 },
423 ),
424 (
425 "100 files",
426 {
427 let dir = pseudo_directory! {};
428 for i in 0..100 {
429 dir.add_entry(
430 format!("{:?}.txt", i),
431 read_only(format!("contents for {:?}", i)),
432 )
433 .expect("add file");
434 }
435 dir
436 },
437 (0..100)
438 .map(|i| {
439 (
440 format!("{:?}.txt", i).into(),
441 format!("contents for {:?}", i).into_bytes(),
442 )
443 })
444 .collect(),
445 ),
446 ]
447 }
448
449 #[fuchsia::test]
450 async fn test_copy_dir() {
451 for (name, fake_dir, expected_files) in test_cases() {
452 let directory =
453 vfs::directory::serve(fake_dir, fio::PERM_READABLE | fio::PERM_WRITABLE);
454 let artifact = InMemoryDirectoryWriter::default();
455 copy_custom_artifact_directory(directory, Box::new(artifact.clone()))
456 .await
457 .expect("reading custom directory");
458 let actual_files: HashMap<_, _> = artifact
459 .files
460 .lock()
461 .iter()
462 .map(|(path, artifact)| (path.clone(), artifact.get_contents()))
463 .collect();
464 assert_eq!(expected_files, actual_files, "{}", name);
465 }
466 }
467
468 #[fuchsia::test]
469 async fn test_copy_debug_data() {
470 for (name, _fake_dir, expected_files) in test_cases() {
471 let artifact = InMemoryDirectoryWriter::default();
472 serve_and_copy_debug_data(&expected_files, artifact.clone()).await;
473 let actual_files: HashMap<_, _> = artifact
474 .files
475 .lock()
476 .iter()
477 .map(|(path, artifact)| (path.clone(), artifact.get_contents()))
478 .collect();
479 assert_eq!(expected_files, actual_files, "{}", name);
480 }
481 }
482}