fuchsia_async/runtime/fuchsia/executor/common.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
7use super::packets::{
8 PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
9};
10use super::scope::ScopeHandle;
11use super::time::{BootInstant, MonotonicInstant};
12use crossbeam::queue::SegQueue;
13use fuchsia_sync::Mutex;
14use zx::BootDuration;
15
16use std::any::Any;
17use std::cell::{Cell, RefCell};
18use std::fmt;
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering};
22use std::sync::Arc;
23use std::task::Context;
24
/// Key of the user packets that the executor queues on its own port to wake a worker thread
/// (see `Executor::notify_id`). `Executor::worker_lifecycle` recognises this key and treats the
/// packet as a plain wake-up instead of handing it to a packet receiver.
pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;

/// The id of the main task, which is a virtual task that lives from construction
/// to destruction of the executor. The main task may correspond to multiple
/// main futures, in cases where the executor runs multiple times during its lifetime.
pub(crate) const MAIN_TASK_ID: usize = 0;
31
thread_local!(
    // Root scope of the executor associated with the current thread, if any. Written by
    // `Executor::set_local` and `EHandle::poll_tasks`, cleared by `EHandle::rm_local` and at the
    // end of `EHandle::poll_tasks`, and read by `EHandle::local`.
    static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
);
35
/// Selects where an executor takes its clock readings from.
pub enum ExecutorTime {
    /// Readings come from `zx::MonotonicInstant::get()` and `zx::BootInstant::get()`.
    RealTime,
    /// Fake readings used in tests.
    FakeTime {
        // The fake monotonic clock reading.
        mono_reading_ns: AtomicI64,
        // An offset to add to mono_reading_ns to get the reading of the boot
        // clock, disregarding the difference in timelines.
        //
        // We disregard the fact that the reading and offset can not be
        // read atomically, this is usually not relevant in tests.
        mono_to_boot_offset_ns: AtomicI64,
    },
}
50
/// Outcome of one call to `Executor::poll_ready_tasks`.
enum PollReadyTasksResult {
    /// The ready queue was found empty.
    NoneReady,
    /// A batch of tasks was polled and no thread was recorded as sleeping; control is handed
    /// back to the caller to prevent starvation even though tasks may still be queued.
    MoreReady,
    /// A task with id `MAIN_TASK_ID` ran to completion.
    MainTaskCompleted,
}
56
/// Thread bookkeeping packed into a single `u32`, one byte per counter:
///
/// ```text
/// 24           16           8            0
///  +------------+------------+------------+
///  |  foreign   |  notified  |  sleeping  |
///  +------------+------------+------------+
/// ```
///
/// * `sleeping`: the number of threads sleeping
/// * `notified`: the number of notifications posted to wake sleeping threads
/// * `foreign`: the number of foreign threads processing tasks
#[derive(Clone, Copy, Eq, PartialEq)]
struct ThreadsState(u32);

impl ThreadsState {
    /// Bit offset of the `sleeping` counter.
    const SLEEPING_SHIFT: u32 = 0;
    /// Bit offset of the `notified` counter.
    const NOTIFIED_SHIFT: u32 = 8;
    /// Bit offset of the `foreign` counter.
    const FOREIGN_SHIFT: u32 = 16;
    /// Mask of a single (unshifted) counter.
    const COUNTER_MASK: u32 = 0xff;

    /// Extracts the counter stored at bit offset `shift`.
    const fn counter(self, shift: u32) -> u8 {
        ((self.0 >> shift) & Self::COUNTER_MASK) as u8
    }

    /// Returns a copy with the counter at bit offset `shift` replaced by `value`; all other
    /// bits are left untouched.
    const fn with_counter(self, shift: u32, value: u8) -> Self {
        let cleared = self.0 & !(Self::COUNTER_MASK << shift);
        Self(cleared | ((value as u32) << shift))
    }

    /// The number of threads sleeping.
    const fn sleeping(&self) -> u8 {
        self.counter(Self::SLEEPING_SHIFT)
    }

    /// The number of wake-up notifications posted.
    const fn notified(&self) -> u8 {
        self.counter(Self::NOTIFIED_SHIFT)
    }

