#[cfg(feature = "nightly")]
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
#[cfg(not(feature = "nightly"))]
use stable::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};
use std::cell::{Cell, UnsafeCell};
use std::ptr;
use std::mem;
use smallvec::SmallVec;
use rand::{self, XorShiftRng, Rng};
use thread_parker::ThreadParker;
use word_lock::WordLock;
use util::UncheckedOptionExt;
// Total number of live threads that have used the parking lot.
static NUM_THREADS: AtomicUsize = ATOMIC_USIZE_INIT;
// Pointer to the current hash table (0 = not allocated yet). Old tables are
// never freed, so stale pointers held by parked threads remain valid.
static HASHTABLE: AtomicUsize = ATOMIC_USIZE_INIT;
// Per-thread parking data, lazily created the first time a thread parks.
thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
// Keep at least this many buckets per thread so that collisions stay rare.
const LOAD_FACTOR: usize = 3;
struct HashTable {
    // Buckets in the table; the length is always a power of two.
    entries: Box<[Bucket]>,

    // Number of bits of the hash value used to index into `entries`.
    hash_bits: u32,

    // Previous hash table, kept alive so that pointers into it stay valid.
    _prev: *const HashTable,
}
impl HashTable {
    fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
        // LOAD_FACTOR buckets per thread, rounded up to a power of two.
        let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
        // log2 of the table size: the number of hash bits used as an index.
        let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
let bucket = Bucket {
mutex: WordLock::new(),
queue_head: Cell::new(ptr::null()),
queue_tail: Cell::new(ptr::null()),
fair_timeout: UnsafeCell::new(FairTimeout::new()),
_padding: unsafe { mem::uninitialized() },
};
Box::new(HashTable {
entries: vec![bucket; new_size].into_boxed_slice(),
hash_bits: hash_bits,
_prev: prev,
})
}
}
struct Bucket {
    // Lock protecting the queue in this bucket.
    mutex: WordLock,

    // Linked list of threads parked on keys that hash to this bucket.
    queue_head: Cell<*const ThreadData>,
    queue_tail: Cell<*const ThreadData>,

    // Next time at which an unpark on this bucket should be fair.
    fair_timeout: UnsafeCell<FairTimeout>,

    // Padding to avoid false sharing between adjacent buckets.
    _padding: [u8; 64],
}
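// Cloning a bucket produces a fresh, empty one. This is only used to fill the
// entries of a newly allocated table; the queued threads themselves are
// migrated separately by grow_hashtable.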
impl Clone for Bucket {
fn clone(&self) -> Bucket {
Bucket {
mutex: WordLock::new(),
queue_head: Cell::new(ptr::null()),
queue_tail: Cell::new(ptr::null()),
fair_timeout: UnsafeCell::new(FairTimeout::new()),
_padding: unsafe { mem::uninitialized() },
}
}
}
struct FairTimeout {
    // Next time at which should_timeout returns true.
    timeout: Instant,

    // RNG used to pick the next random timeout interval.
    rng: XorShiftRng,
}
impl FairTimeout {
fn new() -> FairTimeout {
FairTimeout {
timeout: Instant::now(),
rng: rand::weak_rng(),
}
}
    // Returns true if a fair unpark should be forced, and if so schedules the
    // next forced one after a random interval of up to 1ms.
    fn should_timeout(&mut self) -> bool {
        let now = Instant::now();
        if now > self.timeout {
            self.timeout = now + Duration::new(0, self.rng.gen_range(0, 1_000_000));
true
} else {
false
}
}
}
struct ThreadData {
    parker: ThreadParker,
    // Key that this thread is sleeping on. This may change if the thread is
    // requeued to a different key.
    key: AtomicUsize,
    // Linked list of parked threads in a bucket.
    next_in_queue: Cell<*const ThreadData>,
    // UnparkToken passed to this thread when it is unparked.
    unpark_token: Cell<UnparkToken>,
    // ParkToken value set by the thread when it parked.
    park_token: Cell<ParkToken>,
}
impl ThreadData {
    fn new() -> ThreadData {
        // Keep track of the total number of live ThreadData objects and
        // resize the hash table accordingly.
        let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
        unsafe {
            grow_hashtable(num_threads);
        }
ThreadData {
parker: ThreadParker::new(),
key: AtomicUsize::new(0),
next_in_queue: Cell::new(ptr::null()),
unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
park_token: Cell::new(DEFAULT_PARK_TOKEN),
}
}
}
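// Keep NUM_THREADS in sync as threads exit.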
impl Drop for ThreadData {
fn drop(&mut self) {
NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
}
}
// Get a pointer to the latest hash table, creating one if it doesn't exist yet.
unsafe fn get_hashtable() -> *const HashTable {
    let mut table = HASHTABLE.load(Ordering::Acquire);

    // If there is no table, create one.
    if table == 0 {
        let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));

        // If this fails then it means some other thread created the hash
        // table first; use that one instead.
        match HASHTABLE.compare_exchange(0,
                                         new_table as usize,
                                         Ordering::Release,
                                         Ordering::Relaxed) {
            Ok(_) => return new_table,
            Err(x) => table = x,
        }

        // Free the table we created.
        Box::from_raw(new_table);
    }

    table as *const HashTable
}
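// Grow the hash table so that it is big enough for the given number of
// threads. This isn't performance-critical since it only runs once per thread,
// when its ThreadData is created.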
unsafe fn grow_hashtable(num_threads: usize) {
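    // If there is no table yet, create one now; if we win the race to install
    // it, it is already big enough and we are done.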
if HASHTABLE.load(Ordering::Relaxed) == 0 {
let new_table = Box::into_raw(HashTable::new(num_threads, ptr::null()));
if HASHTABLE.compare_exchange(0, new_table as usize, Ordering::Release, Ordering::Relaxed)
.is_ok() {
return;
}
Box::from_raw(new_table);
}
let mut old_table;
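    // Lock every bucket in the current table, making sure it is still the
    // latest (and still too small) once the locks are held. If another thread
    // grew the table in the meantime, drop the locks and retry.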
loop {
old_table = HASHTABLE.load(Ordering::Acquire) as *mut HashTable;
if (*old_table).entries.len() >= LOAD_FACTOR * num_threads {
return;
}
for b in &(*old_table).entries[..] {
b.mutex.lock();
}
if HASHTABLE.load(Ordering::Relaxed) == old_table as usize {
break;
}
for b in &(*old_table).entries[..] {
b.mutex.unlock();
}
}
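    // Create the new table, remembering the old one so that pointers into it
    // stay valid (old tables are intentionally never freed).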
let new_table = HashTable::new(num_threads, old_table);
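    // Move the threads from the old table's queues into the new table,
    // rehashing each thread's key for the larger bucket array.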
for b in &(*old_table).entries[..] {
let mut current = b.queue_head.get();
while !current.is_null() {
let next = (*current).next_in_queue.get();
let hash = hash((*current).key.load(Ordering::Relaxed), new_table.hash_bits);
if new_table.entries[hash].queue_tail.get().is_null() {
new_table.entries[hash].queue_head.set(current);
} else {
(*new_table.entries[hash].queue_tail.get()).next_in_queue.set(current);
}
new_table.entries[hash].queue_tail.set(current);
(*current).next_in_queue.set(ptr::null());
current = next;
}
}
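    // Publish the new table. No races are possible here because any other
    // thread trying to grow the table is blocked on the bucket locks we hold
    // in the old one.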
HASHTABLE.store(Box::into_raw(new_table) as usize, Ordering::Release);
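    // Unlock every bucket in the old table; waiting threads will pick up the
    // new table on their next lookup.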
for b in &(*old_table).entries[..] {
b.mutex.unlock();
}
}
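// Hash function for addresses: a Fibonacci (golden-ratio) multiplicative hash
// that keeps the top `bits` bits of the product.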
#[cfg(target_pointer_width = "32")]
fn hash(key: usize, bits: u32) -> usize {
key.wrapping_mul(0x9E3779B9) >> (32 - bits)
}
#[cfg(target_pointer_width = "64")]
fn hash(key: usize, bits: u32) -> usize {
key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
}
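// Lock the bucket for the given key and return a reference to it. While the
// bucket is locked the hash table cannot be rehashed.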
unsafe fn lock_bucket<'a>(key: usize) -> &'a Bucket {
let mut bucket;
loop {
let hashtable = get_hashtable();
let hash = hash(key, (*hashtable).hash_bits);
bucket = &(*hashtable).entries[hash];
bucket.mutex.lock();
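        // If the hash table was not grown while we were waiting for the lock,
        // we are done; the lock we now hold blocks any further rehashing.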
if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize {
return bucket;
}
bucket.mutex.unlock();
}
}
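// Lock the bucket for the key referenced by an AtomicUsize. The key can be
// changed by a requeue while we wait for the lock, so it is re-read after
// locking and the whole operation retries if it changed; the key the bucket
// was locked for is returned alongside the bucket.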
unsafe fn lock_bucket_checked<'a>(key: &AtomicUsize) -> (usize, &'a Bucket) {
let mut bucket;
loop {
let hashtable = get_hashtable();
let current_key = key.load(Ordering::Relaxed);
let hash = hash(current_key, (*hashtable).hash_bits);
bucket = &(*hashtable).entries[hash];
bucket.mutex.lock();
if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize &&
key.load(Ordering::Relaxed) == current_key {
return (current_key, bucket);
}
bucket.mutex.unlock();
}
}
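// Lock the buckets for a pair of keys. The bucket with the lower hash is
// always locked first to avoid deadlocks, but the buckets are returned in
// (key1, key2) order.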
unsafe fn lock_bucket_pair<'a>(key1: usize, key2: usize) -> (&'a Bucket, &'a Bucket) {
let mut bucket1;
loop {
let hashtable = get_hashtable();
let hash1 = hash(key1, (*hashtable).hash_bits);
let hash2 = hash(key2, (*hashtable).hash_bits);
if hash1 <= hash2 {
bucket1 = &(*hashtable).entries[hash1];
} else {
bucket1 = &(*hashtable).entries[hash2];
}
bucket1.mutex.lock();
if HASHTABLE.load(Ordering::Relaxed) == hashtable as usize {
if hash1 == hash2 {
return (bucket1, bucket1);
} else if hash1 < hash2 {
let bucket2 = &(*hashtable).entries[hash2];
bucket2.mutex.lock();
return (bucket1, bucket2);
} else {
let bucket2 = &(*hashtable).entries[hash1];
bucket2.mutex.lock();
return (bucket2, bucket1);
}
}
bucket1.mutex.unlock();
}
}
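// Unlock a pair of buckets previously locked by lock_bucket_pair, handling the
// case where both keys mapped to the same bucket.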
unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
if bucket1 as *const _ == bucket2 as *const _ {
bucket1.mutex.unlock();
} else if bucket1 as *const _ < bucket2 as *const _ {
bucket2.mutex.unlock();
bucket1.mutex.unlock();
} else {
bucket1.mutex.unlock();
bucket2.mutex.unlock();
}
}
/// Result of a park operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum ParkResult {
    /// We were unparked by another thread with the given token.
    Unparked(UnparkToken),
    /// The validation callback returned false, so we did not park.
    Invalid,
    /// The timeout expired before we were unparked.
    TimedOut,
}
impl ParkResult {
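    /// Returns true if we were unparked by another thread.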
pub fn is_unparked(self) -> bool {
if let ParkResult::Unparked(_) = self {
true
} else {
false
}
}
}
/// Result of an unpark operation.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct UnparkResult {
    /// The number of threads that were unparked.
    pub unparked_threads: usize,
    /// Whether there are any threads remaining in the queue for this key (or,
    /// for a requeue, threads that were moved onto the target queue).
    pub have_more_threads: bool,
    /// Whether the unparked thread should be handed the lock directly rather
    /// than racing for it; set periodically to guarantee some fairness.
    pub be_fair: bool,
}
/// Operation that `unpark_requeue` should perform, as chosen by its validation
/// callback.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum RequeueOp {
    /// Abort the operation without doing anything.
    Abort,
    /// Unpark one thread and requeue the rest onto the target queue.
    UnparkOneRequeueRest,
    /// Requeue all threads onto the target queue.
    RequeueAll,
}
/// Operation that `unpark_filter` should perform for each thread, as chosen by
/// its filter callback.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum FilterOp {
    /// Unpark the thread and continue scanning the queue.
    Unpark,
    /// Don't unpark the thread and continue scanning the queue.
    Skip,
    /// Don't unpark the thread and stop scanning the queue.
    Stop,
}
/// A value which is passed from an unparking thread to the thread it unparks.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct UnparkToken(pub usize);
/// A value associated with a parked thread which can be inspected by the
/// filter in `unpark_filter`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub struct ParkToken(pub usize);
/// A default unpark token to use.
pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);
/// A default park token to use.
pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);
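/// Parks the current thread in the queue associated with the given key.
///
/// The `validate` function is called while the queue is locked; if it returns
/// false the thread does not park and `ParkResult::Invalid` is returned. The
/// `before_sleep` function is called after the queue is unlocked but before
/// the thread goes to sleep. If the timeout expires, `timed_out` is called
/// with the key the thread was parked on (which may have changed due to a
/// requeue) and whether it was the last thread queued on that key, and
/// `ParkResult::TimedOut` is returned. Otherwise the token provided by the
/// unparking thread is returned in `ParkResult::Unparked`.
///
/// # Safety
///
/// The `validate` and `timed_out` callbacks run while a queue bucket is
/// locked, so they must not panic or call back into the parking lot.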
#[inline]
pub unsafe fn park<V, B, T>(key: usize,
validate: V,
before_sleep: B,
timed_out: T,
park_token: ParkToken,
timeout: Option<Instant>)
-> ParkResult
where V: FnOnce() -> bool,
B: FnOnce(),
T: FnOnce(usize, bool)
{
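    // Wrapping the FnOnce closures in Options lets them be passed through
    // &mut FnMut trait objects, so the real logic lives in the non-generic
    // park_internal and this generic wrapper stays small.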
let mut v = Some(validate);
let mut b = Some(before_sleep);
let mut t = Some(timed_out);
park_internal(key,
&mut || v.take().unchecked_unwrap()(),
&mut || b.take().unchecked_unwrap()(),
&mut |key, was_last_thread| t.take().unchecked_unwrap()(key, was_last_thread),
park_token,
timeout)
}
unsafe fn park_internal(key: usize,
validate: &mut FnMut() -> bool,
before_sleep: &mut FnMut(),
timed_out: &mut FnMut(usize, bool),
park_token: ParkToken,
timeout: Option<Instant>)
-> ParkResult {
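    // Grab a reference to our thread-local ThreadData. Going through a raw
    // pointer extends the reference beyond the `with` closure; the
    // thread-local itself outlives this call.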
let thread_data = &*THREAD_DATA.with(|x| x as *const ThreadData);
let bucket = lock_bucket(key);
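    // If the validation function fails, abort without parking.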
if !validate() {
bucket.mutex.unlock();
return ParkResult::Invalid;
}
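    // Append our ThreadData to the end of the bucket's queue, then release the
    // bucket lock.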
thread_data.next_in_queue.set(ptr::null());
thread_data.key.store(key, Ordering::Relaxed);
thread_data.park_token.set(park_token);
thread_data.parker.prepare_park();
if !bucket.queue_head.get().is_null() {
(*bucket.queue_tail.get()).next_in_queue.set(thread_data);
} else {
bucket.queue_head.set(thread_data);
}
bucket.queue_tail.set(thread_data);
bucket.mutex.unlock();
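    // Invoke the pre-sleep callback, then park this thread until it is woken
    // up or the timeout expires.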
before_sleep();
let unparked = match timeout {
Some(timeout) => thread_data.parker.park_until(timeout),
None => {
thread_data.parker.park();
true
}
};
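    // If we were unparked, the unparking thread already removed us from the
    // queue, so we can return without taking any locks.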
if unparked {
return ParkResult::Unparked(thread_data.unpark_token.get());
}
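    // Lock our bucket again. The hash table may have grown and our key may
    // have changed if we were requeued, hence the checked lookup.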
let (key, bucket) = lock_bucket_checked(&thread_data.key);
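    // With the bucket locked, check precisely whether we were unparked just as
    // the timeout expired; if so, report the unpark instead of the timeout.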
if !thread_data.parker.timed_out() {
bucket.mutex.unlock();
return ParkResult::Unparked(thread_data.unpark_token.get());
}
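    // We timed out: remove our ThreadData from the queue and report whether we
    // were the last thread waiting on this key.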
let mut link = &bucket.queue_head;
let mut current = bucket.queue_head.get();
let mut previous = ptr::null();
while !current.is_null() {
if current == thread_data {
let next = (*current).next_in_queue.get();
link.set(next);
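            // Work out whether we were the last thread queued on this key. If
            // we were not the queue tail, scan the rest of the queue for
            // another entry with the same key.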
let mut was_last_thread = true;
if bucket.queue_tail.get() == current {
bucket.queue_tail.set(previous);
} else {
let mut scan = next;
while !scan.is_null() {
if (*scan).key.load(Ordering::Relaxed) == key {
was_last_thread = false;
break;
}
scan = (*scan).next_in_queue.get();
}
}
timed_out(key, was_last_thread);
break;
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
}
}
debug_assert!(!current.is_null());
bucket.mutex.unlock();
ParkResult::TimedOut
}
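/// Unparks one thread waiting on the given key.
///
/// The `callback` function is invoked with the result while the queue is still
/// locked, and the `UnparkToken` it returns is passed to the thread that is
/// woken up. The same result is also returned to the caller.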
#[inline]
pub unsafe fn unpark_one<C>(key: usize, callback: C) -> UnparkResult
where C: FnOnce(UnparkResult) -> UnparkToken
{
let mut c = Some(callback);
unpark_one_internal(key, &mut |result| c.take().unchecked_unwrap()(result))
}
unsafe fn unpark_one_internal(key: usize,
callback: &mut FnMut(UnparkResult) -> UnparkToken)
-> UnparkResult {
let bucket = lock_bucket(key);
let mut link = &bucket.queue_head;
let mut current = bucket.queue_head.get();
let mut previous = ptr::null();
let mut result = UnparkResult {
unparked_threads: 0,
have_more_threads: false,
be_fair: false,
};
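    // Walk the queue looking for the first thread parked on this key.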
while !current.is_null() {
if (*current).key.load(Ordering::Relaxed) == key {
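            // Remove the thread from the queue.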
let next = (*current).next_in_queue.get();
link.set(next);
if bucket.queue_tail.get() == current {
bucket.queue_tail.set(previous);
} else {
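                // Scan the rest of the queue to check whether any other thread
                // is still waiting on this key.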
let mut scan = next;
while !scan.is_null() {
if (*scan).key.load(Ordering::Relaxed) == key {
result.have_more_threads = true;
break;
}
scan = (*scan).next_in_queue.get();
}
}
result.unparked_threads = 1;
result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
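            // Invoke the callback while the bucket is still locked, hand the
            // returned token to the thread, and only wake it after the lock
            // has been released.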
let token = callback(result);
(*current).unpark_token.set(token);
let handle = (*current).parker.unpark_lock();
bucket.mutex.unlock();
handle.unpark();
return result;
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
}
}
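    // No thread with a matching key was found; still report the empty result
    // to the callback before unlocking.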
callback(result);
bucket.mutex.unlock();
result
}
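/// Unparks all threads waiting on the given key, giving each of them the same
/// unpark token. Returns the number of threads that were unparked.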
pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
let bucket = lock_bucket(key);
let mut link = &bucket.queue_head;
let mut current = bucket.queue_head.get();
let mut previous = ptr::null();
let mut threads = SmallVec::<[_; 8]>::new();
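    // Remove every thread with a matching key from the queue, collecting their
    // wakeup handles so they can be woken after the lock is released.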
while !current.is_null() {
if (*current).key.load(Ordering::Relaxed) == key {
let next = (*current).next_in_queue.get();
link.set(next);
if bucket.queue_tail.get() == current {
bucket.queue_tail.set(previous);
}
(*current).unpark_token.set(unpark_token);
threads.push((*current).parker.unpark_lock());
current = next;
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
}
}
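    // Release the bucket lock before actually waking the threads.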
bucket.mutex.unlock();
let num_threads = threads.len();
for handle in threads.into_iter() {
handle.unpark();
}
num_threads
}
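/// Removes threads from the queue for `key_from` and requeues them onto the
/// queue for `key_to`.
///
/// The `validate` function is called with both queues locked and chooses the
/// operation to perform. The `callback` function receives the chosen operation
/// and the result, and the token it returns is given to the thread that is
/// unparked, if any.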
#[inline]
pub unsafe fn unpark_requeue<V, C>(key_from: usize,
key_to: usize,
validate: V,
callback: C)
-> UnparkResult
where V: FnOnce() -> RequeueOp,
C: FnOnce(RequeueOp, UnparkResult) -> UnparkToken
{
let mut v = Some(validate);
let mut c = Some(callback);
unpark_requeue_internal(key_from,
key_to,
&mut || v.take().unchecked_unwrap()(),
&mut |op, r| c.take().unchecked_unwrap()(op, r))
}
unsafe fn unpark_requeue_internal(key_from: usize,
key_to: usize,
validate: &mut FnMut() -> RequeueOp,
callback: &mut FnMut(RequeueOp, UnparkResult) -> UnparkToken)
-> UnparkResult {
let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);
let mut result = UnparkResult {
unparked_threads: 0,
have_more_threads: false,
be_fair: false,
};
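    // Ask the caller what to do while both buckets are locked.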
let op = validate();
if op == RequeueOp::Abort {
unlock_bucket_pair(bucket_from, bucket_to);
return result;
}
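    // Remove all threads with a matching key from the source queue, collecting
    // them into a separate list to be requeued (or picking one to unpark).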
let mut link = &bucket_from.queue_head;
let mut current = bucket_from.queue_head.get();
let mut previous = ptr::null();
let mut requeue_threads = ptr::null();
let mut requeue_threads_tail: *const ThreadData = ptr::null();
let mut wakeup_thread = None;
while !current.is_null() {
if (*current).key.load(Ordering::Relaxed) == key_from {
let next = (*current).next_in_queue.get();
link.set(next);
if bucket_from.queue_tail.get() == current {
bucket_from.queue_tail.set(previous);
}
if op == RequeueOp::UnparkOneRequeueRest && wakeup_thread.is_none() {
wakeup_thread = Some(current);
result.unparked_threads = 1;
} else {
if !requeue_threads.is_null() {
(*requeue_threads_tail).next_in_queue.set(current);
} else {
requeue_threads = current;
}
requeue_threads_tail = current;
(*current).key.store(key_to, Ordering::Relaxed);
result.have_more_threads = true;
}
current = next;
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
}
}
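    // Splice the collected threads onto the end of the destination queue.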
if !requeue_threads.is_null() {
(*requeue_threads_tail).next_in_queue.set(ptr::null());
if !bucket_to.queue_head.get().is_null() {
(*bucket_to.queue_tail.get()).next_in_queue.set(requeue_threads);
} else {
bucket_to.queue_head.set(requeue_threads);
}
bucket_to.queue_tail.set(requeue_threads_tail);
}
if result.unparked_threads != 0 {
result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
}
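    // Invoke the callback, then wake the chosen thread (if any) only after
    // both buckets have been unlocked.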
let token = callback(op, result);
if let Some(wakeup_thread) = wakeup_thread {
(*wakeup_thread).unpark_token.set(token);
let handle = (*wakeup_thread).parker.unpark_lock();
unlock_bucket_pair(bucket_from, bucket_to);
handle.unpark();
} else {
unlock_bucket_pair(bucket_from, bucket_to);
}
result
}
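/// Unparks a caller-selected subset of the threads waiting on the given key.
///
/// The `filter` function is called with each queued thread's park token and
/// decides whether to unpark it, skip it, or stop scanning. The `callback`
/// function is called with the final result while the queue is still locked,
/// and the token it returns is given to every unparked thread.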
#[inline]
pub unsafe fn unpark_filter<F, C>(key: usize, mut filter: F, callback: C) -> UnparkResult
where F: FnMut(ParkToken) -> FilterOp,
C: FnOnce(UnparkResult) -> UnparkToken
{
let mut c = Some(callback);
unpark_filter_internal(key, &mut filter, &mut |r| c.take().unchecked_unwrap()(r))
}
unsafe fn unpark_filter_internal(key: usize,
filter: &mut FnMut(ParkToken) -> FilterOp,
callback: &mut FnMut(UnparkResult) -> UnparkToken)
-> UnparkResult {
let bucket = lock_bucket(key);
let mut link = &bucket.queue_head;
let mut current = bucket.queue_head.get();
let mut previous = ptr::null();
let mut threads = SmallVec::<[_; 8]>::new();
let mut result = UnparkResult {
unparked_threads: 0,
have_more_threads: false,
be_fair: false,
};
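    // Walk the queue and let the filter decide what to do with each thread
    // whose key matches.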
while !current.is_null() {
if (*current).key.load(Ordering::Relaxed) == key {
let next = (*current).next_in_queue.get();
match filter((*current).park_token.get()) {
FilterOp::Unpark => {
link.set(next);
if bucket.queue_tail.get() == current {
bucket.queue_tail.set(previous);
}
threads.push((current, None));
current = next;
}
FilterOp::Skip => {
result.have_more_threads = true;
link = &(*current).next_in_queue;
previous = current;
current = link.get();
}
FilterOp::Stop => {
result.have_more_threads = true;
break;
}
}
} else {
link = &(*current).next_in_queue;
previous = current;
current = link.get();
}
}
result.unparked_threads = threads.len();
if result.unparked_threads != 0 {
result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
}
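    // Invoke the callback, assign the token to each selected thread, and wake
    // them only after the bucket lock has been dropped.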
let token = callback(result);
for t in threads.iter_mut() {
(*t.0).unpark_token.set(token);
t.1 = Some((*t.0).parker.unpark_lock());
}
bucket.mutex.unlock();
for (_, handle) in threads.into_iter() {
handle.unchecked_unwrap().unpark();
}
result
}