use core::cell::{Cell, UnsafeCell};
use core::mem;
use core::num::Wrapping;
use core::ptr;
use core::sync::atomic;
use core::sync::atomic::Ordering;
use alloc::boxed::Box;
use alloc::arc::Arc;
use crossbeam_utils::cache_padded::CachePadded;
use nodrop::NoDrop;
use atomic::Owned;
use epoch::{AtomicEpoch, Epoch};
use guard::{unprotected, Guard};
use garbage::{Bag, Garbage};
use sync::list::{List, Entry, IterError, IsElement};
use sync::queue::Queue;
/// Maximum number of garbage bags popped from the global queue per `collect`
/// call (unless the "sanitize" feature forces an unbounded sweep).
const COLLECT_STEPS: usize = 8;
/// A collection cycle is triggered once per this many pinnings of a participant.
const PINNINGS_BETWEEN_COLLECT: usize = 128;
/// The global data shared by all participants of the epoch-based GC.
pub struct Global {
    /// Intrusive linked list of all registered participants (`Local`s).
    locals: List<Local>,
    /// Queue of garbage bags, each tagged with the epoch at which it was pushed.
    queue: Queue<(Epoch, Bag)>,
    /// The global epoch, cache-line padded to keep it away from the other fields.
    epoch: CachePadded<AtomicEpoch>,
}
impl Global {
    /// Creates new global data: an empty participant list, an empty garbage
    /// queue, and the starting epoch.
    #[inline]
    pub fn new() -> Global {
        Global {
            locals: List::new(),
            queue: Queue::new(),
            epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
        }
    }

    /// Loads the global epoch with the given memory ordering.
    pub fn load_epoch(&self, ordering: Ordering) -> Epoch {
        self.epoch.load(ordering)
    }

    /// Moves the contents of `bag` into the global queue, tagged with the
    /// current global epoch, leaving `bag` empty.
    pub fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
        let bag = mem::replace(bag, Bag::new());

        // Full fence before reading the epoch, so the bag is sealed with an
        // epoch that is not stale relative to the retirements it contains.
        // NOTE(review): this fence pairs with the one in `try_advance` —
        // confirm the ordering argument against that method.
        atomic::fence(Ordering::SeqCst);

        let epoch = self.epoch.load(Ordering::Relaxed);
        self.queue.push((epoch, bag), guard);
    }

    /// Attempts to advance the epoch, then pops a bounded number of bags from
    /// the global queue and destroys their garbage.
    #[cold]
    pub fn collect(&self, guard: &Guard) {
        let global_epoch = self.try_advance(guard);

        // A bag is destroyed only if it was pushed at least two epochs ago.
        let condition = |item: &(Epoch, Bag)| {
            global_epoch.wrapping_sub(item.0) >= 2
        };

        // Under the "sanitize" feature, drain the queue without a step limit
        // so garbage is freed as eagerly as possible.
        let steps = if cfg!(feature = "sanitize") {
            usize::max_value()
        } else {
            COLLECT_STEPS
        };

        for _ in 0..steps {
            match self.queue.try_pop_if(&condition, guard) {
                None => break,
                Some(bag) => drop(bag),
            }
        }
    }

    /// Attempts to advance the global epoch.
    ///
    /// The epoch may advance only if every currently pinned participant has
    /// already observed the current global epoch. Returns the epoch in effect
    /// after the attempt (advanced or not).
    #[cold]
    pub fn try_advance(&self, guard: &Guard) -> Epoch {
        let global_epoch = self.epoch.load(Ordering::Relaxed);
        atomic::fence(Ordering::SeqCst);

        // Inspect every registered participant.
        for local in self.locals.iter(&guard) {
            match local {
                // The list iterator ran into a node that is being concurrently
                // removed; give up on advancing this time.
                Err(IterError::Stalled) => {
                    return global_epoch;
                }
                Ok(local) => {
                    let local_epoch = local.epoch.load(Ordering::Relaxed);

                    // A participant that is pinned in an older epoch blocks
                    // the advance.
                    if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
                        return global_epoch;
                    }
                }
            }
        }

        // All pinned participants are up to date; publish the next epoch.
        // NOTE(review): plain store, not CAS — concurrent advancers can race;
        // presumably benign because the epoch can only move one step ahead of
        // any pinned participant. Confirm.
        atomic::fence(Ordering::Acquire);
        let new_epoch = global_epoch.successor();
        self.epoch.store(new_epoch, Ordering::Release);
        new_epoch
    }
}
/// A participant in epoch-based garbage collection.
pub struct Local {
    /// Intrusive entry linking this participant into `Global::locals`.
    entry: Entry,
    /// This participant's epoch, carrying a "pinned" flag (see `Epoch`).
    epoch: AtomicEpoch,
    /// Handle to the global data. Wrapped in `NoDrop` because the `Arc` is
    /// destroyed manually (via `ptr::read`) in `finalize`.
    global: UnsafeCell<NoDrop<Arc<Global>>>,
    /// The local bag of deferred garbage.
    bag: UnsafeCell<Bag>,
    /// Number of currently live `Guard`s created by this participant.
    guard_count: Cell<usize>,
    /// Number of handles keeping this participant registered (starts at 1 for
    /// the pointer returned by `register`).
    handle_count: Cell<usize>,
    /// Total number of pinnings performed; wraps around on overflow.
    pin_count: Cell<Wrapping<usize>>,
}
// SAFETY: `Local` contains `Cell`/`UnsafeCell` fields that are not `Sync` on
// their own. NOTE(review): this impl asserts those fields are only ever
// accessed by the owning thread (through the raw pointer handed out by
// `register`) — confirm against how handles/guards are created.
unsafe impl Sync for Local {}
impl Local {
    /// Registers a new participant with `global` and returns a raw pointer to
    /// it. The participant starts unpinned, with an empty bag and a single
    /// handle (the returned pointer).
    pub fn register(global: &Arc<Global>) -> *const Local {
        unsafe {
            // Allocate the `Local` and link it into the global list. Using
            // `unprotected()` here is fine while nothing else can reach the
            // node — NOTE(review): confirm `unprotected`'s contract.
            let local = Owned::new(Local {
                entry: Entry::default(),
                epoch: AtomicEpoch::new(Epoch::starting()),
                global: UnsafeCell::new(NoDrop::new(global.clone())),
                bag: UnsafeCell::new(Bag::new()),
                guard_count: Cell::new(0),
                handle_count: Cell::new(1),
                pin_count: Cell::new(Wrapping(0)),
            }).into_shared(&unprotected());
            global.locals.insert(local, &unprotected());
            local.as_raw()
        }
    }

