diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go
index 794a6bc2aafca8e685e5c1059c60b440ee78d1f5..8095d363d73f958e22f8ca32910442a66d4764d1 100644
--- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go
+++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go
@@ -311,6 +311,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti
 		metrics: mgr.metrics,
 	}
 
+	mgr.createSnapshotLockIfNeeded(txn.snapshotLSN)
 	mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Add(1)
 	defer mgr.snapshotLocks[txn.snapshotLSN].activeSnapshotters.Done()
 	readReady := mgr.snapshotLocks[txn.snapshotLSN].applied
@@ -1004,12 +1005,18 @@ type TransactionManager struct {
 	metrics ManagerMetrics
 }
 
+// testHooks defines hooks for testing various stages of write-ahead log operations.
 type testHooks struct {
-	beforeInitialization  func()
-	beforeAppendLogEntry  func()
-	beforeApplyLogEntry   func()
-	beforeStoreAppliedLSN func()
-	beforeRunExiting      func()
+	// beforeInitialization is triggered during initialization, after the snapshot locks are set up.
+	beforeInitialization func()
+	// beforeAppendLogEntry is triggered before appending a log entry at the target LSN.
+	beforeAppendLogEntry func(targetLSN storage.LSN)
+	// beforeApplyLogEntry is triggered before applying a log entry at the target LSN.
+	beforeApplyLogEntry func(targetLSN storage.LSN)
+	// beforeStoreAppliedLSN is triggered before storing the target applied LSN.
+	beforeStoreAppliedLSN func(targetLSN storage.LSN)
+	// beforeRunExiting is triggered before the run loop exits.
+	beforeRunExiting func()
 }
 
 // NewTransactionManager returns a new TransactionManager for the given repository.
@@ -1058,9 +1065,9 @@ func NewTransactionManager(
 
 		testHooks: testHooks{
 			beforeInitialization:  func() {},
-			beforeAppendLogEntry:  func() {},
-			beforeApplyLogEntry:   func() {},
-			beforeStoreAppliedLSN: func() {},
+			beforeAppendLogEntry:  func(storage.LSN) {},
+			beforeApplyLogEntry:   func(storage.LSN) {},
+			beforeStoreAppliedLSN: func(storage.LSN) {},
 			beforeRunExiting:      func() {},
 		},
 	}
@@ -1141,8 +1148,9 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact
 	if err := transaction.stageKeyValueOperations(); err != nil {
 		return fmt.Errorf("stage key-value operations: %w", err)
 	}
+	transaction.manifest.Operations = transaction.walEntry.Operations()
 
-	if err := transaction.flushLogEntry(ctx); err != nil {
+	if err := wal.WriteManifest(ctx, transaction.walEntry.Directory(), transaction.manifest); err != nil {
 		return fmt.Errorf("flush log entry: %w", err)
 	}
 
@@ -2114,16 +2122,17 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned
 				}
 				transaction.manifest.Housekeeping = housekeepingEntry
 			}
+			transaction.manifest.Operations = transaction.walEntry.Operations()
 
 			// The transaction has already written the manifest to the disk as a read-only file
 			// before queuing for commit. Remove the old file so we can replace it below.
-			if err := os.Remove(manifestPath(transaction.walEntry.Directory())); err != nil {
-				return fmt.Errorf("remove outdated manifest")
+			if err := wal.RemoveManifest(ctx, transaction.walEntry.Directory()); err != nil {
+				return fmt.Errorf("remove outdated manifest: %w", err)
 			}
 
 			// Operations working on the staging snapshot add more files into the log entry,
-			// and modify the manifest. Flush it to the disk to persist the new changes.
-			if err := transaction.flushLogEntry(ctx); err != nil {
+			// and modify the manifest. Write the updated manifest back into the log entry.
+			if err := wal.WriteManifest(ctx, transaction.walEntry.Directory(), transaction.manifest); err != nil {
 				return fmt.Errorf("flush log entry: %w", err)
 			}
 		}
@@ -2154,7 +2163,7 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned
 			return fmt.Errorf("verify file system operations: %w", err)
 		}
 
-		mgr.testHooks.beforeAppendLogEntry()
+		mgr.testHooks.beforeAppendLogEntry(mgr.logManager.AppendedLSN() + 1)
 		if err := mgr.appendLogEntry(ctx, transaction.objectDependencies, transaction.manifest, transaction.walFilesPath()); err != nil {
 			return fmt.Errorf("append log entry: %w", err)
 		}
