diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 0531be28fa4fc4b51fd8adfa6dab70fbe332dc16..51d33b5356820038093b79582c680f6fc9660ca2 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -41,7 +41,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition/migration" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition/migration/reftable" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v18/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/backchannel" @@ -370,8 +369,7 @@ func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error { raftMetrics := raftmgr.NewMetrics() partitionMetrics := partition.NewMetrics(housekeepingMetrics) migrationMetrics := migration.NewMetrics() - reftableMigratorMetrics := reftable.NewMetrics() - prometheus.MustRegister(housekeepingMetrics, storageMetrics, partitionMetrics, migrationMetrics, raftMetrics, reftableMigratorMetrics) + prometheus.MustRegister(housekeepingMetrics, storageMetrics, partitionMetrics, migrationMetrics, raftMetrics) migrations := []migration.Migration{ migration.NewLeftoverFileMigration(locator), @@ -501,18 +499,12 @@ func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error { return err } - reftableMigrator := reftable.NewMigrator(logger, reftableMigratorMetrics, node, localrepoFactory) - reftableMigrator.Run() - defer reftableMigrator.Close() - txMiddleware = server.TransactionMiddleware{ UnaryInterceptors: []grpc.UnaryServerInterceptor{ storagemgr.NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, txRegistry, node, locator), - reftable.NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator), }, StreamInterceptors: 
[]grpc.StreamServerInterceptor{ storagemgr.NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, txRegistry, node, locator), - reftable.NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator), }, } } else { diff --git a/internal/git/localrepo/refs.go b/internal/git/localrepo/refs.go index 55c1fe0db24805a247aea186956b4086acd1ef79..cde4c391df271362599c0c503ffce632c4284f74 100644 --- a/internal/git/localrepo/refs.go +++ b/internal/git/localrepo/refs.go @@ -149,23 +149,17 @@ func (repo *Repo) UpdateRef(ctx context.Context, reference git.ReferenceName, ne // SetDefaultBranch sets the repository's HEAD to point to the given reference. // It will not verify the reference actually exists. func (repo *Repo) SetDefaultBranch(ctx context.Context, txManager transaction.Manager, reference git.ReferenceName) error { - version, err := repo.GitVersion(ctx) - if err != nil { - return fmt.Errorf("detecting Git version: %w", err) - } - if err := git.ValidateReference(reference.String()); err != nil { return fmt.Errorf("%q is a malformed refname", reference) } - return repo.setDefaultBranchWithUpdateRef(ctx, reference, version) + return repo.setDefaultBranchWithUpdateRef(ctx, reference) } // setDefaultBranchWithUpdateRef uses 'symref-update' command to update HEAD. 
func (repo *Repo) setDefaultBranchWithUpdateRef( ctx context.Context, reference git.ReferenceName, - version git.Version, ) (err error) { updater, err := updateref.New(ctx, repo, updateref.WithNoDeref()) if err != nil { @@ -182,7 +176,7 @@ func (repo *Repo) setDefaultBranchWithUpdateRef( return fmt.Errorf("start: %w", err) } - if err = updater.UpdateSymbolicReference(version, "HEAD", reference); err != nil { + if err = updater.UpdateSymbolicReference("HEAD", reference); err != nil { return fmt.Errorf("update: %w", err) } diff --git a/internal/git/localrepo/refs_external_test.go b/internal/git/localrepo/refs_external_test.go index a399619af47f2584f8d0a5c550a03c755f917c5c..38c2cf0e4d29987e40f445efe722b6dd22b7bc88 100644 --- a/internal/git/localrepo/refs_external_test.go +++ b/internal/git/localrepo/refs_external_test.go @@ -159,14 +159,11 @@ func TestRepo_SetDefaultBranch_errors(t *testing.T) { ref, err := repo.HeadReference(ctx) require.NoError(t, err) - version, err := repo.GitVersion(ctx) - require.NoError(t, err) - updater, err := updateref.New(ctx, repo) require.NoError(t, err) require.NoError(t, updater.Start()) - require.NoError(t, updater.UpdateSymbolicReference(version, "HEAD", "refs/heads/temp")) + require.NoError(t, updater.UpdateSymbolicReference("HEAD", "refs/heads/temp")) require.NoError(t, updater.Prepare()) t.Cleanup(func() { require.NoError(t, updater.Close()) }) diff --git a/internal/git/reference.go b/internal/git/reference.go index cb888e620a4552d65dc9e59d9e52e80f904e358c..5787159f8a66d9eef8048f7d1798bf5d1ad2f7d8 100644 --- a/internal/git/reference.go +++ b/internal/git/reference.go @@ -225,3 +225,9 @@ func ValidateReference(name string) error { name = tail } } + +// CreateRefURI creates the GIT_REF_URI value for the given +// reference backend and path. 
+func CreateRefURI(backend ReferenceBackend, path string) string { + return fmt.Sprintf("%s://%s", backend.Name, path) +} diff --git a/internal/git/updateref/updateref.go b/internal/git/updateref/updateref.go index 8f85d000ba4e721028196a7b2db70c6506b243d1..21ee5fff1cfd566d8f9766fc07dc00f1fc64a722 100644 --- a/internal/git/updateref/updateref.go +++ b/internal/git/updateref/updateref.go @@ -270,6 +270,7 @@ type UpdaterOpt func(*updaterConfig) type updaterConfig struct { disableTransactions bool noDeref bool + refURI string } // WithDisabledTransactions disables hooks such that no reference-transactions @@ -280,6 +281,14 @@ func WithDisabledTransactions() UpdaterOpt { } } +// WithRefURI sets the GIT_REF_URI env variable, which allows using +// an alternate reference directory for all reference operations. +func WithRefURI(uri string) UpdaterOpt { + return func(cfg *updaterConfig) { + cfg.refURI = uri + } +} + // WithNoDeref disables de-reference while updating ref. If this option is turned on, // itself is overwritten, rather than the result of following the symbolic ref.
func WithNoDeref() UpdaterOpt { @@ -310,21 +319,29 @@ func New(ctx context.Context, repo gitcmd.RepositoryExecutor, opts ...UpdaterOpt txOption = gitcmd.WithDisabledHooks() } + var stderr bytes.Buffer + cmdOpts := []gitcmd.CmdOpt{ + txOption, + gitcmd.WithSetupStdin(), + gitcmd.WithSetupStdout(), + gitcmd.WithStderr(&stderr), + } + + if len(cfg.refURI) != 0 { + cmdOpts = append(cmdOpts, gitcmd.WithEnv("GIT_REF_URI="+cfg.refURI)) + } + cmdFlags := []gitcmd.Option{gitcmd.Flag{Name: "-z"}, gitcmd.Flag{Name: "--stdin"}} if cfg.noDeref { cmdFlags = append(cmdFlags, gitcmd.Flag{Name: "--no-deref"}) } - var stderr bytes.Buffer cmd, err := repo.Exec(ctx, gitcmd.Command{ Name: "update-ref", Flags: cmdFlags, }, - txOption, - gitcmd.WithSetupStdin(), - gitcmd.WithSetupStdout(), - gitcmd.WithStderr(&stderr), + cmdOpts..., ) if err != nil { return nil, err @@ -397,7 +414,7 @@ func (u *Updater) Update(reference git.ReferenceName, newOID, oldOID git.ObjectI // UpdateSymbolicReference is used to do a symbolic reference update. We can potentially provide the oldTarget // or the oldOID. 
-func (u *Updater) UpdateSymbolicReference(version git.Version, reference, newTarget git.ReferenceName) error { +func (u *Updater) UpdateSymbolicReference(reference, newTarget git.ReferenceName) error { if err := u.expectState(stateStarted); err != nil { return err } diff --git a/internal/git/updateref/updateref_test.go b/internal/git/updateref/updateref_test.go index 7a8c4b50f7b2ed9a52787e62c22c526164d1d133..5fbe088e5314abbf8076bc8932f2a5f427b15ed4 100644 --- a/internal/git/updateref/updateref_test.go +++ b/internal/git/updateref/updateref_test.go @@ -964,17 +964,14 @@ func TestUpdater_symrefs(t *testing.T) { ctx := testhelper.Context(t) - cfg, executor, repoPath, updater := setupUpdater(t, ctx) + cfg, _, repoPath, updater := setupUpdater(t, ctx) defer testhelper.MustClose(t, updater) - actual, err := executor.GitVersion(ctx) - require.NoError(t, err) - gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("master")) require.NoError(t, updater.Start()) - require.NoError(t, updater.UpdateSymbolicReference(actual, "refs/heads/symref", "refs/heads/master")) + require.NoError(t, updater.UpdateSymbolicReference("refs/heads/symref", "refs/heads/master")) require.NoError(t, updater.Commit()) // Verify that the reference was created as expected. 
diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/metrics.go b/internal/gitaly/reftable/metrics.go similarity index 95% rename from internal/gitaly/storage/storagemgr/partition/migration/reftable/metrics.go rename to internal/gitaly/reftable/metrics.go index 50acb85e8a799e3454ecff0db93c1d91a0d664b1..4ceb89cbe852fb10f41d0737e8d9e5b86b08a8e2 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/metrics.go +++ b/internal/gitaly/reftable/metrics.go @@ -32,14 +32,14 @@ func NewMetrics() Metrics { Name: "gitaly_reftable_migration_latency_seconds", Help: "Latency of a successful repository migration", }, - []string{}, + []string{"without_wal"}, ), failsMetric: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "gitaly_reftable_migration_failure", Help: "Counter of the total number of migration failures", }, - []string{"reason"}, + []string{"without_wal", "reason"}, ), } } diff --git a/internal/gitaly/reftable/middleware.go b/internal/gitaly/reftable/middleware.go new file mode 100644 index 0000000000000000000000000000000000000000..9acec3f6979edd1fe69ca17b39b70f489af95b95 --- /dev/null +++ b/internal/gitaly/reftable/middleware.go @@ -0,0 +1,129 @@ +package reftable + +import ( + "context" + "fmt" + "sync" + + "gitlab.com/gitlab-org/gitaly/v18/internal/featureflag" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v18/middleware" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" +) + +type migrationRegister interface { + RegisterMigration(repo *localrepo.Repo, block func() error, unblock func() error) + CancelMigration(repo *localrepo.Repo) +} + +// NewInterceptors provides opportunistic middlewares to aid in reftable migration. +// It only registers a migration for an incoming ACCESSOR request. 
If any other +// type of request is received, it tries to cancel the migration if any exist +// and are ongoing. +// +// For streaming RPCs we consume the first request and wrap it again before calling +// the next handler. This ensures that the next handler doesn't miss the first message. +func NewInterceptors( + factory localrepo.Factory, + registry *protoregistry.Registry, + register migrationRegister, +) (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) { + locks := sync.Map{} + + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ any, returnedErr error) { + tx := storage.ExtractTransaction(ctx) + + if tx == nil && featureflag.ReftableMigration.IsEnabled(ctx) { + methodInfo, err := registry.LookupMethod(info.FullMethod) + if err != nil { + // We might get an error for health checks, let's err on + // the side of safety. + return handler(ctx, req) + } + + targetRepo, err := methodInfo.TargetRepo(req.(proto.Message)) + if err != nil { + return nil, fmt.Errorf("extract repository: %w", err) + } + + lockAny, _ := locks.LoadOrStore(targetRepo.GetRelativePath(), &sync.RWMutex{}) + lock := lockAny.(*sync.RWMutex) + + repo := factory.Build(targetRepo) + block := func() error { lock.Lock(); return nil } + unblock := func() error { lock.Unlock(); return nil } + + switch methodInfo.Operation { + case protoregistry.OpAccessor: + register.RegisterMigration(repo, block, unblock) + case protoregistry.OpMutator: + defer register.RegisterMigration(repo, block, unblock) + fallthrough + default: + // Cancel any ongoing migrations to avoid conflicts + // but schedule one to start after we serve the request + register.CancelMigration(repo) + } + + lock.RLock() + defer lock.RUnlock() + return handler(ctx, req) + } + + return handler(ctx, req) + }, func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (returnedErr error) { + tx := storage.ExtractTransaction(ss.Context()) + + if tx == nil 
&& featureflag.ReftableMigration.IsEnabled(ss.Context()) { + methodInfo, err := registry.LookupMethod(info.FullMethod) + if err != nil { + // We might get an error for health checks, let's err on + // the side of safety. + return handler(srv, ss) + } + + req := methodInfo.NewRequest() + if err := ss.RecvMsg(req); err != nil { + // All of the repository scoped streaming RPCs send the repository in the first message. + // Generally it should be fine to error out in all cases if there is no message sent. + // To maintain compatibility with tests, we instead invoke the handler to let them return + // the asserted error messages. Once the transaction management is on by default, we should + // error out here directly and amend the failing test cases. + return handler(srv, middleware.NewPeekedStream(ss.Context(), nil, err, ss)) + } + + targetRepo, err := methodInfo.TargetRepo(req) + if err != nil { + return fmt.Errorf("extract repository: %w", err) + } + + lockAny, _ := locks.LoadOrStore(targetRepo.GetRelativePath(), &sync.RWMutex{}) + lock := lockAny.(*sync.RWMutex) + + repo := factory.Build(targetRepo) + block := func() error { lock.Lock(); return nil } + unblock := func() error { lock.Unlock(); return nil } + + switch methodInfo.Operation { + case protoregistry.OpAccessor: + register.RegisterMigration(repo, block, unblock) + case protoregistry.OpMutator: + defer register.RegisterMigration(repo, block, unblock) + fallthrough + default: + // Cancel any ongoing migrations to avoid conflicts + // but schedule one to start after we serve the request + register.CancelMigration(repo) + } + + lock.RLock() + defer lock.RUnlock() + return handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss)) + } + + return handler(srv, ss) + } +} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go b/internal/gitaly/reftable/middleware_test.go similarity index 74% rename from 
internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go rename to internal/gitaly/reftable/middleware_test.go index e552515e3082d1d42874d89f75fe8d6a90819067..06bbca00577475f1493791fb13c33fd70860eca7 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go +++ b/internal/gitaly/reftable/middleware_test.go @@ -14,7 +14,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service/hook" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service/ref" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service/repository" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/client" "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v18/internal/log" @@ -26,51 +25,44 @@ import ( ) type mockReftableMigrator struct { - registerCount int - cancelCount int - reftableMigrator *migrator + registerCount int + cancelCount int + handler *migrationHandler } -func (m *mockReftableMigrator) RegisterMigration(storageName, relativePath string) { +func (m *mockReftableMigrator) RegisterMigration(repo *localrepo.Repo, block, unblock func() error) { m.registerCount++ - m.reftableMigrator.RegisterMigration(storageName, relativePath) + m.handler.RegisterMigration(repo, block, unblock) } -func (m *mockReftableMigrator) CancelMigration(storageName, relativePath string) { +func (m *mockReftableMigrator) CancelMigration(repo *localrepo.Repo) { m.cancelCount++ - m.reftableMigrator.CancelMigration(storageName, relativePath) + m.handler.CancelMigration(repo) } func TestInterceptor(t *testing.T) { t.Parallel() - testhelper.NewFeatureSets( - featureflag.ReftableMigration, - ).Run(t, testInterceptor) + testhelper.NewFeatureSets(featureflag.ReftableMigration).Run(t, testInterceptor) } func testInterceptor(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t) - if !testhelper.IsWALEnabled() { - t.Skip("only works with the 
WAL") - } - - // Ideally we should use an RPC routed via praefect to avoid the race. - if testhelper.IsPraefectEnabled() { - t.Skip("usage of gittest.WriteCommit causes a race condition.") - } + testhelper.SkipWithWAL(t, "the migration code runs without WAL") + testhelper.SkipWithPraefect(t, "we shouldn't migrate with Praefect") mockMigrator := mockReftableMigrator{} - var reftableMigrator *migrator - callback := func(logger log.Logger, node storage.Node, factory localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) { - reftableMigrator = NewMigrator(logger, NewMetrics(), node, factory) - reftableMigrator.Run() + var handler *migrationHandler + callback := func(logger log.Logger, factory localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) { + handler = NewMigrationHandler(logger, NewMetrics()) + handler.Run() + + mockMigrator.handler = handler - mockMigrator.reftableMigrator = reftableMigrator + unary, stream := NewInterceptors(factory, protoregistry.GitalyProtoPreregistered, &mockMigrator) - return []grpc.UnaryServerInterceptor{NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, &mockMigrator)}, - []grpc.StreamServerInterceptor{NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, &mockMigrator)} + return []grpc.UnaryServerInterceptor{unary}, []grpc.StreamServerInterceptor{stream} } serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { @@ -78,7 +70,8 @@ func testInterceptor(t *testing.T, ctx context.Context) { gitalypb.RegisterRefServiceServer(srv, ref.NewServer(deps)) gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(deps)) gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps)) - }, testserver.WithTransactionInterceptors(callback)) + }, testserver.WithInterceptors(callback), + testserver.WithContext(ctx)) cfg.SocketPath = serverSocketPath conn, err := client.New(ctx, serverSocketPath) @@ -107,8 
+100,8 @@ func testInterceptor(t *testing.T, ctx context.Context) { _, err := client.FindCommit(ctx, request) require.NoError(t, err) }, - expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), - expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 0, 0), }, { desc: "unary mutator", @@ -126,8 +119,8 @@ func testInterceptor(t *testing.T, ctx context.Context) { }) require.NoError(t, err) }, - expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), - expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 3, 0), + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 0, 0), }, { desc: "stream accessor", @@ -154,8 +147,8 @@ func testInterceptor(t *testing.T, ctx context.Context) { }) require.NoError(t, err) }, - expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), - expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 0, 0), }, { desc: "stream mutator", @@ -183,36 +176,31 @@ func testInterceptor(t *testing.T, ctx context.Context) { _, err = updater.CloseAndRecv() require.NoError(t, err) }, - expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), - expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 3, 0), + 
expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 0, 0), }, } { t.Run(tc.desc, func(t *testing.T) { - // Reset the mock migrator count for each test + // Reset the mock migration handler count for each test mockMigrator.registerCount, mockMigrator.cancelCount = 0, 0 - // To avoid migration being triggered by CreateRepository, we trick into - // believing that the repository has already completed a migraiton. - relativePath := gittest.NewRepositoryName(t) - key := migrationKey(cfg.Storages[0].Name, relativePath) - reftableMigrator.state.Store(key, migratorState{completed: true}) - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - RelativePath: relativePath, + SkipCreationViaService: true, }) - reftableMigrator.state.Delete(key) - commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), gittest.WithTreeEntries( gittest.TreeEntry{Mode: "100644", Path: "foo", Content: "bar"}, ), ) + gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all") tc.setup(repoProto, commitID) - // Block to ensure the previous migration was successful. - reftableMigrator.migrateCh <- migrationData{} + if featureflag.ReftableMigration.IsEnabled(ctx) { + // Block to ensure the previous migration was successful. 
+ handler.migrateCh <- migrationData{} + } repoInfo, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ Repository: repoProto, @@ -235,5 +223,7 @@ func testInterceptor(t *testing.T, ctx context.Context) { }) } - reftableMigrator.Close() + if featureflag.ReftableMigration.IsEnabled(ctx) { + handler.Close() + } } diff --git a/internal/gitaly/reftable/migration_handler.go b/internal/gitaly/reftable/migration_handler.go new file mode 100644 index 0000000000000000000000000000000000000000..bd3ea082867a9739f2a1cd0c8dd08c5d5c50257e --- /dev/null +++ b/internal/gitaly/reftable/migration_handler.go @@ -0,0 +1,184 @@ +package reftable + +import ( + "context" + "errors" + "math" + "sync" + "time" + + "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v18/internal/log" +) + +type migratorState struct { + completed bool + attempts uint + coolDown time.Time + cancelCtx context.CancelFunc +} + +type migrationData struct { + repo *localrepo.Repo + block func() error + unblock func() error +} + +type migrationHandler struct { + wg sync.WaitGroup + + ctx context.Context + ctxCancel context.CancelFunc + + logger log.Logger + metrics Metrics + + migrateCh chan migrationData + state sync.Map +} + +// NewMigrationHandler provides a new migration handler. +func NewMigrationHandler(logger log.Logger, metrics Metrics) *migrationHandler { + ctx, cancel := context.WithCancel(context.Background()) + + return &migrationHandler{ + ctx: ctx, + ctxCancel: cancel, + + logger: logger, + metrics: metrics, + + migrateCh: make(chan migrationData), + state: sync.Map{}, + } +} + +// Run creates a long running goroutine which handles all migration requests. +// Must be accompanied with a Close(), which will safely stop the goroutine. 
+func (m *migrationHandler) Run() { + m.wg.Add(1) + go func() { + defer m.wg.Done() + + for { + select { + case <-m.ctx.Done(): + return + case data := <-m.migrateCh: + func() { + if data.repo == nil || data.block == nil || data.unblock == nil { + return + } + + ctx, cancel := context.WithCancel(m.ctx) + repoPath, err := data.repo.Path(ctx) + if err != nil { + m.logger.WithError(err).WithFields(log.Fields{ + "repo_path": repoPath, + }).ErrorContext(ctx, "reftable migration failed getting path") + + cancel() + return + } + + val, ok := m.state.LoadOrStore(repoPath, migratorState{cancelCtx: cancel}) + state := val.(migratorState) + + // If the state was present, we still need to store our + // cancellation function. + if ok { + state.cancelCtx = cancel + m.state.Store(repoPath, state) + } + + // We don't do 'defer m.state.Store(...)' here, because that would + // fix the state as is here. We want to delay the evaluation of the + // state. + defer func() { + m.state.Store(repoPath, state) + }() + + if state.completed || state.coolDown.After(time.Now()) { + return + } + + before := time.Now() + err = migrate(ctx, data.repo, data.block, data.unblock) + // We shouldn't care about migration status for repositories which don't + // even exist.
+ if errors.Is(err, storage.ErrRepositoryNotFound) { + return + } + latency := time.Since(before) + + state.attempts = state.attempts + 1 + state.cancelCtx = nil + + if err != nil { + m.logger.WithError(err).WithFields(log.Fields{ + "repo_path": repoPath, + "migration_latency": latency, + "migration_attempts": state.attempts, + }).ErrorContext(ctx, "reftable migration failed for repository") + m.metrics.failsMetric.WithLabelValues("true", failMetricReason(err)).Add(1) + + // Let's delay exponentially, but with a max of 6hrs + delay := min(math.Pow(2, float64(state.attempts)), 6) + state.coolDown = time.Now().Add(time.Duration(delay) * time.Hour) + } else { + m.logger.WithFields(log.Fields{ + "repo_path": repoPath, + "migration_latency": latency, + "migration_attempts": state.attempts, + }).InfoContext(ctx, "reftable migration successful for repository") + m.metrics.latencyMetric.WithLabelValues("true").Observe(latency.Seconds()) + + state.completed = true + } + }() + } + } + }() +} + +// Close is used to stop the handler. +func (m *migrationHandler) Close() { + defer m.wg.Wait() + m.ctxCancel() +} + +// RegisterMigration is used to register a new migration. This function +// is non-blocking and doesn't return an error. +func (m *migrationHandler) RegisterMigration( + repo *localrepo.Repo, + block func() error, + unblock func() error, +) { + select { + case m.migrateCh <- migrationData{repo: repo, block: block, unblock: unblock}: + default: + return + } +} + +// CancelMigration cancels the migration registered for the given +// repository, if a cancellation function is recorded for it.
+func (m *migrationHandler) CancelMigration(repo *localrepo.Repo) { + repoPath, err := repo.Path(m.ctx) + if err != nil { + m.logger.WithError(err).WithFields(log.Fields{ + "repo_path": repoPath, + }).ErrorContext(m.ctx, "reftable migration failed getting path") + return + } + + val, ok := m.state.Load(repoPath) + if !ok { + return + } + + if cancel := val.(migratorState).cancelCtx; cancel != nil { + cancel() + } +} diff --git a/internal/gitaly/reftable/migration_handler_test.go b/internal/gitaly/reftable/migration_handler_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7846bbcd61b4254c622a662cf7a412bb8ac321e3 --- /dev/null +++ b/internal/gitaly/reftable/migration_handler_test.go @@ -0,0 +1,369 @@ +package reftable + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" +) + +// Exercise the registration function. Migrations should be registered +// irrelevant of the current state (completed, pending, cooldown). 
+func TestMigrationHandler_RegisterMigration(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + state migratorState + }{ + { + desc: "migration already completed", + state: migratorState{ + completed: true, + }, + }, + { + desc: "migration in cooldown period", + state: migratorState{ + coolDown: time.Now().Add(1 * time.Hour), + }, + }, + { + desc: "migration with expired cooldown period", + state: migratorState{ + coolDown: time.Now().Add(-1 * time.Hour), + }, + }, + { + desc: "migration state doesn't exist", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + parentCtx, parentCancel := context.WithCancel(context.Background()) + m := &migrationHandler{ + migrateCh: make(chan migrationData), + state: sync.Map{}, + ctx: parentCtx, + ctxCancel: parentCancel, + } + m.state.Store(repoPath, tc.state) + + var wg sync.WaitGroup + defer wg.Wait() + + stopCh := make(chan struct{}) + + // We raise multiple goroutines, only one would go through. + // The others would go to the default case.
+ wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stopCh: + return + default: + wg.Add(1) + go func() { + defer wg.Done() + m.RegisterMigration(repo, func() error { return nil }, func() error { return nil }) + }() + } + } + }() + + defer func() { + close(stopCh) + }() + + data := <-m.migrateCh + require.Equal(t, repo, data.repo) + }) + } +} + +func TestMigrationHandler_CancelMigration(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + repoProto, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + for _, tc := range []struct { + desc string + repo *localrepo.Repo + expectedErr error + }{ + { + desc: "incorrect repo", + repo: func() *localrepo.Repo { + repoProto, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + return localrepo.NewTestRepo(t, cfg, repoProto) + }(), + }, + { + desc: "success path", + repo: repo, + expectedErr: context.Canceled, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + + parentCtx, parentCancel := context.WithCancel(context.Background()) + m := &migrationHandler{ + state: sync.Map{}, + ctx: parentCtx, + ctxCancel: parentCancel, + } + + repoPath, err := repo.Path(m.ctx) + require.NoError(t, err) + m.state.Store(repoPath, migratorState{cancelCtx: cancel}) + + m.CancelMigration(tc.repo) + require.ErrorIs(t, ctx.Err(), tc.expectedErr) + }) + } +} + +func TestMigrationHandler(t *testing.T) { + t.Parallel() + + testhelper.SkipWithWAL(t, "the migration code runs without WAL") + testhelper.SkipWithPraefect(t, "we shouldn't migrate with Praefect") + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + logger := testhelper.NewLogger(t) + hook := testhelper.AddLoggerHook(logger) + metrics := NewMetrics() + + type setupData struct { + 
block func() error + unblock func() error + run func(m *migrationHandler, repo *localrepo.Repo) + repoProto *gitalypb.Repository + startState *migratorState + } + + for _, tc := range []struct { + desc string + setup func() setupData + completed bool + attempts uint + expectedLogMsg string + }{ + { + desc: "cancelled migration", + setup: func() setupData { + ch := make(chan struct{}) + + repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch")) + gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all") + + return setupData{ + repoProto: repo, + block: func() error { + <-ch + <-ch + + return nil + }, + unblock: func() error { return nil }, + run: func(m *migrationHandler, repo *localrepo.Repo) { + ch <- struct{}{} + m.CancelMigration(repo) + ch <- struct{}{} + }, + } + }, + completed: false, + attempts: 1, + expectedLogMsg: "reftable migration failed for repository", + }, + { + desc: "existing state with cancelled migration", + setup: func() setupData { + ch := make(chan struct{}) + + repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch")) + gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all") + + return setupData{ + repoProto: repo, + block: func() error { + <-ch + <-ch + + return nil + }, + unblock: func() error { return nil }, + run: func(m *migrationHandler, repo *localrepo.Repo) { + ch <- struct{}{} + m.CancelMigration(repo) + ch <- struct{}{} + }, + startState: &migratorState{attempts: 3}, + } + }, + completed: false, + attempts: 4, + expectedLogMsg: "reftable migration failed for repository", + }, + { + desc: "repository not found error", + setup: func() setupData { + repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: 
true, + }) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch")) + gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all") + + return setupData{ + repoProto: repo, + // Mimic repository not existing by passing the error + // through the block function. + block: func() error { return storage.ErrRepositoryNotFound }, + unblock: func() error { return nil }, + run: func(m *migrationHandler, repo *localrepo.Repo) {}, + } + }, + // When we encounter a ErrRepositoryNotFound error, we simply + // skip the migration and don't mark it as completed or attempted. + completed: false, + attempts: 0, + }, + { + desc: "unsuccessful migration without packed-refs", + setup: func() setupData { + repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch")) + + return setupData{ + repoProto: repo, + block: func() error { return nil }, + unblock: func() error { return nil }, + run: func(m *migrationHandler, repo *localrepo.Repo) {}, + } + }, + completed: false, + attempts: 1, + expectedLogMsg: "reftable migration failed for repository", + }, + { + desc: "successful migration", + setup: func() setupData { + repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch")) + gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all") + + return setupData{ + repoProto: repo, + block: func() error { return nil }, + unblock: func() error { return nil }, + run: func(m *migrationHandler, repo *localrepo.Repo) {}, + } + }, + completed: true, + attempts: 1, + expectedLogMsg: "reftable migration successful for repository", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + data := tc.setup() + + parentCtx, parentCancel := context.WithCancel(context.Background()) + m := &migrationHandler{ + wg: sync.WaitGroup{}, + + ctx: 
parentCtx, + ctxCancel: parentCancel, + + logger: logger, + metrics: metrics, + + migrateCh: make(chan migrationData), + state: sync.Map{}, + } + + repo := localrepo.NewTestRepo(t, cfg, data.repoProto) + repoPath, err := repo.Path(ctx) + require.NoError(t, err) + + if data.startState != nil { + m.state.Store(repoPath, *data.startState) + } + + m.Run() + defer m.Close() + + // It is not guaranteed that the migration is registered, so run it in a + // loop until it is. + for { + if val, ok := m.state.Load(repoPath); ok { + if val.(migratorState).cancelCtx != nil { + break + } + } + + m.RegisterMigration(repo, data.block, data.unblock) + } + + data.run(m, repo) + + // Block till the old migration is complete. + m.migrateCh <- migrationData{} + + val, ok := m.state.Load(repoPath) + state := val.(migratorState) + + require.True(t, ok) + require.Equal(t, tc.completed, state.completed) + require.Equal(t, tc.attempts, state.attempts) + + if tc.expectedLogMsg != "" { + entries := hook.AllEntries() + entry := entries[len(entries)-1] + require.Equal(t, tc.expectedLogMsg, entry.Message) + require.Greater(t, entry.Data["migration_latency"].(time.Duration), time.Duration(0)) + require.Greater(t, entry.Data["migration_attempts"].(uint), uint(0)) + } + }) + } +} diff --git a/internal/gitaly/reftable/migrator.go b/internal/gitaly/reftable/migrator.go new file mode 100644 index 0000000000000000000000000000000000000000..d50db584678f1a3996bbba5c50cc08af1951fa44 --- /dev/null +++ b/internal/gitaly/reftable/migrator.go @@ -0,0 +1,248 @@ +package reftable + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "strings" + + "regexp" + + "gitlab.com/gitlab-org/gitaly/v18/internal/git" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/gitcmd" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/updateref" +) + +var nameRegex = regexp.MustCompile("^[[:ascii:]]*'([[:ascii:]]*)'\n$") + +type 
blocker interface {
+	// NOTE(review): blocker is not referenced anywhere in this file and Repo()
+	// declares no result; confirm it is still needed.
+	Block() error
+	Unblock() error
+	Repo()
+}
+
+// migrateWithDryRun runs `git refs migrate --ref-format=reftable --no-reflog
+// --dry-run` on repo and returns the directory that Git reports as holding the
+// dry-run result. Because of --dry-run the repository itself is expected to be
+// left unconverted by this step.
+func migrateWithDryRun(ctx context.Context, repo *localrepo.Repo) (string, error) {
+	cmd, err := repo.Exec(ctx, gitcmd.Command{
+		Name:   "refs",
+		Action: "migrate",
+		Flags: []gitcmd.Option{
+			gitcmd.Flag{Name: "--ref-format=reftable"},
+			gitcmd.Flag{Name: "--no-reflog"},
+			gitcmd.Flag{Name: "--dry-run"},
+		},
+	}, gitcmd.WithSetupStdout())
+	if err != nil {
+		return "", fmt.Errorf("running refs migrate: %w", err)
+	}
+
+	output, readErr := io.ReadAll(cmd)
+	// Wait for the command even when reading its output failed, so that it is
+	// never left running unattended.
+	waitErr := cmd.Wait()
+	if readErr != nil {
+		return "", fmt.Errorf("reading refs migrate output: %w", readErr)
+	}
+	if waitErr != nil {
+		return "", fmt.Errorf("closing refs migrate: %w", waitErr)
+	}
+
+	// nameRegex captures the trailing single-quoted string at the very end of
+	// the output, which is where the dry-run reports its result directory.
+	matches := nameRegex.FindStringSubmatch(string(output))
+	if len(matches) != 2 {
+		return "", fmt.Errorf("couldn't find migration directory in output %q", output)
+	}
+
+	return matches[1], nil
+}
+
+// applyLooseRefs stages on updater every loose reference found below
+// "<repoPath>/refs", followed by the repository's HEAD symref. Files whose
+// content starts with "ref: " are staged as symbolic references; all others are
+// staged as updates to the object ID stored in the file. Nothing is committed.
+//
+// NOTE(review): references are only (re)written here, never deleted. A ref that
+// exists only as a loose file and is deleted after the dry-run but before this
+// runs would survive in the reftable. Git does not normally rewrite packed-refs
+// for such a deletion, so the packed-refs check in migrate would not catch it.
+// Confirm that callers prevent writes over that window.
+func applyLooseRefs(ctx context.Context, repo *localrepo.Repo, repoPath string, updater *updateref.Updater) error {
+	refsDir := filepath.Join(repoPath, "refs")
+	if err := filepath.WalkDir(refsDir, func(path string, entry fs.DirEntry, walkErr error) error {
+		if walkErr != nil {
+			return walkErr
+		}
+
+		// Mirror Git's loose-ref reader: entries starting with "." or ending in
+		// ".lock" (e.g. stale lockfiles) can never be valid reference names and
+		// would otherwise be rejected by update-ref, failing the migration.
+		name := entry.Name()
+		if strings.HasPrefix(name, ".") || strings.HasSuffix(name, ".lock") {
+			if entry.IsDir() {
+				return filepath.SkipDir
+			}
+			return nil
+		}
+
+		if entry.IsDir() {
+			return nil
+		}
+
+		content, err := os.ReadFile(path)
+		if err != nil {
+			return fmt.Errorf("reading file: %w", err)
+		}
+		content = bytes.TrimSpace(content)
+
+		relPath, err := filepath.Rel(repoPath, path)
+		if err != nil {
+			return fmt.Errorf("parsing refname: %w", err)
+		}
+		refname := git.ReferenceName(filepath.ToSlash(relPath))
+
+		if target, ok := bytes.CutPrefix(content, []byte("ref: ")); ok {
+			return updater.UpdateSymbolicReference(refname, git.ReferenceName(target))
+		}
+
+		return updater.Update(refname, git.ObjectID(content), "")
+	}); err != nil {
+		return fmt.Errorf("walking refs folder: %w", err)
+	}
+
+	head, err := repo.GetDefaultBranch(ctx)
+	if err != nil {
+		return fmt.Errorf("reading HEAD ref: %w", err)
+	}
+
+	return updater.UpdateSymbolicReference("HEAD", head)
+}
+
+// swapBackend moves the "reftable" directory out of reftablePath into repoPath
+// and then sets core.repositoryformatversion=1 and
+// extensions.refstorage=reftable in the repository's config.
+//
+// NOTE(review): a failure after the rename leaves repoPath/reftable in place
+// while the repository still uses the files backend, so a later attempt would
+// have to rename onto an existing directory. Confirm whether that needs handling.
+func swapBackend(ctx context.Context, repo *localrepo.Repo, repoPath, reftablePath string) error {
+	if err := os.Rename(
+		filepath.Join(reftablePath, "reftable"),
+		filepath.Join(repoPath, "reftable"),
+	); err != nil {
+		return fmt.Errorf("moving the reftable folder: %w", err)
+	}
+
+	var stderr bytes.Buffer
+	if err := repo.ExecAndWait(ctx, gitcmd.Command{
+		Name:   "config",
+		Action: "set",
+		Args:   []string{"core.repositoryformatversion", "1"},
+	}, gitcmd.WithStderr(&stderr)); err != nil {
+		return fmt.Errorf("writing repository format version config: %w: %v", err, stderr.String())
+	}
+
+	// The buffer is shared by both commands; clear it so that an error from the
+	// second command only reports that command's stderr.
+	stderr.Reset()
+	if err := repo.ExecAndWait(ctx, gitcmd.Command{
+		Name:   "config",
+		Action: "set",
+		Args:   []string{"extensions.refstorage", "reftable"},
+	}, gitcmd.WithStderr(&stderr)); err != nil {
+		return fmt.Errorf("writing refstorage config: %w: %v", err, stderr.String())
+	}
+
+	return nil
+}
+
+// cleanupRepo replaces the files-backend leftovers in repoPath with what the
+// dry-run left in reftablePath: reftablePath/HEAD is moved over repoPath/HEAD,
+// packed-refs and the old refs directory are removed, reftablePath/refs is moved
+// into their place, and reftablePath itself is removed (os.Remove, so it must be
+// empty by then). It is called only after swapBackend has succeeded.
+func cleanupRepo(repoPath, reftablePath string) error {
+	if err := os.Rename(
+		filepath.Join(reftablePath, "HEAD"),
+		filepath.Join(repoPath, "HEAD"),
+	); err != nil {
+		return fmt.Errorf("replace HEAD: %w", err)
+	}
+
+	if err := os.Remove(filepath.Join(repoPath, "packed-refs")); err != nil {
+		return fmt.Errorf("removing packed-refs: %w", err)
+	}
+
+	if err := os.RemoveAll(filepath.Join(repoPath, "refs")); err != nil {
+		return fmt.Errorf("removing refs: %w", err)
+	}
+
+	if err := os.Rename(
+		filepath.Join(reftablePath, "refs"),
+		filepath.Join(repoPath, "refs"),
+	); err != nil {
+		return fmt.Errorf("move dry-run refs folder: %w", err)
+	}
+
+	if err := os.Remove(reftablePath); err != nil {
+		return fmt.Errorf("removing migration folder: %w", err)
+	}
+
+	return nil
+}
+
+// migrate converts repo from the files reference backend to reftables.
+//
+// It returns nil without doing anything when repo already uses reftables, and
+// fails when repo has no packed-refs file. Otherwise it performs a dry-run
+// migration before block() is called. Then, between block() and unblock(), it:
+//   - re-applies all loose references and HEAD onto the dry-run result,
+//   - verifies that packed-refs was not replaced or modified since the start,
+//   - swaps the reftable directory and config into place, and
+//   - removes the files-backend leftovers.
+//
+// unblock is only called if block succeeded. On any error the dry-run directory
+// is removed.
+func migrate(
+	ctx context.Context,
+	repo *localrepo.Repo,
+	block func() error,
+	unblock func() error,
+) (err error) {
+	backend, err := repo.ReferenceBackend(ctx)
+	if err != nil {
+		return fmt.Errorf("reference backend: %w", err)
+	}
+
+	// Already migrated \m/
+	if backend == git.ReferenceBackendReftables {
+		return nil
+	}
+
+	repoPath, err := repo.Path(ctx)
+	if err != nil {
+		return fmt.Errorf("getting repo path: %w", err)
+	}
+
+	// Snapshot packed-refs up front. It is compared again once blocked, because
+	// re-applying loose refs cannot account for changes to packed-refs.
+	before, err := os.Stat(filepath.Join(repoPath, "packed-refs"))
+	if err != nil {
+		if errors.Is(err, os.ErrNotExist) {
+			return fmt.Errorf("packed-refs not found: %w", err)
+		}
+		return fmt.Errorf("stat packed-refs: %w", err)
+	}
+
+	reftablePath, err := migrateWithDryRun(ctx, repo)
+	if err != nil {
+		return fmt.Errorf("creating dry-run reftable: %w", err)
+	}
+	defer func() {
+		if err != nil {
+			if cleanupErr := os.RemoveAll(reftablePath); cleanupErr != nil {
+				err = errors.Join(err, cleanupErr)
+			}
+		}
+	}()
+
+	// The updater writes into the dry-run reftable via a ref URI, not into the
+	// repository's current reference storage.
+	uri := git.CreateRefURI(git.ReferenceBackendReftables, reftablePath)
+	updater, err := updateref.New(
+		ctx,
+		repo,
+		updateref.WithNoDeref(),
+		updateref.WithDisabledTransactions(),
+		updateref.WithRefURI(uri))
+	if err != nil {
+		return fmt.Errorf("creating updateref: %w", err)
+	}
+	// Always close the updater so the update-ref process is not left running on
+	// any return path. Defers run LIFO, so this happens before the dry-run
+	// directory cleanup registered above.
+	defer func() {
+		if closeErr := updater.Close(); closeErr != nil {
+			err = errors.Join(err, fmt.Errorf("close update-ref: %w", closeErr))
+		}
+	}()
+
+	if err := updater.Start(); err != nil {
+		return fmt.Errorf("start update-ref: %w", err)
+	}
+
+	if err := func() (returnedErr error) {
+		if err := block(); err != nil {
+			return fmt.Errorf("failed to block: %w", err)
+		}
+		defer func() {
+			if err := unblock(); err != nil {
+				returnedErr = errors.Join(fmt.Errorf("failed to unblock: %w", err), returnedErr)
+			}
+		}()
+
+		if err := applyLooseRefs(ctx, repo, repoPath, updater); err != nil {
+			return fmt.Errorf("applying loose-refs: %w", err)
+		}
+
+		if err := updater.Commit(); err != nil {
+			return fmt.Errorf("commit update-ref: %w", err)
+		}
+
+		after, err := os.Stat(filepath.Join(repoPath, "packed-refs"))
+		if err != nil {
+			return fmt.Errorf("stat packed-refs: %w", err)
+		}
+
+		// Git replaces packed-refs by renaming a lockfile over it, so a rewrite
+		// normally yields a different file. Comparing identity and size as well
+		// as mtime also catches rewrites within the filesystem's timestamp
+		// granularity.
+		if !os.SameFile(before, after) || before.Size() != after.Size() || !before.ModTime().Equal(after.ModTime()) {
+			return errors.New("packed-refs modified")
+		}
+
+		if err := swapBackend(ctx, repo, repoPath, reftablePath); err != nil {
+			return err
+		}
+
+		// Clean up while still blocked. Between removing the old refs/ directory
+		// and moving the new one into place the repository has no refs/
+		// directory, which Git's repository discovery rejects.
+		return cleanupRepo(repoPath, reftablePath)
+	}(); err != nil {
+		return fmt.Errorf("swapping backend: %w", err)
+	}
+
+	return nil
+}
diff --git a/internal/gitaly/reftable/migrator_test.go
b/internal/gitaly/reftable/migrator_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..bb0edd0dce17094c94c3748335f15ada05902b7b
--- /dev/null
+++ b/internal/gitaly/reftable/migrator_test.go
@@ -0,0 +1,220 @@
+package reftable
+
+import (
+	"context"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"gitlab.com/gitlab-org/gitaly/v18/internal/git"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/git/gittest"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/config"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/testhelper"
+	"gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg"
+)
+
+// TestMigrator runs migrate against files-backend repositories in different
+// states and checks the resulting reference backend and reference listing.
+func TestMigrator(t *testing.T) {
+	testhelper.SkipWithReftable(t, "checks migration from files-backend")
+
+	t.Parallel()
+
+	type setupData struct {
+		repo                 *localrepo.Repo
+		repoPath             string
+		block                func() error
+		unblock              func() error
+		expectedErrSubstring string
+	}
+
+	for _, tc := range []struct {
+		desc  string
+		setup func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData
+	}{
+		{
+			desc: "no packed-refs",
+			setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData {
+				repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+					SkipCreationViaService: true,
+				})
+
+				return setupData{
+					repo:                 localrepo.NewTestRepo(t, cfg, repoProto),
+					repoPath:             repoPath,
+					block:                func() error { return nil },
+					unblock:              func() error { return nil },
+					expectedErrSubstring: "packed-refs not found",
+				}
+			},
+		},
+		{
+			desc: "already on reftable",
+			setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData {
+				repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+					SkipCreationViaService: true,
+				})
+				gittest.Exec(t, cfg, "-C", repoPath, "refs", "migrate", "--ref-format=reftable")
+
+				return setupData{
+					repo:     localrepo.NewTestRepo(t, cfg, repoProto),
+					repoPath: repoPath,
+					block:    func() error { return nil },
+					unblock:  func() error { return nil },
+				}
+			},
+		},
+		{
+			desc: "with packed-refs",
+			setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData {
+				repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+					SkipCreationViaService: true,
+				})
+				gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"))
+				gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all")
+
+				return setupData{
+					repo:     localrepo.NewTestRepo(t, cfg, repoProto),
+					repoPath: repoPath,
+					block:    func() error { return nil },
+					unblock:  func() error { return nil },
+				}
+			},
+		},
+		{
+			desc: "with packed-refs and some loose refs",
+			setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData {
+				repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+					SkipCreationViaService: true,
+				})
+				c1 := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"), gittest.WithTree(
+					gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
+						{Path: "foo", Mode: "100644", Content: "foo"},
+					})),
+				)
+				gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all")
+
+				gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(c1), gittest.WithBranch("loose"),
+					gittest.WithTree(gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
+						{Path: "foo", Mode: "100644", Content: "bar"},
+					})),
+				)
+
+				return setupData{
+					repo:     localrepo.NewTestRepo(t, cfg, repoProto),
+					repoPath: repoPath,
+					block:    func() error { return nil },
+					unblock:  func() error { return nil },
+				}
+			},
+		},
+		{
+			desc: "packed-refs modified",
+			setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData {
+				repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+					SkipCreationViaService: true,
+				})
+				gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"), gittest.WithTree(
+					gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
+						{Path: "foo", Mode: "100644", Content: "foo"},
+					})),
+				)
+				gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all")
+
+				blockFn := func() error {
+					return os.Chtimes(filepath.Join(repoPath, "packed-refs"), time.Now(), time.Now())
+				}
+
+				return setupData{
+					repo:                 localrepo.NewTestRepo(t, cfg, repoProto),
+					repoPath:             repoPath,
+					block:                blockFn,
+					unblock:              func() error { return nil },
+					expectedErrSubstring: "packed-refs modified",
+				}
+			},
+		},
+		{
+			desc: "ref modified before blocking",
+			setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData {
+				repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
+					SkipCreationViaService: true,
+				})
+				c1 := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"), gittest.WithTree(
+					gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
+						{Path: "foo", Mode: "100644", Content: "foo"},
+					})),
+				)
+				c2 := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(c1), gittest.WithBranch("loose"),
+					gittest.WithTree(gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
+						{Path: "foo", Mode: "100644", Content: "bar"},
+					})),
+				)
+				gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(c2), gittest.WithBranch("loose-2"),
+					gittest.WithTree(gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
+						{Path: "foo", Mode: "100644", Content: "goo"},
+					})),
+				)
+				gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all")
+
+				blockFn := func() error {
+					gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(c2), gittest.WithBranch("loose"),
+						gittest.WithTree(gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{
+							{Path: "foo", Mode: "100644", Content: "boo"},
+						})),
+					)
+
+					return nil
+				}
+
+				return setupData{
+					repo:     localrepo.NewTestRepo(t, cfg, repoProto),
+					repoPath: repoPath,
+					block:    blockFn,
+					unblock:  func() error { return nil },
+				}
+			},
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			t.Parallel()
+
+			ctx := testhelper.Context(t)
+			cfg := testcfg.Build(t)
+			data := tc.setup(t, ctx, cfg)
+
+			before := gittest.Exec(t, cfg, "-C", data.repoPath, "refs", "list")
+			// Some tests modify the repository in the block stage. We should update
+			// the refs list accordingly.
+			if data.block != nil {
+				prev := data.block
+				data.block = func() error {
+					err := prev()
+					before = gittest.Exec(t, cfg, "-C", data.repoPath, "refs", "list")
+					return err
+				}
+			}
+
+			migrateErr := migrate(ctx, data.repo, data.block, data.unblock)
+
+			repo := localrepo.NewTestRepo(t, cfg, data.repo.Repository)
+			backend, err := repo.ReferenceBackend(ctx)
+			require.NoError(t, err)
+
+			if len(data.expectedErrSubstring) > 0 {
+				require.ErrorContains(t, migrateErr, data.expectedErrSubstring)
+				require.Equal(t, git.ReferenceBackendFiles, backend)
+			} else {
+				require.NoError(t, migrateErr)
+				require.Equal(t, git.ReferenceBackendReftables, backend)
+			}
+
+			after := gittest.Exec(t, cfg, "-C", data.repoPath, "refs", "list")
+			require.Equal(t, string(before), string(after))
+		})
+	}
+}
diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/testhelper_test.go b/internal/gitaly/reftable/testhelper_test.go
similarity index 100%
rename from internal/gitaly/storage/storagemgr/partition/migration/reftable/testhelper_test.go
rename to internal/gitaly/reftable/testhelper_test.go
diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go
deleted file mode 100644
index 480d411e422f7833a1fea164bb41ef40faa29997..0000000000000000000000000000000000000000
--- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go
+++ /dev/null
@@ -1,110 +0,0 @@
-package reftable
-
-import (
-	"context"
-	"fmt"
-
-	"gitlab.com/gitlab-org/gitaly/v18/internal/featureflag"
-	"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage"
-
"gitlab.com/gitlab-org/gitaly/v18/internal/grpc/protoregistry" - "gitlab.com/gitlab-org/gitaly/v18/internal/log" - "gitlab.com/gitlab-org/gitaly/v18/middleware" - "google.golang.org/grpc" - "google.golang.org/protobuf/proto" -) - -type migrationRegister interface { - RegisterMigration(storageName, relativePath string) - CancelMigration(storageName, relativePath string) -} - -// NewUnaryInterceptor is an oppurtunistic middleware to aid in reftable migration. -// It only registers a migration for an incoming ACCESSOR request. If any other -// type of request is received, it tries to cancel the migration if any exist -// and are ongoing. -func NewUnaryInterceptor(logger log.Logger, registry *protoregistry.Registry, register migrationRegister) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ any, returnedErr error) { - tx := storage.ExtractTransaction(ctx) - - if tx != nil && featureflag.ReftableMigration.IsEnabled(ctx) { - methodInfo, err := registry.LookupMethod(info.FullMethod) - if err != nil { - return nil, fmt.Errorf("lookup method: %w", err) - } - - targetRepo, err := methodInfo.TargetRepo(req.(proto.Message)) - if err != nil { - return nil, fmt.Errorf("extract repository: %w", err) - } - - targetRepo = tx.OriginalRepository(targetRepo) - - switch methodInfo.Operation { - case protoregistry.OpAccessor: - register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - case protoregistry.OpMutator: - defer register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - fallthrough - default: - // Cancel any ongoing migrations to avoid conflicts - // but schedule one to start after we serve the request - register.CancelMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - } - } - - return handler(ctx, req) - } -} - -// NewStreamInterceptor is an oppurtunistic middleware to aid in reftable migration. 
-// It only registers a migration for an incoming ACCESSOR request. If any other -// type of request is received, it tries to cancel the migration if any exist -// and are ongoing. -// -// For streaming RPCs we consume the first request and wrap it again before calling -// the next handler. This ensures that the next request doesn't miss the first message. -func NewStreamInterceptor(logger log.Logger, registry *protoregistry.Registry, register migrationRegister) grpc.StreamServerInterceptor { - return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (returnedErr error) { - tx := storage.ExtractTransaction(ss.Context()) - - if tx != nil && featureflag.ReftableMigration.IsEnabled(ss.Context()) { - methodInfo, err := registry.LookupMethod(info.FullMethod) - if err != nil { - return fmt.Errorf("lookup method: %w", err) - } - - req := methodInfo.NewRequest() - if err := ss.RecvMsg(req); err != nil { - // All of the repository scoped streaming RPCs send the repository in the first message. - // Generally it should be fine to error out in all cases if there is no message sent. - // To maintain compatibility with tests, we instead invoke the handler to let them return - // the asserted error messages. Once the transaction management is on by default, we should - // error out here directly and amend the failing test cases. 
- return handler(srv, middleware.NewPeekedStream(ss.Context(), nil, err, ss)) - } - - targetRepo, err := methodInfo.TargetRepo(req) - if err != nil { - return fmt.Errorf("extract repository: %w", err) - } - - targetRepo = tx.OriginalRepository(targetRepo) - - switch methodInfo.Operation { - case protoregistry.OpAccessor: - register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - case protoregistry.OpMutator: - defer register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - fallthrough - default: - // Cancel any ongoing migrations to avoid conflicts - // but schedule one to start after we serve the request - register.CancelMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - } - - return handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss)) - } - - return handler(srv, ss) - } -} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go deleted file mode 100644 index 389be226db99f246d8be46c33b9c3ca3050d0893..0000000000000000000000000000000000000000 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go +++ /dev/null @@ -1,241 +0,0 @@ -package reftable - -import ( - "context" - "errors" - "fmt" - "math" - "sync" - "time" - - "gitlab.com/gitlab-org/gitaly/v18/internal/git" - "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition/migration" - migrationid "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition/migration/id" - "gitlab.com/gitlab-org/gitaly/v18/internal/log" -) - -type migrationHandler interface { - Migrate(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error -} - -type refBackendMigrator struct { - migration.Migration -} - 
-func (r *refBackendMigrator) Migrate(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error { - return r.Fn(ctx, tx, storageName, relativePath) -} - -type migratorState struct { - completed bool - attempts uint - coolDown time.Time - cancelCtx context.CancelFunc -} - -type migrationData struct { - relativePath string - storageName string -} - -type migrator struct { - wg sync.WaitGroup - migrateCh chan migrationData - - logger log.Logger - metrics Metrics - node storage.Node - migrationHandler migrationHandler - - ctx context.Context - ctxCancel context.CancelFunc - - state sync.Map -} - -// NewMigrator provides a new reftable migrator. The migrator holds -// in-memory state regarding migrations attempted. Failed migrations -// have a exponential cooldown penalty before the next attempt. -// -// The migrator must first be initialized via the `Run()` function, -// which spawns the goroutine which listens for migrations and runs -// a single migration at a given time. -// -// The `RegisterMigration()` function can be used to register a new -// migration, however this function is non-blocking and can skip -// registering a migration if there is already one being processed. -// This makes it safe to be called multiple times in hot-paths of -// the code. -// -// Finally, `CancelMigration()` can be used to cancel an ongoing -// migration if necessary. 
-func NewMigrator(logger log.Logger, metrics Metrics, node storage.Node, localRepoFactory localrepo.Factory) *migrator { - ctx, cancel := context.WithCancel(context.Background()) - - return &migrator{ - migrateCh: make(chan migrationData), - logger: logger, - metrics: metrics, - node: node, - state: sync.Map{}, - migrationHandler: &refBackendMigrator{ - migration.NewReferenceBackendMigration(migrationid.Reftable, git.ReferenceBackendReftables, localRepoFactory, nil), - }, - ctx: ctx, - ctxCancel: cancel, - } -} - -func (m *migrator) migrate(ctx context.Context, storageName, relativePath string) (_ time.Duration, returnedErr error) { - storageHandle, err := m.node.GetStorage(storageName) - if err != nil { - return 0, fmt.Errorf("get storage: %w", err) - } - - t := time.Now() - - tx, err := storageHandle.Begin(ctx, storage.TransactionOptions{RelativePath: relativePath}) - if err != nil { - return time.Since(t), fmt.Errorf("start transaction: %w", err) - } - defer func() { - if returnedErr != nil { - if err := tx.Rollback(ctx); err != nil { - returnedErr = errors.Join(err, fmt.Errorf("rollback: %w", err)) - } - } else { - commitLSN, err := tx.Commit(ctx) - if err != nil { - returnedErr = errors.Join(err, fmt.Errorf("commit: %w", err)) - return - } - - storage.LogTransactionCommit(ctx, m.logger, commitLSN, "reftable migration") - } - }() - - if err := m.migrationHandler.Migrate(ctx, tx, storageName, relativePath); err != nil { - return time.Since(t), fmt.Errorf("run migration: %w", err) - } - - return time.Since(t), nil -} - -// Run is used to spawn the goroutine which listens to new migration -// requests. It takes a very safe approach to run a single migration -// at a given time across all repositories.. 
-func (m *migrator) Run() { - m.wg.Add(1) - go func() { - defer m.wg.Done() - - for { - select { - case <-m.ctx.Done(): - return - case data := <-m.migrateCh: - func() { - if data.relativePath == "" || data.storageName == "" { - return - } - - ctx, cancel := context.WithCancel(m.ctx) - - key := migrationKey(data.storageName, data.relativePath) - - val, ok := m.state.LoadOrStore(key, migratorState{cancelCtx: cancel}) - state := val.(migratorState) - - // If the state was present, we still need to store our - // cancellation function. - if ok { - state.cancelCtx = cancel - m.state.Store(key, state) - } - - // We don't do 'defer m.state.Store(...)' here, because that would - // fix the state as is here. We want to delay the evaluvation of the - // state - defer func() { - m.state.Store(key, state) - }() - - if state.completed || state.coolDown.After(time.Now()) { - return - } - - latency, err := m.migrate(ctx, data.storageName, data.relativePath) - // We shouldn't care about migration status for repositories which don't - // event exist. 
- if errors.Is(err, storage.ErrRepositoryNotFound) { - return - } - - state.attempts = state.attempts + 1 - state.cancelCtx = nil - - if err != nil { - m.logger.WithError(err).WithFields(log.Fields{ - "storage_name": data.storageName, - "relative_path": data.relativePath, - "migration_latency": latency, - "migration_attempts": state.attempts, - }).ErrorContext(ctx, "reftable migration failed for repository") - m.metrics.failsMetric.WithLabelValues(failMetricReason(err)).Add(1) - - // Let's delay exponentially, but with a max of 6hrs - delay := min(math.Pow(2, float64(state.attempts)), 6) - state.coolDown = time.Now().Add(time.Duration(delay) * time.Hour) - } else { - m.logger.WithFields(log.Fields{ - "storage_name": data.storageName, - "relative_path": data.relativePath, - "migration_latency": latency, - "migration_attempts": state.attempts, - }).InfoContext(ctx, "reftable migration successful for repository") - m.metrics.latencyMetric.WithLabelValues().Observe(latency.Seconds()) - - state.completed = true - } - }() - } - } - }() -} - -// Close is used to stop the migrator. -func (m *migrator) Close() { - defer m.wg.Wait() - m.ctxCancel() -} - -// RegisterMigration is used to register a new migration. This function -// is non-blocking and doesn't return an error. It attempts to register -// a migration but exits if the migrator is already processing one. -func (m *migrator) RegisterMigration(storageName, relativePath string) { - select { - case m.migrateCh <- migrationData{relativePath: relativePath, storageName: storageName}: - default: - return - } -} - -// CancelMigration cancels the ongoing migration if it matches the -// state provided. 
-func (m *migrator) CancelMigration(storageName, relativePath string) { - val, ok := m.state.Load(migrationKey(storageName, relativePath)) - - if !ok { - return - } - - if cancel := val.(migratorState).cancelCtx; cancel != nil { - cancel() - } -} - -func migrationKey(storageName, relativePath string) string { - return fmt.Sprintf("%s-%s", storageName, relativePath) -} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go deleted file mode 100644 index 4a470707c4ab034084a85ccd6735ecbb290a6a49..0000000000000000000000000000000000000000 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go +++ /dev/null @@ -1,383 +0,0 @@ -package reftable - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v18/internal/git/catfile" - "gitlab.com/gitlab-org/gitaly/v18/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue/databasemgr" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/node" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition" - "gitlab.com/gitlab-org/gitaly/v18/internal/helper" - "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" - "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" -) - -type mockMigrationHandler struct { - ch <-chan struct{} - err error -} - -func (m *mockMigrationHandler) Migrate(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) 
error { - if m.ch != nil { - <-m.ch - <-m.ch - } - - if m.err != nil { - return m.err - } - - return nil -} - -func TestMigrator_RegisterMigration(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - desc string - state migratorState - expectedData bool - }{ - { - desc: "migration already completed", - state: migratorState{ - completed: true, - }, - }, - { - desc: "migration in cooldown period", - state: migratorState{ - coolDown: time.Now().Add(1 * time.Hour), - }, - }, - { - desc: "migration with expired cooldown period", - state: migratorState{ - coolDown: time.Now().Add(-1 * time.Hour), - }, - expectedData: true, - }, - { - desc: "migration state doesn't exist", - expectedData: true, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - t.Parallel() - - parentCtx, parentCancel := context.WithCancel(context.Background()) - m := &migrator{ - migrateCh: make(chan migrationData), - state: sync.Map{}, - ctx: parentCtx, - ctxCancel: parentCancel, - } - - var wg sync.WaitGroup - defer wg.Wait() - - stopCh := make(chan struct{}) - - // We raise multiple gorountines, only one would go through. - // The others would go to the default case. 
- wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-stopCh: - return - default: - wg.Add(1) - go func() { - defer wg.Done() - m.RegisterMigration("foo", "bar") - }() - } - } - }() - - defer func() { - close(stopCh) - }() - - if tc.expectedData { - data := <-m.migrateCh - - require.Equal(t, "foo", data.storageName) - require.Equal(t, "bar", data.relativePath) - } else { - select { - case <-m.migrateCh: - t.Fatal("unexpected migration data") - default: - // Expected: channel should be empty - } - } - }) - } -} - -func TestMigrator_CancelMigration(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - desc string - data *migrationData - expectedErr error - }{ - { - desc: "current migration not-set", - data: &migrationData{}, - }, - { - desc: "wrong relative path", - data: &migrationData{ - storageName: "foo", - relativePath: "buzz", - }, - }, - { - desc: "wrong storage name", - data: &migrationData{ - storageName: "buzz", - relativePath: "bar", - }, - }, - { - desc: "success path", - data: &migrationData{ - storageName: "foo", - relativePath: "bar", - }, - expectedErr: context.Canceled, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - - parentCtx, parentCancel := context.WithCancel(context.Background()) - m := &migrator{ - state: sync.Map{}, - ctx: parentCtx, - ctxCancel: parentCancel, - } - m.state.Store(migrationKey(tc.data.storageName, tc.data.relativePath), migratorState{cancelCtx: cancel}) - - m.CancelMigration("foo", "bar") - require.ErrorIs(t, ctx.Err(), tc.expectedErr) - }) - } -} - -func TestMigrator(t *testing.T) { - t.Parallel() - - testhelper.SkipWithRaft(t, "specifically test interaction directly with the transaction manager") - - ctx := testhelper.Context(t) - cfg := testcfg.Build(t) - - logger := testhelper.NewLogger(t) - hook := testhelper.AddLoggerHook(logger) - metrics := NewMetrics() - - dbMgr, err := databasemgr.NewDBManager( - ctx, - 
cfg.Storages, - keyvalue.NewBadgerStore, - helper.NewTimerTickerFactory(time.Minute), - logger, - ) - require.NoError(t, err) - defer dbMgr.Close() - - catfileCache := catfile.NewCache(cfg) - t.Cleanup(catfileCache.Stop) - - cmdFactory := gittest.NewCommandFactory(t, cfg) - localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache) - - partitionFactoryOptions := []partition.FactoryOption{ - partition.WithCmdFactory(cmdFactory), - partition.WithRepoFactory(localRepoFactory), - partition.WithMetrics(partition.NewMetrics(nil)), - partition.WithRaftConfig(cfg.Raft), - } - - partitionFactory := partition.NewFactory(partitionFactoryOptions...) - - ptnMgr, err := node.NewManager(cfg.Storages, storagemgr.NewFactory( - logger, dbMgr, partitionFactory, config.DefaultMaxInactivePartitions, storagemgr.NewMetrics(cfg.Prometheus), - )) - require.NoError(t, err) - defer ptnMgr.Close() - - type setupData struct { - run func(m *migrator, repo *gitalypb.Repository) - migrationHandler migrationHandler - repo *gitalypb.Repository - startState *migratorState - } - - for _, tc := range []struct { - desc string - setup func() setupData - completed bool - attempts uint - expectedLogMsg string - }{ - { - desc: "cancelled migration", - setup: func() setupData { - ch := make(chan struct{}) - - repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - - return setupData{ - run: func(m *migrator, repo *gitalypb.Repository) { - ch <- struct{}{} - m.CancelMigration(cfg.Storages[0].Name, repo.GetRelativePath()) - ch <- struct{}{} - }, - migrationHandler: &mockMigrationHandler{ch: ch}, - repo: repo, - } - }, - completed: false, - attempts: 1, - expectedLogMsg: "reftable migration failed for repository", - }, - { - desc: "existing state, cancelled migration", - setup: func() setupData { - ch := make(chan struct{}) - - repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ 
- SkipCreationViaService: true, - }) - - return setupData{ - run: func(m *migrator, repo *gitalypb.Repository) { - ch <- struct{}{} - m.CancelMigration(cfg.Storages[0].Name, repo.GetRelativePath()) - ch <- struct{}{} - }, - migrationHandler: &mockMigrationHandler{ch: ch}, - repo: repo, - startState: &migratorState{attempts: 3}, - } - }, - completed: false, - attempts: 4, - expectedLogMsg: "reftable migration failed for repository", - }, - { - desc: "repository not found error", - setup: func() setupData { - repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - - return setupData{ - run: func(m *migrator, repo *gitalypb.Repository) {}, - migrationHandler: &mockMigrationHandler{err: storage.ErrRepositoryNotFound}, - repo: repo, - } - }, - // When we encounter a ErrRepositoryNotFound error, we simply - // skip the migration and don't mark it as completed or attempted. - completed: false, - attempts: 0, - }, - { - desc: "successful migration", - setup: func() setupData { - repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - - return setupData{ - run: func(m *migrator, repo *gitalypb.Repository) {}, - migrationHandler: &mockMigrationHandler{}, - repo: repo, - } - }, - completed: true, - attempts: 1, - expectedLogMsg: "reftable migration successful for repository", - }, - } { - t.Run(tc.desc, func(t *testing.T) { - data := tc.setup() - - parentCtx, parentCancel := context.WithCancel(context.Background()) - m := &migrator{ - wg: sync.WaitGroup{}, - migrateCh: make(chan migrationData), - logger: logger, - metrics: metrics, - node: ptnMgr, - state: sync.Map{}, - migrationHandler: data.migrationHandler, - ctx: parentCtx, - ctxCancel: parentCancel, - } - - storageName := cfg.Storages[0].Name - if data.startState != nil { - m.state.Store(migrationKey(storageName, data.repo.GetRelativePath()), *data.startState) - } - - m.Run() - defer m.Close() - 
- // It is not guaranteed that the migration is registered, so run it in a - // loop until it is. - for { - if val, ok := m.state.Load(migrationKey(storageName, data.repo.GetRelativePath())); ok { - if val.(migratorState).cancelCtx != nil { - break - } - } - - m.RegisterMigration(storageName, data.repo.GetRelativePath()) - } - - data.run(m, data.repo) - - // Block till the old migration is complete. - m.migrateCh <- migrationData{} - - val, ok := m.state.Load(migrationKey(storageName, data.repo.GetRelativePath())) - state := val.(migratorState) - - require.True(t, ok) - require.Equal(t, tc.completed, state.completed) - require.Equal(t, tc.attempts, state.attempts) - - if tc.expectedLogMsg != "" { - entries := hook.AllEntries() - entry := entries[len(entries)-1] - require.Equal(t, tc.expectedLogMsg, entry.Message) - require.Greater(t, entry.Data["migration_latency"].(time.Duration), time.Duration(0)) - require.Greater(t, entry.Data["migration_attempts"].(uint), uint(0)) - } - }) - } -} diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index b995c1c68c938405160c9b747abe257559d3972a..6efb02f96a46ed4e2be1665702420138a61a291c 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -176,6 +176,10 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d gsd = opt(gsd) } + if gsd.ctx != nil { + ctx = gsd.ctx + } + // We set up the structerr interceptors so that any error metadata that gets set via // `structerr.WithMetadata()` is not only logged, but also present in the error details. 
serverOpts := []server.Option{ @@ -188,6 +192,16 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d deps := gsd.createDependencies(tb, ctx, cfg) tb.Cleanup(func() { testhelper.MustClose(tb, gsd.conns) }) + if gsd.interceptorsFn != nil { + unary, stream := gsd.interceptorsFn(deps.GetLogger(), deps.GetRepositoryFactory()) + for _, v := range unary { + serverOpts = append(serverOpts, server.WithUnaryInterceptor(v)) + } + for _, v := range stream { + serverOpts = append(serverOpts, server.WithStreamInterceptor(v)) + } + } + var txMiddleware server.TransactionMiddleware if deps.GetNode() != nil { unaryInterceptors := []grpc.UnaryServerInterceptor{ @@ -201,12 +215,6 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d ), } - if gsd.transactionInterceptorsFn != nil { - unary, stream := gsd.transactionInterceptorsFn(deps.GetLogger(), deps.GetNode(), deps.GetRepositoryFactory()) - unaryInterceptors = append(unaryInterceptors, unary...) - streamInterceptors = append(streamInterceptors, stream...) 
- } - txMiddleware = server.TransactionMiddleware{ UnaryInterceptors: unaryInterceptors, StreamInterceptors: streamInterceptors, @@ -290,36 +298,37 @@ func registerHealthServerIfNotRegistered(srv *grpc.Server) { } type gitalyServerDeps struct { - disablePraefect bool - logger log.Logger - conns *client.Pool - locator storage.Locator - txMgr transaction.Manager - hookMgr hook.Manager - gitlabClient gitlab.Client - gitCmdFactory gitcmd.CommandFactory - backchannelReg *backchannel.Registry - catfileCache catfile.Cache - diskCache cache.Cache - packObjectsCache streamcache.Cache - packObjectsLimiter limiter.Limiter - limitHandler *limithandler.LimiterMiddleware - repositoryCounter *counter.RepositoryCounter - updaterWithHooks *updateref.UpdaterWithHooks - housekeepingManager housekeepingmgr.Manager - backupSink *backup.Sink - backupLocator backup.Locator - signingKey string - transactionRegistry *storagemgr.TransactionRegistry - procReceiveRegistry *hook.ProcReceiveRegistry - bundleURIManager *bundleuri.GenerationManager - bundleURISink *bundleuri.Sink - bundleURIStrategy bundleuri.GenerationStrategy - localRepoFactory localrepo.Factory - migrations *[]migration.Migration - archiveCache streamcache.Cache - MigrationStateManager migration.StateManager - transactionInterceptorsFn func(log.Logger, storage.Node, localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) + ctx context.Context + disablePraefect bool + logger log.Logger + conns *client.Pool + locator storage.Locator + txMgr transaction.Manager + hookMgr hook.Manager + gitlabClient gitlab.Client + gitCmdFactory gitcmd.CommandFactory + backchannelReg *backchannel.Registry + catfileCache catfile.Cache + diskCache cache.Cache + packObjectsCache streamcache.Cache + packObjectsLimiter limiter.Limiter + limitHandler *limithandler.LimiterMiddleware + repositoryCounter *counter.RepositoryCounter + updaterWithHooks *updateref.UpdaterWithHooks + housekeepingManager housekeepingmgr.Manager + 
backupSink *backup.Sink + backupLocator backup.Locator + signingKey string + transactionRegistry *storagemgr.TransactionRegistry + procReceiveRegistry *hook.ProcReceiveRegistry + bundleURIManager *bundleuri.GenerationManager + bundleURISink *bundleuri.Sink + bundleURIStrategy bundleuri.GenerationStrategy + localRepoFactory localrepo.Factory + migrations *[]migration.Migration + archiveCache streamcache.Cache + MigrationStateManager migration.StateManager + interceptorsFn func(log.Logger, localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) } func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Context, cfg config.Cfg) *service.Dependencies { @@ -723,13 +732,13 @@ func WithRepositoryFactory(repoFactory localrepo.Factory) GitalyServerOpt { } } -// WithTransactionInterceptors allows for setting additional transaction middlewares to the server via +// WithInterceptors allows for setting additional transaction middlewares to the server via // a callback function. -func WithTransactionInterceptors( - fn func(log.Logger, storage.Node, localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor), +func WithInterceptors( + fn func(log.Logger, localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor), ) GitalyServerOpt { return func(deps gitalyServerDeps) gitalyServerDeps { - deps.transactionInterceptorsFn = fn + deps.interceptorsFn = fn return deps } } @@ -741,3 +750,12 @@ func WithMigrations(migrations *[]migration.Migration) GitalyServerOpt { return deps } } + +// WithContext allows using a custom context. This is for adding server code which +// rely on feature sets. +func WithContext(ctx context.Context) GitalyServerOpt { + return func(deps gitalyServerDeps) gitalyServerDeps { + deps.ctx = ctx + return deps + } +}