use crate::future::poll_fn;
use crate::time::{sleep_until, Duration, Instant, Sleep};
use crate::util::trace;
use std::future::Future;
use std::panic::Location;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Creates a new [`Interval`] that ticks every `period`, with the first tick
/// due at the moment this function is called.
///
/// The call site is captured (via `#[track_caller]`) and forwarded to the
/// shared constructor, which uses it only when tracing is enabled.
///
/// # Panics
///
/// Panics if `period` is zero.
#[track_caller]
pub fn interval(period: Duration) -> Interval {
    assert!(!period.is_zero(), "`period` must be non-zero.");

    // The first tick is scheduled for "now"; later ticks follow `period`.
    let start = Instant::now();
    internal_interval_at(start, period, trace::caller_location())
}
/// Creates a new [`Interval`] whose first tick is due at `start` and whose
/// subsequent ticks are spaced `period` apart.
///
/// The call site is captured (via `#[track_caller]`) and forwarded to the
/// shared constructor, which uses it only when tracing is enabled.
///
/// # Panics
///
/// Panics if `period` is zero.
#[track_caller]
pub fn interval_at(start: Instant, period: Duration) -> Interval {
    assert!(!period.is_zero(), "`period` must be non-zero.");

    let location = trace::caller_location();
    internal_interval_at(start, period, location)
}
#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
fn internal_interval_at(
start: Instant,
period: Duration,
location: Option<&'static Location<'static>>,
) -> Interval {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let resource_span = {
let location = location.expect("should have location if tracing");
tracing::trace_span!(
"runtime.resource",
concrete_type = "Interval",
kind = "timer",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
)
};
#[cfg(all(tokio_unstable, feature = "tracing"))]
let delay = resource_span.in_scope(|| Box::pin(sleep_until(start)));
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let delay = Box::pin(sleep_until(start));
Interval {
delay,
period,
missed_tick_behavior: Default::default(),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span,
}
}
/// Strategy used to pick the next deadline of an [`Interval`] after a tick
/// was missed (i.e. the interval was polled noticeably later than scheduled).
///
/// The default strategy is [`MissedTickBehavior::Burst`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MissedTickBehavior {
    /// Schedule the next tick one period after the *missed deadline*, so the
    /// original schedule is kept and late ticks fire back-to-back.
    Burst,
    /// Schedule the next tick one full period after *now*, shifting the
    /// schedule by however late the tick was.
    Delay,
    /// Schedule the next tick at the first multiple of the period (counted
    /// from the missed deadline) that lies after *now*, dropping the ticks
    /// in between.
    Skip,
}

impl MissedTickBehavior {
    /// Computes the deadline of the next tick after a missed one.
    ///
    /// * `timeout` - the deadline that was missed.
    /// * `now` - the current instant.
    /// * `period` - the interval's period.
    fn next_timeout(&self, timeout: Instant, now: Instant, period: Duration) -> Instant {
        match *self {
            MissedTickBehavior::Burst => timeout + period,
            MissedTickBehavior::Delay => now + period,
            MissedTickBehavior::Skip => {
                let one_period_from_now = now + period;

                // How far `now` is past the most recent multiple of `period`
                // counted from the missed deadline.
                let overshoot_nanos = (now - timeout).as_nanos() % period.as_nanos();

                // The conversion can only fail when the remainder exceeds
                // `u64::MAX` nanoseconds (roughly 584 years). There is no way
                // to report that to the caller, so it is treated as a panic.
                let overshoot: u64 = overshoot_nanos
                    .try_into()
                    .expect("too much time has elapsed since the interval was supposed to tick");

                one_period_from_now - Duration::from_nanos(overshoot)
            }
        }
    }
}

