diff --git a/internal/gitaly/partition_manager.go b/internal/gitaly/partition_manager.go index 408d5f4a32467c0eee70b6127313b5034f1c04f0..83f9c9ec4bd9ee5b9d2c5003c0b707ba7ce10c62 100644 --- a/internal/gitaly/partition_manager.go +++ b/internal/gitaly/partition_manager.go @@ -9,6 +9,7 @@ import ( "github.com/dgraph-io/badger/v3" "github.com/sirupsen/logrus" + "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" repo "gitlab.com/gitlab-org/gitaly/v16/internal/git/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" @@ -31,6 +32,8 @@ type PartitionManager struct { partitions map[string]*partition // localRepoFactory is used by PartitionManager to construct `localrepo.Repo`. localRepoFactory localrepo.Factory + // commandFactory is passed as a dependency to the constructed TransactionManagers. + commandFactory git.CommandFactory // logger handles all logging for PartitionManager. logger logrus.FieldLogger // stopped tracks whether the PartitionManager has been stopped. If the manager is stopped, @@ -61,7 +64,7 @@ type partition struct { } // NewPartitionManager returns a new PartitionManager. 
-func NewPartitionManager(db *badger.DB, storages []config.Storage, localRepoFactory localrepo.Factory, logger logrus.FieldLogger, stagingDir string) *PartitionManager { +func NewPartitionManager(db *badger.DB, storages []config.Storage, localRepoFactory localrepo.Factory, cmdFactory git.CommandFactory, logger logrus.FieldLogger, stagingDir string) *PartitionManager { storagesMap := make(map[string]string, len(storages)) for _, storage := range storages { storagesMap[storage.Name] = storage.Path @@ -71,6 +74,7 @@ func NewPartitionManager(db *badger.DB, storages []config.Storage, localRepoFact db: db, partitions: make(map[string]*partition), localRepoFactory: localRepoFactory, + commandFactory: cmdFactory, logger: logger, stagingDirectory: stagingDir, storages: storagesMap, @@ -123,7 +127,7 @@ func (pm *PartitionManager) Begin(ctx context.Context, repo repo.GitRepo) (*Tran return nil, fmt.Errorf("scope by storage: %w", err) } - mgr := NewTransactionManager(pm.db, storagePath, relativePath, stagingDir, storageScopedFactory, pm.transactionFinalizerFactory(ptn)) + mgr := NewTransactionManager(pm.db, storagePath, relativePath, stagingDir, storageScopedFactory, pm.commandFactory, pm.transactionFinalizerFactory(ptn)) ptn.transactionManager = mgr pm.partitions[partitionKey] = ptn diff --git a/internal/gitaly/partition_manager_test.go b/internal/gitaly/partition_manager_test.go index aeb26f7e30ede9cda81e8673d98d1550c6756304..39e3e14f35e0feccf6eb2cb447aec8ff23d5195f 100644 --- a/internal/gitaly/partition_manager_test.go +++ b/internal/gitaly/partition_manager_test.go @@ -433,7 +433,7 @@ func TestPartitionManager(t *testing.T) { stagingDir := filepath.Join(t.TempDir(), "staging") require.NoError(t, os.Mkdir(stagingDir, perm.PrivateDir)) - partitionManager := NewPartitionManager(database, cfg.Storages, localRepoFactory, logrus.StandardLogger(), stagingDir) + partitionManager := NewPartitionManager(database, cfg.Storages, localRepoFactory, cmdFactory, logrus.StandardLogger(), 
stagingDir) defer func() { partitionManager.Stop() // Assert all staging directories have been removed. diff --git a/internal/gitaly/transaction_manager.go b/internal/gitaly/transaction_manager.go index 6a7f4f143637ae153c13b4c68b2e54bbbeac3a68..477296a0ed3cbb9a837cac868ad8c2f877a86ded 100644 --- a/internal/gitaly/transaction_manager.go +++ b/internal/gitaly/transaction_manager.go @@ -2,6 +2,7 @@ package gitaly import ( "bytes" + "container/list" "context" "encoding/binary" "errors" @@ -23,13 +24,24 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/perm" "gitlab.com/gitlab-org/gitaly/v16/internal/safe" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" ) -// ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions. -var ErrTransactionProcessingStopped = errors.New("transaction processing stopped") +var ( + // ErrRepositoryNotFound is returned when the repository doesn't exist. + ErrRepositoryNotFound = structerr.NewNotFound("repository not found") + // ErrRepositoryAlreadyExists is returned when the repository already exists. + ErrRepositoryAlreadyExists = structerr.NewAlreadyExists("repository already exists") + // ErrTransactionProcessingStopped is returned when the TransactionManager stops processing transactions. + ErrTransactionProcessingStopped = errors.New("transaction processing stopped") + // errInitializationFailed is returned when the TransactionManager failed to initialize successfully. + errInitializationFailed = errors.New("initializing transaction processing failed") + // errNotDirectory is returned when the repository's path doesn't point to a directory + errNotDirectory = errors.New("repository's path didn't point to a directory") +) // InvalidReferenceFormatError is returned when a reference name was invalid. 
type InvalidReferenceFormatError struct { @@ -133,6 +145,13 @@ type Transaction struct { // from the client goroutine to the TransactionManager.Run() goroutine, and the client goroutine must // not do any modifications to the state of the transcation anymore to avoid races. admitted bool + // finish cleans up the transaction releasing the resources associated with it. It must be called + // once the transaction is done with. + finish func() error + // finished is closed when the transaction has been finished. This enables waiting on transactions + // to finish where needed. + finished chan struct{} + // initStagingDirectory is called to lazily initialize the staging directory when it is // needed. initStagingDirectory func() error @@ -147,6 +166,9 @@ type Transaction struct { includesPack bool // stagingRepository is a repository that is used to stage the transaction. If there are quarantined // objects, it has the quarantine applied so the objects are available for verification and packing. + // Generally the staging repository is the actual repository instance. If the repository doesn't exist + // yet, the staging repository is a temporary repository that is deleted once the transaction has been + // finished. stagingRepository repository // Snapshot contains the details of the Transaction's read snapshot. @@ -156,6 +178,8 @@ type Transaction struct { referenceUpdates ReferenceUpdates defaultBranchUpdate *DefaultBranchUpdate customHooksUpdate *CustomHooksUpdate + repositoryCreation *gitalypb.LogEntry_RepositoryCreation + deleteRepository bool } // Begin opens a new transaction. The caller must call either Commit or Rollback to release @@ -164,15 +188,33 @@ type Transaction struct { // The returned Transaction's read snapshot includes all writes that were committed prior to the // Begin call. Begin blocks until the committed writes have been applied to the repository. 
func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) { + return mgr.begin(ctx, true) +} + +// BeginCreation starts a new transaction. It allows the transaction to start even if the repository +// doesn't exist. See the documentation for Begin for details on the use. +func (mgr *TransactionManager) BeginCreation(ctx context.Context) (*Transaction, error) { + return mgr.begin(ctx, false) +} + +func (mgr *TransactionManager) begin(ctx context.Context, repositoryShouldExist bool) (_ *Transaction, returnedErr error) { // Wait until the manager has been initialized so the notification channels // and the log indexes are loaded. select { case <-ctx.Done(): return nil, ctx.Err() case <-mgr.initialized: + if !mgr.initializationSuccessful { + return nil, errInitializationFailed + } + } + + mgr.mutex.Lock() + if err := mgr.verifyRepositoryExistence(repositoryShouldExist); err != nil { + mgr.mutex.Unlock() + return nil, err } - mgr.mutex.RLock() txn := &Transaction{ commit: mgr.commit, finalize: mgr.transactionFinalizer, @@ -180,10 +222,13 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) ReadIndex: mgr.appendedLogIndex, HookIndex: mgr.hookIndex, }, + finished: make(chan struct{}), } + openTransactionElement := mgr.openTransactions.PushBack(txn) + readReady := mgr.applyNotifications[txn.snapshot.ReadIndex] - mgr.mutex.RUnlock() + mgr.mutex.Unlock() if readReady == nil { // The snapshot log entry is already applied if there is no notification channel for it. // If so, the transaction is ready to begin immediately. 
@@ -192,6 +237,10 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) } txn.initStagingDirectory = func() error { + if txn.stagingDirectory != "" { + return nil + } + stagingDirectory, err := os.MkdirTemp(mgr.stagingDirectory, "") if err != nil { return fmt.Errorf("mkdir temp: %w", err) @@ -201,6 +250,30 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) return nil } + txn.finish = func() error { + defer close(txn.finished) + + mgr.mutex.Lock() + mgr.openTransactions.Remove(openTransactionElement) + mgr.mutex.Unlock() + + if txn.stagingDirectory != "" { + if err := os.RemoveAll(txn.stagingDirectory); err != nil { + return fmt.Errorf("remove staging directory: %w", err) + } + } + + return nil + } + + defer func() { + if returnedErr != nil { + // finish can't return an error as the staging directory isn't yet created, and won't + // be attempted to be deleted. + _ = txn.finish() + } + }() + select { case <-ctx.Done(): return nil, ctx.Err() @@ -211,13 +284,23 @@ func (mgr *TransactionManager) Begin(ctx context.Context) (*Transaction, error) } } +func (mgr *TransactionManager) verifyRepositoryExistence(shouldExist bool) error { + if shouldExist && !mgr.repositoryExists { + return ErrRepositoryNotFound + } else if !shouldExist && mgr.repositoryExists { + return ErrRepositoryAlreadyExists + } + + return nil +} + // Commit performs the changes. If no error is returned, the transaction was successful and the changes // have been performed. If an error was returned, the transaction may or may not be persisted. 
func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) { defer func() { txn.finalize() - if err := txn.cleanUnadmitted(); err != nil && returnedErr == nil { + if err := txn.finishUnadmitted(); err != nil && returnedErr == nil { returnedErr = err } }() @@ -228,29 +311,18 @@ func (txn *Transaction) Commit(ctx context.Context) (returnedErr error) { // Rollback releases resources associated with the transaction without performing any changes. func (txn *Transaction) Rollback() error { defer txn.finalize() - return txn.cleanUnadmitted() + return txn.finishUnadmitted() } -// cleanUnadmitted cleans up after the transaction if it wasn't yet admitted. If the transaction was admitted, +// finishUnadmitted cleans up after the transaction if it wasn't yet admitted. If the transaction was admitted, // the Transaction is being processed by TransactionManager. The clean up responsibility moves there as well // to avoid races. -func (txn *Transaction) cleanUnadmitted() error { +func (txn *Transaction) finishUnadmitted() error { if txn.admitted { return nil } - return txn.clean() -} - -// clean cleans up the resources associated with the transaction. -func (txn *Transaction) clean() error { - if txn.stagingDirectory != "" { - if err := os.RemoveAll(txn.stagingDirectory); err != nil { - return fmt.Errorf("remove staging directory: %w", err) - } - } - - return nil + return txn.finish() } // Snapshot returns the details of the Transaction's read snapshot. @@ -275,6 +347,18 @@ func (txn *Transaction) UpdateReferences(updates ReferenceUpdates) { txn.referenceUpdates = updates } +// CreateRepository creates the repository with the specified object format when the transaction commits. +func (txn *Transaction) CreateRepository(objectFormat git.ObjectHash) { + txn.repositoryCreation = &gitalypb.LogEntry_RepositoryCreation{ + ObjectFormat: objectFormat.ProtoFormat, + } +} + +// DeleteRepository deletes the repository when the transaction is committed. 
+func (txn *Transaction) DeleteRepository() { + txn.deleteRepository = true +} + // QuarantineDirectory returns an absolute path to the transaction's quarantine directory. The quarantine directory // is a Git object directory where the new objects introduced in the transaction must be written. The quarantined // objects needed by the updated reference tips will be included in the transaction. @@ -369,11 +453,19 @@ type TransactionManager struct { // left around after crashes. The files are temporary and any leftover files are expected to be cleaned up when // Gitaly starts. stagingDirectory string - + // commandFactory is used to spawn git commands without a repository. + commandFactory git.CommandFactory + // repositoryFactory is used to build a localrepo.Repo for operations that need it. + repositoryFactory localrepo.StorageScopedFactory + // repositoryExists marks whether the repository exists or not. The repository may not exist if it has + // never been created, or if it has been deleted. + repositoryExists bool // repository is the repository this TransactionManager is acting on. repository repository // repositoryPath is the path to the repository this TransactionManager is acting on. repositoryPath string + // storagePath is the absolute path to this storage this TransactionManager is operating on. + storagePath string // relativePath is the repository's relative path inside the storage. relativePath string // db is the handle to the key-value store used for storing the write-ahead log related state. @@ -381,13 +473,19 @@ type TransactionManager struct { // admissionQueue is where the incoming writes are waiting to be admitted to the transaction // manager. admissionQueue chan *Transaction + // openTransactions contains all transactions that have been begun but not yet committed or rolled back. + // The transactions are ordered from the oldest to the newest. + openTransactions *list.List // initialized is closed when the manager has been initialized. 
It's used to block new transactions // from beginning prior to the manager having initialized its runtime state on start up. initialized chan struct{} + // initializationSuccessful is set if the TransactionManager initialized successfully. If it didn't, + // transactions will fail to begin. + initializationSuccessful bool // mutex guards access to applyNotifications and appendedLogIndex. These fields are accessed by both // Run and Begin which are ran in different goroutines. - mutex sync.RWMutex + mutex sync.Mutex // applyNotifications stores channels that are closed when a log entry is applied. These // are used to block transactions from beginning before their snapshot is ready. applyNotifications map[LogIndex]chan struct{} @@ -414,7 +512,7 @@ type repository interface { } // NewTransactionManager returns a new TransactionManager for the given repository. -func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir string, repositoryFactory localrepo.StorageScopedFactory, transactionFinalizer func()) *TransactionManager { +func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir string, repositoryFactory localrepo.StorageScopedFactory, cmdFactory git.CommandFactory, transactionFinalizer func()) *TransactionManager { ctx, cancel := context.WithCancel(context.Background()) return &TransactionManager{ ctx: ctx, @@ -423,12 +521,16 @@ func NewTransactionManager(db *badger.DB, storagePath, relativePath, stagingDir stop: cancel, repository: repositoryFactory.Build(relativePath), repositoryPath: filepath.Join(storagePath, relativePath), + storagePath: storagePath, relativePath: relativePath, db: newDatabaseAdapter(db), admissionQueue: make(chan *Transaction), + openTransactions: list.New(), initialized: make(chan struct{}), applyNotifications: make(map[LogIndex]chan struct{}), stagingDirectory: stagingDir, + commandFactory: cmdFactory, + repositoryFactory: repositoryFactory, transactionFinalizer: transactionFinalizer, } } @@ 
-474,6 +576,25 @@ func (mgr *TransactionManager) setupStagingRepository(ctx context.Context, trans // If the repository exists, we use it for staging the transaction. transaction.stagingRepository = mgr.repository + if transaction.repositoryCreation != nil { + // Git requires that certain commands like 'pack-objects' and 'rev-list' are ran in a repository. + // Providing just an object directory does not suffice. If the repository doesn't exist yet, we create + // a temporary repository that we use to stage the transaction. The staging repository is used to run the + // commands that require a repository. The reference updates in the transaction will also be verified + // against temporary staging repository. After the transaction is logged, the staging repository + // is removed, and the actual repository will be created when the log entry is applied. + if err := transaction.initStagingDirectory(); err != nil { + return fmt.Errorf("init staging directory: %w", err) + } + + stagingRepositoryRelativePath := filepath.Join(strings.TrimPrefix(transaction.stagingDirectory, mgr.storagePath+"/"), "repository") + if err := mgr.createRepository(ctx, filepath.Join(transaction.stagingDirectory, "repository"), transaction.repositoryCreation.ObjectFormat); err != nil { + return fmt.Errorf("create staging repository: %w", err) + } + + transaction.stagingRepository = mgr.repositoryFactory.Build(stagingRepositoryRelativePath) + } + // If the transaction has a quarantine directory, we must use it when staging the pack // file and verifying the references so the objects are available. if transaction.quarantineDirectory != "" { @@ -630,10 +751,10 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { var transaction *Transaction select { case transaction = <-mgr.admissionQueue: - // The Transaction does not clean up itself anymore once it has been admitted for + // The Transaction does not finish itself anymore once it has been admitted for // processing. 
This avoids the Transaction concurrently removing the staged state - // while the manager is still operating on it. We thus need to defer its clean up. - cleanUps = append(cleanUps, transaction.clean) + // while the manager is still operating on it. We thus need to defer its finishing. + cleanUps = append(cleanUps, transaction.finish) case <-mgr.ctx.Done(): } @@ -646,6 +767,20 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { transaction.result <- func() (commitErr error) { logEntry := &gitalypb.LogEntry{} + if transaction.repositoryCreation != nil { + if mgr.repositoryExists { + return ErrRepositoryAlreadyExists + } + + if err := os.MkdirAll(filepath.Join(mgr.repositoryPath, "wal", "packs"), fs.ModePerm); err != nil { + return fmt.Errorf("create repository directory: %w", err) + } + + logEntry.RepositoryCreation = transaction.repositoryCreation + } else if !mgr.repositoryExists { + return ErrRepositoryNotFound + } + var err error logEntry.ReferenceUpdates, err = mgr.verifyReferences(mgr.ctx, transaction) if err != nil { @@ -667,8 +802,8 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { CustomHooksTar: transaction.customHooksUpdate.CustomHooksTAR, } } - nextLogIndex := mgr.appendedLogIndex + 1 + nextLogIndex := mgr.appendedLogIndex + 1 if transaction.includesPack { logEntry.IncludesPack = true @@ -691,6 +826,10 @@ func (mgr *TransactionManager) processTransaction() (returnedErr error) { } } + if transaction.deleteRepository { + logEntry.RepositoryDeletion = &gitalypb.LogEntry_RepositoryDeletion{} + } + return mgr.appendLogEntry(nextLogIndex, logEntry) }() @@ -705,10 +844,6 @@ func (mgr *TransactionManager) Stop() { mgr.stop() } func (mgr *TransactionManager) initialize(ctx context.Context) error { defer close(mgr.initialized) - if err := mgr.createDirectories(); err != nil { - return fmt.Errorf("create directories: %w", err) - } - var appliedLogIndex gitalypb.LogIndex if err := 
mgr.readKey(keyAppliedLogIndex(mgr.relativePath), &appliedLogIndex); err != nil && !errors.Is(err, badger.ErrKeyNotFound) { return fmt.Errorf("read applied log index: %w", err) @@ -742,6 +877,10 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { return fmt.Errorf("determine appended log index: %w", err) } + if err := mgr.determineRepositoryExistence(); err != nil { + return fmt.Errorf("determine repository existence: %w", err) + } + var err error mgr.hookIndex, err = mgr.determineHookIndex(ctx, mgr.appendedLogIndex, mgr.appliedLogIndex) if err != nil { @@ -758,6 +897,48 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { return fmt.Errorf("remove stale packs: %w", err) } + mgr.initializationSuccessful = true + + return nil +} + +// determineRepositoryExistence determines whether the repository exists or not by looking +// at whether the directory exists and whether there is a deletion request logged. +func (mgr *TransactionManager) determineRepositoryExistence() error { + stat, err := os.Stat(mgr.repositoryPath) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("stat repository directory: %w", err) + } + } + + if stat != nil { + if !stat.IsDir() { + return errNotDirectory + } + + if err := mgr.createDirectories(); err != nil { + return fmt.Errorf("create directories: %w", err) + } + + mgr.repositoryExists = true + } + + // Check whether the last log entry is a repository deletion. If so, + // the repository has been deleted but the deletion wasn't yet applied. + // The deletion is the last entry always as no further writes are + // accepted if the repository doesn't exist. 
+ if mgr.appliedLogIndex < mgr.appendedLogIndex { + logEntry, err := mgr.readLogEntry(mgr.appendedLogIndex) + if err != nil { + return fmt.Errorf("read log entry: %w", err) + } + + if logEntry.RepositoryDeletion != nil { + mgr.repositoryExists = false + } + } + return nil } @@ -770,6 +951,11 @@ func (mgr *TransactionManager) initialize(ctx context.Context) error { // to see which are the latest. // 3. If we found no hooks in the log nor in the repository, there are no hooks configured. func (mgr *TransactionManager) determineHookIndex(ctx context.Context, appendedIndex, appliedIndex LogIndex) (LogIndex, error) { + if !mgr.repositoryExists { + // If the repository doesn't exist, then there are no hooks either. + return 0, nil + } + for i := appendedIndex; appliedIndex < i; i-- { logEntry, err := mgr.readLogEntry(i) if err != nil { @@ -1044,6 +1230,14 @@ func (mgr *TransactionManager) appendLogEntry(nextLogIndex LogIndex, logEntry *g mgr.hookIndex = nextLogIndex } mgr.applyNotifications[nextLogIndex] = make(chan struct{}) + if logEntry.RepositoryCreation != nil { + mgr.repositoryExists = true + } + + if logEntry.RepositoryDeletion != nil { + mgr.repositoryExists = false + mgr.hookIndex = 0 + } mgr.mutex.Unlock() return nil @@ -1056,27 +1250,40 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return fmt.Errorf("read log entry: %w", err) } - if logEntry.IncludesPack { - if err := mgr.applyPackFile(ctx, logIndex); err != nil { - return fmt.Errorf("apply pack file: %w", err) + if logEntry.RepositoryDeletion != nil { + // If the repository is being deleted, just delete it without any other changes given + // they'd all be removed anyway. Reapplying the other changes after a crash would also + // not work if the repository was successfully deleted before the crash. 
+ if err := mgr.applyRepositoryDeletion(ctx, logIndex); err != nil { + return fmt.Errorf("apply repository deletion: %w", err) + } + } else { + if err := mgr.applyRepositoryCreation(ctx, logEntry.RepositoryCreation); err != nil { + return fmt.Errorf("apply repository creation: %w", err) } - } - updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates, mgr.repository) - if err != nil { - return fmt.Errorf("perpare reference transaction: %w", err) - } + if logEntry.IncludesPack { + if err := mgr.applyPackFile(ctx, logIndex); err != nil { + return fmt.Errorf("apply pack file: %w", err) + } + } - if err := updater.Commit(); err != nil { - return fmt.Errorf("commit transaction: %w", err) - } + updater, err := mgr.prepareReferenceTransaction(ctx, logEntry.ReferenceUpdates, mgr.repository) + if err != nil { + return fmt.Errorf("prepare reference transaction: %w", err) + } - if err := mgr.updateDefaultBranch(ctx, logEntry.DefaultBranchUpdate); err != nil { - return fmt.Errorf("writing default branch: %w", err) - } + if err := updater.Commit(); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } + + if err := mgr.updateDefaultBranch(ctx, logEntry.DefaultBranchUpdate); err != nil { + return fmt.Errorf("writing default branch: %w", err) + } - if err := mgr.applyCustomHooks(ctx, logIndex, logEntry.CustomHooksUpdate); err != nil { - return fmt.Errorf("apply custom hooks: %w", err) + if err := mgr.applyCustomHooks(ctx, logIndex, logEntry.CustomHooksUpdate); err != nil { + return fmt.Errorf("apply custom hooks: %w", err) + } } if err := mgr.storeAppliedLogIndex(logIndex); err != nil { @@ -1104,6 +1311,122 @@ func (mgr *TransactionManager) applyLogEntry(ctx context.Context, logIndex LogIn return nil } +// applyRepositoryCreation applies a repository creation by creating a repository. 
+func (mgr *TransactionManager) applyRepositoryCreation(ctx context.Context, entry *gitalypb.LogEntry_RepositoryCreation) error { + if entry == nil { + return nil + } + + if err := mgr.createRepository(ctx, mgr.repositoryPath, entry.ObjectFormat); err != nil { + return fmt.Errorf("create repository: %w", err) + } + + if err := mgr.createDirectories(); err != nil { + return fmt.Errorf("create directories: %w", err) + } + + // Sync the parent directory. We expect that git syncs its own writes. + if err := safe.NewSyncer().Sync(filepath.Dir(mgr.repositoryPath)); err != nil { + return fmt.Errorf("sync: %w", err) + } + + return nil +} + +func (mgr *TransactionManager) createRepository(ctx context.Context, repositoryPath string, objectFormat gitalypb.ObjectFormat) error { + objectHash, err := git.ObjectHashByProto(objectFormat) + if err != nil { + return fmt.Errorf("object hash by proto: %w", err) + } + + stderr := &bytes.Buffer{} + cmd, err := mgr.commandFactory.NewWithoutRepo(ctx, git.Command{ + Name: "init", + Flags: []git.Option{ + git.Flag{Name: "--bare"}, + git.Flag{Name: "--quiet"}, + git.Flag{Name: "--object-format=" + objectHash.Format}, + }, + Args: []string{repositoryPath}, + }, git.WithStderr(stderr)) + if err != nil { + return fmt.Errorf("spawn git init: %w", err) + } + + if err := cmd.Wait(); err != nil { + return structerr.New("wait git init: %w", err).WithMetadata("stderr", stderr.String()) + } + + return nil +} + +// applyRepositoryDeletion deletes the repository. +// +// Given how the repositories are laid out in the storage, we currently can't support MVCC for them. +// This is because there is only ever a single instance of a given repository. We have to wait for all +// of the readers to finish before we can delete the repository as otherwise the readers could fail in +// unexpected ways and it would be an isolation violation. Repository deletions thus block until all +// transactions with an older read snapshot are done with the repository. 
+func (mgr *TransactionManager) applyRepositoryDeletion(ctx context.Context, index LogIndex) error { + for { + mgr.mutex.Lock() + oldestElement := mgr.openTransactions.Front() + mgr.mutex.Unlock() + if oldestElement == nil { + // If there are no open transactions, the deletion can proceed as there are + // no readers. + // + // Any new transaction would have the deletion in their snapshot, and are waiting + // for it to be applied prior to beginning. + break + } + + oldestTransaction := oldestElement.Value.(*Transaction) + if oldestTransaction.snapshot.ReadIndex >= index { + // If the oldest transaction is reading at this or later log index, it already has the deletion + // in its snapshot, and is waiting for it to be applied. Proceed with the deletion as there + // are no readers with the pre-deletion state in the snapshot. + break + } + + for { + select { + case <-oldestTransaction.finished: + // The oldest transaction finished. Proceed to check the second oldest open transaction. + case transaction := <-mgr.admissionQueue: + // The oldest transaction could also be waiting to commit. Since the Run goroutine is + // blocked here waiting for the transaction to finish, the write would never be admitted + // for processing, leading to a deadlock. Since the repository was deleted, the only correct + // outcome for the transaction would be to receive a not found error. Admit the transaction, + // and finish it with the correct result so we can unblock the deletion. 
+ transaction.result <- ErrRepositoryNotFound + if err := transaction.finish(); err != nil { + return fmt.Errorf("finish transaction: %w", err) + } + + continue + case <-ctx.Done(): + } + + if err := ctx.Err(); err != nil { + return err + } + + break + } + } + + if err := os.RemoveAll(mgr.repositoryPath); err != nil { + return fmt.Errorf("remove repository: %w", err) + } + + if err := safe.NewSyncer().Sync(filepath.Dir(mgr.repositoryPath)); err != nil { + return fmt.Errorf("sync: %w", err) + } + + return nil +} + // applyPackFile unpacks the objects from the pack file into the repository if the log entry // has an associated pack file. func (mgr *TransactionManager) applyPackFile(ctx context.Context, logIndex LogIndex) error { diff --git a/internal/gitaly/transaction_manager_test.go b/internal/gitaly/transaction_manager_test.go index c2c68f76eaf5f6b060d7504253fbd76cf40c9f16..43b38b1ee00e0716b35e31f87368230a4d7b8e1e 100644 --- a/internal/gitaly/transaction_manager_test.go +++ b/internal/gitaly/transaction_manager_test.go @@ -131,6 +131,7 @@ func TestTransactionManager(t *testing.T) { type testSetup struct { Config config.Cfg RepositoryFactory localrepo.StorageScopedFactory + CommandFactory git.CommandFactory ObjectHash git.ObjectHash NonExistentOID git.ObjectID Commits testCommits @@ -194,6 +195,7 @@ func TestTransactionManager(t *testing.T) { Config: cfg, ObjectHash: objectHash, RepositoryFactory: repositoryFactory, + CommandFactory: cmdFactory, NonExistentOID: nonExistentOID, Commits: testCommits{ First: testCommit{ @@ -266,6 +268,8 @@ func TestTransactionManager(t *testing.T) { // TransactionID is the identifier given to the transaction created. This is used to identify // the transaction in later steps. TransactionID int + + AllowNonExistent bool // Context is the context to use for the Begin call. Context context.Context // ExpectedSnapshot is the expected snapshot of the transaction. 
@@ -293,6 +297,10 @@ func TestTransactionManager(t *testing.T) { DefaultBranchUpdate *DefaultBranchUpdate // CustomHooksUpdate is the custom hooks update to commit. CustomHooksUpdate *CustomHooksUpdate + // DeleteRepository deletes the repository on commit. + DeleteRepository bool + // CreateRepository creates the repository on commit. + CreateRepository bool } // Rollback calls Rollback on a transaction. @@ -307,8 +315,15 @@ func TestTransactionManager(t *testing.T) { ExpectedObjects []git.ObjectID } + // RemoveRepository removes the repository from the disk. It must be run with the TransactionManager + // stopped. + type RemoveRepository struct{} + // StateAssertions models an assertion of the entire state managed by the TransactionManager. type StateAssertion struct { + // RepositoryDoesntExist indicates the repository should not exist. If so, we just check that's + // the case and don't assert the other state. + RepositoryDoesntExist bool // DefaultBranch is the expected refname that HEAD points to. DefaultBranch git.ReferenceName // References is the expected state of references. 
@@ -2327,6 +2342,484 @@ func TestTransactionManager(t *testing.T) { }, }, }, + { + desc: "begin fails after repository deletion", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + Begin{ + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "repository deletion fails if repository is deleted", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + Commit{ + TransactionID: 2, + DeleteRepository: true, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "custom hooks update fails if repository is deleted", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + Commit{ + TransactionID: 2, + CustomHooksUpdate: &CustomHooksUpdate{validCustomHooks(t)}, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "reference updates fail if repository is deleted", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: 
StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "default branch update fails if repository is deleted", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + Commit{ + TransactionID: 2, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/new-default", + }, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "logged repository deletions are considered after restart", + steps: steps{ + StartManager{ + Hooks: testHooks{ + BeforeApplyLogEntry: func(hookContext) { + panic(errSimulatedCrash) + }, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + AssertManager{ + ExpectedError: errSimulatedCrash, + }, + StartManager{}, + Begin{ + TransactionID: 2, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "reapplying repository deletion works", + steps: steps{ + StartManager{ + Hooks: testHooks{ + BeforeStoreAppliedLogIndex: func(hookContext) { + panic(errSimulatedCrash) + }, + }, + ExpectedError: errSimulatedCrash, + }, + Begin{ + TransactionID: 1, + }, + Commit{ + TransactionID: 1, + DeleteRepository: true, + }, + AssertManager{ + ExpectedError: errSimulatedCrash, + }, + StartManager{}, + Begin{ + TransactionID: 2, + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + 
string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + }, + }, + { + desc: "non-existent repository is correctly handled", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + ExpectedError: ErrRepositoryNotFound, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + }, + }, + { + // This is a serialization violation as the outcome would be different + // if the transactions were applied in different order. + desc: "deletion succeeds with concurrent writes to repository", + steps: steps{ + StartManager{}, + Begin{ + TransactionID: 1, + }, + Begin{ + TransactionID: 2, + }, + Commit{ + TransactionID: 1, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/branch", + }, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/branch": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), + }, + }, + Commit{ + TransactionID: 2, + DeleteRepository: true, + }, + }, + expectedState: StateAssertion{ + RepositoryDoesntExist: true, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(2).toProto(), + }, + }, + }, + { + desc: "failing initialization prevents transaction beginning", + steps: steps{ + StartManager{ + ModifyRepository: func(tb testing.TB, repoPath string) { + // Remove the repository's directory and create a file in its place + // to fail the initialization. + require.NoError(t, os.RemoveAll(repoPath)) + require.NoError(t, os.WriteFile(repoPath, nil, fs.ModePerm)) + }, + ExpectedError: errNotDirectory, + }, + Begin{ + ExpectedError: errInitializationFailed, + }, + AssertManager{ + ExpectedError: errNotDirectory, + }, + }, + expectedState: StateAssertion{ + // The file still exists on the disk but this skips the repository assertions. 
+ RepositoryDoesntExist: true, + }, + }, + { + desc: "create repository when it doesn't exist", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + AllowNonExistent: true, + }, + Commit{ + CreateRepository: true, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + Objects: []git.ObjectID{}, + }, + }, + { + desc: "create repository when it already exists", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + TransactionID: 1, + AllowNonExistent: true, + }, + Commit{ + TransactionID: 1, + CreateRepository: true, + }, + Begin{ + TransactionID: 2, + AllowNonExistent: true, + ExpectedError: ErrRepositoryAlreadyExists, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + Objects: []git.ObjectID{}, + }, + }, + { + desc: "create repository with interleaved creations", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + TransactionID: 1, + AllowNonExistent: true, + }, + Begin{ + TransactionID: 2, + AllowNonExistent: true, + }, + Commit{ + TransactionID: 1, + CreateRepository: true, + }, + Commit{ + TransactionID: 2, + CreateRepository: true, + ExpectedError: ErrRepositoryAlreadyExists, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + Objects: []git.ObjectID{}, + }, + }, + { + desc: "create repository again after deletion", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + TransactionID: 1, + AllowNonExistent: true, + }, + Commit{ + TransactionID: 1, + CreateRepository: true, + }, + Begin{ + TransactionID: 2, + ExpectedSnapshot: Snapshot{ + ReadIndex: 1, + }, + }, + Commit{ + TransactionID: 2, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + }, + 
QuarantinedPacks: [][]byte{setup.Commits.First.Pack}, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), + }, + }, + Begin{ + TransactionID: 3, + ExpectedSnapshot: Snapshot{ + ReadIndex: 2, + HookIndex: 2, + }, + }, + Commit{ + TransactionID: 3, + DeleteRepository: true, + }, + Begin{ + TransactionID: 4, + AllowNonExistent: true, + ExpectedSnapshot: Snapshot{ + ReadIndex: 3, + }, + }, + Commit{ + TransactionID: 4, + CreateRepository: true, + }, + Begin{ + TransactionID: 5, + ExpectedSnapshot: Snapshot{ + ReadIndex: 4, + }, + }, + Rollback{ + TransactionID: 5, + }, + }, + expectedState: StateAssertion{ + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(4).toProto(), + }, + Objects: []git.ObjectID{}, + }, + }, + { + desc: "create repository with full state", + steps: steps{ + RemoveRepository{}, + StartManager{}, + Begin{ + AllowNonExistent: true, + }, + Commit{ + CreateRepository: true, + ReferenceUpdates: ReferenceUpdates{ + "refs/heads/main": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.First.OID}, + "refs/heads/branch": {OldOID: setup.ObjectHash.ZeroOID, NewOID: setup.Commits.Second.OID}, + }, + QuarantinedPacks: [][]byte{setup.Commits.First.Pack, setup.Commits.Second.Pack}, + DefaultBranchUpdate: &DefaultBranchUpdate{ + Reference: "refs/heads/branch", + }, + CustomHooksUpdate: &CustomHooksUpdate{ + CustomHooksTAR: validCustomHooks(t), + }, + }, + }, + expectedState: StateAssertion{ + DefaultBranch: "refs/heads/branch", + References: []git.Reference{ + {Name: "refs/heads/main", Target: setup.Commits.First.OID.String()}, + {Name: "refs/heads/branch", Target: setup.Commits.Second.OID.String()}, + }, + Database: DatabaseState{ + string(keyAppliedLogIndex(relativePath)): LogIndex(1).toProto(), + }, + Directory: testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs/1.pack": 
packFileDirectoryEntry( + setup.Config, + umask.Mask(perm.PrivateFile), + []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + setup.Commits.Second.OID, + }, + ), + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks/1": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks/1/pre-receive": { + Mode: umask.Mask(fs.ModePerm), + Content: []byte("hook content"), + }, + "/wal/hooks/1/private-dir": {Mode: umask.Mask(fs.ModeDir | perm.PrivateDir)}, + "/wal/hooks/1/private-dir/private-file": {Mode: umask.Mask(perm.PrivateFile), Content: []byte("private content")}, + }, + Objects: []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + setup.Commits.Second.OID, + }, + }, + }, } type invalidReferenceTestCase struct { @@ -2461,14 +2954,15 @@ func TestTransactionManager(t *testing.T) { require.NoError(t, err) defer testhelper.MustClose(t, database) - stagingDir := t.TempDir() + stagingDir := filepath.Join(setup.Config.Storages[0].Path, "staging") + require.NoError(t, os.MkdirAll(stagingDir, perm.PrivateDir)) storagePath := setup.Config.Storages[0].Path var ( // managerRunning tracks whether the manager is running or stopped. managerRunning bool // transactionManager is the current TransactionManager instance. - transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer) + transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, setup.CommandFactory, noopTransactionFinalizer) // managerErr is used for synchronizing manager stopping and returning // the error from Run. 
managerErr chan error @@ -2509,7 +3003,7 @@ func TestTransactionManager(t *testing.T) { managerRunning = true managerErr = make(chan error) - transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, noopTransactionFinalizer) + transactionManager = NewTransactionManager(database, storagePath, relativePath, stagingDir, setup.RepositoryFactory, setup.CommandFactory, noopTransactionFinalizer) installHooks(t, transactionManager, database, hooks{ beforeReadLogEntry: step.Hooks.BeforeApplyLogEntry, beforeResolveRevision: step.Hooks.BeforeAppendLogEntry, @@ -2551,7 +3045,15 @@ func TestTransactionManager(t *testing.T) { beginCtx = step.Context } - transaction, err := transactionManager.Begin(beginCtx) + var transaction *Transaction + var err error + + if step.AllowNonExistent { + transaction, err = transactionManager.BeginCreation(beginCtx) + } else { + transaction, err = transactionManager.Begin(beginCtx) + } + require.Equal(t, step.ExpectedError, err) if err == nil { require.Equal(t, step.ExpectedSnapshot, transaction.Snapshot()) @@ -2599,6 +3101,14 @@ func TestTransactionManager(t *testing.T) { } + if step.DeleteRepository { + transaction.DeleteRepository() + } + + if step.CreateRepository { + transaction.CreateRepository(setup.ObjectHash) + } + commitCtx := ctx if step.Context != nil { commitCtx = step.Context @@ -2611,6 +3121,8 @@ func TestTransactionManager(t *testing.T) { case Prune: gittest.Exec(t, setup.Config, "-C", repoPath, "prune") require.ElementsMatch(t, step.ExpectedObjects, gittest.ListObjects(t, setup.Config, repoPath)) + case RemoveRepository: + require.NoError(t, os.RemoveAll(repoPath)) default: t.Fatalf("unhandled step type: %T", step) } @@ -2621,34 +3133,40 @@ func TestTransactionManager(t *testing.T) { require.NoError(t, err) } - RequireReferences(t, ctx, repo, tc.expectedState.References) - RequireDefaultBranch(t, ctx, repo, tc.expectedState.DefaultBranch) RequireDatabase(t, ctx, database, 
tc.expectedState.Database) - expectedDirectory := tc.expectedState.Directory - if expectedDirectory == nil { - // Set the base state as the default so we don't have to repeat it in every test case but it - // gets asserted. - expectedDirectory = testhelper.DirectoryState{ - "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, - "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, - "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + if !tc.expectedState.RepositoryDoesntExist { + require.DirExists(t, repoPath) + RequireReferences(t, ctx, repo, tc.expectedState.References) + RequireDefaultBranch(t, ctx, repo, tc.expectedState.DefaultBranch) + + expectedDirectory := tc.expectedState.Directory + if expectedDirectory == nil { + // Set the base state as the default so we don't have to repeat it in every test case but it + // gets asserted. + expectedDirectory = testhelper.DirectoryState{ + "/wal": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/packs": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + "/wal/hooks": {Mode: umask.Mask(fs.ModeDir | fs.ModePerm)}, + } } - } - testhelper.RequireDirectoryState(t, repoPath, "wal", expectedDirectory) + testhelper.RequireDirectoryState(t, repoPath, "wal", expectedDirectory) - expectedObjects := tc.expectedState.Objects - if expectedObjects == nil { - expectedObjects = []git.ObjectID{ - setup.ObjectHash.EmptyTreeOID, - setup.Commits.First.OID, - setup.Commits.Second.OID, - setup.Commits.Third.OID, + expectedObjects := tc.expectedState.Objects + if expectedObjects == nil { + expectedObjects = []git.ObjectID{ + setup.ObjectHash.EmptyTreeOID, + setup.Commits.First.OID, + setup.Commits.Second.OID, + setup.Commits.Third.OID, + } } - } - require.ElementsMatch(t, expectedObjects, gittest.ListObjects(t, setup.Config, repoPath)) + require.ElementsMatch(t, expectedObjects, gittest.ListObjects(t, setup.Config, repoPath)) + } else { + require.NoDirExists(t, repoPath) + } entries, err := os.ReadDir(stagingDir) 
require.NoError(t, err) @@ -2664,6 +3182,7 @@ func checkManagerError(t *testing.T, managerErrChannel chan error, mgr *Transact referenceUpdates: ReferenceUpdates{"sentinel": {}}, result: make(chan error, 1), finalize: func() {}, + finish: func() error { return nil }, } var ( @@ -2804,7 +3323,7 @@ func BenchmarkTransactionManager(b *testing.B) { commit1 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents()) commit2 = gittest.WriteCommit(b, cfg, repoPath, gittest.WithParents(commit1)) - manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), repositoryFactory, noopTransactionFinalizer) + manager := NewTransactionManager(database, cfg.Storages[0].Path, repo.RelativePath, b.TempDir(), repositoryFactory, cmdFactory, noopTransactionFinalizer) managers = append(managers, manager) managerWG.Add(1) diff --git a/proto/go/gitalypb/log.pb.go b/proto/go/gitalypb/log.pb.go index 73fb1014882d020ed0505ffd70d6c94f89046a9e..8d2f8db00fe3a683606d4d8d634865ab3d144126 100644 --- a/proto/go/gitalypb/log.pb.go +++ b/proto/go/gitalypb/log.pb.go @@ -41,6 +41,10 @@ type LogEntry struct { // includes_pack denotes whether this log entry has a pack file associated // with it. The pack files are stored separately on the filesystem. IncludesPack bool `protobuf:"varint,4,opt,name=includes_pack,json=includesPack,proto3" json:"includes_pack,omitempty"` + // RepositoryDeletion, when set, indicates this log entry deletes the repository. + RepositoryDeletion *LogEntry_RepositoryDeletion `protobuf:"bytes,5,opt,name=repository_deletion,json=repositoryDeletion,proto3" json:"repository_deletion,omitempty"` + // repository_creation is set if this log entry creates a repository. 
+ RepositoryCreation *LogEntry_RepositoryCreation `protobuf:"bytes,6,opt,name=repository_creation,json=repositoryCreation,proto3" json:"repository_creation,omitempty"` } func (x *LogEntry) Reset() { @@ -103,6 +107,20 @@ func (x *LogEntry) GetIncludesPack() bool { return false } +func (x *LogEntry) GetRepositoryDeletion() *LogEntry_RepositoryDeletion { + if x != nil { + return x.RepositoryDeletion + } + return nil +} + +func (x *LogEntry) GetRepositoryCreation() *LogEntry_RepositoryCreation { + if x != nil { + return x.RepositoryCreation + } + return nil +} + // LogIndex serializes a log index. It's used for storing a repository's // applied log index in the database. // @@ -318,49 +336,155 @@ func (x *LogEntry_CustomHooksUpdate) GetCustomHooksTar() []byte { return nil } +// RepositoryDeletion models a repository deletion. +type LogEntry_RepositoryDeletion struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *LogEntry_RepositoryDeletion) Reset() { + *x = LogEntry_RepositoryDeletion{} + if protoimpl.UnsafeEnabled { + mi := &file_log_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LogEntry_RepositoryDeletion) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogEntry_RepositoryDeletion) ProtoMessage() {} + +func (x *LogEntry_RepositoryDeletion) ProtoReflect() protoreflect.Message { + mi := &file_log_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogEntry_RepositoryDeletion.ProtoReflect.Descriptor instead. +func (*LogEntry_RepositoryDeletion) Descriptor() ([]byte, []int) { + return file_log_proto_rawDescGZIP(), []int{0, 3} +} + +// RepositoryCreation models a log entry that creates a repository. 
+type LogEntry_RepositoryCreation struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // object_format defines the object format to use for the repository. + ObjectFormat ObjectFormat `protobuf:"varint,1,opt,name=object_format,json=objectFormat,proto3,enum=gitaly.ObjectFormat" json:"object_format,omitempty"` +} + +func (x *LogEntry_RepositoryCreation) Reset() { + *x = LogEntry_RepositoryCreation{} + if protoimpl.UnsafeEnabled { + mi := &file_log_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LogEntry_RepositoryCreation) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LogEntry_RepositoryCreation) ProtoMessage() {} + +func (x *LogEntry_RepositoryCreation) ProtoReflect() protoreflect.Message { + mi := &file_log_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LogEntry_RepositoryCreation.ProtoReflect.Descriptor instead. 
+func (*LogEntry_RepositoryCreation) Descriptor() ([]byte, []int) { + return file_log_proto_rawDescGZIP(), []int{0, 4} +} + +func (x *LogEntry_RepositoryCreation) GetObjectFormat() ObjectFormat { + if x != nil { + return x.ObjectFormat + } + return ObjectFormat_OBJECT_FORMAT_UNSPECIFIED +} + var File_log_proto protoreflect.FileDescriptor var file_log_proto_rawDesc = []byte{ 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x22, 0xfc, 0x03, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x12, 0x4d, 0x0a, 0x11, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x67, 0x69, - 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, - 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x10, 0x72, - 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x12, - 0x58, 0x0a, 0x15, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x5f, 0x62, 0x72, 0x61, 0x6e, 0x63, - 0x68, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, - 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x2e, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x55, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x52, 0x13, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, - 0x6e, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x52, 0x0a, 0x13, 0x63, 0x75, 0x73, - 0x74, 0x6f, 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, - 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 
0x65, 0x52, 0x11, 0x63, 0x75, 0x73, 0x74, - 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x23, 0x0a, - 0x0d, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x50, 0x61, - 0x63, 0x6b, 0x1a, 0x51, 0x0a, 0x0f, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, - 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, - 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, - 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, - 0x6e, 0x65, 0x77, 0x5f, 0x6f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, - 0x65, 0x77, 0x4f, 0x69, 0x64, 0x1a, 0x3c, 0x0a, 0x13, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, - 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, - 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, - 0x61, 0x6d, 0x65, 0x1a, 0x3d, 0x0a, 0x11, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, - 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x63, 0x75, 0x73, 0x74, - 0x6f, 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x74, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x0e, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54, - 0x61, 0x72, 0x22, 0x27, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1b, - 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x34, 0x5a, 0x32, 0x67, - 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 
0x62, - 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, - 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x6c, 0x79, 0x1a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x65, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x22, 0x8f, 0x06, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x4d, + 0x0a, 0x11, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x65, + 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x10, 0x72, 0x65, 0x66, + 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x12, 0x58, 0x0a, + 0x15, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x5f, 0x62, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x5f, + 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x67, + 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x44, + 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x52, 0x13, 0x64, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, + 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x52, 0x0a, 0x13, 0x63, 0x75, 0x73, 0x74, 0x6f, + 0x6d, 0x5f, 0x68, 0x6f, 0x6f, 0x6b, 0x73, 0x5f, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, + 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, + 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x52, 0x11, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, + 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 
0x23, 0x0a, 0x0d, 0x69, + 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x0c, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x73, 0x50, 0x61, 0x63, 0x6b, + 0x12, 0x54, 0x0a, 0x13, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x5f, 0x64, + 0x65, 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, + 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x12, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, + 0x6c, 0x65, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x54, 0x0a, 0x13, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x5f, 0x63, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x06, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4c, 0x6f, 0x67, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, + 0x43, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x12, 0x72, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x43, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x51, 0x0a, 0x0f, + 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, + 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, + 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x6e, 0x65, 0x77, 0x5f, 0x6f, 0x69, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6e, 0x65, 0x77, 0x4f, 0x69, 0x64, 0x1a, + 0x3c, 0x0a, 0x13, 0x44, 0x65, 0x66, 0x61, 0x75, 0x6c, 0x74, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, + 0x6e, 0x63, 
0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0d, + 0x72, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x1a, 0x3d, 0x0a, + 0x11, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x55, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x12, 0x28, 0x0a, 0x10, 0x63, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x5f, 0x68, 0x6f, 0x6f, + 0x6b, 0x73, 0x5f, 0x74, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x63, 0x75, + 0x73, 0x74, 0x6f, 0x6d, 0x48, 0x6f, 0x6f, 0x6b, 0x73, 0x54, 0x61, 0x72, 0x1a, 0x14, 0x0a, 0x12, + 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x69, + 0x6f, 0x6e, 0x1a, 0x4f, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, + 0x43, 0x72, 0x65, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x39, 0x0a, 0x0d, 0x6f, 0x62, 0x6a, 0x65, + 0x63, 0x74, 0x5f, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x14, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x46, + 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x52, 0x0c, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x46, 0x6f, 0x72, + 0x6d, 0x61, 0x74, 0x22, 0x27, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, + 0x1b, 0x0a, 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x42, 0x34, 0x5a, 0x32, + 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, + 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, + 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -375,23 +499,29 @@ func file_log_proto_rawDescGZIP() []byte { return file_log_proto_rawDescData } -var file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var 
file_log_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_log_proto_goTypes = []interface{}{ (*LogEntry)(nil), // 0: gitaly.LogEntry (*LogIndex)(nil), // 1: gitaly.LogIndex (*LogEntry_ReferenceUpdate)(nil), // 2: gitaly.LogEntry.ReferenceUpdate (*LogEntry_DefaultBranchUpdate)(nil), // 3: gitaly.LogEntry.DefaultBranchUpdate (*LogEntry_CustomHooksUpdate)(nil), // 4: gitaly.LogEntry.CustomHooksUpdate + (*LogEntry_RepositoryDeletion)(nil), // 5: gitaly.LogEntry.RepositoryDeletion + (*LogEntry_RepositoryCreation)(nil), // 6: gitaly.LogEntry.RepositoryCreation + (ObjectFormat)(0), // 7: gitaly.ObjectFormat } var file_log_proto_depIdxs = []int32{ 2, // 0: gitaly.LogEntry.reference_updates:type_name -> gitaly.LogEntry.ReferenceUpdate 3, // 1: gitaly.LogEntry.default_branch_update:type_name -> gitaly.LogEntry.DefaultBranchUpdate 4, // 2: gitaly.LogEntry.custom_hooks_update:type_name -> gitaly.LogEntry.CustomHooksUpdate - 3, // [3:3] is the sub-list for method output_type - 3, // [3:3] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 5, // 3: gitaly.LogEntry.repository_deletion:type_name -> gitaly.LogEntry.RepositoryDeletion + 6, // 4: gitaly.LogEntry.repository_creation:type_name -> gitaly.LogEntry.RepositoryCreation + 7, // 5: gitaly.LogEntry.RepositoryCreation.object_format:type_name -> gitaly.ObjectFormat + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_log_proto_init() } @@ -399,6 +529,7 @@ func file_log_proto_init() { if File_log_proto != nil { return } + file_shared_proto_init() if !protoimpl.UnsafeEnabled { file_log_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} 
{ switch v := v.(*LogEntry); i { @@ -460,6 +591,30 @@ func file_log_proto_init() { return nil } } + file_log_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LogEntry_RepositoryDeletion); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_log_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LogEntry_RepositoryCreation); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -467,7 +622,7 @@ func file_log_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_log_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 7, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/log.proto b/proto/log.proto index 9ca6c32ac9d9a68f9ec9f074c8850e86d7842c9a..564ddd1a1e484cf7469b00f4b7d55db9933cf6d5 100644 --- a/proto/log.proto +++ b/proto/log.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package gitaly; +import "shared.proto"; + option go_package = "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"; // LogEntry is a single entry in a repository's write-ahead log. @@ -35,6 +37,16 @@ message LogEntry { bytes custom_hooks_tar = 1; } + // RepositoryDeletion models a repository deletion. + message RepositoryDeletion { + } + + // RepositoryCreation models a log entry that creates a repository. + message RepositoryCreation { + // object_format defines the object format to use for the repository. + ObjectFormat object_format = 1; + } + // reference_updates contains the reference updates this log // entry records. The logged reference updates have already passed // through verification and are applied without any further checks. @@ -47,6 +59,10 @@ message LogEntry { // includes_pack denotes whether this log entry has a pack file associated // with it. 
The pack files are stored separately on the filesystem. bool includes_pack = 4; + // repository_deletion, when set, indicates this log entry deletes the repository. + RepositoryDeletion repository_deletion = 5; + // repository_creation is set if this log entry creates a repository. + RepositoryCreation repository_creation = 6; } // LogIndex serializes a log index. It's used for storing a repository's