use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
use std::fmt;
#[cfg(not(loom))]
use std::sync::Arc;
/// Collects the configuration used to construct a `Runtime`.
///
/// Values are set through the builder methods and read when `build` is called.
pub struct Builder {
/// Which runtime flavor `build` constructs.
kind: Kind,
/// Flag handed to `io::create_driver` when the runtime is built.
enable_io: bool,
/// Flag handed to `time::create_driver` when the runtime is built.
enable_time: bool,
/// Explicit core thread count for the threaded scheduler; when `None`, the
/// threaded build falls back to `crate::loom::sys::num_cpus`.
core_threads: Option<usize>,
/// Thread limit handed to `blocking::create_blocking_pool`; the threaded
/// build also asserts that the core thread count does not exceed it.
max_threads: usize,
/// Thread name; `Builder::new` sets it to "tokio-runtime-worker".
/// NOTE(review): presumably read by the blocking pool / worker spawning code
/// in the parent module, which is not visible here.
pub(super) thread_name: String,
/// Optional stack size override set by `thread_stack_size`.
/// NOTE(review): presumably in bytes; confirm against the code that spawns threads.
pub(super) thread_stack_size: Option<usize>,
/// Callback stored by `on_thread_start`, if any.
pub(super) after_start: Option<Callback>,
/// Callback stored by `on_thread_stop`, if any.
pub(super) before_stop: Option<Callback>,
}
/// Selects which runtime flavor `Builder::build` constructs.
#[derive(Debug, Clone, Copy)]
enum Kind {
/// Built by `build_shell_runtime`; this is the kind `Builder::new` starts with.
Shell,
/// Built by `build_basic_runtime`; only compiled with the `rt-core` feature.
#[cfg(feature = "rt-core")]
Basic,
/// Built by `build_threaded_runtime`; only compiled with the `rt-threaded` feature.
#[cfg(feature = "rt-threaded")]
ThreadPool,
}
impl Builder {
pub fn new() -> Builder {
Builder {
kind: Kind::Shell,
enable_io: false,
enable_time: false,
core_threads: None,
max_threads: 512,
thread_name: "tokio-runtime-worker".into(),
thread_stack_size: None,
after_start: None,
before_stop: None,
}
}
pub fn enable_all(&mut self) -> &mut Self {
#[cfg(feature = "io-driver")]
self.enable_io();
#[cfg(feature = "time")]
self.enable_time();
self
}
#[deprecated(note = "In future will be replaced by core_threads method")]
pub fn num_threads(&mut self, val: usize) -> &mut Self {
self.core_threads = Some(val);
self
}
pub fn core_threads(&mut self, val: usize) -> &mut Self {
assert_ne!(val, 0, "Core threads cannot be zero");
self.core_threads = Some(val);
self
}
pub fn max_threads(&mut self, val: usize) -> &mut Self {
assert_ne!(val, 0, "Thread limit cannot be zero");
self.max_threads = val;
self
}
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
self.thread_name = val.into();
self
}
pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
self.thread_stack_size = Some(val);
self
}
#[cfg(not(loom))]
pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.after_start = Some(Arc::new(f));
self
}
#[cfg(not(loom))]
pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.before_stop = Some(Arc::new(f));
self
}
pub fn build(&mut self) -> io::Result<Runtime> {
match self.kind {
Kind::Shell => self.build_shell_runtime(),
#[cfg(feature = "rt-core")]
Kind::Basic => self.build_basic_runtime(),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool => self.build_threaded_runtime(),
}
}
fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::Kind;
let clock = time::create_clock();
let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let spawner = Spawner::Shell;
let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
kind: Kind::Shell(Shell::new(driver)),
handle: Handle {
spawner,
io_handle,
time_handle,
clock,
blocking_spawner,
},
blocking_pool,
})
}
}
cfg_io_driver! {
    impl Builder {
        /// Turns on the I/O driver flag.
        ///
        /// The flag is handed to `io::create_driver` when the runtime is built.
        pub fn enable_io(&mut self) -> &mut Self {
            self.enable_io = true;
            self
        }
    }
}
cfg_time! {
    impl Builder {
        /// Turns on the time driver flag.
        ///
        /// The flag is handed to `time::create_driver` when the runtime is built.
        pub fn enable_time(&mut self) -> &mut Self {
            self.enable_time = true;
            self
        }
    }
}
cfg_rt_core! {
    impl Builder {
        /// Selects the basic scheduler as the runtime kind to build.
        pub fn basic_scheduler(&mut self) -> &mut Self {
            self.kind = Kind::Basic;
            self
        }

        /// Builds the basic flavor: drivers, a `BasicScheduler` wrapping the
        /// driver, and a blocking pool.
        fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
            use crate::runtime::BasicScheduler;

            let clock = time::create_clock();

            // The I/O driver is created first because the time driver wraps it.
            let (io_drv, io_handle) = io::create_driver(self.enable_io)?;
            let (time_drv, time_handle) =
                time::create_driver(self.enable_time, io_drv, clock.clone());

            let scheduler = BasicScheduler::new(time_drv);
            let spawner = Spawner::Basic(scheduler.spawner().clone());

            let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
            let blocking_spawner = blocking_pool.spawner().clone();

            let handle = Handle {
                spawner,
                io_handle,
                time_handle,
                clock,
                blocking_spawner,
            };

            Ok(Runtime {
                // Spelled out in full because a bare `Kind` names the builder's own enum here.
                kind: crate::runtime::Kind::Basic(scheduler),
                handle,
                blocking_pool,
            })
        }
    }
}
cfg_rt_threaded! {
    impl Builder {
        /// Selects the thread pool scheduler as the runtime kind to build.
        pub fn threaded_scheduler(&mut self) -> &mut Self {
            self.kind = Kind::ThreadPool;
            self
        }

        /// Builds the threaded flavor: drivers, a `ThreadPool` scheduler, and
        /// a blocking pool, then launches the pool inside the new handle's context.
        ///
        /// # Panics
        ///
        /// Panics if the core thread count exceeds `max_threads`.
        fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
            use crate::runtime::park::Parker;
            use crate::runtime::ThreadPool;

            // Fall back to the CPU count when no explicit value was configured.
            let core_threads = match self.core_threads {
                Some(count) => count,
                None => crate::loom::sys::num_cpus(),
            };
            assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");

            let clock = time::create_clock();

            // The I/O driver is created first because the time driver wraps it.
            let (io_drv, io_handle) = io::create_driver(self.enable_io)?;
            let (time_drv, time_handle) =
                time::create_driver(self.enable_time, io_drv, clock.clone());

            let parker = Parker::new(time_drv);
            let (scheduler, launch) = ThreadPool::new(core_threads, parker);
            let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

            let blocking_pool = blocking::create_blocking_pool(self, self.max_threads);
            let blocking_spawner = blocking_pool.spawner().clone();

            let handle = Handle {
                spawner,
                io_handle,
                time_handle,
                clock,
                blocking_spawner,
            };

            // Launch runs inside `handle.enter`, before the runtime is returned.
            handle.enter(|| launch.launch());

            Ok(Runtime {
                // Spelled out in full because a bare `Kind` names the builder's own enum here.
                kind: crate::runtime::Kind::ThreadPool(scheduler),
                handle,
                blocking_pool,
            })
        }
    }
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}
impl fmt::Debug for Builder {
    /// Formats the builder as a `Builder { .. }` debug struct.
    ///
    /// The `kind`, thread count, thread name and stack size fields are printed
    /// with their own `Debug` output. The two callbacks are printed only as
    /// `Some("...")` when set and `None` otherwise.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Each placeholder is derived from its own field; `before_stop` was
        // previously derived from `after_start`, misreporting the stop callback.
        let after_start = self.after_start.as_ref().map(|_| "...");
        let before_stop = self.before_stop.as_ref().map(|_| "...");

        fmt.debug_struct("Builder")
            .field("kind", &self.kind)
            .field("core_threads", &self.core_threads)
            .field("max_threads", &self.max_threads)
            .field("thread_name", &self.thread_name)
            .field("thread_stack_size", &self.thread_stack_size)
            .field("after_start", &after_start)
            .field("before_stop", &before_stop)
            .finish()
    }
}