fuchsia_async/runtime/fuchsia/executor/
send.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::common::{Executor, ExecutorTime, MAIN_TASK_ID};
6use super::scope::ScopeHandle;
7use fuchsia_sync::{Condvar, Mutex};
8
9use crate::runtime::instrument::TaskInstrument;
10use futures::FutureExt;
11use std::future::Future;
12use std::sync::atomic::Ordering;
13use std::sync::Arc;
14use std::time::Duration;
15use std::{fmt, thread};
16
/// A multi-threaded port-based executor for Fuchsia. Requires that tasks scheduled on it
/// implement `Send` so they can be load balanced between worker threads.
///
/// Having a `SendExecutor` in scope allows the creation and polling of zircon objects, such as
/// [`fuchsia_async::Channel`].
///
/// # Panics
///
/// `SendExecutor` will panic on drop if any zircon objects attached to it are still alive. In other
/// words, zircon objects backed by a `SendExecutor` must be dropped before it.
pub struct SendExecutor {
    /// The inner executor state. A clone of this `Arc` is handed to every worker thread.
    inner: Arc<Executor>,
    // LINT.IfChange
    /// The root scope.
    root_scope: ScopeHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
    /// Join handles of the worker threads. Filled when the workers are spawned and emptied
    /// again when they are joined.
    threads: Vec<thread::JoinHandle<()>>,
    /// Optional hook that every worker thread calls once, right before it enters its
    /// work loop.
    worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
}
38
39impl fmt::Debug for SendExecutor {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        f.debug_struct("SendExecutor").field("port", &self.inner.port).finish()
42    }
43}
44
45impl SendExecutor {
46    fn new_inner(
47        num_threads: u8,
48        worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
49        instrument: Option<Arc<dyn TaskInstrument>>,
50    ) -> Self {
51        let inner = Arc::new(Executor::new(
52            ExecutorTime::RealTime,
53            /* is_local */ false,
54            num_threads,
55            instrument,
56        ));
57        let root_scope = ScopeHandle::root(inner.clone());
58        Executor::set_local(root_scope.clone());
59        Self { inner, root_scope, threads: Vec::default(), worker_init }
60    }
61
62    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
63    pub fn port(&self) -> &zx::Port {
64        &self.inner.port
65    }
66
67    /// Run `future` to completion, using this thread and `num_threads` workers in a pool to
68    /// poll active tasks.
69    // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
70    // the debugger needs to be updated.
71    // LINT.IfChange
72    pub fn run<F>(&mut self, future: F) -> F::Output
73    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
74    where
75        F: Future + Send + 'static,
76        F::Output: Send + 'static,
77    {
78        assert!(self.inner.is_real_time(), "Error: called `run` on an executor using fake time");
79
80        let pair = Arc::new((Mutex::new(None), Condvar::new()));
81        let pair2 = pair.clone();
82
83        // Spawn a future which will set the result upon completion.
84        let task = self.root_scope.new_task(
85            Some(MAIN_TASK_ID),
86            future.map(move |fut_result| {
87                let (lock, cvar) = &*pair2;
88                let mut result = lock.lock();
89                *result = Some(fut_result);
90                cvar.notify_one();
91            }),
92        );
93        task.detach();
94        assert!(self.root_scope.insert_task(task, false));
95
96        // Start worker threads, handing off timers from the current thread.
97        self.inner.done.store(false, Ordering::SeqCst);
98        self.create_worker_threads();
99
100        // Wait until the signal the future has completed.
101        let (lock, cvar) = &*pair;
102        let mut result = lock.lock();
103        if result.is_none() {
104            let mut last_polled = 0;
105            let mut last_tasks_ready = false;
106            loop {
107                // This timeout is chosen to be quite high since it impacts all processes that have
108                // multi-threaded async executors, and it exists to workaround arguably misbehaving
109                // users (see the comment below).
110                cvar.wait_for(&mut result, Duration::from_millis(250));
111                if result.is_some() {
112                    break;
113                }
114                let polled = self.inner.polled.load(Ordering::Relaxed);
115                let tasks_ready = !self.inner.ready_tasks.is_empty();
116                if polled == last_polled && last_tasks_ready && tasks_ready {
117                    // If this log message is printed, it most likely means that a task has blocked
118                    // making a reentrant synchronous call that doesn't involve a port message being
119                    // processed by this same executor. This can arise even if you would expect
120                    // there to normally be other port messages involved. One example (that has
121                    // actually happened): spawn a task to service a fuchsia.io connection, then try
122                    // and synchronously connect to that service. If the task hasn't had a chance to
123                    // run, then the async channel might not be registered with the executor, and so
124                    // sending messages to the channel doesn't trigger a port message. Typically,
125                    // the way to solve these issues is to run the service in a different executor
126                    // (which could be the same or a different process).
127                    eprintln!("Tasks might be stalled!");
128                    self.inner.wake_one_thread();
129                }
130                last_polled = polled;
131                last_tasks_ready = tasks_ready;
132            }
133        }
134
135        // Spin down worker threads
136        self.join_all();
137
138        // Unwrap is fine because of the check to `is_none` above.
139        result.take().unwrap()
140    }
141
142    #[doc(hidden)]
143    /// Returns the root scope of the executor.
144    pub fn root_scope(&self) -> &ScopeHandle {
145        &self.root_scope
146    }
147
148    /// Add `self.num_threads` worker threads to the executor's thread pool.
149    /// `timers`: timers from the "main" thread which would otherwise be lost.
150    fn create_worker_threads(&mut self) {
151        for _ in 0..self.inner.num_threads {
152            let inner = self.inner.clone();
153            let root_scope = self.root_scope.clone();
154            let worker_init = self.worker_init.clone();
155            let thread = thread::Builder::new()
156                .name("executor_worker".to_string())
157                .spawn(move || {
158                    Executor::set_local(root_scope);
159                    if let Some(init) = worker_init.as_ref() {
160                        init();
161                    }
162                    inner.worker_lifecycle::</* UNTIL_STALLED: */ false>();
163                })
164                .expect("must be able to spawn threads");
165            self.threads.push(thread);
166        }
167    }
168
169    fn join_all(&mut self) {
170        self.inner.mark_done();
171
172        // Join the worker threads
173        for thread in self.threads.drain(..) {
174            thread.join().expect("Couldn't join worker thread.");
175        }
176    }
177}
178
179impl Drop for SendExecutor {
180    fn drop(&mut self) {
181        self.join_all();
182        self.inner.on_parent_drop(&self.root_scope);
183    }
184}
185
/// A builder for `SendExecutor`.
#[derive(Default)]
pub struct SendExecutorBuilder {
    /// Requested number of worker threads; `build` falls back to 1 when this is unset.
    num_threads: Option<u8>,
    /// Optional worker initialization function passed on to the executor.
    worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
    /// Optional task instrumentation hook passed on to the executor.
    instrument: Option<Arc<dyn TaskInstrument>>,
}
193
194impl SendExecutorBuilder {
195    /// Creates a new builder used for constructing a `SendExecutor`.
196    pub fn new() -> Self {
197        Self::default()
198    }
199
200    /// Sets the number of threads for the executor.
201    pub fn num_threads(mut self, num_threads: u8) -> Self {
202        self.num_threads = Some(num_threads);
203        self
204    }
205
206    /// Sets the worker initialization function.
207    pub fn worker_init(mut self, worker_init: impl Fn() + Send + Sync + 'static) -> Self {
208        self.worker_init = Some(Arc::new(worker_init));
209        self
210    }
211
212    /// Sets the instrumentation hook.
213    pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
214        self.instrument = instrument;
215        self
216    }
217
218    /// Builds the `SendExecutor`, consuming this `SendExecutorBuilder`.
219    pub fn build(self) -> SendExecutor {
220        SendExecutor::new_inner(self.num_threads.unwrap_or(1), self.worker_init, self.instrument)
221    }
222}
223
224// TODO(https://fxbug.dev/42156503) test SendExecutor with unit tests
225
#[cfg(test)]
mod tests {
    use super::SendExecutorBuilder;
    use crate::{Task, Timer};