    /// Returns a copy with the `sleeping` counter set to `sleeping`.
    const fn with_sleeping(self, sleeping: u8) -> Self {
        self.with_counter(Self::SLEEPING_SHIFT, sleeping)
    }

    /// Returns a copy with the `notified` counter set to `notified`.
    const fn with_notified(self, notified: u8) -> Self {
        self.with_counter(Self::NOTIFIED_SHIFT, notified)
    }

    /// Returns a copy with the `foreign` counter set to `foreign`.
    const fn with_foreign(self, foreign: u8) -> Self {
        self.with_counter(Self::FOREIGN_SHIFT, foreign)
    }
}
89
// Test-only count of `Executor` values that have been constructed and not yet dropped:
// incremented in `Executor::new_with_port`, decremented in `Drop for Executor`.
#[cfg(test)]
static ACTIVE_EXECUTORS: AtomicUsize = AtomicUsize::new(0);
92
/// State shared by everything that runs on one executor: the port that workers wait on, the
/// timers, the queue of ready tasks and the bookkeeping used to sleep and wake worker threads.
pub(crate) struct Executor {
    // The port that `worker_lifecycle` waits on and that `notify_id` queues wake-ups to.
    pub(super) port: zx::Port,
    monotonic_timers: Arc<Timers<MonotonicInstant>>,
    boot_timers: Arc<Timers<BootInstant>>,
    // Set by `mark_done`; `worker_lifecycle` returns once it observes the flag.
    pub(super) done: AtomicBool,
    // Passed in at construction; `poll_tasks` asserts that it is false.
    is_local: bool,
    pub(crate) receivers: PacketReceiverMap,
    // Source of the ids returned by `next_task_id`; starts just past `MAIN_TASK_ID`.
    task_count: AtomicUsize,
    // Tasks waiting to be polled.
    pub(super) ready_tasks: SegQueue<TaskHandle>,
    time: ExecutorTime,
    // Packed counters, decoded with `ThreadsState`: the lowest byte is the number of threads
    // currently sleeping, the next byte is the number of wake-up notifications pending, and the
    // third byte is the number of foreign threads processing tasks.
    pub(super) threads_state: AtomicU32,
    // NOTE(review): presumably the number of worker threads owned by this executor; the wake-up
    // logic treats "sleeping == num_threads" as "nobody is running" -- confirm with the callers.
    pub(super) num_threads: u8,
    // Incremented after each task poll in `poll_ready_tasks` and `poll_tasks`.
    pub(super) polled: AtomicU64,
    // Data that belongs to the user that can be accessed via EHandle::local(). See
    // `TestExecutor::poll_until_stalled`.
    pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
}
112
113impl Executor {
114 pub fn new(time: ExecutorTime, is_local: bool, num_threads: u8) -> Self {
115 Self::new_with_port(time, is_local, num_threads, zx::Port::create())
116 }
117
118 pub fn new_with_port(
119 time: ExecutorTime,
120 is_local: bool,
121 num_threads: u8,
122 port: zx::Port,
123 ) -> Self {
124 #[cfg(test)]
125 ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
126
127 // Is this a fake-time executor?
128 let is_fake = matches!(
129 time,
130 ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
131 );
132
133 Executor {
134 port,
135 monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
136 boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
137 done: AtomicBool::new(false),
138 is_local,
139 receivers: PacketReceiverMap::new(),
140 task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
141 ready_tasks: SegQueue::new(),
142 time,
143 threads_state: AtomicU32::new(0),
144 num_threads,
145 polled: AtomicU64::new(0),
146 owner_data: Mutex::new(None),
147 }
148 }
149
150 pub fn set_local(root_scope: ScopeHandle) {
151 EXECUTOR.with(|e| {
152 let mut e = e.borrow_mut();
153 assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
154 *e = Some(root_scope);
155 });
156 }
157
158 fn poll_ready_tasks(&self) -> PollReadyTasksResult {
159 loop {
160 for _ in 0..16 {
161 let Some(task) = self.ready_tasks.pop() else {
162 return PollReadyTasksResult::NoneReady;
163 };
164 let task_id = task.id();
165 let complete = self.try_poll(task);
166 if complete && task_id == MAIN_TASK_ID {
167 return PollReadyTasksResult::MainTaskCompleted;
168 }
169 self.polled.fetch_add(1, Ordering::Relaxed);
170 }
171 // We didn't finish all the ready tasks. If there are sleeping threads, post a
172 // notification to wake one up.
173 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
174 loop {
175 if threads_state.sleeping() == 0 {
176 // All threads are awake now. Prevent starvation.
177 return PollReadyTasksResult::MoreReady;
178 }
179 if threads_state.notified() >= threads_state.sleeping() {
180 // All sleeping threads have been notified. Keep going and poll more tasks.
181 break;
182 }
183 match self.try_notify(threads_state) {
184 Ok(()) => break,
185 Err(s) => threads_state = s,
186 }
187 }
188 }
189 }
190
    /// Returns the `is_local` flag this executor was constructed with.
    pub fn is_local(&self) -> bool {
        self.is_local
    }
194
    /// Returns a fresh task id by post-incrementing the executor's task counter.
    ///
    /// The counter starts at `MAIN_TASK_ID + 1`, so the value returned here is never
    /// `MAIN_TASK_ID` unless the counter wraps around.
    pub fn next_task_id(&self) -> usize {
        self.task_count.fetch_add(1, Ordering::Relaxed)
    }
198
199 pub fn notify_task_ready(&self) {
200 // Only post if there's no thread running (or soon to be running). If we happen to be
201 // running on a thread for this executor, then threads_state won't be equal to num_threads,
202 // which means notifications only get fired if this is from a non-async thread, or a thread
203 // that belongs to a different executor. We use SeqCst ordering here to make sure this load
204 // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
205 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));
206
207 // We only want to notify if there are no pending notifications and there are no other
208 // threads running.
209 while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
210 match self.try_notify(threads_state) {
211 Ok(()) => break,
212 Err(s) => threads_state = s,
213 }
214 }
215 }
216
217 /// Tries to notify a thread to wake up. Returns threads_state if it fails.
218 fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
219 self.threads_state
220 .compare_exchange_weak(
221 old_threads_state.0,
222 old_threads_state.0 + ThreadsState(0).with_notified(1).0,
223 Ordering::Relaxed,
224 Ordering::Relaxed,
225 )
226 .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
227 .map_err(ThreadsState)
228 }
229
230 pub fn wake_one_thread(&self) {
231 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
232 let current_sleeping = threads_state.sleeping();
233 if current_sleeping == 0 {
234 return;
235 }
236 while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
237 match self.try_notify(threads_state) {
238 Ok(()) => break,
239 Err(s) => threads_state = s,
240 }
241 }
242 }
243
244 pub fn notify_id(&self, id: u64) {
245 let up = zx::UserPacket::from_u8_array([0; 32]);
246 let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
247 if let Err(e) = self.port.queue(&packet) {
248 // TODO: logging
249 eprintln!("Failed to queue notify in port: {e:?}");
250 }
251 }
252
253 /// Returns the current reading of the monotonic clock.
254 ///
255 /// For test executors running in fake time, returns the reading of the
256 /// fake monotonic clock.
257 pub fn now(&self) -> MonotonicInstant {
258 match &self.time {
259 ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
260 ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
261 MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
262 }
263 }
264 }
265
266 /// Returns the current reading of the boot clock.
267 ///
268 /// For test executors running in fake time, returns the reading of the
269 /// fake boot clock.
270 pub fn boot_now(&self) -> BootInstant {
271 match &self.time {
272 ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
273
274 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
275 // The two atomic values are loaded one after the other. This should
276 // not normally be an issue in tests.
277 let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
278 let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
279 BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
280 }
281 }
282 }
283
284 /// Sets the reading of the fake monotonic clock.
285 ///
286 /// # Panics
287 ///
288 /// If called on an executor that runs in real time.
289 pub fn set_fake_time(&self, new: MonotonicInstant) {
290 let boot_offset_ns = match &self.time {
291 ExecutorTime::RealTime => {
292 panic!("Error: called `set_fake_time` on an executor using actual time.")
293 }
294 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
295 t.store(new.into_nanos(), Ordering::Relaxed);
296 mono_to_boot_offset_ns.load(Ordering::Relaxed)
297 }
298 };
299 self.monotonic_timers.maybe_notify(new);
300
301 // Changing fake time also affects boot time. Notify boot clocks as well.
302 let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
303 self.boot_timers.maybe_notify(new_boot_time);
304 }
305
306 // Sets a new offset between boot and monotonic time.
307 //
308 // Only works for executors operating in fake time.
309 // The change in the fake offset will wake expired boot timers.
310 pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
311 let mono_now_ns = match &self.time {
312 ExecutorTime::RealTime => {
313 panic!("Error: called `set_fake_boot_to_mono_offset` on an executor using actual time.")
314 }
315 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
316 // We ignore the non-atomic update between b and t, it is likely
317 // not relevant in tests.
318 b.store(offset.into_nanos(), Ordering::Relaxed);
319 t.load(Ordering::Relaxed)
320 }
321 };
322 let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
323 self.boot_timers.maybe_notify(new_boot_now);
324 }
325
326 /// Returns `true` if this executor is running in real time. Returns
327 /// `false` if this executor si running in fake time.
328 pub fn is_real_time(&self) -> bool {
329 matches!(self.time, ExecutorTime::RealTime)
330 }
331
332 /// Must be called before `on_parent_drop`.
333 ///
334 /// Done flag must be set before dropping packet receivers
335 /// so that future receivers that attempt to deregister themselves
336 /// know that it's okay if their entries are already missing.
337 pub fn mark_done(&self) {
338 self.done.store(true, Ordering::SeqCst);
339
340 // Make sure there's at least one notification outstanding per thread to wake up all
341 // workers. This might be more notifications than required, but this way we don't have to
342 // worry about races where tasks are just about to sleep; when a task receives the
343 // notification, it will check done and terminate.
344 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
345 let num_threads = self.num_threads;
346 loop {
347 let notified = threads_state.notified();
348 if notified >= num_threads {
349 break;
350 }
351 match self.threads_state.compare_exchange_weak(
352 threads_state.0,
353 threads_state.with_notified(num_threads).0,
354 Ordering::Relaxed,
355 Ordering::Relaxed,
356 ) {
357 Ok(_) => {
358 for _ in notified..num_threads {
359 self.notify_id(TASK_READY_WAKEUP_ID);
360 }
361 return;
362 }
363 Err(old) => threads_state = ThreadsState(old),
364 }
365 }
366 }
367
    /// Tears down the executor's tasks, queued work and timer receivers, asserts that no packet
    /// receiver is left registered, and clears the thread-local executor.
    ///
    /// Notes about the lifecycle of an Executor.
    ///
    /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
    /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
    /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
    /// the port is dropped alongside the Executor as it should.
    /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
    ///
    /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
    /// "current" executor being set, and that's unset when the executor is dropped.
    ///
    /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
    /// and point (b) is related to "what happens when I try to create a new receiver when there
    /// is no executor".
    ///
    /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
    /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
    /// references to it [2] instead of strong.
    ///
    /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
    /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
    /// upgrading pointers. For (b) the difference mostly stands for "when is it safe to use IO
    /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
    /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
    /// in that thread.
    ///
    /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
    /// unregistered themselves when the executor drops. The choice to panic was made based on
    /// patterns in fuchsia-async that may come to change:
    ///
    /// - Executor vends strong references to itself and those references are *stored* by most
    ///   receiver implementations (as opposed to reached out on TLS every time).
    /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
    ///   easy to understand error to return when polling on an extinct executor.
    /// - All receivers are implemented in this crate and well-known.
    ///
    /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
    /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
    /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
    /// opaque non-clone-copy-or-send guard would be stronger than this. See:
    /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
    pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
        // Drop all tasks.
        // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
        // as part of waking, there's an upgrade to a strong reference, so for a small amount of
        // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
        // future for the task which in turn could hold references to receivers, which, if we did
        // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
        // task futures here.
        root_scope.drop_all_tasks();

        // Drain the ready queue, dropping the handles of all tasks that were still waiting to
        // be polled.
        while self.ready_tasks.pop().is_some() {}

        // Deregister the timer receivers so that we can perform the check below.
        self.monotonic_timers.deregister();
        self.boot_timers.deregister();

        // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
        // happen. See discussion above.
        //
        // If you're here because you hit this panic check your code for:
        //
        // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
        // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
        //
        // - A function scope that contains a fuchsia_async::Executor NOT in the first position
        // (first position in function scope gets dropped last:
        // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
        //
        // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
        // contains a temporary (temporaries are dropped after the function scope:
        // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
        // looks like a `match` statement at the end of the function without a semicolon.
        //
        // - Storing channel and FIDL objects in static variables.
        //
        // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
        assert!(self.receivers.is_empty(), "receivers must not outlive their executor");

        // Remove the thread-local executor (the one installed by `set_local`).
        EHandle::rm_local();
    }
