#![forbid(future_incompatible, rust_2018_idioms)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, missing_doc_code_examples)]
#![cfg_attr(test, deny(warnings))]
#[cfg(feature = "runtime")]
use futures::compat::Future01CompatExt;
use futures::{
compat::{Compat, Compat01As03},
future::BoxFuture,
prelude::*,
stream,
task::Spawn,
};
use http_service::{Body, HttpService};
use hyper::server::{Builder as HyperBuilder, Server as HyperServer};
#[cfg(feature = "runtime")]
use std::net::SocketAddr;
use std::{
pin::Pin,
sync::Arc,
task::{self, Poll},
};
/// Adapter that lets an `HttpService` be handed to hyper as a `MakeService`.
///
/// The wrapped service lives behind an `Arc` so that each connection created
/// by `make_service` can hold its own handle to the same instance.
struct WrapHttpService<H> {
    /// Shared handle to the user-supplied service.
    service: Arc<H>,
}
/// Per-connection adapter implementing hyper's `Service` on top of an
/// `HttpService`.
///
/// Pairs the shared service with the connection state that the service
/// produced from `connect()`.
struct WrapConnection<H>
where
    H: HttpService,
{
    /// Shared handle to the user-supplied service.
    service: Arc<H>,
    /// Connection state passed back to the service on every `respond` call.
    connection: H::Connection,
}
impl<H, Ctx> hyper::service::MakeService<Ctx> for WrapHttpService<H>
where
H: HttpService,
{
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = std::io::Error;
type Service = WrapConnection<H>;
type Future = Compat<BoxFuture<'static, Result<Self::Service, Self::Error>>>;
type MakeError = std::io::Error;
fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
let service = self.service.clone();
let error = std::io::Error::from(std::io::ErrorKind::Other);
async move {
let connection = service.connect().into_future().await.map_err(|_| error)?;
Ok(WrapConnection {
service,
connection,
})
}
.boxed()
.compat()
}
}
/// Handles a single hyper request by delegating to the wrapped `HttpService`.
impl<H> hyper::service::Service for WrapConnection<H>
where
    H: HttpService,
{
    type ReqBody = hyper::Body;
    type ResBody = hyper::Body;
    type Error = std::io::Error;
    type Future = Compat<BoxFuture<'static, Result<http::Response<hyper::Body>, Self::Error>>>;

    /// Converts the hyper request into an `http_service` request, lets the
    /// service respond, and converts the response body back for hyper.
    ///
    /// Errors while reading the request body are wrapped in an
    /// `ErrorKind::Other` I/O error that carries the original error. If the
    /// service itself fails, its error value is discarded and a plain
    /// `ErrorKind::Other` I/O error is reported instead.
    fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
        // Re-express the hyper body as a std stream of byte chunks.
        let request = req.map(|hyper_body| {
            let chunks = Compat01As03::new(hyper_body).map(|item| {
                item.map(|chunk| chunk.into_bytes())
                    .map_err(|cause| std::io::Error::new(std::io::ErrorKind::Other, cause))
            });
            Body::from_stream(chunks)
        });

        // Start the response here, while `self` is still borrowed; the
        // returned future is then moved into the async block.
        let responding = self.service.respond(&mut self.connection, request);

        let reply = async move {
            let outcome: Result<http::Response<_>, _> = responding.into_future().await;
            match outcome {
                Ok(response) => {
                    Ok(response.map(|body| hyper::Body::wrap_stream(body.compat())))
                }
                // The service's error is dropped; only a generic I/O error
                // is reported.
                Err(_) => Err(std::io::Error::from(std::io::ErrorKind::Other)),
            }
        };

        // Box the std future, then expose it through the futures 0.1 interface.
        reply.boxed().compat()
    }
}
/// An HTTP server backed by hyper that drives an `HttpService`.
///
/// `Server` implements `Future`, resolving to `hyper::Result<()>`. It is
/// created through `Server::builder` followed by `Builder::serve`.
///
/// Type parameters: `I` is the stream of incoming connections, `S` the
/// service being served, and `Sp` the spawner handed to hyper as executor.
// The nested compat wrappers make this type inherently verbose; splitting it
// into aliases would not make it clearer.
#[allow(clippy::type_complexity)]
pub struct Server<I, S, Sp>
where
    I: TryStream,
{
    /// The futures 0.1 hyper server, exposed as a std future.
    inner: Compat01As03<
        HyperServer<
            Compat<stream::MapOk<I, fn(I::Ok) -> Compat<I::Ok>>>,
            WrapHttpService<S>,
            Compat<Sp>,
        >,
    >,
}
/// Opaque `Debug` output: prints only the type name, no fields.
impl<I, S, Sp> std::fmt::Debug for Server<I, S, Sp>
where
    I: TryStream,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Server");
        repr.finish()
    }
}
/// A builder for a `Server`, wrapping hyper's own server builder.
///
/// Type parameters: `I` is the stream of incoming connections and `Sp` the
/// spawner handed to hyper as executor.
// The nested compat wrappers make this type inherently verbose; splitting it
// into aliases would not make it clearer.
#[allow(clippy::type_complexity)]
pub struct Builder<I, Sp>
where
    I: TryStream,
{
    /// The underlying hyper builder, fed with futures 0.1 compatible types.
    inner: HyperBuilder<Compat<stream::MapOk<I, fn(I::Ok) -> Compat<I::Ok>>>, Compat<Sp>>,
}
/// Opaque `Debug` output: prints only the type name, no fields.
impl<I, Sp> std::fmt::Debug for Builder<I, Sp>
where
    I: TryStream,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Builder");
        repr.finish()
    }
}
impl<I: TryStream> Server<I, (), ()> {
pub fn builder(incoming: I) -> Builder<I, ()> {
Builder {
inner: HyperServer::builder(Compat::new(incoming.map_ok(Compat::new as _)))
.executor(Compat::new(())),
}
}
}
impl<I, Sp> Builder<I, Sp>
where
    I: TryStream,
{
    /// Replaces the spawner that is handed to hyper as its executor.
    ///
    /// Consumes the builder and returns one parameterized over the new
    /// spawner type.
    pub fn with_spawner<Sp2>(self, new_spawner: Sp2) -> Builder<I, Sp2> {
        let executor = Compat::new(new_spawner);
        let inner = self.inner.executor(executor);
        Builder { inner }
    }

    /// Consumes the builder and creates a `Server` that answers requests
    /// with `service`.
    ///
    /// The service is placed behind an `Arc` so that every connection can
    /// share it.
    pub fn serve<S>(self, service: S) -> Server<I, S, Sp>
    where
        S: HttpService,
        I: TryStream + Unpin,
        I::Ok: AsyncRead + AsyncWrite + Send + Unpin + 'static,
        I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
        Sp: Clone + Send + 'static,
        for<'a> &'a Sp: Spawn,
    {
        let make_service = WrapHttpService {
            service: Arc::new(service),
        };
        let hyper_server = self.inner.serve(make_service);
        // Expose the futures 0.1 server as a std future.
        Server {
            inner: Compat01As03::new(hyper_server),
        }
    }
}
/// Drives the wrapped hyper server; the output is whatever hyper's server
/// future resolves to.
impl<I, S, Sp> Future for Server<I, S, Sp>
where
    I: TryStream + Unpin,
    I::Ok: AsyncRead + AsyncWrite + Send + Unpin + 'static,
    I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    S: HttpService,
    Sp: Clone + Send + 'static,
    for<'a> &'a Sp: Spawn,
{
    type Output = hyper::Result<()>;

    /// Forwards the poll to the inner compat-wrapped server.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<hyper::Result<()>> {
        let inner = Pin::new(&mut self.inner);
        inner.poll(cx)
    }
}
/// Serves the given `HttpService` at `addr` using hyper as the backend.
///
/// Returns a future that resolves with the result of the hyper server. The
/// future has to be driven by the caller (see `run` for a blocking variant).
///
/// Only available with the `runtime` feature.
///
/// NOTE(review): the address is bound through `hyper::Server::bind`, which
/// hyper documents as panicking when binding fails. Confirm against the
/// hyper version in use.
#[cfg(feature = "runtime")]
pub fn serve<S: HttpService>(
    s: S,
    addr: SocketAddr,
) -> impl Future<Output = Result<(), hyper::Error>> {
    let make_service = WrapHttpService {
        service: Arc::new(s),
    };
    let hyper_server = hyper::Server::bind(&addr).serve(make_service);
    // Convert the futures 0.1 server into a std future.
    hyper_server.compat()
}
/// Runs the given `HttpService` at `addr` by handing the server to
/// `hyper::rt::run`.
///
/// The outcome of the server future is discarded: whether it finishes with
/// success or with an error, the future handed to hyper resolves to `Ok(())`
/// and no error is reported to the caller.
///
/// Only available with the `runtime` feature.
#[cfg(feature = "runtime")]
pub fn run<S: HttpService>(s: S, addr: SocketAddr) {
    hyper::rt::run(
        serve(s, addr)
            // `hyper::rt::run` takes a futures 0.1 future, so map the result
            // to unit values before converting back.
            .map(|_| Ok::<(), ()>(()))
            .compat(),
    );
}