    /// Returns `true` if the local garbage bag is empty.
    #[inline]
    pub fn is_bag_empty(&self) -> bool {
        unsafe { (*self.bag.get()).is_empty() }
    }

    /// Returns a reference to the shared global data.
    #[inline]
    pub fn global(&self) -> &Global {
        unsafe { &*self.global.get() }
    }

    /// Returns `true` if at least one guard from this participant is alive.
    #[inline]
    pub fn is_pinned(&self) -> bool {
        self.guard_count.get() > 0
    }

    /// Adds `garbage` to the local bag, flushing the bag to the global queue
    /// whenever it is full.
    pub fn defer(&self, mut garbage: Garbage, guard: &Guard) {
        let bag = unsafe { &mut *self.bag.get() };
        // `try_push` hands the garbage back on failure (bag full): push the
        // full bag to the global queue and retry with the same garbage.
        while let Err(g) = bag.try_push(garbage) {
            self.global().push_bag(bag, guard);
            garbage = g;
        }
    }

    /// Flushes the local bag (if non-empty) into the global queue, then runs
    /// a collection cycle.
    pub fn flush(&self, guard: &Guard) {
        let bag = unsafe { &mut *self.bag.get() };
        if !bag.is_empty() {
            self.global().push_bag(bag, guard);
        }
        self.global().collect(guard);
    }

    /// Pins this participant and returns a `Guard` that keeps it pinned.
    ///
    /// Guards nest: only the first one publishes a pinned epoch. Panics if
    /// the guard count would overflow `usize`.
    #[inline]
    pub fn pin(&self) -> Guard {
        let guard = unsafe { Guard::new(self) };

        let guard_count = self.guard_count.get();
        // `checked_add` so an overflow panics instead of silently wrapping
        // the count (which would unpin while guards are still alive).
        self.guard_count.set(guard_count.checked_add(1).unwrap());

        if guard_count == 0 {
            // First guard: publish the current global epoch as ours.
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
            let new_epoch = global_epoch.pinned();

            if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
                // On x86, a SeqCst read-modify-write doubles as a full memory
                // barrier, replacing the store + SeqCst fence of the generic
                // path below. NOTE(review): relies on x86 codegen for SeqCst
                // RMW operations — confirm.
                let current = Epoch::starting();
                let previous = self.epoch.compare_and_swap(current, new_epoch, Ordering::SeqCst);
                debug_assert_eq!(current, previous, "participant was expected to be unpinned");
            } else {
                // Generic path: store the pinned epoch, then a full fence so
                // the store becomes visible before any subsequent loads.
                self.epoch.store(new_epoch, Ordering::Relaxed);
                atomic::fence(Ordering::SeqCst);
            }

            // Trigger a collection cycle once every
            // `PINNINGS_BETWEEN_COLLECT` pinnings. The counter deliberately
            // wraps (`Wrapping`).
            let count = self.pin_count.get();
            self.pin_count.set(count + Wrapping(1));
            if count.0 % PINNINGS_BETWEEN_COLLECT == 0 {
                self.global().collect(&guard);
            }
        }

