use buffer::Buffer;
use curl;
use http;
use std::cell::RefCell;
use std::io;
use std::io::Read;
use std::mem;
use std::rc::Rc;
use std::str;
use std::str::FromStr;
use std::time::Duration;
use super::*;
/// Fallback wait timeout, in milliseconds, used by `Transport::dispatch` when
/// the multi handle does not report a timeout of its own.
const DEFAULT_TIMEOUT_MS: u64 = 1000;
/// A transport that drives one request at a time through a curl multi handle.
///
/// A request is started with `execute`, which returns the response builder;
/// the response body is then pulled through the `Read` implementation.
pub struct Transport {
/// The curl multi handle that the request's easy handle is added to.
multi: curl::multi::Multi,
/// The easy handle, either idle (`Ready`) or attached to `multi` (`Active`).
/// `None` until the first request creates one.
handle: Option<Handle>,
/// Options applied to the easy handle at the start of every request.
options: Options,
/// Request/response state shared with the `Collector` callbacks.
data: Rc<RefCell<Data>>,
}
/// The two states the transport's easy handle can be in.
enum Handle {
/// An easy handle that is not attached to the multi handle; it is reset and
/// reused by the next request.
Ready(curl::easy::Easy2<Collector>),
/// An easy handle that has been added to the multi handle for a request.
Active(curl::multi::Easy2Handle<Collector>),
}
/// State shared between a `Transport` and the `Collector` callbacks.
struct Data {
/// Body of the current request; the `Collector` read callback reads from it.
request_body: Body,
/// Response builder filled in by the `Collector` header callback.
response: http::response::Builder,
/// Set by the header callback once the blank line ending a header block is seen.
header_complete: bool,
/// Response body bytes pushed by the write callback and not yet read out.
buffer: Buffer,
}
impl Transport {
    /// Creates a transport that uses `Options::default()`.
    pub fn new() -> Transport {
        Transport::with_options(Options::default())
    }

    /// Creates a transport that applies `options` to every request it sends.
    pub fn with_options(options: Options) -> Transport {
        Transport {
            multi: curl::multi::Multi::new(),
            handle: None,
            options: options,
            data: Rc::new(RefCell::new(Data {
                request_body: Body::default(),
                response: http::response::Builder::new(),
                header_complete: false,
                buffer: Buffer::new(),
            })),
        }
    }

    /// Returns `true` when no request is currently in progress.
    #[inline]
    pub fn is_ready(&self) -> bool {
        !self.is_active()
    }

    /// Returns `true` while a request is attached to the multi handle.
    #[inline]
    pub fn is_active(&self) -> bool {
        match self.handle {
            Some(Handle::Active(_)) => true,
            _ => false,
        }
    }

    /// Starts `request` and blocks until its response headers have arrived.
    ///
    /// On success the response builder populated by the header callback is
    /// returned; the body is then available through the `Read` implementation.
    ///
    /// # Errors
    ///
    /// Returns `Error::TransportBusy` if a request is already active, and any
    /// error raised while configuring or driving the transfer.
    pub fn execute(&mut self, request: Request) -> Result<http::response::Builder, Error> {
        self.begin_request(request)?;

        // Also stop once the transfer is no longer active: `dispatch` does
        // nothing for an inactive transport, so waiting only on
        // `header_complete` would spin forever if the transfer ended without a
        // completed header block. In that case the builder is returned as far
        // as it was filled in.
        while self.is_active() && !self.data.borrow().header_complete {
            self.dispatch()?;
        }

        Ok(mem::replace(
            &mut self.data.borrow_mut().response,
            http::response::Builder::new(),
        ))
    }

    /// Stops the request in progress, if any.
    ///
    /// Returns `Ok(true)` when an active request was ended and `Ok(false)` when
    /// the transport was already idle.
    pub fn cancel(&mut self) -> Result<bool, Error> {
        if !self.is_active() {
            return Ok(false);
        }

        self.end_request()?;
        Ok(true)
    }

