[go: up one dir, main page]

smolscale 0.3.9

A global, auto-scaling, preemptive scheduler
Documentation
//! A global, auto-scaling, preemptive scheduler using work-balancing.
//!
//! ## What? Another executor?
//!
//! `smolscale` is a **work-balancing** executor based on [async-task], designed to be a drop-in replacement to `smol` and `async-global-executor`. It is designed based on the thesis that work-stealing, the usual approach in async executors like `async-executor` and `tokio`, is not the right algorithm for scheduling huge amounts of tiny, interdependent work units, which are what message-passing futures end up being. Instead, `smolscale` uses *work-balancing*, an approach also found in Erlang, where a global "balancer" thread periodically balances work between workers, but workers do not attempt to steal tasks from each other. This avoids the extremely frequent stealing attempts that work-stealing schedulers generate when applied to async tasks.
//!
//! `smolscale`'s approach especially excels in two circumstances:
//! - **When the CPU cores are not fully loaded**: Traditional work stealing optimizes for the case where most workers have work to do, which is only the case in fully-loaded scenarios. When workers often wake up and go back to sleep, however, a lot of CPU time is wasted stealing work. `smolscale` will instead drastically reduce CPU usage in these circumstances --- a `async-executor` app that takes 80% of CPU time may now take only 20%. Although this does not improve fully-loaded throughput, it significantly reduces power consumption and does increase throughput in circumstances where multiple thread pools compete for CPU time.
//! - **When a lot of message-passing is happening**: Message-passing workloads often involve tasks quickly waking up and going back to sleep. In a work-stealing scheduler, this again floods the scheduler with stealing requests. `smolscale` can significantly improve throughput, especially compared to executors like `async-executor` that do not special-case message passing.
//!
//! Furthermore, smolscale has a **preemptive** thread pool that ensures that tasks cannot block other tasks no matter what. This means that you can do things like run expensive computations or even do blocking I/O within a task without worrying about causing deadlocks. Even with "traditional" tasks that do not block, this approach can reduce worst-case latency. Preemption is heavily inspired by Stjepan Glavina's [previous work on async-std](https://async.rs/blog/stop-worrying-about-blocking-the-new-async-std-runtime/).
//!
//! `smolscale` also experimentally includes `Nursery`, a helper for [structured concurrency](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/) on the `smolscale` global executor.
//!
//! ## Show me the benchmarks!
//!
//! Right now, `smolscale` uses a very naive implementation (for example, stealable local queues are implemented as SPSC queues with a spinlock on the consumer side, and worker parking is done naively through `event-listener`), and its performance is expected to drastically increase. However, at most tasks it is already much faster than `async-global-executor` (the de-facto standard "non-Tokio-world" executor, which powers `async-std`), sometimes an order of magnitude faster. Here are some unscientific benchmark results; percentages are compared to `async-global-executor`:
//! ```
//! spawn_one               time:   [105.08 ns 105.21 ns 105.36 ns]                      
//!                         change: [-98.570% -98.549% -98.530%] (p = 0.00 < 0.05)
//!                         Performance has improved.
//!
//! spawn_many              time:   [3.0585 ms 3.0598 ms 3.0613 ms]                        
//!                         change: [-87.576% -87.291% -86.948%] (p = 0.00 < 0.05)
//!                         Performance has improved.
//!
//! yield_now               time:   [4.1676 ms 4.1917 ms 4.2166 ms]                       
//!                         change: [-50.455% -49.994% -49.412%] (p = 0.00 < 0.05)
//!                         Performance has improved.
//
//! ping_pong               time:   [8.5389 ms 8.6990 ms 8.8525 ms]                      
//!                         change: [+12.264% +14.548% +16.917%] (p = 0.00 < 0.05)
//!                         Performance has regressed.
//!
//! Benchmarking spawn_executors_recursively:
//! spawn_executors_recursively                                                                            
//!                         time:   [180.26 ms 180.40 ms 180.56 ms]
//!                         change: [+497.14% +500.08% +502.97%] (p = 0.00 < 0.05)
//!                         Performance has regressed.
//!
//! context_switch_quiet    time:   [100.67 us 102.05 us 103.07 us]                                 
//!                         change: [-42.789% -41.170% -39.490%] (p = 0.00 < 0.05)
//!                         Performance has improved.
//!
//! context_switch_busy     time:   [8.7637 ms 8.9012 ms 9.0561 ms]                                
//!                         change: [+3.3147% +5.5719% +7.6684%] (p = 0.00 < 0.05)
//!                         Performance has regressed.
//!```

use fastcounter::FastCounter;
use futures_lite::prelude::*;
use once_cell::sync::{Lazy, OnceCell};
use std::{
    pin::Pin,
    sync::atomic::AtomicUsize,
    sync::atomic::{AtomicBool, Ordering},
    task::{Context, Poll},
    time::Duration,
};
mod executor;
mod fastcounter;
mod nursery;
mod sp2c;
pub use executor::*;
pub use nursery::*;

//const CHANGE_THRESH: u32 = 10;
const MONITOR_MS: u64 = 5;

const MAX_THREADS: usize = 1500;

