use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};

use re_log_types::EntityPath;
use re_types_core::ChunkId;

use crate::{
    ArrowBatchMetadata, SorbetColumnDescriptors, SorbetError, TimestampMetadata, migrate_schema_ref,
};
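
/// The parsed Sorbet schema of a record batch: the column descriptors plus the
/// Rerun-specific metadata carried in the Arrow schema metadata.
///
/// A minimal round-trip sketch (marked `ignore`; it assumes an existing
/// `schema: SorbetSchema` and a `BatchType::Chunk` variant):
///
/// ```ignore
/// let arrow_schema = schema.to_arrow(BatchType::Chunk);
/// // Parsing the raw Arrow schema back should recover the same logical schema:
/// let roundtripped = SorbetSchema::try_from_raw_arrow_schema(Arc::new(arrow_schema))?;
/// ```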
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SorbetSchema {
    /// The descriptors of all columns in the batch.
    pub columns: SorbetColumnDescriptors,

    /// The ID of the chunk, if any (stored under the `rerun:id` metadata key).
    pub chunk_id: Option<ChunkId>,

    /// The entity path of the chunk, if any (stored under `rerun:entity_path`).
    pub entity_path: Option<EntityPath>,

    /// Which partition the chunk belongs to, if any (stored under `rerun:partition_id`).
    pub partition_id: Option<String>,

    /// The heap size of the chunk in bytes, if known (stored under `rerun:heap_size_bytes`).
    pub heap_size_bytes: Option<u64>,

    /// Additional timestamp metadata, parsed from (and serialized back into) the batch metadata.
    pub timestamps: TimestampMetadata,
}

impl SorbetSchema {
    /// The metadata key under which the sorbet version is stored.
    pub(crate) const METADATA_KEY_VERSION: &'static str = "sorbet:version";

    /// The sorbet version written by this crate.
    pub(crate) const METADATA_VERSION: semver::Version = semver::Version::new(0, 1, 1);
}

impl SorbetSchema {
    /// Sets [`Self::heap_size_bytes`], builder-style.
    #[inline]
    pub fn with_heap_size_bytes(mut self, heap_size_bytes: u64) -> Self {
        self.heap_size_bytes = Some(heap_size_bytes);
        self
    }

    /// The key/value pair used to store a chunk ID in the Arrow metadata.
    pub fn chunk_id_metadata(chunk_id: &ChunkId) -> (String, String) {
        ("rerun:id".to_owned(), chunk_id.to_string())
    }

    /// The key/value pair used to store an entity path in the Arrow metadata.
    pub fn entity_path_metadata(entity_path: &EntityPath) -> (String, String) {
        ("rerun:entity_path".to_owned(), entity_path.to_string())
    }

    /// The key/value pair used to store a partition ID in the Arrow metadata.
    pub fn partition_id_metadata(partition_id: impl AsRef<str>) -> (String, String) {
        (
            "rerun:partition_id".to_owned(),
            partition_id.as_ref().to_owned(),
        )
    }
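
    /// Collects all Rerun-specific metadata of this schema as Arrow batch metadata.
    ///
    /// The sorbet version (`sorbet:version`) is always included; the optional fields
    /// (`rerun:id`, `rerun:entity_path`, `rerun:partition_id`, `rerun:heap_size_bytes`)
    /// are only emitted when set, followed by the timestamp metadata.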
pub fn arrow_batch_metadata(&self) -> ArrowBatchMetadata {
        // Exhaustive destructuring, so that adding a new field without handling
        // it here becomes a compile error:
        let Self {
            columns: _,
            chunk_id,
            entity_path,
            partition_id,
            heap_size_bytes,
            timestamps,
        } = self;
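
        // Always include the version; add the optional fields only when set: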
[
Some((
Self::METADATA_KEY_VERSION.to_owned(),
Self::METADATA_VERSION.to_string(),
)),
chunk_id.as_ref().map(Self::chunk_id_metadata),
entity_path.as_ref().map(Self::entity_path_metadata),
partition_id.as_ref().map(Self::partition_id_metadata),
heap_size_bytes.as_ref().map(|heap_size_bytes| {
(
"rerun:heap_size_bytes".to_owned(),
heap_size_bytes.to_string(),
)
}),
]
.into_iter()
.flatten()
.chain(timestamps.to_metadata())
.collect()
}
}

impl From<SorbetSchema> for SorbetColumnDescriptors {
#[inline]
fn from(sorbet_schema: SorbetSchema) -> Self {
sorbet_schema.columns
}
}

impl SorbetSchema {
    /// Converts this schema into an Arrow schema, including all Rerun metadata.
    pub fn to_arrow(&self, batch_type: crate::BatchType) -> ArrowSchema {
ArrowSchema {
metadata: self.arrow_batch_metadata(),
fields: self.columns.arrow_fields(batch_type).into(),
}
}
}

impl SorbetSchema {
    /// Parses a [`SorbetSchema`] from a raw Arrow schema, first migrating it
    /// from any older sorbet version to the current one.
    pub fn try_from_raw_arrow_schema(arrow_schema: ArrowSchemaRef) -> Result<Self, SorbetError> {
Self::try_from_migrated_arrow_schema(&migrate_schema_ref(arrow_schema))
}
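
    /// Parses a [`SorbetSchema`] from an Arrow schema that has already been
    /// migrated to the current sorbet version (e.g. by [`migrate_schema_ref`]).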
#[tracing::instrument(level = "trace", skip_all)]
pub(crate) fn try_from_migrated_arrow_schema(
arrow_schema: &ArrowSchema,
) -> Result<Self, SorbetError> {
debug_assert!(
!arrow_schema.metadata.contains_key("rerun.id"),
"The schema should not contain the legacy 'rerun.id' key, because it should have already been migrated to 'rerun:id'."
);
let ArrowSchema { metadata, fields } = arrow_schema;
let entity_path = metadata
.get("rerun:entity_path")
.map(|s| EntityPath::parse_forgiving(s));
let columns = SorbetColumnDescriptors::try_from_arrow_fields(entity_path.as_ref(), fields)?;
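
        // A missing chunk ID is fine, but a malformed one is a hard error: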
let chunk_id = if let Some(chunk_id_str) = metadata.get("rerun:id") {
Some(chunk_id_str.parse().map_err(|err| {
SorbetError::ChunkIdDeserializationError(format!(
"Failed to deserialize chunk id {chunk_id_str:?}: {err}"
))
})?)
} else {
None
};
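
        // A malformed heap size is only a warning, and the value is dropped: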
let heap_size_bytes = if let Some(heap_size_bytes) = metadata.get("rerun:heap_size_bytes") {
heap_size_bytes
.parse()
.map_err(|err| {
re_log::warn_once!(
"Failed to parse heap_size_bytes {heap_size_bytes:?} in chunk: {err}"
);
})
.ok()
} else {
None
};
let partition_id = metadata.get("rerun:partition_id").map(|s| s.to_owned());
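
        // Warn (but don't fail) if the batch was written with a different sorbet version: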
if let Some(batch_version) = metadata.get(Self::METADATA_KEY_VERSION)
&& batch_version != &Self::METADATA_VERSION.to_string()
{
re_log::warn_once!(
"Sorbet batch version mismatch. Expected {}, got {batch_version:?}",
Self::METADATA_VERSION
);
}
Ok(Self {
columns,
chunk_id,
entity_path,
partition_id,
heap_size_bytes,
timestamps: TimestampMetadata::parse_record_batch_metadata(metadata),
})
}
}