use Configuration;
use join;
use {scope, Scope};
use spawn;
use std::sync::Arc;
use std::error::Error;
use std::fmt;
use registry::{Registry, WorkerThread};
mod internal;
mod test;
pub struct ThreadPool {
registry: Arc<Registry>,
}
impl ThreadPool {
pub fn new(configuration: Configuration) -> Result<ThreadPool, Box<Error>> {
let registry = try!(Registry::new(configuration));
Ok(ThreadPool { registry: registry })
}
#[cfg(rayon_unstable)]
pub fn global() -> &'static Arc<ThreadPool> {
lazy_static! {
static ref DEFAULT_THREAD_POOL: Arc<ThreadPool> =
Arc::new(ThreadPool { registry: Registry::global() });
}
&DEFAULT_THREAD_POOL
}
pub fn install<OP, R>(&self, op: OP) -> R
where OP: FnOnce() -> R + Send,
R: Send
{
self.registry.in_worker(|_, _| op())
}
#[inline]
pub fn current_num_threads(&self) -> usize {
self.registry.num_threads()
}
#[inline]
pub fn current_thread_index(&self) -> Option<usize> {
unsafe {
let curr = WorkerThread::current();
if curr.is_null() {
None
} else if (*curr).registry().id() != self.registry.id() {
None
} else {
Some((*curr).index())
}
}
}
#[inline]
pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
unsafe {
let curr = WorkerThread::current();
if curr.is_null() {
None
} else if (*curr).registry().id() != self.registry.id() {
None
} else {
Some(!(*curr).local_deque_is_empty())
}
}
}
pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
where A: FnOnce() -> RA + Send,
B: FnOnce() -> RB + Send,
RA: Send,
RB: Send
{
self.install(|| join(oper_a, oper_b))
}
pub fn scope<'scope, OP, R>(&self, op: OP) -> R
where OP: for<'s> FnOnce(&'s Scope<'scope>) -> R + 'scope + Send, R: Send
{
self.install(|| scope(op))
}
pub fn spawn<OP>(&self, op: OP)
where OP: FnOnce() + Send + 'static
{
unsafe { spawn::spawn_in(op, &self.registry) }
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
self.registry.terminate();
}
}
impl fmt::Debug for ThreadPool {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ThreadPool")
.field("num_threads", &self.current_num_threads())
.field("id", &self.registry.id())
.finish()
}
}
#[inline]
pub fn current_thread_index() -> Option<usize> {
unsafe {
let curr = WorkerThread::current();
if curr.is_null() {
None
} else {
Some((*curr).index())
}
}
}
#[inline]
pub fn current_thread_has_pending_tasks() -> Option<bool> {
unsafe {
let curr = WorkerThread::current();
if curr.is_null() {
None
} else {
Some(!(*curr).local_deque_is_empty())
}
}
}