    /// Configures an easy handle for `request` and attaches it to the multi
    /// handle.
    ///
    /// An idle handle from a previous request is reset and reused; otherwise a
    /// new one is created. Fails with `Error::TransportBusy` when a request is
    /// already active, leaving that request untouched.
    fn begin_request(&mut self, request: Request) -> Result<(), Error> {
        let mut easy = match self.handle.take() {
            Some(Handle::Ready(mut easy)) => {
                easy.reset();
                easy
            }
            Some(active) => {
                // Put the active handle back: dropping it here would lose the
                // running request and make the transport look idle.
                self.handle = Some(active);
                return Err(Error::TransportBusy);
            }
            None => curl::easy::Easy2::new(Collector {
                data: self.data.clone(),
            }),
        };

        easy.max_connects(1)?;
        easy.signal(false)?;

        // Timeouts and socket options.
        if let Some(timeout) = self.options.timeout {
            easy.timeout(timeout)?;
        }
        easy.connect_timeout(self.options.connect_timeout)?;
        easy.tcp_nodelay(self.options.tcp_nodelay)?;
        if let Some(interval) = self.options.tcp_keepalive {
            easy.tcp_keepalive(true)?;
            easy.tcp_keepintvl(interval)?;
        }

        // Redirect handling.
        match self.options.redirect_policy {
            RedirectPolicy::None => {
                easy.follow_location(false)?;
            }
            RedirectPolicy::Follow => {
                easy.follow_location(true)?;
            }
            RedirectPolicy::Limit(max) => {
                easy.follow_location(true)?;
                easy.max_redirections(max)?;
            }
        }

        if let Some(version) = self.options.preferred_http_version {
            easy.http_version(match version {
                http::Version::HTTP_10 => curl::easy::HttpVersion::V10,
                http::Version::HTTP_11 => curl::easy::HttpVersion::V11,
                http::Version::HTTP_2 => curl::easy::HttpVersion::V2,
                _ => curl::easy::HttpVersion::Any,
            })?;
        }

        if let Some(ref proxy) = self.options.proxy {
            easy.proxy(&proxy.to_string())?;
        }

        // Method, URL and headers of this particular request.
        easy.custom_request(request.method().as_str())?;
        easy.url(&request.uri().to_string())?;

        let mut headers = curl::easy::List::new();
        for (name, value) in request.headers() {
            // Header values may legally contain bytes that are not visible
            // ASCII, for which `to_str()` fails. Convert lossily instead of
            // panicking; values that are plain ASCII are passed through as is.
            let value = String::from_utf8_lossy(value.as_bytes());
            headers.append(&format!("{}: {}", name.as_str(), value))?;
        }
        easy.http_headers(headers)?;

        let body = request.into_parts().1;
        if !body.is_empty() {
            easy.upload(true)?;
        }

        // Reset the shared state before the handle can produce callbacks, so
        // nothing left over from an earlier request leaks into this one.
        {
            let mut data = self.data.borrow_mut();
            data.request_body = body;
            data.response = http::response::Builder::new();
            data.header_complete = false;
            data.buffer.clear();
        }

        let easy = self.multi.add2(easy)?;
        self.handle = Some(Handle::Active(easy));

        Ok(())
    }

    /// Detaches the active easy handle from the multi handle and keeps it
    /// around as `Ready` for reuse. Does nothing when no request is active.
    fn end_request(&mut self) -> Result<(), Error> {
        self.handle = match self.handle.take() {
            Some(Handle::Active(easy)) => Some(Handle::Ready(self.multi.remove2(easy)?)),
            other => other,
        };

        Ok(())
    }

    /// Waits for activity on the active transfer and lets curl make progress.
    ///
    /// When curl reports that no transfer is running any more, the request is
    /// ended and a failure reported by curl for it is returned as an error.
    /// Does nothing when no request is active.
    fn dispatch(&mut self) -> Result<(), Error> {
        if !self.is_active() {
            return Ok(());
        }

        // Block until there is activity or the timeout elapses.
        let timeout = self
            .multi
            .get_timeout()?
            .unwrap_or(Duration::from_millis(DEFAULT_TIMEOUT_MS));
        self.multi.wait(&mut [], timeout)?;

        if self.multi.perform()? != 0 {
            // The transfer is still running.
            return Ok(());
        }

        // Read the completion messages *before* detaching the handle: message
        // data does not survive removing the easy handle from the multi
        // handle, so the transfer's error would otherwise be lost.
        let mut failure = None;
        self.multi.messages(|message| {
            if let Some(Err(e)) = message.result() {
                failure = Some(e);
            }
        });

        self.end_request()?;

        match failure {
            Some(e) => Err(e.into()),
            None => Ok(()),
        }
    }
}
/// Reads response body bytes, driving the transfer whenever the internal
/// buffer has run dry and the request is still in progress.
impl Read for Transport {
    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
        loop {
            // The borrow is released at the end of this statement, before
            // `dispatch` lets the callbacks borrow the shared state mutably.
            let drained = self.data.borrow().buffer.is_empty();
            if !drained || !self.is_active() {
                break;
            }
            self.dispatch()?;
        }

