// [go: up one dir, main page]
//
// tokio-executor 0.1.3
//
// Future execution primitives
// Documentation
use super::{Executor, Enter, SpawnError};

use futures::Future;

use std::cell::Cell;

#[cfg(feature = "unstable-futures")]
use futures2;

/// Handle for spawning futures onto whichever executor is the default for
/// the current execution context.
///
/// `DefaultExecutor` implements `Executor`, so code can spawn futures without
/// holding a reference to a specific executor.
///
/// The handle itself stores no executor: the default executor is tracked in a
/// thread-local variable and looked up each time the handle is used. An
/// executor (usually when it starts) installs itself, or another executor, as
/// the default by calling `tokio_executor::with_default`.
#[derive(Clone, Debug)]
pub struct DefaultExecutor {
    // Zero-sized private field; keeps the struct from being constructed
    // outside this module except through `DefaultExecutor::current()`.
    _dummy: (),
}

impl DefaultExecutor {
    /// Returns a handle to the default executor for the current context.
    ///
    /// Futures may be spawned onto the default executor using this handle.
    ///
    /// The returned handle will reference whichever executor is configured as
    /// the default **at the time `spawn` is called**. This enables
    /// `DefaultExecutor::current()` to be called before an execution context is
    /// setup, then passed **into** an execution context before it is used.
    ///
    /// This is also true for sending the handle across threads, so calling
    /// `DefaultExecutor::current()` on thread A and then sending the result to
    /// thread B will _not_ reference the default executor that was set on thread A.
    pub fn current() -> DefaultExecutor {
        DefaultExecutor {
            _dummy: (),
        }
    }

    #[inline]
    fn with_current<F: FnOnce(&mut Executor) -> R, R>(f: F) -> Option<R> {
        EXECUTOR.with(|current_executor| {
            match current_executor.replace(State::Active) {
                State::Ready(executor_ptr) => {
                    let executor = unsafe { &mut *executor_ptr };
                    let result = f(executor);
                    current_executor.set(State::Ready(executor_ptr));
                    Some(result)
                },
                State::Empty | State::Active => None,
            }
        })
    }
}

/// Contents of the thread-local default-executor slot.
#[derive(Copy, Clone)]
enum State {
    /// No default executor is defined.
    Empty,
    /// A default executor is defined and ready to be used.
    Ready(*mut Executor),
    /// The default executor is currently in use; this variant is what lets
    /// recursive calls be detected.
    Active,
}

/// Thread-local tracking the current executor
thread_local!(static EXECUTOR: Cell<State> = Cell::new(State::Empty));

// ===== impl DefaultExecutor =====

impl super::Executor for DefaultExecutor {
    /// Forwards `future` to the current default executor.
    ///
    /// Yields `SpawnError::shutdown()` when `with_current` reports that no
    /// executor is available.
    fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
        -> Result<(), SpawnError>
    {
        let outcome = DefaultExecutor::with_current(|current| current.spawn(future));

        match outcome {
            Some(spawned) => spawned,
            None => Err(SpawnError::shutdown()),
        }
    }

    /// Futures 0.2 counterpart of `spawn`; forwards to the current default
    /// executor's `spawn2`.
    ///
    /// Yields the futures 0.2 `SpawnError::shutdown()` when `with_current`
    /// reports that no executor is available.
    #[cfg(feature = "unstable-futures")]
    fn spawn2(&mut self, future: Box<futures2::Future<Item = (), Error = futures2::Never> + Send>)
             -> Result<(), futures2::executor::SpawnError>
    {
        let outcome = DefaultExecutor::with_current(|current| current.spawn2(future));

        match outcome {
            Some(spawned) => spawned,
            None => Err(futures2::executor::SpawnError::shutdown()),
        }
    }

    /// Reports the status of the current default executor.
    ///
    /// Yields `SpawnError::shutdown()` when `with_current` reports that no
    /// executor is available.
    fn status(&self) -> Result<(), SpawnError> {
        let outcome = DefaultExecutor::with_current(|current| current.status());

        match outcome {
            Some(status) => status,
            None => Err(SpawnError::shutdown()),
        }
    }
}

// ===== global spawn fns =====

