use std::cell::RefCell;
use std::fmt::{self, Debug, Formatter};
use std::io;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use mio::{Events, Poll, PollOpt, Ready, Registration, SetReadiness};
use list::SourceList;
use sources::{EventSource, Idle, Source};
pub struct LoopHandle<Data> {
poll: Rc<Poll>,
list: Rc<RefCell<SourceList<Data>>>,
idles: Rc<RefCell<Vec<Rc<RefCell<Option<Box<FnMut(&mut Data)>>>>>>>,
}
impl<Data> Clone for LoopHandle<Data> {
fn clone(&self) -> LoopHandle<Data> {
LoopHandle {
poll: self.poll.clone(),
list: self.list.clone(),
idles: self.idles.clone(),
}
}
}
pub struct InsertError<E> {
pub source: E,
pub error: io::Error,
}
impl<E> Debug for InsertError<E> {
fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
write!(formatter, "{:?}", self.error)
}
}
impl<E> From<InsertError<E>> for io::Error {
fn from(e: InsertError<E>) -> io::Error {
e.error
}
}
impl<Data: 'static> LoopHandle<Data> {
pub fn insert_source<E: EventSource, F: FnMut(E::Event, &mut Data) + 'static>(
&self,
source: E,
callback: F,
) -> Result<Source<E>, InsertError<E>> {
let dispatcher = source.make_dispatcher(callback);
let token = self.list.borrow_mut().add_source(dispatcher);
let interest = source.interest();
let opt = source.pollopts();
if let Err(e) = self.poll.register(&source, token, interest, opt) {
return Err(InsertError { source, error: e });
}
Ok(Source {
source,
poll: self.poll.clone(),
list: self.list.clone(),
token,
})
}
pub fn insert_idle<F: FnOnce(&mut Data) + 'static>(&self, callback: F) -> Idle {
let mut opt_cb = Some(callback);
let callback = Rc::new(RefCell::new(Some(Box::new(move |data: &mut Data| {
if let Some(cb) = opt_cb.take() {
cb(data);
}
}) as Box<FnMut(&mut Data)>)));
self.idles.borrow_mut().push(callback.clone());
Idle { callback }
}
}
pub struct EventLoop<Data> {
handle: LoopHandle<Data>,
events_buffer: Events,
stop_signal: Arc<AtomicBool>,
wakeup: SetReadiness,
}
impl<Data: 'static> EventLoop<Data> {
pub fn new() -> io::Result<EventLoop<Data>> {
let handle = LoopHandle {
poll: Rc::new(Poll::new()?),
list: Rc::new(RefCell::new(SourceList::new())),
idles: Rc::new(RefCell::new(Vec::new())),
};
let (wakeup_registration, wakeup_readiness) = Registration::new2();
let mut wakeup_source = ::sources::generic::Generic::new(wakeup_registration);
wakeup_source.set_interest(Ready::readable());
wakeup_source.set_pollopts(PollOpt::edge());
let readiness2 = wakeup_readiness.clone();
handle.insert_source(wakeup_source, move |_, _| {
readiness2.set_readiness(Ready::empty()).unwrap();
})?;
Ok(EventLoop {
handle,
events_buffer: Events::with_capacity(32),
stop_signal: Arc::new(AtomicBool::new(false)),
wakeup: wakeup_readiness,
})
}
pub fn handle(&self) -> LoopHandle<Data> {
self.handle.clone()
}
fn dispatch_events(&mut self, timeout: Option<Duration>, data: &mut Data) -> io::Result<()> {
self.events_buffer.clear();
self.handle.poll.poll(&mut self.events_buffer, timeout)?;
loop {
if self.events_buffer.is_empty() {
break;
}
for event in &self.events_buffer {
let opt_dispatcher = self.handle.list.borrow().get_dispatcher(event.token());
if let Some(dispatcher) = opt_dispatcher {
dispatcher.borrow_mut().ready(event.readiness(), data);
}
}
self.events_buffer.clear();
self.handle
.poll
.poll(&mut self.events_buffer, Some(Duration::from_millis(0)))?;
}
Ok(())
}
fn dispatch_idles(&mut self, data: &mut Data) {
let idles = ::std::mem::replace(&mut *self.handle.idles.borrow_mut(), Vec::new());
for idle in idles {
if let Some(ref mut callback) = *idle.borrow_mut() {
callback(data);
}
}
}
pub fn dispatch(&mut self, timeout: Option<Duration>, data: &mut Data) -> io::Result<()> {
self.dispatch_events(timeout, data)?;
self.dispatch_idles(data);
Ok(())
}
pub fn get_signal(&self) -> LoopSignal {
LoopSignal {
signal: self.stop_signal.clone(),
wakeup: self.wakeup.clone(),
}
}
pub fn run<F>(
&mut self,
timeout: Option<Duration>,
data: &mut Data,
mut cb: F,
) -> io::Result<()>
where
F: FnMut(&mut Data),
{
self.stop_signal.store(false, Ordering::Release);
while !self.stop_signal.load(Ordering::Acquire) {
self.dispatch(timeout, data)?;
cb(data);
}
Ok(())
}
}
#[derive(Clone)]
pub struct LoopSignal {
signal: Arc<AtomicBool>,
wakeup: SetReadiness,
}
impl LoopSignal {
pub fn stop(&self) {
self.signal.store(true, Ordering::Release);
}
pub fn wakeup(&self) {
self.wakeup.set_readiness(Ready::readable()).unwrap();
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::EventLoop;
#[test]
fn dispatch_idle() {
let mut event_loop = EventLoop::new().unwrap();
let mut dispatched = false;
event_loop.handle().insert_idle(|d| {
*d = true;
});
event_loop
.dispatch(Some(Duration::from_millis(0)), &mut dispatched)
.unwrap();
assert!(dispatched);
}
#[test]
fn cancel_idle() {
let mut event_loop = EventLoop::new().unwrap();
let mut dispatched = false;
let idle = event_loop.handle().insert_idle(move |d| {
*d = true;
});
idle.cancel();
event_loop
.dispatch(Some(Duration::from_millis(0)), &mut dispatched)
.unwrap();
assert!(!dispatched);
}
#[test]
fn wakeup() {
let mut event_loop = EventLoop::new().unwrap();
let signal = event_loop.get_signal();
::std::thread::spawn(move || {
::std::thread::sleep(Duration::from_millis(500));
signal.wakeup();
});
event_loop.dispatch(None, &mut ()).unwrap();
}
#[test]
fn wakeup_stop() {
let mut event_loop = EventLoop::new().unwrap();
let signal = event_loop.get_signal();
::std::thread::spawn(move || {
::std::thread::sleep(Duration::from_millis(500));
signal.stop();
signal.wakeup();
});
event_loop.run(None, &mut (), |_| {}).unwrap();
}
}