#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
#![doc(
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]
use std::cell::RefCell;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock, TryLockError};
use std::task::{Poll, Waker};
use async_lock::OnceCell;
use async_task::Runnable;
use concurrent_queue::ConcurrentQueue;
use futures_lite::{future, prelude::*};
use slab::Slab;
#[doc(no_inline)]
pub use async_task::Task;
/// An async executor that can be shared between threads.
///
/// Futures passed to [`Executor::spawn`] must be `Send`. The shared state is allocated lazily on
/// first use, which is why [`Executor::new`] can be a `const fn`.
pub struct Executor<'a> {
/// Lazily initialized state shared with spawned tasks, tickers and runners.
state: OnceCell<Arc<State>>,
/// Zero-sized marker mentioning `'a`; wrapping the reference in `UnsafeCell` makes `'a` invariant.
_marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
}
// NOTE(review): `_marker` stores no data, but `PhantomData<UnsafeCell<_>>` is `!Sync`, so the
// thread-safety impls have to be written by hand. Their soundness presumably rests on `State`
// being thread-safe and on `spawn` only accepting `Send` futures -- confirm before relaxing
// either of those.
unsafe impl Send for Executor<'_> {}
unsafe impl Sync for Executor<'_> {}
// The `UnsafeCell` in `_marker` would otherwise opt the type out of the unwind-safety traits.
impl UnwindSafe for Executor<'_> {}
impl RefUnwindSafe for Executor<'_> {}
impl fmt::Debug for Executor<'_> {
/// Formats the executor through the shared `debug_executor` helper under the name `Executor`.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
debug_executor(self, "Executor", f)
}
}
impl<'a> Executor<'a> {
pub const fn new() -> Executor<'a> {
Executor {
state: OnceCell::new(),
_marker: PhantomData,
}
}
pub fn is_empty(&self) -> bool {
self.state().active.lock().unwrap().is_empty()
}
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
let mut active = self.state().active.lock().unwrap();
let index = active.vacant_entry().key();
let state = self.state().clone();
let future = async move {
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
future.await
};
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) };
active.insert(runnable.waker());
runnable.schedule();
task
}
pub fn try_tick(&self) -> bool {
match self.state().queue.pop() {
Err(_) => false,
Ok(runnable) => {
self.state().notify();
runnable.run();
true
}
}
}
pub async fn tick(&self) {
let state = self.state();
let runnable = Ticker::new(state).runnable().await;
runnable.run();
}
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
let runner = Runner::new(self.state());
let mut rng = fastrand::Rng::new();
LocalQueue::set(self.state(), &runner.local, {
let runner = &runner;
async move {
let run_forever = async {
loop {
for _ in 0..200 {
let runnable = runner.runnable(&mut rng).await;
runnable.run();
}
future::yield_now().await;
}
};
future.or(run_forever).await
}
})
.await
}
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.state().clone();
move |runnable| {
let mut runnable = Some(runnable);
LocalQueue::with(|local_queue| {
if local_queue.state != &*state as *const State as usize {
return;
}
if let Err(e) = local_queue.queue.push(runnable.take().unwrap()) {
runnable = Some(e.into_inner());
return;
}
local_queue.waker.wake_by_ref();
});
if let Some(runnable) = runnable {
state.queue.push(runnable).unwrap();
state.notify();
}
}
}
fn state(&self) -> &Arc<State> {
self.state.get_or_init_blocking(|| Arc::new(State::new()))
}
}
impl Drop for Executor<'_> {
    /// Wakes every active task and then discards whatever is left in the global queue.
    ///
    /// Does nothing if the shared state was never created.
    fn drop(&mut self) {
        let state = match self.state.get() {
            Some(state) => state,
            None => return,
        };

        {
            // The lock is released at the end of this scope, before the queue is drained.
            let mut active = state.active.lock().unwrap();
            active.drain().for_each(|waker| waker.wake());
        }

        // Pop and drop every runnable that is still queued.
        loop {
            if state.queue.pop().is_err() {
                break;
            }
        }
    }
}
impl<'a> Default for Executor<'a> {
fn default() -> Executor<'a> {
Executor::new()
}
}
/// An executor that must stay on the thread it was created on.
///
/// It wraps an [`Executor`]; unlike that type, futures passed to [`LocalExecutor::spawn`] do
/// not have to be `Send`.
pub struct LocalExecutor<'a> {
/// The wrapped executor that queues and runs the tasks.
inner: Executor<'a>,
/// `Rc` is neither `Send` nor `Sync`, so this marker makes `LocalExecutor` `!Send` and `!Sync`.
_marker: PhantomData<Rc<()>>,
}
// The `Rc` marker would otherwise opt the type out of the unwind-safety traits.
impl UnwindSafe for LocalExecutor<'_> {}
impl RefUnwindSafe for LocalExecutor<'_> {}
impl fmt::Debug for LocalExecutor<'_> {
/// Formats the inner executor through `debug_executor` under the name `LocalExecutor`.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
debug_executor(&self.inner, "LocalExecutor", f)
}
}
impl<'a> LocalExecutor<'a> {
pub const fn new() -> LocalExecutor<'a> {
LocalExecutor {
inner: Executor::new(),
_marker: PhantomData,
}
}
pub fn is_empty(&self) -> bool {
self.inner().is_empty()
}
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
let mut active = self.inner().state().active.lock().unwrap();
let index = active.vacant_entry().key();
let state = self.inner().state().clone();
let future = async move {
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
future.await
};
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) };
active.insert(runnable.waker());
runnable.schedule();
task
}
pub fn try_tick(&self) -> bool {
self.inner().try_tick()
}
pub async fn tick(&self) {
self.inner().tick().await
}
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
self.inner().run(future).await
}
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.inner().state().clone();
move |runnable| {
state.queue.push(runnable).unwrap();
state.notify();
}
}
fn inner(&self) -> &Executor<'a> {
&self.inner
}
}
impl<'a> Default for LocalExecutor<'a> {
fn default() -> LocalExecutor<'a> {
LocalExecutor::new()
}
}
/// State shared between an executor and its tasks, tickers and runners.
struct State {
    /// The global queue of runnable tasks.
    queue: ConcurrentQueue<Runnable>,
    /// Local queues of the runners that are currently alive.
    local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
    /// `true` while no ticker is asleep or while a sleeping ticker has already been notified;
    /// `notify` only does work when this is `false`.
    notified: AtomicBool,
    /// Bookkeeping for sleeping tickers.
    sleepers: Mutex<Sleepers>,
    /// Wakers of the tasks that are currently active, keyed by slab index.
    active: Mutex<Slab<Waker>>,
}