    use futures::channel::oneshot;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Condvar, Mutex};

    /// A task that blocks its worker thread must not keep the main future from being polled:
    /// `run` only returns if the main future gets to release the blocked task.
    #[test]
    fn test_stalled_triggers_wake_up() {
        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
        executor.run(async {
            // A timer only fires on one thread, so waiting on one gets us to a point where
            // only one thread is running.
            Timer::new(zx::MonotonicDuration::from_millis(10)).await;

            let (started_tx, started_rx) = oneshot::channel();
            let gate = Arc::new((Mutex::new(false), Condvar::new()));
            let blocker_gate = Arc::clone(&gate);

            let _blocker = Task::spawn(async move {
                // Tell the main future that this task is running.
                started_tx.send(()).unwrap();
                // Then block the thread until the main future opens the gate.
                let (released, opened) = &*blocker_gate;
                let _guard =
                    opened.wait_while(released.lock().unwrap(), |released| !*released).unwrap();
            });

            started_rx.await.unwrap();
            let (released, opened) = &*gate;
            *released.lock().unwrap() = true;
            opened.notify_one();
        });
    }

    /// The `worker_init` hook runs once per worker thread for every call to `run`.
    #[test]
    fn worker_init_called_once_per_worker() {
        static INIT_CALLS: AtomicU64 = AtomicU64::new(0);
        fn count_worker_init() {
            INIT_CALLS.fetch_add(1, Ordering::SeqCst);
        }

        let mut executor =
            SendExecutorBuilder::new().num_threads(2).worker_init(count_worker_init).build();

        // Each run spawns two fresh workers, so the counter grows by two per run.
        for expected_calls in [2, 4] {
            executor.run(async {});
            assert_eq!(INIT_CALLS.load(Ordering::SeqCst), expected_calls);
        }
    }
}