From f33c47e7bf554ecc9cc08e0a0a9397ea88dc5aaa Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 12 Dec 2023 14:12:04 +1100 Subject: [PATCH 1/8] backup: Track repos that have been processed Adds a map to the Pipeline to track repos that have been restored or backed up. A mutex is used to synchronise access to the map, as entries are appended by goroutines operating in the workers. The signature of Done() is modified to return the map, and is intentionally ignored in the actual backup and restore logic for now. A subsequent commit will utilise the map for restore operations. --- internal/backup/pipeline.go | 17 ++++++++++--- internal/backup/pipeline_test.go | 37 +++++++++++++++++++++++++--- internal/cli/gitalybackup/create.go | 2 +- internal/cli/gitalybackup/restore.go | 3 ++- 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 5672493acd..8000f61af5 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -181,6 +181,9 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors + + processedRepos map[string][]*gitalypb.Repository + processedReposMu sync.Mutex } // NewPipeline creates a pipeline that executes backup and restore jobs. @@ -195,6 +198,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, + processedRepos: make(map[string][]*gitalypb.Repository), } for _, opt := range opts { @@ -252,19 +256,19 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() error { +func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) { close(p.done) p.workerWg.Wait() if p.pipelineError != nil { - return fmt.Errorf("pipeline: %w", p.pipelineError) + return nil, fmt.Errorf("pipeline: %w", p.pipelineError) } if len(p.cmdErrors.errs) > 0 { - return fmt.Errorf("pipeline: %w", p.cmdErrors) + return nil, fmt.Errorf("pipeline: %w", p.cmdErrors) } - return nil + return p.processedRepos, nil } // getWorker finds the channel associated with a storage. When no channel is @@ -325,6 +329,11 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { return } + storageName := cmd.Repository().StorageName + p.processedReposMu.Lock() + p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository()) + p.processedReposMu.Unlock() + log.Info(fmt.Sprintf("completed %s", cmd.Name())) } diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 04c539a5f4..8545c905ee 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -98,7 +98,8 @@ func TestPipeline(t *testing.T) { p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage1"}})) p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "storage2"}})) } - require.NoError(t, p.Done()) + _, err = p.Done() + require.NoError(t, err) }) } }) @@ -115,7 +116,8 @@ func TestPipeline(t *testing.T) { p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "default"}})) - require.EqualError(t, p.Done(), "pipeline: context canceled") + _, err = p.Done() + require.EqualError(t, err, "pipeline: context canceled") }) } @@ -222,7 +224,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) { require.Equal(t, tc.level, logEntry.Level) } - err := p.Done() + _, err := p.Done() if tc.level == logrus.ErrorLevel { require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n") @@ -258,7 +260,7 @@ func testPipeline(t *testing.T, init func() *Pipeline) { for _, cmd := range commands { p.Handle(ctx, cmd) } - err := p.Done() + _, err := p.Done() require.EqualError(t, err, "pipeline: 1 failures encountered:\n - c.git: assert.AnError general error for testing\n") }) } @@ -309,3 +311,30 @@ func TestPipelineError(t *testing.T) { }) } } + +func TestPipelineProcessedRepos(t *testing.T) { + strategy := MockStrategy{} + + repos := map[string][]*gitalypb.Repository{ + "storage1": { + {RelativePath: "a.git", StorageName: "storage1"}, + {RelativePath: "b.git", StorageName: "storage1"}, + }, + "storage2": {{RelativePath: "c.git", StorageName: "storage2"}}, + "storage3": {{RelativePath: "d.git", StorageName: "storage3"}}, + } + + p, err := NewPipeline(testhelper.SharedLogger(t)) + require.NoError(t, err) + + ctx := testhelper.Context(t) + for _, v := range repos { + for _, repo := range v { + p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) + } + } + + processedRepos, err := p.Done() + require.NoError(t, err) + require.EqualValues(t, repos, processedRepos) +} diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go index 4ad6534902..be7e75cdbf 100644 --- a/internal/cli/gitalybackup/create.go +++ b/internal/cli/gitalybackup/create.go @@ -174,7 +174,7 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i })) } - if err := pipeline.Done(); err != nil { + if _, err := pipeline.Done(); err != nil { return fmt.Errorf("create: %w", err) } return nil diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index de9e2cd3d3..7a304f2eda 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -157,6 +157,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin decoder := json.NewDecoder(stdin) for { + var req restoreRequest if err := decoder.Decode(&req); errors.Is(err, io.EOF) { break @@ -178,7 +179,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin })) } - if err := pipeline.Done(); err != nil { + if _, err := pipeline.Done(); err != nil { return fmt.Errorf("restore: %w", err) } return nil -- GitLab From bc2c4ec44c917f4b0d4996c1e0013b689a088607 Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 12 Dec 2023 14:54:25 +1100 Subject: [PATCH 2/8] backup: Add RemoveRepository to the strategy Adds a new method to the Strategy interface used by regular and server-side backups for performing repository backups and restores. This new method removes a single repository from its storage, and will eventually replace the existing RemoveAllRepositories method. --- internal/backup/backup.go | 19 +++++++++++++ internal/backup/backup_test.go | 43 +++++++++++++++++++++++++++++ internal/backup/pipeline.go | 7 +++++ internal/backup/pipeline_test.go | 8 ++++++ internal/backup/server_side.go | 19 +++++++++++++ internal/backup/server_side_test.go | 35 +++++++++++++++++++++++ 6 files changed, 131 insertions(+) diff --git a/internal/backup/backup.go b/internal/backup/backup.go index 5eed0724a5..bb6bad6a45 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -230,6 +230,25 @@ func (mgr *Manager) RemoveAllRepositories(ctx context.Context, req *RemoveAllRep return nil } +// RemoveRepository removes the specified repository from its storage. +func (mgr *Manager) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { + if err := setContextServerInfo(ctx, &req.Server, req.Repo.StorageName); err != nil { + return fmt.Errorf("remove repo: set context: %w", err) + } + + repoClient, err := mgr.newRepoClient(ctx, req.Server) + if err != nil { + return fmt.Errorf("remove repo: create client: %w", err) + } + + _, err = repoClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: req.Repo}) + if err != nil { + return fmt.Errorf("remove repo: remove: %w", err) + } + + return nil +} + // Create creates a repository backup. func (mgr *Manager) Create(ctx context.Context, req *CreateRequest) error { if req.VanityRepository == nil { diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index 14341ec6a7..ca92ad5467 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -65,6 +65,49 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) require.NoError(t, err) } +func TestManager_RemoveRepository(t *testing.T) { + if testhelper.IsPraefectEnabled() { + t.Skip("local backup manager expects to operate on the local filesystem so cannot operate through praefect") + } + + t.Parallel() + + cfg := testcfg.Build(t) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + + ctx := testhelper.Context(t) + + repo, repoPath := gittest.CreateRepository(t, ctx, cfg) + commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) + gittest.WriteTag(t, cfg, repoPath, "v1.0.0", commitID.Revision()) + + pool := client.NewPool() + defer testhelper.MustClose(t, pool) + + backupRoot := testhelper.TempDir(t) + sink := backup.NewFilesystemSink(backupRoot) + defer testhelper.MustClose(t, sink) + + locator, err := backup.ResolveLocator("pointer", sink) + require.NoError(t, err) + + fsBackup := backup.NewManager(sink, locator, pool) + err = fsBackup.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + Repo: repo, + }) + require.NoError(t, err) + require.NoDirExists(t, repoPath) + + // With an invalid repository + err = fsBackup.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + Repo: &gitalypb.Repository{StorageName: "nonexistent", RelativePath: "nonexistent"}, + }) + + require.EqualError(t, err, "remove repo: remove: rpc error: code = InvalidArgument desc = storage name not found") +} + func TestManager_Create(t *testing.T) { t.Parallel() diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 8000f61af5..1b969ccacd 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -16,6 +16,7 @@ import ( type Strategy interface { Create(context.Context, *CreateRequest) error Restore(context.Context, *RestoreRequest) error + RemoveRepository(context.Context, *RemoveRepositoryRequest) error RemoveAllRepositories(context.Context, *RemoveAllRepositoriesRequest) error } @@ -52,6 +53,12 @@ type RestoreRequest struct { BackupID string } +// RemoveRepositoryRequest is a request to remove an individual repository from its storage. +type RemoveRepositoryRequest struct { + Server storage.ServerInfo + Repo *gitalypb.Repository +} + // RemoveAllRepositoriesRequest is the request to remove all repositories in the specified // storage name. type RemoveAllRepositoriesRequest struct { diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 8545c905ee..cd77fb5a79 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -125,6 +125,7 @@ type MockStrategy struct { CreateFunc func(context.Context, *CreateRequest) error RestoreFunc func(context.Context, *RestoreRequest) error RemoveAllRepositoriesFunc func(context.Context, *RemoveAllRepositoriesRequest) error + RemoveRepositoryFunc func(context.Context, *RemoveRepositoryRequest) error } func (s MockStrategy) Create(ctx context.Context, req *CreateRequest) error { @@ -148,6 +149,13 @@ func (s MockStrategy) RemoveAllRepositories(ctx context.Context, req *RemoveAllR return nil } +func (s MockStrategy) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { + if s.RemoveRepositoryFunc != nil { + return s.RemoveRepositoryFunc(ctx, req) + } + return nil +} + func testPipeline(t *testing.T, init func() *Pipeline) { strategy := MockStrategy{ CreateFunc: func(_ context.Context, req *CreateRequest) error { diff --git a/internal/backup/server_side.go b/internal/backup/server_side.go index 35654f2150..17382e66e5 100644 --- a/internal/backup/server_side.go +++ b/internal/backup/server_side.go @@ -112,6 +112,25 @@ func (ss ServerSideAdapter) RemoveAllRepositories(ctx context.Context, req *Remo return nil } +// RemoveRepository removes the specified repository from its storage. +func (ss ServerSideAdapter) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { + if err := setContextServerInfo(ctx, &req.Server, req.Repo.StorageName); err != nil { + return fmt.Errorf("server-side remove repo: set context: %w", err) + } + + repoClient, err := ss.newRepoClient(ctx, req.Server) + if err != nil { + return fmt.Errorf("server-side remove repo: create client: %w", err) + } + + _, err = repoClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{Repository: req.Repo}) + if err != nil { + return fmt.Errorf("server-side remove repo: remove: %w", err) + } + + return nil +} + func (ss ServerSideAdapter) newRepoClient(ctx context.Context, server storage.ServerInfo) (gitalypb.RepositoryServiceClient, error) { conn, err := ss.pool.Dial(ctx, server.Address, server.Token) if err != nil { diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go index 2acc547c52..a7957bcd88 100644 --- a/internal/backup/server_side_test.go +++ b/internal/backup/server_side_test.go @@ -295,3 +295,38 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) }) require.NoError(t, err) } + +func TestServerSideAdapter_RemoveRepository(t *testing.T) { + t.Parallel() + + db := testdb.New(t) + db.TruncateAll(t) + datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"default"}}) + + cfg := testcfg.Build(t) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + + ctx := testhelper.Context(t) + + repo, repoPath := gittest.CreateRepository(t, ctx, cfg) + gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) + + pool := client.NewPool() + defer testhelper.MustClose(t, pool) + + adapter := backup.NewServerSideAdapter(pool) + err := adapter.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + Repo: repo, + }) + require.NoError(t, err) + require.NoDirExists(t, repoPath) + + // With an invalid repository + err = adapter.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + Repo: &gitalypb.Repository{StorageName: "default", RelativePath: "nonexistent"}, + }) + + require.EqualError(t, err, "server-side remove repo: remove: rpc error: code = NotFound desc = repository does not exist") +} -- GitLab From 0da5fc413c24c23c79cf47c40bb930b4082f5f90 Mon Sep 17 00:00:00 2001 From: James Liu Date: Wed, 13 Dec 2023 17:03:58 +1100 Subject: [PATCH 3/8] backup: Add ListRepositories to the strategy Adds a new method to the Strategy interface used by regular and server-side backups for performing repository backups and restores. This new method calls the internal WalkRepos() RPC to fetch a list of repos in a given storage. --- internal/backup/backup.go | 40 +++++++++++++ internal/backup/backup_test.go | 89 +++++++++++++++++++++++++++++ internal/backup/pipeline.go | 7 +++ internal/backup/pipeline_test.go | 8 +++ internal/backup/server_side.go | 42 ++++++++++++++ internal/backup/server_side_test.go | 87 ++++++++++++++++++++++++++++ 6 files changed, 273 insertions(+) diff --git a/internal/backup/backup.go b/internal/backup/backup.go index bb6bad6a45..afdf63d485 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -249,6 +249,37 @@ func (mgr *Manager) RemoveRepository(ctx context.Context, req *RemoveRepositoryR return nil } +// ListRepositories returns a list of repositories found in the given storage. +func (mgr *Manager) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) (repos []*gitalypb.Repository, err error) { + if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil { + return nil, fmt.Errorf("list repos: set context: %w", err) + } + + internalClient, err := mgr.newInternalClient(ctx, req.Server) + if err != nil { + return nil, fmt.Errorf("list repos: create client: %w", err) + } + + stream, err := internalClient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: req.StorageName}) + if err != nil { + return nil, fmt.Errorf("list repos: walk: %w", err) + } + + for { + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, fmt.Errorf("list repos: receiving messages: %w", err) + } + + repos = append(repos, &gitalypb.Repository{RelativePath: resp.RelativePath, StorageName: req.StorageName}) + } + + return repos, nil +} + // Create creates a repository backup. func (mgr *Manager) Create(ctx context.Context, req *CreateRequest) error { if req.VanityRepository == nil { @@ -592,3 +623,12 @@ func (mgr *Manager) newRepoClient(ctx context.Context, server storage.ServerInfo return gitalypb.NewRepositoryServiceClient(conn), nil } + +func (mgr *Manager) newInternalClient(ctx context.Context, server storage.ServerInfo) (gitalypb.InternalGitalyClient, error) { + conn, err := mgr.conns.Dial(ctx, server.Address, server.Token) + if err != nil { + return nil, err + } + + return gitalypb.NewInternalGitalyClient(conn), nil +} diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index ca92ad5467..fa764cc527 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -108,6 +108,95 @@ func TestManager_RemoveRepository(t *testing.T) { require.EqualError(t, err, "remove repo: remove: rpc error: code = InvalidArgument desc = storage name not found") } +func TestManager_ListRepositories(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + repos map[string][]*gitalypb.Repository + }{ + { + desc: "no repos", + repos: make(map[string][]*gitalypb.Repository), + }, + { + desc: "repos in a single storage", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + }, + }, + { + desc: "repos in multiple storages", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + "storage-2": { + {RelativePath: "c", StorageName: "storage-2"}, + {RelativePath: "d", StorageName: "storage-2"}, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if testhelper.IsPraefectEnabled() { + t.Skip("local backup manager expects to operate on the local filesystem so cannot operate through praefect") + } + + var storages []string + for storageName := range tc.repos { + storages = append(storages, storageName) + } + + // We don't really need a "default" storage, but this makes initialisation cleaner since + // WithStorages() takes at least one argument. + cfg := testcfg.Build(t, testcfg.WithStorages("default", storages...)) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + + ctx := testhelper.Context(t) + + for storageName, repos := range tc.repos { + for _, repo := range repos { + storagePath, ok := cfg.StoragePath(storageName) + require.True(t, ok) + + _, _ = gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + RelativePath: repo.RelativePath, + Storage: config.Storage{Name: storageName, Path: storagePath}, + }) + } + } + + pool := client.NewPool() + defer testhelper.MustClose(t, pool) + + backupRoot := testhelper.TempDir(t) + sink := backup.NewFilesystemSink(backupRoot) + defer testhelper.MustClose(t, sink) + + locator, err := backup.ResolveLocator("pointer", sink) + require.NoError(t, err) + + fsBackup := backup.NewManager(sink, locator, pool) + + for storageName, repos := range tc.repos { + actualRepos, err := fsBackup.ListRepositories(ctx, &backup.ListRepositoriesRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + StorageName: storageName, + }) + + require.NoError(t, err) + require.EqualValues(t, repos, actualRepos) + } + }) + } +} + func TestManager_Create(t *testing.T) { t.Parallel() diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 1b969ccacd..72e5d9740e 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -16,6 +16,7 @@ import ( type Strategy interface { Create(context.Context, *CreateRequest) error Restore(context.Context, *RestoreRequest) error + ListRepositories(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) RemoveRepository(context.Context, *RemoveRepositoryRequest) error RemoveAllRepositories(context.Context, *RemoveAllRepositoriesRequest) error } @@ -66,6 +67,12 @@ type RemoveAllRepositoriesRequest struct { StorageName string } +// ListRepositoriesRequest is the request to list repositories in a given storage. +type ListRepositoriesRequest struct { + Server storage.ServerInfo + StorageName string +} + // Command handles a specific backup operation type Command interface { Repository() *gitalypb.Repository diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index cd77fb5a79..22dd65a83f 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -126,6 +126,7 @@ type MockStrategy struct { RestoreFunc func(context.Context, *RestoreRequest) error RemoveAllRepositoriesFunc func(context.Context, *RemoveAllRepositoriesRequest) error RemoveRepositoryFunc func(context.Context, *RemoveRepositoryRequest) error + ListRepositoriesFunc func(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) } func (s MockStrategy) Create(ctx context.Context, req *CreateRequest) error { @@ -156,6 +157,13 @@ func (s MockStrategy) RemoveRepository(ctx context.Context, req *RemoveRepositor return nil } +func (s MockStrategy) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) ([]*gitalypb.Repository, error) { + if s.ListRepositoriesFunc != nil { + return s.ListRepositoriesFunc(ctx, req) + } + return nil, nil +} + func testPipeline(t *testing.T, init func() *Pipeline) { strategy := MockStrategy{ CreateFunc: func(_ context.Context, req *CreateRequest) error { diff --git a/internal/backup/server_side.go b/internal/backup/server_side.go index 17382e66e5..1b2319d73f 100644 --- a/internal/backup/server_side.go +++ b/internal/backup/server_side.go @@ -2,7 +2,9 @@ package backup import ( "context" + "errors" "fmt" + "io" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" @@ -131,6 +133,37 @@ func (ss ServerSideAdapter) RemoveRepository(ctx context.Context, req *RemoveRep return nil } +// ListRepositories returns a list of repositories found in the given storage. +func (ss ServerSideAdapter) ListRepositories(ctx context.Context, req *ListRepositoriesRequest) (repos []*gitalypb.Repository, err error) { + if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil { + return nil, fmt.Errorf("server-side list repos: set context: %w", err) + } + + internalClient, err := ss.newInternalClient(ctx, req.Server) + if err != nil { + return nil, fmt.Errorf("server-side list repos: create client: %w", err) + } + + stream, err := internalClient.WalkRepos(ctx, &gitalypb.WalkReposRequest{StorageName: req.StorageName}) + if err != nil { + return nil, fmt.Errorf("server-side list repos: walk: %w", err) + } + + for { + resp, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, err + } + + repos = append(repos, &gitalypb.Repository{RelativePath: resp.RelativePath, StorageName: req.StorageName}) + } + + return repos, nil +} + func (ss ServerSideAdapter) newRepoClient(ctx context.Context, server storage.ServerInfo) (gitalypb.RepositoryServiceClient, error) { conn, err := ss.pool.Dial(ctx, server.Address, server.Token) if err != nil { @@ -139,3 +172,12 @@ func (ss ServerSideAdapter) newRepoClient(ctx context.Context, server storage.Se return gitalypb.NewRepositoryServiceClient(conn), nil } + +func (ss ServerSideAdapter) newInternalClient(ctx context.Context, server storage.ServerInfo) (gitalypb.InternalGitalyClient, error) { + conn, err := ss.pool.Dial(ctx, server.Address, server.Token) + if err != nil { + return nil, err + } + + return gitalypb.NewInternalGitalyClient(conn), nil +} diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go index a7957bcd88..e4f1383c36 100644 --- a/internal/backup/server_side_test.go +++ b/internal/backup/server_side_test.go @@ -13,9 +13,11 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/setup" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testserver" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) @@ -330,3 +332,88 @@ func TestServerSideAdapter_RemoveRepository(t *testing.T) { require.EqualError(t, err, "server-side remove repo: remove: rpc error: code = NotFound desc = repository does not exist") } + +func TestServerSideAdapter_ListRepositories(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + repos map[string][]*gitalypb.Repository + }{ + { + desc: "no repos", + repos: make(map[string][]*gitalypb.Repository), + }, + { + desc: "repos in a single storage", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + }, + }, + { + desc: "repos in multiple storages", + repos: map[string][]*gitalypb.Repository{ + "storage-1": { + {RelativePath: "a", StorageName: "storage-1"}, + {RelativePath: "b", StorageName: "storage-1"}, + }, + "storage-2": { + {RelativePath: "c", StorageName: "storage-2"}, + {RelativePath: "d", StorageName: "storage-2"}, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + var storages []string + for storageName := range tc.repos { + storages = append(storages, storageName) + } + + db := testdb.New(t) + db.TruncateAll(t) + rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": storages}) + + // We don't really need a "default" storage, but this makes initialisation cleaner since + // WithStorages() takes at least one argument. + cfg := testcfg.Build(t, testcfg.WithStorages("default", storages...)) + cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) + + ctx := testhelper.Context(t) + + repoID := 1 + for storageName, repos := range tc.repos { + for _, repo := range repos { + storagePath, ok := cfg.StoragePath(storageName) + require.True(t, ok) + + _, _ = gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + RelativePath: repo.RelativePath, + Storage: config.Storage{Name: storageName, Path: storagePath}, + }) + + require.NoError(t, rs.CreateRepository(ctx, int64(repoID), "virtual-storage", repo.RelativePath, repo.RelativePath, storageName, nil, nil, false, false)) + + repoID++ + } + } + + pool := client.NewPool() + defer testhelper.MustClose(t, pool) + + adapter := backup.NewServerSideAdapter(pool) + + for storageName, repos := range tc.repos { + actualRepos, err := adapter.ListRepositories(ctx, &backup.ListRepositoriesRequest{ + Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, + StorageName: storageName, + }) + require.NoError(t, err) + require.EqualValues(t, repos, actualRepos) + } + }) + } +} -- GitLab From 26c3ef8c515ac525130a9e7f37011cc3cd6e728c Mon Sep 17 00:00:00 2001 From: James Liu Date: Thu, 14 Dec 2023 11:48:00 +1100 Subject: [PATCH 4/8] backup: Stop sending invalid requests Modifies the Restore CLI tests so we stop sending an invalid restore command as the final JSON object into stdin. This is required as a subsequent commit will move the repository removal logic to execute after the Pipeline completes successfully. If the tests purposely cause the pipeline to fail, the restore logic will never execute. Coverage of the Pipeline's error messaging is covered separately in the Pipeline's unit tests. --- internal/cli/gitalybackup/restore_test.go | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/internal/cli/gitalybackup/restore_test.go b/internal/cli/gitalybackup/restore_test.go index 4eae52e4c8..6d235525c4 100644 --- a/internal/cli/gitalybackup/restore_test.go +++ b/internal/cli/gitalybackup/restore_test.go @@ -75,12 +75,6 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) })) } - require.NoError(t, encoder.Encode(map[string]string{ - "address": "invalid", - "token": "invalid", - "relative_path": "invalid", - })) - ctx = testhelper.MergeIncomingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) args := []string{ @@ -103,9 +97,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) require.DirExists(t, existRepoPath) - require.EqualError(t, - cmd.RunContext(ctx, args), - "restore: pipeline: 1 failures encountered:\n - invalid: manager: could not dial source: invalid connection string: \"invalid\"\n") + require.NoError(t, cmd.RunContext(ctx, args)) require.NoDirExists(t, existRepoPath) @@ -179,12 +171,6 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) })) } - require.NoError(t, encoder.Encode(map[string]string{ - "address": "invalid", - "token": "invalid", - "relative_path": "invalid", - })) - ctx = testhelper.MergeIncomingMetadata(ctx, testcfg.GitalyServersMetadataFromCfg(t, cfg)) args := []string{ @@ -207,9 +193,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) require.DirExists(t, existRepoPath) - require.EqualError(t, - cmd.RunContext(ctx, args), - "restore: pipeline: 1 failures encountered:\n - invalid: server-side restore: could not dial source: invalid connection string: \"invalid\"\n") + require.NoError(t, cmd.RunContext(ctx, args)) require.NoDirExists(t, existRepoPath) -- GitLab From cdf8772c1429cd0dcbd968a12a17a756daa5f80e Mon Sep 17 00:00:00 2001 From: James Liu Date: Thu, 14 Dec 2023 11:51:00 +1100 Subject: [PATCH 5/8] backup: Only remove "dangling" repositories Instead of invoking the RemoveAll() RPC prior to the restore, we instead compare the set of existing repositories to the set of repositories contained in the backup being restored. The difference between the two sets -- the set of "dangling" repositories -- are removed individually. This is done to ensure the state of repos in Gitaly matches the worldview held by the Rails DB after a GitLab instance restore. --- internal/backup/pipeline.go | 11 +++++---- internal/backup/pipeline_test.go | 16 ++++++++----- internal/cli/gitalybackup/restore.go | 35 +++++++++++++++++++++++----- 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 72e5d9740e..a10962cad6 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -196,7 +196,7 @@ type Pipeline struct { pipelineError error cmdErrors *commandErrors - processedRepos map[string][]*gitalypb.Repository + processedRepos map[string]map[*gitalypb.Repository]struct{} processedReposMu sync.Mutex } @@ -212,7 +212,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), cmdErrors: &commandErrors{}, - processedRepos: make(map[string][]*gitalypb.Repository), + processedRepos: make(map[string]map[*gitalypb.Repository]struct{}), } for _, opt := range opts { @@ -270,7 +270,7 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } // Done waits for any in progress jobs to complete then reports any accumulated errors -func (p *Pipeline) Done() (processedRepos map[string][]*gitalypb.Repository, err error) { +func (p *Pipeline) Done() (processedRepos map[string]map[*gitalypb.Repository]struct{}, err error) { close(p.done) p.workerWg.Wait() @@ -345,7 +345,10 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { storageName := cmd.Repository().StorageName p.processedReposMu.Lock() - p.processedRepos[storageName] = append(p.processedRepos[storageName], cmd.Repository()) + if _, ok := p.processedRepos[storageName]; !ok { + p.processedRepos[storageName] = make(map[*gitalypb.Repository]struct{}) + } + p.processedRepos[storageName][cmd.Repository()] = struct{}{} p.processedReposMu.Unlock() log.Info(fmt.Sprintf("completed %s", cmd.Name())) diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 22dd65a83f..2dfae66d44 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -331,13 +331,17 @@ func TestPipelineError(t *testing.T) { func TestPipelineProcessedRepos(t *testing.T) { strategy := MockStrategy{} - repos := map[string][]*gitalypb.Repository{ + repos := map[string]map[*gitalypb.Repository]struct{}{ "storage1": { - {RelativePath: "a.git", StorageName: "storage1"}, - {RelativePath: "b.git", StorageName: "storage1"}, + &gitalypb.Repository{RelativePath: "a.git", StorageName: "storage1"}: struct{}{}, + &gitalypb.Repository{RelativePath: "b.git", StorageName: "storage1"}: struct{}{}, + }, + "storage2": { + &gitalypb.Repository{RelativePath: "c.git", StorageName: "storage2"}: struct{}{}, + }, + "storage3": { + &gitalypb.Repository{RelativePath: "d.git", StorageName: "storage3"}: struct{}{}, }, - "storage2": {{RelativePath: "c.git", StorageName: "storage2"}}, - "storage3": {{RelativePath: "d.git", StorageName: "storage3"}}, } p, err := NewPipeline(testhelper.SharedLogger(t)) @@ -345,7 +349,7 @@ func TestPipelineProcessedRepos(t *testing.T) { ctx := testhelper.Context(t) for _, v := range repos { - for _, repo := range v { + for repo := range v { p.Handle(ctx, NewRestoreCommand(strategy, RestoreRequest{Repository: repo})) } } diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index 7a304f2eda..defffa2231 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -135,15 +135,18 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin manager = backup.NewManager(sink, locator, pool) } + // Get the set of existing repositories keyed by storage. We'll later use this to determine any + // dangling repos that should be removed. + existingRepos := make(map[string][]*gitalypb.Repository) for _, storageName := range cmd.removeAllRepositories { - err := manager.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{ + repos, err := manager.ListRepositories(ctx, &backup.ListRepositoriesRequest{ StorageName: storageName, }) if err != nil { - // Treat RemoveAll failures as soft failures until we can determine - // how often it fails. - logger.WithError(err).WithField("storage_name", storageName).Warn("failed to remove all repositories") + logger.WithError(err).WithField("storage_name", storageName).Warn("failed to list repositories") } + + existingRepos[storageName] = repos } var opts []backup.PipelineOption @@ -157,7 +160,6 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin decoder := json.NewDecoder(stdin) for { - var req restoreRequest if err := decoder.Decode(&req); errors.Is(err, io.EOF) { break @@ -179,8 +181,29 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin })) } - if _, err := pipeline.Done(); err != nil { + restoredRepos, err := pipeline.Done() + if err != nil { return fmt.Errorf("restore: %w", err) } + + var removalErrors []error + for storageName, repos := range existingRepos { + for _, repo := range repos { + if dangling := restoredRepos[storageName][repo]; dangling == struct{}{} { + // If we have dangling repos (those which exist in the storage but + // weren't part of the restore), they need to be deleted so the + // state of repos in Gitaly matches that in the Rails DB. + if err := manager.RemoveRepository(ctx, &backup.RemoveRepositoryRequest{Repo: repo}); err != nil { + removalErrors = append(removalErrors, fmt.Errorf("storage_name %q relative_path %q: %w", storageName, repo.RelativePath, err)) + } + } + } + } + + if len(removalErrors) > 0 { + return fmt.Errorf("remove dangling repositories: %d failures encountered: %w", + len(removalErrors), errors.Join(removalErrors...)) + } + return nil } -- GitLab From 4ce717fc16eff4249791d758666f13beae6ac38e Mon Sep 17 00:00:00 2001 From: James Liu Date: Thu, 14 Dec 2023 11:51:56 +1100 Subject: [PATCH 6/8] backup: Delete RemoveAllRepositories from Strategy This is now deprecated in favour of removing individual repos. --- internal/backup/backup.go | 19 -------------- internal/backup/backup_test.go | 37 --------------------------- internal/backup/pipeline.go | 8 ------ internal/backup/pipeline_test.go | 16 +++--------- internal/backup/server_side.go | 19 -------------- internal/backup/server_side_test.go | 39 ----------------------------- 6 files changed, 4 insertions(+), 134 deletions(-) diff --git a/internal/backup/backup.go b/internal/backup/backup.go index afdf63d485..a15f47abeb 100644 --- a/internal/backup/backup.go +++ b/internal/backup/backup.go @@ -211,25 +211,6 @@ func NewManagerLocal( } } -// RemoveAllRepositories removes all repositories in the specified storage name. -func (mgr *Manager) RemoveAllRepositories(ctx context.Context, req *RemoveAllRepositoriesRequest) error { - if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil { - return fmt.Errorf("manager: %w", err) - } - - repoClient, err := mgr.newRepoClient(ctx, req.Server) - if err != nil { - return fmt.Errorf("manager: %w", err) - } - - _, err = repoClient.RemoveAll(ctx, &gitalypb.RemoveAllRequest{StorageName: req.StorageName}) - if err != nil { - return fmt.Errorf("manager: %w", err) - } - - return nil -} - // RemoveRepository removes the specified repository from its storage. func (mgr *Manager) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { if err := setContextServerInfo(ctx, &req.Server, req.Repo.StorageName); err != nil { diff --git a/internal/backup/backup_test.go b/internal/backup/backup_test.go index fa764cc527..81e87a33a7 100644 --- a/internal/backup/backup_test.go +++ b/internal/backup/backup_test.go @@ -28,43 +28,6 @@ import ( "google.golang.org/protobuf/proto" ) -func TestManager_RemoveAllRepositories(t *testing.T) { - testhelper.SkipWithWAL(t, ` -RemoveAll is removing the entire content of the storage. This would also remove the database's and -the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and -the database and only then perform the removal. - -Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) - - t.Parallel() - - cfg := testcfg.Build(t) - cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll) - - ctx := testhelper.Context(t) - - repo, repoPath := gittest.CreateRepository(t, ctx, cfg) - commitID := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) - gittest.WriteTag(t, cfg, repoPath, "v1.0.0", commitID.Revision()) - - pool := client.NewPool() - defer testhelper.MustClose(t, pool) - - backupRoot := testhelper.TempDir(t) - sink := backup.NewFilesystemSink(backupRoot) - defer testhelper.MustClose(t, sink) - - locator, err := backup.ResolveLocator("pointer", sink) - require.NoError(t, err) - - fsBackup := backup.NewManager(sink, locator, pool) - err = fsBackup.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{ - Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, - StorageName: repo.StorageName, - }) - require.NoError(t, err) -} - func TestManager_RemoveRepository(t *testing.T) { if testhelper.IsPraefectEnabled() { t.Skip("local backup manager expects to operate on the local filesystem so cannot operate through praefect") diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index a10962cad6..3eb91de919 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -18,7 +18,6 @@ type Strategy interface { Restore(context.Context, *RestoreRequest) error ListRepositories(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) RemoveRepository(context.Context, *RemoveRepositoryRequest) error - RemoveAllRepositories(context.Context, *RemoveAllRepositoriesRequest) error } // CreateRequest is the request to create a backup @@ -60,13 +59,6 @@ type RemoveRepositoryRequest struct { Repo *gitalypb.Repository } -// RemoveAllRepositoriesRequest is the request to remove all repositories in the specified -// storage name. -type RemoveAllRepositoriesRequest struct { - Server storage.ServerInfo - StorageName string -} - // ListRepositoriesRequest is the request to list repositories in a given storage. type ListRepositoriesRequest struct { Server storage.ServerInfo diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index 2dfae66d44..37f7c2e5b2 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -122,11 +122,10 @@ func TestPipeline(t *testing.T) { } type MockStrategy struct { - CreateFunc func(context.Context, *CreateRequest) error - RestoreFunc func(context.Context, *RestoreRequest) error - RemoveAllRepositoriesFunc func(context.Context, *RemoveAllRepositoriesRequest) error - RemoveRepositoryFunc func(context.Context, *RemoveRepositoryRequest) error - ListRepositoriesFunc func(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) + CreateFunc func(context.Context, *CreateRequest) error + RestoreFunc func(context.Context, *RestoreRequest) error + RemoveRepositoryFunc func(context.Context, *RemoveRepositoryRequest) error + ListRepositoriesFunc func(context.Context, *ListRepositoriesRequest) ([]*gitalypb.Repository, error) } func (s MockStrategy) Create(ctx context.Context, req *CreateRequest) error { @@ -143,13 +142,6 @@ func (s MockStrategy) Restore(ctx context.Context, req *RestoreRequest) error { return nil } -func (s MockStrategy) RemoveAllRepositories(ctx context.Context, req *RemoveAllRepositoriesRequest) error { - if s.RemoveAllRepositoriesFunc != nil { - return s.RemoveAllRepositoriesFunc(ctx, req) - } - return nil -} - func (s MockStrategy) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { if s.RemoveRepositoryFunc != nil { return s.RemoveRepositoryFunc(ctx, req) diff --git a/internal/backup/server_side.go b/internal/backup/server_side.go index 1b2319d73f..a1a9a37eb4 100644 --- a/internal/backup/server_side.go +++ b/internal/backup/server_side.go @@ -95,25 +95,6 @@ func (ss ServerSideAdapter) Restore(ctx context.Context, req *RestoreRequest) er return nil } -// RemoveAllRepositories removes all repositories in the specified storage name. -func (ss ServerSideAdapter) RemoveAllRepositories(ctx context.Context, req *RemoveAllRepositoriesRequest) error { - if err := setContextServerInfo(ctx, &req.Server, req.StorageName); err != nil { - return fmt.Errorf("server-side remove all: %w", err) - } - - repoClient, err := ss.newRepoClient(ctx, req.Server) - if err != nil { - return fmt.Errorf("server-side remove all: %w", err) - } - - _, err = repoClient.RemoveAll(ctx, &gitalypb.RemoveAllRequest{StorageName: req.StorageName}) - if err != nil { - return fmt.Errorf("server-side remove all: %w", err) - } - - return nil -} - // RemoveRepository removes the specified repository from its storage. func (ss ServerSideAdapter) RemoveRepository(ctx context.Context, req *RemoveRepositoryRequest) error { if err := setContextServerInfo(ctx, &req.Server, req.Repo.StorageName); err != nil { diff --git a/internal/backup/server_side_test.go b/internal/backup/server_side_test.go index e4f1383c36..69669df609 100644 --- a/internal/backup/server_side_test.go +++ b/internal/backup/server_side_test.go @@ -259,45 +259,6 @@ func TestServerSideAdapter_Restore(t *testing.T) { } } -func TestServerSideAdapter_RemoveAllRepositories(t *testing.T) { - testhelper.SkipWithWAL(t, ` -RemoveAll is removing the entire content of the storage. This would also remove the database's and -the transaction manager's disk state. The RPC needs to be updated to shut down all partitions and -the database and only then perform the removal. - -Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) - - t.Parallel() - - backupRoot := testhelper.TempDir(t) - sink := backup.NewFilesystemSink(backupRoot) - defer testhelper.MustClose(t, sink) - - locator, err := backup.ResolveLocator("pointer", sink) - require.NoError(t, err) - - cfg := testcfg.Build(t) - cfg.SocketPath = testserver.RunGitalyServer(t, cfg, setup.RegisterAll, - testserver.WithBackupSink(sink), - testserver.WithBackupLocator(locator), - ) - - ctx := testhelper.Context(t) - - repo, repoPath := gittest.CreateRepository(t, ctx, cfg) - gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("main")) - - pool := client.NewPool() - defer testhelper.MustClose(t, pool) - - adapter := backup.NewServerSideAdapter(pool) - err = adapter.RemoveAllRepositories(ctx, &backup.RemoveAllRepositoriesRequest{ - Server: storage.ServerInfo{Address: cfg.SocketPath, Token: cfg.Auth.Token}, - StorageName: repo.StorageName, - }) - require.NoError(t, err) -} - func TestServerSideAdapter_RemoveRepository(t *testing.T) { t.Parallel() -- GitLab From bead28d4dd8c6fa0166d4490fe479dca78c51bc7 Mon Sep 17 00:00:00 2001 From: James Liu Date: Thu, 14 Dec 2023 11:55:36 +1100 Subject: [PATCH 7/8] proto: Deprecate RemoveAll Now that we've adjusted the restore mechanism to delete individual repos as needed, this RPC is no longer required. See the following issues for more context: - https://gitlab.com/gitlab-org/gitaly/-/issues/5357 - https://gitlab.com/gitlab-org/gitaly/-/issues/5269 Changelog: deprecated --- .../service/repository/remove_all_test.go | 1 + internal/praefect/remove_all.go | 1 + internal/praefect/remove_all_test.go | 1 + proto/go/gitalypb/repository.pb.go | 52 +++++++++---------- proto/go/gitalypb/repository_grpc.pb.go | 5 ++ proto/repository.proto | 2 + 6 files changed, 36 insertions(+), 26 deletions(-) diff --git a/internal/gitaly/service/repository/remove_all_test.go b/internal/gitaly/service/repository/remove_all_test.go index 785f54b017..fd913523af 100644 --- a/internal/gitaly/service/repository/remove_all_test.go +++ b/internal/gitaly/service/repository/remove_all_test.go @@ -28,6 +28,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) require.DirExists(t, repo1Path) require.DirExists(t, repo1Path) + //nolint:staticcheck _, err := client.RemoveAll(ctx, &gitalypb.RemoveAllRequest{ StorageName: cfg.Storages[0].Name, }) diff --git a/internal/praefect/remove_all.go b/internal/praefect/remove_all.go index 9fa08206f1..9c383cd0ea 100644 --- a/internal/praefect/remove_all.go +++ b/internal/praefect/remove_all.go @@ -32,6 +32,7 @@ func RemoveAllHandler(rs datastore.RepositoryStore, conns Connections) grpc.Stre conn := conn group.Go(func() error { + //nolint:staticcheck _, err := gitalypb.NewRepositoryServiceClient(conn).RemoveAll(ctx, &gitalypb.RemoveAllRequest{ StorageName: rewrittenStorage, }) diff --git a/internal/praefect/remove_all_test.go b/internal/praefect/remove_all_test.go index 965ffb95bb..6b5e6f848b 100644 --- a/internal/praefect/remove_all_test.go +++ b/internal/praefect/remove_all_test.go @@ -96,6 +96,7 @@ Issue: https://gitlab.com/gitlab-org/gitaly/-/issues/5269`) _, err = client.RepositorySize(ctx, &gitalypb.RepositorySizeRequest{Repository: &gitalypb.Repository{}}) testhelper.RequireGrpcError(t, errServedByGitaly, err) + //nolint:staticcheck resp, err := client.RemoveAll(ctx, &gitalypb.RemoveAllRequest{StorageName: virtualStorage}) require.NoError(t, err) diff --git a/proto/go/gitalypb/repository.pb.go b/proto/go/gitalypb/repository.pb.go index ca005e2b55..bdfd812edc 100644 --- a/proto/go/gitalypb/repository.pb.go +++ b/proto/go/gitalypb/repository.pb.go @@ -6163,7 +6163,7 @@ var file_repository_proto_rawDesc = []byte{ 0x1c, 0x0a, 0x09, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x32, 0xaf, 0x20, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, + 0x6c, 0x75, 0x65, 0x32, 0xb2, 0x20, 0x0a, 0x11, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5d, 0x0a, 0x10, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x78, 0x69, 0x73, 0x74, 0x73, 0x12, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, @@ -6399,34 +6399,34 @@ var file_repository_proto_rawDesc = []byte{ 0x79, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x09, 0xfa, 0x97, 0x28, - 0x02, 0x08, 0x02, 0x88, 0x02, 0x01, 0x12, 0x4a, 0x0a, 0x09, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, + 0x02, 0x08, 0x02, 0x88, 0x02, 0x01, 0x12, 0x4d, 0x0a, 0x09, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x41, 0x6c, 0x6c, 0x12, 0x18, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x41, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x41, 0x6c, 0x6c, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x08, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, - 0x10, 0x02, 0x12, 0x5d, 0x0a, 0x10, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, - 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, - 0x2e, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, - 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, - 0x02, 0x12, 0x60, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, - 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, - 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, - 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, - 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, 0x28, - 0x02, 0x08, 0x01, 0x12, 0x60, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, - 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, 0x6c, - 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, - 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x67, 0x69, 0x74, - 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, 0x74, 0x72, 0x69, - 0x62, 0x75, 0x74, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, - 0x97, 0x28, 0x02, 0x08, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, 0x67, 0x2f, 0x67, - 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, - 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x0b, 0xfa, 0x97, 0x28, 0x04, 0x08, 0x01, + 0x10, 0x02, 0x88, 0x02, 0x01, 0x12, 0x5d, 0x0a, 0x10, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, + 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x1f, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, + 0x6f, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x67, 0x69, 0x74, + 0x61, 0x6c, 0x79, 0x2e, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, 0xfa, 0x97, + 0x28, 0x02, 0x08, 0x02, 0x12, 0x60, 0x0a, 0x11, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, + 0x65, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x20, 0x2e, 0x67, 0x69, 0x74, 0x61, + 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, 0x73, 0x69, + 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x52, 0x65, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x70, 0x6f, + 0x73, 0x69, 0x74, 0x6f, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x06, + 0xfa, 0x97, 0x28, 0x02, 0x08, 0x01, 0x12, 0x60, 0x0a, 0x11, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, + 0x65, 0x41, 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x20, 0x2e, 0x67, 0x69, + 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, 0x74, 0x72, + 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, + 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x41, 0x74, + 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x06, 0xfa, 0x97, 0x28, 0x02, 0x08, 0x02, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, 0x6c, + 0x61, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x69, 0x74, 0x6c, 0x61, 0x62, 0x2d, 0x6f, 0x72, + 0x67, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x2f, 0x76, 0x31, 0x36, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x69, 0x74, 0x61, 0x6c, 0x79, 0x70, 0x62, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/proto/go/gitalypb/repository_grpc.pb.go b/proto/go/gitalypb/repository_grpc.pb.go index d5af55b940..b407b948cb 100644 --- a/proto/go/gitalypb/repository_grpc.pb.go +++ b/proto/go/gitalypb/repository_grpc.pb.go @@ -192,7 +192,9 @@ type RepositoryServiceClient interface { // FullPath reads the "gitlab.fullpath" configuration from the repository's // gitconfig. Returns an error in case the full path has not been configured. FullPath(ctx context.Context, in *FullPathRequest, opts ...grpc.CallOption) (*FullPathResponse, error) + // Deprecated: Do not use. // RemoveAll deletes all repositories on a specified storage. + // Deprecated in favour of individually removing repositories with RemoveRepository. RemoveAll(ctx context.Context, in *RemoveAllRequest, opts ...grpc.CallOption) (*RemoveAllResponse, error) // BackupRepository creates a full or incremental backup streamed directly to // object-storage. The backup is created synchronously. The destination must @@ -957,6 +959,7 @@ func (c *repositoryServiceClient) FullPath(ctx context.Context, in *FullPathRequ return out, nil } +// Deprecated: Do not use. func (c *repositoryServiceClient) RemoveAll(ctx context.Context, in *RemoveAllRequest, opts ...grpc.CallOption) (*RemoveAllResponse, error) { out := new(RemoveAllResponse) err := c.cc.Invoke(ctx, "/gitaly.RepositoryService/RemoveAll", in, out, opts...) @@ -1167,7 +1170,9 @@ type RepositoryServiceServer interface { // FullPath reads the "gitlab.fullpath" configuration from the repository's // gitconfig. Returns an error in case the full path has not been configured. FullPath(context.Context, *FullPathRequest) (*FullPathResponse, error) + // Deprecated: Do not use. // RemoveAll deletes all repositories on a specified storage. + // Deprecated in favour of individually removing repositories with RemoveRepository. RemoveAll(context.Context, *RemoveAllRequest) (*RemoveAllResponse, error) // BackupRepository creates a full or incremental backup streamed directly to // object-storage. The backup is created synchronously. The destination must diff --git a/proto/repository.proto b/proto/repository.proto index 25d446f619..4a5d8dbf8f 100644 --- a/proto/repository.proto +++ b/proto/repository.proto @@ -381,11 +381,13 @@ service RepositoryService { } // RemoveAll deletes all repositories on a specified storage. + // Deprecated in favour of individually removing repositories with RemoveRepository. rpc RemoveAll(RemoveAllRequest) returns (RemoveAllResponse) { option (op_type) = { op: MUTATOR scope_level: STORAGE }; + option deprecated = true; } // BackupRepository creates a full or incremental backup streamed directly to -- GitLab From 8d7f29eb815ac443fbb84898975976ff84877cf4 Mon Sep 17 00:00:00 2001 From: James Liu Date: Mon, 18 Dec 2023 17:49:48 +1100 Subject: [PATCH 8/8] praefect: Intercept WalkRepos RPC Adds a handler to Praefect to intercept calls to the WalkRepos RPC. The handler provides an alternate implementation of listing repositories in a storage, which queries the Praefect DB rather than walking the filesystem on disk. This is required so when the RPC is invoked via Praefect, the DB is used as the source of truth rather than a random Gitaly node. The only user-facing difference between this and the original implementation is that the `modification_time` attribute of the response message is left empty, as this cannot be determined via the DB. --- .../praefect/datastore/repository_store.go | 27 ++++++ internal/praefect/server.go | 3 + internal/praefect/walkrepos.go | 47 ++++++++++ internal/praefect/walkrepos_test.go | 88 +++++++++++++++++++ 4 files changed, 165 insertions(+) create mode 100644 internal/praefect/walkrepos.go create mode 100644 internal/praefect/walkrepos_test.go diff --git a/internal/praefect/datastore/repository_store.go b/internal/praefect/datastore/repository_store.go index 77dce6ba06..4925fcb50f 100644 --- a/internal/praefect/datastore/repository_store.go +++ b/internal/praefect/datastore/repository_store.go @@ -111,6 +111,8 @@ type RepositoryStore interface { MarkVirtualStorageUnverified(ctx context.Context, virtualStorage string) (int64, error) // MarkStorageUnverified marsk all replicas on the storage as unverified. MarkStorageUnverified(ctx context.Context, virtualStorage, storage string) (int64, error) + // ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage. + ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) } // PostgresRepositoryStore is a Postgres implementation of RepositoryStore. @@ -916,3 +918,28 @@ func (rs *PostgresRepositoryStore) GetReplicaPath(ctx context.Context, repositor return replicaPath, nil } + +// ListRepositoryPaths retrieves the relative path for all repositories present on the given virtual storage. +func (rs *PostgresRepositoryStore) ListRepositoryPaths(ctx context.Context, virtualStorage string) ([]string, error) { + rows, err := rs.db.QueryContext(ctx, ` +SELECT relative_path +FROM repositories +WHERE virtual_storage = $1 +`, virtualStorage) + if err != nil { + return nil, fmt.Errorf("query: %w", err) + } + defer rows.Close() + + var relativePaths []string + for rows.Next() { + var relativePath string + if err := rows.Scan(&relativePath); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + + relativePaths = append(relativePaths, relativePath) + } + + return relativePaths, rows.Err() +} diff --git a/internal/praefect/server.go b/internal/praefect/server.go index 54cbee079a..c52b160f5b 100644 --- a/internal/praefect/server.go +++ b/internal/praefect/server.go @@ -195,6 +195,9 @@ func NewGRPCServer( "DeleteObjectPool": DeleteObjectPoolHandler(deps.RepositoryStore, deps.Logger, deps.Conns), "GetObjectPool": GetObjectPoolHandler(deps.RepositoryStore, deps.Router), }) + proxy.RegisterStreamHandlers(srv, "gitaly.InternalGitaly", map[string]grpc.StreamHandler{ + "WalkRepos": WalkReposHandler(deps.RepositoryStore), + }) } return srv diff --git a/internal/praefect/walkrepos.go b/internal/praefect/walkrepos.go new file mode 100644 index 0000000000..1f321a4c7d --- /dev/null +++ b/internal/praefect/walkrepos.go @@ -0,0 +1,47 @@ +package praefect + +import ( + "fmt" + + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" +) + +// WalkReposHandler implements an interceptor for the WalkRepos RPC, invoked when calling +// through Praefect. Instead of walking the storage directory in the filesystem, this Praefect +// implementation queries the database for all known repositories in the given virtual storage. +// As a consequence, the modification_time parameter can't be populated in the response. +func WalkReposHandler(rs datastore.RepositoryStore) grpc.StreamHandler { + return func(srv interface{}, stream grpc.ServerStream) error { + sendRepo := func(relPath string) error { + return stream.SendMsg(&gitalypb.WalkReposResponse{ + RelativePath: relPath, + }) + } + + var req gitalypb.WalkReposRequest + if err := stream.RecvMsg(&req); err != nil { + return fmt.Errorf("receive request: %w", err) + } + + if req.StorageName == "" { + return structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet) + } + + repos, err := rs.ListRepositoryPaths(stream.Context(), req.StorageName) + if err != nil { + return structerr.NewInternal("list repository paths: %w", err) + } + + for _, repo := range repos { + if err := sendRepo(repo); err != nil { + return structerr.NewInternal("send repository path: %w", err) + } + } + + return nil + } +} diff --git a/internal/praefect/walkrepos_test.go b/internal/praefect/walkrepos_test.go new file mode 100644 index 0000000000..63301caf58 --- /dev/null +++ b/internal/praefect/walkrepos_test.go @@ -0,0 +1,88 @@ +package praefect + +import ( + "net" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testdb" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestWalkReposHandler(t *testing.T) { + t.Parallel() + + db := testdb.New(t) + for _, tc := range []struct { + desc string + request *gitalypb.WalkReposRequest + responses []*gitalypb.WalkReposResponse + expectedErr error + }{ + { + desc: "missing storage name", + request: &gitalypb.WalkReposRequest{}, + expectedErr: structerr.NewInvalidArgument("%w", storage.ErrStorageNotSet), + }, + { + desc: "repositories found", + request: &gitalypb.WalkReposRequest{StorageName: "virtual-storage"}, + responses: []*gitalypb.WalkReposResponse{ + {RelativePath: "relative-path"}, + {RelativePath: "relative-path-2"}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + db.TruncateAll(t) + rs := datastore.NewPostgresRepositoryStore(db, map[string][]string{"virtual-storage": {"storage"}}) + ctx := testhelper.Context(t) + + require.NoError(t, rs.CreateRepository(ctx, 0, "virtual-storage", "relative-path", "relative-path", "storage", nil, nil, false, false)) + require.NoError(t, rs.CreateRepository(ctx, 1, "virtual-storage", "relative-path-2", "relative-path-2", "storage", nil, nil, false, false)) + + tmp := testhelper.TempDir(t) + + ln, err := net.Listen("unix", filepath.Join(tmp, "praefect")) + require.NoError(t, err) + + srv := NewGRPCServer(&Dependencies{ + Config: config.Config{Failover: config.Failover{ElectionStrategy: config.ElectionStrategyPerRepository}}, + Logger: testhelper.SharedLogger(t), + RepositoryStore: rs, + Registry: protoregistry.GitalyProtoPreregistered, + }, nil) + defer srv.Stop() + + go testhelper.MustServe(t, srv, ln) + + clientConn, err := grpc.DialContext(ctx, "unix://"+ln.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer testhelper.MustClose(t, clientConn) + + client := gitalypb.NewInternalGitalyClient(clientConn) + + stream, err := client.WalkRepos(ctx, tc.request) + if tc.expectedErr != nil { + // Consume the first message and test for errors only if we're expecting an error. + _, err = stream.Recv() + testhelper.RequireGrpcError(t, tc.expectedErr, err) + return + } + require.NoError(t, err) + + actualRepos, err := testhelper.Receive(stream.Recv) + require.NoError(t, err) + testhelper.ProtoEqual(t, tc.responses, actualRepos) + }) + } +} -- GitLab