fuchsia_async/runtime/fuchsia/executor/
packets.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::common::Executor;
6use crossbeam::epoch;
7use fuchsia_sync::{RwLock, RwLockReadGuard};
8use rustc_hash::FxHashMap as HashMap;
9use std::fmt;
10use std::marker::{PhantomData, PhantomPinned};
11use std::mem::ManuallyDrop;
12use std::ops::Deref;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use pin_project_lite::pin_project;
17
18use super::common::EHandle;
19
20#[allow(dead_code)]
21struct ReceiverGuard<'a>(RwLockReadGuard<'a, Inner>);
22
23struct RawReceiverVTable {
24    receive_packet: fn(*const (), guard: ReceiverGuard<'_>, packet: zx::Packet),
25}
26
27/// A trait for handling the arrival of a packet on a `zx::Port`.
28///
29/// This trait should be implemented by users who wish to write their own
30/// types which receive asynchronous notifications from a `zx::Port`.
31/// Implementors of this trait generally contain a `futures::task::AtomicWaker` which
32/// is used to wake up the task which can make progress due to the arrival of
33/// the packet.
34///
35/// `PacketReceiver`s should be registered with a `Core` using the
36/// `register_receiver` method on `Core`, `Handle`, or `Remote`.
37/// Upon registration, users will receive a `ReceiverRegistration`
38/// which provides `key` and `port` methods. These methods can be used to wait on
39/// asynchronous signals.
40///
41/// Note that `PacketReceiver`s may receive false notifications intended for a
42/// previous receiver, and should handle these gracefully.
43pub trait PacketReceiver: Send + Sync + 'static {
44    /// Receive a packet when one arrives.
45    fn receive_packet(&self, packet: zx::Packet);
46}
47
48impl<T: PacketReceiver> PacketReceiver for Arc<T> {
49    fn receive_packet(&self, packet: zx::Packet) {
50        self.as_ref().receive_packet(packet);
51    }
52}
53
54// Simple slab::Slab replacement that doesn't re-use keys
55// TODO(https://fxbug.dev/42119369): figure out how to safely cancel async waits so we can re-use keys again.
56pub struct PacketReceiverMap(RwLock<Inner>);
57
58struct Inner {
59    next_key: u64,
60    mapping: HashMap<u64, (*const (), &'static RawReceiverVTable)>,
61}
62
63impl PacketReceiverMap {
64    /// Returns a new map.
65    pub fn new() -> Self {
66        Self(RwLock::new(Inner { next_key: 0, mapping: HashMap::default() }))
67    }
68
69    /// Notifies the packet receiver for `key`.
70    pub fn receive_packet(&self, key: u64, packet: zx::Packet) {
71        let inner = self.0.read();
72        if let Some(&(data, vtable)) = inner.mapping.get(&key) {
73            (vtable.receive_packet)(data, ReceiverGuard(inner), packet);
74        }
75    }
76
77    /// Registers a raw receiver.
78    fn register_raw<R>(
79        &self,
80        f: impl FnOnce(u64) -> (*const (), &'static RawReceiverVTable, R),
81    ) -> R {
82        let mut inner = self.0.write();
83        let key = inner.next_key;
84        inner.next_key = inner.next_key.checked_add(1).expect("ran out of keys");
85        let (data, vtable, result) = f(key);
86        inner.mapping.insert(key, (data, vtable));
87        result
88    }
89
90    /// Registers a `PacketReceiver` with the executor and returns a registration.
91    /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
92    pub fn register<T: PacketReceiver>(
93        &self,
94        executor: Arc<Executor>,
95        receiver: T,
96    ) -> ReceiverRegistration<T> {
97        struct Impl<T>(PhantomData<T>);
98
99        impl<T: PacketReceiver> Impl<T> {
100            const VTABLE: RawReceiverVTable = RawReceiverVTable {
101                receive_packet: |data, guard, packet| {
102                    // We want to drop the guard so that the receiver can be deregistered if
103                    // necessary.  To make that safe, we use an epoch based garbage collector: when
104                    // deregistered, the deallocation will be deferred until it's safe.
105                    let _epoch_guard = epoch::pin();
106                    drop(guard);
107
108                    // SAFETY: We are holding a guard on the epoch which will prevent the receiver
109                    // from being dropped.
110                    unsafe { &*(data as *const ReceiverRegistrationInner<T>) }
111                        .receiver
112                        .receive_packet(packet);
113                },
114            };
115        }
116
117        self.register_raw(|key| {
118            let result =
119                ReceiverRegistration(ManuallyDrop::new(Box::pin(ReceiverRegistrationInner {
120                    executor,
121                    key,
122                    receiver,
123                    _pinned: PhantomPinned,
124                })));
125            (
126                result.0.as_ref().get_ref() as *const ReceiverRegistrationInner<T> as *const (),
127                &Impl::<T>::VTABLE,
128                result,
129            )
130        })
131    }
132
133    /// Registers a pinned `RawPacketReceiver` with the executor.
134    ///
135    /// The registration will be deregistered when dropped.
136    ///
137    /// NOTE: Unlike with `register`, `receive_packet` will be called whilst a lock is held, so it
138    /// is not safe to register or unregister receivers within `receve_packet`.
139    pub fn register_pinned<T: PacketReceiver>(
140        &self,
141        ehandle: EHandle,
142        raw_registration: Pin<&mut RawReceiverRegistration<T>>,
143    ) {
144        struct Impl<T>(PhantomData<T>);
145
146        impl<T: PacketReceiver> Impl<T> {
147            const VTABLE: RawReceiverVTable = RawReceiverVTable {
148                receive_packet: |data, _guard, packet| {
149                    // SAFETY: This is reversing the cast we did below and we are holding a guard
150                    // which will prevent the receiver from being dropped.
151                    unsafe { &*(data as *const T) }.receive_packet(packet);
152                },
153            };
154        }
155
156        self.register_raw(|key| {
157            let reg = raw_registration.project();
158            *reg.registration = Some(Registration { ehandle, key });
159            (reg.receiver.as_ref().get_ref() as *const T as *const (), &Impl::<T>::VTABLE, ())
160        });
161    }
162
163    fn deregister(&self, key: u64) {
164        self.0.write().mapping.remove(&key).unwrap_or_else(|| panic!("invalid key"));
165    }
166
167    pub fn is_empty(&self) -> bool {
168        self.0.read().mapping.is_empty()
169    }
170}
171
172// SAFETY: All `PacketReceiver`s are `Send`.
173unsafe impl Send for PacketReceiverMap {}
174// SAFETY: All `PacketReceiver`s are `Sync`.
175unsafe impl Sync for PacketReceiverMap {}
176
177#[derive(Debug)]
178struct Registration {
179    ehandle: EHandle,
180    key: u64,
181}
182
183pin_project! {
184    /// A registration of a `PacketReceiver`.
185    /// When dropped, it will automatically deregister the `PacketReceiver`.
186    // NOTE: purposefully does not implement `Clone`.
187    #[derive(Debug)]
188    #[project(!Unpin)]
189    pub struct RawReceiverRegistration<T: ?Sized> {
190        registration: Option<Registration>,
191        #[pin]
192        receiver: T,
193    }
194
195    impl<T: ?Sized> PinnedDrop for RawReceiverRegistration<T> {
196        fn drop(this: Pin<&mut Self>) {
197            this.unregister();
198        }
199    }
200}
201
202impl<T: ?Sized> RawReceiverRegistration<T> {
203    /// Returns a new `ReceiverRegistration` wrapping the given `PacketReceiver`.
204    pub fn new(receiver: T) -> Self
205    where
206        T: Sized,
207    {
208        Self { registration: None, receiver }
209    }
210
211    /// Returns `true` if the receiver registration has been registered with an executor.
212    pub fn is_registered(&self) -> bool {
213        self.registration.is_some()
214    }
215
216    /// Registers the registration with an executor.
217    pub fn register(self: Pin<&mut Self>, ehandle: EHandle)
218    where
219        T: PacketReceiver + Sized,
220    {
221        if self.registration.is_none() {
222            ehandle.register_pinned(self);
223        }
224    }
225
226    /// Unregisters the registration from the executor.
227    ///
228    /// If the registration was registered, the executor handle and key of the
229    /// previous registration are returned. Otherwise, returns `None`.
230    pub fn unregister(self: Pin<&mut Self>) -> Option<(EHandle, u64)> {
231        let this = self.project();
232        if let Some(registration) = this.registration.take() {
233            registration.ehandle.inner().receivers.deregister(registration.key);
234            *this.registration = None;
235            Some((registration.ehandle, registration.key))
236        } else {
237            None
238        }
239    }
240
241    /// The key with which `Packet`s destined for this receiver should be sent on the `zx::Port`.
242    ///
243    /// Returns `None` if this registration has not been registered yet.
244    pub fn key(&self) -> Option<u64> {
245        Some(self.registration.as_ref()?.key)
246    }
247
248    /// The internal `PacketReceiver`.
249    ///
250    /// Returns `None` if this registration has not been registered yet.
251    pub fn receiver(&self) -> &T {
252        &self.receiver
253    }
254
255    /// The `zx::Port` on which packets destined for this `PacketReceiver` should be queued.
256    ///
257    /// Returns `None` if this registration has not been registered yet.
258    pub fn port(&self) -> Option<&zx::Port> {
259        Some(self.registration.as_ref()?.ehandle.port())
260    }
261}
262
263/// A registration of a `PacketReceiver`.
264///
265/// When dropped, it will automatically deregister the `PacketReceiver`. Unlike
266/// `RawReceiverRegistration`, this involves a heap allocation.
267// NOTE: purposefully does not implement `Clone`.
268#[derive(Debug)]
269pub struct ReceiverRegistration<T: Send + 'static>(
270    ManuallyDrop<Pin<Box<ReceiverRegistrationInner<T>>>>,
271);
272
273struct ReceiverRegistrationInner<T> {
274    executor: Arc<Executor>,
275    key: u64,
276    receiver: T,
277    _pinned: PhantomPinned,
278}
279
280impl<T: fmt::Debug> fmt::Debug for ReceiverRegistrationInner<T> {
281    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
282        f.debug_struct("ReceiverRegistrationInner")
283            .field("key", &self.key)
284            .field("receiver", &self.receiver)
285            .finish()
286    }
287}
288
289impl<T: Send + 'static> Drop for ReceiverRegistration<T> {
290    fn drop(&mut self) {
291        self.0.executor.receivers.deregister(self.0.key);
292
293        // SAFETY: This is the only place where we call `ManuallyDrop::take`.
294        let inner = unsafe { ManuallyDrop::take(&mut self.0) };
295
296        // We must defer the deallocation of inner as threads might currently be referencing inner.
297        epoch::pin().defer(|| inner);
298    }
299}
300
301impl<T: Send + 'static> ReceiverRegistration<T> {
302    /// The key with which `Packet`s destined for this receiver should be sent on the `zx::Port`.
303    pub fn key(&self) -> u64 {
304        self.0.key
305    }
306
307    /// The internal `PacketReceiver`.
308    pub fn receiver(&self) -> &T {
309        &self.0.receiver
310    }
311
312    /// The `zx::Port` on which packets destined for this `PacketReceiver` should be queued.
313    pub fn port(&self) -> &zx::Port {
314        &self.0.executor.port
315    }
316}
317
318impl<T: Send + 'static> Deref for ReceiverRegistration<T> {
319    type Target = T;
320    fn deref(&self) -> &Self::Target {
321        self.receiver()
322    }
323}
324
325#[cfg(test)]
326mod tests {
327    use super::*;
328    use crate::LocalExecutor;
329
330    #[test]
331    fn packet_receiver_map_does_not_reuse_keys() {
332        #[derive(Debug, Clone, PartialEq)]
333        struct DummyPacketReceiver {
334            id: i32,
335        }
336
337        impl PacketReceiver for DummyPacketReceiver {
338            fn receive_packet(&self, _: zx::Packet) {
339                unimplemented!()
340            }
341        }
342
343        let _executor = LocalExecutor::new();
344        let reg1 = EHandle::local().register_receiver(DummyPacketReceiver { id: 1 });
345        assert_eq!(reg1.key(), 0);
346
347        let reg2 = EHandle::local().register_receiver(DummyPacketReceiver { id: 2 });
348        assert_eq!(reg2.key(), 1);
349
350        // Still doesn't reuse IDs after one is removed
351        drop(reg1);
352
353        let reg3 = EHandle::local().register_receiver(DummyPacketReceiver { id: 3 });
354        assert_eq!(reg3.key(), 2);
355    }
356}