use crate::time::driver::{Handle, TimerEntry};
use crate::time::{error::Error, Duration, Instant};
use crate::util::trace;
use pin_project_lite::pin_project;
use std::future::Future;
use std::panic::Location;
use std::pin::Pin;
use std::task::{self, Poll};
cfg_trace! {
use crate::time::driver::ClockTime;
}
/// Returns a [`Sleep`] future that completes once `deadline` has been reached.
///
/// The deadline is handed to `Sleep::new_timeout` unchanged, together with
/// the location reported by `trace::caller_location()`.
#[cfg_attr(docsrs, doc(alias = "delay_until"))]
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn sleep_until(deadline: Instant) -> Sleep {
    // `caller_location()` is evaluated directly in this body (not in a
    // closure or helper); presumably this is what lets `#[track_caller]`
    // attribute the location to our caller -- confirm against `util::trace`.
    Sleep::new_timeout(deadline, trace::caller_location())
}
/// Returns a [`Sleep`] future that completes `duration` after the moment this
/// function is called.
///
/// The deadline is computed as `Instant::now() + duration`. When that
/// addition is not representable (`checked_add` yields `None`), the deadline
/// falls back to `Instant::far_future()`.
#[cfg_attr(docsrs, doc(alias = "delay_for"))]
#[cfg_attr(docsrs, doc(alias = "wait"))]
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn sleep(duration: Duration) -> Sleep {
    // Capture the location first, directly in this function's own frame.
    let location = trace::caller_location();

    let deadline = Instant::now()
        .checked_add(duration)
        .unwrap_or_else(Instant::far_future);

    Sleep::new_timeout(deadline, location)
}
pin_project! {
/// Future returned by [`sleep`] and [`sleep_until`].
///
/// Polling it polls the underlying timer entry; the future resolves to `()`
/// once that entry reports success, and panics if the entry reports an error.
#[cfg_attr(docsrs, doc(alias = "Delay"))]
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Sleep {
// Deadline bookkeeping (plus tracing state when tracing is compiled in).
inner: Inner,
// Timer entry created from the current driver handle. It is structurally
// pinned because it is polled and reset through a pinned projection.
#[pin]
entry: TimerEntry,
}
}
cfg_trace! {
/// Non-pinned state of a `Sleep` when tracing support is compiled in.
#[derive(Debug)]
struct Inner {
/// Instant at which the sleep is due; updated by `reset_inner`.
deadline: Instant,
/// `runtime.resource` span created in `new_timeout`; used as the parent of
/// state-update events and of the traced poll operations.
resource_span: tracing::Span,
/// `runtime.resource.async_op` span; replaced on every reset and entered
/// for the duration of each `poll`.
async_op_span: tracing::Span,
/// Clone of the driver handle's time source, used to convert deadlines to
/// ticks and read the current tick when emitting duration events.
time_source: ClockTime,
}
}
cfg_not_trace! {
/// Non-pinned state of a `Sleep` when tracing support is not compiled in.
#[derive(Debug)]
struct Inner {
/// Instant at which the sleep is due; updated by `reset_inner`.
deadline: Instant,
}
}
impl Sleep {
#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
pub(crate) fn new_timeout(
deadline: Instant,
location: Option<&'static Location<'static>>,
) -> Sleep {
let handle = Handle::current();
let entry = TimerEntry::new(&handle, deadline);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let time_source = handle.time_source().clone();
let deadline_tick = time_source.deadline_to_tick(deadline);
let duration = deadline_tick.checked_sub(time_source.now()).unwrap_or(0);
#[cfg(tokio_track_caller)]
let location = location.expect("should have location if tracking caller");
#[cfg(tokio_track_caller)]
let resource_span = tracing::trace_span!(
"runtime.resource",
concrete_type = "Sleep",
kind = "timer",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);
#[cfg(not(tokio_track_caller))]
let resource_span =
tracing::trace_span!("runtime.resource", concrete_type = "Sleep", kind = "timer");
let async_op_span =
tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout");
tracing::trace!(
target: "runtime::resource::state_update",
parent: resource_span.id(),
duration = duration,
duration.unit = "ms",
duration.op = "override",
);
Inner {
deadline,
resource_span,
async_op_span,
time_source,
}
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let inner = Inner { deadline };
Sleep { inner, entry }
}
pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {
Self::new_timeout(Instant::far_future(), location)
}
pub fn deadline(&self) -> Instant {
self.inner.deadline
}
pub fn is_elapsed(&self) -> bool {
self.entry.is_elapsed()
}
pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
self.reset_inner(deadline)
}
fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let me = self.project();
me.entry.reset(deadline);
(*me.inner).deadline = deadline;
#[cfg(all(tokio_unstable, feature = "tracing"))]
{
me.inner.async_op_span =
tracing::trace_span!("runtime.resource.async_op", source = "Sleep::reset");
tracing::trace!(
target: "runtime::resource::state_update",
parent: me.inner.resource_span.id(),
duration = {
let now = me.inner.time_source.now();
let deadline_tick = me.inner.time_source.deadline_to_tick(deadline);
deadline_tick.checked_sub(now).unwrap_or(0)
},
duration.unit = "ms",
duration.op = "override",
);
}
}
cfg_not_trace! {
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();
let coop = ready!(crate::coop::poll_proceed(cx));
me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
})
}
}
cfg_trace! {
fn poll_elapsed(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
let me = self.project();
let coop = ready!(trace_poll_op!(
"poll_elapsed",
crate::coop::poll_proceed(cx),
me.inner.resource_span.id(),
));
let result = me.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
});
trace_poll_op!("poll_elapsed", result, me.inner.resource_span.id())
}
}
}
impl Future for Sleep {
    type Output = ();

    /// Drives the sleep by polling `poll_elapsed`.
    ///
    /// Resolves to `()` when the timer reports success.
    ///
    /// # Panics
    ///
    /// Panics with `timer error: ...` if the timer reports an error.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // With tracing compiled in, stay inside the async-op span while polling.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let _entered = self.inner.async_op_span.clone().entered();

        let outcome = ready!(self.as_mut().poll_elapsed(cx));
        if let Err(e) = outcome {
            panic!("timer error: {}", e);
        }

        Poll::Ready(())
    }
}