From d1ba2bcf555f9b94484af28c1f7f9030a3df6245 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 12 Jul 2024 16:32:13 +0700 Subject: [PATCH 1/3] raft: Add replication factor to Raft config This commit adds the replication factor to the Raft config. This config defines the number of nodes where the data of a storage are replicated. It includes the main storage. For example, ReplicationFactor=3 means 1 main storage + 2 replicated storages. The replication factor is set at the storage level. All repositories of a storage share the same behavior. Although our Raft architecture makes it feasible to set the replication factor at the repository level, we don't need that fine-grained level of control at the moment. This decision is subject to change in the future. This commit also adds a field to cluster.proto. At this point, cluster-related protobufs are not used anywhere except in tests. So, it's okay to change protobuf field number, for the sake of simplicity. --- internal/gitaly/config/config.go | 10 +++ internal/gitaly/config/config_test.go | 111 +++++++++++++++++--------- proto/cluster.proto | 4 +- proto/go/gitalypb/cluster.pb.go | 26 ++++-- 4 files changed, 105 insertions(+), 46 deletions(-) diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index 48392af76a..ef6b5ed86e 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -1206,6 +1206,9 @@ type Raft struct { // InitialMembers contains the list of initial members of the cluster. It's a map of NodeID to // RaftAddr. Due to limitations of the TOML format, it's not possible to set the map key as a uint64. InitialMembers map[string]string `toml:"initial_members" json:"initial_members"` + // ReplicationFactor defines the number of nodes where data of this storage are replicated, + // including the original storage. + ReplicationFactor uint64 `toml:"replication_factor" json:"replication_factor"` // RTTMilliseconds is the maximum round trip between two nodes in the cluster. It's used to // calculate multiple types of timeouts of Raft protocol. RTTMilliseconds uint64 `toml:"rtt_milliseconds" json:"rtt_milliseconds"` @@ -1225,6 +1228,9 @@ const ( // RaftDefaultHeartbeatTicks is the default heartbeat RTT for the Raft cluster. The estimated election // timeout is DefaultRTT * DefaultHeartbeatTicks. RaftDefaultHeartbeatTicks = 2 + // RaftDefaultReplicationFactor is the default number of nodes where data of this storage are + // replicated. By default, the factor is 3, which means 1 main storage + 2 replicated storages. + RaftDefaultReplicationFactor = 3 ) func (r Raft) fulfillDefaults() Raft { @@ -1237,6 +1243,9 @@ func (r Raft) fulfillDefaults() Raft { if r.HeartbeatTicks == 0 { r.HeartbeatTicks = RaftDefaultHeartbeatTicks } + if r.ReplicationFactor == 0 { + r.ReplicationFactor = RaftDefaultReplicationFactor + } return r } @@ -1250,6 +1259,7 @@ func (r Raft) Validate() error { Append(cfgerror.NotEmpty(r.ClusterID), "cluster_id"). Append(cfgerror.Comparable(r.NodeID).GreaterThan(0), "node_id"). Append(cfgerror.NotEmpty(r.RaftAddr), "raft_addr"). + Append(cfgerror.Comparable(r.ReplicationFactor).GreaterThan(0), "replication_factor"). Append(cfgerror.Comparable(r.RTTMilliseconds).GreaterThan(0), "rtt_millisecond"). Append(cfgerror.Comparable(r.ElectionTicks).GreaterThan(0), "election_rtt"). 
Append(cfgerror.Comparable(r.HeartbeatTicks).GreaterThan(0), "heartbeat_rtt") diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go index b2a8b5179f..5eebff5da7 100644 --- a/internal/gitaly/config/config_test.go +++ b/internal/gitaly/config/config_test.go @@ -2665,9 +2665,10 @@ func TestRaftConfig_Validate(t *testing.T) { "2": "localhost:3002", "3": "localhost:3003", }, - RTTMilliseconds: 200, - ElectionTicks: 20, - HeartbeatTicks: 2, + ReplicationFactor: 5, + RTTMilliseconds: 200, + ElectionTicks: 20, + HeartbeatTicks: 2, }, }, { @@ -2682,9 +2683,10 @@ func TestRaftConfig_Validate(t *testing.T) { "2": "localhost:3002", "3": "localhost:3003", }, - RTTMilliseconds: 200, - ElectionTicks: 20, - HeartbeatTicks: 2, + ReplicationFactor: 5, + RTTMilliseconds: 200, + ElectionTicks: 20, + HeartbeatTicks: 2, }, expectedErr: cfgerror.ValidationErrors{ cfgerror.NewValidationError( @@ -2705,9 +2707,10 @@ func TestRaftConfig_Validate(t *testing.T) { "2": "localhost:3002", "3": "localhost:3003", }, - RTTMilliseconds: 200, - ElectionTicks: 20, - HeartbeatTicks: 2, + ReplicationFactor: 5, + RTTMilliseconds: 200, + ElectionTicks: 20, + HeartbeatTicks: 2, }, expectedErr: cfgerror.ValidationErrors{ cfgerror.NewValidationError( @@ -2728,9 +2731,10 @@ func TestRaftConfig_Validate(t *testing.T) { "2": "localhost:3002", "3": "localhost:3003", }, - RTTMilliseconds: 200, - ElectionTicks: 20, - HeartbeatTicks: 2, + ReplicationFactor: 5, + RTTMilliseconds: 200, + ElectionTicks: 20, + HeartbeatTicks: 2, }, expectedErr: cfgerror.ValidationErrors{ cfgerror.NewValidationError( @@ -2742,14 +2746,15 @@ func TestRaftConfig_Validate(t *testing.T) { { name: "empty initial members", cfg: Raft{ - Enabled: true, - ClusterID: "4f04a0e2-0db8-4bfa-b846-01b5b4a093fb", - NodeID: 1, - RaftAddr: "localhost:3001", - InitialMembers: map[string]string{}, - RTTMilliseconds: 200, - ElectionTicks: 20, - HeartbeatTicks: 2, + Enabled: true, + ClusterID: "4f04a0e2-0db8-4bfa-b846-01b5b4a093fb", + NodeID: 1, + RaftAddr: "localhost:3001", + InitialMembers: map[string]string{}, + ReplicationFactor: 5, + RTTMilliseconds: 200, + ElectionTicks: 20, + HeartbeatTicks: 2, }, expectedErr: cfgerror.ValidationErrors{ cfgerror.NewValidationError( @@ -2771,9 +2776,10 @@ func TestRaftConfig_Validate(t *testing.T) { "3": "localhost:3003", "4": "", }, - RTTMilliseconds: 200, - ElectionTicks: 20, - HeartbeatTicks: 2, + ReplicationFactor: 5, + RTTMilliseconds: 200, + ElectionTicks: 20, + HeartbeatTicks: 2, }, expectedErr: cfgerror.ValidationErrors{ cfgerror.NewValidationError( @@ -2795,9 +2801,10 @@ func TestRaftConfig_Validate(t *testing.T) { "3": "localhost:3003", "4": "1:2:3", }, - RTTMilliseconds: 200, - ElectionTicks: 20, - HeartbeatTicks: 2, + ReplicationFactor: 5, + RTTMilliseconds: 200, + ElectionTicks: 20, + HeartbeatTicks: 2, }, expectedErr: cfgerror.ValidationErrors{ cfgerror.NewValidationError( @@ -2818,9 +2825,10 @@ func TestRaftConfig_Validate(t *testing.T) { "2": "localhost:3002", "3": "localhost:3003", }, - RTTMilliseconds: 0, - ElectionTicks: 20, - HeartbeatTicks: 2, + ReplicationFactor: 5, + RTTMilliseconds: 0, + ElectionTicks: 20, + HeartbeatTicks: 2, }, expectedErr: cfgerror.ValidationErrors{ cfgerror.NewValidationError( @@ -2841,9 +2849,10 @@ func TestRaftConfig_Validate(t *testing.T) { "2": "localhost:3002", "3": "localhost:3003", }, - RTTMilliseconds: 200, - ElectionTicks: 0, - HeartbeatTicks: 2, + ReplicationFactor: 5, + RTTMilliseconds: 200, + ElectionTicks: 0, + HeartbeatTicks: 2, }, expectedErr: 
cfgerror.ValidationErrors{ cfgerror.NewValidationError( @@ -2864,9 +2873,10 @@ func TestRaftConfig_Validate(t *testing.T) { "2": "localhost:3002", "3": "localhost:3003", }, - RTTMilliseconds: 200, - ElectionTicks: 20, - HeartbeatTicks: 0, + ReplicationFactor: 5, + RTTMilliseconds: 200, + ElectionTicks: 20, + HeartbeatTicks: 0, }, expectedErr: cfgerror.ValidationErrors{ cfgerror.NewValidationError( @@ -2875,6 +2885,30 @@ func TestRaftConfig_Validate(t *testing.T) { ), }, }, + { + name: "invalid replication factor", + cfg: Raft{ + Enabled: true, + ClusterID: "4f04a0e2-0db8-4bfa-b846-01b5b4a093fb", + NodeID: 1, + RaftAddr: "localhost:3001", + InitialMembers: map[string]string{ + "1": "localhost:3001", + "2": "localhost:3002", + "3": "localhost:3003", + }, + ReplicationFactor: 0, + RTTMilliseconds: 200, + ElectionTicks: 20, + HeartbeatTicks: 2, + }, + expectedErr: cfgerror.ValidationErrors{ + cfgerror.NewValidationError( + fmt.Errorf("%w: 0 is not greater than 0", cfgerror.ErrNotInRange), + "replication_factor", + ), + }, + }, } { t.Run(tc.name, func(t *testing.T) { err := tc.cfg.Validate() @@ -2905,9 +2939,10 @@ initial_members = {1 = "localhost:4001", 2 = "localhost:4002", 3 = "localhost:40 "2": "localhost:4002", "3": "localhost:4003", }, - RTTMilliseconds: 200, - ElectionTicks: 20, - HeartbeatTicks: 0, + ReplicationFactor: 3, + RTTMilliseconds: 200, + ElectionTicks: 20, + HeartbeatTicks: 0, }, } require.NoError(t, expectedCfg.SetDefaults()) diff --git a/proto/cluster.proto b/proto/cluster.proto index fa954a1f42..69f2a9cb7a 100644 --- a/proto/cluster.proto +++ b/proto/cluster.proto @@ -23,8 +23,10 @@ message Storage { uint64 storage_id = 1; // name is the human-readable name of the storage. string name = 2; + // replication_factor defines the number of nodes where data of this storage are replicated. + uint64 replication_factor = 3; // replica_groups is a list of identifiers for the replica groups associated with this storage. - repeated uint64 replica_groups = 3; + repeated uint64 replica_groups = 4; } // LeaderState represents the current leader state of a Raft group. diff --git a/proto/go/gitalypb/cluster.pb.go b/proto/go/gitalypb/cluster.pb.go index b29434b7be..aae6d33122 100644 --- a/proto/go/gitalypb/cluster.pb.go +++ b/proto/go/gitalypb/cluster.pb.go @@ -100,8 +100,10 @@ type Storage struct { StorageId uint64 `protobuf:"varint,1,opt,name=storage_id,json=storageId,proto3" json:"storage_id,omitempty"` // name is the human-readable name of the storage. Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + // replication_factor defines the number of nodes where data of this storage are replicated. + ReplicationFactor uint64 `protobuf:"varint,3,opt,name=replication_factor,json=replicationFactor,proto3" json:"replication_factor,omitempty"` // replica_groups is a list of identifiers for the replica groups associated with this storage. 
- ReplicaGroups []uint64 `protobuf:"varint,3,rep,packed,name=replica_groups,json=replicaGroups,proto3" json:"replica_groups,omitempty"` + ReplicaGroups []uint64 `protobuf:"varint,4,rep,packed,name=replica_groups,json=replicaGroups,proto3" json:"replica_groups,omitempty"` } func (x *Storage) Reset() { @@ -150,6 +152,13 @@ func (x *Storage) GetName() string { return "" } +func (x *Storage) GetReplicationFactor() uint64 { + if x != nil { + return x.ReplicationFactor + } + return 0 +} + func (x *Storage) GetReplicaGroups() []uint64 { if x != nil { return x.ReplicaGroups @@ -537,12 +546,15 @@ var file_cluster_proto_rawDesc = []byte{ 0x01, 0x28, 0x04, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x25, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, - 0x02, 0x38, 0x01, 0x22, 0x63, 0x0a, 0x07, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1d, - 0x0a, 0x0a, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x09, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x49, 0x64, 0x12, 0x12, 0x0a, - 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, - 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x67, 0x72, 0x6f, - 0x75, 0x70, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x04, 0x52, 0x0d, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x02, 0x38, 0x01, 0x22, 0x92, 0x01, 0x0a, 0x07, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, + 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x04, 0x52, 0x09, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x49, 0x64, 0x12, 0x12, + 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, + 0x6d, 0x65, 0x12, 0x2d, 0x0a, 0x12, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x5f, 0x66, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x11, + 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x61, 0x63, 0x74, 0x6f, + 0x72, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x5f, 0x67, 0x72, 0x6f, + 0x75, 0x70, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x04, 0x52, 0x0d, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x73, 0x22, 0x6f, 0x0a, 0x0b, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x67, 0x72, 0x6f, 0x75, 0x70, -- GitLab From 6be018c4a948a5bd688517ea6ddac1798aa8b6f6 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 12 Jul 2024 17:06:30 +0700 Subject: [PATCH 2/3] raft: Add replication factor to storage registration A prior commit added replication factor to Raft config. This commit attaches that config to the storage registration request. The metadata Raft group persists that config to each member node. It will be used for replica placement in some later commits. 
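For illustration only (not part of this change): a minimal sketch of how a node derives the registration request from its storage-level Raft config. It assumes code compiled inside the Gitaly module, where the internal config package and gitalypb are importable; the storage name and addresses are made up.

    package main

    import (
    	"fmt"

    	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
    	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
    )

    func main() {
    	// Storage-level Raft config. ReplicationFactor is set explicitly here;
    	// fulfillDefaults() would also raise it to RaftDefaultReplicationFactor (3),
    	// i.e. 1 main storage + 2 replicated storages, if it were left at zero.
    	raftCfg := config.Raft{
    		Enabled:   true,
    		ClusterID: "4f04a0e2-0db8-4bfa-b846-01b5b4a093fb",
    		NodeID:    1,
    		RaftAddr:  "localhost:3001",
    		InitialMembers: map[string]string{
    			"1": "localhost:3001",
    			"2": "localhost:3002",
    			"3": "localhost:3003",
    		},
    		ReplicationFactor: config.RaftDefaultReplicationFactor,
    		RTTMilliseconds:   config.RaftDefaultRTT,
    		ElectionTicks:     config.RaftDefaultElectionTicks,
    		HeartbeatTicks:    config.RaftDefaultHeartbeatTicks,
    	}
    	if err := raftCfg.Validate(); err != nil {
    		panic(err) // replication_factor must be greater than zero
    	}

    	// The registration request carries the configured factor so the metadata
    	// Raft group can persist it next to the allocated storage ID.
    	req := &gitalypb.RegisterStorageRequest{
    		StorageName:       "storage-1",
    		ReplicationFactor: raftCfg.ReplicationFactor,
    	}
    	fmt.Println(req.GetStorageName(), req.GetReplicationFactor())
    }
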
--- internal/gitaly/storage/raft/manager_test.go | 14 +++--- .../gitaly/storage/raft/metadata_group.go | 9 ++-- .../storage/raft/metadata_group_test.go | 8 ++-- .../storage/raft/metadata_statemachine.go | 5 +- .../raft/metadata_statemachine_test.go | 48 +++++++++++-------- .../gitaly/storage/raft/testhelper_test.go | 17 +++---- proto/cluster.proto | 2 + proto/go/gitalypb/cluster.pb.go | 33 ++++++++----- 8 files changed, 80 insertions(+), 56 deletions(-) diff --git a/internal/gitaly/storage/raft/manager_test.go b/internal/gitaly/storage/raft/manager_test.go index a7c9aade50..5c88e31a67 100644 --- a/internal/gitaly/storage/raft/manager_test.go +++ b/internal/gitaly/storage/raft/manager_test.go @@ -72,7 +72,7 @@ func TestManager_Start(t *testing.T) { ClusterId: cluster.clusterID, NextStorageId: 2, Storages: map[uint64]*gitalypb.Storage{ - 1: {StorageId: 1, Name: "storage-1"}, + 1: {StorageId: 1, Name: "storage-1", ReplicationFactor: 3}, }, }, clusterInfo) }) @@ -118,7 +118,7 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(numNode+1), clusterInfo.NextStorageId) require.Equal(t, &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), Name: storage.name, + StorageId: storage.id.ToUint64(), Name: storage.name, ReplicationFactor: 3, }, clusterInfo.Storages[storage.id.ToUint64()]) }) }) @@ -155,7 +155,7 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(3), clusterInfo.NextStorageId) require.Equal(t, &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), Name: storage.name, + StorageId: storage.id.ToUint64(), Name: storage.name, ReplicationFactor: 3, }, clusterInfo.Storages[storage.id.ToUint64()]) }) @@ -173,7 +173,7 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(4), clusterInfo.NextStorageId) require.Equal(t, &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), Name: storage.name, + StorageId: storage.id.ToUint64(), Name: storage.name, ReplicationFactor: 3, }, clusterInfo.Storages[storage.id.ToUint64()]) }) }) @@ -254,7 +254,7 @@ func TestManager_Start(t *testing.T) { require.Equal(t, uint64(3), clusterInfo.NextStorageId) require.Equal(t, &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), Name: storage.name, + StorageId: storage.id.ToUint64(), Name: storage.name, ReplicationFactor: 3, }, clusterInfo.Storages[storage.id.ToUint64()]) } }) @@ -287,7 +287,7 @@ func TestManager_Start(t *testing.T) { require.Equal(t, uint64(4), clusterInfo.NextStorageId) require.Equal(t, &gitalypb.Storage{ - StorageId: mgr.firstStorage.id.ToUint64(), Name: mgr.firstStorage.name, + StorageId: mgr.firstStorage.id.ToUint64(), Name: mgr.firstStorage.name, ReplicationFactor: 3, }, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) }) }) @@ -320,7 +320,7 @@ func TestManager_Start(t *testing.T) { require.Equal(t, uint64(4), clusterInfo.NextStorageId) require.Equal(t, &gitalypb.Storage{ - StorageId: mgr.firstStorage.id.ToUint64(), Name: mgr.firstStorage.name, + StorageId: mgr.firstStorage.id.ToUint64(), Name: mgr.firstStorage.name, ReplicationFactor: 3, }, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) }) }) diff --git a/internal/gitaly/storage/raft/metadata_group.go b/internal/gitaly/storage/raft/metadata_group.go index 405a37aaf8..9370482529 100644 --- a/internal/gitaly/storage/raft/metadata_group.go +++ b/internal/gitaly/storage/raft/metadata_group.go @@ -168,7 +168,7 @@ func (g 
*metadataRaftGroup) RegisterStorage(storageName string) (raftID, error) return 0, fmt.Errorf("storage %q already registered", storageName) } } - result, response, err := g.requestRegisterStorage(storageName) + result, response, err := g.requestRegisterStorage(storageName, g.clusterConfig.ReplicationFactor) if err != nil { return 0, fmt.Errorf("registering storage: %w", err) } @@ -236,7 +236,7 @@ func (g *metadataRaftGroup) requestBootstrapCluster() (updateResult, *gitalypb.B return requester.SyncWrite(g.ctx, &gitalypb.BootstrapClusterRequest{ClusterId: g.clusterConfig.ClusterID}) } -func (g *metadataRaftGroup) requestRegisterStorage(storageName string) (updateResult, *gitalypb.RegisterStorageResponse, error) { +func (g *metadataRaftGroup) requestRegisterStorage(storageName string, replicationFactor uint64) (updateResult, *gitalypb.RegisterStorageResponse, error) { requester := NewRequester[*gitalypb.RegisterStorageRequest, *gitalypb.RegisterStorageResponse]( g.nodeHost, g.groupID, g.logger, requestOption{ retry: defaultRetry, @@ -244,7 +244,10 @@ func (g *metadataRaftGroup) requestRegisterStorage(storageName string) (updateRe exponential: g.backoffProfile, }, ) - return requester.SyncWrite(g.ctx, &gitalypb.RegisterStorageRequest{StorageName: storageName}) + return requester.SyncWrite(g.ctx, &gitalypb.RegisterStorageRequest{ + StorageName: storageName, + ReplicationFactor: replicationFactor, + }) } func (g *metadataRaftGroup) getLeaderState() (*gitalypb.LeaderState, error) { diff --git a/internal/gitaly/storage/raft/metadata_group_test.go b/internal/gitaly/storage/raft/metadata_group_test.go index 266983104f..b4989096b6 100644 --- a/internal/gitaly/storage/raft/metadata_group_test.go +++ b/internal/gitaly/storage/raft/metadata_group_test.go @@ -292,9 +292,9 @@ func TestMetadataGroup_RegisterStorage(t *testing.T) { ClusterId: cluster.clusterID, NextStorageId: 4, Storages: map[uint64]*gitalypb.Storage{ - 1: {StorageId: 1, Name: "storage-2"}, - 2: {StorageId: 2, Name: "storage-4"}, - 3: {StorageId: 3, Name: "storage-6"}, + 1: {StorageId: 1, Name: "storage-2", ReplicationFactor: 3}, + 2: {StorageId: 2, Name: "storage-4", ReplicationFactor: 3}, + 3: {StorageId: 3, Name: "storage-6", ReplicationFactor: 3}, }, }, clusterInfo) } @@ -327,7 +327,7 @@ func TestMetadataGroup_RegisterStorage(t *testing.T) { ClusterId: cluster.clusterID, NextStorageId: 2, Storages: map[uint64]*gitalypb.Storage{ - 1: {StorageId: 1, Name: "storage-1"}, + 1: {StorageId: 1, Name: "storage-1", ReplicationFactor: 3}, }, }, clusterInfo) } diff --git a/internal/gitaly/storage/raft/metadata_statemachine.go b/internal/gitaly/storage/raft/metadata_statemachine.go index 2c826e3c90..9e78629d7a 100644 --- a/internal/gitaly/storage/raft/metadata_statemachine.go +++ b/internal/gitaly/storage/raft/metadata_statemachine.go @@ -210,8 +210,9 @@ func (s *metadataStateMachine) handleRegisterStorageRequest(req *gitalypb.Regist } newStorage := &gitalypb.Storage{ - StorageId: cluster.NextStorageId, - Name: req.StorageName, + StorageId: cluster.NextStorageId, + Name: req.GetStorageName(), + ReplicationFactor: req.GetReplicationFactor(), } cluster.Storages[cluster.NextStorageId] = newStorage cluster.NextStorageId++ diff --git a/internal/gitaly/storage/raft/metadata_statemachine_test.go b/internal/gitaly/storage/raft/metadata_statemachine_test.go index 26eaf2a3f0..9d91578783 100644 --- a/internal/gitaly/storage/raft/metadata_statemachine_test.go +++ b/internal/gitaly/storage/raft/metadata_statemachine_test.go @@ -228,8 +228,8 @@ func 
TestMetadataStateMachine_Update(t *testing.T) { requireLastApplied(t, sm, 1) result, err := sm.Update([]statemachine.Entry{ - {Index: 2, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[0].Name})}, - {Index: 3, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[1].Name})}, + {Index: 2, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[0].Name, ReplicationFactor: 3})}, + {Index: 3, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[1].Name, ReplicationFactor: 5})}, }) require.NoError(t, err) require.Equal(t, []statemachine.Entry{ @@ -237,8 +237,9 @@ func TestMetadataStateMachine_Update(t *testing.T) { Value: uint64(resultRegisterStorageSuccessful), Data: wrapSMMessage(t, &gitalypb.RegisterStorageResponse{ Storage: &gitalypb.Storage{ - StorageId: 1, - Name: cfg.Storages[0].Name, + StorageId: 1, + Name: cfg.Storages[0].Name, + ReplicationFactor: 3, }, }), }}, @@ -246,8 +247,9 @@ func TestMetadataStateMachine_Update(t *testing.T) { Value: uint64(resultRegisterStorageSuccessful), Data: wrapSMMessage(t, &gitalypb.RegisterStorageResponse{ Storage: &gitalypb.Storage{ - StorageId: 2, - Name: cfg.Storages[1].Name, + StorageId: 2, + Name: cfg.Storages[1].Name, + ReplicationFactor: 5, }, }), }}, @@ -259,12 +261,14 @@ func TestMetadataStateMachine_Update(t *testing.T) { NextStorageId: 3, Storages: map[uint64]*gitalypb.Storage{ 1: { - StorageId: 1, - Name: cfg.Storages[0].Name, + StorageId: 1, + Name: cfg.Storages[0].Name, + ReplicationFactor: 3, }, 2: { - StorageId: 2, - Name: cfg.Storages[1].Name, + StorageId: 2, + Name: cfg.Storages[1].Name, + ReplicationFactor: 5, }, }, }) @@ -285,8 +289,8 @@ func TestMetadataStateMachine_Update(t *testing.T) { requireLastApplied(t, sm, 1) result, err := sm.Update([]statemachine.Entry{ - {Index: 2, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[0].Name})}, - {Index: 3, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[0].Name})}, + {Index: 2, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[0].Name, ReplicationFactor: 3})}, + {Index: 3, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[0].Name, ReplicationFactor: 5})}, }) require.NoError(t, err) require.Equal(t, []statemachine.Entry{ @@ -294,8 +298,9 @@ func TestMetadataStateMachine_Update(t *testing.T) { Value: uint64(resultRegisterStorageSuccessful), Data: wrapSMMessage(t, &gitalypb.RegisterStorageResponse{ Storage: &gitalypb.Storage{ - StorageId: 1, - Name: cfg.Storages[0].Name, + StorageId: 1, + Name: cfg.Storages[0].Name, + ReplicationFactor: 3, }, }), }}, @@ -310,8 +315,9 @@ func TestMetadataStateMachine_Update(t *testing.T) { NextStorageId: 2, Storages: map[uint64]*gitalypb.Storage{ 1: { - StorageId: 1, - Name: cfg.Storages[0].Name, + StorageId: 1, + Name: cfg.Storages[0].Name, + ReplicationFactor: 3, }, }, }) @@ -329,7 +335,7 @@ func TestMetadataStateMachine_Update(t *testing.T) { require.NoError(t, err) result, err := sm.Update([]statemachine.Entry{ - {Index: 1, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[0].Name})}, + {Index: 1, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[0].Name, ReplicationFactor: 3})}, }) require.NoError(t, err) require.Equal(t, []statemachine.Entry{ @@ -459,8 +465,8 @@ func TestMetadataStateMachine_Lookup(t *testing.T) { bootstrapCluster(t, sm) _, err = 
sm.Update([]statemachine.Entry{ - {Index: 2, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[0].Name})}, - {Index: 3, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[1].Name})}, + {Index: 2, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[0].Name, ReplicationFactor: 3})}, + {Index: 3, Cmd: wrapSMMessage(t, &gitalypb.RegisterStorageRequest{StorageName: cfg.Storages[1].Name, ReplicationFactor: 5})}, }) require.NoError(t, err) @@ -471,8 +477,8 @@ func TestMetadataStateMachine_Lookup(t *testing.T) { ClusterId: "1234", NextStorageId: 3, Storages: map[uint64]*gitalypb.Storage{ - 1: {StorageId: 1, Name: cfg.Storages[0].Name}, - 2: {StorageId: 2, Name: cfg.Storages[1].Name}, + 1: {StorageId: 1, Name: cfg.Storages[0].Name, ReplicationFactor: 3}, + 2: {StorageId: 2, Name: cfg.Storages[1].Name, ReplicationFactor: 5}, }, }}, response) }) diff --git a/internal/gitaly/storage/raft/testhelper_test.go b/internal/gitaly/storage/raft/testhelper_test.go index 36b559a7c9..455d2bb5d5 100644 --- a/internal/gitaly/storage/raft/testhelper_test.go +++ b/internal/gitaly/storage/raft/testhelper_test.go @@ -218,14 +218,15 @@ func (c *testRaftCluster) createRaftConfig(node raftID) config.Raft { initialMembers[fmt.Sprintf("%d", node)] = addr } return config.Raft{ - Enabled: true, - ClusterID: c.clusterID, - NodeID: node.ToUint64(), - RaftAddr: c.initialMembers[node.ToUint64()], - InitialMembers: initialMembers, - RTTMilliseconds: config.RaftDefaultRTT, - ElectionTicks: config.RaftDefaultElectionTicks, - HeartbeatTicks: config.RaftDefaultHeartbeatTicks, + Enabled: true, + ClusterID: c.clusterID, + NodeID: node.ToUint64(), + RaftAddr: c.initialMembers[node.ToUint64()], + InitialMembers: initialMembers, + ReplicationFactor: 3, + RTTMilliseconds: config.RaftDefaultRTT, + ElectionTicks: config.RaftDefaultElectionTicks, + HeartbeatTicks: config.RaftDefaultHeartbeatTicks, } } diff --git a/proto/cluster.proto b/proto/cluster.proto index 69f2a9cb7a..a2a63150a9 100644 --- a/proto/cluster.proto +++ b/proto/cluster.proto @@ -69,6 +69,8 @@ message GetClusterResponse{ message RegisterStorageRequest { // storage_name is the human-readable name of the new storage. string storage_name = 1; + // replication_factor contains the replication factor of this storage. + uint64 replication_factor = 2; } // RegisterStorageResponse is the response message for registering a new storage in a cluster. diff --git a/proto/go/gitalypb/cluster.pb.go b/proto/go/gitalypb/cluster.pb.go index aae6d33122..f02523c494 100644 --- a/proto/go/gitalypb/cluster.pb.go +++ b/proto/go/gitalypb/cluster.pb.go @@ -438,6 +438,8 @@ type RegisterStorageRequest struct { // storage_name is the human-readable name of the new storage. StorageName string `protobuf:"bytes,1,opt,name=storage_name,json=storageName,proto3" json:"storage_name,omitempty"` + // replication_factor contains the replication factor of this storage. + ReplicationFactor uint64 `protobuf:"varint,2,opt,name=replication_factor,json=replicationFactor,proto3" json:"replication_factor,omitempty"` } func (x *RegisterStorageRequest) Reset() { @@ -479,6 +481,13 @@ func (x *RegisterStorageRequest) GetStorageName() string { return "" } +func (x *RegisterStorageRequest) GetReplicationFactor() uint64 { + if x != nil { + return x.ReplicationFactor + } + return 0 +} + // RegisterStorageResponse is the response message for registering a new storage in a cluster. 
type RegisterStorageResponse struct { state protoimpl.MessageState @@ -576,19 +585,21 @@ var file_cluster_proto_rawDesc = []byte{ 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x07, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x52, 0x07, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x22, 0x3b, 0x0a, 0x16, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, 0x6f, 0x72, + 0x22, 0x6a, 0x0a, 0x16, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x0b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x44, 0x0a, - 0x17, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, - 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, - 0x61, 0x67, 0x65, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, - 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x52, 0x0b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x2d, 0x0a, + 0x12, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x66, 0x61, 0x63, + 0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x11, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x46, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x22, 0x44, 0x0a, 0x17, + 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, + 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, + 0x67, 0x65, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( -- GitLab From 8be9ed3866cdc42255d7ee7c7436451e2577d535 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 16 Jul 2024 14:04:03 +0700 Subject: [PATCH 3/3] raft: Persist full storage info after registration Recently, after registration, only the storage ID is persisted in the storage's metadata DB. The metadata Raft group stores more information than just the storage ID such as replication_factor. As a result, we need to persist full storage information in the storage's metadata DB. This persisted info will be used to compare with the upstream info so that the storage is able to apply any changes. 
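As a rough sketch of the new persistence format (not part of this change): the whole gitalypb.Storage message is marshaled and stored under a single key, so later fields such as replication_factor travel with the storage ID. The keyvalue store is stubbed with an in-memory map here, since the badger-backed transaction is not needed to show the encoding.

    package main

    import (
    	"fmt"

    	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
    	"google.golang.org/protobuf/proto"
    )

    func main() {
    	// Full storage info, not just the bare ID.
    	info := &gitalypb.Storage{
    		StorageId:         1,
    		Name:              "storage-1",
    		ReplicationFactor: 3,
    	}

    	marshaled, err := proto.Marshal(info)
    	if err != nil {
    		panic(err)
    	}

    	// Stand-in for txn.Set([]byte("storage"), marshaled) in the keyvalue store.
    	db := map[string][]byte{"storage": marshaled}

    	// Loading reverses the encoding, as loadStorageInfo does.
    	var loaded gitalypb.Storage
    	if err := proto.Unmarshal(db["storage"], &loaded); err != nil {
    		panic(err)
    	}
    	fmt.Println(loaded.GetStorageId(), loaded.GetName(), loaded.GetReplicationFactor())
    }
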
--- internal/gitaly/storage/raft/manager.go | 16 ++++-- internal/gitaly/storage/raft/manager_test.go | 57 ++++++++++++++----- .../gitaly/storage/raft/metadata_group.go | 16 +++--- .../storage/raft/metadata_group_test.go | 16 ++++-- internal/gitaly/storage/raft/storage.go | 48 ++++++++++------ 5 files changed, 106 insertions(+), 47 deletions(-) diff --git a/internal/gitaly/storage/raft/manager.go b/internal/gitaly/storage/raft/manager.go index fff76b445e..10513d2348 100644 --- a/internal/gitaly/storage/raft/manager.go +++ b/internal/gitaly/storage/raft/manager.go @@ -245,19 +245,23 @@ func (m *Manager) Start() (returnedErr error) { // Register storage ID if not exist. Similarly, this operation is handled by the metadata group. // It will be handled by the metadata authority in the future. for storageName, storageMgr := range m.storageManagers { - if err := storageMgr.loadStorageID(m.ctx); err != nil { - return fmt.Errorf("loading storage ID: %w", err) + if err := storageMgr.loadStorageInfo(m.ctx); err != nil { + return fmt.Errorf("loading persisted storage info: %w", err) } - if storageMgr.id == 0 { - id, err := m.metadataGroup.RegisterStorage(storageName) + if storageMgr.persistedInfo == nil || storageMgr.persistedInfo.GetStorageId() == 0 { + storageInfo, err := m.metadataGroup.RegisterStorage(storageName) if err != nil { return fmt.Errorf("registering storage ID: %w", err) } - if err := storageMgr.saveStorageID(m.ctx, id); err != nil { + if err := storageMgr.saveStorageInfo(m.ctx, storageInfo); err != nil { return fmt.Errorf("saving storage ID: %w", err) } } - m.logger.WithFields(log.Fields{"storage_name": storageName, "storage_id": storageMgr.id}).Info("storage joined the cluster") + m.logger.WithFields(log.Fields{ + "storage_name": storageName, + "storage_id": storageMgr.persistedInfo.GetStorageId(), + "replication_factor": storageMgr.persistedInfo.GetReplicationFactor(), + }).Info("storage joined the cluster") } m.logger.Info("Raft cluster has started") diff --git a/internal/gitaly/storage/raft/manager_test.go b/internal/gitaly/storage/raft/manager_test.go index 5c88e31a67..8d252b1e70 100644 --- a/internal/gitaly/storage/raft/manager_test.go +++ b/internal/gitaly/storage/raft/manager_test.go @@ -46,7 +46,7 @@ func TestManager_Start(t *testing.T) { resetManager := func(t *testing.T, m *Manager) { m.metadataGroup = nil for _, storageMgr := range m.storageManagers { - storageMgr.clearStorageID() + storageMgr.clearStorageInfo() storageMgr.nodeHost.Close() nodeHost, err := dragonboat.NewNodeHost(storageMgr.nodeHost.NodeHostConfig()) require.NoError(t, err) @@ -99,6 +99,11 @@ func TestManager_Start(t *testing.T) { fanOut(numNode, func(node raftID) { require.NoError(t, cluster.nodes[node].manager.Start()) + + storage := cluster.nodes[node].manager.firstStorage + require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.name, storage.persistedInfo.Name) + require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) }) var expectedIDs, allocatedIDs []raftID @@ -117,9 +122,10 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(numNode+1), clusterInfo.NextStorageId) - require.Equal(t, &gitalypb.Storage{ + expectedInfo := &gitalypb.Storage{ StorageId: storage.id.ToUint64(), Name: storage.name, ReplicationFactor: 3, - }, clusterInfo.Storages[storage.id.ToUint64()]) + } + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) }) }) }(numNode) @@ 
-137,6 +143,11 @@ func TestManager_Start(t *testing.T) { fanOut(2, func(node raftID) { require.NoError(t, cluster.nodes[node].manager.Start()) require.Equal(t, true, cluster.nodes[node].manager.Ready()) + + storage := cluster.nodes[node].manager.firstStorage + require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.name, storage.persistedInfo.Name) + require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) }) // The quorum is reached @@ -154,9 +165,10 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(3), clusterInfo.NextStorageId) - require.Equal(t, &gitalypb.Storage{ + expectedInfo := &gitalypb.Storage{ StorageId: storage.id.ToUint64(), Name: storage.name, ReplicationFactor: 3, - }, clusterInfo.Storages[storage.id.ToUint64()]) + } + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) }) // Now the third node joins. It does not matter whether the third node bootstraps the cluster. @@ -172,9 +184,10 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(4), clusterInfo.NextStorageId) - require.Equal(t, &gitalypb.Storage{ + expectedInfo := &gitalypb.Storage{ StorageId: storage.id.ToUint64(), Name: storage.name, ReplicationFactor: 3, - }, clusterInfo.Storages[storage.id.ToUint64()]) + } + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) }) }) }(bootstrap) @@ -226,6 +239,11 @@ func TestManager_Start(t *testing.T) { require.EqualError(t, cluster.nodes[node].manager.Start(), "registering storage ID: storage \"storage-2\" already registered") } else { require.NoError(t, cluster.nodes[node].manager.Start()) + + storage := cluster.nodes[node].manager.firstStorage + require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.name, storage.persistedInfo.Name) + require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) } if node != duplicatedNode { @@ -253,9 +271,10 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(3), clusterInfo.NextStorageId) - require.Equal(t, &gitalypb.Storage{ + expectedInfo := &gitalypb.Storage{ StorageId: storage.id.ToUint64(), Name: storage.name, ReplicationFactor: 3, - }, clusterInfo.Storages[storage.id.ToUint64()]) + } + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) } }) }) @@ -268,6 +287,11 @@ func TestManager_Start(t *testing.T) { fanOut(3, func(node raftID) { require.NoError(t, cluster.nodes[node].manager.Start()) + + storage := cluster.nodes[node].manager.firstStorage + require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.name, storage.persistedInfo.Name) + require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) }) for _, node := range cluster.nodes { @@ -286,9 +310,10 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(4), clusterInfo.NextStorageId) - require.Equal(t, &gitalypb.Storage{ + expectedInfo := &gitalypb.Storage{ StorageId: mgr.firstStorage.id.ToUint64(), Name: mgr.firstStorage.name, ReplicationFactor: 3, - }, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) + } + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) }) }) @@ -300,6 +325,11 @@ func 
TestManager_Start(t *testing.T) { fanOut(3, func(node raftID) { require.NoError(t, cluster.nodes[node].manager.Start()) + + storage := cluster.nodes[node].manager.firstStorage + require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.name, storage.persistedInfo.Name) + require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) }) for _, node := range cluster.nodes { @@ -319,9 +349,10 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(4), clusterInfo.NextStorageId) - require.Equal(t, &gitalypb.Storage{ + expectedInfo := &gitalypb.Storage{ StorageId: mgr.firstStorage.id.ToUint64(), Name: mgr.firstStorage.name, ReplicationFactor: 3, - }, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) + } + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) }) }) diff --git a/internal/gitaly/storage/raft/metadata_group.go b/internal/gitaly/storage/raft/metadata_group.go index 9370482529..61af0f91e7 100644 --- a/internal/gitaly/storage/raft/metadata_group.go +++ b/internal/gitaly/storage/raft/metadata_group.go @@ -157,35 +157,35 @@ func (g *metadataRaftGroup) tryBootstrap() (*gitalypb.Cluster, error) { // RegisterStorage requests the metadata group to allocate a unique ID for a new storage. The caller // is expected to persist the newly allocated ID. This ID is used for future interactions with the // Raft cluster. The storage name must be unique cluster-wide. -func (g *metadataRaftGroup) RegisterStorage(storageName string) (raftID, error) { +func (g *metadataRaftGroup) RegisterStorage(storageName string) (*gitalypb.Storage, error) { storageName = strings.TrimSpace(storageName) cluster, err := g.ClusterInfo() if err != nil { - return 0, err + return nil, err } for _, storage := range cluster.Storages { if storage.GetName() == storageName { - return 0, fmt.Errorf("storage %q already registered", storageName) + return nil, fmt.Errorf("storage %q already registered", storageName) } } result, response, err := g.requestRegisterStorage(storageName, g.clusterConfig.ReplicationFactor) if err != nil { - return 0, fmt.Errorf("registering storage: %w", err) + return nil, fmt.Errorf("registering storage: %w", err) } switch result { case resultRegisterStorageSuccessful: - return raftID(response.GetStorage().GetStorageId()), nil + return response.GetStorage(), nil case resultStorageAlreadyRegistered: // There's a chance that storage is registered by another node while firing this request. We // have no choice but reject this request. - return 0, fmt.Errorf("storage %q already registered", storageName) + return nil, fmt.Errorf("storage %q already registered", storageName) case resultRegisterStorageClusterNotBootstrappedYet: // Extremely rare occasion. This case occurs when the cluster information is wiped out of // the metadata group when the register storage request is in-flight. 
- return 0, fmt.Errorf("cluster has not been bootstrapped") + return nil, fmt.Errorf("cluster has not been bootstrapped") default: - return 0, fmt.Errorf("unsupported update result: %d", result) + return nil, fmt.Errorf("unsupported update result: %d", result) } } diff --git a/internal/gitaly/storage/raft/metadata_group_test.go b/internal/gitaly/storage/raft/metadata_group_test.go index b4989096b6..04bb74bcda 100644 --- a/internal/gitaly/storage/raft/metadata_group_test.go +++ b/internal/gitaly/storage/raft/metadata_group_test.go @@ -280,9 +280,13 @@ func TestMetadataGroup_RegisterStorage(t *testing.T) { groups := bootstrapCluster(t, cluster, ptnMgr) for i := raftID(1); i <= 3; i++ { - id, err := groups[i].RegisterStorage(fmt.Sprintf("storage-%d", 2*i)) + info, err := groups[i].RegisterStorage(fmt.Sprintf("storage-%d", 2*i)) require.NoError(t, err) - require.Equal(t, i, id) + require.Equal(t, &gitalypb.Storage{ + StorageId: uint64(i), + Name: fmt.Sprintf("storage-%d", 2*i), + ReplicationFactor: 3, + }, info) } for i := raftID(1); i <= 3; i++ { @@ -310,9 +314,13 @@ func TestMetadataGroup_RegisterStorage(t *testing.T) { ptnMgr := setupTestPartitionManager(t, cfg) groups := bootstrapCluster(t, cluster, ptnMgr) - id, err := groups[1].RegisterStorage("storage-1") + info, err := groups[1].RegisterStorage("storage-1") require.NoError(t, err) - require.Equal(t, raftID(1), id) + require.Equal(t, &gitalypb.Storage{ + StorageId: 1, + Name: "storage-1", + ReplicationFactor: 3, + }, info) _, err = groups[2].RegisterStorage("storage-1") require.EqualError(t, err, "storage \"storage-1\" already registered") diff --git a/internal/gitaly/storage/raft/storage.go b/internal/gitaly/storage/raft/storage.go index e6acefdb15..ed0f1f1a79 100644 --- a/internal/gitaly/storage/raft/storage.go +++ b/internal/gitaly/storage/raft/storage.go @@ -9,6 +9,8 @@ import ( "github.com/lni/dragonboat/v4" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/protobuf/proto" ) type dbAccessor func(context.Context, bool, func(keyvalue.ReadWriter) error) error @@ -17,10 +19,11 @@ type dbAccessor func(context.Context, bool, func(keyvalue.ReadWriter) error) err // keyvalue.Transactioner for each Raft group, allowing the Raft groups to store their data in the // underlying keyvalue store. type storageManager struct { - id raftID - name string - ptnMgr *storagemgr.PartitionManager - nodeHost *dragonboat.NodeHost + id raftID + name string + ptnMgr *storagemgr.PartitionManager + nodeHost *dragonboat.NodeHost + persistedInfo *gitalypb.Storage } // newStorageManager returns an instance of storage manager. @@ -35,10 +38,10 @@ func newStorageManager(name string, ptnMgr *storagemgr.PartitionManager, nodeHos // Close closes the storage manager. 
func (m *storageManager) Close() { m.nodeHost.Close() } -func (m *storageManager) loadStorageID(ctx context.Context) error { +func (m *storageManager) loadStorageInfo(ctx context.Context) error { db := m.dbForStorage() - return db(ctx, true, func(txn keyvalue.ReadWriter) error { - item, err := txn.Get([]byte("storage_id")) + return db(ctx, false, func(txn keyvalue.ReadWriter) error { + item, err := txn.Get([]byte("storage")) if err != nil { if errors.Is(err, badger.ErrKeyNotFound) { return nil @@ -46,32 +49,45 @@ func (m *storageManager) loadStorageID(ctx context.Context) error { return err } return item.Value(func(value []byte) error { - m.id.UnmarshalBinary(value) + var persistedInfo gitalypb.Storage + if err := proto.Unmarshal(value, &persistedInfo); err != nil { + return err + } + m.persistedInfo = &persistedInfo + m.id = raftID(m.persistedInfo.StorageId) return nil }) }) } -func (m *storageManager) saveStorageID(ctx context.Context, id raftID) error { +func (m *storageManager) saveStorageInfo(ctx context.Context, storage *gitalypb.Storage) error { db := m.dbForStorage() return db(ctx, false, func(txn keyvalue.ReadWriter) error { - _, err := txn.Get([]byte("storage_id")) + _, err := txn.Get([]byte("storage")) if err == nil { - return fmt.Errorf("storage ID already exists") + return fmt.Errorf("storage already exists") } else if !errors.Is(err, badger.ErrKeyNotFound) { return err } - if err := txn.Set([]byte("storage_id"), id.MarshalBinary()); err != nil { + marshaled, err := proto.Marshal(storage) + if err != nil { + return err + } + if err := txn.Set([]byte("storage"), marshaled); err != nil { return err } - m.id = id + m.persistedInfo = storage + m.id = raftID(m.persistedInfo.StorageId) return nil }) } -// clearStorageID clears the storage ID inside the in-memory storage of the storage manager. It does -// not clean the underlying storage ID. -func (m *storageManager) clearStorageID() { m.id = 0 } +// clearStorageInfo clears the storage info inside the in-memory storage of the storage manager. It +// does not clean the persisted info the DB. +func (m *storageManager) clearStorageInfo() { + m.id = 0 + m.persistedInfo = nil +} func (m *storageManager) dbForStorage() dbAccessor { return func(ctx context.Context, readOnly bool, fn func(keyvalue.ReadWriter) error) error { -- GitLab
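
Editorial note: the series stops at persisting the storage info; comparing it against the upstream record is deferred to later commits. A hypothetical sketch of what such a check could look like — none of this is in the patches above, and the diverged helper is made up for illustration.

    package main

    import (
    	"fmt"

    	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
    	"google.golang.org/protobuf/proto"
    )

    // diverged reports whether the upstream record held by the metadata group
    // differs from what this node persisted locally. Hypothetical helper, not
    // part of the series.
    func diverged(persisted, upstream *gitalypb.Storage) bool {
    	return !proto.Equal(persisted, upstream)
    }

    func main() {
    	persisted := &gitalypb.Storage{StorageId: 1, Name: "storage-1", ReplicationFactor: 3}
    	upstream := &gitalypb.Storage{StorageId: 1, Name: "storage-1", ReplicationFactor: 5}

    	if diverged(persisted, upstream) {
    		fmt.Println("storage info changed upstream; apply the new settings locally")
    	}
    }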