diff --git a/internal/gitaly/storage/storagemgr/transaction_manager.go b/internal/gitaly/storage/storagemgr/transaction_manager.go index bd7653a04d13f1cbf3572bfa364b30ed4bfa6baa..00031a6e54af9b1908ee5f5a11e3b5350cbeb5f1 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager.go @@ -49,6 +49,9 @@ var ( ErrTransactionAlreadyRollbacked = errors.New("transaction already rollbacked") // errInitializationFailed is returned when the TransactionManager failed to initialize successfully. errInitializationFailed = errors.New("initializing transaction processing failed") + // errCommittedEntryGone is returned when the log entry of an LSN is gone from the database while it's still + // being accessed by other transactions. + errCommittedEntryGone = errors.New("in-use committed entry is gone") // errNotDirectory is returned when the repository's path doesn't point to a directory errNotDirectory = errors.New("repository's path didn't point to a directory") // errRelativePathNotSet is returned when a transaction is begun without providing a relative path @@ -317,7 +320,9 @@ func (mgr *TransactionManager) Begin(ctx context.Context, relativePath string, s if !txn.readOnly { mgr.mutex.Lock() defer mgr.mutex.Unlock() - mgr.cleanCommittedEntry(entry) + if err := mgr.cleanCommittedEntry(entry); err != nil { + return fmt.Errorf("cleaning committed entry: %w", err) + } } return nil @@ -666,8 +671,6 @@ type snapshotLock struct { type committedEntry struct { // lsn is the associated LSN of the entry lsn LSN - // entry is the pointer to the corresponding log entry. - entry *gitalypb.LogEntry // snapshotReaders accounts for the number of transaction readers of the snapshot.
snapshotReaders int } @@ -1807,18 +1810,16 @@ func (mgr *TransactionManager) verifyHousekeeping(ctx context.Context, transacti defer mgr.mutex.Unlock() // Check for any concurrent housekeeping between this transaction's snapshot LSN and the latest appended LSN. - elm := mgr.committedEntries.Front() - for elm != nil { - entry := elm.Value.(*committedEntry) - if entry.lsn > transaction.snapshotLSN && entry.entry.RelativePath == transaction.relativePath { - if entry.entry.GetHousekeeping() != nil { - return nil, errHousekeepingConflictConcurrent - } - if entry.entry.GetRepositoryDeletion() != nil { - return nil, errConflictRepositoryDeletion - } + if err := mgr.walkCommittedEntries(transaction, func(entry *gitalypb.LogEntry) error { + if entry.GetHousekeeping() != nil { + return errHousekeepingConflictConcurrent } - elm = elm.Next() + if entry.GetRepositoryDeletion() != nil { + return errConflictRepositoryDeletion + } + return nil + }); err != nil { + return nil, err } packRefsEntry, err := mgr.verifyPackRefs(mgr.ctx, transaction) @@ -1860,23 +1861,21 @@ func (mgr *TransactionManager) verifyPackRefs(ctx context.Context, transaction * packRefs := transaction.runHousekeeping.packRefs // Check for any concurrent ref deletion between this transaction's snapshot LSN to the end. - elm := mgr.committedEntries.Front() - for elm != nil { - entry := elm.Value.(*committedEntry) - if entry.lsn > transaction.snapshotLSN && entry.entry.RelativePath == transaction.relativePath { - for _, refTransaction := range entry.entry.ReferenceTransactions { - for _, change := range refTransaction.Changes { - if objectHash.IsZeroOID(git.ObjectID(change.GetNewOid())) { - // Oops, there is a reference deletion. Bail out. - return nil, errPackRefsConflictRefDeletion - } - // Ref update. Remove the updated ref from the list of pruned refs so that the - // new OID in loose reference shadows the outdated OID in packed-refs.
- delete(packRefs.PrunedRefs, git.ReferenceName(change.GetReferenceName())) + if err := mgr.walkCommittedEntries(transaction, func(entry *gitalypb.LogEntry) error { + for _, refTransaction := range entry.ReferenceTransactions { + for _, change := range refTransaction.Changes { + if objectHash.IsZeroOID(git.ObjectID(change.GetNewOid())) { + // Oops, there is a reference deletion. Bail out. + return errPackRefsConflictRefDeletion } + // Ref update. Remove the updated ref from the list of pruned refs so that the + // new OID in loose reference shadows the outdated OID in packed-refs. + delete(packRefs.PrunedRefs, git.ReferenceName(change.GetReferenceName())) } } - elm = elm.Next() + return nil + }); err != nil { + return nil, err } var prunedRefs [][]byte @@ -1989,8 +1988,7 @@ func (mgr *TransactionManager) appendLogEntry(nextLSN LSN, logEntry *gitalypb.Lo mgr.appendedLSN = nextLSN mgr.snapshotLocks[nextLSN] = &snapshotLock{applied: make(chan struct{})} mgr.committedEntries.PushBack(&committedEntry{ - lsn: nextLSN, - entry: logEntry, + lsn: nextLSN, }) mgr.mutex.Unlock() @@ -2056,12 +2054,23 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, lsn LSN) error return fmt.Errorf("set applied LSN: %w", err) } - if err := mgr.deleteLogEntry(lsn); err != nil { - return fmt.Errorf("deleting log entry: %w", err) - } - mgr.appliedLSN = lsn + // When this log entry is applied, if any log entry in front of it is still referenced, we cannot delete + // it. This condition is to prevent a "hole" in the list. A transaction referring to a log entry at the + // low-water mark might scan all subsequent log entries. + // + // ┌─ Can be removed ─┐ ┌─ Cannot be removed + // □ □ □ □ □ □ □ □ □ □ ■ ■ ⧅ ⧅ ⧅ ⧅ ⧅ ⧅ ■ ■ ⧅ ⧅ ⧅ ⧅ ■ + // └─ Low-water mark, still referenced by another transaction + // + // Eventually, log entries at the low-water mark are removed when the last referring transaction finishes.
+ if lsn < mgr.lowWaterMark() { + if err := mgr.deleteLogEntry(lsn); err != nil { + return fmt.Errorf("deleting log entry: %w", err) + } + } + // There is no awaiter for a transaction if the transaction manager is recovering // transactions from the log after starting up. if resultChan, ok := mgr.awaitingTransactions[lsn]; ok { @@ -2535,6 +2544,19 @@ func (mgr *TransactionManager) deleteKey(key []byte) error { }) } +// lowWaterMark returns the earliest LSN of log entries which should be kept in the database. Any log entries LESS than +// this mark are removed. +func (mgr *TransactionManager) lowWaterMark() LSN { + mgr.mutex.Lock() + defer mgr.mutex.Unlock() + + elm := mgr.committedEntries.Front() + if elm == nil { + return mgr.appliedLSN + 1 + } + return elm.Value.(*committedEntry).lsn +} + // updateCommittedEntry updates the reader counter of the committed entry of the snapshot that this transaction depends on. func (mgr *TransactionManager) updateCommittedEntry(snapshotLSN LSN) (*committedEntry, error) { // Since the goroutine doing this is holding the lock, the snapshotLSN shouldn't change and no new transactions @@ -2549,11 +2571,6 @@ func (mgr *TransactionManager) updateCommittedEntry(snapshotLSN LSN) (*committed entry := &committedEntry{ lsn: snapshotLSN, snapshotReaders: 1, - // The log entry is left nil. This doesn't matter as the conflict checking only - // needs it when checking for conflicts with transactions committed after we took - // our snapshot. - // - // This `committedEntry` only exists to record the `snapshotReaders` at this LSN. } mgr.committedEntries.PushBack(entry) @@ -2561,9 +2578,34 @@ func (mgr *TransactionManager) updateCommittedEntry(snapshotLSN LSN) (*committed return entry, nil } +// walkCommittedEntries walks all committed entries after the input transaction's snapshot LSN. It loads the content +// of each entry from disk and invokes the callback with it. The caller must hold mgr.mutex.
+func (mgr *TransactionManager) walkCommittedEntries(transaction *Transaction, callback func(*gitalypb.LogEntry) error) error { + elm := mgr.committedEntries.Front() + for elm != nil { + committed := elm.Value.(*committedEntry) + if committed.lsn > transaction.snapshotLSN { + entry, err := mgr.readLogEntry(committed.lsn) + if err != nil { + return fmt.Errorf("%w: LSN %d: %w", errCommittedEntryGone, committed.lsn, err) + } + // Transaction manager works on the partition level, including a repository and all of its pool + // member repositories (if any). We need to filter log entries of the repository this + // transaction targets. + if entry.RelativePath == transaction.relativePath { + if err := callback(entry); err != nil { + return err + } + } + } + elm = elm.Next() + } + return nil +} + // cleanCommittedEntry reduces the snapshot readers counter of the committed entry. It also removes entries with no more // readers at the head of the list. -func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) { +func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) error { entry.snapshotReaders-- elm := mgr.committedEntries.Front() @@ -2573,11 +2615,32 @@ func (mgr *TransactionManager) cleanCommittedEntry(entry *committedEntry) { // If the first entry had still some snapshot readers, that means // our transaction was not the oldest reader. We can't remove any entries // as they'll still be needed for conlict checking the older transactions. - return + return nil } + mgr.committedEntries.Remove(elm) + // When a transaction keeping references to the frontmost entry finishes, it's possible the transaction + // could be committed or canceled. If the transaction is committed, the manager blocks it until the + // referenced log entry is applied. If the transaction is canceled, the referenced log entry might not be + // applied. We cannot clean them up until the manager applies them.
+ if front.lsn <= mgr.appliedLSN { + select { + // If the manager is closing or completely closed, the DB is probably in a non-ready state. It's + // likely a result of a rare race condition. There isn't anything we can do about it now. In the + // future, we'll need to add a simple GC task to the manager to get rid of log entries <= + // appliedLSN at startup. That work is tracked in + // https://gitlab.com/gitlab-org/gitaly/-/issues/5798. + case <-mgr.closed: + case <-mgr.closing: + default: + if err := mgr.deleteLogEntry(front.lsn); err != nil { + return fmt.Errorf("removing log entry when not referred anymore: %w", err) + } + } + } elm = mgr.committedEntries.Front() } + return nil } // keyAppliedLSN returns the database key storing a partition's last applied log entry's LSN. diff --git a/internal/gitaly/storage/storagemgr/transaction_manager_test.go b/internal/gitaly/storage/storagemgr/transaction_manager_test.go index 2e5e2b8b894cfa8be26e049906d5ad41f9c8f582..9fda100f974d580c044233f4e940dec3e744e534 100644 --- a/internal/gitaly/storage/storagemgr/transaction_manager_test.go +++ b/internal/gitaly/storage/storagemgr/transaction_manager_test.go @@ -1543,8 +1543,14 @@ func generateCommonTests(t *testing.T, ctx context.Context, setup testTransactio } } +type expectedCommittedEntry struct { + lsn LSN + snapshotReaders int + entry *gitalypb.LogEntry +} + func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []transactionTestCase { - assertCommittedEntries := func(t *testing.T, expected []*committedEntry, actualList *list.List) { + assertCommittedEntries := func(t *testing.T, manager *TransactionManager, expected []*expectedCommittedEntry, actualList *list.List) { require.Equal(t, len(expected), actualList.Len()) i := 0 @@ -1552,7 +1558,13 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t actual := elm.Value.(*committedEntry) require.Equal(t, expected[i].lsn, actual.lsn) require.Equal(t, 
expected[i].snapshotReaders, actual.snapshotReaders) - testhelper.ProtoEqual(t, expected[i].entry, actual.entry) + + if expected[i].entry != nil { + actualEntry, err := manager.readLogEntry(actual.lsn) + require.NoError(t, err) + + testhelper.ProtoEqual(t, expected[i].entry, actualEntry) + } i++ } } @@ -1579,7 +1591,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t steps: steps{ StartManager{}, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries) + assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries) }), }, }, @@ -1592,7 +1604,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t RelativePath: setup.RelativePath, }, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{ + assertCommittedEntries(t, tm, []*expectedCommittedEntry{ { lsn: 0, snapshotReaders: 1, @@ -1606,7 +1618,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, }, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries) + assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries) }), Begin{ TransactionID: 2, @@ -1614,7 +1626,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t ExpectedSnapshotLSN: 1, }, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{ + assertCommittedEntries(t, tm, []*expectedCommittedEntry{ { lsn: 1, snapshotReaders: 1, @@ -1628,7 +1640,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, }, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{}, 
tm.committedEntries) + assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries) }), }, expectedState: StateAssertion{ @@ -1673,7 +1685,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t ExpectedSnapshotLSN: 1, }, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{ + assertCommittedEntries(t, tm, []*expectedCommittedEntry{ { lsn: 1, snapshotReaders: 2, @@ -1687,7 +1699,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, }, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{ + assertCommittedEntries(t, tm, []*expectedCommittedEntry{ { lsn: 1, snapshotReaders: 1, @@ -1704,7 +1716,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t ExpectedSnapshotLSN: 2, }, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{ + assertCommittedEntries(t, tm, []*expectedCommittedEntry{ { lsn: 1, snapshotReaders: 1, @@ -1723,7 +1735,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t }, }, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{ + assertCommittedEntries(t, tm, []*expectedCommittedEntry{ { lsn: 2, entry: refChangeLogEntry("refs/heads/branch-1", setup.Commits.First.OID), @@ -1739,7 +1751,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t TransactionID: 4, }, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries) + assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries) }), }, expectedState: StateAssertion{ @@ -1773,7 +1785,7 @@ func generateCommittedEntriesTests(t 
*testing.T, setup testTransactionSetup) []t TransactionID: 1, }, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries) + assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries) }), Begin{ TransactionID: 2, @@ -1784,7 +1796,7 @@ func generateCommittedEntriesTests(t *testing.T, setup testTransactionSetup) []t TransactionID: 2, }, AdhocAssertion(func(t *testing.T, ctx context.Context, tm *TransactionManager) { - assertCommittedEntries(t, []*committedEntry{}, tm.committedEntries) + assertCommittedEntries(t, tm, []*expectedCommittedEntry{}, tm.committedEntries) }), }, expectedState: StateAssertion{