impl State {
    /// Creates the state of a fresh executor: empty queues, no sleepers, no active tasks.
    fn new() -> State {
        let sleepers = Sleepers {
            count: 0,
            wakers: Vec::new(),
            free_ids: Vec::new(),
        };

        State {
            queue: ConcurrentQueue::unbounded(),
            local_queues: RwLock::new(Vec::new()),
            // With no sleepers there is nobody to notify, so start out as "notified".
            notified: AtomicBool::new(true),
            sleepers: Mutex::new(sleepers),
            active: Mutex::new(Slab::new()),
        }
    }

    /// Wakes one sleeping ticker unless a notification is already outstanding.
    #[inline]
    fn notify(&self) {
        let claimed = self
            .notified
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok();
        if !claimed {
            return;
        }

        // Keep this as its own statement: the lock guard is a temporary that is released at
        // the end of it, so the waker below is invoked without holding the sleepers lock.
        let waker = self.sleepers.lock().unwrap().notify();
        if let Some(w) = waker {
            w.wake();
        }
    }
}
/// Bookkeeping for tickers that are currently asleep.
struct Sleepers {
    /// Number of sleeping tickers, whether or not their waker is still stored in `wakers`.
    count: usize,
    /// `(id, waker)` pairs of sleeping tickers whose waker has not been taken by `notify`.
    wakers: Vec<(usize, Waker)>,
    /// Ids released by `remove`; `insert` hands them out again before minting new ones.
    free_ids: Vec<usize>,
}

