From 6fc11ddae868e070e402654ed0b147b8c80d7358 Mon Sep 17 00:00:00 2001 From: John Cai Date: Mon, 12 Aug 2024 15:20:55 -0400 Subject: [PATCH 01/10] bundleuri: Introduce bundle generation manager A bundle generation manager will be responsible for handling requests to generate bundles for repositories. It will be in control of these processes and be able to stop them in the event Gitaly is shutting down. It also will control concurrency and how many bundle generation processes are allowed to be running at one time. This will be passed in and used to automatically generate bundles for bundle uri. --- internal/bundleuri/manager.go | 105 ++++++++++++++++++++++++++++ internal/bundleuri/manager_test.go | 108 +++++++++++++++++++++++++++++ 2 files changed, 213 insertions(+) create mode 100644 internal/bundleuri/manager.go create mode 100644 internal/bundleuri/manager_test.go diff --git a/internal/bundleuri/manager.go b/internal/bundleuri/manager.go new file mode 100644 index 0000000000..77c0de6132 --- /dev/null +++ b/internal/bundleuri/manager.go @@ -0,0 +1,105 @@ +package bundleuri + +import ( + "context" + "errors" + "fmt" + "sync" + + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" +) + +const globalLimitingKey = "bundle-uri-generation" + +var ( + // ErrBundleGenerationInProgress indicates there is a bundle already + // being generated for this repository. + ErrBundleGenerationInProgress = errors.New("a bundle is already beng generated for this repository") + // ErrBundleGenerationConcurrencyReached indicates the global concurrency + // limit has been reached. + ErrBundleGenerationConcurrencyReached = errors.New("bundle generation concurrency reached") + // ErrTerminatedEarly indicates the context was cancelled, or the + // goroutine handling the bundle generation was stopped. + ErrTerminatedEarly = errors.New("bundle generatio terminated early") +) + +// BundleGenerationManager manages bundle generation. 
It handles requests to +// generate bundles for a repository, and enforces concurrency by limiting one +// bundle generation per repo at any given time as well as a global limit across +// all repositories. +type BundleGenerationManager struct { + limiter limiter.Limiter + sink *Sink + bundleGenerationInProgress sync.Map + stopChan chan struct{} + wg sync.WaitGroup +} + +// NewBundleGenerationManager creates a new BundleGenerationManager +func NewBundleGenerationManager(sink *Sink, concurrencyLimiter limiter.Limiter) *BundleGenerationManager { + return &BundleGenerationManager{ + limiter: concurrencyLimiter, + sink: sink, + stopChan: make(chan struct{}), + } +} + +// StopAll blocks until all of the goroutines that are generating bundles are finished. +func (b *BundleGenerationManager) StopAll() { + close(b.stopChan) + b.wg.Wait() +} + +// generateOneAtATime generates a bundle for a repository, but only if there is not already +// one in flight. +func (b *BundleGenerationManager) generateOneAtATime(ctx context.Context, repo *localrepo.Repo) (string, error) { + b.wg.Add(1) + defer b.wg.Done() + + bundlePath := b.sink.relativePath(repo, defaultBundle) + + _, loaded := b.bundleGenerationInProgress.LoadOrStore( + bundlePath, + struct{}{}, + ) + if loaded { + return "", ErrBundleGenerationInProgress + } + defer b.bundleGenerationInProgress.Delete(bundlePath) + + errChan := make(chan error) + + go func() { + errChan <- b.sink.Generate(ctx, repo) + }() + + select { + case err := <-errChan: + return bundlePath, err + case <-b.stopChan: + return "", ErrTerminatedEarly + } +} + +func (b *BundleGenerationManager) generateOneAtATimeLimitFunc(ctx context.Context, repo *localrepo.Repo) limiter.LimitedFunc { + return func() (interface{}, error) { + var bundlePath string + var err error + if bundlePath, err = b.generateOneAtATime(ctx, repo); err != nil { + return nil, err + } + + return bundlePath, nil + } +} + +// Generate generates a bundle for a given repository while enforcing 
+// concurrency limits +func (b *BundleGenerationManager) Generate(ctx context.Context, repo *localrepo.Repo) error { + if _, err := b.limiter.Limit(ctx, globalLimitingKey, b.generateOneAtATimeLimitFunc(ctx, repo)); err != nil { + return fmt.Errorf("error generating bundle: %w", err) + } + + return nil +} diff --git a/internal/bundleuri/manager_test.go b/internal/bundleuri/manager_test.go new file mode 100644 index 0000000000..b358b5c110 --- /dev/null +++ b/internal/bundleuri/manager_test.go @@ -0,0 +1,108 @@ +package bundleuri + +import ( + "context" + "fmt" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" +) + +type passthroughLimiter struct{} + +func (p *passthroughLimiter) Limit(ctx context.Context, lockKey string, f limiter.LimitedFunc) (interface{}, error) { + return f() +} + +func TestBundleGenerationManager_generateOneAtATime(t *testing.T) { + t.Parallel() + + cfg := testcfg.Build(t) + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + setup func(t *testing.T, repoPath string) + expectedErr error + }{ + { + desc: "creates bundle successfully", + setup: func(t *testing.T, repoPath string) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + }, + }, + { + desc: "fails with missing HEAD", + setup: func(t *testing.T, repoPath string) {}, + expectedErr: structerr.NewFailedPrecondition("ref %q does not exist: %w", "refs/heads/main", fmt.Errorf("create bundle: %w", localrepo.ErrEmptyBundle)), + }, + } { + tc := tc + + t.Run(tc.desc, func(t *testing.T) { + 
t.Parallel() + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + tc.setup(t, repoPath) + + sinkDir := t.TempDir() + sink, err := NewSink( + ctx, + "file://"+sinkDir, + ) + require.NoError(t, err) + + manager := NewBundleGenerationManager(sink, &passthroughLimiter{}) + + _, err = manager.generateOneAtATime(ctx, repo) + + if tc.expectedErr == nil { + require.FileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) + } else { + require.Equal(t, tc.expectedErr, err) + } + }) + } +} + +func TestSink_generateOneAtATime_inProgress(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + sinkDir := t.TempDir() + sink, err := NewSink( + ctx, + "file://"+sinkDir, + ) + require.NoError(t, err) + + repoProto, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + manager := NewBundleGenerationManager(sink, &passthroughLimiter{}) + + // pretend like there is already another bundle generation happening for + // this repo. 
+ bundlePath := sink.relativePath(repo, defaultBundle) + manager.bundleGenerationInProgress.Store(bundlePath, struct{}{}) + + _, err = manager.generateOneAtATime(testhelper.Context(t), repo) + require.Equal(t, ErrBundleGenerationInProgress, err) +} -- GitLab From 0fbc6ec3efcd30f192396a43ee14695ffc30d5a9 Mon Sep 17 00:00:00 2001 From: John Cai Date: Tue, 13 Aug 2024 15:57:42 -0400 Subject: [PATCH 02/10] limiter: Add monitor for bundle generation limiter --- internal/limiter/concurrency_limiter.go | 3 ++ internal/limiter/monitor.go | 43 +++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go index 7f23eb3ad2..409a88db76 100644 --- a/internal/limiter/concurrency_limiter.go +++ b/internal/limiter/concurrency_limiter.go @@ -75,6 +75,9 @@ const ( // TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request // information (RemoteIP/Repository/User) as the limiting key. TypePackObjects = "pack-objects" + // TypeBundleGeneration is a dedicated concurrency limiter for + // bundle-generation. + TypeBundleGeneration = "bundle-generation" ) // ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the diff --git a/internal/limiter/monitor.go b/internal/limiter/monitor.go index 9230525d86..cde32d0fd6 100644 --- a/internal/limiter/monitor.go +++ b/internal/limiter/monitor.go @@ -194,3 +194,46 @@ func NewPackObjectsConcurrencyMonitor(latencyBuckets []float64) *PromMonitor { requestsDroppedVec, ) } + +// NewBundleGenerationConcurrencyMonitor returns a concurrency monitor for use +// with limiting bundle generation processes. 
+func NewBundleGenerationConcurrencyMonitor(latencyBuckets []float64) *PromMonitor { + acquiringSecondsVec := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "gitaly_bundle_generation_acquiring_seconds", + Help: "Histogram of time calls are rate limited (in seconds)", + Buckets: latencyBuckets, + }, + nil, + ) + + inProgressVec := prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "gitaly_bundle_generation_in_progress", + Help: "Gauge of number of concurrent in-progress calls", + }, + ) + + queuedVec := prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "gitaly_bundle_generation_queued", + Help: "Gauge of number of queued calls", + }, + ) + + requestsDroppedVec := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_bundle_generation_dropped_total", + Help: "Number of requests dropped from the queue", + }, + []string{"reason"}, + ) + + return newPromMonitor( + TypeBundleGeneration, + queuedVec, + inProgressVec, + acquiringSecondsVec, + requestsDroppedVec, + ) +} -- GitLab From 354100451a41fc190f206f63fa1d83a7acaeac2d Mon Sep 17 00:00:00 2001 From: John Cai Date: Fri, 21 Jun 2024 12:02:15 -0400 Subject: [PATCH 03/10] service: Add in progress tracker An in progress tracker can be used to track how many pack-objects processes are inflight for a given repository in order to inform whether or not a bundle should be created for a given repository. 
--- internal/gitaly/service/dependencies.go | 6 ++ .../gitaly/service/in_progress_tracker.go | 48 +++++++++++++ .../service/in_progress_tracker_test.go | 67 +++++++++++++++++++ internal/gitaly/service/testhelper_test.go | 11 +++ internal/testhelper/testserver/gitaly.go | 9 +++ 5 files changed, 141 insertions(+) create mode 100644 internal/gitaly/service/in_progress_tracker.go create mode 100644 internal/gitaly/service/in_progress_tracker_test.go create mode 100644 internal/gitaly/service/testhelper_test.go diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index 4fc6f952f9..cbdbb40372 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -48,6 +48,7 @@ type Dependencies struct { BackupLocator backup.Locator BundleURISink *bundleuri.Sink ProcReceiveRegistry *gitalyhook.ProcReceiveRegistry + InProgressTracker InProgressTracker } // GetLogger returns the logger. @@ -164,3 +165,8 @@ func (dc *Dependencies) GetBundleURISink() *bundleuri.Sink { func (dc *Dependencies) GetProcReceiveRegistry() *gitalyhook.ProcReceiveRegistry { return dc.ProcReceiveRegistry } + +// GetInProgressTracker returns the InProgressTracker. +func (dc *Dependencies) GetInProgressTracker() InProgressTracker { + return dc.InProgressTracker +} diff --git a/internal/gitaly/service/in_progress_tracker.go b/internal/gitaly/service/in_progress_tracker.go new file mode 100644 index 0000000000..5626170453 --- /dev/null +++ b/internal/gitaly/service/in_progress_tracker.go @@ -0,0 +1,48 @@ +package service + +import ( + "sync" +) + +// InProgressTracker can be used to keep track of processes that are in flight +type InProgressTracker interface { + GetInProgress(key string) uint + IncrementInProgress(key string) + DecrementInProgress(key string) +} + +type inProgressTracker struct { + inProgress map[string]uint + l sync.RWMutex +} + +// NewInProgressTracker instantiates a new inProgressTracker. 
+func NewInProgressTracker() *inProgressTracker { + return &inProgressTracker{ + inProgress: make(map[string]uint), + } +} + +// GetInProgress gets the number of inflight processes for a given key. +func (p *inProgressTracker) GetInProgress(key string) uint { + p.l.RLock() + defer p.l.RUnlock() + + return p.inProgress[key] +} + +// IncrementInProgress increments the number of inflight processes for a given key. +func (p *inProgressTracker) IncrementInProgress(key string) { + p.l.Lock() + defer p.l.Unlock() + + p.inProgress[key]++ +} + +// DecrementInProgress decrements the number of inflight processes for a given key. +func (p *inProgressTracker) DecrementInProgress(key string) { + p.l.Lock() + defer p.l.Unlock() + + p.inProgress[key]-- +} diff --git a/internal/gitaly/service/in_progress_tracker_test.go b/internal/gitaly/service/in_progress_tracker_test.go new file mode 100644 index 0000000000..70c40483c5 --- /dev/null +++ b/internal/gitaly/service/in_progress_tracker_test.go @@ -0,0 +1,67 @@ +package service_test + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" +) + +func TestInProgressTracker(t *testing.T) { + key := "key1" + + testCases := []struct { + desc string + expectedInProgress uint + actions func(service.InProgressTracker) + }{ + { + desc: "one in flight", + expectedInProgress: 1, + actions: func(t service.InProgressTracker) { + t.IncrementInProgress(key) + t.IncrementInProgress(key) + t.DecrementInProgress(key) + }, + }, + { + desc: "two in flight with concurrent writes", + expectedInProgress: 2, + actions: func(t service.InProgressTracker) { + var wg sync.WaitGroup + + wg.Add(4) + go func() { + t.IncrementInProgress(key) + wg.Done() + }() + go func() { + t.IncrementInProgress(key) + wg.Done() + }() + + go func() { + t.IncrementInProgress(key) + wg.Done() + }() + + go func() { + t.DecrementInProgress(key) + wg.Done() + }() + + wg.Wait() + }, + }, + } + + for _, tc := 
range testCases { + t.Run(tc.desc, func(t *testing.T) { + tracker := service.NewInProgressTracker() + + tc.actions(tracker) + require.Equal(t, tc.expectedInProgress, tracker.GetInProgress(key)) + }) + } +} diff --git a/internal/gitaly/service/testhelper_test.go b/internal/gitaly/service/testhelper_test.go new file mode 100644 index 0000000000..43ac8d85bf --- /dev/null +++ b/internal/gitaly/service/testhelper_test.go @@ -0,0 +1,11 @@ +package service_test + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 974eac082f..886f3cd5f9 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -290,6 +290,7 @@ type gitalyServerDeps struct { signingKey string transactionRegistry *storagemgr.TransactionRegistry procReceiveRegistry *hook.ProcReceiveRegistry + inProgressTracker service.InProgressTracker } func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies { @@ -567,6 +568,14 @@ func WithBundleURISink(sink *bundleuri.Sink) GitalyServerOpt { } } +// WithInProgressTracker sets the InProgressTracker that will be used for Gitaly services +func WithInProgressTracker(tracker service.InProgressTracker) GitalyServerOpt { + return func(deps gitalyServerDeps) gitalyServerDeps { + deps.inProgressTracker = tracker + return deps + } +} + // WithSigningKey sets the signing key path that will be used for Gitaly // services. 
func WithSigningKey(signingKey string) GitalyServerOpt { -- GitLab From 323e1f9bd4300833f1c1591655d357d10e8d8e74 Mon Sep 17 00:00:00 2001 From: John Cai Date: Tue, 13 Aug 2024 15:48:44 -0400 Subject: [PATCH 04/10] bundleuri: Return sentinel error when sink is not found Return a sentinel error when a sink is not found so that upstream callers can decide what to do based on a concrete error type. --- internal/bundleuri/git_config.go | 5 ++++- internal/bundleuri/git_config_test.go | 6 ++---- internal/bundleuri/sink.go | 7 +++++-- internal/bundleuri/sink_test.go | 2 +- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/internal/bundleuri/git_config.go b/internal/bundleuri/git_config.go index 69cc21b49b..f18fd4bda1 100644 --- a/internal/bundleuri/git_config.go +++ b/internal/bundleuri/git_config.go @@ -11,6 +11,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/log" ) +// ErrSinkMissing indicates a sink is missing +var ErrSinkMissing = errors.New("bundle-URI sink missing") + // CapabilitiesGitConfig returns a slice of git.ConfigPairs that can be injected // into the Git config to make it aware the bundle-URI capabilities are // supported. 
@@ -42,7 +45,7 @@ func UploadPackGitConfig( } if sink == nil { - return CapabilitiesGitConfig(ctx), errors.New("bundle-URI sink missing") + return CapabilitiesGitConfig(ctx), ErrSinkMissing } uri, err := sink.SignedURL(ctx, repo) diff --git a/internal/bundleuri/git_config_test.go b/internal/bundleuri/git_config_test.go index 0a5da2bbc2..19c79bf26f 100644 --- a/internal/bundleuri/git_config_test.go +++ b/internal/bundleuri/git_config_test.go @@ -2,7 +2,6 @@ package bundleuri import ( "context" - "errors" "fmt" "os" "path/filepath" @@ -14,7 +13,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" ) @@ -60,7 +58,7 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { return setupData{} }, expectedConfig: nil, - expectedErr: errors.New("bundle-URI sink missing"), + expectedErr: ErrSinkMissing, }, { desc: "no bundle found", @@ -74,7 +72,7 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { } }, expectedConfig: nil, - expectedErr: structerr.NewNotFound("no bundle available"), + expectedErr: ErrBundleNotFound, }, { desc: "not signed", diff --git a/internal/bundleuri/sink.go b/internal/bundleuri/sink.go index a2d1ac8d00..30011f63d1 100644 --- a/internal/bundleuri/sink.go +++ b/internal/bundleuri/sink.go @@ -29,6 +29,9 @@ const ( defaultExpiry = 10 * time.Minute ) +// ErrBundleNotFound indicates that no bundle could be found for a given repository. +var ErrBundleNotFound = errors.New("no bundle found") + // Sink is a wrapper around the storage bucket used for accessing/writing // bundleuri bundles. 
type Sink struct { @@ -132,9 +135,9 @@ func (s Sink) SignedURL(ctx context.Context, repo storage.Repository) (string, e if exists, err := s.bucket.Exists(ctx, relativePath); !exists { if err == nil { - return "", structerr.NewNotFound("no bundle available") + return "", ErrBundleNotFound } - return "", structerr.NewNotFound("no bundle available: %w", err) + return "", fmt.Errorf("checking bundle existence: %w", err) } uri, err := s.bucket.SignedURL(ctx, relativePath, &blob.SignedURLOptions{ diff --git a/internal/bundleuri/sink_test.go b/internal/bundleuri/sink_test.go index 3454186121..0b1079a37d 100644 --- a/internal/bundleuri/sink_test.go +++ b/internal/bundleuri/sink_test.go @@ -105,7 +105,7 @@ func TestSink_SignedURL(t *testing.T) { { desc: "fails with missing bundle", setup: func(t *testing.T, sinkDir string, sink *Sink) {}, - expectedErr: structerr.NewNotFound("no bundle available"), + expectedErr: ErrBundleNotFound, }, } { tc := tc -- GitLab From bac6c3c09e5926a1e2ddbec2727c2537a89f70d9 Mon Sep 17 00:00:00 2001 From: John Cai Date: Tue, 13 Aug 2024 15:50:13 -0400 Subject: [PATCH 05/10] config: Refactor Concurrency to RPCConcurrency Currently the Concurrency struct contains configuration for the RPC concurrency setting. Let's make Concurrency more generic so that other operations can also utilize it. Create a new RPCConcurrency type that is specific to RPC concurrency, while keeping Concurrency generic. --- internal/gitaly/config/config.go | 35 +++--- internal/gitaly/config/config_test.go | 38 +++---- internal/gitaly/server/auth_test.go | 2 +- .../limithandler/middleware_test.go | 104 +++++++++++------- 4 files changed, 105 insertions(+), 74 deletions(-) diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index 72526a14a4..05637715d8 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -119,7 +119,7 @@ type Cfg struct { // This setting is thus deprecated and should ideally not be used anymore. 
GitlabShell GitlabShell `toml:"gitlab-shell,omitempty" json:"gitlab-shell"` Hooks Hooks `toml:"hooks,omitempty" json:"hooks"` - Concurrency []Concurrency `toml:"concurrency,omitempty" json:"concurrency"` + Concurrency []RPCConcurrency `toml:"concurrency,omitempty" json:"concurrency"` RateLimiting []RateLimiting `toml:"rate_limiting,omitempty" json:"rate_limiting"` GracefulRestartTimeout duration.Duration `toml:"graceful_restart_timeout,omitempty" json:"graceful_restart_timeout"` DailyMaintenance DailyJob `toml:"daily_maintenance,omitempty" json:"daily_maintenance"` @@ -466,12 +466,20 @@ type Logging struct { Sentry } -// Concurrency allows endpoints to be limited to a maximum concurrency per repo. +// RPCConcurrency defines Concurrency for RPCs +type RPCConcurrency struct { + Concurrency + // RPC is the name of the RPC to set concurrency limits for + RPC string `toml:"rpc" json:"rpc"` + // MaxPerRepo is the maximum number of concurrent calls for a given repository. This config is used only + // if Adaptive is false. + MaxPerRepo int `toml:"max_per_repo" json:"max_per_repo"` +} + +// Concurrency allows operations to be limited to a maximum concurrency per repo. // Requests that come in after the maximum number of concurrent requests are in progress will wait // in a queue that is bounded by MaxQueueSize. type Concurrency struct { - // RPC is the name of the RPC to set concurrency limits for - RPC string `toml:"rpc" json:"rpc"` // Adaptive determines the behavior of the concurrency limit. If set to true, the concurrency limit is dynamic // and starts at InitialLimit, then adjusts within the range [MinLimit, MaxLimit] based on current resource // usage. If set to false, the concurrency limit is static and is set to MaxPerRepo. @@ -482,9 +490,6 @@ type Concurrency struct { MaxLimit int `toml:"max_limit,omitempty" json:"max_limit,omitempty"` // MinLimit is the mini adaptive concurrency limit. 
MinLimit int `toml:"min_limit,omitempty" json:"min_limit,omitempty"` - // MaxPerRepo is the maximum number of concurrent calls for a given repository. This config is used only - // if Adaptive is false. - MaxPerRepo int `toml:"max_per_repo" json:"max_per_repo"` // MaxQueueSize is the maximum number of requests in the queue waiting to be picked up // after which subsequent requests will return with an error. MaxQueueSize int `toml:"max_queue_size" json:"max_queue_size"` @@ -494,17 +499,17 @@ type Concurrency struct { } // Validate runs validation on all fields and compose all found errors. -func (c Concurrency) Validate() error { +func (r RPCConcurrency) Validate() error { errs := cfgerror.New(). - Append(cfgerror.Comparable(c.MaxPerRepo).GreaterOrEqual(0), "max_per_repo"). - Append(cfgerror.Comparable(c.MaxQueueSize).GreaterOrEqual(0), "max_queue_size"). - Append(cfgerror.Comparable(c.MaxQueueWait.Duration()).GreaterOrEqual(0), "max_queue_wait") + Append(cfgerror.Comparable(r.MaxPerRepo).GreaterOrEqual(0), "max_per_repo"). + Append(cfgerror.Comparable(r.MaxQueueSize).GreaterOrEqual(0), "max_queue_size"). + Append(cfgerror.Comparable(r.MaxQueueWait.Duration()).GreaterOrEqual(0), "max_queue_wait") - if c.Adaptive { + if r.Adaptive { errs = errs. - Append(cfgerror.Comparable(c.MinLimit).GreaterThan(0), "min_limit"). - Append(cfgerror.Comparable(c.MaxLimit).GreaterOrEqual(c.InitialLimit), "max_limit"). - Append(cfgerror.Comparable(c.InitialLimit).GreaterOrEqual(c.MinLimit), "initial_limit") + Append(cfgerror.Comparable(r.MinLimit).GreaterThan(0), "min_limit"). + Append(cfgerror.Comparable(r.MaxLimit).GreaterOrEqual(r.InitialLimit), "max_limit"). 
+ Append(cfgerror.Comparable(r.InitialLimit).GreaterOrEqual(r.MinLimit), "initial_limit") } return errs.AsError() } diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go index de1abd9781..de5a024ae2 100644 --- a/internal/gitaly/config/config_test.go +++ b/internal/gitaly/config/config_test.go @@ -1951,9 +1951,9 @@ func TestPackObjectsLimiting_Validate(t *testing.T) { func TestConcurrency_Validate(t *testing.T) { t.Parallel() - require.NoError(t, Concurrency{MaxPerRepo: 0}.Validate()) - require.NoError(t, Concurrency{MaxPerRepo: 1}.Validate()) - require.NoError(t, Concurrency{MaxPerRepo: 100}.Validate()) + require.NoError(t, RPCConcurrency{MaxPerRepo: 0}.Validate()) + require.NoError(t, RPCConcurrency{MaxPerRepo: 1}.Validate()) + require.NoError(t, RPCConcurrency{MaxPerRepo: 100}.Validate()) require.Equal( t, cfgerror.ValidationErrors{ @@ -1962,12 +1962,12 @@ func TestConcurrency_Validate(t *testing.T) { "max_per_repo", ), }, - Concurrency{MaxPerRepo: -1}.Validate(), + RPCConcurrency{MaxPerRepo: -1}.Validate(), ) - require.NoError(t, Concurrency{Adaptive: true, InitialLimit: 1, MinLimit: 1, MaxLimit: 100}.Validate()) - require.NoError(t, Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 1, MaxLimit: 100}.Validate()) - require.NoError(t, Concurrency{Adaptive: true, InitialLimit: 100, MinLimit: 1, MaxLimit: 100}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 1, MinLimit: 1, MaxLimit: 100}}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 1, MaxLimit: 100}}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 100, MinLimit: 1, MaxLimit: 100}}.Validate()) require.Equal( t, cfgerror.ValidationErrors{ @@ -1976,7 +1976,7 @@ func TestConcurrency_Validate(t *testing.T) { "min_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: 0, MinLimit: 0, MaxLimit: 
100}.Validate(), + RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 0, MinLimit: 0, MaxLimit: 100}}.Validate(), ) require.Equal( t, @@ -1986,7 +1986,7 @@ func TestConcurrency_Validate(t *testing.T) { "initial_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: -1, MinLimit: 1, MaxLimit: 100}.Validate(), + RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: -1, MinLimit: 1, MaxLimit: 100}}.Validate(), ) require.Equal( t, @@ -1996,7 +1996,7 @@ func TestConcurrency_Validate(t *testing.T) { "initial_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 11, MaxLimit: 100}.Validate(), + RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 11, MaxLimit: 100}}.Validate(), ) require.Equal( t, @@ -2006,7 +2006,7 @@ func TestConcurrency_Validate(t *testing.T) { "max_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: 3}.Validate(), + RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: 3}}.Validate(), ) require.Equal( t, @@ -2016,7 +2016,7 @@ func TestConcurrency_Validate(t *testing.T) { "min_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: 5, MinLimit: -1, MaxLimit: 99}.Validate(), + RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 5, MinLimit: -1, MaxLimit: 99}}.Validate(), ) require.Equal( t, @@ -2026,12 +2026,12 @@ func TestConcurrency_Validate(t *testing.T) { "max_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: -1}.Validate(), + RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: -1}}.Validate(), ) - require.NoError(t, Concurrency{MaxQueueSize: 0}.Validate()) - require.NoError(t, Concurrency{MaxQueueSize: 1}.Validate()) - require.NoError(t, Concurrency{MaxQueueSize: 100}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{MaxQueueSize: 0}}.Validate()) + require.NoError(t, 
RPCConcurrency{Concurrency: Concurrency{MaxQueueSize: 1}}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{MaxQueueSize: 100}}.Validate()) require.Equal( t, cfgerror.ValidationErrors{ @@ -2040,10 +2040,10 @@ func TestConcurrency_Validate(t *testing.T) { "max_queue_size", ), }, - Concurrency{MaxQueueSize: -1}.Validate(), + RPCConcurrency{Concurrency: Concurrency{MaxQueueSize: -1}}.Validate(), ) - require.NoError(t, Concurrency{MaxQueueWait: duration.Duration(1)}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{MaxQueueWait: duration.Duration(1)}}.Validate()) require.Equal( t, cfgerror.ValidationErrors{ @@ -2052,7 +2052,7 @@ func TestConcurrency_Validate(t *testing.T) { "max_queue_wait", ), }, - Concurrency{MaxQueueWait: duration.Duration(-time.Minute)}.Validate(), + RPCConcurrency{Concurrency: Concurrency{MaxQueueWait: duration.Duration(-time.Minute)}}.Validate(), ) } diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index d12bc0eadf..04b371c3cf 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -362,7 +362,7 @@ func TestAuthBeforeLimit(t *testing.T) { testhelper.NewFeatureSets(featureflag.UseResizableSemaphoreInConcurrencyLimiter, featureflag.UseResizableSemaphoreLifoStrategy).Run(t, func(t *testing.T, ctx netctx.Context) { cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{ Auth: auth.Config{Token: "abc123"}, - Concurrency: []config.Concurrency{{ + Concurrency: []config.RPCConcurrency{{ RPC: "/gitaly.OperationService/UserCreateTag", MaxPerRepo: 1, }}, diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go index 99eff32a0b..d651798bab 100644 --- a/internal/grpc/middleware/limithandler/middleware_test.go +++ b/internal/grpc/middleware/limithandler/middleware_test.go @@ -38,7 +38,7 @@ func TestWithConcurrencyLimiters(t *testing.T) { t.Parallel() 
testhelper.NewFeatureSets(featureflag.UseResizableSemaphoreInConcurrencyLimiter, featureflag.UseResizableSemaphoreLifoStrategy).Run(t, func(t *testing.T, ctx context.Context) { cfg := config.Cfg{ - Concurrency: []config.Concurrency{ + Concurrency: []config.RPCConcurrency{ { RPC: "/grpc.testing.TestService/UnaryCall", MaxPerRepo: 1, @@ -48,11 +48,13 @@ func TestWithConcurrencyLimiters(t *testing.T) { MaxPerRepo: 99, }, { - RPC: "/grpc.testing.TestService/AnotherUnaryCall", - Adaptive: true, - MinLimit: 5, - InitialLimit: 10, - MaxLimit: 15, + RPC: "/grpc.testing.TestService/AnotherUnaryCall", + Concurrency: config.Concurrency{ + Adaptive: true, + MinLimit: 5, + InitialLimit: 10, + MaxLimit: 15, + }, }, }, } @@ -87,7 +89,7 @@ func TestUnaryLimitHandler(t *testing.T) { } cfg := config.Cfg{ - Concurrency: []config.Concurrency{ + Concurrency: []config.RPCConcurrency{ {RPC: "/grpc.testing.TestService/UnaryCall", MaxPerRepo: 2}, }, } @@ -167,8 +169,14 @@ func testUnaryLimitHandlerQueueStrategy(b *testing.B, ctx context.Context, numRe } cfg := config.Cfg{ - Concurrency: []config.Concurrency{ - {RPC: "/grpc.testing.TestService/UnaryCall", MaxPerRepo: 50, MaxQueueSize: 100}, + Concurrency: []config.RPCConcurrency{ + { + RPC: "/grpc.testing.TestService/UnaryCall", + MaxPerRepo: 50, + Concurrency: config.Concurrency{ + MaxQueueSize: 100, + }, + }, }, } @@ -214,25 +222,27 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { testhelper.NewFeatureSets(featureflag.UseResizableSemaphoreInConcurrencyLimiter, featureflag.UseResizableSemaphoreLifoStrategy).Run(t, func(t *testing.T, ctx context.Context) { t.Run("simple timeout", func(t *testing.T) { cfg := config.Cfg{ - Concurrency: []config.Concurrency{ + Concurrency: []config.RPCConcurrency{ { - RPC: "/grpc.testing.TestService/UnaryCall", - MaxPerRepo: 1, - MaxQueueSize: 1, - // This test setups two requests: - // - The first one is eligible. It enters the handler and blocks the queue. 
- // - The second request is blocked until timeout. - // Both of them shares this timeout. Internally, the limiter creates a context - // deadline to reject timed out requests. If it's set too low, there's a tiny - // possibility that the context reaches the deadline when the limiter checks the - // request. Thus, setting a reasonable timeout here and adding some retry - // attempts below make the test stable. - // Another approach is to implement a hooking mechanism that allows us to - // override context deadline setup. However, that approach exposes the internal - // implementation of the limiter. It also adds unnecessarily logics. - // Congiuring the timeout is more straight-forward and close to the expected - // behavior. - MaxQueueWait: duration.Duration(100 * time.Millisecond), + RPC: "/grpc.testing.TestService/UnaryCall", + MaxPerRepo: 1, + Concurrency: config.Concurrency{ + MaxQueueSize: 1, + // This test sets up two requests: + // - The first one is eligible. It enters the handler and blocks the queue. + // - The second request is blocked until timeout. + // Both of them share this timeout. Internally, the limiter creates a context + // deadline to reject timed out requests. If it's set too low, there's a tiny + // possibility that the context reaches the deadline when the limiter checks the + // request. Thus, setting a reasonable timeout here and adding some retry + // attempts below make the test stable. + // Another approach is to implement a hooking mechanism that allows us to + // override context deadline setup. However, that approach exposes the internal + // implementation of the limiter. It also adds unnecessary logic. + // Configuring the timeout is more straightforward and close to the expected + // behavior. 
+ MaxQueueWait: duration.Duration(100 * time.Millisecond), + }, }, }, } @@ -288,7 +298,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { t.Run("unlimited queueing", func(t *testing.T) { cfg := config.Cfg{ - Concurrency: []config.Concurrency{ + Concurrency: []config.RPCConcurrency{ // Due to a bug queueing wait times used to leak into subsequent // concurrency configuration in case they didn't explicitly set up // the queueing wait time. We thus set up two limits here: one dummy @@ -296,9 +306,11 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { // that has no wait limit. We of course expect that the actual // config should not have any maximum queueing time. { - RPC: "dummy", - MaxPerRepo: 1, - MaxQueueWait: duration.Duration(1 * time.Nanosecond), + RPC: "dummy", + MaxPerRepo: 1, + Concurrency: config.Concurrency{ + MaxQueueWait: duration.Duration(1 * time.Nanosecond), + }, }, { RPC: "/grpc.testing.TestService/UnaryCall", @@ -564,11 +576,13 @@ func TestStreamLimitHandler(t *testing.T) { maxQueueSize := 1 cfg := config.Cfg{ - Concurrency: []config.Concurrency{ + Concurrency: []config.RPCConcurrency{ { - RPC: tc.fullname, - MaxPerRepo: tc.maxConcurrency, - MaxQueueSize: maxQueueSize, + RPC: tc.fullname, + MaxPerRepo: tc.maxConcurrency, + Concurrency: config.Concurrency{ + MaxQueueSize: maxQueueSize, + }, }, }, } @@ -619,8 +633,14 @@ func TestStreamLimitHandler_error(t *testing.T) { s.blockCh = make(chan struct{}) cfg := config.Cfg{ - Concurrency: []config.Concurrency{ - {RPC: "/grpc.testing.TestService/FullDuplexCall", MaxPerRepo: 1, MaxQueueSize: 1}, + Concurrency: []config.RPCConcurrency{ + { + RPC: "/grpc.testing.TestService/FullDuplexCall", + MaxPerRepo: 1, + Concurrency: config.Concurrency{ + MaxQueueSize: 1, + }, + }, }, } @@ -755,8 +775,14 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { methodName := "/grpc.testing.TestService/UnaryCall" cfg := config.Cfg{ - Concurrency: []config.Concurrency{ - {RPC: methodName, MaxPerRepo: 1, 
MaxQueueSize: 1}, + Concurrency: []config.RPCConcurrency{ + { + RPC: methodName, + MaxPerRepo: 1, + Concurrency: config.Concurrency{ + MaxQueueSize: 1, + }, + }, }, } -- GitLab From 1c03f03b3afe92e0597855201e66490548c11728 Mon Sep 17 00:00:00 2001 From: John Cai Date: Wed, 17 Jul 2024 20:46:25 -0400 Subject: [PATCH 06/10] testhelper: Allow passing in InProgressTracker Allow passing in an InProgressTracker to the test server --- internal/testhelper/testserver/gitaly.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 886f3cd5f9..ab95e8f35b 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -332,6 +332,10 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * gsd.procReceiveRegistry = hook.NewProcReceiveRegistry() } + if gsd.inProgressTracker == nil { + gsd.inProgressTracker = service.NewInProgressTracker() + } + var partitionManager *storagemgr.PartitionManager if testhelper.IsWALEnabled() { dbMgr, err := keyvalue.NewDBManager( @@ -439,6 +443,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * BackupLocator: gsd.backupLocator, BundleURISink: gsd.bundleURISink, ProcReceiveRegistry: gsd.procReceiveRegistry, + InProgressTracker: gsd.inProgressTracker, } } -- GitLab From c942eac6da8fb0a718eadbe0e4fd6d72cca8b6e7 Mon Sep 17 00:00:00 2001 From: John Cai Date: Wed, 14 Aug 2024 13:20:33 -0400 Subject: [PATCH 07/10] testserver: Add ability to pass in BundleGenerationManager --- internal/testhelper/testserver/gitaly.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index ab95e8f35b..88d5a47635 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -291,6 +291,7 @@ type gitalyServerDeps struct { transactionRegistry 
*storagemgr.TransactionRegistry procReceiveRegistry *hook.ProcReceiveRegistry inProgressTracker service.InProgressTracker + bundleGenerationMgr *bundleuri.BundleGenerationManager } func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies { @@ -444,6 +445,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * BundleURISink: gsd.bundleURISink, ProcReceiveRegistry: gsd.procReceiveRegistry, InProgressTracker: gsd.inProgressTracker, + BundleGenerationMgr: gsd.bundleGenerationMgr, } } @@ -581,6 +583,14 @@ func WithInProgressTracker(tracker service.InProgressTracker) GitalyServerOpt { } } +// WithBundleGenerationManager sets the bundleuri.BundleGenerationManager that will be used for Gitaly services +func WithBundleGenerationManager(mgr *bundleuri.BundleGenerationManager) GitalyServerOpt { + return func(deps gitalyServerDeps) gitalyServerDeps { + deps.bundleGenerationMgr = mgr + return deps + } +} + // WithSigningKey sets the signing key path that will be used for Gitaly // services. 
func WithSigningKey(signingKey string) GitalyServerOpt { -- GitLab From 61cabab3078f2120d400f1037ece6ba42be1ed68 Mon Sep 17 00:00:00 2001 From: John Cai Date: Tue, 13 Aug 2024 15:56:49 -0400 Subject: [PATCH 08/10] service: Inject BundleGenerationManager into smarthttp service --- internal/cli/gitaly/serve.go | 37 ++++++++++++++++++++- internal/gitaly/service/dependencies.go | 6 ++++ internal/gitaly/service/smarthttp/server.go | 10 +++--- 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index d94704aac6..7c3ade4cf3 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -542,11 +542,45 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { } var bundleURISink *bundleuri.Sink + var bundleGenerationMgr *bundleuri.BundleGenerationManager if cfg.BundleURI.GoCloudURL != "" { - bundleURISink, err = bundleuri.NewSink(ctx, cfg.BundleURI.GoCloudURL) + bundleURISink, err = bundleuri.NewSink( + ctx, + cfg.BundleURI.GoCloudURL, + ) if err != nil { return fmt.Errorf("create bundle-URI sink: %w", err) } + + if cfg.BundleURI.Autogeneration { + var bundleGenerationLimit *limiter.AdaptiveLimit + if cfg.BundleURI.Concurrency.Adaptive { + bundleGenerationLimit = limiter.NewAdaptiveLimit("bundleGeneration", limiter.AdaptiveSetting{ + Initial: cfg.BundleURI.Concurrency.InitialLimit, + Max: cfg.BundleURI.Concurrency.MaxLimit, + Min: cfg.BundleURI.Concurrency.MinLimit, + BackoffFactor: limiter.DefaultBackoffFactor, + }) + } else { + bundleGenerationLimit = limiter.NewAdaptiveLimit("bundleGeneration", limiter.AdaptiveSetting{Initial: cfg.BundleURI.Concurrency.MaxConcurrency}) + } + + bundleGenerationMonitor := limiter.NewPackObjectsConcurrencyMonitor( + cfg.Prometheus.GRPCLatencyBuckets, + ) + bundleGenerationLimiter := limiter.NewConcurrencyLimiter( + bundleGenerationLimit, + cfg.BundleURI.Concurrency.MaxQueueSize, + cfg.BundleURI.Concurrency.MaxQueueWait.Duration(), + 
bundleGenerationMonitor, + ) + prometheus.MustRegister(bundleGenerationMonitor) + + bundleGenerationMgr = bundleuri.NewBundleGenerationManager( + bundleURISink, + bundleGenerationLimiter, + ) + } } for _, c := range []starter.Config{ @@ -592,6 +626,7 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { BackupSink: backupSink, BackupLocator: backupLocator, BundleURISink: bundleURISink, + BundleGenerationMgr: bundleGenerationMgr, }) b.RegisterStarter(starter.New(c, srv, logger)) } diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index cbdbb40372..c50cb1b774 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -49,6 +49,7 @@ type Dependencies struct { BundleURISink *bundleuri.Sink ProcReceiveRegistry *gitalyhook.ProcReceiveRegistry InProgressTracker InProgressTracker + BundleGenerationMgr *bundleuri.BundleGenerationManager } // GetLogger returns the logger. @@ -170,3 +171,8 @@ func (dc *Dependencies) GetProcReceiveRegistry() *gitalyhook.ProcReceiveRegistry func (dc *Dependencies) GetInProgressTracker() InProgressTracker { return dc.InProgressTracker } + +// GetBundleGenerationManager returns the BundleGenerationManager. 
+func (dc *Dependencies) GetBundleGenerationManager() *bundleuri.BundleGenerationManager { + return dc.BundleGenerationMgr +} diff --git a/internal/gitaly/service/smarthttp/server.go b/internal/gitaly/service/smarthttp/server.go index 528deeb82e..3f67ead09d 100644 --- a/internal/gitaly/service/smarthttp/server.go +++ b/internal/gitaly/service/smarthttp/server.go @@ -34,6 +34,7 @@ type server struct { backupLocator backup.Locator backupSink backup.Sink bundleURISink *bundleuri.Sink + bundleGenerationMgr *bundleuri.BundleGenerationManager } // NewServer creates a new instance of a grpc SmartHTTPServer @@ -52,10 +53,11 @@ func NewServer(deps *service.Dependencies, serverOpts ...ServerOpt) gitalypb.Sma prometheus.CounterOpts{}, []string{"git_negotiation_feature"}, ), - infoRefCache: newInfoRefCache(deps.GetLogger(), deps.GetDiskCache()), - backupLocator: deps.GetBackupLocator(), - backupSink: deps.GetBackupSink(), - bundleURISink: deps.GetBundleURISink(), + infoRefCache: newInfoRefCache(deps.GetLogger(), deps.GetDiskCache()), + backupLocator: deps.GetBackupLocator(), + backupSink: deps.GetBackupSink(), + bundleURISink: deps.GetBundleURISink(), + bundleGenerationMgr: deps.GetBundleGenerationManager(), } for _, serverOpt := range serverOpts { -- GitLab From cbd5dd0051d6bb0575f62a67b1bb56cf5d89a7d2 Mon Sep 17 00:00:00 2001 From: John Cai Date: Wed, 14 Aug 2024 14:14:45 -0400 Subject: [PATCH 09/10] config: Add Bundle Generation Config Add a config for auto bundle generation --- internal/gitaly/config/config.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index 05637715d8..4164476a5c 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -466,6 +466,12 @@ type Logging struct { Sentry } +// BundleGenerationConcurrency defines concurrency for bundle generation. 
+type BundleGenerationConcurrency struct { + Concurrency + MaxConcurrency int `toml:"max_concurrency" json:"max_concurrency"` +} + // RPCConcurrency defines Concurrency for RPCs type RPCConcurrency struct { Concurrency @@ -623,11 +629,19 @@ func (bc BackupConfig) Validate() error { return errs.AsError() } +// BundleAutogeneration configures the setting for autogeneration of bundles for +// bundle-uri +type BundleAutogeneration struct { + Enabled bool `toml:"enabled,omitempty" json:"enabled,omitempty"` + Concurrency BundleGenerationConcurrency `toml:"concurrency,omitempty" json:"concurrency,omitempty"` +} + // BundleURIConfig configures use of Bundle-URI type BundleURIConfig struct { // GoCloudURL is the blob storage GoCloud URL that will be used to store // Git bundles for Bundle-URI use. - GoCloudURL string `toml:"go_cloud_url,omitempty" json:"go_cloud_url,omitempty"` + GoCloudURL string `toml:"go_cloud_url,omitempty" json:"go_cloud_url,omitempty"` + AutoGeneration BundleAutogeneration `toml:"auto_generation,omitempty" json:"auto_generation,omitempty"` } // Validate runs validation on all fields and returns any errors found. -- GitLab From a1da4696e33551a0fb60cd27e1955fda4d5ccda8 Mon Sep 17 00:00:00 2001 From: John Cai Date: Wed, 17 Jul 2024 20:48:26 -0400 Subject: [PATCH 10/10] smarthttp: Generate bundle in background On clone, check if 5 or more clones are already happening for this repository. If so, and a bundle does not exist, then we want to generate one because this is likely a busy repository. 
--- internal/cli/gitaly/serve.go | 17 +- .../featureflag/ff_autogenerate_bundles.go | 9 + internal/gitaly/service/smarthttp/server.go | 6 + .../gitaly/service/smarthttp/upload_pack.go | 82 +++++++ .../service/smarthttp/upload_pack_test.go | 207 ++++++++++++++++++ 5 files changed, 313 insertions(+), 8 deletions(-) create mode 100644 internal/featureflag/ff_autogenerate_bundles.go diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 7c3ade4cf3..bd45fc1b39 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -552,17 +552,17 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { return fmt.Errorf("create bundle-URI sink: %w", err) } - if cfg.BundleURI.Autogeneration { + if cfg.BundleURI.AutoGeneration.Enabled { var bundleGenerationLimit *limiter.AdaptiveLimit - if cfg.BundleURI.Concurrency.Adaptive { + if cfg.BundleURI.AutoGeneration.Concurrency.Adaptive { bundleGenerationLimit = limiter.NewAdaptiveLimit("bundleGeneration", limiter.AdaptiveSetting{ - Initial: cfg.BundleURI.Concurrency.InitialLimit, - Max: cfg.BundleURI.Concurrency.MaxLimit, - Min: cfg.BundleURI.Concurrency.MinLimit, + Initial: cfg.BundleURI.AutoGeneration.Concurrency.InitialLimit, + Max: cfg.BundleURI.AutoGeneration.Concurrency.MaxLimit, + Min: cfg.BundleURI.AutoGeneration.Concurrency.MinLimit, BackoffFactor: limiter.DefaultBackoffFactor, }) } else { - bundleGenerationLimit = limiter.NewAdaptiveLimit("bundleGeneration", limiter.AdaptiveSetting{Initial: cfg.BundleURI.Concurrency.MaxConcurrency}) + bundleGenerationLimit = limiter.NewAdaptiveLimit("bundleGeneration", limiter.AdaptiveSetting{Initial: cfg.BundleURI.AutoGeneration.Concurrency.MaxConcurrency}) } bundleGenerationMonitor := limiter.NewPackObjectsConcurrencyMonitor( @@ -570,8 +570,8 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { ) bundleGenerationLimiter := limiter.NewConcurrencyLimiter( bundleGenerationLimit, - 
cfg.BundleURI.Concurrency.MaxQueueSize, - cfg.BundleURI.Concurrency.MaxQueueWait.Duration(), + cfg.BundleURI.AutoGeneration.Concurrency.MaxQueueSize, + cfg.BundleURI.AutoGeneration.Concurrency.MaxQueueWait.Duration(), bundleGenerationMonitor, ) prometheus.MustRegister(bundleGenerationMonitor) @@ -627,6 +627,7 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { BackupLocator: backupLocator, BundleURISink: bundleURISink, BundleGenerationMgr: bundleGenerationMgr, + InProgressTracker: service.NewInProgressTracker(), }) b.RegisterStarter(starter.New(c, srv, logger)) } diff --git a/internal/featureflag/ff_autogenerate_bundles.go b/internal/featureflag/ff_autogenerate_bundles.go new file mode 100644 index 0000000000..61692778f3 --- /dev/null +++ b/internal/featureflag/ff_autogenerate_bundles.go @@ -0,0 +1,9 @@ +package featureflag + +// AutogenerateBundlesForBundleURI enables automatic generation of bundles for git's bundle URI feature +var AutogenerateBundlesForBundleURI = NewFeatureFlag( + "autogenerate_bundles_for_bundleuri", + "v17.3.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/6204", + false, +) diff --git a/internal/gitaly/service/smarthttp/server.go b/internal/gitaly/service/smarthttp/server.go index 3f67ead09d..87669e39ec 100644 --- a/internal/gitaly/service/smarthttp/server.go +++ b/internal/gitaly/service/smarthttp/server.go @@ -35,6 +35,9 @@ type server struct { backupSink backup.Sink bundleURISink *bundleuri.Sink bundleGenerationMgr *bundleuri.BundleGenerationManager + inProgressTracker service.InProgressTracker + generateBundles bool + partitionMgr *storagemgr.PartitionManager } // NewServer creates a new instance of a grpc SmartHTTPServer @@ -58,6 +61,9 @@ func NewServer(deps *service.Dependencies, serverOpts ...ServerOpt) gitalypb.Sma backupSink: deps.GetBackupSink(), bundleURISink: deps.GetBundleURISink(), bundleGenerationMgr: deps.GetBundleGenerationManager(), + inProgressTracker: deps.GetInProgressTracker(), + 
generateBundles: 
deps.GetCfg().BundleURI.AutoGeneration.Enabled, + partitionMgr: deps.GetPartitionManager(), } for _, serverOpt := range serverOpts { diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go index 6486436f66..7919a3c05e 100644 --- a/internal/gitaly/service/smarthttp/upload_pack.go +++ b/internal/gitaly/service/smarthttp/upload_pack.go @@ -6,16 +6,27 @@ import ( "errors" "fmt" "io" + "time" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri" "gitlab.com/gitlab-org/gitaly/v16/internal/command" + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) +const ( + concurrentUploadPackThreshold = 5 + bundleGenerationTimeout = 24 * time.Hour +) + func (s *server) PostUploadPackWithSidechannel(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) { repoPath, gitConfig, err := s.validateUploadPackRequest(ctx, req) if err != nil { @@ -116,8 +127,75 @@ func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPack gitConfig = append(gitConfig, bundleuri.CapabilitiesGitConfig(ctx)...) 
+ originalRepo := req.GetRepository() + + storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) { + originalRepo = tx.OriginalRepository(req.GetRepository()) + }) + + key := originalRepo.GetStorageName() + ":" + originalRepo.GetRelativePath() + uploadPackConfig, err := bundleuri.UploadPackGitConfig(ctx, s.bundleURISink, req.GetRepository()) if err != nil { + if errors.Is(err, bundleuri.ErrBundleNotFound) && + featureflag.AutogenerateBundlesForBundleURI.IsEnabled(ctx) && + s.generateBundles && + s.inProgressTracker.GetInProgress(key) > concurrentUploadPackThreshold { + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), bundleGenerationTimeout) + defer cancel() + + if s.partitionMgr != nil { + tx, err := s.partitionMgr.Begin( + ctx, + originalRepo.GetStorageName(), + 0, + storagemgr.TransactionOptions{ + ReadOnly: true, + RelativePath: originalRepo.GetRelativePath(), + }, + ) + if err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("failed starting transaction") + return + } + + if err := s.bundleGenerationMgr.Generate(ctx, localrepo.New( + s.logger, + s.locator, + s.gitCmdFactory, + s.catfileCache, + originalRepo)); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("generate bundle") + + if err := tx.Rollback(); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("failed rolling back transaction") + return + } + + return + } + + if err := tx.Commit(ctx); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("committing transaction") + } + } else { + if err := s.bundleGenerationMgr.Generate(ctx, localrepo.New( + s.logger, + s.locator, + s.gitCmdFactory, + s.catfileCache, + originalRepo, + )); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("generate bundle") + return + } + } + }() + } else if !errors.Is(err, bundleuri.ErrSinkMissing) { + s.logger.WithError(err).ErrorContext(ctx, "failed configuring bundle-uri") + } } else { gitConfig = append(gitConfig, uploadPackConfig...) 
} @@ -130,6 +208,9 @@ func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPack git.WithPackObjectsHookEnv(req.GetRepository(), "http"), } + s.inProgressTracker.IncrementInProgress(key) + defer s.inProgressTracker.DecrementInProgress(key) + cmd, err := s.gitCmdFactory.New(ctx, req.GetRepository(), git.Command{ Name: "upload-pack", Flags: []git.Option{git.Flag{Name: "--stateless-rpc"}}, @@ -158,5 +239,6 @@ func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPack } s.logger.WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).InfoContext(ctx, "request details") + return nil, nil } diff --git a/internal/gitaly/service/smarthttp/upload_pack_test.go b/internal/gitaly/service/smarthttp/upload_pack_test.go index a6359d08d0..7f2b2f3128 100644 --- a/internal/gitaly/service/smarthttp/upload_pack_test.go +++ b/internal/gitaly/service/smarthttp/upload_pack_test.go @@ -21,12 +21,15 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -277,6 +280,7 @@ func testServerPostUploadPackWithSidechannelUsesPackObjectsHook(t *testing.T, ct func testServerPostUploadPackUsesPackObjectsHook(t 
*testing.T, ctx context.Context, makeRequest requestMaker, opts ...testcfg.Option) { cfg := testcfg.Build(t, append(opts, testcfg.WithPackObjectsCacheEnabled())...) + cfg.BinDir = testhelper.TempDir(t) outputPath := filepath.Join(cfg.BinDir, "output") //nolint:gitaly-linters @@ -379,6 +383,7 @@ func TestServer_PostUploadPackWithBundleURI(t *testing.T) { ctx := testhelper.Context(t) ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.BundleURI, true) + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.AutogenerateBundlesForBundleURI, false) tempDir := testhelper.TempDir(t) keyFile, err := os.Create(filepath.Join(tempDir, "secret.key")) @@ -499,6 +504,208 @@ func TestServer_PostUploadPackWithBundleURI(t *testing.T) { } } +type testInProgressTracker struct { + thresholdReached bool +} + +func (t *testInProgressTracker) GetInProgress(key string) uint { + if t.thresholdReached { + return concurrentUploadPackThreshold + 1 + } + + return 0 +} + +func (t *testInProgressTracker) IncrementInProgress(key string) {} + +func (t *testInProgressTracker) DecrementInProgress(key string) {} + +type blockingLimiter struct { + ch chan struct{} + lastBundleGeneratedPath string +} + +func (l *blockingLimiter) Limit(ctx context.Context, lockKey string, f limiter.LimitedFunc) (interface{}, error) { + res, err := f() + l.lastBundleGeneratedPath = res.(string) + l.ch <- struct{}{} + return res, err +} + +func TestServer_PostUploadPackAutogenerateBundles(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.AutogenerateBundlesForBundleURI, true) + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.BundleURI, true) + + testCases := []struct { + desc string + sinkDir string + setup func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *testInProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) + expectBundleGenerated bool + verifyBundle 
func(*testing.T, config.Cfg, string, git.ObjectID) + }{ + { + desc: "autogeneration successful", + setup: func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *testInProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + + tracker.thresholdReached = true + }, + expectBundleGenerated: true, + verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) { + tempDir := t.TempDir() + objectFormat := gittest.DefaultObjectHash.Format + gittest.Exec(t, cfg, "init", "--object-format="+objectFormat, tempDir) + gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath) + // A new bundle is expected to be created containing the new commit + gittest.RequireObjectExists(t, cfg, tempDir, commit) + }, + }, + { + desc: "bundle already exists", + setup: func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *testInProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + tracker.thresholdReached = true + + repo := localrepo.NewTestRepo(t, cfg, repoProto) + require.NoError(t, sink.Generate(ctx, repo)) + }, + expectBundleGenerated: false, + verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) { + tempDir := t.TempDir() + objectFormat := gittest.DefaultObjectHash.Format + gittest.Exec(t, cfg, "init", "--object-format="+objectFormat, tempDir) + gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath) + // No new bundle is expected to be created since one already existed. 
+ gittest.RequireObjectNotExists(t, cfg, tempDir, commit) + }, + }, + { + desc: "no concurrent upload packs in flight", + setup: func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *testInProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) { + tracker.thresholdReached = false + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + }, + expectBundleGenerated: false, + verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) { + tempDir := t.TempDir() + gittest.Exec(t, cfg, "init", tempDir) + gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath) + // No new bundle is expected to have been created because there are no + // inflight upload pack calls. + gittest.RequireObjectNotExists(t, cfg, tempDir, commit) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + tracker := &testInProgressTracker{} + + sinkDir := t.TempDir() + sink, err := bundleuri.NewSink(ctx, "file://"+sinkDir) + require.NoError(t, err) + + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + + cfg.BundleURI.AutoGeneration.Enabled = true + + gitCmdFactory := gittest.NewCommandFactory(t, cfg) + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + + blockingCh := make(chan struct{}) + limiter := &blockingLimiter{ch: blockingCh} + + server := startSmartHTTPServerWithOptions(t, cfg, nil, []testserver.GitalyServerOpt{ + testserver.WithBundleGenerationManager(bundleuri.NewBundleGenerationManager(sink, limiter)), + testserver.WithBundleURISink(sink), + testserver.WithLogger(logger), + testserver.WithInProgressTracker(tracker), + testserver.WithTransactionRegistry(storagemgr.NewTransactionRegistry()), + testserver.WithGitCommandFactory(gitCmdFactory), + }) + + cfg.SocketPath = server.Address() + + repoProto, repoPath := 
gittest.CreateRepository(t, ctx, cfg) + oldCommit := gittest.WriteCommit(t, cfg, repoPath) + newCommit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("master"), gittest.WithParents(oldCommit)) + + if tc.setup != nil { + tc.setup(t, ctx, cfg, sink, tracker, repoProto, repoPath) + } + + commitInUpdatedBundle := gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "CHANGELOG", Content: "nothing changed"}), + gittest.WithBranch("main")) + + // UploadPack request is a "want" packet line followed by a packet flush, then many "have" packets followed by a packet flush. + // This is explained a bit in https://git-scm.com/book/en/v2/Git-Internals-Transfer-Protocols#_downloading_data + requestBuffer := &bytes.Buffer{} + gittest.WritePktlineString(t, requestBuffer, fmt.Sprintf("want %s %s\n", newCommit, clientCapabilities)) + gittest.WritePktlineFlush(t, requestBuffer) + gittest.WritePktlineString(t, requestBuffer, fmt.Sprintf("have %s\n", oldCommit)) + gittest.WritePktlineFlush(t, requestBuffer) + + req := &gitalypb.PostUploadPackWithSidechannelRequest{Repository: repoProto} + responseBuffer, err := makePostUploadPackWithSidechannelRequest(t, ctx, cfg.SocketPath, cfg.Auth.Token, req, requestBuffer) + require.NoError(t, err) + + pack, _, _ := extractPackDataFromResponse(t, responseBuffer) + require.NotEmpty(t, pack, "Expected to find a pack file in response, found none") + + if tc.expectBundleGenerated { + <-blockingCh + tc.verifyBundle(t, cfg, filepath.Join(sinkDir, limiter.lastBundleGeneratedPath), commitInUpdatedBundle) + } else { + require.Empty(t, limiter.lastBundleGeneratedPath) + } + }) + } +} + func testServerPostUploadPackWithSideChannelValidation(t *testing.T, ctx context.Context, makeRequest requestMaker, opts ...testcfg.Option) { cfg := testcfg.Build(t, opts...) serverSocketPath := runSmartHTTPServer(t, cfg) -- GitLab