From de8aea4c8d10e58d0f418611373fdce838ce5ba9 Mon Sep 17 00:00:00 2001
From: James Liu
Date: Fri, 8 Dec 2023 11:25:22 +1100
Subject: [PATCH 1/8] backup: Embed serverRepository in restore struct

Refactors the restoreRequest struct to embed the existing
serverRepository struct instead of redefining the same fields. This is
done as backup and restore jobs share the same inputs except for the
AlwaysCreate option.

Additionally, this is similar to how the structure is defined in Rails;
see:
https://gitlab.com/gitlab-org/gitlab/-/blob/master/lib/backup/gitaly_backup.rb
---
 internal/cli/gitalybackup/restore.go | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go
index 06a20bb6ed..64f9bf4368 100644
--- a/internal/cli/gitalybackup/restore.go
+++ b/internal/cli/gitalybackup/restore.go
@@ -17,11 +17,8 @@ import (
 )
 
 type restoreRequest struct {
-	storage.ServerInfo
-	StorageName   string `json:"storage_name"`
-	RelativePath  string `json:"relative_path"`
-	GlProjectPath string `json:"gl_project_path"`
-	AlwaysCreate  bool   `json:"always_create"`
+	serverRepository
+	AlwaysCreate bool `json:"always_create"`
 }
 
 type restoreSubcommand struct {
-- 
GitLab

From fd9753bdaac3f788e0e8f77283bf3c3258b2e10f Mon Sep 17 00:00:00 2001
From: James Liu
Date: Fri, 8 Dec 2023 12:06:15 +1100
Subject: [PATCH 2/8] backup: Consolidate Logging and Parallel pipelines

Merges the two existing pipeline implementations (LoggingPipeline and
ParallelPipeline) into ParallelPipeline for simplicity. A
ParallelPipeline that is concurrency-limited to a single worker
performs the same duty as the previous LoggingPipeline.

A future commit will rename ParallelPipeline to Pipeline.
---
 internal/backup/pipeline.go          | 96 +++++++++++-----------
 internal/backup/pipeline_test.go     | 10 ++-
 internal/cli/gitalybackup/create.go  | 13 ++--
 internal/cli/gitalybackup/restore.go | 13 ++--
 4 files changed, 59 insertions(+), 73 deletions(-)

diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go
index 8e4e15784d..470b4a402b 100644
--- a/internal/backup/pipeline.go
+++ b/internal/backup/pipeline.go
@@ -153,55 +153,14 @@ func (e PipelineErrors) Error() string {
 	return builder.String()
 }
 
-// LoggingPipeline outputs logging for each command executed
-type LoggingPipeline struct {
-	log log.Logger
-
-	mu   sync.Mutex
-	errs PipelineErrors
-}
-
-// NewLoggingPipeline creates a new logging pipeline
-func NewLoggingPipeline(log log.Logger) *LoggingPipeline {
-	return &LoggingPipeline{
-		log: log,
-	}
-}
-
-// Handle takes a command to process. Commands are logged and executed immediately.
-func (p *LoggingPipeline) Handle(ctx context.Context, cmd Command) { - log := p.cmdLogger(cmd) - log.Info(fmt.Sprintf("started %s", cmd.Name())) - - if err := cmd.Execute(ctx); err != nil { - if errors.Is(err, ErrSkipped) { - log.Warn(fmt.Sprintf("skipped %s", cmd.Name())) - } else { - log.WithError(err).Error(fmt.Sprintf("%s failed", cmd.Name())) - p.addError(cmd.Repository(), err) - } - return - } - - log.Info(fmt.Sprintf("completed %s", cmd.Name())) -} - -func (p *LoggingPipeline) addError(repo *gitalypb.Repository, err error) { - p.mu.Lock() - defer p.mu.Unlock() +func (p *ParallelPipeline) addError(repo *gitalypb.Repository, err error) { + p.errMu.Lock() + defer p.errMu.Unlock() p.errs.AddError(repo, err) } -// Done indicates that the pipeline is complete and returns any accumulated errors -func (p *LoggingPipeline) Done() error { - if len(p.errs) > 0 { - return fmt.Errorf("pipeline: %w", p.errs) - } - return nil -} - -func (p *LoggingPipeline) cmdLogger(cmd Command) log.Logger { +func (p *ParallelPipeline) cmdLogger(cmd Command) log.Logger { return p.log.WithFields(log.Fields{ "command": cmd.Name(), "storage_name": cmd.Repository().StorageName, @@ -217,7 +176,9 @@ type contextCommand struct { // ParallelPipeline is a pipeline that executes commands in parallel type ParallelPipeline struct { - next Pipeline + log log.Logger + errs PipelineErrors + parallel int parallelStorage int @@ -225,9 +186,10 @@ type ParallelPipeline struct { workerSlots chan struct{} done chan struct{} - mu sync.Mutex - requests map[string]chan *contextCommand - err error + errMu sync.Mutex + storageMu sync.Mutex + requests map[string]chan *contextCommand + err error } // NewParallelPipeline creates a new ParallelPipeline where all commands are @@ -239,7 +201,7 @@ type ParallelPipeline struct { // // Note: When both `parallel` and `parallelStorage` are zero or less no workers // are created and the pipeline will block forever. -func NewParallelPipeline(next Pipeline, parallel, parallelStorage int) *ParallelPipeline { +func NewParallelPipeline(log log.Logger, parallel, parallelStorage int) *ParallelPipeline { var workerSlots chan struct{} if parallel > 0 && parallelStorage > 0 { // workerSlots allows the total number of parallel jobs to be @@ -248,7 +210,7 @@ func NewParallelPipeline(next Pipeline, parallel, parallelStorage int) *Parallel workerSlots = make(chan struct{}, parallel) } return &ParallelPipeline{ - next: next, + log: log, parallel: parallel, parallelStorage: parallelStorage, workerSlots: workerSlots, @@ -277,20 +239,23 @@ func (p *ParallelPipeline) Handle(ctx context.Context, cmd Command) { func (p *ParallelPipeline) Done() error { close(p.done) p.wg.Wait() - if err := p.next.Done(); err != nil { - return err - } + if p.err != nil { return fmt.Errorf("pipeline: %w", p.err) } + + if len(p.errs) > 0 { + return fmt.Errorf("pipeline: %w", p.errs) + } + return nil } // getStorage finds the channel associated with a storage. When no channel is // found, one is created and n-workers are started to process requests. 
func (p *ParallelPipeline) getStorage(storage string) chan<- *contextCommand { - p.mu.Lock() - defer p.mu.Unlock() + p.storageMu.Lock() + defer p.storageMu.Unlock() workers := p.parallelStorage @@ -329,12 +294,25 @@ func (p *ParallelPipeline) processCommand(ctx context.Context, cmd Command) { p.acquireWorkerSlot() defer p.releaseWorkerSlot() - p.next.Handle(ctx, cmd) + log := p.cmdLogger(cmd) + log.Info(fmt.Sprintf("started %s", cmd.Name())) + + if err := cmd.Execute(ctx); err != nil { + if errors.Is(err, ErrSkipped) { + log.Warn(fmt.Sprintf("skipped %s", cmd.Name())) + } else { + log.WithError(err).Error(fmt.Sprintf("%s failed", cmd.Name())) + p.addError(cmd.Repository(), err) + } + return + } + + log.Info(fmt.Sprintf("completed %s", cmd.Name())) } func (p *ParallelPipeline) setErr(err error) { - p.mu.Lock() - defer p.mu.Unlock() + p.errMu.Lock() + defer p.errMu.Unlock() if p.err != nil { return } diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index fe3c82616c..e9eda038be 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -19,7 +19,7 @@ func TestLoggingPipeline(t *testing.T) { t.Parallel() testPipeline(t, func() Pipeline { - return NewLoggingPipeline(testhelper.SharedLogger(t)) + return NewParallelPipeline(testhelper.SharedLogger(t), 1, 1) }) } @@ -27,7 +27,7 @@ func TestParallelPipeline(t *testing.T) { t.Parallel() testPipeline(t, func() Pipeline { - return NewParallelPipeline(NewLoggingPipeline(testhelper.SharedLogger(t)), 2, 0) + return NewParallelPipeline(testhelper.SharedLogger(t), 2, 0) }) t.Run("parallelism", func(t *testing.T) { @@ -66,8 +66,7 @@ func TestParallelPipeline(t *testing.T) { }, } var p Pipeline - p = NewLoggingPipeline(testhelper.SharedLogger(t)) - p = NewParallelPipeline(p, tc.parallel, tc.parallelStorage) + p = NewParallelPipeline(testhelper.SharedLogger(t), tc.parallel, tc.parallelStorage) ctx := testhelper.Context(t) for i := 0; i < 10; i++ { @@ -82,8 +81,7 @@ func TestParallelPipeline(t *testing.T) { t.Run("context done", func(t *testing.T) { var strategy MockStrategy var p Pipeline - p = NewLoggingPipeline(testhelper.SharedLogger(t)) - p = NewParallelPipeline(p, 0, 0) // make sure worker channels always block + p = NewParallelPipeline(testhelper.SharedLogger(t), 0, 0) // make sure worker channels always block ctx, cancel := context.WithCancel(testhelper.Context(t)) diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go index a67cc75da1..34ccec989b 100644 --- a/internal/cli/gitalybackup/create.go +++ b/internal/cli/gitalybackup/create.go @@ -142,11 +142,16 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i manager = backup.NewManager(sink, locator, pool) } - var pipeline backup.Pipeline - pipeline = backup.NewLoggingPipeline(logger) - if cmd.parallel > 0 || cmd.parallelStorage > 0 { - pipeline = backup.NewParallelPipeline(pipeline, cmd.parallel, cmd.parallelStorage) + // Defaults to no concurrency. 
+ parallel := 1 + if cmd.parallel > 0 { + parallel = cmd.parallel } + parallelStorage := 1 + if cmd.parallelStorage > 0 { + parallelStorage = cmd.parallelStorage + } + pipeline := backup.NewParallelPipeline(logger, parallel, parallelStorage) decoder := json.NewDecoder(stdin) for { diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index 64f9bf4368..b195c14360 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -146,11 +146,16 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin } } - var pipeline backup.Pipeline - pipeline = backup.NewLoggingPipeline(logger) - if cmd.parallel > 0 || cmd.parallelStorage > 0 { - pipeline = backup.NewParallelPipeline(pipeline, cmd.parallel, cmd.parallelStorage) + // Defaults to no concurrency. + parallel := 1 + if cmd.parallel > 0 { + parallel = cmd.parallel } + parallelStorage := 1 + if cmd.parallelStorage > 0 { + parallelStorage = cmd.parallelStorage + } + pipeline := backup.NewParallelPipeline(logger, parallel, parallelStorage) decoder := json.NewDecoder(stdin) for { -- GitLab From 3c1d745a3216187bdf6a30ae52639b4488ae3dbb Mon Sep 17 00:00:00 2001 From: James Liu Date: Fri, 8 Dec 2023 12:16:14 +1100 Subject: [PATCH 3/8] backup: Rename ParallelPipeline to Pipeline Now that we have a single Pipeline implementation, we can drop the Parallel prefix from the Pipeline name. This also removes the need for the separate Pipeline interface, which can now be deleted too. --- internal/backup/pipeline.go | 65 +++++++++++++--------------- internal/backup/pipeline_test.go | 16 +++---- internal/cli/gitalybackup/create.go | 2 +- internal/cli/gitalybackup/restore.go | 2 +- 4 files changed, 38 insertions(+), 47 deletions(-) diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 470b4a402b..a734a13822 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -66,13 +66,6 @@ type Command interface { Execute(context.Context) error } -// Pipeline executes a series of commands and encapsulates error handling for -// the caller. -type Pipeline interface { - Handle(context.Context, Command) - Done() error -} - // CreateCommand creates a backup for a repository type CreateCommand struct { strategy Strategy @@ -153,29 +146,13 @@ func (e PipelineErrors) Error() string { return builder.String() } -func (p *ParallelPipeline) addError(repo *gitalypb.Repository, err error) { - p.errMu.Lock() - defer p.errMu.Unlock() - - p.errs.AddError(repo, err) -} - -func (p *ParallelPipeline) cmdLogger(cmd Command) log.Logger { - return p.log.WithFields(log.Fields{ - "command": cmd.Name(), - "storage_name": cmd.Repository().StorageName, - "relative_path": cmd.Repository().RelativePath, - "gl_project_path": cmd.Repository().GlProjectPath, - }) -} - type contextCommand struct { Command Command Context context.Context } -// ParallelPipeline is a pipeline that executes commands in parallel -type ParallelPipeline struct { +// Pipeline is a pipeline that executes commands in parallel +type Pipeline struct { log log.Logger errs PipelineErrors @@ -192,7 +169,7 @@ type ParallelPipeline struct { err error } -// NewParallelPipeline creates a new ParallelPipeline where all commands are +// NewPipeline creates a new Pipeline where all commands are // passed onto `next` to be processed, `parallel` is the maximum number of // parallel backups that will run and `parallelStorage` is the maximum number // of parallel backups that will run per storage. 
Since the number of storages @@ -201,7 +178,7 @@ type ParallelPipeline struct { // // Note: When both `parallel` and `parallelStorage` are zero or less no workers // are created and the pipeline will block forever. -func NewParallelPipeline(log log.Logger, parallel, parallelStorage int) *ParallelPipeline { +func NewPipeline(log log.Logger, parallel, parallelStorage int) *Pipeline { var workerSlots chan struct{} if parallel > 0 && parallelStorage > 0 { // workerSlots allows the total number of parallel jobs to be @@ -209,7 +186,7 @@ func NewParallelPipeline(log log.Logger, parallel, parallelStorage int) *Paralle // each storage, while still limiting the absolute parallelism. workerSlots = make(chan struct{}, parallel) } - return &ParallelPipeline{ + return &Pipeline{ log: log, parallel: parallel, parallelStorage: parallelStorage, @@ -221,7 +198,7 @@ func NewParallelPipeline(log log.Logger, parallel, parallelStorage int) *Paralle // Handle queues a request to create a backup. Commands are processed by // n-workers per storage. -func (p *ParallelPipeline) Handle(ctx context.Context, cmd Command) { +func (p *Pipeline) Handle(ctx context.Context, cmd Command) { ch := p.getStorage(cmd.Repository().StorageName) select { @@ -236,7 +213,7 @@ func (p *ParallelPipeline) Handle(ctx context.Context, cmd Command) { // Done waits for any in progress calls to `next` to complete then reports any // accumulated errors -func (p *ParallelPipeline) Done() error { +func (p *Pipeline) Done() error { close(p.done) p.wg.Wait() @@ -253,7 +230,7 @@ func (p *ParallelPipeline) Done() error { // getStorage finds the channel associated with a storage. When no channel is // found, one is created and n-workers are started to process requests. -func (p *ParallelPipeline) getStorage(storage string) chan<- *contextCommand { +func (p *Pipeline) getStorage(storage string) chan<- *contextCommand { p.storageMu.Lock() defer p.storageMu.Unlock() @@ -278,7 +255,7 @@ func (p *ParallelPipeline) getStorage(storage string) chan<- *contextCommand { return ch } -func (p *ParallelPipeline) worker(ch <-chan *contextCommand) { +func (p *Pipeline) worker(ch <-chan *contextCommand) { defer p.wg.Done() for { select { @@ -290,7 +267,7 @@ func (p *ParallelPipeline) worker(ch <-chan *contextCommand) { } } -func (p *ParallelPipeline) processCommand(ctx context.Context, cmd Command) { +func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { p.acquireWorkerSlot() defer p.releaseWorkerSlot() @@ -310,7 +287,7 @@ func (p *ParallelPipeline) processCommand(ctx context.Context, cmd Command) { log.Info(fmt.Sprintf("completed %s", cmd.Name())) } -func (p *ParallelPipeline) setErr(err error) { +func (p *Pipeline) setErr(err error) { p.errMu.Lock() defer p.errMu.Unlock() if p.err != nil { @@ -319,9 +296,25 @@ func (p *ParallelPipeline) setErr(err error) { p.err = err } +func (p *Pipeline) addError(repo *gitalypb.Repository, err error) { + p.errMu.Lock() + defer p.errMu.Unlock() + + p.errs.AddError(repo, err) +} + +func (p *Pipeline) cmdLogger(cmd Command) log.Logger { + return p.log.WithFields(log.Fields{ + "command": cmd.Name(), + "storage_name": cmd.Repository().StorageName, + "relative_path": cmd.Repository().RelativePath, + "gl_project_path": cmd.Repository().GlProjectPath, + }) +} + // acquireWorkerSlot queues the worker until a slot is available. 
// It never blocks if `parallel` or `parallelStorage` are 0 -func (p *ParallelPipeline) acquireWorkerSlot() { +func (p *Pipeline) acquireWorkerSlot() { if p.workerSlots == nil { return } @@ -329,7 +322,7 @@ func (p *ParallelPipeline) acquireWorkerSlot() { } // releaseWorkerSlot releases the worker slot. -func (p *ParallelPipeline) releaseWorkerSlot() { +func (p *Pipeline) releaseWorkerSlot() { if p.workerSlots == nil { return } diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index e9eda038be..deb31c7d0f 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -18,16 +18,16 @@ import ( func TestLoggingPipeline(t *testing.T) { t.Parallel() - testPipeline(t, func() Pipeline { - return NewParallelPipeline(testhelper.SharedLogger(t), 1, 1) + testPipeline(t, func() *Pipeline { + return NewPipeline(testhelper.SharedLogger(t), 1, 1) }) } func TestParallelPipeline(t *testing.T) { t.Parallel() - testPipeline(t, func() Pipeline { - return NewParallelPipeline(testhelper.SharedLogger(t), 2, 0) + testPipeline(t, func() *Pipeline { + return NewPipeline(testhelper.SharedLogger(t), 2, 0) }) t.Run("parallelism", func(t *testing.T) { @@ -65,8 +65,7 @@ func TestParallelPipeline(t *testing.T) { return nil }, } - var p Pipeline - p = NewParallelPipeline(testhelper.SharedLogger(t), tc.parallel, tc.parallelStorage) + p := NewPipeline(testhelper.SharedLogger(t), tc.parallel, tc.parallelStorage) ctx := testhelper.Context(t) for i := 0; i < 10; i++ { @@ -80,8 +79,7 @@ func TestParallelPipeline(t *testing.T) { t.Run("context done", func(t *testing.T) { var strategy MockStrategy - var p Pipeline - p = NewParallelPipeline(testhelper.SharedLogger(t), 0, 0) // make sure worker channels always block + p := NewPipeline(testhelper.SharedLogger(t), 0, 0) // make sure worker channels always block ctx, cancel := context.WithCancel(testhelper.Context(t)) @@ -122,7 +120,7 @@ func (s MockStrategy) RemoveAllRepositories(ctx context.Context, req *RemoveAllR return nil } -func testPipeline(t *testing.T, init func() Pipeline) { +func testPipeline(t *testing.T, init func() *Pipeline) { strategy := MockStrategy{ CreateFunc: func(_ context.Context, req *CreateRequest) error { switch req.Repository.StorageName { diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go index 34ccec989b..64347b0e32 100644 --- a/internal/cli/gitalybackup/create.go +++ b/internal/cli/gitalybackup/create.go @@ -151,7 +151,7 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i if cmd.parallelStorage > 0 { parallelStorage = cmd.parallelStorage } - pipeline := backup.NewParallelPipeline(logger, parallel, parallelStorage) + pipeline := backup.NewPipeline(logger, parallel, parallelStorage) decoder := json.NewDecoder(stdin) for { diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index b195c14360..c86f2067da 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -155,7 +155,7 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin if cmd.parallelStorage > 0 { parallelStorage = cmd.parallelStorage } - pipeline := backup.NewParallelPipeline(logger, parallel, parallelStorage) + pipeline := backup.NewPipeline(logger, parallel, parallelStorage) decoder := json.NewDecoder(stdin) for { -- GitLab From cb57a73f569aa3a8b34956f47e32b534d62599cd Mon Sep 17 00:00:00 2001 From: James Liu Date: Fri, 8 Dec 2023 12:21:34 +1100 Subject: [PATCH 4/8] backup: 
Unexport pipelineErrors It's not used outside of the backup package. --- internal/backup/pipeline.go | 10 +++++----- internal/backup/pipeline_test.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index a734a13822..65094788c9 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -124,11 +124,11 @@ func (cmd RestoreCommand) Execute(ctx context.Context) error { return cmd.strategy.Restore(ctx, &cmd.request) } -// PipelineErrors represents a summary of errors by repository -type PipelineErrors []error +// pipelineErrors represents a summary of errors by repository +type pipelineErrors []error // AddError adds an error associated with a repository to the summary. -func (e *PipelineErrors) AddError(repo *gitalypb.Repository, err error) { +func (e *pipelineErrors) AddError(repo *gitalypb.Repository, err error) { if repo.GetGlProjectPath() != "" { err = fmt.Errorf("%s (%s): %w", repo.GetRelativePath(), repo.GetGlProjectPath(), err) } else { @@ -137,7 +137,7 @@ func (e *PipelineErrors) AddError(repo *gitalypb.Repository, err error) { *e = append(*e, err) } -func (e PipelineErrors) Error() string { +func (e pipelineErrors) Error() string { var builder strings.Builder _, _ = fmt.Fprintf(&builder, "%d failures encountered:\n", len(e)) for _, err := range e { @@ -154,7 +154,7 @@ type contextCommand struct { // Pipeline is a pipeline that executes commands in parallel type Pipeline struct { log log.Logger - errs PipelineErrors + errs pipelineErrors parallel int parallelStorage int diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index deb31c7d0f..bb25a4179e 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -273,7 +273,7 @@ func TestPipelineError(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - err := PipelineErrors{} + err := pipelineErrors{} for _, repo := range tc.repos { err.AddError(repo, assert.AnError) -- GitLab From 10912dca956c655cd22b258fa2b8e59113100e15 Mon Sep 17 00:00:00 2001 From: James Liu Date: Fri, 8 Dec 2023 13:59:11 +1100 Subject: [PATCH 5/8] backup: Add option to set Pipeline concurrency Uses the option function pattern to set the concurrency of a pipeline during initialisation. This makes for a more readable flow, and allows us to set the default state of nil-concurrency internally. --- internal/backup/pipeline.go | 88 ++++++++++++++++++---------- internal/backup/pipeline_test.go | 30 +++++----- internal/cli/gitalybackup/create.go | 14 ++--- internal/cli/gitalybackup/restore.go | 14 ++--- 4 files changed, 86 insertions(+), 60 deletions(-) diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 65094788c9..c13ce8e995 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -151,7 +151,7 @@ type contextCommand struct { Context context.Context } -// Pipeline is a pipeline that executes commands in parallel +// Pipeline is a pipeline for running backup and restore jobs. type Pipeline struct { log log.Logger errs pipelineErrors @@ -159,7 +159,10 @@ type Pipeline struct { parallel int parallelStorage int - wg sync.WaitGroup + wg sync.WaitGroup + // workerSlots allows the total number of parallel jobs to be + // limited. This allows us to create the required workers for + // each storage, while still limiting the absolute parallelism. 
 	workerSlots chan struct{}
 	done        chan struct{}
 
@@ -169,35 +172,60 @@ type Pipeline struct {
 	err       error
 }
 
-// NewPipeline creates a new Pipeline where all commands are
-// passed onto `next` to be processed, `parallel` is the maximum number of
-// parallel backups that will run and `parallelStorage` is the maximum number
-// of parallel backups that will run per storage. Since the number of storages
-// is unknown at initialisation, workers are created lazily as new storage
-// names are encountered.
-//
-// Note: When both `parallel` and `parallelStorage` are zero or less no workers
-// are created and the pipeline will block forever.
-func NewPipeline(log log.Logger, parallel, parallelStorage int) *Pipeline {
-	var workerSlots chan struct{}
-	if parallel > 0 && parallelStorage > 0 {
-		// workerSlots allows the total number of parallel jobs to be
-		// limited. This allows us to create the required workers for
-		// each storage, while still limiting the absolute parallelism.
-		workerSlots = make(chan struct{}, parallel)
-	}
-	return &Pipeline{
-		log:             log,
-		parallel:        parallel,
-		parallelStorage: parallelStorage,
-		workerSlots:     workerSlots,
+// NewPipeline creates a pipeline that executes backup and restore jobs.
+// The pipeline executes sequentially by default, but can be made concurrent
+// by calling WithConcurrency() after initialisation.
+func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) {
+	p := &Pipeline{
+		log: log,
+		// Default to no concurrency.
+		parallel:        1,
+		parallelStorage: 0,
 		done:            make(chan struct{}),
 		requests:        make(map[string]chan *contextCommand),
 	}
+
+	for _, opt := range opts {
+		if err := opt(p); err != nil {
+			return nil, err
+		}
+	}
+
+	return p, nil
+}
+
+// PipelineOption represents an optional configuration parameter for the Pipeline.
+type PipelineOption func(*Pipeline) error
+
+// WithConcurrency configures the pipeline to run backup and restore jobs concurrently.
+// total defines the absolute maximum number of jobs that the pipeline should execute
+// concurrently. perStorage defines the number of jobs per Gitaly storage that the
+// pipeline should attempt to execute concurrently.
+//
+// For example, in a Gitaly deployment with 2 storages, WithConcurrency(3, 2) means
+// that at most 3 jobs will execute concurrently, despite 2 concurrent jobs being allowed
+// per storage (2*2=4).
+func WithConcurrency(total, perStorage int) PipelineOption {
+	return func(p *Pipeline) error {
+		if total == 0 && perStorage == 0 {
+			return errors.New("total and perStorage cannot both be 0")
+		}
+
+		p.parallel = total
+		p.parallelStorage = perStorage
+
+		if total > 0 && perStorage > 0 {
+			// When both values are provided, we ensure that total limits
+			// the global concurrency.
+			p.workerSlots = make(chan struct{}, total)
+		}
+
+		return nil
+	}
 }
 
-// Handle queues a request to create a backup. Commands are processed by
-// n-workers per storage.
+// Handle queues a request to create a backup. Commands are processed sequentially,
+// or concurrently if WithConcurrency() was called.
func (p *Pipeline) Handle(ctx context.Context, cmd Command) { ch := p.getStorage(cmd.Repository().StorageName) @@ -211,8 +239,7 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) { } } -// Done waits for any in progress calls to `next` to complete then reports any -// accumulated errors +// Done waits for any in progress jobs to complete then reports any accumulated errors func (p *Pipeline) Done() error { close(p.done) p.wg.Wait() @@ -230,13 +257,15 @@ func (p *Pipeline) Done() error { // getStorage finds the channel associated with a storage. When no channel is // found, one is created and n-workers are started to process requests. +// If parallelStorage is 0, a channel is created against a pseudo-storage to +// enforce the number of total concurrent jobs. func (p *Pipeline) getStorage(storage string) chan<- *contextCommand { p.storageMu.Lock() defer p.storageMu.Unlock() workers := p.parallelStorage - if p.parallelStorage < 1 { + if p.parallelStorage == 0 { // if the workers are not limited by storage, then pretend there is a single storage with `parallel` workers storage = "" workers = p.parallel @@ -313,7 +342,6 @@ func (p *Pipeline) cmdLogger(cmd Command) log.Logger { } // acquireWorkerSlot queues the worker until a slot is available. -// It never blocks if `parallel` or `parallelStorage` are 0 func (p *Pipeline) acquireWorkerSlot() { if p.workerSlots == nil { return diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index bb25a4179e..a0a4f22051 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -15,21 +15,17 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) -func TestLoggingPipeline(t *testing.T) { +func TestPipeline(t *testing.T) { t.Parallel() + // Sequential testPipeline(t, func() *Pipeline { - return NewPipeline(testhelper.SharedLogger(t), 1, 1) - }) -} - -func TestParallelPipeline(t *testing.T) { - t.Parallel() - - testPipeline(t, func() *Pipeline { - return NewPipeline(testhelper.SharedLogger(t), 2, 0) + p, err := NewPipeline(testhelper.SharedLogger(t)) + require.NoError(t, err) + return p }) + // Concurrent t.Run("parallelism", func(t *testing.T) { for _, tc := range []struct { parallel int @@ -51,6 +47,11 @@ func TestParallelPipeline(t *testing.T) { parallelStorage: 3, expectedMaxParallel: 6, // 2 storages * 3 workers per storage }, + { + parallel: 3, + parallelStorage: 2, + expectedMaxParallel: 3, // `parallel` takes priority, which is why 2 storages * 2 workers is not the max + }, } { t.Run(fmt.Sprintf("parallel:%d,parallelStorage:%d", tc.parallel, tc.parallelStorage), func(t *testing.T) { var calls int64 @@ -65,7 +66,8 @@ func TestParallelPipeline(t *testing.T) { return nil }, } - p := NewPipeline(testhelper.SharedLogger(t), tc.parallel, tc.parallelStorage) + p, err := NewPipeline(testhelper.SharedLogger(t), WithConcurrency(tc.parallel, tc.parallelStorage)) + require.NoError(t, err) ctx := testhelper.Context(t) for i := 0; i < 10; i++ { @@ -79,7 +81,8 @@ func TestParallelPipeline(t *testing.T) { t.Run("context done", func(t *testing.T) { var strategy MockStrategy - p := NewPipeline(testhelper.SharedLogger(t), 0, 0) // make sure worker channels always block + p, err := NewPipeline(testhelper.SharedLogger(t)) + require.NoError(t, err) ctx, cancel := context.WithCancel(testhelper.Context(t)) @@ -88,8 +91,7 @@ func TestParallelPipeline(t *testing.T) { p.Handle(ctx, NewCreateCommand(strategy, CreateRequest{Repository: &gitalypb.Repository{StorageName: "default"}})) - err := p.Done() 
- require.EqualError(t, err, "pipeline: context canceled") + require.EqualError(t, p.Done(), "pipeline: context canceled") }) } diff --git a/internal/cli/gitalybackup/create.go b/internal/cli/gitalybackup/create.go index 64347b0e32..288e5a08d0 100644 --- a/internal/cli/gitalybackup/create.go +++ b/internal/cli/gitalybackup/create.go @@ -142,16 +142,14 @@ func (cmd *createSubcommand) run(ctx context.Context, logger log.Logger, stdin i manager = backup.NewManager(sink, locator, pool) } - // Defaults to no concurrency. - parallel := 1 - if cmd.parallel > 0 { - parallel = cmd.parallel + var opts []backup.PipelineOption + if cmd.parallel > 0 || cmd.parallelStorage > 0 { + opts = append(opts, backup.WithConcurrency(cmd.parallel, cmd.parallelStorage)) } - parallelStorage := 1 - if cmd.parallelStorage > 0 { - parallelStorage = cmd.parallelStorage + pipeline, err := backup.NewPipeline(logger, opts...) + if err != nil { + return fmt.Errorf("create pipeline: %w", err) } - pipeline := backup.NewPipeline(logger, parallel, parallelStorage) decoder := json.NewDecoder(stdin) for { diff --git a/internal/cli/gitalybackup/restore.go b/internal/cli/gitalybackup/restore.go index c86f2067da..de9e2cd3d3 100644 --- a/internal/cli/gitalybackup/restore.go +++ b/internal/cli/gitalybackup/restore.go @@ -146,16 +146,14 @@ func (cmd *restoreSubcommand) run(ctx context.Context, logger log.Logger, stdin } } - // Defaults to no concurrency. - parallel := 1 - if cmd.parallel > 0 { - parallel = cmd.parallel + var opts []backup.PipelineOption + if cmd.parallel > 0 || cmd.parallelStorage > 0 { + opts = append(opts, backup.WithConcurrency(cmd.parallel, cmd.parallelStorage)) } - parallelStorage := 1 - if cmd.parallelStorage > 0 { - parallelStorage = cmd.parallelStorage + pipeline, err := backup.NewPipeline(logger, opts...) + if err != nil { + return fmt.Errorf("create pipeline: %w", err) } - pipeline := backup.NewPipeline(logger, parallel, parallelStorage) decoder := json.NewDecoder(stdin) for { -- GitLab From 2602dbd41fa65a63e6cd5cd370b0688bfbb40aa3 Mon Sep 17 00:00:00 2001 From: James Liu Date: Fri, 8 Dec 2023 14:43:40 +1100 Subject: [PATCH 6/8] backup: Reorganise Pipeline fields for readability --- internal/backup/pipeline.go | 82 +++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 40 deletions(-) diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index c13ce8e995..2684e71823 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -153,23 +153,27 @@ type contextCommand struct { // Pipeline is a pipeline for running backup and restore jobs. type Pipeline struct { - log log.Logger - errs pipelineErrors + log log.Logger parallel int parallelStorage int - wg sync.WaitGroup - // workerSlots allows the total number of parallel jobs to be + // totalWorkers allows the total number of parallel jobs to be // limited. This allows us to create the required workers for // each storage, while still limiting the absolute parallelism. - workerSlots chan struct{} - done chan struct{} + totalWorkers chan struct{} - errMu sync.Mutex - storageMu sync.Mutex - requests map[string]chan *contextCommand - err error + workerWg sync.WaitGroup + workersByStorage map[string]chan *contextCommand + workersByStorageMu sync.Mutex + + // done signals that no more commands will be provided to the Pipeline via + // Handle(), and the pipeline should wait for workers to complete and exit. 
+	done chan struct{}
+
+	pipelineError error
+	commandErrors pipelineErrors
+	commandErrorsMu sync.Mutex
 }
 
 // NewPipeline creates a pipeline that executes backup and restore jobs.
@@ -179,10 +183,10 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) {
 	p := &Pipeline{
 		log: log,
 		// Default to no concurrency.
-		parallel:        1,
-		parallelStorage: 0,
-		done:            make(chan struct{}),
-		requests:        make(map[string]chan *contextCommand),
+		parallel:         1,
+		parallelStorage:  0,
+		done:             make(chan struct{}),
+		workersByStorage: make(map[string]chan *contextCommand),
 	}
 
 	for _, opt := range opts {
@@ -217,7 +221,7 @@ func WithConcurrency(total, perStorage int) PipelineOption {
 		if total > 0 && perStorage > 0 {
 			// When both values are provided, we ensure that total limits
 			// the global concurrency.
-			p.workerSlots = make(chan struct{}, total)
+			p.totalWorkers = make(chan struct{}, total)
 		}
 
 		return nil
@@ -227,7 +231,7 @@ func WithConcurrency(total, perStorage int) PipelineOption {
 // Handle queues a request to create a backup. Commands are processed sequentially,
 // or concurrently if WithConcurrency() was called.
 func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
-	ch := p.getStorage(cmd.Repository().StorageName)
+	ch := p.getWorker(cmd.Repository().StorageName)
 
 	select {
 	case <-ctx.Done():
@@ -242,26 +246,26 @@ func (p *Pipeline) Handle(ctx context.Context, cmd Command) {
 // Done waits for any in progress jobs to complete then reports any accumulated errors
 func (p *Pipeline) Done() error {
 	close(p.done)
-	p.wg.Wait()
+	p.workerWg.Wait()
 
-	if p.err != nil {
-		return fmt.Errorf("pipeline: %w", p.err)
+	if p.pipelineError != nil {
+		return fmt.Errorf("pipeline: %w", p.pipelineError)
 	}
 
-	if len(p.errs) > 0 {
-		return fmt.Errorf("pipeline: %w", p.errs)
+	if len(p.commandErrors) > 0 {
+		return fmt.Errorf("pipeline: %w", p.commandErrors)
 	}
 
 	return nil
 }
 
-// getStorage finds the channel associated with a storage. When no channel is
+// getWorker finds the channel associated with a storage. When no channel is
 // found, one is created and n-workers are started to process requests.
 // If parallelStorage is 0, a channel is created against a pseudo-storage to
 // enforce the number of total concurrent jobs.
-func (p *Pipeline) getStorage(storage string) chan<- *contextCommand { - p.storageMu.Lock() - defer p.storageMu.Unlock() +func (p *Pipeline) getWorker(storage string) chan<- *contextCommand { + p.workersByStorageMu.Lock() + defer p.workersByStorageMu.Unlock() workers := p.parallelStorage @@ -271,13 +275,13 @@ func (p *Pipeline) getStorage(storage string) chan<- *contextCommand { workers = p.parallel } - ch, ok := p.requests[storage] + ch, ok := p.workersByStorage[storage] if !ok { ch = make(chan *contextCommand) - p.requests[storage] = ch + p.workersByStorage[storage] = ch for i := 0; i < workers; i++ { - p.wg.Add(1) + p.workerWg.Add(1) go p.worker(ch) } } @@ -285,7 +289,7 @@ func (p *Pipeline) getStorage(storage string) chan<- *contextCommand { } func (p *Pipeline) worker(ch <-chan *contextCommand) { - defer p.wg.Done() + defer p.workerWg.Done() for { select { case <-p.done: @@ -317,19 +321,17 @@ func (p *Pipeline) processCommand(ctx context.Context, cmd Command) { } func (p *Pipeline) setErr(err error) { - p.errMu.Lock() - defer p.errMu.Unlock() - if p.err != nil { + if p.pipelineError != nil { return } - p.err = err + p.pipelineError = err } func (p *Pipeline) addError(repo *gitalypb.Repository, err error) { - p.errMu.Lock() - defer p.errMu.Unlock() + p.commandErrorsMu.Lock() + defer p.commandErrorsMu.Unlock() - p.errs.AddError(repo, err) + p.commandErrors.AddError(repo, err) } func (p *Pipeline) cmdLogger(cmd Command) log.Logger { @@ -343,16 +345,16 @@ func (p *Pipeline) cmdLogger(cmd Command) log.Logger { // acquireWorkerSlot queues the worker until a slot is available. func (p *Pipeline) acquireWorkerSlot() { - if p.workerSlots == nil { + if p.totalWorkers == nil { return } - p.workerSlots <- struct{}{} + p.totalWorkers <- struct{}{} } // releaseWorkerSlot releases the worker slot. func (p *Pipeline) releaseWorkerSlot() { - if p.workerSlots == nil { + if p.totalWorkers == nil { return } - <-p.workerSlots + <-p.totalWorkers } -- GitLab From 7e9eb65bc798112b498760e3ce4456cc2a2a89da Mon Sep 17 00:00:00 2001 From: James Liu Date: Mon, 11 Dec 2023 14:49:27 +1100 Subject: [PATCH 7/8] backup: Track concurrency per storage in tests Modifies the parallelism tests for the Pipeline so the number of concurrent jobs can be tracked per storage. This allows us to ensure that both the `parallel` and `parallelStorage` thresholds are being respected, and will come in handy for a future optimisation that will attempt to balance operations across storages. 
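
The tracking itself needs no more than a mutex-guarded map of in-flight
jobs keyed by storage name. As a rough sketch of the pattern (the
`track` helper and its names are illustrative, not identifiers used in
the test):

    // track records one in-flight job for a storage and reports the
    // per-storage and total counts observed while the lock is held.
    func track(mu *sync.Mutex, inFlight map[string]int, storage string) (perStorage, total int) {
        mu.Lock()
        defer mu.Unlock()
        inFlight[storage]++
        for _, n := range inFlight {
            total += n
        }
        return inFlight[storage], total
    }

The counter is decremented in a deferred function once the job returns,
so assertions against both thresholds always observe a consistent
snapshot.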
--- internal/backup/pipeline_test.go | 66 ++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 21 deletions(-) diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index a0a4f22051..f4164d26bf 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -3,7 +3,7 @@ package backup import ( "context" "fmt" - "sync/atomic" + "sync" "testing" "time" @@ -28,39 +28,63 @@ func TestPipeline(t *testing.T) { // Concurrent t.Run("parallelism", func(t *testing.T) { for _, tc := range []struct { - parallel int - parallelStorage int - expectedMaxParallel int64 + parallel int + parallelStorage int + expectedMaxParallel int + expectedMaxStorageParallel int }{ { - parallel: 2, - parallelStorage: 0, - expectedMaxParallel: 2, + parallel: 2, + parallelStorage: 0, + expectedMaxParallel: 2, + expectedMaxStorageParallel: 2, }, { - parallel: 2, - parallelStorage: 3, - expectedMaxParallel: 2, + parallel: 2, + parallelStorage: 3, + expectedMaxParallel: 2, + expectedMaxStorageParallel: 2, }, { - parallel: 0, - parallelStorage: 3, - expectedMaxParallel: 6, // 2 storages * 3 workers per storage + parallel: 0, + parallelStorage: 3, + expectedMaxParallel: 6, // 2 storages * 3 workers per storage + expectedMaxStorageParallel: 3, }, { - parallel: 3, - parallelStorage: 2, - expectedMaxParallel: 3, // `parallel` takes priority, which is why 2 storages * 2 workers is not the max + parallel: 3, + parallelStorage: 2, + expectedMaxParallel: 3, + expectedMaxStorageParallel: 2, }, } { t.Run(fmt.Sprintf("parallel:%d,parallelStorage:%d", tc.parallel, tc.parallelStorage), func(t *testing.T) { - var calls int64 + var mu sync.Mutex + // callsPerStorage tracks the number of concurrent jobs running for each storage. + callsPerStorage := map[string]int{ + "storage1": 0, + "storage2": 0, + } + strategy := MockStrategy{ CreateFunc: func(ctx context.Context, req *CreateRequest) error { - currentCalls := atomic.AddInt64(&calls, 1) - defer atomic.AddInt64(&calls, -1) - - assert.LessOrEqual(t, currentCalls, tc.expectedMaxParallel) + mu.Lock() + callsPerStorage[req.Repository.StorageName]++ + allCalls := 0 + for _, v := range callsPerStorage { + allCalls += v + } + // We ensure that the concurrency for each storage is not above the + // parallelStorage threshold, and also that the total number of concurrent + // jobs is not above the parallel threshold. + require.LessOrEqual(t, callsPerStorage[req.Repository.StorageName], tc.expectedMaxStorageParallel) + require.LessOrEqual(t, allCalls, tc.expectedMaxParallel) + mu.Unlock() + defer func() { + mu.Lock() + callsPerStorage[req.Repository.StorageName]-- + mu.Unlock() + }() time.Sleep(time.Millisecond) return nil -- GitLab From 073f6348207f110294ff9117edc9ba10153e8e57 Mon Sep 17 00:00:00 2001 From: James Liu Date: Mon, 18 Dec 2023 11:22:05 +1100 Subject: [PATCH 8/8] backup: Rename pipelineErrors and encapsulate mu Renames pipelineErrors to commandErrors and encapsulates the mutex so locking can be managed internally. 
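
Reduced to its essentials, this is the usual Go pattern of keeping the
mutex next to the slice it guards (a simplified sketch; the full error
formatting and the remaining Pipeline fields are elided):

    type commandErrors struct {
        errs []error
        mu   sync.Mutex
    }

    // AddError is safe for concurrent use.
    func (c *commandErrors) AddError(repo *gitalypb.Repository, err error) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.errs = append(c.errs, fmt.Errorf("%s: %w", repo.GetRelativePath(), err))
    }

With the lock owned by the collector, the pipeline's own addError
wrapper collapses to a single call with no external synchronisation.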
--- internal/backup/pipeline.go | 37 ++++++++++++++++++-------------- internal/backup/pipeline_test.go | 2 +- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/internal/backup/pipeline.go b/internal/backup/pipeline.go index 2684e71823..5672493acd 100644 --- a/internal/backup/pipeline.go +++ b/internal/backup/pipeline.go @@ -124,23 +124,31 @@ func (cmd RestoreCommand) Execute(ctx context.Context) error { return cmd.strategy.Restore(ctx, &cmd.request) } -// pipelineErrors represents a summary of errors by repository -type pipelineErrors []error +// commandErrors represents a summary of errors by repository +// +//nolint:errname +type commandErrors struct { + errs []error + mu sync.Mutex +} // AddError adds an error associated with a repository to the summary. -func (e *pipelineErrors) AddError(repo *gitalypb.Repository, err error) { +func (c *commandErrors) AddError(repo *gitalypb.Repository, err error) { + c.mu.Lock() + defer c.mu.Unlock() + if repo.GetGlProjectPath() != "" { err = fmt.Errorf("%s (%s): %w", repo.GetRelativePath(), repo.GetGlProjectPath(), err) } else { err = fmt.Errorf("%s: %w", repo.GetRelativePath(), err) } - *e = append(*e, err) + c.errs = append(c.errs, err) } -func (e pipelineErrors) Error() string { +func (c *commandErrors) Error() string { var builder strings.Builder - _, _ = fmt.Fprintf(&builder, "%d failures encountered:\n", len(e)) - for _, err := range e { + _, _ = fmt.Fprintf(&builder, "%d failures encountered:\n", len(c.errs)) + for _, err := range c.errs { _, _ = fmt.Fprintf(&builder, " - %s\n", err.Error()) } return builder.String() @@ -171,9 +179,8 @@ type Pipeline struct { // Handle(), and the pipeline should wait for workers to complete and exit. done chan struct{} - pipelineError error - commandErrors pipelineErrors - commandErrorsMu sync.Mutex + pipelineError error + cmdErrors *commandErrors } // NewPipeline creates a pipeline that executes backup and restore jobs. @@ -187,6 +194,7 @@ func NewPipeline(log log.Logger, opts ...PipelineOption) (*Pipeline, error) { parallelStorage: 0, done: make(chan struct{}), workersByStorage: make(map[string]chan *contextCommand), + cmdErrors: &commandErrors{}, } for _, opt := range opts { @@ -252,8 +260,8 @@ func (p *Pipeline) Done() error { return fmt.Errorf("pipeline: %w", p.pipelineError) } - if len(p.commandErrors) > 0 { - return fmt.Errorf("pipeline: %w", p.commandErrors) + if len(p.cmdErrors.errs) > 0 { + return fmt.Errorf("pipeline: %w", p.cmdErrors) } return nil @@ -328,10 +336,7 @@ func (p *Pipeline) setErr(err error) { } func (p *Pipeline) addError(repo *gitalypb.Repository, err error) { - p.commandErrorsMu.Lock() - defer p.commandErrorsMu.Unlock() - - p.commandErrors.AddError(repo, err) + p.cmdErrors.AddError(repo, err) } func (p *Pipeline) cmdLogger(cmd Command) log.Logger { diff --git a/internal/backup/pipeline_test.go b/internal/backup/pipeline_test.go index f4164d26bf..04c539a5f4 100644 --- a/internal/backup/pipeline_test.go +++ b/internal/backup/pipeline_test.go @@ -299,7 +299,7 @@ func TestPipelineError(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - err := pipelineErrors{} + err := &commandErrors{} for _, repo := range tc.repos { err.AddError(repo, assert.AnError) -- GitLab
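
Taken together, the series leaves gitaly-backup driving all backup and
restore jobs through the single Pipeline type behind an option-based
constructor. A minimal usage sketch of the resulting API (illustrative
only: runBackups, its arguments, and the concurrency values chosen here
are assumptions, and imports are omitted):

    func runBackups(ctx context.Context, logger log.Logger, strategy backup.Strategy, repos []*gitalypb.Repository) error {
        // Sequential by default; concurrency is opt-in. Here: at most
        // 4 jobs in total, and at most 2 per storage.
        pipeline, err := backup.NewPipeline(logger, backup.WithConcurrency(4, 2))
        if err != nil {
            return fmt.Errorf("create pipeline: %w", err)
        }
        for _, repo := range repos {
            pipeline.Handle(ctx, backup.NewCreateCommand(strategy, backup.CreateRequest{Repository: repo}))
        }
        // Done blocks until all queued jobs have finished and reports
        // any accumulated errors.
        return pipeline.Done()
    }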