[go: up one dir, main page]

tokio 1.35.0

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
use crate::runtime::scheduler::multi_thread::{queue, Stats};
use crate::runtime::task::{self, Schedule, Task};

use std::cell::RefCell;
use std::thread;
use std::time::Duration;

#[allow(unused)]
macro_rules! assert_metrics {
    ($stats:ident, $field:ident == $v:expr) => {{
        use crate::runtime::WorkerMetrics;
        use std::sync::atomic::Ordering::Relaxed;

        let worker = WorkerMetrics::new();
        $stats.submit(&worker);

        let expect = $v;
        let actual = worker.$field.load(Relaxed);

        assert!(actual == expect, "expect = {}; actual = {}", expect, actual)
    }};
}

fn new_stats() -> Stats {
    use crate::runtime::WorkerMetrics;
    Stats::new(&WorkerMetrics::new())
}

#[test]
fn fits_256_one_at_a_time() {
    let (_, mut local) = queue::local();
    let inject = RefCell::new(vec![]);
    let mut stats = new_stats();

    for _ in 0..256 {
        let (task, _) = super::unowned(async {});
        local.push_back_or_overflow(task, &inject, &mut stats);
    }

    cfg_metrics! {
        assert_metrics!(stats, overflow_count == 0);
    }

    assert!(inject.borrow_mut().pop().is_none());

    while local.pop().is_some() {}
}

#[test]
fn fits_256_all_at_once() {
    let (_, mut local) = queue::local();

    let mut tasks = (0..256)
        .map(|_| super::unowned(async {}).0)
        .collect::<Vec<_>>();
    local.push_back(tasks.drain(..));

    let mut i = 0;
    while local.pop().is_some() {
        i += 1;
    }

    assert_eq!(i, 256);
}

#[test]
fn fits_256_all_in_chunks() {
    let (_, mut local) = queue::local();

    let mut tasks = (0..256)
        .map(|_| super::unowned(async {}).0)
        .collect::<Vec<_>>();

    local.push_back(tasks.drain(..10));
    local.push_back(tasks.drain(..100));
    local.push_back(tasks.drain(..46));
    local.push_back(tasks.drain(..100));

    let mut i = 0;
    while local.pop().is_some() {
        i += 1;
    }

    assert_eq!(i, 256);
}

#[test]
fn overflow() {
    let (_, mut local) = queue::local();
    let inject = RefCell::new(vec![]);
    let mut stats = new_stats();

    for _ in 0..257 {
        let (task, _) = super::unowned(async {});
        local.push_back_or_overflow(task, &inject, &mut stats);
    }

    cfg_metrics! {
        assert_metrics!(stats, overflow_count == 1);
    }

    let mut n = 0;

    n += inject.borrow_mut().drain(..).count();

    while local.pop().is_some() {
        n += 1;
    }

    assert_eq!(n, 257);
}

#[test]
fn steal_batch() {
    let mut stats = new_stats();

    let (steal1, mut local1) = queue::local();
    let (_, mut local2) = queue::local();
    let inject = RefCell::new(vec![]);

    for _ in 0..4 {
        let (task, _) = super::unowned(async {});
        local1.push_back_or_overflow(task, &inject, &mut stats);
    }

    assert!(steal1.steal_into(&mut local2, &mut stats).is_some());

    cfg_metrics! {
        assert_metrics!(stats, steal_count == 2);
    }

    for _ in 0..1 {
        assert!(local2.pop().is_some());
    }

    assert!(local2.pop().is_none());

    for _ in 0..2 {
        assert!(local1.pop().is_some());
    }

    assert!(local1.pop().is_none());
}

const fn normal_or_miri(normal: usize, miri: usize) -> usize {
    if cfg!(miri) {
        miri
    } else {
        normal
    }
}

#[test]
fn stress1() {
    const NUM_ITER: usize = 5;
    const NUM_STEAL: usize = normal_or_miri(1_000, 10);
    const NUM_LOCAL: usize = normal_or_miri(1_000, 10);
    const NUM_PUSH: usize = normal_or_miri(500, 10);
    const NUM_POP: usize = normal_or_miri(250, 10);

    let mut stats = new_stats();

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = RefCell::new(vec![]);

        let th = thread::spawn(move || {
            let mut stats = new_stats();
            let (_, mut local) = queue::local();
            let mut n = 0;

            for _ in 0..NUM_STEAL {
                if steal.steal_into(&mut local, &mut stats).is_some() {
                    n += 1;
                }

                while local.pop().is_some() {
                    n += 1;
                }

                thread::yield_now();
            }

            cfg_metrics! {
                assert_metrics!(stats, steal_count == n as _);
            }

            n
        });

        let mut n = 0;

        for _ in 0..NUM_LOCAL {
            for _ in 0..NUM_PUSH {
                let (task, _) = super::unowned(async {});
                local.push_back_or_overflow(task, &inject, &mut stats);
            }

            for _ in 0..NUM_POP {
                if local.pop().is_some() {
                    n += 1;
                } else {
                    break;
                }
            }
        }

        n += inject.borrow_mut().drain(..).count();

        n += th.join().unwrap();

        assert_eq!(n, NUM_LOCAL * NUM_PUSH);
    }
}

#[test]
fn stress2() {
    const NUM_ITER: usize = 1;
    const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
    const NUM_STEAL: usize = normal_or_miri(1_000, 10);

    let mut stats = new_stats();

    for _ in 0..NUM_ITER {
        let (steal, mut local) = queue::local();
        let inject = RefCell::new(vec![]);

        let th = thread::spawn(move || {
            let mut stats = new_stats();
            let (_, mut local) = queue::local();
            let mut n = 0;

            for _ in 0..NUM_STEAL {
                if steal.steal_into(&mut local, &mut stats).is_some() {
                    n += 1;
                }

                while local.pop().is_some() {
                    n += 1;
                }

                thread::sleep(Duration::from_micros(10));
            }

            n
        });

        let mut num_pop = 0;

        for i in 0..NUM_TASKS {
            let (task, _) = super::unowned(async {});
            local.push_back_or_overflow(task, &inject, &mut stats);

            if i % 128 == 0 && local.pop().is_some() {
                num_pop += 1;
            }

            num_pop += inject.borrow_mut().drain(..).count();
        }

        num_pop += th.join().unwrap();

        while local.pop().is_some() {
            num_pop += 1;
        }

        num_pop += inject.borrow_mut().drain(..).count();

        assert_eq!(num_pop, NUM_TASKS);
    }
}

struct Runtime;

impl Schedule for Runtime {
    fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
        None
    }

    fn schedule(&self, _task: task::Notified<Self>) {
        unreachable!();
    }
}