//! Bounded channel based on a preallocated array.

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;
24
/// A slot in a channel.
struct Slot<T> {
    /// The current stamp.
    ///
    /// Compared against the channel's head/tail stamps to decide whether this
    /// slot currently holds a message.
    stamp: AtomicUsize,

    /// The message in this slot; only initialized when the stamp says so.
    msg: UnsafeCell<MaybeUninit<T>>,
}
33
/// The token type for the array flavor.
#[derive(Debug)]
pub struct ArrayToken {
    /// Slot to read from or write to; null means the channel is disconnected.
    slot: *const u8,

    /// Stamp to store into the slot after reading or writing.
    stamp: usize,
}
43
44impl Default for ArrayToken {
45 #[inline]
46 fn default() -> Self {
47 ArrayToken {
48 slot: ptr::null(),
49 stamp: 0,
50 }
51 }
52}
53
54pub(crate) struct Channel<T> {
56 head: CachePadded<AtomicUsize>,
64
65 tail: CachePadded<AtomicUsize>,
73
74 buffer: *mut Slot<T>,
76
77 cap: usize,
79
80 one_lap: usize,
82
83 mark_bit: usize,
85
86 senders: SyncWaker,
88
89 receivers: SyncWaker,
91
92 _marker: PhantomData<T>,
94}
95
96impl<T> Channel<T> {
97 pub(crate) fn with_capacity(cap: usize) -> Self {
99 assert!(cap > 0, "capacity must be positive");
100
101 let mark_bit = (cap + 1).next_power_of_two();
103 let one_lap = mark_bit * 2;
104
105 let head = 0;
107 let tail = 0;
109
110 let buffer = {
113 let mut boxed: Box<[Slot<T>]> = (0..cap)
114 .map(|i| {
115 Slot {
117 stamp: AtomicUsize::new(i),
118 msg: UnsafeCell::new(MaybeUninit::uninit()),
119 }
120 })
121 .collect();
122 let ptr = boxed.as_mut_ptr();
123 mem::forget(boxed);
124 ptr
125 };
126
127 Channel {
128 buffer,
129 cap,
130 one_lap,
131 mark_bit,
132 head: CachePadded::new(AtomicUsize::new(head)),
133 tail: CachePadded::new(AtomicUsize::new(tail)),
134 senders: SyncWaker::new(),
135 receivers: SyncWaker::new(),
136 _marker: PhantomData,
137 }
138 }
139
140 pub(crate) fn receiver(&self) -> Receiver<'_, T> {
142 Receiver(self)
143 }
144
145 pub(crate) fn sender(&self) -> Sender<'_, T> {
147 Sender(self)
148 }
149
150 fn start_send(&self, token: &mut Token) -> bool {
152 let backoff = Backoff::new();
153 let mut tail = self.tail.load(Ordering::Relaxed);
154
155 loop {
156 if tail & self.mark_bit != 0 {
158 token.array.slot = ptr::null();
159 token.array.stamp = 0;
160 return true;
161 }
162
163 let index = tail & (self.mark_bit - 1);
165 let lap = tail & !(self.one_lap - 1);
166
167 let slot = unsafe { &*self.buffer.add(index) };
169 let stamp = slot.stamp.load(Ordering::Acquire);
170
171 if tail == stamp {
173 let new_tail = if index + 1 < self.cap {
174 tail + 1
177 } else {
178 lap.wrapping_add(self.one_lap)
181 };
182
183 match self.tail.compare_exchange_weak(
185 tail,
186 new_tail,
187 Ordering::SeqCst,
188 Ordering::Relaxed,
189 ) {
190 Ok(_) => {
191 token.array.slot = slot as *const Slot<T> as *const u8;
193 token.array.stamp = tail + 1;
194 return true;
195 }
196 Err(t) => {
197 tail = t;
198 backoff.spin();
199 }
200 }
201 } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
202 atomic::fence(Ordering::SeqCst);
203 let head = self.head.load(Ordering::Relaxed);
204
205 if head.wrapping_add(self.one_lap) == tail {
207 return false;
209 }
210
211 backoff.spin();
212 tail = self.tail.load(Ordering::Relaxed);
213 } else {
214 backoff.snooze();
216 tail = self.tail.load(Ordering::Relaxed);
217 }
218 }
219 }
220
221 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
223 if token.array.slot.is_null() {
225 return Err(msg);
226 }
227
228 let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
229
230 slot.msg.get().write(MaybeUninit::new(msg));
232 slot.stamp.store(token.array.stamp, Ordering::Release);
233
234 self.receivers.notify();
236 Ok(())
237 }
238
239 fn start_recv(&self, token: &mut Token) -> bool {
241 let backoff = Backoff::new();
242 let mut head = self.head.load(Ordering::Relaxed);
243
244 loop {
245 let index = head & (self.mark_bit - 1);
247 let lap = head & !(self.one_lap - 1);
248
249 let slot = unsafe { &*self.buffer.add(index) };
251 let stamp = slot.stamp.load(Ordering::Acquire);
252
253 if head + 1 == stamp {
255 let new = if index + 1 < self.cap {
256 head + 1
259 } else {
260 lap.wrapping_add(self.one_lap)
263 };
264
265 match self.head.compare_exchange_weak(
267 head,
268 new,
269 Ordering::SeqCst,
270 Ordering::Relaxed,
271 ) {
272 Ok(_) => {
273 token.array.slot = slot as *const Slot<T> as *const u8;
275 token.array.stamp = head.wrapping_add(self.one_lap);
276 return true;
277 }
278 Err(h) => {
279 head = h;
280 backoff.spin();
281 }
282 }
283 } else if stamp == head {
284 atomic::fence(Ordering::SeqCst);
285 let tail = self.tail.load(Ordering::Relaxed);
286
287 if (tail & !self.mark_bit) == head {
289 if tail & self.mark_bit != 0 {
291 token.array.slot = ptr::null();
293 token.array.stamp = 0;
294 return true;
295 } else {
296 return false;
298 }
299 }
300
301 backoff.spin();
302 head = self.head.load(Ordering::Relaxed);
303 } else {
304 backoff.snooze();
306 head = self.head.load(Ordering::Relaxed);
307 }
308 }
309 }
310
311 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
313 if token.array.slot.is_null() {
314 return Err(());
316 }
317
318 let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
319
320 let msg = slot.msg.get().read().assume_init();
322 slot.stamp.store(token.array.stamp, Ordering::Release);
323
324 self.senders.notify();
326 Ok(msg)
327 }
328
329 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
331 let token = &mut Token::default();
332 if self.start_send(token) {
333 unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
334 } else {
335 Err(TrySendError::Full(msg))
336 }
337 }
338
339 pub(crate) fn send(
341 &self,
342 msg: T,
343 deadline: Option<Instant>,
344 ) -> Result<(), SendTimeoutError<T>> {
345 let token = &mut Token::default();
346 loop {
347 let backoff = Backoff::new();
349 loop {
350 if self.start_send(token) {
351 let res = unsafe { self.write(token, msg) };
352 return res.map_err(SendTimeoutError::Disconnected);
353 }
354
355 if backoff.is_completed() {
356 break;
357 } else {
358 backoff.snooze();
359 }
360 }
361
362 if let Some(d) = deadline {
363 if Instant::now() >= d {
364 return Err(SendTimeoutError::Timeout(msg));
365 }
366 }
367
368 Context::with(|cx| {
369 let oper = Operation::hook(token);
371 self.senders.register(oper, cx);
372
373 if !self.is_full() || self.is_disconnected() {
375 let _ = cx.try_select(Selected::Aborted);
376 }
377
378 let sel = cx.wait_until(deadline);
380
381 match sel {
382 Selected::Waiting => unreachable!(),
383 Selected::Aborted | Selected::Disconnected => {
384 self.senders.unregister(oper).unwrap();
385 }
386 Selected::Operation(_) => {}
387 }
388 });
389 }
390 }
391
392 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
394 let token = &mut Token::default();
395
396 if self.start_recv(token) {
397 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
398 } else {
399 Err(TryRecvError::Empty)
400 }
401 }
402
403 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
405 let token = &mut Token::default();
406 loop {
407 let backoff = Backoff::new();
409 loop {
410 if self.start_recv(token) {
411 let res = unsafe { self.read(token) };
412 return res.map_err(|_| RecvTimeoutError::Disconnected);
413 }
414
415 if backoff.is_completed() {
416 break;
417 } else {
418 backoff.snooze();
419 }
420 }
421
422 if let Some(d) = deadline {
423 if Instant::now() >= d {
424 return Err(RecvTimeoutError::Timeout);
425 }
426 }
427
428 Context::with(|cx| {
429 let oper = Operation::hook(token);
431 self.receivers.register(oper, cx);
432
433 if !self.is_empty() || self.is_disconnected() {
435 let _ = cx.try_select(Selected::Aborted);
436 }
437
438 let sel = cx.wait_until(deadline);
440
441 match sel {
442 Selected::Waiting => unreachable!(),
443 Selected::Aborted | Selected::Disconnected => {
444 self.receivers.unregister(oper).unwrap();
445 }
448 Selected::Operation(_) => {}
449 }
450 });
451 }
452 }
453
454 pub(crate) fn len(&self) -> usize {
456 loop {
457 let tail = self.tail.load(Ordering::SeqCst);
459 let head = self.head.load(Ordering::SeqCst);
460
461 if self.tail.load(Ordering::SeqCst) == tail {
463 let hix = head & (self.mark_bit - 1);
464 let tix = tail & (self.mark_bit - 1);
465
466 return if hix < tix {
467 tix - hix
468 } else if hix > tix {
469 self.cap - hix + tix
470 } else if (tail & !self.mark_bit) == head {
471 0
472 } else {
473 self.cap
474 };
475 }
476 }
477 }
478
479 #[allow(clippy::unnecessary_wraps)] pub(crate) fn capacity(&self) -> Option<usize> {
482 Some(self.cap)
483 }
484
485 pub(crate) fn disconnect(&self) -> bool {
489 let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
490
491 if tail & self.mark_bit == 0 {
492 self.senders.disconnect();
493 self.receivers.disconnect();
494 true
495 } else {
496 false
497 }
498 }
499
500 pub(crate) fn is_disconnected(&self) -> bool {
502 self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
503 }
504
505 pub(crate) fn is_empty(&self) -> bool {
507 let head = self.head.load(Ordering::SeqCst);
508 let tail = self.tail.load(Ordering::SeqCst);
509
510 (tail & !self.mark_bit) == head
515 }
516
517 pub(crate) fn is_full(&self) -> bool {
519 let tail = self.tail.load(Ordering::SeqCst);
520 let head = self.head.load(Ordering::SeqCst);
521
522 head.wrapping_add(self.one_lap) == tail & !self.mark_bit
527 }
528}
529
530impl<T> Drop for Channel<T> {
531 fn drop(&mut self) {
532 let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
534
535 for i in 0..self.len() {
537 let index = if hix + i < self.cap {
539 hix + i
540 } else {
541 hix + i - self.cap
542 };
543
544 unsafe {
545 let p = {
546 let slot = &mut *self.buffer.add(index);
547 let msg = &mut *slot.msg.get();
548 msg.as_mut_ptr()
549 };
550 p.drop_in_place();
551 }
552 }
553
554 unsafe {
556 let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>];
560 Box::from_raw(ptr);
561 }
562 }
563}
564
565pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
567
568pub(crate) struct Sender<'a, T>(&'a Channel<T>);
570
571impl<T> SelectHandle for Receiver<'_, T> {
572 fn try_select(&self, token: &mut Token) -> bool {
573 self.0.start_recv(token)
574 }
575
576 fn deadline(&self) -> Option<Instant> {
577 None
578 }
579
580 fn register(&self, oper: Operation, cx: &Context) -> bool {
581 self.0.receivers.register(oper, cx);
582 self.is_ready()
583 }
584
585 fn unregister(&self, oper: Operation) {
586 self.0.receivers.unregister(oper);
587 }
588
589 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
590 self.try_select(token)
591 }
592
593 fn is_ready(&self) -> bool {
594 !self.0.is_empty() || self.0.is_disconnected()
595 }
596
597 fn watch(&self, oper: Operation, cx: &Context) -> bool {
598 self.0.receivers.watch(oper, cx);
599 self.is_ready()
600 }
601
602 fn unwatch(&self, oper: Operation) {
603 self.0.receivers.unwatch(oper);
604 }
605}
606
607impl<T> SelectHandle for Sender<'_, T> {
608 fn try_select(&self, token: &mut Token) -> bool {
609 self.0.start_send(token)
610 }
611
612 fn deadline(&self) -> Option<Instant> {
613 None
614 }
615
616 fn register(&self, oper: Operation, cx: &Context) -> bool {
617 self.0.senders.register(oper, cx);
618 self.is_ready()
619 }
620
621 fn unregister(&self, oper: Operation) {
622 self.0.senders.unregister(oper);
623 }
624
625 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
626 self.try_select(token)
627 }
628
629 fn is_ready(&self) -> bool {
630 !self.0.is_full() || self.0.is_disconnected()
631 }
632
633 fn watch(&self, oper: Operation, cx: &Context) -> bool {
634 self.0.senders.watch(oper, cx);
635 self.is_ready()
636 }
637
638 fn unwatch(&self, oper: Operation) {
639 self.0.senders.unwatch(oper);
640 }
641}