use rand::Rng;
use re_data_store::{
test_row, test_util::sanity_unwrap, DataStore, DataStoreConfig, DataStoreStats,
GarbageCollectionOptions, LatestAtQuery, WriteError,
};
use re_log_types::example_components::MyPoint;
use re_log_types::{
build_frame_nr, build_log_time, DataCell, DataRow, Duration, EntityPath, RowId, Time, TimeInt,
TimePoint, TimeType, Timeline,
};
use re_types::components::InstanceKey;
use re_types::datagen::{build_some_colors, build_some_instances, build_some_positions2d};
use re_types_core::Loggable as _;
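// Ensures that `RowId`s drive the ordering of rows sharing a timestamp (and of timeless rows),
// and that reusing a `RowId` is rejected.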
#[test]
fn row_id_ordering_semantics() -> anyhow::Result<()> {
let entity_path: EntityPath = "some_entity".into();
let timeline_frame = Timeline::new_sequence("frame");
let timepoint = TimePoint::from_iter([(timeline_frame, 10.into())]);
let point1 = MyPoint::new(1.0, 1.0);
let point2 = MyPoint::new(2.0, 2.0);
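// Insert `point1` then `point2` at frame #10, each with a freshly generated `RowId`.
// `RowId`s are monotonically increasing, so a query at frame #11 must return `point2`.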
{
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
let row_id = RowId::new();
let row = DataRow::from_component_batches(
row_id,
timepoint.clone(),
entity_path.clone(),
[&[point1] as _],
)?;
store.insert_row(&row)?;
let row_id = RowId::new();
let row = DataRow::from_component_batches(
row_id,
timepoint.clone(),
entity_path.clone(),
[&[point2] as _],
)?;
store.insert_row(&row)?;
{
let query = LatestAtQuery {
timeline: timeline_frame,
at: 11.into(),
};
let got_point = store
.query_latest_component::<MyPoint>(&entity_path, &query)
.unwrap()
.value;
similar_asserts::assert_eq!(point2, got_point);
}
}
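// Reusing `point1`'s `RowId` for `point2` is illegal and must fail with `WriteError::ReusedRowId`.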
{
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
let row_id = RowId::new();
let row = DataRow::from_component_batches(
row_id,
timepoint.clone(),
entity_path.clone(),
[&[point1] as _],
)?;
store.insert_row(&row)?;
let row = DataRow::from_component_batches(
row_id,
timepoint.clone(),
entity_path.clone(),
[&[point2] as _],
)?;
let res = store.insert_row(&row);
assert!(matches!(res, Err(WriteError::ReusedRowId(_)),));
}
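// Insert `point1` with the higher `RowId` first, then `point2` with the lower one.
// Rows at the same timestamp are tie-broken by `RowId`, so the query must return `point1`.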
{
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
let row_id1 = RowId::new();
let row_id2 = row_id1.next();
let row = DataRow::from_component_batches(
row_id2,
timepoint.clone(),
entity_path.clone(),
[&[point1] as _],
)?;
store.insert_row(&row)?;
let row = DataRow::from_component_batches(
row_id1,
timepoint.clone(),
entity_path.clone(),
[&[point2] as _],
)?;
store.insert_row(&row)?;
{
let query = LatestAtQuery {
timeline: timeline_frame,
at: 11.into(),
};
let got_point = store
.query_latest_component::<MyPoint>(&entity_path, &query)
.unwrap()
.value;
similar_asserts::assert_eq!(point1, got_point);
}
}
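// The same `RowId` tie-breaking applies to timeless data.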
{
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
let row_id1 = RowId::new();
let row_id2 = row_id1.next();
let row = DataRow::from_component_batches(
row_id2,
TimePoint::timeless(),
entity_path.clone(),
[&[point1] as _],
)?;
store.insert_row(&row)?;
let row = DataRow::from_component_batches(
row_id1,
TimePoint::timeless(),
entity_path.clone(),
[&[point2] as _],
)?;
store.insert_row(&row)?;
{
let got_point = store
.query_timeless_component::<MyPoint>(&entity_path)
.unwrap()
.value;
similar_asserts::assert_eq!(point1, got_point);
}
}
Ok(())
}
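// Ensures that malformed or illegal rows are rejected with the appropriate `WriteError`.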
#[test]
fn write_errors() {
re_log::setup_logging();
let ent_path = EntityPath::from("this/that");
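// A sparse cluster key (instance keys with missing values) must be rejected.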
{
pub fn build_sparse_instances() -> DataCell {
DataCell::from_component_sparse::<InstanceKey>([Some(1), None, Some(3)])
}
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
let row = test_row!(ent_path @
[build_frame_nr(32.into()), build_log_time(Time::now())] => 3; [
build_sparse_instances(), build_some_positions2d(3)
]);
assert!(matches!(
store.insert_row(&row),
Err(WriteError::SparseClusteringComponent(_)),
));
}
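// A cluster key that is unsorted or contains duplicates must be rejected.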
{
pub fn build_unsorted_instances() -> DataCell {
DataCell::from_component::<InstanceKey>([1, 3, 2])
}
pub fn build_duped_instances() -> DataCell {
DataCell::from_component::<InstanceKey>([1, 2, 2])
}
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
{
let row = test_row!(ent_path @
[build_frame_nr(32.into()), build_log_time(Time::now())] => 3; [
build_unsorted_instances(), build_some_positions2d(3)
]);
assert!(matches!(
store.insert_row(&row),
Err(WriteError::InvalidClusteringComponent(_)),
));
}
{
let row = test_row!(ent_path @
[build_frame_nr(32.into()), build_log_time(Time::now())] => 3; [
build_duped_instances(), build_some_positions2d(3)
]);
assert!(matches!(
store.insert_row(&row),
Err(WriteError::InvalidClusteringComponent(_)),
));
}
}
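// A `RowId` that has already been inserted must be rejected, and the error must carry the
// offending `RowId`.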
{
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
let mut row = test_row!(ent_path @ [
build_frame_nr(1.into()),
build_log_time(Time::now()),
] => 1; [ build_some_positions2d(1) ]);
row.row_id = re_log_types::RowId::new();
store.insert_row(&row).unwrap();
row.row_id = row.row_id.next();
store.insert_row(&row).unwrap();
assert!(matches!(
store.insert_row(&row),
Err(WriteError::ReusedRowId(_)),
));
let err = store.insert_row(&row).unwrap_err();
let WriteError::ReusedRowId(err_row_id) = err else {
unreachable!();
};
assert_eq!(row.row_id(), err_row_id);
}
}
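// Exercises the edge cases where `latest_at` has nothing (or only empty cells) to return.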
#[test]
fn latest_at_emptiness_edge_cases() {
re_log::setup_logging();
for config in re_data_store::test_util::all_configs() {
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
config.clone(),
);
latest_at_emptiness_edge_cases_impl(&mut store);
}
}
fn latest_at_emptiness_edge_cases_impl(store: &mut DataStore) {
let ent_path = EntityPath::from("this/that");
let now = Time::now();
let now_minus_1s = now - Duration::from_secs(1.0);
let now_minus_1s_nanos = now_minus_1s.nanos_since_epoch().into();
let frame39 = 39.into();
let frame40 = 40.into();
let num_instances = 3;
store
.insert_row(&test_row!(ent_path @ [
build_log_time(now), build_frame_nr(frame40),
] => num_instances; [build_some_instances(num_instances as _)]))
.unwrap();
sanity_unwrap(store);
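// NOTE: the mismatches below are deliberate: "lag_time" is a misspelled timeline name, and
// "log_time" with `TimeType::Sequence` has the wrong time type.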
let timeline_wrong_name = Timeline::new("lag_time", TimeType::Time);
let timeline_wrong_kind = Timeline::new("log_time", TimeType::Sequence);
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let timeline_log_time = Timeline::log_time();
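// Querying the frame timeline before the data was logged (frame #39 < #40) yields nothing.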
{
let cells = store.latest_at(
&LatestAtQuery::new(timeline_frame_nr, frame39),
&ent_path,
InstanceKey::name(),
&[InstanceKey::name()],
);
assert!(cells.is_none());
}
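// Querying the log timeline one second before the data was logged yields nothing.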
{
let cells = store.latest_at(
&LatestAtQuery::new(timeline_log_time, now_minus_1s_nanos),
&ent_path,
InstanceKey::name(),
&[InstanceKey::name()],
);
assert!(cells.is_none());
}
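// Querying an entity path that was never logged yields nothing.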
{
let cells = store.latest_at(
&LatestAtQuery::new(timeline_frame_nr, frame40),
&EntityPath::from("does/not/exist"),
InstanceKey::name(),
&[InstanceKey::name()],
);
assert!(cells.is_none());
}
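// Querying components that were never logged succeeds, but every returned cell is empty.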
{
let components = &["does".into(), "not".into(), "exist".into()];
let (_, _, cells) = store
.latest_at(
&LatestAtQuery::new(timeline_frame_nr, frame40),
&ent_path,
InstanceKey::name(),
components,
)
.unwrap();
assert!(cells.iter().all(|cell| cell.is_none()));
}
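// Querying with an empty component list returns an empty list of cells.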
{
let (_, _, cells) = store
.latest_at(
&LatestAtQuery::new(timeline_frame_nr, frame40),
&ent_path,
InstanceKey::name(),
&[],
)
.unwrap();
assert!(cells.is_empty());
}
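// Querying a timeline whose name does not exist yields nothing.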
{
let cells = store.latest_at(
&LatestAtQuery::new(timeline_wrong_name, frame40),
&EntityPath::from("does/not/exist"),
InstanceKey::name(),
&[InstanceKey::name()],
);
assert!(cells.is_none());
}
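// Querying a timeline whose name exists but whose time type is wrong yields nothing.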
{
let cells = store.latest_at(
&LatestAtQuery::new(timeline_wrong_kind, frame40),
&EntityPath::from("does/not/exist"),
InstanceKey::name(),
&[InstanceKey::name()],
);
assert!(cells.is_none());
}
}
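// Ensures that a full GC drops exactly what the stats say it should, and leaves the store
// readable afterwards.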
#[test]
fn gc_correct() {
re_log::setup_logging();
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
DataStoreConfig::default(),
);
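// Baseline stats of the empty store, added back onto the GC diff below.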
let stats_empty = DataStoreStats::from_store(&store);
let mut rng = rand::thread_rng();
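// Populate a random subset of up to 100 frames, 10 entities per frame, with randomly sized
// color batches.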
let num_frames = rng.gen_range(0..=100);
let frames = (0..num_frames).filter(|_| rand::thread_rng().gen());
for frame_nr in frames {
let num_ents = 10;
for i in 0..num_ents {
let ent_path = EntityPath::from(format!("this/that/{i}"));
let num_instances = rng.gen_range(0..=1_000);
let row = test_row!(ent_path @ [
build_frame_nr(frame_nr.into()),
] => num_instances; [
build_some_colors(num_instances as _),
]);
store.insert_row(&row).unwrap();
}
}
sanity_unwrap(&store);
check_still_readable(&store);
let stats = DataStoreStats::from_store(&store);
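// GC everything, then add the empty-store baseline back onto the diff so it can be compared
// against the pre-GC stats.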
let (store_events, stats_diff) = store.gc(&GarbageCollectionOptions::gc_everything());
let stats_diff = stats_diff + stats_empty;
assert_eq!(
stats.metadata_registry.num_rows,
stats_diff.metadata_registry.num_rows
);
assert_eq!(
stats.metadata_registry.num_bytes,
stats_diff.metadata_registry.num_bytes
);
assert_eq!(stats.temporal.num_rows, stats_diff.temporal.num_rows);
sanity_unwrap(&store);
check_still_readable(&store);
for event in store_events {
assert!(store.get_msg_metadata(&event.row_id).is_none());
}
let (store_events, stats_diff) = store.gc(&GarbageCollectionOptions::gc_everything());
assert!(store_events.is_empty());
assert_eq!(DataStoreStats::default(), stats_diff);
sanity_unwrap(&store);
check_still_readable(&store);
}
fn check_still_readable(store: &DataStore) {
store.to_data_table().unwrap(); // dumping the whole store is a cheap way to check it is still readable
}
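// Exercises repeated GC passes over timeless data, both with and without batching.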
#[test]
fn gc_metadata_size() -> anyhow::Result<()> {
for enable_batching in [false, true] {
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);
let point = MyPoint::new(1.0, 1.0);
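// Insert three timeless rows for the same entity.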
for _ in 0..3 {
let row = DataRow::from_component_batches(
RowId::new(),
TimePoint::timeless(),
"xxx".into(),
[&[point] as _],
)?;
store.insert_row(&row).unwrap();
}
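// Each iteration runs the exact same GC pass twice in a row, with `protect_latest: 1` set.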
for _ in 0..2 {
_ = store.gc(&GarbageCollectionOptions {
target: re_data_store::GarbageCollectionTarget::DropAtLeastFraction(1.0),
gc_timeless: false,
protect_latest: 1,
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching,
time_budget: std::time::Duration::MAX,
});
_ = store.gc(&GarbageCollectionOptions {
target: re_data_store::GarbageCollectionTarget::DropAtLeastFraction(1.0),
gc_timeless: false,
protect_latest: 1,
purge_empty_tables: false,
dont_protect: Default::default(),
enable_batching,
time_budget: std::time::Duration::MAX,
});
}
}
Ok(())
}
#[test]
fn entity_min_time_correct() -> anyhow::Result<()> {
re_log::setup_logging();
for config in re_data_store::test_util::all_configs() {
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
config.clone(),
);
entity_min_time_correct_impl(&mut store)?;
}
Ok(())
}
fn entity_min_time_correct_impl(store: &mut DataStore) -> anyhow::Result<()> {
let ent_path = EntityPath::from("this/that");
let wrong_ent_path = EntityPath::from("this/that/other");
let point = MyPoint::new(1.0, 1.0);
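// As above, "lag_time" (wrong name) and a sequence-typed "log_time" (wrong type) are deliberate.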
let timeline_wrong_name = Timeline::new("lag_time", TimeType::Time);
let timeline_wrong_kind = Timeline::new("log_time", TimeType::Sequence);
let timeline_frame_nr = Timeline::new("frame_nr", TimeType::Sequence);
let timeline_log_time = Timeline::log_time();
let now = Time::now();
let now_plus_one = now + Duration::from_secs(1.0);
let now_minus_one = now - Duration::from_secs(1.0);
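// First row at (log_time=now, frame_nr=42): the per-timeline minimums are `now` and 42.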
let row = DataRow::from_component_batches(
RowId::new(),
TimePoint::from_iter([
(timeline_log_time, now.into()),
(timeline_frame_nr, 42.into()),
]),
ent_path.clone(),
[&[point] as _],
)?;
store.insert_row(&row).unwrap();
assert!(store
.entity_min_time(&timeline_wrong_name, &ent_path)
.is_none());
assert!(store
.entity_min_time(&timeline_wrong_kind, &ent_path)
.is_none());
assert_eq!(
store.entity_min_time(&timeline_frame_nr, &ent_path),
Some(TimeInt::from(42))
);
assert_eq!(
store.entity_min_time(&timeline_log_time, &ent_path),
Some(TimeInt::from(now))
);
assert!(store
.entity_min_time(&timeline_frame_nr, &wrong_ent_path)
.is_none());
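// A later row at (now+1s, frame #54) must not change the minimums.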
let row = DataRow::from_component_batches(
RowId::new(),
TimePoint::from_iter([
(timeline_log_time, now_plus_one.into()),
(timeline_frame_nr, 54.into()),
]),
ent_path.clone(),
[&[point] as _],
)?;
store.insert_row(&row).unwrap();
assert!(store
.entity_min_time(&timeline_wrong_name, &ent_path)
.is_none());
assert!(store
.entity_min_time(&timeline_wrong_kind, &ent_path)
.is_none());
assert_eq!(
store.entity_min_time(&timeline_frame_nr, &ent_path),
Some(TimeInt::from(42))
);
assert_eq!(
store.entity_min_time(&timeline_log_time, &ent_path),
Some(TimeInt::from(now))
);
assert!(store
.entity_min_time(&timeline_frame_nr, &wrong_ent_path)
.is_none());
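// An earlier row at (now-1s, frame #32) must move the minimums back.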
let row = DataRow::from_component_batches(
RowId::new(),
TimePoint::from_iter([
(timeline_log_time, now_minus_one.into()),
(timeline_frame_nr, 32.into()),
]),
ent_path.clone(),
[&[point] as _],
)?;
store.insert_row(&row).unwrap();
assert!(store
.entity_min_time(&timeline_wrong_name, &ent_path)
.is_none());
assert!(store
.entity_min_time(&timeline_wrong_kind, &ent_path)
.is_none());
assert_eq!(
store.entity_min_time(&timeline_frame_nr, &ent_path),
Some(TimeInt::from(32))
);
assert_eq!(
store.entity_min_time(&timeline_log_time, &ent_path),
Some(TimeInt::from(now_minus_one))
);
assert!(store
.entity_min_time(&timeline_frame_nr, &wrong_ent_path)
.is_none());
Ok(())
}