        guard
    }

    /// Decrements the guard count; the last guard actually unpins the
    /// participant and may tear it down if no handles remain.
    #[inline]
    pub fn unpin(&self) {
        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count - 1);

        if guard_count == 1 {
            // Last guard: mark ourselves unpinned.
            self.epoch.store(Epoch::starting(), Ordering::Release);

            // With no handles left either, this `Local` can be finalized.
            if self.handle_count.get() == 0 {
                self.finalize();
            }
        }
    }

    /// Refreshes the pinned epoch to the current global epoch, but only when
    /// exactly one guard is active (nested guards must keep their epoch).
    #[inline]
    pub fn repin(&self) {
        let guard_count = self.guard_count.get();

        if guard_count == 1 {
            let epoch = self.epoch.load(Ordering::Relaxed);
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);

            // Store only if the global epoch has moved since we pinned.
            if epoch != global_epoch {
                self.epoch.store(global_epoch, Ordering::Release);
            }
        }
    }

    /// Increments the handle count.
    #[inline]
    pub fn acquire_handle(&self) {
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count + 1);
    }

    /// Decrements the handle count; finalizes the participant once neither
    /// handles nor guards remain.
    #[inline]
    pub fn release_handle(&self) {
        let guard_count = self.guard_count.get();
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count - 1);

        if guard_count == 0 && handle_count == 1 {
            self.finalize();
        }
    }

    /// Flushes the remaining garbage and removes this participant from the
    /// global list. Called only when both `guard_count` and `handle_count`
    /// have reached zero.
    #[cold]
    fn finalize(&self) {
        debug_assert_eq!(self.guard_count.get(), 0);
        debug_assert_eq!(self.handle_count.get(), 0);

        // Temporarily bump the handle count: dropping the guard below calls
        // `unpin`, which would otherwise re-enter `finalize`.
        self.handle_count.set(1);
        unsafe {
            // Pin and push the remaining local garbage to the global queue.
            let guard = &self.pin();
            self.global().push_bag(&mut *self.bag.get(), guard);
        }
        self.handle_count.set(0);

        unsafe {
            // Move the `Arc<Global>` out of its `NoDrop` wrapper so its
            // refcount is decremented exactly once, right here.
            let global: Arc<Global> = ptr::read(&**self.global.get());

            // Mark the list entry as deleted; the node itself is reclaimed
            // later via `IsElement::finalize` once it is unlinked.
            self.entry.delete(&unprotected());

            drop(global);
        }
    }
}
impl IsElement<Local> for Local {
    /// Returns the intrusive `Entry` embedded in `local`, computed via
    /// field-offset pointer arithmetic.
    fn entry_of(local: &Local) -> &Entry {
        let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
        unsafe { &*entry_ptr }
    }

    /// Recovers the `Local` containing `entry` (the inverse of `entry_of`).
    ///
    /// # Safety
    ///
    /// `entry` must actually be the `entry` field of a live `Local`.
    unsafe fn element_of(entry: &Entry) -> &Local {
        // NOTE(review): presumably `offset_of!` expands to an `unsafe` block
        // on some toolchains, hence the allow.
        #[allow(unused_unsafe)]
        let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
        &*local_ptr
    }

    /// Frees the `Local` containing `entry`.
    ///
    /// # Safety
    ///
    /// Must be called at most once per `Local`, after it has been unlinked.
    /// NOTE(review): reconstructs a `Box` for memory allocated through
    /// `Owned::new` in `register` — assumes both use the same allocator and
    /// layout; confirm against `Owned`'s implementation.
    unsafe fn finalize(entry: &Entry) {
        let local = Self::element_of(entry);
        drop(Box::from_raw(local as *const Local as *mut Local));
    }
}