1use crate::run_events::{RunEvent, SuiteEvents};
6use anyhow::Error;
7use fidl::endpoints::create_request_stream;
8use futures::channel::mpsc;
9use futures::prelude::*;
10use futures::stream::FusedStream;
11use futures::{pin_mut, StreamExt, TryStreamExt};
12use itertools::Either;
13use log::warn;
14use test_diagnostics::zstd_compress::{Encoder, Error as CompressionError};
15use {fidl_fuchsia_io as fio, fidl_fuchsia_test_manager as ftest_manager, fuchsia_async as fasync};
16
/// Timeout, in seconds, handed to `readdir_recursive` whenever a debug data directory is
/// enumerated.
const DEBUG_DATA_TIMEOUT_SECONDS: i64 = 15;
/// Namespace path of the directory from which kernel debug data is served.
const EARLY_BOOT_DEBUG_DATA_PATH: &str = "/debugdata";
19
20pub(crate) async fn send_kernel_debug_data(
21 iterator: ftest_manager::DebugDataIteratorRequestStream,
22) -> Result<(), Error> {
23 log::info!("Serving kernel debug data");
24 let directory = fuchsia_fs::directory::open_in_namespace(
25 EARLY_BOOT_DEBUG_DATA_PATH,
26 fuchsia_fs::PERM_READABLE,
27 )?;
28
29 serve_iterator(EARLY_BOOT_DEBUG_DATA_PATH, directory, iterator).await
30}
31
/// Maximum number of files returned in a single `GetNext`/`GetNextCompressed` response.
const ITERATOR_BATCH_SIZE: usize = 10;
33
34async fn filter_map_filename(
35 entry_result: Result<
36 fuchsia_fs::directory::DirEntry,
37 fuchsia_fs::directory::RecursiveEnumerateError,
38 >,
39 dir_path: &str,
40) -> Option<String> {
41 match entry_result {
42 Ok(fuchsia_fs::directory::DirEntry { name, kind }) => match kind {
43 fuchsia_fs::directory::DirentKind::File => Some(name),
44 _ => None,
45 },
46 Err(e) => {
47 warn!("Error reading directory in {}: {:?}", dir_path, e);
48 None
49 }
50 }
51}
52
53async fn serve_file_over_socket(
54 file_name: String,
55 file: fio::FileProxy,
56 socket: zx::Socket,
57 compress: bool,
58) {
59 let mut socket = fasync::Socket::from_socket(socket);
60
61 let num_bytes: u64 = 1024 * 1024 * 2;
63 let (mut sender, mut recv) = match compress {
64 true => {
65 let (encoder, recv) = Encoder::new(0);
66 (Either::Left(encoder), recv)
67 }
68 false => {
69 let (sender, recv) = mpsc::channel(10);
70 (Either::Right(sender), recv)
71 }
72 };
73 let filename = file_name.clone();
74 let _file_read_task = fasync::Task::spawn(async move {
75 loop {
76 let bytes = fuchsia_fs::file::read_num_bytes(&file, num_bytes).await.unwrap();
77 let len = bytes.len();
78 match sender {
79 Either::Left(ref mut e) => {
80 if let Err(e) = e.compress(&bytes).await {
81 match e {
82 CompressionError::Send(_) => { }
83 e => {
84 warn!("Error compressing file '{}':, {:?}", &filename, e);
85 }
86 }
87 break;
88 }
89 }
90 Either::Right(ref mut s) => {
91 if let Err(_) = s.send(bytes).await {
92 break;
94 }
95 }
96 }
97
98 if len != usize::try_from(num_bytes).unwrap() {
99 if let Either::Left(e) = sender {
100 if let Err(e) = e.finish().await {
102 match e {
103 CompressionError::Send(_) => { }
104 e => {
105 warn!("Error compressing file '{}':, {:?}", &filename, e);
106 }
107 }
108 }
109 }
110 break;
111 }
112 }
113 });
114
115 while let Some(bytes) = recv.next().await {
116 if let Err(e) = socket.write_all(bytes.as_slice()).await {
117 warn!("cannot serve file '{}': {:?}", &file_name, e);
118 return;
119 }
120 }
121}
122
123pub(crate) async fn serve_directory(
124 dir_path: &str,
125 mut event_sender: mpsc::Sender<RunEvent>,
126) -> Result<(), Error> {
127 let directory = fuchsia_fs::directory::open_in_namespace(dir_path, fuchsia_fs::PERM_READABLE)?;
128 {
129 let file_stream = fuchsia_fs::directory::readdir_recursive(
130 &directory,
131 Some(fasync::MonotonicDuration::from_seconds(DEBUG_DATA_TIMEOUT_SECONDS)),
132 )
133 .filter_map(|entry| filter_map_filename(entry, dir_path));
134 pin_mut!(file_stream);
135 if file_stream.next().await.is_none() {
136 return Ok(());
138 }
139
140 drop(file_stream);
141 }
142
143 let (client, iterator) = create_request_stream::<ftest_manager::DebugDataIteratorMarker>();
144 let _ = event_sender.send(RunEvent::debug_data(client).into()).await;
145 event_sender.disconnect(); serve_iterator(dir_path, directory, iterator).await
148}
149
150pub(crate) async fn serve_directory_for_suite(
151 dir_path: &str,
152 mut event_sender: mpsc::Sender<Result<SuiteEvents, ftest_manager::LaunchError>>,
153) -> Result<(), Error> {
154 let directory = fuchsia_fs::directory::open_in_namespace(dir_path, fuchsia_fs::PERM_READABLE)?;
155 {
156 let file_stream = fuchsia_fs::directory::readdir_recursive(
157 &directory,
158 Some(fasync::MonotonicDuration::from_seconds(DEBUG_DATA_TIMEOUT_SECONDS)),
159 )
160 .filter_map(|entry| filter_map_filename(entry, dir_path));
161 pin_mut!(file_stream);
162 if file_stream.next().await.is_none() {
163 return Ok(());
165 }
166
167 drop(file_stream);
168 }
169
170 let (client, iterator) = create_request_stream::<ftest_manager::DebugDataIteratorMarker>();
171 let _ = event_sender.send(Ok(SuiteEvents::debug_data(client).into())).await;
172 event_sender.disconnect(); serve_iterator(dir_path, directory, iterator).await
175}
176
177pub(crate) async fn serve_iterator(
182 dir_path: &str,
183 directory: fio::DirectoryProxy,
184 mut iterator: ftest_manager::DebugDataIteratorRequestStream,
185) -> Result<(), Error> {
186 let file_stream = fuchsia_fs::directory::readdir_recursive(
187 &directory,
188 Some(fasync::MonotonicDuration::from_seconds(DEBUG_DATA_TIMEOUT_SECONDS)),
189 )
190 .filter_map(|entry| filter_map_filename(entry, dir_path));
191 pin_mut!(file_stream);
192 let mut file_stream = file_stream.fuse();
193
194 let mut file_tasks = vec![];
195 while let Some(request) = iterator.try_next().await? {
196 let mut compress = false;
197 let responder = match request {
198 ftest_manager::DebugDataIteratorRequest::GetNext { responder } => {
199 Either::Left(responder)
200 }
201 ftest_manager::DebugDataIteratorRequest::GetNextCompressed { responder } => {
202 compress = true;
203 Either::Right(responder)
204 }
205 };
206 let next_files = match file_stream.is_terminated() {
207 true => vec![],
208 false => file_stream.by_ref().take(ITERATOR_BATCH_SIZE).collect().await,
209 };
210 let debug_data = next_files
211 .into_iter()
212 .map(|file_name| {
213 let file = fuchsia_fs::directory::open_file_async(
214 &directory,
215 &file_name,
216 fio::PERM_READABLE,
217 )?;
218 let (client, server) = zx::Socket::create_stream();
219 let t = fasync::Task::spawn(serve_file_over_socket(
220 file_name.clone(),
221 file,
222 server,
223 compress,
224 ));
225 file_tasks.push(t);
226 Ok(ftest_manager::DebugData {
227 socket: Some(client.into()),
228 name: file_name.into(),
229 ..Default::default()
230 })
231 })
232 .collect::<Result<Vec<_>, Error>>()?;
233 let _ = match responder {
234 Either::Left(responder) => responder.send(debug_data),
235 Either::Right(responder) => responder.send(debug_data),
236 };
237 }
238
239 future::join_all(file_tasks).await;
241 Ok(())
242}
243
#[cfg(test)]
mod test {
    use super::*;
    use crate::run_events::{RunEventPayload, SuiteEventPayload};
    use ftest_manager::{DebugData, DebugDataIteratorProxy};
    use fuchsia_async as fasync;
    use std::collections::HashSet;
    use tempfile::tempdir;
    use test_case::test_case;
    use test_manager_test_lib::collect_string_from_socket_helper;

