[go: up one dir, main page]

Menu

[8f591a]: / algo / sort.h.back  Maximize  Restore  History

Download this file

547 lines (439 with data), 15.0 kB

#ifndef SORT_HEADER
#define SORT_HEADER
/***************************************************************************
 *            sort.h
 *
 *  Fri Jan 17 11:26:42 2003
 *  Copyright  2003  Roman Dementiev
 *  dementiev@mpi-sb.mpg.de
 ****************************************************************************/

#include <list>

#include "../mng/mng.h"
#include "../common/rand.h"
#include "../mng/adaptor.h"
#include "../common/simple_vector.h"
#include "../common/switch.h"
#include "interleaved_alloc.h"
#include "intksort.h"
#include "adaptor.h"
#include "async_schedule.h"
#include "../mng/block_prefetcher.h"
#include "../mng/buf_writer.h"
#include "run_cursor.h"
#include "loosertree.h"


//#define SORT_OPT_PREFETCHING
//#define INTERLEAVED_ALLOC


__STXXL_BEGIN_NAMESPACE

namespace sort_local
{
	//! \brief Pairs a block identifier with one value associated with that block.
	//!
	//! create_runs() and merge_runs() store the first element of every block
	//! they write in \c value, next to the identifier of that block in \c bid.
	template <typename BIDTp_,typename ValTp_>
	struct trigger_entry
	{
		typedef BIDTp_ bid_type;
		typedef ValTp_ value_type;

		bid_type bid;      //!< identifier of the block
		value_type value;  //!< value associated with the block
	
		//! \brief Implicit conversion to the stored block identifier.
		//! \remark const-qualified so that constant entries can be converted too
		operator bid_type() const
		{
			return bid;
		}
	};
	
	template <typename BIDTp_,typename ValTp_,typename ValueCmp_>
	struct trigger_entry_cmp
	{
		typedef trigger_entry<BIDTp_,ValTp_> trigger_entry_type;
		ValueCmp_ cmp;
		trigger_entry_cmp(ValueCmp_ c): cmp(c) { }
		trigger_entry_cmp(const trigger_entry_cmp & a): cmp(a.cmp) { }
		bool operator ()(const trigger_entry_type & a,const trigger_entry_type & b) const
		{
			return cmp(a.value, b.value);
		};
	};
	
	//! \brief Compares two run cursors by the elements they currently point to.
	//!
	//! An empty cursor is treated like a sentinel: any cursor compares as
	//! "less" against an empty right-hand cursor, and an empty left-hand
	//! cursor never compares as "less" against a non-empty one.
	template <typename block_type,
	          typename prefetcher_type,
	          typename value_cmp>
	struct run_cursor2_cmp
	{
		typedef run_cursor2<block_type, prefetcher_type> cursor_type;

		value_cmp cmp;  //!< comparator applied to the current elements

		run_cursor2_cmp(value_cmp value_order)
			: cmp(value_order)
		{ }

		run_cursor2_cmp(const run_cursor2_cmp & other)
			: cmp(other.cmp)
		{ }