impl Sleepers {
    /// Registers a new sleeping ticker and returns its id.
    ///
    /// Ids start at 1, so 0 never identifies a sleeper.
    fn insert(&mut self, waker: &Waker) -> usize {
        let fresh = self.count + 1;
        let id = self.free_ids.pop().unwrap_or(fresh);
        self.count += 1;
        self.wakers.push((id, waker.clone()));
        id
    }

    /// Refreshes the waker of the sleeping ticker `id`.
    ///
    /// Returns `false` if the ticker's waker was still stored (it is replaced when it would
    /// not wake the same task). Returns `true` if the waker had been taken out in the
    /// meantime and was therefore stored again.
    fn update(&mut self, id: usize, waker: &Waker) -> bool {
        if let Some(pos) = self.wakers.iter().position(|entry| entry.0 == id) {
            let stored = &mut self.wakers[pos].1;
            if !stored.will_wake(waker) {
                *stored = waker.clone();
            }
            return false;
        }

        self.wakers.push((id, waker.clone()));
        true
    }

    /// Unregisters the sleeping ticker `id` and makes the id available for reuse.
    ///
    /// Returns `true` if the ticker's waker was no longer stored, i.e. `notify` had already
    /// taken it, and `false` if the waker was still stored and has now been discarded.
    fn remove(&mut self, id: usize) -> bool {
        self.count -= 1;
        self.free_ids.push(id);

        // Search from the back, dropping the last matching entry.
        let found = self.wakers.iter().rposition(|entry| entry.0 == id);
        match found {
            Some(pos) => {
                self.wakers.remove(pos);
                false
            }
            None => true,
        }
    }

    /// Returns `true` if nobody is asleep or if at least one sleeper's waker has been taken.
    fn is_notified(&self) -> bool {
        let stored = self.wakers.len();
        self.count == 0 || stored < self.count
    }

