use std::cell::RefCell;
use std::cmp;
use std::fmt;
use std::io::{self, ErrorKind};
use std::mem;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};
use futures::{Future, IntoFuture, Async};
use futures::future;
use futures::executor::{self, Spawn, Unpark};
use futures::sync::mpsc;
use futures::task::Task;
use mio;
use mio::event::Evented;
use slab::Slab;
use heap::{Heap, Slot};
mod io_token;
mod timeout_token;
mod poll_evented;
mod timeout;
mod interval;
pub use self::poll_evented::PollEvented;
pub use self::timeout::Timeout;
pub use self::interval::Interval;
// Process-wide counter; `Core::new` does a `fetch_add(1, ..)` on it and stores
// the previous value as the new loop's `Inner::id`.
static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
// Scoped thread-local reference to a `Core`. In this file it is set around
// polling futures, draining the message queue and unparking tasks, and read by
// `Remote::with_loop` / `Remote::handle` to detect "am I running on my own loop?".
scoped_thread_local!(static CURRENT_LOOP: Core);
/// Event loop state: the mio event buffer, the message channel used by
/// `Remote`, and the shared `Inner` that handles refer to. Driven by `run` and
/// `turn`.
pub struct Core {
// Buffer handed to `mio::Poll::poll` on every call to `Core::poll`.
events: mio::Events,
// Sending half of the message channel; cloned into every `Remote`.
tx: mpsc::UnboundedSender<Message>,
// Receiving half of the message channel, drained by `consume_queue`.
rx: RefCell<Spawn<mpsc::UnboundedReceiver<Message>>>,
// Registration bound to `TOKEN_MESSAGES` in `Core::new`; never read again
// (presumably held only to keep the registration alive).
_rx_registration: mio::Registration,
// Readiness setter paired with `_rx_registration`; passed as the unpark
// handle when polling `rx` and cleared in `poll` when the token fires.
rx_readiness: Arc<MySetReadiness>,
// Shared loop state; `Handle` keeps a `Weak` reference to it.
inner: Rc<RefCell<Inner>>,
// Registration bound to `TOKEN_FUTURE` in `Core::new`; never read again
// (presumably held only to keep the registration alive).
_future_registration: mio::Registration,
// Readiness setter paired with `_future_registration`; passed as the unpark
// handle for the future driven by `run`.
future_readiness: Arc<MySetReadiness>,
}
/// Loop state shared between `Core` (strong `Rc`) and `Handle` (`Weak`).
struct Inner {
// Identifier taken from `NEXT_LOOP_ID` when the loop was created.
id: usize,
// The mio poller every source and registration is registered with.
io: mio::Poll,
// I/O sources, indexed by slab index; mio token is `TOKEN_START + index * 2`.
io_dispatch: Slab<ScheduledIo>,
// Spawned futures, indexed by slab index; mio token is
// `TOKEN_START + index * 2 + 1`.
task_dispatch: Slab<ScheduledTask>,
// Pending timers as `(deadline, index into timeouts)`.
timer_heap: Heap<(Instant, usize)>,
// Per-timeout state; the `Option<Slot>` is the timer's heap slot and is
// taken (set to `None`) when the timer fires or is reset.
timeouts: Slab<(Option<Slot>, TimeoutState)>,
}
/// Opaque identifier of an event loop, wrapping the loop's numeric `id` as
/// returned by `Core::id`, `Remote::id` and `Handle::id`.
#[derive(Clone,Copy,Eq,PartialEq,Hash,Debug)]
pub struct CoreId(usize);
/// Cloneable reference to an event loop that talks to it by sending
/// `Message`s over a channel (or directly, when already running on that loop;
/// see `Remote::send`).
#[derive(Clone)]
pub struct Remote {
// `id` of the loop this remote was created from.
id: usize,
// Clone of the loop's message sender.
tx: mpsc::UnboundedSender<Message>,
}
/// Cloneable reference to an event loop that also holds a weak pointer to the
/// loop's `Inner`, allowing futures to be spawned without going through the
/// message channel.
#[derive(Clone)]
pub struct Handle {
// Channel-based access to the same loop.
remote: Remote,
// Weak link to the loop state; `spawn` does nothing once it can no longer
// be upgraded.
inner: Weak<RefCell<Inner>>,
}
/// Bookkeeping for one registered I/O source.
struct ScheduledIo {
// Bit set of `Readiness` values, OR-ed in by `dispatch_io`; a clone of this
// `Arc` is returned from `Inner::add_source`.
readiness: Arc<AtomicUsize>,
// Task to unpark when the source becomes readable (or hangs up).
reader: Option<Task>,
// Task to unpark when the source becomes writable.
writer: Option<Task>,
}
/// Bookkeeping for one future spawned onto the loop.
struct ScheduledTask {
// Registration created in `Inner::spawn` for this task's token; never read
// again (presumably held only to keep the registration alive).
_registration: mio::Registration,
// The spawned future; temporarily `None` while `dispatch_task` is polling it.
spawn: Option<Spawn<Box<Future<Item=(), Error=()>>>>,
// Unpark handle that marks this task's registration readable.
wake: Arc<MySetReadiness>,
}
/// State of a single timeout slot.
enum TimeoutState {
// The timer has not fired and no task is waiting on it.
NotFired,
// The timer has fired.
Fired,
// The timer has not fired and this task asked to be notified when it does.
Waiting(Task),
}
/// Which side of an I/O source a task is waiting on; selects between
/// `ScheduledIo::reader` and `ScheduledIo::writer` in `Inner::schedule`.
enum Direction {
Read,
Write,
}
/// Requests sent to the loop; each variant is handled in `Core::notify`.
enum Message {
// Remove the I/O source with this slab index (`Inner::drop_source`).
DropSource(usize),
// Park a task on the given direction of an I/O source (`Inner::schedule`).
Schedule(usize, Task, Direction),
// Register a task to be woken when a timeout fires (`Inner::update_timeout`).
UpdateTimeout(usize, Task),
// Re-arm a timeout for a new deadline (`Inner::reset_timeout`).
ResetTimeout(usize, Instant),
// Remove a timeout (`Inner::cancel_timeout`).
CancelTimeout(usize),
// Run an arbitrary closure against the `Core`.
Run(Box<FnBox>),
}
/// Bit flags stored in `ScheduledIo::readiness`; the discriminants are cast to
/// `usize` and OR-ed / AND-ed directly, so they must stay distinct bits.
#[repr(usize)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum Readiness {
Readable = 1,
Writable = 2,
}
// Token of the registration that signals pending messages on the channel.
const TOKEN_MESSAGES: mio::Token = mio::Token(0);
// Token of the registration that signals the future driven by `Core::run`.
const TOKEN_FUTURE: mio::Token = mio::Token(1);
// First dynamically assigned token. Relative to this base, even offsets are
// I/O sources and odd offsets are spawned tasks (see `Core::dispatch`).
const TOKEN_START: usize = 2;
impl Core {
/// Creates a new event loop.
///
/// Sets up the `mio::Poll` instance, registers the two internal wake-up
/// registrations (`TOKEN_FUTURE` and `TOKEN_MESSAGES`) and creates the
/// message channel. Returns any error reported by `mio::Poll::new` or by
/// either registration.
pub fn new() -> io::Result<Core> {
    let poll = try!(mio::Poll::new());

    // Wake-up pair for the future driven by `run`.
    let future_pair = mio::Registration::new2();
    try!(poll.register(&future_pair.0,
                       TOKEN_FUTURE,
                       mio::Ready::readable(),
                       mio::PollOpt::level()));

    // Message channel plus the wake-up pair announcing pending messages.
    let (sender, receiver) = mpsc::unbounded();
    let message_pair = mio::Registration::new2();
    try!(poll.register(&message_pair.0,
                       TOKEN_MESSAGES,
                       mio::Ready::readable(),
                       mio::PollOpt::level()));
    let message_waker = Arc::new(MySetReadiness(message_pair.1));
    // Mark the channel readable up front so the first `poll` reports
    // `TOKEN_MESSAGES` and drains the queue.
    message_waker.unpark();

    let future_waker = Arc::new(MySetReadiness(future_pair.1));
    let state = Inner {
        id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
        io: poll,
        io_dispatch: Slab::with_capacity(1),
        task_dispatch: Slab::with_capacity(1),
        timeouts: Slab::with_capacity(1),
        timer_heap: Heap::new(),
    };

    Ok(Core {
        events: mio::Events::with_capacity(1024),
        tx: sender,
        rx: RefCell::new(executor::spawn(receiver)),
        _rx_registration: message_pair.0,
        rx_readiness: message_waker,
        _future_registration: future_pair.0,
        future_readiness: future_waker,
        inner: Rc::new(RefCell::new(state)),
    })
}
/// Returns a `Handle` to this loop, made of a fresh `Remote` and a weak
/// reference to the loop's shared state.
pub fn handle(&self) -> Handle {
    let remote = self.remote();
    let weak_inner = Rc::downgrade(&self.inner);
    Handle {
        remote: remote,
        inner: weak_inner,
    }
}
/// Returns a `Remote` carrying this loop's id and a clone of its message
/// sender.
pub fn remote(&self) -> Remote {
    let id = self.inner.borrow().id;
    let sender = self.tx.clone();
    Remote {
        id: id,
        tx: sender,
    }
}
pub fn run<F>(&mut self, f: F) -> Result<F::Item, F::Error>
where F: Future,
{
let mut task = executor::spawn(f);
let ready = self.future_readiness.clone();
let mut future_fired = true;
loop {
if future_fired {
let res = try!(CURRENT_LOOP.set(self, || {
task.poll_future(ready.clone())
}));
if let Async::Ready(e) = res {
return Ok(e)
}
}
future_fired = self.poll(None);
}
}
/// Performs a single iteration of the event loop, blocking for at most
/// `max_wait` (or until the next timer / event when `None`).
pub fn turn(&mut self, max_wait: Option<Duration>) {
    // Whether the `run` future was woken is of no interest here.
    let _ = self.poll(max_wait);
}
fn poll(&mut self, max_wait: Option<Duration>) -> bool {
let start = Instant::now();
let timeout = self.inner.borrow_mut().timer_heap.peek().map(|t| {
if t.0 < start {
Duration::new(0, 0)
} else {
t.0 - start
}
});
let timeout = match (max_wait, timeout) {
(Some(d1), Some(d2)) => Some(cmp::min(d1, d2)),
(max_wait, timeout) => max_wait.or(timeout),
};
let amt = match self.inner.borrow_mut().io.poll(&mut self.events, timeout) {
Ok(a) => a,
Err(ref e) if e.kind() == ErrorKind::Interrupted => return false,
Err(e) => panic!("error in poll: {}", e),
};
let after_poll = Instant::now();
debug!("loop poll - {:?}", after_poll - start);
debug!("loop time - {:?}", after_poll);
self.consume_timeouts(after_poll);
let mut fired = false;
for i in 0..self.events.len() {
let event = self.events.get(i).unwrap();
let token = event.token();
trace!("event {:?} {:?}", event.readiness(), event.token());
if token == TOKEN_MESSAGES {
self.rx_readiness.0.set_readiness(mio::Ready::empty()).unwrap();
CURRENT_LOOP.set(&self, || self.consume_queue());
} else if token == TOKEN_FUTURE {
self.future_readiness.0.set_readiness(mio::Ready::empty()).unwrap();
fired = true;
} else {
self.dispatch(token, event.readiness());
}
}
debug!("loop process - {} events, {:?}", amt, after_poll.elapsed());
return fired
}
/// Routes an event for a dynamically assigned token.
///
/// Tokens are laid out from `TOKEN_START`: even offsets belong to I/O
/// sources, odd offsets to spawned tasks; the slab index is the offset
/// divided by two.
fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
    let offset = usize::from(token) - TOKEN_START;
    let index = offset / 2;
    match offset % 2 {
        0 => self.dispatch_io(index, ready),
        _ => self.dispatch_task(index),
    }
}
/// Records readiness for the I/O source at slab index `token` and wakes
/// the tasks parked on it.
///
/// Readable (or hang-up) events take the parked reader and set the
/// `Readable` bit; writable events take the parked writer and set the
/// `Writable` bit. Unknown indices are ignored. Tasks are unparked only
/// after the borrow of the loop state has been released.
fn dispatch_io(&mut self, token: usize, ready: mio::Ready) {
    let read_event = ready.is_readable() || platform::is_hup(&ready);
    let write_event = ready.is_writable();

    let mut read_task = None;
    let mut write_task = None;
    let mut inner = self.inner.borrow_mut();
    if let Some(io) = inner.io_dispatch.get_mut(token) {
        if read_event {
            read_task = io.reader.take();
            io.readiness.fetch_or(Readiness::Readable as usize,
                                  Ordering::Relaxed);
        }
        if write_event {
            write_task = io.writer.take();
            io.readiness.fetch_or(Readiness::Writable as usize,
                                  Ordering::Relaxed);
        }
    }
    drop(inner);

    // Reader first, then writer.
    for task in read_task.into_iter().chain(write_task) {
        self.notify_handle(task);
    }
}
// Polls the spawned future stored at slab index `token`, then either puts it
// back (still pending) or removes its slot (finished or failed).
//
// The statement order below is significant: the `RefCell` borrow of the loop
// state is released while the future is polled and re-acquired afterwards.
fn dispatch_task(&mut self, token: usize) {
let mut inner = self.inner.borrow_mut();
// Take the future out of its slot (leaving `None` behind) together with a
// clone of its wake handle; an unknown index is ignored.
let (task, wake) = match inner.task_dispatch.get_mut(token) {
Some(slot) => (slot.spawn.take(), slot.wake.clone()),
None => return,
};
// Clear the task's readiness before polling it.
wake.0.set_readiness(mio::Ready::empty()).unwrap();
// Nothing to do if the slot currently holds no future.
let mut task = match task {
Some(task) => task,
None => return,
};
// Release the borrow before polling (presumably so the polled future can
// itself borrow the loop state, e.g. through `Handle::spawn`).
drop(inner);
let res = CURRENT_LOOP.set(self, || task.poll_future(wake));
// Declared here and assigned below so that a removed slot is dropped at the
// end of the function, i.e. after the explicit `drop(inner)`.
let _task_to_drop;
inner = self.inner.borrow_mut();
match res {
Ok(Async::NotReady) => {
// Still pending: the slot must still be empty, so put the future back.
assert!(inner.task_dispatch[token].spawn.is_none());
inner.task_dispatch[token].spawn = Some(task);
}
Ok(Async::Ready(())) |
Err(()) => {
// Finished (successfully or not): free the slot.
_task_to_drop = inner.task_dispatch.remove(token).unwrap();
}
}
drop(inner);
}
/// Fires every timer whose deadline is at or before `now`.
///
/// Each expired timer is popped from the heap, its heap slot is cleared
/// and its state is switched to fired. A task waiting on it is unparked
/// after the borrow of the loop state has been released.
fn consume_timeouts(&mut self, now: Instant) {
    loop {
        let mut inner = self.inner.borrow_mut();

        // Stop at the first timer that is not yet due (or when none remain).
        let due = inner.timer_heap.peek().map_or(false, |head| head.0 <= now);
        if !due {
            break
        }

        let (_, slab_idx) = inner.timer_heap.pop().unwrap();
        trace!("firing timeout: {}", slab_idx);

        let waiter = {
            let entry = &mut inner.timeouts[slab_idx];
            // The timer was in the heap, so it must have had a slot.
            entry.0.take().unwrap();
            entry.1.fire()
        };
        drop(inner);

        if let Some(task) = waiter {
            self.notify_handle(task);
        }
    }
}
/// Unparks `handle` with this loop installed as `CURRENT_LOOP`.
fn notify_handle(&self, handle: Task) {
    debug!("notifying a task handle");
    CURRENT_LOOP.set(self, || {
        handle.unpark()
    });
}
/// Processes every message currently available on the channel, stopping
/// when the receiver is not ready or the stream has ended.
fn consume_queue(&self) {
    debug!("consuming notification queue");
    let waker = self.rx_readiness.clone();
    loop {
        // The receiver borrow is confined to this statement so that it is
        // released before `notify` runs.
        let polled = self.rx.borrow_mut().poll_stream(waker.clone()).unwrap();
        if let Async::Ready(Some(msg)) = polled {
            self.notify(msg);
        } else {
            break;
        }
    }
}
/// Applies a single `Message` to the loop.
///
/// Scheduling and timeout-update requests may hand back a task that is
/// already due; it is unparked once the loop state is no longer borrowed.
fn notify(&self, msg: Message) {
    let to_wake = match msg {
        Message::Schedule(tok, wake, dir) => {
            let task = self.inner.borrow_mut().schedule(tok, wake, dir);
            task
        }
        Message::UpdateTimeout(t, handle) => {
            let task = self.inner.borrow_mut().update_timeout(t, handle);
            task
        }
        Message::DropSource(tok) => {
            self.inner.borrow_mut().drop_source(tok);
            None
        }
        Message::ResetTimeout(t, at) => {
            self.inner.borrow_mut().reset_timeout(t, at);
            None
        }
        Message::CancelTimeout(t) => {
            self.inner.borrow_mut().cancel_timeout(t);
            None
        }
        Message::Run(r) => {
            r.call_box(self);
            None
        }
    };

    if let Some(task) = to_wake {
        self.notify_handle(task);
    }
}
/// Returns the identifier of this event loop.
pub fn id(&self) -> CoreId {
    let raw = self.inner.borrow().id;
    CoreId(raw)
}
}
/// Formats a `Core` as `Core { id: .. }`.
impl fmt::Debug for Core {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.id();
        let mut out = f.debug_struct("Core");
        out.field("id", &id);
        out.finish()
    }
}
impl Inner {
fn add_source(&mut self, source: &Evented)
-> io::Result<(Arc<AtomicUsize>, usize)> {
debug!("adding a new I/O source");
let sched = ScheduledIo {
readiness: Arc::new(AtomicUsize::new(0)),
reader: None,
writer: None,
};
if self.io_dispatch.vacant_entry().is_none() {
let amt = self.io_dispatch.len();
self.io_dispatch.reserve_exact(amt);
}
let entry = self.io_dispatch.vacant_entry().unwrap();
try!(self.io.register(source,
mio::Token(TOKEN_START + entry.index() * 2),
mio::Ready::readable() |
mio::Ready::writable() |
platform::hup(),
mio::PollOpt::edge()));
Ok((sched.readiness.clone(), entry.insert(sched).index()))
}
/// Deregisters `source` from the poller, forwarding the poller's result.
fn deregister_source(&mut self, source: &Evented) -> io::Result<()> {
    let outcome = self.io.deregister(source);
    outcome
}
/// Frees the dispatch slot of the I/O source at slab index `token`.
///
/// # Panics
///
/// Panics if no source is stored at `token`.
fn drop_source(&mut self, token: usize) {
    debug!("dropping I/O source: {}", token);
    let removed = self.io_dispatch.remove(token);
    removed.unwrap();
}
/// Parks `wake` on the `dir` side of the I/O source at `token`.
///
/// If the matching readiness bit is already set, the parked slot is
/// cleared and `wake` is returned so the caller can unpark it right away;
/// otherwise `wake` is stored and `None` is returned.
///
/// # Panics
///
/// Panics if no source is stored at `token`.
fn schedule(&mut self, token: usize, wake: Task, dir: Direction)
            -> Option<Task> {
    debug!("scheduling direction for: {}", token);
    let sched = self.io_dispatch.get_mut(token).unwrap();

    let bit = match dir {
        Direction::Read => Readiness::Readable as usize,
        Direction::Write => Readiness::Writable as usize,
    };
    let already_ready = sched.readiness.load(Ordering::SeqCst) & bit != 0;

    let slot = match dir {
        Direction::Read => &mut sched.reader,
        Direction::Write => &mut sched.writer,
    };
    let (parked, immediate) = if already_ready {
        (None, Some(wake))
    } else {
        (Some(wake), None)
    };
    *slot = parked;
    immediate
}
/// Creates a timeout that expires at `at` and returns its slab index.
fn add_timeout(&mut self, at: Instant) -> usize {
    // Grow the slab by its current length when it is full.
    if self.timeouts.vacant_entry().is_none() {
        let grow_by = self.timeouts.len();
        self.timeouts.reserve_exact(grow_by);
    }
    let vacant = self.timeouts.vacant_entry().unwrap();
    let heap_slot = self.timer_heap.push((at, vacant.index()));
    let index = vacant.insert((Some(heap_slot), TimeoutState::NotFired)).index();
    debug!("added a timeout: {}", index);
    index
}
/// Registers `handle` as the task waiting on the timeout at `token`.
///
/// Returns `handle` back if the timeout has already fired, `None`
/// otherwise. Indexing panics if `token` is not a live timeout.
fn update_timeout(&mut self, token: usize, handle: Task) -> Option<Task> {
    debug!("updating a timeout: {}", token);
    let state = &mut self.timeouts[token].1;
    state.block(handle)
}
/// Re-arms the timeout at `token` for the new deadline `at`.
///
/// Any existing heap entry is removed, a new one is pushed and the state
/// is reset to not-fired. Indexing panics if `token` is not a live
/// timeout.
fn reset_timeout(&mut self, token: usize, at: Instant) {
    let pair = &mut self.timeouts[token];
    match pair.0.take() {
        Some(old_slot) => {
            self.timer_heap.remove(old_slot);
        }
        None => {}
    }
    let new_slot = self.timer_heap.push((at, token));
    *pair = (Some(new_slot), TimeoutState::NotFired);
    debug!("set a timeout: {}", token);
}
/// Removes the timeout at `token`, also removing its heap entry if it is
/// still pending. An unknown `token` is ignored.
fn cancel_timeout(&mut self, token: usize) {
    debug!("cancel a timeout: {}", token);
    match self.timeouts.remove(token) {
        Some((Some(slot), _state)) => {
            self.timer_heap.remove(slot);
        }
        _ => {}
    }
}
/// Stores `future` as a new task and marks it ready so that the loop
/// polls it on a following iteration.
///
/// The task gets its own level-triggered registration under the token
/// `TOKEN_START + 2 * index + 1`.
///
/// # Panics
///
/// Panics if registering the task with the poller fails.
fn spawn(&mut self, future: Box<Future<Item=(), Error=()>>) {
    // Grow the slab by its current length when it is full.
    if self.task_dispatch.vacant_entry().is_none() {
        let grow_by = self.task_dispatch.len();
        self.task_dispatch.reserve_exact(grow_by);
    }
    let vacant = self.task_dispatch.vacant_entry().unwrap();
    let token = mio::Token(TOKEN_START + 2 * vacant.index() + 1);

    let pair = mio::Registration::new2();
    self.io.register(&pair.0,
                     token,
                     mio::Ready::readable(),
                     mio::PollOpt::level())
        .expect("cannot fail future registration with mio");

    let waker = Arc::new(MySetReadiness(pair.1));
    let first_wake = waker.clone();
    vacant.insert(ScheduledTask {
        spawn: Some(executor::spawn(future)),
        wake: waker,
        _registration: pair.0,
    });
    // Flag the freshly stored task as ready.
    first_wake.unpark();
}
}
impl Remote {
/// Delivers `msg` to the loop.
///
/// When called on the loop's own thread while it is running, already
/// queued messages are drained first and `msg` is then handled
/// immediately. Otherwise `msg` is pushed onto the channel; a send failure
/// is ignored and the message dropped.
fn send(&self, msg: Message) {
    self.with_loop(|lp| {
        if let Some(lp) = lp {
            // Drain the queue first, then handle this message directly.
            lp.consume_queue();
            lp.notify(msg);
        } else {
            // Best effort: a failed send is discarded.
            let _ = mpsc::UnboundedSender::send(&self.tx, msg);
        }
    })
}
/// Calls `f` with `Some(core)` when `CURRENT_LOOP` is set to the loop this
/// remote belongs to, and with `None` in every other case.
fn with_loop<F, R>(&self, f: F) -> R
    where F: FnOnce(Option<&Core>) -> R
{
    if !CURRENT_LOOP.is_set() {
        return f(None)
    }
    CURRENT_LOOP.with(|lp| {
        // Compare ids in its own statement so the borrow ends before `f` runs.
        let same = lp.inner.borrow().id == self.id;
        let current = if same { Some(lp) } else { None };
        f(current)
    })
}
/// Asks the loop to call `f` with a `Handle` and spawn the future it
/// returns.
///
/// The request is delivered through `send`, so it either runs immediately
/// (when already on the loop) or is queued on the message channel.
pub fn spawn<F, R>(&self, f: F)
    where F: FnOnce(&Handle) -> R + Send + 'static,
          R: IntoFuture<Item=(), Error=()>,
          R::Future: 'static,
{
    let job = |lp: &Core| {
        let produced = f(&lp.handle());
        let future = produced.into_future();
        lp.inner.borrow_mut().spawn(Box::new(future));
    };
    self.send(Message::Run(Box::new(job)));
}
/// Returns the identifier of the loop this remote refers to.
pub fn id(&self) -> CoreId {
    let raw = self.id;
    CoreId(raw)
}
pub fn handle(&self) -> Option<Handle> {
if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(|lp| {
let same = lp.inner.borrow().id == self.id;
if same {
Some(lp.handle())
} else {
None
}
})
} else {
None
}
}
}
/// Formats a `Remote` as `Remote { id: .. }`.
impl fmt::Debug for Remote {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.id();
        let mut out = f.debug_struct("Remote");
        out.field("id", &id);
        out.finish()
    }
}
impl Handle {
pub fn remote(&self) -> &Remote {
&self.remote
}
pub fn spawn<F>(&self, f: F)
where F: Future<Item=(), Error=()> + 'static,
{
let inner = match self.inner.upgrade() {
Some(inner) => inner,
None => return,
};
inner.borrow_mut().spawn(Box::new(f));
}
pub fn spawn_fn<F, R>(&self, f: F)
where F: FnOnce() -> R + 'static,
R: IntoFuture<Item=(), Error=()> + 'static,
{
self.spawn(future::lazy(f))
}
pub fn id(&self) -> CoreId {
self.remote.id()
}
}
/// Formats a `Handle` as `Handle { id: .. }`.
impl fmt::Debug for Handle {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.id();
        let mut out = f.debug_struct("Handle");
        out.field("id", &id);
        out.finish()
    }
}
impl TimeoutState {
    /// Registers `handle` as the waiter for this timeout.
    ///
    /// If the timeout has already fired, the state is left untouched and
    /// `handle` is returned so the caller can wake it immediately.
    /// Otherwise the state becomes `Waiting(handle)` (replacing any
    /// previous waiter) and `None` is returned.
    fn block(&mut self, handle: Task) -> Option<Task> {
        if let TimeoutState::Fired = *self {
            return Some(handle)
        }
        *self = TimeoutState::Waiting(handle);
        None
    }

