[go: up one dir, main page]

Menu

[r39]: / udt / core.h  Maximize  Restore  History

Download this file

464 lines (372 with data), 22.1 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
/*****************************************************************************
Copyright © 2001 - 2007, The Board of Trustees of the University of Illinois.
All Rights Reserved.
UDP-based Data Transfer Library (UDT) version 3
Laboratory for Advanced Computing (LAC)
National Center for Data Mining (NCDM)
University of Illinois at Chicago
http://www.lac.uic.edu/
This library is free software; you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or (at
your option) any later version.
This library is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
*****************************************************************************/
/*****************************************************************************
This header file contains the definition of UDT buffer structure and operations.
*****************************************************************************/
/*****************************************************************************
written by
Yunhong Gu [gu@lac.uic.edu], last updated 05/15/2007
*****************************************************************************/
#ifndef __UDT_CORE_H__
#define __UDT_CORE_H__
#include "udt.h"
#include "common.h"
#include "list.h"
#include "buffer.h"
#include "window.h"
#include "packet.h"
#include "channel.h"
#include "api.h"
#include "ccc.h"
class UDT_API CUDT
{
friend struct CUDTSocket;
friend class CUDTUnited;
friend class CCC;
private: // constructor and desctructor
CUDT();
CUDT(const CUDT& ancestor);
const CUDT& operator=(const CUDT&) {return *this;}
~CUDT();
public: //API
static UDTSOCKET socket(int af, int type = SOCK_STREAM, int protocol = 0);
static int bind(UDTSOCKET u, const sockaddr* name, int namelen);
static int listen(UDTSOCKET u, int backlog);
static UDTSOCKET accept(UDTSOCKET u, sockaddr* addr, int* addrlen);
static int connect(UDTSOCKET u, const sockaddr* name, int namelen);
static int close(UDTSOCKET u);
static int getpeername(UDTSOCKET u, sockaddr* name, int* namelen);
static int getsockname(UDTSOCKET u, sockaddr* name, int* namelen);
static int getsockopt(UDTSOCKET u, int level, UDTOpt optname, void* optval, int* optlen);
static int setsockopt(UDTSOCKET u, int level, UDTOpt optname, const void* optval, int optlen);
static int shutdown(UDTSOCKET u, int how);
static int send(UDTSOCKET u, const char* buf, int len, int flags = 0, int* handle = NULL, UDT_MEM_ROUTINE routine = NULL, void* context = NULL);
static int recv(UDTSOCKET u, char* buf, int len, int flags = 0, int* handle = NULL, UDT_MEM_ROUTINE routine = NULL, void* context = NULL);
static int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false);
static int recvmsg(UDTSOCKET u, char* buf, int len);
static int64_t sendfile(UDTSOCKET u, std::ifstream& ifs, const int64_t& offset, const int64_t& size, const int& block = 366000);
static int64_t recvfile(UDTSOCKET u, std::ofstream& ofs, const int64_t& offset, const int64_t& size, const int& block = 7320000);
static bool getoverlappedresult(UDTSOCKET u, int handle, int& progress, bool wait = false);
static int select(int nfds, ud_set* readfds, ud_set* writefds, ud_set* exceptfds, const timeval* timeout);
static CUDTException& getlasterror();
static int perfmon(UDTSOCKET u, CPerfMon* perf, bool clear = true);
public: // internal API
static bool isUSock(UDTSOCKET u);
static CUDT* getUDTHandle(UDTSOCKET u);
private:
// Functionality:
// initialize a UDT entity and bind to a local address.
// Parameters:
// 0) [in] addr: pointer to the local address to be bound to.
// Returned value:
// None.
void open(const sockaddr* addr = NULL);
// Functionality:
// Start listening to any connection request.
// Parameters:
// None.
// Returned value:
// None.
void listen();
// Functionality:
// Connect to a UDT entity listening at address "peer".
// Parameters:
// 0) [in] peer: The address of the listening UDT entity.
// Returned value:
// None.
void connect(const sockaddr* peer);
// Functionality:
// Connect to a UDT entity listening at address "peer", which has sent "hs" request.
// Parameters:
// 0) [in] peer: The address of the listening UDT entity.
// 1) [in/out] hs: The handshake information sent by the peer side (in), negotiated value (out).
// Returned value:
// None.
void connect(const sockaddr* peer, CHandShake* hs);
// Functionality:
// Close the opened UDT entity.
// Parameters:
// None.
// Returned value:
// None.
void close();
// Functionality:
// Request UDT to send out a data block "data" with size of "len".
// Parameters:
// 0) [in] data: The address of the application data to be sent.
// 1) [in] len: The size of the data block.
// 2) [in, out] overlapped: A pointer to the returned overlapped IO handle.
// 3) [in] func: pointer to a function to process the buffer after overlapped IO is completed.
// Returned value:
// Actual size of data sent.
int send(char* data, const int& len, int* overlapped = NULL, const UDT_MEM_ROUTINE func = NULL, void* context = NULL);
// Functionality:
// Request UDT to receive data to a memory block "data" with size of "len".
// Parameters:
// 0) [out] data: data received.
// 1) [in] len: The desired size of data to be received.
// 2) [out] overlapped: A pointer to the returned overlapped IO handle.
// 3) [in] unused.
// Returned value:
// Actual size of data received.
int recv(char* data, const int& len, int* overlapped = NULL, const UDT_MEM_ROUTINE func = NULL, void* context = NULL);
// Functionality:
// send a message of a memory block "data" with size of "len".
// Parameters:
// 0) [out] data: data received.
// 1) [in] len: The desired size of data to be received.
// 2) [in] ttl: the time-to-live of the message.
// 3) [in] inorder: if the message should be delivered in order.
// Returned value:
// Actual size of data sent.
int sendmsg(const char* data, const int& len, const int& ttl, const bool& inorder);
// Functionality:
// Receive a message to buffer "data".
// Parameters:
// 0) [out] data: data received.
// 1) [in] len: size of the buffer.
// Returned value:
// Actual size of data received.
int recvmsg(char* data, const int& len);
// Functionality:
// query the result of an overlapped IO indicated by "handle".
// Parameters:
// 0) [in] handle: the handle that indicates the submitted overlapped IO.
// 1) [out] progess: how many data left to be sent/receive.
// 2) [in] wait: wait for the IO finished or not.
// Returned value:
// if the overlapped IO is completed.
bool getOverlappedResult(const int& handle, int& progress, const bool& wait = false);
// Functionality:
// Request UDT to send out a file described as "fd", starting from "offset", with size of "size".
// Parameters:
// 0) [in] ifs: The input file stream.
// 1) [in] offset: From where to read and send data;
// 2) [in] size: How many data to be sent.
// 3) [in] block: size of block per read from disk
// Returned value:
// Actual size of data sent.
int64_t sendfile(std::ifstream& ifs, const int64_t& offset, const int64_t& size, const int& block = 366000);
// Functionality:
// Request UDT to receive data into a file described as "fd", starting from "offset", with expected size of "size".
// Parameters:
// 0) [out] ofs: The output file stream.
// 1) [in] offset: From where to write data;
// 2) [in] size: How many data to be received.
// 3) [in] block: size of block per write to disk
// Returned value:
// Actual size of data received.
int64_t recvfile(std::ofstream& ofs, const int64_t& offset, const int64_t& size, const int& block = 7320000);
// Functionality:
// Configure UDT options.
// Parameters:
// 0) [in] optName: The enum name of a UDT option.
// 1) [in] optval: The value to be set.
// 2) [in] optlen: size of "optval".
// Returned value:
// None.
void setOpt(UDTOpt optName, const void* optval, const int& optlen);
// Functionality:
// Read UDT options.
// Parameters:
// 0) [in] optName: The enum name of a UDT option.
// 1) [in] optval: The value to be returned.
// 2) [out] optlen: size of "optval".
// Returned value:
// None.
void getOpt(UDTOpt optName, void* optval, int& optlen);
// Functionality:
// read the performance data since last sample() call.
// Parameters:
// 0) [in, out] perf: pointer to a CPerfMon structure to record the performance data.
// 1) [in] clear: flag to decide if the local performance trace should be cleared.
// Returned value:
// None.
void sample(CPerfMon* perf, bool clear = true);
private:
static CUDTUnited s_UDTUnited; // UDT global management base
public:
static const UDTSOCKET INVALID_SOCK; // invalid socket descriptor
static const int ERROR; // socket api error returned value
private:
UDTSOCKET m_SocketID; // UDT socket number
int m_iSockType; // Type of the UDT connection (SOCK_STREAM or SOCK_DGRAM)
private: // Version
const int m_iVersion; // UDT version, for compatibility use
private: // Threads, data channel, and timing facility
#ifndef WIN32
bool m_bSndThrStart; // lazy snd thread creation
#endif
pthread_t m_SndThread; // Sending thread
pthread_t m_RcvThread; // Receiving thread
CChannel* m_pChannel; // UDP channel
CTimer* m_pTimer; // Timing facility
uint64_t m_ullCPUFrequency; // CPU clock frequency, used for Timer
private: // Timing intervals
const int m_iSYNInterval; // Periodical Rate Control Interval, 10 microseconds
const int m_iSelfClockInterval; // ACK interval for self-clocking
private: // Packet size and sequence number attributes
int m_iPktSize; // Maximum/regular packet size, in bytes
int m_iPayloadSize; // Maximum/regular payload size, in bytes
private: // Options
int m_iMSS; // Maximum Segment Size
bool m_bSynSending; // Sending syncronization mode
bool m_bSynRecving; // Receiving syncronization mode
int m_iFlightFlagSize; // Maximum number of packets in flight from the peer side
int m_iSndQueueLimit; // Maximum length of the sending buffer queue
int m_iUDTBufSize; // UDT buffer size (for receiving)
linger m_Linger; // Linger information on close
int m_iUDPSndBufSize; // UDP sending buffer size
int m_iUDPRcvBufSize; // UDP receiving buffer size
int m_iMaxMsg; // Maximum message size of datagram UDT connection
int m_iMsgTTL; // Time-to-live of a datagram message, in microseconds
int m_iIPversion; // IP version
bool m_bRendezvous; // Rendezvous connection mode
int m_iSndTimeOut; // sending timeout in milliseconds
int m_iRcvTimeOut; // receiving timeout in milliseconds
const int m_iQuickStartPkts; // Number of packets to be sent as a quick start
private: // CCC
CCCVirtualFactory* m_pCCFactory; // Factory class to create a specific CC instance
CCC* m_pCC; // custom congestion control class
private: // Status
volatile bool m_bListening; // If the UDT entit is listening to connection
volatile bool m_bConnected; // Whether the connection is on or off
volatile bool m_bClosing; // If the UDT entity is closing
volatile bool m_bShutdown; // If the peer side has shutdown the connection
volatile bool m_bBroken; // If the connection has been broken
bool m_bOpened; // If the UDT entity has been opened
bool m_bSndSlowStart; // If UDT is during slow start phase (snd side flag)
bool m_bRcvSlowStart; // If UDT is during slow start phase (rcv side flag)
bool m_bFreeze; // freeze the data sending
int m_iEXPCount; // Expiration counter
int m_iBandwidth; // Estimated bandwidth
private: // connection setup
pthread_t m_ListenThread;
#ifndef WIN32
static void* listenHandler(void* listener);
#else
static DWORD WINAPI listenHandler(LPVOID listener);
#endif
private: // Sending related data
CSndBuffer* m_pSndBuffer; // Sender buffer
CSndLossList* m_pSndLossList; // Sender loss list
CPktTimeWindow* m_pSndTimeWindow; // Packet sending time window
volatile uint64_t m_ullInterval; // Inter-packet time, in CPU clock cycles
uint64_t m_ullLastDecRate; // inter-packet time when last decrease occurs
uint64_t m_ullTimeDiff; // aggregate difference in inter-packet time
volatile int m_iFlowWindowSize; // Flow control window size
int m_iMaxFlowWindowSize; // Maximum flow window size = flight flag size of the peer side
volatile double m_dCongestionWindow; // congestion window size
int m_iNAKCount; // NAK counter
int m_iDecRandom; // random threshold on decrease by number of loss events
int m_iAvgNAKNum; // average number of NAKs per congestion
int m_iDecCount; // number of rate decrease in the current congestion period
timeval m_LastSYNTime; // the timestamp when last rate control occured
bool m_bLoss; // if there is any loss during last RC period
volatile int32_t m_iSndLastAck; // Last ACK received
int32_t m_iSndLastDataAck; // The real last ACK that updates the sender buffer and loss list
int32_t m_iSndCurrSeqNo; // The largest sequence number that has been sent
int32_t m_iLastDecSeq; // Sequence number sent last decrease occurs
int32_t m_iISN; // Initial Sequence Number
private: // Receiving related data
CRcvBuffer* m_pRcvBuffer; // Receiver buffer
CRcvLossList* m_pRcvLossList; // Receiver loss list
CIrregularPktList* m_pIrrPktList; // Irregular sized packet list
CACKWindow* m_pACKWindow; // ACK history window
CPktTimeWindow* m_pRcvTimeWindow; // Packet arrival time window
int m_iRTT; // RTT
int m_iRTTVar; // RTT variance
int32_t m_iRcvLastAck; // Last sent ACK
uint64_t m_ullLastAckTime; // Timestamp of last ACK
int32_t m_iRcvLastAckAck; // Last sent ACK that has been acknowledged
int32_t m_iAckSeqNo; // Last ACK sequence number
int32_t m_iRcvCurrSeqNo; // Largest received sequence number
int32_t m_iNextExpect; // Sequence number of next speculated packet to receive
volatile bool m_bReadBuf; // Application has called "recv" but has not finished
volatile char* m_pcTempData; // Pointer to the buffer that application want to put received data into
volatile int m_iTempLen; // Size of the "m_pcTempData"
volatile UDT_MEM_ROUTINE m_pTempRoutine; // pointer to a routine function to process "m_pcTempData"
volatile void* m_pTempContext; // context parameter for "m_pTempRoutine"
int32_t m_iUserBufBorder; // Sequence number of last packet that will fulfill a user buffer
uint64_t m_ullLastWarningTime; // Last time that a warning message is sent
int32_t m_iPeerISN; // Initial Sequence Number of the peer side
int m_iFlowControlWindow; // flow control window size to be advertised
private: // Overlapped IO related
int m_iSndHandle; // seed used to generate an overlapped sending handle
int m_iRcvHandle; // seed used to generate an overlapped receiving handle
private: // synchronization: mutexes and conditions
pthread_mutex_t m_ConnectionLock; // used to synchronize connection operation
pthread_cond_t m_SendDataCond; // used to block sending when there is no data
pthread_mutex_t m_SendDataLock; // lock associated to m_SendDataCond
pthread_cond_t m_SendBlockCond; // used to block "send" call
pthread_mutex_t m_SendBlockLock; // lock associated to m_SendBlockCond
pthread_mutex_t m_AckLock; // used to protected sender's loss list when processing ACK
pthread_cond_t m_WindowCond; // used to block sending when flow window is exceeded
pthread_mutex_t m_WindowLock; // lock associated to m_WindowLock
pthread_cond_t m_RecvDataCond; // used to block "recv" when there is no data
pthread_mutex_t m_RecvDataLock; // lock associated to m_RecvDataCond
pthread_cond_t m_OverlappedRecvCond; // used to block "recv" when overlapped receving is in progress
pthread_mutex_t m_OverlappedRecvLock; // lock associated to m_OverlappedRecvCond
pthread_mutex_t m_HandleLock; // used to generate unique send/recv handle
pthread_mutex_t m_SendLock; // used to synchronize "send" call
pthread_mutex_t m_RecvLock; // used to synchronize "recv" call
void initSynch();
void destroySynch();
void releaseSynch();
private: // Thread handlers
#ifndef WIN32
static void* sndHandler(void* sender);
static void* rcvHandler(void* recver);
#else
static DWORD WINAPI sndHandler(LPVOID sender);
static DWORD WINAPI rcvHandler(LPVOID recver);
#endif
private: // congestion control
void rateControl();
void flowControl(const int& recvrate);
private: // Generation and processing of control packet
void sendCtrl(const int& pkttype, void* lparam = NULL, void* rparam = NULL, const int& size = 0);
void processCtrl(CPacket& ctrlpkt);
private: // Trace
timeval m_StartTime; // timestamp when the UDT entity is started
int64_t m_llSentTotal; // total number of sent data packets, including retransmissions
int64_t m_llRecvTotal; // total number of received packets
int m_iSndLossTotal; // total number of lost packets (sender side)
int m_iRcvLossTotal; // total number of lost packets (receiver side)
int m_iRetransTotal; // total number of retransmitted packets
int m_iSentACKTotal; // total number of sent ACK packets
int m_iRecvACKTotal; // total number of received ACK packets
int m_iSentNAKTotal; // total number of sent NAK packets
int m_iRecvNAKTotal; // total number of received NAK packets
timeval m_LastSampleTime; // last performance sample time
int64_t m_llTraceSent; // number of pakctes sent in the last trace interval
int64_t m_llTraceRecv; // number of pakctes received in the last trace interval
int m_iTraceSndLoss; // number of lost packets in the last trace interval (sender side)
int m_iTraceRcvLoss; // number of lost packets in the last trace interval (receiver side)
int m_iTraceRetrans; // number of retransmitted packets in the last trace interval
int m_iSentACK; // number of ACKs sent in the last trace interval
int m_iRecvACK; // number of ACKs received in the last trace interval
int m_iSentNAK; // number of NAKs sent in the last trace interval
int m_iRecvNAK; // number of NAKs received in the last trace interval
private: // internal data
char* m_pcTmpBuf;
};
#endif