use std::io::{self, Read, Write, Cursor, Chain};
use std::path::Path;
use std::fs::File;
use std::time::Duration;
#[cfg(feature = "tls")] use super::net_stream::HttpsStream;
use super::data_stream::{DataStream, kill_stream};
use super::net_stream::NetStream;
use ext::ReadExt;
use http::hyper;
use http::hyper::h1::HttpReader;
use http::hyper::h1::HttpReader::*;
use http::hyper::net::{HttpStream, NetworkStream};
/// The body reader type as received from hyper: an `HttpReader` over a mutable
/// borrow of hyper's `BufReader`, which in turn wraps a mutable borrow of a
/// type-erased `NetworkStream`.
pub type HyperBodyReader<'a, 'b> =
self::HttpReader<&'a mut hyper::buffer::BufReader<&'b mut dyn NetworkStream>>;
/// The body reader type stored inside `Data`: an `HttpReader` whose source is
/// an in-memory byte cursor chained with a concrete `NetStream`.
pub type BodyReader = HttpReader<Chain<Cursor<Vec<u8>>, NetStream>>;
/// Number of bytes `Data::new` attempts to read up front, and the length of
/// the longest slice `Data::peek` will return.
const PEEK_BYTES: usize = 512;
/// Body data: a buffer of bytes already held in memory plus the reader that
/// produces whatever follows them.
pub struct Data {
/// Bytes held in memory: the prefix read eagerly by `Data::new`, or the
/// entire payload handed to `Data::local`.
buffer: Vec<u8>,
/// The value reported by `peek_complete`. `Data::new` sets it to `true`
/// only when fewer than `PEEK_BYTES` bytes could be read.
is_complete: bool,
/// Reader for the remainder of the body; `open` chains it after `buffer`.
stream: BodyReader,
}
impl Data {
/// Consumes this `Data` and returns a `DataStream` over its contents.
///
/// The returned stream first yields the bytes held in the internal buffer
/// and then continues with the underlying body reader.
pub fn open(mut self) -> DataStream {
    // `Data` implements `Drop`, so its fields cannot simply be moved out;
    // each one is swapped with an empty placeholder instead.
    let placeholder = HttpReader::SizedReader(Cursor::new(vec![]).chain(NetStream::Empty), 0);
    let body = ::std::mem::replace(&mut self.stream, placeholder);
    let buffered = ::std::mem::replace(&mut self.buffer, vec![]);

    DataStream(Cursor::new(buffered).chain(body))
}
/// Builds a `Data` from the body reader hyper provides for `req`.
///
/// The concrete network stream is recovered from hyper's type-erased stream
/// by downcasting, the read timeout from the request's configuration is
/// applied to it, and hyper's internal buffer is taken over so that bytes
/// hyper already holds are read before the network stream itself.
///
/// # Errors
///
/// Returns `Err("Stream is not an HTTP(s) stream!")` when the underlying
/// stream cannot be downcast to `HttpStream` (or, with the `tls` feature,
/// `HttpsStream`).
crate fn from_hyp(
    req: &crate::Request<'_>,
    mut body: HyperBodyReader
) -> Result<Data, &'static str> {
    // With TLS enabled, try the HTTPS stream type first, then plain HTTP.
    #[inline(always)]
    #[cfg(feature = "tls")]
    fn concrete_stream(stream: &mut dyn NetworkStream) -> Option<NetStream> {
        if let Some(https) = stream.downcast_ref::<HttpsStream>() {
            return Some(NetStream::Https(https.clone()));
        }

        stream.downcast_ref::<HttpStream>()
            .map(|http| NetStream::Http(http.clone()))
    }

    // Without TLS, only a plain HTTP stream is recognized.
    #[inline(always)]
    #[cfg(not(feature = "tls"))]
    fn concrete_stream(stream: &mut dyn NetworkStream) -> Option<NetStream> {
        stream.downcast_ref::<HttpStream>()
            .map(|http| NetStream::Http(http.clone()))
    }

    // Recover a clone of the concrete stream hiding behind hyper's trait object.
    let net_stream = concrete_stream(*body.get_mut().get_mut())
        .ok_or("Stream is not an HTTP(s) stream!")?;

    // Apply the configured read timeout (given in seconds). This is best
    // effort: a failure to set the timeout is deliberately ignored.
    let read_timeout = req.state.config.read_timeout
        .map(|secs| Duration::from_secs(secs as u64));
    let _ = net_stream.set_read_timeout(read_timeout);

