/* $Id$
*
* EHS is a library for embedding HTTP(S) support into a C++ application
*
* Copyright (C) 2004 Zachary J. Hansen
*
* Code cleanup, new features and bugfixes: Copyright (C) 2010 Fritz Elfert
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1 as published by the Free Software Foundation;
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* This can be found in the 'COPYING' file.
*
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "ehs.h"
#include "networkabstraction.h"
#include "ehsconnection.h"
#include "ehsserver.h"
#include "socket.h"
#include "securesocket.h"
#include "debug.h"
#include <pcrecpp.h>
#include <pthread.h>
#include <fstream>
#include <sstream>
#include <stdexcept>
#include <cerrno>
/// Build-configuration signature of this library, assembled at compile time
/// from adjacent string literals. It records whether COMPILE_WITH_SSL and
/// EHS_DEBUG were defined ("1" or "0"), the values of the VERSION and SVNREV
/// macros, and the compiler's __DATE__ / __TIME__ for this translation unit.
static const char * const EHSconfig = "EHS_CONFIG:SSL="
#ifdef COMPILE_WITH_SSL
"1"
#else
"0"
#endif
",DEBUG="
#ifdef EHS_DEBUG
"1"
#else
"0"
#endif
",VERSION=" VERSION ",RELEASE=" SVNREV ",BUILD=" __DATE__ " " __TIME__;
/// Returns the build-configuration signature string EHSconfig.
/// The result points at a static string literal, so it must not be freed
/// or modified by the caller.
const char * getEHSconfig()
{
return EHSconfig;
}
using namespace std;
/**
 * Scoped lock helper for a pthread mutex.
 *
 * Optionally acquires the mutex on construction and releases it on
 * destruction if it is still held through this helper. Lock() and Unlock()
 * allow the mutex to be given up temporarily inside the scope. Both are
 * no-ops when the helper is already in the requested state, so an
 * unbalanced call can neither self-deadlock nor release a mutex that this
 * helper did not acquire.
 *
 * The helper is not copyable.
 */
class MutexHelper {
    public:
        /**
         * @param mutex   The mutex to manage. It must outlive this helper.
         * @param locknow If true (the default), acquire the mutex immediately.
         */
        MutexHelper(pthread_mutex_t *mutex, bool locknow = true) :
            m_pMutex(mutex), m_bLocked(false)
        {
            if (locknow)
                Lock();
        }

        /// Releases the mutex if this helper currently holds it.
        ~MutexHelper()
        {
            if (m_bLocked)
                pthread_mutex_unlock(m_pMutex);
        }

        /// Acquires the mutex unless this helper already holds it.
        void Lock()
        {
            if (m_bLocked)
                return;
            // Only record ownership when the lock was really obtained, so
            // that the destructor never unlocks a mutex we do not hold.
            m_bLocked = (0 == pthread_mutex_lock(m_pMutex));
        }

        /// Releases the mutex if this helper currently holds it.
        void Unlock()
        {
            if (!m_bLocked)
                return;
            m_bLocked = false;
            pthread_mutex_unlock(m_pMutex);
        }

    private:
        pthread_mutex_t *m_pMutex; ///< managed mutex (not owned)
        bool m_bLocked;            ///< true while held through this helper

