use super::{Handle, Reactor};
use futures::task::AtomicWaker;
use futures::{executor, Future, Poll};
use log::debug;
use std::io;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::task::Context;
use std::thread;
/// Owner of a reactor that is driven on a dedicated background thread.
///
/// Dropping this value requests an immediate shutdown and blocks until the
/// background thread reports that it has finished. Use `forget` to give up
/// ownership without triggering or waiting for that shutdown.
#[derive(Debug)]
pub struct Background {
    /// Shutdown plumbing shared with the background thread.
    ///
    /// `None` once the state has been taken by `forget` or by `drop`.
    inner: Option<Inner>,
}
/// Future that completes once the background reactor thread has recorded the
/// final `SHUTDOWN` state.
#[derive(Debug)]
pub struct Shutdown {
    /// State shared with the background thread that is being awaited.
    inner: Inner,
}
/// Handle-side view of the background reactor: the reactor handle plus the
/// shutdown state shared with the background thread.
#[derive(Debug)]
struct Inner {
    /// Reactor handle; used to wake the reactor after a shutdown request.
    handle: Handle,

    /// Shutdown flag and waker shared with the background thread.
    shared: Arc<Shared>,
}
/// State shared between the owning handle and the background thread.
#[derive(Debug)]
struct Shared {
    /// Current shutdown state: `0` while running, otherwise one of the
    /// `SHUTDOWN_IDLE`, `SHUTDOWN_NOW` or `SHUTDOWN` constants.
    shutdown: AtomicUsize,

    /// Waker of the task waiting for shutdown to complete; woken by the
    /// background thread after it stores `SHUTDOWN`.
    shutdown_task: AtomicWaker,
}
// Values stored in `Shared::shutdown`. The initial (running) state is `0`.
// The numeric ordering matters: `Inner::shutdown_now` treats any value
// `>= SHUTDOWN_NOW` as "shutdown already requested or finished".

/// The background thread stops as soon as the reactor reports itself idle.
const SHUTDOWN_IDLE: usize = 1;

/// The background thread stops the next time it checks the state.
const SHUTDOWN_NOW: usize = 2;

/// The background thread has dropped the reactor and is finished.
const SHUTDOWN: usize = 3;
impl Background {
    /// Moves `reactor` onto a newly spawned thread and returns the value that
    /// owns it.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` reported by `thread::Builder::spawn` if the
    /// background thread cannot be started.
    pub(super) fn new(reactor: Reactor) -> io::Result<Background> {
        let shared = Arc::new(Shared {
            // `0` is the "running" state; no shutdown has been requested yet.
            shutdown: AtomicUsize::new(0),
            shutdown_task: AtomicWaker::new(),
        });

        // Build the handle-side state up front, before the reactor is moved
        // into the thread closure.
        let inner = Inner {
            handle: reactor.handle().clone(),
            shared: Arc::clone(&shared),
        };

        thread::Builder::new().spawn(move || run(reactor, shared))?;

        Ok(Background { inner: Some(inner) })
    }

    /// Consumes the `Background` without requesting or waiting for shutdown.
    ///
    /// The inner state is released here, so the `Drop` implementation that
    /// runs afterwards finds nothing to shut down and returns immediately.
    pub fn forget(mut self) {
        let released = self.inner.take();
        drop(released);
    }
}
impl Drop for Background {
    /// Requests an immediate shutdown and blocks the current thread until the
    /// background thread signals completion.
    ///
    /// Does nothing when the inner state has already been taken (for example
    /// by `forget`).
    fn drop(&mut self) {
        if let Some(inner) = self.inner.take() {
            inner.shutdown_now();

            // The outcome is deliberately ignored: there is nothing useful to
            // do with it while dropping.
            let _ = executor::block_on(Shutdown { inner });
        }
    }
}
impl Future for Shutdown {
    type Output = Result<(), ()>;

    /// Resolves with `Ok(())` once the shared state reads `SHUTDOWN`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = &self.inner;

        // Register the waker *before* checking the flag so that a wake-up
        // issued between the check and returning `Pending` is not lost.
        inner.shared.shutdown_task.register(cx.waker());

        if inner.is_shutdown() {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }
}
impl Inner {
    /// Returns `true` once the background thread has stored the final
    /// `SHUTDOWN` state.
    fn is_shutdown(&self) -> bool {
        self.shared.shutdown.load(SeqCst) == SHUTDOWN
    }

    /// Requests that the background thread stop at its next state check.
    ///
    /// The state is only ever moved forward: if it is already `SHUTDOWN_NOW`
    /// or `SHUTDOWN` this is a no-op. When this call performs the transition
    /// it also wakes the reactor through the handle so the background loop
    /// gets a chance to observe the new state.
    fn shutdown_now(&self) {
        let mut curr = self.shared.shutdown.load(SeqCst);

        while curr < SHUTDOWN_NOW {
            // `compare_exchange` replaces the deprecated `compare_and_swap`;
            // using `SeqCst` for both success and failure keeps the original
            // ordering guarantees.
            match self
                .shared
                .shutdown
                .compare_exchange(curr, SHUTDOWN_NOW, SeqCst, SeqCst)
            {
                Ok(_) => {
                    self.handle.wakeup();
                    return;
                }
                // Lost a race with another update; retry against the value
                // that is actually stored.
                Err(actual) => curr = actual,
            }
        }
    }
}
fn run(mut reactor: Reactor, shared: Arc<Shared>) {
debug!("starting background reactor");
loop {
let shutdown = shared.shutdown.load(SeqCst);
if shutdown == SHUTDOWN_NOW {
debug!("shutting background reactor down NOW");
break;
}
if shutdown == SHUTDOWN_IDLE && reactor.is_idle() {
debug!("shutting background reactor on idle");
break;
}
reactor.turn(None).unwrap();
}
drop(reactor);
shared.shutdown.store(SHUTDOWN, SeqCst);
shared.shutdown_task.wake();
debug!("background reactor has shutdown");
}