use crate::latch::CoreLatch;
use crate::sync::{Condvar, Mutex};
use crossbeam_utils::CachePadded;
use std::sync::atomic::Ordering;
use std::thread;
mod counters;
pub(crate) use self::counters::THREADS_MAX;
use self::counters::{AtomicCounters, JobsEventCounter};
pub(super) struct Sleep {
worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,
counters: AtomicCounters,
}
pub(super) struct IdleState {
worker_index: usize,
rounds: u32,
jobs_counter: JobsEventCounter,
}
#[derive(Default)]
struct WorkerSleepState {
is_blocked: Mutex<bool>,
condvar: Condvar,
}
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
impl Sleep {
pub(super) fn new(n_threads: usize) -> Sleep {
assert!(n_threads <= THREADS_MAX);
Sleep {
worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
counters: AtomicCounters::new(),
}
}
#[inline]
pub(super) fn start_looking(&self, worker_index: usize) -> IdleState {
self.counters.add_inactive_thread();
IdleState {
worker_index,
rounds: 0,
jobs_counter: JobsEventCounter::DUMMY,
}
}
#[inline]
pub(super) fn work_found(&self) {
let threads_to_wake = self.counters.sub_inactive_thread();
self.wake_any_threads(threads_to_wake as u32);
}
#[inline]
pub(super) fn no_work_found(
&self,
idle_state: &mut IdleState,
latch: &CoreLatch,
has_injected_jobs: impl FnOnce() -> bool,
) {
if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
thread::yield_now();
idle_state.rounds += 1;
} else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
idle_state.jobs_counter = self.announce_sleepy();
idle_state.rounds += 1;
thread::yield_now();
} else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
idle_state.rounds += 1;
thread::yield_now();
} else {
debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
self.sleep(idle_state, latch, has_injected_jobs);
}
}
#[cold]
fn announce_sleepy(&self) -> JobsEventCounter {
self.counters
.increment_jobs_event_counter_if(JobsEventCounter::is_active)
.jobs_counter()
}
#[cold]
fn sleep(
&self,
idle_state: &mut IdleState,
latch: &CoreLatch,
has_injected_jobs: impl FnOnce() -> bool,
) {
let worker_index = idle_state.worker_index;
if !latch.get_sleepy() {
return;
}
let sleep_state = &self.worker_sleep_states[worker_index];
let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
debug_assert!(!*is_blocked);
if !latch.fall_asleep() {
idle_state.wake_fully();
return;
}
loop {
let counters = self.counters.load(Ordering::SeqCst);
debug_assert!(idle_state.jobs_counter.is_sleepy());
if counters.jobs_counter() != idle_state.jobs_counter {
idle_state.wake_partly();
latch.wake_up();
return;
}
if self.counters.try_add_sleeping_thread(counters) {
break;
}
}
std::sync::atomic::fence(Ordering::SeqCst);
if has_injected_jobs() {
self.counters.sub_sleeping_thread();
} else {
*is_blocked = true;
while *is_blocked {
is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
}
}
idle_state.wake_fully();
latch.wake_up();
}
pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
self.wake_specific_thread(target_worker_index);
}
#[inline]
pub(super) fn new_injected_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
std::sync::atomic::fence(Ordering::SeqCst);
self.new_jobs(num_jobs, queue_was_empty)
}
#[inline]
pub(super) fn new_internal_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
self.new_jobs(num_jobs, queue_was_empty)
}
#[inline]
fn new_jobs(&self, num_jobs: u32, queue_was_empty: bool) {
let counters = self
.counters
.increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
let num_awake_but_idle = counters.awake_but_idle_threads();
let num_sleepers = counters.sleeping_threads();
if num_sleepers == 0 {
return;
}
let num_awake_but_idle = num_awake_but_idle as u32;
let num_sleepers = num_sleepers as u32;
if !queue_was_empty {
let num_to_wake = Ord::min(num_jobs, num_sleepers);
self.wake_any_threads(num_to_wake);
} else if num_awake_but_idle < num_jobs {
let num_to_wake = Ord::min(num_jobs - num_awake_but_idle, num_sleepers);
self.wake_any_threads(num_to_wake);
}
}
#[cold]
fn wake_any_threads(&self, mut num_to_wake: u32) {
if num_to_wake > 0 {
for i in 0..self.worker_sleep_states.len() {
if self.wake_specific_thread(i) {
num_to_wake -= 1;
if num_to_wake == 0 {
return;
}
}
}
}
}
fn wake_specific_thread(&self, index: usize) -> bool {
let sleep_state = &self.worker_sleep_states[index];
let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
if *is_blocked {
*is_blocked = false;
sleep_state.condvar.notify_one();
self.counters.sub_sleeping_thread();
true
} else {
false
}
}
}
impl IdleState {
fn wake_fully(&mut self) {
self.rounds = 0;
self.jobs_counter = JobsEventCounter::DUMMY;
}
fn wake_partly(&mut self) {
self.rounds = ROUNDS_UNTIL_SLEEPY;
self.jobs_counter = JobsEventCounter::DUMMY;
}
}