test_manager_lib/
debug_data_server.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::run_events::{RunEvent, SuiteEvents};
6use anyhow::Error;
7use fidl::endpoints::create_request_stream;
8use futures::channel::mpsc;
9use futures::prelude::*;
10use futures::stream::FusedStream;
11use futures::{pin_mut, StreamExt, TryStreamExt};
12use itertools::Either;
13use log::warn;
14use test_diagnostics::zstd_compress::{Encoder, Error as CompressionError};
15use {fidl_fuchsia_io as fio, fidl_fuchsia_test_manager as ftest_manager, fuchsia_async as fasync};
16
17const DEBUG_DATA_TIMEOUT_SECONDS: i64 = 15;
18const EARLY_BOOT_DEBUG_DATA_PATH: &'static str = "/debugdata";
19
20pub(crate) async fn send_kernel_debug_data(
21    iterator: ftest_manager::DebugDataIteratorRequestStream,
22) -> Result<(), Error> {
23    log::info!("Serving kernel debug data");
24    let directory = fuchsia_fs::directory::open_in_namespace(
25        EARLY_BOOT_DEBUG_DATA_PATH,
26        fuchsia_fs::PERM_READABLE,
27    )?;
28
29    serve_iterator(EARLY_BOOT_DEBUG_DATA_PATH, directory, iterator).await
30}
31
32const ITERATOR_BATCH_SIZE: usize = 10;
33
34async fn filter_map_filename(
35    entry_result: Result<
36        fuchsia_fs::directory::DirEntry,
37        fuchsia_fs::directory::RecursiveEnumerateError,
38    >,
39    dir_path: &str,
40) -> Option<String> {
41    match entry_result {
42        Ok(fuchsia_fs::directory::DirEntry { name, kind }) => match kind {
43            fuchsia_fs::directory::DirentKind::File => Some(name),
44            _ => None,
45        },
46        Err(e) => {
47            warn!("Error reading directory in {}: {:?}", dir_path, e);
48            None
49        }
50    }
51}
52
53async fn serve_file_over_socket(
54    file_name: String,
55    file: fio::FileProxy,
56    socket: zx::Socket,
57    compress: bool,
58) {
59    let mut socket = fasync::Socket::from_socket(socket);
60
61    // We keep a buffer of 20 MB (2MB buffer * channel size 10) while reading the file
62    let num_bytes: u64 = 1024 * 1024 * 2;
63    let (mut sender, mut recv) = match compress {
64        true => {
65            let (encoder, recv) = Encoder::new(0);
66            (Either::Left(encoder), recv)
67        }
68        false => {
69            let (sender, recv) = mpsc::channel(10);
70            (Either::Right(sender), recv)
71        }
72    };
73    let filename = file_name.clone();
74    let _file_read_task = fasync::Task::spawn(async move {
75        loop {
76            let bytes = fuchsia_fs::file::read_num_bytes(&file, num_bytes).await.unwrap();
77            let len = bytes.len();
78            match sender {
79                Either::Left(ref mut e) => {
80                    if let Err(e) = e.compress(&bytes).await {
81                        match e {
82                            CompressionError::Send(_) => { /* means client is gone, ignore */ }
83                            e => {
84                                warn!("Error compressing file '{}':, {:?}", &filename, e);
85                            }
86                        }
87                        break;
88                    }
89                }
90                Either::Right(ref mut s) => {
91                    if let Err(_) = s.send(bytes).await {
92                        // no recv, don't read rest of the file.
93                        break;
94                    }
95                }
96            }
97
98            if len != usize::try_from(num_bytes).unwrap() {
99                if let Either::Left(e) = sender {
100                    // This is EOF as fuchsia.io.Readable does not return short reads
101                    if let Err(e) = e.finish().await {
102                        match e {
103                            CompressionError::Send(_) => { /* means client is gone, ignore */ }
104                            e => {
105                                warn!("Error compressing file '{}':, {:?}", &filename, e);
106                            }
107                        }
108                    }
109                }
110                break;
111            }
112        }
113    });
114
115    while let Some(bytes) = recv.next().await {
116        if let Err(e) = socket.write_all(bytes.as_slice()).await {
117            warn!("cannot serve file '{}': {:?}", &file_name, e);
118            return;
119        }
120    }
121}
122
123pub(crate) async fn serve_directory(
124    dir_path: &str,
125    mut event_sender: mpsc::Sender<RunEvent>,
126) -> Result<(), Error> {
127    let directory = fuchsia_fs::directory::open_in_namespace(dir_path, fuchsia_fs::PERM_READABLE)?;
128    {
129        let file_stream = fuchsia_fs::directory::readdir_recursive(
130            &directory,
131            Some(fasync::MonotonicDuration::from_seconds(DEBUG_DATA_TIMEOUT_SECONDS)),
132        )
133        .filter_map(|entry| filter_map_filename(entry, dir_path));
134        pin_mut!(file_stream);
135        if file_stream.next().await.is_none() {
136            // No files to serve.
137            return Ok(());
138        }
139
140        drop(file_stream);
141    }
142
143    let (client, iterator) = create_request_stream::<ftest_manager::DebugDataIteratorMarker>();
144    let _ = event_sender.send(RunEvent::debug_data(client).into()).await;
145    event_sender.disconnect(); // No need to hold this open while we serve the iterator.
146
147    serve_iterator(dir_path, directory, iterator).await
148}
149
150pub(crate) async fn serve_directory_for_suite(
151    dir_path: &str,
152    mut event_sender: mpsc::Sender<Result<SuiteEvents, ftest_manager::LaunchError>>,
153) -> Result<(), Error> {
154    let directory = fuchsia_fs::directory::open_in_namespace(dir_path, fuchsia_fs::PERM_READABLE)?;
155    {
156        let file_stream = fuchsia_fs::directory::readdir_recursive(
157            &directory,
158            Some(fasync::MonotonicDuration::from_seconds(DEBUG_DATA_TIMEOUT_SECONDS)),
159        )
160        .filter_map(|entry| filter_map_filename(entry, dir_path));
161        pin_mut!(file_stream);
162        if file_stream.next().await.is_none() {
163            // No files to serve.
164            return Ok(());
165        }
166
167        drop(file_stream);
168    }
169
170    let (client, iterator) = create_request_stream::<ftest_manager::DebugDataIteratorMarker>();
171    let _ = event_sender.send(Ok(SuiteEvents::debug_data(client).into())).await;
172    event_sender.disconnect(); // No need to hold this open while we serve the iterator.
173
174    serve_iterator(dir_path, directory, iterator).await
175}
176
177/// Serves the |DebugDataIterator| protocol by serving all the files contained under
178/// |dir_path|.
179///
180/// The contents under |dir_path| are assumed to not change while the iterator is served.
181pub(crate) async fn serve_iterator(
182    dir_path: &str,
183    directory: fio::DirectoryProxy,
184    mut iterator: ftest_manager::DebugDataIteratorRequestStream,
185) -> Result<(), Error> {
186    let file_stream = fuchsia_fs::directory::readdir_recursive(
187        &directory,
188        Some(fasync::MonotonicDuration::from_seconds(DEBUG_DATA_TIMEOUT_SECONDS)),
189    )
190    .filter_map(|entry| filter_map_filename(entry, dir_path));
191    pin_mut!(file_stream);
192    let mut file_stream = file_stream.fuse();
193
194    let mut file_tasks = vec![];
195    while let Some(request) = iterator.try_next().await? {
196        let mut compress = false;
197        let responder = match request {
198            ftest_manager::DebugDataIteratorRequest::GetNext { responder } => {
199                Either::Left(responder)
200            }
201            ftest_manager::DebugDataIteratorRequest::GetNextCompressed { responder } => {
202                compress = true;
203                Either::Right(responder)
204            }
205        };
206        let next_files = match file_stream.is_terminated() {
207            true => vec![],
208            false => file_stream.by_ref().take(ITERATOR_BATCH_SIZE).collect().await,
209        };
210        let debug_data = next_files
211            .into_iter()
212            .map(|file_name| {
213                let file = fuchsia_fs::directory::open_file_async(
214                    &directory,
215                    &file_name,
216                    fio::PERM_READABLE,
217                )?;
218                let (client, server) = zx::Socket::create_stream();
219                let t = fasync::Task::spawn(serve_file_over_socket(
220                    file_name.clone(),
221                    file,
222                    server,
223                    compress,
224                ));
225                file_tasks.push(t);
226                Ok(ftest_manager::DebugData {
227                    socket: Some(client.into()),
228                    name: file_name.into(),
229                    ..Default::default()
230                })
231            })
232            .collect::<Result<Vec<_>, Error>>()?;
233        let _ = match responder {
234            Either::Left(responder) => responder.send(debug_data),
235            Either::Right(responder) => responder.send(debug_data),
236        };
237    }
238
239    // make sure all tasks complete
240    future::join_all(file_tasks).await;
241    Ok(())
242}
243
244#[cfg(test)]
245mod test {
246    use super::*;
247    use crate::run_events::{RunEventPayload, SuiteEventPayload};
248    use ftest_manager::{DebugData, DebugDataIteratorProxy};
249    use fuchsia_async as fasync;
250    use std::collections::HashSet;
251    use tempfile::tempdir;
252    use test_case::test_case;
253    use test_manager_test_lib::collect_string_from_socket_helper;
254
255    async fn serve_iterator_from_tmp(
256        dir: &tempfile::TempDir,
257    ) -> (Option<ftest_manager::DebugDataIteratorProxy>, fasync::Task<Result<(), Error>>) {
258        let (send, mut recv) = mpsc::channel(0);
259        let dir_path = dir.path().to_str().unwrap().to_string();
260        let task = fasync::Task::local(async move { serve_directory(&dir_path, send).await });
261        let proxy = recv.next().await.map(|event| {
262            let RunEventPayload::DebugData(client) = event.into_payload();
263            client.into_proxy()
264        });
265        (proxy, task)
266    }
267
268    #[fuchsia::test]
269    async fn serve_iterator_empty_dir_returns_no_client() {
270        let dir = tempdir().unwrap();
271        let (client, task) = serve_iterator_from_tmp(&dir).await;
272        assert!(client.is_none());
273        task.await.expect("iterator server should not fail");
274    }
275
276    async fn get_next_debug_data(
277        proxy: &DebugDataIteratorProxy,
278        compressed: bool,
279    ) -> Vec<DebugData> {
280        match compressed {
281            true => proxy.get_next_compressed().await.expect("get next compressed"),
282            false => proxy.get_next().await.expect("get next"),
283        }
284    }
285
286    #[test_case(true; "compressed debug_data")]
287    #[test_case(false; "uncompressed debug_data")]
288    #[fuchsia::test]
289    async fn serve_iterator_single_response(compressed: bool) {
290        let dir = tempdir().unwrap();
291        fuchsia_fs::file::write_in_namespace(&dir.path().join("file").to_string_lossy(), "test")
292            .await
293            .expect("write to file");
294
295        let (client, task) = serve_iterator_from_tmp(&dir).await;
296
297        let proxy = client.expect("client to be returned");
298
299        let mut values = get_next_debug_data(&proxy, compressed).await;
300        assert_eq!(1usize, values.len());
301        let ftest_manager::DebugData { name, socket, .. } = values.pop().unwrap();
302        assert_eq!(Some("file".to_string()), name);
303        let contents = collect_string_from_socket_helper(socket.unwrap(), compressed)
304            .await
305            .expect("read socket");
306        assert_eq!("test", contents);
307
308        let values = get_next_debug_data(&proxy, compressed).await;
309        assert_eq!(values, vec![]);
310
311        // Calling again is okay and should also return empty vector.
312        let values = get_next_debug_data(&proxy, compressed).await;
313        assert_eq!(values, vec![]);
314
315        drop(proxy);
316        task.await.expect("iterator server should not fail");
317    }
318
319    #[test_case(true; "compressed debug_data")]
320    #[test_case(false; "uncompressed debug_data")]
321    #[fuchsia::test]
322    async fn serve_iterator_multiple_responses(compressed: bool) {
323        let num_files_served = ITERATOR_BATCH_SIZE * 2;
324
325        let dir = tempdir().unwrap();
326        for idx in 0..num_files_served {
327            fuchsia_fs::file::write_in_namespace(
328                &dir.path().join(format!("file-{:?}", idx)).to_string_lossy(),
329                &format!("test-{:?}", idx),
330            )
331            .await
332            .expect("write to file");
333        }
334
335        let (client, task) = serve_iterator_from_tmp(&dir).await;
336
337        let proxy = client.expect("client to be returned");
338
339        let mut all_files = vec![];
340        loop {
341            let mut next = get_next_debug_data(&proxy, compressed).await;
342            if next.is_empty() {
343                break;
344            }
345            all_files.append(&mut next);
346        }
347
348        let file_contents: HashSet<_> = futures::stream::iter(all_files)
349            .then(|ftest_manager::DebugData { name, socket, .. }| async move {
350                let contents = collect_string_from_socket_helper(socket.unwrap(), compressed)
351                    .await
352                    .expect("read socket");
353                (name.unwrap(), contents)
354            })
355            .collect()
356            .await;
357
358        let expected_files: HashSet<_> = (0..num_files_served)
359            .map(|idx| (format!("file-{:?}", idx), format!("test-{:?}", idx)))
360            .collect();
361
362        assert_eq!(file_contents, expected_files);
363        drop(proxy);
364        task.await.expect("iterator server should not fail");
365    }
366
367    async fn serve_iterator_for_suite_from_tmp(
368        dir: &tempfile::TempDir,
369    ) -> (Option<ftest_manager::DebugDataIteratorProxy>, fasync::Task<Result<(), Error>>) {
370        let (send, mut recv) = mpsc::channel(0);
371        let dir_path = dir.path().to_str().unwrap().to_string();
372        let task =
373            fasync::Task::local(async move { serve_directory_for_suite(&dir_path, send).await });
374        let proxy = recv.next().await.map(|event| {
375            if let SuiteEventPayload::DebugData(client) = event.unwrap().into_payload() {
376                Some(client.into_proxy())
377            } else {
378                None // Event is not a DebugData
379            }
380            .unwrap()
381        });
382        (proxy, task)
383    }
384
385    #[fuchsia::test]
386    async fn serve_iterator_for_suite_empty_dir_returns_no_client() {
387        let dir = tempdir().unwrap();
388        let (client, task) = serve_iterator_for_suite_from_tmp(&dir).await;
389        assert!(client.is_none());
390        task.await.expect("iterator server should not fail");
391    }
392
393    #[test_case(true; "compressed debug_data")]
394    #[test_case(false; "uncompressed debug_data")]
395    #[fuchsia::test]
396    async fn serve_iterator_for_suite_single_response(compressed: bool) {
397        let dir = tempdir().unwrap();
398        fuchsia_fs::file::write_in_namespace(&dir.path().join("file").to_string_lossy(), "test")
399            .await
400            .expect("write to file");
401
402        let (client, task) = serve_iterator_for_suite_from_tmp(&dir).await;
403
404        let proxy = client.expect("client to be returned");
405
406        let mut values = get_next_debug_data(&proxy, compressed).await;
407        assert_eq!(1usize, values.len());
408        let ftest_manager::DebugData { name, socket, .. } = values.pop().unwrap();
409        assert_eq!(Some("file".to_string()), name);
410        let contents = collect_string_from_socket_helper(socket.unwrap(), compressed)
411            .await
412            .expect("read socket");
413        assert_eq!("test", contents);
414
415        let values = get_next_debug_data(&proxy, compressed).await;
416        assert_eq!(values, vec![]);
417
418        // Calling again is okay and should also return empty vector.
419        let values = get_next_debug_data(&proxy, compressed).await;
420        assert_eq!(values, vec![]);
421
422        drop(proxy);
423        task.await.expect("iterator server should not fail");
424    }
425
426    #[test_case(true; "compressed debug_data")]
427    #[test_case(false; "uncompressed debug_data")]
428    #[fuchsia::test]
429    async fn serve_iterator_for_suite_multiple_responses(compressed: bool) {
430        let num_files_served = ITERATOR_BATCH_SIZE * 2;
431
432        let dir = tempdir().unwrap();
433        for idx in 0..num_files_served {
434            fuchsia_fs::file::write_in_namespace(
435                &dir.path().join(format!("file-{:?}", idx)).to_string_lossy(),
436                &format!("test-{:?}", idx),
437            )
438            .await
439            .expect("write to file");
440        }
441
442        let (client, task) = serve_iterator_from_tmp(&dir).await;
443
444        let proxy = client.expect("client to be returned");
445
446        let mut all_files = vec![];
447        loop {
448            let mut next = get_next_debug_data(&proxy, compressed).await;
449            if next.is_empty() {
450                break;
451            }
452            all_files.append(&mut next);
453        }
454
455        let file_contents: HashSet<_> = futures::stream::iter(all_files)
456            .then(|ftest_manager::DebugData { name, socket, .. }| async move {
457                let contents = collect_string_from_socket_helper(socket.unwrap(), compressed)
458                    .await
459                    .expect("read socket");
460                (name.unwrap(), contents)
461            })
462            .collect()
463            .await;
464
465        let expected_files: HashSet<_> = (0..num_files_served)
466            .map(|idx| (format!("file-{:?}", idx), format!("test-{:?}", idx)))
467            .collect();
468
469        assert_eq!(file_contents, expected_files);
470        drop(proxy);
471        task.await.expect("iterator server should not fail");
472    }
473}