impl Default for MissedTickBehavior {
    /// Returns [`MissedTickBehavior::Burst`].
    fn default() -> Self {
        MissedTickBehavior::Burst
    }
}
/// A periodic timer: each completed tick re-arms an internal delay for the
/// next deadline.
///
/// Instances are built by `interval` / `interval_at` and driven through
/// `tick` / `poll_tick`.
#[derive(Debug)]
pub struct Interval {
// Delay future that is polled for the current tick; its deadline is the
// instant reported by that tick.
delay: Pin<Box<Sleep>>,
// Spacing between consecutive ticks.
period: Duration,
// How the next deadline is chosen when a tick is observed late.
missed_tick_behavior: MissedTickBehavior,
// Tracing span describing this timer resource; only present when built with
// `tokio_unstable` and the `tracing` feature.
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
}
impl Interval {
    /// Waits until the next tick is due and returns the instant that tick was
    /// scheduled for.
    ///
    /// This is the `async` front end of [`Interval::poll_tick`]. When built
    /// with `tokio_unstable` and the `tracing` feature, the poll future is
    /// additionally wrapped by `trace::async_op` under this interval's
    /// resource span.
    pub async fn tick(&mut self) -> Instant {
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = self.resource_span.clone();
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let instant = trace::async_op(
            || poll_fn(|cx| self.poll_tick(cx)),
            resource_span,
            "Interval::tick",
            "poll_tick",
            false,
        );
        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
        let instant = poll_fn(|cx| self.poll_tick(cx));

        instant.await
    }

    /// Polls for the next tick.
    ///
    /// Returns `Poll::Pending` while the current delay has not elapsed.
    /// Once it has, the delay is re-armed for the following tick and
    /// `Poll::Ready` is returned with the deadline that just elapsed.
    ///
    /// If the tick is observed more than 5 ms after its deadline it counts as
    /// missed, and the next deadline is chosen by the configured
    /// [`MissedTickBehavior`]. Otherwise the next deadline is one period
    /// after the elapsed one.
    pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
        // Nothing to do until the current delay has completed.
        ready!(Pin::new(&mut self.delay).poll(cx));

        // The deadline that just fired is the tick we report.
        let timeout = self.delay.deadline();
        let now = Instant::now();

        let next = if now > timeout + Duration::from_millis(5) {
            self.missed_tick_behavior
                .next_timeout(timeout, now, self.period)
        } else {
            // A very large period (e.g. `Duration::MAX`) does not fit in the
            // representable `Instant` range; plain `+` would panic, so
            // saturate to a far-away deadline instead.
            timeout
                .checked_add(self.period)
                .unwrap_or_else(Instant::far_future)
        };

        // Re-arm the delay for the next tick.
        // NOTE(review): `reset_without_reregister` presumably leaves waker
        // registration to the next poll — confirm against `Sleep`.
        self.delay.as_mut().reset_without_reregister(next);

        Poll::Ready(timeout)
    }

    /// Re-arms the interval so the next tick is due one period from now.
    ///
    /// If adding the period to the current instant would overflow, the
    /// deadline saturates to `Instant::far_future()` instead of panicking.
    pub fn reset(&mut self) {
        let deadline = Instant::now()
            .checked_add(self.period)
            .unwrap_or_else(Instant::far_future);
        self.delay.as_mut().reset(deadline);
    }

    /// Re-arms the interval so the next tick is due right now.
    pub fn reset_immediately(&mut self) {
        self.delay.as_mut().reset(Instant::now());
    }

    /// Re-arms the interval so the next tick is due `after` from now.
    pub fn reset_after(&mut self, after: Duration) {
        self.delay.as_mut().reset(Instant::now() + after);
    }

    /// Re-arms the interval so the next tick is due at `deadline`.
    pub fn reset_at(&mut self, deadline: Instant) {
        self.delay.as_mut().reset(deadline);
    }

    /// Returns the currently configured [`MissedTickBehavior`].
    pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
        self.missed_tick_behavior
    }

    /// Replaces the [`MissedTickBehavior`] used for subsequent missed ticks.
    pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
        self.missed_tick_behavior = behavior;
    }

    /// Returns the period of the interval.
    pub fn period(&self) -> Duration {
        self.period
    }
}