netstack3_tcp/buffer.rs

// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Defines the buffer traits needed by the TCP implementation. The traits
//! in this module provide a common interface for platform-specific buffers
//! used by TCP.

use netstack3_base::{Payload, SackBlocks, SeqNum};

use arrayvec::ArrayVec;
use core::fmt::Debug;
use core::ops::Range;
use packet::InnerPacketBuilder;

use crate::internal::base::BufferSizes;
use crate::internal::seq_ranges::{SeqRange, SeqRanges};

/// Common super trait for both sending and receiving buffers.
pub trait Buffer: Debug + Sized {
    /// Returns the capacity range `(min, max)` for this buffer type.
    fn capacity_range() -> (usize, usize);

    /// Returns information about the number of bytes in the buffer.
    ///
    /// Returns a [`BufferLimits`] instance with information about the number of
    /// bytes in the buffer.
    fn limits(&self) -> BufferLimits;

    /// Gets the target size of the buffer, in bytes.
    ///
    /// The target capacity of the buffer is distinct from the actual capacity
    /// (reported by [`Buffer::limits`]) in that the target capacity should
    /// remain fixed unless requested otherwise, while the actual capacity can
    /// vary with usage.
    ///
    /// For fixed-size buffers this should return the same result as
    /// `self.limits().capacity`. For buffer types that support resizing, the
    /// returned value can be different but should not change unless a resize
    /// was requested.
    fn target_capacity(&self) -> usize;

    /// Requests that the buffer be resized to hold the given number of bytes.
    ///
    /// Calling this method suggests to the buffer that it should alter its size.
    /// Implementations are free to impose constraints or ignore requests
    /// entirely.
    fn request_capacity(&mut self, size: usize);
}

/// A buffer supporting TCP receiving operations.
pub trait ReceiveBuffer: Buffer {
    /// Writes `data` into the buffer at `offset`.
    ///
    /// Returns the number of bytes written.
    fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize;

    /// Marks `count` bytes as available for the application to read.
    ///
    /// `has_outstanding` informs the buffer whether any bytes past `count` may
    /// have been populated by out-of-order segments.
    ///
    /// # Panics
    ///
    /// Panics if the caller attempts to make more bytes readable than the
    /// buffer has capacity for, i.e., if `count` plus the current length
    /// exceeds the capacity reported by [`Buffer::limits`].
    fn make_readable(&mut self, count: usize, has_outstanding: bool);
}
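
// Illustrative sketch (not part of the original module): exercises the
// `ReceiveBuffer` contract above using the test-only `RingBuffer` defined in
// `testutil` below. Out-of-order data can be written at an offset, but it only
// counts towards `limits().len` once `make_readable` is called for the
// contiguous prefix.
#[cfg(test)]
mod receive_buffer_sketch {
    use super::testutil::RingBuffer;
    use super::*;

    #[test]
    fn write_at_then_make_readable() {
        let mut rb = RingBuffer::new(16);
        // An out-of-order segment lands past the in-order data...
        assert_eq!(rb.write_at(5, &"World".as_bytes()), 5);
        // ...and a later in-order segment fills the gap at offset 0.
        assert_eq!(rb.write_at(0, &"Hello".as_bytes()), 5);
        // Nothing is readable until the contiguous prefix is marked readable.
        assert_eq!(rb.limits().len, 0);
        rb.make_readable(10, false);
        assert_eq!(rb.limits().len, 10);
    }
}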

/// A buffer supporting TCP sending operations.
pub trait SendBuffer: Buffer {
    /// The payload type given to `peek_with`.
    type Payload<'a>: InnerPacketBuilder + Payload + Debug + 'a;

    /// Removes `count` bytes from the beginning of the buffer, marking them
    /// as already read.
    ///
    /// # Panics
    ///
    /// Panics if more bytes are marked as read than are available, i.e.,
    /// `count > self.len`.
    fn mark_read(&mut self, count: usize);

    /// Calls `f` with contiguous sequences of readable bytes in the buffer
    /// without advancing the reading pointer.
    ///
    /// # Panics
    ///
    /// Panics if more bytes are peeked than are available, i.e.,
    /// `offset > self.len`.
    fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
    where
        F: FnOnce(Self::Payload<'a>) -> R;
}
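
// Illustrative sketch (not part of the original module): shows the
// `SendBuffer` contract above on the test-only `RingBuffer` from `testutil`
// below. `peek_with` exposes readable bytes without consuming them; only
// `mark_read` actually removes bytes from the front of the buffer.
#[cfg(test)]
mod send_buffer_sketch {
    use super::testutil::RingBuffer;
    use super::*;

    #[test]
    fn peek_then_mark_read() {
        let mut rb = RingBuffer::new(16);
        assert_eq!(rb.enqueue_data("Hello".as_bytes()), 5);
        // Peeking at an offset does not advance the read pointer.
        rb.peek_with(2, |payload| assert_eq!(payload.to_vec(), "llo".as_bytes()));
        assert_eq!(rb.limits().len, 5);
        // Marking bytes as read frees them and advances the read pointer.
        rb.mark_read(2);
        rb.peek_with(0, |payload| assert_eq!(payload.to_vec(), "llo".as_bytes()));
        assert_eq!(rb.limits().len, 3);
    }
}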

/// Information about the number of bytes in a [`Buffer`].
#[derive(Eq, PartialEq, Debug, Copy, Clone)]
pub struct BufferLimits {
    /// The total number of bytes that the buffer can hold.
    pub capacity: usize,

    /// The number of readable bytes that the buffer currently holds.
    pub len: usize,
}

/// Assembler for out-of-order segment data.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq, Eq))]
pub(super) struct Assembler {
    // `nxt` is the next sequence number expected to be received. It should be
    // before any of the out-of-order sequence numbers we keep track of below.
    nxt: SeqNum,
    // Keeps track of the "age" of segments in the outstanding queue. Every time
    // a segment is inserted, the generation increases. This allows
    // RFC-compliant ordering of selective ACK blocks.
    generation: usize,
    // Holds all the sequence number ranges which we have already received.
    // These ranges are sorted and should have a gap of at least 1 byte
    // between any consecutive two. These ranges should only be after `nxt`.
    // Each range is tagged with the generation that last modified it.
    outstanding: SeqRanges<usize>,
}

impl Assembler {
    /// Creates a new assembler.
    pub(super) fn new(nxt: SeqNum) -> Self {
        Self { outstanding: SeqRanges::default(), generation: 0, nxt }
    }

    /// Returns the next sequence number expected to be received.
    pub(super) fn nxt(&self) -> SeqNum {
        self.nxt
    }

    /// Returns whether there are out-of-order segments waiting to be
    /// acknowledged.
    pub(super) fn has_out_of_order(&self) -> bool {
        !self.outstanding.is_empty()
    }

    /// Inserts a received segment.
    ///
    /// The newly added segment will be merged with as many existing ones as
    /// possible and `nxt` will be advanced to the highest ACK number possible.
    ///
    /// Returns the number of bytes that should be made available for the
    /// application to consume.
    ///
    /// # Panics
    ///
    /// Panics if `start` is after `end` or if `start` is before `self.nxt`.
    pub(super) fn insert(&mut self, Range { start, end }: Range<SeqNum>) -> usize {
        assert!(!start.after(end));
        assert!(!start.before(self.nxt));
        if start == end {
            return 0;
        }

        let Self { outstanding, nxt, generation } = self;
        *generation = *generation + 1;
        let _: bool = outstanding.insert(start..end, *generation);

        if let Some(advanced) = outstanding.pop_front_if(|r| r.start() == *nxt) {
            *nxt = advanced.end();
            usize::try_from(advanced.len()).unwrap()
        } else {
            0
        }
    }

    pub(super) fn has_outstanding(&self) -> bool {
        let Self { outstanding, nxt: _, generation: _ } = self;
        !outstanding.is_empty()
    }

    /// Returns the current outstanding selective ack blocks in the assembler.
    ///
    /// The returned blocks are sorted according to [RFC 2018 section 4]:
    ///
    /// * The first SACK block (i.e., the one immediately following the kind and
    ///   length fields in the option) MUST specify the contiguous block of data
    ///   containing the segment which triggered this ACK. [...]
    /// * The SACK option SHOULD be filled out by repeating the most recently
    ///   reported SACK blocks [...]
    ///
    /// This is achieved by always returning the blocks that were most recently
    /// changed by incoming segments.
    ///
    /// [RFC 2018 section 4]:
    ///     https://datatracker.ietf.org/doc/html/rfc2018#section-4
    pub(crate) fn sack_blocks(&self) -> SackBlocks {
        let Self { nxt: _, generation: _, outstanding } = self;
        // Fast exit, no outstanding blocks.
        if outstanding.is_empty() {
            return SackBlocks::default();
        }

        let mut heap = ArrayVec::<&SeqRange<_>, { SackBlocks::MAX_BLOCKS }>::new();
        for block in outstanding.iter() {
            if heap.is_full() {
                if heap.last().is_some_and(|l| l.meta() < block.meta()) {
                    // The new block is more recent (larger generation) than the
                    // oldest block in the heap; evict the oldest to make room.
                    let _: Option<_> = heap.pop();
                } else {
                    // The new block is older than everything already in the
                    // heap; skip it.
                    continue;
                }
            }

            heap.push(block);
            // Sort the heap from largest generation to smallest, i.e. most
            // recently updated blocks first.
            heap.sort_by(|a, b| b.meta().cmp(&a.meta()))
        }

        SackBlocks::from_iter(heap.into_iter().map(|block| block.to_sack_block()))
    }
}
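
// Illustrative sketch (not part of the original module): demonstrates how
// `Assembler::insert` keeps out-of-order ranges aside and only advances `nxt`
// (releasing bytes to the application) once the gap at the front is filled,
// and how `sack_blocks` reports the most recently updated blocks first.
#[cfg(test)]
mod assembler_sketch {
    use alloc::vec;
    use alloc::vec::Vec;

    use super::*;

    #[test]
    fn out_of_order_then_fill_gap() {
        let mut assembler = Assembler::new(SeqNum::new(0));
        // Bytes [10, 20) arrive out of order: nothing is ready for the app.
        assert_eq!(assembler.insert(SeqNum::new(10)..SeqNum::new(20)), 0);
        assert!(assembler.has_out_of_order());
        // Bytes [0, 10) fill the gap, releasing the whole [0, 20) prefix.
        assert_eq!(assembler.insert(SeqNum::new(0)..SeqNum::new(10)), 20);
        assert_eq!(assembler.nxt(), SeqNum::new(20));
        assert!(!assembler.has_out_of_order());
    }

    #[test]
    fn sack_blocks_most_recent_first() {
        let mut assembler = Assembler::new(SeqNum::new(0));
        let _: usize = assembler.insert(SeqNum::new(10)..SeqNum::new(15));
        let _: usize = assembler.insert(SeqNum::new(20)..SeqNum::new(25));
        // Per RFC 2018 section 4, the most recently updated block comes first.
        let blocks: Vec<_> = assembler
            .sack_blocks()
            .try_iter()
            .map(|r| r.expect("invalid block").into_range_u32())
            .collect();
        assert_eq!(blocks, vec![20..25, 10..15]);
    }
}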

/// A conversion trait that converts the object that Bindings give us into a
/// pair of receive and send buffers.
pub trait IntoBuffers<R: ReceiveBuffer, S: SendBuffer> {
    /// Converts to receive and send buffers.
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S);
}

#[cfg(any(test, feature = "testutils"))]
impl<R: Default + ReceiveBuffer, S: Default + SendBuffer> IntoBuffers<R, S> for () {
    fn into_buffers(self, buffer_sizes: BufferSizes) -> (R, S) {
        // Ignore buffer sizes since this is a test-only impl.
        let BufferSizes { send: _, receive: _ } = buffer_sizes;
        Default::default()
    }
}

#[cfg(any(test, feature = "testutils"))]
pub(crate) mod testutil {
    use super::*;

    use alloc::sync::Arc;
    use alloc::vec;
    use alloc::vec::Vec;
    use core::cmp;

    use either::Either;
    use netstack3_base::sync::Mutex;
    use netstack3_base::{FragmentedPayload, WindowSize};

    use crate::internal::socket::accept_queue::ListenerNotifier;

    /// A circular buffer implementation.
    ///
    /// A [`RingBuffer`] holds a logically contiguous ring of memory in three
    /// regions:
    ///
    /// - *readable*: memory is available for reading and not for writing,
    /// - *writable*: memory that is available for writing and not for reading,
    /// - *reserved*: memory that was read from and is no longer available
    ///   for reading or for writing.
    ///
    /// Zero or more of these regions can be empty, and a region of memory can
    /// transition from one to another in a few different ways:
    ///
    /// *Readable* memory, once read, becomes writable.
    ///
    /// *Writable* memory, once marked as such, becomes readable.
    #[cfg_attr(any(test, feature = "testutils"), derive(Clone, PartialEq, Eq))]
    pub struct RingBuffer {
        pub(super) storage: Vec<u8>,
        /// The index where the reader starts to read.
        ///
        /// Maintains the invariant that `head < storage.len()` by wrapping
        /// around to 0 as needed.
        pub(super) head: usize,
        /// The amount of readable data in `storage`.
        ///
        /// Anything between [head, head+len) is readable. This will never exceed
        /// `storage.len()`.
        pub(super) len: usize,
    }

    impl Debug for RingBuffer {
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let Self { storage, head, len } = self;
            f.debug_struct("RingBuffer")
                .field("storage (len, cap)", &(storage.len(), storage.capacity()))
                .field("head", head)
                .field("len", len)
                .finish()
        }
    }

    impl Default for RingBuffer {
        fn default() -> Self {
            Self::new(WindowSize::DEFAULT.into())
        }
    }

    impl RingBuffer {
        /// Creates a new `RingBuffer`.
        pub fn new(capacity: usize) -> Self {
            Self { storage: vec![0; capacity], head: 0, len: 0 }
        }

        /// Resets the buffer to be entirely unwritten.
        pub fn reset(&mut self) {
            let Self { storage: _, head, len } = self;
            *head = 0;
            *len = 0;
        }

        /// Calls `f` on the contiguous sequences from `start` up to `len` bytes.
        fn with_readable<'a, F, R>(storage: &'a Vec<u8>, start: usize, len: usize, f: F) -> R
        where
            F: for<'b> FnOnce(&'b [&'a [u8]]) -> R,
        {
            // Don't read past the end of storage.
            let end = start + len;
            if end > storage.len() {
                let first_part = &storage[start..storage.len()];
                let second_part = &storage[0..len - first_part.len()];
                f(&[first_part, second_part][..])
            } else {
                let all_bytes = &storage[start..end];
                f(&[all_bytes][..])
            }
        }

        /// Calls `f` with contiguous sequences of readable bytes in the buffer
        /// and discards the number of bytes returned by `f`.
        ///
        /// # Panics
        ///
        /// Panics if the closure wants to discard more bytes than possible, i.e.,
        /// the value returned by `f` is greater than `self.len()`.
        pub fn read_with<F>(&mut self, f: F) -> usize
        where
            F: for<'a, 'b> FnOnce(&'b [&'a [u8]]) -> usize,
        {
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return f(&[&[]]);
            }
            let nread = RingBuffer::with_readable(storage, *head, *len, f);
            assert!(nread <= *len);
            *len -= nread;
            *head = (*head + nread) % storage.len();
            nread
        }

        /// Returns the writable regions of the [`RingBuffer`].
        pub fn writable_regions(&mut self) -> impl IntoIterator<Item = &mut [u8]> {
            let BufferLimits { capacity, len } = self.limits();
            let available = capacity - len;
            let Self { storage, head, len } = self;

            let mut write_start = *head + *len;
            if write_start >= storage.len() {
                write_start -= storage.len()
            }
            let write_end = write_start + available;
            if write_end <= storage.len() {
                Either::Left([&mut self.storage[write_start..write_end]].into_iter())
            } else {
                let (b1, b2) = self.storage[..].split_at_mut(write_start);
                let b2_len = b2.len();
                Either::Right([b2, &mut b1[..(available - b2_len)]].into_iter())
            }
        }
    }

    impl Buffer for RingBuffer {
        fn capacity_range() -> (usize, usize) {
            // Arbitrarily chosen to satisfy tests so we have some semblance of
            // clamping capacity in tests.
            (16, 16 << 20)
        }

        fn limits(&self) -> BufferLimits {
            let Self { storage, len, head: _ } = self;
            let capacity = storage.len();
            BufferLimits { len: *len, capacity }
        }

        fn target_capacity(&self) -> usize {
            let Self { storage, len: _, head: _ } = self;
            storage.len()
        }

        fn request_capacity(&mut self, size: usize) {
            unimplemented!("capacity request for {size} not supported")
        }
    }

    impl ReceiveBuffer for RingBuffer {
        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
            let BufferLimits { capacity, len } = self.limits();
            let available = capacity - len;
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return 0;
            }

            if offset > available {
                return 0;
            }
            let start_at = (*head + *len + offset) % storage.len();
            let to_write = cmp::min(data.len(), available);
            // Write the first part of the payload.
            let first_len = cmp::min(to_write, storage.len() - start_at);
            data.partial_copy(0, &mut storage[start_at..start_at + first_len]);
            // If we have more to write, wrap around and start from the beginning
            // of the storage.
            if to_write > first_len {
                data.partial_copy(first_len, &mut storage[0..to_write - first_len]);
            }
            to_write
        }

        fn make_readable(&mut self, count: usize, _has_outstanding: bool) {
            let BufferLimits { capacity, len } = self.limits();
            debug_assert!(count <= capacity - len);
            self.len += count;
        }
    }

    impl SendBuffer for RingBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { storage, head, len } = self;
            assert!(count <= *len);
            *len -= count;
            *head = (*head + count) % storage.len();
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { storage, head, len } = self;
            if storage.len() == 0 {
                return f(FragmentedPayload::new_empty());
            }
            assert!(offset <= *len);
            RingBuffer::with_readable(
                storage,
                (*head + offset) % storage.len(),
                *len - offset,
                |readable| f(readable.into_iter().map(|x| *x).collect()),
            )
        }
    }

    impl RingBuffer {
        /// Enqueues as much of `data` as possible to the end of the buffer.
        ///
        /// Returns the number of bytes actually queued.
        pub(crate) fn enqueue_data(&mut self, data: &[u8]) -> usize {
            let nwritten = self.write_at(0, &data);
            self.make_readable(nwritten, false);
            nwritten
        }
    }

    impl Buffer for Arc<Mutex<RingBuffer>> {
        fn capacity_range() -> (usize, usize) {
            RingBuffer::capacity_range()
        }

        fn limits(&self) -> BufferLimits {
            self.lock().limits()
        }

        fn target_capacity(&self) -> usize {
            self.lock().target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            self.lock().request_capacity(size)
        }
    }

    impl ReceiveBuffer for Arc<Mutex<RingBuffer>> {
        fn write_at<P: Payload>(&mut self, offset: usize, data: &P) -> usize {
            self.lock().write_at(offset, data)
        }

        fn make_readable(&mut self, count: usize, has_outstanding: bool) {
            self.lock().make_readable(count, has_outstanding)
        }
    }

    /// An implementation of [`SendBuffer`] for tests.
    #[derive(Debug, Default)]
    pub struct TestSendBuffer {
        fake_stream: Arc<Mutex<Vec<u8>>>,
        ring: RingBuffer,
    }

    impl TestSendBuffer {
        /// Creates a new `TestSendBuffer` with a backing shared vec and a
        /// helper ring buffer.
        pub fn new(fake_stream: Arc<Mutex<Vec<u8>>>, ring: RingBuffer) -> TestSendBuffer {
            Self { fake_stream, ring }
        }
    }

    impl Buffer for TestSendBuffer {
        fn capacity_range() -> (usize, usize) {
            let (min, max) = RingBuffer::capacity_range();
            (min * 2, max * 2)
        }

        fn limits(&self) -> BufferLimits {
            let Self { fake_stream, ring } = self;
            let BufferLimits { capacity: ring_capacity, len: ring_len } = ring.limits();
            let guard = fake_stream.lock();
            let len = ring_len + guard.len();
            let capacity = ring_capacity + guard.capacity();
            BufferLimits { len, capacity }
        }

        fn target_capacity(&self) -> usize {
            let Self { fake_stream: _, ring } = self;
            ring.target_capacity()
        }

        fn request_capacity(&mut self, size: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.request_capacity(size)
        }
    }

    impl SendBuffer for TestSendBuffer {
        type Payload<'a> = FragmentedPayload<'a, 2>;

        fn mark_read(&mut self, count: usize) {
            let Self { fake_stream: _, ring } = self;
            ring.mark_read(count)
        }

        fn peek_with<'a, F, R>(&'a mut self, offset: usize, f: F) -> R
        where
            F: FnOnce(Self::Payload<'a>) -> R,
        {
            let Self { fake_stream, ring } = self;
            let mut guard = fake_stream.lock();
            if !guard.is_empty() {
                // Pull from the fake stream into the ring if there is capacity.
                let BufferLimits { capacity, len } = ring.limits();
                let len = (capacity - len).min(guard.len());
                let rest = guard.split_off(len);
                let first = core::mem::replace(&mut *guard, rest);
                assert_eq!(ring.enqueue_data(&first[..]), len);
            }
            ring.peek_with(offset, f)
        }
    }
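
    // Illustrative sketch (not part of the original module): `TestSendBuffer`
    // lazily pulls bytes from the shared `fake_stream` into its ring buffer
    // the next time `peek_with` is called, so data written by the test
    // "application" side becomes visible to the TCP state machine on demand.
    #[cfg(test)]
    mod test_send_buffer_sketch {
        use super::*;

        #[test]
        fn peek_pulls_from_fake_stream() {
            let fake_stream = Arc::new(Mutex::new(b"hello".to_vec()));
            let mut sb = TestSendBuffer::new(Arc::clone(&fake_stream), RingBuffer::new(16));
            // The pending bytes in the fake stream already count towards the
            // buffer's readable length.
            assert_eq!(sb.limits().len, 5);
            // Peeking drains the fake stream into the ring buffer...
            sb.peek_with(0, |payload| assert_eq!(payload.to_vec(), b"hello"));
            assert!(fake_stream.lock().is_empty());
            // ...but the bytes stay readable until they are marked read.
            sb.mark_read(5);
            assert_eq!(sb.limits().len, 0);
        }
    }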

    fn arc_mutex_eq<T: PartialEq>(a: &Arc<Mutex<T>>, b: &Arc<Mutex<T>>) -> bool {
        if Arc::ptr_eq(a, b) {
            return true;
        }
        (&*a.lock()) == (&*b.lock())
    }

    /// A fake implementation of client-side TCP buffers.
    #[derive(Clone, Debug, Default)]
    pub struct ClientBuffers {
        /// Receive buffer shared with core TCP implementation.
        pub receive: Arc<Mutex<RingBuffer>>,
        /// Send buffer shared with core TCP implementation.
        pub send: Arc<Mutex<Vec<u8>>>,
    }

    impl PartialEq for ClientBuffers {
        fn eq(&self, ClientBuffers { receive: other_receive, send: other_send }: &Self) -> bool {
            let Self { receive, send } = self;
            arc_mutex_eq(receive, other_receive) && arc_mutex_eq(send, other_send)
        }
    }

    impl Eq for ClientBuffers {}

    impl ClientBuffers {
        /// Creates a new `ClientBuffers` with `buffer_sizes`.
        pub fn new(buffer_sizes: BufferSizes) -> Self {
            let BufferSizes { send, receive } = buffer_sizes;
            Self {
                receive: Arc::new(Mutex::new(RingBuffer::new(receive))),
                send: Arc::new(Mutex::new(Vec::with_capacity(send))),
            }
        }
    }

    /// A fake implementation of bindings buffers for TCP.
    #[derive(Debug, Clone, Eq, PartialEq)]
    #[allow(missing_docs)]
    pub enum ProvidedBuffers {
        Buffers(WriteBackClientBuffers),
        NoBuffers,
    }

    impl Default for ProvidedBuffers {
        fn default() -> Self {
            Self::NoBuffers
        }
    }

    impl From<WriteBackClientBuffers> for ProvidedBuffers {
        fn from(buffers: WriteBackClientBuffers) -> Self {
            ProvidedBuffers::Buffers(buffers)
        }
    }

    impl From<ProvidedBuffers> for WriteBackClientBuffers {
        fn from(extra: ProvidedBuffers) -> Self {
            match extra {
                ProvidedBuffers::Buffers(buffers) => buffers,
                ProvidedBuffers::NoBuffers => Default::default(),
            }
        }
    }

    impl From<ProvidedBuffers> for () {
        fn from(_: ProvidedBuffers) -> Self {
            ()
        }
    }

    impl From<()> for ProvidedBuffers {
        fn from(_: ()) -> Self {
            Default::default()
        }
    }

    /// The variant of [`ProvidedBuffers`] that allows observing the data sent
    /// to and received from TCP sockets.
    #[derive(Debug, Default, Clone)]
    pub struct WriteBackClientBuffers(pub Arc<Mutex<Option<ClientBuffers>>>);

    impl PartialEq for WriteBackClientBuffers {
        fn eq(&self, Self(other): &Self) -> bool {
            let Self(this) = self;
            arc_mutex_eq(this, other)
        }
    }

    impl Eq for WriteBackClientBuffers {}

    impl IntoBuffers<Arc<Mutex<RingBuffer>>, TestSendBuffer> for ProvidedBuffers {
        fn into_buffers(
            self,
            buffer_sizes: BufferSizes,
        ) -> (Arc<Mutex<RingBuffer>>, TestSendBuffer) {
            let buffers = ClientBuffers::new(buffer_sizes);
            if let ProvidedBuffers::Buffers(b) = self {
                *b.0.as_ref().lock() = Some(buffers.clone());
            }
            let ClientBuffers { receive, send } = buffers;
            (receive, TestSendBuffer::new(send, Default::default()))
        }
    }

    impl ListenerNotifier for ProvidedBuffers {
        fn new_incoming_connections(&mut self, _: usize) {}
    }
}

#[cfg(test)]
mod test {
    use alloc::vec::Vec;
    use alloc::{format, vec};

    use netstack3_base::FragmentedPayload;
    use proptest::strategy::{Just, Strategy};
    use proptest::test_runner::Config;
    use proptest::{prop_assert, prop_assert_eq, proptest};
    use proptest_support::failed_seeds_no_std;
    use test_case::test_case;
    use testutil::RingBuffer;

    use super::*;
    proptest! {
        #![proptest_config(Config {
            // Add all failed seeds here.
            failure_persistence: failed_seeds_no_std!(
                "cc f621ca7d3a2b108e0dc41f7169ad028f4329b79e90e73d5f68042519a9f63999",
                "cc c449aebed201b4ec4f137f3c224f20325f4cfee0b7fd596d9285176b6d811aa9"
            ),
            ..Config::default()
        })]

        #[test]
        fn ring_buffer_make_readable((mut rb, avail) in ring_buffer::with_written()) {
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;
            rb.make_readable(avail, false);
            // Assert that length is updated but everything else is unchanged.
            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len + avail);
            prop_assert_eq!(head, old_head);
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_write_at((mut rb, offset, data) in ring_buffer::with_offset_data()) {
            let old_head = rb.head;
            let old_len = rb.limits().len;
            prop_assert_eq!(rb.write_at(offset, &&data[..]), data.len());
            prop_assert_eq!(rb.head, old_head);
            prop_assert_eq!(rb.limits().len, old_len);
            for i in 0..data.len() {
                let masked = (rb.head + rb.len + offset + i) % rb.storage.len();
                // Make sure the data was written.
                prop_assert_eq!(rb.storage[masked], data[i]);
                rb.storage[masked] = 0;
            }
            // And the other parts of the storage are untouched.
            prop_assert_eq!(&rb.storage, &vec![0; rb.storage.len()]);
        }

        #[test]
        fn ring_buffer_read_with((mut rb, expected, consume) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            let nread = rb.read_with(|readable| {
                assert!(readable.len() == 1 || readable.len() == 2);
                let got = readable.concat();
                assert_eq!(got, expected);
                consume
            });
            prop_assert_eq!(nread, consume);
            prop_assert_eq!(rb.limits().len, expected.len() - consume);
        }

        #[test]
        fn ring_buffer_mark_read((mut rb, readable) in ring_buffer::with_readable()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let written = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let old_storage = rb.storage.clone();
            let old_head = rb.head;
            let old_len = rb.limits().len;

            rb.mark_read(readable);
            let new_writable = rb.writable_regions().into_iter().fold(Vec::new(), |mut acc, slice| {
                acc.extend_from_slice(slice);
                acc
            });
            for (i, x) in new_writable.iter().enumerate().take(written) {
                prop_assert_eq!(*x, BYTE_TO_WRITE, "i={}, rb={:?}", i, rb);
            }
            prop_assert!(new_writable.len() >= written);

            let RingBuffer { storage, head, len } = rb;
            prop_assert_eq!(len, old_len - readable);
            prop_assert_eq!(head, (old_head + readable) % old_storage.len());
            prop_assert_eq!(storage, old_storage);
        }

        #[test]
        fn ring_buffer_peek_with((mut rb, expected, offset) in ring_buffer::with_read_data()) {
            prop_assert_eq!(rb.limits().len, expected.len());
            rb.peek_with(offset, |readable| {
                prop_assert_eq!(readable.to_vec(), &expected[offset..]);
                Ok(())
            })?;
            prop_assert_eq!(rb.limits().len, expected.len());
        }

        #[test]
        fn ring_buffer_writable_regions(mut rb in ring_buffer::arb_ring_buffer()) {
            const BYTE_TO_WRITE: u8 = 0x42;
            let writable_len = rb.writable_regions().into_iter().fold(0, |acc, slice| {
                slice.fill(BYTE_TO_WRITE);
                acc + slice.len()
            });
            let BufferLimits { len, capacity } = rb.limits();
            prop_assert_eq!(writable_len + len, capacity);
            for i in 0..capacity {
                let expected = if i < len {
                    0
                } else {
                    BYTE_TO_WRITE
                };
                let idx = (rb.head + i) % rb.storage.len();
                prop_assert_eq!(rb.storage[idx], expected);
            }
        }
    }

    #[test_case([Range { start: 0, end: 0 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(0), generation: 0 })]
    #[test_case([Range { start: 0, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(10), generation: 1 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 5, end: 10 }]
        => Assembler {
            outstanding: [
                SeqRange::new(SeqNum::new(5)..SeqNum::new(15), 2).unwrap()
            ].into_iter().collect(),
            nxt: SeqNum::new(0),
            generation: 2,
        })
    ]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 0, end: 5 }, Range { start: 5, end: 10 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 5, end: 10 }, Range { start: 0, end: 5 }]
        => Assembler { outstanding: SeqRanges::default(), nxt: SeqNum::new(15), generation: 3 })]
    #[test_case([Range { start: 10, end: 15 }, Range { start: 10, end: 15 }, Range { start: 11, end: 12 }]
        => Assembler {
            outstanding: [
                SeqRange::new(SeqNum::new(10)..SeqNum::new(15), 3).unwrap()
            ].into_iter().collect(),
            nxt: SeqNum::new(0), generation: 3 })]
    fn assembler_examples(ops: impl IntoIterator<Item = Range<u32>>) -> Assembler {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops.into_iter() {
            let _advanced = assembler.insert(SeqNum::new(start)..SeqNum::new(end));
        }
        assembler
    }

    #[test_case(&[] => Vec::<Range<u32>>::new(); "empty")]
    #[test_case(&[1..2] => vec![1..2]; "single")]
    #[test_case(&[1..2, 3..4] => vec![3..4, 1..2]; "latest first")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10]
        => vec![9..10, 7..8, 5..6, 3..4]; "max len")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 6..7]
        => vec![5..8, 9..10, 3..4, 1..2]; "gap fill")]
    #[test_case(&[1..2, 3..4, 5..6, 7..8, 9..10, 1..8]
        => vec![1..8, 9..10]; "large gap fill")]
    fn assembler_sack_blocks(ops: &[Range<u32>]) -> Vec<Range<u32>> {
        let mut assembler = Assembler::new(SeqNum::new(0));
        for Range { start, end } in ops {
            let _: usize = assembler.insert(SeqNum::new(*start)..SeqNum::new(*end));
        }
        assembler
            .sack_blocks()
            .try_iter()
            .map(|r| r.expect("invalid block").into_range_u32())
            .collect()
    }

    #[test]
    // Regression test for https://fxbug.dev/42061342.
    fn ring_buffer_wrap_around() {
        const CAPACITY: usize = 16;
        let mut rb = RingBuffer::new(CAPACITY);

        // Write more than half the buffer.
        const BUF_SIZE: usize = 10;
        assert_eq!(rb.enqueue_data(&[0xAA; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xAA; BUF_SIZE]))
        });
        rb.mark_read(BUF_SIZE);

        // Write around the end of the buffer.
        assert_eq!(rb.enqueue_data(&[0xBB; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(
                payload,
                FragmentedPayload::new([
                    &[0xBB; (CAPACITY - BUF_SIZE)],
                    &[0xBB; (BUF_SIZE * 2 - CAPACITY)]
                ])
            )
        });
        // Mark everything read, which should advance `head` around to the
        // beginning of the buffer.
        rb.mark_read(BUF_SIZE);

        // Now make a contiguous sequence of bytes readable.
        assert_eq!(rb.enqueue_data(&[0xCC; BUF_SIZE]), BUF_SIZE);
        rb.peek_with(0, |payload| {
            assert_eq!(payload, FragmentedPayload::new_contiguous(&[0xCC; BUF_SIZE]))
        });

        // Check that the unwritten bytes are left untouched. If `head` was
        // advanced improperly, this will crash.
        let read = rb.read_with(|segments| {
            assert_eq!(segments, [[0xCC; BUF_SIZE]]);
            BUF_SIZE
        });
        assert_eq!(read, BUF_SIZE);
    }

    #[test]
    fn ring_buffer_example() {
        let mut rb = RingBuffer::new(16);
        assert_eq!(rb.write_at(5, &"World".as_bytes()), 5);
        assert_eq!(rb.write_at(0, &"Hello".as_bytes()), 5);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloWorld".as_bytes()]);
                5
            }),
            5
        );
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["World".as_bytes()]);
                readable[0].len()
            }),
            5
        );
        assert_eq!(rb.write_at(0, &"HelloWorld".as_bytes()), 10);
        rb.make_readable(10, false);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["HelloW".as_bytes(), "orld".as_bytes()]);
                6
            }),
            6
        );
        assert_eq!(rb.limits().len, 4);
        assert_eq!(
            rb.read_with(|readable| {
                assert_eq!(readable, &["orld".as_bytes()]);
                4
            }),
            4
        );
        assert_eq!(rb.limits().len, 0);

        assert_eq!(rb.enqueue_data("Hello".as_bytes()), 5);
        assert_eq!(rb.limits().len, 5);

        let () = rb.peek_with(3, |readable| {
            assert_eq!(readable.to_vec(), "lo".as_bytes());
        });

        rb.mark_read(2);

        let () = rb.peek_with(0, |readable| {
            assert_eq!(readable.to_vec(), "llo".as_bytes());
        });
    }

    mod ring_buffer {
        use super::*;
        // Use a small capacity so that we have a higher chance to exercise
        // wrapping around logic.
        const MAX_CAP: usize = 32;

        fn arb_ring_buffer_args() -> impl Strategy<Value = (usize, usize, usize)> {
            // Use a small capacity so that we have a higher chance to exercise
            // wrapping around logic.
            (1..=MAX_CAP).prop_flat_map(|cap| {
                let max_len = cap;
                //  cap      head     len
                (Just(cap), 0..cap, 0..=max_len)
            })
        }

        pub(super) fn arb_ring_buffer() -> impl Strategy<Value = RingBuffer> {
            arb_ring_buffer_args().prop_map(|(cap, head, len)| RingBuffer {
                storage: vec![0; cap],
                head,
                len,
            })
        }

        /// A strategy for a [`RingBuffer`] and a valid length to mark read.
        pub(super) fn with_readable() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                (Just(RingBuffer { storage: vec![0; cap], head, len }), 0..=len)
            })
        }

        /// A strategy for a [`RingBuffer`] and a valid length to make readable.
        pub(super) fn with_written() -> impl Strategy<Value = (RingBuffer, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let rb = RingBuffer { storage: vec![0; cap], head, len };
                let max_written = cap - len;
                (Just(rb), 0..=max_written)
            })
        }

        /// A strategy for a [`RingBuffer`], a valid offset and data to write.
        pub(super) fn with_offset_data() -> impl Strategy<Value = (RingBuffer, usize, Vec<u8>)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                let writable_len = cap - len;
                (0..=writable_len).prop_flat_map(move |offset| {
                    (0..=writable_len - offset).prop_flat_map(move |data_len| {
                        (
                            Just(RingBuffer { storage: vec![0; cap], head, len }),
                            Just(offset),
                            proptest::collection::vec(1..=u8::MAX, data_len),
                        )
                    })
                })
            })
        }

        /// A strategy for a [`RingBuffer`], its readable data, and how many
        /// bytes to consume.
        pub(super) fn with_read_data() -> impl Strategy<Value = (RingBuffer, Vec<u8>, usize)> {
            arb_ring_buffer_args().prop_flat_map(|(cap, head, len)| {
                proptest::collection::vec(1..=u8::MAX, len).prop_flat_map(move |data| {
                    // Fill the RingBuffer with the data.
                    let mut rb = RingBuffer { storage: vec![0; cap], head, len: 0 };
                    assert_eq!(rb.write_at(0, &&data[..]), len);
                    rb.make_readable(len, false);
                    (Just(rb), Just(data), 0..=len)
                })
            })
        }
    }
}