From b242348d10c9ebdd4f024920acc739394c2b21cf Mon Sep 17 00:00:00 2001 From: Karthik Nayak Date: Tue, 18 Nov 2025 15:06:05 +0100 Subject: [PATCH 1/7] reftable: Remove the old WAL based migration code The reftable migration code currently used depends on the WAL. To unblock the two rollouts, we want to add a non-WAL migrator. Let's reduce clutter by removing this old code. --- internal/cli/gitaly/serve.go | 10 +- .../partition/migration/reftable/metrics.go | 56 --- .../migration/reftable/middleware.go | 110 ----- .../migration/reftable/middleware_test.go | 239 ----------- .../partition/migration/reftable/migrator.go | 241 ----------- .../migration/reftable/migrator_test.go | 383 ------------------ .../migration/reftable/testhelper_test.go | 11 - 7 files changed, 1 insertion(+), 1049 deletions(-) delete mode 100644 internal/gitaly/storage/storagemgr/partition/migration/reftable/metrics.go delete mode 100644 internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go delete mode 100644 internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go delete mode 100644 internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go delete mode 100644 internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go delete mode 100644 internal/gitaly/storage/storagemgr/partition/migration/reftable/testhelper_test.go diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 0531be28fa..51d33b5356 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -41,7 +41,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition" "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition/migration" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition/migration/reftable" 
"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/transaction" "gitlab.com/gitlab-org/gitaly/v18/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/backchannel" @@ -370,8 +369,7 @@ func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error { raftMetrics := raftmgr.NewMetrics() partitionMetrics := partition.NewMetrics(housekeepingMetrics) migrationMetrics := migration.NewMetrics() - reftableMigratorMetrics := reftable.NewMetrics() - prometheus.MustRegister(housekeepingMetrics, storageMetrics, partitionMetrics, migrationMetrics, raftMetrics, reftableMigratorMetrics) + prometheus.MustRegister(housekeepingMetrics, storageMetrics, partitionMetrics, migrationMetrics, raftMetrics) migrations := []migration.Migration{ migration.NewLeftoverFileMigration(locator), @@ -501,18 +499,12 @@ func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error { return err } - reftableMigrator := reftable.NewMigrator(logger, reftableMigratorMetrics, node, localrepoFactory) - reftableMigrator.Run() - defer reftableMigrator.Close() - txMiddleware = server.TransactionMiddleware{ UnaryInterceptors: []grpc.UnaryServerInterceptor{ storagemgr.NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, txRegistry, node, locator), - reftable.NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator), }, StreamInterceptors: []grpc.StreamServerInterceptor{ storagemgr.NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, txRegistry, node, locator), - reftable.NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator), }, } } else { diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/metrics.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/metrics.go deleted file mode 100644 index 50acb85e8a..0000000000 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/metrics.go +++ /dev/null @@ -1,56 +0,0 @@ -package reftable - -import ( - 
"context" - "errors" - - "github.com/prometheus/client_golang/prometheus" -) - -// Metrics contains the metrics collected across reftable migrations. -type Metrics struct { - // latencyMetric is a metric to capture latency of the reftable migration. - // This is only logged for successful migrations, so the count would also - // provide the number of successful migrations. - latencyMetric *prometheus.HistogramVec - // failsMetric is a metric to capture the number of migration failures. - failsMetric *prometheus.CounterVec -} - -func failMetricReason(err error) string { - if errors.Is(err, context.Canceled) { - return "context_cancelled" - } - return "migration_error" -} - -// NewMetrics returns a new Metrics instance. -func NewMetrics() Metrics { - return Metrics{ - latencyMetric: prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "gitaly_reftable_migration_latency_seconds", - Help: "Latency of a successful repository migration", - }, - []string{}, - ), - failsMetric: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gitaly_reftable_migration_failure", - Help: "Counter of the total number of migration failures", - }, - []string{"reason"}, - ), - } -} - -// Describe implements prometheus.Collector. -func (m Metrics) Describe(descs chan<- *prometheus.Desc) { - prometheus.DescribeByCollect(m, descs) -} - -// Collect implements prometheus.Collector. 
-func (m Metrics) Collect(metrics chan<- prometheus.Metric) { - m.latencyMetric.Collect(metrics) - m.failsMetric.Collect(metrics) -} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go deleted file mode 100644 index 480d411e42..0000000000 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go +++ /dev/null @@ -1,110 +0,0 @@ -package reftable - -import ( - "context" - "fmt" - - "gitlab.com/gitlab-org/gitaly/v18/internal/featureflag" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/protoregistry" - "gitlab.com/gitlab-org/gitaly/v18/internal/log" - "gitlab.com/gitlab-org/gitaly/v18/middleware" - "google.golang.org/grpc" - "google.golang.org/protobuf/proto" -) - -type migrationRegister interface { - RegisterMigration(storageName, relativePath string) - CancelMigration(storageName, relativePath string) -} - -// NewUnaryInterceptor is an oppurtunistic middleware to aid in reftable migration. -// It only registers a migration for an incoming ACCESSOR request. If any other -// type of request is received, it tries to cancel the migration if any exist -// and are ongoing. 
-func NewUnaryInterceptor(logger log.Logger, registry *protoregistry.Registry, register migrationRegister) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ any, returnedErr error) { - tx := storage.ExtractTransaction(ctx) - - if tx != nil && featureflag.ReftableMigration.IsEnabled(ctx) { - methodInfo, err := registry.LookupMethod(info.FullMethod) - if err != nil { - return nil, fmt.Errorf("lookup method: %w", err) - } - - targetRepo, err := methodInfo.TargetRepo(req.(proto.Message)) - if err != nil { - return nil, fmt.Errorf("extract repository: %w", err) - } - - targetRepo = tx.OriginalRepository(targetRepo) - - switch methodInfo.Operation { - case protoregistry.OpAccessor: - register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - case protoregistry.OpMutator: - defer register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - fallthrough - default: - // Cancel any ongoing migrations to avoid conflicts - // but schedule one to start after we serve the request - register.CancelMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - } - } - - return handler(ctx, req) - } -} - -// NewStreamInterceptor is an oppurtunistic middleware to aid in reftable migration. -// It only registers a migration for an incoming ACCESSOR request. If any other -// type of request is received, it tries to cancel the migration if any exist -// and are ongoing. -// -// For streaming RPCs we consume the first request and wrap it again before calling -// the next handler. This ensures that the next request doesn't miss the first message. 
-func NewStreamInterceptor(logger log.Logger, registry *protoregistry.Registry, register migrationRegister) grpc.StreamServerInterceptor { - return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (returnedErr error) { - tx := storage.ExtractTransaction(ss.Context()) - - if tx != nil && featureflag.ReftableMigration.IsEnabled(ss.Context()) { - methodInfo, err := registry.LookupMethod(info.FullMethod) - if err != nil { - return fmt.Errorf("lookup method: %w", err) - } - - req := methodInfo.NewRequest() - if err := ss.RecvMsg(req); err != nil { - // All of the repository scoped streaming RPCs send the repository in the first message. - // Generally it should be fine to error out in all cases if there is no message sent. - // To maintain compatibility with tests, we instead invoke the handler to let them return - // the asserted error messages. Once the transaction management is on by default, we should - // error out here directly and amend the failing test cases. 
- return handler(srv, middleware.NewPeekedStream(ss.Context(), nil, err, ss)) - } - - targetRepo, err := methodInfo.TargetRepo(req) - if err != nil { - return fmt.Errorf("extract repository: %w", err) - } - - targetRepo = tx.OriginalRepository(targetRepo) - - switch methodInfo.Operation { - case protoregistry.OpAccessor: - register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - case protoregistry.OpMutator: - defer register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - fallthrough - default: - // Cancel any ongoing migrations to avoid conflicts - // but schedule one to start after we serve the request - register.CancelMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - } - - return handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss)) - } - - return handler(srv, ss) - } -} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go deleted file mode 100644 index e552515e30..0000000000 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go +++ /dev/null @@ -1,239 +0,0 @@ -package reftable - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v18/internal/featureflag" - "gitlab.com/gitlab-org/gitaly/v18/internal/git" - "gitlab.com/gitlab-org/gitaly/v18/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service/commit" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service/hook" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service/ref" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service/repository" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" - 
"gitlab.com/gitlab-org/gitaly/v18/internal/grpc/client" - "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/protoregistry" - "gitlab.com/gitlab-org/gitaly/v18/internal/log" - "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" - "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testserver" - "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" - "google.golang.org/grpc" -) - -type mockReftableMigrator struct { - registerCount int - cancelCount int - reftableMigrator *migrator -} - -func (m *mockReftableMigrator) RegisterMigration(storageName, relativePath string) { - m.registerCount++ - m.reftableMigrator.RegisterMigration(storageName, relativePath) -} - -func (m *mockReftableMigrator) CancelMigration(storageName, relativePath string) { - m.cancelCount++ - m.reftableMigrator.CancelMigration(storageName, relativePath) -} - -func TestInterceptor(t *testing.T) { - t.Parallel() - - testhelper.NewFeatureSets( - featureflag.ReftableMigration, - ).Run(t, testInterceptor) -} - -func testInterceptor(t *testing.T, ctx context.Context) { - cfg := testcfg.Build(t) - - if !testhelper.IsWALEnabled() { - t.Skip("only works with the WAL") - } - - // Ideally we should use an RPC routed via praefect to avoid the race. 
- if testhelper.IsPraefectEnabled() { - t.Skip("usage of gittest.WriteCommit causes a race condition.") - } - - mockMigrator := mockReftableMigrator{} - var reftableMigrator *migrator - callback := func(logger log.Logger, node storage.Node, factory localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) { - reftableMigrator = NewMigrator(logger, NewMetrics(), node, factory) - reftableMigrator.Run() - - mockMigrator.reftableMigrator = reftableMigrator - - return []grpc.UnaryServerInterceptor{NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, &mockMigrator)}, - []grpc.StreamServerInterceptor{NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, &mockMigrator)} - } - - serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { - gitalypb.RegisterCommitServiceServer(srv, commit.NewServer(deps)) - gitalypb.RegisterRefServiceServer(srv, ref.NewServer(deps)) - gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(deps)) - gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps)) - }, testserver.WithTransactionInterceptors(callback)) - cfg.SocketPath = serverSocketPath - - conn, err := client.New(ctx, serverSocketPath) - require.NoError(t, err) - t.Cleanup(func() { conn.Close() }) - - for _, tc := range []struct { - desc string - setup func(repoProto *gitalypb.Repository, commitID git.ObjectID) - expectedRegistrations int - expectedCancellations int - }{ - { - desc: "unary accessor", - setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { - request := &gitalypb.FindCommitRequest{ - Repository: repoProto, - Revision: []byte("main"), - } - - client := gitalypb.NewCommitServiceClient(conn) - - md := testcfg.GitalyServersMetadataFromCfg(t, cfg) - ctx = testhelper.MergeOutgoingMetadata(ctx, md) - - _, err := client.FindCommit(ctx, request) - require.NoError(t, err) - }, - expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, 
featureflag.ReftableMigration, 4, 0), - expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), - }, - { - desc: "unary mutator", - setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { - client := gitalypb.NewRefServiceClient(conn) - - md := testcfg.GitalyServersMetadataFromCfg(t, cfg) - ctx = testhelper.MergeOutgoingMetadata(ctx, md) - - _, err := client.DeleteRefs(ctx, &gitalypb.DeleteRefsRequest{ - Repository: repoProto, - Refs: [][]byte{ - []byte("refs/heads/main"), - }, - }) - require.NoError(t, err) - }, - expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), - expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 3, 0), - }, - { - desc: "stream accessor", - setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { - client := gitalypb.NewRefServiceClient(conn) - - md := testcfg.GitalyServersMetadataFromCfg(t, cfg) - ctx = testhelper.MergeOutgoingMetadata(ctx, md) - - stream, err := client.FindAllBranches(ctx, &gitalypb.FindAllBranchesRequest{ - Repository: repoProto, - }) - require.NoError(t, err) - - _, err = testhelper.ReceiveAndFold(stream.Recv, func( - result []*gitalypb.FindAllBranchesResponse_Branch, - response *gitalypb.FindAllBranchesResponse, - ) []*gitalypb.FindAllBranchesResponse_Branch { - if response == nil { - return result - } - - return append(result, response.GetBranches()...) 
- }) - require.NoError(t, err) - }, - expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), - expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), - }, - { - desc: "stream mutator", - setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { - client := gitalypb.NewRefServiceClient(conn) - - md := testcfg.GitalyServersMetadataFromCfg(t, cfg) - ctx = testhelper.MergeOutgoingMetadata(ctx, md) - - updater, err := client.UpdateReferences(ctx) - require.NoError(t, err) - - err = updater.Send(&gitalypb.UpdateReferencesRequest{ - Repository: repoProto, - Updates: []*gitalypb.UpdateReferencesRequest_Update{ - { - Reference: []byte("refs/heads/fun"), - OldObjectId: []byte(gittest.DefaultObjectHash.ZeroOID), - NewObjectId: []byte(commitID), - }, - }, - }) - require.NoError(t, err) - - _, err = updater.CloseAndRecv() - require.NoError(t, err) - }, - expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), - expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 3, 0), - }, - } { - t.Run(tc.desc, func(t *testing.T) { - // Reset the mock migrator count for each test - mockMigrator.registerCount, mockMigrator.cancelCount = 0, 0 - - // To avoid migration being triggered by CreateRepository, we trick into - // believing that the repository has already completed a migraiton. 
- relativePath := gittest.NewRepositoryName(t) - key := migrationKey(cfg.Storages[0].Name, relativePath) - reftableMigrator.state.Store(key, migratorState{completed: true}) - - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - RelativePath: relativePath, - }) - - reftableMigrator.state.Delete(key) - - commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), - gittest.WithTreeEntries( - gittest.TreeEntry{Mode: "100644", Path: "foo", Content: "bar"}, - ), - ) - - tc.setup(repoProto, commitID) - - // Block to ensure the previous migration was successful. - reftableMigrator.migrateCh <- migrationData{} - - repoInfo, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ - Repository: repoProto, - }) - require.NoError(t, err) - - require.Equal(t, - testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, - gittest.FilesOrReftables( - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, - ), - ), - repoInfo.GetReferences().GetReferenceBackend(), - ) - - require.Equal(t, tc.expectedRegistrations, mockMigrator.registerCount) - require.Equal(t, tc.expectedCancellations, mockMigrator.cancelCount) - }) - } - - reftableMigrator.Close() -} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go deleted file mode 100644 index 389be226db..0000000000 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go +++ /dev/null @@ -1,241 +0,0 @@ -package reftable - -import ( - "context" - "errors" - "fmt" - "math" - "sync" - "time" - - "gitlab.com/gitlab-org/gitaly/v18/internal/git" - "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" - 
"gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition/migration" - migrationid "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition/migration/id" - "gitlab.com/gitlab-org/gitaly/v18/internal/log" -) - -type migrationHandler interface { - Migrate(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error -} - -type refBackendMigrator struct { - migration.Migration -} - -func (r *refBackendMigrator) Migrate(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error { - return r.Fn(ctx, tx, storageName, relativePath) -} - -type migratorState struct { - completed bool - attempts uint - coolDown time.Time - cancelCtx context.CancelFunc -} - -type migrationData struct { - relativePath string - storageName string -} - -type migrator struct { - wg sync.WaitGroup - migrateCh chan migrationData - - logger log.Logger - metrics Metrics - node storage.Node - migrationHandler migrationHandler - - ctx context.Context - ctxCancel context.CancelFunc - - state sync.Map -} - -// NewMigrator provides a new reftable migrator. The migrator holds -// in-memory state regarding migrations attempted. Failed migrations -// have a exponential cooldown penalty before the next attempt. -// -// The migrator must first be initialized via the `Run()` function, -// which spawns the goroutine which listens for migrations and runs -// a single migration at a given time. -// -// The `RegisterMigration()` function can be used to register a new -// migration, however this function is non-blocking and can skip -// registering a migration if there is already one being processed. -// This makes it safe to be called multiple times in hot-paths of -// the code. -// -// Finally, `CancelMigration()` can be used to cancel an ongoing -// migration if necessary. 
-func NewMigrator(logger log.Logger, metrics Metrics, node storage.Node, localRepoFactory localrepo.Factory) *migrator { - ctx, cancel := context.WithCancel(context.Background()) - - return &migrator{ - migrateCh: make(chan migrationData), - logger: logger, - metrics: metrics, - node: node, - state: sync.Map{}, - migrationHandler: &refBackendMigrator{ - migration.NewReferenceBackendMigration(migrationid.Reftable, git.ReferenceBackendReftables, localRepoFactory, nil), - }, - ctx: ctx, - ctxCancel: cancel, - } -} - -func (m *migrator) migrate(ctx context.Context, storageName, relativePath string) (_ time.Duration, returnedErr error) { - storageHandle, err := m.node.GetStorage(storageName) - if err != nil { - return 0, fmt.Errorf("get storage: %w", err) - } - - t := time.Now() - - tx, err := storageHandle.Begin(ctx, storage.TransactionOptions{RelativePath: relativePath}) - if err != nil { - return time.Since(t), fmt.Errorf("start transaction: %w", err) - } - defer func() { - if returnedErr != nil { - if err := tx.Rollback(ctx); err != nil { - returnedErr = errors.Join(err, fmt.Errorf("rollback: %w", err)) - } - } else { - commitLSN, err := tx.Commit(ctx) - if err != nil { - returnedErr = errors.Join(err, fmt.Errorf("commit: %w", err)) - return - } - - storage.LogTransactionCommit(ctx, m.logger, commitLSN, "reftable migration") - } - }() - - if err := m.migrationHandler.Migrate(ctx, tx, storageName, relativePath); err != nil { - return time.Since(t), fmt.Errorf("run migration: %w", err) - } - - return time.Since(t), nil -} - -// Run is used to spawn the goroutine which listens to new migration -// requests. It takes a very safe approach to run a single migration -// at a given time across all repositories.. 
-func (m *migrator) Run() { - m.wg.Add(1) - go func() { - defer m.wg.Done() - - for { - select { - case <-m.ctx.Done(): - return - case data := <-m.migrateCh: - func() { - if data.relativePath == "" || data.storageName == "" { - return - } - - ctx, cancel := context.WithCancel(m.ctx) - - key := migrationKey(data.storageName, data.relativePath) - - val, ok := m.state.LoadOrStore(key, migratorState{cancelCtx: cancel}) - state := val.(migratorState) - - // If the state was present, we still need to store our - // cancellation function. - if ok { - state.cancelCtx = cancel - m.state.Store(key, state) - } - - // We don't do 'defer m.state.Store(...)' here, because that would - // fix the state as is here. We want to delay the evaluvation of the - // state - defer func() { - m.state.Store(key, state) - }() - - if state.completed || state.coolDown.After(time.Now()) { - return - } - - latency, err := m.migrate(ctx, data.storageName, data.relativePath) - // We shouldn't care about migration status for repositories which don't - // event exist. 
- if errors.Is(err, storage.ErrRepositoryNotFound) { - return - } - - state.attempts = state.attempts + 1 - state.cancelCtx = nil - - if err != nil { - m.logger.WithError(err).WithFields(log.Fields{ - "storage_name": data.storageName, - "relative_path": data.relativePath, - "migration_latency": latency, - "migration_attempts": state.attempts, - }).ErrorContext(ctx, "reftable migration failed for repository") - m.metrics.failsMetric.WithLabelValues(failMetricReason(err)).Add(1) - - // Let's delay exponentially, but with a max of 6hrs - delay := min(math.Pow(2, float64(state.attempts)), 6) - state.coolDown = time.Now().Add(time.Duration(delay) * time.Hour) - } else { - m.logger.WithFields(log.Fields{ - "storage_name": data.storageName, - "relative_path": data.relativePath, - "migration_latency": latency, - "migration_attempts": state.attempts, - }).InfoContext(ctx, "reftable migration successful for repository") - m.metrics.latencyMetric.WithLabelValues().Observe(latency.Seconds()) - - state.completed = true - } - }() - } - } - }() -} - -// Close is used to stop the migrator. -func (m *migrator) Close() { - defer m.wg.Wait() - m.ctxCancel() -} - -// RegisterMigration is used to register a new migration. This function -// is non-blocking and doesn't return an error. It attempts to register -// a migration but exits if the migrator is already processing one. -func (m *migrator) RegisterMigration(storageName, relativePath string) { - select { - case m.migrateCh <- migrationData{relativePath: relativePath, storageName: storageName}: - default: - return - } -} - -// CancelMigration cancels the ongoing migration if it matches the -// state provided. 
-func (m *migrator) CancelMigration(storageName, relativePath string) { - val, ok := m.state.Load(migrationKey(storageName, relativePath)) - - if !ok { - return - } - - if cancel := val.(migratorState).cancelCtx; cancel != nil { - cancel() - } -} - -func migrationKey(storageName, relativePath string) string { - return fmt.Sprintf("%s-%s", storageName, relativePath) -} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go deleted file mode 100644 index 4a470707c4..0000000000 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go +++ /dev/null @@ -1,383 +0,0 @@ -package reftable - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" - "gitlab.com/gitlab-org/gitaly/v18/internal/git/catfile" - "gitlab.com/gitlab-org/gitaly/v18/internal/git/gittest" - "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/config" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/keyvalue/databasemgr" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/node" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr" - "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage/storagemgr/partition" - "gitlab.com/gitlab-org/gitaly/v18/internal/helper" - "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" - "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" - "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" -) - -type mockMigrationHandler struct { - ch <-chan struct{} - err error -} - -func (m *mockMigrationHandler) Migrate(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error { - if m.ch != nil { - <-m.ch - <-m.ch - } - - if 
m.err != nil { - return m.err - } - - return nil -} - -func TestMigrator_RegisterMigration(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - desc string - state migratorState - expectedData bool - }{ - { - desc: "migration already completed", - state: migratorState{ - completed: true, - }, - }, - { - desc: "migration in cooldown period", - state: migratorState{ - coolDown: time.Now().Add(1 * time.Hour), - }, - }, - { - desc: "migration with expired cooldown period", - state: migratorState{ - coolDown: time.Now().Add(-1 * time.Hour), - }, - expectedData: true, - }, - { - desc: "migration state doesn't exist", - expectedData: true, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - t.Parallel() - - parentCtx, parentCancel := context.WithCancel(context.Background()) - m := &migrator{ - migrateCh: make(chan migrationData), - state: sync.Map{}, - ctx: parentCtx, - ctxCancel: parentCancel, - } - - var wg sync.WaitGroup - defer wg.Wait() - - stopCh := make(chan struct{}) - - // We raise multiple gorountines, only one would go through. - // The others would go to the default case. 
- wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-stopCh: - return - default: - wg.Add(1) - go func() { - defer wg.Done() - m.RegisterMigration("foo", "bar") - }() - } - } - }() - - defer func() { - close(stopCh) - }() - - if tc.expectedData { - data := <-m.migrateCh - - require.Equal(t, "foo", data.storageName) - require.Equal(t, "bar", data.relativePath) - } else { - select { - case <-m.migrateCh: - t.Fatal("unexpected migration data") - default: - // Expected: channel should be empty - } - } - }) - } -} - -func TestMigrator_CancelMigration(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - desc string - data *migrationData - expectedErr error - }{ - { - desc: "current migration not-set", - data: &migrationData{}, - }, - { - desc: "wrong relative path", - data: &migrationData{ - storageName: "foo", - relativePath: "buzz", - }, - }, - { - desc: "wrong storage name", - data: &migrationData{ - storageName: "buzz", - relativePath: "bar", - }, - }, - { - desc: "success path", - data: &migrationData{ - storageName: "foo", - relativePath: "bar", - }, - expectedErr: context.Canceled, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - t.Parallel() - - ctx, cancel := context.WithCancel(context.Background()) - - parentCtx, parentCancel := context.WithCancel(context.Background()) - m := &migrator{ - state: sync.Map{}, - ctx: parentCtx, - ctxCancel: parentCancel, - } - m.state.Store(migrationKey(tc.data.storageName, tc.data.relativePath), migratorState{cancelCtx: cancel}) - - m.CancelMigration("foo", "bar") - require.ErrorIs(t, ctx.Err(), tc.expectedErr) - }) - } -} - -func TestMigrator(t *testing.T) { - t.Parallel() - - testhelper.SkipWithRaft(t, "specifically test interaction directly with the transaction manager") - - ctx := testhelper.Context(t) - cfg := testcfg.Build(t) - - logger := testhelper.NewLogger(t) - hook := testhelper.AddLoggerHook(logger) - metrics := NewMetrics() - - dbMgr, err := databasemgr.NewDBManager( - ctx, - 
cfg.Storages, - keyvalue.NewBadgerStore, - helper.NewTimerTickerFactory(time.Minute), - logger, - ) - require.NoError(t, err) - defer dbMgr.Close() - - catfileCache := catfile.NewCache(cfg) - t.Cleanup(catfileCache.Stop) - - cmdFactory := gittest.NewCommandFactory(t, cfg) - localRepoFactory := localrepo.NewFactory(logger, config.NewLocator(cfg), cmdFactory, catfileCache) - - partitionFactoryOptions := []partition.FactoryOption{ - partition.WithCmdFactory(cmdFactory), - partition.WithRepoFactory(localRepoFactory), - partition.WithMetrics(partition.NewMetrics(nil)), - partition.WithRaftConfig(cfg.Raft), - } - - partitionFactory := partition.NewFactory(partitionFactoryOptions...) - - ptnMgr, err := node.NewManager(cfg.Storages, storagemgr.NewFactory( - logger, dbMgr, partitionFactory, config.DefaultMaxInactivePartitions, storagemgr.NewMetrics(cfg.Prometheus), - )) - require.NoError(t, err) - defer ptnMgr.Close() - - type setupData struct { - run func(m *migrator, repo *gitalypb.Repository) - migrationHandler migrationHandler - repo *gitalypb.Repository - startState *migratorState - } - - for _, tc := range []struct { - desc string - setup func() setupData - completed bool - attempts uint - expectedLogMsg string - }{ - { - desc: "cancelled migration", - setup: func() setupData { - ch := make(chan struct{}) - - repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - - return setupData{ - run: func(m *migrator, repo *gitalypb.Repository) { - ch <- struct{}{} - m.CancelMigration(cfg.Storages[0].Name, repo.GetRelativePath()) - ch <- struct{}{} - }, - migrationHandler: &mockMigrationHandler{ch: ch}, - repo: repo, - } - }, - completed: false, - attempts: 1, - expectedLogMsg: "reftable migration failed for repository", - }, - { - desc: "existing state, cancelled migration", - setup: func() setupData { - ch := make(chan struct{}) - - repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ 
- SkipCreationViaService: true, - }) - - return setupData{ - run: func(m *migrator, repo *gitalypb.Repository) { - ch <- struct{}{} - m.CancelMigration(cfg.Storages[0].Name, repo.GetRelativePath()) - ch <- struct{}{} - }, - migrationHandler: &mockMigrationHandler{ch: ch}, - repo: repo, - startState: &migratorState{attempts: 3}, - } - }, - completed: false, - attempts: 4, - expectedLogMsg: "reftable migration failed for repository", - }, - { - desc: "repository not found error", - setup: func() setupData { - repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - - return setupData{ - run: func(m *migrator, repo *gitalypb.Repository) {}, - migrationHandler: &mockMigrationHandler{err: storage.ErrRepositoryNotFound}, - repo: repo, - } - }, - // When we encounter a ErrRepositoryNotFound error, we simply - // skip the migration and don't mark it as completed or attempted. - completed: false, - attempts: 0, - }, - { - desc: "successful migration", - setup: func() setupData { - repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - - return setupData{ - run: func(m *migrator, repo *gitalypb.Repository) {}, - migrationHandler: &mockMigrationHandler{}, - repo: repo, - } - }, - completed: true, - attempts: 1, - expectedLogMsg: "reftable migration successful for repository", - }, - } { - t.Run(tc.desc, func(t *testing.T) { - data := tc.setup() - - parentCtx, parentCancel := context.WithCancel(context.Background()) - m := &migrator{ - wg: sync.WaitGroup{}, - migrateCh: make(chan migrationData), - logger: logger, - metrics: metrics, - node: ptnMgr, - state: sync.Map{}, - migrationHandler: data.migrationHandler, - ctx: parentCtx, - ctxCancel: parentCancel, - } - - storageName := cfg.Storages[0].Name - if data.startState != nil { - m.state.Store(migrationKey(storageName, data.repo.GetRelativePath()), *data.startState) - } - - m.Run() - defer m.Close() - 
- // It is not guaranteed that the migration is registered, so run it in a - // loop until it is. - for { - if val, ok := m.state.Load(migrationKey(storageName, data.repo.GetRelativePath())); ok { - if val.(migratorState).cancelCtx != nil { - break - } - } - - m.RegisterMigration(storageName, data.repo.GetRelativePath()) - } - - data.run(m, data.repo) - - // Block till the old migration is complete. - m.migrateCh <- migrationData{} - - val, ok := m.state.Load(migrationKey(storageName, data.repo.GetRelativePath())) - state := val.(migratorState) - - require.True(t, ok) - require.Equal(t, tc.completed, state.completed) - require.Equal(t, tc.attempts, state.attempts) - - if tc.expectedLogMsg != "" { - entries := hook.AllEntries() - entry := entries[len(entries)-1] - require.Equal(t, tc.expectedLogMsg, entry.Message) - require.Greater(t, entry.Data["migration_latency"].(time.Duration), time.Duration(0)) - require.Greater(t, entry.Data["migration_attempts"].(uint), uint(0)) - } - }) - } -} diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/testhelper_test.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/testhelper_test.go deleted file mode 100644 index 2bb5a311cf..0000000000 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/testhelper_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package reftable - -import ( - "testing" - - "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" -) - -func TestMain(m *testing.M) { - testhelper.Run(m) -} -- GitLab From 9d5c006e32f9d803a3399db2d4476310c3836f25 Mon Sep 17 00:00:00 2001 From: Karthik Nayak Date: Tue, 18 Nov 2025 20:22:47 +0100 Subject: [PATCH 2/7] updateref: Add option to set GIT_REF_URI env In Git, the env variable 'GIT_REF_URI' is used to set an alternate reference directory for doing reference operations. Let's add support for that in UpdateRef. 
--- internal/git/updateref/updateref.go | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/internal/git/updateref/updateref.go b/internal/git/updateref/updateref.go index 8f85d000ba..eba997dd5c 100644 --- a/internal/git/updateref/updateref.go +++ b/internal/git/updateref/updateref.go @@ -270,6 +270,7 @@ type UpdaterOpt func(*updaterConfig) type updaterConfig struct { disableTransactions bool noDeref bool + refURI string } // WithDisabledTransactions disables hooks such that no reference-transactions @@ -280,6 +281,14 @@ func WithDisabledTransactions() UpdaterOpt { } } +// WithRefURI sets the GIT_REF_URI env variable, which allows using +// an alternate reference directory for all reference operations. +func WithRefURI(uri string) UpdaterOpt { + return func(cfg *updaterConfig) { + cfg.refURI = uri + } +} + // WithNoDeref disables de-reference while updating ref. If this option is turned on, // itself is overwritten, rather than the result of following the symbolic ref. 
func WithNoDeref() UpdaterOpt { @@ -310,21 +319,29 @@ func New(ctx context.Context, repo gitcmd.RepositoryExecutor, opts ...UpdaterOpt txOption = gitcmd.WithDisabledHooks() } + var stderr bytes.Buffer + cmdOpts := []gitcmd.CmdOpt{ + txOption, + gitcmd.WithSetupStdin(), + gitcmd.WithSetupStdout(), + gitcmd.WithStderr(&stderr), + } + + if len(cfg.refURI) != 0 { + cmdOpts = append(cmdOpts, gitcmd.WithEnv("GIT_REF_URI="+cfg.refURI)) + } + cmdFlags := []gitcmd.Option{gitcmd.Flag{Name: "-z"}, gitcmd.Flag{Name: "--stdin"}} if cfg.noDeref { cmdFlags = append(cmdFlags, gitcmd.Flag{Name: "--no-deref"}) } - var stderr bytes.Buffer cmd, err := repo.Exec(ctx, gitcmd.Command{ Name: "update-ref", Flags: cmdFlags, }, - txOption, - gitcmd.WithSetupStdin(), - gitcmd.WithSetupStdout(), - gitcmd.WithStderr(&stderr), + cmdOpts..., ) if err != nil { return nil, err -- GitLab From f1ef6559b1496dac4e901787bd44fbb9f9432bf3 Mon Sep 17 00:00:00 2001 From: Karthik Nayak Date: Mon, 24 Nov 2025 19:34:49 +0100 Subject: [PATCH 3/7] updateref: Remove `git.Version` dependency in `UpdateSymbolicReference` The call to `UpdateSymbolicReference` requires a `git.Version` argument. This is never used, cleanup the code and its callees. --- internal/git/localrepo/refs.go | 10 ++-------- internal/git/localrepo/refs_external_test.go | 5 +---- internal/git/updateref/updateref.go | 2 +- internal/git/updateref/updateref_test.go | 7 ++----- 4 files changed, 6 insertions(+), 18 deletions(-) diff --git a/internal/git/localrepo/refs.go b/internal/git/localrepo/refs.go index 55c1fe0db2..cde4c391df 100644 --- a/internal/git/localrepo/refs.go +++ b/internal/git/localrepo/refs.go @@ -149,23 +149,17 @@ func (repo *Repo) UpdateRef(ctx context.Context, reference git.ReferenceName, ne // SetDefaultBranch sets the repository's HEAD to point to the given reference. // It will not verify the reference actually exists. 
func (repo *Repo) SetDefaultBranch(ctx context.Context, txManager transaction.Manager, reference git.ReferenceName) error { - version, err := repo.GitVersion(ctx) - if err != nil { - return fmt.Errorf("detecting Git version: %w", err) - } - if err := git.ValidateReference(reference.String()); err != nil { return fmt.Errorf("%q is a malformed refname", reference) } - return repo.setDefaultBranchWithUpdateRef(ctx, reference, version) + return repo.setDefaultBranchWithUpdateRef(ctx, reference) } // setDefaultBranchWithUpdateRef uses 'symref-update' command to update HEAD. func (repo *Repo) setDefaultBranchWithUpdateRef( ctx context.Context, reference git.ReferenceName, - version git.Version, ) (err error) { updater, err := updateref.New(ctx, repo, updateref.WithNoDeref()) if err != nil { @@ -182,7 +176,7 @@ func (repo *Repo) setDefaultBranchWithUpdateRef( return fmt.Errorf("start: %w", err) } - if err = updater.UpdateSymbolicReference(version, "HEAD", reference); err != nil { + if err = updater.UpdateSymbolicReference("HEAD", reference); err != nil { return fmt.Errorf("update: %w", err) } diff --git a/internal/git/localrepo/refs_external_test.go b/internal/git/localrepo/refs_external_test.go index a399619af4..38c2cf0e4d 100644 --- a/internal/git/localrepo/refs_external_test.go +++ b/internal/git/localrepo/refs_external_test.go @@ -159,14 +159,11 @@ func TestRepo_SetDefaultBranch_errors(t *testing.T) { ref, err := repo.HeadReference(ctx) require.NoError(t, err) - version, err := repo.GitVersion(ctx) - require.NoError(t, err) - updater, err := updateref.New(ctx, repo) require.NoError(t, err) require.NoError(t, updater.Start()) - require.NoError(t, updater.UpdateSymbolicReference(version, "HEAD", "refs/heads/temp")) + require.NoError(t, updater.UpdateSymbolicReference("HEAD", "refs/heads/temp")) require.NoError(t, updater.Prepare()) t.Cleanup(func() { require.NoError(t, updater.Close()) }) diff --git a/internal/git/updateref/updateref.go 
b/internal/git/updateref/updateref.go index eba997dd5c..21ee5fff1c 100644 --- a/internal/git/updateref/updateref.go +++ b/internal/git/updateref/updateref.go @@ -414,7 +414,7 @@ func (u *Updater) Update(reference git.ReferenceName, newOID, oldOID git.ObjectI // UpdateSymbolicReference is used to do a symbolic reference update. We can potentially provide the oldTarget // or the oldOID. -func (u *Updater) UpdateSymbolicReference(version git.Version, reference, newTarget git.ReferenceName) error { +func (u *Updater) UpdateSymbolicReference(reference, newTarget git.ReferenceName) error { if err := u.expectState(stateStarted); err != nil { return err } diff --git a/internal/git/updateref/updateref_test.go b/internal/git/updateref/updateref_test.go index 7a8c4b50f7..5fbe088e53 100644 --- a/internal/git/updateref/updateref_test.go +++ b/internal/git/updateref/updateref_test.go @@ -964,17 +964,14 @@ func TestUpdater_symrefs(t *testing.T) { ctx := testhelper.Context(t) - cfg, executor, repoPath, updater := setupUpdater(t, ctx) + cfg, _, repoPath, updater := setupUpdater(t, ctx) defer testhelper.MustClose(t, updater) - actual, err := executor.GitVersion(ctx) - require.NoError(t, err) - gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("master")) require.NoError(t, updater.Start()) - require.NoError(t, updater.UpdateSymbolicReference(actual, "refs/heads/symref", "refs/heads/master")) + require.NoError(t, updater.UpdateSymbolicReference("refs/heads/symref", "refs/heads/master")) require.NoError(t, updater.Commit()) // Verify that the reference was created as expected. -- GitLab From 1a885f12c24bf29a08191923b51a1e30fda76227 Mon Sep 17 00:00:00 2001 From: Karthik Nayak Date: Tue, 25 Nov 2025 14:58:33 +0100 Subject: [PATCH 4/7] reftable: Add code for facilitating a migration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit While the earlier ideology was to migrate repositories using the WAL. 
It has now been decided to unblock the two efforts and move them forward independently. To do this, we add the required migration code. Since the files backend doesn't support any form of snapshotting without the WAL, migrations will be blocking in nature. But we can reduce the amount of blocking to be almost negligible. This is done as follows: 1. Run git-pack-refs(1) and note the timestamp of the generated packed-refs file. 2. Run git refs migrate --dry-run. 3. If there are no ongoing reference requests (read/write) a. Lock the repository by blocking incoming requests (done on a layer above git, in Gitaly [1]). b. If the timestamp of the packed-refs file has changed, unlock the repo and repeat from step 1. c. Apply all the loose refs to the dry-run reftable folder (this requires support in Git to write refs to an arbitrary folder). d. Move the reftable dry-run folder into the GIT_DIR. e. Swap the repo config. f. Unlock repo access. This way, the blocking time is O(N) only in the number of loose refs created after the packed-refs file was generated. Generally Gitaly repacks repositories on a regular basis and as such the number of loose references should be low. --- internal/git/reference.go | 6 + internal/gitaly/reftable/migrator.go | 248 ++++++++++++++++++++ internal/gitaly/reftable/migrator_test.go | 218 +++++++++++++++++ internal/gitaly/reftable/testhelper_test.go | 11 + 4 files changed, 483 insertions(+) create mode 100644 internal/gitaly/reftable/migrator.go create mode 100644 internal/gitaly/reftable/migrator_test.go create mode 100644 internal/gitaly/reftable/testhelper_test.go diff --git a/internal/git/reference.go b/internal/git/reference.go index cb888e620a..5787159f8a 100644 --- a/internal/git/reference.go +++ b/internal/git/reference.go @@ -225,3 +225,9 @@ func ValidateReference(name string) error { name = tail } } + +// CreateRefURI creates the GIT_REF_URI value for the given +// reference backend and path. 
+func CreateRefURI(backend ReferenceBackend, path string) string { + return fmt.Sprintf("%s://%s", backend.Name, path) +} diff --git a/internal/gitaly/reftable/migrator.go b/internal/gitaly/reftable/migrator.go new file mode 100644 index 0000000000..13c3b84e57 --- /dev/null +++ b/internal/gitaly/reftable/migrator.go @@ -0,0 +1,248 @@ +package reftable + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "strings" + + "regexp" + + "gitlab.com/gitlab-org/gitaly/v18/internal/git" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/gitcmd" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/updateref" +) + +var nameRegex = regexp.MustCompile("^[[:ascii:]]*'([[:ascii:]]*)'\n$") + +type blocker interface { + Block() error + Unblock() error + Repo() +} + +func migrateWithDryRun(ctx context.Context, repo *localrepo.Repo) (string, error) { + cmd, err := repo.Exec(ctx, gitcmd.Command{ + Name: "refs", + Action: "migrate", + Flags: []gitcmd.Option{ + gitcmd.Flag{Name: "--ref-format=reftable"}, + gitcmd.Flag{Name: "--no-reflog"}, + gitcmd.Flag{Name: "--dry-run"}, + }, + }, gitcmd.WithSetupStdout()) + if err != nil { + return "", fmt.Errorf("running refs migrate: %w", err) + } + + output, err := io.ReadAll(cmd) + if err != nil { + return "", fmt.Errorf("reading refs migrate output: %w", err) + } + + if err = cmd.Wait(); err != nil { + return "", fmt.Errorf("closing refs migrate: %w", err) + } + + matches := nameRegex.FindStringSubmatch(string(output)) + if len(matches) != 2 { + return "", fmt.Errorf("couldn't find migration directory") + } + + return matches[1], nil +} + +func applyLooseRefs(ctx context.Context, repo *localrepo.Repo, repoPath string, updater *updateref.Updater) error { + err := filepath.WalkDir(filepath.Join(repoPath, "refs"), func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + if d.IsDir() { + return nil + } + + content, err 
:= os.ReadFile(path) + if err != nil { + return fmt.Errorf("reading file: %w", err) + } + content = bytes.TrimSpace(content) + + refname, err := filepath.Rel(repoPath, path) + if err != nil { + return fmt.Errorf("parsing refname: %w", err) + } + + if strings.HasPrefix(string(content), "ref: ") { + return updater.UpdateSymbolicReference(git.ReferenceName(refname), git.ReferenceName(content[5:])) + } else { + return updater.Update(git.ReferenceName(refname), git.ObjectID(content), "") + } + }) + if err != nil { + return fmt.Errorf("walking refs folder: %w", err) + } + + head, err := repo.GetDefaultBranch(ctx) + if err != nil { + return fmt.Errorf("reading HEAD ref: %w", err) + } + + return updater.UpdateSymbolicReference("HEAD", head) +} + +func swapBackend(ctx context.Context, repo *localrepo.Repo, repoPath, reftablePath string) error { + if err := os.Rename( + filepath.Join(reftablePath, "reftable"), + filepath.Join(repoPath, "reftable"), + ); err != nil { + return fmt.Errorf("moving the reftable folder: %w", err) + } + + var stderr bytes.Buffer + if err := repo.ExecAndWait(ctx, gitcmd.Command{ + Name: "config", + Action: "set", + Args: []string{"core.repositoryformatversion", "1"}, + }, gitcmd.WithStderr(&stderr)); err != nil { + return fmt.Errorf("writing repository format version config: %w: %v", err, stderr.String()) + } + + if err := repo.ExecAndWait(ctx, gitcmd.Command{ + Name: "config", + Action: "set", + Args: []string{"extensions.refstorage", "reftable"}, + }, gitcmd.WithStderr(&stderr)); err != nil { + return fmt.Errorf("writing refstorage config: %w: %v", err, stderr.String()) + } + + return nil +} + +func cleanupRepo(repoPath, reftablePath string) error { + if err := os.Rename( + filepath.Join(reftablePath, "HEAD"), + filepath.Join(repoPath, "HEAD"), + ); err != nil { + return fmt.Errorf("replace HEAD: %w", err) + } + + if err := os.Remove(filepath.Join(repoPath, "packed-refs")); err != nil { + return fmt.Errorf("removing packed-refs: %w", err) + } + + 
if err := os.RemoveAll(filepath.Join(repoPath, "refs")); err != nil { + return fmt.Errorf("removing refs: %w", err) + } + + if err := os.Rename( + filepath.Join(reftablePath, "refs"), + filepath.Join(repoPath, "refs"), + ); err != nil { + return fmt.Errorf("move dry-run refs folder: %w", err) + } + + if err := os.Remove(reftablePath); err != nil { + return fmt.Errorf("removing migration folder: %w", err) + } + + return nil +} + +func migrate( + ctx context.Context, + repo *localrepo.Repo, + block func() error, + unblock func() error, +) (err error) { + backend, err := repo.ReferenceBackend(ctx) + if err != nil { + return fmt.Errorf("reference backend: %w", err) + } + + // Already migrated \m/ + if backend == git.ReferenceBackendReftables { + return nil + } + + repoPath, err := repo.Path(ctx) + if err != nil { + return fmt.Errorf("getting repo path: %w", err) + } + + before, err := os.Stat(filepath.Join(repoPath, "packed-refs")) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("packed-refs not found: %w", err) + } + return fmt.Errorf("stat packed-refs: %w", err) + } + + reftablePath, err := migrateWithDryRun(ctx, repo) + if err != nil { + return fmt.Errorf("creating dry-run reftable: %w", err) + } + defer func() { + if err != nil { + if cleanupErr := os.RemoveAll(reftablePath); cleanupErr != nil { + err = errors.Join(err, cleanupErr) + } + } + }() + + uri := git.CreateRefURI(git.ReferenceBackendReftables, reftablePath) + updater, err := updateref.New( + ctx, + repo, + updateref.WithNoDeref(), + updateref.WithDisabledTransactions(), + updateref.WithRefURI(uri)) + if err != nil { + return fmt.Errorf("creating updateref: %w", err) + } + + if err := updater.Start(); err != nil { + return fmt.Errorf("start update-ref: %w", err) + } + + if err := func() (returnedErr error) { + if returnedErr = block(); returnedErr != nil { + return fmt.Errorf("failed to block: %w", returnedErr) + } + + if err = applyLooseRefs(ctx, repo, repoPath, updater); err 
!= nil { + return fmt.Errorf("applying loose-refs: %w", err) + } + + if err := updater.Commit(); err != nil { + return fmt.Errorf("commit update-ref: %w", err) + } + + after, err := os.Stat(filepath.Join(repoPath, "packed-refs")) + if err != nil { + return fmt.Errorf("stat packed-refs: %w", err) + } + + defer func() { + if err := unblock(); err != nil { + returnedErr = errors.Join(fmt.Errorf("failed to unblock: %w", err), returnedErr) + } + }() + + if before.ModTime().Compare(after.ModTime()) != 0 { + return fmt.Errorf("packed-refs modified") + } + + return swapBackend(ctx, repo, repoPath, reftablePath) + }(); err != nil { + return fmt.Errorf("swapping backend: %w", err) + } + + return cleanupRepo(repoPath, reftablePath) +} diff --git a/internal/gitaly/reftable/migrator_test.go b/internal/gitaly/reftable/migrator_test.go new file mode 100644 index 0000000000..bb0edd0dce --- /dev/null +++ b/internal/gitaly/reftable/migrator_test.go @@ -0,0 +1,218 @@ +package reftable + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "gitlab.com/gitlab-org/gitaly/v18/internal/git" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" +) + +func TestMigrator(t *testing.T) { + testhelper.SkipWithReftable(t, "checks migration from files-backend") + + t.Parallel() + + type setupData struct { + repo *localrepo.Repo + repoPath string + block func() error + unblock func() error + expectedErrSubstring string + } + + for _, tc := range []struct { + desc string + setup func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData + }{ + { + desc: "no packed-refs", + setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData { + repoProto, repoPath := 
gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + return setupData{ + repo: localrepo.NewTestRepo(t, cfg, repoProto), + repoPath: repoPath, + block: func() error { return nil }, + unblock: func() error { return nil }, + expectedErrSubstring: "packed-refs not found", + } + }, + }, + { + desc: "already on reftable", + setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData { + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + gittest.Exec(t, cfg, "-C", repoPath, "refs", "migrate", "--ref-format=reftable") + + return setupData{ + repo: localrepo.NewTestRepo(t, cfg, repoProto), + repoPath: repoPath, + block: func() error { return nil }, + unblock: func() error { return nil }, + } + }, + }, + { + desc: "with packed-refs", + setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData { + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch")) + gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all") + + return setupData{ + repo: localrepo.NewTestRepo(t, cfg, repoProto), + repoPath: repoPath, + block: func() error { return nil }, + unblock: func() error { return nil }, + } + }, + }, + { + desc: "with packed-refs and some loose refs", + setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData { + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + c1 := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"), gittest.WithTree( + gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{ + {Path: "foo", Mode: "100644", Content: "foo"}, + })), + ) + gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all") + + gittest.WriteCommit(t, cfg, repoPath, 
gittest.WithParents(c1), gittest.WithBranch("loose"), + gittest.WithTree(gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{ + {Path: "foo", Mode: "100644", Content: "bar"}, + })), + ) + + return setupData{ + repo: localrepo.NewTestRepo(t, cfg, repoProto), + repoPath: repoPath, + block: func() error { return nil }, + unblock: func() error { return nil }, + } + }, + }, + { + desc: "packed-refs modified", + setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData { + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"), gittest.WithTree( + gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{ + {Path: "foo", Mode: "100644", Content: "foo"}, + })), + ) + gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all") + + blockFn := func() error { + return os.Chtimes(filepath.Join(repoPath, "packed-refs"), time.Now(), time.Now()) + } + + return setupData{ + repo: localrepo.NewTestRepo(t, cfg, repoProto), + repoPath: repoPath, + block: blockFn, + unblock: func() error { return nil }, + expectedErrSubstring: "packed-refs modified", + } + }, + }, + { + desc: "ref modified before blocking", + setup: func(t *testing.T, ctx context.Context, cfg config.Cfg) setupData { + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + c1 := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"), gittest.WithTree( + gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{ + {Path: "foo", Mode: "100644", Content: "foo"}, + })), + ) + c2 := gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(c1), gittest.WithBranch("loose"), + gittest.WithTree(gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{ + {Path: "foo", Mode: "100644", Content: "bar"}, + })), + ) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(c2), 
gittest.WithBranch("loose-2"), + gittest.WithTree(gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{ + {Path: "foo", Mode: "100644", Content: "goo"}, + })), + ) + gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all") + + blockFn := func() error { + gittest.WriteCommit(t, cfg, repoPath, gittest.WithParents(c2), gittest.WithBranch("loose"), + gittest.WithTree(gittest.WriteTree(t, cfg, repoPath, []gittest.TreeEntry{ + {Path: "foo", Mode: "100644", Content: "boo"}, + })), + ) + + return nil + } + + return setupData{ + repo: localrepo.NewTestRepo(t, cfg, repoProto), + repoPath: repoPath, + block: blockFn, + unblock: func() error { return nil }, + } + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + data := tc.setup(t, ctx, cfg) + + before := gittest.Exec(t, cfg, "-C", data.repoPath, "refs", "list") + // Some tests modify the repository in the block stage. We should update + // the refs list accordingly. 
+ if data.block != nil { + prev := data.block + data.block = func() error { + err := prev() + before = gittest.Exec(t, cfg, "-C", data.repoPath, "refs", "list") + return err + } + } + + migrateErr := migrate(ctx, data.repo, data.block, data.unblock) + + repo := localrepo.NewTestRepo(t, cfg, data.repo.Repository) + backend, err := repo.ReferenceBackend(ctx) + require.NoError(t, err) + + if len(data.expectedErrSubstring) > 0 { + require.Contains(t, migrateErr.Error(), data.expectedErrSubstring) + require.Equal(t, git.ReferenceBackendFiles, backend) + } else { + require.NoError(t, migrateErr) + require.Equal(t, git.ReferenceBackendReftables, backend) + } + + after := gittest.Exec(t, cfg, "-C", data.repoPath, "refs", "list") + require.Equal(t, string(before), string(after)) + }) + } +} diff --git a/internal/gitaly/reftable/testhelper_test.go b/internal/gitaly/reftable/testhelper_test.go new file mode 100644 index 0000000000..2bb5a311cf --- /dev/null +++ b/internal/gitaly/reftable/testhelper_test.go @@ -0,0 +1,11 @@ +package reftable + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} -- GitLab From ad32d10281c625784186ee1741a5d7ecb1e32651 Mon Sep 17 00:00:00 2001 From: Karthik Nayak Date: Sat, 29 Nov 2025 21:22:38 +0100 Subject: [PATCH 5/7] reftable: Add a WAL-less migration handler In the commit, we added a migrator which given a single repository can migrate that repository to reftables. We need a handler which can now take requests for migration of different repositories and handle their state. Add a `migrationHandler` type, which does this. The migration handler runs a long running goroutine and handles a single repository migration at a given time. If a migration fails, it adds an exponential delay to that repository, this avoids consecutive attempts on the same repository taking up resources. 
The state is only maintained in memory and will be lost when Gitaly is restarted. The migration handler exposes the following functions: 1. Run() -> This spawns a long running goroutine, which handles all incoming requests, their state and their migration. 2. Close() -> Closes the migration handler, waiting for any ongoing tasks to finish. 3. RegisterMigration(), CancelMigration() -> Used to register a migration and to cancel an ongoing migration. --- internal/gitaly/reftable/metrics.go | 56 +++ internal/gitaly/reftable/migration_handler.go | 185 +++++++++ .../gitaly/reftable/migration_handler_test.go | 369 ++++++++++++++++++ 3 files changed, 610 insertions(+) create mode 100644 internal/gitaly/reftable/metrics.go create mode 100644 internal/gitaly/reftable/migration_handler.go create mode 100644 internal/gitaly/reftable/migration_handler_test.go diff --git a/internal/gitaly/reftable/metrics.go b/internal/gitaly/reftable/metrics.go new file mode 100644 index 0000000000..4ceb89cbe8 --- /dev/null +++ b/internal/gitaly/reftable/metrics.go @@ -0,0 +1,56 @@ +package reftable + +import ( + "context" + "errors" + + "github.com/prometheus/client_golang/prometheus" +) + +// Metrics contains the metrics collected across reftable migrations. +type Metrics struct { + // latencyMetric is a metric to capture latency of the reftable migration. + // This is only logged for successful migrations, so the count would also + // provide the number of successful migrations. + latencyMetric *prometheus.HistogramVec + // failsMetric is a metric to capture the number of migration failures. + failsMetric *prometheus.CounterVec +} + +func failMetricReason(err error) string { + if errors.Is(err, context.Canceled) { + return "context_cancelled" + } + return "migration_error" +} + +// NewMetrics returns a new Metrics instance. 
+func NewMetrics() Metrics { + return Metrics{ + latencyMetric: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "gitaly_reftable_migration_latency_seconds", + Help: "Latency of a successful repository migration", + }, + []string{"without_wal"}, + ), + failsMetric: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_reftable_migration_failure", + Help: "Counter of the total number of migration failures", + }, + []string{"without_wal", "reason"}, + ), + } +} + +// Describe implements prometheus.Collector. +func (m Metrics) Describe(descs chan<- *prometheus.Desc) { + prometheus.DescribeByCollect(m, descs) +} + +// Collect implements prometheus.Collector. +func (m Metrics) Collect(metrics chan<- prometheus.Metric) { + m.latencyMetric.Collect(metrics) + m.failsMetric.Collect(metrics) +} diff --git a/internal/gitaly/reftable/migration_handler.go b/internal/gitaly/reftable/migration_handler.go new file mode 100644 index 0000000000..5a43fa1631 --- /dev/null +++ b/internal/gitaly/reftable/migration_handler.go @@ -0,0 +1,185 @@ +package reftable + +import ( + "context" + "errors" + "math" + "sync" + "time" + + "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v18/internal/log" +) + +type migratorState struct { + completed bool + attempts uint + coolDown time.Time + cancelCtx context.CancelFunc +} + +type migrationData struct { + repo *localrepo.Repo + block func() error + unblock func() error +} + +type migrationHandler struct { + wg sync.WaitGroup + + ctx context.Context + ctxCancel context.CancelFunc + + logger log.Logger + metrics Metrics + + migrateCh chan migrationData + state sync.Map +} + +// NewMigrationHandler provides a new migration handler. 
+func NewMigrationHandler(logger log.Logger, metrics Metrics) *migrationHandler { + ctx, cancel := context.WithCancel(context.Background()) + + return &migrationHandler{ + ctx: ctx, + ctxCancel: cancel, + + logger: logger, + metrics: metrics, + + migrateCh: make(chan migrationData), + state: sync.Map{}, + } +} + +// Run creates a long running goroutine which handles all migration requests. +// Must be accompanied with a Close(), which will safely stop the goroutine. +func (m *migrationHandler) Run() { + m.wg.Add(1) + go func() { + defer m.wg.Done() + + for { + select { + case <-m.ctx.Done(): + return + case data := <-m.migrateCh: + func() { + if data.repo == nil || data.block == nil || data.unblock == nil { + return + } + + ctx, cancel := context.WithCancel(m.ctx) + repoPath, err := data.repo.Path(ctx) + if err != nil { + m.logger.WithError(err).WithFields(log.Fields{ + "repo_path": repoPath, + }).ErrorContext(ctx, "reftable migration failed getting path") + + cancel() + return + } + + val, ok := m.state.LoadOrStore(repoPath, migratorState{cancelCtx: cancel}) + state := val.(migratorState) + + // If the state was present, we still need to store our + // cancellation function. + if ok { + state.cancelCtx = cancel + m.state.Store(repoPath, state) + } + + // We don't do 'defer m.state.Store(...)' here, because that would + // fix the state as is here. We want to delay the evaluation of the + // state + defer func() { + m.state.Store(repoPath, state) + }() + + if state.completed || state.coolDown.After(time.Now()) { + return + } + + before := time.Now() + err = migrate(ctx, data.repo, data.block, data.unblock) + // We shouldn't care about migration status for repositories which don't + // event exist. 
+ if errors.Is(err, storage.ErrRepositoryNotFound) { + return + } + latency := time.Since(before) + + state.attempts = state.attempts + 1 + state.cancelCtx = nil + + if err != nil { + m.logger.WithError(err).WithFields(log.Fields{ + "repo_path": repoPath, + "migration_latency": latency, + "migration_attempts": state.attempts, + }).ErrorContext(ctx, "reftable migration failed for repository") + m.metrics.failsMetric.WithLabelValues("true", failMetricReason(err)).Add(1) + + // Let's delay exponentially, but with a max of 6hrs + delay := min(math.Pow(2, float64(state.attempts)), 6) + state.coolDown = time.Now().Add(time.Duration(delay) * time.Hour) + } else { + m.logger.WithFields(log.Fields{ + "repo_path": repoPath, + "migration_latency": latency, + "migration_attempts": state.attempts, + }).InfoContext(ctx, "reftable migration successful for repository") + m.metrics.latencyMetric.WithLabelValues("true").Observe(latency.Seconds()) + + state.completed = true + } + }() + } + } + }() +} + +// Close is used to stop the handler. +func (m *migrationHandler) Close() { + defer m.wg.Wait() + m.ctxCancel() +} + +// RegisterMigration is used to register a new migration. This function +// is non-blocking and doesn't return an error. +func (m *migrationHandler) RegisterMigration( + repo *localrepo.Repo, + block func() error, + unblock func() error, +) bool { + select { + case m.migrateCh <- migrationData{repo: repo, block: block, unblock: unblock}: + return true + default: + return false + } +} + +// CancelMigration cancels the ongoing migration if it matches the +// state provided. 
+func (m *migrationHandler) CancelMigration(repo *localrepo.Repo) { + repoPath, err := repo.Path(m.ctx) + if err != nil { + m.logger.WithError(err).WithFields(log.Fields{ + "repo_path": repoPath, + }).ErrorContext(m.ctx, "reftable migration failed getting path") + return + } + + val, ok := m.state.Load(repoPath) + if !ok { + return + } + + if cancel := val.(migratorState).cancelCtx; cancel != nil { + cancel() + } +} diff --git a/internal/gitaly/reftable/migration_handler_test.go b/internal/gitaly/reftable/migration_handler_test.go new file mode 100644 index 0000000000..7846bbcd61 --- /dev/null +++ b/internal/gitaly/reftable/migration_handler_test.go @@ -0,0 +1,369 @@ +package reftable + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" +) + +// Exercise the registration function. Migrations should be registered +// irrelevant of the current state (completed, pending, cooldown). 
func TestMigrationHandler_RegisterMigration(t *testing.T) {
	t.Parallel()

	for _, tc := range []struct {
		desc  string
		state migratorState
	}{
		{
			desc: "migration already completed",
			state: migratorState{
				completed: true,
			},
		},
		{
			desc: "migration in cooldown period",
			state: migratorState{
				coolDown: time.Now().Add(1 * time.Hour),
			},
		},
		{
			desc: "migration with expired cooldown period",
			state: migratorState{
				coolDown: time.Now().Add(-1 * time.Hour),
			},
		},
		{
			// The zero-value state is still stored for the repository below.
			desc: "migration state doesn't exist",
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			t.Parallel()

			ctx := testhelper.Context(t)
			cfg := testcfg.Build(t)
			repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
				SkipCreationViaService: true,
			})
			repo := localrepo.NewTestRepo(t, cfg, repoProto)

			// The handler is built by hand and Run is never called, so the
			// test itself is the only receiver on migrateCh.
			parentCtx, parentCancel := context.WithCancel(context.Background())
			m := &migrationHandler{
				migrateCh: make(chan migrationData),
				state:     sync.Map{},
				ctx:       parentCtx,
				ctxCancel: parentCancel,
			}
			m.state.Store(repoPath, tc.state)

			var wg sync.WaitGroup
			defer wg.Wait()

			stopCh := make(chan struct{})

			// We raise multiple goroutines, only one would go through.
			// The others would go to the default case.
			wg.Add(1)
			go func() {
				defer wg.Done()
				for {
					select {
					case <-stopCh:
						return
					default:
						wg.Add(1)
						go func() {
							defer wg.Done()
							m.RegisterMigration(repo, func() error { return nil }, func() error { return nil })
						}()
					}
				}
			}()

			// Deferred calls run in reverse order: the spawner is stopped
			// first, then wg.Wait() waits for all goroutines to finish.
			defer func() {
				close(stopCh)
			}()

			data := <-m.migrateCh
			require.Equal(t, repo, data.repo)
		})
	}
}