451
    /// Runs the worker loop on the calling thread.
    ///
    /// After registering both timer sets, each iteration polls ready tasks, then waits on the
    /// port and hands any non-wake-up packet to its receiver. The loop ends when the main task
    /// completes, when `done` is observed, or -- if `UNTIL_STALLED` is true -- when this thread
    /// counts as sleeping and the port has no packet immediately available.
    // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
    // the debugger needs to be updated.
    // LINT.IfChange
    pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Executor>) {
        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)

        self.monotonic_timers.register(self);
        self.boot_timers.register(self);

        loop {
            // Keep track of whether we are considered asleep.
            let mut sleeping = false;

            match self.poll_ready_tasks() {
                PollReadyTasksResult::NoneReady => {
                    // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
                    // want this change here to happen *before* we check ready_tasks below. This
                    // synchronizes with notify_task_ready which is called *after* a task is added
                    // to ready_tasks.
                    const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
                    self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
                    // Check ready tasks again. If a task got posted, wake up. This has to be done
                    // because a notification won't get sent if there is at least one active thread
                    // so there's a window between the preceding two lines where a task could be
                    // made ready and a notification is not sent because it looks like there is at
                    // least one thread running.
                    if self.ready_tasks.is_empty() {
                        sleeping = true;
                    } else {
                        // We lost a race, we're no longer sleeping.
                        self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
                    }
                }
                PollReadyTasksResult::MoreReady => {}
                PollReadyTasksResult::MainTaskCompleted => return,
            }

            // Check done here after updating threads_state to avoid shutdown races.
            if self.done.load(Ordering::SeqCst) {
                return;
            }

            // What the port wait produced for this iteration.
            enum Work {
                None,
                Packet(zx::Packet),
                Stalled,
            }

            // Set when the packet we received was a wake-up notification, which must then be
            // subtracted from the pending-notification count below.
            let mut notified = false;
            let work = {
                // If we're considered awake choose INFINITE_PAST which will make the wait call
                // return immediately. Otherwise, wait until a packet arrives.
                let deadline = if !sleeping || UNTIL_STALLED {
                    zx::Instant::INFINITE_PAST
                } else {
                    zx::Instant::INFINITE
                };

                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == TASK_READY_WAKEUP_ID {
                            notified = true;
                            Work::None
                        } else {
                            Work::Packet(packet)
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => {
                        // Only an `UNTIL_STALLED` run that found nothing to do counts as stalled.
                        if !UNTIL_STALLED || !sleeping {
                            Work::None
                        } else {
                            Work::Stalled
                        }
                    }
                    Err(status) => {
                        panic!("Error calling port wait: {status:?}");
                    }
                }
            };

            // Undo, in a single atomic subtraction, the sleeping mark taken above and the
            // notification we consumed (whichever of the two apply).
            let threads_state_sub =
                ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
            if threads_state_sub.0 > 0 {
                self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
            }

            match work {
                Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
                Work::None => {}
                Work::Stalled => return,
            }
        }
    }
