From a36ab9d84c84678b470016d7f176add6f728fea3 Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Mon, 25 Jan 2021 21:41:21 -0800 Subject: [PATCH 01/12] RouteVote RPC proto definition for transactions The RouteVote RPC allows the routing of requests and response from the transaction RPCs VoteTransaction and StopTransaction. This allows the transaction service to be hosted on Gitaly, rather than Praefect. --- proto/go/gitalypb/transaction.pb.go | 464 +++++++++++++++++-- proto/transaction.proto | 94 +++- ruby/proto/gitaly/transaction_pb.rb | 22 + ruby/proto/gitaly/transaction_services_pb.rb | 34 ++ 4 files changed, 571 insertions(+), 43 deletions(-) diff --git a/proto/go/gitalypb/transaction.pb.go b/proto/go/gitalypb/transaction.pb.go index 6f70b1cfab..08024665b2 100644 --- a/proto/go/gitalypb/transaction.pb.go +++ b/proto/go/gitalypb/transaction.pb.go @@ -61,7 +61,10 @@ type VoteTransactionRequest struct { // Name of the Gitaly node that's voting on a transaction. Node string `protobuf:"bytes,3,opt,name=node,proto3" json:"node,omitempty"` // SHA1 of the references that are to be updated - ReferenceUpdatesHash []byte `protobuf:"bytes,4,opt,name=reference_updates_hash,json=referenceUpdatesHash,proto3" json:"reference_updates_hash,omitempty"` + ReferenceUpdatesHash []byte `protobuf:"bytes,4,opt,name=reference_updates_hash,json=referenceUpdatesHash,proto3" json:"reference_updates_hash,omitempty"` + // Route UUID is used when the transaction service on Gitaly is used to + // route messages back to Praefect clients + RouteUuid string `protobuf:"bytes,5,opt,name=route_uuid,json=routeUuid,proto3" json:"route_uuid,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -120,6 +123,13 @@ func (m *VoteTransactionRequest) GetReferenceUpdatesHash() []byte { return nil } +func (m *VoteTransactionRequest) GetRouteUuid() string { + if m != nil { + return m.RouteUuid + } + return "" +} + type VoteTransactionResponse 
struct { State VoteTransactionResponse_TransactionState `protobuf:"varint,1,opt,name=state,proto3,enum=gitaly.VoteTransactionResponse_TransactionState" json:"state,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -162,7 +172,10 @@ func (m *VoteTransactionResponse) GetState() VoteTransactionResponse_Transaction type StopTransactionRequest struct { Repository *Repository `protobuf:"bytes,1,opt,name=repository,proto3" json:"repository,omitempty"` // ID of the transaction we're processing - TransactionId uint64 `protobuf:"varint,2,opt,name=transaction_id,json=transactionId,proto3" json:"transaction_id,omitempty"` + TransactionId uint64 `protobuf:"varint,2,opt,name=transaction_id,json=transactionId,proto3" json:"transaction_id,omitempty"` + // Route UUID is used when the transaction service on Gitaly is used to + // route messages back to Praefect clients + RouteUuid string `protobuf:"bytes,5,opt,name=route_uuid,json=routeUuid,proto3" json:"route_uuid,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -207,6 +220,13 @@ func (m *StopTransactionRequest) GetTransactionId() uint64 { return 0 } +func (m *StopTransactionRequest) GetRouteUuid() string { + if m != nil { + return m.RouteUuid + } + return "" +} + type StopTransactionResponse struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` @@ -238,42 +258,295 @@ func (m *StopTransactionResponse) XXX_DiscardUnknown() { var xxx_messageInfo_StopTransactionResponse proto.InternalMessageInfo +// RouteVoteRequest wraps Gitaly messages so that they can +// be properly routed between Praefects. It also allows Praefect to manage +// transactions. 
+type RouteVoteRequest struct { + // The route UUID allows Gitaly messages to be routed to the correct + // Praefect client + RouteUuid string `protobuf:"bytes,1,opt,name=route_uuid,json=routeUuid,proto3" json:"route_uuid,omitempty"` + // Types that are valid to be assigned to Msg: + // *RouteVoteRequest_OpenRouteRequest + // *RouteVoteRequest_Error + // *RouteVoteRequest_VoteTxRequest + // *RouteVoteRequest_VoteTxResponse + // *RouteVoteRequest_StopTxRequest + // *RouteVoteRequest_StopTxResponse + Msg isRouteVoteRequest_Msg `protobuf_oneof:"msg"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RouteVoteRequest) Reset() { *m = RouteVoteRequest{} } +func (m *RouteVoteRequest) String() string { return proto.CompactTextString(m) } +func (*RouteVoteRequest) ProtoMessage() {} +func (*RouteVoteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_2cc4e03d2c28c490, []int{4} +} + +func (m *RouteVoteRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RouteVoteRequest.Unmarshal(m, b) +} +func (m *RouteVoteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RouteVoteRequest.Marshal(b, m, deterministic) +} +func (m *RouteVoteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_RouteVoteRequest.Merge(m, src) +} +func (m *RouteVoteRequest) XXX_Size() int { + return xxx_messageInfo_RouteVoteRequest.Size(m) +} +func (m *RouteVoteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_RouteVoteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_RouteVoteRequest proto.InternalMessageInfo + +func (m *RouteVoteRequest) GetRouteUuid() string { + if m != nil { + return m.RouteUuid + } + return "" +} + +type isRouteVoteRequest_Msg interface { + isRouteVoteRequest_Msg() +} + +type RouteVoteRequest_OpenRouteRequest struct { + OpenRouteRequest *RouteVoteRequest_OpenRoute 
`protobuf:"bytes,2,opt,name=open_route_request,json=openRouteRequest,proto3,oneof"` +} + +type RouteVoteRequest_Error struct { + Error *RouteVoteRequest_Status `protobuf:"bytes,3,opt,name=error,proto3,oneof"` +} + +type RouteVoteRequest_VoteTxRequest struct { + VoteTxRequest *VoteTransactionRequest `protobuf:"bytes,5,opt,name=vote_tx_request,json=voteTxRequest,proto3,oneof"` +} + +type RouteVoteRequest_VoteTxResponse struct { + VoteTxResponse *VoteTransactionResponse `protobuf:"bytes,6,opt,name=vote_tx_response,json=voteTxResponse,proto3,oneof"` +} + +type RouteVoteRequest_StopTxRequest struct { + StopTxRequest *StopTransactionRequest `protobuf:"bytes,7,opt,name=stop_tx_request,json=stopTxRequest,proto3,oneof"` +} + +type RouteVoteRequest_StopTxResponse struct { + StopTxResponse *StopTransactionResponse `protobuf:"bytes,8,opt,name=stop_tx_response,json=stopTxResponse,proto3,oneof"` +} + +func (*RouteVoteRequest_OpenRouteRequest) isRouteVoteRequest_Msg() {} + +func (*RouteVoteRequest_Error) isRouteVoteRequest_Msg() {} + +func (*RouteVoteRequest_VoteTxRequest) isRouteVoteRequest_Msg() {} + +func (*RouteVoteRequest_VoteTxResponse) isRouteVoteRequest_Msg() {} + +func (*RouteVoteRequest_StopTxRequest) isRouteVoteRequest_Msg() {} + +func (*RouteVoteRequest_StopTxResponse) isRouteVoteRequest_Msg() {} + +func (m *RouteVoteRequest) GetMsg() isRouteVoteRequest_Msg { + if m != nil { + return m.Msg + } + return nil +} + +func (m *RouteVoteRequest) GetOpenRouteRequest() *RouteVoteRequest_OpenRoute { + if x, ok := m.GetMsg().(*RouteVoteRequest_OpenRouteRequest); ok { + return x.OpenRouteRequest + } + return nil +} + +func (m *RouteVoteRequest) GetError() *RouteVoteRequest_Status { + if x, ok := m.GetMsg().(*RouteVoteRequest_Error); ok { + return x.Error + } + return nil +} + +func (m *RouteVoteRequest) GetVoteTxRequest() *VoteTransactionRequest { + if x, ok := m.GetMsg().(*RouteVoteRequest_VoteTxRequest); ok { + return x.VoteTxRequest + } + return nil +} + +func (m 
*RouteVoteRequest) GetVoteTxResponse() *VoteTransactionResponse { + if x, ok := m.GetMsg().(*RouteVoteRequest_VoteTxResponse); ok { + return x.VoteTxResponse + } + return nil +} + +func (m *RouteVoteRequest) GetStopTxRequest() *StopTransactionRequest { + if x, ok := m.GetMsg().(*RouteVoteRequest_StopTxRequest); ok { + return x.StopTxRequest + } + return nil +} + +func (m *RouteVoteRequest) GetStopTxResponse() *StopTransactionResponse { + if x, ok := m.GetMsg().(*RouteVoteRequest_StopTxResponse); ok { + return x.StopTxResponse + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*RouteVoteRequest) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*RouteVoteRequest_OpenRouteRequest)(nil), + (*RouteVoteRequest_Error)(nil), + (*RouteVoteRequest_VoteTxRequest)(nil), + (*RouteVoteRequest_VoteTxResponse)(nil), + (*RouteVoteRequest_StopTxRequest)(nil), + (*RouteVoteRequest_StopTxResponse)(nil), + } +} + +// OpenRoute is sent from Praefect to Gitaly to open a new route +// session. All transactions requests from Gitaly with the specified ID will +// route to the Praefect that opened the session. Only one route can be +// opened per stream. Closing the stream will also close the route. 
+type RouteVoteRequest_OpenRoute struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RouteVoteRequest_OpenRoute) Reset() { *m = RouteVoteRequest_OpenRoute{} } +func (m *RouteVoteRequest_OpenRoute) String() string { return proto.CompactTextString(m) } +func (*RouteVoteRequest_OpenRoute) ProtoMessage() {} +func (*RouteVoteRequest_OpenRoute) Descriptor() ([]byte, []int) { + return fileDescriptor_2cc4e03d2c28c490, []int{4, 0} +} + +func (m *RouteVoteRequest_OpenRoute) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RouteVoteRequest_OpenRoute.Unmarshal(m, b) +} +func (m *RouteVoteRequest_OpenRoute) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RouteVoteRequest_OpenRoute.Marshal(b, m, deterministic) +} +func (m *RouteVoteRequest_OpenRoute) XXX_Merge(src proto.Message) { + xxx_messageInfo_RouteVoteRequest_OpenRoute.Merge(m, src) +} +func (m *RouteVoteRequest_OpenRoute) XXX_Size() int { + return xxx_messageInfo_RouteVoteRequest_OpenRoute.Size(m) +} +func (m *RouteVoteRequest_OpenRoute) XXX_DiscardUnknown() { + xxx_messageInfo_RouteVoteRequest_OpenRoute.DiscardUnknown(m) +} + +var xxx_messageInfo_RouteVoteRequest_OpenRoute proto.InternalMessageInfo + +// Status is copy of google.rpc.Status, which represents errors in gRPC +type RouteVoteRequest_Status struct { + Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RouteVoteRequest_Status) Reset() { *m = RouteVoteRequest_Status{} } +func (m *RouteVoteRequest_Status) String() string { return proto.CompactTextString(m) } +func (*RouteVoteRequest_Status) ProtoMessage() {} +func (*RouteVoteRequest_Status) Descriptor() ([]byte, []int) { + return 
fileDescriptor_2cc4e03d2c28c490, []int{4, 1} +} + +func (m *RouteVoteRequest_Status) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RouteVoteRequest_Status.Unmarshal(m, b) +} +func (m *RouteVoteRequest_Status) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RouteVoteRequest_Status.Marshal(b, m, deterministic) +} +func (m *RouteVoteRequest_Status) XXX_Merge(src proto.Message) { + xxx_messageInfo_RouteVoteRequest_Status.Merge(m, src) +} +func (m *RouteVoteRequest_Status) XXX_Size() int { + return xxx_messageInfo_RouteVoteRequest_Status.Size(m) +} +func (m *RouteVoteRequest_Status) XXX_DiscardUnknown() { + xxx_messageInfo_RouteVoteRequest_Status.DiscardUnknown(m) +} + +var xxx_messageInfo_RouteVoteRequest_Status proto.InternalMessageInfo + +func (m *RouteVoteRequest_Status) GetCode() int32 { + if m != nil { + return m.Code + } + return 0 +} + +func (m *RouteVoteRequest_Status) GetMessage() string { + if m != nil { + return m.Message + } + return "" +} + func init() { proto.RegisterEnum("gitaly.VoteTransactionResponse_TransactionState", VoteTransactionResponse_TransactionState_name, VoteTransactionResponse_TransactionState_value) proto.RegisterType((*VoteTransactionRequest)(nil), "gitaly.VoteTransactionRequest") proto.RegisterType((*VoteTransactionResponse)(nil), "gitaly.VoteTransactionResponse") proto.RegisterType((*StopTransactionRequest)(nil), "gitaly.StopTransactionRequest") proto.RegisterType((*StopTransactionResponse)(nil), "gitaly.StopTransactionResponse") + proto.RegisterType((*RouteVoteRequest)(nil), "gitaly.RouteVoteRequest") + proto.RegisterType((*RouteVoteRequest_OpenRoute)(nil), "gitaly.RouteVoteRequest.OpenRoute") + proto.RegisterType((*RouteVoteRequest_Status)(nil), "gitaly.RouteVoteRequest.Status") } func init() { proto.RegisterFile("transaction.proto", fileDescriptor_2cc4e03d2c28c490) } var fileDescriptor_2cc4e03d2c28c490 = []byte{ - // 385 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 
0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x52, 0xc1, 0x6e, 0xda, 0x40, - 0x14, 0xec, 0x52, 0x63, 0xc1, 0x2b, 0xa5, 0xee, 0xaa, 0x02, 0x97, 0x43, 0x6b, 0x59, 0xaa, 0xe4, - 0x43, 0x6b, 0x23, 0xe8, 0xa1, 0xd7, 0x52, 0xa9, 0x2a, 0x07, 0x44, 0xb5, 0x38, 0x39, 0x70, 0x41, - 0x0b, 0x5e, 0xb0, 0x25, 0xe2, 0x75, 0x76, 0x97, 0x03, 0x3f, 0x92, 0xe4, 0x7f, 0x22, 0x45, 0xf9, - 0xa6, 0x9c, 0x22, 0xbc, 0x01, 0x2c, 0x08, 0xca, 0x31, 0xb7, 0x7d, 0x33, 0x6f, 0xc6, 0x6f, 0x46, - 0x86, 0x8f, 0x4a, 0xd0, 0x54, 0xd2, 0x99, 0x4a, 0x78, 0xea, 0x67, 0x82, 0x2b, 0x8e, 0xcd, 0x45, - 0xa2, 0xe8, 0x72, 0xdd, 0x82, 0x65, 0x92, 0x2a, 0x8d, 0xb5, 0x6a, 0x32, 0xa6, 0x82, 0x45, 0x7a, - 0x72, 0x6f, 0x11, 0x34, 0xce, 0xb9, 0x62, 0xe1, 0x5e, 0x4b, 0xd8, 0xe5, 0x8a, 0x49, 0x85, 0x7f, - 0x01, 0x08, 0x96, 0x71, 0x99, 0x28, 0x2e, 0xd6, 0x36, 0x72, 0x90, 0xf7, 0xae, 0x83, 0x7d, 0xed, - 0xe8, 0x93, 0x1d, 0xd3, 0x33, 0x6e, 0xee, 0xbe, 0x23, 0x52, 0xd8, 0xc5, 0xdf, 0xa0, 0x5e, 0xb8, - 0x65, 0x92, 0x44, 0x76, 0xc9, 0x41, 0x9e, 0x41, 0xde, 0x17, 0xd0, 0x7e, 0x84, 0x31, 0x18, 0x29, - 0x8f, 0x98, 0xfd, 0xd6, 0x41, 0x5e, 0x95, 0xe4, 0x6f, 0xfc, 0x13, 0x1a, 0x82, 0xcd, 0x99, 0x60, - 0xe9, 0x8c, 0x4d, 0x56, 0x59, 0x44, 0x15, 0x93, 0x93, 0x98, 0xca, 0xd8, 0x36, 0x1c, 0xe4, 0xd5, - 0xc8, 0xa7, 0x1d, 0x7b, 0xa6, 0xc9, 0x7f, 0x54, 0xc6, 0xee, 0x15, 0x82, 0xe6, 0x51, 0x0a, 0x99, - 0xf1, 0x54, 0x32, 0xfc, 0x17, 0xca, 0x52, 0x51, 0xc5, 0xf2, 0x04, 0xf5, 0x4e, 0x7b, 0x9b, 0xe0, - 0xc4, 0xbe, 0x5f, 0xc0, 0x46, 0x1b, 0x1d, 0xd1, 0x72, 0xb7, 0x0b, 0xd6, 0x21, 0x85, 0x01, 0xcc, - 0x3f, 0xc3, 0xc1, 0xa0, 0x1f, 0x5a, 0x6f, 0x70, 0x15, 0xca, 0xbf, 0x7b, 0x43, 0x12, 0x5a, 0x08, - 0x57, 0xc0, 0x18, 0x85, 0xc3, 0xff, 0x56, 0xc9, 0x5d, 0x43, 0x63, 0xa4, 0x78, 0xf6, 0x0a, 0xed, - 0xba, 0x9f, 0xa1, 0x79, 0xf4, 0x69, 0x1d, 0xb1, 0x73, 0x8f, 0xa0, 0x4e, 0xd8, 0xbc, 0x40, 0xe1, - 0x31, 0x7c, 0x38, 0x28, 0x04, 0x7f, 0x39, 0xd9, 0x54, 0x9e, 0xa0, 0xf5, 0xf5, 0x85, 0x26, 0x5d, - 0xf3, 0xe1, 0xda, 0x2b, 0x55, 0xd0, 0xc6, 0xfb, 0xe0, 0x92, 
0xbd, 0xf7, 0xf3, 0xed, 0xec, 0xbd, - 0x4f, 0x44, 0xd8, 0x7a, 0xf7, 0xda, 0xe3, 0xcd, 0xe6, 0x92, 0x4e, 0xfd, 0x19, 0xbf, 0x08, 0xf4, - 0xf3, 0x07, 0x17, 0x8b, 0x40, 0xeb, 0x83, 0xfc, 0x2f, 0x0f, 0x16, 0xfc, 0x69, 0xce, 0xa6, 0x53, - 0x33, 0x87, 0xba, 0x8f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x5f, 0x19, 0xe6, 0x7d, 0x2f, 0x03, 0x00, + // 609 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x54, 0xdf, 0x6e, 0xd3, 0x3e, + 0x14, 0xae, 0xb7, 0xa4, 0x5b, 0x4f, 0xb7, 0x2e, 0x3f, 0xeb, 0xa7, 0x2d, 0x44, 0x02, 0xaa, 0x4a, + 0x48, 0xbd, 0x80, 0x74, 0xea, 0x10, 0x70, 0x4b, 0x27, 0xa1, 0x4c, 0x68, 0x2a, 0xf2, 0x3a, 0x2e, + 0xb8, 0x89, 0xb2, 0xc6, 0x6b, 0x23, 0x6d, 0x71, 0xb0, 0x1d, 0xb4, 0x5d, 0xf3, 0x0e, 0xb0, 0x47, + 0xe1, 0x09, 0x78, 0x10, 0x9e, 0x80, 0x47, 0x40, 0xb6, 0x97, 0x34, 0xed, 0x88, 0x7a, 0xc9, 0x9d, + 0x7d, 0xfe, 0x7c, 0xf9, 0xce, 0xf7, 0x1d, 0x07, 0xfe, 0x93, 0x3c, 0x4a, 0x45, 0x34, 0x95, 0x09, + 0x4b, 0xfd, 0x8c, 0x33, 0xc9, 0x70, 0x73, 0x96, 0xc8, 0xe8, 0xea, 0xd6, 0x83, 0xab, 0x24, 0x95, + 0x26, 0xe6, 0xed, 0x88, 0x79, 0xc4, 0x69, 0x6c, 0x6e, 0xbd, 0x5f, 0x08, 0xf6, 0x3f, 0x32, 0x49, + 0x27, 0x8b, 0x5e, 0x42, 0x3f, 0xe7, 0x54, 0x48, 0xfc, 0x06, 0x80, 0xd3, 0x8c, 0x89, 0x44, 0x32, + 0x7e, 0xeb, 0xa2, 0x2e, 0xea, 0xb7, 0x87, 0xd8, 0x37, 0x88, 0x3e, 0x29, 0x33, 0x23, 0xeb, 0xee, + 0xe7, 0x73, 0x44, 0x2a, 0xb5, 0xf8, 0x19, 0x74, 0x2a, 0x5c, 0xc2, 0x24, 0x76, 0x37, 0xba, 0xa8, + 0x6f, 0x91, 0xdd, 0x4a, 0xf4, 0x24, 0xc6, 0x18, 0xac, 0x94, 0xc5, 0xd4, 0xdd, 0xec, 0xa2, 0x7e, + 0x8b, 0xe8, 0x33, 0x7e, 0x09, 0xfb, 0x9c, 0x5e, 0x52, 0x4e, 0xd3, 0x29, 0x0d, 0xf3, 0x2c, 0x8e, + 0x24, 0x15, 0xe1, 0x3c, 0x12, 0x73, 0xd7, 0xea, 0xa2, 0xfe, 0x0e, 0xf9, 0xbf, 0xcc, 0x9e, 0x9b, + 0x64, 0x10, 0x89, 0x39, 0x7e, 0x0c, 0xc0, 0x59, 0x2e, 0x69, 0x98, 0xe7, 0x49, 0xec, 0xda, 0x1a, + 0xaf, 0xa5, 0x23, 0xe7, 0x79, 0x12, 0xf7, 0xbe, 0x21, 0x38, 0x78, 0x30, 0xa4, 0xc8, 0x58, 0x2a, + 0x28, 0x7e, 0x07, 0xb6, 0x90, 0x91, 0xa4, 0x7a, 
0xc0, 0xce, 0xf0, 0xb0, 0x18, 0xb0, 0xa6, 0xde, + 0xaf, 0xc4, 0xce, 0x54, 0x1f, 0x31, 0xed, 0xbd, 0x23, 0x70, 0x56, 0x53, 0x18, 0xa0, 0x79, 0x3c, + 0x3e, 0x3d, 0x3d, 0x99, 0x38, 0x0d, 0xdc, 0x02, 0xfb, 0xed, 0x68, 0x4c, 0x26, 0x0e, 0xc2, 0xdb, + 0x60, 0x9d, 0x4d, 0xc6, 0x1f, 0x9c, 0x8d, 0xde, 0x1d, 0x82, 0xfd, 0x33, 0xc9, 0xb2, 0x7f, 0xa1, + 0xfe, 0x1a, 0xcd, 0x1e, 0xc1, 0xc1, 0x03, 0x66, 0x46, 0x82, 0xde, 0x0f, 0x0b, 0x1c, 0xa2, 0x0a, + 0x95, 0x46, 0x05, 0xdf, 0x65, 0x38, 0xb4, 0x02, 0x87, 0x09, 0x60, 0x96, 0xd1, 0x34, 0x34, 0x35, + 0xdc, 0x34, 0x69, 0x62, 0xed, 0x61, 0xaf, 0x1c, 0x6b, 0x05, 0xd4, 0x1f, 0x67, 0x34, 0xd5, 0xc1, + 0xa0, 0x41, 0x1c, 0x56, 0x5c, 0x8a, 0x4f, 0xbe, 0x06, 0x9b, 0x72, 0xce, 0xb8, 0x5e, 0xa0, 0xf6, + 0xf0, 0x69, 0x2d, 0x8c, 0x72, 0x23, 0x17, 0x41, 0x83, 0x98, 0x7a, 0x1c, 0xc0, 0xde, 0x17, 0x26, + 0x69, 0x28, 0x6f, 0x4a, 0x26, 0xb6, 0x86, 0x78, 0x52, 0xeb, 0xbe, 0xae, 0x0a, 0x1a, 0x64, 0x57, + 0x35, 0x4e, 0x6e, 0x0a, 0x0a, 0xef, 0xc1, 0x59, 0x20, 0x19, 0x79, 0xdc, 0xe6, 0x32, 0x9b, 0x9a, + 0x45, 0x0a, 0x1a, 0xa4, 0x53, 0x60, 0xdd, 0xaf, 0x62, 0x00, 0x7b, 0x42, 0xb2, 0xac, 0x4a, 0x6b, + 0x6b, 0x99, 0xd6, 0xdf, 0x77, 0x45, 0xd1, 0x52, 0x8d, 0x4b, 0xb4, 0x16, 0x48, 0xf7, 0xb4, 0xb6, + 0x97, 0x69, 0xd5, 0x98, 0xab, 0x68, 0x15, 0x58, 0x26, 0xe2, 0xb5, 0xa1, 0x55, 0xfa, 0xe0, 0xbd, + 0x82, 0xa6, 0x51, 0x53, 0xbd, 0xde, 0xa9, 0x7a, 0xbd, 0xca, 0x6a, 0x9b, 0xe8, 0x33, 0x76, 0x61, + 0xeb, 0x9a, 0x0a, 0x11, 0xcd, 0xa8, 0xb6, 0xb6, 0x45, 0x8a, 0xeb, 0xc8, 0x86, 0xcd, 0x6b, 0x31, + 0x1b, 0x7e, 0xdd, 0x80, 0x0e, 0xa1, 0x97, 0x95, 0x0f, 0x63, 0x02, 0x7b, 0x2b, 0x12, 0xe1, 0x35, + 0x36, 0x78, 0xeb, 0xb4, 0x55, 0x98, 0x2b, 0xf3, 0xe1, 0x35, 0x1a, 0x7a, 0xeb, 0x84, 0xc1, 0xc7, + 0xd0, 0x2a, 0x17, 0x0b, 0xbb, 0x75, 0xbb, 0xe6, 0xd5, 0x66, 0xfa, 0xe8, 0x10, 0x79, 0xd6, 0xef, + 0xef, 0x7d, 0x34, 0x3a, 0xfc, 0xa4, 0x8a, 0xae, 0xa2, 0x0b, 0x7f, 0xca, 0xae, 0x07, 0xe6, 0xf8, + 0x82, 0xf1, 0xd9, 0xc0, 0xb4, 0x0e, 0xf4, 0xaf, 0x79, 0x30, 0x63, 0xf7, 0xf7, 0xec, 
0xe2, 0xa2, + 0xa9, 0x43, 0x47, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x94, 0x70, 0xdd, 0x7c, 0xe4, 0x05, 0x00, 0x00, } @@ -291,6 +564,40 @@ const _ = grpc.SupportPackageIsVersion4 type RefTransactionClient interface { VoteTransaction(ctx context.Context, in *VoteTransactionRequest, opts ...grpc.CallOption) (*VoteTransactionResponse, error) StopTransaction(ctx context.Context, in *StopTransactionRequest, opts ...grpc.CallOption) (*StopTransactionResponse, error) + // RouteVote allows Praefect to dial to a remote Gitaly and request + // intercepting VoteTransaction and StopTransaction calls made for a specific + // route UUID. For example, given a route UUID of 5: + // + // ┌────────┐ ┌──────┐ ┌────┐ + // │Praefect│ │Gitaly│ │Hook│ + // └───┬────┘ └──┬───┘ └─┬──┘ + // │ RouteVote open route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 opened │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ │ VoteTransaction for 5 │ + // │ │ <──────────────────────── + // │ │ │ + // │ Forward VoteTransactionRequest for 5 │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ Forward VoteTransactionResponse for 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ │ Response │ + // │ │ ────────────────────────> + // │ │ │ + // │ RouteVote close route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 closed │ │ + // │ <──────────────────────────────────────── │ + // ┌───┴────┐ ┌──┴───┐ ┌─┴──┐ + // │Praefect│ │Gitaly│ │Hook│ + // └────────┘ └──────┘ └────┘ + RouteVote(ctx context.Context, opts ...grpc.CallOption) (RefTransaction_RouteVoteClient, error) } type refTransactionClient struct { @@ -319,10 +626,75 @@ func (c *refTransactionClient) StopTransaction(ctx context.Context, in *StopTran return out, nil } +func (c *refTransactionClient) RouteVote(ctx context.Context, opts ...grpc.CallOption) (RefTransaction_RouteVoteClient, error) { + stream, err := 
c.cc.NewStream(ctx, &_RefTransaction_serviceDesc.Streams[0], "/gitaly.RefTransaction/RouteVote", opts...) + if err != nil { + return nil, err + } + x := &refTransactionRouteVoteClient{stream} + return x, nil +} + +type RefTransaction_RouteVoteClient interface { + Send(*RouteVoteRequest) error + Recv() (*RouteVoteRequest, error) + grpc.ClientStream +} + +type refTransactionRouteVoteClient struct { + grpc.ClientStream +} + +func (x *refTransactionRouteVoteClient) Send(m *RouteVoteRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *refTransactionRouteVoteClient) Recv() (*RouteVoteRequest, error) { + m := new(RouteVoteRequest) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // RefTransactionServer is the server API for RefTransaction service. type RefTransactionServer interface { VoteTransaction(context.Context, *VoteTransactionRequest) (*VoteTransactionResponse, error) StopTransaction(context.Context, *StopTransactionRequest) (*StopTransactionResponse, error) + // RouteVote allows Praefect to dial to a remote Gitaly and request + // intercepting VoteTransaction and StopTransaction calls made for a specific + // route UUID. 
For example, given a route UUID of 5: + // + // ┌────────┐ ┌──────┐ ┌────┐ + // │Praefect│ │Gitaly│ │Hook│ + // └───┬────┘ └──┬───┘ └─┬──┘ + // │ RouteVote open route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 opened │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ │ VoteTransaction for 5 │ + // │ │ <──────────────────────── + // │ │ │ + // │ Forward VoteTransactionRequest for 5 │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ Forward VoteTransactionResponse for 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ │ Response │ + // │ │ ────────────────────────> + // │ │ │ + // │ RouteVote close route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 closed │ │ + // │ <──────────────────────────────────────── │ + // ┌───┴────┐ ┌──┴───┐ ┌─┴──┐ + // │Praefect│ │Gitaly│ │Hook│ + // └────────┘ └──────┘ └────┘ + RouteVote(RefTransaction_RouteVoteServer) error } // UnimplementedRefTransactionServer can be embedded to have forward compatible implementations. 
@@ -335,6 +707,9 @@ func (*UnimplementedRefTransactionServer) VoteTransaction(ctx context.Context, r func (*UnimplementedRefTransactionServer) StopTransaction(ctx context.Context, req *StopTransactionRequest) (*StopTransactionResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method StopTransaction not implemented") } +func (*UnimplementedRefTransactionServer) RouteVote(srv RefTransaction_RouteVoteServer) error { + return status.Errorf(codes.Unimplemented, "method RouteVote not implemented") +} func RegisterRefTransactionServer(s *grpc.Server, srv RefTransactionServer) { s.RegisterService(&_RefTransaction_serviceDesc, srv) @@ -376,6 +751,32 @@ func _RefTransaction_StopTransaction_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } +func _RefTransaction_RouteVote_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(RefTransactionServer).RouteVote(&refTransactionRouteVoteServer{stream}) +} + +type RefTransaction_RouteVoteServer interface { + Send(*RouteVoteRequest) error + Recv() (*RouteVoteRequest, error) + grpc.ServerStream +} + +type refTransactionRouteVoteServer struct { + grpc.ServerStream +} + +func (x *refTransactionRouteVoteServer) Send(m *RouteVoteRequest) error { + return x.ServerStream.SendMsg(m) +} + +func (x *refTransactionRouteVoteServer) Recv() (*RouteVoteRequest, error) { + m := new(RouteVoteRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + var _RefTransaction_serviceDesc = grpc.ServiceDesc{ ServiceName: "gitaly.RefTransaction", HandlerType: (*RefTransactionServer)(nil), @@ -389,6 +790,13 @@ var _RefTransaction_serviceDesc = grpc.ServiceDesc{ Handler: _RefTransaction_StopTransaction_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "RouteVote", + Handler: _RefTransaction_RouteVote_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "transaction.proto", } diff 
--git a/proto/transaction.proto b/proto/transaction.proto index fb11879c46..f8da506a23 100644 --- a/proto/transaction.proto +++ b/proto/transaction.proto @@ -8,21 +8,43 @@ import "lint.proto"; import "shared.proto"; service RefTransaction { - - rpc VoteTransaction (VoteTransactionRequest) returns (VoteTransactionResponse) { - option (op_type) = { - op: MUTATOR - scope_level: REPOSITORY - }; - } - - rpc StopTransaction (StopTransactionRequest) returns (StopTransactionResponse) { - option (op_type) = { - op: MUTATOR - scope_level: REPOSITORY - }; - } - + option (intercepted) = true; + rpc VoteTransaction (VoteTransactionRequest) returns (VoteTransactionResponse); + rpc StopTransaction (StopTransactionRequest) returns (StopTransactionResponse); + // RouteVote allows Praefect to dial to a remote Gitaly and request + // intercepting VoteTransaction and StopTransaction calls made for a specific + // route UUID. For example, given a route UUID of 5: + // + // ┌────────┐ ┌──────┐ ┌────┐ + // │Praefect│ │Gitaly│ │Hook│ + // └───┬────┘ └──┬───┘ └─┬──┘ + // │ RouteVote open route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 opened │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ │ VoteTransaction for 5 │ + // │ │ <──────────────────────── + // │ │ │ + // │ Forward VoteTransactionRequest for 5 │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ Forward VoteTransactionResponse for 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ │ Response │ + // │ │ ────────────────────────> + // │ │ │ + // │ RouteVote close route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 closed │ │ + // │ <──────────────────────────────────────── │ + // ┌───┴────┐ ┌──┴───┐ ┌─┴──┐ + // │Praefect│ │Gitaly│ │Hook│ + // └────────┘ └──────┘ └────┘ + rpc RouteVote (stream RouteVoteRequest) returns (stream RouteVoteRequest); } message VoteTransactionRequest { @@ -33,6 
+55,9 @@ message VoteTransactionRequest { string node = 3; // SHA1 of the references that are to be updated bytes reference_updates_hash = 4; + // Route UUID is used when the transaction service on Gitaly is used to + // route messages back to Praefect clients + string route_uuid = 5; } message VoteTransactionResponse { @@ -51,6 +76,45 @@ message StopTransactionRequest { Repository repository = 1[(target_repository)=true]; // ID of the transaction we're processing uint64 transaction_id = 2; + // Route UUID is used when the transaction service on Gitaly is used to + // route messages back to Praefect clients + string route_uuid = 5; } message StopTransactionResponse {} + +// RouteVoteRequest wraps Gitaly messages so that they can +// be properly routed between Praefects. It also allows Praefect to manage +// transactions. +message RouteVoteRequest{ + // The route UUID allows Gitaly messages to be routed to the correct + // Praefect client + string route_uuid = 1; + + // OpenRoute is sent from Praefect to Gitaly to open a new route + // session. All transactions requests from Gitaly with the specified ID will + // route to the Praefect that opened the session. Only one route can be + // opened per stream. Closing the stream will also close the route. 
+ message OpenRoute {} + + // Status is copy of google.rpc.Status, which represents errors in gRPC + message Status { + int32 code = 1; + string message = 2; + } + + oneof msg { + // These messages are used by Praefect to manage route sessions + OpenRoute open_route_request = 2; + + // error can be sent from Praefect to Gitaly as a response to + // any routed RPC + Status error = 3; + + // These messages are wrapped to allow Gitaly RPCs to be routed + VoteTransactionRequest vote_tx_request = 5; + VoteTransactionResponse vote_tx_response = 6; + StopTransactionRequest stop_tx_request = 7; + StopTransactionResponse stop_tx_response = 8; + } +} diff --git a/ruby/proto/gitaly/transaction_pb.rb b/ruby/proto/gitaly/transaction_pb.rb index 59c7f56265..f05a633ea2 100644 --- a/ruby/proto/gitaly/transaction_pb.rb +++ b/ruby/proto/gitaly/transaction_pb.rb @@ -12,6 +12,7 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :transaction_id, :uint64, 2 optional :node, :string, 3 optional :reference_updates_hash, :bytes, 4 + optional :route_uuid, :string, 5 end add_message "gitaly.VoteTransactionResponse" do optional :state, :enum, 1, "gitaly.VoteTransactionResponse.TransactionState" @@ -24,9 +25,27 @@ Google::Protobuf::DescriptorPool.generated_pool.build do add_message "gitaly.StopTransactionRequest" do optional :repository, :message, 1, "gitaly.Repository" optional :transaction_id, :uint64, 2 + optional :route_uuid, :string, 5 end add_message "gitaly.StopTransactionResponse" do end + add_message "gitaly.RouteVoteRequest" do + optional :route_uuid, :string, 1 + oneof :msg do + optional :open_route_request, :message, 2, "gitaly.RouteVoteRequest.OpenRoute" + optional :error, :message, 3, "gitaly.RouteVoteRequest.Status" + optional :vote_tx_request, :message, 5, "gitaly.VoteTransactionRequest" + optional :vote_tx_response, :message, 6, "gitaly.VoteTransactionResponse" + optional :stop_tx_request, :message, 7, "gitaly.StopTransactionRequest" + optional 
:stop_tx_response, :message, 8, "gitaly.StopTransactionResponse" + end + end + add_message "gitaly.RouteVoteRequest.OpenRoute" do + end + add_message "gitaly.RouteVoteRequest.Status" do + optional :code, :int32, 1 + optional :message, :string, 2 + end end end @@ -36,4 +55,7 @@ module Gitaly VoteTransactionResponse::TransactionState = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.VoteTransactionResponse.TransactionState").enummodule StopTransactionRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.StopTransactionRequest").msgclass StopTransactionResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.StopTransactionResponse").msgclass + RouteVoteRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RouteVoteRequest").msgclass + RouteVoteRequest::OpenRoute = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RouteVoteRequest.OpenRoute").msgclass + RouteVoteRequest::Status = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RouteVoteRequest.Status").msgclass end diff --git a/ruby/proto/gitaly/transaction_services_pb.rb b/ruby/proto/gitaly/transaction_services_pb.rb index 1749482460..79eafe17ba 100644 --- a/ruby/proto/gitaly/transaction_services_pb.rb +++ b/ruby/proto/gitaly/transaction_services_pb.rb @@ -16,6 +16,40 @@ module Gitaly rpc :VoteTransaction, Gitaly::VoteTransactionRequest, Gitaly::VoteTransactionResponse rpc :StopTransaction, Gitaly::StopTransactionRequest, Gitaly::StopTransactionResponse + # RouteVote allows Praefect to dial to a remote Gitaly and request + # intercepting VoteTransaction and StopTransaction calls made for a specific + # route UUID. 
For example, given a route UUID of 5: + # + # ┌────────┐ ┌──────┐ ┌────┐ + # │Praefect│ │Gitaly│ │Hook│ + # └───┬────┘ └──┬───┘ └─┬──┘ + # │ RouteVote open route 5 │ │ + # │ ────────────────────────────────────────> │ + # │ │ │ + # │ Route 5 opened │ │ + # │ <──────────────────────────────────────── │ + # │ │ │ + # │ │ VoteTransaction for 5 │ + # │ │ <──────────────────────── + # │ │ │ + # │ Forward VoteTransactionRequest for 5 │ │ + # │ <──────────────────────────────────────── │ + # │ │ │ + # │ Forward VoteTransactionResponse for 5 │ │ + # │ ────────────────────────────────────────> │ + # │ │ │ + # │ │ Response │ + # │ │ ────────────────────────> + # │ │ │ + # │ RouteVote close route 5 │ │ + # │ ────────────────────────────────────────> │ + # │ │ │ + # │ Route 5 closed │ │ + # │ <──────────────────────────────────────── │ + # ┌───┴────┐ ┌──┴───┐ ┌─┴──┐ + # │Praefect│ │Gitaly│ │Hook│ + # └────────┘ └──────┘ └────┘ + rpc :RouteVote, stream(Gitaly::RouteVoteRequest), stream(Gitaly::RouteVoteRequest) end Stub = Service.rpc_stub_class -- GitLab From 8d4c1abd139c7adcb7351c6018253eabbc2bc5b1 Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Mon, 25 Jan 2021 21:45:31 -0800 Subject: [PATCH 02/12] Praefect transaction service future proofing Ensure that the absence of the new RouteVote RPC, as well as future RPCs, does not stop the Praefect service from compiling. 
--- internal/gitaly/transaction/manager_test.go | 1 + internal/praefect/service/transaction/server.go | 1 + 2 files changed, 2 insertions(+) diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go index e1d3044f81..1f11d13275 100644 --- a/internal/gitaly/transaction/manager_test.go +++ b/internal/gitaly/transaction/manager_test.go @@ -16,6 +16,7 @@ import ( ) type testTransactionServer struct { + *gitalypb.UnimplementedRefTransactionServer vote func(*gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) stop func(*gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) } diff --git a/internal/praefect/service/transaction/server.go b/internal/praefect/service/transaction/server.go index ceabc22825..3a931140c2 100644 --- a/internal/praefect/service/transaction/server.go +++ b/internal/praefect/service/transaction/server.go @@ -11,6 +11,7 @@ import ( ) type Server struct { + *gitalypb.UnimplementedRefTransactionServer txMgr *transactions.Manager } -- GitLab From 0c3598dbcef549f465e2d50c18476036af4abc31 Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Mon, 25 Jan 2021 21:46:48 -0800 Subject: [PATCH 03/12] Gitaly transaction service implementation This commit adds the transaction service implementation for use on Gitaly servers. Rather than interface with a transaction manager like the Praefect implementation, the Gitaly transaction service is designed to only route VoteTransaction and StopTransaction requests via the new RouteVote RPC. By keeping the interface for the reference transaction hooks the same, we minimize the amount of changes in Gitaly. This design shifts the burden to the integration in the Praefect coordinator. RouteVote acts as a pub sub message bus, where the Praefect client subscribes to transactions. Subscribing to a transaction establishes a route to send messages between Praefect and a Gitaly client. 
--- internal/gitaly/service/transaction/server.go | 275 ++++++++++++++ .../gitaly/service/transaction/server_test.go | 359 ++++++++++++++++++ 2 files changed, 634 insertions(+) create mode 100644 internal/gitaly/service/transaction/server.go create mode 100644 internal/gitaly/service/transaction/server_test.go diff --git a/internal/gitaly/service/transaction/server.go b/internal/gitaly/service/transaction/server.go new file mode 100644 index 0000000000..6251151ff4 --- /dev/null +++ b/internal/gitaly/service/transaction/server.go @@ -0,0 +1,275 @@ +package transaction + +import ( + "context" + "fmt" + "sync" + + "gitlab.com/gitlab-org/gitaly/internal/helper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type route string + +type server struct { + // requestPaths are how a Gitaly request gets routed to the correct + // Praefect + requestPaths struct { + sync.RWMutex + pathByUUID map[route]chan<- *gitalypb.RouteVoteRequest + } + + // responsePaths are how a Praefect response gets routed to the correct + // Gitaly RPC call + responsePaths struct { + sync.RWMutex + pathByUUID map[route]chan<- *gitalypb.RouteVoteRequest + } +} + +// NewServer returns a Gitaly transaction server capable of routing votes to a +// Praefect client +func NewServer() gitalypb.RefTransactionServer { + s := &server{} + s.requestPaths.pathByUUID = map[route]chan<- *gitalypb.RouteVoteRequest{} + s.responsePaths.pathByUUID = map[route]chan<- *gitalypb.RouteVoteRequest{} + return s +} + +// VoteTransaction will attempt to route the request to a listening Praefect +// client and then wait for a response to be routed back. 
+func (s *server) VoteTransaction(ctx context.Context, req *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { + wrappedReq := &gitalypb.RouteVoteRequest{ + RouteUuid: req.GetRouteUuid(), + Msg: &gitalypb.RouteVoteRequest_VoteTxRequest{req}, + } + + resp, err := s.routeRPC(ctx, wrappedReq) + if err != nil { + return nil, status.Errorf(status.Code(err), "VoteTransaction: routing RPC: %v", err) + } + + switch r := resp.Msg.(type) { + case *gitalypb.RouteVoteRequest_VoteTxResponse: + return r.VoteTxResponse, nil + case *gitalypb.RouteVoteRequest_Error: + return nil, status.New(codes.Code(r.Error.Code), r.Error.Message).Err() + default: + return nil, helper.ErrInternalf("VoteTransaction: unexpected response type %T", resp.Msg) + } +} + +// StopTransaction will attempt to route the request to a listening Praefect +// client and then wait for a response to be routed back. +func (s *server) StopTransaction(ctx context.Context, req *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { + wrappedReq := &gitalypb.RouteVoteRequest{ + RouteUuid: req.GetRouteUuid(), + Msg: &gitalypb.RouteVoteRequest_StopTxRequest{req}, + } + + resp, err := s.routeRPC(ctx, wrappedReq) + if err != nil { + return nil, status.Errorf(status.Code(err), "StopTransaction: routing RPC: %v", err) + } + + switch r := resp.Msg.(type) { + case *gitalypb.RouteVoteRequest_StopTxResponse: + return r.StopTxResponse, nil + case *gitalypb.RouteVoteRequest_Error: + return nil, status.New(codes.Code(r.Error.Code), r.Error.Message).Err() + default: + return nil, helper.ErrInternalf("StopTransaction: unexpected response type %T", resp.Msg) + } +} + +func (s *server) routeRPC(ctx context.Context, req *gitalypb.RouteVoteRequest) (*gitalypb.RouteVoteRequest, error) { + respPath := make(chan *gitalypb.RouteVoteRequest) + defer close(respPath) + + routeUUID := route(req.GetRouteUuid()) + + cleanup, err := s.openRespPath(respPath, routeUUID) + if err != nil { + return nil, 
status.Errorf(status.Code(err), "opening response path: %v", err) + } + defer cleanup() + + if err := s.routeRequest(ctx, routeUUID, req); err != nil { + return nil, status.Errorf(status.Code(err), "routing request: %v", err) + } + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("waiting for response: %w", ctx.Err()) + case resp, ok := <-respPath: + if !ok { + return nil, helper.ErrInternalf("response route unexpectedly closed") + } + return resp, nil + } +} + +func (s *server) RouteVote(bidi gitalypb.RefTransaction_RouteVoteServer) error { + ctx, cancel := context.WithCancel(bidi.Context()) + defer cancel() + + reqPath := make(chan *gitalypb.RouteVoteRequest) + defer close(reqPath) + + eg, ctx := errgroup.WithContext(ctx) + + eg.Go(func() error { return s.processOutgoing(ctx, reqPath, bidi) }) + eg.Go(func() error { return s.processIncoming(ctx, reqPath, bidi) }) + + return eg.Wait() +} + +// processOutgoing handles receiving messages from other clients that need to be +// delivered to the caller of this stream +func (s *server) processOutgoing(ctx context.Context, reqPath <-chan *gitalypb.RouteVoteRequest, stream gitalypb.RefTransaction_RouteVoteServer) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case msg, ok := <-reqPath: + if !ok { + return nil + } + if err := stream.Send(msg); err != nil { + return err + } + } + } +} + +// processIncoming handles each incoming message to this stream +func (s *server) processIncoming(ctx context.Context, reqPath chan<- *gitalypb.RouteVoteRequest, bidi gitalypb.RefTransaction_RouteVoteServer) error { + var sessionOpened bool + for { + var req *gitalypb.RouteVoteRequest + var err error + + // there is no way to cancel early from a stream recv, so we + // wrap it in a goroutine to select it against context cancelation + done := make(chan struct{}) + go func() { + defer close(done) + req, err = bidi.Recv() // should return when RPC is done + }() + + select { + case <-done: + if err != nil { + 
return err + } + case <-ctx.Done(): + return ctx.Err() + } + + routeUUID := route(req.GetRouteUuid()) + + switch req.GetMsg().(type) { + case *gitalypb.RouteVoteRequest_OpenRouteRequest: + if sessionOpened { + return status.Error(codes.AlreadyExists, "route already exists for stream") + } + if err := s.openReqPath(reqPath, routeUUID, bidi); err != nil { + return err + } + sessionOpened = true + defer s.closeRoute(routeUUID) + default: + if err := s.routeResponse(ctx, routeUUID, req); err != nil { + return err + } + } + } +} + +func (s *server) closeRoute(routeUUID route) { + s.requestPaths.Lock() + delete(s.requestPaths.pathByUUID, routeUUID) + s.requestPaths.Unlock() +} + +func (s *server) openReqPath(reqPath chan<- *gitalypb.RouteVoteRequest, routeID route, bidi gitalypb.RefTransaction_RouteVoteServer) error { + s.requestPaths.Lock() + defer s.requestPaths.Unlock() + + _, ok := s.requestPaths.pathByUUID[routeID] + if ok { + return status.Errorf(codes.AlreadyExists, "route session already exists for %s", routeID) + } + + s.requestPaths.pathByUUID[routeID] = reqPath + + // send back same message as confirmation of session + // creation + return bidi.Send(&gitalypb.RouteVoteRequest{ + RouteUuid: string(routeID), + Msg: &gitalypb.RouteVoteRequest_OpenRouteRequest{ + OpenRouteRequest: &gitalypb.RouteVoteRequest_OpenRoute{}, + }, + }) +} + +func (s *server) openRespPath(respPath chan<- *gitalypb.RouteVoteRequest, routeID route) (func(), error) { + s.responsePaths.Lock() + defer s.responsePaths.Unlock() + + _, ok := s.responsePaths.pathByUUID[routeID] + if ok { + return func() {}, status.Errorf(codes.AlreadyExists, "route session already exists for %s", routeID) + } + + s.responsePaths.pathByUUID[routeID] = respPath + + cleanup := func() { + s.responsePaths.Lock() + delete(s.responsePaths.pathByUUID, routeID) + s.responsePaths.Unlock() + } + + return cleanup, nil +} + +// routeRequest attempts to find a route for the given route ID and send the +// request message to 
the route owner +func (s *server) routeRequest(ctx context.Context, routeID route, req *gitalypb.RouteVoteRequest) error { + s.requestPaths.Lock() + reqPath, ok := s.requestPaths.pathByUUID[routeID] + s.requestPaths.Unlock() + + if !ok { + return status.Errorf(codes.NotFound, "route does not exist for UUID %s", req.RouteUuid) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case reqPath <- req: + return nil + } +} + +// routeResponse attempts to find a route for the given route ID and send the +// response message to the Gitaly caller +func (s *server) routeResponse(ctx context.Context, routeID route, resp *gitalypb.RouteVoteRequest) error { + s.responsePaths.RLock() + respPath, ok := s.responsePaths.pathByUUID[routeID] + s.responsePaths.RUnlock() + + if !ok { + return status.Errorf(codes.NotFound, "route does not exist for UUID %s", resp.RouteUuid) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case respPath <- resp: + return nil + } +} diff --git a/internal/gitaly/service/transaction/server_test.go b/internal/gitaly/service/transaction/server_test.go new file mode 100644 index 0000000000..f459b757c0 --- /dev/null +++ b/internal/gitaly/service/transaction/server_test.go @@ -0,0 +1,359 @@ +package transaction + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" +) + +func TestServer_RouteVote(t *testing.T) { + srvAddr, stopSrv := runServer(t) + defer stopSrv() + + rtClient, cc := newClient(t, srvAddr) + defer cc.Close() + + routeID1 := "1" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.Run("when route ID is not opened", func(t *testing.T) { + bidi, err := 
rtClient.RouteVote(ctx) + require.NoError(t, err) + defer func() { _ = bidi.CloseSend() }() + + errMsg := "rpc error: code = NotFound desc = route does not exist for UUID 1" + + t.Run("VoteTransaction fails", func(t *testing.T) { + voteTxReq := &gitalypb.VoteTransactionRequest{RouteUuid: routeID1} + _, err := rtClient.VoteTransaction(ctx, voteTxReq) + require.Error(t, err) + testhelper.RequireGrpcError(t, err, codes.NotFound) + require.Contains(t, err.Error(), errMsg) + }) + + t.Run("StopTransaction fails", func(t *testing.T) { + stopTxReq := &gitalypb.StopTransactionRequest{RouteUuid: routeID1} + _, err := rtClient.StopTransaction(ctx, stopTxReq) + require.Error(t, err) + testhelper.RequireGrpcError(t, err, codes.NotFound) + require.Contains(t, err.Error(), errMsg) + }) + }) + + // opens a session and waits for confirmation of setup + openTxSession := func(t *testing.T, bidi gitalypb.RefTransaction_RouteVoteClient, routeID string) { + expectResponse := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_OpenRouteRequest{ + &gitalypb.RouteVoteRequest_OpenRoute{}, + }, + } + err := bidi.Send(expectResponse) + require.NoError(t, err) + + // wait for confirmation of message to avoid race condition + actualResponse, err := bidi.Recv() + require.NoError(t, err) + testhelper.ProtoEqual(t, expectResponse, actualResponse) + } + + // recvAndSend simulates a Praefect client receiving a request (recv) + // from a Gitaly client, and then replying with a response (send) + recvAndSend := func(t *testing.T, bidi gitalypb.RefTransaction_RouteVoteClient, recv proto.Message, send *gitalypb.RouteVoteRequest) { + actualRecv, err := bidi.Recv() + if !assert.NoError(t, err) { + return + } + if !assert.True(t, proto.Equal(recv, actualRecv)) { + return + } + + err = bidi.Send(send) + if !assert.NoError(t, err) { + return + } + } + + // performs the Gitaly client RPC, and the Praefect handling of that + // call and verifies all expected messages are routed 
correctly + assertVoteRequestSucceeds := func(t *testing.T, ctx context.Context, routeID string, bidi gitalypb.RefTransaction_RouteVoteClient, request *gitalypb.VoteTransactionRequest, response *gitalypb.VoteTransactionResponse) { + done := make(chan struct{}) + defer func() { <-done }() + go func() { + defer close(done) + recv := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_VoteTxRequest{ + request, + }, + } + send := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_VoteTxResponse{ + response, + }, + } + recvAndSend(t, bidi, recv, send) + }() + + actualResp, err := rtClient.VoteTransaction(ctx, request) + require.NoError(t, err) + testhelper.ProtoEqual(t, response, actualResp) + } + + // performs a Gitaly RPC, and the Praefect handling of that call will + // send back an error and verifies all messages are routed correctly + assertVoteRequestFailure := func(t *testing.T, ctx context.Context, routeID string, bidi gitalypb.RefTransaction_RouteVoteClient, request *gitalypb.VoteTransactionRequest, statusErr *gitalypb.RouteVoteRequest_Status) { + done := make(chan struct{}) + defer func() { <-done }() + go func() { + defer close(done) + recv := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_VoteTxRequest{ + request, + }, + } + send := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_Error{ + statusErr, + }, + } + recvAndSend(t, bidi, recv, send) + }() + + _, err := rtClient.VoteTransaction(ctx, request) + require.Error(t, err) + require.Equal(t, codes.Code(statusErr.Code), status.Code(err)) + require.Contains(t, err.Error(), statusErr.Message) + } + + // performs the Gitaly client RPC, and the Praefect handling of that + // call and verifies all expected messages are routed correctly + assertStopRequestSucceeds := func(t *testing.T, ctx context.Context, routeID string, bidi gitalypb.RefTransaction_RouteVoteClient, request 
*gitalypb.StopTransactionRequest, response *gitalypb.StopTransactionResponse) { + done := make(chan struct{}) + defer func() { <-done }() + go func() { + defer close(done) + recv := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_StopTxRequest{ + request, + }, + } + send := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_StopTxResponse{ + response, + }, + } + recvAndSend(t, bidi, recv, send) + }() + + actualResp, err := rtClient.StopTransaction(ctx, request) + require.NoError(t, err) + testhelper.ProtoEqual(t, response, actualResp) + } + + // performs a Gitaly RPC, and the Praefect handling of that call will + // send back an error and verifies all messages are routed correctly + assertStopRequestFailure := func(t *testing.T, ctx context.Context, routeID string, bidi gitalypb.RefTransaction_RouteVoteClient, request *gitalypb.StopTransactionRequest, statusErr *gitalypb.RouteVoteRequest_Status) { + done := make(chan struct{}) + defer func() { <-done }() + go func() { + defer close(done) + recv := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_StopTxRequest{ + request, + }, + } + send := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_Error{ + statusErr, + }, + } + recvAndSend(t, bidi, recv, send) + }() + + _, err := rtClient.StopTransaction(ctx, request) + require.Error(t, err) + require.Equal(t, codes.Code(statusErr.Code), status.Code(err)) + require.Contains(t, err.Error(), statusErr.Message) + } + + t.Run("route opened", func(t *testing.T) { + bidi, err := rtClient.RouteVote(ctx) + require.NoError(t, err) + defer func() { _ = bidi.CloseSend() }() + + openTxSession(t, bidi, routeID1) + + t.Run("opening additional route fails", func(t *testing.T) { + bidi, err := rtClient.RouteVote(ctx) + require.NoError(t, err) + defer func() { _ = bidi.CloseSend() }() + + err = bidi.Send(&gitalypb.RouteVoteRequest{ + RouteUuid: routeID1, 
+ Msg: &gitalypb.RouteVoteRequest_OpenRouteRequest{ + &gitalypb.RouteVoteRequest_OpenRoute{}, + }, + }) + require.NoError(t, err) + + _, err = bidi.Recv() + require.Error(t, err) + + testhelper.RequireGrpcError(t, err, codes.AlreadyExists) + }) + + t.Run("VoteTransaction request succeeds", func(t *testing.T) { + request := &gitalypb.VoteTransactionRequest{ + RouteUuid: routeID1, + Node: "test", + } + response := &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_COMMIT, + } + assertVoteRequestSucceeds(t, ctx, routeID1, bidi, request, response) + }) + + t.Run("VoteTransaction request fails", func(t *testing.T) { + request := &gitalypb.VoteTransactionRequest{ + RouteUuid: routeID1, + Node: "test", + } + statusErr := &gitalypb.RouteVoteRequest_Status{ + Code: 42, + Message: "nope, try again", + } + assertVoteRequestFailure(t, ctx, routeID1, bidi, request, statusErr) + }) + + t.Run("StopTransaction request succeeds", func(t *testing.T) { + request := &gitalypb.StopTransactionRequest{ + RouteUuid: routeID1, + } + response := &gitalypb.StopTransactionResponse{} + assertStopRequestSucceeds(t, ctx, routeID1, bidi, request, response) + }) + + t.Run("StopTransaction request fails", func(t *testing.T) { + request := &gitalypb.StopTransactionRequest{ + RouteUuid: routeID1, + } + statusErr := &gitalypb.RouteVoteRequest_Status{ + Code: 42, + Message: "nope, try again", + } + assertStopRequestFailure(t, ctx, routeID1, bidi, request, statusErr) + }) + }) + + t.Run("multiple routes opened from many Praefects", func(t *testing.T) { + bidiClients := make([]gitalypb.RefTransaction_RouteVoteClient, 1000) + + // open many concurrent route sessions + for i := 0; i < 1000; i++ { + bidi, err := rtClient.RouteVote(ctx) + require.NoError(t, err) + defer func() { _ = bidi.CloseSend() }() + + openTxSession(t, bidi, fmt.Sprint(i)) + + bidiClients[i] = bidi + } + + t.Run("VoteTransaction RPC routing", func(t *testing.T) { + // send many requests + for i := 0; i < 1000; 
i++ { + request := &gitalypb.VoteTransactionRequest{ + RouteUuid: fmt.Sprint(i), + Node: fmt.Sprint(i), + } + response := &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_STOP, + } + + assertVoteRequestSucceeds(t, ctx, fmt.Sprint(i), bidiClients[i], request, response) + + statusErr := &gitalypb.RouteVoteRequest_Status{ + Code: 42, + Message: fmt.Sprint(i), + } + + assertVoteRequestFailure(t, ctx, fmt.Sprint(i), bidiClients[i], request, statusErr) + } + }) + + t.Run("StopTransaction RPC routing", func(t *testing.T) { + // send many requests + for i := 0; i < 1000; i++ { + request := &gitalypb.StopTransactionRequest{ + RouteUuid: fmt.Sprint(i), + } + response := &gitalypb.StopTransactionResponse{} + + assertStopRequestSucceeds(t, ctx, fmt.Sprint(i), bidiClients[i], request, response) + + statusErr := &gitalypb.RouteVoteRequest_Status{ + Code: 42, + Message: fmt.Sprint(i), + } + + assertStopRequestFailure(t, ctx, fmt.Sprint(i), bidiClients[i], request, statusErr) + } + }) + }) +} + +func runServer(t *testing.T) (string, func()) { + srv := testhelper.NewServer(t, nil, nil) + + gitalypb.RegisterRefTransactionServer(srv.GrpcServer(), NewServer()) + reflection.Register(srv.GrpcServer()) + + srv.Start(t) + + return "unix://" + srv.Socket(), srv.Stop +} + +func newClient(t *testing.T, serverSocketPath string) (gitalypb.RefTransactionClient, *grpc.ClientConn) { + connOpts := []grpc.DialOption{ + grpc.WithInsecure(), + } + + conn, err := grpc.Dial(serverSocketPath, connOpts...) 
+ if err != nil { + t.Fatal(err) + } + + return gitalypb.NewRefTransactionClient(conn), conn +} + +func TestMain(m *testing.M) { + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) int { + cleanup := testhelper.Configure() + defer cleanup() + + return m.Run() +} -- GitLab From 089802847665fa4d7ce265c5adfcae139e1a9b7e Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Mon, 25 Jan 2021 20:51:51 -0800 Subject: [PATCH 04/12] Replace Praefect server address with Gitaly In order to have reference transaction hooks call Gitaly instead of Praefect, we replace the Praefect server info with the internal socket address for the current Gitaly configuration. --- internal/git/hooks_options.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/internal/git/hooks_options.go b/internal/git/hooks_options.go index bad8d4645a..02e03835e6 100644 --- a/internal/git/hooks_options.go +++ b/internal/git/hooks_options.go @@ -10,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/git/repository" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) @@ -88,6 +89,14 @@ func (cc *cmdCfg) configureHooks( return err } + if praefect != nil && featureflag.IsEnabled(ctx, featureflag.GitalyTxSvc) { + internalListenAddr := "unix://" + cfg.GitalyInternalSocketPath() + praefect = &metadata.PraefectServer{ + SocketPath: internalListenAddr, + Token: cfg.Auth.Token, + } + } + payload, err := NewHooksPayload(cfg, repo, transaction, praefect, receiveHooksPayload, requestedHooks).Env() if err != nil { return err -- GitLab From d137fedbf2267785dbae3bcde19373eac45f74ba Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Thu, 21 Jan 2021 20:46:50 -0800 Subject: [PATCH 05/12] Register Gitaly transaction service --- internal/gitaly/service/register.go | 2 ++ 1 file changed, 2 
insertions(+) diff --git a/internal/gitaly/service/register.go b/internal/gitaly/service/register.go index df898e5ca5..135d9be978 100644 --- a/internal/gitaly/service/register.go +++ b/internal/gitaly/service/register.go @@ -25,6 +25,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/server" "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/smarthttp" "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/ssh" + txservice "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/transaction" "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/wiki" "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/internal/storage" @@ -96,6 +97,7 @@ func RegisterAll( gitalypb.RegisterObjectPoolServiceServer(grpcServer, objectpool.NewServer(cfg, locator, gitCmdFactory)) gitalypb.RegisterHookServiceServer(grpcServer, hook.NewServer(cfg, hookManager, gitCmdFactory)) gitalypb.RegisterInternalGitalyServer(grpcServer, internalgitaly.NewServer(cfg.Storages)) + gitalypb.RegisterRefTransactionServer(grpcServer, txservice.NewServer()) healthpb.RegisterHealthServer(grpcServer, health.NewServer()) reflection.Register(grpcServer) -- GitLab From 0a596f9f19d041a9c0e90284fe67fb418fe4bd34 Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Mon, 25 Jan 2021 22:07:40 -0800 Subject: [PATCH 06/12] Refactor transaction service response and error handling The logic for how to respond back to clients will be reused in the Gitaly transaction service. Let's export it and DRY the code to avoid mistakes as the two services potentially diverge. 
--- .../praefect/service/transaction/server.go | 42 +------------ internal/praefect/transactions/manager.go | 62 +++++++++++++++++++ 2 files changed, 65 insertions(+), 39 deletions(-) diff --git a/internal/praefect/service/transaction/server.go b/internal/praefect/service/transaction/server.go index 3a931140c2..c5d49430c6 100644 --- a/internal/praefect/service/transaction/server.go +++ b/internal/praefect/service/transaction/server.go @@ -2,12 +2,9 @@ package transaction import ( "context" - "errors" - "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc/codes" ) type Server struct { @@ -26,28 +23,7 @@ func NewServer(txMgr *transactions.Manager) gitalypb.RefTransactionServer { // completed. func (s *Server) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { err := s.txMgr.VoteTransaction(ctx, in.TransactionId, in.Node, in.ReferenceUpdatesHash) - if err != nil { - switch { - case errors.Is(err, transactions.ErrNotFound): - return nil, helper.ErrNotFound(err) - case errors.Is(err, transactions.ErrTransactionCanceled): - return nil, helper.DecorateError(codes.Canceled, err) - case errors.Is(err, transactions.ErrTransactionStopped): - return &gitalypb.VoteTransactionResponse{ - State: gitalypb.VoteTransactionResponse_STOP, - }, nil - case errors.Is(err, transactions.ErrTransactionFailed): - return &gitalypb.VoteTransactionResponse{ - State: gitalypb.VoteTransactionResponse_ABORT, - }, nil - default: - return nil, helper.ErrInternal(err) - } - } - - return &gitalypb.VoteTransactionResponse{ - State: gitalypb.VoteTransactionResponse_COMMIT, - }, nil + return transactions.VoteResponseFor(err) } // StopTransaction is called by a client who wants to gracefully stop a @@ -55,18 +31,6 @@ func (s *Server) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransacti // will not 
get accepted anymore. It is fine to call this RPC multiple times on // the same transaction. func (s *Server) StopTransaction(ctx context.Context, in *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { - if err := s.txMgr.StopTransaction(ctx, in.TransactionId); err != nil { - switch { - case errors.Is(err, transactions.ErrNotFound): - return nil, helper.ErrNotFound(err) - case errors.Is(err, transactions.ErrTransactionCanceled): - return nil, helper.DecorateError(codes.Canceled, err) - case errors.Is(err, transactions.ErrTransactionStopped): - return &gitalypb.StopTransactionResponse{}, nil - default: - return nil, helper.ErrInternal(err) - } - } - - return &gitalypb.StopTransactionResponse{}, nil + err := s.txMgr.StopTransaction(ctx, in.TransactionId) + return transactions.StopResponseFor(err) } diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index abfbe8fdc0..29de16dfcb 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -14,7 +14,11 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var ErrNotFound = errors.New("transaction not found") @@ -127,6 +131,64 @@ func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger { // from the transaction manager. 
type CancelFunc func() error +func VoteResponseFor(err error) (*gitalypb.VoteTransactionResponse, error) { + if err != nil { + switch { + case errors.Is(err, ErrNotFound): + return nil, helper.ErrNotFound(err) + case errors.Is(err, ErrTransactionCanceled): + return nil, helper.DecorateError(codes.Canceled, err) + case errors.Is(err, ErrTransactionStopped): + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_STOP, + }, nil + case errors.Is(err, ErrTransactionFailed): + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_ABORT, + }, nil + default: + return nil, helper.ErrInternal(err) + } + } + + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_COMMIT, + }, nil +} + +func StopResponseFor(err error) (*gitalypb.StopTransactionResponse, error) { + if err != nil { + switch { + case errors.Is(err, ErrNotFound): + return nil, helper.ErrNotFound(err) + case errors.Is(err, ErrTransactionCanceled): + return nil, helper.DecorateError(codes.Canceled, err) + case errors.Is(err, ErrTransactionStopped): + return &gitalypb.StopTransactionResponse{}, nil + default: + return nil, helper.ErrInternal(err) + } + } + + return &gitalypb.StopTransactionResponse{}, nil +} + +func routeVoteErr(routeUUID string, err error) *gitalypb.RouteVoteRequest { + c := codes.Unknown + if s, ok := status.FromError(err); ok { + c = s.Code() + } + return &gitalypb.RouteVoteRequest{ + RouteUuid: routeUUID, + Msg: &gitalypb.RouteVoteRequest_Error{ + &gitalypb.RouteVoteRequest_Status{ + Code: int32(c), + Message: err.Error(), + }, + }, + } +} + // RegisterTransaction registers a new reference transaction for a set of nodes // taking part in the transaction. `threshold` is the threshold at which an // election will succeed. 
It needs to be in the range `weight(voters)/2 < -- GitLab From 5cd53caf4d63b74d63fb6db3e0157c2233dd2077 Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Mon, 25 Jan 2021 22:48:29 -0800 Subject: [PATCH 07/12] Feature flag for gitaly transaction service This feature flag addition allows Gitaly to host the transaction service instead of Praefect. --- internal/metadata/featureflag/feature_flags.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/metadata/featureflag/feature_flags.go b/internal/metadata/featureflag/feature_flags.go index d062f96d6f..d2365144b2 100644 --- a/internal/metadata/featureflag/feature_flags.go +++ b/internal/metadata/featureflag/feature_flags.go @@ -36,6 +36,9 @@ var ( GoGetNewLFSPointers = FeatureFlag{Name: "go_get_new_lfs_pointers", OnByDefault: false} // UploadPackGitalyHooks makes git-upload-pack use gitaly-hooks to run pack-objects UploadPackGitalyHooks = FeatureFlag{Name: "upload_pack_gitaly_hooks", OnByDefault: false} + // GitalyTxSvc switches from using the Praefect transaction service to + // the Gitaly transaction service for an operation + GitalyTxSvc = FeatureFlag{Name: "g_tx_svc", OnByDefault: false} // TxApplyBfgObjectMapStream enables transactions for ApplyBfgObjectMapStream TxApplyBfgObjectMapStream = FeatureFlag{Name: "tx_apply_bfg_object_map_stream", OnByDefault: true} @@ -107,6 +110,7 @@ var ( var All = []FeatureFlag{ DistributedReads, LogCommandStats, + GitalyTxSvc, ReferenceTransactions, GoUserCherryPick, GoUserUpdateBranch, -- GitLab From e41a98b009322b03bbc7c87cf57a02adfec8f4f3 Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Mon, 25 Jan 2021 22:46:39 -0800 Subject: [PATCH 08/12] Integrate RouteVote into transaction manager The transaction manager needs to subscribe to transactions at the destination Gitalies for each transaction. This entails listening for a request and routing back the response to each Gitaly voter. 
---
 internal/praefect/coordinator.go          |   5 -
 internal/praefect/transactions/manager.go | 159 +++++++++++++++++++++-
 2 files changed, 157 insertions(+), 7 deletions(-)

diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go
index 5036d569f9..bf50a27881 100644
--- a/internal/praefect/coordinator.go
+++ b/internal/praefect/coordinator.go
@@ -110,11 +110,6 @@ var transactionRPCs = map[string]transactionsCondition{
 	"/gitaly.RepositoryService/RestoreCustomHooks": transactionsDisabled,
 	"/gitaly.RepositoryService/SetConfig": transactionsDisabled,
 	"/gitaly.RepositoryService/WriteCommitGraph": transactionsDisabled,
-
-	// These shouldn't ever use transactions for the sake of not creating
-	// cyclic dependencies.
-	"/gitaly.RefTransaction/StopTransaction": transactionsDisabled,
-	"/gitaly.RefTransaction/VoteTransaction": transactionsDisabled,
 }
 
 // forcePrimaryRoutingRPCs tracks RPCs which need to always get routed to the primary. This should
diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go
index 29de16dfcb..3efc9cba0d 100644
--- a/internal/praefect/transactions/manager.go
+++ b/internal/praefect/transactions/manager.go
@@ -11,12 +11,14 @@ import (
 	"sync"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 	"gitlab.com/gitlab-org/gitaly/internal/helper"
 	"gitlab.com/gitlab-org/gitaly/internal/praefect/config"
 	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -33,6 +35,7 @@ type Manager struct {
 	counterMetric         *prometheus.CounterVec
 	delayMetric           *prometheus.HistogramVec
 	subtransactionsMetric prometheus.Histogram
+	routeUUID             uuid.UUID
 }
 
 // TransactionIDGenerator is an interface for types that can generate transaction IDs.
@@ -72,6 +75,14 @@ func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt {
 	}
 }
 
+// WithRouteUUID is an option to manually set the manager's UUID used for
+// routing transaction messages.
+func WithRouteUUID(routeUUID uuid.UUID) ManagerOpt {
+	return func(mgr *Manager) {
+		mgr.routeUUID = routeUUID
+	}
+}
+
 // NewManager creates a new transactions Manager.
 func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager {
 	mgr := &Manager{
@@ -103,6 +114,7 @@ func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager {
 				Buckets: []float64{0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0},
 			},
 		),
+		routeUUID: uuid.New(),
 	}
 
 	for _, opt := range opts {
@@ -131,6 +143,7 @@ func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger {
 // from the transaction manager.
 type CancelFunc func() error
 
+// VoteResponseFor returns the appropriate response based on the provided error
 func VoteResponseFor(err error) (*gitalypb.VoteTransactionResponse, error) {
 	if err != nil {
 		switch {
@@ -156,6 +169,7 @@ func VoteResponseFor(err error) (*gitalypb.VoteTransactionResponse, error) {
 	}, nil
 }
 
+// StopResponseFor will return the appropriate response for the provided error
 func StopResponseFor(err error) (*gitalypb.StopTransactionResponse, error) {
 	if err != nil {
 		switch {
@@ -173,13 +187,13 @@ func StopResponseFor(err error) (*gitalypb.StopTransactionResponse, error) {
 	return &gitalypb.StopTransactionResponse{}, nil
 }
 
-func routeVoteErr(routeUUID string, err error) *gitalypb.RouteVoteRequest {
+func routeVoteErr(routeUUID uuid.UUID, err error) *gitalypb.RouteVoteRequest {
 	c := codes.Unknown
 	if s, ok := status.FromError(err); ok {
 		c = s.Code()
 	}
 	return &gitalypb.RouteVoteRequest{
-		RouteUuid: routeUUID,
+		RouteUuid: routeUUID.String(),
 		Msg: &gitalypb.RouteVoteRequest_Error{
 			&gitalypb.RouteVoteRequest_Status{
 				Code: int32(c),
@@ -189,6 +203,147 @@ func routeVoteErr(routeUUID string, err error) *gitalypb.RouteVoteRequest {
 	}
 }
 
+// StartRoutingVotes will repeatedly attempt to route votes from a Gitaly hosted
+// transaction service back to this Praefect server. A goroutine is launched for
+// each node and will attempt to recover from errors and reestablish a route
+// stream until the context is done, i.e. cancelled or past its deadline. Failed
+// attempts are retried after a short pause to avoid busy-looping on a node that
+// is unreachable.
+// The connSet param is meant to be provided by praefect.Connections from a
+// praefect.NodeSet. It is keyed first by virtual storage name, then Gitaly
+// storage name.
+// The returned function blocks until all routing goroutines have exited.
+func (mgr *Manager) StartRoutingVotes(ctx context.Context, connSet map[string]map[string]*grpc.ClientConn) func() {
+	// pause between attempts to reestablish a failed route stream
+	const retryDelay = time.Second
+
+	wg := sync.WaitGroup{}
+
+	for _, vs := range connSet {
+		for nodeName, cc := range vs {
+			wg.Add(1)
+			go func(nodeName string, cc *grpc.ClientConn) {
+				defer wg.Done()
+
+				for {
+					err := mgr.routeVotes(ctx, nodeName, cc)
+
+					// The only case where we stop attempting to
+					// route votes is when the parent context is
+					// done. Checking the context itself rather
+					// than the returned error also covers an
+					// expired deadline, which would otherwise
+					// be retried forever.
+					if ctx.Err() != nil {
+						return
+					}
+
+					mgr.log(ctx).
+						WithError(err).
+						WithField("route", mgr.routeUUID).
+						WithField("node", nodeName).
+						Error("vote routing failed, retrying")
+
+					select {
+					case <-ctx.Done():
+						return
+					case <-time.After(retryDelay):
+					}
+				}
+			}(nodeName, cc)
+		}
+	}
+
+	return wg.Wait
+}
+
+// routeVotes opens a single route stream to the given node and serves the vote
+// and stop requests received on it until the stream ends. It always returns a
+// non-nil error describing why the stream ended.
+func (mgr *Manager) routeVotes(ctx context.Context, nodeName string, cc *grpc.ClientConn) error {
+	bidi, err := gitalypb.NewRefTransactionClient(cc).RouteVote(ctx)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err := bidi.CloseSend(); err != nil {
+			mgr.log(ctx).
+				WithError(err).
+				WithField("route", mgr.routeUUID).
+				WithField("node", nodeName).
+				Error("unable to close route session")
+		}
+	}()
+
+	// open the route on the Gitaly node
+	if err := bidi.Send(&gitalypb.RouteVoteRequest{
+		RouteUuid: mgr.routeUUID.String(),
+		Msg: &gitalypb.RouteVoteRequest_OpenRouteRequest{
+			OpenRouteRequest: &gitalypb.RouteVoteRequest_OpenRoute{},
+		},
+	}); err != nil {
+		return err
+	}
+	// wait for confirmation
+	if _, err := bidi.Recv(); err != nil {
+		return err
+	}
+
+	for {
+		req, err := bidi.Recv()
+		if err != nil {
+			return err
+		}
+
+		switch r := req.Msg.(type) {
+		case *gitalypb.RouteVoteRequest_VoteTxRequest:
+			err := mgr.VoteTransaction(ctx,
+				r.VoteTxRequest.GetTransactionId(),
+				r.VoteTxRequest.GetNode(),
+				r.VoteTxRequest.GetReferenceUpdatesHash(),
+			)
+			resp, err := VoteResponseFor(err)
+			if err != nil {
+				// Report the failure to the Gitaly node, then
+				// return the cause so the caller logs it and
+				// reestablishes the route.
+				if sendErr := bidi.Send(routeVoteErr(mgr.routeUUID, err)); sendErr != nil {
+					return sendErr
+				}
+				return err
+			}
+			if err := bidi.Send(&gitalypb.RouteVoteRequest{
+				RouteUuid: r.VoteTxRequest.GetRouteUuid(),
+				Msg:       &gitalypb.RouteVoteRequest_VoteTxResponse{resp},
+			}); err != nil {
+				return err
+			}
+
+		case *gitalypb.RouteVoteRequest_StopTxRequest:
+			err := mgr.StopTransaction(ctx,
+				r.StopTxRequest.GetTransactionId(),
+			)
+			resp, err := StopResponseFor(err)
+			if err != nil {
+				// Same handling as for a failed vote above.
+				if sendErr := bidi.Send(routeVoteErr(mgr.routeUUID, err)); sendErr != nil {
+					return sendErr
+				}
+				return err
+			}
+			if err := bidi.Send(&gitalypb.RouteVoteRequest{
+				RouteUuid: r.StopTxRequest.GetRouteUuid(),
+				Msg:       &gitalypb.RouteVoteRequest_StopTxResponse{resp},
+			}); err != nil {
+				return err
+			}
+
+		default:
+			return fmt.Errorf("received unexpected type %T", r)
+		}
+	}
+}
+
 // RegisterTransaction registers a new reference transaction for a set of nodes
 // taking part in the transaction. `threshold` is the threshold at which an
 // election will succeed.
It needs to be in the range `weight(voters)/2 < -- GitLab From 81d7f8d27a5b7c0538ab250727c5dfef81205960 Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Wed, 27 Jan 2021 00:25:35 -0800 Subject: [PATCH 09/12] Test Gitaly transaction service with coordinator --- internal/praefect/coordinator_test.go | 116 +++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index dc4dd77bce..3ab5e5ffa3 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -13,11 +13,13 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/empty" + "github.com/google/uuid" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/git/gittest" + gtransaction "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/transaction" "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" @@ -276,7 +278,8 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { }, } - txMgr := transactions.NewManager(conf) + routeUUID := uuid.New() + txMgr := transactions.NewManager(conf, transactions.WithRouteUUID(routeUUID)) coordinator := NewCoordinator( datastore.NewMemoryReplicationEventQueue(conf), @@ -340,6 +343,117 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { err = streamParams.RequestFinalizer() require.NoError(t, err) + + t.Run("with Gitaly transaction service enabled", func(t *testing.T) { + ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.GitalyTxSvc) + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // register a gitaly transaction service for each node + gitalyClients := map[string]gitalypb.RefTransactionClient{} 
+ connSet := map[string]map[string]*grpc.ClientConn{} + for _, vs := range conf.VirtualStorages { + connSet[vs.Name] = map[string]*grpc.ClientConn{} + for _, node := range vs.Nodes { + srv := testhelper.NewServer(t, nil, nil) + gitalypb.RegisterRefTransactionServer( + srv.GrpcServer(), gtransaction.NewServer()) + + srv.Start(t) + defer srv.Stop() + + addr := "unix://" + srv.Socket() + + cc, err := client.Dial(addr, nil) + require.NoError(t, err) + defer cc.Close() + + node.Address = addr + gitalyClients[node.Storage] = gitalypb.NewRefTransactionClient(cc) + connSet[vs.Name][node.Storage] = cc + } + } + + join := txMgr.StartRoutingVotes(ctx, connSet) + defer join() + + // wait until the routes are up + + streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) + require.NoError(t, err) + defer func() { require.NoError(t, streamParams.RequestFinalizer()) }() + + transaction, err := praefect_metadata.TransactionFromContext(streamParams.Primary().Ctx) + require.NoError(t, err) + + var wg sync.WaitGroup + var syncWG sync.WaitGroup + + wg.Add(2) + defer wg.Wait() + + syncWG.Add(2) + + stopped := make(chan struct{}) // tells us when the transaction has been stopped + go func() { + defer wg.Done() + + vote := sha1.Sum([]byte("vote")) + + req := &gitalypb.VoteTransactionRequest{ + Repository: &repo, + TransactionId: transaction.ID, + Node: "primary", + ReferenceUpdatesHash: vote[:], + RouteUuid: routeUUID.String(), + } + resp, err := gitalyClients["primary"].VoteTransaction(ctx, req) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, resp.State) + + // Assure that at least one vote was agreed on. 
+ syncWG.Done() + syncWG.Wait() + + stopReq := &gitalypb.StopTransactionRequest{ + Repository: &repo, + TransactionId: transaction.ID, + RouteUuid: routeUUID.String(), + } + _, err = gitalyClients["primary"].StopTransaction(ctx, stopReq) + require.NoError(t, err) + close(stopped) + }() + + go func() { + defer wg.Done() + + vote := sha1.Sum([]byte("vote")) + + req := &gitalypb.VoteTransactionRequest{ + Repository: &repo, + TransactionId: transaction.ID, + Node: "secondary", + ReferenceUpdatesHash: vote[:], + RouteUuid: routeUUID.String(), + } + resp, err := gitalyClients["secondary"].VoteTransaction(ctx, req) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, resp.State) + + // Assure that at least one vote was agreed on. + syncWG.Done() + syncWG.Wait() + + <-stopped + resp, err = gitalyClients["secondary"].VoteTransaction(ctx, req) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, resp.State) + + cancel() // test is done, stop routing votes + }() + }) } func TestStreamDirectorAccessor(t *testing.T) { -- GitLab From 22f98083da48443532a690deff8d2e0ff1f1ae91 Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Tue, 2 Mar 2021 15:37:27 -0800 Subject: [PATCH 10/12] Add route UUID to hook payload --- internal/git/hooks_options.go | 2 +- internal/gitaly/service/operations/branches_test.go | 4 ++-- internal/gitaly/service/operations/tags_test.go | 2 +- .../service/repository/apply_gitattributes_test.go | 2 +- .../gitaly/service/smarthttp/receive_pack_test.go | 2 +- internal/praefect/coordinator.go | 4 ++-- internal/praefect/metadata/transaction.go | 12 ++++++++---- internal/praefect/transactions/manager.go | 3 +++ 8 files changed, 19 insertions(+), 12 deletions(-) diff --git a/internal/git/hooks_options.go b/internal/git/hooks_options.go index 02e03835e6..82dd0953b3 100644 --- a/internal/git/hooks_options.go +++ b/internal/git/hooks_options.go @@ -89,7 +89,7 @@ func (cc *cmdCfg) configureHooks( return err } - 
if praefect != nil && featureflag.IsEnabled(ctx, featureflag.GitalyTxSvc) { + if transaction != nil && featureflag.IsEnabled(ctx, featureflag.GitalyTxSvc) { internalListenAddr := "unix://" + cfg.GitalyInternalSocketPath() praefect = &metadata.PraefectServer{ SocketPath: internalListenAddr, diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go index 96fcb212c8..1a36bbdcac 100644 --- a/internal/gitaly/service/operations/branches_test.go +++ b/internal/gitaly/service/operations/branches_test.go @@ -197,7 +197,7 @@ func TestUserCreateBranchWithTransaction(t *testing.T) { defer cancel() ctx, err = tc.server.Inject(ctx) require.NoError(t, err) - ctx, err = metadata.InjectTransaction(ctx, 1, "node", true) + ctx, err = metadata.InjectTransaction(ctx, 1, "node", true, "") require.NoError(t, err) ctx = helper.IncomingToOutgoing(ctx) @@ -587,7 +587,7 @@ func TestUserDeleteBranch_transaction(t *testing.T) { defer cancel() ctx, err = praefect.Inject(ctx) require.NoError(t, err) - ctx, err = metadata.InjectTransaction(ctx, 1, "node", true) + ctx, err = metadata.InjectTransaction(ctx, 1, "node", true, "") require.NoError(t, err) ctx = helper.IncomingToOutgoing(ctx) diff --git a/internal/gitaly/service/operations/tags_test.go b/internal/gitaly/service/operations/tags_test.go index 50ee82327e..3a3c02ee66 100644 --- a/internal/gitaly/service/operations/tags_test.go +++ b/internal/gitaly/service/operations/tags_test.go @@ -387,7 +387,7 @@ func TestUserCreateTagWithTransaction(t *testing.T) { // We need to convert to an incoming context first in // order to preserve the feature flag. 
ctx = helper.OutgoingToIncoming(ctx) - ctx, err = metadata.InjectTransaction(ctx, 1, "node", testCase.primary) + ctx, err = metadata.InjectTransaction(ctx, 1, "node", testCase.primary, "") require.NoError(t, err) ctx, err = praefectServer.Inject(ctx) require.NoError(t, err) diff --git a/internal/gitaly/service/repository/apply_gitattributes_test.go b/internal/gitaly/service/repository/apply_gitattributes_test.go index 459a926c84..e750d80a15 100644 --- a/internal/gitaly/service/repository/apply_gitattributes_test.go +++ b/internal/gitaly/service/repository/apply_gitattributes_test.go @@ -197,7 +197,7 @@ func TestApplyGitattributesWithTransaction(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - ctx, err := metadata.InjectTransaction(ctx, 1, "primary", true) + ctx, err := metadata.InjectTransaction(ctx, 1, "primary", true, "") require.NoError(t, err) ctx, err = praefect.Inject(ctx) require.NoError(t, err) diff --git a/internal/gitaly/service/smarthttp/receive_pack_test.go b/internal/gitaly/service/smarthttp/receive_pack_test.go index b7b9cfa36f..62c7cff93e 100644 --- a/internal/gitaly/service/smarthttp/receive_pack_test.go +++ b/internal/gitaly/service/smarthttp/receive_pack_test.go @@ -604,7 +604,7 @@ func TestPostReceiveWithReferenceTransactionHook(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - ctx, err = metadata.InjectTransaction(ctx, 1234, "primary", true) + ctx, err = metadata.InjectTransaction(ctx, 1234, "primary", true, "") require.NoError(t, err) ctx, err = praefectServer.Inject(ctx) require.NoError(t, err) diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index bf50a27881..b093faf540 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -455,7 +455,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall errByNode: make(map[string]error), } - injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), 
route.Primary.Storage, true) + injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true, c.txMgr.RouteUUID().String()) if err != nil { return nil, err } @@ -473,7 +473,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return nil, err } - injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.Storage, false) + injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.Storage, false, c.txMgr.RouteUUID().String()) if err != nil { return nil, err } diff --git a/internal/praefect/metadata/transaction.go b/internal/praefect/metadata/transaction.go index 08e0a43bae..a3c7b7d5a5 100644 --- a/internal/praefect/metadata/transaction.go +++ b/internal/praefect/metadata/transaction.go @@ -30,6 +30,9 @@ type Transaction struct { Node string `json:"node"` // Primary identifies the node's role in this transaction Primary bool `json:"primary"` + // RouteUUID is used to properly route a vote to the originating + // Praefect + RouteUUID string `json:"route_uuid"` } // serialize serializes a `Transaction` into a string. 
@@ -57,11 +60,12 @@ func transactionFromSerialized(serialized string) (Transaction, error) { } // InjectTransaction injects reference transaction metadata into an incoming context -func InjectTransaction(ctx context.Context, tranasctionID uint64, node string, primary bool) (context.Context, error) { +func InjectTransaction(ctx context.Context, tranasctionID uint64, node string, primary bool, routeUUID string) (context.Context, error) { transaction := Transaction{ - ID: tranasctionID, - Node: node, - Primary: primary, + ID: tranasctionID, + Node: node, + Primary: primary, + RouteUUID: routeUUID, } serialized, err := transaction.serialize() diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index 3efc9cba0d..c04dd0eb88 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -124,6 +124,9 @@ func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager { return mgr } +// RouteUUID is a getter for the routeUUID +func (mgr *Manager) RouteUUID() uuid.UUID { return mgr.routeUUID } + func (mgr *Manager) Describe(descs chan<- *prometheus.Desc) { prometheus.DescribeByCollect(mgr, descs) } -- GitLab From b23f63d7d756650a4506ac229b7af77d8e9442c5 Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Wed, 3 Mar 2021 13:40:38 -0800 Subject: [PATCH 11/12] Pass RouteID in transaction voting --- internal/gitaly/transaction/manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/gitaly/transaction/manager.go b/internal/gitaly/transaction/manager.go index 5a5033d0d1..7aa7e03bc5 100644 --- a/internal/gitaly/transaction/manager.go +++ b/internal/gitaly/transaction/manager.go @@ -101,6 +101,7 @@ func (m *PoolManager) Vote(ctx context.Context, tx metadata.Transaction, server TransactionId: tx.ID, Node: tx.Node, ReferenceUpdatesHash: hash, + RouteUuid: tx.RouteUUID, }) if err != nil { logger.WithError(err).Error("vote failed") @@ -130,6 +131,7 @@ func (m *PoolManager) Stop(ctx 
context.Context, tx metadata.Transaction, server if _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ TransactionId: tx.ID, + RouteUuid: tx.RouteUUID, }); err != nil { m.log(ctx).WithFields(logrus.Fields{ "transaction.id": tx.ID, -- GitLab From 1457881c272cbc6d15c700678aace2f7db14be09 Mon Sep 17 00:00:00 2001 From: Paul Okstad Date: Wed, 3 Mar 2021 23:17:08 -0800 Subject: [PATCH 12/12] Start vote routing in Praefect --- cmd/praefect/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 156b89425d..adb2c3e16f 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -387,6 +387,8 @@ func run(cfgs []starter.Config, conf config.Config) error { protoregistry.GitalyProtoPreregistered, ) ) + transactionManager.StartRoutingVotes(ctx, nodeSet.Connections()) + metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl) if db != nil { prometheus.MustRegister( -- GitLab