#ifndef SORT_HEADER
#define SORT_HEADER
/***************************************************************************
* sort.h
*
* Fri Jan 17 11:26:42 2003
* Copyright 2003 Roman Dementiev
* dementiev@mpi-sb.mpg.de
****************************************************************************/
#include <list>
#include "../mng/mng.h"
#include "../common/rand.h"
#include "../mng/adaptor.h"
#include "../common/simple_vector.h"
#include "../common/switch.h"
#include "interleaved_alloc.h"
#include "intksort.h"
#include "adaptor.h"
#include "async_schedule.h"
#include "../mng/block_prefetcher.h"
#include "../mng/buf_writer.h"
#include "run_cursor.h"
#include "loosertree.h"
//#define SORT_OPT_PREFETCHING
//#define INTERLEAVED_ALLOC
__STXXL_BEGIN_NAMESPACE
namespace sort_local
{
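// Entry of a sorted run: the block's BID plus a copy of the block's
// smallest (first) element, which determines the order in which the
// blocks are consumed during merging.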
template <typename BIDTp_,typename ValTp_>
struct trigger_entry
{
typedef BIDTp_ bid_type;
typedef ValTp_ value_type;
bid_type bid;
value_type value;
operator bid_type()
{
return bid;
};
};
template <typename BIDTp_,typename ValTp_,typename ValueCmp_>
struct trigger_entry_cmp
{
typedef trigger_entry<BIDTp_,ValTp_> trigger_entry_type;
ValueCmp_ cmp;
trigger_entry_cmp(ValueCmp_ c): cmp(c) { }
trigger_entry_cmp(const trigger_entry_cmp & a): cmp(a.cmp) { }
bool operator ()(const trigger_entry_type & a,const trigger_entry_type & b) const
{
return cmp(a.value, b.value);
};
};
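// Compares two run cursors by their current elements; an exhausted
// (empty) cursor compares as if it held +infinity (sentinel emulation).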
template <typename block_type,
typename prefetcher_type,
typename value_cmp>
struct run_cursor2_cmp
{
typedef run_cursor2<block_type,prefetcher_type> cursor_type;
value_cmp cmp;
run_cursor2_cmp(value_cmp c):cmp(c) { }
run_cursor2_cmp(const run_cursor2_cmp & a):cmp(a.cmp) { }
inline bool operator () (const cursor_type & a, const cursor_type & b)
{
if (UNLIKELY (b.empty ()))
return true; // sentinel emulation
if (UNLIKELY (a.empty ()))
return false; //sentinel emulation
return (cmp(a.current (),b.current()));
};
};
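// Forms the initial sorted runs: for each run, reads up to m/2 input blocks,
// sorts their elements in internal memory, records each block's smallest
// element in the run's trigger entries and writes the sorted blocks to the
// BIDs allocated for the run. The writes of one run overlap with the
// processing of the next (double buffering via Blocks1/Blocks2).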
template <
typename block_type,
typename run_type,
typename input_bid_iterator,
typename value_cmp>
void
create_runs(
input_bid_iterator it,
run_type ** runs,
int nruns,
int _m,
value_cmp cmp)
{
typedef typename block_type::value_type type;
typedef typename block_type::bid_type bid_type;
int m2 = _m / 2;
block_manager *bm = block_manager::get_instance ();
block_type *Blocks1 = new block_type[m2];
block_type *Blocks2 = new block_type[m2];
request **read_reqs = new request *[m2];
request **write_reqs = new request *[m2];
bid_type * bids = new bid_type[m2];
run_type * run;
// STXXL_MSG("Creating runs")
disk_queues::get_instance ()->set_priority_op(disk_queue::READ);
int i;
int k = 0;
int run_size = 0;
// warning: overlapping of I/O and computation is not perfect here
for(; LIKELY (k < nruns); k++)
{
run = runs[k];
run_size = run->size ();
for(i = 0; i < run_size; i++)
{
bids[i] = *(it++);
Blocks1[i].read(bids[i], read_reqs[i]);
}
for (i = 0; i < run_size; i++)
bm->delete_block(bids[i]);
mc::waitdel_all(read_reqs, run_size);
if(block_type::has_filler)
std::sort(
TwoToOneDimArrayRowAdaptor< block_type,type,block_type::size > (Blocks1,0),
TwoToOneDimArrayRowAdaptor< block_type,type,block_type::size > (Blocks1, run_size*block_type::size ),
cmp
);
else
std::sort(Blocks1[0].elem, Blocks1[run_size].elem, cmp);
if (LIKELY (k))
mc::waitdel_all (write_reqs, m2);
for (i = 0; i < run_size; i++)
{
(*run)[i].value = Blocks1[i][0];
Blocks1[i].write ((*run)[i].bid, write_reqs[i]);
}
std::swap (Blocks1, Blocks2);
}
mc::waitdel_all (write_reqs, run_size);
delete [] Blocks1;
delete [] Blocks2;
delete [] read_reqs;
delete [] write_reqs;
delete [] bids;
}
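// Merges nruns sorted runs into one output run using a loser tree over
// run cursors, a block prefetcher that follows the precomputed (or trivial)
// prefetch sequence, and a buffered writer for the output blocks.
// The blocks of the input runs are deallocated after the merge.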
template < typename block_type,typename run_type , typename value_cmp>
void merge_runs(run_type ** in_runs, int nruns, run_type * out_run,unsigned _m,value_cmp cmp)
{
typedef typename block_type::bid_type bid_type;
typedef typename block_type::value_type value_type;
typedef block_prefetcher<block_type,typename run_type::iterator> prefetcher_type;
typedef run_cursor2<block_type,prefetcher_type> run_cursor_type;
typedef run_cursor2_cmp<block_type,prefetcher_type,value_cmp> run_cursor2_cmp_type;
int i;
run_type consume_seq(out_run->size());
int * prefetch_seq = new int[out_run->size()];
typename run_type::iterator copy_start = consume_seq.begin ();
for (i = 0; i < nruns; i++)
{
// TODO: try to avoid copy
copy_start = std::copy(
in_runs[i]->begin (),
in_runs[i]->end (),
copy_start );
}
std::sort(consume_seq.begin (), consume_seq.end (),
trigger_entry_cmp<bid_type,value_type,value_cmp>(cmp));
int disks_number = config::get_instance ()->disks_number ();
#ifdef PLAY_WITH_OPT_PREF
const int n_write_buffers = 4 * disks_number;
#else
const int n_prefetch_buffers = std::max( 2 * disks_number , (3 * (int(_m) - nruns) / 4));
const int n_write_buffers = std::max( 2 * disks_number , int(_m) - nruns - n_prefetch_buffers );
// heuristic
const int n_opt_prefetch_buffers = 2 * disks_number + (3*(n_prefetch_buffers - 2 * disks_number))/10;
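// i.e. the mandatory 2*D buffers plus roughly 30% of the surplus
// prefetch buffers are handed to the optimal prefetch schedule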
#endif
#ifdef SORT_OPT_PREFETCHING
compute_prefetch_schedule(
consume_seq,
prefetch_seq,
n_opt_prefetch_buffers,
disks_number );
#else
for(i=0;i<out_run->size();i++)
prefetch_seq[i] = i;
#endif
prefetcher_type prefetcher( consume_seq.begin(),
consume_seq.end(),
prefetch_seq,
nruns + n_prefetch_buffers);
buffered_writer<block_type> writer(n_write_buffers,n_write_buffers/2);
int out_run_size = out_run->size ();
looser_tree<
run_cursor_type,
run_cursor2_cmp_type,
block_type::size> loosers (&prefetcher, nruns,run_cursor2_cmp_type(cmp));
block_type *out_buffer = writer.get_free_block();
for (i = 0; i < out_run_size; i++)
{
loosers.multi_merge(out_buffer->elem);
(*out_run)[i].value = *(out_buffer->elem);
out_buffer = writer.write(out_buffer,(*out_run)[i].bid);
}
delete [] prefetch_seq;
block_manager *bm = block_manager::get_instance ();
for (i = 0; i < nruns; i++)
{
unsigned sz = in_runs[i]->size ();
for (unsigned j = 0; j < sz; j++)
bm->delete_block ((*in_runs[i])[j].bid);
}
}
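// Driver of the external merge sort: forms ceil(n/(m/2)) initial runs, then
// repeatedly merges groups of up to m runs per pass until a single run
// remains; returns the trigger-entry sequence of the final run, i.e. the
// sorted order of the output blocks.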
template <typename block_type,
typename alloc_strategy,
typename input_bid_iterator,
typename value_cmp>
simple_vector< trigger_entry<typename block_type::bid_type,typename block_type::value_type> > *
sort_blocks(input_bid_iterator input_bids,unsigned _n,unsigned _m,value_cmp cmp)
{
typedef typename block_type::value_type type;
typedef typename block_type::bid_type bid_type;
typedef trigger_entry< bid_type,type > trigger_entry_type;
typedef simple_vector< trigger_entry_type > run_type;
typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
unsigned int m2 = _m / 2;
unsigned int full_runs = _n / m2;
unsigned int partial_runs = ((_n % m2) ? 1 : 0);
unsigned int nruns = full_runs + partial_runs;
unsigned int i;
config *cfg = config::get_instance ();
block_manager *mng = block_manager::get_instance ();
int ndisks = cfg->disks_number ();
//STXXL_MSG ("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+"
// << partial_runs)
#ifdef STXXL_IO_STATS
stats *iostats = stats::get_instance ();
iostats->reset ();
#endif
double begin = stxxl_timestamp (), after_runs_creation, end;
run_type **runs = new run_type *[nruns];
for (i = 0; i < full_runs; i++)
runs[i] = new run_type (m2);
#ifdef INTERLEAVED_ALLOC
if (partial_runs)
{
unsigned int last_run_size = _n - full_runs * m2;
runs[i] = new run_type (last_run_size);
mng->new_blocks (interleaved_alloc_strategy (nruns, 0, ndisks),
RunsToBIDArrayAdaptor2 < block_type::raw_size,run_type >
(runs, 0, nruns, last_run_size),
RunsToBIDArrayAdaptor2 < block_type::raw_size,run_type >
(runs, _n, nruns, last_run_size));
}
else
mng->new_blocks (interleaved_alloc_strategy (nruns, 0, ndisks),
RunsToBIDArrayAdaptor < block_type::raw_size,run_type >
(runs, 0, nruns),
RunsToBIDArrayAdaptor < block_type::raw_size,run_type >
(runs, _n, nruns));
#else
if (partial_runs)
runs[i] = new run_type (_n - full_runs * m2);
for(i=0;i<nruns;i++)
{
mng->new_blocks( alloc_strategy(0,ndisks),
trigger_entry_iterator<trigger_entry_type,block_type::raw_size>(runs[i]->begin()),
trigger_entry_iterator<trigger_entry_type,block_type::raw_size>(runs[i]->end()) );
}
#endif
create_runs< block_type,
run_type,
input_bid_iterator,
value_cmp > (input_bids, runs, nruns,_m,cmp);
after_runs_creation = stxxl_timestamp ();
#ifdef COUNT_WAIT_TIME
double io_wait_after_rf = stxxl::wait_time_counter;
#endif
disk_queues::get_instance ()->set_priority_op (disk_queue::WRITE);
unsigned int full_runsize = m2;
run_type **new_runs;
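// merge passes: each pass merges groups of up to _m runs,
// multiplying the run length by _m, until a single run remains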
while (nruns > 1)
{
full_runsize = full_runsize * _m;
unsigned int new_full_runs = _n / full_runsize;
unsigned int new_partial_runs = ((_n % full_runsize) ? 1 : 0);
unsigned int new_nruns = new_full_runs + new_partial_runs;
new_runs = new run_type *[new_nruns];
for (i = 0; i < new_full_runs; i++)
new_runs[i] = new run_type (full_runsize);
if (nruns - new_full_runs * _m == 1)
{
// case when one partial run is left to be sorted
// STXXL_MSG("case when one partial run is left to be sorted")
new_runs[i] = new run_type (_n - full_runsize * new_full_runs);
run_type *tmp = runs[new_full_runs * _m];
std::copy (tmp->begin (), tmp->end (), new_runs[i]->begin ());
mng->new_blocks (interleaved_alloc_strategy
(new_nruns - 1, 0, ndisks),
RunsToBIDArrayAdaptor <
block_type::raw_size,run_type > (new_runs, 0,
new_nruns - 1),
RunsToBIDArrayAdaptor <
block_type::raw_size,run_type > (new_runs,
new_full_runs *
full_runsize,
new_nruns - 1));
for (i = 0; i < new_full_runs; i++)
{
merge_runs<block_type,run_type>(runs + i * _m, _m,*(new_runs + i),_m, cmp);
}
}
else
{
//allocate output blocks
if (new_partial_runs)
{
unsigned int last_run_size = _n - full_runsize * new_full_runs;
new_runs[i] = new run_type (last_run_size);
mng->new_blocks (interleaved_alloc_strategy
(new_nruns, 0, ndisks),
RunsToBIDArrayAdaptor2 <
block_type::raw_size,run_type > (new_runs,
0,
new_nruns,
last_run_size),
RunsToBIDArrayAdaptor2 <
block_type::raw_size,run_type > (new_runs,
_n,
new_nruns,
last_run_size));
}
else
mng->new_blocks (interleaved_alloc_strategy
(new_nruns, 0, ndisks),
RunsToBIDArrayAdaptor <
block_type::raw_size,run_type > (new_runs,
0,
new_nruns),
RunsToBIDArrayAdaptor <
block_type::raw_size,run_type > (new_runs,
_n,
new_nruns));
// STXXL_MSG("Output runs:" << new_nruns << "=" << new_full_runs << "+" << new_partial_runs)
for (i = 0; i < new_full_runs; i++)
{
// STXXL_MSG("Merge of m ("<< _m <<") runs")
merge_runs<block_type,run_type>(runs + i * _m, _m, *(new_runs + i),_m,cmp);
}
if (new_partial_runs)
{
// STXXL_MSG("Partial merge of "<< (nruns - i*_m) <<" runs")
merge_runs<block_type,run_type>(runs + i * _m, nruns - i * _m,*(new_runs + i),_m,cmp);
}
}
nruns = new_nruns;
delete [] runs;
runs = new_runs;
}
run_type * result = *runs;
delete [] runs;
end = stxxl_timestamp ();
STXXL_MSG ("Elapsed time : " << end - begin << " s. Run creation time: " <<
after_runs_creation - begin << " s")
#ifdef STXXL_IO_STATS
STXXL_MSG ("reads : " << iostats->get_reads ())
STXXL_MSG ("writes : " << iostats->get_writes ())
STXXL_MSG ("read time : " << iostats->get_read_time () << " s")
STXXL_MSG ("write time : " << iostats->get_write_time () <<" s")
STXXL_MSG ("parallel read time : " << iostats->get_pread_time () << " s")
STXXL_MSG ("parallel write time : " << iostats->get_pwrite_time () << " s")
STXXL_MSG ("parallel io time : " << iostats->get_pio_time () << " s")
#endif
#ifdef COUNT_WAIT_TIME
STXXL_MSG ("Time in I/O wait(rf): " << io_wait_after_rf << " s")
STXXL_MSG ("Time in I/O wait : " << stxxl::wait_time_counter << " s")
#endif
return result;
}
} // namespace sort_local
//! \brief External sorting routine for records with defined operator <
//! \param first object of model of \c ext_random_access_iterator concept
//! \param last object of model of \c ext_random_access_iterator concept
//! \param cmp comparison object
//! \param M amount of memory for internal use, in bytes
//! \remark Implements external merge sort described in [Dementiev & Sanders'03]
//! \remark non-stable
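//!
//! Usage sketch (illustrative only, not part of the library interface): it assumes
//! the appropriate STXXL vector header is included, an \c stxxl::vector<int> whose
//! contents do not fit into internal memory, block-aligned begin/end iterators, and
//! a hypothetical comparator \c my_cmp.
//! \code
//! struct my_cmp // hypothetical strict weak ordering on int
//! {
//!     bool operator () (const int & a, const int & b) const { return a < b; }
//! };
//!
//! stxxl::vector<int> v;
//! // ... fill v with more data than fits into internal memory ...
//! stxxl::sort(v.begin(), v.end(), my_cmp(), 128 * 1024 * 1024); // 128 MB of internal memory
//! \endcode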
template <typename ExtIterator_,typename StrictWeakOrdering_>
void sort(ExtIterator_ first, ExtIterator_ last,StrictWeakOrdering_ cmp,unsigned M)
{
typedef simple_vector< sort_local::trigger_entry<typename ExtIterator_::bid_type,
typename ExtIterator_::vector_type::value_type> > run_type;
typedef typename ExtIterator_::vector_type::value_type value_type;
typedef typename ExtIterator_::block_type block_type;
unsigned n=0;
block_manager *mng = block_manager::get_instance ();
first.flush();
if((last - first)*sizeof(value_type) < M)
{
STXXL_ERRMSG("In-memory sort, not implemented")
abort();
}
else
{
if(first.block_offset())
{
if(last.block_offset()) // neither the first nor the last element
// is at the beginning of a block
{
STXXL_ERRMSG("Sorting for non-aligned iterators is not implemented")
abort();
}
else
{
// the first element is not at the beginning of a block
STXXL_ERRMSG("Sorting for non-aligned iterators is not implemented")
abort();
}
}
else
{
if(last.block_offset()) // the last element is not
// at the beginning of a block
{
STXXL_ERRMSG("Sorting for non-aligned iterators is not implemented")
abort();
}
else
{
// first and last elements reside at the beginning of blocks
n = last.bid() - first.bid();
run_type * out =
sort_local::sort_blocks< typename ExtIterator_::block_type,
typename ExtIterator_::vector_type::alloc_strategy,
typename ExtIterator_::bids_container_iterator >
(first.bid(),n,M/block_type::raw_size,cmp);
typename run_type::iterator it = out->begin();
typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
for(;cur_bid != last.bid(); cur_bid++,it++)
{
*cur_bid = (*it).bid;
}
}
}
}
}
__STXXL_END_NAMESPACE
#endif