use std::cell::{Cell, RefCell};
use std::fmt;
use std::rc::{Rc, Weak};
use {Future, Poll, Async};
use future::{Executor, IntoFuture, Lazy, lazy};
use task::{self, Task};
/// Creates a new oneshot channel for use within a single thread.
///
/// The shared state is allocated once. The returned `Receiver` keeps it alive
/// through a strong `Rc`, while the returned `Sender` only holds a `Weak`
/// reference to it.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Rc::new(RefCell::new(Inner {
        value: None,
        tx_task: None,
        rx_task: None,
    }));
    // Downgrade first: the strong reference is moved into the receiver below.
    let sender = Sender { inner: Rc::downgrade(&shared) };
    let receiver = Receiver { state: State::Open(shared) };
    (sender, receiver)
}
/// Sending half of the channel created by `channel`.
///
/// Holds only a weak reference to the shared state; the strong reference is
/// owned by the `Receiver`.
#[derive(Debug)]
pub struct Sender<T> {
// Weak handle to the state shared with the `Receiver`.
inner: Weak<RefCell<Inner<T>>>,
}
/// Receiving half of the channel created by `channel`.
///
/// Implements `Future`: polling it yields the sent value, or `Canceled`.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Receiver<T> {
// Either the live shared state or what remained when the channel was closed.
state: State<T>,
}
/// Receiver-side view of the channel.
#[derive(Debug)]
enum State<T> {
// The channel is live; this is the strong reference to the shared state.
Open(Rc<RefCell<Inner<T>>>),
// `Receiver::close` has run; holds the value that was pending at that time, if any.
Closed(Option<T>),
}
pub use sync::oneshot::Canceled;
/// State shared between the `Sender` and the `Receiver`.
#[derive(Debug)]
struct Inner<T> {
// Value stored by `Sender::send` until the receiver takes it.
value: Option<T>,
// Task recorded by `Sender::poll_cancel`; notified by `Receiver::close`.
tx_task: Option<Task>,
// Task recorded by `Receiver::poll`; notified when the `Sender` is dropped.
rx_task: Option<Task>,
}
impl<T> Sender<T> {
    /// Stores `val` in the channel for the `Receiver` to pick up.
    ///
    /// Consumes the sender. Returns `Err(val)`, handing the value back, when
    /// the shared state can no longer be reached (the receiver has closed the
    /// channel or been dropped).
    ///
    /// The receiver's task is not notified here directly; that happens in
    /// this type's `Drop` implementation, which runs when `self` goes out of
    /// scope at the end of this call.
    pub fn send(self, val: T) -> Result<(), T> {
        match self.inner.upgrade() {
            Some(shared) => {
                shared.borrow_mut().value = Some(val);
                Ok(())
            }
            None => Err(val),
        }
    }

    /// Polls for the receiving side going away.
    ///
    /// Returns `Ready(())` once the shared state can no longer be reached.
    /// Otherwise records the current task in the shared state, so that
    /// `Receiver::close` can notify it, and returns `NotReady`. This method
    /// never returns `Err`.
    pub fn poll_cancel(&mut self) -> Poll<(), ()> {
        if let Some(shared) = self.inner.upgrade() {
            shared.borrow_mut().tx_task = Some(task::current());
            return Ok(Async::NotReady);
        }
        Ok(Async::Ready(()))
    }

    /// Returns `true` when the shared state can no longer be reached, i.e.
    /// a `send` would fail.
    pub fn is_canceled(&self) -> bool {
        self.inner.upgrade().is_none()
    }
}
impl<T> Drop for Sender<T> {
    /// Discards the sender's own registered task and notifies the receiver's
    /// task, if any, so the receiver gets polled again after the sender is
    /// gone (whether or not a value was sent first).
    fn drop(&mut self) {
        if let Some(shared) = self.inner.upgrade() {
            // Keep the `RefCell` borrow confined to this block so it is
            // released before `notify` is called.
            let waiter = {
                let mut state = shared.borrow_mut();
                state.tx_task.take();
                state.rx_task.take()
            };
            if let Some(task) = waiter {
                task.notify();
            }
        }
    }
}
impl<T> Receiver<T> {
    /// Closes the channel from the receiving side.
    ///
    /// Any value already sent is kept so that a later `poll` can still return
    /// it. The strong reference to the shared state is released, which makes
    /// subsequent `Sender::send` calls fail, and the task registered through
    /// `Sender::poll_cancel` (if any) is notified. Calling this on an already
    /// closed receiver does nothing.
    pub fn close(&mut self) {
        let (pending, sender_task) = match self.state {
            State::Closed(_) => return,
            State::Open(ref shared) => {
                let mut state = shared.borrow_mut();
                // The receiver no longer needs to be woken.
                drop(state.rx_task.take());
                let pending = state.value.take();
                let sender_task = state.tx_task.take();
                (pending, sender_task)
            }
        };
        // Replacing the state drops the `Rc` held by `State::Open`.
        self.state = State::Closed(pending);
        if let Some(task) = sender_task {
            task.notify();
        }
    }
}
impl<T> Future for Receiver<T> {
    type Item = T;
    type Error = Canceled;

