diff --git a/internal/git/localrepo/refs.go b/internal/git/localrepo/refs.go index 224bd407c585b5869e780deba5cb3eef623a24df..d88f95911ade385fadc63f30ffbc91f2928816a8 100644 --- a/internal/git/localrepo/refs.go +++ b/internal/git/localrepo/refs.go @@ -337,7 +337,7 @@ func (repo *Repo) GetDefaultBranch(ctx context.Context) (git.ReferenceName, erro return branches[0].Name, nil } - headReference, err := repo.headReference(ctx) + headReference, err := repo.HeadReference(ctx) if err != nil { return "", err } @@ -371,7 +371,8 @@ func (repo *Repo) GetDefaultBranch(ctx context.Context) (git.ReferenceName, erro return branches[0].Name, nil } -func (repo *Repo) headReference(ctx context.Context) (git.ReferenceName, error) { +// HeadReference returns the current value of HEAD. +func (repo *Repo) HeadReference(ctx context.Context) (git.ReferenceName, error) { cmd, err := repo.Exec(ctx, git.Command{ Name: "symbolic-ref", Args: []string{"HEAD"}, diff --git a/internal/git/localrepo/refs_test.go b/internal/git/localrepo/refs_test.go index da98d5cba22e53488d79371efadeaeebab305dfd..df9cb9bca092fc4c732469b3d335662553973d8a 100644 --- a/internal/git/localrepo/refs_test.go +++ b/internal/git/localrepo/refs_test.go @@ -545,6 +545,22 @@ func TestRepo_SetDefaultBranch(t *testing.T) { } } +func TestRepo_HeadReference(t *testing.T) { + ctx := testhelper.Context(t) + _, repo, _ := setupRepo(t) + + referenceName, err := repo.HeadReference(ctx) + require.NoError(t, err) + require.Equal(t, git.DefaultRef, referenceName) + + newDefaultBranch := git.ReferenceName("refs/heads/non-existent") + require.NoError(t, repo.SetDefaultBranch(ctx, &transaction.MockManager{}, newDefaultBranch)) + + referenceName, err = repo.HeadReference(ctx) + require.NoError(t, err) + require.Equal(t, newDefaultBranch, referenceName) +} + type blockingManager struct { ch chan struct{} } diff --git a/internal/git/localrepo/remote_extra_test.go b/internal/git/localrepo/remote_extra_test.go index 69772479c09d6ecc17d23c5686bf4066d294a1f7..5435abdc969be81c838d5752c64aabf9b9ae1a67 100644 --- a/internal/git/localrepo/remote_extra_test.go +++ b/internal/git/localrepo/remote_extra_test.go @@ -46,6 +46,7 @@ func TestRepo_FetchInternal(t *testing.T) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, testserver.WithGitCommandFactory(protocolDetectingFactory)) diff --git a/internal/git/remoterepo/testhelper_test.go b/internal/git/remoterepo/testhelper_test.go index 9e964548728dd084ede779ccfe4538b67275cf7a..845a5fc6dc96d1a942bdfa375a3f0a74bef779df 100644 --- a/internal/git/remoterepo/testhelper_test.go +++ b/internal/git/remoterepo/testhelper_test.go @@ -32,6 +32,7 @@ func setupGitalyServer(t *testing.T) config.Cfg { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterCommitServiceServer(srv, commit.NewServer( deps.GetCfg(), diff --git a/internal/gitaly/partition_manager.go b/internal/gitaly/partition_manager.go index 95650a405b01e4dfecc7085fedaebe5b06d3db98..a558dd9e4fd5f13fe2d5e3fb141e14aa46f7847c 100644 --- a/internal/gitaly/partition_manager.go +++ b/internal/gitaly/partition_manager.go @@ -15,6 +15,7 @@ import ( repo "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -27,6 +28,8 @@ var ErrPartitionManagerStopped = errors.New("partition manager stopped") type PartitionManager struct { // storages are the storages configured in this Gitaly server. The map is keyed by the storage name. storages map[string]*storageManager + // voteManager casts transactional votes to Praefect. + voteManager transaction.Manager } // storageManager represents a single storage. @@ -108,7 +111,7 @@ func (ptn *partition) stop() { } // NewPartitionManager returns a new PartitionManager. -func NewPartitionManager(configuredStorages []config.Storage, localRepoFactory localrepo.Factory, logger logrus.FieldLogger) (*PartitionManager, error) { +func NewPartitionManager(configuredStorages []config.Storage, localRepoFactory localrepo.Factory, logger logrus.FieldLogger, voteManager transaction.Manager) (*PartitionManager, error) { storages := make(map[string]*storageManager, len(configuredStorages)) for _, storage := range configuredStorages { repoFactory, err := localRepoFactory.ScopeByStorage(storage.Name) @@ -151,7 +154,10 @@ func NewPartitionManager(configuredStorages []config.Storage, localRepoFactory l } } - return &PartitionManager{storages: storages}, nil + return &PartitionManager{ + storages: storages, + voteManager: voteManager, + }, nil } func stagingDirectoryPath(storagePath string) string { @@ -191,7 +197,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran return nil, fmt.Errorf("create staging directory: %w", err) } - mgr := NewTransactionManager(storageMgr.database, storageMgr.path, relativePath, stagingDir, storageMgr.repoFactory, storageMgr.transactionFinalizerFactory(ptn)) + mgr := NewTransactionManager(storageMgr.database, storageMgr.path, relativePath, stagingDir, storageMgr.repoFactory, storageMgr.transactionFinalizerFactory(ptn), pm.voteManager) ptn.transactionManager = mgr storageMgr.partitions[relativePath] = ptn diff --git a/internal/gitaly/partition_manager_test.go b/internal/gitaly/partition_manager_test.go index 183cf813f699229f8613a34efede48033008b47c..758ac740b9bd274edc8f1be3b183131658caae6d 100644 --- a/internal/gitaly/partition_manager_test.go +++ b/internal/gitaly/partition_manager_test.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" repo "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" @@ -563,7 +564,7 @@ func TestPartitionManager(t *testing.T) { ) } - partitionManager, err := NewPartitionManager(cfg.Storages, localRepoFactory, logrus.StandardLogger()) + partitionManager, err := NewPartitionManager(cfg.Storages, localRepoFactory, logrus.StandardLogger(), &transaction.MockManager{}) require.NoError(t, err) defer func() { partitionManager.Stop() diff --git a/internal/gitaly/service/blob/testhelper_test.go b/internal/gitaly/service/blob/testhelper_test.go index eb34361fee8a14dc2ec664e38c8e04ba9aad5ce0..da0ee1673f6a818e45de14decc03cc9996cecd8e 100644 --- a/internal/gitaly/service/blob/testhelper_test.go +++ b/internal/gitaly/service/blob/testhelper_test.go @@ -38,6 +38,7 @@ func setup(tb testing.TB, ctx context.Context) (config.Cfg, gitalypb.BlobService deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }) cfg.SocketPath = addr diff --git a/internal/gitaly/service/cleanup/testhelper_test.go b/internal/gitaly/service/cleanup/testhelper_test.go index 69204c6c1f85ee624feb532bc4897db4c0b64b09..b198d8e24978f7ce81a97cc79f01951782344357 100644 --- a/internal/gitaly/service/cleanup/testhelper_test.go +++ b/internal/gitaly/service/cleanup/testhelper_test.go @@ -50,6 +50,7 @@ func runCleanupServiceServer(t *testing.T, cfg config.Cfg) string { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }) } diff --git a/internal/gitaly/service/commit/testhelper_test.go b/internal/gitaly/service/commit/testhelper_test.go index 9428bddd8b811a01c202aa23afa8b9b952ae5bc3..7e5f69f272acea965a3c5162ea754976a0c43e96 100644 --- a/internal/gitaly/service/commit/testhelper_test.go +++ b/internal/gitaly/service/commit/testhelper_test.go @@ -85,6 +85,7 @@ func startTestServices(tb testing.TB, cfg config.Cfg, opts ...testserver.GitalyS deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, opts...) } diff --git a/internal/gitaly/service/conflicts/testhelper_test.go b/internal/gitaly/service/conflicts/testhelper_test.go index 848a4b132c4bf0d599eb2789d0b28efa01748c63..730d64651ef83a20d9248ffe5ba46e7e94e10633 100644 --- a/internal/gitaly/service/conflicts/testhelper_test.go +++ b/internal/gitaly/service/conflicts/testhelper_test.go @@ -70,6 +70,7 @@ func runConflictsServer(tb testing.TB, cfg config.Cfg, hookManager hook.Manager) deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterSSHServiceServer(srv, ssh.NewServer( deps.GetLocator(), diff --git a/internal/gitaly/service/diff/testhelper_test.go b/internal/gitaly/service/diff/testhelper_test.go index 234aae21b91d57c95efe2a309f9d92598e6fbcb2..894876641c8e03a8654ff3525ea49fb665190eae 100644 --- a/internal/gitaly/service/diff/testhelper_test.go +++ b/internal/gitaly/service/diff/testhelper_test.go @@ -49,6 +49,7 @@ func setupDiffServiceWithoutRepo(tb testing.TB, opt ...testserver.GitalyServerOp deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, opt...) cfg.SocketPath = addr diff --git a/internal/gitaly/service/hook/testhelper_test.go b/internal/gitaly/service/hook/testhelper_test.go index f6ab54d5a64bd0c266d3db23783c2579795427ca..0ae9e57348e73c487737a025b420f679facc6488 100644 --- a/internal/gitaly/service/hook/testhelper_test.go +++ b/internal/gitaly/service/hook/testhelper_test.go @@ -81,6 +81,7 @@ func runHooksServer(tb testing.TB, cfg config.Cfg, opts []serverOption, serverOp deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, serverOpts...) } diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go index 6e9a1133a8ae15c11532da322e5b62fbe1d3c471..26c77eef5b2b65f9668fc950e0ff169f19b1b72b 100644 --- a/internal/gitaly/service/objectpool/testhelper_test.go +++ b/internal/gitaly/service/objectpool/testhelper_test.go @@ -83,6 +83,7 @@ func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator, deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, append(opts, testserver.WithLocator(locator), testserver.WithLogger(logger))...) } diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go index 5549739e8bbb965b92160371f7b059a834e36071..7a03dc9f02c69175d09681dfac04a9b9da0969ce 100644 --- a/internal/gitaly/service/operations/testhelper_test.go +++ b/internal/gitaly/service/operations/testhelper_test.go @@ -112,6 +112,7 @@ func runOperationServiceServer(tb testing.TB, cfg config.Cfg, options ...testser deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterRefServiceServer(srv, ref.NewServer( deps.GetLocator(), diff --git a/internal/gitaly/service/ref/delete_refs_test.go b/internal/gitaly/service/ref/delete_refs_test.go index 831c21663de63c9112980914c39247f90485941f..aa6141489475357a79aa153108cf4adc9613dfb9 100644 --- a/internal/gitaly/service/ref/delete_refs_test.go +++ b/internal/gitaly/service/ref/delete_refs_test.go @@ -118,6 +118,7 @@ func TestDeleteRefs_transaction(t *testing.T) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) }, testserver.WithTransactionManager(txManager)) diff --git a/internal/gitaly/service/ref/testhelper_test.go b/internal/gitaly/service/ref/testhelper_test.go index c4d72d41870e16506a7ee21c90464951b82afb1b..31edcfeeed0ad53e3b34804883a5684ff5d05ce0 100644 --- a/internal/gitaly/service/ref/testhelper_test.go +++ b/internal/gitaly/service/ref/testhelper_test.go @@ -75,6 +75,7 @@ func runRefServiceServer(tb testing.TB, cfg config.Cfg) string { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }) } diff --git a/internal/gitaly/service/remote/testhelper_test.go b/internal/gitaly/service/remote/testhelper_test.go index d1c03eeca4278a9dace287f0c74c09f9bb94309b..e132da125afe96f994e3bd2856a565a428eb20c8 100644 --- a/internal/gitaly/service/remote/testhelper_test.go +++ b/internal/gitaly/service/remote/testhelper_test.go @@ -41,6 +41,7 @@ func setupRemoteService(t *testing.T, ctx context.Context, opts ...testserver.Gi deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, opts...) cfg.SocketPath = addr diff --git a/internal/gitaly/service/remote/update_remote_mirror_test.go b/internal/gitaly/service/remote/update_remote_mirror_test.go index 897f539858b9c9eaa8a4c69e3268df82b1dd3f11..7eada23e162bea225c74fe7e986313e537197ba0 100644 --- a/internal/gitaly/service/remote/update_remote_mirror_test.go +++ b/internal/gitaly/service/remote/update_remote_mirror_test.go @@ -589,6 +589,7 @@ func TestUpdateRemoteMirror(t *testing.T) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }) cfg.SocketPath = addr diff --git a/internal/gitaly/service/repository/server.go b/internal/gitaly/service/repository/server.go index 6b3f8821df9111a0580be47941779805134987dd..3f5176832d3cc9d0e2f9482ab872990ab67dd036 100644 --- a/internal/gitaly/service/repository/server.go +++ b/internal/gitaly/service/repository/server.go @@ -11,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" @@ -30,6 +31,7 @@ type server struct { catfileCache catfile.Cache git2goExecutor *git2go.Executor housekeepingManager housekeeping.Manager + partitionManager *gitaly.PartitionManager licenseCache *unarycache.Cache[git.ObjectID, *gitalypb.FindLicenseResponse] } @@ -44,6 +46,7 @@ func NewServer( connsPool *client.Pool, git2goExecutor *git2go.Executor, housekeepingManager housekeeping.Manager, + partitionManager *gitaly.PartitionManager, ) gitalypb.RepositoryServiceServer { return &server{ locator: locator, @@ -55,6 +58,7 @@ func NewServer( catfileCache: catfileCache, git2goExecutor: git2goExecutor, housekeepingManager: housekeepingManager, + partitionManager: partitionManager, licenseCache: newLicenseCache(), } diff --git a/internal/gitaly/service/repository/testhelper_test.go b/internal/gitaly/service/repository/testhelper_test.go index a114a9b917002ad8e82a1a10515dccd9c780efdf..cb7d2f0957c91f2801987ebb15edd680c7f5a55c 100644 --- a/internal/gitaly/service/repository/testhelper_test.go +++ b/internal/gitaly/service/repository/testhelper_test.go @@ -66,6 +66,7 @@ func runRepositoryService(tb testing.TB, cfg config.Cfg, opts ...testserver.Gita deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) gitalypb.RegisterRemoteServiceServer(srv, remote.NewServer( diff --git a/internal/gitaly/service/repository/write_ref.go b/internal/gitaly/service/repository/write_ref.go index eeb32561c8bb9eac75638b2611017d4e065f78e2..b37e8d1a393b09a434f79fdd3b22eed8da5d3c67 100644 --- a/internal/gitaly/service/repository/write_ref.go +++ b/internal/gitaly/service/repository/write_ref.go @@ -8,7 +8,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/txutil" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -17,8 +19,15 @@ func (s *server) WriteRef(ctx context.Context, req *gitalypb.WriteRefRequest) (* if err := validateWriteRefRequest(req); err != nil { return nil, structerr.NewInvalidArgument("%w", err) } - if err := s.writeRef(ctx, req); err != nil { - return nil, structerr.NewInternal("%w", err) + + if s.partitionManager != nil { + if err := s.writeRefWAL(ctx, req); err != nil { + return nil, err + } + } else { + if err := s.writeRef(ctx, req); err != nil { + return nil, structerr.NewInternal("%w", err) + } } return &gitalypb.WriteRefResponse{}, nil @@ -38,7 +47,30 @@ func (s *server) writeRef(ctx context.Context, req *gitalypb.WriteRefRequest) er return updateRef(ctx, repo, req) } -func updateRef(ctx context.Context, repo *localrepo.Repo, req *gitalypb.WriteRefRequest) (returnedErr error) { +func (s *server) writeRefWAL(ctx context.Context, req *gitalypb.WriteRefRequest) error { + tx, err := s.partitionManager.Begin(ctx, req.GetRepository()) + if err != nil { + return fmt.Errorf("begin: %w", err) + } + defer txutil.LogRollback(ctx, tx) + + if string(req.Ref) == "HEAD" { + tx.SetDefaultBranch(git.ReferenceName(req.GetRevision())) + } else { + oldOID, newOID, err := resolveObjectIDs(ctx, s.localrepo(req.GetRepository()), req) + if err != nil { + return err + } + + tx.UpdateReferences(gitaly.ReferenceUpdates{ + git.ReferenceName(req.GetRef()): {Force: oldOID == "", OldOID: oldOID, NewOID: newOID}, + }) + } + + return tx.Commit(ctx) +} + +func resolveObjectIDs(ctx context.Context, repo *localrepo.Repo, req *gitalypb.WriteRefRequest) (git.ObjectID, git.ObjectID, error) { var newObjectID git.ObjectID if git.ObjectHashSHA1.IsZeroOID(git.ObjectID(req.GetRevision())) { // Passing the all-zeroes object ID as new value means that we should delete the @@ -52,7 +84,7 @@ func updateRef(ctx context.Context, repo *localrepo.Repo, req *gitalypb.WriteRef var err error newObjectID, err = repo.ResolveRevision(ctx, git.Revision(req.GetRevision())+"^{object}") if err != nil { - return fmt.Errorf("resolving new revision: %w", err) + return "", "", fmt.Errorf("resolving new revision: %w", err) } } @@ -66,11 +98,20 @@ func updateRef(ctx context.Context, repo *localrepo.Repo, req *gitalypb.WriteRef var err error oldObjectID, err = repo.ResolveRevision(ctx, git.Revision(req.GetOldRevision())+"^{object}") if err != nil { - return fmt.Errorf("resolving old revision: %w", err) + return "", "", fmt.Errorf("resolving old revision: %w", err) } } } + return oldObjectID, newObjectID, nil +} + +func updateRef(ctx context.Context, repo *localrepo.Repo, req *gitalypb.WriteRefRequest) (returnedErr error) { + oldObjectID, newObjectID, err := resolveObjectIDs(ctx, repo, req) + if err != nil { + return err + } + u, err := updateref.New(ctx, repo) if err != nil { return fmt.Errorf("error when running creating new updater: %w", err) diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go index 50f0482c19c64de2e63a7c6aa08a3453c01a81db..93e6cb7749fa468dc9bbdfe3cbb4b236d6f523f1 100644 --- a/internal/gitaly/service/setup/register.go +++ b/internal/gitaly/service/setup/register.go @@ -99,6 +99,7 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterSSHServiceServer(srv, ssh.NewServer( deps.GetLocator(), diff --git a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go index 124ff76fcc5b2128bdb6d37d9a6b623ca55437d1..b1b31b45172c963bcfd5b50344184f553dae2228 100644 --- a/internal/gitaly/service/smarthttp/testhelper_test.go +++ b/internal/gitaly/service/smarthttp/testhelper_test.go @@ -46,6 +46,7 @@ func startSmartHTTPServerWithOptions(t *testing.T, cfg config.Cfg, opts []Server deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterObjectPoolServiceServer(srv, objectpool.NewServer( deps.GetLocator(), diff --git a/internal/gitaly/service/ssh/testhelper_test.go b/internal/gitaly/service/ssh/testhelper_test.go index c91b4a991909a09b6fc75afeaba9670cdce0c3fa..4c26bf62643a8b14ae50765557892d54e23c4bfa 100644 --- a/internal/gitaly/service/ssh/testhelper_test.go +++ b/internal/gitaly/service/ssh/testhelper_test.go @@ -51,6 +51,7 @@ func startSSHServerWithOptions(t *testing.T, cfg config.Cfg, opts []ServerOpt, s deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterObjectPoolServiceServer(srv, objectpool.NewServer( deps.GetLocator(), diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 4e2060dcf8b7a5084568335b76aeb8e7551717ef..4573f12b3165ff7fbe4226bdd15f4e112daba6a6 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -25,6 +25,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" @@ -179,6 +180,10 @@ type Transaction struct { defaultBranchUpdate *DefaultBranchUpdate customHooksUpdate *CustomHooksUpdate deleteRepository bool + // requestContext is the context from goroutine that begun the transaction. This is the context that + // contains Praefect's transaction information if it exists. It's only capture the transaction + // information into the TransactionManager and should not be used for anything else. + requestContext context.Context } // Begin opens a new transaction. The caller must call either Commit or Rollback to release @@ -212,7 +217,8 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (_ *Transaction, retur CustomHookIndex: mgr.customHookIndex, CustomHookPath: customHookPathForLogIndex(mgr.repositoryPath, mgr.customHookIndex), }, - finished: make(chan struct{}), + finished: make(chan struct{}), + requestContext: ctx, } // If there are no custom hooks stored through the WAL yet, then default to the custom hooks @@ -473,6 +479,9 @@ type TransactionManager struct { // the repository. It's keyed by the log index the transaction is waiting to be applied and the // value is the resultChannel that is waiting the result. awaitingTransactions map[LogIndex]resultChannel + + // voteManager casts transactional votes to Praefect. + voteManager transaction.Manager } // repository is the localrepo interface used by TransactionManager. @@ -484,10 +493,11 @@ type repository interface { Quarantine(string) (*localrepo.Repo, error) WalkUnreachableObjects(context.Context, io.Reader, io.Writer) error PackObjects(context.Context, io.Reader, io.Writer) error + HeadReference(context.Context) (git.ReferenceName, error) } // NewTransactionManager returns a new TransactionManager for the given repository. -func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir string, repositoryFactory localrepo.StorageScopedFactory, transactionFinalizer func()) *TransactionManager { +func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir string, repositoryFactory localrepo.StorageScopedFactory, transactionFinalizer func(), voteManager transaction.Manager) *TransactionManager { ctx, cancel := context.WithCancel(context.Background()) return &TransactionManager{ ctx: ctx, @@ -505,6 +515,7 @@ func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir stagingDirectory: stagingDir, transactionFinalizer: transactionFinalizer, awaitingTransactions: make(map[LogIndex]resultChannel), + voteManager: voteManager, } } @@ -684,7 +695,7 @@ func (mgr *TransactionManager) Run() (returnedErr error) { continue } - if err := mgr.processTransaction(); err != nil { + if err := mgr.processTransaction(mgr.ctx); err != nil { return fmt.Errorf("process transaction: %w", err) } } @@ -692,7 +703,7 @@ func (mgr *TransactionManager) Run() (returnedErr error) { // processTransaction waits for a transaction and processes it by verifying and // logging it. -func (mgr *TransactionManager) processTransaction() (returnedErr error) { +func (mgr *TransactionManager) processTransaction(ctx context.Context) (returnedErr error) { var cleanUps []func() error defer func() { for _, cleanUp := range cleanUps { @@ -774,7 +785,19 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} } - return mgr.appendLogEntry(nextLogIndex, logEntry) + if err := mgr.voteTransaction(ctx, voting.Prepared, transaction, logEntry); err != nil { + return fmt.Errorf("vote prepared: %w", err) + } + + if err := mgr.appendLogEntry(nextLogIndex, logEntry); err != nil { + return fmt.Errorf("append log entry: %w", err) + } + + if err := mgr.voteTransaction(ctx, voting.Committed, transaction, logEntry); err != nil { + return fmt.Errorf("vote committed: %w", err) + } + + return nil }(); err != nil { transaction.result <- err return nil @@ -785,6 +808,62 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { return nil } +// voteTransaction constructs a vote from the changes and casts the vote to Praefect if the request context +// contained a Prafect transaction. The voting is not required by the TransactionManager itself and is purely +// for backwards compatibility with Praefect's transactions when WAL is enabled. +func (mgr *TransactionManager) voteTransaction(ctx context.Context, phase voting.Phase, tx *Transaction, logEntry *gitalypb.LogEntry) error { + hasher := voting.NewVoteHash() + if tx.defaultBranchUpdate != nil { + fmt.Fprintf(hasher, "ref: %s\n", tx.defaultBranchUpdate.Reference) + } + + head, err := tx.stagingRepository.HeadReference(ctx) + if err != nil { + return fmt.Errorf("head reference: %w", err) + } + + for _, loggedUpdate := range logEntry.ReferenceUpdates { + referenceName := git.ReferenceName(loggedUpdate.ReferenceName) + // The logged reference updates have been sorted but don't contain the old OIDs. Use the order + // in the log entry to access the updates so the voting happens in a deterministic order. + update := tx.referenceUpdates[referenceName] + + oldOID := update.OldOID + if update.Force { + // Force updates have a zero OID as the old value in the reference transaction hook. + // Model the same behavior so the votes match. + objectHash, err := mgr.repository.ObjectHash(ctx) + if err != nil { + return fmt.Errorf("object hash: %w", err) + } + + oldOID = objectHash.ZeroOID + } + + fmt.Fprintf(hasher, "%s %s %s\n", oldOID, update.NewOID, referenceName) + if referenceName == head { + // When the reference pointed to by HEAD is updated, the reference transaction hook + // also prints a line updating HEAD. Print out a line for the HEAD here as well to ensure + // the vote matches what the reference transaction hook would produce. + // + // This is seemingly a bug as the reference transaction hook has been documented to + // not cover symbolic references: https://git-scm.com/docs/githooks#_reference_transaction + fmt.Fprintf(hasher, "%s %s %s\n", oldOID, update.NewOID, "HEAD") + } + } + + vote, err := hasher.Vote() + if err != nil { + return fmt.Errorf("vote from hash: %w", err) + } + + if err := transaction.VoteOnContext(tx.requestContext, mgr.voteManager, vote, phase); err != nil { + return fmt.Errorf("vote on context: %w", err) + } + + return nil +} + // Stop stops the transaction processing causing Run to return. func (mgr *TransactionManager) Stop() { mgr.stop() } @@ -1078,7 +1157,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction }) } - // Sort the reference updates so the reference changes are always logged in a deterministic order. + // Sort the reference updates so the reference changes are always logged and voted in a deterministic order. sort.Slice(referenceUpdates, func(i, j int) bool { return bytes.Compare( referenceUpdates[i].ReferenceName, diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index 92eb248d3be87365a0fc84f5777bf3173872b747..5881f18c836f79a92e153076be0058d95d570e6b 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -25,6 +25,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -2855,7 +2856,7 @@ func TestTransactionManager(t *testing.T) { // managerRunning tracks whether the manager is running or stopped. managerRunning bool // transactionManager is the current TransactionManager instance. - transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer) + transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer, &transaction.MockManager{}) // managerErr is used for synchronizing manager stopping and returning // the error from Run. managerErr chan error @@ -2896,7 +2897,7 @@ func TestTransactionManager(t *testing.T) { managerRunning = true managerErr = make(chan error) - transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer) + transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer, &transaction.MockManager{}) installHooks(t, transactionManager, database, hooks{ beforeReadLogEntry: step.Hooks.BeforeApplyLogEntry, beforeResolveRevision: step.Hooks.BeforeAppendLogEntry, @@ -3238,7 +3239,7 @@ func BenchmarkTransactionManager(b *testing.B) { commit1 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents()) commit2 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents(commit1)) - manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), repositoryFactory, noopTransactionFinalizer) + manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), repositoryFactory, noopTransactionFinalizer, &transaction.MockManager{}) managers = append(managers, manager) managerWG.Add(1) diff --git a/internal/gitaly/txutil/rollback.go b/internal/gitaly/txutil/rollback.go new file mode 100644 index 0000000000000000000000000000000000000000..6ee58715c8104366b04ad384673844a7077d1bf8 --- /dev/null +++ b/internal/gitaly/txutil/rollback.go @@ -0,0 +1,14 @@ +package txutil + +import ( + "context" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" +) + +// LogRollback rolls back the transaction and logs any possible error. +func LogRollback(ctx context.Context, tx interface{ Rollback() error }) { + if err := tx.Rollback(); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("failed rolling back transaction") + } +} diff --git a/internal/gitaly/txutil/rollback_test.go b/internal/gitaly/txutil/rollback_test.go new file mode 100644 index 0000000000000000000000000000000000000000..52e64c556199fd002811d20b941c97589135590c --- /dev/null +++ b/internal/gitaly/txutil/rollback_test.go @@ -0,0 +1,40 @@ +package txutil + +import ( + "errors" + "testing" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +type rollbackerFunc func() error + +func (fn rollbackerFunc) Rollback() error { return fn() } + +func TestLogRollback(t *testing.T) { + t.Run("no error", func(t *testing.T) { + logger, hook := test.NewNullLogger() + ctx := ctxlogrus.ToContext(testhelper.Context(t), logrus.NewEntry(logger)) + + LogRollback(ctx, rollbackerFunc(func() error { return nil })) + + require.Empty(t, hook.AllEntries()) + }) + + t.Run("error", func(t *testing.T) { + logger, hook := test.NewNullLogger() + ctx := ctxlogrus.ToContext(testhelper.Context(t), logrus.NewEntry(logger)) + + expectedErr := errors.New("expected error") + LogRollback(ctx, rollbackerFunc(func() error { return expectedErr })) + + require.Len(t, hook.AllEntries(), 1) + require.Equal(t, "failed rolling back transaction", hook.LastEntry().Message) + require.Equal(t, logrus.Fields{logrus.ErrorKey: expectedErr}, hook.LastEntry().Data) + require.Equal(t, logrus.ErrorLevel, hook.LastEntry().Level) + }) +} diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index e1f989149914fafa527ec25b4c803b9da6923259..1838b5610e3c080c320da7217641577bf1785fc7 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -47,6 +47,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, testserver.WithDisablePraefect()) cfgNodes = append(cfgNodes, &config.Node{ diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go index f7938c34a35c60546b8e8658a239f840ba16c423..478c6b21087c783ba1e161c54b18c6840ac2d134 100644 --- a/internal/praefect/verifier_test.go +++ b/internal/praefect/verifier_test.go @@ -486,6 +486,7 @@ func TestVerifier(t *testing.T) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )}) } } diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 1447447c0a096ef46e668aa31f52b4d0611089c4..feca491bde308ee64c7427c919834fc1cc3bc265 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -356,6 +356,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * cfg.Storages, localrepo.NewFactory(gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), gsd.logger, + gsd.txMgr, ) require.NoError(tb, err) tb.Cleanup(partitionManager.Stop)