use futures::channel::mpsc;
use futures::executor::block_on;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::thread;
#[test]
fn smoke() {
let (mut sender, receiver) = mpsc::channel(1);
let t = thread::spawn(move || {
while let Ok(()) = block_on(sender.send(42)) {}
});
block_on(receiver.take(3).for_each(|_| futures::future::ready(())));
t.join().unwrap()
}
#[test]
fn multiple_senders_disconnect() {
{
let (mut tx1, mut rx) = mpsc::channel(1);
let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone());
tx1.disconnect();
drop(tx2);
block_on(tx3.close()).unwrap();
assert!(tx1.is_closed());
assert!(tx3.is_closed());
assert!(!tx4.is_closed());
block_on(tx4.send(5)).unwrap();
assert_eq!(block_on(rx.next()), Some(5));
drop(tx4);
assert_eq!(block_on(rx.next()), None);
}
{
let (mut tx1, mut rx) = mpsc::unbounded();
let (tx2, mut tx3, mut tx4) = (tx1.clone(), tx1.clone(), tx1.clone());
tx1.disconnect();
drop(tx2);
block_on(tx3.close()).unwrap();
assert!(tx1.is_closed());
assert!(tx3.is_closed());
assert!(!tx4.is_closed());
block_on(tx4.send(5)).unwrap();
assert_eq!(block_on(rx.next()), Some(5));
drop(tx4);
assert_eq!(block_on(rx.next()), None);
}
}
#[test]
fn multiple_senders_close_channel() {
{
let (mut tx1, mut rx) = mpsc::channel(1);
let mut tx2 = tx1.clone();
tx1.close_channel();
assert!(tx1.is_closed());
assert!(tx2.is_closed());
let err = block_on(tx2.send(5)).unwrap_err();
assert!(err.is_disconnected());
assert_eq!(block_on(rx.next()), None);
}
{
let (tx1, mut rx) = mpsc::unbounded();
let mut tx2 = tx1.clone();
tx1.close_channel();
assert!(tx1.is_closed());
assert!(tx2.is_closed());
let err = block_on(tx2.send(5)).unwrap_err();
assert!(err.is_disconnected());
assert_eq!(block_on(rx.next()), None);
}
}