    /// Takes the most recently stored waker, but only if every sleeper still has its waker
    /// stored; otherwise a notification is already outstanding and `None` is returned.
    fn notify(&mut self) -> Option<Waker> {
        if self.wakers.len() != self.count {
            return None;
        }
        self.wakers.pop().map(|(_, waker)| waker)
    }
}
/// Waits for runnable tasks, registering itself as a sleeper while none can be found.
struct Ticker<'a> {
/// The executor state.
state: &'a State,
/// `0` while this ticker is not registered as a sleeper, otherwise the id handed out by
/// `Sleepers::insert`.
sleeping: AtomicUsize,
}
impl Ticker<'_> {
/// Creates a ticker that is not sleeping.
fn new(state: &State) -> Ticker<'_> {
Ticker {
state,
sleeping: AtomicUsize::new(0),
}
}
/// Registers this ticker as a sleeper, or refreshes its waker if it already is one.
///
/// Returns `false` only when the ticker was already sleeping and its waker was still stored.
/// Returns `true` when it has just been registered, or when its waker had been taken and was
/// stored again.
fn sleep(&self, waker: &Waker) -> bool {
// The sleepers lock is held for the whole call, including the `notified` update below.
let mut sleepers = self.state.sleepers.lock().unwrap();
match self.sleeping.load(Ordering::SeqCst) {
// Not sleeping yet: register and remember the id.
0 => self
.sleeping
.store(sleepers.insert(waker), Ordering::SeqCst),
// Already sleeping: `update` returns `false` if the waker was still stored.
id => {
if !sleepers.update(id, waker) {
return false;
}
}
}
// Publish the new notification status computed from the sleeper bookkeeping.
self.state
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
true
}
/// Unregisters this ticker as a sleeper if it is currently registered.
fn wake(&self) {
let id = self.sleeping.swap(0, Ordering::SeqCst);
if id != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
sleepers.remove(id);
self.state
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
}
}
/// Waits for the next runnable task on the global queue.
async fn runnable(&self) -> Runnable {
self.runnable_with(|| self.state.queue.pop().ok()).await
}
/// Waits until `search` yields a runnable task.
///
/// `search` is called on every poll, and once more right after this ticker has registered
/// itself as a sleeper.
async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
future::poll_fn(|cx| {
loop {
match search() {
None => {
// `sleep` returning `true` means the registration changed, so search again before
// suspending; `false` means we were already registered and can suspend.
if !self.sleep(cx.waker()) {
return Poll::Pending;
}
}
Some(r) => {
// Stop sleeping and notify another ticker before handing the task out.
self.wake();
self.state.notify();
return Poll::Ready(r);
}
}
}
})
.await
}
}
impl Drop for Ticker<'_> {
/// Unregisters the ticker if it is still sleeping and passes on a notification it had
/// received but not acted upon.
fn drop(&mut self) {
let id = self.sleeping.swap(0, Ordering::SeqCst);
if id != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
// `remove` returns `true` when this ticker's waker had already been taken by `notify`.
let notified = sleepers.remove(id);
self.state
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
if notified {
// Release the lock first: `State::notify` locks `sleepers` itself.
drop(sleepers);
self.state.notify();
}
}
}
}
/// A worker used by `Executor::run` to find runnable tasks.
///
/// Each runner owns a bounded local queue that is registered in `State::local_queues` for as
/// long as the runner is alive.
struct Runner<'a> {
/// The executor state.
state: &'a State,
/// Ticker used to sleep while no task can be found.
ticker: Ticker<'a>,
/// This runner's local queue; also reachable by other runners through `State::local_queues`.
local: Arc<ConcurrentQueue<Runnable>>,
/// Number of tasks handed out by `runnable` so far.
ticks: AtomicUsize,
}
impl Runner<'_> {
/// Creates a runner with a local queue of capacity 512 and registers that queue in the state.
fn new(state: &State) -> Runner<'_> {
let runner = Runner {
state,
ticker: Ticker::new(state),
local: Arc::new(ConcurrentQueue::bounded(512)),
ticks: AtomicUsize::new(0),
};
state
.local_queues
.write()
.unwrap()
.push(runner.local.clone());
runner
}
/// Waits for the next runnable task.
///
/// Tasks are searched for in this order: the local queue, the global queue, then the local
/// queues of the other runners starting at a random position.
async fn runnable(&self, rng: &mut fastrand::Rng) -> Runnable {
let runnable = self
.ticker
.runnable_with(|| {
// 1. The local queue.
if let Ok(r) = self.local.pop() {
return Some(r);
}
// 2. The global queue; on success also move a batch of its tasks into the local queue.
if let Ok(r) = self.state.queue.pop() {
steal(&self.state.queue, &self.local);
return Some(r);
}
// 3. Other runners. The list holds at least this runner's own queue (pushed in `new`,
// removed in `Drop`), so the range passed to `rng.usize` is not empty.
let local_queues = self.state.local_queues.read().unwrap();
let n = local_queues.len();
let start = rng.usize(..n);
// Chaining the list with itself and taking `n` items after `start` visits every queue
// exactly once, beginning at the random position.
let iter = local_queues
.iter()
.chain(local_queues.iter())
.skip(start)
.take(n);
// Skip our own queue.
let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
for local in iter {
steal(local, &self.local);
if let Ok(r) = self.local.pop() {
return Some(r);
}
}
None
})
.await;
// `fetch_add` returns the previous value, so this fires on ticks 0, 64, 128, ...
let ticks = self.ticks.fetch_add(1, Ordering::SeqCst);
if ticks % 64 == 0 {
// Periodically pull from the global queue even when the local queue is not empty.
steal(&self.state.queue, &self.local);
}
runnable
}
}
impl Drop for Runner<'_> {
/// Unregisters the local queue from the state and reschedules every task left in it.
fn drop(&mut self) {
// Unregister first so that other runners stop stealing from this queue.
self.state
.local_queues
.write()
.unwrap()
.retain(|local| !Arc::ptr_eq(local, &self.local));
// Hand the remaining tasks back through their schedule function.
// NOTE(review): if the current thread's `LocalQueue` still pointed at `self.local`, the
// schedule function would push the task straight back into this queue -- confirm that the
// thread-local has always been restored by the time a runner is dropped.
while let Ok(r) = self.local.pop() {
r.schedule();
}
}
}
/// The local queue of the runner that is installed on the current thread.
///
/// An instance is kept in a thread-local slot so that the schedule function built by
/// `Executor::schedule` can push woken tasks straight onto that runner's queue.
struct LocalQueue {
    /// Address of the owning executor's `State`, used purely as an identity tag.
    state: usize,
    /// The runner's local task queue.
    queue: Arc<ConcurrentQueue<Runnable>>,
    /// Waker of the task driving the runner; woken after a push so the runner sees the task.
    waker: Waker,
}