        // Hand out whatever is buffered.
        let mut shared = self.data.borrow_mut();
        shared.buffer.read(dst)
    }
}
// SAFETY: NOTE(review) — to be confirmed. `Transport` holds an
// `Rc<RefCell<Data>>`, which is not `Send` on its own. The only other clone
// created in this file is given to the `Collector` stored inside the easy
// handle that `Transport` itself owns, so all clones would move between
// threads together. This presumes no clone escapes elsewhere and that the curl
// handles themselves may be moved to another thread — verify.
unsafe impl Send for Transport {}
/// curl callback handler that records response data into the state it shares
/// with the owning `Transport`.
struct Collector {
/// Shared request/response state; a clone of the `Transport`'s `data`.
data: Rc<RefCell<Data>>,
}
impl curl::easy::Handler for Collector {
    /// Handles one response header line.
    ///
    /// Status lines set the version and status on the response builder, header
    /// lines are added to it, and the blank line that ends a header block sets
    /// `header_complete`. Returning `false` signals an error to curl; this
    /// happens for lines that are not UTF-8, status lines without a valid
    /// status code, and lines that match none of the expected forms.
    ///
    /// NOTE(review): curl invokes the header callback for every response it
    /// receives, including intermediate ones such as redirects, so the first
    /// completed header block may not belong to the final response — confirm
    /// this is the intended behaviour when redirects are followed.
    fn header(&mut self, data: &[u8]) -> bool {
        let line = match str::from_utf8(data) {
            Ok(s) => s,
            Err(_) => return false,
        };

        // Status line: "<version> <code> [reason]".
        if line.starts_with("HTTP/") {
            // Split on whitespace rather than slicing at fixed byte offsets:
            // fixed offsets panic on short lines and misread status lines such
            // as `HTTP/2 200`, whose version token is shorter than 8 bytes.
            let mut parts = line.split_whitespace();

            let version = match parts.next() {
                Some("HTTP/2") | Some("HTTP/2.0") => http::Version::HTTP_2,
                Some("HTTP/1.1") => http::Version::HTTP_11,
                Some("HTTP/1.0") => http::Version::HTTP_10,
                Some("HTTP/0.9") => http::Version::HTTP_09,
                _ => http::Version::default(),
            };

            let status_code = match parts
                .next()
                .and_then(|code| http::StatusCode::from_str(code).ok())
            {
                Some(code) => code,
                None => return false,
            };

            let mut shared = self.data.borrow_mut();
            shared.response.version(version);
            shared.response.status(status_code);

            return true;
        }

        // Header line: "<name>:<value>".
        if let Some(pos) = line.find(':') {
            let name = &line[..pos];
            // Skip only the colon; `trim` removes optional whitespace around
            // the value as well as the trailing line break. Skipping a fixed
            // two bytes would eat the first value byte of "Name:value".
            let value = line[pos + 1..].trim();

            self.data.borrow_mut().response.header(name, value);

            return true;
        }

        // The blank line that terminates a header block.
        if line == "\r\n" || line == "\n" {
            self.data.borrow_mut().header_complete = true;
            return true;
        }

        false
    }

    /// Supplies request body bytes to curl by reading from the shared
    /// `request_body`; a read failure aborts the transfer.
    fn read(&mut self, data: &mut [u8]) -> Result<usize, curl::easy::ReadError> {
        self.data
            .borrow_mut()
            .request_body
            .read(data)
            .map_err(|_| curl::easy::ReadError::Abort)
    }

    /// Appends received response body bytes to the shared buffer and reports
    /// all of them as consumed.
    fn write(&mut self, data: &[u8]) -> Result<usize, curl::easy::WriteError> {
        self.data.borrow_mut().buffer.push(data);
        Ok(data.len())
    }
}