diff --git a/internal/bundleuri/git_config_test.go b/internal/bundleuri/git_config_test.go index 882e6ddb90d308981a87ae1e4df4dcf76c67d0a8..af1b64601aba25f869eddc8c1f3b4567d644eede 100644 --- a/internal/bundleuri/git_config_test.go +++ b/internal/bundleuri/git_config_test.go @@ -25,7 +25,7 @@ func TestUploadPackGitConfig(t *testing.T) { } func testUploadPackGitConfig(t *testing.T, ctx context.Context) { - t.Parallel() + //t.Parallel() cfg := testcfg.Build(t) repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ @@ -45,7 +45,7 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { require.NoError(t, keyFile.Close()) type setupData struct { - sink *Sink + manager *GenerationManager } for _, tc := range []struct { @@ -57,7 +57,9 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { { desc: "no sink", setup: func(t *testing.T) setupData { - return setupData{} + return setupData{ + manager: &GenerationManager{}, + } }, expectedConfig: nil, expectedErr: errors.New("bundle-URI sink missing"), @@ -67,10 +69,11 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { setup: func(t *testing.T) setupData { sinkDir := t.TempDir() sink, err := NewSink(ctx, "file://"+sinkDir+"?base_url=http://example.com&secret_key_path="+keyFile.Name()) + manager := NewGenerationManager(sink, testhelper.NewLogger(t), 1, 0, NewInProgressTracker()) require.NoError(t, err) return setupData{ - sink: sink, + manager: manager, } }, expectedConfig: nil, @@ -81,12 +84,13 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { setup: func(t *testing.T) setupData { sinkDir := t.TempDir() sink, err := NewSink(ctx, "file://"+sinkDir) + manager := NewGenerationManager(sink, testhelper.NewLogger(t), 1, 0, NewInProgressTracker()) require.NoError(t, err) - require.NoError(t, sink.Generate(ctx, repo)) + require.NoError(t, generate(ctx, sink, repo)) return setupData{ - sink: sink, + manager: manager, } }, expectedConfig: 
nil, @@ -97,12 +101,13 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { setup: func(t *testing.T) setupData { sinkDir := t.TempDir() sink, err := NewSink(ctx, "file://"+sinkDir+"?base_url=http://example.com&secret_key_path="+keyFile.Name()) + manager := NewGenerationManager(sink, testhelper.NewLogger(t), 1, 0, NewInProgressTracker()) require.NoError(t, err) - require.NoError(t, sink.Generate(ctx, repo)) + require.NoError(t, generate(ctx, sink, repo)) return setupData{ - sink: sink, + manager: manager, } }, expectedConfig: []gitcmd.ConfigPair{ @@ -134,10 +139,10 @@ func testUploadPackGitConfig(t *testing.T, ctx context.Context) { }, } { t.Run(tc.desc, func(t *testing.T) { - t.Parallel() + //t.Parallel() data := tc.setup(t) - sink := data.sink + sink := data.manager.sink actual, err := UploadPackGitConfig(ctx, sink, repoProto) diff --git a/internal/bundleuri/manager.go b/internal/bundleuri/manager.go index 92270dae1522c556494516aa819ecb5bb42723c1..e2abe11f8a051fd9d4f42d2f6f933db65ded8afe 100644 --- a/internal/bundleuri/manager.go +++ b/internal/bundleuri/manager.go @@ -2,7 +2,15 @@ package bundleuri import ( "context" + "errors" "fmt" + "gitlab.com/gitlab-org/gitaly/v16/internal/backup" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "io" + "path/filepath" + "strings" "sync" "time" @@ -72,14 +80,23 @@ func (g *GenerationManager) Collect(metrics chan<- prometheus.Metric) { } } -// StopAll blocks until all of the goroutines that are generating bundles are finished. +// StopAll blocks until all goroutines that are generating bundles are finished. func (g *GenerationManager) StopAll() { g.cancel() g.wg.Wait() } -func (g *GenerationManager) generate(ctx context.Context, repo *localrepo.Repo) error { - bundlePath := g.sink.relativePath(repo, defaultBundle) +// GenerateIfAboveThreshold runs given function f(). 
While that function is running it +// has incremented an "in progress" counter. When there are multiple concurrent +// calls making the counter for the given repository reach the threshold, a +// background goroutine to generate a bundle is started. +func (g *GenerationManager) GenerateIfAboveThreshold(ctx context.Context, repo *localrepo.Repo, f func() error) error { + if !featureflag.BundleGeneration.IsEnabled(ctx) { + return f() + } + + repoPath := repo.GetRelativePath() + bundlePath := bundleRelativePath(repo, defaultBundle) shouldGenerate := func() bool { g.mutex.Lock() @@ -101,25 +118,6 @@ func (g *GenerationManager) generate(ctx context.Context, repo *localrepo.Repo) return nil } - defer func() { - g.mutex.Lock() - defer g.mutex.Unlock() - delete(g.bundleGenerationInProgress, bundlePath) - }() - - if err := g.sink.Generate(ctx, repo); err != nil { - return fmt.Errorf("generate: %w", err) - } - - return nil -} - -// GenerateIfAboveThreshold runs given function f(). While that function is running it -// has incremented an "in progress" counter. When there are multiple concurrent -// calls making the counter for the given repository reach the threshold, a -// background goroutine to generate a bundle is started. -func (g *GenerationManager) GenerateIfAboveThreshold(ctx context.Context, repo *localrepo.Repo, f func() error) error { - repoPath := repo.GetRelativePath() g.inProgressTracker.IncrementInProgress(repoPath) defer g.inProgressTracker.DecrementInProgress(repoPath) @@ -127,19 +125,75 @@ func (g *GenerationManager) GenerateIfAboveThreshold(ctx context.Context, repo * g.wg.Add(1) go func() { defer g.wg.Done() - if featureflag.BundleGeneration.IsEnabled(ctx) { - start := time.Now() - if err := g.generate(g.ctx, repo); err != nil { - g.logger.WithField("gl_project_path", repo.GetGlProjectPath()). - WithError(err). 
- Error("failed to generate bundle") - return - } - bundleGenerationLatency.Observe(time.Since(start).Seconds()) + start := time.Now() + + defer func() { + g.mutex.Lock() + defer g.mutex.Unlock() + delete(g.bundleGenerationInProgress, bundlePath) + }() + if err := generate(g.ctx, g.sink, repo); err != nil { + g.logger.WithField("gl_project_path", repo.GetGlProjectPath()). + WithError(err). + Error("failed to generate bundle") + return } + bundleGenerationLatency.Observe(time.Since(start).Seconds()) + g.logger.WithField("gl_project_path", repo.GetGlProjectPath()).Info("bundle generation") }() } return f() } + +// generate contains the actual logic to generate a bundle. This function does not validate +// the condition in which a bundle must be created. Calling this function will generate a bundle +// no matter what the context is. +func generate(ctx context.Context, sink *Sink, repo *localrepo.Repo) (returnErr error) { + bundlePath := bundleRelativePath(repo, defaultBundle) + + ref, err := repo.HeadReference(ctx) + if err != nil { + return fmt.Errorf("resolve HEAD ref: %w", err) + } + + repoProto, ok := repo.Repository.(*gitalypb.Repository) + if !ok { + return fmt.Errorf("unexpected repository type %T", repo.Repository) + } + + if tx := storage.ExtractTransaction(ctx); tx != nil { + origRepo := tx.OriginalRepository(repoProto) + bundlePath = bundleRelativePath(origRepo, defaultBundle) + } + + writer := backup.NewLazyWriter(func() (io.WriteCloser, error) { + return sink.getWriter(ctx, bundlePath) + }) + defer func() { + if err := writer.Close(); err != nil && returnErr == nil { + returnErr = fmt.Errorf("write bundle: %w", err) + } + }() + + opts := localrepo.CreateBundleOpts{ + Patterns: strings.NewReader(ref.String()), + } + + err = repo.CreateBundle(ctx, writer, &opts) + switch { + case errors.Is(err, localrepo.ErrEmptyBundle): + return structerr.NewFailedPrecondition("ref %q does not exist: %w", ref, err) + case err != nil: + return structerr.NewInternal("%w", 
err) + } + + return nil +} + +// bundleRelativePath returns a relative path of the bundle-URI bundle inside the bucket. +func bundleRelativePath(repo storage.Repository, name string) string { + repoPath := filepath.Join(repo.GetStorageName(), repo.GetRelativePath()) + return filepath.Join(repoPath, "uri", name+".bundle") +} diff --git a/internal/bundleuri/manager_test.go b/internal/bundleuri/manager_test.go index a5af51e579de00041ce5c7ea121b0987385869db..edbd2c598e523f5df4659acd72fca3d72109a2fc 100644 --- a/internal/bundleuri/manager_test.go +++ b/internal/bundleuri/manager_test.go @@ -99,10 +99,10 @@ func TestGenerationManager_GenerateIfAboveThreshold(t *testing.T) { if tc.expectFileExist { require.Equal(t, 1, testutil.CollectAndCount(manager, "gitaly_bundle_generation_seconds")) - require.FileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) + require.FileExists(t, filepath.Join(sinkDir, bundleRelativePath(repo, "default"))) return } - require.NoFileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) + require.NoFileExists(t, filepath.Join(sinkDir, bundleRelativePath(repo, "default"))) }) } @@ -123,7 +123,7 @@ func TestGenerationManager_GenerateIfAboveThreshold(t *testing.T) { // pretend like there is already another bundle generation happening for // this repo. 
- bundlePath := sink.relativePath(repo, defaultBundle) + bundlePath := bundleRelativePath(repo, defaultBundle) manager.bundleGenerationInProgress[bundlePath] = struct{}{} err = manager.GenerateIfAboveThreshold(ctx, repo, func() error { @@ -159,6 +159,56 @@ func TestGenerationManager_GenerateIfAboveThreshold(t *testing.T) { return nil }) require.NoError(t, err) - require.NoFileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) + require.NoFileExists(t, filepath.Join(sinkDir, bundleRelativePath(repo, "default"))) }) } + +func TestManager_generate(t *testing.T) { + t.Parallel() + + cfg := testcfg.Build(t) + ctx := testhelper.Context(t) + + for _, tc := range []struct { + desc string + setup func(t *testing.T, repoPath string) + expectedErr error + }{ + { + desc: "creates bundle successfully", + setup: func(t *testing.T, repoPath string) { + gittest.WriteCommit(t, cfg, repoPath, + gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), + gittest.WithBranch("main")) + }, + }, + { + desc: "fails with missing HEAD", + setup: func(t *testing.T, repoPath string) {}, + expectedErr: structerr.NewFailedPrecondition("ref %q does not exist: %w", "refs/heads/main", fmt.Errorf("create bundle: %w", localrepo.ErrEmptyBundle)), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ + SkipCreationViaService: true, + }) + repo := localrepo.NewTestRepo(t, cfg, repoProto) + + tc.setup(t, repoPath) + + sinkDir := t.TempDir() + sink, err := NewSink(ctx, "file://"+sinkDir) + require.NoError(t, err) + + err = generate(ctx, sink, repo) + if tc.expectedErr == nil { + require.NoError(t, err) + require.FileExists(t, filepath.Join(sinkDir, bundleRelativePath(repo, "default"))) + } else { + require.Equal(t, err, tc.expectedErr, err) + } + }) + } +} diff --git a/internal/bundleuri/sink.go b/internal/bundleuri/sink.go index 
87e76fb697eb2b8cb5fe7926aa62a348bf8c94b2..80a57db8b93ac42abb0a578c3c2737ef707cd685 100644 --- a/internal/bundleuri/sink.go +++ b/internal/bundleuri/sink.go @@ -5,12 +5,8 @@ import ( "errors" "fmt" "io" - "path/filepath" - "strings" "time" - "gitlab.com/gitlab-org/gitaly/v16/internal/backup" - "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" @@ -45,17 +41,6 @@ func NewSink(ctx context.Context, uri string) (*Sink, error) { }, nil } -// relativePath returns a relative path of the bundle-URI bundle inside the -// bucket. -func (s *Sink) relativePath(repo storage.Repository, name string) string { - repoPath := filepath.Join( - repo.GetStorageName(), - repo.GetRelativePath(), - ) - - return filepath.Join(repoPath, "uri", name+".bundle") -} - // getWriter creates a writer to store data into a relative path on the // configured bucket. // It is the callers responsibility to Close the reader after usage. @@ -75,52 +60,9 @@ func (s *Sink) getWriter(ctx context.Context, relativePath string) (io.WriteClos return writer, nil } -// Generate creates a bundle for bundle-URI use into the bucket. 
-func (s Sink) Generate(ctx context.Context, repo *localrepo.Repo) (returnErr error) { - ref, err := repo.HeadReference(ctx) - if err != nil { - return fmt.Errorf("resolve HEAD ref: %w", err) - } - - bundlePath := s.relativePath(repo, defaultBundle) - - repoProto, ok := repo.Repository.(*gitalypb.Repository) - if !ok { - return fmt.Errorf("unexpected repository type %t", repo.Repository) - } - - if tx := storage.ExtractTransaction(ctx); tx != nil { - origRepo := tx.OriginalRepository(repoProto) - bundlePath = s.relativePath(origRepo, defaultBundle) - } - - writer := backup.NewLazyWriter(func() (io.WriteCloser, error) { - return s.getWriter(ctx, bundlePath) - }) - defer func() { - if err := writer.Close(); err != nil && returnErr == nil { - returnErr = fmt.Errorf("write bundle: %w", err) - } - }() - - opts := localrepo.CreateBundleOpts{ - Patterns: strings.NewReader(ref.String()), - } - - err = repo.CreateBundle(ctx, writer, &opts) - switch { - case errors.Is(err, localrepo.ErrEmptyBundle): - return structerr.NewFailedPrecondition("ref %q does not exist: %w", ref, err) - case err != nil: - return structerr.NewInternal("%w", err) - } - - return nil -} - // SignedURL returns a public URL to give anyone access to download the bundle from. 
func (s Sink) SignedURL(ctx context.Context, repo storage.Repository) (string, error) { - relativePath := s.relativePath(repo, defaultBundle) + relativePath := bundleRelativePath(repo, defaultBundle) repoProto, ok := repo.(*gitalypb.Repository) if !ok { @@ -129,7 +71,7 @@ func (s Sink) SignedURL(ctx context.Context, repo storage.Repository) (string, e if tx := storage.ExtractTransaction(ctx); tx != nil { origRepo := tx.OriginalRepository(repoProto) - relativePath = s.relativePath(origRepo, defaultBundle) + relativePath = bundleRelativePath(origRepo, defaultBundle) } if exists, err := s.bucket.Exists(ctx, relativePath); !exists { diff --git a/internal/bundleuri/sink_test.go b/internal/bundleuri/sink_test.go index 94af2834b5aa4466af2788af57263fec79f3b72a..b131446ab9aae05e036c5652ac3a20c492340cd5 100644 --- a/internal/bundleuri/sink_test.go +++ b/internal/bundleuri/sink_test.go @@ -1,7 +1,6 @@ package bundleuri import ( - "fmt" "os" "path/filepath" "testing" @@ -15,56 +14,6 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" ) -func TestSink_Generate(t *testing.T) { - t.Parallel() - - cfg := testcfg.Build(t) - ctx := testhelper.Context(t) - - for _, tc := range []struct { - desc string - setup func(t *testing.T, repoPath string) - expectedErr error - }{ - { - desc: "creates bundle successfully", - setup: func(t *testing.T, repoPath string) { - gittest.WriteCommit(t, cfg, repoPath, - gittest.WithTreeEntries(gittest.TreeEntry{Mode: "100644", Path: "README", Content: "much"}), - gittest.WithBranch("main")) - }, - }, - { - desc: "fails with missing HEAD", - setup: func(t *testing.T, repoPath string) {}, - expectedErr: structerr.NewFailedPrecondition("ref %q does not exist: %w", "refs/heads/main", fmt.Errorf("create bundle: %w", localrepo.ErrEmptyBundle)), - }, - } { - t.Run(tc.desc, func(t *testing.T) { - t.Parallel() - - repoProto, repoPath := gittest.CreateRepository(t, ctx, cfg, gittest.CreateRepositoryConfig{ - SkipCreationViaService: true, - }) 
- repo := localrepo.NewTestRepo(t, cfg, repoProto) - - tc.setup(t, repoPath) - - sinkDir := t.TempDir() - sink, err := NewSink(ctx, "file://"+sinkDir) - require.NoError(t, err) - - err = sink.Generate(ctx, repo) - if tc.expectedErr == nil { - require.NoError(t, err) - require.FileExists(t, filepath.Join(sinkDir, sink.relativePath(repo, "default"))) - } else { - require.Equal(t, err, tc.expectedErr, err) - } - }) - } -} - func TestSink_SignedURL(t *testing.T) { t.Parallel() @@ -95,7 +44,7 @@ func TestSink_SignedURL(t *testing.T) { { desc: "signs bundle successfully", setup: func(t *testing.T, sinkDir string, sink *Sink) { - path := filepath.Join(sinkDir, sink.relativePath(repo, "default")) + path := filepath.Join(sinkDir, bundleRelativePath(repo, "default")) require.NoError(t, os.MkdirAll(filepath.Dir(path), mode.Directory)) require.NoError(t, os.WriteFile(path, []byte("hello"), mode.File)) }, diff --git a/internal/gitaly/service/repository/bundle_uri.go b/internal/gitaly/service/repository/bundle_uri.go index 6cdd22850c59d2caef4d458e2b85ab2dc8ca0d72..fb1a21b581240f4fafe9b4f55f0b4ac48f134d76 100644 --- a/internal/gitaly/service/repository/bundle_uri.go +++ b/internal/gitaly/service/repository/bundle_uri.go @@ -20,7 +20,7 @@ func (s *server) GenerateBundleURI(ctx context.Context, req *gitalypb.GenerateBu repo := s.localRepoFactory.Build(repository) - if err := s.bundleURISink.Generate(ctx, repo); err != nil { + if err := s.bundleManager.GenerateIfAboveThreshold(ctx, repo, func() error { return nil }); err != nil { return nil, structerr.NewInternal("generate bundle: %w", err) } diff --git a/internal/gitaly/service/repository/server.go b/internal/gitaly/service/repository/server.go index 94c256eaac8b2f8dce4b498849252a338ddb87af..628b65cb04a5d822181bf2d65dc167552cff513c 100644 --- a/internal/gitaly/service/repository/server.go +++ b/internal/gitaly/service/repository/server.go @@ -41,6 +41,7 @@ type server struct { repositoryCounter *counter.RepositoryCounter 
localRepoFactory localrepo.Factory licenseCache *unarycache.Cache[git.ObjectID, *gitalypb.FindLicenseResponse] + bundleManager *bundleuri.GenerationManager } // NewServer creates a new instance of a gRPC repo server @@ -62,6 +63,7 @@ func NewServer(deps *service.Dependencies) gitalypb.RepositoryServiceServer { repositoryCounter: deps.GetRepositoryCounter(), localRepoFactory: deps.GetRepositoryFactory(), licenseCache: newLicenseCache(), + bundleManager: deps.BundleGenerationManager, } }