#![crate_name = "tiny_http"]
#![crate_type = "lib"]
extern crate ascii;
extern crate chunked_transfer;
extern crate encoding;
extern crate url;
use std::io::Error as IoError;
use std::io::Result as IoResult;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::thread;
use std::net;
use client::ClientConnection;
use util::MessagesQueue;
pub use common::{Header, HeaderField, HTTPVersion, Method, StatusCode};
pub use request::Request;
pub use response::{ResponseBox, Response};
mod client;
mod common;
mod request;
mod response;
#[allow(dead_code)] mod util;
/// Handle to a running server.
///
/// Created by `Server::new` (reached through `ServerBuilder::build`), which binds the
/// listening socket and spawns the thread that accepts connections. Requests are
/// obtained with `recv`, `try_recv` or `incoming_requests`.
pub struct Server {
// Pool on which one task per accepted client is spawned (see `add_client`).
tasks_pool: util::TaskPool,
// Shared flag, created as `false` in `new` and set to `true` when the server is dropped.
close: Arc<AtomicBool>,
// Queue carrying both accepted connections (or accept errors) and requests
// produced by the per-client tasks.
messages: Arc<MessagesQueue<Message>>,
// Address reported by the listener's `local_addr()` at construction time.
listening_addr: net::SocketAddr,
}
// Items travelling through the server's internal `MessagesQueue`.
enum Message {
// Outcome of accepting a connection: a ready client, or the I/O error that occurred.
NewClient(Result<ClientConnection, IoError>),
// A request yielded by iterating over a `ClientConnection`.
NewRequest(Request),
}
/// Lets the outcome of accepting a connection be turned into a queue message
/// with `.into()`; both the success and the error case become `Message::NewClient`.
impl From<Result<ClientConnection, IoError>> for Message {
    fn from(accepted: Result<ClientConnection, IoError>) -> Message {
        Message::NewClient(accepted)
    }
}
/// Lets a `Request` be turned into a queue message with `.into()`;
/// the request is wrapped in `Message::NewRequest`.
impl From<Request> for Message {
    fn from(request: Request) -> Message {
        Message::NewRequest(request)
    }
}
// Compile-time assertion: the trait requires `Sync + Send`, so the empty impl
// below only compiles while `Server` is both `Sync` and `Send`.
// Neither item carries any runtime behavior.
#[doc(hidden)]
trait MustBeShareDummy : Sync + Send {}
#[doc(hidden)]
impl MustBeShareDummy for Server {}
/// Iterator over the requests received by a `Server`.
///
/// Obtained from `Server::incoming_requests`; borrows the server for `'a`.
pub struct IncomingRequests<'a> {
// Server whose `recv` is called on every `next()`.
server: &'a Server
}
/// Configuration from which a `Server` is built.
///
/// Start with `ServerBuilder::new()`, adjust it with the `with_*` methods and
/// finish with `build()`.
///
/// The type derives `Debug` and `Clone` so a configuration can be logged or
/// reused to build several servers.
#[derive(Debug, Clone)]
pub struct ServerBuilder {
    // IPv4 address and port the listener will be bound to.
    address: net::SocketAddrV4,
    // Client connection timeout, in milliseconds.
    client_timeout_ms: u32,
}
impl ServerBuilder {
    /// Creates a builder with the default configuration: address `0.0.0.0`,
    /// port 80 and a client timeout of 60 000 milliseconds.
    pub fn new() -> ServerBuilder {
        let any_interface = net::Ipv4Addr::new(0, 0, 0, 0);
        ServerBuilder {
            address: net::SocketAddrV4::new(any_interface, 80),
            client_timeout_ms: 60 * 1000,
        }
    }

    /// Replaces the port to bind to, keeping the IP address and every other setting.
    pub fn with_port(self, port: u16) -> ServerBuilder {
        ServerBuilder {
            // `Ipv4Addr` is `Copy`, so the current IP is simply read out of the old address.
            address: net::SocketAddrV4::new(*self.address.ip(), port),
            ..self
        }
    }

    /// Sets the port to 0, keeping the IP address and every other setting.
    ///
    /// Binding to port 0 leaves the choice of the actual port to the operating
    /// system; the chosen address can be read back with `Server::get_server_addr`.
    pub fn with_random_port(self) -> ServerBuilder {
        self.with_port(0)
    }

    /// Stores the client connection timeout, expressed in milliseconds.
    pub fn with_client_connections_timeout(self, milliseconds: u32) -> ServerBuilder {
        ServerBuilder {
            client_timeout_ms: milliseconds,
            ..self
        }
    }

