use std::collections::{BTreeMap, VecDeque};
use std::sync::atomic::AtomicU64;
use arrow2::datatypes::DataType;
use nohash_hasher::IntMap;
use parking_lot::RwLock;
use re_log_types::{
DataCell, DataCellColumn, EntityPath, EntityPathHash, ErasedTimeVec, ResolvedTimeRange, RowId,
RowIdVec, StoreId, TimeInt, TimePoint, Timeline,
};
use re_types_core::{ComponentName, ComponentNameSet, SizeBytes};
/// Configuration knobs for a `DataStore`.
///
/// Use [`DataStoreConfig::DEFAULT`] (or [`Default::default`], which returns the same value)
/// as a starting point.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataStoreConfig {
    /// Row-count setting for indexed buckets.
    ///
    /// NOTE(review): presumably the number of rows past which an indexed bucket gets split.
    /// The code consuming this value is not part of this section, so confirm there.
    pub indexed_bucket_num_rows: u64,

    /// Whether insert IDs should be stored alongside the data.
    ///
    /// Enabled by default only in builds compiled with debug assertions.
    pub store_insert_ids: bool,
}

impl DataStoreConfig {
    /// The configuration handed out by [`Default::default`].
    pub const DEFAULT: Self = Self {
        indexed_bucket_num_rows: 512,
        store_insert_ids: cfg!(debug_assertions),
    };
}

