use std::cell::RefCell;
use std::io;
use std::rc::Rc;
use std::sync::mpsc::TryRecvError;
use mio::{Evented, Poll, PollOpt, Ready, Token};
use mio_more::channel::{self as miochan, Receiver};
pub use mio_more::channel::{SendError, Sender, SyncSender, TrySendError};
use {EventDispatcher, EventSource};
pub enum Event<T> {
Msg(T),
Closed,
}
pub struct Channel<T> {
receiver: Rc<Receiver<T>>,
}
pub fn channel<T>() -> (Sender<T>, Channel<T>) {
let (sender, receiver) = miochan::channel();
(
sender,
Channel {
receiver: Rc::new(receiver),
},
)
}
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
let (sender, receiver) = miochan::sync_channel(bound);
(
sender,
Channel {
receiver: Rc::new(receiver),
},
)
}
impl<T> Evented for Channel<T> {
fn register(
&self,
poll: &Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
self.receiver.register(poll, token, interest, opts)
}
fn reregister(
&self,
poll: &Poll,
token: Token,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
self.receiver.reregister(poll, token, interest, opts)
}
fn deregister(&self, poll: &Poll) -> io::Result<()> {
self.receiver.deregister(poll)
}
}
impl<T: 'static> EventSource for Channel<T> {
type Event = Event<T>;
fn interest(&self) -> Ready {
Ready::readable()
}
fn pollopts(&self) -> PollOpt {
PollOpt::edge()
}
fn make_dispatcher<Data: 'static, F: FnMut(Event<T>, &mut Data) + 'static>(
&self,
callback: F,
) -> Rc<RefCell<EventDispatcher<Data>>> {
Rc::new(RefCell::new(Dispatcher {
_data: ::std::marker::PhantomData,
receiver: self.receiver.clone(),
callback,
}))
}
}
struct Dispatcher<Data, T, F: FnMut(Event<T>, &mut Data)> {
_data: ::std::marker::PhantomData<fn(&mut Data)>,
receiver: Rc<Receiver<T>>,
callback: F,
}
impl<Data, T, F: FnMut(Event<T>, &mut Data)> EventDispatcher<Data> for Dispatcher<Data, T, F> {
fn ready(&mut self, _: Ready, data: &mut Data) {
loop {
match self.receiver.try_recv() {
Ok(val) => (self.callback)(Event::Msg(val), data),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
(self.callback)(Event::Closed, data);
break;
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic_channel() {
let mut event_loop = ::EventLoop::new().unwrap();
let handle = event_loop.handle();
let (tx, rx) = channel::<()>();
let mut got = (false, false);
let _source = handle
.insert_source(rx, move |evt, got: &mut (bool, bool)| match evt {
Event::Msg(()) => {
got.0 = true;
}
Event::Closed => {
got.1 = true;
}
}).unwrap();
event_loop
.dispatch(Some(::std::time::Duration::from_millis(0)), &mut got)
.unwrap();
assert_eq!(got, (false, false));
tx.send(()).unwrap();
event_loop
.dispatch(Some(::std::time::Duration::from_millis(0)), &mut got)
.unwrap();
assert_eq!(got, (true, false));
::std::mem::drop(tx);
event_loop
.dispatch(Some(::std::time::Duration::from_millis(0)), &mut got)
.unwrap();
assert_eq!(got, (true, true));
}
}