    /// Consumes the builder and tries to start a server with this configuration.
    ///
    /// # Errors
    ///
    /// Returns whatever I/O error `Server::new` reports.
    pub fn build(self) -> IoResult<Server> {
        Server::new(self)
    }
}
impl Server {
    /// Binds a TCP listener on `config.address` and spawns the thread that
    /// accepts incoming connections.
    ///
    /// Every accept outcome (a ready `ClientConnection` or the I/O error) is
    /// pushed onto the message queue, where `recv`/`try_recv` pick it up.
    ///
    /// The accept thread checks the shared close flag before and after each
    /// `accept()` call and exits once it is set, which also closes the
    /// listener. Because `accept()` blocks, the thread only notices the flag
    /// after the next connection attempt.
    ///
    /// NOTE(review): `config.client_timeout_ms` is not read anywhere in this
    /// function, so the configured timeout currently has no effect here.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if binding the listener or querying its local
    /// address fails.
    fn new(config: ServerBuilder) -> IoResult<Server> {
        let close_trigger = Arc::new(AtomicBool::new(false));

        let (server, local_addr) = {
            let listener = try!(net::TcpListener::bind(net::SocketAddr::V4(config.address)));
            let local_addr = try!(listener.local_addr());
            (listener, local_addr)
        };

        let messages = MessagesQueue::with_capacity(8);

        let inside_close_trigger = close_trigger.clone();
        let inside_messages = messages.clone();

        thread::spawn(move || {
            use std::sync::atomic::Ordering::Relaxed;
            use util::ClosableTcpStream;

            while !inside_close_trigger.load(Relaxed) {
                let accepted = server.accept();

                // The server may have been dropped while we were blocked in
                // `accept()`; nobody is left to consume the message, so stop.
                if inside_close_trigger.load(Relaxed) {
                    break;
                }

                let new_client = accepted.and_then(|(sock, _)| {
                    // A failed `try_clone` is reported through the queue like
                    // any accept error instead of panicking this thread.
                    sock.try_clone().map(|read_half| {
                        let read_closable = ClosableTcpStream::new(read_half, true, false);
                        let write_closable = ClosableTcpStream::new(sock, false, true);
                        ClientConnection::new(write_closable, read_closable)
                    })
                });

                inside_messages.push(new_client.into());
            }
        });

        Ok(Server {
            tasks_pool: util::TaskPool::new(),
            messages: messages,
            close: close_trigger,
            listening_addr: local_addr,
        })
    }

    /// Returns an iterator that yields requests by calling `recv` repeatedly.
    #[inline]
    pub fn incoming_requests(&self) -> IncomingRequests {
        IncomingRequests { server: self }
    }

    /// Returns the address the listener was bound to, as reported by the
    /// socket when the server was created.
    #[inline]
    pub fn get_server_addr(&self) -> net::SocketAddr {
        // `SocketAddr` is `Copy`; no explicit clone is needed.
        self.listening_addr
    }

    /// Intended to return the number of open client connections.
    ///
    /// # Panics
    ///
    /// Always panics: this method is not implemented.
    pub fn get_num_connections(&self) -> usize {
        unimplemented!()
    }

    /// Pops messages from the queue until a request is available and returns it.
    ///
    /// Newly accepted clients encountered on the way are handed to
    /// `add_client` and the loop continues.
    ///
    /// # Errors
    ///
    /// Returns the I/O error carried by a `NewClient(Err(..))` message, i.e.
    /// a failure reported by the accept thread.
    pub fn recv(&self) -> IoResult<Request> {
        loop {
            match self.messages.pop() {
                Message::NewClient(Ok(client)) => self.add_client(client),
                Message::NewClient(Err(err)) => return Err(err),
                Message::NewRequest(rq) => return Ok(rq),
            }
        }
    }

    /// Same as `recv`, but uses `try_pop` and returns `Ok(None)` as soon as
    /// the queue has no message to offer.
    ///
    /// # Errors
    ///
    /// Returns the I/O error carried by a `NewClient(Err(..))` message.
    pub fn try_recv(&self) -> IoResult<Option<Request>> {
        loop {
            match self.messages.try_pop() {
                Some(Message::NewClient(Ok(client))) => self.add_client(client),
                Some(Message::NewClient(Err(err))) => return Err(err),
                Some(Message::NewRequest(rq)) => return Ok(Some(rq)),
                None => return Ok(None),
            }
        }
    }

    /// Spawns a task on the pool that iterates over `client` and pushes every
    /// request it yields onto the message queue.
    fn add_client(&self, client: ClientConnection) {
        let messages = self.messages.clone();
        // The connection is kept in an `Option` so the boxed closure can move
        // it out with `take()`; a second invocation would find `None` and do nothing.
        let mut client = Some(client);

        self.tasks_pool.spawn(Box::new(move || {
            if let Some(client) = client.take() {
                for rq in client {
                    messages.push(rq.into());
                }
            }
        }));
    }
}
/// Yields requests by calling `Server::recv` once per `next()`.
impl<'a> Iterator for IncomingRequests<'a> {
    type Item = Request;

    /// Returns the next request, or `None` when `recv` reports an error
    /// (the error itself is discarded). Each call invokes `recv` again, even
    /// after a previous call returned `None`.
    fn next(&mut self) -> Option<Request> {
        let received = self.server.recv();
        received.ok()
    }
}
/// Dropping the server raises the shared `close` flag.
impl Drop for Server {
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        // Every holder of a clone of the `Arc<AtomicBool>` can observe this store.
        self.close.store(true, Ordering::Relaxed);
    }
}