#![cfg(test)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};
#[test]
#[should_panic(expected = "Hello, world!")]
fn panic_propagate() {
let thread_pool = ThreadPoolBuilder::new().build().unwrap();
thread_pool.install(|| {
panic!("Hello, world!");
});
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn workers_stop() {
let registry;
{
let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
registry = thread_pool.install(|| {
join_a_lot(22);
Arc::clone(&thread_pool.registry)
});
assert_eq!(registry.num_threads(), 22);
}
registry.wait_until_stopped();
}
fn join_a_lot(n: usize) {
if n > 0 {
join(|| join_a_lot(n - 1), || join_a_lot(n - 1));
}
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn sleeper_stop() {
use std::{thread, time};
let registry;
{
let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
registry = Arc::clone(&thread_pool.registry);
thread::sleep(time::Duration::from_secs(1));
}
registry.wait_until_stopped();
}
fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
let count = Arc::new(AtomicUsize::new(0));
(Arc::clone(&count), move |_| {
count.fetch_add(1, Ordering::SeqCst);
})
}
fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
use std::{thread, time};
for _ in 0..60 {
counter = match Arc::try_unwrap(counter) {
Ok(counter) => return counter.into_inner(),
Err(counter) => {
thread::sleep(time::Duration::from_secs(1));
counter
}
};
}
panic!("Counter is still shared!");
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn failed_thread_stack() {
let stack_size = ::std::isize::MAX as usize;
let (start_count, start_handler) = count_handler();
let (exit_count, exit_handler) = count_handler();
let builder = ThreadPoolBuilder::new()
.num_threads(10)
.stack_size(stack_size)
.start_handler(start_handler)
.exit_handler(exit_handler);
let pool = builder.build();
assert!(pool.is_err(), "thread stack should have failed!");
let start_count = wait_for_counter(start_count);
assert!(start_count <= 1);
assert_eq!(start_count, wait_for_counter(exit_count));
}
#[test]
#[cfg_attr(not(panic = "unwind"), ignore)]
fn panic_thread_name() {
let (start_count, start_handler) = count_handler();
let (exit_count, exit_handler) = count_handler();
let builder = ThreadPoolBuilder::new()
.num_threads(10)
.start_handler(start_handler)
.exit_handler(exit_handler)
.thread_name(|i| {
if i >= 5 {
panic!();
}
format!("panic_thread_name#{}", i)
});
let pool = crate::unwind::halt_unwinding(|| builder.build());
assert!(pool.is_err(), "thread-name panic should propagate!");
assert_eq!(5, wait_for_counter(start_count));
assert_eq!(5, wait_for_counter(exit_count));
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn self_install() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
assert!(pool.install(|| pool.install(|| true)));
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install() {
let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let ok = pool1.install(|| {
pool2.install(|| {
pool1.install(|| {
true
})
})
});
assert!(ok);
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn mutual_install_sleepy() {
use std::{thread, time};
let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let ok = pool1.install(|| {
pool2.install(|| {
thread::sleep(time::Duration::from_secs(1));
pool1.install(|| {
thread::sleep(time::Duration::from_secs(1));
true
})
})
});
assert!(ok);
}
#[test]
#[allow(deprecated)]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn check_thread_pool_new() {
let pool = ThreadPool::new(crate::Configuration::new().num_threads(22)).unwrap();
assert_eq!(pool.current_num_threads(), 22);
}
macro_rules! test_scope_order {
($scope:ident => $spawn:ident) => {{
let builder = ThreadPoolBuilder::new().num_threads(1);
let pool = builder.build().unwrap();
pool.install(|| {
let vec = Mutex::new(vec![]);
pool.$scope(|scope| {
let vec = &vec;
for i in 0..10 {
scope.$spawn(move |_| {
vec.lock().unwrap().push(i);
});
}
});
vec.into_inner().unwrap()
})
}};
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_lifo_order() {
let vec = test_scope_order!(scope => spawn);
let expected: Vec<i32> = (0..10).rev().collect(); assert_eq!(vec, expected);
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn scope_fifo_order() {
let vec = test_scope_order!(scope_fifo => spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); assert_eq!(vec, expected);
}
macro_rules! test_spawn_order {
($spawn:ident) => {{
let builder = ThreadPoolBuilder::new().num_threads(1);
let pool = &builder.build().unwrap();
let (tx, rx) = channel();
pool.install(move || {
for i in 0..10 {
let tx = tx.clone();
pool.$spawn(move || {
tx.send(i).unwrap();
});
}
});
rx.iter().collect::<Vec<i32>>()
}};
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_lifo_order() {
let vec = test_spawn_order!(spawn);
let expected: Vec<i32> = (0..10).rev().collect(); assert_eq!(vec, expected);
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn spawn_fifo_order() {
let vec = test_spawn_order!(spawn_fifo);
let expected: Vec<i32> = (0..10).collect(); assert_eq!(vec, expected);
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_scopes() {
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
where
OP: FnOnce(&[&Scope<'scope>]) + Send,
{
if let Some((pool, tail)) = pools.split_first() {
pool.scope(move |s| {
let mut scopes = scopes;
scopes.push(s);
nest(tail, scopes, op)
})
} else {
(op)(&scopes)
}
}
let pools: Vec<_> = (0..10)
.map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
.collect();
let counter = AtomicUsize::new(0);
nest(&pools, vec![], |scopes| {
for &s in scopes {
s.spawn(|_| {
counter.fetch_add(1, Ordering::Relaxed);
});
}
});
assert_eq!(counter.into_inner(), pools.len());
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn nested_fifo_scopes() {
fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
where
OP: FnOnce(&[&ScopeFifo<'scope>]) + Send,
{
if let Some((pool, tail)) = pools.split_first() {
pool.scope_fifo(move |s| {
let mut scopes = scopes;
scopes.push(s);
nest(tail, scopes, op)
})
} else {
(op)(&scopes)
}
}
let pools: Vec<_> = (0..10)
.map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
.collect();
let counter = AtomicUsize::new(0);
nest(&pools, vec![], |scopes| {
for &s in scopes {
s.spawn_fifo(|_| {
counter.fetch_add(1, Ordering::Relaxed);
});
}
});
assert_eq!(counter.into_inner(), pools.len());
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
let rx_ref = ℞
pool.in_place_scope(move |s| {
s.spawn(move |_| {
tx.send(()).unwrap();
});
rx_ref.recv().unwrap();
});
}
#[test]
#[cfg_attr(any(target_os = "emscripten", target_family = "wasm"), ignore)]
fn in_place_scope_fifo_no_deadlock() {
let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
let (tx, rx) = channel();
let rx_ref = ℞
pool.in_place_scope_fifo(move |s| {
s.spawn_fifo(move |_| {
tx.send(()).unwrap();
});
rx_ref.recv().unwrap();
});
}
#[test]
fn yield_now_to_spawn() {
let (tx, rx) = channel();
crate::spawn(move || tx.send(22).unwrap());
crate::registry::in_worker(move |_, _| {
crate::yield_now();
});
assert_eq!(22, rx.recv().unwrap());
}
#[test]
fn yield_local_to_spawn() {
let (tx, rx) = channel();
crate::spawn(move || tx.send(22).unwrap());
crate::registry::in_worker(move |_, _| {
crate::yield_local();
});
assert_eq!(22, rx.recv().unwrap());
}