diff --git a/internal/gitaly/storage/raftmgr/replica.go b/internal/gitaly/storage/raftmgr/replica.go index c0bff5eaa1293fb564e6ba1c4c5e9572439d0d2f..133e8ad20c59c49369c89e6127c30aaa4accbdcc 100644 --- a/internal/gitaly/storage/raftmgr/replica.go +++ b/internal/gitaly/storage/raftmgr/replica.go @@ -322,6 +322,18 @@ func NewReplica( scopedMetrics := metrics.Scope(logStore.storageName) + // Here we add a mapping in the routing table between the relative path of the replica + // and its PartitionKey. This is important to allow querying the routing table by relative + // path when we receive requests from clients. + // Only add this entry if relativePath is defined. + // This is because it is possible that a replica gets created without a defined relative path. + // See: https://gitlab.com/gitlab-org/gitaly/-/blob/master/internal/gitaly/storage/storagemgr/partition/factory.go?ref_type=heads#L82 + if len(relativePath) > 0 { + if err = raftEnabledStorage.routingTable.UpsertRelativePathIndex(relativePath, partitionKey); err != nil { + return nil, fmt.Errorf("inserting relative path index in routing table: %w", err) + } + } + return &Replica{ memberID: memberID, relativePath: relativePath, @@ -573,6 +585,11 @@ func (replica *Replica) Close() error { replica.cancel() replica.wg.Wait() + // Remove the mapping between the relative path and the PartitionKey + // Ignoring the error as it might not be appropriate to abort + // closing the replica if we are unable to remove it from the DB. + _ = replica.raftEnabledStorage.routingTable.DeleteRelativePathIndex(replica.relativePath) + if replica.raftEnabledStorage != nil { // Mostly for tests; raftEnabledStorage should never be nil in practice. replica.raftEnabledStorage.DeregisterReplica(replica) diff --git a/internal/gitaly/storage/raftmgr/replica_test.go b/internal/gitaly/storage/raftmgr/replica_test.go index 81b24f1d7a9941a1df625f1791f36a536dc9bc0b..52b1776fe41a8a99ac2b65ac8c6a59940cd32f92 100644 --- a/internal/gitaly/storage/raftmgr/replica_test.go +++ b/internal/gitaly/storage/raftmgr/replica_test.go @@ -239,7 +239,7 @@ func TestReplica_Initialize(t *testing.T) { require.NoError(t, err) raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "@hashed/ab/cd/abcd") mgr, err := raftFactory(ctx, storageName, logStore, logger, metrics) require.NoError(t, err) @@ -319,7 +319,7 @@ func TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "@hashed/ab/cd/abcd") mgr, err := raftFactory(ctx, storageName, logStore, logger, metrics) require.NoError(t, err) defer func() { require.NoError(t, mgr.Close()) }() @@ -365,7 +365,7 @@ func TestReplica_Initialize(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) - ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "@hashed/ab/cd/abcd") mgr1, err := raftFactory(ctx, storageName, logStore1, logger, metrics) require.NoError(t, err) @@ -428,7 +428,7 @@ func TestReplica_Initialize(t *testing.T) { logStore2, err := NewReplicaLogStore(storageName, partitionID, raftCfg, db, stagingDir, stateDir, &MockConsumer{}, log.NewPositionTracker(), logger, NewMetrics()) require.NoError(t, err) - ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "@hashed/ab/cd/abcd") mgr2, err := raftFactory(ctx, storageName, logStore2, logger, metrics) require.NoError(t, err) @@ -713,7 +713,7 @@ func TestReplica_AppendLogEntry(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder), WithOpTimeout(1*time.Nanosecond)) // Create replica with very short operation timeout - ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "@hashed/ab/cd/abcd") mgr, err := raftFactory( ctx, storageName, @@ -834,7 +834,7 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(recorder)) // Configure replica - ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "@hashed/ab/cd/abcd") mgr, err := raftFactory(ctx, storageName, logStore, logger, metrics) require.NoError(t, err) @@ -888,7 +888,7 @@ func TestReplica_AppendLogEntry_CrashRecovery(t *testing.T) { raftFactory := DefaultFactoryWithNode(raftCfg, raftNode, WithEntryRecorder(env.recorder)) - ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(env.storageName, env.partitionID), 1, "") + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(env.storageName, env.partitionID), 1, "@hashed/ab/cd/abcd") recoveryMgr, err := raftFactory(ctx, env.storageName, logStore, logger, env.metrics) require.NoError(t, err) @@ -1670,7 +1670,7 @@ func TestReplica_StorageConnection(t *testing.T) { // Create factory that connects replicas to storage raftFactory := DefaultFactoryWithNode(cfg.Raft, raftNode) - ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "@hashed/ab/cd/abcd") replica, err := raftFactory(ctx, storageName, logStore, logger, NewMetrics()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, replica.Close()) }) @@ -1702,7 +1702,7 @@ func TestReplica_StorageConnection(t *testing.T) { }) t.Run("multiple replicas for same partition key", func(t *testing.T) { - ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "@hashed/ab/cd/abcd") duplicateReplica, err := raftFactory(ctx, storageName, logStore, logger, NewMetrics()) require.NoError(t, err) require.NotNil(t, duplicateReplica) @@ -1710,7 +1710,7 @@ func TestReplica_StorageConnection(t *testing.T) { t.Run("Register different replicas for different partition keys", func(t *testing.T) { partitionID := storage.PartitionID(2) - ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "") + ctx = storage.ContextWithPartitionInfo(ctx, NewPartitionKey(storageName, partitionID), 1, "@hashed/ab/cd/abcd") replicaTwo, err := raftFactory(ctx, storageName, logStore, logger, NewMetrics()) require.NoError(t, err) require.NotNil(t, replicaTwo) diff --git a/internal/gitaly/storage/raftmgr/routing_table.go b/internal/gitaly/storage/raftmgr/routing_table.go index 13ba63722977facee603e8aa6add002314c64531..af268e087ae5ae727a5f70376bd0cbed9799003b 100644 --- a/internal/gitaly/storage/raftmgr/routing_table.go +++ b/internal/gitaly/storage/raftmgr/routing_table.go @@ -13,9 +13,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" ) -func routingKey(partitionKey *gitalypb.RaftPartitionKey) []byte { - return []byte(fmt.Sprintf("raft/%s", partitionKey.GetValue())) -} +const ( + raftRoutingPrefix = "raft/rt/" + raftRelPathPrefix = "raft/relpathidx/" +) // RoutingTableEntry represents a Raft cluster's routing state for a partition. // It includes the current leader, all replicas, and Raft consensus metadata. @@ -27,12 +28,31 @@ type RoutingTableEntry struct { Index uint64 } +// buildRoutingKey returns the DB key to use when querying the database +// for the given partitionKey. +func buildRoutingKey(partitionKey *gitalypb.RaftPartitionKey) []byte { + return []byte(fmt.Sprintf("%s%s", raftRoutingPrefix, partitionKey.GetValue())) +} + +// buildRelativePathIndexKey returns the DB key to use when mapping from a +// repository's relative path to it's related PartitionKey. +// When a request from a client comes into Gitaly, the request do not hold +// the PartitionKey. Instead, it refers to a repository by relative path. +// We need to keep a mapping between a relative path and a PartitionKey. +// The DB holds such an index, queryable by the RelativePathIndexKey. +func buildRelativePathIndexKey(relativePath string) []byte { + return []byte(fmt.Sprintf("%s%s", raftRelPathPrefix, relativePath)) +} + // RoutingTable handles translation between member IDs and addresses type RoutingTable interface { Translate(partitionKey *gitalypb.RaftPartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) GetEntry(partitionKey *gitalypb.RaftPartitionKey) (*RoutingTableEntry, error) + GetEntryByRelativePath(relativePath string) (*RoutingTableEntry, error) UpsertEntry(entry RoutingTableEntry) error + UpsertRelativePathIndex(relativePath string, partitionKey *gitalypb.RaftPartitionKey) error DeleteEntry(partitionKey *gitalypb.RaftPartitionKey) error + DeleteRelativePathIndex(relativePath string) error ApplyReplicaConfChange(partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error ListEntries() (map[string]*RoutingTableEntry, error) } @@ -64,7 +84,7 @@ func (r *kvRoutingTable) UpsertEntry(entry RoutingTableEntry) error { } partitionKey := entry.Replicas[0].GetPartitionKey() - key := routingKey(partitionKey) + key := buildRoutingKey(partitionKey) item, err := txn.Get(key) if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { @@ -103,13 +123,29 @@ func (r *kvRoutingTable) UpsertEntry(entry RoutingTableEntry) error { }) } +// UpsertRelativePathIndex adds a mapping in the routing table between a repository's relative path +// and the PartitionKey related to that repository. +// Keeping such an index is critical because when a client sends a request to Gitaly, most +// of the time the repository targeted by the request is identified solely by the relative path. +// We need the ability to convert a relative path to a PartitionKey. +func (r *kvRoutingTable) UpsertRelativePathIndex(relativePath string, partitionKey *gitalypb.RaftPartitionKey) error { + if relativePath == "" { + return fmt.Errorf("relative path cannot be empty") + } + + key := buildRelativePathIndexKey(relativePath) + return r.kvStore.Update(func(tx keyvalue.ReadWriter) error { + return tx.Set(key, []byte(partitionKey.GetValue())) + }) +} + // DeleteEntry deletes a routing table entry based on the partition key func (r *kvRoutingTable) DeleteEntry(partitionKey *gitalypb.RaftPartitionKey) error { r.mutex.Lock() defer r.mutex.Unlock() return r.kvStore.Update(func(txn keyvalue.ReadWriter) error { - key := routingKey(partitionKey) + key := buildRoutingKey(partitionKey) if err := txn.Delete(key); err != nil { return fmt.Errorf("delete entry: %w", err) @@ -119,9 +155,21 @@ func (r *kvRoutingTable) DeleteEntry(partitionKey *gitalypb.RaftPartitionKey) er }) } +// DeleteRelativePathIndex deletes the mapping between the relativePath and its +// corresponding PartitionKey from the DB. +func (r *kvRoutingTable) DeleteRelativePathIndex(relativePath string) error { + return r.kvStore.Update(func(txn keyvalue.ReadWriter) error { + key := buildRelativePathIndexKey(relativePath) + if err := txn.Delete(key); err != nil { + return fmt.Errorf("delete entry: %w", err) + } + return nil + }) +} + // GetEntry retrieves a routing table entry func (r *kvRoutingTable) GetEntry(partitionKey *gitalypb.RaftPartitionKey) (*RoutingTableEntry, error) { - key := routingKey(partitionKey) + key := buildRoutingKey(partitionKey) var entry RoutingTableEntry if err := r.kvStore.View(func(txn keyvalue.ReadWriter) error { @@ -140,6 +188,33 @@ func (r *kvRoutingTable) GetEntry(partitionKey *gitalypb.RaftPartitionKey) (*Rou return &entry, nil } +// GetEntryByRelativePath allows to fetch the routing table entry by a repository's +// relative path instead of PartitionKey. +// This is useful in cases where the PartitionKey is unknown, such as when a request +// comes into Gitaly. Clients usually identifies repositories by relative path. +func (r *kvRoutingTable) GetEntryByRelativePath(relativePath string) (*RoutingTableEntry, error) { + relativePathIndexKey := buildRelativePathIndexKey(relativePath) + + // The PartitionKey we need to retrieve from the DB + partitionKey := &gitalypb.RaftPartitionKey{} + err := r.kvStore.View(func(txn keyvalue.ReadWriter) error { + item, err := txn.Get(relativePathIndexKey) + if err != nil { + return err + } + return item.Value(func(value []byte) error { + partitionKey.Value = string(value) + return nil + }) + }) + if err != nil { + return nil, fmt.Errorf("relative path not in db index: %w", err) + } + + // Use the PartitionKey to get the routing entry + return r.GetEntry(partitionKey) +} + // Translate returns the storage name and address for a given partition key and member ID func (r *kvRoutingTable) Translate(partitionKey *gitalypb.RaftPartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) { entry, err := r.GetEntry(partitionKey) @@ -158,7 +233,7 @@ func (r *kvRoutingTable) Translate(partitionKey *gitalypb.RaftPartitionKey, memb func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.RaftPartitionKey, changes *ReplicaConfChanges) error { routingTableEntry, err := r.GetEntry(partitionKey) - if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { + if errorAccessingDatabase(err) { return fmt.Errorf("getting routing table entry: %w", err) } @@ -256,7 +331,7 @@ func (r *kvRoutingTable) ListEntries() (map[string]*RoutingTableEntry, error) { entries := make(map[string]*RoutingTableEntry) // With opaque partition keys, we return all entries. - prefix := "raft/" + prefix := raftRoutingPrefix if err := r.kvStore.View(func(txn keyvalue.ReadWriter) error { iter := txn.NewIterator(keyvalue.IteratorOptions{ @@ -284,3 +359,7 @@ func (r *kvRoutingTable) ListEntries() (map[string]*RoutingTableEntry, error) { return entries, nil } + +func errorAccessingDatabase(err error) bool { + return err != nil && !errors.Is(err, badger.ErrKeyNotFound) +} diff --git a/internal/gitaly/storage/raftmgr/routing_table_test.go b/internal/gitaly/storage/raftmgr/routing_table_test.go index bc447e6bc3289c82e767a7216a49db2f1fb5c373..a66675b8ca5225ffa3df386aa10a2c84f0615c0e 100644 --- a/internal/gitaly/storage/raftmgr/routing_table_test.go +++ b/internal/gitaly/storage/raftmgr/routing_table_test.go @@ -32,6 +32,7 @@ func TestPersistentRoutingTable(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) entry := RoutingTableEntry{ + RelativePath: "@hashed/repo1.git", Replicas: []*gitalypb.ReplicaID{ { PartitionKey: partitionKey, @@ -58,6 +59,7 @@ func TestPersistentRoutingTable(t *testing.T) { key := NewPartitionKey("test-authority", 2) entry1 := RoutingTableEntry{ + RelativePath: "@hashed/repo1.git", Replicas: []*gitalypb.ReplicaID{ { PartitionKey: key, @@ -143,6 +145,7 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) initialEntry := &RoutingTableEntry{ + RelativePath: "@hashed/repo1.git", Replicas: []*gitalypb.ReplicaID{ { PartitionKey: partitionKey, @@ -311,6 +314,7 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) entry := &RoutingTableEntry{ + RelativePath: "@hashed/repo1.git", Replicas: []*gitalypb.ReplicaID{ { PartitionKey: partitionKey, @@ -349,6 +353,7 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) initialEntry := &RoutingTableEntry{ + RelativePath: "@hashed/repo1.git", Replicas: []*gitalypb.ReplicaID{ { PartitionKey: partitionKey, @@ -394,6 +399,7 @@ func TestApplyReplicaConfChange(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) initialEntry := &RoutingTableEntry{ + RelativePath: "@hashed/repo1.git", Replicas: []*gitalypb.ReplicaID{ { PartitionKey: partitionKey, @@ -481,7 +487,7 @@ func TestPersistentRoutingTable_ListEntries(t *testing.T) { require.Len(t, entries, 1) // Keys are opaque hashes, so we get the single entry - actualEntry := entries["raft/"+partitionKey.GetValue()] + actualEntry := entries[raftRoutingPrefix+partitionKey.GetValue()] require.NotNil(t, actualEntry) // Verify all fields match @@ -571,7 +577,7 @@ func TestPersistentRoutingTable_ListEntries(t *testing.T) { // Verify each entry for _, e := range entries { - expectedKey := fmt.Sprintf("raft/%s", e.key.GetValue()) + expectedKey := fmt.Sprintf(raftRoutingPrefix+"%s", e.key.GetValue()) actualEntry, exists := listedEntries[expectedKey] require.True(t, exists, "Entry %s should exist", expectedKey) @@ -585,6 +591,7 @@ func TestPersistentRoutingTable_ListEntries(t *testing.T) { // Insert initial entry initialEntry := RoutingTableEntry{ + RelativePath: "@hashed/repo1.git", Replicas: []*gitalypb.ReplicaID{ { PartitionKey: partitionKey, @@ -642,6 +649,7 @@ func TestPersistentRoutingTable_ListEntries(t *testing.T) { partitionKey := createPartitionKey("concurrent-authority", 1) entry := RoutingTableEntry{ + RelativePath: "@hashed/repo1.git", Replicas: []*gitalypb.ReplicaID{ { PartitionKey: partitionKey, @@ -674,6 +682,46 @@ func TestPersistentRoutingTable_ListEntries(t *testing.T) { wg.Wait() }) + t.Run("should not return relative path index entries", func(t *testing.T) { + rt := createRoutingTable(t) + partitionKey := createPartitionKey("test-authority", 1) + + entry := RoutingTableEntry{ + RelativePath: "@hashed/test/repo.git", + Replicas: []*gitalypb.ReplicaID{ + { + PartitionKey: partitionKey, + MemberId: 1, + StorageName: "test-storage", + Metadata: createMetadata("localhost:8075"), + Type: gitalypb.ReplicaID_REPLICA_TYPE_VOTER, + }, + }, + LeaderID: 1, + Term: 5, + Index: 100, + } + + require.NoError(t, rt.UpsertEntry(entry)) + + // Insert a relative path index entry + require.NoError(t, rt.UpsertRelativePathIndex(entry.RelativePath, partitionKey)) + + entries, err := rt.ListEntries() + require.NoError(t, err) + + // We need to ensure `ListEntries` only returns the RoutingTableEntry + // and not the relative path index ones. + require.Len(t, entries, 1) + + // Keys are opaque hashes, so we get the single entry + actualEntry := entries[raftRoutingPrefix+partitionKey.GetValue()] + require.NotNil(t, actualEntry) + + // Verify all fields match + testhelper.ProtoEqual(t, &entry, actualEntry) + }) + t.Run("returns all entries", func(t *testing.T) { rt := createRoutingTable(t) @@ -772,6 +820,7 @@ func TestDeleteEntry(t *testing.T) { partitionKey := NewPartitionKey("test-authority", 1) entry := RoutingTableEntry{ + RelativePath: "@hashed/repo1.git", Replicas: []*gitalypb.ReplicaID{ { PartitionKey: partitionKey, @@ -878,3 +927,76 @@ func TestDeleteEntry(t *testing.T) { require.NoError(t, err) }) } + +func TestRelativePathIndex(t *testing.T) { + t.Parallel() + + createMetadata := func(address string) *gitalypb.ReplicaID_Metadata { + return &gitalypb.ReplicaID_Metadata{ + Address: address, + } + } + + t.Run("query by relative path", func(t *testing.T) { + t.Parallel() + + dir := testhelper.TempDir(t) + kvStore, err := keyvalue.NewBadgerStore(testhelper.NewLogger(t), dir) + require.NoError(t, err) + defer func() { + require.NoError(t, kvStore.Close()) + }() + + rt := NewKVRoutingTable(kvStore) + + partitionKey := NewPartitionKey("test-authority", 1) + entry := RoutingTableEntry{ + RelativePath: "@hashed/repo1.git", + Replicas: []*gitalypb.ReplicaID{ + { + PartitionKey: partitionKey, + MemberId: 1, + StorageName: "test-storage", + Metadata: createMetadata("localhost:1234"), + }, + }, + Term: 1, + Index: 1, + } + + err = rt.UpsertRelativePathIndex(entry.RelativePath, partitionKey) + require.NoError(t, err) + + err = rt.UpsertEntry(entry) + require.NoError(t, err) + + // Verify entry exists + retrievedEntry, err := rt.GetEntryByRelativePath(entry.RelativePath) + require.NoError(t, err) + require.Equal(t, uint64(1), retrievedEntry.Term) + require.Equal(t, uint64(1), retrievedEntry.Index) + + // Delete the entry from the DB + err = rt.DeleteEntry(partitionKey) + require.NoError(t, err) + + // At this point, there still exist a mapping in the DB + // between the relative path and the PartitionKey, so + // fetching the entry should not return while mapping + // the relative path to a PartitionKey, but rather fetching + // the entry itself. + _, err = rt.GetEntryByRelativePath(entry.RelativePath) + require.Error(t, err) + require.Contains(t, err.Error(), "Key not found") + + // Now we delete the relative path index + err = rt.DeleteRelativePathIndex(entry.RelativePath) + require.NoError(t, err) + + // Now querying by relative path should return an error indicating + // that the mapping no longer exists in the database. + _, err = rt.GetEntryByRelativePath(entry.RelativePath) + require.Error(t, err) + require.Contains(t, err.Error(), "relative path not in db index") + }) +} diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index a5c1821cb0c936023e828f11ad55113634100fcd..ea00d84cbb9b3d3b44caa46909c5fb9dd1176475 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -725,6 +725,7 @@ func RequireDatabase(tb testing.TB, ctx context.Context, database keyvalue.Trans return nil })) + fmt.Printf("\n\n\nUNEXPECTED: %v\n\n\n", unexpectedKeys) require.Emptyf(tb, unexpectedKeys, "database contains unexpected keys") // Ignore optional keys in expectedState but not in actualState @@ -1744,7 +1745,7 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas tc.expectedState.Database[string(key)] = &anypb.Any{} } partitionPrefix := storagemgr.KeyPrefixPartition(storagemgr.MetadataPartitionID) - peerKey := fmt.Sprintf("%sraft/%s", partitionPrefix, raftmgr.NewPartitionKey(storageName, setup.PartitionID).GetValue()) + peerKey := fmt.Sprintf("%sraft/rt/%s", partitionPrefix, raftmgr.NewPartitionKey(storageName, setup.PartitionID).GetValue()) if _, ok := tc.expectedState.Database[peerKey]; !ok { tc.expectedState.Database[peerKey] = &anypb.Any{} }