[go: up one dir, main page]

calloop 0.3.1

A callback-based event loop
Documentation
//! An MPSC channel whose receiving end is an event source
//!
//! Create a channel using `Channel::<T>::new()`, which returns a
//! `Sender<T>` that can be cloned and sent accross threads if `T: Send`,
//! and a `Channel<T>` that can be inserted into an `EventLoop`. It will generate
//! one event per message.
//!
//! This implementation is based on
//! [`mio_more::channel`](https://docs.rs/mio-more/*/mio_more/channel/index.html).

use std::cell::RefCell;
use std::io;
use std::rc::Rc;
use std::sync::mpsc::TryRecvError;

use mio::{Evented, Poll, PollOpt, Ready, Token};

use mio_more::channel::{self as miochan, Receiver};
pub use mio_more::channel::{SendError, Sender, SyncSender, TrySendError};

use {EventDispatcher, EventSource};

/// The events generated by the channel event source
pub enum Event<T> {
    /// A message was received and is bundled here
    Msg(T),
    /// The channel was closed
    ///
    /// This means all the `Sender`s associated with this channel
    /// have been dropped, no more messages will ever be received.
    Closed,
}

/// The receiving end of the channel
///
/// This is the event source to be inserted into your `EventLoop`.
pub struct Channel<T> {
    receiver: Rc<Receiver<T>>,
}

/// Create a new asynchronous channel
pub fn channel<T>() -> (Sender<T>, Channel<T>) {
    let (sender, receiver) = miochan::channel();
    (
        sender,
        Channel {
            receiver: Rc::new(receiver),
        },
    )
}

/// Create a new synchronous, bounded channel
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
    let (sender, receiver) = miochan::sync_channel(bound);
    (
        sender,
        Channel {
            receiver: Rc::new(receiver),
        },
    )
}

impl<T> Evented for Channel<T> {
    fn register(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        self.receiver.register(poll, token, interest, opts)
    }

    fn reregister(
        &self,
        poll: &Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> io::Result<()> {
        self.receiver.reregister(poll, token, interest, opts)
    }

    fn deregister(&self, poll: &Poll) -> io::Result<()> {
        self.receiver.deregister(poll)
    }
}

impl<T: 'static> EventSource for Channel<T> {
    type Event = Event<T>;

    fn interest(&self) -> Ready {
        Ready::readable()
    }

    fn pollopts(&self) -> PollOpt {
        PollOpt::edge()
    }

    fn make_dispatcher<Data: 'static, F: FnMut(Event<T>, &mut Data) + 'static>(
        &self,
        callback: F,
    ) -> Rc<RefCell<EventDispatcher<Data>>> {
        Rc::new(RefCell::new(Dispatcher {
            _data: ::std::marker::PhantomData,
            receiver: self.receiver.clone(),
            callback,
        }))
    }
}

struct Dispatcher<Data, T, F: FnMut(Event<T>, &mut Data)> {
    _data: ::std::marker::PhantomData<fn(&mut Data)>,
    receiver: Rc<Receiver<T>>,
    callback: F,
}

impl<Data, T, F: FnMut(Event<T>, &mut Data)> EventDispatcher<Data> for Dispatcher<Data, T, F> {
    fn ready(&mut self, _: Ready, data: &mut Data) {
        loop {
            match self.receiver.try_recv() {
                Ok(val) => (self.callback)(Event::Msg(val), data),
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => {
                    (self.callback)(Event::Closed, data);
                    break;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn basic_channel() {
        let mut event_loop = ::EventLoop::new().unwrap();

        let handle = event_loop.handle();

        let (tx, rx) = channel::<()>();

        // (got_msg, got_closed)
        let mut got = (false, false);

        let _source = handle
            .insert_source(rx, move |evt, got: &mut (bool, bool)| match evt {
                Event::Msg(()) => {
                    got.0 = true;
                }
                Event::Closed => {
                    got.1 = true;
                }
            }).unwrap();

        // nothing is sent, nothing is received
        event_loop
            .dispatch(Some(::std::time::Duration::from_millis(0)), &mut got)
            .unwrap();

        assert_eq!(got, (false, false));

        // a message is send
        tx.send(()).unwrap();
        event_loop
            .dispatch(Some(::std::time::Duration::from_millis(0)), &mut got)
            .unwrap();

        assert_eq!(got, (true, false));

        // the sender is dropped
        ::std::mem::drop(tx);
        event_loop
            .dispatch(Some(::std::time::Duration::from_millis(0)), &mut got)
            .unwrap();

        assert_eq!(got, (true, true));
    }
}