From 5090b56506d760f4d03e477b107c98553971a21f Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 23 Jul 2024 13:59:25 +0700 Subject: [PATCH 01/14] raft: Add protobuf definitions related to Partition This commit adds some more definitions to Cluster protobuf. Those definitions relate to Partition and its corresponding operations such as RegisterPartition and GetRegisteredPartitions. --- proto/cluster.proto | 40 +++ proto/go/gitalypb/cluster.pb.go | 499 +++++++++++++++++++++++++++----- 2 files changed, 471 insertions(+), 68 deletions(-) diff --git a/proto/cluster.proto b/proto/cluster.proto index c7ad388960..6315b54498 100644 --- a/proto/cluster.proto +++ b/proto/cluster.proto @@ -17,6 +17,20 @@ message Cluster { map storages = 3; } +// Partition represents a partition in a Raft cluster. +message Partition { + // authority_name is the name of storage that creates this partition. + string authority_name = 1; + // authority_id is the unique ID of storage that creates this partition. The ID was allocated by + // the metadata Raft group. + uint64 authority_id = 2; + // partition_id is the partition ID of a partition *within the authority storage*. All + // repositories sharing a same origin have the same partition ID. + uint64 partition_id = 3; + // relative_path is the unique relative path of that partition. + string relative_path = 4; +} + // Storage represents a storage unit within a cluster. message Storage { // storage_id is the unique identifier for the storage. @@ -99,4 +113,30 @@ message UpdateStorageRequest { message UpdateStorageResponse { // storage contains the details of the newly updated storage. Storage storage = 1; +} + +// RegisterPartitionRequest is the request message to add a partition (repository + forks) to a +// storage for tracking. The destinating storage is not necessarily the same as the authority +// storage. For example, a partition is tracked by another storage on different node. +message RegisterPartitionRequest { + // partition is the registerting one. + Partition partition = 1; +} + +// RegisterPartitionResponse is the response message of RegisterPartitionRequest. +message RegisterPartitionResponse { + // partition contains info of registered partition. If the registering partition already exists, + // it contains the partition info inside the statemachine. + Partition partition = 1; +} + +// GetRegisteredPartitionsRequest is the request message to query for all registered partitions +// that a storage is tracking. +message GetRegisteredPartitionsRequest { +} + +// GetRegisteredPartitionsResponse is the response message of GetRegisteredPartitionsRequest. +message GetRegisteredPartitionsResponse { + // partitions is the list of registered partitions. + repeated Partition partitions = 1; } \ No newline at end of file diff --git a/proto/go/gitalypb/cluster.pb.go b/proto/go/gitalypb/cluster.pb.go index 399e251a9d..5e8ab97408 100644 --- a/proto/go/gitalypb/cluster.pb.go +++ b/proto/go/gitalypb/cluster.pb.go @@ -90,6 +90,84 @@ func (x *Cluster) GetStorages() map[uint64]*Storage { return nil } +// Partition represents a partition in a Raft cluster. +type Partition struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // authority_name is the name of storage that creates this partition. + AuthorityName string `protobuf:"bytes,1,opt,name=authority_name,json=authorityName,proto3" json:"authority_name,omitempty"` + // authority_id is the unique ID of storage that creates this partition. The ID was allocated by + // the metadata Raft group. + AuthorityId uint64 `protobuf:"varint,2,opt,name=authority_id,json=authorityId,proto3" json:"authority_id,omitempty"` + // partition_id is the partition ID of a partition *within the authority storage*. All + // repositories sharing a same origin have the same partition ID. + PartitionId uint64 `protobuf:"varint,3,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"` + // relative_path is the unique relative path of that partition. + RelativePath string `protobuf:"bytes,4,opt,name=relative_path,json=relativePath,proto3" json:"relative_path,omitempty"` +} + +func (x *Partition) Reset() { + *x = Partition{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Partition) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Partition) ProtoMessage() {} + +func (x *Partition) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Partition.ProtoReflect.Descriptor instead. +func (*Partition) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{1} +} + +func (x *Partition) GetAuthorityName() string { + if x != nil { + return x.AuthorityName + } + return "" +} + +func (x *Partition) GetAuthorityId() uint64 { + if x != nil { + return x.AuthorityId + } + return 0 +} + +func (x *Partition) GetPartitionId() uint64 { + if x != nil { + return x.PartitionId + } + return 0 +} + +func (x *Partition) GetRelativePath() string { + if x != nil { + return x.RelativePath + } + return "" +} + // Storage represents a storage unit within a cluster. type Storage struct { state protoimpl.MessageState @@ -111,7 +189,7 @@ type Storage struct { func (x *Storage) Reset() { *x = Storage{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[1] + mi := &file_cluster_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -124,7 +202,7 @@ func (x *Storage) String() string { func (*Storage) ProtoMessage() {} func (x *Storage) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[1] + mi := &file_cluster_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -137,7 +215,7 @@ func (x *Storage) ProtoReflect() protoreflect.Message { // Deprecated: Use Storage.ProtoReflect.Descriptor instead. func (*Storage) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{1} + return file_cluster_proto_rawDescGZIP(), []int{2} } func (x *Storage) GetStorageId() uint64 { @@ -196,7 +274,7 @@ type LeaderState struct { func (x *LeaderState) Reset() { *x = LeaderState{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[2] + mi := &file_cluster_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -209,7 +287,7 @@ func (x *LeaderState) String() string { func (*LeaderState) ProtoMessage() {} func (x *LeaderState) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[2] + mi := &file_cluster_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -222,7 +300,7 @@ func (x *LeaderState) ProtoReflect() protoreflect.Message { // Deprecated: Use LeaderState.ProtoReflect.Descriptor instead. func (*LeaderState) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{2} + return file_cluster_proto_rawDescGZIP(), []int{3} } func (x *LeaderState) GetGroupId() uint64 { @@ -266,7 +344,7 @@ type BootstrapClusterRequest struct { func (x *BootstrapClusterRequest) Reset() { *x = BootstrapClusterRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[3] + mi := &file_cluster_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -279,7 +357,7 @@ func (x *BootstrapClusterRequest) String() string { func (*BootstrapClusterRequest) ProtoMessage() {} func (x *BootstrapClusterRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[3] + mi := &file_cluster_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -292,7 +370,7 @@ func (x *BootstrapClusterRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use BootstrapClusterRequest.ProtoReflect.Descriptor instead. func (*BootstrapClusterRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{3} + return file_cluster_proto_rawDescGZIP(), []int{4} } func (x *BootstrapClusterRequest) GetClusterId() string { @@ -315,7 +393,7 @@ type BootstrapClusterResponse struct { func (x *BootstrapClusterResponse) Reset() { *x = BootstrapClusterResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[4] + mi := &file_cluster_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -328,7 +406,7 @@ func (x *BootstrapClusterResponse) String() string { func (*BootstrapClusterResponse) ProtoMessage() {} func (x *BootstrapClusterResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[4] + mi := &file_cluster_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -341,7 +419,7 @@ func (x *BootstrapClusterResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use BootstrapClusterResponse.ProtoReflect.Descriptor instead. func (*BootstrapClusterResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{4} + return file_cluster_proto_rawDescGZIP(), []int{5} } func (x *BootstrapClusterResponse) GetCluster() *Cluster { @@ -361,7 +439,7 @@ type GetClusterRequest struct { func (x *GetClusterRequest) Reset() { *x = GetClusterRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[5] + mi := &file_cluster_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -374,7 +452,7 @@ func (x *GetClusterRequest) String() string { func (*GetClusterRequest) ProtoMessage() {} func (x *GetClusterRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[5] + mi := &file_cluster_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -387,7 +465,7 @@ func (x *GetClusterRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use GetClusterRequest.ProtoReflect.Descriptor instead. func (*GetClusterRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{5} + return file_cluster_proto_rawDescGZIP(), []int{6} } // GetClusterResponse is the response message for retrieving information about a cluster. @@ -403,7 +481,7 @@ type GetClusterResponse struct { func (x *GetClusterResponse) Reset() { *x = GetClusterResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[6] + mi := &file_cluster_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -416,7 +494,7 @@ func (x *GetClusterResponse) String() string { func (*GetClusterResponse) ProtoMessage() {} func (x *GetClusterResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[6] + mi := &file_cluster_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -429,7 +507,7 @@ func (x *GetClusterResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use GetClusterResponse.ProtoReflect.Descriptor instead. func (*GetClusterResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{6} + return file_cluster_proto_rawDescGZIP(), []int{7} } func (x *GetClusterResponse) GetCluster() *Cluster { @@ -456,7 +534,7 @@ type RegisterStorageRequest struct { func (x *RegisterStorageRequest) Reset() { *x = RegisterStorageRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[7] + mi := &file_cluster_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -469,7 +547,7 @@ func (x *RegisterStorageRequest) String() string { func (*RegisterStorageRequest) ProtoMessage() {} func (x *RegisterStorageRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[7] + mi := &file_cluster_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -482,7 +560,7 @@ func (x *RegisterStorageRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterStorageRequest.ProtoReflect.Descriptor instead. func (*RegisterStorageRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{7} + return file_cluster_proto_rawDescGZIP(), []int{8} } func (x *RegisterStorageRequest) GetStorageName() string { @@ -519,7 +597,7 @@ type RegisterStorageResponse struct { func (x *RegisterStorageResponse) Reset() { *x = RegisterStorageResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[8] + mi := &file_cluster_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -532,7 +610,7 @@ func (x *RegisterStorageResponse) String() string { func (*RegisterStorageResponse) ProtoMessage() {} func (x *RegisterStorageResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[8] + mi := &file_cluster_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -545,7 +623,7 @@ func (x *RegisterStorageResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterStorageResponse.ProtoReflect.Descriptor instead. func (*RegisterStorageResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{8} + return file_cluster_proto_rawDescGZIP(), []int{9} } func (x *RegisterStorageResponse) GetStorage() *Storage { @@ -574,7 +652,7 @@ type UpdateStorageRequest struct { func (x *UpdateStorageRequest) Reset() { *x = UpdateStorageRequest{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[9] + mi := &file_cluster_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -587,7 +665,7 @@ func (x *UpdateStorageRequest) String() string { func (*UpdateStorageRequest) ProtoMessage() {} func (x *UpdateStorageRequest) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[9] + mi := &file_cluster_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -600,7 +678,7 @@ func (x *UpdateStorageRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use UpdateStorageRequest.ProtoReflect.Descriptor instead. func (*UpdateStorageRequest) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{9} + return file_cluster_proto_rawDescGZIP(), []int{10} } func (x *UpdateStorageRequest) GetStorageId() uint64 { @@ -637,7 +715,7 @@ type UpdateStorageResponse struct { func (x *UpdateStorageResponse) Reset() { *x = UpdateStorageResponse{} if protoimpl.UnsafeEnabled { - mi := &file_cluster_proto_msgTypes[10] + mi := &file_cluster_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -650,7 +728,7 @@ func (x *UpdateStorageResponse) String() string { func (*UpdateStorageResponse) ProtoMessage() {} func (x *UpdateStorageResponse) ProtoReflect() protoreflect.Message { - mi := &file_cluster_proto_msgTypes[10] + mi := &file_cluster_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -663,7 +741,7 @@ func (x *UpdateStorageResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use UpdateStorageResponse.ProtoReflect.Descriptor instead. func (*UpdateStorageResponse) Descriptor() ([]byte, []int) { - return file_cluster_proto_rawDescGZIP(), []int{10} + return file_cluster_proto_rawDescGZIP(), []int{11} } func (x *UpdateStorageResponse) GetStorage() *Storage { @@ -673,6 +751,196 @@ func (x *UpdateStorageResponse) GetStorage() *Storage { return nil } +// RegisterPartitionRequest is the request message to add a partition (repository + forks) to a +// storage for tracking. The destinating storage is not necessarily the same as the authority +// storage. For example, a partition is tracked by another storage on different node. +type RegisterPartitionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // partition is the registerting one. + Partition *Partition `protobuf:"bytes,1,opt,name=partition,proto3" json:"partition,omitempty"` +} + +func (x *RegisterPartitionRequest) Reset() { + *x = RegisterPartitionRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterPartitionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterPartitionRequest) ProtoMessage() {} + +func (x *RegisterPartitionRequest) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[12] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterPartitionRequest.ProtoReflect.Descriptor instead. +func (*RegisterPartitionRequest) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{12} +} + +func (x *RegisterPartitionRequest) GetPartition() *Partition { + if x != nil { + return x.Partition + } + return nil +} + +// RegisterPartitionResponse is the response message of RegisterPartitionRequest. +type RegisterPartitionResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // partition contains info of registered partition. If the registering partition already exists, + // it contains the partition info inside the statemachine. + Partition *Partition `protobuf:"bytes,1,opt,name=partition,proto3" json:"partition,omitempty"` +} + +func (x *RegisterPartitionResponse) Reset() { + *x = RegisterPartitionResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterPartitionResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterPartitionResponse) ProtoMessage() {} + +func (x *RegisterPartitionResponse) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterPartitionResponse.ProtoReflect.Descriptor instead. +func (*RegisterPartitionResponse) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{13} +} + +func (x *RegisterPartitionResponse) GetPartition() *Partition { + if x != nil { + return x.Partition + } + return nil +} + +// GetRegisteredPartitionsRequest is the request message to query for all registered partitions +// that a storage is tracking. +type GetRegisteredPartitionsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetRegisteredPartitionsRequest) Reset() { + *x = GetRegisteredPartitionsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[14] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetRegisteredPartitionsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRegisteredPartitionsRequest) ProtoMessage() {} + +func (x *GetRegisteredPartitionsRequest) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[14] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRegisteredPartitionsRequest.ProtoReflect.Descriptor instead. +func (*GetRegisteredPartitionsRequest) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{14} +} + +// GetRegisteredPartitionsResponse is the response message of GetRegisteredPartitionsRequest. +type GetRegisteredPartitionsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // partitions is the list of registered partitions. + Partitions []*Partition `protobuf:"bytes,1,rep,name=partitions,proto3" json:"partitions,omitempty"` +} + +func (x *GetRegisteredPartitionsResponse) Reset() { + *x = GetRegisteredPartitionsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_cluster_proto_msgTypes[15] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetRegisteredPartitionsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRegisteredPartitionsResponse) ProtoMessage() {} + +func (x *GetRegisteredPartitionsResponse) ProtoReflect() protoreflect.Message { + mi := &file_cluster_proto_msgTypes[15] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRegisteredPartitionsResponse.ProtoReflect.Descriptor instead. +func (*GetRegisteredPartitionsResponse) Descriptor() ([]byte, []int) { + return file_cluster_proto_rawDescGZIP(), []int{15} +} + +func (x *GetRegisteredPartitionsResponse) GetPartitions() []*Partition { + if x != nil { + return x.Partitions + } + return nil +} + var File_cluster_proto protoreflect.FileDescriptor var file_cluster_proto_rawDesc = []byte{ @@ -691,7 +959,17 @@ var file_cluster_proto_rawDesc = []byte{ 0x01, 0x28, 0x04, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x25, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, - 0x02, 0x38, 0x01, 0x22, 0xab, 0x01, 0x0a, 0x07, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, + 0x02, 0x38, 0x01, 0x22, 0x9d, 0x01, 0x0a, 0x09, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x5f, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x61, 0x75, 0x74, 0x68, 0x6f, + 0x72, 0x69, 0x74, 0x79, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x75, 0x74, 0x68, + 0x6f, 0x72, 0x69, 0x74, 0x79, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, + 0x61, 0x75, 0x74, 0x68, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x70, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x23, + 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, + 0x61, 0x74, 0x68, 0x22, 0xab, 0x01, 0x0a, 0x07, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, @@ -747,11 +1025,28 @@ var file_cluster_proto_rawDesc = []byte{ 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x52, - 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, - 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, - 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x22, 0x4b, 0x0a, 0x18, 0x52, 0x65, 0x67, 0x69, + 0x73, 0x74, 0x65, 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x2f, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x4c, 0x0a, 0x19, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, + 0x72, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x2f, 0x0a, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, + 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x22, 0x20, 0x0a, 0x1e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, + 0x65, 0x72, 0x65, 0x64, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x54, 0x0a, 0x1f, 0x47, 0x65, 0x74, 0x52, 0x65, 0x67, 0x69, + 0x73, 0x74, 0x65, 0x72, 0x65, 0x64, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x31, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, + 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x34, 0x5a, 0x32, 0x67, + 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, + 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, + 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -766,33 +1061,41 @@ func file_cluster_proto_rawDescGZIP() []byte { return file_cluster_proto_rawDescData } -var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_cluster_proto_msgTypes = make([]protoimpl.MessageInfo, 17) var file_cluster_proto_goTypes = []any{ - (*Cluster)(nil), // 0: gitaly.Cluster - (*Storage)(nil), // 1: gitaly.Storage - (*LeaderState)(nil), // 2: gitaly.LeaderState - (*BootstrapClusterRequest)(nil), // 3: gitaly.BootstrapClusterRequest - (*BootstrapClusterResponse)(nil), // 4: gitaly.BootstrapClusterResponse - (*GetClusterRequest)(nil), // 5: gitaly.GetClusterRequest - (*GetClusterResponse)(nil), // 6: gitaly.GetClusterResponse - (*RegisterStorageRequest)(nil), // 7: gitaly.RegisterStorageRequest - (*RegisterStorageResponse)(nil), // 8: gitaly.RegisterStorageResponse - (*UpdateStorageRequest)(nil), // 9: gitaly.UpdateStorageRequest - (*UpdateStorageResponse)(nil), // 10: gitaly.UpdateStorageResponse - nil, // 11: gitaly.Cluster.StoragesEntry + (*Cluster)(nil), // 0: gitaly.Cluster + (*Partition)(nil), // 1: gitaly.Partition + (*Storage)(nil), // 2: gitaly.Storage + (*LeaderState)(nil), // 3: gitaly.LeaderState + (*BootstrapClusterRequest)(nil), // 4: gitaly.BootstrapClusterRequest + (*BootstrapClusterResponse)(nil), // 5: gitaly.BootstrapClusterResponse + (*GetClusterRequest)(nil), // 6: gitaly.GetClusterRequest + (*GetClusterResponse)(nil), // 7: gitaly.GetClusterResponse + (*RegisterStorageRequest)(nil), // 8: gitaly.RegisterStorageRequest + (*RegisterStorageResponse)(nil), // 9: gitaly.RegisterStorageResponse + (*UpdateStorageRequest)(nil), // 10: gitaly.UpdateStorageRequest + (*UpdateStorageResponse)(nil), // 11: gitaly.UpdateStorageResponse + (*RegisterPartitionRequest)(nil), // 12: gitaly.RegisterPartitionRequest + (*RegisterPartitionResponse)(nil), // 13: gitaly.RegisterPartitionResponse + (*GetRegisteredPartitionsRequest)(nil), // 14: gitaly.GetRegisteredPartitionsRequest + (*GetRegisteredPartitionsResponse)(nil), // 15: gitaly.GetRegisteredPartitionsResponse + nil, // 16: gitaly.Cluster.StoragesEntry } var file_cluster_proto_depIdxs = []int32{ - 11, // 0: gitaly.Cluster.storages:type_name -> gitaly.Cluster.StoragesEntry + 16, // 0: gitaly.Cluster.storages:type_name -> gitaly.Cluster.StoragesEntry 0, // 1: gitaly.BootstrapClusterResponse.cluster:type_name -> gitaly.Cluster 0, // 2: gitaly.GetClusterResponse.cluster:type_name -> gitaly.Cluster - 1, // 3: gitaly.RegisterStorageResponse.storage:type_name -> gitaly.Storage - 1, // 4: gitaly.UpdateStorageResponse.storage:type_name -> gitaly.Storage - 1, // 5: gitaly.Cluster.StoragesEntry.value:type_name -> gitaly.Storage - 6, // [6:6] is the sub-list for method output_type - 6, // [6:6] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 2, // 3: gitaly.RegisterStorageResponse.storage:type_name -> gitaly.Storage + 2, // 4: gitaly.UpdateStorageResponse.storage:type_name -> gitaly.Storage + 1, // 5: gitaly.RegisterPartitionRequest.partition:type_name -> gitaly.Partition + 1, // 6: gitaly.RegisterPartitionResponse.partition:type_name -> gitaly.Partition + 1, // 7: gitaly.GetRegisteredPartitionsResponse.partitions:type_name -> gitaly.Partition + 2, // 8: gitaly.Cluster.StoragesEntry.value:type_name -> gitaly.Storage + 9, // [9:9] is the sub-list for method output_type + 9, // [9:9] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_cluster_proto_init() } @@ -814,7 +1117,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*Storage); i { + switch v := v.(*Partition); i { case 0: return &v.state case 1: @@ -826,7 +1129,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*LeaderState); i { + switch v := v.(*Storage); i { case 0: return &v.state case 1: @@ -838,7 +1141,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[3].Exporter = func(v any, i int) any { - switch v := v.(*BootstrapClusterRequest); i { + switch v := v.(*LeaderState); i { case 0: return &v.state case 1: @@ -850,7 +1153,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*BootstrapClusterResponse); i { + switch v := v.(*BootstrapClusterRequest); i { case 0: return &v.state case 1: @@ -862,7 +1165,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[5].Exporter = func(v any, i int) any { - switch v := v.(*GetClusterRequest); i { + switch v := v.(*BootstrapClusterResponse); i { case 0: return &v.state case 1: @@ -874,7 +1177,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[6].Exporter = func(v any, i int) any { - switch v := v.(*GetClusterResponse); i { + switch v := v.(*GetClusterRequest); i { case 0: return &v.state case 1: @@ -886,7 +1189,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[7].Exporter = func(v any, i int) any { - switch v := v.(*RegisterStorageRequest); i { + switch v := v.(*GetClusterResponse); i { case 0: return &v.state case 1: @@ -898,7 +1201,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[8].Exporter = func(v any, i int) any { - switch v := v.(*RegisterStorageResponse); i { + switch v := v.(*RegisterStorageRequest); i { case 0: return &v.state case 1: @@ -910,7 +1213,7 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[9].Exporter = func(v any, i int) any { - switch v := v.(*UpdateStorageRequest); i { + switch v := v.(*RegisterStorageResponse); i { case 0: return &v.state case 1: @@ -922,6 +1225,18 @@ func file_cluster_proto_init() { } } file_cluster_proto_msgTypes[10].Exporter = func(v any, i int) any { + switch v := v.(*UpdateStorageRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[11].Exporter = func(v any, i int) any { switch v := v.(*UpdateStorageResponse); i { case 0: return &v.state @@ -933,6 +1248,54 @@ func file_cluster_proto_init() { return nil } } + file_cluster_proto_msgTypes[12].Exporter = func(v any, i int) any { + switch v := v.(*RegisterPartitionRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[13].Exporter = func(v any, i int) any { + switch v := v.(*RegisterPartitionResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[14].Exporter = func(v any, i int) any { + switch v := v.(*GetRegisteredPartitionsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_cluster_proto_msgTypes[15].Exporter = func(v any, i int) any { + switch v := v.(*GetRegisteredPartitionsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -940,7 +1303,7 @@ func file_cluster_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_cluster_proto_rawDesc, NumEnums: 0, - NumMessages: 12, + NumMessages: 17, NumExtensions: 0, NumServices: 0, }, -- GitLab From 1ee3aeac25ac7d25964357eae289e5c54a5daf06 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 23 Jul 2024 14:00:12 +0700 Subject: [PATCH 02/14] raft: Implement statemachine for replication Raft groups --- .../storage/raft/replication_statemachine.go | 284 ++++++++++++++++++ 1 file changed, 284 insertions(+) create mode 100644 internal/gitaly/storage/raft/replication_statemachine.go diff --git a/internal/gitaly/storage/raft/replication_statemachine.go b/internal/gitaly/storage/raft/replication_statemachine.go new file mode 100644 index 0000000000..e7de8f958d --- /dev/null +++ b/internal/gitaly/storage/raft/replication_statemachine.go @@ -0,0 +1,284 @@ +package raft + +import ( + "context" + "errors" + "fmt" + "io" + + "github.com/dgraph-io/badger/v4" + "github.com/lni/dragonboat/v4/statemachine" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/protobuf/proto" +) + +// partitionOpType is an operation type, denoting the operation to perform for a partition +// managed by this Raft group. +type partitionOpType byte + +const ( + // opTrackPartition tells the observer to track a partition. There are two occasions: + // - When the Raft group is restarted. + // - When the Raft group detects a new partition creation. + opTrackPartition = partitionOpType(iota) +) + +// partitionOperation defines an operation to perform on a partition. From the scope of replication +// metadata, only partition creation and deletion are supported. +type partitionOperation struct { + op partitionOpType + partition *gitalypb.Partition +} + +const ( + resultRegisterPartitionSuccessfully = updateResult(iota) + resultRegisterPartitionExisted +) + +var keyPartitionsList = []byte("partitions") + +// replicationStateMachine is a state machine that manages the data for a replication Raft group. +// It manages two types of objects: +// - If the replication Raft group is for an authority, the state machine manages the partitions +// created by that authority storage. +// - If the replication Raft group is for a replica, the state machine manages the partitions the +// replica should manage. +// The statemachine triggers the configured callback when it receives new creation/deletion +// operations. +type replicationStateMachine struct { + ctx context.Context + db dbAccessor + authority bool + storageID raftID + callback func([]*partitionOperation) + callbackOps []*partitionOperation +} + +// Open initializes the replication state machine and returns the last applied index. The data are +// persisted to disk by the keyvalue package, so we don't maintain a separate in-memory +// representation here. +func (s *replicationStateMachine) Open(stopC <-chan struct{}) (uint64, error) { + lastApplied, err := s.LastApplied() + if err != nil { + return 0, fmt.Errorf("reading last index from DB: %w", err) + } + + select { + case <-stopC: + return 0, statemachine.ErrOpenStopped + default: + return lastApplied.ToUint64(), nil + } +} + +// LastApplied returns the last applied index of the state machine. +func (s *replicationStateMachine) LastApplied() (lastApplied raftID, err error) { + return lastApplied, s.db.read(s.ctx, func(txn keyvalue.ReadWriter) error { + lastApplied, err = s.getLastIndex(txn) + if err != nil { + err = fmt.Errorf("getting last index from DB: %w", err) + } + return err + }) +} + +// Update applies entry to replication group. It supports the only operation now: +// - *gitalypb.RegisterPartitionRequest +func (s *replicationStateMachine) Update(entries []statemachine.Entry) ([]statemachine.Entry, error) { + defer func() { + // Always reset the list of operations for callback, regardless of the update result. + s.callbackOps = []*partitionOperation{} + }() + var returnedEntries []statemachine.Entry + + if err := s.db.write(s.ctx, func(txn keyvalue.ReadWriter) error { + entries, err := s.update(txn, entries) + if err != nil { + return err + } + returnedEntries = entries + return nil + }); err != nil { + return nil, err + } + + if len(s.callbackOps) > 0 { + s.callback(s.callbackOps) + } + return returnedEntries, nil +} + +func (s *replicationStateMachine) update(txn keyvalue.ReadWriter, entries []statemachine.Entry) (_ []statemachine.Entry, returnedErr error) { + lastApplied, err := s.getLastIndex(txn) + if err != nil { + return nil, fmt.Errorf("reading last index from DB: %w", err) + } + + var returnedEntries []statemachine.Entry + for _, entry := range entries { + if lastApplied >= raftID(entry.Index) { + return nil, fmt.Errorf("log entry with previously applied index, last applied %d entry index %d", lastApplied, entry.Index) + } + result, err := s.updateEntry(txn, &entry) + if err != nil { + return nil, fmt.Errorf("updating entry index %d: %w", entry.Index, err) + } + returnedEntries = append(returnedEntries, statemachine.Entry{ + Index: entry.Index, + Result: *result, + }) + lastApplied = raftID(entry.Index) + } + if err := txn.Set(keyLastApplied, lastApplied.MarshalBinary()); err != nil { + return nil, fmt.Errorf("setting last index: %w", err) + } + return returnedEntries, nil +} + +func (s *replicationStateMachine) updateEntry(txn keyvalue.ReadWriter, entry *statemachine.Entry) (*statemachine.Result, error) { + var result statemachine.Result + + msg, err := anyProtoUnmarshal(entry.Cmd) + if err != nil { + return nil, fmt.Errorf("unmarshalling command: %w", err) + } + + switch req := msg.(type) { + case *gitalypb.RegisterPartitionRequest: + returnedPartition, err := s.upsertPartition(txn, req.GetPartition()) + if err != nil { + return nil, fmt.Errorf("handling RegisterPartitionRequest: %w", err) + } + + if returnedPartition == nil { + returnedPartition = req.GetPartition() + result.Value = uint64(resultRegisterPartitionSuccessfully) + + s.callbackOps = append(s.callbackOps, &partitionOperation{ + op: opTrackPartition, + partition: returnedPartition, + }) + } else { + result.Value = uint64(resultRegisterPartitionExisted) + } + + response, err := anyProtoMarshal(&gitalypb.RegisterPartitionResponse{Partition: returnedPartition}) + if err != nil { + return nil, fmt.Errorf("marshaling RegisterPartitionResponse: %w", err) + } + result.Data = response + + return &result, nil + default: + return nil, fmt.Errorf("request not supported: %s", msg.ProtoReflect().Descriptor().Name()) + } +} + +// Lookup queries the state machine. It supports the only operation now: +// - *gitalypb.GetRegisteredPartitionsRequest +func (s *replicationStateMachine) Lookup(cmd interface{}) (interface{}, error) { + switch cmd.(type) { + case *gitalypb.GetRegisteredPartitionsRequest: + response := &gitalypb.GetRegisteredPartitionsResponse{} + + if err := s.db.read(s.ctx, func(txn keyvalue.ReadWriter) error { + it := txn.NewIterator(keyvalue.IteratorOptions{ + Prefix: []byte(fmt.Sprintf("%s/", keyPartitionsList)), + }) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + if err := it.Item().Value(func(value []byte) error { + var partition gitalypb.Partition + if err := proto.Unmarshal(value, &partition); err != nil { + return fmt.Errorf("unmarshalling partition from DB: %w", err) + } + response.Partitions = append(response.Partitions, &partition) + + return nil + }); err != nil { + return fmt.Errorf("iterating through partitions: %w", err) + } + } + return nil + }); err != nil { + return nil, fmt.Errorf("reading registered partitions: %w", err) + } + + return response, nil + default: + return nil, fmt.Errorf("request not supported: %T", cmd) + } +} + +// Sync is a no-op because our DB flushes to disk on commit. +func (s *replicationStateMachine) Sync() error { return nil } + +// PrepareSnapshot is a no-op until we start supporting snapshots. +func (s *replicationStateMachine) PrepareSnapshot() (interface{}, error) { + return nil, fmt.Errorf("PrepareSnapshot hasn't been not supported") +} + +// SaveSnapshot is a no-op until we start supporting snapshots. +func (s *replicationStateMachine) SaveSnapshot(_ interface{}, _ io.Writer, _ <-chan struct{}) error { + return fmt.Errorf("SaveSnapshot hasn't been not supported") +} + +// RecoverFromSnapshot is a no-op until we start supporting snapshots. +func (s *replicationStateMachine) RecoverFromSnapshot(_ io.Reader, _ <-chan struct{}) error { + return fmt.Errorf("RecoverFromSnapshot hasn't been not supported") +} + +// Close is a no-op because our DB is managed externally. +func (s *replicationStateMachine) Close() error { return nil } + +func (s *replicationStateMachine) getLastIndex(txn keyvalue.ReadWriter) (raftID, error) { + var appliedIndex raftID + + item, err := txn.Get(keyLastApplied) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return 0, nil + } + return 0, err + } + return appliedIndex, item.Value(func(value []byte) error { + appliedIndex.UnmarshalBinary(value) + return nil + }) +} + +func (s *replicationStateMachine) upsertPartition(txn keyvalue.ReadWriter, partition *gitalypb.Partition) (*gitalypb.Partition, error) { + key := s.partitionKey(partition) + item, err := txn.Get(key) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + marshaledPartition, err := proto.Marshal(partition) + if err != nil { + return nil, fmt.Errorf("marshaling partition: %w", err) + } + if err := txn.Set(key, marshaledPartition); err != nil { + return nil, fmt.Errorf("saving partition %d of authority storage %d: %w", partition.GetPartitionId(), partition.GetAuthorityId(), err) + } + return nil, nil + } + return nil, err + } + + var existingPartition gitalypb.Partition + return &existingPartition, item.Value(func(value []byte) error { + return proto.Unmarshal(value, &existingPartition) + }) +} + +func (s *replicationStateMachine) partitionKey(partition *gitalypb.Partition) []byte { + return []byte(fmt.Sprintf("%s/%s", keyPartitionsList, raftID(partition.GetPartitionId()).MarshalBinary())) +} + +var _ = Statemachine(&replicationStateMachine{}) + +// newReplicationStateMachine returns a state machine that manages data for a replication Raft group. +func newReplicationStateMachine(ctx context.Context, db dbAccessor, authority bool, storageID raftID, callback func([]*partitionOperation)) *replicationStateMachine { + return &replicationStateMachine{ctx: ctx, db: db, authority: authority, storageID: storageID, callback: callback} +} -- GitLab From 693027157589b937b2137f81ec54f55c2869bd04 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Wed, 31 Jul 2024 13:12:57 +0700 Subject: [PATCH 03/14] raft: Implement replication Raft group --- internal/gitaly/storage/raft/ids.go | 13 +- internal/gitaly/storage/raft/manager.go | 3 + .../gitaly/storage/raft/metadata_group.go | 5 + .../gitaly/storage/raft/replication_group.go | 314 ++++++++++++++++++ 4 files changed, 332 insertions(+), 3 deletions(-) create mode 100644 internal/gitaly/storage/raft/replication_group.go diff --git a/internal/gitaly/storage/raft/ids.go b/internal/gitaly/storage/raft/ids.go index b7bfed58b8..d82c56fcd1 100644 --- a/internal/gitaly/storage/raft/ids.go +++ b/internal/gitaly/storage/raft/ids.go @@ -10,9 +10,6 @@ import ( // internally. In Gitaly, "group" is used exclusively to refer to a Raft group. type raftID uint64 -// MetadataGroupID is a hard-coded ID of the cluster-wide metadata Raft group. -const MetadataGroupID = raftID(1) - // MarshalBinary returns a binary representation of the raftID. func (id raftID) MarshalBinary() []byte { marshaled := make([]byte, binary.Size(id)) @@ -40,3 +37,13 @@ func (id raftID) ToUint64() uint64 { // needs to perform all necessary checks beforehand. However, the state machine must perform state // validation at its layer. The result is propagated to the caller to handle respectively. type updateResult uint64 + +// MetadataGroupID is a hard-coded ID of the cluster-wide metadata Raft group. +const MetadataGroupID = raftID(1) + +// AuthorityGroupID returns the ID of the Raft group that manages the partitions created by the +// input storage. The ID has the form "XX000000", in which the first two bytes are the storage ID. +// The cluster has the maximum of 2^16=65536 storages. +func AuthorityGroupID(storageID raftID) raftID { + return storageID << 48 +} diff --git a/internal/gitaly/storage/raft/manager.go b/internal/gitaly/storage/raft/manager.go index 6a068013da..0b79d5673a 100644 --- a/internal/gitaly/storage/raft/manager.go +++ b/internal/gitaly/storage/raft/manager.go @@ -248,6 +248,9 @@ func (m *Manager) Close() { } defer m.closed.Store(true) + if err := m.metadataGroup.Close(); err != nil { + m.logger.WithError(err).Warn("fail to stop metadata Raft group") + } for _, storageMgr := range m.storageManagers { storageMgr.Close() } diff --git a/internal/gitaly/storage/raft/metadata_group.go b/internal/gitaly/storage/raft/metadata_group.go index 545bd6103e..1971b12f19 100644 --- a/internal/gitaly/storage/raft/metadata_group.go +++ b/internal/gitaly/storage/raft/metadata_group.go @@ -233,6 +233,11 @@ func (g *metadataRaftGroup) WaitReady() error { return WaitGroupReady(g.ctx, g.nodeHost, g.groupID) } +// Close closes the Raft group. +func (g *metadataRaftGroup) Close() error { + return g.nodeHost.StopReplica(g.groupConfig.ShardID, g.groupConfig.ReplicaID) +} + func (g *metadataRaftGroup) maxHeartbeatWait() time.Duration { return time.Millisecond * time.Duration(g.groupConfig.HeartbeatRTT*g.nodeHost.NodeHostConfig().RTTMillisecond) } diff --git a/internal/gitaly/storage/raft/replication_group.go b/internal/gitaly/storage/raft/replication_group.go new file mode 100644 index 0000000000..b4b0b70bcc --- /dev/null +++ b/internal/gitaly/storage/raft/replication_group.go @@ -0,0 +1,314 @@ +package raft + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/lni/dragonboat/v4" + dragonboatConf "github.com/lni/dragonboat/v4/config" + "github.com/lni/dragonboat/v4/statemachine" + "gitlab.com/gitlab-org/gitaly/v16/internal/backoff" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" +) + +// replicationRaftGroup is a Raft group that either manages the partitions created by the input +// storage or manages the partitions replicated from another storage. +// This type of Raft group is supposted to be establish one-way connection between an authority to +// its replicas. Each related node starts a replication Raft group as follows: +// - The group member on authority storage always holds the "leader" role. +// - The group members on replicas always hold the "non-voting" role. They can sync the list of +// partitions created by the authority but don't involve in the election. +// +// All participants join the same group ID, generated from the authority's ID. This approach +// allows the authority to communicate partition's creation and deletion events to its replicas +// reliably. The list of partitions are persisted in the state machines of each replication Raft +// group member. If the authority if offline, no further partitions are created in that authority +// storage; the clients must find another authority. +// +// This group provides some functions that allow the outsider to access the list of partitions at +// startup as well as any partition updates along the way. +// +// The startup order of members of this group matters. The group leader, aka authority, must start +// the group first. It thens registers the list of replicas to the group. Finally, replica group +// members join the group. +type replicationRaftGroup struct { + Group + sync.Mutex + cancel context.CancelFunc + backoffProfile *backoff.Exponential + + // authority tells if this Raft group is for an authority or not. + authority bool + storageID raftID + storageName string + + initialized bool + // pendingOperations contains the list of partitions waiting for processed. + pendingOperations []*partitionOperation + // newOperation is a channel telling if there is a new operation. + newOperation chan struct{} +} + +// newReplicationGroup starts an instance of the replication Raft group. +func newReplicationGroup( + ctx context.Context, + authority bool, + storage *gitalypb.Storage, + nodeHost *dragonboat.NodeHost, + db dbAccessor, + clusterCfg config.Raft, + logger log.Logger, +) (*replicationRaftGroup, error) { + ctx, cancel := context.WithCancel(ctx) + + // Both authority and replicas share the same group ID, which is generated from the authority ID. + groupID := AuthorityGroupID(raftID(storage.GetStorageId())) + groupCfg := dragonboatConf.Config{ + ReplicaID: clusterCfg.NodeID, + ShardID: groupID.ToUint64(), + ElectionRTT: clusterCfg.ElectionTicks, + HeartbeatRTT: clusterCfg.HeartbeatTicks, + // Replicas must have non-voting role. + IsNonVoting: !authority, + WaitReady: true, + } + + backoffProfile := backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano()))) + backoffProfile.BaseDelay = time.Duration(clusterCfg.ElectionTicks) * time.Duration(clusterCfg.RTTMilliseconds) * time.Microsecond + + groupLogger := logger.WithFields(log.Fields{ + "raft.group": "replication", + "raft.group_id": groupID, + "raft.replica_id": clusterCfg.NodeID, + "raft.authority": authority, + "raft.storage_id": storage.GetStorageId(), + "raft.storage_name": storage.GetName(), + }) + + group := &replicationRaftGroup{ + Group: Group{ + ctx: ctx, + groupID: groupID, + replicaID: raftID(clusterCfg.NodeID), + clusterConfig: clusterCfg, + groupConfig: groupCfg, + logger: groupLogger, + nodeHost: nodeHost, + }, + cancel: cancel, + storageID: raftID(storage.GetStorageId()), + storageName: storage.GetName(), + authority: authority, + backoffProfile: backoffProfile, + newOperation: make(chan struct{}, 1), + } + + initialMembers := map[uint64]string{} + // The authority group is the only initial member of this group. Replicated groups don't need the list of + // initial members. They join the group as non-voting members. + if authority { + initialMembers[clusterCfg.NodeID] = clusterCfg.RaftAddr + } + + if err := nodeHost.StartOnDiskReplica(initialMembers, !authority, func(_groupID, _replicaID uint64) statemachine.IOnDiskStateMachine { + return newReplicationStateMachine(ctx, db, authority, raftID(storage.GetStorageId()), group.appendPartitionOps) + }, groupCfg); err != nil { + return nil, fmt.Errorf("joining replication group: %w", err) + } + + return group, nil +} + +// Stop stops the group by cancelling the context and closing connecting channels. +func (g *replicationRaftGroup) Close() (returnedErr error) { + g.cancel() + if err := g.nodeHost.StopReplica(g.groupConfig.ShardID, g.groupConfig.ReplicaID); err != nil { + returnedErr = err + } + close(g.newOperation) + return +} + +// PollPartitions returns the list of pending operations. If there is any partition, the list +// returns immediately. Otherwise, the caller is blocked until new operation is added or the context +// is cancelled. +// If this function is called for the first time after the group has just started, it returns all +// partitions in the statemachine. Subsequent calls return new updates from the authority member. +func (g *replicationRaftGroup) PollPartitions() ([]*partitionOperation, error) { + if err := g.initialize(); err != nil { + return nil, fmt.Errorf("initializing replication Raft group") + } + + for { + g.Lock() + // If there is any pending operation, returns immediately. + if len(g.pendingOperations) > 0 { + partitions := g.pendingOperations + g.pendingOperations = []*partitionOperation{} + g.Unlock() + return partitions, nil + } + g.Unlock() + + // Other wise, block until then. + select { + case <-g.ctx.Done(): + return nil, nil + case <-g.newOperation: + } + } +} + +// WaitReady waits until the replication group is ready or its context is cancelled. +func (g *replicationRaftGroup) WaitReady() error { + return WaitGroupReady(g.ctx, g.nodeHost, g.groupID) +} + +// RegisterReplica adds another node as the replica of this authority storage. Without this call, +// the replication Raft group members on other nodes are not able to join the group. This function +// doesn't kick off replication. The replication starts when other nodes kick off their counterpart +// replication Raft groups. +func (g *replicationRaftGroup) RegisterReplica(ctx context.Context, nodeID raftID, replicaAddr string) error { + if !g.authority { + return fmt.Errorf("only authority storage could register its replica") + } + membership, err := g.nodeHost.SyncGetShardMembership(ctx, g.groupID.ToUint64()) + if err != nil { + return fmt.Errorf("fetching shard membership: %w", err) + } + if _, exist := membership.NonVotings[nodeID.ToUint64()]; exist { + return nil + } + + if err := g.nodeHost.SyncRequestAddNonVoting(ctx, g.groupID.ToUint64(), nodeID.ToUint64(), replicaAddr, membership.ConfigChangeID); err != nil { + return fmt.Errorf("requesting to add node %d (%s) as a replica of group %d (storage ID %d): %w", nodeID, replicaAddr, g.groupID, g.storageID, err) + } + return nil +} + +// StorageName is a getter that returns the group's target storage name. +func (g *replicationRaftGroup) StorageName() string { + return g.storageName +} + +// StorageID is a getter that returns the group's target storage ID. +func (g *replicationRaftGroup) StorageID() raftID { + return g.storageID +} + +// initialize pushes all partitions in the state machine to the list of pending operations. It runs +// once when a caller gets the list of partitions after group has just started. +func (g *replicationRaftGroup) initialize() error { + g.Lock() + defer g.Unlock() + + if g.initialized { + return nil + } + + if err := g.WaitReady(); err != nil { + return err + } + + // Push all registered partitions to the pending list. + partitions, err := g.GetRegisteredPartitions() + if err != nil { + return err + } + if len(partitions) > 0 { + for _, partition := range partitions { + g.pendingOperations = append(g.pendingOperations, &partitionOperation{ + op: opTrackPartition, + partition: partition, + }) + } + g.signalNewPartition() + } + g.initialized = true + return nil +} + +func (g *replicationRaftGroup) signalNewPartition() { + select { + case g.newOperation <- struct{}{}: + default: + // If the signal channel is full, no need to wait. + } +} + +func (g *replicationRaftGroup) appendPartitionOps(ops []*partitionOperation) { + g.Lock() + defer g.Unlock() + + g.pendingOperations = append(g.pendingOperations, ops...) + g.signalNewPartition() +} + +// RegisterPartition adds a partition to the tracking list of the replication Raft group. +func (g *replicationRaftGroup) RegisterPartition(partitionID raftID, relativePath string) (*gitalypb.Partition, error) { + result, response, err := g.requestRegisterPartition(partitionID, relativePath) + if err != nil { + return nil, fmt.Errorf("sending RegisterPartitionRequest: %w", err) + } + + switch result { + case resultRegisterPartitionSuccessfully: + return response.GetPartition(), nil + case resultRegisterPartitionExisted: + return nil, structerr.New("partition already existed").WithMetadataItems( + structerr.MetadataItem{Key: "partition_id", Value: partitionID}, + structerr.MetadataItem{Key: "relative_path", Value: relativePath}, + ) + default: + return nil, fmt.Errorf("unknown result code %d", result) + } +} + +// GetRegisteredPartitions returns the list of all registered partitions. Warning. This function is +// not suitable to be called too frequently. +func (g *replicationRaftGroup) GetRegisteredPartitions() ([]*gitalypb.Partition, error) { + response, err := g.requestGetPartitions() + if err != nil { + return nil, fmt.Errorf("sending GetRegisteredPartitionsRequest: %w", err) + } + return response.GetPartitions(), nil +} + +func (g *replicationRaftGroup) requestRegisterPartition(partitionID raftID, relativePath string) (updateResult, *gitalypb.RegisterPartitionResponse, error) { + requester := NewRequester[*gitalypb.RegisterPartitionRequest, *gitalypb.RegisterPartitionResponse]( + g.nodeHost, g.groupID, g.logger, requestOption{ + retry: defaultRetry, + timeout: g.maxNextElectionWait(), + exponential: g.backoffProfile, + }, + ) + return requester.SyncWrite(g.ctx, &gitalypb.RegisterPartitionRequest{ + Partition: &gitalypb.Partition{ + AuthorityName: g.storageName, + AuthorityId: g.storageID.ToUint64(), + PartitionId: partitionID.ToUint64(), + RelativePath: relativePath, + }, + }) +} + +func (g *replicationRaftGroup) requestGetPartitions() (*gitalypb.GetRegisteredPartitionsResponse, error) { + requester := NewRequester[*gitalypb.GetRegisteredPartitionsRequest, *gitalypb.GetRegisteredPartitionsResponse]( + g.nodeHost, g.groupID, g.logger, requestOption{ + retry: defaultRetry, + timeout: g.maxNextElectionWait(), + exponential: g.backoffProfile, + }, + ) + return requester.SyncRead(g.ctx, &gitalypb.GetRegisteredPartitionsRequest{}) +} + +func (g *replicationRaftGroup) maxNextElectionWait() time.Duration { + return time.Millisecond * time.Duration(g.groupConfig.ElectionRTT*g.nodeHost.NodeHostConfig().RTTMillisecond) +} -- GitLab From 07b8f5add38ee2d778d3e3c62d436b380e19d6c0 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sat, 27 Jul 2024 20:41:17 +0700 Subject: [PATCH 04/14] raft: Refactor Raft manager's restarting tests In the test suite of Raft manager, there are some tests that assert the state of the manager after a restart. They sets some internal states of manager objects to `nil`. That approach is prone to be broken if the manager introduces some more states. This commit refactors those tests. They now re-create manager objects using existing storage instead. --- internal/gitaly/storage/raft/manager_test.go | 37 ++++++++++--------- internal/gitaly/storage/raft/storage.go | 7 ---- .../gitaly/storage/raft/testhelper_test.go | 11 ++++-- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/internal/gitaly/storage/raft/manager_test.go b/internal/gitaly/storage/raft/manager_test.go index 381f6300bb..0759629696 100644 --- a/internal/gitaly/storage/raft/manager_test.go +++ b/internal/gitaly/storage/raft/manager_test.go @@ -4,7 +4,6 @@ import ( "fmt" "testing" - "github.com/lni/dragonboat/v4" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" @@ -54,23 +53,27 @@ func TestManager_Start(t *testing.T) { } return &testNode{ - manager: mgr, - close: mgr.Close, + cfg: cfg, + manager: mgr, + ptnManager: ptnMgr, + close: mgr.Close, }, nil } } - resetManager := func(t *testing.T, m *Manager) { - m.metadataGroup = nil - for _, storageMgr := range m.storageManagers { - storageMgr.clearStorageInfo() - storageMgr.nodeHost.Close() - nodeHost, err := dragonboat.NewNodeHost(storageMgr.nodeHost.NodeHostConfig()) - require.NoError(t, err) - storageMgr.nodeHost = nodeHost - } - m.started.Store(false) - m.closed.Store(false) + resetManager := func(t *testing.T, node *testNode) { + node.manager.Close() + m2, err := NewManager( + testhelper.Context(t), + node.cfg.Storages, + node.manager.clusterConfig, + node.manager.managerConfig, + node.ptnManager, + node.manager.logger) + require.NoError(t, err) + + node.manager = m2 + node.close = m2.Close } t.Run("bootstrap a singular cluster", func(t *testing.T) { @@ -333,7 +336,7 @@ func TestManager_Start(t *testing.T) { }) for _, node := range cluster.nodes { - resetManager(t, node.manager) + resetManager(t, node) } fanOut(3, func(node raftID) { @@ -376,7 +379,7 @@ func TestManager_Start(t *testing.T) { }) for _, node := range cluster.nodes { - resetManager(t, node.manager) + resetManager(t, node) node.manager.managerConfig.BootstrapCluster = false } @@ -456,7 +459,7 @@ func TestManager_Start(t *testing.T) { }) for _, node := range cluster.nodes { - resetManager(t, node.manager) + resetManager(t, node) node.manager.managerConfig.BootstrapCluster = false node.manager.clusterConfig.ReplicationFactor = 3 } diff --git a/internal/gitaly/storage/raft/storage.go b/internal/gitaly/storage/raft/storage.go index d981bcabfe..9a62587fe9 100644 --- a/internal/gitaly/storage/raft/storage.go +++ b/internal/gitaly/storage/raft/storage.go @@ -80,13 +80,6 @@ func (m *storageManager) saveStorageInfo(ctx context.Context, storage *gitalypb. }) } -// clearStorageInfo clears the storage info inside the in-memory storage of the storage manager. It -// does not clean the persisted info the DB. -func (m *storageManager) clearStorageInfo() { - m.id = 0 - m.persistedInfo = nil -} - func (m *storageManager) dbForMetadataGroup() dbAccessor { return dbForMetadataGroup(m.ptnMgr, m.name) } diff --git a/internal/gitaly/storage/raft/testhelper_test.go b/internal/gitaly/storage/raft/testhelper_test.go index e853a44a18..94273e93f3 100644 --- a/internal/gitaly/storage/raft/testhelper_test.go +++ b/internal/gitaly/storage/raft/testhelper_test.go @@ -125,10 +125,15 @@ var dragonboatTestingProfile = func() dragonboatConfig.ExpertConfig { }() type testNode struct { - nodeHost *dragonboat.NodeHost - manager *Manager + // Variables for a real manager. + cfg config.Cfg + manager *Manager + ptnManager *storagemgr.PartitionManager + close func() + + // Mock manager's functionalities. sm *testStateMachine - close func() + nodeHost *dragonboat.NodeHost } type testRaftCluster struct { -- GitLab From 1d53c75bdb0f5fdef01ccf2d0ee7b6bcfcc0f428 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sat, 27 Jul 2024 20:41:57 +0700 Subject: [PATCH 05/14] raft: Access storage's ID via a getter Raft's storage manager object has two ways to access its ID: via an internal field or via a persistent storage info struct. The former is an implication of the later. This commit removes the internal ID field and provides a getter to access such info from the persistent info. --- internal/gitaly/storage/raft/manager_test.go | 69 ++++++++++---------- internal/gitaly/storage/raft/storage.go | 18 +++-- 2 files changed, 42 insertions(+), 45 deletions(-) diff --git a/internal/gitaly/storage/raft/manager_test.go b/internal/gitaly/storage/raft/manager_test.go index 0759629696..7181336d24 100644 --- a/internal/gitaly/storage/raft/manager_test.go +++ b/internal/gitaly/storage/raft/manager_test.go @@ -83,7 +83,7 @@ func TestManager_Start(t *testing.T) { defer cluster.closeAll() require.NoError(t, cluster.nodes[1].manager.Start()) - require.Equal(t, raftID(1), cluster.nodes[1].manager.firstStorage.id) + require.Equal(t, raftID(1), cluster.nodes[1].manager.firstStorage.ID()) clusterInfo, err := cluster.nodes[1].manager.ClusterInfo() require.NoError(t, err) @@ -104,7 +104,7 @@ func TestManager_Start(t *testing.T) { defer cluster.closeAll() require.NoError(t, cluster.nodes[1].manager.Start()) - require.Equal(t, raftID(1), cluster.nodes[1].manager.firstStorage.id) + require.Equal(t, raftID(1), cluster.nodes[1].manager.firstStorage.ID()) require.EqualError(t, cluster.nodes[1].manager.Start(), "raft manager already started") }) @@ -121,7 +121,7 @@ func TestManager_Start(t *testing.T) { require.NoError(t, cluster.nodes[node].manager.Start()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) @@ -130,7 +130,7 @@ func TestManager_Start(t *testing.T) { var expectedIDs, allocatedIDs []raftID for i := raftID(1); i <= raftID(numNode); i++ { expectedIDs = append(expectedIDs, i) - allocatedIDs = append(allocatedIDs, cluster.nodes[i].manager.firstStorage.id) + allocatedIDs = append(allocatedIDs, cluster.nodes[i].manager.firstStorage.ID()) } require.ElementsMatch(t, expectedIDs, allocatedIDs) @@ -144,13 +144,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(numNode+1), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), + StorageId: storage.ID().ToUint64(), Name: storage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(storage.id, uint64(numNode)), + ReplicaGroups: replicaGroups(storage.ID(), uint64(numNode)), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.ID().ToUint64()]) }) }) }(numNode) @@ -170,7 +170,7 @@ func TestManager_Start(t *testing.T) { require.Equal(t, true, cluster.nodes[node].manager.Ready()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) @@ -178,8 +178,8 @@ func TestManager_Start(t *testing.T) { // The quorum is reached require.ElementsMatch(t, []raftID{1, 2}, []raftID{ - cluster.nodes[1].manager.firstStorage.id, - cluster.nodes[2].manager.firstStorage.id, + cluster.nodes[1].manager.firstStorage.ID(), + cluster.nodes[2].manager.firstStorage.ID(), }) fanOut(2, func(node raftID) { @@ -187,19 +187,18 @@ func TestManager_Start(t *testing.T) { storage := mgr.firstStorage clusterInfo, err := mgr.ClusterInfo() - fmt.Printf("%+v %+v\n", node, clusterInfo) require.NoError(t, err) require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(3), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), + StorageId: storage.ID().ToUint64(), Name: storage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(storage.id, 2), + ReplicaGroups: replicaGroups(storage.ID(), 2), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.ID().ToUint64()]) }) // Now the third node joins. It does not matter whether the third node bootstraps the cluster. @@ -216,13 +215,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, cluster.clusterID, clusterInfo.ClusterId) require.Equal(t, uint64(4), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), + StorageId: storage.ID().ToUint64(), Name: storage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(storage.id, 3), + ReplicaGroups: replicaGroups(storage.ID(), 3), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.ID().ToUint64()]) }) }) }(bootstrap) @@ -276,7 +275,7 @@ func TestManager_Start(t *testing.T) { require.NoError(t, cluster.nodes[node].manager.Start()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) @@ -288,8 +287,8 @@ func TestManager_Start(t *testing.T) { }) require.ElementsMatch(t, []raftID{1, 2}, []raftID{ - cluster.nodes[1].manager.firstStorage.id, - cluster.nodes[2].manager.firstStorage.id, + cluster.nodes[1].manager.firstStorage.ID(), + cluster.nodes[2].manager.firstStorage.ID(), }) fanOut(3, func(node raftID) { @@ -308,13 +307,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, uint64(3), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: storage.id.ToUint64(), + StorageId: storage.ID().ToUint64(), Name: storage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(storage.id, 2), + ReplicaGroups: replicaGroups(storage.ID(), 2), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[storage.ID().ToUint64()]) } }) }) @@ -329,7 +328,7 @@ func TestManager_Start(t *testing.T) { require.NoError(t, cluster.nodes[node].manager.Start()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) @@ -352,13 +351,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, uint64(4), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: mgr.firstStorage.id.ToUint64(), + StorageId: mgr.firstStorage.ID().ToUint64(), Name: mgr.firstStorage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(mgr.firstStorage.id, 3), + ReplicaGroups: replicaGroups(mgr.firstStorage.ID(), 3), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.ID().ToUint64()]) }) }) @@ -372,7 +371,7 @@ func TestManager_Start(t *testing.T) { require.NoError(t, cluster.nodes[node].manager.Start()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(3), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) @@ -396,13 +395,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, uint64(4), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: mgr.firstStorage.id.ToUint64(), + StorageId: mgr.firstStorage.ID().ToUint64(), Name: mgr.firstStorage.name, ReplicationFactor: 3, NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(mgr.firstStorage.id, 3), + ReplicaGroups: replicaGroups(mgr.firstStorage.ID(), 3), } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.ID().ToUint64()]) }) }) @@ -451,7 +450,7 @@ func TestManager_Start(t *testing.T) { require.NoError(t, cluster.nodes[node].manager.Start()) storage := cluster.nodes[node].manager.firstStorage - require.Equal(t, storage.id.ToUint64(), storage.persistedInfo.StorageId) + require.Equal(t, storage.ID().ToUint64(), storage.persistedInfo.StorageId) require.Equal(t, storage.name, storage.persistedInfo.Name) require.Equal(t, uint64(1), storage.persistedInfo.ReplicationFactor) require.Equal(t, node.ToUint64(), storage.persistedInfo.NodeId) @@ -477,13 +476,13 @@ func TestManager_Start(t *testing.T) { require.Equal(t, uint64(4), clusterInfo.NextStorageId) expectedInfo := &gitalypb.Storage{ - StorageId: mgr.firstStorage.id.ToUint64(), + StorageId: mgr.firstStorage.ID().ToUint64(), Name: mgr.firstStorage.name, ReplicationFactor: 3, // New replication factor NodeId: node.ToUint64(), - ReplicaGroups: replicaGroups(mgr.firstStorage.id, 3), // New replica groups + ReplicaGroups: replicaGroups(mgr.firstStorage.ID(), 3), // New replica groups } - testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.id.ToUint64()]) + testhelper.ProtoEqual(t, expectedInfo, clusterInfo.Storages[mgr.firstStorage.ID().ToUint64()]) }) }) } diff --git a/internal/gitaly/storage/raft/storage.go b/internal/gitaly/storage/raft/storage.go index 9a62587fe9..20759e021a 100644 --- a/internal/gitaly/storage/raft/storage.go +++ b/internal/gitaly/storage/raft/storage.go @@ -3,7 +3,6 @@ package raft import ( "context" "errors" - "fmt" "github.com/dgraph-io/badger/v4" "github.com/lni/dragonboat/v4" @@ -17,7 +16,6 @@ import ( // keyvalue.Transactioner for each Raft group, allowing the Raft groups to store their data in the // underlying keyvalue store. type storageManager struct { - id raftID name string ptnMgr *storagemgr.PartitionManager db dbAccessor @@ -38,6 +36,14 @@ func newStorageManager(name string, ptnMgr *storagemgr.PartitionManager, nodeHos // Close closes the storage manager. func (m *storageManager) Close() { m.nodeHost.Close() } +// ID returns the ID of the storage from persistent storage. +func (m *storageManager) ID() raftID { + if m.persistedInfo == nil { + return 0 + } + return raftID(m.persistedInfo.GetStorageId()) +} + func (m *storageManager) loadStorageInfo(ctx context.Context) error { return m.db.read(ctx, func(txn keyvalue.ReadWriter) error { item, err := txn.Get([]byte("storage")) @@ -53,7 +59,6 @@ func (m *storageManager) loadStorageInfo(ctx context.Context) error { return err } m.persistedInfo = &persistedInfo - m.id = raftID(m.persistedInfo.StorageId) return nil }) }) @@ -61,12 +66,6 @@ func (m *storageManager) loadStorageInfo(ctx context.Context) error { func (m *storageManager) saveStorageInfo(ctx context.Context, storage *gitalypb.Storage) error { return m.db.write(ctx, func(txn keyvalue.ReadWriter) error { - _, err := txn.Get([]byte("storage")) - if err == nil { - return fmt.Errorf("storage already exists") - } else if !errors.Is(err, badger.ErrKeyNotFound) { - return err - } marshaled, err := proto.Marshal(storage) if err != nil { return err @@ -75,7 +74,6 @@ func (m *storageManager) saveStorageInfo(ctx context.Context, storage *gitalypb. return err } m.persistedInfo = storage - m.id = raftID(m.persistedInfo.StorageId) return nil }) } -- GitLab From c0aca0c986571edfd0029a2528de153dd0601bca Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Wed, 31 Jul 2024 12:57:04 +0700 Subject: [PATCH 06/14] raft: Implement replicator to manage replication activities This commits Implements Raft replication. it manages the replication activities of a storage, including: - Apply the list replica groups. - Poll the list of partitions under management and start/stop corresponding partition groups accordingly. - Manage the authority storage of this node. - Push any changes from partitions of that authority storages if any. - Manage the replicas assigned to this storage. - Poll and apply any changes from partitions of replicas if any. --- internal/gitaly/storage/raft/replicator.go | 358 +++++++++++++++++++++ 1 file changed, 358 insertions(+) create mode 100644 internal/gitaly/storage/raft/replicator.go diff --git a/internal/gitaly/storage/raft/replicator.go b/internal/gitaly/storage/raft/replicator.go new file mode 100644 index 0000000000..162652e49b --- /dev/null +++ b/internal/gitaly/storage/raft/replicator.go @@ -0,0 +1,358 @@ +package raft + +import ( + "context" + "errors" + "fmt" + "slices" + "time" + + "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" +) + +// defaultReplicatorTimeout defines the default timeout period used for replicator's operations. +const defaultReplicatorTimeout = 30 * time.Second + +// replica is an interface that represents avavailable actions of a replica group. +type replica interface { + PollPartitions() ([]*partitionOperation, error) + WaitReady() error + StorageID() raftID + StorageName() string + Close() error +} + +// authority is an interface that represents the actions of an authority group. +type authority interface { + replica + + // RegisterPartition registers a partition with the authority. + RegisterPartition(partitionID raftID, relativePath string) (*gitalypb.Partition, error) + // RegisterReplica registers a replica to the list of known replicas of the authority. + RegisterReplica(ctx context.Context, nodeID raftID, replicaAddr string) error +} + +// storageTracker tracks the partition deletion/creation activities of a replica. +type storageTracker struct { + ctx context.Context + cancel context.CancelFunc + done chan struct{} + poller replica +} + +type replicatorConfig struct { + initAuthorityGroup func(storage *gitalypb.Storage) (authority, error) + initReplicaGroup func(authorityID raftID, storage *gitalypb.Storage) (replica, error) + initPartitionGroup func(authorityID raftID, partitionID raftID, relativePath string) error + getNodeAddr func(nodeID raftID) (string, error) +} + +// replicator manages the replication activities of a storage, including: +// - Apply the list replica groups. +// - Poll the list of partitions under management and start/stop corresponding partition groups accordingly. +// - Manage the authority storage of this node. +// - Push any changes from partitions of that authority storages if any. +// - Manage the replicas assigned to this storage. +// - Poll and apply any changes from partitions of replicas if any. +// +/* + ┌─►Partition Group N ──► Other nodes + │ + ├─►Partition Group 2 ──► Other nodes + Push changes │ + ├─►Partition Group 1 ──► Other nodes + │ + │ + ┌─────► Authority Group + │ + Push authority changes │ Poll Changes +WAL ────────────────────► Replicator ◄─────────────────────────────┐ + ▲ │ │ │ + │ │ │ │ + └─────────────────────────┘ ├────── Replica GroupsA │ + Apply replica changes │ │ │ + │ │ │ + │ ├──Partition Group 1 │ │ + │ │ │ │ + │ ├──Partition Group 2 ├───┤ + │ │ │ │ + │ └──Partition Group N │ │ + │ │ + └────── Replica GroupsB │ + │ │ + │ │ + ├──Partition Group 1 │ │ + │ │ │ + ├──Partition Group 2 ├───┘ + │ │ + └──Partition Group N │ +*/ +type replicator struct { + ctx context.Context + logger log.Logger + config replicatorConfig + authorityID raftID + authority authority + storageTrackers map[raftID]*storageTracker +} + +// ApplyReplicaGroups applies the list of replica groups. +func (r *replicator) ApplyReplicaGroups(cluster *gitalypb.Cluster) (int, error) { + var changes int + + authorityStorageInfo := cluster.Storages[r.authorityID.ToUint64()] + if authorityStorageInfo == nil { + return 0, fmt.Errorf("storage with ID %d does not exist", r.authorityID) + } + + replicatingStorages := r.stringifyReplicatingStorages() + storagesToReplicate := r.storagesToReplicate(cluster) + + // First, initialize the authority storage, which is the storage managed by this node. + if r.authority == nil { + authority, err := r.config.initAuthorityGroup(authorityStorageInfo) + if err != nil { + return changes, fmt.Errorf("creating authority Raft group: %w", err) + } + + if err := authority.WaitReady(); err != nil { + if closingErr := authority.Close(); closingErr != nil { + return changes, errors.Join(err, closingErr) + } + return changes, fmt.Errorf("waiting authority Raft group: %w", err) + } + + // Track this storage. As the group does not communicate with this replicator directly, we + // need to poll from its statemachine. + r.trackStorage(r.authorityID, authority) + r.authority = authority + changes++ + + r.logger.WithFields(log.Fields{ + "raft.storage_id": r.authorityID, + "raft.storage_name": authorityStorageInfo.GetName(), + }).Info("tracking authority storage") + } + + // Register all replicas of this authority storage. The replication won't start until a + // replication Raft groups on other nodes start. This node doesn't remove obsolete replicas on + // other nodes although it could. It lets other nodes shutdown obsolete replicas themselves so + // that they have a chance to clean up resources. + for _, replicaID := range authorityStorageInfo.ReplicaGroups { + if err := r.registerReplica(replicaID, cluster); err != nil { + return changes, fmt.Errorf("registering replica: %w", err) + } + } + + // Untrack obsoleted replicas when the list of replica groups changes. At the moment, the + // replicator doesn't clean up data. It simply stops listening to new changes. Data deletion + // will be handled in https://gitlab.com/gitlab-org/gitaly/-/issues/6248. + for storageID := range r.storageTrackers { + if storageID == r.authorityID { + continue + } + if _, exist := storagesToReplicate[storageID]; !exist { + changes++ + if err := r.untrackStorage(storageID); err != nil { + return 0, fmt.Errorf("untrack storage ID %d: %w", storageID, err) + } + r.logger.WithFields(log.Fields{"raft.storage_id": storageID}).Info("untracked storage") + } + } + + // Listen to existing/new partitions from tracked replica. + for storageID, storageInfo := range storagesToReplicate { + if _, exist := r.storageTrackers[storageID]; exist { + continue + } + changes++ + replicated, err := r.config.initReplicaGroup(r.authorityID, storageInfo) + if err != nil { + return 0, fmt.Errorf("creating replication Raft group: %w", err) + } + r.trackStorage(storageID, replicated) + r.logger.WithFields(log.Fields{ + "raft.storage_id": storageID, + "raft.storage_name": storageInfo.GetName(), + }).Info("tracking storage") + } + + if changes != 0 { + r.logger.WithFields(log.Fields{ + "raft.replicating_storages": replicatingStorages, + "raft.storages_to_replicate": r.stringifyStorageMap(storagesToReplicate), + }).Info(fmt.Sprintf("applied replica groups, %d changes", changes)) + } + return changes, nil +} + +// BroadcastNewPartition is triggered by a caller. It makes a call to the authority group to +// register the new partition. This partition is broadcasted to corresponding replica Raft groups. +func (r *replicator) BroadcastNewPartition(partitionID raftID, relativePath string) error { + if _, err := r.authority.RegisterPartition(partitionID, relativePath); err != nil { + return fmt.Errorf("registering partition: %w", err) + } + return nil +} + +// Close stops all activities of the replicator. This function exits after background goroutines finish. +func (r *replicator) Close() error { + var mergedErr error + + for storageID := range r.storageTrackers { + if err := r.untrackStorage(storageID); err != nil { + if mergedErr == nil { + mergedErr = err + } else { + mergedErr = errors.Join(mergedErr, err) + } + } + } + return mergedErr +} + +func (r *replicator) storagesToReplicate(cluster *gitalypb.Cluster) map[raftID]*gitalypb.Storage { + replications := map[raftID]*gitalypb.Storage{} + for storageID, storage := range cluster.Storages { + if slices.Contains(storage.ReplicaGroups, r.authorityID.ToUint64()) { + replications[raftID(storageID)] = storage + } + } + return replications +} + +func (r *replicator) stringifyStorageMap(m map[raftID]*gitalypb.Storage) map[raftID]string { + result := map[raftID]string{} + for k, v := range m { + result[k] = v.GetName() + } + return result +} + +func (r *replicator) stringifyReplicatingStorages() map[raftID]string { + result := map[raftID]string{} + for k, v := range r.storageTrackers { + if k != r.authorityID { + result[k] = v.poller.StorageName() + continue + } + } + return result +} + +func (r *replicator) trackStorage(storageID raftID, poller replica) { + ctx, cancel := context.WithCancel(r.ctx) + r.storageTrackers[storageID] = &storageTracker{ + ctx: ctx, + cancel: cancel, + poller: poller, + done: make(chan struct{}), + } + go r.pollPartitionsFromStorage(r.storageTrackers[storageID]) +} + +func (r *replicator) untrackStorage(storageID raftID) error { + tracker := r.storageTrackers[storageID] + tracker.cancel() + defer func() { + delete(r.storageTrackers, storageID) + }() + + if err := tracker.poller.Close(); err != nil { + return fmt.Errorf("closing poller: %w", err) + } + + select { + case <-time.After(defaultReplicatorTimeout): + return fmt.Errorf("deadline exceeded while untracking storage") + case <-tracker.done: + return nil + } +} + +func (r *replicator) pollPartitionsFromStorage(tracker *storageTracker) { + defer close(tracker.done) + + for { + select { + case <-tracker.ctx.Done(): + return + default: + } + + partitions, err := tracker.poller.PollPartitions() + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + r.logger.WithFields(log.Fields{ + "raft.storage_id": tracker.poller.StorageID(), + "raft.storage_name": tracker.poller.StorageName(), + }).WithError(err).Error("fail to poll partritions from storage") + continue + } + if len(partitions) == 0 { + continue + } + for _, operation := range partitions { + partition := operation.partition + if partition == nil { + r.logger.Error("tracking unknown partition") + continue + } + switch operation.op { + case opTrackPartition: + fields := log.Fields{ + "raft.authority_id": partition.GetAuthorityId(), + "raft.authority_name": partition.GetAuthorityName(), + "raft.storage_id": r.authority.StorageID(), + "raft.storage_name": r.authority.StorageName(), + "raft.partition_id": partition.GetPartitionId(), + "raft.relative_path": partition.GetRelativePath(), + } + if err := r.config.initPartitionGroup( + raftID(partition.GetAuthorityId()), + raftID(partition.GetPartitionId()), + partition.GetRelativePath(), + ); err != nil { + r.logger.WithFields(fields).WithError(err).Error("fail to initialize partition Raft group") + continue + } + r.logger.WithFields(fields).Debug("tracking partition") + default: + r.logger.WithFields(log.Fields{ + "raft.partition_id": partition.GetPartitionId(), + "raft.relative_path": partition.GetRelativePath(), + }).Error("unsupported op") + } + } + } +} + +func (r *replicator) registerReplica(replicaID uint64, cluster *gitalypb.Cluster) error { + replicaInfo := cluster.Storages[replicaID] + if replicaInfo == nil { + return fmt.Errorf("replica ID %d of storage ID %d not found", replicaID, r.authorityID) + } + addr, err := r.config.getNodeAddr(raftID(replicaInfo.GetNodeId())) + if err != nil { + return fmt.Errorf("getting storage address: %w", err) + } + ctx, cancel := context.WithTimeout(r.ctx, defaultReplicatorTimeout) + defer cancel() + return r.authority.RegisterReplica(ctx, raftID(replicaInfo.GetNodeId()), addr) +} + +func newReplicator(ctx context.Context, authorityID raftID, logger log.Logger, config replicatorConfig) *replicator { + return &replicator{ + ctx: ctx, + authorityID: authorityID, + logger: logger.WithFields(log.Fields{ + "raft.component": "replicator", + "raft.storage_id": authorityID, + }), + config: config, + storageTrackers: map[raftID]*storageTracker{}, + } +} -- GitLab From 008daef61df8e8dabb5248c332c0aa897a08589c Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sat, 27 Jul 2024 20:44:13 +0700 Subject: [PATCH 07/14] raft: Let the Raft manager start replicators This commit lets the Raft manager start the replicators for each managed storage. After starting, the Raft manager starts to poll latest cluster info from metadata Raft group. It triggers the replicator to apply the changes. If there is no new updates, the polling frequency is pushed back exponentially. --- internal/gitaly/storage/raft/db.go | 19 +- internal/gitaly/storage/raft/db_test.go | 96 ++++++---- internal/gitaly/storage/raft/manager.go | 176 ++++++++++++++++-- internal/gitaly/storage/raft/storage.go | 15 +- .../gitaly/storage/raft/testhelper_test.go | 8 +- 5 files changed, 260 insertions(+), 54 deletions(-) diff --git a/internal/gitaly/storage/raft/db.go b/internal/gitaly/storage/raft/db.go index 143f408377..82ced10acc 100644 --- a/internal/gitaly/storage/raft/db.go +++ b/internal/gitaly/storage/raft/db.go @@ -2,6 +2,7 @@ package raft import ( "context" + "fmt" "github.com/lni/dragonboat/v4/statemachine" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" @@ -38,19 +39,29 @@ func newNamespacedDBAccessor(ptnMgr *storagemgr.PartitionManager, storageName st } } -// dbForStorage returns a namedspaced DB accessor function for specific information of a storage in +// dbForStorage returns a namedspaced DB accessor for specific information of a storage in // Raft cluster such as allocated storage ID, last applied replica groups, etc. func dbForStorage(ptnMgr *storagemgr.PartitionManager, storageName string) dbAccessor { return newNamespacedDBAccessor(ptnMgr, storageName, []byte("raft/self/")) } -// dbForMetadataGroup returns a namedspaced DB accessfor function to store the data of metadata Raft -// group. Those data consists of cluster-wide information such as list of registered storages and -// their replication groups, etc. +// dbForMetadataGroup returns a namedspaced DB accessor to store the data of metadata Raft group. +// Those data consists of cluster-wide information such as list of registered storages and their +// replication groups, etc. func dbForMetadataGroup(ptnMgr *storagemgr.PartitionManager, storageName string) dbAccessor { return newNamespacedDBAccessor(ptnMgr, storageName, []byte("raft/cluster/")) } +// dbForReplicationGroup returns a namedspaced DB accessor to store the list of partitions under +// management of a storage. They could be either replicated partitions or ones created by the +// authoritative storage. +func dbForReplicationGroup(ptnMgr *storagemgr.PartitionManager, storageID raftID, storageName string, targetID raftID) dbAccessor { + if storageID == targetID { + return newNamespacedDBAccessor(ptnMgr, storageName, []byte("raft/authority/")) + } + return newNamespacedDBAccessor(ptnMgr, storageName, []byte(fmt.Sprintf("raft/replicas/%s/", targetID.MarshalBinary()))) +} + var keyLastApplied = []byte("applied_lsn") // Statemachine is an interface that wraps dragonboat's statemachine. It is a superset of diff --git a/internal/gitaly/storage/raft/db_test.go b/internal/gitaly/storage/raft/db_test.go index 6f4697ccbb..41b3aa6731 100644 --- a/internal/gitaly/storage/raft/db_test.go +++ b/internal/gitaly/storage/raft/db_test.go @@ -4,13 +4,8 @@ import ( "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -23,20 +18,9 @@ func TestDbForMetadataGroup(t *testing.T) { ctx := testhelper.Context(t) cfg := testcfg.Build(t, testcfg.WithStorages("node-1")) - logger := testhelper.NewLogger(t) - dbMgr := setupTestDBManager(t, cfg) + dbMgr, ptnManager := setupTestDBManagerAndPartitionManager(t, cfg) - cmdFactory := gittest.NewCommandFactory(t, cfg) - catfileCache := catfile.NewCache(cfg) - t.Cleanup(catfileCache.Stop) - - localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache) - - partitionManager, err := storagemgr.NewPartitionManager(testhelper.Context(t), cfg.Storages, cmdFactory, localRepoFactory, logger, dbMgr, cfg.Prometheus, nil) - require.NoError(t, err) - t.Cleanup(partitionManager.Close) - - db := dbForMetadataGroup(partitionManager, "node-1") + db := dbForMetadataGroup(ptnManager, "node-1") require.NoError(t, db.write(ctx, func(txn keyvalue.ReadWriter) error { require.NoError(t, txn.Set([]byte("data-1"), []byte("one"))) return nil @@ -96,19 +80,7 @@ func TestDbForStorage(t *testing.T) { ctx := testhelper.Context(t) cfg := testcfg.Build(t, testcfg.WithStorages("node-1")) - logger := testhelper.NewLogger(t) - dbMgr := setupTestDBManager(t, cfg) - - cmdFactory := gittest.NewCommandFactory(t, cfg) - catfileCache := catfile.NewCache(cfg) - t.Cleanup(catfileCache.Stop) - - localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache) - - partitionManager, err := storagemgr.NewPartitionManager(testhelper.Context(t), cfg.Storages, cmdFactory, localRepoFactory, logger, dbMgr, cfg.Prometheus, nil) - require.NoError(t, err) - t.Cleanup(partitionManager.Close) - + dbMgr, ptnManager := setupTestDBManagerAndPartitionManager(t, cfg) storageInfo := &gitalypb.Storage{ StorageId: 1, Name: "node-1", @@ -117,7 +89,7 @@ func TestDbForStorage(t *testing.T) { ReplicaGroups: []uint64{2, 3}, } - db := dbForStorage(partitionManager, "node-1") + db := dbForStorage(ptnManager, "node-1") require.NoError(t, db.write(ctx, func(txn keyvalue.ReadWriter) error { storage, err := proto.Marshal(storageInfo) require.NoError(t, err) @@ -169,3 +141,63 @@ func TestDbForStorage(t *testing.T) { return nil })) } + +func TestDbForReplicationGroup(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t, testcfg.WithStorages("node-1")) + + dbMgr, ptnManager := setupTestDBManagerAndPartitionManager(t, cfg) + + db1 := dbForReplicationGroup(ptnManager, 1, "node-1", 1) + db2 := dbForReplicationGroup(ptnManager, 1, "node-1", 2) + db3 := dbForReplicationGroup(ptnManager, 1, "node-1", 3) + + require.NoError(t, db1.write(ctx, func(txn keyvalue.ReadWriter) error { + require.NoError(t, txn.Set([]byte("partitions/1"), []byte("@hashed/aa/bb/aabb"))) + require.NoError(t, txn.Set([]byte("partitions/2"), []byte("@hashed/cc/dd/ccdd"))) + require.NoError(t, txn.Set([]byte("partitions/3"), []byte("@hashed/ii/jj/iijj"))) + return nil + })) + + require.NoError(t, db2.write(ctx, func(txn keyvalue.ReadWriter) error { + require.NoError(t, txn.Set([]byte("partitions/1"), []byte("@hashed/aa/bb/aabb"))) + require.NoError(t, txn.Set([]byte("partitions/2"), []byte("@hashed/ee/ff/eeff"))) + return nil + })) + + require.NoError(t, db3.write(ctx, func(txn keyvalue.ReadWriter) error { + require.NoError(t, txn.Set([]byte("partitions/1"), []byte("@hashed/aa/bb/aabb"))) + require.NoError(t, txn.Set([]byte("partitions/2"), []byte("@hashed/gg/hh/gghh"))) + return nil + })) + + store, err := dbMgr.GetDB("node-1") + require.NoError(t, err) + + require.NoError(t, store.View(func(txn keyvalue.ReadWriter) error { + it := txn.NewIterator(keyvalue.IteratorOptions{ + Prefix: []byte("p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/"), + }) + defer it.Close() + + values := map[string][]byte{} + for it.Rewind(); it.Valid(); it.Next() { + k, err := it.Item().ValueCopy(nil) + require.NoError(t, err) + values[string(it.Item().Key())] = k + } + + require.Equal(t, map[string][]byte{ + "p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/authority/partitions/1": []byte("@hashed/aa/bb/aabb"), + "p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/authority/partitions/2": []byte("@hashed/cc/dd/ccdd"), + "p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/authority/partitions/3": []byte("@hashed/ii/jj/iijj"), + "p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/replicas/\x00\x00\x00\x00\x00\x00\x00\x02/partitions/1": []byte("@hashed/aa/bb/aabb"), + "p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/replicas/\x00\x00\x00\x00\x00\x00\x00\x02/partitions/2": []byte("@hashed/ee/ff/eeff"), + "p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/replicas/\x00\x00\x00\x00\x00\x00\x00\x03/partitions/1": []byte("@hashed/aa/bb/aabb"), + "p/\x00\x00\x00\x00\x00\x00\x00\x01/kv/raft/replicas/\x00\x00\x00\x00\x00\x00\x00\x03/partitions/2": []byte("@hashed/gg/hh/gghh"), + }, values) + return nil + })) +} diff --git a/internal/gitaly/storage/raft/manager.go b/internal/gitaly/storage/raft/manager.go index 0b79d5673a..e69b62148f 100644 --- a/internal/gitaly/storage/raft/manager.go +++ b/internal/gitaly/storage/raft/manager.go @@ -3,11 +3,15 @@ package raft import ( "context" "fmt" + "math/rand" "path/filepath" + "sync" "sync/atomic" + "time" "github.com/lni/dragonboat/v4" dragonboatConf "github.com/lni/dragonboat/v4/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/backoff" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -39,11 +43,13 @@ type ManagerConfig struct { // Manager is responsible for managing the Raft cluster for all storages. type Manager struct { ctx context.Context + cancel context.CancelFunc clusterConfig config.Raft managerConfig ManagerConfig logger log.Logger started atomic.Bool closed atomic.Bool + running sync.WaitGroup storageManagers map[string]*storageManager firstStorage *storageManager @@ -72,9 +78,10 @@ func NewManager( if len(storages) > 1 { return nil, fmt.Errorf("the support for multiple storages is temporarily disabled") } - + ctx, cancel := context.WithCancel(ctx) m := &Manager{ ctx: ctx, + cancel: cancel, clusterConfig: clusterCfg, managerConfig: managerCfg, logger: logger.WithFields(log.Fields{ @@ -84,6 +91,7 @@ func NewManager( "raft_node_id": clusterCfg.NodeID, }), storageManagers: map[string]*storageManager{}, + running: sync.WaitGroup{}, } storage := storages[0] @@ -126,6 +134,7 @@ func NewManager( // - Register the node's storage with the metadata Raft group. The metadata Raft group allocates a // new storage ID for each of them. They persist in their IDs. This type of ID is used for future // interaction with the cluster. +// - Start the replicator that monitors to any storage/partition changes from desired groups. func (m *Manager) Start() (returnedErr error) { if m.started.Load() { return fmt.Errorf("raft manager already started") @@ -160,18 +169,11 @@ func (m *Manager) Start() (returnedErr error) { m.logger.WithField("cluster", cluster).Info("Raft cluster bootstrapped") } - // Temporarily, we fetch the cluster info from the metadata Raft group directly. In the future, - // this node needs to contact a metadata authority. - // For more information: https://gitlab.com/groups/gitlab-org/-/epics/10864 - cluster, err := m.metadataGroup.ClusterInfo() - if err != nil { - return fmt.Errorf("getting cluster info: %w", err) - } - if cluster.ClusterId != m.clusterConfig.ClusterID { - return fmt.Errorf("joining the wrong cluster, expected to join %q but joined %q", m.clusterConfig.ClusterID, cluster.ClusterId) + if err := m.registerStorages(); err != nil { + return err } - if err := m.registerStorages(); err != nil { + if err := m.startReplicators(); err != nil { return err } @@ -196,6 +198,17 @@ func (m *Manager) initMetadataGroup(storageMgr *storageManager) error { } func (m *Manager) registerStorages() error { + // Temporarily, we fetch the cluster info from the metadata Raft group directly. In the future, + // this node needs to contact a metadata authority. + // For more information: https://gitlab.com/groups/gitlab-org/-/epics/10864 + cluster, err := m.metadataGroup.ClusterInfo() + if err != nil { + return fmt.Errorf("getting cluster info: %w", err) + } + if cluster.ClusterId != m.clusterConfig.ClusterID { + return fmt.Errorf("joining the wrong cluster, expected to join %q but joined %q", m.clusterConfig.ClusterID, cluster.ClusterId) + } + if m.managerConfig.testBeforeRegister != nil { m.managerConfig.testBeforeRegister() } @@ -206,7 +219,7 @@ func (m *Manager) registerStorages() error { if err := storageMgr.loadStorageInfo(m.ctx); err != nil { return fmt.Errorf("loading persisted storage info: %w", err) } - if storageMgr.persistedInfo == nil || storageMgr.persistedInfo.GetStorageId() == 0 { + if storageMgr.ID() == 0 { storageInfo, err := m.metadataGroup.RegisterStorage(storageName) if err != nil { return fmt.Errorf("registering storage info: %w", err) @@ -214,6 +227,10 @@ func (m *Manager) registerStorages() error { if err := storageMgr.saveStorageInfo(m.ctx, storageInfo); err != nil { return fmt.Errorf("saving storage info: %w", err) } + m.logger.WithFields(log.Fields{ + "raft.storage_name": storageName, + "raft.storage_id": storageMgr.persistedInfo.GetStorageId(), + }).Info("storage registered") } else if storageMgr.persistedInfo.NodeId != m.clusterConfig.NodeID || storageMgr.persistedInfo.ReplicationFactor != m.clusterConfig.ReplicationFactor { // Changes that gonna affect replication. Gitaly needs to sync up those changes to metadata // Raft group to shuffle the replication groups. We don't persit new info intentionally. The @@ -227,15 +244,128 @@ func (m *Manager) registerStorages() error { } } m.logger.WithFields(log.Fields{ - "storage_name": storageName, - "storage_id": storageMgr.persistedInfo.GetStorageId(), - "replication_factor": storageMgr.persistedInfo.GetReplicationFactor(), + "raft.storage_name": storageName, + "raft.storage_id": storageMgr.persistedInfo.GetStorageId(), + "raft.replication_factor": storageMgr.persistedInfo.GetReplicationFactor(), }).Info("storage joined the cluster") } return nil } +func (m *Manager) initReplicationGroup(hostingID raftID, storageInfo *gitalypb.Storage, authority bool) (*replicationRaftGroup, error) { + var destinatingMgr *storageManager + for _, storageMgr := range m.storageManagers { + if storageMgr.ID() == hostingID { + destinatingMgr = storageMgr + } + } + + if destinatingMgr == nil { + return nil, fmt.Errorf("storage %q is not managed by this node", storageInfo.GetName()) + } + + group, err := newReplicationGroup( + m.ctx, + authority, + storageInfo, + destinatingMgr.nodeHost, + destinatingMgr.dbForReplicationGroup(raftID(storageInfo.GetStorageId())), + m.clusterConfig, + m.logger, + ) + if err != nil { + return nil, fmt.Errorf("initializing replication Raft group: %w", err) + } + return group, nil +} + +func (m *Manager) startReplicators() error { + for _, storageMgr := range m.storageManagers { + replicator := newReplicator(m.ctx, storageMgr.ID(), m.logger, replicatorConfig{ + initAuthorityGroup: func(storageInfo *gitalypb.Storage) (authority, error) { + return m.initReplicationGroup(raftID(storageInfo.GetStorageId()), storageInfo, true) + }, + initReplicaGroup: func(hostingID raftID, storageInfo *gitalypb.Storage) (replica, error) { + return m.initReplicationGroup(hostingID, storageInfo, false) + }, + getNodeAddr: m.getNodeAddr, + initPartitionGroup: func(_authorityID raftID, _partitionID raftID, _relativePath string) error { + // No-op now. + return nil + }, + }) + storageMgr.replicator = replicator + } + + delay := backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano()))) + delay.BaseDelay = 5 * time.Second + delay.MaxDelay = 60 * time.Second + + errTimer := time.NewTimer(m.metadataGroup.maxNextElectionWait()) + + const maxNoUpdates = 30 + var noUpdates uint + noUpdatesTimer := time.NewTimer(delay.BaseDelay) + + m.running.Add(1) + go func() { + defer m.running.Done() + for { + var err error + + clusterInfo, err := m.ClusterInfo() + if err == nil { + for _, storageMgr := range m.storageManagers { + var changes int + if changes, err = storageMgr.replicator.ApplyReplicaGroups(clusterInfo); err != nil { + break + } + if err = storageMgr.saveStorageInfo(m.ctx, clusterInfo.Storages[storageMgr.ID().ToUint64()]); err != nil { + break + } + if changes != 0 { + noUpdates = 0 + } + } + } + + if err != nil { + if m.ctx.Err() != nil { + return + } + m.logger.WithError(err).WithField("cluster", clusterInfo).Error("error while monitoring replica changes") + errTimer.Reset(m.metadataGroup.maxNextElectionWait()) + noUpdates = 0 + select { + case <-m.ctx.Done(): + errTimer.Stop() + return + case <-errTimer.C: + errTimer.Stop() + continue + } + } + + // Push back the next polling point of time. This practice reduces the polling + // rate when the cluster is stable. + if noUpdates < maxNoUpdates { + noUpdates++ + } + noUpdatesTimer.Reset(delay.Backoff(noUpdates)) + select { + case <-m.ctx.Done(): + noUpdatesTimer.Stop() + return + case <-noUpdatesTimer.C: + noUpdatesTimer.Stop() + } + } + }() + + return nil +} + // Ready returns if the Raft manager is ready. func (m *Manager) Ready() bool { return m.started.Load() @@ -252,8 +382,12 @@ func (m *Manager) Close() { m.logger.WithError(err).Warn("fail to stop metadata Raft group") } for _, storageMgr := range m.storageManagers { - storageMgr.Close() + if err := storageMgr.Close(); err != nil { + m.logger.WithError(err).Error("stopping storage") + } } + m.cancel() + m.running.Wait() m.logger.Info("Raft cluster has stopped") } @@ -267,3 +401,13 @@ func (m *Manager) ClusterInfo() (*gitalypb.Cluster, error) { } return m.metadataGroup.ClusterInfo() } + +func (m *Manager) getNodeAddr(nodeID raftID) (string, error) { + // Right now, all storages are also initial members. In the future, the manager needs to contact + // the metadata Raft group or gossip. + addr, exist := m.clusterConfig.InitialMembers[fmt.Sprintf("%d", nodeID)] + if !exist { + return "", fmt.Errorf("address of storage %d does not exist", nodeID) + } + return addr, nil +} diff --git a/internal/gitaly/storage/raft/storage.go b/internal/gitaly/storage/raft/storage.go index 20759e021a..5d476cdd2c 100644 --- a/internal/gitaly/storage/raft/storage.go +++ b/internal/gitaly/storage/raft/storage.go @@ -21,6 +21,7 @@ type storageManager struct { db dbAccessor nodeHost *dragonboat.NodeHost persistedInfo *gitalypb.Storage + replicator *replicator } // newStorageManager returns an instance of storage manager. @@ -34,7 +35,15 @@ func newStorageManager(name string, ptnMgr *storagemgr.PartitionManager, nodeHos } // Close closes the storage manager. -func (m *storageManager) Close() { m.nodeHost.Close() } +func (m *storageManager) Close() (returnedErr error) { + if m.replicator != nil { + if err := m.replicator.Close(); err != nil { + returnedErr = err + } + } + m.nodeHost.Close() + return +} // ID returns the ID of the storage from persistent storage. func (m *storageManager) ID() raftID { @@ -81,3 +90,7 @@ func (m *storageManager) saveStorageInfo(ctx context.Context, storage *gitalypb. func (m *storageManager) dbForMetadataGroup() dbAccessor { return dbForMetadataGroup(m.ptnMgr, m.name) } + +func (m *storageManager) dbForReplicationGroup(targetID raftID) dbAccessor { + return dbForReplicationGroup(m.ptnMgr, m.ID(), m.name, targetID) +} diff --git a/internal/gitaly/storage/raft/testhelper_test.go b/internal/gitaly/storage/raft/testhelper_test.go index 94273e93f3..81a44d901f 100644 --- a/internal/gitaly/storage/raft/testhelper_test.go +++ b/internal/gitaly/storage/raft/testhelper_test.go @@ -360,7 +360,7 @@ func setupTestDBManager(t *testing.T, cfg config.Cfg) *keyvalue.DBManager { return dbMgr } -func setupTestPartitionManager(t *testing.T, cfg config.Cfg) *storagemgr.PartitionManager { +func setupTestDBManagerAndPartitionManager(t *testing.T, cfg config.Cfg) (*keyvalue.DBManager, *storagemgr.PartitionManager) { logger := testhelper.NewLogger(t) dbMgr := setupTestDBManager(t, cfg) @@ -374,6 +374,12 @@ func setupTestPartitionManager(t *testing.T, cfg config.Cfg) *storagemgr.Partiti require.NoError(t, err) t.Cleanup(partitionManager.Close) + return dbMgr, partitionManager +} + +func setupTestPartitionManager(t *testing.T, cfg config.Cfg) *storagemgr.PartitionManager { + _, partitionManager := setupTestDBManagerAndPartitionManager(t, cfg) + return partitionManager } -- GitLab From e01b6220f9aa9047a8bb76ed998cde02800669f2 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Wed, 31 Jul 2024 15:55:52 +0700 Subject: [PATCH 08/14] storagemgr: Add a flag to log entry marking repository creation Recently, the log entry doesn't tell if a repository is created as a part of that entry. Although that could be implied from log entry's file and directory operations, it's more reliable to have a formal flag marking such events. --- .../storage/storagemgr/transaction_manager.go | 4 + .../transaction_manager_hook_test.go | 5 + .../transaction_manager_repo_test.go | 30 +- proto/go/gitalypb/log.pb.go | 419 ++++++++++-------- proto/log.proto | 6 + 5 files changed, 285 insertions(+), 179 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index bf6e716f06..64fd9b59e3 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -2177,6 +2177,10 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { } } + if transaction.repositoryCreation != nil { + logEntry.RepositoryCreation = &gitalypb.LogEntry_RepositoryCreation{} + } + if transaction.deleteRepository { logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go index 48b04bb098..21abe41858 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_hook_test.go @@ -18,6 +18,8 @@ type hookFunc func(hookContext) type hookContext struct { // closeManager calls the calls Close on the TransactionManager. closeManager func() + // accessManager triggers a callback against the TransactionManager. + accessManager func(func(*TransactionManager)) } // installHooks takes the hooks in the test setup and configures them in the TransactionManager. @@ -41,6 +43,9 @@ func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, *destination = func() { runHook(hookContext{ closeManager: mgr.Close, + accessManager: func(callback func(*TransactionManager)) { + callback(mgr) + }, }) } } diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_repo_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_repo_test.go index 5df7815054..86135cddfe 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_repo_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_repo_test.go @@ -3,20 +3,33 @@ package storagemgr import ( "testing" + "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { + assertLogEntryHasRepositoryCreation := func(hookCtx hookContext) { + hookCtx.accessManager(func(mgr *TransactionManager) { + entry, err := mgr.readLogEntry(mgr.appliedLSN + 1) + require.NoError(t, err) + testhelper.ProtoEqual(t, &gitalypb.LogEntry_RepositoryCreation{}, entry.RepositoryCreation) + }) + } return []transactionTestCase{ { desc: "create repository when it doesn't exist", steps: steps{ RemoveRepository{}, - StartManager{}, + StartManager{ + Hooks: testTransactionHooks{ + BeforeApplyLogEntry: assertLogEntryHasRepositoryCreation, + }, + }, Begin{ RelativePaths: []string{setup.RelativePath}, }, @@ -78,7 +91,11 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t desc: "create repository with full state", steps: steps{ RemoveRepository{}, - StartManager{}, + StartManager{ + Hooks: testTransactionHooks{ + BeforeApplyLogEntry: assertLogEntryHasRepositoryCreation, + }, + }, Begin{ RelativePaths: []string{setup.RelativePath}, }, @@ -263,7 +280,8 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t RemoveRepository{}, StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { + BeforeApplyLogEntry: func(hc hookContext) { + assertLogEntryHasRepositoryCreation(hc) panic(errSimulatedCrash) }, }, @@ -333,7 +351,11 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t AssertManager{ ExpectedError: errSimulatedCrash, }, - StartManager{}, + StartManager{ + Hooks: testTransactionHooks{ + BeforeApplyLogEntry: assertLogEntryHasRepositoryCreation, + }, + }, }, expectedState: StateAssertion{ Database: DatabaseState{ diff --git a/proto/go/gitalypb/log.pb.go b/proto/go/gitalypb/log.pb.go index b5cb6d241e..dc016edb0c 100644 --- a/proto/go/gitalypb/log.pb.go +++ b/proto/go/gitalypb/log.pb.go @@ -39,6 +39,8 @@ type LogEntry struct { ReferenceTransactions []*LogEntry_ReferenceTransaction `protobuf:"bytes,2,rep,name=reference_transactions,json=referenceTransactions,proto3" json:"reference_transactions,omitempty"` // repository_deletion, when set, indicates this log entry deletes the repository. RepositoryDeletion *LogEntry_RepositoryDeletion `protobuf:"bytes,6,opt,name=repository_deletion,json=repositoryDeletion,proto3" json:"repository_deletion,omitempty"` + // repository_creation, when set, indicates this log entry creates the repository. + RepositoryCreation *LogEntry_RepositoryCreation `protobuf:"bytes,11,opt,name=repository_creation,json=repositoryCreation,proto3" json:"repository_creation,omitempty"` // housekeeping, when set, indicates this log entry contains a housekeeping task. Housekeeping *LogEntry_Housekeeping `protobuf:"bytes,9,opt,name=housekeeping,proto3" json:"housekeeping,omitempty"` // operations is an ordered list of operations to run in order to apply @@ -99,6 +101,13 @@ func (x *LogEntry) GetRepositoryDeletion() *LogEntry_RepositoryDeletion { return nil } +func (x *LogEntry) GetRepositoryCreation() *LogEntry_RepositoryCreation { + if x != nil { + return x.RepositoryCreation + } + return nil +} + func (x *LogEntry) GetHousekeeping() *LogEntry_Housekeeping { if x != nil { return x.Housekeeping @@ -254,6 +263,45 @@ func (*LogEntry_RepositoryDeletion) Descriptor() ([]byte, []int) { return file_log_proto_rawDescGZIP(), []int{0, 1} } +// RepositoryCreation models a repository creation. +type LogEntry_RepositoryCreation struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *LogEntry_RepositoryCreation) Reset() { + *x = LogEntry_RepositoryCreation{} + if protoimpl.UnsafeEnabled { + mi := &file_log_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LogEntry_RepositoryCreation) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogEntry_RepositoryCreation) ProtoMessage() {} + +func (x *LogEntry_RepositoryCreation) ProtoReflect() protoreflect.Message { + mi := &file_log_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogEntry_RepositoryCreation.ProtoReflect.Descriptor instead. +func (*LogEntry_RepositoryCreation) Descriptor() ([]byte, []int) { + return file_log_proto_rawDescGZIP(), []int{0, 2} +} + // Housekeeping models a housekeeping run. It is supposed to handle housekeeping tasks for repositories such as the // cleanup of unneeded files and optimizations for the repository's data structures. It is a collection of smaller // tasks. @@ -273,7 +321,7 @@ type LogEntry_Housekeeping struct { func (x *LogEntry_Housekeeping) Reset() { *x = LogEntry_Housekeeping{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[4] + mi := &file_log_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -286,7 +334,7 @@ func (x *LogEntry_Housekeeping) String() string { func (*LogEntry_Housekeeping) ProtoMessage() {} func (x *LogEntry_Housekeeping) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[4] + mi := &file_log_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -299,7 +347,7 @@ func (x *LogEntry_Housekeeping) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Housekeeping.ProtoReflect.Descriptor instead. func (*LogEntry_Housekeeping) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 2} + return file_log_proto_rawDescGZIP(), []int{0, 3} } func (x *LogEntry_Housekeeping) GetPackRefs() *LogEntry_Housekeeping_PackRefs { @@ -342,7 +390,7 @@ type LogEntry_Operation struct { func (x *LogEntry_Operation) Reset() { *x = LogEntry_Operation{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[5] + mi := &file_log_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -355,7 +403,7 @@ func (x *LogEntry_Operation) String() string { func (*LogEntry_Operation) ProtoMessage() {} func (x *LogEntry_Operation) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[5] + mi := &file_log_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -368,7 +416,7 @@ func (x *LogEntry_Operation) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Operation.ProtoReflect.Descriptor instead. func (*LogEntry_Operation) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3} + return file_log_proto_rawDescGZIP(), []int{0, 4} } func (m *LogEntry_Operation) GetOperation() isLogEntry_Operation_Operation { @@ -474,7 +522,7 @@ type LogEntry_ReferenceTransaction_Change struct { func (x *LogEntry_ReferenceTransaction_Change) Reset() { *x = LogEntry_ReferenceTransaction_Change{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[6] + mi := &file_log_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -487,7 +535,7 @@ func (x *LogEntry_ReferenceTransaction_Change) String() string { func (*LogEntry_ReferenceTransaction_Change) ProtoMessage() {} func (x *LogEntry_ReferenceTransaction_Change) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[6] + mi := &file_log_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -539,7 +587,7 @@ type LogEntry_Housekeeping_PackRefs struct { func (x *LogEntry_Housekeeping_PackRefs) Reset() { *x = LogEntry_Housekeeping_PackRefs{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[7] + mi := &file_log_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -552,7 +600,7 @@ func (x *LogEntry_Housekeeping_PackRefs) String() string { func (*LogEntry_Housekeeping_PackRefs) ProtoMessage() {} func (x *LogEntry_Housekeeping_PackRefs) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[7] + mi := &file_log_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -565,7 +613,7 @@ func (x *LogEntry_Housekeeping_PackRefs) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Housekeeping_PackRefs.ProtoReflect.Descriptor instead. func (*LogEntry_Housekeeping_PackRefs) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 2, 0} + return file_log_proto_rawDescGZIP(), []int{0, 3, 0} } func (x *LogEntry_Housekeeping_PackRefs) GetPrunedRefs() [][]byte { @@ -595,7 +643,7 @@ type LogEntry_Housekeeping_Repack struct { func (x *LogEntry_Housekeeping_Repack) Reset() { *x = LogEntry_Housekeeping_Repack{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[8] + mi := &file_log_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -608,7 +656,7 @@ func (x *LogEntry_Housekeeping_Repack) String() string { func (*LogEntry_Housekeeping_Repack) ProtoMessage() {} func (x *LogEntry_Housekeeping_Repack) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[8] + mi := &file_log_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -621,7 +669,7 @@ func (x *LogEntry_Housekeeping_Repack) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Housekeeping_Repack.ProtoReflect.Descriptor instead. func (*LogEntry_Housekeeping_Repack) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 2, 1} + return file_log_proto_rawDescGZIP(), []int{0, 3, 1} } func (x *LogEntry_Housekeeping_Repack) GetNewFiles() []string { @@ -656,7 +704,7 @@ type LogEntry_Housekeeping_WriteCommitGraphs struct { func (x *LogEntry_Housekeeping_WriteCommitGraphs) Reset() { *x = LogEntry_Housekeeping_WriteCommitGraphs{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[9] + mi := &file_log_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -669,7 +717,7 @@ func (x *LogEntry_Housekeeping_WriteCommitGraphs) String() string { func (*LogEntry_Housekeeping_WriteCommitGraphs) ProtoMessage() {} func (x *LogEntry_Housekeeping_WriteCommitGraphs) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[9] + mi := &file_log_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -682,7 +730,7 @@ func (x *LogEntry_Housekeeping_WriteCommitGraphs) ProtoReflect() protoreflect.Me // Deprecated: Use LogEntry_Housekeeping_WriteCommitGraphs.ProtoReflect.Descriptor instead. func (*LogEntry_Housekeeping_WriteCommitGraphs) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 2, 2} + return file_log_proto_rawDescGZIP(), []int{0, 3, 2} } // CreateHardLink creates a hard link. The existing inode metadata, including @@ -705,7 +753,7 @@ type LogEntry_Operation_CreateHardLink struct { func (x *LogEntry_Operation_CreateHardLink) Reset() { *x = LogEntry_Operation_CreateHardLink{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[10] + mi := &file_log_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -718,7 +766,7 @@ func (x *LogEntry_Operation_CreateHardLink) String() string { func (*LogEntry_Operation_CreateHardLink) ProtoMessage() {} func (x *LogEntry_Operation_CreateHardLink) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[10] + mi := &file_log_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -731,7 +779,7 @@ func (x *LogEntry_Operation_CreateHardLink) ProtoReflect() protoreflect.Message // Deprecated: Use LogEntry_Operation_CreateHardLink.ProtoReflect.Descriptor instead. func (*LogEntry_Operation_CreateHardLink) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3, 0} + return file_log_proto_rawDescGZIP(), []int{0, 4, 0} } func (x *LogEntry_Operation_CreateHardLink) GetSourcePath() []byte { @@ -770,7 +818,7 @@ type LogEntry_Operation_RemoveDirectoryEntry struct { func (x *LogEntry_Operation_RemoveDirectoryEntry) Reset() { *x = LogEntry_Operation_RemoveDirectoryEntry{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[11] + mi := &file_log_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -783,7 +831,7 @@ func (x *LogEntry_Operation_RemoveDirectoryEntry) String() string { func (*LogEntry_Operation_RemoveDirectoryEntry) ProtoMessage() {} func (x *LogEntry_Operation_RemoveDirectoryEntry) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[11] + mi := &file_log_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -796,7 +844,7 @@ func (x *LogEntry_Operation_RemoveDirectoryEntry) ProtoReflect() protoreflect.Me // Deprecated: Use LogEntry_Operation_RemoveDirectoryEntry.ProtoReflect.Descriptor instead. func (*LogEntry_Operation_RemoveDirectoryEntry) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3, 1} + return file_log_proto_rawDescGZIP(), []int{0, 4, 1} } func (x *LogEntry_Operation_RemoveDirectoryEntry) GetPath() []byte { @@ -821,7 +869,7 @@ type LogEntry_Operation_CreateDirectory struct { func (x *LogEntry_Operation_CreateDirectory) Reset() { *x = LogEntry_Operation_CreateDirectory{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[12] + mi := &file_log_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -834,7 +882,7 @@ func (x *LogEntry_Operation_CreateDirectory) String() string { func (*LogEntry_Operation_CreateDirectory) ProtoMessage() {} func (x *LogEntry_Operation_CreateDirectory) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[12] + mi := &file_log_proto_msgTypes[13] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -847,7 +895,7 @@ func (x *LogEntry_Operation_CreateDirectory) ProtoReflect() protoreflect.Message // Deprecated: Use LogEntry_Operation_CreateDirectory.ProtoReflect.Descriptor instead. func (*LogEntry_Operation_CreateDirectory) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3, 2} + return file_log_proto_rawDescGZIP(), []int{0, 4, 2} } func (x *LogEntry_Operation_CreateDirectory) GetPath() []byte { @@ -879,7 +927,7 @@ type LogEntry_Operation_SetKey struct { func (x *LogEntry_Operation_SetKey) Reset() { *x = LogEntry_Operation_SetKey{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[13] + mi := &file_log_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -892,7 +940,7 @@ func (x *LogEntry_Operation_SetKey) String() string { func (*LogEntry_Operation_SetKey) ProtoMessage() {} func (x *LogEntry_Operation_SetKey) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[13] + mi := &file_log_proto_msgTypes[14] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -905,7 +953,7 @@ func (x *LogEntry_Operation_SetKey) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Operation_SetKey.ProtoReflect.Descriptor instead. func (*LogEntry_Operation_SetKey) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3, 3} + return file_log_proto_rawDescGZIP(), []int{0, 4, 3} } func (x *LogEntry_Operation_SetKey) GetKey() []byte { @@ -935,7 +983,7 @@ type LogEntry_Operation_DeleteKey struct { func (x *LogEntry_Operation_DeleteKey) Reset() { *x = LogEntry_Operation_DeleteKey{} if protoimpl.UnsafeEnabled { - mi := &file_log_proto_msgTypes[14] + mi := &file_log_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -948,7 +996,7 @@ func (x *LogEntry_Operation_DeleteKey) String() string { func (*LogEntry_Operation_DeleteKey) ProtoMessage() {} func (x *LogEntry_Operation_DeleteKey) ProtoReflect() protoreflect.Message { - mi := &file_log_proto_msgTypes[14] + mi := &file_log_proto_msgTypes[15] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -961,7 +1009,7 @@ func (x *LogEntry_Operation_DeleteKey) ProtoReflect() protoreflect.Message { // Deprecated: Use LogEntry_Operation_DeleteKey.ProtoReflect.Descriptor instead. func (*LogEntry_Operation_DeleteKey) Descriptor() ([]byte, []int) { - return file_log_proto_rawDescGZIP(), []int{0, 3, 4} + return file_log_proto_rawDescGZIP(), []int{0, 4, 4} } func (x *LogEntry_Operation_DeleteKey) GetKey() []byte { @@ -975,7 +1023,7 @@ var File_log_proto protoreflect.FileDescriptor var file_log_proto_rawDesc = []byte{ 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x22, 0xe7, 0x0d, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x61, 0x6c, 0x79, 0x22, 0xd3, 0x0e, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x5c, 0x0a, 0x16, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, @@ -989,109 +1037,116 @@ var file_log_proto_rawDesc = []byte{ 0x32, 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x12, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, - 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x41, 0x0a, 0x0c, 0x68, 0x6f, 0x75, - 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x52, 0x0c, - 0x68, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x3a, 0x0a, 0x0a, - 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, - 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x6f, 0x70, - 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0xc7, 0x01, 0x0a, 0x14, 0x52, 0x65, 0x66, - 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x12, 0x46, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, - 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, - 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x1a, 0x67, 0x0a, 0x06, 0x43, 0x68, 0x61, - 0x6e, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, - 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, - 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x65, - 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, 0x65, 0x77, - 0x4f, 0x69, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x6e, 0x65, 0x77, 0x5f, 0x74, 0x61, 0x72, 0x67, 0x65, - 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6e, 0x65, 0x77, 0x54, 0x61, 0x72, 0x67, - 0x65, 0x74, 0x1a, 0x14, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0xa6, 0x03, 0x0a, 0x0c, 0x48, 0x6f, 0x75, - 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x43, 0x0a, 0x09, 0x70, 0x61, 0x63, - 0x6b, 0x5f, 0x72, 0x65, 0x66, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, - 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x61, 0x63, 0x6b, - 0x52, 0x65, 0x66, 0x73, 0x52, 0x08, 0x70, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x66, 0x73, 0x12, 0x3c, - 0x0a, 0x06, 0x72, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, - 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x52, 0x65, - 0x70, 0x61, 0x63, 0x6b, 0x52, 0x06, 0x72, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x12, 0x5f, 0x0a, 0x13, - 0x77, 0x72, 0x69, 0x74, 0x65, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x5f, 0x67, 0x72, 0x61, - 0x70, 0x68, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, - 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x43, 0x6f, - 0x6d, 0x6d, 0x69, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, 0x73, 0x52, 0x11, 0x77, 0x72, 0x69, 0x74, - 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, 0x73, 0x1a, 0x2b, 0x0a, - 0x08, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x66, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x70, 0x72, 0x75, - 0x6e, 0x65, 0x64, 0x5f, 0x72, 0x65, 0x66, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0a, - 0x70, 0x72, 0x75, 0x6e, 0x65, 0x64, 0x52, 0x65, 0x66, 0x73, 0x1a, 0x70, 0x0a, 0x06, 0x52, 0x65, - 0x70, 0x61, 0x63, 0x6b, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x65, 0x77, 0x5f, 0x66, 0x69, 0x6c, 0x65, - 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x65, 0x77, 0x46, 0x69, 0x6c, 0x65, - 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x66, 0x69, 0x6c, - 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, - 0x64, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x12, 0x24, 0x0a, 0x0e, 0x69, 0x73, 0x5f, 0x66, 0x75, 0x6c, - 0x6c, 0x5f, 0x72, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, - 0x69, 0x73, 0x46, 0x75, 0x6c, 0x6c, 0x52, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x1a, 0x13, 0x0a, 0x11, - 0x57, 0x72, 0x69, 0x74, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, - 0x73, 0x1a, 0xf9, 0x05, 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, - 0x55, 0x0a, 0x10, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x5f, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6c, - 0x69, 0x6e, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, 0x72, 0x64, - 0x4c, 0x69, 0x6e, 0x6b, 0x48, 0x00, 0x52, 0x0e, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, - 0x72, 0x64, 0x4c, 0x69, 0x6e, 0x6b, 0x12, 0x67, 0x0a, 0x16, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, - 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x5f, 0x65, 0x6e, 0x74, 0x72, 0x79, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x54, 0x0a, 0x13, 0x72, 0x65, 0x70, + 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x5f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, + 0x6f, 0x72, 0x79, 0x43, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x12, 0x72, 0x65, 0x70, + 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x43, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, + 0x41, 0x0a, 0x0c, 0x68, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x18, + 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, + 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, + 0x70, 0x69, 0x6e, 0x67, 0x52, 0x0c, 0x68, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, + 0x6e, 0x67, 0x12, 0x3a, 0x0a, 0x0a, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x18, 0x0a, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, + 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x0a, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0xc7, + 0x01, 0x0a, 0x14, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, + 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x46, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, + 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2c, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, + 0x65, 0x6e, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2e, + 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x1a, + 0x67, 0x0a, 0x06, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, + 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, + 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x65, 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x06, 0x6e, 0x65, 0x77, 0x4f, 0x69, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x6e, 0x65, 0x77, + 0x5f, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x6e, + 0x65, 0x77, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x1a, 0x14, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, + 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x14, + 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x43, 0x72, 0x65, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0xa6, 0x03, 0x0a, 0x0c, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, + 0x65, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x43, 0x0a, 0x09, 0x70, 0x61, 0x63, 0x6b, 0x5f, 0x72, 0x65, + 0x66, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, + 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, + 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x66, 0x73, + 0x52, 0x08, 0x70, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x66, 0x73, 0x12, 0x3c, 0x0a, 0x06, 0x72, 0x65, + 0x70, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, 0x69, 0x74, + 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, 0x6f, 0x75, + 0x73, 0x65, 0x6b, 0x65, 0x65, 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x52, 0x65, 0x70, 0x61, 0x63, 0x6b, + 0x52, 0x06, 0x72, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x12, 0x5f, 0x0a, 0x13, 0x77, 0x72, 0x69, 0x74, + 0x65, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x5f, 0x67, 0x72, 0x61, 0x70, 0x68, 0x73, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, + 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x48, 0x6f, 0x75, 0x73, 0x65, 0x6b, 0x65, 0x65, + 0x70, 0x69, 0x6e, 0x67, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, + 0x47, 0x72, 0x61, 0x70, 0x68, 0x73, 0x52, 0x11, 0x77, 0x72, 0x69, 0x74, 0x65, 0x43, 0x6f, 0x6d, + 0x6d, 0x69, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, 0x73, 0x1a, 0x2b, 0x0a, 0x08, 0x50, 0x61, 0x63, + 0x6b, 0x52, 0x65, 0x66, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x70, 0x72, 0x75, 0x6e, 0x65, 0x64, 0x5f, + 0x72, 0x65, 0x66, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x0a, 0x70, 0x72, 0x75, 0x6e, + 0x65, 0x64, 0x52, 0x65, 0x66, 0x73, 0x1a, 0x70, 0x0a, 0x06, 0x52, 0x65, 0x70, 0x61, 0x63, 0x6b, + 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x65, 0x77, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x08, 0x6e, 0x65, 0x77, 0x46, 0x69, 0x6c, 0x65, 0x73, 0x12, 0x23, 0x0a, + 0x0d, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x66, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x0c, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x46, 0x69, 0x6c, + 0x65, 0x73, 0x12, 0x24, 0x0a, 0x0e, 0x69, 0x73, 0x5f, 0x66, 0x75, 0x6c, 0x6c, 0x5f, 0x72, 0x65, + 0x70, 0x61, 0x63, 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x73, 0x46, 0x75, + 0x6c, 0x6c, 0x52, 0x65, 0x70, 0x61, 0x63, 0x6b, 0x1a, 0x13, 0x0a, 0x11, 0x57, 0x72, 0x69, 0x74, + 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, 0x73, 0x1a, 0xf9, 0x05, + 0x0a, 0x09, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x55, 0x0a, 0x10, 0x63, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x5f, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6c, 0x69, 0x6e, 0x6b, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, + 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, 0x72, 0x64, 0x4c, 0x69, 0x6e, 0x6b, + 0x48, 0x00, 0x52, 0x0e, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, 0x72, 0x64, 0x4c, 0x69, + 0x6e, 0x6b, 0x12, 0x67, 0x0a, 0x16, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x5f, 0x64, 0x69, 0x72, + 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x5f, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x52, + 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x48, 0x00, 0x52, 0x14, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x69, 0x72, + 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x57, 0x0a, 0x10, 0x63, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, + 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, + 0x79, 0x48, 0x00, 0x52, 0x0f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, + 0x74, 0x6f, 0x72, 0x79, 0x12, 0x3c, 0x0a, 0x07, 0x73, 0x65, 0x74, 0x5f, 0x6b, 0x65, 0x79, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, + 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x53, 0x65, 0x74, 0x4b, 0x65, 0x79, 0x48, 0x00, 0x52, 0x06, 0x73, 0x65, 0x74, 0x4b, + 0x65, 0x79, 0x12, 0x45, 0x0a, 0x0a, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x5f, 0x6b, 0x65, 0x79, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, - 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x48, 0x00, 0x52, 0x14, 0x72, 0x65, 0x6d, 0x6f, 0x76, - 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, - 0x57, 0x0a, 0x10, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, - 0x6f, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x69, 0x72, 0x65, - 0x63, 0x74, 0x6f, 0x72, 0x79, 0x48, 0x00, 0x52, 0x0f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, - 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x3c, 0x0a, 0x07, 0x73, 0x65, 0x74, 0x5f, - 0x6b, 0x65, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x67, 0x69, 0x74, 0x61, - 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, 0x72, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x53, 0x65, 0x74, 0x4b, 0x65, 0x79, 0x48, 0x00, 0x52, 0x06, - 0x73, 0x65, 0x74, 0x4b, 0x65, 0x79, 0x12, 0x45, 0x0a, 0x0a, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, - 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x4f, 0x70, 0x65, - 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, - 0x48, 0x00, 0x52, 0x09, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x1a, 0x88, 0x01, - 0x0a, 0x0e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, 0x72, 0x64, 0x4c, 0x69, 0x6e, 0x6b, - 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x61, 0x74, - 0x68, 0x12, 0x2a, 0x0a, 0x11, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x6e, 0x5f, 0x73, - 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x49, 0x6e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, - 0x10, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x61, 0x74, - 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x74, 0x68, 0x1a, 0x2a, 0x0a, 0x14, 0x52, 0x65, 0x6d, 0x6f, - 0x76, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, - 0x70, 0x61, 0x74, 0x68, 0x1a, 0x39, 0x0a, 0x0f, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x69, - 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x12, 0x0a, 0x04, 0x6d, - 0x6f, 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x1a, - 0x30, 0x0a, 0x06, 0x53, 0x65, 0x74, 0x4b, 0x65, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x1a, 0x1d, 0x0a, 0x09, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x10, - 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, - 0x42, 0x0b, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x1b, 0x0a, - 0x03, 0x4c, 0x53, 0x4e, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, - 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, - 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x6e, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x48, 0x00, 0x52, 0x09, + 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x1a, 0x88, 0x01, 0x0a, 0x0e, 0x43, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x48, 0x61, 0x72, 0x64, 0x4c, 0x69, 0x6e, 0x6b, 0x12, 0x1f, 0x0a, 0x0b, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x61, 0x74, 0x68, 0x12, 0x2a, 0x0a, + 0x11, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x6e, 0x5f, 0x73, 0x74, 0x6f, 0x72, 0x61, + 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x49, 0x6e, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x29, 0x0a, 0x10, 0x64, 0x65, 0x73, + 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x50, 0x61, 0x74, 0x68, 0x1a, 0x2a, 0x0a, 0x14, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x69, + 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, + 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, + 0x1a, 0x39, 0x0a, 0x0f, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, + 0x6f, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x04, 0x70, 0x61, 0x74, 0x68, 0x12, 0x12, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x1a, 0x30, 0x0a, 0x06, 0x53, + 0x65, 0x74, 0x4b, 0x65, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x1a, 0x1d, 0x0a, + 0x09, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x42, 0x0b, 0x0a, 0x09, + 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x1b, 0x0a, 0x03, 0x4c, 0x53, 0x4e, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1106,43 +1161,45 @@ func file_log_proto_rawDescGZIP() []byte { return file_log_proto_rawDescData } -var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 15) +var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 16) var file_log_proto_goTypes = []any{ (*LogEntry)(nil), // 0: gitaly.LogEntry (*LSN)(nil), // 1: gitaly.LSN (*LogEntry_ReferenceTransaction)(nil), // 2: gitaly.LogEntry.ReferenceTransaction (*LogEntry_RepositoryDeletion)(nil), // 3: gitaly.LogEntry.RepositoryDeletion - (*LogEntry_Housekeeping)(nil), // 4: gitaly.LogEntry.Housekeeping - (*LogEntry_Operation)(nil), // 5: gitaly.LogEntry.Operation - (*LogEntry_ReferenceTransaction_Change)(nil), // 6: gitaly.LogEntry.ReferenceTransaction.Change - (*LogEntry_Housekeeping_PackRefs)(nil), // 7: gitaly.LogEntry.Housekeeping.PackRefs - (*LogEntry_Housekeeping_Repack)(nil), // 8: gitaly.LogEntry.Housekeeping.Repack - (*LogEntry_Housekeeping_WriteCommitGraphs)(nil), // 9: gitaly.LogEntry.Housekeeping.WriteCommitGraphs - (*LogEntry_Operation_CreateHardLink)(nil), // 10: gitaly.LogEntry.Operation.CreateHardLink - (*LogEntry_Operation_RemoveDirectoryEntry)(nil), // 11: gitaly.LogEntry.Operation.RemoveDirectoryEntry - (*LogEntry_Operation_CreateDirectory)(nil), // 12: gitaly.LogEntry.Operation.CreateDirectory - (*LogEntry_Operation_SetKey)(nil), // 13: gitaly.LogEntry.Operation.SetKey - (*LogEntry_Operation_DeleteKey)(nil), // 14: gitaly.LogEntry.Operation.DeleteKey + (*LogEntry_RepositoryCreation)(nil), // 4: gitaly.LogEntry.RepositoryCreation + (*LogEntry_Housekeeping)(nil), // 5: gitaly.LogEntry.Housekeeping + (*LogEntry_Operation)(nil), // 6: gitaly.LogEntry.Operation + (*LogEntry_ReferenceTransaction_Change)(nil), // 7: gitaly.LogEntry.ReferenceTransaction.Change + (*LogEntry_Housekeeping_PackRefs)(nil), // 8: gitaly.LogEntry.Housekeeping.PackRefs + (*LogEntry_Housekeeping_Repack)(nil), // 9: gitaly.LogEntry.Housekeeping.Repack + (*LogEntry_Housekeeping_WriteCommitGraphs)(nil), // 10: gitaly.LogEntry.Housekeeping.WriteCommitGraphs + (*LogEntry_Operation_CreateHardLink)(nil), // 11: gitaly.LogEntry.Operation.CreateHardLink + (*LogEntry_Operation_RemoveDirectoryEntry)(nil), // 12: gitaly.LogEntry.Operation.RemoveDirectoryEntry + (*LogEntry_Operation_CreateDirectory)(nil), // 13: gitaly.LogEntry.Operation.CreateDirectory + (*LogEntry_Operation_SetKey)(nil), // 14: gitaly.LogEntry.Operation.SetKey + (*LogEntry_Operation_DeleteKey)(nil), // 15: gitaly.LogEntry.Operation.DeleteKey } var file_log_proto_depIdxs = []int32{ 2, // 0: gitaly.LogEntry.reference_transactions:type_name -> gitaly.LogEntry.ReferenceTransaction 3, // 1: gitaly.LogEntry.repository_deletion:type_name -> gitaly.LogEntry.RepositoryDeletion - 4, // 2: gitaly.LogEntry.housekeeping:type_name -> gitaly.LogEntry.Housekeeping - 5, // 3: gitaly.LogEntry.operations:type_name -> gitaly.LogEntry.Operation - 6, // 4: gitaly.LogEntry.ReferenceTransaction.changes:type_name -> gitaly.LogEntry.ReferenceTransaction.Change - 7, // 5: gitaly.LogEntry.Housekeeping.pack_refs:type_name -> gitaly.LogEntry.Housekeeping.PackRefs - 8, // 6: gitaly.LogEntry.Housekeeping.repack:type_name -> gitaly.LogEntry.Housekeeping.Repack - 9, // 7: gitaly.LogEntry.Housekeeping.write_commit_graphs:type_name -> gitaly.LogEntry.Housekeeping.WriteCommitGraphs - 10, // 8: gitaly.LogEntry.Operation.create_hard_link:type_name -> gitaly.LogEntry.Operation.CreateHardLink - 11, // 9: gitaly.LogEntry.Operation.remove_directory_entry:type_name -> gitaly.LogEntry.Operation.RemoveDirectoryEntry - 12, // 10: gitaly.LogEntry.Operation.create_directory:type_name -> gitaly.LogEntry.Operation.CreateDirectory - 13, // 11: gitaly.LogEntry.Operation.set_key:type_name -> gitaly.LogEntry.Operation.SetKey - 14, // 12: gitaly.LogEntry.Operation.delete_key:type_name -> gitaly.LogEntry.Operation.DeleteKey - 13, // [13:13] is the sub-list for method output_type - 13, // [13:13] is the sub-list for method input_type - 13, // [13:13] is the sub-list for extension type_name - 13, // [13:13] is the sub-list for extension extendee - 0, // [0:13] is the sub-list for field type_name + 4, // 2: gitaly.LogEntry.repository_creation:type_name -> gitaly.LogEntry.RepositoryCreation + 5, // 3: gitaly.LogEntry.housekeeping:type_name -> gitaly.LogEntry.Housekeeping + 6, // 4: gitaly.LogEntry.operations:type_name -> gitaly.LogEntry.Operation + 7, // 5: gitaly.LogEntry.ReferenceTransaction.changes:type_name -> gitaly.LogEntry.ReferenceTransaction.Change + 8, // 6: gitaly.LogEntry.Housekeeping.pack_refs:type_name -> gitaly.LogEntry.Housekeeping.PackRefs + 9, // 7: gitaly.LogEntry.Housekeeping.repack:type_name -> gitaly.LogEntry.Housekeeping.Repack + 10, // 8: gitaly.LogEntry.Housekeeping.write_commit_graphs:type_name -> gitaly.LogEntry.Housekeeping.WriteCommitGraphs + 11, // 9: gitaly.LogEntry.Operation.create_hard_link:type_name -> gitaly.LogEntry.Operation.CreateHardLink + 12, // 10: gitaly.LogEntry.Operation.remove_directory_entry:type_name -> gitaly.LogEntry.Operation.RemoveDirectoryEntry + 13, // 11: gitaly.LogEntry.Operation.create_directory:type_name -> gitaly.LogEntry.Operation.CreateDirectory + 14, // 12: gitaly.LogEntry.Operation.set_key:type_name -> gitaly.LogEntry.Operation.SetKey + 15, // 13: gitaly.LogEntry.Operation.delete_key:type_name -> gitaly.LogEntry.Operation.DeleteKey + 14, // [14:14] is the sub-list for method output_type + 14, // [14:14] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name } func init() { file_log_proto_init() } @@ -1200,7 +1257,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Housekeeping); i { + switch v := v.(*LogEntry_RepositoryCreation); i { case 0: return &v.state case 1: @@ -1212,7 +1269,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[5].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Operation); i { + switch v := v.(*LogEntry_Housekeeping); i { case 0: return &v.state case 1: @@ -1224,7 +1281,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[6].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_ReferenceTransaction_Change); i { + switch v := v.(*LogEntry_Operation); i { case 0: return &v.state case 1: @@ -1236,7 +1293,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[7].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Housekeeping_PackRefs); i { + switch v := v.(*LogEntry_ReferenceTransaction_Change); i { case 0: return &v.state case 1: @@ -1248,7 +1305,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[8].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Housekeeping_Repack); i { + switch v := v.(*LogEntry_Housekeeping_PackRefs); i { case 0: return &v.state case 1: @@ -1260,7 +1317,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[9].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Housekeeping_WriteCommitGraphs); i { + switch v := v.(*LogEntry_Housekeeping_Repack); i { case 0: return &v.state case 1: @@ -1272,7 +1329,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[10].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Operation_CreateHardLink); i { + switch v := v.(*LogEntry_Housekeeping_WriteCommitGraphs); i { case 0: return &v.state case 1: @@ -1284,7 +1341,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[11].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Operation_RemoveDirectoryEntry); i { + switch v := v.(*LogEntry_Operation_CreateHardLink); i { case 0: return &v.state case 1: @@ -1296,7 +1353,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[12].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Operation_CreateDirectory); i { + switch v := v.(*LogEntry_Operation_RemoveDirectoryEntry); i { case 0: return &v.state case 1: @@ -1308,7 +1365,7 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[13].Exporter = func(v any, i int) any { - switch v := v.(*LogEntry_Operation_SetKey); i { + switch v := v.(*LogEntry_Operation_CreateDirectory); i { case 0: return &v.state case 1: @@ -1320,6 +1377,18 @@ func file_log_proto_init() { } } file_log_proto_msgTypes[14].Exporter = func(v any, i int) any { + switch v := v.(*LogEntry_Operation_SetKey); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_log_proto_msgTypes[15].Exporter = func(v any, i int) any { switch v := v.(*LogEntry_Operation_DeleteKey); i { case 0: return &v.state @@ -1332,7 +1401,7 @@ func file_log_proto_init() { } } } - file_log_proto_msgTypes[5].OneofWrappers = []any{ + file_log_proto_msgTypes[6].OneofWrappers = []any{ (*LogEntry_Operation_CreateHardLink_)(nil), (*LogEntry_Operation_RemoveDirectoryEntry_)(nil), (*LogEntry_Operation_CreateDirectory_)(nil), @@ -1345,7 +1414,7 @@ func file_log_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_log_proto_rawDesc, NumEnums: 0, - NumMessages: 15, + NumMessages: 16, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/log.proto b/proto/log.proto index fbf7879b79..e18ea34f9b 100644 --- a/proto/log.proto +++ b/proto/log.proto @@ -34,6 +34,10 @@ message LogEntry { message RepositoryDeletion { } + // RepositoryCreation models a repository creation. + message RepositoryCreation { + } + // Housekeeping models a housekeeping run. It is supposed to handle housekeeping tasks for repositories such as the // cleanup of unneeded files and optimizations for the repository's data structures. It is a collection of smaller // tasks. @@ -84,6 +88,8 @@ message LogEntry { repeated ReferenceTransaction reference_transactions = 2; // repository_deletion, when set, indicates this log entry deletes the repository. RepositoryDeletion repository_deletion = 6; + // repository_creation, when set, indicates this log entry creates the repository. + RepositoryCreation repository_creation = 11; // housekeeping, when set, indicates this log entry contains a housekeeping task. Housekeeping housekeeping = 9; -- GitLab From 9765f68f8918990cb0b1ca1f230df36fec0a23ba Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sun, 28 Jul 2024 14:00:46 +0700 Subject: [PATCH 09/14] storagemgr: Add ReadEntry function to LogConsumer That function lets the consumer reads a particular log entry at a LSN when it consumes the log entry. --- internal/backup/log_entry_test.go | 5 +++ .../storage/storagemgr/testhelper_test.go | 19 ++++++++++ .../storage/storagemgr/transaction_manager.go | 14 ++++++++ .../transaction_manager_consumer_test.go | 36 +++++++++++++++++++ 4 files changed, 74 insertions(+) diff --git a/internal/backup/log_entry_test.go b/internal/backup/log_entry_test.go index dffedfd45c..45911641e0 100644 --- a/internal/backup/log_entry_test.go +++ b/internal/backup/log_entry_test.go @@ -20,6 +20,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) type mockLogManager struct { @@ -70,6 +71,10 @@ func (lm *mockLogManager) AcknowledgeTransaction(_ storagemgr.LogConsumer, lsn s } } +func (lm *mockLogManager) ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) { + return nil, nil +} + func (lm *mockLogManager) SendNotification() { n := lm.notifications[0] lm.archiver.NotifyNewTransactions(lm.partitionInfo.storageName, lm.partitionInfo.partitionID, n.lowWaterMark, n.highWaterMark) diff --git a/internal/gitaly/storage/storagemgr/testhelper_test.go b/internal/gitaly/storage/storagemgr/testhelper_test.go index 9171c431df..4d5055b8de 100644 --- a/internal/gitaly/storage/storagemgr/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/testhelper_test.go @@ -842,6 +842,16 @@ type ConsumerAcknowledge struct { LSN storage.LSN } +// ConsumerReadEntry asserts the entry returned to the consumer at a point of time. +type ConsumerReadEntry struct { + // LSN is the target LSN by the consumers. + LSN storage.LSN + // ExpectedEntry is the expected returned log entry. + ExpectedEntry *gitalypb.LogEntry + // ExpectedErr is the expected returned error string. + ExpectedErr string +} + // RemoveRepository removes the repository from the disk. It must be run with the TransactionManager // closed. type RemoveRepository struct{} @@ -1354,6 +1364,15 @@ func runTransactionTest(t *testing.T, ctx context.Context, tc transactionTestCas transaction.WriteCommitGraphs(step.Config) case ConsumerAcknowledge: transactionManager.AcknowledgeTransaction(transactionManager.consumer, step.LSN) + case ConsumerReadEntry: + entry, err := transactionManager.ReadEntry(step.LSN) + if step.ExpectedErr == "" { + require.NoError(t, err) + testhelper.ProtoEqual(t, step.ExpectedEntry, entry) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), step.ExpectedErr) + } case RepositoryAssertion: require.Contains(t, openTransactions, step.TransactionID, "test error: transaction's snapshot asserted before beginning it") transaction := openTransactions[step.TransactionID] diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index 64fd9b59e3..12b17141fd 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -899,6 +899,8 @@ type LogManager interface { AcknowledgeTransaction(consumer LogConsumer, lsn storage.LSN) // GetTransactionPath returns the path of the log entry's root directory. GetTransactionPath(lsn storage.LSN) string + // ReadEntry returns the log entry object of the LSN. + ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) } // AcknowledgeTransaction acknowledges log entries up and including lsn as successfully processed @@ -919,6 +921,18 @@ func (mgr *TransactionManager) GetTransactionPath(lsn storage.LSN) string { return walFilesPathForLSN(mgr.stateDirectory, lsn) } +// ReadEntry returns the log entry object of the LSN. +func (mgr *TransactionManager) ReadEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) { + if lsn < mgr.lowWaterMark() { + return nil, fmt.Errorf("requested log entry is gone") + } + entry, err := mgr.readLogEntry(lsn) + if err != nil { + return nil, fmt.Errorf("reading log entry: %w", err) + } + return entry, nil +} + // consumerPosition tracks the last LSN acknowledged for a consumer. type consumerPosition struct { // position is the last LSN acknowledged as completed by the consumer. diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go index b9872f7b2e..02702bb817 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_consumer_test.go @@ -2,6 +2,7 @@ package storagemgr import ( "context" + "path/filepath" "testing" "gitlab.com/gitlab-org/gitaly/v16/internal/git" @@ -9,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { @@ -80,9 +82,35 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedEntry: &gitalypb.LogEntry{ + RelativePath: setup.RelativePath, + Operations: []*gitalypb.LogEntry_Operation{ + {Operation: &gitalypb.LogEntry_Operation_CreateHardLink_{ + CreateHardLink: &gitalypb.LogEntry_Operation_CreateHardLink{ + SourcePath: []byte("1"), + DestinationPath: []byte(filepath.Join(setup.RelativePath, "refs/heads/main")), + }, + }}, + }, + ReferenceTransactions: []*gitalypb.LogEntry_ReferenceTransaction{ + {Changes: []*gitalypb.LogEntry_ReferenceTransaction_Change{ + { + ReferenceName: []byte("refs/heads/main"), + NewOid: []byte(setup.Commits.First.OID), + }, + }}, + }, + }, + }, ConsumerAcknowledge{ LSN: 1, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedErr: "requested log entry is gone", + }, }, expectedState: StateAssertion{ Database: DatabaseState{ @@ -137,6 +165,14 @@ func generateConsumerTests(t *testing.T, setup testTransactionSetup) []transacti ConsumerAcknowledge{ LSN: 2, }, + ConsumerReadEntry{ + LSN: 1, + ExpectedErr: "requested log entry is gone", + }, + ConsumerReadEntry{ + LSN: 2, + ExpectedErr: "requested log entry is gone", + }, }, expectedState: StateAssertion{ Database: DatabaseState{ -- GitLab From 6e93eb459c1a264023367aac7f064ce2284e68eb Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sun, 28 Jul 2024 14:22:01 +0700 Subject: [PATCH 10/14] storagemgr: Simplify log consumer factory Recently, the PartitionManager triggers a factory to create a log consumer with corresponding arguments. The factory returns a log consumer object and a clean up function. This commit simplifies the log consumer factory so that it returns the log consumer object only. The partition manager triggers the cleanup method directly from that consumer instead. --- internal/cli/gitaly/serve.go | 4 ++-- .../storage/storagemgr/partition_manager.go | 22 +++++++++++-------- .../storage/storagemgr/testhelper_test.go | 2 ++ .../storage/storagemgr/transaction_manager.go | 4 +++- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index d94704aac6..4ec5b970f9 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -392,12 +392,12 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { return fmt.Errorf("resolving write-ahead log backup sink: %w", err) } - consumerFactory = func(lma storagemgr.LogManagerAccessor) (storagemgr.LogConsumer, func()) { + consumerFactory = func(lma storagemgr.LogManagerAccessor) storagemgr.LogConsumer { walArchiver := backup.NewLogEntryArchiver(logger, walSink, cfg.Backup.WALWorkerCount, lma) prometheus.MustRegister(walArchiver) walArchiver.Run() - return walArchiver, walArchiver.Close + return walArchiver } } diff --git a/internal/gitaly/storage/storagemgr/partition_manager.go b/internal/gitaly/storage/storagemgr/partition_manager.go index 0aa49fcaf4..dca012bc44 100644 --- a/internal/gitaly/storage/storagemgr/partition_manager.go +++ b/internal/gitaly/storage/storagemgr/partition_manager.go @@ -54,8 +54,10 @@ type PartitionManager struct { // transactionManagerFactory is a factory to create TransactionManagers. This shouldn't ever be changed // during normal operation, but can be used to adjust the transaction manager's behaviour in tests. transactionManagerFactory transactionManagerFactory - // consumerCleanup closes the LogConsumer. - consumerCleanup func() + // consumer is a hook that will be passed to the each TransactionManager. The Transactionmanager + // notifies the consumer of new transactions by invoking the NotifyNewTransaction method after + // they are committed. + consumer LogConsumer // metrics accounts for all metrics of transaction operations. It will be // passed down to each transaction manager and is shared between them. The // metrics must be registered to be collected by prometheus collector. @@ -284,13 +286,10 @@ func NewPartitionManager( metrics: metrics, } - var logConsumer LogConsumer - var cleanup func() if consumerFactory != nil { - logConsumer, cleanup = consumerFactory(pm) + pm.consumer = consumerFactory(pm) } - pm.consumerCleanup = cleanup pm.transactionManagerFactory = func( logger log.Logger, partitionID storage.PartitionID, @@ -312,7 +311,7 @@ func NewPartitionManager( metrics.housekeeping, metrics.snapshot.Scope(storageMgr.name), ), - logConsumer, + pm.consumer, ) } @@ -449,6 +448,11 @@ func (pm *PartitionManager) CallLogManager(ctx context.Context, storageName stri return nil } +// GetLogConsumer returns the registered log consumer if any. +func (pm *PartitionManager) GetLogConsumer() LogConsumer { + return pm.consumer +} + // StorageKV executes the provided function against the storage's metadata DB. All write operations // issued by the function are committed in an atomic fashion. All read operations are performed // against a snapshot of the database. @@ -603,8 +607,8 @@ func (pm *PartitionManager) Close() { }() } - if pm.consumerCleanup != nil { - pm.consumerCleanup() + if pm.consumer != nil { + pm.consumer.Close() } activeStorages.Wait() diff --git a/internal/gitaly/storage/storagemgr/testhelper_test.go b/internal/gitaly/storage/storagemgr/testhelper_test.go index 4d5055b8de..39d021e440 100644 --- a/internal/gitaly/storage/storagemgr/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/testhelper_test.go @@ -907,6 +907,8 @@ func (lc *MockLogConsumer) NotifyNewTransactions(storageName string, partitionID lc.highWaterMark = highWaterMark } +func (lc *MockLogConsumer) Close() {} + // ConsumerState is used to track the log positions received by the consumer and the corresponding // acknowledgements from the consumer to the manager. We deliberately do not track the LowWaterMark // sent to consumers as this is non-deterministic. diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index 12b17141fd..476883b8b7 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -875,6 +875,8 @@ type LogConsumer interface { // LSNs are sent so that a newly initialized consumer is aware of the full range of // entries it can process. NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark, highWaterMark storage.LSN) + // Close shuts down the log consumer. It is called when the partition manager shutdowns. + Close() } // LogManagerAccessor is the interface used by the LogManager coordinator. It is called by @@ -889,7 +891,7 @@ type LogManagerAccessor interface { // LogConsumerFactory returns a LogConsumer that requires a LogManagerAccessor for construction and // a function to close the LogConsumer. -type LogConsumerFactory func(LogManagerAccessor) (_ LogConsumer, cleanup func()) +type LogConsumerFactory func(LogManagerAccessor) LogConsumer // LogManager is the interface used on the consumer side of the integration. The consumer // has the ability to acknowledge transactions as having been processed with AcknowledgeTransaction. -- GitLab From 8225b0c8a0d325f2a9c2fb2719a293a2f59432cd Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sun, 28 Jul 2024 23:15:21 +0700 Subject: [PATCH 11/14] raft: Replace "raft_" log prefix to "raft." Recently, all loggers inside Raft package have "raft_" prefix. It works well on the local environment. However, on production, the logs are not grouped in log aggregation systems such as Kibana. This commit replaces all "raft_" log prefixes to "raft". --- internal/gitaly/storage/raft/logger.go | 2 +- internal/gitaly/storage/raft/manager.go | 12 ++++++------ internal/gitaly/storage/raft/metadata_group.go | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/gitaly/storage/raft/logger.go b/internal/gitaly/storage/raft/logger.go index 733506e22f..70d1c81f75 100644 --- a/internal/gitaly/storage/raft/logger.go +++ b/internal/gitaly/storage/raft/logger.go @@ -73,7 +73,7 @@ func SetLogger(logger log.Logger, suppressDefaultLog bool) { name: pkgName, Logger: logger.WithFields(log.Fields{ "component": "raft", - "raft_component": strings.ToLower(pkgName), + "raft.component": strings.ToLower(pkgName), }), } }) diff --git a/internal/gitaly/storage/raft/manager.go b/internal/gitaly/storage/raft/manager.go index e69b62148f..ee4f998a33 100644 --- a/internal/gitaly/storage/raft/manager.go +++ b/internal/gitaly/storage/raft/manager.go @@ -86,9 +86,9 @@ func NewManager( managerConfig: managerCfg, logger: logger.WithFields(log.Fields{ "component": "raft", - "raft_component": "manager", - "raft_cluster_id": clusterCfg.ClusterID, - "raft_node_id": clusterCfg.NodeID, + "raft.component": "manager", + "raft.cluster_id": clusterCfg.ClusterID, + "raft.node_id": clusterCfg.NodeID, }), storageManagers: map[string]*storageManager{}, running: sync.WaitGroup{}, @@ -104,7 +104,7 @@ func NewManager( DefaultNodeRegistryEnabled: false, EnableMetrics: true, RaftEventListener: &raftLogger{ - Logger: m.logger.WithField("raft_component", "system"), + Logger: m.logger.WithField("raft.component", "system"), }, Expert: managerCfg.expertConfig, }) @@ -147,8 +147,8 @@ func (m *Manager) Start() (returnedErr error) { }() m.logger.WithFields(log.Fields{ - "raft_config": m.clusterConfig, - "raft_manager_conf": m.managerConfig, + "raft.config": m.clusterConfig, + "raft.manager_conf": m.managerConfig, }).Info("Raft cluster is starting") // A Gitaly node contains multiple independent storages, and each storage maps to a dragonboat diff --git a/internal/gitaly/storage/raft/metadata_group.go b/internal/gitaly/storage/raft/metadata_group.go index 1971b12f19..09369acd47 100644 --- a/internal/gitaly/storage/raft/metadata_group.go +++ b/internal/gitaly/storage/raft/metadata_group.go @@ -61,9 +61,9 @@ func newMetadataRaftGroup(ctx context.Context, nodeHost *dragonboat.NodeHost, db } groupLogger := logger.WithFields(log.Fields{ - "raft_group": "metadata", - "raft_group_id": MetadataGroupID, - "raft_replica_id": clusterCfg.NodeID, + "raft.group": "metadata", + "raft.group_id": MetadataGroupID, + "raft.replica_id": clusterCfg.NodeID, }) groupLogger.Info("joined metadata group") -- GitLab From 2dc2a650caa30e1a1d5fb40a4c2833e16abd0156 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 1 Aug 2024 16:39:40 +0700 Subject: [PATCH 12/14] storagemgr: Fix a small typo --- internal/gitaly/storage/storagemgr/transaction_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index 476883b8b7..97d48e9eb5 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -2799,7 +2799,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction // to transaction operations. // // To ensure that we don't modify existing tables and autocompact, we lock the existing tables -// before applying the updates. This way the reftable backend willl only create new tables +// before applying the updates. This way the reftable backend will only create new tables func (mgr *TransactionManager) verifyReferencesWithGitForReftables( ctx context.Context, referenceTransactions []*gitalypb.LogEntry_ReferenceTransaction, -- GitLab From f4a1f870081e64d57fa59f34d071b10e762ad7f1 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sun, 28 Jul 2024 14:02:07 +0700 Subject: [PATCH 13/14] raft: Implement log consumer for Raft to consume WAL log entries This commit implements a LogConsumer that acts as a bridge that connects WAL and Raft replicator. It implements WAL's `storagemgr.LogConsumer` interface, listens for new notifications, and then pushes new notifications to the corresponding Raft replicator. The replicator then broadcast the changes to desired Raft groups. --- internal/gitaly/storage/raft/log_consumer.go | 202 +++++++++++++++++++ 1 file changed, 202 insertions(+) create mode 100644 internal/gitaly/storage/raft/log_consumer.go diff --git a/internal/gitaly/storage/raft/log_consumer.go b/internal/gitaly/storage/raft/log_consumer.go new file mode 100644 index 0000000000..293a0c1e9e --- /dev/null +++ b/internal/gitaly/storage/raft/log_consumer.go @@ -0,0 +1,202 @@ +package raft + +import ( + "container/list" + "context" + "fmt" + "sync" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" +) + +type notification struct { + partitionID storage.PartitionID + lowWaterMark storage.LSN + highWaterMark storage.LSN +} + +type partitionState struct { + nextLSN storage.LSN +} + +type notifications struct { + sync.Mutex + storageName string + list *list.List + signal chan struct{} + partitionStates map[storage.PartitionID]*partitionState +} + +func (n *notifications) pop() *notification { + n.Lock() + defer n.Unlock() + + item := n.list.Front() + if item == nil { + return nil + } + + n.list.Remove(item) + return item.Value.(*notification) +} + +type replicatorPusher struct { + finish chan struct{} + done chan struct{} + replicator *replicator +} + +// LogConsumer acts as a bridge that connects WAL and Raft replicator. It implements WAL's +// storagemgr.LogConsumer interface, listens for new notifications, and then pushes new notifications +// to the corresponding Raft replicator. +type LogConsumer struct { + sync.Mutex + + ctx context.Context + logger log.Logger + logManager storagemgr.LogManagerAccessor + pushers map[string]*replicatorPusher + notifications map[string]*notifications +} + +// Push starts a goroutine that pipe the notifications from WAL to the corresponding replicator of a storage. +func (l *LogConsumer) Push(storageName string, replicator *replicator) error { + if _, exist := l.pushers[storageName]; exist { + return fmt.Errorf("storage %q already registered", storageName) + } + + l.pushers[storageName] = &replicatorPusher{ + finish: make(chan struct{}), + done: make(chan struct{}), + replicator: replicator, + } + go l.pushNotifications(l.initializeNotifications(storageName), l.pushers[storageName]) + + return nil +} + +// NotifyNewTransactions is called by WAL's transaction manager when there are new appended logs or +// when the manager restarts. It appends the notification to a linked list. In the background, that +// linked list is consumed by another goroutine initiated by (*LogConsumer).Push. +func (l *LogConsumer) NotifyNewTransactions(storageName string, partitionID storage.PartitionID, lowWaterMark storage.LSN, highWaterMark storage.LSN) { + notifications := l.initializeNotifications(storageName) + + notifications.Lock() + notifications.list.PushBack(¬ification{ + partitionID: partitionID, + lowWaterMark: lowWaterMark, + highWaterMark: highWaterMark, + }) + notifications.Unlock() + + select { + case notifications.signal <- struct{}{}: + default: + } +} + +// Close stops all activities of this log consumer. The function exits when all background goroutines finish. +func (l *LogConsumer) Close() { + for storageName := range l.pushers { + close(l.pushers[storageName].finish) + } + for storageName := range l.pushers { + <-l.pushers[storageName].done + } + l.pushers = map[string]*replicatorPusher{} + l.notifications = map[string]*notifications{} +} + +func (l *LogConsumer) pushNotifications(notifications *notifications, pusher *replicatorPusher) { + defer close(pusher.done) + + for { + select { + case <-pusher.finish: + return + case <-notifications.signal: + } + + for { + select { + case <-pusher.finish: + return + default: + } + + n := notifications.pop() + if n != nil { + break + } + + state, ok := notifications.partitionStates[n.partitionID] + if !ok { + state = &partitionState{nextLSN: n.lowWaterMark} + notifications.partitionStates[n.partitionID] = state + } + + // All log entries are already handled. + if state.nextLSN > n.highWaterMark { + continue + } else if state.nextLSN < n.lowWaterMark { + state.nextLSN = n.lowWaterMark + } + + if err := l.logManager.CallLogManager(l.ctx, notifications.storageName, n.partitionID, func(manager storagemgr.LogManager) { + for lsn := state.nextLSN; lsn <= n.highWaterMark; lsn++ { + entry, err := manager.ReadEntry(lsn) + if err != nil { + l.logger.WithError(err).WithFields(log.Fields{ + "raft.authority_name": notifications.storageName, + "raft.partition_id": n.partitionID, + "raft.lsn": lsn, + }).Error("fail to read log entry") + continue + } + if entry.GetRepositoryCreation() != nil { + if err := pusher.replicator.BroadcastNewPartition(raftID(n.partitionID), entry.RelativePath); err != nil { + l.logger.WithError(err).WithFields(log.Fields{ + "raft.authority_name": notifications.storageName, + "raft.partition_id": n.partitionID, + }).Error("fail to broadcast new partition") + } + } + manager.AcknowledgeTransaction(l, lsn) + } + state.nextLSN = n.highWaterMark + 1 + }); err != nil { + l.logger.WithError(err).Error("failed to acknowledge log entry") + } + } + } +} + +func (l *LogConsumer) initializeNotifications(storageName string) *notifications { + l.Lock() + defer l.Unlock() + + if _, ok := l.notifications[storageName]; !ok { + l.notifications[storageName] = ¬ifications{ + storageName: storageName, + list: &list.List{}, + signal: make(chan struct{}, 1), + } + } + return l.notifications[storageName] +} + +// NewLogConsumer is a factory that returns new LogConsumer object for the input LogManagerAccessor. +func NewLogConsumer(ctx context.Context, lma storagemgr.LogManagerAccessor, logger log.Logger) storagemgr.LogConsumer { + return &LogConsumer{ + ctx: ctx, + logManager: lma, + logger: logger.WithFields(log.Fields{ + "component": "raft", + "raft.component": "log_consumer", + }), + pushers: map[string]*replicatorPusher{}, + notifications: map[string]*notifications{}, + } +} -- GitLab From a6f9532304b871d4eb857cdf91ef3dcec9b15637 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Sun, 28 Jul 2024 14:24:11 +0700 Subject: [PATCH 14/14] raft: Integrate WAL and Raft manager via Raft log consumer In this commit, we create a new Raft consumer and injects it to the WAL partition manager via the log consumer interface. Then, this log consumer is connected to the Raft replicator inside Raft manager. Afterward, the Raft replicator is now able to monitor partition changes (deletion/creation) from the tracking storages. --- internal/cli/gitaly/serve.go | 7 +++++++ internal/gitaly/storage/raft/log_consumer.go | 7 ++++--- internal/gitaly/storage/raft/logger.go | 2 ++ internal/gitaly/storage/raft/manager.go | 15 ++++++++++++++- internal/gitaly/storage/raft/manager_test.go | 2 ++ internal/gitaly/storage/raft/testhelper_test.go | 6 +++++- 6 files changed, 34 insertions(+), 5 deletions(-) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 4ec5b970f9..067d101401 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -400,6 +400,13 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { return walArchiver } } + // WAL doesn't support multiple log consumers yet. So, Raft's consumer will shadow other consumers. + // This problem will be handled in https://gitlab.com/gitlab-org/gitaly/-/issues/6105. + if cfg.Raft.Enabled { + consumerFactory = func(lma storagemgr.LogManagerAccessor) storagemgr.LogConsumer { + return raft.NewLogConsumer(ctx, lma, logger) + } + } partitionMgr, err = storagemgr.NewPartitionManager( ctx, diff --git a/internal/gitaly/storage/raft/log_consumer.go b/internal/gitaly/storage/raft/log_consumer.go index 293a0c1e9e..8b1146b2bb 100644 --- a/internal/gitaly/storage/raft/log_consumer.go +++ b/internal/gitaly/storage/raft/log_consumer.go @@ -179,9 +179,10 @@ func (l *LogConsumer) initializeNotifications(storageName string) *notifications if _, ok := l.notifications[storageName]; !ok { l.notifications[storageName] = ¬ifications{ - storageName: storageName, - list: &list.List{}, - signal: make(chan struct{}, 1), + storageName: storageName, + list: &list.List{}, + signal: make(chan struct{}, 1), + partitionStates: map[storage.PartitionID]*partitionState{}, } } return l.notifications[storageName] diff --git a/internal/gitaly/storage/raft/logger.go b/internal/gitaly/storage/raft/logger.go index 70d1c81f75..9118516eca 100644 --- a/internal/gitaly/storage/raft/logger.go +++ b/internal/gitaly/storage/raft/logger.go @@ -85,12 +85,14 @@ func SetLogger(logger log.Logger, suppressDefaultLog bool) { dragonboatLogger.GetLogger("rsm").SetLevel(dragonboatLogger.WARNING) dragonboatLogger.GetLogger("transport").SetLevel(dragonboatLogger.ERROR) dragonboatLogger.GetLogger("grpc").SetLevel(dragonboatLogger.ERROR) + dragonboatLogger.GetLogger("logdb").SetLevel(dragonboatLogger.ERROR) } else { dragonboatLogger.GetLogger("dragonboat").SetLevel(dragonboatLogger.INFO) dragonboatLogger.GetLogger("raft").SetLevel(dragonboatLogger.INFO) dragonboatLogger.GetLogger("rsm").SetLevel(dragonboatLogger.INFO) dragonboatLogger.GetLogger("transport").SetLevel(dragonboatLogger.INFO) dragonboatLogger.GetLogger("grpc").SetLevel(dragonboatLogger.INFO) + dragonboatLogger.GetLogger("logdb").SetLevel(dragonboatLogger.INFO) } }) } diff --git a/internal/gitaly/storage/raft/manager.go b/internal/gitaly/storage/raft/manager.go index ee4f998a33..a022c0862b 100644 --- a/internal/gitaly/storage/raft/manager.go +++ b/internal/gitaly/storage/raft/manager.go @@ -51,6 +51,7 @@ type Manager struct { closed atomic.Bool running sync.WaitGroup + logConsumer *LogConsumer storageManagers map[string]*storageManager firstStorage *storageManager metadataGroup *metadataRaftGroup @@ -116,6 +117,15 @@ func NewManager( if m.firstStorage == nil { m.firstStorage = m.storageManagers[storage.Name] } + consumer := ptnMgr.GetLogConsumer() + if consumer == nil { + return nil, fmt.Errorf("WAL log consumer has not been initialized") + } + raftLogConsumer, ok := consumer.(*LogConsumer) + if !ok { + return nil, fmt.Errorf("mismatched WAL log consumer, %T expected, %T found", raftLogConsumer, consumer) + } + m.logConsumer = raftLogConsumer return m, nil } @@ -281,7 +291,7 @@ func (m *Manager) initReplicationGroup(hostingID raftID, storageInfo *gitalypb.S } func (m *Manager) startReplicators() error { - for _, storageMgr := range m.storageManagers { + for storageName, storageMgr := range m.storageManagers { replicator := newReplicator(m.ctx, storageMgr.ID(), m.logger, replicatorConfig{ initAuthorityGroup: func(storageInfo *gitalypb.Storage) (authority, error) { return m.initReplicationGroup(raftID(storageInfo.GetStorageId()), storageInfo, true) @@ -296,6 +306,9 @@ func (m *Manager) startReplicators() error { }, }) storageMgr.replicator = replicator + if err := m.logConsumer.Push(storageName, replicator); err != nil { + return fmt.Errorf("registering storage %q to Raft's WAL log consumer: %w", storageName, err) + } } delay := backoff.NewDefaultExponential(rand.New(rand.NewSource(time.Now().UnixNano()))) diff --git a/internal/gitaly/storage/raft/manager_test.go b/internal/gitaly/storage/raft/manager_test.go index 7181336d24..b582b80971 100644 --- a/internal/gitaly/storage/raft/manager_test.go +++ b/internal/gitaly/storage/raft/manager_test.go @@ -63,6 +63,8 @@ func TestManager_Start(t *testing.T) { resetManager := func(t *testing.T, node *testNode) { node.manager.Close() + node.ptnManager.GetLogConsumer().Close() + m2, err := NewManager( testhelper.Context(t), node.cfg.Storages, diff --git a/internal/gitaly/storage/raft/testhelper_test.go b/internal/gitaly/storage/raft/testhelper_test.go index 81a44d901f..919ab5528f 100644 --- a/internal/gitaly/storage/raft/testhelper_test.go +++ b/internal/gitaly/storage/raft/testhelper_test.go @@ -370,7 +370,11 @@ func setupTestDBManagerAndPartitionManager(t *testing.T, cfg config.Cfg) (*keyva localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache) - partitionManager, err := storagemgr.NewPartitionManager(testhelper.Context(t), cfg.Storages, cmdFactory, localRepoFactory, logger, dbMgr, cfg.Prometheus, nil) + consumerFactory := func(lma storagemgr.LogManagerAccessor) storagemgr.LogConsumer { + return NewLogConsumer(testhelper.Context(t), lma, logger) + } + + partitionManager, err := storagemgr.NewPartitionManager(testhelper.Context(t), cfg.Storages, cmdFactory, localRepoFactory, logger, dbMgr, cfg.Prometheus, consumerFactory) require.NoError(t, err) t.Cleanup(partitionManager.Close) -- GitLab