// TestMigrationHandler_CancelMigration checks that CancelMigration only
// invokes the cancel function recorded for the repository it is called with.
func TestMigrationHandler_CancelMigration(t *testing.T) {
	t.Parallel()

	ctx := testhelper.Context(t)
	cfg := testcfg.Build(t)
	repoProto, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
		SkipCreationViaService: true,
	})
	repo := localrepo.NewTestRepo(t, cfg, repoProto)

	for _, tc := range []struct {
		desc        string
		repo        *localrepo.Repo
		expectedErr error
	}{
		{
			// A different repository than the one whose state is stored, so
			// the stored context must stay uncancelled (nil expectedErr).
			desc: "incorrect repo",
			repo: func() *localrepo.Repo {
				repoProto, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
					SkipCreationViaService: true,
				})
				return localrepo.NewTestRepo(t, cfg, repoProto)
			}(),
		},
		{
			desc:        "success path",
			repo:        repo,
			expectedErr: context.Canceled,
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			t.Parallel()

			// ctx/cancel stand in for the context of an in-flight migration.
			ctx, cancel := context.WithCancel(context.Background())

			parentCtx, parentCancel := context.WithCancel(context.Background())
			m := &migrationHandler{
				state:     sync.Map{},
				ctx:       parentCtx,
				ctxCancel: parentCancel,
			}

			// The state is always stored for the outer repo, not for tc.repo.
			repoPath, err := repo.Path(m.ctx)
			require.NoError(t, err)
			m.state.Store(repoPath, migratorState{cancelCtx: cancel})

			m.CancelMigration(tc.repo)
			require.ErrorIs(t, ctx.Err(), tc.expectedErr)
		})
	}
}