    // Take over hyper's buffer together with its `pos` and `cap` values.
    // NOTE(review): presumably `pos..cap` is the data hyper has buffered but
    // not yet consumed -- confirm against hyper's `BufReader::take_buf`.
    let (mut raw_buf, pos, cap) = body.get_mut().take_buf();
    raw_buf.truncate(cap);
    let mut buffered = Cursor::new(raw_buf);
    buffered.set_position(pos as u64);

    // Rebuild the same kind of HTTP reader over the buffer followed by the stream.
    let source = buffered.chain(net_stream);
    let reader = match body {
        EmptyReader(_) => EmptyReader(source),
        EofReader(_) => EofReader(source),
        SizedReader(_, n) => SizedReader(source, n),
        ChunkedReader(_, n) => ChunkedReader(source, n),
    };

    Ok(Data::new(reader))
}
/// Returns the leading bytes of the internal buffer without consuming
/// anything. The slice is never longer than `PEEK_BYTES`; a shorter buffer
/// is returned in full.
#[inline(always)]
pub fn peek(&self) -> &[u8] {
    let visible = self.buffer.len().min(PEEK_BYTES);
    &self.buffer[..visible]
}
/// Returns the `is_complete` flag that was recorded when this `Data` was
/// constructed.
#[inline(always)]
pub fn peek_complete(&self) -> bool {
self.is_complete
}
/// Consumes this `Data`, copying all of its contents into `writer`.
///
/// Returns the number of bytes copied, or the I/O error reported by
/// `io::copy`.
#[inline(always)]
pub fn stream_to<W: Write>(self, writer: &mut W) -> io::Result<u64> {
    let mut source = self.open();
    io::copy(&mut source, writer)
}
/// Consumes this `Data`, copying all of its contents into the file at
/// `path`, which is created via `File::create`.
///
/// Returns the number of bytes copied. An error is returned if the file
/// cannot be created or if the copy fails.
#[inline(always)]
pub fn stream_to_file<P: AsRef<Path>>(self, path: P) -> io::Result<u64> {
    // The stream is opened before the file is created, so `self` has
    // already been consumed if file creation fails.
    let mut source = self.open();
    let mut destination = File::create(path)?;
    io::copy(&mut source, &mut destination)
}
/// Wraps `stream` in a `Data`, eagerly reading up to `PEEK_BYTES` bytes from
/// it into the internal buffer.
///
/// If fewer than `PEEK_BYTES` bytes were read, the peek is recorded as
/// complete. If the read fails, the error is logged, the buffer is left
/// empty and the peek is recorded as incomplete.
#[inline(always)]
crate fn new(mut stream: BodyReader) -> Data {
    trace_!("Data::new({:?})", stream);
    let mut buffer = vec![0u8; PEEK_BYTES];

    // Work out how many bytes of the buffer are valid and whether the
    // reader was exhausted while filling it.
    let (filled, is_complete) = match stream.read_max(&mut buffer[..]) {
        Ok(n) => {
            trace_!("Filled peek buf with {} bytes.", n);
            (n, n < PEEK_BYTES)
        }
        Err(e) => {
            error_!("Failed to read into peek buffer: {:?}.", e);
            (0, false)
        }
    };

    // Drop the zero-filled tail so the buffer holds only bytes actually read.
    buffer.truncate(filled);
    trace_!("Peek bytes: {}/{} bytes.", buffer.len(), PEEK_BYTES);
    Data { buffer, stream, is_complete }
}
/// Creates a `Data` backed entirely by the in-memory bytes `data`.
///
/// The bytes become the internal buffer, and the stream is a zero-length
/// sized reader over an empty cursor chained with `NetStream::Empty`.
///
/// The peek is marked complete only when `data` is no longer than
/// `PEEK_BYTES`. `peek` never returns more than that many bytes, so a longer
/// payload cannot be seen in full through it.
#[inline]
crate fn local(data: Vec<u8>) -> Data {
    // Previously this flag was always `true`, which made `peek_complete()`
    // claim a full peek even when `peek()` had truncated the payload.
    let is_complete = data.len() <= PEEK_BYTES;
    let empty_stream = Cursor::new(vec![]).chain(NetStream::Empty);

    Data {
        buffer: data,
        stream: HttpReader::SizedReader(empty_stream, 0),
        is_complete,
    }
}
}
/// Cleanup performed when a `Data` value goes out of scope.
impl Drop for Data {
/// Passes the body reader to `kill_stream`.
fn drop(&mut self) {
// NOTE(review): `kill_stream` is defined in `data_stream`; presumably it
// shuts down or drains the underlying connection -- confirm there.
kill_stream(&mut self.stream);
}
}