use crate::broadcast::{self, BroadcastContext};
use crate::join;
use crate::registry::{Registry, ThreadSpawn, WorkerThread};
use crate::scope::{do_in_place_scope, do_in_place_scope_fifo};
use crate::spawn;
use crate::{scope, Scope};
use crate::{scope_fifo, ScopeFifo};
use crate::{ThreadPoolBuildError, ThreadPoolBuilder};
use std::error::Error;
use std::fmt;
use std::sync::Arc;
mod test;
/// Handle to a thread pool backed by a shared `Registry`.
///
/// Every method on this type forwards to the registry stored in `registry`.
/// Dropping the handle calls `terminate` on that registry.
pub struct ThreadPool {
// Reference-counted registry that all pool operations are delegated to.
registry: Arc<Registry>,
}
impl ThreadPool {
    /// Deprecated constructor that builds a pool from a legacy `Configuration`.
    ///
    /// The configuration is converted with `into_builder` and passed to
    /// [`ThreadPool::build`].
    ///
    /// # Errors
    ///
    /// Returns the build error, boxed as `Box<dyn Error>`, if the pool cannot
    /// be constructed.
    #[deprecated(note = "Use `ThreadPoolBuilder::build`")]
    #[allow(deprecated)]
    pub fn new(configuration: crate::Configuration) -> Result<ThreadPool, Box<dyn Error>> {
        let builder = configuration.into_builder();
        // `?` performs the same `From` conversion into `Box<dyn Error>`.
        let pool = Self::build(builder)?;
        Ok(pool)
    }

    /// Creates a pool by constructing a new `Registry` from `builder`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `Registry::new`.
    pub(super) fn build<S>(
        builder: ThreadPoolBuilder<S>,
    ) -> Result<ThreadPool, ThreadPoolBuildError>
    where
        S: ThreadSpawn,
    {
        Ok(ThreadPool {
            registry: Registry::new(builder)?,
        })
    }

    /// Runs `op` through the registry's `in_worker` entry point and returns
    /// its result.
    ///
    /// NOTE(review): presumably this executes `op` on one of the pool's worker
    /// threads — confirm against `Registry::in_worker`.
    pub fn install<OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce() -> R + Send,
        R: Send,
    {
        let registry = &self.registry;
        registry.in_worker(|_, _| op())
    }

    /// Broadcasts `op` via `broadcast::broadcast_in` on this pool's registry
    /// and returns the collected results.
    pub fn broadcast<OP, R>(&self, op: OP) -> Vec<R>
    where
        OP: Fn(BroadcastContext<'_>) -> R + Sync,
        R: Send,
    {
        // SAFETY (review): presumably `broadcast_in` requires a registry that
        // has not been terminated. `terminate` is only called from this type's
        // `Drop` impl, which cannot have run while `&self` is alive.
        unsafe { broadcast::broadcast_in(op, &self.registry) }
    }

    /// Returns the thread count reported by the registry.
    #[inline]
    pub fn current_num_threads(&self) -> usize {
        self.registry.num_threads()
    }

    /// Returns the index of the registry's current worker thread.
    ///
    /// Yields `None` when the registry's `current_thread()` yields `None`
    /// (presumably: the caller is not a worker of this pool — confirm).
    #[inline]
    pub fn current_thread_index(&self) -> Option<usize> {
        self.registry
            .current_thread()
            .map(|worker| worker.index())
    }

    /// Reports whether the registry's current worker thread has a non-empty
    /// local deque.
    ///
    /// Yields `None` when the registry's `current_thread()` yields `None`.
    #[inline]
    pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
        self.registry
            .current_thread()
            .map(|worker| !worker.local_deque_is_empty())
    }

