diff --git a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go index c7f5b00464ddb7053962cf6040b3fc21d39b8655..05bc906871c2db6265fd203337242f52db60983a 100644 --- a/internal/gitaly/storage/storagemgr/partition/testhelper_test.go +++ b/internal/gitaly/storage/storagemgr/partition/testhelper_test.go @@ -606,7 +606,7 @@ func RequireRepositories(tb testing.TB, ctx context.Context, cfg config.Cfg, sto // relative path of the repository that failed. If the call failed the test, // print out the relative path to ease troubleshooting. if tb.Failed() { - require.Failf(tb, "unexpected repository state", "relative path: %q", relativePath) + tb.Logf("unexpected repository state, relative path: %q", relativePath) } }() @@ -627,6 +627,13 @@ func RequireDatabase(tb testing.TB, ctx context.Context, database keyvalue.Trans if expectedState == nil { expectedState = DatabaseState{} } + // Persisted committedLSN usually equals appliedLSN; only a few tests make them diverge on purpose. So when + // appliedLSN is asserted but committedLSN is not, backfill committedLSN with it. This may write into the caller's map. + if appliedLSN, appliedExist := expectedState[string(keyAppliedLSN)]; appliedExist { + if _, committedExist := expectedState[string(keyCommittedLSN)]; !committedExist { + expectedState[string(keyCommittedLSN)] = appliedLSN + } + } actualState := DatabaseState{} unexpectedKeys := []string{} @@ -705,6 +712,8 @@ type testTransactionHooks struct { BeforeApplyLogEntry hookFunc // BeforeAppendLogEntry is called before a log entry is appended to the log. BeforeAppendLogEntry hookFunc + // BeforeCommitLogEntry is called before a log entry is marked as committed. + BeforeCommitLogEntry hookFunc // AfterDeleteLogEntry is called after a log entry is deleted. AfterDeleteLogEntry hookFunc // BeforeReadAppliedLSN is invoked before the applied LSN is read.
diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go index f6b7e1ab420ea55eab5167c676b7aa73489b1053..0f83e04763643042d9e7a5bf8220e8f377a554c6 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager.go @@ -101,6 +101,8 @@ var ( // keyAppliedLSN is the database key storing a partition's last applied log entry's LSN. keyAppliedLSN = []byte("applied_lsn") + // keyCommittedLSN is the database key storing a partition's last committed log entry's LSN. + keyCommittedLSN = []byte("committed_lsn") ) const relativePathKeyPrefix = "r/" @@ -350,7 +352,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context, opts storage.BeginOpti txn := &Transaction{ write: opts.Write, commit: mgr.commit, - snapshotLSN: mgr.appendedLSN, + snapshotLSN: mgr.committedLSN, finished: make(chan struct{}), relativePath: relativePath, metrics: mgr.metrics, @@ -925,7 +927,7 @@ func (p *consumerPosition) setPosition(pos storage.LSN) { // - The reference verification failures can be ignored instead of aborting the entire transaction. // If done, the references that failed verification are dropped from the transaction but the updates // that passed verification are still performed. -// 2. The transaction is appended to the write-ahead log. Once the write has been logged, it is effectively +// 2. The transaction is committed to the write-ahead log. Once the write has been logged, it is effectively // committed and will be applied to the repository even after restarting. // 3. The transaction is applied from the write-ahead log to the repository by actually performing the reference // changes. @@ -998,7 +1000,7 @@ type TransactionManager struct { // initializationSuccessful is set if the TransactionManager initialized successfully. If it didn't, // transactions will fail to begin. 
initializationSuccessful bool - // mutex guards access to snapshotLocks and appendedLSN. These fields are accessed by both + // mutex guards access to snapshotLocks and committedLSN. These fields are accessed by both // Run and Begin which are ran in different goroutines. mutex sync.Mutex @@ -1008,8 +1010,18 @@ type TransactionManager struct { // snapshotManager is responsible for creation and management of file system snapshots. snapshotManager *snapshot.Manager - // appendedLSN holds the LSN of the last log entry appended to the partition's write-ahead log. + // ┌─ oldestLSN ┌─ committedLSN + // ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ■ ■ ■ ■ ■ ■ ■ ■ □ □ □ □ □ □ □ + // └─ appliedLSN └─ appendedLSN + // + // appendedLSN holds the LSN of the last log entry appended by the current node. Entries after committedLSN are not + // yet acknowledged by other nodes; once they are, committedLSN is incremented accordingly and catches up with + // appendedLSN. If Raft is not enabled, or the cluster has a single node, committedLSN is incremented immediately. appendedLSN storage.LSN + // committedLSN holds the LSN of the last log entry committed to the partition's write-ahead log. A log entry is + // considered to be committed if it's accepted by the majority of cluster members. Eventually, it will be + // applied by all cluster members. + committedLSN storage.LSN // appliedLSN holds the LSN of the last log entry applied to the partition. appliedLSN storage.LSN // oldestLSN holds the LSN of the head of log entries which is still kept in the database. The manager keeps @@ -1020,7 +1032,11 @@ type TransactionManager struct { // the partition. It's keyed by the LSN the transaction is waiting to be applied and the // value is the resultChannel that is waiting the result. awaitingTransactions map[storage.LSN]resultChannel - // committedEntries keeps some latest appended log entries around. Some types of transactions, such as + + // appendedEntries keeps track of appended but not-yet committed entries.
After an entry is committed, it is + // removed from this map. This provides quick reference to those entries. + appendedEntries map[storage.LSN]*gitalypb.LogEntry + // committedEntries keeps some latest committed log entries around. Some types of transactions, such as // housekeeping, operate on snapshot repository. There is a gap between transaction doing its work and the time // when it is committed. They need to verify if concurrent operations can cause conflict. These log entries are // still kept around even after they are applied. They are removed when there are no active readers accessing @@ -1045,10 +1061,11 @@ type TransactionManager struct { type testHooks struct { beforeInitialization func() - beforeAppendLogEntry func() - beforeApplyLogEntry func() - beforeStoreAppliedLSN func() - beforeDeleteLogEntryFiles func() + beforeAppendLogEntry func(storage.LSN) + beforeCommitLogEntry func(storage.LSN) + beforeApplyLogEntry func(storage.LSN) + beforeStoreAppliedLSN func(storage.LSN) + beforeDeleteLogEntryFiles func(storage.LSN) beforeRunExiting func() } @@ -1089,6 +1106,7 @@ func NewTransactionManager( stateDirectory: stateDir, stagingDirectory: stagingDir, awaitingTransactions: make(map[storage.LSN]resultChannel), + appendedEntries: map[storage.LSN]*gitalypb.LogEntry{}, committedEntries: list.New(), metrics: metrics, consumer: consumer, @@ -1097,10 +1115,11 @@ func NewTransactionManager( testHooks: testHooks{ beforeInitialization: func() {}, - beforeAppendLogEntry: func() {}, - beforeApplyLogEntry: func() {}, - beforeStoreAppliedLSN: func() {}, - beforeDeleteLogEntryFiles: func() {}, + beforeAppendLogEntry: func(storage.LSN) {}, + beforeCommitLogEntry: func(storage.LSN) {}, + beforeApplyLogEntry: func(storage.LSN) {}, + beforeStoreAppliedLSN: func(storage.LSN) {}, + beforeDeleteLogEntryFiles: func(storage.LSN) {}, beforeRunExiting: func() {}, }, } @@ -1115,8 +1134,6 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact span, ctx 
:= tracing.StartSpanIfHasParent(ctx, "transaction.Commit", nil) defer span.Finish() - transaction.result = make(resultChannel, 1) - if transaction.repositoryTarget() && !transaction.repositoryExists { // Determine if the repository was created in this transaction and stage its state // for committing if so. @@ -1130,6 +1147,43 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact } } + if err := mgr.prepareCommit(ctx, transaction); err != nil { + return err + } + + if err := func() error { + defer trace.StartRegion(ctx, "commit queue").End() + transaction.metrics.commitQueueDepth.Inc() + defer transaction.metrics.commitQueueDepth.Dec() + defer prometheus.NewTimer(mgr.metrics.commitQueueWaitSeconds).ObserveDuration() + + select { + case mgr.admissionQueue <- transaction: + transaction.admitted = true + return nil + case <-ctx.Done(): + return ctx.Err() + case <-mgr.closing: + return storage.ErrTransactionProcessingStopped + } + }(); err != nil { + return err + } + + defer trace.StartRegion(ctx, "result wait").End() + select { + case err := <-transaction.result: + return unwrapExpectedError(err) + case <-ctx.Done(): + return ctx.Err() + case <-mgr.closed: + return storage.ErrTransactionProcessingStopped + } +} + +func (mgr *TransactionManager) prepareCommit(ctx context.Context, transaction *Transaction) error { + transaction.result = make(resultChannel, 1) + // Create a directory to store all staging files. if err := os.Mkdir(transaction.walFilesPath(), mode.Directory); err != nil { return fmt.Errorf("create wal files directory: %w", err) @@ -1228,7 +1282,7 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact // The reference updates are staged into the transaction when they are verified, including // the packed-refs. While the reference files would generally be small, the packed-refs file // may be large. 
Sync the contents of the file before entering the critical section to ensure - // we don't end up syncing the potentially very large file to disk when we're appending the + // we don't end up syncing the potentially very large file to disk when we're committing the // log entry. preImagePackedRefsInode, err := getInode(transaction.originalPackedRefsFilePath()) if err != nil { @@ -1251,34 +1305,7 @@ func (mgr *TransactionManager) commit(ctx context.Context, transaction *Transact } } - if err := func() error { - defer trace.StartRegion(ctx, "commit queue").End() - transaction.metrics.commitQueueDepth.Inc() - defer transaction.metrics.commitQueueDepth.Dec() - defer prometheus.NewTimer(mgr.metrics.commitQueueWaitSeconds).ObserveDuration() - - select { - case mgr.admissionQueue <- transaction: - transaction.admitted = true - return nil - case <-ctx.Done(): - return ctx.Err() - case <-mgr.closing: - return storage.ErrTransactionProcessingStopped - } - }(); err != nil { - return err - } - - defer trace.StartRegion(ctx, "result wait").End() - select { - case err := <-transaction.result: - return unwrapExpectedError(err) - case <-ctx.Done(): - return ctx.Err() - case <-mgr.closed: - return storage.ErrTransactionProcessingStopped - } + return nil } // replaceObjectDirectory replaces the snapshot repository's object directory @@ -1774,7 +1801,7 @@ func (mgr *TransactionManager) preparePackRefsReftable(ctx context.Context, tran Name: "pack-refs", // By using the '--auto' flag, we ensure that git uses the best heuristic // for compaction. For reftables, it currently uses a geometric progression. - // This ensures we don't keep compacting unecessarily to a single file. + // This ensures we don't keep compacting unnecessarily to a single file. 
Flags: []gitcmd.Option{gitcmd.Flag{Name: "--auto"}}, }, gitcmd.WithStderr(&stderr)); err != nil { return structerr.New("exec pack-refs: %w", err).WithMetadata("stderr", stderr.String()) @@ -2131,7 +2158,7 @@ func unwrapExpectedError(err error) error { return err } -// Run starts the transaction processing. On start up Run loads the indexes of the last appended and applied +// Run starts the transaction processing. On start up Run loads the indexes of the last committed and applied // log entries from the database. It will then apply any transactions that have been logged but not applied // to the repository. Once the recovery is completed, Run starts processing new transactions by verifying the // references, logging the transaction and finally applying it to the repository. The transactions are acknowledged @@ -2162,7 +2189,8 @@ func (mgr *TransactionManager) run(ctx context.Context) (returnedErr error) { } for { - if mgr.appliedLSN < mgr.appendedLSN { + // We prioritize applying committed log entries to the partition first. 
+ if mgr.appliedLSN < mgr.committedLSN { lsn := mgr.appliedLSN + 1 if err := mgr.applyLogEntry(ctx, lsn); err != nil { @@ -2227,95 +2255,104 @@ func (mgr *TransactionManager) processTransaction(ctx context.Context) (returned span, ctx := tracing.StartSpanIfHasParent(ctx, "transaction.processTransaction", nil) defer span.Finish() - if err := func() (commitErr error) { - repositoryExists, err := mgr.doesRepositoryExist(ctx, transaction.relativePath) + if err := func() error { + logEntry, logEntryPath, err := mgr.packageLogEntry(ctx, transaction) if err != nil { - return fmt.Errorf("does repository exist: %w", err) + return err } - logEntry := &gitalypb.LogEntry{ - RelativePath: transaction.relativePath, - } + return mgr.proposeLogEntry(ctx, transaction.objectDependencies, logEntry, logEntryPath) + }(); err != nil { + transaction.result <- err + return nil + } - if transaction.repositoryCreation != nil && repositoryExists { - return ErrRepositoryAlreadyExists - } else if transaction.repositoryCreation == nil && !repositoryExists { - return storage.ErrRepositoryNotFound - } + mgr.awaitingTransactions[mgr.committedLSN] = transaction.result - alternateRelativePath, err := mgr.verifyAlternateUpdate(ctx, transaction) - if err != nil { - return fmt.Errorf("verify alternate update: %w", err) - } + return nil +} - if err := mgr.setupStagingRepository(ctx, transaction, alternateRelativePath); err != nil { - return fmt.Errorf("setup staging snapshot: %w", err) - } +// packageLogEntry verifies the transaction and packages its content into a log entry, returning it with the path of the transaction's WAL files. +func (mgr *TransactionManager) packageLogEntry(ctx context.Context, transaction *Transaction) (*gitalypb.LogEntry, string, error) { + repositoryExists, err := mgr.doesRepositoryExist(ctx, transaction.relativePath) + if err != nil { + return nil, "", fmt.Errorf("does repository exist: %w", err) + } - // Verify that all objects this transaction depends on are present in the repository.
The dependency - // objects are the reference tips set in the transaction and the objects the transaction's packfile - // is based on. If an object dependency is missing, the transaction is aborted as applying it would - // result in repository corruption. - if err := mgr.verifyObjectsExist(ctx, transaction.stagingRepository, transaction.objectDependencies); err != nil { - return fmt.Errorf("verify object dependencies: %w", err) - } + logEntry := &gitalypb.LogEntry{ + RelativePath: transaction.relativePath, + } - if transaction.repositoryCreation == nil && transaction.runHousekeeping == nil && !transaction.deleteRepository { - logEntry.ReferenceTransactions, err = mgr.verifyReferences(ctx, transaction) - if err != nil { - return fmt.Errorf("verify references: %w", err) - } - } + if transaction.repositoryCreation != nil && repositoryExists { + return nil, "", ErrRepositoryAlreadyExists + } else if transaction.repositoryCreation == nil && !repositoryExists { + return nil, "", storage.ErrRepositoryNotFound + } - if transaction.customHooksUpdated { - // Log a deletion of the existing custom hooks so they are removed before the - // new ones are put in place. 
- if err := transaction.walEntry.RecordDirectoryRemoval( - mgr.storagePath, - filepath.Join(transaction.relativePath, repoutil.CustomHooksDir), - ); err != nil && !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("record custom hook removal: %w", err) - } - } + alternateRelativePath, err := mgr.verifyAlternateUpdate(ctx, transaction) + if err != nil { + return nil, "", fmt.Errorf("verify alternate update: %w", err) + } - if transaction.deleteRepository { - logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} + if err := mgr.setupStagingRepository(ctx, transaction, alternateRelativePath); err != nil { + return nil, "", fmt.Errorf("setup staging snapshot: %w", err) + } - if err := transaction.walEntry.RecordDirectoryRemoval( - mgr.storagePath, - transaction.relativePath, - ); err != nil && !errors.Is(err, fs.ErrNotExist) { - return fmt.Errorf("record repository removal: %w", err) - } + // Verify that all objects this transaction depends on are present in the repository. The dependency + // objects are the reference tips set in the transaction and the objects the transaction's packfile + // is based on. If an object dependency is missing, the transaction is aborted as applying it would + // result in repository corruption. 
+ if err := mgr.verifyObjectsExist(ctx, transaction.stagingRepository, transaction.objectDependencies); err != nil { + return nil, "", fmt.Errorf("verify object dependencies: %w", err) + } - if err := transaction.KV().Delete(relativePathKey(transaction.relativePath)); err != nil { - return fmt.Errorf("delete relative path: %w", err) - } + if transaction.repositoryCreation == nil && transaction.runHousekeeping == nil && !transaction.deleteRepository { + logEntry.ReferenceTransactions, err = mgr.verifyReferences(ctx, transaction) + if err != nil { + return nil, "", fmt.Errorf("verify references: %w", err) } + } - if transaction.runHousekeeping != nil { - housekeepingEntry, err := mgr.verifyHousekeeping(ctx, transaction) - if err != nil { - return fmt.Errorf("verifying pack refs: %w", err) - } - logEntry.Housekeeping = housekeepingEntry + if transaction.customHooksUpdated { + // Log a deletion of the existing custom hooks so they are removed before the + // new ones are put in place. + if err := transaction.walEntry.RecordDirectoryRemoval( + mgr.storagePath, + filepath.Join(transaction.relativePath, repoutil.CustomHooksDir), + ); err != nil && !errors.Is(err, fs.ErrNotExist) { + return nil, "", fmt.Errorf("record custom hook removal: %w", err) } + } + + if transaction.deleteRepository { + logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} - if err := mgr.verifyKeyValueOperations(ctx, transaction); err != nil { - return fmt.Errorf("verify key-value operations: %w", err) + if err := transaction.walEntry.RecordDirectoryRemoval( + mgr.storagePath, + transaction.relativePath, + ); err != nil && !errors.Is(err, fs.ErrNotExist) { + return nil, "", fmt.Errorf("record repository removal: %w", err) } - logEntry.Operations = transaction.walEntry.Operations() + if err := transaction.KV().Delete(relativePathKey(transaction.relativePath)); err != nil { + return nil, "", fmt.Errorf("delete relative path: %w", err) + } + } - return mgr.appendLogEntry(ctx, 
transaction.objectDependencies, logEntry, transaction.walFilesPath()) - }(); err != nil { - transaction.result <- err - return nil + if transaction.runHousekeeping != nil { + housekeepingEntry, err := mgr.verifyHousekeeping(ctx, transaction) + if err != nil { + return nil, "", fmt.Errorf("verifying pack refs: %w", err) + } + logEntry.Housekeeping = housekeepingEntry } - mgr.awaitingTransactions[mgr.appendedLSN] = transaction.result + if err := mgr.verifyKeyValueOperations(ctx, transaction); err != nil { + return nil, "", fmt.Errorf("verify key-value operations: %w", err) + } - return nil + logEntry.Operations = transaction.walEntry.Operations() + return logEntry, transaction.walFilesPath(), nil } // verifyKeyValueOperations checks the key-value operations of the transaction for conflicts and includes @@ -2429,7 +2466,7 @@ func (mgr *TransactionManager) snapshotsDir() string { return filepath.Join(mgr.stagingDirectory, "snapshots") } -// initialize initializes the TransactionManager's state from the database. It loads the appended and the applied +// initialize initializes the TransactionManager's state from the database. It loads the committed and the applied // LSNs and initializes the notification channels that synchronize transaction beginning with log entry applying. func (mgr *TransactionManager) initialize(ctx context.Context) error { defer trace.StartRegion(ctx, "initialize").End() @@ -2456,38 +2493,70 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { return fmt.Errorf("new snapshot manager: %w", err) } - // The LSN of the last appended log entry is determined from the LSN of the latest entry in the log and + // The LSN of the last committed log entry is determined from the LSN of the latest entry in the log and // the latest applied log entry. The manager also keeps track of committed entries and reserves them until there // is no transaction refers them. 
It's possible there are some left-over entries in the database because a // transaction can hold the entry stubbornly. So, the manager could not clean them up in the last session. // - // ┌─ oldestLSN ┌─ appendedLSN - // ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ■ ■ ■ ■ ■ ■ ■ ■ - // └─ appliedLSN + // ┌─ oldestLSN ┌─ committedLSN + // ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ■ ■ ■ ■ ■ ■ ■ ■ □ □ □ □ □ □ □ + // └─ appliedLSN └─ appendedLSN // // // oldestLSN is initialized to appliedLSN + 1. If there are no log entries in the log, then everything has been // pruned already or there has not been any log entries yet. Setting this +1 avoids trying to clean up log entries // that do not exist. If there are some, we'll set oldestLSN to the head of the log below. mgr.oldestLSN = mgr.appliedLSN + 1 - // appendedLSN is initialized to appliedLSN. If there are no log entries, then there has been no transaction yet, or - // all log entries have been applied and have been already pruned. If there are some in the log, we'll update this - // below to match. - mgr.appendedLSN = mgr.appliedLSN + + // CommittedLSN is loaded from DB. A log entry is appended first and marked as committed later. There's a chance + // that log entry is never marked as committed. After a restart, especially after a crash, the manager won't be + // able to tell if it's committed or not. Thus, we need to persist this index. + // Because index persistence is introduced later, we need to fallback to appliedLSN if that key does not exist + // in the DB. + var committedLSN gitalypb.LSN + if err := mgr.readKey(keyCommittedLSN, &committedLSN); err != nil { + if !errors.Is(err, badger.ErrKeyNotFound) { + return fmt.Errorf("read committed LSN: %w", err) + } + mgr.committedLSN = mgr.appliedLSN + } else { + mgr.committedLSN = storage.LSN(committedLSN.GetValue()) + } + + // appendedLSN is always set to committedLSN after starting. If a log entry hasn't been committed, it could be + // discarded. Its caller never received the acknowledgement. 
+ mgr.appendedLSN = mgr.committedLSN if logEntries, err := os.ReadDir(walFilesPath(mgr.stateDirectory)); err != nil { return fmt.Errorf("read wal directory: %w", err) } else if len(logEntries) > 0 { - if mgr.oldestLSN, err = storage.ParseLSN(logEntries[0].Name()); err != nil { - return fmt.Errorf("parse oldest LSN: %w", err) - } - if mgr.appendedLSN, err = storage.ParseLSN(logEntries[len(logEntries)-1].Name()); err != nil { - return fmt.Errorf("parse appended LSN: %w", err) + // Log entries after committedLSN were never committed, so there is no reason to keep them around. os.ReadDir + // returns entries sorted by file name, i.e. in ascending LSN order. Iterate backward, deleting uncommitted + // entries, and stop at the first entry whose LSN is at or below committedLSN. + for i := len(logEntries) - 1; i >= 0; i-- { + logEntry := logEntries[i] + + lsn, err := storage.ParseLSN(logEntry.Name()) + if err != nil { + return fmt.Errorf("parse LSN: %w", err) + } + if lsn <= mgr.committedLSN { + // Entries at or below committedLSN are kept: transactions from before the restart might + // reference them. They are pruned in the main loop eventually. The first remaining entry + // becomes the head of the log. + if mgr.oldestLSN, err = storage.ParseLSN(logEntries[0].Name()); err != nil { + return fmt.Errorf("parse oldest LSN: %w", err) + } + break + } + if err := mgr.deleteLogEntry(ctx, lsn); err != nil { + return fmt.Errorf("cleaning uncommitted log entry: %w", err) + } } } if mgr.consumer != nil { - mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.appendedLSN) + mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.oldestLSN, mgr.committedLSN) } // Create a snapshot lock for the applied LSN as it is used for synchronizing @@ -2497,11 +2566,11 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { // Each unapplied log entry should have a snapshot lock as they are created in normal // operation when committing a log entry. Recover these entries.
- for i := mgr.appliedLSN + 1; i <= mgr.appendedLSN; i++ { + for i := mgr.appliedLSN + 1; i <= mgr.committedLSN; i++ { mgr.snapshotLocks[i] = &snapshotLock{applied: make(chan struct{})} } - if err := mgr.removeStaleWALFiles(ctx, mgr.oldestLSN, mgr.appendedLSN); err != nil { + if err := mgr.removeStaleWALFiles(ctx, mgr.oldestLSN, mgr.committedLSN); err != nil { return fmt.Errorf("remove stale packs: %w", err) } @@ -2597,15 +2666,15 @@ func (mgr *TransactionManager) removePackedRefsLocks(ctx context.Context, reposi // but the manager was interrupted before successfully persisting the log entry itself. // If the manager deletes a log entry successfully from the database but is interrupted before it cleans // up the associated files, such a directory can also be left at the head of the log. -func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, oldestLSN, appendedLSN storage.LSN) error { +func (mgr *TransactionManager) removeStaleWALFiles(ctx context.Context, oldestLSN, committedLSN storage.LSN) error { needsFsync := false for _, possibleStaleFilesPath := range []string{ // Log entries are pruned one by one. If a write is interrupted, the only possible stale files would be // for the log entry preceding the oldest log entry. walFilesPathForLSN(mgr.stateDirectory, oldestLSN-1), - // Log entries are appended one by one to the log. If a write is interrupted, the only possible stale + // Log entries are committed one by one to the log. If a write is interrupted, the only possible stale // files would be for the next LSN. Remove the files if they exist. - walFilesPathForLSN(mgr.stateDirectory, appendedLSN+1), + walFilesPathForLSN(mgr.stateDirectory, committedLSN+1), } { if _, err := os.Stat(possibleStaleFilesPath); err != nil { @@ -2857,7 +2926,7 @@ func (mgr *TransactionManager) verifyReferences(ctx context.Context, transaction // to transaction operations. 
// // To ensure that we don't modify existing tables and autocompact, we lock the existing tables -// before applying the updates. This way the reftable backend willl only create new tables +// before applying the updates. This way the reftable backend will only create new tables func (mgr *TransactionManager) verifyReferencesWithGitForReftables( ctx context.Context, referenceTransactions []*gitalypb.LogEntry_ReferenceTransaction, @@ -3139,7 +3208,7 @@ func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transacti mgr.mutex.Lock() defer mgr.mutex.Unlock() - // Check for any concurrent housekeeping between this transaction's snapshot LSN and the latest appended LSN. + // Check for any concurrent housekeeping between this transaction's snapshot LSN and the latest committed LSN. if err := mgr.walkCommittedEntries(transaction, func(entry *gitalypb.LogEntry, objectDependencies map[git.ObjectID]struct{}) error { if entry.GetHousekeeping() != nil { return errHousekeepingConflictConcurrent @@ -3506,10 +3575,22 @@ func (mgr *TransactionManager) applyReferenceTransaction(ctx context.Context, ch return nil } -// appendLogEntry appends the transaction to the write-ahead log. It first writes the transaction's manifest file -// into the log entry's directory. Afterwards it moves the log entry's directory from the staging area to its final -// place in the write-ahead log. -func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error { +// proposeLogEntry proposes a log entry of a transaction to the write-ahead log. It first appends the entry by writing +// the transaction's manifest file into the log entry's directory and moving that directory from the staging area to +// its final place in the write-ahead log. Afterwards it commits the entry, which currently happens right away.
+func (mgr *TransactionManager) proposeLogEntry(ctx context.Context, objectDependencies map[git.ObjectID]struct{}, logEntry *gitalypb.LogEntry, logEntryPath string) error { + nextLSN := mgr.appendedLSN + 1 + if err := mgr.appendLogEntry(ctx, nextLSN, logEntry, logEntryPath); err != nil { + return fmt.Errorf("append log entry: %w", err) + } + if err := mgr.commitLogEntry(ctx, nextLSN, objectDependencies); err != nil { + return fmt.Errorf("commit log entry: %w", err) + } + return nil +} + +// appendLogEntry appends the transaction to the write-ahead log. +func (mgr *TransactionManager) appendLogEntry(ctx context.Context, nextLSN storage.LSN, logEntry *gitalypb.LogEntry, logEntryPath string) error { defer trace.StartRegion(ctx, "appendLogEntry").End() manifestBytes, err := proto.Marshal(logEntry) @@ -3538,9 +3619,8 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende return fmt.Errorf("synchronizing WAL directory: %w", err) } - mgr.testHooks.beforeAppendLogEntry() + mgr.testHooks.beforeAppendLogEntry(nextLSN) - nextLSN := mgr.appendedLSN + 1 // Move the log entry from the staging directory into its place in the log. destinationPath := walFilesPathForLSN(mgr.stateDirectory, nextLSN) if err := os.Rename(logEntryPath, destinationPath); err != nil { @@ -3567,7 +3647,30 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende // are guaranteed to read it. mgr.mutex.Lock() mgr.appendedLSN = nextLSN + mgr.appendedEntries[mgr.appendedLSN] = logEntry + mgr.mutex.Unlock() + + return nil +} + +// commitLogEntry marks the appended log entry at nextLSN as committed: it persists the LSN, then updates in-memory state. +func (mgr *TransactionManager) commitLogEntry(ctx context.Context, nextLSN storage.LSN, objectDependencies map[git.ObjectID]struct{}) error { + defer trace.StartRegion(ctx, "commitLogEntry").End() + mgr.testHooks.beforeCommitLogEntry(nextLSN) + // Verify the entry was appended and persist committedLSN before touching in-memory state, so failures leave no partial state.
+ mgr.mutex.Lock() + _, appended := mgr.appendedEntries[nextLSN] + mgr.mutex.Unlock() + if !appended { + return fmt.Errorf("log entry %s not found in the appended list", nextLSN) + } + if err := mgr.storeCommittedLSN(nextLSN); err != nil { + return fmt.Errorf("persisting committed entry: %w", err) + } + mgr.mutex.Lock() + mgr.committedLSN = nextLSN mgr.snapshotLocks[nextLSN] = &snapshotLock{applied: make(chan struct{})} + delete(mgr.appendedEntries, nextLSN) mgr.committedEntries.PushBack(&committedEntry{ lsn: nextLSN, objectDependencies: objectDependencies, @@ -3577,6 +3680,7 @@ func (mgr *TransactionManager) appendLogEntry(ctx context.Context, objectDepende if mgr.consumer != nil { mgr.consumer.NotifyNewTransactions(mgr.storageName, mgr.partitionID, mgr.lowWaterMark(), nextLSN) } + return nil } @@ -3600,7 +3704,7 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LS delete(mgr.snapshotLocks, previousLSN) mgr.mutex.Unlock() - mgr.testHooks.beforeApplyLogEntry() + mgr.testHooks.beforeApplyLogEntry(lsn) if err := applyOperations(ctx, safe.NewSyncer().Sync, mgr.storagePath, walFilesPathForLSN(mgr.stateDirectory, lsn), logEntry, mgr.db); err != nil { return fmt.Errorf("apply operations: %w", err) @@ -3615,7 +3719,7 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn storage.LS } } - mgr.testHooks.beforeStoreAppliedLSN() + mgr.testHooks.beforeStoreAppliedLSN(lsn) if err := mgr.storeAppliedLSN(lsn); err != nil { return fmt.Errorf("set applied LSN: %w", err) } @@ -4005,6 +4109,11 @@ func (mgr *TransactionManager) storeAppliedLSN(lsn storage.LSN) error { return mgr.setKey(keyAppliedLSN, lsn.ToProto()) } +// storeCommittedLSN stores the partition's committed LSN in the database. +func (mgr *TransactionManager) storeCommittedLSN(lsn storage.LSN) error { + return mgr.setKey(keyCommittedLSN, lsn.ToProto()) +} + // setKey marshals and stores a given protocol buffer message into the database under the given key.
func (mgr *TransactionManager) setKey(key []byte, value proto.Message) error { marshaledValue, err := proto.Marshal(value) diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go index dbe1843e39f6e3ae6cc7fe3dfd9e6629cfbd90b4..bbd632c432bbe4c1a5eaf27b51a6bbf600a8808f 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_alternate_test.go @@ -1124,9 +1124,7 @@ func generateAlternateTests(t *testing.T, setup testTransactionSetup) []transact CloseManager{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -1203,9 +1201,7 @@ func generateAlternateTests(t *testing.T, setup testTransactionSetup) []transact CloseManager{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go index 5bfd1c5fc969996e08a368dd90627742d1df717d..07ab97957c8e22e1fec21c94d7872832391e8208 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_default_branch_test.go @@ -464,9 +464,7 @@ func generateDefaultBranchTests(t *testing.T, setup testTransactionSetup) []tran steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookCtx hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, 
ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go index 3434a94f40824a46259154d08693fc368026b464..d2467b3dab2dd5be494ea51db8334a603326ea96 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_hook_test.go @@ -17,18 +17,16 @@ type hookFunc func(hookContext) // hookContext are the control toggles available in a hook. type hookContext struct { - // closeManager calls the calls Close on the TransactionManager. - closeManager func() + // manager points to the subject TransactionManager. + manager *TransactionManager + // lsn stores the LSN context when the hook is triggered. + lsn storage.LSN } // installHooks takes the hooks in the test setup and configures them in the TransactionManager. func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, hooks testTransactionHooks) { for destination, source := range map[*func()]hookFunc{ - &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, - &mgr.testHooks.beforeAppendLogEntry: hooks.BeforeAppendLogEntry, - &mgr.testHooks.beforeApplyLogEntry: hooks.BeforeApplyLogEntry, - &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, - &mgr.testHooks.beforeDeleteLogEntryFiles: hooks.AfterDeleteLogEntry, + &mgr.testHooks.beforeInitialization: hooks.BeforeReadAppliedLSN, &mgr.testHooks.beforeRunExiting: func(hookContext) { if hooks.WaitForTransactionsWhenClosing { inflightTransactions.Wait() @@ -41,7 +39,24 @@ func installHooks(mgr *TransactionManager, inflightTransactions *sync.WaitGroup, runHook := source *destination = func() { runHook(hookContext{ - closeManager: mgr.Close, + manager: mgr, + }) + } + } + } + for destination, source := range map[*func(storage.LSN)]hookFunc{ + &mgr.testHooks.beforeApplyLogEntry: 
hooks.BeforeApplyLogEntry, + &mgr.testHooks.beforeAppendLogEntry: hooks.BeforeAppendLogEntry, + &mgr.testHooks.beforeCommitLogEntry: hooks.BeforeCommitLogEntry, + &mgr.testHooks.beforeStoreAppliedLSN: hooks.BeforeStoreAppliedLSN, + &mgr.testHooks.beforeDeleteLogEntryFiles: hooks.AfterDeleteLogEntry, + } { + if source != nil { + runHook := source + *destination = func(lsn storage.LSN) { + runHook(hookContext{ + manager: mgr, + lsn: lsn, }) } } @@ -89,9 +104,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -139,9 +152,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -483,9 +494,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookCtx hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -577,9 +586,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -671,9 +678,7 @@ func generateCustomHooksTests(t *testing.T, setup testTransactionSetup) []transa steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookCtx hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: 
simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go index f0e36bd571e21bcb5063f35960618e12387fbf51..5af8d077c473ea9f45319b173f81b4a192ac540d 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_housekeeping_test.go @@ -1335,9 +1335,7 @@ func generateHousekeepingPackRefsTests(t *testing.T, ctx context.Context, testPa CloseManager{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go index b39cc1b2b534ade2356a77f76a317e254e600fc4..25f2055cb69b03980aea29ffa1d9f20a118b1fb4 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_key_value_test.go @@ -1,11 +1,13 @@ package partition import ( + "testing" + "github.com/dgraph-io/badger/v4" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" ) -func generateKeyValueTests(setup testTransactionSetup) []transactionTestCase { +func generateKeyValueTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { return []transactionTestCase{ { desc: "set keys with values", @@ -615,9 +617,7 @@ func generateKeyValueTests(setup testTransactionSetup) []transactionTestCase { steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: 
errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go index 785204ee3381a73025bcade13af8416aab579165..1f356bc4d6a3dcc3bd1281db672dfbdb29bd1ab2 100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_repo_test.go @@ -364,9 +364,7 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t RemoveRepository{}, StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -408,9 +406,7 @@ func generateCreateRepositoryTests(t *testing.T, setup testTransactionSetup) []t RemoveRepository{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -910,9 +906,7 @@ func generateDeleteRepositoryTests(t *testing.T, setup testTransactionSetup) []t steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -954,9 +948,7 @@ func generateDeleteRepositoryTests(t *testing.T, setup testTransactionSetup) []t steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, diff --git a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go index 021f6adddc322ac4afee2435f6d72fa49ba55e5c..725447712ffe8ee8b91851bb45fb9847c0c1a2c7 
100644 --- a/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/partition/transaction_manager_test.go @@ -39,6 +39,13 @@ import ( // TransactionManager.Run execution. var errSimulatedCrash = errors.New("simulated crash") +// simulateCrashHook returns a hook function that panics with errSimulatedCrash. +var simulateCrashHook = func() func(hookContext) { + return func(hookContext) { + panic(errSimulatedCrash) + } +} + func manifestDirectoryEntry(expected *gitalypb.LogEntry) testhelper.DirectoryEntry { return testhelper.DirectoryEntry{ Mode: mode.File, @@ -339,6 +346,7 @@ func TestTransactionManager(t *testing.T) { subTests := map[string][]transactionTestCase{ "Common": generateCommonTests(t, ctx, setup), "CommittedEntries": generateCommittedEntriesTests(t, setup), + "AppendedEntries": generateAppendedEntriesTests(t, setup), "ModifyReferences": generateModifyReferencesTests(t, setup), "CreateRepository": generateCreateRepositoryTests(t, setup), "DeleteRepository": generateDeleteRepositoryTests(t, setup), @@ -350,7 +358,7 @@ func TestTransactionManager(t *testing.T) { "Housekeeping/RepackingConcurrent": generateHousekeepingRepackingConcurrentTests(t, ctx, setup), "Housekeeping/CommitGraphs": generateHousekeepingCommitGraphsTests(t, ctx, setup), "Consumer": generateConsumerTests(t, setup), - "KeyValue": generateKeyValueTests(setup), + "KeyValue": generateKeyValueTests(t, setup), } for desc, tests := range subTests { @@ -485,7 +493,7 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeAppendLogEntry: func(hookContext hookContext) { hookContext.closeManager() }, + BeforeAppendLogEntry: func(hookContext hookContext) { hookContext.manager.Close() }, // This ensures we are testing the context cancellation errors being unwrapped properly // to an storage.ErrTransactionProcessingStopped instead of hitting the general 
case when // runDone is closed. @@ -506,9 +514,7 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeApplyLogEntry: func(hookCtx hookContext) { - panic(errSimulatedCrash) - }, + BeforeApplyLogEntry: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -526,6 +532,11 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio }, }, expectedState: StateAssertion{ + Database: DatabaseState{ + // The process crashes before apply but after it's committed. So, only + // committedLSN is persisted. + string(keyCommittedLSN): storage.LSN(1).ToProto(), + }, Directory: gittest.FilesOrReftables(testhelper.DirectoryState{ "/": {Mode: mode.Directory}, "/wal": {Mode: mode.Directory}, @@ -865,9 +876,7 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio Prune{}, StartManager{ Hooks: testTransactionHooks{ - BeforeStoreAppliedLSN: func(hookContext) { - panic(errSimulatedCrash) - }, + BeforeStoreAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -1438,13 +1447,11 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio steps: steps{ StartManager{ Hooks: testTransactionHooks{ - BeforeReadAppliedLSN: func(hookContext) { - // Raise a panic when the manager is about to read the applied log - // index when initializing. In reality this would crash the server but - // in tests it serves as a way to abort the initialization in correct - // location. - panic(errSimulatedCrash) - }, + // Raise a panic when the manager is about to read the applied log + // index when initializing. In reality this would crash the server but + // in tests it serves as a way to abort the initialization in correct + // location. 
+ BeforeReadAppliedLSN: simulateCrashHook(), }, ExpectedError: errSimulatedCrash, }, @@ -1750,6 +1757,8 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t } i++ } + + require.Empty(t, manager.appendedEntries) } return []transactionTestCase{ @@ -2066,7 +2075,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, }, { - desc: "transaction manager cleans up left-over committed entries when appliedLSN == appendedLSN", + desc: "transaction manager cleans up left-over committed entries when appliedLSN == committedLSN", steps: steps{ StartManager{}, Begin{ @@ -2140,6 +2149,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(3).ToProto(), }) require.Equal(t, tm.appliedLSN, storage.LSN(3)) + require.Equal(t, tm.committedLSN, storage.LSN(3)) require.Equal(t, tm.appendedLSN, storage.LSN(3)) }), }, @@ -2212,7 +2222,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, }, { - desc: "transaction manager cleans up left-over committed entries when appliedLSN < appendedLSN", + desc: "transaction manager cleans up left-over committed entries when appliedLSN < committedLSN", skip: func(t *testing.T) { testhelper.SkipWithReftable(t, "test requires manual log addition") }, @@ -2263,14 +2273,17 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { // Insert an out-of-band log-entry directly into the database for easier test // setup. It's a bit tricky to simulate committed log entries and un-processed - // appended log entries at the same time. + // committed log entries at the same time. 
logEntryPath := filepath.Join(t.TempDir(), "log_entry") require.NoError(t, os.Mkdir(logEntryPath, mode.Directory)) require.NoError(t, os.WriteFile(filepath.Join(logEntryPath, "1"), []byte(setup.Commits.First.OID+"\n"), mode.File)) - require.NoError(t, tm.appendLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID), logEntryPath)) + require.NoError(t, tm.proposeLogEntry(ctx, map[git.ObjectID]struct{}{setup.Commits.First.OID: {}}, refChangeLogEntry(setup, "refs/heads/branch-3", setup.Commits.First.OID), logEntryPath)) RequireDatabase(t, ctx, tm.db, DatabaseState{ string(keyAppliedLSN): storage.LSN(3).ToProto(), + // Because this is an out-of-band insertion, the committedLSN is updated + // but the new log entry is not applied until the manager is woken up. + string(keyCommittedLSN): storage.LSN(4).ToProto(), }) // Transaction 2 and 3 are left-over. testhelper.RequireDirectoryState(t, tm.stateDirectory, "", testhelper.DirectoryState{ @@ -2296,12 +2309,14 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t string(keyAppliedLSN): storage.LSN(4).ToProto(), }) require.Equal(t, tm.appliedLSN, storage.LSN(4)) + require.Equal(t, tm.committedLSN, storage.LSN(4)) require.Equal(t, tm.appendedLSN, storage.LSN(4)) }), }, expectedState: StateAssertion{ Database: DatabaseState{ - string(keyAppliedLSN): storage.LSN(4).ToProto(), + string(keyAppliedLSN): storage.LSN(4).ToProto(), + string(keyCommittedLSN): storage.LSN(4).ToProto(), }, Repositories: RepositoryStates{ setup.RelativePath: { @@ -2323,6 +2338,305 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t } } +func generateAppendedEntriesTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { + assertAppendedEntries := func(t *testing.T, manager *TransactionManager, expectedList []storage.LSN) { + actual := manager.appendedEntries + assert.Equalf(t, len(expectedList), len(actual),
"appended entries not matched") + + for _, lsn := range expectedList { + expectedEntry, err := manager.readLogEntry(lsn) + assert.NoError(t, err) + testhelper.ProtoEqualAssert(t, expectedEntry, actual[lsn]) + } + } + + return []transactionTestCase{ + { + desc: "manager has just initialized", + steps: steps{ + StartManager{Hooks: testTransactionHooks{ + BeforeCommitLogEntry: func(c hookContext) { + assert.Fail(t, "there shouldn't be any committed entry") + }, + }}, + AssertManager{}, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + require.Equal(t, storage.LSN(0), tm.appendedLSN) + assertAppendedEntries(t, tm, nil) + }), + CloseManager{}, + }, + }, + { + desc: "appended entries are removed after committed", + steps: steps{ + StartManager{Hooks: testTransactionHooks{ + BeforeCommitLogEntry: func(c hookContext) { + // It is not the best solution. But as we intercept to assert an + // intermediate state, there is no cleaner way than using hooks. + lsn := c.lsn + manager := c.manager + switch lsn { + case storage.LSN(1): + assert.Equal(t, storage.LSN(1), manager.appendedLSN) + assert.Equal(t, storage.LSN(0), manager.committedLSN) + assertAppendedEntries(t, manager, []storage.LSN{1}) + case storage.LSN(2): + assert.Equal(t, storage.LSN(2), manager.appendedLSN) + assert.Equal(t, storage.LSN(1), manager.committedLSN) + assertAppendedEntries(t, manager, []storage.LSN{2}) + default: + assert.Fail(t, "there shouldn't be another committed entry") + } + }, + }}, + Begin{ + TransactionID: 1, + RelativePaths: []string{setup.RelativePath}, + }, + Commit{ + TransactionID: 1, + ReferenceUpdates: git.ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + require.Equal(t, storage.LSN(1), tm.appendedLSN) + require.Equal(t, storage.LSN(1), tm.committedLSN) + assertAppendedEntries(t, tm, nil) + }), + 
Begin{ + TransactionID: 2, + RelativePaths: []string{setup.RelativePath}, + ExpectedSnapshotLSN: 1, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + require.Equal(t, storage.LSN(1), tm.appendedLSN) + require.Equal(t, storage.LSN(1), tm.committedLSN) + assertAppendedEntries(t, tm, nil) + }), + Commit{ + TransactionID: 2, + ReferenceUpdates: git.ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + require.Equal(t, storage.LSN(2), tm.appendedLSN) + require.Equal(t, storage.LSN(2), tm.committedLSN) + assertAppendedEntries(t, tm, nil) + }), + CloseManager{}, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN): storage.LSN(2).ToProto(), + }, + // Both log entries have been applied, so none of them remain in the WAL directory. + Directory: testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }, + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: gittest.FilesOrReftables( + &ReferencesState{ + FilesBackend: &FilesBackendState{ + LooseReferences: map[git.ReferenceName]git.ObjectID{ + "refs/heads/main": setup.Commits.First.OID, + "refs/heads/branch-1": setup.Commits.First.OID, + }, + }, + }, &ReferencesState{ + ReftableBackend: &ReftableBackendState{ + Tables: []ReftableTable{ + { + MinIndex: 1, + MaxIndex: 1, + References: []git.Reference{ + { + Name: "HEAD", + Target: "refs/heads/main", + IsSymbolic: true, + }, + }, + }, + { + MinIndex: 2, + MaxIndex: 2, + References: []git.Reference{ + { + Name: "refs/heads/branch-1", + Target: setup.Commits.First.OID.String(), + }, + }, + }, + { + MinIndex: 3, + MaxIndex: 3, + References: []git.Reference{ + { + Name: "refs/heads/main", + Target: setup.Commits.First.OID.String(), + }, + }, + }, + }, + }, + }, + ), + }, + },
+ }, + }, + { + desc: "transaction manager crashes after appending", + steps: steps{ + StartManager{ + Hooks: testTransactionHooks{ + BeforeCommitLogEntry: func(c hookContext) { + assert.Equal(t, storage.LSN(1), c.manager.appendedLSN) + assertAppendedEntries(t, c.manager, []storage.LSN{1}) + simulateCrashHook()(c) + }, + WaitForTransactionsWhenClosing: true, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 1, + RelativePaths: []string{setup.RelativePath}, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + require.Equal(t, storage.LSN(0), tm.appendedLSN) + require.Equal(t, storage.LSN(0), tm.committedLSN) + assertAppendedEntries(t, tm, nil) + }), + Commit{ + TransactionID: 1, + ReferenceUpdates: git.ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + ExpectedError: storage.ErrTransactionProcessingStopped, + }, + AssertManager{ + ExpectedError: errSimulatedCrash, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + // Appended entries are persisted. 
+ testhelper.RequireDirectoryState(t, tm.stateDirectory, "", gittest.FilesOrReftables(testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + "/wal/0000000000001": {Mode: mode.Directory}, + "/wal/0000000000001/MANIFEST": manifestDirectoryEntry(&gitalypb.LogEntry{ + RelativePath: setup.RelativePath, + ReferenceTransactions: []*gitalypb.LogEntry_ReferenceTransaction{ + { + Changes: []*gitalypb.LogEntry_ReferenceTransaction_Change{ + { + ReferenceName: []byte("refs/heads/branch-1"), + NewOid: []byte(setup.Commits.First.OID), + }, + }, + }, + }, + Operations: []*gitalypb.LogEntry_Operation{ + { + Operation: &gitalypb.LogEntry_Operation_CreateHardLink_{ + CreateHardLink: &gitalypb.LogEntry_Operation_CreateHardLink{ + SourcePath: []byte("1"), + DestinationPath: []byte(filepath.Join(setup.RelativePath, "refs/heads/branch-1")), + }, + }, + }, + }, + }), + "/wal/0000000000001/1": {Mode: mode.File, Content: []byte(setup.Commits.First.OID + "\n")}, + }, buildReftableDirectory(map[int][]git.ReferenceUpdates{ + 1: {{"refs/heads/branch-1": git.ReferenceUpdate{NewOID: setup.Commits.First.OID}}}, + }))) + }), + StartManager{}, + AssertManager{}, + // No-op, just to ensure the manager is initialized. + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + // Both of them are set to 0 now. + require.Equal(t, storage.LSN(0), tm.appendedLSN) + require.Equal(t, storage.LSN(0), tm.committedLSN) + // Appended entries are removed now. 
+ testhelper.RequireDirectoryState(t, tm.stateDirectory, "", testhelper.DirectoryState{ + "/": {Mode: mode.Directory}, + "/wal": {Mode: mode.Directory}, + }) + assertAppendedEntries(t, tm, nil) + }), + Begin{ + TransactionID: 2, + RelativePaths: []string{setup.RelativePath}, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: git.ReferenceUpdates{ + "refs/heads/branch-1": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID}, + }, + }, + AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { + // Only the new log entry is applied. The prior entry was appended but never committed, so it was discarded on restart. + require.Equal(t, storage.LSN(1), tm.appendedLSN) + require.Equal(t, storage.LSN(1), tm.committedLSN) + assertAppendedEntries(t, tm, nil) + }), + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLSN): storage.LSN(1).ToProto(), + }, + Repositories: RepositoryStates{ + setup.RelativePath: { + DefaultBranch: "refs/heads/main", + References: gittest.FilesOrReftables( + &ReferencesState{ + FilesBackend: &FilesBackendState{ + LooseReferences: map[git.ReferenceName]git.ObjectID{ + "refs/heads/branch-1": setup.Commits.Second.OID, + }, + }, + }, &ReferencesState{ + ReftableBackend: &ReftableBackendState{ + Tables: []ReftableTable{ + { + MinIndex: 1, + MaxIndex: 1, + References: []git.Reference{ + { + Name: "HEAD", + Target: "refs/heads/main", + IsSymbolic: true, + }, + }, + }, + { + MinIndex: 2, + MaxIndex: 2, + References: []git.Reference{ + { + Name: "refs/heads/branch-1", + Target: setup.Commits.Second.OID.String(), + }, + }, + }, + }, + }, + }, + ), + }, + }, + }, + }, + } +} + // BenchmarkTransactionManager benchmarks the transaction throughput of the TransactionManager at various levels // of concurrency and transaction sizes. func BenchmarkTransactionManager(b *testing.B) {