// tokio-timer 0.2.13
//
// Timer facilities for Tokio
extern crate futures;
extern crate rand;
extern crate tokio_executor;
extern crate tokio_timer;

use tokio_executor::park::{Park, Unpark, UnparkThread};
use tokio_timer::*;

use futures::stream::FuturesUnordered;
use futures::{Future, Stream};
use rand::Rng;

use std::cmp;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::{Duration, Instant};

/// Completion signal shared between the worker threads of a test and the
/// thread that drives the timer.
struct Signal {
    /// Number of worker threads that have not finished yet. It starts at the
    /// worker count and each worker decrements it exactly once.
    rem: AtomicUsize,
    /// Unpark handle obtained from the timer's `Park` instance; the worker
    /// that brings `rem` to zero uses it to wake the thread blocked in
    /// `Timer::turn`.
    unpark: UnparkThread,
}

/// Stress test: several threads concurrently register delays with random
/// deadlines on one shared timer and wait for all of them to complete.
///
/// Each completed delay asserts that it did not fire before its deadline.
/// The test thread drives the timer with `Timer::turn` until every worker
/// has reported completion, then joins the workers so that a failed
/// assertion inside a worker fails the test instead of being lost.
#[test]
fn hammer_complete() {
    const ITERS: usize = 5;
    const THREADS: usize = 4;
    const PER_THREAD: usize = 40;
    const MIN_DELAY: u64 = 1;
    const MAX_DELAY: u64 = 5_000;

    // Reports a worker as finished when dropped. Doing this from `Drop`
    // guarantees the count is decremented (and the timer thread woken) even
    // when the worker unwinds from a failed assertion; otherwise the test
    // thread would block in `timer.turn(None)` forever.
    struct Finished(Arc<Signal>);

    impl Drop for Finished {
        fn drop(&mut self) {
            // `fetch_sub` returns the previous value, so `1` means this was
            // the last outstanding worker.
            if 1 == self.0.rem.fetch_sub(1, SeqCst) {
                self.0.unpark.unpark();
            }
        }
    }

    for _ in 0..ITERS {
        let mut timer = Timer::default();
        let handle = timer.handle();
        let barrier = Arc::new(Barrier::new(THREADS));

        let done = Arc::new(Signal {
            rem: AtomicUsize::new(THREADS),
            unpark: timer.get_park().unpark(),
        });

        let mut workers = Vec::with_capacity(THREADS);

        for _ in 0..THREADS {
            let handle = handle.clone();
            let barrier = barrier.clone();
            let done = done.clone();

            let worker = thread::spawn(move || {
                // Declared first so it is dropped last, after all futures.
                let _finished = Finished(done);

                let mut exec = FuturesUnordered::new();
                let mut rng = rand::thread_rng();

                // Start all workers at the same time to maximize contention.
                barrier.wait();

                for _ in 0..PER_THREAD {
                    let deadline =
                        Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));

                    exec.push({
                        handle.delay(deadline).and_then(move |_| {
                            let now = Instant::now();
                            assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
                            Ok(())
                        })
                    });
                }

                // Run the logic
                exec.for_each(|_| Ok(())).wait().unwrap();
            });

            workers.push(worker);
        }

        // Drive the timer until every worker has reported completion.
        while done.rem.load(SeqCst) > 0 {
            timer.turn(None).unwrap();
        }

        // Propagate worker panics so a failed assertion fails the test.
        for worker in workers {
            worker.join().expect("worker thread panicked");
        }
    }
}

/// Stress test: several threads concurrently register delays wrapped in
/// timeouts on one shared timer, so that either the delay or the timeout
/// wins and the other is dropped.
///
/// A delay that completes asserts that it did not fire before the earlier
/// of the two deadlines. Errors are accepted only if they report an elapsed
/// timeout. The test thread drives the timer with `Timer::turn` until every
/// worker has reported completion, then joins the workers so that a failed
/// assertion inside a worker fails the test instead of being lost.
#[test]
fn hammer_cancel() {
    const ITERS: usize = 5;
    const THREADS: usize = 4;
    const PER_THREAD: usize = 40;
    const MIN_DELAY: u64 = 1;
    const MAX_DELAY: u64 = 5_000;

    // Reports a worker as finished when dropped. Doing this from `Drop`
    // guarantees the count is decremented (and the timer thread woken) even
    // when the worker unwinds from a failed assertion; otherwise the test
    // thread would block in `timer.turn(None)` forever.
    struct Finished(Arc<Signal>);

    impl Drop for Finished {
        fn drop(&mut self) {
            // `fetch_sub` returns the previous value, so `1` means this was
            // the last outstanding worker.
            if 1 == self.0.rem.fetch_sub(1, SeqCst) {
                self.0.unpark.unpark();
            }
        }
    }

    for _ in 0..ITERS {
        let mut timer = Timer::default();
        let handle = timer.handle();
        let barrier = Arc::new(Barrier::new(THREADS));

        let done = Arc::new(Signal {
            rem: AtomicUsize::new(THREADS),
            unpark: timer.get_park().unpark(),
        });

        let mut workers = Vec::with_capacity(THREADS);

        for _ in 0..THREADS {
            let handle = handle.clone();
            let barrier = barrier.clone();
            let done = done.clone();

            let worker = thread::spawn(move || {
                // Declared first so it is dropped last, after all futures.
                let _finished = Finished(done);

                let mut exec = FuturesUnordered::new();
                let mut rng = rand::thread_rng();

                // Start all workers at the same time to maximize contention.
                barrier.wait();

                for _ in 0..PER_THREAD {
                    let deadline1 =
                        Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));

                    let deadline2 =
                        Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));

                    // Whichever of the delay / timeout comes first bounds the
                    // earliest moment the combined future may complete.
                    let deadline = cmp::min(deadline1, deadline2);

                    let delay = handle.delay(deadline1);
                    let join = handle.timeout(delay, deadline2);

                    exec.push({
                        join.and_then(move |_| {
                            let now = Instant::now();
                            assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
                            Ok(())
                        })
                    });
                }

                // Run the logic; the only acceptable error is an elapsed timeout.
                exec.or_else(|e| {
                    assert!(e.is_elapsed());
                    Ok::<_, ()>(())
                })
                .for_each(|_| Ok(()))
                .wait()
                .unwrap();
            });

            workers.push(worker);
        }

        // Drive the timer until every worker has reported completion.
        while done.rem.load(SeqCst) > 0 {
            timer.turn(None).unwrap();
        }

        // Propagate worker panics so a failed assertion fails the test.
        for worker in workers {
            worker.join().expect("worker thread panicked");
        }
    }
}

