diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 3eb91de9196d8ecc16bc951de925f38b4d8545a7..7769bab78dd2db5501187df34f827e1d94435a76 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -188,7 +188,7 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors - processedRepos map[string]map[*gitalypb.Repository]struct{} + processedRepos map[string]map[repositoryKey]struct{} processedReposMu sync.Mutex } @@ -204,7 +204,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, - processedRepos: make(map[string]map[*gitalypb.Repository]struct{}), + processedRepos: make(map[string]map[repositoryKey]struct{}), } for _, opt := range opts { @@ -262,7 +262,7 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() (processedRepos map[string]map[*gitalypb.Repository]struct{}, err error) { +func (p *Pipeline) Done() (processedRepos map[string]map[repositoryKey]struct{}, err error) { close(p.done) p.workerWg.Wait() @@ -338,9 +338,9 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { storageName := cmd.Repository().StorageName p.processedReposMu.Lock() if _, ok := p.processedRepos[storageName]; !ok { - p.processedRepos[storageName] = make(map[*gitalypb.Repository]struct{}) + p.processedRepos[storageName] = make(map[repositoryKey]struct{}) } - p.processedRepos[storageName][cmd.Repository()] = struct{}{} + p.processedRepos[storageName][NewRepositoryKey(cmd.Repository())] = struct{}{} p.processedReposMu.Unlock() log.Info(fmt.Sprintf("completed %s", cmd.Name())) @@ -381,3 +381,10 @@ func (p *Pipeline) releaseWorkerSlot() { } <-p.totalWorkers } + +type repositoryKey string + +// NewRepositoryKey returns a unique identifier for the provided repo. +func NewRepositoryKey(repo *gitalypb.Repository) repositoryKey { + return repositoryKey(repo.StorageName + "-" + repo.RelativePath) +} diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 37f7c2e5b2befcd8e43fe0002f1cbae950c0312c..9d05eae7434ec16fc5ec257d9c4ea358e9eab8d0 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -323,16 +323,23 @@ func TestPipelineError(t *testing.T) { func TestPipelineProcessedRepos(t *testing.T) { strategy := MockStrategy{} - repos := map[string]map[*gitalypb.Repository]struct{}{ + repos := []*gitalypb.Repository{ + {RelativePath: "a.git", StorageName: "storage1"}, + {RelativePath: "b.git", StorageName: "storage1"}, + {RelativePath: "c.git", StorageName: "storage2"}, + {RelativePath: "d.git", StorageName: "storage3"}, + } + + expectedProcessedRepos := map[string]map[repositoryKey]struct{}{ "storage1": { - &gitalypb.Repository{RelativePath: "a.git", StorageName: "storage1"}: struct{}{}, - &gitalypb.Repository{RelativePath: "b.git", StorageName: "storage1"}: struct{}{}, + "storage1-a.git": {}, + "storage1-b.git": {}, }, "storage2": { - &gitalypb.Repository{RelativePath: "c.git", StorageName: "storage2"}: struct{}{}, + "storage2-c.git": {}, }, "storage3": { - &gitalypb.Repository{RelativePath: "d.git", StorageName: "storage3"}: struct{}{}, + "storage3-d.git": {}, }, } @@ -340,13 +347,11 @@ func TestPipelineProcessedRepos(t *testing.T) { require.NoError(t, err) ctx := testhelper.Context(t) - for _, v := range repos { - for repo := range v { - p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) - } + for _, repo := range repos { + p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) } processedRepos, err := p.Done() require.NoError(t, err) - require.EqualValues(t, repos, processedRepos) + require.EqualValues(t, expectedProcessedRepos, processedRepos) } diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index defffa2231dc3ee4912a429f97569180a3dad26a..7a3833f80b9c379fdf5f05443a783b83f5156f5a 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -189,7 +189,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin var removalErrors []error for storageName, repos := range existingRepos { for _, repo := range repos { - if dangling := restoredRepos[storageName][repo]; dangling == struct{}{} { + if _, ok := restoredRepos[storageName][backup.NewRepositoryKey(repo)]; !ok { // If we have dangling repos (those which exist in the storage but // weren't part of the restore), they need to be deleted so the // state of repos in Gitaly matches that in the Rails DB. diff --git a/internal/cli/gitalybackup/restore_test.go b/internal/cli/gitalybackup/restore_test.go index 6d235525c4031b613dc699cf35f22541e20e8723..4b1eee9aa09c6751375ce8499119f00f66b8362e 100644 --- a/internal/cli/gitalybackup/restore_test.go +++ b/internal/cli/gitalybackup/restore_test.go @@ -26,13 +26,6 @@ import ( func TestRestoreSubcommand(t *testing.T) { gittest.SkipWithSHA256(t) - testhelper.SkipWithWAL(t, ` -RemoveAll is removing the entire content of the storage. This would also remove the database's and -the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and -the database and only then perform the removal. - -Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) - ctx := testhelper.Context(t) cfg := testcfg.Build(t) @@ -40,24 +33,32 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + // This is an example of a "dangling" repository (one that was created after a backup was taken) that should be + // removed after the backup is restored. existingRepo, existRepoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ RelativePath: "existing_repo", }) gittest.WriteCommit(t, cfg, existRepoPath, gittest.WithBranch(git.DefaultBranch)) - path := testhelper.TempDir(t) - existingRepoBundlePath := filepath.Join(path, existingRepo.RelativePath+".bundle") - existingRepoRefPath := filepath.Join(path, existingRepo.RelativePath+".refs") + // The backupDir contains the artifacts that would've been created as part of a backup. + backupDir := testhelper.TempDir(t) + existingRepoBundlePath := filepath.Join(backupDir, existingRepo.RelativePath+".bundle") + existingRepoRefPath := filepath.Join(backupDir, existingRepo.RelativePath+".refs") gittest.Exec(t, cfg, "-C", existRepoPath, "bundle", "create", existingRepoBundlePath, "--all") require.NoError(t, os.WriteFile(existingRepoRefPath, gittest.Exec(t, cfg, "-C", existRepoPath, "show-ref"), perm.SharedFile)) + // These repos are the ones being restored, and should exist after the restore. var repos []*gitalypb.Repository for i := 0; i < 2; i++ { - repo := gittest.InitRepoDir(t, cfg.Storages[0].Path, fmt.Sprintf("repo-%d", i)) - repoBundlePath := filepath.Join(path, repo.RelativePath+".bundle") - repoRefPath := filepath.Join(path, repo.RelativePath+".refs") + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + RelativePath: fmt.Sprintf("repo-%d", i), + Storage: cfg.Storages[0], + }) + + repoBundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") testhelper.CopyFile(t, existingRepoBundlePath, repoBundlePath) + repoRefPath := filepath.Join(backupDir, repo.RelativePath+".refs") testhelper.CopyFile(t, existingRepoRefPath, repoRefPath) repos = append(repos, repo) } @@ -81,7 +82,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) progname, "restore", "--path", - path, + backupDir, "--parallel", strconv.Itoa(runtime.NumCPU()), "--parallel-storage", @@ -101,9 +102,10 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) require.NoDirExists(t, existRepoPath) + // Ensure the repos were restored correctly. for _, repo := range repos { repoPath := filepath.Join(cfg.Storages[0].Path, gittest.GetReplicaPath(t, ctx, cfg, repo)) - bundlePath := filepath.Join(path, repo.RelativePath+".bundle") + bundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") output := gittest.Exec(t, cfg, "-C", repoPath, "bundle", "verify", bundlePath) require.Contains(t, string(output), "The bundle records a complete history") @@ -113,17 +115,10 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) func TestRestoreSubcommand_serverSide(t *testing.T) { gittest.SkipWithSHA256(t) - testhelper.SkipWithWAL(t, ` -RemoveAll is removing the entire content of the storage. This would also remove the database's and -the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and -the database and only then perform the removal. - -Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) - ctx := testhelper.Context(t) - path := testhelper.TempDir(t) - backupSink, err := backup.ResolveSink(ctx, path) + backupDir := testhelper.TempDir(t) + backupSink, err := backup.ResolveSink(ctx, backupDir) require.NoError(t, err) backupLocator, err := backup.ResolveLocator("pointer", backupSink) @@ -142,18 +137,22 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) }) gittest.WriteCommit(t, cfg, existRepoPath, gittest.WithBranch(git.DefaultBranch)) - existingRepoBundlePath := filepath.Join(path, existingRepo.RelativePath+".bundle") - existingRepoRefPath := filepath.Join(path, existingRepo.RelativePath+".refs") + existingRepoBundlePath := filepath.Join(backupDir, existingRepo.RelativePath+".bundle") + existingRepoRefPath := filepath.Join(backupDir, existingRepo.RelativePath+".refs") gittest.Exec(t, cfg, "-C", existRepoPath, "bundle", "create", existingRepoBundlePath, "--all") require.NoError(t, os.WriteFile(existingRepoRefPath, gittest.Exec(t, cfg, "-C", existRepoPath, "show-ref"), perm.SharedFile)) var repos []*gitalypb.Repository for i := 0; i < 2; i++ { - repo := gittest.InitRepoDir(t, cfg.Storages[0].Path, fmt.Sprintf("repo-%d", i)) - repoBundlePath := filepath.Join(path, repo.RelativePath+".bundle") - repoRefPath := filepath.Join(path, repo.RelativePath+".refs") + repo, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + RelativePath: fmt.Sprintf("repo-%d", i), + Storage: cfg.Storages[0], + }) + + repoBundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") testhelper.CopyFile(t, existingRepoBundlePath, repoBundlePath) + repoRefPath := filepath.Join(backupDir, repo.RelativePath+".refs") testhelper.CopyFile(t, existingRepoRefPath, repoRefPath) repos = append(repos, repo) } @@ -199,7 +198,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) for _, repo := range repos { repoPath := filepath.Join(cfg.Storages[0].Path, gittest.GetReplicaPath(t, ctx, cfg, repo)) - bundlePath := filepath.Join(path, repo.RelativePath+".bundle") + bundlePath := filepath.Join(backupDir, repo.RelativePath+".bundle") output := gittest.Exec(t, cfg, "-C", repoPath, "bundle", "verify", bundlePath) require.Contains(t, string(output), "The bundle records a complete history")