use {Reactor, Handle};
use atomic_task::AtomicTask;
use futures::{Future, Async, Poll, task};
use std::io;
use std::thread;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
#[derive(Debug)]
pub struct Background {
inner: Option<Inner>,
}
#[derive(Debug)]
pub struct Shutdown {
inner: Inner,
}
#[derive(Debug)]
struct Inner {
handle: Handle,
shared: Arc<Shared>,
}
#[derive(Debug)]
struct Shared {
shutdown: AtomicUsize,
shutdown_task: AtomicTask,
}
const SHUTDOWN_IDLE: usize = 1;
const SHUTDOWN_NOW: usize = 2;
const SHUTDOWN: usize = 3;
impl Background {
pub(crate) fn new(reactor: Reactor) -> io::Result<Background> {
let handle = reactor.handle().clone();
let shared = Arc::new(Shared {
shutdown: AtomicUsize::new(0),
shutdown_task: AtomicTask::new(),
});
let shared2 = shared.clone();
thread::Builder::new()
.spawn(move || run(reactor, shared2))?;
Ok(Background {
inner: Some(Inner {
handle,
shared,
}),
})
}
pub fn handle(&self) -> &Handle {
&self.inner.as_ref().unwrap().handle
}
pub fn shutdown_on_idle(mut self) -> Shutdown {
let inner = self.inner.take().unwrap();
inner.shutdown_on_idle();
Shutdown { inner }
}
pub fn shutdown_now(mut self) -> Shutdown {
let inner = self.inner.take().unwrap();
inner.shutdown_now();
Shutdown { inner }
}
pub fn forget(mut self) {
drop(self.inner.take());
}
}
impl Drop for Background {
fn drop(&mut self) {
let inner = match self.inner.take() {
Some(i) => i,
None => return,
};
inner.shutdown_now();
let shutdown = Shutdown { inner };
let _ = shutdown.wait();
}
}
impl Future for Shutdown {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
let task = task::current();
self.inner.shared.shutdown_task.register_task(task);
if !self.inner.is_shutdown() {
return Ok(Async::NotReady);
}
Ok(().into())
}
}
impl Inner {
fn is_shutdown(&self) -> bool {
self.shared.shutdown.load(SeqCst) == SHUTDOWN
}
fn shutdown_on_idle(&self) {
self.shared.shutdown
.compare_and_swap(0, SHUTDOWN_IDLE, SeqCst);
self.handle.wakeup();
}
fn shutdown_now(&self) {
let mut curr = self.shared.shutdown.load(SeqCst);
loop {
if curr >= SHUTDOWN_NOW {
return;
}
let act = self.shared.shutdown
.compare_and_swap(curr, SHUTDOWN_NOW, SeqCst);
if act == curr {
self.handle.wakeup();
return;
}
curr = act;
}
}
}
fn run(mut reactor: Reactor, shared: Arc<Shared>) {
debug!("starting background reactor");
loop {
let shutdown = shared.shutdown.load(SeqCst);
if shutdown == SHUTDOWN_NOW {
debug!("shutting background reactor down NOW");
break;
}
if shutdown == SHUTDOWN_IDLE && reactor.is_idle() {
debug!("shutting background reactor on idle");
break;
}
reactor.turn(None).unwrap();
}
drop(reactor);
shared.shutdown.store(SHUTDOWN, SeqCst);
shared.shutdown_task.notify();
debug!("background reactor has shutdown");
}