use futures::{lazy, Future};
use scope;
use std::any::Any;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::channel;
use {Configuration, ThreadPool};
use super::{spawn, spawn_future};
#[test]
fn spawn_then_join_in_worker() {
let (tx, rx) = channel();
scope(move |_| {
spawn(move || tx.send(22).unwrap());
});
assert_eq!(22, rx.recv().unwrap());
}
#[test]
fn spawn_then_join_outside_worker() {
let (tx, rx) = channel();
spawn(move || tx.send(22).unwrap());
assert_eq!(22, rx.recv().unwrap());
}
#[test]
fn panic_fwd() {
let (tx, rx) = channel();
let tx = Mutex::new(tx);
let panic_handler = move |err: Box<Any + Send>| {
let tx = tx.lock().unwrap();
if let Some(&msg) = err.downcast_ref::<&str>() {
if msg == "Hello, world!" {
tx.send(1).unwrap();
} else {
tx.send(2).unwrap();
}
} else {
tx.send(3).unwrap();
}
};
let configuration = Configuration::new().panic_handler(panic_handler);
ThreadPool::new(configuration).unwrap().spawn(move || panic!("Hello, world!"));
assert_eq!(1, rx.recv().unwrap());
}
#[test]
fn async_future_map() {
let data = Arc::new(Mutex::new(format!("Hello, ")));
let a = spawn_future(lazy({
let data = data.clone();
move || Ok::<_, ()>(data)
}));
let future = spawn_future(a.map(|data| {
let mut v = data.lock().unwrap();
v.push_str("world!");
}));
let () = future.wait().unwrap();
assert_eq!(&data.lock().unwrap()[..], "Hello, world!");
}
#[test]
#[should_panic(expected = "Hello, world!")]
fn async_future_panic_prop() {
let future = spawn_future(lazy(move || Ok::<(), ()>(argh())));
let _ = future.rayon_wait();
fn argh() -> () {
if true {
panic!("Hello, world!");
}
}
}
#[test]
fn async_future_scope_interact() {
let future = spawn_future(lazy(move || Ok::<usize, ()>(22)));
let mut vec = vec![];
scope(|s| {
let future = s.spawn_future(future.map(|x| x * 2));
s.spawn(|_| {
vec.push(future.rayon_wait().unwrap());
}); });
assert_eq!(vec![44], vec);
}
#[test]
fn termination_while_things_are_executing() {
let (tx0, rx0) = channel();
let (tx1, rx1) = channel();
{
let thread_pool = ThreadPool::new(Configuration::new()).unwrap();
thread_pool.spawn(move || {
let data = rx0.recv().unwrap();
spawn(move || {
tx1.send(data).unwrap();
});
});
}
tx0.send(22).unwrap();
let v = rx1.recv().unwrap();
assert_eq!(v, 22);
}
#[test]
fn custom_panic_handler_and_spawn() {
let (tx, rx) = channel();
let tx = Mutex::new(tx);
let panic_handler = move |e: Box<Any + Send>| {
tx.lock().unwrap().send(e).unwrap();
};
let config = Configuration::new().panic_handler(panic_handler);
ThreadPool::new(config).unwrap().spawn(move || {
panic!("Hello, world!");
});
let error = rx.recv().unwrap();
if let Some(&msg) = error.downcast_ref::<&str>() {
assert_eq!(msg, "Hello, world!");
} else {
panic!("did not receive a string from panic handler");
}
}
#[test]
fn custom_panic_handler_and_nested_spawn() {
let (tx, rx) = channel();
let tx = Mutex::new(tx);
let panic_handler = move |e| {
tx.lock().unwrap().send(e).unwrap();
};
const PANICS: usize = 3;
let config = Configuration::new().panic_handler(panic_handler);
ThreadPool::new(config).unwrap().spawn(move || {
for _ in 0 .. PANICS {
spawn(move || {
panic!("Hello, world!");
});
}
});
for _ in 0 .. PANICS {
let error = rx.recv().unwrap();
if let Some(&msg) = error.downcast_ref::<&str>() {
assert_eq!(msg, "Hello, world!");
} else {
panic!("did not receive a string from panic handler");
}
}
}