extern crate crossbeam;
extern crate futures;
use std::any::Any;
use std::panic::{self, AssertUnwindSafe};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use crossbeam::sync::MsQueue;
use futures::{Future, promise, Promise, Task, Poll};
pub struct CpuPool {
inner: Arc<Inner>,
}
struct Inner {
queue: MsQueue<Message>,
cnt: AtomicUsize,
size: u32,
}
pub struct CpuFuture<R: Send + 'static> {
inner: Promise<thread::Result<R>>,
}
trait Thunk: Send + 'static {
fn call_box(self: Box<Self>);
}
impl<F: FnOnce() + Send + 'static> Thunk for F {
fn call_box(self: Box<Self>) {
(*self)()
}
}
enum Message {
Run(Box<Thunk>),
Close,
}
impl CpuPool {
pub fn new(size: u32) -> CpuPool {
let pool = CpuPool {
inner: Arc::new(Inner {
queue: MsQueue::new(),
cnt: AtomicUsize::new(1),
size: size,
}),
};
for _ in 0..size {
let pool = pool.clone();
thread::spawn(|| pool.work());
}
return pool
}
pub fn execute<F, R>(&self, f: F) -> CpuFuture<R>
where F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = promise();
self.inner.queue.push(Message::Run(Box::new(|| {
tx.complete(panic::catch_unwind(AssertUnwindSafe(f)));
})));
CpuFuture { inner: rx }
}
fn work(self) {
let mut done = false;
while !done {
let res = panic::catch_unwind(AssertUnwindSafe(|| {
while !done {
match self.inner.queue.pop() {
Message::Close => done = true,
Message::Run(r) => r.call_box(),
}
}
}));
drop(res);
}
}
}
impl Clone for CpuPool {
fn clone(&self) -> CpuPool {
self.inner.cnt.fetch_add(1, Ordering::Relaxed);
CpuPool { inner: self.inner.clone() }
}
}
impl Drop for CpuPool {
fn drop(&mut self) {
if self.inner.cnt.fetch_sub(1, Ordering::Relaxed) != 0 {
return
}
for _ in 0..self.inner.size {
self.inner.queue.push(Message::Close);
}
}
}
impl<R: Send + 'static> Future for CpuFuture<R> {
type Item = R;
type Error = Box<Any + Send>;
fn poll(&mut self, task: &mut Task) -> Poll<R, Box<Any + Send>> {
match self.inner.poll(task) {
Poll::Ok(res) => res.into(),
Poll::Err(_) => panic!("shouldn't be canceled"),
Poll::NotReady => Poll::NotReady,
}
}
fn schedule(&mut self, task: &mut Task) {
self.inner.schedule(task)
}
}