    /// Marks the timeout as fired and returns the waiting task, if any.
    ///
    /// # Panics
    ///
    /// Panics if the timeout was already in the `Fired` state.
    fn fire(&mut self) -> Option<Task> {
        let previous = mem::replace(self, TimeoutState::Fired);
        match previous {
            TimeoutState::Waiting(handle) => Some(handle),
            TimeoutState::NotFired => None,
            TimeoutState::Fired => panic!("fired twice?"),
        }
    }
}
// Newtype around `mio::SetReadiness` so that `Unpark` can be implemented for
// it below.
struct MySetReadiness(mio::SetReadiness);
/// Unparking marks the wrapped registration as readable.
impl Unpark for MySetReadiness {
    /// # Panics
    ///
    /// Panics if the readiness cannot be set.
    fn unpark(&self) {
        let outcome = self.0.set_readiness(mio::Ready::readable());
        outcome.expect("failed to set readiness");
    }
}
// Object-safe stand-in for a boxed one-shot closure taking `&Core`; used as
// the payload of `Message::Run`.
trait FnBox: Send + 'static {
// Consumes the box and invokes the closure with `lp`.
fn call_box(self: Box<Self>, lp: &Core);
}
/// Every sendable, `'static` one-shot closure over `&Core` is an `FnBox`.
impl<F: FnOnce(&Core) + Send + 'static> FnBox for F {
    fn call_box(self: Box<Self>, lp: &Core) {
        // Move the closure out of its box, then call it.
        let callback = *self;
        callback(lp)
    }
}
/// Unix flavour of the hang-up helpers, backed by `mio::unix::UnixReady`.
#[cfg(unix)]
mod platform {
    use mio::Ready;
    use mio::unix::UnixReady;

    /// Returns `true` if `event` carries the hang-up flag.
    pub fn is_hup(event: &Ready) -> bool {
        let unix_ready = UnixReady::from(*event);
        unix_ready.is_hup()
    }

    /// Returns the readiness value representing a hang-up.
    pub fn hup() -> Ready {
        let hup = UnixReady::hup();
        hup.into()
    }
}
// Windows flavour of the hang-up helpers: hang-up is never reported and the
// corresponding interest is the empty readiness set.
#[cfg(windows)]
mod platform {
use mio::Ready;
// Always `false` on Windows.
pub fn is_hup(_event: &Ready) -> bool {
false
}
// Always the empty readiness set on Windows.
pub fn hup() -> Ready {
Ready::empty()
}
}