use std::mem;
use std::vec::Vec;
use std::sync::{Arc, Mutex};
use std::ops::Deref;
use {Future, Poll, Async};
use task::{self, Task};
/// A cloneable handle to a single underlying future.
///
/// Every clone points at the same mutex-protected state, so the wrapped
/// future is driven by whichever handle happens to be polled and its outcome
/// is handed out to all handles as `SharedItem` / `SharedError`.
#[must_use = "futures do nothing unless polled"]
pub struct Shared<F>
    where F: Future
{
    /// State shared between all clones of this handle.
    inner: Arc<Mutex<State<F>>>,
}
/// Progress of the future wrapped by `Shared`.
enum State<F>
    where F: Future
{
    /// The future has not finished yet. Carries the future itself and the
    /// tasks recorded by polls that observed `NotReady`.
    Waiting(F, Vec<Task>),
    /// The future finished; its item or error is kept behind an `Arc` so it
    /// can be cloned out to each handle.
    Done(Result<Arc<F::Item>, Arc<F::Error>>),
}
impl<F: Future> Shared<F> {
    /// Wraps `future` in a new shared handle.
    ///
    /// The state starts out as `Waiting` with an empty list of parked tasks;
    /// the future itself is not polled here.
    pub fn new(future: F) -> Self {
        let initial = State::Waiting(future, Vec::new());
        Shared { inner: Arc::new(Mutex::new(initial)) }
    }
}
impl<F> Future for Shared<F>
where F: Future
{
type Item = SharedItem<F::Item>;
type Error = SharedError<F::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut inner = self.inner.lock().unwrap();
let result = match *inner {
State::Waiting(ref mut future, _) => Some(future.poll()),
State::Done(_) => None,
};
let new_state = match result {
Some(Ok(Async::NotReady)) => None,
Some(Ok(Async::Ready(e))) => Some(State::Done(Ok(Arc::new(e)))),
Some(Err(e)) => Some(State::Done(Err(Arc::new(e)))),
None => None,
};
let tasks_to_wake = match new_state {
Some(new) => {
match mem::replace(&mut *inner, new) {
State::Waiting(_, tasks) => tasks,
State::Done(_) => panic!(),
}
}
None => Vec::new(),
};
let ret = match *inner {
State::Waiting(_, ref mut tasks) => {
tasks.push(task::park());
Ok(Async::NotReady)
}
State::Done(Ok(ref e)) => Ok(SharedItem { item: e.clone() }.into()),
State::Done(Err(ref e)) => Err(SharedError { error: e.clone() }.into()),
};
drop(inner);
for task in tasks_to_wake {
task.unpark();
}
return ret
}
}
impl<F> Clone for Shared<F>
where F: Future
{
fn clone(&self) -> Self {
Shared { inner: self.inner.clone() }
}
}
impl<F> Drop for Shared<F>
    where F: Future
{
    /// Wakes every recorded waiter when a handle goes away while the future
    /// is still pending.
    ///
    /// This is best-effort: the lock is taken with `try_lock`, and if that
    /// fails for any reason the drop returns without waking anyone. Nothing
    /// is done once the state is `Done`.
    fn drop(&mut self) {
        if let Ok(mut state) = self.inner.try_lock() {
            // Empty the waiter list in place, leaving the state `Waiting`.
            let parked = match *state {
                State::Waiting(_, ref mut tasks) => mem::replace(tasks, Vec::new()),
                State::Done(_) => return,
            };

            // Release the lock before unparking.
            drop(state);
            for task in parked {
                task.unpark();
            }
        }
    }
}
/// Successful result handed out by a `Shared` future.
///
/// `Shared::poll` builds it from a clone of the `Arc` stored in the completed
/// state, and it dereferences to the wrapped `T`.
#[derive(Debug)]
pub struct SharedItem<T> {
// Reference-counted handle to the resolved item.
item: Arc<T>,
}
impl<T> Deref for SharedItem<T> {
    type Target = T;

    /// Borrows the item stored behind the `Arc`.
    fn deref(&self) -> &T {
        &*self.item
    }
}
/// Error result handed out by a `Shared` future.
///
/// `Shared::poll` builds it from a clone of the `Arc` stored in the completed
/// state, and it dereferences to the wrapped `E`.
#[derive(Debug)]
pub struct SharedError<E> {
// Reference-counted handle to the error the future failed with.
error: Arc<E>,
}
impl<E> Deref for SharedError<E> {
    type Target = E;

    /// Borrows the error stored behind the `Arc`.
    fn deref(&self) -> &E {
        &*self.error
    }
}