use std::{future::Future, pin::Pin, sync::Arc};
use futures_util::{
stream::{FuturesUnordered, Stream},
task::{waker_ref, ArcWake, Context, LocalFutureObj, Poll as FutPoll},
};
use crate::{
sources::{
channel::{channel, Channel, ChannelError, Event, Sender},
ping::{make_ping, Ping, PingError, PingSource},
EventSource,
},
Poll, PostAction, Readiness, Token, TokenFactory,
};
/// An event source that polls a set of futures and reports each completed
/// future's output (of type `T`) through the event callback.
///
/// Instances are created, together with the matching `Scheduler`, by `executor()`.
#[derive(Debug)]
pub struct Executor<T> {
/// Futures currently owned by the executor; polled in `process_events`.
futures: FuturesUnordered<LocalFutureObj<'static, T>>,
/// Receiving end of the channel over which `Scheduler::schedule` sends new futures.
new_futures: Channel<LocalFutureObj<'static, T>>,
/// Ping source paired (via `make_ping`) with the `Ping` held by `waker`.
ready_futures: PingSource,
/// Waker handed to the futures when they are polled; waking it pings `ready_futures`.
waker: Arc<ExecWaker>,
}
/// Handle used to submit futures to an [`Executor`].
///
/// It wraps the sending half of the channel whose receiving half the executor
/// owns; `schedule` boxes a future and sends it across.
///
/// `Clone` is implemented by hand instead of being derived: `#[derive(Clone)]`
/// would add an implicit `T: Clone` bound, making schedulers for non-`Clone`
/// output types impossible to clone even though only the channel sender is
/// duplicated and no `T` value is ever copied.
#[derive(Debug)]
pub struct Scheduler<T> {
    /// Sending half of the channel feeding the executor's `new_futures` source.
    sender: Sender<LocalFutureObj<'static, T>>,
}

impl<T> Clone for Scheduler<T> {
    /// Creates another scheduler that sends into the same channel.
    fn clone(&self) -> Self {
        Scheduler {
            sender: self.sender.clone(),
        }
    }
}
impl<T> Scheduler<T> {
    /// Boxes `future` and sends it over the scheduler's channel.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutorDestroyed`] if sending over the channel fails; the
    /// underlying send error (and the future it carries) is discarded.
    pub fn schedule<Fut: 'static>(&self, future: Fut) -> Result<(), ExecutorDestroyed>
    where
        Fut: Future<Output = T>,
    {
        // Type-erase the future so that any `Fut` fits through the one channel.
        let boxed: Box<Fut> = Box::new(future);
        match self.sender.send(LocalFutureObj::new(boxed)) {
            Ok(()) => Ok(()),
            Err(_) => Err(ExecutorDestroyed),
        }
    }
}
/// Error returned by `Scheduler::schedule` when sending the future over the
/// scheduler's channel fails.
#[derive(thiserror::Error, Debug)]
#[error("the executor was destroyed")]
pub struct ExecutorDestroyed;
/// Waker implementation used when the executor polls its futures.
#[derive(Debug)]
struct ExecWaker {
/// Pinged on every wake-up; `executor()` obtains it from `make_ping` together
/// with the executor's `ready_futures` source.
ping: Ping,
}
impl ArcWake for ExecWaker {
    /// Signals a wake-up by pinging the `Ping` stored in the waker.
    fn wake_by_ref(arc_self: &Arc<ExecWaker>) {
        let signal = &arc_self.ping;
        signal.ping();
    }
}
/// Creates a new [`Executor`] together with the [`Scheduler`] that feeds it.
///
/// The executor starts with no futures; the two halves are linked by a
/// freshly created channel.
///
/// # Errors
///
/// Propagates the error from `make_ping` if the ping pair cannot be created.
pub fn executor<T>() -> crate::Result<(Executor<T>, Scheduler<T>)> {
    // The ping pair is created first so that a failure leaves nothing else behind.
    let (ping, ready_futures) = make_ping()?;
    let (sender, new_futures) = channel();

    let exec = Executor {
        futures: FuturesUnordered::new(),
        new_futures,
        ready_futures,
        // The waker keeps the sending half of the ping pair.
        waker: Arc::new(ExecWaker { ping }),
    };
    let sched = Scheduler { sender };

    Ok((exec, sched))
}
impl<T> EventSource for Executor<T> {
type Event = T;
type Metadata = ();
type Ret = ();
type Error = ExecutorError;
fn process_events<F>(
&mut self,
readiness: Readiness,
token: Token,
mut callback: F,
) -> Result<PostAction, Self::Error>
where
F: FnMut(T, &mut ()),
{
let futures = &mut self.futures;
self.new_futures
.process_events(readiness, token, |evt, _| {
if let Event::Msg(fut) = evt {
futures.push(fut);
}
})
.map_err(ExecutorError::NewFutureError)?;
self.ready_futures
.process_events(readiness, token, |(), _| {})
.map_err(ExecutorError::WakeError)?;
let waker = waker_ref(&self.waker);
let mut cx = Context::from_waker(&waker);
while let FutPoll::Ready(Some(ret)) = Pin::new(&mut self.futures).poll_next(&mut cx) {
callback(ret, &mut ());
}
Ok(PostAction::Continue)
}
fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
self.new_futures.register(poll, token_factory)?;
self.ready_futures.register(poll, token_factory)?;
Ok(())
}
fn reregister(
&mut self,
poll: &mut Poll,
token_factory: &mut TokenFactory,
) -> crate::Result<()> {
self.new_futures.reregister(poll, token_factory)?;
self.ready_futures.reregister(poll, token_factory)?;
Ok(())
}
fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
self.new_futures.unregister(poll)?;
self.ready_futures.unregister(poll)?;
Ok(())
}
}
/// Errors that `Executor::process_events` can return.
#[derive(thiserror::Error, Debug)]
pub enum ExecutorError {
/// The channel delivering newly scheduled futures reported an error.
#[error("error adding new futures")]
NewFutureError(ChannelError),
/// The ping source used for future wake-ups reported an error.
#[error("error processing wake events")]
WakeError(PingError),
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A future that is immediately ready delivers its output on the first
    /// dispatch after it has been scheduled, and not before.
    #[test]
    fn ready() {
        let mut event_loop = crate::EventLoop::<u32>::try_new().unwrap();
        let handle = event_loop.handle();

        let (exec, sched) = executor::<u32>().unwrap();
        handle
            .insert_source(exec, move |output, &mut (), slot| {
                *slot = output;
            })
            .unwrap();

        // Zero timeout: every dispatch returns right away.
        let no_wait = Some(::std::time::Duration::ZERO);
        let mut got = 0;
        let fut = async { 42 };

        // Nothing has been scheduled yet, so the shared value stays untouched.
        event_loop.dispatch(no_wait, &mut got).unwrap();
        assert_eq!(got, 0);

        // Once scheduled, the future completes during the next dispatch.
        sched.schedule(fut).unwrap();
        event_loop.dispatch(no_wait, &mut got).unwrap();
        assert_eq!(got, 42);
    }
}