use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{Ordering, AtomicUsize};
use std::collections::VecDeque;
use std::time::Duration;
use std::thread;
/// Handle to a pool of worker threads that execute boxed closures.
///
/// Work is submitted with `spawn`. Every worker thread keeps its own
/// reference to the same shared state as this handle.
pub struct TaskPool {
    /// Queue, condition variable and counters shared with the worker threads.
    sharing: Arc<Sharing>,
}
/// State shared between the `TaskPool` handle and all of its worker threads.
struct Sharing {
    /// Tasks that have been queued but not yet picked up by a worker.
    /// The mutex is also the one paired with `condvar`.
    todo: Mutex<VecDeque<Box<dyn FnMut() + Send>>>,
    /// Signalled when a task is pushed onto `todo` and when the pool is dropped.
    condvar: Condvar,
    /// Number of worker threads currently registered as alive.
    active_tasks: AtomicUsize,
    /// Number of worker threads currently idle, waiting for a task.
    waiting_tasks: AtomicUsize,
}
/// Number of worker threads started when a pool is created. A worker that
/// finds no more than this many threads registered waits for work without a
/// timeout.
const MIN_THREADS: usize = 4;
/// Guard that keeps a shared counter incremented by one while it is alive.
struct Registration<'a> {
    /// Counter bumped on creation and restored on drop.
    nb: &'a AtomicUsize,
}

impl<'a> Registration<'a> {
    /// Adds one to `nb` and returns the guard that will subtract it again
    /// when dropped.
    fn new(nb: &'a AtomicUsize) -> Self {
        let guard = Registration { nb };
        guard.nb.fetch_add(1, Ordering::Release);
        guard
    }
}

impl<'a> Drop for Registration<'a> {
    /// Subtracts the one that `new` added to the counter.
    fn drop(&mut self) {
        let _previous = self.nb.fetch_sub(1, Ordering::Release);
    }
}
impl TaskPool {
pub fn new() -> TaskPool {
let pool = TaskPool {
sharing: Arc::new(Sharing {
todo: Mutex::new(VecDeque::new()),
condvar: Condvar::new(),
active_tasks: AtomicUsize::new(0),
waiting_tasks: AtomicUsize::new(0),
}),
};
for _ in 0..MIN_THREADS {
pool.add_thread(None)
}
pool
}
pub fn spawn(&self, code: Box<FnMut() + Send>) {
let mut queue = self.sharing.todo.lock().unwrap();
if self.sharing.waiting_tasks.load(Ordering::Acquire) == 0 {
self.add_thread(Some(code));
} else {
queue.push_back(code);
self.sharing.condvar.notify_one();
}
}
fn add_thread(&self, initial_fn: Option<Box<FnMut() + Send>>) {
let sharing = self.sharing.clone();
thread::spawn(move || {
let sharing = sharing;
let _active_guard = Registration::new(&sharing.active_tasks);
if initial_fn.is_some() {
let mut f = initial_fn.unwrap();
f();
}
loop {
let mut task: Box<FnMut() + Send> = {
let mut todo = sharing.todo.lock().unwrap();
let task;
loop {
if let Some(poped_task) = todo.pop_front() {
task = poped_task;
break;
}
let _waiting_guard = Registration::new(&sharing.waiting_tasks);
let received = if sharing.active_tasks.load(Ordering::Acquire)
<= MIN_THREADS
{
todo = sharing.condvar.wait(todo).unwrap();
true
} else {
let (new_lock, waitres) = sharing.condvar
.wait_timeout(todo, Duration::from_millis(5000))
.unwrap();
todo = new_lock;
!waitres.timed_out()
};
if !received && todo.is_empty() {
return;
}
}
task
};
task();
}
});
}
}
impl Drop for TaskPool {
    /// Tells the worker threads to wind down once they run out of work.
    ///
    /// The active counter is set far above `MIN_THREADS` so that every worker
    /// switches to the timed wait and exits after finding the queue empty.
    fn drop(&mut self) {
        // Take the queue lock first. A worker reads `active_tasks` and parks
        // on the condvar while holding this lock; without it the store and
        // notification could land between those two steps and the worker
        // would sleep forever. A poisoned lock still carries the guard inside
        // the `Err`, so the lock is held either way and drop never panics.
        let _queue_lock = self.sharing.todo.lock();
        self.sharing.active_tasks.store(999_999_999, Ordering::Release);
        self.sharing.condvar.notify_all();
    }
}