    /// Runs `serve_directory` for `dir` on a local task.
    ///
    /// Returns the iterator proxy extracted from the first event received, or `None` if the
    /// event channel closes without producing an event, together with the serving task.
    async fn serve_iterator_from_tmp(
        dir: &tempfile::TempDir,
    ) -> (Option<ftest_manager::DebugDataIteratorProxy>, fasync::Task<Result<(), Error>>) {
        let (send, mut recv) = mpsc::channel(0);
        let dir_path = dir.path().to_str().unwrap().to_string();
        let task = fasync::Task::local(async move { serve_directory(&dir_path, send).await });
        let proxy = recv.next().await.map(|event| {
            let RunEventPayload::DebugData(client) = event.into_payload();
            client.into_proxy()
        });
        (proxy, task)
    }

    /// Runs `serve_directory_for_suite` for `dir` on a local task.
    ///
    /// Returns the iterator proxy extracted from the first event received, or `None` if the
    /// event channel closes without producing an event, together with the serving task.
    /// Panics if the first event is an error or is not a debug data event.
    async fn serve_iterator_for_suite_from_tmp(
        dir: &tempfile::TempDir,
    ) -> (Option<ftest_manager::DebugDataIteratorProxy>, fasync::Task<Result<(), Error>>) {
        let (send, mut recv) = mpsc::channel(0);
        let dir_path = dir.path().to_str().unwrap().to_string();
        let task =
            fasync::Task::local(async move { serve_directory_for_suite(&dir_path, send).await });
        let proxy = recv.next().await.map(|event| match event.unwrap().into_payload() {
            SuiteEventPayload::DebugData(client) => client.into_proxy(),
            _ => panic!("expected a debug data suite event"),
        });
        (proxy, task)
    }

