1use crate::TimeoutExt;
6use futures::prelude::*;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9#[cfg(target_os = "fuchsia")]
10use std::task::Poll;
11use std::time::Duration;
12
// Wraps a test factory so that each future it produces honours the optional
// timeout stored in `$config.timeout`.
//
// `$test` is a callable that takes the run index and returns the future for
// that run. The expansion is a `move` closure with the same call shape. When a
// timeout is configured, the future is wrapped with `on_timeout` and given a
// callback that panics with "timeout on run N"; otherwise the future is
// awaited unchanged. The `on_timeout` call sits inside the `async` block, so
// it is only evaluated once the returned future is polled.
macro_rules! apply_timeout {
    ($config:expr, $test:expr) => {{
        let limit = $config.timeout;
        let make_test = $test;
        move |run| {
            let pending = make_test(run);
            async move {
                match limit {
                    Some(limit) => {
                        pending.on_timeout(limit, || panic!("timeout on run {}", run)).await
                    }
                    None => pending.await,
                }
            }
        }
    }};
}
32
/// Describes how a test with this result type is executed, possibly many
/// times, on an executor that does not require the test future to be `Send`.
///
/// This file implements it for `()` and for `Result<(), E>`.
pub trait TestResult: Sized {
    /// Runs `test` as directed by `cfg` and returns the combined result.
    ///
    /// `test` maps a run index to the boxed future for that run. The
    /// implementations in this file drive each future with
    /// `LocalExecutor::run_singlethreaded`.
    fn run_singlethreaded(
        test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
        cfg: Config,
    ) -> Self;

    /// Fuchsia-only variant that drives each run with
    /// `TestExecutor::run_until_stalled`.
    ///
    /// `fake_time` selects between a fake-time executor and a regular one;
    /// `test` maps a run index to the future for that run.
    #[cfg(target_os = "fuchsia")]
    fn run_until_stalled<
        F: 'static + Sync + Fn(usize) -> Fut,
        Fut: 'static + Future<Output = Self>,
    >(
        fake_time: bool,
        test: F,
        cfg: Config,
    ) -> Self;

    /// Returns `true` when this result represents a successful test.
    fn is_ok(&self) -> bool;
}
55
/// Describes how a test with this result type is executed, possibly many
/// times, on a multithreaded executor (the test future must be `Send`).
///
/// This file implements it for `()` and for `Result<(), E>`.
pub trait MultithreadedTestResult: Sized {
    /// Runs `test` as directed by `cfg` and returns the combined result.
    ///
    /// `test` maps a run index to the future for that run, and `threads` is
    /// the thread count handed to the executor created for each run.
    fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
        test: F,
        threads: u8,
        cfg: Config,
    ) -> Self;

    /// Returns `true` when this result represents a successful test.
    fn is_ok(&self) -> bool;
}
68
69impl<E: Send + 'static + std::fmt::Debug> TestResult for Result<(), E> {
70 fn run_singlethreaded(
71 test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
72 cfg: Config,
73 ) -> Self {
74 cfg.run(1, |run| crate::LocalExecutor::new().run_singlethreaded(test(run)))
75 }
76
77 #[cfg(target_os = "fuchsia")]
78 fn run_until_stalled<
79 F: 'static + Sync + Fn(usize) -> Fut,
80 Fut: 'static + Future<Output = Self>,
81 >(
82 fake_time: bool,
83 test: F,
84 cfg: Config,
85 ) -> Self {
86 let test = apply_timeout!(cfg, |run| test(run));
87 cfg.run(1, |run| {
88 let mut executor = if fake_time {
89 crate::TestExecutor::new_with_fake_time()
90 } else {
91 crate::TestExecutor::new()
92 };
93 match executor.run_until_stalled(&mut std::pin::pin!(test(run))) {
94 Poll::Ready(result) => result,
95 Poll::Pending => panic!(
96 "Stalled without completing. Consider using \"run_singlethreaded\", or check \
97 for a deadlock."
98 ),
99 }
100 })
101 }
102
103 fn is_ok(&self) -> bool {
104 Result::is_ok(self)
105 }
106}
107
108impl<E: 'static + Send> MultithreadedTestResult for Result<(), E> {
109 fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
110 test: F,
111 threads: u8,
112 cfg: Config,
113 ) -> Self {
114 let test = apply_timeout!(cfg, |run| test(run));
115 cfg.run(threads, |run| crate::SendExecutor::new(threads).run(test(run)))
118 }
119
120 fn is_ok(&self) -> bool {
121 Result::is_ok(self)
122 }
123}
124
125impl TestResult for () {
126 fn run_singlethreaded(
127 test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
128 cfg: Config,
129 ) -> Self {
130 let _ = cfg.run(1, |run| {
131 crate::LocalExecutor::new().run_singlethreaded(test(run));
132 Ok::<(), ()>(())
133 });
134 }
135
136 #[cfg(target_os = "fuchsia")]
137 fn run_until_stalled<
138 F: Sync + 'static + Fn(usize) -> Fut,
139 Fut: 'static + Future<Output = Self>,
140 >(
141 fake_time: bool,
142 test: F,
143 cfg: Config,
144 ) -> Self {
145 let _ = TestResult::run_until_stalled(
146 fake_time,
147 move |run| {
148 let test = test(run);
149 async move {
150 test.await;
151 Ok::<(), ()>(())
152 }
153 },
154 cfg,
155 );
156 }
157
158 fn is_ok(&self) -> bool {
159 true
160 }
161}
162
163impl MultithreadedTestResult for () {
164 fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
165 test: F,
166 threads: u8,
167 cfg: Config,
168 ) -> Self {
169 let _ = cfg.run(threads, |run| {
172 crate::SendExecutor::new(threads).run(test(run));
173 Ok::<(), ()>(())
174 });
175 }
176
177 fn is_ok(&self) -> bool {
178 true
179 }
180}
181
/// Settings that control how often, how concurrently, and for how long a test
/// is run.
#[derive(Clone)]
pub struct Config {
    /// Total number of times the test body is executed.
    repeat_count: usize,
    /// Upper bound on the number of runs in flight at once. Values of 0 and 1
    /// both result in the runs executing one after another on the calling
    /// thread.
    max_concurrency: usize,
    /// Overall thread budget shared by concurrent runs; 0 disables the limit.
    max_threads: u8,
    /// Per-run timeout; `None` means no timeout.
    timeout: Option<Duration>,
}

