//! A concurrent work-stealing queue, in the style of the Chase-Lev deque.
//!
//! Each queue has one `Worker`, which can push and pop tasks, and any number
//! of `Stealer`s, which can only steal tasks from the front. In the FIFO
//! flavor the worker pops from the front (the same end stealers take from);
//! in the LIFO flavor it pops from the back.
#![warn(missing_docs)]
#![warn(missing_debug_implementations)]
extern crate crossbeam_epoch as epoch;
extern crate crossbeam_utils as utils;
use std::cell::Cell;
use std::cmp;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::atomic::{self, AtomicIsize, Ordering};
use std::sync::Arc;
use epoch::{Atomic, Owned};
use utils::CachePadded;
/// Minimum capacity of a buffer, in number of tasks.
const MIN_CAP: usize = 32;
/// Maximum number of additional tasks that `steal_many` can steal at once.
const MAX_BATCH: usize = 128;
/// If a new buffer is at least this large (in bytes), the epoch garbage is
/// flushed eagerly so that the retired buffer gets deallocated promptly.
const FLUSH_THRESHOLD_BYTES: usize = 1 << 10;
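/// Creates a work-stealing queue with the FIFO flavor.
///
/// In this flavor the worker pops tasks from the front, i.e. the same end
/// that stealers take from, so the worker sees tasks in insertion order.
///
/// A minimal usage sketch; the doctest assumes this crate is built under the
/// name `crossbeam_deque`:
///
/// ```
/// use crossbeam_deque::{self as deque, Pop, Steal};
///
/// let (w, s) = deque::fifo::<i32>();
/// w.push(1);
/// w.push(2);
///
/// // The worker pops from the front, in insertion order.
/// assert_eq!(w.pop(), Pop::Data(1));
/// // Stealers take from the front as well.
/// assert_eq!(s.steal(), Steal::Data(2));
/// ```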
pub fn fifo<T>() -> (Worker<T>, Stealer<T>) {
let buffer = Buffer::alloc(MIN_CAP);
let inner = Arc::new(CachePadded::new(Inner {
front: AtomicIsize::new(0),
back: AtomicIsize::new(0),
buffer: Atomic::new(buffer),
}));
let w = Worker {
inner: inner.clone(),
cached_buffer: Cell::new(buffer),
flavor: Flavor::Fifo,
_marker: PhantomData,
};
let s = Stealer {
inner,
flavor: Flavor::Fifo,
};
(w, s)
}
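/// Creates a work-stealing queue with the LIFO flavor.
///
/// In this flavor the worker pops tasks from the back, i.e. the end opposite
/// to the one stealers take from, so the worker sees the most recently
/// pushed task first.
///
/// A minimal usage sketch; the doctest assumes this crate is built under the
/// name `crossbeam_deque`:
///
/// ```
/// use crossbeam_deque::{self as deque, Pop, Steal};
///
/// let (w, s) = deque::lifo::<i32>();
/// w.push(1);
/// w.push(2);
///
/// // The worker pops from the back, most recent first.
/// assert_eq!(w.pop(), Pop::Data(2));
/// // Stealers take from the front.
/// assert_eq!(s.steal(), Steal::Data(1));
/// ```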
pub fn lifo<T>() -> (Worker<T>, Stealer<T>) {
let buffer = Buffer::alloc(MIN_CAP);
let inner = Arc::new(CachePadded::new(Inner {
front: AtomicIsize::new(0),
back: AtomicIsize::new(0),
buffer: Atomic::new(buffer),
}));
let w = Worker {
inner: inner.clone(),
cached_buffer: Cell::new(buffer),
flavor: Flavor::Lifo,
_marker: PhantomData,
};
let s = Stealer {
inner,
flavor: Flavor::Lifo,
};
(w, s)
}
/// A fixed-capacity buffer holding the tasks in a queue.
///
/// Indices wrap around modulo the capacity, which is always a power of two.
struct Buffer<T> {
    /// Pointer to the allocated memory.
    ptr: *mut T,
    /// Capacity of the buffer. Always a power of two.
    cap: usize,
}
unsafe impl<T> Send for Buffer<T> {}
impl<T> Buffer<T> {
    /// Allocates a new buffer with the specified capacity, which must be a
    /// power of two.
    fn alloc(cap: usize) -> Self {
        debug_assert_eq!(cap, cap.next_power_of_two());
        let mut v = Vec::with_capacity(cap);
        let ptr = v.as_mut_ptr();
        mem::forget(v);
        Buffer { ptr, cap }
    }
    /// Deallocates the buffer. Must be called at most once per allocation.
    unsafe fn dealloc(self) {
        drop(Vec::from_raw_parts(self.ptr, 0, self.cap));
    }
    /// Returns a pointer to the slot at the specified `index`, wrapping
    /// around the capacity.
    unsafe fn at(&self, index: isize) -> *mut T {
        self.ptr.offset(index & (self.cap - 1) as isize)
    }
    /// Writes `value` into the slot at the specified `index`.
    ///
    /// A volatile write is used because the slot may be read concurrently by
    /// a racing steal operation.
    unsafe fn write(&self, index: isize, value: T) {
        ptr::write_volatile(self.at(index), value)
    }
    /// Reads the value from the slot at the specified `index`.
    ///
    /// A volatile read is used for the same reason.
    unsafe fn read(&self, index: isize) -> T {
        ptr::read_volatile(self.at(index))
    }
}
impl<T> Clone for Buffer<T> {
fn clone(&self) -> Buffer<T> {
Buffer {
ptr: self.ptr,
cap: self.cap,
}
}
}
impl<T> Copy for Buffer<T> {}
/// Possible outcomes of a pop operation.
#[must_use]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
pub enum Pop<T> {
    /// The queue was empty at the time of popping.
    Empty,
    /// Some data has been successfully popped.
    Data(T),
    /// Lost the race for popping data to another concurrent operation. Try again.
    Retry,
}
/// Possible outcomes of a steal operation.
#[must_use]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
pub enum Steal<T> {
    /// The queue was empty at the time of stealing.
    Empty,
    /// Some data has been successfully stolen.
    Data(T),
    /// Lost the race for stealing data to another concurrent operation. Try again.
    Retry,
}
/// The internal representation of a queue, shared by the worker and stealers.
struct Inner<T> {
    /// The front index.
    front: AtomicIsize,
    /// The back index.
    back: AtomicIsize,
    /// The underlying buffer.
    buffer: Atomic<Buffer<T>>,
}
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
        // Load the back and front indices.
        let b = self.back.load(Ordering::Relaxed);
        let f = self.front.load(Ordering::Relaxed);
        unsafe {
            let buffer = self.buffer.load(Ordering::Relaxed, epoch::unprotected());
            // Drop every task that is still in the queue.
            let mut i = f;
            while i != b {
                ptr::drop_in_place(buffer.deref().at(i));
                i = i.wrapping_add(1);
            }
            // Free the memory allocated by the buffer.
            buffer.into_owned().into_box().dealloc();
}
}
}
/// The flavor of a queue, which determines the end the worker pops from.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Flavor {
    /// The first-in first-out flavor.
    Fifo,
    /// The last-in first-out flavor.
    Lifo,
}
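/// The worker side of a queue.
///
/// There is only one worker per queue. It can push and pop tasks, and can be
/// sent to another thread, but not shared among threads.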
pub struct Worker<T> {
    /// A reference to the inner representation of the queue.
    inner: Arc<CachePadded<Inner<T>>>,
    /// A copy of `inner.buffer`, cached for fast access by the worker.
    cached_buffer: Cell<Buffer<T>>,
    /// The flavor of the queue.
    flavor: Flavor,
    /// Indicates that the worker cannot be shared among threads.
    _marker: PhantomData<*mut ()>,
}
unsafe impl<T: Send> Send for Worker<T> {}
impl<T> Worker<T> {
    /// Resizes the internal buffer to the new capacity of `new_cap`.
    #[cold]
    unsafe fn resize(&self, new_cap: usize) {
        // Load the back index, the front index, and the current buffer.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Relaxed);
        let buffer = self.cached_buffer.get();
        // Allocate a new buffer and cache it.
        let new = Buffer::alloc(new_cap);
        self.cached_buffer.set(new);
        // Copy the tasks from the old buffer into the new one.
        let mut i = f;
        while i != b {
            ptr::copy_nonoverlapping(buffer.at(i), new.at(i), 1);
            i = i.wrapping_add(1);
        }
        let guard = &epoch::pin();
        // Publish the new buffer and defer destruction of the old one until
        // no stealer can be reading from it.
        let old =
            self.inner
                .buffer
                .swap(Owned::new(new).into_shared(guard), Ordering::Release, guard);
        guard.defer_unchecked(move || old.into_owned().into_box().dealloc());
        // If the buffer is very large, flush the thread-local garbage so the
        // old buffer gets deallocated as soon as possible.
        if mem::size_of::<T>() * new_cap >= FLUSH_THRESHOLD_BYTES {
            guard.flush();
        }
}
    /// Reserves capacity for at least `reserve_cap` more tasks to be pushed
    /// without reallocating.
    fn reserve(&self, reserve_cap: usize) {
if reserve_cap > 0 {
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::SeqCst);
let len = b.wrapping_sub(f) as usize;
let cap = self.cached_buffer.get().cap;
if cap - len < reserve_cap {
let mut new_cap = cap * 2;
while new_cap - len < reserve_cap {
new_cap *= 2;
}
unsafe {
self.resize(new_cap);
}
}
}
}
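    /// Returns `true` if the queue is empty.
    ///
    /// A sketch of usage, assuming the crate name `crossbeam_deque`:
    ///
    /// ```
    /// use crossbeam_deque as deque;
    ///
    /// let (w, _) = deque::lifo::<i32>();
    /// assert!(w.is_empty());
    /// w.push(1);
    /// assert!(!w.is_empty());
    /// ```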
pub fn is_empty(&self) -> bool {
let b = self.inner.back.load(Ordering::Relaxed);
let f = self.inner.front.load(Ordering::SeqCst);
b.wrapping_sub(f) <= 0
}
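    /// Pushes a task into the queue.
    ///
    /// A sketch of usage, assuming the crate name `crossbeam_deque`:
    ///
    /// ```
    /// use crossbeam_deque as deque;
    ///
    /// let (w, _) = deque::lifo::<i32>();
    /// w.push(1);
    /// w.push(2);
    /// ```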
pub fn push(&self, value: T) {
        // Load the back index, the front index, and the buffer.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Acquire);
        let mut buffer = self.cached_buffer.get();
        // Is the queue full?
        let len = b.wrapping_sub(f);
        if len >= buffer.cap as isize {
            // Yes: grow the underlying buffer.
            unsafe {
                self.resize(2 * buffer.cap);
            }
            buffer = self.cached_buffer.get();
        }
        // Write `value` into the slot at the back.
        unsafe {
            buffer.write(b, value);
        }
        // Make the write visible before incrementing the back index.
        atomic::fence(Ordering::Release);
        self.inner.back.store(b.wrapping_add(1), Ordering::Release);
}
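    /// Pops a task from the queue.
    ///
    /// Which end the task is popped from depends on the flavor: FIFO pops
    /// from the front, LIFO from the back. `Pop::Retry` is returned if a
    /// FIFO pop loses a race with a concurrent steal.
    ///
    /// A sketch of usage, assuming the crate name `crossbeam_deque`:
    ///
    /// ```
    /// use crossbeam_deque::{self as deque, Pop};
    ///
    /// let (w, _) = deque::lifo::<i32>();
    /// w.push(1);
    /// w.push(2);
    ///
    /// // The LIFO flavor pops the most recently pushed task first.
    /// assert_eq!(w.pop(), Pop::Data(2));
    /// assert_eq!(w.pop(), Pop::Data(1));
    /// assert_eq!(w.pop(), Pop::Empty);
    /// ```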
pub fn pop(&self) -> Pop<T> {
        // Load the back and front indices.
        let b = self.inner.back.load(Ordering::Relaxed);
        let f = self.inner.front.load(Ordering::Relaxed);
        // Is the queue empty?
        let len = b.wrapping_sub(f);
        if len <= 0 {
            return Pop::Empty;
        }
match self.flavor {
            Flavor::Fifo => {
                // Try incrementing the front index to claim the task.
                if self
                    .inner
                    .front
                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok()
                {
                    unsafe {
                        // Read the popped task.
                        let buffer = self.cached_buffer.get();
                        let data = buffer.read(f);
                        // Shrink the buffer if it is less than a quarter full.
                        if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
                            self.resize(buffer.cap / 2);
                        }
                        return Pop::Data(data);
                    }
                }
                // A concurrent steal operation won the race.
                Pop::Retry
}
            Flavor::Lifo => {
                // Decrement the back index, effectively reserving the last task.
                let b = b.wrapping_sub(1);
                self.inner.back.store(b, Ordering::Relaxed);
                atomic::fence(Ordering::SeqCst);
                // Reload the front index and recompute the length.
                let f = self.inner.front.load(Ordering::Relaxed);
                let len = b.wrapping_sub(f);
                if len < 0 {
                    // The queue is empty: restore the back index.
                    self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                    Pop::Empty
                } else {
                    // Read the task to be popped.
                    let buffer = self.cached_buffer.get();
                    let mut value = unsafe { Some(buffer.read(b)) };
                    if len == 0 {
                        // This is the last task in the queue: race against
                        // stealers for it by incrementing the front index.
                        if self
                            .inner
                            .front
                            .compare_exchange(
                                f,
                                f.wrapping_add(1),
                                Ordering::SeqCst,
                                Ordering::Relaxed,
                            ).is_err()
                        {
                            // A stealer won the race and now owns the task,
                            // so our copy must not be dropped.
                            mem::forget(value.take());
                        }
                        // Restore the back index to the original position.
                        self.inner.back.store(b.wrapping_add(1), Ordering::Relaxed);
                    } else if buffer.cap > MIN_CAP && len < buffer.cap as isize / 4 {
                        // Shrink the buffer if it is less than a quarter full.
                        unsafe {
                            self.resize(buffer.cap / 2);
                        }
                    }
match value {
None => Pop::Empty,
Some(data) => Pop::Data(data),
}
}
}
}
}
}
impl<T> fmt::Debug for Worker<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Worker { .. }")
}
}
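/// The stealer side of a queue.
///
/// Stealers can be cloned and shared among threads. They can only steal
/// tasks from the front of the queue.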
pub struct Stealer<T> {
inner: Arc<CachePadded<Inner<T>>>,
flavor: Flavor,
}
unsafe impl<T: Send> Send for Stealer<T> {}
unsafe impl<T: Send> Sync for Stealer<T> {}
impl<T> Stealer<T> {
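    /// Returns `true` if the queue is empty.
    ///
    /// A sketch of usage, assuming the crate name `crossbeam_deque`:
    ///
    /// ```
    /// use crossbeam_deque as deque;
    ///
    /// let (w, s) = deque::lifo::<i32>();
    /// assert!(s.is_empty());
    /// w.push(1);
    /// assert!(!s.is_empty());
    /// ```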
pub fn is_empty(&self) -> bool {
let f = self.inner.front.load(Ordering::Acquire);
atomic::fence(Ordering::SeqCst);
let b = self.inner.back.load(Ordering::Acquire);
b.wrapping_sub(f) <= 0
}
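    /// Steals a task from the front of the queue.
    ///
    /// Returns `Steal::Retry` if it loses a race with another concurrent
    /// operation, in which case the caller should try again.
    ///
    /// A sketch of usage, assuming the crate name `crossbeam_deque`:
    ///
    /// ```
    /// use crossbeam_deque::{self as deque, Steal};
    ///
    /// let (w, s) = deque::lifo::<i32>();
    /// w.push(1);
    /// w.push(2);
    ///
    /// // Stealers always take from the front.
    /// assert_eq!(s.steal(), Steal::Data(1));
    /// assert_eq!(s.steal(), Steal::Data(2));
    /// assert_eq!(s.steal(), Steal::Empty);
    /// ```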
pub fn steal(&self) -> Steal<T> {
        // Load the front index; if this thread is already pinned
        // (reentrantly), issue the SeqCst fence manually, since the pinning
        // below won't issue it again.
        let f = self.inner.front.load(Ordering::Acquire);
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }
        let guard = &epoch::pin();
        // Load the back index and bail out if the queue is empty.
        let b = self.inner.back.load(Ordering::Acquire);
        if b.wrapping_sub(f) <= 0 {
            return Steal::Empty;
        }
        // Load the buffer and read the task at the front.
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
        let value = unsafe { buffer.deref().read(f) };
        // Try incrementing the front index to claim the task.
        if self
            .inner
            .front
            .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
            .is_err()
        {
            // Lost the race to a concurrent operation, which now owns the
            // task; forget our copy instead of dropping it.
            mem::forget(value);
            return Steal::Retry;
        }
        Steal::Data(value)
}
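    /// Steals a batch of tasks from the front of the queue and moves them
    /// into `dest`, returning one of the stolen tasks directly.
    ///
    /// At most half of the tasks in the queue, and no more than `MAX_BATCH`,
    /// are moved in addition to the task that is returned.
    ///
    /// A sketch of usage, assuming the crate name `crossbeam_deque`:
    ///
    /// ```
    /// use crossbeam_deque::{self as deque, Pop, Steal};
    ///
    /// let (w1, s1) = deque::fifo::<i32>();
    /// let (w2, _) = deque::fifo::<i32>();
    ///
    /// w1.push(1);
    /// w1.push(2);
    /// w1.push(3);
    /// w1.push(4);
    ///
    /// // One task is returned, and some are moved into `w2`.
    /// assert_eq!(s1.steal_many(&w2), Steal::Data(1));
    /// assert_eq!(w2.pop(), Pop::Data(2));
    /// ```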
pub fn steal_many(&self, dest: &Worker<T>) -> Steal<T> {
        // Load the front index; if this thread is already pinned, issue the
        // SeqCst fence manually, since the pinning below won't do it again.
        let mut f = self.inner.front.load(Ordering::Acquire);
        if epoch::is_pinned() {
            atomic::fence(Ordering::SeqCst);
        }
        let guard = &epoch::pin();
        // Load the back index and bail out if the queue is empty.
        let b = self.inner.back.load(Ordering::Acquire);
        let len = b.wrapping_sub(f);
        if len <= 0 {
            return Steal::Empty;
        }
        // Steal about half of the remaining tasks, capped at `MAX_BATCH`, in
        // addition to the single task returned to the caller.
        let additional = cmp::min((len as usize - 1) / 2, MAX_BATCH);
        dest.reserve(additional);
        let additional = additional as isize;
        // Grab the (possibly resized) destination buffer and its back index.
        let dest_buffer = dest.cached_buffer.get();
        let mut dest_b = dest.inner.back.load(Ordering::Relaxed);
        // Read the task at the front; it will be returned to the caller.
        let buffer = self.inner.buffer.load(Ordering::Acquire, guard);
        let value = unsafe { buffer.deref().read(f) };
match self.flavor {
            Flavor::Fifo => {
                // Copy the additional tasks into the destination buffer.
                for i in 0..additional {
                    unsafe {
                        let value = buffer.deref().read(f.wrapping_add(i + 1));
                        dest_buffer.write(dest_b.wrapping_add(i), value);
                    }
                }
                // Try incrementing the front index past the whole batch.
                if self
                    .inner
                    .front
                    .compare_exchange(
                        f,
                        f.wrapping_add(additional + 1),
                        Ordering::SeqCst,
                        Ordering::Relaxed,
                    ).is_err()
                {
                    // Lost the race to a concurrent operation; forget the
                    // value instead of dropping it.
                    mem::forget(value);
                    return Steal::Retry;
                }
                // Publish the copied tasks in the destination queue.
                atomic::fence(Ordering::Release);
                dest.inner
                    .back
                    .store(dest_b.wrapping_add(additional), Ordering::Release);
                Steal::Data(value)
            }
            Flavor::Lifo => {
                // Try incrementing the front index to claim the first task.
                if self
                    .inner
                    .front
                    .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                    .is_err()
                {
                    mem::forget(value);
                    return Steal::Retry;
                }
                f = f.wrapping_add(1);
                // Steal the additional tasks one by one, publishing each in
                // the destination queue as it is moved.
                for _ in 0..additional {
                    atomic::fence(Ordering::SeqCst);
                    let b = self.inner.back.load(Ordering::Acquire);
                    // Stop if the queue ran out of tasks.
                    if b.wrapping_sub(f) <= 0 {
                        break;
                    }
                    // Read the task at the front and try claiming it.
                    let value = unsafe { buffer.deref().read(f) };
                    if self
                        .inner
                        .front
                        .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
                        .is_err()
                    {
                        mem::forget(value);
                        break;
                    }
                    // Write the task into the destination and publish it.
                    unsafe {
                        dest_buffer.write(dest_b, value);
                    }
                    f = f.wrapping_add(1);
                    dest_b = dest_b.wrapping_add(1);
                    atomic::fence(Ordering::Release);
                    dest.inner.back.store(dest_b, Ordering::Release);
                }
                Steal::Data(value)
            }
}
}
}
impl<T> Clone for Stealer<T> {
fn clone(&self) -> Stealer<T> {
Stealer {
inner: self.inner.clone(),
flavor: self.flavor,
}
}
}
impl<T> fmt::Debug for Stealer<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Stealer { .. }")
}
}