545
    /// Drops the main task, i.e. the task registered in `root_scope` under `MAIN_TASK_ID`.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the executor isn't running.
    pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle) {
        // NOTE(review): presumably `drop_task_unchecked` relies on the same "executor isn't
        // running" guarantee -- confirm against its definition.
        root_scope.drop_task_unchecked(MAIN_TASK_ID);
    }
554
555 fn try_poll(&self, task: TaskHandle) -> bool {
556 let task_waker = task.waker();
557 let poll_result = TaskHandle::set_current_with(&task, || {
558 task.try_poll(&mut Context::from_waker(&task_waker))
559 });
560 match poll_result {
561 AttemptPollResult::Yield => {
562 self.ready_tasks.push(task);
563 false
564 }
565 AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
566 task.scope().task_did_finish(task.id());
567 true
568 }
569 _ => false,
570 }
571 }
572
573 /// Returns the monotonic timers.
574 pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
575 &self.monotonic_timers
576 }
577
578 /// Returns the boot timers.
579 pub fn boot_timers(&self) -> &Timers<BootInstant> {
580 &self.boot_timers
581 }
582
583 fn poll_tasks(&self, callback: impl FnOnce()) {
584 assert!(!self.is_local);
585
586 // Increment the count of foreign threads.
587 const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
588 self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);
589
590 callback();
591
592 // Poll up to 16 tasks.
593 for _ in 0..16 {
594 let Some(task) = self.ready_tasks.pop() else {
595 break;
596 };
597 let task_id = task.id();
598 if self.try_poll(task) && task_id == MAIN_TASK_ID {
599 break;
600 }
601 self.polled.fetch_add(1, Ordering::Relaxed);
602 }
603
604 let mut threads_state = ThreadsState(
605 self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
606 );
607
608 if !self.ready_tasks.is_empty() {
609 // There are tasks still ready to run, so wake up a thread if all the other threads are
610 // sleeping.
611 while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
612 match self.try_notify(threads_state) {
613 Ok(()) => break,
614 Err(s) => threads_state = s,
615 }
616 }
617 }
618 }
619
    /// Queues `task` for polling and wakes a worker thread if that is needed.
    pub fn task_is_ready(&self, task: TaskHandle) {
        // The push must come first: `notify_task_ready` relies on its SeqCst load happening
        // after the change to `ready_tasks`.
        self.ready_tasks.push(task);
        self.notify_task_ready();
    }
