#![allow(unused_unsafe)]
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
mod entry;
pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};
mod handle;
pub(crate) use self::handle::Handle;
mod wheel;
pub(super) mod sleep;
use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::time::error::Error;
use crate::time::{Clock, Duration, Instant};
use std::convert::TryInto;
use std::fmt;
use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
#[derive(Debug)]
pub(crate) struct Driver<P: Park + 'static> {
time_source: ClockTime,
handle: Handle,
park: P,
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
}
#[derive(Debug, Clone)]
pub(self) struct ClockTime {
clock: super::clock::Clock,
start_time: Instant,
}
impl ClockTime {
pub(self) fn new(clock: Clock) -> Self {
Self {
start_time: clock.now(),
clock,
}
}
pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
self.instant_to_tick(t + Duration::from_nanos(999_999))
}
pub(self) fn instant_to_tick(&self, t: Instant) -> u64 {
let dur: Duration = t
.checked_duration_since(self.start_time)
.unwrap_or_else(|| Duration::from_secs(0));
let ms = dur.as_millis();
ms.try_into().expect("Duration too far into the future")
}
pub(self) fn tick_to_duration(&self, t: u64) -> Duration {
Duration::from_millis(t)
}
pub(self) fn now(&self) -> u64 {
self.instant_to_tick(self.clock.now())
}
}
struct Inner {
pub(super) state: Mutex<InnerState>,
pub(super) is_shutdown: AtomicBool,
}
struct InnerState {
time_source: ClockTime,
elapsed: u64,
next_wake: Option<NonZeroU64>,
wheel: wheel::Wheel,
unpark: Box<dyn Unpark>,
}
impl<P> Driver<P>
where
P: Park + 'static,
{
pub(crate) fn new(park: P, clock: Clock) -> Driver<P> {
let time_source = ClockTime::new(clock);
let inner = Inner::new(time_source.clone(), Box::new(park.unpark()));
Driver {
time_source,
handle: Handle::new(Arc::new(inner)),
park,
#[cfg(feature = "test-util")]
did_wake: Arc::new(AtomicBool::new(false)),
}
}
pub(crate) fn handle(&self) -> Handle {
self.handle.clone()
}
fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
let mut lock = self.handle.get().state.lock();
assert!(!self.handle.is_shutdown());
let next_wake = lock.wheel.next_expiration_time();
lock.next_wake =
next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
drop(lock);
match next_wake {
Some(when) => {
let now = self.time_source.now();
let mut duration = self.time_source.tick_to_duration(when.saturating_sub(now));
if duration > Duration::from_millis(0) {
if let Some(limit) = limit {
duration = std::cmp::min(limit, duration);
}
self.park_timeout(duration)?;
} else {
self.park.park_timeout(Duration::from_secs(0))?;
}
}
None => {
if let Some(duration) = limit {
self.park_timeout(duration)?;
} else {
self.park.park()?;
}
}
}
self.handle.process();
Ok(())
}
cfg_test_util! {
fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
let clock = &self.time_source.clock;
if clock.is_paused() {
self.park.park_timeout(Duration::from_secs(0))?;
if !self.did_wake() {
clock.advance(duration);
}
} else {
self.park.park_timeout(duration)?;
}
Ok(())
}
fn did_wake(&self) -> bool {
self.did_wake.swap(false, Ordering::SeqCst)
}
}
cfg_not_test_util! {
fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
self.park.park_timeout(duration)
}
}
}
impl Handle {
pub(self) fn process(&self) {
let now = self.time_source().now();
self.process_at_time(now)
}
pub(self) fn process_at_time(&self, mut now: u64) {
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;
let mut lock = self.get().lock();
if now < lock.elapsed {
now = lock.elapsed;
}
while let Some(entry) = lock.wheel.poll(now) {
debug_assert!(unsafe { entry.is_pending() });
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
waker_list[waker_idx] = Some(waker);
waker_idx += 1;
if waker_idx == waker_list.len() {
drop(lock);
for waker in waker_list.iter_mut() {
waker.take().unwrap().wake();
}
waker_idx = 0;
lock = self.get().lock();
}
}
}
lock.elapsed = lock.wheel.elapsed();
lock.next_wake = lock
.wheel
.poll_at()
.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
drop(lock);
for waker in waker_list[0..waker_idx].iter_mut() {
waker.take().unwrap().wake();
}
}
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let mut lock = self.get().lock();
if entry.as_ref().might_be_registered() {
lock.wheel.remove(entry);
}
entry.as_ref().handle().fire(Ok(()));
}
}
pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
let waker = unsafe {
let mut lock = self.get().lock();
if unsafe { entry.as_ref().might_be_registered() } {
lock.wheel.remove(entry);
}
let entry = entry.as_ref().handle();
if self.is_shutdown() {
unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
} else {
entry.set_expiration(new_tick);
match unsafe { lock.wheel.insert(entry) } {
Ok(when) => {
if lock
.next_wake
.map(|next_wake| when < next_wake.get())
.unwrap_or(true)
{
lock.unpark.unpark();
}
None
}
Err((entry, super::error::InsertError::Elapsed)) => unsafe {
entry.fire(Ok(()))
},
}
}
};
if let Some(waker) = waker {
waker.wake();
}
}
}
impl<P> Park for Driver<P>
where
P: Park + 'static,
{
type Unpark = TimerUnpark<P>;
type Error = P::Error;
fn unpark(&self) -> Self::Unpark {
TimerUnpark::new(self)
}
fn park(&mut self) -> Result<(), Self::Error> {
self.park_internal(None)
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.park_internal(Some(duration))
}
fn shutdown(&mut self) {
if self.handle.is_shutdown() {
return;
}
self.handle.get().is_shutdown.store(true, Ordering::SeqCst);
self.handle.process_at_time(u64::MAX);
self.park.shutdown();
}
}
impl<P> Drop for Driver<P>
where
P: Park + 'static,
{
fn drop(&mut self) {
self.shutdown();
}
}
pub(crate) struct TimerUnpark<P: Park + 'static> {
inner: P::Unpark,
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
}
impl<P: Park + 'static> TimerUnpark<P> {
fn new(driver: &Driver<P>) -> TimerUnpark<P> {
TimerUnpark {
inner: driver.park.unpark(),
#[cfg(feature = "test-util")]
did_wake: driver.did_wake.clone(),
}
}
}
impl<P: Park + 'static> Unpark for TimerUnpark<P> {
fn unpark(&self) {
#[cfg(feature = "test-util")]
self.did_wake.store(true, Ordering::SeqCst);
self.inner.unpark();
}
}
impl Inner {
pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
Inner {
state: Mutex::new(InnerState {
time_source,
elapsed: 0,
next_wake: None,
unpark,
wheel: wheel::Wheel::new(),
}),
is_shutdown: AtomicBool::new(false),
}
}
pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
self.state.lock()
}
pub(super) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::SeqCst)
}
}
impl fmt::Debug for Inner {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Inner").finish()
}
}
#[cfg(test)]
mod tests;