fuchsia_async/runtime/fuchsia/executor/common.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::hooks::HooksMap;
7use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
8use super::packets::{
9 PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
10};
11use super::scope::ScopeHandle;
12use super::time::{BootInstant, MonotonicInstant};
13use crate::runtime::instrument::TaskInstrument;
14use crossbeam::queue::SegQueue;
15use fuchsia_sync::Mutex;
16use zx::BootDuration;
17
18use std::any::Any;
19use std::cell::{Cell, RefCell};
20use std::fmt;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering};
24use std::sync::Arc;
25use std::task::Context;
26
27pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
28
29/// The id of the main task, which is a virtual task that lives from construction
30/// to destruction of the executor. The main task may correspond to multiple
31/// main futures, in cases where the executor runs multiple times during its lifetime.
32pub(crate) const MAIN_TASK_ID: usize = 0;
33
34thread_local!(
35 static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
36);
37
38pub enum ExecutorTime {
39 RealTime,
40 /// Fake readings used in tests.
41 FakeTime {
42 // The fake monotonic clock reading.
43 mono_reading_ns: AtomicI64,
44 // An offset to add to mono_reading_ns to get the reading of the boot
45 // clock, disregarding the difference in timelines.
46 //
47 // We disregard the fact that the reading and offset can not be
48 // read atomically, this is usually not relevant in tests.
49 mono_to_boot_offset_ns: AtomicI64,
50 },
51}
52
53enum PollReadyTasksResult {
54 NoneReady,
55 MoreReady,
56 MainTaskCompleted,
57}
58
59/// 24 16 8 0
60/// +------------+------------+------------+
61/// | foreign | notified | sleeping |
62/// +------------+------------+------------+
63///
64/// sleeping : the number of threads sleeping
65/// notified : the number of notifications posted to wake sleeping threads
66/// foreign : the number of foreign threads processing tasks
67#[derive(Clone, Copy, Eq, PartialEq)]
68struct ThreadsState(u32);
69
70impl ThreadsState {
71 const fn sleeping(&self) -> u8 {
72 self.0 as u8
73 }
74
75 const fn notified(&self) -> u8 {
76 (self.0 >> 8) as u8
77 }
78
79 const fn with_sleeping(self, sleeping: u8) -> Self {
80 Self((self.0 & !0xff) | sleeping as u32)
81 }
82
83 const fn with_notified(self, notified: u8) -> Self {
84 Self(self.0 & !0xff00 | (notified as u32) << 8)
85 }
86
87 const fn with_foreign(self, foreign: u8) -> Self {
88 Self(self.0 & !0xff0000 | (foreign as u32) << 16)
89 }
90}
91
92#[cfg(test)]
93static ACTIVE_EXECUTORS: AtomicUsize = AtomicUsize::new(0);
94
95pub(crate) struct Executor {
96 pub(super) port: zx::Port,
97 monotonic_timers: Arc<Timers<MonotonicInstant>>,
98 boot_timers: Arc<Timers<BootInstant>>,
99 pub(super) done: AtomicBool,
100 is_local: bool,
101 pub(crate) receivers: PacketReceiverMap,
102 task_count: AtomicUsize,
103 pub(super) ready_tasks: SegQueue<TaskHandle>,
104 time: ExecutorTime,
105 // The low byte is the number of threads currently sleeping. The high byte is the number of
106 // of wake-up notifications pending.
107 pub(super) threads_state: AtomicU32,
108 pub(super) num_threads: u8,
109 pub(super) polled: AtomicU64,
110 // Data that belongs to the user that can be accessed via EHandle::local(). See
111 // `TestExecutor::poll_until_stalled`.
112 pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
113 pub(super) instrument: Option<Arc<dyn TaskInstrument>>,
114 pub(super) hooks_map: HooksMap,
115}
116
117impl Executor {
118 pub fn new(
119 time: ExecutorTime,
120 is_local: bool,
121 num_threads: u8,
122 instrument: Option<Arc<dyn TaskInstrument>>,
123 ) -> Self {
124 Self::new_with_port(time, is_local, num_threads, zx::Port::create(), instrument)
125 }
126
127 pub fn new_with_port(
128 time: ExecutorTime,
129 is_local: bool,
130 num_threads: u8,
131 port: zx::Port,
132 instrument: Option<Arc<dyn TaskInstrument>>,
133 ) -> Self {
134 #[cfg(test)]
135 ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
136
137 // Is this a fake-time executor?
138 let is_fake = matches!(
139 time,
140 ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
141 );
142
143 Executor {
144 port,
145 monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
146 boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
147 done: AtomicBool::new(false),
148 is_local,
149 receivers: PacketReceiverMap::new(),
150 task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
151 ready_tasks: SegQueue::new(),
152 time,
153 threads_state: AtomicU32::new(0),
154 num_threads,
155 polled: AtomicU64::new(0),
156 owner_data: Mutex::new(None),
157 instrument,
158 hooks_map: HooksMap::default(),
159 }
160 }
161
162 pub fn set_local(root_scope: ScopeHandle) {
163 EXECUTOR.with(|e| {
164 let mut e = e.borrow_mut();
165 assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
166 *e = Some(root_scope);
167 });
168 }
169
170 fn poll_ready_tasks(&self) -> PollReadyTasksResult {
171 loop {
172 for _ in 0..16 {
173 let Some(task) = self.ready_tasks.pop() else {
174 return PollReadyTasksResult::NoneReady;
175 };
176 let task_id = task.id();
177 let complete = self.try_poll(task);
178 if complete && task_id == MAIN_TASK_ID {
179 return PollReadyTasksResult::MainTaskCompleted;
180 }
181 self.polled.fetch_add(1, Ordering::Relaxed);
182 }
183 // We didn't finish all the ready tasks. If there are sleeping threads, post a
184 // notification to wake one up.
185 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
186 loop {
187 if threads_state.sleeping() == 0 {
188 // All threads are awake now. Prevent starvation.
189 return PollReadyTasksResult::MoreReady;
190 }
191 if threads_state.notified() >= threads_state.sleeping() {
192 // All sleeping threads have been notified. Keep going and poll more tasks.
193 break;
194 }
195 match self.try_notify(threads_state) {
196 Ok(()) => break,
197 Err(s) => threads_state = s,
198 }
199 }
200 }
201 }
202
203 pub fn is_local(&self) -> bool {
204 self.is_local
205 }
206
207 pub fn next_task_id(&self) -> usize {
208 self.task_count.fetch_add(1, Ordering::Relaxed)
209 }
210
211 pub fn notify_task_ready(&self) {
212 // Only post if there's no thread running (or soon to be running). If we happen to be
213 // running on a thread for this executor, then threads_state won't be equal to num_threads,
214 // which means notifications only get fired if this is from a non-async thread, or a thread
215 // that belongs to a different executor. We use SeqCst ordering here to make sure this load
216 // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
217 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));
218
219 // We only want to notify if there are no pending notifications and there are no other
220 // threads running.
221 while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
222 match self.try_notify(threads_state) {
223 Ok(()) => break,
224 Err(s) => threads_state = s,
225 }
226 }
227 }
228
229 /// Tries to notify a thread to wake up. Returns threads_state if it fails.
230 fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
231 self.threads_state
232 .compare_exchange_weak(
233 old_threads_state.0,
234 old_threads_state.0 + ThreadsState(0).with_notified(1).0,
235 Ordering::Relaxed,
236 Ordering::Relaxed,
237 )
238 .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
239 .map_err(ThreadsState)
240 }
241
242 pub fn wake_one_thread(&self) {
243 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
244 let current_sleeping = threads_state.sleeping();
245 if current_sleeping == 0 {
246 return;
247 }
248 while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
249 match self.try_notify(threads_state) {
250 Ok(()) => break,
251 Err(s) => threads_state = s,
252 }
253 }
254 }
255
256 pub fn notify_id(&self, id: u64) {
257 let up = zx::UserPacket::from_u8_array([0; 32]);
258 let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
259 if let Err(e) = self.port.queue(&packet) {
260 // TODO: logging
261 eprintln!("Failed to queue notify in port: {e:?}");
262 }
263 }
264
265 /// Returns the current reading of the monotonic clock.
266 ///
267 /// For test executors running in fake time, returns the reading of the
268 /// fake monotonic clock.
269 pub fn now(&self) -> MonotonicInstant {
270 match &self.time {
271 ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
272 ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
273 MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
274 }
275 }
276 }
277
278 /// Returns the current reading of the boot clock.
279 ///
280 /// For test executors running in fake time, returns the reading of the
281 /// fake boot clock.
282 pub fn boot_now(&self) -> BootInstant {
283 match &self.time {
284 ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
285
286 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
287 // The two atomic values are loaded one after the other. This should
288 // not normally be an issue in tests.
289 let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
290 let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
291 BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
292 }
293 }
294 }
295
296 /// Sets the reading of the fake monotonic clock.
297 ///
298 /// # Panics
299 ///
300 /// If called on an executor that runs in real time.
301 pub fn set_fake_time(&self, new: MonotonicInstant) {
302 let boot_offset_ns = match &self.time {
303 ExecutorTime::RealTime => {
304 panic!("Error: called `set_fake_time` on an executor using actual time.")
305 }
306 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
307 t.store(new.into_nanos(), Ordering::Relaxed);
308 mono_to_boot_offset_ns.load(Ordering::Relaxed)
309 }
310 };
311 self.monotonic_timers.maybe_notify(new);
312
313 // Changing fake time also affects boot time. Notify boot clocks as well.
314 let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
315 self.boot_timers.maybe_notify(new_boot_time);
316 }
317
318 // Sets a new offset between boot and monotonic time.
319 //
320 // Only works for executors operating in fake time.
321 // The change in the fake offset will wake expired boot timers.
322 pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
323 let mono_now_ns = match &self.time {
324 ExecutorTime::RealTime => {
325 panic!("Error: called `set_fake_boot_to_mono_offset` on an executor using actual time.")
326 }
327 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
328 // We ignore the non-atomic update between b and t, it is likely
329 // not relevant in tests.
330 b.store(offset.into_nanos(), Ordering::Relaxed);
331 t.load(Ordering::Relaxed)
332 }
333 };
334 let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
335 self.boot_timers.maybe_notify(new_boot_now);
336 }
337
338 /// Returns `true` if this executor is running in real time. Returns
339 /// `false` if this executor si running in fake time.
340 pub fn is_real_time(&self) -> bool {
341 matches!(self.time, ExecutorTime::RealTime)
342 }
343
344 /// Must be called before `on_parent_drop`.
345 ///
346 /// Done flag must be set before dropping packet receivers
347 /// so that future receivers that attempt to deregister themselves
348 /// know that it's okay if their entries are already missing.
349 pub fn mark_done(&self) {
350 self.done.store(true, Ordering::SeqCst);
351
352 // Make sure there's at least one notification outstanding per thread to wake up all
353 // workers. This might be more notifications than required, but this way we don't have to
354 // worry about races where tasks are just about to sleep; when a task receives the
355 // notification, it will check done and terminate.
356 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
357 let num_threads = self.num_threads;
358 loop {
359 let notified = threads_state.notified();
360 if notified >= num_threads {
361 break;
362 }
363 match self.threads_state.compare_exchange_weak(
364 threads_state.0,
365 threads_state.with_notified(num_threads).0,
366 Ordering::Relaxed,
367 Ordering::Relaxed,
368 ) {
369 Ok(_) => {
370 for _ in notified..num_threads {
371 self.notify_id(TASK_READY_WAKEUP_ID);
372 }
373 return;
374 }
375 Err(old) => threads_state = ThreadsState(old),
376 }
377 }
378 }
379
380 /// Notes about the lifecycle of an Executor.
381 ///
382 /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
383 /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
384 /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
385 /// the port is dropped alongside the Executor as it should.
386 /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
387 ///
388 /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
389 /// "current" executor being set, and that's unset when the executor is dropped.
390 ///
391 /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
392 /// and point (b) is related to "what happens when I try to create a new receiver when there
393 /// is no executor".
394 ///
395 /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
396 /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
397 /// references to it [2] instead of strong.
398 ///
399 /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
400 /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
401 /// upgrading pointers. For (b) the difference mostly stand for "when is it safe to use IO
402 /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
403 /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
404 /// in that thread.
405 ///
406 /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
407 /// unregistered themselves when the executor drops. The choice to panic was made based on
408 /// patterns in fuchsia-async that may come to change:
409 ///
410 /// - Executor vends strong references to itself and those references are *stored* by most
411 /// receiver implementations (as opposed to reached out on TLS every time).
412 /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
413 /// easy to understand error to return when polling on an extinct executor.
414 /// - All receivers are implemented in this crate and well-known.
415 ///
416 /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
417 /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
418 /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
419 /// opaque non-clone-copy-or-send guard would be stronger than this. See:
420 /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
421 pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
422 // Drop all tasks.
423 // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
424 // as part of waking, there's an upgrade to a strong reference, so for a small amount of
425 // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
426 // future for the task which in turn could hold references to receivers, which, if we did
427 // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
428 // task futures here.
429 root_scope.drop_all_tasks();
430
431 // Drop all of the uncompleted tasks
432 while self.ready_tasks.pop().is_some() {}
433
434 // Deregister the timer receivers so that we can perform the check below.
435 self.monotonic_timers.deregister();
436 self.boot_timers.deregister();
437
438 // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
439 // happen. See discussion above.
440 //
441 // If you're here because you hit this panic check your code for:
442 //
443 // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
444 // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
445 //
446 // - A function scope that contains a fuchsia_async::Executor NOT in the first position
447 // (first position in function scope gets dropped last:
448 // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
449 //
450 // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
451 // contains a temporary (temporaries are dropped after the function scope:
452 // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
453 // looks like a `match` statement at the end of the function without a semicolon.
454 //
455 // - Storing channel and FIDL objects in static variables.
456 //
457 // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
458 assert!(self.receivers.is_empty(), "receivers must not outlive their executor");
459
460 // Remove the thread-local executor set in `new`.
461 EHandle::rm_local();
462 }
463
464 // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
465 // the debugger needs to be updated.
466 // LINT.IfChange
467 pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Executor>) {
468 // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
469
470 self.monotonic_timers.register(self);
471 self.boot_timers.register(self);
472
473 loop {
474 // Keep track of whether we are considered asleep.
475 let mut sleeping = false;
476
477 match self.poll_ready_tasks() {
478 PollReadyTasksResult::NoneReady => {
479 // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
480 // want this change here to happen *before* we check ready_tasks below. This
481 // synchronizes with notify_task_ready which is called *after* a task is added
482 // to ready_tasks.
483 const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
484 self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
485 // Check ready tasks again. If a task got posted, wake up. This has to be done
486 // because a notification won't get sent if there is at least one active thread
487 // so there's a window between the preceding two lines where a task could be
488 // made ready and a notification is not sent because it looks like there is at
489 // least one thread running.
490 if self.ready_tasks.is_empty() {
491 sleeping = true;
492 } else {
493 // We lost a race, we're no longer sleeping.
494 self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
495 }
496 }
497 PollReadyTasksResult::MoreReady => {}
498 PollReadyTasksResult::MainTaskCompleted => return,
499 }
500
501 // Check done here after updating threads_state to avoid shutdown races.
502 if self.done.load(Ordering::SeqCst) {
503 return;
504 }
505
506 enum Work {
507 None,
508 Packet(zx::Packet),
509 Stalled,
510 }
511
512 let mut notified = false;
513 let work = {
514 // If we're considered awake choose INFINITE_PAST which will make the wait call
515 // return immediately. Otherwise, wait until a packet arrives.
516 let deadline = if !sleeping || UNTIL_STALLED {
517 zx::Instant::INFINITE_PAST
518 } else {
519 zx::Instant::INFINITE
520 };
521
522 match self.port.wait(deadline) {
523 Ok(packet) => {
524 if packet.key() == TASK_READY_WAKEUP_ID {
525 notified = true;
526 Work::None
527 } else {
528 Work::Packet(packet)
529 }
530 }
531 Err(zx::Status::TIMED_OUT) => {
532 if !UNTIL_STALLED || !sleeping {
533 Work::None
534 } else {
535 Work::Stalled
536 }
537 }
538 Err(status) => {
539 panic!("Error calling port wait: {status:?}");
540 }
541 }
542 };
543
544 let threads_state_sub =
545 ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
546 if threads_state_sub.0 > 0 {
547 self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
548 }
549
550 match work {
551 Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
552 Work::None => {}
553 Work::Stalled => return,
554 }
555 }
556 }
557
558 /// Drops the main task.
559 ///
560 /// # Safety
561 ///
562 /// The caller must guarantee that the executor isn't running.
563 pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle) {
564 root_scope.drop_task_unchecked(MAIN_TASK_ID);
565 }
566
567 fn try_poll(&self, task: TaskHandle) -> bool {
568 let task_waker = task.waker();
569 let poll_result = TaskHandle::set_current_with(&task, || {
570 task.try_poll(&mut Context::from_waker(&task_waker))
571 });
572 match poll_result {
573 AttemptPollResult::Yield => {
574 self.ready_tasks.push(task);
575 false
576 }
577 AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
578 task.scope().task_did_finish(task.id());
579 true
580 }
581 _ => false,
582 }
583 }
584
585 /// Returns the monotonic timers.
586 pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
587 &self.monotonic_timers
588 }
589
590 /// Returns the boot timers.
591 pub fn boot_timers(&self) -> &Timers<BootInstant> {
592 &self.boot_timers
593 }
594
595 fn poll_tasks(&self, callback: impl FnOnce()) {
596 assert!(!self.is_local);
597
598 // Increment the count of foreign threads.
599 const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
600 self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);
601
602 callback();
603
604 // Poll up to 16 tasks.
605 for _ in 0..16 {
606 let Some(task) = self.ready_tasks.pop() else {
607 break;
608 };
609 let task_id = task.id();
610 if self.try_poll(task) && task_id == MAIN_TASK_ID {
611 break;
612 }
613 self.polled.fetch_add(1, Ordering::Relaxed);
614 }
615
616 let mut threads_state = ThreadsState(
617 self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
618 );
619
620 if !self.ready_tasks.is_empty() {
621 // There are tasks still ready to run, so wake up a thread if all the other threads are
622 // sleeping.
623 while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
624 match self.try_notify(threads_state) {
625 Ok(()) => break,
626 Err(s) => threads_state = s,
627 }
628 }
629 }
630 }
631
632 pub fn task_is_ready(&self, task: TaskHandle) {
633 self.ready_tasks.push(task);
634 self.notify_task_ready();
635 }
636}
637
638#[cfg(test)]
639impl Drop for Executor {
640 fn drop(&mut self) {
641 ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
642 }
643}
644
645/// A handle to an executor.
646#[derive(Clone)]
647pub struct EHandle {
648 // LINT.IfChange
649 pub(super) root_scope: ScopeHandle,
650 // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
651}
652
653impl fmt::Debug for EHandle {
654 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
655 f.debug_struct("EHandle").field("port", &self.inner().port).finish()
656 }
657}
658
659impl EHandle {
660 /// Returns the thread-local executor.
661 ///
662 /// # Panics
663 ///
664 /// If called outside the context of an active async executor.
665 pub fn local() -> Self {
666 let root_scope = EXECUTOR
667 .with(|e| e.borrow().as_ref().map(|x| x.clone()))
668 .expect("Fuchsia Executor must be created first");
669
670 EHandle { root_scope }
671 }
672
673 /// Set the fake time to a given value.
674 ///
675 /// # Panics
676 ///
677 /// If the executor was not created with fake time.
678 pub fn set_fake_time(&self, t: MonotonicInstant) {
679 self.inner().set_fake_time(t)
680 }
681
682 pub(super) fn rm_local() {
683 EXECUTOR.with(|e| *e.borrow_mut() = None);
684 }
685
686 /// The root scope of the executor.
687 ///
688 /// This can be used to spawn tasks that live as long as the executor, and
689 /// to create shorter-lived child scopes.
690 ///
691 /// Most users should create an owned scope with
692 /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
693 pub fn global_scope(&self) -> &ScopeHandle {
694 &self.root_scope
695 }
696
697 /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
698 pub fn port(&self) -> &zx::Port {
699 &self.inner().port
700 }
701
702 /// Registers a `PacketReceiver` with the executor and returns a registration.
703 /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
704 pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
705 self.inner().receivers.register(self.inner().clone(), receiver)
706 }
707
708 /// Registers a pinned `RawPacketReceiver` with the executor.
709 ///
710 /// The registration will be deregistered when dropped.
711 ///
712 /// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
713 /// held, so it is not safe to register or unregister receivers at that time.
714 pub fn register_pinned<T: PacketReceiver>(
715 &self,
716 raw_registration: Pin<&mut RawReceiverRegistration<T>>,
717 ) {
718 self.inner().receivers.register_pinned(self.clone(), raw_registration);
719 }
720
721 #[inline(always)]
722 pub(crate) fn inner(&self) -> &Arc<Executor> {
723 self.root_scope.executor()
724 }
725
726 /// Spawn a new task to be run on this executor.
727 ///
728 /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
729 /// may be run on either a singlethreaded or multithreaded executor.
730 pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
731 self.global_scope().spawn(future);
732 }
733
734 /// Spawn a new task to be run on this executor.
735 ///
736 /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
737 /// have to be threads-safe (implement the `Send` trait). In return, this method requires that
738 /// this executor is a LocalExecutor.
739 pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
740 self.global_scope().spawn_local(future);
741 }
742
743 pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
744 &self.inner().monotonic_timers
745 }
746
747 pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
748 &self.inner().boot_timers
749 }
750
751 /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
752 /// that are ready to run. If tasks remain ready and no other threads are running, a thread
753 /// will be woken. This can end up being a performance win in the case that the queue can be
754 /// cleared without needing to wake any other thread.
755 ///
756 /// # Panics
757 ///
758 /// If called on a single-threaded executor or if this thread is a thread managed by the
759 /// executor.
760 pub fn poll_tasks(&self, callback: impl FnOnce()) {
761 EXECUTOR.with(|e| {
762 assert!(
763 e.borrow_mut().replace(self.root_scope.clone()).is_none(),
764 "This thread is already associated with an executor"
765 );
766 });
767
768 self.inner().poll_tasks(callback);
769
770 EXECUTOR.with(|e| *e.borrow_mut() = None);
771 }
772}
773
774// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
775// non-static lifetime). The executor doesn't handle this though; the executor just assumes all
776// tasks have the 'static lifetime. It's up to the local executor to extend the lifetime and make
777// it safe.
778pub type TaskHandle = AtomicFutureHandle<'static>;
779
780thread_local! {
781 static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
782}
783
784impl TaskHandle {
785 pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
786 CURRENT_TASK.with(|cur| {
787 let cur = cur.get();
788 let cur = unsafe { cur.as_ref() };
789 f(cur)
790 })
791 }
792
793 fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
794 CURRENT_TASK.with(|cur| {
795 cur.set(task);
796 let result = f();
797 cur.set(std::ptr::null());
798 result
799 })
800 }
801}
802
803#[cfg(test)]
804mod tests {
805 use super::{EHandle, ACTIVE_EXECUTORS};
806 use crate::SendExecutorBuilder;
807 use crossbeam::epoch;
808 use std::sync::atomic::{AtomicU64, Ordering};
809 use std::sync::Arc;
810
811 #[test]
812 fn test_no_leaks() {
813 std::thread::spawn(|| SendExecutorBuilder::new().num_threads(1).build().run(async {}))
814 .join()
815 .unwrap();
816
817 // This seems like the only way to force crossbeam to do a collection.
818 while ACTIVE_EXECUTORS.load(Ordering::Relaxed) != 0 {
819 epoch::pin().defer(|| {});
820 }
821 }
822
823 #[test]
824 fn poll_tasks() {
825 SendExecutorBuilder::new().num_threads(1).build().run(async {
826 let ehandle = EHandle::local();
827
828 // This will tie up the executor's only running thread which ensures that the task
829 // we spawn below can only run on the foreign thread.
830 std::thread::spawn(move || {
831 let ran = Arc::new(AtomicU64::new(0));
832 ehandle.poll_tasks(|| {
833 let ran = ran.clone();
834 ehandle.spawn_detached(async move {
835 ran.fetch_add(1, Ordering::Relaxed);
836 });
837 });
838
839 // The spawned task should have run in this thread.
840 assert_eq!(ran.load(Ordering::Relaxed), 1);
841 })
842 .join()
843 .unwrap();
844 });
845 }
846}