use std::any::Any;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::panic;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicBool, ATOMIC_USIZE_INIT, Ordering};
use std::thread;
use Future;
use executor::{DEFAULT, Executor};
use lock::Lock;
use slot::Slot;
/// A unit of work that drives a boxed future (see `Task::run`) and owns
/// values stored on its behalf through `Task::insert`.
pub struct Task {
/// Identifier drawn from a global counter in `Task::new`. `Task::get` and
/// `Task::get_mut` assert that a `TaskData` handle carries the same id.
id: usize,
/// Counter bumped by `ScopedTask::ready` and restored when that
/// `ScopedTask` is dropped; `Task::run` asserts it is zero before polling.
ready: usize,
/// Head of a chain of boxed nodes, one per `Task::insert` call. Each node
/// owns the previous head, so stored values stay allocated while the task
/// keeps this field.
list: Box<Any + Send>,
/// Handle to the state shared with every clone returned from
/// `Task::handle`.
handle: TaskHandle,
}
/// A temporary view of a `Task` created by `Task::scoped`.
///
/// It dereferences to the underlying `Task` and undoes its own effect on the
/// task's `ready` counter when dropped.
pub struct ScopedTask<'a> {
/// The task this scope borrows for its whole lifetime.
task: &'a mut Task,
/// `true` once `ready` has been called on this scope, meaning the drop
/// handler has one increment of `Task::ready` to take back.
reset: bool,
}
/// A cloneable reference to a task's shared state.
///
/// Cloning only clones the inner `Arc`, so all clones refer to the same
/// `Inner` (see `TaskHandle::equivalent`).
#[derive(Clone)]
pub struct TaskHandle {
/// State shared between the task and every handle cloned from it.
inner: Arc<Inner>,
}
/// State shared between a `Task` and all of its `TaskHandle`s.
struct Inner {
/// Same id as the owning `Task` (both are set from one value in
/// `Task::new`); checked by `TaskHandle::get`.
id: usize,
/// Where `Task::run` leaves the task and its future when the future is not
/// ready; the callback installed by `TaskHandle::notify` takes them out.
slot: Slot<(Task, Box<Future<Item=(), Error=()>>)>,
/// Set by `TaskHandle::notify` before it installs a slot callback and
/// cleared inside that callback; while set, further `notify` calls return
/// without registering anything.
registered: AtomicBool,
/// Head of a chain of boxed nodes, one per `Task::insert_notify` call,
/// guarded by a `Lock`.
list: Lock<Box<Any + Send + Sync>>,
}
/// A copyable handle to a value of type `A` stored in a task through
/// `Task::insert`.
///
/// The handle does not own the value; it is redeemed for a reference with
/// `Task::get` / `Task::get_mut`, which first check that `id` matches the
/// task being asked.
pub struct TaskData<A> {
    /// Id of the task the value was inserted into.
    id: usize,
    /// Address of the value inside the task's boxed node.
    ptr: *mut A,
}

// The raw pointer suppresses the automatic `Send`/`Sync` impls, so they are
// restored by hand, mirroring the bounds a plain `A` would have.
// NOTE(review): soundness relies on the pointer only being dereferenced by
// `Task::get` / `Task::get_mut` after the id check -- confirm no other user.
unsafe impl<A> Send for TaskData<A> where A: Send {}
unsafe impl<A> Sync for TaskData<A> where A: Sync {}
/// A copyable handle to a value of type `A` stored through
/// `Task::insert_notify`.
///
/// The handle does not own the value; `TaskHandle::get` turns it into a
/// shared reference after checking that `id` matches the handle's task.
pub struct TaskNotifyData<A> {
    /// Id of the task the value was inserted into.
    id: usize,
    /// Address of the value inside the shared state's boxed node.
    ptr: *mut A,
}

