diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 3924d11369575d23d5618a08be4ee1aabe143bff..49a6f6ae688a70b51c778f75bd4a372130dfaec7 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -315,9 +315,10 @@ test:pgbouncer: test:reftable: <<: *test_definition - variables: - <<: *test_variables - TEST_TARGET: test-with-reftable + parallel: + matrix: + - TEST_TARGET: [test, test-wal, test-raft, test-with-praefect-wal] + GITALY_TEST_REF_FORMAT: "reftable" test:nightly: <<: *test_definition diff --git a/internal/gitaly/service/repository/calculate_checksum_test.go b/internal/gitaly/service/repository/calculate_checksum_test.go index 9505687f3b94e4a2e770a7c54955e99f7895a2fa..b2b850b142228fc59d9174a2941bcd23af0f943d 100644 --- a/internal/gitaly/service/repository/calculate_checksum_test.go +++ b/internal/gitaly/service/repository/calculate_checksum_test.go @@ -190,6 +190,9 @@ func TestCalculateChecksum(t *testing.T) { reftableFileContent[i+headRefIdx] = nope[i] } + // When the WAL is enabled, files in the canonical repository will have read-only permissions. + // We need to first remove the file and re-create it. + require.NoError(t, os.Remove(reftableFilePath)) require.NoError(t, os.WriteFile(reftableFilePath, reftableFileContent, os.ModePerm)) } else { commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch(git.DefaultBranch)) diff --git a/internal/gitaly/service/repository/restore_repository_test.go b/internal/gitaly/service/repository/restore_repository_test.go index 90faef013ab7bb427f27bbf4858e7d42e0ad86d5..bbc4672ea6f106a2053d6dff24e6beca7867d430 100644 --- a/internal/gitaly/service/repository/restore_repository_test.go +++ b/internal/gitaly/service/repository/restore_repository_test.go @@ -16,6 +16,8 @@ import ( ) func TestRestoreRepository(t *testing.T) { + testhelper.SkipWithReftable(t, "This needs a fix on the restore package, which is being done in #6786") + t.Parallel() ctx := testhelper.Context(t) diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go index 56e5bd1328dab3558285be50da313a98dc273e83..d9acbfb06b458a677b7a078f6c1073feff5a7537 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware.go @@ -39,9 +39,15 @@ func NewUnaryInterceptor(logger log.Logger, registry *protoregistry.Registry, re targetRepo = tx.OriginalRepository(targetRepo) - if methodInfo.Operation == protoregistry.OpAccessor { + switch methodInfo.Operation { + case protoregistry.OpAccessor: register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - } else { + case protoregistry.OpMutator: + defer register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) + fallthrough + default: + // Cancel any ongoing migrations to avoid conflicts + // but schedule one to start after we serve the request register.CancelMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) } } @@ -84,9 +90,15 @@ func NewStreamInterceptor(logger log.Logger, registry *protoregistry.Registry, r targetRepo = tx.OriginalRepository(targetRepo) - if methodInfo.Operation == protoregistry.OpAccessor { + switch methodInfo.Operation { + case protoregistry.OpAccessor: register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) - } else { + case protoregistry.OpMutator: + defer register.RegisterMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) + fallthrough + default: + // Cancel any ongoing migrations to avoid conflicts + // but schedule one to start after we serve the request register.CancelMigration(targetRepo.GetStorageName(), targetRepo.GetRelativePath()) } diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go index 33cd2e53f363e0f166907a941941b300d49bab2d..feaa08f01935cb5eb82788b546c7a168481c17ed 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/middleware_test.go @@ -12,6 +12,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/commit" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/hook" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/ref" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" @@ -24,172 +25,210 @@ import ( "google.golang.org/grpc" ) -func TestUnaryInterceptor(t *testing.T) { - t.Parallel() - - testhelper.NewFeatureSets( - featureflag.ReftableMigration, - ).Run(t, testUnaryInterceptor) +type mockReftableMigrator struct { + registerCount int + cancelCount int + reftableMigrator *migrator } -func testUnaryInterceptor(t *testing.T, ctx context.Context) { - cfg := testcfg.Build(t) - - if !testhelper.IsWALEnabled() { - t.Skip("only works with the WAL") - } - - var reftableMigrator *migrator - callback := func(logger log.Logger, node storage.Node, factory localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) { - reftableMigrator = NewMigrator(logger, NewMetrics(), node, factory) - reftableMigrator.Run() - - return []grpc.UnaryServerInterceptor{NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator)}, - []grpc.StreamServerInterceptor{NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator)} - } - - serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { - gitalypb.RegisterCommitServiceServer(srv, commit.NewServer(deps)) - gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(deps)) - gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps)) - }, testserver.WithTransactionInterceptors(callback)) - cfg.SocketPath = serverSocketPath - - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg) - - repo := localrepo.NewTestRepo(t, cfg, repoProto) - backend, err := repo.ReferenceBackend(ctx) - require.NoError(t, err) - if backend == git.ReferenceBackendReftables { - return - } - - gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), gittest.WithTreeEntries( - gittest.TreeEntry{Mode: "100644", Path: "foo", Content: "bar"}, - )) - request := &gitalypb.FindCommitRequest{ - Repository: repoProto, - Revision: []byte("main"), - } - - conn, err := client.New(ctx, serverSocketPath) - require.NoError(t, err) - t.Cleanup(func() { conn.Close() }) - - client := gitalypb.NewCommitServiceClient(conn) - - md := testcfg.GitalyServersMetadataFromCfg(t, cfg) - ctx = testhelper.MergeOutgoingMetadata(ctx, md) - - _, err = client.FindCommit(ctx, request) - require.NoError(t, err) - - // Block to ensure the previous migration was successful. - reftableMigrator.migrateCh <- migrationData{} - - repoInfo, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ - Repository: repoProto, - }) - require.NoError(t, err) - - require.Equal(t, - testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, - ), - repoInfo.GetReferences().GetReferenceBackend(), - ) +func (m *mockReftableMigrator) RegisterMigration(storageName, relativePath string) { + m.registerCount++ + m.reftableMigrator.RegisterMigration(storageName, relativePath) +} - reftableMigrator.Close() +func (m *mockReftableMigrator) CancelMigration(storageName, relativePath string) { + m.cancelCount++ + m.reftableMigrator.CancelMigration(storageName, relativePath) } -func TestStreamInterceptor(t *testing.T) { +func TestInterceptor(t *testing.T) { t.Parallel() testhelper.NewFeatureSets( featureflag.ReftableMigration, - ).Run(t, testStreamInterceptor) + ).Run(t, testInterceptor) } -func testStreamInterceptor(t *testing.T, ctx context.Context) { +func testInterceptor(t *testing.T, ctx context.Context) { cfg := testcfg.Build(t) if !testhelper.IsWALEnabled() { t.Skip("only works with the WAL") } + mockMigrator := mockReftableMigrator{} var reftableMigrator *migrator callback := func(logger log.Logger, node storage.Node, factory localrepo.Factory) ([]grpc.UnaryServerInterceptor, []grpc.StreamServerInterceptor) { reftableMigrator = NewMigrator(logger, NewMetrics(), node, factory) reftableMigrator.Run() - return []grpc.UnaryServerInterceptor{NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator)}, - []grpc.StreamServerInterceptor{NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, reftableMigrator)} + mockMigrator.reftableMigrator = reftableMigrator + + return []grpc.UnaryServerInterceptor{NewUnaryInterceptor(logger, protoregistry.GitalyProtoPreregistered, &mockMigrator)}, + []grpc.StreamServerInterceptor{NewStreamInterceptor(logger, protoregistry.GitalyProtoPreregistered, &mockMigrator)} } serverSocketPath := testserver.RunGitalyServer(t, cfg, func(srv *grpc.Server, deps *service.Dependencies) { gitalypb.RegisterCommitServiceServer(srv, commit.NewServer(deps)) + gitalypb.RegisterRefServiceServer(srv, ref.NewServer(deps)) gitalypb.RegisterRepositoryServiceServer(srv, repository.NewServer(deps)) gitalypb.RegisterHookServiceServer(srv, hook.NewServer(deps)) }, testserver.WithTransactionInterceptors(callback)) cfg.SocketPath = serverSocketPath - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg) - - repo := localrepo.NewTestRepo(t, cfg, repoProto) - backend, err := repo.ReferenceBackend(ctx) - require.NoError(t, err) - if backend == git.ReferenceBackendReftables { - return - } - - gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), gittest.WithTreeEntries( - gittest.TreeEntry{Mode: "100644", Path: "foo", Content: "bar"}, - )) - request := &gitalypb.FindCommitsRequest{ - Repository: repoProto, - Revision: []byte("main"), - } - conn, err := client.New(ctx, serverSocketPath) require.NoError(t, err) t.Cleanup(func() { conn.Close() }) - client := gitalypb.NewCommitServiceClient(conn) - - md := testcfg.GitalyServersMetadataFromCfg(t, cfg) - ctx = testhelper.MergeOutgoingMetadata(ctx, md) - - stream, err := client.FindCommits(ctx, request) - require.NoError(t, err) - - _, err = testhelper.ReceiveAndFold(stream.Recv, func( - result []*gitalypb.GitCommit, - response *gitalypb.FindCommitsResponse, - ) []*gitalypb.GitCommit { - if response == nil { - return result - } - - return append(result, response.GetCommits()...) - }) - require.NoError(t, err) - - // Block to ensure the previous migration was successful. - reftableMigrator.migrateCh <- migrationData{} - - repoInfo, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ - Repository: repoProto, - }) - require.NoError(t, err) - - require.Equal(t, - testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, - gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, - ), - repoInfo.GetReferences().GetReferenceBackend(), - ) + for _, tc := range []struct { + desc string + setup func(repoProto *gitalypb.Repository, commitID git.ObjectID) + expectedRegistrations int + expectedCancellations int + }{ + { + desc: "unary accessor", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + request := &gitalypb.FindCommitRequest{ + Repository: repoProto, + Revision: []byte("main"), + } + + client := gitalypb.NewCommitServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + + _, err := client.FindCommit(ctx, request) + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + }, + { + desc: "unary mutator", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + client := gitalypb.NewRefServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + + _, err := client.DeleteRefs(ctx, &gitalypb.DeleteRefsRequest{ + Repository: repoProto, + Refs: [][]byte{ + []byte("refs/heads/main"), + }, + }) + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 3, 0), + }, + { + desc: "stream accessor", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + client := gitalypb.NewRefServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + + stream, err := client.FindAllBranches(ctx, &gitalypb.FindAllBranchesRequest{ + Repository: repoProto, + }) + require.NoError(t, err) + + _, err = testhelper.ReceiveAndFold(stream.Recv, func( + result []*gitalypb.FindAllBranchesResponse_Branch, + response *gitalypb.FindAllBranchesResponse, + ) []*gitalypb.FindAllBranchesResponse_Branch { + if response == nil { + return result + } + + return append(result, response.GetBranches()...) + }) + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 2, 0), + }, + { + desc: "stream mutator", + setup: func(repoProto *gitalypb.Repository, commitID git.ObjectID) { + client := gitalypb.NewRefServiceClient(conn) + + md := testcfg.GitalyServersMetadataFromCfg(t, cfg) + ctx = testhelper.MergeOutgoingMetadata(ctx, md) + + updater, err := client.UpdateReferences(ctx) + require.NoError(t, err) + + err = updater.Send(&gitalypb.UpdateReferencesRequest{ + Repository: repoProto, + Updates: []*gitalypb.UpdateReferencesRequest_Update{ + { + Reference: []byte("refs/heads/fun"), + OldObjectId: []byte(gittest.DefaultObjectHash.ZeroOID), + NewObjectId: []byte(commitID), + }, + }, + }) + require.NoError(t, err) + + _, err = updater.CloseAndRecv() + require.NoError(t, err) + }, + expectedRegistrations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 4, 0), + expectedCancellations: testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, 3, 0), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + // Reset the mock migrator count for each test + mockMigrator.registerCount, mockMigrator.cancelCount = 0, 0 + + // To avoid migration being triggered by CreateRepository, we trick into + // believing that the repository has already completed a migraiton. + relativePath := gittest.NewRepositoryName(t) + key := migrationKey(cfg.Storages[0].Name, relativePath) + reftableMigrator.state.Store(key, migratorState{completed: true}) + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + RelativePath: relativePath, + }) + + reftableMigrator.state.Delete(key) + + commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main"), + gittest.WithTreeEntries( + gittest.TreeEntry{Mode: "100644", Path: "foo", Content: "bar"}, + ), + ) + + tc.setup(repoProto, commitID) + + // Block to ensure the previous migration was successful. + reftableMigrator.migrateCh <- migrationData{} + + repoInfo, err := gitalypb.NewRepositoryServiceClient(conn).RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: repoProto, + }) + require.NoError(t, err) + + require.Equal(t, + testhelper.EnabledOrDisabledFlag(ctx, featureflag.ReftableMigration, + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, + gittest.FilesOrReftables( + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_FILES, + gitalypb.RepositoryInfoResponse_ReferencesInfo_REFERENCE_BACKEND_REFTABLE, + ), + ), + repoInfo.GetReferences().GetReferenceBackend(), + ) + + require.Equal(t, tc.expectedRegistrations, mockMigrator.registerCount) + require.Equal(t, tc.expectedCancellations, mockMigrator.cancelCount) + }) + } reftableMigrator.Close() } diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go index 0956d48cd97ae2c154cbf789bcfc23cf10ce79fd..33c586dac62f8119773716c6749c7d8d432d7478 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator.go @@ -142,16 +142,23 @@ func (m *migrator) Run() { ctx, cancel := context.WithCancel(m.ctx) - val, _ := m.state.LoadOrStore( - migrationKey(data.storageName, data.relativePath), - migratorState{cancelCtx: cancel}, - ) + key := migrationKey(data.storageName, data.relativePath) + + val, ok := m.state.LoadOrStore(key, migratorState{cancelCtx: cancel}) state := val.(migratorState) + + // If the state was present, we still need to store our + // cancellation function. + if ok { + state.cancelCtx = cancel + m.state.Store(key, state) + } + // We don't do 'defer m.state.Store(...)' here, because that would // fix the state as is here. We want to delay the evaluvation of the // state defer func() { - m.state.Store(migrationKey(data.storageName, data.relativePath), state) + m.state.Store(key, state) }() if state.completed || state.coolDown.After(time.Now()) { @@ -159,6 +166,11 @@ func (m *migrator) Run() { } latency, err := m.migrate(ctx, data.storageName, data.relativePath) + // We shouldn't care about migration status for repositories which don't + // event exist. + if errors.Is(err, storage.ErrRepositoryNotFound) { + return + } state.attempts = state.attempts + 1 state.cancelCtx = nil diff --git a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go index 274fbea86ae8ed1c029750655270d02f94b0a6bf..f6dfb62e3a61e3d70f2a8eb1a97b6d3da481940a 100644 --- a/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go +++ b/internal/gitaly/storage/storagemgr/partition/migration/reftable/migrator_test.go @@ -24,7 +24,8 @@ import ( ) type mockMigrationHandler struct { - ch <-chan struct{} + ch <-chan struct{} + err error } func (m *mockMigrationHandler) Migrate(ctx context.Context, tx storage.Transaction, storageName string, relativePath string) error { @@ -33,6 +34,10 @@ func (m *mockMigrationHandler) Migrate(ctx context.Context, tx storage.Transacti <-m.ch } + if m.err != nil { + return m.err + } + return nil } @@ -221,37 +226,105 @@ func TestMigrator(t *testing.T) { require.NoError(t, err) defer ptnMgr.Close() + type setupData struct { + run func(m *migrator, repo *gitalypb.Repository) + migrationHandler migrationHandler + repo *gitalypb.Repository + startState *migratorState + } + for _, tc := range []struct { desc string - setup func() (func(m *migrator, repo *gitalypb.Repository), migrationHandler) + setup func() setupData completed bool + attempts uint expectedLogMsg string }{ { desc: "cancelled migration", - setup: func() (func(m *migrator, repo *gitalypb.Repository), migrationHandler) { + setup: func() setupData { + ch := make(chan struct{}) + + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + return setupData{ + run: func(m *migrator, repo *gitalypb.Repository) { + ch <- struct{}{} + m.CancelMigration(cfg.Storages[0].Name, repo.GetRelativePath()) + ch <- struct{}{} + }, + migrationHandler: &mockMigrationHandler{ch: ch}, + repo: repo, + } + }, + completed: false, + attempts: 1, + expectedLogMsg: "migration failed for repository", + }, + { + desc: "existing state, cancelled migration", + setup: func() setupData { ch := make(chan struct{}) - return func(m *migrator, repo *gitalypb.Repository) { - ch <- struct{}{} - m.CancelMigration(cfg.Storages[0].Name, repo.GetRelativePath()) - ch <- struct{}{} - }, &mockMigrationHandler{ch: ch} + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + return setupData{ + run: func(m *migrator, repo *gitalypb.Repository) { + ch <- struct{}{} + m.CancelMigration(cfg.Storages[0].Name, repo.GetRelativePath()) + ch <- struct{}{} + }, + migrationHandler: &mockMigrationHandler{ch: ch}, + repo: repo, + startState: &migratorState{attempts: 3}, + } }, completed: false, + attempts: 4, expectedLogMsg: "migration failed for repository", }, + { + desc: "repository not found error", + setup: func() setupData { + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + return setupData{ + run: func(m *migrator, repo *gitalypb.Repository) {}, + migrationHandler: &mockMigrationHandler{err: storage.ErrRepositoryNotFound}, + repo: repo, + } + }, + // When we encounter a ErrRepositoryNotFound error, we simply + // skip the migration and don't mark it as completed or attempted. + completed: false, + attempts: 0, + }, { desc: "successful migration", - setup: func() (func(m *migrator, repo *gitalypb.Repository), migrationHandler) { - return func(m *migrator, repo *gitalypb.Repository) {}, &mockMigrationHandler{} + setup: func() setupData { + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + + return setupData{ + run: func(m *migrator, repo *gitalypb.Repository) {}, + migrationHandler: &mockMigrationHandler{}, + repo: repo, + } }, completed: true, + attempts: 1, expectedLogMsg: "migration successful for repository", }, } { t.Run(tc.desc, func(t *testing.T) { - run, migrationHandler := tc.setup() + data := tc.setup() parentCtx, parentCancel := context.WithCancel(context.Background()) m := &migrator{ @@ -261,16 +334,15 @@ func TestMigrator(t *testing.T) { metrics: metrics, node: ptnMgr, state: sync.Map{}, - migrationHandler: migrationHandler, + migrationHandler: data.migrationHandler, ctx: parentCtx, ctxCancel: parentCancel, } storageName := cfg.Storages[0].Name - - repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) + if data.startState != nil { + m.state.Store(migrationKey(storageName, data.repo.GetRelativePath()), *data.startState) + } m.Run() defer m.Close() @@ -278,27 +350,31 @@ func TestMigrator(t *testing.T) { // It is not guaranteed that the migration is registered, so run it in a // loop until it is. for { - if _, ok := m.state.Load(migrationKey(storageName, repo.GetRelativePath())); ok { - break + if val, ok := m.state.Load(migrationKey(storageName, data.repo.GetRelativePath())); ok { + if val.(migratorState).cancelCtx != nil { + break + } } - m.RegisterMigration(storageName, repo.GetRelativePath()) + m.RegisterMigration(storageName, data.repo.GetRelativePath()) } - run(m, repo) + data.run(m, data.repo) // Block till the old migration is complete. m.migrateCh <- migrationData{} - val, ok := m.state.Load(migrationKey(storageName, repo.GetRelativePath())) + val, ok := m.state.Load(migrationKey(storageName, data.repo.GetRelativePath())) state := val.(migratorState) require.True(t, ok) require.Equal(t, tc.completed, state.completed) - require.Equal(t, uint(1), state.attempts) + require.Equal(t, tc.attempts, state.attempts) - entries := hook.AllEntries() - require.Equal(t, tc.expectedLogMsg, entries[len(entries)-1].Message) + if tc.expectedLogMsg != "" { + entries := hook.AllEntries() + require.Equal(t, tc.expectedLogMsg, entries[len(entries)-1].Message) + } }) } }