1#[cfg(target_os = "fuchsia")]
6use crate::TestExecutorBuilder;
7use crate::{LocalExecutorBuilder, SendExecutorBuilder, TimeoutExt};
8use futures::prelude::*;
9use std::pin::Pin;
10use std::sync::atomic::{AtomicUsize, Ordering};
11#[cfg(target_os = "fuchsia")]
12use std::task::Poll;
13use std::time::Duration;
14
// Wraps a test factory so that every future it produces is guarded by the optional timeout read
// from `$config.timeout`.
//
// `$test` is a callable taking the run index and returning a future. The macro evaluates to a
// `move` closure with the same shape. When a timeout is configured, the produced future is
// wrapped with `on_timeout`, whose handler panics with the run index; otherwise the future is
// awaited unchanged.
//
// This is a macro rather than a function so the same code can serve both `Send` and non-`Send`
// futures.
macro_rules! apply_timeout {
    ($config:expr, $test:expr) => {{
        let limit = $config.timeout;
        let make_future = $test;
        move |run| {
            let fut = make_future(run);
            async move {
                match limit {
                    Some(limit) => {
                        fut.on_timeout(limit, || panic!("timeout on run {}", run)).await
                    }
                    None => fut.await,
                }
            }
        }
    }};
}
34
/// Describes how a test whose return type is `Self` is executed, possibly many times, on a
/// single-threaded executor.
///
/// In every method the `usize` handed to `test` is the index of the run being started.
pub trait TestResult: Sized {
    /// Runs `test` according to `cfg` on a single-threaded executor and returns the combined
    /// result.
    ///
    /// `test` is a factory producing one boxed (not necessarily `Send`) future per run.
    fn run_singlethreaded(
        test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
        cfg: Config,
    ) -> Self;

    /// Like [`TestResult::run_singlethreaded`], but each run is driven with the executor's
    /// `run_until_stalled`. Only available on Fuchsia targets.
    ///
    /// `fake_time` selects whether the executor is built with fake time.
    #[cfg(target_os = "fuchsia")]
    fn run_until_stalled<
        F: 'static + Sync + Fn(usize) -> Fut,
        Fut: 'static + Future<Output = Self>,
    >(
        fake_time: bool,
        test: F,
        cfg: Config,
    ) -> Self;

    /// Returns `true` when this result represents a successful test.
    fn is_ok(&self) -> bool;
}
57
/// Describes how a test whose return type is `Self` is executed, possibly many times, on a
/// multithreaded executor.
pub trait MultithreadedTestResult: Sized {
    /// Runs `test` according to `cfg`, giving each run an executor with `threads` threads, and
    /// returns the combined result.
    ///
    /// `test` receives the index of the run and must produce a `Send` future.
    fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
        test: F,
        threads: u8,
        cfg: Config,
    ) -> Self;

    /// Returns `true` when this result represents a successful test.
    fn is_ok(&self) -> bool;
}
70
71impl<E: Send + 'static + std::fmt::Debug> TestResult for Result<(), E> {
72 fn run_singlethreaded(
73 test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
74 cfg: Config,
75 ) -> Self {
76 cfg.run(1, |run| LocalExecutorBuilder::new().build().run_singlethreaded(test(run)))
77 }
78
79 #[cfg(target_os = "fuchsia")]
80 fn run_until_stalled<
81 F: 'static + Sync + Fn(usize) -> Fut,
82 Fut: 'static + Future<Output = Self>,
83 >(
84 fake_time: bool,
85 test: F,
86 cfg: Config,
87 ) -> Self {
88 let test = apply_timeout!(cfg, |run| test(run));
89 cfg.run(1, |run| {
90 let mut executor = TestExecutorBuilder::new().fake_time(fake_time).build();
91 match executor.run_until_stalled(&mut std::pin::pin!(test(run))) {
92 Poll::Ready(result) => result,
93 Poll::Pending => panic!(
94 "Stalled without completing. Consider using \"run_singlethreaded\", or check \
95 for a deadlock."
96 ),
97 }
98 })
99 }
100
101 fn is_ok(&self) -> bool {
102 Result::is_ok(self)
103 }
104}
105
106impl<E: 'static + Send> MultithreadedTestResult for Result<(), E> {
107 fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
108 test: F,
109 threads: u8,
110 cfg: Config,
111 ) -> Self {
112 let test = apply_timeout!(cfg, |run| test(run));
113 cfg.run(threads, |run| {
116 SendExecutorBuilder::new().num_threads(threads).build().run(test(run))
117 })
118 }
119
120 fn is_ok(&self) -> bool {
121 Result::is_ok(self)
122 }
123}
124
125impl TestResult for () {
126 fn run_singlethreaded(
127 test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
128 cfg: Config,
129 ) -> Self {
130 let _ = cfg.run(1, |run| {
131 LocalExecutorBuilder::new().build().run_singlethreaded(test(run));
132 Ok::<(), ()>(())
133 });
134 }
135
136 #[cfg(target_os = "fuchsia")]
137 fn run_until_stalled<
138 F: Sync + 'static + Fn(usize) -> Fut,
139 Fut: 'static + Future<Output = Self>,
140 >(
141 fake_time: bool,
142 test: F,
143 cfg: Config,
144 ) -> Self {
145 let _ = TestResult::run_until_stalled(
146 fake_time,
147 move |run| {
148 let test = test(run);
149 async move {
150 test.await;
151 Ok::<(), ()>(())
152 }
153 },
154 cfg,
155 );
156 }
157
158 fn is_ok(&self) -> bool {
159 true
160 }
161}
162
163impl MultithreadedTestResult for () {
164 fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
165 test: F,
166 threads: u8,
167 cfg: Config,
168 ) -> Self {
169 let _ = cfg.run(threads, |run| {
172 SendExecutorBuilder::new().num_threads(threads).build().run(test(run));
173 Ok::<(), ()>(())
174 });
175 }
176
177 fn is_ok(&self) -> bool {
178 true
179 }
180}
181
/// Settings that control how a test is repeated, overlapped and timed out.
#[derive(Clone)]
pub struct Config {
    /// Total number of times the test is run.
    repeat_count: usize,
    /// Upper bound on the number of runs executing at the same time. `0` and `1` both result
    /// in the runs executing one after another on the calling thread.
    max_concurrency: usize,
    /// Upper bound on the total number of threads used by overlapping runs; `0` means no limit.
    max_threads: u8,
    /// Optional timeout applied to each run.
    timeout: Option<Duration>,
}

