//! Timer facilities built on `futures`.
//!
//! This crate root defines `Timer`, which keeps scheduled timeouts in a heap
//! and fires them as time is advanced, and `TimerHandle`, a weak reference to
//! a `Timer` that can also be installed as a process-wide fallback. The
//! `Delay` and `Interval` types are re-exported from their submodules.
#![deny(missing_docs)]
#![warn(missing_debug_implementations)]
use std::cmp::Ordering;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Mutex, Weak};
use std::task::{Context, Poll};
use std::time::Instant;
use std::fmt;
use futures::prelude::*;
use futures::task::AtomicWaker;
use arc_list::{ArcList, Node};
use heap::{Heap, Slot};
// Internal building blocks: `arc_list` supplies `ArcList`/`Node`, `heap`
// supplies `Heap`/`Slot`, and `global` supplies the `HelperThread` used by
// `TimerHandle::default`.
mod arc_list;
mod global;
mod heap;
// Public extension module; its `FutureExt` item is re-exported at the crate root.
pub mod ext;
pub use ext::FutureExt;
/// A timer that keeps scheduled timeouts in a heap and fires them as time is
/// advanced.
///
/// Polling a `Timer` as a future moves updates queued on its shared state into
/// the heap; calling `advance`/`advance_to` fires every entry whose instant has
/// been reached. Dropping the `Timer` invalidates all timeouts it still knows
/// about.
pub struct Timer {
/// State shared with handles; `TimerHandle` holds it through a `Weak`.
inner: Arc<Inner>,
/// Timeouts currently scheduled, ordered by their firing instant.
timer_heap: Heap<HeapTimer>,
}
/// A cloneable handle to a `Timer`.
///
/// The handle only holds a weak reference to the timer's shared state, so it
/// does not keep the `Timer` alive.
#[derive(Clone)]
pub struct TimerHandle {
/// Weak reference to the shared state of the `Timer` this handle came from.
inner: Weak<Inner>,
}
// `Delay` and `Interval` live in private submodules and are exposed only
// through the re-exports below.
mod delay;
mod interval;
pub use self::delay::Delay;
pub use self::interval::Interval;
/// State shared between a `Timer` and the handles derived from it.
struct Inner {
/// Timers with pending changes; drained by `Timer::poll` and sealed when the
/// `Timer` is dropped.
list: ArcList<ScheduledTimer>,
/// Waker registered by `Timer::poll` for the task driving the timer.
// NOTE(review): presumably woken by whoever pushes onto `list`; the producer
// side is not visible in this file.
waker: AtomicWaker,
}
/// Per-timeout state shared between the `Timer` and the owner of the timeout.
struct ScheduledTimer {
/// Woken when the timeout fires or is invalidated.
waker: AtomicWaker,
/// Packed state word: bit 0 (`0b01`) is set by `Timer::advance_to` when the
/// timeout fires, bit 1 (`0b10`) is set by `Timer::invalidate`, and the
/// remaining bits (`state >> 2`) hold the generation recorded in `HeapTimer`.
state: AtomicUsize,
/// Weak reference to the shared timer state.
// NOTE(review): not read anywhere in this file; presumably used by the
// delay/interval modules to enqueue updates. Confirm there.
inner: Weak<Inner>,
/// Requested firing instant. `Timer::poll` schedules the node when this is
/// `Some` and removes it from the heap when it is `None`.
at: Mutex<Option<Instant>>,
/// Position of this timeout in the timer heap, if it is currently scheduled.
slot: Mutex<Option<Slot>>,
}
/// An entry in the timer heap; entries are compared by `at` alone.
struct HeapTimer {
/// Instant at which this entry should fire.
at: Instant,
/// Generation of the node (`state >> 2`) captured when the entry was pushed;
/// `advance_to` only fires the node if its state still matches.
gen: usize,
/// The scheduled timer this entry refers to.
node: Arc<Node<ScheduledTimer>>,
}
impl Timer {
    /// Creates a new timer with an empty heap and no pending updates.
    ///
    /// The timer does nothing by itself: it has to be polled to pick up
    /// queued updates and advanced to fire the timeouts that are due.
    pub fn new() -> Timer {
        let shared = Inner {
            list: ArcList::new(),
            waker: AtomicWaker::new(),
        };
        Timer {
            inner: Arc::new(shared),
            timer_heap: Heap::new(),
        }
    }

    /// Returns a handle to this timer.
    ///
    /// The handle holds only a weak reference to the timer's shared state.
    pub fn handle(&self) -> TimerHandle {
        let inner = Arc::downgrade(&self.inner);
        TimerHandle { inner }
    }

    /// Returns the instant of the entry at the head of the heap, i.e. the next
    /// timeout `advance_to` would consider, or `None` if nothing is scheduled.
    pub fn next_event(&self) -> Option<Instant> {
        let head = self.timer_heap.peek()?;
        Some(head.at)
    }

    /// Fires every timeout that is due as of `Instant::now()`.
    pub fn advance(&mut self) {
        let now = Instant::now();
        self.advance_to(now)
    }

    /// Fires every timeout whose instant is at or before `now`.
    ///
    /// Each due entry is removed from the heap. Its node is marked as fired
    /// and its waker notified only if the node's state still carries the
    /// generation recorded in the entry with no flag bits set; otherwise the
    /// entry is discarded silently.
    pub fn advance_to(&mut self, now: Instant) {
        while self
            .timer_heap
            .peek()
            .map_or(false, |head| head.at <= now)
        {
            let due = self.timer_heap.pop().unwrap();

            // The entry has left the heap, so the node no longer owns a slot.
            *due.node.slot.lock().unwrap() = None;

            // Only a node in exactly `gen << 2` (same generation, not fired,
            // not invalidated) transitions to the fired state.
            let expected = due.gen << 2;
            let fired = due
                .node
                .state
                .compare_exchange(expected, expected | 0b01, SeqCst, SeqCst)
                .is_ok();
            if fired {
                due.node.waker.wake();
            }
        }
    }

