use http::{Request, Response, StatusCode};
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::time::Sleep;
use tower_layer::Layer;
use tower_service::Service;
/// [`Layer`] that wraps services in [`Timeout`] with a fixed duration.
///
/// The stored duration is copied into every [`Timeout`] produced by
/// [`Layer::layer`].
#[derive(Debug, Clone, Copy)]
pub struct TimeoutLayer {
// Duration passed on to each `Timeout` this layer creates.
timeout: Duration,
}
impl TimeoutLayer {
pub fn new(timeout: Duration) -> Self {
TimeoutLayer { timeout }
}
}
impl<S> Layer<S> for TimeoutLayer {
type Service = Timeout<S>;
fn layer(&self, inner: S) -> Self::Service {
Timeout::new(inner, self.timeout)
}
}
/// Service wrapper that pairs an inner service with a timeout duration.
///
/// Each call starts a timer for `timeout`; the returned future resolves to a
/// `408 Request Timeout` response with a default body if that timer fires
/// before the inner service produces its result.
#[derive(Debug, Clone, Copy)]
pub struct Timeout<S> {
// The wrapped service that actually handles requests.
inner: S,
// Length of the timer started for each call.
timeout: Duration,
}
impl<S> Timeout<S> {
pub fn new(inner: S, timeout: Duration) -> Self {
Self { inner, timeout }
}
define_inner_service_accessors!();
pub fn layer(timeout: Duration) -> TimeoutLayer {
TimeoutLayer::new(timeout)
}
}
impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for Timeout<S>
where
    S: Service<Request<ReqBody>, Response = Response<ResBody>>,
    ResBody: Default,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = ResponseFuture<S::Future>;

    /// Readiness is delegated entirely to the wrapped service.
    #[inline]
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Dispatches `req` to the wrapped service and pairs the resulting future
    /// with a timer of length `self.timeout`.
    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
        // The timer is created before the inner call is made, so the time
        // spent inside `inner.call` counts toward the timeout.
        let sleep = tokio::time::sleep(self.timeout);
        let inner = self.inner.call(req);
        ResponseFuture { inner, sleep }
    }
}
pin_project! {
/// Future returned by [`Timeout`]'s `Service::call`.
///
/// Holds the inner service's future together with the timer it is raced
/// against. Both fields are structurally pinned so they can be polled
/// through the projection.
pub struct ResponseFuture<F> {
// Future produced by the wrapped service for this request.
#[pin]
inner: F,
// Timer created in `call` from the configured timeout duration.
#[pin]
sleep: Sleep,
}
}
impl<F, B, E> Future for ResponseFuture<F>
where
    F: Future<Output = Result<Response<B>, E>>,
    B: Default,
{
    type Output = Result<Response<B>, E>;

    /// Drives the inner future and the timer together.
    ///
    /// Resolves to the inner future's output if it is ready. Otherwise, once
    /// the timer has elapsed, resolves to `Ok` with a `408 Request Timeout`
    /// response whose body is `B::default()`. Errors from the inner future are
    /// passed through unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // Poll the inner future first: if its result is already available it
        // must win over the timer. Checking the timer first would throw away
        // a finished response (or never poll the inner future at all) whenever
        // the task happens to be scheduled after the deadline has elapsed.
        if let Poll::Ready(output) = this.inner.poll(cx) {
            return Poll::Ready(output);
        }

        // The inner future is still pending; polling the timer also registers
        // the waker so the task is woken when the deadline elapses.
        if this.sleep.poll(cx).is_pending() {
            return Poll::Pending;
        }

        // Deadline reached without a response: synthesize the timeout reply.
        let mut res = Response::new(B::default());
        *res.status_mut() = StatusCode::REQUEST_TIMEOUT;
        Poll::Ready(Ok(res))
    }
}