use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::usize;
use crate::registry::{Registry, WorkerThread};
pub(super) trait Latch {
fn set(&self);
}
pub(super) trait AsCoreLatch {
fn as_core_latch(&self) -> &CoreLatch;
}
const UNSET: usize = 0;
const SLEEPY: usize = 1;
const SLEEPING: usize = 2;
const SET: usize = 3;
#[derive(Debug)]
pub(super) struct CoreLatch {
state: AtomicUsize,
}
impl CoreLatch {
#[inline]
fn new() -> Self {
Self {
state: AtomicUsize::new(0),
}
}
#[inline]
pub(super) fn addr(&self) -> usize {
self as *const CoreLatch as usize
}
#[inline]
pub(super) fn get_sleepy(&self) -> bool {
self.state
.compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
}
#[inline]
pub(super) fn fall_asleep(&self) -> bool {
self.state
.compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
}
#[inline]
pub(super) fn wake_up(&self) {
if !self.probe() {
let _ =
self.state
.compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
}
}
#[inline]
fn set(&self) -> bool {
let old_state = self.state.swap(SET, Ordering::AcqRel);
old_state == SLEEPING
}
#[inline]
pub(super) fn probe(&self) -> bool {
self.state.load(Ordering::Acquire) == SET
}
}
pub(super) struct SpinLatch<'r> {
core_latch: CoreLatch,
registry: &'r Arc<Registry>,
target_worker_index: usize,
cross: bool,
}
impl<'r> SpinLatch<'r> {
#[inline]
pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
SpinLatch {
core_latch: CoreLatch::new(),
registry: thread.registry(),
target_worker_index: thread.index(),
cross: false,
}
}
#[inline]
pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
SpinLatch {
cross: true,
..SpinLatch::new(thread)
}
}
#[inline]
pub(super) fn probe(&self) -> bool {
self.core_latch.probe()
}
}
impl<'r> AsCoreLatch for SpinLatch<'r> {
#[inline]
fn as_core_latch(&self) -> &CoreLatch {
&self.core_latch
}
}
impl<'r> Latch for SpinLatch<'r> {
#[inline]
fn set(&self) {
let cross_registry;
let registry: &Registry = if self.cross {
cross_registry = Arc::clone(self.registry);
&cross_registry
} else {
self.registry
};
let target_worker_index = self.target_worker_index;
if self.core_latch.set() {
registry.notify_worker_latch_is_set(target_worker_index);
}
}
}
#[derive(Debug)]
pub(super) struct LockLatch {
m: Mutex<bool>,
v: Condvar,
}
impl LockLatch {
#[inline]
pub(super) fn new() -> LockLatch {
LockLatch {
m: Mutex::new(false),
v: Condvar::new(),
}
}
pub(super) fn wait_and_reset(&self) {
let mut guard = self.m.lock().unwrap();
while !*guard {
guard = self.v.wait(guard).unwrap();
}
*guard = false;
}
pub(super) fn wait(&self) {
let mut guard = self.m.lock().unwrap();
while !*guard {
guard = self.v.wait(guard).unwrap();
}
}
}
impl Latch for LockLatch {
#[inline]
fn set(&self) {
let mut guard = self.m.lock().unwrap();
*guard = true;
self.v.notify_all();
}
}
#[derive(Debug)]
pub(super) struct CountLatch {
core_latch: CoreLatch,
counter: AtomicUsize,
}
impl CountLatch {
#[inline]
pub(super) fn new() -> CountLatch {
Self::with_count(1)
}
#[inline]
pub(super) fn with_count(n: usize) -> CountLatch {
CountLatch {
core_latch: CoreLatch::new(),
counter: AtomicUsize::new(n),
}
}
#[inline]
pub(super) fn increment(&self) {
debug_assert!(!self.core_latch.probe());
self.counter.fetch_add(1, Ordering::Relaxed);
}
#[inline]
pub(super) fn set(&self) -> bool {
if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
self.core_latch.set();
true
} else {
false
}
}
#[inline]
pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) {
if self.set() {
registry.notify_worker_latch_is_set(target_worker_index);
}
}
}
impl AsCoreLatch for CountLatch {
#[inline]
fn as_core_latch(&self) -> &CoreLatch {
&self.core_latch
}
}
#[derive(Debug)]
pub(super) struct CountLockLatch {
lock_latch: LockLatch,
counter: AtomicUsize,
}
impl CountLockLatch {
#[inline]
pub(super) fn with_count(n: usize) -> CountLockLatch {
CountLockLatch {
lock_latch: LockLatch::new(),
counter: AtomicUsize::new(n),
}
}
#[inline]
pub(super) fn increment(&self) {
let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
debug_assert!(old_counter != 0);
}
pub(super) fn wait(&self) {
self.lock_latch.wait();
}
}
impl Latch for CountLockLatch {
#[inline]
fn set(&self) {
if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
self.lock_latch.set();
}
}
}
impl<'a, L> Latch for &'a L
where
L: Latch,
{
#[inline]
fn set(&self) {
L::set(self);
}
}