fuchsia_async/runtime/fuchsia/executor/
packets.rs1use super::common::Executor;
6use crossbeam::epoch;
7use fuchsia_sync::{RwLock, RwLockReadGuard};
8use rustc_hash::FxHashMap as HashMap;
9use std::fmt;
10use std::marker::{PhantomData, PhantomPinned};
11use std::mem::ManuallyDrop;
12use std::ops::Deref;
13use std::pin::Pin;
14use std::sync::Arc;
15
16use pin_project_lite::pin_project;
17
18use super::common::EHandle;
19
/// Wrapper around the read guard on the receiver map's `Inner`.
///
/// `PacketReceiverMap::receive_packet` moves its read guard into one of these
/// and passes it to the receiver's vtable entry, which then controls how long
/// the read lock stays held (by dropping the guard early or keeping it).
// The wrapped guard is never read; it exists only so that the lock is released
// when this value is dropped, hence the `dead_code` allowance.
#[allow(dead_code)]
struct ReceiverGuard<'a>(RwLockReadGuard<'a, Inner>);
22
/// Type-erased dispatch table stored next to each registered receiver's data
/// pointer in `Inner::mapping`.
struct RawReceiverVTable {
    /// Delivers `packet` to the receiver behind the erased data pointer.
    ///
    /// The arguments are, in order: the erased pointer that was stored at
    /// registration time, the guard holding the map's read lock, and the packet.
    /// The callee takes ownership of the guard and therefore decides when the
    /// read lock is released.
    receive_packet: fn(*const (), guard: ReceiverGuard<'_>, packet: zx::Packet),
}
26
/// A handler for packets dispatched through a `PacketReceiverMap`.
///
/// Implementations must be `Send + Sync + 'static`; the map stores type-erased
/// pointers to receivers and is itself declared `Send` and `Sync`.
pub trait PacketReceiver: Send + Sync + 'static {
    /// Called with each packet delivered for the key this receiver is
    /// registered under (see `PacketReceiverMap::receive_packet`).
    fn receive_packet(&self, packet: zx::Packet);
}
47
48impl<T: PacketReceiver> PacketReceiver for Arc<T> {
49 fn receive_packet(&self, packet: zx::Packet) {
50 self.as_ref().receive_packet(packet);
51 }
52}
53
/// A table of registered packet receivers, indexed by `u64` key.
///
/// All state lives in `Inner` behind a reader/writer lock: lookups take the
/// read lock, while registration and deregistration take the write lock.
pub struct PacketReceiverMap(RwLock<Inner>);
57
/// Lock-protected state of a `PacketReceiverMap`.
struct Inner {
    /// Key that the next registration will receive. It is only ever
    /// incremented, so a key is not handed out twice.
    next_key: u64,
    /// For every live key: a type-erased pointer to the receiver's data and the
    /// vtable that knows how to dispatch a packet to it.
    mapping: HashMap<u64, (*const (), &'static RawReceiverVTable)>,
}
62
impl PacketReceiverMap {
    /// Creates an empty map; the first registration will be assigned key 0.
    pub fn new() -> Self {
        Self(RwLock::new(Inner { next_key: 0, mapping: HashMap::default() }))
    }

    /// Delivers `packet` to the receiver registered under `key`.
    ///
    /// Does nothing if no receiver is registered under `key`. The read lock
    /// taken for the lookup is handed to the receiver's vtable entry, which
    /// decides how long to keep holding it.
    pub fn receive_packet(&self, key: u64, packet: zx::Packet) {
        let inner = self.0.read();
        // The pointer and vtable are copied out of the map so that the guard
        // itself can be moved into the call.
        if let Some(&(data, vtable)) = inner.mapping.get(&key) {
            (vtable.receive_packet)(data, ReceiverGuard(inner), packet);
        }
    }

