From a908a023d93cda6f4efddf4b19e27500d126daa1 Mon Sep 17 00:00:00 2001 From: Olivier Campeau Date: Fri, 20 Dec 2024 13:56:50 -0500 Subject: [PATCH] refactor: Move bundle generation logic into the manager All the logic to generate a bundle is now handled by the manager. The Sink should only deal with data access (read/write) and data location. In a subsequent commit, the `SignedURL()` method will also be moved from the Sink to the manager. This change brings a couple of advantages: 1. Better separation of concerns between the Sink and the Manager. This will help with future development of the feature. 2. Before this change, having a reference to the manager was not enough to manage a bundle. Because the Sink holds the logic to get the SignedURL, we also needed to pass around a reference to the Sink used internally by the manager. This change aims at making the Manager the only reference we need to manage bundles throughout their lifecycle. The `SignedURL()` method still lives on the Sink for now and will be moved in the next commit; I thought the change would be easier to understand if this commit only moved the generation code. 
References: * https://gitlab.com/gitlab-org/gitaly/-/issues/6558 --- internal/bundleuri/git_config_test.go | 21 ++-- internal/bundleuri/manager.go | 137 +++++++++++++++----- internal/bundleuri/manager_test.go | 58 ++++++++- internal/bundleuri/sink.go | 62 +--------- internal/bundleuri/sink_test.go | 53 +------- .../gitaly/service/repository/bundle_uri.go | 2 +- internal/gitaly/service/repository/server.go | 2 + 7 files changed, 177 insertions(+), 158 deletions(-) diff --git a/internal/bundleuri/git_config_test.go b/internal/bundleuri/git_config_test.go index 882e6ddb90..af1b64601a 100644 --- a/internal/bundleuri/git_config_test.go +++ b/internal/bundleuri/git_config_test.go @@ -45,7 +45,7 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { require.NoError(t, keyFile.Close()) type setupData struct { - sink *Sink + manager *GenerationManager } for _, tc := range []struct { @@ -57,7 +57,9 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { { desc: "no sink", setup: func(t *testing.T) setupData { - return setupData{} + return setupData{ + manager: &GenerationManager{}, + } }, expectedConfig: nil, expectedErr: errors.New("bundle-URI sink missing"), @@ -67,10 +69,11 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { setup: func(t *testing.T) setupData { sinkDir := t.TempDir() sink, err := NewSink(ctx, "file://"+sinkDir+"?base_url=http://example.com&secret_key_path="+keyFile.Name()) require.NoError(t, err) + manager := NewGenerationManager(sink, testhelper.NewLogger(t), 1, 0, NewInProgressTracker()) return setupData{ - sink: sink, + manager: manager, } }, expectedConfig: nil, @@ -81,12 +84,13 @@ func testUploadPackGitConfig(t *testing.T, ctx
context.Context) { setup: func(t *testing.T) setupData { sinkDir := t.TempDir() sink, err := NewSink(ctx, "file://"+sinkDir) require.NoError(t, err) + manager := NewGenerationManager(sink, testhelper.NewLogger(t), 1, 0, NewInProgressTracker()) - require.NoError(t, sink.Generate(ctx, repo)) + require.NoError(t, generate(ctx, sink, repo)) return setupData{ - sink: sink, + manager: manager, } }, expectedConfig: nil, @@ -97,12 +101,13 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { setup: func(t *testing.T) setupData { sinkDir := t.TempDir() sink, err := NewSink(ctx, "file://"+sinkDir+"?base_url=http://example.com&secret_key_path="+keyFile.Name()) require.NoError(t, err) + manager := NewGenerationManager(sink, testhelper.NewLogger(t), 1, 0, NewInProgressTracker()) - require.NoError(t, sink.Generate(ctx, repo)) + require.NoError(t, generate(ctx, sink, repo)) return setupData{ - sink: sink, + manager: manager, } }, expectedConfig: []gitcmd.ConfigPair{ @@ -134,10 +139,10 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { }, } { t.Run(tc.desc, func(t *testing.T) { t.Parallel() data := tc.setup(t) - sink := data.sink + sink := data.manager.sink actual, err := UploadPackGitConfig(ctx, sink, repoProto) diff --git a/internal/bundleuri/manager.go b/internal/bundleuri/manager.go index 92270dae15..e2abe11f8a 100644 --- a/internal/bundleuri/manager.go +++ b/internal/bundleuri/manager.go @@ -2,7 +2,15 @@ package bundleuri import ( "context" + "errors" "fmt" + "gitlab.com/gitlab-org/gitaly/v16/internal/backup" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "io" + "path/filepath" + "strings" "sync" "time" @@ -72,14 +80,23 @@ func (g *GenerationManager) Collect(metrics chan<- prometheus.Metric) { } } -// StopAll blocks until all of the goroutines that are generating bundles are finished.
+// StopAll blocks until all goroutines that are generating bundles are finished. func (g *GenerationManager) StopAll() { g.cancel() g.wg.Wait() } -func (g *GenerationManager) generate(ctx context.Context, repo *localrepo.Repo) error { - bundlePath := g.sink.relativePath(repo, defaultBundle) +// GenerateIfAboveThreshold always runs the given function f() and returns its +// error. When bundle generation is enabled, calls for the same repository are +// counted while f() runs; once concurrent calls make that counter reach the +// threshold, a background goroutine is started to generate a bundle. +func (g *GenerationManager) GenerateIfAboveThreshold(ctx context.Context, repo *localrepo.Repo, f func() error) error { + if !featureflag.BundleGeneration.IsEnabled(ctx) { + return f() + } + + repoPath := repo.GetRelativePath() + bundlePath := bundleRelativePath(repo, defaultBundle) shouldGenerate := func() bool { g.mutex.Lock() @@ -101,25 +118,6 @@ func (g *GenerationManager) generate(ctx context.Context, repo *localrepo.Repo) - return nil + return f() } - defer func() { - g.mutex.Lock() - defer g.mutex.Unlock() - delete(g.bundleGenerationInProgress, bundlePath) - }() - - if err := g.sink.Generate(ctx, repo); err != nil { - return fmt.Errorf("generate: %w", err) - } - - return nil -} - -// GenerateIfAboveThreshold runs given function f(). While that function is running it -// has incremented an "in progress" counter. When there are multiple concurrent -// calls making the counter for the given repository reach the threshold, a -// background goroutine to generate a bundle is started.
-func (g *GenerationManager) GenerateIfAboveThreshold(ctx context.Context, repo *localrepo.Repo, f func() error) error { - repoPath := repo.GetRelativePath() g.inProgressTracker.IncrementInProgress(repoPath) defer g.inProgressTracker.DecrementInProgress(repoPath) @@ -127,19 +125,92 @@ func (g *GenerationManager) GenerateIfAboveThreshold(ctx context.Context, repo * g.wg.Add(1) go func() { defer g.wg.Done() - if featureflag.BundleGeneration.IsEnabled(ctx) { - start := time.Now() - if err := g.generate(g.ctx, repo); err != nil { - g.logger.WithField("gl_project_path", repo.GetGlProjectPath()). - WithError(err). - Error("failed to generate bundle") - return - } - bundleGenerationLatency.Observe(time.Since(start).Seconds()) + start := time.Now() + + defer func() { + g.mutex.Lock() + defer g.mutex.Unlock() + delete(g.bundleGenerationInProgress, bundlePath) + }() + if err := generate(g.ctx, g.sink, repo); err != nil { + g.logger.WithField("gl_project_path", repo.GetGlProjectPath()). + WithError(err). + Error("failed to generate bundle") + return } + bundleGenerationLatency.Observe(time.Since(start).Seconds()) + g.logger.WithField("gl_project_path", repo.GetGlProjectPath()).Info("bundle generation") }() - } + } else { + // No goroutine was started, so nothing else will clear the in-progress + // marker for bundlePath; release it here so later calls can generate. + g.mutex.Lock() + delete(g.bundleGenerationInProgress, bundlePath) + g.mutex.Unlock() + } return f() } + +// Generate synchronously generates a bundle for the given repository and +// returns any error. Unlike GenerateIfAboveThreshold, it ignores the threshold +// and the feature flag, so it is meant for explicit generation requests. +func (g *GenerationManager) Generate(ctx context.Context, repo *localrepo.Repo) error { + if g == nil || g.sink == nil { + return errors.New("bundle-URI sink missing") + } + + return generate(ctx, g.sink, repo) +} + +// generate contains the actual logic to generate a bundle. It does not check +// whether a bundle should be generated (threshold, feature flag, generations +// already in progress); callers are responsible for that.
+func generate(ctx context.Context, sink *Sink, repo *localrepo.Repo) (returnErr error) { + bundlePath := bundleRelativePath(repo, defaultBundle) + + ref, err := repo.HeadReference(ctx) + if err != nil { + return fmt.Errorf("resolve HEAD ref: %w", err) + } + + repoProto, ok := repo.Repository.(*gitalypb.Repository) + if !ok { + return fmt.Errorf("unexpected repository type %T", repo.Repository) + } + + if tx := storage.ExtractTransaction(ctx); tx != nil { + origRepo := tx.OriginalRepository(repoProto) + bundlePath = bundleRelativePath(origRepo, defaultBundle) + } + + writer := backup.NewLazyWriter(func() (io.WriteCloser, error) { + return sink.getWriter(ctx, bundlePath) + }) + defer func() { + if err := writer.Close(); err != nil && returnErr == nil { + returnErr = fmt.Errorf("write bundle: %w", err) + } + }() + + opts := localrepo.CreateBundleOpts{ + Patterns: strings.NewReader(ref.String()), + } + + err = repo.CreateBundle(ctx, writer, &opts) + switch { + case errors.Is(err, localrepo.ErrEmptyBundle): + return structerr.NewFailedPrecondition("ref %q does not exist: %w", ref, err) + case err != nil: + return structerr.NewInternal("%w", err) + } + + return nil +} + +// bundleRelativePath returns a relative path of the bundle-URI bundle inside the bucket.
+func bundleRelativePath(repo storage.Repository, name string) string { + repoPath := filepath.Join(repo.GetStorageName(), repo.GetRelativePath()) + return filepath.Join(repoPath, "uri", name+".bundle") +} diff --git a/internal/bundleuri/manager_test.go b/internal/bundleuri/manager_test.go index a5af51e579..edbd2c598e 100644 --- a/internal/bundleuri/manager_test.go +++ b/internal/bundleuri/manager_test.go @@ -99,10 +99,10 @@ func TestGenerationManager_GenerateIfAboveThreshold(t *testing.T) { if tc.expectFileExist { require.Equal(t, 1, testutil.CollectAndCount(manager, "gitaly_bundle_generation_seconds")) - require.FileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) + require.FileExists(t, filepath.Join(sinkDir, bundleRelativePath(repo, "default"))) return } - require.NoFileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) + require.NoFileExists(t, filepath.Join(sinkDir, bundleRelativePath(repo, "default"))) }) } @@ -123,7 +123,7 @@ func TestGenerationManager_GenerateIfAboveThreshold(t *testing.T) { // pretend like there is already another bundle generation happening for // this repo.
- bundlePath := sink.relativePath(repo, defaultBundle) + bundlePath := bundleRelativePath(repo, defaultBundle) manager.bundleGenerationInProgress[bundlePath] = struct{}{} err = manager.GenerateIfAboveThreshold(ctx, repo, func() error { @@ -159,6 +159,56 @@ func TestGenerationManager_GenerateIfAboveThreshold(t *testing.T) { return nil }) require.NoError(t, err) - require.NoFileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) + require.NoFileExists(t, filepath.Join(sinkDir, bundleRelativePath(repo, "default"))) }) } + +func TestManager_generate(t *testing.T) { + t.Parallel() + + cfg := testcfg.Build(t) + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + setup func(t *testing.T, repoPath string) + expectedErr error + }{ + { + desc: "creates bundle successfully", + setup: func(t *testing.T, repoPath string) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + }, + }, + { + desc: "fails with missing HEAD", + setup: func(t *testing.T, repoPath string) {}, + expectedErr: structerr.NewFailedPrecondition("ref %q does not exist: %w", "refs/heads/main", fmt.Errorf("create bundle: %w", localrepo.ErrEmptyBundle)), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + tc.setup(t, repoPath) + + sinkDir := t.TempDir() + sink, err := NewSink(ctx, "file://"+sinkDir) + require.NoError(t, err) + + err = generate(ctx, sink, repo) + if tc.expectedErr == nil { + require.NoError(t, err) + require.FileExists(t, filepath.Join(sinkDir, bundleRelativePath(repo, "default"))) + } else { + require.Equal(t, tc.expectedErr, err) + } + }) + } +} diff --git a/internal/bundleuri/sink.go b/internal/bundleuri/sink.go index
87e76fb697..80a57db8b9 100644 --- a/internal/bundleuri/sink.go +++ b/internal/bundleuri/sink.go @@ -5,12 +5,8 @@ import ( "errors" "fmt" "io" - "path/filepath" - "strings" "time" - "gitlab.com/gitlab-org/gitaly/v16/internal/backup" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -45,17 +41,6 @@ func NewSink(ctx context.Context, uri string) (*Sink, error) { }, nil } -// relativePath returns a relative path of the bundle-URI bundle inside the -// bucket. -func (s *Sink) relativePath(repo storage.Repository, name string) string { - repoPath := filepath.Join( - repo.GetStorageName(), - repo.GetRelativePath(), - ) - - return filepath.Join(repoPath, "uri", name+".bundle") -} - // getWriter creates a writer to store data into a relative path on the // configured bucket. // It is the callers responsibility to Close the reader after usage. @@ -75,52 +60,9 @@ func (s *Sink) getWriter(ctx context.Context, relativePath string) (io.WriteClos return writer, nil } -// Generate creates a bundle for bundle-URI use into the bucket.
-func (s Sink) Generate(ctx context.Context, repo *localrepo.Repo) (returnErr error) { - ref, err := repo.HeadReference(ctx) - if err != nil { - return fmt.Errorf("resolve HEAD ref: %w", err) - } - - bundlePath := s.relativePath(repo, defaultBundle) - - repoProto, ok := repo.Repository.(*gitalypb.Repository) - if !ok { - return fmt.Errorf("unexpected repository type %t", repo.Repository) - } - - if tx := storage.ExtractTransaction(ctx); tx != nil { - origRepo := tx.OriginalRepository(repoProto) - bundlePath = s.relativePath(origRepo, defaultBundle) - } - - writer := backup.NewLazyWriter(func() (io.WriteCloser, error) { - return s.getWriter(ctx, bundlePath) - }) - defer func() { - if err := writer.Close(); err != nil && returnErr == nil { - returnErr = fmt.Errorf("write bundle: %w", err) - } - }() - - opts := localrepo.CreateBundleOpts{ - Patterns: strings.NewReader(ref.String()), - } - - err = repo.CreateBundle(ctx, writer, &opts) - switch { - case errors.Is(err, localrepo.ErrEmptyBundle): - return structerr.NewFailedPrecondition("ref %q does not exist: %w", ref, err) - case err != nil: - return structerr.NewInternal("%w", err) - } - - return nil -} - // SignedURL returns a public URL to give anyone access to download the bundle from.
func (s Sink) SignedURL(ctx context.Context, repo storage.Repository) (string, error) { - relativePath := s.relativePath(repo, defaultBundle) + relativePath := bundleRelativePath(repo, defaultBundle) repoProto, ok := repo.(*gitalypb.Repository) if !ok { @@ -129,7 +71,7 @@ func (s Sink) SignedURL(ctx context.Context, repo storage.Repository) (string, e if tx := storage.ExtractTransaction(ctx); tx != nil { origRepo := tx.OriginalRepository(repoProto) - relativePath = s.relativePath(origRepo, defaultBundle) + relativePath = bundleRelativePath(origRepo, defaultBundle) } if exists, err := s.bucket.Exists(ctx, relativePath); !exists { diff --git a/internal/bundleuri/sink_test.go b/internal/bundleuri/sink_test.go index 94af2834b5..b131446ab9 100644 --- a/internal/bundleuri/sink_test.go +++ b/internal/bundleuri/sink_test.go @@ -1,7 +1,6 @@ package bundleuri import ( - "fmt" "os" "path/filepath" "testing" @@ -15,56 +14,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" ) -func TestSink_Generate(t *testing.T) { - t.Parallel() - - cfg := testcfg.Build(t) - ctx := testhelper.Context(t) - - for _, tc := range []struct { - desc string - setup func(t *testing.T, repoPath string) - expectedErr error - }{ - { - desc: "creates bundle successfully", - setup: func(t *testing.T, repoPath string) { - gittest.WriteCommit(t, cfg, repoPath, - gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), - gittest.WithBranch("main")) - }, - }, - { - desc: "fails with missing HEAD", - setup: func(t *testing.T, repoPath string) {}, - expectedErr: structerr.NewFailedPrecondition("ref %q does not exist: %w", "refs/heads/main", fmt.Errorf("create bundle: %w", localrepo.ErrEmptyBundle)), - }, - } { - t.Run(tc.desc, func(t *testing.T) { - t.Parallel() - - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) - repo := localrepo.NewTestRepo(t, cfg, repoProto) - 
tc.setup(t, repoPath) - - sinkDir := t.TempDir() - sink, err := NewSink(ctx, "file://"+sinkDir) - require.NoError(t, err) - - err = sink.Generate(ctx, repo) - if tc.expectedErr == nil { - require.NoError(t, err) - require.FileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) - } else { - require.Equal(t, err, tc.expectedErr, err) - } - }) - } -} - func TestSink_SignedURL(t *testing.T) { t.Parallel() @@ -95,7 +44,7 @@ func TestSink_SignedURL(t *testing.T) { { desc: "signs bundle successfully", setup: func(t *testing.T, sinkDir string, sink *Sink) { - path := filepath.Join(sinkDir, sink.relativePath(repo, "default")) + path := filepath.Join(sinkDir, bundleRelativePath(repo, "default")) require.NoError(t, os.MkdirAll(filepath.Dir(path), mode.Directory)) require.NoError(t, os.WriteFile(path, []byte("hello"), mode.File)) }, diff --git a/internal/gitaly/service/repository/bundle_uri.go b/internal/gitaly/service/repository/bundle_uri.go index 6cdd22850c..fb1a21b581 100644 --- a/internal/gitaly/service/repository/bundle_uri.go +++ b/internal/gitaly/service/repository/bundle_uri.go @@ -20,7 +20,7 @@ func (s *server) GenerateBundleURI(ctx context.Context, req *gitalypb.GenerateBu repo := s.localRepoFactory.Build(repository) - if err := s.bundleURISink.Generate(ctx, repo); err != nil { + if err := s.bundleManager.Generate(ctx, repo); err != nil { return nil, structerr.NewInternal("generate bundle: %w", err) } diff --git a/internal/gitaly/service/repository/server.go b/internal/gitaly/service/repository/server.go index 94c256eaac..628b65cb04 100644 --- a/internal/gitaly/service/repository/server.go +++ b/internal/gitaly/service/repository/server.go @@ -41,6 +41,7 @@ type server struct { repositoryCounter *counter.RepositoryCounter localRepoFactory localrepo.Factory licenseCache *unarycache.Cache[git.ObjectID, *gitalypb.FindLicenseResponse] + bundleManager *bundleuri.GenerationManager } // NewServer creates a new
instance of a gRPC repo server @@ -62,6 +63,7 @@ func NewServer(deps *service.Dependencies) gitalypb.RepositoryServiceServer { repositoryCounter: deps.GetRepositoryCounter(), localRepoFactory: deps.GetRepositoryFactory(), licenseCache: newLicenseCache(), + bundleManager: deps.BundleGenerationManager, } } -- GitLab