use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;
pub(crate) struct BlockingPool {
spawner: Spawner,
shutdown_rx: shutdown::Receiver,
}
#[derive(Clone)]
pub(crate) struct Spawner {
inner: Arc<Inner>,
}
struct Inner {
shared: Mutex<Shared>,
condvar: Condvar,
thread_name: String,
stack_size: Option<usize>,
after_start: Option<Callback>,
before_stop: Option<Callback>,
thread_cap: usize,
}
struct Shared {
queue: VecDeque<Task>,
num_th: usize,
num_idle: u32,
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
}
type Task = task::Notified<NoopSchedule>;
const KEEP_ALIVE: Duration = Duration::from_secs(10);
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
let rt = Handle::current();
let (task, handle) = task::joinable(BlockingTask::new(func));
let _ = rt.blocking_spawner.spawn(task, &rt);
handle
}
#[allow(dead_code)]
pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
where
F: FnOnce() -> R + Send + 'static,
{
let rt = Handle::current();
let (task, _handle) = task::joinable(BlockingTask::new(func));
rt.blocking_spawner.spawn(task, &rt)
}
impl BlockingPool {
pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
let (shutdown_tx, shutdown_rx) = shutdown::channel();
BlockingPool {
spawner: Spawner {
inner: Arc::new(Inner {
shared: Mutex::new(Shared {
queue: VecDeque::new(),
num_th: 0,
num_idle: 0,
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
stack_size: builder.thread_stack_size,
after_start: builder.after_start.clone(),
before_stop: builder.before_stop.clone(),
thread_cap,
}),
},
shutdown_rx,
}
}
pub(crate) fn spawner(&self) -> &Spawner {
&self.spawner
}
pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
let mut shared = self.spawner.inner.shared.lock().unwrap();
if shared.shutdown {
return;
}
shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();
drop(shared);
self.shutdown_rx.wait(timeout);
}
}
impl Drop for BlockingPool {
fn drop(&mut self) {
self.shutdown(None);
}
}
impl fmt::Debug for BlockingPool {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("BlockingPool").finish()
}
}
impl Spawner {
fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
let shutdown_tx = {
let mut shared = self.inner.shared.lock().unwrap();
if shared.shutdown {
task.shutdown();
return Err(());
}
shared.queue.push_back(task);
if shared.num_idle == 0 {
if shared.num_th == self.inner.thread_cap {
None
} else {
shared.num_th += 1;
assert!(shared.shutdown_tx.is_some());
shared.shutdown_tx.clone()
}
} else {
shared.num_idle -= 1;
shared.num_notify += 1;
self.inner.condvar.notify_one();
None
}
};
if let Some(shutdown_tx) = shutdown_tx {
self.spawn_thread(shutdown_tx, rt);
}
Ok(())
}
fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
if let Some(stack_size) = self.inner.stack_size {
builder = builder.stack_size(stack_size);
}
let rt = rt.clone();
builder
.spawn(move || {
let rt = &rt;
rt.enter(move || {
rt.blocking_spawner.inner.run();
drop(shutdown_tx);
})
})
.unwrap();
}
}
impl Inner {
fn run(&self) {
if let Some(f) = &self.after_start {
f()
}
let mut shared = self.shared.lock().unwrap();
'main: loop {
while let Some(task) = shared.queue.pop_front() {
drop(shared);
task.run();
shared = self.shared.lock().unwrap();
}
shared.num_idle += 1;
while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
shared = lock_result.0;
let timeout_result = lock_result.1;
if shared.num_notify != 0 {
shared.num_notify -= 1;
break;
}
if !shared.shutdown && timeout_result.timed_out() {
break 'main;
}
}
if shared.shutdown {
while let Some(task) = shared.queue.pop_front() {
drop(shared);
task.shutdown();
shared = self.shared.lock().unwrap();
}
shared.num_idle += 1;
break;
}
}
shared.num_th -= 1;
shared.num_idle = shared
.num_idle
.checked_sub(1)
.expect("num_idle underflowed on thread exit");
if shared.shutdown && shared.num_th == 0 {
self.condvar.notify_one();
}
drop(shared);
if let Some(f) = &self.before_stop {
f()
}
}
}
impl fmt::Debug for Spawner {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("blocking::Spawner").finish()
}
}