From ef1ffee64b7801e2ff3cda8bfba15b517ab8f1bb Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Tue, 11 Apr 2023 17:44:04 +0300 Subject: [PATCH 1/4] Track open transactions in TransactionManager The TransactionManager needs to keep track of open transactions in order to avoid removing data they still need. Given sequence: - *latest hooks are from index 1* - Begin TX2 - Begin TX3 - Commit TX3 storing new hooks The TransactionManager can't prune hooks from index 1 before TX2 has finished as TX2 may still read them. Once all transactions that may be reading the old version of hooks have finished, the TransactionManager should prune the old version of the hooks to ensure we don't keep filling up the disk. This commit adds a list to keep track of open transactions. This list can later be used to synchronize with open transactions and waiting until all open transactions using the data have finished prior to removing it. As Begin is now registering the transactions and thus also writing to the TransactionManager's state from a different goroutine than Run(), the mutex is changed to a normal mutex from RWMutex. --- internal/gitaly/transaction_manager.go | 74 ++++++++++++++------- internal/gitaly/transaction_manager_test.go | 1 + 2 files changed, 52 insertions(+), 23 deletions(-) diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 6a7f4f1436..3c67e3631f 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -2,6 +2,7 @@ package gitaly import ( "bytes" + "container/list" "context" "encoding/binary" "errors" @@ -133,6 +134,13 @@ type Transaction struct { // from the client goroutine to the TransactionManager.Run() goroutine, and the client goroutine must // not do any modifications to the state of the transcation anymore to avoid races. admitted bool + // finish cleans up the transaction releasing the resources associated with it. It must be called + // once the transaction is done with. + finish func() error + // finished is closed when the transaction has been finished. This enables waiting on transactions + // to finish where needed. + finished chan struct{} + // initStagingDirectory is called to lazily initialize the staging directory when it is // needed. initStagingDirectory func() error @@ -163,7 +171,7 @@ type Transaction struct { // // The returned Transaction's read snapshot includes all writes that were committed prior to the // Begin call. Begin blocks until the committed writes have been applied to the repository. -func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) { +func (mgr *TransactionManager) Begin(ctx context.Context) (_ *Transaction, returnedErr error) { // Wait until the manager has been initialized so the notification channels // and the log indexes are loaded. select { @@ -172,7 +180,7 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) case <-mgr.initialized: } - mgr.mutex.RLock() + mgr.mutex.Lock() txn := &Transaction{ commit: mgr.commit, finalize: mgr.transactionFinalizer, @@ -180,10 +188,13 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) ReadIndex: mgr.appendedLogIndex, HookIndex: mgr.hookIndex, }, + finished: make(chan struct{}), } + openTransactionElement := mgr.openTransactions.PushBack(txn) + readReady := mgr.applyNotifications[txn.snapshot.ReadIndex] - mgr.mutex.RUnlock() + mgr.mutex.Unlock() if readReady == nil { // The snapshot log entry is already applied if there is no notification channel for it. // If so, the transaction is ready to begin immediately. @@ -201,6 +212,30 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) return nil } + txn.finish = func() error { + defer close(txn.finished) + + mgr.mutex.Lock() + mgr.openTransactions.Remove(openTransactionElement) + mgr.mutex.Unlock() + + if txn.stagingDirectory != "" { + if err := os.RemoveAll(txn.stagingDirectory); err != nil { + return fmt.Errorf("remove staging directory: %w", err) + } + } + + return nil + } + + defer func() { + if returnedErr != nil { + // finish can't return an error as the staging directory isn't yet created, and won't + // be attempted to be deleted. + _ = txn.finish() + } + }() + select { case <-ctx.Done(): return nil, ctx.Err() @@ -217,7 +252,7 @@ func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) { defer func() { txn.finalize() - if err := txn.cleanUnadmitted(); err != nil && returnedErr == nil { + if err := txn.finishUnadmitted(); err != nil && returnedErr == nil { returnedErr = err } }() @@ -228,29 +263,18 @@ func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) { // Rollback releases resources associated with the transaction without performing any changes. func (txn *Transaction) Rollback() error { defer txn.finalize() - return txn.cleanUnadmitted() + return txn.finishUnadmitted() } -// cleanUnadmitted cleans up after the transaction if it wasn't yet admitted. If the transaction was admitted, +// finishUnadmitted cleans up after the transaction if it wasn't yet admitted. If the transaction was admitted, // the Transaction is being processed by TransactionManager. The clean up responsibility moves there as well // to avoid races. -func (txn *Transaction) cleanUnadmitted() error { +func (txn *Transaction) finishUnadmitted() error { if txn.admitted { return nil } - return txn.clean() -} - -// clean cleans up the resources associated with the transaction. -func (txn *Transaction) clean() error { - if txn.stagingDirectory != "" { - if err := os.RemoveAll(txn.stagingDirectory); err != nil { - return fmt.Errorf("remove staging directory: %w", err) - } - } - - return nil + return txn.finish() } // Snapshot returns the details of the Transaction's read snapshot. @@ -381,13 +405,16 @@ type TransactionManager struct { // admissionQueue is where the incoming writes are waiting to be admitted to the transaction // manager. admissionQueue chan *Transaction + // openTransactions contains all transactions that have been begun but not yet committed or rolled back. + // The transactions are ordered from the oldest to the newest. + openTransactions *list.List // initialized is closed when the manager has been initialized. It's used to block new transactions // from beginning prior to the manager having initialized its runtime state on start up. initialized chan struct{} // mutex guards access to applyNotifications and appendedLogIndex. These fields are accessed by both // Run and Begin which are ran in different goroutines. - mutex sync.RWMutex + mutex sync.Mutex // applyNotifications stores channels that are closed when a log entry is applied. These // are used to block transactions from beginning before their snapshot is ready. applyNotifications map[LogIndex]chan struct{} @@ -426,6 +453,7 @@ func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir relativePath: relativePath, db: newDatabaseAdapter(db), admissionQueue: make(chan *Transaction), + openTransactions: list.New(), initialized: make(chan struct{}), applyNotifications: make(map[LogIndex]chan struct{}), stagingDirectory: stagingDir, @@ -630,10 +658,10 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { var transaction *Transaction select { case transaction = <-mgr.admissionQueue: - // The Transaction does not clean up itself anymore once it has been admitted for + // The Transaction does not finish itself anymore once it has been admitted for // processing. This avoids the Transaction concurrently removing the staged state - // while the manager is still operating on it. We thus need to defer its clean up. - cleanUps = append(cleanUps, transaction.clean) + // while the manager is still operating on it. We thus need to defer its finishing. + cleanUps = append(cleanUps, transaction.finish) case <-mgr.ctx.Done(): } diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index c2c68f76ea..d67b269ed5 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -2664,6 +2664,7 @@ func checkManagerError(t *testing.T, managerErrChannel chan error, mgr *Transact referenceUpdates: ReferenceUpdates{"sentinel": {}}, result: make(chan error, 1), finalize: func() {}, + finish: func() error { return nil }, } var ( -- GitLab From e4979c119f91dc0ec4a1860609ddc284fa97f56a Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Sat, 15 Apr 2023 13:53:45 +0300 Subject: [PATCH 2/4] Write-ahead log repository deletions Repository deletions need to be write-ahead logged as well. This ensures their atomicity and that they can be replicated later as part of the log. Once a repository deletion has been commited, all subsequent Begin and Commit calls will fail with a 'repository not found' error. The repository is logically deleted but not yet physically. The physical deletion needs to wait for open transactions to finish so we don't remove the files they are operating on. For now, it's possible to set other updates in the Transaction even if it ultimately removes the repository. Technically this is fine but it's a bit non-sensical and we don't have a use case for it in Gitaly. We'll probably later improve the interface by splitting out different transaction types so UpdateReferences() can't be called on the same transaction that deletes the repository. --- internal/gitaly/transaction_manager.go | 193 ++++++++++-- internal/gitaly/transaction_manager_test.go | 314 ++++++++++++++++++-- proto/go/gitalypb/log.pb.go | 125 ++++++-- proto/log.proto | 6 + 4 files changed, 568 insertions(+), 70 deletions(-) diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 3c67e3631f..b4476b13de 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -24,11 +24,15 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" ) +// ErrRepositoryNotFound is returned when the repository doesn't exist. +var ErrRepositoryNotFound = structerr.NewNotFound("repository not found") + // ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions. var ErrTransactionProcessingStopped = errors.New("transaction processing stopped") @@ -164,6 +168,7 @@ type Transaction struct { referenceUpdates ReferenceUpdates defaultBranchUpdate *DefaultBranchUpdate customHooksUpdate *CustomHooksUpdate + deleteRepository bool } // Begin opens a new transaction. The caller must call either Commit or Rollback to release @@ -181,6 +186,11 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (_ *Transaction, retur } mgr.mutex.Lock() + if !mgr.repositoryExists { + mgr.mutex.Unlock() + return nil, ErrRepositoryNotFound + } + txn := &Transaction{ commit: mgr.commit, finalize: mgr.transactionFinalizer, @@ -299,6 +309,11 @@ func (txn *Transaction) UpdateReferences(updates ReferenceUpdates) { txn.referenceUpdates = updates } +// DeleteRepository deletes the repository when the transaction is committed. +func (txn *Transaction) DeleteRepository() { + txn.deleteRepository = true +} + // QuarantineDirectory returns an absolute path to the transaction's quarantine directory. The quarantine directory // is a Git object directory where the new objects introduced in the transaction must be written. The quarantined // objects needed by the updated reference tips will be included in the transaction. @@ -394,6 +409,9 @@ type TransactionManager struct { // Gitaly starts. stagingDirectory string + // repositoryExists marks whether the repository exists or not. The repository may not exist if it has + // never been created, or if it has been deleted. + repositoryExists bool // repository is the repository this TransactionManager is acting on. repository repository // repositoryPath is the path to the repository this TransactionManager is acting on. @@ -672,6 +690,10 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { } transaction.result <- func() (commitErr error) { + if !mgr.repositoryExists { + return ErrRepositoryNotFound + } + logEntry := &gitalypb.LogEntry{} var err error @@ -695,8 +717,8 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { CustomHooksTar: transaction.customHooksUpdate.CustomHooksTAR, } } - nextLogIndex := mgr.appendedLogIndex + 1 + nextLogIndex := mgr.appendedLogIndex + 1 if transaction.includesPack { logEntry.IncludesPack = true @@ -719,6 +741,10 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { } } + if transaction.deleteRepository { + logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} + } + return mgr.appendLogEntry(nextLogIndex, logEntry) }() @@ -733,10 +759,6 @@ func (mgr *TransactionManager) Stop() { mgr.stop() } func (mgr *TransactionManager) initialize(ctx context.Context) error { defer close(mgr.initialized) - if err := mgr.createDirectories(); err != nil { - return fmt.Errorf("create directories: %w", err) - } - var appliedLogIndex gitalypb.LogIndex if err := mgr.readKey(keyAppliedLogIndex(mgr.relativePath), &appliedLogIndex); err != nil && !errors.Is(err, badger.ErrKeyNotFound) { return fmt.Errorf("read applied log index: %w", err) @@ -770,6 +792,10 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { return fmt.Errorf("determine appended log index: %w", err) } + if err := mgr.determineRepositoryExistence(); err != nil { + return fmt.Errorf("determine repository existence: %w", err) + } + var err error mgr.hookIndex, err = mgr.determineHookIndex(ctx, mgr.appendedLogIndex, mgr.appliedLogIndex) if err != nil { @@ -789,6 +815,46 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { return nil } +// determineRepositoryExistence determines whether the repository exists or not by looking +// at whether the directory exists and whether there is a deletion request logged. +func (mgr *TransactionManager) determineRepositoryExistence() error { + stat, err := os.Stat(mgr.repositoryPath) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("stat repository directory: %w", err) + } + } + + if stat != nil { + if !stat.IsDir() { + return fmt.Errorf("repository's path didn't point to a directory") + } + + if err := mgr.createDirectories(); err != nil { + return fmt.Errorf("create directories: %w", err) + } + + mgr.repositoryExists = true + } + + // Check whether the last log entry is a repository deletion. If so, + // the repository has been deleted but the deletion wasn't yet applied. + // The deletion is the last entry always as no further writes are + // accepted if the repository doesn't exist. + if mgr.appliedLogIndex < mgr.appendedLogIndex { + logEntry, err := mgr.readLogEntry(mgr.appendedLogIndex) + if err != nil { + return fmt.Errorf("read log entry: %w", err) + } + + if logEntry.RepositoryDeletion != nil { + mgr.repositoryExists = false + } + } + + return nil +} + // determineHookIndex determines the latest hooks in the repository. // // 1. First we iterate through the unapplied log in reverse order. The first log entry that @@ -798,6 +864,11 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { // to see which are the latest. // 3. If we found no hooks in the log nor in the repository, there are no hooks configured. func (mgr *TransactionManager) determineHookIndex(ctx context.Context, appendedIndex, appliedIndex LogIndex) (LogIndex, error) { + if !mgr.repositoryExists { + // If the repository doesn't exist, then there are no hooks either. + return 0, nil + } + for i := appendedIndex; appliedIndex < i; i-- { logEntry, err := mgr.readLogEntry(i) if err != nil { @@ -1072,6 +1143,10 @@ func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *g mgr.hookIndex = nextLogIndex } mgr.applyNotifications[nextLogIndex] = make(chan struct{}) + if logEntry.RepositoryDeletion != nil { + mgr.repositoryExists = false + mgr.hookIndex = 0 + } mgr.mutex.Unlock() return nil @@ -1084,27 +1159,36 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return fmt.Errorf("read log entry: %w", err) } - if logEntry.IncludesPack { - if err := mgr.applyPackFile(ctx, logIndex); err != nil { - return fmt.Errorf("apply pack file: %w", err) + if logEntry.RepositoryDeletion != nil { + // If the repository is being deleted, just delete it without any other changes given + // they'd all be removed anyway. Reapplying the other changes after a crash would also + // not work if the repository was successfully deleted before the crash. + if err := mgr.applyRepositoryDeletion(ctx, logIndex); err != nil { + return fmt.Errorf("apply repository deletion: %w", err) + } + } else { + if logEntry.IncludesPack { + if err := mgr.applyPackFile(ctx, logIndex); err != nil { + return fmt.Errorf("apply pack file: %w", err) + } } - } - updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates, mgr.repository) - if err != nil { - return fmt.Errorf("perpare reference transaction: %w", err) - } + updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates, mgr.repository) + if err != nil { + return fmt.Errorf("prepare reference transaction: %w", err) + } - if err := updater.Commit(); err != nil { - return fmt.Errorf("commit transaction: %w", err) - } + if err := updater.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } - if err := mgr.updateDefaultBranch(ctx, logEntry.DefaultBranchUpdate); err != nil { - return fmt.Errorf("writing default branch: %w", err) - } + if err := mgr.updateDefaultBranch(ctx, logEntry.DefaultBranchUpdate); err != nil { + return fmt.Errorf("writing default branch: %w", err) + } - if err := mgr.applyCustomHooks(ctx, logIndex, logEntry.CustomHooksUpdate); err != nil { - return fmt.Errorf("apply custom hooks: %w", err) + if err := mgr.applyCustomHooks(ctx, logIndex, logEntry.CustomHooksUpdate); err != nil { + return fmt.Errorf("apply custom hooks: %w", err) + } } if err := mgr.storeAppliedLogIndex(logIndex); err != nil { @@ -1132,6 +1216,73 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return nil } +// applyRepositoryDeletion deletes the repository. +// +// Given how the repositories are laid out in the storage, we currently can't support MVCC for them. +// This is because there is only ever a single instance of a given repository. We have to wait for all +// of the readers to finish before we can delete the repository as otherwise the readers could fail in +// unexpected ways and it would be an isolation violation. Repository deletions thus block before all +// transaction with an older read snapshot are done with the repository. +func (mgr *TransactionManager) applyRepositoryDeletion(ctx context.Context, index LogIndex) error { + for { + mgr.mutex.Lock() + oldestElement := mgr.openTransactions.Front() + mgr.mutex.Unlock() + if oldestElement == nil { + // If there are no open transactions, the deletion can proceed as there are + // no readers. + // + // Any new transaction would have the deletion in their snapshot, and are waiting + // for it to be applied prior to beginning. + break + } + + oldestTransaction := oldestElement.Value.(*Transaction) + if oldestTransaction.snapshot.ReadIndex >= index { + // If the oldest transaction is reading at this or later log index, it already has the deletion + // in its snapshot, and is waiting for it to be applied. Proceed with the deletion as there + // are no readers with the pre-deletion state in the snapshot. + break + } + + for { + select { + case <-oldestTransaction.finished: + // The oldest transaction finished. Proceed to check the second oldest open transaction. + case transaction := <-mgr.admissionQueue: + // The oldest transaction could also be waiting to commit. Since the Run goroutine is + // blocked here waiting for the transaction to finish, the write would never be admitted + // for processing, leading to a deadlock. Since the repository was deleted, the only correct + // outcome for the transaction would be to receive a not found error. Admit the transaction, + // and finish it with the correct result so we can unblock the deletion. + transaction.result <- ErrRepositoryNotFound + if err := transaction.finish(); err != nil { + return fmt.Errorf("finish transaction: %w", err) + } + + continue + case <-ctx.Done(): + } + + if err := ctx.Err(); err != nil { + return err + } + + break + } + } + + if err := os.RemoveAll(mgr.repositoryPath); err != nil { + return fmt.Errorf("remove repository: %w", err) + } + + if err := safe.NewSyncer().Sync(filepath.Dir(mgr.repositoryPath)); err != nil { + return fmt.Errorf("sync: %w", err) + } + + return nil +} + // applyPackFile unpacks the objects from the pack file into the repository if the log entry // has an associated pack file. func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex) error { diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index d67b269ed5..f5d422da16 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -293,6 +293,8 @@ func TestTransactionManager(t *testing.T) { DefaultBranchUpdate *DefaultBranchUpdate // CustomHooksUpdate is the custom hooks update to commit. CustomHooksUpdate *CustomHooksUpdate + // DeleteRepository deletes the repository on commit. + DeleteRepository bool } // Rollback calls Rollback on a transaction. @@ -307,8 +309,15 @@ func TestTransactionManager(t *testing.T) { ExpectedObjects []git.ObjectID } + // RemoveRepository removes the repository from the disk. It must be run with the TransactionManager + // stopped. + type RemoveRepository struct{} + // StateAssertions models an assertion of the entire state managed by the TransactionManager. type StateAssertion struct { + // RepositoryDoesntExist indicates the repository should not exist. If so, we just check that's + // the case and don't assert the other state. + RepositoryDoesntExist bool // DefaultBranch is the expected refname that HEAD points to. DefaultBranch git.ReferenceName // References is the expected state of references. @@ -2327,6 +2336,257 @@ func TestTransactionManager(t *testing.T) { }, }, }, + { + desc: "begin fails after repository deletion", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + Begin{ + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "repository deletion fails if repository is deleted", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + Commit{ + TransactionID: 2, + DeleteRepository: true, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "custom hooks update fails if repository is deleted", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + Commit{ + TransactionID: 2, + CustomHooksUpdate: &CustomHooksUpdate{validCustomHooks(t)}, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "reference updates fail if repository is deleted", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "default branch update fails if repository is deleted", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + Commit{ + TransactionID: 2, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/new-default", + }, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "logged repository deletions are considered after restart", + steps: steps{ + StartManager{ + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookContext) { + panic(errSimulatedCrash) + }, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + AssertManager{ + ExpectedError: errSimulatedCrash, + }, + StartManager{}, + Begin{ + TransactionID: 2, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "reapplying repository deletion works", + steps: steps{ + StartManager{ + Hooks: testHooks{ + BeforeStoreAppliedLogIndex: func(hookContext) { + panic(errSimulatedCrash) + }, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + AssertManager{ + ExpectedError: errSimulatedCrash, + }, + StartManager{}, + Begin{ + TransactionID: 2, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "non-existent repository is correctly handled", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + }, + }, + { + // This is a serialization violation as the outcome would be different + // if the transactions were applied in different order. + desc: "deletion succeeds with concurrent writes to repository", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 1, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/branch", + }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), + }, + }, + Commit{ + TransactionID: 2, + DeleteRepository: true, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(2).toProto(), + }, + }, + }, } type invalidReferenceTestCase struct { @@ -2599,6 +2859,10 @@ func TestTransactionManager(t *testing.T) { } + if step.DeleteRepository { + transaction.DeleteRepository() + } + commitCtx := ctx if step.Context != nil { commitCtx = step.Context @@ -2611,6 +2875,8 @@ func TestTransactionManager(t *testing.T) { case Prune: gittest.Exec(t, setup.Config, "-C", repoPath, "prune") require.ElementsMatch(t, step.ExpectedObjects, gittest.ListObjects(t, setup.Config, repoPath)) + case RemoveRepository: + require.NoError(t, os.RemoveAll(repoPath)) default: t.Fatalf("unhandled step type: %T", step) } @@ -2621,34 +2887,40 @@ func TestTransactionManager(t *testing.T) { require.NoError(t, err) } - RequireReferences(t, ctx, repo, tc.expectedState.References) - RequireDefaultBranch(t, ctx, repo, tc.expectedState.DefaultBranch) RequireDatabase(t, ctx, database, tc.expectedState.Database) - expectedDirectory := tc.expectedState.Directory - if expectedDirectory == nil { - // Set the base state as the default so we don't have to repeat it in every test case but it - // gets asserted. - expectedDirectory = testhelper.DirectoryState{ - "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, - "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, - "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + if !tc.expectedState.RepositoryDoesntExist { + require.DirExists(t, repoPath) + RequireReferences(t, ctx, repo, tc.expectedState.References) + RequireDefaultBranch(t, ctx, repo, tc.expectedState.DefaultBranch) + + expectedDirectory := tc.expectedState.Directory + if expectedDirectory == nil { + // Set the base state as the default so we don't have to repeat it in every test case but it + // gets asserted. + expectedDirectory = testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + } } - } - testhelper.RequireDirectoryState(t, repoPath, "wal", expectedDirectory) + testhelper.RequireDirectoryState(t, repoPath, "wal", expectedDirectory) - expectedObjects := tc.expectedState.Objects - if expectedObjects == nil { - expectedObjects = []git.ObjectID{ - setup.ObjectHash.EmptyTreeOID, - setup.Commits.First.OID, - setup.Commits.Second.OID, - setup.Commits.Third.OID, + expectedObjects := tc.expectedState.Objects + if expectedObjects == nil { + expectedObjects = []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + setup.Commits.Second.OID, + setup.Commits.Third.OID, + } } - } - require.ElementsMatch(t, expectedObjects, gittest.ListObjects(t, setup.Config, repoPath)) + require.ElementsMatch(t, expectedObjects, gittest.ListObjects(t, setup.Config, repoPath)) + } else { + require.NoDirExists(t, repoPath) + } entries, err := os.ReadDir(stagingDir) require.NoError(t, err) diff --git a/proto/go/gitalypb/log.pb.go b/proto/go/gitalypb/log.pb.go index 73fb101488..3ed22d2827 100644 --- a/proto/go/gitalypb/log.pb.go +++ b/proto/go/gitalypb/log.pb.go @@ -41,6 +41,8 @@ type LogEntry struct { // includes_pack denotes whether this log entry has a pack file associated // with it. The pack files are stored separately on the filesystem. IncludesPack bool `protobuf:"varint,4,opt,name=includes_pack,json=includesPack,proto3" json:"includes_pack,omitempty"` + // RepositoryDeletion, when set, indicates this log entry deletes the repository. + RepositoryDeletion *LogEntry_RepositoryDeletion `protobuf:"bytes,5,opt,name=repository_deletion,json=repositoryDeletion,proto3" json:"repository_deletion,omitempty"` } func (x *LogEntry) Reset() { @@ -103,6 +105,13 @@ func (x *LogEntry) GetIncludesPack() bool { return false } +func (x *LogEntry) GetRepositoryDeletion() *LogEntry_RepositoryDeletion { + if x != nil { + return x.RepositoryDeletion + } + return nil +} + // LogIndex serializes a log index. It's used for storing a repository's // applied log index in the database. // @@ -318,11 +327,50 @@ func (x *LogEntry_CustomHooksUpdate) GetCustomHooksTar() []byte { return nil } +// RepositoryDeletion models a repository deletion. +type LogEntry_RepositoryDeletion struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *LogEntry_RepositoryDeletion) Reset() { + *x = LogEntry_RepositoryDeletion{} + if protoimpl.UnsafeEnabled { + mi := &file_log_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LogEntry_RepositoryDeletion) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogEntry_RepositoryDeletion) ProtoMessage() {} + +func (x *LogEntry_RepositoryDeletion) ProtoReflect() protoreflect.Message { + mi := &file_log_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogEntry_RepositoryDeletion.ProtoReflect.Descriptor instead. +func (*LogEntry_RepositoryDeletion) Descriptor() ([]byte, []int) { + return file_log_proto_rawDescGZIP(), []int{0, 3} +} + var File_log_proto protoreflect.FileDescriptor var file_log_proto_rawDesc = []byte{ 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x22, 0xfc, 0x03, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x61, 0x6c, 0x79, 0x22, 0xe8, 0x04, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x4d, 0x0a, 0x11, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, @@ -341,26 +389,33 @@ var file_log_proto_rawDesc = []byte{ 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x50, 0x61, - 0x63, 0x6b, 0x1a, 0x51, 0x0a, 0x0f, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, - 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, - 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, - 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, - 0x6e, 0x65, 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, - 0x65, 0x77, 0x4f, 0x69, 0x64, 0x1a, 0x3c, 0x0a, 0x13, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, - 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, - 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, - 0x61, 0x6d, 0x65, 0x1a, 0x3d, 0x0a, 0x11, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, - 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x63, 0x75, 0x73, 0x74, - 0x6f, 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x74, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x0e, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54, - 0x61, 0x72, 0x22, 0x27, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1b, - 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x34, 0x5a, 0x32, 0x67, - 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, - 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, - 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x63, 0x6b, 0x12, 0x54, 0x0a, 0x13, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, + 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, + 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x12, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, + 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x51, 0x0a, 0x0f, 0x52, 0x65, 0x66, 0x65, + 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, + 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, + 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x65, 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, 0x65, 0x77, 0x4f, 0x69, 0x64, 0x1a, 0x3c, 0x0a, 0x13, 0x44, + 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, + 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x1a, 0x3d, 0x0a, 0x11, 0x43, 0x75, 0x73, + 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x28, + 0x0a, 0x10, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x74, + 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, + 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54, 0x61, 0x72, 0x1a, 0x14, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, + 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x27, + 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x6f, + 0x67, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x6c, + 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, + 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -375,23 +430,25 @@ func file_log_proto_rawDescGZIP() []byte { return file_log_proto_rawDescData } -var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_log_proto_goTypes = []interface{}{ (*LogEntry)(nil), // 0: gitaly.LogEntry (*LogIndex)(nil), // 1: gitaly.LogIndex (*LogEntry_ReferenceUpdate)(nil), // 2: gitaly.LogEntry.ReferenceUpdate (*LogEntry_DefaultBranchUpdate)(nil), // 3: gitaly.LogEntry.DefaultBranchUpdate (*LogEntry_CustomHooksUpdate)(nil), // 4: gitaly.LogEntry.CustomHooksUpdate + (*LogEntry_RepositoryDeletion)(nil), // 5: gitaly.LogEntry.RepositoryDeletion } var file_log_proto_depIdxs = []int32{ 2, // 0: gitaly.LogEntry.reference_updates:type_name -> gitaly.LogEntry.ReferenceUpdate 3, // 1: gitaly.LogEntry.default_branch_update:type_name -> gitaly.LogEntry.DefaultBranchUpdate 4, // 2: gitaly.LogEntry.custom_hooks_update:type_name -> gitaly.LogEntry.CustomHooksUpdate - 3, // [3:3] is the sub-list for method output_type - 3, // [3:3] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 5, // 3: gitaly.LogEntry.repository_deletion:type_name -> gitaly.LogEntry.RepositoryDeletion + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_log_proto_init() } @@ -460,6 +517,18 @@ func file_log_proto_init() { return nil } } + file_log_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LogEntry_RepositoryDeletion); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -467,7 +536,7 @@ func file_log_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_log_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 6, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/log.proto b/proto/log.proto index 9ca6c32ac9..15b031c549 100644 --- a/proto/log.proto +++ b/proto/log.proto @@ -35,6 +35,10 @@ message LogEntry { bytes custom_hooks_tar = 1; } + // RepositoryDeletion models a repository deletion. + message RepositoryDeletion { + } + // reference_updates contains the reference updates this log // entry records. The logged reference updates have already passed // through verification and are applied without any further checks. @@ -47,6 +51,8 @@ message LogEntry { // includes_pack denotes whether this log entry has a pack file associated // with it. The pack files are stored separately on the filesystem. bool includes_pack = 4; + // RepositoryDeletion, when set, indicates this log entry deletes the repository. + RepositoryDeletion repository_deletion = 5; } // LogIndex serializes a log index. It's used for storing a repository's -- GitLab From 86c1b4ec4ac3170ade5eb17aaf5a2764156309ee Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Sun, 16 Apr 2023 15:47:11 +0300 Subject: [PATCH 3/4] Prevent transaction beginning if initialization fails Begin currently waits for the TransactionManager to initialize but it doesn't check whether the initialization was successful. As the transaction will not anyway succeed if the initialization failed, return an appropriate error from Begin instead. This way the transaction doesn't end up doing any useless work and the error message will be clearer than having a random failure at some point during the transaction. --- internal/gitaly/transaction_manager.go | 25 ++++++++++++++++----- internal/gitaly/transaction_manager_test.go | 24 ++++++++++++++++++++ 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index b4476b13de..ad8159ac17 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -30,11 +30,16 @@ import ( "google.golang.org/protobuf/proto" ) -// ErrRepositoryNotFound is returned when the repository doesn't exist. -var ErrRepositoryNotFound = structerr.NewNotFound("repository not found") - -// ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions. -var ErrTransactionProcessingStopped = errors.New("transaction processing stopped") +var ( + // ErrRepositoryNotFound is returned when the repository doesn't exist. + ErrRepositoryNotFound = structerr.NewNotFound("repository not found") + // ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions. + ErrTransactionProcessingStopped = errors.New("transaction processing stopped") + // errInitializationFailed is returned when the TransactionManager failed to initialize successfully. + errInitializationFailed = errors.New("initializing transaction processing failed") + // errNotDirectory is returned when the repository's path doesn't point to a directory + errNotDirectory = errors.New("repository's path didn't point to a directory") +) // InvalidReferenceFormatError is returned when a reference name was invalid. type InvalidReferenceFormatError struct { @@ -183,6 +188,9 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (_ *Transaction, retur case <-ctx.Done(): return nil, ctx.Err() case <-mgr.initialized: + if !mgr.initializationSuccessful { + return nil, errInitializationFailed + } } mgr.mutex.Lock() @@ -430,6 +438,9 @@ type TransactionManager struct { // initialized is closed when the manager has been initialized. It's used to block new transactions // from beginning prior to the manager having initialized its runtime state on start up. initialized chan struct{} + // initializationSuccessful is set if the TransactionManager initialized successfully. If it didn't, + // transactions will fail to begin. + initializationSuccessful bool // mutex guards access to applyNotifications and appendedLogIndex. These fields are accessed by both // Run and Begin which are ran in different goroutines. mutex sync.Mutex @@ -812,6 +823,8 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { return fmt.Errorf("remove stale packs: %w", err) } + mgr.initializationSuccessful = true + return nil } @@ -827,7 +840,7 @@ func (mgr *TransactionManager) determineRepositoryExistence() error { if stat != nil { if !stat.IsDir() { - return fmt.Errorf("repository's path didn't point to a directory") + return errNotDirectory } if err := mgr.createDirectories(); err != nil { diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index f5d422da16..4f5ad8ace2 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -2587,6 +2587,30 @@ func TestTransactionManager(t *testing.T) { }, }, }, + { + desc: "failing initialization prevents transaction beginning", + steps: steps{ + StartManager{ + ModifyRepository: func(tb testing.TB, repoPath string) { + // Remove the repository's directory and create a file in its place + // to fail the initialization. + require.NoError(t, os.RemoveAll(repoPath)) + require.NoError(t, os.WriteFile(repoPath, nil, fs.ModePerm)) + }, + ExpectedError: errNotDirectory, + }, + Begin{ + ExpectedError: errInitializationFailed, + }, + AssertManager{ + ExpectedError: errNotDirectory, + }, + }, + expectedState: StateAssertion{ + // The file still exists on the disk but this skips the repository assertions. + RepositoryDoesntExist: true, + }, + }, } type invalidReferenceTestCase struct { -- GitLab From d9438b2033f2494e8c7d3daf35f86abc07453a45 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Sun, 16 Apr 2023 16:23:22 +0300 Subject: [PATCH 4/4] Write-ahead log repository creations This commits implements write-ahead logging for repository creations. Repository creations need to be write-ahead logged as well. This is necessary for a few reasons: 1. The repository creations need to be serialized with other writes to avoid bad interactions from concurrency. Gitaly is already serializing them by using lock files on the disk. However, as the TransactionManager is already serializing writes, it makes sense to converge on one approach. This way reads also don't go through before the repository is fully created, thus making creations atomic. 2. The creations need to be logged so they can be backed up. This allows for the repository to be immediately backed up with WAL archival directly from creation. 3. Logging the creations ensures they'll be replicated eventually as part of the log when Raft comes around. --- internal/gitaly/partition_manager.go | 8 +- internal/gitaly/partition_manager_test.go | 2 +- internal/gitaly/transaction_manager.go | 147 ++++++++++++- internal/gitaly/transaction_manager_test.go | 232 +++++++++++++++++++- proto/go/gitalypb/log.pb.go | 190 +++++++++++----- proto/log.proto | 10 + 6 files changed, 521 insertions(+), 68 deletions(-) diff --git a/internal/gitaly/partition_manager.go b/internal/gitaly/partition_manager.go index 408d5f4a32..83f9c9ec4b 100644 --- a/internal/gitaly/partition_manager.go +++ b/internal/gitaly/partition_manager.go @@ -9,6 +9,7 @@ import ( "github.com/dgraph-io/badger/v3" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" repo "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" @@ -31,6 +32,8 @@ type PartitionManager struct { partitions map[string]*partition // localRepoFactory is used by PartitionManager to construct `localrepo.Repo`. localRepoFactory localrepo.Factory + // commandFactory is passed as a dependency to the constructed TransactionManagers. + commandFactory git.CommandFactory // logger handles all logging for PartitionManager. logger logrus.FieldLogger // stopped tracks whether the PartitionManager has been stopped. If the manager is stopped, @@ -61,7 +64,7 @@ type partition struct { } // NewPartitionManager returns a new PartitionManager. -func NewPartitionManager(db *badger.DB, storages []config.Storage, localRepoFactory localrepo.Factory, logger logrus.FieldLogger, stagingDir string) *PartitionManager { +func NewPartitionManager(db *badger.DB, storages []config.Storage, localRepoFactory localrepo.Factory, cmdFactory git.CommandFactory, logger logrus.FieldLogger, stagingDir string) *PartitionManager { storagesMap := make(map[string]string, len(storages)) for _, storage := range storages { storagesMap[storage.Name] = storage.Path @@ -71,6 +74,7 @@ func NewPartitionManager(db *badger.DB, storages []config.Storage, localRepoFact db: db, partitions: make(map[string]*partition), localRepoFactory: localRepoFactory, + commandFactory: cmdFactory, logger: logger, stagingDirectory: stagingDir, storages: storagesMap, @@ -123,7 +127,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran return nil, fmt.Errorf("scope by storage: %w", err) } - mgr := NewTransactionManager(pm.db, storagePath, relativePath, stagingDir, storageScopedFactory, pm.transactionFinalizerFactory(ptn)) + mgr := NewTransactionManager(pm.db, storagePath, relativePath, stagingDir, storageScopedFactory, pm.commandFactory, pm.transactionFinalizerFactory(ptn)) ptn.transactionManager = mgr pm.partitions[partitionKey] = ptn diff --git a/internal/gitaly/partition_manager_test.go b/internal/gitaly/partition_manager_test.go index aeb26f7e30..39e3e14f35 100644 --- a/internal/gitaly/partition_manager_test.go +++ b/internal/gitaly/partition_manager_test.go @@ -433,7 +433,7 @@ func TestPartitionManager(t *testing.T) { stagingDir := filepath.Join(t.TempDir(), "staging") require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir)) - partitionManager := NewPartitionManager(database, cfg.Storages, localRepoFactory, logrus.StandardLogger(), stagingDir) + partitionManager := NewPartitionManager(database, cfg.Storages, localRepoFactory, cmdFactory, logrus.StandardLogger(), stagingDir) defer func() { partitionManager.Stop() // Assert all staging directories have been removed. diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index ad8159ac17..477296a0ed 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -33,6 +33,8 @@ import ( var ( // ErrRepositoryNotFound is returned when the repository doesn't exist. ErrRepositoryNotFound = structerr.NewNotFound("repository not found") + // ErrRepositoryAlreadyExists is returned when the repository already exists. + ErrRepositoryAlreadyExists = structerr.NewAlreadyExists("repository already exists") // ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions. ErrTransactionProcessingStopped = errors.New("transaction processing stopped") // errInitializationFailed is returned when the TransactionManager failed to initialize successfully. @@ -164,6 +166,9 @@ type Transaction struct { includesPack bool // stagingRepository is a repository that is used to stage the transaction. If there are quarantined // objects, it has the quarantine applied so the objects are available for verification and packing. + // Generally the staging repository is the actual repository instance. If the repository doesn't exist + // yet, the staging repository is a temporary repository that is deleted once the transaction has been + // finished. stagingRepository repository // Snapshot contains the details of the Transaction's read snapshot. @@ -173,6 +178,7 @@ type Transaction struct { referenceUpdates ReferenceUpdates defaultBranchUpdate *DefaultBranchUpdate customHooksUpdate *CustomHooksUpdate + repositoryCreation *gitalypb.LogEntry_RepositoryCreation deleteRepository bool } @@ -181,7 +187,17 @@ type Transaction struct { // // The returned Transaction's read snapshot includes all writes that were committed prior to the // Begin call. Begin blocks until the committed writes have been applied to the repository. -func (mgr *TransactionManager) Begin(ctx context.Context) (_ *Transaction, returnedErr error) { +func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) { + return mgr.begin(ctx, true) +} + +// BeginCreation starts a new transaction. It allowas the transaction to start even if the repository +// doesn't exist. See the documentation for Begin for details on the use. +func (mgr *TransactionManager) BeginCreation(ctx context.Context) (*Transaction, error) { + return mgr.begin(ctx, false) +} + +func (mgr *TransactionManager) begin(ctx context.Context, repositoryShouldExist bool) (_ *Transaction, returnedErr error) { // Wait until the manager has been initialized so the notification channels // and the log indexes are loaded. select { @@ -194,9 +210,9 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (_ *Transaction, retur } mgr.mutex.Lock() - if !mgr.repositoryExists { + if err := mgr.verifyRepositoryExistence(repositoryShouldExist); err != nil { mgr.mutex.Unlock() - return nil, ErrRepositoryNotFound + return nil, err } txn := &Transaction{ @@ -221,6 +237,10 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (_ *Transaction, retur } txn.initStagingDirectory = func() error { + if txn.stagingDirectory != "" { + return nil + } + stagingDirectory, err := os.MkdirTemp(mgr.stagingDirectory, "") if err != nil { return fmt.Errorf("mkdir temp: %w", err) @@ -264,6 +284,16 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (_ *Transaction, retur } } +func (mgr *TransactionManager) verifyRepositoryExistence(shouldExist bool) error { + if shouldExist && !mgr.repositoryExists { + return ErrRepositoryNotFound + } else if !shouldExist && mgr.repositoryExists { + return ErrRepositoryAlreadyExists + } + + return nil +} + // Commit performs the changes. If no error is returned, the transaction was successful and the changes // have been performed. If an error was returned, the transaction may or may not be persisted. func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) { @@ -317,6 +347,13 @@ func (txn *Transaction) UpdateReferences(updates ReferenceUpdates) { txn.referenceUpdates = updates } +// CreateRepository creates the repository with the specified object format when the transaction commits. +func (txn *Transaction) CreateRepository(objectFormat git.ObjectHash) { + txn.repositoryCreation = &gitalypb.LogEntry_RepositoryCreation{ + ObjectFormat: objectFormat.ProtoFormat, + } +} + // DeleteRepository deletes the repository when the transaction is committed. func (txn *Transaction) DeleteRepository() { txn.deleteRepository = true @@ -416,7 +453,10 @@ type TransactionManager struct { // left around after crashes. The files are temporary and any leftover files are expected to be cleaned up when // Gitaly starts. stagingDirectory string - + // commandFactory is used to spawn git commands without a repository. + commandFactory git.CommandFactory + // repositoryFactory is used to build a localrepo.Repo for operations that need it. + repositoryFactory localrepo.StorageScopedFactory // repositoryExists marks whether the repository exists or not. The repository may not exist if it has // never been created, or if it has been deleted. repositoryExists bool @@ -424,6 +464,8 @@ type TransactionManager struct { repository repository // repositoryPath is the path to the repository this TransactionManager is acting on. repositoryPath string + // storagePath is the absolute path to this storage this TransactionManager is operating on. + storagePath string // relativePath is the repository's relative path inside the storage. relativePath string // db is the handle to the key-value store used for storing the write-ahead log related state. @@ -470,7 +512,7 @@ type repository interface { } // NewTransactionManager returns a new TransactionManager for the given repository. -func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir string, repositoryFactory localrepo.StorageScopedFactory, transactionFinalizer func()) *TransactionManager { +func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir string, repositoryFactory localrepo.StorageScopedFactory, cmdFactory git.CommandFactory, transactionFinalizer func()) *TransactionManager { ctx, cancel := context.WithCancel(context.Background()) return &TransactionManager{ ctx: ctx, @@ -479,6 +521,7 @@ func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir stop: cancel, repository: repositoryFactory.Build(relativePath), repositoryPath: filepath.Join(storagePath, relativePath), + storagePath: storagePath, relativePath: relativePath, db: newDatabaseAdapter(db), admissionQueue: make(chan *Transaction), @@ -486,6 +529,8 @@ func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir initialized: make(chan struct{}), applyNotifications: make(map[LogIndex]chan struct{}), stagingDirectory: stagingDir, + commandFactory: cmdFactory, + repositoryFactory: repositoryFactory, transactionFinalizer: transactionFinalizer, } } @@ -531,6 +576,25 @@ func (mgr *TransactionManager) setupStagingRepository(ctx context.Context, trans // If the repository exists, we use it for staging the transaction. transaction.stagingRepository = mgr.repository + if transaction.repositoryCreation != nil { + // Git requires that certain commands like 'pack-objects' and 'rev-list' are ran in a repository. + // Providing just an object directory does not suffice. If the repository doesn't exist yet, we create + // a temporary repository that we use to stage the transaction. The staging repository is used to run the + // commands that require a repository. The reference updates in the transaction will also be verified + // against temporary staging repository. After the transaction is logged, the staging repository + // is removed, and the actual repository will be created when the log entry is applied. + if err := transaction.initStagingDirectory(); err != nil { + return fmt.Errorf("init staging directory: %w", err) + } + + stagingRepositoryRelativePath := filepath.Join(strings.TrimPrefix(transaction.stagingDirectory, mgr.storagePath+"/"), "repository") + if err := mgr.createRepository(ctx, filepath.Join(transaction.stagingDirectory, "repository"), transaction.repositoryCreation.ObjectFormat); err != nil { + return fmt.Errorf("create staging repository: %w", err) + } + + transaction.stagingRepository = mgr.repositoryFactory.Build(stagingRepositoryRelativePath) + } + // If the transaction has a quarantine directory, we must use it when staging the pack // file and verifying the references so the objects are available. if transaction.quarantineDirectory != "" { @@ -701,12 +765,22 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { } transaction.result <- func() (commitErr error) { - if !mgr.repositoryExists { + logEntry := &gitalypb.LogEntry{} + + if transaction.repositoryCreation != nil { + if mgr.repositoryExists { + return ErrRepositoryAlreadyExists + } + + if err := os.MkdirAll(filepath.Join(mgr.repositoryPath, "wal", "packs"), fs.ModePerm); err != nil { + return fmt.Errorf("create repository directory: %w", err) + } + + logEntry.RepositoryCreation = transaction.repositoryCreation + } else if !mgr.repositoryExists { return ErrRepositoryNotFound } - logEntry := &gitalypb.LogEntry{} - var err error logEntry.ReferenceUpdates, err = mgr.verifyReferences(mgr.ctx, transaction) if err != nil { @@ -1156,6 +1230,10 @@ func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *g mgr.hookIndex = nextLogIndex } mgr.applyNotifications[nextLogIndex] = make(chan struct{}) + if logEntry.RepositoryCreation != nil { + mgr.repositoryExists = true + } + if logEntry.RepositoryDeletion != nil { mgr.repositoryExists = false mgr.hookIndex = 0 @@ -1180,6 +1258,10 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return fmt.Errorf("apply repository deletion: %w", err) } } else { + if err := mgr.applyRepositoryCreation(ctx, logEntry.RepositoryCreation); err != nil { + return fmt.Errorf("apply repository deletion: %w", err) + } + if logEntry.IncludesPack { if err := mgr.applyPackFile(ctx, logIndex); err != nil { return fmt.Errorf("apply pack file: %w", err) @@ -1229,6 +1311,55 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return nil } +// applyRepositoryCreation applies a repository creation by creating a repository. +func (mgr *TransactionManager) applyRepositoryCreation(ctx context.Context, entry *gitalypb.LogEntry_RepositoryCreation) error { + if entry == nil { + return nil + } + + if err := mgr.createRepository(ctx, mgr.repositoryPath, entry.ObjectFormat); err != nil { + return fmt.Errorf("create repository: %w", err) + } + + if err := mgr.createDirectories(); err != nil { + return fmt.Errorf("create directories: %w", err) + } + + // Sync the parent directory. We expect that git syncs its own writes. + if err := safe.NewSyncer().Sync(filepath.Dir(mgr.repositoryPath)); err != nil { + return fmt.Errorf("sync: %w", err) + } + + return nil +} + +func (mgr *TransactionManager) createRepository(ctx context.Context, repositoryPath string, objectFormat gitalypb.ObjectFormat) error { + objectHash, err := git.ObjectHashByProto(objectFormat) + if err != nil { + return fmt.Errorf("object hash by proto: %w", err) + } + + stderr := &bytes.Buffer{} + cmd, err := mgr.commandFactory.NewWithoutRepo(ctx, git.Command{ + Name: "init", + Flags: []git.Option{ + git.Flag{Name: "--bare"}, + git.Flag{Name: "--quiet"}, + git.Flag{Name: "--object-format=" + objectHash.Format}, + }, + Args: []string{repositoryPath}, + }, git.WithStderr(stderr)) + if err != nil { + return fmt.Errorf("spawn git init: %w", err) + } + + if err := cmd.Wait(); err != nil { + return structerr.New("wait git init: %w", err).WithMetadata("stderr", stderr.String()) + } + + return nil +} + // applyRepositoryDeletion deletes the repository. // // Given how the repositories are laid out in the storage, we currently can't support MVCC for them. diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index 4f5ad8ace2..43b38b1ee0 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -131,6 +131,7 @@ func TestTransactionManager(t *testing.T) { type testSetup struct { Config config.Cfg RepositoryFactory localrepo.StorageScopedFactory + CommandFactory git.CommandFactory ObjectHash git.ObjectHash NonExistentOID git.ObjectID Commits testCommits @@ -194,6 +195,7 @@ func TestTransactionManager(t *testing.T) { Config: cfg, ObjectHash: objectHash, RepositoryFactory: repositoryFactory, + CommandFactory: cmdFactory, NonExistentOID: nonExistentOID, Commits: testCommits{ First: testCommit{ @@ -266,6 +268,8 @@ func TestTransactionManager(t *testing.T) { // TransactionID is the identifier given to the transaction created. This is used to identify // the transaction in later steps. TransactionID int + + AllowNonExistent bool // Context is the context to use for the Begin call. Context context.Context // ExpectedSnapshot is the expected snapshot of the transaction. @@ -295,6 +299,8 @@ func TestTransactionManager(t *testing.T) { CustomHooksUpdate *CustomHooksUpdate // DeleteRepository deletes the repository on commit. DeleteRepository bool + // CreateRepository creates the repository on commit. + CreateRepository bool } // Rollback calls Rollback on a transaction. @@ -2611,6 +2617,209 @@ func TestTransactionManager(t *testing.T) { RepositoryDoesntExist: true, }, }, + { + desc: "create repository when it doesn't exist", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + AllowNonExistent: true, + }, + Commit{ + CreateRepository: true, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + Objects: []git.ObjectID{}, + }, + }, + { + desc: "create repository when it already exists", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + TransactionID: 1, + AllowNonExistent: true, + }, + Commit{ + TransactionID: 1, + CreateRepository: true, + }, + Begin{ + TransactionID: 2, + AllowNonExistent: true, + ExpectedError: ErrRepositoryAlreadyExists, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + Objects: []git.ObjectID{}, + }, + }, + { + desc: "create repository with interleaved creations", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + TransactionID: 1, + AllowNonExistent: true, + }, + Begin{ + TransactionID: 2, + AllowNonExistent: true, + }, + Commit{ + TransactionID: 1, + CreateRepository: true, + }, + Commit{ + TransactionID: 2, + CreateRepository: true, + ExpectedError: ErrRepositoryAlreadyExists, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + Objects: []git.ObjectID{}, + }, + }, + { + desc: "create repository again after deletion", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + TransactionID: 1, + AllowNonExistent: true, + }, + Commit{ + TransactionID: 1, + CreateRepository: true, + }, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), + }, + }, + Begin{ + TransactionID: 3, + ExpectedSnapshot: Snapshot{ + ReadIndex: 2, + HookIndex: 2, + }, + }, + Commit{ + TransactionID: 3, + DeleteRepository: true, + }, + Begin{ + TransactionID: 4, + AllowNonExistent: true, + ExpectedSnapshot: Snapshot{ + ReadIndex: 3, + }, + }, + Commit{ + TransactionID: 4, + CreateRepository: true, + }, + Begin{ + TransactionID: 5, + ExpectedSnapshot: Snapshot{ + ReadIndex: 4, + }, + }, + Rollback{ + TransactionID: 5, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(4).toProto(), + }, + Objects: []git.ObjectID{}, + }, + }, + { + desc: "create repository with full state", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + AllowNonExistent: true, + }, + Commit{ + CreateRepository: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + "refs/heads/branch": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack, setup.Commits.Second.Pack}, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/branch", + }, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), + }, + }, + }, + expectedState: StateAssertion{ + DefaultBranch: "refs/heads/branch", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + {Name: "refs/heads/branch", Target: setup.Commits.Second.OID.String()}, + }, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + setup.Commits.Second.OID, + }, + ), + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks/1/pre-receive": { + Mode: umask.Mask(fs.ModePerm), + Content: []byte("hook content"), + }, + "/wal/hooks/1/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + "/wal/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, + }, + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + setup.Commits.Second.OID, + }, + }, + }, } type invalidReferenceTestCase struct { @@ -2745,14 +2954,15 @@ func TestTransactionManager(t *testing.T) { require.NoError(t, err) defer testhelper.MustClose(t, database) - stagingDir := t.TempDir() + stagingDir := filepath.Join(setup.Config.Storages[0].Path, "staging") + require.NoError(t, os.MkdirAll(stagingDir, perm.PrivateDir)) storagePath := setup.Config.Storages[0].Path var ( // managerRunning tracks whether the manager is running or stopped. managerRunning bool // transactionManager is the current TransactionManager instance. - transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer) + transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, setup.CommandFactory, noopTransactionFinalizer) // managerErr is used for synchronizing manager stopping and returning // the error from Run. managerErr chan error @@ -2793,7 +3003,7 @@ func TestTransactionManager(t *testing.T) { managerRunning = true managerErr = make(chan error) - transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer) + transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, setup.CommandFactory, noopTransactionFinalizer) installHooks(t, transactionManager, database, hooks{ beforeReadLogEntry: step.Hooks.BeforeApplyLogEntry, beforeResolveRevision: step.Hooks.BeforeAppendLogEntry, @@ -2835,7 +3045,15 @@ func TestTransactionManager(t *testing.T) { beginCtx = step.Context } - transaction, err := transactionManager.Begin(beginCtx) + var transaction *Transaction + var err error + + if step.AllowNonExistent { + transaction, err = transactionManager.BeginCreation(beginCtx) + } else { + transaction, err = transactionManager.Begin(beginCtx) + } + require.Equal(t, step.ExpectedError, err) if err == nil { require.Equal(t, step.ExpectedSnapshot, transaction.Snapshot()) @@ -2887,6 +3105,10 @@ func TestTransactionManager(t *testing.T) { transaction.DeleteRepository() } + if step.CreateRepository { + transaction.CreateRepository(setup.ObjectHash) + } + commitCtx := ctx if step.Context != nil { commitCtx = step.Context @@ -3101,7 +3323,7 @@ func BenchmarkTransactionManager(b *testing.B) { commit1 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents()) commit2 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents(commit1)) - manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), repositoryFactory, noopTransactionFinalizer) + manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), repositoryFactory, cmdFactory, noopTransactionFinalizer) managers = append(managers, manager) managerWG.Add(1) diff --git a/proto/go/gitalypb/log.pb.go b/proto/go/gitalypb/log.pb.go index 3ed22d2827..8d2f8db00f 100644 --- a/proto/go/gitalypb/log.pb.go +++ b/proto/go/gitalypb/log.pb.go @@ -43,6 +43,8 @@ type LogEntry struct { IncludesPack bool `protobuf:"varint,4,opt,name=includes_pack,json=includesPack,proto3" json:"includes_pack,omitempty"` // RepositoryDeletion, when set, indicates this log entry deletes the repository. RepositoryDeletion *LogEntry_RepositoryDeletion `protobuf:"bytes,5,opt,name=repository_deletion,json=repositoryDeletion,proto3" json:"repository_deletion,omitempty"` + // repository_creation is set if this log entry creates a repository. + RepositoryCreation *LogEntry_RepositoryCreation `protobuf:"bytes,6,opt,name=repository_creation,json=repositoryCreation,proto3" json:"repository_creation,omitempty"` } func (x *LogEntry) Reset() { @@ -112,6 +114,13 @@ func (x *LogEntry) GetRepositoryDeletion() *LogEntry_RepositoryDeletion { return nil } +func (x *LogEntry) GetRepositoryCreation() *LogEntry_RepositoryCreation { + if x != nil { + return x.RepositoryCreation + } + return nil +} + // LogIndex serializes a log index. It's used for storing a repository's // applied log index in the database. // @@ -366,56 +375,116 @@ func (*LogEntry_RepositoryDeletion) Descriptor() ([]byte, []int) { return file_log_proto_rawDescGZIP(), []int{0, 3} } +// RepositoryCreation models a log entry that creates a repository. +type LogEntry_RepositoryCreation struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // object_format defines the object format to use for the repository. + ObjectFormat ObjectFormat `protobuf:"varint,1,opt,name=object_format,json=objectFormat,proto3,enum=gitaly.ObjectFormat" json:"object_format,omitempty"` +} + +func (x *LogEntry_RepositoryCreation) Reset() { + *x = LogEntry_RepositoryCreation{} + if protoimpl.UnsafeEnabled { + mi := &file_log_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LogEntry_RepositoryCreation) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogEntry_RepositoryCreation) ProtoMessage() {} + +func (x *LogEntry_RepositoryCreation) ProtoReflect() protoreflect.Message { + mi := &file_log_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogEntry_RepositoryCreation.ProtoReflect.Descriptor instead. +func (*LogEntry_RepositoryCreation) Descriptor() ([]byte, []int) { + return file_log_proto_rawDescGZIP(), []int{0, 4} +} + +func (x *LogEntry_RepositoryCreation) GetObjectFormat() ObjectFormat { + if x != nil { + return x.ObjectFormat + } + return ObjectFormat_OBJECT_FORMAT_UNSPECIFIED +} + var File_log_proto protoreflect.FileDescriptor var file_log_proto_rawDesc = []byte{ 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x22, 0xe8, 0x04, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x12, 0x4d, 0x0a, 0x11, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, - 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x10, 0x72, - 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x12, - 0x58, 0x0a, 0x15, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x5f, 0x62, 0x72, 0x61, 0x6e, 0x63, - 0x68, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, - 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x2e, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x55, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x52, 0x13, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, - 0x6e, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x52, 0x0a, 0x13, 0x63, 0x75, 0x73, - 0x74, 0x6f, 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, - 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x11, 0x63, 0x75, 0x73, 0x74, - 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x23, 0x0a, - 0x0d, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x50, 0x61, - 0x63, 0x6b, 0x12, 0x54, 0x0a, 0x13, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x5f, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, - 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, - 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x12, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x51, 0x0a, 0x0f, 0x52, 0x65, 0x66, 0x65, - 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, - 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, - 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x65, 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, 0x65, 0x77, 0x4f, 0x69, 0x64, 0x1a, 0x3c, 0x0a, 0x13, 0x44, + 0x61, 0x6c, 0x79, 0x1a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0x8f, 0x06, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x4d, + 0x0a, 0x11, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, + 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x10, 0x72, 0x65, 0x66, + 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x12, 0x58, 0x0a, + 0x15, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x5f, 0x62, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x5f, + 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, - 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, - 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x1a, 0x3d, 0x0a, 0x11, 0x43, 0x75, 0x73, - 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x28, - 0x0a, 0x10, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x74, - 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, - 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54, 0x61, 0x72, 0x1a, 0x14, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, - 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x27, - 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1b, 0x0a, 0x09, 0x6c, 0x6f, - 0x67, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x6c, - 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, - 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x65, 0x52, 0x13, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, + 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x52, 0x0a, 0x13, 0x63, 0x75, 0x73, 0x74, 0x6f, + 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, + 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, + 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x11, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, + 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x69, + 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x0c, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x50, 0x61, 0x63, 0x6b, + 0x12, 0x54, 0x0a, 0x13, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x5f, 0x64, + 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, + 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x12, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, + 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x54, 0x0a, 0x13, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x5f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, + 0x43, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x12, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x43, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x51, 0x0a, 0x0f, + 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, + 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, + 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x65, 0x77, 0x5f, 0x6f, 0x69, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, 0x65, 0x77, 0x4f, 0x69, 0x64, 0x1a, + 0x3c, 0x0a, 0x13, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, + 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, + 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x1a, 0x3d, 0x0a, + 0x11, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x5f, 0x68, 0x6f, 0x6f, + 0x6b, 0x73, 0x5f, 0x74, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x63, 0x75, + 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54, 0x61, 0x72, 0x1a, 0x14, 0x0a, 0x12, + 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, + 0x6f, 0x6e, 0x1a, 0x4f, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, + 0x43, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x39, 0x0a, 0x0d, 0x6f, 0x62, 0x6a, 0x65, + 0x63, 0x74, 0x5f, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x14, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x46, + 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x52, 0x0c, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x46, 0x6f, 0x72, + 0x6d, 0x61, 0x74, 0x22, 0x27, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, + 0x1b, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x34, 0x5a, 0x32, + 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, + 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -430,7 +499,7 @@ func file_log_proto_rawDescGZIP() []byte { return file_log_proto_rawDescData } -var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_log_proto_goTypes = []interface{}{ (*LogEntry)(nil), // 0: gitaly.LogEntry (*LogIndex)(nil), // 1: gitaly.LogIndex @@ -438,17 +507,21 @@ var file_log_proto_goTypes = []interface{}{ (*LogEntry_DefaultBranchUpdate)(nil), // 3: gitaly.LogEntry.DefaultBranchUpdate (*LogEntry_CustomHooksUpdate)(nil), // 4: gitaly.LogEntry.CustomHooksUpdate (*LogEntry_RepositoryDeletion)(nil), // 5: gitaly.LogEntry.RepositoryDeletion + (*LogEntry_RepositoryCreation)(nil), // 6: gitaly.LogEntry.RepositoryCreation + (ObjectFormat)(0), // 7: gitaly.ObjectFormat } var file_log_proto_depIdxs = []int32{ 2, // 0: gitaly.LogEntry.reference_updates:type_name -> gitaly.LogEntry.ReferenceUpdate 3, // 1: gitaly.LogEntry.default_branch_update:type_name -> gitaly.LogEntry.DefaultBranchUpdate 4, // 2: gitaly.LogEntry.custom_hooks_update:type_name -> gitaly.LogEntry.CustomHooksUpdate 5, // 3: gitaly.LogEntry.repository_deletion:type_name -> gitaly.LogEntry.RepositoryDeletion - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 6, // 4: gitaly.LogEntry.repository_creation:type_name -> gitaly.LogEntry.RepositoryCreation + 7, // 5: gitaly.LogEntry.RepositoryCreation.object_format:type_name -> gitaly.ObjectFormat + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_log_proto_init() } @@ -456,6 +529,7 @@ func file_log_proto_init() { if File_log_proto != nil { return } + file_shared_proto_init() if !protoimpl.UnsafeEnabled { file_log_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*LogEntry); i { @@ -529,6 +603,18 @@ func file_log_proto_init() { return nil } } + file_log_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LogEntry_RepositoryCreation); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -536,7 +622,7 @@ func file_log_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_log_proto_rawDesc, NumEnums: 0, - NumMessages: 6, + NumMessages: 7, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/log.proto b/proto/log.proto index 15b031c549..564ddd1a1e 100644 --- a/proto/log.proto +++ b/proto/log.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package gitaly; +import "shared.proto"; + option go_package = "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"; // LogEntry is a single entry in a repository's write-ahead log. @@ -39,6 +41,12 @@ message LogEntry { message RepositoryDeletion { } + // RepositoryCreation models a log entry that creates a repository. + message RepositoryCreation { + // object_format defines the object format to use for the repository. + ObjectFormat object_format = 1; + } + // reference_updates contains the reference updates this log // entry records. The logged reference updates have already passed // through verification and are applied without any further checks. @@ -53,6 +61,8 @@ message LogEntry { bool includes_pack = 4; // RepositoryDeletion, when set, indicates this log entry deletes the repository. RepositoryDeletion repository_deletion = 5; + // repository_creation is set if this log entry creates a repository. + RepositoryCreation repository_creation = 6; } // LogIndex serializes a log index. It's used for storing a repository's -- GitLab