[go: up one dir, main page]

tokio-timer 0.2.3

Timer facilities for Tokio
Documentation
extern crate futures;
extern crate rand;
extern crate tokio_executor;
extern crate tokio_timer;

use tokio_executor::park::{Park, Unpark, UnparkThread};
use tokio_timer::*;

use futures::{Future, Stream};
use futures::stream::FuturesUnordered;
use rand::Rng;

use std::cmp;
use std::sync::{Arc, Barrier};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::thread;
use std::time::{Duration, Instant};

struct Signal {
    rem: AtomicUsize,
    unpark: UnparkThread,
}

#[test]
fn hammer_complete() {
    const ITERS: usize = 5;
    const THREADS: usize = 4;
    const PER_THREAD: usize = 40;
    const MIN_DELAY: u64 = 1;
    const MAX_DELAY: u64 = 5_000;

    for _ in 0..ITERS {
        let mut timer = Timer::default();
        let handle = timer.handle();
        let barrier = Arc::new(Barrier::new(THREADS));

        let done = Arc::new(Signal {
            rem: AtomicUsize::new(THREADS),
            unpark: timer.get_park().unpark(),
        });

        for _ in 0..THREADS {
            let handle = handle.clone();
            let barrier = barrier.clone();
            let done = done.clone();

            thread::spawn(move || {
                let mut exec = FuturesUnordered::new();
                let mut rng = rand::thread_rng();

                barrier.wait();

                for _ in 0..PER_THREAD {
                    let deadline = Instant::now() + Duration::from_millis(
                        rng.gen_range(MIN_DELAY, MAX_DELAY));

                    exec.push({
                        handle.delay(deadline)
                            .and_then(move |_| {
                                let now = Instant::now();
                                assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
                                Ok(())
                            })
                    });
                }

                // Run the logic
                exec.for_each(|_| Ok(()))
                    .wait()
                    .unwrap();

                if 1 == done.rem.fetch_sub(1, SeqCst) {
                    done.unpark.unpark();
                }
            });
        }

        while done.rem.load(SeqCst) > 0 {
            timer.turn(None).unwrap();
        }
    }
}

#[test]
fn hammer_cancel() {
    const ITERS: usize = 5;
    const THREADS: usize = 4;
    const PER_THREAD: usize = 40;
    const MIN_DELAY: u64 = 1;
    const MAX_DELAY: u64 = 5_000;

    for _ in 0..ITERS {
        let mut timer = Timer::default();
        let handle = timer.handle();
        let barrier = Arc::new(Barrier::new(THREADS));

        let done = Arc::new(Signal {
            rem: AtomicUsize::new(THREADS),
            unpark: timer.get_park().unpark(),
        });

        for _ in 0..THREADS {
            let handle = handle.clone();
            let barrier = barrier.clone();
            let done = done.clone();

            thread::spawn(move || {
                let mut exec = FuturesUnordered::new();
                let mut rng = rand::thread_rng();

                barrier.wait();

                for _ in 0..PER_THREAD {
                    let deadline1 = Instant::now() + Duration::from_millis(
                        rng.gen_range(MIN_DELAY, MAX_DELAY));

                    let deadline2 = Instant::now() + Duration::from_millis(
                        rng.gen_range(MIN_DELAY, MAX_DELAY));

                    let deadline = cmp::min(deadline1, deadline2);

                    let delay = handle.delay(deadline1);
                    let join = handle.deadline(delay, deadline2);

                    exec.push({
                        join
                            .and_then(move |_| {
                                let now = Instant::now();
                                assert!(now >= deadline, "deadline greater by {:?}", deadline - now);
                                Ok(())
                            })
                    });
                }

                // Run the logic
                exec
                    .or_else(|e| {
                        assert!(e.is_elapsed());
                        Ok::<_, ()>(())
                    })
                    .for_each(|_| Ok(()))
                    .wait()
                    .unwrap();

                if 1 == done.rem.fetch_sub(1, SeqCst) {
                    done.unpark.unpark();
                }
            });
        }

        while done.rem.load(SeqCst) > 0 {
            timer.turn(None).unwrap();
        }
    }
}

#[test]
fn hammer_reset() {
    const ITERS: usize = 5;
    const THREADS: usize = 4;
    const PER_THREAD: usize = 40;
    const MIN_DELAY: u64 = 1;
    const MAX_DELAY: u64 = 250;

    for _ in 0..ITERS {
        let mut timer = Timer::default();
        let handle = timer.handle();
        let barrier = Arc::new(Barrier::new(THREADS));

        let done = Arc::new(Signal {
            rem: AtomicUsize::new(THREADS),
            unpark: timer.get_park().unpark(),
        });

        for _ in 0..THREADS {
            let handle = handle.clone();
            let barrier = barrier.clone();
            let done = done.clone();

            thread::spawn(move || {
                let mut exec = FuturesUnordered::new();
                let mut rng = rand::thread_rng();

                barrier.wait();

                for _ in 0..PER_THREAD {
                    let deadline1 = Instant::now() + Duration::from_millis(
                        rng.gen_range(MIN_DELAY, MAX_DELAY));

                    let deadline2 = deadline1 + Duration::from_millis(
                        rng.gen_range(MIN_DELAY, MAX_DELAY));

                    let deadline3 = deadline2 + Duration::from_millis(
                        rng.gen_range(MIN_DELAY, MAX_DELAY));

                    exec.push({
                        handle.delay(deadline1)
                            // Select over a second delay
                            .select2(handle.delay(deadline2))
                            .map_err(|e| panic!("boom; err={:?}", e))
                            .and_then(move |res| {
                                use futures::future::Either::*;

                                let now = Instant::now();
                                assert!(now >= deadline1, "deadline greater by {:?}", deadline1 - now);

                                let mut other = match res {
                                    A((_, other)) => other,
                                    B((_, other)) => other,
                                };

                                other.reset(deadline3);
                                other
                            })
                            .and_then(move |_| {
                                let now = Instant::now();
                                assert!(now >= deadline3, "deadline greater by {:?}", deadline3 - now);
                                Ok(())
                            })
                    });
                }

                // Run the logic
                exec
                    .for_each(|_| Ok(()))
                    .wait()
                    .unwrap();

                if 1 == done.rem.fetch_sub(1, SeqCst) {
                    done.unpark.unpark();
                }
            });
        }

        while done.rem.load(SeqCst) > 0 {
            timer.turn(None).unwrap();
        }
    }
}