624}
625
// Test-only bookkeeping: undoes the increment made in `Executor::new_with_port` so that
// `ACTIVE_EXECUTORS` reaches zero once every executor has been dropped.
#[cfg(test)]
impl Drop for Executor {
    fn drop(&mut self) {
        ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
    }
}
632
/// A handle to an executor.
///
/// The handle stores the executor's root scope; the executor itself is reached through that
/// scope (see `EHandle::inner`).
#[derive(Clone)]
pub struct EHandle {
    // LINT.IfChange
    pub(super) root_scope: ScopeHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
640
641impl fmt::Debug for EHandle {
642 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
643 f.debug_struct("EHandle").field("port", &self.inner().port).finish()
644 }
645}
646
647impl EHandle {
648 /// Returns the thread-local executor.
649 ///
650 /// # Panics
651 ///
652 /// If called outside the context of an active async executor.
653 pub fn local() -> Self {
654 let root_scope = EXECUTOR
655 .with(|e| e.borrow().as_ref().map(|x| x.clone()))
656 .expect("Fuchsia Executor must be created first");
657
658 EHandle { root_scope }
659 }
660
661 pub(super) fn rm_local() {
662 EXECUTOR.with(|e| *e.borrow_mut() = None);
663 }
664
    /// The root scope of the executor.
    ///
    /// This can be used to spawn tasks that live as long as the executor, and
    /// to create shorter-lived child scopes.
    ///
    /// Most users should create an owned scope with
    /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
    ///
    /// The returned reference borrows the scope handle stored in this `EHandle`; nothing is
    /// cloned.
    pub fn global_scope(&self) -> &ScopeHandle {
        &self.root_scope
    }
675
676 /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
677 pub fn port(&self) -> &zx::Port {
678 &self.inner().port
679 }
680
681 /// Registers a `PacketReceiver` with the executor and returns a registration.
682 /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
683 pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
684 self.inner().receivers.register(self.inner().clone(), receiver)
685 }
686
687 /// Registers a pinned `RawPacketReceiver` with the executor.
688 ///
689 /// The registration will be deregistered when dropped.
690 ///
691 /// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
692 /// held, so it is not safe to register or unregister receivers at that time.
693 pub fn register_pinned<T: PacketReceiver>(
694 &self,
695 raw_registration: Pin<&mut RawReceiverRegistration<T>>,
696 ) {
697 self.inner().receivers.register_pinned(self.clone(), raw_registration);
698 }
699
    /// Returns the executor obtained from this handle's root scope.
    #[inline(always)]
    pub(crate) fn inner(&self) -> &Arc<Executor> {
        self.root_scope.executor()
    }