impl Default for DataStoreConfig {
    /// Returns [`DataStoreConfig::DEFAULT`].
    #[inline]
    fn default() -> Self {
        DataStoreConfig::DEFAULT
    }
}
/// A double-ended queue of `u64` insert IDs.
///
/// Used as the type of `IndexedBucketInner::col_insert_id`.
pub type InsertIdVec = VecDeque<u64>;
/// Maps each [`ComponentName`] to an arrow2 [`DataType`].
///
/// This is a newtype around an [`IntMap`]; the inner map is also reachable through the
/// `Deref`/`DerefMut` implementations, so map methods can be called directly on the registry.
#[derive(Debug, Default, Clone)]
pub struct DataTypeRegistry(pub IntMap<ComponentName, DataType>);
impl std::ops::Deref for DataTypeRegistry {
type Target = IntMap<ComponentName, DataType>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for DataTypeRegistry {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Per-row metadata, keyed (and therefore ordered) by [`RowId`].
///
/// The payload type `T` is generic; `DataStore` instantiates it with
/// `(TimePoint, EntityPathHash)`.
#[derive(Debug, Clone)]
pub struct MetadataRegistry<T: Clone> {
/// The metadata entries themselves, sorted by [`RowId`].
pub registry: BTreeMap<RowId, T>,
/// Cached heap size of the registry, in bytes.
///
/// NOTE(review): nothing in this section updates this value when `registry` is mutated —
/// presumably the code inserting/removing entries keeps it in sync; confirm at the call sites.
pub heap_size_bytes: u64,
}
impl Default for MetadataRegistry<(TimePoint, EntityPathHash)> {
fn default() -> Self {
let mut this = Self {
registry: Default::default(),
heap_size_bytes: 0,
};
this.heap_size_bytes = this.heap_size_bytes(); this
}
}
/// Shared access to the underlying `RowId`-keyed map.
impl<T: Clone> std::ops::Deref for MetadataRegistry<T> {
    type Target = BTreeMap<RowId, T>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        let Self { registry, .. } = self;
        registry
    }
}
/// Exclusive access to the underlying `RowId`-keyed map.
///
/// Mutating through this handle does not touch the `heap_size_bytes` field.
impl<T: Clone> std::ops::DerefMut for MetadataRegistry<T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { registry, .. } = self;
        registry
    }
}
/// Snapshot of a `DataStore`'s insert and GC counters, as returned by `DataStore::generation`.
///
/// Two generations compare equal only if both counters are equal.
/// NOTE(review): presumably meant as a cheap way to detect that a store was modified between
/// two points in time — confirm against the callers.
#[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct StoreGeneration {
/// Value of the store's `insert_id` counter when the snapshot was taken.
insert_id: u64,
/// Value of the store's `gc_id` counter when the snapshot was taken.
gc_id: u64,
}
/// The data store itself: configuration, registries, tables and a handful of counters.
///
/// Note that the `Clone` implementation copies the data but resets all four counters
/// (`insert_id`, `query_id`, `gc_id`, `event_id`) to zero.
pub struct DataStore {
/// Identifier of this store; exposed through `DataStore::id`.
pub(crate) id: StoreId,
/// The configuration this store was created with; exposed through `DataStore::config`.
pub(crate) config: DataStoreConfig,
/// Arrow datatype of each known component; backs `DataStore::lookup_datatype`.
pub(crate) type_registry: DataTypeRegistry,
/// Per-row metadata: the `(TimePoint, EntityPathHash)` associated with each `RowId`.
pub(crate) metadata_registry: MetadataRegistry<(TimePoint, EntityPathHash)>,
/// Indexed tables, one per `(entity path hash, timeline)` pair.
pub(crate) tables: BTreeMap<(EntityPathHash, Timeline), IndexedTable>,
/// Static tables, one per entity path hash.
pub(crate) static_tables: BTreeMap<EntityPathHash, StaticTable>,
/// Insert counter; starts at 0 and is part of the `StoreGeneration`.
/// NOTE(review): presumably incremented on every insertion — the write path is not in this section.
pub(crate) insert_id: u64,
/// Query counter; starts at 0.
/// NOTE(review): presumably atomic so that it can be bumped through `&self` — confirm in the read path.
pub(crate) query_id: AtomicU64,
/// Garbage-collection counter; starts at 0 and is part of the `StoreGeneration`.
/// NOTE(review): presumably incremented on every GC run — the GC code is not in this section.
pub(crate) gc_id: u64,
/// Event counter; starts at 0.
/// NOTE(review): presumably used to number store events — confirm where events are emitted.
pub(crate) event_id: AtomicU64,
}
impl Clone for DataStore {
fn clone(&self) -> Self {
Self {
id: self.id.clone(),
config: self.config.clone(),
type_registry: self.type_registry.clone(),
metadata_registry: self.metadata_registry.clone(),
tables: self.tables.clone(),
static_tables: self.static_tables.clone(),
insert_id: Default::default(),
query_id: Default::default(),
gc_id: Default::default(),
event_id: Default::default(),
}
}
}
impl DataStore {
pub fn new(id: StoreId, config: DataStoreConfig) -> Self {
Self {
id,
config,
type_registry: Default::default(),
metadata_registry: Default::default(),
tables: Default::default(),
static_tables: Default::default(),
insert_id: 0,
query_id: AtomicU64::new(0),
gc_id: 0,
event_id: AtomicU64::new(0),
}
}
#[inline]
pub fn id(&self) -> &StoreId {
&self.id
}
pub fn insert_id_component_name() -> ComponentName {
"rerun.controls.InsertId".into()
}
pub fn generation(&self) -> StoreGeneration {
StoreGeneration {
insert_id: self.insert_id,
gc_id: self.gc_id,
}
}
pub fn config(&self) -> &DataStoreConfig {
&self.config
}
pub fn lookup_datatype(&self, component: &ComponentName) -> Option<&DataType> {
self.type_registry.get(component)
}
pub fn oldest_time_per_timeline(&self) -> BTreeMap<Timeline, TimeInt> {
re_tracing::profile_function!();
let mut oldest_time_per_timeline = BTreeMap::default();
for index in self.tables.values() {
if let Some(bucket) = index.buckets.values().next() {
let entry = oldest_time_per_timeline
.entry(bucket.timeline)
.or_insert(TimeInt::MAX);
if let Some(&time) = bucket.inner.read().col_time.front() {
*entry = TimeInt::min(*entry, TimeInt::new_temporal(time));
}
}
}
oldest_time_per_timeline
}
pub fn iter_indices(
&self,
) -> impl ExactSizeIterator<Item = ((EntityPath, Timeline), &IndexedTable)> {
self.tables.iter().map(|((_, timeline), table)| {
((table.entity_path.clone() , *timeline), table)
})
}
}
/// All the data of a single entity on a single timeline, split into time-keyed buckets.
#[derive(Debug, Clone)]
pub struct IndexedTable {
/// The timeline this table is indexed on.
pub timeline: Timeline,
/// The entity this table holds data for.
pub entity_path: EntityPath,
/// The buckets of this table, keyed by [`TimeInt`].
///
/// `IndexedTable::new` and `IndexedTable::uphold_indexing_invariants` make sure the first
/// bucket is keyed by `TimeInt::MIN`.
/// NOTE(review): the key is presumably the lower time bound covered by the bucket — confirm
/// in the insertion/splitting code.
pub buckets: BTreeMap<TimeInt, IndexedBucket>,
/// Set of component names associated with this table.
///
/// Left untouched by `uphold_indexing_invariants`, even when the table is reset to a single
/// empty bucket.
pub all_components: ComponentNameSet,
/// Cached number of rows across all buckets; 0 for a new or reset table.
pub buckets_num_rows: u64,
/// Cached size in bytes of all buckets; initialized from the size of the initial empty bucket.
pub buckets_size_bytes: u64,
}
impl IndexedTable {
pub fn new(timeline: Timeline, entity_path: EntityPath) -> Self {
let bucket = IndexedBucket::new(timeline);
let buckets_size_bytes = bucket.total_size_bytes();
Self {
timeline,
entity_path,
buckets: [(TimeInt::MIN, bucket)].into(),
all_components: Default::default(),
buckets_num_rows: 0,
buckets_size_bytes,
}
}
pub(crate) fn uphold_indexing_invariants(&mut self) {
if self.buckets.is_empty() {
let Self {
timeline,
entity_path: _,
buckets,
all_components: _, buckets_num_rows,
buckets_size_bytes,
} = self;
let bucket = IndexedBucket::new(*timeline);
let size_bytes = bucket.total_size_bytes();
*buckets = [(TimeInt::MIN, bucket)].into();
*buckets_num_rows = 0;
*buckets_size_bytes = size_bytes;
}
else if let Some((_, bucket)) = self.buckets.pop_first() {
self.buckets.insert(TimeInt::MIN, bucket);
}
}
}
/// A single bucket of an `IndexedTable`: a timeline plus lock-protected contents.
///
/// `Clone` is implemented by hand: it takes a read lock and wraps a copy of the contents in
/// a new lock.
#[derive(Debug)]
pub struct IndexedBucket {
/// The timeline this bucket belongs to.
pub timeline: Timeline,
/// The actual contents of the bucket, behind a read-write lock.
/// NOTE(review): presumably locked so that the contents can be modified through `&self`
/// (e.g. lazy sorting on read) — confirm in the read path.
pub inner: RwLock<IndexedBucketInner>,
}
impl Clone for IndexedBucket {
    /// Copies the bucket's contents under a read lock and wraps the copy in a new lock.
    fn clone(&self) -> Self {
        let Self { timeline, inner } = self;
        let contents = inner.read().clone();

        Self {
            timeline: *timeline,
            inner: RwLock::new(contents),
        }
    }
}
impl IndexedBucket {
    /// Creates an empty bucket for `timeline`, with default-initialized contents.
    pub(crate) fn new(timeline: Timeline) -> Self {
        let inner = RwLock::new(IndexedBucketInner::default());
        Self { timeline, inner }
    }
}
/// The lock-protected contents of an `IndexedBucket`.
#[derive(Debug, Clone)]
pub struct IndexedBucketInner {
/// Sortedness flag; `true` for a freshly created (empty) bucket.
/// NOTE(review): presumably tracks whether the columns are currently sorted by time — confirm.
pub is_sorted: bool,
/// Time range associated with this bucket; `ResolvedTimeRange::EMPTY` for a fresh bucket.
pub time_range: ResolvedTimeRange,
/// Column of time values; `DataStore::oldest_time_per_timeline` reads its front element.
pub col_time: ErasedTimeVec,
/// Column of insert IDs.
/// NOTE(review): presumably only filled when `DataStoreConfig::store_insert_ids` is set — confirm.
pub col_insert_id: InsertIdVec,
/// Column of row IDs.
pub col_row_id: RowIdVec,
/// Row ID high-water mark; `RowId::ZERO` for a fresh bucket.
/// NOTE(review): presumably the largest `RowId` present in `col_row_id` — confirm.
pub max_row_id: RowId,
/// One column of data cells per component.
pub columns: IntMap<ComponentName, DataCellColumn>,
/// Cached size of the bucket in bytes; `Default` fills it in by calling `compute_size_bytes`.
pub size_bytes: u64,
}
impl Default for IndexedBucketInner {
fn default() -> Self {
let mut this = Self {
is_sorted: true,
time_range: ResolvedTimeRange::EMPTY,
col_time: Default::default(),
col_insert_id: Default::default(),
col_row_id: Default::default(),
max_row_id: RowId::ZERO,
columns: Default::default(),
size_bytes: 0, };
this.compute_size_bytes();
this
}
}
/// The static data of a single entity: at most one cell per component.
#[derive(Clone)]
pub struct StaticTable {
/// The entity this table holds data for.
pub entity_path: EntityPath,
/// The stored cell for each component, keyed (and ordered) by component name.
pub cells: BTreeMap<ComponentName, StaticCell>,
}
impl StaticTable {
#[inline]
pub fn new(entity_path: EntityPath) -> Self {
Self {
entity_path,
cells: Default::default(),
}
}
}
/// A single entry of a `StaticTable`: one data cell plus the IDs it was recorded with.
#[derive(Clone)]
pub struct StaticCell {
/// Insert ID associated with this cell, if any.
/// NOTE(review): presumably `None` unless `DataStoreConfig::store_insert_ids` is enabled — confirm.
pub insert_id: Option<u64>,
/// ID of the row this cell is associated with.
pub row_id: RowId,
/// The data itself.
pub cell: DataCell,
}