fuchsia_async/
test_support.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#[cfg(target_os = "fuchsia")]
6use crate::TestExecutorBuilder;
7use crate::{LocalExecutorBuilder, SendExecutorBuilder, TimeoutExt};
8use futures::prelude::*;
9use std::pin::Pin;
10use std::sync::atomic::{AtomicUsize, Ordering};
11#[cfg(target_os = "fuchsia")]
12use std::task::Poll;
13use std::time::Duration;
14
15// Apply the timeout from config to test
16// Ideally this would be a function like Config::with_timeout, but we need to handle Send and !Send
17// and it's likely better not to have to duplicate this code.
18macro_rules! apply_timeout {
19    ($config:expr, $test:expr) => {{
20        let timeout = $config.timeout;
21        let test = $test;
22        move |run| {
23            let test = test(run);
24            async move {
25                if let Some(timeout) = timeout {
26                    test.on_timeout(timeout, || panic!("timeout on run {}", run)).await
27                } else {
28                    test.await
29                }
30            }
31        }
32    }};
33}
34
35/// Defines how to compose multiple test runs for a kind of test result.
36pub trait TestResult: Sized {
37    /// How to repeatedly run a test with this result in a single threaded executor.
38    fn run_singlethreaded(
39        test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
40        cfg: Config,
41    ) -> Self;
42
43    /// Similarly, but use run_until_stalled
44    #[cfg(target_os = "fuchsia")]
45    fn run_until_stalled<
46        F: 'static + Sync + Fn(usize) -> Fut,
47        Fut: 'static + Future<Output = Self>,
48    >(
49        fake_time: bool,
50        test: F,
51        cfg: Config,
52    ) -> Self;
53
54    /// Whether the result is successful.
55    fn is_ok(&self) -> bool;
56}
57
58/// Defines how to compose multiple test runs for a kind of test result in a multithreaded executor.
59pub trait MultithreadedTestResult: Sized {
60    /// How to repeatedly run a test with this result in a multi threaded executor.
61    fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
62        test: F,
63        threads: u8,
64        cfg: Config,
65    ) -> Self;
66
67    /// Whether the result is successful.
68    fn is_ok(&self) -> bool;
69}
70
71impl<E: Send + 'static + std::fmt::Debug> TestResult for Result<(), E> {
72    fn run_singlethreaded(
73        test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
74        cfg: Config,
75    ) -> Self {
76        cfg.run(1, |run| LocalExecutorBuilder::new().build().run_singlethreaded(test(run)))
77    }
78
79    #[cfg(target_os = "fuchsia")]
80    fn run_until_stalled<
81        F: 'static + Sync + Fn(usize) -> Fut,
82        Fut: 'static + Future<Output = Self>,
83    >(
84        fake_time: bool,
85        test: F,
86        cfg: Config,
87    ) -> Self {
88        let test = apply_timeout!(cfg, |run| test(run));
89        cfg.run(1, |run| {
90            let mut executor = TestExecutorBuilder::new().fake_time(fake_time).build();
91            match executor.run_until_stalled(&mut std::pin::pin!(test(run))) {
92                Poll::Ready(result) => result,
93                Poll::Pending => panic!(
94                    "Stalled without completing. Consider using \"run_singlethreaded\", or check \
95                     for a deadlock."
96                ),
97            }
98        })
99    }
100
101    fn is_ok(&self) -> bool {
102        Result::is_ok(self)
103    }
104}
105
106impl<E: 'static + Send> MultithreadedTestResult for Result<(), E> {
107    fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
108        test: F,
109        threads: u8,
110        cfg: Config,
111    ) -> Self {
112        let test = apply_timeout!(cfg, |run| test(run));
113        // Fuchsia's SendExecutor actually uses an extra thread, but it doesn't do anything, so we
114        // don't count it.
115        cfg.run(threads, |run| {
116            SendExecutorBuilder::new().num_threads(threads).build().run(test(run))
117        })
118    }
119
120    fn is_ok(&self) -> bool {
121        Result::is_ok(self)
122    }
123}
124
125impl TestResult for () {
126    fn run_singlethreaded(
127        test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
128        cfg: Config,
129    ) -> Self {
130        let _ = cfg.run(1, |run| {
131            LocalExecutorBuilder::new().build().run_singlethreaded(test(run));
132            Ok::<(), ()>(())
133        });
134    }
135
136    #[cfg(target_os = "fuchsia")]
137    fn run_until_stalled<
138        F: Sync + 'static + Fn(usize) -> Fut,
139        Fut: 'static + Future<Output = Self>,
140    >(
141        fake_time: bool,
142        test: F,
143        cfg: Config,
144    ) -> Self {
145        let _ = TestResult::run_until_stalled(
146            fake_time,
147            move |run| {
148                let test = test(run);
149                async move {
150                    test.await;
151                    Ok::<(), ()>(())
152                }
153            },
154            cfg,
155        );
156    }
157
158    fn is_ok(&self) -> bool {
159        true
160    }
161}
162
163impl MultithreadedTestResult for () {
164    fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
165        test: F,
166        threads: u8,
167        cfg: Config,
168    ) -> Self {
169        // Fuchsia's SendExecutor actually uses an extra thread, but it doesn't do anything, so we
170        // don't count it.
171        let _ = cfg.run(threads, |run| {
172            SendExecutorBuilder::new().num_threads(threads).build().run(test(run));
173            Ok::<(), ()>(())
174        });
175    }
176
177    fn is_ok(&self) -> bool {
178        true
179    }
180}
181
182/// Configuration variables for a single test run.
183#[derive(Clone)]
184pub struct Config {
185    repeat_count: usize,
186    max_concurrency: usize,
187    max_threads: u8,
188    timeout: Option<Duration>,
189}
190
191fn env_var<T: std::str::FromStr>(name: &str, default: T) -> T {
192    std::env::var(name).unwrap_or_default().parse().unwrap_or(default)
193}
194
195impl Config {
196    fn get() -> Self {
197        let repeat_count = std::cmp::max(1, env_var("FASYNC_TEST_REPEAT_COUNT", 1));
198        let max_concurrency = env_var("FASYNC_TEST_MAX_CONCURRENCY", 0);
199        let timeout_seconds = env_var("FASYNC_TEST_TIMEOUT_SECONDS", 0);
200        let max_threads = env_var("FASYNC_TEST_MAX_THREADS", 0);
201        let timeout =
202            if timeout_seconds == 0 { None } else { Some(Duration::from_secs(timeout_seconds)) };
203        Self { repeat_count, max_concurrency, max_threads, timeout }
204    }
205
206    fn in_parallel<E: Send>(
207        &self,
208        threads: u8,
209        f: impl Fn() -> Result<(), E> + Sync,
210    ) -> Result<(), E> {
211        std::thread::scope(|s| {
212            let mut join_handles = Vec::new();
213            for _ in 1..threads {
214                join_handles.push(s.spawn(&f));
215            }
216            f()?;
217            for h in join_handles {
218                if let Ok(result @ Err(_)) = h.join() {
219                    return result;
220                }
221            }
222            Ok(())
223        })
224    }
225
226    fn run<E: Send>(
227        &self,
228        test_threads: u8,
229        f: impl Fn(usize) -> Result<(), E> + Sync,
230    ) -> Result<(), E> {
231        // max_concurrency is the maximum number of runs of the same test to run in parallel, but
232        // each test can run multiple threads.  max_threads is the maximum number of threads.
233        let mut threads = std::cmp::min(std::cmp::max(self.repeat_count, 1), self.max_concurrency);
234        if self.max_threads != 0 {
235            threads =
236                std::cmp::min(threads, std::cmp::max(self.max_threads / test_threads, 1) as usize);
237        }
238        let threads = u8::try_from(threads).unwrap_or(u8::MAX);
239        let run = AtomicUsize::new(0);
240        self.in_parallel(threads, || {
241            loop {
242                let this_run = run.fetch_add(1, Ordering::Relaxed);
243                if this_run >= self.repeat_count {
244                    return Ok(());
245                }
246                let result = f(this_run);
247                if result.is_err() {
248                    // Prevent any more runs from starting.
249                    run.store(self.repeat_count, Ordering::Relaxed);
250                    return result;
251                }
252            }
253        })
254    }
255}
256
257/// Runs a test in an executor, potentially repeatedly and concurrently
258pub fn run_singlethreaded_test<F, Fut, R>(test: F) -> R
259where
260    F: 'static + Sync + Fn(usize) -> Fut,
261    Fut: 'static + Future<Output = R>,
262    R: TestResult,
263{
264    TestResult::run_singlethreaded(&|run| test(run).boxed_local(), Config::get())
265}
266
267/// Runs a test in an executor until it's stalled
268#[cfg(target_os = "fuchsia")]
269pub fn run_until_stalled_test<F, Fut, R>(fake_time: bool, test: F) -> R
270where
271    F: 'static + Sync + Fn(usize) -> Fut,
272    Fut: 'static + Future<Output = R>,
273    R: TestResult,
274{
275    TestResult::run_until_stalled(fake_time, test, Config::get())
276}
277
278/// Runs a test in an executor, potentially repeatedly and concurrently
279pub fn run_test<F, Fut, R>(test: F, threads: u8) -> R
280where
281    F: 'static + Sync + Fn(usize) -> Fut,
282    Fut: 'static + Send + Future<Output = R>,
283    R: MultithreadedTestResult,
284{
285    MultithreadedTestResult::run(test, threads, Config::get())
286}
287
288#[cfg(test)]
289mod tests {
290    use super::{Config, MultithreadedTestResult, TestResult};
291    use futures::lock::Mutex;
292    use futures::prelude::*;
293    use std::collections::HashSet;
294    use std::sync::Arc;
295    use std::time::Duration;
296
297    #[test]
298    fn run_singlethreaded() {
299        const REPEAT_COUNT: usize = 1000;
300        const MAX_THREADS: u8 = 10;
301        let pending_runs: Arc<Mutex<HashSet<_>>> =
302            Arc::new(Mutex::new((0..REPEAT_COUNT).collect()));
303        let pending_runs_child = pending_runs.clone();
304        TestResult::run_singlethreaded(
305            &move |i| {
306                let pending_runs_child = pending_runs_child.clone();
307                async move {
308                    assert!(pending_runs_child.lock().await.remove(&i));
309                }
310                .boxed_local()
311            },
312            Config {
313                repeat_count: REPEAT_COUNT,
314                max_concurrency: 0,
315                max_threads: MAX_THREADS,
316                timeout: None,
317            },
318        );
319        assert!(pending_runs.try_lock().unwrap().is_empty());
320    }
321
322    // TODO(https://fxbug.dev/42138715): should_panic tests trigger LSAN
323    #[ignore]
324    #[test]
325    #[should_panic]
326    fn run_singlethreaded_with_timeout() {
327        TestResult::run_singlethreaded(
328            &move |_| {
329                async move {
330                    futures::future::pending::<()>().await;
331                }
332                .boxed_local()
333            },
334            Config {
335                repeat_count: 1,
336                max_concurrency: 0,
337                max_threads: 0,
338                timeout: Some(Duration::from_millis(1)),
339            },
340        );
341    }
342
343    #[test]
344    #[cfg(target_os = "fuchsia")]
345    fn run_until_stalled() {
346        const REPEAT_COUNT: usize = 1000;
347        let pending_runs: Arc<Mutex<HashSet<_>>> =
348            Arc::new(Mutex::new((0..REPEAT_COUNT).collect()));
349        let pending_runs_child = pending_runs.clone();
350        TestResult::run_until_stalled(
351            false,
352            move |i| {
353                let pending_runs_child = pending_runs_child.clone();
354                async move {
355                    assert!(pending_runs_child.lock().await.remove(&i));
356                }
357            },
358            Config {
359                repeat_count: REPEAT_COUNT,
360                max_concurrency: 1,
361                max_threads: 1,
362                timeout: None,
363            },
364        );
365        assert!(pending_runs.try_lock().unwrap().is_empty());
366    }
367
368    #[test]
369    fn run() {
370        const REPEAT_COUNT: usize = 1000;
371        const THREADS: u8 = 4;
372        let pending_runs: Arc<Mutex<HashSet<_>>> =
373            Arc::new(Mutex::new((0..REPEAT_COUNT).collect()));
374        let pending_runs_child = pending_runs.clone();
375        MultithreadedTestResult::run(
376            move |i| {
377                let pending_runs_child = pending_runs_child.clone();
378                async move {
379                    assert!(pending_runs_child.lock().await.remove(&i));
380                }
381            },
382            THREADS,
383            Config {
384                repeat_count: REPEAT_COUNT,
385                max_concurrency: 0,
386                max_threads: THREADS,
387                timeout: None,
388            },
389        );
390        assert!(pending_runs.try_lock().unwrap().is_empty());
391    }
392
393    // TODO(https://fxbug.dev/42138715): should_panic tests trigger LSAN
394    #[ignore]
395    #[test]
396    #[should_panic]
397    fn run_with_timeout() {
398        const THREADS: u8 = 4;
399        MultithreadedTestResult::run(
400            move |_| async move {
401                futures::future::pending::<()>().await;
402            },
403            THREADS,
404            Config {
405                repeat_count: 1,
406                max_concurrency: 0,
407                max_threads: 0,
408                timeout: Some(Duration::from_millis(1)),
409            },
410        );
411    }
412}