/// Reads environment variable `name` and parses it as `T`.
///
/// An unset (or non-unicode) variable is treated as the empty string; if parsing fails,
/// `default` is returned.
fn env_var<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name).unwrap_or_default().parse().unwrap_or(default)
}

impl Config {
    /// Builds the configuration from the `FASYNC_TEST_*` environment variables.
    ///
    /// The repeat count is at least 1; a timeout of 0 seconds means "no timeout".
    fn get() -> Self {
        let repeat_count = std::cmp::max(1, env_var("FASYNC_TEST_REPEAT_COUNT", 1));
        let max_concurrency = env_var("FASYNC_TEST_MAX_CONCURRENCY", 0);
        let timeout_seconds = env_var("FASYNC_TEST_TIMEOUT_SECONDS", 0);
        let max_threads = env_var("FASYNC_TEST_MAX_THREADS", 0);
        let timeout =
            if timeout_seconds == 0 { None } else { Some(Duration::from_secs(timeout_seconds)) };
        Self { repeat_count, max_concurrency, max_threads, timeout }
    }

    /// Calls `f` on the current thread and on `threads - 1` additional scoped threads, then
    /// waits for all of them.
    ///
    /// Returns the error from the current thread if it failed, otherwise the first error
    /// reported by a worker, otherwise `Ok(())`. A `threads` value of `0` behaves like `1`.
    ///
    /// # Panics
    ///
    /// Re-raises the panic of any worker thread. Previously such a panic was discarded when
    /// the worker was joined, so a failing assertion on a worker could go unnoticed.
    fn in_parallel<E: Send>(
        &self,
        threads: u8,
        f: impl Fn() -> Result<(), E> + Sync,
    ) -> Result<(), E> {
        std::thread::scope(|s| {
            let mut join_handles = Vec::new();
            for _ in 1..threads {
                join_handles.push(s.spawn(&f));
            }
            // Returning early is fine: the scope joins any remaining workers on exit.
            f()?;
            for h in join_handles {
                match h.join() {
                    Ok(Ok(())) => {}
                    Ok(result @ Err(_)) => return result,
                    // Manually joined threads do not make the scope panic on their own, so
                    // the panic must be propagated explicitly.
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            }
            Ok(())
        })
    }

    /// Invokes `f` once for every run index in `0..repeat_count`, overlapping runs on several
    /// threads where the configuration allows it.
    ///
    /// `test_threads` is the number of threads a single run uses; it is taken into account so
    /// that overlapping runs stay within `max_threads`. Once a run fails no new runs are
    /// started, and an error is returned.
    fn run<E: Send>(
        &self,
        test_threads: u8,
        f: impl Fn(usize) -> Result<(), E> + Sync,
    ) -> Result<(), E> {
        // `max_concurrency` limits how many runs overlap; `max_threads` limits the total
        // thread count, and every run may itself use `test_threads` threads.
        let mut threads = std::cmp::min(std::cmp::max(self.repeat_count, 1), self.max_concurrency);
        if self.max_threads != 0 {
            // Treat a zero-thread test as using one thread so the division cannot panic.
            let threads_per_run = std::cmp::max(test_threads, 1);
            let budget = std::cmp::max(self.max_threads / threads_per_run, 1);
            threads = std::cmp::min(threads, usize::from(budget));
        }
        let threads = u8::try_from(threads).unwrap_or(u8::MAX);
        let run = AtomicUsize::new(0);
        self.in_parallel(threads, || {
            loop {
                // Claim the next run index; every worker pulls from the same counter.
                let this_run = run.fetch_add(1, Ordering::Relaxed);
                if this_run >= self.repeat_count {
                    return Ok(());
                }
                let result = f(this_run);
                if result.is_err() {
                    // Prevent any further runs from starting.
                    run.store(self.repeat_count, Ordering::Relaxed);
                    return result;
                }
            }
        })
    }
}
256
257pub fn run_singlethreaded_test<F, Fut, R>(test: F) -> R
259where
260 F: 'static + Sync + Fn(usize) -> Fut,
261 Fut: 'static + Future<Output = R>,
262 R: TestResult,
263{
264 TestResult::run_singlethreaded(&|run| test(run).boxed_local(), Config::get())
265}
266
/// Runs `test` with `run_until_stalled` semantics using the configuration read from the
/// environment, and returns the combined result. Only available on Fuchsia targets.
///
/// `fake_time` is passed through to the `TestResult` implementation for `R`.
#[cfg(target_os = "fuchsia")]
pub fn run_until_stalled_test<F, Fut, R>(fake_time: bool, test: F) -> R
where
    F: 'static + Sync + Fn(usize) -> Fut,
    Fut: 'static + Future<Output = R>,
    R: TestResult,
{
    let config = Config::get();
    R::run_until_stalled(fake_time, test, config)
}
277
278pub fn run_test<F, Fut, R>(test: F, threads: u8) -> R
280where
281 F: 'static + Sync + Fn(usize) -> Fut,
282 Fut: 'static + Send + Future<Output = R>,
283 R: MultithreadedTestResult,
284{
285 MultithreadedTestResult::run(test, threads, Config::get())
286}
287
#[cfg(test)]
mod tests {
    use super::{Config, MultithreadedTestResult, TestResult};
    use futures::lock::Mutex;
    use futures::prelude::*;
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds the shared set of run indices (`0..repeat_count`) that have not executed yet.
    fn all_runs_pending(repeat_count: usize) -> Arc<Mutex<HashSet<usize>>> {
        Arc::new(Mutex::new((0..repeat_count).collect()))
    }

