[go: up one dir, main page]

Menu

[r19]: / udt / buffer.h  Maximize  Restore  History

Download this file

405 lines (317 with data), 15.7 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
/*****************************************************************************
Copyright © 2001 - 2006, The Board of Trustees of the University of Illinois.
All Rights Reserved.
UDP-based Data Transfer Library (UDT) version 3
Laboratory for Advanced Computing (LAC)
National Center for Data Mining (NCDM)
University of Illinois at Chicago
http://www.lac.uic.edu/
This library is free software; you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or (at
your option) any later version.
This library is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
*****************************************************************************/
/*****************************************************************************
This header file contains the definition of UDT buffer structure and operations.
*****************************************************************************/
/*****************************************************************************
written by
Yunhong Gu [gu@lac.uic.edu], last updated 09/07/2006
*****************************************************************************/
#ifndef __UDT_BUFFER_H__
#define __UDT_BUFFER_H__
#include "udt.h"
#include "list.h"
class CSndBuffer
{
public:
CSndBuffer(const int& mss);
~CSndBuffer();
// Functionality:
// Insert a user buffer into the sending list.
// Parameters:
// 0) [in] data: pointer to the user data block.
// 1) [in] len: size of the block.
// 2) [in] handle: handle of this request IO.
// 3) [in] func: routine to process the buffer after IO completed.
// 4) [in] context: context parameter for the buffer process routine
// 5) [in] ttl: time to live in milliseconds
// 6) [in] seqno: sequence number of the first packet in the block, for DGRAM only
// 7) [in] order: if the block should be delivered in order, for DGRAM only
// Returned value:
// None.
void addBuffer(const char* data, const int& len, const int& handle, const UDT_MEM_ROUTINE func, void* context, const int& ttl = -1, const int32_t& seqno = 0, const bool& order = false);
// Functionality:
// Find data position to pack a DATA packet from the furthest reading point.
// Parameters:
// 0) [out] data: the pointer to the data position.
// 1) [in] len: Expected data length.
// 2) [out] msgno: message number of the packet.
// Returned value:
// Actual length of data read.
int readData(char** data, const int& len, int32_t& msgno);
// Functionality:
// Find data position to pack a DATA packet for a retransmission.
// Parameters:
// 0) [out] data: the pointer to the data position.
// 1) [in] offset: offset from the last ACK point.
// 2) [in] len: Expected data length.
// 3) [out] msgno: message number of the packet.
// 4) [out] seqno: sequence number of the first packet in the message
// 5) [out] msglen: length of the message
// Returned value:
// Actual length of data read.
int readData(char** data, const int offset, const int& len, int32_t& msgno, int32_t& seqno, int& msglen);
// Functionality:
// Update the ACK point and may release/unmap/return the user data according to the flag.
// Parameters:
// 0) [in] len: size of data acknowledged.
// 1) [in] payloadsize: regular payload size that UDT always try to read.
// Returned value:
// None.
void ackData(const int& len, const int& payloadsize);
// Functionality:
// Read size of data still in the sending list.
// Parameters:
// None.
// Returned value:
// Current size of the data in the sending list.
int getCurrBufSize() const;
// Functionality:
// Query the progress of the buffer sending identified by handle.
// Parameters:
// 1) [in] handle: descriptor of this overlapped IO
// 2) [out] progress: the current progress of the overlapped IO
// Returned value:
// if the overlapped IO is completed.
bool getOverlappedResult(const int& handle, int& progress);
// Functionality:
// helper function to release the user buffer.
// Parameters:
// 1) [in]: pointer to the buffer
// 2) [in]: buffer size
// Returned value:
// Current size of the data in the sending list
static void releaseBuffer(char* buf, int, void*);
private:
pthread_mutex_t m_BufLock; // used to synchronize buffer operation
struct Block
{
char* m_pcData; // pointer to the data block
int m_iLength; // length of the block
timeval m_OriginTime; // original request time
int m_iTTL; // time to live
int32_t m_iMsgNo; // message number
int32_t m_iSeqNo; // sequence number of first packet
int m_iInOrder; // flag indicating if the block should be delivered in order
int m_iHandle; // a unique handle to represent this senidng request
UDT_MEM_ROUTINE m_pMemRoutine; // function to process buffer after sending
void* m_pContext; // context parameter for the memory processing routine
Block* m_next; // next block
} *m_pBlock, *m_pLastBlock, *m_pCurrSendBlk, *m_pCurrAckBlk;
// m_pBlock: The first block
// m_pLastBlock: The last block
// m_pCurrSendBlk: The block contains the data with the largest seq. no. that has been sent
// m_pCurrAckBlk: The block contains the data with the latest ACK (= m_pBlock)
int m_iCurrBufSize; // Total size of the blocks
int m_iCurrSendPnt; // pointer to the data with the largest current seq. no.
int m_iCurrAckPnt; // pointer to the data with the latest ACK
int32_t m_iNextMsgNo; // next message number
int m_iMSS; // maximum seqment/packet size
};
////////////////////////////////////////////////////////////////////////////////
class CRcvBuffer
{
public:
CRcvBuffer(const int& mss);
CRcvBuffer(const int& mss, const int& bufsize);
~CRcvBuffer();
// Functionality:
// Find a position in the buffer to receive next packet.
// Parameters:
// 0) [out] data: the pointer to the next data position.
// 1) [in] offset: offset from last ACK point.
// 2) [in] len: size of data to be written.
// Returned value:
// true if found, otherwise false.
bool nextDataPos(char** data, int offset, const int& len);
// Functionality:
// Write data into the buffer.
// Parameters:
// 0) [in/out] data: [in] pointer to data to be copied, [out] pointer to the protocol buffer location where the data is added.
// 1) [in] offset: offset from last ACK point.
// 2) [in] len: size of data to be written.
// Returned value:
// true if a position that can hold the data is found, otherwise false.
bool addData(char** data, int offset, int len);
// Functionality:
// Move part of the data in buffer to the direction of the ACK point by some length.
// Parameters:
// 0) [in] offset: From where to move the data.
// 1) [in] len: How much to move.
// Returned value:
// None.
void moveData(int offset, const int& len);
// Functionality:
// Read data from the buffer into user buffer.
// Parameters:
// 0) [out] data: data read from protocol buffer.
// 1) [in] len: size of data to be read.
// Returned value:
// true if there is enough data to read, otherwise return false.
bool readBuffer(char* data, const int& len);
// Functionality:
// Update the ACK point of the buffer.
// Parameters:
// 0) [in] len: size of data to be acknowledged.
// Returned value:
// 1 if a user buffer is fulfilled, otherwise 0.
int ackData(const int& len);
// Functionality:
// Insert the user buffer into the protocol buffer.
// Parameters:
// 0) [in] buf: pointer to the user buffer.
// 1) [in] len: size of the user buffer.
// 2) [in] handle: descriptor of this overlapped receiving.
// 3) [in] func: buffer process routine after an overlapped IO is completed.
// 3) [in] context parameter for the buffer process routine.
// Returned value:
// Size of data that has been received by now.
int registerUserBuf(char* buf, const int& len, const int& handle, const UDT_MEM_ROUTINE func, void* context);
// Functionality:
// remove the user buffer from the protocol buffer.
// Parameters:
// None
// Returned value:
// None.
void removeUserBuf();
// Functionality:
// Query how many buffer space left for data receiving.
// Parameters:
// None.
// Returned value:
// size of available buffer space (including user buffer) for data receiving.
int getAvailBufSize() const;
// Functionality:
// Query how many data has been continuously received (for reading).
// Parameters:
// None.
// Returned value:
// size of valid (continous) data for reading.
int getRcvDataSize() const;
// Functionality:
// Query the progress of the buffer sending identified by handle.
// Parameters:
// 1) [in] handle: descriptor of this overlapped IO
// 2) [out] progress: the current progress of the overlapped IO
// Returned value:
// if the overlapped IO is completed.
bool getOverlappedResult(const int& handle, int& progress);
// Functionality:
// Query the total size of overlapped recv buffers.
// Parameters:
// None.
// Returned value:
// Total size of the pending overlapped recv buffers.
int getPendingQueueSize() const;
// Functionality:
// Initialize the received message list.
// Parameters:
// None.
// Returned value:
// None.
void initMsgList();
// Functionality:
// Check the message boundaries.
// Parameters:
// 0) [in] type: boundary type: start and/or end.
// 1) [in] msgno: message number
// 2) [in] seqno: sequence number
// 3) [in] ptr: pointer to the protocol buffer
// 4) [in] diff: size difference of an irredular packet
// Returned value:
// None.
void checkMsg(const int& type, const int32_t& msgno, const int32_t& seqno, const char* ptr, const bool& inorder, const int& diff);
// Functionality:
// acknowledgment check for the message list.
// Parameters:
// 0) [in] ackno: latest acknowledged sequence number.
// 1) [in] rll: receiver's loss list
// Returned value:
// None.
bool ackMsg(const int32_t& ackno, const CRcvLossList* rll);
// Functionality:
// mark the message to be dropped from the message list.
// Parameters:
// 0) [in] msgno: message nuumer.
// Returned value:
// None.
void dropMsg(const int32_t& msgno);
// Functionality:
// read a message.
// Parameters:
// 0) [out] data: buffer to write the message into.
// 1) [in] len: size of the buffer.
// Returned value:
// actuall size of data read.
int readMsg(char* data, const int& len);
// Functionality:
// get the number of valid message currently available.
// Parameters:
// None.
// Returned value:
// number of valid message.
int getValidMsgCount();
private:
char* m_pcData; // pointer to the protocol buffer
int m_iSize; // size of the protocol buffer
int m_iStartPos; // the head position for I/O (inclusive)
int m_iLastAckPos; // the last ACKed position (exclusive)
// EMPTY: m_iStartPos = m_iLastAckPos FULL: m_iStartPos = m_iLastAckPos + 1
int m_iMaxOffset; // the furthest "dirty" position (absolute distance from m_iLastAckPos)
char* m_pcUserBuf; // pointer to the user registered buffer
int m_iUserBufSize; // size of the user buffer
int m_iUserBufAck; // last ACKed position of the user buffer
int m_iHandle; // unique handle to represet this IO request
UDT_MEM_ROUTINE m_pMemRoutine; // function to process user buffer after receiving
void* m_pContext; // context parameter for the buffer processing routine
struct Block
{
char* m_pcData; // pointer to the overlapped recv buffer
int m_iLength; // length of the block
int m_iHandle; // a unique handle to represent this receiving request
UDT_MEM_ROUTINE m_pMemRoutine; // function to process buffer after a complete receiving
void* m_pContext; // context parameter for the buffer processing routine
Block* m_next; // next block
} *m_pPendingBlock, *m_pLastBlock;
// m_pPendingBlock: // the list of pending overlapped recv buffers
// m_pLastBlock: // the last block of pending buffers
int m_iPendingSize; // total size of pending recv buffers
struct MsgInfo
{
char* m_pcData; // location of the message in the protocol buffer
int32_t m_iMsgNo; // message number
int32_t m_iStartSeq; // sequence number of the first packet in the message
int32_t m_iEndSeq; // sequence number of the last packet in the message
int m_iSizeDiff; // the size difference of the last packet (that may be an irregular sized packet)
int m_iLength; // length of this message
bool m_bValid; // if the message is valid
bool m_bDropped; // if the message is droped by the sender
bool m_bInOrder; // if the message should be delivered in order
} *m_pMessageList; // a list of the received message
int m_iMsgInfoSize; // size of the message info list
int m_iPtrFirstMsg; // pointer to the first message in the list
int m_iPtrRecentACK; // the most recent ACK'ed message
int32_t m_iLastMsgNo; // the last msg no ever received
pthread_mutex_t m_MsgLock; // used to synchronize MsgInfo operation
int m_iValidMsgCount; // number valid message
int m_iMSS; // maximum seqment/packet size
};
#endif