    /// Runs the crate-level `join` on `oper_a` and `oper_b` from inside
    /// [`ThreadPool::install`] and returns both results.
    pub fn join<A, B, RA, RB>(&self, oper_a: A, oper_b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA + Send,
        B: FnOnce() -> RB + Send,
        RA: Send,
        RB: Send,
    {
        let run_both = || join(oper_a, oper_b);
        self.install(run_both)
    }

    /// Runs the crate-level `scope` with `op` from inside
    /// [`ThreadPool::install`] and returns its result.
    pub fn scope<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&Scope<'scope>) -> R + Send,
        R: Send,
    {
        let run_scope = || scope(op);
        self.install(run_scope)
    }

    /// Runs the crate-level `scope_fifo` with `op` from inside
    /// [`ThreadPool::install`] and returns its result.
    pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&ScopeFifo<'scope>) -> R + Send,
        R: Send,
    {
        let run_scope = || scope_fifo(op);
        self.install(run_scope)
    }

    /// Runs `op` through `do_in_place_scope`, passing this pool's registry.
    ///
    /// Unlike [`ThreadPool::scope`], this does not go through `install` and
    /// does not require `op` or its result to be `Send`.
    pub fn in_place_scope<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&Scope<'scope>) -> R,
    {
        do_in_place_scope(Some(&self.registry), op)
    }

    /// Runs `op` through `do_in_place_scope_fifo`, passing this pool's
    /// registry.
    ///
    /// Unlike [`ThreadPool::scope_fifo`], this does not go through `install`
    /// and does not require `op` or its result to be `Send`.
    pub fn in_place_scope_fifo<'scope, OP, R>(&self, op: OP) -> R
    where
        OP: FnOnce(&ScopeFifo<'scope>) -> R,
    {
        do_in_place_scope_fifo(Some(&self.registry), op)
    }

    /// Hands `op` to `spawn::spawn_in` on this pool's registry.
    pub fn spawn<OP>(&self, op: OP)
    where
        OP: FnOnce() + Send + 'static,
    {
        // SAFETY (review): presumably `spawn_in` requires a registry that has
        // not been terminated. `terminate` is only called from this type's
        // `Drop` impl, which cannot have run while `&self` is alive.
        unsafe { spawn::spawn_in(op, &self.registry) }
    }

    /// Hands `op` to `spawn::spawn_fifo_in` on this pool's registry.
    pub fn spawn_fifo<OP>(&self, op: OP)
    where
        OP: FnOnce() + Send + 'static,
    {
        // SAFETY (review): same reasoning as `spawn` — the registry cannot
        // have been terminated by `Drop` while `&self` is borrowed.
        unsafe { spawn::spawn_fifo_in(op, &self.registry) }
    }

    /// Hands `op` to `broadcast::spawn_broadcast_in` on this pool's registry.
    pub fn spawn_broadcast<OP>(&self, op: OP)
    where
        OP: Fn(BroadcastContext<'_>) + Send + Sync + 'static,
    {
        // SAFETY (review): same reasoning as `spawn` — the registry cannot
        // have been terminated by `Drop` while `&self` is borrowed.
        unsafe { broadcast::spawn_broadcast_in(op, &self.registry) }
    }
}
impl Drop for ThreadPool {
/// Calls `terminate` on the registry when the pool handle is dropped.
fn drop(&mut self) {
// NOTE(review): whether this waits for worker threads to exit is decided
// by `Registry::terminate`, which is not visible here.
self.registry.terminate();
}
}
impl fmt::Debug for ThreadPool {
    /// Formats the pool as a `ThreadPool` struct showing its thread count and
    /// the registry's id.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = fmt.debug_struct("ThreadPool");
        out.field("num_threads", &self.current_num_threads());
        out.field("id", &self.registry.id());
        out.finish()
    }
}
/// Returns the index of the worker thread the caller is running on.
///
/// Yields `None` when `WorkerThread::current()` gives a null pointer
/// (presumably: the caller is not running on a pool worker thread — confirm).
#[inline]
pub fn current_thread_index() -> Option<usize> {
    // SAFETY (review): `as_ref` needs a pointer that is null or valid for
    // reads. Presumably `WorkerThread::current()` returns null off-pool and a
    // live worker pointer otherwise — confirm against the registry.
    let worker = unsafe { WorkerThread::current().as_ref() };
    worker.map(|thread| thread.index())
}
/// Reports whether the worker thread the caller is running on has a
/// non-empty local deque.
///
/// Yields `None` when `WorkerThread::current()` gives a null pointer
/// (presumably: the caller is not running on a pool worker thread — confirm).
#[inline]
pub fn current_thread_has_pending_tasks() -> Option<bool> {
    // SAFETY (review): `as_ref` needs a pointer that is null or valid for
    // reads. Presumably `WorkerThread::current()` returns null off-pool and a
    // live worker pointer otherwise — confirm against the registry.
    let worker = unsafe { WorkerThread::current().as_ref() };
    worker.map(|thread| !thread.local_deque_is_empty())
}