        // not copyable
        MutexHelper(const MutexHelper &);
        MutexHelper & operator=(const MutexHelper &);
};
int EHSServer::CreateFdSet()
{
// don't lock mutex, as this is only called from within a locked section
FD_ZERO(&m_oReadFds);
// add the accepting FD
FD_SET(m_poNetworkAbstraction->GetFd(), &m_oReadFds);
int nHighestFd = m_poNetworkAbstraction->GetFd();
for (EHSConnectionList::iterator i = m_oEHSConnectionList.begin();
i != m_oEHSConnectionList.end(); i++) {
/// skip this one if it's already been used
if ((*i)->StillReading()) {
int nCurrentFd = (*i)->GetNetworkAbstraction()->GetFd();
// EHS_TRACE("Adding %d to FD SET\n", nCurrentFd);
FD_SET(nCurrentFd, &m_oReadFds);
// store the highest FD in the set to return it
if (nCurrentFd > nHighestFd) {
nHighestFd = nCurrentFd;
}
} else {
EHS_TRACE("FD %d isn't reading anymore\n",
(*i)->GetNetworkAbstraction()->GetFd());
}
}
return nHighestFd;
}
/// Stops reading from connections that have been idle for longer than
/// m_nIdleTimeout seconds and then removes all finished connections.
///
/// A connection is considered idle when it is still being read from, its
/// last activity is older than the timeout and it has no pending requests.
/// (The previous code tested RequestsPending() without negation, which
/// contradicted its own comment and meant that idle connections without
/// outstanding work were never timed out.)
///
/// The server mutex is not locked here; the caller is expected to hold it.
void EHSServer::ClearIdleConnections()
{
    for (EHSConnectionList::iterator i = m_oEHSConnectionList.begin();
            i != m_oEHSConnectionList.end(); ++i) {
        MutexHelper mh(&(*i)->m_oMutex);
        // if it's been more than N seconds since the last activity
        // and there are no pending requests
        if ((*i)->StillReading() &&
                (time(NULL) - (*i)->LastActivity() > m_nIdleTimeout) &&
                !(*i)->RequestsPending()) {
            EHS_TRACE("Done reading because of idle timeout\n");
            // DoneReading() acquires the connection mutex itself, so it
            // must be released around the call.
            mh.Unlock();
            (*i)->DoneReading(false);
            mh.Lock();
        }
    }
    RemoveFinishedConnections();
}
void EHSServer::RemoveFinishedConnections ( )
{
// don't lock mutex, as this is only called from within locked sections
for (EHSConnectionList::iterator i = m_oEHSConnectionList.begin();
i != m_oEHSConnectionList.end(); ) {
if ((*i)->CheckDone()) {
RemoveEHSConnection(*i);
i = m_oEHSConnectionList.begin();
} else {
i++;
}
}
}
/// Creates the per-client connection state.
///
/// @param ipoNetworkAbstraction  the client's socket; the connection takes
///        ownership (it is deleted in the destructor). Address and port are
///        copied from it.
/// @param ipoEHSServer  the server this connection belongs to.
///
/// The request size limit starts at MAX_REQUEST_SIZE_DEFAULT, and the last
/// activity time is refreshed through UpdateLastActivity().
EHSConnection::EHSConnection(NetworkAbstraction *ipoNetworkAbstraction,
        EHSServer * ipoEHSServer) :
    m_bDoneReading(false),
    m_bDisconnected(false),
    m_poCurrentHttpRequest(NULL),
    m_poEHSServer(ipoEHSServer),
    m_nLastActivity(0),
    m_nRequests(0),
    m_nResponses(0),
    m_poNetworkAbstraction(ipoNetworkAbstraction),
    m_sBuffer(),
    m_oHttpResponseMap(),
    m_oHttpRequestList(),
    m_sAddress(ipoNetworkAbstraction->GetAddress()),
    m_nPort(ipoNetworkAbstraction->GetPort()),
    m_nMaxRequestSize(MAX_REQUEST_SIZE_DEFAULT),
    m_oMutex()
{
    UpdateLastActivity();

    // the value-initialized mutex still needs a proper pthread init
    pthread_mutex_init(&m_oMutex, NULL);
}
/// Releases everything the connection owns: the request currently being
/// parsed, fully parsed requests that were never fetched, responses that
/// were queued but never sent, the network abstraction, and the mutex
/// initialized in the constructor.
EHSConnection::~EHSConnection()
{
    delete m_poCurrentHttpRequest;

    // Requests still waiting in the queue were never handed to a worker,
    // so nobody else is going to free them.
    while (!m_oHttpRequestList.empty()) {
        delete m_oHttpRequestList.front();
        m_oHttpRequestList.pop_front();
    }

    // Same for responses that were added out of order and never sent.
    for (HttpResponseMap::iterator i = m_oHttpResponseMap.begin();
            i != m_oHttpResponseMap.end(); ++i) {
        delete i->second;
    }
    m_oHttpResponseMap.clear();

    delete m_poNetworkAbstraction;

    // counterpart of pthread_mutex_init() in the constructor
    pthread_mutex_destroy(&m_oMutex);
}
/// Returns the network abstraction (client socket) of this connection.
/// The connection keeps ownership; the object is deleted in the
/// connection's destructor.
NetworkAbstraction *EHSConnection::GetNetworkAbstraction()
{
return m_poNetworkAbstraction;
}
/// Appends freshly received data to this connection's buffer and parses as
/// many complete HTTP requests out of it as possible.
///
/// Every completed request is queued on m_oHttpRequestList, the server's
/// pending counter is incremented and threads waiting on the server's
/// m_oDoneAccepting condition are woken. When the server runs in
/// one-thread-per-request mode, a detached worker thread is started for
/// each completed request.
///
/// @return ADDBUFFER_INVALID        if no data was supplied,
///         ADDBUFFER_TOOBIG         if the buffer would exceed m_nMaxRequestSize,
///         ADDBUFFER_INVALIDREQUEST if the parser rejected the data,
///         ADDBUFFER_OK             otherwise.
EHSConnection::AddBufferResult
EHSConnection::AddBuffer(char *ipsData, ///< new data to be added
        int inSize ///< size of new data
        )
{
    MutexHelper mh(&m_oMutex);

    // make sure we actually got some data
    if (NULL == ipsData || inSize <= 0) {
        return ADDBUFFER_INVALID;
    }

    // make sure the buffer doesn't grow too big
    if ((m_sBuffer.length() + inSize) > m_nMaxRequestSize) {
        EHS_TRACE("AddBuffer: MaxRequestSize (%lu) exceeded.\n", m_nMaxRequestSize);
        return ADDBUFFER_TOOBIG;
    }

    // binary safe: the (pointer, length) form does not stop at NUL bytes
    m_sBuffer.append(ipsData, inSize);

    // need to run through our buffer until we don't get a full result out
    do {
        // if we need to make a new request object, do that now
        if (NULL == m_poCurrentHttpRequest ||
                m_poCurrentHttpRequest->m_nCurrentHttpParseState == HttpRequest::HTTPPARSESTATE_COMPLETEREQUEST) {

            // if we have a completed one already, toss it on the list
            if (NULL != m_poCurrentHttpRequest) {
                m_oHttpRequestList.push_back(m_poCurrentHttpRequest);
                m_poEHSServer->IncrementRequestsPending();

                // wake up everyone
                pthread_cond_broadcast(&m_poEHSServer->m_oDoneAccepting);

                if (m_poEHSServer->m_nServerRunningStatus == EHSServer::SERVERRUNNING_ONETHREADPERREQUEST) {
                    // create a thread to handle the request
                    pthread_t oThread;
                    // XXX: the original code releases the connection mutex
                    // around thread creation without stating why; kept.
                    mh.Unlock();
                    const int nCreateResult = pthread_create(&oThread, NULL,
                            EHSServer::PthreadHandleData_ThreadedStub,
                            (void *)m_poEHSServer);
                    if (0 == nCreateResult) {
                        EHS_TRACE("created thread with TID=0x%x, NULL, func=0x%x, data=0x%x\n",
                                oThread, EHSServer::PthreadHandleData_ThreadedStub, (void *) m_poEHSServer);
                        pthread_detach(oThread);
                    } else {
                        // oThread is undefined after a failed pthread_create,
                        // so it must not be detached.
                        EHS_TRACE("Error: pthread_create failed with code %d\n", nCreateResult);
                    }
                    mh.Lock();
                }
            }

            // create the next request object
            m_poCurrentHttpRequest = new HttpRequest(++m_nRequests, this);
            m_poCurrentHttpRequest->m_bSecure = m_poNetworkAbstraction->IsSecure();
        }

        // parse through the current data
        m_poCurrentHttpRequest->ParseData(m_sBuffer);

    } while (m_poCurrentHttpRequest->m_nCurrentHttpParseState ==
            HttpRequest::HTTPPARSESTATE_COMPLETEREQUEST);

    // return either invalid request or ok
    if (m_poCurrentHttpRequest->m_nCurrentHttpParseState == HttpRequest::HTTPPARSESTATE_INVALIDREQUEST) {
        return ADDBUFFER_INVALIDREQUEST;
    }
    return ADDBUFFER_OK;
}
/// Marks this connection as finished with reading; call when no more reads
/// will be performed on it.
///
/// @param ibDisconnected  true when the client has disconnected.
///
/// Both flags are updated while holding the connection mutex.
void EHSConnection::DoneReading(bool ibDisconnected)
{
    MutexHelper oGuard(&m_oMutex);

    m_bDoneReading = true;
    m_bDisconnected = ibDisconnected;
}
/// Removes and returns the oldest fully parsed request of this connection.
///
/// @return the request at the front of the queue, or NULL if the queue is
///         empty. The request is no longer tracked by the connection.
HttpRequest * EHSConnection::GetNextRequest()
{
    MutexHelper oGuard(&m_oMutex);

    if (m_oHttpRequestList.empty()) {
        return NULL;
    }
    HttpRequest *poOldest = m_oHttpRequestList.front();
    m_oHttpRequestList.pop_front();
    return poOldest;
}
/// Tells whether this connection can be dropped.
///
/// That is the case once reading has stopped and every completed request
/// has been answered. If the client has not disconnected by itself, the
/// network connection is closed here.
///
/// @return 1 if the connection is finished, 0 otherwise.
int EHSConnection::CheckDone()
{
    // while we are still reading, the connection has to stay
    if (StillReading()) {
        return 0;
    }

    // responses still outstanding? (-1 because the next, unused request
    // object is already created)
    if (m_nRequests - 1 > m_nResponses) {
        return 0;
    }

    // if we haven't disconnected, do that now
    if (!m_bDisconnected) {
        EHS_TRACE ("Closing connection: .\n");
        m_poNetworkAbstraction->Close();
    }
    return 1;
}
////////////////////////////////////////////////////////////////////
// EHS SERVER
////////////////////////////////////////////////////////////////////
/**
 * Creates the server backend for a top-level EHS object.
 *
 * Reads the server parameters of ipoTopLevelEHS ("https", "certificate",
 * "bindaddress", "port", "mode", "threadcount"), creates and initializes
 * the listening socket and, depending on "mode", starts the worker
 * thread(s).
 *
 * After construction, RunningStatus() tells whether startup succeeded:
 * it stays SERVERRUNNING_NOTRUNNING if the socket could not be initialized
 * or no thread could be started. In threadpool mode the server runs as
 * long as at least one worker thread could be created.
 *
 * @throws std::invalid_argument if ipoTopLevelEHS is NULL.
 * @throws std::runtime_error if HTTPS is requested but not compiled in,
 *         or if "mode" has an unknown value. Resources acquired up to that
 *         point are released before throwing, because the destructor does
 *         not run for a partially constructed object.
 */
