[go: up one dir, main page]

Menu

[r66]: / client / dcclient.h  Maximize  Restore  History

Download this file

177 lines (132 with data), 4.2 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/*****************************************************************************
Copyright © 2006, 2007, The Board of Trustees of the University of Illinois.
All Rights Reserved.
Sector: A Distributed Storage and Computing Infrastructure
National Center for Data Mining (NCDM)
University of Illinois at Chicago
http://www.ncdm.uic.edu/
Sector is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free
Software Foundation, either version 3 of the License, or (at your option)
any later version.
Sector is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program. If not, see <http://www.gnu.org/licenses/>.
*****************************************************************************/
/*****************************************************************************
written by
Yunhong Gu [gu@lac.uic.edu], last updated 06/25/2007
*****************************************************************************/
#ifndef __SECTOR_H__
#define __SECTOR_H__
#include "client.h"
#include <string>
#include <pthread.h>
#include <transport.h>
using namespace std;
namespace cb
{
class Process;
class Stream
{
public:
Stream();
~Stream();
public:
int init(const vector<string>& files);
int init(const int& num);
void setName(const string& name);
int setSeg(const int64_t& start, const int64_t& end);
int getSeg(int64_t& start, int64_t& end);
int getSize(int64_t& size);
public:
string m_strName;
vector<string> m_vFiles; // list of files
vector<int64_t> m_vSize; // size per file
vector<int64_t> m_vRecNum; // number of record per file
vector< set<Node, NodeComp> > m_vLocation; // locations for each bucket
int m_iFileNum; // number of files
int64_t m_llSize; // total data size
int64_t m_llRecNum; // total number of records
int64_t m_llStart; // start point (record)
int64_t m_llEnd; // end point (record), -1 means the last record
int m_iStatus; // 0: uninitialized, 1: initialized, -1: bad
};
struct Result
{
int m_iResID;
char* m_pcData;
int m_iDataLen;
int64_t* m_pllIndex;
int m_iIndexLen;
string m_strOrigFile;
int64_t m_llOrigStartRec;
int64_t m_llOrigEndRec;
string m_strIP;
int m_iPort;
};
class Process
{
friend class Client;
public:
Process();
~Process();
int run(const Stream& input, Stream& output, string op, const int& rows, const char* param = NULL, const int& size = 0);
// rows:
// n (n > 0): n rows per time
// 0: no rows, one file per time
// -1: all rows
int read(Result*& res, const bool& inorder = true, const bool& wait = true);
int checkProgress();
int close();
private:
string m_strOperator;
char* m_pcParam;
int m_iParamSize;
Stream* m_pInput;
Stream* m_pOutput;
int m_iOutputType;
struct DS
{
int m_iID;
string m_strDataFile;
int64_t m_llOffset;
int64_t m_llSize;
int m_iSPEID;
int m_iStatus; // 0: not started yet; 1: in progress; 2: done, result ready; 3: result read
Result* m_pResult;
};
vector<DS> m_vDS;
struct SPE
{
uint32_t m_uiID;
string m_strIP;
int m_iPort;
DS* m_pDS;
int m_iStatus; // -1: bad; 0: ready; 1; running
int m_iProgress;
timeval m_StartTime;
timeval m_LastUpdateTime;
Transport m_DataChn;
};
vector<SPE> m_vSPE;
int m_iProgress; // progress, 0..100
int m_iAvgRunTime; // average running time, in seconds
int m_iTotalDS;
int m_iTotalSPE;
int m_iAvailRes;
pthread_mutex_t m_ResLock;
pthread_cond_t m_ResCond;
int m_iMinUnitSize;
int m_iMaxUnitSize;
CGMP m_GMP;
private:
int prepareSPE();
static void* run(void*);
int checkSPE(bool locsense, map<string, Node>& datalocmap);
int startSPE(SPE& s, bool locsense, map<string, Node>& datalocmap);
};
}; // namespace cb
#endif