From 324a9ef4fbbdc5d5b85e5dce0dbcc507570670e8 Mon Sep 17 00:00:00 2001
From: James Liu
Date: Tue, 20 May 2025 14:16:11 +1000
Subject: [PATCH 1/3] housekeeping: Add middleware feature flag

---
 internal/featureflag/ff_housekeeping_middleware.go | 9 +++++++++
 1 file changed, 9 insertions(+)
 create mode 100644 internal/featureflag/ff_housekeeping_middleware.go

diff --git a/internal/featureflag/ff_housekeeping_middleware.go b/internal/featureflag/ff_housekeeping_middleware.go
new file mode 100644
index 0000000000..bbbd4771f3
--- /dev/null
+++ b/internal/featureflag/ff_housekeeping_middleware.go
@@ -0,0 +1,9 @@
+package featureflag
+
+// HousekeepingMiddleware enables the housekeeping scheduler middleware.
+var HousekeepingMiddleware = NewFeatureFlag(
+	"housekeeping_middleware",
+	"v18.1.0",
+	"https://gitlab.com/gitlab-org/gitaly/-/issues/6761",
+	false,
+)
--
GitLab

From e1ed4de3bd8d7209d433cc80fb28bf558c58f342 Mon Sep 17 00:00:00 2001
From: James Liu
Date: Tue, 20 May 2025 16:04:48 +1000
Subject: [PATCH 2/3] housekeeping: Add scheduling middleware

We are not currently performing repository housekeeping often enough,
leading to repositories accumulating hundreds of packfiles before they
are repacked. This degrades performance, especially when transactions
and the WAL are enabled.

Housekeeping has historically been scheduled in two places:

- By Rails, after a set number of pushes (200) was performed against a
  repository. Once this threshold was reached, Rails would invoke the
  OptimizeRepository RPC.
- In Gitaly's DailyOptimizationWorker, which randomly walks storages
  and executes the OptimizeRepository function on each repository it
  visits. This does not guarantee full coverage, as the worker only
  runs for a limited duration.

Adding a scheduling middleware is the first step towards re-integrating
housekeeping tasks into Gitaly and removing the reliance on Rails. The
middleware is transaction-independent and intercepts all gRPC requests.
It ensures that at most one invocation of OptimizeRepository runs for a
given repository at any one time, whether that invocation was scheduled
by the middleware itself or requested via RPC.
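
Note: as a rough mental model of the scheduling described above, the
bookkeeping amounts to a per-repository write counter plus a
single-flight guard. The sketch below is illustrative only (the tracker
type, recordWrite/done helpers, and threshold are made-up names; the
real middleware keys repositories by storage and relative path, resets
its counter when a job starts, and delegates the actual work to the
housekeeping manager):

package main

import (
	"fmt"
	"sync"
)

// tracker is a simplified model of the per-repository bookkeeping the
// middleware performs: it counts write RPCs and allows at most one
// housekeeping run per repository at a time.
type tracker struct {
	mu        sync.Mutex
	threshold int
	writes    map[string]int
	active    map[string]bool
}

// recordWrite registers a mutator RPC against a repository and reports
// whether a housekeeping run should be started now.
func (t *tracker) recordWrite(repo string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()

	t.writes[repo]++
	if t.writes[repo] <= t.threshold || t.active[repo] {
		return false
	}

	// Claim the repository's housekeeping slot and reset the counter
	// so the next cycle starts from zero.
	t.active[repo] = true
	t.writes[repo] = 0
	return true
}

// done releases the repository's housekeeping slot again.
func (t *tracker) done(repo string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.active[repo] = false
}

func main() {
	t := &tracker{threshold: 2, writes: map[string]int{}, active: map[string]bool{}}
	for i := 1; i <= 4; i++ {
		fmt.Printf("write %d schedules housekeeping: %v\n", i, t.recordWrite("@hashed/aa/bb/repo.git"))
	}
	t.done("@hashed/aa/bb/repo.git")
	fmt.Println("write after release schedules housekeeping:", t.recordWrite("@hashed/aa/bb/repo.git"))
}

Running the sketch shows that only the write which first crosses the
threshold triggers a run; later writes are absorbed until the slot is
released and the counter climbs past the threshold again.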
--- .../middleware/housekeeping/middleware.go | 262 ++++++++++++ .../housekeeping/middleware_test.go | 395 ++++++++++++++++++ .../housekeeping/testhelper_test.go | 11 + 3 files changed, 668 insertions(+) create mode 100644 internal/grpc/middleware/housekeeping/middleware.go create mode 100644 internal/grpc/middleware/housekeeping/middleware_test.go create mode 100644 internal/grpc/middleware/housekeeping/testhelper_test.go diff --git a/internal/grpc/middleware/housekeeping/middleware.go b/internal/grpc/middleware/housekeeping/middleware.go new file mode 100644 index 0000000000..8b0a8f61cb --- /dev/null +++ b/internal/grpc/middleware/housekeeping/middleware.go @@ -0,0 +1,262 @@ +package housekeeping + +import ( + "context" + "sync" + + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/manager" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/stats" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v16/internal/log" + "gitlab.com/gitlab-org/gitaly/v16/internal/structerr" + "gitlab.com/gitlab-org/gitaly/v16/middleware" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" +) + +// activity tracks housekeeping activity for a specific relative path. +type activity struct { + writeCount int + active bool +} + +type repoKey struct { + storage, relativePath string +} + +// Middleware manages scheduling of housekeeping tasks by intercepting gRPC requests. +type Middleware struct { + interval int + repoActivity map[repoKey]*activity + + mu sync.Mutex + wg sync.WaitGroup + + logger log.Logger + registry *protoregistry.Registry + manager manager.Manager + localRepoFactory localrepo.Factory +} + +// NewHousekeepingMiddleware returns a new middleware. +func NewHousekeepingMiddleware(logger log.Logger, registry *protoregistry.Registry, factory localrepo.Factory, manager manager.Manager, interval int) *Middleware { + return &Middleware{ + interval: interval, + logger: logger, + registry: registry, + localRepoFactory: factory, + manager: manager, + repoActivity: make(map[repoKey]*activity), + } +} + +// WaitForWorkers waits for any active housekeeping tasks to finish. +func (m *Middleware) WaitForWorkers() { + m.wg.Wait() +} + +// UnaryServerInterceptor returns gRPC unary middleware. 
+func (m *Middleware) UnaryServerInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + if featureflag.HousekeepingMiddleware.IsDisabled(ctx) { + return handler(ctx, req) + } + + methodInfo, err := m.registry.LookupMethod(info.FullMethod) + if err != nil { + m.logger.WithError(err).ErrorContext(ctx, "lookup method for housekeeping") + return handler(ctx, req) + } + + targetRepo, err := methodInfo.TargetRepo(req.(proto.Message)) + if err != nil { + m.logger.WithError(err).ErrorContext(ctx, "lookup target repository for housekeeping") + return handler(ctx, req) + } + + key := m.getRepoKey(targetRepo) + + if methodInfo.Operation == protoregistry.OpMaintenance { + m.mu.Lock() + + if m.isActive(key) { + m.mu.Unlock() + return nil, structerr.NewAlreadyExists("housekeeping already executing for repository") + } + m.markHousekeepingActive(key) + + m.mu.Unlock() + + resp, err := handler(ctx, req) + + m.markHousekeepingInactive(key) + + return resp, err + } + + if methodInfo.Operation == protoregistry.OpMutator { + // Execute the handler first so that housekeeping incorporates the latest writes. We also ensure that + // the scheduling logic doesn't run for invalid requests. + resp, err := handler(ctx, req) + if err != nil { + return resp, err + } + + m.scheduleHousekeeping(ctx, targetRepo) + return resp, err + } + + return handler(ctx, req) + } +} + +// StreamServerInterceptor returns gRPC stream request middleware. +func (m *Middleware) StreamServerInterceptor() grpc.StreamServerInterceptor { + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if featureflag.HousekeepingMiddleware.IsDisabled(ss.Context()) { + return handler(srv, ss) + } + + methodInfo, err := m.registry.LookupMethod(info.FullMethod) + if err != nil { + m.logger.WithError(err).ErrorContext(ss.Context(), "lookup method for housekeeping") + return handler(srv, ss) + } + + req := methodInfo.NewRequest() + if err := ss.RecvMsg(req); err != nil { + m.logger.WithError(err).ErrorContext(ss.Context(), "lookup target repository for housekeeping") + return handler(srv, middleware.NewPeekedStream(ss.Context(), nil, err, ss)) + } + + targetRepo, err := methodInfo.TargetRepo(req) + if err != nil { + return handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss)) + } + + key := m.getRepoKey(targetRepo) + + if methodInfo.Operation == protoregistry.OpMaintenance { + m.mu.Lock() + if m.isActive(key) { + m.mu.Unlock() + return structerr.NewAlreadyExists("housekeeping already executing for repository") + } + + m.markHousekeepingActive(key) + m.mu.Unlock() + + // Ensure that the first message we consumed earlier is relayed to the client. + err = handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss)) + + m.markHousekeepingInactive(key) + + return err + } + + if methodInfo.Operation == protoregistry.OpMutator { + // Execute the handler first so that housekeeping incorporates the latest writes. We also ensure that + // the scheduling logic doesn't run for invalid requests. + if err := handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss)); err != nil { + return err + } + + m.scheduleHousekeeping(ss.Context(), targetRepo) + return nil + } + + // Ensure that the first message we consumed earlier is relayed to the client. 
+ return handler(srv, middleware.NewPeekedStream(ss.Context(), req, nil, ss)) + } +} + +func (m *Middleware) markHousekeepingActive(key repoKey) { + a, ok := m.repoActivity[key] + if !ok { + a = &activity{} + m.repoActivity[key] = a + } + + a.active = true + // Reset the counter at the start so we can track the number of write RPCs that executed while a housekeeping + // job is active. + a.writeCount = 0 +} + +func (m *Middleware) markHousekeepingInactive(key repoKey) { + m.mu.Lock() + defer m.mu.Unlock() + + a, ok := m.repoActivity[key] + if !ok { + a = &activity{} + m.repoActivity[key] = a + } + + // Since we reset the counter at the beginning of housekeeping, if the counter remains at 0 after housekeeping + // it means the repository is low-activity. We can remove the entry in the map if so. + if a.writeCount == 0 { + delete(m.repoActivity, key) + return + } + + a.active = false +} + +func (m *Middleware) isActive(key repoKey) bool { + a, ok := m.repoActivity[key] + if !ok { + return false + } + + return a.active +} + +func (m *Middleware) scheduleHousekeeping(ctx context.Context, repo *gitalypb.Repository) { + m.mu.Lock() + defer m.mu.Unlock() + + key := m.getRepoKey(repo) + + a, ok := m.repoActivity[key] + if !ok { + a = &activity{} + m.repoActivity[key] = a + } + a.writeCount++ + + if a.writeCount <= m.interval || a.active { + return + } + + m.logger.InfoContext(ctx, "beginning scheduled housekeeping") + + m.markHousekeepingActive(key) + + m.wg.Add(1) + go func() { + defer func() { + m.markHousekeepingInactive(key) + + m.logger.InfoContext(ctx, "ended scheduled housekeeping") + m.wg.Done() + }() + + localRepo := m.localRepoFactory.Build(repo) + if err := m.manager.OptimizeRepository(ctx, localRepo, manager.WithOptimizationStrategyConstructor( + func(info stats.RepositoryInfo) housekeeping.OptimizationStrategy { + return housekeeping.NewHeuristicalOptimizationStrategy(info) + }, + )); err != nil { + m.logger.ErrorContext(ctx, "failed scheduled housekeeping") + } + }() +} + +func (m *Middleware) getRepoKey(repo *gitalypb.Repository) repoKey { + return repoKey{storage: repo.GetStorageName(), relativePath: repo.GetRelativePath()} +} diff --git a/internal/grpc/middleware/housekeeping/middleware_test.go b/internal/grpc/middleware/housekeeping/middleware_test.go new file mode 100644 index 0000000000..54b02d7d55 --- /dev/null +++ b/internal/grpc/middleware/housekeeping/middleware_test.go @@ -0,0 +1,395 @@ +package housekeeping + +import ( + "context" + "fmt" + "net" + "sync" + "testing" + + "github.com/stretchr/testify/require" + "gitlab.com/gitlab-org/gitaly/v16/internal/featureflag" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/catfile" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/gittest" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/config" + housekeepingmgr "gitlab.com/gitlab-org/gitaly/v16/internal/git/housekeeping/manager" + "gitlab.com/gitlab-org/gitaly/v16/internal/git/localrepo" + gitalycfg "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" + "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper/testcfg" + "gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type testService struct { + gitalypb.UnimplementedRepositoryServiceServer 
+} + +// Mutator, Unary +func (ts *testService) WriteRef(context.Context, *gitalypb.WriteRefRequest) (*gitalypb.WriteRefResponse, error) { + return &gitalypb.WriteRefResponse{}, nil +} + +// Mutator, Unary, Erroring +func (ts *testService) CreateRepositoryFromBundle(grpc.ClientStreamingServer[gitalypb.CreateRepositoryFromBundleRequest, gitalypb.CreateRepositoryFromBundleResponse]) error { + return fmt.Errorf("designed to error") +} + +// Accessor, Unary +func (ts *testService) RepositoryExists(context.Context, *gitalypb.RepositoryExistsRequest) (*gitalypb.RepositoryExistsResponse, error) { + return &gitalypb.RepositoryExistsResponse{}, nil +} + +// Accessor, Stream +func (ts *testService) GetArchive(*gitalypb.GetArchiveRequest, grpc.ServerStreamingServer[gitalypb.GetArchiveResponse]) error { + return nil +} + +// Accessor, Unary, Erroring +func (ts *testService) RepositoryInfo(context.Context, *gitalypb.RepositoryInfoRequest) (*gitalypb.RepositoryInfoResponse, error) { + return &gitalypb.RepositoryInfoResponse{}, fmt.Errorf("designed to error") +} + +// Maintenance, Unary +func (ts *testService) OptimizeRepository(context.Context, *gitalypb.OptimizeRepositoryRequest) (*gitalypb.OptimizeRepositoryResponse, error) { + return nil, nil +} + +// Maintenance, Unary +func (ts *testService) PruneUnreachableObjects(context.Context, *gitalypb.PruneUnreachableObjectsRequest) (*gitalypb.PruneUnreachableObjectsResponse, error) { + return nil, nil +} + +type mockHousekeepingManager struct { + optimizeRepositoryInvocations map[string]int // RelativePath -> count + mu sync.Mutex + + useDelayCh bool + delayCh chan struct{} +} + +func (m *mockHousekeepingManager) getOptimizeRepositoryInvocations(relativePath string) int { + m.mu.Lock() + defer m.mu.Unlock() + + return m.optimizeRepositoryInvocations[relativePath] +} + +func (m *mockHousekeepingManager) withDelay() chan struct{} { + m.mu.Lock() + defer m.mu.Unlock() + + m.delayCh = make(chan struct{}) + m.useDelayCh = true + + return m.delayCh +} + +func (m *mockHousekeepingManager) withoutDelay() { + m.mu.Lock() + defer m.mu.Unlock() + + m.useDelayCh = false +} + +func (m *mockHousekeepingManager) CleanStaleData(context.Context, *localrepo.Repo, housekeeping.CleanStaleDataConfig) error { + return nil +} + +func (m *mockHousekeepingManager) OptimizeRepository(ctx context.Context, repo *localrepo.Repo, opts ...housekeepingmgr.OptimizeRepositoryOption) error { + m.mu.Lock() + defer m.mu.Unlock() + + relativePath := repo.GetRelativePath() + if _, ok := m.optimizeRepositoryInvocations[relativePath]; !ok { + m.optimizeRepositoryInvocations[relativePath] = 0 + } + + m.optimizeRepositoryInvocations[relativePath]++ + + if m.useDelayCh { + <-m.delayCh + } + + return nil +} + +func (m *mockHousekeepingManager) AddPackRefsInhibitor(ctx context.Context, repo storage.Repository) (bool, func(), error) { + return false, nil, nil +} + +func (m *mockHousekeepingManager) OffloadRepository(context.Context, *localrepo.Repo, config.OffloadingConfig) error { + return nil +} + +func TestInterceptors(t *testing.T) { + testhelper.NewFeatureSets( + featureflag.HousekeepingMiddleware, + ).Run(t, testInterceptors) +} + +func testInterceptors(t *testing.T, ctx context.Context) { + cfg := testcfg.Build(t) + logger := testhelper.NewLogger(t) + catfileCache := catfile.NewCache(cfg) + t.Cleanup(catfileCache.Stop) + + localRepoFactory := localrepo.NewFactory(logger, gitalycfg.NewLocator(cfg), gittest.NewCommandFactory(t, cfg), catfileCache) + + housekeepingManager := 
&mockHousekeepingManager{ + optimizeRepositoryInvocations: make(map[string]int), + delayCh: make(chan struct{}), + } + + housekeepingMiddleware := NewHousekeepingMiddleware(logger, protoregistry.GitalyProtoPreregistered, localRepoFactory, housekeepingManager, 1) + defer housekeepingMiddleware.WaitForWorkers() + + server := grpc.NewServer( + grpc.StreamInterceptor(housekeepingMiddleware.StreamServerInterceptor()), + grpc.UnaryInterceptor(housekeepingMiddleware.UnaryServerInterceptor()), + ) + t.Cleanup(server.Stop) + + service := &testService{} + + gitalypb.RegisterRepositoryServiceServer(server, service) + + listener, err := net.Listen("tcp", ":0") + require.NoError(t, err) + go func() { + testhelper.MustServe(t, server, listener) + }() + + conn, err := grpc.NewClient( + listener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer testhelper.MustClose(t, conn) + + t.Run("when unary mutator RPCs are intercepted", func(t *testing.T) { + repo := &gitalypb.Repository{ + RelativePath: "myrepo1", + } + + sendFn := func() { + _, err = gitalypb.NewRepositoryServiceClient(conn).WriteRef(ctx, &gitalypb.WriteRefRequest{ + Repository: repo, + }) + require.NoError(t, err) + } + + sendFn() + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations under the interval") + + sendFn() + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 1, 0), housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "one invocation after the interval") + + for range 2 { + sendFn() + } + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 2, 0), housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "another invocation after the interval") + }) + + t.Run("when unary accessor RPCs are intercepted", func(t *testing.T) { + repo := &gitalypb.Repository{ + RelativePath: "myrepo2", + } + + sendFn := func() { + _, err = gitalypb.NewRepositoryServiceClient(conn).RepositoryExists(ctx, &gitalypb.RepositoryExistsRequest{ + Repository: repo, + }) + require.NoError(t, err) + } + + sendFn() + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations under the interval") + + sendFn() + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations after the interval") + }) + + t.Run("when stream accessor RPCs are intercepted", func(t *testing.T) { + repo := &gitalypb.Repository{ + RelativePath: "myrepo3", + } + + sendFn := func() { + stream, err := gitalypb.NewRepositoryServiceClient(conn).GetArchive(ctx, &gitalypb.GetArchiveRequest{ + Repository: repo, + }) + require.NoError(t, err) + require.NoError(t, stream.CloseSend()) + } + + sendFn() + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations under the interval") + + sendFn() + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations after the interval") + }) + + t.Run("when an erroring RPC is intercepted", func(t *testing.T) 
{ + repo := &gitalypb.Repository{ + RelativePath: "myrepo4", + } + + for range 2 { + _, err = gitalypb.NewRepositoryServiceClient(conn).RepositoryInfo(ctx, &gitalypb.RepositoryInfoRequest{ + Repository: repo, + }) + require.EqualError(t, err, "rpc error: code = Unknown desc = designed to error", "middleware preserves the original error") + } + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations after the interval") + + for range 2 { + stream, err := gitalypb.NewRepositoryServiceClient(conn).CreateRepositoryFromBundle(ctx) + require.NoError(t, err) + require.NoError(t, stream.Send(&gitalypb.CreateRepositoryFromBundleRequest{ + Repository: repo, + })) + + _, err = stream.CloseAndRecv() + require.EqualError(t, err, "rpc error: code = Unknown desc = designed to error", "middleware preserves the original error") + } + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations after the interval") + }) + + t.Run("when the OptimizeRepository RPC is invoked", func(t *testing.T) { + repo := &gitalypb.Repository{ + RelativePath: "myrepo5", + } + + for range 2 { + _, err = gitalypb.NewRepositoryServiceClient(conn).OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{ + Repository: repo, + }) + require.NoError(t, err) + } + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "does not schedule further housekeeping") + }) + + t.Run("when the PruneUnreachableObjects RPC is invoked", func(t *testing.T) { + repo := &gitalypb.Repository{ + RelativePath: "myrepo6", + } + + for range 2 { + _, err = gitalypb.NewRepositoryServiceClient(conn).PruneUnreachableObjects(ctx, &gitalypb.PruneUnreachableObjectsRequest{ + Repository: repo, + }) + require.NoError(t, err) + } + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, 0, housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "does not schedule further housekeeping") + }) + + t.Run("when a housekeeping task is active when a maintenance RPC is received", func(t *testing.T) { + repo := &gitalypb.Repository{ + RelativePath: "myrepo7", + } + + ch := housekeepingManager.withDelay() + defer housekeepingManager.withoutDelay() + + for range 2 { + _, err = gitalypb.NewRepositoryServiceClient(conn).WriteRef(ctx, &gitalypb.WriteRefRequest{ + Repository: repo, + }) + require.NoError(t, err) + } + + _, err = gitalypb.NewRepositoryServiceClient(conn).OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{ + Repository: repo, + }) + + if featureflag.HousekeepingMiddleware.IsEnabled(ctx) { + require.EqualError(t, err, "rpc error: code = AlreadyExists desc = housekeeping already executing for repository") + } else { + require.NoError(t, err) + } + + close(ch) + }) + + t.Run("when a maintenance RPC is active and the write interval is reached", func(t *testing.T) { + repo := &gitalypb.Repository{ + RelativePath: "myrepo8", + } + + ch := housekeepingManager.withDelay() + defer housekeepingManager.withoutDelay() + + _, err = gitalypb.NewRepositoryServiceClient(conn).OptimizeRepository(ctx, &gitalypb.OptimizeRepositoryRequest{ + Repository: repo, + }) + + for range 2 { + _, err = gitalypb.NewRepositoryServiceClient(conn).WriteRef(ctx, &gitalypb.WriteRefRequest{ + Repository: repo, + }) + require.NoError(t, err) + } + + close(ch) + + require.Equal(t, 
testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 1, 0), housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "no invocations under the interval") + }) + + t.Run("when the write interval is reached again when housekeeping is active", func(t *testing.T) { + repo := &gitalypb.Repository{ + RelativePath: "myrepo9", + } + + sendFn := func() { + _, err = gitalypb.NewRepositoryServiceClient(conn).WriteRef(ctx, &gitalypb.WriteRefRequest{ + Repository: repo, + }) + require.NoError(t, err) + } + + ch := housekeepingManager.withDelay() + defer housekeepingManager.withoutDelay() + + // The first two requests will trigger housekeeping that runs until ch is closed. + // The next two requests won't trigger housekeeping as there's already an active job. + for range 4 { + sendFn() + } + + // Release the active housekeeping job. + close(ch) + + // The next request triggers housekeeping as the counter has already incremented past the interval. + sendFn() + + housekeepingMiddleware.WaitForWorkers() + require.Equal(t, testhelper.EnabledOrDisabledFlag(ctx, featureflag.HousekeepingMiddleware, 2, 0), housekeepingManager.getOptimizeRepositoryInvocations(repo.GetRelativePath()), "another invocation after the interval") + }) +} diff --git a/internal/grpc/middleware/housekeeping/testhelper_test.go b/internal/grpc/middleware/housekeeping/testhelper_test.go new file mode 100644 index 0000000000..e41506673e --- /dev/null +++ b/internal/grpc/middleware/housekeeping/testhelper_test.go @@ -0,0 +1,11 @@ +package housekeeping + +import ( + "testing" + + "gitlab.com/gitlab-org/gitaly/v16/internal/testhelper" +) + +func TestMain(m *testing.M) { + testhelper.Run(m) +} -- GitLab From 50b69e97099e7f97637fcd0077fdd03034ec5a47 Mon Sep 17 00:00:00 2001 From: James Liu Date: Tue, 20 May 2025 16:05:55 +1000 Subject: [PATCH 3/3] housekeeping: Integrate middleware with server Instantiates the housekeeping middleware and appends it to the list of gRPC interceptors. The middleware is optional as most tests don't need to explicitly configure it, and the use of feature flags would necessitate the use of feature sets if the middleware was included by default. 
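
Note: the "optional" wiring described above reduces to a nil guard when
the interceptor chain is assembled. A minimal stand-alone sketch of
that pattern follows (optionalMiddleware and buildInterceptors are
illustrative names, not the factory's actual API; the real factory
threads a *housekeeping.Middleware through NewGitalyServerFactory as
shown in the diff below):

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc"
)

// optionalMiddleware stands in for a middleware that a caller may or
// may not configure; a nil pointer means "not enabled".
type optionalMiddleware struct{}

// UnaryServerInterceptor returns a pass-through interceptor.
func (m *optionalMiddleware) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
		return handler(ctx, req)
	}
}

// buildInterceptors appends the middleware's interceptor only when one
// was actually provided, which is the same nil guard the server
// factory applies.
func buildInterceptors(mw *optionalMiddleware) []grpc.UnaryServerInterceptor {
	var interceptors []grpc.UnaryServerInterceptor
	// ... other interceptors would be appended here ...
	if mw != nil {
		interceptors = append(interceptors, mw.UnaryServerInterceptor())
	}
	return interceptors
}

func main() {
	fmt.Println("interceptors with middleware:", len(buildInterceptors(&optionalMiddleware{})))
	fmt.Println("interceptors without middleware:", len(buildInterceptors(nil)))
}

Tests that do not care about housekeeping simply pass nil and get no
extra interceptor, which is why the call sites below gain a nil
argument rather than a constructed middleware.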
--- cmd/gitaly-ssh/auth_test.go | 1 + internal/cli/gitaly/serve.go | 5 +++ internal/gitaly/server/auth_test.go | 3 +- internal/gitaly/server/server.go | 5 +++ internal/gitaly/server/server_factory.go | 32 +++++++++++-------- internal/gitaly/server/server_factory_test.go | 6 ++++ internal/testhelper/testserver/gitaly.go | 1 + 7 files changed, 38 insertions(+), 15 deletions(-) diff --git a/cmd/gitaly-ssh/auth_test.go b/cmd/gitaly-ssh/auth_test.go index 3051212019..088005601d 100644 --- a/cmd/gitaly-ssh/auth_test.go +++ b/cmd/gitaly-ssh/auth_test.go @@ -268,6 +268,7 @@ func TestConnectivity(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), nil, + nil, server.TransactionMiddleware{}, ) t.Cleanup(sf.Stop) diff --git a/internal/cli/gitaly/serve.go b/internal/cli/gitaly/serve.go index 848c0ac616..b5ea271c7a 100644 --- a/internal/cli/gitaly/serve.go +++ b/internal/cli/gitaly/serve.go @@ -46,6 +46,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/gitlab" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client" + housekeepingmw "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/housekeeping" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry" "gitlab.com/gitlab-org/gitaly/v16/internal/helper" @@ -557,12 +558,16 @@ func run(appCtx *cli.Command, cfg config.Cfg, logger log.Logger) error { housekeepingManager := housekeepingmgr.New(cfg.Prometheus, logger, transactionManager, node) prometheus.MustRegister(housekeepingManager) + housekeepingMiddleware := housekeepingmw.NewHousekeepingMiddleware(logger, protoregistry.GitalyProtoPreregistered, localrepoFactory, housekeepingManager, 20) + defer housekeepingMiddleware.WaitForWorkers() + gitalyServerFactory := server.NewGitalyServerFactory( cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{perRPCLimitHandler}, + housekeepingMiddleware, txMiddleware, ) defer gitalyServerFactory.Stop() diff --git a/internal/gitaly/server/auth_test.go b/internal/gitaly/server/auth_test.go index cccced1877..a50083334c 100644 --- a/internal/gitaly/server/auth_test.go +++ b/internal/gitaly/server/auth_test.go @@ -248,7 +248,7 @@ func runServer(t *testing.T, cfg config.Cfg) string { limitHandler := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters) updaterWithHooks := updateref.NewUpdaterWithHooks(cfg, logger, locator, hookManager, gitCmdFactory, catfileCache) - srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}, TransactionMiddleware{}).New(true, false) + srv, err := NewGitalyServerFactory(cfg, logger, registry, diskCache, []*limithandler.LimiterMiddleware{limitHandler}, nil, TransactionMiddleware{}).New(true, false) require.NoError(t, err) localRepoFactory := localrepo.NewFactory(logger, locator, gitCmdFactory, catfileCache) @@ -293,6 +293,7 @@ func runSecureServer(t *testing.T, cfg config.Cfg) string { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), []*limithandler.LimiterMiddleware{limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, setupPerRPCConcurrencyLimiters)}, + nil, TransactionMiddleware{}, ).New(true, true) require.NoError(t, err) diff --git a/internal/gitaly/server/server.go b/internal/gitaly/server/server.go index dc9da257d8..273eb44e41 100644 --- 
a/internal/gitaly/server/server.go +++ b/internal/gitaly/server/server.go @@ -137,6 +137,11 @@ func (s *GitalyServerFactory) New(external, secure bool, opts ...Option) (*grpc. unaryServerInterceptors = append(unaryServerInterceptors, limitHandler.UnaryInterceptor()) } + if s.housekeepingMiddleware != nil { + streamServerInterceptors = append(streamServerInterceptors, s.housekeepingMiddleware.StreamServerInterceptor()) + unaryServerInterceptors = append(unaryServerInterceptors, s.housekeepingMiddleware.UnaryServerInterceptor()) + } + streamServerInterceptors = append(streamServerInterceptors, grpctracing.StreamServerTracingInterceptor(), cache.StreamInvalidator(s.cacheInvalidator, protoregistry.GitalyProtoPreregistered, s.logger), diff --git a/internal/gitaly/server/server_factory.go b/internal/gitaly/server/server_factory.go index bb4296a9b9..ca41c08cd5 100644 --- a/internal/gitaly/server/server_factory.go +++ b/internal/gitaly/server/server_factory.go @@ -6,6 +6,7 @@ import ( "gitlab.com/gitlab-org/gitaly/v16/internal/cache" "gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/backchannel" + "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/housekeeping" "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/middleware/limithandler" "gitlab.com/gitlab-org/gitaly/v16/internal/log" "google.golang.org/grpc" @@ -13,14 +14,15 @@ import ( // GitalyServerFactory is a factory of gitaly grpc servers type GitalyServerFactory struct { - registry *backchannel.Registry - cacheInvalidator cache.Invalidator - limitHandlers []*limithandler.LimiterMiddleware - cfg config.Cfg - logger log.Logger - externalServers []*grpc.Server - internalServers []*grpc.Server - txMiddleware TransactionMiddleware + registry *backchannel.Registry + cacheInvalidator cache.Invalidator + limitHandlers []*limithandler.LimiterMiddleware + cfg config.Cfg + logger log.Logger + externalServers []*grpc.Server + internalServers []*grpc.Server + txMiddleware TransactionMiddleware + housekeepingMiddleware *housekeeping.Middleware } // TransactionMiddleware collects transaction middleware into a single struct that can be @@ -39,15 +41,17 @@ func NewGitalyServerFactory( registry *backchannel.Registry, cacheInvalidator cache.Invalidator, limitHandlers []*limithandler.LimiterMiddleware, + housekeepingMiddleware *housekeeping.Middleware, txMiddleware TransactionMiddleware, ) *GitalyServerFactory { return &GitalyServerFactory{ - cfg: cfg, - logger: logger, - registry: registry, - cacheInvalidator: cacheInvalidator, - limitHandlers: limitHandlers, - txMiddleware: txMiddleware, + cfg: cfg, + logger: logger, + registry: registry, + cacheInvalidator: cacheInvalidator, + limitHandlers: limitHandlers, + txMiddleware: txMiddleware, + housekeepingMiddleware: housekeepingMiddleware, } } diff --git a/internal/gitaly/server/server_factory_test.go b/internal/gitaly/server/server_factory_test.go index 10ad0200e2..a61dd45b38 100644 --- a/internal/gitaly/server/server_factory_test.go +++ b/internal/gitaly/server/server_factory_test.go @@ -89,6 +89,7 @@ func TestGitalyServerFactory(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), nil, + nil, TransactionMiddleware{}, ) t.Cleanup(sf.Stop) @@ -110,6 +111,7 @@ func TestGitalyServerFactory(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), nil, + nil, TransactionMiddleware{}, ) t.Cleanup(sf.Stop) @@ -132,6 +134,7 @@ func 
TestGitalyServerFactory(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), nil, + nil, TransactionMiddleware{}, ) t.Cleanup(sf.Stop) @@ -159,6 +162,7 @@ func TestGitalyServerFactory(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), nil, + nil, TransactionMiddleware{}, ) t.Cleanup(sf.Stop) @@ -191,6 +195,7 @@ func TestGitalyServerFactory(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), logger), nil, + nil, TransactionMiddleware{}, ) t.Cleanup(sf.Stop) @@ -227,6 +232,7 @@ func TestGitalyServerFactory_closeOrder(t *testing.T) { backchannel.NewRegistry(), cache.New(cfg, config.NewLocator(cfg), testhelper.SharedLogger(t)), nil, + nil, TransactionMiddleware{}, ) defer sf.Stop() diff --git a/internal/testhelper/testserver/gitaly.go b/internal/testhelper/testserver/gitaly.go index 98e8cd126e..dffbc22ed7 100644 --- a/internal/testhelper/testserver/gitaly.go +++ b/internal/testhelper/testserver/gitaly.go @@ -216,6 +216,7 @@ func runGitaly(tb testing.TB, cfg config.Cfg, registrar func(srv *grpc.Server, d deps.GetBackchannelRegistry(), deps.GetDiskCache(), []*limithandler.LimiterMiddleware{deps.GetLimitHandler()}, + nil, txMiddleware, ) -- GitLab