use futures::{self, Async, Future};
use futures::future::lazy;
use futures::sync::oneshot;
use futures::task::{self, Unpark};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use ::{scope, ThreadPool, Configuration};
/// Spawns two already-resolved futures that borrow elements of a local
/// array, races them with `select`, and checks that the two results are
/// `0` and `1` in either order.
#[test]
fn future_test() {
    let data = &[0, 1];

    // The pool is configured with two worker threads.
    // NOTE(review): presumably the second thread is needed because the
    // blocking `wait()` calls below tie up a worker -- confirm.
    let pool = ThreadPool::new(Configuration::new().num_threads(2)).unwrap();
    pool.install(|| {
        scope(|s| {
            let first = s.spawn_future(futures::future::ok::<_, ()>(&data[0]));
            let second = s.spawn_future(futures::future::ok::<_, ()>(&data[1]));

            // `select` resolves to the first finished item plus a future
            // for the remaining one.
            let raced = first.select(second).wait();
            let (winner, remaining) = raced.ok().unwrap();
            let runner_up = remaining.wait().unwrap();

            assert!(*winner == 0 || *winner == 1);
            assert!(*runner_up == 1 - *winner);
        });
    });
}
/// Spawns a future resolving to `&mut data[0]`, maps it with a closure
/// that appends text, and checks after the scope returns that the string
/// was modified and that a handle to the mapped future was stored.
#[test]
fn future_map() {
    let data = &mut [format!("Hello, ")];
    let mut handle = None;

    scope(|s| {
        let borrowed = s.spawn_future(lazy(|| Ok::<_, ()>(&mut data[0])));
        let appended = borrowed.map(|text| {
            text.push_str("world!");
        });
        handle = Some(s.spawn_future(appended));
    });

    // `wait` is never called on the handle; the mutation is observed
    // purely through `data` once `scope` has returned.
    assert_eq!(data[0], "Hello, world!");
    assert!(handle.is_some());
}
/// Stores a spawned future outside of its scope, waits on it afterwards,
/// and mutates the `String` through the reference it resolves to.
#[test]
fn future_escape_ref() {
    let data = &mut [format!("Hello, ")];

    {
        let mut escaped = None;
        scope(|s| {
            // Reborrow so that the `move` closure captures the reborrow
            // rather than `data` itself.
            let slot = &mut *data;
            let pending = lazy(move || Ok::<_, ()>(&mut slot[0]));
            escaped = Some(s.spawn_future(pending));
        });

        // The scope has returned; the handle is waited on from here.
        let text = escaped.unwrap().wait().unwrap();
        text.push_str("world!");
    }

    // The inner block has ended, so `data` can be read directly again.
    assert_eq!(data[0], "Hello, world!");
}
/// Spawns a future whose body panics and waits on it with `rayon_wait`;
/// the test is expected to panic with a message containing
/// "Hello, world!".
#[test]
#[should_panic(expected = "Hello, world!")]
fn future_panic_prop() {
    // Helper that always panics. NOTE(review): the `if true` guard is
    // presumably there to avoid unreachable-code diagnostics -- confirm.
    fn argh() -> () {
        if true {
            panic!("Hello, world!");
        }
    }

    scope(|s| {
        let doomed = s.spawn_future(lazy(move || Ok::<(), ()>(argh())));
        let _ = doomed.rayon_wait();
    });
}
/// Chains three spawned futures on a one-thread pool and checks the value
/// obtained by calling `rayon_wait()` on the last one.
///
/// `a` resolves to whatever arrives on an mpsc channel, `b` and `c` each
/// add one, and a separately spawned job sends `20`, so the expected
/// result is `22`.
#[test]
fn future_rayon_wait_1_thread() {
let mut result = None;
// The pool is restricted to a single worker thread.
ThreadPool::new(Configuration::new().num_threads(1)).unwrap().install(|| {
scope(|s| {
use std::sync::mpsc::channel;
let (tx, rx) = channel();
// `a` blocks in `recv()` until the job spawned below sends a value.
let a = s.spawn_future(lazy(move || Ok::<usize, ()>(rx.recv().unwrap())));
let b = s.spawn_future(a.map(|v| v + 1));
let c = s.spawn_future(b.map(|v| v + 1));
// The only sender is moved into this job.
s.spawn(move |_| tx.send(20).unwrap());
// NOTE(review): with one worker thread this presumably relies on
// `rayon_wait` running other queued jobs while it waits -- confirm
// against the `rayon_wait` implementation.
result = Some(c.rayon_wait().unwrap());
});
});
// 20 sent through the channel, plus one from each of the two `map` stages.
assert_eq!(result, Some(22));
}
/// Calls the blocking `wait()` on a spawned future from inside the
/// `scope` closure; the test passes only if that panics.
#[test]
#[should_panic]
fn future_wait_panics_inside_rayon_thread() {
    scope(|s| {
        let unit_future = lazy(move || Ok::<(), ()>(()));
        let pending = s.spawn_future(unit_future);
        // Expected to panic here, per `#[should_panic]`.
        let _ = pending.wait();
    });
}
/// Stores a spawned future, lets the scope finish, and then calls the
/// blocking `wait()` on it from the test function itself, expecting
/// `Ok(())`.
#[test]
fn future_wait_works_outside_rayon_threads() {
    let mut escaped = None;

    scope(|s| {
        let unit_future = lazy(move || Ok::<(), ()>(()));
        escaped = Some(s.spawn_future(unit_future));
    });

    // The handle is waited on after `scope` has returned.
    let outcome = escaped.unwrap().wait();
    assert_eq!(Ok(()), outcome);
}
/// Polls a spawned oneshot receiver using an `Unpark` whose `unpark`
/// method panics, then completes the channel. The test is expected to
/// panic with a message containing "Hello, world!".
#[test]
#[should_panic(expected = "Hello, world!")]
fn panicy_unpark() {
    // An `Unpark` implementation that panics whenever it is notified.
    struct PanicUnpark;

    impl Unpark for PanicUnpark {
        fn unpark(&self) {
            panic!("Hello, world!");
        }
    }

    scope(|s| {
        let (sender, receiver) = oneshot::channel::<u32>();
        let rayon_future = s.spawn_future(receiver);
        let mut spawned = task::spawn(rayon_future);

        // Nothing has been sent yet, so this poll must report `NotReady`;
        // any other outcome fails with a different panic message.
        let notifier = Arc::new(PanicUnpark);
        match spawned.poll_future(notifier.clone()) {
            Ok(Async::NotReady) => {}
            other => panic!("spawn poll returned: {:?}", other),
        }

        sender.send(22).unwrap();
        let value = spawned.into_inner().rayon_wait().unwrap();
        assert_eq!(value, 22);
    });

    // Reached only if `scope` returned normally; its message does not
    // match the `expected` string, so the test fails in that case.
    panic!("scope failed to panic!");
}
/// Polls the same spawned future 22 times, alternating between two
/// counting `Unpark` handles, then completes the underlying oneshot.
///
/// After the scope returns, the handle supplied on the final poll
/// (`unpark1`) is expected to have been notified exactly once and the
/// other handle (`unpark0`) not at all.
#[test]
fn double_unpark() {
let unpark0 = Arc::new(TrackUnpark { value: AtomicUsize::new(0) });
let unpark1 = Arc::new(TrackUnpark { value: AtomicUsize::new(0) });
// Receives the spawned future so that it outlives the scope closure.
// NOTE(review): presumably held so the future is not dropped early --
// confirm.
let mut _tag = None;
scope(|s| {
let (a_tx, a_rx) = oneshot::channel::<u32>();
let rf = s.spawn_future(a_rx);
let mut spawn = task::spawn(rf);
// Even iterations poll with `unpark0`, odd ones with `unpark1`; the last
// iteration (i = 21) therefore uses `unpark1`.
for i in 0..22 {
let u = if i % 2 == 0 {
unpark0.clone()
} else {
unpark1.clone()
};
match spawn.poll_future(u) {
Ok(Async::NotReady) => {
// Nothing has been sent yet, so every poll must report `NotReady`.
}
r => panic!("spawn poll returned: {:?}", r),
}
}
a_tx.send(22).unwrap();
_tag = Some(spawn.into_inner());
});
// The handle from the final poll must have been notified exactly once,
// despite the repeated polling.
assert_eq!(unpark1.value.load(Ordering::SeqCst), 1);
// The handle that was not supplied last must never have been notified.
assert_eq!(unpark0.value.load(Ordering::SeqCst), 0);
// `Unpark` implementation that counts how many times it was notified.
struct TrackUnpark {
value: AtomicUsize,
}
impl Unpark for TrackUnpark {
fn unpark(&self) {
self.value.fetch_add(1, Ordering::SeqCst);
}
}
}