//! Thread registration, pinning, and epoch advancement.
//!
//! Every participating thread registers itself in a global lock-free list and owns a local bag
//! of deferred garbage. A pinned thread publishes the epoch it observed, so that the global
//! epoch cannot get more than one step ahead of it while it holds references.

use std::cell::Cell;
use std::mem;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
use epoch::{Atomic, Owned, Ptr};
use epoch::garbage::{self, Bag, EPOCH};
thread_local! {
    /// The thread-local harness: registers the thread on first use and unregisters it on exit.
    static HARNESS: Harness = Harness {
        thread: Thread::register(),
        is_pinned: Cell::new(false),
        pin_count: Cell::new(0),
        bag: Cell::new(Box::into_raw(Box::new(Bag::new()))),
    };
}
/// Thread-local data associated with a registered thread.
struct Harness {
    /// The entry representing this thread in the global list of participants.
    thread: *const Thread,
    /// Whether the thread is currently pinned.
    is_pinned: Cell<bool>,
    /// How many times the thread has been pinned (used to trigger periodic collection).
    pin_count: Cell<usize>,
    /// The local bag into which garbage is deferred before being pushed to the global queue.
    bag: Cell<*mut Bag>,
}
impl Drop for Harness {
    fn drop(&mut self) {
        let thread = unsafe { &*self.thread };

        // Pin the thread one last time so that the remaining local garbage can be flushed.
        let pin = &Scope { bag: &self.bag };
        thread.set_pinned(pin);

        // Try to make some progress on global garbage before leaving.
        try_advance(pin);
        garbage::collect(pin);

        // Push the thread-local bag into the global garbage queue.
        let bag = unsafe { Box::from_raw(self.bag.get()) };
        garbage::push(bag, pin);

        // Unpin and remove this thread from the list of participants.
        thread.set_unpinned();
        thread.unregister();
    }
}
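/// An entry in the global linked list of registered threads.
///
/// Deleted entries are only unlinked and deferred for destruction during `try_advance`, so it is
/// always safe to traverse the list while pinned.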
struct Thread {
    /// The least significant bit is set while the thread is pinned; the remaining bits hold the
    /// global epoch that was observed at the time of pinning.
    state: AtomicUsize,
    /// The next thread in the list. A pointer tag of 1 marks this entry as deleted.
    next: Atomic<Thread>,
}
impl Thread {
    /// Marks the thread as pinned, recording the current global epoch.
    ///
    /// The new state must be published with sequentially consistent ordering so that a
    /// concurrent `try_advance` cannot miss this pin.
    #[inline]
    fn set_pinned(&self, _pin: &Scope) {
        let epoch = EPOCH.load(Relaxed);
        let state = epoch | 1;

        if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
            // On x86 a sequentially consistent read-modify-write is cheaper than a store
            // followed by a full fence.
            let previous = self.state.load(Relaxed);
            self.state.compare_and_swap(previous, state, SeqCst);
        } else {
            self.state.store(state, Relaxed);
            ::std::sync::atomic::fence(SeqCst);
        }
    }
    /// Marks the thread as unpinned.
    #[inline]
    fn set_unpinned(&self) {
        self.state.store(0, Release);
    }
    /// Registers the current thread by pushing a new entry onto the list of participants.
    fn register() -> *const Thread {
        let list = participants();

        let mut new = Owned::new(Thread {
            state: AtomicUsize::new(0),
            next: Atomic::null(),
        });

        // This code runs while the thread-local harness is still being initialized, and a
        // normal pin would access the harness recursively and panic. Cheat by conjuring a
        // fake, zeroed `Scope` instead.
        let pin = unsafe { &mem::zeroed::<Scope>() };

        let mut head = list.load(Acquire, pin);
        loop {
            new.next.store(head, Relaxed);

            // Push the new entry onto the head of the list.
            match list.compare_and_swap_weak_owned(head, new, AcqRel, pin) {
                Ok(n) => return n.as_raw(),
                Err((h, n)) => {
                    head = h;
                    new = n;
                }
            }
        }
    }
    /// Unregisters the thread by marking its entry as deleted.
    ///
    /// The entry is merely tagged here; it is physically unlinked and reclaimed later, during
    /// `try_advance`.
    fn unregister(&self) {
        let pin = unsafe { &mem::zeroed::<Scope>() };

        // Set the tag on the `next` pointer to 1, marking this entry as deleted.
        let mut next = self.next.load(Acquire, pin);
        while next.tag() == 0 {
            match self.next.compare_and_swap(next, next.with_tag(1), AcqRel, pin) {
                Ok(()) => break,
                Err(n) => next = n,
            }
        }
    }
}
/// Returns a reference to the head pointer of the list of registered threads.
fn participants() -> &'static Atomic<Thread> {
    // An `Atomic<Thread>` cannot be initialized in a `static`, so an `AtomicUsize` is declared
    // instead and reinterpreted, relying on the two having the same representation.
    static PARTICIPANTS: AtomicUsize = ATOMIC_USIZE_INIT;
    unsafe { &*(&PARTICIPANTS as *const _ as *const _) }
}
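/// Attempts to advance the global epoch by one step.
///
/// The epoch may only advance once every currently pinned thread has observed the current value.
/// While walking the list of participants, entries marked as deleted are unlinked and their
/// destruction is deferred.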
#[cold]
pub fn try_advance(scope: &Scope) {
    let epoch = EPOCH.load(SeqCst);

    // Traverse the list of registered threads.
    let mut pred = participants();
    let mut curr = pred.load(Acquire, scope);

    while let Some(c) = unsafe { curr.as_ref() } {
        let succ = c.next.load(Acquire, scope);

        if succ.tag() == 1 {
            // This thread has unregistered: unlink its entry and defer its destruction.
            let succ = succ.with_tag(0);
            if pred.compare_and_swap(curr, succ, AcqRel, scope).is_err() {
                // The predecessor changed concurrently; bail out and let a later call retry.
                return;
            }
            unsafe { scope.defer_free(curr) }

            curr = succ;
        } else {
            let thread_state = c.state.load(SeqCst);
            let thread_is_pinned = thread_state & 1 == 1;
            let thread_epoch = thread_state & !1;

            // A pinned thread that has not yet observed the current epoch blocks advancement.
            if thread_is_pinned && thread_epoch != epoch {
                return;
            }

            pred = &c.next;
            curr = succ;
        }
    }

    // Every pinned thread has observed the current epoch, so it is safe to move to the next
    // one. Epochs advance in steps of 2 because the low bit of a thread's state is the pinned
    // flag.
    EPOCH.compare_and_swap(epoch, epoch.wrapping_add(2), SeqCst);
}
/// A witness that the current thread is pinned.
#[derive(Debug)]
pub struct Scope {
    /// Pointer to the pinned thread's local garbage bag (null in unprotected and fake scopes).
    bag: *const Cell<*mut Bag>,
}
impl Scope {
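    /// Deferred deallocation of the object pointed to by `ptr`.
    ///
    /// The memory is freed, without running the destructor, once the epoch machinery concludes
    /// that no pinned thread can still be holding a reference to the object.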
    pub unsafe fn defer_free<T>(&self, ptr: Ptr<T>) {
        let object = ptr.as_raw();
        let count = 1;

        // Frees the memory of `count` objects without running their destructors.
        unsafe fn free<T>(ptr: *mut T, count: usize) {
            drop(Vec::from_raw_parts(ptr, 0, count));
        }

        loop {
            let cell = &*self.bag;
            let bag = cell.get();

            if (*bag).try_insert(free::<T>, object, count) {
                break;
            }
            // The local bag is full: flush it and retry with a fresh one.
            self.flush();
        }
    }
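    /// Deferred destruction and deallocation of the object pointed to by `ptr`.
    ///
    /// Unlike `defer_free`, the object's destructor is run before its memory is reclaimed.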
    pub unsafe fn defer_drop<T>(&self, ptr: Ptr<T>) {
        let object = ptr.as_raw();
        let count = 1;

        // Drops `count` objects and then frees their memory.
        unsafe fn destruct<T>(ptr: *mut T, count: usize) {
            drop(Vec::from_raw_parts(ptr, count, count));
        }

        loop {
            let cell = &*self.bag;
            let bag = cell.get();

            if (*bag).try_insert(destruct::<T>, object, count) {
                break;
            }
            // The local bag is full: flush it and retry with a fresh one.
            self.flush();
        }
    }
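    /// Flushes the thread-local garbage bag into the global garbage queue.
    ///
    /// The bag is replaced with a fresh, empty one and the old bag is pushed into the global
    /// queue. Afterwards an epoch advancement is attempted and some global garbage is collected.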
    pub fn flush(&self) {
        unsafe {
            let cell = &*self.bag;
            let bag = cell.get();

            if !(*bag).is_empty() {
                // Replace the local bag with a fresh one and push the old one into the global
                // garbage queue.
                cell.set(Box::into_raw(Box::new(Bag::new())));
                let bag = Box::from_raw(bag);
                garbage::push(bag, self);

                // Help the collector make some progress.
                try_advance(self);
                garbage::collect(self);
            }
        }
    }
}
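/// Pins the current thread and calls `f` with a `Scope` that witnesses the pin.
///
/// Pinning is reentrant: nested calls simply reuse the outermost pin. Roughly every
/// `PINS_BETWEEN_COLLECT`-th outermost pin also tries to advance the epoch and collect some
/// global garbage.
///
/// A minimal usage sketch (module paths are assumed to match this crate's layout, so the
/// example is not compiled as a doc test):
///
/// ```ignore
/// epoch::pin(|scope| {
///     // While pinned, destruction of removed objects can be safely deferred.
///     let p = epoch::Owned::new(7).into_ptr(scope);
///     unsafe { scope.defer_drop(p) };
/// });
/// ```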
pub fn pin<F, T>(f: F) -> T
where
    F: FnOnce(&Scope) -> T,
{
    // Number of pins after which a thread will collect some global garbage.
    const PINS_BETWEEN_COLLECT: usize = 128;

    HARNESS.with(|harness| {
        let thread = unsafe { &*harness.thread };
        let pin = &Scope { bag: &harness.bag };

        let was_pinned = harness.is_pinned.get();
        if !was_pinned {
            // This is the outermost pin: mark the thread as pinned.
            harness.is_pinned.set(true);
            thread.set_pinned(pin);

            // Every once in a while, try advancing the epoch and collecting global garbage.
            let count = harness.pin_count.get();
            harness.pin_count.set(count.wrapping_add(1));
            if count % PINS_BETWEEN_COLLECT == 0 {
                try_advance(pin);
                garbage::collect(pin);
            }
        }

        // Make sure the thread gets unpinned when leaving the closure.
        defer! {
            if !was_pinned {
                thread.set_unpinned();
                harness.is_pinned.set(false);
            }
        }

        f(pin)
    })
}
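/// Calls `f` with a dummy, unprotected `Scope` without pinning the current thread.
///
/// # Safety
///
/// The unprotected scope carries a null bag pointer, so it must not be used to defer
/// destruction of objects, and it provides none of the protection of a real pin. It is only
/// appropriate when no other thread can concurrently access the data in question.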
pub unsafe fn unprotected<F, T>(f: F) -> T
where
F: FnOnce(&Scope) -> T
{
let pin = &Scope { bag: ::std::ptr::null() };
f(pin)
}
/// Returns whether the current thread is pinned.
#[inline]
pub fn is_pinned() -> bool {
    HARNESS.with(|harness| harness.is_pinned.get())
}
#[cfg(test)]
mod tests {
use std::thread;
use std::sync::atomic::Ordering::SeqCst;
use epoch::{self, Owned};
use epoch::garbage::EPOCH;
use epoch::thread::{HARNESS, try_advance};
    // Pinning is reentrant: nested `pin` calls all observe the thread as pinned.
    #[test]
    fn pin_reentrant() {
assert!(!epoch::is_pinned());
epoch::pin(|_| {
assert!(epoch::is_pinned());
epoch::pin(|_| {
assert!(epoch::is_pinned());
});
assert!(epoch::is_pinned());
});
assert!(!epoch::is_pinned());
}
    // Deferring an object must put it into the thread-local bag, and flushing repeatedly must
    // eventually hand it off to the global queue.
    #[test]
    fn flush_local_garbage() {
for _ in 0..100 {
epoch::pin(|scope| {
unsafe {
let a = Owned::new(7).into_ptr(scope);
scope.defer_free(a);
HARNESS.with(|h| {
assert!(!(*h.bag.get()).is_empty());
while !(*h.bag.get()).is_empty() {
scope.flush();
}
});
}
});
}
}
    // Deferred objects are buffered in the local bag; they are not pushed to the global queue
    // on every pin.
    #[test]
    fn garbage_buffering() {
HARNESS.with(|h| unsafe {
while !(*h.bag.get()).is_empty() {
epoch::pin(|scope| scope.flush());
}
epoch::pin(|scope| {
for _ in 0..10 {
let a = Owned::new(7).into_ptr(scope);
scope.defer_free(a);
}
assert!(!(*h.bag.get()).is_empty());
});
});
}
    // While a thread is pinned, the global epoch can advance by at most one step (i.e. by 2).
    #[test]
    fn pin_holds_advance() {
let threads = (0..8).map(|_| {
thread::spawn(|| {
for _ in 0..500_000 {
epoch::pin(|scope| {
let before = EPOCH.load(SeqCst);
try_advance(scope);
let after = EPOCH.load(SeqCst);
assert!(after.wrapping_sub(before) <= 2);
});
}
})
}).collect::<Vec<_>>();
for t in threads {
t.join().unwrap();
}
}
}