use crate::{array::*, buffer::MutableBuffer};
use crate::{
datatypes::DataType,
error::{ArrowError, Result},
};
/// Builds a new [`Utf8Array`] holding, for every slot of `array`, the byte range selected by
/// `start` and `length`.
///
/// * `start` - byte position at which each substring begins. A non-negative value counts from
///   the beginning of the string; a negative value counts back from its end. Positions that
///   fall outside a string are clamped to its bounds.
/// * `length` - maximum number of bytes to keep. `None` keeps everything from `start` to the
///   end of the string. A negative length is treated as zero.
///
/// The validity bitmap of `array` is cloned into the result unchanged.
///
/// NOTE(review): positions are byte offsets, not characters, so a range that cuts through a
/// multi-byte code point produces bytes that are not valid UTF-8. Confirm how
/// `Utf8Array::from_data` treats such input.
fn utf8_substring<O: Offset>(array: &Utf8Array<O>, start: O, length: &Option<O>) -> Utf8Array<O> {
    let validity = array.validity();
    let offsets = array.offsets();
    let values = array.values();

    let mut new_offsets = MutableBuffer::<O>::with_capacity(array.len() + 1);
    let mut new_values = MutableBuffer::<u8>::new();
    let mut length_so_far = O::zero();
    new_offsets.push(length_so_far);

    offsets.windows(2).for_each(|windows| {
        // Byte length of the current string.
        let length_i: O = windows[1] - windows[0];

        // Clamp the start *relative to this string* into `[0, length_i]` before adding the base
        // offset. Adding an unclamped `start` to `windows[0]` first could overflow `O`.
        let relative_start = if start >= O::zero() {
            start.min(length_i)
        } else {
            (length_i + start).max(O::zero())
        };
        let start = windows[0] + relative_start;

        // Never read past the end of this string, and never emit a negative length: that would
        // make the new offsets decrease.
        let length: O = length
            .unwrap_or(length_i)
            .min(windows[1] - start)
            .max(O::zero());
        length_so_far += length;
        new_offsets.push(length_so_far);

        let start = start.to_usize();
        let length = length.to_usize();
        new_values.extend_from_slice(&values[start..start + length]);
    });

    Utf8Array::<O>::from_data(
        array.data_type().clone(),
        new_offsets.into(),
        new_values.into(),
        validity.cloned(),
    )
}
/// Builds a new [`BinaryArray`] holding, for every slot of `array`, the byte range selected by
/// `start` and `length`.
///
/// * `start` - byte position at which each sub-slice begins. A non-negative value counts from
///   the beginning of the value; a negative value counts back from its end. Positions that
///   fall outside a value are clamped to its bounds.
/// * `length` - maximum number of bytes to keep. `None` keeps everything from `start` to the
///   end of the value. A negative length is treated as zero.
///
/// The validity bitmap of `array` is cloned into the result unchanged.
fn binary_substring<O: Offset>(
    array: &BinaryArray<O>,
    start: O,
    length: &Option<O>,
) -> BinaryArray<O> {
    let validity = array.validity();
    let offsets = array.offsets();
    let values = array.values();

    let mut new_offsets = MutableBuffer::<O>::with_capacity(array.len() + 1);
    let mut new_values = MutableBuffer::<u8>::new();
    let mut length_so_far = O::zero();
    new_offsets.push(length_so_far);

    offsets.windows(2).for_each(|windows| {
        // Byte length of the current value.
        let length_i: O = windows[1] - windows[0];

        // Clamp the start *relative to this value* into `[0, length_i]` before adding the base
        // offset. Adding an unclamped `start` to `windows[0]` first could overflow `O`.
        let relative_start = if start >= O::zero() {
            start.min(length_i)
        } else {
            (length_i + start).max(O::zero())
        };
        let start = windows[0] + relative_start;

        // Never read past the end of this value, and never emit a negative length: that would
        // make the new offsets decrease.
        let length: O = length
            .unwrap_or(length_i)
            .min(windows[1] - start)
            .max(O::zero());
        length_so_far += length;
        new_offsets.push(length_so_far);

        let start = start.to_usize();
        let length = length.to_usize();
        new_values.extend_from_slice(&values[start..start + length]);
    });

    BinaryArray::<O>::from_data(
        array.data_type().clone(),
        new_offsets.into(),
        new_values.into(),
        validity.cloned(),
    )
}
/// Returns an array in which every value is a byte-wise substring of the corresponding value
/// of `array`.
///
/// * `start` - byte position at which each substring begins. A non-negative value counts from
///   the beginning of the value; a negative value counts back from its end.
/// * `length` - maximum number of bytes to keep. `None` keeps everything from `start` to the
///   end of the value.
///
/// Supported data types are `Utf8`, `LargeUtf8`, `Binary` and `LargeBinary`.
///
/// # Errors
///
/// Returns [`ArrowError::InvalidArgumentError`] when `array` has any other data type.
///
/// # Panics
///
/// Panics if the concrete type of `array` does not match its reported data type, i.e. when the
/// downcast fails.
pub fn substring(array: &dyn Array, start: i64, length: &Option<u64>) -> Result<Box<dyn Array>> {
    // Narrow the arguments by saturating rather than with a bare `as` cast. A truncating cast
    // would turn an oversized length into a negative one, or wrap an out-of-range start around
    // to an unrelated position. A value stored with offsets of a given width can never be
    // longer than that width's maximum, so the saturated argument selects the same bytes as
    // the requested one.
    let start_i32 = start.max(i64::from(i32::MIN)).min(i64::from(i32::MAX)) as i32;
    let length_i32 = length.map(|e| e.min(i32::MAX as u64) as i32);
    let length_i64 = length.map(|e| e.min(i64::MAX as u64) as i64);

    match array.data_type() {
        DataType::Binary => Ok(Box::new(binary_substring(
            array
                .as_any()
                .downcast_ref::<BinaryArray<i32>>()
                .expect("A binary is expected"),
            start_i32,
            &length_i32,
        ))),
        DataType::LargeBinary => Ok(Box::new(binary_substring(
            array
                .as_any()
                .downcast_ref::<BinaryArray<i64>>()
                .expect("A large binary is expected"),
            start,
            &length_i64,
        ))),
        DataType::LargeUtf8 => Ok(Box::new(utf8_substring(
            array
                .as_any()
                .downcast_ref::<Utf8Array<i64>>()
                .expect("A large string is expected"),
            start,
            &length_i64,
        ))),
        DataType::Utf8 => Ok(Box::new(utf8_substring(
            array
                .as_any()
                .downcast_ref::<Utf8Array<i32>>()
                .expect("A string is expected"),
            start_i32,
            &length_i32,
        ))),
        _ => Err(ArrowError::InvalidArgumentError(format!(
            "substring does not support type {:?}",
            array.data_type()
        ))),
    }
}
/// Reports whether `data_type` is one of the four variants this function accepts:
/// `Utf8`, `LargeUtf8`, `Binary` or `LargeBinary`.
///
/// Returns `false` for every other data type.
pub fn can_substring(data_type: &DataType) -> bool {
    match data_type {
        // Variable-length string layouts.
        DataType::Utf8 | DataType::LargeUtf8 => true,
        // Variable-length binary layouts.
        DataType::Binary | DataType::LargeBinary => true,
        _ => false,
    }
}