diff --git a/Makefile b/Makefile
index 7b270f74c4a97d7710b5b3d4db8936a761a41958..0d678b0adfeadfb25a16656b8587c2212d8b8220 100644
--- a/Makefile
+++ b/Makefile
@@ -67,6 +67,7 @@ GOLANGCI_LINT_CONFIG ?= ${SOURCE_DIR}/.golangci.yml
 GITALY_PACKAGE := $(shell go list -m 2>/dev/null || echo unknown)
 GITALY_VERSION := $(shell ${GIT} describe --match v* 2>/dev/null | sed 's/^v//' || cat ${SOURCE_DIR}/VERSION 2>/dev/null || echo unknown)
 GO_LDFLAGS := -X ${GITALY_PACKAGE}/internal/version.version=${GITALY_VERSION}
+GO_GCFLAGS ?=
 SERVER_BUILD_TAGS := tracer_static,tracer_static_jaeger,tracer_static_stackdriver,continuous_profiler_stackdriver
 
 # Temporary GNU build ID used as a placeholder value so that we can replace it
@@ -599,7 +600,12 @@ ${BUILD_DIR}/intermediate/%: clear-go-build-cache-if-needed .FOR
	@ # of "TEMP_GITALY_BUILD_ID". In the final binary we replace this build ID with
	@ # the computed build ID for this binary.
	@ # We cd to SOURCE_DIR to avoid corner cases where workdir may be a symlink
+ifdef GO_GCFLAGS
+	${Q}echo "Building $@ with -gcflags '${GO_GCFLAGS}'"
+	${Q}cd ${SOURCE_DIR} && go build -o "$@" -gcflags '${GO_GCFLAGS}' -ldflags '-B 0x${TEMPORARY_BUILD_ID} ${GO_LDFLAGS}' -tags "${GO_BUILD_TAGS}" $(addprefix ${SOURCE_DIR}/cmd/,$(@F))
+else
	${Q}cd ${SOURCE_DIR} && go build -o "$@" -ldflags '-B 0x${TEMPORARY_BUILD_ID} ${GO_LDFLAGS}' -tags "${GO_BUILD_TAGS}" $(addprefix ${SOURCE_DIR}/cmd/,$(@F))
+endif
 
 # This is a build hack to avoid excessive rebuilding of targets. Instead of
 # depending on the Makefile, we start to depend on tool versions as defined in
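With `GO_GCFLAGS` empty by default, the `else` branch above preserves the previous build behavior. As a usage sketch (the flag values below are standard Go compiler flags, not something this Makefile defines), a debug build suitable for a debugger such as delve could be requested like this:

```sh
# Build with optimizations and inlining disabled in all packages so that
# stepping through the binary maps cleanly back to the source.
make GO_GCFLAGS='all=-N -l'
```

Since the variable is declared with `?=`, it can equally be supplied via the environment instead of the make command line.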
diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go
index c94361a4df6a620007e8faf9a66255b2a3c22247..10f11c5efd0117ba5f6064c38ae667c76c956e48 100644
--- a/internal/cli/gitaly/serve.go
+++ b/internal/cli/gitaly/serve.go
@@ -118,7 +118,7 @@ func serveAction(ctx *cli.Context) error {
 		logger.Warn("Authentication is enabled but not enforced because transitioning=true. Gitaly will accept unauthenticated requests.")
 	}
 
-	logger.WithField("version", version.GetVersion()).Info("Starting Gitaly")
+	logger.WithField("version", version.GetVersion()).Info("Starting Gitaly (large object offloading POC)")
 	fips.Check()
 
 	if err := run(ctx, cfg, logger); err != nil {
diff --git a/internal/git/gitpipe/revision.go b/internal/git/gitpipe/revision.go
index 64bb328853dc201321e5dc5cccfbb4122bfedb81..3aec8b7fd9db7fd394f80a6b3e4ee40ecc544803 100644
--- a/internal/git/gitpipe/revision.go
+++ b/internal/git/gitpipe/revision.go
@@ -45,6 +45,7 @@ const (
 type revlistConfig struct {
 	blobLimit    int
 	objects      bool
+	noObjectName bool
 	objectType   ObjectType
 	order        Order
 	reverse      bool
@@ -56,6 +57,7 @@ type revlistConfig struct {
 	regexIgnoreCase       bool
 	commitMessagePatterns [][]byte
 	skipResult            func(*RevisionResult) bool
+	filterPrintOmitted    bool
 }
 
 // RevlistOption is an option for the revlist pipeline step.
@@ -69,6 +71,22 @@ func WithObjects() RevlistOption {
 	}
 }
 
+// WithNoObjectNames causes git-rev-list(1) to be invoked with the --no-object-names flag such
+// that only object IDs are printed, without the paths the objects were found at.
+func WithNoObjectNames() RevlistOption {
+	return func(cfg *revlistConfig) {
+		cfg.noObjectName = true
+	}
+}
+
+// WithFilterPrintOmitted causes git-rev-list(1) to also print the objects omitted by the
+// configured object filter. Omitted objects are prefixed with a tilde ("~") character.
+func WithFilterPrintOmitted() RevlistOption {
+	return func(cfg *revlistConfig) {
+		cfg.filterPrintOmitted = true
+	}
+}
+
 // WithBlobLimit sets up a size limit for blobs. Only blobs whose size is smaller than this limit
 // will be returned by the pipeline step.
 func WithBlobLimit(limit int) RevlistOption {
@@ -216,10 +234,17 @@ func Revlist(
 		)
 	}
 
+	if cfg.noObjectName {
+		flags = append(flags, git.Flag{Name: "--no-object-names"})
+	}
+
 	if cfg.blobLimit > 0 {
 		flags = append(flags, git.Flag{
 			Name: fmt.Sprintf("--filter=blob:limit=%d", cfg.blobLimit),
 		})
+		if cfg.filterPrintOmitted {
+			flags = append(flags, git.Flag{Name: "--filter-print-omitted"})
+		}
 	}
 
 	if cfg.objectType != "" {
diff --git a/internal/git/housekeeping/config/config.go b/internal/git/housekeeping/config/config.go
index e12f60d69c2073446462691f242eab83972e943c..0304d60d79a73deaae1f135592d82224f42b5344 100644
--- a/internal/git/housekeeping/config/config.go
+++ b/internal/git/housekeeping/config/config.go
@@ -37,6 +37,28 @@ const (
 	// of objects. Loose objects will get soaked up as part of the repack regardless of their
 	// reachability.
 	RepackObjectsStrategyGeometric = RepackObjectsStrategy("geometric")
+
+	// RepackObjectsStrategyOffloading performs a full repack which filters large blobs out
+	// into a separate packfile so that they can be offloaded to external storage.
+	// TODO [mark] POC of offloading large objects (OLO)
+	RepackObjectsStrategyOffloading = RepackObjectsStrategy("offloading")
+)
+
+// FilterBlobAction defines what we want to do regarding blob offloading.
+type FilterBlobAction string
+
+const (
+	// FilterBlobActionNoChange means that we should filter blobs only if such filtering is
+	// already set up, and that we shouldn't change the blob filtering setup.
+	FilterBlobActionNoChange = FilterBlobAction("no_change")
+	// FilterBlobActionStartFilteringAll means that we should set up filtering of all blobs and
+	// perform such filtering. If blob filtering is already set up, this is equivalent to
+	// FilterBlobActionNoChange.
+	FilterBlobActionStartFilteringAll = FilterBlobAction("start_filtering_all_blobs")
+	// FilterBlobActionStopFilteringAny means that we should remove any setup for filtering all
+	// blobs, stop performing such filtering, and repack all blobs together with the other Git
+	// objects. If blob filtering is not set up, this is equivalent to FilterBlobActionNoChange.
+	FilterBlobActionStopFilteringAny = FilterBlobAction("stop_filtering_any_blob")
 )
 
 // RepackObjectsConfig is configuration for RepackObjects.
@@ -49,6 +71,9 @@ type RepackObjectsConfig struct {
 	WriteBitmap bool
 	// WriteMultiPackIndex determines whether a multi-pack index should be written or not.
 	WriteMultiPackIndex bool
+	// FilterBlobs determines whether we want to start or stop filtering blobs out into a
+	// separate packfile, or keep whatever blob filtering setup is already in place.
+	FilterBlobs FilterBlobAction
 	// CruftExpireBefore determines the cutoff date before which unreachable cruft objects shall
 	// be expired and thus deleted.
 	CruftExpireBefore time.Time
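To make the interplay of the new knobs concrete, this is roughly the configuration that the eager optimization strategy assembles further down in this diff when the POC is enabled (a sketch; only `Strategy` and `FilterBlobs` are new fields, and the zero values for the bitmap and multi-pack index flags are spelled out for clarity):

```go
cfg := config.RepackObjectsConfig{
	// Offloading counts as a full repack, see ValidateRepacking below.
	Strategy: config.RepackObjectsStrategyOffloading,
	// Start filtering all blobs above the size limit into a separate packfile.
	FilterBlobs: config.FilterBlobActionStartFilteringAll,
	// Bitmaps and the multi-pack index are explicitly disabled for the POC.
	WriteBitmap:         false,
	WriteMultiPackIndex: false,
}
```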
diff --git a/internal/git/housekeeping/objects.go b/internal/git/housekeeping/objects.go
index 6685a6c38cce9175898e16c2bd4a137847e3b5cd..dca564af904ffc80896ff549859b0a555d8d4937 100644
--- a/internal/git/housekeeping/objects.go
+++ b/internal/git/housekeeping/objects.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/offloading"
+	"os"
 	"os/exec"
 	"path/filepath"
 	"strconv"
@@ -39,7 +41,7 @@ func ValidateRepacking(cfg config.RepackObjectsConfig) (bool, error) {
 		}
 	case config.RepackObjectsStrategyGeometric:
 		isFullRepack = false
-	case config.RepackObjectsStrategyFullWithCruft, config.RepackObjectsStrategyFullWithUnreachable:
+	case config.RepackObjectsStrategyFullWithCruft, config.RepackObjectsStrategyFullWithUnreachable, config.RepackObjectsStrategyOffloading:
 		isFullRepack = true
 	default:
 		return false, structerr.NewInvalidArgument("invalid strategy: %q", cfg.Strategy)
@@ -132,6 +134,22 @@ func RepackObjects(ctx context.Context, repo *localrepo.Repo, cfg config.RepackO
 		return PerformFullRepackingWithUnreachable(ctx, repo, cfg)
 	case config.RepackObjectsStrategyGeometric:
 		return PerformGeometricRepacking(ctx, repo, cfg)
+	case config.RepackObjectsStrategyOffloading:
+		// TODO [mark] POC of offloading large objects (OLO)
+		offloadingMgr, err := offloading.NewOffloadingManager(ctx, repo)
+		if err != nil {
+			return err
+		}
+		options := offloadingMgr.BlobFilterOptions()
+		if err := offloadingMgr.UploadObjects(); err != nil {
+			return err
+		}
+		if err := PerformRepack(ctx, repo, cfg, options...); err != nil {
+			return err
+		}
+		if err := offloadingMgr.CreatePromisorFile(); err != nil {
+			return err
+		}
 	}
 	return nil
 }
@@ -260,6 +278,12 @@ func PerformRepack(ctx context.Context, repo *localrepo.Repo, cfg config.RepackO
 		opts = append(opts, git.Flag{Name: "--write-midx"})
 	}
 
+	// TODO [mark] POC of offloading large objects (OLO)
+	// The POC defaults to a dry run; git-repack(1) only executes when OFFLOADING_DRY_RUN=false.
+	if os.Getenv("OFFLOADING_DRY_RUN") != "false" {
+		return nil
+	}
+
 	var stderr strings.Builder
 	if err := repo.ExecAndWait(ctx, git.Command{
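For orientation, the flags produced by `BlobFilterOptions` (defined in manager.go below) make `PerformRepack` spawn an invocation along these lines — a sketch assuming a 1 MiB filter limit and the default `/tmp/pack` target, and a Git version whose git-repack(1) supports `--filter`/`--filter-to`:

```sh
git repack -a -d --filter=blob:limit=1048576 --filter-to=/tmp/pack
```

Objects matching the filter are written into a separate pack under the `--filter-to` location instead of the repository's own packfile.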
diff --git a/internal/git/housekeeping/optimization_strategy.go b/internal/git/housekeeping/optimization_strategy.go
index b7b9cec98650aad047860ee2a337f3e588eb08dc..2ed6df86b4bfc80519e06bc45dddef9af98476ec 100644
--- a/internal/git/housekeeping/optimization_strategy.go
+++ b/internal/git/housekeeping/optimization_strategy.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"os"
 	"time"
 
 	"gitlab.com/gitlab-org/gitaly/v16/internal/git"
@@ -376,6 +377,18 @@ func (s EagerOptimizationStrategy) ShouldRepackObjects(ctx context.Context) (boo
 	// loose objects. This is inefficient as these objects cannot be deltified and thus take up
 	// more space. Instead, we ask git-repack(1) to append unreachable objects to the newly
 	// created packfile.
+
+	// TODO [mark] POC of offloading large objects (OLO)
+	// When the offloading POC is enabled we force a full repack with the offloading
+	// strategy and disable bitmaps as well as the multi-pack index.
+	if os.Getenv("OFFLOADING_POC_ENABLED") == "true" {
+		cfg.Strategy = config.RepackObjectsStrategyOffloading
+		cfg.FilterBlobs = config.FilterBlobActionStartFilteringAll
+		cfg.WriteBitmap = false
+		cfg.WriteMultiPackIndex = false
+		return true, cfg
+	}
+
 	if !s.info.IsObjectPool {
 		cfg.Strategy = config.RepackObjectsStrategyFullWithCruft
 		cfg.CruftExpireBefore = s.expireBefore
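The POC is wired up entirely through environment variables rather than Gitaly configuration. Collected in one place, an illustrative way to switch it on (all variable names are taken from this diff; the values are examples):

```sh
export OFFLOADING_POC_ENABLED=true         # force the offloading repack strategy
export OFFLOADING_POC_FILTER_LIMIT=1048576 # offload blobs of at least this many bytes
export OFFLOADING_POC_FILTER_TO_PATH=/tmp/pack  # where the filtered-out pack is written
export OFFLOADING_POC_BUCKET=blob_offloads # GCS bucket that receives the blobs
export OFFLOADING_DRY_RUN=false            # actually execute git-repack(1)
```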
diff --git a/internal/git/offloading/manager.go b/internal/git/offloading/manager.go
new file mode 100644
index 0000000000000000000000000000000000000000..94331284a2b3d8d9e631e08c31897d20f571acbd
--- /dev/null
+++ b/internal/git/offloading/manager.go
@@ -0,0 +1,212 @@
+package offloading
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+
+	"cloud.google.com/go/storage"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/gitpipe"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo"
+	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
+	"gitlab.com/gitlab-org/labkit/log"
+)
+
+// Manager manages the offloading of Git data to external storage.
+type Manager struct {
+	filter      string
+	filterLimit int
+	filterTo    string
+
+	gcpClient *storage.Client
+	bucket    string
+
+	catfileCache catfile.Cache
+	ctx          context.Context
+	repo         *localrepo.Repo
+}
+
+// NewOffloadingManager creates a new offloading Manager instance.
+func NewOffloadingManager(ctx context.Context, repo *localrepo.Repo) (*Manager, error) {
+	filterLimitStr := os.Getenv("OFFLOADING_POC_FILTER_LIMIT")
+	if len(filterLimitStr) == 0 {
+		return nil, fmt.Errorf("OFFLOADING_POC_FILTER_LIMIT must be set to a blob size limit")
+	}
+	filterLimit, err := strconv.Atoi(filterLimitStr)
+	if err != nil {
+		return nil, fmt.Errorf("OFFLOADING_POC_FILTER_LIMIT is not a number: %s", filterLimitStr)
+	}
+	filter := fmt.Sprintf("blob:limit=%d", filterLimit)
+
+	filterTo := os.Getenv("OFFLOADING_POC_FILTER_TO_PATH")
+	if len(filterTo) == 0 {
+		filterTo = "/tmp/pack"
+	}
+	bucket := os.Getenv("OFFLOADING_POC_BUCKET")
+	if len(bucket) == 0 {
+		bucket = "blob_offloads"
+	}
+
+	// Create a new Google Cloud Storage client. It uses Google Application Default
+	// Credentials to authorize and authenticate. More information about Application
+	// Default Credentials and how to enable them is at
+	// https://developers.google.com/identity/protocols/application-default-credentials.
+	gsClient, err := storage.NewClient(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("google storage client error: %w", err)
+	}
+
+	cfg := config.Cfg{}
+	cfg.Git.CatfileCacheSize = 100
+	catfileCache := catfile.NewCache(cfg)
+
+	return &Manager{
+		filter:       filter,
+		filterLimit:  filterLimit,
+		filterTo:     filterTo,
+		gcpClient:    gsClient,
+		bucket:       bucket,
+		catfileCache: catfileCache,
+		ctx:          ctx,
+		repo:         repo,
+	}, nil
+}
+
+// FindObjects returns the IDs of all blobs that are omitted by the configured size filter,
+// that is all blobs which shall be offloaded.
+func (m *Manager) FindObjects() []string {
+	// TODO use a channel and a goroutine to stream results instead of collecting them?
+	res := make([]string, 0)
+
+	revlistOptions := []gitpipe.RevlistOption{
+		gitpipe.WithObjects(),
+		gitpipe.WithBlobLimit(m.filterLimit),
+		gitpipe.WithFilterPrintOmitted(),
+	}
+
+	revlistIter := gitpipe.Revlist(m.ctx, m.repo, []string{"--all"}, revlistOptions...)
+	// With --filter-print-omitted, git-rev-list(1) prefixes omitted objects with "~". These
+	// are exactly the blobs that fall under the filter and shall be uploaded.
+	for revlistIter.Next() {
+		oid := revlistIter.Result().OID.String()
+		if strings.HasPrefix(oid, "~") {
+			log.Info(oid)
+			res = append(res, strings.TrimPrefix(oid, "~"))
+		}
+	}
+	// TODO avoid double uploading
+
+	return res
+}
+
+// CatBlobs returns an iterator over the contents of the given blobs.
+func (m *Manager) CatBlobs(oids []string) (it gitpipe.CatfileObjectIterator, err error) {
+	catfileInfo := make([]gitpipe.CatfileInfoResult, 0, len(oids))
+	for _, oid := range oids {
+		catfileInfo = append(catfileInfo, gitpipe.CatfileInfoResult{
+			ObjectInfo: &catfile.ObjectInfo{Oid: git.ObjectID(oid), Type: "blob"},
+		})
+	}
+
+	objectReader, _, err := m.catfileCache.ObjectReader(m.ctx, m.repo)
+	if err != nil {
+		return nil, err
+	}
+
+	it, err = gitpipe.CatfileObject(m.ctx, objectReader, gitpipe.NewCatfileInfoIterator(m.ctx, catfileInfo))
+	if err != nil {
+		return nil, err
+	}
+
+	return it, nil
+}
+
+// UploadObjects uploads the to-be-offloaded blobs to cloud storage.
+func (m *Manager) UploadObjects() error {
+	defer m.catfileCache.Evict()
+
+	// TODO: m.gcpClient is never closed; close it once the manager is no longer needed.
+	bucketClient := m.gcpClient.Bucket(m.bucket)
+
+	uploadList := m.FindObjects()
+	objIt, err := m.CatBlobs(uploadList)
+	if err != nil {
+		return err
+	}
+
+	for objIt.Next() {
+		result := objIt.Result()
+		objectID := result.ObjectID()
+
+		bucketPath := fmt.Sprintf("%s/%s", m.repo.GetRelativePath(), objectID)
+		wc := bucketClient.Object(bucketPath).NewWriter(m.ctx)
+
+		if _, err := io.Copy(wc, result); err != nil {
+			return fmt.Errorf("io.Copy: %w", err)
+		}
+		log.Info("Uploading object ", objectID)
+		if err := wc.Close(); err != nil {
+			return fmt.Errorf("writer.Close: %w", err)
+		}
+	}
+
+	return nil
+}
+
+// BlobFilterOptions returns the git-repack(1) flags for repacking without large blobs.
+func (m *Manager) BlobFilterOptions() []git.Option {
+	opts := []git.Option{
+		git.Flag{Name: "-a"},
+		git.Flag{Name: "-d"},
+		git.ValueFlag{Name: "--filter", Value: m.filter},
+		git.ValueFlag{Name: "--filter-to", Value: m.filterTo},
+	}
+
+	return opts
+}
+
+// CreatePromisorFile marks the packfiles as promisor packs after repacking without large
+// blobs, by creating an empty ".promisor" file next to each ".pack" file.
+// TODO Ideally this would be done by git-repack(1) itself, say via a flag that makes it
+// create the promisor file. For now we do it in Gitaly.
+func (m *Manager) CreatePromisorFile() error {
+	repoPath, err := m.repo.Path()
+	if err != nil {
+		return err
+	}
+
+	packFolder := filepath.Join(repoPath, "objects", "pack")
+	packFiles := make([]string, 0)
+	if err := filepath.Walk(packFolder, func(path string, info fs.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if !info.IsDir() && filepath.Ext(info.Name()) == ".pack" {
+			packFiles = append(packFiles, info.Name())
+		}
+		return nil
+	}); err != nil {
+		return err
+	}
+
+	// Add a promisor file for each packfile.
+	for _, f := range packFiles {
+		promisorFile := filepath.Join(packFolder, strings.TrimSuffix(f, ".pack")+".promisor")
+		file, err := os.Create(promisorFile)
+		if err != nil {
+			return err
+		}
+		if err := file.Close(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
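After `UploadObjects`, `PerformRepack` and `CreatePromisorFile` have run, the repository's pack directory should contain the repacked pack plus an empty `.promisor` marker per packfile; such a marker tells Git that the pack stems from a promisor remote, so objects it references but which are missing locally may be fetched lazily. A sketch of the resulting layout (the pack name is illustrative):

```
objects/pack/pack-4f1d….pack      # repacked objects, large blobs filtered out
objects/pack/pack-4f1d….idx
objects/pack/pack-4f1d….promisor  # empty marker created by CreatePromisorFile
```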
"@cluster" // praefectPoolPathPrefix is the prefix directory where Praefect places object pools. praefectPoolPathPrefix = filepath.Join(PraefectRootPathPrefix, "pools") + // praefectBlobOffloadPathPrefix is the prefix directory where Praefect places offloaded blob packfiles. + praefectBlobOffloadPathPrefix = filepath.Join(PraefectRootPathPrefix, "blob_offloads") // praefectRepositoryPathPrefix is the prefix directory where Praefect places repositories. praefectRepositoryPathPrefix = filepath.Join(PraefectRootPathPrefix, "repositories") // prafectPoolDirRegexp is used to validate object pool directory structure and name as generated by Praefect. @@ -42,6 +45,23 @@ func IsPoolRepository(repo Repository) bool { return IsRailsPoolRepository(repo) || IsPraefectPoolRepository(repo) } +// BlobOffloadPath returns the path where blobs are offloaded if some blobs are offloaded. It returns an +// empty string if no blobs are offloaded. praefectBlobOffloadPathPrefix should be in one of the +// GitAlternateObjectDirectories of the repo if some blobs are offloaded. +func BlobOffloadPath(repo Repository) string { + x := repo.GetGitAlternateObjectDirectories() + log.Info("I have git alternate dir: %v", x) + for _, s := range repo.GetGitAlternateObjectDirectories() { + if strings.HasPrefix(s, praefectBlobOffloadPathPrefix) { + return s + } + } + return "" +} +func BlobOffloadPathOnDevice(repositoryID int64) string { + return deriveDiskPath(fmt.Sprintf("/tmp/%s", praefectBlobOffloadPathPrefix), repositoryID) +} + // DeriveReplicaPath derives a repository's disk storage path from its repository ID. The repository ID // is hashed with SHA256 and the first four hex digits of the hash are used as the two subdirectories to // ensure even distribution into subdirectories. The format is @cluster/repositories/ab/cd/. @@ -58,6 +78,16 @@ func DerivePoolPath(repositoryID int64) string { return deriveDiskPath(praefectPoolPathPrefix, repositoryID) } +// DeriveBlobOffloadPath derives a blob offload's disk storage path from its repository ID. The repository ID +// is hashed with SHA256 and the first four hex digits of the hash are used as the two subdirectories to +// ensure even distribution into subdirectories. The format is @cluster/blob_offloads/ab/cd/. +// The blob offload packfiles are not full repository. They only contain packfiles. They are separated from the +// other repos so they can more easily be stored on separated cheap storage, like HDDs. They are accessed using +// the Git alternates mechanism. 
diff --git a/internal/gitaly/storage/repository_path_test.go b/internal/gitaly/storage/repository_path_test.go
index aa9b4de8083ef8d87230b1dbd71882cd83e7e78c..a3d30ff189868c8c1fc1ea4137d06a365f0a3e8a 100644
--- a/internal/gitaly/storage/repository_path_test.go
+++ b/internal/gitaly/storage/repository_path_test.go
@@ -19,6 +19,11 @@ func TestDerivePoolPath(t *testing.T) {
 	require.Equal(t, "@cluster/pools/d4/73/2", storage.DerivePoolPath(2))
 }
 
+func TestDeriveBlobOffloadPath(t *testing.T) {
+	require.Equal(t, "@cluster/blob_offloads/6b/86/1", storage.DeriveBlobOffloadPath(1))
+	require.Equal(t, "@cluster/blob_offloads/d4/73/2", storage.DeriveBlobOffloadPath(2))
+}
+
 func TestIsPoolRepository(t *testing.T) {
 	t.Parallel()
 
@@ -113,3 +118,60 @@ func TestIsPoolRepository(t *testing.T) {
 		})
 	}
 }
+
+func TestBlobOffloadPath(t *testing.T) {
+	t.Parallel()
+
+	for _, tc := range []struct {
+		desc            string
+		repo            *gitalypb.Repository
+		blobOffloadPath string
+	}{
+		{
+			desc:            "missing repository",
+			blobOffloadPath: "",
+		},
+		{
+			desc: "empty string list",
+			repo: &gitalypb.Repository{
+				GitAlternateObjectDirectories: []string{},
+			},
+			blobOffloadPath: "",
+		},
+		{
+			desc: "list with empty string",
+			repo: &gitalypb.Repository{
+				GitAlternateObjectDirectories: []string{""},
+			},
+			blobOffloadPath: "",
+		},
+		{
+			desc: "blob offload path",
+			repo: &gitalypb.Repository{
+				GitAlternateObjectDirectories: []string{storage.DeriveBlobOffloadPath(1)},
+			},
+			blobOffloadPath: storage.DeriveBlobOffloadPath(1),
+		},
+		{
+			desc: "invalid blob offload path",
+			repo: &gitalypb.Repository{
+				GitAlternateObjectDirectories: []string{storage.DerivePoolPath(1)},
+			},
+			blobOffloadPath: "",
+		},
+		{
+			desc: "mixed valid and invalid blob offload path",
+			repo: &gitalypb.Repository{
+				GitAlternateObjectDirectories: []string{
+					storage.DerivePoolPath(1), "",
+					storage.DeriveBlobOffloadPath(2), "",
+				},
+			},
+			blobOffloadPath: storage.DeriveBlobOffloadPath(2),
+		},
+	} {
+		t.Run(tc.desc, func(t *testing.T) {
+			require.Equal(t, tc.blobOffloadPath, storage.BlobOffloadPath(tc.repo))
+		})
+	}
+}