EHSServer::EHSServer ( EHS * ipoTopLevelEHS ///< pointer to top-level EHS for request routing
        ) :
    m_nServerRunningStatus(SERVERRUNNING_NOTRUNNING),
    m_poTopLevelEHS(ipoTopLevelEHS),
    m_bAcceptedNewConnection(false),
    m_oMutex(pthread_mutex_t()),
    m_oDoneAccepting(pthread_cond_t()),
    m_nRequestsPending(0),
    m_bAccepting(false),
    m_sServerName(""),
    m_oReadFds(fd_set()),
    m_oEHSConnectionList(EHSConnectionList()),
    m_poNetworkAbstraction(NULL),
    m_nAcceptThreadId(0),
    m_nIdleTimeout(15),
    m_nThreads(0),
    m_poCurrentRequest(NULL)
{
    // you HAVE to specify a top-level EHS object
    if (NULL == m_poTopLevelEHS) {
        throw invalid_argument("EHSServer::EHSServer: Pointer to toplevel EHS object is NULL.");
    }

    pthread_mutex_init(&m_oMutex, NULL);
    pthread_cond_init(&m_oDoneAccepting, NULL);

    // grab out the parameters for less typing later on
    EHSServerParameters & roEHSServerParameters =
        ipoTopLevelEHS->m_oEHSServerParameters;

    // whether to run with https support
    int nHttps = roEHSServerParameters["https"];

    if (nHttps) {
        EHS_TRACE("EHSServer running in HTTPS mode\n");
#ifdef COMPILE_WITH_SSL
        // The passphrase is deliberately NOT traced: secrets do not belong
        // in log output.
        EHS_TRACE("Trying to create secure socket with certificate='%s'\n",
                (const char*)roEHSServerParameters["certificate"]);
        m_poNetworkAbstraction = new SecureSocket(roEHSServerParameters["certificate"],
                reinterpret_cast<PassphraseHandler *>(ipoTopLevelEHS));
#else // COMPILE_WITH_SSL
        pthread_cond_destroy(&m_oDoneAccepting);
        pthread_mutex_destroy(&m_oMutex);
        throw runtime_error("EHSServer::EHSServer: EHS not compiled with SSL support. Cannot create HTTPS server.");
#endif // COMPILE_WITH_SSL
    } else {
        EHS_TRACE("EHSServer running in plain-text mode (no HTTPS)\n");
        m_poNetworkAbstraction = new Socket();
    }

    // initialize the socket
    if (roEHSServerParameters["bindaddress"] != "") {
        m_poNetworkAbstraction->SetBindAddress(roEHSServerParameters["bindaddress"]);
    }
    m_poNetworkAbstraction->RegisterBindHelper(m_poTopLevelEHS->GetBindHelper());
    int nResult = m_poNetworkAbstraction->Init(roEHSServerParameters["port"]); // initialize socket stuff
    if (nResult != NetworkAbstraction::INITSOCKET_SUCCESS) {
        // status stays SERVERRUNNING_NOTRUNNING, which the caller checks
        EHS_TRACE("Error: Failed to initialize sockets\n");
        return;
    }

    if (roEHSServerParameters["mode"] == "threadpool") {

        // need to set this here because the threads will check this to make
        // sure they are supposed to keep running
        m_nServerRunningStatus = SERVERRUNNING_THREADPOOL;

        int nThreadsToStart = roEHSServerParameters ["threadcount"].GetInt();
        if (nThreadsToStart <= 0) {
            nThreadsToStart = 1;
        }
        EHS_TRACE ("Starting %d threads\n", nThreadsToStart);

        int nThreadsStarted = 0;
        for (int i = 0; i < nThreadsToStart; i++) {
            // create new thread and detach so we don't have to join on it
            const int nCreateResult = pthread_create(&m_nAcceptThreadId, NULL,
                    EHSServer::PthreadHandleData_ThreadedStub, (void *)this);
            if (0 != nCreateResult) {
                // The thread id is undefined after a failure and must not
                // be detached. Further attempts are unlikely to succeed.
                EHS_TRACE("Error: pthread_create failed with code %d\n", nCreateResult);
                break;
            }
            EHS_TRACE("created thread with ID=0x%x, NULL, func=0x%x, this=0x%x\n",
                    m_nAcceptThreadId, EHSServer::PthreadHandleData_ThreadedStub, this);
            pthread_detach(m_nAcceptThreadId);
            ++nThreadsStarted;
        }
        // Without a single worker the server cannot do anything.
        if (0 == nThreadsStarted) {
            m_nServerRunningStatus = SERVERRUNNING_NOTRUNNING;
        }

    } else if (roEHSServerParameters["mode"] == "onethreadperrequest") {

        m_nServerRunningStatus = SERVERRUNNING_ONETHREADPERREQUEST;

        // spawn off one thread just to deal with basic stuff. The id is
        // written directly into m_nAcceptThreadId because the new thread
        // compares itself against that member.
        const int nCreateResult = pthread_create(&m_nAcceptThreadId, NULL,
                EHSServer::PthreadHandleData_ThreadedStub, (void *)this);
        if (0 == nCreateResult) {
            EHS_TRACE ( "created thread with ID=0x%x, NULL, func=0x%x, this=0x%x\n",
                    m_nAcceptThreadId, EHSServer::PthreadHandleData_ThreadedStub, this);
            pthread_detach(m_nAcceptThreadId);
        } else {
            // the thread was not created properly
            EHS_TRACE("Error: pthread_create failed with code %d\n", nCreateResult);
            m_nAcceptThreadId = 0;
            m_nServerRunningStatus = SERVERRUNNING_NOTRUNNING;
        }

    } else if (roEHSServerParameters["mode"] == "singlethreaded") {

        // we're single threaded
        m_nServerRunningStatus = SERVERRUNNING_SINGLETHREADED;

    } else {

        EHS_TRACE("INVALID 'mode' SPECIFIED.\n\tMust be 'singlethreaded', 'threadpool', or 'onethreadperrequest'\n");
        // The destructor will not run, so clean up by hand.
        delete m_poNetworkAbstraction;
        m_poNetworkAbstraction = NULL;
        pthread_cond_destroy(&m_oDoneAccepting);
        pthread_mutex_destroy(&m_oMutex);
        throw runtime_error("EHSServer::EHSServer: invalid mode specified");
    }

    switch (m_nServerRunningStatus) {
        case SERVERRUNNING_THREADPOOL:
            EHS_TRACE("Info: EHS Server running in threadpool mode with %s threads\n",
                    roEHSServerParameters["threadcount"] == "" ? "1" :
                    roEHSServerParameters[ "threadcount"].GetCharString());
            break;
        case SERVERRUNNING_ONETHREADPERREQUEST:
            EHS_TRACE("Info: EHS Server running with one thread per request\n");
            break;
        case SERVERRUNNING_SINGLETHREADED:
            EHS_TRACE("Info: EHS Server running in singlethreaded mode\n");
            break;
        default:
            EHS_TRACE("Error: EHS Server not running. Server initialization failed\n");
            break;
    }
}
/// Shuts the server down and releases everything it owns.
///
/// If worker threads are still registered in m_nThreads, they are asked to
/// terminate and waited for first, so that nothing is torn down while a
/// thread still uses it. Afterwards the listening socket, all remaining
/// connections, the condition variable and the mutex are released.
EHSServer::~EHSServer ( )
{
    // Never destroy state under the feet of running worker threads.
    if (m_nThreads > 0) {
        EndServerThread();
    }

    delete m_poNetworkAbstraction;

    // Delete all elements in our connection list
    while ( ! m_oEHSConnectionList.empty() ) {
        delete m_oEHSConnectionList.front ( );
        m_oEHSConnectionList.pop_front ( );
    }

    // counterparts of the init calls in the constructor
    pthread_cond_destroy(&m_oDoneAccepting);
    pthread_mutex_destroy(&m_oMutex);
}
/// Fetches the next pending request from any connection.
///
/// The search starts at a randomly chosen connection, so that no single
/// connection takes too much time, and walks the connection list once,
/// treating it as circular. When a request is found, m_nRequestsPending is
/// decremented.
///
/// The server mutex is not locked here; the caller is expected to hold it.
///
/// @return the request found, or NULL if no connection has one.
HttpRequest * EHSServer::GetNextRequest()
{
    if (m_oEHSConnectionList.empty()) {
        return NULL;
    }

    // pick a random starting connection
    int nSkip = (int)(((double)m_oEHSConnectionList.size()) * rand() / (RAND_MAX + 1.0));
    EHSConnectionList::iterator itCurrent = m_oEHSConnectionList.begin();
    while (nSkip-- > 0) {
        itCurrent++;
    }

    // one full round through the list, wrapping at the end
    const EHSConnectionList::iterator itFirst = itCurrent;
    HttpRequest * poFound = NULL;
    do {
        // check this one to see if it has anything
        poFound = (*itCurrent)->GetNextRequest();

        itCurrent++;
        if (itCurrent == m_oEHSConnectionList.end()) {
            itCurrent = m_oEHSConnectionList.begin();
        }

        // one request less is waiting now
        if (NULL != poFound) {
            m_nRequestsPending--;
        }
    } while (NULL == poFound && !(itCurrent == itFirst));

    if (NULL != poFound) {
        EHS_TRACE ( "Found request\n" );
    }
    return poFound;
}
/// Destroys a connection and removes it from the connection list.
///
/// The whole list is scanned. Finding the same pointer a second time is
/// treated as an internal error.
///
/// The server mutex is not locked here; the caller is expected to hold it.
///
/// @throws std::invalid_argument if ipoEHSConnection is NULL.
/// @throws std::runtime_error if the connection is in the list twice.
void EHSServer::RemoveEHSConnection(EHSConnection * ipoEHSConnection)
{
    if (NULL == ipoEHSConnection) {
        throw invalid_argument("EHSServer::RemoveEHSConnection: argument is NULL");
    }

    EHS_TRACE("%d connections to look for something to delete\n",
            m_oEHSConnectionList.size ( ) );

    bool bAlreadyRemoved = false;
    EHSConnectionList::iterator it = m_oEHSConnectionList.begin();
    while (it != m_oEHSConnectionList.end()) {
        if (*it != ipoEHSConnection) {
            it++;
            continue;
        }
        if (bAlreadyRemoved) {
            throw runtime_error("EHSServer::RemoveEHSConnection: Deleting a second element");
        }
        bAlreadyRemoved = true;

        // destroy the connection and remove it from the list
        delete *it;
        m_oEHSConnectionList.erase(it);

        // the iterator is gone, so start back over at the beginning
        it = m_oEHSConnectionList.begin();
    }
}
/// Hook that EHSServer::HandleData_Threaded() calls once at the start of
/// each worker thread. This implementation only traces and returns true.
/// When the hook returns false, HandleData_Threaded() returns immediately
/// without processing anything and without calling ThreadExitHandler().
bool EHS::ThreadInitHandler()
{
EHS_TRACE("EHS::ThreadInitHandler\n");
return true;
}
/// Hook that EHSServer::HandleData_Threaded() calls when a worker thread
/// leaves its processing loop. This implementation only traces.
void EHS::ThreadExitHandler()
{
EHS_TRACE("EHS::ThreadExitHandler\n");
}
/// Returns the "passphrase" entry of this object's server parameters.
/// The bool parameter is unnamed and ignored by this implementation.
/// NOTE(review): presumably this is the callback SecureSocket uses to get
/// the certificate passphrase (the EHSServer constructor passes the EHS
/// object as a PassphraseHandler) -- confirm against securesocket.h.
const std::string EHS::GetPassphrase(bool /* twice */)
{
return m_oEHSServerParameters["passphrase"];
}
/// Starts the server for this EHS object using the given parameters.
///
/// @param params  server parameters; they are copied into this object.
///
/// @return STARTSERVER_ALREADYRUNNING if a server already exists for this
///         object (its parameters are left untouched in that case),
///         STARTSERVER_FAILED if the server could not be started,
///         STARTSERVER_SUCCESS otherwise.
///
/// Exceptions thrown by the EHSServer constructor propagate to the caller.
EHS::StartServerResult
EHS::StartServer(EHSServerParameters &params)
{
    // Refuse to start twice. Previously this case was detected but
    // STARTSERVER_SUCCESS was returned anyway, and the parameters of the
    // running server had already been overwritten.
    if (m_poEHSServer != NULL) {
        EHS_TRACE("Warning: Tried to start server that was already running\n");
        return STARTSERVER_ALREADYRUNNING;
    }

    m_oEHSServerParameters = params;

    // associate a EHSServer object to this EHS object
    m_poEHSServer = new EHSServer(this);
    if (m_poEHSServer->RunningStatus() == EHSServer::SERVERRUNNING_NOTRUNNING) {
        EHS_TRACE("Error: Failed to start server\n");
        delete m_poEHSServer;
        m_poEHSServer = NULL;
        return STARTSERVER_FAILED;
    }
    return STARTSERVER_SUCCESS;
}
// this is the function specified to pthread_create under UNIX
// because you can't start a thread directly into a class method
void * EHSServer::PthreadHandleData_ThreadedStub(void * ipParam ///< EHSServer object cast to a void pointer
)
{
EHSServer *self = reinterpret_cast<EHSServer *>(ipParam);
self->m_nThreads++;
self->HandleData_Threaded();
self->m_nThreads--;
return NULL;
}
/// Stops the server this EHS object belongs to.
///
/// An object that has a parent forwards the call to it. Otherwise the
/// object's own server is told to end its threads and is then destroyed.
///
/// @throws std::runtime_error if the object has neither a parent nor a
///         server.
void EHS::StopServer()
{
    // children delegate to their parent
    if (m_poParent) {
        m_poParent->StopServer();
        return;
    }

    // make sure we're in a sane state
    if (NULL == m_poEHSServer) {
        throw runtime_error("EHS::StopServer: Invalid state");
    }

    m_poEHSServer->EndServerThread();
    delete m_poEHSServer;
    m_poEHSServer = NULL;
}
/// Tells whether the server this EHS object belongs to has been flagged
/// for termination.
///
/// An object that has a parent forwards the question to it. Otherwise the
/// running status of the object's own server is compared against
/// SERVERRUNNING_SHOULDTERMINATE.
///
/// @throws std::runtime_error if the object has neither a parent nor a
///         server.
bool EHS::ShouldTerminate() const
{
    // make sure we're in a sane state
    if ((NULL == m_poParent) && (NULL == m_poEHSServer)) {
        // the message used to name EHS::StopServer (copy-paste leftover)
        throw runtime_error("EHS::ShouldTerminate: Invalid state");
    }

    if (m_poParent) {
        return m_poParent->ShouldTerminate();
    }
    return (EHSServer::SERVERRUNNING_SHOULDTERMINATE == m_poEHSServer->RunningStatus());
}
void EHSServer::HandleData_Threaded()
{
if (m_poTopLevelEHS->ThreadInitHandler()) {
pthread_t self = pthread_self();
do {
bool catched = false;
HttpResponse *eResponse = NULL;
try {
HandleData(1000, self); // 1000ms select timeout
} catch (exception &e) {
catched = true;
eResponse = m_poTopLevelEHS->HandleThreadException(self, m_poCurrentRequest, e);
} catch (...) {
catched = true;
runtime_error e("unspecified");
eResponse = m_poTopLevelEHS->HandleThreadException(self, m_poCurrentRequest, e);
}
if (catched) {
if (NULL != eResponse) {
eResponse->m_poEHSConnection->AddResponse(eResponse);
MutexHelper mutex(&m_oMutex);
delete m_poCurrentRequest;
m_poCurrentRequest = NULL;
} else {
m_nServerRunningStatus = SERVERRUNNING_SHOULDTERMINATE;
m_nAcceptThreadId = 0;
MutexHelper mutex(&m_oMutex);
delete m_poCurrentRequest;
m_poCurrentRequest = NULL;
}
}
} while (m_nServerRunningStatus == SERVERRUNNING_THREADPOOL ||
self == m_nAcceptThreadId);
m_poTopLevelEHS->ThreadExitHandler();
}
}
/// Performs one unit of server work for the calling thread.
///
/// - If a request is pending (and this is not the accept thread of a
///   one-thread-per-request server), the request is routed through the
///   top-level EHS object and its response is handed to the connection.
/// - Otherwise, if another thread is already accepting, the caller waits
///   on m_oDoneAccepting.
/// - Otherwise the caller becomes the accepting thread: it select()s on
///   the accept socket and all reading connections, accepts new clients,
///   reads client data and finally clears idle connections.
///
/// @throws std::runtime_error if select() fails (on non-Windows systems
///         EINTR is not treated as a failure). The accepting flag is
///         cleared before throwing, so other threads can take over.
void EHSServer::HandleData (int inTimeoutMilliseconds, ///< milliseconds for timeout on select
        pthread_t inThreadId ///< numeric ID for this thread to help debug
        )
{
    MutexHelper mutex(&m_oMutex);

    // determine if there are any jobs waiting if this thread should --
    // if we're running one-thread-per-request and this is the accept thread
    // we don't look for requests
    m_poCurrentRequest = NULL;
    if (m_nServerRunningStatus != SERVERRUNNING_ONETHREADPERREQUEST ||
            inThreadId != m_nAcceptThreadId ) {
        m_poCurrentRequest = GetNextRequest();
    }

    // if we got a request to handle
    if (NULL != m_poCurrentRequest) {
        // handle the request and post it back to the connection object
        mutex.Unlock();
        // route the request
        HttpResponse *response = m_poTopLevelEHS->RouteRequest(m_poCurrentRequest).release();
        response->m_poEHSConnection->AddResponse(response);
        mutex.Lock();
        delete m_poCurrentRequest;
        m_poCurrentRequest = NULL;
        return;
    }

    // otherwise, no requests are pending

    // if something is already accepting, sleep
    if (m_bAccepting) {
        // wait until something happens
        // it's ok to not recheck our condition here, as we'll come back in the same way and recheck then
        EHS_TRACE("Waiting on m_oDoneAccepting condition TID=%p\n", pthread_self());
        pthread_cond_wait(&m_oDoneAccepting, &m_oMutex);
        EHS_TRACE("Done waiting on m_oDoneAccepting condition TID=%p\n", pthread_self());
        return;
    }

    // if no one is accepting, we accept
    m_bAcceptedNewConnection = false;
    // we're now accepting
    m_bAccepting = true;
    mutex.Unlock();

    // Set up the timeout. Seconds and microseconds are derived separately
    // so that large timeouts cannot overflow an int multiplication.
    timeval tv;
    tv.tv_sec = inTimeoutMilliseconds / 1000;
    tv.tv_usec = (inTimeoutMilliseconds % 1000) * 1000;

    // create the FD set for select
    int nHighestFd = CreateFdSet();

    // call select
    int nSocketCount = select(nHighestFd + 1, &m_oReadFds, NULL, NULL, &tv);

    // handle select error
#ifdef _WIN32
    const bool bSelectFatal = (nSocketCount == SOCKET_ERROR);
#else // NOT _WIN32
    // an interrupted select() is harmless, we simply come back later
    const bool bSelectFatal = (nSocketCount == -1) && (errno != EINTR);
#endif // _WIN32
    if (bSelectFatal) {
        EHS_TRACE("[%d] Critical Error: select() failed. Aborting\n", inThreadId);
        // Give up the accepting role before leaving. Otherwise every
        // thread would wait on m_oDoneAccepting forever afterwards.
        mutex.Lock();
        m_bAccepting = false;
        throw runtime_error("EHSServer::HandleData: select() failed.");
    }

    // only look at the sockets if select reported something to read
    if (nSocketCount > 0) {
        // Check the accept socket for a new connection
        CheckAcceptSocket();
        // check client sockets for data
        CheckClientSockets();
    }

    mutex.Lock();
    ClearIdleConnections();
    m_bAccepting = false;
}
void EHSServer::CheckAcceptSocket ( )
{
// see if we got data on this socket
if (FD_ISSET(m_poNetworkAbstraction->GetFd(), &m_oReadFds)) {
// THIS SHOULD BE NON-BLOCKING OR ELSE A HANG CAN OCCUR IF THEY DISCONNECT BETWEEN WHEN
// POLL SEES THE CONNECTION AND WHEN WE ACTUALLY CALL ACCEPT
NetworkAbstraction * poNewClient = m_poNetworkAbstraction->Accept();
if (poNewClient == NULL) {
// accept or (in case of SSL, ssl handshake) has failed.
return;
}
// create a new EHSConnection object and initialize it
EHSConnection * poEHSConnection = new EHSConnection ( poNewClient, this );
if (m_poTopLevelEHS->m_oEHSServerParameters.find("maxrequestsize") !=
m_poTopLevelEHS->m_oEHSServerParameters.end()) {
unsigned long n = m_poTopLevelEHS->m_oEHSServerParameters["maxrequestsize"];
EHS_TRACE ( "Setting connections MaxRequestSize to %lu\n", n );
poEHSConnection->SetMaxRequestSize ( n );
}
{
MutexHelper mutex(&m_oMutex);
m_oEHSConnectionList.push_back(poEHSConnection);
m_bAcceptedNewConnection = true;
}
} // end FD_ISSET ( )
}
void EHSServer::CheckClientSockets ( )
{
// go through all the sockets from which we're still reading
for (EHSConnectionList::iterator i = m_oEHSConnectionList.begin();
i != m_oEHSConnectionList.end(); i++) {
if (FD_ISSET((*i)->GetNetworkAbstraction()->GetFd(), &m_oReadFds)) {
EHS_TRACE("$$$$$ Got data on client connection\n");
// do the actual read
char buf[8192];
int nBytesReceived = (*i)->GetNetworkAbstraction()->Read(buf, sizeof(buf));
// if we received a disconnect
if (nBytesReceived <= 0) {
// we're done reading and we received a disconnect
(*i)->DoneReading(true);
} else {
// otherwise we got data
// take the data we got and append to the connection's buffer
EHSConnection::AddBufferResult nAddBufferResult =
(*i)->AddBuffer(buf, nBytesReceived);
// if add buffer failed, don't read from this connection anymore
if (nAddBufferResult == EHSConnection::ADDBUFFER_INVALIDREQUEST ||
nAddBufferResult == EHSConnection::ADDBUFFER_TOOBIG) {
// Immediately send a 400 response, then close the connection
auto_ptr<HttpResponse> tmp(HttpResponse::Error(HTTPRESPONSECODE_400_BADREQUEST, 0, *i));
(*i)->SendHttpResponse(tmp);
// done reading but did not receieve disconnect
EHS_TRACE("Done reading because we got a bad request\n");
(*i)->DoneReading(false);
} // end error with AddBuffer
} // end nBytesReceived
} // FD_ISSET
} // for loop through connections
}
/// Queues a response and sends every response that is next in sequence.
///
/// Responses are stored by their m_nResponseId and sent strictly in
/// order: as long as the response with id m_nResponses + 1 is available,
/// it is sent, removed from the map and counted.
///
/// If CheckDone() reports that the connection is finished after sending,
/// the connection removes itself from the server. RemoveEHSConnection()
/// deletes this object, so this method returns immediately afterwards
/// without touching any member. (The previous code re-locked the
/// connection mutex and kept looping on the deleted object.)
///
/// @param response  the response to add; ownership passes to the connection.
void EHSConnection::AddResponse(HttpResponse *response)
{
    MutexHelper mutex(&m_oMutex);

    // push the object on to the list
    m_oHttpResponseMap[response->m_nResponseId] = response;

    // go through the list until we can't find the next response to send
    for (;;) {
        HttpResponseMap::iterator i = m_oHttpResponseMap.find(m_nResponses + 1);
        if (m_oHttpResponseMap.end() == i) {
            break;
        }

        SendHttpResponse(auto_ptr<HttpResponse>(i->second));
        m_oHttpResponseMap.erase(i);
        m_nResponses++;

        // set last activity to the current time for idle purposes
        UpdateLastActivity();
        EHS_TRACE("Sending response %d to %x\n", m_nResponses, this);

        // if we're done with this connection, get rid of it
        if (CheckDone()) {
            EHS_TRACE("add response found something to delete\n");

            // Copy the server pointer first: after the removal below,
            // 'this' no longer exists and members must not be read.
            EHSServer * const poServer = m_poEHSServer;

            // careful with mutexes around here.. Don't want to hold both
            mutex.Unlock();
            MutexHelper oServerGuard(&poServer->m_oMutex);
            poServer->RemoveEHSConnection(this);

            // 'mutex' is marked unlocked, so its destructor will not touch
            // the destroyed connection mutex.
            return;
        }
    }
}
/// Serializes a response and sends it to the client.
///
/// Nothing is sent if the client is already disconnected. Otherwise the
/// status line, all response headers and one Set-Cookie header per cookie
/// are sent, followed by an empty line and the body. The body length is
/// taken from the response's "content-length" header.
///
/// @param response  the response to send; it is destroyed when this
///        method returns.
void EHSConnection::SendHttpResponse(auto_ptr<HttpResponse> response)
{
    // only send it if the client isn't disconnected
    if (Disconnected()) {
        return;
    }

    ostringstream oss;

    // status line
    oss << "HTTP/1.1 " << response->m_nResponseCode
        << " " << HttpResponse::GetPhrase(response->m_nResponseCode) << "\r\n";

    // all the entries of the response headers map
    for (StringMap::iterator ith = response->m_oResponseHeaders.begin();
            ith != response->m_oResponseHeaders.end(); ith++) {
        oss << ith->first << ": " << ith->second << "\r\n";
    }

    // all the cookies
    for (StringList::iterator itl = response->m_oCookieList.begin ( );
            itl != response->m_oCookieList.end(); itl++) {
        oss << "Set-Cookie: " << *itl << "\r\n";
    }

    // extra line break signalling end of headers
    oss << "\r\n";

    const string sHead(oss.str());
    m_poNetworkAbstraction->Send(
            reinterpret_cast<const void *>(sHead.c_str()), sHead.length());

    // now send the body
    m_poNetworkAbstraction->Send(response->GetBody(),
            atoi(response->m_oResponseHeaders["content-length"].c_str()));
}
void EHSServer::EndServerThread()
{
pthread_mutex_lock(&m_oMutex);
m_nServerRunningStatus = SERVERRUNNING_NOTRUNNING;
m_nAcceptThreadId = 0;
pthread_mutex_unlock(&m_oMutex);
while (m_nThreads > 0) {
EHS_TRACE ("EndServerThread waiting for %d threads to terminate\n", m_nThreads);
pthread_cond_broadcast(&m_oDoneAccepting);
sleep(1);
}
EHS_TRACE ("EndServerThread: all threads terminated\n");
}
/// Creates an EHS object without a server, a source EHS or a bind helper.
///
/// Parent and path are only stored here; this constructor does not add
/// the object to the parent's map.
EHS::EHS (EHS *ipoParent, ///< parent EHS object for routing purposes
        string isRegisteredAs ///< path string for routing purposes
        ) :
    m_oEHSMap(),
    m_poParent(ipoParent),
    m_sRegisteredAs(isRegisteredAs),
    m_poEHSServer(NULL),
    m_poSourceEHS(NULL),
    m_poBindHelper(NULL),
    m_oEHSServerParameters()
{
    EHS_TRACE("EHS::EHS TID=%p\n", pthread_self());
}
/// Detaches this object from the routing tree and shuts down its server.
///
/// - Unregisters this object from its parent, if it has one.
/// - Clears the parent pointer of every child still registered here, so a
///   child that outlives this object does not call into freed memory when
///   it is used or destroyed later.
/// - Stops the server's threads before deleting the server, in the same
///   way StopServer() does.
EHS::~EHS ()
{
    // needs to clean up all its registered interfaces
    if (m_poParent) {
        m_poParent->UnregisterEHS(const_cast<char *>(m_sRegisteredAs.c_str()));
    }

    // Children registered through RegisterEHS() point back at us.
    for (EHSMap::iterator i = m_oEHSMap.begin(); i != m_oEHSMap.end(); ++i) {
        if (i->second != NULL && i->second->m_poParent == this) {
            i->second->m_poParent = NULL;
        }
    }

    if (m_poEHSServer) {
        // never delete the server while its threads are still running
        m_poEHSServer->EndServerThread();
        delete m_poEHSServer;
        m_poEHSServer = NULL;
    }
}
/// Registers another EHS object as handler for a path below this object.
///
/// On success the registered object's parent is set to this object and
/// its registration path is stored in it.
///
/// The duplicate check now happens before anything is modified. The
/// previous code rewired ipoEHS (parent and path) even when it then
/// refused the registration, so the rejected object's destructor would
/// later unregister the object that legitimately owns the path.
///
/// @return REGISTEREHSINTERFACE_ALREADYEXISTS if an object is already
///         registered at that path (nothing is changed then),
///         REGISTEREHSINTERFACE_SUCCESS otherwise.
EHS::RegisterEHSResult
EHS::RegisterEHS (EHS * ipoEHS, ///< new sibling
        const char * ipsRegisterPath ///< path for routing
        )
{
    // look the path up without creating an entry for it
    EHSMap::iterator i = m_oEHSMap.find(ipsRegisterPath);
    if (i != m_oEHSMap.end() && i->second != NULL) {
        return REGISTEREHSINTERFACE_ALREADYEXISTS;
    }

    ipoEHS->m_poParent = this;
    ipoEHS->m_sRegisteredAs = ipsRegisterPath;
    m_oEHSMap[ipsRegisterPath] = ipoEHS;
    return REGISTEREHSINTERFACE_SUCCESS;
}
/// Removes the EHS object registered at the given path.
///
/// The lookup uses find() instead of operator[], so asking for an unknown
/// path no longer inserts an empty entry into the map.
///
/// @return UNREGISTEREHSINTERFACE_NOTREGISTERED if nothing is registered
///         at that path, UNREGISTEREHSINTERFACE_SUCCESS otherwise.
EHS::UnregisterEHSResult
EHS::UnregisterEHS (char *ipsRegisterPath ///< remove object at this path
        )
{
    EHSMap::iterator i = m_oEHSMap.find(ipsRegisterPath);
    if (i == m_oEHSMap.end()) {
        return UNREGISTEREHSINTERFACE_NOTREGISTERED;
    }

    const bool bWasRegistered = (i->second != NULL);
    // Empty entries are stale leftovers of earlier lookups; drop them too.
    m_oEHSMap.erase(i);

    return bWasRegistered ? UNREGISTEREHSINTERFACE_SUCCESS
        : UNREGISTEREHSINTERFACE_NOTREGISTERED;
}
/// Drives a single-threaded server.
///
/// An object that has a parent forwards the call to it. Otherwise, if the
/// server runs in single-threaded mode, the server's HandleData() is
/// called repeatedly until no requests are pending and no new connection
/// was accepted. In any other mode this does nothing.
///
/// @throws std::runtime_error if the object has neither a parent nor a
///         server.
void EHS::HandleData(int inTimeoutMilliseconds ///< milliseconds for select timeout
        )
{
    // children delegate to their parent
    if (m_poParent) {
        m_poParent->HandleData(inTimeoutMilliseconds);
        return;
    }

    // make sure we're in a sane state
    if (NULL == m_poEHSServer) {
        throw runtime_error("EHS::HandleData: Invalid state");
    }

    // only single threaded servers are driven from here
    if (m_poEHSServer->RunningStatus() != EHSServer::SERVERRUNNING_SINGLETHREADED) {
        return;
    }

    // handle data until there are no more jobs left
    do {
        m_poEHSServer->HandleData(inTimeoutMilliseconds, pthread_self());
    } while (m_poEHSServer->RequestsPending() ||
            m_poEHSServer->AcceptedNewConnection());
}
/// Splits the first path component off a URI.
///
/// The URI has to match "^/{0,1}([^/]+)/(.*)$": an optional leading
/// slash, a non-empty component, a slash, and the remainder. On a match
/// the URI is replaced by the remainder and the component is returned.
/// Without a match the URI is left untouched.
///
/// @return the first path component, or an empty string if the URI did
///         not match.
string GetNextPathPart(string &irsUri ///< URI to look for next path part in
        )
{
    string sHead;
    string sRemainder;

    pcrecpp::RE oPattern("^/{0,1}([^/]+)/(.*)$");
    if (!oPattern.FullMatch(irsUri, &sHead, &sRemainder)) {
        return "";
    }

    irsUri = sRemainder;
    return sHead;
}
/// Routes a request to the EHS object responsible for its URI.
///
/// The first path component is split off the request's URI. If there is
/// none, or if the "norouterequest" server parameter is set, this object
/// handles the request itself through HandleRequest(). Otherwise the
/// request is forwarded to the child registered for that component, or
/// answered with a 404 response if there is no such child.
///
/// Children are looked up with find(). The previous operator[] lookup
/// inserted a map entry for every unknown path component a client sent,
/// growing the map without bound and modifying it from worker threads.
///
/// @return the response for the request; ownership passes to the caller.
auto_ptr<HttpResponse>
EHS::RouteRequest(HttpRequest *request ///< request info for service
        )
{
    // get the next path from the URI
    string sNextPathPart = GetNextPathPart(request->m_sUri);
    EHS_TRACE ("Info: Trying to route: '%s'\n", sNextPathPart.c_str());

    // We use an auto_ptr here, so that in case of an exception, the
    // target gets deleted.
    auto_ptr<HttpResponse> response(0);

    // if there is no more path, call HandleRequest on this EHS object with
    // whatever's left - or if we're not routing
    if (sNextPathPart.empty() ||
            m_oEHSServerParameters.find("norouterequest") !=
            m_oEHSServerParameters.end()) {

        // create an HttpResponse object for the client
        response = auto_ptr<HttpResponse>(new HttpResponse(request->m_nRequestId,
                    request->m_poSourceEHSConnection));

        // get the actual response and return code
        response->SetResponseCode(HandleRequest(request, response.get()));
        return response;
    }

    // check the path against the map of EHSs without creating an entry
    EHSMap::iterator iChild = m_oEHSMap.find(sNextPathPart);
    if (iChild != m_oEHSMap.end() && iChild->second != NULL) {
        // if it exists, call its RouteRequest with the new shortened path
        response = iChild->second->RouteRequest(request);
    } else {
        // if it doesn't exist, send an error back up saying resource doesn't exist
        EHS_TRACE("Info: Routing failed. Most likely caused by an invalid URL, not internal error\n");
        // send back a 404 response
        response = auto_ptr<HttpResponse>(HttpResponse::Error(HTTPRESPONSECODE_404_NOTFOUND, request));
    }
    return response;
}
/// Default request handler.
///
/// If a source EHS has been set with SetSourceEHS(), the request is
/// delegated to it. Otherwise the current time, as returned by time(), is
/// sent back as text/plain.
///
/// @return the source EHS's result when delegating,
///         HTTPRESPONSECODE_200_OK otherwise.
ResponseCode EHS::HandleRequest(HttpRequest *request,
        HttpResponse * response)
{
    if (NULL == m_poSourceEHS) {
        // no source EHS: just send back the current time
        ostringstream oss;
        oss << time(NULL);
        const string sNow(oss.str());
        response->SetBody(sNow.c_str(), sNow.length());
        response->SetHeader("Content-Type", "text/plain");
        return HTTPRESPONSECODE_200_OK;
    }

    // we have a source EHS specified, use it
    return m_poSourceEHS->HandleRequest(request, response);
}
/// Sets the EHS object that HandleRequest() delegates to.
/// Only the address of iroSourceEHS is stored, so that object has to stay
/// alive for as long as requests may be handled by this one.
void EHS::SetSourceEHS(EHS & iroSourceEHS)
{
m_poSourceEHS = &iroSourceEHS;
}
/// Default handler for exceptions that escape request processing in a
/// worker thread: reports the thread id and the exception message on
/// stderr.
///
/// The hex manipulator is sticky, so the stream's format flags are saved
/// and restored. The previous code left cerr in hex mode for all later
/// output.
///
/// @param tid  id of the thread in which the exception was caught.
/// @param ex   the exception that was caught.
/// @return always NULL. EHSServer::HandleData_Threaded() treats a NULL
///         result as a request to terminate the server.
HttpResponse *EHS::HandleThreadException(pthread_t tid, HttpRequest *, exception &ex)
{
    const ios_base::fmtflags nSavedFlags = cerr.flags();
    cerr << "Caught an exception in thread "
        << hex << tid << ": " << ex.what() << endl;
    cerr.flags(nSavedFlags);
    return NULL;
}