From d67e3b51c9c074b34a5594008cefea53b81b13c1 Mon Sep 17 00:00:00 2001 From: Karthik Nayak Date: Wed, 16 Jul 2025 09:45:15 +0200 Subject: [PATCH 1/4] reftable: Increase the reftable test coverage Our CI systems run separate jobs for the WAL and for the reftable backend, but we don't have one that runs both of them together. Let's increase our test coverage for reftable by adding a matrix to run reftables with praefect, wal, raft and praefect-wal. This includes some fixes to make sure the pipeline works as expected: 1. In 'reftable/middleware_test.go' we used to return early if the default reference backend was reftables. This caused a goroutine leak, since we didn't close the reftable migrator when returning early. One fix would be to also close the migrator when returning early. Instead, let the test run its course; this way, we also validate that the migrator works as expected on an already-migrated repository. 2. `TestCalculateChecksum` rewrites the reftables file, replacing the HEAD ref with an invalid one. When the WAL is also enabled, the file is read-only because it belongs to the canonical repository. To circumvent the read-only permissions, delete the file before calling os.WriteFile. 3. Finally, skip running `TestRestoreRepository` with reftables for now. This needs fixes in the `backup` package, which are being handled in https://gitlab.com/gitlab-org/gitaly/-/merge_requests/7971/. 
--- .gitlab-ci.yml | 7 +++--- .../repository/calculate_checksum_test.go | 3 +++ .../repository/restore_repository_test.go | 2 ++ .../migration/reftable/middleware_test.go | 25 ++++++------------- 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 3924d11369..49a6f6ae68 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -315,9 +315,10 @@ test:pgbouncer: test:reftable: <<: *test_definition - variables: - <<: *test_variables - TEST_TARGET: test-with-reftable + parallel: + matrix: + - TEST_TARGET: [test, test-wal, test-raft, test-with-praefect-wal] + GITALY_TEST_REF_FORMAT: "reftable" test:nightly: <<: *test_definition diff --git a/internal/gitaly/service/repository/calculate_checksum_test.go b/internal/gitaly/service/repository/calculate_checksum_test.go index 9505687f3b..b2b850b142 100644 --- a/internal/gitaly/service/repository/calculate_checksum_test.go +++ b/internal/gitaly/service/repository/calculate_checksum_test.go @@ -190,6 +190,9 @@ func TestCalculateChecksum(t *testing.T) { reftableFileContent[i+headRefIdx] = nope[i] } + // When the WAL is enabled, files in the canonical repository will have read-only permissions. + // We need to first remove the file and re-create it. 
+ require.NoError(t, os.Remove(reftableFilePath)) require.NoError(t, os.WriteFile(reftableFilePath, reftableFileContent, os.ModePerm)) } else { commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch(git.DefaultBranch)) diff --git a/internal/gitaly/service/repository/restore_repository_test.go b/internal/gitaly/service/repository/restore_repository_test.go index 90faef013a..bbc4672ea6 100644 --- a/internal/gitaly/service/repository/restore_repository_test.go +++ b/internal/gitaly/service/repository/restore_repository_test.go @@ -16,6 +16,8 @@ import ( ) func TestRestoreRepository(t *testing.T) { + testhelper.SkipWithReftable(t, "This needs a fix on the restore package, which is being done in #6786") + t.Parallel() ctx := testhelper.Context(t) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go index 33cd2e53f3..6491b1cf95 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go @@ -6,7 +6,6 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" - "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" @@ -57,13 +56,6 @@ func testUnaryInterceptor(t *testing.T, ctx context.Context) { repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg) - repo := localrepo.NewTestRepo(t, cfg, repoProto) - backend, err := repo.ReferenceBackend(ctx) - require.NoError(t, err) - if backend == git.ReferenceBackendReftables { - return - } - gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), gittest.WithTreeEntries( gittest.TreeEntry{Mode: "100644", Path: "foo", Content: "bar"}, )) @@ -95,7 
+87,10 @@ func testUnaryInterceptor(t *testing.T, ctx context.Context) { require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, + gittest.FilesOrReftables( + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, + ), ), repoInfo.GetReferences().GetReferenceBackend(), ) @@ -136,13 +131,6 @@ func testStreamInterceptor(t *testing.T, ctx context.Context) { repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg) - repo := localrepo.NewTestRepo(t, cfg, repoProto) - backend, err := repo.ReferenceBackend(ctx) - require.NoError(t, err) - if backend == git.ReferenceBackendReftables { - return - } - gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), gittest.WithTreeEntries( gittest.TreeEntry{Mode: "100644", Path: "foo", Content: "bar"}, )) @@ -186,7 +174,10 @@ func testStreamInterceptor(t *testing.T, ctx context.Context) { require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, + gittest.FilesOrReftables( + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, + ), ), repoInfo.GetReferences().GetReferenceBackend(), ) -- GitLab From 773a70b483e7fb595141dac8f96c2f9a00823704 Mon Sep 17 00:00:00 2001 From: Karthik Nayak Date: Wed, 9 Jul 2025 16:39:41 +0200 Subject: [PATCH 2/4] reftable: Don't log errors when repository doesn't exist The Opportunistic reftable migrator logs any failed migration. The migrator is invoked for all ACCESSOR RPC requests. 
Some of these RPCs are invoked on repositories which don't exist or being created concurrently. So for any ErrRepositoryNotFound found, let's simply return without logging this as a failed migration. --- .../partition/migration/reftable/migrator.go | 5 + .../migration/reftable/migrator_test.go | 92 ++++++++++++++----- 2 files changed, 74 insertions(+), 23 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go index 0956d48cd9..8d0d88a8e6 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go @@ -159,6 +159,11 @@ func (m *migrator) Run() { } latency, err := m.migrate(ctx, data.storageName, data.relativePath) + // We shouldn't care about migration status for repositories which don't + // event exist. + if errors.Is(err, storage.ErrRepositoryNotFound) { + return + } state.attempts = state.attempts + 1 state.cancelCtx = nil diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go index 274fbea86a..ea7be55360 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go @@ -24,7 +24,8 @@ import ( ) type mockMigrationHandler struct { - ch <-chan struct{} + ch <-chan struct{} + err error } func (m *mockMigrationHandler) Migrate(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error { @@ -33,6 +34,10 @@ func (m *mockMigrationHandler) Migrate(ctx context.Context, tx storage.Transacti <-m.ch } + if m.err != nil { + return m.err + } + return nil } @@ -221,37 +226,80 @@ func TestMigrator(t *testing.T) { require.NoError(t, err) defer ptnMgr.Close() + type setupData 
struct { + run func(m *migrator, repo *gitalypb.Repository) + migrationHandler migrationHandler + repo *gitalypb.Repository + } + for _, tc := range []struct { desc string - setup func() (func(m *migrator, repo *gitalypb.Repository), migrationHandler) + setup func() setupData completed bool + attempts uint expectedLogMsg string }{ { desc: "cancelled migration", - setup: func() (func(m *migrator, repo *gitalypb.Repository), migrationHandler) { + setup: func() setupData { ch := make(chan struct{}) - return func(m *migrator, repo *gitalypb.Repository) { - ch <- struct{}{} - m.CancelMigration(cfg.Storages[0].Name, repo.GetRelativePath()) - ch <- struct{}{} - }, &mockMigrationHandler{ch: ch} + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + return setupData{ + run: func(m *migrator, repo *gitalypb.Repository) { + ch <- struct{}{} + m.CancelMigration(cfg.Storages[0].Name, repo.GetRelativePath()) + ch <- struct{}{} + }, + migrationHandler: &mockMigrationHandler{ch: ch}, + repo: repo, + } }, completed: false, + attempts: 1, expectedLogMsg: "migration failed for repository", }, + { + desc: "repository not found error", + setup: func() setupData { + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + return setupData{ + run: func(m *migrator, repo *gitalypb.Repository) {}, + migrationHandler: &mockMigrationHandler{err: storage.ErrRepositoryNotFound}, + repo: repo, + } + }, + // When we encounter a ErrRepositoryNotFound error, we simply + // skip the migration and don't mark it as completed or attempted. 
+ completed: false, + attempts: 0, + }, { desc: "successful migration", - setup: func() (func(m *migrator, repo *gitalypb.Repository), migrationHandler) { - return func(m *migrator, repo *gitalypb.Repository) {}, &mockMigrationHandler{} + setup: func() setupData { + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + return setupData{ + run: func(m *migrator, repo *gitalypb.Repository) {}, + migrationHandler: &mockMigrationHandler{}, + repo: repo, + } }, completed: true, + attempts: 1, expectedLogMsg: "migration successful for repository", }, } { t.Run(tc.desc, func(t *testing.T) { - run, migrationHandler := tc.setup() + data := tc.setup() parentCtx, parentCancel := context.WithCancel(context.Background()) m := &migrator{ @@ -261,44 +309,42 @@ func TestMigrator(t *testing.T) { metrics: metrics, node: ptnMgr, state: sync.Map{}, - migrationHandler: migrationHandler, + migrationHandler: data.migrationHandler, ctx: parentCtx, ctxCancel: parentCancel, } storageName := cfg.Storages[0].Name - repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - m.Run() defer m.Close() // It is not guaranteed that the migration is registered, so run it in a // loop until it is. for { - if _, ok := m.state.Load(migrationKey(storageName, repo.GetRelativePath())); ok { + if _, ok := m.state.Load(migrationKey(storageName, data.repo.GetRelativePath())); ok { break } - m.RegisterMigration(storageName, repo.GetRelativePath()) + m.RegisterMigration(storageName, data.repo.GetRelativePath()) } - run(m, repo) + data.run(m, data.repo) // Block till the old migration is complete. 
m.migrateCh <- migrationData{} - val, ok := m.state.Load(migrationKey(storageName, repo.GetRelativePath())) + val, ok := m.state.Load(migrationKey(storageName, data.repo.GetRelativePath())) state := val.(migratorState) require.True(t, ok) require.Equal(t, tc.completed, state.completed) - require.Equal(t, uint(1), state.attempts) + require.Equal(t, tc.attempts, state.attempts) - entries := hook.AllEntries() - require.Equal(t, tc.expectedLogMsg, entries[len(entries)-1].Message) + if tc.expectedLogMsg != "" { + entries := hook.AllEntries() + require.Equal(t, tc.expectedLogMsg, entries[len(entries)-1].Message) + } }) } } -- GitLab From ce9884666a67a6990ec932a707e8438d33cad411 Mon Sep 17 00:00:00 2001 From: Karthik Nayak Date: Thu, 10 Jul 2025 17:36:17 +0200 Subject: [PATCH 3/4] reftable: Register migrations at the end of MUTATOR RPCs The reftable migrator only registers a migration when an ACCESSOR RPC comes in, and registers a cancellation for all other types of RPCs. This is to ensure we don't create conflicts. However, this has a weak point: write-heavy repositories will keep registering cancellations whenever a write comes in, and if only writes are being made to the repository, a migration would never even be registered. By initiating a migration at the end of a write cycle, we can avoid conflicts while also ensuring that migrations are registered even if the repository only sees writes.
--- .../migration/reftable/middleware.go | 20 +- .../migration/reftable/middleware_test.go | 294 ++++++++++-------- 2 files changed, 187 insertions(+), 127 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go index 56e5bd1328..d9acbfb06b 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go @@ -39,9 +39,15 @@ func NewUnaryInterceptor(logger log.Logger, registry *protoregistry.Registry, re targetRepo = tx.OriginalRepository(targetRepo) - if methodInfo.Operation == protoregistry.OpAccessor { + switch methodInfo.Operation { + case protoregistry.OpAccessor: register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - } else { + case protoregistry.OpMutator: + defer register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) + fallthrough + default: + // Cancel any ongoing migrations to avoid conflicts + // but schedule one to start after we serve the request register.CancelMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) } } @@ -84,9 +90,15 @@ func NewStreamInterceptor(logger log.Logger, registry *protoregistry.Registry, r targetRepo = tx.OriginalRepository(targetRepo) - if methodInfo.Operation == protoregistry.OpAccessor { + switch methodInfo.Operation { + case protoregistry.OpAccessor: register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - } else { + case protoregistry.OpMutator: + defer register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) + fallthrough + default: + // Cancel any ongoing migrations to avoid conflicts + // but schedule one to start after we serve the request register.CancelMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) } diff --git 
a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go index 6491b1cf95..feaa08f019 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go @@ -6,11 +6,13 @@ import ( "github.com/stretchr/testify/require" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" + "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/commit" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/ref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" @@ -23,164 +25,210 @@ import ( "google.golang.org/grpc" ) -func TestUnaryInterceptor(t *testing.T) { - t.Parallel() - - testhelper.NewFeatureSets( - featureflag.ReftableMigration, - ).Run(t, testUnaryInterceptor) +type mockReftableMigrator struct { + registerCount int + cancelCount int + reftableMigrator *migrator } -func testUnaryInterceptor(t *testing.T, ctx context.Context) { - cfg := testcfg.Build(t) - - if !testhelper.IsWALEnabled() { - t.Skip("only works with the WAL") - } - - var reftableMigrator *migrator - callback := func(logger log.Logger, node storage.Node, factory localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) { - reftableMigrator = NewMigrator(logger, NewMetrics(), node, factory) - reftableMigrator.Run() - - return []grpc.UnaryServerInterceptor{NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, 
reftableMigrator)}, - []grpc.StreamServerInterceptor{NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator)} - } - - serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { - gitalypb.RegisterCommitServiceServer(srv, commit.NewServer(deps)) - gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(deps)) - gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps)) - }, testserver.WithTransactionInterceptors(callback)) - cfg.SocketPath = serverSocketPath - - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg) - - gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), gittest.WithTreeEntries( - gittest.TreeEntry{Mode: "100644", Path: "foo", Content: "bar"}, - )) - request := &gitalypb.FindCommitRequest{ - Repository: repoProto, - Revision: []byte("main"), - } - - conn, err := client.New(ctx, serverSocketPath) - require.NoError(t, err) - t.Cleanup(func() { conn.Close() }) - - client := gitalypb.NewCommitServiceClient(conn) - - md := testcfg.GitalyServersMetadataFromCfg(t, cfg) - ctx = testhelper.MergeOutgoingMetadata(ctx, md) - - _, err = client.FindCommit(ctx, request) - require.NoError(t, err) - - // Block to ensure the previous migration was successful. 
- reftableMigrator.migrateCh <- migrationData{} - - repoInfo, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ - Repository: repoProto, - }) - require.NoError(t, err) - - require.Equal(t, - testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, - gittest.FilesOrReftables( - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, - ), - ), - repoInfo.GetReferences().GetReferenceBackend(), - ) +func (m *mockReftableMigrator) RegisterMigration(storageName, relativePath string) { + m.registerCount++ + m.reftableMigrator.RegisterMigration(storageName, relativePath) +} - reftableMigrator.Close() +func (m *mockReftableMigrator) CancelMigration(storageName, relativePath string) { + m.cancelCount++ + m.reftableMigrator.CancelMigration(storageName, relativePath) } -func TestStreamInterceptor(t *testing.T) { +func TestInterceptor(t *testing.T) { t.Parallel() testhelper.NewFeatureSets( featureflag.ReftableMigration, - ).Run(t, testStreamInterceptor) + ).Run(t, testInterceptor) } -func testStreamInterceptor(t *testing.T, ctx context.Context) { +func testInterceptor(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t) if !testhelper.IsWALEnabled() { t.Skip("only works with the WAL") } + mockMigrator := mockReftableMigrator{} var reftableMigrator *migrator callback := func(logger log.Logger, node storage.Node, factory localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) { reftableMigrator = NewMigrator(logger, NewMetrics(), node, factory) reftableMigrator.Run() - return []grpc.UnaryServerInterceptor{NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator)}, - []grpc.StreamServerInterceptor{NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator)} + 
mockMigrator.reftableMigrator = reftableMigrator + + return []grpc.UnaryServerInterceptor{NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, &mockMigrator)}, + []grpc.StreamServerInterceptor{NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, &mockMigrator)} } serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { gitalypb.RegisterCommitServiceServer(srv, commit.NewServer(deps)) + gitalypb.RegisterRefServiceServer(srv, ref.NewServer(deps)) gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(deps)) gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps)) }, testserver.WithTransactionInterceptors(callback)) cfg.SocketPath = serverSocketPath - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg) - - gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), gittest.WithTreeEntries( - gittest.TreeEntry{Mode: "100644", Path: "foo", Content: "bar"}, - )) - request := &gitalypb.FindCommitsRequest{ - Repository: repoProto, - Revision: []byte("main"), - } - conn, err := client.New(ctx, serverSocketPath) require.NoError(t, err) t.Cleanup(func() { conn.Close() }) - client := gitalypb.NewCommitServiceClient(conn) - - md := testcfg.GitalyServersMetadataFromCfg(t, cfg) - ctx = testhelper.MergeOutgoingMetadata(ctx, md) - - stream, err := client.FindCommits(ctx, request) - require.NoError(t, err) - - _, err = testhelper.ReceiveAndFold(stream.Recv, func( - result []*gitalypb.GitCommit, - response *gitalypb.FindCommitsResponse, - ) []*gitalypb.GitCommit { - if response == nil { - return result - } - - return append(result, response.GetCommits()...) - }) - require.NoError(t, err) - - // Block to ensure the previous migration was successful. 
- reftableMigrator.migrateCh <- migrationData{} - - repoInfo, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ - Repository: repoProto, - }) - require.NoError(t, err) - - require.Equal(t, - testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, - gittest.FilesOrReftables( - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, - ), - ), - repoInfo.GetReferences().GetReferenceBackend(), - ) + for _, tc := range []struct { + desc string + setup func(repoProto *gitalypb.Repository, commitID git.ObjectID) + expectedRegistrations int + expectedCancellations int + }{ + { + desc: "unary accessor", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + request := &gitalypb.FindCommitRequest{ + Repository: repoProto, + Revision: []byte("main"), + } + + client := gitalypb.NewCommitServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + + _, err := client.FindCommit(ctx, request) + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + }, + { + desc: "unary mutator", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + client := gitalypb.NewRefServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + + _, err := client.DeleteRefs(ctx, &gitalypb.DeleteRefsRequest{ + Repository: repoProto, + Refs: [][]byte{ + []byte("refs/heads/main"), + }, + }) + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), + 
expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 3, 0), + }, + { + desc: "stream accessor", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + client := gitalypb.NewRefServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + + stream, err := client.FindAllBranches(ctx, &gitalypb.FindAllBranchesRequest{ + Repository: repoProto, + }) + require.NoError(t, err) + + _, err = testhelper.ReceiveAndFold(stream.Recv, func( + result []*gitalypb.FindAllBranchesResponse_Branch, + response *gitalypb.FindAllBranchesResponse, + ) []*gitalypb.FindAllBranchesResponse_Branch { + if response == nil { + return result + } + + return append(result, response.GetBranches()...) + }) + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + }, + { + desc: "stream mutator", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + client := gitalypb.NewRefServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + + updater, err := client.UpdateReferences(ctx) + require.NoError(t, err) + + err = updater.Send(&gitalypb.UpdateReferencesRequest{ + Repository: repoProto, + Updates: []*gitalypb.UpdateReferencesRequest_Update{ + { + Reference: []byte("refs/heads/fun"), + OldObjectId: []byte(gittest.DefaultObjectHash.ZeroOID), + NewObjectId: []byte(commitID), + }, + }, + }) + require.NoError(t, err) + + _, err = updater.CloseAndRecv() + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 3, 0), + }, + } { + t.Run(tc.desc, func(t *testing.T) { 
+ // Reset the mock migrator count for each test + mockMigrator.registerCount, mockMigrator.cancelCount = 0, 0 + + // To avoid migration being triggered by CreateRepository, we trick into + // believing that the repository has already completed a migraiton. + relativePath := gittest.NewRepositoryName(t) + key := migrationKey(cfg.Storages[0].Name, relativePath) + reftableMigrator.state.Store(key, migratorState{completed: true}) + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + RelativePath: relativePath, + }) + + reftableMigrator.state.Delete(key) + + commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), + gittest.WithTreeEntries( + gittest.TreeEntry{Mode: "100644", Path: "foo", Content: "bar"}, + ), + ) + + tc.setup(repoProto, commitID) + + // Block to ensure the previous migration was successful. + reftableMigrator.migrateCh <- migrationData{} + + repoInfo, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: repoProto, + }) + require.NoError(t, err) + + require.Equal(t, + testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, + gittest.FilesOrReftables( + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, + ), + ), + repoInfo.GetReferences().GetReferenceBackend(), + ) + + require.Equal(t, tc.expectedRegistrations, mockMigrator.registerCount) + require.Equal(t, tc.expectedCancellations, mockMigrator.cancelCount) + }) + } reftableMigrator.Close() } -- GitLab From 9d11420ecf992f5296e5dd3f98135a13165a08f7 Mon Sep 17 00:00:00 2001 From: Karthik Nayak Date: Fri, 11 Jul 2025 23:53:39 +0200 Subject: [PATCH 4/4] reftable: Fix bug where cancellation func wasn't set The reftable migrator stores a cancellation func in the current state, so any incoming 
writes to the repository can trigger a cancellation which would stop the migration to avoid conflicts. However, there is a small bug here where if there was already existing state, we don't store the cancellation. This is because we use a `LoadOrStore`, but don't store if the Load branch is triggered. Fix this by explicitly storing in such a situation. Add a test for the same. --- .../partition/migration/reftable/migrator.go | 17 +++++++--- .../migration/reftable/migrator_test.go | 34 +++++++++++++++++-- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go index 8d0d88a8e6..33c586dac6 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go @@ -142,16 +142,23 @@ func (m *migrator) Run() { ctx, cancel := context.WithCancel(m.ctx) - val, _ := m.state.LoadOrStore( - migrationKey(data.storageName, data.relativePath), - migratorState{cancelCtx: cancel}, - ) + key := migrationKey(data.storageName, data.relativePath) + + val, ok := m.state.LoadOrStore(key, migratorState{cancelCtx: cancel}) state := val.(migratorState) + + // If the state was present, we still need to store our + // cancellation function. + if ok { + state.cancelCtx = cancel + m.state.Store(key, state) + } + // We don't do 'defer m.state.Store(...)' here, because that would // fix the state as is here. 
We want to delay the evaluvation of the // state defer func() { - m.state.Store(migrationKey(data.storageName, data.relativePath), state) + m.state.Store(key, state) }() if state.completed || state.coolDown.After(time.Now()) { diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go index ea7be55360..f6dfb62e3a 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go @@ -230,6 +230,7 @@ func TestMigrator(t *testing.T) { run func(m *migrator, repo *gitalypb.Repository) migrationHandler migrationHandler repo *gitalypb.Repository + startState *migratorState } for _, tc := range []struct { @@ -262,6 +263,30 @@ func TestMigrator(t *testing.T) { attempts: 1, expectedLogMsg: "migration failed for repository", }, + { + desc: "existing state, cancelled migration", + setup: func() setupData { + ch := make(chan struct{}) + + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + return setupData{ + run: func(m *migrator, repo *gitalypb.Repository) { + ch <- struct{}{} + m.CancelMigration(cfg.Storages[0].Name, repo.GetRelativePath()) + ch <- struct{}{} + }, + migrationHandler: &mockMigrationHandler{ch: ch}, + repo: repo, + startState: &migratorState{attempts: 3}, + } + }, + completed: false, + attempts: 4, + expectedLogMsg: "migration failed for repository", + }, { desc: "repository not found error", setup: func() setupData { @@ -315,6 +340,9 @@ func TestMigrator(t *testing.T) { } storageName := cfg.Storages[0].Name + if data.startState != nil { + m.state.Store(migrationKey(storageName, data.repo.GetRelativePath()), *data.startState) + } m.Run() defer m.Close() @@ -322,8 +350,10 @@ func TestMigrator(t *testing.T) { // It is not guaranteed that the migration is registered, 
so run it in a // loop until it is. for { - if _, ok := m.state.Load(migrationKey(storageName, data.repo.GetRelativePath())); ok { - break + if val, ok := m.state.Load(migrationKey(storageName, data.repo.GetRelativePath())); ok { + if val.(migratorState).cancelCtx != nil { + break + } } m.RegisterMigration(storageName, data.repo.GetRelativePath()) -- GitLab