diff --git a/cmd/praefect/main.go b/cmd/praefect/main.go index 156b89425d9afa840df448d9c09da616f9c25c64..adb2c3e16f7cf25cc1813573b772561227bff505 100644 --- a/cmd/praefect/main.go +++ b/cmd/praefect/main.go @@ -387,6 +387,8 @@ func run(cfgs []starter.Config, conf config.Config) error { protoregistry.GitalyProtoPreregistered, ) ) + transactionManager.StartRoutingVotes(ctx, nodeSet.Connections()) + metricsCollectors = append(metricsCollectors, transactionManager, coordinator, repl) if db != nil { prometheus.MustRegister( diff --git a/internal/git/hooks_options.go b/internal/git/hooks_options.go index bad8d4645a5bd86c8fc24fd013f78532db683def..82dd0953b3bd7b296a6378b9df917a7a98634d1d 100644 --- a/internal/git/hooks_options.go +++ b/internal/git/hooks_options.go @@ -10,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/git/repository" "gitlab.com/gitlab-org/gitaly/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/internal/log" + "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/praefect/metadata" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" ) @@ -88,6 +89,14 @@ func (cc *cmdCfg) configureHooks( return err } + if transaction != nil && featureflag.IsEnabled(ctx, featureflag.GitalyTxSvc) { + internalListenAddr := "unix://" + cfg.GitalyInternalSocketPath() + praefect = &metadata.PraefectServer{ + SocketPath: internalListenAddr, + Token: cfg.Auth.Token, + } + } + payload, err := NewHooksPayload(cfg, repo, transaction, praefect, receiveHooksPayload, requestedHooks).Env() if err != nil { return err diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go index 96fcb212c8a2f410058d80ff59c0adc44de579d1..1a36bbdcacec3b1994329753368ea91de1da2dc3 100644 --- a/internal/gitaly/service/operations/branches_test.go +++ b/internal/gitaly/service/operations/branches_test.go @@ -197,7 +197,7 @@ func TestUserCreateBranchWithTransaction(t *testing.T) { defer 
cancel() ctx, err = tc.server.Inject(ctx) require.NoError(t, err) - ctx, err = metadata.InjectTransaction(ctx, 1, "node", true) + ctx, err = metadata.InjectTransaction(ctx, 1, "node", true, "") require.NoError(t, err) ctx = helper.IncomingToOutgoing(ctx) @@ -587,7 +587,7 @@ func TestUserDeleteBranch_transaction(t *testing.T) { defer cancel() ctx, err = praefect.Inject(ctx) require.NoError(t, err) - ctx, err = metadata.InjectTransaction(ctx, 1, "node", true) + ctx, err = metadata.InjectTransaction(ctx, 1, "node", true, "") require.NoError(t, err) ctx = helper.IncomingToOutgoing(ctx) diff --git a/internal/gitaly/service/operations/tags_test.go b/internal/gitaly/service/operations/tags_test.go index 50ee82327eaebd11d262bd9d276367a2f53ea6e7..3a3c02ee664eef876f6de0b1d0a9ecfe8ae95d86 100644 --- a/internal/gitaly/service/operations/tags_test.go +++ b/internal/gitaly/service/operations/tags_test.go @@ -387,7 +387,7 @@ func TestUserCreateTagWithTransaction(t *testing.T) { // We need to convert to an incoming context first in // order to preserve the feature flag. 
ctx = helper.OutgoingToIncoming(ctx) - ctx, err = metadata.InjectTransaction(ctx, 1, "node", testCase.primary) + ctx, err = metadata.InjectTransaction(ctx, 1, "node", testCase.primary, "") require.NoError(t, err) ctx, err = praefectServer.Inject(ctx) require.NoError(t, err) diff --git a/internal/gitaly/service/register.go b/internal/gitaly/service/register.go index df898e5ca5b0fe5280e108833371df1d40e609ef..135d9be978344d673b4bed994904926f33a10274 100644 --- a/internal/gitaly/service/register.go +++ b/internal/gitaly/service/register.go @@ -25,6 +25,7 @@ import ( "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/server" "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/smarthttp" "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/ssh" + txservice "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/transaction" "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/wiki" "gitlab.com/gitlab-org/gitaly/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/internal/storage" @@ -96,6 +97,7 @@ func RegisterAll( gitalypb.RegisterObjectPoolServiceServer(grpcServer, objectpool.NewServer(cfg, locator, gitCmdFactory)) gitalypb.RegisterHookServiceServer(grpcServer, hook.NewServer(cfg, hookManager, gitCmdFactory)) gitalypb.RegisterInternalGitalyServer(grpcServer, internalgitaly.NewServer(cfg.Storages)) + gitalypb.RegisterRefTransactionServer(grpcServer, txservice.NewServer()) healthpb.RegisterHealthServer(grpcServer, health.NewServer()) reflection.Register(grpcServer) diff --git a/internal/gitaly/service/repository/apply_gitattributes_test.go b/internal/gitaly/service/repository/apply_gitattributes_test.go index 459a926c8415e86900a4911fc412ff3bbcfdbc5d..e750d80a1503744b3af32236019845f8bad6a2a6 100644 --- a/internal/gitaly/service/repository/apply_gitattributes_test.go +++ b/internal/gitaly/service/repository/apply_gitattributes_test.go @@ -197,7 +197,7 @@ func TestApplyGitattributesWithTransaction(t *testing.T) { ctx, cancel := testhelper.Context() defer 
cancel() - ctx, err := metadata.InjectTransaction(ctx, 1, "primary", true) + ctx, err := metadata.InjectTransaction(ctx, 1, "primary", true, "") require.NoError(t, err) ctx, err = praefect.Inject(ctx) require.NoError(t, err) diff --git a/internal/gitaly/service/smarthttp/receive_pack_test.go b/internal/gitaly/service/smarthttp/receive_pack_test.go index b7b9cfa36fefbacad1bb98ba37c984b8a2f2dc86..62c7cff93eaa1280fe91ecc1e3696a0af37e7200 100644 --- a/internal/gitaly/service/smarthttp/receive_pack_test.go +++ b/internal/gitaly/service/smarthttp/receive_pack_test.go @@ -604,7 +604,7 @@ func TestPostReceiveWithReferenceTransactionHook(t *testing.T) { ctx, cancel := testhelper.Context() defer cancel() - ctx, err = metadata.InjectTransaction(ctx, 1234, "primary", true) + ctx, err = metadata.InjectTransaction(ctx, 1234, "primary", true, "") require.NoError(t, err) ctx, err = praefectServer.Inject(ctx) require.NoError(t, err)
diff --git a/internal/gitaly/service/transaction/server.go b/internal/gitaly/service/transaction/server.go
new file mode 100644
index 0000000000000000000000000000000000000000..6251151ff42a4a9a5b2b079c37961843eee4da6e
--- /dev/null
+++ b/internal/gitaly/service/transaction/server.go
@@ -0,0 +1,313 @@
+package transaction
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"gitlab.com/gitlab-org/gitaly/internal/helper"
+	"gitlab.com/gitlab-org/gitaly/proto/go/gitalypb"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// route is the UUID of a Praefect routing session, as carried in the
+// RouteUuid field of routed messages
+type route string
+
+// requestPath is the registration of one open routing session
+type requestPath struct {
+	// requests delivers Gitaly requests to the stream that opened the route
+	requests chan<- *gitalypb.RouteVoteRequest
+	// done is closed once that stream is finished, which releases senders
+	// that would otherwise wait on a route nobody reads from anymore
+	done <-chan struct{}
+}
+
+type server struct {
+	// requestPaths are how a Gitaly request gets routed to the correct
+	// Praefect
+	requestPaths struct {
+		sync.RWMutex
+		pathByUUID map[route]requestPath
+	}
+
+	// responsePaths are how a Praefect response gets routed to the correct
+	// Gitaly RPC call
+	responsePaths struct {
+		sync.RWMutex
+		pathByUUID map[route]chan<- *gitalypb.RouteVoteRequest
+	}
+}
+
+// NewServer returns a Gitaly transaction server capable of routing votes to a
+// Praefect client
+func NewServer() gitalypb.RefTransactionServer {
+	s := &server{}
+	s.requestPaths.pathByUUID = map[route]requestPath{}
+	s.responsePaths.pathByUUID = map[route]chan<- *gitalypb.RouteVoteRequest{}
+	return s
+}
+
+// VoteTransaction will attempt to route the request to a listening Praefect
+// client and then wait for a response to be routed back.
+func (s *server) VoteTransaction(ctx context.Context, req *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) {
+	wrappedReq := &gitalypb.RouteVoteRequest{
+		RouteUuid: req.GetRouteUuid(),
+		Msg:       &gitalypb.RouteVoteRequest_VoteTxRequest{VoteTxRequest: req},
+	}
+
+	resp, err := s.routeRPC(ctx, wrappedReq)
+	if err != nil {
+		return nil, status.Errorf(status.Code(err), "VoteTransaction: routing RPC: %v", err)
+	}
+
+	switch r := resp.Msg.(type) {
+	case *gitalypb.RouteVoteRequest_VoteTxResponse:
+		return r.VoteTxResponse, nil
+	case *gitalypb.RouteVoteRequest_Error:
+		// the Praefect answered with an error status; hand it to the caller
+		return nil, status.New(codes.Code(r.Error.Code), r.Error.Message).Err()
+	default:
+		return nil, helper.ErrInternalf("VoteTransaction: unexpected response type %T", resp.Msg)
+	}
+}
+
+// StopTransaction will attempt to route the request to a listening Praefect
+// client and then wait for a response to be routed back.
+func (s *server) StopTransaction(ctx context.Context, req *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) {
+	wrappedReq := &gitalypb.RouteVoteRequest{
+		RouteUuid: req.GetRouteUuid(),
+		Msg:       &gitalypb.RouteVoteRequest_StopTxRequest{StopTxRequest: req},
+	}
+
+	resp, err := s.routeRPC(ctx, wrappedReq)
+	if err != nil {
+		return nil, status.Errorf(status.Code(err), "StopTransaction: routing RPC: %v", err)
+	}
+
+	switch r := resp.Msg.(type) {
+	case *gitalypb.RouteVoteRequest_StopTxResponse:
+		return r.StopTxResponse, nil
+	case *gitalypb.RouteVoteRequest_Error:
+		// the Praefect answered with an error status; hand it to the caller
+		return nil, status.New(codes.Code(r.Error.Code), r.Error.Message).Err()
+	default:
+		return nil, helper.ErrInternalf("StopTransaction: unexpected response type %T", resp.Msg)
+	}
+}
+
+// routeRPC registers a response path for the request's route, forwards the
+// request to the Praefect that opened that route and waits until either the
+// response is routed back or ctx is done.
+//
+// NOTE(review): response paths are keyed by route UUID only, so a second RPC
+// for the same route is rejected with AlreadyExists while one is in flight.
+// Confirm callers never vote concurrently through the same route.
+func (s *server) routeRPC(ctx context.Context, req *gitalypb.RouteVoteRequest) (*gitalypb.RouteVoteRequest, error) {
+	// The channel has room for the one expected response and is never
+	// closed. routeResponse may have looked it up right before this call
+	// gave up; sending on a closed channel would panic, whereas a late send
+	// into the buffer simply completes and the channel gets collected.
+	respPath := make(chan *gitalypb.RouteVoteRequest, 1)
+
+	routeUUID := route(req.GetRouteUuid())
+
+	cleanup, err := s.openRespPath(respPath, routeUUID)
+	if err != nil {
+		return nil, status.Errorf(status.Code(err), "opening response path: %v", err)
+	}
+	defer cleanup()
+
+	if err := s.routeRequest(ctx, routeUUID, req); err != nil {
+		return nil, status.Errorf(status.Code(err), "routing request: %v", err)
+	}
+
+	select {
+	case <-ctx.Done():
+		return nil, fmt.Errorf("waiting for response: %w", ctx.Err())
+	case resp := <-respPath:
+		return resp, nil
+	}
+}
+
+// RouteVote serves one Praefect routing session: requests routed to the
+// session are written to the stream, while messages read from the stream
+// either open the route or are routed back as responses.
+func (s *server) RouteVote(bidi gitalypb.RefTransaction_RouteVoteServer) error {
+	ctx, cancel := context.WithCancel(bidi.Context())
+	defer cancel()
+
+	// reqPath is never closed. routeRequest may still hold it after the
+	// route got unregistered and a send on a closed channel would panic;
+	// such senders are released through the route's done channel instead.
+	reqPath := make(chan *gitalypb.RouteVoteRequest)
+
+	eg, ctx := errgroup.WithContext(ctx)
+
+	eg.Go(func() error { return s.processOutgoing(ctx, reqPath, bidi) })
+	eg.Go(func() error { return s.processIncoming(ctx, reqPath, bidi) })
+
+	return eg.Wait()
+}
+
+// processOutgoing handles receiving messages from other clients that need to be
+// delivered to the caller of this stream
+func (s *server) processOutgoing(ctx context.Context, reqPath <-chan *gitalypb.RouteVoteRequest, stream gitalypb.RefTransaction_RouteVoteServer) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case msg := <-reqPath:
+			if err := stream.Send(msg); err != nil {
+				return err
+			}
+		}
+	}
+}
+
+// processIncoming handles each incoming message to this stream
+func (s *server) processIncoming(ctx context.Context, reqPath chan<- *gitalypb.RouteVoteRequest, bidi gitalypb.RefTransaction_RouteVoteServer) error {
+	var sessionOpened bool
+	for {
+		var req *gitalypb.RouteVoteRequest
+		var err error
+
+		// there is no way to cancel early from a stream recv, so we
+		// wrap it in a goroutine to select it against context cancelation
+		done := make(chan struct{})
+		go func() {
+			defer close(done)
+			req, err = bidi.Recv() // should return when RPC is done
+		}()
+
+		select {
+		case <-done:
+			if err != nil {
+				return err
+			}
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+
+		routeUUID := route(req.GetRouteUuid())
+
+		switch req.GetMsg().(type) {
+		case *gitalypb.RouteVoteRequest_OpenRouteRequest:
+			if sessionOpened {
+				return status.Error(codes.AlreadyExists, "route already exists for stream")
+			}
+			if err := s.openReqPath(ctx, reqPath, routeUUID, bidi); err != nil {
+				return err
+			}
+			sessionOpened = true
+			defer s.closeRoute(routeUUID)
+		default:
+			if err := s.routeResponse(ctx, routeUUID, req); err != nil {
+				return err
+			}
+		}
+	}
+}
+
+// closeRoute unregisters the request path of the given route
+func (s *server) closeRoute(routeUUID route) {
+	s.requestPaths.Lock()
+	delete(s.requestPaths.pathByUUID, routeUUID)
+	s.requestPaths.Unlock()
+}
+
+// openReqPath registers reqPath as the request path of routeID and confirms
+// the session to the Praefect by echoing the open route message. ctx is the
+// context of the owning stream; requests stop being routed once it is done.
+func (s *server) openReqPath(ctx context.Context, reqPath chan<- *gitalypb.RouteVoteRequest, routeID route, bidi gitalypb.RefTransaction_RouteVoteServer) error {
+	s.requestPaths.Lock()
+	defer s.requestPaths.Unlock()
+
+	if _, ok := s.requestPaths.pathByUUID[routeID]; ok {
+		return status.Errorf(codes.AlreadyExists, "route session already exists for %s", routeID)
+	}
+
+	s.requestPaths.pathByUUID[routeID] = requestPath{requests: reqPath, done: ctx.Done()}
+
+	// send back same message as confirmation of session creation. The lock
+	// stays held meanwhile so that no request can be looked up, and thus
+	// written to the stream by processOutgoing, before the confirmation.
+	err := bidi.Send(&gitalypb.RouteVoteRequest{
+		RouteUuid: string(routeID),
+		Msg: &gitalypb.RouteVoteRequest_OpenRouteRequest{
+			OpenRouteRequest: &gitalypb.RouteVoteRequest_OpenRoute{},
+		},
+	})
+	if err != nil {
+		// the caller only closes routes that were opened successfully, so
+		// the registration has to be rolled back here
+		delete(s.requestPaths.pathByUUID, routeID)
+	}
+
+	return err
+}
+
+// openRespPath registers respPath as the response path of routeID. The
+// returned cleanup function unregisters it again.
+func (s *server) openRespPath(respPath chan<- *gitalypb.RouteVoteRequest, routeID route) (func(), error) {
+	s.responsePaths.Lock()
+	defer s.responsePaths.Unlock()
+
+	if _, ok := s.responsePaths.pathByUUID[routeID]; ok {
+		return func() {}, status.Errorf(codes.AlreadyExists, "route session already exists for %s", routeID)
+	}
+
+	s.responsePaths.pathByUUID[routeID] = respPath
+
+	cleanup := func() {
+		s.responsePaths.Lock()
+		delete(s.responsePaths.pathByUUID, routeID)
+		s.responsePaths.Unlock()
+	}
+
+	return cleanup, nil
+}
+
+// routeRequest attempts to find a route for the given route ID and send the
+// request message to the route owner. It fails with NotFound when the route is
+// not open and with Unavailable when the owning stream ended before the
+// request could be handed over.
+func (s *server) routeRequest(ctx context.Context, routeID route, req *gitalypb.RouteVoteRequest) error {
+	s.requestPaths.RLock()
+	path, ok := s.requestPaths.pathByUUID[routeID]
+	s.requestPaths.RUnlock()
+
+	if !ok {
+		return status.Errorf(codes.NotFound, "route does not exist for UUID %s", req.RouteUuid)
+	}
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-path.done:
+		return status.Errorf(codes.Unavailable, "route closed for UUID %s", req.RouteUuid)
+	case path.requests <- req:
+		return nil
+	}
+}
+
+// routeResponse attempts to find a route for the given route ID and send the
+// response message to the Gitaly caller
+func (s *server) routeResponse(ctx context.Context, routeID route, resp *gitalypb.RouteVoteRequest) error {
+	s.responsePaths.RLock()
+	respPath, ok := s.responsePaths.pathByUUID[routeID]
+	s.responsePaths.RUnlock()
+
+	if !ok {
+		return status.Errorf(codes.NotFound, "route does not exist for UUID %s", resp.RouteUuid)
+	}
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case respPath <- resp:
+		return nil
+	}
+}
diff --git a/internal/gitaly/service/transaction/server_test.go b/internal/gitaly/service/transaction/server_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f459b757c0f75de0b20d4b26e6dc7c07543ffb11 --- /dev/null +++ b/internal/gitaly/service/transaction/server_test.go @@ -0,0 +1,359 @@ +package transaction + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" +) + +func TestServer_RouteVote(t *testing.T) { + srvAddr, stopSrv := runServer(t) + defer stopSrv() + + rtClient, cc := newClient(t, srvAddr) + defer cc.Close() + + routeID1 := "1" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.Run("when route ID is not opened", func(t *testing.T) { + bidi, err := rtClient.RouteVote(ctx) + require.NoError(t, err) + defer func() { _ = bidi.CloseSend() }() + + errMsg := "rpc error: code = NotFound desc = route does not exist for UUID 1" + + t.Run("VoteTransaction fails", func(t *testing.T) { + voteTxReq := &gitalypb.VoteTransactionRequest{RouteUuid: routeID1} + _, err := rtClient.VoteTransaction(ctx, voteTxReq) + require.Error(t, err) + testhelper.RequireGrpcError(t, err, codes.NotFound) + require.Contains(t, err.Error(), errMsg) + }) + + t.Run("StopTransaction fails", func(t *testing.T) { + stopTxReq := &gitalypb.StopTransactionRequest{RouteUuid: routeID1} + _, err := rtClient.StopTransaction(ctx, stopTxReq) + require.Error(t, err) + testhelper.RequireGrpcError(t, err, codes.NotFound) + require.Contains(t, err.Error(), errMsg) + }) + }) + + // opens a session and waits for confirmation of
setup + openTxSession := func(t *testing.T, bidi gitalypb.RefTransaction_RouteVoteClient, routeID string) { + expectResponse := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_OpenRouteRequest{ + &gitalypb.RouteVoteRequest_OpenRoute{}, + }, + } + err := bidi.Send(expectResponse) + require.NoError(t, err) + + // wait for confirmation of message to avoid race condition + actualResponse, err := bidi.Recv() + require.NoError(t, err) + testhelper.ProtoEqual(t, expectResponse, actualResponse) + } + + // recvAndSend simulates a Praefect client receiving a request (recv) + // from a Gitaly client, and then replying with a response (send) + recvAndSend := func(t *testing.T, bidi gitalypb.RefTransaction_RouteVoteClient, recv proto.Message, send *gitalypb.RouteVoteRequest) { + actualRecv, err := bidi.Recv() + if !assert.NoError(t, err) { + return + } + if !assert.True(t, proto.Equal(recv, actualRecv)) { + return + } + + err = bidi.Send(send) + if !assert.NoError(t, err) { + return + } + } + + // performs the Gitaly client RPC, and the Praefect handling of that + // call and verifies all expected messages are routed correctly + assertVoteRequestSucceeds := func(t *testing.T, ctx context.Context, routeID string, bidi gitalypb.RefTransaction_RouteVoteClient, request *gitalypb.VoteTransactionRequest, response *gitalypb.VoteTransactionResponse) { + done := make(chan struct{}) + defer func() { <-done }() + go func() { + defer close(done) + recv := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_VoteTxRequest{ + request, + }, + } + send := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_VoteTxResponse{ + response, + }, + } + recvAndSend(t, bidi, recv, send) + }() + + actualResp, err := rtClient.VoteTransaction(ctx, request) + require.NoError(t, err) + testhelper.ProtoEqual(t, response, actualResp) + } + + // performs a Gitaly RPC, and the Praefect handling of that call will + 
// send back an error and verifies all messages are routed correctly + assertVoteRequestFailure := func(t *testing.T, ctx context.Context, routeID string, bidi gitalypb.RefTransaction_RouteVoteClient, request *gitalypb.VoteTransactionRequest, statusErr *gitalypb.RouteVoteRequest_Status) { + done := make(chan struct{}) + defer func() { <-done }() + go func() { + defer close(done) + recv := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_VoteTxRequest{ + request, + }, + } + send := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_Error{ + statusErr, + }, + } + recvAndSend(t, bidi, recv, send) + }() + + _, err := rtClient.VoteTransaction(ctx, request) + require.Error(t, err) + require.Equal(t, codes.Code(statusErr.Code), status.Code(err)) + require.Contains(t, err.Error(), statusErr.Message) + } + + // performs the Gitaly client RPC, and the Praefect handling of that + // call and verifies all expected messages are routed correctly + assertStopRequestSucceeds := func(t *testing.T, ctx context.Context, routeID string, bidi gitalypb.RefTransaction_RouteVoteClient, request *gitalypb.StopTransactionRequest, response *gitalypb.StopTransactionResponse) { + done := make(chan struct{}) + defer func() { <-done }() + go func() { + defer close(done) + recv := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_StopTxRequest{ + request, + }, + } + send := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_StopTxResponse{ + response, + }, + } + recvAndSend(t, bidi, recv, send) + }() + + actualResp, err := rtClient.StopTransaction(ctx, request) + require.NoError(t, err) + testhelper.ProtoEqual(t, response, actualResp) + } + + // performs a Gitaly RPC, and the Praefect handling of that call will + // send back an error and verifies all messages are routed correctly + assertStopRequestFailure := func(t *testing.T, ctx context.Context, routeID string, 
bidi gitalypb.RefTransaction_RouteVoteClient, request *gitalypb.StopTransactionRequest, statusErr *gitalypb.RouteVoteRequest_Status) { + done := make(chan struct{}) + defer func() { <-done }() + go func() { + defer close(done) + recv := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_StopTxRequest{ + request, + }, + } + send := &gitalypb.RouteVoteRequest{ + RouteUuid: routeID, + Msg: &gitalypb.RouteVoteRequest_Error{ + statusErr, + }, + } + recvAndSend(t, bidi, recv, send) + }() + + _, err := rtClient.StopTransaction(ctx, request) + require.Error(t, err) + require.Equal(t, codes.Code(statusErr.Code), status.Code(err)) + require.Contains(t, err.Error(), statusErr.Message) + } + + t.Run("route opened", func(t *testing.T) { + bidi, err := rtClient.RouteVote(ctx) + require.NoError(t, err) + defer func() { _ = bidi.CloseSend() }() + + openTxSession(t, bidi, routeID1) + + t.Run("opening additional route fails", func(t *testing.T) { + bidi, err := rtClient.RouteVote(ctx) + require.NoError(t, err) + defer func() { _ = bidi.CloseSend() }() + + err = bidi.Send(&gitalypb.RouteVoteRequest{ + RouteUuid: routeID1, + Msg: &gitalypb.RouteVoteRequest_OpenRouteRequest{ + &gitalypb.RouteVoteRequest_OpenRoute{}, + }, + }) + require.NoError(t, err) + + _, err = bidi.Recv() + require.Error(t, err) + + testhelper.RequireGrpcError(t, err, codes.AlreadyExists) + }) + + t.Run("VoteTransaction request succeeds", func(t *testing.T) { + request := &gitalypb.VoteTransactionRequest{ + RouteUuid: routeID1, + Node: "test", + } + response := &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_COMMIT, + } + assertVoteRequestSucceeds(t, ctx, routeID1, bidi, request, response) + }) + + t.Run("VoteTransaction request fails", func(t *testing.T) { + request := &gitalypb.VoteTransactionRequest{ + RouteUuid: routeID1, + Node: "test", + } + statusErr := &gitalypb.RouteVoteRequest_Status{ + Code: 42, + Message: "nope, try again", + } + 
assertVoteRequestFailure(t, ctx, routeID1, bidi, request, statusErr) + }) + + t.Run("StopTransaction request succeeds", func(t *testing.T) { + request := &gitalypb.StopTransactionRequest{ + RouteUuid: routeID1, + } + response := &gitalypb.StopTransactionResponse{} + assertStopRequestSucceeds(t, ctx, routeID1, bidi, request, response) + }) + + t.Run("StopTransaction request fails", func(t *testing.T) { + request := &gitalypb.StopTransactionRequest{ + RouteUuid: routeID1, + } + statusErr := &gitalypb.RouteVoteRequest_Status{ + Code: 42, + Message: "nope, try again", + } + assertStopRequestFailure(t, ctx, routeID1, bidi, request, statusErr) + }) + }) + + t.Run("multiple routes opened from many Praefects", func(t *testing.T) { + bidiClients := make([]gitalypb.RefTransaction_RouteVoteClient, 1000) + + // open many concurrent route sessions + for i := 0; i < 1000; i++ { + bidi, err := rtClient.RouteVote(ctx) + require.NoError(t, err) + defer func() { _ = bidi.CloseSend() }() + + openTxSession(t, bidi, fmt.Sprint(i)) + + bidiClients[i] = bidi + } + + t.Run("VoteTransaction RPC routing", func(t *testing.T) { + // send many requests + for i := 0; i < 1000; i++ { + request := &gitalypb.VoteTransactionRequest{ + RouteUuid: fmt.Sprint(i), + Node: fmt.Sprint(i), + } + response := &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_STOP, + } + + assertVoteRequestSucceeds(t, ctx, fmt.Sprint(i), bidiClients[i], request, response) + + statusErr := &gitalypb.RouteVoteRequest_Status{ + Code: 42, + Message: fmt.Sprint(i), + } + + assertVoteRequestFailure(t, ctx, fmt.Sprint(i), bidiClients[i], request, statusErr) + } + }) + + t.Run("StopTransaction RPC routing", func(t *testing.T) { + // send many requests + for i := 0; i < 1000; i++ { + request := &gitalypb.StopTransactionRequest{ + RouteUuid: fmt.Sprint(i), + } + response := &gitalypb.StopTransactionResponse{} + + assertStopRequestSucceeds(t, ctx, fmt.Sprint(i), bidiClients[i], request, response) + + 
statusErr := &gitalypb.RouteVoteRequest_Status{ + Code: 42, + Message: fmt.Sprint(i), + } + + assertStopRequestFailure(t, ctx, fmt.Sprint(i), bidiClients[i], request, statusErr) + } + }) + }) +} + +func runServer(t *testing.T) (string, func()) { + srv := testhelper.NewServer(t, nil, nil) + + gitalypb.RegisterRefTransactionServer(srv.GrpcServer(), NewServer()) + reflection.Register(srv.GrpcServer()) + + srv.Start(t) + + return "unix://" + srv.Socket(), srv.Stop +} + +func newClient(t *testing.T, serverSocketPath string) (gitalypb.RefTransactionClient, *grpc.ClientConn) { + connOpts := []grpc.DialOption{ + grpc.WithInsecure(), + } + + conn, err := grpc.Dial(serverSocketPath, connOpts...) + if err != nil { + t.Fatal(err) + } + + return gitalypb.NewRefTransactionClient(conn), conn +} + +func TestMain(m *testing.M) { + os.Exit(testMain(m)) +} + +func testMain(m *testing.M) int { + cleanup := testhelper.Configure() + defer cleanup() + + return m.Run() +} diff --git a/internal/gitaly/transaction/manager.go b/internal/gitaly/transaction/manager.go index 5a5033d0d157d567007bebb4de445f645ea36d56..7aa7e03bc5859db7e60e6c12be46b1f31dfeb7d5 100644 --- a/internal/gitaly/transaction/manager.go +++ b/internal/gitaly/transaction/manager.go @@ -101,6 +101,7 @@ func (m *PoolManager) Vote(ctx context.Context, tx metadata.Transaction, server TransactionId: tx.ID, Node: tx.Node, ReferenceUpdatesHash: hash, + RouteUuid: tx.RouteUUID, }) if err != nil { logger.WithError(err).Error("vote failed") @@ -130,6 +131,7 @@ func (m *PoolManager) Stop(ctx context.Context, tx metadata.Transaction, server if _, err := client.StopTransaction(ctx, &gitalypb.StopTransactionRequest{ TransactionId: tx.ID, + RouteUuid: tx.RouteUUID, }); err != nil { m.log(ctx).WithFields(logrus.Fields{ "transaction.id": tx.ID, diff --git a/internal/gitaly/transaction/manager_test.go b/internal/gitaly/transaction/manager_test.go index e1d3044f8184099d83e4b93d510bc240d21d816f..1f11d132758aaa5a432aa289dd29d46b5c9bd852 100644 
--- a/internal/gitaly/transaction/manager_test.go +++ b/internal/gitaly/transaction/manager_test.go @@ -16,6 +16,7 @@ import ( ) type testTransactionServer struct { + *gitalypb.UnimplementedRefTransactionServer vote func(*gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) stop func(*gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) } diff --git a/internal/metadata/featureflag/feature_flags.go b/internal/metadata/featureflag/feature_flags.go index d062f96d6f7162bb37fcbefb42e1bdb789e8ed82..d2365144b2804be4c6ef879754802ba42a3f6316 100644 --- a/internal/metadata/featureflag/feature_flags.go +++ b/internal/metadata/featureflag/feature_flags.go @@ -36,6 +36,9 @@ var ( GoGetNewLFSPointers = FeatureFlag{Name: "go_get_new_lfs_pointers", OnByDefault: false} // UploadPackGitalyHooks makes git-upload-pack use gitaly-hooks to run pack-objects UploadPackGitalyHooks = FeatureFlag{Name: "upload_pack_gitaly_hooks", OnByDefault: false} + // GitalyTxSvc switches from using the Praefect transaction service to + // the Gitaly transaction service for an operation + GitalyTxSvc = FeatureFlag{Name: "g_tx_svc", OnByDefault: false} // TxApplyBfgObjectMapStream enables transactions for ApplyBfgObjectMapStream TxApplyBfgObjectMapStream = FeatureFlag{Name: "tx_apply_bfg_object_map_stream", OnByDefault: true} @@ -107,6 +110,7 @@ var ( var All = []FeatureFlag{ DistributedReads, LogCommandStats, + GitalyTxSvc, ReferenceTransactions, GoUserCherryPick, GoUserUpdateBranch, diff --git a/internal/praefect/coordinator.go b/internal/praefect/coordinator.go index 5036d569f958f06797a9a7af7964eabc7c1a8982..b093faf540d5f9003ffa21a5b0503ef39d31edeb 100644 --- a/internal/praefect/coordinator.go +++ b/internal/praefect/coordinator.go @@ -110,11 +110,6 @@ var transactionRPCs = map[string]transactionsCondition{ "/gitaly.RepositoryService/RestoreCustomHooks": transactionsDisabled, "/gitaly.RepositoryService/SetConfig": transactionsDisabled, 
"/gitaly.RepositoryService/WriteCommitGraph": transactionsDisabled, - - // These shouldn't ever use transactions for the sake of not creating - // cyclic dependencies. - "/gitaly.RefTransaction/StopTransaction": transactionsDisabled, - "/gitaly.RefTransaction/VoteTransaction": transactionsDisabled, } // forcePrimaryRoutingRPCs tracks RPCs which need to always get routed to the primary. This should @@ -460,7 +455,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall errByNode: make(map[string]error), } - injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true) + injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), route.Primary.Storage, true, c.txMgr.RouteUUID().String()) if err != nil { return nil, err } @@ -478,7 +473,7 @@ func (c *Coordinator) mutatorStreamParameters(ctx context.Context, call grpcCall return nil, err } - injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.Storage, false) + injectedCtx, err := metadata.InjectTransaction(ctx, transaction.ID(), secondary.Storage, false, c.txMgr.RouteUUID().String()) if err != nil { return nil, err } diff --git a/internal/praefect/coordinator_test.go b/internal/praefect/coordinator_test.go index dc4dd77bce46a327b191377b36eb0b5e1ad2829e..3ab5e5ffa32ed6f453e74bd9fc0429a4ec04e9cb 100644 --- a/internal/praefect/coordinator_test.go +++ b/internal/praefect/coordinator_test.go @@ -13,11 +13,13 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/empty" + "github.com/google/uuid" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/client" "gitlab.com/gitlab-org/gitaly/internal/git/gittest" + gtransaction "gitlab.com/gitlab-org/gitaly/internal/gitaly/service/transaction" "gitlab.com/gitlab-org/gitaly/internal/metadata/featureflag" "gitlab.com/gitlab-org/gitaly/internal/middleware/metadatahandler" 
"gitlab.com/gitlab-org/gitaly/internal/praefect/config" @@ -276,7 +278,8 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { }, } - txMgr := transactions.NewManager(conf) + routeUUID := uuid.New() + txMgr := transactions.NewManager(conf, transactions.WithRouteUUID(routeUUID)) coordinator := NewCoordinator( datastore.NewMemoryReplicationEventQueue(conf), @@ -340,6 +343,117 @@ func TestStreamDirectorMutator_StopTransaction(t *testing.T) { err = streamParams.RequestFinalizer() require.NoError(t, err) + + t.Run("with Gitaly transaction service enabled", func(t *testing.T) { + ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, featureflag.GitalyTxSvc) + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + // register a gitaly transaction service for each node + gitalyClients := map[string]gitalypb.RefTransactionClient{} + connSet := map[string]map[string]*grpc.ClientConn{} + for _, vs := range conf.VirtualStorages { + connSet[vs.Name] = map[string]*grpc.ClientConn{} + for _, node := range vs.Nodes { + srv := testhelper.NewServer(t, nil, nil) + gitalypb.RegisterRefTransactionServer( + srv.GrpcServer(), gtransaction.NewServer()) + + srv.Start(t) + defer srv.Stop() + + addr := "unix://" + srv.Socket() + + cc, err := client.Dial(addr, nil) + require.NoError(t, err) + defer cc.Close() + + node.Address = addr + gitalyClients[node.Storage] = gitalypb.NewRefTransactionClient(cc) + connSet[vs.Name][node.Storage] = cc + } + } + + join := txMgr.StartRoutingVotes(ctx, connSet) + defer join() + + // wait until the routes are up + + streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) + require.NoError(t, err) + defer func() { require.NoError(t, streamParams.RequestFinalizer()) }() + + transaction, err := praefect_metadata.TransactionFromContext(streamParams.Primary().Ctx) + require.NoError(t, err) + + var wg sync.WaitGroup + var syncWG sync.WaitGroup + + wg.Add(2) + 
defer wg.Wait() + + syncWG.Add(2) + + stopped := make(chan struct{}) // tells us when the transaction has been stopped + go func() { + defer wg.Done() + + vote := sha1.Sum([]byte("vote")) + + req := &gitalypb.VoteTransactionRequest{ + Repository: &repo, + TransactionId: transaction.ID, + Node: "primary", + ReferenceUpdatesHash: vote[:], + RouteUuid: routeUUID.String(), + } + resp, err := gitalyClients["primary"].VoteTransaction(ctx, req) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, resp.State) + + // Assure that at least one vote was agreed on. + syncWG.Done() + syncWG.Wait() + + stopReq := &gitalypb.StopTransactionRequest{ + Repository: &repo, + TransactionId: transaction.ID, + RouteUuid: routeUUID.String(), + } + _, err = gitalyClients["primary"].StopTransaction(ctx, stopReq) + require.NoError(t, err) + close(stopped) + }() + + go func() { + defer wg.Done() + + vote := sha1.Sum([]byte("vote")) + + req := &gitalypb.VoteTransactionRequest{ + Repository: &repo, + TransactionId: transaction.ID, + Node: "secondary", + ReferenceUpdatesHash: vote[:], + RouteUuid: routeUUID.String(), + } + resp, err := gitalyClients["secondary"].VoteTransaction(ctx, req) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_COMMIT, resp.State) + + // Assure that at least one vote was agreed on. 
+ syncWG.Done() + syncWG.Wait() + + <-stopped + resp, err = gitalyClients["secondary"].VoteTransaction(ctx, req) + require.NoError(t, err) + require.Equal(t, gitalypb.VoteTransactionResponse_STOP, resp.State) + + cancel() // test is done, stop routing votes + }() + }) } func TestStreamDirectorAccessor(t *testing.T) { diff --git a/internal/praefect/metadata/transaction.go b/internal/praefect/metadata/transaction.go index 08e0a43baeb05b4b89fe8ed97b54557d55487d28..a3c7b7d5a545332f124a65d89eda16a4b6ef653d 100644 --- a/internal/praefect/metadata/transaction.go +++ b/internal/praefect/metadata/transaction.go @@ -30,6 +30,9 @@ type Transaction struct { Node string `json:"node"` // Primary identifies the node's role in this transaction Primary bool `json:"primary"` + // RouteUUID is used to properly route a vote to the originating + // Praefect + RouteUUID string `json:"route_uuid"` } // serialize serializes a `Transaction` into a string. @@ -57,11 +60,12 @@ func transactionFromSerialized(serialized string) (Transaction, error) { } // InjectTransaction injects reference transaction metadata into an incoming context -func InjectTransaction(ctx context.Context, tranasctionID uint64, node string, primary bool) (context.Context, error) { +func InjectTransaction(ctx context.Context, tranasctionID uint64, node string, primary bool, routeUUID string) (context.Context, error) { transaction := Transaction{ - ID: tranasctionID, - Node: node, - Primary: primary, + ID: tranasctionID, + Node: node, + Primary: primary, + RouteUUID: routeUUID, } serialized, err := transaction.serialize() diff --git a/internal/praefect/service/transaction/server.go b/internal/praefect/service/transaction/server.go index ceabc22825588aaf6e8a229b44d1b4d414e3040f..c5d49430c670bbf143b0d58cdfb76dbd432f40ec 100644 --- a/internal/praefect/service/transaction/server.go +++ b/internal/praefect/service/transaction/server.go @@ -2,15 +2,13 @@ package transaction import ( "context" - "errors" - 
"gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/transactions" "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" - "google.golang.org/grpc/codes" ) type Server struct { + *gitalypb.UnimplementedRefTransactionServer txMgr *transactions.Manager } @@ -25,28 +23,7 @@ func NewServer(txMgr *transactions.Manager) gitalypb.RefTransactionServer { // completed. func (s *Server) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransactionRequest) (*gitalypb.VoteTransactionResponse, error) { err := s.txMgr.VoteTransaction(ctx, in.TransactionId, in.Node, in.ReferenceUpdatesHash) - if err != nil { - switch { - case errors.Is(err, transactions.ErrNotFound): - return nil, helper.ErrNotFound(err) - case errors.Is(err, transactions.ErrTransactionCanceled): - return nil, helper.DecorateError(codes.Canceled, err) - case errors.Is(err, transactions.ErrTransactionStopped): - return &gitalypb.VoteTransactionResponse{ - State: gitalypb.VoteTransactionResponse_STOP, - }, nil - case errors.Is(err, transactions.ErrTransactionFailed): - return &gitalypb.VoteTransactionResponse{ - State: gitalypb.VoteTransactionResponse_ABORT, - }, nil - default: - return nil, helper.ErrInternal(err) - } - } - - return &gitalypb.VoteTransactionResponse{ - State: gitalypb.VoteTransactionResponse_COMMIT, - }, nil + return transactions.VoteResponseFor(err) } // StopTransaction is called by a client who wants to gracefully stop a @@ -54,18 +31,6 @@ func (s *Server) VoteTransaction(ctx context.Context, in *gitalypb.VoteTransacti // will not get accepted anymore. It is fine to call this RPC multiple times on // the same transaction. 
func (s *Server) StopTransaction(ctx context.Context, in *gitalypb.StopTransactionRequest) (*gitalypb.StopTransactionResponse, error) { - if err := s.txMgr.StopTransaction(ctx, in.TransactionId); err != nil { - switch { - case errors.Is(err, transactions.ErrNotFound): - return nil, helper.ErrNotFound(err) - case errors.Is(err, transactions.ErrTransactionCanceled): - return nil, helper.DecorateError(codes.Canceled, err) - case errors.Is(err, transactions.ErrTransactionStopped): - return &gitalypb.StopTransactionResponse{}, nil - default: - return nil, helper.ErrInternal(err) - } - } - - return &gitalypb.StopTransactionResponse{}, nil + err := s.txMgr.StopTransaction(ctx, in.TransactionId) + return transactions.StopResponseFor(err) } diff --git a/internal/praefect/transactions/manager.go b/internal/praefect/transactions/manager.go index abfbe8fdc01674e89ec00f40640322e1e4aba463..c04dd0eb887674ff62e3f88055eca18c0003e3cd 100644 --- a/internal/praefect/transactions/manager.go +++ b/internal/praefect/transactions/manager.go @@ -11,10 +11,16 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/internal/helper" "gitlab.com/gitlab-org/gitaly/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var ErrNotFound = errors.New("transaction not found") @@ -29,6 +35,7 @@ type Manager struct { counterMetric *prometheus.CounterVec delayMetric *prometheus.HistogramVec subtransactionsMetric prometheus.Histogram + routeUUID uuid.UUID } // TransactionIDGenerator is an interface for types that can generate transaction IDs. 
@@ -68,6 +75,14 @@ func WithTransactionIDGenerator(generator TransactionIDGenerator) ManagerOpt { } } +// WithRouteUUID is an option to manually set the manager's UUID used for +// routing transaction messages. +func WithRouteUUID(routeUUID uuid.UUID) ManagerOpt { + return func(mgr *Manager) { + mgr.routeUUID = routeUUID + } +} + // NewManager creates a new transactions Manager. func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager { mgr := &Manager{ @@ -99,6 +114,7 @@ func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager { Buckets: []float64{0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0}, }, ), + routeUUID: uuid.New(), } for _, opt := range opts { @@ -108,6 +124,9 @@ func NewManager(cfg config.Config, opts ...ManagerOpt) *Manager { return mgr } +// RouteUUID is a getter for the routeUUID +func (mgr *Manager) RouteUUID() uuid.UUID { return mgr.routeUUID } + func (mgr *Manager) Describe(descs chan<- *prometheus.Desc) { prometheus.DescribeByCollect(mgr, descs) } @@ -127,6 +146,182 @@ func (mgr *Manager) log(ctx context.Context) logrus.FieldLogger { // from the transaction manager. 
type CancelFunc func() error +// VoteResponseFor returns the appropriate response based on the provided error +func VoteResponseFor(err error) (*gitalypb.VoteTransactionResponse, error) { + if err != nil { + switch { + case errors.Is(err, ErrNotFound): + return nil, helper.ErrNotFound(err) + case errors.Is(err, ErrTransactionCanceled): + return nil, helper.DecorateError(codes.Canceled, err) + case errors.Is(err, ErrTransactionStopped): + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_STOP, + }, nil + case errors.Is(err, ErrTransactionFailed): + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_ABORT, + }, nil + default: + return nil, helper.ErrInternal(err) + } + } + + return &gitalypb.VoteTransactionResponse{ + State: gitalypb.VoteTransactionResponse_COMMIT, + }, nil +} + +// StopResponseFor will return the appropriate response for the provided error +func StopResponseFor(err error) (*gitalypb.StopTransactionResponse, error) { + if err != nil { + switch { + case errors.Is(err, ErrNotFound): + return nil, helper.ErrNotFound(err) + case errors.Is(err, ErrTransactionCanceled): + return nil, helper.DecorateError(codes.Canceled, err) + case errors.Is(err, ErrTransactionStopped): + return &gitalypb.StopTransactionResponse{}, nil + default: + return nil, helper.ErrInternal(err) + } + } + + return &gitalypb.StopTransactionResponse{}, nil +} + +func routeVoteErr(routeUUID uuid.UUID, err error) *gitalypb.RouteVoteRequest { + c := codes.Unknown + if s, ok := status.FromError(err); ok { + c = s.Code() + } + return &gitalypb.RouteVoteRequest{ + RouteUuid: routeUUID.String(), + Msg: &gitalypb.RouteVoteRequest_Error{ + &gitalypb.RouteVoteRequest_Status{ + Code: int32(c), + Message: err.Error(), + }, + }, + } +} + +// StartRoutingVotes will repeatedly attempt to route votes from a Gitaly hosted +// transaction service back to this Praefect server. 
A goroutine is launched for +// each node and will attempt to recover from errors and reestablish a route +// stream until the context is cancelled. +// The connSet param is meant to be provided by praefect.Connections from a +// praefect.NodeSet. It is keyed first by virtual storage name, then Gitaly +// storage name. +func (mgr *Manager) StartRoutingVotes(ctx context.Context, connSet map[string]map[string]*grpc.ClientConn) func() { + wg := sync.WaitGroup{} + + for _, vs := range connSet { + for nodeName, cc := range vs { + wg.Add(1) + go func(nodeName string, cc *grpc.ClientConn) { + defer wg.Done() + + // only case where we stop attempting to route + // votes is when the parent context is + // cancelled or the equivalent gRPC error. + // All other errors we will log and retry + for { + err := mgr.routeVotes(ctx, nodeName, cc) + if errors.Is(err, context.Canceled) { + return + } + if status.Code(err) == codes.Canceled { + return + } + mgr.log(ctx). + WithError(err). + WithField("route", mgr.routeUUID). + WithField("node", nodeName). + Error("vote routing failed, retrying") + } + }(nodeName, cc) + } + } + + return wg.Wait +} + +func (mgr *Manager) routeVotes(ctx context.Context, nodeName string, cc *grpc.ClientConn) error { + bidi, err := gitalypb.NewRefTransactionClient(cc).RouteVote(ctx) + if err != nil { + return err + } + defer func() { + if err := bidi.CloseSend(); err != nil { + mgr.log(ctx). + WithError(err). + WithField("route", mgr.routeUUID). + WithField("node", nodeName). 
+ Error("unable to close route session") + } + }() + + // open transaction on Gitaly node + if err := bidi.Send(&gitalypb.RouteVoteRequest{ + RouteUuid: mgr.routeUUID.String(), + Msg: &gitalypb.RouteVoteRequest_OpenRouteRequest{ + OpenRouteRequest: &gitalypb.RouteVoteRequest_OpenRoute{}, + }, + }); err != nil { + return err + } + // wait for confirmation + _, err = bidi.Recv() + if err != nil { + return err + } + + for { + resp, err := bidi.Recv() + if err != nil { + return err + } + + switch r := resp.Msg.(type) { + case *gitalypb.RouteVoteRequest_VoteTxRequest: + err := mgr.VoteTransaction(ctx, + r.VoteTxRequest.GetTransactionId(), + r.VoteTxRequest.GetNode(), + r.VoteTxRequest.GetReferenceUpdatesHash(), + ) + resp, err := VoteResponseFor(err) + if err != nil { + return bidi.Send(routeVoteErr(mgr.routeUUID, err)) + } + if err := bidi.Send(&gitalypb.RouteVoteRequest{ + RouteUuid: r.VoteTxRequest.GetRouteUuid(), + Msg: &gitalypb.RouteVoteRequest_VoteTxResponse{resp}, + }); err != nil { + return err + } + + case *gitalypb.RouteVoteRequest_StopTxRequest: + err := mgr.StopTransaction(ctx, + r.StopTxRequest.GetTransactionId(), + ) + resp, err := StopResponseFor(err) + if err != nil { + return bidi.Send(routeVoteErr(mgr.routeUUID, err)) + } + if err := bidi.Send(&gitalypb.RouteVoteRequest{ + RouteUuid: r.StopTxRequest.GetRouteUuid(), + Msg: &gitalypb.RouteVoteRequest_StopTxResponse{resp}, + }); err != nil { + return err + } + + default: + return fmt.Errorf("received unexpected type %T", r) + } + } +} + // RegisterTransaction registers a new reference transaction for a set of nodes // taking part in the transaction. `threshold` is the threshold at which an // election will succeed. 
It needs to be in the range `weight(voters)/2 < diff --git a/proto/go/gitalypb/transaction.pb.go b/proto/go/gitalypb/transaction.pb.go index 6f70b1cfab0c6b891942c82bc2b5f28f42695248..08024665b2f77f653b8ddac5419b7f28b2a28d00 100644 --- a/proto/go/gitalypb/transaction.pb.go +++ b/proto/go/gitalypb/transaction.pb.go @@ -61,7 +61,10 @@ type VoteTransactionRequest struct { // Name of the Gitaly node that's voting on a transaction. Node string `protobuf:"bytes,3,opt,name=node,proto3" json:"node,omitempty"` // SHA1 of the references that are to be updated - ReferenceUpdatesHash []byte `protobuf:"bytes,4,opt,name=reference_updates_hash,json=referenceUpdatesHash,proto3" json:"reference_updates_hash,omitempty"` + ReferenceUpdatesHash []byte `protobuf:"bytes,4,opt,name=reference_updates_hash,json=referenceUpdatesHash,proto3" json:"reference_updates_hash,omitempty"` + // Route UUID is used when the transaction service on Gitaly is used to + // route messages back to Praefect clients + RouteUuid string `protobuf:"bytes,5,opt,name=route_uuid,json=routeUuid,proto3" json:"route_uuid,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -120,6 +123,13 @@ func (m *VoteTransactionRequest) GetReferenceUpdatesHash() []byte { return nil } +func (m *VoteTransactionRequest) GetRouteUuid() string { + if m != nil { + return m.RouteUuid + } + return "" +} + type VoteTransactionResponse struct { State VoteTransactionResponse_TransactionState `protobuf:"varint,1,opt,name=state,proto3,enum=gitaly.VoteTransactionResponse_TransactionState" json:"state,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -162,7 +172,10 @@ func (m *VoteTransactionResponse) GetState() VoteTransactionResponse_Transaction type StopTransactionRequest struct { Repository *Repository `protobuf:"bytes,1,opt,name=repository,proto3" json:"repository,omitempty"` // ID of the transaction we're processing - TransactionId uint64 
`protobuf:"varint,2,opt,name=transaction_id,json=transactionId,proto3" json:"transaction_id,omitempty"` + TransactionId uint64 `protobuf:"varint,2,opt,name=transaction_id,json=transactionId,proto3" json:"transaction_id,omitempty"` + // Route UUID is used when the transaction service on Gitaly is used to + // route messages back to Praefect clients + RouteUuid string `protobuf:"bytes,5,opt,name=route_uuid,json=routeUuid,proto3" json:"route_uuid,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -207,6 +220,13 @@ func (m *StopTransactionRequest) GetTransactionId() uint64 { return 0 } +func (m *StopTransactionRequest) GetRouteUuid() string { + if m != nil { + return m.RouteUuid + } + return "" +} + type StopTransactionResponse struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` @@ -238,42 +258,295 @@ func (m *StopTransactionResponse) XXX_DiscardUnknown() { var xxx_messageInfo_StopTransactionResponse proto.InternalMessageInfo +// RouteVoteRequest wraps Gitaly messages so that they can +// be properly routed between Praefects. It also allows Praefect to manage +// transactions. 
+type RouteVoteRequest struct { + // The route UUID allows Gitaly messages to be routed to the correct + // Praefect client + RouteUuid string `protobuf:"bytes,1,opt,name=route_uuid,json=routeUuid,proto3" json:"route_uuid,omitempty"` + // Types that are valid to be assigned to Msg: + // *RouteVoteRequest_OpenRouteRequest + // *RouteVoteRequest_Error + // *RouteVoteRequest_VoteTxRequest + // *RouteVoteRequest_VoteTxResponse + // *RouteVoteRequest_StopTxRequest + // *RouteVoteRequest_StopTxResponse + Msg isRouteVoteRequest_Msg `protobuf_oneof:"msg"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RouteVoteRequest) Reset() { *m = RouteVoteRequest{} } +func (m *RouteVoteRequest) String() string { return proto.CompactTextString(m) } +func (*RouteVoteRequest) ProtoMessage() {} +func (*RouteVoteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_2cc4e03d2c28c490, []int{4} +} + +func (m *RouteVoteRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RouteVoteRequest.Unmarshal(m, b) +} +func (m *RouteVoteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RouteVoteRequest.Marshal(b, m, deterministic) +} +func (m *RouteVoteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_RouteVoteRequest.Merge(m, src) +} +func (m *RouteVoteRequest) XXX_Size() int { + return xxx_messageInfo_RouteVoteRequest.Size(m) +} +func (m *RouteVoteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_RouteVoteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_RouteVoteRequest proto.InternalMessageInfo + +func (m *RouteVoteRequest) GetRouteUuid() string { + if m != nil { + return m.RouteUuid + } + return "" +} + +type isRouteVoteRequest_Msg interface { + isRouteVoteRequest_Msg() +} + +type RouteVoteRequest_OpenRouteRequest struct { + OpenRouteRequest *RouteVoteRequest_OpenRoute 
`protobuf:"bytes,2,opt,name=open_route_request,json=openRouteRequest,proto3,oneof"` +} + +type RouteVoteRequest_Error struct { + Error *RouteVoteRequest_Status `protobuf:"bytes,3,opt,name=error,proto3,oneof"` +} + +type RouteVoteRequest_VoteTxRequest struct { + VoteTxRequest *VoteTransactionRequest `protobuf:"bytes,5,opt,name=vote_tx_request,json=voteTxRequest,proto3,oneof"` +} + +type RouteVoteRequest_VoteTxResponse struct { + VoteTxResponse *VoteTransactionResponse `protobuf:"bytes,6,opt,name=vote_tx_response,json=voteTxResponse,proto3,oneof"` +} + +type RouteVoteRequest_StopTxRequest struct { + StopTxRequest *StopTransactionRequest `protobuf:"bytes,7,opt,name=stop_tx_request,json=stopTxRequest,proto3,oneof"` +} + +type RouteVoteRequest_StopTxResponse struct { + StopTxResponse *StopTransactionResponse `protobuf:"bytes,8,opt,name=stop_tx_response,json=stopTxResponse,proto3,oneof"` +} + +func (*RouteVoteRequest_OpenRouteRequest) isRouteVoteRequest_Msg() {} + +func (*RouteVoteRequest_Error) isRouteVoteRequest_Msg() {} + +func (*RouteVoteRequest_VoteTxRequest) isRouteVoteRequest_Msg() {} + +func (*RouteVoteRequest_VoteTxResponse) isRouteVoteRequest_Msg() {} + +func (*RouteVoteRequest_StopTxRequest) isRouteVoteRequest_Msg() {} + +func (*RouteVoteRequest_StopTxResponse) isRouteVoteRequest_Msg() {} + +func (m *RouteVoteRequest) GetMsg() isRouteVoteRequest_Msg { + if m != nil { + return m.Msg + } + return nil +} + +func (m *RouteVoteRequest) GetOpenRouteRequest() *RouteVoteRequest_OpenRoute { + if x, ok := m.GetMsg().(*RouteVoteRequest_OpenRouteRequest); ok { + return x.OpenRouteRequest + } + return nil +} + +func (m *RouteVoteRequest) GetError() *RouteVoteRequest_Status { + if x, ok := m.GetMsg().(*RouteVoteRequest_Error); ok { + return x.Error + } + return nil +} + +func (m *RouteVoteRequest) GetVoteTxRequest() *VoteTransactionRequest { + if x, ok := m.GetMsg().(*RouteVoteRequest_VoteTxRequest); ok { + return x.VoteTxRequest + } + return nil +} + +func (m 
*RouteVoteRequest) GetVoteTxResponse() *VoteTransactionResponse { + if x, ok := m.GetMsg().(*RouteVoteRequest_VoteTxResponse); ok { + return x.VoteTxResponse + } + return nil +} + +func (m *RouteVoteRequest) GetStopTxRequest() *StopTransactionRequest { + if x, ok := m.GetMsg().(*RouteVoteRequest_StopTxRequest); ok { + return x.StopTxRequest + } + return nil +} + +func (m *RouteVoteRequest) GetStopTxResponse() *StopTransactionResponse { + if x, ok := m.GetMsg().(*RouteVoteRequest_StopTxResponse); ok { + return x.StopTxResponse + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*RouteVoteRequest) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*RouteVoteRequest_OpenRouteRequest)(nil), + (*RouteVoteRequest_Error)(nil), + (*RouteVoteRequest_VoteTxRequest)(nil), + (*RouteVoteRequest_VoteTxResponse)(nil), + (*RouteVoteRequest_StopTxRequest)(nil), + (*RouteVoteRequest_StopTxResponse)(nil), + } +} + +// OpenRoute is sent from Praefect to Gitaly to open a new route +// session. All transactions requests from Gitaly with the specified ID will +// route to the Praefect that opened the session. Only one route can be +// opened per stream. Closing the stream will also close the route. 
+type RouteVoteRequest_OpenRoute struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RouteVoteRequest_OpenRoute) Reset() { *m = RouteVoteRequest_OpenRoute{} } +func (m *RouteVoteRequest_OpenRoute) String() string { return proto.CompactTextString(m) } +func (*RouteVoteRequest_OpenRoute) ProtoMessage() {} +func (*RouteVoteRequest_OpenRoute) Descriptor() ([]byte, []int) { + return fileDescriptor_2cc4e03d2c28c490, []int{4, 0} +} + +func (m *RouteVoteRequest_OpenRoute) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RouteVoteRequest_OpenRoute.Unmarshal(m, b) +} +func (m *RouteVoteRequest_OpenRoute) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RouteVoteRequest_OpenRoute.Marshal(b, m, deterministic) +} +func (m *RouteVoteRequest_OpenRoute) XXX_Merge(src proto.Message) { + xxx_messageInfo_RouteVoteRequest_OpenRoute.Merge(m, src) +} +func (m *RouteVoteRequest_OpenRoute) XXX_Size() int { + return xxx_messageInfo_RouteVoteRequest_OpenRoute.Size(m) +} +func (m *RouteVoteRequest_OpenRoute) XXX_DiscardUnknown() { + xxx_messageInfo_RouteVoteRequest_OpenRoute.DiscardUnknown(m) +} + +var xxx_messageInfo_RouteVoteRequest_OpenRoute proto.InternalMessageInfo + +// Status is copy of google.rpc.Status, which represents errors in gRPC +type RouteVoteRequest_Status struct { + Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RouteVoteRequest_Status) Reset() { *m = RouteVoteRequest_Status{} } +func (m *RouteVoteRequest_Status) String() string { return proto.CompactTextString(m) } +func (*RouteVoteRequest_Status) ProtoMessage() {} +func (*RouteVoteRequest_Status) Descriptor() ([]byte, []int) { + return 
fileDescriptor_2cc4e03d2c28c490, []int{4, 1} +} + +func (m *RouteVoteRequest_Status) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_RouteVoteRequest_Status.Unmarshal(m, b) +} +func (m *RouteVoteRequest_Status) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_RouteVoteRequest_Status.Marshal(b, m, deterministic) +} +func (m *RouteVoteRequest_Status) XXX_Merge(src proto.Message) { + xxx_messageInfo_RouteVoteRequest_Status.Merge(m, src) +} +func (m *RouteVoteRequest_Status) XXX_Size() int { + return xxx_messageInfo_RouteVoteRequest_Status.Size(m) +} +func (m *RouteVoteRequest_Status) XXX_DiscardUnknown() { + xxx_messageInfo_RouteVoteRequest_Status.DiscardUnknown(m) +} + +var xxx_messageInfo_RouteVoteRequest_Status proto.InternalMessageInfo + +func (m *RouteVoteRequest_Status) GetCode() int32 { + if m != nil { + return m.Code + } + return 0 +} + +func (m *RouteVoteRequest_Status) GetMessage() string { + if m != nil { + return m.Message + } + return "" +} + func init() { proto.RegisterEnum("gitaly.VoteTransactionResponse_TransactionState", VoteTransactionResponse_TransactionState_name, VoteTransactionResponse_TransactionState_value) proto.RegisterType((*VoteTransactionRequest)(nil), "gitaly.VoteTransactionRequest") proto.RegisterType((*VoteTransactionResponse)(nil), "gitaly.VoteTransactionResponse") proto.RegisterType((*StopTransactionRequest)(nil), "gitaly.StopTransactionRequest") proto.RegisterType((*StopTransactionResponse)(nil), "gitaly.StopTransactionResponse") + proto.RegisterType((*RouteVoteRequest)(nil), "gitaly.RouteVoteRequest") + proto.RegisterType((*RouteVoteRequest_OpenRoute)(nil), "gitaly.RouteVoteRequest.OpenRoute") + proto.RegisterType((*RouteVoteRequest_Status)(nil), "gitaly.RouteVoteRequest.Status") } func init() { proto.RegisterFile("transaction.proto", fileDescriptor_2cc4e03d2c28c490) } var fileDescriptor_2cc4e03d2c28c490 = []byte{ - // 385 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 
0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x52, 0xc1, 0x6e, 0xda, 0x40, - 0x14, 0xec, 0x52, 0x63, 0xc1, 0x2b, 0xa5, 0xee, 0xaa, 0x02, 0x97, 0x43, 0x6b, 0x59, 0xaa, 0xe4, - 0x43, 0x6b, 0x23, 0xe8, 0xa1, 0xd7, 0x52, 0xa9, 0x2a, 0x07, 0x44, 0xb5, 0x38, 0x39, 0x70, 0x41, - 0x0b, 0x5e, 0xb0, 0x25, 0xe2, 0x75, 0x76, 0x97, 0x03, 0x3f, 0x92, 0xe4, 0x7f, 0x22, 0x45, 0xf9, - 0xa6, 0x9c, 0x22, 0xbc, 0x01, 0x2c, 0x08, 0xca, 0x31, 0xb7, 0x7d, 0x33, 0x6f, 0xc6, 0x6f, 0x46, - 0x86, 0x8f, 0x4a, 0xd0, 0x54, 0xd2, 0x99, 0x4a, 0x78, 0xea, 0x67, 0x82, 0x2b, 0x8e, 0xcd, 0x45, - 0xa2, 0xe8, 0x72, 0xdd, 0x82, 0x65, 0x92, 0x2a, 0x8d, 0xb5, 0x6a, 0x32, 0xa6, 0x82, 0x45, 0x7a, - 0x72, 0x6f, 0x11, 0x34, 0xce, 0xb9, 0x62, 0xe1, 0x5e, 0x4b, 0xd8, 0xe5, 0x8a, 0x49, 0x85, 0x7f, - 0x01, 0x08, 0x96, 0x71, 0x99, 0x28, 0x2e, 0xd6, 0x36, 0x72, 0x90, 0xf7, 0xae, 0x83, 0x7d, 0xed, - 0xe8, 0x93, 0x1d, 0xd3, 0x33, 0x6e, 0xee, 0xbe, 0x23, 0x52, 0xd8, 0xc5, 0xdf, 0xa0, 0x5e, 0xb8, - 0x65, 0x92, 0x44, 0x76, 0xc9, 0x41, 0x9e, 0x41, 0xde, 0x17, 0xd0, 0x7e, 0x84, 0x31, 0x18, 0x29, - 0x8f, 0x98, 0xfd, 0xd6, 0x41, 0x5e, 0x95, 0xe4, 0x6f, 0xfc, 0x13, 0x1a, 0x82, 0xcd, 0x99, 0x60, - 0xe9, 0x8c, 0x4d, 0x56, 0x59, 0x44, 0x15, 0x93, 0x93, 0x98, 0xca, 0xd8, 0x36, 0x1c, 0xe4, 0xd5, - 0xc8, 0xa7, 0x1d, 0x7b, 0xa6, 0xc9, 0x7f, 0x54, 0xc6, 0xee, 0x15, 0x82, 0xe6, 0x51, 0x0a, 0x99, - 0xf1, 0x54, 0x32, 0xfc, 0x17, 0xca, 0x52, 0x51, 0xc5, 0xf2, 0x04, 0xf5, 0x4e, 0x7b, 0x9b, 0xe0, - 0xc4, 0xbe, 0x5f, 0xc0, 0x46, 0x1b, 0x1d, 0xd1, 0x72, 0xb7, 0x0b, 0xd6, 0x21, 0x85, 0x01, 0xcc, - 0x3f, 0xc3, 0xc1, 0xa0, 0x1f, 0x5a, 0x6f, 0x70, 0x15, 0xca, 0xbf, 0x7b, 0x43, 0x12, 0x5a, 0x08, - 0x57, 0xc0, 0x18, 0x85, 0xc3, 0xff, 0x56, 0xc9, 0x5d, 0x43, 0x63, 0xa4, 0x78, 0xf6, 0x0a, 0xed, - 0xba, 0x9f, 0xa1, 0x79, 0xf4, 0x69, 0x1d, 0xb1, 0x73, 0x8f, 0xa0, 0x4e, 0xd8, 0xbc, 0x40, 0xe1, - 0x31, 0x7c, 0x38, 0x28, 0x04, 0x7f, 0x39, 0xd9, 0x54, 0x9e, 0xa0, 0xf5, 0xf5, 0x85, 0x26, 0x5d, - 0xf3, 0xe1, 0xda, 0x2b, 0x55, 0xd0, 0xc6, 0xfb, 0xe0, 0x92, 
0xbd, 0xf7, 0xf3, 0xed, 0xec, 0xbd, - 0x4f, 0x44, 0xd8, 0x7a, 0xf7, 0xda, 0xe3, 0xcd, 0xe6, 0x92, 0x4e, 0xfd, 0x19, 0xbf, 0x08, 0xf4, - 0xf3, 0x07, 0x17, 0x8b, 0x40, 0xeb, 0x83, 0xfc, 0x2f, 0x0f, 0x16, 0xfc, 0x69, 0xce, 0xa6, 0x53, - 0x33, 0x87, 0xba, 0x8f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x5f, 0x19, 0xe6, 0x7d, 0x2f, 0x03, 0x00, + // 609 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x54, 0xdf, 0x6e, 0xd3, 0x3e, + 0x14, 0xae, 0xb7, 0xa4, 0x5b, 0x4f, 0xb7, 0x2e, 0x3f, 0xeb, 0xa7, 0x2d, 0x44, 0x02, 0xaa, 0x4a, + 0x48, 0xbd, 0x80, 0x74, 0xea, 0x10, 0x70, 0x4b, 0x27, 0xa1, 0x4c, 0x68, 0x2a, 0xf2, 0x3a, 0x2e, + 0xb8, 0x89, 0xb2, 0xc6, 0x6b, 0x23, 0x6d, 0x71, 0xb0, 0x1d, 0xb4, 0x5d, 0xf3, 0x0e, 0xb0, 0x47, + 0xe1, 0x09, 0x78, 0x10, 0x9e, 0x80, 0x47, 0x40, 0xb6, 0x97, 0x34, 0xed, 0x88, 0x7a, 0xc9, 0x9d, + 0x7d, 0xfe, 0x7c, 0xf9, 0xce, 0xf7, 0x1d, 0x07, 0xfe, 0x93, 0x3c, 0x4a, 0x45, 0x34, 0x95, 0x09, + 0x4b, 0xfd, 0x8c, 0x33, 0xc9, 0x70, 0x73, 0x96, 0xc8, 0xe8, 0xea, 0xd6, 0x83, 0xab, 0x24, 0x95, + 0x26, 0xe6, 0xed, 0x88, 0x79, 0xc4, 0x69, 0x6c, 0x6e, 0xbd, 0x5f, 0x08, 0xf6, 0x3f, 0x32, 0x49, + 0x27, 0x8b, 0x5e, 0x42, 0x3f, 0xe7, 0x54, 0x48, 0xfc, 0x06, 0x80, 0xd3, 0x8c, 0x89, 0x44, 0x32, + 0x7e, 0xeb, 0xa2, 0x2e, 0xea, 0xb7, 0x87, 0xd8, 0x37, 0x88, 0x3e, 0x29, 0x33, 0x23, 0xeb, 0xee, + 0xe7, 0x73, 0x44, 0x2a, 0xb5, 0xf8, 0x19, 0x74, 0x2a, 0x5c, 0xc2, 0x24, 0x76, 0x37, 0xba, 0xa8, + 0x6f, 0x91, 0xdd, 0x4a, 0xf4, 0x24, 0xc6, 0x18, 0xac, 0x94, 0xc5, 0xd4, 0xdd, 0xec, 0xa2, 0x7e, + 0x8b, 0xe8, 0x33, 0x7e, 0x09, 0xfb, 0x9c, 0x5e, 0x52, 0x4e, 0xd3, 0x29, 0x0d, 0xf3, 0x2c, 0x8e, + 0x24, 0x15, 0xe1, 0x3c, 0x12, 0x73, 0xd7, 0xea, 0xa2, 0xfe, 0x0e, 0xf9, 0xbf, 0xcc, 0x9e, 0x9b, + 0x64, 0x10, 0x89, 0x39, 0x7e, 0x0c, 0xc0, 0x59, 0x2e, 0x69, 0x98, 0xe7, 0x49, 0xec, 0xda, 0x1a, + 0xaf, 0xa5, 0x23, 0xe7, 0x79, 0x12, 0xf7, 0xbe, 0x21, 0x38, 0x78, 0x30, 0xa4, 0xc8, 0x58, 0x2a, + 0x28, 0x7e, 0x07, 0xb6, 0x90, 0x91, 0xa4, 0x7a, 
0xc0, 0xce, 0xf0, 0xb0, 0x18, 0xb0, 0xa6, 0xde, + 0xaf, 0xc4, 0xce, 0x54, 0x1f, 0x31, 0xed, 0xbd, 0x23, 0x70, 0x56, 0x53, 0x18, 0xa0, 0x79, 0x3c, + 0x3e, 0x3d, 0x3d, 0x99, 0x38, 0x0d, 0xdc, 0x02, 0xfb, 0xed, 0x68, 0x4c, 0x26, 0x0e, 0xc2, 0xdb, + 0x60, 0x9d, 0x4d, 0xc6, 0x1f, 0x9c, 0x8d, 0xde, 0x1d, 0x82, 0xfd, 0x33, 0xc9, 0xb2, 0x7f, 0xa1, + 0xfe, 0x1a, 0xcd, 0x1e, 0xc1, 0xc1, 0x03, 0x66, 0x46, 0x82, 0xde, 0x0f, 0x0b, 0x1c, 0xa2, 0x0a, + 0x95, 0x46, 0x05, 0xdf, 0x65, 0x38, 0xb4, 0x02, 0x87, 0x09, 0x60, 0x96, 0xd1, 0x34, 0x34, 0x35, + 0xdc, 0x34, 0x69, 0x62, 0xed, 0x61, 0xaf, 0x1c, 0x6b, 0x05, 0xd4, 0x1f, 0x67, 0x34, 0xd5, 0xc1, + 0xa0, 0x41, 0x1c, 0x56, 0x5c, 0x8a, 0x4f, 0xbe, 0x06, 0x9b, 0x72, 0xce, 0xb8, 0x5e, 0xa0, 0xf6, + 0xf0, 0x69, 0x2d, 0x8c, 0x72, 0x23, 0x17, 0x41, 0x83, 0x98, 0x7a, 0x1c, 0xc0, 0xde, 0x17, 0x26, + 0x69, 0x28, 0x6f, 0x4a, 0x26, 0xb6, 0x86, 0x78, 0x52, 0xeb, 0xbe, 0xae, 0x0a, 0x1a, 0x64, 0x57, + 0x35, 0x4e, 0x6e, 0x0a, 0x0a, 0xef, 0xc1, 0x59, 0x20, 0x19, 0x79, 0xdc, 0xe6, 0x32, 0x9b, 0x9a, + 0x45, 0x0a, 0x1a, 0xa4, 0x53, 0x60, 0xdd, 0xaf, 0x62, 0x00, 0x7b, 0x42, 0xb2, 0xac, 0x4a, 0x6b, + 0x6b, 0x99, 0xd6, 0xdf, 0x77, 0x45, 0xd1, 0x52, 0x8d, 0x4b, 0xb4, 0x16, 0x48, 0xf7, 0xb4, 0xb6, + 0x97, 0x69, 0xd5, 0x98, 0xab, 0x68, 0x15, 0x58, 0x26, 0xe2, 0xb5, 0xa1, 0x55, 0xfa, 0xe0, 0xbd, + 0x82, 0xa6, 0x51, 0x53, 0xbd, 0xde, 0xa9, 0x7a, 0xbd, 0xca, 0x6a, 0x9b, 0xe8, 0x33, 0x76, 0x61, + 0xeb, 0x9a, 0x0a, 0x11, 0xcd, 0xa8, 0xb6, 0xb6, 0x45, 0x8a, 0xeb, 0xc8, 0x86, 0xcd, 0x6b, 0x31, + 0x1b, 0x7e, 0xdd, 0x80, 0x0e, 0xa1, 0x97, 0x95, 0x0f, 0x63, 0x02, 0x7b, 0x2b, 0x12, 0xe1, 0x35, + 0x36, 0x78, 0xeb, 0xb4, 0x55, 0x98, 0x2b, 0xf3, 0xe1, 0x35, 0x1a, 0x7a, 0xeb, 0x84, 0xc1, 0xc7, + 0xd0, 0x2a, 0x17, 0x0b, 0xbb, 0x75, 0xbb, 0xe6, 0xd5, 0x66, 0xfa, 0xe8, 0x10, 0x79, 0xd6, 0xef, + 0xef, 0x7d, 0x34, 0x3a, 0xfc, 0xa4, 0x8a, 0xae, 0xa2, 0x0b, 0x7f, 0xca, 0xae, 0x07, 0xe6, 0xf8, + 0x82, 0xf1, 0xd9, 0xc0, 0xb4, 0x0e, 0xf4, 0xaf, 0x79, 0x30, 0x63, 0xf7, 0xf7, 0xec, 
0xe2, 0xa2, + 0xa9, 0x43, 0x47, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x94, 0x70, 0xdd, 0x7c, 0xe4, 0x05, 0x00, 0x00, } @@ -291,6 +564,40 @@ const _ = grpc.SupportPackageIsVersion4 type RefTransactionClient interface { VoteTransaction(ctx context.Context, in *VoteTransactionRequest, opts ...grpc.CallOption) (*VoteTransactionResponse, error) StopTransaction(ctx context.Context, in *StopTransactionRequest, opts ...grpc.CallOption) (*StopTransactionResponse, error) + // RouteVote allows Praefect to dial to a remote Gitaly and request + // intercepting VoteTransaction and StopTransaction calls made for a specific + // route UUID. For example, given a route UUID of 5: + // + // ┌────────┐ ┌──────┐ ┌────┐ + // │Praefect│ │Gitaly│ │Hook│ + // └───┬────┘ └──┬───┘ └─┬──┘ + // │ RouteVote open route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 opened │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ │ VoteTransaction for 5 │ + // │ │ <──────────────────────── + // │ │ │ + // │ Forward VoteTransactionRequest for 5 │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ Forward VoteTransactionResponse for 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ │ Response │ + // │ │ ────────────────────────> + // │ │ │ + // │ RouteVote close route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 closed │ │ + // │ <──────────────────────────────────────── │ + // ┌───┴────┐ ┌──┴───┐ ┌─┴──┐ + // │Praefect│ │Gitaly│ │Hook│ + // └────────┘ └──────┘ └────┘ + RouteVote(ctx context.Context, opts ...grpc.CallOption) (RefTransaction_RouteVoteClient, error) } type refTransactionClient struct { @@ -319,10 +626,75 @@ func (c *refTransactionClient) StopTransaction(ctx context.Context, in *StopTran return out, nil } +func (c *refTransactionClient) RouteVote(ctx context.Context, opts ...grpc.CallOption) (RefTransaction_RouteVoteClient, error) { + stream, err := 
c.cc.NewStream(ctx, &_RefTransaction_serviceDesc.Streams[0], "/gitaly.RefTransaction/RouteVote", opts...) + if err != nil { + return nil, err + } + x := &refTransactionRouteVoteClient{stream} + return x, nil +} + +type RefTransaction_RouteVoteClient interface { + Send(*RouteVoteRequest) error + Recv() (*RouteVoteRequest, error) + grpc.ClientStream +} + +type refTransactionRouteVoteClient struct { + grpc.ClientStream +} + +func (x *refTransactionRouteVoteClient) Send(m *RouteVoteRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *refTransactionRouteVoteClient) Recv() (*RouteVoteRequest, error) { + m := new(RouteVoteRequest) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // RefTransactionServer is the server API for RefTransaction service. type RefTransactionServer interface { VoteTransaction(context.Context, *VoteTransactionRequest) (*VoteTransactionResponse, error) StopTransaction(context.Context, *StopTransactionRequest) (*StopTransactionResponse, error) + // RouteVote allows Praefect to dial to a remote Gitaly and request + // intercepting VoteTransaction and StopTransaction calls made for a specific + // route UUID. 
For example, given a route UUID of 5: + // + // ┌────────┐ ┌──────┐ ┌────┐ + // │Praefect│ │Gitaly│ │Hook│ + // └───┬────┘ └──┬───┘ └─┬──┘ + // │ RouteVote open route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 opened │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ │ VoteTransaction for 5 │ + // │ │ <──────────────────────── + // │ │ │ + // │ Forward VoteTransactionRequest for 5 │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ Forward VoteTransactionResponse for 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ │ Response │ + // │ │ ────────────────────────> + // │ │ │ + // │ RouteVote close route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 closed │ │ + // │ <──────────────────────────────────────── │ + // ┌───┴────┐ ┌──┴───┐ ┌─┴──┐ + // │Praefect│ │Gitaly│ │Hook│ + // └────────┘ └──────┘ └────┘ + RouteVote(RefTransaction_RouteVoteServer) error } // UnimplementedRefTransactionServer can be embedded to have forward compatible implementations. 
@@ -335,6 +707,9 @@ func (*UnimplementedRefTransactionServer) VoteTransaction(ctx context.Context, r func (*UnimplementedRefTransactionServer) StopTransaction(ctx context.Context, req *StopTransactionRequest) (*StopTransactionResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method StopTransaction not implemented") } +func (*UnimplementedRefTransactionServer) RouteVote(srv RefTransaction_RouteVoteServer) error { + return status.Errorf(codes.Unimplemented, "method RouteVote not implemented") +} func RegisterRefTransactionServer(s *grpc.Server, srv RefTransactionServer) { s.RegisterService(&_RefTransaction_serviceDesc, srv) @@ -376,6 +751,32 @@ func _RefTransaction_StopTransaction_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } +func _RefTransaction_RouteVote_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(RefTransactionServer).RouteVote(&refTransactionRouteVoteServer{stream}) +} + +type RefTransaction_RouteVoteServer interface { + Send(*RouteVoteRequest) error + Recv() (*RouteVoteRequest, error) + grpc.ServerStream +} + +type refTransactionRouteVoteServer struct { + grpc.ServerStream +} + +func (x *refTransactionRouteVoteServer) Send(m *RouteVoteRequest) error { + return x.ServerStream.SendMsg(m) +} + +func (x *refTransactionRouteVoteServer) Recv() (*RouteVoteRequest, error) { + m := new(RouteVoteRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + var _RefTransaction_serviceDesc = grpc.ServiceDesc{ ServiceName: "gitaly.RefTransaction", HandlerType: (*RefTransactionServer)(nil), @@ -389,6 +790,13 @@ var _RefTransaction_serviceDesc = grpc.ServiceDesc{ Handler: _RefTransaction_StopTransaction_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "RouteVote", + Handler: _RefTransaction_RouteVote_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "transaction.proto", } diff 
--git a/proto/transaction.proto b/proto/transaction.proto index fb11879c46fd714fcc84cd9cca99eb5f48be46ce..f8da506a233b7e5659e73e83a4270b7d5dd801e6 100644 --- a/proto/transaction.proto +++ b/proto/transaction.proto @@ -8,21 +8,43 @@ import "lint.proto"; import "shared.proto"; service RefTransaction { - - rpc VoteTransaction (VoteTransactionRequest) returns (VoteTransactionResponse) { - option (op_type) = { - op: MUTATOR - scope_level: REPOSITORY - }; - } - - rpc StopTransaction (StopTransactionRequest) returns (StopTransactionResponse) { - option (op_type) = { - op: MUTATOR - scope_level: REPOSITORY - }; - } - + option (intercepted) = true; + rpc VoteTransaction (VoteTransactionRequest) returns (VoteTransactionResponse); + rpc StopTransaction (StopTransactionRequest) returns (StopTransactionResponse); + // RouteVote allows Praefect to dial to a remote Gitaly and request + // intercepting VoteTransaction and StopTransaction calls made for a specific + // route UUID. For example, given a route UUID of 5: + // + // ┌────────┐ ┌──────┐ ┌────┐ + // │Praefect│ │Gitaly│ │Hook│ + // └───┬────┘ └──┬───┘ └─┬──┘ + // │ RouteVote open route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 opened │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ │ VoteTransaction for 5 │ + // │ │ <──────────────────────── + // │ │ │ + // │ Forward VoteTransactionRequest for 5 │ │ + // │ <──────────────────────────────────────── │ + // │ │ │ + // │ Forward VoteTransactionResponse for 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ │ Response │ + // │ │ ────────────────────────> + // │ │ │ + // │ RouteVote close route 5 │ │ + // │ ────────────────────────────────────────> │ + // │ │ │ + // │ Route 5 closed │ │ + // │ <──────────────────────────────────────── │ + // ┌───┴────┐ ┌──┴───┐ ┌─┴──┐ + // │Praefect│ │Gitaly│ │Hook│ + // └────────┘ └──────┘ └────┘ + rpc RouteVote (stream RouteVoteRequest) returns (stream 
RouteVoteRequest); } message VoteTransactionRequest { @@ -33,6 +55,9 @@ message VoteTransactionRequest { string node = 3; // SHA1 of the references that are to be updated bytes reference_updates_hash = 4; + // Route UUID is used when the transaction service on Gitaly is used to + // route messages back to Praefect clients + string route_uuid = 5; } message VoteTransactionResponse { @@ -51,6 +76,45 @@ message StopTransactionRequest { Repository repository = 1[(target_repository)=true]; // ID of the transaction we're processing uint64 transaction_id = 2; + // Route UUID is used when the transaction service on Gitaly is used to + // route messages back to Praefect clients + string route_uuid = 5; } message StopTransactionResponse {} + +// RouteVoteRequest wraps Gitaly messages so that they can +// be properly routed between Praefects. It also allows Praefect to manage +// transactions. +message RouteVoteRequest{ + // The route UUID allows Gitaly messages to be routed to the correct + // Praefect client + string route_uuid = 1; + + // OpenRoute is sent from Praefect to Gitaly to open a new route + // session. All transactions requests from Gitaly with the specified ID will + // route to the Praefect that opened the session. Only one route can be + // opened per stream. Closing the stream will also close the route. 
+ message OpenRoute {} + + // Status is copy of google.rpc.Status, which represents errors in gRPC + message Status { + int32 code = 1; + string message = 2; + } + + oneof msg { + // These messages are used by Praefect to manage route sessions + OpenRoute open_route_request = 2; + + // error can be sent from Praefect to Gitaly as a response to + // any routed RPC + Status error = 3; + + // These messages are wrapped to allow Gitaly RPCs to be routed + VoteTransactionRequest vote_tx_request = 5; + VoteTransactionResponse vote_tx_response = 6; + StopTransactionRequest stop_tx_request = 7; + StopTransactionResponse stop_tx_response = 8; + } +} diff --git a/ruby/proto/gitaly/transaction_pb.rb b/ruby/proto/gitaly/transaction_pb.rb index 59c7f56265c99f6639ca44d3fc23df284ad24f22..f05a633ea2389e24acad5724e97b9fff8645453d 100644 --- a/ruby/proto/gitaly/transaction_pb.rb +++ b/ruby/proto/gitaly/transaction_pb.rb @@ -12,6 +12,7 @@ Google::Protobuf::DescriptorPool.generated_pool.build do optional :transaction_id, :uint64, 2 optional :node, :string, 3 optional :reference_updates_hash, :bytes, 4 + optional :route_uuid, :string, 5 end add_message "gitaly.VoteTransactionResponse" do optional :state, :enum, 1, "gitaly.VoteTransactionResponse.TransactionState" @@ -24,9 +25,27 @@ Google::Protobuf::DescriptorPool.generated_pool.build do add_message "gitaly.StopTransactionRequest" do optional :repository, :message, 1, "gitaly.Repository" optional :transaction_id, :uint64, 2 + optional :route_uuid, :string, 5 end add_message "gitaly.StopTransactionResponse" do end + add_message "gitaly.RouteVoteRequest" do + optional :route_uuid, :string, 1 + oneof :msg do + optional :open_route_request, :message, 2, "gitaly.RouteVoteRequest.OpenRoute" + optional :error, :message, 3, "gitaly.RouteVoteRequest.Status" + optional :vote_tx_request, :message, 5, "gitaly.VoteTransactionRequest" + optional :vote_tx_response, :message, 6, "gitaly.VoteTransactionResponse" + optional :stop_tx_request, :message, 
7, "gitaly.StopTransactionRequest" + optional :stop_tx_response, :message, 8, "gitaly.StopTransactionResponse" + end + end + add_message "gitaly.RouteVoteRequest.OpenRoute" do + end + add_message "gitaly.RouteVoteRequest.Status" do + optional :code, :int32, 1 + optional :message, :string, 2 + end end end @@ -36,4 +55,7 @@ module Gitaly VoteTransactionResponse::TransactionState = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.VoteTransactionResponse.TransactionState").enummodule StopTransactionRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.StopTransactionRequest").msgclass StopTransactionResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.StopTransactionResponse").msgclass + RouteVoteRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RouteVoteRequest").msgclass + RouteVoteRequest::OpenRoute = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RouteVoteRequest.OpenRoute").msgclass + RouteVoteRequest::Status = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("gitaly.RouteVoteRequest.Status").msgclass end diff --git a/ruby/proto/gitaly/transaction_services_pb.rb b/ruby/proto/gitaly/transaction_services_pb.rb index 1749482460c255ced55632202936545a8d18f457..79eafe17bab44cc79531da68e5b720e5512b5f46 100644 --- a/ruby/proto/gitaly/transaction_services_pb.rb +++ b/ruby/proto/gitaly/transaction_services_pb.rb @@ -16,6 +16,40 @@ module Gitaly rpc :VoteTransaction, Gitaly::VoteTransactionRequest, Gitaly::VoteTransactionResponse rpc :StopTransaction, Gitaly::StopTransactionRequest, Gitaly::StopTransactionResponse + # RouteVote allows Praefect to dial to a remote Gitaly and request + # intercepting VoteTransaction and StopTransaction calls made for a specific + # route UUID. 
For example, given a route UUID of 5: + # + # ┌────────┐ ┌──────┐ ┌────┐ + # │Praefect│ │Gitaly│ │Hook│ + # └───┬────┘ └──┬───┘ └─┬──┘ + # │ RouteVote open route 5 │ │ + # │ ────────────────────────────────────────> │ + # │ │ │ + # │ Route 5 opened │ │ + # │ <──────────────────────────────────────── │ + # │ │ │ + # │ │ VoteTransaction for 5 │ + # │ │ <──────────────────────── + # │ │ │ + # │ Forward VoteTransactionRequest for 5 │ │ + # │ <──────────────────────────────────────── │ + # │ │ │ + # │ Forward VoteTransactionResponse for 5 │ │ + # │ ────────────────────────────────────────> │ + # │ │ │ + # │ │ Response │ + # │ │ ────────────────────────> + # │ │ │ + # │ RouteVote close route 5 │ │ + # │ ────────────────────────────────────────> │ + # │ │ │ + # │ Route 5 closed │ │ + # │ <──────────────────────────────────────── │ + # ┌───┴────┐ ┌──┴───┐ ┌─┴──┐ + # │Praefect│ │Gitaly│ │Hook│ + # └────────┘ └──────┘ └────┘ + rpc :RouteVote, stream(Gitaly::RouteVoteRequest), stream(Gitaly::RouteVoteRequest) end Stub = Service.rpc_stub_class