704
705 /// Spawn a new task to be run on this executor.
706 ///
707 /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
708 /// may be run on either a singlethreaded or multithreaded executor.
709 pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
710 self.global_scope().spawn(future);
711 }
712
713 /// Spawn a new task to be run on this executor.
714 ///
715 /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
716 /// have to be threads-safe (implement the `Send` trait). In return, this method requires that
717 /// this executor is a LocalExecutor.
718 pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
719 self.global_scope().spawn_local(future);
720 }
721
722 pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
723 &self.inner().monotonic_timers
724 }
725
726 pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
727 &self.inner().boot_timers
728 }
729
730 /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
731 /// that are ready to run. If tasks remain ready and no other threads are running, a thread
732 /// will be woken. This can end up being a performance win in the case that the queue can be
733 /// cleared without needing to wake any other thread.
734 ///
735 /// # Panics
736 ///
737 /// If called on a single-threaded executor or if this thread is a thread managed by the
738 /// executor.
739 pub fn poll_tasks(&self, callback: impl FnOnce()) {
740 EXECUTOR.with(|e| {
741 assert!(
742 e.borrow_mut().replace(self.root_scope.clone()).is_none(),
743 "This thread is already associated with an executor"
744 );
745 });
746
747 self.inner().poll_tasks(callback);
748
749 EXECUTOR.with(|e| *e.borrow_mut() = None);
750 }
751}
752
// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
// non-static lifetime). The executor doesn't handle this though; the executor just assumes all
// tasks have the 'static lifetime. It's up to the local executor to extend the lifetime and make
// it safe.
pub type TaskHandle = AtomicFutureHandle<'static>;