    /// Assembles a `Config` from its four settings.
    fn config(
        repeat_count: usize,
        max_concurrency: usize,
        max_threads: u8,
        timeout: Option<Duration>,
    ) -> Config {
        Config { repeat_count, max_concurrency, max_threads, timeout }
    }

    /// Every run index must be seen exactly once by the single-threaded runner.
    #[test]
    fn run_singlethreaded() {
        const REPEAT_COUNT: usize = 1000;
        const MAX_THREADS: u8 = 10;
        let pending_runs = all_runs_pending(REPEAT_COUNT);
        let pending_runs_child = pending_runs.clone();
        TestResult::run_singlethreaded(
            &move |i| {
                let pending_runs_child = pending_runs_child.clone();
                async move {
                    assert!(pending_runs_child.lock().await.remove(&i));
                }
                .boxed_local()
            },
            config(REPEAT_COUNT, 0, MAX_THREADS, None),
        );
        assert!(pending_runs.try_lock().unwrap().is_empty());
    }

    /// A single-threaded test that never completes is expected to panic once the timeout
    /// elapses.
    #[ignore]
    #[test]
    #[should_panic]
    fn run_singlethreaded_with_timeout() {
        TestResult::run_singlethreaded(
            &move |_| {
                async move {
                    futures::future::pending::<()>().await;
                }
                .boxed_local()
            },
            config(1, 0, 0, Some(Duration::from_millis(1))),
        );
    }

    /// Every run index must be seen exactly once by the run-until-stalled runner.
    #[test]
    #[cfg(target_os = "fuchsia")]
    fn run_until_stalled() {
        const REPEAT_COUNT: usize = 1000;
        let pending_runs = all_runs_pending(REPEAT_COUNT);
        let pending_runs_child = pending_runs.clone();
        TestResult::run_until_stalled(
            false,
            move |i| {
                let pending_runs_child = pending_runs_child.clone();
                async move {
                    assert!(pending_runs_child.lock().await.remove(&i));
                }
            },
            config(REPEAT_COUNT, 1, 1, None),
        );
        assert!(pending_runs.try_lock().unwrap().is_empty());
    }

    /// Every run index must be seen exactly once by the multithreaded runner.
    #[test]
    fn run() {
        const REPEAT_COUNT: usize = 1000;
        const THREADS: u8 = 4;
        let pending_runs = all_runs_pending(REPEAT_COUNT);
        let pending_runs_child = pending_runs.clone();
        MultithreadedTestResult::run(
            move |i| {
                let pending_runs_child = pending_runs_child.clone();
                async move {
                    assert!(pending_runs_child.lock().await.remove(&i));
                }
            },
            THREADS,
            config(REPEAT_COUNT, 0, THREADS, None),
        );
        assert!(pending_runs.try_lock().unwrap().is_empty());
    }

    /// A multithreaded test that never completes is expected to panic once the timeout
    /// elapses.
    #[ignore]
    #[test]
    #[should_panic]
    fn run_with_timeout() {
        const THREADS: u8 = 4;
        MultithreadedTestResult::run(
            move |_| async move {
                futures::future::pending::<()>().await;
            },
            THREADS,
            config(1, 0, 0, Some(Duration::from_millis(1))),
        );
    }
}