use crate::compute::SortOptions;
use crate::row::fixed::{FixedLengthEncoding, FromSlice};
use crate::row::interner::{Interned, OrderPreservingInterner};
use crate::row::{null_sentinel, Rows};
use arrow_array::builder::*;
use arrow_array::cast::*;
use arrow_array::types::*;
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, MutableBuffer, ToByteSlice};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
/// Computes the dictionary mapping for the given dictionary values
///
/// Interns each element of `values` into `interner`, returning for every
/// index the assigned [`Interned`] key, or `None` where the value is null.
/// The interner assigns order-preserving normalized keys, which is what
/// allows dictionary-encoded columns to participate in row comparisons.
///
/// Only primitive, binary and string value types are supported; any other
/// type is a logic error upstream and hits `unreachable!`.
pub fn compute_dictionary_mapping(
    interner: &mut OrderPreservingInterner,
    values: &ArrayRef,
) -> Vec<Option<Interned>> {
    downcast_primitive_array! {
        values => interner
            .intern(values.iter().map(|x| x.map(|x| x.encode()))),
        DataType::Binary => {
            // Binary arrays use i32 offsets; downcasting with i64 here
            // would panic — i64 is only correct for LargeBinary below
            let iter = as_generic_binary_array::<i32>(values).iter();
            interner.intern(iter)
        }
        DataType::LargeBinary => {
            let iter = as_generic_binary_array::<i64>(values).iter();
            interner.intern(iter)
        }
        DataType::Utf8 => {
            let iter = as_string_array(values).iter().map(|x| x.map(|x| x.as_bytes()));
            interner.intern(iter)
        }
        DataType::LargeUtf8 => {
            let iter = as_largestring_array(values).iter().map(|x| x.map(|x| x.as_bytes()));
            interner.intern(iter)
        }
        _ => unreachable!(),
    }
}
/// Dictionary-encodes `column` into `out`, one row per key.
///
/// For each key in `column`, writes a `1` marker byte followed by the
/// pre-computed normalized key from `normalized_keys`; null keys write a
/// single null-sentinel byte instead. When `opts.descending`, every written
/// byte (marker included) is bitwise inverted so byte-wise comparison sorts
/// in reverse. Each entry of `out.offsets` is advanced past the bytes
/// written for its row.
pub fn encode_dictionary<K: ArrowDictionaryKeyType>(
    out: &mut Rows,
    column: &DictionaryArray<K>,
    normalized_keys: &[Option<&[u8]>],
    opts: SortOptions,
) {
    for (offset, k) in out.offsets.iter_mut().skip(1).zip(column.keys()) {
        let start = *offset;
        if let Some(key) = k.and_then(|k| normalized_keys[k.as_usize()]) {
            let end = start + 1 + key.len();
            out.buffer[start] = 1;
            out.buffer[start + 1..end].copy_from_slice(key);
            if opts.descending {
                // Invert bytes so lexicographic order is reversed
                for byte in &mut out.buffer[start..end] {
                    *byte = !*byte;
                }
            }
            *offset = end;
        } else {
            out.buffer[start] = null_sentinel(opts);
            *offset = start + 1;
        }
    }
}
/// Helper macro invoked by `downcast_primitive!` in [`decode_dictionary`]:
/// for each matched primitive type `$t` it expands to a call decoding
/// `$values` into an array of that type, cloning the runtime `$data_type`
/// (e.g. to preserve timezone/unit parameters of the concrete type).
macro_rules! decode_primitive_helper {
    ($t:ty, $values: ident, $data_type:ident) => {
        decode_primitive::<$t>(&$values, $data_type.clone())
    };
}
/// Decodes a [`DictionaryArray`] from `rows`, re-interning the normalized
/// keys through `interner` and rebuilding a fresh dictionary of just the
/// values that actually occur.
///
/// Each row in `rows` is consumed from the front: the leading sentinel byte
/// (null marker) plus the normalized key bytes up to and including the
/// terminator are sliced off, leaving the remainder for subsequent columns.
///
/// # Safety
///
/// `rows` must contain data produced by encoding dictionary values with the
/// same `interner`, `value_type` and `options`; the resulting `ArrayData` is
/// built with `build_unchecked` and is not re-validated.
pub unsafe fn decode_dictionary<K: ArrowDictionaryKeyType>(
    interner: &OrderPreservingInterner,
    value_type: &DataType,
    options: SortOptions,
    rows: &mut [&[u8]],
) -> Result<DictionaryArray<K>, ArrowError> {
    let len = rows.len();
    // Maps an interned id to the dictionary key already assigned for it
    let mut dictionary: HashMap<Interned, K::Native> = HashMap::with_capacity(len);
    let null_sentinel = null_sentinel(options);
    // Descending encoding inverts every byte, so the 0x00 terminator
    // appears as 0xFF in the row bytes
    let null_terminator = match options.descending {
        true => 0xFF,
        false => 0_u8,
    };
    let mut null_builder = BooleanBufferBuilder::new(len);
    let mut keys = BufferBuilder::<K::Native>::new(len);
    let mut values = Vec::with_capacity(len);
    let mut null_count = 0;
    // Reused buffer for un-inverting descending keys before interner lookup
    let mut key_scratch = Vec::new();
    for row in rows {
        if row[0] == null_sentinel {
            // Null row: record validity, emit a placeholder key, consume 1 byte
            null_builder.append(false);
            null_count += 1;
            *row = &row[1..];
            keys.append(K::Native::default());
            continue;
        }
        // Find the terminator after the sentinel byte; encoding guarantees
        // it is present, hence the unwrap
        let key_offset = row
            .iter()
            .skip(1)
            .position(|x| *x == null_terminator)
            .unwrap();
        // Normalized key including its terminator byte
        let key = &row[1..key_offset + 2];
        *row = &row[key_offset + 2..];
        let interned = match options.descending {
            true => {
                // Undo the byte inversion applied when encoding descending
                key_scratch.clear();
                key_scratch.extend_from_slice(key);
                key_scratch.iter_mut().for_each(|o| *o = !*o);
                interner.lookup(&key_scratch).unwrap()
            }
            false => interner.lookup(key).unwrap(),
        };
        // Assign the next dictionary key on first sight of this interned id
        let k = match dictionary.entry(interned) {
            Entry::Vacant(v) => {
                let k = values.len();
                values.push(interner.value(interned));
                let key = K::Native::from_usize(k)
                    .ok_or(ArrowError::DictionaryKeyOverflowError)?;
                *v.insert(key)
            }
            Entry::Occupied(o) => *o.get(),
        };
        keys.append(k);
        null_builder.append(true);
    }
    // Decode the collected raw value bytes into the child values array
    let child = downcast_primitive! {
        value_type => (decode_primitive_helper, values, value_type),
        DataType::Null => NullArray::new(values.len()).into_data(),
        DataType::Boolean => decode_bool(&values),
        DataType::Utf8 => decode_string::<i32>(&values),
        DataType::LargeUtf8 => decode_string::<i64>(&values),
        DataType::Binary => decode_binary::<i32>(&values),
        DataType::LargeBinary => decode_binary::<i64>(&values),
        _ => unreachable!(),
    };
    let data_type =
        DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(value_type.clone()));
    let builder = ArrayDataBuilder::new(data_type)
        .len(len)
        .null_bit_buffer(Some(null_builder.finish()))
        .null_count(null_count)
        .add_buffer(keys.finish())
        .add_child_data(child);
    // SAFETY: keys, nulls and child were all constructed consistently above
    Ok(DictionaryArray::from(builder.build_unchecked()))
}
/// Builds binary `ArrayData` (with offset type `O`) from raw value slices.
fn decode_binary<O: OffsetSizeTrait>(values: &[&[u8]]) -> ArrayData {
    // Pre-size the data buffer to the exact total byte length
    let total_bytes: usize = values.iter().map(|v| v.len()).sum();
    let mut builder = GenericBinaryBuilder::<O>::with_capacity(values.len(), total_bytes);
    values.iter().for_each(|v| builder.append_value(v));
    builder.finish().into_data()
}
/// Builds string `ArrayData` (with offset type `O`) from raw value slices.
///
/// # Safety
///
/// Every slice in `values` must be valid UTF-8; the data is relabelled from
/// binary to string without validation.
unsafe fn decode_string<O: OffsetSizeTrait>(values: &[&[u8]]) -> ArrayData {
    let data_type = if O::IS_LARGE {
        DataType::LargeUtf8
    } else {
        DataType::Utf8
    };
    let binary = decode_binary::<O>(values);
    // SAFETY: caller guarantees the bytes are valid UTF-8, so retagging
    // the binary data as a string type is sound
    binary.into_builder().data_type(data_type).build_unchecked()
}
/// Builds boolean `ArrayData` by decoding the first byte of each value slice.
fn decode_bool(values: &[&[u8]]) -> ArrayData {
    let mut bits = BooleanBufferBuilder::new(values.len());
    values.iter().for_each(|v| bits.append(bool::decode([v[0]])));
    let data = ArrayDataBuilder::new(DataType::Boolean)
        .len(values.len())
        .add_buffer(bits.finish());
    // SAFETY: buffer length matches the declared array length
    unsafe { data.build_unchecked() }
}
/// Decodes fixed-length-encoded values into `ArrayData` of `data_type`.
///
/// # Safety
///
/// `data_type` must describe a type whose native representation is `T`;
/// the resulting data is built with `build_unchecked` and not re-validated.
unsafe fn decode_fixed<T: FixedLengthEncoding + ToByteSlice>(
    values: &[&[u8]],
    data_type: DataType,
) -> ArrayData {
    // Exact-size allocation: one T per input slice
    let mut out = MutableBuffer::new(std::mem::size_of::<T>() * values.len());
    for encoded in values {
        let fixed = T::Encoded::from_slice(encoded, false);
        out.push(T::decode(fixed));
    }
    // SAFETY: buffer holds exactly values.len() elements of the native type
    ArrayDataBuilder::new(data_type)
        .len(values.len())
        .add_buffer(out.into())
        .build_unchecked()
}
/// Safe wrapper decoding fixed-length values into a primitive array of
/// type `T`, after checking `data_type` is the right variant for `T`.
fn decode_primitive<T: ArrowPrimitiveType>(
    values: &[&[u8]],
    data_type: DataType,
) -> ArrayData
where
    T::Native: FixedLengthEncoding,
{
    // Compare discriminants only, so parameterized variants (e.g. timestamps
    // carrying different timezones) of the same type still match
    assert_eq!(
        std::mem::discriminant(&T::DATA_TYPE),
        std::mem::discriminant(&data_type),
    );
    // SAFETY: the assertion above guarantees data_type matches T's native
    // representation, satisfying decode_fixed's contract
    unsafe { decode_fixed::<T::Native>(values, data_type) }
}