#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
//! A bounded, lock-free work-stealing queue.
//!
//! The queue is owned by a single [`Worker`] handle, which pushes and pops
//! items at the tail (LIFO). Any number of [`Stealer`] handles can move
//! batches of items from the head of the queue to the tail of another queue.
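//!
//! # Example
//!
//! A sketch of intended usage; it assumes that the [`buffers`] module exports
//! a 256-slot buffer type named `B256` (the concrete buffer types live in
//! that module, so this example is not compiled here):
//!
//! ```ignore
//! use std::thread;
//!
//! // `B256` is an assumed buffer type name from the `buffers` module.
//! let worker = Worker::<_, B256>::new();
//! let stealer = worker.stealer();
//!
//! worker.push(1).unwrap();
//! worker.push(2).unwrap();
//! worker.push(3).unwrap();
//!
//! let th = thread::spawn(move || {
//!     let dest = Worker::<_, B256>::new();
//!
//!     // Steal half of the items (rounded up) from the head of the queue.
//!     let stolen = stealer.steal(&dest, |n| n - n / 2);
//!     (stolen, dest.pop())
//! });
//!
//! // The worker pops from the tail (LIFO).
//! let local = worker.pop();
//! let (stolen, remote) = th.join().unwrap();
//! ```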
/// Buffer types and the [`Buffer`] trait used to store the queue items.
pub mod buffers;
pub use buffers::*;
mod config;
mod loom_exports;
use std::fmt;
use std::iter::FusedIterator;
use std::marker::PhantomData;
use std::mem::{drop, MaybeUninit};
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::Arc;
use cache_padded::CachePadded;
use config::{AtomicUnsignedLong, AtomicUnsignedShort, UnsignedLong, UnsignedShort};
use loom_exports::cell::UnsafeCell;
/// A lock-free queue accessed by a single worker and any number of stealers.
///
/// The tail position is implicitly the difference (wrapping) between the push
/// count and the pop count. The head position is tracked twice: the
/// provisional head booked by stealers in `pop_count_and_head`, and the head
/// published in `head` once a steal operation has completed.
#[derive(Debug)]
struct Queue<T, B: Buffer<T>> {
    /// Total number of items pushed by the worker.
    push_count: CachePadded<AtomicUnsignedShort>,
    /// Total number of items popped by the worker (most significant bits) and
    /// provisional head position booked by stealers (least significant bits).
    pop_count_and_head: CachePadded<AtomicUnsignedLong>,
    /// Head position published once a steal or drain operation completes.
    head: CachePadded<AtomicUnsignedShort>,
    /// Ring buffer holding the items.
    buffer: Box<B::Data>,
    /// Marker asserting ownership of items of type `T`.
    _phantom: PhantomData<UnsafeCell<T>>,
}
impl<T, B: Buffer<T>> Queue<T, B> {
    /// Reads the item at the given position.
    ///
    /// # Safety
    ///
    /// The slot at this position must hold an initialized item, and no other
    /// thread may be concurrently writing to it.
    #[inline]
    unsafe fn read_at(&self, position: UnsignedShort) -> T {
        let index = (position & B::MASK) as usize;
        (*self.buffer).as_ref()[index].with(|slot| slot.read().assume_init())
    }

    /// Writes an item at the given position.
    ///
    /// # Safety
    ///
    /// No other thread may be concurrently accessing this slot. Writing over
    /// a slot that still holds an item leaks that item.
    #[inline]
    unsafe fn write_at(&self, position: UnsignedShort, item: T) {
        let index = (position & B::MASK) as usize;
        (*self.buffer).as_ref()[index].with_mut(|slot| slot.write(MaybeUninit::new(item)));
    }
    /// Attempts to book `count_fn(item_count)` items for stealing, clamped by
    /// `max_count` and by the number of available items.
    ///
    /// Returns the head positions before and after booking together with the
    /// number of booked items, or fails if the queue is empty or if another
    /// steal operation is ongoing.
    fn book_items<C>(
        &self,
        mut count_fn: C,
        max_count: UnsignedShort,
    ) -> Result<(UnsignedShort, UnsignedShort, UnsignedShort), StealError>
    where
        C: FnMut(usize) -> usize,
    {
        let mut pop_count_and_head = self.pop_count_and_head.load(Acquire);
        let old_head = self.head.load(Acquire);

        loop {
            let (pop_count, head) = unpack(pop_count_and_head);

            // Bail out if another steal operation has booked items but has
            // not published the new head position yet.
            if old_head != head {
                return Err(StealError::Busy);
            }

            let push_count = self.push_count.load(Acquire);
            let tail = push_count.wrapping_sub(pop_count);
            let item_count = tail.wrapping_sub(head);

            if item_count == 0 {
                return Err(StealError::Empty);
            }

            // Let the caller choose the number of items to book, clamped to
            // `max_count` and to the number of available items.
            let count = (count_fn(item_count as usize).min(max_count as usize) as UnsignedShort)
                .min(item_count);

            if count == 0 {
                return Err(StealError::Empty);
            }

            // Book the items by moving the provisional head forward; the
            // published head is only updated once the steal has completed.
            let new_head = head.wrapping_add(count);
            let new_pop_count_and_head = pack(pop_count, new_head);

            match self.pop_count_and_head.compare_exchange_weak(
                pop_count_and_head,
                new_pop_count_and_head,
                Acquire,
                Acquire,
            ) {
                Ok(_) => return Ok((head, new_head, count)),
                Err(current) => pop_count_and_head = current,
            }
        }
    }
}
impl<T, B: Buffer<T>> Drop for Queue<T, B> {
    fn drop(&mut self) {
        // Drop all items that are still in the queue.
        let head = self.head.load(Relaxed);
let push_count = self.push_count.load(Relaxed);
let pop_count = unpack(self.pop_count_and_head.load(Relaxed)).0;
let tail = push_count.wrapping_sub(pop_count);
let count = tail.wrapping_sub(head);
for offset in 0..count {
drop(unsafe { self.read_at(head.wrapping_add(offset)) });
}
}
}
/// A handle for push and pop operations at the tail of the queue.
///
/// There is a single worker per queue; the handle is `Send` but not `Sync`,
/// so all push and pop operations happen from one thread at a time. Stealing
/// *into* this queue also requires a reference to its worker.
#[derive(Debug)]
pub struct Worker<T, B: Buffer<T>> {
queue: Arc<Queue<T, B>>,
}
impl<T, B: Buffer<T>> Worker<T, B> {
    /// Creates a new queue and returns its worker handle.
    pub fn new() -> Self {
let queue = Arc::new(Queue {
push_count: CachePadded::new(AtomicUnsignedShort::new(0)),
pop_count_and_head: CachePadded::new(AtomicUnsignedLong::new(0)),
head: CachePadded::new(AtomicUnsignedShort::new(0)),
buffer: B::allocate(),
_phantom: PhantomData,
});
Worker { queue }
}
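    /// Creates a new stealer handle associated with this queue.
    ///
    /// Any number of stealer handles can be created, either with this method
    /// or by cloning an existing handle.
    ///
    /// A sketch of intended usage, assuming a 256-slot buffer type `B256` is
    /// exported by the [`buffers`] module:
    ///
    /// ```ignore
    /// let worker = Worker::<_, B256>::new();
    /// let stealer = worker.stealer();
    ///
    /// // Stealer handles can be sent to other threads.
    /// std::thread::spawn(move || {
    ///     let dest = Worker::<_, B256>::new();
    ///     let _ = stealer.steal(&dest, |n| n);
    /// });
    /// ```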
pub fn stealer(&self) -> Stealer<T, B> {
Stealer {
queue: self.queue.clone(),
}
}
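    /// Returns the number of items that can currently be pushed onto the
    /// queue.
    ///
    /// Since stealers can only remove items, the returned value is a lower
    /// bound that can only be increased by concurrent steal operations.
    ///
    /// A sketch, assuming a 256-slot buffer type `B256` from the [`buffers`]
    /// module:
    ///
    /// ```ignore
    /// let worker = Worker::<_, B256>::new();
    /// assert_eq!(worker.spare_capacity(), 256);
    ///
    /// worker.push(42).unwrap();
    /// assert_eq!(worker.spare_capacity(), 255);
    /// ```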
pub fn spare_capacity(&self) -> usize {
let capacity = <B as Buffer<T>>::CAPACITY;
let push_count = self.queue.push_count.load(Relaxed);
let pop_count = unpack(self.queue.pop_count_and_head.load(Relaxed)).0;
let tail = push_count.wrapping_sub(pop_count);
let head = self.queue.head.load(Relaxed);
let len = tail.wrapping_sub(head).min(capacity);
(capacity - len) as usize
}
    /// Returns `true` if the queue is empty.
    ///
    /// Items booked by an ongoing steal operation count as already removed.
    pub fn is_empty(&self) -> bool {
let push_count = self.queue.push_count.load(Relaxed);
let (pop_count, head) = unpack(self.queue.pop_count_and_head.load(Relaxed));
let tail = push_count.wrapping_sub(pop_count);
tail == head
}
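    /// Attempts to push one item at the tail of the queue.
    ///
    /// # Errors
    ///
    /// Fails if the queue is full, in which case the item is returned to the
    /// caller.
    ///
    /// A sketch, assuming a 256-slot buffer type `B256` from the [`buffers`]
    /// module:
    ///
    /// ```ignore
    /// let worker = Worker::<_, B256>::new();
    ///
    /// assert_eq!(worker.push(42), Ok(()));
    /// ```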
pub fn push(&self, item: T) -> Result<(), T> {
let push_count = self.queue.push_count.load(Relaxed);
let pop_count = unpack(self.queue.pop_count_and_head.load(Relaxed)).0;
let tail = push_count.wrapping_sub(pop_count);
        // Ordering: the Acquire load of the head synchronizes with the
        // Release store performed by steal operations, ensuring that the slot
        // written below is no longer being read by a stealer.
        let head = self.queue.head.load(Acquire);

        if tail.wrapping_sub(head) >= B::CAPACITY {
            return Err(item);
        }
unsafe { self.queue.write_at(tail, item) };
        // Ordering: the Release store makes the item written above visible to
        // stealers that acquire the push count.
        self.queue
            .push_count
            .store(push_count.wrapping_add(1), Release);
Ok(())
}
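    /// Pushes the items yielded by an iterator at the tail of the queue.
    ///
    /// Iteration stops as soon as the queue is full; the items that could not
    /// be pushed are dropped.
    ///
    /// A sketch, assuming a 256-slot buffer type `B256` from the [`buffers`]
    /// module:
    ///
    /// ```ignore
    /// let worker = Worker::<_, B256>::new();
    ///
    /// worker.extend(0..10);
    /// assert_eq!(worker.spare_capacity(), 246);
    /// ```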
pub fn extend<I: IntoIterator<Item = T>>(&self, iter: I) {
let push_count = self.queue.push_count.load(Relaxed);
let pop_count = unpack(self.queue.pop_count_and_head.load(Relaxed)).0;
let mut tail = push_count.wrapping_sub(pop_count);
let head = self.queue.head.load(Acquire);
let max_tail = head.wrapping_add(B::CAPACITY);
for item in iter {
            // Stop once the queue is full; this item and any remaining items
            // are dropped.
            if tail == max_tail {
                break;
            }
unsafe { self.queue.write_at(tail, item) };
tail = tail.wrapping_add(1);
}
        // The new push count equals the new tail position plus the pop count.
        self.queue
            .push_count
            .store(tail.wrapping_add(pop_count), Release);
}
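    /// Attempts to pop one item from the tail of the queue.
    ///
    /// Returns `None` if the queue is empty, including when all remaining
    /// items are booked by ongoing steal operations.
    ///
    /// A sketch, assuming a 256-slot buffer type `B256` from the [`buffers`]
    /// module:
    ///
    /// ```ignore
    /// let worker = Worker::<_, B256>::new();
    ///
    /// worker.push(1).unwrap();
    /// worker.push(2).unwrap();
    ///
    /// // The most recently pushed item is popped first (LIFO).
    /// assert_eq!(worker.pop(), Some(2));
    /// assert_eq!(worker.pop(), Some(1));
    /// assert_eq!(worker.pop(), None);
    /// ```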
pub fn pop(&self) -> Option<T> {
        let mut pop_count_and_head = self.queue.pop_count_and_head.load(Relaxed);
        let push_count = self.queue.push_count.load(Relaxed);
        let (pop_count, mut head) = unpack(pop_count_and_head);

        // Only this worker can push or pop items, so neither the push count
        // nor the pop count can change concurrently.
        let tail = push_count.wrapping_sub(pop_count);
        let new_pop_count = pop_count.wrapping_add(1);

        loop {
            // The queue is empty once all remaining items are booked by
            // stealers.
            if tail == head {
                return None;
            }

            // Attempt to book the item at the tail by incrementing the pop
            // count; a concurrent steal operation may move the provisional
            // head, in which case the CAS fails and the head is refreshed.
            let new_pop_count_and_head = pack(new_pop_count, head);
            match self.queue.pop_count_and_head.compare_exchange_weak(
                pop_count_and_head,
                new_pop_count_and_head,
                Release,
                Relaxed,
            ) {
                Ok(_) => break,
                Err(current) => {
                    pop_count_and_head = current;
                    head = unpack(current).1;
                }
            }
        }

        // The booked item is the one just below the old tail position.
        unsafe { Some(self.queue.read_at(tail.wrapping_sub(1))) }
}
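    /// Removes items from the head of the queue and returns them in an
    /// iterator.
    ///
    /// `count_fn` is called with the number of items available and returns
    /// the number of items to drain, which is clamped to the number of items
    /// available. Items that are not consumed from the iterator are dropped
    /// when the iterator is dropped.
    ///
    /// # Errors
    ///
    /// Fails if the queue is empty or if another steal operation is ongoing.
    ///
    /// A sketch, assuming a 256-slot buffer type `B256` from the [`buffers`]
    /// module:
    ///
    /// ```ignore
    /// let worker = Worker::<_, B256>::new();
    /// worker.extend(0..4);
    ///
    /// // Drain half of the items (rounded up), starting from the head.
    /// let drained: Vec<_> = worker.drain(|n| n - n / 2).unwrap().collect();
    /// assert_eq!(drained, vec![0, 1]);
    /// ```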
pub fn drain<C>(&self, count_fn: C) -> Result<Drain<'_, T, B>, StealError>
where
C: FnMut(usize) -> usize,
{
let (old_head, new_head, _) = self.queue.book_items(count_fn, UnsignedShort::MAX)?;
Ok(Drain {
queue: &self.queue,
current: old_head,
end: new_head,
})
}
}
impl<T, B: Buffer<T>> Default for Worker<T, B> {
fn default() -> Self {
Self::new()
}
}
impl<T, B: Buffer<T>> UnwindSafe for Worker<T, B> {}
impl<T, B: Buffer<T>> RefUnwindSafe for Worker<T, B> {}
// Safety: the queue performs all necessary synchronization internally, so the
// worker can be sent to another thread provided the items are `Send`.
unsafe impl<T: Send, B: Buffer<T>> Send for Worker<T, B> {}
/// A draining iterator over items booked from a [`Worker`] queue.
///
/// Booked items are considered stolen as soon as [`Worker::drain`] returns;
/// items that are not consumed are dropped when the iterator is dropped, at
/// which point the new head position is published. Leaking the iterator (for
/// instance with `mem::forget`) leaves the queue in a state where all further
/// steal operations fail with [`StealError::Busy`].
#[derive(Debug)]
pub struct Drain<'a, T, B: Buffer<T>> {
queue: &'a Queue<T, B>,
current: UnsignedShort,
end: UnsignedShort,
}
impl<'a, T, B: Buffer<T>> Iterator for Drain<'a, T, B> {
type Item = T;
fn next(&mut self) -> Option<T> {
if self.current == self.end {
return None;
}
let item = Some(unsafe { self.queue.read_at(self.current) });
self.current = self.current.wrapping_add(1);
        // Once the whole booked range has been read, publish the new head
        // position so that further steal operations can proceed.
        if self.current == self.end {
            self.queue.head.store(self.end, Release);
        }
item
}
fn size_hint(&self) -> (usize, Option<usize>) {
let sz = self.end.wrapping_sub(self.current) as usize;
(sz, Some(sz))
}
}
impl<'a, T, B: Buffer<T>> ExactSizeIterator for Drain<'a, T, B> {}
impl<'a, T, B: Buffer<T>> FusedIterator for Drain<'a, T, B> {}
impl<'a, T, B: Buffer<T>> Drop for Drain<'a, T, B> {
fn drop(&mut self) {
        // Read and drop all remaining booked items so that the new head
        // position gets published.
        for _item in self {}
}
}
impl<'a, T, B: Buffer<T>> UnwindSafe for Drain<'a, T, B> {}
impl<'a, T, B: Buffer<T>> RefUnwindSafe for Drain<'a, T, B> {}
// Safety: the booked items were atomically removed from the queue, so the
// iterator has exclusive access to them and can be sent or shared between
// threads provided the items are `Send`.
unsafe impl<'a, T: Send, B: Buffer<T>> Send for Drain<'a, T, B> {}
unsafe impl<'a, T: Send, B: Buffer<T>> Sync for Drain<'a, T, B> {}
/// A handle for stealing batches of items from the head of the queue.
///
/// Stealer handles can be cloned and shared between threads.
#[derive(Debug)]
pub struct Stealer<T, B: Buffer<T>> {
queue: Arc<Queue<T, B>>,
}
impl<T, B: Buffer<T>> Stealer<T, B> {
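    /// Attempts to steal items from the head of the queue, moving them to the
    /// tail of the destination queue.
    ///
    /// `count_fn` is called with the number of items available and returns
    /// the desired number of items to steal, which is clamped to the number
    /// of items available and to the spare capacity of the destination queue.
    /// Returns the number of items actually moved.
    ///
    /// # Errors
    ///
    /// Fails if the queue is empty or if another steal operation is ongoing.
    ///
    /// A sketch, assuming a 256-slot buffer type `B256` from the [`buffers`]
    /// module:
    ///
    /// ```ignore
    /// let worker = Worker::<_, B256>::new();
    /// let stealer = worker.stealer();
    /// worker.extend(0..4);
    ///
    /// let dest = Worker::<_, B256>::new();
    ///
    /// // Steal half of the items (rounded up) from the head of the queue.
    /// assert_eq!(stealer.steal(&dest, |n| n - n / 2), Ok(2));
    /// assert_eq!(dest.pop(), Some(1));
    /// ```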
pub fn steal<C, BDest>(&self, dest: &Worker<T, BDest>, count_fn: C) -> Result<usize, StealError>
where
C: FnMut(usize) -> usize,
BDest: Buffer<T>,
{
        // Compute the free capacity of the destination queue; the destination
        // tail cannot change concurrently since the caller holds the
        // destination worker.
        let dest_push_count = dest.queue.push_count.load(Relaxed);
        let dest_pop_count = unpack(dest.queue.pop_count_and_head.load(Relaxed)).0;
        let dest_tail = dest_push_count.wrapping_sub(dest_pop_count);
        let dest_head = dest.queue.head.load(Acquire);
        let dest_free_capacity =
            BDest::CAPACITY - dest_tail.wrapping_sub(dest_head).min(BDest::CAPACITY);
let (old_head, new_head, transfer_count) =
self.queue.book_items(count_fn, dest_free_capacity)?;
        // Move the booked items to the destination buffer.
        for offset in 0..transfer_count {
            unsafe {
                let item = self.queue.read_at(old_head.wrapping_add(offset));
                dest.queue.write_at(dest_tail.wrapping_add(offset), item);
            }
        }

        // Make the moved items visible in the destination queue, then publish
        // the new head position of the source queue.
        dest.queue
            .push_count
            .store(dest_push_count.wrapping_add(transfer_count), Release);
        self.queue.head.store(new_head, Release);
Ok(transfer_count as usize)
}
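    /// Attempts to steal items from the head of the queue, moving all but the
    /// last booked item to the tail of the destination queue and returning
    /// the last one directly.
    ///
    /// `count_fn` is called with the number of items available and returns
    /// the desired number of items to steal, including the returned item.
    /// Returns the popped item together with the number of items moved to the
    /// destination queue.
    ///
    /// # Errors
    ///
    /// Fails if the queue is empty or if another steal operation is ongoing.
    ///
    /// A sketch, assuming a 256-slot buffer type `B256` from the [`buffers`]
    /// module:
    ///
    /// ```ignore
    /// let worker = Worker::<_, B256>::new();
    /// let stealer = worker.stealer();
    /// worker.extend(0..4);
    ///
    /// let dest = Worker::<_, B256>::new();
    ///
    /// // Book 2 items: move one to `dest` and return the other directly.
    /// assert_eq!(stealer.steal_and_pop(&dest, |_| 2), Ok((1, 1)));
    /// assert_eq!(dest.pop(), Some(0));
    /// ```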
pub fn steal_and_pop<C, BDest>(
&self,
dest: &Worker<T, BDest>,
count_fn: C,
) -> Result<(T, usize), StealError>
where
C: FnMut(usize) -> usize,
BDest: Buffer<T>,
{
        // Compute the free capacity of the destination queue; the length of a
        // queue never exceeds its capacity, so the subtraction cannot
        // overflow.
        let dest_push_count = dest.queue.push_count.load(Relaxed);
        let dest_pop_count = unpack(dest.queue.pop_count_and_head.load(Relaxed)).0;
        let dest_tail = dest_push_count.wrapping_sub(dest_pop_count);
        let dest_head = dest.queue.head.load(Acquire);
        let dest_free_capacity = BDest::CAPACITY - dest_tail.wrapping_sub(dest_head);

        // Book one extra item: all booked items but the last are moved to the
        // destination queue, and the last one is returned directly.
        let (old_head, new_head, count) =
            self.queue.book_items(count_fn, dest_free_capacity + 1)?;
        let transfer_count = count - 1;

        debug_assert!(transfer_count <= dest_free_capacity);

        // Move all but the last booked item to the destination buffer.
        for offset in 0..transfer_count {
unsafe {
let item = self.queue.read_at(old_head.wrapping_add(offset));
dest.queue.write_at(dest_tail.wrapping_add(offset), item);
}
}
        // The last booked item is returned directly rather than moved.
        let last_item = unsafe { self.queue.read_at(old_head.wrapping_add(transfer_count)) };

        // Make the moved items visible in the destination queue, then publish
        // the new head position of the source queue.
        dest.queue
            .push_count
            .store(dest_push_count.wrapping_add(transfer_count), Release);
        self.queue.head.store(new_head, Release);
Ok((last_item, transfer_count as usize))
}
}
impl<T, B: Buffer<T>> Clone for Stealer<T, B> {
fn clone(&self) -> Self {
Stealer {
queue: self.queue.clone(),
}
}
}
impl<T, B: Buffer<T>> UnwindSafe for Stealer<T, B> {}
impl<T, B: Buffer<T>> RefUnwindSafe for Stealer<T, B> {}
// Safety: stealers only move items out of the queue with proper
// synchronization, so they can be sent and shared between threads provided
// the items are `Send`.
unsafe impl<T: Send, B: Buffer<T>> Send for Stealer<T, B> {}
unsafe impl<T: Send, B: Buffer<T>> Sync for Stealer<T, B> {}
/// Error returned by steal operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StealError {
    /// No item was available for stealing.
    Empty,
    /// Another steal operation was ongoing.
    Busy,
}
impl fmt::Display for StealError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
StealError::Empty => write!(f, "cannot steal from empty queue"),
StealError::Busy => write!(f, "a concurrent steal operation is ongoing"),
}
}
}
/// Packs two short integers into a long integer, storing `value1` in the most
/// significant bits.
#[inline]
fn pack(value1: UnsignedShort, value2: UnsignedShort) -> UnsignedLong {
((value1 as UnsignedLong) << UnsignedShort::BITS) | value2 as UnsignedLong
}
/// Unpacks a long integer into two short integers, returning the most
/// significant bits first.
#[inline]
fn unpack(value: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
(
(value >> UnsignedShort::BITS) as UnsignedShort,
value as UnsignedShort,
)
}