// The raw pointer suppresses the automatic `Send`/`Sync` impls, so they are
// restored by hand, mirroring the bounds a plain `A` would have.
// NOTE(review): soundness relies on the pointer only being dereferenced by
// `TaskHandle::get` after the id check -- confirm no other user.
unsafe impl<A> Send for TaskNotifyData<A> where A: Send {}
unsafe impl<A> Sync for TaskNotifyData<A> where A: Sync {}
impl Task {
    /// Creates a task with a fresh id, no stored data and an empty slot.
    ///
    /// # Panics
    ///
    /// Panics when the global id counter gets within 50,000 of
    /// `usize::max_value()`. Ids are never reused, because `TaskData`
    /// handles are matched to their task by id alone.
    pub fn new() -> Task {
        static NEXT: AtomicUsize = ATOMIC_USIZE_INIT;

        let id = NEXT.fetch_add(1, Ordering::SeqCst);
        // NOTE(review): the 50_000 margin presumably leaves room for other
        // threads that increment the counter before this panic is observed.
        if id >= usize::max_value() - 50_000 {
            panic!("overflow in number of tasks created");
        }

        Task {
            id: id,
            list: Box::new(()),
            ready: 0,
            handle: TaskHandle {
                inner: Arc::new(Inner {
                    id: id,
                    slot: Slot::new(None),
                    registered: AtomicBool::new(false),
                    list: Lock::new(Box::new(())),
                }),
            },
        }
    }

    /// Stores `a` in this task and returns a handle that `get` / `get_mut`
    /// accept later.
    ///
    /// The value is kept in a new boxed node that also owns the previous
    /// head of the list, so earlier values remain allocated.
    pub fn insert<A>(&mut self, a: A) -> TaskData<A>
        where A: Any + Send + 'static,
    {
        struct Node<T: ?Sized> {
            _next: Box<Any + Send>,
            data: T,
        }

        let prev = mem::replace(&mut self.list, Box::new(()));
        let mut node = Box::new(Node { _next: prev, data: a });
        // Take the address before the box is erased to `Box<Any + Send>`;
        // moving the box does not move its heap contents.
        let ptr: *mut A = &mut node.data;
        self.list = node;
        TaskData { id: self.id, ptr: ptr }
    }

    /// Stores `a` in the state shared with this task's handles and returns
    /// a handle that `TaskHandle::get` accepts later.
    ///
    /// # Panics
    ///
    /// Panics if the shared list's lock cannot be acquired.
    pub fn insert_notify<A>(&mut self, a: A) -> TaskNotifyData<A>
        where A: Any + Send + Sync + 'static,
    {
        struct Node<T: ?Sized> {
            _next: Box<Any + Sync + Send>,
            data: T,
        }

        let mut list = self.handle.inner.list.try_lock()
            .expect("task notify-data list lock is unexpectedly held");
        let prev = mem::replace(&mut *list, Box::new(()));
        let mut node = Box::new(Node { _next: prev, data: a });
        // Same trick as `insert`: record the address, then erase the type.
        let ptr: *mut A = &mut node.data;
        *list = node;
        TaskNotifyData { id: self.id, ptr: ptr }
    }

    /// Returns a shared reference to a value previously stored with
    /// `insert`.
    ///
    /// # Panics
    ///
    /// Panics if `data` was issued by a different task (id mismatch).
    pub fn get<A>(&self, data: &TaskData<A>) -> &A {
        assert_eq!(data.id, self.id);
        // SAFETY: the id check ties `data` to this task, whose `insert`
        // produced the pointer from a node that `self.list` still owns;
        // the returned borrow is bounded by `&self`.
        unsafe { &*data.ptr }
    }

    /// Returns an exclusive reference to a value previously stored with
    /// `insert`.
    ///
    /// # Panics
    ///
    /// Panics if `data` was issued by a different task (id mismatch).
    pub fn get_mut<A>(&mut self, data: &TaskData<A>) -> &mut A {
        assert_eq!(data.id, self.id);
        // SAFETY: as in `get`; exclusivity follows from holding `&mut self`
        // for the lifetime of the returned reference.
        unsafe { &mut *data.ptr }
    }

    /// Shorthand for `self.handle().notify()`.
    pub fn notify(&mut self) {
        self.handle.notify()
    }

    /// Borrows the handle to this task's shared state; clone it to keep it.
    pub fn handle(&self) -> &TaskHandle {
        &self.handle
    }

    /// Wraps this task in a `ScopedTask` whose `ready` marker is undone
    /// when the wrapper is dropped.
    pub fn scoped(&mut self) -> ScopedTask {
        ScopedTask { task: self, reset: false }
    }

