use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::prelude::*;
use pin_utils::unsafe_pinned;
use crate::Delay;
/// Extension methods that attach a time limit to any [`TryFuture`].
///
/// Both methods wrap `self` in a [`Timeout`]; they differ only in how the
/// deadline is expressed (a relative [`Duration`] or an absolute [`Instant`]).
pub trait FutureExt: TryFuture + Sized {
    /// Wraps this future in a [`Timeout`] whose deadline is `Delay::new(dur)`.
    ///
    /// The `Delay` is constructed when this method is called, not when the
    /// returned future is first polled.
    /// NOTE(review): presumably the delay therefore counts from the call site;
    /// confirm against `Delay::new`.
    ///
    /// # Errors
    ///
    /// The returned future yields the wrapped future's own result, or an
    /// `io::Error` of kind `TimedOut` converted into `Self::Error` when the
    /// delay completes first.
    fn timeout(self, dur: Duration) -> Timeout<Self>
    where
        Self::Error: From<io::Error>,
    {
        let timeout = Delay::new(dur);
        Timeout {
            future: self,
            timeout,
        }
    }

    /// Wraps this future in a [`Timeout`] whose deadline is `Delay::new_at(at)`.
    ///
    /// # Errors
    ///
    /// The returned future yields the wrapped future's own result, or an
    /// `io::Error` of kind `TimedOut` converted into `Self::Error` when the
    /// delay completes first.
    fn timeout_at(self, at: Instant) -> Timeout<Self>
    where
        Self::Error: From<io::Error>,
    {
        let timeout = Delay::new_at(at);
        Timeout {
            future: self,
            timeout,
        }
    }
}
/// Blanket implementation: every [`TryFuture`] gets the `FutureExt` methods.
impl<F> FutureExt for F where F: TryFuture {}
/// Future produced by [`FutureExt::timeout`] and [`FutureExt::timeout_at`].
///
/// Pairs a fallible future with a [`Delay`]; polling it resolves with the
/// wrapped future's result, or with a `TimedOut` I/O error converted into
/// `F::Error` if the delay completes while the wrapped future is still pending.
#[derive(Debug)]
pub struct Timeout<F: TryFuture>
where
    F::Error: From<io::Error>,
{
    // The wrapped future whose result is forwarded.
    future: F,
    // Timer that bounds how long `future` may stay pending.
    timeout: Delay,
}
/// Pin projections for the two fields of [`Timeout`].
impl<F> Timeout<F>
where
    F: TryFuture,
    F::Error: From<io::Error>,
{
    // Each invocation generates a method named after the field that projects
    // a pinned `Timeout` to that pinned field.
    // NOTE(review): `unsafe_pinned!` is only sound if this type has no `Drop`
    // impl that moves the field, no unconditional `Unpin` impl, and is not
    // `#[repr(packed)]`; none of those are visible here, so confirm for the
    // whole crate.
    unsafe_pinned!(timeout: Delay);
    unsafe_pinned!(future: F);
}
/// Drives the wrapped future and its deadline together.
impl<F> Future for Timeout<F>
where
    F: TryFuture,
    F::Error: From<io::Error>,
{
    type Output = Result<F::Ok, F::Error>;

    /// Polls the wrapped future first and only consults the timer when the
    /// wrapped future is still pending.
    ///
    /// Because of that ordering, a future that is ready on the same poll in
    /// which the deadline has passed still delivers its own result.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Give the inner future the first chance to finish.
        let inner = self.as_mut().future().try_poll(cx);
        if inner.is_ready() {
            return inner;
        }

        // The inner future is pending; polling the timer decides between
        // waiting and reporting a timeout.
        match self.timeout().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(_) => {
                let timed_out = io::Error::new(io::ErrorKind::TimedOut, "future timed out");
                Poll::Ready(Err(timed_out.into()))
            }
        }
    }
}
/// Extension method that attaches a per-item time limit to any [`TryStream`].
pub trait StreamExt: TryStream + Sized {
    /// Wraps this stream in a [`TimeoutStream`] that uses `dur` as the
    /// timeout for each item.
    ///
    /// The first timer is created here via `Delay::new(dur)`; `dur` is also
    /// stored so the timer can be reset with the same duration later.
    ///
    /// # Errors
    ///
    /// The returned stream forwards the wrapped stream's items and errors, and
    /// additionally yields an `io::Error` of kind `TimedOut` (converted into
    /// `Self::Error`) when the timer completes before the next item.
    fn timeout(self, dur: Duration) -> TimeoutStream<Self>
    where
        Self::Error: From<io::Error>,
    {
        let timeout = Delay::new(dur);
        TimeoutStream {
            timeout,
            dur,
            stream: self,
        }
    }
}
/// Blanket implementation: every [`TryStream`] gets the `StreamExt` methods.
impl<S> StreamExt for S where S: TryStream {}
/// Stream produced by [`StreamExt::timeout`].
///
/// Pairs a fallible stream with a [`Delay`] that is reset with the stored
/// duration every time the wrapped stream produces a result and every time a
/// timeout error is emitted.
#[derive(Debug)]
pub struct TimeoutStream<S: TryStream>
where
    S::Error: From<io::Error>,
{
    // Timer guarding the wait for the next item.
    timeout: Delay,
    // Duration passed to `Delay::reset` whenever the timer is reset.
    dur: Duration,
    // The wrapped stream whose items are forwarded.
    stream: S,
}
/// Pin projections for the pinned fields of [`TimeoutStream`].
impl<S> TimeoutStream<S>
where
    S: TryStream,
    S::Error: From<io::Error>,
{
    // Each invocation generates a method named after the field that projects
    // a pinned `TimeoutStream` to that pinned field.
    // NOTE(review): `unsafe_pinned!` is only sound if this type has no `Drop`
    // impl that moves the field, no unconditional `Unpin` impl, and is not
    // `#[repr(packed)]`; none of those are visible here, so confirm for the
    // whole crate.
    unsafe_pinned!(stream: S);
    unsafe_pinned!(timeout: Delay);
}
/// Forwards the wrapped stream's items while enforcing the per-item timeout.
impl<S> TryStream for TimeoutStream<S>
where
    S: TryStream,
    S::Error: From<io::Error>,
{
    type Ok = S::Ok;
    type Error = S::Error;

    /// Polls the wrapped stream first and only consults the timer when the
    /// stream is still pending.
    ///
    /// * Any ready result from the stream (an item, an error, or the end of
    ///   the stream) resets the timer with the stored duration and is
    ///   forwarded unchanged.
    /// * If the stream is pending and the timer has completed, the timer is
    ///   reset and a `TimedOut` I/O error converted into `S::Error` is yielded
    ///   as an item. The stream is not ended here, so later polls go back to
    ///   polling the wrapped stream.
    fn try_poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Ok, Self::Error>>> {
        // Copy the duration out first so no borrow of `self` is held while
        // the pinned fields are projected below.
        let window = self.dur;

        let polled = self.as_mut().stream().try_poll_next(cx);
        if polled.is_ready() {
            // The stream made progress: reset the timer for the next item.
            self.as_mut().timeout().reset(window);
            return polled;
        }

        // Nothing from the stream yet; keep waiting unless the timer fired.
        if self.as_mut().timeout().poll(cx).is_pending() {
            return Poll::Pending;
        }

        // Timer fired: reset it before reporting, so the next item gets its
        // own timeout rather than observing the already-completed timer.
        self.as_mut().timeout().reset(window);
        let timed_out = io::Error::new(io::ErrorKind::TimedOut, "stream item timed out");
        Poll::Ready(Some(Err(timed_out.into())))
    }
}