/// Reads environment variable `name` and parses it as `T`.
///
/// An unreadable variable is treated as an empty string; `default` is returned
/// whenever parsing fails.
fn env_var<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name).unwrap_or_default().parse().unwrap_or(default)
}

impl Config {
    /// Builds the configuration from the `FASYNC_TEST_*` environment
    /// variables.
    ///
    /// The repeat count is clamped to at least 1, and a timeout of 0 seconds
    /// means "no timeout".
    fn get() -> Self {
        let repeat_count = env_var::<usize>("FASYNC_TEST_REPEAT_COUNT", 1).max(1);
        let max_concurrency = env_var::<usize>("FASYNC_TEST_MAX_CONCURRENCY", 0);
        let timeout_seconds = env_var::<u64>("FASYNC_TEST_TIMEOUT_SECONDS", 0);
        let max_threads = env_var::<u8>("FASYNC_TEST_MAX_THREADS", 0);
        let timeout = (timeout_seconds != 0).then(|| Duration::from_secs(timeout_seconds));
        Self { repeat_count, max_concurrency, max_threads, timeout }
    }

    /// Calls `f` once on the current thread and once on each of
    /// `threads - 1` scoped worker threads, then combines the outcomes.
    ///
    /// Returns the first `Err` observed (the calling thread's own result is
    /// checked first), or `Ok(())` when every invocation succeeded.
    ///
    /// # Panics
    ///
    /// Re-raises the panic of any worker thread that panicked, so a failing
    /// run can never be reported as success.
    fn in_parallel<E: Send>(
        &self,
        threads: u8,
        f: impl Fn() -> Result<(), E> + Sync,
    ) -> Result<(), E> {
        std::thread::scope(|s| {
            let mut join_handles = Vec::new();
            // The calling thread does its share of the work too, hence `1..`.
            for _ in 1..threads {
                join_handles.push(s.spawn(&f));
            }
            // On early return the scope still joins the workers for us.
            f()?;
            for handle in join_handles {
                match handle.join() {
                    Ok(Ok(())) => {}
                    Ok(failure @ Err(_)) => return failure,
                    // A manually joined thread does not make `thread::scope`
                    // panic on its own, so the payload must be forwarded
                    // explicitly or the panic would be silently dropped.
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            }
            Ok(())
        })
    }

    /// Executes `f(run)` for every run index in `0..repeat_count`, spreading
    /// the runs over worker threads as allowed by the configuration.
    ///
    /// `test_threads` is the number of threads a single run uses; it limits
    /// how many runs fit into the `max_threads` budget. A value of 0 is
    /// treated as 1. The first `Err` stops further runs from starting and is
    /// returned.
    fn run<E: Send>(
        &self,
        test_threads: u8,
        f: impl Fn(usize) -> Result<(), E> + Sync,
    ) -> Result<(), E> {
        // Never use more workers than there are runs or than the concurrency
        // limit allows.
        let mut threads = self.repeat_count.max(1).min(self.max_concurrency);
        if self.max_threads != 0 {
            // Clamp the divisor so that `test_threads == 0` cannot trigger a
            // divide-by-zero panic; always allow at least one run at a time.
            let runs_in_budget = (self.max_threads / test_threads.max(1)).max(1);
            threads = threads.min(usize::from(runs_in_budget));
        }
        let threads = u8::try_from(threads).unwrap_or(u8::MAX);
        // Shared counter handing out the next run index to whichever worker
        // asks first.
        let next_run = AtomicUsize::new(0);
        self.in_parallel(threads, || {
            loop {
                let this_run = next_run.fetch_add(1, Ordering::Relaxed);
                if this_run >= self.repeat_count {
                    return Ok(());
                }
                let result = f(this_run);
                if result.is_err() {
                    // Prevent any further runs from starting.
                    next_run.store(self.repeat_count, Ordering::Relaxed);
                    return result;
                }
            }
        })
    }
}
256
257pub fn run_singlethreaded_test<F, Fut, R>(test: F) -> R
259where
260 F: 'static + Sync + Fn(usize) -> Fut,
261 Fut: 'static + Future<Output = R>,
262 R: TestResult,
263{
264 TestResult::run_singlethreaded(&|run| test(run).boxed_local(), Config::get())
265}
266
/// Fuchsia-only: runs `test` with `TestResult::run_until_stalled`, configured
/// from the environment via `Config::get`.
///
/// `fake_time` is forwarded unchanged; `test` receives the run index and
/// returns the future for that run.
#[cfg(target_os = "fuchsia")]
pub fn run_until_stalled_test<F, Fut, R>(fake_time: bool, test: F) -> R
where
    F: 'static + Sync + Fn(usize) -> Fut,
    Fut: 'static + Future<Output = R>,
    R: TestResult,
{
    let cfg = Config::get();
    R::run_until_stalled(fake_time, test, cfg)
}
277
278pub fn run_test<F, Fut, R>(test: F, threads: u8) -> R
280where
281 F: 'static + Sync + Fn(usize) -> Fut,
282 Fut: 'static + Send + Future<Output = R>,
283 R: MultithreadedTestResult,
284{
285 MultithreadedTestResult::run(test, threads, Config::get())
286}
287
#[cfg(test)]
mod tests {
    use super::{Config, MultithreadedTestResult, TestResult};
    use futures::lock::Mutex;
    use futures::prelude::*;
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::time::Duration;

