use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
/// Interface for blocking the calling thread and handing out a handle that
/// can wake it again.
///
/// NOTE(review): the general contract is not written down in this file; the
/// per-item notes below describe what the `ParkThread` implementation does.
pub trait Park {
/// Handle type returned by `unpark`; it must implement the `Unpark` trait.
type Unpark: Unpark;
/// Error type reported by `park` and `park_timeout`.
type Error;
/// Returns an `Unpark` handle associated with this `Park` instance.
fn unpark(&self) -> Self::Unpark;
/// Blocks the caller. In `ParkThread` this returns once a notification has
/// been consumed or the underlying condition-variable wait returns.
fn park(&mut self) -> Result<(), Self::Error>;
/// Same as `park`, but `duration` is used as an upper bound on the wait
/// (in `ParkThread` it is passed to `Condvar::wait_timeout`).
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;
}
/// Handle used to wake a parked thread.
///
/// The supertrait bounds (`Sync + Send + 'static`) require every implementor
/// to be shareable across threads and free of borrowed data.
pub trait Unpark: Sync + Send + 'static {
/// Delivers a wake-up. For `UnparkThread` this forwards to `Inner::unpark`.
fn unpark(&self);
}
impl Unpark for Box<Unpark> {
fn unpark(&self) {
(**self).unpark()
}
}
/// `Park` implementation that operates on the calling thread's entry in the
/// `CURRENT_PARK_THREAD` thread-local.
#[derive(Debug)]
pub struct ParkThread {
// `Rc` is neither `Send` nor `Sync`, so this zero-sized marker makes
// `ParkThread` itself `!Send` and `!Sync`.
_anchor: PhantomData<Rc<()>>,
}
/// Error type named by `ParkThread`'s `Park` implementation.
///
/// None of the code visible in this file constructs a value of this type.
#[derive(Debug)]
pub struct ParkError {
// Private unit field: the struct cannot be built outside this module.
_p: (),
}
/// `Unpark` handle produced by `ParkThread::unpark`.
///
/// Cloning the handle clones the `Arc`, so all clones refer to the same
/// `Inner` state.
#[derive(Clone, Debug)]
pub struct UnparkThread {
// Shared park state of the thread that created the handle.
inner: Arc<Inner>,
}
/// Shared state behind `ParkThread` and `UnparkThread`.
#[derive(Debug)]
struct Inner {
// One of `IDLE`, `NOTIFY` or `SLEEP`.
state: AtomicUsize,
// Held by `park` around its transition to `SLEEP` and the wait, and by
// `unpark` before it signals `condvar`. It guards no data of its own.
mutex: Mutex<()>,
// Waited on by `park` and signalled by `unpark`.
condvar: Condvar,
}
/// Initial state; also the state `park` leaves behind when it returns.
const IDLE: usize = 0;
/// Set by `unpark`; a later `park` consumes it and returns without waiting.
const NOTIFY: usize = 1;
/// Set by `park` (while holding the mutex) just before it waits on the condvar.
const SLEEP: usize = 2;
// Per-thread park state. Each thread gets its own `Inner`, created in the
// `IDLE` state; it is wrapped in an `Arc` so that `UnparkThread` handles can
// hold a clone of it.
thread_local! {
static CURRENT_PARK_THREAD: Arc<Inner> = Arc::new(Inner {
state: AtomicUsize::new(IDLE),
mutex: Mutex::new(()),
condvar: Condvar::new(),
});
}
impl ParkThread {
    /// Creates a new `ParkThread` handle.
    ///
    /// The handle carries no state of its own; every operation goes through
    /// the calling thread's `CURRENT_PARK_THREAD` entry.
    pub fn new() -> ParkThread {
        Self {
            _anchor: PhantomData,
        }
    }

    /// Runs `f` with a reference to the calling thread's shared `Inner` and
    /// returns whatever `f` returns.
    fn with_current<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&Arc<Inner>) -> R,
    {
        // `LocalKey::with` accepts the callback directly.
        CURRENT_PARK_THREAD.with(f)
    }
}
impl Park for ParkThread {
type Unpark = UnparkThread;
type Error = ParkError;
fn unpark(&self) -> Self::Unpark {
let inner = self.with_current(|inner| inner.clone());
UnparkThread { inner }
}
fn park(&mut self) -> Result<(), Self::Error> {
self.with_current(|inner| inner.park(None))
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.with_current(|inner| inner.park(Some(duration)))
}
}
impl Unpark for UnparkThread {
fn unpark(&self) {
self.inner.unpark();
}
}
impl Inner {
    /// Blocks the calling thread until it is notified, or until `timeout`
    /// elapses when one is given.
    ///
    /// A notification left behind by an earlier `unpark` is consumed and the
    /// call returns without waiting. The condition variable is waited on
    /// exactly once and the state is not re-checked afterwards, so the call
    /// also returns on a timeout or a spurious wakeup. Every return path
    /// resets `state` to `IDLE` first.
    ///
    /// Always returns `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned or if `state` holds a value this
    /// protocol does not expect at that point (e.g. `SLEEP` on entry).
    fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
        // Fast path: consume a pending notification without taking the mutex.
        match self
            .state
            .compare_exchange(NOTIFY, IDLE, Ordering::SeqCst, Ordering::SeqCst)
        {
            Ok(_) => return Ok(()),
            Err(IDLE) => {}
            Err(_) => unreachable!(),
        }

        // Slow path: the mutex serializes the IDLE -> SLEEP transition and the
        // wait against the locked section of `unpark`.
        let mut m = self.mutex.lock().unwrap();

        match self
            .state
            .compare_exchange(IDLE, SLEEP, Ordering::SeqCst, Ordering::SeqCst)
        {
            Ok(_) => {}
            Err(NOTIFY) => {
                // Notified between the fast-path check and taking the lock:
                // consume the notification instead of sleeping.
                self.state.store(IDLE, Ordering::SeqCst);
                return Ok(());
            }
            Err(_) => unreachable!(),
        }

        m = match timeout {
            Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0,
            None => self.condvar.wait(m).unwrap(),
        };

        // Back to idle while still holding the lock; this also consumes a
        // `NOTIFY` written by the waker.
        self.state.store(IDLE, Ordering::SeqCst);

        // Explicit drop to make the unlock point visible.
        drop(m);

        Ok(())
    }

    /// Records a notification and, if the owning thread is sleeping in
    /// `park`, wakes it.
    ///
    /// When the state is `IDLE` it becomes `NOTIFY` without taking the mutex;
    /// when it is already `NOTIFY` nothing changes. Only the `SLEEP` case
    /// takes the mutex and signals the condition variable.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned or if `state` holds an unknown value.
    fn unpark(&self) {
        // Lock-free attempt: IDLE -> NOTIFY.
        match self
            .state
            .compare_exchange(IDLE, NOTIFY, Ordering::SeqCst, Ordering::SeqCst)
        {
            Ok(_) | Err(NOTIFY) => return,
            Err(SLEEP) => {}
            Err(_) => unreachable!(),
        }

        // The parker set `SLEEP` while holding the mutex; acquiring it here
        // means the parker is either waiting on the condvar or has finished.
        let _m = self.mutex.lock().unwrap();

        match self.state.swap(NOTIFY, Ordering::SeqCst) {
            SLEEP => {}
            // The parker already left the wait (IDLE) or another caller got
            // here first (NOTIFY); the stored NOTIFY is all that is needed.
            NOTIFY | IDLE => return,
            _ => unreachable!(),
        }

        self.condvar.notify_one();
    }
}