		//! \return \c true if \c b is empty; otherwise \c false if \c a is
		//!         empty; otherwise \c cmp(a.current(), b.current())
		inline bool operator () (const cursor_type & a, const cursor_type & b)
		{
			// The order of the two checks matters: an empty b wins even when
			// a is empty as well.
			if (UNLIKELY(b.empty()))
			{
				return true;   // sentinel emulation
			}
			if (UNLIKELY(a.empty()))
			{
				return false;  // sentinel emulation
			}

			return cmp(a.current(), b.current());
		}
	};

	
template <
					typename block_type,
					typename run_type,
					typename input_bid_iterator,
					typename value_cmp>
void
create_runs(
		input_bid_iterator it,
		run_type ** runs,
		int nruns,
		int _m,
		value_cmp cmp)
{
	typedef typename block_type::value_type type;
	typedef typename block_type::bid_type bid_type;
	
	int m2 = _m / 2;
	block_manager *bm = block_manager::get_instance ();
	block_type *Blocks1 = new block_type[m2];
	block_type *Blocks2 = new block_type[m2];
	request **read_reqs = new request *[m2];
	request **write_reqs = new request *[m2];
	bid_type * bids = new bid_type[m2];
	run_type * run;

	//  STXXL_MSG("Creating runs")
	disk_queues::get_instance ()->set_priority_op(disk_queue::READ);

	int i;
	int k = 0;
	int run_size = 0;
	
	// warning: pverlapping is not perfect
	for(; LIKELY (k < nruns); k++)
	{
		run = runs[k];
		run_size = run->size ();

		for(i = 0; i < run_size; i++)
		{
			bids[i] = *(it++);
			Blocks1[i].read(bids[i], read_reqs[i]);
		}

		for (i = 0; i < run_size; i++)
			bm->delete_block(bids[i]);

		mc::waitdel_all(read_reqs, run_size);

		if(block_type::has_filler)
		      std::sort(
		              TwoToOneDimArrayRowAdaptor< Block,Block::type,Block::size > (Blocks1,0),
		              TwoToOneDimArrayRowAdaptor< Block,Block::type,Block::size > (Blocks1, run_size*Block::size )
		              );
		else
			std::sort(Blocks1[0].elem, Blocks1[m2].elem, cmp);

		if (LIKELY (k))
			mc::waitdel_all (write_reqs, m2);

		for (i = 0; i < run_size; i++)
		{
			(*run)[i].value = Blocks1[i][0];
			Blocks1[i].write ((*run)[i].bid, write_reqs[i]);
		}
		std::swap (Blocks1, Blocks2);
	}

	mc::waitdel_all (write_reqs, run_size);


	delete [] Blocks1;
	delete [] Blocks2;
	delete [] read_reqs;
	delete [] write_reqs;
	delete [] bids;
	
}

	
	
template < typename block_type,typename run_type , typename value_cmp>
void merge_runs(run_type ** in_runs, int nruns, run_type * out_run,unsigned  _m,value_cmp cmp)
{
	typedef typename block_type::bid_type bid_type;
	typedef typename block_type::value_type value_type;
	typedef block_prefetcher<block_type,typename run_type::iterator> prefetcher_type;
	typedef run_cursor2<block_type,prefetcher_type> run_cursor_type;
	typedef run_cursor2_cmp<block_type,prefetcher_type,value_cmp> run_cursor2_cmp_type;
	
	int i;
	run_type consume_seq(out_run->size());

	int * prefetch_seq = new int[out_run->size()];

	typename run_type::iterator copy_start = consume_seq.begin ();
	for (i = 0; i < nruns; i++)
	{
		// TODO: try to avoid copy
		copy_start = std::copy(
						in_runs[i]->begin (),
						in_runs[i]->end (),
						copy_start	);
	}
	
	std::sort(consume_seq.begin (), consume_seq.end (),
					trigger_entry_cmp<bid_type,value_type,value_cmp>(cmp));

	int disks_number = config::get_instance ()->disks_number ();
	
	#ifdef PLAY_WITH_OPT_PREF
	const int n_write_buffers = 4 * disks_number;
	#else
	const int n_prefetch_buffers = std::max( 2 * disks_number , (3 * (int(_m) - nruns) / 4));
        const int n_write_buffers = std::max( 2 * disks_number , int(_m) - nruns - n_prefetch_buffers );
	// heuristic
	const int n_opt_prefetch_buffers = 2 * disks_number + (3*(n_prefetch_buffers - 2 * disks_number))/10;
	#endif
	
	#ifdef SORT_OPT_PREFETCHING
	compute_prefetch_schedule(
			consume_seq,
			prefetch_seq,
			n_opt_prefetch_buffers,
			disks_number );
	#else
	for(i=0;i<out_run->size();i++)
		prefetch_seq[i] = i;
	#endif
	
	prefetcher_type prefetcher(	consume_seq.begin(),
															consume_seq.end(),
															prefetch_seq,
															nruns + n_prefetch_buffers);
	
	buffered_writer<block_type> writer(n_write_buffers,n_write_buffers/2);
	
	int out_run_size = out_run->size ();

	looser_tree<
							run_cursor_type,
							run_cursor2_cmp_type,
							block_type::size> loosers (&prefetcher, nruns,run_cursor2_cmp_type(cmp));


	block_type *out_buffer = writer.get_free_block();

	for (i = 0; i < out_run_size; i++)
	{
		loosers.multi_merge(out_buffer->elem);
		(*out_run)[i].value = *(out_buffer->elem);
		out_buffer = writer.write(out_buffer,(*out_run)[i].bid);
	}
	
	delete [] prefetch_seq;

	block_manager *bm = block_manager::get_instance ();
	for (i = 0; i < nruns; i++)
	{
		unsigned sz = in_runs[i]->size ();
		for (unsigned j = 0; j < sz; j++)
			bm->delete_block ((*in_runs[i])[j].bid);
	}
	
}



//! \brief Sorts the elements stored in \c _n blocks and returns the run that
//!        describes the sorted output.
//!
//! The input is first split into runs of \c _m/2 blocks each (the last run
//! may be shorter), which are sorted by create_runs(). Afterwards the runs
//! are merged by merge_runs(), \c _m runs at a time, until a single run is
//! left. Output blocks are allocated here; the input blocks and the blocks
//! of intermediate runs are released by create_runs() and merge_runs().
//!
//! \param input_bids iterator over the identifiers of the input blocks
//! \param _n         number of input blocks
//! \param _m         number of block buffers that may be used
//! \param cmp        comparison object used to order the elements
//! \return pointer to a run allocated with \c new whose entries hold the
//!         identifiers of the sorted blocks in order; the caller is
//!         responsible for deleting it
template <typename block_type,
					typename alloc_strategy,
					typename input_bid_iterator,
					typename value_cmp>
simple_vector< trigger_entry<typename block_type::bid_type,typename block_type::value_type> > * 
	sort_blocks(input_bid_iterator input_bids,unsigned _n,unsigned _m,value_cmp cmp)
{
	typedef typename block_type::value_type type;
	typedef typename block_type::bid_type bid_type;
	typedef trigger_entry< bid_type,type > trigger_entry_type;
	typedef simple_vector< trigger_entry_type > run_type;
	typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
	
	unsigned int m2 = _m / 2;
	unsigned int full_runs = _n / m2;
	unsigned int partial_runs = ((_n % m2) ? 1 : 0);
	unsigned int nruns = full_runs + partial_runs;
	unsigned int i;
	
	config *cfg = config::get_instance ();
	block_manager *mng = block_manager::get_instance ();
	int ndisks = cfg->disks_number ();
	
	//STXXL_MSG ("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+"
	//	   << partial_runs) 
	
#ifdef STXXL_IO_STATS
	stats *iostats = stats::get_instance ();
	iostats->reset ();
#endif

	
	double begin =
		stxxl_timestamp (), after_runs_creation, end;

	run_type **runs = new run_type *[nruns];

	for (i = 0; i < full_runs; i++)
		runs[i] = new run_type (m2);

	// allocate the output blocks of the initial runs
#ifdef INTERLEAVED_ALLOC
	if (partial_runs)
	{
		unsigned int last_run_size = _n - full_runs * m2;
		runs[i] = new run_type (last_run_size);

		mng->new_blocks (interleaved_alloc_strategy (nruns, 0, ndisks),
				 RunsToBIDArrayAdaptor2 < block_type::raw_size,run_type >
				 (runs, 0, nruns, last_run_size),
				 RunsToBIDArrayAdaptor2 < block_type::raw_size,run_type >
				 (runs, _n, nruns, last_run_size));

	}
	else
		mng->new_blocks (interleaved_alloc_strategy (nruns, 0, ndisks),
				 RunsToBIDArrayAdaptor < block_type::raw_size,run_type >
				 (runs, 0, nruns),
				 RunsToBIDArrayAdaptor < block_type::raw_size,run_type >
				 (runs, _n, nruns));
#else
	
		// i == full_runs here, i.e. the slot of the partial run
		if (partial_runs)
			runs[i] = new run_type (_n - full_runs * m2);
		
		for(i=0;i<nruns;i++)
		{
			mng->new_blocks(	alloc_strategy(0,ndisks),
												trigger_entry_iterator<trigger_entry_type,block_type::raw_size>(runs[i]->begin()),
												trigger_entry_iterator<trigger_entry_type,block_type::raw_size>(runs[i]->end())	);
		}
#endif
	
	create_runs< block_type,
							 run_type,
							 input_bid_iterator,
							 value_cmp > (input_bids, runs, nruns,_m,cmp);

	after_runs_creation = stxxl_timestamp ();
#ifdef COUNT_WAIT_TIME
	double io_wait_after_rf = stxxl::wait_time_counter;
#endif

	disk_queues::get_instance ()->set_priority_op (disk_queue::WRITE);

	unsigned int full_runsize = m2;
	run_type **new_runs;

	// every pass merges up to _m runs into one run of the next level
	while (nruns > 1)
	{
		full_runsize = full_runsize * _m;
		unsigned int new_full_runs = _n / full_runsize;
		unsigned int new_partial_runs = ((_n % full_runsize) ? 1 : 0);
		unsigned int new_nruns = new_full_runs + new_partial_runs;

		new_runs = new run_type *[new_nruns];

		for (i = 0; i < new_full_runs; i++)
			new_runs[i] = new run_type (full_runsize);

		if (nruns - new_full_runs * _m == 1)
		{
			// case when one partial run is left to be sorted
			// STXXL_MSG("case when one partial run is left to be sorted")
			// That run is already sorted: its entries are copied into the
			// new level and no blocks are allocated or merged for it.
			new_runs[i] = new run_type (_n - full_runsize * new_full_runs);
			
			run_type *tmp = runs[new_full_runs * _m];
			std::copy (tmp->begin (), tmp->end (), new_runs[i]->begin ());

			mng->new_blocks (interleaved_alloc_strategy
					 (new_nruns - 1, 0, ndisks),
					 RunsToBIDArrayAdaptor <
					 block_type::raw_size,run_type > (new_runs, 0,
							    new_nruns - 1),
					 RunsToBIDArrayAdaptor <
					 block_type::raw_size,run_type > (new_runs,
							    new_full_runs *
							    full_runsize,
							    new_nruns - 1));

			
			for (i = 0; i < new_full_runs; i++)
			{
				merge_runs<block_type,run_type>(runs + i * _m, _m,*(new_runs + i),_m, cmp);
			}


		}
		else
		{

			//allocate output blocks
			if (new_partial_runs)
			{
				unsigned int last_run_size = _n - full_runsize * new_full_runs;
				new_runs[i] = new run_type (last_run_size);

				mng->new_blocks (interleaved_alloc_strategy
						 (new_nruns, 0, ndisks),
						 RunsToBIDArrayAdaptor2 <
						 block_type::raw_size,run_type > (new_runs,
								    0,
								    new_nruns,
								    last_run_size),
						 RunsToBIDArrayAdaptor2 <
						 block_type::raw_size,run_type > (new_runs,
								    _n,
								    new_nruns,
								    last_run_size));
			}
			else
				mng->new_blocks (interleaved_alloc_strategy
						 (new_nruns, 0, ndisks),
						 RunsToBIDArrayAdaptor <
						 block_type::raw_size,run_type > (new_runs,
								    0,
								    new_nruns),
						 RunsToBIDArrayAdaptor <
						 block_type::raw_size,run_type > (new_runs,
								    _n,
								    new_nruns));


			//      STXXL_MSG("Output runs:" << new_nruns << "=" << new_full_runs << "+" << new_partial_runs)
			for (i = 0; i < new_full_runs; i++)
			{
				//        STXXL_MSG("Merge of m ("<< _m <<") runs")
				merge_runs<block_type,run_type>(runs + i * _m, _m, *(new_runs + i),_m,cmp);
			}

			// i == new_full_runs here: merge whatever runs are left over
			if (new_partial_runs)
			{
				//      STXXL_MSG("Partial merge of "<< (nruns - i*_m) <<" runs")
				merge_runs<block_type,run_type>(runs + i * _m, nruns - i * _m,*(new_runs + i),_m,cmp);
			}

		}

		// All runs of the old level are consumed now (merged, or copied in
		// the single-partial-run case), so their descriptors can be freed
		// together with the array that held them.
		for (i = 0; i < nruns; i++)
			delete runs[i];

		nruns = new_nruns;
		delete [] runs;
		runs = new_runs;

	}
	// only the pointer array is freed here; the remaining run is returned
	run_type * result = *runs;
	delete [] runs;
	
	
	end = stxxl_timestamp ();

	STXXL_MSG ("Elapsed time        : " << end - begin << " s. Run creation time: " << 
	after_runs_creation - begin << " s")
#ifdef STXXL_IO_STATS
	STXXL_MSG ("reads               : " << iostats->get_reads ()) 
	STXXL_MSG ("writes              : " << iostats->get_writes ())
	STXXL_MSG ("read time           : " << iostats->get_read_time () << " s") 
	STXXL_MSG ("write time          : " << iostats->get_write_time () <<" s")
	STXXL_MSG ("parallel read time  : " << iostats->get_pread_time () << " s")
	STXXL_MSG ("parallel write time : " << iostats->get_pwrite_time () << " s")
	STXXL_MSG ("parallel io time    : " << iostats->get_pio_time () << " s")
#endif
#ifdef COUNT_WAIT_TIME
	STXXL_MSG ("Time in I/O wait(rf): " << io_wait_after_rf << " s")
	STXXL_MSG ("Time in I/O wait    : " << stxxl::wait_time_counter << " s")
#endif
	
	return result;
}

};