    /// Allocates a fresh key and stores the `(data, vtable)` pair produced by
    /// `f` under it, returning the extra value `f` produced.
    ///
    /// `f` runs while the write lock is held.
    /// NOTE(review): `f` presumably must not call back into this map, since the
    /// lock is already held — confirm against the lock's reentrancy behavior.
    ///
    /// # Panics
    ///
    /// Panics with "ran out of keys" if the key counter would overflow `u64`.
    fn register_raw<R>(
        &self,
        f: impl FnOnce(u64) -> (*const (), &'static RawReceiverVTable, R),
    ) -> R {
        let mut inner = self.0.write();
        let key = inner.next_key;
        inner.next_key = inner.next_key.checked_add(1).expect("ran out of keys");
        let (data, vtable, result) = f(key);
        inner.mapping.insert(key, (data, vtable));
        result
    }

    /// Registers `receiver`, moving it into a pinned heap allocation owned by
    /// the returned `ReceiverRegistration`.
    ///
    /// The map stores a raw pointer to that allocation. Dropping the returned
    /// registration removes the entry again (see its `Drop` impl).
    pub fn register<T: PacketReceiver>(
        &self,
        executor: Arc<Executor>,
        receiver: T,
    ) -> ReceiverRegistration<T> {
        // Carrier type so that a distinct `VTABLE` constant exists per `T`.
        struct Impl<T>(PhantomData<T>);

        impl<T: PacketReceiver> Impl<T> {
            const VTABLE: RawReceiverVTable = RawReceiverVTable {
                receive_packet: |data, guard, packet| {
                    // Order matters: pin the epoch *before* releasing the read
                    // lock, so there is no window in which this thread holds
                    // neither the lock nor an epoch pin. The receiver then runs
                    // without the map lock held.
                    let _epoch_guard = epoch::pin();
                    drop(guard);

                    // SAFETY: `data` was created in `register` from a pinned
                    // `Box<ReceiverRegistrationInner<T>>`, so the cast restores
                    // the original type. `ReceiverRegistration::drop` removes
                    // the key under the write lock and only then defers freeing
                    // the box via `epoch::pin().defer(..)`. Because this thread
                    // was pinned while it still held the read lock, the
                    // deferred free cannot run before `_epoch_guard` is dropped
                    // (this relies on crossbeam's epoch guarantee for deferred
                    // functions).
                    unsafe { &*(data as *const ReceiverRegistrationInner<T>) }
                        .receiver
                        .receive_packet(packet);
                },
            };
        }

        self.register_raw(|key| {
            let result =
                ReceiverRegistration(ManuallyDrop::new(Box::pin(ReceiverRegistrationInner {
                    executor,
                    key,
                    receiver,
                    _pinned: PhantomPinned,
                })));
            (
                // Address of the boxed inner value; it is pinned, so it stays
                // valid when `result` itself is moved to the caller.
                result.0.as_ref().get_ref() as *const ReceiverRegistrationInner<T> as *const (),
                &Impl::<T>::VTABLE,
                result,
            )
        })
    }

    /// Registers the receiver stored inline in a pinned
    /// `RawReceiverRegistration`, without allocating.
    ///
    /// Records `ehandle` and the assigned key in `raw_registration` so that it
    /// can deregister itself later.
    pub fn register_pinned<T: PacketReceiver>(
        &self,
        ehandle: EHandle,
        raw_registration: Pin<&mut RawReceiverRegistration<T>>,
    ) {
        // Carrier type so that a distinct `VTABLE` constant exists per `T`.
        struct Impl<T>(PhantomData<T>);

        impl<T: PacketReceiver> Impl<T> {
            const VTABLE: RawReceiverVTable = RawReceiverVTable {
                // `_guard` is a named binding (not `_`), so the read lock stays
                // held until this closure returns.
                receive_packet: |data, _guard, packet| {
                    // SAFETY: `data` points at the `receiver` field of a pinned
                    // `RawReceiverRegistration<T>`. That value deregisters
                    // itself in its `PinnedDrop`, which needs the map's write
                    // lock; since the read lock is held for this whole call, the
                    // receiver cannot be deregistered and dropped while it is in
                    // use here.
                    unsafe { &*(data as *const T) }.receive_packet(packet);
                },
            };
        }

        self.register_raw(|key| {
            let reg = raw_registration.project();
            *reg.registration = Some(Registration { ehandle, key });
            (reg.receiver.as_ref().get_ref() as *const T as *const (), &Impl::<T>::VTABLE, ())
        });
    }

    /// Removes the entry for `key`, taking the write lock to do so.
    ///
    /// # Panics
    ///
    /// Panics with "invalid key" if `key` is not currently registered.
    fn deregister(&self, key: u64) {
        self.0.write().mapping.remove(&key).unwrap_or_else(|| panic!("invalid key"));
    }

    /// Returns `true` if no receivers are currently registered.
    pub fn is_empty(&self) -> bool {
        self.0.read().mapping.is_empty()
    }
}
171
// SAFETY: `Inner` stores raw `*const ()` pointers, which is why `Send` is not
// derived automatically. Those pointers are only ever created by `register` and
// `register_pinned`, both of which require `T: PacketReceiver`, and
// `PacketReceiver` requires `Send + Sync + 'static`. Through the pointers the
// map only takes shared references to the receiver.
// NOTE(review): for `register` the pointee is a `ReceiverRegistrationInner<T>`,
// of which only the `receiver` field is accessed through the pointer.
unsafe impl Send for PacketReceiverMap {}
// SAFETY: same reasoning as for `Send` above; the pointees are `Sync` receivers
// that are only accessed by shared reference.
unsafe impl Sync for PacketReceiverMap {}
176
/// Bookkeeping for a live `RawReceiverRegistration`: where it was registered
/// and under which key, so that it can be deregistered again.
#[derive(Debug)]
struct Registration {
    /// Executor handle passed to `register_pinned`; also used to reach the
    /// receiver map when deregistering.
    ehandle: EHandle,
    /// Key assigned by the receiver map at registration time.
    key: u64,
}
182
pin_project! {
    #[derive(Debug)]
    /// A `PacketReceiver` stored inline together with its optional registration.
    ///
    /// The receiver map keeps a raw pointer to the `receiver` field while the
    /// value is registered, so the type is `!Unpin` and registration goes
    /// through `Pin<&mut Self>`. Dropping the value unregisters it.
    #[project(!Unpin)]
    pub struct RawReceiverRegistration<T: ?Sized> {
        // `Some` while registered (set by `PacketReceiverMap::register_pinned`),
        // `None` before registration and after `unregister`.
        registration: Option<Registration>,
        // Structurally pinned; the receiver map points directly at this field.
        #[pin]
        receiver: T,
    }

    impl<T: ?Sized> PinnedDrop for RawReceiverRegistration<T> {
        fn drop(this: Pin<&mut Self>) {
            // Remove the map entry (if any) before the receiver's storage goes
            // away. The returned handle/key pair is not needed here.
            this.unregister();
        }
    }
}
201
202impl<T: ?Sized> RawReceiverRegistration<T> {
203 pub fn new(receiver: T) -> Self
205 where
206 T: Sized,
207 {
208 Self { registration: None, receiver }
209 }
210
211 pub fn is_registered(&self) -> bool {
213 self.registration.is_some()
214 }
215
216 pub fn register(self: Pin<&mut Self>, ehandle: EHandle)
218 where
219 T: PacketReceiver + Sized,
220 {
221 if self.registration.is_none() {
222 ehandle.register_pinned(self);
223 }
224 }
225
226 pub fn unregister(self: Pin<&mut Self>) -> Option<(EHandle, u64)> {
231 let this = self.project();
232 if let Some(registration) = this.registration.take() {
233 registration.ehandle.inner().receivers.deregister(registration.key);
234 *this.registration = None;
235 Some((registration.ehandle, registration.key))
236 } else {
237 None
238 }
239 }
240
241 pub fn key(&self) -> Option<u64> {
245 Some(self.registration.as_ref()?.key)
246 }
247
248 pub fn receiver(&self) -> &T {
252 &self.receiver
253 }
254
255 pub fn port(&self) -> Option<&zx::Port> {
259 Some(self.registration.as_ref()?.ehandle.port())
260 }
261}
262
/// Owning handle for a receiver registered via `PacketReceiverMap::register`.
///
/// Dropping the handle removes the receiver from the executor's receiver map
/// and defers freeing the boxed state through crossbeam's epoch mechanism (see
/// the `Drop` impl).
#[derive(Debug)]
pub struct ReceiverRegistration<T: Send + 'static>(
    // `ManuallyDrop` lets `Drop::drop` move the box out of `&mut self` so that
    // it can be handed to the deferred destructor instead of being freed
    // immediately. The box is pinned because the receiver map holds a raw
    // pointer to its contents.
    ManuallyDrop<Pin<Box<ReceiverRegistrationInner<T>>>>,
);
272
/// Heap-allocated state behind a `ReceiverRegistration`. The receiver map
/// stores a raw pointer to this struct.
struct ReceiverRegistrationInner<T> {
    /// Executor whose `receivers` map holds this registration.
    executor: Arc<Executor>,
    /// Key assigned by the map at registration time.
    key: u64,
    /// The user-supplied receiver that packets are forwarded to.
    receiver: T,
    /// Marks the type `!Unpin`, since its address is stored in the map.
    _pinned: PhantomPinned,
}
279
280impl<T: fmt::Debug> fmt::Debug for ReceiverRegistrationInner<T> {
281 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
282 f.debug_struct("ReceiverRegistrationInner")
283 .field("key", &self.key)
284 .field("receiver", &self.receiver)
285 .finish()
286 }
287}
288
impl<T: Send + 'static> Drop for ReceiverRegistration<T> {
    // Deregisters the receiver and schedules its storage to be freed later.
    // Panics with "invalid key" (from the receiver map) if the key is no longer
    // present in the map.
    fn drop(&mut self) {
        // Remove the map entry first. This takes the write lock, so once it
        // returns no new dispatch can look up the pointer to `self.0`.
        self.0.executor.receivers.deregister(self.0.key);

        // SAFETY: `self.0` is not accessed again after this point, and `self` is
        // being dropped, so the value is taken out exactly once.
        let inner = unsafe { ManuallyDrop::take(&mut self.0) };

        // A dispatch that found the entry before it was removed may still be
        // using the receiver; it pinned the epoch before releasing the map lock.
        // Moving the box into a deferred closure postpones freeing it until
        // crossbeam runs the closure (presumably once such threads have unpinned
        // — see crossbeam-epoch's `Guard::defer`).
        epoch::pin().defer(|| inner);
    }
}
300
301impl<T: Send + 'static> ReceiverRegistration<T> {
302 pub fn key(&self) -> u64 {
304 self.0.key
305 }
306
307 pub fn receiver(&self) -> &T {
309 &self.0.receiver
310 }
311
312 pub fn port(&self) -> &zx::Port {
314 &self.0.executor.port
315 }
316}
317
318impl<T: Send + 'static> Deref for ReceiverRegistration<T> {
319 type Target = T;
320 fn deref(&self) -> &Self::Target {
321 self.receiver()
322 }
323}
324
#[cfg(test)]
mod tests {
    use super::*;
    use crate::LocalExecutor;

    // Keys are assigned in increasing order, and a key released by dropping a
    // registration is not handed to a later registration.
    #[test]
    fn packet_receiver_map_does_not_reuse_keys() {
        #[derive(Debug, Clone, PartialEq)]
        struct DummyPacketReceiver {
            id: i32,
        }

        impl PacketReceiver for DummyPacketReceiver {
            fn receive_packet(&self, _: zx::Packet) {
                unimplemented!()
            }
        }

        let _executor = LocalExecutor::new();
        let register = |id: i32| EHandle::local().register_receiver(DummyPacketReceiver { id });

        let first = register(1);
        assert_eq!(first.key(), 0);

        let second = register(2);
        assert_eq!(second.key(), 1);

        // Release key 0; the next registration must still get a fresh key.
        drop(first);

        let third = register(3);
        assert_eq!(third.key(), 2);
    }
}