/// Submits a future for execution on the default executor -- usually a
/// threadpool.
///
/// Futures are lazy constructs. When they are defined, no work happens. In
/// order for the logic defined by the future to be run, the future must be
/// spawned on an executor. This function is the easiest way to do so.
///
/// This function must be called from an execution context, i.e. from a future
/// that has been already spawned onto an executor.
///
/// Once spawned, the future will execute. The details of how that happens is
/// left up to the executor instance. If the executor is a thread pool, the
/// future will be pushed onto a queue that a worker thread polls from. If the
/// executor is a "current thread" executor, the future might be polled
/// immediately from within the call to `spawn` or it might be pushed onto an
/// internal queue.
///
/// # Panics
///
/// This function will panic if the default executor is not set or if spawning
/// onto the default executor returns an error. To avoid the panic, use the
/// `DefaultExecutor` handle directly.
///
/// # Examples
///
/// ```rust
/// # extern crate futures;
/// # extern crate tokio_executor;
/// # use tokio_executor::spawn;
/// # pub fn dox() {
/// use futures::future::lazy;
///
/// spawn(lazy(|| {
///     println!("running on the default executor");
///     Ok(())
/// }));
/// # }
/// # pub fn main() {}
/// ```
pub fn spawn<T>(future: T)
    where T: Future<Item = (), Error = ()> + Send + 'static,
{
    DefaultExecutor::current().spawn(Box::new(future))
        .unwrap()
}

/// Futures 0.2 counterpart of `spawn`.
///
/// # Panics
///
/// Panics if spawning onto the default executor returns an error.
#[cfg(feature = "unstable-futures")]
pub fn spawn2<T>(future: T)
    where T: futures2::Future<Item = (), Error = futures2::Never> + Send + 'static,
{
    let mut handle = DefaultExecutor::current();
    let task: Box<futures2::Future<Item = (), Error = futures2::Never> + Send> =
        Box::new(future);

    // A spawn failure is turned into a panic.
    handle.spawn2(task).unwrap()
}

/// Installs `executor` as this thread's default executor while `f` runs.
///
/// The closure receives `enter` and its return value is passed back to the
/// caller. The thread-local slot is cleared again when the closure finishes,
/// whether it returns normally or panics.
///
/// # Panics
///
/// This function panics if there already is a default executor set.
pub fn with_default<T, F, R>(executor: &mut T, enter: &mut Enter, f: F) -> R
where T: Executor,
      F: FnOnce(&mut Enter) -> R
{
    // Empties the thread-local slot when dropped, so the executor is removed
    // from the context on every way out of the scope, unwinding included.
    struct ClearOnDrop<'a>(&'a Cell<State>);

    impl<'a> Drop for ClearOnDrop<'a> {
        fn drop(&mut self) {
            self.0.set(State::Empty);
        }
    }

    EXECUTOR.with(|slot| {
        // Refuse to nest: only an empty slot may be claimed.
        match slot.get() {
            State::Empty => {}
            State::Ready(_) | State::Active =>
                panic!("default executor already set for execution context"),
        }

        // Armed only after the check above, so a rejected call leaves the
        // existing default untouched.
        let _clear = ClearOnDrop(slot);

        // The caller's `&mut` borrow of the executor outlives this function
        // call, and the guard above clears the slot before we return. That
        // is what makes it acceptable to erase the borrow's lifetime to the
        // `'static` that the thread-local cell requires.
        let erased = unsafe { hide_lt(executor as &mut _ as *mut _) };

        slot.set(State::Ready(erased));

        f(enter)
    })
}

/// Erases the lifetime bound on an executor trait-object pointer, producing
/// a pointer typed as `'static`.
///
/// # Safety
///
/// Only the type changes; the pointee lives no longer than before. The caller
/// must make sure the returned pointer is never dereferenced after `'a` ends.
unsafe fn hide_lt<'a>(p: *mut (Executor + 'a)) -> *mut (Executor + 'static) {
    ::std::mem::transmute::<*mut (Executor + 'a), *mut (Executor + 'static)>(p)
}

#[cfg(test)]
mod tests {
    use super::{Executor, DefaultExecutor, with_default};

    /// Compile-time check that the handle can be shared and sent across
    /// threads.
    #[test]
    fn default_executor_is_send_and_sync() {
        fn require_send_sync<T>()
        where
            T: Send + Sync,
        {
        }

        require_send_sync::<DefaultExecutor>();
    }

    /// With a `DefaultExecutor` installed as the default, asking for status
    /// re-enters the thread-local lookup while the slot is already in use,
    /// which must surface as a shutdown error.
    #[test]
    fn nested_default_executor_status() {
        let mut guard = super::super::enter().unwrap();
        let mut handle = DefaultExecutor::current();

        let status = with_default(&mut handle, &mut guard, |_| {
            DefaultExecutor::current().status()
        });

        let error = status.err().unwrap();
        assert!(error.is_shutdown())
    }
}