Skip to main content

netdevice_client/session/buffer/
pool.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Fuchsia netdevice buffer pool.
6
7use fuchsia_sync::Mutex;
8use futures::task::AtomicWaker;
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::convert::TryInto as _;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::mem::MaybeUninit;
15use std::num::{NonZeroU16, TryFromIntError};
16use std::ops::{Deref, DerefMut};
17use std::ptr::NonNull;
18use std::sync::Arc;
19use std::sync::atomic::{self, AtomicBool, AtomicU64};
20use std::task::Poll;
21
22use arrayvec::ArrayVec;
23use explicit::ResultExt as _;
24use fidl_fuchsia_hardware_network as netdev;
25use fuchsia_runtime::vmar_root_self;
26use futures::channel::oneshot::{Receiver, Sender, channel};
27
28use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
29use crate::error::{Error, Result};
30use crate::session::{BufferLayout, Config, Pending, Port};
31
32/// Responsible for managing [`Buffer`]s for a [`Session`](crate::session::Session).
33pub(in crate::session) struct Pool {
34    /// Base address of the pool.
35    // Note: This field requires us to manually implement `Sync` and `Send`.
36    base: NonNull<u8>,
37    /// The length of the pool in bytes.
38    bytes: usize,
39    /// The descriptors allocated for the pool.
40    descriptors: Descriptors,
41    /// Shared state for allocation.
42    tx_alloc_state: Mutex<TxAllocState>,
43    /// The free rx descriptors pending to be sent to driver.
44    pub(in crate::session) rx_pending: Pending<Rx>,
45    /// The buffer layout.
46    buffer_layout: BufferLayout,
47    /// State-keeping allowing sessions to handle rx leases.
48    rx_leases: RxLeaseHandlingState,
49}
50
51// `Pool` is `Send` and `Sync`, and this allows the compiler to deduce `Buffer`
52// to be `Send`. These impls are safe because we can safely share `Pool` and
53// `&Pool`: the implementation would never allocate the same buffer to two
54// callers at the same time.
55unsafe impl Send for Pool {}
56unsafe impl Sync for Pool {}
57
58/// The shared state which keeps track of available buffers and tx buffers.
59struct TxAllocState {
60    /// All pending tx allocation requests.
61    requests: VecDeque<TxAllocReq>,
62    free_list: TxFreeList,
63}
64
65/// We use a linked list to maintain the tx free descriptors - they are linked
66/// through their `nxt` fields, note this differs from the chaining expected
67/// by the network device protocol:
68/// - You can chain more than [`netdev::MAX_DESCRIPTOR_CHAIN`] descriptors
69///   together.
70/// - the free-list ends when the `nxt` field is 0xff, while the normal chain
71///   ends when `chain_length` becomes 0.
72struct TxFreeList {
73    /// The head of a linked list of available descriptors that can be allocated
74    /// for tx.
75    head: Option<DescId<Tx>>,
76    /// How many free descriptors are there in the pool.
77    len: u16,
78}
79
80impl Pool {
81    /// Creates a new [`Pool`] and its backing [`zx::Vmo`]s.
82    ///
83    /// Returns [`Pool`] and the [`zx::Vmo`]s for descriptors and data, in that
84    /// order.
85    pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
86        let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
87            config;
88        let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
89        let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
90            Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;
91
92        // Construct the free list.
93        let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
94            descriptors.borrow_mut(&mut curr).set_nxt(head);
95            Some(curr)
96        });
97
98        for rx_desc in rx_free.iter_mut() {
99            descriptors.borrow_mut(rx_desc).initialize(
100                ChainLength::ZERO,
101                0,
102                buffer_layout.length.try_into().unwrap(),
103                0,
104            );
105        }
106
107        let tx_alloc_state = TxAllocState {
108            free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
109            requests: VecDeque::new(),
110        };
111
112        let size = buffer_stride.get() * u64::from(num_buffers);
113        let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;
114
115        const VMO_NAME: zx::Name =
116            const_unwrap::const_unwrap_result(zx::Name::new("netdevice:data"));
117        data_vmo.set_name(&VMO_NAME).map_err(|status| Error::Vmo("set name", status))?;
118        // `as` is OK because `size` is positive and smaller than isize::MAX.
119        // This is following the practice of rust stdlib to ensure allocation
120        // size never reaches isize::MAX.
121        // https://doc.rust-lang.org/std/primitive.pointer.html#method.add-1.
122        let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
123        // The returned address of zx_vmar_map on success must be non-zero:
124        // https://fuchsia.dev/fuchsia-src/reference/syscalls/vmar_map
125        let base = NonNull::new(
126            vmar_root_self()
127                .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
128                .map_err(|status| Error::Map("data", status))? as *mut u8,
129        )
130        .unwrap();
131
132        Ok((
133            Arc::new(Pool {
134                base,
135                bytes: len,
136                descriptors,
137                tx_alloc_state: Mutex::new(tx_alloc_state),
138                rx_pending: Pending::new(rx_free),
139                buffer_layout,
140                rx_leases: RxLeaseHandlingState::new_with_flags(options),
141            }),
142            descriptors_vmo,
143            data_vmo,
144        ))
145    }
146
147    /// Allocates `num_parts` tx descriptors.
148    ///
149    /// It will block if there are not enough descriptors. Note that the
150    /// descriptors are not initialized, you need to call [`AllocGuard::init()`]
151    /// on the returned [`AllocGuard`] if you want to send it to the driver
152    /// later.
153    pub(in crate::session) async fn alloc_tx(
154        self: &Arc<Self>,
155        num_parts: ChainLength,
156    ) -> AllocGuard<Tx> {
157        let receiver = {
158            let mut state = self.tx_alloc_state.lock();
159            match state.free_list.try_alloc(num_parts, &self.descriptors) {
160                Some(allocated) => {
161                    return AllocGuard::new(allocated, self.clone());
162                }
163                None => {
164                    let (request, receiver) = TxAllocReq::new(num_parts);
165                    state.requests.push_back(request);
166                    receiver
167                }
168            }
169        };
170        // The sender must not be dropped.
171        receiver.await.unwrap()
172    }
173
174    /// Tries to allocate a [`SinglePartTxBuffer`].
175    ///
176    /// Returns `Ok(None)` if there is no available buffer, or `Err(Error::TxLength)`
177    /// if the requested size cannot meet the device requirement.
178    pub(in crate::session) fn try_alloc_single_part_tx_buffer(
179        self: &Arc<Self>,
180        num_bytes: usize,
181    ) -> Result<Option<SinglePartTxBuffer>> {
182        let BufferLayout { min_tx_data: _, min_tx_head, min_tx_tail, length: buffer_length } =
183            self.buffer_layout;
184        if num_bytes > buffer_length - usize::from(min_tx_head) - usize::from(min_tx_tail) {
185            return Err(Error::TxLength);
186        }
187        self.tx_alloc_state
188            .lock()
189            .free_list
190            .try_alloc(ChainLength::try_from(1u8).unwrap(), &self.descriptors)
191            .map(|allocated| -> Result<SinglePartTxBuffer> {
192                let mut alloc = AllocGuard::new(allocated, self.clone());
193                alloc.init(num_bytes)?;
194                let buffer = Buffer::from(alloc);
195                Ok(SinglePartTxBuffer::new(buffer, num_bytes).expect("must be single part"))
196            })
197            .transpose()
198    }
199
200    /// Allocates a tx [`Buffer`].
201    ///
202    /// The returned buffer will have `num_bytes` as its capacity, the method
203    /// will block if there are not enough buffers. An error will be returned if
204    /// the requested size cannot meet the device requirement, for example, if
205    /// the size of the head or tail region will become unrepresentable in u16.
206    pub(in crate::session) async fn alloc_tx_buffer(
207        self: &Arc<Self>,
208        num_bytes: usize,
209    ) -> Result<Buffer<Tx>> {
210        self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
211    }
212
213    /// Waits for at least one TX buffer to be available and returns an iterator
214    /// of buffers with `num_bytes` as capacity.
215    ///
216    /// The returned iterator is guaranteed to yield at least one item (though
217    /// it might be an error if the requested size cannot meet the device
218    /// requirement).
219    ///
220    /// # Note
221    ///
222    /// Given a `Buffer<Tx>` is returned to the pool when it's dropped, the
223    /// returned iterator will seemingly yield infinite items if the yielded
224    /// `Buffer`s are dropped while iterating.
225    pub(in crate::session) async fn alloc_tx_buffers<'a>(
226        self: &'a Arc<Self>,
227        num_bytes: usize,
228    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
229        let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
230            self.buffer_layout;
231        let tx_head = usize::from(min_tx_head);
232        let tx_tail = usize::from(min_tx_tail);
233        let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
234        let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
235        let chain_length = ChainLength::try_from(num_parts)?;
236        let first = self.alloc_tx(chain_length).await;
237        let iter = std::iter::once(first)
238            .chain(std::iter::from_fn(move || {
239                let mut state = self.tx_alloc_state.lock();
240                state
241                    .free_list
242                    .try_alloc(chain_length, &self.descriptors)
243                    .map(|allocated| AllocGuard::new(allocated, self.clone()))
244            }))
245            // Fuse afterwards so we're guaranteeing we can't see a new entry
246            // after having yielded `None` once.
247            .fuse()
248            .map(move |mut alloc| {
249                alloc.init(num_bytes)?;
250                Ok(alloc.into())
251            });
252        Ok(iter)
253    }
254
255    /// Frees rx descriptors.
256    pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
257        self.rx_pending.extend(descs.into_iter().map(|mut desc| {
258            self.descriptors.borrow_mut(&mut desc).initialize(
259                ChainLength::ZERO,
260                0,
261                self.buffer_layout.length.try_into().unwrap(),
262                0,
263            );
264            desc
265        }));
266    }
267
268    /// Frees tx descriptors.
269    ///
270    /// # Panics
271    ///
272    /// Panics if given an empty chain.
273    fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
274        // We store any pending request that need to be fulfilled in the stack
275        // here, to fulfill them only once we drop the lock, guaranteeing an
276        // AllocGuard can't be dropped while the lock is held.
277        let mut to_fulfill = ArrayVec::<
278            (TxAllocReq, AllocGuard<Tx>),
279            { netdev::MAX_DESCRIPTOR_CHAIN as usize },
280        >::new();
281
282        let mut state = self.tx_alloc_state.lock();
283
284        {
285            let mut descs = chain.into_iter();
286            // The following can't overflow because we can have at most u16::MAX
287            // descriptors: free_len + #(to_free) + #(descs in use) <= u16::MAX,
288            // Thus free_len + #(to_free) <= u16::MAX.
289            state.free_list.len += u16::try_from(descs.len()).unwrap();
290            let head = descs.next();
291            let old_head = std::mem::replace(&mut state.free_list.head, head);
292            let mut tail = descs.last();
293            let mut tail_ref = self.descriptors.borrow_mut(
294                tail.as_mut().unwrap_or_else(|| state.free_list.head.as_mut().unwrap()),
295            );
296            tail_ref.set_nxt(old_head);
297        }
298
299        // After putting the chain back into the free list, we try to fulfill
300        // any pending tx allocation requests.
301        while let Some(req) = state.requests.front() {
302            // Skip a request that we know is canceled.
303            //
304            // This is an optimization for long-ago dropped requests, since the
305            // receiver side can be dropped between here and fulfillment later.
306            if req.sender.is_canceled() {
307                let _cancelled: Option<TxAllocReq> = state.requests.pop_front();
308                continue;
309            }
310            let size = req.size;
311            match state.free_list.try_alloc(size, &self.descriptors) {
312                Some(descs) => {
313                    // The unwrap is safe because we know requests is not empty.
314                    let req = state.requests.pop_front().unwrap();
315                    to_fulfill.push((req, AllocGuard::new(descs, self.clone())));
316
317                    // If we're full temporarily release the lock to go again
318                    // later. Fulfilling a request must _always_ be done without
319                    // holding the lock.
320                    if to_fulfill.is_full() {
321                        drop(state);
322                        for (req, alloc) in to_fulfill.drain(..) {
323                            req.fulfill(alloc)
324                        }
325                        state = self.tx_alloc_state.lock();
326                    }
327                }
328                None => break,
329            }
330        }
331
332        // Make sure we're not holding the state lock when fulfilling requests.
333        drop(state);
334        // Fulfill any ready requests.
335        for (req, alloc) in to_fulfill {
336            req.fulfill(alloc)
337        }
338    }
339
340    /// Frees the completed tx descriptors chained by head to the pool.
341    ///
342    /// Call this function when the driver hands back a completed tx descriptor.
343    pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
344        let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
345        Ok(self.free_tx(chain))
346    }
347
348    /// Creates a [`Buffer<Rx>`] corresponding to the completed rx descriptors.
349    ///
350    /// Whenever the driver hands back a completed rx descriptor, this function
351    /// can be used to create the buffer that is represented by those chained
352    /// descriptors.
353    pub(in crate::session) fn rx_completed(
354        self: &Arc<Self>,
355        head: DescId<Rx>,
356    ) -> Result<Buffer<Rx>> {
357        let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
358        let alloc = AllocGuard::new(descs, self.clone());
359        Ok(alloc.into())
360    }
361
362    fn get_slice<'a, K: AllocKind>(&self, desc: &'a DescId<K>) -> &'a [u8] {
363        let desc = self.descriptors.borrow(desc);
364        let offset = usize::try_from(desc.offset() + u64::from(desc.head_length()))
365            .expect("usize must hold u64");
366        let len = usize::try_from(desc.data_length()).expect("usize must hold u32");
367        // Safety: The descriptor is describing a buffer from this pool. It must
368        // be valid to create a slice into that region. We hold a immutable
369        // reference to the underlying descriptor, this means no one else should
370        // have mutable reference to this memory region.
371        unsafe {
372            let ptr = self.base.as_ptr().add(offset);
373            std::slice::from_raw_parts(ptr, len)
374        }
375    }
376
377    fn get_slice_mut<'a, K: AllocKind>(&self, desc: &'a mut DescId<K>) -> &'a mut [u8] {
378        let desc = self.descriptors.borrow_mut(desc);
379        let offset = usize::try_from(desc.offset() + u64::from(desc.head_length()))
380            .expect("usize must hold u64");
381        let len = usize::try_from(desc.data_length()).expect("usize must hold u32");
382        // Safety: The descriptor is describing a buffer from this pool. It must
383        // be valid to create a slice into that region. We hold a mutable
384        // reference to the underlying descriptor, this means we are currently
385        // the only one has access to this memory region.
386        unsafe {
387            let ptr = self.base.as_ptr().add(offset);
388            std::slice::from_raw_parts_mut(ptr, len)
389        }
390    }
391}
392
393impl Drop for Pool {
394    fn drop(&mut self) {
395        unsafe {
396            vmar_root_self()
397                .unmap(self.base.as_ptr() as usize, self.bytes)
398                .expect("failed to unmap VMO for Pool")
399        }
400    }
401}
402
403impl TxFreeList {
404    /// Tries to allocate tx descriptors.
405    ///
406    /// Returns [`None`] if there are not enough descriptors.
407    fn try_alloc(
408        &mut self,
409        num_parts: ChainLength,
410        descriptors: &Descriptors,
411    ) -> Option<Chained<DescId<Tx>>> {
412        if u16::from(num_parts.get()) > self.len {
413            return None;
414        }
415
416        let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
417            let new_head = self.head.as_ref().and_then(|head| {
418                let nxt = descriptors.borrow(head).nxt();
419                nxt.map(|id| unsafe {
420                    // Safety: This is the nxt field of head of the free list,
421                    // it must be a tx descriptor id.
422                    DescId::from_raw(id)
423                })
424            });
425            std::mem::replace(&mut self.head, new_head)
426        });
427        let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
428        assert_eq!(allocated.len(), num_parts.into());
429        self.len -= u16::from(num_parts.get());
430        Some(allocated)
431    }
432}
433
434/// The buffer that can be used by the [`Session`](crate::session::Session).
435pub struct Buffer<K: AllocKind> {
436    /// The descriptors allocation.
437    alloc: AllocGuard<K>,
438}
439
440impl<K: AllocKind> Buffer<K> {
441    /// Returns the length of data region of the buffer.
442    pub fn len(&self) -> usize {
443        self.parts().map(|s| s.len()).sum()
444    }
445
446    /// Returns an iterator over the data slices of the buffer parts.
447    fn parts(&self) -> impl Iterator<Item = &[u8]> + '_ {
448        self.alloc.descs.iter().map(|desc| self.alloc.pool.get_slice(desc))
449    }
450
451    /// Returns an iterator over the mutable valid data slices of the buffer parts.
452    fn parts_mut(&mut self) -> impl Iterator<Item = &mut [u8]> + '_ {
453        self.alloc.descs.iter_mut().map(|desc| self.alloc.pool.get_slice_mut(desc))
454    }
455
456    /// Leaks the underlying buffer descriptors to the driver.
457    pub(in crate::session) fn leak(mut self) -> DescId<K> {
458        let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
459        descs.into_iter().next().unwrap()
460    }
461
462    /// Retrieves the frame type of the buffer.
463    pub fn frame_type(&self) -> Result<netdev::FrameType> {
464        self.alloc.descriptor().frame_type()
465    }
466
467    /// Retrieves the buffer's source port.
468    pub fn port(&self) -> Port {
469        self.alloc.descriptor().port()
470    }
471
472    /// Returns the buffer data as a slice.
473    pub fn as_slice(&self) -> Option<&[u8]> {
474        if self.alloc.len() != 1 {
475            return None;
476        }
477        self.parts().next()
478    }
479
480    /// Returns the buffer data as a mutable slice.
481    pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
482        if self.alloc.len() != 1 {
483            return None;
484        }
485        self.parts_mut().next()
486    }
487
488    /// Returns a wrapper for read-only operations.
489    pub fn io(&self) -> BufferIORef<'_, K> {
490        let mut len = 0;
491        let parts: Chained<&[u8]> = self.parts().inspect(|s| len += s.len()).collect();
492        BufferIO { parts, pos: 0, len, _marker: std::marker::PhantomData }
493    }
494
495    /// Returns a wrapper for read-write operations.
496    pub fn io_mut(&mut self) -> BufferIOMut<'_, K> {
497        let mut len = 0;
498        let parts: Chained<&mut [u8]> = self.parts_mut().inspect(|s| len += s.len()).collect();
499        BufferIO { parts, pos: 0, len, _marker: std::marker::PhantomData }
500    }
501}
502
503// TODO(https://fxbug.dev/525167122): Consider a different API for the `set_*`
504// methods if the `descriptor_mut()` call isn't optimized out on consecutive
505// method calls.
506impl Buffer<Tx> {
507    /// Sets the buffer's destination port.
508    pub fn set_port(&mut self, port: Port) {
509        self.alloc.descriptor_mut().set_port(port)
510    }
511
512    /// Sets the frame type of the buffer.
513    pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
514        self.alloc.descriptor_mut().set_frame_type(frame_type)
515    }
516
517    /// Sets TxFlags of a Tx buffer.
518    pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
519        self.alloc.descriptor_mut().set_tx_flags(flags)
520    }
521
522    /// Sets the generic checksum offload metadata for this buffer.
523    pub fn set_generic_csum_offload(&mut self, start: u16, offset: u16) {
524        self.alloc.descriptor_mut().set_generic_csum_offload(start, offset)
525    }
526
527    /// Shrinks the buffer.
528    ///
529    /// This method shrinks the buffer length to the larger of
530    ///   - requested new length
531    ///   - device required minimum Tx data length
532    ///
533    /// It is an error to try to increase the buffer length.
534    pub fn shrink_to(&mut self, mut new_len: usize) -> Result<()> {
535        let current_len = self.len();
536
537        if new_len > current_len {
538            return Err(Error::TxLength);
539        }
540
541        let min_tx_data = usize::from(self.alloc.pool.buffer_layout.min_tx_data);
542        new_len = new_len.max(min_tx_data);
543
544        let layouts = self.alloc.calculate_descriptor_layouts(new_len)?;
545
546        for (desc_id, DescriptorLayout { data_length, tail_length, .. }) in
547            self.alloc.descs.iter_mut().zip(layouts)
548        {
549            let mut descriptor = self.alloc.pool.descriptors.borrow_mut(desc_id);
550            descriptor.set_data_length(data_length);
551            descriptor.set_tail_length(tail_length);
552        }
553        Ok(())
554    }
555}
556
557/// The rx checksum offloading information.
558pub enum ChecksumRxOffloading {
559    /// N checksums were fully verified by the device.
560    Offloaded(NonZeroU16),
561}
562
563// TODO(https://fxbug.dev/525167122): Consider a different API for the `rx_*`
564// methods if the `descriptor()` call isn't optimized out on consecutive method
565// calls.
566impl Buffer<Rx> {
567    /// Retrieves RxFlags of an Rx Buffer.
568    pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
569        self.alloc.descriptor().rx_flags()
570    }
571
572    /// Retrieves the checksum offloading information.
573    pub fn rx_checksum_offloading(&self) -> Option<ChecksumRxOffloading> {
574        self.alloc.descriptor().rx_checksum_offloading()
575    }
576}
577
578impl<K: AllocKind> Debug for Buffer<K> {
579    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
580        let Self { alloc } = self;
581        f.debug_struct("Buffer").field("alloc", alloc).finish()
582    }
583}
584
585/// A witness type that proves the buffer is backed by one part only
586/// and thus can be converted into `&[u8]`.
587pub struct SinglePartTxBuffer(Buffer<Tx>);
588
589impl SinglePartTxBuffer {
590    /// Creates a new [`SinglePartTxBuffer`] from a [`Buffer<Tx>`] if it is
591    /// backed by one part only.
592    pub fn new(buffer: Buffer<Tx>, len: usize) -> Option<Self> {
593        if buffer.alloc.len() != 1 {
594            None
595        } else {
596            let cap = usize::try_from(buffer.alloc.descriptor().data_length())
597                .expect("u32 must fit in a usize");
598            if cap < len {
599                return None;
600            }
601            Some(Self(buffer))
602        }
603    }
604
605    /// Converts back to a Tx buffer.
606    pub fn into_inner(self) -> Buffer<Tx> {
607        let Self(buffer) = self;
608        buffer
609    }
610}
611
612impl AsRef<[u8]> for SinglePartTxBuffer {
613    fn as_ref(&self) -> &[u8] {
614        // Safety: `SinglePartTxBuffer` is guaranteed to have exactly one part
615        // (verified on creation), so the first descriptor is always initialized.
616        let desc = unsafe { self.0.alloc.descs.storage[0].assume_init_ref() };
617        self.0.alloc.pool.get_slice(desc)
618    }
619}
620
621impl AsMut<[u8]> for SinglePartTxBuffer {
622    fn as_mut(&mut self) -> &mut [u8] {
623        // Safety: `SinglePartTxBuffer` is guaranteed to have exactly one part
624        // (verified on creation), so the first descriptor is always initialized.
625        let desc = unsafe { self.0.alloc.descs.storage[0].assume_init_mut() };
626        self.0.alloc.pool.get_slice_mut(desc)
627    }
628}
629
630impl packet::FragmentedBuffer for SinglePartTxBuffer {
631    fn len(&self) -> usize {
632        let desc = self.0.alloc.descriptor();
633        usize::try_from(desc.data_length()).expect("u32 must fit in a usize")
634    }
635
636    fn with_bytes<'a, R, F>(&'a self, f: F) -> R
637    where
638        F: for<'b> FnOnce(packet::FragmentedBytes<'b, 'a>) -> R,
639    {
640        f(packet::FragmentedBytes::new(&mut [self.as_ref()][..]))
641    }
642}
643
644/// A wrapper around [`Buffer`] for sequential I/O.
645///
646/// `T` must be a slice reference type, typically `&'a [u8]` for read-only
647/// operations, or `&'a mut [u8]` for read-write operations.
648pub struct BufferIO<T, K: AllocKind> {
649    parts: Chained<T>,
650    pos: usize,
651    len: usize,
652    _marker: std::marker::PhantomData<K>,
653}
654
655pub type BufferIORef<'a, K> = BufferIO<&'a [u8], K>;
656pub type BufferIOMut<'a, K> = BufferIO<&'a mut [u8], K>;
657
658impl<T> BufferIO<T, Tx>
659where
660    T: AsMut<[u8]>,
661{
662    /// Writes data from `src` into the TX buffer starting at the specified `offset`.
663    ///
664    /// This method is infallible. It returns the number of bytes successfully written.
665    ///
666    /// If the specified `offset` is greater than or equal to the total length of the
667    /// buffer, or if the buffer has no remaining capacity at the offset, `0` bytes
668    /// will be written.
669    ///
670    /// If `src` is larger than the remaining capacity of the buffer starting at
671    /// `offset`, a short write occurs: only the bytes that fit within the buffer
672    /// are written, and the returned value will be less than `src.len()`.
673    pub fn write_at(&mut self, mut offset: usize, src: &[u8]) -> usize {
674        let mut total = 0;
675
676        for slice in self.parts.iter_mut() {
677            let slice = slice.as_mut();
678            if offset < slice.len() {
679                let available = slice.len() - offset;
680                let to_copy = std::cmp::min(src.len() - total, available);
681                slice[offset..offset + to_copy].copy_from_slice(&src[total..total + to_copy]);
682                total += to_copy;
683                offset = 0;
684                if total == src.len() {
685                    break;
686                }
687            } else {
688                offset -= slice.len();
689            }
690        }
691        total
692    }
693}
694
695impl<T, K: AllocKind> BufferIO<T, K>
696where
697    T: AsRef<[u8]>,
698{
699    /// Reads data from the buffer starting at the specified `offset` into `dst`.
700    ///
701    /// This method is infallible. It returns the number of bytes successfully read.
702    ///
703    /// If the specified `offset` is greater than or equal to the total length of the
704    /// buffer, `0` bytes will be read.
705    ///
706    /// If the remaining data in the buffer starting at `offset` is less than the
707    /// size of `dst`, a short read occurs: only the available bytes are copied,
708    /// and the returned value will be less than `dst.len()`.
709    pub fn read_at(&self, mut offset: usize, dst: &mut [u8]) -> usize {
710        let mut total = 0;
711
712        for slice in self.parts.iter() {
713            let slice = slice.as_ref();
714            if offset < slice.len() {
715                let available = slice.len() - offset;
716                let to_copy = std::cmp::min(dst.len() - total, available);
717                dst[total..total + to_copy].copy_from_slice(&slice[offset..offset + to_copy]);
718                total += to_copy;
719                offset = 0;
720                if total == dst.len() {
721                    break;
722                }
723            } else {
724                offset -= slice.len();
725            }
726        }
727        total
728    }
729}
730
731/// A non-empty container that has at most [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
732struct Chained<T> {
733    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
734    len: ChainLength,
735}
736
737impl<T> Deref for Chained<T> {
738    type Target = [T];
739
740    fn deref(&self) -> &Self::Target {
741        // Safety: `self.storage[..self.len]` is already initialized.
742        unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
743    }
744}
745
746impl<T> DerefMut for Chained<T> {
747    fn deref_mut(&mut self) -> &mut Self::Target {
748        // Safety: `self.storage[..self.len]` is already initialized.
749        unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
750    }
751}
752
753impl<T> Drop for Chained<T> {
754    fn drop(&mut self) {
755        // Safety: `self.deref_mut()` is a slice of all initialized elements.
756        unsafe {
757            std::ptr::drop_in_place(self.deref_mut());
758        }
759    }
760}
761
762impl<T: Debug> Debug for Chained<T> {
763    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
764        f.debug_list().entries(self.iter()).finish()
765    }
766}
767
768impl<T> Chained<T> {
769    #[allow(clippy::uninit_assumed_init)]
770    fn empty() -> Self {
771        // Create an uninitialized array of `MaybeUninit`. The `assume_init` is
772        // safe because the type we are claiming to have initialized here is a
773        // bunch of `MaybeUninit`s, which do not require initialization.
774        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
775        // is stablized.
776        // https://doc.rust-lang.org/std/mem/union.MaybeUninit.html#method.uninit_array
777        Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
778    }
779}
780
781impl<T> FromIterator<T> for Chained<T> {
782    /// # Panics
783    ///
784    /// if the iterator can yield more than MAX_DESCRIPTOR_CHAIN elements.
785    fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
786        let mut result = Self::empty();
787        let mut len = 0u8;
788        for (idx, e) in elements.into_iter().enumerate() {
789            result.storage[idx] = MaybeUninit::new(e);
790            len += 1;
791        }
792        // `len` can not be larger than `MAX_DESCRIPTOR_CHAIN`, otherwise we can't
793        // get here due to the bound checks on `result.storage`.
794        result.len = ChainLength::try_from(len).unwrap();
795        result
796    }
797}
798
799impl<T> IntoIterator for Chained<T> {
800    type Item = T;
801    type IntoIter = ChainedIter<T>;
802
803    fn into_iter(mut self) -> Self::IntoIter {
804        let len = self.len;
805        self.len = ChainLength::ZERO;
806        // Safety: we have reset the length to zero, it is now safe to move out
807        // the values and set them to be uninitialized. The `assume_init` is
808        // safe because the type we are claiming to have initialized here is a
809        // bunch of `MaybeUninit`s, which do not require initialization.
810        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
811        // is stablized.
812        #[allow(clippy::uninit_assumed_init)]
813        let storage =
814            std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
815        ChainedIter { storage, len, consumed: 0 }
816    }
817}
818
819struct ChainedIter<T> {
820    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
821    len: ChainLength,
822    consumed: u8,
823}
824
825impl<T> Iterator for ChainedIter<T> {
826    type Item = T;
827
828    fn next(&mut self) -> Option<Self::Item> {
829        if self.consumed < self.len.get() {
830            // Safety: it is safe now to replace that slot with an uninitialized
831            // value because we will advance consumed by 1.
832            let value = unsafe {
833                std::mem::replace(
834                    &mut self.storage[usize::from(self.consumed)],
835                    MaybeUninit::uninit(),
836                )
837                .assume_init()
838            };
839            self.consumed += 1;
840            Some(value)
841        } else {
842            None
843        }
844    }
845
846    fn size_hint(&self) -> (usize, Option<usize>) {
847        let len = usize::from(self.len.get() - self.consumed);
848        (len, Some(len))
849    }
850}
851
852impl<T> ExactSizeIterator for ChainedIter<T> {}
853
854impl<T> Drop for ChainedIter<T> {
855    fn drop(&mut self) {
856        // Safety: `self.storage[self.consumed..self.len]` is initialized.
857        unsafe {
858            std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
859                &mut self.storage[self.consumed.into()..self.len.into()],
860            ));
861        }
862    }
863}
864
865/// Guards the allocated descriptors; they will be freed when dropped.
866pub(in crate::session) struct AllocGuard<K: AllocKind> {
867    descs: Chained<DescId<K>>,
868    pool: Arc<Pool>,
869}
870
871impl<K: AllocKind> Debug for AllocGuard<K> {
872    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
873        let Self { descs, pool: _ } = self;
874        f.debug_struct("AllocGuard").field("descs", descs).finish()
875    }
876}
877
878impl<K: AllocKind> AllocGuard<K> {
879    fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
880        Self { descs, pool }
881    }
882
883    /// Iterates over references to the descriptors.
884    fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
885        self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
886    }
887
888    /// Iterates over mutable references to the descriptors.
889    fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
890        let descriptors = &self.pool.descriptors;
891        self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
892    }
893
894    /// Gets a reference to the head descriptor.
895    fn descriptor(&self) -> DescRef<'_, K> {
896        self.descriptors().next().expect("descriptors must not be empty")
897    }
898
899    /// Gets a mutable reference to the head descriptor.
900    fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
901        self.descriptors_mut().next().expect("descriptors must not be empty")
902    }
903}
904
905#[derive(Debug, Clone, Copy, PartialEq, Eq)]
906struct DescriptorLayout {
907    chain_length: ChainLength,
908    head_length: u16,
909    data_length: u32,
910    tail_length: u16,
911}
912
913impl AllocGuard<Tx> {
914    /// Calculates the layout for each descriptor in this allocation chain.
915    ///
916    /// The layouts are calculated to satisfy the requested `target_len`, while
917    /// ensuring the session's `min_tx_head` and `min_tx_tail` requirements are
918    /// met.
919    ///
920    /// Returns `Err(Error::TxLength)` if the requirements cannot be met (e.g. if the
921    /// required tail padding overflows `u16`).
922    fn calculate_descriptor_layouts(&self, target_len: usize) -> Result<Chained<DescriptorLayout>> {
923        let len = self.len();
924        let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, .. } =
925            self.pool.buffer_layout;
926
927        let mut remaining_target = target_len;
928        (0..len)
929            .rev()
930            .map(|clen| {
931                let chain_length = ChainLength::try_from(clen).unwrap();
932                let head_length = if clen + 1 == len { min_tx_head } else { 0 };
933                let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };
934
935                // head_length and tail_length. The check was done when the config
936                // for pool was created, so the subtraction won't overflow.
937                let available_bytes = u32::try_from(
938                    buffer_length - usize::from(head_length) - usize::from(tail_length),
939                )
940                .unwrap();
941
942                let data_length = match u32::try_from(remaining_target) {
943                    Ok(target) => {
944                        if target < available_bytes {
945                            // The target bytes are less than what is available,
946                            // we need to put the excess in the tail so that the
947                            // user cannot write more than they requested (or padded).
948                            let excess = available_bytes - target;
949                            tail_length = u16::try_from(excess)
950                                .ok_checked::<TryFromIntError>()
951                                .and_then(|tail_adjustment| {
952                                    tail_length.checked_add(tail_adjustment)
953                                })
954                                .ok_or(Error::TxLength)?;
955                        }
956                        target.min(available_bytes)
957                    }
958                    Err(TryFromIntError { .. }) => available_bytes,
959                };
960
961                let data_length_usize =
962                    usize::try_from(data_length).expect("u32 must fit in a usize");
963                remaining_target = remaining_target.saturating_sub(data_length_usize);
964
965                Ok::<_, Error>(DescriptorLayout {
966                    chain_length,
967                    head_length,
968                    data_length,
969                    tail_length,
970                })
971            })
972            .collect()
973    }
974
975    /// Initializes descriptors of a tx allocation.
976    ///
977    /// We choose to enforce and satisfy the `min_tx_data` layout requirement
978    /// (imposed by the device/driver) immediately during buffer allocation and
979    /// initialization here.
980    ///
981    /// Consequently, the allocated buffer's capacity (`target_len`) may be
982    /// larger than the `requested_bytes` if `requested_bytes` is smaller than
983    /// `min_tx_data`.
984    ///
985    /// While this means we might spend CPU cycles zero-padding buffers that are
986    /// subsequently dropped without being sent (a rare occurrence in typical
987    /// usage), this guarantees that buffer is always suitable for sending. This
988    /// also makes the transmit path (`Session::send`) infallible.
989    fn init(&mut self, requested_bytes: usize) -> Result<()> {
990        let min_tx_data = self.pool.buffer_layout.min_tx_data;
991        let target_len = requested_bytes.max(usize::from(min_tx_data));
992        let layouts = self.calculate_descriptor_layouts(target_len)?;
993
994        let mut remaining_requested = requested_bytes;
995
996        for (desc_id, DescriptorLayout { chain_length, head_length, data_length, tail_length }) in
997            self.descs.iter_mut().zip(layouts)
998        {
999            // Initialize the descriptor.
1000            {
1001                let mut descriptor = self.pool.descriptors.borrow_mut(desc_id);
1002                descriptor.initialize(chain_length, head_length, data_length, tail_length);
1003            }
1004
1005            let data_length_usize = usize::try_from(data_length).expect("u32 must fit in a usize");
1006            let requested_in_part = std::cmp::min(remaining_requested, data_length_usize);
1007            let pad_in_part = data_length_usize - requested_in_part;
1008
1009            // Zero-pad any excess capacity in this buffer part that was allocated
1010            // to satisfy the `min_tx_data` layout requirement but not requested by
1011            // the caller.
1012            //
1013            // We decided to pad the buffer on initialization because the lazy commit
1014            // model can only avoid padding for the following 2 cases:
1015            // 1) User only allocates but never sends.
1016            // 2) User writes past their requested size and meets the min_tx_data
1017            //    requirement.
1018            // Both should be uncommon, and in case 2) we can fix the client by
1019            // requesting a larger size to avoid padding.
1020            if pad_in_part > 0 {
1021                let slice = self.pool.get_slice_mut(desc_id);
1022                slice[requested_in_part..requested_in_part + pad_in_part].fill(0);
1023            }
1024
1025            remaining_requested -= requested_in_part;
1026        }
1027        Ok(())
1028    }
1029}
1030
1031impl<K: AllocKind> Drop for AllocGuard<K> {
1032    fn drop(&mut self) {
1033        if self.is_empty() {
1034            return;
1035        }
1036        K::free(private::Allocation(self));
1037    }
1038}
1039
1040impl<K: AllocKind> Deref for AllocGuard<K> {
1041    type Target = [DescId<K>];
1042
1043    fn deref(&self) -> &Self::Target {
1044        self.descs.deref()
1045    }
1046}
1047
1048impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
1049    fn from(alloc: AllocGuard<K>) -> Self {
1050        Self { alloc }
1051    }
1052}
1053
1054impl<T, K: AllocKind> Read for BufferIO<T, K>
1055where
1056    T: AsRef<[u8]>,
1057{
1058    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1059        let read_len = self.read_at(self.pos, buf);
1060        self.pos += read_len;
1061        Ok(read_len)
1062    }
1063}
1064
1065impl<T> Write for BufferIO<T, Tx>
1066where
1067    T: AsMut<[u8]>,
1068{
1069    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1070        let write_len = self.write_at(self.pos, buf);
1071        self.pos += write_len;
1072        Ok(write_len)
1073    }
1074
1075    fn flush(&mut self) -> std::io::Result<()> {
1076        Ok(())
1077    }
1078}
1079
1080impl<T, K: AllocKind> Seek for BufferIO<T, K> {
1081    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
1082        let pos = match pos {
1083            SeekFrom::Start(offset) => offset,
1084            SeekFrom::End(offset) => {
1085                let end = i64::try_from(self.len).unwrap();
1086                u64::try_from(end.wrapping_add(offset)).unwrap()
1087            }
1088            SeekFrom::Current(offset) => {
1089                let current = i64::try_from(self.pos).map_err(|TryFromIntError { .. }| {
1090                    std::io::Error::from(std::io::ErrorKind::InvalidInput)
1091                })?;
1092                u64::try_from(current.wrapping_add(offset)).unwrap()
1093            }
1094        };
1095        self.pos = usize::try_from(pos).map_err(|TryFromIntError { .. }| {
1096            std::io::Error::from(std::io::ErrorKind::InvalidInput)
1097        })?;
1098        Ok(pos)
1099    }
1100}
1101
1102/// A pending tx allocation request.
1103struct TxAllocReq {
1104    sender: Sender<AllocGuard<Tx>>,
1105    size: ChainLength,
1106}
1107
1108impl TxAllocReq {
1109    fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
1110        let (sender, receiver) = channel();
1111        (TxAllocReq { sender, size }, receiver)
1112    }
1113
1114    /// Fulfills the pending request with an `AllocGuard`.
1115    ///
1116    /// If the request is already closed, the guard is simply dropped and
1117    /// returned to the queue.
1118    ///
1119    /// `fulfill` must *not* be called when the `guard`'s pool is holding the tx
1120    /// lock, since we may deadlock/panic upon the double tx lock acquisition.
1121    fn fulfill(self, guard: AllocGuard<Tx>) {
1122        let Self { sender, size: _ } = self;
1123        match sender.send(guard) {
1124            Ok(()) => (),
1125            Err(guard) => {
1126                // It's ok to just drop the guard here, it'll be returned to the
1127                // pool.
1128                drop(guard);
1129            }
1130        }
1131    }
1132}
1133
1134/// A module for sealed traits so that the user of this crate can not implement
1135/// [`AllocKind`] for anything than [`Rx`] and [`Tx`].
1136mod private {
1137    use super::{AllocKind, Rx, Tx};
1138    pub trait Sealed: 'static + Sized {}
1139    impl Sealed for Rx {}
1140    impl Sealed for Tx {}
1141
1142    // We can't leak a private type in a public trait, create an opaque private
1143    // new type for &mut super::AllocGuard so that we can mention it in the
1144    // AllocKind trait.
1145    pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
1146}
1147
1148/// An allocation can have two kinds, this trait provides a way to project a
1149/// type ([`Rx`] or [`Tx`]) into a value.
1150pub trait AllocKind: private::Sealed {
1151    /// The reflected value of Self.
1152    const REFL: AllocKindRefl;
1153
1154    /// frees an allocation of the given kind.
1155    fn free(alloc: private::Allocation<'_, Self>);
1156}
1157
1158/// A tag to related types for Tx allocations.
1159pub enum Tx {}
1160/// A tag to related types for Rx allocations.
1161pub enum Rx {}
1162
1163/// The reflected value that allows inspection on an [`AllocKind`] type.
1164pub enum AllocKindRefl {
1165    Tx,
1166    Rx,
1167}
1168
1169impl AllocKindRefl {
1170    pub(in crate::session) fn as_str(&self) -> &'static str {
1171        match self {
1172            AllocKindRefl::Tx => "Tx",
1173            AllocKindRefl::Rx => "Rx",
1174        }
1175    }
1176}
1177
1178impl AllocKind for Tx {
1179    const REFL: AllocKindRefl = AllocKindRefl::Tx;
1180
1181    fn free(alloc: private::Allocation<'_, Self>) {
1182        let private::Allocation(AllocGuard { pool, descs }) = alloc;
1183        pool.free_tx(std::mem::replace(descs, Chained::empty()));
1184    }
1185}
1186
1187impl AllocKind for Rx {
1188    const REFL: AllocKindRefl = AllocKindRefl::Rx;
1189
1190    fn free(alloc: private::Allocation<'_, Self>) {
1191        let private::Allocation(AllocGuard { pool, descs }) = alloc;
1192        pool.free_rx(std::mem::replace(descs, Chained::empty()));
1193        pool.rx_leases.rx_complete();
1194    }
1195}
1196
1197/// An extracted struct containing state pertaining to watching rx leases.
1198pub(in crate::session) struct RxLeaseHandlingState {
1199    can_watch_rx_leases: AtomicBool,
1200    /// Keeps a rolling counter of received rx frames MINUS the target frame
1201    /// number of the current outstanding lease.
1202    ///
1203    /// When no leases are pending (via [`RxLeaseWatcher::wait_until`]),
1204    /// then this matches exactly the number of received frames.
1205    ///
1206    /// Otherwise, the lease is currently waiting for remaining `u64::MAX -
1207    /// rx_Frame_counter` frames. The logic depends on `AtomicU64` wrapping
1208    /// around as part of completing rx buffers.
1209    rx_frame_counter: AtomicU64,
1210    rx_lease_waker: AtomicWaker,
1211}
1212
1213impl RxLeaseHandlingState {
1214    fn new_with_flags(flags: netdev::SessionFlags) -> Self {
1215        Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
1216    }
1217
1218    fn new_with_enabled(enabled: bool) -> Self {
1219        Self {
1220            can_watch_rx_leases: AtomicBool::new(enabled),
1221            rx_frame_counter: AtomicU64::new(0),
1222            rx_lease_waker: AtomicWaker::new(),
1223        }
1224    }
1225
1226    /// Increments the total receive frame counter and possibly wakes up a
1227    /// waiting lease yielder.
1228    fn rx_complete(&self) {
1229        let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
1230        let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);
1231
1232        // See wait_until for details. We need to hit a waker whenever our add
1233        // wrapped the u64 back around to 0.
1234        if prev == u64::MAX {
1235            rx_lease_waker.wake();
1236        }
1237    }
1238}
1239
1240/// A trait allowing [`RxLeaseWatcher`] to be agnostic over how to get an
1241/// [`RxLeaseHandlingState`].
1242pub(in crate::session) trait RxLeaseHandlingStateContainer {
1243    fn lease_handling_state(&self) -> &RxLeaseHandlingState;
1244}
1245
1246impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
1247    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1248        self.borrow()
1249    }
1250}
1251
1252impl RxLeaseHandlingStateContainer for Arc<Pool> {
1253    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1254        &self.rx_leases
1255    }
1256}
1257
1258/// A type safe-wrapper around a single lease watcher per `Pool`.
1259pub(in crate::session) struct RxLeaseWatcher<T> {
1260    state: T,
1261}
1262
1263impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
1264    /// Creates a new lease watcher.
1265    ///
1266    /// # Panics
1267    ///
1268    /// Panics if an [`RxLeaseWatcher`] has already been created for the given
1269    /// pool or the pool was not configured for it.
1270    pub(in crate::session) fn new(state: T) -> Self {
1271        assert!(
1272            state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
1273            "can't watch rx leases"
1274        );
1275        Self { state }
1276    }
1277
1278    /// Called by sessions to wait until `hold_until_frame` is fulfilled to
1279    /// yield leases out.
1280    ///
1281    /// Blocks until `hold_until_frame`-th rx buffer has been released.
1282    ///
1283    /// Note that this method takes `&mut self` because only one
1284    /// [`RxLeaseWatcher`] may be created by lease handling state, and exclusive
1285    /// access to it is required to watch lease completion.
1286    pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
1287        // A note about wrap-arounds.
1288        //
1289        // We're assuming the frame counter will never wrap around for
1290        // correctness here. This should be fine, even assuming a packet
1291        // rate of 1 million pps it'd take almost 600k years for this counter
1292        // to wrap around:
1293        // - 2^64 / 1e6 / 60 / 60 / 24 / 365 ~ 584e3.
1294
1295        let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
1296            self.state.lease_handling_state();
1297
1298        let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
1299        // After having subtracted the waiting value we *must always restore the
1300        // value* on return, even if the future is not polled to completion.
1301        let _guard = scopeguard::guard((), |()| {
1302            let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
1303        });
1304
1305        // Lease is ready to be fulfilled.
1306        if prev >= hold_until_frame {
1307            return;
1308        }
1309        // Threshold is a wrapped around subtraction. So now we must wait
1310        // until the read value from the atomic is LESS THAN the threshold.
1311        let threshold = prev.wrapping_sub(hold_until_frame);
1312        futures::future::poll_fn(|cx| {
1313            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
1314            if v < threshold {
1315                return Poll::Ready(());
1316            }
1317            rx_lease_waker.register(cx.waker());
1318            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
1319            if v < threshold {
1320                return Poll::Ready(());
1321            }
1322            Poll::Pending
1323        })
1324        .await;
1325    }
1326}
1327
1328#[cfg(test)]
1329mod tests {
1330
1331    use super::*;
1332
1333    use assert_matches::assert_matches;
1334    use fuchsia_async as fasync;
1335    use futures::future::FutureExt;
1336    use test_case::test_case;
1337
1338    use std::collections::HashSet;
1339    use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
1340    use std::pin::pin;
1341    use std::task::{Poll, Waker};
1342
1343    const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
1344    const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
1345    // Safety: These are safe because none of the values are zero.
1346    const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
1347    const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1348    const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1349    const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
1350        * netdev::MAX_DESCRIPTOR_CHAIN as usize
1351        - DEFAULT_MIN_TX_BUFFER_HEAD as usize
1352        - DEFAULT_MIN_TX_BUFFER_TAIL as usize;
1353
1354    const SENTINEL_BYTE: u8 = 0xab;
1355    const WRITE_BYTE: u8 = 1;
1356    const PAD_BYTE: u8 = 0;
1357
1358    const DEFAULT_CONFIG: Config = Config {
1359        buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
1360        num_rx_buffers: DEFAULT_RX_BUFFERS,
1361        num_tx_buffers: DEFAULT_TX_BUFFERS,
1362        options: netdev::SessionFlags::empty(),
1363        buffer_layout: BufferLayout {
1364            length: DEFAULT_BUFFER_LENGTH.get(),
1365            min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
1366            min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
1367            min_tx_data: 0,
1368        },
1369    };
1370
1371    impl Pool {
1372        fn new_test_default() -> Arc<Self> {
1373            let (pool, _descriptors, _data) =
1374                Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
1375            pool
1376        }
1377
1378        async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
1379            self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
1380                .await
1381        }
1382
1383        fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
1384            self.alloc_tx_checked(n).now_or_never()
1385        }
1386
1387        fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
1388            std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
1389        }
1390
1391        fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
1392            self.alloc_tx_buffer(num_bytes)
1393                .now_or_never()
1394                .transpose()
1395                .expect("invalid arguments for alloc_tx_buffer")
1396        }
1397
1398        fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
1399            Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
1400        }
1401
1402        fn fill_sentinel_bytes(&mut self) {
1403            // Safety: We have mut reference to Pool, so we get to modify the
1404            // VMO pointed by self.base.
1405            unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
1406        }
1407    }
1408
1409    impl Buffer<Tx> {
1410        // Write a byte at offset, the result buffer should be pad_size long, with
1411        // 0..offset being the SENTINEL_BYTE, offset being the WRITE_BYTE and the
1412        // rest being PAD_BYTE.
1413        fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
1414            {
1415                let mut io = self.io_mut();
1416                assert_eq!(io.write_at(offset, &[WRITE_BYTE][..]), 1);
1417            }
1418            assert_eq!(self.len(), pad_size);
1419            // An arbitrary value that is not SENTINAL/WRITE/PAD_BYTE so that
1420            // we can make sure the write really happened.
1421            const INIT_BYTE: u8 = 42;
1422            let mut read_buf = vec![INIT_BYTE; pad_size];
1423            assert_eq!(self.io().read_at(0, &mut read_buf[..]), read_buf.len());
1424            for (idx, byte) in read_buf.iter().enumerate() {
1425                if idx < offset {
1426                    assert_eq!(*byte, SENTINEL_BYTE);
1427                } else if idx == offset {
1428                    assert_eq!(*byte, WRITE_BYTE);
1429                } else {
1430                    assert_eq!(*byte, PAD_BYTE);
1431                }
1432            }
1433        }
1434    }
1435
1436    impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
1437    where
1438        K: AllocKind,
1439        I: ExactSizeIterator<Item = u16>,
1440        T: Copy + IntoIterator<IntoIter = I>,
1441    {
1442        fn eq(&self, other: &T) -> bool {
1443            let iter = other.into_iter();
1444            if usize::from(self.len) != iter.len() {
1445                return false;
1446            }
1447            self.iter().zip(iter).all(|(l, r)| l.get() == r)
1448        }
1449    }
1450
1451    impl Debug for TxAllocReq {
1452        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1453            let TxAllocReq { sender: _, size } = self;
1454            f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
1455        }
1456    }
1457
1458    #[test]
1459    fn alloc_tx_distinct() {
1460        let pool = Pool::new_test_default();
1461        let allocated = pool.alloc_tx_all(1);
1462        assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
1463        let distinct = allocated
1464            .iter()
1465            .map(|alloc| {
1466                assert_eq!(alloc.descs.len(), 1);
1467                alloc.descs[0].get()
1468            })
1469            .collect::<HashSet<u16>>();
1470        assert_eq!(allocated.len(), distinct.len());
1471    }
1472
1473    #[test]
1474    fn alloc_tx_free_len() {
1475        let pool = Pool::new_test_default();
1476        {
1477            let allocated = pool.alloc_tx_all(2);
1478            assert_eq!(
1479                allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
1480                DEFAULT_TX_BUFFERS.get().into()
1481            );
1482            assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
1483        }
1484        assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
1485    }
1486
1487    #[test]
1488    fn alloc_tx_chain() {
1489        let pool = Pool::new_test_default();
1490        let allocated = pool.alloc_tx_all(3);
1491        assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
1492        assert_matches!(pool.alloc_tx_now_or_never(3), None);
1493        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1494    }
1495
1496    #[test]
1497    fn alloc_tx_many() {
1498        let pool = Pool::new_test_default();
1499        let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1500            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1501            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1502        let data_len = usize::try_from(data_len).unwrap();
1503        let mut buffers = pool
1504            .alloc_tx_buffers(data_len)
1505            .now_or_never()
1506            .expect("failed to alloc")
1507            .unwrap()
1508            // Collect into a vec so we keep the buffers alive, otherwise they
1509            // are immediately returned to the pool.
1510            .collect::<Result<Vec<_>>>()
1511            .expect("buffer error");
1512        assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());
1513
1514        // We have all the buffers, which means allocating more should not
1515        // resolve.
1516        assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());
1517
1518        // If we release a single buffer we should be able to retrieve it again.
1519        assert_matches!(buffers.pop(), Some(_));
1520        let mut more_buffers =
1521            pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
1522        let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
1523        assert_matches!(more_buffers.next(), None);
1524        // The iterator is fused, so None is yielded even after dropping the
1525        // buffer.
1526        drop(buffer);
1527        assert_matches!(more_buffers.next(), None);
1528    }
1529
1530    #[test]
1531    fn alloc_tx_after_free() {
1532        let pool = Pool::new_test_default();
1533        let mut allocated = pool.alloc_tx_all(1);
1534        assert_matches!(pool.alloc_tx_now_or_never(2), None);
1535        {
1536            let _drained = allocated.drain(..2);
1537        }
1538        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1539    }
1540
1541    #[test]
1542    fn blocking_alloc_tx() {
1543        let mut executor = fasync::TestExecutor::new();
1544        let pool = Pool::new_test_default();
1545        let mut allocated = pool.alloc_tx_all(1);
1546        let alloc_fut = pool.alloc_tx_checked(1);
1547        let mut alloc_fut = pin!(alloc_fut);
1548        // The allocation should block.
1549        assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1550        // And the allocation request should be queued.
1551        assert!(!pool.tx_alloc_state.lock().requests.is_empty());
1552        let freed = allocated
1553            .pop()
1554            .expect("no fulfulled allocations")
1555            .iter()
1556            .map(|x| x.get())
1557            .collect::<Chained<_>>();
1558        let same_as_freed =
1559            |descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
1560        // Now the task should be able to continue.
1561        assert_matches!(
1562            &executor.run_until_stalled(&mut alloc_fut),
1563            Poll::Ready(AllocGuard{ descs, pool: _ }) if same_as_freed(descs)
1564        );
1565        // And the queued request should now be removed.
1566        assert!(pool.tx_alloc_state.lock().requests.is_empty());
1567    }
1568
1569    #[test]
1570    fn blocking_alloc_tx_cancel_before_free() {
1571        let mut executor = fasync::TestExecutor::new();
1572        let pool = Pool::new_test_default();
1573        let mut allocated = pool.alloc_tx_all(1);
1574        {
1575            let alloc_fut = pool.alloc_tx_checked(1);
1576            let mut alloc_fut = pin!(alloc_fut);
1577            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1578            assert_matches!(
1579                pool.tx_alloc_state.lock().requests.as_slices(),
1580                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1581            );
1582        }
1583        assert_matches!(
1584            allocated.pop(),
1585            Some(AllocGuard { ref descs, pool: ref p })
1586                if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1587        );
1588        let state = pool.tx_alloc_state.lock();
1589        assert_eq!(state.free_list.len, 1);
1590        assert!(state.requests.is_empty());
1591    }
1592
1593    #[test]
1594    fn blocking_alloc_tx_cancel_after_free() {
1595        let mut executor = fasync::TestExecutor::new();
1596        let pool = Pool::new_test_default();
1597        let mut allocated = pool.alloc_tx_all(1);
1598        {
1599            let alloc_fut = pool.alloc_tx_checked(1);
1600            let mut alloc_fut = pin!(alloc_fut);
1601            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1602            assert_matches!(
1603                pool.tx_alloc_state.lock().requests.as_slices(),
1604                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1605            );
1606            assert_matches!(
1607                allocated.pop(),
1608                Some(AllocGuard { ref descs, pool: ref p })
1609                    if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1610            );
1611        }
1612        let state = pool.tx_alloc_state.lock();
1613        assert_eq!(state.free_list.len, 1);
1614        assert!(state.requests.is_empty());
1615    }
1616
1617    #[test]
1618    fn multiple_blocking_alloc_tx_fulfill_order() {
1619        const TASKS_TOTAL: usize = 3;
1620        let mut executor = fasync::TestExecutor::new();
1621        let pool = Pool::new_test_default();
1622        let mut allocated = pool.alloc_tx_all(1);
1623        let mut alloc_futs = (1..=TASKS_TOTAL)
1624            .rev()
1625            .map(|x| {
1626                let pool = pool.clone();
1627                (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
1628            })
1629            .collect::<Vec<_>>();
1630
1631        for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
1632            assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1633            // assert that the tasks are sorted decreasing on the requested size.
1634            assert_eq!(idx + *req_size, TASKS_TOTAL);
1635        }
1636        {
1637            let state = pool.tx_alloc_state.lock();
1638            // The first pending request was introduced by `alloc_tx_all`.
1639            assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
1640            let mut requests = state.requests.iter();
1641            // It should already be cancelled because the requesting future is
1642            // already dropped.
1643            assert!(requests.next().unwrap().sender.is_canceled());
1644            // The rest of the requests must not be cancelled.
1645            assert!(requests.all(|req| !req.sender.is_canceled()))
1646        }
1647
1648        let mut to_free = Vec::new();
1649        let mut freed = 0;
1650        for free_size in (1..=TASKS_TOTAL).rev() {
1651            let (_req_size, mut task) = alloc_futs.remove(0);
1652            for _ in 1..free_size {
1653                freed += 1;
1654                assert_matches!(
1655                    allocated.pop(),
1656                    Some(AllocGuard { ref descs, pool: ref p })
1657                        if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1658                );
1659                assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
1660            }
1661            freed += 1;
1662            assert_matches!(
1663                allocated.pop(),
1664                Some(AllocGuard { ref descs, pool: ref p })
1665                    if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1666            );
1667            match executor.run_until_stalled(&mut task) {
1668                Poll::Ready(alloc) => {
1669                    assert_eq!(alloc.len(), free_size);
1670                    // Don't return the allocation to the pool now.
1671                    to_free.push(alloc);
1672                }
1673                Poll::Pending => panic!("The request should be fulfilled"),
1674            }
1675            // The rest of requests can not be fulfilled.
1676            for (_req_size, task) in alloc_futs.iter_mut() {
1677                assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1678            }
1679        }
1680        assert!(pool.tx_alloc_state.lock().requests.is_empty());
1681    }
1682
1683    #[test]
1684    fn singleton_tx_layout() {
1685        let pool = Pool::new_test_default();
1686        let buffers = std::iter::from_fn(|| {
1687            let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1688                - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1689                - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1690            pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
1691                assert_eq!(buffer.alloc.descriptors().count(), 1);
1692                let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1693                    * u64::from(buffer.alloc[0].get());
1694                {
1695                    let descriptor = buffer.alloc.descriptor();
1696                    assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1697                    assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
1698                    assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
1699                    assert_eq!(descriptor.data_length(), data_len);
1700                    assert_eq!(descriptor.offset(), offset);
1701                }
1702
1703                {
1704                    let mut slices = buffer.parts();
1705                    let slice = slices.next().expect("should have one slice");
1706                    assert_matches!(slices.next(), None);
1707                    assert_eq!(slice.len(), usize::try_from(data_len).unwrap());
1708                    assert_eq!(
1709                        slice.as_ptr(),
1710                        pool.base.as_ptr().wrapping_add(
1711                            usize::try_from(offset).unwrap()
1712                                + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
1713                        )
1714                    );
1715                }
1716                buffer
1717            })
1718        })
1719        .collect::<Vec<_>>();
1720        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
1721    }
1722
1723    #[test]
1724    fn chained_tx_layout() {
1725        let pool = Pool::new_test_default();
1726        let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
1727            - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1728            - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1729        let buffers = std::iter::from_fn(|| {
1730            pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
1731                assert_eq!(buffer.parts().count(), 4);
1732                for (idx, (descriptor, slice)) in
1733                    buffer.alloc.descriptors().zip(buffer.parts()).enumerate()
1734                {
1735                    let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
1736                    let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
1737                    let tail_length = if chain_length == ChainLength::ZERO {
1738                        DEFAULT_MIN_TX_BUFFER_TAIL
1739                    } else {
1740                        0
1741                    };
1742                    let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1743                        - u32::from(head_length)
1744                        - u32::from(tail_length);
1745                    let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1746                        * u64::from(buffer.alloc[idx].get());
1747                    assert_eq!(descriptor.chain_length().unwrap(), chain_length);
1748                    assert_eq!(descriptor.head_length(), head_length);
1749                    assert_eq!(descriptor.tail_length(), tail_length);
1750                    assert_eq!(descriptor.offset(), offset);
1751                    assert_eq!(descriptor.data_length(), data_len);
1752                    if chain_length != ChainLength::ZERO {
1753                        assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
1754                    }
1755
1756                    assert_eq!(slice.len(), usize::try_from(data_len).unwrap());
1757                    assert_eq!(
1758                        slice.as_ptr(),
1759                        pool.base.as_ptr().wrapping_add(
1760                            usize::try_from(offset).unwrap() + usize::from(head_length),
1761                        )
1762                    );
1763                }
1764                buffer
1765            })
1766        })
1767        .collect::<Vec<_>>();
1768        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
1769    }
1770
1771    #[test]
1772    fn rx_distinct() {
1773        let pool = Pool::new_test_default();
1774        let mut guard = pool.rx_pending.inner.lock();
1775        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1776        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1777        let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
1778        assert_eq!(descs.len(), distinct.len());
1779    }
1780
1781    #[test]
1782    fn alloc_rx_layout() {
1783        let pool = Pool::new_test_default();
1784        let mut guard = pool.rx_pending.inner.lock();
1785        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1786        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1787        for desc in descs.iter() {
1788            let descriptor = pool.descriptors.borrow(desc);
1789            let offset =
1790                u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
1791            assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1792            assert_eq!(descriptor.head_length(), 0);
1793            assert_eq!(descriptor.tail_length(), 0);
1794            assert_eq!(descriptor.offset(), offset);
1795            assert_eq!(
1796                descriptor.data_length(),
1797                u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1798            );
1799        }
1800    }
1801
1802    #[test]
1803    fn buffer_read_at_write_at() {
1804        let pool = Pool::new_test_default();
1805        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1806        let mut buffer =
1807            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1808        // Because we have to accommodate the space for head and tail, there
1809        // would be 2 parts instead of 1.
1810        assert_eq!(buffer.parts().count(), 2);
1811        assert_eq!(buffer.len(), alloc_bytes);
1812        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1813        assert_eq!(buffer.io_mut().write_at(0, &write_buf[..]), write_buf.len());
1814        let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
1815        assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), read_buf.len());
1816        for (idx, byte) in read_buf.iter().enumerate() {
1817            assert_eq!(*byte, write_buf[idx]);
1818        }
1819    }
1820
1821    #[test]
1822    fn buffer_write_at_short() {
1823        let pool = Pool::new_test_default();
1824        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1825        let mut buffer =
1826            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1827        assert_eq!(buffer.parts().count(), 2);
1828        assert_eq!(buffer.len(), alloc_bytes);
1829
1830        let write_buf = vec![WRITE_BYTE; alloc_bytes + 10];
1831
1832        // Test short write (writing more than buffer capacity)
1833        assert_eq!(buffer.io_mut().write_at(0, &write_buf[..]), alloc_bytes);
1834
1835        // Verify short write
1836        let mut read_buf = vec![0; alloc_bytes];
1837        assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), alloc_bytes);
1838        for byte in read_buf.iter() {
1839            assert_eq!(*byte, WRITE_BYTE);
1840        }
1841
1842        // Test write with offset past end
1843        assert_eq!(buffer.io_mut().write_at(alloc_bytes + 1, &write_buf[..]), 0);
1844
1845        // Test write with offset inside buffer but src extending past end
1846        let offset = alloc_bytes / 2;
1847        let expected_write = alloc_bytes - offset;
1848        let write_buf = vec![2; alloc_bytes]; // Different byte to distinguish
1849        assert_eq!(buffer.io_mut().write_at(offset, &write_buf[..]), expected_write);
1850
1851        // Verify the write
1852        let mut read_buf = vec![0; alloc_bytes];
1853        assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), alloc_bytes);
1854        for (idx, byte) in read_buf.iter().enumerate() {
1855            if idx < offset {
1856                assert_eq!(*byte, WRITE_BYTE);
1857            } else {
1858                assert_eq!(*byte, 2);
1859            }
1860        }
1861    }
1862
1863    #[test]
1864    fn buffer_read_at_short() {
1865        let pool = Pool::new_test_default();
1866        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1867        let mut buffer =
1868            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1869        assert_eq!(buffer.parts().count(), 2);
1870        assert_eq!(buffer.len(), alloc_bytes);
1871
1872        let write_buf = vec![WRITE_BYTE; alloc_bytes];
1873        assert_eq!(buffer.io_mut().write_at(0, &write_buf[..]), alloc_bytes);
1874
1875        // Test short read (reading more than buffer capacity)
1876        let mut read_buf = vec![0xff; alloc_bytes + 10];
1877        assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), alloc_bytes);
1878        for (idx, byte) in read_buf.iter().enumerate() {
1879            if idx < alloc_bytes {
1880                assert_eq!(*byte, WRITE_BYTE);
1881            } else {
1882                assert_eq!(*byte, 0xff);
1883            }
1884        }
1885
1886        // Test read with offset past end
1887        assert_eq!(buffer.io().read_at(alloc_bytes + 1, &mut read_buf[..]), 0);
1888
1889        // Test read with offset inside buffer but dst extending past end
1890        let offset = alloc_bytes / 2;
1891        let expected_read = alloc_bytes - offset;
1892        let mut read_buf = vec![0xff; alloc_bytes];
1893        assert_eq!(buffer.io().read_at(offset, &mut read_buf[..]), expected_read);
1894        for (idx, byte) in read_buf.iter().enumerate() {
1895            if idx < expected_read {
1896                assert_eq!(*byte, WRITE_BYTE);
1897            } else {
1898                assert_eq!(*byte, 0xff);
1899            }
1900        }
1901    }
1902
1903    #[test]
1904    fn buffer_read_write_seek() {
1905        let pool = Pool::new_test_default();
1906        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1907        let mut buffer =
1908            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1909        // Because we have to accommodate the space for head and tail, there
1910        // would be 2 parts instead of 1.
1911        assert_eq!(buffer.parts().count(), 2);
1912        assert_eq!(buffer.len(), alloc_bytes);
1913        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1914
1915        let mut io = buffer.io_mut();
1916
1917        assert_eq!(io.write(&write_buf[..]).expect("failed to write into buffer"), write_buf.len());
1918        const SEEK_FROM_END: usize = 64;
1919        const READ_LEN: usize = 12;
1920        assert_eq!(
1921            io.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
1922            u64::try_from(io.len - SEEK_FROM_END).unwrap()
1923        );
1924        let mut read_buf = [0xff; READ_LEN];
1925        assert_eq!(io.read(&mut read_buf[..]).expect("failed to read from buffer"), read_buf.len());
1926        assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
1927    }
1928
1929    #[test_case(32; "single buffer part")]
1930    #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
1931    fn buffer_pad(pad_size: usize) {
1932        let mut pool = Pool::new_test_default();
1933        pool.set_min_tx_buffer_length(pad_size);
1934        for offset in 0..pad_size {
1935            Arc::get_mut(&mut pool)
1936                .expect("there are multiple owners of the underlying VMO")
1937                .fill_sentinel_bytes();
1938            let mut buffer =
1939                pool.alloc_tx_buffer_now_or_never(offset + 1).expect("failed to allocate buffer");
1940            buffer.check_write_and_pad(offset, pad_size);
1941        }
1942    }
1943
1944    #[test]
1945    fn buffer_pad_grow() {
1946        const BUFFER_PARTS: u8 = 3;
1947        let mut pool = Pool::new_test_default();
1948        let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1949            * u32::from(BUFFER_PARTS)
1950            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1951            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1952        pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
1953        for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
1954            Arc::get_mut(&mut pool)
1955                .expect("there are multiple owners of the underlying VMO")
1956                .fill_sentinel_bytes();
1957            let mut alloc =
1958                pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
1959            alloc
1960                .init(usize::try_from(offset).unwrap() + 1)
1961                .expect("head/body/tail sizes are representable with u16/u32/u16");
1962            let mut buffer = Buffer::try_from(alloc).unwrap();
1963            buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
1964        }
1965    }
1966
1967    #[test_case(  0; "writes at the beginning")]
1968    #[test_case( 15; "writes in the first part")]
1969    #[test_case( 75; "writes in the second part")]
1970    #[test_case(135; "writes in the third part")]
1971    #[test_case(195; "writes in the last part")]
1972    fn buffer_used(write_offset: usize) {
1973        let pool = Pool::new_test_default();
1974        let mut buffer =
1975            pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
1976        let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
1977            if i == 0 {
1978                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1979            } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
1980                DEFAULT_BUFFER_LENGTH.get()
1981            } else {
1982                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
1983            }
1984        });
1985        assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
1986        assert_eq!(buffer.io_mut().write_at(write_offset, &[WRITE_BYTE][..]), 1);
1987        // The accumulator is Some if we haven't found the part where the byte
1988        // was written, None if we've already found it.
1989        assert_eq!(
1990            buffer.parts().zip(expected_caps).fold(
1991                Some(write_offset),
1992                |offset, (slice, expected_cap)| {
1993                    assert_eq!(slice.len(), expected_cap);
1994                    match offset {
1995                        Some(offset) => {
1996                            if offset >= expected_cap {
1997                                Some(offset - slice.len())
1998                            } else {
1999                                assert_eq!(slice[offset], WRITE_BYTE);
2000                                None
2001                            }
2002                        }
2003                        None => None,
2004                    }
2005                }
2006            ),
2007            None
2008        );
2009    }
2010
2011    #[test]
2012    fn allocate_under_device_minimum() {
2013        const MIN_TX_DATA: usize = 32;
2014        const ALLOC_SIZE: usize = 16;
2015        const WRITE_BYTE: u8 = 0xff;
2016        const WRITE_SENTINAL_BYTE: u8 = 0xee;
2017        const READ_SENTINAL_BYTE: u8 = 0xdd;
2018        let mut config = DEFAULT_CONFIG;
2019        config.buffer_layout.min_tx_data = MIN_TX_DATA;
2020        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
2021        for mut buffer in Vec::from_iter(std::iter::from_fn({
2022            let pool = pool.clone();
2023            move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
2024        })) {
2025            assert_eq!(
2026                buffer.io_mut().write_at(0, &[WRITE_SENTINAL_BYTE; MIN_TX_DATA]),
2027                MIN_TX_DATA
2028            );
2029        }
2030        let mut allocated =
2031            pool.alloc_tx_buffer_now_or_never(16).expect("failed to allocate buffer");
2032        assert_eq!(allocated.len(), MIN_TX_DATA);
2033        const WRITE_BUF_SIZE: usize = MIN_TX_DATA + 1;
2034        assert_eq!(allocated.io_mut().write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]), MIN_TX_DATA);
2035        assert_eq!(allocated.io_mut().write_at(0, &[WRITE_BYTE; ALLOC_SIZE]), ALLOC_SIZE);
2036        assert_eq!(allocated.len(), MIN_TX_DATA);
2037        const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
2038        let mut read_buf = [READ_SENTINAL_BYTE; READ_BUF_SIZE];
2039        assert_eq!(allocated.io().read_at(0, &mut read_buf[..]), MIN_TX_DATA);
2040        assert_eq!(allocated.io().read_at(0, &mut read_buf[..MIN_TX_DATA]), MIN_TX_DATA);
2041        assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
2042        assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[WRITE_BYTE; ALLOC_SIZE][..]);
2043        assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINAL_BYTE; 1][..]);
2044    }
2045
2046    #[test]
2047    fn invalid_tx_length() {
2048        let mut config = DEFAULT_CONFIG;
2049        config.buffer_layout.length = usize::from(u16::MAX) + 2;
2050        config.buffer_layout.min_tx_head = 0;
2051        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
2052        assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
2053    }
2054
2055    #[test]
2056    fn rx_leases() {
2057        let mut executor = fuchsia_async::TestExecutor::new();
2058        let state = RxLeaseHandlingState::new_with_enabled(true);
2059        let mut watcher = RxLeaseWatcher { state: &state };
2060
2061        {
2062            let mut fut = pin!(watcher.wait_until(0));
2063            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2064        }
2065        {
2066            state.rx_complete();
2067            let mut fut = pin!(watcher.wait_until(1));
2068            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2069        }
2070        {
2071            let mut fut = pin!(watcher.wait_until(0));
2072            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2073        }
2074        {
2075            let mut fut = pin!(watcher.wait_until(3));
2076            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
2077            state.rx_complete();
2078            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
2079            state.rx_complete();
2080            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2081        }
2082        // Dropping the wait future without seeing it complete restores the
2083        // value.
2084        let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
2085        {
2086            let mut fut = pin!(watcher.wait_until(10000));
2087            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
2088        }
2089        let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
2090        assert_eq!(counter_before, counter_after);
2091    }
2092}