    /// Polls `future` once on this task.
    ///
    /// * If the poll reports ready, the task and future are dropped and the
    ///   call returns.
    /// * If the poll panics, the panic is resumed on the calling thread.
    /// * Otherwise the future is given a chance to replace itself via
    ///   `tailcall`, asked to `schedule` a wakeup on this task, and then
    ///   parked together with the task in the shared slot.
    ///
    /// # Panics
    ///
    /// Panics if `ready` is non-zero on entry, if the poll panics, or if
    /// the slot refuses the parked pair.
    pub fn run(self, mut future: Box<Future<Item=(), Error=()>>) {
        let mut me = self;

        assert_eq!(me.ready, 0);
        // Both values move into the closure and are handed back in the
        // result so they can be reused after the poll.
        let result = catch_unwind(move || {
            (future.poll(&mut me), future, me)
        });
        match result {
            Ok((ref r, _, _)) if r.is_ready() => return,
            Ok((_, f, t)) => {
                future = f;
                me = t;
            }
            Err(e) => panic::resume_unwind(e),
        }

        if let Some(replacement) = future.tailcall() {
            future = replacement;
        }

        future.schedule(&mut me);

        // Clone the `Arc` first: `me` is about to move into the slot that
        // lives inside it.
        let inner = me.handle.inner.clone();
        inner.slot.try_produce((me, future)).ok()
            .expect("failed to park the task and its future in the task slot");
    }
}
/// Runs `f` and reports a panic inside it as `Err` instead of unwinding
/// through the caller.
///
/// The closure is wrapped in `AssertUnwindSafe`, so callers do not have to
/// satisfy an `UnwindSafe` bound.
fn catch_unwind<F, U>(f: F) -> thread::Result<U>
    where F: FnOnce() -> U + Send + 'static,
{
    let guarded = panic::AssertUnwindSafe(f);
    panic::catch_unwind(guarded)
}
impl TaskHandle {
pub fn equivalent(&self, other: &TaskHandle) -> bool {
&*self.inner as *const _ == &*other.inner as *const _
}
pub fn get<A>(&self, data: &TaskNotifyData<A>) -> &A {
assert_eq!(data.id, self.inner.id);
unsafe { &*data.ptr }
}
pub fn notify(&self) {
if self.inner.registered.swap(true, Ordering::SeqCst) {
return
}
self.inner.slot.on_full(|slot| {
let (task, future) = slot.try_consume().ok().unwrap();
task.handle.inner.registered.store(false, Ordering::SeqCst);
DEFAULT.execute(|| task.run(future))
});
}
}
impl<'a> ScopedTask<'a> {
    /// Marks the underlying task as ready for the duration of this scope.
    ///
    /// Only the first call on a given scope bumps the task's `ready`
    /// counter; later calls are no-ops. Returns `self` so calls can be
    /// chained.
    pub fn ready(&mut self) -> &mut ScopedTask<'a>{
        let first_call = !self.reset;
        if first_call {
            self.task.ready += 1;
            self.reset = true;
        }
        self
    }
}
/// Lets a `ScopedTask` be used wherever a `&Task` is expected.
impl<'a> Deref for ScopedTask<'a> {
    type Target = Task;

    fn deref(&self) -> &Task {
        let task: &Task = &*self.task;
        task
    }
}
/// Lets a `ScopedTask` be used wherever a `&mut Task` is expected.
impl<'a> DerefMut for ScopedTask<'a> {
    fn deref_mut(&mut self) -> &mut Task {
        let task: &mut Task = &mut *self.task;
        task
    }
}
/// Takes back the `ready` increment made through this scope, if any.
impl<'a> Drop for ScopedTask<'a> {
    fn drop(&mut self) {
        // Nothing to undo unless `ready` was called on this scope.
        if !self.reset {
            return;
        }
        self.task.ready -= 1;
    }
}
// Implemented by hand rather than derived so that the handle is
// `Clone`/`Copy` for every `A`, not only for `A: Clone` / `A: Copy`.
impl<A> Clone for TaskData<A> {
    /// Returns a bitwise copy of the handle; the stored value is untouched.
    fn clone(&self) -> TaskData<A> {
        *self
    }
}

impl<A> Copy for TaskData<A> {}
// Implemented by hand rather than derived so that the handle is
// `Clone`/`Copy` for every `A`, not only for `A: Clone` / `A: Copy`.
impl<A> Clone for TaskNotifyData<A> {
    /// Returns a bitwise copy of the handle; the stored value is untouched.
    fn clone(&self) -> TaskNotifyData<A> {
        *self
    }
}

impl<A> Copy for TaskNotifyData<A> {}