thread_local! {
    // Pointer to the task currently being polled on this thread, or null when there is none.
    // Written by `TaskHandle::set_current_with` and read by `TaskHandle::with_current`.
    static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
}
762
763impl TaskHandle {
764 pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
765 CURRENT_TASK.with(|cur| {
766 let cur = cur.get();
767 let cur = unsafe { cur.as_ref() };
768 f(cur)
769 })
770 }
771
772 fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
773 CURRENT_TASK.with(|cur| {
774 cur.set(task);
775 let result = f();
776 cur.set(std::ptr::null());
777 result
778 })
779 }
780}
781
#[cfg(test)]
mod tests {
    use super::{EHandle, ACTIVE_EXECUTORS};
    use crate::SendExecutor;
    use crossbeam::epoch;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    /// Runs an executor to completion on its own thread and checks that every `Executor`
    /// ends up dropped.
    #[test]
    fn test_no_leaks() {
        let runner = std::thread::spawn(|| SendExecutor::new(1).run(async {}));
        runner.join().unwrap();

        // Deferring no-op closures seems to be the only way to force crossbeam to do a
        // collection; keep doing so until the executor count drops to zero.
        loop {
            if ACTIVE_EXECUTORS.load(Ordering::Relaxed) == 0 {
                break;
            }
            epoch::pin().defer(|| {});
        }
    }

    /// Checks that a task spawned from within `EHandle::poll_tasks` is run by the foreign
    /// thread that called `poll_tasks`.
    #[test]
    fn poll_tasks() {
        SendExecutor::new(1).run(async {
            let ehandle = EHandle::local();

            // Joining the foreign thread below ties up the executor's only running thread,
            // which ensures that the task spawned on the foreign thread can only run there.
            let foreign = std::thread::spawn(move || {
                let run_count = Arc::new(AtomicU64::new(0));
                ehandle.poll_tasks(|| {
                    let run_count = Arc::clone(&run_count);
                    ehandle.spawn_detached(async move {
                        run_count.fetch_add(1, Ordering::Relaxed);
                    });
                });

                // The spawned task should have run in this thread.
                assert_eq!(run_count.load(Ordering::Relaxed), 1);
            });
            foreign.join().unwrap();
        });
    }
}
823}