use Error;
use atomic::AtomicU64;
use timer::{Handle, Inner};
use futures::Poll;
use futures::task::AtomicTask;
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::{Arc, Weak};
use std::sync::atomic::{AtomicBool, AtomicPtr};
use std::sync::atomic::Ordering::SeqCst;
use std::time::Instant;
use std::u64;
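
/// A timer entry: state shared between the timer driver (`Inner`) and the
/// user-facing future holding the deadline.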
#[derive(Debug)]
pub(crate) struct Entry {
    /// Timer internals; held weakly so entries do not keep the timer alive.
    inner: Weak<Inner>,
    /// Task notified when the entry elapses or errors.
    task: AtomicTask,
    /// Deadline in ticks; the high bit (`ELAPSED`) flags completion and the
    /// all-ones value (`ERROR`) flags an error.
    state: AtomicU64,
    /// Whether the entry counts toward the timer's entry count (see `Drop`).
    counted: bool,
    /// Set while the entry is in the `AtomicStack`.
    queued: AtomicBool,
    /// Next pointer for the lock-free `AtomicStack`.
    next_atomic: UnsafeCell<*mut Entry>,
    /// Deadline cached by the timer driver.
    when: UnsafeCell<Option<u64>>,
    /// Next link for the driver-owned `Stack`.
    next_stack: UnsafeCell<Option<Arc<Entry>>>,
    /// Previous link for the driver-owned `Stack`.
    prev_stack: UnsafeCell<*const Entry>,
}
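
/// An intrusive, doubly-linked stack of entries. Not synchronized: it relies
/// on the timer driver having exclusive access to the stack links.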
pub(crate) struct Stack {
head: Option<Arc<Entry>>,
}
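
/// A lock-free, multi-producer stack used to hand entries to the timer
/// driver, which drains it in one shot:
///
/// ```ignore
/// // Sketch: drain everything queued so far.
/// for entry in stack.take() {
///     // process `entry`
/// }
/// ```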
#[derive(Debug)]
pub(crate) struct AtomicStack {
head: AtomicPtr<Entry>,
}
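
/// An iterator over entries drained from an `AtomicStack`. Dropping it while
/// entries remain transitions those entries to the error state.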
#[derive(Debug)]
pub(crate) struct AtomicStackEntries {
ptr: *mut Entry,
}
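
// State encoding: while pending, `state` holds the deadline tick. The high
// bit (`ELAPSED`) marks the entry elapsed or canceled, and all bits set
// (`ERROR`, which includes `ELAPSED`) marks an error. `SHUTDOWN` is a
// sentinel head pointer for an `AtomicStack` that no longer accepts pushes.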
const ELAPSED: u64 = 1 << 63;
const ERROR: u64 = u64::MAX;
const SHUTDOWN: *mut Entry = 1 as *mut _;
impl Entry {
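    /// Creates an entry that fires at tick `when`. Zero and `u64::MAX` are
    /// rejected, as the state encoding reserves them as sentinels.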
pub fn new(when: u64, handle: Handle) -> Entry {
assert!(when > 0 && when < u64::MAX);
Entry {
inner: handle.into_inner(),
task: AtomicTask::new(),
state: AtomicU64::new(when),
counted: true,
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
}
}
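
    /// Creates an entry that has already elapsed; polling it completes
    /// immediately.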
pub fn new_elapsed(handle: Handle) -> Entry {
Entry {
inner: handle.into_inner(),
task: AtomicTask::new(),
state: AtomicU64::new(ELAPSED),
counted: true,
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
}
}
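
    /// Creates an entry in the error state with no backing timer (`counted`
    /// is false, so dropping it does not decrement the timer's entry count).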
pub fn new_error() -> Entry {
Entry {
inner: Weak::new(),
task: AtomicTask::new(),
state: AtomicU64::new(ERROR),
counted: false,
queued: AtomicBool::new(false),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
prev_stack: UnsafeCell::new(ptr::null_mut()),
}
}
    /// Reads the driver-cached deadline. Unsynchronized: relies on the caller
    /// (in practice, the timer driver) being the sole accessor.
    pub fn when_internal(&self) -> Option<u64> {
        unsafe { *self.when.get() }
    }

    /// Writes the driver-cached deadline; same access contract as
    /// `when_internal`.
    pub fn set_when_internal(&self, when: Option<u64>) {
        unsafe { *self.when.get() = when; }
    }
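
    /// Returns the pending deadline tick, or `None` once the entry has
    /// elapsed or errored.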
pub fn load_state(&self) -> Option<u64> {
let state = self.state.load(SeqCst);
if is_elapsed(state) {
None
} else {
Some(state)
}
}
pub fn is_elapsed(&self) -> bool {
let state = self.state.load(SeqCst);
is_elapsed(state)
}
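
    /// Called when the timer reaches tick `when`: sets `ELAPSED` via a CAS
    /// loop (unless the entry already completed or was reset to a later tick)
    /// and notifies the task.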
pub fn fire(&self, when: u64) {
let mut curr = self.state.load(SeqCst);
loop {
if is_elapsed(curr) || curr > when {
return;
}
let next = ELAPSED | curr;
let actual = self.state.compare_and_swap(curr, next, SeqCst);
if curr == actual {
break;
}
curr = actual;
}
self.task.notify();
}
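
    /// Transitions the entry to the error state unless it already elapsed,
    /// then notifies the task. Observed by `poll_elapsed` as a shutdown
    /// error.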
pub fn error(&self) {
let mut curr = self.state.load(SeqCst);
loop {
if is_elapsed(curr) {
return;
}
let next = ERROR;
let actual = self.state.compare_and_swap(curr, next, SeqCst);
if curr == actual {
break;
}
curr = actual;
}
self.task.notify();
}
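
    /// Cancels the entry by setting `ELAPSED`, then queues it with the driver
    /// (if still alive) so internal references can be cleaned up.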
pub fn cancel(entry: &Arc<Entry>) {
let state = entry.state.fetch_or(ELAPSED, SeqCst);
if is_elapsed(state) {
return;
}
let inner = match entry.inner.upgrade() {
Some(inner) => inner,
None => return,
};
let _ = inner.queue(entry);
}
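
    /// Polls the entry: ready once elapsed, an error if the timer shut down,
    /// and `NotReady` otherwise after registering the current task.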
pub fn poll_elapsed(&self) -> Poll<(), Error> {
use futures::Async::NotReady;
let mut curr = self.state.load(SeqCst);
if is_elapsed(curr) {
if curr == ERROR {
return Err(Error::shutdown());
} else {
return Ok(().into());
}
}
self.task.register();
        // Reload after registering: the entry may have elapsed in between.
        curr = self.state.load(SeqCst);
if is_elapsed(curr) {
if curr == ERROR {
return Err(Error::shutdown());
} else {
return Ok(().into());
}
}
Ok(NotReady)
}
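
    /// Moves the deadline to `deadline`. If the new deadline has already
    /// passed, the entry elapses immediately; on any state change the entry
    /// is re-queued so the driver can update its bookkeeping.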
pub fn reset(entry: &Arc<Entry>, deadline: Instant) {
let inner = match entry.inner.upgrade() {
Some(inner) => inner,
None => return,
};
let when = inner.normalize_deadline(deadline);
let elapsed = inner.elapsed();
let mut curr = entry.state.load(SeqCst);
let mut notify;
loop {
if curr == ERROR || curr == when {
return;
}
let next;
if when <= elapsed {
next = ELAPSED;
notify = !is_elapsed(curr);
} else {
next = when;
notify = true;
}
let actual = entry.state.compare_and_swap(
curr, next, SeqCst);
if curr == actual {
break;
}
curr = actual;
}
if notify {
let _ = inner.queue(entry);
}
}
}
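
/// Returns `true` if the `ELAPSED` bit is set, which covers both the elapsed
/// and error states.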
fn is_elapsed(state: u64) -> bool {
state & ELAPSED == ELAPSED
}
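
// Dropping a counted entry decrements the timer's active entry count.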
impl Drop for Entry {
fn drop(&mut self) {
if !self.counted {
return;
}
let inner = match self.inner.upgrade() {
Some(inner) => inner,
None => return,
};
inner.decrement();
}
}
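
// Safety: relies on the `UnsafeCell` fields never being accessed
// concurrently: `when`, `next_stack`, and `prev_stack` are only touched by
// the timer driver, and `next_atomic` is only written while holding the
// `queued` flag.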
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}
impl Stack {
pub fn new() -> Stack {
Stack { head: None }
}
pub fn is_empty(&self) -> bool {
self.head.is_none()
}
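
    /// Pushes `entry` onto the head of the stack; debug builds assert that it
    /// is not already linked into a stack.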
    pub fn push(&mut self, entry: Arc<Entry>) {
        // Pointer to the new entry, used as the old head's prev link.
        let ptr: *const Entry = &*entry as *const _;

        // Take the current head; it becomes the new entry's next link.
        let old = self.head.take();

        unsafe {
            // The entry must not already be linked into a stack.
            debug_assert!((*entry.next_stack.get()).is_none());
            debug_assert!((*entry.prev_stack.get()).is_null());

            if let Some(old_head) = old.as_ref() {
                // The entry must not already be the head.
                debug_assert!(ptr != &**old_head as *const _);

                // Point the old head back at the new entry.
                *old_head.prev_stack.get() = ptr;
            }

            // Point the new entry forward at the old head.
            *entry.next_stack.get() = old;
        }

        self.head = Some(entry);
    }
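
    /// Pops the head of the stack, clearing the prev links of both the popped
    /// entry and the new head.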
    pub fn pop(&mut self) -> Option<Arc<Entry>> {
        let entry = self.head.take();

        unsafe {
            if let Some(entry) = entry.as_ref() {
                // The popped entry's next link becomes the new head.
                self.head = (*entry.next_stack.get()).take();

                if let Some(new_head) = self.head.as_ref() {
                    // The new head no longer has a predecessor.
                    *new_head.prev_stack.get() = ptr::null();
                }

                // Clear the popped entry's prev link.
                *entry.prev_stack.get() = ptr::null();
            }
        }

        entry
    }
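
    /// Unlinks `entry` from anywhere in the stack; debug builds first assert
    /// that it is present exactly once.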
pub fn remove(&mut self, entry: &Entry) {
unsafe {
debug_assert!({
let mut next = self.head.as_ref();
let mut contains = false;
while let Some(n) = next {
if entry as *const _ == &**n as *const _ {
debug_assert!(!contains);
contains = true;
}
next = (*n.next_stack.get()).as_ref();
}
contains
});
let next = (*entry.next_stack.get()).take();
if let Some(next) = next.as_ref() {
(*next.prev_stack.get()) = *entry.prev_stack.get();
}
if let Some(prev) = (*entry.prev_stack.get()).as_ref() {
*prev.next_stack.get() = next;
} else {
self.head = next;
}
*entry.prev_stack.get() = ptr::null();
}
}
}
impl AtomicStack {
pub fn new() -> AtomicStack {
AtomicStack { head: AtomicPtr::new(ptr::null_mut()) }
}
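
    /// Pushes an entry, transferring one `Arc` reference into the stack.
    /// Returns `Ok(false)` if the entry was already queued, `Ok(true)` on a
    /// new push, and `Err` if the stack has shut down.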
pub fn push(&self, entry: &Arc<Entry>) -> Result<bool, Error> {
        // Set the queued bit; if it was already set, the entry is on the
        // stack and a pending drain will pick it up.
        let queued = entry.queued.fetch_or(true, SeqCst);
if queued {
return Ok(false);
}
let ptr = Arc::into_raw(entry.clone()) as *mut _;
let mut curr = self.head.load(SeqCst);
loop {
if curr == SHUTDOWN {
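                // The push never happened: reclaim the reference that was
                // turned into a raw pointer above.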
let _ = unsafe { Arc::from_raw(ptr) };
return Err(Error::shutdown());
}
unsafe {
*(entry.next_atomic.get()) = curr;
}
let actual = self.head.compare_and_swap(curr, ptr, SeqCst);
if actual == curr {
break;
}
curr = actual;
}
Ok(true)
}
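
    /// Atomically takes every queued entry, leaving the stack empty.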
pub fn take(&self) -> AtomicStackEntries {
let ptr = self.head.swap(ptr::null_mut(), SeqCst);
AtomicStackEntries { ptr }
}
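
    /// Installs the `SHUTDOWN` sentinel so later pushes fail, and errors out
    /// everything that was queued by dropping the drained iterator.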
pub fn shutdown(&self) {
let ptr = self.head.swap(SHUTDOWN, SeqCst);
drop(AtomicStackEntries { ptr });
}
}
impl Iterator for AtomicStackEntries {
type Item = Arc<Entry>;
fn next(&mut self) -> Option<Self::Item> {
        // Null (empty) and `SHUTDOWN` both end iteration; `SHUTDOWN` is a
        // sentinel, never a dereferenceable pointer.
        if self.ptr.is_null() || self.ptr == SHUTDOWN {
            return None;
        }

        // Reclaim the `Arc` whose ownership was transferred by `push`.
        let entry = unsafe { Arc::from_raw(self.ptr) };
        self.ptr = unsafe { *entry.next_atomic.get() };

        // Clear the queued flag so the entry can be pushed again.
        let res = entry.queued.fetch_and(false, SeqCst);
        debug_assert!(res);
Some(entry)
}
}
impl Drop for AtomicStackEntries {
fn drop(&mut self) {
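        // Notify remaining entries with an error so no task is left waiting
        // forever; this is how shutdown propagates to queued entries.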
while let Some(entry) = self.next() {
entry.error();
}
}
}