#![feature(async_await)]
use std::error::Error;
use std::io;
use std::time::Duration;
use std::task::Poll;
use std::thread;
use futures::future::poll_fn;
use futures::channel::mpsc::*;
use futures_timer::*;
use futures::TryStreamExt as TryStreamExt03;
type TestResult = io::Result<()>;
#[runtime::test]
async fn future_timeout() -> TestResult {
let long_future = poll_fn::<TestResult, _>(|_| {
Poll::Pending
});
let res = long_future.timeout(Duration::from_millis(100)).await;
assert_eq!("future timed out", res.unwrap_err().description());
Ok(())
}
#[runtime::test]
async fn future_doesnt_timeout() -> TestResult {
let short_future = futures::future::ready::<TestResult>(Ok(()));
short_future.timeout(Duration::from_millis(100)).await?;
Ok(())
}
#[runtime::test]
async fn stream() -> TestResult {
let dur = Duration::from_millis(10);
Delay::new(dur).await?;
Delay::new(dur).await?;
Ok(())
}
#[runtime::test]
async fn stream_timeout() -> TestResult {
let (mut tx, rx) = unbounded::<io::Result<u8>>();
thread::spawn(move || {
for i in 0..10_u8 {
tx.start_send(Ok(i)).unwrap();
thread::sleep(Duration::from_millis(100));
}
drop(tx)
});
let mut f = rx.timeout(Duration::from_millis(10));
let mut ok = 0;
let mut err = 0;
loop {
let next = f.try_next().await;
match next {
Ok(None) => { break; }
Ok(_) => { ok += 1; }
Err(_) => { err += 1; }
}
}
assert_eq!(ok, 10);
assert!(err > ok * 5);
Ok(())
}
#[runtime::test]
async fn stream_doesnt_timeout() -> TestResult {
let (mut tx, rx) = unbounded::<io::Result<u8>>();
thread::spawn(move || {
for i in 0..10_u8 {
tx.start_send(Ok(i)).unwrap();
thread::sleep(Duration::from_millis(100));
}
drop(tx)
});
let mut f = rx.timeout(Duration::from_millis(200));
let mut count = 0;
loop {
let next = f.try_next().await;
if let Ok(None) = next {
break;
}
next.unwrap();
count += 1;
}
assert_eq!(count, 10);
Ok(())
}