use crate::loom::sync::atomic::AtomicUsize;
use std::fmt;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::usize;
pub(super) struct State {
val: AtomicUsize,
}
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);
const RUNNING: usize = 0b00_0001;
const NOTIFIED: usize = 0b00_0010;
const COMPLETE: usize = 0b00_0100;
const RELEASED: usize = 0b00_1000;
const JOIN_INTEREST: usize = 0b01_0000;
const JOIN_WAKER: usize = 0b10_0000;
const CANCELLED: usize = 0b100_0000;
const LIFECYCLE_MASK: usize =
RUNNING | NOTIFIED | COMPLETE | RELEASED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;
const WAKER_COUNT_MASK: usize = usize::MAX - LIFECYCLE_MASK;
const WAKER_COUNT_SHIFT: usize = WAKER_COUNT_MASK.count_zeros() as usize;
const WAKER_ONE: usize = 1 << WAKER_COUNT_SHIFT;
const INITIAL_STATE: usize = NOTIFIED;
impl State {
pub(super) fn new_joinable() -> State {
State {
val: AtomicUsize::new(INITIAL_STATE | JOIN_INTEREST),
}
}
pub(super) fn load(&self) -> Snapshot {
Snapshot(self.val.load(Acquire))
}
pub(super) fn transition_to_running(&self) -> Snapshot {
const DELTA: usize = RUNNING | NOTIFIED;
let prev = Snapshot(self.val.fetch_xor(DELTA, Acquire));
assert!(prev.is_notified());
if prev.is_running() {
let prev = self.val.fetch_or(CANCELLED, AcqRel);
return Snapshot(prev | CANCELLED);
}
assert!(!prev.is_running());
let next = Snapshot(prev.0 ^ DELTA);
assert!(next.is_running());
assert!(!next.is_notified());
next
}
pub(super) fn transition_to_idle(&self) -> Snapshot {
const DELTA: usize = RUNNING;
let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
if !prev.is_running() {
let prev = self.val.fetch_or(CANCELLED, AcqRel);
return Snapshot(prev | CANCELLED);
}
let next = Snapshot(prev.0 ^ DELTA);
assert!(!next.is_running());
next
}
pub(super) fn transition_to_complete(&self) -> Snapshot {
const DELTA: usize = RUNNING | COMPLETE;
let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
assert!(!prev.is_complete());
let next = Snapshot(prev.0 ^ DELTA);
assert!(next.is_complete());
next
}
pub(super) fn transition_to_released(&self) -> Snapshot {
const DELTA: usize = RUNNING | COMPLETE | RELEASED;
let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
assert!(prev.is_running());
assert!(!prev.is_complete());
assert!(!prev.is_released());
let next = Snapshot(prev.0 ^ DELTA);
assert!(!next.is_running());
assert!(next.is_complete());
assert!(next.is_released());
next
}
pub(super) fn transition_to_canceled_from_queue(&self) -> Snapshot {
let prev = Snapshot(self.val.fetch_or(CANCELLED, AcqRel));
assert!(!prev.is_complete());
assert!(!prev.is_running() || prev.is_notified());
Snapshot(prev.0 | CANCELLED)
}
pub(super) fn transition_to_canceled_from_list(&self) -> Option<Snapshot> {
let mut prev = self.load();
loop {
if !prev.is_active() {
return None;
}
let mut next = prev;
if prev.is_running() {
next.0 -= RUNNING;
next.0 |= NOTIFIED;
} else if prev.is_notified() {
next.0 += RUNNING;
next.0 |= NOTIFIED;
} else {
next.0 |= CANCELLED;
}
let res = self.val.compare_exchange(prev.0, next.0, AcqRel, Acquire);
match res {
Ok(_) if next.is_canceled() => return Some(next),
Ok(_) => return None,
Err(actual) => {
prev = Snapshot(actual);
}
}
}
}
pub(super) fn release_task(&self) -> Snapshot {
use crate::loom::sync::atomic;
const DELTA: usize = RELEASED;
let prev = Snapshot(self.val.fetch_or(DELTA, Release));
assert!(!prev.is_released());
assert!(prev.is_terminal(), "state = {:?}", prev);
let next = Snapshot(prev.0 | DELTA);
assert!(next.is_released());
if next.is_final_ref() || (next.has_join_waker() && !next.is_join_interested()) {
atomic::fence(Acquire);
}
next
}
pub(super) fn transition_to_notified(&self) -> bool {
const MASK: usize = RUNNING | NOTIFIED | COMPLETE | CANCELLED;
let prev = self.val.fetch_or(NOTIFIED, Release);
prev & MASK == 0
}
pub(super) fn drop_join_handle_fast(&self) -> bool {
use std::sync::atomic::Ordering::Relaxed;
self.val
.compare_exchange_weak(
INITIAL_STATE | JOIN_INTEREST,
INITIAL_STATE,
Release,
Relaxed,
)
.is_ok()
}
pub(super) fn complete_join_handle(&self) -> Snapshot {
use crate::loom::sync::atomic;
const DELTA: usize = JOIN_INTEREST;
let prev = Snapshot(self.val.fetch_sub(DELTA, Release));
assert!(prev.is_join_interested());
let next = Snapshot(prev.0 - DELTA);
if !next.is_final_ref() {
return next;
}
atomic::fence(Acquire);
next
}
pub(super) fn drop_join_handle_slow(&self) -> Result<Snapshot, Snapshot> {
const MASK: usize = COMPLETE | CANCELLED;
let mut prev = self.val.load(Acquire);
loop {
if prev & MASK != 0 {
return Err(Snapshot(prev));
}
assert!(prev & JOIN_INTEREST == JOIN_INTEREST);
let next = (prev - JOIN_INTEREST) & !JOIN_WAKER;
let res = self.val.compare_exchange(prev, next, AcqRel, Acquire);
match res {
Ok(_) => {
return Ok(Snapshot(next));
}
Err(actual) => {
prev = actual;
}
}
}
}
pub(super) fn store_join_waker(&self) -> Snapshot {
use crate::loom::sync::atomic;
const DELTA: usize = JOIN_WAKER;
let prev = Snapshot(self.val.fetch_xor(DELTA, Release));
assert!(!prev.has_join_waker());
let next = Snapshot(prev.0 ^ DELTA);
assert!(next.has_join_waker());
if next.is_complete() {
atomic::fence(Acquire);
}
next
}
pub(super) fn unset_waker(&self) -> Snapshot {
const MASK: usize = COMPLETE | CANCELLED;
let mut prev = self.val.load(Acquire);
loop {
if prev & MASK != 0 {
return Snapshot(prev);
}
assert!(Snapshot(prev).has_join_waker());
let next = prev - JOIN_WAKER;
let res = self.val.compare_exchange(prev, next, AcqRel, Acquire);
match res {
Ok(_) => return Snapshot(next),
Err(actual) => {
prev = actual;
}
}
}
}
pub(super) fn ref_inc(&self) {
use std::process;
use std::sync::atomic::Ordering::Relaxed;
let prev = self.val.fetch_add(WAKER_ONE, Relaxed);
if prev > isize::max_value() as usize {
process::abort();
}
}
pub(super) fn ref_dec(&self) -> bool {
use crate::loom::sync::atomic;
let prev = self.val.fetch_sub(WAKER_ONE, Release);
let next = Snapshot(prev - WAKER_ONE);
if next.is_final_ref() {
atomic::fence(Acquire);
}
next.is_final_ref()
}
}
impl Snapshot {
pub(super) fn is_running(self) -> bool {
self.0 & RUNNING == RUNNING
}
pub(super) fn is_notified(self) -> bool {
self.0 & NOTIFIED == NOTIFIED
}
pub(super) fn is_released(self) -> bool {
self.0 & RELEASED == RELEASED
}
pub(super) fn is_complete(self) -> bool {
self.0 & COMPLETE == COMPLETE
}
pub(super) fn is_canceled(self) -> bool {
self.0 & CANCELLED == CANCELLED
}
pub(super) fn is_active(self) -> bool {
self.0 & (COMPLETE | CANCELLED) == 0
}
pub(super) fn is_terminal(self) -> bool {
!self.is_active() || (self.is_notified() && self.is_running())
}
pub(super) fn is_join_interested(self) -> bool {
self.0 & JOIN_INTEREST == JOIN_INTEREST
}
pub(super) fn has_join_waker(self) -> bool {
self.0 & JOIN_WAKER == JOIN_WAKER
}
pub(super) fn is_final_ref(self) -> bool {
const MASK: usize = WAKER_COUNT_MASK | RELEASED | JOIN_INTEREST;
(self.0 & MASK) == RELEASED
}
}
impl fmt::Debug for State {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use std::sync::atomic::Ordering::SeqCst;
let snapshot = Snapshot(self.val.load(SeqCst));
fmt.debug_struct("State")
.field("snapshot", &snapshot)
.finish()
}
}
impl fmt::Debug for Snapshot {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Snapshot")
.field("is_running", &self.is_running())
.field("is_notified", &self.is_notified())
.field("is_released", &self.is_released())
.field("is_complete", &self.is_complete())
.field("is_canceled", &self.is_canceled())
.field("is_join_interested", &self.is_join_interested())
.field("has_join_waker", &self.has_join_waker())
.field("is_final_ref", &self.is_final_ref())
.finish()
}
}