    /// Schedules `node` to fire at `at`, replacing any heap entry it already has.
    fn update_or_add(&mut self, at: Instant, node: Arc<Node<ScheduledTimer>>) {
        let generation = node.state.load(SeqCst) >> 2;
        let mut slot = node.slot.lock().unwrap();
        if let Some(stale) = slot.take() {
            self.timer_heap.remove(stale);
        }
        let entry = HeapTimer {
            at,
            gen: generation,
            node: Arc::clone(&node),
        };
        *slot = Some(self.timer_heap.push(entry));
    }

    /// Removes `node`'s entry from the heap, if it currently has one.
    fn remove(&mut self, node: Arc<Node<ScheduledTimer>>) {
        let mut slot = node.slot.lock().unwrap();
        if let Some(heap_slot) = slot.take() {
            self.timer_heap.remove(heap_slot);
        }
    }

    /// Marks `node` as invalidated (state bit `0b10`) and wakes its waker.
    fn invalidate(&mut self, node: Arc<Node<ScheduledTimer>>) {
        let state = &node.state;
        state.fetch_or(0b10, SeqCst);
        node.waker.wake();
    }
}
impl Future for Timer {
    type Output = ();

    /// Drains the updates queued on the shared list into the heap.
    ///
    /// The task's waker is registered first, then every queued node is either
    /// (re)scheduled at its requested instant or removed from the heap when it
    /// has none. This future never completes: it always returns
    /// `Poll::Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        this.inner.waker.register(cx.waker());

        let mut pending = this.inner.list.take();
        while let Some(node) = pending.pop() {
            // Copy the instant out so the mutex guard is released before the
            // node is moved into the helper.
            let requested = *node.at.lock().unwrap();
            if let Some(at) = requested {
                this.update_or_add(at, node);
            } else {
                this.remove(node);
            }
        }

        Poll::Pending
    }
}
impl Drop for Timer {
    /// Invalidates every timeout this timer still knows about.
    ///
    /// The shared list is taken and sealed first, and the nodes queued on it
    /// are invalidated; afterwards every entry remaining in the heap is
    /// invalidated as well.
    fn drop(&mut self) {
        let mut queued = self.inner.list.take_and_seal();
        while let Some(node) = queued.pop() {
            self.invalidate(node);
        }

        while let Some(entry) = self.timer_heap.pop() {
            let node = entry.node;
            self.invalidate(node);
        }
    }
}
impl fmt::Debug for Timer {
    /// Formats the timer as `Timer { heap: "..." }`; the heap contents are elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Timer");
        out.field("heap", &"...");
        out.finish()
    }
}
impl PartialEq for HeapTimer {
fn eq(&self, other: &HeapTimer) -> bool {
self.at == other.at
}
}
// Marker impl: equality on `HeapTimer` compares only the `at` instant.
impl Eq for HeapTimer {}
impl PartialOrd for HeapTimer {
fn partial_cmp(&self, other: &HeapTimer) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for HeapTimer {
fn cmp(&self, other: &HeapTimer) -> Ordering {
self.at.cmp(&other.at)
}
}
/// Process-wide fallback handle, stored as the `usize` produced by
/// `TimerHandle::into_usize`. Zero means no fallback has been installed; the
/// only store visible in this file is the compare-exchange from zero in
/// `TimerHandle::set_as_global_fallback`.
static HANDLE_FALLBACK: AtomicUsize = AtomicUsize::new(0);
/// Error returned by `TimerHandle::set_as_global_fallback` when a fallback
/// handle has already been installed.
#[derive(Clone, Debug)]
pub struct SetDefaultError(());
impl TimerHandle {
pub fn set_as_global_fallback(self) -> Result<(), SetDefaultError> {
unsafe {
let val = self.into_usize();
match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
Ok(_) => Ok(()),
Err(_) => {
drop(TimerHandle::from_usize(val));
Err(SetDefaultError(()))
}
}
}
}
fn into_usize(self) -> usize {
unsafe { mem::transmute::<Weak<Inner>, usize>(self.inner) }
}
unsafe fn from_usize(val: usize) -> TimerHandle {
let inner = mem::transmute::<usize, Weak<Inner>>(val);;
TimerHandle { inner }
}
}
impl Default for TimerHandle {
fn default() -> TimerHandle {
let mut fallback = HANDLE_FALLBACK.load(SeqCst);
if fallback == 0 {
let helper = match global::HelperThread::new() {
Ok(helper) => helper,
Err(_) => return TimerHandle { inner: Weak::new() },
};
if helper.handle().set_as_global_fallback().is_ok() {
let ret = helper.handle();
helper.forget();
return ret;
}
fallback = HANDLE_FALLBACK.load(SeqCst);
}
assert!(fallback != 0);
unsafe {
let handle = TimerHandle::from_usize(fallback);
let ret = handle.clone();
drop(handle.into_usize());
return ret;
}
}
}
impl fmt::Debug for TimerHandle {
    /// Formats the handle as `TimerHandle { inner: "..." }`; the shared state
    /// is elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("TimerHandle");
        out.field("inner", &"...");
        out.finish()
    }
}