fuchsia_async/runtime/fuchsia/executor/
send.rs1use super::common::{Executor, ExecutorTime, MAIN_TASK_ID};
6use super::scope::ScopeHandle;
7use fuchsia_sync::{Condvar, Mutex};
8
9use crate::runtime::instrument::TaskInstrument;
10use futures::FutureExt;
11use std::future::Future;
12use std::sync::atomic::Ordering;
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, thread};
16
17pub struct SendExecutor {
28 inner: Arc<Executor>,
30 root_scope: ScopeHandle,
33 threads: Vec<thread::JoinHandle<()>>,
36 worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
37}
38
39impl fmt::Debug for SendExecutor {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.debug_struct("SendExecutor").field("port", &self.inner.port).finish()
42 }
43}
44
45impl SendExecutor {
46 fn new_inner(
47 num_threads: u8,
48 worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
49 instrument: Option<Arc<dyn TaskInstrument>>,
50 ) -> Self {
51 let inner = Arc::new(Executor::new(
52 ExecutorTime::RealTime,
53 false,
54 num_threads,
55 instrument,
56 ));
57 let root_scope = ScopeHandle::root(inner.clone());
58 Executor::set_local(root_scope.clone());
59 Self { inner, root_scope, threads: Vec::default(), worker_init }
60 }
61
62 pub fn port(&self) -> &zx::Port {
64 &self.inner.port
65 }
66
67 pub fn run<F>(&mut self, future: F) -> F::Output
73 where
75 F: Future + Send + 'static,
76 F::Output: Send + 'static,
77 {
78 assert!(self.inner.is_real_time(), "Error: called `run` on an executor using fake time");
79
80 let pair = Arc::new((Mutex::new(None), Condvar::new()));
81 let pair2 = pair.clone();
82
83 let task = self.root_scope.new_task(
85 Some(MAIN_TASK_ID),
86 future.map(move |fut_result| {
87 let (lock, cvar) = &*pair2;
88 let mut result = lock.lock();
89 *result = Some(fut_result);
90 cvar.notify_one();
91 }),
92 );
93 task.detach();
94 assert!(self.root_scope.insert_task(task, false));
95
96 self.inner.done.store(false, Ordering::SeqCst);
98 self.create_worker_threads();
99
100 let (lock, cvar) = &*pair;
102 let mut result = lock.lock();
103 if result.is_none() {
104 let mut last_polled = 0;
105 let mut last_tasks_ready = false;
106 loop {
107 cvar.wait_for(&mut result, Duration::from_millis(250));
111 if result.is_some() {
112 break;
113 }
114 let polled = self.inner.polled.load(Ordering::Relaxed);
115 let tasks_ready = !self.inner.ready_tasks.is_empty();
116 if polled == last_polled && last_tasks_ready && tasks_ready {
117 eprintln!("Tasks might be stalled!");
128 self.inner.wake_one_thread();
129 }
130 last_polled = polled;
131 last_tasks_ready = tasks_ready;
132 }
133 }
134
135 self.join_all();
137
138 result.take().unwrap()
140 }
141
142 #[doc(hidden)]
143 pub fn root_scope(&self) -> &ScopeHandle {
145 &self.root_scope
146 }
147
148 fn create_worker_threads(&mut self) {
151 for _ in 0..self.inner.num_threads {
152 let inner = self.inner.clone();
153 let root_scope = self.root_scope.clone();
154 let worker_init = self.worker_init.clone();
155 let thread = thread::Builder::new()
156 .name("executor_worker".to_string())
157 .spawn(move || {
158 Executor::set_local(root_scope);
159 if let Some(init) = worker_init.as_ref() {
160 init();
161 }
162 inner.worker_lifecycle::<false>();
163 })
164 .expect("must be able to spawn threads");
165 self.threads.push(thread);
166 }
167 }
168
169 fn join_all(&mut self) {
170 self.inner.mark_done();
171
172 for thread in self.threads.drain(..) {
174 thread.join().expect("Couldn't join worker thread.");
175 }
176 }
177}
178
179impl Drop for SendExecutor {
180 fn drop(&mut self) {
181 self.join_all();
182 self.inner.on_parent_drop(&self.root_scope);
183 }
184}
185
186#[derive(Default)]
188pub struct SendExecutorBuilder {
189 num_threads: Option<u8>,
190 worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
191 instrument: Option<Arc<dyn TaskInstrument>>,
192}
193
194impl SendExecutorBuilder {
195 pub fn new() -> Self {
197 Self::default()
198 }
199
200 pub fn num_threads(mut self, num_threads: u8) -> Self {
202 self.num_threads = Some(num_threads);
203 self
204 }
205
206 pub fn worker_init(mut self, worker_init: impl Fn() + Send + Sync + 'static) -> Self {
208 self.worker_init = Some(Arc::new(worker_init));
209 self
210 }
211
212 pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
214 self.instrument = instrument;
215 self
216 }
217
218 pub fn build(self) -> SendExecutor {
220 SendExecutor::new_inner(self.num_threads.unwrap_or(1), self.worker_init, self.instrument)
221 }
222}
223
#[cfg(test)]
mod tests {
    use super::SendExecutorBuilder;
    use crate::{Task, Timer};

    use futures::channel::oneshot;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Condvar, Mutex};

    /// A spawned task blocks its thread on a std condvar until the main future
    /// releases it; the test passes only if the main future still gets to run.
    #[test]
    fn test_stalled_triggers_wake_up() {
        SendExecutorBuilder::new().num_threads(2).build().run(async {
            // NOTE(review): presumably this short timer is here to let the workers
            // settle so only one is active afterwards — confirm intent.
            Timer::new(zx::MonotonicDuration::from_millis(10)).await;

            let (tx, rx) = oneshot::channel();
            let gate = Arc::new((Mutex::new(false), Condvar::new()));
            let gate_for_task = Arc::clone(&gate);

            let _task = Task::spawn(async move {
                // Tell the main future we are running...
                tx.send(()).unwrap();
                // ...then synchronously block this thread until it opens the gate.
                let (released, signal) = &*gate_for_task;
                let _guard =
                    signal.wait_while(released.lock().unwrap(), |released| !*released).unwrap();
            });

            rx.await.unwrap();
            let (released, signal) = &*gate;
            *released.lock().unwrap() = true;
            signal.notify_one();
        });
    }

    /// Each call to `run` spawns the workers afresh, so the init callback fires
    /// once per worker per run.
    #[test]
    fn worker_init_called_once_per_worker() {
        static NUM_INIT_CALLS: AtomicU64 = AtomicU64::new(0);
        fn initialize_test_worker() {
            NUM_INIT_CALLS.fetch_add(1, Ordering::SeqCst);
        }

        let mut exec =
            SendExecutorBuilder::new().num_threads(2).worker_init(initialize_test_worker).build();

        // Two workers per run: the counter should read 2 after the first run and 4
        // after the second.
        for expected_calls in [2, 4] {
            exec.run(async {});
            assert_eq!(NUM_INIT_CALLS.load(Ordering::SeqCst), expected_calls);
        }
    }
}