crossbeam_deque/deque.rs

1use std::cell::{Cell, UnsafeCell};
2use std::cmp;
3use std::fmt;
4use std::iter::FromIterator;
5use std::marker::PhantomData;
6use std::mem::{self, MaybeUninit};
7use std::ptr;
8use std::sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
9use std::sync::Arc;
10
11use crate::epoch::{self, Atomic, Owned};
12use crate::utils::{Backoff, CachePadded};
13
14// Minimum buffer capacity.
15const MIN_CAP: usize = 64;
16// Maximum number of tasks that can be stolen in `steal_batch()` and `steal_batch_and_pop()`.
17const MAX_BATCH: usize = 32;
18// If a buffer of at least this size is retired, thread-local garbage is flushed so that it gets
19// deallocated as soon as possible.
20const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
21
22/// A buffer that holds tasks in a worker queue.
23///
24/// This is just a pointer to the buffer and its capacity; dropping an instance of this struct will
25/// *not* deallocate the buffer.
26struct Buffer<T> {
27    /// Pointer to the allocated memory.
28    ptr: *mut T,
29
30    /// Capacity of the buffer. Always a power of two.
31    cap: usize,
32}
33
34unsafe impl<T> Send for Buffer<T> {}
35
36impl<T> Buffer<T> {
37    /// Allocates a new buffer with the specified capacity.
38    fn alloc(cap: usize) -> Buffer<T> {
39        debug_assert_eq!(cap, cap.next_power_of_two());
40
41        let mut v = Vec::with_capacity(cap);
42        let ptr = v.as_mut_ptr();
43        mem::forget(v);
44
45        Buffer { ptr, cap }
46    }
47
48    /// Deallocates the buffer.
49    unsafe fn dealloc(self) {
50        drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
51    }
52
53    /// Returns a pointer to the task at the specified `index`.
54    unsafe fn at(&self, index: isize) -> *mut T {
55        // `self.cap` is always a power of two.
56        self.ptr.offset(index & (self.cap - 1) as isize)
57    }
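
    // Note: `self.cap` is a power of two, so `index & (cap - 1)` keeps only the low bits of the
    // index, wrapping logical indices around the circular buffer (e.g. with `cap == 64`, logical
    // index 70 maps to slot 70 & 63 == 6).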
58
59    /// Writes `task` into the specified `index`.
60    ///
61    /// This method might be concurrently called with another `read` at the same index, which is
62    /// technically speaking a data race and therefore UB. We should use an atomic store here, but
63    /// that would be more expensive and difficult to implement generically for all types `T`.
64    /// Hence, as a hack, we use a volatile write instead.
65    unsafe fn write(&self, index: isize, task: T) {
66        ptr::write_volatile(self.at(index), task)
67    }
68
69    /// Reads a task from the specified `index`.
70    ///
71    /// This method might be concurrently called with another `write` at the same index, which is
72    /// technically speaking a data race and therefore UB. We should use an atomic load here, but
73    /// that would be more expensive and difficult to implement generically for all types `T`.
74    /// Hence, as a hack, we use a volatile read instead.
75    unsafe fn read(&self, index: isize) -> T {
76        ptr::read_volatile(self.at(index))
77    }
78}
79
80impl<T> Clone for Buffer<T> {
81    fn clone(&self) -> Buffer<T> {
82        Buffer {
83            ptr: self.ptr,
84            cap: self.cap,
85        }
86    }
87}
88
89impl<T> Copy for Buffer<T> {}
90
91/// Internal queue data shared between the worker and stealers.
92///
93/// The implementation is based on the following work:
94///
95/// 1. [Chase and Lev. Dynamic circular work-stealing deque. SPAA 2005.][chase-lev]
96/// 2. [Le, Pop, Cohen, and Nardelli. Correct and efficient work-stealing for weak memory models.
97///    PPoPP 2013.][weak-mem]
98/// 3. [Norris and Demsky. CDSchecker: checking concurrent data structures written with C/C++
99///    atomics. OOPSLA 2013.][checker]
100///
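/// In short: the single worker pushes tasks at the `back` end and pops from either end depending
/// on the flavor, stealers remove tasks from the `front` end with a compare-and-exchange on the
/// front index, and the buffer is resized by swapping in a new one through the epoch-protected
/// `buffer` pointer.
///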
101/// [chase-lev]: https://dl.acm.org/citation.cfm?id=1073974
102/// [weak-mem]: https://dl.acm.org/citation.cfm?id=2442524
103/// [checker]: https://dl.acm.org/citation.cfm?id=2509514
104struct Inner<T> {
105    /// The front index.
106    front: AtomicIsize,
107
108    /// The back index.
109    back: AtomicIsize,
110
111    /// The underlying buffer.
112    buffer: CachePadded<Atomic<Buffer<T>>>,
113}
114
115impl<T> Drop for Inner<T> {
116    fn drop(&mut self) {
117        // Load the back index, front index, and buffer.
118        let b = self.back.load(Ordering::Relaxed);
119        let f = self.front.load(Ordering::Relaxed);
120
121        unsafe {
122            let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
123
124            // Go through the buffer from front to back and drop all tasks in the queue.
125            let mut i = f;
126            while i != b {
127                buffer.deref().at(i).drop_in_place();
128                i = i.wrapping_add(1);
129            }
130
131            // Free the memory allocated by the buffer.
132            buffer.into_owned().into_box().dealloc();
133        }
134    }
135}
136
137/// Worker queue flavor: FIFO or LIFO.
138#[derive(Clone, Copy, Debug, Eq, PartialEq)]
139enum Flavor {
140    /// The first-in first-out flavor.
141    Fifo,
142
143    /// The last-in first-out flavor.
144    Lifo,
145}
146
147/// A worker queue.
148///
149/// This is a FIFO or LIFO queue that is owned by a single thread, but other threads may steal
150/// tasks from it. Task schedulers typically create a single worker queue per thread.
151///
152/// # Examples
153///
154/// A FIFO worker:
155///
156/// ```
157/// use crossbeam_deque::{Steal, Worker};
158///
159/// let w = Worker::new_fifo();
160/// let s = w.stealer();
161///
162/// w.push(1);
163/// w.push(2);
164/// w.push(3);
165///
166/// assert_eq!(s.steal(), Steal::Success(1));
167/// assert_eq!(w.pop(), Some(2));
168/// assert_eq!(w.pop(), Some(3));
169/// ```
170///
171/// A LIFO worker:
172///
173/// ```
174/// use crossbeam_deque::{Steal, Worker};
175///
176/// let w = Worker::new_lifo();
177/// let s = w.stealer();
178///
179/// w.push(1);
180/// w.push(2);
181/// w.push(3);
182///
183/// assert_eq!(s.steal(), Steal::Success(1));
184/// assert_eq!(w.pop(), Some(3));
185/// assert_eq!(w.pop(), Some(2));
186/// ```
187pub struct Worker<T> {
188    /// A reference to the inner representation of the queue.
189    inner: Arc<CachePadded<Inner<T>>>,
190
191    /// A copy of `inner.buffer` for quick access.
192    buffer: Cell<Buffer<T>>,
193
194    /// The flavor of the queue.
195    flavor: Flavor,
196
197    /// Indicates that the worker cannot be shared among threads.
198    _marker: PhantomData<*mut ()>, // !Send + !Sync
199}
200
201unsafe impl<T: Send> Send for Worker<T> {}
202
203impl<T> Worker<T> {
204    /// Creates a FIFO worker queue.
205    ///
206    /// Tasks are pushed and popped from opposite ends.
207    ///
208    /// # Examples
209    ///
210    /// ```
211    /// use crossbeam_deque::Worker;
212    ///
213    /// let w = Worker::<i32>::new_fifo();
214    /// ```
215    pub fn new_fifo() -> Worker<T> {
216        let buffer = Buffer::alloc(MIN_CAP);
217
218        let inner = Arc::new(CachePadded::new(Inner {
219            front: AtomicIsize::new(0),
220            back: AtomicIsize::new(0),
221            buffer: CachePadded::new(Atomic::new(buffer)),
222        }));
223
224        Worker {
225            inner,
226            buffer: Cell::new(buffer),
227            flavor: Flavor::Fifo,
228            _marker: PhantomData,
229        }
230    }
231
232    /// Creates a LIFO worker queue.
233    ///
234    /// Tasks are pushed and popped from the same end.
235    ///
236    /// # Examples
237    ///
238    /// ```
239    /// use crossbeam_deque::Worker;
240    ///
241    /// let w = Worker::<i32>::new_lifo();
242    /// ```
243    pub fn new_lifo() -> Worker<T> {
244        let buffer = Buffer::alloc(MIN_CAP);
245
246        let inner = Arc::new(CachePadded::new(Inner {
247            front: AtomicIsize::new(0),
248            back: AtomicIsize::new(0),
249            buffer: CachePadded::new(Atomic::new(buffer)),
250        }));
251
252        Worker {
253            inner,
254            buffer: Cell::new(buffer),
255            flavor: Flavor::Lifo,
256            _marker: PhantomData,
257        }
258    }
259
260    /// Creates a stealer for this queue.
261    ///
262    /// The returned stealer can be shared among threads and cloned.
263    ///
264    /// # Examples
265    ///
266    /// ```
267    /// use crossbeam_deque::Worker;
268    ///
269    /// let w = Worker::<i32>::new_lifo();
270    /// let s = w.stealer();
271    /// ```
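    ///
    /// For instance, a stealer can be sent to another thread (a minimal sketch):
    ///
    /// ```
    /// use crossbeam_deque::{Steal, Worker};
    /// use std::thread;
    ///
    /// let w = Worker::new_fifo();
    /// w.push(1);
    /// let s = w.stealer();
    ///
    /// // Steal from a different thread.
    /// let handle = thread::spawn(move || s.steal());
    /// assert_eq!(handle.join().unwrap(), Steal::Success(1));
    /// ```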
272    pub fn stealer(&self) -> Stealer<T> {
273        Stealer {
274            inner: self.inner.clone(),
275            flavor: self.flavor,
276        }
277    }
278
279    /// Resizes the internal buffer to the new capacity `new_cap`.
280    #[cold]
281    unsafe fn resize(&self, new_cap: usize) {
282        // Load the back index, front index, and buffer.
283        let b = self.inner.back.load(Ordering::Relaxed);
284        let f = self.inner.front.load(Ordering::Relaxed);
285        let buffer = self.buffer.get();
286
287        // Allocate a new buffer and copy data from the old buffer to the new one.
288        let new = Buffer::alloc(new_cap);
289        let mut i = f;
290        while i != b {
291            ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
292            i = i.wrapping_add(1);
293        }
294
295        let guard = &epoch::pin();
296
297        // Replace the old buffer with the new one.
298        self.buffer.replace(new);
299        let old =
300            self.inner
301                .buffer
302                .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
303
304        // Destroy the old buffer later, once no stealer can still be reading from it.
305        guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
306
307        // If the buffer is very large, then flush the thread-local garbage in order to deallocate
308        // it as soon as possible.
309        if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
310            guard.flush();
311        }
312    }
313
314    /// Reserves enough capacity so that `reserve_cap` tasks can be pushed without growing the
315    /// buffer.
316    fn reserve(&self, reserve_cap: usize) {
317        if reserve_cap > 0 {
318            // Compute the current length.
319            let b = self.inner.back.load(Ordering::Relaxed);
320            let f = self.inner.front.load(Ordering::SeqCst);
321            let len = b.wrapping_sub(f) as usize;
322
323            // The current capacity.
324            let cap = self.buffer.get().cap;
325
326            // Is there enough capacity to push `reserve_cap` tasks?
327            if cap - len < reserve_cap {
328                // Keep doubling the capacity as much as is needed.
329                let mut new_cap = cap * 2;
330                while new_cap - len < reserve_cap {
331                    new_cap *= 2;
332                }
333
334                // Resize the buffer.
335                unsafe {
336                    self.resize(new_cap);
337                }
338            }
339        }
340    }
341
342    /// Returns `true` if the queue is empty.
343    ///
344    /// ```
345    /// use crossbeam_deque::Worker;
346    ///
347    /// let w = Worker::new_lifo();
348    ///
349    /// assert!(w.is_empty());
350    /// w.push(1);
351    /// assert!(!w.is_empty());
352    /// ```
353    pub fn is_empty(&self) -> bool {
354        let b = self.inner.back.load(Ordering::Relaxed);
355        let f = self.inner.front.load(Ordering::SeqCst);
356        b.wrapping_sub(f) <= 0
357    }
358
359    /// Returns the number of tasks in the deque.
360    ///
361    /// ```
362    /// use crossbeam_deque::Worker;
363    ///
364    /// let w = Worker::new_lifo();
365    ///
366    /// assert_eq!(w.len(), 0);
367    /// w.push(1);
368    /// assert_eq!(w.len(), 1);
369    /// w.push(1);
370    /// assert_eq!(w.len(), 2);
371    /// ```
372    pub fn len(&self) -> usize {
373        let b = self.inner.back.load(Ordering::Relaxed);
374        let f = self.inner.front.load(Ordering::SeqCst);
375        b.wrapping_sub(f).max(0) as usize
376    }
377
378    /// Pushes a task into the queue.
379    ///
380    /// # Examples
381    ///
382    /// ```
383    /// use crossbeam_deque::Worker;
384    ///
385    /// let w = Worker::new_lifo();
386    /// w.push(1);
387    /// w.push(2);
388    /// ```
389    pub fn push(&self, task: T) {
390        // Load the back index, front index, and buffer.
391        let b = self.inner.back.load(Ordering::Relaxed);
392        let f = self.inner.front.load(Ordering::Acquire);
393        let mut buffer = self.buffer.get();
394
395        // Calculate the length of the queue.
396        let len = b.wrapping_sub(f);
397
398        // Is the queue full?
399        if len >= buffer.cap as isize {
400            // Yes. Grow the underlying buffer.
401            unsafe {
402                self.resize(2 * buffer.cap);
403            }
404            buffer = self.buffer.get();
405        }
406
407        // Write `task` into the slot.
408        unsafe {
409            buffer.write(b, task);
410        }
411
412        atomic::fence(Ordering::Release);
413
414        // Increment the back index.
415        //
416        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
417        // races because it doesn't understand fences.
418        self.inner.back.store(b.wrapping_add(1), Ordering::Release);
419    }
420
421    /// Pops a task from the queue.
422    ///
423    /// # Examples
424    ///
425    /// ```
426    /// use crossbeam_deque::Worker;
427    ///
428    /// let w = Worker::new_fifo();
429    /// w.push(1);
430    /// w.push(2);
431    ///
432    /// assert_eq!(w.pop(), Some(1));
433    /// assert_eq!(w.pop(), Some(2));
434    /// assert_eq!(w.pop(), None);
435    /// ```
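    ///
    /// A LIFO worker pops the most recently pushed task first (a minimal sketch):
    ///
    /// ```
    /// use crossbeam_deque::Worker;
    ///
    /// let w = Worker::new_lifo();
    /// w.push(1);
    /// w.push(2);
    ///
    /// assert_eq!(w.pop(), Some(2));
    /// assert_eq!(w.pop(), Some(1));
    /// assert_eq!(w.pop(), None);
    /// ```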
436    pub fn pop(&self) -> Option<T> {
437        // Load the back and front index.
438        let b = self.inner.back.load(Ordering::Relaxed);
439        let f = self.inner.front.load(Ordering::Relaxed);
440
441        // Calculate the length of the queue.
442        let len = b.wrapping_sub(f);
443
444        // Is the queue empty?
445        if len <= 0 {
446            return None;
447        }
448
449        match self.flavor {
450            // Pop from the front of the queue.
451            Flavor::Fifo => {
452                // Try incrementing the front index to pop the task.
453                let f = self.inner.front.fetch_add(1, Ordering::SeqCst);
454                let new_f = f.wrapping_add(1);
455
456                if b.wrapping_sub(new_f) < 0 {
457                    self.inner.front.store(f, Ordering::Relaxed);
458                    return None;
459                }
460
461                unsafe {
462                    // Read the popped task.
463                    let buffer = self.buffer.get();
464                    let task = buffer.read(f);
465
466                    // Shrink the buffer if `len - 1` is less than one fourth of the capacity.
467                    if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
468                        self.resize(buffer.cap / 2);
469                    }
470
471                    Some(task)
472                }
473            }
474
475            // Pop from the back of the queue.
476            Flavor::Lifo => {
477                // Decrement the back index.
478                let b = b.wrapping_sub(1);
479                self.inner.back.store(b, Ordering::Relaxed);
480
481                atomic::fence(Ordering::SeqCst);
482
483                // Load the front index.
484                let f = self.inner.front.load(Ordering::Relaxed);
485
486                // Compute the length after the back index was decremented.
487                let len = b.wrapping_sub(f);
488
489                if len < 0 {
490                    // The queue is empty. Restore the back index to its original value.
491                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
492                    None
493                } else {
494                    // Read the task to be popped.
495                    let buffer = self.buffer.get();
496                    let mut task = unsafe { Some(buffer.read(b)) };
497
498                    // Are we popping the last task from the queue?
499                    if len == 0 {
500                        // Try incrementing the front index.
501                        if self
502                            .inner
503                            .front
504                            .compare_exchange(
505                                f,
506                                f.wrapping_add(1),
507                                Ordering::SeqCst,
508                                Ordering::Relaxed,
509                            )
510                            .is_err()
511                        {
512                            // Failed. We didn't pop anything.
513                            mem::forget(task.take());
514                        }
515
516                        // Restore the back index to its original value.
517                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
518                    } else {
519                        // Shrink the buffer if `len` is less than one fourth of the capacity.
520                        if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
521                            unsafe {
522                                self.resize(buffer.cap / 2);
523                            }
524                        }
525                    }
526
527                    task
528                }
529            }
530        }
531    }
532}
533
534impl<T> fmt::Debug for Worker<T> {
535    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
536        f.pad("Worker { .. }")
537    }
538}
539
540/// A stealer handle of a worker queue.
541///
542/// Stealers can be shared among threads.
543///
544/// Task schedulers typically have a single worker queue per worker thread.
545///
546/// # Examples
547///
548/// ```
549/// use crossbeam_deque::{Steal, Worker};
550///
551/// let w = Worker::new_lifo();
552/// w.push(1);
553/// w.push(2);
554///
555/// let s = w.stealer();
556/// assert_eq!(s.steal(), Steal::Success(1));
557/// assert_eq!(s.steal(), Steal::Success(2));
558/// assert_eq!(s.steal(), Steal::Empty);
559/// ```
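///
/// A steal operation may also return `Steal::Retry`, in which case it should simply be retried
/// (a minimal sketch of the usual retry loop):
///
/// ```
/// use crossbeam_deque::{Steal, Worker};
///
/// let w = Worker::new_lifo();
/// w.push(1);
///
/// let s = w.stealer();
///
/// // Keep stealing until the result is no longer `Steal::Retry`.
/// let task = std::iter::repeat_with(|| s.steal())
///     .find(|st| !st.is_retry())
///     .unwrap();
/// assert_eq!(task, Steal::Success(1));
/// ```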
560pub struct Stealer<T> {
561    /// A reference to the inner representation of the queue.
562    inner: Arc<CachePadded<Inner<T>>>,
563
564    /// The flavor of the queue.
565    flavor: Flavor,
566}
567
568unsafe impl<T: Send> Send for Stealer<T> {}
569unsafe impl<T: Send> Sync for Stealer<T> {}
570
571impl<T> Stealer<T> {
572    /// Returns `true` if the queue is empty.
573    ///
574    /// ```
575    /// use crossbeam_deque::Worker;
576    ///
577    /// let w = Worker::new_lifo();
578    /// let s = w.stealer();
579    ///
580    /// assert!(s.is_empty());
581    /// w.push(1);
582    /// assert!(!s.is_empty());
583    /// ```
584    pub fn is_empty(&self) -> bool {
585        let f = self.inner.front.load(Ordering::Acquire);
586        atomic::fence(Ordering::SeqCst);
587        let b = self.inner.back.load(Ordering::Acquire);
588        b.wrapping_sub(f) <= 0
589    }
590
591    /// Returns the number of tasks in the deque.
592    ///
593    /// ```
594    /// use crossbeam_deque::Worker;
595    ///
596    /// let w = Worker::new_lifo();
597    /// let s = w.stealer();
598    ///
599    /// assert_eq!(s.len(), 0);
600    /// w.push(1);
601    /// assert_eq!(s.len(), 1);
602    /// w.push(2);
603    /// assert_eq!(s.len(), 2);
604    /// ```
605    pub fn len(&self) -> usize {
606        let f = self.inner.front.load(Ordering::Acquire);
607        atomic::fence(Ordering::SeqCst);
608        let b = self.inner.back.load(Ordering::Acquire);
609        b.wrapping_sub(f).max(0) as usize
610    }
611
612    /// Steals a task from the queue.
613    ///
614    /// # Examples
615    ///
616    /// ```
617    /// use crossbeam_deque::{Steal, Worker};
618    ///
619    /// let w = Worker::new_lifo();
620    /// w.push(1);
621    /// w.push(2);
622    ///
623    /// let s = w.stealer();
624    /// assert_eq!(s.steal(), Steal::Success(1));
625    /// assert_eq!(s.steal(), Steal::Success(2));
626    /// ```
627    pub fn steal(&self) -> Steal<T> {
628        // Load the front index.
629        let f = self.inner.front.load(Ordering::Acquire);
630
631        // A SeqCst fence is needed here.
632        //
633        // If the current thread is already pinned (reentrantly), we must manually issue the
634        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
635        // have to.
636        if epoch::is_pinned() {
637            atomic::fence(Ordering::SeqCst);
638        }
639
640        let guard = &epoch::pin();
641
642        // Load the back index.
643        let b = self.inner.back.load(Ordering::Acquire);
644
645        // Is the queue empty?
646        if b.wrapping_sub(f) <= 0 {
647            return Steal::Empty;
648        }
649
650        // Load the buffer and read the task at the front.
651        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
652        let task = unsafe { buffer.deref().read(f) };
653
654        // Try incrementing the front index to steal the task.
655        // If the buffer has been swapped or the increment fails, we retry.
656        if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
657            || self
658                .inner
659                .front
660                .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
661                .is_err()
662        {
663            // We didn't steal this task, forget it.
664            mem::forget(task);
665            return Steal::Retry;
666        }
667
668        // Return the stolen task.
669        Steal::Success(task)
670    }
671
672    /// Steals a batch of tasks and pushes them into another worker.
673    ///
674    /// Exactly how many tasks will be stolen is not specified. That said, this method will try to
675    /// steal around half of the tasks in the queue, but no more than a constant limit.
676    ///
677    /// # Examples
678    ///
679    /// ```
680    /// use crossbeam_deque::Worker;
681    ///
682    /// let w1 = Worker::new_fifo();
683    /// w1.push(1);
684    /// w1.push(2);
685    /// w1.push(3);
686    /// w1.push(4);
687    ///
688    /// let s = w1.stealer();
689    /// let w2 = Worker::new_fifo();
690    ///
691    /// let _ = s.steal_batch(&w2);
692    /// assert_eq!(w2.pop(), Some(1));
693    /// assert_eq!(w2.pop(), Some(2));
694    /// ```
695    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
696        if Arc::ptr_eq(&self.inner, &dest.inner) {
697            if dest.is_empty() {
698                return Steal::Empty;
699            } else {
700                return Steal::Success(());
701            }
702        }
703
704        // Load the front index.
705        let mut f = self.inner.front.load(Ordering::Acquire);
706
707        // A SeqCst fence is needed here.
708        //
709        // If the current thread is already pinned (reentrantly), we must manually issue the
710        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
711        // have to.
712        if epoch::is_pinned() {
713            atomic::fence(Ordering::SeqCst);
714        }
715
716        let guard = &epoch::pin();
717
718        // Load the back index.
719        let b = self.inner.back.load(Ordering::Acquire);
720
721        // Is the queue empty?
722        let len = b.wrapping_sub(f);
723        if len <= 0 {
724            return Steal::Empty;
725        }
726
727        // Reserve capacity for the stolen batch.
728        let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH);
729        dest.reserve(batch_size);
730        let mut batch_size = batch_size as isize;
731
732        // Get the destination buffer and back index.
733        let dest_buffer = dest.buffer.get();
734        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
735
736        // Load the buffer.
737        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
738
739        match self.flavor {
740            // Steal a batch of tasks from the front at once.
741            Flavor::Fifo => {
742                // Copy the batch from the source to the destination buffer.
743                match dest.flavor {
744                    Flavor::Fifo => {
745                        for i in 0..batch_size {
746                            unsafe {
747                                let task = buffer.deref().read(f.wrapping_add(i));
748                                dest_buffer.write(dest_b.wrapping_add(i), task);
749                            }
750                        }
751                    }
752                    Flavor::Lifo => {
753                        for i in 0..batch_size {
754                            unsafe {
755                                let task = buffer.deref().read(f.wrapping_add(i));
756                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
757                            }
758                        }
759                    }
760                }
761
762                // Try incrementing the front index to steal the batch.
763                // If the buffer has been swapped or the increment fails, we retry.
764                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
765                    || self
766                        .inner
767                        .front
768                        .compare_exchange(
769                            f,
770                            f.wrapping_add(batch_size),
771                            Ordering::SeqCst,
772                            Ordering::Relaxed,
773                        )
774                        .is_err()
775                {
776                    return Steal::Retry;
777                }
778
779                dest_b = dest_b.wrapping_add(batch_size);
780            }
781
782            // Steal a batch of tasks from the front one by one.
783            Flavor::Lifo => {
784                // This loop may modify `batch_size`, which triggers a clippy lint warning.
785                // Use a new variable to avoid the warning, and to make it clear we aren't
786                // modifying the loop exit condition during iteration.
787                let original_batch_size = batch_size;
788
789                for i in 0..original_batch_size {
790                    // If this is not the first steal, check whether the queue is empty.
791                    if i > 0 {
792                        // We've already got the current front index. Now execute the fence to
793                        // synchronize with other threads.
794                        atomic::fence(Ordering::SeqCst);
795
796                        // Load the back index.
797                        let b = self.inner.back.load(Ordering::Acquire);
798
799                        // Is the queue empty?
800                        if b.wrapping_sub(f) <= 0 {
801                            batch_size = i;
802                            break;
803                        }
804                    }
805
806                    // Read the task at the front.
807                    let task = unsafe { buffer.deref().read(f) };
808
809                    // Try incrementing the front index to steal the task.
810                    // If the buffer has been swapped or the increment fails, we retry.
811                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
812                        || self
813                            .inner
814                            .front
815                            .compare_exchange(
816                                f,
817                                f.wrapping_add(1),
818                                Ordering::SeqCst,
819                                Ordering::Relaxed,
820                            )
821                            .is_err()
822                    {
823                        // We didn't steal this task, forget it and break from the loop.
824                        mem::forget(task);
825                        batch_size = i;
826                        break;
827                    }
828
829                    // Write the stolen task into the destination buffer.
830                    unsafe {
831                        dest_buffer.write(dest_b, task);
832                    }
833
834                    // Move the source front index and the destination back index one step forward.
835                    f = f.wrapping_add(1);
836                    dest_b = dest_b.wrapping_add(1);
837                }
838
839                // If we didn't steal anything, the operation needs to be retried.
840                if batch_size == 0 {
841                    return Steal::Retry;
842                }
843
844                // If stealing into a FIFO queue, stolen tasks need to be reversed.
845                if dest.flavor == Flavor::Fifo {
846                    for i in 0..batch_size / 2 {
847                        unsafe {
848                            let i1 = dest_b.wrapping_sub(batch_size - i);
849                            let i2 = dest_b.wrapping_sub(i + 1);
850                            let t1 = dest_buffer.read(i1);
851                            let t2 = dest_buffer.read(i2);
852                            dest_buffer.write(i1, t2);
853                            dest_buffer.write(i2, t1);
854                        }
855                    }
856                }
857            }
858        }
859
860        atomic::fence(Ordering::Release);
861
862        // Update the back index in the destination queue.
863        //
864        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
865        // races because it doesn't understand fences.
866        dest.inner.back.store(dest_b, Ordering::Release);
867
868        // Return with success.
869        Steal::Success(())
870    }
871
872    /// Steals a batch of tasks, pushes them into another worker, and pops a task from that worker.
873    ///
874    /// Exactly how many tasks will be stolen is not specified. That said, this method will try to
875    /// steal around half of the tasks in the queue, but no more than a constant limit.
876    ///
877    /// # Examples
878    ///
879    /// ```
880    /// use crossbeam_deque::{Steal, Worker};
881    ///
882    /// let w1 = Worker::new_fifo();
883    /// w1.push(1);
884    /// w1.push(2);
885    /// w1.push(3);
886    /// w1.push(4);
887    ///
888    /// let s = w1.stealer();
889    /// let w2 = Worker::new_fifo();
890    ///
891    /// assert_eq!(s.steal_batch_and_pop(&w2), Steal::Success(1));
892    /// assert_eq!(w2.pop(), Some(2));
893    /// ```
894    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
895        if Arc::ptr_eq(&self.inner, &dest.inner) {
896            match dest.pop() {
897                None => return Steal::Empty,
898                Some(task) => return Steal::Success(task),
899            }
900        }
901
902        // Load the front index.
903        let mut f = self.inner.front.load(Ordering::Acquire);
904
905        // A SeqCst fence is needed here.
906        //
907        // If the current thread is already pinned (reentrantly), we must manually issue the
908        // fence. Otherwise, the following pinning will issue the fence anyway, so we don't
909        // have to.
910        if epoch::is_pinned() {
911            atomic::fence(Ordering::SeqCst);
912        }
913
914        let guard = &epoch::pin();
915
916        // Load the back index.
917        let b = self.inner.back.load(Ordering::Acquire);
918
919        // Is the queue empty?
920        let len = b.wrapping_sub(f);
921        if len <= 0 {
922            return Steal::Empty;
923        }
924
925        // Reserve capacity for the stolen batch.
926        let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1);
927        dest.reserve(batch_size);
928        let mut batch_size = batch_size as isize;
929
930        // Get the destination buffer and back index.
931        let dest_buffer = dest.buffer.get();
932        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
933
934        // Load the buffer.
935        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
936
937        // Read the task at the front.
938        let mut task = unsafe { buffer.deref().read(f) };
939
940        match self.flavor {
941            // Steal a batch of tasks from the front at once.
942            Flavor::Fifo => {
943                // Copy the batch from the source to the destination buffer.
944                match dest.flavor {
945                    Flavor::Fifo => {
946                        for i in 0..batch_size {
947                            unsafe {
948                                let task = buffer.deref().read(f.wrapping_add(i + 1));
949                                dest_buffer.write(dest_b.wrapping_add(i), task);
950                            }
951                        }
952                    }
953                    Flavor::Lifo => {
954                        for i in 0..batch_size {
955                            unsafe {
956                                let task = buffer.deref().read(f.wrapping_add(i + 1));
957                                dest_buffer.write(dest_b.wrapping_add(batch_size - 1 - i), task);
958                            }
959                        }
960                    }
961                }
962
963                // Try incrementing the front index to steal the task along with the batch.
964                // If the buffer has been swapped or the increment fails, we retry.
965                if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
966                    || self
967                        .inner
968                        .front
969                        .compare_exchange(
970                            f,
971                            f.wrapping_add(batch_size + 1),
972                            Ordering::SeqCst,
973                            Ordering::Relaxed,
974                        )
975                        .is_err()
976                {
977                    // We didn't steal this task, forget it.
978                    mem::forget(task);
979                    return Steal::Retry;
980                }
981
982                dest_b = dest_b.wrapping_add(batch_size);
983            }
984
985            // Steal a batch of tasks from the front one by one.
986            Flavor::Lifo => {
987                // Try incrementing the front index to steal the task.
988                if self
989                    .inner
990                    .front
991                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
992                    .is_err()
993                {
994                    // We didn't steal this task, forget it.
995                    mem::forget(task);
996                    return Steal::Retry;
997                }
998
999                // Move the front index one step forward.
1000                f = f.wrapping_add(1);
1001
1002                // Repeat the same procedure for the batch steals.
1003                //
1004                // This loop may modify `batch_size`, which triggers a clippy lint warning.
1005                // Use a new variable to avoid the warning, and to make it clear we aren't
1006                // modifying the loop exit condition during iteration.
1007                let original_batch_size = batch_size;
1008                for i in 0..original_batch_size {
1009                    // We've already got the current front index. Now execute the fence to
1010                    // synchronize with other threads.
1011                    atomic::fence(Ordering::SeqCst);
1012
1013                    // Load the back index.
1014                    let b = self.inner.back.load(Ordering::Acquire);
1015
1016                    // Is the queue empty?
1017                    if b.wrapping_sub(f) <= 0 {
1018                        batch_size = i;
1019                        break;
1020                    }
1021
1022                    // Read the task at the front.
1023                    let tmp = unsafe { buffer.deref().read(f) };
1024
1025                    // Try incrementing the front index to steal the task.
1026                    // If the buffer has been swapped or the increment fails, we retry.
1027                    if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
1028                        || self
1029                            .inner
1030                            .front
1031                            .compare_exchange(
1032                                f,
1033                                f.wrapping_add(1),
1034                                Ordering::SeqCst,
1035                                Ordering::Relaxed,
1036                            )
1037                            .is_err()
1038                    {
1039                        // We didn't steal this task, forget it and break from the loop.
1040                        mem::forget(tmp);
1041                        batch_size = i;
1042                        break;
1043                    }
1044
1045                    // Write the previously stolen task into the destination buffer.
1046                    unsafe {
1047                        dest_buffer.write(dest_b, mem::replace(&mut task, tmp));
1048                    }
1049
1050                    // Move the source front index and the destination back index one step forward.
1051                    f = f.wrapping_add(1);
1052                    dest_b = dest_b.wrapping_add(1);
1053                }
1054
1055                // If stealing into a FIFO queue, stolen tasks need to be reversed.
1056                if dest.flavor == Flavor::Fifo {
1057                    for i in 0..batch_size / 2 {
1058                        unsafe {
1059                            let i1 = dest_b.wrapping_sub(batch_size - i);
1060                            let i2 = dest_b.wrapping_sub(i + 1);
1061                            let t1 = dest_buffer.read(i1);
1062                            let t2 = dest_buffer.read(i2);
1063                            dest_buffer.write(i1, t2);
1064                            dest_buffer.write(i2, t1);
1065                        }
1066                    }
1067                }
1068            }
1069        }
1070
1071        atomic::fence(Ordering::Release);
1072
1073        // Update the back index in the destination queue.
1074        //
1075        // This ordering could be `Relaxed`, but then thread sanitizer would falsely report data
1076        // races because it doesn't understand fences.
1077        dest.inner.back.store(dest_b, Ordering::Release);
1078
1079        // Return with success.
1080        Steal::Success(task)
1081    }
1082}
1083
1084impl<T> Clone for Stealer<T> {
1085    fn clone(&self) -> Stealer<T> {
1086        Stealer {
1087            inner: self.inner.clone(),
1088            flavor: self.flavor,
1089        }
1090    }
1091}
1092
1093impl<T> fmt::Debug for Stealer<T> {
1094    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1095        f.pad("Stealer { .. }")
1096    }
1097}
1098
1099// Bits indicating the state of a slot:
1100// * If a task has been written into the slot, `WRITE` is set.
1101// * If a task has been read from the slot, `READ` is set.
1102// * If the block is being destroyed, `DESTROY` is set.
1103const WRITE: usize = 1;
1104const READ: usize = 2;
1105const DESTROY: usize = 4;
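//
// A producer sets `WRITE` after storing a task; a consumer spins until `WRITE` is set, reads the
// task, and then sets `READ`. If `Block::destroy()` encounters a slot whose `READ` bit is not yet
// set, it marks the slot with `DESTROY` and leaves the final deallocation of the block to that
// slot's reader.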
1106
1107// Each block covers one "lap" of indices.
1108const LAP: usize = 64;
1109// The maximum number of values a block can hold.
1110const BLOCK_CAP: usize = LAP - 1;
1111// How many lower bits are reserved for metadata.
1112const SHIFT: usize = 1;
1113// Indicates that the block is not the last one.
1114const HAS_NEXT: usize = 1;
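//
// An index stores its logical position shifted left by `SHIFT`, so indices advance in steps of
// `1 << SHIFT` and the slot within the current block is `(index >> SHIFT) % LAP`. The low bit of
// the head index is reused as the `HAS_NEXT` flag.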
1115
1116/// A slot in a block.
1117struct Slot<T> {
1118    /// The task.
1119    task: UnsafeCell<MaybeUninit<T>>,
1120
1121    /// The state of the slot.
1122    state: AtomicUsize,
1123}
1124
1125impl<T> Slot<T> {
1126    /// Waits until a task is written into the slot.
1127    fn wait_write(&self) {
1128        let backoff = Backoff::new();
1129        while self.state.load(Ordering::Acquire) & WRITE == 0 {
1130            backoff.snooze();
1131        }
1132    }
1133}
1134
1135/// A block in a linked list.
1136///
1137/// Each block in the list can hold up to `BLOCK_CAP` values.
1138struct Block<T> {
1139    /// The next block in the linked list.
1140    next: AtomicPtr<Block<T>>,
1141
1142    /// Slots for values.
1143    slots: [Slot<T>; BLOCK_CAP],
1144}
1145
1146impl<T> Block<T> {
1147    /// Creates an empty block.
1148    fn new() -> Block<T> {
1149        // SAFETY: This is safe because:
1150        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
1151        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
1152        //  [3] `Slot::task` (UnsafeCell) may be safely zero initialized because it
1153        //       holds a MaybeUninit.
1154        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
1155        unsafe { MaybeUninit::zeroed().assume_init() }
1156    }
1157
1158    /// Waits until the next pointer is set.
1159    fn wait_next(&self) -> *mut Block<T> {
1160        let backoff = Backoff::new();
1161        loop {
1162            let next = self.next.load(Ordering::Acquire);
1163            if !next.is_null() {
1164                return next;
1165            }
1166            backoff.snooze();
1167        }
1168    }
1169
1170    /// Sets the `DESTROY` bit in the first `count` slots and destroys the block.
1171    unsafe fn destroy(this: *mut Block<T>, count: usize) {
1172        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
1173        // begun destruction of the block.
1174        for i in (0..count).rev() {
1175            let slot = (*this).slots.get_unchecked(i);
1176
1177            // Mark the `DESTROY` bit if a thread is still using the slot.
1178            if slot.state.load(Ordering::Acquire) & READ == 0
1179                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
1180            {
1181                // If a thread is still using the slot, it will continue destruction of the block.
1182                return;
1183            }
1184        }
1185
1186        // No thread is using the block, now it is safe to destroy it.
1187        drop(Box::from_raw(this));
1188    }
1189}
1190
1191/// A position in a queue.
1192struct Position<T> {
1193    /// The index in the queue.
1194    index: AtomicUsize,
1195
1196    /// The block in the linked list.
1197    block: AtomicPtr<Block<T>>,
1198}
1199
1200/// An injector queue.
1201///
1202/// This is a FIFO queue that can be shared among multiple threads. Task schedulers typically have
1203/// a single injector queue, which is the entry point for new tasks.
1204///
1205/// # Examples
1206///
1207/// ```
1208/// use crossbeam_deque::{Injector, Steal};
1209///
1210/// let q = Injector::new();
1211/// q.push(1);
1212/// q.push(2);
1213///
1214/// assert_eq!(q.steal(), Steal::Success(1));
1215/// assert_eq!(q.steal(), Steal::Success(2));
1216/// assert_eq!(q.steal(), Steal::Empty);
1217/// ```
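///
/// An `Injector` can be shared among threads, e.g. behind an `Arc` (a minimal sketch):
///
/// ```
/// use crossbeam_deque::{Injector, Steal};
/// use std::sync::Arc;
/// use std::thread;
///
/// let q = Arc::new(Injector::new());
///
/// let q2 = Arc::clone(&q);
/// thread::spawn(move || q2.push(1)).join().unwrap();
///
/// assert_eq!(q.steal(), Steal::Success(1));
/// ```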
1218pub struct Injector<T> {
1219    /// The head of the queue.
1220    head: CachePadded<Position<T>>,
1221
1222    /// The tail of the queue.
1223    tail: CachePadded<Position<T>>,
1224
1225    /// Indicates that dropping an `Injector<T>` may drop values of type `T`.
1226    _marker: PhantomData<T>,
1227}
1228
1229unsafe impl<T: Send> Send for Injector<T> {}
1230unsafe impl<T: Send> Sync for Injector<T> {}
1231
1232impl<T> Default for Injector<T> {
1233    fn default() -> Self {
1234        let block = Box::into_raw(Box::new(Block::<T>::new()));
1235        Self {
1236            head: CachePadded::new(Position {
1237                block: AtomicPtr::new(block),
1238                index: AtomicUsize::new(0),
1239            }),
1240            tail: CachePadded::new(Position {
1241                block: AtomicPtr::new(block),
1242                index: AtomicUsize::new(0),
1243            }),
1244            _marker: PhantomData,
1245        }
1246    }
1247}
1248
1249impl<T> Injector<T> {
1250    /// Creates a new injector queue.
1251    ///
1252    /// # Examples
1253    ///
1254    /// ```
1255    /// use crossbeam_deque::Injector;
1256    ///
1257    /// let q = Injector::<i32>::new();
1258    /// ```
1259    pub fn new() -> Injector<T> {
1260        Self::default()
1261    }
1262
1263    /// Pushes a task into the queue.
1264    ///
1265    /// # Examples
1266    ///
1267    /// ```
1268    /// use crossbeam_deque::Injector;
1269    ///
1270    /// let w = Injector::new();
1271    /// w.push(1);
1272    /// w.push(2);
1273    /// ```
1274    pub fn push(&self, task: T) {
1275        let backoff = Backoff::new();
1276        let mut tail = self.tail.index.load(Ordering::Acquire);
1277        let mut block = self.tail.block.load(Ordering::Acquire);
1278        let mut next_block = None;
1279
1280        loop {
1281            // Calculate the offset of the index into the block.
1282            let offset = (tail >> SHIFT) % LAP;
1283
1284            // If we reached the end of the block, wait until the next one is installed.
1285            if offset == BLOCK_CAP {
1286                backoff.snooze();
1287                tail = self.tail.index.load(Ordering::Acquire);
1288                block = self.tail.block.load(Ordering::Acquire);
1289                continue;
1290            }
1291
1292            // If we're going to have to install the next block, allocate it in advance in order to
1293            // make the wait for other threads as short as possible.
1294            if offset + 1 == BLOCK_CAP && next_block.is_none() {
1295                next_block = Some(Box::new(Block::<T>::new()));
1296            }
1297
1298            let new_tail = tail + (1 << SHIFT);
1299
1300            // Try advancing the tail forward.
1301            match self.tail.index.compare_exchange_weak(
1302                tail,
1303                new_tail,
1304                Ordering::SeqCst,
1305                Ordering::Acquire,
1306            ) {
1307                Ok(_) => unsafe {
1308                    // If we've reached the end of the block, install the next one.
1309                    if offset + 1 == BLOCK_CAP {
1310                        let next_block = Box::into_raw(next_block.unwrap());
1311                        let next_index = new_tail.wrapping_add(1 << SHIFT);
1312
1313                        self.tail.block.store(next_block, Ordering::Release);
1314                        self.tail.index.store(next_index, Ordering::Release);
1315                        (*block).next.store(next_block, Ordering::Release);
1316                    }
1317
1318                    // Write the task into the slot.
1319                    let slot = (*block).slots.get_unchecked(offset);
1320                    slot.task.get().write(MaybeUninit::new(task));
1321                    slot.state.fetch_or(WRITE, Ordering::Release);
1322
1323                    return;
1324                },
1325                Err(t) => {
1326                    tail = t;
1327                    block = self.tail.block.load(Ordering::Acquire);
1328                    backoff.spin();
1329                }
1330            }
1331        }
1332    }
1333
1334    /// Steals a task from the queue.
1335    ///
1336    /// # Examples
1337    ///
1338    /// ```
1339    /// use crossbeam_deque::{Injector, Steal};
1340    ///
1341    /// let q = Injector::new();
1342    /// q.push(1);
1343    /// q.push(2);
1344    ///
1345    /// assert_eq!(q.steal(), Steal::Success(1));
1346    /// assert_eq!(q.steal(), Steal::Success(2));
1347    /// assert_eq!(q.steal(), Steal::Empty);
1348    /// ```
1349    pub fn steal(&self) -> Steal<T> {
1350        let mut head;
1351        let mut block;
1352        let mut offset;
1353
1354        let backoff = Backoff::new();
1355        loop {
1356            head = self.head.index.load(Ordering::Acquire);
1357            block = self.head.block.load(Ordering::Acquire);
1358
1359            // Calculate the offset of the index into the block.
1360            offset = (head >> SHIFT) % LAP;
1361
1362            // If we reached the end of the block, wait until the next one is installed.
1363            if offset == BLOCK_CAP {
1364                backoff.snooze();
1365            } else {
1366                break;
1367            }
1368        }
1369
1370        let mut new_head = head + (1 << SHIFT);
1371
1372        if new_head & HAS_NEXT == 0 {
1373            atomic::fence(Ordering::SeqCst);
1374            let tail = self.tail.index.load(Ordering::Relaxed);
1375
1376            // If the tail equals the head, that means the queue is empty.
1377            if head >> SHIFT == tail >> SHIFT {
1378                return Steal::Empty;
1379            }
1380
1381            // If head and tail are not in the same block, set `HAS_NEXT` in head.
1382            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1383                new_head |= HAS_NEXT;
1384            }
1385        }
1386
1387        // Try moving the head index forward.
1388        if self
1389            .head
1390            .index
1391            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1392            .is_err()
1393        {
1394            return Steal::Retry;
1395        }
1396
1397        unsafe {
1398            // If we've reached the end of the block, move to the next one.
1399            if offset + 1 == BLOCK_CAP {
1400                let next = (*block).wait_next();
1401                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1402                if !(*next).next.load(Ordering::Relaxed).is_null() {
1403                    next_index |= HAS_NEXT;
1404                }
1405
1406                self.head.block.store(next, Ordering::Release);
1407                self.head.index.store(next_index, Ordering::Release);
1408            }
1409
1410            // Read the task.
1411            let slot = (*block).slots.get_unchecked(offset);
1412            slot.wait_write();
1413            let task = slot.task.get().read().assume_init();
1414
1415            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1416            // but couldn't because we were busy reading from the slot.
1417            if (offset + 1 == BLOCK_CAP)
1418                || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
1419            {
1420                Block::destroy(block, offset);
1421            }
1422
1423            Steal::Success(task)
1424        }
1425    }
1426
1427    /// Steals a batch of tasks and pushes them into a worker.
1428    ///
1429    /// Exactly how many tasks will be stolen is not specified. That said, this method will try to
1430    /// steal around half of the tasks in the queue, but no more than a constant limit.
1431    ///
1432    /// # Examples
1433    ///
1434    /// ```
1435    /// use crossbeam_deque::{Injector, Worker};
1436    ///
1437    /// let q = Injector::new();
1438    /// q.push(1);
1439    /// q.push(2);
1440    /// q.push(3);
1441    /// q.push(4);
1442    ///
1443    /// let w = Worker::new_fifo();
1444    /// let _ = q.steal_batch(&w);
1445    /// assert_eq!(w.pop(), Some(1));
1446    /// assert_eq!(w.pop(), Some(2));
1447    /// ```
1448    pub fn steal_batch(&self, dest: &Worker<T>) -> Steal<()> {
1449        let mut head;
1450        let mut block;
1451        let mut offset;
1452
1453        let backoff = Backoff::new();
1454        loop {
1455            head = self.head.index.load(Ordering::Acquire);
1456            block = self.head.block.load(Ordering::Acquire);
1457
1458            // Calculate the offset of the index into the block.
1459            offset = (head >> SHIFT) % LAP;
1460
1461            // If we reached the end of the block, wait until the next one is installed.
1462            if offset == BLOCK_CAP {
1463                backoff.snooze();
1464            } else {
1465                break;
1466            }
1467        }
1468
1469        let mut new_head = head;
1470        let advance;
1471
1472        if new_head & HAS_NEXT == 0 {
1473            atomic::fence(Ordering::SeqCst);
1474            let tail = self.tail.index.load(Ordering::Relaxed);
1475
1476            // If the tail equals the head, that means the queue is empty.
1477            if head >> SHIFT == tail >> SHIFT {
1478                return Steal::Empty;
1479            }
1480
1481            // If head and tail are not in the same block, set `HAS_NEXT` in head. Also, calculate
1482            // the right batch size to steal.
1483            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1484                new_head |= HAS_NEXT;
1485                // We can steal all tasks till the end of the block.
1486                advance = (BLOCK_CAP - offset).min(MAX_BATCH);
1487            } else {
1488                let len = (tail - head) >> SHIFT;
1489                // Steal half of the available tasks.
1490                advance = ((len + 1) / 2).min(MAX_BATCH);
1491            }
1492        } else {
1493            // We can steal all tasks till the end of the block.
1494            advance = (BLOCK_CAP - offset).min(MAX_BATCH);
1495        }
1496
1497        new_head += advance << SHIFT;
1498        let new_offset = offset + advance;
1499
1500        // Try moving the head index forward.
1501        if self
1502            .head
1503            .index
1504            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1505            .is_err()
1506        {
1507            return Steal::Retry;
1508        }
1509
1510        // Reserve capacity for the stolen batch.
1511        let batch_size = new_offset - offset;
1512        dest.reserve(batch_size);
1513
1514        // Get the destination buffer and back index.
1515        let dest_buffer = dest.buffer.get();
1516        let dest_b = dest.inner.back.load(Ordering::Relaxed);
1517
1518        unsafe {
1519            // If we've reached the end of the block, move to the next one.
1520            if new_offset == BLOCK_CAP {
1521                let next = (*block).wait_next();
1522                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1523                if !(*next).next.load(Ordering::Relaxed).is_null() {
1524                    next_index |= HAS_NEXT;
1525                }
1526
1527                self.head.block.store(next, Ordering::Release);
1528                self.head.index.store(next_index, Ordering::Release);
1529            }
1530
1531            // Copy values from the injector into the destination queue.
1532            match dest.flavor {
1533                Flavor::Fifo => {
1534                    for i in 0..batch_size {
1535                        // Read the task.
1536                        let slot = (*block).slots.get_unchecked(offset + i);
1537                        slot.wait_write();
1538                        let task = slot.task.get().read().assume_init();
1539
1540                        // Write it into the destination queue.
1541                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1542                    }
1543                }
1544
1545                Flavor::Lifo => {
1546                    for i in 0..batch_size {
1547                        // Read the task.
1548                        let slot = (*block).slots.get_unchecked(offset + i);
1549                        slot.wait_write();
1550                        let task = slot.task.get().read().assume_init();
1551
1552                        // Write it into the destination queue.
1553                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1554                    }
1555                }
1556            }
1557
1558            atomic::fence(Ordering::Release);
1559
1560            // Update the back index in the destination queue.
1561            //
1562            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1563            // data races because it doesn't understand fences.
1564            dest.inner
1565                .back
1566                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1567
1568            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1569            // it but couldn't because we were busy reading from the slot.
1570            if new_offset == BLOCK_CAP {
1571                Block::destroy(block, offset);
1572            } else {
1573                for i in offset..new_offset {
1574                    let slot = (*block).slots.get_unchecked(i);
1575
1576                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1577                        Block::destroy(block, offset);
1578                        break;
1579                    }
1580                }
1581            }
1582
1583            Steal::Success(())
1584        }
1585    }
1586
1587    /// Steals a batch of tasks, pushes them into a worker, and pops a task from that worker.
1588    ///
1589    /// How many tasks exactly will be stolen is not specified. This method tries to steal
1590    /// around half of the tasks in the queue, but no more than a constant limit.
1591    ///
1592    /// # Examples
1593    ///
1594    /// ```
1595    /// use crossbeam_deque::{Injector, Steal, Worker};
1596    ///
1597    /// let q = Injector::new();
1598    /// q.push(1);
1599    /// q.push(2);
1600    /// q.push(3);
1601    /// q.push(4);
1602    ///
1603    /// let w = Worker::new_fifo();
1604    /// assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(1));
1605    /// assert_eq!(w.pop(), Some(2));
1606    /// ```
1607    pub fn steal_batch_and_pop(&self, dest: &Worker<T>) -> Steal<T> {
1608        let mut head;
1609        let mut block;
1610        let mut offset;
1611
1612        let backoff = Backoff::new();
1613        loop {
1614            head = self.head.index.load(Ordering::Acquire);
1615            block = self.head.block.load(Ordering::Acquire);
1616
1617            // Calculate the offset of the index into the block.
1618            offset = (head >> SHIFT) % LAP;
1619
1620            // If we reached the end of the block, wait until the next one is installed.
1621            if offset == BLOCK_CAP {
1622                backoff.snooze();
1623            } else {
1624                break;
1625            }
1626        }
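        // An offset of `BLOCK_CAP` should only be observed transiently, while the stealer
        // that claimed the last task of this block is still installing the next block, so
        // snoozing briefly here is sufficient.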
1627
1628        let mut new_head = head;
1629        let advance;
1630
1631        if new_head & HAS_NEXT == 0 {
1632            atomic::fence(Ordering::SeqCst);
1633            let tail = self.tail.index.load(Ordering::Relaxed);
1634
1635            // If the tail equals the head, that means the queue is empty.
1636            if head >> SHIFT == tail >> SHIFT {
1637                return Steal::Empty;
1638            }
1639
1640            // If head and tail are not in the same block, set `HAS_NEXT` in head.
1641            if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
1642                new_head |= HAS_NEXT;
1643                // We can steal all tasks till the end of the block.
1644                advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
1645            } else {
1646                let len = (tail - head) >> SHIFT;
1647                // Steal half of the available tasks.
1648                advance = ((len + 1) / 2).min(MAX_BATCH + 1);
1649            }
1650        } else {
1651            // We can steal all tasks till the end of the block.
1652            advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1);
1653        }
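        // The limit here is `MAX_BATCH + 1` rather than `MAX_BATCH` because one of the
        // claimed tasks is returned directly by this method instead of being pushed into
        // `dest`. For example, with 4 tasks in one block, `advance = (4 + 1) / 2 = 2`:
        // one task is popped and returned, and one lands in the worker, as in the doc
        // example above.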
1654
1655        new_head += advance << SHIFT;
1656        let new_offset = offset + advance;
1657
1658        // Try moving the head index forward.
1659        if self
1660            .head
1661            .index
1662            .compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Acquire)
1663            .is_err()
1664        {
1665            return Steal::Retry;
1666        }
1667
1668        // Reserve capacity for the stolen batch.
1669        let batch_size = new_offset - offset - 1;
1670        dest.reserve(batch_size);
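        // The `- 1` excludes the task at `offset`, which is read below and returned from
        // this method rather than copied into `dest`.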
1671
1672        // Get the destination buffer and back index.
1673        let dest_buffer = dest.buffer.get();
1674        let dest_b = dest.inner.back.load(Ordering::Relaxed);
1675
1676        unsafe {
1677            // If we've reached the end of the block, move to the next one.
1678            if new_offset == BLOCK_CAP {
1679                let next = (*block).wait_next();
1680                let mut next_index = (new_head & !HAS_NEXT).wrapping_add(1 << SHIFT);
1681                if !(*next).next.load(Ordering::Relaxed).is_null() {
1682                    next_index |= HAS_NEXT;
1683                }
1684
1685                self.head.block.store(next, Ordering::Release);
1686                self.head.index.store(next_index, Ordering::Release);
1687            }
1688
1689            // Read the task.
1690            let slot = (*block).slots.get_unchecked(offset);
1691            slot.wait_write();
1692            let task = slot.task.get().read().assume_init();
1693
1694            match dest.flavor {
1695                Flavor::Fifo => {
1696                    // Copy values from the injector into the destination queue.
1697                    for i in 0..batch_size {
1698                        // Read the task.
1699                        let slot = (*block).slots.get_unchecked(offset + i + 1);
1700                        slot.wait_write();
1701                        let task = slot.task.get().read().assume_init();
1702
1703                        // Write it into the destination queue.
1704                        dest_buffer.write(dest_b.wrapping_add(i as isize), task);
1705                    }
1706                }
1707
1708                Flavor::Lifo => {
1709                    // Copy values from the injector into the destination queue.
1710                    for i in 0..batch_size {
1711                        // Read the task.
1712                        let slot = (*block).slots.get_unchecked(offset + i + 1);
1713                        slot.wait_write();
1714                        let task = slot.task.get().read().assume_init();
1715
1716                        // Write it into the destination queue.
1717                        dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
1718                    }
1719                }
1720            }
1721
1722            atomic::fence(Ordering::Release);
1723
1724            // Update the back index in the destination queue.
1725            //
1726            // This ordering could be `Relaxed`, but then thread sanitizer would falsely report
1727            // data races because it doesn't understand fences.
1728            dest.inner
1729                .back
1730                .store(dest_b.wrapping_add(batch_size as isize), Ordering::Release);
1731
1732            // Destroy the block if we've reached the end, or if another thread wanted to destroy
1733            // it but couldn't because we were busy reading from the slot.
1734            if new_offset == BLOCK_CAP {
1735                Block::destroy(block, offset);
1736            } else {
1737                for i in offset..new_offset {
1738                    let slot = (*block).slots.get_unchecked(i);
1739
1740                    if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
1741                        Block::destroy(block, offset);
1742                        break;
1743                    }
1744                }
1745            }
1746
1747            Steal::Success(task)
1748        }
1749    }
1750
1751    /// Returns `true` if the queue is empty.
1752    ///
1753    /// # Examples
1754    ///
1755    /// ```
1756    /// use crossbeam_deque::Injector;
1757    ///
1758    /// let q = Injector::new();
1759    ///
1760    /// assert!(q.is_empty());
1761    /// q.push(1);
1762    /// assert!(!q.is_empty());
1763    /// ```
1764    pub fn is_empty(&self) -> bool {
1765        let head = self.head.index.load(Ordering::SeqCst);
1766        let tail = self.tail.index.load(Ordering::SeqCst);
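        // The indices carry flag bits (such as `HAS_NEXT`) below `SHIFT`, so they are
        // compared with those bits shifted away.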
1767        head >> SHIFT == tail >> SHIFT
1768    }
1769
1770    /// Returns the number of tasks in the queue.
1771    ///
1772    /// # Examples
1773    ///
1774    /// ```
1775    /// use crossbeam_deque::Injector;
1776    ///
1777    /// let q = Injector::new();
1778    ///
1779    /// assert_eq!(q.len(), 0);
1780    /// q.push(1);
1781    /// assert_eq!(q.len(), 1);
1782    /// q.push(1);
1783    /// assert_eq!(q.len(), 2);
1784    /// ```
1785    pub fn len(&self) -> usize {
1786        loop {
1787            // Load the tail index, then load the head index.
1788            let mut tail = self.tail.index.load(Ordering::SeqCst);
1789            let mut head = self.head.index.load(Ordering::SeqCst);
1790
1791            // If the tail index didn't change, we've got consistent indices to work with.
1792            if self.tail.index.load(Ordering::SeqCst) == tail {
1793                // Erase the lower bits.
1794                tail &= !((1 << SHIFT) - 1);
1795                head &= !((1 << SHIFT) - 1);
1796
1797                // Fix up indices if they fall onto block ends.
1798                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
1799                    tail = tail.wrapping_add(1 << SHIFT);
1800                }
1801                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
1802                    head = head.wrapping_add(1 << SHIFT);
1803                }
1804
1805                // Rotate indices so that head falls into the first block.
1806                let lap = (head >> SHIFT) / LAP;
1807                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
1808                head = head.wrapping_sub((lap * LAP) << SHIFT);
1809
1810                // Remove the lower bits.
1811                tail >>= SHIFT;
1812                head >>= SHIFT;
1813
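                // Worked example, assuming (as the fix-ups above suggest) one unused index
                // per lap of `LAP` slots: with `head = 3` and `tail = LAP + 5` at this point,
                // the first block still holds `(LAP - 1) - 3` tasks and the second holds 5,
                // i.e. `LAP + 1` in total, which is exactly `tail - head - tail / LAP`.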
1814                // Return the difference minus the number of blocks between tail and head.
1815                return tail - head - tail / LAP;
1816            }
1817        }
1818    }
1819}
1820
1821impl<T> Drop for Injector<T> {
1822    fn drop(&mut self) {
1823        let mut head = self.head.index.load(Ordering::Relaxed);
1824        let mut tail = self.tail.index.load(Ordering::Relaxed);
1825        let mut block = self.head.block.load(Ordering::Relaxed);
1826
1827        // Erase the lower bits.
1828        head &= !((1 << SHIFT) - 1);
1829        tail &= !((1 << SHIFT) - 1);
1830
1831        unsafe {
1832            // Drop all values between `head` and `tail` and deallocate the heap-allocated blocks.
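            // Offsets `0..BLOCK_CAP` within a lap address task slots; offset `BLOCK_CAP`
            // appears to be the per-block boundary, which is why reaching it below means the
            // block is exhausted and can be freed.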
1833            while head != tail {
1834                let offset = (head >> SHIFT) % LAP;
1835
1836                if offset < BLOCK_CAP {
1837                    // Drop the task in the slot.
1838                    let slot = (*block).slots.get_unchecked(offset);
1839                    let p = &mut *slot.task.get();
1840                    p.as_mut_ptr().drop_in_place();
1841                } else {
1842                    // Deallocate the block and move to the next one.
1843                    let next = (*block).next.load(Ordering::Relaxed);
1844                    drop(Box::from_raw(block));
1845                    block = next;
1846                }
1847
1848                head = head.wrapping_add(1 << SHIFT);
1849            }
1850
1851            // Deallocate the last remaining block.
1852            drop(Box::from_raw(block));
1853        }
1854    }
1855}
1856
1857impl<T> fmt::Debug for Injector<T> {
1858    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1859        f.pad("Injector { .. }")
1860    }
1861}
1862
1863/// Possible outcomes of a steal operation.
1864///
1865/// # Examples
1866///
1867/// There are lots of ways to chain results of steal operations together:
1868///
1869/// ```
1870/// use crossbeam_deque::Steal::{self, Empty, Retry, Success};
1871///
1872/// let collect = |v: Vec<Steal<i32>>| v.into_iter().collect::<Steal<i32>>();
1873///
1874/// assert_eq!(collect(vec![Empty, Empty, Empty]), Empty);
1875/// assert_eq!(collect(vec![Empty, Retry, Empty]), Retry);
1876/// assert_eq!(collect(vec![Retry, Success(1), Empty]), Success(1));
1877///
1878/// assert_eq!(collect(vec![Empty, Empty]).or_else(|| Retry), Retry);
1879/// assert_eq!(collect(vec![Retry, Empty]).or_else(|| Success(1)), Success(1));
1880/// ```
1881#[must_use]
1882#[derive(PartialEq, Eq, Copy, Clone)]
1883pub enum Steal<T> {
1884    /// The queue was empty at the time of stealing.
1885    Empty,
1886
1887    /// At least one task was successfully stolen.
1888    Success(T),
1889
1890    /// The steal operation needs to be retried.
1891    Retry,
1892}
1893
1894impl<T> Steal<T> {
1895    /// Returns `true` if the queue was empty at the time of stealing.
1896    ///
1897    /// # Examples
1898    ///
1899    /// ```
1900    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1901    ///
1902    /// assert!(!Success(7).is_empty());
1903    /// assert!(!Retry::<i32>.is_empty());
1904    ///
1905    /// assert!(Empty::<i32>.is_empty());
1906    /// ```
1907    pub fn is_empty(&self) -> bool {
1908        match self {
1909            Steal::Empty => true,
1910            _ => false,
1911        }
1912    }
1913
1914    /// Returns `true` if at least one task was stolen.
1915    ///
1916    /// # Examples
1917    ///
1918    /// ```
1919    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1920    ///
1921    /// assert!(!Empty::<i32>.is_success());
1922    /// assert!(!Retry::<i32>.is_success());
1923    ///
1924    /// assert!(Success(7).is_success());
1925    /// ```
1926    pub fn is_success(&self) -> bool {
1927        match self {
1928            Steal::Success(_) => true,
1929            _ => false,
1930        }
1931    }
1932
1933    /// Returns `true` if the steal operation needs to be retried.
1934    ///
1935    /// # Examples
1936    ///
1937    /// ```
1938    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1939    ///
1940    /// assert!(!Empty::<i32>.is_retry());
1941    /// assert!(!Success(7).is_retry());
1942    ///
1943    /// assert!(Retry::<i32>.is_retry());
1944    /// ```
1945    pub fn is_retry(&self) -> bool {
1946        match self {
1947            Steal::Retry => true,
1948            _ => false,
1949        }
1950    }
1951
1952    /// Returns the result of the operation, if successful.
1953    ///
1954    /// # Examples
1955    ///
1956    /// ```
1957    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1958    ///
1959    /// assert_eq!(Empty::<i32>.success(), None);
1960    /// assert_eq!(Retry::<i32>.success(), None);
1961    ///
1962    /// assert_eq!(Success(7).success(), Some(7));
1963    /// ```
1964    pub fn success(self) -> Option<T> {
1965        match self {
1966            Steal::Success(res) => Some(res),
1967            _ => None,
1968        }
1969    }
1970
1971    /// If no task was stolen, attempts another steal operation.
1972    ///
1973    /// Returns this steal result if it is `Success`. Otherwise, closure `f` is invoked and then:
1974    ///
1975    /// * If the second steal resulted in `Success`, it is returned.
1976    /// * If both steals were unsuccessful but any resulted in `Retry`, then `Retry` is returned.
1977    /// * If both resulted in `Empty`, then `Empty` is returned.
1978    ///
1979    /// # Examples
1980    ///
1981    /// ```
1982    /// use crossbeam_deque::Steal::{Empty, Retry, Success};
1983    ///
1984    /// assert_eq!(Success(1).or_else(|| Success(2)), Success(1));
1985    /// assert_eq!(Retry.or_else(|| Success(2)), Success(2));
1986    ///
1987    /// assert_eq!(Retry.or_else(|| Empty), Retry::<i32>);
1988    /// assert_eq!(Empty.or_else(|| Retry), Retry::<i32>);
1989    ///
1990    /// assert_eq!(Empty.or_else(|| Empty), Empty::<i32>);
1991    /// ```
1992    pub fn or_else<F>(self, f: F) -> Steal<T>
1993    where
1994        F: FnOnce() -> Steal<T>,
1995    {
1996        match self {
1997            Steal::Empty => f(),
1998            Steal::Success(_) => self,
1999            Steal::Retry => {
2000                if let Steal::Success(res) = f() {
2001                    Steal::Success(res)
2002                } else {
2003                    Steal::Retry
2004                }
2005            }
2006        }
2007    }
2008}
2009
2010impl<T> fmt::Debug for Steal<T> {
2011    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2012        match self {
2013            Steal::Empty => f.pad("Empty"),
2014            Steal::Success(_) => f.pad("Success(..)"),
2015            Steal::Retry => f.pad("Retry"),
2016        }
2017    }
2018}
2019
2020impl<T> FromIterator<Steal<T>> for Steal<T> {
2021    /// Consumes items until a `Success` is found and returns it.
2022    ///
2023    /// If no `Success` was found, but there was at least one `Retry`, then returns `Retry`.
2024    /// Otherwise, `Empty` is returned.
2025    fn from_iter<I>(iter: I) -> Steal<T>
2026    where
2027        I: IntoIterator<Item = Steal<T>>,
2028    {
2029        let mut retry = false;
2030        for s in iter {
2031            match &s {
2032                Steal::Empty => {}
2033                Steal::Success(_) => return s,
2034                Steal::Retry => retry = true,
2035            }
2036        }
2037
2038        if retry {
2039            Steal::Retry
2040        } else {
2041            Steal::Empty
2042        }
2043    }
2044}
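
// A minimal sketch (not part of the original file): a `#[cfg(test)]` module exercising the
// `Steal` combinators and `Injector::steal_batch_and_pop` documented above. The module name is
// made up for illustration, and the exact batch sizes asserted here follow the heuristic
// implemented in this file (steal about half of the queue, at most `MAX_BATCH` tasks).
#[cfg(test)]
mod injector_steal_sketch_tests {
    use super::{Injector, Steal, Worker};

    #[test]
    fn collect_prefers_success_then_retry() {
        // `FromIterator` keeps the first `Success`; otherwise `Retry` wins over `Empty`.
        let first: Steal<i32> = vec![Steal::Empty, Steal::Retry, Steal::Success(7)]
            .into_iter()
            .collect();
        assert_eq!(first, Steal::Success(7));

        let unsuccessful: Steal<i32> = vec![Steal::Empty, Steal::Retry].into_iter().collect();
        assert_eq!(unsuccessful, Steal::Retry);
    }

    #[test]
    fn steal_batch_and_pop_returns_front_task() {
        let q = Injector::new();
        for i in 0..4 {
            q.push(i);
        }

        // The front task is returned directly; with 4 tasks, one more task is pushed into
        // the destination worker (advance = (4 + 1) / 2 = 2, batch size = 1).
        let w = Worker::new_fifo();
        assert_eq!(q.steal_batch_and_pop(&w), Steal::Success(0));
        assert_eq!(w.pop(), Some(1));
        assert_eq!(w.pop(), None);
    }
}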