diff --git a/internal/backup/repository.go b/internal/backup/repository.go index 7d798480a4d7b751aedbdff3a065890e1f2c009f..cd5e554b8179fb933c3189e2b193f6cb31c97d17 100644 --- a/internal/backup/repository.go +++ b/internal/backup/repository.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -711,6 +712,12 @@ func (r *localRepository) Create(ctx context.Context, hash git.ObjectHash, defau return fmt.Errorf("local repository: create: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, r.repo.GetRelativePath()); err != nil { + return fmt.Errorf("recording migration key: %w", err) + } + } + // Recreate the local repository, since the cache of object hash and ref-format needs // to be invalidated. 
r.repo = localrepo.New(r.logger, r.locator, r.gitCmdFactory, r.catfileCache, r.repo) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 00e5dfd4e4dfd417f24f37979fe3d89318a04937..f4a27a343868238380028e6311f2831ddb6b9c38 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -39,6 +39,7 @@ import ( nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" @@ -409,11 +410,13 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { storagemgr.NewFactory( logger, dbMgr, - partition.NewFactory( - gitCmdFactory, - localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), - partitionMetrics, - logConsumer, + migration.NewFactory( + partition.NewFactory( + gitCmdFactory, + localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), + partitionMetrics, + logConsumer, + ), ), 2, storageMetrics, @@ -455,11 +458,13 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { storagemgr.NewFactory( logger, dbMgr, - partition.NewFactory( - gitCmdFactory, - localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), - partitionMetrics, - nil, + migration.NewFactory( + partition.NewFactory( + gitCmdFactory, + localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), + partitionMetrics, + nil, + ), ), // In recovery mode we don't want to keep inactive partitions active. The cache // however can't be disabled so simply set it to one. 
diff --git a/internal/cli/gitaly/subcmd_recovery.go b/internal/cli/gitaly/subcmd_recovery.go index b348e9ba5b0cfe4abcd5b49fbb07dba280840190..ba71551c2189c241e663459d66213310d8910b13 100644 --- a/internal/cli/gitaly/subcmd_recovery.go +++ b/internal/cli/gitaly/subcmd_recovery.go @@ -22,6 +22,7 @@ import ( nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -122,11 +123,13 @@ func recoveryStatusAction(ctx *cli.Context) (returnErr error) { storagemgr.NewFactory( logger, dbMgr, - partition.NewFactory( - gitCmdFactory, - localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), - partitionMetrics, - nil, + migration.NewFactory( + partition.NewFactory( + gitCmdFactory, + localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), + partitionMetrics, + nil, + ), ), 1, storageMetrics, diff --git a/internal/gitaly/service/objectpool/create.go b/internal/gitaly/service/objectpool/create.go index 21dcadc63874cd0bc8de5f12d10d63c08752dfe4..232858357921ff862b5ba3f8435fca552e897922 100644 --- a/internal/gitaly/service/objectpool/create.go +++ b/internal/gitaly/service/objectpool/create.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/objectpool" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -60,5 +61,11 @@ func (s *server) 
CreateObjectPool(ctx context.Context, in *gitalypb.CreateObject return nil, structerr.New("creating object pool: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, poolRepo.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } + } + return &gitalypb.CreateObjectPoolResponse{}, nil } diff --git a/internal/gitaly/service/objectpool/delete.go b/internal/gitaly/service/objectpool/delete.go index 3ccd06700a6611a156b2d2bd224fc9cacd2a3bdb..010a7674c529f03aa4f2aef973f1ad6c43b3eaf1 100644 --- a/internal/gitaly/service/objectpool/delete.go +++ b/internal/gitaly/service/objectpool/delete.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/objectpool" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -28,6 +29,9 @@ func (s *server) DeleteObjectPool(ctx context.Context, in *gitalypb.DeleteObject if tx := storage.ExtractTransaction(ctx); tx != nil { tx.DeleteRepository() + if err := migration.RecordKeyDeletion(tx, pool.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } } return &gitalypb.DeleteObjectPoolResponse{}, nil diff --git a/internal/gitaly/service/repository/create_fork.go b/internal/gitaly/service/repository/create_fork.go index 89b5b245c94025ee49295756d17abe78fb4835eb..5f0c1c82bbb520a71692c74b9b7c54ccc2f374ee 100644 --- a/internal/gitaly/service/repository/create_fork.go +++ b/internal/gitaly/service/repository/create_fork.go @@ -10,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -115,5 +116,11 @@ func (s *server) CreateFork(ctx context.Context, req *gitalypb.CreateForkRequest return nil, structerr.NewInternal("creating fork: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, targetRepository.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } + } + return &gitalypb.CreateForkResponse{}, nil } diff --git a/internal/gitaly/service/repository/create_repository.go b/internal/gitaly/service/repository/create_repository.go index 75c2d83d43da51690d8d8309fbedfe60d97056f7..a189b29349f661a11c32b35049b3bc14ff7e4889 100644 --- a/internal/gitaly/service/repository/create_repository.go +++ b/internal/gitaly/service/repository/create_repository.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -41,5 +42,11 @@ func (s *server) CreateRepository(ctx context.Context, req *gitalypb.CreateRepos return nil, structerr.NewInternal("creating repository: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, repository.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } + } + return &gitalypb.CreateRepositoryResponse{}, nil } diff --git a/internal/gitaly/service/repository/create_repository_from_bundle.go b/internal/gitaly/service/repository/create_repository_from_bundle.go index 
e20d137237c339d91992cd0dbd832fc799f49c59..b191b05bb722741f5b8402b063018222f3db9446 100644 --- a/internal/gitaly/service/repository/create_repository_from_bundle.go +++ b/internal/gitaly/service/repository/create_repository_from_bundle.go @@ -3,6 +3,7 @@ package repository import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v16/streamio" @@ -44,5 +45,11 @@ func (s *server) CreateRepositoryFromBundle(stream gitalypb.RepositoryService_Cr return structerr.NewInternal("creating repository: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, repo.GetRelativePath()); err != nil { + return structerr.NewInternal("recording migration key: %w", err) + } + } + return stream.SendAndClose(&gitalypb.CreateRepositoryFromBundleResponse{}) } diff --git a/internal/gitaly/service/repository/create_repository_from_snapshot.go b/internal/gitaly/service/repository/create_repository_from_snapshot.go index afc88d0902e68b3666996d9c7165fd4cf69d5dc1..bdf94c3ff581bce9c5505dc35302363a68cd8e9a 100644 --- a/internal/gitaly/service/repository/create_repository_from_snapshot.go +++ b/internal/gitaly/service/repository/create_repository_from_snapshot.go @@ -11,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/command" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" @@ -148,5 +149,11 @@ func (s *server) 
CreateRepositoryFromSnapshot(ctx context.Context, in *gitalypb. return nil, structerr.NewInternal("creating repository: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, repository.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } + } + return &gitalypb.CreateRepositoryFromSnapshotResponse{}, nil } diff --git a/internal/gitaly/service/repository/create_repository_from_url.go b/internal/gitaly/service/repository/create_repository_from_url.go index 9daea60360fdbb04cee47c00d214b50fe1f593b9..de7ef11c19971ff83ba2bc6e857e41fc6d3bcd25 100644 --- a/internal/gitaly/service/repository/create_repository_from_url.go +++ b/internal/gitaly/service/repository/create_repository_from_url.go @@ -14,6 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -137,6 +138,12 @@ func (s *server) CreateRepositoryFromURL(ctx context.Context, req *gitalypb.Crea return nil, structerr.NewInternal("creating repository: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, req.GetRepository().GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } + } + return &gitalypb.CreateRepositoryFromURLResponse{}, nil } diff --git a/internal/gitaly/service/repository/remove.go b/internal/gitaly/service/repository/remove.go index 551e8e8d1049d45f956c002e1f134bd78e70d51e..42a4bca7b62338f0c5b0f8b47419d5323daa0ebe 100644 --- a/internal/gitaly/service/repository/remove.go +++ b/internal/gitaly/service/repository/remove.go @@ -5,6 +5,7 @@ import ( 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -21,6 +22,9 @@ func (s *server) RemoveRepository(ctx context.Context, in *gitalypb.RemoveReposi if tx := storage.ExtractTransaction(ctx); tx != nil { tx.DeleteRepository() + if err := migration.RecordKeyDeletion(tx, repository.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } } return &gitalypb.RemoveRepositoryResponse{}, nil diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go index 54bf5fb04af5ca594c90acfb5e9cc3e7ee7d60e2..0aaa1cd3dde39da556d835a16170fb10d0d0b1e5 100644 --- a/internal/gitaly/service/repository/replicate.go +++ b/internal/gitaly/service/repository/replicate.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" @@ -157,6 +158,12 @@ func (s *server) createFromSnapshot(ctx context.Context, source, target *gitalyp return fmt.Errorf("creating repository: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, target.GetRelativePath()); err != nil { + return fmt.Errorf("recording migration key: %w", err) + } + } + return nil } diff --git a/internal/gitaly/storage/storagemgr/partition/migration/factory.go 
b/internal/gitaly/storage/storagemgr/partition/migration/factory.go
new file mode 100644
index 0000000000000000000000000000000000000000..eed0e3ca0b5beafa7feeced9c5dad2eca9c865ae
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition/migration/factory.go
@@ -0,0 +1,32 @@
+package migration
+
+import (
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
+)
+
+// migrationFactory defines a partition factory that wraps another partition factory.
+type migrationFactory struct {
+	factory storagemgr.PartitionFactory
+}
+
+// NewFactory returns a partition factory that wraps partitions created by factory in a migration manager.
+func NewFactory(factory storagemgr.PartitionFactory) storagemgr.PartitionFactory {
+	return &migrationFactory{factory: factory}
+}
+
+// New creates a partition using the wrapped factory and wraps it with a migration manager.
+func (f migrationFactory) New(
+	logger log.Logger,
+	partitionID storage.PartitionID,
+	db keyvalue.Transactioner,
+	storageName string,
+	storagePath string,
+	absoluteStateDir string,
+	stagingDir string,
+) storagemgr.Partition {
+	partition := f.factory.New(logger, partitionID, db, storageName, storagePath, absoluteStateDir, stagingDir)
+	return NewPartition(partition, logger)
+}
diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager.go b/internal/gitaly/storage/storagemgr/partition/migration/manager.go
new file mode 100644
index 0000000000000000000000000000000000000000..2fe1e8c92d70083b401bfc15c9d1ce4752ecafef
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition/migration/manager.go
@@ -0,0 +1,266 @@
+package migration
+
+import (
+	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"sync"
+
+	"github.com/dgraph-io/badger/v4"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/log"
+)
+
+// migrationState defines the state of a migration for a repository.
+type migrationState struct {
+	// doneCh is closed once the repository has no ongoing migrations.
+	doneCh <-chan struct{}
+	// err indicates if there was an error during the migration process. It is set before doneCh
+	// is closed and must only be read after doneCh has been closed.
+	err error
+}
+
+// migrationManager coordinates executing repository migrations.
+type migrationManager struct {
+	storagemgr.Partition
+	// mu guards access to migrationStates.
+	mu     sync.Mutex
+	logger log.Logger
+	// migrations defines all migration jobs that are expected to be performed on a repository
+	// before it can process incoming transactions.
+	migrations []migration
+	// migrationStates defines the state of a repository migration and is used to block concurrent
+	// transactions on the same repository while a migration is pending.
+	migrationStates map[string]*migrationState
+}
+
+// NewPartition creates a migration manager that wraps the provided partition.
+func NewPartition(partition storagemgr.Partition, logger log.Logger) storagemgr.Partition {
+	return &migrationManager{
+		Partition:       partition,
+		logger:          logger,
+		migrations:      migrations,
+		migrationStates: map[string]*migrationState{},
+	}
+}
+
+// Begin runs any outstanding migrations for the repository targeted by opts before beginning the
+// requested transaction on the wrapped partition. A failed migration fails the call.
+func (m *migrationManager) Begin(ctx context.Context, opts storage.BeginOptions) (storage.Transaction, error) {
+	if err := m.migrate(ctx, opts.RelativePaths); err != nil {
+		return nil, fmt.Errorf("migrate: %w", err)
+	}
+
+	return m.Partition.Begin(ctx, opts)
+}
+
+// migrate handles setting up migration state and executing outstanding migrations.
+func (m *migrationManager) migrate(ctx context.Context, relativePaths []string) error {
+	// To perform a migration, the manager must have migrations configured and the transaction must
+	// target a repository. If not, skip migration handling and proceed with the transaction.
+	if len(m.migrations) == 0 || len(relativePaths) == 0 {
+		return nil
+	}
+
+	// Migration state is keyed by the first relative path of the transaction.
+	relativePath := relativePaths[0]
+
+	// Check if the repository already has a pending migration.
+	m.mu.Lock()
+	state, ok := m.migrationStates[relativePath]
+	if !ok {
+		doneCh := make(chan struct{})
+		// Closing on return, after any error has been recorded in the state, releases the
+		// transactions waiting on this migration.
+		defer close(doneCh)
+		state = &migrationState{doneCh: doneCh}
+		m.migrationStates[relativePath] = state
+	}
+	m.mu.Unlock()
+
+	// Block concurrent transactions on the same repository until outstanding migrations complete.
+	if ok {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-state.doneCh:
+			if state.err != nil {
+				// Migrations are required to succeed before the repository can serve traffic.
+				return fmt.Errorf("waiting on migrations: %w", state.err)
+			}
+			return nil
+		}
+	}
+
+	// To avoid migration failures due to request context cancellation, a copy that is not canceled
+	// when parent is canceled is used.
+	if err := m.performMigrations(context.WithoutCancel(ctx), relativePaths); err != nil {
+		// Record the error as part of the migration state so concurrent transactions are notified.
+		state.err = err
+		return fmt.Errorf("performing migrations: %w", err)
+	}
+
+	return nil
+}
+
+// performMigrations performs any missing migrations on a repository.
+func (m *migrationManager) performMigrations(ctx context.Context, relativePaths []string) (returnedErr error) {
+	relativePath := relativePaths[0]
+
+	id, err := m.getLastMigrationID(ctx, relativePath)
+	if errors.Is(err, storage.ErrRepositoryNotFound) {
+		// If the repository is not found pretend the repository is up-to-date with migrations and
+		// let the downstream transaction set the migration key during repository creation.
+		return nil
+	} else if err != nil {
+		return fmt.Errorf("getting last migration: %w", err)
+	}
+
+	// If the repository is already up-to-date, there is no need to start a transaction and perform
+	// migrations.
+	maxID := m.migrations[len(m.migrations)-1].id
+	if id == maxID {
+		return nil
+	} else if id > maxID {
+		return fmt.Errorf("repository has invalid migration key: %d", id)
+	}
+
+	// Start a single transaction that records all outstanding migrations that get executed.
+	txn, err := m.Partition.Begin(ctx, storage.BeginOptions{
+		Write:         true,
+		RelativePaths: relativePaths,
+	})
+	if err != nil {
+		return fmt.Errorf("begin migration update: %w", err)
+	}
+	defer func() {
+		if returnedErr != nil {
+			if err := txn.Rollback(ctx); err != nil {
+				// Keep the original failure and attach the rollback failure to it.
+				returnedErr = errors.Join(returnedErr, fmt.Errorf("rollback: %w", err))
+			}
+		}
+	}()
+
+	for _, mig := range m.migrations {
+		if id >= mig.id {
+			continue
+		}
+
+		// A migration may have configuration allowing it to be disabled. As migrations are
+		// performed in order, if a disabled migration is encountered, the remaining migrations are
+		// also not executed. Since repository migrations are currently only attempted once for a
+		// repository during the partition lifetime, a previously disabled migration may not
+		// immediately be executed in the next transaction. Migration state must first be reset.
+		if mig.isDisabled != nil && mig.isDisabled(ctx) {
+			break
+		}
+
+		m.logger.WithFields(log.Fields{
+			"migration_id":  mig.id,
+			"relative_path": relativePath,
+		}).Info("running migration")
+
+		if err := mig.run(ctx, txn, relativePath); err != nil {
+			return fmt.Errorf("run migration: %w", err)
+		}
+	}
+
+	if err := txn.Commit(ctx); err != nil {
+		return fmt.Errorf("commit migration update: %w", err)
+	}
+
+	return nil
+}
+
+// getLastMigrationID returns the ID of the last executed migration for a repository. If the
+// repository does not exist on disk, storage.ErrRepositoryNotFound is returned.
+func (m *migrationManager) getLastMigrationID(ctx context.Context, relativePath string) (_ uint64, returnedErr error) {
+	item, repoExists, err := m.readMigrationKey(ctx, relativePath)
+	if err != nil {
+		return 0, fmt.Errorf("reading migration key: %w", err)
+	}
+
+	// If the repository does not exist, it is expected to be created by the downstream transaction.
+	if !repoExists {
+		return 0, storage.ErrRepositoryNotFound
+	}
+
+	// If the repository exists without a migration key, it means the repository has never had a
+	// migration run. All configured migrations should be run against the repository.
+	if item == nil {
+		return 0, nil
+	}
+
+	var id uint64
+	if err := item.Value(func(value []byte) error {
+		// binary.BigEndian.Uint64 panics on fewer than 8 bytes, so reject a malformed value
+		// with an error instead.
+		if len(value) < 8 {
+			return fmt.Errorf("invalid migration key value length: %d", len(value))
+		}
+		id = binary.BigEndian.Uint64(value)
+		return nil
+	}); err != nil {
+		return 0, fmt.Errorf("reading migration key value: %w", err)
+	}
+
+	return id, nil
+}
+
+// readMigrationKey returns the value for a repository migration key in a transaction and also
+// returns if the repository exists on disk. If no key exists, nil is returned for the item value.
+func (m *migrationManager) readMigrationKey(ctx context.Context, relativePath string) (_ keyvalue.Item, _ bool, returnedErr error) {
+	txn, err := m.Partition.Begin(ctx, storage.BeginOptions{RelativePaths: []string{relativePath}})
+	if err != nil {
+		return nil, false, fmt.Errorf("begin migration key transaction: %w", err)
+	}
+	defer func() {
+		if returnedErr != nil {
+			if err := txn.Rollback(ctx); err != nil {
+				// Keep the original failure and attach the rollback failure to it.
+				returnedErr = errors.Join(returnedErr, fmt.Errorf("rollback: %w", err))
+			}
+		}
+	}()
+
+	repoExists := true
+	item, err := txn.KV().Get(migrationKey(relativePath))
+	switch {
+	case errors.Is(err, badger.ErrKeyNotFound):
+		// If no migration key is found, it means either the repository is being created or the
+		// repository has never performed a migration before.
+		repoExists, err = checkRepoExists(filepath.Join(txn.Root(), relativePath))
+		if err != nil {
+			return nil, false, fmt.Errorf("check repo exists: %w", err)
+		}
+	case err != nil:
+		return nil, false, fmt.Errorf("getting migration key: %w", err)
+	}
+
+	if err := txn.Commit(ctx); err != nil {
+		return nil, false, fmt.Errorf("commit migration key transaction: %w", err)
+	}
+
+	// NOTE(review): the item is returned after its transaction has been committed; confirm that
+	// keyvalue.Item values remain readable once the transaction has finished.
+	return item, repoExists, nil
+}
+
+// checkRepoExists reports whether repoPath exists on disk. A path that does not exist is not
+// treated as an error; any other failure from os.Stat is returned.
+func checkRepoExists(repoPath string) (bool, error) {
+	if _, err := os.Stat(repoPath); err != nil {
+		if errors.Is(err, fs.ErrNotExist) {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..7bd552a0ca7553e11d1632a3901c5c1807614a9d
--- /dev/null
+++ b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go
@@ -0,0 +1,399 @@
+package migration
+
+import (
+	"context"
+	"errors"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/dgraph-io/badger/v4"
+	"github.com/stretchr/testify/require"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper"
+
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" +) + +func TestMigrationManager_Begin(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + disabledFn := func(context.Context) bool { return true } + migrationErr := errors.New("migration error") + + errFn := func(context.Context, storage.Transaction) error { return migrationErr } + recordingFn := func(id uint64) func(_ context.Context, txn storage.Transaction) error { + return func(_ context.Context, txn storage.Transaction) error { + return txn.KV().Set(uint64ToBytes(id), nil) + } + } + + migrationFn := func(id uint64) migration { + return migration{ + id: id, + fn: recordingFn(id), + } + } + + for _, tc := range []struct { + desc string + migrations []migration + startingMigration *migration + noRepository bool + expectedState *migrationState + expectedMigrationIDs map[uint64]struct{} + expectedErr error + }{ + { + desc: "no migrations configured", + migrations: nil, + expectedState: nil, + expectedMigrationIDs: nil, + }, + { + desc: "repository does not exist", + migrations: []migration{migrationFn(1)}, + startingMigration: nil, + noRepository: true, + expectedState: &migrationState{}, + expectedMigrationIDs: nil, + }, + { + desc: "no migration key in preexisting repository", + migrations: []migration{migrationFn(1), migrationFn(2)}, + startingMigration: nil, + noRepository: false, + expectedState: &migrationState{}, + expectedMigrationIDs: map[uint64]struct{}{1: {}, 2: {}}, + }, + { + desc: "no outstanding migrations", + migrations: []migration{migrationFn(1), migrationFn(2)}, + startingMigration: &migration{id: 2}, + expectedState: &migrationState{}, + expectedMigrationIDs: nil, + }, + { + desc: "single outstanding migration applied", + migrations: []migration{migrationFn(1), migrationFn(2)}, + startingMigration: &migration{id: 1}, + expectedState: &migrationState{}, + expectedMigrationIDs: map[uint64]struct{}{2: {}}, + }, + { + desc: "multiple outstanding migration applied", + 
migrations: []migration{migrationFn(1), migrationFn(2), migrationFn(3)}, + startingMigration: &migration{id: 1}, + expectedState: &migrationState{}, + expectedMigrationIDs: map[uint64]struct{}{2: {}, 3: {}}, + }, + { + desc: "disabled migration", + migrations: []migration{migrationFn(1), {id: 2, isDisabled: disabledFn, fn: recordingFn(2)}, migrationFn(3)}, + startingMigration: &migration{id: 0}, + expectedState: &migrationState{}, + expectedMigrationIDs: map[uint64]struct{}{1: {}}, + }, + { + desc: "error returned during migrations", + migrations: []migration{migrationFn(1), {id: 2, fn: errFn}, migrationFn(3)}, + startingMigration: &migration{id: 1}, + expectedState: &migrationState{ + err: migrationErr, + }, + expectedMigrationIDs: nil, + expectedErr: migrationErr, + }, + { + desc: "starting migration key invalid", + migrations: []migration{migrationFn(1), migrationFn(2), migrationFn(3)}, + startingMigration: &migration{id: 4}, + expectedState: &migrationState{ + err: errors.New("repository has invalid migration key: 4"), + }, + expectedMigrationIDs: nil, + expectedErr: errors.New("repository has invalid migration key: 4"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + cfg := testcfg.Build(t) + + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + relativePath := repo.GetRelativePath() + if tc.noRepository { + relativePath = "does-not-exist" + } + + testPartitionID := storage.PartitionID(1) + logger := testhelper.NewLogger(t) + database, err := keyvalue.NewBadgerStore(testhelper.SharedLogger(t), t.TempDir()) + require.NoError(t, err) + defer testhelper.MustClose(t, database) + + storageName := cfg.Storages[0].Name + storagePath := cfg.Storages[0].Path + + stateDir := filepath.Join(storagePath, "state") + require.NoError(t, os.MkdirAll(stateDir, mode.Directory)) + + stagingDir := filepath.Join(storagePath, "staging") + require.NoError(t, os.Mkdir(stagingDir, mode.Directory)) + + 
cmdFactory := gittest.NewCommandFactory(t, cfg) + cache := catfile.NewCache(cfg) + defer cache.Stop() + + repositoryFactory, err := localrepo.NewFactory( + logger, config.NewLocator(cfg), cmdFactory, cache, + ).ScopeByStorage(ctx, cfg.Storages[0].Name) + require.NoError(t, err) + + m := partition.NewMetrics( + housekeeping.NewMetrics(cfg.Prometheus), + snapshot.NewMetrics(), + ).Scope(storageName) + + tm := partition.NewTransactionManager( + testPartitionID, + logger, + database, + storageName, + storagePath, + stateDir, + stagingDir, + cmdFactory, + repositoryFactory, + m, + nil, + ) + + mm := migrationManager{ + Partition: tm, + logger: logger, + migrations: tc.migrations, + migrationStates: map[string]*migrationState{}, + } + + managerErr := make(chan error) + go func() { + managerErr <- tm.Run() + }() + + if tc.startingMigration != nil { + txn, err := tm.Begin(ctx, storage.BeginOptions{ + Write: true, + RelativePaths: []string{relativePath}, + }) + require.NoError(t, err) + + require.NoError(t, txn.KV().Set(migrationKey(relativePath), uint64ToBytes(tc.startingMigration.id))) + require.NoError(t, txn.Commit(ctx)) + } + + // Begin and commit transaction through the migration manager to exercise the migration logic. + if txn, err := mm.Begin(ctx, storage.BeginOptions{ + Write: false, + RelativePaths: []string{relativePath}, + }); err != nil { + require.ErrorContains(t, err, tc.expectedErr.Error()) + } else { + require.NoError(t, err) + + // In this test, each executed migration records its ID in the KV store. Validate + // that the expected migrations were performed. 
+ for _, m := range tc.migrations { + _, expected := tc.expectedMigrationIDs[m.id] + if _, err := txn.KV().Get(uint64ToBytes(m.id)); err != nil { + require.ErrorIs(t, err, badger.ErrKeyNotFound) + require.False(t, expected) + } else { + require.NoError(t, err) + require.True(t, expected) + } + } + + require.NoError(t, txn.Commit(ctx)) + } + + if state, ok := mm.migrationStates[relativePath]; ok { + require.NotNil(t, tc.expectedState) + if tc.expectedState.err != nil { + require.ErrorContains(t, state.err, tc.expectedState.err.Error()) + } else { + require.NoError(t, state.err) + } + } else { + require.Nil(t, tc.expectedState) + require.Empty(t, mm.migrationStates) + } + + tm.Close() + require.NoError(t, tm.CloseSnapshots()) + require.NoError(t, <-managerErr) + }) + } +} + +func TestMigrationManager_Concurrent(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + noopFn := func(context.Context, storage.Transaction) error { return nil } + + setupMockPartition := func(firstTransactionFn func(context.Context) error) *mockPartition { + kvFn := func() keyvalue.ReadWriter { + return &mockReadWriter{ + getFn: func(key []byte) (keyvalue.Item, error) { + return &mockItem{ + valueFn: func(fn func(value []byte) error) error { + return fn(uint64ToBytes(0)) + }, + }, nil + }, + } + } + + firstTransaction := true + return &mockPartition{ + beginFn: func(context.Context, storage.BeginOptions) (storage.Transaction, error) { + if firstTransaction { + firstTransaction = false + return &mockTransaction{ + kvFn: kvFn, + commitFn: firstTransactionFn, + }, nil + } + return &mockTransaction{kvFn: kvFn}, nil + }, + } + } + + for _, tc := range []struct { + desc string + samePath bool + expectedBlocked bool + expectedErr error + }{ + { + desc: "same repo concurrent transaction blocked", + samePath: true, + expectedBlocked: true, + }, + { + desc: "different repo concurrent transaction not blocked", + samePath: false, + expectedBlocked: false, + }, + { + desc: "failed migration 
propagated to concurrent transaction", + samePath: true, + expectedBlocked: true, + expectedErr: errors.New("migration failed"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + path1, path2 := "foo", "bar" + if tc.samePath { + path1 = path2 + } + + // The underlying transaction manager is mocked to provide a means to control when the + // first transaction gets committed. In this test, the first transaction that gets + // opened happens as part of the migration. Blocking here allows validation of + // concurrent transactions to ensure they are being blocked when required. + firstTransactionStarted := make(chan struct{}) + firstTransactionBlocked := make(chan struct{}) + mockPartition := setupMockPartition(func(ctx context.Context) error { + close(firstTransactionStarted) + <-firstTransactionBlocked + return tc.expectedErr + }) + + // In this test, the configured migrations are never executed because the repository + // does not exist in the snapshot. The migration is configured only to trigger the + // migration manager to block concurrent transactions. + mm := migrationManager{ + Partition: mockPartition, + logger: testhelper.NewLogger(t), + migrations: []migration{{id: 1, fn: noopFn}}, + migrationStates: map[string]*migrationState{}, + } + + // Start a transaction that triggers a migration. The mocks are set up in a such a way + // that migration manager determines no migrations need to be performed. + errCh1 := make(chan error) + go func() { + _, err := mm.Begin(ctx, storage.BeginOptions{ + RelativePaths: []string{path1}, + }) + errCh1 <- err + }() + + // Start a second transaction after the migration has started in the first. + errCh2 := make(chan error) + go func() { + <-firstTransactionStarted + _, err := mm.Begin(ctx, storage.BeginOptions{ + RelativePaths: []string{path2}, + }) + + // When a concurrent transaction is started with the same relative path, it is + // expected to be blocked until the migration has completed. 
If the concurrent + // transaction is started with a different relative path it can proceed without + // being blocked. + select { + case <-firstTransactionBlocked: + if !tc.expectedBlocked { + require.Fail(t, "transaction was not blocked") + } + default: + if tc.expectedBlocked { + require.Fail(t, "transaction was not blocked") + } + } + errCh2 <- err + }() + + // Wait a small amount of time before releasing the first transaction to ensure + // concurrent transaction against the same repository are blocked and concurrent + // transactions against a different repository are not blocked. + time.Sleep(time.Second) + close(firstTransactionBlocked) + + if tc.expectedErr != nil { + // If the migration returns an error, it is expected that the error message be + // propagated to the blocked concurrent transactions. + require.ErrorIs(t, <-errCh1, tc.expectedErr) + require.ErrorIs(t, <-errCh2, tc.expectedErr) + } else { + // If the migration succeeds, blocked concurrent transaction are expected to proceed + // without error. 
+ require.NoError(t, <-errCh1) + require.NoError(t, <-errCh2) + } + }) + } +} + +type mockPartition struct { + storagemgr.Partition + beginFn func(context.Context, storage.BeginOptions) (storage.Transaction, error) +} + +func (m mockPartition) Begin(ctx context.Context, opts storage.BeginOptions) (storage.Transaction, error) { + return m.beginFn(ctx, opts) +} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/migration.go b/internal/gitaly/storage/storagemgr/partition/migration/migration.go new file mode 100644 index 0000000000000000000000000000000000000000..80d0760781853033bf1dae4d744af963a2ccc5b3 --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/migration/migration.go @@ -0,0 +1,93 @@ +package migration + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" +) + +var ( + // migrations is a list of configured migrations that must be performed on repositories. + migrations []migration + // errInvalidMigration is returned if a migration being run is improperly configured. + errInvalidMigration = errors.New("invalid migration") +) + +const migrationKeyPrefix = "m/" + +// migration defines an individual migration job to be performed on a repository. +type migration struct { + // id is the unique identifier for a migration and used by the repository to record the last + // migration it performed. Subsequent migration jobs should always use increasing numbers. + id uint64 + // fn is the function executed to modify the WAL entry during transaction commit. + fn func(context.Context, storage.Transaction) error + // isDisabled defines an optional check to prevent a migration from being executed. + isDisabled func(ctx context.Context) bool +} + +// run performs the migration job on the provided transaction. 
+func (m migration) run(ctx context.Context, txn storage.Transaction, relativePath string) error { + if m.fn == nil { + return errInvalidMigration + } + + if err := m.fn(ctx, txn); err != nil { + return fmt.Errorf("migrate repository: %w", err) + } + + // If migration operations are successfully recorded, the last run migration ID is also recorded + // signifying it has been completed. + if err := m.recordID(txn, relativePath); err != nil { + return fmt.Errorf("setting migration key: %w", err) + } + + return nil +} + +// recordID sets the migration ID to be recorded during a transaction. +func (m migration) recordID(txn storage.Transaction, relativePath string) error { + val := uint64ToBytes(m.id) + return txn.KV().Set(migrationKey(relativePath), val) +} + +// RecordKeyCreation initializes the migration key for a new repository. +func RecordKeyCreation(txn storage.Transaction, relativePath string) error { + // Generally, migration keys should be initialized to the latest migration because we should not + // be created repositories with outdated state. The ID of the latest configured migration is + // recorded in the transaction. If no migrations are configured, the ID is set to zero. + var migr migration + if len(migrations) > 0 { + migr = migrations[len(migrations)-1] + } + + if err := migr.recordID(txn, relativePath); err != nil { + return fmt.Errorf("initializing key: %w", err) + } + + return nil +} + +// RecordKeyDeletion records in the provided transaction a migration key deletion. +func RecordKeyDeletion(txn storage.Transaction, relativePath string) error { + if err := txn.KV().Delete(migrationKey(relativePath)); err != nil { + return fmt.Errorf("deleting key: %w", err) + } + + return nil +} + +// uint64ToBytes marshals the provided uint64 into a slice of bytes. 
+func uint64ToBytes(i uint64) []byte { + val := make([]byte, binary.Size(i)) + binary.BigEndian.PutUint64(val, i) + return val +} + +// migrationKey generate the database key for storing migration data for a repository. +func migrationKey(relativePath string) []byte { + return []byte(migrationKeyPrefix + relativePath) +} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/migration_test.go b/internal/gitaly/storage/storagemgr/partition/migration/migration_test.go new file mode 100644 index 0000000000000000000000000000000000000000..68d835c81e74d280331f80deb49ad480dfa1601f --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/migration/migration_test.go @@ -0,0 +1,141 @@ +package migration + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMigration_Run(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + migrationErr := errors.New("migration error") + + for _, tc := range []struct { + desc string + migration migration + relativePath string + expectedKV map[string][]byte + expectedErr error + }{ + { + desc: "migration misconfigured", + migration: migration{fn: nil}, + expectedErr: errInvalidMigration, + }, + { + desc: "migration returns error", + migration: migration{fn: func(context.Context, storage.Transaction) error { + return migrationErr + }}, + expectedErr: fmt.Errorf("migrate repository: %w", migrationErr), + }, + { + desc: "migration modifies transaction", + migration: migration{ + id: 1, + fn: func(_ context.Context, txn storage.Transaction) error { + return txn.KV().Set([]byte("foo"), []byte("bar")) + }, + }, + relativePath: "foobar", + expectedKV: map[string][]byte{ + "foo": []byte("bar"), + "m/foobar": uint64ToBytes(1), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) 
{ + t.Parallel() + + actualKV := map[string][]byte{} + txn := mockTransaction{ + kvFn: func() keyvalue.ReadWriter { + return &mockReadWriter{ + setFn: func(key, value []byte) error { + actualKV[string(key)] = value + return nil + }, + } + }, + } + + err := tc.migration.run(ctx, txn, "foobar") + if tc.expectedErr != nil { + require.Equal(t, tc.expectedErr, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedKV, actualKV) + }) + } +} + +type mockTransaction struct { + storage.Transaction + kvFn func() keyvalue.ReadWriter + commitFn func(context.Context) error + rootFn func() string +} + +func (m mockTransaction) KV() keyvalue.ReadWriter { + if m.kvFn != nil { + return m.kvFn() + } + return nil +} + +func (m mockTransaction) Commit(ctx context.Context) error { + if m.commitFn != nil { + return m.commitFn(ctx) + } + return nil +} + +func (m mockTransaction) Rollback(context.Context) error { return nil } + +func (m mockTransaction) Root() string { + if m.rootFn != nil { + return m.rootFn() + } + return "" +} + +type mockReadWriter struct { + keyvalue.ReadWriter + getFn func(key []byte) (keyvalue.Item, error) + setFn func(key, value []byte) error +} + +func (m mockReadWriter) Get(key []byte) (keyvalue.Item, error) { + if m.getFn != nil { + return m.getFn(key) + } + return nil, nil +} + +func (m mockReadWriter) Set(key, value []byte) error { + if m.setFn != nil { + return m.setFn(key, value) + } + return nil +} + +type mockItem struct { + keyvalue.Item + valueFn func(fn func(value []byte) error) error +} + +func (m mockItem) Value(fn func(value []byte) error) error { + if m.valueFn != nil { + return m.valueFn(fn) + } + return nil +} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/migration/testhelper_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7dc4e175bb18b01bc6bcab6ed21d8dc7ceb77a56 --- /dev/null +++ 
b/internal/gitaly/storage/storagemgr/partition/migration/testhelper_test.go @@ -0,0 +1,11 @@ +package migration + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 66f7964e7adb2c65c78bbd22d797258eabb20e85..62ab00ae20a0e9260a175db1ab30f848b962ae04 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -32,6 +32,7 @@ import ( nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" @@ -354,14 +355,16 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte storagemgr.NewFactory( gsd.logger, dbMgr, - partition.NewFactory( - gsd.gitCmdFactory, - localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), - partition.NewMetrics( - housekeeping.NewMetrics(cfg.Prometheus), - snapshot.NewMetrics(), + migration.NewFactory( + partition.NewFactory( + gsd.gitCmdFactory, + localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), + partition.NewMetrics( + housekeeping.NewMetrics(cfg.Prometheus), + snapshot.NewMetrics(), + ), + nil, ), - nil, ), storagemgr.DefaultMaxInactivePartitions, storagemgr.NewMetrics(cfg.Prometheus),