use crate::compute::sort::{build_compare, Compare, SortColumn};
use crate::error::{ArrowError, Result};
use std::cmp::Ordering;
use std::iter::Iterator;
use std::ops::Range;
/// Splits the rows described by `columns` into consecutive index ranges.
///
/// The work is delegated to `LexicographicalPartitionIterator`: each item it
/// yields is a half-open `Range<usize>` of row indices, and successive ranges
/// are contiguous (every range starts where the previous one ended).
///
/// NOTE(review): the boundaries are located with a binary search, so the rows
/// are presumably expected to be already sorted lexicographically by
/// `columns`; confirm against callers.
///
/// # Errors
///
/// Returns whatever error `LexicographicalPartitionIterator::try_new`
/// reports: an `ArrowError::InvalidArgumentError` when `columns` is empty or
/// the columns disagree on their row count, or the error produced while
/// building a per-column comparator.
pub fn lexicographical_partition_ranges<'a>(
    columns: &'a [SortColumn],
) -> Result<impl Iterator<Item = Range<usize>> + 'a> {
    let partitions = LexicographicalPartitionIterator::try_new(columns)?;
    Ok(partitions)
}
/// Iterator state for walking partition boundaries over a set of columns.
///
/// Rows are addressed by index; `comparator` is what actually looks at the
/// column data.
struct LexicographicalPartitionIterator<'a> {
    /// Compares two rows, given by index, across all columns.
    comparator: Compare<'a>,
    /// The row indices `0..num_rows`, kept as a slice so that
    /// `slice::partition_point` can be used to search for boundaries.
    value_indices: Vec<usize>,
    /// Number of rows in each column.
    num_rows: usize,
    /// Start of the next range to be yielded (end of the previous one).
    previous_partition_point: usize,
    /// Index at which the next boundary search begins; iteration stops once
    /// this reaches `num_rows`.
    partition_point: usize,
}
impl<'a> LexicographicalPartitionIterator<'a> {
    /// Builds the iterator state for `columns`.
    ///
    /// One comparator is built per column (using the column's options, or the
    /// default options when none are given) and the comparators are combined
    /// into a single row comparator that consults them in column order.
    ///
    /// # Errors
    ///
    /// * `ArrowError::InvalidArgumentError` if `columns` is empty.
    /// * `ArrowError::InvalidArgumentError` if the columns do not all have the
    ///   same number of rows as the first one.
    /// * Any error returned by `build_compare` for a column.
    fn try_new(columns: &'a [SortColumn]) -> Result<Self> {
        // The first column defines the expected row count.
        let num_rows = match columns.first() {
            Some(first) => first.values.len(),
            None => {
                return Err(ArrowError::InvalidArgumentError(
                    "Sort requires at least one column".to_string(),
                ))
            }
        };

        let same_length = columns
            .iter()
            .all(|column| column.values.len() == num_rows);
        if !same_length {
            return Err(ArrowError::InvalidArgumentError(
                "Lexical sort columns have different row counts".to_string(),
            ));
        }

        // Build the per-column comparators, stopping at the first failure.
        let mut per_column = Vec::with_capacity(columns.len());
        for column in columns {
            let compare = build_compare(column.values, column.options.unwrap_or_default())?;
            per_column.push(compare);
        }

        // Lexicographic order: the first column that tells the two rows apart
        // decides; if none does, the rows are equal.
        let comparator = Box::new(move |lhs: usize, rhs: usize| -> Ordering {
            per_column
                .iter()
                .map(|compare| compare(lhs, rhs))
                .find(|ordering| *ordering != Ordering::Equal)
                .unwrap_or(Ordering::Equal)
        });

        Ok(Self {
            comparator,
            num_rows,
            previous_partition_point: 0,
            partition_point: 0,
            value_indices: (0..num_rows).collect(),
        })
    }
}
impl<'a> Iterator for LexicographicalPartitionIterator<'a> {
    type Item = Range<usize>;

    /// Yields the next range of row indices, or `None` once the search
    /// position has reached `num_rows`.
    ///
    /// The range starts where the previous one ended and stops at the first
    /// row, found by binary search, that compares `Greater` than the row at
    /// the current search position.
    fn next(&mut self) -> Option<Self::Item> {
        if self.partition_point >= self.num_rows {
            return None;
        }

        // Row that every candidate is compared against: the first row of the
        // range being produced.
        let pivot = self.partition_point;

        // `partition_point` searches only the rows from `pivot` onwards, so
        // the value it returns is an offset relative to `pivot`, not an
        // absolute row index. Being a binary search, it relies on the
        // predicate holding for a prefix of the slice and failing for the
        // rest.
        let run_len = self.value_indices[pivot..].partition_point(|&row| {
            matches!(
                (self.comparator)(row, pivot),
                Ordering::Less | Ordering::Equal
            )
        });

        let end = pivot + run_len;
        let start = self.previous_partition_point;
        self.previous_partition_point = end;
        self.partition_point = end;
        Some(start..end)
    }
}