use std::sync::atomic::{AtomicBool, Ordering};
use itertools::Itertools;
use re_data_store::{
test_row, test_util::sanity_unwrap, DataStore, DataStoreStats, GarbageCollectionOptions,
TimeInt, TimeRange, Timeline, WriteError,
};
use re_log_types::{build_frame_nr, build_log_time, DataTable, EntityPath, TableId};
use re_types::components::InstanceKey;
use re_types::datagen::{build_some_colors, build_some_instances, build_some_positions2d};
use re_types_core::Loggable as _;
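
/// Inserts `table` into `store` row by row, bumping the `RowId` and retrying whenever the
/// store reports a `ReusedRowId` clash (which happens when re-ingesting dumped data).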
fn insert_table_with_retries(store: &mut DataStore, table: &DataTable) {
for row in table.to_rows() {
let mut row = row.unwrap();
loop {
match store.insert_row(&row) {
Ok(_) => break,
Err(WriteError::ReusedRowId(_)) => {
row.row_id = row.row_id.next();
}
err @ Err(_) => err.map(|_| ()).unwrap(),
}
}
}
}
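
/// Dumps the contents of `store1` into `store2`, then `store2` into `store3`, and checks that
/// all three stores end up with the same data; then garbage-collects everything and repeats
/// the whole process.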
#[test]
fn data_store_dump() {
init_logs();
for mut config in re_data_store::test_util::all_configs() {
config.store_insert_ids = false;
let mut store1 = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
config.clone(),
);
let mut store2 = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
config.clone(),
);
let mut store3 = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
config.clone(),
);
data_store_dump_impl(&mut store1, &mut store2, &mut store3);
store1.gc(&GarbageCollectionOptions::gc_everything());
store2.gc(&GarbageCollectionOptions::gc_everything());
store3.gc(&GarbageCollectionOptions::gc_everything());
data_store_dump_impl(&mut store1, &mut store2, &mut store3);
}
}
fn data_store_dump_impl(store1: &mut DataStore, store2: &mut DataStore, store3: &mut DataStore) {
let insert_table = |store: &mut DataStore, table: &DataTable| {
insert_table_with_retries(store, table);
let mut table_timeless = table.clone();
table_timeless.col_timelines = Default::default();
insert_table_with_retries(store, &table_timeless);
};
let ent_paths = ["this/that", "other", "yet/another/one"];
let tables = ent_paths
.iter()
.map(|ent_path| create_insert_table(*ent_path))
.collect_vec();
for table in &tables {
insert_table(store1, table);
}
sanity_unwrap(store1);
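
// Dump the first store into the second one.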
for table in store1.to_data_tables(None) {
insert_table_with_retries(store2, &table);
}
sanity_unwrap(store2);
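
// Dump the second store into the third one.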
for table in store2.to_data_tables(None) {
insert_table_with_retries(store3, &table);
}
sanity_unwrap(store3);
#[cfg(feature = "polars")]
{
let store1_df = store1.to_dataframe();
let store2_df = store2.to_dataframe();
let store3_df = store3.to_dataframe();
assert!(
store1_df == store2_df,
"First & second stores differ:\n{store1_df}\n{store2_df}"
);
assert!(
store1_df == store3_df,
"First & third stores differ:\n{store1_df}\n{store3_df}"
);
}
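
// Each re-ingested store should account for at least as many bytes as its source.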
let store1_stats = DataStoreStats::from_store(store1);
let store2_stats = DataStoreStats::from_store(store2);
let store3_stats = DataStoreStats::from_store(store3);
assert!(
store1_stats.temporal.num_bytes <= store2_stats.temporal.num_bytes
&& store1_stats.timeless.num_bytes <= store2_stats.timeless.num_bytes,
"First store should have <= amount of data of second store:\n\
{store1_stats:#?}\n{store2_stats:#?}"
);
assert!(
store2_stats.temporal.num_bytes <= store3_stats.temporal.num_bytes
&& store2_stats.timeless.num_bytes <= store3_stats.timeless.num_bytes,
"Second store should have <= amount of data of third store:\n\
{store2_stats:#?}\n{store3_stats:#?}"
);
}
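
/// Dumps time-filtered slices of `store1` (one frame range at a time) into `store2` and checks
/// that the two stores end up with the same data, both before and after a full GC.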
#[test]
fn data_store_dump_filtered() {
init_logs();
for mut config in re_data_store::test_util::all_configs() {
config.store_insert_ids = false;
let mut store1 = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
config.clone(),
);
let mut store2 = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
config.clone(),
);
data_store_dump_filtered_impl(&mut store1, &mut store2);
store1.gc(&GarbageCollectionOptions::gc_everything());
store2.gc(&GarbageCollectionOptions::gc_everything());
data_store_dump_filtered_impl(&mut store1, &mut store2);
}
}
fn data_store_dump_filtered_impl(store1: &mut DataStore, store2: &mut DataStore) {
let timeline_frame_nr = Timeline::new_sequence("frame_nr");
let timeline_log_time = Timeline::log_time();
let frame1: TimeInt = 1.into();
let frame2: TimeInt = 2.into();
let frame3: TimeInt = 3.into();
let frame4: TimeInt = 4.into();
let ent_paths = ["this/that", "other", "yet/another/one"];
let tables = ent_paths
.iter()
.map(|ent_path| create_insert_table(*ent_path))
.collect_vec();
for table in &tables {
insert_table_with_retries(store1, table);
}
sanity_unwrap(store1);
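
// Dump the first store into the second one, one frame range at a time.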
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame1, frame1)).into()) {
insert_table_with_retries(store2, &table);
}
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame2, frame2)).into()) {
insert_table_with_retries(store2, &table);
}
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame3, frame3)).into()) {
insert_table_with_retries(store2, &table);
}
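
// Dump frame3 again, this time addressed through the `log_time` timeline (row 3 is logged on
// both timelines).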
for table in store1.to_data_tables((timeline_log_time, TimeRange::new(frame3, frame3)).into()) {
insert_table_with_retries(store2, &table);
}
for table in store1.to_data_tables((timeline_frame_nr, TimeRange::new(frame4, frame4)).into()) {
insert_table_with_retries(store2, &table);
}
sanity_unwrap(store2);
#[cfg(feature = "polars")]
{
let store1_df = store1.to_dataframe();
let store2_df = store2.to_dataframe();
assert!(
store1_df == store2_df,
"First & second stores differ:\n{store1_df}\n{store2_df}"
);
}
let store1_stats = DataStoreStats::from_store(store1);
let store2_stats = DataStoreStats::from_store(store2);
assert!(
store1_stats.temporal.num_bytes <= store2_stats.temporal.num_bytes
&& store1_stats.timeless.num_bytes <= store2_stats.timeless.num_bytes,
"First store should have <= amount of data of second store:\n\
{store1_stats:#?}\n{store2_stats:#?}"
);
}
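
/// One-time logging setup shared by all the tests in this file.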
pub fn init_logs() {
static INIT: AtomicBool = AtomicBool::new(false);
if INIT
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
re_log::setup_native_logging();
}
}
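
/// Builds a table for `ent_path` with four rows spread over frames 1-4 (row 3 is additionally
/// logged on the `log_time` timeline), mixing instance keys, colors and 2D positions.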
fn create_insert_table(ent_path: impl Into<EntityPath>) -> DataTable {
let ent_path = ent_path.into();
let frame1: TimeInt = 1.into();
let frame2: TimeInt = 2.into();
let frame3: TimeInt = 3.into();
let frame4: TimeInt = 4.into();
let (instances1, colors1) = (build_some_instances(3), build_some_colors(3));
let row1 = test_row!(ent_path @ [
build_frame_nr(frame1),
] => 3; [instances1.clone(), colors1]);
let positions2 = build_some_positions2d(3);
let row2 = test_row!(ent_path @ [
build_frame_nr(frame2),
] => 3; [instances1, positions2]);
let positions3 = build_some_positions2d(10);
let row3 = test_row!(ent_path @ [
build_log_time(frame3.into()), build_frame_nr(frame3),
] => 10; [positions3]);
let colors4 = build_some_colors(5);
let row4 = test_row!(ent_path @ [
build_frame_nr(frame4),
] => 5; [colors4]);
let mut table = DataTable::from_rows(TableId::new(), [row1, row2, row3, row4]);
table.compute_all_size_bytes();
table
}
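
/// Dumps a store configured with single-row buckets in which each component column only has
/// data in some of the rows, and checks that the dump converts cleanly into Arrow messages.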
#[test]
fn data_store_dump_empty_column() {
init_logs();
let config = re_data_store::DataStoreConfig {
indexed_bucket_num_rows: 1,
store_insert_ids: false,
..re_data_store::DataStoreConfig::DEFAULT
};
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
config,
);
data_store_dump_empty_column_impl(&mut store);
}
fn data_store_dump_empty_column_impl(store: &mut DataStore) {
let ent_path: EntityPath = "points".into();
let frame1: TimeInt = 1.into();
let frame2: TimeInt = 2.into();
let frame3: TimeInt = 3.into();
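
// First table: colors at frame1, positions at frame2.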
{
let (instances1, colors1) = (build_some_instances(3), build_some_colors(3));
let row1 = test_row!(ent_path @ [
build_frame_nr(frame1),
] => 3; [instances1, colors1]);
let (instances2, positions2) = (build_some_instances(3), build_some_positions2d(3));
let row2 = test_row!(ent_path @ [
build_frame_nr(frame2),
] => 3; [instances2, positions2]);
let mut table = DataTable::from_rows(TableId::new(), [row1, row2]);
table.compute_all_size_bytes();
insert_table_with_retries(store, &table);
}
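
// Second table: colors again at frame3.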
{
let (instances3, colors3) = (build_some_instances(3), build_some_colors(3));
let row3 = test_row!(ent_path @ [
build_frame_nr(frame3),
] => 3; [instances3, colors3]);
let mut table = DataTable::from_rows(TableId::new(), [row3]);
table.compute_all_size_bytes();
insert_table_with_retries(store, &table);
}
let data_msgs: Result<Vec<_>, _> = store
.to_data_tables(None)
.map(|table| table.to_arrow_msg())
.collect();
assert_eq!(data_msgs.unwrap().len(), 2);
}