use std::io::{BufReader, Read, Seek};
use std::sync::Arc;
use serde_json::Value;
use crate::array::*;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
use super::{deserialize::read, infer_json_schema_from_seekable, util::ValueIter};
/// Decodes deserialized JSON values into [`RecordBatch`]es with a fixed schema.
#[derive(Debug)]
struct Decoder {
    /// Schema of the decoded records
    schema: Arc<Schema>,
    /// Optional projection (column names) applied while decoding
    projection: Option<Vec<String>>,
    /// Maximum number of rows per [`RecordBatch`]
    batch_size: usize,
}
impl Decoder {
    /// Creates a decoder; when a projection is given, the schema is reduced to the projected fields.
    pub fn new(schema: Arc<Schema>, batch_size: usize, projection: Option<Vec<String>>) -> Self {
let schema = match &projection {
Some(projection) => {
let fields = schema.fields();
                let projected_fields: Vec<Field> = fields
                    .iter()
                    .filter(|field| projection.contains(field.name()))
                    .cloned()
                    .collect();
Arc::new(Schema::new(projected_fields))
}
None => schema,
};
Self {
schema,
projection,
batch_size,
}
}
    /// Returns the schema used to decode batches (the projected schema when a projection is set).
    pub fn schema(&self) -> &Arc<Schema> {
&self.schema
}
    /// Reads up to `batch_size` rows from `value_iter` and decodes them into a [`RecordBatch`].
    /// Returns `Ok(None)` when the iterator is exhausted.
    pub fn next_batch<I>(&self, value_iter: &mut I) -> Result<Option<RecordBatch>>
where
I: Iterator<Item = Result<Value>>,
{
        // take up to `batch_size` values, requiring each row to be a JSON object
        let rows = value_iter
            .take(self.batch_size)
            .map(|value| {
                let v = value?;
                match v {
                    Value::Object(_) => Ok(v),
                    _ => Err(ArrowError::ExternalFormat(format!(
                        "Each row must be a JSON object, got: {:?}",
                        v
                    ))),
                }
            })
            .collect::<Result<Vec<_>>>()?;
        if rows.is_empty() {
            return Ok(None);
        }
        let rows = rows.iter().collect::<Vec<_>>();
        // resolve the fields to decode: the projected subset if a projection was set, else all fields
        let projection = self.projection.clone().unwrap_or_default();
        let projected_fields: Vec<Field> = if projection.is_empty() {
            self.schema.fields().to_vec()
        } else {
            projection
                .iter()
                .filter_map(|name| {
                    self.schema
                        .column_with_name(name)
                        .map(|(_, field)| field.clone())
                })
                .collect()
        };
        // decode all rows into a single StructArray; its child arrays become the batch's columns
        let data_type = DataType::Struct(projected_fields.clone());
        let array = read(&rows, data_type);
        let array = array.as_any().downcast_ref::<StructArray>().unwrap();
        let arrays = array.values().to_vec();
        let projected_schema = Arc::new(Schema::new(projected_fields));
        RecordBatch::try_new(projected_schema, arrays).map(Some)
}
}
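/// JSON reader that decodes JSON values into [`RecordBatch`]es of at most `batch_size` rows.
///
/// # Example
///
/// A minimal sketch of the read loop; `file` and `schema` are placeholders for an input
/// implementing `Read` and a pre-built `Arc<Schema>` describing the rows:
///
/// ```ignore
/// let mut reader = Reader::new(file, schema, 1024, None);
/// while let Some(batch) = reader.next()? {
///     // process `batch`
/// }
/// ```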
#[derive(Debug)]
pub struct Reader<R: Read> {
reader: BufReader<R>,
decoder: Decoder,
}
impl<R: Read> Reader<R> {
    /// Creates a reader from any `Read`, wrapping it in a `BufReader`.
    pub fn new(
reader: R,
schema: Arc<Schema>,
batch_size: usize,
projection: Option<Vec<String>>,
) -> Self {
Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
}
    /// Creates a reader from an existing `BufReader`.
    pub fn from_buf_reader(
reader: BufReader<R>,
schema: Arc<Schema>,
batch_size: usize,
projection: Option<Vec<String>>,
) -> Self {
Self {
reader,
decoder: Decoder::new(schema, batch_size, projection),
}
}
    /// Returns the schema of the decoded batches (the projected schema when a projection is set).
    pub fn schema(&self) -> &Arc<Schema> {
self.decoder.schema()
}
    /// Reads the next batch of up to `batch_size` records; returns `Ok(None)` when the input is exhausted.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Result<Option<RecordBatch>> {
self.decoder
.next_batch(&mut ValueIter::new(&mut self.reader, None))
}
}
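/// Builder for [`Reader`] that configures schema handling, batch size and projection.
///
/// # Example
///
/// A minimal sketch; `source` is a placeholder for any `Read + Seek` input
/// (e.g. a `File` or an in-memory `Cursor`):
///
/// ```ignore
/// let mut reader = ReaderBuilder::new()
///     .infer_schema(Some(100))
///     .with_batch_size(1024)
///     .with_projection(vec!["a".to_string(), "b".to_string()])
///     .build(source)?;
/// let first_batch = reader.next()?;
/// ```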
#[derive(Debug)]
pub struct ReaderBuilder {
    /// Explicit schema; when `None`, the schema is inferred in [`ReaderBuilder::build`]
    schema: Option<Arc<Schema>>,
    /// Maximum number of records to read during schema inference (`None` reads all)
    max_records: Option<usize>,
    /// Number of rows per emitted [`RecordBatch`] (defaults to 1024)
    batch_size: usize,
    /// Optional projection of column names applied to every batch
    projection: Option<Vec<String>>,
}
impl Default for ReaderBuilder {
fn default() -> Self {
Self {
schema: None,
max_records: None,
batch_size: 1024,
projection: None,
}
}
}
impl ReaderBuilder {
    /// Creates a builder with default settings: inferred schema, batch size 1024, no projection.
    pub fn new() -> Self {
Self::default()
}
    /// Sets an explicit schema, disabling schema inference.
    pub fn with_schema(mut self, schema: Arc<Schema>) -> Self {
self.schema = Some(schema);
self
}
    /// Infers the schema from the first `max_records` records (`None` reads all records),
    /// discarding any schema previously set with [`ReaderBuilder::with_schema`].
    pub fn infer_schema(mut self, max_records: Option<usize>) -> Self {
self.schema = None;
self.max_records = max_records;
self
}
    /// Sets the number of rows read per batch (default: 1024).
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
    /// Restricts reading to the columns with these (case-sensitive) names.
    pub fn with_projection(mut self, projection: Vec<String>) -> Self {
self.projection = Some(projection);
self
}
    /// Builds a [`Reader`] from `source`, inferring the schema when none was set explicitly.
    /// `Seek` is required so the source can be rewound after schema inference.
    pub fn build<R>(self, source: R) -> Result<Reader<R>>
where
R: Read + Seek,
{
let mut buf_reader = BufReader::new(source);
let schema = match self.schema {
Some(schema) => schema,
None => Arc::new(infer_json_schema_from_seekable(
&mut buf_reader,
self.max_records,
)?),
};
Ok(Reader::from_buf_reader(
buf_reader,
schema,
self.batch_size,
self.projection,
))
}
}
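// A minimal end-to-end sketch, assuming the underlying `ValueIter` and
// `infer_json_schema_from_seekable` accept line-delimited JSON objects; the column
// names and expectations below are illustrative only.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn build_reader_and_read_a_batch() {
        // two line-delimited JSON rows held in memory; `Cursor` provides `Read + Seek`
        let data = b"{\"a\": 1, \"b\": \"x\"}\n{\"a\": 2, \"b\": \"y\"}\n";
        let mut reader = ReaderBuilder::new()
            .infer_schema(None)
            .with_batch_size(16)
            .build(Cursor::new(&data[..]))
            .unwrap();
        // the inferred schema should contain both columns
        assert_eq!(reader.schema().fields().len(), 2);
        // both rows fit in a single batch
        let batch = reader.next().unwrap();
        assert!(batch.is_some());
    }
}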