[go: up one dir, main page]

futures-channel 0.3.3

Channels for asynchronous communication using futures-rs.
Documentation
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::thread;

/// Smoke test: a background thread feeds a bounded channel while the test
/// thread reads three items and then lets go of the receiver.
#[test]
fn smoke() {
    let (mut tx, rx) = mpsc::channel(1);

    // The producer keeps sending until a send reports an error.
    let producer = thread::spawn(move || loop {
        if block_on(tx.send(42)).is_err() {
            break;
        }
    });

    // `take` and `for_each` consume `rx`, and `block_on` consumes the
    // resulting future, so the receiver is gone once this completes. That has
    // to happen before the join below: the producer only stops sending after
    // the receiver has been dropped.
    let drain_three = rx.take(3).for_each(|_| futures::future::ready(()));
    block_on(drain_three);

    producer.join().unwrap()
}

/// Closing individual senders (by `disconnect`, by dropping, or by
/// `Sink::poll_close`) must leave the channel usable for the remaining
/// senders; only losing the final sender ends the stream. Checked for both
/// the bounded and the unbounded channel.
#[test]
fn multiple_senders_disconnect() {
    // Bounded channel.
    {
        let (mut first, mut receiver) = mpsc::channel(1);
        let second = first.clone();
        let mut third = first.clone();
        let mut last = first.clone();

        // Retire three of the four senders, each in a different way. Every one
        // of these should close only that sender, not the channel.
        first.disconnect();
        drop(second);
        block_on(third.close()).unwrap();

        assert!(first.is_closed());
        assert!(third.is_closed());
        assert!(!last.is_closed());

        // The surviving sender can still deliver a value.
        block_on(last.send(5)).unwrap();
        let delivered = block_on(receiver.next());
        assert_eq!(delivered, Some(5));

        // Once the final sender is dropped the channel itself closes.
        drop(last);
        let after_close = block_on(receiver.next());
        assert_eq!(after_close, None);
    }

    // Unbounded channel: same scenario.
    {
        let (mut first, mut receiver) = mpsc::unbounded();
        let second = first.clone();
        let mut third = first.clone();
        let mut last = first.clone();

        // Retire three of the four senders, each in a different way. Every one
        // of these should close only that sender, not the channel.
        first.disconnect();
        drop(second);
        block_on(third.close()).unwrap();

        assert!(first.is_closed());
        assert!(third.is_closed());
        assert!(!last.is_closed());

        // The surviving sender can still deliver a value.
        block_on(last.send(5)).unwrap();
        let delivered = block_on(receiver.next());
        assert_eq!(delivered, Some(5));

        // Once the final sender is dropped the channel itself closes.
        drop(last);
        let after_close = block_on(receiver.next());
        assert_eq!(after_close, None);
    }
}

/// `close_channel` on one sender must shut the whole channel down: every
/// clone reports closed, further sends fail as disconnected, and the receiver
/// sees the end of the stream. Checked for both the bounded and the unbounded
/// channel.
#[test]
fn multiple_senders_close_channel() {
    // Bounded channel.
    {
        let (mut closer, mut receiver) = mpsc::channel(1);
        let mut sibling = closer.clone();

        // Closing through one handle should affect all of them.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(sibling.is_closed());

        // Sending through the other handle must fail with a disconnect error.
        assert!(block_on(sibling.send(5)).unwrap_err().is_disconnected());

        let end_of_stream = block_on(receiver.next());
        assert_eq!(end_of_stream, None);
    }

    // Unbounded channel: same scenario.
    {
        let (closer, mut receiver) = mpsc::unbounded();
        let mut sibling = closer.clone();

        // Closing through one handle should affect all of them.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(sibling.is_closed());

        // Sending through the other handle must fail with a disconnect error.
        assert!(block_on(sibling.send(5)).unwrap_err().is_disconnected());

        let end_of_stream = block_on(receiver.next());
        assert_eq!(end_of_stream, None);
    }
}