[go: up one dir, main page]

Menu

[r66]: / server / server.h  Maximize  Restore  History

Download this file

176 lines (137 with data), 4.5 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*****************************************************************************
Copyright © 2006, 2007, The Board of Trustees of the University of Illinois.
All Rights Reserved.
Sector: A Distributed Storage and Computing Infrastructure
National Center for Data Mining (NCDM)
University of Illinois at Chicago
http://www.ncdm.uic.edu/
Sector is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free
Software Foundation, either version 3 of the License, or (at your option)
any later version.
Sector is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
*****************************************************************************/
/*****************************************************************************
written by
Yunhong Gu [gu@lac.uic.edu], last updated 06/25/2007
*****************************************************************************/
#ifndef __STORE_H__
#define __STORE_H__
#include <routing.h>
#include <gmp.h>
#include <transport.h>
#include <log.h>
#include <conf.h>
#include <index.h>
#include <kb.h>
#include <fs.h>
namespace cb
{
class Server
{
public:
Server(const string& ip);
~Server();
public:
int init(char* ip = NULL, int port = 0);
int run();
private:
struct Param1
{
Server* serv_instance; // self
string client_ip; // client IP
int client_ctrl_port; // client control port
int32_t msg_id; // message ID
CCBMsg* msg; // message
};
struct Param2
{
Server* serv_instance; // self
string filename; // filename
Transport* datachn; // data channel
int mode; // file access mode
string client_ip; // client IP
int client_data_port; // client data channel port
};
struct Param3
{
Server* serv_instance; // self
string filename; // filename
string query; // query
Transport* datachn; // data channel
string client_ip; // client IP
int client_data_port; // client UDT port
};
struct Param4
{
Server* serv_instance; // self
Transport* datachn; // data channel
string client_ip; // client IP
int client_ctrl_port; // client GMP port
int client_data_port; // client data port
int speid; // speid
string function; // SPE operator
char* param; // SPE parameter
int psize; // parameter size
int rows; // number of rows per processing: -1 means all in the block
int buckets; // number of output buckets. 0: nothing, result sent back to client, -1: dump to local, n: sent to n buckets
char* locations; // locations of buckets
};
struct Param5
{
Server* serv_instance; // self
string client_ip; // client IP
int client_ctrl_port; // client GMP port
string filename; // SPE output file name
int dsnum; // number of data segments (results to expect)
CGMP* gmp; // GMP
};
static void* process(void* s);
static void* processEx(void* p1);
static void* fileHandler(void* p2);
static void* SPEHandler(void* p4);
static void* SPEShuffler(void* p5);
private:
int SPEReadData(const string& datafile, const int64_t& offset, int& size, int64_t* index, const int64_t& totalrows, char*& block);
private:
void updateOutLink();
void updateInLink();
int scanLocalFile();
private:
CRouting m_Router;
CGMP m_GMP;
LocalFileIndex m_LocalFile;
RemoteFileIndex m_RemoteFile;
string m_strLocalHost;
int m_iLocalPort;
static const int m_iKeySpace = 32;
string m_strHomeDir;
time_t m_HomeDirMTime;
CAccessLog m_AccessLog;
CPerfLog m_PerfLog;
SECTORParam m_SysConfig;
SectorFS m_SectorFS;
KnowledgeBase m_KBase;
};
class SPEResult
{
friend class Server;
public:
~SPEResult();
public:
void init(const int& n, const int& size);
void addData(const int& bucketid, const int64_t* index, const int64_t& ilen, const char* data, const int64_t& dlen);
private:
int m_iBucketNum;
int m_iSize;
vector<int32_t> m_vIndexLen;
vector<int64_t*> m_vIndex;
vector<int32_t> m_vDataLen;
vector<char*> m_vData;
};
}; //namespace
#endif