From ddf25efd74ea844cebcf9d202e36cbd3c6700db4 Mon Sep 17 00:00:00 2001
From: Eric Ju
Date: Mon, 8 Jul 2024 15:46:18 -0400
Subject: [PATCH] repository_path: Add blob offload path functions

As we are going to be able to offload blobs into separate packfiles
that might be stored on HDDs to save storage cost, we should have some
repository path infrastructure to find the path where the new packfiles
will live and to know whether a repository uses blob offloading.

Squashed WIP notes:

- Add offloading manager
- Change Makefile; revlist is working
- Use gitpipe.Catfile to upload objects
- Milestone: uploading and repacking are working
- Add promisor creation (TBD)
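The POC is driven by environment variables for now. An illustrative
invocation (the exact wiring is still TBD) might look like:

    OFFLOADING_POC_ENABLED=true \
    OFFLOADING_POC_FILTER_LIMIT=1048576 \
    OFFLOADING_POC_FILTER_TO_PATH=/tmp/pack \
    OFFLOADING_POC_BUCKET=blob_offloads \
    OFFLOADING_DRY_RUN=false \
    gitaly serve /path/to/config.toml

Offload paths are derived under the Praefect cluster prefix, next to
object pools, e.g. DeriveBlobOffloadPath(1) returns
"@cluster/blob_offloads/6b/86/1".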
---
 Makefile                                      |  11 +
 internal/cli/gitaly/serve.go                  |   2 +-
 internal/git/gitpipe/revision.go              |  21 ++
 internal/git/housekeeping/config/config.go    |  23 ++
 internal/git/housekeeping/objects.go          |  27 ++-
 .../git/housekeeping/optimization_strategy.go |  12 +
 internal/git/offloading/manager.go            | 218 ++++++++++++++++++
 internal/gitaly/storage/repository_path.go    |  30 +++
 .../gitaly/storage/repository_path_test.go    |  60 +++++
 this_is_a_test_for_local_remote               |   0
 10 files changed, 402 insertions(+), 2 deletions(-)
 create mode 100644 internal/git/offloading/manager.go
 create mode 100644 this_is_a_test_for_local_remote

diff --git a/Makefile b/Makefile
index 7b270f74c4..0d678b0adf 100644
--- a/Makefile
+++ b/Makefile
@@ -67,6 +67,7 @@ GOLANGCI_LINT_CONFIG ?= ${SOURCE_DIR}/.golangci.yml
 GITALY_PACKAGE := $(shell go list -m 2>/dev/null || echo unknown)
 GITALY_VERSION := $(shell ${GIT} describe --match v* 2>/dev/null | sed 's/^v//' || cat ${SOURCE_DIR}/VERSION 2>/dev/null || echo unknown)
 GO_LDFLAGS := -X ${GITALY_PACKAGE}/internal/version.version=${GITALY_VERSION}
+GO_GCFLAGS ?=
 SERVER_BUILD_TAGS := tracer_static,tracer_static_jaeger,tracer_static_stackdriver,continuous_profiler_stackdriver
 
 # Temporary GNU build ID used as a placeholder value so that we can replace it
@@ -599,7 +600,17 @@ ${BUILD_DIR}/intermediate/%: clear-go-build-cache-if-needed .FOR
 	@ # of "TEMP_GITALY_BUILD_ID". In the final binary we replace this build ID with
 	@ # the computed build ID for this binary.
 	@ # We cd to SOURCE_DIR to avoid corner cases where workdir may be a symlink
+ifdef GO_GCFLAGS
+	${Q}echo "Building $@ with -gcflags '${GO_GCFLAGS}'"
+	${Q}cd ${SOURCE_DIR} && go build -o "$@" -gcflags '${GO_GCFLAGS}' -ldflags '-B 0x${TEMPORARY_BUILD_ID} ${GO_LDFLAGS}' -tags "${GO_BUILD_TAGS}" $(addprefix ${SOURCE_DIR}/cmd/,$(@F))
+else
 	${Q}cd ${SOURCE_DIR} && go build -o "$@" -ldflags '-B 0x${TEMPORARY_BUILD_ID} ${GO_LDFLAGS}' -tags "${GO_BUILD_TAGS}" $(addprefix ${SOURCE_DIR}/cmd/,$(@F))
+endif
 
 # This is a build hack to avoid excessive rebuilding of targets. Instead of
 # depending on the Makefile, we start to depend on tool versions as defined in
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index c94361a4df..10f11c5efd 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -118,7 +118,7 @@ func serveAction(ctx *cli.Context) error {
 		logger.Warn("Authentication is enabled but not enforced because transitioning=true. Gitaly will accept unauthenticated requests.")
 	}
 
-	logger.WithField("version", version.GetVersion()).Info("Starting Gitaly")
+	logger.WithField("version", version.GetVersion()).Info("Starting Gitaly: POC OLO with Eric")
 
 	fips.Check()
 
 	if err := run(ctx, cfg, logger); err != nil {
diff --git a/internal/git/gitpipe/revision.go b/internal/git/gitpipe/revision.go
index 64bb328853..3aec8b7fd9 100644
--- a/internal/git/gitpipe/revision.go
+++ b/internal/git/gitpipe/revision.go
@@ -45,6 +45,7 @@ const (
 type revlistConfig struct {
 	blobLimit    int
 	objects      bool
+	noObjectName bool
 	objectType   ObjectType
 	order        Order
 	reverse      bool
@@ -56,6 +57,7 @@ type revlistConfig struct {
 	regexIgnoreCase       bool
 	commitMessagePatterns [][]byte
 	skipResult            func(*RevisionResult) bool
+	filterPrintOmitted    bool
 }
 
 // RevlistOption is an option for the revlist pipeline step.
@@ -69,6 +71,18 @@ func WithObjects() RevlistOption {
 	}
 }
 
+// WithNoObjectNames causes git-rev-list(1) to be invoked with --no-object-names so that
+// only object IDs are printed, without the path at which an object was found.
+func WithNoObjectNames() RevlistOption {
+	return func(cfg *revlistConfig) {
+		cfg.noObjectName = true
+	}
+}
+
+// WithFilterPrintOmitted causes git-rev-list(1) to be invoked with --filter-print-omitted
+// so that objects omitted by the object filter are printed with a "~" prefix.
+func WithFilterPrintOmitted() RevlistOption {
+	return func(cfg *revlistConfig) {
+		cfg.filterPrintOmitted = true
+	}
+}
+
 // WithBlobLimit sets up a size limit for blobs. Only blobs whose size is smaller than this limit
 // will be returned by the pipeline step.
 func WithBlobLimit(limit int) RevlistOption {
@@ -216,10 +230,17 @@ func Revlist(
 		)
 	}
 
+	if cfg.noObjectName {
+		flags = append(flags, git.Flag{Name: "--no-object-names"})
+	}
+
 	if cfg.blobLimit > 0 {
 		flags = append(flags, git.Flag{
 			Name: fmt.Sprintf("--filter=blob:limit=%d", cfg.blobLimit),
 		})
+		if cfg.filterPrintOmitted {
+			flags = append(flags, git.Flag{Name: "--filter-print-omitted"})
+		}
 	}
 
 	if cfg.objectType != "" {
diff --git a/internal/git/housekeeping/config/config.go b/internal/git/housekeeping/config/config.go
index e12f60d69c..0304d60d79 100644
--- a/internal/git/housekeeping/config/config.go
+++ b/internal/git/housekeeping/config/config.go
@@ -37,6 +37,26 @@ const (
 	// of objects. Loose objects will get soaked up as part of the repack regardless of their
 	// reachability.
 	RepackObjectsStrategyGeometric = RepackObjectsStrategy("geometric")
+
+	// RepackObjectsStrategyOffloading performs a full repack that filters large blobs out
+	// into a separate packfile so that they can be offloaded.
+	// TODO [mark] POC of offloading large objects (OLO)
+	RepackObjectsStrategyOffloading = RepackObjectsStrategy("offloading")
+)
+
+// FilterBlobAction defines what we want to do regarding blob offloading.
+type FilterBlobAction string
+
+const (
+	// FilterBlobActionNoChange means that we should filter blobs only if such filtering is
+	// already set up, and we shouldn't change the blob filtering setup.
+	FilterBlobActionNoChange = FilterBlobAction("no_change")
+	// FilterBlobActionStartFilteringAll means that we should set up filtering of all blobs
+	// and perform such filtering. If blob filtering is already set up, this is the same as
+	// FilterBlobActionNoChange.
+	FilterBlobActionStartFilteringAll = FilterBlobAction("start_filtering_all_blobs")
+	// FilterBlobActionStopFilteringAny means that we should remove any setup for filtering
+	// all blobs, stop performing such filtering, and repack all the blobs with the other Git
+	// objects. If blob filtering is not already set up, this is the same as
+	// FilterBlobActionNoChange.
+	FilterBlobActionStopFilteringAny = FilterBlobAction("stop_filtering_any_blob")
 )
 
 // RepackObjectsConfig is configuration for RepackObjects.
@@ -49,6 +69,9 @@ type RepackObjectsConfig struct {
 	WriteBitmap bool
 	// WriteMultiPackIndex determines whether a multi-pack index should be written or not.
 	WriteMultiPackIndex bool
+	// FilterBlobs determines whether we want to start or stop filtering blobs out into a
+	// separate packfile, or keep doing whatever we already do regarding blob filtering.
+	FilterBlobs FilterBlobAction
 	// CruftExpireBefore determines the cutoff date before which unreachable cruft objects shall
 	// be expired and thus deleted.
 	CruftExpireBefore time.Time
diff --git a/internal/git/housekeeping/objects.go b/internal/git/housekeeping/objects.go
index 6685a6c38c..dca564af90 100644
--- a/internal/git/housekeeping/objects.go
+++ b/internal/git/housekeeping/objects.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/offloading"
+	"os"
 	"os/exec"
 	"path/filepath"
 	"strconv"
@@ -39,7 +41,7 @@ func ValidateRepacking(cfg config.RepackObjectsConfig) (bool, error) {
 		}
 	case config.RepackObjectsStrategyGeometric:
 		isFullRepack = false
-	case config.RepackObjectsStrategyFullWithCruft, config.RepackObjectsStrategyFullWithUnreachable:
+	case config.RepackObjectsStrategyFullWithCruft, config.RepackObjectsStrategyFullWithUnreachable, config.RepackObjectsStrategyOffloading:
 		isFullRepack = true
 	default:
 		return false, structerr.NewInvalidArgument("invalid strategy: %q", cfg.Strategy)
@@ -132,6 +134,23 @@ func RepackObjects(ctx context.Context, repo *localrepo.Repo, cfg config.RepackO
 		return PerformFullRepackingWithUnreachable(ctx, repo, cfg)
 	case config.RepackObjectsStrategyGeometric:
 		return PerformGeometricRepacking(ctx, repo, cfg)
+	case config.RepackObjectsStrategyOffloading:
+		// TODO [mark] POC of offloading large objects (OLO)
+		offloadingMgr, err := offloading.NewOffloadingManager(ctx, repo)
+		if err != nil {
+			return err
+		}
+		options := offloadingMgr.BlobFilterOptions()
+		if err := offloadingMgr.UploadObjects(); err != nil {
+			return err
+		}
+		if err := PerformRepack(ctx, repo, cfg, options...); err != nil {
+			return err
+		}
+		if err := offloadingMgr.CreatePromisorFile(); err != nil {
+			return err
+		}
 	}
 	return nil
 }
@@ -260,6 +279,12 @@ func PerformRepack(ctx context.Context, repo *localrepo.Repo, cfg config.RepackO
 		opts = append(opts, git.Flag{Name: "--write-midx"})
 	}
 
+	// TODO [mark] POC of offloading large objects (OLO)
+	// Any value other than "false" means dry run: compute the options but skip
+	// executing git-repack(1).
+	if os.Getenv("OFFLOADING_DRY_RUN") != "false" {
+		return nil
+	}
+
 	var stderr strings.Builder
 	if err := repo.ExecAndWait(ctx, git.Command{
diff --git a/internal/git/housekeeping/optimization_strategy.go b/internal/git/housekeeping/optimization_strategy.go
index b7b9cec986..2ed6df86b4 100644
--- a/internal/git/housekeeping/optimization_strategy.go
+++ b/internal/git/housekeeping/optimization_strategy.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"os"
 	"time"
 
 	"gitlab.com/gitlab-org/gitaly/v16/internal/git"
@@ -376,6 +377,17 @@ func (s EagerOptimizationStrategy) ShouldRepackObjects(ctx context.Context) (boo
 	// loose objects. This is inefficient as these objects cannot be deltified and thus take up
 	// more space. Instead, we ask git-repack(1) to append unreachable objects to the newly
 	// created packfile.
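+	//
+	// Note for the offloading POC below: pack bitmaps require a packfile that
+	// is closed under reachability, which no longer holds once large blobs are
+	// filtered out into a separate packfile, so bitmap and multi-pack-index
+	// writing are disabled for that strategy.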
+
+	// TODO [mark] POC of offloading large objects (OLO)
+	if os.Getenv("OFFLOADING_POC_ENABLED") == "true" {
+		cfg.Strategy = config.RepackObjectsStrategyOffloading
+		cfg.FilterBlobs = config.FilterBlobActionStartFilteringAll
+		cfg.WriteBitmap = false
+		cfg.WriteMultiPackIndex = false
+		return true, cfg
+	}
+
 	if !s.info.IsObjectPool {
 		cfg.Strategy = config.RepackObjectsStrategyFullWithCruft
 		cfg.CruftExpireBefore = s.expireBefore
diff --git a/internal/git/offloading/manager.go b/internal/git/offloading/manager.go
new file mode 100644
index 0000000000..94331284a2
--- /dev/null
+++ b/internal/git/offloading/manager.go
@@ -0,0 +1,218 @@
+package offloading
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+
+	"cloud.google.com/go/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitpipe"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/labkit/log"
+)
+
+// Manager manages the offloading of Git data to external storage.
+type Manager struct {
+	filter      string
+	filterLimit int
+	filterTo    string
+
+	gcpClient *storage.Client
+	bucket    string
+
+	catfileCache catfile.Cache
+	ctx          context.Context
+	repo         *localrepo.Repo
+}
+
+// NewOffloadingManager creates a new offloading Manager instance.
+func NewOffloadingManager(ctx context.Context, repo *localrepo.Repo) (*Manager, error) {
+	filterLimitStr := os.Getenv("OFFLOADING_POC_FILTER_LIMIT")
+	if len(filterLimitStr) == 0 {
+		return nil, fmt.Errorf("OFFLOADING_POC_FILTER_LIMIT must be set to a filter limit")
+	}
+	filterLimit, err := strconv.Atoi(filterLimitStr)
+	if err != nil {
+		return nil, fmt.Errorf("OFFLOADING_POC_FILTER_LIMIT is not a number: %s", filterLimitStr)
+	}
+	filter := fmt.Sprintf("blob:limit=%d", filterLimit)
+
+	filterTo := os.Getenv("OFFLOADING_POC_FILTER_TO_PATH")
+	if len(filterTo) == 0 {
+		filterTo = "/tmp/pack"
+	}
+	bucket := os.Getenv("OFFLOADING_POC_BUCKET")
+	if len(bucket) == 0 {
+		bucket = "blob_offloads"
+	}
+
+	// New Google Cloud Storage client.
+	// Use Google Application Default Credentials to authorize and authenticate the client.
+	// More information about Application Default Credentials and how to enable them is at
+	// https://developers.google.com/identity/protocols/application-default-credentials.
+	gsClient, err := storage.NewClient(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("google storage client error: %w", err)
+	}
+
+	cfg := config.Cfg{}
+	cfg.Git.CatfileCacheSize = 100
+	catfileCache := catfile.NewCache(cfg)
+
+	return &Manager{
+		filter:       filter,
+		filterLimit:  filterLimit,
+		filterTo:     filterTo,
+		gcpClient:    gsClient,
+		bucket:       bucket,
+		catfileCache: catfileCache,
+		ctx:          ctx,
+		repo:         repo,
+	}, nil
+}
+
+// FindObjects returns the object IDs of the blobs that exceed the filter limit and are
+// thus candidates for offloading.
+func (m *Manager) FindObjects() []string {
+	// TODO use channels and goroutines to optimize?
+	res := make([]string, 0)
+
+	revlistOptions := []gitpipe.RevlistOption{
+		gitpipe.WithObjects(),
+		gitpipe.WithBlobLimit(m.filterLimit),
+		gitpipe.WithFilterPrintOmitted(),
+	}
+
+	revlistIter := gitpipe.Revlist(m.ctx, m.repo, []string{"--all"}, revlistOptions...)
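+
+	// With --filter-print-omitted, git-rev-list(1) additionally prints the objects
+	// omitted by the blob:limit filter, each prefixed with "~". Those are exactly
+	// the over-limit blobs that should be offloaded, so only object IDs carrying
+	// that prefix are collected below.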
+	for revlistIter.Next() {
+		oid := revlistIter.Result().OID.String()
+		if strings.HasPrefix(oid, "~") {
+			log.Info(oid)
+			res = append(res, strings.TrimPrefix(oid, "~"))
+		}
+	}
+	// TODO avoid double uploading
+
+	return res
+}
+
+// CatBlobs returns an iterator over the contents of the blobs with the given object IDs.
+func (m *Manager) CatBlobs(oids []string) (it gitpipe.CatfileObjectIterator, err error) {
+	catfileInfo := make([]gitpipe.CatfileInfoResult, 0)
+	for _, oid := range oids {
+		catfileInfo = append(catfileInfo, gitpipe.CatfileInfoResult{
+			ObjectInfo: &catfile.ObjectInfo{Oid: git.ObjectID(oid), Type: "blob"},
+		})
+	}
+
+	objectReader, _, err := m.catfileCache.ObjectReader(m.ctx, m.repo)
+	if err != nil {
+		return nil, err
+	}
+
+	it, err = gitpipe.CatfileObject(m.ctx, objectReader, gitpipe.NewCatfileInfoIterator(m.ctx, catfileInfo))
+	if err != nil {
+		return nil, err
+	}
+
+	return it, nil
+}
+
+// UploadObjects uploads the offloaded blobs to cloud storage.
+func (m *Manager) UploadObjects() error {
+	defer m.catfileCache.Evict()
+
+	bucketClient := m.gcpClient.Bucket(m.bucket)
+
+	uploadList := m.FindObjects()
+	objIt, err := m.CatBlobs(uploadList)
+	if err != nil {
+		return err
+	}
+
+	for objIt.Next() {
+		result := objIt.Result()
+		objID := result.ObjectID()
+
+		bucketPath := fmt.Sprintf("%s/%s", m.repo.GetRelativePath(), objID)
+		wc := bucketClient.Object(bucketPath).NewWriter(m.ctx)
+
+		if _, err := io.Copy(wc, result); err != nil {
+			return fmt.Errorf("io.Copy: %w", err)
+		}
+		log.Info("Uploading object ", objID)
+		if err := wc.Close(); err != nil {
+			return fmt.Errorf("Writer.Close: %w", err)
+		}
+	}
+
+	// TODO close the Google Cloud Storage client once the Manager is done with it.
+	return objIt.Err()
+}
+
+// BlobFilterOptions returns the git-repack(1) flags for repacking without large blobs.
+func (m *Manager) BlobFilterOptions() []git.Option {
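+	// --filter leaves the matching blobs out of the main packfile, while
+	// --filter-to makes git-repack(1) write the filtered-out objects into a
+	// separate packfile at the given path instead of deleting them. This relies
+	// on a Git version that supports repack --filter/--filter-to (Git 2.43+).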
+	opts := []git.Option{
+		git.Flag{Name: "-a"},
+		git.Flag{Name: "-d"},
+		git.ValueFlag{Name: "--filter", Value: m.filter},
+		git.ValueFlag{Name: "--filter-to", Value: m.filterTo},
+	}
+
+	return opts
+}
+
+// CreatePromisorFile creates a ".promisor" file for each packfile. It is the next step
+// after repacking without large blobs.
+// TODO Ideally this would be done by git-repack(1) itself, say via a flag that makes it
+// create the promisor file. For now we do it in Gitaly.
+func (m *Manager) CreatePromisorFile() error {
+	// Find the names for the promisor files.
+	repoPath, err := m.repo.Path()
+	if err != nil {
+		return err
+	}
+	packFolder := filepath.Join(repoPath, "objects", "pack")
+
+	packFiles := make([]string, 0)
+	if err := filepath.Walk(packFolder, func(path string, info fs.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if !info.IsDir() && filepath.Ext(info.Name()) == ".pack" {
+			packFiles = append(packFiles, info.Name())
+		}
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	// Add the promisor file for each packfile.
+	for _, packFile := range packFiles {
+		promisorFile := filepath.Join(packFolder, strings.TrimSuffix(packFile, ".pack")+".promisor")
+		f, err := os.Create(promisorFile)
+		if err != nil {
+			return err
+		}
+		if err := f.Close(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/internal/gitaly/storage/repository_path.go b/internal/gitaly/storage/repository_path.go
index 4448f45482..5a11a1dfd1 100644
--- a/internal/gitaly/storage/repository_path.go
+++ b/internal/gitaly/storage/repository_path.go
@@ -3,6 +3,7 @@ package storage
 import (
 	"crypto/sha256"
 	"fmt"
+	log "github.com/sirupsen/logrus"
 	"path/filepath"
 	"regexp"
 	"strconv"
@@ -14,6 +15,8 @@ var (
 	PraefectRootPathPrefix = "@cluster"
 	// praefectPoolPathPrefix is the prefix directory where Praefect places object pools.
 	praefectPoolPathPrefix = filepath.Join(PraefectRootPathPrefix, "pools")
+	// praefectBlobOffloadPathPrefix is the prefix directory where Praefect places offloaded blob packfiles.
+	praefectBlobOffloadPathPrefix = filepath.Join(PraefectRootPathPrefix, "blob_offloads")
 	// praefectRepositoryPathPrefix is the prefix directory where Praefect places repositories.
 	praefectRepositoryPathPrefix = filepath.Join(PraefectRootPathPrefix, "repositories")
 	// prafectPoolDirRegexp is used to validate object pool directory structure and name as generated by Praefect.
@@ -42,6 +45,23 @@ func IsPoolRepository(repo Repository) bool {
 	return IsRailsPoolRepository(repo) || IsPraefectPoolRepository(repo)
 }
 
+// BlobOffloadPath returns the path where blobs are offloaded, or an empty string if no
+// blobs are offloaded. If some blobs are offloaded, praefectBlobOffloadPathPrefix should
+// be a prefix of one of the GitAlternateObjectDirectories of the repo.
+func BlobOffloadPath(repo Repository) string {
+	alternates := repo.GetGitAlternateObjectDirectories()
+	log.Infof("repository has alternate object directories: %v", alternates)
+	for _, s := range alternates {
+		if strings.HasPrefix(s, praefectBlobOffloadPathPrefix) {
+			return s
+		}
+	}
+	return ""
+}
+
+// BlobOffloadPathOnDevice returns the on-disk path, rooted under /tmp for this POC,
+// where the offloaded blob packfiles of the given repository ID are stored.
+func BlobOffloadPathOnDevice(repositoryID int64) string {
+	return deriveDiskPath(fmt.Sprintf("/tmp/%s", praefectBlobOffloadPathPrefix), repositoryID)
+}
+
 // DeriveReplicaPath derives a repository's disk storage path from its repository ID. The repository ID
 // is hashed with SHA256 and the first four hex digits of the hash are used as the two subdirectories to
 // ensure even distribution into subdirectories. The format is @cluster/repositories/ab/cd/<repository ID>.
@@ -58,6 +78,16 @@ func DerivePoolPath(repositoryID int64) string {
 	return deriveDiskPath(praefectPoolPathPrefix, repositoryID)
 }
 
+// DeriveBlobOffloadPath derives a blob offload's disk storage path from its repository ID.
+// The repository ID is hashed with SHA256 and the first four hex digits of the hash are
+// used as the two subdirectories to ensure even distribution into subdirectories. The
+// format is @cluster/blob_offloads/ab/cd/<repository ID>.
+// The blob offload directories are not full repositories; they contain only packfiles.
+// They are separated from the other repositories so they can more easily be stored on
+// separate, cheaper storage such as HDDs. They are accessed using the Git alternates
+// mechanism.
+func DeriveBlobOffloadPath(repositoryID int64) string { + return deriveDiskPath(praefectBlobOffloadPathPrefix, repositoryID) +} + func deriveDiskPath(prefixDir string, repositoryID int64) string { hasher := sha256.New() // String representation of the ID is used to make it easier to derive the replica paths with diff --git a/internal/gitaly/storage/repository_path_test.go b/internal/gitaly/storage/repository_path_test.go index aa9b4de808..a3d30ff189 100644 --- a/internal/gitaly/storage/repository_path_test.go +++ b/internal/gitaly/storage/repository_path_test.go @@ -19,6 +19,11 @@ func TestDerivePoolPath(t *testing.T) { require.Equal(t, "@cluster/pools/d4/73/2", storage.DerivePoolPath(2)) } +func TestDeriveBlobOffloadPath(t *testing.T) { + require.Equal(t, "@cluster/blob_offloads/6b/86/1", storage.DeriveBlobOffloadPath(1)) + require.Equal(t, "@cluster/blob_offloads/d4/73/2", storage.DeriveBlobOffloadPath(2)) +} + func TestIsPoolRepository(t *testing.T) { t.Parallel() @@ -113,3 +118,58 @@ func TestIsPoolRepository(t *testing.T) { }) } } + +func TestBlobOffloadPath(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + desc string + repo *gitalypb.Repository + blobOffloadPath string + } { + { + desc: "missing repository", + blobOffloadPath: "", + }, + { + desc: "empty string list", + repo: &gitalypb.Repository{ + GitAlternateObjectDirectories: []string{ }, + }, + blobOffloadPath: "", + }, + { + desc: "list with empty string", + repo: &gitalypb.Repository{ + GitAlternateObjectDirectories: []string{ "" }, + }, + blobOffloadPath: "", + }, + { + desc: "blob offload path", + repo: &gitalypb.Repository{ + GitAlternateObjectDirectories: []string{ storage.DeriveBlobOffloadPath(1) }, + }, + blobOffloadPath: storage.DeriveBlobOffloadPath(1), + }, + { + desc: "invalid blob offload path", + repo: &gitalypb.Repository{ + GitAlternateObjectDirectories: []string{ storage.DerivePoolPath(1) }, + }, + blobOffloadPath: "", + }, + { + desc: "mixed valid and invalid blob offload path", + repo: &gitalypb.Repository{ + GitAlternateObjectDirectories: []string{ storage.DerivePoolPath(1), "", + storage.DeriveBlobOffloadPath(2), "" }, + }, + blobOffloadPath: storage.DeriveBlobOffloadPath(2), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.blobOffloadPath, storage.BlobOffloadPath(tc.repo)) + }) + } +} diff --git a/this_is_a_test_for_local_remote b/this_is_a_test_for_local_remote new file mode 100644 index 0000000000..e69de29bb2 -- GitLab