fuchsia_async/runtime/fuchsia/executor/
common.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
7use super::packets::{
8    PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
9};
10use super::scope::ScopeHandle;
11use super::time::{BootInstant, MonotonicInstant};
12use crossbeam::queue::SegQueue;
13use fuchsia_sync::Mutex;
14use zx::BootDuration;
15
16use std::any::Any;
17use std::cell::{Cell, RefCell};
18use std::fmt;
19use std::future::Future;
20use std::pin::Pin;
21use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering};
22use std::sync::Arc;
23use std::task::Context;
24
25pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
26
27/// The id of the main task, which is a virtual task that lives from construction
28/// to destruction of the executor. The main task may correspond to multiple
29/// main futures, in cases where the executor runs multiple times during its lifetime.
30pub(crate) const MAIN_TASK_ID: usize = 0;
31
32thread_local!(
33    static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
34);
35
36pub enum ExecutorTime {
37    RealTime,
38    /// Fake readings used in tests.
39    FakeTime {
40        // The fake monotonic clock reading.
41        mono_reading_ns: AtomicI64,
42        // An offset to add to mono_reading_ns to get the reading of the boot
43        // clock, disregarding the difference in timelines.
44        //
45        // We disregard the fact that the reading and offset can not be
46        // read atomically, this is usually not relevant in tests.
47        mono_to_boot_offset_ns: AtomicI64,
48    },
49}
50
51enum PollReadyTasksResult {
52    NoneReady,
53    MoreReady,
54    MainTaskCompleted,
55}
56
57///  24           16           8            0
58///  +------------+------------+------------+
59///  |  foreign   |  notified  |  sleeping  |
60///  +------------+------------+------------+
61///
62///  sleeping : the number of threads sleeping
63///  notified : the number of notifications posted to wake sleeping threads
64///  foreign  : the number of foreign threads processing tasks
65#[derive(Clone, Copy, Eq, PartialEq)]
66struct ThreadsState(u32);
67
68impl ThreadsState {
69    const fn sleeping(&self) -> u8 {
70        self.0 as u8
71    }
72
73    const fn notified(&self) -> u8 {
74        (self.0 >> 8) as u8
75    }
76
77    const fn with_sleeping(self, sleeping: u8) -> Self {
78        Self((self.0 & !0xff) | sleeping as u32)
79    }
80
81    const fn with_notified(self, notified: u8) -> Self {
82        Self(self.0 & !0xff00 | (notified as u32) << 8)
83    }
84
85    const fn with_foreign(self, foreign: u8) -> Self {
86        Self(self.0 & !0xff0000 | (foreign as u32) << 16)
87    }
88}
89
90#[cfg(test)]
91static ACTIVE_EXECUTORS: AtomicUsize = AtomicUsize::new(0);
92
93pub(crate) struct Executor {
94    pub(super) port: zx::Port,
95    monotonic_timers: Arc<Timers<MonotonicInstant>>,
96    boot_timers: Arc<Timers<BootInstant>>,
97    pub(super) done: AtomicBool,
98    is_local: bool,
99    pub(crate) receivers: PacketReceiverMap,
100    task_count: AtomicUsize,
101    pub(super) ready_tasks: SegQueue<TaskHandle>,
102    time: ExecutorTime,
103    // The low byte is the number of threads currently sleeping. The high byte is the number of
104    // of wake-up notifications pending.
105    pub(super) threads_state: AtomicU32,
106    pub(super) num_threads: u8,
107    pub(super) polled: AtomicU64,
108    // Data that belongs to the user that can be accessed via EHandle::local(). See
109    // `TestExecutor::poll_until_stalled`.
110    pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
111}
112
113impl Executor {
114    pub fn new(time: ExecutorTime, is_local: bool, num_threads: u8) -> Self {
115        Self::new_with_port(time, is_local, num_threads, zx::Port::create())
116    }
117
118    pub fn new_with_port(
119        time: ExecutorTime,
120        is_local: bool,
121        num_threads: u8,
122        port: zx::Port,
123    ) -> Self {
124        #[cfg(test)]
125        ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
126
127        // Is this a fake-time executor?
128        let is_fake = matches!(
129            time,
130            ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
131        );
132
133        Executor {
134            port,
135            monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
136            boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
137            done: AtomicBool::new(false),
138            is_local,
139            receivers: PacketReceiverMap::new(),
140            task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
141            ready_tasks: SegQueue::new(),
142            time,
143            threads_state: AtomicU32::new(0),
144            num_threads,
145            polled: AtomicU64::new(0),
146            owner_data: Mutex::new(None),
147        }
148    }
149
150    pub fn set_local(root_scope: ScopeHandle) {
151        EXECUTOR.with(|e| {
152            let mut e = e.borrow_mut();
153            assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
154            *e = Some(root_scope);
155        });
156    }
157
158    fn poll_ready_tasks(&self) -> PollReadyTasksResult {
159        loop {
160            for _ in 0..16 {
161                let Some(task) = self.ready_tasks.pop() else {
162                    return PollReadyTasksResult::NoneReady;
163                };
164                let task_id = task.id();
165                let complete = self.try_poll(task);
166                if complete && task_id == MAIN_TASK_ID {
167                    return PollReadyTasksResult::MainTaskCompleted;
168                }
169                self.polled.fetch_add(1, Ordering::Relaxed);
170            }
171            // We didn't finish all the ready tasks. If there are sleeping threads, post a
172            // notification to wake one up.
173            let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
174            loop {
175                if threads_state.sleeping() == 0 {
176                    // All threads are awake now. Prevent starvation.
177                    return PollReadyTasksResult::MoreReady;
178                }
179                if threads_state.notified() >= threads_state.sleeping() {
180                    // All sleeping threads have been notified. Keep going and poll more tasks.
181                    break;
182                }
183                match self.try_notify(threads_state) {
184                    Ok(()) => break,
185                    Err(s) => threads_state = s,
186                }
187            }
188        }
189    }
190
191    pub fn is_local(&self) -> bool {
192        self.is_local
193    }
194
195    pub fn next_task_id(&self) -> usize {
196        self.task_count.fetch_add(1, Ordering::Relaxed)
197    }
198
199    pub fn notify_task_ready(&self) {
200        // Only post if there's no thread running (or soon to be running). If we happen to be
201        // running on a thread for this executor, then threads_state won't be equal to num_threads,
202        // which means notifications only get fired if this is from a non-async thread, or a thread
203        // that belongs to a different executor. We use SeqCst ordering here to make sure this load
204        // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
205        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));
206
207        // We only want to notify if there are no pending notifications and there are no other
208        // threads running.
209        while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
210            match self.try_notify(threads_state) {
211                Ok(()) => break,
212                Err(s) => threads_state = s,
213            }
214        }
215    }
216
217    /// Tries to notify a thread to wake up. Returns threads_state if it fails.
218    fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
219        self.threads_state
220            .compare_exchange_weak(
221                old_threads_state.0,
222                old_threads_state.0 + ThreadsState(0).with_notified(1).0,
223                Ordering::Relaxed,
224                Ordering::Relaxed,
225            )
226            .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
227            .map_err(ThreadsState)
228    }
229
230    pub fn wake_one_thread(&self) {
231        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
232        let current_sleeping = threads_state.sleeping();
233        if current_sleeping == 0 {
234            return;
235        }
236        while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
237            match self.try_notify(threads_state) {
238                Ok(()) => break,
239                Err(s) => threads_state = s,
240            }
241        }
242    }
243
244    pub fn notify_id(&self, id: u64) {
245        let up = zx::UserPacket::from_u8_array([0; 32]);
246        let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
247        if let Err(e) = self.port.queue(&packet) {
248            // TODO: logging
249            eprintln!("Failed to queue notify in port: {e:?}");
250        }
251    }
252
253    /// Returns the current reading of the monotonic clock.
254    ///
255    /// For test executors running in fake time, returns the reading of the
256    /// fake monotonic clock.
257    pub fn now(&self) -> MonotonicInstant {
258        match &self.time {
259            ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
260            ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
261                MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
262            }
263        }
264    }
265
266    /// Returns the current reading of the boot clock.
267    ///
268    /// For test executors running in fake time, returns the reading of the
269    /// fake boot clock.
270    pub fn boot_now(&self) -> BootInstant {
271        match &self.time {
272            ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
273
274            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
275                // The two atomic values are loaded one after the other. This should
276                // not normally be an issue in tests.
277                let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
278                let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
279                BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
280            }
281        }
282    }
283
284    /// Sets the reading of the fake monotonic clock.
285    ///
286    /// # Panics
287    ///
288    /// If called on an executor that runs in real time.
289    pub fn set_fake_time(&self, new: MonotonicInstant) {
290        let boot_offset_ns = match &self.time {
291            ExecutorTime::RealTime => {
292                panic!("Error: called `set_fake_time` on an executor using actual time.")
293            }
294            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
295                t.store(new.into_nanos(), Ordering::Relaxed);
296                mono_to_boot_offset_ns.load(Ordering::Relaxed)
297            }
298        };
299        self.monotonic_timers.maybe_notify(new);
300
301        // Changing fake time also affects boot time.  Notify boot clocks as well.
302        let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
303        self.boot_timers.maybe_notify(new_boot_time);
304    }
305
306    // Sets a new offset between boot and monotonic time.
307    //
308    // Only works for executors operating in fake time.
309    // The change in the fake offset will wake expired boot timers.
310    pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
311        let mono_now_ns = match &self.time {
312            ExecutorTime::RealTime => {
313                panic!("Error: called `set_fake_boot_to_mono_offset` on an executor using actual time.")
314            }
315            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
316                // We ignore the non-atomic update between b and t, it is likely
317                // not relevant in tests.
318                b.store(offset.into_nanos(), Ordering::Relaxed);
319                t.load(Ordering::Relaxed)
320            }
321        };
322        let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
323        self.boot_timers.maybe_notify(new_boot_now);
324    }
325
326    /// Returns `true` if this executor is running in real time.  Returns
327    /// `false` if this executor si running in fake time.
328    pub fn is_real_time(&self) -> bool {
329        matches!(self.time, ExecutorTime::RealTime)
330    }
331
332    /// Must be called before `on_parent_drop`.
333    ///
334    /// Done flag must be set before dropping packet receivers
335    /// so that future receivers that attempt to deregister themselves
336    /// know that it's okay if their entries are already missing.
337    pub fn mark_done(&self) {
338        self.done.store(true, Ordering::SeqCst);
339
340        // Make sure there's at least one notification outstanding per thread to wake up all
341        // workers. This might be more notifications than required, but this way we don't have to
342        // worry about races where tasks are just about to sleep; when a task receives the
343        // notification, it will check done and terminate.
344        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
345        let num_threads = self.num_threads;
346        loop {
347            let notified = threads_state.notified();
348            if notified >= num_threads {
349                break;
350            }
351            match self.threads_state.compare_exchange_weak(
352                threads_state.0,
353                threads_state.with_notified(num_threads).0,
354                Ordering::Relaxed,
355                Ordering::Relaxed,
356            ) {
357                Ok(_) => {
358                    for _ in notified..num_threads {
359                        self.notify_id(TASK_READY_WAKEUP_ID);
360                    }
361                    return;
362                }
363                Err(old) => threads_state = ThreadsState(old),
364            }
365        }
366    }
367
368    /// Notes about the lifecycle of an Executor.
369    ///
370    /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
371    /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
372    /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
373    /// the port is dropped alongside the Executor as it should.
374    /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
375    ///
376    /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
377    /// "current" executor being set, and that's unset when the executor is dropped.
378    ///
379    /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
380    /// and point (b) is related to "what happens when I try to create a new receiver when there
381    /// is no executor".
382    ///
383    /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
384    /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
385    /// references to it [2] instead of strong.
386    ///
387    /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
388    /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
389    /// upgrading pointers. For (b) the difference mostly stand for "when is it safe to use IO
390    /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
391    /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
392    /// in that thread.
393    ///
394    /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
395    /// unregistered themselves when the executor drops. The choice to panic was made based on
396    /// patterns in fuchsia-async that may come to change:
397    ///
398    /// - Executor vends strong references to itself and those references are *stored* by most
399    ///   receiver implementations (as opposed to reached out on TLS every time).
400    /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
401    ///   easy to understand error to return when polling on an extinct executor.
402    /// - All receivers are implemented in this crate and well-known.
403    ///
404    /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
405    /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
406    /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
407    /// opaque non-clone-copy-or-send guard would be stronger than this. See:
408    /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
409    pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
410        // Drop all tasks.
411        // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
412        // as part of waking, there's an upgrade to a strong reference, so for a small amount of
413        // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
414        // future for the task which in turn could hold references to receivers, which, if we did
415        // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
416        // task futures here.
417        root_scope.drop_all_tasks();
418
419        // Drop all of the uncompleted tasks
420        while self.ready_tasks.pop().is_some() {}
421
422        // Deregister the timer receivers so that we can perform the check below.
423        self.monotonic_timers.deregister();
424        self.boot_timers.deregister();
425
426        // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
427        // happen. See discussion above.
428        //
429        // If you're here because you hit this panic check your code for:
430        //
431        // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
432        // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
433        //
434        // - A function scope that contains a fuchsia_async::Executor NOT in the first position
435        // (first position in function scope gets dropped last:
436        // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
437        //
438        // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
439        // contains a temporary (temporaries are dropped after the function scope:
440        // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
441        // looks like a `match` statement at the end of the function without a semicolon.
442        //
443        // - Storing channel and FIDL objects in static variables.
444        //
445        // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
446        assert!(self.receivers.is_empty(), "receivers must not outlive their executor");
447
448        // Remove the thread-local executor set in `new`.
449        EHandle::rm_local();
450    }
451
452    // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
453    // the debugger needs to be updated.
454    // LINT.IfChange
455    pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Executor>) {
456        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
457
458        self.monotonic_timers.register(self);
459        self.boot_timers.register(self);
460
461        loop {
462            // Keep track of whether we are considered asleep.
463            let mut sleeping = false;
464
465            match self.poll_ready_tasks() {
466                PollReadyTasksResult::NoneReady => {
467                    // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
468                    // want this change here to happen *before* we check ready_tasks below. This
469                    // synchronizes with notify_task_ready which is called *after* a task is added
470                    // to ready_tasks.
471                    const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
472                    self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
473                    // Check ready tasks again. If a task got posted, wake up. This has to be done
474                    // because a notification won't get sent if there is at least one active thread
475                    // so there's a window between the preceding two lines where a task could be
476                    // made ready and a notification is not sent because it looks like there is at
477                    // least one thread running.
478                    if self.ready_tasks.is_empty() {
479                        sleeping = true;
480                    } else {
481                        // We lost a race, we're no longer sleeping.
482                        self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
483                    }
484                }
485                PollReadyTasksResult::MoreReady => {}
486                PollReadyTasksResult::MainTaskCompleted => return,
487            }
488
489            // Check done here after updating threads_state to avoid shutdown races.
490            if self.done.load(Ordering::SeqCst) {
491                return;
492            }
493
494            enum Work {
495                None,
496                Packet(zx::Packet),
497                Stalled,
498            }
499
500            let mut notified = false;
501            let work = {
502                // If we're considered awake choose INFINITE_PAST which will make the wait call
503                // return immediately.  Otherwise, wait until a packet arrives.
504                let deadline = if !sleeping || UNTIL_STALLED {
505                    zx::Instant::INFINITE_PAST
506                } else {
507                    zx::Instant::INFINITE
508                };
509
510                match self.port.wait(deadline) {
511                    Ok(packet) => {
512                        if packet.key() == TASK_READY_WAKEUP_ID {
513                            notified = true;
514                            Work::None
515                        } else {
516                            Work::Packet(packet)
517                        }
518                    }
519                    Err(zx::Status::TIMED_OUT) => {
520                        if !UNTIL_STALLED || !sleeping {
521                            Work::None
522                        } else {
523                            Work::Stalled
524                        }
525                    }
526                    Err(status) => {
527                        panic!("Error calling port wait: {status:?}");
528                    }
529                }
530            };
531
532            let threads_state_sub =
533                ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
534            if threads_state_sub.0 > 0 {
535                self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
536            }
537
538            match work {
539                Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
540                Work::None => {}
541                Work::Stalled => return,
542            }
543        }
544    }
545
546    /// Drops the main task.
547    ///
548    /// # Safety
549    ///
550    /// The caller must guarantee that the executor isn't running.
551    pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle) {
552        root_scope.drop_task_unchecked(MAIN_TASK_ID);
553    }
554
555    fn try_poll(&self, task: TaskHandle) -> bool {
556        let task_waker = task.waker();
557        let poll_result = TaskHandle::set_current_with(&task, || {
558            task.try_poll(&mut Context::from_waker(&task_waker))
559        });
560        match poll_result {
561            AttemptPollResult::Yield => {
562                self.ready_tasks.push(task);
563                false
564            }
565            AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
566                task.scope().task_did_finish(task.id());
567                true
568            }
569            _ => false,
570        }
571    }
572
573    /// Returns the monotonic timers.
574    pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
575        &self.monotonic_timers
576    }
577
578    /// Returns the boot timers.
579    pub fn boot_timers(&self) -> &Timers<BootInstant> {
580        &self.boot_timers
581    }
582
583    fn poll_tasks(&self, callback: impl FnOnce()) {
584        assert!(!self.is_local);
585
586        // Increment the count of foreign threads.
587        const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
588        self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);
589
590        callback();
591
592        // Poll up to 16 tasks.
593        for _ in 0..16 {
594            let Some(task) = self.ready_tasks.pop() else {
595                break;
596            };
597            let task_id = task.id();
598            if self.try_poll(task) && task_id == MAIN_TASK_ID {
599                break;
600            }
601            self.polled.fetch_add(1, Ordering::Relaxed);
602        }
603
604        let mut threads_state = ThreadsState(
605            self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
606        );
607
608        if !self.ready_tasks.is_empty() {
609            // There are tasks still ready to run, so wake up a thread if all the other threads are
610            // sleeping.
611            while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
612                match self.try_notify(threads_state) {
613                    Ok(()) => break,
614                    Err(s) => threads_state = s,
615                }
616            }
617        }
618    }
619
620    pub fn task_is_ready(&self, task: TaskHandle) {
621        self.ready_tasks.push(task);
622        self.notify_task_ready();
623    }
624}
625
626#[cfg(test)]
627impl Drop for Executor {
628    fn drop(&mut self) {
629        ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
630    }
631}
632
633/// A handle to an executor.
634#[derive(Clone)]
635pub struct EHandle {
636    // LINT.IfChange
637    pub(super) root_scope: ScopeHandle,
638    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
639}
640
641impl fmt::Debug for EHandle {
642    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
643        f.debug_struct("EHandle").field("port", &self.inner().port).finish()
644    }
645}
646
647impl EHandle {
648    /// Returns the thread-local executor.
649    ///
650    /// # Panics
651    ///
652    /// If called outside the context of an active async executor.
653    pub fn local() -> Self {
654        let root_scope = EXECUTOR
655            .with(|e| e.borrow().as_ref().map(|x| x.clone()))
656            .expect("Fuchsia Executor must be created first");
657
658        EHandle { root_scope }
659    }
660
661    pub(super) fn rm_local() {
662        EXECUTOR.with(|e| *e.borrow_mut() = None);
663    }
664
665    /// The root scope of the executor.
666    ///
667    /// This can be used to spawn tasks that live as long as the executor, and
668    /// to create shorter-lived child scopes.
669    ///
670    /// Most users should create an owned scope with
671    /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
672    pub fn global_scope(&self) -> &ScopeHandle {
673        &self.root_scope
674    }
675
676    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
677    pub fn port(&self) -> &zx::Port {
678        &self.inner().port
679    }
680
681    /// Registers a `PacketReceiver` with the executor and returns a registration.
682    /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
683    pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
684        self.inner().receivers.register(self.inner().clone(), receiver)
685    }
686
687    /// Registers a pinned `RawPacketReceiver` with the executor.
688    ///
689    /// The registration will be deregistered when dropped.
690    ///
691    /// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
692    /// held, so it is not safe to register or unregister receivers at that time.
693    pub fn register_pinned<T: PacketReceiver>(
694        &self,
695        raw_registration: Pin<&mut RawReceiverRegistration<T>>,
696    ) {
697        self.inner().receivers.register_pinned(self.clone(), raw_registration);
698    }
699
700    #[inline(always)]
701    pub(crate) fn inner(&self) -> &Arc<Executor> {
702        self.root_scope.executor()
703    }
704
705    /// Spawn a new task to be run on this executor.
706    ///
707    /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
708    /// may be run on either a singlethreaded or multithreaded executor.
709    pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
710        self.global_scope().spawn(future);
711    }
712
713    /// Spawn a new task to be run on this executor.
714    ///
715    /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
716    /// have to be threads-safe (implement the `Send` trait). In return, this method requires that
717    /// this executor is a LocalExecutor.
718    pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
719        self.global_scope().spawn_local(future);
720    }
721
722    pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
723        &self.inner().monotonic_timers
724    }
725
726    pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
727        &self.inner().boot_timers
728    }
729
730    /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
731    /// that are ready to run.  If tasks remain ready and no other threads are running, a thread
732    /// will be woken.  This can end up being a performance win in the case that the queue can be
733    /// cleared without needing to wake any other thread.
734    ///
735    /// # Panics
736    ///
737    /// If called on a single-threaded executor or if this thread is a thread managed by the
738    /// executor.
739    pub fn poll_tasks(&self, callback: impl FnOnce()) {
740        EXECUTOR.with(|e| {
741            assert!(
742                e.borrow_mut().replace(self.root_scope.clone()).is_none(),
743                "This thread is already associated with an executor"
744            );
745        });
746
747        self.inner().poll_tasks(callback);
748
749        EXECUTOR.with(|e| *e.borrow_mut() = None);
750    }
751}
752
753// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
754// non-static lifetime).  The executor doesn't handle this though; the executor just assumes all
755// tasks have the 'static lifetime.  It's up to the local executor to extend the lifetime and make
756// it safe.
757pub type TaskHandle = AtomicFutureHandle<'static>;
758
759thread_local! {
760    static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
761}
762
763impl TaskHandle {
764    pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
765        CURRENT_TASK.with(|cur| {
766            let cur = cur.get();
767            let cur = unsafe { cur.as_ref() };
768            f(cur)
769        })
770    }
771
772    fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
773        CURRENT_TASK.with(|cur| {
774            cur.set(task);
775            let result = f();
776            cur.set(std::ptr::null());
777            result
778        })
779    }
780}
781
782#[cfg(test)]
783mod tests {
784    use super::{EHandle, ACTIVE_EXECUTORS};
785    use crate::SendExecutor;
786    use crossbeam::epoch;
787    use std::sync::atomic::{AtomicU64, Ordering};
788    use std::sync::Arc;
789
790    #[test]
791    fn test_no_leaks() {
792        std::thread::spawn(|| SendExecutor::new(1).run(async {})).join().unwrap();
793
794        // This seems like the only way to force crossbeam to do a collection.
795        while ACTIVE_EXECUTORS.load(Ordering::Relaxed) != 0 {
796            epoch::pin().defer(|| {});
797        }
798    }
799
800    #[test]
801    fn poll_tasks() {
802        SendExecutor::new(1).run(async {
803            let ehandle = EHandle::local();
804
805            // This will tie up the executor's only running thread which ensures that the task
806            // we spawn below can only run on the foreign thread.
807            std::thread::spawn(move || {
808                let ran = Arc::new(AtomicU64::new(0));
809                ehandle.poll_tasks(|| {
810                    let ran = ran.clone();
811                    ehandle.spawn_detached(async move {
812                        ran.fetch_add(1, Ordering::Relaxed);
813                    });
814                });
815
816                // The spawned task should have run in this thread.
817                assert_eq!(ran.load(Ordering::Relaxed), 1);
818            })
819            .join()
820            .unwrap();
821        });
822    }
823}