// TestMigrationHandler runs real migration requests through a running handler
// and asserts the resulting per-repository state and the last log entry.
func TestMigrationHandler(t *testing.T) {
	t.Parallel()

	testhelper.SkipWithWAL(t, "the migration code runs without WAL")
	testhelper.SkipWithPraefect(t, "we shouldn't migrate with Praefect")

	ctx := testhelper.Context(t)
	cfg := testcfg.Build(t)

	logger := testhelper.NewLogger(t)
	hook := testhelper.AddLoggerHook(logger)
	metrics := NewMetrics()

	// setupData is what each test case's setup function returns.
	type setupData struct {
		// block and unblock are passed to RegisterMigration.
		block   func() error
		unblock func() error
		// run is invoked once the migration was picked up by the handler.
		run       func(m *migrationHandler, repo *localrepo.Repo)
		repoProto *gitalypb.Repository
		// startState, when set, is stored for the repository before Run.
		startState *migratorState
	}

	for _, tc := range []struct {
		desc           string
		setup          func() setupData
		completed      bool
		attempts       uint
		expectedLogMsg string
	}{
		{
			desc: "cancelled migration",
			setup: func() setupData {
				ch := make(chan struct{})

				repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
					SkipCreationViaService: true,
				})
				gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"))
				gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all")

				return setupData{
					repoProto: repo,
					// block waits for two signals from run, so the migration
					// is cancelled while it is parked inside block.
					block: func() error {
						<-ch
						<-ch

						return nil
					},
					unblock: func() error { return nil },
					run: func(m *migrationHandler, repo *localrepo.Repo) {
						ch <- struct{}{}
						m.CancelMigration(repo)
						ch <- struct{}{}
					},
				}
			},
			completed:      false,
			attempts:       1,
			expectedLogMsg: "reftable migration failed for repository",
		},
		{
			desc: "existing state with cancelled migration",
			setup: func() setupData {
				ch := make(chan struct{})

				repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
					SkipCreationViaService: true,
				})
				gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"))
				gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all")

				return setupData{
					repoProto: repo,
					block: func() error {
						<-ch
						<-ch

						return nil
					},
					unblock: func() error { return nil },
					run: func(m *migrationHandler, repo *localrepo.Repo) {
						ch <- struct{}{}
						m.CancelMigration(repo)
						ch <- struct{}{}
					},
					startState: &migratorState{attempts: 3},
				}
			},
			completed:      false,
			attempts:       4,
			expectedLogMsg: "reftable migration failed for repository",
		},
		{
			desc: "repository not found error",
			setup: func() setupData {
				repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
					SkipCreationViaService: true,
				})
				gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"))
				gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all")

				return setupData{
					repoProto: repo,
					// Mimic repository not existing by passing the error
					// through the block function.
					block:   func() error { return storage.ErrRepositoryNotFound },
					unblock: func() error { return nil },
					run:     func(m *migrationHandler, repo *localrepo.Repo) {},
				}
			},
			// When we encounter a ErrRepositoryNotFound error, we simply
			// skip the migration and don't mark it as completed or attempted.
			completed: false,
			attempts:  0,
		},
		{
			desc: "unsuccessful migration without packed-refs",
			setup: func() setupData {
				repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
					SkipCreationViaService: true,
				})
				gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"))

				return setupData{
					repoProto: repo,
					block:     func() error { return nil },
					unblock:   func() error { return nil },
					run:       func(m *migrationHandler, repo *localrepo.Repo) {},
				}
			},
			completed:      false,
			attempts:       1,
			expectedLogMsg: "reftable migration failed for repository",
		},
		{
			desc: "successful migration",
			setup: func() setupData {
				repo, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{
					SkipCreationViaService: true,
				})
				gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("branch"))
				gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all")

				return setupData{
					repoProto: repo,
					block:     func() error { return nil },
					unblock:   func() error { return nil },
					run:       func(m *migrationHandler, repo *localrepo.Repo) {},
				}
			},
			completed:      true,
			attempts:       1,
			expectedLogMsg: "reftable migration successful for repository",
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			data := tc.setup()

			parentCtx, parentCancel := context.WithCancel(context.Background())
			m := &migrationHandler{
				wg: sync.WaitGroup{},

				ctx:       parentCtx,
				ctxCancel: parentCancel,

				logger:  logger,
				metrics: metrics,

				migrateCh: make(chan migrationData),
				state:     sync.Map{},
			}

			repo := localrepo.NewTestRepo(t, cfg, data.repoProto)
			repoPath, err := repo.Path(ctx)
			require.NoError(t, err)

			if data.startState != nil {
				m.state.Store(repoPath, *data.startState)
			}

			m.Run()
			defer m.Close()

			// It is not guaranteed that the migration is registered, so run it in a
			// loop until it is. A non-nil cancelCtx in the stored state is the
			// signal that the handler picked up a request for this repository.
			for {
				if val, ok := m.state.Load(repoPath); ok {
					if val.(migratorState).cancelCtx != nil {
						break
					}
				}

				m.RegisterMigration(repo, data.block, data.unblock)
			}

			data.run(m, repo)

			// Block till the old migration is complete. The channel is
			// unbuffered, so this send only succeeds once the handler
			// goroutine is back at its receive; the empty request is ignored.
			m.migrateCh <- migrationData{}

			val, ok := m.state.Load(repoPath)
			state := val.(migratorState)

			require.True(t, ok)
			require.Equal(t, tc.completed, state.completed)
			require.Equal(t, tc.attempts, state.attempts)

			if tc.expectedLogMsg != "" {
				entries := hook.AllEntries()
				entry := entries[len(entries)-1]
				require.Equal(t, tc.expectedLogMsg, entry.Message)
				require.Greater(t, entry.Data["migration_latency"].(time.Duration), time.Duration(0))
				require.Greater(t, entry.Data["migration_attempts"].(uint), uint(0))
			}
		})
	}
}
reftable: Add code for facilitating a migration --- internal/gitaly/reftable/migrator.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/gitaly/reftable/migrator.go b/internal/gitaly/reftable/migrator.go index 13c3b84e57..d50db58467 100644 --- a/internal/gitaly/reftable/migrator.go +++ b/internal/gitaly/reftable/migrator.go @@ -216,6 +216,12 @@ func migrate( return fmt.Errorf("failed to block: %w", returnedErr) } + defer func() { + if err := unblock(); err != nil { + returnedErr = errors.Join(fmt.Errorf("failed to unblock: %w", err), returnedErr) + } + }() + if err = applyLooseRefs(ctx, repo, repoPath, updater); err != nil { return fmt.Errorf("applying loose-refs: %w", err) } @@ -229,12 +235,6 @@ func migrate( return fmt.Errorf("stat packed-refs: %w", err) } - defer func() { - if err := unblock(); err != nil { - returnedErr = errors.Join(fmt.Errorf("failed to unblock: %w", err), returnedErr) - } - }() - if before.ModTime().Compare(after.ModTime()) != 0 { return fmt.Errorf("packed-refs modified") } -- GitLab From 42800550f79dc5b144b575edc3f585223e56e591 Mon Sep 17 00:00:00 2001 From: Karthik Nayak Date: Tue, 2 Dec 2025 07:52:14 +0100 Subject: [PATCH 7/7] s3 --- internal/gitaly/reftable/middleware.go | 129 ++++++++++ internal/gitaly/reftable/middleware_test.go | 229 ++++++++++++++++++ internal/gitaly/reftable/migration_handler.go | 5 +- internal/testhelper/testserver/gitaly.go | 98 +++++--- 4 files changed, 418 insertions(+), 43 deletions(-) create mode 100644 internal/gitaly/reftable/middleware.go create mode 100644 internal/gitaly/reftable/middleware_test.go diff --git a/internal/gitaly/reftable/middleware.go b/internal/gitaly/reftable/middleware.go new file mode 100644 index 0000000000..9acec3f697 --- /dev/null +++ b/internal/gitaly/reftable/middleware.go @@ -0,0 +1,129 @@ +package reftable + +import ( + "context" + "fmt" + "sync" + + "gitlab.com/gitlab-org/gitaly/v18/internal/featureflag" + 
"gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v18/middleware" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" +) + +type migrationRegister interface { + RegisterMigration(repo *localrepo.Repo, block func() error, unblock func() error) + CancelMigration(repo *localrepo.Repo) +} + +// NewInterceptors provides opportunistic middlewares to aid in reftable migration. +// It only registers a migration for an incoming ACCESSOR request. If any other +// type of request is received, it tries to cancel the migration if any exist +// and are ongoing. +// +// For streaming RPCs we consume the first request and wrap it again before calling +// the next handler. This ensures that the next handler doesn't miss the first message. +func NewInterceptors( + factory localrepo.Factory, + registry *protoregistry.Registry, + register migrationRegister, +) (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor) { + locks := sync.Map{} + + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (_ any, returnedErr error) { + tx := storage.ExtractTransaction(ctx) + + if tx == nil && featureflag.ReftableMigration.IsEnabled(ctx) { + methodInfo, err := registry.LookupMethod(info.FullMethod) + if err != nil { + // We might get an error for health checks, let's err on + // the side of safety. 
+ return handler(ctx, req) + } + + targetRepo, err := methodInfo.TargetRepo(req.(proto.Message)) + if err != nil { + return nil, fmt.Errorf("extract repository: %w", err) + } + + lockAny, _ := locks.LoadOrStore(targetRepo.GetRelativePath(), &sync.RWMutex{}) + lock := lockAny.(*sync.RWMutex) + + repo := factory.Build(targetRepo) + block := func() error { lock.Lock(); return nil } + unblock := func() error { lock.Unlock(); return nil } + + switch methodInfo.Operation { + case protoregistry.OpAccessor: + register.RegisterMigration(repo, block, unblock) + case protoregistry.OpMutator: + defer register.RegisterMigration(repo, block, unblock) + fallthrough + default: + // Cancel any ongoing migrations to avoid conflicts + // but schedule one to start after we serve the request + register.CancelMigration(repo) + } + + lock.RLock() + defer lock.RUnlock() + return handler(ctx, req) + } + + return handler(ctx, req) + }, func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (returnedErr error) { + tx := storage.ExtractTransaction(ss.Context()) + + if tx == nil && featureflag.ReftableMigration.IsEnabled(ss.Context()) { + methodInfo, err := registry.LookupMethod(info.FullMethod) + if err != nil { + // We might get an error for health checks, let's err on + // the side of safety. + return handler(srv, ss) + } + + req := methodInfo.NewRequest() + if err := ss.RecvMsg(req); err != nil { + // All of the repository scoped streaming RPCs send the repository in the first message. + // Generally it should be fine to error out in all cases if there is no message sent. + // To maintain compatibility with tests, we instead invoke the handler to let them return + // the asserted error messages. Once the transaction management is on by default, we should + // error out here directly and amend the failing test cases. 
+ return handler(srv, middleware.NewPeekedStream(ss.Context(), nil, err, ss)) + } + + targetRepo, err := methodInfo.TargetRepo(req) + if err != nil { + return fmt.Errorf("extract repository: %w", err) + } + + lockAny, _ := locks.LoadOrStore(targetRepo.GetRelativePath(), &sync.RWMutex{}) + lock := lockAny.(*sync.RWMutex) + + repo := factory.Build(targetRepo) + block := func() error { lock.Lock(); return nil } + unblock := func() error { lock.Unlock(); return nil } + + switch methodInfo.Operation { + case protoregistry.OpAccessor: + register.RegisterMigration(repo, block, unblock) + case protoregistry.OpMutator: + defer register.RegisterMigration(repo, block, unblock) + fallthrough + default: + // Cancel any ongoing migrations to avoid conflicts + // but schedule one to start after we serve the request + register.CancelMigration(repo) + } + + lock.RLock() + defer lock.RUnlock() + return handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss)) + } + + return handler(srv, ss) + } +} diff --git a/internal/gitaly/reftable/middleware_test.go b/internal/gitaly/reftable/middleware_test.go new file mode 100644 index 0000000000..06bbca0057 --- /dev/null +++ b/internal/gitaly/reftable/middleware_test.go @@ -0,0 +1,229 @@ +package reftable + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v18/internal/featureflag" + "gitlab.com/gitlab-org/gitaly/v18/internal/git" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service/commit" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service/hook" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service/ref" + "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/service/repository" + "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/client" + 
"gitlab.com/gitlab-org/gitaly/v18/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v18/internal/log" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper/testserver" + "gitlab.com/gitlab-org/gitaly/v18/proto/go/gitalypb" + "google.golang.org/grpc" +) + +type mockReftableMigrator struct { + registerCount int + cancelCount int + handler *migrationHandler +} + +func (m *mockReftableMigrator) RegisterMigration(repo *localrepo.Repo, block, unblock func() error) { + m.registerCount++ + m.handler.RegisterMigration(repo, block, unblock) +} + +func (m *mockReftableMigrator) CancelMigration(repo *localrepo.Repo) { + m.cancelCount++ + m.handler.CancelMigration(repo) +} + +func TestInterceptor(t *testing.T) { + t.Parallel() + + testhelper.NewFeatureSets(featureflag.ReftableMigration).Run(t, testInterceptor) +} + +func testInterceptor(t *testing.T, ctx context.Context) { + cfg := testcfg.Build(t) + + testhelper.SkipWithWAL(t, "the migration code runs without WAL") + testhelper.SkipWithPraefect(t, "we shouldn't migrate with Praefect") + + mockMigrator := mockReftableMigrator{} + var handler *migrationHandler + callback := func(logger log.Logger, factory localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) { + handler = NewMigrationHandler(logger, NewMetrics()) + handler.Run() + + mockMigrator.handler = handler + + unary, stream := NewInterceptors(factory, protoregistry.GitalyProtoPreregistered, &mockMigrator) + + return []grpc.UnaryServerInterceptor{unary}, []grpc.StreamServerInterceptor{stream} + } + + serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { + gitalypb.RegisterCommitServiceServer(srv, commit.NewServer(deps)) + gitalypb.RegisterRefServiceServer(srv, ref.NewServer(deps)) + gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(deps)) + 
gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps)) + }, testserver.WithInterceptors(callback), + testserver.WithContext(ctx)) + cfg.SocketPath = serverSocketPath + + conn, err := client.New(ctx, serverSocketPath) + require.NoError(t, err) + t.Cleanup(func() { conn.Close() }) + + for _, tc := range []struct { + desc string + setup func(repoProto *gitalypb.Repository, commitID git.ObjectID) + expectedRegistrations int + expectedCancellations int + }{ + { + desc: "unary accessor", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + request := &gitalypb.FindCommitRequest{ + Repository: repoProto, + Revision: []byte("main"), + } + + client := gitalypb.NewCommitServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + + _, err := client.FindCommit(ctx, request) + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 0, 0), + }, + { + desc: "unary mutator", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + client := gitalypb.NewRefServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + + _, err := client.DeleteRefs(ctx, &gitalypb.DeleteRefsRequest{ + Repository: repoProto, + Refs: [][]byte{ + []byte("refs/heads/main"), + }, + }) + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 0, 0), + }, + { + desc: "stream accessor", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + client := gitalypb.NewRefServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + 
+ stream, err := client.FindAllBranches(ctx, &gitalypb.FindAllBranchesRequest{ + Repository: repoProto, + }) + require.NoError(t, err) + + _, err = testhelper.ReceiveAndFold(stream.Recv, func( + result []*gitalypb.FindAllBranchesResponse_Branch, + response *gitalypb.FindAllBranchesResponse, + ) []*gitalypb.FindAllBranchesResponse_Branch { + if response == nil { + return result + } + + return append(result, response.GetBranches()...) + }) + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 0, 0), + }, + { + desc: "stream mutator", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + client := gitalypb.NewRefServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + + updater, err := client.UpdateReferences(ctx) + require.NoError(t, err) + + err = updater.Send(&gitalypb.UpdateReferencesRequest{ + Repository: repoProto, + Updates: []*gitalypb.UpdateReferencesRequest_Update{ + { + Reference: []byte("refs/heads/fun"), + OldObjectId: []byte(gittest.DefaultObjectHash.ZeroOID), + NewObjectId: []byte(commitID), + }, + }, + }) + require.NoError(t, err) + + _, err = updater.CloseAndRecv() + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 0, 0), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + // Reset the mock migration handler count for each test + mockMigrator.registerCount, mockMigrator.cancelCount = 0, 0 + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), + 
gittest.WithTreeEntries( + gittest.TreeEntry{Mode: "100644", Path: "foo", Content: "bar"}, + ), + ) + gittest.Exec(t, cfg, "-C", repoPath, "pack-refs", "--all") + + tc.setup(repoProto, commitID) + + if featureflag.ReftableMigration.IsEnabled(ctx) { + // Block to ensure the previous migration was successful. + handler.migrateCh <- migrationData{} + } + + repoInfo, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: repoProto, + }) + require.NoError(t, err) + + require.Equal(t, + testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, + gittest.FilesOrReftables( + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, + ), + ), + repoInfo.GetReferences().GetReferenceBackend(), + ) + + require.Equal(t, tc.expectedRegistrations, mockMigrator.registerCount) + require.Equal(t, tc.expectedCancellations, mockMigrator.cancelCount) + }) + } + + if featureflag.ReftableMigration.IsEnabled(ctx) { + handler.Close() + } +} diff --git a/internal/gitaly/reftable/migration_handler.go b/internal/gitaly/reftable/migration_handler.go index 5a43fa1631..bd3ea08286 100644 --- a/internal/gitaly/reftable/migration_handler.go +++ b/internal/gitaly/reftable/migration_handler.go @@ -154,12 +154,11 @@ func (m *migrationHandler) RegisterMigration( repo *localrepo.Repo, block func() error, unblock func() error, -) bool { +) { select { case m.migrateCh <- migrationData{repo: repo, block: block, unblock: unblock}: - return true default: - return false + return } } diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index b995c1c68c..6efb02f96a 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -176,6 +176,10 @@ func runGitaly(tb testing.TB, cfg config.Cfg, 
registrar func(srv *grpc.Server, d gsd = opt(gsd) } + if gsd.ctx != nil { + ctx = gsd.ctx + } + // We set up the structerr interceptors so that any error metadata that gets set via // `structerr.WithMetadata()` is not only logged, but also present in the error details. serverOpts := []server.Option{ @@ -188,6 +192,16 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d deps := gsd.createDependencies(tb, ctx, cfg) tb.Cleanup(func() { testhelper.MustClose(tb, gsd.conns) }) + if gsd.interceptorsFn != nil { + unary, stream := gsd.interceptorsFn(deps.GetLogger(), deps.GetRepositoryFactory()) + for _, v := range unary { + serverOpts = append(serverOpts, server.WithUnaryInterceptor(v)) + } + for _, v := range stream { + serverOpts = append(serverOpts, server.WithStreamInterceptor(v)) + } + } + var txMiddleware server.TransactionMiddleware if deps.GetNode() != nil { unaryInterceptors := []grpc.UnaryServerInterceptor{ @@ -201,12 +215,6 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d ), } - if gsd.transactionInterceptorsFn != nil { - unary, stream := gsd.transactionInterceptorsFn(deps.GetLogger(), deps.GetNode(), deps.GetRepositoryFactory()) - unaryInterceptors = append(unaryInterceptors, unary...) - streamInterceptors = append(streamInterceptors, stream...) 
- } - txMiddleware = server.TransactionMiddleware{ UnaryInterceptors: unaryInterceptors, StreamInterceptors: streamInterceptors, @@ -290,36 +298,37 @@ func registerHealthServerIfNotRegistered(srv *grpc.Server) { } type gitalyServerDeps struct { - disablePraefect bool - logger log.Logger - conns *client.Pool - locator storage.Locator - txMgr transaction.Manager - hookMgr hook.Manager - gitlabClient gitlab.Client - gitCmdFactory gitcmd.CommandFactory - backchannelReg *backchannel.Registry - catfileCache catfile.Cache - diskCache cache.Cache - packObjectsCache streamcache.Cache - packObjectsLimiter limiter.Limiter - limitHandler *limithandler.LimiterMiddleware - repositoryCounter *counter.RepositoryCounter - updaterWithHooks *updateref.UpdaterWithHooks - housekeepingManager housekeepingmgr.Manager - backupSink *backup.Sink - backupLocator backup.Locator - signingKey string - transactionRegistry *storagemgr.TransactionRegistry - procReceiveRegistry *hook.ProcReceiveRegistry - bundleURIManager *bundleuri.GenerationManager - bundleURISink *bundleuri.Sink - bundleURIStrategy bundleuri.GenerationStrategy - localRepoFactory localrepo.Factory - migrations *[]migration.Migration - archiveCache streamcache.Cache - MigrationStateManager migration.StateManager - transactionInterceptorsFn func(log.Logger, storage.Node, localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) + ctx context.Context + disablePraefect bool + logger log.Logger + conns *client.Pool + locator storage.Locator + txMgr transaction.Manager + hookMgr hook.Manager + gitlabClient gitlab.Client + gitCmdFactory gitcmd.CommandFactory + backchannelReg *backchannel.Registry + catfileCache catfile.Cache + diskCache cache.Cache + packObjectsCache streamcache.Cache + packObjectsLimiter limiter.Limiter + limitHandler *limithandler.LimiterMiddleware + repositoryCounter *counter.RepositoryCounter + updaterWithHooks *updateref.UpdaterWithHooks + housekeepingManager housekeepingmgr.Manager + 
backupSink *backup.Sink + backupLocator backup.Locator + signingKey string + transactionRegistry *storagemgr.TransactionRegistry + procReceiveRegistry *hook.ProcReceiveRegistry + bundleURIManager *bundleuri.GenerationManager + bundleURISink *bundleuri.Sink + bundleURIStrategy bundleuri.GenerationStrategy + localRepoFactory localrepo.Factory + migrations *[]migration.Migration + archiveCache streamcache.Cache + MigrationStateManager migration.StateManager + interceptorsFn func(log.Logger, localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) } func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, ctx context.Context, cfg config.Cfg) *service.Dependencies { @@ -723,13 +732,13 @@ func WithRepositoryFactory(repoFactory localrepo.Factory) GitalyServerOpt { } } -// WithTransactionInterceptors allows for setting additional transaction middlewares to the server via +// WithInterceptors allows for setting additional transaction middlewares to the server via // a callback function. -func WithTransactionInterceptors( - fn func(log.Logger, storage.Node, localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor), +func WithInterceptors( + fn func(log.Logger, localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor), ) GitalyServerOpt { return func(deps gitalyServerDeps) gitalyServerDeps { - deps.transactionInterceptorsFn = fn + deps.interceptorsFn = fn return deps } } @@ -741,3 +750,12 @@ func WithMigrations(migrations *[]migration.Migration) GitalyServerOpt { return deps } } + +// WithContext allows using a custom context. This is for adding server code which +// rely on feature sets. +func WithContext(ctx context.Context) GitalyServerOpt { + return func(deps gitalyServerDeps) gitalyServerDeps { + deps.ctx = ctx + return deps + } +} -- GitLab