fuchsia_async/runtime/fuchsia/executor/
send.rs1use super::common::{Executor, ExecutorTime, MAIN_TASK_ID};
6use super::scope::ScopeHandle;
7use fuchsia_sync::{Condvar, Mutex};
8
9use futures::FutureExt;
10use std::future::Future;
11use std::sync::atomic::Ordering;
12use std::sync::Arc;
13use std::time::Duration;
14use std::{fmt, thread};
15
16pub struct SendExecutor {
27 inner: Arc<Executor>,
29 root_scope: ScopeHandle,
32 threads: Vec<thread::JoinHandle<()>>,
35 worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
36}
37
38impl fmt::Debug for SendExecutor {
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 f.debug_struct("SendExecutor").field("port", &self.inner.port).finish()
41 }
42}
43
44impl SendExecutor {
45 #[allow(deprecated)]
47 pub fn new(num_threads: u8) -> Self {
48 Self::new_inner(num_threads, None)
49 }
50
51 pub fn set_worker_init(&mut self, worker_init: impl Fn() + Send + Sync + 'static) {
54 self.worker_init = Some(Arc::new(worker_init) as Arc<dyn Fn() + Send + Sync + 'static>);
55 }
56
57 pub fn with_worker_init(mut self, worker_init: fn()) -> Self {
61 self.set_worker_init(worker_init);
62 self
63 }
64
65 fn new_inner(
66 num_threads: u8,
67 worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
68 ) -> Self {
69 let inner =
70 Arc::new(Executor::new(ExecutorTime::RealTime, false, num_threads));
71 let root_scope = ScopeHandle::root(inner.clone());
72 Executor::set_local(root_scope.clone());
73 Self { inner, root_scope, threads: Vec::default(), worker_init }
74 }
75
76 pub fn port(&self) -> &zx::Port {
78 &self.inner.port
79 }
80
81 pub fn run<F>(&mut self, future: F) -> F::Output
87 where
89 F: Future + Send + 'static,
90 F::Output: Send + 'static,
91 {
92 assert!(self.inner.is_real_time(), "Error: called `run` on an executor using fake time");
93
94 let pair = Arc::new((Mutex::new(None), Condvar::new()));
95 let pair2 = pair.clone();
96
97 let task = self.root_scope.new_task(
99 Some(MAIN_TASK_ID),
100 future.map(move |fut_result| {
101 let (lock, cvar) = &*pair2;
102 let mut result = lock.lock();
103 *result = Some(fut_result);
104 cvar.notify_one();
105 }),
106 );
107 task.detach();
108 assert!(self.root_scope.insert_task(task, false));
109
110 self.inner.done.store(false, Ordering::SeqCst);
112 self.create_worker_threads();
113
114 let (lock, cvar) = &*pair;
116 let mut result = lock.lock();
117 if result.is_none() {
118 let mut last_polled = 0;
119 let mut last_tasks_ready = false;
120 loop {
121 cvar.wait_for(&mut result, Duration::from_millis(250));
125 if result.is_some() {
126 break;
127 }
128 let polled = self.inner.polled.load(Ordering::Relaxed);
129 let tasks_ready = !self.inner.ready_tasks.is_empty();
130 if polled == last_polled && last_tasks_ready && tasks_ready {
131 eprintln!("Tasks might be stalled!");
142 self.inner.wake_one_thread();
143 }
144 last_polled = polled;
145 last_tasks_ready = tasks_ready;
146 }
147 }
148
149 self.join_all();
151
152 result.take().unwrap()
154 }
155
156 #[doc(hidden)]
157 pub fn root_scope(&self) -> &ScopeHandle {
159 &self.root_scope
160 }
161
162 fn create_worker_threads(&mut self) {
165 for _ in 0..self.inner.num_threads {
166 let inner = self.inner.clone();
167 let root_scope = self.root_scope.clone();
168 let worker_init = self.worker_init.clone();
169 let thread = thread::Builder::new()
170 .name("executor_worker".to_string())
171 .spawn(move || {
172 Executor::set_local(root_scope);
173 if let Some(init) = worker_init.as_ref() {
174 init();
175 }
176 inner.worker_lifecycle::<false>();
177 })
178 .expect("must be able to spawn threads");
179 self.threads.push(thread);
180 }
181 }
182
183 fn join_all(&mut self) {
184 self.inner.mark_done();
185
186 for thread in self.threads.drain(..) {
188 thread.join().expect("Couldn't join worker thread.");
189 }
190 }
191}
192
193impl Drop for SendExecutor {
194 fn drop(&mut self) {
195 self.join_all();
196 self.inner.on_parent_drop(&self.root_scope);
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::SendExecutor;
    use crate::{Task, Timer};

    use futures::channel::oneshot;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Condvar, Mutex};

    /// A spawned task blocks its thread on a condition variable; the main future must still
    /// get to run so it can release that task and let `run` return.
    #[test]
    fn test_stalled_triggers_wake_up() {
        SendExecutor::new(2).run(async {
            // Wait on a short timer first.
            // NOTE(review): presumably this lets the other worker go idle before the blocking
            // task is spawned — confirm.
            Timer::new(zx::MonotonicDuration::from_millis(10)).await;

            let (started_tx, started_rx) = oneshot::channel();
            let release = Arc::new((Mutex::new(false), Condvar::new()));
            let release_for_task = Arc::clone(&release);

            let _task = Task::spawn(async move {
                // Tell the main future we are running ...
                started_tx.send(()).unwrap();
                // ... then block this thread synchronously until we are released.
                let (flag, cvar) = &*release_for_task;
                let guard = flag.lock().unwrap();
                let _guard = cvar.wait_while(guard, |released| !*released).unwrap();
            });

            started_rx.await.unwrap();

            // Release the blocked task so the executor can shut down.
            let (flag, cvar) = &*release;
            *flag.lock().unwrap() = true;
            cvar.notify_one();
        });
    }

    /// With two worker threads, every `run` call must invoke the init callback exactly twice.
    #[test]
    fn worker_init_called_once_per_worker() {
        static NUM_INIT_CALLS: AtomicU64 = AtomicU64::new(0);
        fn initialize_test_worker() {
            NUM_INIT_CALLS.fetch_add(1, Ordering::SeqCst);
        }

        let mut exec = SendExecutor::new(2).with_worker_init(initialize_test_worker);
        // The counter is cumulative across runs: 2 after the first run, 4 after the second.
        for expected_calls in [2, 4] {
            exec.run(async {});
            assert_eq!(NUM_INIT_CALLS.load(Ordering::SeqCst), expected_calls);
        }
    }
}