use fuchsia_async::{self as fasync};
use futures::task::AtomicWaker;
use std::future::poll_fn;
use std::ops::{Deref, Range};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::task::Poll;
use thiserror::Error;
use zx::AsHandleRef;

/// Size in bytes of the header that precedes every message in the ring buffer: a 64-bit tag
/// followed by a 64-bit payload length.
pub const RING_BUFFER_MESSAGE_HEADER_SIZE: usize = 16;

/// Maximum number of bytes a single message record (header plus padded payload) may occupy.
pub const MAX_MESSAGE_SIZE: usize = 65536;

// Offsets, within the first page of the shared region, of the 64-bit head and tail counters.
const HEAD_OFFSET: usize = 0;
const TAIL_OFFSET: usize = 8;

/// Read-side view of a mediated-write IOBuffer ring buffer.
///
/// The shared region consists of one control page (holding the head and tail counters) followed
/// by `capacity` bytes of message data.  The whole region is mapped at `base`, and the first
/// `MAX_MESSAGE_SIZE` bytes of the data area are mapped a second time immediately after it, so a
/// message that wraps past the end of the ring can still be read as one contiguous slice.
pub struct RingBuffer {
    /// The shared region backing the ring buffer; writer IOBs are created against it.
    shared_region: zx::vdso_next::IobSharedRegion,
    /// Keeps the mappings alive for as long as the ring buffer exists.
    _vmar: zx::Vmar,
    /// Base address of the mapping.
    base: usize,
    /// Size of the data area in bytes.
    capacity: usize,
}

#[derive(Eq, Error, Debug, PartialEq)]
pub enum Error {
    #[error("attempt to read a message at an unaligned index")]
    BadAlignment,
    #[error("there is not enough room to read the header")]
    TooSmall,
    #[error("bad message length (e.g. too big or beyond bounds)")]
    BadLength,
}

impl RingBuffer {
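    /// Creates a new ring buffer with `capacity` bytes of message space and returns the reading
    /// side.  `capacity` must be a multiple of the page size and at least `MAX_MESSAGE_SIZE`.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` does not meet the requirements above, or if any of the underlying
    /// kernel objects cannot be created or mapped.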
    pub fn create(capacity: usize) -> Reader {
        let page_size = zx::system_get_page_size() as usize;

        assert_eq!(capacity % page_size, 0);
        assert!(capacity >= MAX_MESSAGE_SIZE);

        // One control page for the head and tail counters, plus the data area.
        let shared_region_size = capacity + page_size;
        let shared_region =
            zx::vdso_next::IobSharedRegion::create(Default::default(), shared_region_size as u64)
                .unwrap();

        let (iob, _) = zx::Iob::create(
            Default::default(),
            &[zx::IobRegion {
                region_type: zx::IobRegionType::Shared {
                    options: Default::default(),
                    region: &shared_region,
                },
                access: zx::IobAccess::EP0_CAN_MAP_READ | zx::IobAccess::EP0_CAN_MAP_WRITE,
                discipline: zx::IobDiscipline::MediatedWriteRingBuffer { tag: 0 },
            }],
        )
        .unwrap();

        let root_vmar = fuchsia_runtime::vmar_root_self();

        // Reserve enough address space for the whole region plus a wrap-around window.
        let (vmar, base) = root_vmar
            .allocate(
                0,
                shared_region_size + MAX_MESSAGE_SIZE,
                zx::VmarFlags::CAN_MAP_READ
                    | zx::VmarFlags::CAN_MAP_WRITE
                    | zx::VmarFlags::CAN_MAP_SPECIFIC,
            )
            .unwrap();
        // Map the whole shared region...
        vmar.map_iob(
            zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
            0,
            &iob,
            0,
            0,
            shared_region_size,
        )
        .unwrap();
        // ...and then map the first `MAX_MESSAGE_SIZE` bytes of the data area a second time,
        // directly after it, so messages that wrap the end of the ring read as contiguous memory.
        vmar.map_iob(
            zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
            shared_region_size,
            &iob,
            0,
            zx::system_get_page_size() as u64,
            MAX_MESSAGE_SIZE,
        )
        .unwrap();

        let this = Arc::new(Self { shared_region, _vmar: vmar, base, capacity });
        Reader {
            ring_buffer: this,
            registration: fasync::EHandle::local().register_receiver(Receiver::default()),
        }
    }
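
    /// Returns a new pair of IOB endpoints for writing to this ring buffer.  The first endpoint
    /// has mediated-write access, and records written through it carry `tag` in their headers.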
    pub fn new_iob_writer(&self, tag: u64) -> Result<(zx::Iob, zx::Iob), zx::Status> {
        zx::Iob::create(
            Default::default(),
            &[zx::IobRegion {
                region_type: zx::IobRegionType::Shared {
                    options: Default::default(),
                    region: &self.shared_region,
                },
                access: zx::IobAccess::EP0_CAN_MEDIATED_WRITE,
                discipline: zx::IobDiscipline::MediatedWriteRingBuffer { tag },
            }],
        )
    }
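
    /// Returns the capacity of the ring buffer in bytes (excluding the control page).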
    pub fn capacity(&self) -> usize {
        self.capacity
    }
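
    /// Returns the head index of the ring buffer.  Messages are readable in the range
    /// `tail()..head()`.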
    pub fn head(&self) -> u64 {
        // SAFETY: the head counter lives in the control page of the mapping, which remains
        // mapped for the lifetime of `self`.
        unsafe { (*((self.base + HEAD_OFFSET) as *const AtomicU64)).load(Ordering::Acquire) }
    }
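
    /// Returns the tail index of the ring buffer, i.e. the index up to which messages have been
    /// consumed.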
    pub fn tail(&self) -> u64 {
        // SAFETY: the tail counter lives in the control page of the mapping, which remains
        // mapped for the lifetime of `self`.
        unsafe { (*((self.base + TAIL_OFFSET) as *const AtomicU64)).load(Ordering::Acquire) }
    }
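
    /// Advances the tail index by `amount` bytes, releasing that space back to the writers.
    ///
    /// # Panics
    ///
    /// Panics if `amount` is not a multiple of 8.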
    pub fn increment_tail(&self, amount: usize) {
        assert_eq!(amount % 8, 0);
        // SAFETY: as above, the tail counter is always mapped whilst `self` exists.
        unsafe {
            (*((self.base + TAIL_OFFSET) as *const AtomicU64))
                .fetch_add(amount as u64, Ordering::Release);
        }
    }

    fn index_to_offset(&self, index: u64) -> usize {
        (index % self.capacity as u64) as usize
    }
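
    /// Reads a value of type `T` at ring index `index`.
    ///
    /// # Safety
    ///
    /// `index` must be suitably aligned for `T`, `T` must be no larger than `MAX_MESSAGE_SIZE`
    /// (so the wrap-around mapping covers it), and `T` must be valid for the bytes stored at
    /// that index (typically a plain integer such as the tag or length fields).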
    pub unsafe fn read<T>(&self, index: u64) -> T {
        debug_assert_eq!(index % std::mem::align_of::<T>() as u64, 0);
        debug_assert!(std::mem::size_of::<T>() <= MAX_MESSAGE_SIZE);
        let offset = self.index_to_offset(index);
        ((self.base + zx::system_get_page_size() as usize + offset) as *const T).read()
    }
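
    /// Returns the tag and payload of the first message within `range`, where `range` is a range
    /// of ring indices (typically `tail..head`).
    ///
    /// # Errors
    ///
    /// Returns `BadAlignment` if `range.start` is not 8-byte aligned, `TooSmall` if the range
    /// cannot hold a message header, and `BadLength` if the recorded length does not fit within
    /// the range or exceeds `MAX_MESSAGE_SIZE`.
    ///
    /// # Safety
    ///
    /// The caller must ensure the range refers to data the writers have finished writing
    /// (normally `tail..head`), and the returned slice should be consumed before the tail is
    /// advanced past it.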
    pub unsafe fn first_message_in(&self, range: Range<u64>) -> Result<(u64, &[u8]), Error> {
        if !range.start.is_multiple_of(8) {
            return Err(Error::BadAlignment);
        }
        if range.end - range.start < RING_BUFFER_MESSAGE_HEADER_SIZE as u64 {
            return Err(Error::TooSmall);
        }
        let tag = self.read(range.start);
        let message_len: u64 = self.read(range.start + 8);
        if message_len
            > std::cmp::min(range.end - range.start, MAX_MESSAGE_SIZE as u64)
                - RING_BUFFER_MESSAGE_HEADER_SIZE as u64
        {
            return Err(Error::BadLength);
        }
        let index = self.index_to_offset(range.start + 16);
        Ok((
            tag,
            std::slice::from_raw_parts(
                (self.base + zx::system_get_page_size() as usize + index) as *const u8,
                message_len as usize,
            ),
        ))
    }
}
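
/// The reading half of a ring buffer, returned by `RingBuffer::create`.  Dereferences to the
/// underlying `RingBuffer` and can asynchronously wait for new messages.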
pub struct Reader {
    ring_buffer: Arc<RingBuffer>,
    registration: fasync::ReceiverRegistration<Receiver>,
}

impl Deref for Reader {
    type Target = Arc<RingBuffer>;
    fn deref(&self) -> &Self::Target {
        &self.ring_buffer
    }
}

impl Reader {
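    /// Waits until the head index advances beyond `index` and returns the new head.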
    pub async fn wait(&mut self, index: u64) -> u64 {
        poll_fn(|cx| {
            let head = self.head();
            if head > index {
                return Poll::Ready(head);
            }
            // Register the waker and, if we haven't already, arm an async wait for the shared
            // region to be updated.
            self.registration.waker.register(cx.waker());
            if !self.registration.async_wait.swap(true, Ordering::Relaxed) {
                self.shared_region
                    .wait_async_handle(
                        self.registration.port(),
                        self.registration.key(),
                        zx::Signals::IOB_SHARED_REGION_UPDATED,
                        zx::WaitAsyncOpts::empty(),
                    )
                    .unwrap();
            }
            // Check again in case the head advanced whilst we were arming the wait.
            let head = self.head();
            if head > index {
                Poll::Ready(head)
            } else {
                Poll::Pending
            }
        })
        .await
    }
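
    /// Reads the next message from the ring buffer, waiting if none is available yet.  On
    /// success, returns the writer's tag and a copy of the payload, and advances the tail past
    /// the consumed record.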
    pub async fn read_message(&mut self) -> Result<(u64, Vec<u8>), Error> {
        let tail = self.tail();
        let head = self.wait(tail).await;
        // SAFETY: `tail..head` only covers data that has been committed to the ring buffer.
        let message = unsafe {
            self.first_message_in(tail..head).map(|(tag, message)| (tag, message.to_vec()))?
        };
        self.increment_tail(ring_buffer_record_len(message.1.len()));
        Ok(message)
    }
}

#[derive(Default)]
struct Receiver {
    waker: AtomicWaker,
    async_wait: AtomicBool,
}

impl fasync::PacketReceiver for Receiver {
    fn receive_packet(&self, _packet: zx::Packet) {
        self.async_wait.store(false, Ordering::Relaxed);
        self.waker.wake();
    }
}
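
/// Returns the total number of bytes that a message with a payload of `message_len` bytes
/// occupies in the ring buffer: the 16-byte header plus the payload rounded up to a multiple
/// of 8.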
pub fn ring_buffer_record_len(message_len: usize) -> usize {
    RING_BUFFER_MESSAGE_HEADER_SIZE + message_len.next_multiple_of(8)
}

#[cfg(test)]
mod tests {
    use super::{Error, RingBuffer, MAX_MESSAGE_SIZE, RING_BUFFER_MESSAGE_HEADER_SIZE};
    use futures::stream::FuturesUnordered;
    use futures::{FutureExt, StreamExt};
    use std::sync::atomic::{AtomicU64, Ordering};

    #[fuchsia::test]
    async fn read_message() {
        const TAG: u64 = 56;
        let mut reader = RingBuffer::create(128 * 1024);
        let (iob, _) = reader.new_iob_writer(TAG).unwrap();
        const DATA: &[u8] = b"test";
        iob.write(Default::default(), 0, DATA).unwrap();
        let (tag, data) = reader.read_message().await.expect("read_message failed");
        assert_eq!(tag, TAG);
        assert_eq!(&data, DATA);
    }

    #[fuchsia::test]
    async fn writing_wakes_reader() {
        const TAG: u64 = 56;
        let mut reader = RingBuffer::create(128 * 1024);
        let (iob, _) = reader.new_iob_writer(TAG).unwrap();

        let mut read_message = FuturesUnordered::from_iter([reader.read_message()]);

        // The buffer is empty, so reading should not complete yet.
        assert!(read_message.next().now_or_never().is_none());

        const DATA: &[u8] = b"test";
        iob.write(Default::default(), 0, DATA).unwrap();

        let (tag, data) = read_message.next().await.unwrap().expect("read_message failed");
        assert_eq!(tag, TAG);
        assert_eq!(&data, DATA);
    }

    #[fuchsia::test]
    async fn corrupt() {
        let mut reader = RingBuffer::create(128 * 1024);

        const HEAD_OFFSET: usize = 0;
        const TAIL_OFFSET: usize = 8;
        let message_len_offset: usize = zx::system_get_page_size() as usize + 8;

        let base = reader.base;
        let write_u64 = |offset, value| unsafe {
            (*((base + offset) as *const AtomicU64)).store(value, Ordering::Release);
        };

        // An unaligned tail index.
        write_u64(TAIL_OFFSET, 1);
        write_u64(HEAD_OFFSET, 8);
        assert_eq!(reader.read_message().await, Err(Error::BadAlignment));

        // Not enough room between tail and head for a message header.
        write_u64(TAIL_OFFSET, 0);
        assert_eq!(reader.read_message().await, Err(Error::TooSmall));

        // A message length that is too big to ever be valid.
        write_u64(HEAD_OFFSET, 32);
        write_u64(
            message_len_offset,
            (MAX_MESSAGE_SIZE + RING_BUFFER_MESSAGE_HEADER_SIZE + 1) as u64,
        );
        assert_eq!(reader.read_message().await, Err(Error::BadLength));

        // A message length that doesn't fit within `tail..head`.
        write_u64(message_len_offset, 17);
        assert_eq!(reader.read_message().await, Err(Error::BadLength));

        // A message length that exactly fits.
        write_u64(message_len_offset, 16);
        assert!(reader.read_message().await.is_ok());
    }
}