From e40fd0f928f966d6b6930bbec3870138f112213b Mon Sep 17 00:00:00 2001 From: John Cai Date: Fri, 21 Jun 2024 12:02:15 -0400 Subject: [PATCH 1/5] service: Add inflight tracker An inflight tracker can be used to track anything that's inflight. This will be used to track how many pack-objects processes are inflight for a given repository in order to inform whether or not a bundle should be created for a given repository. --- .../gitaly/service/in_progress_tracker.go | 42 ++++++++++++ .../service/in_progress_tracker_test.go | 67 +++++++++++++++++++ internal/gitaly/service/testhelper_test.go | 11 +++ 3 files changed, 120 insertions(+) create mode 100644 internal/gitaly/service/in_progress_tracker.go create mode 100644 internal/gitaly/service/in_progress_tracker_test.go create mode 100644 internal/gitaly/service/testhelper_test.go diff --git a/internal/gitaly/service/in_progress_tracker.go b/internal/gitaly/service/in_progress_tracker.go new file mode 100644 index 0000000000..bd1aa7052e --- /dev/null +++ b/internal/gitaly/service/in_progress_tracker.go @@ -0,0 +1,42 @@ +package service + +import ( + "sync" +) + +// InProgressTracker can be used to keep track of processes that are in flight +type InProgressTracker struct { + inProgress map[string]int + l sync.RWMutex +} + +// NewInProgressTracker instantiates a new InProgressTracker. +func NewInProgressTracker() *InProgressTracker { + return &InProgressTracker{ + inProgress: make(map[string]int), + } +} + +// GetInProgress gets the number of inflight processes for a given key. +func (p *InProgressTracker) GetInProgress(key string) int { + p.l.RLock() + defer p.l.RUnlock() + + return p.inProgress[key] +} + +// IncrementInProgress increments the number of inflight processes for a given key. +func (p *InProgressTracker) IncrementInProgress(key string) { + p.l.Lock() + defer p.l.Unlock() + + p.inProgress[key]++ +} + +// DecrementInProgress decrements the number of inflight processes for a given key. 
+func (p *InProgressTracker) DecrementInProgress(key string) { + p.l.Lock() + defer p.l.Unlock() + + p.inProgress[key]-- +} diff --git a/internal/gitaly/service/in_progress_tracker_test.go b/internal/gitaly/service/in_progress_tracker_test.go new file mode 100644 index 0000000000..99178863c6 --- /dev/null +++ b/internal/gitaly/service/in_progress_tracker_test.go @@ -0,0 +1,67 @@ +package service_test + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" +) + +func TestInProgressTracker(t *testing.T) { + key := "key1" + + testCases := []struct { + desc string + expectedInProgress int + actions func(*service.InProgressTracker) + }{ + { + desc: "one in flight", + expectedInProgress: 1, + actions: func(t *service.InProgressTracker) { + t.IncrementInProgress(key) + t.IncrementInProgress(key) + t.DecrementInProgress(key) + }, + }, + { + desc: "two in flight with concurrent writes", + expectedInProgress: 2, + actions: func(t *service.InProgressTracker) { + var wg sync.WaitGroup + + wg.Add(4) + go func() { + t.IncrementInProgress(key) + wg.Done() + }() + go func() { + t.IncrementInProgress(key) + wg.Done() + }() + + go func() { + t.IncrementInProgress(key) + wg.Done() + }() + + go func() { + t.DecrementInProgress(key) + wg.Done() + }() + + wg.Wait() + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + tracker := service.NewInProgressTracker() + + tc.actions(tracker) + require.Equal(t, tc.expectedInProgress, tracker.GetInProgress(key)) + }) + } +} diff --git a/internal/gitaly/service/testhelper_test.go b/internal/gitaly/service/testhelper_test.go new file mode 100644 index 0000000000..43ac8d85bf --- /dev/null +++ b/internal/gitaly/service/testhelper_test.go @@ -0,0 +1,11 @@ +package service_test + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} -- GitLab From 
f31b811277d59182919afb333c3a42841c0fdc92 Mon Sep 17 00:00:00 2001 From: John Cai Date: Fri, 21 Jun 2024 16:15:02 -0400 Subject: [PATCH 2/5] bundleuri: Add a function to generate bundles in the background We will need the ability to generate bundles in the background if one does not exist for a repository during concurrent clones. To do so, introduce a function on the Sink to call Generate() asynchronously. --- internal/bundleuri/git_config.go | 5 +- internal/bundleuri/git_config_test.go | 7 +- internal/bundleuri/sink.go | 92 +++++++++++++-- internal/bundleuri/sink_test.go | 164 +++++++++++++++++++++++++- 4 files changed, 255 insertions(+), 13 deletions(-) diff --git a/internal/bundleuri/git_config.go b/internal/bundleuri/git_config.go index 69cc21b49b..f18fd4bda1 100644 --- a/internal/bundleuri/git_config.go +++ b/internal/bundleuri/git_config.go @@ -11,6 +11,9 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/log" ) +// ErrSinkMissing indicates a sink is missing +var ErrSinkMissing = errors.New("bundle-URI sink missing") + // CapabilitiesGitConfig returns a slice of git.ConfigPairs that can be injected // into the Git config to make it aware the bundle-URI capabilities are // supported. 
@@ -42,7 +45,7 @@ func UploadPackGitConfig( } if sink == nil { - return CapabilitiesGitConfig(ctx), errors.New("bundle-URI sink missing") + return CapabilitiesGitConfig(ctx), ErrSinkMissing } uri, err := sink.SignedURL(ctx, repo) diff --git a/internal/bundleuri/git_config_test.go b/internal/bundleuri/git_config_test.go index 0a5da2bbc2..d81224bf4f 100644 --- a/internal/bundleuri/git_config_test.go +++ b/internal/bundleuri/git_config_test.go @@ -14,7 +14,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/git" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" - "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" ) @@ -60,7 +59,7 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { return setupData{} }, expectedConfig: nil, - expectedErr: errors.New("bundle-URI sink missing"), + expectedErr: ErrSinkMissing, }, { desc: "no bundle found", @@ -74,7 +73,7 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { } }, expectedConfig: nil, - expectedErr: structerr.NewNotFound("no bundle available"), + expectedErr: ErrBundleNotFound, }, { desc: "not signed", @@ -144,7 +143,7 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { actual, err := UploadPackGitConfig(ctx, sink, repoProto) if featureflag.BundleURI.IsEnabled(ctx) { - require.Equal(t, tc.expectedErr, err) + require.True(t, errors.Is(err, tc.expectedErr) || strings.Contains(err.Error(), tc.expectedErr.Error())) if tc.expectedConfig != nil { require.Equal(t, len(tc.expectedConfig), len(actual)) diff --git a/internal/bundleuri/sink.go b/internal/bundleuri/sink.go index a2d1ac8d00..e6beabe5e7 100644 --- a/internal/bundleuri/sink.go +++ b/internal/bundleuri/sink.go @@ -7,6 +7,7 @@ import ( "io" "path/filepath" "strings" + "sync" "time" "gitlab.com/gitlab-org/gitaly/v16/internal/backup" @@ 
-29,21 +30,61 @@ const ( defaultExpiry = 10 * time.Minute ) +var ( + // ErrBundleGenerationInProgress indicates that an existing bundle generation + // is already in progress. + ErrBundleGenerationInProgress = errors.New("bundle generation in progress") + // ErrBundleNotFound indicates that no bundle could be found for a given repository. + ErrBundleNotFound = errors.New("no bundle found") +) + // Sink is a wrapper around the storage bucket used for accessing/writing // bundleuri bundles. type Sink struct { - bucket *blob.Bucket + bucket *blob.Bucket + bundleCreationMutex map[string]*sync.Mutex + + config sinkConfig +} + +type sinkConfig struct { + notifyBundleGeneration func(string, error) +} + +// SinkOption can be passed into NewSink to pass in options when creating a new sink. +type SinkOption func(s *sinkConfig) + +// WithBundleGenerationNotifier sets a notifier function that gets called when GenerateOneAtATime +// finishes. GenerateOneAtATime will be called in a separate background goroutine, so this function +// is an entrypoint to pass in logic to be called after the bundle has been generated. +func WithBundleGenerationNotifier(f func(string, error)) SinkOption { + return func(s *sinkConfig) { + s.notifyBundleGeneration = f + } } // NewSink creates a Sink from the given parameters. 
-func NewSink(ctx context.Context, uri string) (*Sink, error) { +func NewSink(ctx context.Context, uri string, options ...SinkOption) (*Sink, error) { bucket, err := blob.OpenBucket(ctx, uri) if err != nil { return nil, fmt.Errorf("open bucket: %w", err) } - return &Sink{ - bucket: bucket, - }, nil + + s := &Sink{ + bucket: bucket, + bundleCreationMutex: make(map[string]*sync.Mutex), + } + + var c sinkConfig + if len(options) > 0 { + for _, option := range options { + option(&c) + } + + s.config = c + } + + return s, nil } // relativePath returns a relative path of the bundle-URI bundle inside the @@ -73,6 +114,43 @@ func (s *Sink) getWriter(ctx context.Context, relativePath string) (io.WriteClos return writer, nil } +// GenerateOneAtATime generates a bundle for a repository, but only if there is not already +// one in flight. +func (s *Sink) GenerateOneAtATime(ctx context.Context, repo *localrepo.Repo) error { + bundlePath := s.relativePath(repo, defaultBundle) + + var m *sync.Mutex + var ok bool + + if m, ok = s.bundleCreationMutex[bundlePath]; !ok { + s.bundleCreationMutex[bundlePath] = &sync.Mutex{} + m = s.bundleCreationMutex[bundlePath] + } + + if m.TryLock() { + defer m.Unlock() + errChan := make(chan error) + + go func(ctx context.Context) { + select { + case errChan <- s.Generate(ctx, repo): + case <-ctx.Done(): + errChan <- ctx.Err() + } + }(ctx) + + err := <-errChan + + if s.config.notifyBundleGeneration != nil { + s.config.notifyBundleGeneration(bundlePath, err) + } + } else { + return fmt.Errorf("%w: %s", ErrBundleGenerationInProgress, bundlePath) + } + + return nil +} + // Generate creates a bundle for bundle-URI use into the bucket. 
func (s Sink) Generate(ctx context.Context, repo *localrepo.Repo) (returnErr error) { ref, err := repo.HeadReference(ctx) @@ -132,9 +210,9 @@ func (s Sink) SignedURL(ctx context.Context, repo storage.Repository) (string, e if exists, err := s.bucket.Exists(ctx, relativePath); !exists { if err == nil { - return "", structerr.NewNotFound("no bundle available") + return "", ErrBundleNotFound } - return "", structerr.NewNotFound("no bundle available: %w", err) + return "", fmt.Errorf("%w: %w", ErrBundleNotFound, err) } uri, err := s.bucket.SignedURL(ctx, relativePath, &blob.SignedURLOptions{ diff --git a/internal/bundleuri/sink_test.go b/internal/bundleuri/sink_test.go index 7ab6798bbe..50746cc81b 100644 --- a/internal/bundleuri/sink_test.go +++ b/internal/bundleuri/sink_test.go @@ -1,6 +1,7 @@ package bundleuri import ( + "context" "fmt" "os" "path/filepath" @@ -105,7 +106,7 @@ func TestSink_SignedURL(t *testing.T) { { desc: "fails with missing bundle", setup: func(t *testing.T, sinkDir string, sink *Sink) {}, - expectedErr: structerr.NewNotFound("no bundle available"), + expectedErr: ErrBundleNotFound, }, } { tc := tc @@ -123,9 +124,170 @@ func TestSink_SignedURL(t *testing.T) { if tc.expectedErr == nil { require.NoError(t, err) require.Regexp(t, "http://example\\.com", uri) + } else { + require.ErrorIs(t, err, tc.expectedErr) + } + }) + } +} + +func TestSink_GenerateOneAtATime(t *testing.T) { + t.Parallel() + + cfg := testcfg.Build(t) + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + setup func(t *testing.T, repoPath string) + expectedErr error + }{ + { + desc: "creates bundle successfully", + setup: func(t *testing.T, repoPath string) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + }, + }, + { + desc: "fails with missing HEAD", + setup: func(t *testing.T, repoPath string) {}, + expectedErr: 
structerr.NewFailedPrecondition("ref %q does not exist: %w", "refs/heads/main", fmt.Errorf("create bundle: %w", localrepo.ErrEmptyBundle)), + }, + } { + tc := tc + + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + tc.setup(t, repoPath) + + doneChan := make(chan struct{}) + errChan := make(chan error) + sinkDir := t.TempDir() + sink, err := NewSink( + ctx, + "file://"+sinkDir, + WithBundleGenerationNotifier( + func(_ string, err error) { + close(doneChan) + errChan <- err + }, + ), + ) + require.NoError(t, err) + + go func() { + err := sink.GenerateOneAtATime(ctx, repo) + require.NoError(t, err) + }() + + <-doneChan + err = <-errChan + + if tc.expectedErr == nil { + require.NoError(t, err) + require.FileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) } else { require.Equal(t, err, tc.expectedErr, err) } }) } } + +func TestSink_GenerateOneAtATimeConcurrent(t *testing.T) { + t.Parallel() + + cfg := testcfg.Build(t) + ctx := testhelper.Context(t) + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + + doneChan, startNotifierCh := make(chan struct{}), make(chan struct{}) + errChan := make(chan error) + + sinkDir := t.TempDir() + sink, err := NewSink( + ctx, + "file://"+sinkDir, + WithBundleGenerationNotifier( + func(_ string, err error) { + close(startNotifierCh) + close(doneChan) + errChan <- err + }, + ), + ) + require.NoError(t, err) + + go func() { + err := sink.GenerateOneAtATime(ctx, repo) + require.NoError(t, err) + }() + + <-startNotifierCh + + err = 
sink.GenerateOneAtATime(ctx, repo) + require.ErrorIs(t, err, ErrBundleGenerationInProgress) + + <-doneChan + err = <-errChan + + require.NoError(t, err) + require.FileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) +} + +func TestSink_GenerateOneAtATime_ContextCancelled(t *testing.T) { + t.Parallel() + + cfg := testcfg.Build(t) + ctx := testhelper.Context(t) + ctx, cancel := context.WithCancel(ctx) + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + + errChan := make(chan error) + + sinkDir := t.TempDir() + sink, err := NewSink( + ctx, + "file://"+sinkDir, + WithBundleGenerationNotifier( + func(_ string, err error) { + errChan <- err + }, + ), + ) + require.NoError(t, err) + + cancel() + + go func() { + require.NoError(t, sink.GenerateOneAtATime(ctx, repo)) + }() + + err = <-errChan + + require.ErrorIs(t, err, context.Canceled) + require.NoFileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) +} -- GitLab From a2f0488c4dc46508150b1f23383164fb2d3ebdc0 Mon Sep 17 00:00:00 2001 From: John Cai Date: Thu, 4 Jul 2024 21:00:21 -0700 Subject: [PATCH 3/5] testserver: Allow passing in partition manager There are tests that need a partition manager to be present. For this reason, add a server option that allows the passing in of a PartitionManager. 
--- internal/testhelper/testserver/gitaly.go | 56 ++++++++++++++---------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index d3dc6aca66..acd1e15160 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -290,6 +290,7 @@ type gitalyServerDeps struct { signingKey string transactionRegistry *storagemgr.TransactionRegistry procReceiveRegistry *hook.ProcReceiveRegistry + partitionManager *storagemgr.PartitionManager } func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies { @@ -333,27 +334,30 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * var partitionManager *storagemgr.PartitionManager if testhelper.IsWALEnabled() { - dbMgr, err := keyvalue.NewDBManager( - cfg.Storages, - keyvalue.NewBadgerStore, - helper.NewNullTickerFactory(), - gsd.logger, - ) - require.NoError(tb, err) - tb.Cleanup(dbMgr.Close) - - partitionManager, err = storagemgr.NewPartitionManager( - testhelper.Context(tb), - cfg.Storages, - gsd.gitCmdFactory, - localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), - gsd.logger, - dbMgr, - cfg.Prometheus, - nil, - ) - require.NoError(tb, err) - tb.Cleanup(partitionManager.Close) + if gsd.partitionManager == nil { + dbMgr, err := keyvalue.NewDBManager( + cfg.Storages, + keyvalue.NewBadgerStore, + helper.NewNullTickerFactory(), + gsd.logger, + ) + require.NoError(tb, err) + tb.Cleanup(dbMgr.Close) + + partitionManager, err = storagemgr.NewPartitionManager( + testhelper.Context(tb), + cfg.Storages, + gsd.gitCmdFactory, + localrepo.NewFactory(gsd.logger, gsd.locator, gsd.gitCmdFactory, gsd.catfileCache), + gsd.logger, + dbMgr, + cfg.Prometheus, + nil, + ) + require.NoError(tb, err) + tb.Cleanup(partitionManager.Close) + gsd.partitionManager = partitionManager + } } if gsd.hookMgr == nil { @@ -433,7 
+437,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * UpdaterWithHooks: gsd.updaterWithHooks, HousekeepingManager: gsd.housekeepingManager, TransactionRegistry: gsd.transactionRegistry, - PartitionManager: partitionManager, + PartitionManager: gsd.partitionManager, BackupSink: gsd.backupSink, BackupLocator: gsd.backupLocator, BundleURISink: gsd.bundleURISink, @@ -591,3 +595,11 @@ func WithProcReceiveRegistry(registry *hook.ProcReceiveRegistry) GitalyServerOpt return deps } } + +// WithPartitionManager sets the partition manager that will be used for Gitaly services. +func WithPartitionManager(partitionMgr *storagemgr.PartitionManager) GitalyServerOpt { + return func(deps gitalyServerDeps) gitalyServerDeps { + deps.partitionManager = partitionMgr + return deps + } +} -- GitLab From 187325a55659568840ca2a7b59e5645c58f71c9c Mon Sep 17 00:00:00 2001 From: John Cai Date: Wed, 17 Jul 2024 20:46:25 -0400 Subject: [PATCH 4/5] testhelper: Allow passing in InProgressTracker Allow passing in an InProgressTracker to the test server --- internal/testhelper/testserver/gitaly.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index acd1e15160..56a9b1c069 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -291,6 +291,7 @@ type gitalyServerDeps struct { transactionRegistry *storagemgr.TransactionRegistry procReceiveRegistry *hook.ProcReceiveRegistry partitionManager *storagemgr.PartitionManager + inProgressTracker *service.InProgressTracker } func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) *service.Dependencies { @@ -332,6 +333,10 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * gsd.procReceiveRegistry = hook.NewProcReceiveRegistry() } + if gsd.inProgressTracker == nil { + gsd.inProgressTracker =
service.NewInProgressTracker() + } + var partitionManager *storagemgr.PartitionManager if testhelper.IsWALEnabled() { if gsd.partitionManager == nil { @@ -442,6 +447,7 @@ func (gsd *gitalyServerDeps) createDependencies(tb testing.TB, cfg config.Cfg) * BackupLocator: gsd.backupLocator, BundleURISink: gsd.bundleURISink, ProcReceiveRegistry: gsd.procReceiveRegistry, + InProgressTracker: gsd.inProgressTracker, } } @@ -571,6 +577,14 @@ func WithBundleURISink(sink *bundleuri.Sink) GitalyServerOpt { } } +// WithInProgressTracker sets the InProgressTracker that will be used for Gitaly services. +func WithInProgressTracker(tracker *service.InProgressTracker) GitalyServerOpt { + return func(deps gitalyServerDeps) gitalyServerDeps { + deps.inProgressTracker = tracker + return deps + } +} + // WithSigningKey sets the signing key path that will be used for Gitaly // services. func WithSigningKey(signingKey string) GitalyServerOpt { -- GitLab From 42a1cbf39ec763b08c7e6a066c28db5ce703df38 Mon Sep 17 00:00:00 2001 From: John Cai Date: Wed, 17 Jul 2024 20:48:26 -0400 Subject: [PATCH 5/5] smarthttp: Generate bundle in background clone, check if 5 or more clones are already happening for this repository. If so, and a bundle does not exist, then we want to generate one because this is likely a busy repository.
--- internal/cli/gitaly/serve.go | 13 +- .../featureflag/ff_autogenerate_bundles.go | 9 + internal/gitaly/config/config.go | 2 + internal/gitaly/service/dependencies.go | 6 + internal/gitaly/service/smarthttp/server.go | 16 +- .../gitaly/service/smarthttp/upload_pack.go | 80 +++++- .../service/smarthttp/upload_pack_test.go | 232 ++++++++++++++++++ 7 files changed, 352 insertions(+), 6 deletions(-) create mode 100644 internal/featureflag/ff_autogenerate_bundles.go diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index c94361a4df..788c1cbd96 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -491,7 +491,17 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { var bundleURISink *bundleuri.Sink if cfg.BundleURI.GoCloudURL != "" { - bundleURISink, err = bundleuri.NewSink(ctx, cfg.BundleURI.GoCloudURL) + bundleURISink, err = bundleuri.NewSink( + ctx, + cfg.BundleURI.GoCloudURL, + bundleuri.WithBundleGenerationNotifier(func(bundlePath string, err error) { + if err != nil { + logger.WithField("bundle_path", bundlePath). + WithError(err). 
+ Warn("bundle generation failed") + } + }), + ) if err != nil { return fmt.Errorf("create bundle-URI sink: %w", err) } @@ -540,6 +550,7 @@ func run(appCtx *cli.Context, cfg config.Cfg, logger log.Logger) error { BackupSink: backupSink, BackupLocator: backupLocator, BundleURISink: bundleURISink, + InProgressTracker: service.NewInProgressTracker(), }) b.RegisterStarter(starter.New(c, srv, logger)) } diff --git a/internal/featureflag/ff_autogenerate_bundles.go b/internal/featureflag/ff_autogenerate_bundles.go new file mode 100644 index 0000000000..61692778f3 --- /dev/null +++ b/internal/featureflag/ff_autogenerate_bundles.go @@ -0,0 +1,9 @@ +package featureflag + +// AutogenerateBundlesForBundleURI enables automatic background generation of bundles for git's bundle URI feature. +var AutogenerateBundlesForBundleURI = NewFeatureFlag( + "autogenerate_bundles_for_bundleuri", + "v17.3.0", + "https://gitlab.com/gitlab-org/gitaly/-/issues/6204", + false, +) diff --git a/internal/gitaly/config/config.go b/internal/gitaly/config/config.go index 48392af76a..4fcad0c4b4 100644 --- a/internal/gitaly/config/config.go +++ b/internal/gitaly/config/config.go @@ -618,6 +618,8 @@ type BundleURIConfig struct { // GoCloudURL is the blob storage GoCloud URL that will be used to store // Git bundles for Bundle-URI use. GoCloudURL string `toml:"go_cloud_url,omitempty" json:"go_cloud_url,omitempty"` + // Autogeneration controls whether or not bundles for bundle uris are auto generated + Autogeneration bool `toml:"autogeneration,omitempty" json:"autogeneration"` } // Validate runs validation on all fields and returns any errors found.
diff --git a/internal/gitaly/service/dependencies.go b/internal/gitaly/service/dependencies.go index 4fc6f952f9..73f45e525b 100644 --- a/internal/gitaly/service/dependencies.go +++ b/internal/gitaly/service/dependencies.go @@ -48,6 +48,7 @@ type Dependencies struct { BackupLocator backup.Locator BundleURISink *bundleuri.Sink ProcReceiveRegistry *gitalyhook.ProcReceiveRegistry + InProgressTracker *InProgressTracker } // GetLogger returns the logger. @@ -164,3 +165,8 @@ func (dc *Dependencies) GetBundleURISink() *bundleuri.Sink { func (dc *Dependencies) GetProcReceiveRegistry() *gitalyhook.ProcReceiveRegistry { return dc.ProcReceiveRegistry } + +// GetInProgressTracker returns the InProgressTracker. +func (dc *Dependencies) GetInProgressTracker() *InProgressTracker { + return dc.InProgressTracker +} diff --git a/internal/gitaly/service/smarthttp/server.go b/internal/gitaly/service/smarthttp/server.go index 528deeb82e..ffd455aa27 100644 --- a/internal/gitaly/service/smarthttp/server.go +++ b/internal/gitaly/service/smarthttp/server.go @@ -34,6 +34,10 @@ type server struct { backupLocator backup.Locator backupSink backup.Sink bundleURISink *bundleuri.Sink + inflightTracker *service.InProgressTracker + generateBundles bool + partitionMgr *storagemgr.PartitionManager + transactionRegistry *storagemgr.TransactionRegistry } // NewServer creates a new instance of a grpc SmartHTTPServer @@ -52,10 +56,14 @@ func NewServer(deps *service.Dependencies, serverOpts ...ServerOpt) gitalypb.Sma prometheus.CounterOpts{}, []string{"git_negotiation_feature"}, ), - infoRefCache: newInfoRefCache(deps.GetLogger(), deps.GetDiskCache()), - backupLocator: deps.GetBackupLocator(), - backupSink: deps.GetBackupSink(), - bundleURISink: deps.GetBundleURISink(), + infoRefCache: newInfoRefCache(deps.GetLogger(), deps.GetDiskCache()), + backupLocator: deps.GetBackupLocator(), + backupSink: deps.GetBackupSink(), + bundleURISink: deps.GetBundleURISink(), + inflightTracker:
deps.GetInProgressTracker(), + generateBundles: deps.GetCfg().BundleURI.Autogeneration, + partitionMgr: deps.GetPartitionManager(), + transactionRegistry: deps.GetTransactionRegistry(), } for _, serverOpt := range serverOpts { diff --git a/internal/gitaly/service/smarthttp/upload_pack.go b/internal/gitaly/service/smarthttp/upload_pack.go index 05b5dedb7b..ee1173fb47 100644 --- a/internal/gitaly/service/smarthttp/upload_pack.go +++ b/internal/gitaly/service/smarthttp/upload_pack.go @@ -1,22 +1,35 @@ package smarthttp import ( + "bytes" "context" "crypto/sha1" "errors" "fmt" "io" + "time" + "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri" "gitlab.com/gitlab-org/gitaly/v16/internal/command" + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagectx" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" ) +const ( + concurrentUploadPackThreshold = 5 + bundleGenerationTimeout = 24 * time.Hour +) + func (s *server) PostUploadPackWithSidechannel(ctx context.Context, req *gitalypb.PostUploadPackWithSidechannelRequest) (*gitalypb.PostUploadPackWithSidechannelResponse, error) { repoPath, gitConfig, err := s.validateUploadPackRequest(ctx, req) if err != nil { @@ -117,21 +130,85 @@ func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPack gitConfig = append(gitConfig, bundleuri.CapabilitiesGitConfig(ctx)...) 
+ txID := storage.ExtractTransactionID(ctx) + + var originalRepo *gitalypb.Repository + + if txID != 0 { + currentTx, err := s.transactionRegistry.Get(txID) + if err != nil { + return nil, structerr.NewInternal("error getting transaction: %w", err) + } + originalRepo = currentTx.OriginalRepository(req.GetRepository()) + } else { + originalRepo = req.GetRepository() + } + + key := originalRepo.GetGlRepository() + uploadPackConfig, err := bundleuri.UploadPackGitConfig(ctx, s.bundleURISink, req.GetRepository()) if err != nil { - log.AddFields(ctx, log.Fields{"bundle_uri_error": err}) + if errors.Is(err, bundleuri.ErrBundleNotFound) && + featureflag.AutogenerateBundlesForBundleURI.IsEnabled(ctx) && + s.generateBundles && + s.inflightTracker.GetInProgress(key) > concurrentUploadPackThreshold { + + go func() { + ctx, cancel := context.WithTimeout(context.Background(), bundleGenerationTimeout) + defer cancel() + + tx, err := s.partitionMgr.Begin( + ctx, + originalRepo.GetStorageName(), + originalRepo.GetRelativePath(), + 0, + storagemgr.TransactionOptions{ + ReadOnly: true, + }, + ) + if err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("failed starting transaction") + } + + ctx = storagectx.ContextWithTransaction(ctx, tx) + + if err := s.bundleURISink.GenerateOneAtATime(ctx, localrepo.New( + s.logger, + s.locator, + s.gitCmdFactory, + s.catfileCache, + originalRepo)); err != nil { + ctxlogrus.Extract(ctx).WithError(err).Error("generate bundle") + if err := tx.Rollback(); err != nil && !errors.Is(err, storagemgr.ErrTransactionAlreadyCommitted) { + ctxlogrus.Extract(ctx).WithError(err).Error("failed rolling back transaction") + } + } + + if err := tx.Commit(ctx); err != nil && !errors.Is(err, storagemgr.ErrTransactionAlreadyCommitted) { + ctxlogrus.Extract(ctx).WithError(err).Error("committing transaction") + } + }() + } else if !errors.Is(err, bundleuri.ErrSinkMissing) { + log.AddFields(ctx, log.Fields{"bundle_uri_error": err}) + } } else { gitConfig = 
append(gitConfig, uploadPackConfig...) } + var stderr bytes.Buffer + commandOpts := []git.CmdOpt{ git.WithStdin(stdin), + git.WithStderr(&stderr), git.WithSetupStdout(), git.WithGitProtocol(s.logger, req), git.WithConfig(gitConfig...), git.WithPackObjectsHookEnv(req.GetRepository(), "http"), } + s.inflightTracker.IncrementInProgress(key) + defer s.inflightTracker.DecrementInProgress(key) + cmd, err := s.gitCmdFactory.New(ctx, req.GetRepository(), git.Command{ Name: "upload-pack", Flags: []git.Option{git.Flag{Name: "--stateless-rpc"}}, @@ -160,5 +237,6 @@ func (s *server) runUploadPack(ctx context.Context, req *gitalypb.PostUploadPack } s.logger.WithField("request_sha", fmt.Sprintf("%x", h.Sum(nil))).WithField("response_bytes", respBytes).InfoContext(ctx, "request details") + return nil, nil } diff --git a/internal/gitaly/service/smarthttp/upload_pack_test.go b/internal/gitaly/service/smarthttp/upload_pack_test.go index 944268aee5..8c91384c5f 100644 --- a/internal/gitaly/service/smarthttp/upload_pack_test.go +++ b/internal/gitaly/service/smarthttp/upload_pack_test.go @@ -21,12 +21,17 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/bundleuri" "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v16/internal/git" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/git/pktline" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel" + "gitlab.com/gitlab-org/gitaly/v16/internal/helper" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" 
"gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" @@ -380,6 +385,7 @@ func TestServer_PostUploadPackWithBundleURI(t *testing.T) { ctx := testhelper.Context(t) ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.BundleURI, true) + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.AutogenerateBundlesForBundleURI, false) tempDir := testhelper.TempDir(t) keyFile, err := os.Create(filepath.Join(tempDir, "secret.key")) @@ -500,6 +506,232 @@ func TestServer_PostUploadPackWithBundleURI(t *testing.T) { } } +func TestServer_PostUploadPackAutogenerateBundles(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.AutogenerateBundlesForBundleURI, true) + ctx = featureflag.ContextWithFeatureFlag(ctx, featureflag.BundleURI, true) + + tempDir := testhelper.TempDir(t) + keyFile, err := os.Create(filepath.Join(tempDir, "secret.key")) + require.NoError(t, err) + _, err = keyFile.WriteString("super-secret-key") + require.NoError(t, err) + require.NoError(t, keyFile.Close()) + + testCases := []struct { + desc string + sinkDir string + setup func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *service.InProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) + expectBundleGenerated bool + verifyBundle func(*testing.T, config.Cfg, string, git.ObjectID) + }{ + { + desc: "autogeneration successful", + setup: func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *service.InProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + + key := repoProto.GetGlRepository() + tracker.IncrementInProgress(key) + tracker.IncrementInProgress(key) + 
tracker.IncrementInProgress(key) + tracker.IncrementInProgress(key) + tracker.IncrementInProgress(key) + tracker.IncrementInProgress(key) + }, + expectBundleGenerated: true, + verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) { + tempDir := t.TempDir() + objectFormat := gittest.DefaultObjectHash.Format + gittest.Exec(t, cfg, "init", "--object-format="+objectFormat, tempDir) + gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath) + // A new bundle is expected to be created containing the new commit + gittest.RequireObjectExists(t, cfg, tempDir, commit) + }, + }, + { + desc: "bundle already exists", + setup: func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *service.InProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + key := repoProto.GetGlRepository() + tracker.IncrementInProgress(key) + tracker.IncrementInProgress(key) + tracker.IncrementInProgress(key) + tracker.IncrementInProgress(key) + tracker.IncrementInProgress(key) + tracker.IncrementInProgress(key) + + repo := localrepo.NewTestRepo(t, cfg, repoProto) + require.NoError(t, sink.Generate(ctx, repo)) + }, + expectBundleGenerated: false, + verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) { + tempDir := t.TempDir() + objectFormat := gittest.DefaultObjectHash.Format + gittest.Exec(t, cfg, "init", "--object-format="+objectFormat, tempDir) + gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath) + // No new bundle is expected to be created since one already existed. 
+ gittest.RequireObjectNotExists(t, cfg, tempDir, commit) + }, + }, + { + desc: "no concurrent upload packs in flight", + setup: func( + t *testing.T, + ctx context.Context, + cfg config.Cfg, + sink *bundleuri.Sink, + tracker *service.InProgressTracker, + repoProto *gitalypb.Repository, + repoPath string, + ) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + }, + expectBundleGenerated: false, + verifyBundle: func(t *testing.T, cfg config.Cfg, bundlePath string, commit git.ObjectID) { + tempDir := t.TempDir() + gittest.Exec(t, cfg, "init", tempDir) + gittest.Exec(t, cfg, "-C", tempDir, "bundle", "unbundle", bundlePath) + // No new bundle is expected to have been created because there are no + // inflight upload pack calls. + gittest.RequireObjectNotExists(t, cfg, tempDir, commit) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + doneChan := make(chan struct{}) + errChan := make(chan error) + + var bundlePath string + + bundleGeneratedNotifier := func(path string, err error) { + bundlePath = path + + close(doneChan) + errChan <- err + } + + tracker := service.NewInProgressTracker() + + sinkDir := t.TempDir() + sink, err := bundleuri.NewSink(ctx, "file://"+sinkDir, bundleuri.WithBundleGenerationNotifier(bundleGeneratedNotifier)) + require.NoError(t, err) + + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + + cfg.BundleURI.Autogeneration = true + + gitCmdFactory := gittest.NewCommandFactory(t, cfg) + locator := config.NewLocator(cfg) + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + + dbMgr, err := keyvalue.NewDBManager( + cfg.Storages, + keyvalue.NewBadgerStore, + helper.NewNullTickerFactory(), + logger, + ) + require.NoError(t, err) + t.Cleanup(dbMgr.Close) + + partitionManager, err := storagemgr.NewPartitionManager( + ctx, + cfg.Storages, + gitCmdFactory, + 
localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache), + logger, + dbMgr, + cfg.Prometheus, + nil, + ) + require.NoError(t, err) + t.Cleanup(partitionManager.Close) + + server := startSmartHTTPServerWithOptions(t, cfg, nil, []testserver.GitalyServerOpt{ + testserver.WithBundleURISink(sink), + testserver.WithLogger(logger), + testserver.WithInProgressTracker(tracker), + testserver.WithTransactionRegistry(storagemgr.NewTransactionRegistry()), + testserver.WithPartitionManager(partitionManager), + testserver.WithGitCommandFactory(gitCmdFactory), + }) + + cfg.SocketPath = server.Address() + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg) + oldCommit := gittest.WriteCommit(t, cfg, repoPath) + newCommit := gittest.WriteCommit(t, cfg, repoPath, gittest.WithBranch("master"), gittest.WithParents(oldCommit)) + + if tc.setup != nil { + tc.setup(t, ctx, cfg, sink, tracker, repoProto, repoPath) + } + + commitInUpdatedBundle := gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "CHANGELOG", Content: "nothing changed"}), + gittest.WithBranch("main")) + + // UploadPack request is a "want" packet line followed by a packet flush, then many "have" packets followed by a packet flush. 
+ // This is explained a bit in https://git-scm.com/book/en/v2/Git-Internals-Transfer-Protocols#_downloading_data + requestBuffer := &bytes.Buffer{} + gittest.WritePktlineString(t, requestBuffer, fmt.Sprintf("want %s %s\n", newCommit, clientCapabilities)) + gittest.WritePktlineFlush(t, requestBuffer) + gittest.WritePktlineString(t, requestBuffer, fmt.Sprintf("have %s\n", oldCommit)) + gittest.WritePktlineFlush(t, requestBuffer) + + req := &gitalypb.PostUploadPackWithSidechannelRequest{Repository: repoProto} + responseBuffer, err := makePostUploadPackWithSidechannelRequest(t, ctx, cfg.SocketPath, cfg.Auth.Token, req, requestBuffer) + require.NoError(t, err) + + pack, _, _ := extractPackDataFromResponse(t, responseBuffer) + require.NotEmpty(t, pack, "Expected to find a pack file in response, found none") + + if tc.expectBundleGenerated { + <-doneChan + err := <-errChan + require.NoError(t, err) + tc.verifyBundle(t, cfg, filepath.Join(sinkDir, bundlePath), commitInUpdatedBundle) + } else { + require.Empty(t, bundlePath) + } + }) + } +} + func testServerPostUploadPackWithSideChannelValidation(t *testing.T, ctx context.Context, makeRequest requestMaker, opts ...testcfg.Option) { cfg := testcfg.Build(t, opts...) serverSocketPath := runSmartHTTPServer(t, cfg) -- GitLab