From a287e8174aec7474791081a0a0cbd8791ea40e0b Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 22 Oct 2024 17:10:33 +0700 Subject: [PATCH 1/7] storagemgr: Add LSN to test hooks The transaction manager has a set of test hooks. They are triggered before performing certain critical actions. The test suite uses those hooks to inject undesirable events, such as crashes or unexpected manager closings. Currently, they don't accept any arguments since those events are applied widely. In some later commits, the test suite needs to trigger hooks conditionally. This is because Raft injects some internal log entries, such as config changes or empty log entries, for verification. The tests fail prematurely if the hook is triggered while applying those entries. While the transaction manager resumes the processing successfully afterward, crashes while applying Raft's internal log entries should be covered elsewhere. The existing tests only cover crashes while committing "normal" log entries. This commit adds LSN to test hooks for that use case. 
--- .../partition/transaction_manager.go | 24 +++++++++---------- .../transaction_manager_hook_test.go | 24 +++++++++++++++---- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index f6b7e1ab42..7293d285f3 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -1045,10 +1045,10 @@ type TransactionManager struct { type testHooks struct { beforeInitialization func() - beforeAppendLogEntry func() - beforeApplyLogEntry func() - beforeStoreAppliedLSN func() - beforeDeleteLogEntryFiles func() + beforeAppendLogEntry func(storage.LSN) + beforeApplyLogEntry func(storage.LSN) + beforeStoreAppliedLSN func(storage.LSN) + beforeDeleteLogEntryFiles func(storage.LSN) beforeRunExiting func() } @@ -1097,10 +1097,10 @@ func NewTransactionManager( testHooks: testHooks{ beforeInitialization: func() {}, - beforeAppendLogEntry: func() {}, - beforeApplyLogEntry: func() {}, - beforeStoreAppliedLSN: func() {}, - beforeDeleteLogEntryFiles: func() {}, + beforeAppendLogEntry: func(storage.LSN) {}, + beforeApplyLogEntry: func(storage.LSN) {}, + beforeStoreAppliedLSN: func(storage.LSN) {}, + beforeDeleteLogEntryFiles: func(storage.LSN) {}, beforeRunExiting: func() {}, }, } @@ -3538,9 +3538,9 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende return fmt.Errorf("synchronizing WAL directory: %w", err) } - mgr.testHooks.beforeAppendLogEntry() - nextLSN := mgr.appendedLSN + 1 + mgr.testHooks.beforeAppendLogEntry(nextLSN) + // Move the log entry from the staging directory into its place in the log. 
destinationPath := walFilesPathForLSN(mgr.stateDirectory, nextLSN) if err := os.Rename(logEntryPath, destinationPath); err != nil { @@ -3600,7 +3600,7 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LS delete(mgr.snapshotLocks, previousLSN) mgr.mutex.Unlock() - mgr.testHooks.beforeApplyLogEntry() + mgr.testHooks.beforeApplyLogEntry(lsn) if err := applyOperations(ctx, safe.NewSyncer().Sync, mgr.storagePath, walFilesPathForLSN(mgr.stateDirectory, lsn), logEntry, mgr.db); err != nil { return fmt.Errorf("apply operations: %w", err) @@ -3615,7 +3615,7 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LS } } - mgr.testHooks.beforeStoreAppliedLSN() + mgr.testHooks.beforeStoreAppliedLSN(lsn) if err := mgr.storeAppliedLSN(lsn); err != nil { return fmt.Errorf("set applied LSN: %w", err) } diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index 3434a94f40..a9c2639443 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -19,16 +19,14 @@ type hookFunc func(hookContext) type hookContext struct { // closeManager calls the calls Close on the TransactionManager. closeManager func() + // lsn stores the LSN context when the hook is triggered. + lsn storage.LSN } // installHooks takes the hooks in the test setup and configures them in the TransactionManager. 
func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, hooks testTransactionHooks) { for destination, source := range map[*func()]hookFunc{ - &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, - &mgr.testHooks.beforeAppendLogEntry: hooks.BeforeAppendLogEntry, - &mgr.testHooks.beforeApplyLogEntry: hooks.BeforeApplyLogEntry, - &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, - &mgr.testHooks.beforeDeleteLogEntryFiles: hooks.AfterDeleteLogEntry, + &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, &mgr.testHooks.beforeRunExiting: func(hookContext) { if hooks.WaitForTransactionsWhenClosing { inflightTransactions.Wait() @@ -46,6 +44,22 @@ func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, } } } + for destination, source := range map[*func(storage.LSN)]hookFunc{ + &mgr.testHooks.beforeApplyLogEntry: hooks.BeforeApplyLogEntry, + &mgr.testHooks.beforeAppendLogEntry: hooks.BeforeAppendLogEntry, + &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, + &mgr.testHooks.beforeDeleteLogEntryFiles: hooks.AfterDeleteLogEntry, + } { + if source != nil { + runHook := source + *destination = func(lsn storage.LSN) { + runHook(hookContext{ + closeManager: mgr.Close, + lsn: lsn, + }) + } + } + } } func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { -- GitLab From 258508ad38e2b16db50607c041123709e89e79ea Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 1 Nov 2024 10:48:39 +0700 Subject: [PATCH 2/7] storagemgr: Store TransactionManager in the hook context TransactionManager's test suite installs some hooks so that tests can access certain stages of the transaction's life cycle. Those hooks are fed with a hook context containing LSN and a close function. In some later commits, some tests need to access some data of TransactionManager. Adding arbitrary data to the hook context continuously is tedious. 
Thus, this commit embeds the pointer to TransactionManager in the hook context. This should not be a concern. Outsiders could not access the internal state. Only internal package tests can install such hooks. --- .../partition/transaction_manager_hook_test.go | 10 +++++----- .../storagemgr/partition/transaction_manager_test.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index a9c2639443..c8db383796 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -17,8 +17,8 @@ type hookFunc func(hookContext) // hookContext are the control toggles available in a hook. type hookContext struct { - // closeManager calls the calls Close on the TransactionManager. - closeManager func() + // manager points to the subject TransactionManager. + manager *TransactionManager // lsn stores the LSN context when the hook is triggered. 
lsn storage.LSN } @@ -39,7 +39,7 @@ func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, runHook := source *destination = func() { runHook(hookContext{ - closeManager: mgr.Close, + manager: mgr, }) } } @@ -54,8 +54,8 @@ func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, runHook := source *destination = func(lsn storage.LSN) { runHook(hookContext{ - closeManager: mgr.Close, - lsn: lsn, + manager: mgr, + lsn: lsn, }) } } diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index 021f6adddc..b3b1011185 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -485,7 +485,7 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeAppendLogEntry: func(hookContext hookContext) { hookContext.closeManager() }, + BeforeAppendLogEntry: func(hookContext hookContext) { hookContext.manager.Close() }, // This ensures we are testing the context cancellation errors being unwrapped properly // to an storage.ErrTransactionProcessingStopped instead of hitting the general case when // runDone is closed. -- GitLab From 3f774c48c293485c96114fc56e2de3a90b365017 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Tue, 22 Oct 2024 17:37:25 +0700 Subject: [PATCH 3/7] storagemgr: Centralize errSimulatedCrash emitter The test suite of storage inserts a bunch of test hooks that panic with errSimulatedCrash. Those hooks are unconditional. This commit creates a helper function that returns a hook function that emits the panic error. This is a preparation step for later commits where the crash occurs for certain entries only. 
--- .../transaction_manager_alternate_test.go | 8 ++--- ...transaction_manager_default_branch_test.go | 4 +-- .../transaction_manager_hook_test.go | 20 ++++--------- .../transaction_manager_housekeeping_test.go | 4 +-- .../transaction_manager_key_value_test.go | 8 ++--- .../transaction_manager_repo_test.go | 16 +++------- .../partition/transaction_manager_test.go | 29 ++++++++++--------- 7 files changed, 32 insertions(+), 57 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go index dbe1843e39..bbd632c432 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go @@ -1124,9 +1124,7 @@ func generateAlternateTests(t *testing.T, setup testTransactionSetup) []transact CloseManager{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -1203,9 +1201,7 @@ func generateAlternateTests(t *testing.T, setup testTransactionSetup) []transact CloseManager{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go index 5bfd1c5fc9..07ab97957c 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go @@ -464,9 +464,7 @@ func generateDefaultBranchTests(t *testing.T, setup testTransactionSetup) 
[]tran steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookCtx hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index c8db383796..218f09d48e 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -103,9 +103,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -153,9 +151,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -497,9 +493,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookCtx hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -591,9 +585,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -685,9 +677,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa 
steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookCtx hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go index f0e36bd571..5af8d077c4 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go @@ -1335,9 +1335,7 @@ func generateHousekeepingPackRefsTests(t *testing.T, ctx context.Context, testPa CloseManager{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go index b39cc1b2b5..25f2055cb6 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go @@ -1,11 +1,13 @@ package partition import ( + "testing" + "github.com/dgraph-io/badger/v4" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" ) -func generateKeyValueTests(setup testTransactionSetup) []transactionTestCase { +func generateKeyValueTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { return []transactionTestCase{ { desc: "set keys with values", @@ -615,9 +617,7 @@ func generateKeyValueTests(setup testTransactionSetup) []transactionTestCase { steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: 
simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go index 785204ee33..1f356bc4d6 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go @@ -364,9 +364,7 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t RemoveRepository{}, StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -408,9 +406,7 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t RemoveRepository{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -910,9 +906,7 @@ func generateDeleteRepositoryTests(t *testing.T, setup testTransactionSetup) []t steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -954,9 +948,7 @@ func generateDeleteRepositoryTests(t *testing.T, setup testTransactionSetup) []t steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index b3b1011185..3fadb58882 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go 
+++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -39,6 +39,13 @@ import ( // TransactionManager.Run execution. var errSimulatedCrash = errors.New("simulated crash") +// simulateCrashHook returns a hook function that panics with errSimulatedCrash. +var simulateCrashHook = func() func(hookContext) { + return func(hookContext) { + panic(errSimulatedCrash) + } +} + func manifestDirectoryEntry(expected *gitalypb.LogEntry) testhelper.DirectoryEntry { return testhelper.DirectoryEntry{ Mode: mode.File, @@ -350,7 +357,7 @@ func TestTransactionManager(t *testing.T) { "Housekeeping/RepackingConcurrent": generateHousekeepingRepackingConcurrentTests(t, ctx, setup), "Housekeeping/CommitGraphs": generateHousekeepingCommitGraphsTests(t, ctx, setup), "Consumer": generateConsumerTests(t, setup), - "KeyValue": generateKeyValueTests(setup), + "KeyValue": generateKeyValueTests(t, setup), } for desc, tests := range subTests { @@ -506,9 +513,7 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookCtx hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -865,9 +870,7 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio Prune{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -1438,13 +1441,11 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeReadAppliedLSN: func(hookContext) { - // Raise a panic when the manager is about to read the applied log - // index when initializing. 
In reality this would crash the server but - // in tests it serves as a way to abort the initialization in correct - // location. - panic(errSimulatedCrash) - }, + // Raise a panic when the manager is about to read the applied log + // index when initializing. In reality this would crash the server but + // in tests it serves as a way to abort the initialization in correct + // location. + BeforeReadAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, -- GitLab From 1f2a88d18c57ed9bf6374b64e6bf58725c385b3e Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Wed, 30 Oct 2024 10:23:17 +0700 Subject: [PATCH 4/7] storagemgr: Rename appendedLSN to committedLSN Currently, the TransactionManager uses two indices: appendedLSN and appliedLSN. It increases the value of appendedLSN after it accepts a transaction. When the log entry is applied, the transaction manager increases appliedLSN accordingly. Any log entries prior to appliedLSN could be removed if there are no pending transactions referring to them. We are working on adding Raft consensus to the TransactionManager. When a log entry is accepted by the current node, it also needs to be transferred to and accepted by the majority of cluster members. To make it easier for later steps, the TransactionManager needs to track two indices independently: * appendedLSN: this index tracks the LSN of the log entry accepted by the current node, but not yet accepted by others. The transaction manager is allowed to override entries from committedLSN + 1 to appendedLSN. * committedLSN: this index tracks the committed log entries. They are safe to be applied to the repositories. Eventually, other nodes apply those entries in the same order. As the current appendedLSN index is taking the role of committedLSN, this commit renames it to committedLSN. 
--- .../partition/transaction_manager.go | 74 ++++++++++--------- .../partition/transaction_manager_test.go | 12 +-- 2 files changed, 46 insertions(+), 40 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 7293d285f3..534586b807 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -350,7 +350,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti txn := &Transaction{ write: opts.Write, commit: mgr.commit, - snapshotLSN: mgr.appendedLSN, + snapshotLSN: mgr.committedLSN, finished: make(chan struct{}), relativePath: relativePath, metrics: mgr.metrics, @@ -925,7 +925,7 @@ func (p *consumerPosition) setPosition(pos storage.LSN) { // - The reference verification failures can be ignored instead of aborting the entire transaction. // If done, the references that failed verification are dropped from the transaction but the updates // that passed verification are still performed. -// 2. The transaction is appended to the write-ahead log. Once the write has been logged, it is effectively +// 2. The transaction is committed to the write-ahead log. Once the write has been logged, it is effectively // committed and will be applied to the repository even after restarting. // 3. The transaction is applied from the write-ahead log to the repository by actually performing the reference // changes. @@ -998,7 +998,7 @@ type TransactionManager struct { // initializationSuccessful is set if the TransactionManager initialized successfully. If it didn't, // transactions will fail to begin. initializationSuccessful bool - // mutex guards access to snapshotLocks and appendedLSN. These fields are accessed by both + // mutex guards access to snapshotLocks and committedLSN. 
These fields are accessed by both // Run and Begin which are ran in different goroutines. mutex sync.Mutex @@ -1008,8 +1008,14 @@ type TransactionManager struct { // snapshotManager is responsible for creation and management of file system snapshots. snapshotManager *snapshot.Manager - // appendedLSN holds the LSN of the last log entry appended to the partition's write-ahead log. - appendedLSN storage.LSN + // ┌─ oldestLSN ┌─ committedLSN + // ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ■ ■ ■ ■ ■ ■ ■ ■ □ □ □ □ □ □ □ + // └─ appliedLSN └─ appendedLSN + // + // committedLSN holds the LSN of the last log entry committed to the partition's write-ahead log. A log entry is + // considered to be committed if it's accepted by the majority of cluster members. Eventually, it will be + // applied by all cluster members. + committedLSN storage.LSN // appliedLSN holds the LSN of the last log entry applied to the partition. appliedLSN storage.LSN // oldestLSN holds the LSN of the head of log entries which is still kept in the database. The manager keeps @@ -1020,7 +1026,7 @@ type TransactionManager struct { // the partition. It's keyed by the LSN the transaction is waiting to be applied and the // value is the resultChannel that is waiting the result. awaitingTransactions map[storage.LSN]resultChannel - // committedEntries keeps some latest appended log entries around. Some types of transactions, such as + // committedEntries keeps some latest committed log entries around. Some types of transactions, such as // housekeeping, operate on snapshot repository. There is a gap between transaction doing its work and the time // when it is committed. They need to verify if concurrent operations can cause conflict. These log entries are // still kept around even after they are applied. 
They are removed when there are no active readers accessing @@ -1228,7 +1234,7 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact // The reference updates are staged into the transaction when they are verified, including // the packed-refs. While the reference files would generally be small, the packed-refs file // may be large. Sync the contents of the file before entering the critical section to ensure - // we don't end up syncing the potentially very large file to disk when we're appending the + // we don't end up syncing the potentially very large file to disk when we're committing the // log entry. preImagePackedRefsInode, err := getInode(transaction.originalPackedRefsFilePath()) if err != nil { @@ -1774,7 +1780,7 @@ func (mgr *TransactionManager) preparePackRefsReftable(ctx context.Context, tran Name: "pack-refs", // By using the '--auto' flag, we ensure that git uses the best heuristic // for compaction. For reftables, it currently uses a geometric progression. - // This ensures we don't keep compacting unecessarily to a single file. + // This ensures we don't keep compacting unnecessarily to a single file. Flags: []gitcmd.Option{gitcmd.Flag{Name: "--auto"}}, }, gitcmd.WithStderr(&stderr)); err != nil { return structerr.New("exec pack-refs: %w", err).WithMetadata("stderr", stderr.String()) @@ -2131,7 +2137,7 @@ func unwrapExpectedError(err error) error { return err } -// Run starts the transaction processing. On start up Run loads the indexes of the last appended and applied +// Run starts the transaction processing. On start up Run loads the indexes of the last committed and applied // log entries from the database. It will then apply any transactions that have been logged but not applied // to the repository. Once the recovery is completed, Run starts processing new transactions by verifying the // references, logging the transaction and finally applying it to the repository. 
The transactions are acknowledged @@ -2162,7 +2168,7 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { } for { - if mgr.appliedLSN < mgr.appendedLSN { + if mgr.appliedLSN < mgr.committedLSN { lsn := mgr.appliedLSN + 1 if err := mgr.applyLogEntry(ctx, lsn); err != nil { @@ -2307,13 +2313,13 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned logEntry.Operations = transaction.walEntry.Operations() - return mgr.appendLogEntry(ctx, transaction.objectDependencies, logEntry, transaction.walFilesPath()) + return mgr.commitLogEntry(ctx, transaction.objectDependencies, logEntry, transaction.walFilesPath()) }(); err != nil { transaction.result <- err return nil } - mgr.awaitingTransactions[mgr.appendedLSN] = transaction.result + mgr.awaitingTransactions[mgr.committedLSN] = transaction.result return nil } @@ -2429,7 +2435,7 @@ func (mgr *TransactionManager) snapshotsDir() string { return filepath.Join(mgr.stagingDirectory, "snapshots") } -// initialize initializes the TransactionManager's state from the database. It loads the appended and the applied +// initialize initializes the TransactionManager's state from the database. It loads the committed and the applied // LSNs and initializes the notification channels that synchronize transaction beginning with log entry applying. func (mgr *TransactionManager) initialize(ctx context.Context) error { defer trace.StartRegion(ctx, "initialize").End() @@ -2456,24 +2462,24 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { return fmt.Errorf("new snapshot manager: %w", err) } - // The LSN of the last appended log entry is determined from the LSN of the latest entry in the log and + // The LSN of the last committed log entry is determined from the LSN of the latest entry in the log and // the latest applied log entry. The manager also keeps track of committed entries and reserves them until there // is no transaction refers them. 
It's possible there are some left-over entries in the database because a // transaction can hold the entry stubbornly. So, the manager could not clean them up in the last session. // - // ┌─ oldestLSN ┌─ appendedLSN - // ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ■ ■ ■ ■ ■ ■ ■ ■ - // └─ appliedLSN + // ┌─ oldestLSN ┌─ committedLSN + // ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ■ ■ ■ ■ ■ ■ ■ ■ □ □ □ □ □ □ □ + // └─ appliedLSN └─ appendedLSN // // // oldestLSN is initialized to appliedLSN + 1. If there are no log entries in the log, then everything has been // pruned already or there has not been any log entries yet. Setting this +1 avoids trying to clean up log entries // that do not exist. If there are some, we'll set oldestLSN to the head of the log below. mgr.oldestLSN = mgr.appliedLSN + 1 - // appendedLSN is initialized to appliedLSN. If there are no log entries, then there has been no transaction yet, or + // committedLSN is initialized to appliedLSN. If there are no log entries, then there has been no transaction yet, or // all log entries have been applied and have been already pruned. If there are some in the log, we'll update this // below to match. 
- mgr.appendedLSN = mgr.appliedLSN + mgr.committedLSN = mgr.appliedLSN if logEntries, err := os.ReadDir(walFilesPath(mgr.stateDirectory)); err != nil { return fmt.Errorf("read wal directory: %w", err) @@ -2481,13 +2487,13 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { if mgr.oldestLSN, err = storage.ParseLSN(logEntries[0].Name()); err != nil { return fmt.Errorf("parse oldest LSN: %w", err) } - if mgr.appendedLSN, err = storage.ParseLSN(logEntries[len(logEntries)-1].Name()); err != nil { - return fmt.Errorf("parse appended LSN: %w", err) + if mgr.committedLSN, err = storage.ParseLSN(logEntries[len(logEntries)-1].Name()); err != nil { + return fmt.Errorf("parse committed LSN: %w", err) } } if mgr.consumer != nil { - mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.appendedLSN) + mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.committedLSN) } // Create a snapshot lock for the applied LSN as it is used for synchronizing @@ -2497,11 +2503,11 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { // Each unapplied log entry should have a snapshot lock as they are created in normal // operation when committing a log entry. Recover these entries. - for i := mgr.appliedLSN + 1; i <= mgr.appendedLSN; i++ { + for i := mgr.appliedLSN + 1; i <= mgr.committedLSN; i++ { mgr.snapshotLocks[i] = &snapshotLock{applied: make(chan struct{})} } - if err := mgr.removeStaleWALFiles(ctx, mgr.oldestLSN, mgr.appendedLSN); err != nil { + if err := mgr.removeStaleWALFiles(ctx, mgr.oldestLSN, mgr.committedLSN); err != nil { return fmt.Errorf("remove stale packs: %w", err) } @@ -2597,15 +2603,15 @@ func (mgr *TransactionManager) removePackedRefsLocks(ctx context.Context, reposi // but the manager was interrupted before successfully persisting the log entry itself. 
// If the manager deletes a log entry successfully from the database but is interrupted before it cleans // up the associated files, such a directory can also be left at the head of the log. -func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, oldestLSN, appendedLSN storage.LSN) error { +func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, oldestLSN, committedLSN storage.LSN) error { needsFsync := false for _, possibleStaleFilesPath := range []string{ // Log entries are pruned one by one. If a write is interrupted, the only possible stale files would be // for the log entry preceding the oldest log entry. walFilesPathForLSN(mgr.stateDirectory, oldestLSN-1), - // Log entries are appended one by one to the log. If a write is interrupted, the only possible stale + // Log entries are committed one by one to the log. If a write is interrupted, the only possible stale // files would be for the next LSN. Remove the files if they exist. - walFilesPathForLSN(mgr.stateDirectory, appendedLSN+1), + walFilesPathForLSN(mgr.stateDirectory, committedLSN+1), } { if _, err := os.Stat(possibleStaleFilesPath); err != nil { @@ -2857,7 +2863,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction // to transaction operations. // // To ensure that we don't modify existing tables and autocompact, we lock the existing tables -// before applying the updates. This way the reftable backend willl only create new tables +// before applying the updates. This way the reftable backend will only create new tables func (mgr *TransactionManager) verifyReferencesWithGitForReftables( ctx context.Context, referenceTransactions []*gitalypb.LogEntry_ReferenceTransaction, @@ -3139,7 +3145,7 @@ func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transacti mgr.mutex.Lock() defer mgr.mutex.Unlock() - // Check for any concurrent housekeeping between this transaction's snapshot LSN and the latest appended LSN. 
+ // Check for any concurrent housekeeping between this transaction's snapshot LSN and the latest committed LSN. if err := mgr.walkCommittedEntries(transaction, func(entry *gitalypb.LogEntry, objectDependencies map[git.ObjectID]struct{}) error { if entry.GetHousekeeping() != nil { return errHousekeepingConflictConcurrent @@ -3506,11 +3512,11 @@ func (mgr *TransactionManager) applyReferenceTransaction(ctx context.Context, ch return nil } -// appendLogEntry appends the transaction to the write-ahead log. It first writes the transaction's manifest file +// commitLogEntry commits the transaction to the write-ahead log. It first writes the transaction's manifest file // into the log entry's directory. Afterwards it moves the log entry's directory from the staging area to its final // place in the write-ahead log. -func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error { - defer trace.StartRegion(ctx, "appendLogEntry").End() +func (mgr *TransactionManager) commitLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error { + defer trace.StartRegion(ctx, "commitLogEntry").End() manifestBytes, err := proto.Marshal(logEntry) if err != nil { @@ -3538,7 +3544,7 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende return fmt.Errorf("synchronizing WAL directory: %w", err) } - nextLSN := mgr.appendedLSN + 1 + nextLSN := mgr.committedLSN + 1 mgr.testHooks.beforeAppendLogEntry(nextLSN) // Move the log entry from the staging directory into its place in the log. @@ -3566,7 +3572,7 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende // After this latch block, the transaction is committed and all subsequent transactions // are guaranteed to read it. 
mgr.mutex.Lock() - mgr.appendedLSN = nextLSN + mgr.committedLSN = nextLSN mgr.snapshotLocks[nextLSN] = &snapshotLock{applied: make(chan struct{})} mgr.committedEntries.PushBack(&committedEntry{ lsn: nextLSN, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index 3fadb58882..c9a1a3ad80 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -2067,7 +2067,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, }, { - desc: "transaction manager cleans up left-over committed entries when appliedLSN == appendedLSN", + desc: "transaction manager cleans up left-over committed entries when appliedLSN == committedLSN", steps: steps{ StartManager{}, Begin{ @@ -2141,7 +2141,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(3).ToProto(), }) require.Equal(t, tm.appliedLSN, storage.LSN(3)) - require.Equal(t, tm.appendedLSN, storage.LSN(3)) + require.Equal(t, tm.committedLSN, storage.LSN(3)) }), }, expectedState: StateAssertion{ @@ -2213,7 +2213,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, }, { - desc: "transaction manager cleans up left-over committed entries when appliedLSN < appendedLSN", + desc: "transaction manager cleans up left-over committed entries when appliedLSN < committedLSN", skip: func(t *testing.T) { testhelper.SkipWithReftable(t, "test requires manual log addition") }, @@ -2264,11 +2264,11 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { // Insert an out-of-band log-entry directly into the database for easier test // setup. 
It's a bit tricky to simulate committed log entries and un-processed - // appended log entries at the same time. + // committed log entries at the same time. logEntryPath := filepath.Join(t.TempDir(), "log_entry") require.NoError(t, os.Mkdir(logEntryPath, mode.Directory)) require.NoError(t, os.WriteFile(filepath.Join(logEntryPath, "1"), []byte(setup.Commits.First.OID+"\n"), mode.File)) - require.NoError(t, tm.appendLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID), logEntryPath)) + require.NoError(t, tm.commitLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID), logEntryPath)) RequireDatabase(t, ctx, tm.db, DatabaseState{ string(keyAppliedLSN): storage.LSN(3).ToProto(), @@ -2297,7 +2297,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(4).ToProto(), }) require.Equal(t, tm.appliedLSN, storage.LSN(4)) - require.Equal(t, tm.appendedLSN, storage.LSN(4)) + require.Equal(t, tm.committedLSN, storage.LSN(4)) }), }, expectedState: StateAssertion{ -- GitLab From 129bb49a3f2fc7ae2f6d8d7a0fb96c25c9df9f63 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Thu, 10 Oct 2024 14:34:57 +0700 Subject: [PATCH 5/7] storagemgr: Re-introduce intermediate appendedLSN index In the previous commit, the TransactionManager renames appendedLSN to committedLSN. This commit re-introduces appendedLSN but with a different meaning. It now tracks the intermediate state waiting to be committed by other nodes. The `commitLogEntry` is split into two smaller functions. The first one encapsulates and updates appendedLSN. The second one is supposed to engage Raft updates committedLSN. As now we haven't integrated the Raft library yet, the second function simply increases the committedLSN index. 
This commit also introduces a new non-subtle change: committedLSN is now persisted in the KV DB next to appliedLSN. Before, as soon as a log entry was flushed to disk, that log entry was guaranteed to be applied eventually. After splitting the append stage and commit stage, things might go wrong after a log entry is appended but is not committed. This is particularly true if Raft enters the picture and adds network latency to the log entry proposal. Thus, the transaction manager needs to persist the committedLSN to avoid applying uncommitted entries after a crash. After restarts, it resumes from the latest committedLSN, applies unapplied entries, if any, and removes obsolete appended entries, if any. --- .../storagemgr/partition/testhelper_test.go | 9 + .../partition/transaction_manager.go | 116 ++++++- .../transaction_manager_hook_test.go | 1 + .../partition/transaction_manager_test.go | 316 +++++++++++++++++- 4 files changed, 424 insertions(+), 18 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index c7f5b00464..fac426aa5a 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -627,6 +627,13 @@ func RequireDatabase(tb testing.TB, ctx context.Context, database keyvalue.Trans if expectedState == nil { expectedState = DatabaseState{} } + // Most of the time, persisted committedLSN is equal to appliedLSN except for some intentional tests. Thus, if + // appliedLSN is asserted, the test should backfill committedLSN if it's not there. 
+ if appliedLSN, appliedExist := expectedState[string(keyAppliedLSN)]; appliedExist { + if _, committedExist := expectedState[string(keyCommittedLSN)]; !committedExist { + expectedState[string(keyCommittedLSN)] = appliedLSN + } + } actualState := DatabaseState{} unexpectedKeys := []string{} @@ -705,6 +712,8 @@ type testTransactionHooks struct { BeforeApplyLogEntry hookFunc // BeforeAppendLogEntry is called before a log entry is appended to the log. BeforeAppendLogEntry hookFunc + // BeforeCommitLogEntry is called before a log entry is marked as committed. + BeforeCommitLogEntry hookFunc // AfterDeleteLogEntry is called after a log entry is deleted. AfterDeleteLogEntry hookFunc // BeforeReadAppliedLSN is invoked before the applied LSN is read. diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 534586b807..44ad0b088e 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -101,6 +101,8 @@ var ( // keyAppliedLSN is the database key storing a partition's last applied log entry's LSN. keyAppliedLSN = []byte("applied_lsn") + // keyCommittedLSN is the database key storing a partition's last committed log entry's LSN. + keyCommittedLSN = []byte("committed_lsn") ) const relativePathKeyPrefix = "r/" @@ -1012,6 +1014,10 @@ type TransactionManager struct { // ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ■ ■ ■ ■ ■ ■ ■ ■ □ □ □ □ □ □ □ // └─ appliedLSN └─ appendedLSN // + // appendedLSN holds the LSN of the last log entry emitted by the current node but not yet acknowledged by other + // nodes. Once they acknowledge it, committedLSN is incremented accordingly and catches up with appendedLSN. If Raft is not + // enabled or functions as a single-node cluster, committedLSN is incremented immediately. + appendedLSN storage.LSN // committedLSN holds the LSN of the last log entry committed to the partition's write-ahead log. 
A log entry is // considered to be committed if it's accepted by the majority of cluster members. Eventually, it will be // applied by all cluster members. @@ -1026,6 +1032,10 @@ type TransactionManager struct { // the partition. It's keyed by the LSN the transaction is waiting to be applied and the // value is the resultChannel that is waiting the result. awaitingTransactions map[storage.LSN]resultChannel + + // appendedEntries keeps track of appended but not-yet committed entries. After an entry is committed, it is + // removed from this map. This provides quick reference to those entries. + appendedEntries map[storage.LSN]*gitalypb.LogEntry // committedEntries keeps some latest committed log entries around. Some types of transactions, such as // housekeeping, operate on snapshot repository. There is a gap between transaction doing its work and the time // when it is committed. They need to verify if concurrent operations can cause conflict. These log entries are @@ -1052,6 +1062,7 @@ type TransactionManager struct { type testHooks struct { beforeInitialization func() beforeAppendLogEntry func(storage.LSN) + beforeCommitLogEntry func(storage.LSN) beforeApplyLogEntry func(storage.LSN) beforeStoreAppliedLSN func(storage.LSN) beforeDeleteLogEntryFiles func(storage.LSN) @@ -1095,6 +1106,7 @@ func NewTransactionManager( stateDirectory: stateDir, stagingDirectory: stagingDir, awaitingTransactions: make(map[storage.LSN]resultChannel), + appendedEntries: map[storage.LSN]*gitalypb.LogEntry{}, committedEntries: list.New(), metrics: metrics, consumer: consumer, @@ -1104,6 +1116,7 @@ func NewTransactionManager( testHooks: testHooks{ beforeInitialization: func() {}, beforeAppendLogEntry: func(storage.LSN) {}, + beforeCommitLogEntry: func(storage.LSN) {}, beforeApplyLogEntry: func(storage.LSN) {}, beforeStoreAppliedLSN: func(storage.LSN) {}, beforeDeleteLogEntryFiles: func(storage.LSN) {}, @@ -2168,6 +2181,7 @@ func (mgr *TransactionManager) run(ctx context.Context) 
(returnedErr error) { } for { + // We prioritize applying committed log entries to the partition first. if mgr.appliedLSN < mgr.committedLSN { lsn := mgr.appliedLSN + 1 @@ -2313,7 +2327,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned logEntry.Operations = transaction.walEntry.Operations() - return mgr.commitLogEntry(ctx, transaction.objectDependencies, logEntry, transaction.walFilesPath()) + return mgr.proposeLogEntry(ctx, transaction.objectDependencies, logEntry, transaction.walFilesPath()) }(); err != nil { transaction.result <- err return nil @@ -2476,19 +2490,49 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { // pruned already or there has not been any log entries yet. Setting this +1 avoids trying to clean up log entries // that do not exist. If there are some, we'll set oldestLSN to the head of the log below. mgr.oldestLSN = mgr.appliedLSN + 1 - // committedLSN is initialized to appliedLSN. If there are no log entries, then there has been no transaction yet, or - // all log entries have been applied and have been already pruned. If there are some in the log, we'll update this - // below to match. - mgr.committedLSN = mgr.appliedLSN + + // CommittedLSN is loaded from DB. A log entry is appended first and marked as committed later. There's a chance + // that log entry is never marked as committed. After a restart, especially after a crash, the manager won't be + // able to tell if it's committed or not. Thus, we need to persist this index. + // Because index persistence is introduced later, we need to fallback to appliedLSN if that key does not exist + // in the DB. 
+ var committedLSN gitalypb.LSN + if err := mgr.readKey(keyCommittedLSN, &committedLSN); err != nil { + if !errors.Is(err, badger.ErrKeyNotFound) { + return fmt.Errorf("read committed LSN: %w", err) + } + mgr.committedLSN = mgr.appliedLSN + } else { + mgr.committedLSN = storage.LSN(committedLSN.GetValue()) + } + + // appendedLSN is always set to committedLSN after starting. If a log entry hasn't been committed, it could be + // discarded. Its caller never received the acknowledgement. + mgr.appendedLSN = mgr.committedLSN if logEntries, err := os.ReadDir(walFilesPath(mgr.stateDirectory)); err != nil { return fmt.Errorf("read wal directory: %w", err) } else if len(logEntries) > 0 { - if mgr.oldestLSN, err = storage.ParseLSN(logEntries[0].Name()); err != nil { - return fmt.Errorf("parse oldest LSN: %w", err) - } - if mgr.committedLSN, err = storage.ParseLSN(logEntries[len(logEntries)-1].Name()); err != nil { - return fmt.Errorf("parse committed LSN: %w", err) + // All log entries starting from mgr.committedLSN + 1 are not committed. No reason to keep them around. + for i := len(logEntries) - 1; i >= 0; i-- { + logEntry := logEntries[i] + + lsn, err := storage.ParseLSN(logEntry.Name()) + if err != nil { + return fmt.Errorf("parse committed LSN: %w", err) + } + if lsn <= mgr.committedLSN { + // Found some on-disk log entries older than or equal to committedLSN. They might be + // referenced by other transactions before restart. Eventually, they'll be removed in + // the main loop. + if mgr.oldestLSN, err = storage.ParseLSN(logEntries[0].Name()); err != nil { + return fmt.Errorf("parse oldest LSN: %w", err) + } + break + } + if err := mgr.deleteLogEntry(mgr.ctx, lsn); err != nil { + return fmt.Errorf("cleaning uncommitted log entry: %w", err) + } } } @@ -3512,11 +3556,23 @@ func (mgr *TransactionManager) applyReferenceTransaction(ctx context.Context, ch return nil } -// commitLogEntry commits the transaction to the write-ahead log. 
It first writes the transaction's manifest file -// into the log entry's directory. Afterwards it moves the log entry's directory from the staging area to its final -// place in the write-ahead log. -func (mgr *TransactionManager) commitLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error { - defer trace.StartRegion(ctx, "commitLogEntry").End() +// proposeLogEntry proposes a log entry of a transaction to the write-ahead log. It first writes the transaction's +// manifest file into the log entry's directory. Second, it sends the log entry to other cluster members if needed. +// Afterwards it moves the log entry's directory from the staging area to its final place in the write-ahead log. +func (mgr *TransactionManager) proposeLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error { + nextLSN := mgr.appendedLSN + 1 + if err := mgr.appendLogEntry(ctx, nextLSN, logEntry, logEntryPath); err != nil { + return fmt.Errorf("append log entry: %w", err) + } + if err := mgr.commitLogEntry(ctx, nextLSN, objectDependencies); err != nil { + return fmt.Errorf("commit log entry: %w", err) + } + return nil +} + +// appendLogEntry appends the transaction to the write-ahead log. +func (mgr *TransactionManager) appendLogEntry(ctx context.Context, nextLSN storage.LSN, logEntry *gitalypb.LogEntry, logEntryPath string) error { + defer trace.StartRegion(ctx, "appendLogEntry").End() manifestBytes, err := proto.Marshal(logEntry) if err != nil { @@ -3544,7 +3600,6 @@ func (mgr *TransactionManager) commitLogEntry(ctx context.Context, objectDepende return fmt.Errorf("synchronizing WAL directory: %w", err) } - nextLSN := mgr.committedLSN + 1 mgr.testHooks.beforeAppendLogEntry(nextLSN) // Move the log entry from the staging directory into its place in the log. 
@@ -3571,9 +3626,32 @@ func (mgr *TransactionManager) commitLogEntry(ctx context.Context, objectDepende // After this latch block, the transaction is committed and all subsequent transactions // are guaranteed to read it. + mgr.mutex.Lock() + mgr.appendedLSN = nextLSN + mgr.appendedEntries[mgr.appendedLSN] = logEntry + mgr.mutex.Unlock() + + return nil +} + +// commitLogEntry commits the transaction to the write-ahead log. +func (mgr *TransactionManager) commitLogEntry(ctx context.Context, nextLSN storage.LSN, objectDependencies map[git.ObjectID]struct{}) error { + defer trace.StartRegion(ctx, "commitLogEntry").End() + mgr.testHooks.beforeCommitLogEntry(nextLSN) + + // Persist committed LSN before updating internal states. + if err := mgr.storeCommittedLSN(nextLSN); err != nil { + return fmt.Errorf("persisting committed entry: %w", err) + } + mgr.mutex.Lock() mgr.committedLSN = nextLSN mgr.snapshotLocks[nextLSN] = &snapshotLock{applied: make(chan struct{})} + if _, exist := mgr.appendedEntries[nextLSN]; !exist { + mgr.mutex.Unlock() + return fmt.Errorf("log entry %s not found in the appended list", nextLSN) + } + delete(mgr.appendedEntries, nextLSN) mgr.committedEntries.PushBack(&committedEntry{ lsn: nextLSN, objectDependencies: objectDependencies, @@ -3583,6 +3661,7 @@ func (mgr *TransactionManager) commitLogEntry(ctx context.Context, objectDepende if mgr.consumer != nil { mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) } + return nil } @@ -4011,6 +4090,11 @@ func (mgr *TransactionManager) storeAppliedLSN(lsn storage.LSN) error { return mgr.setKey(keyAppliedLSN, lsn.ToProto()) } +// storeCommittedLSN stores the partition's committed LSN in the database. +func (mgr *TransactionManager) storeCommittedLSN(lsn storage.LSN) error { + return mgr.setKey(keyCommittedLSN, lsn.ToProto()) +} + // setKey marshals and stores a given protocol buffer message into the database under the given key. 
func (mgr *TransactionManager) setKey(key []byte, value proto.Message) error { marshaledValue, err := proto.Marshal(value) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index 218f09d48e..d2467b3dab 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -47,6 +47,7 @@ func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, for destination, source := range map[*func(storage.LSN)]hookFunc{ &mgr.testHooks.beforeApplyLogEntry: hooks.BeforeApplyLogEntry, &mgr.testHooks.beforeAppendLogEntry: hooks.BeforeAppendLogEntry, + &mgr.testHooks.beforeCommitLogEntry: hooks.BeforeCommitLogEntry, &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, &mgr.testHooks.beforeDeleteLogEntryFiles: hooks.AfterDeleteLogEntry, } { diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index c9a1a3ad80..c42fa9aca7 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -346,6 +346,7 @@ func TestTransactionManager(t *testing.T) { subTests := map[string][]transactionTestCase{ "Common": generateCommonTests(t, ctx, setup), "CommittedEntries": generateCommittedEntriesTests(t, setup), + "AppendedEntries": generateAppendedEntriesTests(t, setup), "ModifyReferences": generateModifyReferencesTests(t, setup), "CreateRepository": generateCreateRepositoryTests(t, setup), "DeleteRepository": generateDeleteRepositoryTests(t, setup), @@ -531,6 +532,11 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio }, }, expectedState: StateAssertion{ + Database: DatabaseState{ + // The process 
crashes before apply but after it's committed. So, only + // committedLSN is persisted. + string(keyCommittedLSN): storage.LSN(1).ToProto(), + }, Directory: gittest.FilesOrReftables(testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -1751,6 +1757,8 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t } i++ } + + require.Empty(t, manager.appendedEntries) } return []transactionTestCase{ @@ -2142,6 +2150,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }) require.Equal(t, tm.appliedLSN, storage.LSN(3)) require.Equal(t, tm.committedLSN, storage.LSN(3)) + require.Equal(t, tm.appendedLSN, storage.LSN(3)) }), }, expectedState: StateAssertion{ @@ -2268,10 +2277,13 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t logEntryPath := filepath.Join(t.TempDir(), "log_entry") require.NoError(t, os.Mkdir(logEntryPath, mode.Directory)) require.NoError(t, os.WriteFile(filepath.Join(logEntryPath, "1"), []byte(setup.Commits.First.OID+"\n"), mode.File)) - require.NoError(t, tm.commitLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID), logEntryPath)) + require.NoError(t, tm.proposeLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID), logEntryPath)) RequireDatabase(t, ctx, tm.db, DatabaseState{ string(keyAppliedLSN): storage.LSN(3).ToProto(), + // Because this is an out-of-band insertion, the committedLSN is updated + // but new log entry is not applied until waken up. + string(keyCommittedLSN): storage.LSN(4).ToProto(), }) // Transaction 2 and 3 are left-over. 
testhelper.RequireDirectoryState(t, tm.stateDirectory, "", testhelper.DirectoryState{ @@ -2298,11 +2310,13 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }) require.Equal(t, tm.appliedLSN, storage.LSN(4)) require.Equal(t, tm.committedLSN, storage.LSN(4)) + require.Equal(t, tm.appendedLSN, storage.LSN(4)) }), }, expectedState: StateAssertion{ Database: DatabaseState{ - string(keyAppliedLSN): storage.LSN(4).ToProto(), + string(keyAppliedLSN): storage.LSN(4).ToProto(), + string(keyCommittedLSN): storage.LSN(4).ToProto(), }, Repositories: RepositoryStates{ setup.RelativePath: { @@ -2324,6 +2338,304 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t } } +func generateAppendedEntriesTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { + assertAppendedEntries := func(t *testing.T, manager *TransactionManager, expectedList []storage.LSN) { + actual := manager.appendedEntries + assert.Equalf(t, len(expectedList), len(actual), "appended entries not matched") + + for _, lsn := range expectedList { + expectedEntry, err := manager.readLogEntry(lsn) + assert.NoError(t, err) + testhelper.ProtoEqualAssert(t, expectedEntry, actual[lsn]) + } + } + + return []transactionTestCase{ + { + desc: "manager has just initialized", + steps: steps{ + StartManager{Hooks: testTransactionHooks{ + BeforeCommitLogEntry: func(c hookContext) { + assert.Fail(t, "there shouldn't be any committed entry") + }, + }}, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + require.Equal(t, storage.LSN(0), tm.appendedLSN) + assertAppendedEntries(t, tm, nil) + }), + CloseManager{}, + }, + }, + { + desc: "appended entries are removed after committed", + steps: steps{ + StartManager{Hooks: testTransactionHooks{ + BeforeCommitLogEntry: func(c hookContext) { + // It is not the best solution. But as we intercept to assert an + // intermediate state, there is no cleaner way than using hooks. 
+ lsn := c.lsn + manager := c.manager + switch lsn { + case storage.LSN(1): + assert.Equal(t, storage.LSN(1), manager.appendedLSN) + assert.Equal(t, storage.LSN(0), manager.committedLSN) + assertAppendedEntries(t, manager, []storage.LSN{1}) + case storage.LSN(2): + assert.Equal(t, storage.LSN(2), manager.appendedLSN) + assert.Equal(t, storage.LSN(1), manager.committedLSN) + assertAppendedEntries(t, manager, []storage.LSN{2}) + default: + assert.Fail(t, "there shouldn't be another committed entry") + } + }, + }}, + Begin{ + TransactionID: 1, + RelativePaths: []string{setup.RelativePath}, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: git.ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + require.Equal(t, storage.LSN(1), tm.appendedLSN) + require.Equal(t, storage.LSN(1), tm.committedLSN) + assertAppendedEntries(t, tm, nil) + }), + Begin{ + TransactionID: 2, + RelativePaths: []string{setup.RelativePath}, + ExpectedSnapshotLSN: 1, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + require.Equal(t, storage.LSN(1), tm.appendedLSN) + require.Equal(t, storage.LSN(1), tm.committedLSN) + assertAppendedEntries(t, tm, nil) + }), + Commit{ + TransactionID: 2, + ReferenceUpdates: git.ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + require.Equal(t, storage.LSN(2), tm.appendedLSN) + require.Equal(t, storage.LSN(2), tm.committedLSN) + assertAppendedEntries(t, tm, nil) + }), + CloseManager{}, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN): storage.LSN(2).ToProto(), + }, + // The appended-but-not-committed log entry from transaction 1 is also removed. 
+ Directory: testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }, + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: gittest.FilesOrReftables( + &ReferencesState{ + FilesBackend: &FilesBackendState{ + LooseReferences: map[git.ReferenceName]git.ObjectID{ + "refs/heads/main": setup.Commits.First.OID, + "refs/heads/branch-1": setup.Commits.First.OID, + }, + }, + }, &ReferencesState{ + ReftableBackend: &ReftableBackendState{ + Tables: []ReftableTable{ + { + MinIndex: 1, + MaxIndex: 1, + References: []git.Reference{ + { + Name: "HEAD", + Target: "refs/heads/main", + IsSymbolic: true, + }, + }, + }, + { + MinIndex: 2, + MaxIndex: 2, + References: []git.Reference{ + { + Name: "refs/heads/branch-1", + Target: setup.Commits.First.OID.String(), + }, + }, + }, + { + MinIndex: 3, + MaxIndex: 3, + References: []git.Reference{ + { + Name: "refs/heads/main", + Target: setup.Commits.First.OID.String(), + }, + }, + }, + }, + }, + }, + ), + }, + }, + }, + }, + { + desc: "transaction manager crashes after appending", + steps: steps{ + StartManager{ + Hooks: testTransactionHooks{ + BeforeCommitLogEntry: func(c hookContext) { + assert.Equal(t, storage.LSN(1), c.manager.appendedLSN) + assertAppendedEntries(t, c.manager, []storage.LSN{1}) + simulateCrashHook()(c) + }, + WaitForTransactionsWhenClosing: true, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 1, + RelativePaths: []string{setup.RelativePath}, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + require.Equal(t, storage.LSN(0), tm.appendedLSN) + require.Equal(t, storage.LSN(0), tm.committedLSN) + assertAppendedEntries(t, tm, nil) + }), + Commit{ + TransactionID: 1, + ReferenceUpdates: git.ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + ExpectedError: storage.ErrTransactionProcessingStopped, + }, + 
AssertManager{ + ExpectedError: errSimulatedCrash, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + // Appended entries are persisted. + testhelper.RequireDirectoryState(t, tm.stateDirectory, "", gittest.FilesOrReftables(testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/MANIFEST": manifestDirectoryEntry(&gitalypb.LogEntry{ + RelativePath: setup.RelativePath, + ReferenceTransactions: []*gitalypb.LogEntry_ReferenceTransaction{ + { + Changes: []*gitalypb.LogEntry_ReferenceTransaction_Change{ + { + ReferenceName: []byte("refs/heads/branch-1"), + NewOid: []byte(setup.Commits.First.OID), + }, + }, + }, + }, + Operations: []*gitalypb.LogEntry_Operation{ + { + Operation: &gitalypb.LogEntry_Operation_CreateHardLink_{ + CreateHardLink: &gitalypb.LogEntry_Operation_CreateHardLink{ + SourcePath: []byte("1"), + DestinationPath: []byte(filepath.Join(setup.RelativePath, "refs/heads/branch-1")), + }, + }, + }, + }, + }), + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, + }, buildReftableDirectory(map[int][]git.ReferenceUpdates{ + 1: {{"refs/heads/branch-1": git.ReferenceUpdate{NewOID: setup.Commits.First.OID}}}, + }))) + }), + StartManager{}, + AssertManager{}, + // No-op, just to ensure the manager is initialized. + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + // Both of them are set to 0 now. + require.Equal(t, storage.LSN(0), tm.appendedLSN) + require.Equal(t, storage.LSN(0), tm.committedLSN) + // Appended entries are removed now. 
+ testhelper.RequireDirectoryState(t, tm.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + assertAppendedEntries(t, tm, nil) + }), + Begin{ + TransactionID: 2, + RelativePaths: []string{setup.RelativePath}, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: git.ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID}, + }, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + // Apply new commit only. The prior commit was rejected. + require.Equal(t, storage.LSN(1), tm.appendedLSN) + require.Equal(t, storage.LSN(1), tm.committedLSN) + assertAppendedEntries(t, tm, nil) + }), + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN): storage.LSN(1).ToProto(), + }, + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: gittest.FilesOrReftables( + &ReferencesState{ + FilesBackend: &FilesBackendState{ + LooseReferences: map[git.ReferenceName]git.ObjectID{ + "refs/heads/branch-1": setup.Commits.Second.OID, + }, + }, + }, &ReferencesState{ + ReftableBackend: &ReftableBackendState{ + Tables: []ReftableTable{ + { + MinIndex: 1, + MaxIndex: 1, + References: []git.Reference{ + { + Name: "HEAD", + Target: "refs/heads/main", + IsSymbolic: true, + }, + }, + }, + { + MinIndex: 2, + MaxIndex: 2, + References: []git.Reference{ + { + Name: "refs/heads/branch-1", + Target: setup.Commits.Second.OID.String(), + }, + }, + }, + }, + }, + }, + ), + }, + }, + }, + }, + } +} + // BenchmarkTransactionManager benchmarks the transaction throughput of the TransactionManager at various levels // of concurrency and transaction sizes. 
func BenchmarkTransactionManager(b *testing.B) { -- GitLab From eb014ff44afa7047806b3d1ea0faf7c510ce27b4 Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Mon, 21 Oct 2024 14:00:29 +0700 Subject: [PATCH 6/7] storagemgr: Extract transaction preparation steps out This commit extracts portions of the transaction preparation into independent functions. There are essentially no changes in terms of functionality. Those independent functions will be used by Raft in later commits. When Raft is enabled, cluster-wide changes are recorded as internal log entries. Those entries use the same LSN sequence as normal log entries. We don't want to maintain two parallel index systems. Hence, the Raft manager must back-fill those entries into the WAL transaction manager. Internal entries already have an associated LSN that cannot be changed. As a result, the transaction manager needs a "fast-track" to insert certain log entries at the desired positions. --- .../partition/transaction_manager.go | 211 ++++++++++-------- 1 file changed, 114 insertions(+), 97 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index 44ad0b088e..e49dcbc64b 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -1134,8 +1134,6 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.Commit", nil) defer span.Finish() - transaction.result = make(resultChannel, 1) - if transaction.repositoryTarget() && !transaction.repositoryExists { // Determine if the repository was created in this transaction and stage its state // for committing if so. 
@@ -1149,6 +1147,43 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact } } + if err := mgr.prepareCommit(ctx, transaction); err != nil { + return err + } + + if err := func() error { + defer trace.StartRegion(ctx, "commit queue").End() + transaction.metrics.commitQueueDepth.Inc() + defer transaction.metrics.commitQueueDepth.Dec() + defer prometheus.NewTimer(mgr.metrics.commitQueueWaitSeconds).ObserveDuration() + + select { + case mgr.admissionQueue <- transaction: + transaction.admitted = true + return nil + case <-ctx.Done(): + return ctx.Err() + case <-mgr.closing: + return storage.ErrTransactionProcessingStopped + } + }(); err != nil { + return err + } + + defer trace.StartRegion(ctx, "result wait").End() + select { + case err := <-transaction.result: + return unwrapExpectedError(err) + case <-ctx.Done(): + return ctx.Err() + case <-mgr.closed: + return storage.ErrTransactionProcessingStopped + } +} + +func (mgr *TransactionManager) prepareCommit(ctx context.Context, transaction *Transaction) error { + transaction.result = make(resultChannel, 1) + // Create a directory to store all staging files. 
if err := os.Mkdir(transaction.walFilesPath(), mode.Directory); err != nil { return fmt.Errorf("create wal files directory: %w", err) @@ -1270,34 +1305,7 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact } } - if err := func() error { - defer trace.StartRegion(ctx, "commit queue").End() - transaction.metrics.commitQueueDepth.Inc() - defer transaction.metrics.commitQueueDepth.Dec() - defer prometheus.NewTimer(mgr.metrics.commitQueueWaitSeconds).ObserveDuration() - - select { - case mgr.admissionQueue <- transaction: - transaction.admitted = true - return nil - case <-ctx.Done(): - return ctx.Err() - case <-mgr.closing: - return storage.ErrTransactionProcessingStopped - } - }(); err != nil { - return err - } - - defer trace.StartRegion(ctx, "result wait").End() - select { - case err := <-transaction.result: - return unwrapExpectedError(err) - case <-ctx.Done(): - return ctx.Err() - case <-mgr.closed: - return storage.ErrTransactionProcessingStopped - } + return nil } // replaceObjectDirectory replaces the snapshot repository's object directory @@ -2247,95 +2255,104 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.processTransaction", nil) defer span.Finish() - if err := func() (commitErr error) { - repositoryExists, err := mgr.doesRepositoryExist(ctx, transaction.relativePath) + if err := func() error { + logEntry, logEntryPath, err := mgr.packageLogEntry(ctx, transaction) if err != nil { - return fmt.Errorf("does repository exist: %w", err) + return err } - logEntry := &gitalypb.LogEntry{ - RelativePath: transaction.relativePath, - } + return mgr.proposeLogEntry(ctx, transaction.objectDependencies, logEntry, logEntryPath) + }(); err != nil { + transaction.result <- err + return nil + } - if transaction.repositoryCreation != nil && repositoryExists { - return ErrRepositoryAlreadyExists - } else if transaction.repositoryCreation == nil && 
!repositoryExists { - return storage.ErrRepositoryNotFound - } + mgr.awaitingTransactions[mgr.committedLSN] = transaction.result - alternateRelativePath, err := mgr.verifyAlternateUpdate(ctx, transaction) - if err != nil { - return fmt.Errorf("verify alternate update: %w", err) - } + return nil +} - if err := mgr.setupStagingRepository(ctx, transaction, alternateRelativePath); err != nil { - return fmt.Errorf("setup staging snapshot: %w", err) - } +// packageLogEntry verifies the transaction and packages its content into the respective log entry. +func (mgr *TransactionManager) packageLogEntry(ctx context.Context, transaction *Transaction) (*gitalypb.LogEntry, string, error) { + repositoryExists, err := mgr.doesRepositoryExist(ctx, transaction.relativePath) + if err != nil { + return nil, "", fmt.Errorf("does repository exist: %w", err) + } - // Verify that all objects this transaction depends on are present in the repository. The dependency - // objects are the reference tips set in the transaction and the objects the transaction's packfile - // is based on. If an object dependency is missing, the transaction is aborted as applying it would - // result in repository corruption. 
- if err := mgr.verifyObjectsExist(ctx, transaction.stagingRepository, transaction.objectDependencies); err != nil { - return fmt.Errorf("verify object dependencies: %w", err) - } + logEntry := &gitalypb.LogEntry{ + RelativePath: transaction.relativePath, + } - if transaction.repositoryCreation == nil && transaction.runHousekeeping == nil && !transaction.deleteRepository { - logEntry.ReferenceTransactions, err = mgr.verifyReferences(ctx, transaction) - if err != nil { - return fmt.Errorf("verify references: %w", err) - } - } + if transaction.repositoryCreation != nil && repositoryExists { + return nil, "", ErrRepositoryAlreadyExists + } else if transaction.repositoryCreation == nil && !repositoryExists { + return nil, "", storage.ErrRepositoryNotFound + } - if transaction.customHooksUpdated { - // Log a deletion of the existing custom hooks so they are removed before the - // new ones are put in place. - if err := transaction.walEntry.RecordDirectoryRemoval( - mgr.storagePath, - filepath.Join(transaction.relativePath, repoutil.CustomHooksDir), - ); err != nil && !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("record custom hook removal: %w", err) - } - } + alternateRelativePath, err := mgr.verifyAlternateUpdate(ctx, transaction) + if err != nil { + return nil, "", fmt.Errorf("verify alternate update: %w", err) + } - if transaction.deleteRepository { - logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} + if err := mgr.setupStagingRepository(ctx, transaction, alternateRelativePath); err != nil { + return nil, "", fmt.Errorf("setup staging snapshot: %w", err) + } - if err := transaction.walEntry.RecordDirectoryRemoval( - mgr.storagePath, - transaction.relativePath, - ); err != nil && !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("record repository removal: %w", err) - } + // Verify that all objects this transaction depends on are present in the repository. 
The dependency + // objects are the reference tips set in the transaction and the objects the transaction's packfile + // is based on. If an object dependency is missing, the transaction is aborted as applying it would + // result in repository corruption. + if err := mgr.verifyObjectsExist(ctx, transaction.stagingRepository, transaction.objectDependencies); err != nil { + return nil, "", fmt.Errorf("verify object dependencies: %w", err) + } - if err := transaction.KV().Delete(relativePathKey(transaction.relativePath)); err != nil { - return fmt.Errorf("delete relative path: %w", err) - } + if transaction.repositoryCreation == nil && transaction.runHousekeeping == nil && !transaction.deleteRepository { + logEntry.ReferenceTransactions, err = mgr.verifyReferences(ctx, transaction) + if err != nil { + return nil, "", fmt.Errorf("verify references: %w", err) } + } - if transaction.runHousekeeping != nil { - housekeepingEntry, err := mgr.verifyHousekeeping(ctx, transaction) - if err != nil { - return fmt.Errorf("verifying pack refs: %w", err) - } - logEntry.Housekeeping = housekeepingEntry + if transaction.customHooksUpdated { + // Log a deletion of the existing custom hooks so they are removed before the + // new ones are put in place. 
+ if err := transaction.walEntry.RecordDirectoryRemoval( + mgr.storagePath, + filepath.Join(transaction.relativePath, repoutil.CustomHooksDir), + ); err != nil && !errors.Is(err, fs.ErrNotExist) { + return nil, "", fmt.Errorf("record custom hook removal: %w", err) } + } + + if transaction.deleteRepository { + logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} - if err := mgr.verifyKeyValueOperations(ctx, transaction); err != nil { - return fmt.Errorf("verify key-value operations: %w", err) + if err := transaction.walEntry.RecordDirectoryRemoval( + mgr.storagePath, + transaction.relativePath, + ); err != nil && !errors.Is(err, fs.ErrNotExist) { + return nil, "", fmt.Errorf("record repository removal: %w", err) } - logEntry.Operations = transaction.walEntry.Operations() + if err := transaction.KV().Delete(relativePathKey(transaction.relativePath)); err != nil { + return nil, "", fmt.Errorf("delete relative path: %w", err) + } + } - return mgr.proposeLogEntry(ctx, transaction.objectDependencies, logEntry, transaction.walFilesPath()) - }(); err != nil { - transaction.result <- err - return nil + if transaction.runHousekeeping != nil { + housekeepingEntry, err := mgr.verifyHousekeeping(ctx, transaction) + if err != nil { + return nil, "", fmt.Errorf("verifying pack refs: %w", err) + } + logEntry.Housekeeping = housekeepingEntry } - mgr.awaitingTransactions[mgr.committedLSN] = transaction.result + if err := mgr.verifyKeyValueOperations(ctx, transaction); err != nil { + return nil, "", fmt.Errorf("verify key-value operations: %w", err) + } - return nil + logEntry.Operations = transaction.walEntry.Operations() + return logEntry, transaction.walFilesPath(), nil } // verifyKeyValueOperations checks the key-value operations of the transaction for conflicts and includes -- GitLab From 538f17a4d2ac729cc70f15bad55f0cf3e73c33bf Mon Sep 17 00:00:00 2001 From: Quang-Minh Nguyen Date: Fri, 1 Nov 2024 17:19:18 +0700 Subject: [PATCH 7/7] WIP: remove to debug race 
test --- .../storage/storagemgr/partition/testhelper_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index fac426aa5a..b5411472a9 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -602,12 +602,12 @@ func RequireRepositories(tb testing.TB, ctx context.Context, cfg config.Cfg, sto for _, relativePath := range expectedRelativePaths { func() { defer func() { - // RequireRepositoryState works within a repository and doesn't thus print out the - // relative path of the repository that failed. If the call failed the test, - // print out the relative path to ease troubleshooting. - if tb.Failed() { - require.Failf(tb, "unexpected repository state", "relative path: %q", relativePath) - } + // // RequireRepositoryState works within a repository and doesn't thus print out the + // // relative path of the repository that failed. If the call failed the test, + // // print out the relative path to ease troubleshooting. + // if tb.Failed() { + // require.Failf(tb, "unexpected repository state", "relative path: %q", relativePath) + // } }() RequireRepositoryState(tb, ctx, cfg, buildRepository(relativePath), expected[relativePath]) -- GitLab