1use fuchsia_sync::Mutex;
8use futures::task::AtomicWaker;
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::convert::TryInto as _;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::mem::MaybeUninit;
15use std::num::{NonZeroU16, TryFromIntError};
16use std::ops::{Deref, DerefMut};
17use std::ptr::NonNull;
18use std::sync::Arc;
19use std::sync::atomic::{self, AtomicBool, AtomicU64};
20use std::task::Poll;
21
22use arrayvec::ArrayVec;
23use explicit::ResultExt as _;
24use fidl_fuchsia_hardware_network as netdev;
25use fuchsia_runtime::vmar_root_self;
26use futures::channel::oneshot::{Receiver, Sender, channel};
27
28use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
29use crate::error::{Error, Result};
30use crate::session::{BufferLayout, Config, Pending, Port};
31
32pub(in crate::session) struct Pool {
34 base: NonNull<u8>,
37 bytes: usize,
39 descriptors: Descriptors,
41 tx_alloc_state: Mutex<TxAllocState>,
43 pub(in crate::session) rx_pending: Pending<Rx>,
45 buffer_layout: BufferLayout,
47 rx_leases: RxLeaseHandlingState,
49}
50
51unsafe impl Send for Pool {}
56unsafe impl Sync for Pool {}
57
58struct TxAllocState {
60 requests: VecDeque<TxAllocReq>,
62 free_list: TxFreeList,
63}
64
65struct TxFreeList {
73 head: Option<DescId<Tx>>,
76 len: u16,
78}
79
80impl Pool {
81 pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
86 let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
87 config;
88 let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
89 let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
90 Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;
91
92 let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
94 descriptors.borrow_mut(&mut curr).set_nxt(head);
95 Some(curr)
96 });
97
98 for rx_desc in rx_free.iter_mut() {
99 descriptors.borrow_mut(rx_desc).initialize(
100 ChainLength::ZERO,
101 0,
102 buffer_layout.length.try_into().unwrap(),
103 0,
104 );
105 }
106
107 let tx_alloc_state = TxAllocState {
108 free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
109 requests: VecDeque::new(),
110 };
111
112 let size = buffer_stride.get() * u64::from(num_buffers);
113 let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;
114
115 const VMO_NAME: zx::Name =
116 const_unwrap::const_unwrap_result(zx::Name::new("netdevice:data"));
117 data_vmo.set_name(&VMO_NAME).map_err(|status| Error::Vmo("set name", status))?;
118 let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
123 let base = NonNull::new(
126 vmar_root_self()
127 .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
128 .map_err(|status| Error::Map("data", status))? as *mut u8,
129 )
130 .unwrap();
131
132 Ok((
133 Arc::new(Pool {
134 base,
135 bytes: len,
136 descriptors,
137 tx_alloc_state: Mutex::new(tx_alloc_state),
138 rx_pending: Pending::new(rx_free),
139 buffer_layout,
140 rx_leases: RxLeaseHandlingState::new_with_flags(options),
141 }),
142 descriptors_vmo,
143 data_vmo,
144 ))
145 }
146
147 pub(in crate::session) async fn alloc_tx(
154 self: &Arc<Self>,
155 num_parts: ChainLength,
156 ) -> AllocGuard<Tx> {
157 let receiver = {
158 let mut state = self.tx_alloc_state.lock();
159 match state.free_list.try_alloc(num_parts, &self.descriptors) {
160 Some(allocated) => {
161 return AllocGuard::new(allocated, self.clone());
162 }
163 None => {
164 let (request, receiver) = TxAllocReq::new(num_parts);
165 state.requests.push_back(request);
166 receiver
167 }
168 }
169 };
170 receiver.await.unwrap()
172 }
173
174 pub(in crate::session) fn try_alloc_single_part_tx_buffer(
179 self: &Arc<Self>,
180 num_bytes: usize,
181 ) -> Result<Option<SinglePartTxBuffer>> {
182 let BufferLayout { min_tx_data: _, min_tx_head, min_tx_tail, length: buffer_length } =
183 self.buffer_layout;
184 if num_bytes > buffer_length - usize::from(min_tx_head) - usize::from(min_tx_tail) {
185 return Err(Error::TxLength);
186 }
187 self.tx_alloc_state
188 .lock()
189 .free_list
190 .try_alloc(ChainLength::try_from(1u8).unwrap(), &self.descriptors)
191 .map(|allocated| -> Result<SinglePartTxBuffer> {
192 let mut alloc = AllocGuard::new(allocated, self.clone());
193 alloc.init(num_bytes)?;
194 let buffer = Buffer::from(alloc);
195 Ok(SinglePartTxBuffer::new(buffer, num_bytes).expect("must be single part"))
196 })
197 .transpose()
198 }
199
200 pub(in crate::session) async fn alloc_tx_buffer(
207 self: &Arc<Self>,
208 num_bytes: usize,
209 ) -> Result<Buffer<Tx>> {
210 self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
211 }
212
213 pub(in crate::session) async fn alloc_tx_buffers<'a>(
226 self: &'a Arc<Self>,
227 num_bytes: usize,
228 ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
229 let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
230 self.buffer_layout;
231 let tx_head = usize::from(min_tx_head);
232 let tx_tail = usize::from(min_tx_tail);
233 let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
234 let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
235 let chain_length = ChainLength::try_from(num_parts)?;
236 let first = self.alloc_tx(chain_length).await;
237 let iter = std::iter::once(first)
238 .chain(std::iter::from_fn(move || {
239 let mut state = self.tx_alloc_state.lock();
240 state
241 .free_list
242 .try_alloc(chain_length, &self.descriptors)
243 .map(|allocated| AllocGuard::new(allocated, self.clone()))
244 }))
245 .fuse()
248 .map(move |mut alloc| {
249 alloc.init(num_bytes)?;
250 Ok(alloc.into())
251 });
252 Ok(iter)
253 }
254
255 pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
257 self.rx_pending.extend(descs.into_iter().map(|mut desc| {
258 self.descriptors.borrow_mut(&mut desc).initialize(
259 ChainLength::ZERO,
260 0,
261 self.buffer_layout.length.try_into().unwrap(),
262 0,
263 );
264 desc
265 }));
266 }
267
268 fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
274 let mut to_fulfill = ArrayVec::<
278 (TxAllocReq, AllocGuard<Tx>),
279 { netdev::MAX_DESCRIPTOR_CHAIN as usize },
280 >::new();
281
282 let mut state = self.tx_alloc_state.lock();
283
284 {
285 let mut descs = chain.into_iter();
286 state.free_list.len += u16::try_from(descs.len()).unwrap();
290 let head = descs.next();
291 let old_head = std::mem::replace(&mut state.free_list.head, head);
292 let mut tail = descs.last();
293 let mut tail_ref = self.descriptors.borrow_mut(
294 tail.as_mut().unwrap_or_else(|| state.free_list.head.as_mut().unwrap()),
295 );
296 tail_ref.set_nxt(old_head);
297 }
298
299 while let Some(req) = state.requests.front() {
302 if req.sender.is_canceled() {
307 let _cancelled: Option<TxAllocReq> = state.requests.pop_front();
308 continue;
309 }
310 let size = req.size;
311 match state.free_list.try_alloc(size, &self.descriptors) {
312 Some(descs) => {
313 let req = state.requests.pop_front().unwrap();
315 to_fulfill.push((req, AllocGuard::new(descs, self.clone())));
316
317 if to_fulfill.is_full() {
321 drop(state);
322 for (req, alloc) in to_fulfill.drain(..) {
323 req.fulfill(alloc)
324 }
325 state = self.tx_alloc_state.lock();
326 }
327 }
328 None => break,
329 }
330 }
331
332 drop(state);
334 for (req, alloc) in to_fulfill {
336 req.fulfill(alloc)
337 }
338 }
339
340 pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
344 let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
345 Ok(self.free_tx(chain))
346 }
347
348 pub(in crate::session) fn rx_completed(
354 self: &Arc<Self>,
355 head: DescId<Rx>,
356 ) -> Result<Buffer<Rx>> {
357 let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
358 let alloc = AllocGuard::new(descs, self.clone());
359 Ok(alloc.into())
360 }
361
362 fn get_slice<'a, K: AllocKind>(&self, desc: &'a DescId<K>) -> &'a [u8] {
363 let desc = self.descriptors.borrow(desc);
364 let offset = usize::try_from(desc.offset() + u64::from(desc.head_length()))
365 .expect("usize must hold u64");
366 let len = usize::try_from(desc.data_length()).expect("usize must hold u32");
367 unsafe {
372 let ptr = self.base.as_ptr().add(offset);
373 std::slice::from_raw_parts(ptr, len)
374 }
375 }
376
377 fn get_slice_mut<'a, K: AllocKind>(&self, desc: &'a mut DescId<K>) -> &'a mut [u8] {
378 let desc = self.descriptors.borrow_mut(desc);
379 let offset = usize::try_from(desc.offset() + u64::from(desc.head_length()))
380 .expect("usize must hold u64");
381 let len = usize::try_from(desc.data_length()).expect("usize must hold u32");
382 unsafe {
387 let ptr = self.base.as_ptr().add(offset);
388 std::slice::from_raw_parts_mut(ptr, len)
389 }
390 }
391}
392
393impl Drop for Pool {
394 fn drop(&mut self) {
395 unsafe {
396 vmar_root_self()
397 .unmap(self.base.as_ptr() as usize, self.bytes)
398 .expect("failed to unmap VMO for Pool")
399 }
400 }
401}
402
403impl TxFreeList {
404 fn try_alloc(
408 &mut self,
409 num_parts: ChainLength,
410 descriptors: &Descriptors,
411 ) -> Option<Chained<DescId<Tx>>> {
412 if u16::from(num_parts.get()) > self.len {
413 return None;
414 }
415
416 let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
417 let new_head = self.head.as_ref().and_then(|head| {
418 let nxt = descriptors.borrow(head).nxt();
419 nxt.map(|id| unsafe {
420 DescId::from_raw(id)
423 })
424 });
425 std::mem::replace(&mut self.head, new_head)
426 });
427 let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
428 assert_eq!(allocated.len(), num_parts.into());
429 self.len -= u16::from(num_parts.get());
430 Some(allocated)
431 }
432}
433
434pub struct Buffer<K: AllocKind> {
436 alloc: AllocGuard<K>,
438}
439
440impl<K: AllocKind> Buffer<K> {
441 pub fn len(&self) -> usize {
443 self.parts().map(|s| s.len()).sum()
444 }
445
446 fn parts(&self) -> impl Iterator<Item = &[u8]> + '_ {
448 self.alloc.descs.iter().map(|desc| self.alloc.pool.get_slice(desc))
449 }
450
451 fn parts_mut(&mut self) -> impl Iterator<Item = &mut [u8]> + '_ {
453 self.alloc.descs.iter_mut().map(|desc| self.alloc.pool.get_slice_mut(desc))
454 }
455
456 pub(in crate::session) fn leak(mut self) -> DescId<K> {
458 let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
459 descs.into_iter().next().unwrap()
460 }
461
462 pub fn frame_type(&self) -> Result<netdev::FrameType> {
464 self.alloc.descriptor().frame_type()
465 }
466
467 pub fn port(&self) -> Port {
469 self.alloc.descriptor().port()
470 }
471
472 pub fn as_slice(&self) -> Option<&[u8]> {
474 if self.alloc.len() != 1 {
475 return None;
476 }
477 self.parts().next()
478 }
479
480 pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
482 if self.alloc.len() != 1 {
483 return None;
484 }
485 self.parts_mut().next()
486 }
487
488 pub fn io(&self) -> BufferIORef<'_, K> {
490 let mut len = 0;
491 let parts: Chained<&[u8]> = self.parts().inspect(|s| len += s.len()).collect();
492 BufferIO { parts, pos: 0, len, _marker: std::marker::PhantomData }
493 }
494
495 pub fn io_mut(&mut self) -> BufferIOMut<'_, K> {
497 let mut len = 0;
498 let parts: Chained<&mut [u8]> = self.parts_mut().inspect(|s| len += s.len()).collect();
499 BufferIO { parts, pos: 0, len, _marker: std::marker::PhantomData }
500 }
501}
502
503impl Buffer<Tx> {
507 pub fn set_port(&mut self, port: Port) {
509 self.alloc.descriptor_mut().set_port(port)
510 }
511
512 pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
514 self.alloc.descriptor_mut().set_frame_type(frame_type)
515 }
516
517 pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
519 self.alloc.descriptor_mut().set_tx_flags(flags)
520 }
521
522 pub fn set_generic_csum_offload(&mut self, start: u16, offset: u16) {
524 self.alloc.descriptor_mut().set_generic_csum_offload(start, offset)
525 }
526
527 pub fn shrink_to(&mut self, mut new_len: usize) -> Result<()> {
535 let current_len = self.len();
536
537 if new_len > current_len {
538 return Err(Error::TxLength);
539 }
540
541 let min_tx_data = usize::from(self.alloc.pool.buffer_layout.min_tx_data);
542 new_len = new_len.max(min_tx_data);
543
544 let layouts = self.alloc.calculate_descriptor_layouts(new_len)?;
545
546 for (desc_id, DescriptorLayout { data_length, tail_length, .. }) in
547 self.alloc.descs.iter_mut().zip(layouts)
548 {
549 let mut descriptor = self.alloc.pool.descriptors.borrow_mut(desc_id);
550 descriptor.set_data_length(data_length);
551 descriptor.set_tail_length(tail_length);
552 }
553 Ok(())
554 }
555}
556
557pub enum ChecksumRxOffloading {
559 Offloaded(NonZeroU16),
561}
562
563impl Buffer<Rx> {
567 pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
569 self.alloc.descriptor().rx_flags()
570 }
571
572 pub fn rx_checksum_offloading(&self) -> Option<ChecksumRxOffloading> {
574 self.alloc.descriptor().rx_checksum_offloading()
575 }
576}
577
578impl<K: AllocKind> Debug for Buffer<K> {
579 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
580 let Self { alloc } = self;
581 f.debug_struct("Buffer").field("alloc", alloc).finish()
582 }
583}
584
585pub struct SinglePartTxBuffer(Buffer<Tx>);
588
589impl SinglePartTxBuffer {
590 pub fn new(buffer: Buffer<Tx>, len: usize) -> Option<Self> {
593 if buffer.alloc.len() != 1 {
594 None
595 } else {
596 let cap = usize::try_from(buffer.alloc.descriptor().data_length())
597 .expect("u32 must fit in a usize");
598 if cap < len {
599 return None;
600 }
601 Some(Self(buffer))
602 }
603 }
604
605 pub fn into_inner(self) -> Buffer<Tx> {
607 let Self(buffer) = self;
608 buffer
609 }
610}
611
612impl AsRef<[u8]> for SinglePartTxBuffer {
613 fn as_ref(&self) -> &[u8] {
614 let desc = unsafe { self.0.alloc.descs.storage[0].assume_init_ref() };
617 self.0.alloc.pool.get_slice(desc)
618 }
619}
620
621impl AsMut<[u8]> for SinglePartTxBuffer {
622 fn as_mut(&mut self) -> &mut [u8] {
623 let desc = unsafe { self.0.alloc.descs.storage[0].assume_init_mut() };
626 self.0.alloc.pool.get_slice_mut(desc)
627 }
628}
629
630impl packet::FragmentedBuffer for SinglePartTxBuffer {
631 fn len(&self) -> usize {
632 let desc = self.0.alloc.descriptor();
633 usize::try_from(desc.data_length()).expect("u32 must fit in a usize")
634 }
635
636 fn with_bytes<'a, R, F>(&'a self, f: F) -> R
637 where
638 F: for<'b> FnOnce(packet::FragmentedBytes<'b, 'a>) -> R,
639 {
640 f(packet::FragmentedBytes::new(&mut [self.as_ref()][..]))
641 }
642}
643
644pub struct BufferIO<T, K: AllocKind> {
649 parts: Chained<T>,
650 pos: usize,
651 len: usize,
652 _marker: std::marker::PhantomData<K>,
653}
654
655pub type BufferIORef<'a, K> = BufferIO<&'a [u8], K>;
656pub type BufferIOMut<'a, K> = BufferIO<&'a mut [u8], K>;
657
658impl<T> BufferIO<T, Tx>
659where
660 T: AsMut<[u8]>,
661{
662 pub fn write_at(&mut self, mut offset: usize, src: &[u8]) -> usize {
674 let mut total = 0;
675
676 for slice in self.parts.iter_mut() {
677 let slice = slice.as_mut();
678 if offset < slice.len() {
679 let available = slice.len() - offset;
680 let to_copy = std::cmp::min(src.len() - total, available);
681 slice[offset..offset + to_copy].copy_from_slice(&src[total..total + to_copy]);
682 total += to_copy;
683 offset = 0;
684 if total == src.len() {
685 break;
686 }
687 } else {
688 offset -= slice.len();
689 }
690 }
691 total
692 }
693}
694
695impl<T, K: AllocKind> BufferIO<T, K>
696where
697 T: AsRef<[u8]>,
698{
699 pub fn read_at(&self, mut offset: usize, dst: &mut [u8]) -> usize {
710 let mut total = 0;
711
712 for slice in self.parts.iter() {
713 let slice = slice.as_ref();
714 if offset < slice.len() {
715 let available = slice.len() - offset;
716 let to_copy = std::cmp::min(dst.len() - total, available);
717 dst[total..total + to_copy].copy_from_slice(&slice[offset..offset + to_copy]);
718 total += to_copy;
719 offset = 0;
720 if total == dst.len() {
721 break;
722 }
723 } else {
724 offset -= slice.len();
725 }
726 }
727 total
728 }
729}
730
731struct Chained<T> {
733 storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
734 len: ChainLength,
735}
736
737impl<T> Deref for Chained<T> {
738 type Target = [T];
739
740 fn deref(&self) -> &Self::Target {
741 unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
743 }
744}
745
746impl<T> DerefMut for Chained<T> {
747 fn deref_mut(&mut self) -> &mut Self::Target {
748 unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
750 }
751}
752
753impl<T> Drop for Chained<T> {
754 fn drop(&mut self) {
755 unsafe {
757 std::ptr::drop_in_place(self.deref_mut());
758 }
759 }
760}
761
762impl<T: Debug> Debug for Chained<T> {
763 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
764 f.debug_list().entries(self.iter()).finish()
765 }
766}
767
768impl<T> Chained<T> {
769 #[allow(clippy::uninit_assumed_init)]
770 fn empty() -> Self {
771 Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
778 }
779}
780
781impl<T> FromIterator<T> for Chained<T> {
782 fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
786 let mut result = Self::empty();
787 let mut len = 0u8;
788 for (idx, e) in elements.into_iter().enumerate() {
789 result.storage[idx] = MaybeUninit::new(e);
790 len += 1;
791 }
792 result.len = ChainLength::try_from(len).unwrap();
795 result
796 }
797}
798
799impl<T> IntoIterator for Chained<T> {
800 type Item = T;
801 type IntoIter = ChainedIter<T>;
802
803 fn into_iter(mut self) -> Self::IntoIter {
804 let len = self.len;
805 self.len = ChainLength::ZERO;
806 #[allow(clippy::uninit_assumed_init)]
813 let storage =
814 std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
815 ChainedIter { storage, len, consumed: 0 }
816 }
817}
818
819struct ChainedIter<T> {
820 storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
821 len: ChainLength,
822 consumed: u8,
823}
824
825impl<T> Iterator for ChainedIter<T> {
826 type Item = T;
827
828 fn next(&mut self) -> Option<Self::Item> {
829 if self.consumed < self.len.get() {
830 let value = unsafe {
833 std::mem::replace(
834 &mut self.storage[usize::from(self.consumed)],
835 MaybeUninit::uninit(),
836 )
837 .assume_init()
838 };
839 self.consumed += 1;
840 Some(value)
841 } else {
842 None
843 }
844 }
845
846 fn size_hint(&self) -> (usize, Option<usize>) {
847 let len = usize::from(self.len.get() - self.consumed);
848 (len, Some(len))
849 }
850}
851
852impl<T> ExactSizeIterator for ChainedIter<T> {}
853
854impl<T> Drop for ChainedIter<T> {
855 fn drop(&mut self) {
856 unsafe {
858 std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
859 &mut self.storage[self.consumed.into()..self.len.into()],
860 ));
861 }
862 }
863}
864
865pub(in crate::session) struct AllocGuard<K: AllocKind> {
867 descs: Chained<DescId<K>>,
868 pool: Arc<Pool>,
869}
870
871impl<K: AllocKind> Debug for AllocGuard<K> {
872 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
873 let Self { descs, pool: _ } = self;
874 f.debug_struct("AllocGuard").field("descs", descs).finish()
875 }
876}
877
878impl<K: AllocKind> AllocGuard<K> {
879 fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
880 Self { descs, pool }
881 }
882
883 fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
885 self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
886 }
887
888 fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
890 let descriptors = &self.pool.descriptors;
891 self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
892 }
893
894 fn descriptor(&self) -> DescRef<'_, K> {
896 self.descriptors().next().expect("descriptors must not be empty")
897 }
898
899 fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
901 self.descriptors_mut().next().expect("descriptors must not be empty")
902 }
903}
904
905#[derive(Debug, Clone, Copy, PartialEq, Eq)]
906struct DescriptorLayout {
907 chain_length: ChainLength,
908 head_length: u16,
909 data_length: u32,
910 tail_length: u16,
911}
912
913impl AllocGuard<Tx> {
914 fn calculate_descriptor_layouts(&self, target_len: usize) -> Result<Chained<DescriptorLayout>> {
923 let len = self.len();
924 let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, .. } =
925 self.pool.buffer_layout;
926
927 let mut remaining_target = target_len;
928 (0..len)
929 .rev()
930 .map(|clen| {
931 let chain_length = ChainLength::try_from(clen).unwrap();
932 let head_length = if clen + 1 == len { min_tx_head } else { 0 };
933 let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };
934
935 let available_bytes = u32::try_from(
938 buffer_length - usize::from(head_length) - usize::from(tail_length),
939 )
940 .unwrap();
941
942 let data_length = match u32::try_from(remaining_target) {
943 Ok(target) => {
944 if target < available_bytes {
945 let excess = available_bytes - target;
949 tail_length = u16::try_from(excess)
950 .ok_checked::<TryFromIntError>()
951 .and_then(|tail_adjustment| {
952 tail_length.checked_add(tail_adjustment)
953 })
954 .ok_or(Error::TxLength)?;
955 }
956 target.min(available_bytes)
957 }
958 Err(TryFromIntError { .. }) => available_bytes,
959 };
960
961 let data_length_usize =
962 usize::try_from(data_length).expect("u32 must fit in a usize");
963 remaining_target = remaining_target.saturating_sub(data_length_usize);
964
965 Ok::<_, Error>(DescriptorLayout {
966 chain_length,
967 head_length,
968 data_length,
969 tail_length,
970 })
971 })
972 .collect()
973 }
974
975 fn init(&mut self, requested_bytes: usize) -> Result<()> {
990 let min_tx_data = self.pool.buffer_layout.min_tx_data;
991 let target_len = requested_bytes.max(usize::from(min_tx_data));
992 let layouts = self.calculate_descriptor_layouts(target_len)?;
993
994 let mut remaining_requested = requested_bytes;
995
996 for (desc_id, DescriptorLayout { chain_length, head_length, data_length, tail_length }) in
997 self.descs.iter_mut().zip(layouts)
998 {
999 {
1001 let mut descriptor = self.pool.descriptors.borrow_mut(desc_id);
1002 descriptor.initialize(chain_length, head_length, data_length, tail_length);
1003 }
1004
1005 let data_length_usize = usize::try_from(data_length).expect("u32 must fit in a usize");
1006 let requested_in_part = std::cmp::min(remaining_requested, data_length_usize);
1007 let pad_in_part = data_length_usize - requested_in_part;
1008
1009 if pad_in_part > 0 {
1021 let slice = self.pool.get_slice_mut(desc_id);
1022 slice[requested_in_part..requested_in_part + pad_in_part].fill(0);
1023 }
1024
1025 remaining_requested -= requested_in_part;
1026 }
1027 Ok(())
1028 }
1029}
1030
1031impl<K: AllocKind> Drop for AllocGuard<K> {
1032 fn drop(&mut self) {
1033 if self.is_empty() {
1034 return;
1035 }
1036 K::free(private::Allocation(self));
1037 }
1038}
1039
1040impl<K: AllocKind> Deref for AllocGuard<K> {
1041 type Target = [DescId<K>];
1042
1043 fn deref(&self) -> &Self::Target {
1044 self.descs.deref()
1045 }
1046}
1047
1048impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
1049 fn from(alloc: AllocGuard<K>) -> Self {
1050 Self { alloc }
1051 }
1052}
1053
1054impl<T, K: AllocKind> Read for BufferIO<T, K>
1055where
1056 T: AsRef<[u8]>,
1057{
1058 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
1059 let read_len = self.read_at(self.pos, buf);
1060 self.pos += read_len;
1061 Ok(read_len)
1062 }
1063}
1064
1065impl<T> Write for BufferIO<T, Tx>
1066where
1067 T: AsMut<[u8]>,
1068{
1069 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1070 let write_len = self.write_at(self.pos, buf);
1071 self.pos += write_len;
1072 Ok(write_len)
1073 }
1074
1075 fn flush(&mut self) -> std::io::Result<()> {
1076 Ok(())
1077 }
1078}
1079
1080impl<T, K: AllocKind> Seek for BufferIO<T, K> {
1081 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
1082 let pos = match pos {
1083 SeekFrom::Start(offset) => offset,
1084 SeekFrom::End(offset) => {
1085 let end = i64::try_from(self.len).unwrap();
1086 u64::try_from(end.wrapping_add(offset)).unwrap()
1087 }
1088 SeekFrom::Current(offset) => {
1089 let current = i64::try_from(self.pos).map_err(|TryFromIntError { .. }| {
1090 std::io::Error::from(std::io::ErrorKind::InvalidInput)
1091 })?;
1092 u64::try_from(current.wrapping_add(offset)).unwrap()
1093 }
1094 };
1095 self.pos = usize::try_from(pos).map_err(|TryFromIntError { .. }| {
1096 std::io::Error::from(std::io::ErrorKind::InvalidInput)
1097 })?;
1098 Ok(pos)
1099 }
1100}
1101
1102struct TxAllocReq {
1104 sender: Sender<AllocGuard<Tx>>,
1105 size: ChainLength,
1106}
1107
1108impl TxAllocReq {
1109 fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
1110 let (sender, receiver) = channel();
1111 (TxAllocReq { sender, size }, receiver)
1112 }
1113
1114 fn fulfill(self, guard: AllocGuard<Tx>) {
1122 let Self { sender, size: _ } = self;
1123 match sender.send(guard) {
1124 Ok(()) => (),
1125 Err(guard) => {
1126 drop(guard);
1129 }
1130 }
1131 }
1132}
1133
1134mod private {
1137 use super::{AllocKind, Rx, Tx};
1138 pub trait Sealed: 'static + Sized {}
1139 impl Sealed for Rx {}
1140 impl Sealed for Tx {}
1141
1142 pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
1146}
1147
1148pub trait AllocKind: private::Sealed {
1151 const REFL: AllocKindRefl;
1153
1154 fn free(alloc: private::Allocation<'_, Self>);
1156}
1157
1158pub enum Tx {}
1160pub enum Rx {}
1162
1163pub enum AllocKindRefl {
1165 Tx,
1166 Rx,
1167}
1168
1169impl AllocKindRefl {
1170 pub(in crate::session) fn as_str(&self) -> &'static str {
1171 match self {
1172 AllocKindRefl::Tx => "Tx",
1173 AllocKindRefl::Rx => "Rx",
1174 }
1175 }
1176}
1177
1178impl AllocKind for Tx {
1179 const REFL: AllocKindRefl = AllocKindRefl::Tx;
1180
1181 fn free(alloc: private::Allocation<'_, Self>) {
1182 let private::Allocation(AllocGuard { pool, descs }) = alloc;
1183 pool.free_tx(std::mem::replace(descs, Chained::empty()));
1184 }
1185}
1186
1187impl AllocKind for Rx {
1188 const REFL: AllocKindRefl = AllocKindRefl::Rx;
1189
1190 fn free(alloc: private::Allocation<'_, Self>) {
1191 let private::Allocation(AllocGuard { pool, descs }) = alloc;
1192 pool.free_rx(std::mem::replace(descs, Chained::empty()));
1193 pool.rx_leases.rx_complete();
1194 }
1195}
1196
1197pub(in crate::session) struct RxLeaseHandlingState {
1199 can_watch_rx_leases: AtomicBool,
1200 rx_frame_counter: AtomicU64,
1210 rx_lease_waker: AtomicWaker,
1211}
1212
1213impl RxLeaseHandlingState {
1214 fn new_with_flags(flags: netdev::SessionFlags) -> Self {
1215 Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
1216 }
1217
1218 fn new_with_enabled(enabled: bool) -> Self {
1219 Self {
1220 can_watch_rx_leases: AtomicBool::new(enabled),
1221 rx_frame_counter: AtomicU64::new(0),
1222 rx_lease_waker: AtomicWaker::new(),
1223 }
1224 }
1225
1226 fn rx_complete(&self) {
1229 let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
1230 let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);
1231
1232 if prev == u64::MAX {
1235 rx_lease_waker.wake();
1236 }
1237 }
1238}
1239
1240pub(in crate::session) trait RxLeaseHandlingStateContainer {
1243 fn lease_handling_state(&self) -> &RxLeaseHandlingState;
1244}
1245
1246impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
1247 fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1248 self.borrow()
1249 }
1250}
1251
1252impl RxLeaseHandlingStateContainer for Arc<Pool> {
1253 fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1254 &self.rx_leases
1255 }
1256}
1257
1258pub(in crate::session) struct RxLeaseWatcher<T> {
1260 state: T,
1261}
1262
1263impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
1264 pub(in crate::session) fn new(state: T) -> Self {
1271 assert!(
1272 state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
1273 "can't watch rx leases"
1274 );
1275 Self { state }
1276 }
1277
1278 pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
1287 let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
1296 self.state.lease_handling_state();
1297
1298 let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
1299 let _guard = scopeguard::guard((), |()| {
1302 let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
1303 });
1304
1305 if prev >= hold_until_frame {
1307 return;
1308 }
1309 let threshold = prev.wrapping_sub(hold_until_frame);
1312 futures::future::poll_fn(|cx| {
1313 let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
1314 if v < threshold {
1315 return Poll::Ready(());
1316 }
1317 rx_lease_waker.register(cx.waker());
1318 let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
1319 if v < threshold {
1320 return Poll::Ready(());
1321 }
1322 Poll::Pending
1323 })
1324 .await;
1325 }
1326}
1327
1328#[cfg(test)]
1329mod tests {
1330
1331 use super::*;
1332
1333 use assert_matches::assert_matches;
1334 use fuchsia_async as fasync;
1335 use futures::future::FutureExt;
1336 use test_case::test_case;
1337
1338 use std::collections::HashSet;
1339 use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
1340 use std::pin::pin;
1341 use std::task::{Poll, Waker};
1342
1343 const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
1344 const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
1345 const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
1347 const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1348 const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1349 const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
1350 * netdev::MAX_DESCRIPTOR_CHAIN as usize
1351 - DEFAULT_MIN_TX_BUFFER_HEAD as usize
1352 - DEFAULT_MIN_TX_BUFFER_TAIL as usize;
1353
1354 const SENTINEL_BYTE: u8 = 0xab;
1355 const WRITE_BYTE: u8 = 1;
1356 const PAD_BYTE: u8 = 0;
1357
1358 const DEFAULT_CONFIG: Config = Config {
1359 buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
1360 num_rx_buffers: DEFAULT_RX_BUFFERS,
1361 num_tx_buffers: DEFAULT_TX_BUFFERS,
1362 options: netdev::SessionFlags::empty(),
1363 buffer_layout: BufferLayout {
1364 length: DEFAULT_BUFFER_LENGTH.get(),
1365 min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
1366 min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
1367 min_tx_data: 0,
1368 },
1369 };
1370
1371 impl Pool {
1372 fn new_test_default() -> Arc<Self> {
1373 let (pool, _descriptors, _data) =
1374 Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
1375 pool
1376 }
1377
1378 async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
1379 self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
1380 .await
1381 }
1382
1383 fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
1384 self.alloc_tx_checked(n).now_or_never()
1385 }
1386
1387 fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
1388 std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
1389 }
1390
1391 fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
1392 self.alloc_tx_buffer(num_bytes)
1393 .now_or_never()
1394 .transpose()
1395 .expect("invalid arguments for alloc_tx_buffer")
1396 }
1397
1398 fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
1399 Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
1400 }
1401
1402 fn fill_sentinel_bytes(&mut self) {
1403 unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
1406 }
1407 }
1408
1409 impl Buffer<Tx> {
1410 fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
1414 {
1415 let mut io = self.io_mut();
1416 assert_eq!(io.write_at(offset, &[WRITE_BYTE][..]), 1);
1417 }
1418 assert_eq!(self.len(), pad_size);
1419 const INIT_BYTE: u8 = 42;
1422 let mut read_buf = vec![INIT_BYTE; pad_size];
1423 assert_eq!(self.io().read_at(0, &mut read_buf[..]), read_buf.len());
1424 for (idx, byte) in read_buf.iter().enumerate() {
1425 if idx < offset {
1426 assert_eq!(*byte, SENTINEL_BYTE);
1427 } else if idx == offset {
1428 assert_eq!(*byte, WRITE_BYTE);
1429 } else {
1430 assert_eq!(*byte, PAD_BYTE);
1431 }
1432 }
1433 }
1434 }
1435
1436 impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
1437 where
1438 K: AllocKind,
1439 I: ExactSizeIterator<Item = u16>,
1440 T: Copy + IntoIterator<IntoIter = I>,
1441 {
1442 fn eq(&self, other: &T) -> bool {
1443 let iter = other.into_iter();
1444 if usize::from(self.len) != iter.len() {
1445 return false;
1446 }
1447 self.iter().zip(iter).all(|(l, r)| l.get() == r)
1448 }
1449 }
1450
1451 impl Debug for TxAllocReq {
1452 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1453 let TxAllocReq { sender: _, size } = self;
1454 f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
1455 }
1456 }
1457
1458 #[test]
1459 fn alloc_tx_distinct() {
1460 let pool = Pool::new_test_default();
1461 let allocated = pool.alloc_tx_all(1);
1462 assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
1463 let distinct = allocated
1464 .iter()
1465 .map(|alloc| {
1466 assert_eq!(alloc.descs.len(), 1);
1467 alloc.descs[0].get()
1468 })
1469 .collect::<HashSet<u16>>();
1470 assert_eq!(allocated.len(), distinct.len());
1471 }
1472
1473 #[test]
1474 fn alloc_tx_free_len() {
1475 let pool = Pool::new_test_default();
1476 {
1477 let allocated = pool.alloc_tx_all(2);
1478 assert_eq!(
1479 allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
1480 DEFAULT_TX_BUFFERS.get().into()
1481 );
1482 assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
1483 }
1484 assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
1485 }
1486
1487 #[test]
1488 fn alloc_tx_chain() {
1489 let pool = Pool::new_test_default();
1490 let allocated = pool.alloc_tx_all(3);
1491 assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
1492 assert_matches!(pool.alloc_tx_now_or_never(3), None);
1493 assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1494 }
1495
1496 #[test]
1497 fn alloc_tx_many() {
1498 let pool = Pool::new_test_default();
1499 let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1500 - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1501 - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1502 let data_len = usize::try_from(data_len).unwrap();
1503 let mut buffers = pool
1504 .alloc_tx_buffers(data_len)
1505 .now_or_never()
1506 .expect("failed to alloc")
1507 .unwrap()
1508 .collect::<Result<Vec<_>>>()
1511 .expect("buffer error");
1512 assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());
1513
1514 assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());
1517
1518 assert_matches!(buffers.pop(), Some(_));
1520 let mut more_buffers =
1521 pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
1522 let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
1523 assert_matches!(more_buffers.next(), None);
1524 drop(buffer);
1527 assert_matches!(more_buffers.next(), None);
1528 }
1529
1530 #[test]
1531 fn alloc_tx_after_free() {
1532 let pool = Pool::new_test_default();
1533 let mut allocated = pool.alloc_tx_all(1);
1534 assert_matches!(pool.alloc_tx_now_or_never(2), None);
1535 {
1536 let _drained = allocated.drain(..2);
1537 }
1538 assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1539 }
1540
1541 #[test]
1542 fn blocking_alloc_tx() {
1543 let mut executor = fasync::TestExecutor::new();
1544 let pool = Pool::new_test_default();
1545 let mut allocated = pool.alloc_tx_all(1);
1546 let alloc_fut = pool.alloc_tx_checked(1);
1547 let mut alloc_fut = pin!(alloc_fut);
1548 assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1550 assert!(!pool.tx_alloc_state.lock().requests.is_empty());
1552 let freed = allocated
1553 .pop()
1554 .expect("no fulfulled allocations")
1555 .iter()
1556 .map(|x| x.get())
1557 .collect::<Chained<_>>();
1558 let same_as_freed =
1559 |descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
1560 assert_matches!(
1562 &executor.run_until_stalled(&mut alloc_fut),
1563 Poll::Ready(AllocGuard{ descs, pool: _ }) if same_as_freed(descs)
1564 );
1565 assert!(pool.tx_alloc_state.lock().requests.is_empty());
1567 }
1568
1569 #[test]
1570 fn blocking_alloc_tx_cancel_before_free() {
1571 let mut executor = fasync::TestExecutor::new();
1572 let pool = Pool::new_test_default();
1573 let mut allocated = pool.alloc_tx_all(1);
1574 {
1575 let alloc_fut = pool.alloc_tx_checked(1);
1576 let mut alloc_fut = pin!(alloc_fut);
1577 assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1578 assert_matches!(
1579 pool.tx_alloc_state.lock().requests.as_slices(),
1580 (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1581 );
1582 }
1583 assert_matches!(
1584 allocated.pop(),
1585 Some(AllocGuard { ref descs, pool: ref p })
1586 if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1587 );
1588 let state = pool.tx_alloc_state.lock();
1589 assert_eq!(state.free_list.len, 1);
1590 assert!(state.requests.is_empty());
1591 }
1592
1593 #[test]
1594 fn blocking_alloc_tx_cancel_after_free() {
1595 let mut executor = fasync::TestExecutor::new();
1596 let pool = Pool::new_test_default();
1597 let mut allocated = pool.alloc_tx_all(1);
1598 {
1599 let alloc_fut = pool.alloc_tx_checked(1);
1600 let mut alloc_fut = pin!(alloc_fut);
1601 assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1602 assert_matches!(
1603 pool.tx_alloc_state.lock().requests.as_slices(),
1604 (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1605 );
1606 assert_matches!(
1607 allocated.pop(),
1608 Some(AllocGuard { ref descs, pool: ref p })
1609 if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1610 );
1611 }
1612 let state = pool.tx_alloc_state.lock();
1613 assert_eq!(state.free_list.len, 1);
1614 assert!(state.requests.is_empty());
1615 }
1616
1617 #[test]
1618 fn multiple_blocking_alloc_tx_fulfill_order() {
1619 const TASKS_TOTAL: usize = 3;
1620 let mut executor = fasync::TestExecutor::new();
1621 let pool = Pool::new_test_default();
1622 let mut allocated = pool.alloc_tx_all(1);
1623 let mut alloc_futs = (1..=TASKS_TOTAL)
1624 .rev()
1625 .map(|x| {
1626 let pool = pool.clone();
1627 (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
1628 })
1629 .collect::<Vec<_>>();
1630
1631 for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
1632 assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1633 assert_eq!(idx + *req_size, TASKS_TOTAL);
1635 }
1636 {
1637 let state = pool.tx_alloc_state.lock();
1638 assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
1640 let mut requests = state.requests.iter();
1641 assert!(requests.next().unwrap().sender.is_canceled());
1644 assert!(requests.all(|req| !req.sender.is_canceled()))
1646 }
1647
1648 let mut to_free = Vec::new();
1649 let mut freed = 0;
1650 for free_size in (1..=TASKS_TOTAL).rev() {
1651 let (_req_size, mut task) = alloc_futs.remove(0);
1652 for _ in 1..free_size {
1653 freed += 1;
1654 assert_matches!(
1655 allocated.pop(),
1656 Some(AllocGuard { ref descs, pool: ref p })
1657 if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1658 );
1659 assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
1660 }
1661 freed += 1;
1662 assert_matches!(
1663 allocated.pop(),
1664 Some(AllocGuard { ref descs, pool: ref p })
1665 if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1666 );
1667 match executor.run_until_stalled(&mut task) {
1668 Poll::Ready(alloc) => {
1669 assert_eq!(alloc.len(), free_size);
1670 to_free.push(alloc);
1672 }
1673 Poll::Pending => panic!("The request should be fulfilled"),
1674 }
1675 for (_req_size, task) in alloc_futs.iter_mut() {
1677 assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1678 }
1679 }
1680 assert!(pool.tx_alloc_state.lock().requests.is_empty());
1681 }
1682
1683 #[test]
1684 fn singleton_tx_layout() {
1685 let pool = Pool::new_test_default();
1686 let buffers = std::iter::from_fn(|| {
1687 let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1688 - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1689 - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1690 pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
1691 assert_eq!(buffer.alloc.descriptors().count(), 1);
1692 let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1693 * u64::from(buffer.alloc[0].get());
1694 {
1695 let descriptor = buffer.alloc.descriptor();
1696 assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1697 assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
1698 assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
1699 assert_eq!(descriptor.data_length(), data_len);
1700 assert_eq!(descriptor.offset(), offset);
1701 }
1702
1703 {
1704 let mut slices = buffer.parts();
1705 let slice = slices.next().expect("should have one slice");
1706 assert_matches!(slices.next(), None);
1707 assert_eq!(slice.len(), usize::try_from(data_len).unwrap());
1708 assert_eq!(
1709 slice.as_ptr(),
1710 pool.base.as_ptr().wrapping_add(
1711 usize::try_from(offset).unwrap()
1712 + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
1713 )
1714 );
1715 }
1716 buffer
1717 })
1718 })
1719 .collect::<Vec<_>>();
1720 assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
1721 }
1722
1723 #[test]
1724 fn chained_tx_layout() {
1725 let pool = Pool::new_test_default();
1726 let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
1727 - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1728 - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1729 let buffers = std::iter::from_fn(|| {
1730 pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
1731 assert_eq!(buffer.parts().count(), 4);
1732 for (idx, (descriptor, slice)) in
1733 buffer.alloc.descriptors().zip(buffer.parts()).enumerate()
1734 {
1735 let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
1736 let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
1737 let tail_length = if chain_length == ChainLength::ZERO {
1738 DEFAULT_MIN_TX_BUFFER_TAIL
1739 } else {
1740 0
1741 };
1742 let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1743 - u32::from(head_length)
1744 - u32::from(tail_length);
1745 let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1746 * u64::from(buffer.alloc[idx].get());
1747 assert_eq!(descriptor.chain_length().unwrap(), chain_length);
1748 assert_eq!(descriptor.head_length(), head_length);
1749 assert_eq!(descriptor.tail_length(), tail_length);
1750 assert_eq!(descriptor.offset(), offset);
1751 assert_eq!(descriptor.data_length(), data_len);
1752 if chain_length != ChainLength::ZERO {
1753 assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
1754 }
1755
1756 assert_eq!(slice.len(), usize::try_from(data_len).unwrap());
1757 assert_eq!(
1758 slice.as_ptr(),
1759 pool.base.as_ptr().wrapping_add(
1760 usize::try_from(offset).unwrap() + usize::from(head_length),
1761 )
1762 );
1763 }
1764 buffer
1765 })
1766 })
1767 .collect::<Vec<_>>();
1768 assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
1769 }
1770
1771 #[test]
1772 fn rx_distinct() {
1773 let pool = Pool::new_test_default();
1774 let mut guard = pool.rx_pending.inner.lock();
1775 let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1776 assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1777 let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
1778 assert_eq!(descs.len(), distinct.len());
1779 }
1780
1781 #[test]
1782 fn alloc_rx_layout() {
1783 let pool = Pool::new_test_default();
1784 let mut guard = pool.rx_pending.inner.lock();
1785 let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1786 assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1787 for desc in descs.iter() {
1788 let descriptor = pool.descriptors.borrow(desc);
1789 let offset =
1790 u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
1791 assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1792 assert_eq!(descriptor.head_length(), 0);
1793 assert_eq!(descriptor.tail_length(), 0);
1794 assert_eq!(descriptor.offset(), offset);
1795 assert_eq!(
1796 descriptor.data_length(),
1797 u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1798 );
1799 }
1800 }
1801
1802 #[test]
1803 fn buffer_read_at_write_at() {
1804 let pool = Pool::new_test_default();
1805 let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1806 let mut buffer =
1807 pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1808 assert_eq!(buffer.parts().count(), 2);
1811 assert_eq!(buffer.len(), alloc_bytes);
1812 let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1813 assert_eq!(buffer.io_mut().write_at(0, &write_buf[..]), write_buf.len());
1814 let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
1815 assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), read_buf.len());
1816 for (idx, byte) in read_buf.iter().enumerate() {
1817 assert_eq!(*byte, write_buf[idx]);
1818 }
1819 }
1820
1821 #[test]
1822 fn buffer_write_at_short() {
1823 let pool = Pool::new_test_default();
1824 let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1825 let mut buffer =
1826 pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1827 assert_eq!(buffer.parts().count(), 2);
1828 assert_eq!(buffer.len(), alloc_bytes);
1829
1830 let write_buf = vec![WRITE_BYTE; alloc_bytes + 10];
1831
1832 assert_eq!(buffer.io_mut().write_at(0, &write_buf[..]), alloc_bytes);
1834
1835 let mut read_buf = vec![0; alloc_bytes];
1837 assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), alloc_bytes);
1838 for byte in read_buf.iter() {
1839 assert_eq!(*byte, WRITE_BYTE);
1840 }
1841
1842 assert_eq!(buffer.io_mut().write_at(alloc_bytes + 1, &write_buf[..]), 0);
1844
1845 let offset = alloc_bytes / 2;
1847 let expected_write = alloc_bytes - offset;
1848 let write_buf = vec![2; alloc_bytes]; assert_eq!(buffer.io_mut().write_at(offset, &write_buf[..]), expected_write);
1850
1851 let mut read_buf = vec![0; alloc_bytes];
1853 assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), alloc_bytes);
1854 for (idx, byte) in read_buf.iter().enumerate() {
1855 if idx < offset {
1856 assert_eq!(*byte, WRITE_BYTE);
1857 } else {
1858 assert_eq!(*byte, 2);
1859 }
1860 }
1861 }
1862
1863 #[test]
1864 fn buffer_read_at_short() {
1865 let pool = Pool::new_test_default();
1866 let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1867 let mut buffer =
1868 pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1869 assert_eq!(buffer.parts().count(), 2);
1870 assert_eq!(buffer.len(), alloc_bytes);
1871
1872 let write_buf = vec![WRITE_BYTE; alloc_bytes];
1873 assert_eq!(buffer.io_mut().write_at(0, &write_buf[..]), alloc_bytes);
1874
1875 let mut read_buf = vec![0xff; alloc_bytes + 10];
1877 assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), alloc_bytes);
1878 for (idx, byte) in read_buf.iter().enumerate() {
1879 if idx < alloc_bytes {
1880 assert_eq!(*byte, WRITE_BYTE);
1881 } else {
1882 assert_eq!(*byte, 0xff);
1883 }
1884 }
1885
1886 assert_eq!(buffer.io().read_at(alloc_bytes + 1, &mut read_buf[..]), 0);
1888
1889 let offset = alloc_bytes / 2;
1891 let expected_read = alloc_bytes - offset;
1892 let mut read_buf = vec![0xff; alloc_bytes];
1893 assert_eq!(buffer.io().read_at(offset, &mut read_buf[..]), expected_read);
1894 for (idx, byte) in read_buf.iter().enumerate() {
1895 if idx < expected_read {
1896 assert_eq!(*byte, WRITE_BYTE);
1897 } else {
1898 assert_eq!(*byte, 0xff);
1899 }
1900 }
1901 }
1902
1903 #[test]
1904 fn buffer_read_write_seek() {
1905 let pool = Pool::new_test_default();
1906 let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1907 let mut buffer =
1908 pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1909 assert_eq!(buffer.parts().count(), 2);
1912 assert_eq!(buffer.len(), alloc_bytes);
1913 let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1914
1915 let mut io = buffer.io_mut();
1916
1917 assert_eq!(io.write(&write_buf[..]).expect("failed to write into buffer"), write_buf.len());
1918 const SEEK_FROM_END: usize = 64;
1919 const READ_LEN: usize = 12;
1920 assert_eq!(
1921 io.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
1922 u64::try_from(io.len - SEEK_FROM_END).unwrap()
1923 );
1924 let mut read_buf = [0xff; READ_LEN];
1925 assert_eq!(io.read(&mut read_buf[..]).expect("failed to read from buffer"), read_buf.len());
1926 assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
1927 }
1928
1929 #[test_case(32; "single buffer part")]
1930 #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
1931 fn buffer_pad(pad_size: usize) {
1932 let mut pool = Pool::new_test_default();
1933 pool.set_min_tx_buffer_length(pad_size);
1934 for offset in 0..pad_size {
1935 Arc::get_mut(&mut pool)
1936 .expect("there are multiple owners of the underlying VMO")
1937 .fill_sentinel_bytes();
1938 let mut buffer =
1939 pool.alloc_tx_buffer_now_or_never(offset + 1).expect("failed to allocate buffer");
1940 buffer.check_write_and_pad(offset, pad_size);
1941 }
1942 }
1943
1944 #[test]
1945 fn buffer_pad_grow() {
1946 const BUFFER_PARTS: u8 = 3;
1947 let mut pool = Pool::new_test_default();
1948 let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1949 * u32::from(BUFFER_PARTS)
1950 - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1951 - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1952 pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
1953 for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
1954 Arc::get_mut(&mut pool)
1955 .expect("there are multiple owners of the underlying VMO")
1956 .fill_sentinel_bytes();
1957 let mut alloc =
1958 pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
1959 alloc
1960 .init(usize::try_from(offset).unwrap() + 1)
1961 .expect("head/body/tail sizes are representable with u16/u32/u16");
1962 let mut buffer = Buffer::try_from(alloc).unwrap();
1963 buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
1964 }
1965 }
1966
1967 #[test_case( 0; "writes at the beginning")]
1968 #[test_case( 15; "writes in the first part")]
1969 #[test_case( 75; "writes in the second part")]
1970 #[test_case(135; "writes in the third part")]
1971 #[test_case(195; "writes in the last part")]
1972 fn buffer_used(write_offset: usize) {
1973 let pool = Pool::new_test_default();
1974 let mut buffer =
1975 pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
1976 let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
1977 if i == 0 {
1978 DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1979 } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
1980 DEFAULT_BUFFER_LENGTH.get()
1981 } else {
1982 DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
1983 }
1984 });
1985 assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
1986 assert_eq!(buffer.io_mut().write_at(write_offset, &[WRITE_BYTE][..]), 1);
1987 assert_eq!(
1990 buffer.parts().zip(expected_caps).fold(
1991 Some(write_offset),
1992 |offset, (slice, expected_cap)| {
1993 assert_eq!(slice.len(), expected_cap);
1994 match offset {
1995 Some(offset) => {
1996 if offset >= expected_cap {
1997 Some(offset - slice.len())
1998 } else {
1999 assert_eq!(slice[offset], WRITE_BYTE);
2000 None
2001 }
2002 }
2003 None => None,
2004 }
2005 }
2006 ),
2007 None
2008 );
2009 }
2010
2011 #[test]
2012 fn allocate_under_device_minimum() {
2013 const MIN_TX_DATA: usize = 32;
2014 const ALLOC_SIZE: usize = 16;
2015 const WRITE_BYTE: u8 = 0xff;
2016 const WRITE_SENTINAL_BYTE: u8 = 0xee;
2017 const READ_SENTINAL_BYTE: u8 = 0xdd;
2018 let mut config = DEFAULT_CONFIG;
2019 config.buffer_layout.min_tx_data = MIN_TX_DATA;
2020 let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
2021 for mut buffer in Vec::from_iter(std::iter::from_fn({
2022 let pool = pool.clone();
2023 move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
2024 })) {
2025 assert_eq!(
2026 buffer.io_mut().write_at(0, &[WRITE_SENTINAL_BYTE; MIN_TX_DATA]),
2027 MIN_TX_DATA
2028 );
2029 }
2030 let mut allocated =
2031 pool.alloc_tx_buffer_now_or_never(16).expect("failed to allocate buffer");
2032 assert_eq!(allocated.len(), MIN_TX_DATA);
2033 const WRITE_BUF_SIZE: usize = MIN_TX_DATA + 1;
2034 assert_eq!(allocated.io_mut().write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]), MIN_TX_DATA);
2035 assert_eq!(allocated.io_mut().write_at(0, &[WRITE_BYTE; ALLOC_SIZE]), ALLOC_SIZE);
2036 assert_eq!(allocated.len(), MIN_TX_DATA);
2037 const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
2038 let mut read_buf = [READ_SENTINAL_BYTE; READ_BUF_SIZE];
2039 assert_eq!(allocated.io().read_at(0, &mut read_buf[..]), MIN_TX_DATA);
2040 assert_eq!(allocated.io().read_at(0, &mut read_buf[..MIN_TX_DATA]), MIN_TX_DATA);
2041 assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
2042 assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[WRITE_BYTE; ALLOC_SIZE][..]);
2043 assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINAL_BYTE; 1][..]);
2044 }
2045
2046 #[test]
2047 fn invalid_tx_length() {
2048 let mut config = DEFAULT_CONFIG;
2049 config.buffer_layout.length = usize::from(u16::MAX) + 2;
2050 config.buffer_layout.min_tx_head = 0;
2051 let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
2052 assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
2053 }
2054
2055 #[test]
2056 fn rx_leases() {
2057 let mut executor = fuchsia_async::TestExecutor::new();
2058 let state = RxLeaseHandlingState::new_with_enabled(true);
2059 let mut watcher = RxLeaseWatcher { state: &state };
2060
2061 {
2062 let mut fut = pin!(watcher.wait_until(0));
2063 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2064 }
2065 {
2066 state.rx_complete();
2067 let mut fut = pin!(watcher.wait_until(1));
2068 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2069 }
2070 {
2071 let mut fut = pin!(watcher.wait_until(0));
2072 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2073 }
2074 {
2075 let mut fut = pin!(watcher.wait_until(3));
2076 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
2077 state.rx_complete();
2078 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
2079 state.rx_complete();
2080 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2081 }
2082 let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
2085 {
2086 let mut fut = pin!(watcher.wait_until(10000));
2087 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
2088 }
2089 let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
2090 assert_eq!(counter_before, counter_after);
2091 }
2092}