// thread_local! {
//     static LEXEC: Rc<async_executor::LocalExecutor<'static>> = Rc::new(async_executor::LocalExecutor::new())
// }
static EXEC: Lazy<Executor> = Lazy::new(Executor::new);

static FUTURES_BEING_POLLED: Lazy<FastCounter> = Lazy::new(Default::default);
static POLL_COUNT: Lazy<FastCounter> = Lazy::new(Default::default);

static THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);

static MONITOR: OnceCell<std::thread::JoinHandle<()>> = OnceCell::new();

static SINGLE_THREAD: AtomicBool = AtomicBool::new(false);

/// Irrevocably puts smolscale into single-threaded mode.
pub fn permanently_single_threaded() {
    SINGLE_THREAD.store(true, Ordering::Relaxed);
}

fn start_monitor() {
    MONITOR.get_or_init(|| {
        std::thread::Builder::new()
            .name("sscale-mon".into())
            .spawn(monitor_loop)
            .unwrap()
    });
}

fn monitor_loop() {
    fn start_thread(exitable: bool, process_io: bool) {
        THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
        std::thread::Builder::new()
            .name(
                if exitable {
                    "sscale-wkr-e"
                } else {
                    "sscale-wkr-c"
                }
                .into(),
            )
            .stack_size(128 * 1024)
            .spawn(move || {
                // let local_exec = LEXEC.with(|v| Rc::clone(v));
                let future = async {
                    scopeguard::defer!({
                        THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
                    });
                    // let run_local = local_exec.run(futures_lite::future::pending::<()>());
                    if exitable {
                        EXEC.worker()
                            .run()
                            .or(async {
                                async_io::Timer::after(Duration::from_secs(3)).await;
                            })
                            .await;
                    } else {
                        EXEC.worker().run().await;
                    };
                };
                if process_io {
                    async_io::block_on(future)
                } else {
                    futures_lite::future::block_on(future)
                }
            })
            .unwrap();
    }
    if SINGLE_THREAD.load(Ordering::Relaxed) {
        start_thread(false, true);
        return;
    } else {
        for _ in 0..num_cpus::get() {
            start_thread(false, true);
        }
    }

    // "Token bucket"
    let mut token_bucket = 1000;
    for count in 0u64.. {
        if count % 100 == 0 && token_bucket < 1000 {
            token_bucket += 1
        }
        EXEC.rebalance();
        if SINGLE_THREAD.load(Ordering::Relaxed) {
            return;
        }
        let before_sleep = POLL_COUNT.count();
        std::thread::sleep(Duration::from_millis(MONITOR_MS));
        let after_sleep = POLL_COUNT.count();
        let running_threads = THREAD_COUNT.load(Ordering::Relaxed);
        let full_running = FUTURES_BEING_POLLED.count() >= running_threads;
        if after_sleep == before_sleep
            && running_threads <= MAX_THREADS
            && full_running
            && token_bucket > 0
        {
            start_thread(true, false);
            token_bucket -= 1;
        }
    }
    unreachable!()
}

/// Spawns a future onto the global executor and immediately blocks on it.
pub fn block_on<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> T {
    futures_lite::future::block_on(spawn(future))
}

/// Spawns a task onto the lazily-initialized global executor.
///
/// The task can block or run CPU-intensive code if needed --- it will not block other tasks.
pub fn spawn<T: Send + 'static>(
    future: impl Future<Output = T> + Send + 'static,
) -> async_executor::Task<T> {
    start_monitor();
    EXEC.spawn(WrappedFuture::new(future))
    // async_global_executor::spawn(future)
}

// /// Spawns a task onto the lazily-initialized thread-local executor.
// ///
// /// The task should **NOT** block or run CPU-intensive code
// pub fn spawn<T: 'static>(
//     future: impl Future<Output = T> + 'static,
// ) -> async_executor::Task<T> {
//     start_monitor();
//     LEXEC.with(|v| v.spawn(future))
//     // async_global_executor::spawn(future)
// }

struct WrappedFuture<T, F: Future<Output = T>> {
    fut: F,
}

static ACTIVE_TASKS: Lazy<FastCounter> = Lazy::new(Default::default);

/// Returns the current number of active tasks.
pub fn active_task_count() -> usize {
    ACTIVE_TASKS.count()
}

/// Returns the current number of running tasks.
pub fn running_task_count() -> usize {
    FUTURES_BEING_POLLED.count()
}

impl<T, F: Future<Output = T>> Drop for WrappedFuture<T, F> {
    fn drop(&mut self) {
        ACTIVE_TASKS.decr();
    }
}

impl<T, F: Future<Output = T>> Future for WrappedFuture<T, F> {
    type Output = T;

    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // FUTURES_BEING_POLLED.incr();
        POLL_COUNT.incr();
        // scopeguard::defer!({
        //     FUTURES_BEING_POLLED.decr();
        // });

        let fut = unsafe { self.map_unchecked_mut(|v| &mut v.fut) };
        fut.poll(cx)
    }
}

impl<T, F: Future<Output = T> + 'static> WrappedFuture<T, F> {
    pub fn new(fut: F) -> Self {
        ACTIVE_TASKS.incr();
        WrappedFuture { fut }
    }
}