impl LocalQueue {
    /// Calls `f` with the local queue installed on the current thread.
    ///
    /// Returns `None` when no queue is installed or when the thread-local slot cannot be
    /// accessed (`try_with` failed).
    fn with<R>(f: impl FnOnce(&LocalQueue) -> R) -> Option<R> {
        std::thread_local! {
            /// The local queue installed on this thread, if any.
            static LOCAL_QUEUE: RefCell<Option<LocalQueue>> = RefCell::new(None);
        }

        // These items live inside `with` because they need the function-local `LOCAL_QUEUE`.
        impl LocalQueue {
            /// Builds the value that `set` stores in the thread-local slot.
            fn for_runner(
                state: &State,
                queue: &Arc<ConcurrentQueue<Runnable>>,
                waker: &Waker,
            ) -> LocalQueue {
                LocalQueue {
                    state: state as *const State as usize,
                    queue: queue.clone(),
                    waker: waker.clone(),
                }
            }

            /// Runs `fut` with `queue` installed as the current thread's local queue.
            ///
            /// The previous slot content is saved on the first poll and put back when the
            /// returned future completes or is dropped. On every poll the stored waker is
            /// refreshed, provided the slot still refers to `queue`.
            async fn set<F>(
                state: &State,
                queue: &Arc<ConcurrentQueue<Runnable>>,
                fut: F,
            ) -> F::Output
            where
                F: Future,
            {
                // Install our queue and remember what was there before.
                let mut old = with_waker(|waker| {
                    LOCAL_QUEUE.with(move |slot| {
                        slot.borrow_mut()
                            .replace(LocalQueue::for_runner(state, queue, waker))
                    })
                })
                .await;

                // Put the previous value back on completion or drop.
                // NOTE(review): when two `set` futures overlap on one thread and finish out
                // of order, the value restored here can belong to a runner that is already
                // gone -- worth a follow-up.
                let _guard = CallOnDrop(move || {
                    let old = old.take();
                    let _ = LOCAL_QUEUE.try_with(move |slot| {
                        *slot.borrow_mut() = old;
                    });
                });

                futures_lite::pin!(fut);

                future::poll_fn(move |cx| {
                    LOCAL_QUEUE
                        .try_with({
                            let waker = cx.waker();
                            move |slot| {
                                let mut slot = slot.borrow_mut();
                                let qaw = match slot.as_mut() {
                                    Some(qaw) => qaw,
                                    None => {
                                        // The slot can be empty here: another `set` future on
                                        // this thread may have restored its saved `None`
                                        // while we are still running. Re-install our queue
                                        // instead of panicking.
                                        *slot =
                                            Some(LocalQueue::for_runner(state, queue, waker));
                                        return;
                                    }
                                };

                                // Somebody else's queue is installed: leave it alone.
                                if !Arc::ptr_eq(&qaw.queue, queue) {
                                    return;
                                }

                                // Keep the stored waker in sync with the one polling us.
                                if !qaw.waker.will_wake(waker) {
                                    qaw.waker = waker.clone();
                                }
                            }
                        })
                        .ok();

                    fut.as_mut().poll(cx)
                })
                .await
            }
        }

        LOCAL_QUEUE
            .try_with(|local_queue| local_queue.borrow().as_ref().map(f))
            .ok()
            .flatten()
    }
}
/// Moves up to half of `src` (rounded up) into `dest`.
///
/// The amount is capped by the free space in `dest` when `dest` is bounded. Stops early if
/// `src` runs empty in the meantime.
///
/// # Panics
///
/// Panics if `dest` rejects a push.
fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
    let half = (src.len() + 1) / 2;
    if half == 0 {
        return;
    }

    // Never try to move more than fits into a bounded destination.
    let budget = match dest.capacity() {
        Some(cap) => half.min(cap - dest.len()),
        None => half,
    };

    for _ in 0..budget {
        match src.pop() {
            Ok(t) => assert!(dest.push(t).is_ok()),
            Err(_) => break,
        }
    }
}
/// Shared `Debug` implementation for `Executor` and `LocalExecutor`.
///
/// Prints `name(<uninitialized>)` while the state has not been created. Otherwise prints a
/// struct called `name` with the number of active tasks, the length of the global queue, the
/// lengths of the runners' local queues and the number of sleepers. Locks are only tried,
/// never waited for: a contended lock prints as `<locked>` and a poisoned one as `<poisoned>`.
fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    /// Placeholder printed while the executor state does not exist yet.
    struct Uninitialized;

    impl fmt::Debug for Uninitialized {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("<uninitialized>")
        }
    }

    /// Prints the number of active tasks.
    struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);

    impl fmt::Debug for ActiveTasks<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let active = match self.0.try_lock() {
                Ok(active) => active,
                Err(TryLockError::WouldBlock) => return f.write_str("<locked>"),
                Err(TryLockError::Poisoned(_)) => return f.write_str("<poisoned>"),
            };
            fmt::Debug::fmt(&active.len(), f)
        }
    }

    /// Prints the length of every registered local queue.
    struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);

    impl fmt::Debug for LocalRunners<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let queues = match self.0.try_read() {
                Ok(queues) => queues,
                Err(TryLockError::WouldBlock) => return f.write_str("<locked>"),
                Err(TryLockError::Poisoned(_)) => return f.write_str("<poisoned>"),
            };
            f.debug_list()
                .entries(queues.iter().map(|queue| queue.len()))
                .finish()
        }
    }

    /// Prints the number of sleeping tickers.
    struct SleepCount<'a>(&'a Mutex<Sleepers>);

    impl fmt::Debug for SleepCount<'_> {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let sleepers = match self.0.try_lock() {
                Ok(sleepers) => sleepers,
                Err(TryLockError::WouldBlock) => return f.write_str("<locked>"),
                Err(TryLockError::Poisoned(_)) => return f.write_str("<poisoned>"),
            };
            fmt::Debug::fmt(&sleepers.count, f)
        }
    }

    match executor.state.get() {
        None => f.debug_tuple(name).field(&Uninitialized).finish(),
        Some(state) => f
            .debug_struct(name)
            .field("active", &ActiveTasks(&state.active))
            .field("global_tasks", &state.queue.len())
            .field("local_runners", &LocalRunners(&state.local_queues))
            .field("sleepers", &SleepCount(&state.sleepers))
            .finish(),
    }
}
/// Guard that invokes the wrapped closure when it is dropped.
struct CallOnDrop<F: FnMut()>(F);