    /// Resolves to the sent value, or to `Canceled` when the sender went away
    /// without sending one.
    ///
    /// After `close`, the value that was pending at close time is returned
    /// once; any later poll yields `Canceled`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let shared = match self.state {
            State::Closed(ref mut leftover) => {
                return leftover.take().map(Async::Ready).ok_or(Canceled);
            }
            State::Open(ref mut shared) => shared,
        };

        // A value that has already arrived wins over everything else.
        let received = shared.borrow_mut().value.take();
        if let Some(val) = received {
            return Ok(Async::Ready(val));
        }

        // `Rc::get_mut` only succeeds when no other `Rc` or `Weak` points at
        // the state. Failure therefore means the sender's `Weak` still
        // exists: register the current task and wait.
        if Rc::get_mut(shared).is_none() {
            shared.borrow_mut().rx_task = Some(task::current());
            return Ok(Async::NotReady);
        }

        // The sender is gone and left no value behind.
        Err(Canceled)
    }
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.close();
}
}
/// Handle returned by `spawn` and `spawn_fn`.
///
/// Implements `Future`, resolving to the outcome that the corresponding
/// `Execute` sends over the internal channel.
pub struct SpawnHandle<T, E> {
// Receives the spawned future's `Result` from the paired `Execute`.
rx: Receiver<Result<T, E>>,
// Flag shared with the paired `Execute`, which reads it once the receiver
// is gone to decide whether to keep driving the future.
keep_running: Rc<Cell<bool>>,
}
/// Future handed to the executor by `spawn`.
///
/// Drives `future` and sends its outcome through `tx`.
pub struct Execute<F: Future> {
// The wrapped future being driven.
future: F,
// Sending half of the result channel; taken (left as `None`) when the
// result is sent.
tx: Option<Sender<Result<F::Item, F::Error>>>,
// Flag shared with the `SpawnHandle`; when `false`, `poll` stops as soon as
// the receiving side is gone.
keep_running: Rc<Cell<bool>>,
}
/// Spawns `future` onto `executor` and returns a handle to its eventual
/// outcome.
///
/// The future is wrapped in an `Execute`, which forwards the future's result
/// to the returned `SpawnHandle`. If the handle is dropped before the future
/// completes, the future is canceled: `Execute` stops polling it once it
/// notices the handle is gone. Call `SpawnHandle::forget` to let the future
/// keep running instead.
///
/// # Panics
///
/// Panics if `executor.execute` returns an error.
pub fn spawn<F, E>(future: F, executor: &E) -> SpawnHandle<F::Item, F::Error>
    where F: Future,
          E: Executor<Execute<F>>,
{
    // Starts out `false`: by default the future must not outlive its handle.
    // Only `SpawnHandle::forget` flips it to `true`.
    let flag = Rc::new(Cell::new(false));
    let (tx, rx) = channel();
    executor.execute(Execute {
        future: future,
        tx: Some(tx),
        keep_running: flag.clone(),
    }).expect("failed to spawn future");
    SpawnHandle {
        rx: rx,
        keep_running: flag,
    }
}

/// Like `spawn`, but takes a closure that produces the future.
///
/// The closure is wrapped with `lazy` and the resulting future is passed to
/// `spawn`.
///
/// # Panics
///
/// Panics under the same condition as `spawn`.
pub fn spawn_fn<F, R, E>(f: F, executor: &E) -> SpawnHandle<R::Item, R::Error>
    where F: FnOnce() -> R,
          R: IntoFuture,
          E: Executor<Execute<Lazy<F, R>>>,
{
    spawn(lazy(f), executor)
}

impl<T, E> SpawnHandle<T, E> {
    /// Drops this handle without canceling the spawned future.
    ///
    /// Sets the shared `keep_running` flag so that `Execute` keeps polling
    /// the future even after it sees that the handle is gone. The future's
    /// result is then discarded.
    pub fn forget(self) {
        self.keep_running.set(true);
    }
}
impl<T, E> Future for SpawnHandle<T, E> {
    type Item = T;
    type Error = E;

    /// Resolves to the spawned future's item or error once it has been sent
    /// over the internal channel.
    ///
    /// # Panics
    ///
    /// Panics if the channel reports cancellation, i.e. the sending side went
    /// away without delivering a result.
    fn poll(&mut self) -> Poll<T, E> {
        let outcome = match self.rx.poll() {
            Err(_) => panic!("future was canceled before completion"),
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Ok(Async::Ready(outcome)) => outcome,
        };
        // `Ok(t)` becomes `Ok(Ready(t))`; `Err(e)` passes through unchanged.
        outcome.map(Async::Ready)
    }
}
impl<T: fmt::Debug, E: fmt::Debug> fmt::Debug for SpawnHandle<T, E> {
    /// Formats as the bare struct name; no fields are printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = f.debug_struct("SpawnHandle");
        repr.finish()
    }
}
impl<F: Future> Future for Execute<F> {
    type Item = ();
    type Error = ();

    /// Drives the wrapped future and sends its outcome through the channel.
    ///
    /// Every poll first checks, via `poll_cancel`, whether the receiving side
    /// is gone. That call also registers the current task for a cancellation
    /// notification. If the receiver is gone and `keep_running` is `false`,
    /// this future completes immediately without polling the wrapped future.
    ///
    /// # Panics
    ///
    /// Panics if polled again after the result has been sent, because the
    /// sender has been taken by then.
    fn poll(&mut self) -> Poll<(), ()> {
        let canceled = self.tx.as_mut().unwrap().poll_cancel().unwrap().is_ready();
        if canceled && !self.keep_running.get() {
            return Ok(Async::Ready(()));
        }

        let result = match self.future.poll() {
            Ok(Async::Ready(item)) => Ok(item),
            Ok(Async::NotReady) => return Ok(Async::NotReady),
            Err(err) => Err(err),
        };

        // A failed send only means nobody is listening any more; ignore it.
        let tx = self.tx.take().unwrap();
        drop(tx.send(result));
        Ok(Async::Ready(()))
    }
}
impl<F: Future + fmt::Debug> fmt::Debug for Execute<F> {
    /// Formats as `Execute { future: .. }`; the channel and flag are omitted.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = f.debug_struct("Execute");
        repr.field("future", &self.future);
        repr.finish()
    }
}