use runtime::{Inner, Runtime};
use reactor::Reactor;
use std::io;
use std::sync::Mutex;
use std::time::Duration;
use num_cpus;
use tokio_reactor;
use tokio_threadpool::Builder as ThreadPoolBuilder;
use tokio_timer::clock::{self, Clock};
use tokio_timer::timer::{self, Timer};
/// Configures and constructs a thread-pool backed `Runtime`.
///
/// A builder starts out with the defaults chosen by `Builder::new`; each
/// setter adjusts one setting and returns `&mut Self` so calls can be chained
/// before finishing with `build`.
#[derive(Debug)]
pub struct Builder {
    /// Thread pool configuration. The worker-related setters forward to it
    /// and `build` finally turns it into the pool.
    threadpool_builder: ThreadPoolBuilder,
    /// Number of worker threads. `build` creates one reactor and one timer
    /// per worker and re-applies this value as the pool size.
    core_threads: usize,
    /// Clock handed to every timer created by `build` and passed to
    /// `clock::with_default` around each worker.
    clock: Clock,
}
impl Builder {
    /// Creates a builder populated with the default configuration.
    ///
    /// The defaults are: one worker thread per CPU reported by
    /// `num_cpus::get()` (never fewer than one), worker threads named with
    /// the `tokio-runtime-worker-` prefix, and a clock from `Clock::new()`.
    pub fn new() -> Builder {
        let worker_count = num_cpus::get().max(1);

        let mut pool_builder = ThreadPoolBuilder::new();
        pool_builder
            .name_prefix("tokio-runtime-worker-")
            .pool_size(worker_count);

        Builder {
            threadpool_builder: pool_builder,
            core_threads: worker_count,
            clock: Clock::new(),
        }
    }

    /// Sets the clock that `build` gives to every timer and makes available
    /// on the worker threads.
    pub fn clock(&mut self, clock: Clock) -> &mut Self {
        self.clock = clock;
        self
    }

    /// Replaces the entire thread pool configuration with `val`.
    ///
    /// The worker count stored on this builder is not updated here, and
    /// `build` re-applies it as the pool size, so any pool size configured on
    /// `val` is overridden when the runtime is built.
    #[deprecated(
        since = "0.1.9",
        note = "use the `core_threads`, `blocking_threads`, `name_prefix`, \
                `keep_alive`, and `stack_size` functions on `runtime::Builder`, \
                instead"
    )]
    #[doc(hidden)]
    pub fn threadpool_builder(&mut self, val: ThreadPoolBuilder) -> &mut Self {
        self.threadpool_builder = val;
        self
    }

    /// Sets the number of worker threads.
    ///
    /// The value is recorded on this builder and forwarded to the thread pool
    /// as its pool size. It also decides how many reactor/timer pairs `build`
    /// creates.
    pub fn core_threads(&mut self, val: usize) -> &mut Self {
        // Record the value first, then forward it; `build` relies on the
        // recorded copy to size its per-worker resources.
        self.core_threads = val;
        self.threadpool_builder.pool_size(val);
        self
    }

    /// Forwards `val` to the thread pool's `max_blocking` setting.
    pub fn blocking_threads(&mut self, val: usize) -> &mut Self {
        self.threadpool_builder.max_blocking(val);
        self
    }

    /// Forwards `val` to the thread pool's `keep_alive` setting.
    pub fn keep_alive(&mut self, val: Option<Duration>) -> &mut Self {
        self.threadpool_builder.keep_alive(val);
        self
    }

    /// Sets the name prefix used for threads spawned by the pool.
    ///
    /// Unless changed, the prefix is `tokio-runtime-worker-`.
    pub fn name_prefix<S: Into<String>>(&mut self, val: S) -> &mut Self {
        self.threadpool_builder.name_prefix(val);
        self
    }

    /// Forwards `val` to the thread pool's `stack_size` setting.
    pub fn stack_size(&mut self, val: usize) -> &mut Self {
        self.threadpool_builder.stack_size(val);
        self
    }

    /// Builds the configured `Runtime`.
    ///
    /// One reactor and one timer (driven by that reactor and the configured
    /// clock) are created per worker thread. Each worker runs with its own
    /// reactor handle, the clock, and its own timer handle installed through
    /// the respective `with_default` functions, and parks on its own timer.
    /// The runtime itself keeps a clone of the first worker's reactor handle.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from `Reactor::new` if any reactor cannot be
    /// created.
    ///
    /// # Panics
    ///
    /// Panics if the stored worker count is zero, because the first reactor
    /// handle is taken by index.
    pub fn build(&mut self) -> io::Result<Runtime> {
        let worker_count = self.core_threads;

        // The deprecated `threadpool_builder` setter may have swapped in a
        // builder with a different pool size; force it back in sync with the
        // number of reactor/timer pairs created below.
        self.threadpool_builder.pool_size(worker_count);

        let mut io_handles = Vec::new();
        let mut timer_handles = Vec::new();
        let mut parked_timers = Vec::new();

        for _ in 0..worker_count {
            let reactor = Reactor::new()?;
            io_handles.push(reactor.handle());

            // The timer takes ownership of the reactor and is later handed
            // out as the worker's park implementation.
            let timer = Timer::new_with_now(reactor, self.clock.clone());
            timer_handles.push(timer.handle());
            parked_timers.push(Mutex::new(Some(timer)));
        }

        let worker_clock = self.clock.clone();
        let primary_reactor = io_handles[0].clone();

        let pool = self
            .threadpool_builder
            .around_worker(move |worker, enter| {
                let slot = worker.id().to_usize();

                tokio_reactor::with_default(&io_handles[slot], enter, |enter| {
                    clock::with_default(&worker_clock, enter, |enter| {
                        timer::with_default(&timer_handles[slot], enter, |_| {
                            worker.run();
                        });
                    })
                });
            })
            .custom_park(move |worker_id| {
                // Each timer can be taken exactly once; a second request for
                // the same worker id finds `None` and panics.
                let mut slot = parked_timers[worker_id.to_usize()].lock().unwrap();
                slot.take().unwrap()
            })
            .build();

        let inner = Inner {
            reactor: primary_reactor,
            pool,
        };

        Ok(Runtime { inner: Some(inner) })
    }
}