From 864525a7ac27bd647ba7683ff1066bcc3f48700a Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Wed, 10 May 2023 12:06:32 +0300 Subject: [PATCH 01/17] Make storages fully independent in PartitionManager Gitaly's PartitionManager is responsible for managing partitions. This entails spawning TransactionManagers as needed to process transactions. Currently, all of the partitions share the same database and staging directory. This is not correct, as the storages generally reside on different disks. Copy-free moves of files and hard links are not possible across disks, and TransactionManager relies on this functionality to log pack files without copying them. Each storage should have its own embedded database. The storages are meant to be fully independent of each other and should keep functioning even if one of them fails or is removed, so each needs to contain all of the data it requires to function. If they share the database, all storages will lose their state if the disk hosting the storage that holds the shared database fails. There's also a performance benefit to having a separate database for each storage, as the database load on one storage doesn't affect the other storages when each of them has its own. It's also not necessary to synchronize reads or writes between storages: if the transactions target different storages, they are clearly working on separate pieces of data. This commit addresses the above points and makes the storages fully independent of each other by: - Giving each storage its own staging directory. - Giving each storage its own database. - Using separate locks for accessing data on different storages. Together, these reduce unnecessary contention between transactions targeting different storages. As each storage contains all of its own state, the storages become independent failure domains. They can also be moved between nodes freely, as they don't depend on data in other storages. 
--- internal/gitaly/partition_manager.go | 230 ++++++++++------- internal/gitaly/partition_manager_test.go | 285 ++++++++++++++-------- 2 files changed, 318 insertions(+), 197 deletions(-) diff --git a/internal/gitaly/partition_manager.go b/internal/gitaly/partition_manager.go index 408d5f4a32..feee61ce88 100644 --- a/internal/gitaly/partition_manager.go +++ b/internal/gitaly/partition_manager.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "io/fs" "os" + "path/filepath" "sync" "github.com/dgraph-io/badger/v3" @@ -13,6 +15,8 @@ import ( repo "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" + "gitlab.com/gitlab-org/gitaly/v16/internal/safe" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" ) @@ -21,29 +25,66 @@ var ErrPartitionManagerStopped = errors.New("partition manager stopped") // PartitionManager is responsible for managing the lifecycle of each TransactionManager. type PartitionManager struct { - // mu is the mutex to synchronize access to the partitions. + // storages are the storages configured in this Gitaly server. The map is keyed by the storage name. + storages map[string]*storagePartition +} + +// storagePartition represents a single storage. +type storagePartition struct { + // mu synchronizes access to the fields of storagePartition. mu sync.Mutex - // db is the handle to the key-value store used for storing the write-ahead log related state. - // It is used to create each transaction manager. - db *badger.DB - // partitions contains all the active partitions for which there are pending transactions. - // Each repository can have up to one partition. - partitions map[string]*partition - // localRepoFactory is used by PartitionManager to construct `localrepo.Repo`. - localRepoFactory localrepo.Factory - // logger handles all logging for PartitionManager. 
+ // logger handles all logging for storagePartition. logger logrus.FieldLogger - // stopped tracks whether the PartitionManager has been stopped. If the manager is stopped, - // no new transactions are allowed to begin. - stopped bool - // partitionsWG keeps track of running partitions. - partitionsWG sync.WaitGroup + // path is the absolute path to the storage's root. + path string + // repoFactory is a factory type that builds localrepo instances for this storage. + repoFactory localrepo.StorageScopedFactory // stagingDirectory is the directory where all of the TransactionManager staging directories // should be created. stagingDirectory string - // storages are the storages configured in this Gitaly server. They are keyed by the name and the - // value is the storage's path. - storages map[string]string + // stopped tracks whether the storagePartition has been stopped. If it is is stopped, + // no new transactions are allowed to begin. + stopped bool + // db is the handle to the key-value store used for storing the storage's database state. + database *badger.DB + // partitions contains all the active partitions. Each repository can have up to one partition. + partitions map[string]*partition + // activePartitions keeps track of active partitions. + activePartitions sync.WaitGroup +} + +func (sp *storagePartition) stop() { + sp.mu.Lock() + // Mark the storage as stopped so no new transactions can begin anymore. This + // also means no more partitions are spawned. + sp.stopped = true + for _, ptn := range sp.partitions { + // Stop all partitions. + ptn.stop() + } + sp.mu.Unlock() + + // Wait for all partitions to finish. + sp.activePartitions.Wait() + + if err := sp.database.Close(); err != nil { + sp.logger.WithError(err).Error("failed closing storage's database") + } +} + +// transactionFinalizerFactory is executed when a transaction completes. 
The pending transaction counter +// for the partition is decremented by one and TransactionManager stopped if there are no longer +// any pending transactions. +func (sp *storagePartition) transactionFinalizerFactory(ptn *partition) func() { + return func() { + sp.mu.Lock() + defer sp.mu.Unlock() + + ptn.pendingTransactionCount-- + if ptn.pendingTransactionCount == 0 { + ptn.stop() + } + } } // partition contains the transaction manager and tracks the number of in-flight transactions for the partition. @@ -60,21 +101,61 @@ type partition struct { pendingTransactionCount uint } +// stop stops the partition's transaction manager. +func (ptn *partition) stop() { + ptn.shuttingDown = true + ptn.transactionManager.Stop() +} + // NewPartitionManager returns a new PartitionManager. -func NewPartitionManager(db *badger.DB, storages []config.Storage, localRepoFactory localrepo.Factory, logger logrus.FieldLogger, stagingDir string) *PartitionManager { - storagesMap := make(map[string]string, len(storages)) - for _, storage := range storages { - storagesMap[storage.Name] = storage.Path - } +func NewPartitionManager(configuredStorages []config.Storage, localRepoFactory localrepo.Factory, logger logrus.FieldLogger) (*PartitionManager, error) { + storages := make(map[string]*storagePartition, len(configuredStorages)) + for _, storage := range configuredStorages { + repoFactory, err := localRepoFactory.ScopeByStorage(storage.Name) + if err != nil { + return nil, fmt.Errorf("scope by storage: %w", err) + } - return &PartitionManager{ - db: db, - partitions: make(map[string]*partition), - localRepoFactory: localRepoFactory, - logger: logger, - stagingDirectory: stagingDir, - storages: storagesMap, + stagingDir := stagingDirectoryPath(storage.Path) + // Remove a possible already existing staging directory as it may contain stale files + // if the previous process didn't shutdown gracefully. 
+ if err := os.RemoveAll(stagingDir); err != nil { + return nil, fmt.Errorf("failed clearing storage's staging directory: %w", err) + } + + if err := os.Mkdir(stagingDir, perm.PrivateDir); err != nil { + return nil, fmt.Errorf("create storage's staging directory: %w", err) + } + + databaseDir := filepath.Join(storage.Path, "database") + if err := os.Mkdir(databaseDir, perm.PrivateDir); err != nil && !errors.Is(err, fs.ErrExist) { + return nil, fmt.Errorf("create storage's database directory: %w", err) + } + + if err := safe.NewSyncer().SyncHierarchy(storage.Path, "database"); err != nil { + return nil, fmt.Errorf("sync database directory: %w", err) + } + + db, err := OpenDatabase(databaseDir) + if err != nil { + return nil, fmt.Errorf("create storage's database directory: %w", err) + } + + storages[storage.Name] = &storagePartition{ + logger: logrus.WithField("storage", storage.Name), + path: storage.Path, + repoFactory: repoFactory, + stagingDirectory: stagingDir, + database: db, + partitions: map[string]*partition{}, + } } + + return &PartitionManager{storages: storages}, nil +} + +func stagingDirectoryPath(storagePath string) string { + return filepath.Join(storagePath, "staging") } // getPartitionKey returns a partitions's key. @@ -86,12 +167,12 @@ func getPartitionKey(storageName, relativePath string) string { // TransactionManager is not already running, a new one is created and used. The partition tracks // the number of pending transactions and this counter gets incremented when Begin is invoked. 
func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Transaction, error) { - storagePath, ok := pm.storages[repo.GetStorageName()] + storagePtn, ok := pm.storages[repo.GetStorageName()] if !ok { return nil, structerr.NewNotFound("unknown storage: %q", repo.GetStorageName()) } - relativePath, err := storage.ValidateRelativePath(storagePath, repo.GetRelativePath()) + relativePath, err := storage.ValidateRelativePath(storagePtn.path, repo.GetRelativePath()) if err != nil { return nil, structerr.NewInvalidArgument("validate relative path: %w", err) } @@ -99,56 +180,50 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran partitionKey := getPartitionKey(repo.GetStorageName(), relativePath) for { - pm.mu.Lock() - if pm.stopped { - pm.mu.Unlock() + storagePtn.mu.Lock() + if storagePtn.stopped { + storagePtn.mu.Unlock() return nil, ErrPartitionManagerStopped } - ptn, ok := pm.partitions[partitionKey] + ptn, ok := storagePtn.partitions[partitionKey] if !ok { ptn = &partition{ shutdown: make(chan struct{}), } - stagingDir, err := os.MkdirTemp(pm.stagingDirectory, "") + stagingDir, err := os.MkdirTemp(storagePtn.stagingDirectory, "") if err != nil { - pm.mu.Unlock() + storagePtn.mu.Unlock() return nil, fmt.Errorf("create staging directory: %w", err) } - storageScopedFactory, err := pm.localRepoFactory.ScopeByStorage(repo.GetStorageName()) - if err != nil { - pm.mu.Unlock() - return nil, fmt.Errorf("scope by storage: %w", err) - } - - mgr := NewTransactionManager(pm.db, storagePath, relativePath, stagingDir, storageScopedFactory, pm.transactionFinalizerFactory(ptn)) + mgr := NewTransactionManager(storagePtn.database, storagePtn.path, relativePath, stagingDir, storagePtn.repoFactory, storagePtn.transactionFinalizerFactory(ptn)) ptn.transactionManager = mgr - pm.partitions[partitionKey] = ptn + storagePtn.partitions[partitionKey] = ptn - pm.partitionsWG.Add(1) + storagePtn.activePartitions.Add(1) go func() { if err := 
mgr.Run(); err != nil { - pm.logger.WithError(err).Error("partition failed") + storagePtn.logger.WithError(err).Error("partition failed") } // In the event that TransactionManager stops running, a new TransactionManager will // need to be started in order to continue processing transactions. The partition is // deleted allowing the next transaction for the repository to create a new partition // and TransactionManager. - pm.mu.Lock() - delete(pm.partitions, partitionKey) - pm.mu.Unlock() + storagePtn.mu.Lock() + delete(storagePtn.partitions, partitionKey) + storagePtn.mu.Unlock() close(ptn.shutdown) if err := os.RemoveAll(stagingDir); err != nil { - pm.logger.WithError(err).Error("failed removing partition's staging directory") + storagePtn.logger.WithError(err).Error("failed removing partition's staging directory") } - pm.partitionsWG.Done() + storagePtn.activePartitions.Done() }() } @@ -157,7 +232,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran // used. The lock is released while waiting for the partition to complete shutdown as to // not block other partitions from processing transactions. Once shutdown is complete, a // new attempt is made to get a valid partition. - pm.mu.Unlock() + storagePtn.mu.Unlock() select { case <-ctx.Done(): return nil, ctx.Err() @@ -168,7 +243,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran } ptn.pendingTransactionCount++ - pm.mu.Unlock() + storagePtn.mu.Unlock() transaction, err := ptn.transactionManager.Begin(ctx) if err != nil { @@ -176,7 +251,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran // inflight. A transaction failing does not necessarily mean the transaction manager has // stopped running. Consequently, if there are no other pending transactions the partition // should be stopped. 
- pm.transactionFinalizerFactory(ptn)() + storagePtn.transactionFinalizerFactory(ptn)() return nil, err } @@ -185,40 +260,17 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran } } -// Stop stops transaction processing for all running transaction managers and waits for shutdown -// completion. +// Stop stops transaction processing for all storages and waits for shutdown completion. func (pm *PartitionManager) Stop() { - pm.mu.Lock() - // Mark the PartitionManager as stopped so no new transactions can begin anymore. This - // also means no more partitions are spawned. - pm.stopped = true - for _, ptn := range pm.partitions { - // Stop all partitions. - ptn.stop() + var activeStorages sync.WaitGroup + for _, storagePtn := range pm.storages { + activeStorages.Add(1) + storagePtn := storagePtn + go func() { + storagePtn.stop() + activeStorages.Done() + }() } - pm.mu.Unlock() - - // Wait for all goroutines to complete. - pm.partitionsWG.Wait() -} - -// stop stops the partition's transaction manager. -func (ptn *partition) stop() { - ptn.shuttingDown = true - ptn.transactionManager.Stop() -} -// transactionFinalizerFactory is executed when a transaction completes. The pending transaction counter -// for the partition is decremented by one and TransactionManager stopped if there are no longer -// any pending transactions. -func (pm *PartitionManager) transactionFinalizerFactory(ptn *partition) func() { - return func() { - pm.mu.Lock() - defer pm.mu.Unlock() - - ptn.pendingTransactionCount-- - if ptn.pendingTransactionCount == 0 { - ptn.stop() - } - } + activeStorages.Wait() } diff --git a/internal/gitaly/partition_manager_test.go b/internal/gitaly/partition_manager_test.go index aeb26f7e30..c20f8f7ad7 100644 --- a/internal/gitaly/partition_manager_test.go +++ b/internal/gitaly/partition_manager_test.go @@ -40,9 +40,9 @@ func TestPartitionManager(t *testing.T) { ctx context.Context // repo is the repository that the transaction belongs to. 
repo repo.GitRepo - // expectedState contains the expected repositories and their pending transaction count at + // expectedState contains the partitions by their storages and their pending transaction count at // the end of the step. - expectedState map[string]uint + expectedState map[string]map[string]uint // expectedError is the error expected to be returned when beginning the transaction. expectedError error } @@ -53,9 +53,9 @@ func TestPartitionManager(t *testing.T) { transactionID int // ctx is the context used when `Commit()` gets invoked. ctx context.Context - // expectedState contains the expected repositories and their pending transaction count at + // expectedState contains the partitions by their storages and their pending transaction count at // the end of the step. - expectedState map[string]uint + expectedState map[string]map[string]uint // expectedError is the error that is expected to be returned when committing the transaction. expectedError error } @@ -64,9 +64,9 @@ func TestPartitionManager(t *testing.T) { type rollback struct { // transactionID identifies the transaction to rollback. transactionID int - // expectedState contains the expected repositories and their pending transaction count at + // expectedState contains the partitions by their storages and their pending transaction count at // the end of the step. - expectedState map[string]uint + expectedState map[string]map[string]uint } // stopPartition stops the transaction manager for the specified repository. This is done to @@ -87,33 +87,52 @@ func TestPartitionManager(t *testing.T) { // being processed without a running partition manager. type stopManager struct{} - // blockOnPartitionShutdown checks if the specified partition is currently in the process of - // shutting down. If it is, the function waits for the shutdown process to complete before + // blockOnPartitionShutdown checks if any partitions are currently in the process of + // shutting down. 
If some are, the function waits for the shutdown process to complete before // continuing. This is required in order to accurately validate partition state. - blockOnPartitionShutdown := func(t *testing.T, ptn *partition) { + blockOnPartitionShutdown := func(t *testing.T, pm *PartitionManager) { t.Helper() - if ptn != nil && ptn.shuttingDown { - <-ptn.shutdown + var waitFor []chan struct{} + for _, sp := range pm.storages { + sp.mu.Lock() + for _, ptn := range sp.partitions { + if ptn.shuttingDown { + waitFor = append(waitFor, ptn.shutdown) + } + } + sp.mu.Unlock() + } + + for _, shutdown := range waitFor { + <-shutdown } } // checkExpectedState validates that the partition manager contains the correct partitions and // associated transaction count at the point of execution. - checkExpectedState := func(t *testing.T, partitionManager *PartitionManager, expectedState map[string]uint) { + checkExpectedState := func(t *testing.T, cfg config.Cfg, partitionManager *PartitionManager, expectedState map[string]map[string]uint) { t.Helper() - require.Equal(t, len(expectedState), len(partitionManager.partitions)) - for k, v := range expectedState { - partition, ok := partitionManager.partitions[k] - require.True(t, ok, "expected partition %q to be present", k) - require.Equal(t, v, partition.pendingTransactionCount) + actualState := map[string]map[string]uint{} + for storageName, storagePtn := range partitionManager.storages { + for partitionKey, partition := range storagePtn.partitions { + if actualState[storageName] == nil { + actualState[storageName] = map[string]uint{} + } + + actualState[storageName][partitionKey] = partition.pendingTransactionCount + } + } + + if expectedState == nil { + expectedState = map[string]map[string]uint{} } - } - cfg := testcfg.Build(t) + require.Equal(t, expectedState, actualState) + } - setupRepository := func(t *testing.T) repo.GitRepo { + setupRepository := func(t *testing.T, cfg config.Cfg) repo.GitRepo { t.Helper() repo, _ := 
gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ @@ -123,19 +142,11 @@ func TestPartitionManager(t *testing.T) { return repo } - cmdFactory, clean, err := git.NewExecCommandFactory(cfg) - require.NoError(t, err) - t.Cleanup(clean) - - catfileCache := catfile.NewCache(cfg) - t.Cleanup(catfileCache.Stop) - - localRepoFactory := localrepo.NewFactory(config.NewLocator(cfg), cmdFactory, catfileCache) - // transactionData holds relevant data for each transaction created during a testcase. type transactionData struct { - txn *Transaction - ptn *partition + txn *Transaction + storagePtn *storagePartition + ptn *partition } type setupData struct { @@ -144,19 +155,21 @@ func TestPartitionManager(t *testing.T) { for _, tc := range []struct { desc string - setup func(t *testing.T) setupData + setup func(t *testing.T, cfg config.Cfg) setupData }{ { desc: "transaction committed for single repository", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg) return setupData{ steps: steps{ begin{ repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + }, }, }, commit{}, @@ -166,16 +179,18 @@ func TestPartitionManager(t *testing.T) { }, { desc: "two transactions committed for single repository sequentially", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg) return setupData{ steps: steps{ begin{ transactionID: 1, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + }, }, }, 
commit{ @@ -184,8 +199,10 @@ func TestPartitionManager(t *testing.T) { begin{ transactionID: 2, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + }, }, }, commit{ @@ -197,29 +214,35 @@ func TestPartitionManager(t *testing.T) { }, { desc: "two transactions committed for single repository in parallel", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg) return setupData{ steps: steps{ begin{ transactionID: 1, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + }, }, }, begin{ transactionID: 2, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 2, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 2, + }, }, }, commit{ transactionID: 1, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + }, }, }, commit{ @@ -231,31 +254,37 @@ func TestPartitionManager(t *testing.T) { }, { desc: "transaction committed for two repositories", - setup: func(t *testing.T) setupData { - repoA := setupRepository(t) - repoB := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repoA := setupRepository(t, cfg) + repoB := setupRepository(t, cfg) return setupData{ steps: steps{ begin{ transactionID: 1, repo: repoA, - expectedState: map[string]uint{ - 
getPartitionKey(repoA.GetStorageName(), repoA.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repoA.GetStorageName(), repoA.GetRelativePath()): 1, + }, }, }, begin{ transactionID: 2, repo: repoB, - expectedState: map[string]uint{ - getPartitionKey(repoA.GetStorageName(), repoA.GetRelativePath()): 1, - getPartitionKey(repoB.GetStorageName(), repoB.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repoA.GetStorageName(), repoA.GetRelativePath()): 1, + getPartitionKey(repoB.GetStorageName(), repoB.GetRelativePath()): 1, + }, }, }, commit{ transactionID: 1, - expectedState: map[string]uint{ - getPartitionKey(repoB.GetStorageName(), repoB.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repoB.GetStorageName(), repoB.GetRelativePath()): 1, + }, }, }, commit{ @@ -267,15 +296,17 @@ func TestPartitionManager(t *testing.T) { }, { desc: "transaction rolled back for single repository", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg) return setupData{ steps: steps{ begin{ repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + }, }, }, rollback{}, @@ -285,8 +316,8 @@ func TestPartitionManager(t *testing.T) { }, { desc: "starting transaction failed due to cancelled context", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg) stepCtx, cancel := context.WithCancel(ctx) cancel() @@ -304,8 +335,8 @@ func TestPartitionManager(t *testing.T) { }, { desc: "committing transaction failed due to cancelled context", - setup: func(t 
*testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg) stepCtx, cancel := context.WithCancel(ctx) cancel() @@ -314,8 +345,10 @@ func TestPartitionManager(t *testing.T) { steps: steps{ begin{ repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + }, }, }, commit{ @@ -328,15 +361,17 @@ func TestPartitionManager(t *testing.T) { }, { desc: "committing transaction failed due to stopped transaction manager", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg) return setupData{ steps: steps{ begin{ repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + }, }, }, stopPartition{}, @@ -349,16 +384,18 @@ func TestPartitionManager(t *testing.T) { }, { desc: "transaction from previous transaction manager finalized after new manager started", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg) return setupData{ steps: steps{ begin{ transactionID: 1, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + }, }, }, stopPartition{ @@ -367,8 +404,10 @@ func TestPartitionManager(t *testing.T) { begin{ transactionID: 2, repo: repo, - expectedState: map[string]uint{ - getPartitionKey(repo.GetStorageName(), 
repo.GetRelativePath()): 1, + expectedState: map[string]map[string]uint{ + "default": { + getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + }, }, }, finalizeTransaction{ @@ -383,8 +422,8 @@ func TestPartitionManager(t *testing.T) { }, { desc: "transaction started after partition manager stopped", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg) return setupData{ steps: steps{ @@ -399,8 +438,8 @@ func TestPartitionManager(t *testing.T) { }, { desc: "multiple transactions started after partition manager stopped", - setup: func(t *testing.T) setupData { - repo := setupRepository(t) + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg) return setupData{ steps: steps{ @@ -424,28 +463,54 @@ func TestPartitionManager(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { t.Parallel() - setup := tc.setup(t) + cfg := testcfg.Build(t) - database, err := OpenDatabase(t.TempDir()) + cmdFactory, clean, err := git.NewExecCommandFactory(cfg) require.NoError(t, err) - defer testhelper.MustClose(t, database) + t.Cleanup(clean) + + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + + localRepoFactory := localrepo.NewFactory(config.NewLocator(cfg), cmdFactory, catfileCache) - stagingDir := filepath.Join(t.TempDir(), "staging") - require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir)) + setup := tc.setup(t, cfg) - partitionManager := NewPartitionManager(database, cfg.Storages, localRepoFactory, logrus.StandardLogger(), stagingDir) + // Create some existing content in the staging directory so we can assert it gets removed and + // recreated. 
+ for _, storage := range cfg.Storages { + require.NoError(t, + os.MkdirAll( + filepath.Join(stagingDirectoryPath(storage.Path), "existing-content"), + perm.PrivateDir, + ), + ) + } + + partitionManager, err := NewPartitionManager(cfg.Storages, localRepoFactory, logrus.StandardLogger()) + require.NoError(t, err) defer func() { partitionManager.Stop() - // Assert all staging directories have been removed. - testhelper.RequireDirectoryState(t, stagingDir, "", testhelper.DirectoryState{ - "/": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, - }) + for _, storage := range cfg.Storages { + // Assert all staging directories have been emptied at the end. + testhelper.RequireDirectoryState(t, storage.Path, "staging", testhelper.DirectoryState{ + "/staging": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + }) + } }() + for _, storage := range cfg.Storages { + // Assert the existing content in the staging directory was removed. + testhelper.RequireDirectoryState(t, storage.Path, "staging", testhelper.DirectoryState{ + "/staging": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + }) + } + // openTransactionData holds references to all transactions and its associated partition // created during the testcase. 
openTransactionData := map[int]*transactionData{} + var partitionManagerStopped bool for _, step := range setup.steps { switch step := step.(type) { case begin: @@ -459,16 +524,18 @@ func TestPartitionManager(t *testing.T) { txn, err := partitionManager.Begin(beginCtx, step.repo) require.Equal(t, step.expectedError, err) - partitionManager.mu.Lock() - ptn := partitionManager.partitions[getPartitionKey(step.repo.GetStorageName(), step.repo.GetRelativePath())] - partitionManager.mu.Unlock() + storagePtn := partitionManager.storages[step.repo.GetStorageName()] + storagePtn.mu.Lock() + ptn := storagePtn.partitions[getPartitionKey(step.repo.GetStorageName(), step.repo.GetRelativePath())] + storagePtn.mu.Unlock() - blockOnPartitionShutdown(t, ptn) - checkExpectedState(t, partitionManager, step.expectedState) + blockOnPartitionShutdown(t, partitionManager) + checkExpectedState(t, cfg, partitionManager, step.expectedState) openTransactionData[step.transactionID] = &transactionData{ - txn: txn, - ptn: ptn, + txn: txn, + storagePtn: storagePtn, + ptn: ptn, } case commit: require.Contains(t, openTransactionData, step.transactionID, "test error: transaction committed before being started") @@ -482,30 +549,32 @@ func TestPartitionManager(t *testing.T) { require.ErrorIs(t, data.txn.Commit(commitCtx), step.expectedError) - blockOnPartitionShutdown(t, data.ptn) - checkExpectedState(t, partitionManager, step.expectedState) + blockOnPartitionShutdown(t, partitionManager) + checkExpectedState(t, cfg, partitionManager, step.expectedState) case rollback: require.Contains(t, openTransactionData, step.transactionID, "test error: transaction rolled back before being started") data := openTransactionData[step.transactionID] require.NoError(t, data.txn.Rollback()) - blockOnPartitionShutdown(t, data.ptn) - checkExpectedState(t, partitionManager, step.expectedState) + blockOnPartitionShutdown(t, partitionManager) + checkExpectedState(t, cfg, partitionManager, step.expectedState) case 
stopPartition: require.Contains(t, openTransactionData, step.transactionID, "test error: transaction manager stopped before being started") data := openTransactionData[step.transactionID] data.ptn.stop() - blockOnPartitionShutdown(t, data.ptn) + blockOnPartitionShutdown(t, partitionManager) case finalizeTransaction: require.Contains(t, openTransactionData, step.transactionID, "test error: transaction finalized before being started") data := openTransactionData[step.transactionID] - partitionManager.transactionFinalizerFactory(data.ptn)() + + data.storagePtn.transactionFinalizerFactory(data.ptn)() case stopManager: - require.False(t, partitionManager.stopped, "test error: partition manager already stopped") + require.False(t, partitionManagerStopped, "test error: partition manager already stopped") + partitionManagerStopped = true partitionManager.Stop() } -- GitLab From afbe71f7238fb0b646578149297f9c10eb25f97d Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Wed, 10 May 2023 19:09:28 +0300 Subject: [PATCH 02/17] Remove storage name from partition keys Since all storages now have independent state for partitions, it is no longer necessary to prefix the partition keys with the storage name. Remove the storage name component from the partition keys. --- internal/gitaly/partition_manager.go | 13 +++------ internal/gitaly/partition_manager_test.go | 32 +++++++++++------------ 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/internal/gitaly/partition_manager.go b/internal/gitaly/partition_manager.go index feee61ce88..41ca3418ec 100644 --- a/internal/gitaly/partition_manager.go +++ b/internal/gitaly/partition_manager.go @@ -158,11 +158,6 @@ func stagingDirectoryPath(storagePath string) string { return filepath.Join(storagePath, "staging") } -// getPartitionKey returns a partitions's key. 
-func getPartitionKey(storageName, relativePath string) string { - return storageName + ":" + relativePath -} - // Begin gets the TransactionManager for the specified repository and starts a Transaction. If a // TransactionManager is not already running, a new one is created and used. The partition tracks // the number of pending transactions and this counter gets incremented when Begin is invoked. @@ -177,8 +172,6 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran return nil, structerr.NewInvalidArgument("validate relative path: %w", err) } - partitionKey := getPartitionKey(repo.GetStorageName(), relativePath) - for { storagePtn.mu.Lock() if storagePtn.stopped { @@ -186,7 +179,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran return nil, ErrPartitionManagerStopped } - ptn, ok := storagePtn.partitions[partitionKey] + ptn, ok := storagePtn.partitions[relativePath] if !ok { ptn = &partition{ shutdown: make(chan struct{}), @@ -201,7 +194,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran mgr := NewTransactionManager(storagePtn.database, storagePtn.path, relativePath, stagingDir, storagePtn.repoFactory, storagePtn.transactionFinalizerFactory(ptn)) ptn.transactionManager = mgr - storagePtn.partitions[partitionKey] = ptn + storagePtn.partitions[relativePath] = ptn storagePtn.activePartitions.Add(1) go func() { @@ -214,7 +207,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran // deleted allowing the next transaction for the repository to create a new partition // and TransactionManager. 
storagePtn.mu.Lock() - delete(storagePtn.partitions, partitionKey) + delete(storagePtn.partitions, relativePath) storagePtn.mu.Unlock() close(ptn.shutdown) diff --git a/internal/gitaly/partition_manager_test.go b/internal/gitaly/partition_manager_test.go index c20f8f7ad7..6eb0bca7d3 100644 --- a/internal/gitaly/partition_manager_test.go +++ b/internal/gitaly/partition_manager_test.go @@ -168,7 +168,7 @@ func TestPartitionManager(t *testing.T) { repo: repo, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + repo.GetRelativePath(): 1, }, }, }, @@ -189,7 +189,7 @@ func TestPartitionManager(t *testing.T) { repo: repo, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + repo.GetRelativePath(): 1, }, }, }, @@ -201,7 +201,7 @@ func TestPartitionManager(t *testing.T) { repo: repo, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + repo.GetRelativePath(): 1, }, }, }, @@ -224,7 +224,7 @@ func TestPartitionManager(t *testing.T) { repo: repo, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + repo.GetRelativePath(): 1, }, }, }, @@ -233,7 +233,7 @@ func TestPartitionManager(t *testing.T) { repo: repo, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 2, + repo.GetRelativePath(): 2, }, }, }, @@ -241,7 +241,7 @@ func TestPartitionManager(t *testing.T) { transactionID: 1, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + repo.GetRelativePath(): 1, }, }, }, @@ -265,7 +265,7 @@ func TestPartitionManager(t *testing.T) { repo: repoA, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repoA.GetStorageName(), 
repoA.GetRelativePath()): 1, + repoA.GetRelativePath(): 1, }, }, }, @@ -274,8 +274,8 @@ func TestPartitionManager(t *testing.T) { repo: repoB, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repoA.GetStorageName(), repoA.GetRelativePath()): 1, - getPartitionKey(repoB.GetStorageName(), repoB.GetRelativePath()): 1, + repoA.GetRelativePath(): 1, + repoB.GetRelativePath(): 1, }, }, }, @@ -283,7 +283,7 @@ func TestPartitionManager(t *testing.T) { transactionID: 1, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repoB.GetStorageName(), repoB.GetRelativePath()): 1, + repoB.GetRelativePath(): 1, }, }, }, @@ -305,7 +305,7 @@ func TestPartitionManager(t *testing.T) { repo: repo, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + repo.GetRelativePath(): 1, }, }, }, @@ -347,7 +347,7 @@ func TestPartitionManager(t *testing.T) { repo: repo, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + repo.GetRelativePath(): 1, }, }, }, @@ -370,7 +370,7 @@ func TestPartitionManager(t *testing.T) { repo: repo, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + repo.GetRelativePath(): 1, }, }, }, @@ -394,7 +394,7 @@ func TestPartitionManager(t *testing.T) { repo: repo, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + repo.GetRelativePath(): 1, }, }, }, @@ -406,7 +406,7 @@ func TestPartitionManager(t *testing.T) { repo: repo, expectedState: map[string]map[string]uint{ "default": { - getPartitionKey(repo.GetStorageName(), repo.GetRelativePath()): 1, + repo.GetRelativePath(): 1, }, }, }, @@ -526,7 +526,7 @@ func TestPartitionManager(t *testing.T) { storagePtn := partitionManager.storages[step.repo.GetStorageName()] storagePtn.mu.Lock() - ptn 
:= storagePtn.partitions[getPartitionKey(step.repo.GetStorageName(), step.repo.GetRelativePath())] + ptn := storagePtn.partitions[step.repo.GetRelativePath()] storagePtn.mu.Unlock() blockOnPartitionShutdown(t, partitionManager) -- GitLab From 4dc0f439518a4f924746817ae69c580a83368e0d Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Wed, 10 May 2023 19:16:37 +0300 Subject: [PATCH 03/17] Extend PartitionManager tests to cover more scenarios This commit extends PartitionManager's test to cover: 1. Multiple storages receiving transactions 2. Transaction for a non-existent storage. 3. Relative path cleaning and validation --- internal/gitaly/partition_manager_test.go | 116 ++++++++++++++++++---- 1 file changed, 98 insertions(+), 18 deletions(-) diff --git a/internal/gitaly/partition_manager_test.go b/internal/gitaly/partition_manager_test.go index 6eb0bca7d3..d041c250c5 100644 --- a/internal/gitaly/partition_manager_test.go +++ b/internal/gitaly/partition_manager_test.go @@ -16,8 +16,10 @@ import ( repo "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) func TestPartitionManager(t *testing.T) { @@ -132,10 +134,11 @@ func TestPartitionManager(t *testing.T) { require.Equal(t, expectedState, actualState) } - setupRepository := func(t *testing.T, cfg config.Cfg) repo.GitRepo { + setupRepository := func(t *testing.T, cfg config.Cfg, storage config.Storage) repo.GitRepo { t.Helper() repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + Storage: storage, SkipCreationViaService: true, }) @@ -160,7 +163,7 @@ func TestPartitionManager(t *testing.T) { { desc: "transaction committed for single repository", 
setup: func(t *testing.T, cfg config.Cfg) setupData { - repo := setupRepository(t, cfg) + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ @@ -180,7 +183,7 @@ func TestPartitionManager(t *testing.T) { { desc: "two transactions committed for single repository sequentially", setup: func(t *testing.T, cfg config.Cfg) setupData { - repo := setupRepository(t, cfg) + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ @@ -215,7 +218,7 @@ func TestPartitionManager(t *testing.T) { { desc: "two transactions committed for single repository in parallel", setup: func(t *testing.T, cfg config.Cfg) setupData { - repo := setupRepository(t, cfg) + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ @@ -253,10 +256,11 @@ func TestPartitionManager(t *testing.T) { }, }, { - desc: "transaction committed for two repositories", + desc: "transaction committed for multiple repositories", setup: func(t *testing.T, cfg config.Cfg) setupData { - repoA := setupRepository(t, cfg) - repoB := setupRepository(t, cfg) + repoA := setupRepository(t, cfg, cfg.Storages[0]) + repoB := setupRepository(t, cfg, cfg.Storages[0]) + repoC := setupRepository(t, cfg, cfg.Storages[1]) return setupData{ steps: steps{ @@ -279,16 +283,40 @@ func TestPartitionManager(t *testing.T) { }, }, }, + begin{ + transactionID: 3, + repo: repoC, + expectedState: map[string]map[string]uint{ + "default": { + repoA.GetRelativePath(): 1, + repoB.GetRelativePath(): 1, + }, + "other-storage": { + repoC.GetRelativePath(): 1, + }, + }, + }, commit{ transactionID: 1, expectedState: map[string]map[string]uint{ "default": { repoB.GetRelativePath(): 1, }, + "other-storage": { + repoC.GetRelativePath(): 1, + }, }, }, commit{ transactionID: 2, + expectedState: map[string]map[string]uint{ + "other-storage": { + repoC.GetRelativePath(): 1, + }, + }, + }, + commit{ + transactionID: 3, }, }, } @@ -297,7 +325,7 @@ func TestPartitionManager(t *testing.T) { 
{ desc: "transaction rolled back for single repository", setup: func(t *testing.T, cfg config.Cfg) setupData { - repo := setupRepository(t, cfg) + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ @@ -317,7 +345,7 @@ func TestPartitionManager(t *testing.T) { { desc: "starting transaction failed due to cancelled context", setup: func(t *testing.T, cfg config.Cfg) setupData { - repo := setupRepository(t, cfg) + repo := setupRepository(t, cfg, cfg.Storages[0]) stepCtx, cancel := context.WithCancel(ctx) cancel() @@ -336,7 +364,7 @@ func TestPartitionManager(t *testing.T) { { desc: "committing transaction failed due to cancelled context", setup: func(t *testing.T, cfg config.Cfg) setupData { - repo := setupRepository(t, cfg) + repo := setupRepository(t, cfg, cfg.Storages[0]) stepCtx, cancel := context.WithCancel(ctx) cancel() @@ -362,7 +390,7 @@ func TestPartitionManager(t *testing.T) { { desc: "committing transaction failed due to stopped transaction manager", setup: func(t *testing.T, cfg config.Cfg) setupData { - repo := setupRepository(t, cfg) + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ @@ -385,7 +413,7 @@ func TestPartitionManager(t *testing.T) { { desc: "transaction from previous transaction manager finalized after new manager started", setup: func(t *testing.T, cfg config.Cfg) setupData { - repo := setupRepository(t, cfg) + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ @@ -423,7 +451,7 @@ func TestPartitionManager(t *testing.T) { { desc: "transaction started after partition manager stopped", setup: func(t *testing.T, cfg config.Cfg) setupData { - repo := setupRepository(t, cfg) + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ @@ -439,7 +467,7 @@ func TestPartitionManager(t *testing.T) { { desc: "multiple transactions started after partition manager stopped", setup: func(t *testing.T, cfg config.Cfg) setupData { - repo := 
setupRepository(t, cfg) + repo := setupRepository(t, cfg, cfg.Storages[0]) return setupData{ steps: steps{ @@ -458,12 +486,60 @@ func TestPartitionManager(t *testing.T) { } }, }, + { + desc: "transaction for a non-existent storage", + setup: func(t *testing.T, cfg config.Cfg) setupData { + return setupData{ + steps: steps{ + begin{ + repo: &gitalypb.Repository{ + StorageName: "non-existent", + }, + expectedError: structerr.NewNotFound("unknown storage: %q", "non-existent"), + }, + }, + } + }, + }, + + { + desc: "relative paths are cleaned", + setup: func(t *testing.T, cfg config.Cfg) setupData { + repo := setupRepository(t, cfg, cfg.Storages[0]) + + return setupData{ + steps: steps{ + begin{ + transactionID: 1, + repo: repo, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 1, + }, + }, + }, + begin{ + transactionID: 2, + repo: &gitalypb.Repository{ + StorageName: repo.GetStorageName(), + RelativePath: filepath.Join(repo.GetRelativePath(), "child-dir", ".."), + }, + expectedState: map[string]map[string]uint{ + "default": { + repo.GetRelativePath(): 2, + }, + }, + }, + }, + } + }, + }, } { tc := tc t.Run(tc.desc, func(t *testing.T) { t.Parallel() - cfg := testcfg.Build(t) + cfg := testcfg.Build(t, testcfg.WithStorages("default", "other-storage")) cmdFactory, clean, err := git.NewExecCommandFactory(cfg) require.NoError(t, err) @@ -524,14 +600,18 @@ func TestPartitionManager(t *testing.T) { txn, err := partitionManager.Begin(beginCtx, step.repo) require.Equal(t, step.expectedError, err) + blockOnPartitionShutdown(t, partitionManager) + checkExpectedState(t, cfg, partitionManager, step.expectedState) + + if err != nil { + continue + } + storagePtn := partitionManager.storages[step.repo.GetStorageName()] storagePtn.mu.Lock() ptn := storagePtn.partitions[step.repo.GetRelativePath()] storagePtn.mu.Unlock() - blockOnPartitionShutdown(t, partitionManager) - checkExpectedState(t, cfg, partitionManager, step.expectedState) - 
openTransactionData[step.transactionID] = &transactionData{ txn: txn, storagePtn: storagePtn, -- GitLab From 0fcc02676a2952cc06b2a86ad733b27f35141407 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Wed, 10 May 2023 19:35:09 +0300 Subject: [PATCH 04/17] Include partition's ID when logging error messages related to it PartitionManager is currently not logging the partition's ID when logging error messages related to it. This makes it difficult to identify which partition faced errors. Include the ID in the logs. --- internal/gitaly/partition_manager.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/gitaly/partition_manager.go b/internal/gitaly/partition_manager.go index 41ca3418ec..52a4a0c5f2 100644 --- a/internal/gitaly/partition_manager.go +++ b/internal/gitaly/partition_manager.go @@ -198,8 +198,10 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran storagePtn.activePartitions.Add(1) go func() { + logger := storagePtn.logger.WithField("partition", relativePath) + if err := mgr.Run(); err != nil { - storagePtn.logger.WithError(err).Error("partition failed") + logger.WithError(err).Error("partition failed") } // In the event that TransactionManager stops running, a new TransactionManager will @@ -213,7 +215,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran close(ptn.shutdown) if err := os.RemoveAll(stagingDir); err != nil { - storagePtn.logger.WithError(err).Error("failed removing partition's staging directory") + logger.WithError(err).Error("failed removing partition's staging directory") } storagePtn.activePartitions.Done() -- GitLab From 554d3468f594745273b196111be9ca49fab191d4 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 16 May 2023 12:35:56 +0300 Subject: [PATCH 05/17] Split UpdaterWithHooks into a separate package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We're about to integrate Gitaly's WAL logic 
into the first tests. This is about to create a cyclic import between the internal/gitaly and internal/git/updateref packages. TransactionManager uses the updateref package to perform the reference updates. UpdaterWithHooks is used from the operations service package to commit reference changes and execute hooks. In order to handle Transactions from the TransactionManager, the updateref package would have to import them, creating a cyclic dependency. UpdaterWithHooks is only used from Gitaly's services, and is not really a utility for working with Git itself. Move its implementation to internal/gitaly/hook/updateref to break the cycle. It's placed as a subpackage in the hook package as it uses it to run the hooks. --- internal/cli/gitaly/serve.go | 2 +- .../{git => gitaly/hook}/updateref/update_with_hooks.go | 3 ++- .../hook}/updateref/update_with_hooks_test.go | 6 +++++- internal/gitaly/server/auth_test.go | 2 +- internal/gitaly/service/conflicts/server.go | 2 +- internal/gitaly/service/dependencies.go | 2 +- internal/gitaly/service/operations/branches.go | 2 +- internal/gitaly/service/operations/cherry_pick.go | 2 +- internal/gitaly/service/operations/commit_files.go | 2 +- internal/gitaly/service/operations/merge.go | 2 +- internal/gitaly/service/operations/rebase.go | 2 +- internal/gitaly/service/operations/revert.go | 2 +- internal/gitaly/service/operations/server.go | 2 +- internal/gitaly/service/operations/submodules.go | 2 +- internal/gitaly/service/operations/tags.go | 2 +- internal/testhelper/testserver/gitaly.go | 2 +- 16 files changed, 21 insertions(+), 16 deletions(-) rename internal/{git => gitaly/hook}/updateref/update_with_hooks.go (98%) rename internal/{git => gitaly/hook}/updateref/update_with_hooks_test.go (99%) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 120f3fdb0b..5f069fd0ed 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -24,12 +24,12 @@ import (
"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/sentry" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/maintenance" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" diff --git a/internal/git/updateref/update_with_hooks.go b/internal/gitaly/hook/updateref/update_with_hooks.go similarity index 98% rename from internal/git/updateref/update_with_hooks.go rename to internal/gitaly/hook/updateref/update_with_hooks.go index c2e19b86a6..2c7f15c99f 100644 --- a/internal/git/updateref/update_with_hooks.go +++ b/internal/gitaly/hook/updateref/update_with_hooks.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" @@ -237,7 +238,7 @@ func (u *UpdaterWithHooks) UpdateReference( // is packed, which is obviously a bad thing as Gitaly nodes may be differently packed. We // thus continue to manually drive the reference-transaction hook here, which doesn't have // this problem. 
- updater, err := New(ctx, repo, WithDisabledTransactions()) + updater, err := updateref.New(ctx, repo, updateref.WithDisabledTransactions()) if err != nil { return fmt.Errorf("creating updater: %w", err) } diff --git a/internal/git/updateref/update_with_hooks_test.go b/internal/gitaly/hook/updateref/update_with_hooks_test.go similarity index 99% rename from internal/git/updateref/update_with_hooks_test.go rename to internal/gitaly/hook/updateref/update_with_hooks_test.go index bea8839bd3..31b74f902c 100644 --- a/internal/git/updateref/update_with_hooks_test.go +++ b/internal/gitaly/hook/updateref/update_with_hooks_test.go @@ -13,9 +13,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" hookservice "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/metadata/featureflag" @@ -26,6 +26,10 @@ import ( "google.golang.org/grpc" ) +func TestMain(m *testing.M) { + testhelper.Run(m) +} + func TestUpdaterWithHooks_UpdateReference_invalidParameters(t *testing.T) { t.Parallel() diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index 566b5f93bd..2e13e32ea1 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -20,10 +20,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/cache" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/auth" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" diff --git a/internal/gitaly/service/conflicts/server.go b/internal/gitaly/service/conflicts/server.go index 6e540a216c..52833c4356 100644 --- a/internal/gitaly/service/conflicts/server.go +++ b/internal/gitaly/service/conflicts/server.go @@ -6,9 +6,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index 90044ff9cf..64d7b1e00d 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -7,10 +7,10 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" gitalyhook "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" diff --git a/internal/gitaly/service/operations/branches.go b/internal/gitaly/service/operations/branches.go index d88e816c37..a5559ceea3 100644 --- a/internal/gitaly/service/operations/branches.go +++ b/internal/gitaly/service/operations/branches.go @@ -6,8 +6,8 @@ import ( "fmt" "gitlab.com/gitlab-org/gitaly/v16/internal/git" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" diff --git a/internal/gitaly/service/operations/cherry_pick.go b/internal/gitaly/service/operations/cherry_pick.go index 879456ef4c..06b3f52be9 100644 --- a/internal/gitaly/service/operations/cherry_pick.go +++ b/internal/gitaly/service/operations/cherry_pick.go @@ -8,8 +8,8 @@ import ( "time" "gitlab.com/gitlab-org/gitaly/v16/internal/git" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) diff --git a/internal/gitaly/service/operations/commit_files.go b/internal/gitaly/service/operations/commit_files.go index 24f376801e..32804947bf 100644 --- a/internal/gitaly/service/operations/commit_files.go +++ b/internal/gitaly/service/operations/commit_files.go @@ -14,8 +14,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/remoterepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" diff --git a/internal/gitaly/service/operations/merge.go b/internal/gitaly/service/operations/merge.go index 4511a23194..ea5194fa44 100644 --- a/internal/gitaly/service/operations/merge.go +++ b/internal/gitaly/service/operations/merge.go @@ -11,9 +11,9 @@ import ( "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" diff --git a/internal/gitaly/service/operations/rebase.go b/internal/gitaly/service/operations/rebase.go index a11cae6878..9441120cc7 100644 --- a/internal/gitaly/service/operations/rebase.go +++ b/internal/gitaly/service/operations/rebase.go @@ -6,8 +6,8 @@ import ( "time" "gitlab.com/gitlab-org/gitaly/v16/internal/git" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" diff --git a/internal/gitaly/service/operations/revert.go b/internal/gitaly/service/operations/revert.go index 8b6e3b6694..48bc6bb512 100644 --- a/internal/gitaly/service/operations/revert.go +++ b/internal/gitaly/service/operations/revert.go @@ -8,8 +8,8 @@ import ( 
"gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/remoterepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) diff --git a/internal/gitaly/service/operations/server.go b/internal/gitaly/service/operations/server.go index 865975f7d0..3ab231f8a9 100644 --- a/internal/gitaly/service/operations/server.go +++ b/internal/gitaly/service/operations/server.go @@ -9,9 +9,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" diff --git a/internal/gitaly/service/operations/submodules.go b/internal/gitaly/service/operations/submodules.go index f5daad03ce..45d93dc6f7 100644 --- a/internal/gitaly/service/operations/submodules.go +++ b/internal/gitaly/service/operations/submodules.go @@ -11,8 +11,8 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" diff --git a/internal/gitaly/service/operations/tags.go b/internal/gitaly/service/operations/tags.go index 944c9132f8..d7518d2d0b 100644 --- a/internal/gitaly/service/operations/tags.go +++ b/internal/gitaly/service/operations/tags.go @@ -11,8 +11,8 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index b9a62e3638..ad4a936c77 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -17,13 +17,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/auth" gitalylog "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/log" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/server" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" -- GitLab From 6f90179dc0ae9934ed39df26231225d5d597a449 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Mon, 15 May 2023 17:01:37 +0300 Subject: [PATCH 06/17] Acknowledge transactions only after application TransactionManager is currently acknowledging transaction commits as soon as the transaction has been logged. This is the ultimate behavior we want to end up with. However, when we roll out the WAL, we want the configuration toggle to be safe to toggle back and forth. The current behavior means that if the WAL is toggled off before the log has been fully applied, we'd effectively lose writes. We can avoid this by only acknowledging the writes once they've been successfully applied to the repository. This way the write will be present even if the WAL is toggled off, or it won't have been acknowledged as committed. This also makes it easier to integrate the WAL in our tests. Many of the tests inspect disk state directly and do not interrogate the test state through RPCs. This means that the transactions may not be applied to the repository before the tests run their assertions on the state. Acknowledging writes only after they've been applied also synchronizes the tests as a side effect. --- internal/gitaly/transaction_manager.go | 24 ++++++++++++++++++--- internal/gitaly/transaction_manager_test.go | 9 ++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 6a7f4f1436..cde417e8d3 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -400,6 +400,11 @@ type TransactionManager struct { // transactionFinalizer executes when a transaction is completed. transactionFinalizer func() + + // awaitingTransactions contains transactions waiting for their log entry to be applied to + // the repository.
It's keyed by the log index the transaction is waiting to be applied and the + // value is the resultChannel that is waiting the result. + awaitingTransactions map[LogIndex]resultChannel } // repository is the localrepo interface used by TransactionManager. @@ -430,6 +435,7 @@ func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir applyNotifications: make(map[LogIndex]chan struct{}), stagingDirectory: stagingDir, transactionFinalizer: transactionFinalizer, + awaitingTransactions: make(map[LogIndex]resultChannel), } } @@ -643,7 +649,7 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { return err } - transaction.result <- func() (commitErr error) { + if err := func() (commitErr error) { logEntry := &gitalypb.LogEntry{} var err error @@ -692,7 +698,12 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { } return mgr.appendLogEntry(nextLogIndex, logEntry) - }() + }(); err != nil { + transaction.result <- err + return nil + } + + mgr.awaitingTransactions[mgr.appendedLogIndex] = transaction.result return nil } @@ -1050,7 +1061,7 @@ func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *g } // applyLogEntry reads a log entry at the given index and applies it to the repository. -func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIndex) error { +func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIndex) (returnedErr error) { logEntry, err := mgr.readLogEntry(logIndex) if err != nil { return fmt.Errorf("read log entry: %w", err) @@ -1101,6 +1112,13 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn delete(mgr.applyNotifications, logIndex) close(notificationCh) + // There is no awaiter for a transaction if the transaction manager is recovering + // transactions from the log after starting up. 
+ if resultChan, ok := mgr.awaitingTransactions[logIndex]; ok { + resultChan <- nil + delete(mgr.awaitingTransactions, logIndex) + } + return nil } diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index 966209f6b4..99aeebc00f 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -1077,6 +1077,7 @@ func TestTransactionManager(t *testing.T) { CustomHooksUpdate: &CustomHooksUpdate{ CustomHooksTAR: validCustomHooks(t), }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, @@ -1120,6 +1121,7 @@ func TestTransactionManager(t *testing.T) { CustomHooksUpdate: &CustomHooksUpdate{ CustomHooksTAR: validCustomHooks(t), }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, @@ -1302,6 +1304,7 @@ func TestTransactionManager(t *testing.T) { ReferenceUpdates: ReferenceUpdates{ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, @@ -1346,6 +1349,7 @@ func TestTransactionManager(t *testing.T) { ReferenceUpdates: ReferenceUpdates{ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{}, StartManager{}, @@ -1388,6 +1392,7 @@ func TestTransactionManager(t *testing.T) { ReferenceUpdates: ReferenceUpdates{ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{}, StartManager{}, @@ -1510,6 +1515,7 @@ func TestTransactionManager(t *testing.T) { ReferenceUpdates: ReferenceUpdates{ "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, + ExpectedError: ErrTransactionProcessingStopped, }, }, expectedState: 
StateAssertion{ @@ -1748,6 +1754,7 @@ func TestTransactionManager(t *testing.T) { DefaultBranchUpdate: &DefaultBranchUpdate{ Reference: "refs/heads/branch2", }, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, @@ -1975,6 +1982,7 @@ func TestTransactionManager(t *testing.T) { "refs/heads/main": {OldOID: setup.Commits.First.OID, NewOID: setup.Commits.Third.OID}, }, QuarantinedPacks: [][]byte{setup.Commits.Third.Pack}, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, @@ -2048,6 +2056,7 @@ func TestTransactionManager(t *testing.T) { "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, }, QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + ExpectedError: ErrTransactionProcessingStopped, }, AssertManager{ ExpectedError: errSimulatedCrash, -- GitLab From 5c29154509f7c88f33838419ad04a25b3c98ef48 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Mon, 15 May 2023 21:24:01 +0300 Subject: [PATCH 07/17] Don't initialize staging directory twice Transaction's staging directory could currently be initialized multiple times if it is called more than once. Right now the only way would be to call Transaction.QuarantineDirectory() multiple times. This can be convenient in some contexts though, so let's support it. 
--- internal/gitaly/transaction_manager.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index cde417e8d3..7caf6148d3 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -192,6 +192,10 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) } txn.initStagingDirectory = func() error { + if txn.stagingDirectory != "" { + return nil + } + stagingDirectory, err := os.MkdirTemp(mgr.stagingDirectory, "") if err != nil { return fmt.Errorf("mkdir temp: %w", err) -- GitLab From 2a00cf08d137b0121ca41225973270fac21cfdef Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Mon, 15 May 2023 19:12:35 +0300 Subject: [PATCH 08/17] Add a missing error assertion to TestUserCommitFiles TestUserCommitFiles doesn't assert that the RPC didn't error if it isn't expected. This leads to a panic in the later assertions when accessing fields of the response when the response is nil. Add the missing error assertion. 
--- internal/gitaly/service/operations/commit_files_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/gitaly/service/operations/commit_files_test.go b/internal/gitaly/service/operations/commit_files_test.go index b2293d3c8a..c22798fad4 100644 --- a/internal/gitaly/service/operations/commit_files_test.go +++ b/internal/gitaly/service/operations/commit_files_test.go @@ -952,6 +952,7 @@ func TestUserCommitFiles(t *testing.T) { continue } + require.NoError(t, err) require.Equal(t, step.branchCreated, resp.BranchUpdate.BranchCreated, "step %d", i+1) require.Equal(t, step.repoCreated, resp.BranchUpdate.RepoCreated, "step %d", i+1) gittest.RequireTree(t, cfg, repoPath, branch, step.treeEntries) -- GitLab From b7de0c27ca2b4eeaf4299b803e42e9124328f4e3 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 16 May 2023 10:34:39 +0300 Subject: [PATCH 09/17] Break storages in tests only after initialization We're about to wire PartitionManager into Gitaly's test server. On initialization, it creates expected directories and opens a database handle for each storage's database. This does not work in some tests as they configure invalid storages which don't exist. Let's change the tests to break the storages only after initialization. Gitaly should ensure that the storages exist on boot anyway.
--- .../gitaly/service/repository/repository_exists_test.go | 4 ++-- internal/gitaly/service/server/disk_stats_test.go | 8 ++++---- internal/gitaly/service/server/info_test.go | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/gitaly/service/repository/repository_exists_test.go b/internal/gitaly/service/repository/repository_exists_test.go index 70e2e39081..b325388bed 100644 --- a/internal/gitaly/service/repository/repository_exists_test.go +++ b/internal/gitaly/service/repository/repository_exists_test.go @@ -21,11 +21,11 @@ func TestRepositoryExists(t *testing.T) { cfgBuilder := testcfg.NewGitalyCfgBuilder(testcfg.WithStorages("default", "other", "broken")) cfg := cfgBuilder.Build(t) - require.NoError(t, os.RemoveAll(cfg.Storages[2].Path), "third storage needs to be invalid") - client, socketPath := runRepositoryService(t, cfg) cfg.SocketPath = socketPath + require.NoError(t, os.RemoveAll(cfg.Storages[2].Path), "third storage needs to be invalid") + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{}) queries := []struct { diff --git a/internal/gitaly/service/server/disk_stats_test.go b/internal/gitaly/service/server/disk_stats_test.go index b445d5499e..8f014e338b 100644 --- a/internal/gitaly/service/server/disk_stats_test.go +++ b/internal/gitaly/service/server/disk_stats_test.go @@ -4,10 +4,10 @@ package server import ( "math" + "os" "testing" "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -15,11 +15,11 @@ import ( ) func TestStorageDiskStatistics(t *testing.T) { - cfg := testcfg.Build(t) - - cfg.Storages = append(cfg.Storages, config.Storage{Name: "broken", Path: "/does/not/exist"}) + cfg := testcfg.Build(t, testcfg.WithStorages("default", "broken")) addr := runServer(t, cfg) + 
require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid") + client := newServerClient(t, addr) ctx := testhelper.Context(t) diff --git a/internal/gitaly/service/server/info_test.go b/internal/gitaly/service/server/info_test.go index 78c1e14014..19d5c34c2a 100644 --- a/internal/gitaly/service/server/info_test.go +++ b/internal/gitaly/service/server/info_test.go @@ -3,6 +3,7 @@ package server import ( + "os" "testing" "github.com/stretchr/testify/require" @@ -23,11 +24,10 @@ import ( ) func TestGitalyServerInfo(t *testing.T) { - cfg := testcfg.Build(t) - - cfg.Storages = append(cfg.Storages, config.Storage{Name: "broken", Path: "/does/not/exist"}) + cfg := testcfg.Build(t, testcfg.WithStorages("default", "broken")) addr := runServer(t, cfg, testserver.WithDisablePraefect()) + require.NoError(t, os.RemoveAll(cfg.Storages[1].Path), "second storage needs to be invalid") client := newServerClient(t, addr) ctx := testhelper.Context(t) -- GitLab From 30b6b1d1d37515e94fb504ff3b165c36eea614f5 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Mon, 15 May 2023 16:05:48 +0300 Subject: [PATCH 10/17] Wire PartitionManager into operations.Server in tests PartitionManager is the entry point for the WAL logic. It handles routing transactions to the correct partitions and managing the lifecycle of the TransactionManagers responsible for those partitions. As a first step towards integrating it into our tests, wire the PartitionManager into OperationService tests. As we eventually want to run the tests with both WAL enabled and disabled, the PartitionManager is configured only if the 'GITALY_TEST_WAL' environment variable is set. All of the users of the WAL will have to check whether or not the PartitionManager is set. This remains the same when we eventually make it possible to enable the WAL logic through the configuration. If set, we'll configure the PartitionManager and inject it as a dependency. If not set, we'll leave it nil.
testPackObjectsConcurrency was changed to use a unique config for each test case so that each test has its own storages for the PartitionManager to initialize state in. --- internal/gitaly/service/dependencies.go | 7 ++++ .../gitaly/service/hook/pack_objects_test.go | 4 +-- .../service/operations/branches_test.go | 2 ++ internal/gitaly/service/operations/server.go | 36 ++++++++++--------- .../service/operations/testhelper_test.go | 1 + internal/gitaly/service/setup/register.go | 1 + internal/testhelper/testserver/gitaly.go | 15 ++++++++ 7 files changed, 48 insertions(+), 18 deletions(-) diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index 64d7b1e00d..2f9ff02544 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -8,6 +8,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" gitalyhook "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" @@ -37,6 +38,7 @@ type Dependencies struct { Git2goExecutor *git2go.Executor UpdaterWithHooks *updateref.UpdaterWithHooks HousekeepingManager housekeeping.Manager + PartitionManager *gitaly.PartitionManager } // GetCfg returns service configuration. @@ -123,3 +125,8 @@ func (dc *Dependencies) GetHousekeepingManager() housekeeping.Manager { func (dc *Dependencies) GetPackObjectsLimiter() limithandler.Limiter { return dc.PackObjectsLimiter } + +// GetPartitionManager returns the PartitionManager.
+func (dc *Dependencies) GetPartitionManager() *gitaly.PartitionManager { + return dc.PartitionManager +} diff --git a/internal/gitaly/service/hook/pack_objects_test.go b/internal/gitaly/service/hook/pack_objects_test.go index 4c2de55abc..80db2a339c 100644 --- a/internal/gitaly/service/hook/pack_objects_test.go +++ b/internal/gitaly/service/hook/pack_objects_test.go @@ -778,8 +778,6 @@ func TestPackObjects_concurrencyLimit(t *testing.T) { func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { t.Parallel() - cfg := cfgWithCache(t, 0) - args := []string{"pack-objects", "--revs", "--thin", "--stdout", "--progress", "--delta-base-offset"} for _, tc := range []struct { @@ -957,6 +955,8 @@ func testPackObjectsConcurrency(t *testing.T, ctx context.Context) { }, } { t.Run(tc.desc, func(t *testing.T) { + cfg := cfgWithCache(t, 0) + ticker := helper.NewManualTicker() monitor := limithandler.NewPackObjectsConcurrencyMonitor( cfg.Prometheus.GRPCLatencyBuckets, diff --git a/internal/gitaly/service/operations/branches_test.go b/internal/gitaly/service/operations/branches_test.go index a11bedc9a4..f34fa9410f 100644 --- a/internal/gitaly/service/operations/branches_test.go +++ b/internal/gitaly/service/operations/branches_test.go @@ -143,6 +143,7 @@ func TestUserCreateBranch_Transactions(t *testing.T) { deps.GetGitCmdFactory(), deps.GetCatfileCache(), deps.GetUpdaterWithHooks(), + deps.GetPartitionManager(), )) gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps.GetHookManager(), deps.GetGitCmdFactory(), deps.GetPackObjectsCache(), deps.GetPackObjectsConcurrencyTracker(), deps.GetPackObjectsLimiter())) // Praefect proxy execution disabled as praefect runs only on the UNIX socket, but @@ -815,6 +816,7 @@ func TestUserDeleteBranch_transaction(t *testing.T) { deps.GetGitCmdFactory(), deps.GetCatfileCache(), deps.GetUpdaterWithHooks(), + deps.GetPartitionManager(), )) }) diff --git a/internal/gitaly/service/operations/server.go 
b/internal/gitaly/service/operations/server.go index 3ab231f8a9..b9be0ff241 100644 --- a/internal/gitaly/service/operations/server.go +++ b/internal/gitaly/service/operations/server.go @@ -10,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" @@ -21,14 +22,15 @@ import ( //nolint:revive // This is unintentionally missing documentation. type Server struct { gitalypb.UnimplementedOperationServiceServer - hookManager hook.Manager - txManager transaction.Manager - locator storage.Locator - conns *client.Pool - git2goExecutor *git2go.Executor - gitCmdFactory git.CommandFactory - catfileCache catfile.Cache - updater *updateref.UpdaterWithHooks + hookManager hook.Manager + txManager transaction.Manager + locator storage.Locator + conns *client.Pool + git2goExecutor *git2go.Executor + gitCmdFactory git.CommandFactory + catfileCache catfile.Cache + updater *updateref.UpdaterWithHooks + partitionManager *gitaly.PartitionManager } // NewServer creates a new instance of a grpc OperationServiceServer @@ -41,16 +43,18 @@ func NewServer( gitCmdFactory git.CommandFactory, catfileCache catfile.Cache, updater *updateref.UpdaterWithHooks, + partitionManager *gitaly.PartitionManager, ) *Server { return &Server{ - hookManager: hookManager, - txManager: txManager, - locator: locator, - conns: conns, - git2goExecutor: git2goExecutor, - gitCmdFactory: gitCmdFactory, - catfileCache: catfileCache, - updater: updater, + hookManager: hookManager, + txManager: txManager, + locator: locator, + conns: conns, + git2goExecutor: git2goExecutor, + gitCmdFactory: gitCmdFactory, + catfileCache: catfileCache, + updater: updater, + 
partitionManager: partitionManager, } } diff --git a/internal/gitaly/service/operations/testhelper_test.go b/internal/gitaly/service/operations/testhelper_test.go index ff6ada770a..8cdab7afc2 100644 --- a/internal/gitaly/service/operations/testhelper_test.go +++ b/internal/gitaly/service/operations/testhelper_test.go @@ -101,6 +101,7 @@ func runOperationServiceServer(tb testing.TB, cfg config.Cfg, options ...testser deps.GetGitCmdFactory(), deps.GetCatfileCache(), deps.GetUpdaterWithHooks(), + deps.GetPartitionManager(), ) gitalypb.RegisterOperationServiceServer(srv, operationServer) diff --git a/internal/gitaly/service/setup/register.go b/internal/gitaly/service/setup/register.go index 50f0482c19..c83928881e 100644 --- a/internal/gitaly/service/setup/register.go +++ b/internal/gitaly/service/setup/register.go @@ -83,6 +83,7 @@ func RegisterAll(srv *grpc.Server, deps *service.Dependencies) { deps.GetGitCmdFactory(), deps.GetCatfileCache(), deps.GetUpdaterWithHooks(), + deps.GetPartitionManager(), )) gitalypb.RegisterRefServiceServer(srv, ref.NewServer( deps.GetLocator(), diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index ad4a936c77..4326953020 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -17,7 +17,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git2go" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/client" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config/auth" @@ -348,6 +350,18 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * gsd.housekeepingManager 
= housekeeping.NewManager(cfg.Prometheus, gsd.txMgr) } + var partitionManager *gitaly.PartitionManager + if _, ok := os.LookupEnv("GITALY_TEST_WAL"); ok { + var err error + partitionManager, err = gitaly.NewPartitionManager( + cfg.Storages, + localrepo.NewFactory(gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), + gsd.logger, + ) + require.NoError(tb, err) + tb.Cleanup(partitionManager.Stop) + } + return &service.Dependencies{ Cfg: cfg, ClientPool: gsd.conns, @@ -366,6 +380,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * Git2goExecutor: gsd.git2goExecutor, UpdaterWithHooks: gsd.updaterWithHooks, HousekeepingManager: gsd.housekeepingManager, + PartitionManager: partitionManager, } } -- GitLab From 4702511ffb7ab317b8fdfed7ece570777996526f Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Mon, 15 May 2023 20:26:10 +0300 Subject: [PATCH 11/17] Integrate WAL into UserCommitFiles for testing This commit changes UserCommitFiles to use the WAL for committing its writes if enabled. When enabled, a transaction is begun through PartitionManager for the repository, the objects are written into the transaction's quarantine directory, and ultimately both reference changes and objects are committed through TransactionManager. WAL is enabled if PartitionManager is set on the server instance. For now it is only set when the `GITALY_TEST_WAL` environment variable is set when running the test. A Makefile target and a CI job for testing the WAL will be added later. There are slight behavior changes with the hooks which are left as is for now: - 'reference-transaction prepared' is currently neither acquiring locks nor verifying the reference changes. We'll likely have to extend the TransactionManager to handle this before we can roll WAL out given this affects Praefect's behavior. - 'update' hook is invoked with the objects still in the quarantine as opposed to the non-WAL variant. This will likely have to remain so.
The objects are only written into the repository when the transaction is committed. For local execution, this shouldn't matter much since the quarantine directory is configured in the invocation. For loopback calls like what Rails does with pre-receive hook, this would be a breaking change. --- .../hook/updateref/update_with_hooks.go | 135 ++++++++++++------ .../hook/updateref/update_with_hooks_test.go | 5 +- .../service/conflicts/resolve_conflicts.go | 1 + .../gitaly/service/operations/apply_patch.go | 2 +- .../gitaly/service/operations/branches.go | 6 +- .../gitaly/service/operations/cherry_pick.go | 2 +- .../gitaly/service/operations/commit_files.go | 7 +- internal/gitaly/service/operations/merge.go | 4 +- internal/gitaly/service/operations/rebase.go | 1 + internal/gitaly/service/operations/revert.go | 2 +- internal/gitaly/service/operations/server.go | 30 ++++ .../gitaly/service/operations/submodules.go | 1 + internal/gitaly/service/operations/tags.go | 4 +- .../service/operations/update_with_hooks.go | 4 +- 14 files changed, 148 insertions(+), 56 deletions(-) diff --git a/internal/gitaly/hook/updateref/update_with_hooks.go b/internal/gitaly/hook/updateref/update_with_hooks.go index 2c7f15c99f..12e4f2b917 100644 --- a/internal/gitaly/hook/updateref/update_with_hooks.go +++ b/internal/gitaly/hook/updateref/update_with_hooks.go @@ -14,6 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/git/updateref" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" @@ -145,8 +146,12 @@ func NewUpdaterWithHooks( // with the quarantined repository as returned by the quarantine structure. 
If these hooks succeed, // quarantined objects will be migrated and all subsequent hooks are executed via the unquarantined // repository. +// +// If transaction is set, the actual update is done through the WAL by calling Commit on it. The +// quarantine directory is taken from the transaction, and the other quarantineDir parameter is ignored. func (u *UpdaterWithHooks) UpdateReference( ctx context.Context, + tx *gitaly.Transaction, repoProto *gitalypb.Repository, user *gitalypb.User, quarantineDir *quarantine.Dir, @@ -192,7 +197,22 @@ func (u *UpdaterWithHooks) UpdateReference( // then subsequently passed to Rails, which can use the quarantine directory to more // efficiently query which objects are new. quarantinedRepo := repoProto - if quarantineDir != nil { + if tx != nil { + quarantineDir, err := tx.QuarantineDirectory() + if err != nil { + return fmt.Errorf("quarantine directory: %w", err) + } + + repoPath, err := repo.Path() + if err != nil { + return fmt.Errorf("repo path: %w", err) + } + + quarantinedRepo, err = quarantine.Apply(repoPath, repoProto, quarantineDir) + if err != nil { + return fmt.Errorf("quarantine repo: %w", err) + } + } else if quarantineDir != nil { quarantinedRepo = quarantineDir.QuarantinedRepo() } @@ -218,6 +238,9 @@ func (u *UpdaterWithHooks) UpdateReference( // We only need to update the hooks payload to the unquarantined repo in case we // had a quarantine environment. Otherwise, the initial hooks payload is for the // real repository anyway. + // + // With WAL, the update and reference transaction hooks must still execute with the quarantine + // as the objects are only written into the repository once the transaction has been committed. 
hooksPayload, err = git.NewHooksPayload(u.cfg, repoProto, objectHash, transaction, &receiveHooksPayload, git.ReceivePackHooks, featureflag.FromContext(ctx)).Env() if err != nil { return fmt.Errorf("constructing quarantined hooks payload: %w", err) @@ -228,52 +251,84 @@ func (u *UpdaterWithHooks) UpdateReference( return fmt.Errorf("running update hooks: %w", wrapHookError(err, git.UpdateHook, stdout.String(), stderr.String())) } - // We are already manually invoking the reference-transaction hook, so there is no need to - // set up hooks again here. One could argue that it would be easier to just have git handle - // execution of the reference-transaction hook. But unfortunately, it has proven to be - // problematic: if we queue a deletion, and the reference to be deleted exists both as - // packed-ref and as loose ref, then we would see two transactions: first a transaction - // deleting the packed-ref which would otherwise get unshadowed by deleting the loose ref, - // and only then do we see the deletion of the loose ref. So this depends on how well a repo - // is packed, which is obviously a bad thing as Gitaly nodes may be differently packed. We - // thus continue to manually drive the reference-transaction hook here, which doesn't have - // this problem. - updater, err := updateref.New(ctx, repo, updateref.WithDisabledTransactions()) - if err != nil { - return fmt.Errorf("creating updater: %w", err) - } + if tx != nil { + // The prepared step deviates from the non-WAL behavior as it doesn't verify nor lock the references + // prior to casting the prepared vote. + if err := u.hookManager.ReferenceTransactionHook(ctx, hook.ReferenceTransactionPrepared, []string{hooksPayload}, strings.NewReader(changes)); err != nil { + return fmt.Errorf("executing preparatory reference-transaction hook: %w", err) + } - // We need to explicitly cancel the update here such that we release the lock when this - // function exits if there is any error between locking and committing. 
- defer func() { _ = updater.Close() }() + tx.UpdateReferences(gitaly.ReferenceUpdates{ + reference: {OldOID: oldrev, NewOID: newrev}, + }) + + if err := tx.Commit(ctx); err != nil { + var errReferenceVerification gitaly.ReferenceVerificationError + if errors.As(err, &errReferenceVerification) { + return Error{ + Reference: errReferenceVerification.ReferenceName, + OldOID: oldrev, + NewOID: newrev, + } + } + + return fmt.Errorf("commit: %w", err) + } - if err := updater.Start(); err != nil { - return fmt.Errorf("start reference transaction: %w", err) - } + // The quarantined objects are written into the repository following a commit and the quarantine directory + // removed. Replace the quarantined repository with the normal repository in the payload. + hooksPayload, err = git.NewHooksPayload(u.cfg, repoProto, objectHash, transaction, &receiveHooksPayload, git.ReceivePackHooks, featureflag.FromContext(ctx)).Env() + if err != nil { + return fmt.Errorf("constructing quarantined hooks payload: %w", err) + } + } else { + // We are already manually invoking the reference-transaction hook, so there is no need to + // set up hooks again here. One could argue that it would be easier to just have git handle + // execution of the reference-transaction hook. But unfortunately, it has proven to be + // problematic: if we queue a deletion, and the reference to be deleted exists both as + // packed-ref and as loose ref, then we would see two transactions: first a transaction + // deleting the packed-ref which would otherwise get unshadowed by deleting the loose ref, + // and only then do we see the deletion of the loose ref. So this depends on how well a repo + // is packed, which is obviously a bad thing as Gitaly nodes may be differently packed. We + // thus continue to manually drive the reference-transaction hook here, which doesn't have + // this problem. 
+ updater, err := updateref.New(ctx, repo, updateref.WithDisabledTransactions()) + if err != nil { + return fmt.Errorf("creating updater: %w", err) + } - if err := updater.Update(reference, newrev, oldrev); err != nil { - return fmt.Errorf("queueing ref update: %w", err) - } + // We need to explicitly cancel the update here such that we release the lock when this + // function exits if there is any error between locking and committing. + defer func() { _ = updater.Close() }() - // We need to lock the reference before executing the reference-transaction hook such that - // there cannot be any concurrent modification. - if err := updater.Prepare(); err != nil { - return Error{ - Reference: reference, - OldOID: oldrev, - NewOID: newrev, + if err := updater.Start(); err != nil { + return fmt.Errorf("start reference transaction: %w", err) } - } - if err := u.hookManager.ReferenceTransactionHook(ctx, hook.ReferenceTransactionPrepared, []string{hooksPayload}, strings.NewReader(changes)); err != nil { - return fmt.Errorf("executing preparatory reference-transaction hook: %w", err) - } + if err := updater.Update(reference, newrev, oldrev); err != nil { + return fmt.Errorf("queueing ref update: %w", err) + } + + // We need to lock the reference before executing the reference-transaction hook such that + // there cannot be any concurrent modification. 
+ if err := updater.Prepare(); err != nil { + return Error{ + Reference: reference, + OldOID: oldrev, + NewOID: newrev, + } + } + + if err := u.hookManager.ReferenceTransactionHook(ctx, hook.ReferenceTransactionPrepared, []string{hooksPayload}, strings.NewReader(changes)); err != nil { + return fmt.Errorf("executing preparatory reference-transaction hook: %w", err) + } - if err := updater.Commit(); err != nil { - return Error{ - Reference: reference, - OldOID: oldrev, - NewOID: newrev, + if err := updater.Commit(); err != nil { + return Error{ + Reference: reference, + OldOID: oldrev, + NewOID: newrev, + } } } diff --git a/internal/gitaly/hook/updateref/update_with_hooks_test.go b/internal/gitaly/hook/updateref/update_with_hooks_test.go index 31b74f902c..3dd06a6b4d 100644 --- a/internal/gitaly/hook/updateref/update_with_hooks_test.go +++ b/internal/gitaly/hook/updateref/update_with_hooks_test.go @@ -87,7 +87,7 @@ func TestUpdaterWithHooks_UpdateReference_invalidParameters(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - err := updater.UpdateReference(ctx, repo, gittest.TestUser, nil, tc.ref, tc.newRev, tc.oldRev) + err := updater.UpdateReference(ctx, nil, repo, gittest.TestUser, nil, tc.ref, tc.newRev, tc.oldRev) require.Equal(t, tc.expectedErr, err) }) } @@ -279,7 +279,7 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { gitCmdFactory := gittest.NewCommandFactory(t, cfg) updater := updateref.NewUpdaterWithHooks(cfg, config.NewLocator(cfg), hookManager, gitCmdFactory, nil) - err := updater.UpdateReference(ctx, repo, gittest.TestUser, nil, git.ReferenceName("refs/heads/main"), gittest.DefaultObjectHash.ZeroOID, commitID) + err := updater.UpdateReference(ctx, nil, repo, gittest.TestUser, nil, git.ReferenceName("refs/heads/main"), gittest.DefaultObjectHash.ZeroOID, commitID) if tc.expectedErr == "" { require.NoError(t, err) } else { @@ -387,6 +387,7 @@ func TestUpdaterWithHooks_quarantine(t *testing.T) { require.NoError(t, 
updateref.NewUpdaterWithHooks(cfg, locator, hookManager, gitCmdFactory, nil).UpdateReference( ctx, + nil, repoProto, &gitalypb.User{ GlId: "1234", diff --git a/internal/gitaly/service/conflicts/resolve_conflicts.go b/internal/gitaly/service/conflicts/resolve_conflicts.go index 3ff55ecbc2..2e6c6d9fb6 100644 --- a/internal/gitaly/service/conflicts/resolve_conflicts.go +++ b/internal/gitaly/service/conflicts/resolve_conflicts.go @@ -198,6 +198,7 @@ func (s *server) resolveConflicts(header *gitalypb.ResolveConflictsRequestHeader if err := s.updater.UpdateReference( ctx, + nil, header.Repository, header.User, quarantineDir, diff --git a/internal/gitaly/service/operations/apply_patch.go b/internal/gitaly/service/operations/apply_patch.go index 9cb9a5ee6d..7c181c9c11 100644 --- a/internal/gitaly/service/operations/apply_patch.go +++ b/internal/gitaly/service/operations/apply_patch.go @@ -193,7 +193,7 @@ func (s *Server) userApplyPatch(ctx context.Context, header *gitalypb.UserApplyP } } - if err := s.updateReferenceWithHooks(ctx, header.Repository, header.User, nil, targetBranch, patchedCommit, currentCommit); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, header.Repository, header.User, nil, targetBranch, patchedCommit, currentCommit); err != nil { return fmt.Errorf("update reference: %w", err) } diff --git a/internal/gitaly/service/operations/branches.go b/internal/gitaly/service/operations/branches.go index a5559ceea3..67b3c9faab 100644 --- a/internal/gitaly/service/operations/branches.go +++ b/internal/gitaly/service/operations/branches.go @@ -58,7 +58,7 @@ func (s *Server) UserCreateBranch(ctx context.Context, req *gitalypb.UserCreateB referenceName := git.NewReferenceNameFromBranchName(string(req.BranchName)) - if err := s.updateReferenceWithHooks(ctx, req.GetRepository(), req.User, quarantineDir, referenceName, startPointOID, git.ObjectHashSHA1.ZeroOID); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.GetRepository(), req.User, 
quarantineDir, referenceName, startPointOID, git.ObjectHashSHA1.ZeroOID); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { @@ -140,7 +140,7 @@ func (s *Server) UserUpdateBranch(ctx context.Context, req *gitalypb.UserUpdateB return nil, err } - if err := s.updateReferenceWithHooks(ctx, req.GetRepository(), req.User, quarantineDir, referenceName, newOID, oldOID); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.GetRepository(), req.User, quarantineDir, referenceName, newOID, oldOID); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { return &gitalypb.UserUpdateBranchResponse{ @@ -203,7 +203,7 @@ func (s *Server) UserDeleteBranch(ctx context.Context, req *gitalypb.UserDeleteB } } - if err := s.updateReferenceWithHooks(ctx, req.Repository, req.User, nil, referenceName, git.ObjectHashSHA1.ZeroOID, referenceValue); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.Repository, req.User, nil, referenceName, git.ObjectHashSHA1.ZeroOID, referenceValue); err != nil { var notAllowedError hook.NotAllowedError var customHookErr updateref.CustomHookError var updateRefError updateref.Error diff --git a/internal/gitaly/service/operations/cherry_pick.go b/internal/gitaly/service/operations/cherry_pick.go index 06b3f52be9..797ad3ecf8 100644 --- a/internal/gitaly/service/operations/cherry_pick.go +++ b/internal/gitaly/service/operations/cherry_pick.go @@ -145,7 +145,7 @@ func (s *Server) UserCherryPick(ctx context.Context, req *gitalypb.UserCherryPic } } - if err := s.updateReferenceWithHooks(ctx, req.GetRepository(), req.User, quarantineDir, referenceName, newrev, oldrev); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.GetRepository(), req.User, quarantineDir, referenceName, newrev, oldrev); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { return nil, structerr.NewFailedPrecondition("access check 
failed").WithDetail( diff --git a/internal/gitaly/service/operations/commit_files.go b/internal/gitaly/service/operations/commit_files.go index 32804947bf..588de9e9a2 100644 --- a/internal/gitaly/service/operations/commit_files.go +++ b/internal/gitaly/service/operations/commit_files.go @@ -126,10 +126,11 @@ func validatePath(rootPath, relPath string) (string, error) { } func (s *Server) userCommitFiles(ctx context.Context, header *gitalypb.UserCommitFilesRequestHeader, stream gitalypb.OperationService_UserCommitFilesServer) error { - quarantineDir, quarantineRepo, err := s.quarantinedRepo(ctx, header.GetRepository()) + tx, quarantineDir, quarantineRepo, clean, err := s.begin(ctx, header.GetRepository()) if err != nil { - return err + return fmt.Errorf("begin: %w", err) } + defer func() { _ = clean() }() repoPath, err := quarantineRepo.Path() if err != nil { @@ -339,7 +340,7 @@ func (s *Server) userCommitFiles(ctx context.Context, header *gitalypb.UserCommi } } - if err := s.updateReferenceWithHooks(ctx, header.GetRepository(), header.User, quarantineDir, targetBranchName, commitID, oldRevision); err != nil { + if err := s.updateReferenceWithHooks(ctx, tx, header.GetRepository(), header.User, quarantineDir, targetBranchName, commitID, oldRevision); err != nil { if errors.As(err, &updateref.Error{}) { return structerr.NewFailedPrecondition("%w", err) } diff --git a/internal/gitaly/service/operations/merge.go b/internal/gitaly/service/operations/merge.go index ea5194fa44..d2c6d8c9cf 100644 --- a/internal/gitaly/service/operations/merge.go +++ b/internal/gitaly/service/operations/merge.go @@ -193,7 +193,7 @@ func (s *Server) UserMergeBranch(stream gitalypb.OperationService_UserMergeBranc return structerr.NewFailedPrecondition("merge aborted by client") } - if err := s.updateReferenceWithHooks(ctx, firstRequest.GetRepository(), firstRequest.User, quarantineDir, referenceName, mergeOID, revision); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, 
firstRequest.GetRepository(), firstRequest.User, quarantineDir, referenceName, mergeOID, revision); err != nil { var notAllowedError hook.NotAllowedError var customHookErr updateref.CustomHookError var updateRefError updateref.Error @@ -331,7 +331,7 @@ func (s *Server) UserFFBranch(ctx context.Context, in *gitalypb.UserFFBranchRequ return nil, structerr.NewFailedPrecondition("not fast forward") } - if err := s.updateReferenceWithHooks(ctx, in.GetRepository(), in.User, quarantineDir, referenceName, commitID, revision); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, in.GetRepository(), in.User, quarantineDir, referenceName, commitID, revision); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { return &gitalypb.UserFFBranchResponse{ diff --git a/internal/gitaly/service/operations/rebase.go b/internal/gitaly/service/operations/rebase.go index 9441120cc7..9e2282a1eb 100644 --- a/internal/gitaly/service/operations/rebase.go +++ b/internal/gitaly/service/operations/rebase.go @@ -110,6 +110,7 @@ func (s *Server) UserRebaseConfirmable(stream gitalypb.OperationService_UserReba if err := s.updateReferenceWithHooks( ctx, + nil, header.GetRepository(), header.User, quarantineDir, diff --git a/internal/gitaly/service/operations/revert.go b/internal/gitaly/service/operations/revert.go index 48bc6bb512..8f2bd15b22 100644 --- a/internal/gitaly/service/operations/revert.go +++ b/internal/gitaly/service/operations/revert.go @@ -120,7 +120,7 @@ func (s *Server) UserRevert(ctx context.Context, req *gitalypb.UserRevertRequest } } - if err := s.updateReferenceWithHooks(ctx, req.GetRepository(), req.User, quarantineDir, referenceName, newrev, oldrev); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.GetRepository(), req.User, quarantineDir, referenceName, newrev, oldrev); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { return &gitalypb.UserRevertResponse{ diff --git 
a/internal/gitaly/service/operations/server.go b/internal/gitaly/service/operations/server.go index b9be0ff241..b379dacd6f 100644 --- a/internal/gitaly/service/operations/server.go +++ b/internal/gitaly/service/operations/server.go @@ -2,6 +2,7 @@ package operations import ( "context" + "fmt" "gitlab.com/gitlab-org/gitaly/v16/client" "gitlab.com/gitlab-org/gitaly/v16/internal/git" @@ -62,6 +63,35 @@ func (s *Server) localrepo(repo repository.GitRepo) *localrepo.Repo { return localrepo.New(s.locator, s.gitCmdFactory, s.catfileCache, repo) } +func (s *Server) begin(ctx context.Context, repo *gitalypb.Repository) (_ *gitaly.Transaction, _ *quarantine.Dir, _ *localrepo.Repo, _ func() error, returnedErr error) { + if s.partitionManager != nil { + tx, err := s.partitionManager.Begin(ctx, repo) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("begin: %w", err) + } + defer func() { + if returnedErr != nil { + _ = tx.Rollback() + } + }() + + quarantineDir, err := tx.QuarantineDirectory() + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("quarantine directory: %w", err) + } + + quarantineRepo, err := s.localrepo(repo).Quarantine(quarantineDir) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("quarantine repo: %w", err) + } + + return tx, nil, quarantineRepo, tx.Rollback, nil + } + + quarantineDir, quarantinedRepo, err := s.quarantinedRepo(ctx, repo) + return nil, quarantineDir, quarantinedRepo, func() error { return nil }, err +} + func (s *Server) quarantinedRepo( ctx context.Context, repo *gitalypb.Repository, ) (*quarantine.Dir, *localrepo.Repo, error) { diff --git a/internal/gitaly/service/operations/submodules.go b/internal/gitaly/service/operations/submodules.go index 45d93dc6f7..af10c681d7 100644 --- a/internal/gitaly/service/operations/submodules.go +++ b/internal/gitaly/service/operations/submodules.go @@ -258,6 +258,7 @@ func (s *Server) userUpdateSubmodule(ctx context.Context, req *gitalypb.UserUpda if err := s.updateReferenceWithHooks( 
ctx, + nil, req.GetRepository(), req.GetUser(), quarantineDir, diff --git a/internal/gitaly/service/operations/tags.go b/internal/gitaly/service/operations/tags.go index d7518d2d0b..b328ade667 100644 --- a/internal/gitaly/service/operations/tags.go +++ b/internal/gitaly/service/operations/tags.go @@ -63,7 +63,7 @@ func (s *Server) UserDeleteTag(ctx context.Context, req *gitalypb.UserDeleteTagR } } - if err := s.updateReferenceWithHooks(ctx, req.Repository, req.User, nil, referenceName, git.ObjectHashSHA1.ZeroOID, revision); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.Repository, req.User, nil, referenceName, git.ObjectHashSHA1.ZeroOID, revision); err != nil { var customHookErr updateref.CustomHookError if errors.As(err, &customHookErr) { return &gitalypb.UserDeleteTagResponse{ @@ -160,7 +160,7 @@ func (s *Server) UserCreateTag(ctx context.Context, req *gitalypb.UserCreateTagR ) } - if err := s.updateReferenceWithHooks(ctx, req.Repository, req.User, quarantineDir, referenceName, tagID, git.ObjectHashSHA1.ZeroOID); err != nil { + if err := s.updateReferenceWithHooks(ctx, nil, req.Repository, req.User, quarantineDir, referenceName, tagID, git.ObjectHashSHA1.ZeroOID); err != nil { var notAllowedError hook.NotAllowedError var customHookErr updateref.CustomHookError var updateRefError updateref.Error diff --git a/internal/gitaly/service/operations/update_with_hooks.go b/internal/gitaly/service/operations/update_with_hooks.go index 73df5e1532..4965edfc44 100644 --- a/internal/gitaly/service/operations/update_with_hooks.go +++ b/internal/gitaly/service/operations/update_with_hooks.go @@ -5,11 +5,13 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) func (s *Server) updateReferenceWithHooks( ctx context.Context, + tx *gitaly.Transaction, repo *gitalypb.Repository, user 
*gitalypb.User, quarantine *quarantine.Dir, @@ -17,5 +19,5 @@ func (s *Server) updateReferenceWithHooks( newrev, oldrev git.ObjectID, pushOptions ...string, ) error { - return s.updater.UpdateReference(ctx, repo, user, quarantine, reference, newrev, oldrev, pushOptions...) + return s.updater.UpdateReference(ctx, tx, repo, user, quarantine, reference, newrev, oldrev, pushOptions...) } -- GitLab From f553046927a15ebbb21671d08ee973e92cd2e7f3 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Mon, 15 May 2023 20:59:29 +0300 Subject: [PATCH 12/17] Add makefile targets for testing with WAL This commit adds 'test-wal' and 'test-with-praefect-wal' targets to the makefile for running tests with the WAL enabled. --- Makefile | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Makefile b/Makefile index cb8e541403..10cb3a293f 100644 --- a/Makefile +++ b/Makefile @@ -400,6 +400,16 @@ bench: ${BENCHMARK_REPO} prepare-tests test-with-praefect: prepare-tests ${Q}GITALY_TEST_WITH_PRAEFECT=YesPlease $(call run_go_tests) +.PHONY: test-wal +## Run Go tests with write-ahead logging enabled. +test-wal: export GITALY_TEST_WAL = YesPlease +test-wal: test + +.PHONY: test-with-praefect-wal +## Run Go tests with write-ahead logging and Praefect enabled. +test-with-praefect-wal: export GITALY_TEST_WAL = YesPlease +test-with-praefect-wal: test-with-praefect + .PHONY: race-go ## Run Go tests with race detection enabled. race-go: override TEST_OPTIONS := ${TEST_OPTIONS} -race -- GitLab From 5cdcbe2c562520d77a1c2b7da8951cd2a501f109 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Mon, 15 May 2023 21:05:21 +0300 Subject: [PATCH 13/17] Add CI jobs for testing with write-ahead logging This commit adds the write-ahead log test targets to our CI jobs so they get tested as well. 
--- .gitlab-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f62607ab4b..398dadd283 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -203,7 +203,7 @@ test: # using bundled Git binaries. - GO_VERSION: [ "1.18", "1.19" ] TEST_TARGET: test - - TEST_TARGET: [ test-with-praefect, race-go ] + - TEST_TARGET: [ test-with-praefect, race-go, test-wal, test-with-praefect-wal] # We also verify that things work as expected with a non-bundled Git # version matching our minimum required Git version. - TEST_TARGET: test -- GitLab From 76ef4efefc7a93c92422a75d98a0339d10589035 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 16 May 2023 17:10:56 +0300 Subject: [PATCH 14/17] Wire Transaction to the HookManager Transaction has a read snapshot that includes a specific version of custom hooks. TransactionManager manages hooks via multiversion concurrency control; that is, hooks are always written as an entirely new version and are never updated in place. The HookManager needs to take in the Transaction so it can determine which version of the custom hooks to execute. This commit wires the Transaction to the HookManager but doesn't make any logic changes yet.
--- internal/gitaly/hook/custom.go | 3 +- internal/gitaly/hook/custom_test.go | 12 +++--- internal/gitaly/hook/disabled_manager.go | 7 +-- internal/gitaly/hook/manager.go | 7 +-- internal/gitaly/hook/manager_mock.go | 31 ++++++------- internal/gitaly/hook/postreceive.go | 9 ++-- internal/gitaly/hook/postreceive_test.go | 6 +-- internal/gitaly/hook/prereceive.go | 9 ++-- internal/gitaly/hook/prereceive_test.go | 6 +-- internal/gitaly/hook/transactions_test.go | 6 +-- internal/gitaly/hook/update.go | 9 ++-- internal/gitaly/hook/update_test.go | 4 +- .../hook/updateref/update_with_hooks.go | 6 +-- .../hook/updateref/update_with_hooks_test.go | 43 ++++++++++--------- .../conflicts/resolve_conflicts_test.go | 3 +- internal/gitaly/service/hook/post_receive.go | 1 + internal/gitaly/service/hook/pre_receive.go | 1 + internal/gitaly/service/hook/update.go | 1 + .../service/smarthttp/receive_pack_test.go | 2 + 19 files changed, 90 insertions(+), 76 deletions(-) diff --git a/internal/gitaly/hook/custom.go b/internal/gitaly/hook/custom.go index 6ccf968a06..a659e81c57 100644 --- a/internal/gitaly/hook/custom.go +++ b/internal/gitaly/hook/custom.go @@ -11,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/command" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/env" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "golang.org/x/sys/unix" @@ -45,7 +46,7 @@ func (e CustomHookError) Unwrap() error { // 3. /hooks/.d/* - global hooks // // Any files which are either not executable or have a trailing `~` are ignored. 
-func (m *GitLabHookManager) newCustomHooksExecutor(repo *gitalypb.Repository, hookName string) (customHooksExecutor, error) { +func (m *GitLabHookManager) newCustomHooksExecutor(tx *gitaly.Transaction, repo *gitalypb.Repository, hookName string) (customHooksExecutor, error) { repoPath, err := m.locator.GetRepoPath(repo) if err != nil { return nil, err diff --git a/internal/gitaly/hook/custom_test.go b/internal/gitaly/hook/custom_test.go index 4df9c6732a..930a41cbe1 100644 --- a/internal/gitaly/hook/custom_test.go +++ b/internal/gitaly/hook/custom_test.go @@ -165,7 +165,7 @@ func TestCustomHookPartialFailure(t *testing.T) { locator: config.NewLocator(cfg), } - caller, err := mgr.newCustomHooksExecutor(repo, tc.hook) + caller, err := mgr.newCustomHooksExecutor(nil, repo, tc.hook) require.NoError(t, err) var stdout, stderr bytes.Buffer @@ -224,7 +224,7 @@ func TestCustomHooksMultipleHooks(t *testing.T) { cfg: cfg, locator: config.NewLocator(cfg), } - hooksExecutor, err := mgr.newCustomHooksExecutor(repo, "update") + hooksExecutor, err := mgr.newCustomHooksExecutor(nil, repo, "update") require.NoError(t, err) var stdout, stderr bytes.Buffer @@ -297,7 +297,7 @@ func TestCustomHooksWithSymlinks(t *testing.T) { cfg: cfg, locator: config.NewLocator(cfg), } - hooksExecutor, err := mgr.newCustomHooksExecutor(repo, "update") + hooksExecutor, err := mgr.newCustomHooksExecutor(nil, repo, "update") require.NoError(t, err) var stdout, stderr bytes.Buffer @@ -331,7 +331,7 @@ func TestMultilineStdin(t *testing.T) { locator: config.NewLocator(cfg), } - hooksExecutor, err := mgr.newCustomHooksExecutor(repo, "pre-receive") + hooksExecutor, err := mgr.newCustomHooksExecutor(nil, repo, "pre-receive") require.NoError(t, err) changes := `old1 new1 ref1 @@ -370,7 +370,7 @@ func TestMultipleScriptsStdin(t *testing.T) { locator: config.NewLocator(cfg), } - hooksExecutor, err := mgr.newCustomHooksExecutor(repo, "pre-receive") + hooksExecutor, err := mgr.newCustomHooksExecutor(nil, repo, 
"pre-receive") require.NoError(t, err) changes := "oldref11 newref00 ref123445" @@ -399,7 +399,7 @@ func callAndVerifyHooks(t *testing.T, cfg config.Cfg, locator storage.Locator, r locator: locator, } - callHooks, err := mgr.newCustomHooksExecutor(repo, hookName) + callHooks, err := mgr.newCustomHooksExecutor(nil, repo, hookName) require.NoError(t, err) require.NoError(t, callHooks(ctx, args, env, bytes.NewBufferString(stdin), &stdout, &stderr)) diff --git a/internal/gitaly/hook/disabled_manager.go b/internal/gitaly/hook/disabled_manager.go index e1dee47576..7255f0bb5a 100644 --- a/internal/gitaly/hook/disabled_manager.go +++ b/internal/gitaly/hook/disabled_manager.go @@ -4,6 +4,7 @@ import ( "context" "io" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -11,17 +12,17 @@ import ( type DisabledManager struct{} // PreReceiveHook ignores its parameters and returns a nil error. -func (DisabledManager) PreReceiveHook(context.Context, *gitalypb.Repository, []string, []string, io.Reader, io.Writer, io.Writer) error { +func (DisabledManager) PreReceiveHook(context.Context, *gitaly.Transaction, *gitalypb.Repository, []string, []string, io.Reader, io.Writer, io.Writer) error { return nil } // PostReceiveHook ignores its parameters and returns a nil error. -func (DisabledManager) PostReceiveHook(context.Context, *gitalypb.Repository, []string, []string, io.Reader, io.Writer, io.Writer) error { +func (DisabledManager) PostReceiveHook(context.Context, *gitaly.Transaction, *gitalypb.Repository, []string, []string, io.Reader, io.Writer, io.Writer) error { return nil } // UpdateHook ignores its parameters and returns a nil error. 
-func (DisabledManager) UpdateHook(context.Context, *gitalypb.Repository, string, string, string, []string, io.Writer, io.Writer) error { +func (DisabledManager) UpdateHook(context.Context, *gitaly.Transaction, *gitalypb.Repository, string, string, string, []string, io.Writer, io.Writer) error { return nil } diff --git a/internal/gitaly/hook/manager.go b/internal/gitaly/hook/manager.go index b021d99090..6d1b2193e0 100644 --- a/internal/gitaly/hook/manager.go +++ b/internal/gitaly/hook/manager.go @@ -5,6 +5,7 @@ import ( "io" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" @@ -32,15 +33,15 @@ const ( type Manager interface { // PreReceiveHook executes the pre-receive Git hook and any installed custom hooks. stdin // must contain all references to be updated and match the format specified in githooks(5). - PreReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + PreReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error // PostReceiveHook executes the post-receive Git hook and any installed custom hooks. stdin // must contain all references to be updated and match the format specified in githooks(5). - PostReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + PostReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error // UpdateHook executes the update Git hook and any installed custom hooks for the reference // `ref` getting updated from `oldValue` to `newValue`. 
- UpdateHook(ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error + UpdateHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error // ReferenceTransactionHook executes the reference-transaction Git hook. stdin must contain // all references to be updated and match the format specified in githooks(5). diff --git a/internal/gitaly/hook/manager_mock.go b/internal/gitaly/hook/manager_mock.go index 51a7715e7b..b76b5eaab8 100644 --- a/internal/gitaly/hook/manager_mock.go +++ b/internal/gitaly/hook/manager_mock.go @@ -6,31 +6,32 @@ import ( "testing" "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) // MockManager mocks the Manager interface for Git hooks (e.g. pre-receive, post-receive) type MockManager struct { t *testing.T - preReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error - postReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error - update func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error + preReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + postReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + update func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) 
error referenceTransaction func(t *testing.T, ctx context.Context, state ReferenceTransactionState, env []string, stdin io.Reader) error } var ( // NopPreReceive does nothing for the pre-receive hook - NopPreReceive = func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + NopPreReceive = func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil } // NopPostReceive does nothing for the post-receive hook - NopPostReceive = func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + NopPostReceive = func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil } // NopUpdate does nothing for the update hook - NopUpdate = func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + NopUpdate = func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { return nil } @@ -43,9 +44,9 @@ var ( // NewMockManager returns a mocked hook Manager with the stubbed functions func NewMockManager( t *testing.T, - preReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error, - postReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error, - update func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error, + 
preReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error, + postReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error, + update func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error, referenceTransaction func(t *testing.T, ctx context.Context, state ReferenceTransactionState, env []string, stdin io.Reader) error, ) Manager { return &MockManager{ @@ -58,24 +59,24 @@ func NewMockManager( } // PreReceiveHook executes the mocked pre-receive hook -func (m *MockManager) PreReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { +func (m *MockManager) PreReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { require.NotNil(m.t, m.preReceive, "preReceive not implemented") - return m.preReceive(m.t, ctx, repo, pushOptions, env, stdin, stdout, stderr) + return m.preReceive(m.t, ctx, tx, repo, pushOptions, env, stdin, stdout, stderr) } // PostReceiveHook executes the mocked post-receive hook -func (m *MockManager) PostReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { +func (m *MockManager) PostReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { require.NotNil(m.t, m.postReceive, "postReceive not implemented") - return m.postReceive(m.t, ctx, repo, pushOptions, env, stdin, stdout, stderr) + return m.postReceive(m.t, ctx, tx, repo, pushOptions, 
env, stdin, stdout, stderr) } // UpdateHook executes the mocked update hook -func (m *MockManager) UpdateHook(ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { +func (m *MockManager) UpdateHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { require.NotNil(m.t, m.update, "update not implemented") - return m.update(m.t, ctx, repo, ref, oldValue, newValue, env, stdout, stderr) + return m.update(m.t, ctx, tx, repo, ref, oldValue, newValue, env, stdout, stderr) } // ReferenceTransactionHook executes the mocked reference transaction hook diff --git a/internal/gitaly/hook/postreceive.go b/internal/gitaly/hook/postreceive.go index 094d09664b..b5847c45d2 100644 --- a/internal/gitaly/hook/postreceive.go +++ b/internal/gitaly/hook/postreceive.go @@ -11,6 +11,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -106,7 +107,7 @@ func printAlert(m gitlab.PostReceiveMessage, w io.Writer) error { } //nolint:revive // This is unintentionally missing documentation. 
-func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { payload, err := git.HooksPayloadFromEnv(env) if err != nil { return structerr.NewInternal("extracting hooks payload: %w", err) @@ -118,7 +119,7 @@ func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, repo *gitalypb. } if isPrimary(payload) { - if err := m.postReceiveHook(ctx, payload, repo, pushOptions, env, changes, stdout, stderr); err != nil { + if err := m.postReceiveHook(ctx, tx, payload, repo, pushOptions, env, changes, stdout, stderr); err != nil { ctxlogrus.Extract(ctx).WithError(err).Warn("stopping transaction because post-receive hook failed") // If the post-receive hook declines the push, then we need to stop any @@ -134,7 +135,7 @@ func (m *GitLabHookManager) PostReceiveHook(ctx context.Context, repo *gitalypb. 
return nil } -func (m *GitLabHookManager) postReceiveHook(ctx context.Context, payload git.HooksPayload, repo *gitalypb.Repository, pushOptions, env []string, stdin []byte, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) postReceiveHook(ctx context.Context, tx *gitaly.Transaction, payload git.HooksPayload, repo *gitalypb.Repository, pushOptions, env []string, stdin []byte, stdout, stderr io.Writer) error { if len(stdin) == 0 { return structerr.NewInternal("hook got no reference updates") } @@ -167,7 +168,7 @@ func (m *GitLabHookManager) postReceiveHook(ctx context.Context, payload git.Hoo return errors.New("") } - executor, err := m.newCustomHooksExecutor(repo, "post-receive") + executor, err := m.newCustomHooksExecutor(tx, repo, "post-receive") if err != nil { return structerr.NewInternal("creating custom hooks executor: %w", err) } diff --git a/internal/gitaly/hook/postreceive_test.go b/internal/gitaly/hook/postreceive_test.go index 08b12655a8..9b0cf5adc6 100644 --- a/internal/gitaly/hook/postreceive_test.go +++ b/internal/gitaly/hook/postreceive_test.go @@ -217,7 +217,7 @@ func TestPostReceive_customHook(t *testing.T) { gittest.WriteCustomHook(t, repoPath, "post-receive", []byte(tc.hook)) var stdout, stderr bytes.Buffer - err = hookManager.PostReceiveHook(ctx, repo, tc.pushOptions, tc.env, strings.NewReader(tc.stdin), &stdout, &stderr) + err = hookManager.PostReceiveHook(ctx, nil, repo, tc.pushOptions, tc.env, strings.NewReader(tc.stdin), &stdout, &stderr) if tc.expectedErr != "" { require.Contains(t, err.Error(), tc.expectedErr) @@ -362,7 +362,7 @@ func TestPostReceive_gitlab(t *testing.T) { gittest.WriteCustomHook(t, repoPath, "post-receive", []byte("#!/bin/sh\necho hook called\n")) var stdout, stderr bytes.Buffer - err = hookManager.PostReceiveHook(ctx, repo, tc.pushOptions, tc.env, strings.NewReader(tc.changes), &stdout, &stderr) + err = hookManager.PostReceiveHook(ctx, nil, repo, tc.pushOptions, tc.env, strings.NewReader(tc.changes), &stdout, 
&stderr) if tc.expectedErr != nil { require.Equal(t, tc.expectedErr, err) @@ -424,7 +424,7 @@ func TestPostReceive_quarantine(t *testing.T) { gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.ZeroOID)) var stdout, stderr bytes.Buffer - require.NoError(t, hookManager.PostReceiveHook(ctx, repo, nil, + require.NoError(t, hookManager.PostReceiveHook(ctx, nil, repo, nil, []string{env}, stdin, &stdout, &stderr)) if isQuarantined { diff --git a/internal/gitaly/hook/prereceive.go b/internal/gitaly/hook/prereceive.go index e857888814..e7e5d57f7b 100644 --- a/internal/gitaly/hook/prereceive.go +++ b/internal/gitaly/hook/prereceive.go @@ -11,6 +11,7 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/env" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" @@ -62,7 +63,7 @@ func getRelativeObjectDirs(repoPath, gitObjectDir, gitAlternateObjectDirs string // PreReceiveHook will try to authenticate the changes against the GitLab API. // If successful, it will execute custom hooks with the given parameters, push // options and environment. -func (m *GitLabHookManager) PreReceiveHook(ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) PreReceiveHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { payload, err := git.HooksPayloadFromEnv(env) if err != nil { return structerr.NewInternal("extracting hooks payload: %w", err) @@ -75,7 +76,7 @@ func (m *GitLabHookManager) PreReceiveHook(ctx context.Context, repo *gitalypb.R // Only the primary should execute hooks and increment reference counters. 
if isPrimary(payload) { - if err := m.preReceiveHook(ctx, payload, repo, pushOptions, env, changes, stdout, stderr); err != nil { + if err := m.preReceiveHook(ctx, tx, payload, repo, pushOptions, env, changes, stdout, stderr); err != nil { ctxlogrus.Extract(ctx).WithError(err).Warn("stopping transaction because pre-receive hook failed") // If the pre-receive hook declines the push, then we need to stop any @@ -91,7 +92,7 @@ func (m *GitLabHookManager) PreReceiveHook(ctx context.Context, repo *gitalypb.R return nil } -func (m *GitLabHookManager) preReceiveHook(ctx context.Context, payload git.HooksPayload, repo *gitalypb.Repository, pushOptions, envs []string, changes []byte, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) preReceiveHook(ctx context.Context, tx *gitaly.Transaction, payload git.HooksPayload, repo *gitalypb.Repository, pushOptions, envs []string, changes []byte, stdout, stderr io.Writer) error { repoPath, err := m.locator.GetRepoPath(repo) if err != nil { return structerr.NewInternal("getting repo path: %w", err) @@ -161,7 +162,7 @@ func (m *GitLabHookManager) preReceiveHook(ctx context.Context, payload git.Hook } } - executor, err := m.newCustomHooksExecutor(repo, "pre-receive") + executor, err := m.newCustomHooksExecutor(tx, repo, "pre-receive") if err != nil { return fmt.Errorf("creating custom hooks executor: %w", err) } diff --git a/internal/gitaly/hook/prereceive_test.go b/internal/gitaly/hook/prereceive_test.go index 9362b0ae31..2b479b3243 100644 --- a/internal/gitaly/hook/prereceive_test.go +++ b/internal/gitaly/hook/prereceive_test.go @@ -173,7 +173,7 @@ func TestPrereceive_customHooks(t *testing.T) { gittest.WriteCustomHook(t, repoPath, "pre-receive", []byte(tc.hook)) var stdout, stderr bytes.Buffer - err = hookManager.PreReceiveHook(ctx, repo, tc.pushOptions, tc.env, strings.NewReader(tc.stdin), &stdout, &stderr) + err = hookManager.PreReceiveHook(ctx, nil, repo, tc.pushOptions, tc.env, strings.NewReader(tc.stdin), &stdout, 
&stderr) if tc.expectedErr != "" { require.Contains(t, err.Error(), tc.expectedErr) @@ -236,7 +236,7 @@ func TestPrereceive_quarantine(t *testing.T) { gittest.DefaultObjectHash.ZeroOID, gittest.DefaultObjectHash.ZeroOID)) var stdout, stderr bytes.Buffer - require.NoError(t, hookManager.PreReceiveHook(ctx, repo, nil, + require.NoError(t, hookManager.PreReceiveHook(ctx, nil, repo, nil, []string{env}, stdin, &stdout, &stderr)) if isQuarantined { @@ -396,7 +396,7 @@ func TestPrereceive_gitlab(t *testing.T) { gittest.WriteCustomHook(t, repoPath, "pre-receive", []byte("#!/bin/sh\necho called\n")) var stdout, stderr bytes.Buffer - err = hookManager.PreReceiveHook(ctx, repo, nil, tc.env, strings.NewReader(tc.changes), &stdout, &stderr) + err = hookManager.PreReceiveHook(ctx, nil, repo, nil, tc.env, strings.NewReader(tc.changes), &stdout, &stderr) if tc.expectedErr != nil { require.Equal(t, tc.expectedErr, err) diff --git a/internal/gitaly/hook/transactions_test.go b/internal/gitaly/hook/transactions_test.go index da2e6d1d8f..d13e6c7e74 100644 --- a/internal/gitaly/hook/transactions_test.go +++ b/internal/gitaly/hook/transactions_test.go @@ -59,13 +59,13 @@ func TestHookManager_stopCalled(t *testing.T) { } preReceiveFunc := func(t *testing.T) error { - return hookManager.PreReceiveHook(ctx, repo, nil, []string{hooksPayload}, strings.NewReader("changes"), io.Discard, io.Discard) + return hookManager.PreReceiveHook(ctx, nil, repo, nil, []string{hooksPayload}, strings.NewReader("changes"), io.Discard, io.Discard) } updateFunc := func(t *testing.T) error { - return hookManager.UpdateHook(ctx, repo, "ref", gittest.DefaultObjectHash.ZeroOID.String(), gittest.DefaultObjectHash.ZeroOID.String(), []string{hooksPayload}, io.Discard, io.Discard) + return hookManager.UpdateHook(ctx, nil, repo, "ref", gittest.DefaultObjectHash.ZeroOID.String(), gittest.DefaultObjectHash.ZeroOID.String(), []string{hooksPayload}, io.Discard, io.Discard) } postReceiveFunc := func(t *testing.T) error { - 
return hookManager.PostReceiveHook(ctx, repo, nil, []string{hooksPayload}, strings.NewReader("changes"), io.Discard, io.Discard) + return hookManager.PostReceiveHook(ctx, nil, repo, nil, []string{hooksPayload}, strings.NewReader("changes"), io.Discard, io.Discard) } for _, tc := range []struct { diff --git a/internal/gitaly/hook/update.go b/internal/gitaly/hook/update.go index d8afa94dc4..acb729bb45 100644 --- a/internal/gitaly/hook/update.go +++ b/internal/gitaly/hook/update.go @@ -7,19 +7,20 @@ import ( "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) //nolint:revive // This is unintentionally missing documentation. -func (m *GitLabHookManager) UpdateHook(ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) UpdateHook(ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { payload, err := git.HooksPayloadFromEnv(env) if err != nil { return structerr.NewInternal("extracting hooks payload: %w", err) } if isPrimary(payload) { - if err := m.updateHook(ctx, payload, repo, ref, oldValue, newValue, env, stdout, stderr); err != nil { + if err := m.updateHook(ctx, tx, payload, repo, ref, oldValue, newValue, env, stdout, stderr); err != nil { ctxlogrus.Extract(ctx).WithError(err).Warn("stopping transaction because update hook failed") // If the update hook declines the push, then we need @@ -35,7 +36,7 @@ func (m *GitLabHookManager) UpdateHook(ctx context.Context, repo *gitalypb.Repos return nil } -func (m *GitLabHookManager) updateHook(ctx context.Context, payload git.HooksPayload, repo *gitalypb.Repository, ref, oldValue, newValue string, env 
[]string, stdout, stderr io.Writer) error { +func (m *GitLabHookManager) updateHook(ctx context.Context, tx *gitaly.Transaction, payload git.HooksPayload, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { objectHash, err := git.ObjectHashByFormat(payload.ObjectFormat) if err != nil { return fmt.Errorf("looking up object hash: %w", err) @@ -54,7 +55,7 @@ func (m *GitLabHookManager) updateHook(ctx context.Context, payload git.HooksPay return structerr.NewInternal("payload has no receive hooks info") } - executor, err := m.newCustomHooksExecutor(repo, "update") + executor, err := m.newCustomHooksExecutor(tx, repo, "update") if err != nil { return structerr.NewInternal("%w", err) } diff --git a/internal/gitaly/hook/update_test.go b/internal/gitaly/hook/update_test.go index 55d8acc060..707ae26c43 100644 --- a/internal/gitaly/hook/update_test.go +++ b/internal/gitaly/hook/update_test.go @@ -203,7 +203,7 @@ func TestUpdate_customHooks(t *testing.T) { gittest.WriteCustomHook(t, repoPath, "update", []byte(tc.hook)) var stdout, stderr bytes.Buffer - err = hookManager.UpdateHook(ctx, repo, tc.reference, tc.oldHash.String(), tc.newHash.String(), tc.env, &stdout, &stderr) + err = hookManager.UpdateHook(ctx, nil, repo, tc.reference, tc.oldHash.String(), tc.newHash.String(), tc.env, &stdout, &stderr) if tc.expectedErr != "" { require.Contains(t, err.Error(), tc.expectedErr) @@ -263,7 +263,7 @@ func TestUpdate_quarantine(t *testing.T) { require.NoError(t, err) var stdout, stderr bytes.Buffer - require.NoError(t, hookManager.UpdateHook(ctx, repo, "refs/heads/master", + require.NoError(t, hookManager.UpdateHook(ctx, nil, repo, "refs/heads/master", gittest.DefaultObjectHash.ZeroOID.String(), gittest.DefaultObjectHash.ZeroOID.String(), []string{env}, &stdout, &stderr)) if isQuarantined { diff --git a/internal/gitaly/hook/updateref/update_with_hooks.go b/internal/gitaly/hook/updateref/update_with_hooks.go index 
12e4f2b917..362aa1576f 100644 --- a/internal/gitaly/hook/updateref/update_with_hooks.go +++ b/internal/gitaly/hook/updateref/update_with_hooks.go @@ -222,7 +222,7 @@ func (u *UpdaterWithHooks) UpdateReference( } var stdout, stderr bytes.Buffer - if err := u.hookManager.PreReceiveHook(ctx, quarantinedRepo, pushOptions, []string{hooksPayload}, strings.NewReader(changes), &stdout, &stderr); err != nil { + if err := u.hookManager.PreReceiveHook(ctx, tx, quarantinedRepo, pushOptions, []string{hooksPayload}, strings.NewReader(changes), &stdout, &stderr); err != nil { return fmt.Errorf("running pre-receive hooks: %w", wrapHookError(err, git.PreReceiveHook, stdout.String(), stderr.String())) } @@ -247,7 +247,7 @@ func (u *UpdaterWithHooks) UpdateReference( } } - if err := u.hookManager.UpdateHook(ctx, quarantinedRepo, reference.String(), oldrev.String(), newrev.String(), []string{hooksPayload}, &stdout, &stderr); err != nil { + if err := u.hookManager.UpdateHook(ctx, tx, quarantinedRepo, reference.String(), oldrev.String(), newrev.String(), []string{hooksPayload}, &stdout, &stderr); err != nil { return fmt.Errorf("running update hooks: %w", wrapHookError(err, git.UpdateHook, stdout.String(), stderr.String())) } @@ -336,7 +336,7 @@ func (u *UpdaterWithHooks) UpdateReference( return fmt.Errorf("executing committing reference-transaction hook: %w", err) } - if err := u.hookManager.PostReceiveHook(ctx, repoProto, pushOptions, []string{hooksPayload}, strings.NewReader(changes), &stdout, &stderr); err != nil { + if err := u.hookManager.PostReceiveHook(ctx, tx, repoProto, pushOptions, []string{hooksPayload}, strings.NewReader(changes), &stdout, &stderr); err != nil { // CustomHook errors are returned in case a custom hook has returned an error code. // The post-receive hook has special semantics though. 
Quoting githooks(5): // diff --git a/internal/gitaly/hook/updateref/update_with_hooks_test.go b/internal/gitaly/hook/updateref/update_with_hooks_test.go index 3dd06a6b4d..6d4d9eb2b0 100644 --- a/internal/gitaly/hook/updateref/update_with_hooks_test.go +++ b/internal/gitaly/hook/updateref/update_with_hooks_test.go @@ -13,6 +13,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/quarantine" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook/updateref" @@ -138,16 +139,16 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { referenceTransactionCalls := 0 testCases := []struct { desc string - preReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error - postReceive func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error - update func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error + preReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + postReceive func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error + update func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error referenceTransaction func(t *testing.T, ctx context.Context, state hook.ReferenceTransactionState, 
env []string, stdin io.Reader) error expectedErr string expectedRefDeletion bool }{ { desc: "successful update", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { changes, err := io.ReadAll(stdin) require.NoError(t, err) require.Equal(t, fmt.Sprintf("%s %s refs/heads/main\n", commitID, gittest.DefaultObjectHash.ZeroOID.String()), string(changes)) @@ -155,14 +156,14 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { requirePayload(t, env) return nil }, - update: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + update: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { require.Equal(t, "refs/heads/main", ref) require.Equal(t, commitID.String(), oldValue) require.Equal(t, newValue, gittest.DefaultObjectHash.ZeroOID.String()) requirePayload(t, env) return nil }, - postReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + postReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { changes, err := io.ReadAll(stdin) require.NoError(t, err) require.Equal(t, fmt.Sprintf("%s %s refs/heads/main\n", commitID.String(), gittest.DefaultObjectHash.ZeroOID.String()), string(changes)) @@ -190,7 +191,7 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { }, { desc: "prereceive error", - preReceive: func(t *testing.T, ctx context.Context, repo 
*gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { _, err := io.Copy(stderr, strings.NewReader("prereceive failure")) require.NoError(t, err) return errors.New("ignored") @@ -199,17 +200,17 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { }, { desc: "prereceive error from GitLab API response", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return hook.NotAllowedError{Message: "GitLab: file is locked"} }, expectedErr: "GitLab: file is locked", }, { desc: "update error", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil }, - update: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + update: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { _, err := io.Copy(stderr, strings.NewReader("update failure")) require.NoError(t, err) return errors.New("ignored") @@ -218,10 +219,10 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { }, { desc: "reference-transaction error", - preReceive: func(t *testing.T, ctx context.Context, 
repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil }, - update: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + update: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { return nil }, referenceTransaction: func(t *testing.T, ctx context.Context, state hook.ReferenceTransactionState, env []string, stdin io.Reader) error { @@ -234,16 +235,16 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { }, { desc: "post-receive custom hooks error is ignored", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil }, - update: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + update: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { return nil }, referenceTransaction: func(t *testing.T, ctx context.Context, state hook.ReferenceTransactionState, env []string, stdin io.Reader) error { return nil }, - postReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + postReceive: func(t *testing.T, ctx 
context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { _, err := io.Copy(stderr, strings.NewReader("post-receive failure")) require.NoError(t, err) return hook.NewCustomHookError(errors.New("ignored")) @@ -252,16 +253,16 @@ func TestUpdaterWithHooks_UpdateReference(t *testing.T) { }, { desc: "post-receive non-custom hooks error returned", - preReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + preReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { return nil }, - update: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + update: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { return nil }, referenceTransaction: func(t *testing.T, ctx context.Context, state hook.ReferenceTransactionState, env []string, stdin io.Reader) error { return nil }, - postReceive: func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + postReceive: func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { _, err := io.Copy(stderr, strings.NewReader("post-receive failure")) require.NoError(t, err) return errors.New("uh oh") @@ -346,7 +347,7 @@ func TestUpdaterWithHooks_quarantine(t *testing.T) { hookExecutions := make(map[string]int) hookManager := hook.NewMockManager(t, // The pre-receive hook is not expected to have the object in the normal repo. 
- func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { expectQuarantined(t, env, true) testhelper.ProtoEqual(t, quarantine.QuarantinedRepo(), repo) hookExecutions["prereceive"]++ @@ -354,7 +355,7 @@ func TestUpdaterWithHooks_quarantine(t *testing.T) { }, // But the post-receive hook shall get the unquarantined repository as input, with // objects already having been migrated into the target repo. - func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { expectQuarantined(t, env, false) testhelper.ProtoEqual(t, repoProto, repo) hookExecutions["postreceive"]++ @@ -364,7 +365,7 @@ func TestUpdaterWithHooks_quarantine(t *testing.T) { // each reference that we're updating. As it is called immediately before the ref // gets queued for update, objects must have already been migrated or otherwise // updating the refs will fail due to missing objects. 
- func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { + func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, ref, oldValue, newValue string, env []string, stdout, stderr io.Writer) error { expectQuarantined(t, env, false) testhelper.ProtoEqual(t, quarantine.QuarantinedRepo(), repo) hookExecutions["update"]++ diff --git a/internal/gitaly/service/conflicts/resolve_conflicts_test.go b/internal/gitaly/service/conflicts/resolve_conflicts_test.go index ef18ff6a07..80037942ea 100644 --- a/internal/gitaly/service/conflicts/resolve_conflicts_test.go +++ b/internal/gitaly/service/conflicts/resolve_conflicts_test.go @@ -19,6 +19,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" @@ -60,7 +61,7 @@ var ( func TestSuccessfulResolveConflictsRequestHelper(t *testing.T) { var verifyFunc func(tb testing.TB, pushOptions []string, stdin io.Reader) - verifyFuncProxy := func(t *testing.T, ctx context.Context, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { + verifyFuncProxy := func(t *testing.T, ctx context.Context, tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer) error { // We use a proxy func here as we need to provide the hookManager dependency while creating the service but we only // know the commit IDs after the service is created. The proxy allows us to modify the verifyFunc after the service // is already built. 
diff --git a/internal/gitaly/service/hook/post_receive.go b/internal/gitaly/service/hook/post_receive.go index f504fb80d9..440325c0b2 100644 --- a/internal/gitaly/service/hook/post_receive.go +++ b/internal/gitaly/service/hook/post_receive.go @@ -48,6 +48,7 @@ func (s *server) PostReceiveHook(stream gitalypb.HookService_PostReceiveHookServ if err := s.manager.PostReceiveHook( stream.Context(), + nil, firstRequest.Repository, firstRequest.GetGitPushOptions(), firstRequest.GetEnvironmentVariables(), diff --git a/internal/gitaly/service/hook/pre_receive.go b/internal/gitaly/service/hook/pre_receive.go index 0765b88af3..ea2e69559a 100644 --- a/internal/gitaly/service/hook/pre_receive.go +++ b/internal/gitaly/service/hook/pre_receive.go @@ -38,6 +38,7 @@ func (s *server) PreReceiveHook(stream gitalypb.HookService_PreReceiveHookServer if err := s.manager.PreReceiveHook( stream.Context(), + nil, repository, firstRequest.GetGitPushOptions(), firstRequest.GetEnvironmentVariables(), diff --git a/internal/gitaly/service/hook/update.go b/internal/gitaly/service/hook/update.go index 17774cbade..e12f5cfd7b 100644 --- a/internal/gitaly/service/hook/update.go +++ b/internal/gitaly/service/hook/update.go @@ -30,6 +30,7 @@ func (s *server) UpdateHook(in *gitalypb.UpdateHookRequest, stream gitalypb.Hook if err := s.manager.UpdateHook( stream.Context(), + nil, in.GetRepository(), string(in.GetRef()), in.GetOldValue(), diff --git a/internal/gitaly/service/smarthttp/receive_pack_test.go b/internal/gitaly/service/smarthttp/receive_pack_test.go index 7e14ca0ebf..a19ff1e3fc 100644 --- a/internal/gitaly/service/smarthttp/receive_pack_test.go +++ b/internal/gitaly/service/smarthttp/receive_pack_test.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" gitalyhook "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/hook" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" @@ -843,6 +844,7 @@ func TestPostReceivePack_notAllowed(t *testing.T) { func( t *testing.T, ctx context.Context, + tx *gitaly.Transaction, repo *gitalypb.Repository, pushOptions, env []string, stdin io.Reader, stdout, stderr io.Writer, -- GitLab From 879ea25c6c32f51671d11140abf0f478c47b3101 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 16 May 2023 17:58:22 +0300 Subject: [PATCH 15/17] Extract hook path generation into a function This commit extracts the hook path generation into a function. Extracting a function will later make it easier in tests to assert we have the correct path when we add the hook path to the transaction's snapshot. While at it, update the parent directory to be synced with the recently introduced helper designed for it. --- internal/gitaly/transaction_manager.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 7caf6148d3..1e446ec574 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -1146,10 +1146,7 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex Lo return nil } - syncer := safe.NewSyncer() - - hooksPath := filepath.Join("wal", "hooks") - targetDirectory := filepath.Join(mgr.repositoryPath, hooksPath, logIndex.String()) + targetDirectory := hookPathForLogIndex(mgr.repositoryPath, logIndex) if err := os.Mkdir(targetDirectory, fs.ModePerm); err != nil { // The target directory may exist if we previously tried to extract the // hooks there. 
TAR overwrites existing files and the hooks files are @@ -1163,19 +1160,26 @@ func (mgr *TransactionManager) applyCustomHooks(ctx context.Context, logIndex Lo return fmt.Errorf("extract hooks: %w", err) } + syncer := safe.NewSyncer() // TAR doesn't sync the extracted files so do it manually here. if err := syncer.SyncRecursive(targetDirectory); err != nil { return fmt.Errorf("sync hooks: %w", err) } // Sync the parent directory as well. - if err := syncer.Sync(filepath.Join(mgr.repositoryPath, hooksPath)); err != nil { + if err := syncer.SyncParent(targetDirectory); err != nil { return fmt.Errorf("sync hook directory: %w", err) } return nil } +// hookPathForLogIndex returns the filesystem paths where the hooks for the +// given log index are stored. +func hookPathForLogIndex(repositoryPath string, logIndex LogIndex) string { + return filepath.Join(repositoryPath, "wal", "hooks", logIndex.String()) +} + // deleteLogEntry deletes the log entry at the given index from the log. func (mgr *TransactionManager) deleteLogEntry(index LogIndex) error { return mgr.deleteKey(keyLogEntry(mgr.relativePath, index)) -- GitLab From d3fec72e7b050effa9177f4eeea47bd1f638c533 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 16 May 2023 18:07:41 +0300 Subject: [PATCH 16/17] Include hook path in a transaction's snapshot TransactionManager manages hooks via MVCC. Hooks are always written into a new directory to isolate transactions from concurrent writes. When executing or reading the hooks, the transactions should use the hooks included in the snapshot. While the log index of the hooks is already included in the snapshot, it's not useful for integration as it doesn't yet say where exactly the hooks should be executed from. Solve this problem by including an absolute path to the hooks on the disk. This will later be used by HookManager to execute the correct hooks for a transaction. 
Backwards compatibility logic is included to execute hooks from the repository if none have yet been written via WAL. --- internal/gitaly/transaction_manager.go | 9 +++++++++ internal/gitaly/transaction_manager_test.go | 8 +++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 1e446ec574..eaffeb4cae 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -115,6 +115,8 @@ type Snapshot struct { // HookIndex is index of the hooks on the disk that are included in this Transactions's snapshot // and were the latest on the read index. HookIndex LogIndex + // HookPath is an absolute filesystem path to the hooks in this snapshot. + HookPath string } // Transaction is a unit-of-work that contains reference changes to perform on the repository. @@ -179,9 +181,16 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) snapshot: Snapshot{ ReadIndex: mgr.appendedLogIndex, HookIndex: mgr.hookIndex, + HookPath: hookPathForLogIndex(mgr.repositoryPath, mgr.hookIndex), }, } + // If there are no hooks stored through the WAL yet, then default to the custom hooks + // that may already exist in the repository for backwards compatibility. 
+ if txn.snapshot.HookIndex == 0 { + txn.snapshot.HookPath = filepath.Join(mgr.repositoryPath, "custom_hooks") + } + readReady := mgr.applyNotifications[txn.snapshot.ReadIndex] mgr.mutex.RUnlock() if readReady == nil { diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index 99aeebc00f..fb3358097d 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -2564,7 +2564,13 @@ func TestTransactionManager(t *testing.T) { transaction, err := transactionManager.Begin(beginCtx) require.Equal(t, step.ExpectedError, err) if err == nil { - require.Equal(t, step.ExpectedSnapshot, transaction.Snapshot()) + expectedSnapshot := step.ExpectedSnapshot + expectedSnapshot.HookPath = filepath.Join(repoPath, "custom_hooks") + if expectedSnapshot.HookIndex > 0 { + expectedSnapshot.HookPath = hookPathForLogIndex(repoPath, expectedSnapshot.HookIndex) + } + + require.Equal(t, expectedSnapshot, transaction.Snapshot()) } openTransactions[step.TransactionID] = transaction case Commit: -- GitLab From 62290e3154e12e5d668fbfebce58673920e469bc Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 16 May 2023 18:22:06 +0300 Subject: [PATCH 17/17] Execute custom hooks from transaction's snapshot Each transaction has a read snapshot that maintains its isolation from concurrent writes. The snapshot also includes the version of the custom hooks. When the hooks are executed or read, the operation should target the correct version of the hooks. The transaction's snapshot includes the absolute path to the custom hooks on the disk. This commit teaches HookManager to execute the snapshotted hooks from the directory included in the transaction. While the HookManager knows how to execute the correct hooks now, not all of the call sites pass a transaction yet.
--- internal/gitaly/hook/custom.go | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/internal/gitaly/hook/custom.go b/internal/gitaly/hook/custom.go index a659e81c57..35082881d4 100644 --- a/internal/gitaly/hook/custom.go +++ b/internal/gitaly/hook/custom.go @@ -38,11 +38,17 @@ func (e CustomHookError) Unwrap() error { return e.err } -// newCustomHooksExecutor creates a new hooks executor for custom hooks. Hooks -// are looked up and executed in the following order: +// newCustomHooksExecutor creates a new hooks executor for custom hooks. // -// 1. .git/custom_hooks/ - per project hook -// 2. .git/custom_hooks/.d/* - per project hooks +// Repository specific custom hooks are executed by default from `.git/custom_hooks`. +// This is the non-WAL behavior where hooks are updated in place. With WAL, the hooks are +// managed with MVCC. If the transaction is set, we'll use the hook path from its snapshot +// to ensure we execute the correct version of the hooks. +// +// Hooks are looked up and executed in the following order: +// +// 1. / - per project hook +// 2. /.d/* - per project hooks // 3. /hooks/.d/* - global hooks // // Any files which are either not executable or have a trailing `~` are ignored. 
@@ -52,13 +58,18 @@ func (m *GitLabHookManager) newCustomHooksExecutor(tx *gitaly.Transaction, repo return nil, err } + hookPath := filepath.Join(repoPath, "custom_hooks") + if tx != nil { + hookPath = tx.Snapshot().HookPath + } + var hookFiles []string - projectCustomHookFile := filepath.Join(repoPath, "custom_hooks", hookName) + projectCustomHookFile := filepath.Join(hookPath, hookName) if isValidHook(projectCustomHookFile) { hookFiles = append(hookFiles, projectCustomHookFile) } - projectCustomHookDir := filepath.Join(repoPath, "custom_hooks", fmt.Sprintf("%s.d", hookName)) + projectCustomHookDir := filepath.Join(hookPath, fmt.Sprintf("%s.d", hookName)) files, err := findHooks(projectCustomHookDir) if err != nil { return nil, err -- GitLab