[go: up one dir, main page]

Menu

[8f591a]: / io / iobase.h  Maximize  Restore  History

Download this file

578 lines (495 with data), 14.6 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
#ifndef IOBASE_HEADER
#define IOBASE_HEADER
/***************************************************************************
* iobase.h
*
* Sat Aug 24 23:54:44 2002
* Copyright 2002 Roman Dementiev
* dementiev@mpi-sb.mpg.de
****************************************************************************/
#define STXXL_IO_STATS
#if defined(__linux__)
#define STXXL_CHECK_BLOCK_ALIGNING
#endif
//#if defined(__linux__)
//# if !defined(O_DIRECT) && (defined(__alpha__) || defined(__i386__))
//# define O_DIRECT 040000 /* direct disk access */
//# endif
//#endif
//#ifdef __sun__
//#define O_DIRECT 0
//#endif
#include <iostream>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <errno.h>
#include <sys/resource.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <stdlib.h>
// STL includes
#include <algorithm>
#include <string>
#include <queue>
#include <map>
#include <set>
#ifndef O_SYNC
#define O_SYNC 0
#endif
#ifndef O_RSYNC
#define O_RSYNC 0
#endif
#ifndef O_DSYNC
#define O_DSYNC 0
#endif
#if defined(__linux__)
//# include <asm/fcntl.h>
# if !defined(O_DIRECT) && (defined(__alpha__) || defined(__i386__))
# define O_DIRECT 040000 /* direct disk access */
# endif
#endif
#ifndef O_DIRECT
#define O_DIRECT O_SYNC
#endif
#include "iostats.h"
#include "../common/utils.h"
#include "../common/semaphore.h"
#include "../common/mutex.h"
//#include "../common/rwlock.h"
#include "../common/switch.h"
#include "../common/state.h"
#include "completion_handler.h"
__STXXL_BEGIN_NAMESPACE
//! \defgroup iolayer I/O primitives layer
//! Group of classes which enable abstraction from operating system calls and support
//! system-independent interfaces for asynchronous I/O.
//! \{
#define BLOCK_ALIGN 4096
typedef void *(*thread_function_t) (void *);
typedef long long int DISKID;
class request;
class request_ptr;
//! \brief Default completion handler class
struct default_completion_handler
{
//! \brief An operator that does nothing
void operator() (request *) { };
};
//! \brief Defines interface of file
//! It is a base class for different implementations that might
//! base on various file systems or even remote storage interfaces
class file
{
//! \brief private constructor
//! \remark instantiation of file without id is forbidden
file ()
{
};
protected:
int id;
//! \brief Initializes file object
//! \param _id file identifier
//! \remark Called in implementations of file
file (int _id):id (_id) { };
public:
//! \brief Definition of acceptable file open modes
//! Various open modes in a file system must be
//! converted to this set of acceptable modes
enum open_mode
{
RDONLY = 1, //!< only reading of the file is allowed
WRONLY = 2, //!< only writing of the file is allowed
RDWR = 4, //!< read and write of the file are allowed
CREAT = 8, //!< in case file does not exist no error occurs and file is newly created
DIRECT = 16,//!< I/Os proceed bypassing file system buffers, i.e. unbuffered I/O
TRUNC = 32 //!< once file is opened its length becomes zero
};
//! \brief Schedules asynchronous read request to the file
//! \param buffer pointer to memory buffer to read into
//! \param pos starting file position to read
//! \param bytes number of bytes to transfer
//! \param on_cmpl I/O completion handler
//! \return \c request_ptr object, that can be used to track the status of the operation
virtual request_ptr aread (void *buffer, off_t pos, size_t bytes,
completion_handler on_cmpl ) = 0;
//! \brief Schedules asynchronous write request to the file
//! \param buffer pointer to memory buffer to write from
//! \param pos starting file position to write
//! \param bytes number of bytes to transfer
//! \param on_cmpl I/O completion handler
//! \return \c request_ptr object, that can be used to track the status of the operation
virtual request_ptr awrite (void *buffer, off_t pos, size_t bytes,
completion_handler on_cmpl ) = 0;
//! \brief Changes the size of the file
//! \param newsize value of the new file size
virtual void set_size (off_t newsize) = 0;
//! \brief Returns size of the file
//! \return file size in bytes
virtual off_t size () = 0;
//! \brief depricated, use \c stxxl::file::get_id() instead
int get_disk_number ()
{
return id;
};
//! \brief Returns file's identifier
//! \remark might be used as disk's id in case disk to file mapping
//! \return integer file identifier, passed as constructor parameter
int get_id()
{
return id;
};
virtual ~ file () { };
};
class mc;
class disk_queue;
class disk_queues;
//! \brief Defines interface of request
//! Since all library I/O operations are asynchronous,
//! one needs to keep track of their status: whether
//! an I/O completed or not.
class request
{
friend int wait_any(request_ptr req_array[], int count);
template <class request_iterator_> friend
request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end);
friend class file;
friend class disk_queue;
friend class disk_queues;
friend class request_ptr;
protected:
virtual bool add_waiter (onoff_switch * sw) = 0;
virtual void delete_waiter (onoff_switch * sw) = 0;
//virtual void enqueue () = 0;
virtual void serve () = 0;
completion_handler on_complete;
int ref_cnt;
mutex ref_cnt_mutex;
enum request_type { READ, WRITE };
void completed ()
{
on_complete(this);
};
public:
request(completion_handler on_compl):on_complete(on_compl),ref_cnt(0)
{
STXXL_VERBOSE3("request "<< unsigned(this) <<": creation, cnt: "<<ref_cnt)
}
//! \brief Suspends calling thread until completion of the request
virtual void wait () = 0;
//! \brief Polls the status of the request
//! \return \c true if request is comleted, otherwise \c false
virtual bool poll () = 0;
//! \brief Identifies the type of request I/O implementation
//! \return pointer to null terminated string of characters, containing the name of I/O implementation
virtual const char * io_type ()
{
return "none";
};
virtual ~request()
{
STXXL_VERBOSE3("request "<< unsigned(this) <<": deletion, cnt: "<<ref_cnt)
}
private:
// Following methods are declared but not implemented
// intentionnaly to forbid their usage
request(const request &);
request & operator=(const request &);
request();
void add_ref()
{
ref_cnt_mutex.lock();
STXXL_VERBOSE3("request "<< unsigned(this) <<": adding reference, cnt: "<<ref_cnt)
ref_cnt++;
ref_cnt_mutex.unlock();
}
bool sub_ref()
{
ref_cnt_mutex.lock();
STXXL_VERBOSE3("request "<< unsigned(this) <<": subtracting reference cnt: "<<ref_cnt)
int val=--ref_cnt;
ref_cnt_mutex.unlock();
assert(val>=0);
return (val==0);
}
};
//! \brief A smart wrapper for \c request pointer.
//! Implemented as reference counting smart pointer.
class request_ptr
{
request * ptr;
void add_ref()
{
if(ptr)
{
ptr->add_ref();
}
}
void sub_ref()
{
if(ptr)
{
if(ptr->sub_ref())
{
delete ptr;
ptr = NULL;
}
}
}
public:
//! \brief Constucts an \c request_ptr from \c request pointer
request_ptr(request *ptr_=NULL):ptr(ptr_) { add_ref(); }
//! \brief Constucts an \c request_ptr from a \c request_ptr object
request_ptr(const request_ptr & p): ptr(p.ptr) { add_ref(); }
//! \brief Destructor
~request_ptr() { sub_ref(); }
//! \brief Assignment operator from \c request_ptr object
//! \return reference to itself
request_ptr & operator= (const request_ptr & p)
{
assert(p.ptr);
return (*this = p.ptr);
}
//! \brief Assignment operator from \c request pointer
//! \return reference to itself
request_ptr & operator= (request * p)
{
if(p != ptr)
{
sub_ref();
ptr = p;
add_ref();
}
return *this;
}
//! \brief "Star" operator
//! \return reference to owned \c request object
request & operator * () const
{
assert(ptr);
return *ptr;
}
//! \brief "Arrow" operator
//! \return pointer to owned \c request object
request * operator -> () const
{
assert(ptr);
return ptr;
}
//! \brief Access to owned \c request object (sinonym for \c operator->() )
//! \return reference to owned \c request object
//! \warning Creation another \c request_ptr from the returned \c request or deletion
//! causes unpredictable behaviour. Do not do that!
request * get() const { return ptr; }
//! \brief Returns true if object is initialized
bool valid() const { return ptr; }
//! \brief Returns true if object is not initialized
bool empty() const { return !ptr; }
};
//! \brief Collection of functions to track statuses of a number of requests
//! \brief Suspends calling thread until \b any of requests is completed
//! \param req_array array of \c request_ptr objects
//! \param count size of req_array
//! \return index in req_array pointing to the \b first completed request
inline int wait_any(request_ptr req_array[], int count);
//! \brief Suspends calling thread until \b all requests are completed
//! \param req_array array of request_ptr objects
//! \param count size of req_array
inline void wait_all(request_ptr req_array[], int count);
//! \brief Polls requests
//! \param req_array array of request_ptr objects
//! \param count size of req_array
//! \param index contains index of the \b first completed request if any
//! \return \c true if any of requests is completed, then index contains valid value, otherwise \c false
inline bool poll_any (request_ptr req_array[], int count,int &index);
void wait_all(request_ptr req_array[], int count)
{
for (int i = 0; i < count; i++)
{
req_array[i]->wait ();
}
}
template <class request_iterator_>
void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
{
while(reqs_begin != reqs_end)
{
(request_ptr(*reqs_begin))->wait ();
++reqs_begin;
}
}
bool poll_any (request_ptr req_array[], int count, int &index)
{
index = -1;
for (int i = 0; i < count; i++)
{
if (req_array[i]->poll())
{
index = i;
return true;
}
};
return false;
}
template <class request_iterator_>
request_iterator_ poll_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
{
while(reqs_begin != reqs_end)
{
if ((request_ptr(*reqs_begin))->poll())
return reqs_begin;
++reqs_begin;
};
return reqs_end;
}
int wait_any (request_ptr req_array[], int count)
{
START_COUNT_WAIT_TIME
onoff_switch sw;
int i = 0, index = -1;
for (; i < count; i++)
{
if (req_array[i]->add_waiter (&sw))
{
// already done
index = i;
while (--i >= 0)
req_array[i]->delete_waiter (&sw);
END_COUNT_WAIT_TIME
return index;
}
}
sw.wait_for_on ();
for (i = 0; i < count; i++)
{
req_array[i]->delete_waiter (&sw);
if (index < 0 && req_array[i]->poll ())
index = i;
}
END_COUNT_WAIT_TIME
return index;
}
template <class request_iterator_>
request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
{
START_COUNT_WAIT_TIME
onoff_switch sw;
request_iterator_ cur = reqs_begin, result = reqs_end;
for (; cur != reqs_end; cur++)
{
if ((request_ptr(*cur))->add_waiter (&sw))
{
// already done
result = cur;
if(cur != reqs_begin)
{
while (--cur != reqs_begin)
(request_ptr(*cur))->delete_waiter (&sw);
(request_ptr(*cur))->delete_waiter (&sw);
}
END_COUNT_WAIT_TIME
return result;
}
}
sw.wait_for_on ();
for (cur = reqs_begin; cur != reqs_end; cur++)
{
(request_ptr(*cur))->delete_waiter (&sw);
if (result == reqs_end && (request_ptr(*cur))->poll ())
result = cur;
}
END_COUNT_WAIT_TIME
return result;
}
class disk_queue
{
public:
enum priority_op { READ, WRITE, NONE };
private:
mutex write_mutex;
mutex read_mutex;
std::queue < request_ptr > write_queue;
std::queue < request_ptr > read_queue;
semaphore sem;
pthread_t thread;
priority_op _priority_op;
#ifdef STXXL_IO_STATS
stats *iostats;
#endif
static void *worker (void *arg);
public:
disk_queue (int n = 1); // max number of requests simultainenously submitted to disk
void set_priority_op (priority_op op)
{
_priority_op = op;
};
void add_readreq (request_ptr & req);
void add_writereq (request_ptr & req);
~disk_queue ();
};
//! \brief Encapsulates disk queues
//! \remark is a singleton
class disk_queues
{
protected:
std::map < DISKID, disk_queue * > queues;
disk_queues ()
{
};
public:
void add_readreq (request_ptr & req, DISKID disk)
{
if (queues.find (disk) == queues.end ())
{
// create new disk queue
queues[disk] = new disk_queue ();
}
queues[disk]->add_readreq (req);
};
void add_writereq (request_ptr & req, DISKID disk)
{
if (queues.find (disk) == queues.end ())
{
// create new disk queue
queues[disk] = new disk_queue ();
}
queues[disk]->add_writereq (req);
};
~disk_queues ()
{
// deallocate all queues
for (std::map < DISKID, disk_queue * >::iterator i =
queues.begin (); i != queues.end (); i++)
delete (*i).second;
};
static disk_queues *get_instance ()
{
if (!instance)
instance = new disk_queues ();
return instance;
};
//! \brief Changes requests priorities
//! \param op one of:
//! - READ, read requests are served before write requests within a disk queue
//! - WRITE, write requests are served before read requests within a disk queue
//! - NONE, read and write requests are served by turns, alternately
void set_priority_op(disk_queue::priority_op op)
{
for (std::map < DISKID, disk_queue * >::iterator i =
queues.begin (); i != queues.end (); i++)
i->second->set_priority_op (op);
};
private:
static disk_queues *instance;
};
#ifdef COUNT_WAIT_TIME
extern double wait_time_counter;
#endif
//! \}
__STXXL_END_NAMESPACE
#endif