fuchsia_async/runtime/fuchsia/executor/
common.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::super::timer::Timers;
use super::atomic_future::hooks::HooksMap;
use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
use super::packets::{
    PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
};
use super::scope::ScopeHandle;
use super::time::{BootInstant, MonotonicInstant};
use crate::runtime::instrument::TaskInstrument;
use crossbeam::queue::SegQueue;
use fuchsia_sync::Mutex;
use zx::BootDuration;

use std::any::Any;
use std::cell::{Cell, RefCell};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Context;

pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;

/// The id of the main task, which is a virtual task that lives from construction
/// to destruction of the executor. The main task may correspond to multiple
/// main futures, in cases where the executor runs multiple times during its lifetime.
pub(crate) const MAIN_TASK_ID: usize = 0;

thread_local!(
    static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
);

pub enum ExecutorTime {
    /// Real time readings from the system clocks.
    RealTime,
    /// Fake readings used in tests.
    FakeTime {
        // The fake monotonic clock reading.
        mono_reading_ns: AtomicI64,
        // An offset to add to mono_reading_ns to get the reading of the boot
        // clock, disregarding the difference in timelines.
        //
        // We disregard the fact that the reading and offset cannot be
        // read atomically; this is usually not relevant in tests.
        mono_to_boot_offset_ns: AtomicI64,
    },
}
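
// A minimal sketch (test-only, illustrative, using only std atomics) of the
// relationship the fields above encode: the fake boot reading is the fake
// monotonic reading plus the boot offset.
#[cfg(test)]
mod fake_time_offset_sketch {
    use std::sync::atomic::{AtomicI64, Ordering};

    #[test]
    fn boot_reading_is_mono_reading_plus_offset() {
        let mono_reading_ns = AtomicI64::new(1_000);
        let mono_to_boot_offset_ns = AtomicI64::new(250);
        let boot_ns = mono_reading_ns.load(Ordering::Relaxed)
            + mono_to_boot_offset_ns.load(Ordering::Relaxed);
        assert_eq!(boot_ns, 1_250);
    }
}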

enum PollReadyTasksResult {
    NoneReady,
    MoreReady,
    MainTaskCompleted,
}

///  24           16           8            0
///  +------------+------------+------------+
///  |  foreign   |  notified  |  sleeping  |
///  +------------+------------+------------+
///
///  sleeping : the number of threads sleeping
///  notified : the number of notifications posted to wake sleeping threads
///  foreign  : the number of foreign threads processing tasks
#[derive(Clone, Copy, Eq, PartialEq)]
struct ThreadsState(u32);

impl ThreadsState {
    const fn sleeping(&self) -> u8 {
        self.0 as u8
    }

    const fn notified(&self) -> u8 {
        (self.0 >> 8) as u8
    }

    const fn with_sleeping(self, sleeping: u8) -> Self {
        Self((self.0 & !0xff) | sleeping as u32)
    }

    const fn with_notified(self, notified: u8) -> Self {
        Self(self.0 & !0xff00 | (notified as u32) << 8)
    }

    const fn with_foreign(self, foreign: u8) -> Self {
        Self(self.0 & !0xff0000 | (foreign as u32) << 16)
    }
}
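
// A minimal sketch (test-only, illustrative) of the byte packing described
// above: each `with_*` builder writes one byte of the underlying `u32`, so the
// three counters can be combined and compared as a single integer.
#[cfg(test)]
mod threads_state_sketch {
    use super::ThreadsState;

    #[test]
    fn counters_occupy_separate_bytes() {
        let state = ThreadsState(0).with_sleeping(2).with_notified(1).with_foreign(3);
        assert_eq!(state.sleeping(), 2);
        assert_eq!(state.notified(), 1);
        assert_eq!(state.0, 2u32 | (1 << 8) | (3 << 16));
    }
}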

#[cfg(test)]
static ACTIVE_EXECUTORS: AtomicUsize = AtomicUsize::new(0);

pub(crate) struct Executor {
    pub(super) port: zx::Port,
    monotonic_timers: Arc<Timers<MonotonicInstant>>,
    boot_timers: Arc<Timers<BootInstant>>,
    pub(super) done: AtomicBool,
    is_local: bool,
    pub(crate) receivers: PacketReceiverMap,
    task_count: AtomicUsize,
    pub(super) ready_tasks: SegQueue<TaskHandle>,
    time: ExecutorTime,
    // The low byte is the number of threads currently sleeping. The high byte is the number of
    // wake-up notifications pending.
    pub(super) threads_state: AtomicU32,
    pub(super) num_threads: u8,
    pub(super) polled: AtomicU64,
    // Data that belongs to the user and can be accessed via EHandle::local(). See
    // `TestExecutor::poll_until_stalled`.
    pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
    pub(super) instrument: Option<Arc<dyn TaskInstrument>>,
    pub(super) hooks_map: HooksMap,
}

impl Executor {
    pub fn new(
        time: ExecutorTime,
        is_local: bool,
        num_threads: u8,
        instrument: Option<Arc<dyn TaskInstrument>>,
    ) -> Self {
        Self::new_with_port(time, is_local, num_threads, zx::Port::create(), instrument)
    }

    pub fn new_with_port(
        time: ExecutorTime,
        is_local: bool,
        num_threads: u8,
        port: zx::Port,
        instrument: Option<Arc<dyn TaskInstrument>>,
    ) -> Self {
        #[cfg(test)]
        ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);

        // Is this a fake-time executor?
        let is_fake = matches!(
            time,
            ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
        );

        Executor {
            port,
            monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
            boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
            done: AtomicBool::new(false),
            is_local,
            receivers: PacketReceiverMap::new(),
            task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
            ready_tasks: SegQueue::new(),
            time,
            threads_state: AtomicU32::new(0),
            num_threads,
            polled: AtomicU64::new(0),
            owner_data: Mutex::new(None),
            instrument,
            hooks_map: HooksMap::default(),
        }
    }

    pub fn set_local(root_scope: ScopeHandle) {
        EXECUTOR.with(|e| {
            let mut e = e.borrow_mut();
            assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
            *e = Some(root_scope);
        });
    }

    fn poll_ready_tasks(&self) -> PollReadyTasksResult {
        loop {
            for _ in 0..16 {
                let Some(task) = self.ready_tasks.pop() else {
                    return PollReadyTasksResult::NoneReady;
                };
                let task_id = task.id();
                let complete = self.try_poll(task);
                if complete && task_id == MAIN_TASK_ID {
                    return PollReadyTasksResult::MainTaskCompleted;
                }
                self.polled.fetch_add(1, Ordering::Relaxed);
            }
            // We didn't finish all the ready tasks. If there are sleeping threads, post a
            // notification to wake one up.
            let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
            loop {
                if threads_state.sleeping() == 0 {
                    // All threads are awake now. Prevent starvation.
                    return PollReadyTasksResult::MoreReady;
                }
                if threads_state.notified() >= threads_state.sleeping() {
                    // All sleeping threads have been notified. Keep going and poll more tasks.
                    break;
                }
                match self.try_notify(threads_state) {
                    Ok(()) => break,
                    Err(s) => threads_state = s,
                }
            }
        }
    }

    pub fn is_local(&self) -> bool {
        self.is_local
    }

    pub fn next_task_id(&self) -> usize {
        self.task_count.fetch_add(1, Ordering::Relaxed)
    }

    pub fn notify_task_ready(&self) {
        // Only post if there's no thread running (or soon to be running). If we happen to be
        // running on a thread for this executor, then threads_state won't be equal to num_threads,
        // which means notifications only get fired if this is from a non-async thread, or a thread
        // that belongs to a different executor. We use SeqCst ordering here to make sure this load
        // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));

        // We only want to notify if there are no pending notifications and there are no other
        // threads running.
        while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
            match self.try_notify(threads_state) {
                Ok(()) => break,
                Err(s) => threads_state = s,
            }
        }
    }

    /// Tries to notify a thread to wake up. Returns threads_state if it fails.
    fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
        self.threads_state
            .compare_exchange_weak(
                old_threads_state.0,
                old_threads_state.0 + ThreadsState(0).with_notified(1).0,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
            .map_err(ThreadsState)
    }

    pub fn wake_one_thread(&self) {
        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
        let current_sleeping = threads_state.sleeping();
        if current_sleeping == 0 {
            return;
        }
        while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
            match self.try_notify(threads_state) {
                Ok(()) => break,
                Err(s) => threads_state = s,
            }
        }
    }

    pub fn notify_id(&self, id: u64) {
        let up = zx::UserPacket::from_u8_array([0; 32]);
        let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
        if let Err(e) = self.port.queue(&packet) {
            // TODO: logging
            eprintln!("Failed to queue notify in port: {e:?}");
        }
    }

    /// Returns the current reading of the monotonic clock.
    ///
    /// For test executors running in fake time, returns the reading of the
    /// fake monotonic clock.
    pub fn now(&self) -> MonotonicInstant {
        match &self.time {
            ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
            ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
                MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
            }
        }
    }

    /// Returns the current reading of the boot clock.
    ///
    /// For test executors running in fake time, returns the reading of the
    /// fake boot clock.
    pub fn boot_now(&self) -> BootInstant {
        match &self.time {
            ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),

            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
                // The two atomic values are loaded one after the other. This should
                // not normally be an issue in tests.
                let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
                let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
                BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
            }
        }
    }

    /// Sets the reading of the fake monotonic clock.
    ///
    /// # Panics
    ///
    /// If called on an executor that runs in real time.
    pub fn set_fake_time(&self, new: MonotonicInstant) {
        let boot_offset_ns = match &self.time {
            ExecutorTime::RealTime => {
                panic!("Error: called `set_fake_time` on an executor using actual time.")
            }
            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
                t.store(new.into_nanos(), Ordering::Relaxed);
                mono_to_boot_offset_ns.load(Ordering::Relaxed)
            }
        };
        self.monotonic_timers.maybe_notify(new);

        // Changing fake time also affects boot time.  Notify boot clocks as well.
        let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
        self.boot_timers.maybe_notify(new_boot_time);
    }

    // Sets a new offset between boot and monotonic time.
    //
    // Only works for executors operating in fake time.
    // The change in the fake offset will wake expired boot timers.
    pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
        let mono_now_ns = match &self.time {
            ExecutorTime::RealTime => {
                panic!("Error: called `set_fake_boot_to_mono_offset` on an executor using actual time.")
            }
            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
                // We ignore the non-atomic update between b and t; it is likely
                // not relevant in tests.
                b.store(offset.into_nanos(), Ordering::Relaxed);
                t.load(Ordering::Relaxed)
            }
        };
        let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
        self.boot_timers.maybe_notify(new_boot_now);
    }

    /// Returns `true` if this executor is running in real time.  Returns
    /// `false` if this executor is running in fake time.
    pub fn is_real_time(&self) -> bool {
        matches!(self.time, ExecutorTime::RealTime)
    }

    /// Must be called before `on_parent_drop`.
    ///
    /// Done flag must be set before dropping packet receivers
    /// so that future receivers that attempt to deregister themselves
    /// know that it's okay if their entries are already missing.
    pub fn mark_done(&self) {
        self.done.store(true, Ordering::SeqCst);

        // Make sure there's at least one notification outstanding per thread to wake up all
        // workers. This might be more notifications than required, but this way we don't have to
        // worry about races where tasks are just about to sleep; when a task receives the
        // notification, it will check done and terminate.
        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
        let num_threads = self.num_threads;
        loop {
            let notified = threads_state.notified();
            if notified >= num_threads {
                break;
            }
            match self.threads_state.compare_exchange_weak(
                threads_state.0,
                threads_state.with_notified(num_threads).0,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    for _ in notified..num_threads {
                        self.notify_id(TASK_READY_WAKEUP_ID);
                    }
                    return;
                }
                Err(old) => threads_state = ThreadsState(old),
            }
        }
    }

    /// Notes about the lifecycle of an Executor.
    ///
    /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
    /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
    /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
    /// the port is dropped alongside the Executor as it should.
    /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
    ///
    /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
    /// "current" executor being set, and that's unset when the executor is dropped.
    ///
    /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
    /// and point (b) is related to "what happens when I try to create a new receiver when there
    /// is no executor".
    ///
    /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
    /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
    /// references [2] instead.
    ///
    /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
    /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
    /// upgrading pointers. For (b), the difference mostly comes down to "when is it safe to use IO
    /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
    /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
    /// in that thread.
    ///
    /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
    /// unregistered themselves when the executor drops. The choice to panic was made based on
    /// patterns in fuchsia-async that may come to change:
    ///
    /// - Executor vends strong references to itself and those references are *stored* by most
    ///   receiver implementations (as opposed to reaching out to TLS every time).
    /// - Fuchsia-async objects return zx::Status on wait calls; there isn't an appropriate and
    ///   easy to understand error to return when polling on an extinct executor.
    /// - All receivers are implemented in this crate and well-known.
    ///
    /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
    /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
    /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long"; an
    /// opaque non-clone-copy-or-send guard would be stronger than this. See:
    /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
    pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
        // Drop all tasks.
        // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
        // as part of waking, there's an upgrade to a strong reference, so for a small amount of
        // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
        // future for the task which in turn could hold references to receivers, which, if we did
        // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
        // task futures here.
        root_scope.drop_all_tasks();

        // Drop all of the uncompleted tasks
        while self.ready_tasks.pop().is_some() {}

        // Deregister the timer receivers so that we can perform the check below.
        self.monotonic_timers.deregister();
        self.boot_timers.deregister();

        // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
        // happen. See discussion above.
        //
        // If you're here because you hit this panic, check your code for:
        //
        // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
        // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
        //
        // - A function scope that contains a fuchsia_async::Executor NOT in the first position
        // (first position in function scope gets dropped last:
        // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
        //
        // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
        // contains a temporary (temporaries are dropped after the function scope:
        // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
        // looks like a `match` statement at the end of the function without a semicolon.
        //
        // - Storing channel and FIDL objects in static variables.
        //
        // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
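        //
        // As a minimal illustration of the struct-field ordering point above
        // (a hypothetical struct shown for illustration; fields drop in
        // declaration order, so the executor must come last):
        //
        //     struct Service {
        //         channel: fasync::Channel,        // dropped first
        //         executor: fasync::LocalExecutor, // dropped last
        //     }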
        assert!(self.receivers.is_empty(), "receivers must not outlive their executor");

        // Remove the thread-local executor set in `new`.
        EHandle::rm_local();
    }

    // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
    // the debugger needs to be updated.
    // LINT.IfChange
    pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Executor>) {
        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)

        self.monotonic_timers.register(self);
        self.boot_timers.register(self);

        loop {
            // Keep track of whether we are considered asleep.
            let mut sleeping = false;

            match self.poll_ready_tasks() {
                PollReadyTasksResult::NoneReady => {
                    // No more tasks; indicate we are sleeping. We use SeqCst ordering because we
                    // want this change here to happen *before* we check ready_tasks below. This
                    // synchronizes with notify_task_ready which is called *after* a task is added
                    // to ready_tasks.
                    const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
                    self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
                    // Check ready tasks again. If a task got posted, wake up. This has to be done
                    // because a notification won't get sent if there is at least one active thread,
                    // so there's a window between the preceding two lines where a task could be
                    // made ready and a notification is not sent because it looks like there is at
                    // least one thread running.
                    if self.ready_tasks.is_empty() {
                        sleeping = true;
                    } else {
                        // We lost a race; we're no longer sleeping.
                        self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
                    }
                }
                PollReadyTasksResult::MoreReady => {}
                PollReadyTasksResult::MainTaskCompleted => return,
            }

            // Check done here after updating threads_state to avoid shutdown races.
            if self.done.load(Ordering::SeqCst) {
                return;
            }

            enum Work {
                None,
                Packet(zx::Packet),
                Stalled,
            }

            let mut notified = false;
            let work = {
                // If we're considered awake, choose INFINITE_PAST, which will make the wait call
                // return immediately.  Otherwise, wait until a packet arrives.
                let deadline = if !sleeping || UNTIL_STALLED {
                    zx::Instant::INFINITE_PAST
                } else {
                    zx::Instant::INFINITE
                };

                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == TASK_READY_WAKEUP_ID {
                            notified = true;
                            Work::None
                        } else {
                            Work::Packet(packet)
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => {
                        if !UNTIL_STALLED || !sleeping {
                            Work::None
                        } else {
                            Work::Stalled
                        }
                    }
                    Err(status) => {
                        panic!("Error calling port wait: {status:?}");
                    }
                }
            };

            let threads_state_sub =
                ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
            if threads_state_sub.0 > 0 {
                self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
            }

            match work {
                Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
                Work::None => {}
                Work::Stalled => return,
            }
        }
    }

    /// Drops the main task.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the executor isn't running.
    pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle) {
        root_scope.drop_task_unchecked(MAIN_TASK_ID);
    }

    fn try_poll(&self, task: TaskHandle) -> bool {
        let task_waker = task.waker();
        let poll_result = TaskHandle::set_current_with(&task, || {
            task.try_poll(&mut Context::from_waker(&task_waker))
        });
        match poll_result {
            AttemptPollResult::Yield => {
                self.ready_tasks.push(task);
                false
            }
            AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
                task.scope().task_did_finish(task.id());
                true
            }
            _ => false,
        }
    }

    /// Returns the monotonic timers.
    pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
        &self.monotonic_timers
    }

    /// Returns the boot timers.
    pub fn boot_timers(&self) -> &Timers<BootInstant> {
        &self.boot_timers
    }

    fn poll_tasks(&self, callback: impl FnOnce()) {
        assert!(!self.is_local);

        // Increment the count of foreign threads.
        const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
        self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);

        callback();

        // Poll up to 16 tasks.
        for _ in 0..16 {
            let Some(task) = self.ready_tasks.pop() else {
                break;
            };
            let task_id = task.id();
            if self.try_poll(task) && task_id == MAIN_TASK_ID {
                break;
            }
            self.polled.fetch_add(1, Ordering::Relaxed);
        }

        let mut threads_state = ThreadsState(
            self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
        );

        if !self.ready_tasks.is_empty() {
            // There are tasks still ready to run, so wake up a thread if all the other threads are
            // sleeping.
            while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
                match self.try_notify(threads_state) {
                    Ok(()) => break,
                    Err(s) => threads_state = s,
                }
            }
        }
    }

    pub fn task_is_ready(&self, task: TaskHandle) {
        self.ready_tasks.push(task);
        self.notify_task_ready();
    }
}

#[cfg(test)]
impl Drop for Executor {
    fn drop(&mut self) {
        ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
    }
}

/// A handle to an executor.
#[derive(Clone)]
pub struct EHandle {
    // LINT.IfChange
    pub(super) root_scope: ScopeHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}

impl fmt::Debug for EHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EHandle").field("port", &self.inner().port).finish()
    }
}

impl EHandle {
    /// Returns the thread-local executor.
    ///
    /// # Panics
    ///
    /// If called outside the context of an active async executor.
    pub fn local() -> Self {
        let root_scope = EXECUTOR
            .with(|e| e.borrow().as_ref().map(|x| x.clone()))
            .expect("Fuchsia Executor must be created first");

        EHandle { root_scope }
    }

    /// Sets the fake time to a given value.
    ///
    /// # Panics
    ///
    /// If the executor was not created with fake time.
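    ///
    /// A minimal usage sketch (assumes the current executor was created with
    /// fake time, e.g. a test executor):
    ///
    /// ```ignore
    /// let ehandle = EHandle::local();
    /// ehandle.set_fake_time(MonotonicInstant::from_nanos(1_000_000));
    /// ```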
    pub fn set_fake_time(&self, t: MonotonicInstant) {
        self.inner().set_fake_time(t)
    }

    pub(super) fn rm_local() {
        EXECUTOR.with(|e| *e.borrow_mut() = None);
    }

    /// The root scope of the executor.
    ///
    /// This can be used to spawn tasks that live as long as the executor, and
    /// to create shorter-lived child scopes.
    ///
    /// Most users should create an owned scope with
    /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
    pub fn global_scope(&self) -> &ScopeHandle {
        &self.root_scope
    }

    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
    pub fn port(&self) -> &zx::Port {
        &self.inner().port
    }

    /// Registers a `PacketReceiver` with the executor and returns a registration.
    /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
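    ///
    /// A hedged sketch of registering a receiver (the `CountingReceiver` type
    /// here is hypothetical, shown only for illustration):
    ///
    /// ```ignore
    /// struct CountingReceiver(std::sync::atomic::AtomicUsize);
    ///
    /// impl PacketReceiver for CountingReceiver {
    ///     fn receive_packet(&self, _packet: zx::Packet) {
    ///         self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    ///     }
    /// }
    ///
    /// let registration = EHandle::local().register_receiver(CountingReceiver(0.into()));
    /// // The registration's key is used when queueing packets on the port;
    /// // dropping `registration` deregisters the receiver.
    /// ```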
    pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
        self.inner().receivers.register(self.inner().clone(), receiver)
    }

    /// Registers a pinned `RawPacketReceiver` with the executor.
    ///
    /// The registration will be deregistered when dropped.
    ///
    /// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
    /// held, so it is not safe to register or unregister receivers at that time.
    pub fn register_pinned<T: PacketReceiver>(
        &self,
        raw_registration: Pin<&mut RawReceiverRegistration<T>>,
    ) {
        self.inner().receivers.register_pinned(self.clone(), raw_registration);
    }

    #[inline(always)]
    pub(crate) fn inner(&self) -> &Arc<Executor> {
        self.root_scope.executor()
    }

    /// Spawn a new task to be run on this executor.
    ///
    /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
    /// may be run on either a single-threaded or multi-threaded executor.
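    ///
    /// A minimal usage sketch (assumes it is called from a thread with an
    /// active executor):
    ///
    /// ```ignore
    /// let ehandle = EHandle::local();
    /// ehandle.spawn_detached(async move {
    ///     // ... some `Send` work ...
    /// });
    /// ```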
    pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
        self.global_scope().spawn(future);
    }

    /// Spawn a new task to be run on this executor.
    ///
    /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
    /// have to be thread-safe (implement the `Send` trait). In return, this method requires that
    /// this executor be a `LocalExecutor`.
    pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
        self.global_scope().spawn_local(future);
    }

    pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
        &self.inner().monotonic_timers
    }

    pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
        &self.inner().boot_timers
    }

    /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
    /// that are ready to run.  If tasks remain ready and no other threads are running, a thread
    /// will be woken.  This can end up being a performance win in the case that the queue can be
    /// cleared without needing to wake any other thread.
    ///
    /// # Panics
    ///
    /// If called on a single-threaded executor or if this thread is a thread managed by the
    /// executor.
    pub fn poll_tasks(&self, callback: impl FnOnce()) {
        EXECUTOR.with(|e| {
            assert!(
                e.borrow_mut().replace(self.root_scope.clone()).is_none(),
                "This thread is already associated with an executor"
            );
        });

        self.inner().poll_tasks(callback);

        EXECUTOR.with(|e| *e.borrow_mut() = None);
    }
}

// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
// non-static lifetime).  The executor doesn't handle this though; the executor just assumes all
// tasks have the 'static lifetime.  It's up to the local executor to extend the lifetime and make
// it safe.
pub type TaskHandle = AtomicFutureHandle<'static>;

thread_local! {
    static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
}

impl TaskHandle {
    pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
        CURRENT_TASK.with(|cur| {
            let cur = cur.get();
            let cur = unsafe { cur.as_ref() };
            f(cur)
        })
    }

    fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
        CURRENT_TASK.with(|cur| {
            cur.set(task);
            let result = f();
            cur.set(std::ptr::null());
            result
        })
    }
}

#[cfg(test)]
mod tests {
    use super::{EHandle, ACTIVE_EXECUTORS};
    use crate::SendExecutorBuilder;
    use crossbeam::epoch;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    #[test]
    fn test_no_leaks() {
        std::thread::spawn(|| SendExecutorBuilder::new().num_threads(1).build().run(async {}))
            .join()
            .unwrap();

        // This seems like the only way to force crossbeam to do a collection.
        while ACTIVE_EXECUTORS.load(Ordering::Relaxed) != 0 {
            epoch::pin().defer(|| {});
        }
    }

    #[test]
    fn poll_tasks() {
        SendExecutorBuilder::new().num_threads(1).build().run(async {
            let ehandle = EHandle::local();

            // This will tie up the executor's only running thread which ensures that the task
            // we spawn below can only run on the foreign thread.
            std::thread::spawn(move || {
                let ran = Arc::new(AtomicU64::new(0));
                ehandle.poll_tasks(|| {
                    let ran = ran.clone();
                    ehandle.spawn_detached(async move {
                        ran.fetch_add(1, Ordering::Relaxed);
                    });
                });

                // The spawned task should have run in this thread.
                assert_eq!(ran.load(Ordering::Relaxed), 1);
            })
            .join()
            .unwrap();
        });
    }
}