From b5e0ef7bd87bc49c4b8210863aa0bbb623784698 Mon Sep 17 00:00:00 2001 From: Sohan Dhanak Date: Thu, 4 Dec 2025 15:59:53 +0000 Subject: [PATCH 1/2] housekeeping: Introduce SelectiveOptimizationStrategy SelectiveOptimizationStrategy wraps HeuristicalOptimizationStrategy but only runs operations that have been explicitly enabled. This allows housekeeping to trigger operations independently based on per-operation thresholds. --- internal/git/housekeeping/config/config.go | 14 + .../git/housekeeping/selective_strategy.go | 66 ++++ .../housekeeping/selective_strategy_test.go | 366 ++++++++++++++++++ 3 files changed, 446 insertions(+) create mode 100644 internal/git/housekeeping/selective_strategy.go create mode 100644 internal/git/housekeeping/selective_strategy_test.go diff --git a/internal/git/housekeeping/config/config.go b/internal/git/housekeeping/config/config.go index 56db3de82c..75b45ead1d 100644 --- a/internal/git/housekeeping/config/config.go +++ b/internal/git/housekeeping/config/config.go @@ -68,3 +68,17 @@ type OffloadingConfig struct { // the prefix will need to set because we need to know the prefix to look up the object beforehand. Prefix string } + +// OperationType represents different housekeeping operations that can be scheduled independently. +type OperationType string + +const ( + // OpRepackRefs represents the pack-refs operation. + OpRepackRefs OperationType = "repack_refs" + // OpRepackObjects represents the object repacking operation. + OpRepackObjects OperationType = "repack_objects" + // OpPruneObjects represents the object pruning operation. + OpPruneObjects OperationType = "prune_objects" + // OpWriteCommitGraph represents the commit-graph writing operation. + OpWriteCommitGraph OperationType = "write_commit_graph" +) diff --git a/internal/git/housekeeping/selective_strategy.go b/internal/git/housekeeping/selective_strategy.go new file mode 100644 index 0000000000..15f8e4cadc --- /dev/null +++ b/internal/git/housekeeping/selective_strategy.go @@ -0,0 +1,66 @@ +package housekeeping + +import ( + "context" + + "gitlab.com/gitlab-org/gitaly/v18/internal/git/housekeeping/config" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/stats" +) + +// SelectiveOptimizationStrategy wraps HeuristicalOptimizationStrategy but only +// enables operations that have been explicitly scheduled based on their individual thresholds. +type SelectiveOptimizationStrategy struct { + inner *HeuristicalOptimizationStrategy + enabledOps map[config.OperationType]bool +} + +// NewSelectiveOptimizationStrategy creates a strategy that only runs the specified operations. +func NewSelectiveOptimizationStrategy(info stats.RepositoryInfo, ops []config.OperationType) *SelectiveOptimizationStrategy { + enabledOps := make(map[config.OperationType]bool) + for _, op := range ops { + enabledOps[op] = true + } + + heuristicalStrategy := NewHeuristicalOptimizationStrategy(info) + return &SelectiveOptimizationStrategy{ + inner: &heuristicalStrategy, + enabledOps: enabledOps, + } +} + +// EnabledOps returns the map of enabled operations for testing purposes. +func (s *SelectiveOptimizationStrategy) EnabledOps() map[config.OperationType]bool { + return s.enabledOps +} + +// ShouldRepackObjects delegates to the inner strategy only if repack is enabled. 
+func (s *SelectiveOptimizationStrategy) ShouldRepackObjects(ctx context.Context) (bool, config.RepackObjectsConfig) { + if !s.enabledOps[config.OpRepackObjects] { + return false, config.RepackObjectsConfig{} + } + return s.inner.ShouldRepackObjects(ctx) +} + +// ShouldPruneObjects delegates to the inner strategy only if prune is enabled. +func (s *SelectiveOptimizationStrategy) ShouldPruneObjects(ctx context.Context) (bool, PruneObjectsConfig) { + if !s.enabledOps[config.OpPruneObjects] { + return false, PruneObjectsConfig{} + } + return s.inner.ShouldPruneObjects(ctx) +} + +// ShouldRepackReferences delegates to the inner strategy only if pack-refs is enabled. +func (s *SelectiveOptimizationStrategy) ShouldRepackReferences(ctx context.Context) bool { + if !s.enabledOps[config.OpRepackRefs] { + return false + } + return s.inner.ShouldRepackReferences(ctx) +} + +// ShouldWriteCommitGraph delegates to the inner strategy only if commit-graph is enabled. +func (s *SelectiveOptimizationStrategy) ShouldWriteCommitGraph(ctx context.Context) (bool, config.WriteCommitGraphConfig, error) { + if !s.enabledOps[config.OpWriteCommitGraph] { + return false, config.WriteCommitGraphConfig{}, nil + } + return s.inner.ShouldWriteCommitGraph(ctx) +} diff --git a/internal/git/housekeeping/selective_strategy_test.go b/internal/git/housekeeping/selective_strategy_test.go new file mode 100644 index 0000000000..37eeac3afc --- /dev/null +++ b/internal/git/housekeeping/selective_strategy_test.go @@ -0,0 +1,366 @@ +package housekeeping + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/housekeeping/config" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/stats" + "gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" +) + +func TestSelectiveOptimizationStrategy_ShouldRepackObjects(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + // Repository info that would trigger a repack in HeuristicalOptimizationStrategy + infoNeedingRepack := stats.RepositoryInfo{ + LooseObjects: stats.LooseObjectsInfo{ + Count: 2000, + }, + Packfiles: stats.PackfilesInfo{ + Bitmap: stats.BitmapInfo{ + Exists: true, + }, + MultiPackIndex: stats.MultiPackIndexInfo{ + Exists: true, + }, + }, + } + + for _, tc := range []struct { + desc string + enabledOps []config.OperationType + info stats.RepositoryInfo + expectedNeeded bool + expectedConfig config.RepackObjectsConfig + }{ + { + desc: "repack disabled", + enabledOps: []config.OperationType{config.OpRepackRefs, config.OpPruneObjects, config.OpWriteCommitGraph}, + info: infoNeedingRepack, + expectedNeeded: false, + expectedConfig: config.RepackObjectsConfig{}, + }, + { + desc: "repack enabled delegates to inner strategy", + enabledOps: []config.OperationType{config.OpRepackObjects}, + info: infoNeedingRepack, + expectedNeeded: true, + expectedConfig: config.RepackObjectsConfig{ + Strategy: config.RepackObjectsStrategyIncrementalWithUnreachable, + }, + }, + { + desc: "repack enabled but not needed by inner strategy", + enabledOps: []config.OperationType{config.OpRepackObjects}, + info: stats.RepositoryInfo{}, + expectedNeeded: false, + expectedConfig: config.RepackObjectsConfig{}, + }, + { + desc: "all operations enabled", + enabledOps: []config.OperationType{config.OpRepackRefs, config.OpRepackObjects, config.OpPruneObjects, config.OpWriteCommitGraph}, + info: infoNeedingRepack, + expectedNeeded: true, + expectedConfig: config.RepackObjectsConfig{ + 
Strategy: config.RepackObjectsStrategyIncrementalWithUnreachable, + }, + }, + { + desc: "no operations enabled", + enabledOps: []config.OperationType{}, + info: infoNeedingRepack, + expectedNeeded: false, + expectedConfig: config.RepackObjectsConfig{}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + strategy := NewSelectiveOptimizationStrategy(tc.info, tc.enabledOps) + + repackNeeded, repackCfg := strategy.ShouldRepackObjects(ctx) + require.Equal(t, tc.expectedNeeded, repackNeeded) + require.Equal(t, tc.expectedConfig, repackCfg) + }) + } +} + +func TestSelectiveOptimizationStrategy_ShouldPruneObjects(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + // Repository info that would trigger a prune in HeuristicalOptimizationStrategy + infoNeedingPrune := stats.RepositoryInfo{ + LooseObjects: stats.LooseObjectsInfo{ + StaleCount: 2000, + }, + } + + for _, tc := range []struct { + desc string + enabledOps []config.OperationType + info stats.RepositoryInfo + expectedNeeded bool + }{ + { + desc: "prune disabled", + enabledOps: []config.OperationType{config.OpRepackRefs, config.OpRepackObjects, config.OpWriteCommitGraph}, + info: infoNeedingPrune, + expectedNeeded: false, + }, + { + desc: "prune enabled delegates to inner strategy", + enabledOps: []config.OperationType{config.OpPruneObjects}, + info: infoNeedingPrune, + expectedNeeded: true, + }, + { + desc: "prune enabled but not needed by inner strategy", + enabledOps: []config.OperationType{config.OpPruneObjects}, + info: stats.RepositoryInfo{}, + expectedNeeded: false, + }, + { + desc: "prune disabled for object pools even when enabled", + enabledOps: []config.OperationType{config.OpPruneObjects}, + info: stats.RepositoryInfo{ + IsObjectPool: true, + LooseObjects: stats.LooseObjectsInfo{ + StaleCount: 2000, + }, + }, + expectedNeeded: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + strategy := NewSelectiveOptimizationStrategy(tc.info, tc.enabledOps) + + pruneNeeded, pruneCfg := strategy.ShouldPruneObjects(ctx) + require.Equal(t, tc.expectedNeeded, pruneNeeded) + if tc.expectedNeeded { + require.NotZero(t, pruneCfg.ExpireBefore) + } else { + require.Equal(t, PruneObjectsConfig{}, pruneCfg) + } + }) + } +} + +func TestSelectiveOptimizationStrategy_ShouldRepackReferences(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + // Repository info that would trigger a pack-refs in HeuristicalOptimizationStrategy + infoNeedingPackRefs := stats.RepositoryInfo{ + References: stats.ReferencesInfo{ + ReferenceBackendName: gittest.DefaultReferenceBackend.Name, + LooseReferencesCount: 100, + PackedReferencesSize: 1024, + }, + } + + for _, tc := range []struct { + desc string + enabledOps []config.OperationType + info stats.RepositoryInfo + expectedNeeded bool + }{ + { + desc: "pack-refs disabled", + enabledOps: []config.OperationType{config.OpRepackObjects, config.OpPruneObjects, config.OpWriteCommitGraph}, + info: infoNeedingPackRefs, + expectedNeeded: false, + }, + { + desc: "pack-refs enabled delegates to inner strategy", + enabledOps: []config.OperationType{config.OpRepackRefs}, + info: infoNeedingPackRefs, + expectedNeeded: true, + }, + { + desc: "pack-refs enabled but not needed by inner strategy", + enabledOps: []config.OperationType{config.OpRepackRefs}, + info: stats.RepositoryInfo{}, + expectedNeeded: false, + }, + { + desc: "pack-refs enabled with no loose refs", + enabledOps: []config.OperationType{config.OpRepackRefs}, + info: stats.RepositoryInfo{ + References: stats.ReferencesInfo{ + 
LooseReferencesCount: 0, + }, + }, + expectedNeeded: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + strategy := NewSelectiveOptimizationStrategy(tc.info, tc.enabledOps) + + packRefsNeeded := strategy.ShouldRepackReferences(ctx) + require.Equal(t, tc.expectedNeeded, packRefsNeeded) + }) + } +} + +func TestSelectiveOptimizationStrategy_ShouldWriteCommitGraph(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + // Repository info that would trigger a commit-graph write in HeuristicalOptimizationStrategy + infoNeedingCommitGraph := stats.RepositoryInfo{ + References: stats.ReferencesInfo{ + ReferenceBackendName: gittest.DefaultReferenceBackend.Name, + LooseReferencesCount: 1, + }, + CommitGraph: stats.CommitGraphInfo{ + HasBloomFilters: false, + }, + } + + for _, tc := range []struct { + desc string + enabledOps []config.OperationType + info stats.RepositoryInfo + expectedNeeded bool + expectedConfig config.WriteCommitGraphConfig + }{ + { + desc: "commit-graph disabled", + enabledOps: []config.OperationType{config.OpRepackRefs, config.OpRepackObjects, config.OpPruneObjects}, + info: infoNeedingCommitGraph, + expectedNeeded: false, + expectedConfig: config.WriteCommitGraphConfig{}, + }, + { + desc: "commit-graph enabled delegates to inner strategy", + enabledOps: []config.OperationType{config.OpWriteCommitGraph}, + info: infoNeedingCommitGraph, + expectedNeeded: true, + expectedConfig: config.WriteCommitGraphConfig{ + ReplaceChain: true, + }, + }, + { + desc: "commit-graph enabled but not needed by inner strategy", + enabledOps: []config.OperationType{config.OpWriteCommitGraph}, + info: stats.RepositoryInfo{ + References: stats.ReferencesInfo{ + ReferenceBackendName: gittest.DefaultReferenceBackend.Name, + LooseReferencesCount: 1, + }, + CommitGraph: stats.CommitGraphInfo{ + CommitGraphChainLength: 1, + HasBloomFilters: true, + HasGenerationData: true, + }, + }, + expectedNeeded: false, + expectedConfig: config.WriteCommitGraphConfig{}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + strategy := NewSelectiveOptimizationStrategy(tc.info, tc.enabledOps) + + commitGraphNeeded, commitGraphCfg, err := strategy.ShouldWriteCommitGraph(ctx) + require.NoError(t, err) + require.Equal(t, tc.expectedNeeded, commitGraphNeeded) + require.Equal(t, tc.expectedConfig, commitGraphCfg) + }) + } +} + +func TestSelectiveOptimizationStrategy_IndependentOperations(t *testing.T) { + t.Parallel() + + ctx := testhelper.Context(t) + + // Repository info where all operations would be triggered by HeuristicalOptimizationStrategy + info := stats.RepositoryInfo{ + LooseObjects: stats.LooseObjectsInfo{ + Count: 2000, + StaleCount: 2000, + }, + Packfiles: stats.PackfilesInfo{ + Bitmap: stats.BitmapInfo{ + Exists: true, + }, + MultiPackIndex: stats.MultiPackIndexInfo{ + Exists: true, + }, + }, + References: stats.ReferencesInfo{ + ReferenceBackendName: gittest.DefaultReferenceBackend.Name, + LooseReferencesCount: 100, + PackedReferencesSize: 1024, + }, + CommitGraph: stats.CommitGraphInfo{ + HasBloomFilters: false, + }, + } + + t.Run("only pack-refs enabled", func(t *testing.T) { + strategy := NewSelectiveOptimizationStrategy(info, []config.OperationType{config.OpRepackRefs}) + + repackNeeded, _ := strategy.ShouldRepackObjects(ctx) + require.False(t, repackNeeded, "repack should be disabled") + + pruneNeeded, _ := strategy.ShouldPruneObjects(ctx) + require.False(t, pruneNeeded, "prune should be disabled") + + packRefsNeeded := strategy.ShouldRepackReferences(ctx) + require.True(t, packRefsNeeded, 
"pack-refs should be enabled and needed") + + commitGraphNeeded, _, err := strategy.ShouldWriteCommitGraph(ctx) + require.NoError(t, err) + require.False(t, commitGraphNeeded, "commit-graph should be disabled") + }) + + t.Run("only repack and commit-graph enabled", func(t *testing.T) { + strategy := NewSelectiveOptimizationStrategy(info, []config.OperationType{config.OpRepackObjects, config.OpWriteCommitGraph}) + + repackNeeded, _ := strategy.ShouldRepackObjects(ctx) + require.True(t, repackNeeded, "repack should be enabled and needed") + + pruneNeeded, _ := strategy.ShouldPruneObjects(ctx) + require.False(t, pruneNeeded, "prune should be disabled") + + packRefsNeeded := strategy.ShouldRepackReferences(ctx) + require.False(t, packRefsNeeded, "pack-refs should be disabled") + + commitGraphNeeded, _, err := strategy.ShouldWriteCommitGraph(ctx) + require.NoError(t, err) + require.True(t, commitGraphNeeded, "commit-graph should be enabled and needed") + }) + + t.Run("all operations enabled", func(t *testing.T) { + allOps := []config.OperationType{config.OpRepackRefs, config.OpRepackObjects, config.OpPruneObjects, config.OpWriteCommitGraph} + selectiveStrategy := NewSelectiveOptimizationStrategy(info, allOps) + heuristicStrategy := NewHeuristicalOptimizationStrategy(info) + + selectiveRepack, selectiveRepackCfg := selectiveStrategy.ShouldRepackObjects(ctx) + heuristicRepack, heuristicRepackCfg := heuristicStrategy.ShouldRepackObjects(ctx) + require.Equal(t, heuristicRepack, selectiveRepack) + require.Equal(t, heuristicRepackCfg, selectiveRepackCfg) + + selectivePrune, _ := selectiveStrategy.ShouldPruneObjects(ctx) + heuristicPrune, _ := heuristicStrategy.ShouldPruneObjects(ctx) + require.Equal(t, heuristicPrune, selectivePrune) + + selectivePackRefs := selectiveStrategy.ShouldRepackReferences(ctx) + heuristicPackRefs := heuristicStrategy.ShouldRepackReferences(ctx) + require.Equal(t, heuristicPackRefs, selectivePackRefs) + + selectiveCommitGraph, selectiveCommitGraphCfg, err := selectiveStrategy.ShouldWriteCommitGraph(ctx) + require.NoError(t, err) + heuristicCommitGraph, heuristicCommitGraphCfg, err := heuristicStrategy.ShouldWriteCommitGraph(ctx) + require.NoError(t, err) + require.Equal(t, heuristicCommitGraph, selectiveCommitGraph) + require.Equal(t, heuristicCommitGraphCfg, selectiveCommitGraphCfg) + }) +} -- GitLab From 495296727049082f6a779faaacb0845553e0f982 Mon Sep 17 00:00:00 2001 From: Sohan Dhanak Date: Fri, 5 Dec 2025 00:01:00 +0000 Subject: [PATCH 2/2] housekeeping: Add per-operation interval configuration Allow different housekeeping operations to run at independent intervals. This change only edits the interval for pack-refs has changed and other operations continue to run every 20 RPCs. The middleware now uses SelectiveOptimizationStrategy to run only operations that exceed their thresholds. 
--- internal/cli/gitaly/serve.go | 3 +- .../middleware/housekeeping/middleware.go | 140 ++++++++-- .../housekeeping/middleware_test.go | 253 ++++++++++++++++-- 3 files changed, 357 insertions(+), 39 deletions(-) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 0531be28fa..991cec209b 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -589,7 +589,8 @@ func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error { housekeepingManager := housekeepingmgr.New(cfg.Prometheus, logger, transactionManager, node) prometheus.MustRegister(housekeepingManager) - housekeepingMiddleware := housekeepingmw.NewHousekeepingMiddleware(logger, protoregistry.GitalyProtoPreregistered, localrepoFactory, housekeepingManager, 20) + housekeepingMiddlewareConfig := housekeepingmw.DefaultMiddlewareConfig() + housekeepingMiddleware := housekeepingmw.NewHousekeepingMiddleware(logger, protoregistry.GitalyProtoPreregistered, localrepoFactory, housekeepingManager, housekeepingMiddlewareConfig) defer housekeepingMiddleware.WaitForWorkers() gitalyServerFactory := server.NewGitalyServerFactory( diff --git a/internal/grpc/middleware/housekeeping/middleware.go b/internal/grpc/middleware/housekeeping/middleware.go index 70fca20baa..398f269c75 100644 --- a/internal/grpc/middleware/housekeeping/middleware.go +++ b/internal/grpc/middleware/housekeeping/middleware.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v18/internal/featureflag" "gitlab.com/gitlab-org/gitaly/v18/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/housekeeping/config" "gitlab.com/gitlab-org/gitaly/v18/internal/git/housekeeping/manager" "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" "gitlab.com/gitlab-org/gitaly/v18/internal/git/stats" @@ -19,10 +20,50 @@ import ( "google.golang.org/protobuf/proto" ) +// OperationThreshold defines when a specific operation should be triggered. +type OperationThreshold struct { + // RPCInterval is the number of write RPCs after which this operation should potentially run. + RPCInterval int + // StatThreshold is the minimum file + directory count to trigger this operation on accessor RPCs. + StatThreshold int +} + +// MiddlewareConfig holds configuration for all housekeeping operations. +type MiddlewareConfig struct { + // OperationThresholds maps each operation type to its triggering thresholds. + OperationThresholds map[config.OperationType]OperationThreshold + // DefaultThresholds provides a default threshold for operations which aren't explicitly configured. + DefaultThresholds OperationThreshold +} + +// DefaultMiddlewareConfig returns the default configuration with pack-refs running more frequently. +func DefaultMiddlewareConfig() MiddlewareConfig { + return MiddlewareConfig{ + DefaultThresholds: OperationThreshold{ + RPCInterval: 20, + StatThreshold: 1000, + }, + OperationThresholds: map[config.OperationType]OperationThreshold{ + config.OpRepackRefs: { + RPCInterval: 10, + StatThreshold: 500, + }, + }, + } +} + // activity tracks housekeeping activity for a specific relative path. type activity struct { writeCount int active bool + // lastRunAt tracks the write count at which each operation last ran. + lastRunAt map[config.OperationType]int +} + +func newActivity() *activity { + return &activity{ + lastRunAt: make(map[config.OperationType]int), + } } type repoKey struct { @@ -31,7 +72,7 @@ type repoKey struct { // Middleware manages scheduling of housekeeping tasks by intercepting gRPC requests. 
type Middleware struct { - interval int + config MiddlewareConfig repoActivity map[repoKey]*activity mu sync.Mutex @@ -42,28 +83,34 @@ type Middleware struct { manager manager.Manager localRepoFactory localrepo.Factory statsCache *sync.Map - statThreshold int } -// forceHousekeepingRPCs are all of the RPCs that we should force housekeeping right after. +// forceHousekeepingRPCs are all the RPCs that we should force housekeeping right after. var forceHousekeepingRPCs = map[string]struct{}{ gitalypb.CleanupService_RewriteHistory_FullMethodName: {}, } // NewHousekeepingMiddleware returns a new middleware. -func NewHousekeepingMiddleware(logger log.Logger, registry *protoregistry.Registry, factory localrepo.Factory, manager manager.Manager, interval int) *Middleware { +func NewHousekeepingMiddleware(logger log.Logger, registry *protoregistry.Registry, factory localrepo.Factory, manager manager.Manager, cfg MiddlewareConfig) *Middleware { return &Middleware{ - interval: interval, + config: cfg, logger: logger, registry: registry, localRepoFactory: factory, manager: manager, repoActivity: make(map[repoKey]*activity), statsCache: &sync.Map{}, - statThreshold: 1000, } } +// getThresholds returns the thresholds for a given operation type. +func (m *Middleware) getThresholds(op config.OperationType) OperationThreshold { + if thresholds, ok := m.config.OperationThresholds[op]; ok { + return thresholds + } + return m.config.DefaultThresholds +} + // WaitForWorkers waits for any active housekeeping tasks to finish. func (m *Middleware) WaitForWorkers() { m.wg.Wait() @@ -193,14 +240,11 @@ func (m *Middleware) StreamServerInterceptor() grpc.StreamServerInterceptor { func (m *Middleware) markHousekeepingActive(key repoKey) { a, ok := m.repoActivity[key] if !ok { - a = &activity{} + a = newActivity() m.repoActivity[key] = a } a.active = true - // Reset the counter at the start so we can track the number of write RPCs that executed while a housekeeping - // job is active. - a.writeCount = 0 } func (m *Middleware) markHousekeepingInactive(key repoKey) { @@ -209,13 +253,12 @@ func (m *Middleware) markHousekeepingInactive(key repoKey) { a, ok := m.repoActivity[key] if !ok { - a = &activity{} + a = newActivity() m.repoActivity[key] = a } - // Since we reset the counter at the beginning of housekeeping, if the counter remains at 0 after housekeeping - // it means the repository is low-activity. We can remove the entry in the map if so. - if a.writeCount == 0 { + // If no operations have run yet, remove the entry to save memory. + if len(a.lastRunAt) == 0 { delete(m.repoActivity, key) return } @@ -232,6 +275,42 @@ func (m *Middleware) isActive(key repoKey) bool { return a.active } +// allOperations contains all known operation types. +var allOperations = []config.OperationType{ + config.OpRepackRefs, config.OpRepackObjects, config.OpPruneObjects, config.OpWriteCommitGraph, +} + +// pendingOperations returns operations that have exceeded their RPC interval thresholds. +func (m *Middleware) pendingOperations(a *activity, force bool) []config.OperationType { + var pending []config.OperationType + + for _, op := range allOperations { + thresholds := m.getThresholds(op) + lastRun := a.lastRunAt[op] + writesSinceLastRun := a.writeCount - lastRun + + if force || writesSinceLastRun > thresholds.RPCInterval { + pending = append(pending, op) + } + } + + return pending +} + +// operationsExceedingStatThreshold returns operations whose stat threshold is exceeded. 
+func (m *Middleware) operationsExceedingStatThreshold(statCount int) []config.OperationType { + var ops []config.OperationType + + for _, op := range allOperations { + thresholds := m.getThresholds(op) + if statCount > thresholds.StatThreshold { + ops = append(ops, op) + } + } + + return ops +} + func (m *Middleware) scheduleHousekeeping(ctx context.Context, repo *gitalypb.Repository, force bool) { m.mu.Lock() defer m.mu.Unlock() @@ -240,16 +319,29 @@ func (m *Middleware) scheduleHousekeeping(ctx context.Context, repo *gitalypb.Re a, ok := m.repoActivity[key] if !ok { - a = &activity{} + a = newActivity() m.repoActivity[key] = a } a.writeCount++ - if a.active || (a.writeCount <= m.interval && !force) { + if a.active { + return + } + + pendingOps := m.pendingOperations(a, force) + if len(pendingOps) == 0 { return } - m.logger.WithField("forced", force).InfoContext(ctx, "beginning scheduled housekeeping") + m.logger.WithFields(log.Fields{ + "forced": force, + "operations": pendingOps, + }).InfoContext(ctx, "beginning scheduled housekeeping") + + // Mark that these operations are running at the current write count + for _, op := range pendingOps { + a.lastRunAt[op] = a.writeCount + } m.markHousekeepingActive(key) @@ -270,7 +362,7 @@ func (m *Middleware) scheduleHousekeeping(ctx context.Context, repo *gitalypb.Re localRepo := m.localRepoFactory.Build(repo) if err := m.manager.OptimizeRepository(housekeepingCtx, localRepo, manager.WithOptimizationStrategyConstructor( func(info stats.RepositoryInfo) housekeeping.OptimizationStrategy { - return housekeeping.NewHeuristicalOptimizationStrategy(info) + return housekeeping.NewSelectiveOptimizationStrategy(info, pendingOps) }, )); err != nil { m.logger.WithError(err).ErrorContext(housekeepingCtx, "failed scheduled housekeeping") @@ -293,21 +385,23 @@ func (m *Middleware) scheduleHousekeepingIfNeeded(ctx context.Context, key repoK return } - stats := snapshot.RepositoryStatistics{} - if err := snapshot.WalkPathForStats(ctx, repositoryPath, &stats); err != nil { + repoStats := snapshot.RepositoryStatistics{} + if err := snapshot.WalkPathForStats(ctx, repositoryPath, &repoStats); err != nil { m.logger.WithError(err).ErrorContext(ctx, "calculate repository statistics") return } m.logger.WithFields(log.Fields{ "repository_stats": map[string]any{ - "directory_count": stats.DirectoryCount, - "file_count": stats.FileCount, + "directory_count": repoStats.DirectoryCount, + "file_count": repoStats.FileCount, }, }).InfoContext(ctx, "collected repository statistics") m.statsCache.Store(key, struct{}{}) - if stats.DirectoryCount+stats.FileCount > m.statThreshold { + totalCount := repoStats.DirectoryCount + repoStats.FileCount + ops := m.operationsExceedingStatThreshold(totalCount) + if len(ops) > 0 { m.scheduleHousekeeping(ctx, targetRepo, true) } } diff --git a/internal/grpc/middleware/housekeeping/middleware_test.go b/internal/grpc/middleware/housekeeping/middleware_test.go index 397ed2e209..157e3b9ed4 100644 --- a/internal/grpc/middleware/housekeeping/middleware_test.go +++ b/internal/grpc/middleware/housekeeping/middleware_test.go @@ -15,6 +15,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v18/internal/git/housekeeping/config" housekeepingmgr "gitlab.com/gitlab-org/gitaly/v18/internal/git/housekeeping/manager" "gitlab.com/gitlab-org/gitaly/v18/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v18/internal/git/stats" gitalycfg "gitlab.com/gitlab-org/gitaly/v18/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v18/internal/grpc/protoregistry" 
"gitlab.com/gitlab-org/gitaly/v18/internal/testhelper" @@ -89,8 +90,11 @@ func (*healthServer) Check(context.Context, *healthpb.HealthCheckRequest) (*heal } type mockHousekeepingManager struct { - optimizeRepositoryInvocations map[string]int // RelativePath -> count - mu sync.Mutex + // optimizeRepositoryInvocations allows us to track how many times OptimizeRepository was called for a given repository + optimizeRepositoryInvocations map[string]int + // lastEnabledOps allows us to get the last set of operations that were scheduled to run when OptimizeRepository was called + lastEnabledOps map[string]map[config.OperationType]bool + mu sync.Mutex useDelayCh bool delayCh chan struct{} @@ -103,6 +107,13 @@ func (m *mockHousekeepingManager) getOptimizeRepositoryInvocations(relativePath return m.optimizeRepositoryInvocations[relativePath] } +func (m *mockHousekeepingManager) getLastEnabledOps(relativePath string) map[config.OperationType]bool { + m.mu.Lock() + defer m.mu.Unlock() + + return m.lastEnabledOps[relativePath] +} + func (m *mockHousekeepingManager) withDelay() chan struct{} { m.mu.Lock() defer m.mu.Unlock() @@ -135,6 +146,21 @@ func (m *mockHousekeepingManager) OptimizeRepository(ctx context.Context, repo * m.optimizeRepositoryInvocations[relativePath]++ + // Extract enabled operations from the strategy constructor + var cfg housekeepingmgr.OptimizeRepositoryConfig + for _, opt := range opts { + opt(&cfg) + } + if cfg.StrategyConstructor != nil { + strategy := cfg.StrategyConstructor(stats.RepositoryInfo{}) + if selective, ok := strategy.(*housekeeping.SelectiveOptimizationStrategy); ok { + if m.lastEnabledOps == nil { + m.lastEnabledOps = make(map[string]map[config.OperationType]bool) + } + m.lastEnabledOps[relativePath] = selective.EnabledOps() + } + } + if m.useDelayCh { <-m.delayCh } @@ -150,6 +176,24 @@ func (m *mockHousekeepingManager) RehydrateRepository(ctx context.Context, repo return nil } +// testMiddlewareConfig returns a MiddlewareConfig for testing with all operations +// using the same interval and threshold for simpler test assertions. 
+func testMiddlewareConfig(interval, statThreshold int) MiddlewareConfig { + thresholds := OperationThreshold{ + RPCInterval: interval, + StatThreshold: statThreshold, + } + return MiddlewareConfig{ + DefaultThresholds: thresholds, + OperationThresholds: map[config.OperationType]OperationThreshold{ + config.OpRepackRefs: thresholds, + config.OpRepackObjects: thresholds, + config.OpPruneObjects: thresholds, + config.OpWriteCommitGraph: thresholds, + }, + } +} + func TestInterceptors(t *testing.T) { testhelper.NewFeatureSets( featureflag.HousekeepingMiddleware, @@ -169,7 +213,9 @@ func testInterceptors(t *testing.T, ctx context.Context) { delayCh: make(chan struct{}), } - housekeepingMiddleware := NewHousekeepingMiddleware(logger, protoregistry.GitalyProtoPreregistered, localRepoFactory, housekeepingManager, 1) + middlewareConfig := testMiddlewareConfig(1, 1000) + + housekeepingMiddleware := NewHousekeepingMiddleware(logger, protoregistry.GitalyProtoPreregistered, localRepoFactory, housekeepingManager, middlewareConfig) defer housekeepingMiddleware.WaitForWorkers() server := grpc.NewServer( @@ -514,18 +560,40 @@ func testInterceptors(t *testing.T, ctx context.Context) { SkipCreationViaService: true, }) - // Setting low threshold to easily pass it - housekeepingMiddleware.statThreshold = 1 + // Create a new middleware with low threshold to easily pass it + lowThresholdConfig := testMiddlewareConfig(1, 1) + lowThresholdMiddleware := NewHousekeepingMiddleware(logger, protoregistry.GitalyProtoPreregistered, localRepoFactory, housekeepingManager, lowThresholdConfig) + defer lowThresholdMiddleware.WaitForWorkers() + + lowThresholdServer := grpc.NewServer( + grpc.StreamInterceptor(lowThresholdMiddleware.StreamServerInterceptor()), + grpc.UnaryInterceptor(lowThresholdMiddleware.UnaryServerInterceptor()), + ) + t.Cleanup(lowThresholdServer.Stop) + + gitalypb.RegisterRepositoryServiceServer(lowThresholdServer, service) + + lowThresholdListener, err := net.Listen("tcp", ":0") + require.NoError(t, err) + go func() { + testhelper.MustServe(t, lowThresholdServer, lowThresholdListener) + }() + + lowThresholdConn, err := grpc.NewClient( + lowThresholdListener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer testhelper.MustClose(t, lowThresholdConn) initialCount := housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()) - _, err = gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ + _, err = gitalypb.NewRepositoryServiceClient(lowThresholdConn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ Repository: repo, }) require.NoError(t, err) - // Wait for any async housekeeping to complete - housekeepingMiddleware.WaitForWorkers() + lowThresholdMiddleware.WaitForWorkers() newCount := housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()) @@ -537,13 +605,12 @@ func testInterceptors(t *testing.T, ctx context.Context) { ) // Next request should not trigger housekeeping as it is added to the stats cache - _, err = gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ + _, err = gitalypb.NewRepositoryServiceClient(lowThresholdConn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ Repository: repo, }) require.NoError(t, err) - // Wait for any async housekeeping to complete - housekeepingMiddleware.WaitForWorkers() - // Verify that housekeeping was not triggered + + lowThresholdMiddleware.WaitForWorkers() 
require.Equal(t, newCount, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), @@ -556,9 +623,6 @@ func testInterceptors(t *testing.T, ctx context.Context) { SkipCreationViaService: true, }) - // Setting high threshold to stay below it - housekeepingMiddleware.statThreshold = 1000 - initialCount := housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()) _, err = gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ @@ -577,3 +641,162 @@ func testInterceptors(t *testing.T, ctx context.Context) { ) }) } + +func TestIndependentOperationThresholds(t *testing.T) { + testhelper.NewFeatureSets( + featureflag.HousekeepingMiddleware, + ).Run(t, testIndependentOperationThresholds) +} + +func testIndependentOperationThresholds(t *testing.T, ctx context.Context) { + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + + localRepoFactory := localrepo.NewFactory(logger, gitalycfg.NewLocator(cfg), gittest.NewCommandFactory(t, cfg), catfileCache) + + housekeepingManager := &mockHousekeepingManager{ + optimizeRepositoryInvocations: make(map[string]int), + delayCh: make(chan struct{}), + } + + // Configure different thresholds for different operations + middlewareConfig := MiddlewareConfig{ + DefaultThresholds: OperationThreshold{ + RPCInterval: 20, + StatThreshold: 1000, + }, + OperationThresholds: map[config.OperationType]OperationThreshold{ + config.OpRepackRefs: { + RPCInterval: 2, + StatThreshold: 500, + }, + config.OpRepackObjects: { + RPCInterval: 5, + StatThreshold: 1000, + }, + }, + } + + housekeepingMiddleware := NewHousekeepingMiddleware(logger, protoregistry.GitalyProtoPreregistered, localRepoFactory, housekeepingManager, middlewareConfig) + defer housekeepingMiddleware.WaitForWorkers() + + server := grpc.NewServer( + grpc.StreamInterceptor(housekeepingMiddleware.StreamServerInterceptor()), + grpc.UnaryInterceptor(housekeepingMiddleware.UnaryServerInterceptor()), + ) + t.Cleanup(server.Stop) + + service := &testService{} + gitalypb.RegisterRepositoryServiceServer(server, service) + + listener, err := net.Listen("tcp", ":0") + require.NoError(t, err) + go func() { + testhelper.MustServe(t, server, listener) + }() + + conn, err := grpc.NewClient( + listener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer testhelper.MustClose(t, conn) + + t.Run("pack-refs triggers before repack due to lower threshold", func(t *testing.T) { + repo := &gitalypb.Repository{ + RelativePath: "independent-thresholds-repo", + } + + sendFn := func() { + _, err = gitalypb.NewRepositoryServiceClient(conn).WriteRef(ctx, &gitalypb.WriteRefRequest{ + Repository: repo, + }) + require.NoError(t, err) + } + + sendFn() + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), + "no housekeeping after 1 RPC") + + sendFn() + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 0, 0), + housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), + "no housekeeping AT threshold") + + sendFn() + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 1, 0), + housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), + 
"housekeeping triggered after pack-refs passing threshold") + + sendFn() + sendFn() + sendFn() + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 2, 0), + housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), + "housekeeping triggered again after pack-refs threshold") + }) + + t.Run("repack-refs and repack-objects run at independent intervals", func(t *testing.T) { + repo := &gitalypb.Repository{ + RelativePath: "independent-intervals-repo", + } + + sendFn := func() { + _, err = gitalypb.NewRepositoryServiceClient(conn).WriteRef(ctx, &gitalypb.WriteRefRequest{ + Repository: repo, + }) + require.NoError(t, err) + } + + // With RPCInterval: 2 for pack-refs and RPCInterval: 5 for repack-objects: + // pack-refs triggers when writeCount > 2 (after 3 RPCs) + // repack-objects triggers when writeCount > 5 (after 6 RPCs) + + // Send 3 RPCs - pack-refs threshold exceeded (3 > 2), repack-objects not (3 < 5) + for i := 0; i < 3; i++ { + sendFn() + } + housekeepingMiddleware.WaitForWorkers() + + require.Equal(t, + testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 1, 0), + housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), + "housekeeping should trigger after pack-refs threshold (3 RPCs)") + + // Verify pack-refs was requested but NOT repack-objects + if featureflag.HousekeepingMiddleware.IsEnabled(ctx) { + ops := housekeepingManager.getLastEnabledOps(repo.GetRelativePath()) + require.True(t, ops[config.OpRepackRefs], + "pack-refs should be enabled after 3 RPCs") + require.False(t, ops[config.OpRepackObjects], + "repack-objects should NOT be enabled after only 3 RPCs") + } + + // Send 3 more RPCs + // pack-refs: 3 > 2, triggers again + // repack-objects: 6 > 5, triggers now + for i := 0; i < 3; i++ { + sendFn() + } + housekeepingMiddleware.WaitForWorkers() + + require.Equal(t, + testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 2, 0), + housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), + "housekeeping should trigger again after 6 total RPCs") + + if featureflag.HousekeepingMiddleware.IsEnabled(ctx) { + ops := housekeepingManager.getLastEnabledOps(repo.GetRelativePath()) + require.True(t, ops[config.OpRepackRefs], + "pack-refs should be enabled") + require.True(t, ops[config.OpRepackObjects], + "repack-objects should now be enabled after 6 RPCs") + } + }) +} -- GitLab