@@ -2376,13 +2385,13 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error {
 
 	// Create a snapshot lock for the applied LSN as it is used for synchronizing
 	// the snapshotters with the log application.
-	mgr.snapshotLocks[mgr.appliedLSN] = &snapshotLock{applied: make(chan struct{})}
+	mgr.createSnapshotLockIfNeeded(mgr.appliedLSN)
 	close(mgr.snapshotLocks[mgr.appliedLSN].applied)
 
-	// Each unapplied log entry should have a snapshot lock as they are created in normal
-	// operation when committing a log entry. Recover these entries.
+	// Each unapplied log entry should have a snapshot lock. In normal operation the locks are
+	// created lazily through createSnapshotLockIfNeeded. Recover these entries.
 	for i := mgr.appliedLSN + 1; i <= mgr.logManager.AppendedLSN(); i++ {
-		mgr.snapshotLocks[i] = &snapshotLock{applied: make(chan struct{})}
+		mgr.createSnapshotLockIfNeeded(i)
 	}
 
 	mgr.testHooks.beforeInitialization()
@@ -2416,11 +2425,6 @@ func (mgr *TransactionManager) getAbsolutePath(relativePath ...string) string {
 	return filepath.Join(append([]string{mgr.storagePath}, relativePath...)...)
 }
 
-// manifestPath returns the manifest file's path in the log entry.
-func manifestPath(logEntryPath string) string {
-	return filepath.Join(logEntryPath, "MANIFEST")
-}
-
 // packFilePath returns a log entry's pack file's absolute path in the wal files directory.
func packFilePath(walFiles string) string { return filepath.Join(walFiles, "transaction.pack") @@ -2952,57 +2956,15 @@ func (mgr *TransactionManager) applyReferenceTransaction(ctx context.Context, ch return nil } -// flushLogEntry writes the log entry's manifest to the disk, and fsyncs the entire -// log entry. -func (txn *Transaction) flushLogEntry(ctx context.Context) error { - txn.manifest.Operations = txn.walEntry.Operations() - - manifestBytes, err := proto.Marshal(txn.manifest) - if err != nil { - return fmt.Errorf("marshal manifest: %w", err) - } - - // Finalize the log entry by writing the MANIFEST file into the log entry's directory. - manifestPath := manifestPath(txn.walEntry.Directory()) - if err := os.WriteFile(manifestPath, manifestBytes, mode.File); err != nil { - return fmt.Errorf("write manifest: %w", err) - } - - // Sync the log entry completely before committing it. - // - // Ideally the log entry would be completely flushed to the disk before queuing the - // transaction for commit to ensure we don't write a lot to the disk while in the critical - // section. We currently stage some of the files only in the critical section though. This - // is due to currently lacking conflict checks which prevents staging the log entry completely - // before queuing it for commit. - // - // See https://gitlab.com/gitlab-org/gitaly/-/issues/5892 for more details. Once the issue is - // addressed, we could stage the transaction entirely before queuing it for commit, and thus not - // need to sync here. - if err := safe.NewSyncer().SyncRecursive(ctx, txn.walEntry.Directory()); err != nil { - return fmt.Errorf("synchronizing WAL directory: %w", err) - } - - return nil -} - // appendLogEntry appends a log entry of a transaction to the write-ahead log. After the log entry is appended to WAL, // the corresponding snapshot lock and in-memory reference for the latest appended LSN is created. 
 func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error {
 	defer trace.StartRegion(ctx, "appendLogEntry").End()
 
-	// Pre-setup an snapshot lock entry for the assumed appended LSN location.
-	mgr.mutex.Lock()
-	mgr.snapshotLocks[mgr.logManager.AppendedLSN()+1] = &snapshotLock{applied: make(chan struct{})}
-	mgr.mutex.Unlock()
-
 	// After this latch block, the transaction is committed and all subsequent transactions
 	// are guaranteed to read it.
 	appendedLSN, err := mgr.logManager.AppendLogEntry(logEntryPath)
 	if err != nil {
-		mgr.mutex.Lock()
-		delete(mgr.snapshotLocks, mgr.logManager.AppendedLSN()+1)
-		mgr.mutex.Unlock()
 		return fmt.Errorf("append log entry: %w", err)
 	}
 
@@ -3023,7 +2985,7 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LS
 
 	defer prometheus.NewTimer(mgr.metrics.transactionApplicationDurationSeconds).ObserveDuration()
 
-	logEntry, err := mgr.readLogEntry(lsn)
+	manifest, err := wal.ReadManifest(mgr.logManager.GetEntryPath(lsn))
 	if err != nil {
 		return fmt.Errorf("read log entry: %w", err)
 	}
@@ -3032,15 +2994,22 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LS
 	// the new state to the repository. No new snapshotters can arrive at this point. All
 	// new transactions would be waiting for the committed log entry we are about to apply.
 	previousLSN := lsn - 1
-	mgr.snapshotLocks[previousLSN].activeSnapshotters.Wait()
+
+	mgr.mutex.Lock()
+	previousLock := mgr.snapshotLocks[previousLSN]
+	mgr.mutex.Unlock()
+
+	// Waiting for the active snapshotters may take a while, so do it without holding the mutex.
+	previousLock.activeSnapshotters.Wait()
+
 	mgr.mutex.Lock()
 	delete(mgr.snapshotLocks, previousLSN)
 	mgr.mutex.Unlock()
 
-	mgr.testHooks.beforeApplyLogEntry()
+	mgr.testHooks.beforeApplyLogEntry(lsn)
 
 	if err := mgr.db.Update(func(tx keyvalue.ReadWriter) error {
-		if err := applyOperations(ctx, safe.NewSyncer().Sync, mgr.storagePath, mgr.logManager.GetEntryPath(lsn), logEntry.GetOperations(), tx); err != nil {
+		if err := applyOperations(ctx, safe.NewSyncer().Sync, mgr.storagePath, mgr.logManager.GetEntryPath(lsn), manifest.GetOperations(), tx); err != nil {
 			return fmt.Errorf("apply operations: %w", err)
 		}
 
@@ -3056,29 +3025,17 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LS
 
 	// Notify the transactions waiting for this log entry to be applied prior to take their
 	// snapshot.
+	mgr.mutex.Lock()
+	mgr.createSnapshotLockIfNeeded(lsn)
 	close(mgr.snapshotLocks[lsn].applied)
+	mgr.mutex.Unlock()
 
 	return nil
 }
 
-// readLogEntry returns the log entry from the given position in the log.
-func (mgr *TransactionManager) readLogEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) {
-	manifestBytes, err := os.ReadFile(manifestPath(mgr.logManager.GetEntryPath(lsn)))
-	if err != nil {
-		return nil, fmt.Errorf("read manifest: %w", err)
-	}
-
-	var logEntry gitalypb.LogEntry
-	if err := proto.Unmarshal(manifestBytes, &logEntry); err != nil {
-		return nil, fmt.Errorf("unmarshal manifest: %w", err)
-	}
-
-	return &logEntry, nil
-}
-
 // storeAppliedLSN stores the partition's applied LSN in the database.
func (mgr *TransactionManager) storeAppliedLSN(lsn storage.LSN) error { - mgr.testHooks.beforeStoreAppliedLSN() + mgr.testHooks.beforeStoreAppliedLSN(lsn) if err := mgr.setKey(keyAppliedLSN, lsn.ToProto()); err != nil { return err @@ -3198,3 +3155,9 @@ func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) bool { } return removedAnyEntry } + +func (mgr *TransactionManager) createSnapshotLockIfNeeded(lsn storage.LSN) { + if _, exist := mgr.snapshotLocks[lsn]; !exist { + mgr.snapshotLocks[lsn] = &snapshotLock{applied: make(chan struct{})} + } +} diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go index a16f5d9b21e82c4be0eec9348dcc08ba2301c9ea..26d8c1d3339e6dfee2d546d07e000af354012a28 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go @@ -836,9 +836,7 @@ func generateAlternateTests(t *testing.T, setup testTransactionSetup) []transact CloseManager{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -914,9 +912,7 @@ func generateAlternateTests(t *testing.T, setup testTransactionSetup) []transact CloseManager{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go index 9bfe4d2906965a4aa323976a516492bb46a42ad0..9b1ac217cacf770014f834487a41b578ddab960d 100644 --- 
a/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go @@ -464,9 +464,7 @@ func generateDefaultBranchTests(t *testing.T, setup testTransactionSetup) []tran steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookCtx hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index 072970aafb70ebe980a75cae084331bb86a93819..529afda61dc1cb4d430bf70d8e8ffb390333a7ac 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -20,15 +20,14 @@ type hookFunc func(hookContext) type hookContext struct { // closeManager calls the calls Close on the TransactionManager. closeManager func() + // lsn stores the LSN context when the hook is triggered. + lsn storage.LSN } // installHooks takes the hooks in the test setup and configures them in the TransactionManager. 
 func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, hooks testTransactionHooks) {
 	for destination, source := range map[*func()]hookFunc{
-		&mgr.testHooks.beforeInitialization:  hooks.BeforeReadAppliedLSN,
-		&mgr.testHooks.beforeApplyLogEntry:   hooks.BeforeApplyLogEntry,
-		&mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN,
-		&mgr.testHooks.beforeAppendLogEntry:  hooks.BeforeAppendLogEntry,
+		&mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN,
 		&mgr.testHooks.beforeRunExiting: func(hookContext) {
 			if hooks.WaitForTransactionsWhenClosing {
 				inflightTransactions.Wait()
@@ -46,6 +45,23 @@ func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup,
 			}
 		}
 	}
+	for destination, source := range map[*func(storage.LSN)]hookFunc{
+		&mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN,
+		&mgr.testHooks.beforeApplyLogEntry:   hooks.BeforeApplyLogEntry,
+		&mgr.testHooks.beforeAppendLogEntry:  hooks.BeforeAppendLogEntry,
+	} {
+		if source != nil {
+			// Capture the hook function, we shouldn't store the loop variable as a test hook since it will be
+			// overridden on later iterations.
+			runHook := source
+			*destination = func(lsn storage.LSN) {
+				runHook(hookContext{
+					closeManager: mgr.Close,
+					lsn:          lsn,
+				})
+			}
+		}
+	}
 }
 
 func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transactionTestCase {
@@ -96,9 +112,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa
 			steps: steps{
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeStoreAppliedLSN: func(hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeStoreAppliedLSN: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
@@ -145,9 +159,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa
 			steps: steps{
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeApplyLogEntry: func(hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeApplyLogEntry: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
@@ -493,9 +505,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa
 			steps: steps{
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeStoreAppliedLSN: func(hookCtx hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeStoreAppliedLSN: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
@@ -586,9 +596,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa
 			steps: steps{
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeApplyLogEntry: func(hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeApplyLogEntry: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
@@ -679,9 +687,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa
 			steps: steps{
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeApplyLogEntry: func(hookCtx hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeApplyLogEntry: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go
index c20c9a496681c90db096c14a0b28babbc2dfcf6b..42f413f69cdccebc346edbc78130af5d229dee5b 100644
--- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go
+++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go
@@ -1334,9 +1334,7 @@ func generateHousekeepingPackRefsTests(t *testing.T, ctx context.Context, testPa
 				CloseManager{},
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeStoreAppliedLSN: func(hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeStoreAppliedLSN: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go
index 589bada517520f2c35ffef95057e681b42bbd68f..d798d5ef109e69f525cc26810a535e52b256de7c 100644
--- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go
+++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go
@@ -1,11 +1,13 @@
 package partition
 
 import (
+	"testing"
+
 	"github.com/dgraph-io/badger/v4"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
 )
 
-func generateKeyValueTests(setup testTransactionSetup) []transactionTestCase {
+func generateKeyValueTests(t *testing.T, setup testTransactionSetup) []transactionTestCase {
 	return []transactionTestCase{
 		{
 			desc: "set keys with values",
@@ -615,9 +617,7 @@ func generateKeyValueTests(setup testTransactionSetup) []transactionTestCase {
 			steps: steps{
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeApplyLogEntry: func(hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeApplyLogEntry: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go
index 9fd8d1a548c885599252def12934bafbed2339d6..ee1c32cd974ae450b896511a5e0cc30154196722 100644
--- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go
+++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go
@@ -367,9 +367,7 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t
 				RemoveRepository{},
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeApplyLogEntry: func(hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeApplyLogEntry: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
@@ -410,9 +408,7 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t
 				RemoveRepository{},
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeStoreAppliedLSN: func(hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeStoreAppliedLSN: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
@@ -887,9 +883,7 @@ func generateDeleteRepositoryTests(t *testing.T, setup testTransactionSetup) []t
 			steps: steps{
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeApplyLogEntry: func(hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeApplyLogEntry: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
@@ -930,9 +924,7 @@ func generateDeleteRepositoryTests(t *testing.T, setup testTransactionSetup) []t
 			steps: steps{
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeStoreAppliedLSN: func(hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeStoreAppliedLSN: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go
index ccf028305a91171ed7d5644fba582c9bb19fb17a..7f2c97bb10e80eb2df6a2d209cd9287bd90643b4 100644
--- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go
+++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go
@@ -30,6 +30,7 @@ import (
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/conflict/refdb"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
 	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg"
 	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
@@ -40,6 +41,13 @@ import (
 // TransactionManager.Run execution.
 var errSimulatedCrash = errors.New("simulated crash")
 
+// simulateCrashHook returns a hook function that panics with errSimulatedCrash.
+func simulateCrashHook() func(hookContext) {
+	return func(hookContext) {
+		panic(errSimulatedCrash)
+	}
+}
+
 func manifestDirectoryEntry(expected *gitalypb.LogEntry) testhelper.DirectoryEntry {
 	return testhelper.DirectoryEntry{
 		Mode: mode.File,
@@ -351,7 +359,7 @@ func TestTransactionManager(t *testing.T) {
 		"Housekeeping/RepackingConcurrent": generateHousekeepingRepackingConcurrentTests(t, ctx, setup),
 		"Housekeeping/CommitGraphs":        generateHousekeepingCommitGraphsTests(t, ctx, setup),
 		"Consumer":                         generateConsumerTests(t, setup),
-		"KeyValue":                         generateKeyValueTests(setup),
+		"KeyValue":                         generateKeyValueTests(t, setup),
 	}
 
 	for desc, tests := range subTests {
@@ -507,9 +515,7 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio
 			steps: steps{
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeApplyLogEntry: func(hookCtx hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeApplyLogEntry: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
@@ -867,9 +873,7 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio
 				Prune{},
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeStoreAppliedLSN: func(hookContext) {
-							panic(errSimulatedCrash)
-						},
+						BeforeStoreAppliedLSN: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
@@ -1439,13 +1443,11 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio
 			steps: steps{
 				StartManager{
 					Hooks: testTransactionHooks{
-						BeforeReadAppliedLSN: func(hookContext) {
-							// Raise a panic when the manager is about to read the applied log
-							// index when initializing. In reality this would crash the server but
-							// in tests it serves as a way to abort the initialization in correct
-							// location.
-							panic(errSimulatedCrash)
-						},
+						// Raise a panic when the manager is about to read the applied log
+						// index when initializing. In reality this would crash the server but
+						// in tests it serves as a way to abort the initialization in correct
+						// location.
+						BeforeReadAppliedLSN: simulateCrashHook(),
 					},
 					ExpectedError: errSimulatedCrash,
 				},
@@ -2255,7 +2257,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t
 				expectedManifest := manifestDirectoryEntry(refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID))
 				manifestBytes, err := proto.Marshal(expectedManifest.Content.(proto.Message))
 				require.NoError(t, err)
-				require.NoError(t, os.WriteFile(manifestPath(logEntryPath), manifestBytes, mode.File))
+				require.NoError(t, os.WriteFile(wal.ManifestPath(logEntryPath), manifestBytes, mode.File))
 
 				tracker := log.NewPositionTracker()
 				if setup.Consumer != nil {
diff --git a/internal/gitaly/storage/wal/manifest.go b/internal/gitaly/storage/wal/manifest.go
new file mode 100644
index 0000000000000000000000000000000000000000..4a5592d0c737fe7cde9c0c5106f554f13bbd8c19
--- /dev/null
+++ b/internal/gitaly/storage/wal/manifest.go
@@ -0,0 +1,70 @@
+package wal
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/safe"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+	"google.golang.org/protobuf/proto"
+)
+
+// ManifestPath returns the manifest file's path in the log entry.
+func ManifestPath(logEntryPath string) string {
+	return filepath.Join(logEntryPath, "MANIFEST")
+}
+
+// ReadManifest reads and unmarshals the MANIFEST file of the log entry stored in stateDir.
+func ReadManifest(stateDir string) (*gitalypb.LogEntry, error) {
+	manifestBytes, err := os.ReadFile(ManifestPath(stateDir))
+	if err != nil {
+		return nil, fmt.Errorf("read manifest: %w", err)
+	}
+
+	var logEntry gitalypb.LogEntry
+	if err := proto.Unmarshal(manifestBytes, &logEntry); err != nil {
+		return nil, fmt.Errorf("unmarshal manifest: %w", err)
+	}
+
+	return &logEntry, nil
+}
+
+// WriteManifest writes the log entry's manifest into the log entry directory stateDir and then
+// fsyncs the whole log entry directory so that the entry is on disk before it is committed.
+func WriteManifest(ctx context.Context, stateDir string, manifest *gitalypb.LogEntry) error {
+	manifestBytes, err := proto.Marshal(manifest)
+	if err != nil {
+		return fmt.Errorf("marshal manifest: %w", err)
+	}
+
+	// Finalize the log entry by writing the MANIFEST file into the log entry's directory.
+	manifestPath := ManifestPath(stateDir)
+	if err := os.WriteFile(manifestPath, manifestBytes, mode.File); err != nil {
+		return fmt.Errorf("write manifest: %w", err)
+	}
+
+	// Sync the log entry completely before committing it.
+	//
+	// Ideally the log entry would be completely flushed to the disk before queuing the
+	// transaction for commit to ensure we don't write a lot to the disk while in the critical
+	// section. We currently stage some of the files only in the critical section though. This
+	// is due to currently lacking conflict checks which prevents staging the log entry completely
+	// before queuing it for commit.
+	//
+	// See https://gitlab.com/gitlab-org/gitaly/-/issues/5892 for more details. Once the issue is
+	// addressed, we could stage the transaction entirely before queuing it for commit, and thus not
+	// need to sync here.
+	if err := safe.NewSyncer().SyncRecursive(ctx, stateDir); err != nil {
+		return fmt.Errorf("synchronizing WAL directory: %w", err)
+	}
+
+	return nil
+}
+
+// RemoveManifest removes the MANIFEST file of the log entry stored in stateDir.
+func RemoveManifest(ctx context.Context, stateDir string) error {
+	return os.Remove(ManifestPath(stateDir))
+}
diff --git a/internal/gitaly/storage/wal/manifest_test.go b/internal/gitaly/storage/wal/manifest_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..f9e0f52fd93f87edcafb624f233592ea88ccb04f
--- /dev/null
+++ b/internal/gitaly/storage/wal/manifest_test.go
@@ -0,0 +1,89 @@
+package wal
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
+)
+
+func TestManifest(t *testing.T) {
+	t.Parallel()
+
+	t.Run("manifest path", func(t *testing.T) {
+		t.Parallel()
+		require.Equal(t,
+			filepath.Join("path", "to", "entry", "MANIFEST"),
+			ManifestPath(filepath.Join("path", "to", "entry")),
+		)
+	})
+
+	t.Run("successful read/write", func(t *testing.T) {
+		t.Parallel()
+
+		ctx := testhelper.Context(t)
+		tmpDir := testhelper.TempDir(t)
+
+		// Create a manifest with different operation types
+		entry := NewEntry(tmpDir)
+		entry.SetKey([]byte("key1"), []byte("value1"))
+		entry.DeleteKey([]byte("key2"))
+		entry.CreateDirectory("dir1")
+		manifest := &gitalypb.LogEntry{Operations: entry.Operations()}
+
+		// Write and verify permissions
+		require.NoError(t, WriteManifest(ctx, entry.Directory(), manifest))
+		info, err := os.Stat(ManifestPath(tmpDir))
+		require.NoError(t, err)
+		require.Equal(t, mode.File.Perm(), info.Mode().Perm())
+
+		// Read and verify content
+		readManifest, err := ReadManifest(entry.Directory())
+		require.NoError(t, err)
+		testhelper.ProtoEqual(t, manifest.GetOperations(), readManifest.GetOperations())
+
+		// Test removal
+		require.NoError(t, RemoveManifest(ctx, tmpDir))
+		require.NoFileExists(t, ManifestPath(tmpDir))
+	})
+
+	t.Run("read non-existent manifest", func(t *testing.T) {
+		t.Parallel()
+		tmpDir := testhelper.TempDir(t)
+		_, err := ReadManifest(tmpDir)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "read manifest")
+	})
+
+	t.Run("read corrupted manifest", func(t *testing.T) {
+		t.Parallel()
+		tmpDir := testhelper.TempDir(t)
+		require.NoError(t, os.WriteFile(ManifestPath(tmpDir), []byte("corrupted"), mode.File))
+		_, err := ReadManifest(tmpDir)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "unmarshal manifest")
+	})
+
+	t.Run("write with insufficient permission", func(t *testing.T) {
+		t.Parallel()
+
+		if os.Geteuid() == 0 {
+			t.Skip("root bypasses directory permission checks")
+		}
+
+		ctx := testhelper.Context(t)
+		tmpDir := testhelper.TempDir(t)
+
+		require.NoError(t, os.Chmod(tmpDir, 0o000))
+		t.Cleanup(func() { require.NoError(t, os.Chmod(tmpDir, 0o755)) })
+
+		manifest := &gitalypb.LogEntry{}
+		err := WriteManifest(ctx, tmpDir, manifest)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "write manifest")
+	})
+}