ring_buffer/
lib.rs

// Copyright 2025 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use fuchsia_async::{self as fasync};
use futures::task::AtomicWaker;
use std::future::poll_fn;
use std::ops::{Deref, Range};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::task::Poll;
use thiserror::Error;
use zx::AsHandleRef;

/// Size of the kernel header in the ring buffer. This is different to the FXT header.
pub const RING_BUFFER_MESSAGE_HEADER_SIZE: usize = 16;

/// Maximum message size. This includes the ring buffer header. This is also the minimum capacity
/// for the ring buffer.
pub const MAX_MESSAGE_SIZE: usize = 65536;

// The first page of the ring buffer holds the head and tail indices; the ring buffer proper
// starts from the second page. The head and tail indices never wrap; modulo arithmetic is used to
// get to the actual offset in the buffer.
//
// Messages in the ring buffer consist of a 64 bit tag, followed by a 64 bit length. The remainder
// of the message is a message in the diagnostics log format. The tag and length are written by the
// kernel, so they can be trusted whilst the rest of the message can't be. Messages are always 64
// bit aligned, but the length doesn't have to be, so when the tail index is advanced, the amount
// must be rounded up to preserve that alignment.
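//
// For example, a 5 byte payload is stored as a 16 byte header plus 8 bytes of payload space
// (5 bytes of data and 3 bytes of padding), so the tail is advanced by 24 bytes; see
// `ring_buffer_record_len`.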

const HEAD_OFFSET: usize = 0; // Offset of the head index in the ring buffer.
const TAIL_OFFSET: usize = 8; // Offset of the tail index in the ring buffer.

/// RingBuffer wraps an IOBuffer shared region and mapping that uses the ring buffer discipline.
pub struct RingBuffer {
    shared_region: zx::vdso_next::IobSharedRegion,
    _vmar: zx::Vmar,
    base: usize,
    capacity: usize,
}

#[derive(Eq, Error, Debug, PartialEq)]
pub enum Error {
    #[error("attempt to read a message at an unaligned index")]
    BadAlignment,
    #[error("there is not enough room to read the header")]
    TooSmall,
    #[error("bad message length (e.g. too big or beyond bounds)")]
    BadLength,
}

impl RingBuffer {
    /// Returns a new RingBuffer and Reader. `capacity` must be a multiple of the page size and
    /// must be at least `MAX_MESSAGE_SIZE`. The `capacity` does not include the additional page
    /// used to store the head and tail indices.
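    ///
    /// Example (an illustrative sketch; assumes a fuchsia-async executor is running so the
    /// internal packet receiver can be registered):
    ///
    /// ```ignore
    /// let reader = RingBuffer::create(128 * 1024);
    /// assert_eq!(reader.capacity(), 128 * 1024);
    /// ```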
    pub fn create(capacity: usize) -> Reader {
        let page_size = zx::system_get_page_size() as usize;

        assert_eq!(capacity % page_size, 0);
        assert!(capacity >= MAX_MESSAGE_SIZE);

        let shared_region_size = capacity + page_size;
        let shared_region =
            zx::vdso_next::IobSharedRegion::create(Default::default(), shared_region_size as u64)
                .unwrap();

        // We only need one endpoint.
        let (iob, _) = zx::Iob::create(
            Default::default(),
            &[zx::IobRegion {
                region_type: zx::IobRegionType::Shared {
                    options: Default::default(),
                    region: &shared_region,
                },
                access: zx::IobAccess::EP0_CAN_MAP_READ | zx::IobAccess::EP0_CAN_MAP_WRITE,
                discipline: zx::IobDiscipline::MediatedWriteRingBuffer { tag: 0 },
            }],
        )
        .unwrap();

        let root_vmar = fuchsia_runtime::vmar_root_self();

        // Map the buffer, but repeat the mapping for the first 64 KiB of the buffer at the end,
        // which makes dealing with wrapping much easier.
        //
        // NOTE: dropping the vmar will drop the mappings.
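        //
        // For example, a message that starts just before the end of the buffer can still be read
        // as one contiguous slice, because, within the data region that starts on the second page,
        // offsets [capacity, capacity + MAX_MESSAGE_SIZE) alias offsets [0, MAX_MESSAGE_SIZE).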
        let (vmar, base) = root_vmar
            .allocate(
                0,
                shared_region_size + MAX_MESSAGE_SIZE,
                zx::VmarFlags::CAN_MAP_READ
                    | zx::VmarFlags::CAN_MAP_WRITE
                    | zx::VmarFlags::CAN_MAP_SPECIFIC,
            )
            .unwrap();
        vmar.map_iob(
            zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
            /* vmar_offset */ 0,
            &iob,
            /* region_index */ 0,
            /* region_offset */ 0,
            /* region_len */ shared_region_size,
        )
        .unwrap();
        vmar.map_iob(
            zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE | zx::VmarFlags::SPECIFIC,
            /* vmar_offset */ shared_region_size,
            &iob,
            /* region_index */ 0,
            /* region_offset */ zx::system_get_page_size() as u64,
            /* region_len */ MAX_MESSAGE_SIZE,
        )
        .unwrap();

        let this = Arc::new(Self { shared_region, _vmar: vmar, base, capacity });
        Reader {
            ring_buffer: this,
            registration: fasync::EHandle::local().register_receiver(Receiver::default()),
        }
    }

    /// Returns an IOBuffer that can be used to write to the ring buffer. A tuple is returned; the
    /// first IOBuffer in the tuple can be written to. The second IOBuffer is the peer and cannot
    /// be written to or mapped but it can be monitored for peer closed.
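    ///
    /// Example (an illustrative sketch that mirrors the tests below; the tag value is arbitrary):
    ///
    /// ```ignore
    /// let mut reader = RingBuffer::create(128 * 1024);
    /// let (iob, _peer) = reader.new_iob_writer(/* tag */ 56).unwrap();
    /// iob.write(Default::default(), 0, b"message").unwrap();
    /// let (tag, data) = reader.read_message().await.unwrap();
    /// assert_eq!(tag, 56);
    /// assert_eq!(data, b"message");
    /// ```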
    pub fn new_iob_writer(&self, tag: u64) -> Result<(zx::Iob, zx::Iob), zx::Status> {
        zx::Iob::create(
            Default::default(),
            &[zx::IobRegion {
                region_type: zx::IobRegionType::Shared {
                    options: Default::default(),
                    region: &self.shared_region,
                },
                access: zx::IobAccess::EP0_CAN_MEDIATED_WRITE,
                discipline: zx::IobDiscipline::MediatedWriteRingBuffer { tag },
            }],
        )
    }

    /// Returns the capacity of the ring buffer.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the value of the head pointer, read with Acquire ordering (which will synchronise
    /// with an update to the head pointer in the kernel that uses Release ordering).
    pub fn head(&self) -> u64 {
        // SAFETY: This should be aligned and we mapped base, so the pointer should be
        // dereferenceable.
        unsafe { (*((self.base + HEAD_OFFSET) as *const AtomicU64)).load(Ordering::Acquire) }
    }

    /// Returns the value of the tail pointer, read with Acquire ordering (which will synchronise
    /// with an update to the tail via `increment_tail` below; the kernel never changes the tail
    /// pointer).
    pub fn tail(&self) -> u64 {
        // SAFETY: This should be aligned and we mapped base, so the pointer should be
        // dereferenceable.
        unsafe { (*((self.base + TAIL_OFFSET) as *const AtomicU64)).load(Ordering::Acquire) }
    }

    /// Increments the tail pointer using Release ordering (which will synchronise with the kernel,
    /// which reads the tail pointer using Acquire ordering). `amount` must always be a multiple of
    /// 8. See also `ring_buffer_record_len`.
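    ///
    /// For example, after consuming a 4 byte message the tail should be advanced by
    /// `ring_buffer_record_len(4)`, i.e. 24 bytes, rather than by the unpadded 20 bytes.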
    pub fn increment_tail(&self, amount: usize) {
        assert_eq!(amount % 8, 0);
        // SAFETY: This should be aligned and we mapped base, so the pointer should be
        // dereferenceable.
        unsafe {
            (*((self.base + TAIL_OFFSET) as *const AtomicU64))
                .fetch_add(amount as u64, Ordering::Release);
        }
    }

    fn index_to_offset(&self, index: u64) -> usize {
        (index % self.capacity as u64) as usize
    }

    /// Reads T at `index` in the buffer.
    ///
    /// # SAFETY
    ///
    /// `index` must have the same alignment as `T`. The read is non-atomic which means it is
    /// undefined behaviour if there is concurrent write access to the same location (across all
    /// processes).
    pub unsafe fn read<T>(&self, index: u64) -> T {
        debug_assert_eq!(index % std::mem::align_of::<T>() as u64, 0);
        debug_assert!(std::mem::size_of::<T>() <= MAX_MESSAGE_SIZE);
        let offset = self.index_to_offset(index);
        ((self.base + zx::system_get_page_size() as usize + offset) as *const T).read()
    }

    /// Returns a slice for the first message in `range`.
    ///
    /// # SAFETY
    ///
    /// The reads are non-atomic so there can be no other concurrent write access to the same range
    /// (across all processes). The returned slice will only remain valid so long as there is no
    /// other concurrent write access to the range.
    pub unsafe fn first_message_in(&self, range: Range<u64>) -> Result<(u64, &[u8]), Error> {
        if !range.start.is_multiple_of(8) {
            return Err(Error::BadAlignment);
        }
        if range.end - range.start < RING_BUFFER_MESSAGE_HEADER_SIZE as u64 {
            return Err(Error::TooSmall);
        }
        let tag = self.read(range.start);
        let message_len: u64 = self.read(range.start + 8);
        if message_len
            > std::cmp::min(range.end - range.start, MAX_MESSAGE_SIZE as u64)
                - RING_BUFFER_MESSAGE_HEADER_SIZE as u64
        {
            return Err(Error::BadLength);
        }
        let index = self.index_to_offset(range.start + 16);
        Ok((
            tag,
            std::slice::from_raw_parts(
                (self.base + zx::system_get_page_size() as usize + index) as *const u8,
                message_len as usize,
            ),
        ))
    }
}

/// Provides exclusive read access to the ring buffer.
pub struct Reader {
    ring_buffer: Arc<RingBuffer>,
    registration: fasync::ReceiverRegistration<Receiver>,
}

impl Deref for Reader {
    type Target = Arc<RingBuffer>;
    fn deref(&self) -> &Self::Target {
        &self.ring_buffer
    }
}

impl Reader {
    /// Waits for the head to exceed `index`. Returns head.
    pub async fn wait(&mut self, index: u64) -> u64 {
        poll_fn(|cx| {
            // Check before registering the waker.
            let head = self.head();
            if head > index {
                return Poll::Ready(head);
            }
            self.registration.waker.register(cx.waker());
            if !self.registration.async_wait.swap(true, Ordering::Relaxed) {
                self.shared_region
                    .wait_async_handle(
                        self.registration.port(),
                        self.registration.key(),
                        zx::Signals::IOB_SHARED_REGION_UPDATED,
                        zx::WaitAsyncOpts::empty(),
                    )
                    .unwrap();
            }
            // Check again in case there was a race.
            let head = self.head();
            if head > index {
                Poll::Ready(head)
            } else {
                Poll::Pending
            }
        })
        .await
    }

    /// Reads a message from `tail`. If no message is ready, this will wait. This will advance
    /// the `tail`.
    pub async fn read_message(&mut self) -> Result<(u64, Vec<u8>), Error> {
        let tail = self.tail();
        let head = self.wait(tail).await;
        // SAFETY: There should be no other concurrent write access to this memory because writing
        // is only allowed via `new_iob_writer` above, and that will always write beyond `head`.
        let message = unsafe {
            self.first_message_in(tail..head).map(|(tag, message)| (tag, message.to_vec()))?
        };
        self.increment_tail(ring_buffer_record_len(message.1.len()));
        Ok(message)
    }
}

#[derive(Default)]
struct Receiver {
    waker: AtomicWaker,
    async_wait: AtomicBool,
}

impl fasync::PacketReceiver for Receiver {
    fn receive_packet(&self, _packet: zx::Packet) {
        self.async_wait.store(false, Ordering::Relaxed);
        self.waker.wake();
    }
}

/// Returns the ring buffer record length given the message length. This accounts for the ring
/// buffer message header and any padding required to maintain alignment.
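///
/// Example (illustrative):
///
/// ```ignore
/// // 16 byte header plus 4 bytes of payload padded up to 8.
/// assert_eq!(ring_buffer_record_len(4), 24);
/// // Payloads that are already 8 byte aligned get no padding.
/// assert_eq!(ring_buffer_record_len(16), 32);
/// ```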
pub fn ring_buffer_record_len(message_len: usize) -> usize {
    RING_BUFFER_MESSAGE_HEADER_SIZE + message_len.next_multiple_of(8)
}

#[cfg(test)]
mod tests {
    use super::{Error, RingBuffer, MAX_MESSAGE_SIZE, RING_BUFFER_MESSAGE_HEADER_SIZE};
    use futures::stream::FuturesUnordered;
    use futures::{FutureExt, StreamExt};
    use std::sync::atomic::{AtomicU64, Ordering};

    #[fuchsia::test]
    async fn read_message() {
        const TAG: u64 = 56;
        let mut reader = RingBuffer::create(128 * 1024);
        let (iob, _) = reader.new_iob_writer(TAG).unwrap();
        const DATA: &[u8] = b"test";
        iob.write(Default::default(), 0, DATA).unwrap();
        let (tag, data) = reader.read_message().await.expect("read_message failed");
        assert_eq!(tag, TAG);
        assert_eq!(&data, DATA);
    }

    #[fuchsia::test]
    async fn writing_wakes_reader() {
        const TAG: u64 = 56;
        let mut reader = RingBuffer::create(128 * 1024);
        let (iob, _) = reader.new_iob_writer(TAG).unwrap();

        // Use FuturesUnordered so that it uses its own waker.
        let mut read_message = FuturesUnordered::from_iter([reader.read_message()]);

        // Poll the reader once to prime it.
        assert!(read_message.next().now_or_never().is_none());

        const DATA: &[u8] = b"test";
        iob.write(Default::default(), 0, DATA).unwrap();

        // Check the reader is woken.
        let (tag, data) = read_message.next().await.unwrap().expect("read_message failed");
        assert_eq!(tag, TAG);
        assert_eq!(&data, DATA);
    }

    #[fuchsia::test]
    async fn corrupt() {
        let mut reader = RingBuffer::create(128 * 1024);

        const HEAD_OFFSET: usize = 0;
        const TAIL_OFFSET: usize = 8;
        let message_len_offset: usize = zx::system_get_page_size() as usize + 8;

        let base = reader.base;
        let write_u64 = |offset, value| unsafe {
            (*((base + offset) as *const AtomicU64)).store(value, Ordering::Release);
        };

        // Unaligned tail.
        write_u64(TAIL_OFFSET, 1);
        write_u64(HEAD_OFFSET, 8);
        assert_eq!(reader.read_message().await, Err(Error::BadAlignment));

        // Too small.
        write_u64(TAIL_OFFSET, 0);
        assert_eq!(reader.read_message().await, Err(Error::TooSmall));

        // Exceeds max message size.
        write_u64(HEAD_OFFSET, 32);
        write_u64(
            message_len_offset,
            (MAX_MESSAGE_SIZE + RING_BUFFER_MESSAGE_HEADER_SIZE + 1) as u64,
        );
        assert_eq!(reader.read_message().await, Err(Error::BadLength));

        // Message too big vs head - tail.
        write_u64(message_len_offset, 17);
        assert_eq!(reader.read_message().await, Err(Error::BadLength));

        // And finally, a valid message, just to make sure there isn't another issue.
        write_u64(message_len_offset, 16);
        assert!(reader.read_message().await.is_ok());
    }
}