diff --git a/internal/bundleuri/git_config.go b/internal/bundleuri/git_config.go index 69cc21b49b9eb82aedfc84d84da7a3b8145b6024..f18fd4bda176fdff63e758ab623101b321e4ddb9 100644 --- a/internal/bundleuri/git_config.go +++ b/internal/bundleuri/git_config.go @@ -11,6 +11,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/log" ) +// ErrSinkMissing indicates the bundle-URI sink is missing. +var ErrSinkMissing = errors.New("bundle-URI sink missing") + // CapabilitiesGitConfig returns a slice of git.ConfigPairs that can be injected // into the Git config to make it aware the bundle-URI capabilities are // supported. @@ -42,7 +45,7 @@ func UploadPackGitConfig( } if sink == nil { - return CapabilitiesGitConfig(ctx), errors.New("bundle-URI sink missing") + return CapabilitiesGitConfig(ctx), ErrSinkMissing } uri, err := sink.SignedURL(ctx, repo) diff --git a/internal/bundleuri/git_config_test.go b/internal/bundleuri/git_config_test.go index 0a5da2bbc24c9de079b4d4005cfda31e6871dc4a..19c79bf26f9e94774c3d7f39d10983b56c8f866d 100644 --- a/internal/bundleuri/git_config_test.go +++ b/internal/bundleuri/git_config_test.go @@ -2,7 +2,6 @@ package bundleuri import ( "context" - "errors" "fmt" "os" "path/filepath" @@ -14,7 +13,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" ) @@ -60,7 +58,7 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { return setupData{} }, expectedConfig: nil, - expectedErr: errors.New("bundle-URI sink missing"), + expectedErr: ErrSinkMissing, }, { desc: "no bundle found", @@ -74,7 +72,7 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { } }, expectedConfig: nil, - expectedErr: structerr.NewNotFound("no bundle available"), + expectedErr: ErrBundleNotFound, }, { desc: "not signed", diff --git a/internal/bundleuri/manager.go b/internal/bundleuri/manager.go new file mode 100644 index 0000000000000000000000000000000000000000..77c0de6132ab93d854c484c176b65c364a1bf766 --- /dev/null +++ b/internal/bundleuri/manager.go @@ -0,0 +1,105 @@ +package bundleuri + +import ( + "context" + "errors" + "fmt" + "sync" + + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" +) + +const globalLimitingKey = "bundle-uri-generation" + +var ( + // ErrBundleGenerationInProgress indicates there is a bundle already + // being generated for this repository. + ErrBundleGenerationInProgress = errors.New("a bundle is already being generated for this repository") + // ErrBundleGenerationConcurrencyReached indicates the global concurrency + // limit has been reached. + ErrBundleGenerationConcurrencyReached = errors.New("bundle generation concurrency reached") + // ErrTerminatedEarly indicates the context was cancelled, or the + // goroutine handling the bundle generation was stopped. + ErrTerminatedEarly = errors.New("bundle generation terminated early") +) + +// BundleGenerationManager manages bundle generation. It handles requests to +// generate bundles for a repository and enforces concurrency limits: at most one +// bundle generation per repository at any given time, as well as a global limit across +// all repositories.
+type BundleGenerationManager struct { + limiter limiter.Limiter + sink *Sink + bundleGenerationInProgress sync.Map + stopChan chan struct{} + wg sync.WaitGroup +} + +// NewBundleGenerationManager creates a new BundleGenerationManager. +func NewBundleGenerationManager(sink *Sink, concurrencyLimiter limiter.Limiter) *BundleGenerationManager { + return &BundleGenerationManager{ + limiter: concurrencyLimiter, + sink: sink, + stopChan: make(chan struct{}), + } +} + +// StopAll blocks until all of the goroutines that are generating bundles are finished. +func (b *BundleGenerationManager) StopAll() { + close(b.stopChan) + b.wg.Wait() +} + +// generateOneAtATime generates a bundle for a repository, but only if there is not already +// one in flight. +func (b *BundleGenerationManager) generateOneAtATime(ctx context.Context, repo *localrepo.Repo) (string, error) { + b.wg.Add(1) + defer b.wg.Done() + + bundlePath := b.sink.relativePath(repo, defaultBundle) + + _, loaded := b.bundleGenerationInProgress.LoadOrStore( + bundlePath, + struct{}{}, + ) + if loaded { + return "", ErrBundleGenerationInProgress + } + defer b.bundleGenerationInProgress.Delete(bundlePath) + + errChan := make(chan error, 1) + + go func() { + errChan <- b.sink.Generate(ctx, repo) + }() + + select { + case err := <-errChan: + return bundlePath, err + case <-b.stopChan: + return "", ErrTerminatedEarly + } +} + +func (b *BundleGenerationManager) generateOneAtATimeLimitFunc(ctx context.Context, repo *localrepo.Repo) limiter.LimitedFunc { + return func() (interface{}, error) { + var bundlePath string + var err error + if bundlePath, err = b.generateOneAtATime(ctx, repo); err != nil { + return nil, err + } + + return bundlePath, nil + } +} + +// Generate generates a bundle for a given repository while enforcing +// concurrency limits. +func (b *BundleGenerationManager) Generate(ctx context.Context, repo *localrepo.Repo) error { + if _, err := b.limiter.Limit(ctx, globalLimitingKey, b.generateOneAtATimeLimitFunc(ctx, repo)); err != nil { + return fmt.Errorf("error generating bundle: %w", err) + } + + return nil +} diff --git a/internal/bundleuri/manager_test.go b/internal/bundleuri/manager_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b358b5c110c868e4e0fa1505a1a6f6b0b0543748 --- /dev/null +++ b/internal/bundleuri/manager_test.go @@ -0,0 +1,108 @@ +package bundleuri + +import ( + "context" + "fmt" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" +) + +type passthroughLimiter struct{} + +func (p *passthroughLimiter) Limit(ctx context.Context, lockKey string, f limiter.LimitedFunc) (interface{}, error) { + return f() +} + +func TestBundleGenerationManager_generateOneAtATime(t *testing.T) { + t.Parallel() + + cfg := testcfg.Build(t) + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + setup func(t *testing.T, repoPath string) + expectedErr error + }{ + { + desc: "creates bundle successfully", + setup: func(t *testing.T, repoPath string) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + }, + }, + { + desc:
"fails with missing HEAD", + setup: func(t *testing.T, repoPath string) {}, + expectedErr: structerr.NewFailedPrecondition("ref %q does not exist: %w", "refs/heads/main", fmt.Errorf("create bundle: %w", localrepo.ErrEmptyBundle)), + }, + } { + tc := tc + + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + tc.setup(t, repoPath) + + sinkDir := t.TempDir() + sink, err := NewSink( + ctx, + "file://"+sinkDir, + ) + require.NoError(t, err) + + manager := NewBundleGenerationManager(sink, &passthroughLimiter{}) + + _, err = manager.generateOneAtATime(ctx, repo) + + if tc.expectedErr == nil { + require.FileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) + } else { + require.Equal(t, tc.expectedErr, err) + } + }) + } +} + +func TestBundleGenerationManager_generateOneAtATime_inProgress(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + cfg := testcfg.Build(t) + + sinkDir := t.TempDir() + sink, err := NewSink( + ctx, + "file://"+sinkDir, + ) + require.NoError(t, err) + + repoProto, _ := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + manager := NewBundleGenerationManager(sink, &passthroughLimiter{}) + + // Pretend there is already another bundle generation happening for + // this repo. + bundlePath := sink.relativePath(repo, defaultBundle) + manager.bundleGenerationInProgress.Store(bundlePath, struct{}{}) + + _, err = manager.generateOneAtATime(testhelper.Context(t), repo) + require.Equal(t, ErrBundleGenerationInProgress, err) +} diff --git a/internal/bundleuri/sink.go b/internal/bundleuri/sink.go index a2d1ac8d0047cdb753b39232f1abbe58934b288b..30011f63d16334422cc0db7b803b9950a24b0d0e 100644 --- a/internal/bundleuri/sink.go +++ b/internal/bundleuri/sink.go @@ -29,6 +29,9 @@ const ( defaultExpiry = 10 * time.Minute ) +// ErrBundleNotFound indicates that no bundle could be found for a given repository. +var ErrBundleNotFound = errors.New("no bundle found") + // Sink is a wrapper around the storage bucket used for accessing/writing // bundleuri bundles.
type Sink struct { @@ -132,9 +135,9 @@ func (s Sink) SignedURL(ctx context.Context, repo storage.Repository) (string, e if exists, err := s.bucket.Exists(ctx, relativePath); !exists { if err == nil { - return "", structerr.NewNotFound("no bundle available") + return "", ErrBundleNotFound } - return "", structerr.NewNotFound("no bundle available: %w", err) + return "", fmt.Errorf("checking bundle existence: %w", err) } uri, err := s.bucket.SignedURL(ctx, relativePath, &blob.SignedURLOptions{ diff --git a/internal/bundleuri/sink_test.go b/internal/bundleuri/sink_test.go index 3454186121b215e3cb6a279470a31bc587b347ae..0b1079a37dc8495e3a44321dd60fd6b99d6f52a9 100644 --- a/internal/bundleuri/sink_test.go +++ b/internal/bundleuri/sink_test.go @@ -105,7 +105,7 @@ func TestSink_SignedURL(t *testing.T) { { desc: "fails with missing bundle", setup: func(t *testing.T, sinkDir string, sink *Sink) {}, - expectedErr: structerr.NewNotFound("no bundle available"), + expectedErr: ErrBundleNotFound, }, } { tc := tc diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index d94704aac6a91662c5e09f4ca9f7d9817434c416..bd45fc1b397111d5ebae0955725d3c00295a5eb4 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -542,11 +542,45 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { } var bundleURISink *bundleuri.Sink + var bundleGenerationMgr *bundleuri.BundleGenerationManager if cfg.BundleURI.GoCloudURL != "" { - bundleURISink, err = bundleuri.NewSink(ctx, cfg.BundleURI.GoCloudURL) + bundleURISink, err = bundleuri.NewSink( + ctx, + cfg.BundleURI.GoCloudURL, + ) if err != nil { return fmt.Errorf("create bundle-URI sink: %w", err) } + + if cfg.BundleURI.AutoGeneration.Enabled { + var bundleGenerationLimit *limiter.AdaptiveLimit + if cfg.BundleURI.AutoGeneration.Concurrency.Adaptive { + bundleGenerationLimit = limiter.NewAdaptiveLimit("bundleGeneration", limiter.AdaptiveSetting{ + Initial: cfg.BundleURI.AutoGeneration.Concurrency.InitialLimit, + Max: cfg.BundleURI.AutoGeneration.Concurrency.MaxLimit, + Min: cfg.BundleURI.AutoGeneration.Concurrency.MinLimit, + BackoffFactor: limiter.DefaultBackoffFactor, + }) + } else { + bundleGenerationLimit = limiter.NewAdaptiveLimit("bundleGeneration", limiter.AdaptiveSetting{Initial: cfg.BundleURI.AutoGeneration.Concurrency.MaxConcurrency}) + } + + bundleGenerationMonitor := limiter.NewBundleGenerationConcurrencyMonitor( + cfg.Prometheus.GRPCLatencyBuckets, + ) + bundleGenerationLimiter := limiter.NewConcurrencyLimiter( + bundleGenerationLimit, + cfg.BundleURI.AutoGeneration.Concurrency.MaxQueueSize, + cfg.BundleURI.AutoGeneration.Concurrency.MaxQueueWait.Duration(), + bundleGenerationMonitor, + ) + prometheus.MustRegister(bundleGenerationMonitor) + + bundleGenerationMgr = bundleuri.NewBundleGenerationManager( + bundleURISink, + bundleGenerationLimiter, + ) + } } for _, c := range []starter.Config{ @@ -592,6 +626,8 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { BackupSink: backupSink, BackupLocator: backupLocator, BundleURISink: bundleURISink, + BundleGenerationMgr: bundleGenerationMgr, + InProgressTracker: service.NewInProgressTracker(), }) b.RegisterStarter(starter.New(c, srv, logger)) } diff --git a/internal/featureflag/ff_autogenerate_bundles.go b/internal/featureflag/ff_autogenerate_bundles.go new file mode 100644 index 0000000000000000000000000000000000000000..61692778f32d355af1a5b5d4fe028d201216b386 --- /dev/null +++ b/internal/featureflag/ff_autogenerate_bundles.go @@ -0,0 +1,9
@@ +package featureflag + +// AutogenerateBundlesForBundleURI enables automatic generation of bundles for Git's bundle-URI feature. +var AutogenerateBundlesForBundleURI = NewFeatureFlag( + "autogenerate_bundles_for_bundleuri", + "v17.3.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/6204", + false, +) diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index 72526a14a4f4d6e2e895b2c8ef0a324359e63817..4164476a5c880114ab2df7c6b5ad03f9671fbd57 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -119,7 +119,7 @@ type Cfg struct { // This setting is thus deprecated and should ideally not be used anymore. GitlabShell GitlabShell `toml:"gitlab-shell,omitempty" json:"gitlab-shell"` Hooks Hooks `toml:"hooks,omitempty" json:"hooks"` - Concurrency []Concurrency `toml:"concurrency,omitempty" json:"concurrency"` + Concurrency []RPCConcurrency `toml:"concurrency,omitempty" json:"concurrency"` RateLimiting []RateLimiting `toml:"rate_limiting,omitempty" json:"rate_limiting"` GracefulRestartTimeout duration.Duration `toml:"graceful_restart_timeout,omitempty" json:"graceful_restart_timeout"` DailyMaintenance DailyJob `toml:"daily_maintenance,omitempty" json:"daily_maintenance"` @@ -466,12 +466,26 @@ type Logging struct { Sentry } -// Concurrency allows endpoints to be limited to a maximum concurrency per repo. +// BundleGenerationConcurrency defines concurrency for bundle generation. +type BundleGenerationConcurrency struct { + Concurrency + MaxConcurrency int `toml:"max_concurrency" json:"max_concurrency"` +} + +// RPCConcurrency defines concurrency limits for RPCs. +type RPCConcurrency struct { + Concurrency + // RPC is the name of the RPC to set concurrency limits for + RPC string `toml:"rpc" json:"rpc"` + // MaxPerRepo is the maximum number of concurrent calls for a given repository. This config is used only + // if Adaptive is false. + MaxPerRepo int `toml:"max_per_repo" json:"max_per_repo"` +} + +// Concurrency allows operations to be limited to a maximum concurrency per repo. // Requests that come in after the maximum number of concurrent requests are in progress will wait // in a queue that is bounded by MaxQueueSize. type Concurrency struct { - // RPC is the name of the RPC to set concurrency limits for - RPC string `toml:"rpc" json:"rpc"` // Adaptive determines the behavior of the concurrency limit. If set to true, the concurrency limit is dynamic // and starts at InitialLimit, then adjusts within the range [MinLimit, MaxLimit] based on current resource // usage. If set to false, the concurrency limit is static and is set to MaxPerRepo. @@ -482,9 +496,6 @@ type Concurrency struct { MaxLimit int `toml:"max_limit,omitempty" json:"max_limit,omitempty"` // MinLimit is the mini adaptive concurrency limit. MinLimit int `toml:"min_limit,omitempty" json:"min_limit,omitempty"` - // MaxPerRepo is the maximum number of concurrent calls for a given repository. This config is used only - // if Adaptive is false. - MaxPerRepo int `toml:"max_per_repo" json:"max_per_repo"` // MaxQueueSize is the maximum number of requests in the queue waiting to be picked up // after which subsequent requests will return with an error. MaxQueueSize int `toml:"max_queue_size" json:"max_queue_size"` @@ -494,17 +505,17 @@ type Concurrency struct { } // Validate runs validation on all fields and compose all found errors. -func (c Concurrency) Validate() error { +func (r RPCConcurrency) Validate() error { errs := cfgerror.New().
- Append(cfgerror.Comparable(c.MaxPerRepo).GreaterOrEqual(0), "max_per_repo"). - Append(cfgerror.Comparable(c.MaxQueueSize).GreaterOrEqual(0), "max_queue_size"). - Append(cfgerror.Comparable(c.MaxQueueWait.Duration()).GreaterOrEqual(0), "max_queue_wait") + Append(cfgerror.Comparable(r.MaxPerRepo).GreaterOrEqual(0), "max_per_repo"). + Append(cfgerror.Comparable(r.MaxQueueSize).GreaterOrEqual(0), "max_queue_size"). + Append(cfgerror.Comparable(r.MaxQueueWait.Duration()).GreaterOrEqual(0), "max_queue_wait") - if c.Adaptive { + if r.Adaptive { errs = errs. - Append(cfgerror.Comparable(c.MinLimit).GreaterThan(0), "min_limit"). - Append(cfgerror.Comparable(c.MaxLimit).GreaterOrEqual(c.InitialLimit), "max_limit"). - Append(cfgerror.Comparable(c.InitialLimit).GreaterOrEqual(c.MinLimit), "initial_limit") + Append(cfgerror.Comparable(r.MinLimit).GreaterThan(0), "min_limit"). + Append(cfgerror.Comparable(r.MaxLimit).GreaterOrEqual(r.InitialLimit), "max_limit"). + Append(cfgerror.Comparable(r.InitialLimit).GreaterOrEqual(r.MinLimit), "initial_limit") } return errs.AsError() } @@ -618,11 +629,19 @@ func (bc BackupConfig) Validate() error { return errs.AsError() } +// BundleAutogeneration configures the settings for automatic generation of bundles +// for bundle-URI. +type BundleAutogeneration struct { + Enabled bool `toml:"enabled,omitempty" json:"enabled,omitempty"` + Concurrency BundleGenerationConcurrency `toml:"concurrency,omitempty" json:"concurrency,omitempty"` +} + // BundleURIConfig configures use of Bundle-URI type BundleURIConfig struct { // GoCloudURL is the blob storage GoCloud URL that will be used to store // Git bundles for Bundle-URI use. - GoCloudURL string `toml:"go_cloud_url,omitempty" json:"go_cloud_url,omitempty"` + GoCloudURL string `toml:"go_cloud_url,omitempty" json:"go_cloud_url,omitempty"` + AutoGeneration BundleAutogeneration `toml:"auto_generation,omitempty" json:"auto_generation,omitempty"` } // Validate runs validation on all fields and returns any errors found.
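For orientation, the new settings above map roughly to TOML like the following sketch. The top-level [bundle_uri] table name is assumed from the existing BundleURIConfig wiring (its tag on Cfg is not shown in this diff), the embedded Concurrency fields are shown inlined next to max_concurrency, and all values are purely illustrative.

[bundle_uri]
go_cloud_url = "gs://example-bundle-bucket"

[bundle_uri.auto_generation]
enabled = true

[bundle_uri.auto_generation.concurrency]
# Static global limit, used when adaptive is false.
max_concurrency = 2
# Queueing behaviour comes from the embedded Concurrency struct.
max_queue_size = 10
max_queue_wait = "1m"
adaptive = false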
diff --git a/internal/gitaly/config/config_test.go b/internal/gitaly/config/config_test.go index de1abd9781a9fef4d7926d353694d764c3a3f473..de5a024ae218194be8807d4d6b09edb0f38d403b 100644 --- a/internal/gitaly/config/config_test.go +++ b/internal/gitaly/config/config_test.go @@ -1951,9 +1951,9 @@ func TestPackObjectsLimiting_Validate(t *testing.T) { func TestConcurrency_Validate(t *testing.T) { t.Parallel() - require.NoError(t, Concurrency{MaxPerRepo: 0}.Validate()) - require.NoError(t, Concurrency{MaxPerRepo: 1}.Validate()) - require.NoError(t, Concurrency{MaxPerRepo: 100}.Validate()) + require.NoError(t, RPCConcurrency{MaxPerRepo: 0}.Validate()) + require.NoError(t, RPCConcurrency{MaxPerRepo: 1}.Validate()) + require.NoError(t, RPCConcurrency{MaxPerRepo: 100}.Validate()) require.Equal( t, cfgerror.ValidationErrors{ @@ -1962,12 +1962,12 @@ func TestConcurrency_Validate(t *testing.T) { "max_per_repo", ), }, - Concurrency{MaxPerRepo: -1}.Validate(), + RPCConcurrency{MaxPerRepo: -1}.Validate(), ) - require.NoError(t, Concurrency{Adaptive: true, InitialLimit: 1, MinLimit: 1, MaxLimit: 100}.Validate()) - require.NoError(t, Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 1, MaxLimit: 100}.Validate()) - require.NoError(t, Concurrency{Adaptive: true, InitialLimit: 100, MinLimit: 1, MaxLimit: 100}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 1, MinLimit: 1, MaxLimit: 100}}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 1, MaxLimit: 100}}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 100, MinLimit: 1, MaxLimit: 100}}.Validate()) require.Equal( t, cfgerror.ValidationErrors{ @@ -1976,7 +1976,7 @@ func TestConcurrency_Validate(t *testing.T) { "min_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: 0, MinLimit: 0, MaxLimit: 100}.Validate(), + RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 0, MinLimit: 0, MaxLimit: 100}}.Validate(), ) require.Equal( t, @@ -1986,7 +1986,7 @@ func TestConcurrency_Validate(t *testing.T) { "initial_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: -1, MinLimit: 1, MaxLimit: 100}.Validate(), + RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: -1, MinLimit: 1, MaxLimit: 100}}.Validate(), ) require.Equal( t, @@ -1996,7 +1996,7 @@ func TestConcurrency_Validate(t *testing.T) { "initial_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 11, MaxLimit: 100}.Validate(), + RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 11, MaxLimit: 100}}.Validate(), ) require.Equal( t, @@ -2006,7 +2006,7 @@ func TestConcurrency_Validate(t *testing.T) { "max_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: 3}.Validate(), + RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: 3}}.Validate(), ) require.Equal( t, @@ -2016,7 +2016,7 @@ func TestConcurrency_Validate(t *testing.T) { "min_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: 5, MinLimit: -1, MaxLimit: 99}.Validate(), + RPCConcurrency{Concurrency: Concurrency{Adaptive: true, InitialLimit: 5, MinLimit: -1, MaxLimit: 99}}.Validate(), ) require.Equal( t, @@ -2026,12 +2026,12 @@ func TestConcurrency_Validate(t *testing.T) { "max_limit", ), }, - Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: -1}.Validate(), + RPCConcurrency{Concurrency: 
Concurrency{Adaptive: true, InitialLimit: 10, MinLimit: 5, MaxLimit: -1}}.Validate(), ) - require.NoError(t, Concurrency{MaxQueueSize: 0}.Validate()) - require.NoError(t, Concurrency{MaxQueueSize: 1}.Validate()) - require.NoError(t, Concurrency{MaxQueueSize: 100}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{MaxQueueSize: 0}}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{MaxQueueSize: 1}}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{MaxQueueSize: 100}}.Validate()) require.Equal( t, cfgerror.ValidationErrors{ @@ -2040,10 +2040,10 @@ func TestConcurrency_Validate(t *testing.T) { "max_queue_size", ), }, - Concurrency{MaxQueueSize: -1}.Validate(), + RPCConcurrency{Concurrency: Concurrency{MaxQueueSize: -1}}.Validate(), ) - require.NoError(t, Concurrency{MaxQueueWait: duration.Duration(1)}.Validate()) + require.NoError(t, RPCConcurrency{Concurrency: Concurrency{MaxQueueWait: duration.Duration(1)}}.Validate()) require.Equal( t, cfgerror.ValidationErrors{ @@ -2052,7 +2052,7 @@ func TestConcurrency_Validate(t *testing.T) { "max_queue_wait", ), }, - Concurrency{MaxQueueWait: duration.Duration(-time.Minute)}.Validate(), + RPCConcurrency{Concurrency: Concurrency{MaxQueueWait: duration.Duration(-time.Minute)}}.Validate(), ) } diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index d12bc0eadf2328bdb8c38b45b6c2c308dfdcb9cc..04b371c3cf37a8f9c17dd76b563fdd27fea6f813 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -362,7 +362,7 @@ func TestAuthBeforeLimit(t *testing.T) { testhelper.NewFeatureSets(featureflag.UseResizableSemaphoreInConcurrencyLimiter, featureflag.UseResizableSemaphoreLifoStrategy).Run(t, func(t *testing.T, ctx netctx.Context) { cfg := testcfg.Build(t, testcfg.WithBase(config.Cfg{ Auth: auth.Config{Token: "abc123"}, - Concurrency: []config.Concurrency{{ + Concurrency: []config.RPCConcurrency{{ RPC: "/gitaly.OperationService/UserCreateTag", MaxPerRepo: 1, }}, diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index 4fc6f952f9af6bdf70ea21787b48fda7f91ff3f8..c50cb1b774c58113935bccfbbbed0f2028896a0d 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -48,6 +48,8 @@ type Dependencies struct { BackupLocator backup.Locator BundleURISink *bundleuri.Sink ProcReceiveRegistry *gitalyhook.ProcReceiveRegistry + InProgressTracker InProgressTracker + BundleGenerationMgr *bundleuri.BundleGenerationManager } // GetLogger returns the logger. @@ -164,3 +166,13 @@ func (dc *Dependencies) GetBundleURISink() *bundleuri.Sink { func (dc *Dependencies) GetProcReceiveRegistry() *gitalyhook.ProcReceiveRegistry { return dc.ProcReceiveRegistry } + +// GetInProgressTracker returns the InProgressTracker. +func (dc *Dependencies) GetInProgressTracker() InProgressTracker { + return dc.InProgressTracker +} + +// GetBundleGenerationManager returns the BundleGenerationManager.
+func (dc *Dependencies) GetBundleGenerationManager() *bundleuri.BundleGenerationManager { + return dc.BundleGenerationMgr +} diff --git a/internal/gitaly/service/in_progress_tracker.go b/internal/gitaly/service/in_progress_tracker.go new file mode 100644 index 0000000000000000000000000000000000000000..5626170453b40a8dbd4bc0dd27d191fc28febfb0 --- /dev/null +++ b/internal/gitaly/service/in_progress_tracker.go @@ -0,0 +1,48 @@ +package service + +import ( + "sync" +) + +// InProgressTracker can be used to keep track of processes that are in flight +type InProgressTracker interface { + GetInProgress(key string) uint + IncrementInProgress(key string) + DecrementInProgress(key string) +} + +type inProgressTracker struct { + inProgress map[string]uint + l sync.RWMutex +} + +// NewInProgressTracker instantiates a new inProgressTracker. +func NewInProgressTracker() *inProgressTracker { + return &inProgressTracker{ + inProgress: make(map[string]uint), + } +} + +// GetInProgress gets the number of inflight processes for a given key. +func (p *inProgressTracker) GetInProgress(key string) uint { + p.l.RLock() + defer p.l.RUnlock() + + return p.inProgress[key] +} + +// IncrementInProgress increments the number of inflight processes for a given key. +func (p *inProgressTracker) IncrementInProgress(key string) { + p.l.Lock() + defer p.l.Unlock() + + p.inProgress[key]++ +} + +// DecrementInProgress decrements the number of inflight processes for a given key. +func (p *inProgressTracker) DecrementInProgress(key string) { + p.l.Lock() + defer p.l.Unlock() + + p.inProgress[key]-- +} diff --git a/internal/gitaly/service/in_progress_tracker_test.go b/internal/gitaly/service/in_progress_tracker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..70c40483c5ec34b18b624c8c44da29c55a2b2507 --- /dev/null +++ b/internal/gitaly/service/in_progress_tracker_test.go @@ -0,0 +1,67 @@ +package service_test + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" +) + +func TestInProgressTracker(t *testing.T) { + key := "key1" + + testCases := []struct { + desc string + expectedInProgress uint + actions func(service.InProgressTracker) + }{ + { + desc: "one in flight", + expectedInProgress: 1, + actions: func(t service.InProgressTracker) { + t.IncrementInProgress(key) + t.IncrementInProgress(key) + t.DecrementInProgress(key) + }, + }, + { + desc: "two in flight with concurrent writes", + expectedInProgress: 2, + actions: func(t service.InProgressTracker) { + var wg sync.WaitGroup + + wg.Add(4) + go func() { + t.IncrementInProgress(key) + wg.Done() + }() + go func() { + t.IncrementInProgress(key) + wg.Done() + }() + + go func() { + t.IncrementInProgress(key) + wg.Done() + }() + + go func() { + t.DecrementInProgress(key) + wg.Done() + }() + + wg.Wait() + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + tracker := service.NewInProgressTracker() + + tc.actions(tracker) + require.Equal(t, tc.expectedInProgress, tracker.GetInProgress(key)) + }) + } +} diff --git a/internal/gitaly/service/smarthttp/server.go b/internal/gitaly/service/smarthttp/server.go index 528deeb82eda9a4f2efd78dd624d2237d0802c17..87669e39ec9deed625ae07daa5b07d13a0d9598c 100644 --- a/internal/gitaly/service/smarthttp/server.go +++ b/internal/gitaly/service/smarthttp/server.go @@ -34,6 +34,10 @@ type server struct { backupLocator backup.Locator backupSink backup.Sink bundleURISink *bundleuri.Sink + bundleGenerationMgr 
*bundleuri.BundleGenerationManager + inProgressTracker service.InProgressTracker + generateBundles bool + partitionMgr *storagemgr.PartitionManager } // NewServer creates a new instance of a grpc SmartHTTPServer @@ -52,10 +56,14 @@ func NewServer(deps *service.Dependencies, serverOpts ...ServerOpt) gitalypb.Sma prometheus.CounterOpts{}, []string{"git_negotiation_feature"}, ), - infoRefCache: newInfoRefCache(deps.GetLogger(), deps.GetDiskCache()), - backupLocator: deps.GetBackupLocator(), - backupSink: deps.GetBackupSink(), - bundleURISink: deps.GetBundleURISink(), + infoRefCache: newInfoRefCache(deps.GetLogger(), deps.GetDiskCache()), + backupLocator: deps.GetBackupLocator(), + backupSink: deps.GetBackupSink(), + bundleURISink: deps.GetBundleURISink(), + bundleGenerationMgr: deps.GetBundleGenerationManager(), + inProgressTracker: deps.GetInProgressTracker(), + generateBundles: deps.GetCfg().BundleURI.AutoGeneration.Enabled, + partitionMgr: deps.GetPartitionManager(), } for _, serverOpt := range serverOpts { diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go index 6486436f66180de3399542b684f3d419cc1ea351..7919a3c05e9871a319702adbacdf797215d1bb58 100644 --- a/internal/gitaly/service/smarthttp/upload_pack.go +++ b/internal/gitaly/service/smarthttp/upload_pack.go @@ -6,16 +6,27 @@ import ( "errors" "fmt" "io" + "time" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri" "gitlab.com/gitlab-org/gitaly/v16/internal/command" + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) +const ( + concurrentUploadPackThreshold = 5 + bundleGenerationTimeout = 24 * time.Hour +) + func (s *server) PostUploadPackWithSidechannel(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) { repoPath, gitConfig, err := s.validateUploadPackRequest(ctx, req) if err != nil { @@ -116,8 +127,75 @@ func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPack gitConfig = append(gitConfig, bundleuri.CapabilitiesGitConfig(ctx)...) 
+ originalRepo := req.GetRepository() + + storagectx.RunWithTransaction(ctx, func(tx storagectx.Transaction) { + originalRepo = tx.OriginalRepository(req.GetRepository()) + }) + + key := originalRepo.GetStorageName() + ":" + originalRepo.GetRelativePath() + uploadPackConfig, err := bundleuri.UploadPackGitConfig(ctx, s.bundleURISink, req.GetRepository()) if err != nil { + if errors.Is(err, bundleuri.ErrBundleNotFound) && + featureflag.AutogenerateBundlesForBundleURI.IsEnabled(ctx) && + s.generateBundles && + s.inProgressTracker.GetInProgress(key) > concurrentUploadPackThreshold { + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), bundleGenerationTimeout) + defer cancel() + + if s.partitionMgr != nil { + tx, err := s.partitionMgr.Begin( + ctx, + originalRepo.GetStorageName(), + 0, + storagemgr.TransactionOptions{ + ReadOnly: true, + RelativePath: originalRepo.GetRelativePath(), + }, + ) + if err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("failed starting transaction") + return + } + + if err := s.bundleGenerationMgr.Generate(ctx, localrepo.New( + s.logger, + s.locator, + s.gitCmdFactory, + s.catfileCache, + originalRepo)); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("generate bundle") + + if err := tx.Rollback(); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("failed rolling back transaction") + return + } + + return + } + + if err := tx.Commit(ctx); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("committing transaction") + } + } else { + if err := s.bundleGenerationMgr.Generate(ctx, localrepo.New( + s.logger, + s.locator, + s.gitCmdFactory, + s.catfileCache, + originalRepo, + )); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("generate bundle") + return + } + } + }() + } else if !errors.Is(err, bundleuri.ErrSinkMissing) { + s.logger.WithError(err).ErrorContext(ctx, "failed configuring bundle-uri") + } } else { gitConfig = append(gitConfig, uploadPackConfig...) 
} @@ -130,6 +208,9 @@ func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPack git.WithPackObjectsHookEnv(req.GetRepository(), "http"), } + s.inProgressTracker.IncrementInProgress(key) + defer s.inProgressTracker.DecrementInProgress(key) + cmd, err := s.gitCmdFactory.New(ctx, req.GetRepository(), git.Command{ Name: "upload-pack", Flags: []git.Option{git.Flag{Name: "--stateless-rpc"}}, @@ -158,5 +239,6 @@ func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPack } s.logger.WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).InfoContext(ctx, "request details") + return nil, nil } diff --git a/internal/gitaly/service/smarthttp/upload_pack_test.go b/internal/gitaly/service/smarthttp/upload_pack_test.go index a6359d08d0a07ec535152f212cc1e0af30903cbe..7f2b2f312891d09ab5fad0282c0739321159a3d5 100644 --- a/internal/gitaly/service/smarthttp/upload_pack_test.go +++ b/internal/gitaly/service/smarthttp/upload_pack_test.go @@ -21,12 +21,15 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" + "gitlab.com/gitlab-org/gitaly/v16/internal/limiter" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -277,6 +280,7 @@ func testServerPostUploadPackWithSidechannelUsesPackObjectsHook(t *testing.T, ct func testServerPostUploadPackUsesPackObjectsHook(t *testing.T, ctx context.Context, makeRequest requestMaker, opts ...testcfg.Option) { cfg := testcfg.Build(t, append(opts, testcfg.WithPackObjectsCacheEnabled())...) 
+ cfg.BinDir = testhelper.TempDir(t) outputPath := filepath.Join(cfg.BinDir, "output") //nolint:gitaly-linters @@ -379,6 +383,7 @@ func TestServer_PostUploadPackWithBundleURI(t *testing.T) { ctx := testhelper.Context(t) ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.BundleURI, true) + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.AutogenerateBundlesForBundleURI, false) tempDir := testhelper.TempDir(t) keyFile, err := os.Create(filepath.Join(tempDir, "secret.key")) @@ -499,6 +504,208 @@ func TestServer_PostUploadPackWithBundleURI(t *testing.T) { } } +type testInProgressTracker struct { + thresholdReached bool +} + +func (t *testInProgressTracker) GetInProgress(key string) uint { + if t.thresholdReached { + return concurrentUploadPackThreshold + 1 + } + + return 0 +} + +func (t *testInProgressTracker) IncrementInProgress(key string) {} + +func (t *testInProgressTracker) DecrementInProgress(key string) {} + +type blockingLimiter struct { + ch chan struct{} + lastBundleGeneratedPath string +} + +func (l *blockingLimiter) Limit(ctx context.Context, lockKey string, f limiter.LimitedFunc) (interface{}, error) { + res, err := f() + l.lastBundleGeneratedPath = res.(string) + l.ch <- struct{}{} + return res, err +} + +func TestServer_PostUploadPackAutogenerateBundles(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.AutogenerateBundlesForBundleURI, true) + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.BundleURI, true) + + testCases := []struct { + desc string + sinkDir string + setup func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *testInProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) + expectBundleGenerated bool + verifyBundle func(*testing.T, config.Cfg, string, git.ObjectID) + }{ + { + desc: "autogeneration successful", + setup: func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *testInProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + + tracker.thresholdReached = true + }, + expectBundleGenerated: true, + verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) { + tempDir := t.TempDir() + objectFormat := gittest.DefaultObjectHash.Format + gittest.Exec(t, cfg, "init", "--object-format="+objectFormat, tempDir) + gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath) + // A new bundle is expected to be created containing the new commit + gittest.RequireObjectExists(t, cfg, tempDir, commit) + }, + }, + { + desc: "bundle already exists", + setup: func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *testInProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + tracker.thresholdReached = true + + repo := localrepo.NewTestRepo(t, cfg, repoProto) + require.NoError(t, sink.Generate(ctx, repo)) + }, + expectBundleGenerated: false, + verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) { + tempDir := t.TempDir() + objectFormat := gittest.DefaultObjectHash.Format + gittest.Exec(t, 
cfg, "init", "--object-format="+objectFormat, tempDir) + gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath) + // No new bundle is expected to be created since one already existed. + gittest.RequireObjectNotExists(t, cfg, tempDir, commit) + }, + }, + { + desc: "no concurrent upload packs in flight", + setup: func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *testInProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) { + tracker.thresholdReached = false + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + }, + expectBundleGenerated: false, + verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) { + tempDir := t.TempDir() + gittest.Exec(t, cfg, "init", tempDir) + gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath) + // No new bundle is expected to have been created because there are no + // inflight upload pack calls. + gittest.RequireObjectNotExists(t, cfg, tempDir, commit) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + tracker := &testInProgressTracker{} + + sinkDir := t.TempDir() + sink, err := bundleuri.NewSink(ctx, "file://"+sinkDir) + require.NoError(t, err) + + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + + cfg.BundleURI.AutoGeneration.Enabled = true + + gitCmdFactory := gittest.NewCommandFactory(t, cfg) + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + + blockingCh := make(chan struct{}) + limiter := &blockingLimiter{ch: blockingCh} + + server := startSmartHTTPServerWithOptions(t, cfg, nil, []testserver.GitalyServerOpt{ + testserver.WithBundleGenerationManager(bundleuri.NewBundleGenerationManager(sink, limiter)), + testserver.WithBundleURISink(sink), + testserver.WithLogger(logger), + testserver.WithInProgressTracker(tracker), + testserver.WithTransactionRegistry(storagemgr.NewTransactionRegistry()), + testserver.WithGitCommandFactory(gitCmdFactory), + }) + + cfg.SocketPath = server.Address() + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg) + oldCommit := gittest.WriteCommit(t, cfg, repoPath) + newCommit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("master"), gittest.WithParents(oldCommit)) + + if tc.setup != nil { + tc.setup(t, ctx, cfg, sink, tracker, repoProto, repoPath) + } + + commitInUpdatedBundle := gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "CHANGELOG", Content: "nothing changed"}), + gittest.WithBranch("main")) + + // UploadPack request is a "want" packet line followed by a packet flush, then many "have" packets followed by a packet flush. 
+ // This is explained a bit in https://git-scm.com/book/en/v2/Git-Internals-Transfer-Protocols#_downloading_data + requestBuffer := &bytes.Buffer{} + gittest.WritePktlineString(t, requestBuffer, fmt.Sprintf("want %s %s\n", newCommit, clientCapabilities)) + gittest.WritePktlineFlush(t, requestBuffer) + gittest.WritePktlineString(t, requestBuffer, fmt.Sprintf("have %s\n", oldCommit)) + gittest.WritePktlineFlush(t, requestBuffer) + + req := &gitalypb.PostUploadPackWithSidechannelRequest{Repository: repoProto} + responseBuffer, err := makePostUploadPackWithSidechannelRequest(t, ctx, cfg.SocketPath, cfg.Auth.Token, req, requestBuffer) + require.NoError(t, err) + + pack, _, _ := extractPackDataFromResponse(t, responseBuffer) + require.NotEmpty(t, pack, "Expected to find a pack file in response, found none") + + if tc.expectBundleGenerated { + <-blockingCh + tc.verifyBundle(t, cfg, filepath.Join(sinkDir, limiter.lastBundleGeneratedPath), commitInUpdatedBundle) + } else { + require.Empty(t, limiter.lastBundleGeneratedPath) + } + }) + } +} + func testServerPostUploadPackWithSideChannelValidation(t *testing.T, ctx context.Context, makeRequest requestMaker, opts ...testcfg.Option) { cfg := testcfg.Build(t, opts...) serverSocketPath := runSmartHTTPServer(t, cfg) diff --git a/internal/gitaly/service/testhelper_test.go b/internal/gitaly/service/testhelper_test.go new file mode 100644 index 0000000000000000000000000000000000000000..43ac8d85bfd2896d10a3903f2c03d42044eac1db --- /dev/null +++ b/internal/gitaly/service/testhelper_test.go @@ -0,0 +1,11 @@ +package service_test + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} diff --git a/internal/grpc/middleware/limithandler/middleware_test.go b/internal/grpc/middleware/limithandler/middleware_test.go index 99eff32a0bc0261d099f50c3364f6c3995eed9b2..d651798bab29acf0f11219bfda6ce6174352e11b 100644 --- a/internal/grpc/middleware/limithandler/middleware_test.go +++ b/internal/grpc/middleware/limithandler/middleware_test.go @@ -38,7 +38,7 @@ func TestWithConcurrencyLimiters(t *testing.T) { t.Parallel() testhelper.NewFeatureSets(featureflag.UseResizableSemaphoreInConcurrencyLimiter, featureflag.UseResizableSemaphoreLifoStrategy).Run(t, func(t *testing.T, ctx context.Context) { cfg := config.Cfg{ - Concurrency: []config.Concurrency{ + Concurrency: []config.RPCConcurrency{ { RPC: "/grpc.testing.TestService/UnaryCall", MaxPerRepo: 1, @@ -48,11 +48,13 @@ func TestWithConcurrencyLimiters(t *testing.T) { MaxPerRepo: 99, }, { - RPC: "/grpc.testing.TestService/AnotherUnaryCall", - Adaptive: true, - MinLimit: 5, - InitialLimit: 10, - MaxLimit: 15, + RPC: "/grpc.testing.TestService/AnotherUnaryCall", + Concurrency: config.Concurrency{ + Adaptive: true, + MinLimit: 5, + InitialLimit: 10, + MaxLimit: 15, + }, }, }, } @@ -87,7 +89,7 @@ func TestUnaryLimitHandler(t *testing.T) { } cfg := config.Cfg{ - Concurrency: []config.Concurrency{ + Concurrency: []config.RPCConcurrency{ {RPC: "/grpc.testing.TestService/UnaryCall", MaxPerRepo: 2}, }, } @@ -167,8 +169,14 @@ func testUnaryLimitHandlerQueueStrategy(b *testing.B, ctx context.Context, numRe } cfg := config.Cfg{ - Concurrency: []config.Concurrency{ - {RPC: "/grpc.testing.TestService/UnaryCall", MaxPerRepo: 50, MaxQueueSize: 100}, + Concurrency: []config.RPCConcurrency{ + { + RPC: "/grpc.testing.TestService/UnaryCall", + MaxPerRepo: 50, + Concurrency: config.Concurrency{ + MaxQueueSize: 100, + }, + }, }, } @@ -214,25 +222,27 @@ func 
TestUnaryLimitHandler_queueing(t *testing.T) { testhelper.NewFeatureSets(featureflag.UseResizableSemaphoreInConcurrencyLimiter, featureflag.UseResizableSemaphoreLifoStrategy).Run(t, func(t *testing.T, ctx context.Context) { t.Run("simple timeout", func(t *testing.T) { cfg := config.Cfg{ - Concurrency: []config.Concurrency{ + Concurrency: []config.RPCConcurrency{ { - RPC: "/grpc.testing.TestService/UnaryCall", - MaxPerRepo: 1, - MaxQueueSize: 1, - // This test setups two requests: - // - The first one is eligible. It enters the handler and blocks the queue. - // - The second request is blocked until timeout. - // Both of them shares this timeout. Internally, the limiter creates a context - // deadline to reject timed out requests. If it's set too low, there's a tiny - // possibility that the context reaches the deadline when the limiter checks the - // request. Thus, setting a reasonable timeout here and adding some retry - // attempts below make the test stable. - // Another approach is to implement a hooking mechanism that allows us to - // override context deadline setup. However, that approach exposes the internal - // implementation of the limiter. It also adds unnecessarily logics. - // Congiuring the timeout is more straight-forward and close to the expected - // behavior. - MaxQueueWait: duration.Duration(100 * time.Millisecond), + RPC: "/grpc.testing.TestService/UnaryCall", + MaxPerRepo: 1, + Concurrency: config.Concurrency{ + MaxQueueSize: 1, + // This test setups two requests: + // - The first one is eligible. It enters the handler and blocks the queue. + // - The second request is blocked until timeout. + // Both of them shares this timeout. Internally, the limiter creates a context + // deadline to reject timed out requests. If it's set too low, there's a tiny + // possibility that the context reaches the deadline when the limiter checks the + // request. Thus, setting a reasonable timeout here and adding some retry + // attempts below make the test stable. + // Another approach is to implement a hooking mechanism that allows us to + // override context deadline setup. However, that approach exposes the internal + // implementation of the limiter. It also adds unnecessarily logics. + // Congiuring the timeout is more straight-forward and close to the expected + // behavior. + MaxQueueWait: duration.Duration(100 * time.Millisecond), + }, }, }, } @@ -288,7 +298,7 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { t.Run("unlimited queueing", func(t *testing.T) { cfg := config.Cfg{ - Concurrency: []config.Concurrency{ + Concurrency: []config.RPCConcurrency{ // Due to a bug queueing wait times used to leak into subsequent // concurrency configuration in case they didn't explicitly set up // the queueing wait time. We thus set up two limits here: one dummy @@ -296,9 +306,11 @@ func TestUnaryLimitHandler_queueing(t *testing.T) { // that has no wait limit. We of course expect that the actual // config should not have any maximum queueing time. 
{ - RPC: "dummy", - MaxPerRepo: 1, - MaxQueueWait: duration.Duration(1 * time.Nanosecond), + RPC: "dummy", + MaxPerRepo: 1, + Concurrency: config.Concurrency{ + MaxQueueWait: duration.Duration(1 * time.Nanosecond), + }, }, { RPC: "/grpc.testing.TestService/UnaryCall", @@ -564,11 +576,13 @@ func TestStreamLimitHandler(t *testing.T) { maxQueueSize := 1 cfg := config.Cfg{ - Concurrency: []config.Concurrency{ + Concurrency: []config.RPCConcurrency{ { - RPC: tc.fullname, - MaxPerRepo: tc.maxConcurrency, - MaxQueueSize: maxQueueSize, + RPC: tc.fullname, + MaxPerRepo: tc.maxConcurrency, + Concurrency: config.Concurrency{ + MaxQueueSize: maxQueueSize, + }, }, }, } @@ -619,8 +633,14 @@ func TestStreamLimitHandler_error(t *testing.T) { s.blockCh = make(chan struct{}) cfg := config.Cfg{ - Concurrency: []config.Concurrency{ - {RPC: "/grpc.testing.TestService/FullDuplexCall", MaxPerRepo: 1, MaxQueueSize: 1}, + Concurrency: []config.RPCConcurrency{ + { + RPC: "/grpc.testing.TestService/FullDuplexCall", + MaxPerRepo: 1, + Concurrency: config.Concurrency{ + MaxQueueSize: 1, + }, + }, }, } @@ -755,8 +775,14 @@ func TestConcurrencyLimitHandlerMetrics(t *testing.T) { methodName := "/grpc.testing.TestService/UnaryCall" cfg := config.Cfg{ - Concurrency: []config.Concurrency{ - {RPC: methodName, MaxPerRepo: 1, MaxQueueSize: 1}, + Concurrency: []config.RPCConcurrency{ + { + RPC: methodName, + MaxPerRepo: 1, + Concurrency: config.Concurrency{ + MaxQueueSize: 1, + }, + }, }, } diff --git a/internal/limiter/concurrency_limiter.go b/internal/limiter/concurrency_limiter.go index 7f23eb3ad2527c70bbb3b23c6178d76da26ec966..409a88db76050ede978d7914dd192ca0cc782080 100644 --- a/internal/limiter/concurrency_limiter.go +++ b/internal/limiter/concurrency_limiter.go @@ -75,6 +75,9 @@ const ( // TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request // information (RemoteIP/Repository/User) as the limiting key. TypePackObjects = "pack-objects" + // TypeBundleGeneration is a dedicated concurrency limiter for + // bundle-generation. + TypeBundleGeneration = "bundle-generation" ) // ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the diff --git a/internal/limiter/monitor.go b/internal/limiter/monitor.go index 9230525d86545f20a52a0fb8dc61c15e803da56c..cde32d0fd645d23f8ea0b9ef233bb19d147d74c9 100644 --- a/internal/limiter/monitor.go +++ b/internal/limiter/monitor.go @@ -194,3 +194,46 @@ func NewPackObjectsConcurrencyMonitor(latencyBuckets []float64) *PromMonitor { requestsDroppedVec, ) } + +// NewBundleGenerationConcurrencyMonitor returns a concurrency monitor for use +// with limiting bundle generation processes.
+func NewBundleGenerationConcurrencyMonitor(latencyBuckets []float64) *PromMonitor { + acquiringSecondsVec := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "gitaly_bundle_generation_acquiring_seconds", + Help: "Histogram of time calls are rate limited (in seconds)", + Buckets: latencyBuckets, + }, + nil, + ) + + inProgressVec := prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "gitaly_bundle_generation_in_progress", + Help: "Gauge of number of concurrent in-progress calls", + }, + ) + + queuedVec := prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "gitaly_bundle_generation_queued", + Help: "Gauge of number of queued calls", + }, + ) + + requestsDroppedVec := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gitaly_bundle_generation_dropped_total", + Help: "Number of requests dropped from the queue", + }, + []string{"reason"}, + ) + + return newPromMonitor( + TypeBundleGeneration, + queuedVec, + inProgressVec, + acquiringSecondsVec, + requestsDroppedVec, + ) +} diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 974eac082f4df13955ecde35f496d9ee570c95fd..88d5a47635b5c52439eb9f8e563bca5b7df4d151 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -290,6 +290,8 @@ type gitalyServerDeps struct { signingKey string transactionRegistry *storagemgr.TransactionRegistry procReceiveRegistry *hook.ProcReceiveRegistry + inProgressTracker service.InProgressTracker + bundleGenerationMgr *bundleuri.BundleGenerationManager } func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies { @@ -331,6 +333,10 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * gsd.procReceiveRegistry = hook.NewProcReceiveRegistry() } + if gsd.inProgressTracker == nil { + gsd.inProgressTracker = service.NewInProgressTracker() + } + var partitionManager *storagemgr.PartitionManager if testhelper.IsWALEnabled() { dbMgr, err := keyvalue.NewDBManager( @@ -438,6 +444,8 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * BackupLocator: gsd.backupLocator, BundleURISink: gsd.bundleURISink, ProcReceiveRegistry: gsd.procReceiveRegistry, + InProgressTracker: gsd.inProgressTracker, + BundleGenerationMgr: gsd.bundleGenerationMgr, } } @@ -567,6 +575,22 @@ func WithBundleURISink(sink *bundleuri.Sink) GitalyServerOpt { } } +// WithInProgressTracker sets the InProgressTracker that will be used for Gitaly services +func WithInProgressTracker(tracker service.InProgressTracker) GitalyServerOpt { + return func(deps gitalyServerDeps) gitalyServerDeps { + deps.inProgressTracker = tracker + return deps + } +} + +// WithBundleGenerationManager sets the BundleGenerationManager that will be used for Gitaly services +func WithBundleGenerationManager(mgr *bundleuri.BundleGenerationManager) GitalyServerOpt { + return func(deps gitalyServerDeps) gitalyServerDeps { + deps.bundleGenerationMgr = mgr + return deps + } +} + // WithSigningKey sets the signing key path that will be used for Gitaly // services. func WithSigningKey(signingKey string) GitalyServerOpt {
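Taken together, the pieces introduced above compose as follows. The sketch below is illustrative only and not part of the patch: it assumes a *localrepo.Repo has already been constructed elsewhere, uses placeholder limits, latency buckets, and a placeholder file:// sink URL, and omits the Prometheus registration that serve.go performs.

package example // hypothetical example package, not part of the patch

import (
	"context"
	"time"

	"gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri"
	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
	"gitlab.com/gitlab-org/gitaly/v16/internal/limiter"
)

// generateBundleFor wires a BundleGenerationManager by hand and generates a single
// bundle for repo. In Gitaly proper this wiring lives in serve.go.
func generateBundleFor(ctx context.Context, repo *localrepo.Repo) error {
	sink, err := bundleuri.NewSink(ctx, "file:///var/opt/gitaly/bundle-uri") // placeholder URL
	if err != nil {
		return err
	}

	// Static global limit of two concurrent generations; adaptive settings could be used instead.
	limit := limiter.NewAdaptiveLimit("bundleGeneration", limiter.AdaptiveSetting{Initial: 2})
	monitor := limiter.NewBundleGenerationConcurrencyMonitor([]float64{0.1, 1, 10}) // placeholder buckets
	concurrencyLimiter := limiter.NewConcurrencyLimiter(limit, 10, time.Minute, monitor)

	mgr := bundleuri.NewBundleGenerationManager(sink, concurrencyLimiter)
	defer mgr.StopAll()

	// Generate enforces both the per-repository "one at a time" rule and the global
	// concurrency limit before the sink writes the bundle to object storage.
	return mgr.Generate(ctx, repo)
}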