    /// Fetches the next batch from `proxy` using the compressed or uncompressed method.
    async fn get_next_debug_data(
        proxy: &DebugDataIteratorProxy,
        compressed: bool,
    ) -> Vec<DebugData> {
        match compressed {
            true => proxy.get_next_compressed().await.expect("get next compressed"),
            false => proxy.get_next().await.expect("get next"),
        }
    }

    /// Writes one file named `file` with contents `test` into `dir`.
    async fn write_single_file(dir: &tempfile::TempDir) {
        fuchsia_fs::file::write_in_namespace(&dir.path().join("file").to_string_lossy(), "test")
            .await
            .expect("write to file");
    }

    /// Writes `count` files named `file-<idx>` with contents `test-<idx>` into `dir`.
    async fn write_numbered_files(dir: &tempfile::TempDir, count: usize) {
        for idx in 0..count {
            fuchsia_fs::file::write_in_namespace(
                &dir.path().join(format!("file-{:?}", idx)).to_string_lossy(),
                &format!("test-{:?}", idx),
            )
            .await
            .expect("write to file");
        }
    }

    /// Asserts that `proxy` serves exactly the file written by `write_single_file` and then
    /// keeps answering with empty batches.
    async fn assert_single_file_served(proxy: &DebugDataIteratorProxy, compressed: bool) {
        let mut values = get_next_debug_data(proxy, compressed).await;
        assert_eq!(1usize, values.len());
        let ftest_manager::DebugData { name, socket, .. } = values.pop().unwrap();
        assert_eq!(Some("file".to_string()), name);
        let contents = collect_string_from_socket_helper(socket.unwrap(), compressed)
            .await
            .expect("read socket");
        assert_eq!("test", contents);

        // Asking again after exhaustion must repeatedly yield an empty batch.
        for _ in 0..2 {
            let values = get_next_debug_data(proxy, compressed).await;
            assert_eq!(values, vec![]);
        }
    }