/// Stress test: several threads concurrently race two delays on one shared
/// timer, then reset the delay that has not completed to a later deadline
/// and wait for it.
///
/// The first completion asserts that `deadline1` has passed; after the
/// reset, the second completion asserts that `deadline3` has passed. The
/// test thread drives the timer with `Timer::turn` until every worker has
/// reported completion, then joins the workers so that a failed assertion
/// inside a worker fails the test instead of being lost.
#[test]
fn hammer_reset() {
    const ITERS: usize = 5;
    const THREADS: usize = 4;
    const PER_THREAD: usize = 40;
    const MIN_DELAY: u64 = 1;
    const MAX_DELAY: u64 = 250;

    // Reports a worker as finished when dropped. Doing this from `Drop`
    // guarantees the count is decremented (and the timer thread woken) even
    // when the worker unwinds from a failed assertion; otherwise the test
    // thread would block in `timer.turn(None)` forever.
    struct Finished(Arc<Signal>);

    impl Drop for Finished {
        fn drop(&mut self) {
            // `fetch_sub` returns the previous value, so `1` means this was
            // the last outstanding worker.
            if 1 == self.0.rem.fetch_sub(1, SeqCst) {
                self.0.unpark.unpark();
            }
        }
    }

    for _ in 0..ITERS {
        let mut timer = Timer::default();
        let handle = timer.handle();
        let barrier = Arc::new(Barrier::new(THREADS));

        let done = Arc::new(Signal {
            rem: AtomicUsize::new(THREADS),
            unpark: timer.get_park().unpark(),
        });

        let mut workers = Vec::with_capacity(THREADS);

        for _ in 0..THREADS {
            let handle = handle.clone();
            let barrier = barrier.clone();
            let done = done.clone();

            let worker = thread::spawn(move || {
                // Declared first so it is dropped last, after all futures.
                let _finished = Finished(done);

                let mut exec = FuturesUnordered::new();
                let mut rng = rand::thread_rng();

                // Start all workers at the same time to maximize contention.
                barrier.wait();

                for _ in 0..PER_THREAD {
                    // Three strictly increasing deadlines: the two racing
                    // delays, then the deadline the loser is reset to.
                    let deadline1 =
                        Instant::now() + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));

                    let deadline2 =
                        deadline1 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));

                    let deadline3 =
                        deadline2 + Duration::from_millis(rng.gen_range(MIN_DELAY, MAX_DELAY));

                    exec.push({
                        handle
                            .delay(deadline1)
                            // Select over a second delay
                            .select2(handle.delay(deadline2))
                            .map_err(|e| panic!("boom; err={:?}", e))
                            .and_then(move |res| {
                                use futures::future::Either::*;

                                let now = Instant::now();
                                assert!(
                                    now >= deadline1,
                                    "deadline greater by {:?}",
                                    deadline1 - now
                                );

                                // Keep whichever delay has not completed yet.
                                let mut other = match res {
                                    A((_, other)) => other,
                                    B((_, other)) => other,
                                };

                                // Push the pending delay out to the third
                                // deadline and wait on it.
                                other.reset(deadline3);
                                other
                            })
                            .and_then(move |_| {
                                let now = Instant::now();
                                assert!(
                                    now >= deadline3,
                                    "deadline greater by {:?}",
                                    deadline3 - now
                                );
                                Ok(())
                            })
                    });
                }

                // Run the logic
                exec.for_each(|_| Ok(())).wait().unwrap();
            });

            workers.push(worker);
        }

        // Drive the timer until every worker has reported completion.
        while done.rem.load(SeqCst) > 0 {
            timer.turn(None).unwrap();
        }

        // Propagate worker panics so a failed assertion fails the test.
        for worker in workers {
            worker.join().expect("worker thread panicked");
        }
    }
}