use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::reactor::Reactor;
/// Creates a [`Parker`] together with an [`Unparker`] that is bound to it.
///
/// This is shorthand for calling [`Parker::new`] and then [`Parker::unparker`]
/// on the result.
pub fn pair() -> (Parker, Unparker) {
    let parker = Parker::new();
    let unparker = parker.unparker();
    (parker, unparker)
}
/// A thread parker that drives the reactor while it waits.
///
/// Parking first tries to take the reactor lock and block inside
/// `react`; only when the lock is unavailable does it fall back to the
/// plain `parking::Parker`.
#[derive(Debug)]
pub struct Parker {
// The underlying parker used to store and consume notifications.
inner: parking::Parker,
// Set to `true` while this parker holds the reactor lock and is about to
// block (or is blocked) in `react`; unparking reads it to decide whether the
// reactor must be notified as well. Shared with every `Unparker` handle.
io: Arc<AtomicBool>,
}
impl Parker {
    /// Creates a new parker.
    ///
    /// Also calls `increment_parkers` on the global reactor; the matching
    /// `decrement_parkers` call is made when the parker is dropped.
    pub fn new() -> Parker {
        let inner = parking::Parker::new();
        let io = Arc::new(AtomicBool::new(false));
        Reactor::get().increment_parkers();
        Parker { inner, io }
    }

    /// Blocks the current thread until a notification is received,
    /// processing I/O events through the reactor while waiting when possible.
    pub fn park(&self) {
        self.park_inner(None);
    }

    /// Like [`Parker::park`], but gives up after `timeout`.
    ///
    /// Returns `true` if the inner parker reported a consumed notification and
    /// `false` if the timeout elapsed first. A timeout too large to be
    /// represented as a deadline is treated as "wait indefinitely".
    pub fn park_timeout(&self, timeout: Duration) -> bool {
        self.park_inner(Some(timeout))
    }

    /// Like [`Parker::park`], but gives up once `deadline` is reached.
    ///
    /// A deadline that already lies in the past behaves like a zero timeout.
    /// Returns `true` if a notification was consumed, `false` otherwise.
    pub fn park_deadline(&self, deadline: Instant) -> bool {
        self.park_inner(Some(deadline.saturating_duration_since(Instant::now())))
    }

    /// Notifies the parker.
    ///
    /// When the inner `unpark` call returns `true` and the parker is currently
    /// flagged as waiting on the reactor, the reactor is notified as well so
    /// that a blocking `react` call gets interrupted.
    pub fn unpark(&self) {
        if self.inner.unpark() && self.io.load(Ordering::SeqCst) {
            Reactor::get().notify();
        }
    }

    /// Returns a handle that can unpark this parker.
    ///
    /// The handle shares the `io` flag with the parker.
    pub fn unparker(&self) -> Unparker {
        Unparker {
            inner: self.inner.unparker(),
            io: self.io.clone(),
        }
    }

    /// Shared implementation of all `park*` methods.
    ///
    /// `timeout` of `None` means "wait until notified". Returns `true` when a
    /// notification was consumed and `false` when the timeout ran out.
    fn park_inner(&self, timeout: Option<Duration>) -> bool {
        // Fast path: a notification is already pending. Consume it, give the
        // reactor a non-blocking chance to process ready I/O events, and return.
        if self.inner.park_timeout(Duration::from_secs(0)) {
            if let Some(reactor_lock) = Reactor::get().try_lock() {
                // Best effort: errors from the reactor are deliberately ignored.
                let _ = reactor_lock.react(Some(Duration::from_secs(0)));
            }
            return true;
        }

        // A zero timeout never blocks; still poll the reactor once.
        if let Some(dur) = timeout {
            if dur == Duration::from_secs(0) {
                if let Some(reactor_lock) = Reactor::get().try_lock() {
                    let _ = reactor_lock.react(Some(Duration::from_secs(0)));
                }
                return false;
            }
        }

        // `Instant + Duration` panics on overflow, which a very large timeout
        // (e.g. `Duration::MAX`) would trigger. A deadline that cannot be
        // represented is equivalent to having no deadline at all.
        let deadline = timeout.and_then(|t| Instant::now().checked_add(t));

        loop {
            match Reactor::get().try_lock() {
                // The reactor lock is unavailable, so just wait on the inner
                // parker for a notification or the deadline.
                None => {
                    if let Some(deadline) = deadline {
                        return self.inner.park_deadline(deadline);
                    } else {
                        self.inner.park();
                        return true;
                    }
                }

                // We hold the reactor lock, so wait by blocking on I/O.
                Some(reactor_lock) => {
                    // Announce that we are about to block on the reactor so that
                    // `unpark` knows it has to notify the reactor too.
                    self.io.store(true, Ordering::SeqCst);

                    // Re-check for a notification that arrived before the flag
                    // became visible; such an `unpark` would not have notified
                    // the reactor.
                    if self.inner.park_timeout(Duration::from_secs(0)) {
                        self.io.store(false, Ordering::SeqCst);
                        return true;
                    }

                    // Block on the reactor for the remaining time (or forever).
                    let timeout = deadline.map(|d| d.saturating_duration_since(Instant::now()));
                    let _ = reactor_lock.react(timeout);
                    self.io.store(false, Ordering::SeqCst);

                    // Returning from `react` does not by itself mean we were
                    // unparked; check for a notification.
                    if self.inner.park_timeout(Duration::from_secs(0)) {
                        return true;
                    }

                    // No notification: stop if the deadline passed, else retry.
                    if let Some(deadline) = deadline {
                        if Instant::now() >= deadline {
                            return false;
                        }
                    }
                }
            }
        }
    }
}
impl Drop for Parker {
    /// Calls `decrement_parkers` on the global reactor, balancing the
    /// `increment_parkers` call made in `Parker::new`.
    fn drop(&mut self) {
        let reactor = Reactor::get();
        reactor.decrement_parkers();
    }
}
impl Default for Parker {
fn default() -> Parker {
Parker::new()
}
}
/// A cloneable handle that notifies its associated `Parker`.
#[derive(Clone, Debug)]
pub struct Unparker {
// Handle to the underlying parker's notification state.
inner: parking::Unparker,
// Flag shared with the `Parker`; the parker sets it to `true` while it is
// (about to be) blocked in the reactor's `react` call.
io: Arc<AtomicBool>,
}
impl Unparker {
    /// Notifies the associated parker.
    ///
    /// If the inner `unpark` call returns `true` and the parker is flagged as
    /// waiting on the reactor, the reactor is notified too so that its
    /// blocking `react` call is interrupted.
    pub fn unpark(&self) {
        // NOTE(review): `false` presumably means the parker was already
        // notified, in which case nothing more needs to be done - confirm
        // against the `parking` crate docs.
        if !self.inner.unpark() {
            return;
        }

        let waiting_on_io = self.io.load(Ordering::SeqCst);
        if waiting_on_io {
            Reactor::get().notify();
        }
    }
}