From cda429728f35a067c03a948cab09459db0eb59b2 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Sat, 20 May 2023 12:19:09 +0300 Subject: [PATCH 1/5] Add a helper for logging rollback errors Rolling back a transaction may return an error. This commit adds a helper function that rolls back the transaction and logs any possible error returned. This makes integrating the WAL more convenient as one can simply defer this after beginning a transaction to roll it back in case of errors and log the error. --- internal/gitaly/txutil/rollback.go | 14 +++++++++ internal/gitaly/txutil/rollback_test.go | 40 +++++++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 internal/gitaly/txutil/rollback.go create mode 100644 internal/gitaly/txutil/rollback_test.go diff --git a/internal/gitaly/txutil/rollback.go b/internal/gitaly/txutil/rollback.go new file mode 100644 index 0000000000..6ee58715c8 --- /dev/null +++ b/internal/gitaly/txutil/rollback.go @@ -0,0 +1,14 @@ +package txutil + +import ( + "context" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" +) + +// LogRollback rolls back the transaction and logs any possible error. 
+func LogRollback(ctx context.Context, tx interface{ Rollback() error }) { + if err := tx.Rollback(); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("failed rolling back transaction") + } +} diff --git a/internal/gitaly/txutil/rollback_test.go b/internal/gitaly/txutil/rollback_test.go new file mode 100644 index 0000000000..52e64c5561 --- /dev/null +++ b/internal/gitaly/txutil/rollback_test.go @@ -0,0 +1,40 @@ +package txutil + +import ( + "errors" + "testing" + + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +type rollbackerFunc func() error + +func (fn rollbackerFunc) Rollback() error { return fn() } + +func TestLogRollback(t *testing.T) { + t.Run("no error", func(t *testing.T) { + logger, hook := test.NewNullLogger() + ctx := ctxlogrus.ToContext(testhelper.Context(t), logrus.NewEntry(logger)) + + LogRollback(ctx, rollbackerFunc(func() error { return nil })) + + require.Empty(t, hook.AllEntries()) + }) + + t.Run("error", func(t *testing.T) { + logger, hook := test.NewNullLogger() + ctx := ctxlogrus.ToContext(testhelper.Context(t), logrus.NewEntry(logger)) + + expectedErr := errors.New("expected error") + LogRollback(ctx, rollbackerFunc(func() error { return expectedErr })) + + require.Len(t, hook.AllEntries(), 1) + require.Equal(t, "failed rolling back transaction", hook.LastEntry().Message) + require.Equal(t, logrus.Fields{logrus.ErrorKey: expectedErr}, hook.LastEntry().Data) + require.Equal(t, logrus.ErrorLevel, hook.LastEntry().Level) + }) +} -- GitLab From 4368188e8f9b79d8c06cad99502eca30593be558 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Fri, 19 May 2023 12:18:39 +0300 Subject: [PATCH 2/5] Export HeadReference from localrepo localrepo currently exports GetDefaultBranch for getting the repository's default branch. 
This is generally the value of HEAD. GetDefaultBranch also has some legacy logic and returns a different value in some cases. In cases where one really needs to have the actual value of HEAD, let's export the HeadReference method to do so. This will eventually be used in the TransactionManager to determine the value of repository's HEAD. --- internal/git/localrepo/refs.go | 5 +++-- internal/git/localrepo/refs_test.go | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/internal/git/localrepo/refs.go b/internal/git/localrepo/refs.go index 224bd407c5..d88f95911a 100644 --- a/internal/git/localrepo/refs.go +++ b/internal/git/localrepo/refs.go @@ -337,7 +337,7 @@ func (repo *Repo) GetDefaultBranch(ctx context.Context) (git.ReferenceName, erro return branches[0].Name, nil } - headReference, err := repo.headReference(ctx) + headReference, err := repo.HeadReference(ctx) if err != nil { return "", err } @@ -371,7 +371,8 @@ func (repo *Repo) GetDefaultBranch(ctx context.Context) (git.ReferenceName, erro return branches[0].Name, nil } -func (repo *Repo) headReference(ctx context.Context) (git.ReferenceName, error) { +// HeadReference returns the current value of HEAD. 
+func (repo *Repo) HeadReference(ctx context.Context) (git.ReferenceName, error) { cmd, err := repo.Exec(ctx, git.Command{ Name: "symbolic-ref", Args: []string{"HEAD"}, diff --git a/internal/git/localrepo/refs_test.go b/internal/git/localrepo/refs_test.go index da98d5cba2..df9cb9bca0 100644 --- a/internal/git/localrepo/refs_test.go +++ b/internal/git/localrepo/refs_test.go @@ -545,6 +545,22 @@ func TestRepo_SetDefaultBranch(t *testing.T) { } } +func TestRepo_HeadReference(t *testing.T) { + ctx := testhelper.Context(t) + _, repo, _ := setupRepo(t) + + referenceName, err := repo.HeadReference(ctx) + require.NoError(t, err) + require.Equal(t, git.DefaultRef, referenceName) + + newDefaultBranch := git.ReferenceName("refs/heads/non-existent") + require.NoError(t, repo.SetDefaultBranch(ctx, &transaction.MockManager{}, newDefaultBranch)) + + referenceName, err = repo.HeadReference(ctx) + require.NoError(t, err) + require.Equal(t, newDefaultBranch, referenceName) +} + type blockingManager struct { ch chan struct{} } -- GitLab From 32a7138d4b32b722c75637c40e297abd03c34f40 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Fri, 19 May 2023 12:24:38 +0300 Subject: [PATCH 3/5] Implement reference update voting in TransactionManager Praefect's transactions are currently relying on Git's reference-transaction hook to vote on the changes. The hook verifies the references and locks them before voting. When WAL is enabled, this needs to be handled in the TransactionManager instead as Git's reference transaction hook is not part of the write flow anymore. This commit implements voting for reference updates in TransactionManager. The semantics stay the same with the references verified prior to voting and concurrent operations prevented. There is no explicit locking though as the synchronization relies on the TransactionManager processing only a single transaction at a time. This leads to coarser locking. 
Whereas there could previously be multiple transactions voting concurrently on disjoint reference updates, with the current implementation in TransactionManager there can only be one transaction voting at a time. This leads to worse performance but should behaviorally be correct. We can later improve the performance with more fine-grained locking semantics. The focus right now is having a functional implementation that can be integrated and tested. The voting logic performs the vote on the whole transaction which could produce votes that the previous voting logic wouldn't, say if references are updated in the same transaction as the default branch. In practice this doesn't matter, as none of the RPC handlers would commit such a transaction. The voting logic is not tested separately in the TransactionManager's tests. Most importantly we should test that the votes produced by the RPCs are the same with the WAL logic and non-WAL logic. The votes themselves are just opaque hashes, so we can't really say much about whether they are correct or not in isolation. We'll thus test the logic only through integration in the RPC handlers. 
--- internal/gitaly/partition_manager.go | 12 ++- internal/gitaly/partition_manager_test.go | 3 +- internal/gitaly/transaction_manager.go | 91 +++++++++++++++++++-- internal/gitaly/transaction_manager_test.go | 7 +- internal/testhelper/testserver/gitaly.go | 1 + 5 files changed, 101 insertions(+), 13 deletions(-) diff --git a/internal/gitaly/partition_manager.go b/internal/gitaly/partition_manager.go index 95650a405b..a558dd9e4f 100644 --- a/internal/gitaly/partition_manager.go +++ b/internal/gitaly/partition_manager.go @@ -15,6 +15,7 @@ import ( repo "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -27,6 +28,8 @@ var ErrPartitionManagerStopped = errors.New("partition manager stopped") type PartitionManager struct { // storages are the storages configured in this Gitaly server. The map is keyed by the storage name. storages map[string]*storageManager + // voteManager casts transactional votes to Praefect. + voteManager transaction.Manager } // storageManager represents a single storage. @@ -108,7 +111,7 @@ func (ptn *partition) stop() { } // NewPartitionManager returns a new PartitionManager. 
-func NewPartitionManager(configuredStorages []config.Storage, localRepoFactory localrepo.Factory, logger logrus.FieldLogger) (*PartitionManager, error) { +func NewPartitionManager(configuredStorages []config.Storage, localRepoFactory localrepo.Factory, logger logrus.FieldLogger, voteManager transaction.Manager) (*PartitionManager, error) { storages := make(map[string]*storageManager, len(configuredStorages)) for _, storage := range configuredStorages { repoFactory, err := localRepoFactory.ScopeByStorage(storage.Name) @@ -151,7 +154,10 @@ func NewPartitionManager(configuredStorages []config.Storage, localRepoFactory l } } - return &PartitionManager{storages: storages}, nil + return &PartitionManager{ + storages: storages, + voteManager: voteManager, + }, nil } func stagingDirectoryPath(storagePath string) string { @@ -191,7 +197,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran return nil, fmt.Errorf("create staging directory: %w", err) } - mgr := NewTransactionManager(storageMgr.database, storageMgr.path, relativePath, stagingDir, storageMgr.repoFactory, storageMgr.transactionFinalizerFactory(ptn)) + mgr := NewTransactionManager(storageMgr.database, storageMgr.path, relativePath, stagingDir, storageMgr.repoFactory, storageMgr.transactionFinalizerFactory(ptn), pm.voteManager) ptn.transactionManager = mgr storageMgr.partitions[relativePath] = ptn diff --git a/internal/gitaly/partition_manager_test.go b/internal/gitaly/partition_manager_test.go index 183cf813f6..758ac740b9 100644 --- a/internal/gitaly/partition_manager_test.go +++ b/internal/gitaly/partition_manager_test.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" repo "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" 
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" @@ -563,7 +564,7 @@ func TestPartitionManager(t *testing.T) { ) } - partitionManager, err := NewPartitionManager(cfg.Storages, localRepoFactory, logrus.StandardLogger()) + partitionManager, err := NewPartitionManager(cfg.Storages, localRepoFactory, logrus.StandardLogger(), &transaction.MockManager{}) require.NoError(t, err) defer func() { partitionManager.Stop() diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 4e2060dcf8..4573f12b31 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -25,6 +25,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/internal/transaction/voting" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" @@ -179,6 +180,10 @@ type Transaction struct { defaultBranchUpdate *DefaultBranchUpdate customHooksUpdate *CustomHooksUpdate deleteRepository bool + // requestContext is the context from goroutine that begun the transaction. This is the context that + // contains Praefect's transaction information if it exists. It's only capture the transaction + // information into the TransactionManager and should not be used for anything else. + requestContext context.Context } // Begin opens a new transaction. 
The caller must call either Commit or Rollback to release @@ -212,7 +217,8 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (_ *Transaction, retur CustomHookIndex: mgr.customHookIndex, CustomHookPath: customHookPathForLogIndex(mgr.repositoryPath, mgr.customHookIndex), }, - finished: make(chan struct{}), + finished: make(chan struct{}), + requestContext: ctx, } // If there are no custom hooks stored through the WAL yet, then default to the custom hooks @@ -473,6 +479,9 @@ type TransactionManager struct { // the repository. It's keyed by the log index the transaction is waiting to be applied and the // value is the resultChannel that is waiting the result. awaitingTransactions map[LogIndex]resultChannel + + // voteManager casts transactional votes to Praefect. + voteManager transaction.Manager } // repository is the localrepo interface used by TransactionManager. @@ -484,10 +493,11 @@ type repository interface { Quarantine(string) (*localrepo.Repo, error) WalkUnreachableObjects(context.Context, io.Reader, io.Writer) error PackObjects(context.Context, io.Reader, io.Writer) error + HeadReference(context.Context) (git.ReferenceName, error) } // NewTransactionManager returns a new TransactionManager for the given repository. 
-func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir string, repositoryFactory localrepo.StorageScopedFactory, transactionFinalizer func()) *TransactionManager { +func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir string, repositoryFactory localrepo.StorageScopedFactory, transactionFinalizer func(), voteManager transaction.Manager) *TransactionManager { ctx, cancel := context.WithCancel(context.Background()) return &TransactionManager{ ctx: ctx, @@ -505,6 +515,7 @@ func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir stagingDirectory: stagingDir, transactionFinalizer: transactionFinalizer, awaitingTransactions: make(map[LogIndex]resultChannel), + voteManager: voteManager, } } @@ -684,7 +695,7 @@ func (mgr *TransactionManager) Run() (returnedErr error) { continue } - if err := mgr.processTransaction(); err != nil { + if err := mgr.processTransaction(mgr.ctx); err != nil { return fmt.Errorf("process transaction: %w", err) } } @@ -692,7 +703,7 @@ func (mgr *TransactionManager) Run() (returnedErr error) { // processTransaction waits for a transaction and processes it by verifying and // logging it. 
+func (mgr *TransactionManager) processTransaction(ctx context.Context) (returnedErr error) { var cleanUps []func() error defer func() { for _, cleanUp := range cleanUps { @@ -774,7 +785,19 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} } - return mgr.appendLogEntry(nextLogIndex, logEntry) + if err := mgr.voteTransaction(ctx, voting.Prepared, transaction, logEntry); err != nil { + return fmt.Errorf("vote prepared: %w", err) + } + + if err := mgr.appendLogEntry(nextLogIndex, logEntry); err != nil { + return fmt.Errorf("append log entry: %w", err) + } + + if err := mgr.voteTransaction(ctx, voting.Committed, transaction, logEntry); err != nil { + return fmt.Errorf("vote committed: %w", err) + } + + return nil }(); err != nil { transaction.result <- err return nil @@ -785,6 +808,62 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { return nil } +// voteTransaction constructs a vote from the changes and casts the vote to Praefect if the request context +// contained a Praefect transaction. The voting is not required by the TransactionManager itself and is purely +// for backwards compatibility with Praefect's transactions when WAL is enabled. +func (mgr *TransactionManager) voteTransaction(ctx context.Context, phase voting.Phase, tx *Transaction, logEntry *gitalypb.LogEntry) error { + hasher := voting.NewVoteHash() + if tx.defaultBranchUpdate != nil { + fmt.Fprintf(hasher, "ref: %s\n", tx.defaultBranchUpdate.Reference) + } + + head, err := tx.stagingRepository.HeadReference(ctx) + if err != nil { + return fmt.Errorf("head reference: %w", err) + } + + for _, loggedUpdate := range logEntry.ReferenceUpdates { + referenceName := git.ReferenceName(loggedUpdate.ReferenceName) + // The logged reference updates have been sorted but don't contain the old OIDs. 
Use the order + // in the log entry to access the updates so the voting happens in a deterministic order. + update := tx.referenceUpdates[referenceName] + + oldOID := update.OldOID + if update.Force { + // Force updates have a zero OID as the old value in the reference transaction hook. + // Model the same behavior so the votes match. + objectHash, err := mgr.repository.ObjectHash(ctx) + if err != nil { + return fmt.Errorf("object hash: %w", err) + } + + oldOID = objectHash.ZeroOID + } + + fmt.Fprintf(hasher, "%s %s %s\n", oldOID, update.NewOID, referenceName) + if referenceName == head { + // When the reference pointed to by HEAD is updated, the reference transaction hook + // also prints a line updating HEAD. Print out a line for the HEAD here as well to ensure + // the vote matches what the reference transaction hook would produce. + // + // This is seemingly a bug as the reference transaction hook has been documented to + // not cover symbolic references: https://git-scm.com/docs/githooks#_reference_transaction + fmt.Fprintf(hasher, "%s %s %s\n", oldOID, update.NewOID, "HEAD") + } + } + + vote, err := hasher.Vote() + if err != nil { + return fmt.Errorf("vote from hash: %w", err) + } + + if err := transaction.VoteOnContext(tx.requestContext, mgr.voteManager, vote, phase); err != nil { + return fmt.Errorf("vote on context: %w", err) + } + + return nil +} + // Stop stops the transaction processing causing Run to return. func (mgr *TransactionManager) Stop() { mgr.stop() } @@ -1078,7 +1157,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction }) } - // Sort the reference updates so the reference changes are always logged in a deterministic order. + // Sort the reference updates so the reference changes are always logged and voted in a deterministic order. 
sort.Slice(referenceUpdates, func(i, j int) bool { return bytes.Compare( referenceUpdates[i].ReferenceName, diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index 92eb248d3b..5881f18c83 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -25,6 +25,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -2855,7 +2856,7 @@ func TestTransactionManager(t *testing.T) { // managerRunning tracks whether the manager is running or stopped. managerRunning bool // transactionManager is the current TransactionManager instance. - transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer) + transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer, &transaction.MockManager{}) // managerErr is used for synchronizing manager stopping and returning // the error from Run. 
managerErr chan error @@ -2896,7 +2897,7 @@ func TestTransactionManager(t *testing.T) { managerRunning = true managerErr = make(chan error) - transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer) + transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer, &transaction.MockManager{}) installHooks(t, transactionManager, database, hooks{ beforeReadLogEntry: step.Hooks.BeforeApplyLogEntry, beforeResolveRevision: step.Hooks.BeforeAppendLogEntry, @@ -3238,7 +3239,7 @@ func BenchmarkTransactionManager(b *testing.B) { commit1 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents()) commit2 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents(commit1)) - manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), repositoryFactory, noopTransactionFinalizer) + manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), repositoryFactory, noopTransactionFinalizer, &transaction.MockManager{}) managers = append(managers, manager) managerWG.Add(1) diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 1447447c0a..feca491bde 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -356,6 +356,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * cfg.Storages, localrepo.NewFactory(gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), gsd.logger, + gsd.txMgr, ) require.NoError(tb, err) tb.Cleanup(partitionManager.Stop) -- GitLab From aecf8d2a4011d8166493c702c472bab10766f92c Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Wed, 17 May 2023 12:28:02 +0300 Subject: [PATCH 4/5] Wire PartitionManager into repository service in tests This commit wires PartitionManager into repository service in tests. 
PartitionManager is the entry point to the transaction management logic and having it wired up enables integrating and testing the WAL logic. --- internal/git/localrepo/remote_extra_test.go | 1 + internal/git/remoterepo/testhelper_test.go | 1 + internal/gitaly/service/blob/testhelper_test.go | 1 + internal/gitaly/service/cleanup/testhelper_test.go | 1 + internal/gitaly/service/commit/testhelper_test.go | 1 + internal/gitaly/service/conflicts/testhelper_test.go | 1 + internal/gitaly/service/diff/testhelper_test.go | 1 + internal/gitaly/service/hook/testhelper_test.go | 1 + internal/gitaly/service/objectpool/testhelper_test.go | 1 + internal/gitaly/service/operations/testhelper_test.go | 1 + internal/gitaly/service/ref/delete_refs_test.go | 1 + internal/gitaly/service/ref/testhelper_test.go | 1 + internal/gitaly/service/remote/testhelper_test.go | 1 + internal/gitaly/service/remote/update_remote_mirror_test.go | 1 + internal/gitaly/service/repository/server.go | 4 ++++ internal/gitaly/service/repository/testhelper_test.go | 1 + internal/gitaly/service/setup/register.go | 1 + internal/gitaly/service/smarthttp/testhelper_test.go | 1 + internal/gitaly/service/ssh/testhelper_test.go | 1 + internal/praefect/info_service_test.go | 1 + internal/praefect/verifier_test.go | 1 + 21 files changed, 24 insertions(+) diff --git a/internal/git/localrepo/remote_extra_test.go b/internal/git/localrepo/remote_extra_test.go index 69772479c0..5435abdc96 100644 --- a/internal/git/localrepo/remote_extra_test.go +++ b/internal/git/localrepo/remote_extra_test.go @@ -46,6 +46,7 @@ func TestRepo_FetchInternal(t *testing.T) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, testserver.WithGitCommandFactory(protocolDetectingFactory)) diff --git a/internal/git/remoterepo/testhelper_test.go b/internal/git/remoterepo/testhelper_test.go index 9e96454872..845a5fc6dc 100644 --- a/internal/git/remoterepo/testhelper_test.go +++ 
b/internal/git/remoterepo/testhelper_test.go @@ -32,6 +32,7 @@ func setupGitalyServer(t *testing.T) config.Cfg { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterCommitServiceServer(srv, commit.NewServer( deps.GetCfg(), diff --git a/internal/gitaly/service/blob/testhelper_test.go b/internal/gitaly/service/blob/testhelper_test.go index eb34361fee..da0ee1673f 100644 --- a/internal/gitaly/service/blob/testhelper_test.go +++ b/internal/gitaly/service/blob/testhelper_test.go @@ -38,6 +38,7 @@ func setup(tb testing.TB, ctx context.Context) (config.Cfg, gitalypb.BlobService deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }) cfg.SocketPath = addr diff --git a/internal/gitaly/service/cleanup/testhelper_test.go b/internal/gitaly/service/cleanup/testhelper_test.go index 69204c6c1f..b198d8e249 100644 --- a/internal/gitaly/service/cleanup/testhelper_test.go +++ b/internal/gitaly/service/cleanup/testhelper_test.go @@ -50,6 +50,7 @@ func runCleanupServiceServer(t *testing.T, cfg config.Cfg) string { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }) } diff --git a/internal/gitaly/service/commit/testhelper_test.go b/internal/gitaly/service/commit/testhelper_test.go index 9428bddd8b..7e5f69f272 100644 --- a/internal/gitaly/service/commit/testhelper_test.go +++ b/internal/gitaly/service/commit/testhelper_test.go @@ -85,6 +85,7 @@ func startTestServices(tb testing.TB, cfg config.Cfg, opts ...testserver.GitalyS deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, opts...) 
} diff --git a/internal/gitaly/service/conflicts/testhelper_test.go b/internal/gitaly/service/conflicts/testhelper_test.go index 848a4b132c..730d64651e 100644 --- a/internal/gitaly/service/conflicts/testhelper_test.go +++ b/internal/gitaly/service/conflicts/testhelper_test.go @@ -70,6 +70,7 @@ func runConflictsServer(tb testing.TB, cfg config.Cfg, hookManager hook.Manager) deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterSSHServiceServer(srv, ssh.NewServer( deps.GetLocator(), diff --git a/internal/gitaly/service/diff/testhelper_test.go b/internal/gitaly/service/diff/testhelper_test.go index 234aae21b9..894876641c 100644 --- a/internal/gitaly/service/diff/testhelper_test.go +++ b/internal/gitaly/service/diff/testhelper_test.go @@ -49,6 +49,7 @@ func setupDiffServiceWithoutRepo(tb testing.TB, opt ...testserver.GitalyServerOp deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, opt...) cfg.SocketPath = addr diff --git a/internal/gitaly/service/hook/testhelper_test.go b/internal/gitaly/service/hook/testhelper_test.go index f6ab54d5a6..0ae9e57348 100644 --- a/internal/gitaly/service/hook/testhelper_test.go +++ b/internal/gitaly/service/hook/testhelper_test.go @@ -81,6 +81,7 @@ func runHooksServer(tb testing.TB, cfg config.Cfg, opts []serverOption, serverOp deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, serverOpts...) 
} diff --git a/internal/gitaly/service/objectpool/testhelper_test.go b/internal/gitaly/service/objectpool/testhelper_test.go index 6e9a1133a8..26c77eef5b 100644 --- a/internal/gitaly/service/objectpool/testhelper_test.go +++ b/internal/gitaly/service/objectpool/testhelper_test.go @@ -83,6 +83,7 @@ func runObjectPoolServer(t *testing.T, cfg config.Cfg, locator storage.Locator, deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, append(opts, testserver.WithLocator(locator), testserver.WithLogger(logger))...) } diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go index 5549739e8b..7a03dc9f02 100644 --- a/internal/gitaly/service/operations/testhelper_test.go +++ b/internal/gitaly/service/operations/testhelper_test.go @@ -112,6 +112,7 @@ func runOperationServiceServer(tb testing.TB, cfg config.Cfg, options ...testser deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterRefServiceServer(srv, ref.NewServer( deps.GetLocator(), diff --git a/internal/gitaly/service/ref/delete_refs_test.go b/internal/gitaly/service/ref/delete_refs_test.go index 831c21663d..aa61414894 100644 --- a/internal/gitaly/service/ref/delete_refs_test.go +++ b/internal/gitaly/service/ref/delete_refs_test.go @@ -118,6 +118,7 @@ func TestDeleteRefs_transaction(t *testing.T) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) }, testserver.WithTransactionManager(txManager)) diff --git a/internal/gitaly/service/ref/testhelper_test.go b/internal/gitaly/service/ref/testhelper_test.go index c4d72d4187..31edcfeeed 100644 --- 
a/internal/gitaly/service/ref/testhelper_test.go +++ b/internal/gitaly/service/ref/testhelper_test.go @@ -75,6 +75,7 @@ func runRefServiceServer(tb testing.TB, cfg config.Cfg) string { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }) } diff --git a/internal/gitaly/service/remote/testhelper_test.go b/internal/gitaly/service/remote/testhelper_test.go index d1c03eeca4..e132da125a 100644 --- a/internal/gitaly/service/remote/testhelper_test.go +++ b/internal/gitaly/service/remote/testhelper_test.go @@ -41,6 +41,7 @@ func setupRemoteService(t *testing.T, ctx context.Context, opts ...testserver.Gi deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, opts...) cfg.SocketPath = addr diff --git a/internal/gitaly/service/remote/update_remote_mirror_test.go b/internal/gitaly/service/remote/update_remote_mirror_test.go index 897f539858..7eada23e16 100644 --- a/internal/gitaly/service/remote/update_remote_mirror_test.go +++ b/internal/gitaly/service/remote/update_remote_mirror_test.go @@ -589,6 +589,7 @@ func TestUpdateRemoteMirror(t *testing.T) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }) cfg.SocketPath = addr diff --git a/internal/gitaly/service/repository/server.go b/internal/gitaly/service/repository/server.go index 6b3f8821df..3f5176832d 100644 --- a/internal/gitaly/service/repository/server.go +++ b/internal/gitaly/service/repository/server.go @@ -11,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" @@ -30,6 +31,7 
@@ type server struct { catfileCache catfile.Cache git2goExecutor *git2go.Executor housekeepingManager housekeeping.Manager + partitionManager *gitaly.PartitionManager licenseCache *unarycache.Cache[git.ObjectID, *gitalypb.FindLicenseResponse] } @@ -44,6 +46,7 @@ func NewServer( connsPool *client.Pool, git2goExecutor *git2go.Executor, housekeepingManager housekeeping.Manager, + partitionManager *gitaly.PartitionManager, ) gitalypb.RepositoryServiceServer { return &server{ locator: locator, @@ -55,6 +58,7 @@ func NewServer( catfileCache: catfileCache, git2goExecutor: git2goExecutor, housekeepingManager: housekeepingManager, + partitionManager: partitionManager, licenseCache: newLicenseCache(), } diff --git a/internal/gitaly/service/repository/testhelper_test.go b/internal/gitaly/service/repository/testhelper_test.go index a114a9b917..cb7d2f0957 100644 --- a/internal/gitaly/service/repository/testhelper_test.go +++ b/internal/gitaly/service/repository/testhelper_test.go @@ -66,6 +66,7 @@ func runRepositoryService(tb testing.TB, cfg config.Cfg, opts ...testserver.Gita deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterHookServiceServer(srv, hookservice.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) gitalypb.RegisterRemoteServiceServer(srv, remote.NewServer( diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go index 50f0482c19..93e6cb7749 100644 --- a/internal/gitaly/service/setup/register.go +++ b/internal/gitaly/service/setup/register.go @@ -99,6 +99,7 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterSSHServiceServer(srv, ssh.NewServer( deps.GetLocator(), diff --git 
a/internal/gitaly/service/smarthttp/testhelper_test.go b/internal/gitaly/service/smarthttp/testhelper_test.go index 124ff76fcc..b1b31b4517 100644 --- a/internal/gitaly/service/smarthttp/testhelper_test.go +++ b/internal/gitaly/service/smarthttp/testhelper_test.go @@ -46,6 +46,7 @@ func startSmartHTTPServerWithOptions(t *testing.T, cfg config.Cfg, opts []Server deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterObjectPoolServiceServer(srv, objectpool.NewServer( deps.GetLocator(), diff --git a/internal/gitaly/service/ssh/testhelper_test.go b/internal/gitaly/service/ssh/testhelper_test.go index c91b4a9919..4c26bf6264 100644 --- a/internal/gitaly/service/ssh/testhelper_test.go +++ b/internal/gitaly/service/ssh/testhelper_test.go @@ -51,6 +51,7 @@ func startSSHServerWithOptions(t *testing.T, cfg config.Cfg, opts []ServerOpt, s deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) gitalypb.RegisterObjectPoolServiceServer(srv, objectpool.NewServer( deps.GetLocator(), diff --git a/internal/praefect/info_service_test.go b/internal/praefect/info_service_test.go index e1f9891499..1838b5610e 100644 --- a/internal/praefect/info_service_test.go +++ b/internal/praefect/info_service_test.go @@ -47,6 +47,7 @@ func TestInfoService_RepositoryReplicas(t *testing.T) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )) }, testserver.WithDisablePraefect()) cfgNodes = append(cfgNodes, &config.Node{ diff --git a/internal/praefect/verifier_test.go b/internal/praefect/verifier_test.go index f7938c34a3..478c6b2108 100644 --- a/internal/praefect/verifier_test.go +++ b/internal/praefect/verifier_test.go @@ -486,6 +486,7 @@ func TestVerifier(t *testing.T) { deps.GetConnsPool(), deps.GetGit2goExecutor(), deps.GetHousekeepingManager(), + deps.GetPartitionManager(), )}) } } -- GitLab From 
5aad2f1e119ac88cdfd818f88c59e7b6752c0873 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Wed, 17 May 2023 14:31:16 +0300 Subject: [PATCH 5/5] Integrate WAL into WriteRef This commit implements WAL support into WriteRef. If the WAL is enabled, as determined by whether the PartitionManager is set, the RPC performs its update through it. External behavior should be unchanged. --- .../gitaly/service/repository/write_ref.go | 51 +++++++++++++++++-- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/internal/gitaly/service/repository/write_ref.go b/internal/gitaly/service/repository/write_ref.go index eeb32561c8..b37e8d1a39 100644 --- a/internal/gitaly/service/repository/write_ref.go +++ b/internal/gitaly/service/repository/write_ref.go @@ -8,7 +8,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/txutil" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -17,8 +19,15 @@ func (s *server) WriteRef(ctx context.Context, req *gitalypb.WriteRefRequest) (* if err := validateWriteRefRequest(req); err != nil { return nil, structerr.NewInvalidArgument("%w", err) } - if err := s.writeRef(ctx, req); err != nil { - return nil, structerr.NewInternal("%w", err) + + if s.partitionManager != nil { + if err := s.writeRefWAL(ctx, req); err != nil { + return nil, err + } + } else { + if err := s.writeRef(ctx, req); err != nil { + return nil, structerr.NewInternal("%w", err) + } } return &gitalypb.WriteRefResponse{}, nil @@ -38,7 +47,30 @@ func (s *server) writeRef(ctx context.Context, req *gitalypb.WriteRefRequest) er return updateRef(ctx, repo, req) } -func updateRef(ctx context.Context, repo *localrepo.Repo, req 
*gitalypb.WriteRefRequest) (returnedErr error) { +func (s *server) writeRefWAL(ctx context.Context, req *gitalypb.WriteRefRequest) error { + tx, err := s.partitionManager.Begin(ctx, req.GetRepository()) + if err != nil { + return fmt.Errorf("begin: %w", err) + } + defer txutil.LogRollback(ctx, tx) + + if string(req.Ref) == "HEAD" { + tx.SetDefaultBranch(git.ReferenceName(req.GetRevision())) + } else { + oldOID, newOID, err := resolveObjectIDs(ctx, s.localrepo(req.GetRepository()), req) + if err != nil { + return err + } + + tx.UpdateReferences(gitaly.ReferenceUpdates{ + git.ReferenceName(req.GetRef()): {Force: oldOID == "", OldOID: oldOID, NewOID: newOID}, + }) + } + + return tx.Commit(ctx) +} + +func resolveObjectIDs(ctx context.Context, repo *localrepo.Repo, req *gitalypb.WriteRefRequest) (git.ObjectID, git.ObjectID, error) { var newObjectID git.ObjectID if git.ObjectHashSHA1.IsZeroOID(git.ObjectID(req.GetRevision())) { // Passing the all-zeroes object ID as new value means that we should delete the @@ -52,7 +84,7 @@ func updateRef(ctx context.Context, repo *localrepo.Repo, req *gitalypb.WriteRef var err error newObjectID, err = repo.ResolveRevision(ctx, git.Revision(req.GetRevision())+"^{object}") if err != nil { - return fmt.Errorf("resolving new revision: %w", err) + return "", "", fmt.Errorf("resolving new revision: %w", err) } } @@ -66,11 +98,20 @@ func updateRef(ctx context.Context, repo *localrepo.Repo, req *gitalypb.WriteRef var err error oldObjectID, err = repo.ResolveRevision(ctx, git.Revision(req.GetOldRevision())+"^{object}") if err != nil { - return fmt.Errorf("resolving old revision: %w", err) + return "", "", fmt.Errorf("resolving old revision: %w", err) } } } + return oldObjectID, newObjectID, nil +} + +func updateRef(ctx context.Context, repo *localrepo.Repo, req *gitalypb.WriteRefRequest) (returnedErr error) { + oldObjectID, newObjectID, err := resolveObjectIDs(ctx, repo, req) + if err != nil { + return err + } + u, err := updateref.New(ctx, 
repo) if err != nil { return fmt.Errorf("error when running creating new updater: %w", err) -- GitLab