    /// Drains `proxy` until an empty batch arrives and asserts that the served files are
    /// exactly those written by `write_numbered_files(_, count)`.
    async fn assert_numbered_files_served(
        proxy: &DebugDataIteratorProxy,
        compressed: bool,
        count: usize,
    ) {
        let mut all_files = vec![];
        loop {
            let mut next = get_next_debug_data(proxy, compressed).await;
            if next.is_empty() {
                break;
            }
            all_files.append(&mut next);
        }

        let file_contents: HashSet<_> = futures::stream::iter(all_files)
            .then(|ftest_manager::DebugData { name, socket, .. }| async move {
                let contents = collect_string_from_socket_helper(socket.unwrap(), compressed)
                    .await
                    .expect("read socket");
                (name.unwrap(), contents)
            })
            .collect()
            .await;

        let expected_files: HashSet<_> = (0..count)
            .map(|idx| (format!("file-{:?}", idx), format!("test-{:?}", idx)))
            .collect();

        assert_eq!(file_contents, expected_files);
    }

    #[fuchsia::test]
    async fn serve_iterator_empty_dir_returns_no_client() {
        let dir = tempdir().unwrap();
        let (client, task) = serve_iterator_from_tmp(&dir).await;
        assert!(client.is_none());
        task.await.expect("iterator server should not fail");
    }

    #[test_case(true; "compressed debug_data")]
    #[test_case(false; "uncompressed debug_data")]
    #[fuchsia::test]
    async fn serve_iterator_single_response(compressed: bool) {
        let dir = tempdir().unwrap();
        write_single_file(&dir).await;

        let (client, task) = serve_iterator_from_tmp(&dir).await;
        let proxy = client.expect("client to be returned");

        assert_single_file_served(&proxy, compressed).await;

        drop(proxy);
        task.await.expect("iterator server should not fail");
    }

    #[test_case(true; "compressed debug_data")]
    #[test_case(false; "uncompressed debug_data")]
    #[fuchsia::test]
    async fn serve_iterator_multiple_responses(compressed: bool) {
        let num_files_served = ITERATOR_BATCH_SIZE * 2;

        let dir = tempdir().unwrap();
        write_numbered_files(&dir, num_files_served).await;

        let (client, task) = serve_iterator_from_tmp(&dir).await;
        let proxy = client.expect("client to be returned");

        assert_numbered_files_served(&proxy, compressed, num_files_served).await;

        drop(proxy);
        task.await.expect("iterator server should not fail");
    }

    #[fuchsia::test]
    async fn serve_iterator_for_suite_empty_dir_returns_no_client() {
        let dir = tempdir().unwrap();
        let (client, task) = serve_iterator_for_suite_from_tmp(&dir).await;
        assert!(client.is_none());
        task.await.expect("iterator server should not fail");
    }

    #[test_case(true; "compressed debug_data")]
    #[test_case(false; "uncompressed debug_data")]
    #[fuchsia::test]
    async fn serve_iterator_for_suite_single_response(compressed: bool) {
        let dir = tempdir().unwrap();
        write_single_file(&dir).await;

        let (client, task) = serve_iterator_for_suite_from_tmp(&dir).await;
        let proxy = client.expect("client to be returned");

        assert_single_file_served(&proxy, compressed).await;

        drop(proxy);
        task.await.expect("iterator server should not fail");
    }

    #[test_case(true; "compressed debug_data")]
    #[test_case(false; "uncompressed debug_data")]
    #[fuchsia::test]
    async fn serve_iterator_for_suite_multiple_responses(compressed: bool) {
        let num_files_served = ITERATOR_BATCH_SIZE * 2;

        let dir = tempdir().unwrap();
        write_numbered_files(&dir, num_files_served).await;

        // Must go through the suite entry point; the run-level one has its own test above.
        let (client, task) = serve_iterator_for_suite_from_tmp(&dir).await;
        let proxy = client.expect("client to be returned");

        assert_numbered_files_served(&proxy, compressed, num_files_served).await;

        drop(proxy);
        task.await.expect("iterator server should not fail");
    }
}