From 90862f1891dce0e7ec7ff44041c49374b2abfdcb Mon Sep 17 00:00:00 2001 From: Justin Tobler Date: Fri, 8 Nov 2024 17:35:56 -0600 Subject: [PATCH 1/5] migration: Introduce migrations Repositories need a way to perform custom one-time operations that get committed to the WAL. These operations are plain Go functions that, when executed, modify the specified transaction in some defined manner. To facilitate this, introduce the `migration.migration` type. When executed, the migration performs the configured function and writes a migration key to indicate the migration has been performed. --- .../partition/migration/migration.go | 61 +++++++++++ .../partition/migration/migration_test.go | 103 ++++++++++++++++++ 2 files changed, 164 insertions(+) create mode 100644 internal/gitaly/storage/storagemgr/partition/migration/migration.go create mode 100644 internal/gitaly/storage/storagemgr/partition/migration/migration_test.go diff --git a/internal/gitaly/storage/storagemgr/partition/migration/migration.go b/internal/gitaly/storage/storagemgr/partition/migration/migration.go new file mode 100644 index 0000000000..d7395e7e7b --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/migration/migration.go @@ -0,0 +1,61 @@ +package migration + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" +) + +// errInvalidMigration is returned if a migration being run is improperly configured. +var errInvalidMigration = errors.New("invalid migration") + +const migrationKeyPrefix = "m/" + +// migration defines an individual migration job to be performed on a repository. +type migration struct { + // id is the unique identifier for a migration and used by the repository to record the last + // migration it performed. Subsequent migration jobs should always use increasing numbers. + id uint64 + // fn is the function executed to modify the WAL entry during transaction commit. 
+ fn func(context.Context, storage.Transaction) error +} + +// run performs the migration job on the provided transaction. +func (m migration) run(ctx context.Context, txn storage.Transaction, relativePath string) error { + if m.fn == nil { + return errInvalidMigration + } + + if err := m.fn(ctx, txn); err != nil { + return fmt.Errorf("migrate repository: %w", err) + } + + // If migration operations are successfully recorded, the last run migration ID is also recorded + // signifying it has been completed. + if err := m.recordID(txn, relativePath); err != nil { + return fmt.Errorf("setting migration key: %w", err) + } + + return nil +} + +// recordID sets the migration ID to be recorded during a transaction. +func (m migration) recordID(txn storage.Transaction, relativePath string) error { + val := uint64ToBytes(m.id) + return txn.KV().Set(migrationKey(relativePath), val) +} + +// uint64ToBytes marshals the provided uint64 into a slice of bytes. +func uint64ToBytes(i uint64) []byte { + val := make([]byte, binary.Size(i)) + binary.BigEndian.PutUint64(val, i) + return val +} + +// migrationKey generates the database key for storing migration data for a repository. 
+func migrationKey(relativePath string) []byte { + return []byte(migrationKeyPrefix + relativePath) +} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/migration_test.go b/internal/gitaly/storage/storagemgr/partition/migration/migration_test.go new file mode 100644 index 0000000000..a09d0bbdda --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/migration/migration_test.go @@ -0,0 +1,103 @@ +package migration + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMigration_Run(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + migrationErr := errors.New("migration error") + + for _, tc := range []struct { + desc string + migration migration + relativePath string + expectedKV map[string][]byte + expectedErr error + }{ + { + desc: "migration misconfigured", + migration: migration{fn: nil}, + expectedErr: errInvalidMigration, + }, + { + desc: "migration returns error", + migration: migration{fn: func(context.Context, storage.Transaction) error { + return migrationErr + }}, + expectedErr: fmt.Errorf("migrate repository: %w", migrationErr), + }, + { + desc: "migration modifies transaction", + migration: migration{ + id: 1, + fn: func(_ context.Context, txn storage.Transaction) error { + return txn.KV().Set([]byte("foo"), []byte("bar")) + }, + }, + relativePath: "foobar", + expectedKV: map[string][]byte{ + "foo": []byte("bar"), + "m/foobar": uint64ToBytes(1), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + actualKV := map[string][]byte{} + txn := mockTransaction{ + kvFn: func() keyvalue.ReadWriter { + return &mockReadWriter{ + setFn: func(key, value []byte) error { + actualKV[string(key)] = value + return nil + }, + } + }, + } + + err := tc.migration.run(ctx, 
txn, "foobar") + if tc.expectedErr != nil { + require.Equal(t, tc.expectedErr, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedKV, actualKV) + }) + } +} + +type mockTransaction struct { + storage.Transaction + kvFn func() keyvalue.ReadWriter +} + +func (m mockTransaction) KV() keyvalue.ReadWriter { + if m.kvFn != nil { + return m.kvFn() + } + return nil +} + +type mockReadWriter struct { + keyvalue.ReadWriter + setFn func(key, value []byte) error +} + +func (m mockReadWriter) Set(key, value []byte) error { + if m.setFn != nil { + return m.setFn(key, value) + } + return nil +} -- GitLab From 27c6584b7908d7b855b2942b008f60a1cd324387 Mon Sep 17 00:00:00 2001 From: Justin Tobler Date: Thu, 14 Nov 2024 16:10:54 -0600 Subject: [PATCH 2/5] migration: Initialize migration key on repository creation The migration key is used to determine if there are any outstanding migrations required for a specific repository. When a repository gets created it should have a key created in the same transaction as the repository is created in. When created, new repositories are expected to be up-to-date. Consequently, the value of the migration key should be the highest configured migration ID. Since there are currently no migrations configured, the key value will be initialized to zero. 
--- internal/backup/repository.go | 7 ++++++ internal/gitaly/service/objectpool/create.go | 7 ++++++ .../gitaly/service/repository/create_fork.go | 7 ++++++ .../service/repository/create_repository.go | 7 ++++++ .../create_repository_from_bundle.go | 7 ++++++ .../create_repository_from_snapshot.go | 7 ++++++ .../repository/create_repository_from_url.go | 7 ++++++ .../gitaly/service/repository/replicate.go | 7 ++++++ .../partition/migration/migration.go | 25 +++++++++++++++++-- 9 files changed, 79 insertions(+), 2 deletions(-) diff --git a/internal/backup/repository.go b/internal/backup/repository.go index 7d798480a4..cd5e554b81 100644 --- a/internal/backup/repository.go +++ b/internal/backup/repository.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/counter" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/helper/chunk" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -711,6 +712,12 @@ func (r *localRepository) Create(ctx context.Context, hash git.ObjectHash, defau return fmt.Errorf("local repository: create: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, r.repo.GetRelativePath()); err != nil { + return fmt.Errorf("recording migration key: %w", err) + } + } + // Recreate the local repository, since the cache of object hash and ref-format needs // to be invalidated. 
r.repo = localrepo.New(r.logger, r.locator, r.gitCmdFactory, r.catfileCache, r.repo) diff --git a/internal/gitaly/service/objectpool/create.go b/internal/gitaly/service/objectpool/create.go index 21dcadc638..2328583579 100644 --- a/internal/gitaly/service/objectpool/create.go +++ b/internal/gitaly/service/objectpool/create.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/objectpool" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -60,5 +61,11 @@ func (s *server) CreateObjectPool(ctx context.Context, in *gitalypb.CreateObject return nil, structerr.New("creating object pool: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, poolRepo.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } + } + return &gitalypb.CreateObjectPoolResponse{}, nil } diff --git a/internal/gitaly/service/repository/create_fork.go b/internal/gitaly/service/repository/create_fork.go index 89b5b245c9..5f0c1c82bb 100644 --- a/internal/gitaly/service/repository/create_fork.go +++ b/internal/gitaly/service/repository/create_fork.go @@ -10,6 +10,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -115,5 +116,11 @@ func (s *server) CreateFork(ctx context.Context, req *gitalypb.CreateForkRequest return nil, structerr.NewInternal("creating fork: %w", err) } + 
if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, targetRepository.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } + } + return &gitalypb.CreateForkResponse{}, nil } diff --git a/internal/gitaly/service/repository/create_repository.go b/internal/gitaly/service/repository/create_repository.go index 75c2d83d43..a189b29349 100644 --- a/internal/gitaly/service/repository/create_repository.go +++ b/internal/gitaly/service/repository/create_repository.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -41,5 +42,11 @@ func (s *server) CreateRepository(ctx context.Context, req *gitalypb.CreateRepos return nil, structerr.NewInternal("creating repository: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, repository.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } + } + return &gitalypb.CreateRepositoryResponse{}, nil } diff --git a/internal/gitaly/service/repository/create_repository_from_bundle.go b/internal/gitaly/service/repository/create_repository_from_bundle.go index e20d137237..b191b05bb7 100644 --- a/internal/gitaly/service/repository/create_repository_from_bundle.go +++ b/internal/gitaly/service/repository/create_repository_from_bundle.go @@ -3,6 +3,7 @@ package repository import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" 
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/gitaly/v16/streamio" @@ -44,5 +45,11 @@ func (s *server) CreateRepositoryFromBundle(stream gitalypb.RepositoryService_Cr return structerr.NewInternal("creating repository: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, repo.GetRelativePath()); err != nil { + return structerr.NewInternal("recording migration key: %w", err) + } + } + return stream.SendAndClose(&gitalypb.CreateRepositoryFromBundleResponse{}) } diff --git a/internal/gitaly/service/repository/create_repository_from_snapshot.go b/internal/gitaly/service/repository/create_repository_from_snapshot.go index afc88d0902..bdf94c3ff5 100644 --- a/internal/gitaly/service/repository/create_repository_from_snapshot.go +++ b/internal/gitaly/service/repository/create_repository_from_snapshot.go @@ -11,6 +11,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/command" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" "gitlab.com/gitlab-org/labkit/correlation" @@ -148,5 +149,11 @@ func (s *server) CreateRepositoryFromSnapshot(ctx context.Context, in *gitalypb. 
return nil, structerr.NewInternal("creating repository: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, repository.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } + } + return &gitalypb.CreateRepositoryFromSnapshotResponse{}, nil } diff --git a/internal/gitaly/service/repository/create_repository_from_url.go b/internal/gitaly/service/repository/create_repository_from_url.go index 9daea60360..de7ef11c19 100644 --- a/internal/gitaly/service/repository/create_repository_from_url.go +++ b/internal/gitaly/service/repository/create_repository_from_url.go @@ -14,6 +14,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/gitcmd" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -137,6 +138,12 @@ func (s *server) CreateRepositoryFromURL(ctx context.Context, req *gitalypb.Crea return nil, structerr.NewInternal("creating repository: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, req.GetRepository().GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } + } + return &gitalypb.CreateRepositoryFromURLResponse{}, nil } diff --git a/internal/gitaly/service/repository/replicate.go b/internal/gitaly/service/repository/replicate.go index 54bf5fb04a..0aaa1cd3dd 100644 --- a/internal/gitaly/service/repository/replicate.go +++ b/internal/gitaly/service/repository/replicate.go @@ -18,6 +18,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/metadata" @@ -157,6 +158,12 @@ func (s *server) createFromSnapshot(ctx context.Context, source, target *gitalyp return fmt.Errorf("creating repository: %w", err) } + if tx := storage.ExtractTransaction(ctx); tx != nil { + if err := migration.RecordKeyCreation(tx, target.GetRelativePath()); err != nil { + return fmt.Errorf("recording migration key: %w", err) + } + } + return nil } diff --git a/internal/gitaly/storage/storagemgr/partition/migration/migration.go b/internal/gitaly/storage/storagemgr/partition/migration/migration.go index d7395e7e7b..66504288c7 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/migration.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/migration.go @@ -9,8 +9,12 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" ) -// errInvalidMigration is returned if a migration being run is improperly configured. -var errInvalidMigration = errors.New("invalid migration") +var ( + // migrations is a list of configured migrations that must be performed on repositories. + migrations []migration + // errInvalidMigration is returned if a migration being run is improperly configured. + errInvalidMigration = errors.New("invalid migration") +) const migrationKeyPrefix = "m/" @@ -48,6 +52,23 @@ func (m migration) recordID(txn storage.Transaction, relativePath string) error return txn.KV().Set(migrationKey(relativePath), val) } +// RecordKeyCreation initializes the migration key for a new repository. 
+func RecordKeyCreation(txn storage.Transaction, relativePath string) error { + // Generally, migration keys should be initialized to the latest migration because we should not + // be creating repositories with outdated state. The ID of the latest configured migration is + // recorded in the transaction. If no migrations are configured, the ID is set to zero. + var migr migration + if len(migrations) > 0 { + migr = migrations[len(migrations)-1] + } + + if err := migr.recordID(txn, relativePath); err != nil { + return fmt.Errorf("initializing key: %w", err) + } + + return nil +} + // uint64ToBytes marshals the provided uint64 into a slice of bytes. func uint64ToBytes(i uint64) []byte { val := make([]byte, binary.Size(i)) -- GitLab From 755993fd16ec617e568356e9635cbe8779b62da0 Mon Sep 17 00:00:00 2001 From: Justin Tobler Date: Tue, 12 Nov 2024 13:34:04 -0600 Subject: [PATCH 3/5] migration: Delete migration key on repository deletion When a repository gets deleted, the associated migration key should also be removed if it exists. Add the `migration.RecordKeyDeletion()` helper to do this and also update the `RemoveRepository` and `DeleteObjectPool` RPCs to invoke it. 
--- internal/gitaly/service/objectpool/delete.go | 4 ++++ internal/gitaly/service/repository/remove.go | 4 ++++ .../storage/storagemgr/partition/migration/migration.go | 9 +++++++++ 3 files changed, 17 insertions(+) diff --git a/internal/gitaly/service/objectpool/delete.go b/internal/gitaly/service/objectpool/delete.go index 3ccd06700a..010a7674c5 100644 --- a/internal/gitaly/service/objectpool/delete.go +++ b/internal/gitaly/service/objectpool/delete.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git/objectpool" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -28,6 +29,9 @@ func (s *server) DeleteObjectPool(ctx context.Context, in *gitalypb.DeleteObject if tx := storage.ExtractTransaction(ctx); tx != nil { tx.DeleteRepository() + if err := migration.RecordKeyDeletion(tx, pool.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } } return &gitalypb.DeleteObjectPoolResponse{}, nil diff --git a/internal/gitaly/service/repository/remove.go b/internal/gitaly/service/repository/remove.go index 551e8e8d10..42a4bca7b6 100644 --- a/internal/gitaly/service/repository/remove.go +++ b/internal/gitaly/service/repository/remove.go @@ -5,6 +5,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/repoutil" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -21,6 +22,9 @@ func (s *server) RemoveRepository(ctx context.Context, in *gitalypb.RemoveReposi if tx := storage.ExtractTransaction(ctx); tx != nil { tx.DeleteRepository() + if err := migration.RecordKeyDeletion(tx, 
repository.GetRelativePath()); err != nil { + return nil, structerr.NewInternal("recording migration key: %w", err) + } } return &gitalypb.RemoveRepositoryResponse{}, nil diff --git a/internal/gitaly/storage/storagemgr/partition/migration/migration.go b/internal/gitaly/storage/storagemgr/partition/migration/migration.go index 66504288c7..96bf726b0e 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/migration.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/migration.go @@ -69,6 +69,15 @@ func RecordKeyCreation(txn storage.Transaction, relativePath string) error { return nil } +// RecordKeyDeletion records in the provided transaction a migration key deletion. +func RecordKeyDeletion(txn storage.Transaction, relativePath string) error { + if err := txn.KV().Delete(migrationKey(relativePath)); err != nil { + return fmt.Errorf("deleting key: %w", err) + } + + return nil +} + // uint64ToBytes marshals the provided uint64 into a slice of bytes. func uint64ToBytes(i uint64) []byte { val := make([]byte, binary.Size(i)) -- GitLab From 134b4845cba8c691c6ec5ff6472c7d09b1db789f Mon Sep 17 00:00:00 2001 From: Justin Tobler Date: Thu, 24 Oct 2024 15:39:15 -0500 Subject: [PATCH 4/5] migration: Introduce migration framework It is expected that all outstanding migrations are executed for a repository prior to it serving traffic. To facilitate this, before beginning a transaction, the repository is checked for outstanding migrations. If any are identified, all operations are blocked on the repository until the required migrations finish. Once complete, all repository traffic that was blocked is able to resume and start transactions. To coordinate migrations on a single partition, `migration.migrationManager` is introduced. It ensures that only a single migration is run at a time and handles blocking concurrent requests to the same repository. 
--- .../storagemgr/partition/migration/manager.go | 235 +++++++++++ .../partition/migration/manager_test.go | 399 ++++++++++++++++++ .../partition/migration/migration.go | 2 + .../partition/migration/migration_test.go | 40 +- .../partition/migration/testhelper_test.go | 11 + 5 files changed, 686 insertions(+), 1 deletion(-) create mode 100644 internal/gitaly/storage/storagemgr/partition/migration/manager.go create mode 100644 internal/gitaly/storage/storagemgr/partition/migration/manager_test.go create mode 100644 internal/gitaly/storage/storagemgr/partition/migration/testhelper_test.go diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager.go b/internal/gitaly/storage/storagemgr/partition/migration/manager.go new file mode 100644 index 0000000000..5724a595e4 --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/migration/manager.go @@ -0,0 +1,235 @@ +package migration + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "sync" + + "github.com/dgraph-io/badger/v4" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" +) + +// migrationState defines the state of a migration for a repository. +type migrationState struct { + // doneCh is closed once the repository has no ongoing migrations. + doneCh <-chan struct{} + // err indicates if there was an error during the migration process. + err error +} + +// migrationManager coordinates executing repository migrations. +type migrationManager struct { + storagemgr.Partition + mu sync.Mutex + logger log.Logger + // migrations defines all migration jobs that are expected to be performed on a repository + // before it can process incoming transactions. 
+ migrations []migration + // migrationStates defines the state of a repository migration and is used to block concurrent + // transactions on the same repository while a migration is pending. + migrationStates map[string]*migrationState +} + +func (m *migrationManager) Begin(ctx context.Context, opts storage.BeginOptions) (storage.Transaction, error) { + if err := m.migrate(ctx, opts.RelativePaths); err != nil { + return nil, fmt.Errorf("migrate: %w", err) + } + + return m.Partition.Begin(ctx, opts) +} + +// migrate handles setting up migration state and executing outstanding migrations. +func (m *migrationManager) migrate(ctx context.Context, relativePaths []string) error { + // To perform a migration, the manager must have migrations configured and the transaction must + // target a repository. If not, skip migration handling and proceed with the transaction. + if len(m.migrations) == 0 || len(relativePaths) == 0 { + return nil + } + + relativePath := relativePaths[0] + + // Check if the repository already has a pending migration. + m.mu.Lock() + state, ok := m.migrationStates[relativePath] + if !ok { + doneCh := make(chan struct{}) + defer close(doneCh) + state = &migrationState{doneCh: doneCh} + m.migrationStates[relativePath] = state + } + m.mu.Unlock() + + // Block concurrent transactions on the same repository until outstanding migrations complete. + if ok { + select { + case <-ctx.Done(): + return ctx.Err() + case <-state.doneCh: + if state.err != nil { + // Migrations are required to succeed before the repository can serve traffic. + return fmt.Errorf("waiting on migrations: %w", state.err) + } + return nil + } + } + + // To avoid migration failures due to request context cancellation, a copy that is not canceled + // when parent is canceled is used. + if err := m.performMigrations(context.WithoutCancel(ctx), relativePaths); err != nil { + // Record the error as part of the migration state so concurrent transactions are notified. 
+ state.err = err + return fmt.Errorf("performing migrations: %w", err) + } + + return nil +} + +// performMigrations performs any missing migrations on a repository. +func (m *migrationManager) performMigrations(ctx context.Context, relativePaths []string) (returnedErr error) { + relativePath := relativePaths[0] + + id, err := m.getLastMigrationID(ctx, relativePath) + if errors.Is(err, storage.ErrRepositoryNotFound) { + // If the repository is not found pretend the repository is up-to-date with migrations and + // let the downstream transaction set the migration key during repository creation. + return nil + } else if err != nil { + return fmt.Errorf("getting last migration: %w", err) + } + + // If the repository is already up-to-date, there is no need to start a transaction and perform + // migrations. + maxID := m.migrations[len(m.migrations)-1].id + if id == maxID { + return nil + } else if id > maxID { + return fmt.Errorf("repository has invalid migration key: %d", id) + } + + // Start a single transaction that records all outstanding migrations that get executed. + txn, err := m.Partition.Begin(ctx, storage.BeginOptions{ + Write: true, + RelativePaths: relativePaths, + }) + if err != nil { + return fmt.Errorf("begin migration update: %w", err) + } + defer func() { + if returnedErr != nil { + if err := txn.Rollback(ctx); err != nil { + returnedErr = errors.Join(returnedErr, fmt.Errorf("rollback: %w", err)) + } + } + }() + + for _, migration := range m.migrations { + if id >= migration.id { + continue + } + + // A migration may have configuration allowing it to be disabled. As migrations are + // performed in order, if a disabled migration is encountered, the remaining migrations are + // also not executed. Since repository migrations are currently only attempted once for a + // repository during the partition lifetime, a previously disabled migration may not + // immediately be executed in the next transaction. Migration state must first be reset. 
+ if migration.isDisabled != nil && migration.isDisabled(ctx) { + break + } + + m.logger.WithFields(log.Fields{ + "migration_id": migration.id, + "relative_path": relativePath, + }).Info("running migration") + + if err := migration.run(ctx, txn, relativePath); err != nil { + return fmt.Errorf("run migration: %w", err) + } + } + + if err := txn.Commit(ctx); err != nil { + return fmt.Errorf("commit migration update: %w", err) + } + + return nil +} + +// getLastMigrationID returns the ID of the last executed migration for a repository. +func (m *migrationManager) getLastMigrationID(ctx context.Context, relativePath string) (_ uint64, returnedErr error) { + item, repoExists, err := m.readMigrationKey(ctx, relativePath) + if err != nil { + return 0, fmt.Errorf("reading migration key: %w", err) + } + + // If the repository does not exist, it is expected to be created by the downstream transaction. + if !repoExists { + return 0, storage.ErrRepositoryNotFound + } + + // If the repository does exist but has no migration key, it has never had a migration run. + // All configured migrations should be run against the repository. + if item == nil { + return 0, nil + } + + var id uint64 + _ = item.Value(func(value []byte) error { + id = binary.BigEndian.Uint64(value) + return nil + }) + + return id, nil +} + +// readMigrationKey returns the value for a repository migration key in a transaction and also +// returns if the repository exists on disk. If no key exists, nil is returned for the item value. 
+func (m *migrationManager) readMigrationKey(ctx context.Context, relativePath string) (_ keyvalue.Item, _ bool, returnedErr error) { + txn, err := m.Partition.Begin(ctx, storage.BeginOptions{RelativePaths: []string{relativePath}}) + if err != nil { + return nil, false, fmt.Errorf("begin migration key transaction: %w", err) + } + defer func() { + if returnedErr != nil { + if err := txn.Rollback(ctx); err != nil { + returnedErr = errors.Join(returnedErr, fmt.Errorf("rollback: %w", err)) + } + } + }() + + repoExists := true + item, err := txn.KV().Get(migrationKey(relativePath)) + switch { + case errors.Is(err, badger.ErrKeyNotFound): + // If no migration key is found, it means either the repository is being created or the + // repository has never performed a migration before. + repoExists, err = checkRepoExists(filepath.Join(txn.Root(), relativePath)) + if err != nil { + return nil, false, fmt.Errorf("check repo exists: %w", err) + } + case err != nil: + return nil, false, fmt.Errorf("getting migration key: %w", err) + } + + if err := txn.Commit(ctx); returnedErr == nil && err != nil { + return nil, false, fmt.Errorf("commit migration key transaction: %w", err) + } + + return item, repoExists, nil +} + +func checkRepoExists(repoPath string) (bool, error) { + if _, err := os.Stat(repoPath); err != nil { + if errors.Is(err, fs.ErrNotExist) { + return false, nil + } + return false, err + } + return true, nil +} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go new file mode 100644 index 0000000000..7bd552a0ca --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/migration/manager_test.go @@ -0,0 +1,399 @@ +package migration + +import ( + "context" + "errors" + "os" + "path/filepath" + "testing" + "time" + + "github.com/dgraph-io/badger/v4" + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" + 
"gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" +) + +func TestMigrationManager_Begin(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + disabledFn := func(context.Context) bool { return true } + migrationErr := errors.New("migration error") + + errFn := func(context.Context, storage.Transaction) error { return migrationErr } + recordingFn := func(id uint64) func(_ context.Context, txn storage.Transaction) error { + return func(_ context.Context, txn storage.Transaction) error { + return txn.KV().Set(uint64ToBytes(id), nil) + } + } + + migrationFn := func(id uint64) migration { + return migration{ + id: id, + fn: recordingFn(id), + } + } + + for _, tc := range []struct { + desc string + migrations []migration + startingMigration *migration + noRepository bool + expectedState *migrationState + expectedMigrationIDs map[uint64]struct{} + expectedErr error + }{ + { + desc: "no migrations configured", + migrations: nil, + expectedState: nil, + expectedMigrationIDs: nil, + }, + { + desc: "repository does not exist", + migrations: []migration{migrationFn(1)}, + startingMigration: nil, + noRepository: true, + expectedState: &migrationState{}, + expectedMigrationIDs: nil, + }, + { + desc: "no migration key in preexisting 
repository", + migrations: []migration{migrationFn(1), migrationFn(2)}, + startingMigration: nil, + noRepository: false, + expectedState: &migrationState{}, + expectedMigrationIDs: map[uint64]struct{}{1: {}, 2: {}}, + }, + { + desc: "no outstanding migrations", + migrations: []migration{migrationFn(1), migrationFn(2)}, + startingMigration: &migration{id: 2}, + expectedState: &migrationState{}, + expectedMigrationIDs: nil, + }, + { + desc: "single outstanding migration applied", + migrations: []migration{migrationFn(1), migrationFn(2)}, + startingMigration: &migration{id: 1}, + expectedState: &migrationState{}, + expectedMigrationIDs: map[uint64]struct{}{2: {}}, + }, + { + desc: "multiple outstanding migration applied", + migrations: []migration{migrationFn(1), migrationFn(2), migrationFn(3)}, + startingMigration: &migration{id: 1}, + expectedState: &migrationState{}, + expectedMigrationIDs: map[uint64]struct{}{2: {}, 3: {}}, + }, + { + desc: "disabled migration", + migrations: []migration{migrationFn(1), {id: 2, isDisabled: disabledFn, fn: recordingFn(2)}, migrationFn(3)}, + startingMigration: &migration{id: 0}, + expectedState: &migrationState{}, + expectedMigrationIDs: map[uint64]struct{}{1: {}}, + }, + { + desc: "error returned during migrations", + migrations: []migration{migrationFn(1), {id: 2, fn: errFn}, migrationFn(3)}, + startingMigration: &migration{id: 1}, + expectedState: &migrationState{ + err: migrationErr, + }, + expectedMigrationIDs: nil, + expectedErr: migrationErr, + }, + { + desc: "starting migration key invalid", + migrations: []migration{migrationFn(1), migrationFn(2), migrationFn(3)}, + startingMigration: &migration{id: 4}, + expectedState: &migrationState{ + err: errors.New("repository has invalid migration key: 4"), + }, + expectedMigrationIDs: nil, + expectedErr: errors.New("repository has invalid migration key: 4"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + cfg := testcfg.Build(t) + + repo, _ := 
gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + relativePath := repo.GetRelativePath() + if tc.noRepository { + relativePath = "does-not-exist" + } + + testPartitionID := storage.PartitionID(1) + logger := testhelper.NewLogger(t) + database, err := keyvalue.NewBadgerStore(testhelper.SharedLogger(t), t.TempDir()) + require.NoError(t, err) + defer testhelper.MustClose(t, database) + + storageName := cfg.Storages[0].Name + storagePath := cfg.Storages[0].Path + + stateDir := filepath.Join(storagePath, "state") + require.NoError(t, os.MkdirAll(stateDir, mode.Directory)) + + stagingDir := filepath.Join(storagePath, "staging") + require.NoError(t, os.Mkdir(stagingDir, mode.Directory)) + + cmdFactory := gittest.NewCommandFactory(t, cfg) + cache := catfile.NewCache(cfg) + defer cache.Stop() + + repositoryFactory, err := localrepo.NewFactory( + logger, config.NewLocator(cfg), cmdFactory, cache, + ).ScopeByStorage(ctx, cfg.Storages[0].Name) + require.NoError(t, err) + + m := partition.NewMetrics( + housekeeping.NewMetrics(cfg.Prometheus), + snapshot.NewMetrics(), + ).Scope(storageName) + + tm := partition.NewTransactionManager( + testPartitionID, + logger, + database, + storageName, + storagePath, + stateDir, + stagingDir, + cmdFactory, + repositoryFactory, + m, + nil, + ) + + mm := migrationManager{ + Partition: tm, + logger: logger, + migrations: tc.migrations, + migrationStates: map[string]*migrationState{}, + } + + managerErr := make(chan error) + go func() { + managerErr <- tm.Run() + }() + + if tc.startingMigration != nil { + txn, err := tm.Begin(ctx, storage.BeginOptions{ + Write: true, + RelativePaths: []string{relativePath}, + }) + require.NoError(t, err) + + require.NoError(t, txn.KV().Set(migrationKey(relativePath), uint64ToBytes(tc.startingMigration.id))) + require.NoError(t, txn.Commit(ctx)) + } + + // Begin and commit transaction through the migration manager to exercise the migration logic. 
+ if txn, err := mm.Begin(ctx, storage.BeginOptions{ + Write: false, + RelativePaths: []string{relativePath}, + }); err != nil { + require.ErrorContains(t, err, tc.expectedErr.Error()) + } else { + require.NoError(t, err) + + // In this test, each executed migration records its ID in the KV store. Validate + // that the expected migrations were performed. + for _, m := range tc.migrations { + _, expected := tc.expectedMigrationIDs[m.id] + if _, err := txn.KV().Get(uint64ToBytes(m.id)); err != nil { + require.ErrorIs(t, err, badger.ErrKeyNotFound) + require.False(t, expected) + } else { + require.NoError(t, err) + require.True(t, expected) + } + } + + require.NoError(t, txn.Commit(ctx)) + } + + if state, ok := mm.migrationStates[relativePath]; ok { + require.NotNil(t, tc.expectedState) + if tc.expectedState.err != nil { + require.ErrorContains(t, state.err, tc.expectedState.err.Error()) + } else { + require.NoError(t, state.err) + } + } else { + require.Nil(t, tc.expectedState) + require.Empty(t, mm.migrationStates) + } + + tm.Close() + require.NoError(t, tm.CloseSnapshots()) + require.NoError(t, <-managerErr) + }) + } +} + +func TestMigrationManager_Concurrent(t *testing.T) { + t.Parallel() + ctx := testhelper.Context(t) + noopFn := func(context.Context, storage.Transaction) error { return nil } + + setupMockPartition := func(firstTransactionFn func(context.Context) error) *mockPartition { + kvFn := func() keyvalue.ReadWriter { + return &mockReadWriter{ + getFn: func(key []byte) (keyvalue.Item, error) { + return &mockItem{ + valueFn: func(fn func(value []byte) error) error { + return fn(uint64ToBytes(0)) + }, + }, nil + }, + } + } + + firstTransaction := true + return &mockPartition{ + beginFn: func(context.Context, storage.BeginOptions) (storage.Transaction, error) { + if firstTransaction { + firstTransaction = false + return &mockTransaction{ + kvFn: kvFn, + commitFn: firstTransactionFn, + }, nil + } + return &mockTransaction{kvFn: kvFn}, nil + }, + } + } + + for 
_, tc := range []struct { + desc string + samePath bool + expectedBlocked bool + expectedErr error + }{ + { + desc: "same repo concurrent transaction blocked", + samePath: true, + expectedBlocked: true, + }, + { + desc: "different repo concurrent transaction not blocked", + samePath: false, + expectedBlocked: false, + }, + { + desc: "failed migration propagated to concurrent transaction", + samePath: true, + expectedBlocked: true, + expectedErr: errors.New("migration failed"), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + path1, path2 := "foo", "bar" + if tc.samePath { + path1 = path2 + } + + // The underlying transaction manager is mocked to provide a means to control when the + // first transaction gets committed. In this test, the first transaction that gets + // opened happens as part of the migration. Blocking here allows validation of + // concurrent transactions to ensure they are being blocked when required. + firstTransactionStarted := make(chan struct{}) + firstTransactionBlocked := make(chan struct{}) + mockPartition := setupMockPartition(func(ctx context.Context) error { + close(firstTransactionStarted) + <-firstTransactionBlocked + return tc.expectedErr + }) + + // In this test, the configured migrations are never executed because the repository + // does not exist in the snapshot. The migration is configured only to trigger the + // migration manager to block concurrent transactions. + mm := migrationManager{ + Partition: mockPartition, + logger: testhelper.NewLogger(t), + migrations: []migration{{id: 1, fn: noopFn}}, + migrationStates: map[string]*migrationState{}, + } + + // Start a transaction that triggers a migration. The mocks are set up in a such a way + // that migration manager determines no migrations need to be performed. 
+ errCh1 := make(chan error) + go func() { + _, err := mm.Begin(ctx, storage.BeginOptions{ + RelativePaths: []string{path1}, + }) + errCh1 <- err + }() + + // Start a second transaction after the migration has started in the first. + errCh2 := make(chan error) + go func() { + <-firstTransactionStarted + _, err := mm.Begin(ctx, storage.BeginOptions{ + RelativePaths: []string{path2}, + }) + + // When a concurrent transaction is started with the same relative path, it is + // expected to be blocked until the migration has completed. If the concurrent + // transaction is started with a different relative path it can proceed without + // being blocked. + select { + case <-firstTransactionBlocked: + if !tc.expectedBlocked { + require.Fail(t, "transaction was not blocked") + } + default: + if tc.expectedBlocked { + require.Fail(t, "transaction was not blocked") + } + } + errCh2 <- err + }() + + // Wait a small amount of time before releasing the first transaction to ensure + // concurrent transaction against the same repository are blocked and concurrent + // transactions against a different repository are not blocked. + time.Sleep(time.Second) + close(firstTransactionBlocked) + + if tc.expectedErr != nil { + // If the migration returns an error, it is expected that the error message be + // propagated to the blocked concurrent transactions. + require.ErrorIs(t, <-errCh1, tc.expectedErr) + require.ErrorIs(t, <-errCh2, tc.expectedErr) + } else { + // If the migration succeeds, blocked concurrent transaction are expected to proceed + // without error. 
+ require.NoError(t, <-errCh1) + require.NoError(t, <-errCh2) + } + }) + } +} + +type mockPartition struct { + storagemgr.Partition + beginFn func(context.Context, storage.BeginOptions) (storage.Transaction, error) +} + +func (m mockPartition) Begin(ctx context.Context, opts storage.BeginOptions) (storage.Transaction, error) { + return m.beginFn(ctx, opts) +} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/migration.go b/internal/gitaly/storage/storagemgr/partition/migration/migration.go index 96bf726b0e..80d0760781 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/migration.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/migration.go @@ -25,6 +25,8 @@ type migration struct { id uint64 // fn is the function executed to modify the WAL entry during transaction commit. fn func(context.Context, storage.Transaction) error + // isDisabled defines an optional check to prevent a migration from being executed. + isDisabled func(ctx context.Context) bool } // run performs the migration job on the provided transaction. 
diff --git a/internal/gitaly/storage/storagemgr/partition/migration/migration_test.go b/internal/gitaly/storage/storagemgr/partition/migration/migration_test.go index a09d0bbdda..68d835c81e 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/migration_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/migration_test.go @@ -80,7 +80,9 @@ func TestMigration_Run(t *testing.T) { type mockTransaction struct { storage.Transaction - kvFn func() keyvalue.ReadWriter + kvFn func() keyvalue.ReadWriter + commitFn func(context.Context) error + rootFn func() string } func (m mockTransaction) KV() keyvalue.ReadWriter { @@ -90,14 +92,50 @@ func (m mockTransaction) KV() keyvalue.ReadWriter { return nil } +func (m mockTransaction) Commit(ctx context.Context) error { + if m.commitFn != nil { + return m.commitFn(ctx) + } + return nil +} + +func (m mockTransaction) Rollback(context.Context) error { return nil } + +func (m mockTransaction) Root() string { + if m.rootFn != nil { + return m.rootFn() + } + return "" +} + type mockReadWriter struct { keyvalue.ReadWriter + getFn func(key []byte) (keyvalue.Item, error) setFn func(key, value []byte) error } +func (m mockReadWriter) Get(key []byte) (keyvalue.Item, error) { + if m.getFn != nil { + return m.getFn(key) + } + return nil, nil +} + func (m mockReadWriter) Set(key, value []byte) error { if m.setFn != nil { return m.setFn(key, value) } return nil } + +type mockItem struct { + keyvalue.Item + valueFn func(fn func(value []byte) error) error +} + +func (m mockItem) Value(fn func(value []byte) error) error { + if m.valueFn != nil { + return m.valueFn(fn) + } + return nil +} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/migration/testhelper_test.go new file mode 100644 index 0000000000..7dc4e175bb --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/migration/testhelper_test.go @@ -0,0 +1,11 @@ +package 
migration + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} -- GitLab From 2de197a103019e2b7ee481cf48efb23957ba1e6f Mon Sep 17 00:00:00 2001 From: Justin Tobler Date: Thu, 24 Oct 2024 15:55:57 -0500 Subject: [PATCH 5/5] partition: Wire migration manager The `partition.Factory` is used to configure a `storagemgr.Partition`. The previously introduced `migration.migrationManager` wraps `partition.TransactionManager` to provide migration handling for repositories in a partition. Introduce a new `migration.migrationFactory` that wraps a `storagemgr.PartitionFactory` in a similar fashion. Update call sites to leverage this new factory to wire usage of the migration manager. --- internal/cli/gitaly/serve.go | 25 +++++++++------ internal/cli/gitaly/subcmd_recovery.go | 13 +++++--- .../storagemgr/partition/migration/factory.go | 32 +++++++++++++++++++ .../storagemgr/partition/migration/manager.go | 10 ++++++ internal/testhelper/testserver/gitaly.go | 17 ++++++---- 5 files changed, 75 insertions(+), 22 deletions(-) create mode 100644 internal/gitaly/storage/storagemgr/partition/migration/factory.go diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 00e5dfd4e4..f4a27a3438 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -39,6 +39,7 @@ import ( nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" @@ -409,11 +410,13 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger)
error { storagemgr.NewFactory( logger, dbMgr, - partition.NewFactory( - gitCmdFactory, - localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), - partitionMetrics, - logConsumer, + migration.NewFactory( + partition.NewFactory( + gitCmdFactory, + localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), + partitionMetrics, + logConsumer, + ), ), 2, storageMetrics, @@ -455,11 +458,13 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { storagemgr.NewFactory( logger, dbMgr, - partition.NewFactory( - gitCmdFactory, - localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), - partitionMetrics, - nil, + migration.NewFactory( + partition.NewFactory( + gitCmdFactory, + localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), + partitionMetrics, + nil, + ), ), // In recovery mode we don't want to keep inactive partitions active. The cache // however can't be disabled so simply set it to one. diff --git a/internal/cli/gitaly/subcmd_recovery.go b/internal/cli/gitaly/subcmd_recovery.go index b348e9ba5b..ba71551c21 100644 --- a/internal/cli/gitaly/subcmd_recovery.go +++ b/internal/cli/gitaly/subcmd_recovery.go @@ -22,6 +22,7 @@ import ( nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/log" @@ -122,11 +123,13 @@ func recoveryStatusAction(ctx *cli.Context) (returnErr error) { storagemgr.NewFactory( logger, dbMgr, - partition.NewFactory( - gitCmdFactory, - localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), - partitionMetrics, - nil, + migration.NewFactory( + 
partition.NewFactory( + gitCmdFactory, + localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), + partitionMetrics, + nil, + ), ), 1, storageMetrics, diff --git a/internal/gitaly/storage/storagemgr/partition/migration/factory.go b/internal/gitaly/storage/storagemgr/partition/migration/factory.go new file mode 100644 index 0000000000..eed0e3ca0b --- /dev/null +++ b/internal/gitaly/storage/storagemgr/partition/migration/factory.go @@ -0,0 +1,32 @@ +package migration + +import ( + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" +) + +// migrationFactory defines a partition factory that wraps another partition factory. +type migrationFactory struct { + factory storagemgr.PartitionFactory +} + +// NewFactory returns a new Factory. +func NewFactory(factory storagemgr.PartitionFactory) storagemgr.PartitionFactory { + return &migrationFactory{factory: factory} +} + +// New returns a new Partition instance. 
+func (f migrationFactory) New( + logger log.Logger, + partitionID storage.PartitionID, + db keyvalue.Transactioner, + storageName string, + storagePath string, + absoluteStateDir string, + stagingDir string, +) storagemgr.Partition { + partition := f.factory.New(logger, partitionID, db, storageName, storagePath, absoluteStateDir, stagingDir) + return NewPartition(partition, logger) +} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/manager.go b/internal/gitaly/storage/storagemgr/partition/migration/manager.go index 5724a595e4..2fe1e8c92d 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/manager.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/manager.go @@ -38,6 +38,16 @@ type migrationManager struct { migrationStates map[string]*migrationState } +// NewPartition creates a migration manager that wraps the provided partition. +func NewPartition(partition storagemgr.Partition, logger log.Logger) storagemgr.Partition { + return &migrationManager{ + Partition: partition, + logger: logger, + migrations: migrations, + migrationStates: map[string]*migrationState{}, + } +} + func (m *migrationManager) Begin(ctx context.Context, opts storage.BeginOptions) (storage.Transaction, error) { if err := m.migrate(ctx, opts.RelativePaths); err != nil { return nil, fmt.Errorf("migrate: %w", err) diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 66f7964e7a..62ab00ae20 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -32,6 +32,7 @@ import ( nodeimpl "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/node" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/migration" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/snapshot" 
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" @@ -354,14 +355,16 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Conte storagemgr.NewFactory( gsd.logger, dbMgr, - partition.NewFactory( - gsd.gitCmdFactory, - localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), - partition.NewMetrics( - housekeeping.NewMetrics(cfg.Prometheus), - snapshot.NewMetrics(), + migration.NewFactory( + partition.NewFactory( + gsd.gitCmdFactory, + localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), + partition.NewMetrics( + housekeeping.NewMetrics(cfg.Prometheus), + snapshot.NewMetrics(), + ), + nil, ), - nil, ), storagemgr.DefaultMaxInactivePartitions, storagemgr.NewMetrics(cfg.Prometheus), -- GitLab