impl<F: FnMut()> Drop for CallOnDrop<F> {
    /// Calls the wrapped closure once.
    fn drop(&mut self) {
        let callback = &mut self.0;
        callback();
    }
}
/// Returns a future that calls `f` with the waker of the task polling it.
///
/// The future completes on its first poll with the value returned by `f`.
///
/// # Panics
///
/// Panics if the returned future is polled again after it has completed.
fn with_waker<F: FnOnce(&Waker) -> R, R>(f: F) -> impl Future<Output = R> {
    // `f` can only be called once, so it is moved out of this slot on the first poll.
    let mut slot = Some(f);
    future::poll_fn(move |cx| {
        let callback = slot.take().unwrap();
        let output = callback(cx.waker());
        Poll::Ready(output)
    })
}
/// Compile-time check that `Executor` and the futures returned by `Executor::run` and
/// `Executor::tick` are `Send` and `Sync`.
///
/// Not called anywhere in this file; it only has to type-check.
fn _ensure_send_and_sync() {
use futures_lite::future::pending;
// Helpers that only compile when their argument satisfies the bound.
fn is_send<T: Send>(_: T) {}
fn is_sync<T: Sync>(_: T) {}
is_send::<Executor<'_>>(Executor::new());
is_sync::<Executor<'_>>(Executor::new());
let ex = Executor::new();
is_send(ex.run(pending::<()>()));
is_sync(ex.run(pending::<()>()));
is_send(ex.tick());
is_sync(ex.tick());
// NOTE(review): empty placeholder; presumably an anchor for a negative (compile-fail) check --
// confirm.
fn _negative_test() {}
}