    /// Shared set of run indices that have not been executed yet.
    type PendingRuns = Arc<Mutex<HashSet<usize>>>;

    /// Creates a set holding every run index in `0..repeat_count`.
    fn all_runs_pending(repeat_count: usize) -> PendingRuns {
        Arc::new(Mutex::new((0..repeat_count).collect()))
    }

    /// Marks `run` as executed, asserting that it had not been seen before.
    async fn claim_run(pending: PendingRuns, run: usize) {
        assert!(pending.lock().await.remove(&run));
    }

    /// Every run index must be handed to the test exactly once.
    #[test]
    fn run_singlethreaded() {
        const REPEAT_COUNT: usize = 1000;
        const MAX_THREADS: u8 = 10;
        let outstanding = all_runs_pending(REPEAT_COUNT);
        let worker_view = Arc::clone(&outstanding);
        TestResult::run_singlethreaded(
            &move |run| claim_run(Arc::clone(&worker_view), run).boxed_local(),
            Config {
                repeat_count: REPEAT_COUNT,
                max_concurrency: 0,
                max_threads: MAX_THREADS,
                timeout: None,
            },
        );
        assert!(outstanding.try_lock().unwrap().is_empty());
    }

    /// A test that never completes is expected to panic once the timeout
    /// fires. Ignored by default.
    #[ignore]
    #[test]
    #[should_panic]
    fn run_singlethreaded_with_timeout() {
        TestResult::run_singlethreaded(
            &|_| futures::future::pending::<()>().boxed_local(),
            Config {
                repeat_count: 1,
                max_concurrency: 0,
                max_threads: 0,
                timeout: Some(Duration::from_millis(1)),
            },
        );
    }

    /// Every run index must be handed to the test exactly once.
    #[test]
    #[cfg(target_os = "fuchsia")]
    fn run_until_stalled() {
        const REPEAT_COUNT: usize = 1000;
        let outstanding = all_runs_pending(REPEAT_COUNT);
        let worker_view = Arc::clone(&outstanding);
        TestResult::run_until_stalled(
            false,
            move |run| claim_run(Arc::clone(&worker_view), run),
            Config {
                repeat_count: REPEAT_COUNT,
                max_concurrency: 1,
                max_threads: 1,
                timeout: None,
            },
        );
        assert!(outstanding.try_lock().unwrap().is_empty());
    }

    /// Every run index must be handed to the test exactly once.
    #[test]
    fn run() {
        const REPEAT_COUNT: usize = 1000;
        const THREADS: u8 = 4;
        let outstanding = all_runs_pending(REPEAT_COUNT);
        let worker_view = Arc::clone(&outstanding);
        MultithreadedTestResult::run(
            move |run| claim_run(Arc::clone(&worker_view), run),
            THREADS,
            Config {
                repeat_count: REPEAT_COUNT,
                max_concurrency: 0,
                max_threads: THREADS,
                timeout: None,
            },
        );
        assert!(outstanding.try_lock().unwrap().is_empty());
    }

    /// A test that never completes is expected to panic once the timeout
    /// fires. Ignored by default.
    #[ignore]
    #[test]
    #[should_panic]
    fn run_with_timeout() {
        const THREADS: u8 = 4;
        MultithreadedTestResult::run(
            |_| futures::future::pending::<()>(),
            THREADS,
            Config {
                repeat_count: 1,
                max_concurrency: 0,
                max_threads: 0,
                timeout: Some(Duration::from_millis(1)),
            },
        );
    }
}