//! Unbounded channel implemented as a linked list.

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one lap of indices.
const LAP: usize = 32;

// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;

// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;

// Indicates that the channel is disconnected.
const MARK_BIT: usize = 1;

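/// A slot in a block.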
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a message is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

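/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.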
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: zero-initialization is valid here: `next` is an `AtomicPtr`
        // (null), and each `Slot` holds an `AtomicUsize` state of 0 and an
        // uninitialized `MaybeUninit` message.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

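/// A position in a channel.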
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

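/// The token type for the list flavor.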
#[derive(Debug)]
pub struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        ListToken {
            block: ptr::null(),
            offset: 0,
        }
    }
}

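/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.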
pub(crate) struct Channel<T> {
    /// The head of the channel.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}

impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub(crate) fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Returns a receiver handle to the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }

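    /// Attempts to reserve a slot for sending a message.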
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order
            // to make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

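    /// Writes a message into the channel.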
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

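    /// Attempts to reserve a slot for receiving a message.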
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the
            // channel. In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

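    /// Reads a message from the channel.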
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
        // couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }

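    /// Attempts to send a message into the channel.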
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

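    /// Sends a message into the channel.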
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        // Sending into an unbounded channel never blocks, so `start_send` always succeeds.
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }

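    /// Attempts to receive a message without blocking.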
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

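    /// Receives a message from the channel.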
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

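    /// Returns the current number of messages inside the channel.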
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }

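    /// Returns the capacity of the channel.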
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }

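    /// Disconnects senders and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.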
    pub(crate) fn disconnect_senders(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

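    /// Disconnects receivers.
    ///
    /// Returns `true` if this call disconnected the channel.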
    pub(crate) fn disconnect_receivers(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            // If receivers are dropped first, discard all messages to free memory eagerly.
            self.discard_all_messages();
            true
        } else {
            false
        }
    }

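    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped.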
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to tail will be rejected by `MARK_BIT` and aborted unless it's at the
            // block boundary. Wait for those in-flight updates to take effect, otherwise memory
            // can be leaked.
            backoff.snooze();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    (*block).wait_next();
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
        head &= !MARK_BIT;
        self.head.block.store(ptr::null_mut(), Ordering::Release);
        self.head.index.store(head, Ordering::Release);
    }

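    /// Returns `true` if the channel is disconnected.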
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

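    /// Returns `true` if the channel is full.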
    pub(crate) fn is_full(&self) -> bool {
        false
    }
}

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);

/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);

impl<T> SelectHandle for Receiver<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}

impl<T> SelectHandle for Sender<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}