//! \brief External sorting routine for records with defined operator <
//! \param first object of model of \c ext_random_access_iterator concept
//! \param last object of model of \c ext_random_access_iterator concept
//! \param cmp comparison object
//! \param M amount of buffers for internal use
//! \return comparison object \c cmp after being applied
//! \remark Implements external merge sort described in [Dementiev & Sanders'03]
//! \remark non-stable
template <typename ExtIterator_,typename StrictWeakOrdering_>
void sort(ExtIterator_ first, ExtIterator_ last,StrictWeakOrdering_ cmp,unsigned M)
{
	typedef simple_vector< sort_local::trigger_entry<typename ExtIterator_::bid_type, 
		typename ExtIterator_::vector_type::value_type> > run_type;
	
	typedef typename ExtIterator_::vector_type::value_type value_type;
	typedef typename ExtIterator_::block_type block_type;
	
	unsigned n=0;
	block_manager *mng = block_manager::get_instance ();
	
	first.flush();
	
	if((last - first)*sizeof(value_type) < M)
	{
		STXXL_ERRMSG("In-memory sort, not implemented")
		abort();
	}
	else
	{
		if(first.block_offset()) 
		{
			if(last.block_offset())   // first and last element reside 
																// not in the beginning of the block
			{
				STXXL_ERRMSG("Sorting for non-aligned iterators is not implemented")
				abort();
			}
			else
			{
				// first element resides
				// not in the beginning of the block
				
				STXXL_ERRMSG("Sorting for non-aligned iterators is not implemented")
				abort();
			}
			
		}
		else
		{
			if(last.block_offset()) // last element resides
																// not in the beginning of the block
			{
				STXXL_ERRMSG("Sorting for non-aligned iterators is not implemented")
				abort();
			}
			else
			{
				// first and last element resine in the beginning of blocks 
				n = last.bid() - first.bid();
				
				run_type * out =
						sort_local::sort_blocks<	typename ExtIterator_::block_type,
													typename ExtIterator_::vector_type::alloc_strategy,
													typename ExtIterator_::bids_container_iterator >
														 (first.bid(),n,M/block_type::raw_size,cmp);
				
				run_type::iterator it = out->begin();
				typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
				
				for(;cur_bid != last.bid(); cur_bid++,it++)
				{
					*cur_bid = (*it).bid;
				}
				
			}
		}
	}
};

__STXXL_END_NAMESPACE


#endif