use std::convert::TryFrom;
use crate::{FlightData, SchemaResult};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result};
use arrow::ipc::{convert, reader, writer};
use arrow::record_batch::RecordBatch;
/// Builds a `FlightData` message from a borrowed `RecordBatch`.
///
/// The batch is serialized with `writer::record_batch_to_bytes`; the first
/// returned buffer becomes `data_header` and the second becomes `data_body`.
/// No flight descriptor or application metadata is attached.
impl From<&RecordBatch> for FlightData {
    fn from(batch: &RecordBatch) -> Self {
        // Bind the two buffers directly to the field names they populate.
        let (data_header, data_body) = writer::record_batch_to_bytes(batch);
        Self {
            flight_descriptor: None,
            app_metadata: Vec::new(),
            data_header,
            data_body,
        }
    }
}
/// Builds a `SchemaResult` from a borrowed `Schema`.
///
/// The `schema` field holds the bytes produced by `writer::schema_to_bytes`.
impl From<&Schema> for SchemaResult {
    fn from(schema: &Schema) -> Self {
        let encoded = writer::schema_to_bytes(schema);
        Self { schema: encoded }
    }
}
/// Builds a schema-only `FlightData` message from a borrowed `Schema`.
///
/// The bytes produced by `writer::schema_to_bytes` are stored in
/// `data_header`; `data_body` and `app_metadata` are left empty and no
/// flight descriptor is set.
impl From<&Schema> for FlightData {
    fn from(schema: &Schema) -> Self {
        Self {
            flight_descriptor: None,
            app_metadata: Vec::new(),
            data_header: writer::schema_to_bytes(schema),
            data_body: Vec::new(),
        }
    }
}
/// Attempts to recover a `Schema` from the `data_header` bytes of a
/// `FlightData` message.
///
/// # Errors
///
/// Returns `ArrowError::ParseError` when `convert::schema_from_bytes`
/// yields no schema for the header bytes.
impl TryFrom<&FlightData> for Schema {
    type Error = ArrowError;

    fn try_from(data: &FlightData) -> Result<Self> {
        match convert::schema_from_bytes(&data.data_header[..]) {
            Some(schema) => Ok(schema),
            None => Err(ArrowError::ParseError(
                "Unable to convert flight data to Arrow schema".to_string(),
            )),
        }
    }
}
/// Attempts to recover a `Schema` from the bytes held in a `SchemaResult`.
///
/// # Errors
///
/// Returns `ArrowError::ParseError` when `convert::schema_from_bytes`
/// yields no schema for the stored bytes.
impl TryFrom<&SchemaResult> for Schema {
    type Error = ArrowError;

    fn try_from(data: &SchemaResult) -> Result<Self> {
        if let Some(schema) = convert::schema_from_bytes(&data.schema[..]) {
            return Ok(schema);
        }
        Err(ArrowError::ParseError(
            "Unable to convert schema result to Arrow schema".to_string(),
        ))
    }
}
/// Decodes a `FlightData` message into a `RecordBatch` using `schema`.
///
/// `data.data_header` is interpreted as an IPC message whose header must be
/// a record batch; `data.data_body` is then handed to
/// `reader::read_record_batch` together with that header and `schema`.
///
/// # Errors
///
/// Returns `ArrowError::ParseError` when the message header is not a record
/// batch, and propagates any error from `reader::read_record_batch`.
///
/// The `Option` in the return type is passed through unchanged from
/// `reader::read_record_batch`.
pub fn flight_data_to_batch(
    data: &FlightData,
    schema: SchemaRef,
) -> Result<Option<RecordBatch>> {
    let message = arrow::ipc::get_root_as_message(&data.data_header[..]);

    // Bail out early if the message carries something other than a record batch.
    let batch_header = match message.header_as_record_batch() {
        Some(header) => header,
        None => {
            return Err(ArrowError::ParseError(
                "Unable to convert flight data header to a record batch".to_string(),
            ));
        }
    };

    // An empty dictionary list is always supplied here.
    // NOTE(review): presumably dictionary-encoded columns cannot be resolved
    // through this path — confirm against `reader::read_record_batch`.
    let dictionaries_by_field = Vec::new();

    reader::read_record_batch(
        &data.data_body,
        batch_header,
        schema,
        &dictionaries_by_field,
    )
}